diff --git a/README.md b/README.md index 30b56d26..bfae6599 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ [![Build Status](https://travis-ci.org/winitzki/joinrun-scala.svg?branch=master)](https://travis-ci.org/winitzki/joinrun-scala) [![Coverage Status](https://codecov.io/gh/winitzki/joinrun-scala/coverage.svg?branch=master)](https://codecov.io/gh/winitzki/joinrun-scala?branch=master) +[![Version](http://img.shields.io/badge/version-0.7.7-blue.svg?style=flat)](https://github.com/winitzki/joinrun-scala/releases) +[![License](https://img.shields.io/github/license/mashape/apistatus.svg)](https://opensource.org/licenses/MIT) [![Join the chat at https://gitter.im/joinrun-scala/Lobby](https://badges.gitter.im/joinrun-scala/Lobby.svg)](https://gitter.im/joinrun-scala/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala index 87816d16..5776d516 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks1.scala @@ -3,7 +3,7 @@ package code.winitzki.benchmark import java.time.LocalDateTime import code.winitzki.benchmark.Common._ -import code.winitzki.jc.{Pool, FixedPool} +import code.winitzki.jc.Pool import code.winitzki.jc.Macros._ import code.winitzki.jc.JoinRun._ diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala index 08bee6df..0205263a 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/Benchmarks9.scala @@ -86,7 +86,7 @@ object Benchmarks9 { val all_done = m[Int] val f = b[LocalDateTime,Long] - val tp = new SmartPool(MainApp.threads) // this benchmark will not work with a fixed pool + val tp = new SmartPool(MainAppConfig.threads) // this benchmark will not work with a fixed pool join(tp)( run { case all_done(0) + f(tInit, r) => r(elapsed(tInit)) }, diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala b/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala index fe137afc..2f1979e5 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/Common.scala @@ -4,7 +4,7 @@ import java.time.LocalDateTime import java.time.temporal.ChronoUnit object Common { - val warmupTimeMs = 50 + val warmupTimeMs = 50L def elapsed(initTime: LocalDateTime): Long = initTime.until(LocalDateTime.now, ChronoUnit.MILLIS) diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala b/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala index b2eb4b23..5b331d49 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/JiansenJoin.scala @@ -5,6 +5,8 @@ This file is not a part of JoinRun. Only the benchmark application uses this fil for performance comparisons between JoinRun and ScalaJoin. Some minor changes were made to accommodate updates in the Scala standard library since 2011. + +This code is not being maintained. Compiling this file will give lots of warnings. */ /** A module providing constracts for join patterns. @@ -267,7 +269,7 @@ class SynName[Arg, R](implicit owner: Join, argT:ClassManifest[Arg], resT:ClassM object and{ // def unapply(attr:(Set[NameBase], Queue[(NameBase, Any)], PartialFunction[Any, Any], Int)) = { def unapply(attr:Any) = { - Some(attr,attr) + Some((attr,attr)) } } diff --git a/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala b/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala index 0a6a8f7e..1db6b99b 100644 --- a/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala +++ b/benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala @@ -7,7 +7,15 @@ import code.winitzki.benchmark.Benchmarks9._ import code.winitzki.jc.{FixedPool, Pool} import code.winitzki.jc.JoinRun.{defaultJoinPool, defaultReactionPool} +object MainAppConfig { + + val n = 50000 + + val threads = 8 +} + object MainApp extends App { + import MainAppConfig._ val version = "0.0.5" def run3times(task: => Long): Long = { @@ -24,10 +32,6 @@ object MainApp extends App { (result + prime2 + 1) / 2 } - val n = 50000 - - val threads = 8 - println(s"Benchmark parameters: count to $n, threads = $threads") Seq[(String, (Int, Pool) => Long)]( diff --git a/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala b/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala index 3fe965a3..2c948150 100644 --- a/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala +++ b/benchmark/src/test/scala/code/winitzki/benchmark/JiansenFairnessSpec.scala @@ -7,7 +7,7 @@ import org.scalatest.{FlatSpec, Matchers} class JiansenFairnessSpec extends FlatSpec with Matchers with TimeLimitedTests { - val timeLimit = Span(500, Millis) + val timeLimit = Span(2000, Millis) // fairness over reactions: // We have n molecules A:JA[Unit], which can all interact with a single molecule C:JA[(Int,Array[Int])]. diff --git a/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala b/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala index afc26e5c..a645c75d 100644 --- a/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala +++ b/benchmark/src/test/scala/code/winitzki/benchmark/MapReduceSpec.scala @@ -52,17 +52,17 @@ class MapReduceSpec extends FlatSpec with Matchers { val fetch = b[Unit,Int] val tp = new FixedPool(4) + // declare the reaction for "map" join(tp)( - & { case carrier(a) => val res = f(a); interm(res) } + & { case carrier(x) => val res = f(x); interm(res) } ) // reactions for "reduce" must be together since they share "accum" join(tp)( - & { case accum((n, b)) + interm(res) if n > 0 => + & { case accum((n, b)) + interm(res) => accum((n+1, reduceB(b, res) )) }, - & { case accum((0, _)) + interm(res) => accum((1, res)) }, & { case accum((n, b)) + fetch(_, reply) if n == arr.size => reply(b) } ) @@ -70,7 +70,7 @@ class MapReduceSpec extends FlatSpec with Matchers { accum((0, 0)) arr.foreach(i => carrier(i)) val result = fetch() - result shouldEqual 338350 + result shouldEqual arr.map(f).reduce(reduceB) // 338350 } } diff --git a/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala b/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala index 2d9cb03b..737cbc2b 100644 --- a/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala +++ b/benchmark/src/test/scala/code/winitzki/benchmark/MergesortSpec.scala @@ -81,7 +81,7 @@ class MergesortSpec extends FlatSpec with Matchers { ) // inject lower-level mergesort - mergesort(part1, sorted1) + mergesort(part2, sorted2) + mergesort((part1, sorted1)) + mergesort((part2, sorted2)) } } ) diff --git a/build.sbt b/build.sbt index 367c3fb0..e9dc8a51 100644 --- a/build.sbt +++ b/build.sbt @@ -21,7 +21,24 @@ val commonSettings = Defaults.coreDefaultSettings ++ Seq( resolvers += Resolver.sonatypeRepo("snapshots"), resolvers += Resolver.sonatypeRepo("releases"), resolvers += "Typesafe releases" at "http://repo.typesafe.com/typesafe/releases", - scalacOptions ++= Seq() + + scalacOptions ++= Seq( // https://tpolecat.github.io/2014/04/11/scalac-flags.html +// "-deprecation", + "-unchecked", + "-encoding", "UTF-8", // yes, this is 2 args + "-feature", + "-language:existentials", + "-language:higherKinds", + "-language:implicitConversions", + // "-Xfatal-warnings", + "-Xlint", +// "-Yno-adapted-args", // Makes calling a() fail to substitute a Unit argument into a.apply(x: Unit) + "-Ywarn-dead-code", // N.B. doesn't work well with the ??? hole + "-Ywarn-numeric-widen", + "-Ywarn-value-discard", + // "-Xfuture", // Makes Benchmarks code fail + "-Ywarn-unused-import" // 2.11 only + ) ) tutSettings diff --git a/docs/chymyst04.md b/docs/chymyst04.md index 2fe84cb5..3320b45f 100644 --- a/docs/chymyst04.md +++ b/docs/chymyst04.md @@ -62,7 +62,7 @@ val interm = m[B] Therefore, we need a reaction of this shape: ```scala -run { case carrier(a) => val res = f(a); interm(res) } +run { case carrier(x) => val res = f(x); interm(res) } ``` @@ -100,20 +100,22 @@ Let us change the type of `accum` to carry a tuple `(Int, B)`. The first element of the tuple will now represent a counter, which indicates how many intermediate results we have already processed. Reactions with `accum` will increment the counter; the reaction with `fetch` will proceed only if the counter is equal to the length of the array. -We will also include a condition on the counter that will start the accumulation when the counter is equal to 0. - ```scala val accum = m[(Int, B)] -run { case accum((n, b)) + interm(res) if n > 0 => +run { case accum((n, b)) + interm(res) => accum((n+1, reduceB(b, res) )) }, -run { case accum((0, _)) + interm(res) => accum((1, res)) }, run { case accum((n, b)) + fetch(_, reply) if n == arr.size => reply(b) } ``` -We can now inject all `carrier` molecules, a single `accum((0, null))` molecule, and a `fetch()` molecule. +What value should we inject with `accum` initially? +When the first `interm(res)` molecule arrives, we will need to call `reduceB(x, res)` with some value `x` of type `B`. +Since we assume that `B` is a monoid, there must be a special value, say `bZero`, such that `reduceB(bZero, res)==res`. +So `bZero` is the value we need to inject on the initial `accum` molecule. + +We can now inject all `carrier` molecules, a single `accum((0, bZero))` molecule, and a `fetch()` molecule. Because of the guard condition, the reaction with `fetch()` will not run until all intermediate results have been accumulated. Here is the complete code for this example (see also `MapReduceSpec.scala` in the unit tests). @@ -139,7 +141,7 @@ object C extends App { // declare the reaction for "map" join( - run { case carrier(a) => val res = f(a); interm(res) } + run { case carrier(x) => val res = f(x); interm(res) } ) // reactions for "reduce" must be together since they share "accum" diff --git a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala index ab72d4d0..05157be5 100644 --- a/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala @@ -1,6 +1,6 @@ package code.winitzki.test -import code.winitzki.jc.{FixedPool, SmartPool} +import code.winitzki.jc.FixedPool import code.winitzki.jc.JoinRun._ import code.winitzki.jc.Macros.{run => &} import code.winitzki.jc.Macros._ @@ -10,6 +10,7 @@ import org.scalatest.concurrent.Waiters.Waiter import org.scalatest.time.{Millis, Span} import scala.concurrent.duration._ +import scala.language.postfixOps class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests { @@ -70,14 +71,14 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests { collect(0) val numberOfFailures = (1 to 10000).map { _ => - if (f(timeout = 1000.millis)().isEmpty) 1 else 0 + if (f(timeout = 1000 millis)().isEmpty) 1 else 0 }.sum // we used to have about 4% numberOfFailures (but we get zero failures if we do not nullify the semaphore!) and about 4 numberOfFalseReplies in 100,000. // now it seems to pass even with a million iterations (but that's too long for Travis). val numberOfFalseReplies = get() - (numberOfFailures, numberOfFalseReplies) shouldEqual (0,0) + (numberOfFailures, numberOfFalseReplies) shouldEqual ((0,0)) tp.shutdownNow() } @@ -163,10 +164,10 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests { & { case d(x) + incr(_, r) => r(); wait(); d(x+1) } ) d(100) - incr() // update started and is waiting for e() + incr() // reaction 3 started and is waiting for e() get_d(timeout = 400 millis)() shouldEqual None e() - get_d(timeout = 400 millis)() shouldEqual Some(101) + get_d(timeout = 800 millis)() shouldEqual Some(101) tp.shutdownNow() } diff --git a/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala b/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala index 657d9872..9b1af921 100644 --- a/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/ParallelOrSpec.scala @@ -8,6 +8,7 @@ import org.scalatest.{FlatSpec, Matchers} import scala.annotation.tailrec import scala.concurrent.duration._ +import scala.language.postfixOps import scala.reflect.ClassTag class ParallelOrSpec extends FlatSpec with Matchers { diff --git a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala index 60d3c065..94e2bb7e 100644 --- a/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/SingletonMoleculeSpec.scala @@ -3,10 +3,11 @@ package code.winitzki.test import code.winitzki.jc.JoinRun._ import code.winitzki.jc.Macros.{run => &} import code.winitzki.jc.Macros._ -import code.winitzki.jc.{CachedPool, FixedPool, SmartPool} +import code.winitzki.jc.{FixedPool, SmartPool} import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.time.{Millis, Span} import org.scalatest.{FlatSpec, Matchers} +import scala.language.postfixOps import scala.concurrent.duration._ @@ -117,6 +118,24 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests tp.shutdownNow() } + it should "signal error when a singleton is consumed multiple times by reaction" in { + + val tp = new FixedPool(3) + + val thrown = intercept[Exception] { + val d = m[Unit] + val e = m[Unit] + + join(tp)( + & { case e(_) + d(_) + d(_) => d() }, + & { case _ => d() } // singleton + ) + } + thrown.getMessage shouldEqual "In Join{d + d + e => ...}: Incorrect chemistry: singleton (d) consumed 2 times by reaction d(_) + d(_) + e(_) => d()" + + tp.shutdownNow() + } + it should "signal error when a singleton is injected but not bound to any join definition" in { val tp = new FixedPool(3) @@ -205,6 +224,27 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests tp.shutdownNow() } + it should "refuse to define a blocking molecule as a singleton" in { + + val tp = new FixedPool(1) + + val c = m[Int] + val d = m[Int] + val f = b[Unit, Unit] + + val thrown = intercept[Exception] { + join(tp)( + & { case f(_, r) => r() }, + & { case c(x) + d(_) => d(x) }, + & { case _ => f(); d(0) } + ) + } + + thrown.getMessage shouldEqual "In Join{c + d => ...; f/B => ...}: Refusing to inject molecule f/B() as a singleton (must be a non-blocking molecule)" + + tp.shutdownNow() + } + it should "report that the value of a singleton is ready even if called early" in { val tp = new FixedPool(1) @@ -303,4 +343,44 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests tp3.shutdownNow() } + it should "signal error when a singleton is injected fewer times than declared" in { + + val tp = new FixedPool(3) + + val thrown = intercept[Exception] { + val c = b[Unit, String] + val d = m[Unit] + val e = m[Unit] + val f = m[Unit] + + join(tp)( + & { case d(_) +e(_) + f(_) + c(_, r) => r("ok"); d(); e(); f() }, + & { case _ => if (false) { d(); e() }; f(); } // singletons d() and e() will actually not be injected because of a condition + ) + } + thrown.getMessage shouldEqual "In Join{c/B + d + e + f => ...}: Too few singletons injected: d injected 0 times instead of 1, e injected 0 times instead of 1" + + tp.shutdownNow() + } + + it should "signal no error (but a warning) when a singleton is injected more times than declared" in { + + val tp = new FixedPool(3) + + val c = b[Unit, String] + val d = m[Unit] + val e = m[Unit] + val f = m[Unit] + + val warnings = join(tp)( + & { case d(_) + e(_) + f(_) + c(_, r) => r("ok"); d(); e(); f() }, + & { case _ => (1 to 2).foreach { _ => d(); e() }; f(); } // singletons d() and e() will actually be injected more times + ) + + warnings.errors shouldEqual Seq() + warnings.warnings shouldEqual Seq("Possibly too many singletons injected: d injected 2 times instead of 1, e injected 2 times instead of 1") + + tp.shutdownNow() + } + } diff --git a/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala b/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala index bfc1c9e0..5f6b9612 100644 --- a/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala +++ b/joinrun/src/test/scala/code/winitzki/test/StaticAnalysisSpec.scala @@ -12,7 +12,7 @@ class StaticAnalysisSpec extends FlatSpec with Matchers with TimeLimitedTests { val timeLimit = Span(1000, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) diff --git a/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala b/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala index 4532a9cb..82f2e52d 100644 --- a/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala +++ b/lib/src/main/scala/code/winitzki/jc/JoinDefinition.scala @@ -69,6 +69,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, private[jc] def setQuiescenceCallback(callback: M[Unit]): Unit = { quiescenceCallbacks.add(callback) + () } private lazy val possibleReactions: Map[Molecule, Seq[Reaction]] = reactionInfos.toSeq @@ -84,6 +85,11 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, // moleculesAndValues.foreach{ case (m, v) => m(v) } } + private sealed trait ReactionExitStatus + private case object ReactionExitSuccess extends ReactionExitStatus + private case object ReactionExitFailure extends ReactionExitStatus + private case object ReactionExitRetryFailure extends ReactionExitStatus + /** This closure will be run on the reaction thread pool to start a new reaction. * * @param reaction Reaction to run. @@ -91,9 +97,10 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, */ private def buildReactionClosure(reaction: Reaction, usedInputs: LinearMoleculeBag): Unit = { if (logLevel > 1) println(s"Debug: In $this: reaction {$reaction} started on thread pool $reactionPool with thread id ${Thread.currentThread().getId}") - try { + val exitStatus : ReactionExitStatus = try { // Here we actually apply the reaction body to its input molecules. reaction.body.apply(UnapplyRun(usedInputs)) + ReactionExitSuccess } catch { // Various exceptions that occurred while running the reaction. case e: ExceptionInJoinRun => @@ -101,24 +108,23 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, // We should not try to recover from this; it is most either an error on user's part // or a bug in JoinRun. reportError(s"In $this: Reaction {$reaction} produced an exception that is internal to JoinRun. Input molecules ${moleculeBagToString(usedInputs)} were not injected again. Message: ${e.getMessage}") - // Let's not print it, and let's not throw it again, since it's our internal exception. - // e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. - // throw e + // Let's not print it, and let's not throw it again, since it's our internal exception. + // e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. + // throw e + ReactionExitFailure case e: Exception => // Running the reaction body produced an exception. Note that the exception has killed a thread. - // We will now re-insert the input molecules. Hopefully, no side-effects or output molecules were produced so far. - val aboutMolecules = if (reaction.retry) { - usedInputs.foreach { - case (mol: M[_], v) => inject(mol, v) - case (mol: B[_, _], v) => () // Do not re-inject blocking molecules - the reply action will be still usable when the reaction re-runs. - } - "were injected again" + // We will now re-insert the input molecules (except the blocking ones). Hopefully, no side-effects or output molecules were produced so far. + val (status, aboutMolecules) = if (reaction.retry) { + usedInputs.foreach { case (mol, v) => inject(mol, v) } + (ReactionExitRetryFailure, "were injected again") } - else "were consumed and not injected again" + else (ReactionExitFailure, "were consumed and not injected again") reportError(s"In $this: Reaction {$reaction} produced an exception. Input molecules ${moleculeBagToString(usedInputs)} $aboutMolecules. Message: ${e.getMessage}") -// e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. + // e.printStackTrace() // This will be printed asynchronously, out of order with the previous message. Let's not print this. + status } // Now that the reaction is finished, we inspect the results. @@ -140,8 +146,11 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, val messageMultipleReply = blockingMoleculesWithMultipleReply map { s => s"Error: In $this: Reaction {$reaction} replied to $s more than once" } // We will report all errors to each blocking molecule. + // However, if the reaction failed with retry, we don't yet need to release semaphores and don't need to report errors due to missing reply. + val notFailedWithRetry = exitStatus != ReactionExitRetryFailure val errorMessage = Seq(messageNoReply, messageMultipleReply).flatten.mkString("; ") - val haveErrorsWithBlockingMolecules = blockingMoleculesWithNoReply.nonEmpty || blockingMoleculesWithMultipleReply.nonEmpty + val haveErrorsWithBlockingMolecules = + (blockingMoleculesWithNoReply.nonEmpty && notFailedWithRetry)|| blockingMoleculesWithMultipleReply.nonEmpty // Insert error messages into the reply wrappers and release all semaphores. usedInputs.foreach { @@ -149,7 +158,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, if (haveErrorsWithBlockingMolecules) { replyValue.errorMessage = Some(errorMessage) } - replyValue.releaseSemaphore() + if (notFailedWithRetry) replyValue.releaseSemaphore() case _ => () } @@ -223,7 +232,6 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, // A basic check that we are using our mutable structures safely. We should never see this error. if (!reaction.inputMolecules.toSet.equals(usedInputs.keySet)) { val message = s"Internal error: In $this: attempt to start reaction {$reaction} with incorrect inputs ${moleculeBagToString(usedInputs)}" - println(message) throw new ExceptionWrongInputs(message) } // Build a closure out of the reaction, and run that closure on the reaction's thread pool. @@ -265,12 +273,13 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, moleculesPresent.addToBag(m, molValue) singletonValues.put(m, molValue) } else { - throw new ExceptionInjectingSingleton(s"In $this: Refusing to inject molecule $m($molValue) because it is not a singleton") + throw new ExceptionInjectingSingleton(s"In $this: Refusing to inject molecule $m($molValue) as a singleton (must be a non-blocking molecule)") } } else joinPool.runClosure(buildInjectClosure(m, molValue), currentReactionInfo.getOrElse(emptyReactionInfo)) } + () } // Remove a blocking molecule if it is present. @@ -333,7 +342,7 @@ private final class JoinDefinition(reactions: Seq[Reaction], reactionPool: Pool, if (m.isSingleton) { if (singletonValues.containsKey(m)) { singletonValues.get(m).asInstanceOf[AbsMolValue[T]].getValue - } else throw new Exception(s"The volatile reader for singleton ($m) is not yet ready") + } else throw new Exception(s"Internal error: In $this: The volatile reader for singleton ($m) is not yet ready") } else throw new ExceptionNoSingleton(s"In $this: volatile reader requested for non-singleton ($m)") diff --git a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala index ed44f5de..dd982255 100644 --- a/lib/src/main/scala/code/winitzki/jc/JoinRun.scala +++ b/lib/src/main/scala/code/winitzki/jc/JoinRun.scala @@ -20,12 +20,7 @@ import scala.collection.JavaConverters._ object JoinRun { - sealed trait InputPatternType { - def isUnconditional: Boolean = this match { - case Wildcard | SimpleVar => true - case _ => false - } - } + sealed trait InputPatternType case object Wildcard extends InputPatternType @@ -283,7 +278,7 @@ object JoinRun { * @return an unapply operation */ object + { - def unapply(attr:Any) = Some(attr,attr) + def unapply(attr:Any): Option[(Any, Any)] = Some((attr,attr)) } /** Create a reaction value out of a simple reaction body. @@ -568,7 +563,7 @@ object JoinRun { /** Type alias for reaction body. * */ - private[jc] type ReactionBody = PartialFunction[UnapplyArg, Unit] + private[jc] type ReactionBody = PartialFunction[UnapplyArg, Any] def join(rs: Reaction*): WarningsAndErrors = join(defaultReactionPool, defaultJoinPool)(rs: _*) def join(reactionPool: Pool)(rs: Reaction*): WarningsAndErrors = join(reactionPool, reactionPool)(rs: _*) @@ -592,7 +587,10 @@ object JoinRun { private val errorLog: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() - private[jc] def reportError(message: String): Unit = errorLog.add(message) + private[jc] def reportError(message: String): Unit = { + errorLog.add(message) + () + } def errors = errorLog.iterator().asScala.toIterable diff --git a/lib/src/main/scala/code/winitzki/jc/Library.scala b/lib/src/main/scala/code/winitzki/jc/Library.scala index 5cf46bcf..8fc94a44 100644 --- a/lib/src/main/scala/code/winitzki/jc/Library.scala +++ b/lib/src/main/scala/code/winitzki/jc/Library.scala @@ -19,7 +19,7 @@ object Library { val p = Promise[T]() join(pool,pool)( - runSimple { case f(x) => p.success(x) } + runSimple { case f(x) => p.success(x); () } ) (f, p.future) } @@ -53,9 +53,9 @@ object Library { } } - def withPool[B](pool: => Pool)(doWork: Pool => B): Try[B] = cleanup(pool)(_.shutdownNow())(doWork) + def withPool[T](pool: => Pool)(doWork: Pool => T): Try[T] = cleanup(pool)(_.shutdownNow())(doWork) - def cleanup[A,B](resource: => A)(cleanup: A => Unit)(doWork: A => B): Try[B] = { + def cleanup[T,R](resource: => T)(cleanup: T => Unit)(doWork: T => R): Try[R] = { try { Success(doWork(resource)) } catch { diff --git a/lib/src/main/scala/code/winitzki/jc/MutableBag.scala b/lib/src/main/scala/code/winitzki/jc/MutableBag.scala index b6480438..001d601d 100644 --- a/lib/src/main/scala/code/winitzki/jc/MutableBag.scala +++ b/lib/src/main/scala/code/winitzki/jc/MutableBag.scala @@ -56,12 +56,15 @@ class MutableBag[K,V] { def getOne(k: K): Option[V] = bag.get(k).flatMap(_.headOption.map(_._1)) - def addToBag(k: K, v: V): Unit = bag.get(k) match { - case Some(vs) => - val newCount = vs.getOrElse(v, 0) + 1 - vs += (v -> newCount) - - case None => bag += (k -> mutable.Map(v -> 1)) + def addToBag(k: K, v: V): Unit = { + bag.get(k) match { + case Some(vs) => + val newCount = vs.getOrElse(v, 0) + 1 + vs += (v -> newCount) + + case None => bag += (k -> mutable.Map(v -> 1)) + } + () } def removeFromBag(k: K, v: V): Unit = bag.get(k).foreach { vs => @@ -77,6 +80,7 @@ class MutableBag[K,V] { anotherBag.foreach { case (k, v) => removeFromBag(k, v) } } +/* // about 30% slower than MutableBag, and not sure we need it, since all operations with molecule bag are synchronized now. class ConcurrentMutableBag[K,V] { @@ -92,12 +96,15 @@ class ConcurrentMutableBag[K,V] { def getOne(k: K): Option[V] = getMap.get(k).flatMap(_.headOption.map(_._1)) - def addToBag(k: K, v: V): Unit = if (bagConcurrentMap.containsKey(k)) { - val vs = bagConcurrentMap.get(k) - val newCount = vs.getOrDefault(v, 0) + 1 - vs.put(v, newCount) - } else { - bagConcurrentMap.put(k, new ConcurrentHashMap[V, Int](Map(v -> 1).asJava)) + def addToBag(k: K, v: V): Unit = { + if (bagConcurrentMap.containsKey(k)) { + val vs = bagConcurrentMap.get(k) + val newCount = vs.getOrDefault(v, 0) + 1 + vs.put(v, newCount) + } else { + bagConcurrentMap.put(k, new ConcurrentHashMap[V, Int](Map(v -> 1).asJava)) + } + () } def removeFromBag(k: K, v: V): Unit = if (bagConcurrentMap.containsKey(k)) { @@ -109,6 +116,7 @@ class ConcurrentMutableBag[K,V] { vs.put(v, newCount) if (vs.isEmpty) bagConcurrentMap.remove(k) + () } def removeFromBag(anotherBag: mutable.Map[K,V]): Unit = @@ -116,7 +124,7 @@ class ConcurrentMutableBag[K,V] { } -/* */ +*/ // previous implementation - becomes slow if we have many repeated values, fails performance test /* class MutableBag[K,V] { diff --git a/lib/src/main/scala/code/winitzki/jc/Pool.scala b/lib/src/main/scala/code/winitzki/jc/Pool.scala index 408ae678..f5ae802f 100644 --- a/lib/src/main/scala/code/winitzki/jc/Pool.scala +++ b/lib/src/main/scala/code/winitzki/jc/Pool.scala @@ -6,6 +6,7 @@ import java.util.concurrent._ import code.winitzki.jc.JoinRun.ReactionInfo import scala.concurrent.{ExecutionContext, Future} +import scala.language.reflectiveCalls class CachedPool(threads: Int) extends PoolExecutor(threads, t => new ThreadPoolExecutor(1, t, 1L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], new ThreadFactoryWithInfo) @@ -24,18 +25,15 @@ trait Pool { def runClosure(closure: => Unit, info: ReactionInfo): Unit - def isActive: Boolean = !isInactive def isInactive: Boolean - - def canMakeThreads: Boolean = true } private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorService) extends Pool { protected val execService = execFactory(threads) - val sleepTime = 200 + val sleepTime = 200L - def shutdownNow() = new Thread { + def shutdownNow(): Unit = new Thread { try{ execService.shutdown() execService.awaitTermination(sleepTime, TimeUnit.MILLISECONDS) @@ -43,8 +41,9 @@ private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorSer execService.shutdownNow() execService.awaitTermination(sleepTime, TimeUnit.MILLISECONDS) execService.shutdownNow() + () } - } + }.start() def runClosure(closure: => Unit, info: ReactionInfo): Unit = execService.execute(new RunnableWithInfo(closure, info)) @@ -56,8 +55,10 @@ private[jc] class PoolExecutor(threads: Int = 8, execFactory: Int => ExecutorSer private[jc] class PoolFutureExecutor(threads: Int = 8, execFactory: Int => ExecutorService) extends PoolExecutor(threads, execFactory) { private val execContext = ExecutionContext.fromExecutor(execService) - override def runClosure(closure: => Unit, info: ReactionInfo): Unit = + override def runClosure(closure: => Unit, info: ReactionInfo): Unit = { Future { closure }(execContext) + () + } } /** Create a pool from a Handler interface. The pool will submit tasks using a Handler.post() method. diff --git a/lib/src/main/scala/code/winitzki/jc/SmartPool.scala b/lib/src/main/scala/code/winitzki/jc/SmartPool.scala index 4da6c7f7..c499e515 100644 --- a/lib/src/main/scala/code/winitzki/jc/SmartPool.scala +++ b/lib/src/main/scala/code/winitzki/jc/SmartPool.scala @@ -48,8 +48,8 @@ class SmartPool(parallelism: Int) extends Pool { private val queue = new LinkedBlockingQueue[Runnable](maxQueueCapacity) val initialThreads: Int = parallelism - val secondsToRecycleThread = 1 - val shutdownWaitTimeMs = 200 + val secondsToRecycleThread = 1L + val shutdownWaitTimeMs = 200L private val executor = new ThreadPoolExecutor(initialThreads, parallelism, secondsToRecycleThread, TimeUnit.SECONDS, queue, newThreadFactory) @@ -61,12 +61,14 @@ class SmartPool(parallelism: Int) extends Pool { executor.shutdownNow() executor.awaitTermination(shutdownWaitTimeMs, TimeUnit.MILLISECONDS) executor.shutdownNow() - + () } - } + }.start() - override def runClosure(closure: => Unit, info: ReactionInfo): Unit = + override def runClosure(closure: => Unit, info: ReactionInfo): Unit = { executor.submit(new RunnableWithInfo(closure, info)) + () + } override def isInactive: Boolean = executor.isShutdown || executor.isTerminated } diff --git a/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala b/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala index c31411f0..235a9d1c 100644 --- a/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala +++ b/lib/src/main/scala/code/winitzki/jc/StaticAnalysis.scala @@ -289,9 +289,9 @@ private object StaticAnalysis { .filter { case (mol, count) => singletonsInjected.getOrElse(mol, 0) < count } .map { case (mol, count) => val countInjected = singletonsInjected.getOrElse(mol, 0) - s"$mol $countInjected times instead of $count" + s"$mol injected $countInjected times instead of $count" } - if (foundErrors.nonEmpty) Seq(s"Too few singletons injected: ${foundErrors.mkString(", ")}") else Seq() + if (foundErrors.nonEmpty) Seq(s"Too few singletons injected: ${foundErrors.toList.sorted.mkString(", ")}") else Seq() } // Inspect the singletons actually injected. Their multiplicities must be not more than the declared multiplicities. @@ -300,9 +300,9 @@ private object StaticAnalysis { .filter { case (mol, count) => singletonsInjected.getOrElse(mol, 0) > count } .map { case (mol, count) => val countInjected = singletonsInjected.getOrElse(mol, 0) - s"$mol $countInjected times instead of $count" + s"$mol injected $countInjected times instead of $count" } - if (foundErrors.nonEmpty) Seq(s"Possibly too many singletons injected: ${foundErrors.mkString(", ")}") else Seq() + if (foundErrors.nonEmpty) Seq(s"Possibly too many singletons injected: ${foundErrors.toList.sorted.mkString(", ")}") else Seq() } } diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala index 59eddc0c..036c8e6f 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunBlockingSpec.scala @@ -4,7 +4,8 @@ import JoinRun._ import Library.withPool import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.time.{Millis, Span} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} +import scala.language.postfixOps import scala.concurrent.duration.DurationInt @@ -25,7 +26,7 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w val timeLimit = Span(1500, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) @@ -261,11 +262,9 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w ) g2() shouldEqual 1 // this should initially work d() // do not inject c(). Now the first reaction is blocked because second reaction cannot start. - waitSome() - g2(timeout = 100 millis)() shouldEqual None // this should be blocked now + g2(timeout = 300 millis)() shouldEqual None // this should be blocked now tp.shutdownNow() tp1.shutdownNow() - } def makeBlockingCheck(sleeping: => Unit, tp1: Pool): (B[Unit,Unit], B[Unit,Int]) = { @@ -317,7 +316,7 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w it should "not block the smart threadpool with BlockingIdle(Thread.sleep)" in { val tp = new SmartPool(1) val (g, g2) = makeBlockingCheck(BlockingIdle{Thread.sleep(500)}, tp) - g2(timeout = 50 millis)() shouldEqual Some(1) // this should not be blocked + g2(timeout = 150 millis)() shouldEqual Some(1) // this should not be blocked tp.currentPoolSize shouldEqual 2 g() // now we know that the first reaction has finished tp.currentPoolSize shouldEqual 1 @@ -327,7 +326,7 @@ class JoinRunBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests w it should "implement BlockingIdle(BlockingIdle()) as BlockingIdle()" in { val tp = new SmartPool(1) val (g, g2) = makeBlockingCheck(BlockingIdle{BlockingIdle{Thread.sleep(300)}}, tp) - g2(timeout = 50 millis)() shouldEqual Some(1) // this should not be blocked + g2(timeout = 150 millis)() shouldEqual Some(1) // this should not be blocked tp.currentPoolSize shouldEqual 2 g() // now we know that the first reaction has finished tp.currentPoolSize shouldEqual 1 diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala index a7f944ed..a2258d29 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunSpec.scala @@ -2,15 +2,19 @@ package code.winitzki.jc import JoinRun._ import org.scalatest.concurrent.TimeLimitedTests -import org.scalatest.concurrent.Waiters.Waiter +import org.scalatest.concurrent.Waiters.{PatienceConfig, Waiter} import org.scalatest.time.{Millis, Span} import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + import scala.concurrent.duration._ +import scala.language.postfixOps class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with BeforeAndAfterEach { var tp0: Pool = _ + implicit val patienceConfig = PatienceConfig(timeout = Span(500, Millis)) + override def beforeEach(): Unit = { tp0 = new FixedPool(4) } @@ -21,7 +25,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo val timeLimit = Span(5000, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) @@ -40,7 +44,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo b.isBound shouldEqual false c.isBound shouldEqual false - join(tp0)(runSimple { case a(_) + c(_) => b() }) + join(runSimple { case a(_) + c(_) => b() }) a.isBound shouldEqual true b.isBound shouldEqual false @@ -63,12 +67,27 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo join(tp0)(runSimple { case a(_) + b(_) + c(_) => }) a.logSoup shouldEqual "Join{a + b + c => ...}\nNo molecules" + } + + it should "correctly list molecules present in soup" in { + val a = new M[Unit]("a") + val b = new M[Unit]("b") + val c = new M[Unit]("c") + val f = new B[Unit, Unit]("f") + + join(tp0)( + runSimple { case a(_) + b(_) + c(_) + f(_, r) => r() } + ) + a.logSoup shouldEqual "Join{a + b + c + f/B => ...}\nNo molecules" + a() a() b() - waitSome() - waitSome() - a.logSoup shouldEqual "Join{a + b + c => ...}\nMolecules: a() * 2, b()" + Thread.sleep(400) + a.logSoup shouldEqual "Join{a + b + c + f/B => ...}\nMolecules: a() * 2, b()" + c() + f() + a.logSoup shouldEqual "Join{a + b + c + f/B => ...}\nMolecules: a()" } it should "define a reaction with correct inputs with non-default pattern-matching at end of reaction" in { @@ -151,8 +170,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo it should "throw exception when join pattern is nonlinear" in { val thrown = intercept[Exception] { val a = new M[Unit]("a") - join(tp0)( runSimple { case a(_) + a(_) => () }) - a() + join( runSimple { case a(_) + a(_) => () }) } thrown.getMessage shouldEqual "Nonlinear pattern: a used twice" @@ -161,8 +179,7 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo it should "throw exception when join pattern is nonlinear, with blocking molecule" in { val thrown = intercept[Exception] { val a = new B[Unit,Unit]("a") - join(tp0)( runSimple { case a(_,r) + a(_,s) => () }) - a() + join( runSimple { case a(_,r) + a(_,s) => () }) } thrown.getMessage shouldEqual "Nonlinear pattern: a/B used twice" } @@ -227,7 +244,6 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo join(tp0)( runSimple { case a(x) + b(0) => a(x+1) }, runSimple { case a(z) + f(_, r) => r(z) }) a(1) b(2) - waitSome() f() shouldEqual 1 } @@ -339,4 +355,33 @@ class JoinRunSpec extends FlatSpec with Matchers with TimeLimitedTests with Befo result shouldEqual Some(()) } + it should "resume fault-tolerant reactions that contain blocking molecules" in { + val n = 20 + + val probabilityOfCrash = 0.5 + + val c = new M[Int]("counter") + val d = new B[Unit, Unit]("decrement") + val g = new B[Unit, Unit]("getValue") + val tp = new FixedPool(2) + + join(tp0)( + runSimple { case c(x) + d(_, r) => + if (scala.util.Random.nextDouble >= probabilityOfCrash) { c(x - 1); r() } else throw new Exception("crash! (it's OK, ignore this)") + }.withRetry onThreads tp, + runSimple { case c(0) + g(_, r) => r() } + ) + c(n) + (1 to n).foreach { _ => + if (d(timeout = 1500 millis)().isEmpty) { + println(JoinRun.errors.toList) // this should not happen, but will be helpful for debugging + } + } + + val result = g(timeout = 1500 millis)() + JoinRun.errors.exists(_.contains("Message: crash! (it's OK, ignore this)")) + tp.shutdownNow() + result shouldEqual Some(()) + } + } diff --git a/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala b/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala index 46ccb358..56041520 100644 --- a/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/JoinRunUtilsSpec.scala @@ -9,7 +9,7 @@ class JoinRunUtilsSpec extends FlatSpec with Matchers with TimeLimitedTests { val timeLimit = Span(500, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L def waitSome(): Unit = Thread.sleep(warmupTimeMs) diff --git a/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala b/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala index b1dffc81..08a5d613 100644 --- a/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/LibrarySpec.scala @@ -15,7 +15,7 @@ class LibrarySpec extends FlatSpec with Matchers with TimeLimitedTests { val timeLimit = Span(500, Millis) - val warmupTimeMs = 50 + val warmupTimeMs = 50L val patienceConfig = PatienceConfig(timeout = Span(500, Millis)) @@ -121,4 +121,11 @@ class LibrarySpec extends FlatSpec with Matchers with TimeLimitedTests { tp.shutdownNow() } + behavior of "cleanup with resource" + + it should "catch exceptions and not fail" in { + val tryX = cleanup(1)(_ => throw new Exception("ignore this exception"))(_ => throw new Exception("foo")) + tryX.isFailure shouldEqual true + } + } \ No newline at end of file diff --git a/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala b/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala index ced96942..f4f7c006 100644 --- a/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/MutableBagSpec.scala @@ -23,6 +23,18 @@ class MutableBagSpec extends FlatSpec with Matchers with TimeLimitedTests { b.getOne(1) shouldEqual Some("a") } + it should "make a bag with one element" in { + val b = MutableBag.of(1, "a") + b.size shouldEqual 1 + b.getOne(1) shouldEqual Some("a") + } + + it should "print a bag" in { + val b = MutableBag.of(1, "a") + b.addToBag(2, "b") + b.toString shouldEqual "Map(2 -> Map(b -> 1), 1 -> Map(a -> 1))" + } + it should "add two elements with the same key and the same value, them remove them both" in { val b = new MutableBag[Int, String] b.addToBag(1, "a") diff --git a/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala b/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala index 2166141d..98fef0a5 100644 --- a/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala +++ b/lib/src/test/scala/code/winitzki/jc/PoolSpec.scala @@ -13,8 +13,6 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { val patienceConfig = PatienceConfig(timeout = Span(500, Millis)) - val dummyInfo = emptyReactionInfo - behavior of "thread with info" def checkPool(tp: Pool): Unit = { @@ -25,10 +23,10 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { case t : ThreadWithInfo => Some(t.reactionInfo) case _ => None } - waiter { threadInfoOptOpt shouldEqual Some(Some(dummyInfo)) } + waiter { threadInfoOptOpt shouldEqual Some(Some(emptyReactionInfo)) } waiter.dismiss() - }, dummyInfo) + }, emptyReactionInfo) waiter.await()(patienceConfig, implicitly[Position]) } @@ -88,7 +86,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { } catch { case e: InterruptedException => () } - }, dummyInfo) + }, emptyReactionInfo) waiter.await()(patienceConfig, implicitly[Position]) @@ -110,7 +108,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { other.printStackTrace() waiter { false shouldEqual true } } - }, dummyInfo) + }, emptyReactionInfo) Thread.sleep(20) tp.shutdownNow() @@ -134,7 +132,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { } catch { case e: InterruptedException => () } - }, dummyInfo) + }, emptyReactionInfo) waiter.await()(patienceConfig, implicitly[Position]) @@ -156,7 +154,7 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { other.printStackTrace() waiter.dismiss() } - }, dummyInfo) + }, emptyReactionInfo) Thread.sleep(20) tp.shutdownNow() @@ -164,4 +162,31 @@ class PoolSpec extends FlatSpec with Matchers with TimeLimitedTests { waiter.await()(patienceConfig, implicitly[Position]) } + behavior of "smart thread" + + it should "run tasks on ordinary threads" in { + var x = 0 + new RunnableWithInfo({x = 1}, emptyReactionInfo).run() + + x shouldEqual 1 + + } + + it should "run tasks on smart threads and store info" in { + val waiter = new Waiter + + var x = 0 + val runnable = new RunnableWithInfo({ + x = 1; waiter.dismiss() + }, emptyReactionInfo) + val smartThread = new ThreadWithInfo(runnable) + smartThread.reactionInfo shouldEqual None // too early now, the runnable has not yet started + smartThread.start() + + waiter.await()(patienceConfig, implicitly[Position]) + + x shouldEqual 1 + smartThread.reactionInfo shouldEqual Some(emptyReactionInfo) + } + } diff --git a/macros/src/main/scala/code/winitzki/jc/Macros.scala b/macros/src/main/scala/code/winitzki/jc/Macros.scala index d46db455..544d8c02 100644 --- a/macros/src/main/scala/code/winitzki/jc/Macros.scala +++ b/macros/src/main/scala/code/winitzki/jc/Macros.scala @@ -142,10 +142,11 @@ object Macros { case DefDef(_, TermName("applyOrElse"), _, _, _, Match(_, list)) => info = list - // this is matched by a closure which is not a partial function + // this is matched by a closure which is not a partial function. Not used now. + /* case Function(List(ValDef(_, TermName(_), TypeTree(), EmptyTree)), Match(Ident(TermName(_)), list)) => info = list - + */ case _ => super.traverse(tree) } @@ -243,10 +244,12 @@ object Macros { // After traversing the subtrees, we append this molecule information. inputMolecules.append((t.symbol, flag1, Some(flag2), getSha1(binder1))) - // matcher with wrong number of arguments - neither 1 nor 2 + // matcher with wrong number of arguments - neither 1 nor 2. This seems to be never called. + /* case UnApply(Apply(Select(t@Ident(TermName(_)), TermName("unapply")), List(Ident(TermName("")))), _) if t.tpe <:< typeOf[Molecule] => inputMolecules.append((t.symbol, WrongReplyVarF, None, getSha1(t))) + */ // possibly a molecule injection case Apply(Select(t@Ident(TermName(_)), TermName("apply")), binder) => diff --git a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala index bdfdc66c..d9bea043 100644 --- a/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala +++ b/macros/src/test/scala/code/winitzki/jc/MacrosSpec.scala @@ -5,10 +5,11 @@ import Macros.{getName, rawTree, m,b, run => &} import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} import scala.concurrent.duration.DurationInt +import scala.language.postfixOps class MacrosSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - val warmupTimeMs = 50 + val warmupTimeMs = 50L var tp0: Pool = _ @@ -378,6 +379,7 @@ class MacrosSpec extends FlatSpec with Matchers with BeforeAndAfterEach { "val r = & { case e() => }" shouldNot compile // no pattern variable in a non-blocking molecule "e" "val r = & { case e(_,_) => }" shouldNot compile // two pattern variables in a non-blocking molecule "e" + "val r = & { case e(_,_,_) => }" shouldNot compile // two pattern variables in a non-blocking molecule "e" "val r = & { case a() => }" shouldNot compile // no pattern variable for reply in "a" "val r = & { case a(_) => }" shouldNot compile // no pattern variable for reply in "a" @@ -561,7 +563,13 @@ class MacrosSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val z = getName (z, getName) } - y shouldEqual("z", "y") + y shouldEqual(("z", "y")) + + val (y1,y2) = { + val z = getName + (z, getName) + } + (y1, y2) shouldEqual(("z", "x$7")) } it should "correctly recognize nested injections of non-blocking molecules" in {