Skip to content

Commit

Permalink
Merge pull request #31 from Chymyst/feature/wart-remover
Browse files Browse the repository at this point in the history
add options for wart remover
  • Loading branch information
winitzki authored Dec 21, 2016
2 parents 15f62aa + da0e240 commit 0a6faef
Show file tree
Hide file tree
Showing 18 changed files with 109 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object Benchmarks9 {
collect(0)

val numberOfFailures = (1 to count*counterMultiplier).map { _ =>
if (f(timeout = 1.seconds)().isEmpty) 1 else 0
if (f.timeout(1.seconds)().isEmpty) 1 else 0
}.sum

// In this benchmark, we used to have about 4% numberOfFailures and about 2 numberOfFalseReplies in 100,000
Expand Down
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ val commonSettings = Defaults.coreDefaultSettings ++ Seq(

tutSettings

lazy val errorsForWartRemover = Seq(Wart.EitherProjectionPartial, Wart.Enumeration, Wart.Equals, Wart.ExplicitImplicitTypes, Wart.FinalCaseClass, Wart.FinalVal, Wart.LeakingSealed, Wart.NoNeedForMonad, Wart.Return, Wart.StringPlusAny, Wart.TraversableOps, Wart.TryPartial)

lazy val warningsForWartRemover = Seq() //Seq(Wart.Any, Wart.AsInstanceOf, Wart.ImplicitConversion, Wart.IsInstanceOf, Wart.JavaConversions, Wart.Option2Iterable, Wart.OptionPartial, Wart.Nothing, Wart.Product, Wart.Serializable, Wart.ToString, Wart.While)

lazy val joinrun = (project in file("joinrun"))
.settings(commonSettings: _*)
.settings(
Expand All @@ -61,6 +65,8 @@ lazy val macros = (project in file("macros"))
.settings(commonSettings: _*)
.settings(
name := "macros",
wartremoverWarnings in (Compile, compile) ++= warningsForWartRemover,
wartremoverErrors in (Compile, compile) ++= errorsForWartRemover,
libraryDependencies ++= Seq(
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.scalatest" %% "scalatest" % "3.0.0" % "test",
Expand All @@ -78,6 +84,8 @@ lazy val lib = (project in file("lib"))
name := "lib",
parallelExecution in Test := false,
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
wartremoverWarnings in (Compile, compile) ++= warningsForWartRemover,
wartremoverErrors in (Compile, compile) ++= errorsForWartRemover,
libraryDependencies ++= Seq(
// "com.typesafe.akka" %% "akka-actor" % "2.4.12",
"org.scalacheck" %% "scalacheck" % "1.13.4" % "test",
Expand Down
2 changes: 1 addition & 1 deletion docs/chymyst03.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ val f = b[Unit, Int]
site(...)

// call `f` with 200ms timeout:
val x: Option[Int] = f(timeout = 200 millis)()
val x: Option[Int] = f.timeout(200 millis)()

```

Expand Down
2 changes: 1 addition & 1 deletion docs/joinrun.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ import scala.concurrent.duration.DurationInt

val f = b[Int, String]

val result: Option[String] = f(timeout = 100 millis)(10)
val result: Option[String] = f.timeout(100 millis)(10)

```

Expand Down
16 changes: 8 additions & 8 deletions joinrun/src/test/scala/code/winitzki/test/MoreBlockingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
site(tp)(
& { case f(_, r) => val res = r(123); waiter { res shouldEqual true }; waiter.dismiss() }
)
f(timeout = 10.seconds)() shouldEqual Some(123)
f.timeout(10.seconds)() shouldEqual Some(123)

waiter.await()
tp.shutdownNow()
Expand All @@ -71,7 +71,7 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
collect(0)

val numberOfFailures = (1 to 10000).map { _ =>
if (f(timeout = 1000 millis)().isEmpty) 1 else 0
if (f.timeout(1000 millis)().isEmpty) 1 else 0
}.sum

// we used to have about 4% numberOfFailures (but we get zero failures if we do not nullify the semaphore!) and about 4 numberOfFalseReplies in 100,000.
Expand All @@ -94,7 +94,7 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
)
a.setLogLevel(4)
a.logSoup shouldEqual "Site{a + f/B => ...}\nNo molecules"
f(timeout = 100 millis)() shouldEqual None
f.timeout(100 millis)() shouldEqual None
// there should be no a(0) now, because the reaction has not yet run ("f" timed out and was withdrawn, so no molecules)
a.logSoup shouldEqual "Site{a + f/B => ...}\nNo molecules"
a(123)
Expand Down Expand Up @@ -122,7 +122,7 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
)

a.logSoup shouldEqual "Site{a + g/B => ...; f/B => ...}\nNo molecules"
f(timeout = 300 millis)() shouldEqual None // this times out because the f => ... reaction is blocked by g(), which is waiting for a()
f.timeout(300 millis)() shouldEqual None // this times out because the f => ... reaction is blocked by g(), which is waiting for a()
a.logSoup shouldEqual "Site{a + g/B => ...; f/B => ...}\nMolecules: g/B()" // f() should have been removed but g() remains
a(123) // Now g() starts reacting with a() and unblocks the "f" reaction, which should try to reply to "f" after "f" timed out.
// The attempt to reply to "f" should fail, which is indicated by returning "false" from "r(x)". This is verified by the "waiter".
Expand Down Expand Up @@ -165,9 +165,9 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
)
d(100)
incr() // reaction 3 started and is waiting for e()
get_d(timeout = 400 millis)() shouldEqual None
get_d.timeout(400 millis)() shouldEqual None
e()
get_d(timeout = 800 millis)() shouldEqual Some(101)
get_d.timeout(800 millis)() shouldEqual Some(101)

tp.shutdownNow()
}
Expand Down Expand Up @@ -213,10 +213,10 @@ class MoreBlockingSpec extends FlatSpec with Matchers with TimeLimitedTests {
& { case wait(_, r) + e(_) => r() },
& { case d(x) + incr(_, r) => wait(); r(); f(x+1) }
)
d.setLogLevel(3)
d.setLogLevel(4)
d(100)
c() // update started and is waiting for e(), which should come after incr() gets its reply
get_f(timeout = 400 millis)() shouldEqual None
get_f.timeout(400 millis)() shouldEqual None

tp.shutdownNow()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ class ParallelOrSpec extends FlatSpec with Matchers {
parallelOr(slowFalse, fastFalse, tp)() shouldEqual false
parallelOr(fastFalse, slowTrue, tp)() shouldEqual true

parallelOr(never, fastFalse, tp)(timeout = 200 millis)() shouldEqual None
parallelOr(never, slowFalse, tp)(timeout = 200 millis)() shouldEqual None
parallelOr(never, fastFalse, tp).timeout(200 millis)() shouldEqual None
parallelOr(never, slowFalse, tp).timeout(200 millis)() shouldEqual None

tp.shutdownNow()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests

(1 to 100).foreach { i =>
d(s"bad $i") // this "d" should not be emitted, even though "d" is sometimes not in the soup due to reactions!
// f(timeout = 200 millis)() shouldEqual Some("ok")
// f.timeout(200 millis)() shouldEqual Some("ok")
f()
}

Expand All @@ -54,7 +54,7 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests
(1 to 10).foreach { j =>
d(s"bad $i $j") // this "d" should not be emitted, even though we are immediately after a reaction site,
// and even if the initial d() emission was done late
f(timeout = 200 millis)() shouldEqual Some("ok")
f.timeout(200 millis)() shouldEqual Some("ok")
}

}
Expand Down Expand Up @@ -303,11 +303,11 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests
& { case d(x) + stabilize_d(_, r) => d(x); r() }, // Await stabilizing the presence of d
& { case _ => d(n) } // singleton
)
stabilize_d(timeout = 500 millis)()
stabilize_d.timeout(500 millis)()
d.volatileValue shouldEqual n

(n+1 to n+delta_n).map { i =>
incr(timeout = 500 millis)() shouldEqual Some(())
incr.timeout(500 millis)() shouldEqual Some(())

i - d.volatileValue // this is mostly 0 but sometimes 1
}.sum should be > 0 // there should be some cases when d.value reads the previous value
Expand All @@ -331,12 +331,12 @@ class SingletonMoleculeSpec extends FlatSpec with Matchers with TimeLimitedTests
& { case d(x) + stabilize_d(_, r) => d(x); r() } onThreads tp1, // Await stabilizing the presence of d
& { case _ => d(100) } // singleton
)
stabilize_d(timeout = 500 millis)() shouldEqual Some(())
stabilize_d.timeout(500 millis)() shouldEqual Some(())
d.volatileValue shouldEqual 100
incr(timeout = 500 millis)() shouldEqual Some(()) // update started and is waiting for e()
incr.timeout(500 millis)() shouldEqual Some(()) // update started and is waiting for e()
d.volatileValue shouldEqual 100 // We don't have d() present in the soup, but we can read its previous value.
e()
stabilize_d(timeout = 500 millis)() shouldEqual Some(())
stabilize_d.timeout(500 millis)() shouldEqual Some(())
d.volatileValue shouldEqual 101

