This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit 23ee185

Merge pull request #227 from twitter/feature/better-storm-tests

Fix Storm Tests and ClassCastException in Storm jobs

johnynek committed Sep 19, 2013
2 parents 5d1f0d2 + f422942

Showing 5 changed files with 109 additions and 84 deletions.
project/Build.scala (1 addition, 0 deletions)

@@ -164,6 +164,7 @@ object SummingbirdBuild extends Build {
)

lazy val summingbirdStorm = module("storm").settings(
+  parallelExecution in Test := false,
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
withCross("com.twitter" %% "bijection-core" % bijectionVersion),
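The one-line sbt change above turns off parallel test execution for the storm module, presumably because each suite in this commit boots an in-process LocalCluster (see StormLaws below), and concurrent clusters in one JVM would collide. A minimal sketch of the setting in context, where module(...) is this build's own helper:

    // sbt (Build.scala) sketch: run the storm module's test suites serially,
    // since each spins up an in-process Storm LocalCluster.
    lazy val summingbirdStorm = module("storm").settings(
      parallelExecution in Test := false
    )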
@@ -46,13 +46,16 @@ case class StormEnv(override val jobName: String, override val args: Args)
args.optional("name")
.getOrElse(jobName.split("\\.").last)

- Storm.remote(classSuffix, builder.opts)
+ Storm.remote(builder.opts)
.withConfigUpdater { config =>
val c = ConfigBijection.invert(config)
val transformed = ConfigBijection(ajob.transformConfig(c))
KryoRegistrationHelper.registerInjections(transformed, eventCodecPairs)
KryoRegistrationHelper.registerInjectionDefaults(transformed, codecPairs)
transformed
- }.run(builder.node.name(builder.id).asInstanceOf[Producer[Storm, _]])
+ }.run(
+   builder.node.name(builder.id).asInstanceOf[Producer[Storm, _]],
+   classSuffix
+ )
}
}
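The StormEnv change above tracks the API change made in Storm.scala later in this diff: the job name (classSuffix here) now travels with run rather than with the platform instance. A hypothetical call-site sketch, where job and opts are stand-ins rather than names from the patch:

    // Before this commit: the name was fixed when the platform was built.
    //   Storm.remote("myJobName", opts).run(job)
    // After: one configured platform instance can submit under any name.
    //   Storm.remote(opts).run(job, "myJobName")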
@@ -22,7 +22,7 @@ import com.twitter.util.Future
import java.io.{ Closeable, Serializable }

// Represents the logic in the flatMap bolts
- trait FlatMapOperation[-T, +U] extends Serializable with Closeable { self =>
+ trait FlatMapOperation[-T, +U] extends Serializable with Closeable {
def apply(t: T): Future[TraversableOnce[U]]

override def close { }
@@ -32,14 +32,19 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable { self =>
* case we don't want to completely choke on large expansions (and
* joins).
*/
- def andThen[V](fmo: FlatMapOperation[U, V]): FlatMapOperation[T, V] =
+ def andThen[V](fmo: FlatMapOperation[U, V]): FlatMapOperation[T, V] = {
+   val self = this // Using the standard "self" at the top of the
+                   // trait caused a NullPointerException after
+                   // serialization. I think that Kryo mis-serializes that reference.
new FlatMapOperation[T, V] {
def apply(t: T) = self(t).flatMap { tr =>
val next: Seq[Future[TraversableOnce[V]]] = tr.map { fmo.apply(_) }.toSeq
Future.collect(next).map(_.flatten) // flatten the inner
}

override def close { self.close; fmo.close }
}
+ }
}

class FunctionFlatMapOperation[T, U](@transient fm: T => TraversableOnce[U])
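The val self = this workaround in andThen is worth dwelling on: with the trait-level self => alias, the anonymous inner FlatMapOperation reaches its outer instance through that alias, and per the in-code comment that reference came back null after Kryo serialization. A minimal standalone sketch of the capture pattern, using a hypothetical Op trait rather than the real FlatMapOperation:

    import java.io.Serializable

    trait Op[T] extends Serializable {
      def apply(t: T): T

      def andThenOp(next: Op[T]): Op[T] = {
        // Capture the outer instance in a plain local val: the closure then
        // holds it as an ordinary field, which serializers round-trip intact.
        val self = this
        new Op[T] {
          def apply(t: T): T = next(self(t)) // run self first, then next
        }
      }
    }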
@@ -55,11 +55,11 @@ sealed trait StormService[-K, +V]
case class StoreWrapper[K, V](store: StoreFactory[K, V]) extends StormService[K, V]

object Storm {
- def local(name: String, options: Map[String, Options] = Map.empty): LocalStorm =
-   new LocalStorm(name, options, identity)
+ def local(options: Map[String, Options] = Map.empty): LocalStorm =
+   new LocalStorm(options, identity)

- def remote(name: String, options: Map[String, Options] = Map.empty): RemoteStorm =
-   new RemoteStorm(name, options, identity)
+ def remote(options: Map[String, Options] = Map.empty): RemoteStorm =
+   new RemoteStorm(options, identity)

def timedSpout[T](spout: Spout[T])
(implicit timeOf: TimeExtractor[T]): Spout[(Long, T)] =
@@ -210,7 +210,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
val stormSpout = optionMaps.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) {
case (spout, OptionMap(op)) =>
spout.flatMap { case (time, t) =>
- op.asInstanceOf[Any => Option[Nothing]].apply(t)
+ op.asInstanceOf[Any => Option[_]].apply(t)
.map { x => (time, x) } }
case _ => sys.error("not possible, given the above call to span.")
}.getSpout
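This cast change is plausibly the ClassCastException fix named in the commit title. Assuming standard Scala 2 erasure behavior: once the function is cast to Any => Option[Nothing], the element inside the Option has static type Nothing, and the checkcast to scala.runtime.Nothing$ that the compiler emits at the use site fails for every real value; with Option[_] the element is typed existentially and no such cast is needed. A minimal sketch (not from the patch) of the two casts:

    val op: Any => Option[Int] = { _ => Some(1) }

    // Casting to Option[Nothing] types the element as Nothing; forcing it,
    // as the .map above does, throws ClassCastException at runtime:
    //   op.asInstanceOf[Any => Option[Nothing]].apply(0).map { x => (0L, x) }

    // Casting to Option[_] leaves the element as an existential (Any here),
    // so the same pipeline runs fine:
    val ok = op.asInstanceOf[Any => Option[_]].apply(0).map { x => (0L, x) }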
@@ -258,7 +258,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config

private def foldOperations(head: FMItem, tail: List[FMItem]) = {
val operation = head match {
- case OptionMap(op) => FlatMapOperation(op.andThen(_.iterator))
+ case OptionMap(op) => FlatMapOperation(op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]])
case FactoryCell(store) => serviceOperation(store)
case FlatMap(op) => op
}
@@ -267,7 +267,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
acc.asInstanceOf[FlatMapOperation[Any, (Any, Any)]],
store.asInstanceOf[StoreFactory[Any, Any]]
).asInstanceOf[FlatMapOperation[Any, Any]]
- case (acc, OptionMap(op)) => acc.andThen(FlatMapOperation[Any, Any](op.andThen(_.iterator)))
+ case (acc, OptionMap(op)) => acc.andThen(FlatMapOperation[Any, Any](op.andThen(_.iterator).asInstanceOf[Any => TraversableOnce[Any]]))
case (acc, FlatMap(op)) => acc.andThen(op.asInstanceOf[FlatMapOperation[Any, Any]])
}
}
@@ -396,30 +396,29 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config
populate(topologyBuilder, summer, name)
topologyBuilder.createTopology
}
- def run(summer: Producer[Storm, _]): Unit = run(plan(summer))
- def run(topology: StormTopology): Unit
+ def run(summer: Producer[Storm, _], jobName: String): Unit = run(plan(summer), jobName)
+ def run(topology: StormTopology, jobName: String): Unit
}

- class RemoteStorm(jobName: String, options: Map[String, Options], updateConf: Config => Config)
-   extends Storm(options, updateConf) {
+ class RemoteStorm(options: Map[String, Options], updateConf: Config => Config) extends Storm(options, updateConf) {

override def withConfigUpdater(fn: Config => Config) =
- new RemoteStorm(jobName, options, updateConf.andThen(fn))
+ new RemoteStorm(options, updateConf.andThen(fn))

- def run(topology: StormTopology): Unit = {
+ override def run(topology: StormTopology, jobName: String): Unit = {
val topologyName = "summingbird_" + jobName
StormSubmitter.submitTopology(topologyName, baseConfig, topology)
}
}

- class LocalStorm(jobName: String, options: Map[String, Options], updateConf: Config => Config)
+ class LocalStorm(options: Map[String, Options], updateConf: Config => Config)
extends Storm(options, updateConf) {
lazy val localCluster = new LocalCluster

override def withConfigUpdater(fn: Config => Config) =
- new LocalStorm(jobName, options, updateConf.andThen(fn))
+ new LocalStorm(options, updateConf.andThen(fn))

- def run(topology: StormTopology): Unit = {
+ override def run(topology: StormTopology, jobName: String): Unit = {
val topologyName = "summingbird_" + jobName
localCluster.submitTopology(topologyName, baseConfig, topology)
}
@@ -16,16 +16,19 @@

package com.twitter.summingbird.storm

- import backtype.storm.LocalCluster
+ import backtype.storm.{ LocalCluster, Testing }
+ import backtype.storm.testing.{ CompleteTopologyParam, MockedSources }
import com.twitter.algebird.{MapAlgebra, Monoid}
import com.twitter.storehaus.{ ReadableStore, JMapStore }
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird._
import com.twitter.summingbird.batch.{BatchID, Batcher}
+ import com.twitter.summingbird.storm.spout.TraversableSpout
import com.twitter.tormenta.spout.Spout
import com.twitter.util.Future
import java.util.{Collections, HashMap, Map => JMap, UUID}
import java.util.concurrent.atomic.AtomicInteger
+ import org.specs._
import org.scalacheck._
import org.scalacheck.Prop._
import org.scalacheck.Properties
@@ -37,8 +40,6 @@ import scala.collection.mutable.{
SynchronizedBuffer,
SynchronizedMap
}
- import Constants.DEFAULT_SPOUT_PARALLELISM

/**
* Tests for Summingbird's Storm planner.
*/
@@ -52,34 +53,19 @@ case class TestState[T, K, V](
placed: AtomicInteger = new AtomicInteger
)

- object StormLaws extends Properties("Storm") {
+ object StormLaws extends Specification {
import MapAlgebra.sparseEquiv

// This is dangerous, obviously. The Storm platform graphs tested
// here use the UnitBatcher, so the actual time extraction isn't
// needed.
implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L)
implicit val batcher = Batcher.unit

def createGlobalState[T, K, V] =
new MutableHashMap[String, TestState[T, K, V]]
with SynchronizedMap[String, TestState[T, K, V]]

- /**
-  * Returns a serializable iterator that wraps the supplied
-  * TraversableOnce[T]. Every time "next" is accessed, the supplied
-  * side-effect onNext is called with the item to be returned.
-  */
- def toIterator[T](supplier: => TraversableOnce[T])(onNext: T => Unit): Iterator[T] =
-   new Iterator[T] with java.io.Serializable {
-     lazy val iterator = supplier.toIterator
-     override def hasNext = iterator.hasNext
-     override def next = {
-       val ret = iterator.next
-       onNext(ret)
-       ret
-     }
-   }

/**
* Global state shared by all tests.
*/
@@ -89,7 +75,7 @@ object StormLaws extends Properties("Storm") {
* Returns a MergeableStore that routes get, put and merge calls
* through to the backing store in the proper globalState entry.
*/
- def testingStore(id: String)(onMerge: () => Unit) =
+ def testingStore(id: String) =
new MergeableStore[(Int, BatchID), Int] with java.io.Serializable {
val monoid = implicitly[Monoid[Int]]
def wrappedStore = globalState(id).store
@@ -101,14 +87,14 @@ object StormLaws extends Properties("Storm") {
wrappedStore.put(k, optV)
else
wrappedStore.remove(k)
- onMerge()
+ globalState(id).placed.incrementAndGet
Future.Unit
}
override def merge(pair: ((Int, BatchID), Int)) = {
val (k, v) = pair
val newV = Monoid.plus(Some(v), getOpt(k)).flatMap(Monoid.nonZeroOption(_))
wrappedStore.put(k, newV)
- onMerge()
+ globalState(id).placed.incrementAndGet
Future.Unit
}
}
@@ -120,7 +106,17 @@ object StormLaws extends Properties("Storm") {
*/
val testFn = { i: Int => List((i -> i)) }

val storm = Storm.local("scalaCheckJob")
val storm = Storm.local()

+ val completeTopologyParam = {
+   val ret = new CompleteTopologyParam()
+   ret.setMockedSources(new MockedSources)
+   ret.setStormConf(storm.baseConfig)
+   ret.setCleanupState(false)
+   ret
+ }
+
+ def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get

/**
* Perform a single run of TestGraphs.singleStepJob using the
@@ -132,63 +128,84 @@ object StormLaws extends Properties("Storm") {
globalState += (id -> TestState())

val cluster = new LocalCluster()
- val items = toIterator(original)(globalState(id).used += _)

val job = mkJob(
-   Storm.source(Spout.fromTraversable(items)),
-   MergeableStoreSupplier(() => testingStore(id)(() => globalState(id).placed.incrementAndGet), Batcher.unit)
+   Storm.source(TraversableSpout(original)),
+   MergeableStoreSupplier(() => testingStore(id), Batcher.unit)
)

val topo = storm.plan(job)
- val parallelism = DEFAULT_SPOUT_PARALLELISM.parHint

- // Submit the topology locally.
- cluster.submitTopology("testJob", storm.baseConfig, topo)

- // Wait until the topology processes all elements.
- while (globalState(id).placed.get < (original.size * parallelism)) {
-   Thread.sleep(10)
- }
+ Testing.completeTopology(cluster, topo, completeTopologyParam)
+ // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667
+ Thread.sleep(1000)
cluster.shutdown

(testFn, globalState(id))
}
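With these changes runOnce leans on Storm's own test harness instead of busy-waiting on a counter: Testing.completeTopology, driven by the completeTopologyParam built above, runs the topology on the LocalCluster until processing finishes, and each test draws one random input through sample rather than looping over ScalaCheck's forAll. Condensed, the per-test flow is now, with job as a hypothetical stand-in for the planned producer:

    // Condensed per-test flow (sketch; `job` stands in for the producer under test):
    val original = sample[List[Int]]   // one random input per example
    val cluster  = new LocalCluster()
    val topo     = storm.plan(job)
    Testing.completeTopology(cluster, topo, completeTopologyParam)
    Thread.sleep(1000)                 // guard against the race linked above
    cluster.shutdown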

property("StormPlatform matches Scala for single step jobs") =
forAll { original: List[Int] =>
val (fn, returnedState) =
runOnce(original)(
TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(testFn)
)
Equiv[Map[Int, Int]].equiv(
TestGraphs.singleStepInScala(returnedState.used.toList)(fn),
returnedState.store.asScala.toMap
.collect { case ((k, batchID), Some(v)) => (k, v) }
"StormPlatform matches Scala for single step jobs" in {
val original = sample[List[Int]]
val (fn, returnedState) =
runOnce(original)(
TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(testFn)
)
}
Equiv[Map[Int, Int]].equiv(
TestGraphs.singleStepInScala(original)(fn),
returnedState.store.asScala.toMap
.collect { case ((k, batchID), Some(v)) => (k, v) }
) must beTrue
}

val nextFn = { pair: ((Int, (Int, Option[Int]))) =>
val (k, (v, joinedV)) = pair
List((k -> joinedV.getOrElse(10)))
}

val serviceFn = Arbitrary.arbitrary[Int => Option[Int]].sample.get
- val service = StoreWrapper[Int, Int](() =>
-   ReadableStore.fromFn(serviceFn)
- )
+ val service = StoreWrapper[Int, Int](() => ReadableStore.fromFn(serviceFn))

- property("StormPlatform matches Scala for left join jobs") =
-   forAll { original: List[Int] =>
-     val (fn, returnedState) =
-       runOnce(original)(
-         TestGraphs.leftJoinJob[
-           Storm, Int, Int, Int, Int, Int
-         ](_, service, _)(testFn)(nextFn)
-       )
-     Equiv[Map[Int, Int]].equiv(
-       TestGraphs.leftJoinInScala(returnedState.used.toList)(serviceFn)
-         (fn)(nextFn),
-       returnedState.store.asScala.toMap
-         .collect { case ((k, batchID), Some(v)) => (k, v) }
-     )
-   }
+ "StormPlatform matches Scala for left join jobs" in {
+   val original = sample[List[Int]]
+
+   val (fn, returnedState) =
+     runOnce(original)(
+       TestGraphs.leftJoinJob[Storm, Int, Int, Int, Int, Int](_, service, _)(testFn)(nextFn)
+     )
+   Equiv[Map[Int, Int]].equiv(
+     TestGraphs.leftJoinInScala(original)(serviceFn)
+       (fn)(nextFn),
+     returnedState.store.asScala.toMap
+       .collect { case ((k, batchID), Some(v)) => (k, v) }
+   ) must beTrue
+ }

"StormPlatform matches Scala for optionMap only jobs" in {
val original = sample[List[Int]]
val id = UUID.randomUUID.toString

val cluster = new LocalCluster()

globalState += (id -> TestState())

val producer =
Storm.source(TraversableSpout(original))
.filter(_ % 2 == 0)
.map(_ -> 10)
.sumByKey(Storm.store(testingStore(id)))

val topo = storm.plan(producer)

Testing.completeTopology(cluster, topo, completeTopologyParam)
// Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667
Thread.sleep(1000)
cluster.shutdown

Equiv[Map[Int, Int]].equiv(
MapAlgebra.sumByKey(original.filter(_ % 2 == 0).map(_ -> 10)),
globalState(id).store.asScala
.toMap
.collect { case ((k, batchID), Some(v)) => (k, v) }
) must beTrue
}
}
