
A paper a day keeps the doctor away: MillWheel: Fault-Tolerant Stream Processing at Internet Scale


The recent data explosion and the increasing appetite for fast results have spurred a lot of interest in low-latency data processing systems. One such system is MillWheel, presented in the paper "MillWheel: Fault-Tolerant Stream Processing at Internet Scale" and widely used at Google.

In MillWheel, users specify a directed computation graph that describes what they would like to do, and write application code that runs on each individual node in the graph. The system takes care of managing the flow of data within the graph, persisting the state of the computation, and handling any failures that occur, relieving users of that burden.
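
As a rough illustration, a topology can be thought of as a set of named computations connected by streams, with user code attached to each node. The paper's actual API is C++; the Python sketch below, including all class and method names, is hypothetical.

```python
# Hypothetical sketch of a MillWheel-style topology; the class and method
# names are illustrative, not the actual MillWheel API.

class Computation:
    """User code that runs at one node of the directed computation graph."""

    def process_record(self, key, value, timestamp, emit):
        raise NotImplementedError


class Topology:
    """Directed graph of computations connected by named streams."""

    def __init__(self):
        self.nodes = {}  # computation name -> Computation instance
        self.edges = {}  # stream name -> names of subscribing computations

    def add_computation(self, name, computation, input_streams):
        self.nodes[name] = computation
        for stream in input_streams:
            self.edges.setdefault(stream, []).append(name)
```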

MillWheel exposes an API for record processing that handles each record idempotently, with exactly-once delivery semantics. The system checkpoints progress at fine granularity, removing the need for external senders to buffer pending data for long periods between checkpoints.

The authors describe the system using the Zeitgeist product at Google as an example, where it is used to figure out breakout search queries for a given period. The computation involves bucketing records into one-second intervals and comparing the actual traffic in each bucket to the expected traffic from a predictive model. If the quantities differ consistently over a reasonable number of buckets, the computation concludes that the query is either spiking or dipping.
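
The core of that check might look something like the sketch below; the threshold, the minimum number of buckets, and the function names are invented here for illustration and are not taken from the paper.

```python
# Illustrative sketch of Zeitgeist-style spike/dip detection; the threshold
# and window length are invented, not taken from the paper.

def classify_query(actual_counts, expected_counts, threshold=2.0, min_buckets=10):
    """Compare one-second buckets of actual traffic against model predictions.

    Returns 'spiking', 'dipping', or 'normal' depending on whether actual
    traffic consistently deviates from the expectation.
    """
    if len(actual_counts) < min_buckets:
        return "normal"
    ratios = [a / e for a, e in zip(actual_counts, expected_counts) if e > 0]
    if ratios and all(r > threshold for r in ratios):
        return "spiking"
    if ratios and all(r < 1.0 / threshold for r in ratios):
        return "dipping"
    return "normal"
```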

The authors list the principles for designing a stream processing system at scale:
  • Data availability: The data should be available soon after it is published
  • Persistence: Persistent state abstractions should be available to user code
  • Out-of-order processing: Out-of-order data should be handled gracefully
  • Watermarks: Low watermarks should be provided to handle late-arriving data
  • Latency: Latency should remain reasonable as the system scales
  • Delivery semantics: The system should guarantee exactly-once delivery

MillWheel represents inputs and outputs as tuples consisting of a key, a value, and a timestamp. The key is the primary abstraction for aggregating and comparing records, the value is a byte array that contains the payload, and the timestamp is an arbitrary value assigned by the user, which usually tracks the wall-clock time at which the event occurred. MillWheel guarantees idempotent processing of the tuples as long as users adhere to the MillWheel API and state persistence mechanisms. When failures occur, MillWheel handles the restarts and retries for the user. To guarantee delivery, MillWheel checkpoints records atomically.
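
In Python terms, a record could be modeled as a simple (key, value, timestamp) triple; this representation is mine, not the paper's.

```python
from dataclasses import dataclass

# Minimal stand-in for a MillWheel record: a key, an opaque byte payload,
# and a user-assigned timestamp (usually the event's wall-clock time).
@dataclass
class Record:
    key: str
    value: bytes
    timestamp: float  # seconds since the epoch, assigned by the user

r = Record(key="weather sydney", value=b"payload bytes", timestamp=1375315200.0)
```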

MillWheel exposes some key concepts to its users. Core to MillWheel is the concept of a computation, which encompasses the application logic containing arbitrary user code. MillWheel delivers records to these computations idempotently, and if the user contacts external systems, it is up to them to ensure that the effects of their code on such systems are also idempotent.

Next are keys and key extraction functions. The user specifies a key extraction function, which associates a key with the record being processed. In the Zeitgeist example, the key could be a search query, or a cookie fingerprint to detect spam or bot traffic.
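
A key extraction function is simply a mapping from a record's payload to the key a particular consumer wants to aggregate on; the sketch below, including the assumed JSON payload format, is hypothetical.

```python
import json

# Hypothetical key extractors: each consumer of the same stream can supply
# its own function mapping a record's payload to a key.

def query_key(value: bytes) -> str:
    # Zeitgeist-style: aggregate by the search query itself.
    return json.loads(value)["query"]

def cookie_key(value: bytes) -> str:
    # Spam/bot detection: aggregate by a cookie fingerprint instead.
    return json.loads(value)["cookie_fingerprint"]
```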

Next are streams, which provide the delivery mechanism between different computations. The computation subscribes to zero or more input streams, and publishes one or more output streams. The streams have associated names, and don't have any restrictions on the numbers of subscribers and publishers.

Next is the persistent state, which is an opaque byte string managed on a per-key basis. MillWheel persists the state in a highly available, replicated data store such as Bigtable or Spanner. Some examples of persisted state are counters, window aggregation data, or data needed for a join.
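
For instance, a per-key counter would be deserialized from the opaque state string, updated in user code, and written back. In the sketch below a plain dictionary stands in for the replicated store, and all names are illustrative.

```python
import struct

# Sketch of per-key persistent state as an opaque byte string. A plain
# dictionary stands in for the replicated store (Bigtable/Spanner) that
# MillWheel would actually use.
state_store = {}

def increment_counter(key: str, delta: int = 1) -> int:
    raw = state_store.get(key, b"\x00" * 8)
    (count,) = struct.unpack(">q", raw)          # deserialize opaque state
    count += delta                               # user logic
    state_store[key] = struct.pack(">q", count)  # persist the new state
    return count
```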

Next are low watermarks, which are used to handle late-arriving data. MillWheel computes the low watermark of a computation A recursively through the formula:
low watermark of A = min(oldest work of A, low watermark of C : C outputs to A)
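
A literal reading of that definition, with the pending-work bookkeeping faked by dictionaries, might look like the following sketch; the names are illustrative.

```python
# Sketch of the recursive low-watermark definition. 'oldest_work' maps a
# computation to the timestamp of its oldest unfinished record, and
# 'producers' maps a computation to the computations that output to it.

def low_watermark(computation, oldest_work, producers):
    candidates = [oldest_work[computation]]
    for upstream in producers.get(computation, []):
        candidates.append(low_watermark(upstream, oldest_work, producers))
    return min(candidates)

oldest_work = {"injector": 105.0, "window_counter": 98.0}
producers = {"window_counter": ["injector"]}
print(low_watermark("window_counter", oldest_work, producers))  # 98.0
```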

Computations should expect a small rate of late records beyond the low watermark, and it is up to the application code to handle these. For example, Zeitgeist drops such data (typically around 0.001% of records).

Next are timers, which are per-key hooks triggered either at a specific wall-clock time or at a low watermark value.
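
A common pattern is to set a timer for the low watermark at the end of a window, so that the window is flushed only once (almost) all earlier records have arrived. The sketch below is hypothetical and ignores persistence.

```python
# Hypothetical sketch of a low-watermark timer: once the watermark passes
# the end of a one-second bucket, the bucket is flushed because (almost)
# all earlier records have already arrived.

class WindowCounter:
    def __init__(self, set_timer, emit):
        self.counts = {}          # bucket start time -> count (per-key state)
        self.set_timer = set_timer
        self.emit = emit

    def process_record(self, key, value, timestamp):
        bucket = int(timestamp)   # one-second buckets
        self.counts[bucket] = self.counts.get(bucket, 0) + 1
        # Ask to be woken up when the low watermark passes the bucket's end.
        self.set_timer(bucket + 1)

    def process_timer(self, key, timer_timestamp):
        bucket = int(timer_timestamp) - 1
        self.emit(key, bucket, self.counts.pop(bucket, 0))
```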

To guarantee exactly-once delivery, MillWheel generates a unique ID for every record. Before processing a record, it checks the ID against deduplication data and discards the record if it is a duplicate. It then runs the user code for the input record and commits the pending changes to the backing store. Finally, it acknowledges the senders and sends the results to the downstream consumers. To make the deduplication fast, MillWheel uses a Bloom filter of record IDs: if the record ID is definitely not in the filter, the record cannot be a duplicate, and only when the filter reports a possible match does MillWheel read the backing store to determine whether the record is in fact a duplicate. MillWheel garbage collects the IDs of past deliveries once it can guarantee that all internal senders have finished retrying.
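
Put together, the receive path looks roughly like the sketch below; a Python set stands in for the Bloom filter, a dictionary stands in for the replicated backing store, and all names are illustrative.

```python
# Sketch of MillWheel's receive-side exactly-once logic. A set stands in
# for the Bloom filter and a dictionary for the backing store; the real
# system commits step 3 atomically.

seen_filter = set()   # fast path: "definitely new" vs. "possibly seen"
store = {}            # durable record of processed record IDs

def handle_record(record_id, record, process, ack, send_downstream):
    # 1. Deduplicate: only consult the backing store on a possible match.
    if record_id in seen_filter and record_id in store:
        ack(record_id)            # duplicate: acknowledge and drop
        return
    # 2. Run the user code, producing state changes and pending productions.
    productions = process(record)
    # 3. Commit the record ID, state, and productions to the backing store.
    store[record_id] = productions
    seen_filter.add(record_id)
    # 4. Acknowledge the sender, then deliver productions downstream.
    ack(record_id)
    for out in productions:
        send_downstream(out)
```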

The authors evaluate MillWheel on a multitude of dimensions, including output latency, watermark lag, and scale. They share results from deploying the system on a varying number of CPUs. When testing on 200 CPUs, the median record delay is 3.6 milliseconds and the 95th percentile latency is 30 milliseconds. After enabling exactly-once delivery and strong productions, the median latency jumps to 33.7 milliseconds and the 95th percentile latency to 93.8 milliseconds. For tests ranging from 20 CPUs to 2000 CPUs, the median latency stayed roughly constant regardless of system size, and the 99th percentile latency was on the order of 100 milliseconds.

The authors list some of MillWheel's internal consumers at Google, including billing pipelines that depend on its exactly-once guarantees, ads systems that rely on its low-latency updates for customer-facing dashboards, Zeitgeist and anomaly-detection services, and image processing for Google Street View.

The authors end by comparing the system to others such as S4 and Sonora, which don't address exactly-once processing, and Storm, which recently added that support with Trident.
