The Batch Layer

The goal of a BI system is to answer any question (within reason) asked of it. In the Lambda architecture, any question can be implemented as a function that takes all the data as input. Unfortunately, a function that consumes the whole dataset on every query is unlikely to perform well.

In the Lambda architecture, the batch layer precomputes the master dataset into batch views so that queries can be run with low latency. This requires balancing what needs to be precomputed against what can be computed on the fly at execution time (rather like aggregates in a star schema). The key is to precompute just enough information for the query to return in an acceptable time.

The batch layer runs functions over the master dataset to precompute intermediate data called batch views. The batch views are loaded by the serving layer, which indexes them to allow rapid access to that data.
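As a rough illustration of "precompute just enough", here is a minimal sketch using pageview counting as a hypothetical example. The master dataset layout, the sample records and the function names `build_batch_view` and `pageviews_over_time` are all assumptions made for illustration; a real batch layer would run this as a distributed job rather than in one process.

```python
from collections import Counter

# Hypothetical master dataset: one immutable (url, hour_bucket) record per pageview.
MASTER_DATASET = [
    ("/home", 0), ("/home", 0), ("/home", 1),
    ("/about", 1), ("/home", 3),
]

def build_batch_view(records):
    """Batch job: precompute pageviews per (url, hour) from the full dataset."""
    return Counter(records)

def pageviews_over_time(batch_view, url, start_hour, end_hour):
    """Query: sum the precomputed hourly counts over an inclusive hour range."""
    return sum(batch_view[(url, hour)] for hour in range(start_hour, end_hour + 1))

batch_view = build_batch_view(MASTER_DATASET)
total = pageviews_over_time(batch_view, "/home", 0, 3)
print(total)
```

The view stores one value per URL per hour, and the small amount of summing left over is done at query time.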

The speed layer compensates for the high latency of the batch layer by providing low-latency updates using data that has yet to be precomputed into a batch view.

(Rather like aggregates/caching in memory, with more esoteric queries going to the relational engine).

Queries are then satisfied by processing data from the serving layer views and the speed layer views, and merging the results.

* You should take the opportunity to thoroughly explore the data and connect diverse pieces of data together, assuming you have a priori knowledge of the necessary ‘joined’ datasets!

A naive strategy for computing on the batch layer would be to precompute all possible queries and cache the results in the serving layer. Unfortunately, you can’t always precompute everything. Consider the pageviews-over-time query as an example. If you wanted to precompute every potential query, you’d need to determine the answer for every possible range of hours for every URL. But the number of ranges of hours within a given time frame can be huge. In a one-year period, there are approximately 380 million distinct hour ranges. To precompute the query, you’d need to precompute and index 380 million values for every URL. This is obviously infeasible.
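As a rough check on the scale of that count, the sketch below counts hour-aligned ranges in a 365-day year. It assumes a range is any pair of start hour and end hour with start <= end; the function name is hypothetical. Under that assumption the count comes to roughly 38 million per URL, an order of magnitude below the figure quoted above, so that figure is best read as a loose estimate. The conclusion is the same either way.

```python
HOURS_PER_YEAR = 365 * 24  # 8,760 hour buckets in a non-leap year

def count_hour_ranges(hours):
    """Number of (start, end) pairs with start <= end over `hours` buckets."""
    return hours * (hours + 1) // 2

ranges_per_url = count_hour_ranges(HOURS_PER_YEAR)
print(f"{ranges_per_url:,} ranges per URL")
```

By contrast, an hourly batch view needs only 8,760 values per URL per year, with the range sum done at query time.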

…Yet this is very much achievable using OLAP tools.

Big Data: Principles and best practices of scalable realtime data systems

Nathan Marz

“Web-scale applications like real-time analytics or e-commerce sites deal with a lot of data, whose volume and velocity exceed the limits of traditional RDBMS. These systems require architectures built around clusters of machines to store & process data of any size, or speed. Fortunately, scale and simplicity are not mutually exclusive.

…Build big data systems using an architecture designed specifically to capture and analyse web-scale data. This book presents the Lambda Architecture, a scalable, easy-to-understand approach that can be built and run by a small team. You’ll explore the theory of big data systems and how to implement them in practice. In addition to discovering a general framework for processing big data, you’ll learn specific technologies like Hadoop, Storm & NoSQL DBs.”

What is the Lambda Architecture?
Nathan Marz came up with the term Lambda Architecture (LA) for a generic, scalable and fault-tolerant data processing architecture, based on his experience working on distributed data processing systems.

Batch layer
The batch layer precomputes results using a distributed processing system that can handle very large quantities of data. The batch layer aims at perfect accuracy by being able to process all available data when generating views. This means it can fix any errors by recomputing based on the complete data set, then updating existing views. Output is typically stored in a read-only database, with updates completely replacing existing precomputed views. Apache Hadoop is the de facto standard batch-processing system used in most high-throughput architectures.

Speed layer
The speed layer processes data streams in real time and without the requirements of fix-ups or completeness. This layer sacrifices throughput as it aims to minimize latency by providing real-time views into the most recent data. Essentially, the speed layer is responsible for filling the “gap” caused by the batch layer’s lag in providing views based on the most recent data. This layer’s views may not be as accurate or complete as the ones eventually produced by the batch layer, but they are available almost immediately after data is received, and can be replaced when the batch layer’s views for the same data become available. Stream-processing technologies typically used in this layer include Apache Storm, SQLstream and Apache Spark. Output is typically stored on fast NoSQL databases.

Serving layer
Output from the batch and speed layers is stored in the serving layer, which responds to ad-hoc queries by returning precomputed views or building views from the processed data. Examples of technologies used in the serving layer include Druid, which provides a single cluster to handle output from both layers.[7] Dedicated stores used in the serving layer include Apache Cassandra or Apache HBase for speed-layer output, and ElephantDB or Cloudera Impala for batch-layer output.

The premise behind the Lambda architecture is that you should be able to run ad-hoc queries against all of your data to get results, but doing so is unreasonably expensive in terms of resources. Technically it is now feasible to run ad-hoc queries against your Big Data (Cloudera Impala), but querying a petabyte dataset every time you want to compute the number of pageviews for a URL may not always be the most efficient approach. So the idea is to precompute the results as a set of views, and then query the views. I tend to call these Question Focused Datasets (e.g. pageviews QFD).

The LA aims to satisfy the need for a robust system that is fault-tolerant against both hardware failures and human mistakes, that can serve a wide range of workloads and use cases, and that supports low-latency reads and updates. The resulting system should be linearly scalable, and it should scale out rather than up. Here’s what it looks like from a high-level perspective:

LA overview

  1. All data entering the system is dispatched to both the batch layer and the speed layer for processing.
  2. The batch layer has two functions: (i) managing the master dataset (an immutable, append-only set of raw data), and (ii) pre-computing the batch views.
  3. The serving layer indexes the batch views so that they can be queried in a low-latency, ad-hoc way.
  4. The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only.
  5. Any incoming query can be answered by merging results from batch views and real-time views.
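The five steps above can be sketched as a toy, single-process model. The class name `LambdaSketch`, its methods and the pageview example are hypothetical; a real system would spread these layers across separate distributed components, and clearing the real-time view after a batch run is a simplification of how the speed layer expires absorbed data.

```python
from collections import Counter

class LambdaSketch:
    """Toy stand-in for the three layers (all names are hypothetical)."""

    def __init__(self):
        self.master_dataset = []        # batch layer: immutable, append-only raw data
        self.batch_view = Counter()     # serving layer: indexed batch view
        self.realtime_view = Counter()  # speed layer: recent data only

    def ingest(self, url):
        # Step 1: every record goes to both the batch and the speed layer.
        self.master_dataset.append(url)
        self.realtime_view[url] += 1

    def run_batch(self):
        # Steps 2-3: recompute the view from all data, then swap it in.
        self.batch_view = Counter(self.master_dataset)
        # Step 4: the speed layer only covers data the batch view has not absorbed.
        self.realtime_view.clear()

    def pageviews(self, url):
        # Step 5: answer by merging the batch and real-time views.
        return self.batch_view[url] + self.realtime_view[url]

la = LambdaSketch()
la.ingest("/home")
la.ingest("/home")
la.run_batch()      # the first two views are now in the batch view
la.ingest("/home")  # this one is only visible through the speed layer
print(la.pageviews("/home"))
```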

(Re)installing Apache Tika

Install a Windows subversion client (to make life easier)

Download Tika
This will install all the required Tika packages
SVN > Checkout
URL of repository:

Download Java SDK
Before downloading Maven, you need the Java SDK.

Install it to a pathname without spaces, such as c:\j2se1.6.

Once Java is installed, you must ensure that the commands from the Java SDK are in your PATH environment variable.

Set the environment variable JAVA_HOME to the installation directory
Control Panel\System and Security\System
> Advanced system settings > SYSTEM VARIABLES > NEW

Append the directory containing the Java compiler to the system path
Append the string C:\jdk1.7.0_79\bin to the end of the system variable ‘PATH’, separated from the existing value by a semicolon

Download Maven

  1. Navigate in Explorer to the Maven directory
  2. Go into the bin directory
  3. Copy the address in the address bar (it must end with bin)
  4. Go to Start and type in “env”
  5. Select “Edit the system environment variables”
  6. Find the PATH variable, which must already contain an entry for Java, as Maven needs Java
  7. Append a ; and paste the path
  8. Restart to update the system
  9. Run “mvn install” in the client ???

Download the ‘binaries’? *BIN* & *SRC*

Extract to
& C:\apache-maven-3.2.5\src

Maven in PATH

You run Maven by invoking a command-line tool: mvn.bat from the bin directory of the Maven installation. To do this conveniently, c:\mvn3.0.4\bin must be in your PATH, just like the J2SE SDK commands. You can add directories to your PATH in the Control Panel; the details vary by Windows version.

Add M2_HOME to sys vars

Update the PATH variable by appending the Maven bin folder, %M2_HOME%\bin, so that you can run Maven commands from anywhere.

Check the install using
mvn --version
This should echo back the Maven version, the Java version and the OS details (Windows etc.).
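If the version check fails, a small script can help narrow down which of the steps above was missed. This is only a sketch: the function name `missing_prerequisites` is hypothetical, and it assumes the setup relies on the JAVA_HOME and M2_HOME variables and on java and mvn being reachable through PATH, as described in these notes.

```python
import os
import shutil

def missing_prerequisites(env, find_executable=shutil.which):
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    # Both variables are set by hand in the steps above.
    for var in ("JAVA_HOME", "M2_HOME"):
        if not env.get(var):
            problems.append(f"{var} is not set")
    # Both tools must be reachable through PATH for 'mvn install' to work.
    for tool in ("java", "mvn"):
        if find_executable(tool, path=env.get("PATH", "")) is None:
            problems.append(f"{tool} not found on PATH")
    return problems

if __name__ == "__main__":
    for problem in missing_prerequisites(os.environ):
        print("WARNING:", problem)
```

The lookup function is passed in as a parameter so that the check can be exercised without a real Java or Maven install.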

Go to the src dir
Run mvn install


Now to connect Tika with Pentaho!

Advanced Analytics with Apache Spark: What is Spark?

The challenges of Data Science

The hard truths:

First, the majority of work that goes into conducting successful analyses lies in preprocessing data. Data is messy, and cleansing, munging, fusing, mushing and many other verbs are prerequisites for doing anything useful with it…A typical data (processing) pipeline requires spending far more time in feature engineering and selection than in choosing and writing algorithms. The Data Scientist must choose from a wide variety of potential features (data). Each comes with its own challenges in converting to vectors fit for machine learning algorithms. A system needs to support more flexible transformations than turning a 2-d array (tuple) of doubles into a mathematical model.

Second, iteration. Models typically require multiple passes/scans over the same data to reach convergence. Iteration is also crucial within the Data Scientist’s own workflow: usually the results of one query inform the next query that should run. Feature and algorithm selection, significance testing and finding the right hyper-parameters all require multiple experiments.

Third, implementation. Models need to be operationalised and become part of a production service and may need to be rebuilt periodically or even in real time – analytics in the ‘lab’ vs analytics in the ‘factory’. Exploratory analytics usually occurs in e.g. R, whereas production applications’ data pipelines are written in C++ or Java.

Enter Apache Spark

Apache Spark is an open-source cluster computing framework originally developed in the AMPLab at UC Berkeley. In contrast to Hadoop’s two-stage disk-based MapReduce paradigm, Spark’s in-memory primitives provide performance up to 100 times faster for certain applications.[1] By allowing user programs to load data into a cluster’s memory and query it repeatedly, Spark is well suited to machine learning algorithms.[2]

Spark is an open-source framework that combines an engine for distributed programs across clusters of machines with an elegant model for writing programs atop it.

Word Count

In this Scala example, we build a dataset of (String, Int) pairs called counts and then save it to a file:

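val file = spark.textFile("hdfs://…")
// split each line into words, pair each word with 1, then sum the 1s per word
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://…")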
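To see what each stage of that pipeline does, here is the same logic in plain Python. This is not Spark and runs on a single machine; the function name `word_count` and the sample lines are made up for illustration, and the comments map each step to the Spark operator it imitates.

```python
from collections import defaultdict

def word_count(lines):
    # flatMap: split every line into words and flatten into one stream
    words = (word for line in lines for word in line.split(" "))
    # map: pair each word with an initial count of 1
    pairs = ((word, 1) for word in words)
    # reduceByKey: add up the counts of pairs that share the same word
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)

counts = word_count(["to be or", "not to be"])
print(counts)
```

In Spark the same three steps are distributed across the cluster, with the per-key addition happening in parallel.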

One way to understand Spark is to compare it to its predecessor, MapReduce. Spark maintains MapReduce’s linear scalability and fault tolerance, but extends it in 3 important ways:

First, rather than relying on a rigid map-then-reduce format, Spark’s engine can execute a more general directed acyclic graph (DAG) of operators.

This means that, where MapReduce must write intermediate results out to the distributed filesystem, Spark can pass them directly to the next step in the pipeline.

Secondly, Spark has a rich set of transformations that enable users to express computation more naturally. Its streamlined API can represent complex pipelines in a few lines of code.

Thirdly, Spark leverages in-memory processing (rather than distributed disk). Developers can materialise any point in a processing pipeline into memory across the cluster, meaning that future steps that want to deal with the same dataset need not recompute it from disk.

Perhaps most importantly, Spark’s collapsing of the full pipeline, from preprocessing to model evaluation, into a single programming environment can dramatically improve productivity. By packaging an expressive programming model with a set of analytic libraries under a REPL, it avoids round trips and moving data back and forth. Spark spans the gap between lab and factory analytics: it is better at being an operational system than most exploratory systems, and better for data exploration than the technologies commonly used in operational systems. Spark is also highly versatile: it offers SQL functionality, can ingest data from streaming services like Flume & Kafka, and includes machine learning tools and graph processing capabilities.

ES: Filter types

and filter
bool filter
exists filter
geo bounding box filter
geo distance filter
geo distance range filter
geo polygon filter
geoshape filter
geohash cell filter
has child filter
has parent filter
ids filter
indices filter
limit filter
match all filter
missing filter
nested filter
not filter
or filter
prefix filter
query filter
range filter
regexp filter
script filter
term filter
terms filter
type filter

Chapter 12: Structured search

Combining Filters

SELECT product
FROM products
WHERE (price = 20 OR productID = 'XXYYZZ')
AND (price != 30)

Bool Filter

The bool filter comprises three sections:

"bool" : {
   "must" :     [ ],
   "should" :   [ ],
   "must_not" : [ ]
}

-MUST: All of the clauses must match. The equivalent of AND
-SHOULD: At least ONE of the clauses must match. The equivalent of OR
-MUST_NOT: All of the clauses must NOT match. The equivalent of NOT


To replicate the preceding SQL example, we will take the two term filters that we used previously and place them inside the should clause of a bool filter, and add another clause to deal with the NOT condition:

GET /my_store/products/_search
{
   "query" : {
      "filtered" : {
         "filter" : {
            "bool" : {
              "should" : [
                 { "term" : {"price" : 20}},
                 { "term" : {"productID" : "XHDK-A-1293-#fJ3"}}
              ],
              "must_not" : {
                 "term" : {"price" : 30}
              }
            }
         }
      }
   }
}
Note that we still need to use a filtered query to wrap everything.
These two term filters are children of the bool filter, and since they are placed inside the should clause, at least one of them needs to match.
If a product has a price of 30, it is automatically excluded because it matches a must_not clause.

Our search results return two hits, each document satisfying a different clause in the bool filter:

"hits" : [
    {
        "_id" :     "1",
        "_score" :  1.0,
        "_source" : {
          "price" :     10,
          "productID" : "XHDK-A-1293-#fJ3"
        }
    },
    {
        "_id" :     "2",
        "_score" :  1.0,
        "_source" : {
          "price" :     20,
          "productID" : "KDKE-B-9947-#kL5"
        }
    }
]
Document 1 matches the term filter for productID = "XHDK-A-1293-#fJ3"
Document 2 matches the term filter for price = 20
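The must / should / must_not semantics described above can be mimicked in a few lines of plain Python. This is a sketch of the logic only, not the Elasticsearch API: the helper names `term` and `bool_filter` are hypothetical, and the third product is a made-up document added to show the must_not exclusion.

```python
def term(field, value):
    """Build a predicate similar in spirit to a term filter (exact match)."""
    return lambda doc: doc.get(field) == value

def bool_filter(must=(), should=(), must_not=()):
    """Combine predicates the way the must / should / must_not sections are described."""
    def matches(doc):
        return (
            all(clause(doc) for clause in must)
            and (not should or any(clause(doc) for clause in should))
            and not any(clause(doc) for clause in must_not)
        )
    return matches

products = [
    {"_id": "1", "price": 10, "productID": "XHDK-A-1293-#fJ3"},
    {"_id": "2", "price": 20, "productID": "KDKE-B-9947-#kL5"},
    {"_id": "3", "price": 30, "productID": "HYPOTHETICAL-30"},  # made-up document
]

keep = bool_filter(
    should=[term("price", 20), term("productID", "XHDK-A-1293-#fJ3")],
    must_not=[term("price", 30)],
)
hit_ids = [doc["_id"] for doc in products if keep(doc)]
print(hit_ids)
```

Documents 1 and 2 each satisfy one should clause, while the made-up document 3 matches neither and would be excluded by must_not in any case.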

Chapter 8: (Sorting and) relevance

(pp117) What is Relevance?

The relevance score of each document is represented by a positive floating-point number called _score. The higher the _score, the more relevant the document.

A query clause generates a _score for each document. How that score is calculated depends on the type of query used. 

Different queries are used for different purposes.
– A fuzzy query might determine the _score by calculating how similar the spelling of the query term is to that within documents.
– A terms query would incorporate the percentage of terms that were found.

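The standard similarity algorithm used in ES is known as term frequency/inverse document frequency, or TF/IDF, which takes the following factors into account:

-Term frequency : How often does the term appear in the field? The more often, the more relevant.
-Inverse document frequency : How often does the term appear in the index? The more often, the less relevant.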
-Field-length norm : How long is the field? The longer it is, the less likely it is that the words in the field will be relevant. A term appearing in a <short title> field carries more weight than the same term appearing in a long <content> field.


Consider a document containing 100 words wherein the word cat appears 3 times. The term frequency (i.e., tf) for cat is then (3 / 100) = 0.03. Now, assume we have 10 million documents and the word cat appears in one thousand of these. Then, the inverse document frequency (i.e., idf) is calculated with a base-10 logarithm as log10(10,000,000 / 1,000) = 4. Thus, the tf-idf weight is the product of these quantities: 0.03 * 4 = 0.12.
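The arithmetic in that example can be reproduced directly. This sketch uses the simple textbook formulas from the paragraph above, with a base-10 logarithm (an assumption implied by the result of 4); it is not the scoring formula that ES actually applies, which includes further factors such as the field-length norm. The function names are hypothetical.

```python
import math

def tf(term_count, total_terms):
    """Term frequency: share of the document's words that are the term."""
    return term_count / total_terms

def idf(total_docs, docs_with_term):
    """Inverse document frequency, using a base-10 logarithm."""
    return math.log10(total_docs / docs_with_term)

tf_cat = tf(3, 100)                  # cat appears 3 times in 100 words
idf_cat = idf(10_000_000, 1_000)     # cat appears in 1,000 of 10 million documents
weight = tf_cat * idf_cat
print(round(weight, 2))
```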

Ch3: Data In, Data Out (inverted indexes)

In the real world, not all entities of the same type look the same. One person might just have a home telephone, another a cell number, another both, and another neither. In the RDBMS world, each attribute demands its own column, or has to be modelled as a KV pair. This leads to waste and redundancy. Brazilian people may have 10+ family names, a Brit just one.

The problem comes when we need to store these entities. Traditionally, this has been accomplished through an RDBMS with columns and rows.

Of course, we don’t only need to store the data; we also need to query and use it. While NoSQL solutions (e.g. MongoDB) exist that allow us to store objects as documents, they still require us to think about how we want to query our data, and which fields require an index to speed up retrieval.

In ES, all data in every field is indexed by default. Every field has a dedicated inverted index and, unlike most other DBs, ES can use all of those inverted indices in the same query.
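To make the idea of one inverted index per field concrete, here is a minimal sketch. The function name `build_inverted_index`, the sample documents and the whitespace-and-lowercase tokenisation are all assumptions for illustration; real ES analysis and index structures are far more involved.

```python
from collections import defaultdict

def build_inverted_index(docs, field):
    """Map each lowercased token in `field` to the set of document ids containing it."""
    index = defaultdict(set)
    for doc_id, doc in docs.items():
        for token in str(doc.get(field, "")).lower().split():
            index[token].add(doc_id)
    return index

# Hypothetical documents with two text fields each.
docs = {
    1: {"title": "Quick brown fox", "body": "The fox jumps"},
    2: {"title": "Lazy dog", "body": "The quick dog sleeps"},
}

# One dedicated index per field.
indexes = {field: build_inverted_index(docs, field) for field in ("title", "body")}

# A single lookup can combine several per-field indexes: "quick" in either field.
quick_anywhere = indexes["title"]["quick"] | indexes["body"]["quick"]
print(sorted(quick_anywhere))
```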

What is an inverted index?