Monday, March 14, 2016

A paper a day keeps the doctor away: MillWheel: Fault-Tolerant Stream Processing at Internet Scale


The recent data explosion, and the increase in appetite for fast results spurred a lot of interest in low-latency data processing systems. One such system is MillWheel, presented in the paper "MillWheel: Fault-Tolerant Stream Processing at Internet Scale", which is widely used at Google.

In MillWheel, the users specify a directed computation graph that describe what they would like to do, and write application code that runs on each individual node in the graph. The system takes care of managing the flow of data within the graph, persisting the state of the computation, and handling any failures that occur, relieving the users from that burden.

MillWheel exposes an API for record processing, that handles each record in an idempotent fashion, with an exactly once delivery semantics. The system checkpoints progress with a fine granularity, removing the need to buffer data between external senders.

The authors describe the system using the Zeitgeist product at Google as an example, where it is used to figure out the breakout search queries for a given period. The computation involves bucketing records in one second intervals, and comparing the expected traffic to a predictive model. If the quantities differ consistently over a reasonable amount of buckets, the computation concludes that the query is either spiking or dipping.

The authors list the principles for designing a stream processing at scale:
  • Data availability: The data should be available soon after it is published
  • Persistency: Persistent state abstractions should be available to user code
  • Out-of-order processing: Out of order data should be handled gracefully
  • Watermarks: to handle late arriving data
  • Latency: Reasonable latency as the system scales
  • Delivery semantics: Exactly once delivery semantics

MillWheel represents inputs and outputs as tuples consisting of a key, value, and a timestamp. The key provides filters for the computation, the value is a byte array that contains the payload, and the timestamp is an arbitrary value assigned by the user, but usually tracks the wall clock time when the event has occurred. MillWheel guarantees an idempotent processing of the tuples as long as the users adhere to the MillWheel API and state persistence mechanisms. When failures occur, MillWheel handles the restarts and retries for the user. To guarantee delivery, MillWheel checkpoints records atomically.

MillWheel exposes some key concepts for its users. Core to MillWheel is the concept of a computation, which encompasses the application logic containing arbitrary user code. MillWheel delivers records to these computations idempotently, and if the user is contacting external systems, it is up to them to ensure that the effects of their code on such systems is also idempotent.

Next are keys, and key extraction functions. The user specifies a key extraction function, which associates a key with the record being processed. In the Zeitgeist example, the key could be a search query, or a cookie fingerprint to detect spam or bot traffic.

Next are streams, which provide the delivery mechanism between different computations. The computation subscribes to zero or more input streams, and publishes one or more output streams. The streams have associated names, and don't have any restrictions on the numbers of subscribers and publishers.

Next is the persistent state, which is an opaque byte string managed on a per-key basis. MillWheel persists the state in a highly available and replicated data store such as BigTable or spanner. Some examples of the persisted state are counters, window aggregation data, or data needed for a join.

Next are the water marks which are used to handle late arriving data. MillWheel computes the watermark for a computation through the formula:
min(oldest work of A, low watermark of C : C outputs to A)

Computations should expect a small rate of late records beyond the low watermark, and it is up to the system to handle these. For example Zeitgeist drops such data (typically around 0.001% of records).

Next are timers, which are per key hooks triggered either at a specific wall clock time, or a watermark value.

To guarantee exactly once delivery, MillWheel generates a unique ID for every record. Before processing the record, it checks against deduplication data, and discards the record if it is a duplicate. Then it runs the user code for the input record, and commits the pending changes to the backing store. It then acknowledges the senders and sends the results to the downstream consumers. To make the deduping fast, MillWheel uses a bloom filter for the matches. If the record ID is not in the filter, MillWheel reads the backing store to determine if the record is a duplicate or not. MillWheel garbage collects the IDs for past deliveries after it can guarantee that all internal senders have finished retrying.

The authors evaluate MillWheel on a multitude of dimensions, including output latency, watermark lag, and scale. They share some of the results of testing the system when deployed to a varying number of CPUs. When testing on 200 CPUs, the median record delay is 3.6 milliseconds and the 95th percentile latency is 30 milliseconds. After enabling exactly once delivery and strong productions, the median latency jumps to 33.7 milliseconds and the 95th percentile latency to 93.8 milliseconds. For tests ranging from 20 CPUs to 2000 CPUs, the median latency stayed roughly constant regardless of the system size, and the 99th percentile latency was on the order of magnitude of 100 milliseconds.

The authors list some of the internal consumers at Google for MillWheel, including billing pipelines that depend on its exactly once guarantees, ads systems that rely on the low latency updates for customer facing dashboards, Zeitgeist and anomaly-detection services, and image processing for Google Street View.

The authors end with comparing the system to others such as S4 and Sonora--which don't address exactly once processing, and Storm which recently added that support with Trident.

No comments :

Post a Comment