Friday, December 30, 2016

A hole in the wall

I am a big fan of good and delicious food, irrespective of where it is sold. That includes street vendors, and “holes in the wall,” which I have always associated with small nondescript places, with no signs on the venue, no place to sit, and a staff that exudes a slightly higher risk of contracting dysentery, typhoid, or other gastrointestinal diseases. That description might be a bit extreme, but I had some of the best meals in similar places, including the famous Hyderabadi Dum-Biryani in a place not so far from that description.

So where did the phrase a “hole in the wall” come from? On another historical tour of Florence, our tour guide and language enthusiast pointed out some of the palaces where Italian nobility such as the Medici family lived long time ago. Invariably at the entrance there was a slit or a hole in the wall, and the tour guide told us the story that after the nobility hosted lavish dinner parties, instead of throwing the remaining food away, they would give it to the unfortunate lining up in front of the palace through that small hole in the wall of the building. Since the food was delicious, eating at the hole in the wall was sought after during these times, and the tour guide surmised that this is the origin of the phrase. I could not verify that claim, however one site online lists a similar story:

              the hole made in the wall of the debtors' or other prisons, through which the poor prisoners received the money, broken meat, or other donations of the charitably inclined”

Regardless of the origin of the phrase, the story and the imagery were vivid, and they stuck with me.

Thursday, December 29, 2016

A paper a day keeps the doctor away: Efficient Commit Protocols for the Tree of Processes Model of Distributed Transactions

The two-phase commit protocol is widely known in the database community, yet despite its notoriety, finding the paper that first described it proved pretty difficult. The closest find is the paper about commit protocols for distributed transactions, which describes the protocol in detail, and introduces two additional modifications. The paper references the original publications of the protocol by Gray, Lamport, and others, however I could not find these notes  online.

The paper describes the 2PC protocol, both under normal operation, and when failures occur. For normal operation, when the user decides to commit a transaction, the coordinator initiates the first phase of the protocol--the prepare phase, by sending prepare messages to all its subordinates. Each subordinate that can commit the transaction, writes a prepare log record, and sends an ACK back to the coordinator. The subordinates that can't commit the transaction, write an abort log record, and  respond with an Abort message to the coordinator. An abort message vetoes the transaction, and so the subordinates that cannot commit the transaction can just abort it, and forget about it.

After the coordinator receives all the responses from its subordinates, it initiates the second phase of the protocol. If all responses were YES votes, the coordinator moves to the commit phase, where it writes a commit record, and sends a commit message to all the subordinates.  Upon receiving the commit message, the subordinates write a commit log, send an ACK to the coordinator, and commit the transaction.

On the other hand if one subordinate vetoed the transaction, the coordinator moves to the abort phase, writes an abort record, and sends abort messages to all the subordinates that are in the prepared state. Each subordinate writes an abort record, sends an ACK to the coordinator, and aborts the transaction. Once the coordinator receives all the ACK messages from its subordinates, it writes an end record, and forgets about the transaction.

In the protocol, all record writes happen before sending messages, which minimizes communications with the coordinator when recovering from failure.

With all these messages going around it is hard to envision that everything will go on smoothly. The authors then describe the 2PC protocol in the presence of failures due to site and networking issues. The authors assume that as part of the recovery, there is a process that reads the logs on stable storage and accumulates information about the executing transactions at the time of the crash. This information is used to respond to queries from other sites. The authors then present a comprehensive state machine of where the transaction failed during the 2PC protocol, and how to recover from it.  For example, if the transaction was in the prepared state, the recovery process tries to contact the coordinator to see how to proceed with the transaction. When the recovery site responds, the recovery process proceeds with handling the Commit/Abort response according to the 2PC in the absence of failures. If the recovery process finds a transaction without a commit log, it rolls back the transaction. If it finds a transaction in the committing/aborting states--when the node is acting as a coordinator, before the crash--the recovery process periodically tries to send Commit/Abort messages to the subordinates that have not acknowledged yet. Once all ACKs are received, the recovery process ends the transaction, and moves along.

