Thursday, March 17, 2016

Building robust real-time ETL on Google BigQuery. Lambda architecture inside BigQuery.

We have quite a standard setup with our data. Raw data is collected from several internal and external sources and is regularly uploaded into one place (in our case into Google BigQuery). Some sources (like our own production) do it hourly, others (often 3rd parties) do it daily.
The data is then transformed into many aggregated forms for consumption by end users: dashboards for internal users, reports for partners, inputs that go back to production and affect product behaviour. This transformation may be quite involved, with multiple dependencies. But mostly we cope with the task using BigQuery SQL+UDF, saving the results as separate tables.
The obvious way to perform these transformations is to schedule them. If this data source is done uploading every day at 1am, then schedule that job at 1.05am. If this hourly data is normally uploaded by the 5th minute of the next hour – then schedule these jobs to run on the 10th minute of every hour. Small 5-minute gaps are not such a big problem anyway, and everything is supposed to work nicely.
But the world is cruel! Data is not always delivered on time. Sometimes it is not delivered at all without manual intervention. If your hourly data made it to BigQuery on the 11th minute – here you go, please wait another hour to see it in the dashboards. And if your transformations require several data sources, the mess becomes even worse.
Moreover, the data is not always correct (always incorrect, to be more correct!). Once in a while there is an issue and data gets re-uploaded or undergoes additional cleanups. Then you need to take care of all the jobs that used this data and refresh them.
Well, all of these are issues with the raw data, and we should be fighting them. But this is a war you can't win in reality! Something will break anyway. If the data source is internal, your developers may have other prioritized tasks. And third parties are simply not under your control. But at the very least it would be nice if, once your raw data is in place, all end users had instant access to it without waiting for scheduled refreshes.
This is a big real-world problem. And what are possible solutions?

Solution #1 – remove broken parts 
If ETL is causing problems – let's get rid of it! Don't store any intermediary results and just calculate everything on the fly when the user needs it. With the outstanding performance of BigQuery this is quite possible, especially if all you are doing is GROUP BY date and count(1) while being interested in only the last 14 days of data.
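As a concrete sketch of such an on-the-fly aggregation – the table and column names here are hypothetical, and the query is written in today's standard SQL rather than the dialect of the time:

```sql
-- Aggregate only the last 14 days of raw data, computed at query time.
SELECT date, COUNT(1) AS events
FROM dataset.raw_events
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 14 DAY)
GROUP BY date
ORDER BY date
```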
Most analytics tasks are like this, and we use this technique a lot. But it is not always possible: it is very inefficient for big and complicated transformations.
One problem is code complexity. Combining all transformations into one SQL query makes it very messy. Luckily, one can use "views" instead of tables. Views in BigQuery are like tables, but with an underlying query. They are logical views (not materialized), so they are recalculated every time you query them.
The second problem is performance, and this one is serious. No matter how fast and cheap current tools are, if you run a complex transformation over a year of data, it will take time and cost money. There is no way around it. This problem kills the solution for some very important transformations, but we still use it for everything else.

Solution #2 – build it properly
If there is no way around ETL, then you can try to build it properly: not just a schedule in cron, but a sophisticated system that monitors updates to raw data tables and runs the dependent jobs. A pub/sub pattern would probably fit well here.
The drawback of this approach is simple: while building such a sophisticated system is relatively easy, maintaining and troubleshooting it would be very difficult. The more code you write, the more bugs you have.
Luckily there is a better approach.

Solution #3 – lambda architecture! … well, somewhat
Lambda architecture is a well-known approach that combines batch processing and real-time processing to get the advantages of both.

Normally it is done using several systems. We are doing exactly the same just inside BigQuery. Here is how it works:

Batch layer. We run daily queries and store the results in static tables. All queries have the following structure:

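The query itself was shown as an image in the original post. A minimal sketch of the structure it describes, written in today's standard SQL (the post predates it); table_static, raw_events, the metric column, and X = 7 days are all assumptions here:

```sql
-- Keep history older than the X-day fallback window as already calculated...
SELECT date, metric
FROM dataset.table_static
WHERE date < DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)

UNION ALL

-- ...and recalculate the last X days from the raw data.
SELECT date, COUNT(1) AS metric
FROM dataset.raw_events
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY date
```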
And the results are saved to table_static. Yes, BigQuery allows you to query a table and save the results overwriting that same table. In the query we take the historical results that are already calculated and append freshly recalculated data. X days here is a fallback period; it allows us to catch up with any post-factum changes in the raw data. We assume that any possible problems with the raw data will be resolved within X days.

Speed layer + serving layer. These two are combined into one query:
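This query was also shown as an image in the original post. It has the same body as the batch-layer query; a sketch of saving it as a view in today's DDL (dataset, table, and column names, and X = 7 days, are assumptions; at the time of writing, views were created through the UI or API rather than with CREATE VIEW):

```sql
-- Same SELECT as the batch layer, but stored as a logical view:
-- the last X days are recalculated from raw data on every read.
CREATE VIEW dataset.table_live AS
SELECT date, metric
FROM dataset.table_static
WHERE date < DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
UNION ALL
SELECT date, COUNT(1) AS metric
FROM dataset.raw_events
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY date
```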

Yes, this is the same query! It is saved as a view named table_live, and all users (dashboards, other jobs, etc.) get their results from this view. Because views in BigQuery are logical, every time you query the view it recalculates the last X days on the fly from the raw data, reflecting all possible changes, and appends them to the historical data.

The daily batch job then reduces to a single line:
SELECT * FROM table_live
(and save the results to table_static)

This approach has several important advantages:
  • Every user gets fresh results; there is no gap between raw data and transformed data (assuming you don't amend data after X days – and you shouldn't).
  • Users get results fast. Nobody recalculates a year of data, and the last X days don't take too long in BigQuery. This speed is one of the limits on X.
  • There is no need to think too much about scheduling batch jobs. You need to run them daily, but it doesn't matter when. The system is also robust to a batch job failing on one day or running twice: all your batch processing has to be down for X days before you notice!
  • The system requires very little code (a.k.a. points of failure). Your query is 10 lines longer and is stored inside BigQuery as a view; on top of that you only need a very, very simple scheduler for the batch jobs.
  • The cost of running such a system may even be lower than with scheduling. It might seem that costs should be higher, as we are constantly recalculating the last X days, but our experience shows the opposite:
    • If you have hourly uploads of data, you would need to schedule processing every hour, on weekends too, and still fall back X days if your data can be broken. So over a week you do it 24*7 = 168 times.
    • But in reality your user might open that dashboard only 3 times a week. This system will run such a job 7 times on schedule and 3 times on demand. Much cheaper.

PS  If you like using tables sharded by date, just as we do, there is a solution for you too. I shall describe it in my next post, but here is a hint: you can use table wildcard functions on tables even when some of them are actually views.

PPS  If views in BigQuery supported caching (just as normal queries do), making them materialized views in essence, the performance of this approach would improve even more. If you agree with me, please go and add a star to the feature request.
