Recomputable Data Systems (RCDS)

The evolution happening in distributed storage, distributed processing, and in-memory processing opens the door to new ways of serving data for analytics. Instead of using complex incremental processes to serve data to your consumers, a Recomputable Data System (RCDS) re-computes your analytics datasets by reading ALL the raw data every time it runs. It also handles batch and real-time processing, presenting a current and consolidated view of your business whenever you need it.

Introduction

The RCDS is largely inspired by the work of Nathan Marz on the Lambda Architecture. It also applies many concepts from the ELT (Extract-Load-Transform) approach. The data modeling is inspired by the work done by temporal data modelers (data vault, anchor modeling, fact-based and dimensional), but with a desire to simplify the modeling and support automation. In the following examples, I talk about specific technologies and tools, but the architecture is generic and file-based, so it is adaptable to a wide variety of solutions, from Big Data to small scale.

Architecture

[Architecture diagram]

Input Layer

In the input layer, the data is received by an extract process in its raw form and stored as a file in a “new data” folder. The folders are structured by repository, supplier and source. A timestamp must be added to the filename to make it unique when it is added to an existing folder. In large-scale scenarios, you can also use “year”, “month” or “day” subfolders to help with partitioning and file distribution.

For example, if you are receiving customer data from Salesforce daily, at random times, the folders and files will look like this:

repository   supplier     source     files
new          salesforce   customer   salesforce_customer_2017_08_18_05_33_44.csv
                                     salesforce_customer_2017_08_19_13_55_22.csv
                                     salesforce_customer_2017_08_20_02_08_39.csv
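A minimal sketch of such a landing process, assuming a local (or S3-mounted) file layout; the land_new_file helper and its arguments are illustrative, not part of any specific tool:

```python
import os
import shutil
from datetime import datetime

def land_new_file(extracted_path, root, supplier, source, extension):
    """Copy an extracted file into new/<supplier>/<source>/ with a unique, timestamped name."""
    folder = os.path.join(root, "new", supplier, source)
    os.makedirs(folder, exist_ok=True)
    timestamp = datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S")
    filename = "{}_{}_{}.{}".format(supplier, source, timestamp, extension)
    destination = os.path.join(folder, filename)
    shutil.copy(extracted_path, destination)
    return destination

# land_new_file("export.csv", "/data", "salesforce", "customer", "csv")
# -> /data/new/salesforce/customer/salesforce_customer_2017_08_18_05_33_44.csv
```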

Batch Layer

We want to store the data in the rawest form possible to avoid any human mistakes … but we also want to be sure the files are usable. In the Batch Layer, a “load data” process parses the new files (to make them usable) and stores them in the data lake. The new files are then moved to an archive folder. Unzipping and parsing are expensive operations, so doing them once and for all is good practice.

For example, when receiving a “click” JSON file from a clickstream provider called Track.io:

repository   supplier   source   files
new          trackio    click    trackio_click_2017_08_18_05_33_44.json
                                 trackio_click_2017_08_19_13_55_22.json
                                 trackio_click_2017_08_20_02_08_39.json

The files are then parsed and stored in the data lake in their usable form:

repository   supplier   source   files
lake         trackio    click    trackio_click_2017_08_18_05_33_44.parquet
                                 trackio_click_2017_08_19_13_55_22.parquet
                                 trackio_click_2017_08_20_02_08_39.parquet

Then, the new files are moved to the archive repository:

repository   supplier   source   files
archive      trackio    click    trackio_click_2017_08_18_05_33_44.json
                                 trackio_click_2017_08_19_13_55_22.json
                                 trackio_click_2017_08_20_02_08_39.json
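A minimal sketch of this “load data” process, assuming newline-delimited JSON sources and pandas with a parquet engine (pyarrow) available; the folder layout follows the example above:

```python
import glob
import os
import shutil
import pandas as pd

def load_new_files(root, supplier, source):
    """Parse raw files from new/, store the usable version in lake/, then archive the raw file."""
    new_folder = os.path.join(root, "new", supplier, source)
    lake_folder = os.path.join(root, "lake", supplier, source)
    archive_folder = os.path.join(root, "archive", supplier, source)
    os.makedirs(lake_folder, exist_ok=True)
    os.makedirs(archive_folder, exist_ok=True)
    for raw_path in glob.glob(os.path.join(new_folder, "*.json")):
        df = pd.read_json(raw_path, lines=True)   # parse once, for all future computations
        basename = os.path.splitext(os.path.basename(raw_path))[0]
        df.to_parquet(os.path.join(lake_folder, basename + ".parquet"))
        shutil.move(raw_path, os.path.join(archive_folder, os.path.basename(raw_path)))

# load_new_files("/data", "trackio", "click")
```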

The Data Lake is an immutable repository of usable source files used to compute datasets. Immutable means we never delete or update files in the lake folders; we only add new files to existing folders. The Data Lake is the ultimate source of truth: everything can always be recomputed from it.

The load process moves the files from the “new” repository to the batch layer. This process is important because, when we compute multiple datasets from the data lake, we want all of them to be based on the exact same source data. If we skipped the input layer and added new files directly to the lake as they arrived, the last datasets computed could use source files that were not present when the computation of the first datasets started.

The Data Archive is also immutable. It is a repository of raw files before parsing.

The batch files are final datasets that are ready to be served to our data consumers. In collaboration with the business, a temporal data model is created, and data integration rules are defined for the transformation of the lake files into integrated batch files.

The first step is to drop and rebuild the batch files. If you are using a library like Pandas, some powerful filling functions exist that let you directly build something resembling a snowflaked dimensional model. I prefer not to use the term “dimensional” because it forces you to classify everything as a dimension or a fact. In real life, the distinction between a fact and a dimension is not always clear; some data elements can even play both roles. I prefer the term “normalized temporal model” (see the “Data Modeling” section below). I add a “btc” prefix to my batch files to prepare them for being served next to the speed files.

The next step is the denormalization of your normalized temporal model. At this point, the design is driven by the front-end querying tool you are using. You can denormalize your model into dimensions and facts if desired, or fully denormalize everything into flat files. Flat files are becoming a very efficient way to store and query data in a distributed columnar system like Amazon Redshift.

For example, a normalized temporal model and a flat file for a columnar engine:

repository   files
batch        btc_customer.parquet
             btc_product.parquet
             btc_product_category.parquet
             btc_sales.parquet
             btc_sales_flat.parquet

Data Modeling

We use normalized temporal modeling (NTM) to present an integrated view of the business.

The batch files have two possible designs:

· Temporal

· Transactional

[Diagram: temporal and transactional batch files]

The temporal files (like customer, product and product_category) represent the values of the attributes at different times. The primary key is the natural key plus a “record effective time” (rec_eff_time). A “record expiration time” (rec_exp_time) and a “record is current” flag (rec_is_current) are added to simplify querying and denormalization. When the data is loaded into a temporal file, only the original values and the changes are preserved.

The transactional files (like sales) represent ultimate, unchanging facts about the business. Because they are business facts, we don’t need to track their history: they will not change.

We are not using surrogate keys anywhere in our data model; we use natural keys. Natural keys let us load temporal and transactional files in parallel (and faster), without any dependencies or lookups.

Roleplaying foreign keys will have a suffix added to the natural key (like sales.customer_id_billing).

After the temporal model is ready you can start denormalizing if needed. A fully denormalized flat file would look like this:

[Diagram: fully denormalized flat file]

To get an accurate point-in-time representation, the temporal files are denormalized from top to bottom. First, temporal parents are rolled into temporal children using the child’s “rec_eff_time”. At the end, the remaining denormalized temporal datasets are rolled into the transactional file using the relevant timeline (in this example, the sales_time). When I say “rolled into”, I mean we compute a new file, because this is an immutable processing world. Roleplaying attributes are also tagged with their suffixes along the way.
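A minimal sketch of this roll-up with pandas.merge_asof, assuming the column names from the example model (product_category_id, product_id, sales_time) and a parquet engine:

```python
import pandas as pd

# Step 1: roll the temporal parent (product_category) into its temporal child (product)
# as of the child's rec_eff_time. Step 2: roll the result into the transactional sales
# file as of sales_time. Each step computes a new, immutable dataframe.
product = pd.read_parquet("batch/btc_product.parquet").sort_values("rec_eff_time")
category = pd.read_parquet("batch/btc_product_category.parquet").sort_values("rec_eff_time")
sales = pd.read_parquet("batch/btc_sales.parquet").sort_values("sales_time")

product_denorm = pd.merge_asof(product, category, on="rec_eff_time",
                               by="product_category_id", suffixes=("", "_category"))

sales_flat = pd.merge_asof(sales, product_denorm,
                           left_on="sales_time", right_on="rec_eff_time",
                           by="product_id")
# the customer roll-up follows the same pattern before writing the flat file
sales_flat.to_parquet("batch/btc_sales_flat.parquet")
```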

Workflow

We use a tool like Airflow to build a DAG (directed acyclic graph) that executes our compute processes and serves the data to the serving layer. We also use parallel processing as much as we can to get faster results.

[Diagram: example workflow DAG]
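A minimal Airflow sketch of such a DAG; the task names mirror this example, and the placeholder callable stands in for the actual compute functions:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def placeholder():
    # In a real DAG, each task calls the matching compute function
    # (load_new_files, compute_temporal, denorm_transactional, ...).
    pass

dag = DAG("rcds_batch", start_date=datetime(2017, 8, 18), schedule_interval="@daily")

load = PythonOperator(task_id="load_new_files", python_callable=placeholder, dag=dag)
btc_customer = PythonOperator(task_id="compute_btc_customer", python_callable=placeholder, dag=dag)
btc_sales = PythonOperator(task_id="compute_btc_sales", python_callable=placeholder, dag=dag)
btc_sales_flat = PythonOperator(task_id="compute_btc_sales_flat", python_callable=placeholder, dag=dag)
publish = PythonOperator(task_id="publish_serving_layer", python_callable=placeholder, dag=dag)

# The temporal and transactional files are computed in parallel,
# then denormalized and published to the serving layer.
load.set_downstream([btc_customer, btc_sales])
btc_sales_flat.set_upstream([btc_customer, btc_sales])
btc_sales_flat.set_downstream(publish)
```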

Speed Layer

The speed files support the need for real-time querying. While the batch files are being computed (possibly for a few hours), new data keeps coming in and gets added to the “new data” folder. A “compute speed files” process reads the “new data” folder and processes it continuously, in real time. The speed files land in a “speed” repository. Sometimes the speed files are not as rich as the batch files, but they serve their purpose of broadcasting very recent events. The next time the compute batch process runs, the speed files’ data becomes part of the richer, better batch files, and the cycle continues. When the development is done right, you can reuse the same business transformation functions for both batch and speed processing. I add a “spd” prefix to my speed files to prepare them for being served next to the batch files.

For example, we computed speed files and batch files:

repository   files
batch        btc_customer.csv
             btc_product.csv
             btc_product_category.csv
             btc_sales.csv
             btc_sales_flat.csv
speed        spd_customer.csv
             spd_product.csv
             spd_product_category.csv
             spd_sales.csv
             spd_sales_flat.csv
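A minimal sketch of a speed compute, reusing the same (hypothetical) business transformation that the batch compute uses, but only on the files sitting in the “new data” folder:

```python
import glob
import pandas as pd

def compute_speed_file(transform, new_glob, out_path):
    """Apply the shared business transformation to only the files that arrived
    since the last batch run, and publish the result as a speed (spd_) file."""
    frames = [pd.read_json(path, lines=True) for path in glob.glob(new_glob)]
    if not frames:
        return
    spd_df = transform(pd.concat(frames, ignore_index=True))
    spd_df.to_csv(out_path, index=False)

# compute_speed_file(transform_sales, "new/trackio/click/*.json", "speed/spd_sales_flat.csv")
# where transform_sales is the same function used by the "compute batch files" process.
```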

Serving Layer

The batch files are uploaded to a technology accessible by your front-end tools, like Amazon Redshift. There, they become batch tables. In our example, we would only publish the batch/btc_sales_flat.csv file to a btc_sales_flat table in Redshift.

The speed files are uploaded the same way and become speed tables. In our example, we would only publish the speed/spd_sales_flat.csv file to a spd_sales_flat table in Redshift.

The serving views merge the data from the batch and speed tables into one structure that end users can access. In other words, you present the batch data, which may be a few hours old, next to the real-time speed data, in one consolidated interface. This can be achieved with “union” views in Amazon Redshift, although different tools handle it differently. In our example, we would have a view called “sales_view” doing the union of the btc_sales_flat and spd_sales_flat tables.
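For example, a union view created through the PostgreSQL protocol (which Redshift speaks); this is only a sketch, the connection details are hypothetical, and it assumes the batch and speed tables share the same column layout:

```python
import psycopg2

conn = psycopg2.connect(host="my-cluster.redshift.amazonaws.com", port=5439,
                        dbname="analytics", user="loader", password="...")
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE OR REPLACE VIEW sales_view AS
        SELECT * FROM btc_sales_flat
        UNION ALL
        SELECT * FROM spd_sales_flat;
    """)
```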

The serving tools are the interface between the data consumers and the serving views. They could be a mix of everything that makes sense for your business, like Tableau, SQL querying, Jupyter notebooks, etc.

Master Data Management (MDM)

In every data system, you need information that does not come from an existing source (source-less data). You also need human “administrators” who add useful information like descriptions, mappings, lookups, etc. The ultimate goal is to stay away from loading human-generated and maintained spreadsheets. To achieve this, you build an MDM user interface with basic CRUD functionality on top of a normalized database where you store this master data. I usually use PostgreSQL and Django for this. There are also many MDM tools out there, but usually simplicity is all you need (hey, it almost works with spreadsheets, apart from the user breaking the formatting every couple of months). The MDM layer is simply seen as another data consumer: the data it needs is pushed from the serving views to the MDM database (for example, a natural key that needs to be mapped to a classification). The MDM database is then read by the compute processes the next time they run.
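A minimal Django sketch of the kind of master data table this implies; the entity and field names are illustrative:

```python
from django.db import models

class ProductClassification(models.Model):
    """A mapping maintained by human administrators through basic CRUD screens
    (e.g. the Django admin). The natural key is pushed from the serving views."""
    product_id = models.CharField(max_length=50, unique=True)
    classification = models.CharField(max_length=100)
    description = models.TextField(blank=True)
    updated_at = models.DateTimeField(auto_now=True)
```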

Automation

One very important concept of lean analytics is to automate everything that can be automated. When dealing with temporal data and changing information, the loading patterns can quickly become very complicated: you need to keep only the changes, de-duplicate the natural keys, format your attributes (upper, lower, initcap, trim), compute the “record expiration time” values and the “record is current” flags, and denormalize your model for the serving layer.

You automate those tasks by coding generic functions. They are needed across the board and will help you eliminate approximately 75% of your ETL code. Those functions are:

format_attribute(string1, format, nullreplacement)

  • Input:
    • string1: the value to clean and format
    • format: the formatting to apply to the value
      • upper (NEW ORLEANS)
      • lower (new orleans)
      • initcap (New Orleans)
      • none (keep original value: nEW orLEANs)
    • nullreplacement: a value used to replace null values. For example, “?”
  • Action:
    • It removes extra spaces on the left, right and between words.
    • It replaces null values with the value you provide.
    • It formats the value in upper, lower or initcap.
  • Output: a string value
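A minimal pandas-based sketch of format_attribute as specified above:

```python
import pandas as pd

def format_attribute(string1, format="none", nullreplacement="?"):
    """Clean and format a single attribute value."""
    if pd.isnull(string1):
        return nullreplacement
    value = " ".join(str(string1).split())   # trims and collapses inner whitespace
    if format == "upper":
        return value.upper()
    if format == "lower":
        return value.lower()
    if format == "initcap":
        return value.title()
    return value                             # "none": keep the original casing

# format_attribute("  nEW   orLEANs ", "initcap")  ->  "New Orleans"
```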

compute_temporal(temporal_df, natural_id)

  • Input:
    • A raw temporal dataframe (natural_id, rec_eff_time, attribute1, attribute2, etc)
    • The natural id to use for the computation
  • Action:
    • It reads the input dataframe and computes a clean temporal dataframe.
  • Output:
    • It includes the same columns as the input
    • It includes only new and changed data
    • It is de-duplicated (on natural IDs and effective time)
    • It has missing data filled across time (temporal integration, backfill, forward fill)
    • It includes a “record expiration time” (rec_exp_time) and a “record is current” flag (rec_is_current)
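A minimal pandas sketch of compute_temporal under those specifications (the raw dataframe is assumed to contain the natural id, a rec_eff_time column and the attributes):

```python
import pandas as pd

def compute_temporal(temporal_df, natural_id):
    """Keep only changes, de-duplicate, fill missing values across time,
    and derive rec_exp_time / rec_is_current."""
    df = (temporal_df
          .sort_values([natural_id, "rec_eff_time"])
          .drop_duplicates(subset=[natural_id, "rec_eff_time"], keep="last"))
    attributes = [c for c in df.columns if c not in (natural_id, "rec_eff_time")]
    # temporal integration: forward fill then backfill within each natural key
    df[attributes] = df.groupby(natural_id)[attributes].ffill()
    df[attributes] = df.groupby(natural_id)[attributes].bfill()
    # keep only rows where at least one attribute changed versus the previous record
    previous = df.groupby(natural_id)[attributes].shift()
    df = df[(df[attributes] != previous).any(axis=1)].copy()
    # expiration time = next record's effective time; the last record stays open and current
    df["rec_exp_time"] = df.groupby(natural_id)["rec_eff_time"].shift(-1)
    df["rec_is_current"] = df["rec_exp_time"].isnull()
    return df
```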

denorm_temporal(child_df, child_id, parent_df, parent_id, rolename)

  • It receives two temporal dataframes and merges them together by using the natural ID and the child rec_eff_time.
  • It also handles rolename scenarios by appending a suffix to the merged attributes.

denorm_transactional(child_df, child_id, child_time, parent_df, parent_id, rolename)

  • It receives a temporal dataframe (parent) and a transactional dataframe (child), and merges them together by using the natural ID and the desired child timeline.
  • It also handles rolename scenarios by appending a suffix to the merged attributes.
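A minimal pandas sketch of both functions under one interpretation of the parameters: parent_id is the parent's natural key, the child's foreign key is that key plus the rolename suffix (as in sales.customer_id_billing), and every merged parent column carries the suffix. pandas.merge_asof does the point-in-time work:

```python
import pandas as pd

def denorm_temporal(child_df, child_id, parent_df, parent_id, rolename=None):
    """Roll a temporal parent into a temporal child as of the child's rec_eff_time.
    child_id is kept for signature parity; the merge itself only needs the foreign key."""
    suffix = "_" + rolename if rolename else ""
    parent = parent_df.add_suffix(suffix) if rolename else parent_df.copy()
    parent = parent.rename(columns={"rec_eff_time" + suffix: "parent_eff_time"})
    merged = pd.merge_asof(
        child_df.sort_values("rec_eff_time"), parent.sort_values("parent_eff_time"),
        left_on="rec_eff_time", right_on="parent_eff_time",
        left_by=parent_id + suffix, right_by=parent_id + suffix,
        suffixes=("", suffix or "_parent"))
    return merged.drop("parent_eff_time", axis=1)

def denorm_transactional(child_df, child_id, child_time, parent_df, parent_id, rolename=None):
    """Same merge, but the transactional child supplies its own timeline (e.g. sales_time)."""
    suffix = "_" + rolename if rolename else ""
    parent = parent_df.add_suffix(suffix) if rolename else parent_df.copy()
    parent = parent.rename(columns={"rec_eff_time" + suffix: "parent_eff_time"})
    merged = pd.merge_asof(
        child_df.sort_values(child_time), parent.sort_values("parent_eff_time"),
        left_on=child_time, right_on="parent_eff_time",
        left_by=parent_id + suffix, right_by=parent_id + suffix,
        suffixes=("", suffix or "_parent"))
    return merged.drop("parent_eff_time", axis=1)

# denorm_transactional(sales, "sales_id", "sales_time", customer, "customer_id", rolename="billing")
```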

Other useful automation functions

Instead of trying to keep an expensive data modeling tool in sync, you can eliminate a lot of waste by generating your data models using your batch repository’s metadata and Graphviz. You can create functions that generate your normalized temporal data model and your flat data model.
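For example, a sketch using the graphviz Python package; the edge list is illustrative and could itself be derived from naming conventions (a *_id column pointing at another file's natural key):

```python
import glob
import os
import pandas as pd
from graphviz import Digraph

def draw_temporal_model(batch_folder="batch",
                        edges=(("btc_sales", "btc_product"),
                               ("btc_product", "btc_product_category"),
                               ("btc_sales", "btc_customer"))):
    """Generate the normalized temporal model diagram from the batch repository's metadata."""
    dot = Digraph("normalized_temporal_model")
    for path in glob.glob(os.path.join(batch_folder, "btc_*.parquet")):
        name = os.path.splitext(os.path.basename(path))[0]
        columns = pd.read_parquet(path).columns         # the file's column names as metadata
        dot.node(name, "{}|{}".format(name, ", ".join(columns)), shape="record")
    for child, parent in edges:
        dot.edge(child, parent)
    return dot   # dot.render("ntm", format="png") writes the diagram
```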

It is also good practice to wrap your different data movement functions so they are done the same way, with the same options, every time. For example, add functions to read and write data from S3, HDFS, local drives, databases, etc. It is also useful to add functions for the most common parsing tasks, like unzipping and parsing/flattening JSON.
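A minimal sketch of such wrappers; local paths are shown, and an s3:// prefix would also work if s3fs is installed:

```python
import pandas as pd
from pandas.io.json import json_normalize

def read_source(path, **kwargs):
    """Read a source file the same way every time, based on its extension."""
    if path.endswith(".parquet"):
        return pd.read_parquet(path, **kwargs)
    if path.endswith(".json"):
        return pd.read_json(path, lines=True, **kwargs)
    return pd.read_csv(path, **kwargs)

def write_dataset(df, path):
    """Write a computed dataset with the same options every time."""
    if path.endswith(".parquet"):
        df.to_parquet(path)
    else:
        df.to_csv(path, index=False)

def flatten_json(records, sep="_"):
    """Flatten nested JSON records into a dataframe (a common parsing task)."""
    return json_normalize(records, sep=sep)
```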

Testing and Specifications

It helps a lot when you describe the transformations with examples (specification by example). The business will expect to be able to revise and tune business rules over time, and you cannot ask them to look at your code. But keeping examples up to date is difficult while development is going on and features are added with agility and speed. Trying to maintain a source-to-target mapping spreadsheet is also very difficult. What if you could do test-driven development on your data processes? This is possible. You describe complex business rules with real-life examples using gherkin (given, when, then), and code your transformations as functions that support testing. I built a dataframe testing function that, after setting up your test data (Given), compares the expected output (Then) of a transformation function to the actual output (When). In my case, those automated tests live in a Jupyter notebook that can be published as an HTML document, which then becomes your living documentation. There is no need to keep it in sync; it really exists and is directly executable by your code. The tests become executable specifications.
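A minimal sketch of such a dataframe testing function, written as a gherkin-style scenario (the example transformation is illustrative):

```python
import pandas as pd
from pandas.testing import assert_frame_equal

def check_transformation(given_df, when_function, then_expected_df):
    """Given test data, When the transformation runs, Then the output must match the example."""
    actual = when_function(given_df).reset_index(drop=True)
    assert_frame_equal(actual, then_expected_df.reset_index(drop=True), check_dtype=False)

# Scenario: customer names are trimmed and formatted in initcap
given = pd.DataFrame({"customer_id": [1], "name": ["  nEW   orLEANs "]})
expected = pd.DataFrame({"customer_id": [1], "name": ["New Orleans"]})
check_transformation(
    given,
    lambda df: df.assign(name=df["name"].str.split().str.join(" ").str.title()),
    expected)
```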

So, where is the data warehouse?

As you can see, this architecture reads the raw data and computes the final serving tables. There is no concept of a physical data warehouse or an intermediate data store where the transformed data is integrated and kept permanently. Before the advances in distributed and in-memory processing, the data warehouse mostly existed to support incremental processing, and it was required until recently. Now we can process large amounts of data in a faster and easier way, without using incremental techniques. Of course, we still need to do data integration. When integrating disparate attributes coming from multiple sources, you may have to use a data structure that looks like a data warehouse … but this structure is temporary, existing only while the batch processing runs. Also, if you don’t have access to a programming tool that supports missing-value filling, you may have to create a temporary fact-based structure (one attribute per table) to support complex data integration scenarios.

So, in conclusion, data warehousing practices are still useful (data vault, anchor, fact-based, etc.), but they get integrated into the processing code itself instead of having a life of their own outside the code. Much the same thing happens to dimensional modeling: it is still a very good way to present data to the business, but it has now become more of a presentation model than a physical data warehouse model.

Written on October 6, 2017