From f048ec2986a58f40f7dfde580c10f4ecca2b7fa1 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 16:54:30 -0800 Subject: [PATCH 01/14] add some more badges to readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 30b56d26..bfae6599 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ [![Build Status](https://travis-ci.org/winitzki/joinrun-scala.svg?branch=master)](https://travis-ci.org/winitzki/joinrun-scala) [![Coverage Status](https://codecov.io/gh/winitzki/joinrun-scala/coverage.svg?branch=master)](https://codecov.io/gh/winitzki/joinrun-scala?branch=master) +[![Version](http://img.shields.io/badge/version-0.7.7-blue.svg?style=flat)](https://github.com/winitzki/joinrun-scala/releases) +[![License](https://img.shields.io/github/license/mashape/apistatus.svg)](https://opensource.org/licenses/MIT) [![Join the chat at https://gitter.im/joinrun-scala/Lobby](https://badges.gitter.im/joinrun-scala/Lobby.svg)](https://gitter.im/joinrun-scala/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) From 35a6ed37ffc0398754ae272580cf4c63f1383c85 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 17:10:01 -0800 Subject: [PATCH 02/14] clean up some tests --- .../scala/code/winitzki/jc/JoinRunSpec.scala | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala index a7f944ed..9a173c61 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala @@ -2,15 +2,18 @@ package code.winitzki.jc import JoinRun._ import org.scalatest.concurrent.TimeLimitedTests -import org.scalatest.concurrent.Waiters.Waiter +import org.scalatest.concurrent.Waiters.{PatienceConfig, Waiter} import org.scalatest.time.{Millis, Span} import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import scala.concurrent.duration._ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with BeforeAndAfterEach { var tp0: Pool = _ + implicit val patienceConfig = PatienceConfig(timeout = Span(500, Millis)) + override def beforeEach(): Unit = { tp0 = new FixedPool(4) } @@ -40,7 +43,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo b.isBound shouldEqual false c.isBound shouldEqual false - join(tp0)(runSimple { case a(_) + c(_) => b() }) + join(runSimple { case a(_) + c(_) => b() }) a.isBound shouldEqual true b.isBound shouldEqual false @@ -63,12 +66,27 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo join(tp0)(runSimple { case a(_) + b(_) + c(_) => }) a.logSoup shouldEqual "Join{a + b + c => ...}\nNo molecules" + } + + it should "correctly list molecules present in soup" in { + val a = new M[Unit]("a") + val b = new M[Unit]("b") + val c = new M[Unit]("c") + val f = new B[Unit, Unit]("f") + + join(tp0)( + runSimple { case a(_) + b(_) + c(_) + f(_, r) => r() } + ) + a.logSoup shouldEqual "Join{a + b + c + f/B => ...}\nNo molecules" + a() a() b() - waitSome() - waitSome() - a.logSoup shouldEqual "Join{a + b + c => ...}\nMolecules: a() * 2, b()" + Thread.sleep(400) + a.logSoup shouldEqual "Join{a + b + c + f/B => ...}\nMolecules: a() * 2, b()" + c() + f() + a.logSoup shouldEqual "Join{a + b + c + f/B => ...}\nMolecules: a()" } it should "define a reaction with correct inputs with non-default pattern-matching at end of reaction" in { @@ -151,8 +169,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo it should "throw exception when join pattern is nonlinear" in { val thrown = intercept[Exception] { val a = new M[Unit]("a") - join(tp0)( runSimple { case a(_) + a(_) => () }) - a() + join( runSimple { case a(_) + a(_) => () }) } thrown.getMessage shouldEqual "Nonlinear pattern: a used twice" @@ -161,8 +178,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo it should "throw exception when join pattern is nonlinear, with blocking molecule" in { val thrown = intercept[Exception] { val a = new B[Unit,Unit]("a") - join(tp0)( runSimple { case a(_,r) + a(_,s) => () }) - a() + join( runSimple { case a(_,r) + a(_,s) => () }) } thrown.getMessage shouldEqual "Nonlinear pattern: a/B used twice" } @@ -227,7 +243,6 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo join(tp0)( runSimple { case a(x) + b(0) => a(x+1) }, runSimple { case a(z) + f(_, r) => r(z) }) a(1) b(2) - waitSome() f() shouldEqual 1 } From 5579ba7920ce8340de1585388ce466bf7bf4c146 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 17:10:17 -0800 Subject: [PATCH 03/14] try getting rid of warnings, and add more stringent warnings --- build.sbt | 19 +++++++++++- .../code/winitzki/jc/JoinDefinition.scala | 2 ++ .../main/scala/code/winitzki/jc/JoinRun.scala | 7 +++-- .../main/scala/code/winitzki/jc/Library.scala | 6 ++-- .../scala/code/winitzki/jc/MutableBag.scala | 31 ++++++++++++------- .../main/scala/code/winitzki/jc/Pool.scala | 5 +-- .../winitzki/jc/JoinRunBlockingSpec.scala | 4 +-- .../main/scala/code/winitzki/jc/Macros.scala | 9 ++++-- .../scala/code/winitzki/jc/MacrosSpec.scala | 1 + 9 files changed, 58 insertions(+), 26 deletions(-) diff --git a/build.sbt b/build.sbt index 367c3fb0..cc34b09f 100644 --- a/build.sbt +++ b/build.sbt @@ -21,7 +21,24 @@ val commonSettings = Defaults.coreDefaultSettings ++ Seq( resolvers += Resolver.sonatypeRepo("snapshots"), resolvers += Resolver.sonatypeRepo("releases"), resolvers += "Typesafe releases" at "http://repo.typesafe.com/typesafe/releases", - scalacOptions ++= Seq() + + scalacOptions ++= Seq( // https://tpolecat.github.io/2014/04/11/scalac-flags.html + "-deprecation", + "-encoding", "UTF-8", // yes, this is 2 args + "-feature", + "-language:existentials", + "-language:higherKinds", + "-language:implicitConversions", + "-unchecked", + // "-Xfatal-warnings", + "-Xlint", + "-Yno-adapted-args", // Makes Reaction JoinRun.object + fail + "-Ywarn-dead-code", // N.B. doesn't work well with the ??? hole + "-Ywarn-numeric-widen", + "-Ywarn-value-discard", + // "-Xfuture", // Makes Benchmarks code fail + "-Ywarn-unused-import" // 2.11 only + ) ) tutSettings diff --git a/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala b/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala index 4532a9cb..2ba30a47 100644 --- a/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala +++ b/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala @@ -69,6 +69,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, private[jc] def setQuiescenceCallback(callback: M[Unit]): Unit = { quiescenceCallbacks.add(callback) + () } private lazy val possibleReactions: Map[Molecule, Seq[Reaction]] = reactionInfos.toSeq @@ -271,6 +272,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, else joinPool.runClosure(buildInjectClosure(m, molValue), currentReactionInfo.getOrElse(emptyReactionInfo)) } + () } // Remove a blocking molecule if it is present. diff --git a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala index ed44f5de..a303852f 100644 --- a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala +++ b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala @@ -283,7 +283,7 @@ object JoinRun { * @return an unapply operation */ object + { - def unapply(attr:Any) = Some(attr,attr) + def unapply(attr:Any): Option[(Any, Any)] = Some((attr,attr)) } /** Create a reaction value out of a simple reaction body. @@ -592,7 +592,10 @@ object JoinRun { private val errorLog: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() - private[jc] def reportError(message: String): Unit = errorLog.add(message) + private[jc] def reportError(message: String): Unit = { + errorLog.add(message) + () + } def errors = errorLog.iterator().asScala.toIterable diff --git a/lib/src/main/scala/code/winitzki/jc/Library.scala b/lib/src/main/scala/code/winitzki/jc/Library.scala index 5cf46bcf..8fc94a44 100644 --- a/lib/src/main/scala/code/winitzki/jc/Library.scala +++ b/lib/src/main/scala/code/winitzki/jc/Library.scala @@ -19,7 +19,7 @@ object Library { val p = Promise[T]() join(pool,pool)( - runSimple { case f(x) => p.success(x) } + runSimple { case f(x) => p.success(x); () } ) (f, p.future) } @@ -53,9 +53,9 @@ object Library { } } - def withPool[B](pool: => Pool)(doWork: Pool => B): Try[B] = cleanup(pool)(_.shutdownNow())(doWork) + def withPool[T](pool: => Pool)(doWork: Pool => T): Try[T] = cleanup(pool)(_.shutdownNow())(doWork) - def cleanup[A,B](resource: => A)(cleanup: A => Unit)(doWork: A => B): Try[B] = { + def cleanup[T,R](resource: => T)(cleanup: T => Unit)(doWork: T => R): Try[R] = { try { Success(doWork(resource)) } catch { diff --git a/lib/src/main/scala/code/winitzki/jc/MutableBag.scala b/lib/src/main/scala/code/winitzki/jc/MutableBag.scala index b6480438..d538e0d1 100644 --- a/lib/src/main/scala/code/winitzki/jc/MutableBag.scala +++ b/lib/src/main/scala/code/winitzki/jc/MutableBag.scala @@ -56,12 +56,15 @@ class MutableBag[K,V] { def getOne(k: K): Option[V] = bag.get(k).flatMap(_.headOption.map(_._1)) - def addToBag(k: K, v: V): Unit = bag.get(k) match { - case Some(vs) => - val newCount = vs.getOrElse(v, 0) + 1 - vs += (v -> newCount) - - case None => bag += (k -> mutable.Map(v -> 1)) + def addToBag(k: K, v: V): Unit = { + bag.get(k) match { + case Some(vs) => + val newCount = vs.getOrElse(v, 0) + 1 + vs += (v -> newCount) + + case None => bag += (k -> mutable.Map(v -> 1)) + } + () } def removeFromBag(k: K, v: V): Unit = bag.get(k).foreach { vs => @@ -92,12 +95,15 @@ class ConcurrentMutableBag[K,V] { def getOne(k: K): Option[V] = getMap.get(k).flatMap(_.headOption.map(_._1)) - def addToBag(k: K, v: V): Unit = if (bagConcurrentMap.containsKey(k)) { - val vs = bagConcurrentMap.get(k) - val newCount = vs.getOrDefault(v, 0) + 1 - vs.put(v, newCount) - } else { - bagConcurrentMap.put(k, new ConcurrentHashMap[V, Int](Map(v -> 1).asJava)) + def addToBag(k: K, v: V): Unit = { + if (bagConcurrentMap.containsKey(k)) { + val vs = bagConcurrentMap.get(k) + val newCount = vs.getOrDefault(v, 0) + 1 + vs.put(v, newCount) + } else { + bagConcurrentMap.put(k, new ConcurrentHashMap[V, Int](Map(v -> 1).asJava)) + } + () } def removeFromBag(k: K, v: V): Unit = if (bagConcurrentMap.containsKey(k)) { @@ -109,6 +115,7 @@ class ConcurrentMutableBag[K,V] { vs.put(v, newCount) if (vs.isEmpty) bagConcurrentMap.remove(k) + () } def removeFromBag(anotherBag: mutable.Map[K,V]): Unit = diff --git a/lib/src/main/scala/code/winitzki/jc/Pool.scala b/lib/src/main/scala/code/winitzki/jc/Pool.scala index 408ae678..e97f19d9 100644 --- a/lib/src/main/scala/code/winitzki/jc/Pool.scala +++ b/lib/src/main/scala/code/winitzki/jc/Pool.scala @@ -33,9 +33,9 @@ trait Pool { private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorService) extends Pool { protected val execService = execFactory(threads) - val sleepTime = 200 + val sleepTime = 200L - def shutdownNow() = new Thread { + def shutdownNow(): Unit = new Thread { try{ execService.shutdown() execService.awaitTermination(sleepTime, TimeUnit.MILLISECONDS) @@ -44,6 +44,7 @@ private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorSer execService.awaitTermination(sleepTime, TimeUnit.MILLISECONDS) execService.shutdownNow() } + () } def runClosure(closure: => Unit, info: ReactionInfo): Unit = diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala index 59eddc0c..6ebf65b5 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala @@ -261,11 +261,9 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w ) g2() shouldEqual 1 // this should initially work d() // do not inject c(). Now the first reaction is blocked because second reaction cannot start. - waitSome() - g2(timeout = 100 millis)() shouldEqual None // this should be blocked now + g2(timeout = 300 millis)() shouldEqual None // this should be blocked now tp.shutdownNow() tp1.shutdownNow() - } def makeBlockingCheck(sleeping: => Unit, tp1: Pool): (B[Unit,Unit], B[Unit,Int]) = { diff --git a/macros/src/main/scala/code/winitzki/jc/Macros.scala b/macros/src/main/scala/code/winitzki/jc/Macros.scala index d46db455..544d8c02 100644 --- a/macros/src/main/scala/code/winitzki/jc/Macros.scala +++ b/macros/src/main/scala/code/winitzki/jc/Macros.scala @@ -142,10 +142,11 @@ object Macros { case DefDef(_, TermName("applyOrElse"), _, _, _, Match(_, list)) => info = list - // this is matched by a closure which is not a partial function + // this is matched by a closure which is not a partial function. Not used now. + /* case Function(List(ValDef(_, TermName(_), TypeTree(), EmptyTree)), Match(Ident(TermName(_)), list)) => info = list - + */ case _ => super.traverse(tree) } @@ -243,10 +244,12 @@ object Macros { // After traversing the subtrees, we append this molecule information. inputMolecules.append((t.symbol, flag1, Some(flag2), getSha1(binder1))) - // matcher with wrong number of arguments - neither 1 nor 2 + // matcher with wrong number of arguments - neither 1 nor 2. This seems to be never called. + /* case UnApply(Apply(Select(t@Ident(TermName(_)), TermName("unapply")), List(Ident(TermName("")))), _) if t.tpe <:< typeOf[Molecule] => inputMolecules.append((t.symbol, WrongReplyVarF, None, getSha1(t))) + */ // possibly a molecule injection case Apply(Select(t@Ident(TermName(_)), TermName("apply")), binder) => diff --git a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala index bdfdc66c..dbc51710 100644 --- a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala +++ b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala @@ -378,6 +378,7 @@ class MacrosSpec extends FlatSpec with Matchers with BeforeAndAfterEach { "val r = & { case e() => }" shouldNot compile // no pattern variable in a non-blocking molecule "e" "val r = & { case e(_,_) => }" shouldNot compile // two pattern variables in a non-blocking molecule "e" + "val r = & { case e(_,_,_) => }" shouldNot compile // two pattern variables in a non-blocking molecule "e" "val r = & { case a() => }" shouldNot compile // no pattern variable for reply in "a" "val r = & { case a(_) => }" shouldNot compile // no pattern variable for reply in "a" From b3aaa7a81eb9088af7c9b279ecf2323d39542978 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 17:50:16 -0800 Subject: [PATCH 04/14] simplify MapReduceSpec --- .../code/winitzki/benchmark/MapReduceSpec.scala | 8 ++++---- docs/chymyst04.md | 16 +++++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala b/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala index afc26e5c..a645c75d 100644 --- a/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala +++ b/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala @@ -52,17 +52,17 @@ class MapReduceSpec extends FlatSpec with Matchers { val fetch = b[Unit,Int] val tp = new FixedPool(4) + // declare the reaction for "map" join(tp)( - & { case carrier(a) => val res = f(a); interm(res) } + & { case carrier(x) => val res = f(x); interm(res) } ) // reactions for "reduce" must be together since they share "accum" join(tp)( - & { case accum((n, b)) + interm(res) if n > 0 => + & { case accum((n, b)) + interm(res) => accum((n+1, reduceB(b, res) )) }, - & { case accum((0, _)) + interm(res) => accum((1, res)) }, & { case accum((n, b)) + fetch(_, reply) if n == arr.size => reply(b) } ) @@ -70,7 +70,7 @@ class MapReduceSpec extends FlatSpec with Matchers { accum((0, 0)) arr.foreach(i => carrier(i)) val result = fetch() - result shouldEqual 338350 + result shouldEqual arr.map(f).reduce(reduceB) // 338350 } } diff --git a/docs/chymyst04.md b/docs/chymyst04.md index 2fe84cb5..3320b45f 100644 --- a/docs/chymyst04.md +++ b/docs/chymyst04.md @@ -62,7 +62,7 @@ val interm = m[B] Therefore, we need a reaction of this shape: ```scala -run { case carrier(a) => val res = f(a); interm(res) } +run { case carrier(x) => val res = f(x); interm(res) } ``` @@ -100,20 +100,22 @@ Let us change the type of `accum` to carry a tuple `(Int, B)`. The first element of the tuple will now represent a counter, which indicates how many intermediate results we have already processed. Reactions with `accum` will increment the counter; the reaction with `fetch` will proceed only if the counter is equal to the length of the array. -We will also include a condition on the counter that will start the accumulation when the counter is equal to 0. - ```scala val accum = m[(Int, B)] -run { case accum((n, b)) + interm(res) if n > 0 => +run { case accum((n, b)) + interm(res) => accum((n+1, reduceB(b, res) )) }, -run { case accum((0, _)) + interm(res) => accum((1, res)) }, run { case accum((n, b)) + fetch(_, reply) if n == arr.size => reply(b) } ``` -We can now inject all `carrier` molecules, a single `accum((0, null))` molecule, and a `fetch()` molecule. +What value should we inject with `accum` initially? +When the first `interm(res)` molecule arrives, we will need to call `reduceB(x, res)` with some value `x` of type `B`. +Since we assume that `B` is a monoid, there must be a special value, say `bZero`, such that `reduceB(bZero, res)==res`. +So `bZero` is the value we need to inject on the initial `accum` molecule. + +We can now inject all `carrier` molecules, a single `accum((0, bZero))` molecule, and a `fetch()` molecule. Because of the guard condition, the reaction with `fetch()` will not run until all intermediate results have been accumulated. Here is the complete code for this example (see also `MapReduceSpec.scala` in the unit tests). @@ -139,7 +141,7 @@ object C extends App { // declare the reaction for "map" join( - run { case carrier(a) => val res = f(a); interm(res) } + run { case carrier(x) => val res = f(x); interm(res) } ) // reactions for "reduce" must be together since they share "accum" From b4aee6a8f8de751b49b431073706c74399976eb6 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 17:50:32 -0800 Subject: [PATCH 05/14] add warning flags to scala.sbt, try to avoid some warnings in code --- .../code/winitzki/benchmark/Benchmarks1.scala | 2 +- .../scala/code/winitzki/benchmark/Common.scala | 2 +- build.sbt | 6 +++--- .../code/winitzki/test/MoreBlockingSpec.scala | 3 ++- .../scala/code/winitzki/test/ParallelOrSpec.scala | 1 + .../code/winitzki/test/SingletonMoleculeSpec.scala | 3 ++- .../code/winitzki/test/StaticAnalysisSpec.scala | 2 +- lib/src/main/scala/code/winitzki/jc/Pool.scala | 9 ++++++--- .../main/scala/code/winitzki/jc/SmartPool.scala | 12 +++++++----- .../code/winitzki/jc/JoinRunBlockingSpec.scala | 5 +++-- .../test/scala/code/winitzki/jc/JoinRunSpec.scala | 14 +++++++++++++- .../scala/code/winitzki/jc/JoinRunUtilsSpec.scala | 2 +- .../test/scala/code/winitzki/jc/LibrarySpec.scala | 2 +- .../test/scala/code/winitzki/jc/MacrosSpec.scala | 3 ++- 14 files changed, 44 insertions(+), 22 deletions(-) diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala index 87816d16..5776d516 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala @@ -3,7 +3,7 @@ package code.winitzki.benchmark import java.time.LocalDateTime import code.winitzki.benchmark.Common._ -import code.winitzki.jc.{Pool, FixedPool} +import code.winitzki.jc.Pool import code.winitzki.jc.Macros._ import code.winitzki.jc.JoinRun._ diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala b/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala index fe137afc..2f1979e5 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala @@ -4,7 +4,7 @@ import java.time.LocalDateTime import java.time.temporal.ChronoUnit object Common { - val warmupTimeMs = 50 + val warmupTimeMs = 50L def elapsed(initTime: LocalDateTime): Long = initTime.until(LocalDateTime.now, ChronoUnit.MILLIS) diff --git a/build.sbt b/build.sbt index cc34b09f..e9dc8a51 100644 --- a/build.sbt +++ b/build.sbt @@ -23,16 +23,16 @@ val commonSettings = Defaults.coreDefaultSettings ++ Seq( resolvers += "Typesafe releases" at "http://repo.typesafe.com/typesafe/releases", scalacOptions ++= Seq( // https://tpolecat.github.io/2014/04/11/scalac-flags.html - "-deprecation", +// "-deprecation", + "-unchecked", "-encoding", "UTF-8", // yes, this is 2 args "-feature", "-language:existentials", "-language:higherKinds", "-language:implicitConversions", - "-unchecked", // "-Xfatal-warnings", "-Xlint", - "-Yno-adapted-args", // Makes Reaction JoinRun.object + fail +// "-Yno-adapted-args", // Makes calling a() fail to substitute a Unit argument into a.apply(x: Unit) "-Ywarn-dead-code", // N.B. doesn't work well with the ??? hole "-Ywarn-numeric-widen", "-Ywarn-value-discard", diff --git a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala index ab72d4d0..7c9a864b 100644 --- a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala @@ -10,6 +10,7 @@ import org.scalatest.concurrent.Waiters.Waiter import org.scalatest.time.{Millis, Span} import scala.concurrent.duration._ +import scala.language.postfixOps class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests { @@ -70,7 +71,7 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests { collect(0) val numberOfFailures = (1 to 10000).map { _ => - if (f(timeout = 1000.millis)().isEmpty) 1 else 0 + if (f(timeout = 1000 millis)().isEmpty) 1 else 0 }.sum // we used to have about 4% numberOfFailures (but we get zero failures if we do not nullify the semaphore!) and about 4 numberOfFalseReplies in 100,000. diff --git a/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala b/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala index 657d9872..9b1af921 100644 --- a/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala @@ -8,6 +8,7 @@ import org.scalatest.{FlatSpec, Matchers} import scala.annotation.tailrec import scala.concurrent.duration._ +import scala.language.postfixOps import scala.reflect.ClassTag class ParallelOrSpec extends FlatSpec with Matchers { diff --git a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala index 60d3c065..c5f60703 100644 --- a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala @@ -3,10 +3,11 @@ package code.winitzki.test import code.winitzki.jc.JoinRun._ import code.winitzki.jc.Macros.{run => &} import code.winitzki.jc.Macros._ -import code.winitzki.jc.{CachedPool, FixedPool, SmartPool} +import code.winitzki.jc.{FixedPool, SmartPool} import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.time.{Millis, Span} import org.scalatest.{FlatSpec, Matchers} +import scala.language.postfixOps import scala.concurrent.duration._ diff --git a/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala b/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala index bfc1c9e0..5f6b9612 100644 --- a/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala @@ -12,7 +12,7 @@ class StaticAnalysisSpec extends FlatSpec with Matchers with TimeLimitedTests { val timeLimit = Span(1000, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) diff --git a/lib/src/main/scala/code/winitzki/jc/Pool.scala b/lib/src/main/scala/code/winitzki/jc/Pool.scala index e97f19d9..59fcb591 100644 --- a/lib/src/main/scala/code/winitzki/jc/Pool.scala +++ b/lib/src/main/scala/code/winitzki/jc/Pool.scala @@ -6,6 +6,7 @@ import java.util.concurrent._ import code.winitzki.jc.JoinRun.ReactionInfo import scala.concurrent.{ExecutionContext, Future} +import scala.language.reflectiveCalls class CachedPool(threads: Int) extends PoolExecutor(threads, t => new ThreadPoolExecutor(1, t, 1L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], new ThreadFactoryWithInfo) @@ -43,9 +44,9 @@ private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorSer execService.shutdownNow() execService.awaitTermination(sleepTime, TimeUnit.MILLISECONDS) execService.shutdownNow() + () } - () - } + }.run() def runClosure(closure: => Unit, info: ReactionInfo): Unit = execService.execute(new RunnableWithInfo(closure, info)) @@ -57,8 +58,10 @@ private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorSer private[jc] class PoolFutureExecutor(threads: Int = 8, execFactory: Int => ExecutorService) extends PoolExecutor(threads, execFactory) { private val execContext = ExecutionContext.fromExecutor(execService) - override def runClosure(closure: => Unit, info: ReactionInfo): Unit = + override def runClosure(closure: => Unit, info: ReactionInfo): Unit = { Future { closure }(execContext) + () + } } /** Create a pool from a Handler interface. The pool will submit tasks using a Handler.post() method. diff --git a/lib/src/main/scala/code/winitzki/jc/SmartPool.scala b/lib/src/main/scala/code/winitzki/jc/SmartPool.scala index 4da6c7f7..40d4b715 100644 --- a/lib/src/main/scala/code/winitzki/jc/SmartPool.scala +++ b/lib/src/main/scala/code/winitzki/jc/SmartPool.scala @@ -48,8 +48,8 @@ class SmartPool(parallelism: Int) extends Pool { private val queue = new LinkedBlockingQueue[Runnable](maxQueueCapacity) val initialThreads: Int = parallelism - val secondsToRecycleThread = 1 - val shutdownWaitTimeMs = 200 + val secondsToRecycleThread = 1L + val shutdownWaitTimeMs = 200L private val executor = new ThreadPoolExecutor(initialThreads, parallelism, secondsToRecycleThread, TimeUnit.SECONDS, queue, newThreadFactory) @@ -61,12 +61,14 @@ class SmartPool(parallelism: Int) extends Pool { executor.shutdownNow() executor.awaitTermination(shutdownWaitTimeMs, TimeUnit.MILLISECONDS) executor.shutdownNow() - + () } - } + }.run() - override def runClosure(closure: => Unit, info: ReactionInfo): Unit = + override def runClosure(closure: => Unit, info: ReactionInfo): Unit = { executor.submit(new RunnableWithInfo(closure, info)) + () + } override def isInactive: Boolean = executor.isShutdown || executor.isTerminated } diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala index 6ebf65b5..391daf3a 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala @@ -4,7 +4,8 @@ import JoinRun._ import Library.withPool import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.time.{Millis, Span} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} +import scala.language.postfixOps import scala.concurrent.duration.DurationInt @@ -25,7 +26,7 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w val timeLimit = Span(1500, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala index 9a173c61..005a79c2 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala @@ -7,6 +7,7 @@ import org.scalatest.time.{Millis, Span} import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} import scala.concurrent.duration._ +import scala.language.postfixOps class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with BeforeAndAfterEach { @@ -24,7 +25,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo val timeLimit = Span(5000, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) @@ -141,6 +142,17 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo waiter.await() } + it should "start a reaction with one input with Nothing in the molecule" in { + + val waiter = new Waiter + + val a = new M[Nothing]("a") + join(tp0)( runSimple { case a(_) => waiter.dismiss() }) + + a(_) + waiter.await() + } + it should "start a simple reaction chain" in { val waiter = new Waiter diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala index 46ccb358..56041520 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala @@ -9,7 +9,7 @@ class JoinRunUtilsSpec extends FlatSpec with Matchers with TimeLimitedTests { val timeLimit = Span(500, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) diff --git a/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala b/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala index b1dffc81..f7d563a7 100644 --- a/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala @@ -15,7 +15,7 @@ class LibrarySpec extends FlatSpec with Matchers with TimeLimitedTests { val timeLimit = Span(500, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L val patienceConfig = PatienceConfig(timeout = Span(500, Millis)) diff --git a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala index dbc51710..47e4a9ca 100644 --- a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala +++ b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala @@ -5,10 +5,11 @@ import Macros.{getName, rawTree, m,b, run => &} import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} import scala.concurrent.duration.DurationInt +import scala.language.postfixOps class MacrosSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - val warmupTimeMs = 50 + val warmupTimeMs = 50L var tp0: Pool = _ From f45a8f48d8f098ae7f9c3eac7a64bd69a5270d35 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 18:01:51 -0800 Subject: [PATCH 06/14] remove the non-working test with Nothing --- lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala index 005a79c2..29f8fc83 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala @@ -142,17 +142,6 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo waiter.await() } - it should "start a reaction with one input with Nothing in the molecule" in { - - val waiter = new Waiter - - val a = new M[Nothing]("a") - join(tp0)( runSimple { case a(_) => waiter.dismiss() }) - - a(_) - waiter.await() - } - it should "start a simple reaction chain" in { val waiter = new Waiter From 7fba4b06cf9417bd1ba33322e4982072f52b292d Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 18:02:21 -0800 Subject: [PATCH 07/14] avoid some more warnings --- .../src/test/scala/code/winitzki/benchmark/MergesortSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala b/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala index 2d9cb03b..737cbc2b 100644 --- a/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala +++ b/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala @@ -81,7 +81,7 @@ class MergesortSpec extends FlatSpec with Matchers { ) // inject lower-level mergesort - mergesort(part1, sorted1) + mergesort(part2, sorted2) + mergesort((part1, sorted1)) + mergesort((part2, sorted2)) } } ) From 883f85d12d869e8310f6393530d5c3e480f8826b Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 18:02:34 -0800 Subject: [PATCH 08/14] change the reaction body type so that it returns Any --- lib/src/main/scala/code/winitzki/jc/JoinRun.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala index a303852f..ae16c1e3 100644 --- a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala +++ b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala @@ -568,7 +568,7 @@ object JoinRun { /** Type alias for reaction body. * */ - private[jc] type ReactionBody = PartialFunction[UnapplyArg, Unit] + private[jc] type ReactionBody = PartialFunction[UnapplyArg, Any] def join(rs: Reaction*): WarningsAndErrors = join(defaultReactionPool, defaultJoinPool)(rs: _*) def join(reactionPool: Pool)(rs: Reaction*): WarningsAndErrors = join(reactionPool, reactionPool)(rs: _*) From 6c5d634177cb724af8b79479048efb3be46fac61 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 18:08:29 -0800 Subject: [PATCH 09/14] get rid of the last real warnings --- .../scala/code/winitzki/benchmark/Benchmarks9.scala | 2 +- .../scala/code/winitzki/benchmark/JiansenJoin.scala | 4 +++- .../main/scala/code/winitzki/benchmark/MainApp.scala | 12 ++++++++---- .../scala/code/winitzki/test/MoreBlockingSpec.scala | 4 ++-- .../src/test/scala/code/winitzki/jc/MacrosSpec.scala | 4 ++-- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala index 08bee6df..0205263a 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala @@ -86,7 +86,7 @@ object Benchmarks9 { val all_done = m[Int] val f = b[LocalDateTime,Long] - val tp = new SmartPool(MainApp.threads) // this benchmark will not work with a fixed pool + val tp = new SmartPool(MainAppConfig.threads) // this benchmark will not work with a fixed pool join(tp)( run { case all_done(0) + f(tInit, r) => r(elapsed(tInit)) }, diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala b/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala index b2eb4b23..5b331d49 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala @@ -5,6 +5,8 @@ This file is not a part of JoinRun. Only the benchmark application uses this fil for performance comparisons between JoinRun and ScalaJoin. Some minor changes were made to accommodate updates in the Scala standard library since 2011. + +This code is not being maintained. Compiling this file will give lots of warnings. */ /** A module providing constracts for join patterns. @@ -267,7 +269,7 @@ class SynName[Arg, R](implicit owner: Join, argT:ClassManifest[Arg], resT:ClassM object and{ // def unapply(attr:(Set[NameBase], Queue[(NameBase, Any)], PartialFunction[Any, Any], Int)) = { def unapply(attr:Any) = { - Some(attr,attr) + Some((attr,attr)) } } diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala b/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala index 0a6a8f7e..1db6b99b 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala @@ -7,7 +7,15 @@ import code.winitzki.benchmark.Benchmarks9._ import code.winitzki.jc.{FixedPool, Pool} import code.winitzki.jc.JoinRun.{defaultJoinPool, defaultReactionPool} +object MainAppConfig { + + val n = 50000 + + val threads = 8 +} + object MainApp extends App { + import MainAppConfig._ val version = "0.0.5" def run3times(task: => Long): Long = { @@ -24,10 +32,6 @@ object MainApp extends App { (result + prime2 + 1) / 2 } - val n = 50000 - - val threads = 8 - println(s"Benchmark parameters: count to $n, threads = $threads") Seq[(String, (Int, Pool) => Long)]( diff --git a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala index 7c9a864b..cf69df41 100644 --- a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala @@ -1,6 +1,6 @@ package code.winitzki.test -import code.winitzki.jc.{FixedPool, SmartPool} +import code.winitzki.jc.FixedPool import code.winitzki.jc.JoinRun._ import code.winitzki.jc.Macros.{run => &} import code.winitzki.jc.Macros._ @@ -78,7 +78,7 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests { // now it seems to pass even with a million iterations (but that's too long for Travis). val numberOfFalseReplies = get() - (numberOfFailures, numberOfFalseReplies) shouldEqual (0,0) + (numberOfFailures, numberOfFalseReplies) shouldEqual ((0,0)) tp.shutdownNow() } diff --git a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala index 47e4a9ca..d9cf0c26 100644 --- a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala +++ b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala @@ -559,11 +559,11 @@ class MacrosSpec extends FlatSpec with Matchers with BeforeAndAfterEach { x shouldEqual "x" - val y = { + val (y1,y2) = { val z = getName (z, getName) } - y shouldEqual("z", "y") + (y1,y2) shouldEqual(("z", "y")) } it should "correctly recognize nested injections of non-blocking molecules" in { From cfb6c07e919daf2c4096a3ac5ae04e21fe43f44b Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 18:17:27 -0800 Subject: [PATCH 10/14] fixing MacroSpec's incorrect test --- .../code/winitzki/benchmark/JiansenFairnessSpec.scala | 2 +- .../test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala | 2 +- macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala | 8 +++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala b/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala index 3fe965a3..2c948150 100644 --- a/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala +++ b/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala @@ -7,7 +7,7 @@ import org.scalatest.{FlatSpec, Matchers} class JiansenFairnessSpec extends FlatSpec with Matchers with TimeLimitedTests { - val timeLimit = Span(500, Millis) + val timeLimit = Span(2000, Millis) // fairness over reactions: // We have n molecules A:JA[Unit], which can all interact with a single molecule C:JA[(Int,Array[Int])]. diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala index 391daf3a..c707b432 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala @@ -316,7 +316,7 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w it should "not block the smart threadpool with BlockingIdle(Thread.sleep)" in { val tp = new SmartPool(1) val (g, g2) = makeBlockingCheck(BlockingIdle{Thread.sleep(500)}, tp) - g2(timeout = 50 millis)() shouldEqual Some(1) // this should not be blocked + g2(timeout = 150 millis)() shouldEqual Some(1) // this should not be blocked tp.currentPoolSize shouldEqual 2 g() // now we know that the first reaction has finished tp.currentPoolSize shouldEqual 1 diff --git a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala index d9cf0c26..d9bea043 100644 --- a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala +++ b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala @@ -559,11 +559,17 @@ class MacrosSpec extends FlatSpec with Matchers with BeforeAndAfterEach { x shouldEqual "x" + val y = { + val z = getName + (z, getName) + } + y shouldEqual(("z", "y")) + val (y1,y2) = { val z = getName (z, getName) } - (y1,y2) shouldEqual(("z", "y")) + (y1, y2) shouldEqual(("z", "x$7")) } it should "correctly recognize nested injections of non-blocking molecules" in { From f44ca471e4795aaccec3b9035035a676b6e39a74 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 19:21:59 -0800 Subject: [PATCH 11/14] add a test for resuming reactions with blocking molecules; fix the bug related to that --- .../winitzki/test/SingletonMoleculeSpec.scala | 21 +++++++++ .../code/winitzki/jc/JoinDefinition.scala | 43 +++++++++++-------- .../main/scala/code/winitzki/jc/Pool.scala | 1 - .../scala/code/winitzki/jc/JoinRunSpec.scala | 29 +++++++++++++ 4 files changed, 75 insertions(+), 19 deletions(-) diff --git a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala index c5f60703..1d0e0977 100644 --- a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala @@ -206,6 +206,27 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests tp.shutdownNow() } + it should "refuse to define a blocking molecule as a singleton" in { + + val tp = new FixedPool(1) + + val c = m[Int] + val d = m[Int] + val f = b[Unit, Unit] + + val thrown = intercept[Exception] { + join(tp)( + & { case f(_, r) => r() }, + & { case c(x) + d(_) => d(x) }, + & { case _ => f(); d(0) } + ) + } + + thrown.getMessage shouldEqual "In Join{c + d => ...; f/B => ...}: Refusing to inject molecule f/B() as a singleton (must be a non-blocking molecule)" + + tp.shutdownNow() + } + it should "report that the value of a singleton is ready even if called early" in { val tp = new FixedPool(1) diff --git a/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala b/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala index 2ba30a47..82f2e52d 100644 --- a/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala +++ b/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala @@ -85,6 +85,11 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, // moleculesAndValues.foreach{ case (m, v) => m(v) } } + private sealed trait ReactionExitStatus + private case object ReactionExitSuccess extends ReactionExitStatus + private case object ReactionExitFailure extends ReactionExitStatus + private case object ReactionExitRetryFailure extends ReactionExitStatus + /** This closure will be run on the reaction thread pool to start a new reaction. * * @param reaction Reaction to run. @@ -92,9 +97,10 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, */ private def buildReactionClosure(reaction: Reaction, usedInputs: LinearMoleculeBag): Unit = { if (logLevel > 1) println(s"Debug: In $this: reaction {$reaction} started on thread pool $reactionPool with thread id ${Thread.currentThread().getId}") - try { + val exitStatus : ReactionExitStatus = try { // Here we actually apply the reaction body to its input molecules. reaction.body.apply(UnapplyRun(usedInputs)) + ReactionExitSuccess } catch { // Various exceptions that occurred while running the reaction. case e: ExceptionInJoinRun => @@ -102,24 +108,23 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, // We should not try to recover from this; it is most either an error on user's part // or a bug in JoinRun. reportError(s"In $this: Reaction {$reaction} produced an exception that is internal to JoinRun. Input molecules ${moleculeBagToString(usedInputs)} were not injected again. Message: ${e.getMessage}") - // Let's not print it, and let's not throw it again, since it's our internal exception. - // e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. - // throw e + // Let's not print it, and let's not throw it again, since it's our internal exception. + // e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. + // throw e + ReactionExitFailure case e: Exception => // Running the reaction body produced an exception. Note that the exception has killed a thread. - // We will now re-insert the input molecules. Hopefully, no side-effects or output molecules were produced so far. - val aboutMolecules = if (reaction.retry) { - usedInputs.foreach { - case (mol: M[_], v) => inject(mol, v) - case (mol: B[_, _], v) => () // Do not re-inject blocking molecules - the reply action will be still usable when the reaction re-runs. - } - "were injected again" + // We will now re-insert the input molecules (except the blocking ones). Hopefully, no side-effects or output molecules were produced so far. + val (status, aboutMolecules) = if (reaction.retry) { + usedInputs.foreach { case (mol, v) => inject(mol, v) } + (ReactionExitRetryFailure, "were injected again") } - else "were consumed and not injected again" + else (ReactionExitFailure, "were consumed and not injected again") reportError(s"In $this: Reaction {$reaction} produced an exception. Input molecules ${moleculeBagToString(usedInputs)} $aboutMolecules. Message: ${e.getMessage}") -// e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. + // e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. Let's not print this. + status } // Now that the reaction is finished, we inspect the results. @@ -141,8 +146,11 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, val messageMultipleReply = blockingMoleculesWithMultipleReply map { s => s"Error: In $this: Reaction {$reaction} replied to $s more than once" } // We will report all errors to each blocking molecule. + // However, if the reaction failed with retry, we don't yet need to release semaphores and don't need to report errors due to missing reply. + val notFailedWithRetry = exitStatus != ReactionExitRetryFailure val errorMessage = Seq(messageNoReply, messageMultipleReply).flatten.mkString("; ") - val haveErrorsWithBlockingMolecules = blockingMoleculesWithNoReply.nonEmpty || blockingMoleculesWithMultipleReply.nonEmpty + val haveErrorsWithBlockingMolecules = + (blockingMoleculesWithNoReply.nonEmpty && notFailedWithRetry)|| blockingMoleculesWithMultipleReply.nonEmpty // Insert error messages into the reply wrappers and release all semaphores. usedInputs.foreach { @@ -150,7 +158,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, if (haveErrorsWithBlockingMolecules) { replyValue.errorMessage = Some(errorMessage) } - replyValue.releaseSemaphore() + if (notFailedWithRetry) replyValue.releaseSemaphore() case _ => () } @@ -224,7 +232,6 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, // A basic check that we are using our mutable structures safely. We should never see this error. if (!reaction.inputMolecules.toSet.equals(usedInputs.keySet)) { val message = s"Internal error: In $this: attempt to start reaction {$reaction} with incorrect inputs ${moleculeBagToString(usedInputs)}" - println(message) throw new ExceptionWrongInputs(message) } // Build a closure out of the reaction, and run that closure on the reaction's thread pool. @@ -266,7 +273,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, moleculesPresent.addToBag(m, molValue) singletonValues.put(m, molValue) } else { - throw new ExceptionInjectingSingleton(s"In $this: Refusing to inject molecule $m($molValue) because it is not a singleton") + throw new ExceptionInjectingSingleton(s"In $this: Refusing to inject molecule $m($molValue) as a singleton (must be a non-blocking molecule)") } } else @@ -335,7 +342,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, if (m.isSingleton) { if (singletonValues.containsKey(m)) { singletonValues.get(m).asInstanceOf[AbsMolValue[T]].getValue - } else throw new Exception(s"The volatile reader for singleton ($m) is not yet ready") + } else throw new Exception(s"Internal error: In $this: The volatile reader for singleton ($m) is not yet ready") } else throw new ExceptionNoSingleton(s"In $this: volatile reader requested for non-singleton ($m)") diff --git a/lib/src/main/scala/code/winitzki/jc/Pool.scala b/lib/src/main/scala/code/winitzki/jc/Pool.scala index 59fcb591..a11e1309 100644 --- a/lib/src/main/scala/code/winitzki/jc/Pool.scala +++ b/lib/src/main/scala/code/winitzki/jc/Pool.scala @@ -25,7 +25,6 @@ trait Pool { def runClosure(closure: => Unit, info: ReactionInfo): Unit - def isActive: Boolean = !isInactive def isInactive: Boolean def canMakeThreads: Boolean = true diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala index 29f8fc83..a2258d29 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala @@ -355,4 +355,33 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo result shouldEqual Some(()) } + it should "resume fault-tolerant reactions that contain blocking molecules" in { + val n = 20 + + val probabilityOfCrash = 0.5 + + val c = new M[Int]("counter") + val d = new B[Unit, Unit]("decrement") + val g = new B[Unit, Unit]("getValue") + val tp = new FixedPool(2) + + join(tp0)( + runSimple { case c(x) + d(_, r) => + if (scala.util.Random.nextDouble >= probabilityOfCrash) { c(x - 1); r() } else throw new Exception("crash! (it's OK, ignore this)") + }.withRetry onThreads tp, + runSimple { case c(0) + g(_, r) => r() } + ) + c(n) + (1 to n).foreach { _ => + if (d(timeout = 1500 millis)().isEmpty) { + println(JoinRun.errors.toList) // this should not happen, but will be helpful for debugging + } + } + + val result = g(timeout = 1500 millis)() + JoinRun.errors.exists(_.contains("Message: crash! (it's OK, ignore this)")) + tp.shutdownNow() + result shouldEqual Some(()) + } + } From bd749818ca93d5495e393b2598426f60e1dc9e7d Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 19:51:40 -0800 Subject: [PATCH 12/14] additional tests for smart thread and static analysis --- .../winitzki/test/SingletonMoleculeSpec.scala | 58 +++++++++++++++++++ .../main/scala/code/winitzki/jc/Pool.scala | 4 +- .../scala/code/winitzki/jc/SmartPool.scala | 2 +- .../code/winitzki/jc/StaticAnalysis.scala | 8 +-- .../scala/code/winitzki/jc/PoolSpec.scala | 41 ++++++++++--- 5 files changed, 97 insertions(+), 16 deletions(-) diff --git a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala index 1d0e0977..94e2bb7e 100644 --- a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala @@ -118,6 +118,24 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests tp.shutdownNow() } + it should "signal error when a singleton is consumed multiple times by reaction" in { + + val tp = new FixedPool(3) + + val thrown = intercept[Exception] { + val d = m[Unit] + val e = m[Unit] + + join(tp)( + & { case e(_) + d(_) + d(_) => d() }, + & { case _ => d() } // singleton + ) + } + thrown.getMessage shouldEqual "In Join{d + d + e => ...}: Incorrect chemistry: singleton (d) consumed 2 times by reaction d(_) + d(_) + e(_) => d()" + + tp.shutdownNow() + } + it should "signal error when a singleton is injected but not bound to any join definition" in { val tp = new FixedPool(3) @@ -325,4 +343,44 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests tp3.shutdownNow() } + it should "signal error when a singleton is injected fewer times than declared" in { + + val tp = new FixedPool(3) + + val thrown = intercept[Exception] { + val c = b[Unit, String] + val d = m[Unit] + val e = m[Unit] + val f = m[Unit] + + join(tp)( + & { case d(_) +e(_) + f(_) + c(_, r) => r("ok"); d(); e(); f() }, + & { case _ => if (false) { d(); e() }; f(); } // singletons d() and e() will actually not be injected because of a condition + ) + } + thrown.getMessage shouldEqual "In Join{c/B + d + e + f => ...}: Too few singletons injected: d injected 0 times instead of 1, e injected 0 times instead of 1" + + tp.shutdownNow() + } + + it should "signal no error (but a warning) when a singleton is injected more times than declared" in { + + val tp = new FixedPool(3) + + val c = b[Unit, String] + val d = m[Unit] + val e = m[Unit] + val f = m[Unit] + + val warnings = join(tp)( + & { case d(_) + e(_) + f(_) + c(_, r) => r("ok"); d(); e(); f() }, + & { case _ => (1 to 2).foreach { _ => d(); e() }; f(); } // singletons d() and e() will actually be injected more times + ) + + warnings.errors shouldEqual Seq() + warnings.warnings shouldEqual Seq("Possibly too many singletons injected: d injected 2 times instead of 1, e injected 2 times instead of 1") + + tp.shutdownNow() + } + } diff --git a/lib/src/main/scala/code/winitzki/jc/Pool.scala b/lib/src/main/scala/code/winitzki/jc/Pool.scala index a11e1309..f5ae802f 100644 --- a/lib/src/main/scala/code/winitzki/jc/Pool.scala +++ b/lib/src/main/scala/code/winitzki/jc/Pool.scala @@ -26,8 +26,6 @@ trait Pool { def runClosure(closure: => Unit, info: ReactionInfo): Unit def isInactive: Boolean - - def canMakeThreads: Boolean = true } private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorService) extends Pool { @@ -45,7 +43,7 @@ private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorSer execService.shutdownNow() () } - }.run() + }.start() def runClosure(closure: => Unit, info: ReactionInfo): Unit = execService.execute(new RunnableWithInfo(closure, info)) diff --git a/lib/src/main/scala/code/winitzki/jc/SmartPool.scala b/lib/src/main/scala/code/winitzki/jc/SmartPool.scala index 40d4b715..c499e515 100644 --- a/lib/src/main/scala/code/winitzki/jc/SmartPool.scala +++ b/lib/src/main/scala/code/winitzki/jc/SmartPool.scala @@ -63,7 +63,7 @@ class SmartPool(parallelism: Int) extends Pool { executor.shutdownNow() () } - }.run() + }.start() override def runClosure(closure: => Unit, info: ReactionInfo): Unit = { executor.submit(new RunnableWithInfo(closure, info)) diff --git a/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala b/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala index c31411f0..235a9d1c 100644 --- a/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala +++ b/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala @@ -289,9 +289,9 @@ private object StaticAnalysis { .filter { case (mol, count) => singletonsInjected.getOrElse(mol, 0) < count } .map { case (mol, count) => val countInjected = singletonsInjected.getOrElse(mol, 0) - s"$mol $countInjected times instead of $count" + s"$mol injected $countInjected times instead of $count" } - if (foundErrors.nonEmpty) Seq(s"Too few singletons injected: ${foundErrors.mkString(", ")}") else Seq() + if (foundErrors.nonEmpty) Seq(s"Too few singletons injected: ${foundErrors.toList.sorted.mkString(", ")}") else Seq() } // Inspect the singletons actually injected. Their multiplicities must be not more than the declared multiplicities. @@ -300,9 +300,9 @@ private object StaticAnalysis { .filter { case (mol, count) => singletonsInjected.getOrElse(mol, 0) > count } .map { case (mol, count) => val countInjected = singletonsInjected.getOrElse(mol, 0) - s"$mol $countInjected times instead of $count" + s"$mol injected $countInjected times instead of $count" } - if (foundErrors.nonEmpty) Seq(s"Possibly too many singletons injected: ${foundErrors.mkString(", ")}") else Seq() + if (foundErrors.nonEmpty) Seq(s"Possibly too many singletons injected: ${foundErrors.toList.sorted.mkString(", ")}") else Seq() } } diff --git a/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala b/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala index 2166141d..98fef0a5 100644 --- a/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala @@ -13,8 +13,6 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { val patienceConfig = PatienceConfig(timeout = Span(500, Millis)) - val dummyInfo = emptyReactionInfo - behavior of "thread with info" def checkPool(tp: Pool): Unit = { @@ -25,10 +23,10 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { case t : ThreadWithInfo => Some(t.reactionInfo) case _ => None } - waiter { threadInfoOptOpt shouldEqual Some(Some(dummyInfo)) } + waiter { threadInfoOptOpt shouldEqual Some(Some(emptyReactionInfo)) } waiter.dismiss() - }, dummyInfo) + }, emptyReactionInfo) waiter.await()(patienceConfig, implicitly[Position]) } @@ -88,7 +86,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { } catch { case e: InterruptedException => () } - }, dummyInfo) + }, emptyReactionInfo) waiter.await()(patienceConfig, implicitly[Position]) @@ -110,7 +108,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { other.printStackTrace() waiter { false shouldEqual true } } - }, dummyInfo) + }, emptyReactionInfo) Thread.sleep(20) tp.shutdownNow() @@ -134,7 +132,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { } catch { case e: InterruptedException => () } - }, dummyInfo) + }, emptyReactionInfo) waiter.await()(patienceConfig, implicitly[Position]) @@ -156,7 +154,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { other.printStackTrace() waiter.dismiss() } - }, dummyInfo) + }, emptyReactionInfo) Thread.sleep(20) tp.shutdownNow() @@ -164,4 +162,31 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { waiter.await()(patienceConfig, implicitly[Position]) } + behavior of "smart thread" + + it should "run tasks on ordinary threads" in { + var x = 0 + new RunnableWithInfo({x = 1}, emptyReactionInfo).run() + + x shouldEqual 1 + + } + + it should "run tasks on smart threads and store info" in { + val waiter = new Waiter + + var x = 0 + val runnable = new RunnableWithInfo({ + x = 1; waiter.dismiss() + }, emptyReactionInfo) + val smartThread = new ThreadWithInfo(runnable) + smartThread.reactionInfo shouldEqual None // too early now, the runnable has not yet started + smartThread.start() + + waiter.await()(patienceConfig, implicitly[Position]) + + x shouldEqual 1 + smartThread.reactionInfo shouldEqual Some(emptyReactionInfo) + } + } From d8a79cd83149ea03b4b85d92c164bcf9456f3640 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 20:03:44 -0800 Subject: [PATCH 13/14] more tests for coverage --- lib/src/main/scala/code/winitzki/jc/JoinRun.scala | 7 +------ lib/src/main/scala/code/winitzki/jc/MutableBag.scala | 3 ++- .../scala/code/winitzki/jc/JoinRunBlockingSpec.scala | 2 +- .../test/scala/code/winitzki/jc/LibrarySpec.scala | 7 +++++++ .../test/scala/code/winitzki/jc/MutableBagSpec.scala | 12 ++++++++++++ 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala index ae16c1e3..dd982255 100644 --- a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala +++ b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala @@ -20,12 +20,7 @@ import scala.collection.JavaConverters._ object JoinRun { - sealed trait InputPatternType { - def isUnconditional: Boolean = this match { - case Wildcard | SimpleVar => true - case _ => false - } - } + sealed trait InputPatternType case object Wildcard extends InputPatternType diff --git a/lib/src/main/scala/code/winitzki/jc/MutableBag.scala b/lib/src/main/scala/code/winitzki/jc/MutableBag.scala index d538e0d1..001d601d 100644 --- a/lib/src/main/scala/code/winitzki/jc/MutableBag.scala +++ b/lib/src/main/scala/code/winitzki/jc/MutableBag.scala @@ -80,6 +80,7 @@ class MutableBag[K,V] { anotherBag.foreach { case (k, v) => removeFromBag(k, v) } } +/* // about 30% slower than MutableBag, and not sure we need it, since all operations with molecule bag are synchronized now. class ConcurrentMutableBag[K,V] { @@ -123,7 +124,7 @@ class ConcurrentMutableBag[K,V] { } -/* */ +*/ // previous implementation - becomes slow if we have many repeated values, fails performance test /* class MutableBag[K,V] { diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala index c707b432..036c8e6f 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala @@ -326,7 +326,7 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w it should "implement BlockingIdle(BlockingIdle()) as BlockingIdle()" in { val tp = new SmartPool(1) val (g, g2) = makeBlockingCheck(BlockingIdle{BlockingIdle{Thread.sleep(300)}}, tp) - g2(timeout = 50 millis)() shouldEqual Some(1) // this should not be blocked + g2(timeout = 150 millis)() shouldEqual Some(1) // this should not be blocked tp.currentPoolSize shouldEqual 2 g() // now we know that the first reaction has finished tp.currentPoolSize shouldEqual 1 diff --git a/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala b/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala index f7d563a7..08a5d613 100644 --- a/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala @@ -121,4 +121,11 @@ class LibrarySpec extends FlatSpec with Matchers with TimeLimitedTests { tp.shutdownNow() } + behavior of "cleanup with resource" + + it should "catch exceptions and not fail" in { + val tryX = cleanup(1)(_ => throw new Exception("ignore this exception"))(_ => throw new Exception("foo")) + tryX.isFailure shouldEqual true + } + } \ No newline at end of file diff --git a/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala b/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala index ced96942..f4f7c006 100644 --- a/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala @@ -23,6 +23,18 @@ class MutableBagSpec extends FlatSpec with Matchers with TimeLimitedTests { b.getOne(1) shouldEqual Some("a") } + it should "make a bag with one element" in { + val b = MutableBag.of(1, "a") + b.size shouldEqual 1 + b.getOne(1) shouldEqual Some("a") + } + + it should "print a bag" in { + val b = MutableBag.of(1, "a") + b.addToBag(2, "b") + b.toString shouldEqual "Map(2 -> Map(b -> 1), 1 -> Map(a -> 1))" + } + it should "add two elements with the same key and the same value, them remove them both" in { val b = new MutableBag[Int, String] b.addToBag(1, "a") From 48d83eba693ebbfa4e87d790b5b3716c49eee989 Mon Sep 17 00:00:00 2001 From: winitzki Date: Mon, 19 Dec 2016 20:43:43 -0800 Subject: [PATCH 14/14] relax limits on one more test --- .../src/test/scala/code/winitzki/test/MoreBlockingSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala index cf69df41..05157be5 100644 --- a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala @@ -164,10 +164,10 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests { & { case d(x) + incr(_, r) => r(); wait(); d(x+1) } ) d(100) - incr() // update started and is waiting for e() + incr() // reaction 3 started and is waiting for e() get_d(timeout = 400 millis)() shouldEqual None e() - get_d(timeout = 400 millis)() shouldEqual Some(101) + get_d(timeout = 800 millis)() shouldEqual Some(101) tp.shutdownNow() }