Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Test to illustrate https://github.com/twitter/summingbird/issues/671 #672

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.scalding

import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.property.ConfigDef
import cascading.property.ConfigDef.Setter
import cascading.tuple.Fields
import com.twitter.scalding.{Test => TestMode, _}
import com.twitter.summingbird._
import com.twitter.summingbird.batch.option.Reducers
import com.twitter.summingbird.option.MonoidIsCommutative
import org.scalatest.WordSpec

/**
* Tests for application of named options.
*/
class NamedOptionsSpec extends WordSpec {

private val ReducerKey = "mapred.reduce.tasks"
private val FlatMapNodeName1 = "FM1"
private val FlatMapNodeName2 = "FM2"
private val SummerNodeName1 = "SM1"
private val SummerNodeName2 = "SM2"

private val IdentitySink = new Sink[Int] {
override def write(incoming: PipeFactory[Int]): PipeFactory[Int] = incoming
}

implicit def timeExtractor[T <: (Int, _)] =
new TimeExtractor[T] {
override def apply(t: T) = t._1.toLong
}

def pipeConfig(pipe: Pipe): Map[String, List[String]] = {
val configCollector = new Setter {
var config = Map.empty[String, List[String]]
override def set(key: String, value: String): String = {
if (config.contains(key)) {
config = config.updated(key, value :: config(key))
} else {
config += key -> List(value)
}
""
}
override def get(key: String): String = ???
override def update(key: String, value: String): String = ???
}

def recurse(p: Pipe): Unit = {
val cfg = p.getStepConfigDef
if (!cfg.isEmpty) {
cfg.apply(ConfigDef.Mode.REPLACE, configCollector)
}
p.getPrevious.foreach(recurse(_))
}

recurse(pipe)
configCollector.config
}

def verify[T](
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is T?

options: Map[String, Options],
expectedReducers: Int)(
jobGen: (Producer[Scalding, (Int, Int)], scalding.Store[Int, Int]) => TailProducer[Scalding, Any]) = {

val src = Scalding.sourceFromMappable { dr => IterableSource(List.empty[(Int, Int)]) }
val store = TestStore[Int, Int]("store", TestUtil.simpleBatcher, Map.empty[Int, Int], Long.MaxValue)
val job = jobGen(src, store)
val interval = TestUtil.toTimeInterval(1L, Long.MaxValue)

val scaldingPlatform = Scalding("named options test", options)
val mode: Mode = TestMode(t => (store.sourceToBuffer).get(t))

val flowToPipe = scaldingPlatform
.plan(job)
.apply((interval, mode))
.right
.get
._2

val fd = new FlowDef
val typedPipe = flowToPipe.apply((fd, mode))
def tupleSetter[T] = new TupleSetter[T] {
override def apply(arg: T) = {
val tup = cascading.tuple.Tuple.size(1)
tup.set(0, arg)
tup
}
override def arity = 1
}
val pipe = typedPipe.toPipe(new Fields("0"))(fd, mode, tupleSetter)
val numReducers = pipeConfig(pipe)(ReducerKey).head.toInt
assert(numReducers === expectedReducers)
}

"The ScaldingPlatform" should {
"with same setting on multiple names use the one for the node" in {
val fmReducers = 50
val smReducers = 100

val options = Map(
FlatMapNodeName1 -> Options().set(Reducers(fmReducers)),
SummerNodeName1 -> Options().set(Reducers(smReducers)))

verify(options, smReducers) { (source, store) =>
source
.flatMap(Some(_)).name(FlatMapNodeName1)
.sumByKey(store).name(SummerNodeName1)
}
}

"use named option from the closest node when two names defined one after the other" in {
val smReducers1 = 50
val smReducers2 = 100

val options = Map(
SummerNodeName1 -> Options().set(Reducers(smReducers1)),
SummerNodeName2 -> Options().set(Reducers(smReducers2)))

verify(options, smReducers1) { (source, store) =>
source
.flatMap(Some(_))
.sumByKey(store).name(SummerNodeName1).name(SummerNodeName2)
}
}

"use named option from the upstream node if option not defined on current node" in {
val fmReducers1 = 50
val fmReducers2 = 100

val options = Map(
FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)),
FlatMapNodeName2 -> Options().set(Reducers(fmReducers2)))

verify(options, fmReducers2) { (source, store) =>
source
.flatMap(Some(_)).name(FlatMapNodeName1)
.sumByKey(store).name(SummerNodeName1)
.map { case (k, (optV, v)) => k }.name(FlatMapNodeName2)
.write(IdentitySink)
}
}

