Next generation tools for data science

By DAVID ADAMS

Since inception, this blog has defined “data science” as inference derived from data too big to fit on a single computer. Thus the ability to manipulate big data is essential to our notion of data science. While MapReduce remains a fundamental tool, many interesting analyses require more than it can offer. For instance, the well-known Mantel-Haenszel estimator cannot be implemented in a single MapReduce. Apache Spark and Google Cloud Dataflow represent two alternatives as “next generation” data processing frameworks. This post compares the two, relying on the author’s first-hand experience and subsequent background research.

Introduction


MapReduce's massive uptake is evidence that it was the solution for writing data processing pipelines that scale to hundreds of terabytes (or more). This was true within Google as well as outside of Google in the form of Hadoop/MapReduce (for some, “Hadoop” and “data science” are synonyms). However, it didn’t take long for the pain of writing and running many-stage MapReduce programs to become apparent. MapReduce’s limitations motivated two different groups, AMPLab (Apache Spark) and Google (Cloud Dataflow), to write next-generation data processing frameworks. The two groups came at the problem from different vantage points, and this can be seen in both obvious and subtle differences in the end products. Dataflow was designed as a marriage of Google frameworks for expressing complex batch pipelines (FlumeJava) and streaming pipelines (MillWheel). Spark was developed at UC Berkeley to enable exploratory analysis and ML at scale.

Six months ago, Google donated the Cloud Dataflow programming model and SDKs to the Apache Software Foundation, resulting in the incubating project Apache Beam. Going forward, Apache Beam will become the way to express data processing pipelines, while Cloud Dataflow remains as a fully managed service for executing these pipelines. Similarly, I’ll refer to the programming model and API as Beam and the service that runs on GCP as Dataflow. Example code in this post uses the current Dataflow SDK, but similar concepts and programming patterns hold in Apache Beam.

In this post, I’m going to shed light on the similarities and differences of Spark and Beam/Dataflow. For framing purposes, Spark’s sweet spot is quickly developing exploratory/interactive analysis and iterative algorithms, e.g., gradient descent and MCMC, whereas Dataflow’s sweet spot is processing streaming data and highly-optimized, robust, fixed pipelines.

The rest of the post is organized as follows: the most important difference between the two, a motivating example with implementations in Spark and Beam/Dataflow, an example of a clustering algorithm written in Spark, examples of streaming processing in Spark and Dataflow, and finally, other major differences between the frameworks.

When MapReduce is not enough

MapReduce is a great tool for single-stage parallel data processing. A few pain points emerge when trying to use MapReduce beyond small pipelines:
  • Expressing complex pipelines requires significant boilerplate, separate programs, and the “interface” between stages to be files.
  • Writing intermediate results to disk between stages of pipelines is a serious bottleneck and forces the user to hand-optimize the placement of these divisions.
  • Performing exploratory analysis requires reading and writing to disk, which is slow.
  • Expressing streaming pipelines (low-latency and infinite data sources) is not supported.
  • Needing a multi-stage pipeline is easy to stumble upon; take, for example, trying to measure the effects of a randomized experiment on ratios of metrics using Mantel-Haenszel (see more below).
Both Beam and Spark solve most of these problems by optimizing away many steps and by storing the necessary intermediate results in memory (PCollections for Dataflow, resilient distributed datasets (RDDs) for Spark) instead of writing them to disk just to load them back into memory again.

The core difference: graph evaluation

The fundamental and enduring difference between Spark and Beam is that Spark only builds as much of the computation graph as needed (for lazy evaluation on actions) whereas Dataflow builds the entire computation graph before it is optimized and sent to the service or cluster to be executed. Many significant differences between the two are a consequence of this distinction. Thus it is easy to use Spark for interactive analysis (from a Python or Scala shell) and to prototype ML algorithms, but harder to do so in Dataflow. To be clear, it isn’t that one cannot do ML with Dataflow; single pass, streaming algorithms are fine. But many ML algorithms, such as basic gradient descent and MCMC, make data-driven decisions about how many iterations they need to run until convergence. Here Spark provides the user with greater flexibility.

On the other hand, the benefit of requiring a complete computation graph is that it allows systems that execute Beam pipelines, like Dataflow, to optimize the complete graph. More importantly, decoupling graph construction from execution lets Beam create a language-agnostic representation of the computation graph. Thus, Dataflow graphs can be executed on other distributed processing back-ends, even including Spark. This property is what enabled the creation of the Apache Beam project.

Another consequence of Beam’s decision to require specification of the full computation graph is that Dataflow may be offered as a service. This running-a-pipeline-job-as-a-service takes a graph specification and data inputs and produces outputs. The Cloud Dataflow Service can be thought of as a black box where details such as the number of VMs to use, overall or per stage, or the distribution of work can all be left up to the service. Of course, most of these details can be specified if desired; see here for a more expansive discussion. This is in stark contrast to Spark, which is cluster-oriented instead of job-oriented. Running a fixed pipeline in Spark requires spinning up a Spark cluster, running the pipeline, and tearing the cluster down. Even though there are tools to make this process easier (see Cloud Dataproc on GCP or Databricks on AWS), cluster management remains the responsibility of the user.

A motivating example for going beyond Hadoop/MapReduce

To compare the two frameworks, I’m going to solve the same problem twice and then highlight key differences. Suppose we have an online auction site with a wide variety of products for sale. Users browse the images and specifications of products in the inventory and may choose to bid on them. Because it is an auction site, prices for any item vary all the time. Products range in value from a few dollars (emoji eraser kits) to thousands (nitro coffee kits), so an important way we track them is by product category.

We want to run a randomized controlled experiment to determine the effect of high-resolution images on our sales. Say we wish to determine the impact of the experiment on average selling price. The naive thing to do would be simply to compare the average selling price of treatment to that of control. However, this could be misleading: this measure will show a difference even if the price for each individual item remained constant and only the mix of products sold was affected. To be concrete, imagine the experiment caused us to sell relatively more nitro coffee kits than emoji kits. The Mantel-Haenszel estimator (also known as Cochran-Mantel-Haenszel) is meant precisely to address this problem by adjusting for the mix of items being compared between treatment and control. The basic idea is first to compute the average price separately in treatment and control for each individual stratum (aka "slice") and then to take the ratio of a weighted combination of prices in treatment to that of control. The key is that the combination weights are the same for treatment and control, and these are chosen in some way to minimize estimator variance.

Widely used in medicine for count data, the MH estimator and its generalizations are ubiquitous within data science at Google. In a future post we will cover applications and extensions of MH to analytic problems at Google. For now, we'll just go ahead and compute point MH estimates for this (non-count) price data, using product categories as our slices. Let's say in product category $i$ we sell $n_{t,i}$ and $n_{c,i}$ items in treatment and control respectively for a total sale value of $X_{t,i}$ and $X_{c,i}$. The MH estimate for the (mix-adjusted) ratio of prices between treatment and control is
$$
MH(t,c)=\frac{\sum_i w_i \frac{X_{t,i}}{n_{t,i}}} {\sum_i w_i \frac{X_{c,i}}{n_{c,i}}}
$$ where the weight $w_i$ is the harmonic mean of $n_{t,i}$ and $n_{c,i}$. To facilitate computation, we will rewrite the formula as
$$
MH(t,c)=\frac{\sum_i X_{t,i} \left(\frac{n_{c,i}}{n_{t,i} + n_{c,i}}\right)}
                       {\sum_i X_{c,i} \left(\frac{n_{t,i}}{n_{t,i} + n_{c,i}}\right)}
$$ taking care to omit terms where $n_{t,i} + n_{c,i} = 0$. Let's start with a csv of the following schema:
 exp_id, product_id, price, sale_count
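As a quick sanity check on the two forms of the MH formula above, here is a minimal sketch in plain Python. The category names and totals are made up for illustration, and `mh_weighted` and `mh_rewritten` are hypothetical helper names, not part of the notebook. In this toy data the emoji kits sell for the same price in both arms and only the nitro kits get slightly more expensive in treatment, while the mix of products shifts heavily.

```python
from statistics import harmonic_mean

# Hypothetical per-category totals: category -> (X_t, n_t, X_c, n_c).
totals = {
    "emoji_eraser_kit": (30.0, 10, 36.0, 12),
    "nitro_coffee_kit": (5000.0, 4, 2400.0, 2),
    "unsold_category": (0.0, 0, 0.0, 0),  # n_t + n_c = 0, so it is omitted
}


def mh_weighted(totals):
    """First form: ratio of weighted average prices, w_i = harmonic mean."""
    num = den = 0.0
    for x_t, n_t, x_c, n_c in totals.values():
        if n_t == 0 or n_c == 0:
            continue  # the weight is zero, so the term contributes nothing
        w = harmonic_mean([n_t, n_c])
        num += w * x_t / n_t
        den += w * x_c / n_c
    return num / den


def mh_rewritten(totals):
    """Second form: the factor 2 and the per-arm counts cancel out."""
    num = den = 0.0
    for x_t, n_t, x_c, n_c in totals.values():
        if n_t + n_c == 0:
            continue
        num += x_t * n_c / (n_t + n_c)
        den += x_c * n_t / (n_t + n_c)
    return num / den


def naive_ratio(totals):
    """Ratio of overall average prices, with no adjustment for product mix."""
    x_t = sum(v[0] for v in totals.values())
    n_t = sum(v[1] for v in totals.values())
    x_c = sum(v[2] for v in totals.values())
    n_c = sum(v[3] for v in totals.values())
    return (x_t / n_t) / (x_c / n_c)


mh_a = mh_weighted(totals)
mh_b = mh_rewritten(totals)
naive = naive_ratio(totals)
print(mh_a, mh_b, naive)
```

Both MH forms give the same value (a ratio only a few percent above 1), while the naive ratio is roughly 2 because it mostly reflects the shift toward nitro coffee kits.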
If I wrote a MapReduce pipeline to calculate the MH ratio of prices, it would require three MapReduce programs:
  1. Map (if header skip else value ) → Reduce (sum reducer)
  2. Map (key value ) → Reduce (no-op -- value is iter)
  3. Map (key <> value ) → Reduce (sum reducer)
The net result would be a significant amount of code, considering a trivial word-count MapReduce requires roughly 60 lines of mostly boilerplate Java code. That’s a lot of code that I’m not going to write. For the curious reader, much has been written about Hadoop/MapReduce versus Spark, such as this post at Dattamsha.

[Figure: A mashup of Apache Spark and Dataflow/Beam]

MH in python

Readers who prefer Python to prose can see the full Jupyter notebook solution in my Spark vs Dataflow github project. To make the comparison easier, I separate the business logic from the parallelized code. There are two business logic functions, calc_numerator and calc_denominator, that take all of the count and price data for a particular product and calculate the numerator (and denominator) of the above formula.
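The notebook has the real implementations; the following is only a sketch of what the two business logic functions might look like. It assumes each input record has the shape (product_id, [(exp_id, total_price, total_count), ...]) and that exp_id is 0 for control and 1 for treatment. Both of these are assumptions made for illustration, as is the `_split` helper.

```python
CONTROL, TREATMENT = 0, 1  # assumed encoding of exp_id


def _split(record):
    """Return ((X_t, n_t), (X_c, n_c)) for one product's grouped record."""
    _, rows = record
    by_exp = {CONTROL: (0.0, 0), TREATMENT: (0.0, 0)}
    for exp_id, total_price, count in rows:
        by_exp[exp_id] = (total_price, count)
    return by_exp[TREATMENT], by_exp[CONTROL]


def calc_numerator(record):
    """X_t * n_c / (n_t + n_c), or 0 when the product had no sales."""
    (x_t, n_t), (_, n_c) = _split(record)
    total = n_t + n_c
    return x_t * n_c / total if total else 0.0


def calc_denominator(record):
    """X_c * n_t / (n_t + n_c), or 0 when the product had no sales."""
    (_, n_t), (x_c, n_c) = _split(record)
    total = n_t + n_c
    return x_c * n_t / total if total else 0.0


# Every item sells for 5.0 in both arms; p2 only sold in treatment.
grouped = [
    ("p1", [(TREATMENT, 20.0, 4), (CONTROL, 10.0, 2)]),
    ("p2", [(TREATMENT, 9.0, 3)]),
]
effect = sum(map(calc_numerator, grouped)) / sum(map(calc_denominator, grouped))
print(effect)
```

Since prices are identical in both arms for p1, and p2 has no control sales to compare against, the mix-adjusted ratio comes out as 1.0.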

MH in Apache Spark

The first thing to do for Spark is start the Spark shell, which involves adding a few things to the PATH and running execfile on pyspark/shell.py (see the notebook for details). Next comes the Spark code:

from operator import add

# We want to calculate MH(v_{t,i}, n_{t,i}, v_{c,i}, n_{c,i}), where t and c are treatment
# and control. v and n in our case are the value of the sale prices and sale_count.
# sc, convert_line, NUM_LE, NUM_HE and the column indices exp, prod, price and
# sale_count are assumed to be defined earlier in the notebook.
input_rdd = sc.textFile('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE))
header = input_rdd.first()  # Grab the header line so it can be filtered out.
parsed_input_rdd = (input_rdd
                    .filter(lambda x: x != header)
                    .map(lambda x: convert_line(x.split(','))))
transformed = parsed_input_rdd.map(lambda x: ((x[exp], x[prod]),
                                              (x[sale_count] * x[price], x[sale_count])))

(sp, clks) = (0, 1) # sale price and sale_count
(ep, spc) = (0, 1) # exp_id&product_id, sp&sale_count
(exp2, prod2) = (0, 1) # exp_id, product_id


# For each product cross exp_id, sum the sale prices and sale_count
grouped_result = transformed.reduceByKey(lambda x,y: (x[sp]+y[sp], x[clks]+y[clks]))
grouped_by_product = grouped_result.map(lambda x: ((x[ep][prod2]), (x[ep][exp2], x[spc][sp], x[spc][clks]))).groupByKey()

numerator_sum = grouped_by_product.map(lambda x: calc_numerator(x)).reduce(add)
denominator_sum = grouped_by_product.map(lambda x: calc_denominator(x)).reduce(add)
effect = numerator_sum / denominator_sum
print(numerator_sum, denominator_sum, effect)

Spark functions fall into two categories: transformations, which define a new RDD from an existing one (e.g., map and filter, as well as groupByKey and reduceByKey, which additionally shuffle the data across machines), and actions, which return a result to the driver (e.g., first and reduce). The significance of the distinction is that transformations have no immediate effect and are only executed when an action is reached; that is, Spark does lazy evaluation.

Excluding business logic, the program is only a handful of lines of code.
  • sc.textFile(...) transforms a file on disk into an RDD (the distributed data structure in Spark).
  • input_rdd.first() acts on the RDD, returning the first (header) element to the driver (my notebook).
  • input_rdd.filter(...).map(...) transforms input_rdd, removing the header and then converting each csv line into floats and ints.
  • parsed_input_rdd.map(...) transforms records into key-value tuples ((exp_id, product_id), (cost, clicks)).
  • transformed.reduceByKey(...) transforms the tuples into the total clicks and cost by (exp_id, product_id); because it is a transformation, nothing is executed yet.
  • grouped_result.map(...).groupByKey() transforms the totals into the same data, only grouped by product_id instead of by product_id and experiment_id.
  • grouped_by_product.map(...).reduce(add) transforms the data per product_id into the numerator and denominator of the MH calculation and then performs the action of summing the results using the add function. This action is what causes all of the preceding transformations to be executed.
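To make the idea of lazy evaluation concrete without needing a Spark installation, here is a toy stand-in written in plain Python. `LazyDataset` is a made-up class for illustration only; it is not how Spark is implemented and it ignores distribution entirely. It only shows the pattern: map and filter record work, and nothing runs until an action such as reduce is called.

```python
import functools
from operator import add


class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not run."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = steps

    # Transformations: return a new dataset with one more recorded step.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def _run(self):
        rows = iter(self._source)
        for kind, fn in self._steps:
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return rows

    # Actions: execute the recorded steps and return a value to the caller.
    def collect(self):
        return list(self._run())

    def reduce(self, fn):
        return functools.reduce(fn, self._run())


calls = []


def parse(line):
    calls.append(line)  # record that the function really ran
    return int(line)


ds = LazyDataset(["1", "2", "3"]).map(parse).filter(lambda v: v != 2)
calls_before_action = list(calls)  # still empty: nothing has run yet
total = ds.reduce(add)             # the action triggers the whole chain
print(calls_before_action, calls, total)
```

The parse function is not called while the chain is being built; it runs only once reduce asks for a result.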

MH in Apache Beam

The organization of the Dataflow code is rather similar to Spark overall, with a handful of subtle, but important distinctions. One of the less interesting differences is that Dataflow doesn’t yet have a sum function for the reduce, so I have to write my own (t_sum). Note that this code is using the Dataflow SDK (not the new Beam SDK).

import apache_beam as beam

def t_sum(values):
   result = [0,0]
   for v in values:
       result[0] += v[0]
       result[1] += v[1]
   return (result[0], result[1])

# Create a pipeline executing on a direct runner (local, non-cloud).
# DirectPipelineRunner is the default runner, I'm setting it here to show how one
# would change it to run on the Dataflow Service.
pipeline_options = beam.utils.options.PipelineOptions(['--runner=DirectPipelineRunner'])
p = beam.Pipeline(options=pipeline_options)

parsed_input_rdd = (p
    | 'load records' >> beam.io.Read(
        beam.io.TextFileSource('sim_data_{0}_{1}.csv'.format(NUM_LE, NUM_HE)))
    | 'filter header' >> beam.Filter(lambda x: x[0] != '#')
    | 'split line' >> beam.Map(lambda x: convert_line(x.split(','))))
transformed = (parsed_input_rdd
    | 'reshape' >> beam.Map(lambda x: ((x[exp], x[prod]),
                                       (x[price] * x[sale_count], x[sale_count]))))

(sp, clks) = (0, 1) # sale price and sale_count
(ep, spc) = (0, 1) # exp_id&product_id, sp&sale_count
(exp2, prod2) = (0, 1) # exp_id, product_id

# For each product cross exp_id, sum the sale prices and sale_count
grouped_result = (transformed
    | 'combine per product/id' >> beam.CombinePerKey(t_sum))
grouped_by_product = (grouped_result
    | 'keyByExpProduct' >> beam.Map(lambda x: ((x[ep][prod2]),
                                               (x[ep][exp2], x[spc][sp], x[spc][clks])))
    | 'group' >> beam.GroupByKey())

numerator_sum = (grouped_by_product
    | 'MapForNum' >> beam.Map(lambda x: calc_numerator(x))
    | 'CombineNum' >> beam.CombineGlobally(sum))
(numerator_sum
    | 'save numerator' >> beam.io.Write(beam.io.TextFileSink('./numerator_sum')))

denominator_sum = (grouped_by_product
    | 'MapForDenom' >> beam.Map(lambda x: calc_denominator(x))
    | 'CombineDenom' >> beam.CombineGlobally(sum))
# Parentheses let the statement span two lines.
(denominator_sum
    | 'save denominator' >> beam.io.Write(beam.io.TextFileSink('./denominator_sum')))
p.run()

Beam is conceptually two pieces: pipeline construction and pipeline execution. beam.Pipeline() returns a pipeline, say p, on which to build (using beam.Map, beam.GroupByKey, etc.) and p.run() executes the pipeline on a cluster, by the Dataflow Service, or in our case, locally.
  • beam.Pipeline(options=pipeline_options) begins constructing a pipeline to run locally. 
  • p | beam.io.Read(...) | beam.Filter(...) | beam.Map(...) adds reading the file, filtering lines that look like the header (starting with ‘#’), and converting each line into floats and ints to the graph.
  • parsed_input_rdd | beam.Map(...) adds mapping each record to be keyed by exp_id, product_id to the graph.
  • transformed | beam.CombinePerKey(...) | beam.Map(...) | beam.GroupByKey() adds summing clicks and cost by exp_id, product_id and regrouping by product_id to the graph.
  • grouped_by_product | beam.Map(...) | beam.CombineGlobally(...) adds calculating the numerator/denominator values and the global sum to the graph.
  • numerator_sum | beam.io.Write(...) adds a sink for the numerator (there is a matching output for the denominator).
  • p.run() optimizes the constructed graph and ships the result to be executed (in our case on the local machine).
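The `'label' >> transform` and `|` syntax can look odd at first, so here is a toy sketch of how such a syntax can be built with Python operator overloading, and of how a graph can be fully constructed before anything runs. The `Transform`, `Node`, `Map` and `Filter` names below are made up for this illustration; this is not Beam's implementation, there is no optimizer, and the "graph" is just a linear chain.

```python
class Transform:
    """A named step; wraps a function from a list of rows to a list of rows."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):
        # Handles `'label' >> transform`: str has no __rshift__, so Python
        # falls back to the right operand's reflected method.
        return Transform(self.fn, label)


class Node:
    """A node in a deferred graph: remembers its parent and its transform."""

    def __init__(self, parent=None, transform=None):
        self.parent = parent
        self.transform = transform

    def __or__(self, transform):
        # `node | transform` only extends the graph; no data is touched.
        return Node(self, transform)

    def _chain(self):
        node, chain = self, []
        while node.transform is not None:
            chain.append(node.transform)
            node = node.parent
        return chain[::-1]

    def labels(self):
        return [t.label for t in self._chain()]

    def run(self, data):
        # Only here is the complete chain executed, start to finish.
        for t in self._chain():
            data = t.fn(data)
        return data


def Map(fn):
    return Transform(lambda rows: [fn(r) for r in rows])


def Filter(pred):
    return Transform(lambda rows: [r for r in rows if pred(r)])


# `>>` binds tighter than `|`, so each label attaches to its transform first.
graph = (Node()
         | 'filter header' >> Filter(lambda line: line[0] != '#')
         | 'split line' >> Map(lambda line: line.split(',')))
steps = graph.labels()
rows = graph.run(['#exp_id,price', '1,2.5'])
print(steps, rows)
```

Because the whole chain exists as data before run is called, a real system is free to inspect, rewrite, or ship it elsewhere, which is the property the post attributes to Beam.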

Comparison of implementations

At a high level, both implementations look similar and use the same basic parallel operations. The first difference to point out is that Spark executes the graph only on actions, e.g., reduce, whereas the runner executing Beam, e.g., Cloud Dataflow, executes the complete graph (when run is invoked). Another difference is that Dataflow graphs require sources and sinks, which means results must be piped to files and cannot be returned to the calling program (with the exception of using the local runner).

Spark’s sweet-spot: iterative algorithms

Spark started as a solution for doing exploratory analysis and machine learning. One of the simplest ways to show this off is the clustering technique k-means. K-means works by repeatedly applying two steps, assigning points to clusters and updating cluster centers, until the locations of the cluster centers have stabilized. Below is the core of the algorithm written in Spark.
import copy
import math

# Load the data, remove the first line, and pick initial locations for the clusters
# (c_centers); those steps are not shown here.
# We put the x,y points in pt_rdd, an RDD of PtAgg (a class containing x, y,
# and count).
pt_rdd = parsed_input_rdd.map(lambda x: PtAgg(x[0], x[1], 1))
MAX_STEPS = 100
MIN_DELTA = 0.001
delta = 1.0
step = 0
while delta > MIN_DELTA and step < MAX_STEPS:
    step += 1
    c_centers_old = copy.deepcopy(c_centers)
    b_c_centers = sc.broadcast(c_centers_old)
    # For every point, find the cluster it's closest to and add to its total x, y, and count.
    totals = pt_rdd.map(lambda x: pick_closest_center(x, b_c_centers.value)).reduce(
        lambda a, b: center_reduce(a, b))
    # Now update the location of the centers as the mean of all of the points closest to it
    # (unless there are none, in which case pick a new random spot).
    c_centers = [t.Normalize() if t.cnt != 0 else random_point_location() for t in totals]

    # Compute the distance that each cluster center moves, then set the max of those as
    # the delta used for the stop condition.
    deltas = [math.sqrt(c.DistSqr(c_old)) for c, c_old in zip(c_centers, c_centers_old)]
    delta = max(deltas)

s = ' '.join([str(x) for x in c_centers])
print('final centers: {0}'.format(s))
  • parsed_input_rdd.map(...) maps the raw input into the PtAgg class we use to represent points.
  • sc.broadcast(c_centers_old) sends the locations of the cluster centers to all machines holding RDDs.
  • pt_rdd.map(...).reduce(...) maps each point to the center it’s closest to and then reduces to produce the total x, y, and count of the points that are closest to each center (the result is in the driver script).
  • [t.Normalize() if t.cnt != 0 else random_point_location() for t in totals] turns those totals into averages, which become the new cluster locations.
As it happens, I didn’t actually need to write that code because Spark has an ML library (mllib) containing many algorithms, including k-means. In practice, k-means is just a few function calls; details are here.
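The helpers used in the listing (PtAgg, pick_closest_center, center_reduce) live in the notebook and are not shown in this post. The following is a guess at what they might look like, written so that the same loop can be run locally with plain Python in place of an RDD. The method names match the listing, but the bodies are assumptions, and for simplicity an empty cluster keeps its old center instead of moving to a random spot.

```python
import functools
import math


class PtAgg:
    """A point, or a running total of points (sum of x, sum of y, count)."""

    def __init__(self, x, y, cnt):
        self.x, self.y, self.cnt = x, y, cnt

    def DistSqr(self, other):
        return (self.x - other.x) ** 2 + (self.y - other.y) ** 2

    def Normalize(self):
        # Turn a total into the mean location of the points it aggregates.
        return PtAgg(self.x / self.cnt, self.y / self.cnt, 1)


def pick_closest_center(pt, centers):
    """Return one PtAgg per center: the point at its closest center, zeros elsewhere."""
    best = min(range(len(centers)), key=lambda i: pt.DistSqr(centers[i]))
    return [pt if i == best else PtAgg(0.0, 0.0, 0) for i in range(len(centers))]


def center_reduce(a, b):
    """Add two per-center total lists element by element."""
    return [PtAgg(p.x + q.x, p.y + q.y, p.cnt + q.cnt) for p, q in zip(a, b)]


def kmeans(points, centers, max_steps=100, min_delta=0.001):
    delta, step = 1.0, 0
    # The number of iterations depends on the data, not on a fixed count.
    while delta > min_delta and step < max_steps:
        step += 1
        old = centers
        totals = functools.reduce(
            center_reduce, (pick_closest_center(p, old) for p in points))
        centers = [t.Normalize() if t.cnt else c for t, c in zip(totals, old)]
        delta = max(math.sqrt(c.DistSqr(o)) for c, o in zip(centers, old))
    return centers, step


points = [PtAgg(0, 0, 1), PtAgg(0, 2, 1), PtAgg(10, 10, 1), PtAgg(10, 12, 1)]
final_centers, steps_taken = kmeans(points, [PtAgg(1, 1, 1), PtAgg(9, 9, 1)])
print([(c.x, c.y) for c in final_centers], steps_taken)
```

On this tiny input the centers settle after the first update and the loop stops on the second pass, well before the step limit.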

Here is where the difference between Spark and Beam is most apparent: no iterative algorithm that can stop early, including k-means, can be expressed in Beam. This is because Beam builds the computation graph, then optimizes and ships it to be executed. With iterative algorithms, the complete graph structure cannot be known beforehand (we don’t know how many loops will be done), so it can’t be expressed in Beam. It is possible to express “loops” in Dataflow, but only a fixed number of loops can be added.

Beam/Dataflow’s sweet spot: streaming processing

Streaming processing is an increasingly important topic for data science. After all, who among us really wants to wait for a daily batch pipeline to tell us how our live traffic experiments are doing? For a great introduction to streaming processing, I highly encourage reading Tyler Akidau’s two blog posts on the topic, The world beyond batch: Streaming 101 and 102.

When Dataflow was released, its strongest selling point was streaming (just read the paper). That’s not to say that Spark doesn’t support streaming processing or that it lacks a unified engine for batch and streaming. The significant steps Dataflow initially made were (1) a truly unified batch and streaming API, (2) support for event time processing of data, that is, analysis of the results windowed by when they happened, not when they reached the analysis machine, and (3) a focus on watermarks (a method for tracking collection progress on unbounded data) and window completeness (has all of the data for this time period been collected?). More practically, Dataflow, now Beam, has a streaming API that cleanly separates the important questions of streaming processing (what, where, when, and how). It’s worth noting that different runners currently have different levels of support, e.g., Dataflow supports streaming for Java, but not yet for Python.

All this is easiest to show using the examples the Dataflow team used in their recent blog post, which is also a good read. The starkest contrast between the two is shown by their example of calculating hourly team scores for an online game.
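To illustrate why event time matters for something like hourly team scores, here is a small sketch in plain Python. It does not use the Beam or Spark streaming APIs; the events, the field layout, and the `hourly_scores` helper are made up for illustration. Each event carries both the time it happened and the time it arrived, and the same data is bucketed into fixed one-hour windows both ways.

```python
from collections import defaultdict

WINDOW_SECONDS = 3600

# Hypothetical events: (team, score, event_time, arrival_time), in seconds.
events = [
    ("red", 5, 100, 110),
    ("blue", 3, 3500, 3700),   # happened in hour 0, arrived in hour 1
    ("red", 4, 3650, 3660),
    ("blue", 7, 7100, 7300),   # happened in hour 1, arrived in hour 2
]


def hourly_scores(events, time_index):
    """Sum scores per (window_start, team), windowing on the chosen timestamp."""
    totals = defaultdict(int)
    for event in events:
        window_start = event[time_index] // WINDOW_SECONDS * WINDOW_SECONDS
        totals[(window_start, event[0])] += event[1]
    return dict(totals)


by_event_time = hourly_scores(events, time_index=2)
by_arrival_time = hourly_scores(events, time_index=3)
print(by_event_time)
print(by_arrival_time)
```

Late-arriving events land in the wrong hour when windows are based on arrival time, which is exactly the distortion that event time windowing avoids. Deciding when an event time window can be considered complete is the job of watermarks, which this sketch leaves out.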

Other differences in framework “philosophy”

Several design decisions show that Spark favors fast development by default and requires users to opt in to performance, whereas Dataflow favors high performance by default at the cost of slower development time.

Caching

Dataflow avoids the need for caching results by “fusing” sibling stages that would need the same input. Fusion comes with the limitation that mutable types should not be used. This is because the values are shared among fused siblings, and modification would cause invalid results (this is checked for at run-time). This means writing correct Beam code with mutable types requires significantly more care. Spark re-computes by default, which can be much slower but doesn’t come with correctness issues. Spark does allow for many different caching methods, should caching be desired.
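The hazard with shared mutable values can be shown in a few lines of plain Python. This is only an analogy for fused sibling stages, not Dataflow code; `run_siblings`, `double_price` and the records are made up for illustration. Two "sibling" steps consume the same input, and one of them modifies its elements in place.

```python
import copy


def make_records():
    return [{"price": 10.0}, {"price": 20.0}]


def double_price(record):
    record["price"] *= 2  # mutates its input in place
    return record


def run_siblings(records, share_elements):
    """Feed the same input to two sibling steps and return sibling B's result."""
    input_a = records if share_elements else copy.deepcopy(records)
    doubled = [double_price(r) for r in input_a]        # sibling A
    original_total = sum(r["price"] for r in records)   # sibling B
    return original_total


shared_total = run_siblings(make_records(), share_elements=True)
isolated_total = run_siblings(make_records(), share_elements=False)
print(shared_total, isolated_total)
```

When the siblings share elements, sibling B sees the values sibling A already doubled and reports 60.0 instead of 30.0. Giving each sibling its own copy fixes the result, at the cost of the extra work that sharing was meant to avoid.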

Equality

Dataflow uses a fast, language-agnostic, serialized byte-level equality for comparing classes in GroupByKey and other steps. This becomes a huge issue if the program uses grouping operations on classes that contain hashmaps, sets, or anything that causes semantic equality to differ from byte equality. Spark deserializes classes and uses the class’s comparison operator for grouping by default, and allows users to opt into byte-level equality, making the opposite tradeoff.
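Here is a small plain-Python illustration of how semantic equality and byte equality can disagree. JSON is used purely as a stand-in for a serializer; it is not the encoding Dataflow uses, and `group_by_encoded_key` is a made-up helper. Two dictionaries with the same content but different insertion order compare equal in Python, yet serialize to different bytes unless the encoding is made deterministic.

```python
import json
from collections import defaultdict

a = {"color": "red", "size": "L"}
b = {"size": "L", "color": "red"}  # same content, different insertion order

semantic_equal = (a == b)
byte_equal = (json.dumps(a).encode() == json.dumps(b).encode())


def group_by_encoded_key(pairs, encode):
    """Group values by the serialized bytes of their key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[encode(key)].append(value)
    return list(groups.values())


pairs = [(a, 1), (b, 2)]
naive_groups = group_by_encoded_key(
    pairs, lambda k: json.dumps(k).encode())
canonical_groups = group_by_encoded_key(
    pairs, lambda k: json.dumps(k, sort_keys=True).encode())
print(semantic_equal, byte_equal, naive_groups, canonical_groups)
```

With the naive encoding the two keys end up in separate groups even though they are equal as dictionaries; sorting the keys before encoding makes the bytes agree. The general lesson is that keys used for byte-level grouping need a deterministic serialized form.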

Overall performance

Unfortunately, we currently can’t make any definitive statements about which framework has better overall performance. The only study that compared Beam on the Dataflow Service to Spark used an older version of Spark (1.3). Given the significant performance increases with Spark 2.0, it’s unclear which framework has better performance at present.

Conclusion

Spark and Dataflow both solve the limitations of MapReduce with different viewpoints. Which tool is most appropriate will depend on the project. If the focus is interactivity, data exploration or algorithm development, Spark is probably the better option. If the focus is complex streaming processing (especially event time) then Beam would seem a better fit. For running scalable production pipelines, Beam on Cloud Dataflow is likely the better choice.