## Friday, December 30, 2016

### A hole in the wall

I am a big fan of good and delicious food, irrespective of where it is sold. That includes street vendors, and “holes in the wall,” which I have always associated with small nondescript places, with no signs on the venue, no place to sit, and a staff that exudes a slightly higher risk of contracting dysentery, typhoid, or other gastrointestinal diseases. That description might be a bit extreme, but I had some of the best meals in similar places, including the famous Hyderabadi Dum-Biryani in a place not so far from that description.

So where did the phrase a “hole in the wall” come from? On another historical tour of Florence, our tour guide and language enthusiast pointed out some of the palaces where Italian nobility such as the Medici family lived long time ago. Invariably at the entrance there was a slit or a hole in the wall, and the tour guide told us the story that after the nobility hosted lavish dinner parties, instead of throwing the remaining food away, they would give it to the unfortunate lining up in front of the palace through that small hole in the wall of the building. Since the food was delicious, eating at the hole in the wall was sought after during these times, and the tour guide surmised that this is the origin of the phrase. I could not verify that claim, however one site online lists a similar story:

the hole made in the wall of the debtors' or other prisons, through which the poor prisoners received the money, broken meat, or other donations of the charitably inclined”

Regardless of the origin of the phrase, the story and the imagery were vivid, and they stuck with me.

## Thursday, December 29, 2016

### A paper a day keeps the doctor away: Efficient Commit Protocols for the Tree of Processes Model of Distributed Transactions

The two-phase commit protocol is widely known in the database community, yet despite its notoriety, finding the paper that first described it proved pretty difficult. The closest find is the paper about commit protocols for distributed transactions, which describes the protocol in detail, and introduces two additional modifications. The paper references the original publications of the protocol by Gray, Lamport, and others, however I could not find these notes  online.

The paper describes the 2PC protocol, both under normal operation, and when failures occur. For normal operation, when the user decides to commit a transaction, the coordinator initiates the first phase of the protocol--the prepare phase, by sending prepare messages to all its subordinates. Each subordinate that can commit the transaction, writes a prepare log record, and sends an ACK back to the coordinator. The subordinates that can't commit the transaction, write an abort log record, and  respond with an Abort message to the coordinator. An abort message vetoes the transaction, and so the subordinates that cannot commit the transaction can just abort it, and forget about it.

After the coordinator receives all the responses from its subordinates, it initiates the second phase of the protocol. If all responses were YES votes, the coordinator moves to the commit phase, where it writes a commit record, and sends a commit message to all the subordinates.  Upon receiving the commit message, the subordinates write a commit log, send an ACK to the coordinator, and commit the transaction.

On the other hand if one subordinate vetoed the transaction, the coordinator moves to the abort phase, writes an abort record, and sends abort messages to all the subordinates that are in the prepared state. Each subordinate writes an abort record, sends an ACK to the coordinator, and aborts the transaction. Once the coordinator receives all the ACK messages from its subordinates, it writes an end record, and forgets about the transaction.

In the protocol, all record writes happen before sending messages, which minimizes communications with the coordinator when recovering from failure.

With all these messages going around it is hard to envision that everything will go on smoothly. The authors then describe the 2PC protocol in the presence of failures due to site and networking issues. The authors assume that as part of the recovery, there is a process that reads the logs on stable storage and accumulates information about the executing transactions at the time of the crash. This information is used to respond to queries from other sites. The authors then present a comprehensive state machine of where the transaction failed during the 2PC protocol, and how to recover from it.  For example, if the transaction was in the prepared state, the recovery process tries to contact the coordinator to see how to proceed with the transaction. When the recovery site responds, the recovery process proceeds with handling the Commit/Abort response according to the 2PC in the absence of failures. If the recovery process finds a transaction without a commit log, it rolls back the transaction. If it finds a transaction in the committing/aborting states--when the node is acting as a coordinator, before the crash--the recovery process periodically tries to send Commit/Abort messages to the subordinates that have not acknowledged yet. Once all ACKs are received, the recovery process ends the transaction, and moves along.

The authors then present modifications of the 2PC commit that optimize the messages sent between the coordinators, and the subordinates. They observe that in the absence of any information in the crashed site about a transaction, the correct response is to abort the transaction. This observation leads to the presumed abort protocol. The protocol takes advantage of knowing that some subordinates might execute complete and partial read-only transactions: ones where there is no UNDO/REDO logs written. For these transactions, we can skip parts of the 2PC protocol. For example, if a subordinate during a prepare statement finds the transaction read-only, it issues a READ vote to the coordinator, and forgets the transaction.  The coordinator then does not include the subordinate in the COMMIT/ABORT phase of the protocol. The coordinator also skips the second phase of the 2PC protocol if it is READ only, and gets READ votes from its subordinates. The authors present other states of the presumed abort protocol, and what messages in the 2PC protocol are skipped.