"use named option from the upstream node if option not defined on current node, even if upstream node is more than a node apart" in {
val fmReducers1 = 50
val fmReducers2 = 100

val options = Map(
FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)),
FlatMapNodeName2 -> Options().set(Reducers(fmReducers2)))

verify(options, fmReducers2) { (source, store) =>
source
.flatMap(Some(_)).name(FlatMapNodeName1)
.sumByKey(store).name(SummerNodeName1)
.flatMap { case (k, (optV, v)) => Some(k) }
.flatMap { k => List(k, k) }.name(FlatMapNodeName2)
.write(IdentitySink)
}
}

"use named option from the closest upstream node if same option defined on two upstream nodes" in {
val fmReducers1 = 50
val fmReducers2 = 100

val options = Map(
FlatMapNodeName1 -> Options().set(Reducers(fmReducers1)),
FlatMapNodeName2 -> Options().set(Reducers(fmReducers2)))

verify(options, fmReducers1) { (source, store) =>
source
.flatMap(Some(_))
.sumByKey(store).name(SummerNodeName1)
.flatMap { case (k, (optV, v)) => Some(k) }.name(FlatMapNodeName1)
.flatMap { k => List(k, k) }.name(FlatMapNodeName2)
.write(IdentitySink)
}
}

"options propagate backwards" in {
val fmReducers2 = 1000

/**
* Here FlatMapNodeName1 is closer to the summer node but doesn't have Reducers property
* defined so it is picked from FlatMapNodeName2.
*/
val options = Map(
FlatMapNodeName1 -> Options().set(MonoidIsCommutative),
FlatMapNodeName2 -> Options().set(Reducers(fmReducers2)))

verify(options, fmReducers2) { (source, store) =>
source
.flatMap(Some(_))
.sumByKey(store).name(SummerNodeName1)
.flatMap { case (k, (optV, v)) => Some(k) }.name(FlatMapNodeName1)
.flatMap { k => List(k, k) }.name(FlatMapNodeName2)
.write(IdentitySink)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,20 @@

package com.twitter.summingbird.storm

import java.util.{ Map => JMap }

import backtype.storm.generated.StormTopology
import com.twitter.algebird.{ MapAlgebra, Semigroup }
import com.twitter.storehaus.{ ReadableStore, JMapStore }
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.algebird.MapAlgebra
import com.twitter.summingbird._
import com.twitter.summingbird.online._
import com.twitter.summingbird.batch.Batcher
import com.twitter.summingbird.online.option._
import com.twitter.summingbird.storm.option._
import com.twitter.summingbird.batch.{ BatchID, Batcher }
import com.twitter.summingbird.storm.spout.TraversableSpout
import com.twitter.tormenta.spout.Spout
import com.twitter.util.Future
import java.util.{ Collections, HashMap, Map => JMap, UUID }
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.WordSpec
import org.scalacheck._
import org.scalacheck.Prop._
import org.scalacheck.Properties
import org.scalatest.WordSpec

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.{
ArrayBuffer,
HashMap => MutableHashMap,
Map => MutableMap,
SynchronizedBuffer,
SynchronizedMap
}
import scala.collection.mutable.{ HashMap => MutableHashMap, Map => MutableMap }

/**
* Tests for Summingbird's Storm planner.
*/
Expand Down Expand Up @@ -196,4 +182,22 @@ class TopologyTests extends WordSpec {

assert(TDistMap(0).get_common.get_parallelism_hint == 5)
}

"With same setting on multiple names we use the one for the node" in {
val fmNodeName = "flatMapper"
val smNodeName = "summer"
val p = Storm.source(TraversableSpout(sample[List[Int]]))
.flatMap(testFn).name(fmNodeName)
.sumByKey(TestStore.createStore[Int, Int]()._2).name(smNodeName)

val opts = Map(fmNodeName -> Options().set(SummerParallelism(10)),
smNodeName -> Options().set(SummerParallelism(20)))
val storm = Storm.local(opts)
val stormTopo = storm.plan(p).topology
val bolts = stormTopo.get_bolts

// Tail should use parallelism specified for the summer node
assert(bolts("Tail").get_common.get_parallelism_hint == 20)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
private type Prod[T] = Producer[Storm, T]

private[storm] def get[T <: AnyRef: ClassTag](dag: Dag[Storm], node: StormNode): Option[(String, T)] = {
val producer = node.members.last
val producer = node.members.head
Options.getFirst[T](options, dag.producerToPriorityNames(producer))
}

Expand Down