The recent data explosion and the growing appetite for fast results have spurred a lot of
interest in low-latency data processing systems. One such system is MillWheel,
presented in the paper "MillWheel:
Fault-Tolerant Stream Processing at Internet Scale", which is widely
used at Google.
In MillWheel, the
users specify a directed computation graph that describes what they would like
to do, and write application code that runs on each individual node in the
graph. The system takes care of managing the flow of data within the graph,
persisting the state of the computation, and handling any failures that occur,
relieving the users from that burden.
MillWheel exposes an
API for record processing that handles each record in an idempotent fashion, with exactly once delivery semantics. The system checkpoints progress at a fine granularity, removing the need to buffer pending data at external senders.
The authors describe
the system using the Zeitgeist product at Google as an example, where it is
used to figure out the breakout search queries for a given period. The
computation involves bucketing records into one-second intervals, and comparing the actual traffic in each bucket to the expected traffic from a predictive model. If the quantities differ consistently over a reasonable number of buckets, the computation concludes
that the query is either spiking or dipping.
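As a rough illustration of that comparison, here is a minimal sketch; the one-second buckets come from the description above, but the deviation threshold and the flat "expected" model are assumptions made purely for the example.

```python
def classify(actual, expected, threshold=0.5):
    """Compare per-bucket actual traffic against the model's expected traffic.
    If every bucket in the window deviates in the same direction by more than
    `threshold` (an assumed 50%), call the query spiking or dipping."""
    ratios = [(a - e) / e for a, e in zip(actual, expected) if e > 0]
    if ratios and all(r > threshold for r in ratios):
        return "spiking"
    if ratios and all(r < -threshold for r in ratios):
        return "dipping"
    return "normal"

# Five consecutive one-second buckets for a single query.
print(classify(actual=[160, 170, 180, 190, 200],
               expected=[100, 100, 100, 100, 100]))  # -> spiking
```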
The authors list the
principles for designing a stream processing system at scale:
- Data availability: The data should be available soon after it is published
- Persistence: Persistent state abstractions should be available to user code
- Out-of-order processing: Out-of-order data should be handled gracefully
- Watermarks: Low watermarks should be provided to handle late-arriving data
- Latency: The system should maintain reasonable latency as it scales
- Delivery semantics: Exactly once delivery semantics
MillWheel represents
inputs and outputs as tuples consisting of a key, value, and a timestamp. The
key is the primary basis for aggregating and comparing records within a computation, the value is a byte array that contains the payload, and the timestamp is an arbitrary value assigned by the user, which usually tracks the wall clock time when the event occurred. MillWheel guarantees idempotent processing of the tuples as long as the
users adhere to the MillWheel API and state persistence mechanisms. When
failures occur, MillWheel handles the restarts and retries for the user. To
guarantee delivery, MillWheel checkpoints records atomically.
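As a minimal sketch, the triple could be modeled as a small data type; the class and field names below are mine, since the paper only specifies that a record carries a key, a byte-string value, and a user-assigned timestamp.

```python
import time
from dataclasses import dataclass, field

@dataclass
class Record:
    key: str        # basis for per-key aggregation, comparison, and state
    value: bytes    # opaque payload, interpreted only by user code
    timestamp: float = field(default_factory=time.time)  # user-assigned, usually event time

r = Record(key="query:weather", value=b'{"query": "weather"}')
```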
MillWheel exposes
some key concepts for its users. Core to MillWheel is the concept of a
computation, which encompasses the application logic containing arbitrary user
code. MillWheel delivers records to these computations idempotently, and if the
user's code contacts external systems, it is up to the user to ensure that the effects of that code on such systems are also idempotent.
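The paper exposes this to users through a C++ computation API with hooks along the lines of ProcessRecord and ProcessTimer; the Python skeleton below only approximates its shape, and the method names and signatures are assumptions rather than the actual interface.

```python
class Computation:
    """Skeleton of a MillWheel-style computation: the framework invokes the
    process_* hooks, and user code calls the helpers it relies on."""

    def process_record(self, record):
        """Called for each input record delivered to this computation."""
        raise NotImplementedError

    def process_timer(self, timer):
        """Called when a previously set wall clock or watermark timer fires."""
        raise NotImplementedError

    # Helpers the framework would provide (bodies omitted in this sketch):
    def produce(self, stream_name, record):
        """Emit a record onto a named output stream."""

    def set_timer(self, tag, fire_at):
        """Schedule a per-key timer for a wall clock or watermark value."""

    def persistent_state(self):
        """Return the mutable, per-key persistent state blob."""
```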
Next are keys, and
key extraction functions. The user specifies a key extraction function, which
associates a key with the record being processed. In the Zeitgeist example, the
key could be a search query, or a cookie fingerprint to detect spam or bot traffic.
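As a sketch, a key extraction function is just a function from a record's payload to a key, and different computations can key the same stream differently, as in the query-versus-cookie example above; the JSON payload format here is an illustrative assumption.

```python
import json

def query_key(raw_value: bytes) -> str:
    """Key Zeitgeist-style records by the search query they contain."""
    return json.loads(raw_value)["query"]

def cookie_key(raw_value: bytes) -> str:
    """Key the same records by cookie fingerprint, e.g. for spam detection."""
    return json.loads(raw_value)["cookie_id"]

payload = b'{"query": "weather", "cookie_id": "abc123"}'
print(query_key(payload), cookie_key(payload))  # weather abc123
```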
Next are streams,
which provide the delivery mechanism between different computations. The
computation subscribes to zero or more input streams, and publishes one or more
output streams. The streams have associated names, and don't have any
restrictions on the numbers of subscribers and publishers.
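A toy sketch of how named streams decouple publishers from subscribers; the registry class is my own scaffolding rather than anything in MillWheel.

```python
from collections import defaultdict

class StreamRegistry:
    """Map stream names to subscribed computations. Any number of
    computations may publish to, or subscribe to, a given stream."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, stream_name, computation):
        self.subscribers[stream_name].append(computation)

    def publish(self, stream_name, record):
        # Deliver the record to every computation subscribed to this stream.
        for computation in self.subscribers[stream_name]:
            computation.process_record(record)
```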
Next is the
persistent state, which is an opaque byte string managed on a per-key basis.
MillWheel persists the state in a highly available and replicated data store
such as Bigtable or Spanner. Some examples of the persisted state are counters,
window aggregation data, or data needed for a join.
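Because the state is opaque to the system, user code serializes its own structure into the byte string. A minimal sketch of a per-key counter follows; the JSON encoding and helper names are assumptions.

```python
import json

def load_state(blob: bytes) -> dict:
    """Decode the opaque per-key byte string, defaulting to an empty counter."""
    return json.loads(blob) if blob else {"count": 0}

def dump_state(state: dict) -> bytes:
    """Encode the state back into the opaque byte string MillWheel persists."""
    return json.dumps(state).encode()

# Inside a computation's record-processing hook:
blob = b""                  # as read from the Bigtable/Spanner-backed store
state = load_state(blob)
state["count"] += 1
blob = dump_state(state)    # written back atomically with the checkpoint
```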
Next are the watermarks, which are used to handle late-arriving data. MillWheel computes the low watermark for a computation A through the formula:
low watermark of A = min(oldest work of A, low watermark of C : C outputs to A)
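In code form, that recursive definition might look like the sketch below; the graph and bookkeeping structures are assumptions made for illustration.

```python
def low_watermark(computation, upstream_of, oldest_work):
    """low watermark of A = min(oldest work of A,
                                low watermark of C for every C that outputs to A).
    `upstream_of` maps a computation to its producers, and `oldest_work` maps a
    computation to the timestamp of its oldest unfinished record."""
    candidates = [oldest_work[computation]]
    for producer in upstream_of.get(computation, []):
        candidates.append(low_watermark(producer, upstream_of, oldest_work))
    return min(candidates)

# Example: spike_detector consumes from window_counter, which consumes from an injector.
upstream_of = {"spike_detector": ["window_counter"], "window_counter": ["injector"]}
oldest_work = {"spike_detector": 105.0, "window_counter": 101.0, "injector": 99.0}
print(low_watermark("spike_detector", upstream_of, oldest_work))  # -> 99.0
```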
Computations should
expect a small rate of late records beyond the low watermark, and it is up to the application to decide how to handle these. For example, Zeitgeist drops such data (typically
around 0.001% of records).
Next are timers,
which are per-key hooks triggered either at a specific wall clock time or at a specific watermark value.
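As a sketch of how a timer might be used, the toy computation below counts records for a single key in one-second windows and emits a window only once the low watermark has passed the end of that window; the injected set_timer and produce callables stand in for whatever the real API provides, and their shapes are assumptions.

```python
class WindowCounter:
    """Count records per one-second window, emitting each window's total only
    after the low watermark passes the window's end (when the timer fires)."""

    def __init__(self, set_timer, produce):
        self.set_timer = set_timer  # set_timer(tag, fire_at): schedule a watermark timer
        self.produce = produce      # produce(stream, value): emit downstream
        self.counts = {}            # stand-in for persisted per-key state

    def process_record(self, timestamp, value):
        window_end = int(timestamp) + 1
        self.counts[window_end] = self.counts.get(window_end, 0) + 1
        # Ask to be called back once the low watermark reaches the window's end.
        self.set_timer(tag=str(window_end), fire_at=window_end)

    def process_timer(self, tag):
        window_end = int(tag)
        self.produce("window_counts", (window_end, self.counts.pop(window_end, 0)))
```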
To guarantee exactly
once delivery, MillWheel generates a unique ID for every record. Before
processing the record, it checks against deduplication data, and discards the
record if it is a duplicate. Then it runs the user code for the input record,
and commits the pending changes to the backing store. It then acknowledges the
senders and sends the results to the downstream consumers. To make the deduping fast, MillWheel keeps a Bloom filter of seen record IDs, providing a fast path for records that have provably never been seen before. If the filter indicates that a record ID may have been seen, MillWheel reads the backing store to determine whether the record is actually a duplicate. MillWheel garbage collects the IDs for past deliveries after
it can guarantee that all internal senders have finished retrying.
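A hedged sketch of that receive path, with an in-memory dictionary standing in for the backing store and a plain set standing in for the Bloom filter; the class and method names are mine, not MillWheel internals.

```python
class InMemoryStore:
    """Stand-in for the Bigtable/Spanner-backed store records are checkpointed to."""
    def __init__(self):
        self.delivered = {}  # record_id -> checkpointed outputs

    def has_delivery(self, record_id):
        return record_id in self.delivered

    def commit(self, record_id, outputs):
        # A real store would write state changes, productions, and the record
        # ID in a single atomic operation.
        self.delivered[record_id] = outputs


class Receiver:
    """Dedupe, run user code, checkpoint atomically, ack the sender, then
    send results downstream."""
    def __init__(self, process_record, store):
        self.process_record = process_record
        self.store = store
        self.maybe_seen = set()  # stand-in for a Bloom filter over record IDs

    def receive(self, record_id, record, ack, send_downstream):
        if record_id in self.maybe_seen and self.store.has_delivery(record_id):
            ack()                # duplicate retry: acknowledge, but don't reprocess
            return
        outputs = self.process_record(record)
        self.store.commit(record_id, outputs)
        self.maybe_seen.add(record_id)
        ack()                    # the sender may now stop retrying
        for out in outputs:
            send_downstream(out)


# Processing the same record twice only produces downstream output once.
store = InMemoryStore()
receiver = Receiver(lambda rec: [rec.upper()], store)
receiver.receive("id-1", "hello", ack=lambda: None, send_downstream=print)  # HELLO
receiver.receive("id-1", "hello", ack=lambda: None, send_downstream=print)  # deduped
```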
The authors evaluate
MillWheel on a multitude of dimensions, including output latency, watermark
lag, and scale. They share some of the results of testing the system when
deployed to a varying number of CPUs. When testing on 200 CPUs, the median
record delay is 3.6 milliseconds and the 95th percentile latency is 30
milliseconds. After enabling exactly once delivery and strong productions, the
median latency jumps to 33.7 milliseconds and the 95th percentile latency to
93.8 milliseconds. For tests ranging from 20 CPUs to 2000 CPUs, the median
latency stayed roughly constant regardless of the system size, and the 99th percentile latency was on the order of 100 milliseconds.
The authors list
some of the internal consumers at Google for MillWheel, including billing
pipelines that depend on its exactly once guarantees, ads systems that rely on
the low latency updates for customer facing dashboards, Zeitgeist and
anomaly-detection services, and image processing for Google Street View.
The authors end by comparing the system to others such as S4 and Sonora, which don't address exactly once processing, and Storm, which recently added that support with Trident.