
A paper a day keeps the doctor away: MillWheel: Fault-Tolerant Stream Processing at Internet Scale


The recent data explosion and the increasing appetite for fast results have spurred a lot of interest in low-latency data processing systems. One such system is MillWheel, presented in the paper "MillWheel: Fault-Tolerant Stream Processing at Internet Scale", which is widely used at Google.

In MillWheel, the users specify a directed computation graph that describes what they would like to do, and write application code that runs on each individual node in the graph. The system takes care of managing the flow of data within the graph, persisting the state of the computation, and handling any failures that occur, relieving the users of that burden.

MillWheel exposes an API for record processing that handles each record in an idempotent fashion, with exactly-once delivery semantics. The system checkpoints progress at a fine granularity, removing the need to buffer pending data at external senders.

The authors describe the system using the Zeitgeist product at Google as an example, where it is used to figure out the breakout search queries for a given period. The computation involves bucketing records into one-second intervals and comparing the actual traffic in each bucket to the traffic expected by a predictive model. If the quantities differ consistently over a reasonable number of buckets, the computation concludes that the query is either spiking or dipping.
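A minimal sketch of that bucketing-and-comparison idea (the names, thresholds, and model below are illustrative, not taken from the paper):

```python
from collections import defaultdict

def bucket_by_second(records):
    """Count records per (query, one-second bucket)."""
    counts = defaultdict(int)
    for query, event_time in records:          # event_time in seconds
        counts[(query, int(event_time))] += 1
    return counts

def classify(query, counts, model, window, ratio=2.0):
    """Label a query by comparing the actual traffic in each bucket of
    `window` to the traffic the model predicts for it."""
    spikes = dips = 0
    for second in window:
        actual = counts.get((query, second), 0)
        expected = model(query, second)
        if actual > ratio * expected:
            spikes += 1
        elif actual < expected / ratio:
            dips += 1
    if spikes == len(window):
        return "spiking"
    if dips == len(window):
        return "dipping"
    return "normal"

# Example: a query whose traffic runs well above the model's prediction
# for five consecutive one-second buckets.
records = [("cats", t + f / 20) for t in range(100, 105) for f in range(20)]
counts = bucket_by_second(records)
print(classify("cats", counts, model=lambda q, s: 8, window=range(100, 105)))
# -> spiking
```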

The authors list the principles for designing a stream processing system at scale:
  • Data availability: Data should be available to consumers soon after it is published
  • Persistence: Persistent state abstractions should be available to user code
  • Out-of-order processing: Out-of-order data should be handled gracefully
  • Watermarks: Low watermarks should be provided to handle late-arriving data
  • Latency: Latency should remain reasonable as the system scales
  • Delivery semantics: Exactly-once delivery should be guaranteed

MillWheel represents inputs and outputs as tuples consisting of a key, a value, and a timestamp. The key determines how records are grouped and how per-key state is scoped, the value is a byte array that contains the payload, and the timestamp is an arbitrary value assigned by the user that usually tracks the wall clock time at which the event occurred. MillWheel guarantees idempotent processing of the tuples as long as the users adhere to the MillWheel API and state persistence mechanisms. When failures occur, MillWheel handles the restarts and retries for the user. To guarantee delivery, MillWheel checkpoints records atomically.
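As a mental model, a record can be pictured as a simple (key, value, timestamp) triple; a toy sketch with illustrative field names:

```python
import time
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Record:
    key: str          # grouping key, e.g. a search query
    value: bytes      # opaque payload interpreted only by user code
    timestamp: float = field(default_factory=time.time)  # usually event time

r = Record(key="cats", value=b"\x01\x02", timestamp=1375315200.0)
print(r.key, len(r.value), r.timestamp)
```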

MillWheel exposes some key concepts for its users. Core to MillWheel is the concept of a computation, which encompasses the application logic containing arbitrary user code. MillWheel delivers records to these computations idempotently, and if the user is contacting external systems, it is up to them to ensure that the effects of their code on such systems are also idempotent.
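A loose Python sketch of such a computation with per-record and per-timer hooks (the real API is C++, and the method names here are adaptations rather than the actual interface):

```python
from abc import ABC, abstractmethod

class Computation(ABC):
    @abstractmethod
    def process_record(self, key, value, timestamp):
        """Called once per delivered record for a given key."""

    @abstractmethod
    def process_timer(self, key, tag):
        """Called when a previously set timer fires for a key."""

class PerKeyCounter(Computation):
    """Counts records per key and emits the count when a timer fires."""

    def __init__(self, produce):
        self.produce = produce   # callback standing in for producing to a stream
        self.state = {}          # stand-in for the per-key persistent state

    def process_record(self, key, value, timestamp):
        self.state[key] = self.state.get(key, 0) + 1

    def process_timer(self, key, tag):
        self.produce("counts", (key, self.state.get(key, 0)))

emitted = []
c = PerKeyCounter(produce=lambda stream, rec: emitted.append((stream, rec)))
c.process_record("cats", b"...", 1375315200.0)
c.process_record("cats", b"...", 1375315200.5)
c.process_timer("cats", "window-close")
print(emitted)   # [('counts', ('cats', 2))]
```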

Next are keys, and key extraction functions. The user specifies a key extraction function, which associates a key with the record being processed. In the Zeitgeist example, the key could be a search query, or a cookie fingerprint to detect spam or bot traffic.
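For illustration, a couple of hypothetical key extractors in the spirit of the Zeitgeist example (the record format and function names are assumptions):

```python
import json

def key_by_query(raw: bytes) -> str:
    """Group records by search query, for per-query aggregation."""
    return json.loads(raw)["query"]

def key_by_cookie(raw: bytes) -> str:
    """Group records by cookie fingerprint, for spam/bot detection."""
    return json.loads(raw)["cookie_id"]

raw = json.dumps({"query": "cats", "cookie_id": "c-42"}).encode()
print(key_by_query(raw), key_by_cookie(raw))   # cats c-42
```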

Next are streams, which provide the delivery mechanism between different computations. The computation subscribes to zero or more input streams, and publishes one or more output streams. The streams have associated names, and don't have any restrictions on the numbers of subscribers and publishers.

Next is the persistent state, which is an opaque byte string managed on a per-key basis. MillWheel persists the state in a highly available and replicated data store such as Bigtable or Spanner. Some examples of the persisted state are counters, window aggregation data, or data needed for a join.
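A toy stand-in for this per-key state, with a plain dictionary playing the role of Bigtable/Spanner and pickled values playing the role of the opaque byte strings:

```python
import pickle

store = {}   # (computation, key) -> serialized state; stand-in for the data store

def load_state(computation, key, default):
    blob = store.get((computation, key))
    return pickle.loads(blob) if blob is not None else default

def save_state(computation, key, state):
    store[(computation, key)] = pickle.dumps(state)

# Example: a per-query counter used for window aggregation.
state = load_state("zeitgeist", "cats", default={"count": 0})
state["count"] += 1
save_state("zeitgeist", "cats", state)
print(load_state("zeitgeist", "cats", default=None))   # {'count': 1}
```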

Next are the watermarks, which are used to handle late-arriving data. MillWheel computes the low watermark of a computation A through the formula:
low watermark of A = min(oldest work of A, low watermark of C : C outputs to A)
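The recurrence can be read as: A's low watermark is the minimum of its own oldest pending work and the low watermarks of everything that feeds it. A small sketch with made-up timestamps:

```python
def low_watermark(a, oldest_work, upstreams):
    """Low watermark of computation `a`: the minimum of its own oldest
    unfinished work and the low watermarks of the computations feeding it."""
    parents = upstreams.get(a, [])
    return min([oldest_work[a]] +
               [low_watermark(c, oldest_work, upstreams) for c in parents])

# injector -> window -> detector, where "window" still has work at t=95.
oldest_work = {"injector": 100, "window": 95, "detector": 120}
upstreams = {"window": ["injector"], "detector": ["window"]}
print(low_watermark("detector", oldest_work, upstreams))   # 95
```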

Computations should expect a small rate of late records beyond the low watermark, and it is up to the application to handle these. For example, Zeitgeist drops such data (typically around 0.001% of records).

Next are timers, which are per-key hooks triggered either at a specific wall clock time or at a watermark value.
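A toy per-key timer registry in that spirit, where advancing either the wall clock or the low watermark fires the due hooks (names are illustrative):

```python
class Timers:
    """Per-key timers that fire once the wall clock or low watermark passes them."""

    def __init__(self):
        self.pending = []   # list of (fire_at, key, tag, callback)

    def set_timer(self, key, tag, fire_at, callback):
        self.pending.append((fire_at, key, tag, callback))

    def advance(self, now):
        """`now` is the current wall clock time or the current low watermark."""
        due = sorted((t for t in self.pending if t[0] <= now), key=lambda t: t[0])
        self.pending = [t for t in self.pending if t[0] > now]
        for fire_at, key, tag, callback in due:
            callback(key, tag)

timers = Timers()
timers.set_timer("cats", "hourly-flush", fire_at=3600,
                 callback=lambda key, tag: print("fired", key, tag))
timers.advance(now=3600)   # prints: fired cats hourly-flush
```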

To guarantee exactly-once delivery, MillWheel generates a unique ID for every record. Before processing a record, the system checks it against deduplication data and discards it if it is a duplicate. It then runs the user code for the input record and commits the pending changes to the backing store, after which it acknowledges the senders and sends the results to the downstream consumers. To make the deduping fast, MillWheel uses a Bloom filter for the matches. If the record ID is not in the filter, MillWheel reads the backing store to determine whether the record is a duplicate. MillWheel garbage collects the IDs of past deliveries after it can guarantee that all internal senders have finished retrying.
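A toy version of that dedup path, with a small Bloom filter for the fast check and a set standing in for the record IDs persisted in the backing store:

```python
import hashlib

class BloomFilter:
    """Small Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, bits=1 << 16, hashes=4):
        self.bits, self.hashes = bits, hashes
        self.array = bytearray(bits // 8)

    def _positions(self, item):
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.bits

    def add(self, item):
        for p in self._positions(item):
            self.array[p // 8] |= 1 << (p % 8)

    def might_contain(self, item):
        return all(self.array[p // 8] & (1 << (p % 8)) for p in self._positions(item))

bloom = BloomFilter()
seen_ids = set()          # stand-in for record IDs persisted in the backing store

def deliver(record_id, process):
    # Fast path: if the ID is definitely new, skip the backing-store lookup.
    if bloom.might_contain(record_id) and record_id in seen_ids:
        return "duplicate-dropped"   # already processed; just re-acknowledge
    process()                        # run user code and checkpoint its effects
    seen_ids.add(record_id)          # commit the record ID with the checkpoint
    bloom.add(record_id)
    return "processed"

print(deliver("rec-1", lambda: None))   # processed
print(deliver("rec-1", lambda: None))   # duplicate-dropped
```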

The authors evaluate MillWheel on a multitude of dimensions, including output latency, watermark lag, and scale. They share some of the results of testing the system when deployed to a varying number of CPUs. When testing on 200 CPUs, the median record delay is 3.6 milliseconds and the 95th percentile latency is 30 milliseconds. After enabling exactly once delivery and strong productions, the median latency jumps to 33.7 milliseconds and the 95th percentile latency to 93.8 milliseconds. For tests ranging from 20 CPUs to 2000 CPUs, the median latency stayed roughly constant regardless of the system size, and the 99th percentile latency was on the order of magnitude of 100 milliseconds.

The authors list some of the internal consumers at Google for MillWheel, including billing pipelines that depend on its exactly once guarantees, ads systems that rely on the low latency updates for customer facing dashboards, Zeitgeist and anomaly-detection services, and image processing for Google Street View.

The authors end by comparing the system to others such as S4 and Sonora, which don't address exactly-once processing, and Storm, which recently added that support with Trident.
