In-Stream Big Data Processing

Originally posted on Highly Scalable Blog:

The shortcomings and drawbacks of batch-oriented data processing were widely recognized by the Big Data community quite a long time ago. It became clear that real-time query processing and in-stream processing are an immediate need in many practical applications. In recent years, this idea got a lot of traction and a whole bunch of solutions like Twitter’s Storm, Yahoo’s S4, Cloudera’s Impala, Apache Spark, and Apache Tez appeared and joined the army of Big Data and NoSQL systems. This article is an effort to explore techniques used by developers of in-stream data processing systems, trace the connections of these techniques to massive batch processing and OLTP/OLAP databases, and discuss how one unified query engine can support in-stream, batch, and OLAP processing at the same time.

At Grid Dynamics, we recently faced a necessity to build an in-stream data processing system that aimed to crunch about 8 billion events daily providing…


The Batch Layer

The goal of a BI system is to answer any question (within reason) asked of it. In the Lambda architecture, any question can be implemented as a function that takes all the data as input – unfortunately, anything that consumes the whole dataset is unlikely to perform well.

In the Lambda architecture, the batch layer precomputes the master dataset into batch views so that queries can be run with low latency. This requires balancing what needs to be precomputed and what needs to be computed on the fly at execution time (rather like aggregates in a star schema); the key is to precompute just enough information to enable the query to return in an acceptable time.

The batch layer runs functions over the master dataset to precompute intermediate data called batch views. The batch views are loaded by the serving layer, which indexes them to allow rapid access to that data.

The speed layer compensates for the high latency of the batch layer by providing low-latency updates using data that has yet to be precomputed into a batch view.

(Rather like aggregates/caching in memory, with more esoteric queries going to the relational engine).

Queries are then satisfied by processing data from the serving layer views and the speed layer views, and merging the results.

*You should take the opportunity to thoroughly explore the data and connect diverse pieces of data together – assuming you have a priori knowledge of the necessary ‘joined’ datasets!

A naive strategy for computing on the batch layer would be to precompute all possible queries and cache the results in the serving layer. Unfortunately, you can’t always precompute everything. Consider the pageviews-over-time query as an example. If you wanted to precompute every potential query, you’d need to determine the answer for every possible range of hours for every URL. But the number of ranges of hours within a given time frame can be huge. In a one-year period, there are approximately 380 million distinct hour ranges. To precompute the query, you’d need to precompute and index 380 million values for every URL. This is clearly infeasible.

…Yet this is very much achievable using OLAP tools
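Whether you reach for OLAP aggregates or Lambda-style batch views, the underlying trick is the same: precompute one count per URL per hour (a few thousand values per URL per year) and answer any hour-range query by summing buckets. A minimal Scala sketch, with made-up data and an in-memory Map standing in for the serving-layer index:

```scala
object PageviewBuckets {
  // Hypothetical precomputed batch view: (url, hourIndex) -> pageview count.
  // In a real system this would be built by the batch layer and served
  // from an indexed store, not held in a Map in memory.
  val hourlyCounts: Map[(String, Int), Long] = Map(
    ("http://example.com/a", 0) -> 120L,
    ("http://example.com/a", 1) -> 95L,
    ("http://example.com/a", 2) -> 210L
  )

  // Answer "pageviews for url between fromHour and toHour (inclusive)"
  // by summing the precomputed hourly buckets at query time.
  def pageviews(url: String, fromHour: Int, toHour: Int): Long =
    (fromHour to toHour).map(h => hourlyCounts.getOrElse((url, h), 0L)).sum

  def main(args: Array[String]): Unit =
    println(pageviews("http://example.com/a", 0, 2)) // 425
}
```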

RDDs are the new bytecode of Apache Spark

Originally posted on O. Girardot:

With the Apache Spark 1.3 release the DataFrame API for Spark SQL got introduced; for those of you who missed the big announcements, I’d recommend reading the article Introducing DataFrames in Spark for Large Scale Data Science from the Databricks blog. DataFrames are very popular among data scientists; personally I’ve mainly been using them with the great Python library Pandas, but there are many examples in R (originally) and Julia.

Of course, if you’re using only Spark’s core features, nothing seems to have changed with Spark 1.3: Spark’s main abstraction remains the RDD (Resilient Distributed Dataset), its API is very stable now, and until now everyone has used it to handle any kind of data.

But the introduction of Dataframe is actually a big deal, because when RDDs were the only option to load data, it was obvious that you needed to parse your “maybe” un-structured data using RDDs, transform…


The Properties of (Big) Data

When designing your (big data) system, you want to be able to answer as many questions as possible. If you can, you want to store the rawest information you can get your hands on – the rawer your data, the more questions you can ask of it.

Storing ‘super-atomic’ raw data is hugely valuable because you rarely know in advance all the questions you want answered.

By keeping the rawest data possible, you maximize the ability to obtain new insights, whereas summarizing (aggregating), overwriting or deleting information limits what the data can tell you.

If the algorithm generating data is likely to change over time, then store the unstructured (unprocessed) data – the data can be recomputed from source as the algorithm improves.


Unlike the RDBMS/OLTP world of updates, you don’t update or delete data, you only add (append) more. This provides two advantages:

– human-fault tolerance

– simplicity: indexes are not required because no data objects need to be retrieved or updated. Storing a master dataset can be as simple as flat (S3, HDFS) files – see the sketch below.
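As a small illustration of that append-only model, here is a Scala sketch that only ever appends raw events to a flat file; the file name, event fields, and tab-separated layout are illustrative assumptions, and a real master dataset would sit on S3 or HDFS rather than the local disk:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

object AppendOnlyMasterDataset {
  // Hypothetical raw event, kept in the rawest form available.
  final case class PageviewEvent(url: String, userId: String, timestampMs: Long)

  // Append events to a flat file; nothing is ever updated or deleted,
  // so no indexes are needed and human mistakes can't destroy old data.
  def append(path: String, events: Seq[PageviewEvent]): Unit = {
    val lines = events
      .map(e => s"${e.timestampMs}\t${e.userId}\t${e.url}\n")
      .mkString
    Files.write(
      Paths.get(path),
      lines.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND
    )
  }

  def main(args: Array[String]): Unit =
    append("master-dataset.tsv", Seq(
      PageviewEvent("http://example.com/a", "user-1", System.currentTimeMillis())
    ))
}
```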

NoSQL is not a panacea…

NoSQL is not a panacea
The past decade has seen a huge amount of innovation in scalable data systems. These include large-scale computation systems like Hadoop and databases such as Cassandra and Riak. These systems can handle very large amounts of data, but with serious trade-offs.

Hadoop, for example, can parallelize large-scale batch computations on very large amounts of data, but the computations have high latency. You don’t use Hadoop for anything where you need low-latency results.

NoSQL databases like Cassandra achieve their scalability by offering you a much more limited data model than you’re used to with something like SQL. Squeezing your application into these limited data models can be very complex. And because the databases are mutable, they’re not human-fault tolerant.

These tools on their own are not a panacea. But when intelligently used in conjunction with one another, you can produce scalable systems for arbitrary data problems with human-fault tolerance and a minimum of complexity. This is the Lambda Architecture.

Big Data: Principles and best practices of scalable realtime data systems
Nathan Marz

“Web-scale applications like real-time analytics or e-commerce sites deal with a lot of data, whose volume and velocity exceed the limits of traditional RDBMSs. These systems require architectures built around clusters of machines to store and process data of any size or speed. Fortunately, scale and simplicity are not mutually exclusive.

…Build big data systems using an architecture designed specifically to capture and analyse web-scale data. This book presents the Lambda Architecture, a scalable, easy-to-understand approach that can be built and run by a small team. You’ll explore the theory of big data systems and how to implement them in practice. In addition to discovering a general framework for processing big data, you’ll learn specific technologies like Hadoop, Storm, and NoSQL databases.”

What is the Lambda Architecture?
Nathan Marz came up with the term Lambda Architecture (LA) for a generic, scalable and fault-tolerant data processing architecture, based on his experience working on distributed data processing systems.

Batch layer
The batch layer precomputes results using a distributed processing system that can handle very large quantities of data. The batch layer aims at perfect accuracy by being able to process all available data when generating views. This means it can fix any errors by recomputing based on the complete data set, then updating existing views. Output is typically stored in a read-only database, with updates completely replacing existing precomputed views. Apache Hadoop is the de facto standard batch-processing system used in most high-throughput architectures.

Speed layer
The speed layer processes data streams in real time and without the requirements of fix-ups or completeness. This layer sacrifices throughput as it aims to minimize latency by providing real-time views into the most recent data. Essentially, the speed layer is responsible for filling the “gap” caused by the batch layer’s lag in providing views based on the most recent data. This layer’s views may not be as accurate or complete as the ones eventually produced by the batch layer, but they are available almost immediately after data is received, and can be replaced when the batch layer’s views for the same data become available. Stream-processing technologies typically used in this layer include Apache Storm, SQLstream and Apache Spark. Output is typically stored on fast NoSQL databases.
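For a concrete (if very simplified) illustration, here is a sketch of a speed-layer job using Spark Streaming. The socket source, port, and micro-batch interval are arbitrary assumptions, and a real implementation would write the per-batch counts to a fast NoSQL store rather than printing them:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SpeedLayerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("speed-layer").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10)) // 10-second micro-batches

    // Hypothetical source: one URL per pageview arriving on a local socket.
    val urls = ssc.socketTextStream("localhost", 9999)

    // Real-time view: pageview counts for the most recent data only,
    // filling the gap until the next batch views are published.
    val counts = urls.map(url => (url, 1L)).reduceByKey(_ + _)

    // Stand-in for writing each micro-batch to a fast NoSQL store.
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```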

Serving layer
Output from the batch and speed layers is stored in the serving layer, which responds to ad-hoc queries by returning precomputed views or building views from the processed data. Examples of technologies used in the serving layer include Druid, which provides a single cluster to handle output from both layers. Dedicated stores used in the serving layer include Apache Cassandra or Apache HBase for speed-layer output, and ElephantDB or Cloudera Impala for batch-layer output.

The premise behind the Lambda architecture is that you should be able to run ad-hoc queries against all of your data to get results, but doing so is unreasonably expensive in terms of resources. Technically it is now feasible to run ad-hoc queries against your Big Data (Cloudera Impala), but querying a petabyte dataset every time you want to compute the number of pageviews for a URL may not always be the most efficient approach. So the idea is to precompute the results as a set of views, and you query the views. I tend to call these Question Focused Datasets (e.g. a pageviews QFD).

The LA aims to satisfy the need for a robust system that is fault-tolerant, both against hardware failures and human mistakes, that can serve a wide range of workloads and use cases, and in which low-latency reads and updates are required. The resulting system should be linearly scalable, and it should scale out rather than up. Here’s what it looks like, from a high-level perspective:

LA overview

  1. All data entering the system is dispatched to both the batch layer and the speed layer for processing.
  2. The batch layer has two functions: (i) managing the master dataset (an immutable, append-only set of raw data), and (ii) pre-computing the batch views.
  3. The serving layer indexes the batch views so that they can be queried in a low-latency, ad-hoc way.
  4. The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only.
  5. Any incoming query can be answered by merging results from batch views and real-time views, as sketched below.
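To make step 5 concrete, here is a minimal, hypothetical Scala sketch in which two in-memory maps stand in for the serving-layer (batch view) and speed-layer (real-time view) stores, and a query is answered by merging the two:

```scala
object LambdaQuery {
  // Hypothetical batch view: complete, but hours out of date.
  val batchView: Map[String, Long] = Map("http://example.com/a" -> 10000L)

  // Hypothetical real-time view: covers only data since the last batch run.
  val realtimeView: Map[String, Long] = Map("http://example.com/a" -> 42L)

  // The query merges results from both views.
  def totalPageviews(url: String): Long =
    batchView.getOrElse(url, 0L) + realtimeView.getOrElse(url, 0L)

  def main(args: Array[String]): Unit =
    println(totalPageviews("http://example.com/a")) // 10042
}
```

In a real deployment the batch view might live in a store like ElephantDB and the real-time view in something like Cassandra, but the merge at query time stays essentially this simple.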

(Re)installing Apache Tika

Install a Windows Subversion client (to make life easier)

Download Tika
This will install all the required Tika packages
SVN > Checkout
URL of repository:

Download Java SDK
Before downloading Maven, you need the Java SDK

Install it to a pathname without spaces, such as c:\j2se1.6.

Once Java is installed, you must ensure that the commands from the Java SDK are in your PATH environment variable.

Set the environment variable JAVA_HOME to the installation dir
Control Panel\System and Security\System
> Advanced system settings > SYSTEM VARIABLES > NEW

Append the full path of the Java compiler to the system path
Append the string C:\jdk1.7.0_79\bin to the end of the system variable ‘PATH’

Download Maven

  1. navigate in Explorer to the Maven directory
  2. go into the bin directory
  3. copy the address in the address bar (it must end with bin)
  4. go to Start and type in “env”
  5. select “Edit the system environment variables”
  6. find the PATH variable, which must also have an existing entry for Java, as Maven needs Java
  7. append a ‘;’ and paste the path
  8. restart (or open a new command prompt) so the updated PATH takes effect
  9. run “mvn install” in the client ???

Download the ‘binaries’? *BIN* & *SRC*

Extract to
& C:\apache-maven-3.2.5\src

Maven in PATH

You run Maven by invoking a command-line tool, mvn.bat, from the bin directory of the Maven installation. To do this conveniently, c:\mvn3.0.4\bin must be in your PATH, just like the J2SE SDK commands. You can add directories to your PATH in the Control Panel; the details vary by Windows version.

Add M2_HOME to sys vars

Update the PATH variable: append the Maven bin folder – %M2_HOME%\bin – so that you can run Maven commands from anywhere.

Check the install using:
mvn --version
It should echo back the Maven and Java versions, the OS details, etc.

Go to the src dir
Run mvn install


Now to connect Tika with Pentaho!

Scala Actions

first – returns the first element of the RDD (typically a file header)

take – returns a given number of records (like *nix HEAD)

collect – returns the whole dataset

Actions, by default, return results to the local driver process. The saveAsTextFile action saves the contents of an RDD to persistent storage, such as HDFS.
-This action creates a directory and writes out each partition as a file within it.

foreach(println) – prints out each value in the array on its own line

foreach(println) is an example of functional programming, where one function (println) is passed as an argument to another function (foreach)
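Putting those actions together, a minimal Spark sketch in Scala; the input file, output directory, and local master are illustrative assumptions:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RddActions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("rdd-actions").setMaster("local[*]"))

    // Hypothetical input file: a header line followed by records.
    val lines = sc.textFile("events.txt")

    val header = lines.first()   // first element of the RDD (the file header)
    val sample = lines.take(5)   // first five records, like *nix head
    val all    = lines.collect() // whole dataset returned to the driver (small files only)

    sample.foreach(println)      // println passed as an argument to foreach

    // Creates the directory "events-out" and writes one file per partition.
    lines.saveAsTextFile("events-out")

    sc.stop()
  }
}
```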