tp1.shutdownNow()
Expand Down
39 changes: 21 additions & 18 deletions lib/src/main/scala/code/winitzki/jc/JoinRun.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ and Philipp Haller (http://lampwww.epfl.ch/~phaller/joins/index.html, 2008).
* */

import java.util.UUID
import java.util.concurrent.{Semaphore, TimeUnit, ConcurrentLinkedQueue}
import java.util.concurrent.{ConcurrentLinkedQueue, Semaphore, TimeUnit}

import code.winitzki.jc.JoinRunUtils.PersistentHashCode
import JoinRunUtils._

import scala.collection.mutable
import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -68,12 +68,12 @@ object JoinRun {
* None if we cannot determine anything because information is insufficient.
*/
private[jc] def matcherIsWeakerThan(info: InputMoleculeInfo): Option[Boolean] = {
if (molecule != info.molecule) Some(false)
if (molecule =!= info.molecule) Some(false)
else flag match {
case Wildcard | SimpleVar => Some(true)
case OtherInputPattern(matcher1) => info.flag match {
case SimpleConst(c) => Some(matcher1.isDefinedAt(c))
case OtherInputPattern(_) => if (sha1 == info.sha1) Some(true) else None // We can reliably determine identical matchers.
case OtherInputPattern(_) => if (sha1 === info.sha1) Some(true) else None // We can reliably determine identical matchers.
case _ => Some(false) // Here we can reliably determine that this matcher is not weaker.
}
case SimpleConst(c) => Some(info.flag match {
Expand All @@ -85,7 +85,7 @@ object JoinRun {
}

private[jc] def matcherIsWeakerThanOutput(info: OutputMoleculeInfo): Option[Boolean] = {
if (molecule != info.molecule) Some(false)
if (molecule =!= info.molecule) Some(false)
else flag match {
case Wildcard | SimpleVar => Some(true)
case OtherInputPattern(matcher1) => info.flag match {
Expand All @@ -103,7 +103,7 @@ object JoinRun {

// Here "similar" means either it's definitely weaker or it could be weaker (but it is definitely not stronger).
private[jc] def matcherIsSimilarToOutput(info: OutputMoleculeInfo): Option[Boolean] = {
if (molecule != info.molecule) Some(false)
if (molecule =!= info.molecule) Some(false)
else flag match {
case Wildcard | SimpleVar => Some(true)
case OtherInputPattern(matcher1) => info.flag match {
Expand Down Expand Up @@ -309,7 +309,7 @@ object JoinRun {
private[jc] sealed trait AbsMolValue[T] {
def getValue: T

override def toString: String = getValue match { case () => ""; case v@_ => v.toString }
override def toString: String = getValue match { case () => ""; case v@_ => v.asInstanceOf[T].toString }
}

/** Container for the value of a non-blocking molecule.
Expand All @@ -332,8 +332,15 @@ object JoinRun {
override def getValue: T = v
}

// Abstract molecule emitter. This type is used in collections of molecules that do not require knowledge of molecule types.
abstract sealed class Molecule extends PersistentHashCode {
/** Abstract molecule emitter class.
* This class is not parameterized b type and is used in collections of molecules that do not require knowledge of molecule types.
*
*/
sealed trait Molecule extends PersistentHashCode {

val name: String

override def toString: String = (if (name.isEmpty) "<no name>" else name) + (if (isBlocking) "/B" else "")

/** Check whether the molecule is already bound to a reaction site.
* Note that molecules can be emitted only if they are bound.
Expand Down Expand Up @@ -382,15 +389,13 @@ object JoinRun {
* @param name Name of the molecule, used for debugging only.
* @tparam T Type of the value carried by the molecule.
*/
final class M[T](val name: String) extends Molecule with (T => Unit) {
final class M[T](val name: String) extends (T => Unit) with Molecule {
/** Emit a non-blocking molecule.
*
* @param v Value to be put onto the emitted molecule.
*/
def apply(v: T): Unit = site.emit[T](this, MolValue(v))

override def toString: String = if (name.isEmpty) "<no name>" else name

def unapply(arg: UnapplyArg): Option[T] = arg match {

case UnapplyCheckSimple(inputMoleculesProbe) => // used only by _go
Expand Down Expand Up @@ -461,15 +466,15 @@ object JoinRun {
@volatile var replyRepeated: Boolean = false

private[jc] def releaseSemaphore(): Unit = synchronized {
if (semaphore != null) semaphore.release()
if (Option(semaphore).isDefined) semaphore.release()
}

private[jc] def deleteSemaphore(): Unit = synchronized {
semaphore = null
}

private[jc] def acquireSemaphore(timeoutNanos: Option[Long]): Boolean =
if (semaphore != null)
if (Option(semaphore).isDefined)
timeoutNanos match {
case Some(nanos) => semaphore.tryAcquire(nanos, TimeUnit.NANOSECONDS)
case None => semaphore.acquire(); true
Expand Down Expand Up @@ -502,7 +507,7 @@ object JoinRun {
* @tparam T Type of the value carried by the molecule.
* @tparam R Type of the value replied to the caller via the "reply" action.
*/
final class B[T, R](val name: String) extends Molecule with (T => R) {
final class B[T, R](val name: String) extends (T => R) with Molecule {

/** Emit a blocking molecule and receive a value when the reply action is performed.
*
Expand All @@ -518,11 +523,9 @@ object JoinRun {
* @param v Value to be put onto the emitted molecule.
* @return Non-empty option if the reply was received; None on timeout.
*/
def apply(timeout: Duration)(v: T): Option[R] =
def timeout(timeout: Duration)(v: T): Option[R] =
site.emitAndReplyWithTimeout[T,R](timeout.toNanos, this, v, new ReplyValue[T,R](molecule = this))

override def toString: String = name + "/B"

def unapply(arg: UnapplyArg): Option[(T, ReplyValue[T,R])] = arg match {

case UnapplyCheckSimple(inputMoleculesProbe) => // used only by _go
Expand Down
14 changes: 12 additions & 2 deletions lib/src/main/scala/code/winitzki/jc/JoinRunUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ private[jc] object JoinRunUtils {

def getSha1(c: Any): String = sha1Digest.digest(c.toString.getBytes("UTF-8")).map("%02X".format(_)).mkString

def flatten[T](optionSet: Option[Set[T]]): Set[T] = optionSet.getOrElse(Set())
def flatten[T](optionSeq: Option[Seq[T]]): Seq[T] = optionSeq.getOrElse(Seq())
// def flatten[T](optionSet: Option[Set[T]]): Set[T] = optionSet.getOrElse(Set())
// def flatten[T](optionSeq: Option[Seq[T]]): Seq[T] = optionSeq.getOrElse(Seq())

trait PersistentHashCode {
// Make hash code persistent across mutations with this simple trick.
Expand All @@ -30,4 +30,14 @@ private[jc] object JoinRunUtils {

def nonemptyOpt[S](s: Seq[S]): Option[Seq[S]] = if (s.isEmpty) None else Some(s)

@SuppressWarnings(Array("org.wartremover.warts.Equals"))
implicit final class AnyOpsEquals[A](self: A) {
def ===(other: A): Boolean = self == other
}

@SuppressWarnings(Array("org.wartremover.warts.Equals"))
implicit final class AnyOpsNotEquals[A](self: A) {
def =!=(other: A): Boolean = self != other
}

}
2 changes: 1 addition & 1 deletion lib/src/main/scala/code/winitzki/jc/Library.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object Library {
}
finally {
try {
if (resource != null) {
if (Option(resource).isDefined) {
cleanup(resource)
}
} catch {
Expand Down
2 changes: 2 additions & 0 deletions lib/src/main/scala/code/winitzki/jc/MutableBag.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class MutableBag[K,V] {

def getCount(k: K): Int = bag.getOrElse(k, mutable.Map()).values.sum

def isEmpty: Boolean = bag.isEmpty

def size: Int = bag.values.map(_.values.sum).sum

def getOne(k: K): Option[V] = bag.get(k).flatMap(_.headOption.map(_._1))
Expand Down
6 changes: 3 additions & 3 deletions lib/src/main/scala/code/winitzki/jc/ReactionSite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private final class ReactionSite(reactions: Seq[Reaction], reactionPool: Pool, s

// We will report all errors to each blocking molecule.
// However, if the reaction failed with retry, we don't yet need to release semaphores and don't need to report errors due to missing reply.
val notFailedWithRetry = exitStatus != ReactionExitRetryFailure
val notFailedWithRetry = exitStatus match { case ReactionExitRetryFailure => false; case _ => true }
val errorMessage = Seq(messageNoReply, messageMultipleReply).flatten.mkString("; ")
val haveErrorsWithBlockingMolecules =
(blockingMoleculesWithNoReply.nonEmpty && notFailedWithRetry)|| blockingMoleculesWithMultipleReply.nonEmpty
Expand Down Expand Up @@ -240,7 +240,7 @@ private final class ReactionSite(reactions: Seq[Reaction], reactionPool: Pool, s
else if (!Thread.currentThread().isInterrupted)
if (logLevel > 1) println(s"Debug: In $this: starting reaction {$reaction} on thread pool $poolForReaction while on thread pool $sitePool with inputs ${moleculeBagToString(usedInputs)}")
if (logLevel > 2) println(
if (moleculesPresent.size == 0)
if (moleculesPresent.isEmpty)
s"Debug: In $this: no molecules remaining"
else
s"Debug: In $this: remaining molecules ${moleculeBagToString(moleculesPresent)}"
Expand Down Expand Up @@ -412,7 +412,7 @@ private final class ReactionSite(reactions: Seq[Reaction], reactionPool: Pool, s
val (singletonsEmitted, diagnostics) = initializeJoinDef()
}

case class WarningsAndErrors(warnings: Seq[String], errors: Seq[String], joinDef: String) {
final case class WarningsAndErrors(warnings: Seq[String], errors: Seq[String], joinDef: String) {
def checkWarningsAndErrors(): Unit = {
if (warnings.nonEmpty) println(s"In $joinDef: ${warnings.mkString("; ")}")
if (errors.nonEmpty) throw new Exception(s"In $joinDef: ${errors.mkString("; ")}")
Expand Down
Loading

0 comments on commit 0a6faef

Please sign in to comment.