The authors then present modifications of the 2PC commit that optimize the messages sent between the coordinators, and the subordinates. They observe that in the absence of any information in the crashed site about a transaction, the correct response is to abort the transaction. This observation leads to the presumed abort protocol. The protocol takes advantage of knowing that some subordinates might execute complete and partial read-only transactions: ones where there is no UNDO/REDO logs written. For these transactions, we can skip parts of the 2PC protocol. For example, if a subordinate during a prepare statement finds the transaction read-only, it issues a READ vote to the coordinator, and forgets the transaction.  The coordinator then does not include the subordinate in the COMMIT/ABORT phase of the protocol. The coordinator also skips the second phase of the 2PC protocol if it is READ only, and gets READ votes from its subordinates. The authors present other states of the presumed abort protocol, and what messages in the 2PC protocol are skipped.

The authors then examine what would happen if they eliminated the ACKs to the COMMIT messages. The observations lead to the presumed commit modification of the 2PC protocol. In presumed commit, the coordinator behaves similar to the presumed abort protocol, with minor modifications:
  • Before sending a prepare statement, the coordinator collects the names of all subordinates
  • It writes both abort and commit records
  • It requires ACKs for aborts, and not for commits
  • It writes an end record after aborts and not for commits
  • For read only transactions, it does not write any record

The authors end by comparing the log I/O performance, and messages sent of the 2PC protocol, vs Presumed Abort, and Presumed Commit protocols, and describing how to extend the protocols to multi-level tree of processes, where non-leaf nodes act as coordinators as well as subordinates, while leaf nodes act as subordinates, with the root node as a coordinator.

Thursday, December 22, 2016

The wrong end of the stick

One of my favorite activities while traveling is to take a walking tour of the city I am visiting. The tour usually consists of a small group led by a tour guide, who invariably is a student of art or history studying abroad, or an expat humanities graduate who is living abroad and is augmenting their income by giving tours. The tours are always enjoyable, combining stories about the city and its history, architecture, and cultural spots with frequent stops to coffee and dessert shops. Sometimes you get a special tour guide, who in addition to being a history buff, is also a linguistics enthusiast. When that happens, you hear special stories about the historical origin of phrases: something I am very interested in.
In Rome, I had such a tour guide, and the story stuck with me, although I could not verify its accuracy. I could find one website that has a similar reference to the story. It was hilarious and I remembered it to this day. It is the story of the origin of the phrase “the wrong end of the stick.” The tour guide explained that in the old Roman empire, before the advent of toilet paper and private sanitation, people used to go to public toilets to relieve themselves. When they were done, they would wipe themselves using a stick with a sponge at the end, and pass the sticks around after cleaning up. You can imagine how you’d feel if you grabbed the stick by the wrong end.

Wednesday, December 21, 2016

A paper a day keeps the doctor away: BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data

The latest advances in Big Data systems have made storing and computing over large amounts of data more tractable than in the past. Users' expectations for how long a query should take to complete have not on the other hand  changed, and remain independent of the amount of data that needs to be processed. The expectation mismatch of query run time causes user frustration when iteratively exploring large data sets in search of an insight. How can we alleviate that frustration? BlinkDB offers users a way to balance result accuracy with query execution time: the users can either get quantifiably approximate answers very quickly, or they can elect to wait for a longer period of time to get more accurate results. BlinkDB accomplishes this tradeoff through the magic of dynamic sample selection, and an adaptive optimization framework.

The authors start with an illustrative example of computing the average session time for all users in New York. If the table that stores users' sessions is very large, and cannot fit in memory, the query will take a long time to complete since disk reads are expensive. If the same query ran on a smaller sample of data that could fit in memory, it would run orders of magnitude more quickly. The query would produce approximate results that would be accurate enough for many practical purposes. Sampling theory can help with quantifying the accuracy of the results.

BlinkDB is different from other sampling based systems in that it does not make any assumptions on the values of the filtered attributes in the WHERE, GROUP BY, and HAVING clauses in the SQL queries it processes. The only assumptions it makes is that the set of columns that appear in these filters remain stable over time. This allows BlinkDB to process a flexible variety of workloads. To test the validity of the stable column set assumption, the authors analyzed 18K queries from Conviva--a company managing video distribution across the Internet, and 69K queries from Facebook. In both cases, over 90% of the queries were covered by 10% of the columns.

