
Core concepts


This page introduces the core concepts of Summingbird. There are two main types of data: streams and snapshots. Streams are full histories of data; snapshots capture the state of a system as of a certain time (in Summingbird, snapshots live in stores). As you read the sections below, it will help to ask of each concept: is this data a stream or a snapshot?

Producer

The Producer is Summingbird’s data stream abstraction. Every job begins by creating an initial Producer[P, T] node out of a P#Source[T] by calling the Producer.source method:

def source[P <: Platform[P], T: Manifest](s: P#Source[T]): Producer[P, T]

Once you’ve created a Producer, all operations in the Producer API are fair game. After building up your desired MapReduce workflow, you’ll need to hand your Producer to an instance of Platform to compile the MapReduce workflow down to that particular Platform’s notion of a “plan”. More on the “plan” later.
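To make this concrete, here is a small platform-independent workflow in the style of the project's word-count example. This is a sketch: the tokenizing logic is illustrative, and exact method signatures and required implicits vary by Summingbird version.

import com.twitter.summingbird._

// Split sentences into words, pair each word with a count of 1,
// and sum the counts for each word into the supplied store.
def wordCount[P <: Platform[P]](
  source: Producer[P, String],
  store: P#Store[String, Long]) =
  source
    .flatMap { sentence => sentence.split("\\s+").toList.map(_ -> 1L) }
    .sumByKey(store)

Because the job is written against the abstract Platform[P], the same wordCount definition can be planned onto Storm, Scalding, or the in-memory platform.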

Platform

A Summingbird Platform instance can be implemented for any streaming MapReduce library that knows how to make sense of the operations one can perform on a Producer. The Summingbird repository contains Platform implementations for Storm, for Scalding, and for in-memory execution. Here’s the Platform trait:

trait Platform[P <: Platform[P]] {
  type Source[_]
  type Store[_, _]
  type Sink[_]
  type Service[_, _]
  type Plan[_]

  def plan[T](completed: Producer[P, T]): Plan[T]
}

Each of the type variables locks down one of Summingbird’s core concepts for a particular execution platform. As you build up a graph of operations on your Producer, you’ll pull in various instances of these types. Let’s discuss each type variable in turn.
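For instance, here is roughly how the in-memory platform fills in these type members (a sketch based on the summingbird-core Memory platform of this era; details may differ between versions):

import scala.collection.mutable.{ Map => MutableMap }

class Memory extends Platform[Memory] {
  type Source[T] = TraversableOnce[T]
  type Store[K, V] = MutableMap[K, V]
  type Sink[-T] = T => Unit
  type Service[-K, +V] = K => Option[V]
  type Plan[T] = Stream[T]

  def plan[T](completed: Producer[Memory, T]): Plan[T] =
    ???  // interprets the Producer graph; omitted in this sketch
}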

Source

A Source represents a stream of data. Each execution platform has its own notion of a data source. For example, the Memory platform defines a Source[T] to be any TraversableOnce[T]:

type Source[T] = TraversableOnce[T]

As a result, any Scala sequence is fair game:

import com.twitter.summingbird._
import com.twitter.summingbird.memory._

val producer: Producer[Memory, Int] = Producer.source(Seq(1,2,3))

Storm’s sources are often backed by realtime distributed queues, so the Storm platform’s Source type is a bit different:

type Source[+T] = com.twitter.tormenta.spout.Spout[(Long, T)]

This source is a Tormenta Spout tuned to produce instances of T along with an associated timestamp. Though they look different, the Memory platform’s TraversableOnce[T] and the Storm platform’s Spout[(Long, T)] are both, logically, generators of data, and each admits a sensible implementation of the methods in the Producer API.
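For example, a Storm source can be built from a Tormenta spout. This is a sketch: Spout.fromTraversable is Tormenta's test-friendly constructor, and the timestamp-pairing step shown here is illustrative.

import com.twitter.tormenta.spout.Spout

// A spout producing a fixed sequence of sentences (useful for testing).
val sentences: Spout[String] =
  Spout.fromTraversable(Seq("hello world", "hello summingbird"))

// The Storm platform wants Spout[(Long, T)]: every event paired with a timestamp.
val timestamped: Spout[(Long, String)] =
  sentences.map { s => (System.currentTimeMillis, s) }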

Store

State lives in a Store. The Store is Summingbird’s main snapshot representation. Because Summingbird works with associative operators (Semigroups and Monoids), a store update is always of the associative form:

newStore = assoc(oldStore, value)

where assoc(assoc(a, b), c) == assoc(a, assoc(b, c)). This associative property is what Summingbird exploits for parallelism, and for fault tolerance when combining batch and realtime modes.
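As a minimal illustration using Algebird’s Semigroup (the library that supplies Summingbird’s associative operators):

import com.twitter.algebird.Semigroup

// Long addition is associative, so partial sums computed over arbitrary
// groupings of the stream (batch vs. realtime, different workers) agree.
val (a, b, c) = (1L, 2L, 3L)
assert(Semigroup.plus(Semigroup.plus(a, b), c) == Semigroup.plus(a, Semigroup.plus(b, c)))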

This and the following concepts need some documentation love. Treat them as TODOs.

A Store accepts a snapshot of a KeyedProducer's output. On the Memory and Storm platforms, a store is a key-value store. On the Scalding platform, a store is a SequenceFile containing the key-value pairs of the entire dataset, with each value paired with a BatchID. The value represents the aggregate for that key over all time up to (but not including!) the moment the paired BatchID begins. That sounds confusing, so I’ll clean it up.
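Put schematically, the logical record shape is something like the following. This is a hypothetical sketch of the logical layout, not the exact on-disk format:

import com.twitter.summingbird.batch.BatchID

// Each key is paired with the batch marking the start of time NOT yet covered,
// plus the aggregate of all values seen strictly before that batch began.
type ScaldingStoreRecord[K, V] = (K, (BatchID, V))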

Sink

Unlike the Store, the Sink allows you to materialize a stream representation of the Producer’s values. A Sink is a stream, not a snapshot. On the Storm and Memory platforms, a Sink is just a function call; that function might populate a log stream, or feed another realtime queue that a further Summingbird topology could pull in as a data source. On the Scalding platform, a Sink outputs a SequenceFile containing ALL instances of T seen in the Producer’s stream, each associated with a current time.

In the word count example, inserting a call to write(sink) before the call to sumByKey would produce a stream of (word, 1L) pairs. Calling producer.write(sink) after a sumByKey will produce a stream of monotonically increasing counts for each word, effectively a scanLeft over the updates to every key in the stream.
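Here is a sketch of the first placement, reusing the word-count shape from above. The sink name is hypothetical, and the element type seen by a write placed after sumByKey varies by Summingbird version, so only the pre-aggregation sink is shown:

def wordCountWithSink[P <: Platform[P]](
  source: Producer[P, String],
  rawSink: P#Sink[(String, Long)],  // hypothetical sink for raw pairs
  store: P#Store[String, Long]) =
  source
    .flatMap { s => s.split("\\s+").toList.map(_ -> 1L) }
    .write(rawSink)   // materializes the stream of (word, 1L) pairs
    .sumByKey(store)  // a .write here would see the running counts per key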

Service

A Service can represent reads of both streams and snapshots, or even asynchronous function calls that take negligible time to compute. A service allows the user to perform a leftJoin against the current values within a Producer’s stream. A Service can also provide access to a Store that is materialized earlier in the same job, or in a dependent job. One can also create a fixed snapshot to join against; consider, for example, a large static lookup table that is too large to fit in memory on any single node.

In the Memory or Storm cases, a Service is a key-value store against which the stream performs lookups. Before a service join, a producer must have type Producer[P, (K, V)]. A join against a P#Service[K, RightV] will return a Producer[P, (K, (V, Option[RightV]))].
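A small sketch on the Memory platform, where a Service[K, V] is just a function K => Option[V] (this also assumes the implicit conversion that gives a Producer[P, (K, V)] access to leftJoin):

import com.twitter.summingbird._
import com.twitter.summingbird.memory._

// A static lookup service with hypothetical data: word -> word length.
val lengths: Memory#Service[String, Int] =
  (word: String) => Map("hello" -> 5, "world" -> 5).get(word)

val joined: Producer[Memory, (String, (Long, Option[Int]))] =
  Producer.source[Memory, (String, Long)](Seq("hello" -> 1L, "storm" -> 1L))
    .leftJoin(lengths)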

The Scalding platform allows you to join against the stream output by another Summingbird job’s sink. The join is implemented in a clever way that prevents any key on the left side of the join from seeing the right side’s value at a future point in time. Put another way, the Scalding platform’s service join is non-clairvoyant.

Plan

The plan is the final representation of the MapReduce flow produced by a Platform after a call to platform.plan(producer). For Storm, the plan is a StormTopology instance that the user can execute using Storm’s supplied methods. For the Memory platform, the plan is an in-memory Stream[T] containing the output of the supplied producer.
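For example, planning a job on the Memory platform yields a lazy Stream that runs when forced (a sketch; the precise way the Memory platform executes a plan may differ by version):

import com.twitter.summingbird._
import com.twitter.summingbird.memory._

val numbers: Producer[Memory, Int] = Producer.source(Seq(1, 2, 3))

val platform = new Memory
val plan: Stream[Int] = platform.plan(numbers)
plan.foreach(println)  // forcing the stream executes the job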

The Scalding platform supplies the user with a PipeFactory[T]. This is the same type as the Scalding platform’s Source, and it can be executed as described in the documentation for the Scalding platform.