The authors then examine what would happen if they eliminated the ACKs to the COMMIT messages. The observations lead to the presumed commit modification of the 2PC protocol. In presumed commit, the coordinator behaves similar to the presumed abort protocol, with minor modifications:
• Before sending a prepare statement, the coordinator collects the names of all subordinates
• It writes both abort and commit records
• It requires ACKs for aborts, and not for commits
• It writes an end record after aborts and not for commits
• For read only transactions, it does not write any record

The authors end by comparing the log I/O performance, and messages sent of the 2PC protocol, vs Presumed Abort, and Presumed Commit protocols, and describing how to extend the protocols to multi-level tree of processes, where non-leaf nodes act as coordinators as well as subordinates, while leaf nodes act as subordinates, with the root node as a coordinator.

## Thursday, December 22, 2016

### The wrong end of the stick

One of my favorite activities while traveling is to take a walking tour of the city I am visiting. The tour usually consists of a small group led by a tour guide, who invariably is a student of art or history studying abroad, or an expat humanities graduate who is living abroad and is augmenting their income by giving tours. The tours are always enjoyable, combining stories about the city and its history, architecture, and cultural spots with frequent stops to coffee and dessert shops. Sometimes you get a special tour guide, who in addition to being a history buff, is also a linguistics enthusiast. When that happens, you hear special stories about the historical origin of phrases: something I am very interested in.
In Rome, I had such a tour guide, and the story stuck with me, although I could not verify its accuracy. I could find one website that has a similar reference to the story. It was hilarious and I remembered it to this day. It is the story of the origin of the phrase “the wrong end of the stick.” The tour guide explained that in the old Roman empire, before the advent of toilet paper and private sanitation, people used to go to public toilets to relieve themselves. When they were done, they would wipe themselves using a stick with a sponge at the end, and pass the sticks around after cleaning up. You can imagine how you’d feel if you grabbed the stick by the wrong end.

## Wednesday, December 21, 2016

### A paper a day keeps the doctor away: BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data

The latest advances in Big Data systems have made storing and computing over large amounts of data more tractable than in the past. Users' expectations for how long a query should take to complete have not on the other hand  changed, and remain independent of the amount of data that needs to be processed. The expectation mismatch of query run time causes user frustration when iteratively exploring large data sets in search of an insight. How can we alleviate that frustration? BlinkDB offers users a way to balance result accuracy with query execution time: the users can either get quantifiably approximate answers very quickly, or they can elect to wait for a longer period of time to get more accurate results. BlinkDB accomplishes this tradeoff through the magic of dynamic sample selection, and an adaptive optimization framework.

The authors start with an illustrative example of computing the average session time for all users in New York. If the table that stores users' sessions is very large, and cannot fit in memory, the query will take a long time to complete since disk reads are expensive. If the same query ran on a smaller sample of data that could fit in memory, it would run orders of magnitude more quickly. The query would produce approximate results that would be accurate enough for many practical purposes. Sampling theory can help with quantifying the accuracy of the results.

BlinkDB is different from other sampling based systems in that it does not make any assumptions on the values of the filtered attributes in the WHERE, GROUP BY, and HAVING clauses in the SQL queries it processes. The only assumptions it makes is that the set of columns that appear in these filters remain stable over time. This allows BlinkDB to process a flexible variety of workloads. To test the validity of the stable column set assumption, the authors analyzed 18K queries from Conviva--a company managing video distribution across the Internet, and 69K queries from Facebook. In both cases, over 90% of the queries were covered by 10% of the columns.

To process the queries, BlinkDB consists of two modules: one for sample creation, and the other for sample selection. The sample creation module creates stratified samples which ensure that rare values of any column are overly represented relative to a uniform random sample. The stratified sampling strategy allows BlinkDB to answer queries about any value regardless of how frequently it is represented in the underlying data. The sample selection module selects which samples to run queries on, to satisfy the query's error and response time constraints.

BlinkDB supports a constrained set of SQL aggregation queries--COUNT, AVG, SUM, and QUANTILE, where operations can be annotated with error bounds. BlinkDB does not support arbitrary joins, or nested queries, however it supports joining a large sampled table with smaller tables, if the smaller tables can fit in memory on a single node in the cluster.

