Iterative Algorithms

One of the strong suits of Apache Spark is that it provides elegant and concise ways to express iterative algorithms without adding significant computational overhead. It is important, though, to understand the implications of iterative processing and how to monitor and manage its different aspects.

Iterative Applications and Lineage

Distributed data structures in Spark can be perceived as recursive data structures, where each RDD depends on some set of parent RDDs. To quote the source code:

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
    an HDFS file)
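These properties are exposed through the public RDD API, so the recursive structure can be inspected directly. A minimal sketch using only standard RDD methods:

  val pairs = sc.range(0L, 100L).map(x => (x % 10, x)).reduceByKey(_ + _)

  pairs.partitions.length                        // the list of partitions
  pairs.dependencies                             // dependencies on the parent RDD(s)
  pairs.partitioner                              // Some(HashPartitioner) after reduceByKey
  pairs.preferredLocations(pairs.partitions(0))  // preferred locations for a given split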

Since RDDs are immutable, every transformation extends the lineage, creating a deeper stack of dependencies. While this is not an issue in the case of simple chaining, it can become a serious problem in iterative processing, up to the point where evaluation may become impossible due to a stack overflow. Spark provides two different methods to address this problem - checkpointing and flat transformations.
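To see how quickly this happens, it is enough to look at how the lineage, as reported by RDD.toDebugString, grows with each iteration; a minimal sketch:

  val base = sc.range(0L, 10L)
  // Every map call adds another level to the lineage
  val deep = (1 to 100).foldLeft(base)((acc, _) => acc.map(_ + 1))
  println(deep.toDebugString)  // roughly one entry per iteration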

Checkpointing

Checkpointing (not to be confused with checkpointing in Spark Streaming) is a mechanism which enables truncating the lineage. It has the following consequences:

  • It saves the RDD to the Spark checkpoint directory.
  • It removes references to the parent RDDs.

checkpoint is lazy and, unless the given RDD is cached, it will have to be reevaluated.
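A minimal sketch of periodic checkpointing in an iterative job is shown below; the checkpoint directory and the iteration counts are arbitrary assumptions:

  // Arbitrary example path - checkpoint files will be written here
  sc.setCheckpointDir("/tmp/spark-checkpoints")

  val init = sc.range(0L, 1000L)
  val result = (1 to 200).foldLeft(init) { (acc, i) =>
    val next = acc.map(_ + 1)
    if (i % 50 == 0) {
      next.cache()       // cache first, so checkpointing can reuse the computed data
      next.checkpoint()  // marks the RDD; data is written when a job runs on it
    }
    next
  }
  result.count()         // triggers evaluation and the pending checkpoints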

The biggest advantage of checkpointing is that it is universal. It can be used with any lineage, independently of the particular transformations involved.

“Flat Transformations”

Complementary to truncating the lineage, we can use transformations which keep the lineage short. This can be achieved either with built-in methods like SparkContext.union and PairRDDFunctions.cogroup, or by preferring flat transformations over transformation chaining.

Creating a Union of Multiple RDDs

Whenever we perform an iterative union without referencing partial results, we can replace transformation chaining with a single call to SparkContext.union. Let's use the DAG to illustrate the difference between these two methods. Let's assume that we have a sequence of RDDs:

  val rdds = (0L to 4L).map(i => sc.range(10L * i, 10L * (i + 1L)))

If we chain union calls:

  rdds.foldLeft(sc.emptyRDD[Long])((acc, rdd) => acc.union(rdd))

we’ll get the following DAG:

Iterative union

In contrast, with a single call to SparkContext.union:

  sc.union(rdds)

we’ll get a shallow DAG, as shown below:

Single union
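The difference is also visible without the UI. Reusing the rdds sequence defined above, the two lineages can be compared programmatically; a small sketch:

  val chained = rdds.foldLeft(sc.emptyRDD[Long])((acc, rdd) => acc.union(rdd))
  val flat    = sc.union(rdds)

  // The chained variant nests its parents level by level, while sc.union
  // exposes all inputs as direct dependencies of a single UnionRDD
  println(chained.toDebugString)
  println(flat.toDebugString)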

Joining Multiple RDDs

Similarly to creating a union, it is possible to join multiple RDDs at once using RDD.cogroup, although this approach is fairly limited. First of all, Spark provides the ability to cogroup at most four RDDs at a time. Moreover, this process is quite expensive and is not recommended when the number of values per key is large. Nevertheless, it can be useful in certain situations.
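A short sketch of a single multi-RDD cogroup, compared to chaining joins; the input data is made up for illustration:

  val xs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  val ys = sc.parallelize(Seq(("a", 3.0), ("c", 4.0)))
  val zs = sc.parallelize(Seq(("a", "foo"), ("b", "bar")))

  // A single cogroup keeps the lineage flat, in contrast to xs.join(ys).join(zs)
  val grouped = xs.cogroup(ys, zs)
  // grouped: RDD[(String, (Iterable[Int], Iterable[Double], Iterable[String]))]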

Pushing Transformations to the Local Data Structures

A relatively simple but powerful and efficient way to reduce the length of the lineage is to replace consecutive narrow transformations with their local equivalents. Since basic methods from the RDD API have identical equivalents in the Iterator API, this is as simple as using mapPartitions. For example, the following chain of transformations:

  val rdd = sc.range(0L, 100L)
  rdd.filter(_ % 17 == 3).map(x => x until x * x).flatMap(_.map(_ + 1)).filter(_ % 2 != 0)

can be expressed as:

  rdd.mapPartitions(
    _.filter(_ % 17 == 3).map(x => x until x * x).flatMap(_.map(_ + 1)).filter(_ % 2 != 0)
  )

with minimal changes in the code.

This method is particularly useful when working with simple simulations and Monte Carlo methods. Consider the following example:

  import breeze.stats.distributions.Gaussian

  val rdd = sc.parallelize(Array.fill(1000)(0.0))

  def simulate(iter: Iterator[Double]) = {
    val g = Gaussian.distribution((0.0, 1.0))
    iter.map(_ + g.get)
  }

  // This will result in java.lang.StackOverflowError
  (0 to 1000).foldLeft(rdd)((acc, _) => acc.mapPartitions(simulate _)).stats

which should result in a java.lang.StackOverflowError, while the alternative approach:

  rdd.mapPartitions(iter => {
    (0 to 1000).foldLeft(iter)((acc, _) => simulate(acc))
  }).stats

should finish without any issues.

Another advantage of this approach is that it can be easily tested using local data structures, without initializing a SparkContext.
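For example, the simulate function defined above can be exercised against a plain Iterator; a minimal sketch:

  // No SparkContext involved - just local collections
  val local = simulate(Iterator.fill(10)(0.0)).toVector
  assert(local.size == 10)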

Unfortunately, there is no drop-in replacement in Python, so working with PySpark requires a little more effort. For example, with the toolz library we can replace:

  rdd = sc.parallelize(range(100))
  (rdd
      .filter(lambda x: x % 17 == 3)
      .map(lambda x: range(x, x * x))
      .flatMap(lambda xs: (x + 1 for x in xs)))

with:

  from toolz.curried import filter, pipe, map, mapcat as flatMap

  def f(iter):
      return pipe(
          iter,
          filter(lambda x: x % 17 == 3),
          map(lambda x: range(x, x * x)),
          flatMap(lambda xs: (x + 1 for x in xs)))

  rdd.mapPartitions(f)

Truncating Lineage in Dataset API

Controlling Number of Partitions in Iterative Applications