A fable of an Aggregator

June 13, 2020 by guidj

Many parallel data computing tasks can be solved with one abstract data type (ADT). We will describe how an Aggregator does that by walking through a problem we want to solve with parallelism and uncovering the ideal properties of an ADT that enable us to do so.

Relevance of Aggregations: The Desirable ADT

In the world of analytics and machine learning, data processing makes up a significant chunk of the plumbing required to do both. In the world of big data, or medium-sized data for that matter, parallel processing enables efficient usage of disparate computing resources. Quite frequently, the data we’re referring to is represented by a collection of records.

Lest we resort to expanding our analysis over the entirety of computing, let’s by start defining the nature of the type of problems we’re interested in. In analytics workloads, most computations can be broken down to a series of aggregations, in their most basic form. Borrowing from economics, an aggregation would be a summary measure.

To place the concept of an aggregation into context, let’s take a basic problem of adding up numbers - a task that can be applied to numerous domains, such as computing impressions (e.g. the number of times an item was played), distributions (e.g. logins per hour), summary statistics (e.g. mean transaction time), and ratios (e.g. rate of event failures).

A very trivial, and serial, solution to the problem of adding up numbers in a collection is described in the following algorithm:

total = 0
for i in 0...len(collection):
	total += collection[i]

We simply iterate over our collection of numbers and add them individually to a cumulative total. The time complexity of this algorithm is exactly O(n). Should the collection of numbers be so large, O(n) can become a relatively long waiting period.

Abstract data types in computer science can play a pivotal role in changing the dynamics of a given problem. We wish to solve aggregations in parallel, and so, in order to do that we must go in search of the properties that would define an ADT capable of enabling us to do just that.

Our Working Problem

Charging ourselves with figuring out the properties that would define an ADT that is conducive to computing aggregations in parallel, we start by defining a guiding policy of parallel computing. There are three steps for us:

  1. Define a way to break down the problem into pieces, each sub-task belonging to a bucket
  2. Define a way to do our computing on each bucket of sub-tasks
  3. Define a way to put together the results from each bucket into a final result

We’ll address the three steps of breaking down a problem, solving each sub-task, and combining the partial results into a final result by solving for one use case: our collection summation problem.

More formally, given a collection of numbers, we wish to compute their sum.

Finding Desirable Properties

So, how can we sum up a collection of numbers in parallel? To answer this question, we need to determine wether number summation is an operation that lends itself to parallelism. However, parallelism itself is an implementation detail, made possible by the existence of multiple executors that run at the same time. This guarantee can only be made by hardware design. At the application programming level, an operation needs to be made concurrent, and concurrent operations can then be executed in parallel. And so, we rephrase our question and ask: how can we sum up a collection of numbers concurrently?

Concurrency

To determine whether our problem can be solved concurrently, we’ll start by reducing our collection summation problem to its smallest form, and then use induction to arrive at an answer.

If our collection was made up of a single number, there would be no effort needed to arrive at a result. And no concurrency involved either. The result would be the same as the single value present in our collection.

If our collection was made up of two numbers, we would simply add both numbers. This would again require no concurrency.

If our collection was made up of three numbers, we would be restricted to adding two of them together first, and then adding the result of that first addition to the third value. These two steps would have to occur in sequence.

If our collection was made up of four numbers, then we’d have a chance to apply two operations concurrently: adding any two values at the same time, resulting in two partial sums. These partial sums could then be added together in a third step.

Thus, to answer our question, precisely with four numbers we are able to have concurrency. And from that, parallelism can occur. Concurrency is our first property.

Partitioning

The way we solved our problem inductively was to add pairs of values. This implies a breakdown of input, a collection of numbers, into pairs that can be executed concurrently.

We shall call this breakdown partitioning (Not to be confused with data partitioning for storage).

The partitioning is done according to the input to our operation: in this case, pairs of numbers become the single unit of execution for the binary operation of addition. Being able to partition our data to process sub-tasks is our second property.

Associativity

From our induction, we observed that the binary operation of addition that is used on pairs of values from our collection is the same one used on the partial results: we sum up pairs, and sum up the partial sums from those same pairs.

This tells us that not all aggregation operations are conducive to concurrent execution. For instance, number multiplication has the same properties as addition: given a collection, we can multiply any two pairs of numbers and multiply the result of that by other partial multiplications of pairs to yield the same result. Adding strings, on the other hand, does not, because the order in which we add the strings will alter the final result.

And this conduciveness depends both on the operator itself as well as the data on which it is applied.

This property, that the pairing of the the values in our collection is irrelevant, is in fact the associative property of binary operations. This is our third property.

For strings, a binary operation that would work would be to add any two strings and sort the characters after — this way, regardless of the pairing of values, the final result would be the same.

Single Transformer

Finally, we observe than once all sub-tasks’ results are gathered, they can be reduced to produce the final result using the exact same operation.

For the sake of simplicity, we’ll call this property single transformer. It tells us that our ADT should have its binary operation be such that it applies to the first elements and to the partial results equally.


With this, we have arrived at four properties:

  1. Concurrency
  2. Partitioning
  3. Associativity
  4. Single Transformer

Note, however, that properties (1) and (2) refer to how we’d handle the collection of data we wish we process. Meanwhile, (3) and (4) are traits of the operation of our ADT.

Alas, our story isn’t over yet. We must still visit the forest of inevitable trouble.

Addressing Edge Cases