The authors explain in detail how BlinkDB creates stratified samples. Intuitively, the algorithm starts with a sample size $n$, and counts the distinct dimension value combinations $x$ for the query column sets for that sample size $N_n(x)$. After the counts are complete, for each distinct dimension value combination $x$, the algorithm takes uniform samples from $N_n(x)$ rows randomly without replacement forming a sample $S_x$. The full sample space is the union of all the $S_x$ samples. The sample size $n$ directly determines the error of the query results. Because of the relationship between the sample size $n$ and the error rate, the authors claim that the samples are hierarchical, where $S_n \subset S_{n^{max}}$, and so we don't need to compute multiple sample spaces decreasing the storage overhead for samples. The authors claim that the overhead is roughly $2.5\%$ of the original table size.

The authors explain how BlinkDB selects the query column sets to use to create the stratified samples. They pose the problem as an optimization problem, that factors in the sparsity of the data, the workload characteristics, and the sample storage cost. Once the query column sets are selected, BlinkDB creates the stratified samples, and maintains them over time. Creating uniform samples roughly takes a hundred seconds to create, while stratified samples take between 5 and 30 minutes to create, depending on the number of unique values in the query column sets.

The authors implemented BlinkDB on top of Hive/Hadoop and Shark with minimum changes to the underlying query processing system. BlinkDB adds two major components to the Hive framework: offline sampling modules that creates and maintains samples over time, and a runtime sample selection component. The offline sampling module uses techniques such as reservoir sampling and binomial sampling to create the stratified samples across dimensions.

The authors tested the system on a 100-node cluster using TPC-H benchmarks, and real world workload from Facebook, and Conviva.  The clusters used Amazon's EC2 extra-large instances, each with 8 cores, 68GB of RAM, and 800GB disk space. The cluster used 75TB of disk space, and 6TB of distributed RAM cache.

The authors ran performance experiments using the Conviva data--a single  fact table, with about 104 columns, and 5.5 billion rows. The experiments show that queries on 17 TB of data take about 2 seconds to finish with a 90-98% accuracy vs thousands of seconds using systems that don't use stratified sampling.

The authors end by reviewing other systems in the literature such as Spark, Shark, and Dremel, which work well if the data processed fits into the aggregate memory in the cluster, but break down if the data size increases beyond that. The authors also compare BlinkDB to other systems that employ sampling to speed up the computations such as STRAT, SciBORQ, and AQUA.

## Tuesday, December 6, 2016

### A paper a day keeps the dr away: Dapper a Large-Scale Distributed Systems Tracing Infrastructure

Modern Internet scale applications are a challenge to monitor and diagnose. The applications are usually comprised of complex distributed systems that are built by multiple teams, sometimes using different languages and technologies. When one component fails or misbehaves, it becomes a nightmare to figure out what went wrong and where. Monitoring and tracing systems aim to make that problem a bit more tractable, and Dapper, a system by Google for large scale distributed systems tracing is one such system.

The paper starts by setting the context for Dapper through the use of a real service: "universal search". In universal search, the user types in a query that gets federated to multiple search backends such as web search, image search, local search, video search, news search, as well as advertising systems to display ads. The results are then combined and presented back to the user. Thousands of machines could be involved in returning that result, and any poor performance in one of them can cause end-user latency. For services such as search, the latency is very important, since end-users are very sensitive to it. How can one diagnose such latency problems and pinpoint the offending sub-service?

Enter Dapper, Google's distributed tracing infrastructure. The authors start by listing the system's requirements and design goals: low monitoring overhead, application level transparency, scalability, and low latency availability of the data--roughly within a minute from generation. The author explain that Dapper chooses application level transparency instead of cooperative monitoring where developers write code to instrument their components, because the latter is fragile due to instrumentation omissions and bugs. To achieve transparency, Dapper restricts tracing instrumentation to a small corpus of ubiquitous threading, control flow, and RPC libraries.  Dapper also uses adaptive sampling to scale the system to the vast amount of telemetry generated, and reduce the overhead of collecting data. The authors compare how Dapper differs from other distributed tracing systems such as Pinpoint, Magpie, and X-trace.

The authors then explain how Dapper stitches federated requests together, as in the example of universal search, where a single query fans out to multiple services, that in turn could fan out the query to another tier of sub-services. The authors explain the two approaches commonly used to stitch the causal relationship between requests: black box scheme, which relies on statistical inference to form the sub-request relationships, and annotation based scheme, where each request is annotated to help form these relations. Dapper implements an annotation based scheme, which is made possible because most services at Google communicate uniformly using RPC. The approach is not restrictive though, since one can instrument other protocols such as HTTP, SMTP, etc. to the same effect.

