Skip to content

Commit

Permalink
Merge pull request #24 from winitzki/fix/tests-timeouts-2
Browse files Browse the repository at this point in the history
Some minor changes in tests and cleanup
  • Loading branch information
winitzki authored Dec 20, 2016
2 parents ef7eda6 + 48d83eb commit ab698e9
Show file tree
Hide file tree
Showing 30 changed files with 347 additions and 121 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
[![Build Status](https://travis-ci.org/winitzki/joinrun-scala.svg?branch=master)](https://travis-ci.org/winitzki/joinrun-scala)
[![Coverage Status](https://codecov.io/gh/winitzki/joinrun-scala/coverage.svg?branch=master)](https://codecov.io/gh/winitzki/joinrun-scala?branch=master)
[![Version](http://img.shields.io/badge/version-0.7.7-blue.svg?style=flat)](https://github.com/winitzki/joinrun-scala/releases)
[![License](https://img.shields.io/github/license/mashape/apistatus.svg)](https://opensource.org/licenses/MIT)

[![Join the chat at https://gitter.im/joinrun-scala/Lobby](https://badges.gitter.im/joinrun-scala/Lobby.svg)](https://gitter.im/joinrun-scala/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package code.winitzki.benchmark
import java.time.LocalDateTime

import code.winitzki.benchmark.Common._
import code.winitzki.jc.{Pool, FixedPool}
import code.winitzki.jc.Pool
import code.winitzki.jc.Macros._
import code.winitzki.jc.JoinRun._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ object Benchmarks9 {
val all_done = m[Int]
val f = b[LocalDateTime,Long]

val tp = new SmartPool(MainApp.threads) // this benchmark will not work with a fixed pool
val tp = new SmartPool(MainAppConfig.threads) // this benchmark will not work with a fixed pool

join(tp)(
run { case all_done(0) + f(tInit, r) => r(elapsed(tInit)) },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object Common {
val warmupTimeMs = 50
val warmupTimeMs = 50L

def elapsed(initTime: LocalDateTime): Long = initTime.until(LocalDateTime.now, ChronoUnit.MILLIS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ This file is not a part of JoinRun. Only the benchmark application uses this fil
for performance comparisons between JoinRun and ScalaJoin.
Some minor changes were made to accommodate updates in the Scala standard library since 2011.
This code is not being maintained. Compiling this file will give lots of warnings.
*/

/** A module providing constracts for join patterns.
Expand Down Expand Up @@ -267,7 +269,7 @@ class SynName[Arg, R](implicit owner: Join, argT:ClassManifest[Arg], resT:ClassM
object and{
// def unapply(attr:(Set[NameBase], Queue[(NameBase, Any)], PartialFunction[Any, Any], Int)) = {
def unapply(attr:Any) = {
Some(attr,attr)
Some((attr,attr))
}

}
Expand Down
12 changes: 8 additions & 4 deletions benchmark/src/main/scala/code/winitzki/benchmark/MainApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@ import code.winitzki.benchmark.Benchmarks9._
import code.winitzki.jc.{FixedPool, Pool}
import code.winitzki.jc.JoinRun.{defaultJoinPool, defaultReactionPool}

object MainAppConfig {

val n = 50000

val threads = 8
}

object MainApp extends App {
import MainAppConfig._
val version = "0.0.5"

def run3times(task: => Long): Long = {
Expand All @@ -24,10 +32,6 @@ object MainApp extends App {
(result + prime2 + 1) / 2
}

val n = 50000

val threads = 8

println(s"Benchmark parameters: count to $n, threads = $threads")

Seq[(String, (Int, Pool) => Long)](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.scalatest.{FlatSpec, Matchers}

class JiansenFairnessSpec extends FlatSpec with Matchers with TimeLimitedTests {

val timeLimit = Span(500, Millis)
val timeLimit = Span(2000, Millis)

// fairness over reactions:
// We have n molecules A:JA[Unit], which can all interact with a single molecule C:JA[(Int,Array[Int])].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,25 @@ class MapReduceSpec extends FlatSpec with Matchers {
val fetch = b[Unit,Int]

val tp = new FixedPool(4)

// declare the reaction for "map"
join(tp)(
& { case carrier(a) => val res = f(a); interm(res) }
& { case carrier(x) => val res = f(x); interm(res) }
)

// reactions for "reduce" must be together since they share "accum"
join(tp)(
& { case accum((n, b)) + interm(res) if n > 0 =>
& { case accum((n, b)) + interm(res) =>
accum((n+1, reduceB(b, res) ))
},
& { case accum((0, _)) + interm(res) => accum((1, res)) },
& { case accum((n, b)) + fetch(_, reply) if n == arr.size => reply(b) }
)

// inject molecules
accum((0, 0))
arr.foreach(i => carrier(i))
val result = fetch()
result shouldEqual 338350
result shouldEqual arr.map(f).reduce(reduceB) // 338350
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class MergesortSpec extends FlatSpec with Matchers {
)

// inject lower-level mergesort
mergesort(part1, sorted1) + mergesort(part2, sorted2)
mergesort((part1, sorted1)) + mergesort((part2, sorted2))
}
}
)
Expand Down
19 changes: 18 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,24 @@ val commonSettings = Defaults.coreDefaultSettings ++ Seq(
resolvers += Resolver.sonatypeRepo("snapshots"),
resolvers += Resolver.sonatypeRepo("releases"),
resolvers += "Typesafe releases" at "http://repo.typesafe.com/typesafe/releases",
scalacOptions ++= Seq()

scalacOptions ++= Seq( // https://tpolecat.github.io/2014/04/11/scalac-flags.html
// "-deprecation",
"-unchecked",
"-encoding", "UTF-8", // yes, this is 2 args
"-feature",
"-language:existentials",
"-language:higherKinds",
"-language:implicitConversions",
// "-Xfatal-warnings",
"-Xlint",
// "-Yno-adapted-args", // Makes calling a() fail to substitute a Unit argument into a.apply(x: Unit)
"-Ywarn-dead-code", // N.B. doesn't work well with the ??? hole
"-Ywarn-numeric-widen",
"-Ywarn-value-discard",
// "-Xfuture", // Makes Benchmarks code fail
"-Ywarn-unused-import" // 2.11 only
)
)

tutSettings
Expand Down
16 changes: 9 additions & 7 deletions docs/chymyst04.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ val interm = m[B]
Therefore, we need a reaction of this shape:

```scala
run { case carrier(a) => val res = f(a); interm(res) }
run { case carrier(x) => val res = f(x); interm(res) }

```

Expand Down Expand Up @@ -100,20 +100,22 @@ Let us change the type of `accum` to carry a tuple `(Int, B)`.
The first element of the tuple will now represent a counter, which indicates how many intermediate results we have already processed.
Reactions with `accum` will increment the counter; the reaction with `fetch` will proceed only if the counter is equal to the length of the array.

We will also include a condition on the counter that will start the accumulation when the counter is equal to 0.

```scala
val accum = m[(Int, B)]

run { case accum((n, b)) + interm(res) if n > 0 =>
run { case accum((n, b)) + interm(res) =>
accum((n+1, reduceB(b, res) ))
},
run { case accum((0, _)) + interm(res) => accum((1, res)) },
run { case accum((n, b)) + fetch(_, reply) if n == arr.size => reply(b) }

```

We can now inject all `carrier` molecules, a single `accum((0, null))` molecule, and a `fetch()` molecule.
What value should we inject with `accum` initially?
When the first `interm(res)` molecule arrives, we will need to call `reduceB(x, res)` with some value `x` of type `B`.
Since we assume that `B` is a monoid, there must be a special value, say `bZero`, such that `reduceB(bZero, res)==res`.
So `bZero` is the value we need to inject on the initial `accum` molecule.

We can now inject all `carrier` molecules, a single `accum((0, bZero))` molecule, and a `fetch()` molecule.
Because of the guard condition, the reaction with `fetch()` will not run until all intermediate results have been accumulated.

Here is the complete code for this example (see also `MapReduceSpec.scala` in the unit tests).
Expand All @@ -139,7 +141,7 @@ object C extends App {

// declare the reaction for "map"
join(
run { case carrier(a) => val res = f(a); interm(res) }
run { case carrier(x) => val res = f(x); interm(res) }
)

// reactions for "reduce" must be together since they share "accum"
Expand Down
11 changes: 6 additions & 5 deletions joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package code.winitzki.test

import code.winitzki.jc.{FixedPool, SmartPool}
import code.winitzki.jc.FixedPool
import code.winitzki.jc.JoinRun._
import code.winitzki.jc.Macros.{run => &}
import code.winitzki.jc.Macros._
Expand All @@ -10,6 +10,7 @@ import org.scalatest.concurrent.Waiters.Waiter
import org.scalatest.time.{Millis, Span}

import scala.concurrent.duration._
import scala.language.postfixOps

class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {

Expand Down Expand Up @@ -70,14 +71,14 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
collect(0)

val numberOfFailures = (1 to 10000).map { _ =>
if (f(timeout = 1000.millis)().isEmpty) 1 else 0
if (f(timeout = 1000 millis)().isEmpty) 1 else 0
}.sum

// we used to have about 4% numberOfFailures (but we get zero failures if we do not nullify the semaphore!) and about 4 numberOfFalseReplies in 100,000.
// now it seems to pass even with a million iterations (but that's too long for Travis).
val numberOfFalseReplies = get()

(numberOfFailures, numberOfFalseReplies) shouldEqual (0,0)
(numberOfFailures, numberOfFalseReplies) shouldEqual ((0,0))

tp.shutdownNow()
}
Expand Down Expand Up @@ -163,10 +164,10 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
& { case d(x) + incr(_, r) => r(); wait(); d(x+1) }
)
d(100)
incr() // update started and is waiting for e()
incr() // reaction 3 started and is waiting for e()
get_d(timeout = 400 millis)() shouldEqual None
e()
get_d(timeout = 400 millis)() shouldEqual Some(101)
get_d(timeout = 800 millis)() shouldEqual Some(101)

tp.shutdownNow()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.scalatest.{FlatSpec, Matchers}

import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag

class ParallelOrSpec extends FlatSpec with Matchers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package code.winitzki.test
import code.winitzki.jc.JoinRun._
import code.winitzki.jc.Macros.{run => &}
import code.winitzki.jc.Macros._
import code.winitzki.jc.{CachedPool, FixedPool, SmartPool}
import code.winitzki.jc.{FixedPool, SmartPool}
import org.scalatest.concurrent.TimeLimitedTests
import org.scalatest.time.{Millis, Span}
import org.scalatest.{FlatSpec, Matchers}
import scala.language.postfixOps

import scala.concurrent.duration._

Expand Down Expand Up @@ -117,6 +118,24 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests
tp.shutdownNow()
}

it should "signal error when a singleton is consumed multiple times by reaction" in {

val tp = new FixedPool(3)

val thrown = intercept[Exception] {
val d = m[Unit]
val e = m[Unit]

join(tp)(
& { case e(_) + d(_) + d(_) => d() },
& { case _ => d() } // singleton
)
}
thrown.getMessage shouldEqual "In Join{d + d + e => ...}: Incorrect chemistry: singleton (d) consumed 2 times by reaction d(_) + d(_) + e(_) => d()"

tp.shutdownNow()
}

it should "signal error when a singleton is injected but not bound to any join definition" in {

val tp = new FixedPool(3)
Expand Down Expand Up @@ -205,6 +224,27 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests
tp.shutdownNow()
}

it should "refuse to define a blocking molecule as a singleton" in {

val tp = new FixedPool(1)

val c = m[Int]
val d = m[Int]
val f = b[Unit, Unit]

val thrown = intercept[Exception] {
join(tp)(
& { case f(_, r) => r() },
& { case c(x) + d(_) => d(x) },
& { case _ => f(); d(0) }
)
}

thrown.getMessage shouldEqual "In Join{c + d => ...; f/B => ...}: Refusing to inject molecule f/B() as a singleton (must be a non-blocking molecule)"

tp.shutdownNow()
}

it should "report that the value of a singleton is ready even if called early" in {

val tp = new FixedPool(1)
Expand Down Expand Up @@ -303,4 +343,44 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests
tp3.shutdownNow()
}

it should "signal error when a singleton is injected fewer times than declared" in {

val tp = new FixedPool(3)

val thrown = intercept[Exception] {
val c = b[Unit, String]
val d = m[Unit]
val e = m[Unit]
val f = m[Unit]

join(tp)(
& { case d(_) +e(_) + f(_) + c(_, r) => r("ok"); d(); e(); f() },
& { case _ => if (false) { d(); e() }; f(); } // singletons d() and e() will actually not be injected because of a condition
)
}
thrown.getMessage shouldEqual "In Join{c/B + d + e + f => ...}: Too few singletons injected: d injected 0 times instead of 1, e injected 0 times instead of 1"

tp.shutdownNow()
}

it should "signal no error (but a warning) when a singleton is injected more times than declared" in {

val tp = new FixedPool(3)

val c = b[Unit, String]
val d = m[Unit]
val e = m[Unit]
val f = m[Unit]

val warnings = join(tp)(
& { case d(_) + e(_) + f(_) + c(_, r) => r("ok"); d(); e(); f() },
& { case _ => (1 to 2).foreach { _ => d(); e() }; f(); } // singletons d() and e() will actually be injected more times
)

warnings.errors shouldEqual Seq()
warnings.warnings shouldEqual Seq("Possibly too many singletons injected: d injected 2 times instead of 1, e injected 2 times instead of 1")

tp.shutdownNow()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class StaticAnalysisSpec extends FlatSpec with Matchers with TimeLimitedTests {

val timeLimit = Span(1000, Millis)

val warmupTimeMs = 50
val warmupTimeMs = 50L

def waitSome(): Unit = Thread.sleep(warmupTimeMs)

Expand Down
Loading

0 comments on commit ab698e9

Please sign in to comment.