Thursday, June 13, 2013

Big data analytics frameworks

Since Google introduced the MapReduce paradigm in 2004, several solutions have appeared that use (or share qualities of) the original paradigm. Google's original application of MapReduce was indexing the World Wide Web. Although this application remains in popular use, the range of problems that this simple model solves is growing.

Nearly a year before Storm was introduced to open source, Yahoo!'s S4 distributed stream computing platform was open sourced to Apache. S4 was released in October 2010 and provides a high-performance computing (HPC) platform that hides the complexity of parallel processing from the application developer. S4 implements a decentralized cluster architecture that is scalable and incorporates partial fault tolerance.

Since Hadoop was developed, a number of other big data analytics platforms have arrived that may be worthy of a look. These platforms range from simple script-based offerings to production environments similar to Hadoop.
One of the simplest is bashreduce, which, as the name suggests, lets you perform MapReduce-type operations across multiple machines in the Bash environment. bashreduce relies on password-less Secure Shell (SSH) access to the cluster of machines you plan to use, and exists as a script through which you request jobs via UNIX®-style tools (sort, awk, netcat, and the like).
GraphLab is another interesting implementation of the MapReduce abstraction that focuses on parallel implementation of machine learning algorithms. In GraphLab, the Map stage defines computations that can be performed independently in isolation (on separate hosts), and the Reduce stage combines the results.
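
To make the Map and Reduce stages concrete, here is a minimal single-process sketch in Python. It is a plain word count, not GraphLab or Hadoop code; the function names (`map_phase`, `shuffle`, `reduce_phase`, `word_count`) are made up for illustration, and the loop over documents stands in for work that a real framework would spread across separate hosts.

```python
from collections import defaultdict


def map_phase(document):
    """Map: emit (word, 1) pairs for one document, independently of all others."""
    return [(word.lower(), 1) for word in document.split()]


def shuffle(pairs):
    """Group intermediate values by key so each key is reduced in one place."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce: combine every value that was emitted for a single key."""
    return key, sum(values)


def word_count(documents):
    intermediate = []
    for doc in documents:  # each iteration could run on a different host
        intermediate.extend(map_phase(doc))
    grouped = shuffle(intermediate)
    return dict(reduce_phase(key, values) for key, values in grouped.items())


if __name__ == "__main__":
    print(word_count(["big data", "Big clusters"]))
```

The point of the split is that `map_phase` never looks at more than one document, so it can be run anywhere, while `reduce_phase` only needs the values for one key at a time.
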
Finally, a newcomer to the big data scene is Storm from Twitter (through the acquisition of BackType). Storm is defined as the "Hadoop of real-time processing" and is focused on stream processing and continuous computation (stream results out as they're computed). Storm is written in Clojure (a modern dialect of the Lisp language) but supports applications written in any language (such as Ruby and Python). Twitter released Storm as open source in September 2011.

Some of the most popular solutions are:

Solution  Developer           Type       Description
Storm     Twitter             Streaming  Twitter's new streaming big-data analytics solution
S4        Yahoo!              Streaming  Distributed stream computing platform from Yahoo!
Hadoop    Apache              Batch      First open source implementation of the MapReduce paradigm
Spark     UC Berkeley AMPLab  Batch      Recent analytics platform that supports in-memory data sets and resiliency
Disco     Nokia               Batch      Nokia's distributed MapReduce framework
HPCC      LexisNexis          Batch      HPC cluster for big data


bashreduce lets you apply your favorite unix tools in a mapreduce fashion across multiple machines/cores. There’s no installation, administration, or distributed filesystem. You’ll need:

  • br somewhere handy in your path
  • vanilla unix tools: sort, awk, ssh, netcat, pv
  • password-less ssh to each machine you plan to use


Let’s start with a simpler scenario: I have a machine with multiple cores and with normal unix tools I’m relegated to using just one core. How does br help us here? Here’s br on an 8-core machine, essentially operating as a poor man’s multi-core sort:

Command                                     Using      Time        Rate
sort -k1,1 -S2G 4gb_file > 4gb_file_sorted  coreutils  30m32.078s  2.24 MBps
br -i 4gb_file -o 4gb_file_sorted           coreutils  11m3.111s   6.18 MBps
br -i 4gb_file -o 4gb_file_sorted           brp/brm    7m13.695s   9.44 MBps

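One way to picture a "poor man's multi-core sort" is as three steps: partition the lines by key, sort each partition separately, then merge the sorted partitions. The Python sketch below illustrates that idea only. It is not br's implementation, the function names are hypothetical, and threads stand in for the separate cores or machines that would do the sorting in practice.

```python
import heapq
import zlib
from concurrent.futures import ThreadPoolExecutor


def sort_key(line):
    """First whitespace-delimited field, similar in spirit to `sort -k1,1`."""
    parts = line.split(None, 1)
    return parts[0] if parts else ""


def partition(lines, n_parts):
    """Route each line to a bucket by hashing its key, so equal keys stay together."""
    buckets = [[] for _ in range(n_parts)]
    for line in lines:
        index = zlib.crc32(sort_key(line).encode()) % n_parts
        buckets[index].append(line)
    return buckets


def sort_bucket(bucket):
    return sorted(bucket, key=sort_key)


def parallel_sort(lines, n_parts=4):
    buckets = partition(lines, n_parts)
    # Assumption: threads are a stand-in here; real gains need processes or hosts.
    with ThreadPoolExecutor(max_workers=n_parts) as pool:
        sorted_buckets = list(pool.map(sort_bucket, buckets))
    # Merge the already-sorted buckets into one sorted sequence.
    return list(heapq.merge(*sorted_buckets, key=sort_key))


if __name__ == "__main__":
    data = ["pear 3", "apple 1", "fig 9", "apple 7", "kiwi 2"]
    for line in parallel_sort(data, n_parts=2):
        print(line)
```

Because all lines with the same key hash to the same bucket, the merged result matches what a single stable sort on the first field would produce.
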
Here lies the promise of mapreduce: rather than use my big honkin’ machine, I have a bunch of cheaper machines lying around that I can distribute my work to. How does br behave when I add four cheaper 4-core machines into the mix?

Command                                     Using      Time        Rate
sort -k1,1 -S2G 4gb_file > 4gb_file_sorted  coreutils  30m32.078s  2.24 MBps
br -i 4gb_file -o 4gb_file_sorted           coreutils  8m30.652s   8.02 MBps
br -i 4gb_file -o 4gb_file_sorted           brp/brm    4m7.596s    16.54 MBps
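
The rates in these tables can be checked with a little arithmetic. The short Python sketch below assumes that "4gb" means 4096 MB (an assumption, though the reported rates are consistent with it) and derives both the throughput and the speedup over plain `sort` from the wall-clock times. The helper names are made up for this example.

```python
FILE_MB = 4 * 1024  # assumption: the 4 GB test file is 4096 MB


def to_seconds(minutes, seconds):
    return minutes * 60 + seconds


def rate_mbps(elapsed_seconds, size_mb=FILE_MB):
    """Throughput in MB per second for one run."""
    return size_mb / elapsed_seconds


def speedup(baseline_seconds, elapsed_seconds):
    """How many times faster a run is than the baseline."""
    return baseline_seconds / elapsed_seconds


# Wall-clock times copied from the two tables above.
BASELINE = to_seconds(30, 32.078)
RUNS = {
    "br + coreutils, one 8-core machine": to_seconds(11, 3.111),
    "br + brp/brm, one 8-core machine": to_seconds(7, 13.695),
    "br + coreutils, plus four 4-core machines": to_seconds(8, 30.652),
    "br + brp/brm, plus four 4-core machines": to_seconds(4, 7.596),
}

if __name__ == "__main__":
    print(f"baseline sort: {rate_mbps(BASELINE):.2f} MBps")
    for label, elapsed in RUNS.items():
        print(f"{label}: {rate_mbps(elapsed):.2f} MBps, "
              f"{speedup(BASELINE, elapsed):.1f}x faster")
```

On these numbers the best single-machine run is a little over 4 times faster than plain `sort`, and the best five-machine run is roughly 7.4 times faster.
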


The GraphLab journey began with the desire:

  • to rethink the way we approach Machine Learning and Graph analytics,
  • to demonstrate that with the right abstractions and system design we can achieve unprecedented levels of performance, and
  • to build a community around large-scale graph computation.

GraphLab 2.2 is just around the corner. Beyond that, we are exploring a new computation engine and further enhancements to the communication layer, as well as simpler integration with existing cloud technologies, easier installation procedures, and an exciting new graph storage system. And of course, we look forward to working with you to develop the roadmap and build the next generation of the GraphLab system.


Unlike Hadoop, Storm is a computation system and incorporates no concept of storage. This allows Storm to be used in a variety of contexts, whether data arrives dynamically from a nontraditional source, is held in a storage system such as a database, or is consumed by a controller for real-time manipulation of some other system (such as a trading system).

A Storm cluster is superficially similar to a Hadoop cluster. Whereas on Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and "topologies" themselves are very different -- one key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).

There are two kinds of nodes on a Storm cluster: the master node and the worker nodes. The master node runs a daemon called "Nimbus" that is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.

Each worker node runs a daemon called the "Supervisor". The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.

The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.
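
As a rough mental model, a spout can be thought of as an object that hands out the next tuple whenever one is available. The Python sketch below is a toy, not Storm's spout interface: the class `QueueSpout`, its `next_tuple` method, and the `drain` helper are hypothetical names, and an in-memory `deque` stands in for an external queue such as Kestrel.

```python
from collections import deque


class QueueSpout:
    """Toy spout that turns an in-memory queue into a stream of tuples."""

    def __init__(self, source):
        self.source = source  # stand-in for an external message queue

    def next_tuple(self):
        """Return the next tuple, or None if nothing is waiting right now."""
        if self.source:
            return (self.source.popleft(),)
        return None


def drain(spout, limit):
    """Pull at most `limit` tuples; a real stream is unbounded, so we cap it."""
    emitted = []
    for _ in range(limit):
        tup = spout.next_tuple()
        if tup is None:
            break
        emitted.append(tup)
    return emitted


if __name__ == "__main__":
    queue = deque(["first tweet", "second tweet"])
    spout = QueueSpout(queue)
    print(drain(spout, limit=10))
    queue.append("third tweet")  # the source keeps growing after we stop reading
    print(spout.next_tuple())
```

The spout never "finishes": when the queue is empty it simply has nothing to emit yet, which matches the idea of a stream as an unbounded sequence.
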

A bolt consumes any number of input streams, does some processing, and possibly emits new streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can do anything from running functions and filtering tuples to performing streaming aggregations and streaming joins, talking to databases, and more.
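
To illustrate why a transformation like "tweets to trending topics" takes more than one bolt, here is a small Python sketch with two chained steps: one extracts hashtags, the other keeps a running count. It only models the idea. The classes `HashtagBolt` and `CountBolt`, the `process` method, and `run_chain` are hypothetical and do not reflect Storm's actual bolt interface.

```python
from collections import Counter


class HashtagBolt:
    """Filter/transform step: emit one tuple per hashtag found in a tweet."""

    def process(self, tup):
        (text,) = tup
        return [(word.lower(),) for word in text.split()
                if word.startswith("#") and len(word) > 1]


class CountBolt:
    """Streaming aggregation: keep a running count per hashtag."""

    def __init__(self):
        self.counts = Counter()

    def process(self, tup):
        (tag,) = tup
        self.counts[tag] += 1
        return [(tag, self.counts[tag])]


def run_chain(tuples, bolts):
    """Push each incoming tuple through the bolts in order, one tuple at a time."""
    emitted = []
    for tup in tuples:
        pending = [tup]
        for bolt in bolts:
            pending = [out for item in pending for out in bolt.process(item)]
        emitted.extend(pending)
    return emitted


if __name__ == "__main__":
    tweets = [("Loving #Storm",), ("#storm and #hadoop",)]
    counter = CountBolt()
    print(run_chain(tweets, [HashtagBolt(), counter]))
    print(counter.counts.most_common(1))
```

Note that `CountBolt` holds state between tuples while `HashtagBolt` does not, which is the usual difference between an aggregation step and a pure filter or map step.
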

Networks of spouts and bolts are packaged into a "topology" which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.
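
The subscription behavior described above can be modeled as a small graph in Python. This is a simplified sketch with hypothetical names (`Topology`, `add_bolt`, `emit`), not Storm's topology-building API, and it ignores parallelism and how tuples are divided among the tasks of a bolt. It shows only that a tuple emitted on a stream reaches every bolt subscribed to that stream.

```python
from collections import defaultdict, deque


class Topology:
    """Toy topology: nodes are named bolts, edges are stream subscriptions."""

    def __init__(self):
        self.bolts = {}                       # bolt name -> function(tuple) -> list of tuples
        self.subscribers = defaultdict(list)  # source name -> names of subscribed bolts

    def add_bolt(self, name, func, subscribes_to):
        self.bolts[name] = func
        for source in subscribes_to:
            self.subscribers[source].append(name)

    def emit(self, source, tup):
        """Deliver `tup` from `source` to every subscriber, following the graph."""
        deliveries = []
        work = deque([(source, tup)])
        while work:
            origin, item = work.popleft()
            for name in self.subscribers[origin]:
                deliveries.append((name, item))
                for out in self.bolts[name](item):
                    work.append((name, out))
        return deliveries


if __name__ == "__main__":
    topo = Topology()
    topo.add_bolt("upper", lambda t: [(t[0].upper(),)], subscribes_to=["tweets"])
    topo.add_bolt("length", lambda t: [(len(t[0]),)], subscribes_to=["tweets"])
    topo.add_bolt("sink", lambda t: [], subscribes_to=["upper", "length"])
    for name, item in topo.emit("tweets", ("hello storm",)):
        print(name, item)
```

Here the spout is represented only by its stream name, "tweets"; both "upper" and "length" receive every tweet, and "sink" receives whatever either of them emits.
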