Dapper models the relationship between requests using concepts such as trees, spans, and annotations.
In a trace, the basic unit of work is the span: identified by a name, span id, and a parent id. A single span models an RPC call, and spans are organized into a trace tree through the causal relationship of the spans that fulfill the request. For example every call to an additional infrastructure layer adds another span at a lower depth in the trace tree. A span contains information from each RPC, which usually involves a client-server pair, with the corresponding annotations (client send/receive, server send/receive, and application specific annotations)

Dapper auto-instruments applications to build trace trees, with spans and annotations at the following points:
• When a thread handles a traced control path
• Asynchronous calls through Google's callback libraries
• Communication through Google's RPC libraries

The tracing is language independent, and supports code written in C++ and Java.

The authors present the Dapper architecture, which implements a three stage process:
• Instrumented binaries write span data to local disk
• Daemons pull the instrumentation from all production machines to Dapper collectors
• Collectors write traces to Big Table with trace ids as the row key, and span ids as the column keys

The median latency for the process from when data is written locally to when it is available in Big Table is 15 seconds.

Dapper exposes an API that makes accessing trace data in Big Table easy. For security and privacy concerns Dapper stores only the names of the RPC methods, and not their payload. The annotations API enables application developers to add payload information if needed on an opt-in basis. The authors share some statistics on Dapper's usage within Google, including usage of the annotations API.

The authors evaluate the telemetry overhead for the generation, and collection stages, as well as the effect on production workloads. The creation overhead comes from generating and destroying spans and annotations, and persisting them to disk. The authors share that root spans add roughly 200ns, and that span annotations add negligible overhead (9ns-40ns) on a $2.2 GHz$ machine. The CPU overhead is roughly 0.3% in the worst case scenario, and networking overhead presents $0.01\%$ of the total network traffic. The latency overhead depends on the sampling rate, with full collection adding $16\%$ overhead to request latency, and $1/16$ sampling and below adding negligible overhead. The authors found that in high volume applications, a sampling rate of $1/1024$ contains enough information for diagnostics.

For lower traffic workloads, Dapper employs adaptive sampling that is parametrized by the desired rate of traces per unit time. The traces record the sampling probability used, which helps with analysis later. With sampling, Dapper users generate $1TB/day$, and store the data for 2 weeks.

In addition to the collection infrastructure, the Dapper team built an eco-system of tools that make accessing and analyzing the data a lot easier, including a depot API that provides trace access by ID,  bulk access through MapReduce operations, and indexed access. Dapper also provides a web interface for users to interact with the data.

The authors end with cataloguing Dapper usage within Google, from use during development phase of Ads Review services to help with performance improvements and discovering bottlenecks, to addressing long tail latency, inferring services dependencies, and network usage of various services.

## Tuesday, September 27, 2016

### A paper a day keeps the doctor away: Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services

Sixteen year ago, Eric Brewer introduced what is now known as the CAP theorem, which states that for a web service it is impossible to guarantee consistency, availability, and partition tolerance.  The conjecture was based on Brewer's experiences at Inktomi--a search engine company he cofounded, and was published without proof.  Gilbert and Lynch presented one in their paper: "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services."

The paper is a good theoretical read, and the proofs the authors present are very tractable. They first begin by  formalizing the concepts of consistency (the authors use atomic in the paper), availability, and partition tolerance. For a consistent service, there is a total order on all operations such that each operation looks as if it were completed at a single instant. For availability, every request received by a non-failing node in the system must result in a response. Finally for partition tolerance the network is  allowed to lose arbitrarily many messages sent from one node to another.

The authors use these definitions to present their first impossibility result:

"It is impossible in the asynchronous network model to implement a read/write data object that guarantees the following properties:
• Availability
• Atomic consistency
in all fair executions (including those in which messages are lost). "

They prove the assertion by contradiction. The proof uses  two nodes/partitions in the system $A$ and $B$, where all the messages between $A$ and $B$ are lost. The proof assumes two operations $\alpha$ and $\beta$ that execute separately on $A$ and $B$, and are ordered such that $\beta$ occurs after $\alpha$ has ended.  $\alpha$ executes a write on partition $A$, $\beta$ executes a read from partition $B$ with all messages between $A$ and $B$ lost. Each operation on its own returns consistent results, while combined together as a new operation $\alpha+\beta$, return inconsistent data, proving the theorem.

The authors extend the result through a similar method of argument to all types of executions, since nodes $A$ and $B$ can't tell if the messages between them are lost in an asynchronous network (without the concept of clocks or time). The authors provide some example systems for asynchronous networks that provide two of the three guarantees (C,A, and P).

For partially synchronous systems, where every node has a clock, and all clocks increase at the same rate, but are not synchronized, the authors present another impossibility result:

