SCD Hash Merge/Upsert (Snowflake)

-- Persist update scripts for table tbl_name

!set variable_substitution=true
USE ROLE TALEND_ETL;
USE DATABASE EDW_&{database_name};
USE WAREHOUSE TALEND_ETL_WH;


-- tbl_name PERSIST TABLE UPDATER


-- Clear out yesterday's _OLD copy
DROP TABLE IF EXISTS PERSIST."tbl_name_OLD";
-- Clone the current table to _OLD
CREATE OR REPLACE TABLE PERSIST."tbl_name_OLD" CLONE PERSIST."tbl_name";

-- Sweep #1: Update rows where the hash is unchanged and insert rows where the hash doesn't match
MERGE INTO PERSIST.tbl_name T
USING (
select staging.hash_diff(array_construct(
sedol,CLASS_FINANCIAL_INSTRUMENT,EXCLUDE_MIFIDII_REPORTING,IS_PAIF,
IS_LEVERAGED_FINANCIAL_INSTRUMENT,IS_PRIIP,OPTION_TYPE,
NOTIONAL_CURRENCY_1,NOTIONAL_CURRENCY_2,OPTION_EXERCISE_STYLE,
MATURITY_DATE,EXPIRY_DATE,DELIVERY_TYPE,PRICE_MULTIPLIER,STRIKE_PRICE,
STRIKE_PRICE_CURRENCY,IS_COMMODITY_DERIVATIVE
)) as tracking_hash, *
from landing.tbl_name
) S
ON T.tracking_hash = S.tracking_hash
AND T.sedol = S.sedol
AND T.valid_to_date = to_date('9999-12-31')
WHEN MATCHED THEN UPDATE SET
t.sedol = s.sedol, t.CLASS_FINANCIAL_INSTRUMENT = s.CLASS_FINANCIAL_INSTRUMENT,
t.EXCLUDE_MIFIDII_REPORTING = s.EXCLUDE_MIFIDII_REPORTING,
t.IS_PAIF = s.IS_PAIF, t.IS_LEVERAGED_FINANCIAL_INSTRUMENT = s.IS_LEVERAGED_FINANCIAL_INSTRUMENT,
t.IS_PRIIP = s.IS_PRIIP, t.OPTION_TYPE = s.OPTION_TYPE,
t.NOTIONAL_CURRENCY_1 = s.NOTIONAL_CURRENCY_1, t.NOTIONAL_CURRENCY_2 = s.NOTIONAL_CURRENCY_2,
t.OPTION_EXERCISE_STYLE = s.OPTION_EXERCISE_STYLE, t.MATURITY_DATE = s.MATURITY_DATE,
t.EXPIRY_DATE = s.EXPIRY_DATE, t.DELIVERY_TYPE = s.DELIVERY_TYPE,
t.PRICE_MULTIPLIER = s.PRICE_MULTIPLIER, t.STRIKE_PRICE = s.STRIKE_PRICE,
t.STRIKE_PRICE_CURRENCY = s.STRIKE_PRICE_CURRENCY,
t.IS_COMMODITY_DERIVATIVE = s.IS_COMMODITY_DERIVATIVE,
t.META_EXTRACT_DATE = s.META_EXTRACT_DATE, t.META_EXTRACT_TIME = s.META_EXTRACT_TIME,
t.META_SOURCE = s.META_SOURCE
WHEN NOT MATCHED THEN INSERT(
tracking_hash, sedol, CLASS_FINANCIAL_INSTRUMENT,EXCLUDE_MIFIDII_REPORTING,IS_PAIF,
IS_LEVERAGED_FINANCIAL_INSTRUMENT,IS_PRIIP,OPTION_TYPE,
NOTIONAL_CURRENCY_1,NOTIONAL_CURRENCY_2,OPTION_EXERCISE_STYLE,
MATURITY_DATE,EXPIRY_DATE,DELIVERY_TYPE,PRICE_MULTIPLIER,STRIKE_PRICE,
STRIKE_PRICE_CURRENCY,IS_COMMODITY_DERIVATIVE, valid_from_date, valid_to_date,
current_record_yn,
meta_extract_date, meta_extract_time, meta_source

)
VALUES(
S.tracking_hash, S.sedol, S.CLASS_FINANCIAL_INSTRUMENT,S.EXCLUDE_MIFIDII_REPORTING,S.IS_PAIF,
S.IS_LEVERAGED_FINANCIAL_INSTRUMENT,S.IS_PRIIP,S.OPTION_TYPE,
S.NOTIONAL_CURRENCY_1,S.NOTIONAL_CURRENCY_2,S.OPTION_EXERCISE_STYLE, S.MATURITY_DATE,S.EXPIRY_DATE,S.DELIVERY_TYPE,S.PRICE_MULTIPLIER,S.STRIKE_PRICE, S.STRIKE_PRICE_CURRENCY,S.IS_COMMODITY_DERIVATIVE,
dateadd(day,-1,to_date(S.META_EXTRACT_DATE)), to_date('9999-12-31'),
'Y',

S.meta_extract_date, S.meta_extract_time, S.meta_source
);

-- Sweep #2: Update (expire) rows where we've inserted a replacement row

update PERSIST.tbl_name T
set T.valid_to_date = dateadd(day,-2,to_date(S.META_EXTRACT_DATE)),
T.current_record_yn = 'N'

from (
select staging.hash_diff(array_construct(
sedol,CLASS_FINANCIAL_INSTRUMENT,EXCLUDE_MIFIDII_REPORTING,IS_PAIF,
IS_LEVERAGED_FINANCIAL_INSTRUMENT,IS_PRIIP,OPTION_TYPE,
NOTIONAL_CURRENCY_1,NOTIONAL_CURRENCY_2,OPTION_EXERCISE_STYLE,
MATURITY_DATE,EXPIRY_DATE,DELIVERY_TYPE,PRICE_MULTIPLIER,STRIKE_PRICE,
STRIKE_PRICE_CURRENCY,IS_COMMODITY_DERIVATIVE
)) as tracking_hash, sedol, meta_extract_date
from landing.tbl_name
) S
where t.sedol = s.sedol
and t.valid_to_date = to_date('9999-12-31')
and t.tracking_hash <> s.tracking_hash;

The Spark Programming Model

Spark programming usually starts with a dataset or two, typically residing in some form of distributed, persistent storage such as HDFS. Writing a Spark program usually consists of a few related steps (sketched in code after the list):

  • Defining a set of transformations on input data sets
  • Invoking actions that output the transformed datasets to persistent storage or return results (to local memory)
  • Running local computations that operate on the results computed in a distributed fashion – these can help you decide what the next transformations and actions could be
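
As a rough sketch of that loop in PySpark (the Scala API has the same shape; the HDFS path, app name and filter condition below are my own illustrative choices, not from any particular example):

from pyspark.sql import SparkSession

# Assumes a working Spark install; the HDFS path is hypothetical.
spark = SparkSession.builder.appName("spark-model-sketch").getOrCreate()

# 1. Define transformations on the input dataset (lazy: nothing executes yet)
lines = spark.read.text("hdfs:///data/events.txt")
errors = lines.filter(lines.value.contains("ERROR"))

# 2. Invoke an action: bring a sample of results back to local memory
#    (or write the transformed dataset back out to persistent storage)
sample = errors.take(10)

# 3. Run local computations on those results to decide what to try next
for row in sample:
    print(len(row.value), row.value[:80])

spark.stop()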

Spark's functions revolve around the twin concerns of storage and execution

Scala uses type inference

Whenever we create a new variable in Scala, we must preface it with either val or var:

  • val: immutable; cannot be changed to refer to another value once assigned
  • var: can be reassigned to refer to different objects of the same type

Hadoop in Practice

Reading Hadoop in Practice

“It was a revelation to observe our MapReduce jobs crunching through our data in minutes. Of course, what we weren’t expecting was the amount of time that we would spend debugging and performance-tuning our MR jobs.

Not to mention the new roles we took on as production administrators-the biggest surprise in this role was the number of disk failures we encountered during those first few months supporting production [1998].

The greatest challenge we faced when working with Hadoop, and specifically MR, was learning how to (think) solve problems with it.

After one is used to thinking in MR, the next challenge is typically related to the logistics of working with Hadoop, such as how to move data in & out of HDFS.”


MongoDB Applied Design Patterns – Ch.4

Operational Intelligence, p. 37: converting transaction (tx) data into actionable information.

  • Of course, the starting point for any of these techniques is getting the raw tx data into your datastore.
  • Once you have the data, the 1st priority is to generate actionable reports – ideally in real time
  • Finally, we explore more traditional batch “Hierarchical” aggregation

Consider a typical Apache weblog record:

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326

One option might be to bung the whole data row into its own document, i.e. the whole string in one slot. However, this is not particularly useful (hard to query) or efficient. For example, if you wanted to find events on the same page you'd have to write a nasty regex, which would require a full scan of the collection. The preferred approach is to extract the relevant information into individual fields in a MongoDB document. The UTC timestamp format stores the verbose timestamp as a meager 8 bytes, rather than the natural 28 bytes. Consider the following document that captures all the data from the log entry:

{
  _id: ObjectId(...),
  host: "127.0.0.1",
  logname: null,
  user: "frank",
  time: ISODate("2000-10-10T20:55:36Z"),
  request: "GET /apache_pb.gif HTTP/1.0",
  status: 200,
  request_size: 2326,
  referrer: "http://...",
  user_agent: "Whatever browser, O/S etc."
}
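
The parsing itself is handled later with Pentaho, but purely as an illustration, a rough Python sketch that turns the raw line above into roughly that document shape might look like this (the regex, helper name and date handling are my assumptions, not the chapter's):

import re
from datetime import datetime

# Pattern for the common-log-style line above (no referrer/user_agent in that line).
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) (?P<logname>\S+) (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<request>[^"]+)" (?P<status>\d+) (?P<request_size>\d+)'
)

def parse_log_line(line):
    """Turn one Apache log line into a MongoDB-ready dict, or None if it doesn't match."""
    m = LOG_PATTERN.match(line)
    if m is None:
        return None
    d = m.groupdict()
    return {
        "host": d["host"],
        "logname": None if d["logname"] == "-" else d["logname"],
        "user": None if d["user"] == "-" else d["user"],
        # A real datetime, so MongoDB stores it as a compact 8-byte date
        "time": datetime.strptime(d["time"], "%d/%b/%Y:%H:%M:%S %z"),
        "request": d["request"],
        "status": int(d["status"]),
        "request_size": int(d["request_size"]),
    }

doc = parse_log_line('127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
                     '"GET /apache_pb.gif HTTP/1.0" 200 2326')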

MongoDB has a configurable write concern, which trades off write consistency against write speed. w=0 means you do not require Mongo to acknowledge receipt of the insert; w=1 means the opposite. The former is obviously faster but may lose some data. Using j=True tells Mongo to write to an on-disk journal file to persist data before writing it back to the 'regular' data files. This is the safest, but slowest, option. You can also require that Mongo replicate the data to replica set members before returning, and combine these strategies, e.g. >>> db.events.insert(event, j=True, w=n) where n > 1.

However, the chapter does not go on to suggest how one might parse such raw weblog data into something more structured, so here's a worked example using Pentaho. A Kettle package is available here.

Now that the data has been processed (parsed), one can begin querying. >>> q_events = db.events.find({'path': '/apache_pb.gif'}) would return all documents with the apache_pb.gif value in the path field.
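
Going back to the write-concern options above, a hedged PyMongo sketch of those trade-offs, using the modern per-collection write-concern API rather than the legacy insert(..., w=, j=) keyword form quoted above (the connection string, database and collection names are illustrative):

from pymongo import MongoClient
from pymongo.write_concern import WriteConcern

client = MongoClient("mongodb://localhost:27017")
db = client["logs"]                       # database name is made up
doc = {"path": "/apache_pb.gif", "status": 200}

# w=0: fire-and-forget; fastest, but the write is unacknowledged and may be lost
db.get_collection("events", write_concern=WriteConcern(w=0)).insert_one(dict(doc))

# w=1, j=True: acknowledged and journaled to disk first; safest, slowest
db.get_collection("events", write_concern=WriteConcern(w=1, j=True)).insert_one(dict(doc))

# w=2: wait until two replica-set members hold the write (requires a replica set)
db.get_collection("events", write_concern=WriteConcern(w=2)).insert_one(dict(doc))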

__________________________________________

INDEXING? Be mindful about performance and indexing. >>> db.events.ensure_index('path') Be wary of the size indexes take up in RAM. It makes sense to index here because the number of distinct 'path' values is small in relation to the number of documents, which curtails the space the index needs. >>> db.command('collstats', 'events')['indexSizes'] will show the size of each index.

__________________________________________

>>> q_events = db.events.find({'time': {'$gte': datetime(2000,10,10), '$lt': datetime(2000,10,11)}}) will return documents from the events collection whose time falls on 10 October 2000 (on or after midnight on the 10th and before midnight on the 11th).

>>> q_events = db.events.find({'host': '127.0.0.1', 'time': {'$gte': datetime(2000,10,10)}}) returns events on host 127.0.0.1 on or after 10 Oct 2000. *Note: performance may be improved by a compound index (see the sketch below). A great blog on Mongo indexing here.
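
For that host + time query, a compound index could look like the sketch below (field order and names follow the queries above; this is my illustration, not the book's code):

from pymongo import MongoClient, ASCENDING

db = MongoClient()["logs"]   # database name is made up

# One index serving both the equality filter on host and the range filter on time
db.events.create_index([("host", ASCENDING), ("time", ASCENDING)])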

Counting requests by date and page

Finding requests is useful, but often the query will need summarisation. This is best done using MongoDB's aggregation framework.

Here’s a link translating SQL queries to Aggregation Queries

In the example below, you can consider $match = WHERE, $project = SELECT and $group = GROUP BY.
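
A hedged PyMongo sketch of counting requests by date and page along those lines (the pipeline shape is my own; field names follow the event document above):

from pymongo import MongoClient

db = MongoClient()["logs"]   # database name is made up

pipeline = [
    # $match ~ WHERE: restrict to successful requests
    {"$match": {"status": 200}},
    # $project ~ SELECT: keep the page and break the timestamp into date parts
    {"$project": {"path": 1,
                  "year": {"$year": "$time"},
                  "month": {"$month": "$time"},
                  "day": {"$dayOfMonth": "$time"}}},
    # $group ~ GROUP BY: one output document per (page, date) with a request count
    {"$group": {"_id": {"path": "$path", "year": "$year",
                        "month": "$month", "day": "$day"},
                "hits": {"$sum": 1}}},
    {"$sort": {"hits": -1}},
]

for row in db.events.aggregate(pipeline):
    print(row["_id"], row["hits"])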

PDI & MongoDB (ETL)

Having not yet fully got to grips with network configuration, and while waiting for delivery of a final SD card, I'm turning my attention to ETL, specifically learning Pentaho Data Integration (Kettle).

PDI

I’m reading Pulvirenti and Roldan’s Pentaho Data Integration Cookbook which is pretty recent (mid 2011). Notes below are taken from there.

All of the ETL jobs/packages featured in Pentaho Data Integration Cookbook can be downloaded here and opened up in Kettle.

There are also some further tutorials here and some useful Pentaho setup instructions here

Connecting to a database

It looks like there are 3 ways to connect to a database from Pentaho

  1. Create a connection to a supported DBMS
  2. Create a driver-based connection
  3. Use the bigdata utilities within Spoon.

In order to create a DB connection, you’ll need

  1. Host name – or IP address of the DB server (*I still haven’t quite yet fixed IPs on the 5 Raspberry Pis*)
  2. Port number (*as above – some settings on R Pi still to be made*)
  3. User Name
  4. Password

3 & 4 are generally null for MongoDB.

Process

  1. Open Spoon and create a new transformation
  2. In the View tab, right-click 'Connections' and select 'New'. The DB connection dialog appears
  3. Select the DB engine

However, there is no DBMS connection (yet) for MongoDB in Spoon. Instead, switch to the design view and choose Bigdata.

Again, I do not fully understand which IPs and ports to enter until I have finished the sysconfig on each R Pi. The pic below shows me connecting to a MongoDB instance on my laptop, using the default (unchanged) port. I would need to specify the collection if several existed.

[Image: Pentaho MongoDB Input step configuration]

Alternatively, if you want to create a connection to a DBMS not in the list, first of all you have to get a JDBC driver for MongoDB.

Copy the .jar file containing the driver to the libext/JDBC directory inside the Kettle installation directory, then create the connection.

In this case, choose Generic Database. In the settings frame, specify the connection string(?), the driver class name, the username and password. To find these settings you will have to refer to the driver documentation. I'm trying to locate this information from the 10gen community/driver developers.

Re-use the connection

Avoid creating the DB connection again and again by sharing the connection. Right-click the DB connection under the DB connections tree and click on share. The connection will now be available to be used in all transformations & jobs. Shared connections are shown in bold in the console.

Check the connection is open at runtime

Insert a preceding 'Check DB connection' step. The entry will return True or False.

Creating parameterised inputs

If you need to create an input dataset with data coming from an existing database you do it using a table input step. If the SELECT statement is simple and doesn’t change, it can be entered into the table input settings window. However, most of the time the dataset being selected is dynamic and the query needs to be flexible.

  1. Create a transformation
  2. Before getting the data segment, you have to create a stream that will provide the parameters for the statement
  3. Create a stream that builds a precursor dataset with a single row and 2 columns; in the case of the sampledata dataset being used, we use the <productline> parameter and the <scale> parameter – but these could be any relevant fields from a MongoDB collection. Select a single column value for each, e.g. 'Classic Cars' for <productline_par> and '1:10' for <productscale_par>.
  4. Add a data grid step or generate rows step.
  5. Now drag to the canvas a table input step and create a hop from the last step of the stream created above, towards this step.
  6. Now you can configure the table input step. Double-click it, select the appropriate DB connection and specify the appropriate filter conditions in the WHERE clause, i.e. WHERE productline = ? AND productscale = ?
  7. In the 'Insert data from step' list, select the name of the step that is linked to the table input step. Close the window.
  8. Finally, select the table input step and do a preview of the transformation. You should see a limited set of results based on the parameters.

How it works…

When you need to execute a SELECT statement with parameters, the first thing you have to do is build a stream that provides the parameter values needed by the statement. The stream can be made of just one step, e.g. a data grid with the values to match. The important thing is that the last step delivers the proper values to the table input step.

The last step in the stream is then linked to the table input step. The query is prepared and the values coming to the table input step are assigned/bound to the placeholders – that is where you used the ? symbol(s).
– The number of fields coming into a table input step must be exactly the same as the number of question marks found in the query.
– '?' can only be used to parameterise value expressions; keywords or identifiers, e.g. table names, cannot be parameterised this way, only via another method. A sketch of the same placeholder-binding idea follows.
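
Outside Kettle, the same placeholder-binding idea looks like this in Python's DB-API (sqlite3 used purely as a stand-in for the real database; the table, columns and rows are invented to mirror the sampledata example):

import sqlite3

# Toy stand-in for the sampledata database (table and column names are illustrative).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (productline TEXT, productscale TEXT, name TEXT)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [("Classic Cars", "1:10", "Hypothetical car A"),
     ("Classic Cars", "1:18", "Hypothetical car B"),
     ("Planes", "1:10", "Hypothetical plane C")],
)

# The "stream" delivering one parameter row (productline_par, productscale_par)
params = ("Classic Cars", "1:10")

# The table-input-style query: each ? is bound, in order, from the incoming row
rows = conn.execute(
    "SELECT name FROM products WHERE productline = ? AND productscale = ?",
    params,
).fetchall()
print(rows)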

Executing the SELECT statement several times, each for a different set of parameters

…more coming soon

PDI – Writing to MongoDB

Got this to work today (kind of). I downloaded a file from http://data.gov.uk/dataset/gp-practice-prescribing-data

It shows GP prescription data over time and by anonymised surgery area: 4.1m rows, 5 columns.

I imported the CSV file from the desktop into PDI, then wrote it out to MongoDB. Note there is *no need to define a schema up-front here*, a nice feature of MongoDB/NoSQL DBs, although admittedly this is a simple and consistent (non-nested) dataset.

The resultant new ‘collection’ in MongoDB.

Useful guidance here http://wiki.pentaho.com/display/EAI/MongoDb+output

The full set of data has been written to a new MongoDB collection, but the results look a bit funny. Need to re-check the setup of the ETL package.

Made a few tweaks and now the data looks good.

I still haven't got my Raspberry Pis set up. If I had, it would be interesting to try to get this job to run in parallel using the 51MB RAM in each of the four Pis. I'd hope it'd be a fair bit faster!
– It's currently writing about 12.5k rows/second on the laptop alone; the job runs in 5m 26s.
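
For comparison, a rough sketch of the same load in plain Python with PyMongo (file path, database and collection names are invented; with 4.1m rows, inserting in batches is the main thing that matters):

import csv
from pymongo import MongoClient

collection = MongoClient()["nhs"]["gp_prescriptions"]   # names are made up

batch = []
with open("gp_prescribing.csv", newline="") as f:        # hypothetical local path
    for row in csv.DictReader(f):                        # no up-front schema needed
        batch.append(row)
        if len(batch) == 10000:                          # write in 10k-row chunks
            collection.insert_many(batch)
            batch = []
if batch:                                                # flush the final partial batch
    collection.insert_many(batch)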

I’m going to try and achieve the same now using MongoImport
http://docs.mongodb.org/manual/reference/mongoimport/

Example Syntax:

mongoimport --db users --collection contacts --type csv --file /opt/backups/contacts.csv

There’s some good discussion on the Google group ‘Mongodb-user’ about sharding and ‘chunking’ data prior to load. Now, if only I could get these 4 Raspberry Pis working!
https://groups.google.com/forum/?fromgroups=#!searchin/mongodb-user/import/mongodb-user/9sZoq5iN1KY/XqT-swxWqk0J

Jaspersoft ETL // Pentaho Data Integration (Spoon)

Finally got the community edition of Jaspersoft installed. It was a Java VM issue: I updated to 64-bit Java and it's now running. Previously I had a 'PATH' error.

There's a MongoDB connector, or 'adapter' in Jaspersoft parlance. Looking good so far. Don't forget to fire up mongod.exe to start the DB... but it turns out that Jaspersoft just white-labels Talend, and the Talend ETL download points to the wrong software (iStudio, which is a visual report design tool).

So I’m switching to Kettle (Pentaho) for ETL.

PDI

My aim for the day is to find some open data (maybe the Guardian) and push it into MongoDB, through Kettle, as a new collection. Let’s see!

Installed Kettle. Now I need to install a MongoDB driver and stick it in the libext folder (you can extract the Kettle zip installation anywhere; I've extracted it to C:\Kettle).

https://github.com/mongodb/mongo-java-driver/downloads
– Basically following Matt Casters' tutorial http://www.ibridge.be/?p=196, although that now seems a bit outdated and the functionality is now baked into PDI; see the updated demo http://wiki.pentaho.com/display/EAI/MongoDB+Input

So far, so good. Connected to my MongoDB, read in the collection.

Reading data out of my local Mongo collection and into PDI for onward transformation, etc.

But I get an error in the flow. Looks like PDI can't talk/write to MongoDB. Head-scratching...

...So near and yet so far!

Taking quite a liking to Kettle (PDI) and working my way through the 674 pages of the excellent http://www.amazon.co.uk/Pentaho-Kettle-Solutions-Integration-ebook/dp/B0042JSLWO/ref=dp_kinw_strp_1