Why Companies Prefer to Use Python with Hadoop?

The Hadoop framework is written in Java, but it is absolutely possible for Hadoop programs to be coded in Python or C++. This means that data architects who are familiar with Python do not have to learn Java. Python comes across as one of the most user-friendly languages to learn and adopt, yet it is powerful enough for end-to-end advanced analytics applications. For example, map-reduce programs can be written in Python without the need to translate the code into Java jar files.

Now, let’s take a look at how some top-notch global companies use Hadoop in association with Python:


1) Facebook:

In the domain of image processing, Facebook is second to none. Every day Facebook processes millions and millions of images of unstructured data. To handle this, Facebook uses HDFS to store and extract these enormous volumes of data, with Python as the backend language for a large chunk of its image processing applications, including image resizing, facial image extraction, etc.
While working on a big data project, application developers can choose from a myriad of programming languages: Python, Java, R, SQL, Julia, Scala, C, and MATLAB. But the latest usage statistics posted on multiple websites show that a large percentage of application developers and data scientists prefer Python to other programming languages.
At present, Python is one of the most widely used general-purpose programming languages. Software developers use Python to build a variety of desktop GUI applications and web applications. Although Python does not come with native features to simplify big data application development, it emphasizes code readability in a way many other languages do not.

2) Amazon:

Based on consumer research and buying patterns, Amazon recommends suitable products to its existing users. This is done by a robust machine learning engine powered by Python, which interacts seamlessly with the Hadoop ecosystem to deliver a top-of-the-line product recommendation system and fault-tolerant database interactions.

3) Quora search algorithm:

Quora’s backend is built on Python; hence it is the language used for interaction with HDFS. Quora also needs to manage a vast amount of textual data, which it does with Hadoop, Apache Spark, and a few other data-warehousing technologies. Quora uses the power of Hadoop coupled with Python to extract questions from searches and for suggestions.

Python Hadoop features and advantages:

a) Python with Apache Hadoop is used to store, process, and analyze raw data sets. We write map-reduce programs in Python and run them on a Hadoop cluster. Hadoop has become a standard in distributed data processing, but it historically depended on Java. Today, there are many open-source projects that support Hadoop in Python. Python also works with other Hadoop ecosystem projects and components such as HBase, Hive, Spark, Storm, Flume, Accumulo, etc.
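As a sketch of what such a Python map-reduce program can look like, here is a minimal word-count job written in the Hadoop Streaming style. The function names and the local-run example are illustrative assumptions, not part of any particular application:

```python
# wordcount.py - minimal word-count map/reduce in the Hadoop Streaming style.
# Hadoop Streaming pipes lines over stdin/stdout; here the two phases are
# plain functions so the logic is easy to follow (and to run without a cluster).
from itertools import groupby

def mapper(lines):
    # Map phase: emit one (word, 1) pair per word.
    for line in lines:
        for word in line.strip().split():
            yield word, 1

def reducer(pairs):
    # Reduce phase: Hadoop delivers pairs sorted by key;
    # sorted() stands in for the shuffle/sort step when run locally.
    for word, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)

# Running both phases locally on a couple of lines:
counts = dict(reducer(mapper(["hello hadoop", "hello python"])))
# counts -> {'hadoop': 1, 'hello': 2, 'python': 1}
```

On a real cluster the two phases would live in separate scripts reading stdin and writing tab-separated pairs to stdout, submitted via the Hadoop Streaming jar; the command-line details depend on the cluster setup.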


b) Hadoop requires a Java runtime environment (JRE 1.6 or higher) because Hadoop is developed on top of Java APIs. Hadoop scales from a low-level single-node setup to a high-level multi-node cluster environment. Map-reduce is mainly used for the simultaneous processing of large data sets. It was originally proposed by Google to provide parallelism, data distribution, and fault tolerance.


c) The reasons for using Hadoop with Python instead of Java are not all that different from the classic Java vs. Python arguments. One of the most significant differences is that we do not have to compile our code; we use a scripting language instead. This makes interactive development of analytics possible, makes maintaining and fixing applications in production environments simpler, and yields more readable code. By integrating Python with Hadoop, you also get access to world-class data analysis libraries such as NumPy, SciPy, NLTK, and scikit-learn.

Python + RabbitMQ


In one of our Python applications, there was a requirement for a pub-sub pattern, in which a publisher publishes messages and multiple subscribers listen for different types of messages.

For this, we used RabbitMQ, an open-source message broker that accepts and forwards messages. RabbitMQ supports multiple protocols for different types of applications. The one we chose for our project is AMQP, as it is the core protocol supported by RabbitMQ, is very robust, and is recommended by the RabbitMQ team. RabbitMQ is lightweight and easy to deploy on premises and in the cloud.

We used Pika, which is a Python client recommended by the RabbitMQ team.

Install Pika –

pip install pika --upgrade

Terms used

  • Producer – The program that sends (publishes) messages.
  • Consumer – The program that receives (consumes) messages.
  • Queue – A queue is a buffer that stores messages. Queues live inside RabbitMQ.
  • Exchange – In RabbitMQ, a producer never sends a message directly to a queue; it always goes through an exchange. The exchange receives messages from producers on one side and forwards them to one or more queues based on the exchange type.
  • Bindings – The relationship between an exchange and a queue is called a binding.

Exchange Types

  • Direct – Sends a message to the queue whose binding key exactly matches the routing key.
  • Topic – Sends a message to queues based on a pattern match between the routing key and binding keys.
  • Headers – Routes messages to queues based on message header values instead of the routing key.
  • Fanout – Broadcasts a message to all bound queues.

In our application, we get data from different stock exchanges for different indexes. When new data is received, we have to perform actions based on the type of stock exchange and the type of index, so we subscribe based on exchange and index type. To implement that, we used the ‘Topic’ exchange.

Messages sent to a topic exchange can’t have an arbitrary routing_key – it must be a list of words delimited by dots. For example, in our application we used keys of the form exchange.index, i.e. ‘nse.nifty’, ‘nse.banknifty’, etc. There can be as many words in the routing key as you like, up to the limit of 255 bytes.

The binding key must also be in the same form. The logic behind the topic exchange is simple: a message sent with a particular routing key is delivered to all the queues that are bound with a matching binding key. However, there are two important special cases for binding keys:

* (star) can substitute for exactly one word.

# (hash) can substitute for zero or more words.
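To make these matching rules concrete, here is a small helper that mimics the topic exchange’s matching. It is not part of RabbitMQ or Pika – the broker does this internally – it is only an illustration of the rules above:

```python
# topic_match.py - illustrative re-implementation of topic-exchange matching.
# '*' matches exactly one word, '#' matches zero or more words.
import re

def topic_matches(binding_key, routing_key):
    parts = []
    for word in binding_key.split('.'):
        if word == '*':
            parts.append(r'[^.]+')       # exactly one word
        elif word == '#':
            parts.append('#')            # placeholder, expanded below
        else:
            parts.append(re.escape(word))
    pattern = r'\.'.join(parts)
    # Let '#' absorb its neighbouring dot so it can also match zero words.
    pattern = pattern.replace(r'\.#', r'(?:\..+)?').replace(r'#\.', r'(?:.+\.)?')
    pattern = pattern.replace('#', '.*')
    return re.fullmatch(pattern, routing_key) is not None

# topic_matches('nse.*', 'nse.nifty')      -> True
# topic_matches('nse.*', 'nse.nifty.fut')  -> False ('*' is exactly one word)
# topic_matches('nse.#', 'nse')            -> True  ('#' may match zero words)
```

With the binding key ‘nse.*’, this is exactly why both ‘nse.nifty’ and ‘nse.banknifty’ reach the same queue while ‘nyse.spx’ does not.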

Following diagram explains the message flow in our application.

[Diagram: message flow from the publisher through the topic exchange to the NSE and NYSE queues]

P – Publisher (sends a message with a particular routing key when data is available)

S – Subscribers (listening for particular messages using queues bound with specific keys)

NSE Queue – Interested in all messages with the ‘nse.*’ key, i.e. all the data from the NSE exchange

NYSE Queue – Interested in all the data from the NYSE exchange (the ‘nyse.*’ key)

That means messages with routing key ‘nse.nifty’ or ‘nse.banknifty’ will be delivered to the ‘NSE’ queue.

The following code gives an example of the ‘Publisher’ program in our application.

Publisher –

#publisher.py
import pika

class Publisher:
    def __init__(self, config):
        self.config = config

    def publish(self, routing_key, message):
        connection = self.create_connection()
        # Create a new channel with the next available channel number,
        # or pass in a channel number to use
        channel = connection.channel()

        # Creates an exchange if it does not already exist, and if the exchange
        # exists, verifies that it is of the correct and expected class.
        channel.exchange_declare(exchange=self.config['exchange'], exchange_type='topic')

        # Publishes message to the exchange with the given routing key
        channel.basic_publish(exchange=self.config['exchange'], routing_key=routing_key, body=message)
        print(" [x] Sent message %r for %r" % (message, routing_key))
        connection.close()

    # Create new connection
    def create_connection(self):
        param = pika.ConnectionParameters(host=self.config['host'], port=self.config['port'])
        return pika.BlockingConnection(param)

config = {'host': 'localhost', 'port': 5672, 'exchange': 'my_exchange'}
publisher = Publisher(config)
publisher.publish('nse.nifty', 'New Data')

channel.exchange_declare method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

channel.basic_publish method publishes the message to the given exchange with the given routing_key.

python publisher.py

This will publish the message ‘New Data’ with routing key ‘nse.nifty’ to the exchange.

The following code gives an example of the ‘Subscriber’ program in our application.

Subscriber –

#subscriber.py
import pika
import sys

class Subscriber:
    def __init__(self, queueName, bindingKey, config):
        self.queueName = queueName
        self.bindingKey = bindingKey
        self.config = config
        self.connection = self._create_connection()

    def __del__(self):
        self.connection.close()

    def _create_connection(self):
        parameters = pika.ConnectionParameters(host=self.config['host'], port=self.config['port'])
        return pika.BlockingConnection(parameters)

    def on_message_callback(self, channel, method, properties, body):
        binding_key = method.routing_key
        print("received new message for - " + binding_key)

    def setup(self):
        channel = self.connection.channel()
        channel.exchange_declare(exchange=self.config['exchange'], exchange_type='topic')

        # This method creates or checks a queue
        channel.queue_declare(queue=self.queueName)
        # Binds the queue to the specified exchange
        channel.queue_bind(queue=self.queueName, exchange=self.config['exchange'],
                           routing_key=self.bindingKey)

        channel.basic_consume(queue=self.queueName, on_message_callback=self.on_message_callback, auto_ack=True)
        print(' [*] Waiting for data for ' + self.queueName + '. To exit press CTRL+C')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()

config = {'host': 'localhost', 'port': 5672, 'exchange': 'my_exchange'}
if len(sys.argv) < 3:
    print('Usage: ' + __file__ + ' <QueueName> <BindingKey>')
    sys.exit()
else:
    queueName = sys.argv[1]
    # key in the form exchange.*
    key = sys.argv[2]
    subscriber = Subscriber(queueName, key, config)
    subscriber.setup()

channel.queue_declare creates a queue on a particular channel if it does not already exist. Creating a queue using queue_declare is idempotent ‒ we can run the command as many times as we like, and only one queue will be created.

channel.queue_bind method binds the queue named queueName to the specified exchange using the specified binding key.

channel.basic_consume method tells the broker to start a consumer process, which checks for messages on a specified queue, and then registers a callback function that should be executed in the Subscriber class when a message is available and has been delivered to the client.

To receive messages from the ‘NSE’ queue (for all keys of the format ‘nse.*’) run:

python subscriber.py nse_queue nse.*

To receive messages from the ‘NYSE’ queue run:

python subscriber.py nyse_queue nyse.*