"It is impossible in the partially synchronous network model to implement a read/write data object that guarantees the following properties:
• Availability
• Atomic consistency
in all executions (even those in which messages are lost)"

The proof is similar to the original impossibility result, with execution $\beta$ sufficiently delayed for the messages not to reach partition $B$.

The authors close by providing a weaker consistency condition that allows stale data to be returned when there are partitions through the use of a centralized node, and the formal requirements it places on the quality of the stale data returned .

## Monday, September 19, 2016

### A paper a day keeps the doctor away: Medians and Beyond: New Aggregation Techniques for Sensor Networks

The Internet of Things has spurred renewed interest in sensors and sensor telemetry.  For each IoT application, modern sensors--ones that support sensing, computation, and communication, collect data about their environment, and relay it back to a "base station". The base station aggregates data from all the sensors to make sense of the telemetry.

Since most of the sensors are deployed in settings that have severe constrains on power consumption and battery life,  it is usually expensive to send all the raw telemetry data to the "base station" without aggregation or sampling at each sensor. For certain class of metrics data, such as counts, sums, and averages, the base station can combine the aggregates from each sensor, and come up with meaningful statistics overall. However for sensor aggregates such as median, mode, and percentiles, the job of the base station becomes harder. The authors of the paper "Medians and Beyond: New Aggregation Techniques for Sensor Networks" present an algorithm called "Q-digest" that helps the base station calculate statistics such as medians, modes, histograms, quantiles, and range queries from aggregated sensor statistics.

The authors start by highlighting the need for sensors to send aggregated data by citing that the cost of transmitting one bit over the radio is more expensive than running one instruction on the sensor.  They also highlight that from a user's perspective, in most cases, individual sensor values are not important, and that users are fine with aggregate functions such as min/max, count, sum, average, and percentiles. They review projects such as TinyDB and Cougar, and expand on that work to enable computing percentiles through the Q-digest algorithm.

The authors first describe Q-digest using one sensor, and then explain how to combine Q-digests from multiple sensors to answer percentile and other queries. For one sensor, the authors assume that the sensor captures integers with the range $[1, \sigma]$.

The authors define a binary partition tree over the range, with $\sigma$ leaf nodes representing the values in the range, and the frequency of these values in the sensor data stream. The binary tree has a  height   $\log \sigma$, and each node in the tree represents a bucket that has a range defining the position and the width of the bucket. For example, the root node of the tree describes the full range $[1,\sigma]$, while its two children describe the ranges $[1,\sigma/2]$, and $[\sigma/2+1,\sigma]$ respectively, and the leaf nodes are the raw data in the stream. The Q-digest is a special subset of that conceptual tree that encodes the distribution of sensor values in a compressed format, and satisfies the following properties:

Given a compression parameter $k$, and a data stream of size $n$, a tree node $v$ is in the digest if:
$count(v) < [n/k]$ and $count(v) + count(v_p) + count (v_s) > [n/k]$ where $v_p$ is the parent of $v$, and $v_s$ are the siblings of $v$. The root and leaf nodes are exempt from satisfying both properties.

Intuitively the first property states that no node except leaf nodes should have a high count, and the second states that no node and its siblings should have low counts. The two properties help compress the tree into the Q-digest, and help with error bounds.

The authors walk through an example of building and compressing the Q-digest, and provide proofs of bounds on the size of the digest and maximum count error per node.

The authors then show how we can use the Q-digest to answer different types of queries. For example, to find the $q$th quantile, we sort the nodes of the Q-digest in increasing bucket right endpoint values. The sorted list gives a post-order traversal of the nodes of the Q-digest, that when scanned from the left and adding the counts of value frequencies seen so far, will at some point exceed $qn$. The node at which this happens is a good estimate of the $q$th quantile. The authors show that the Q-digest can also be used to calculate inverse quantile queries, range queries, and consensus queries.

The authors close by sharing the experimental evaluation of accuracy, and message size, power consumption of the algorithm, and discuss how to evolve the work to deal with dropped values in the sensor network.

## Monday, March 21, 2016

### A paper a day keeps the doctor away: NoDB

In most database systems, the user defines the shape of the data that is stored and queried using concepts such as entities and relations. The database system takes care of translating that shape into physical storage, and managing its lifecycle. Most of the systems store data in the form of tuples, either in row format, or broken down into columns and stored in columnar format. The system also stores metadata associated with the data, that helps with speedy retrieval and processing. Defining the shape of the data a priori, and transforming it from the raw or ingestion format to the storage format is a cost that database systems incur to make queries faster. What if we can have fast queries without incurring that initial cost? In the paper "NoDB: Efficient Query Execution on Raw Data Files", the authors examine that question, and advocate a system (NoDB) that answers it.

