Monday, March 21, 2016

A paper a day keeps the doctor away: NoDB


In most database systems, the user defines the shape of the data that is stored and queried using concepts such as entities and relations. The database system takes care of translating that shape into physical storage, and managing its lifecycle. Most systems store data in the form of tuples, either in row format, or broken down into columns and stored in columnar format. The system also stores metadata associated with the data, which helps with speedy retrieval and processing. Defining the shape of the data a priori, and transforming it from the raw or ingestion format to the storage format, is a cost that database systems incur to make queries faster. What if we could have fast queries without incurring that initial cost? In the paper "NoDB: Efficient Query Execution on Raw Data Files", the authors examine that question, and advocate a system (NoDB) that answers it.

The authors start with the motivation for such a system. With the recent explosion of data, and our insatiable appetite for quick insights, loading and transforming the data a priori is not a welcome overhead. This is especially the case if the user wants to perform a series of ad-hoc explorations on vast amounts of data, and is interested in decreasing the data-to-query time. The authors propose that one way to accomplish that is to eliminate the loading phase, and advocate that querying over the raw data in-situ is a viable option.

Instead of starting from scratch, the authors modify a traditional relational database (PostgreSQL) into a NoDB system, and discuss how they improved query performance using techniques such as selective parsing, adaptive indexing, and caching.

The authors start with two straightforward approaches to query raw data files. In the first approach, they propose to run the loading procedure whenever a table is referenced, and then evaluate the query on the loaded data. The loaded data may be discarded after the query executes, or may be persisted on disk.

In the second approach, the authors discuss tightly integrating raw file access within the query execution logic, where the leaf operators of a query plan are enriched with the ability to access raw data files, including parsing and tokenization. Data parsing and processing occur in a pipelined fashion: the raw file is read in chunks, parsed, and passed immediately to the rest of the query plan operators.
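As a rough illustration (in Python, with an in-memory CSV string standing in for the raw file), such a pipelined leaf operator can be written as a generator, so parsed tuples flow to the parent operators without ever materializing a loaded table:

```python
import csv
import io

def raw_scan(file_obj, chunk_rows=1000):
    """Leaf-operator sketch: read the raw file in chunks, parse each
    chunk, and yield tuples immediately to the parent operators."""
    reader = csv.reader(file_obj)
    chunk = []
    for row in reader:
        chunk.append(tuple(row))
        if len(chunk) == chunk_rows:
            yield from chunk
            chunk = []
    yield from chunk  # flush the final partial chunk

# A filter operator consumes the scan lazily, mimicking a query plan
# whose leaf reads raw data in a pipelined fashion.
data = io.StringIO("1,a\n2,b\n3,c\n")
result = [r for r in raw_scan(data) if int(r[0]) > 1]
```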

Both approaches require the schema to be known and declared a priori, and the tables to be defined as in-situ tables; features that are offered by modern database systems such as MySQL. The downside of both approaches is that the data is not kept in persistent tables, so repeated queries against the same tables must perform the loading and parsing from scratch on every invocation. Moreover, neither approach can make use of indexes or cost-based optimizations on the raw data file.

The authors then examine the NoDB solution, which aims to provide in-situ query processing performance competitive with traditional database systems. The authors discuss how they created data structures that minimize the cost of raw data access for in-situ querying, and selectively eliminate the need for raw data access through careful caching and scheduling. The authors use processing over a CSV file as a vehicle to explain their ideas.

When a query is submitted to the NoDB solution and references relational tables that are not yet loaded, the system overrides the scan operator with the ability to access raw data files directly, leaving the rest of the query plan generated by the optimizer unchanged. The system speeds up raw access through a variety of techniques. First, it performs selective tokenization of the raw files, aborting the tokenization of a tuple as soon as it finds the attributes required by the query. Second, it forms tuples selectively, fully composing only the ones that contain the attributes needed by a query. Third, it keeps an adaptive positional map that describes the shape of the raw file discovered so far; the system uses the map to navigate and retrieve raw data faster, reducing parsing and tokenization costs.

Initially the positional map is empty. As the system executes queries it continuously augments the map by populating it while tokenizing the raw file for the current query. For subsequent queries, the system uses the map to jump to the exact position in the file for the tuple it is looking for, or as close to it as possible.
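A simplified sketch of this idea in Python (the function and map layout are my own illustration, not NoDB's actual data structure; for brevity it only resumes parsing forward from the furthest position recorded, while the real map also allows jumping to any recorded attribute):

```python
def tokenize(line, wanted, pos_map):
    """Selectively tokenize one CSV line. `pos_map` maps attribute
    index -> byte offset where that attribute starts; it is consulted
    to skip already-parsed prefixes and augmented as new delimiters
    are discovered. `wanted` is a sorted list of attribute indexes;
    tokenization stops as soon as the last wanted attribute is found."""
    values = {}
    start_attr, offset = 0, 0
    if pos_map:
        # Resume from the furthest delimiter position already known.
        start_attr = max(pos_map)
        offset = pos_map[start_attr]
    attr = start_attr
    while attr <= wanted[-1]:
        end = line.find(",", offset)
        if end == -1:
            end = len(line)
        if attr in wanted:
            values[attr] = line[offset:end]
        attr += 1
        pos_map[attr] = end + 1  # remember where the next attribute starts
        offset = end + 1
    return [values[i] for i in wanted]

pos_map = {}                              # grows across queries
line = "10,foo,bar,42,baz"
first = tokenize(line, [1], pos_map)      # parses only up to attribute 1
second = tokenize(line, [3], pos_map)     # resumes from the cached offset
```

Note how the second call never re-tokenizes the first two attributes: the map lets it jump close to the attribute it needs, which is exactly how NoDB amortizes parsing cost across queries.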

NoDB also uses a cache that temporarily holds previously accessed data, and accesses the cache instead of reprocessing the raw data file when another query runs.

Another technique the authors use is to calculate table statistics from sampled data instead of the full data set. The authors modified the PostgreSQL scan operator to create statistics on the fly: the system invokes the native PostgreSQL statistics routines with a sample of the data, then stores and uses the results much as a conventional DBMS does.
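A minimal sketch of statistics-from-a-sample (in Python, with a hypothetical helper computing simple summary statistics rather than the actual PostgreSQL routines):

```python
import random
import statistics

def sampled_stats(scan, sample_prob=0.1, seed=42):
    """Compute table statistics from a Bernoulli sample taken while
    scanning, instead of reading every tuple. `scan` yields 1-column
    tuples of integers; the stats computed are illustrative."""
    rng = random.Random(seed)  # fixed seed for reproducibility
    values = [v for (v,) in scan if rng.random() < sample_prob]
    return {
        "sampled_rows": len(values),
        "mean": statistics.fmean(values),
        "min": min(values),
        "max": max(values),
    }

# Sample ~1% of a 100,000-row scan on the fly.
stats = sampled_stats(((i,) for i in range(100_000)), sample_prob=0.01)
```

The optimizer then uses these approximate statistics for cost estimation, trading a little accuracy for never having to touch the full raw file.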

The authors share the results of the experiments they've conducted on the modified system, and compare it to traditional DBMS performance for similar queries. The authors implemented NoDB on top of PostgreSQL 9.0, and ran their experiments on a Sun X4140 server with 2x Quad-Core AMD Opteron processor (64 bit), 2.7 GHz, 512 KB L1 cache, 2 MB L2 cache and 6 MB L3 cache, 32 GB RAM, 4 x 250GB 10000 RPM SATA disks (RAID-0), using Ubuntu 9.04. The experiments used a raw data file of 11GB containing 7.5 million tuples, each containing 150 attributes with random integers.

The authors investigate the effect of the positional map with varying storage capacity. The authors show that the positional map improved response times by a factor of 2. The authors also observed linear scalability as the file size was increased gradually from 2GB to 92GB.

The other experiments show that the auxiliary structures (caching, maps) reduce the time to access raw data files and amortize the overhead across subsequent queries.

The authors close with the challenges of their approach, including data type conversion, complex data schemas, and integration with external tools, as well as the opportunities for NoDB, including flexible storage, adaptive indexing, and file system interfaces.

Monday, March 14, 2016

A paper a day keeps the doctor away: MillWheel: Fault-Tolerant Stream Processing at Internet Scale


The recent data explosion, and the increase in appetite for fast results spurred a lot of interest in low-latency data processing systems. One such system is MillWheel, presented in the paper "MillWheel: Fault-Tolerant Stream Processing at Internet Scale", which is widely used at Google.

In MillWheel, users specify a directed computation graph that describes what they would like to do, and write application code that runs on each individual node in the graph. The system takes care of managing the flow of data within the graph, persisting the state of the computation, and handling any failures that occur, relieving users of that burden.

MillWheel exposes an API for record processing that handles each record in an idempotent fashion, with exactly-once delivery semantics. The system checkpoints progress at fine granularity, removing the need for external senders to buffer pending data.

The authors describe the system using the Zeitgeist product at Google as an example, where it is used to figure out breakout search queries over a given period. The computation buckets records into one-second intervals and compares the actual traffic in each bucket to the expected traffic from a predictive model. If the quantities differ consistently over a reasonable number of buckets, the computation concludes that the query is either spiking or dipping.
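As a rough illustration (in Python, with a hypothetical record shape rather than Google's actual code), the bucketing step might look like:

```python
from collections import defaultdict

def bucket_by_second(records):
    """Group (timestamp, query) pairs into one-second buckets, as the
    Zeitgeist computation does before comparing per-bucket counts to
    a predictive model. Returns {(second, query): count}."""
    buckets = defaultdict(int)
    for ts, query in records:
        buckets[(int(ts), query)] += 1  # truncate to the second
    return dict(buckets)

counts = bucket_by_second([(0.1, "pizza"), (0.9, "pizza"), (1.2, "pizza")])
```

Spike/dip detection then compares each bucket's count against the model's expectation for that query and second.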

The authors list the principles for designing a stream processing system at scale:
  • Data availability: The data should be available soon after it is published
  • Persistency: Persistent state abstractions should be available to user code
  • Out-of-order processing: Out of order data should be handled gracefully
  • Watermarks: to handle late arriving data
  • Latency: Reasonable latency as the system scales
  • Delivery semantics: Exactly once delivery semantics

MillWheel represents inputs and outputs as tuples consisting of a key, value, and a timestamp. The key provides filters for the computation, the value is a byte array that contains the payload, and the timestamp is an arbitrary value assigned by the user, which usually tracks the wall clock time when the event occurred. MillWheel guarantees idempotent processing of the tuples as long as users adhere to the MillWheel API and state persistence mechanisms. When failures occur, MillWheel handles the restarts and retries for the user. To guarantee delivery, MillWheel checkpoints records atomically.
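A minimal sketch of that record triple in Python (the field names are illustrative, not MillWheel's actual types):

```python
import time
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Record:
    """MillWheel-style record: a key used to route and partition the
    computation, an opaque byte-array payload, and a user-assigned
    event timestamp (defaulting here to wall clock time)."""
    key: str
    value: bytes
    timestamp: float = field(default_factory=time.time)

r = Record(key="search:pizza", value=b"\x00\x01")
```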

MillWheel exposes some key concepts for its users. Core to MillWheel is the concept of a computation, which encompasses the application logic containing arbitrary user code. MillWheel delivers records to these computations idempotently, and if the user is contacting external systems, it is up to them to ensure that the effects of their code on such systems are also idempotent.

Next are keys, and key extraction functions. The user specifies a key extraction function, which associates a key with the record being processed. In the Zeitgeist example, the key could be a search query, or a cookie fingerprint to detect spam or bot traffic.

Next are streams, which provide the delivery mechanism between different computations. A computation subscribes to zero or more input streams, and publishes one or more output streams. The streams have associated names, and don't have any restrictions on the number of subscribers and publishers.

Next is the persistent state, which is an opaque byte string managed on a per-key basis. MillWheel persists the state in a highly available and replicated data store such as Bigtable or Spanner. Some examples of the persisted state are counters, window aggregation data, or data needed for a join.

Next are watermarks, which are used to handle late arriving data. MillWheel computes the low watermark for a computation A through the formula:
min(oldest work of A, low watermark of C : C outputs to A)

Computations should expect a small rate of late records beyond the low watermark, and it is up to the application to handle these. Zeitgeist, for example, drops such data (typically around 0.001% of records).
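The watermark formula can be evaluated recursively over the computation graph. A small sketch in Python, with hypothetical computation names, an acyclic graph, and per-computation "oldest pending work" timestamps as inputs:

```python
def low_watermark(computation, oldest_work, graph):
    """min(oldest work of A, low watermark of C : C outputs to A),
    evaluated recursively over an acyclic computation graph.
    `graph` maps each computation to the computations that output
    to it; `oldest_work` maps each computation to the timestamp of
    its oldest unfinished record."""
    upstream = [low_watermark(c, oldest_work, graph)
                for c in graph.get(computation, [])]
    return min([oldest_work[computation]] + upstream)

# Hypothetical pipeline: source -> window -> alert
graph = {"alert": ["window"], "window": ["source"]}
oldest_work = {"source": 105.0, "window": 103.0, "alert": 110.0}
wm = low_watermark("alert", oldest_work, graph)
```

Here "alert" cannot advance past 103.0, because "window" upstream still holds work at that timestamp; this is how lateness propagates down the graph.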

Next are timers, which are per key hooks triggered either at a specific wall clock time, or a watermark value.

To guarantee exactly-once delivery, MillWheel generates a unique ID for every record. Before processing a record, it checks the ID against deduplication data, and discards the record if it is a duplicate. It then runs the user code for the input record and commits the pending changes to the backing store, after which it acknowledges the senders and sends the results to downstream consumers. To make the deduping fast, MillWheel uses a Bloom filter for the matches: if the record ID misses the filter, the record has provably never been seen; on a possible hit, MillWheel reads the backing store to determine whether the record is a duplicate. MillWheel garbage collects the IDs of past deliveries once it can guarantee that all internal senders have finished retrying.
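A toy sketch of this fast-path check (in Python, with an in-memory set standing in for the persistent backing store and a hand-rolled Bloom filter, not MillWheel's actual implementation):

```python
import hashlib

class Deduper:
    """Exactly-once sketch: the Bloom filter answers 'definitely new'
    cheaply; on a possible hit we fall back to the store of committed
    record IDs, since Bloom hits can be false positives."""

    def __init__(self, bits=1024, hashes=3):
        self.bits = bits
        self.hashes = hashes
        self.filter = 0       # bit array packed into an int
        self.store = set()    # stand-in for the persistent backing store

    def _positions(self, record_id):
        for i in range(self.hashes):
            h = hashlib.sha256(f"{i}:{record_id}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.bits

    def seen(self, record_id):
        maybe = all((self.filter >> p) & 1 for p in self._positions(record_id))
        if not maybe:
            return False                  # filter miss: definitely new
        return record_id in self.store    # possible hit: consult the store

    def commit(self, record_id):
        for p in self._positions(record_id):
            self.filter |= 1 << p
        self.store.add(record_id)

d = Deduper()
first = d.seen("rec-1")   # new record: process, then commit
d.commit("rec-1")
second = d.seen("rec-1")  # retry of the same record: discard
```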

The authors evaluate MillWheel on a multitude of dimensions, including output latency, watermark lag, and scale. They share some of the results of testing the system when deployed to a varying number of CPUs. When testing on 200 CPUs, the median record delay is 3.6 milliseconds and the 95th percentile latency is 30 milliseconds. After enabling exactly once delivery and strong productions, the median latency jumps to 33.7 milliseconds and the 95th percentile latency to 93.8 milliseconds. For tests ranging from 20 CPUs to 2000 CPUs, the median latency stayed roughly constant regardless of the system size, and the 99th percentile latency was on the order of magnitude of 100 milliseconds.

The authors list some of the internal consumers at Google for MillWheel, including billing pipelines that depend on its exactly once guarantees, ads systems that rely on the low latency updates for customer facing dashboards, Zeitgeist and anomaly-detection services, and image processing for Google Street View.

The authors end by comparing the system to others such as S4 and Sonora, which don't address exactly-once processing, and Storm, which recently added that support with Trident.