To process the queries, BlinkDB consists of two modules: one for sample creation, and the other for sample selection. The sample creation module creates stratified samples which ensure that rare values of any column are overly represented relative to a uniform random sample. The stratified sampling strategy allows BlinkDB to answer queries about any value regardless of how frequently it is represented in the underlying data. The sample selection module selects which samples to run queries on, to satisfy the query's error and response time constraints.

BlinkDB supports a constrained set of SQL aggregation queries--COUNT, AVG, SUM, and QUANTILE, where operations can be annotated with error bounds. BlinkDB does not support arbitrary joins, or nested queries, however it supports joining a large sampled table with smaller tables, if the smaller tables can fit in memory on a single node in the cluster.

The authors explain in detail how BlinkDB creates stratified samples. Intuitively, the algorithm starts with a sample size $n$, and counts the distinct dimension value combinations $x$ for the query column sets for that sample size $N_n(x)$. After the counts are complete, for each distinct dimension value combination $x$, the algorithm takes uniform samples from $N_n(x)$ rows randomly without replacement forming a sample $S_x$. The full sample space is the union of all the $S_x$ samples. The sample size $n$ directly determines the error of the query results. Because of the relationship between the sample size $n$ and the error rate, the authors claim that the samples are hierarchical, where $S_n \subset S_{n^{max}}$, and so we don't need to compute multiple sample spaces decreasing the storage overhead for samples. The authors claim that the overhead is roughly $2.5\%$ of the original table size.

The authors explain how BlinkDB selects the query column sets to use to create the stratified samples. They pose the problem as an optimization problem, that factors in the sparsity of the data, the workload characteristics, and the sample storage cost. Once the query column sets are selected, BlinkDB creates the stratified samples, and maintains them over time. Creating uniform samples roughly takes a hundred seconds to create, while stratified samples take between 5 and 30 minutes to create, depending on the number of unique values in the query column sets.

The authors implemented BlinkDB on top of Hive/Hadoop and Shark with minimum changes to the underlying query processing system. BlinkDB adds two major components to the Hive framework: offline sampling modules that creates and maintains samples over time, and a runtime sample selection component. The offline sampling module uses techniques such as reservoir sampling and binomial sampling to create the stratified samples across dimensions.

The authors tested the system on a 100-node cluster using TPC-H benchmarks, and real world workload from Facebook, and Conviva.  The clusters used Amazon's EC2 extra-large instances, each with 8 cores, 68GB of RAM, and 800GB disk space. The cluster used 75TB of disk space, and 6TB of distributed RAM cache.

The authors ran performance experiments using the Conviva data--a single  fact table, with about 104 columns, and 5.5 billion rows. The experiments show that queries on 17 TB of data take about 2 seconds to finish with a 90-98% accuracy vs thousands of seconds using systems that don't use stratified sampling.

The authors end by reviewing other systems in the literature such as Spark, Shark, and Dremel, which work well if the data processed fits into the aggregate memory in the cluster, but break down if the data size increases beyond that. The authors also compare BlinkDB to other systems that employ sampling to speed up the computations such as STRAT, SciBORQ, and AQUA.

Tuesday, December 6, 2016

A paper a day keeps the dr away: Dapper a Large-Scale Distributed Systems Tracing Infrastructure

Modern Internet scale applications are a challenge to monitor and diagnose. The applications are usually comprised of complex distributed systems that are built by multiple teams, sometimes using different languages and technologies. When one component fails or misbehaves, it becomes a nightmare to figure out what went wrong and where. Monitoring and tracing systems aim to make that problem a bit more tractable, and Dapper, a system by Google for large scale distributed systems tracing is one such system.

The paper starts by setting the context for Dapper through the use of a real service: "universal search". In universal search, the user types in a query that gets federated to multiple search backends such as web search, image search, local search, video search, news search, as well as advertising systems to display ads. The results are then combined and presented back to the user. Thousands of machines could be involved in returning that result, and any poor performance in one of them can cause end-user latency. For services such as search, the latency is very important, since end-users are very sensitive to it. How can one diagnose such latency problems and pinpoint the offending sub-service?