The authors start with the motivation for such a system. With the recent explosion of data, and our insatiable appetite for quick insights, loading and transforming the data a priori is not a welcome overhead. This is especially the case if the user wants to perform a series of ad-hoc explorations on vast amounts of data, and is interested in decreasing the data-to-query time. The authors propose that one way to accomplish that is to eliminate the loading phase, and advocate that querying over the raw data in-situ is a viable option.

Instead of starting from scratch, the authors modify a traditional relational database (PostgreSQL) into a NoDB system, and discuss how they improved query performance using techniques such as selective parsing, adaptive indexing, and cashing techniques.

The authors start with two straightforward approaches to query raw data files. In the first approach, they propose to run the loading procedure whenever a table is referenced, and then evaluate the query on the loaded data. The loaded data may be discarded after the query executes, or may be persisted on disk.

In the second approach the authors discuss tightly integrated raw file access within the query execution logic, where the leaf operators of a query plan are enriched with the ability to access raw data files including parsing and tokenization. The data parsing and processing occurs in a pipelined fashion, where the raw file is read in chunks and parsed and passed immediately to the rest of the query plan operators.

Both approaches require the schema to be known and declared a priori, and the tables to be defined as in-situ tables; features that are offered by modern database systems such as MySQL. The downside of both approaches is that the data is not kept in persistent tables, and so for repeated queries against the tables, every invocation needs to perform the loading and parsing from scratch. Moreover both approaches can't make use of indexes and cost optimizations for the raw data file.

The authors then examine the NoDB solution, which aims to provide in-situ query processing performance competitive with traditional database systems. The authors discuss how they created data structures that minimize the cost of raw data access for in-situ querying, and selectively eliminate the need for raw data access through careful caching and scheduling. The authors use processing over a CSV file as a vehicle to explain their ideas.

When a query is submitted to the NoDB solution, and references relational tables that are not yet loaded, the system overrides the scan operator with the ability to access raw data files directly, with the remaining query plan generated by the optimizer remaining unchanged. The system speeds up the raw access through a variety of techniques. First, it performs selective tokenization of the raw files, where it aborts tokenizing tuples as soon as it finds the required attributes for the query. Second it forms tuples selectively, where only the ones that contain the attributes for a query are fully composed. Third it keeps an adaptive positional map that describes the raw file shape discovered so far. The system uses the positional map to reduce parsing and tokenization costs by using it to navigate and retrieve raw data faster.

Initially the positional map is empty. As the system executes queries it continuously augments the map by populating it while tokenizing the raw file for the current query. For subsequent queries, the system uses the map to jump to the exact position in the file for the tuple it is looking for, or as close to it as possible.

NoDB also uses a cache that temporarily holds previous accessed data, and accesses the cache instead of reprocessing the raw data file when another query runs.

Another technique the authors used is to calculate table statistics using sampled data instead of the full data set. The authors modified the  PostgreSQL scan operator to create statistics on the fly, and the system would invoke the native PostgreSQL statistics routines with a sample of the data, and store and use them similar to the way the conventional DB does.

The authors share the result of the experiments they've conducted on the modified system, and compare it traditional DBMS performance for similar queries. The authors implemented NoDB on top of PostgreSQL 9.0, and ran their experiments on a Sun X4140 server with 2x Quad-Core AMD Opteron processor (64 bit), 2.7 GHz, 512 KB L1 cache, 2 MB L2 cache and 6 MB L3 cache, 32 GB RAM, 4 x 250GB 10000 RPM SATA disks (RAID-0), using Ubuntu 9.04. The experiments used a raw data file of 11GB containing 7.5 million tuples, each containing 150 attributes with random integers.

The authors investigate the effect of the positional map with varying storage capacity. The authors show that the positional map improved response times by a factor of 2. The authors also observed linear scalability as the file size was increased gradually from 2GB to 92GB.

The other experiments show that the auxiliary structures (caching, maps) reduce the time to access raw data files and amortize the overhead across subsequent queries.

The authors close with the challenges with their approach including data type conversion, complex data schemas, and integration with external tools, and the opportunities for NoDB including flexible storage, adaptive indexing, and file system interfaces.

## Monday, March 14, 2016

### A paper a day keeps the doctor away: MillWheel: Fault-Tolerant Stream Processing at Internet Scale

The recent data explosion, and the increase in appetite for fast results spurred a lot of interest in low-latency data processing systems. One such system is MillWheel, presented in the paper "MillWheel: Fault-Tolerant Stream Processing at Internet Scale", which is widely used at Google.