There are two cases we have yet to address: the empty collection and the single element collection. Before we accept our properties for an ADT that would work with our guiding policy of parallelism, we must ensure they work in these edge cases as well.

Single Value Collection

In our induction, early on, we stated that the result of summing up the values in a single value collection would be that same lonely value. However, through our induction, the procedure we followed with collections with more numbers involved adding up pairs of values. If we wanted to avoid special casing, then having a single value collection means we are missing one other value to form a pair.

For summation, in particular, it is evident that this missing value is a zero. With zero added to any value, we get that same value. In introducing zero as a surrogate for a missing pair’s companion, we can thus use the same procedure we have arrived at: adding a pair of values first and then adding up the partial sums.

Empty Collection

Finally, what if our collection is empty? We could be tempted to extend on the solution for the single valued collection, and introduce two surrogate values: Zero One and Zero Two. By adding both zeros we get zero.

There is, however, an alternative course of action. We could say, for instance, that no aggregation is defined for an empty collection. After all, without any value to compute, what summary is there to arrive at?

One can ask why this same rule does not apply to the single value collection use case. The answer is that aggregations, at least some of them, are in fact defined for single value collections. Summing up all the values in a collection of a single value does yield a result. For an empty collection, the result is undefined, and choosing to have it defined to be zero would be just as arbitrary, but murkier to adopt.

So, having solved these two edge cases, the single value and empty collection, we have arrived at our fifth property:

  1. Zero Value

Property (5) refers to the consistency in usage of our operation.

Putting it all Together

In combining some of the properties we have found, we can arrive at some pre-existing ADTs. For instance, in combining the of properties (3) Associativity and (4) Single Transformer, we effectively define a Semigroup. From Wikipedia:

In mathematics, a semigroup is an algebraic structure consisting of a set together with an associative binary operation.

If we add property (5) to our Semigroup, we get a Monoid. Defining all of this as APIs, we arrive at the following:

trait Semigroup[A] {
  def sum(x: A, y: A): A
}

trait Monoid[A] extends Semigroup[A] {
  def zeroValue: A
}

trait Aggregator[A] {
	def aggregate(values: Collection[A], monoid: Monoid[A]): Option[A]
}

A Monoid, as described by our trait definition, is a Semigroup with a zero value. We introduce the concept of an Aggregator, which defines a generalized signature of an aggregator for a collection of values of a given set A, using a Monoid for that same set. The return value of our aggregate function is an Option, since the result will be undefined when the collection is empty - as noted earlier.

It is hard to escape the fact that much of what we discussed, such as the associativity of the binary operation sum, are hardly captured by the signature of our traits. Still, with the properties we outlined, we have arrived at the definition of an ADT that enables parallel computing.

What will shall do in our final reflections is address some of the more earthly practicalities of using Aggregators and doing parallel computing with real machines.

Implementation Bottlenecks

Concurrent Sourcing

We observed how the pairing of values as a way to break down our problem enables concurrent computing. Being free of ordering of pairs means we require less control and coordination. This same rule applies on a larger scale, when we have multiple sources to read and process data from.

But there remains one implicit assumption we have made throughout: that our collection is in fact an ordered set of values. Associativity establishes that the pairing itself is irrelevant, but assumes ordering within the collection to be fixed. This adds one layer of control and complexity.

But, for the analytics applications we have in mind, this rarely need be the case. Diving back to the specific use case, to sum up numbers we needn’t concern ourselves with the order in which we add them:

(A + B) + C = (C + B) + A = (C + A) + B = ...

To relax the constraint of ordering implies that our binary operation need not only be associative, but commutative as well. This is our sixth property. A Semigroup already embodies the notion of an associative binary operation. Thus, what we require is in fact a Commutative Semigroup.

Recursive Sub-tasking

In our theoretical approach, we broke down our collection into many pairs of values to be handed off to an finite or infinite pool of executors. In practice, doing this breakdown itself can be time consuming. It can involve a recursive partitioning of our collection. For very large collections, this is hardly practical.

Returning to property (4), Single Transformer, we note that there is a way to circumvent having to create pairs. One way to do this is to treat our data source as a stream. Given a Monoid and a non-empty collection, we can iterate over the values, add them to the partial sum, and use the partial sum to form a pair with the next value:


(left, right) = (zeroValue, collection[0])
for i in 1...len(collection):
	left = left + right
  right = collection[i]
total = left + right

If we rid ourselves of the pair and extract the total into a cumulative variable, it becomes equivalent to our initial serial implementation:

total = 0
for i in 0...len(collection):
	total += collection[i]

So, rather than dividing our collection into pairs, we can simply break it up into smaller sub-collections and apply our iterative algorithm on each. Note that this does not preclude the use of a Monoid. The iterative nature of this computation is an implementation detail. What changes is what data we feed into our operations and what we do with its output.

The Final Specification

We started our fable in search for an ADT that is conducive for parallel computing of aggregations. From a specific use case, we analyzed the properties of the problem to formulate a type that could address our needs. We found six such properties:

  1. Concurrency
  2. Partitioning
  3. Associativity
  4. Single Transformer
  5. Zero Value
  6. Commutativity

Distilling these into pre-existing ADTs, we get:

  • (3) + (4) + (6): Commutative Semigroup
  • Commutative Semigroup + (5): Commutative Monoid

The Commutative Monoid forms the basis of our ADT, the Aggregator. Properties (1) and (2) pertain to how we handle our collection of values.

And this is where our fable ends. Computing lived concurrently ever after.