Enter Dapper, Google's distributed tracing infrastructure. The authors start by listing the system's requirements and design goals: low monitoring overhead, application level transparency, scalability, and low latency availability of the data--roughly within a minute from generation. The author explain that Dapper chooses application level transparency instead of cooperative monitoring where developers write code to instrument their components, because the latter is fragile due to instrumentation omissions and bugs. To achieve transparency, Dapper restricts tracing instrumentation to a small corpus of ubiquitous threading, control flow, and RPC libraries.  Dapper also uses adaptive sampling to scale the system to the vast amount of telemetry generated, and reduce the overhead of collecting data. The authors compare how Dapper differs from other distributed tracing systems such as Pinpoint, Magpie, and X-trace.

The authors then explain how Dapper stitches federated requests together, as in the example of universal search, where a single query fans out to multiple services, that in turn could fan out the query to another tier of sub-services. The authors explain the two approaches commonly used to stitch the causal relationship between requests: black box scheme, which relies on statistical inference to form the sub-request relationships, and annotation based scheme, where each request is annotated to help form these relations. Dapper implements an annotation based scheme, which is made possible because most services at Google communicate uniformly using RPC. The approach is not restrictive though, since one can instrument other protocols such as HTTP, SMTP, etc. to the same effect.

Dapper models the relationship between requests using concepts such as trees, spans, and annotations.
In a trace, the basic unit of work is the span: identified by a name, span id, and a parent id. A single span models an RPC call, and spans are organized into a trace tree through the causal relationship of the spans that fulfill the request. For example every call to an additional infrastructure layer adds another span at a lower depth in the trace tree. A span contains information from each RPC, which usually involves a client-server pair, with the corresponding annotations (client send/receive, server send/receive, and application specific annotations)

Dapper auto-instruments applications to build trace trees, with spans and annotations at the following points:
  • When a thread handles a traced control path
  • Asynchronous calls through Google's callback libraries
  • Communication through Google's RPC libraries

The tracing is language independent, and supports code written in C++ and Java.

The authors present the Dapper architecture, which implements a three stage process:
  • Instrumented binaries write span data to local disk
  • Daemons pull the instrumentation from all production machines to Dapper collectors
  • Collectors write traces to Big Table with trace ids as the row key, and span ids as the column keys

The median latency for the process from when data is written locally to when it is available in Big Table is 15 seconds.

Dapper exposes an API that makes accessing trace data in Big Table easy. For security and privacy concerns Dapper stores only the names of the RPC methods, and not their payload. The annotations API enables application developers to add payload information if needed on an opt-in basis. The authors share some statistics on Dapper's usage within Google, including usage of the annotations API.

The authors evaluate the telemetry overhead for the generation, and collection stages, as well as the effect on production workloads. The creation overhead comes from generating and destroying spans and annotations, and persisting them to disk. The authors share that root spans add roughly 200ns, and that span annotations add negligible overhead (9ns-40ns) on a $2.2 GHz$ machine. The CPU overhead is roughly 0.3% in the worst case scenario, and networking overhead presents $0.01\%$ of the total network traffic. The latency overhead depends on the sampling rate, with full collection adding $16\%$ overhead to request latency, and $1/16$ sampling and below adding negligible overhead. The authors found that in high volume applications, a sampling rate of $1/1024$ contains enough information for diagnostics.

For lower traffic workloads, Dapper employs adaptive sampling that is parametrized by the desired rate of traces per unit time. The traces record the sampling probability used, which helps with analysis later. With sampling, Dapper users generate $1TB/day$, and store the data for 2 weeks.

In addition to the collection infrastructure, the Dapper team built an eco-system of tools that make accessing and analyzing the data a lot easier, including a depot API that provides trace access by ID,  bulk access through MapReduce operations, and indexed access. Dapper also provides a web interface for users to interact with the data.

The authors end with cataloguing Dapper usage within Google, from use during development phase of Ads Review services to help with performance improvements and discovering bottlenecks, to addressing long tail latency, inferring services dependencies, and network usage of various services.