In MillWheel, the users specify a directed computation graph that describe what they would like to do, and write application code that runs on each individual node in the graph. The system takes care of managing the flow of data within the graph, persisting the state of the computation, and handling any failures that occur, relieving the users from that burden.

MillWheel exposes an API for record processing, that handles each record in an idempotent fashion, with an exactly once delivery semantics. The system checkpoints progress with a fine granularity, removing the need to buffer data between external senders.

The authors describe the system using the Zeitgeist product at Google as an example, where it is used to figure out the breakout search queries for a given period. The computation involves bucketing records in one second intervals, and comparing the expected traffic to a predictive model. If the quantities differ consistently over a reasonable amount of buckets, the computation concludes that the query is either spiking or dipping.

The authors list the principles for designing a stream processing at scale:
• Data availability: The data should be available soon after it is published
• Persistency: Persistent state abstractions should be available to user code
• Out-of-order processing: Out of order data should be handled gracefully
• Watermarks: to handle late arriving data
• Latency: Reasonable latency as the system scales
• Delivery semantics: Exactly once delivery semantics

MillWheel represents inputs and outputs as tuples consisting of a key, value, and a timestamp. The key provides filters for the computation, the value is a byte array that contains the payload, and the timestamp is an arbitrary value assigned by the user, but usually tracks the wall clock time when the event has occurred. MillWheel guarantees an idempotent processing of the tuples as long as the users adhere to the MillWheel API and state persistence mechanisms. When failures occur, MillWheel handles the restarts and retries for the user. To guarantee delivery, MillWheel checkpoints records atomically.

MillWheel exposes some key concepts for its users. Core to MillWheel is the concept of a computation, which encompasses the application logic containing arbitrary user code. MillWheel delivers records to these computations idempotently, and if the user is contacting external systems, it is up to them to ensure that the effects of their code on such systems is also idempotent.

Next are keys, and key extraction functions. The user specifies a key extraction function, which associates a key with the record being processed. In the Zeitgeist example, the key could be a search query, or a cookie fingerprint to detect spam or bot traffic.

Next are streams, which provide the delivery mechanism between different computations. The computation subscribes to zero or more input streams, and publishes one or more output streams. The streams have associated names, and don't have any restrictions on the numbers of subscribers and publishers.

Next is the persistent state, which is an opaque byte string managed on a per-key basis. MillWheel persists the state in a highly available and replicated data store such as BigTable or spanner. Some examples of the persisted state are counters, window aggregation data, or data needed for a join.

Next are the water marks which are used to handle late arriving data. MillWheel computes the watermark for a computation through the formula:
min(oldest work of A, low watermark of C : C outputs to A)

Computations should expect a small rate of late records beyond the low watermark, and it is up to the system to handle these. For example Zeitgeist drops such data (typically around 0.001% of records).

Next are timers, which are per key hooks triggered either at a specific wall clock time, or a watermark value.

To guarantee exactly once delivery, MillWheel generates a unique ID for every record. Before processing the record, it checks against deduplication data, and discards the record if it is a duplicate. Then it runs the user code for the input record, and commits the pending changes to the backing store. It then acknowledges the senders and sends the results to the downstream consumers. To make the deduping fast, MillWheel uses a bloom filter for the matches. If the record ID is not in the filter, MillWheel reads the backing store to determine if the record is a duplicate or not. MillWheel garbage collects the IDs for past deliveries after it can guarantee that all internal senders have finished retrying.

The authors evaluate MillWheel on a multitude of dimensions, including output latency, watermark lag, and scale. They share some of the results of testing the system when deployed to a varying number of CPUs. When testing on 200 CPUs, the median record delay is 3.6 milliseconds and the 95th percentile latency is 30 milliseconds. After enabling exactly once delivery and strong productions, the median latency jumps to 33.7 milliseconds and the 95th percentile latency to 93.8 milliseconds. For tests ranging from 20 CPUs to 2000 CPUs, the median latency stayed roughly constant regardless of the system size, and the 99th percentile latency was on the order of magnitude of 100 milliseconds.

The authors list some of the internal consumers at Google for MillWheel, including billing pipelines that depend on its exactly once guarantees, ads systems that rely on the low latency updates for customer facing dashboards, Zeitgeist and anomaly-detection services, and image processing for Google Street View.

The authors end with comparing the system to others such as S4 and Sonora--which don't address exactly once processing, and Storm which recently added that support with Trident.

## Tuesday, February 23, 2016

### A paper a day keeps the doctor away: Gorilla: A Fast, Scalable, In-Memory Time Series Database

Operating large scale Internet services today is a challenge, and making sure that the services run well with minimal customer disruptions is doubly so. The reason is that both require good visibility into how the individual service components are performing, which necessitates gathering and analyzing a lot of measurements about the performance.   The measurements vary from metrics annotated with labels or dimensions that can be used to filter and group the results at query time, to exception stacks, log lines, and trace events. Collecting and analyzing such a large amount of metrics is the realm of time series databases, and the paper: "Gorilla, a fast, scalable, in-memory time series database" presents such a system which is in use at Facebook to handle monitoring and alerting their vast infrastructure.

In the paper the authors start by articulating the design principles for Gorilla: they wanted a system that is always available for writes; they tolerated the loss of a small amount of data; and they wanted a geo-replicated system that is partition tolerant to network cuts, software updates, and configuration changes.

From their prior experience with monitoring systems at Facebook, the authors made a couple of insightful observations. First, they noticed that users of monitoring systems are mostly interested in recent data points rather than historical data, with almost 85% of the diagnostics queries being for data collected in the last 26 hours. Second they observed that users are interested in aggregate data rather than individual data points for diagnostics scenarios. Third the authors noticed a big disparity between write rates that exceeded millions per second, and read rates that are orders of magnitude lower, and mostly from dashboards and alerting systems.

These observations steered the authors toward an in-memory system that ingests and computes over the last 26 hours of data,  backed by an HBase solution for the historical data. At Facebook's scale this presented a challenge, and the authors shared some of the perf requirements in the paper:

• 700 million data points (time stamp and value) added per minute
• Data stored for 26 hours
• More than 40,000 queries per second at peak
• Reads succeed in under one millisecond.
• Support time series with 15 second granularity (4 points per minute per time series)
• Two in-memory, not co-located replicas (for disaster recovery capacity)
• Always serve reads even when a single server crashes
• Ability to quickly scan over all in memory data
• Support at least 2x growth per year.

The authors came up with brilliant compression schemes that allowed them to compress the time series data to roughly 1.37 bytes per point, or a 12x reduction in size, which allowed them to store 26 hours' worth of data in 20 hosts with 1.3TB of memory.

Each time series consists of monitoring data, modeled as a tuple: a string that serves as a key to the time series, a 64 bit timestamp, and a double precision float value. The time series are sharded by key, and each key goes to a Gorilla host. Every write goes to two different geo locations, and on host failure, all reads failover to the different region.

The authors compress the time series data using delta encoding using two ways. First is compressing the timestamp for every monitoring data tuple. The series starts with an uncompressed 64 bit timestamp. Further timestamps are compressed using delta-of-deltas. For example, if the deltas are 60, 60, 50, and 61 the delta of deltas are  0, -1, and 2. The delta of deltas are further encoded using variable length encoding depending on the range the delta falls into. For -63 to 64 the prefix is 10 followed by 7bits, for -255 to 256 the prefix is 110 followed by 9bits, and for -2047 to 2048 the prefix is 1110 followed by 12bits, otherwise the prefix is 1111 followed by 32 bits.

For monitored values, the floating points are compressed by xor'ing with the prior value, and encoding the resulting integer with a header '11' in two bits, number of leading zeros, how many meaningful bits exist, and the value of the meaningful bits. Again, the first monitored value is stored without compression. The subsequent values are XOR'ed with the previous values. If the result is 0, a single 0 bit is stored. Otherwise the authors calculate the number of leading and trailing zeros, and store a prefix 1 followed by:
0 if the block of meaningful bits falls within the block of previous meaningful bits, and store the meaningful block
Otherwise store 1, the length of leading 0s, the length of meaningful bits in the XORed value, and the meaningful bits

For Facebook's monitored data, the authors found that roughly 51% of the values are encoded with 1 bit,  about 30% compressed with '10', and 19% compressed with '11'.

The authors use in-memory data structures to store time series data with a lookup map from time series name to the data for quick access. When time series are deleted, they are tomb stoned.

The in-memory data structures are backed by disk, and the data is stored in GlusterFS with 3x replication. Each host owns multiple shards of data stored in a single directory per shard. The directory contains key lists, append only logs, complete block files, and checkpoints. The shard manager uses Paxos to assign shards to nodes.

Data is streamed between Gorilla instances without guaranteeing consistency between the geo locations. If one geo fails,  the query traffic is sent to the other geo center, and is only restored after the region has been healthy for 26 hours.

The authors then describe the tools that use Gorilla for monitoring and alerting. There is a low latency query processing tool, a correlation engine that compares the time series to millions of others to figure out what happened around the time a service broke, a charting service to visually find where the outliers are, and an aggregation service.

The authors end with their experiences operating Gorilla over the prior 6 months including fault tolerance, and help with diagnostic scenarios, and close with the lessons they learned, namely:
• Prioritize recent data over historical data