Declarative concurrency in Scala - The implementation of the chemical machine

Chymyst/chymyst-core

Join the chat at https://gitter.im/joinrun-scala/Lobby

JoinRun and Chymyst - declarative concurrency in Scala

JoinRun is a core library that provides a Scala domain-specific language for declarative concurrency. Chymyst is a framework-in-planning that will build upon JoinRun to enable creating concurrent applications declaratively.

JoinRun/Chymyst are based on the chemical machine paradigm, known in the academic world as Join Calculus (JC). JC has the same expressive power as CSP (Communicating Sequential Processes) and the Actor model, but is easier to use.

The initial code of JoinRun was based on previous work by Jiansen He (https://github.com/Jiansen/ScalaJoin, 2011) and Philipp Haller (http://lampwww.epfl.ch/~phaller/joins/index.html, 2008), as well as on my earlier prototypes in Objective-C/iOS and Java/Android.

The current implementation is tested under Oracle JDK 8 with Scala 2.11 and 2.12. It also works with Scala 2.10 and with OpenJDK 7 (except for the new LocalDateTime functions used in tests, and some performance issues).

Overview of JoinRun

To get started, begin with this tutorial introduction.

I gave a presentation on an early version of JoinRun at Scalæ by the Bay 2016. See the talk video and these talk slides revised for the current version of JoinRun.

There is some technical documentation for the JoinRun library.

Comparison: JC vs. actor model

JC is similar in some respects to the well-known actor model (as implemented, e.g., in Akka).

JC has these features that are similar to actors:

  • the user's code does not explicitly work with threads / mutexes / semaphores / locks / monitors
  • concurrent processes interact by message-passing
  • messages carry immutable data
  • JC processes start automatically when messages of a certain type become available, just as actors run automatically when a message is received

Main differences between actors and JC processes:

| JC processes | Actors |
|---|---|
| concurrent processes start automatically whenever several input data sets are available | a desired number of concurrent actors must be created manually |
| processes are implicit, the user's code only manipulates messages | the user's code must manipulate explicit references to actors as well as messages |
| processes typically wait for (and consume) several input messages at once | actors wait for (and consume) only one input message at a time |
| processes are immutable and stateless, all data is stored on messages (which are also immutable) | actors can mutate (“become another actor”); actors can hold mutable state |
| messages are held in an unordered bag | messages are held in an ordered queue and processed in the order received |
| message data is statically typed | message data is untyped |
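
To make the "several input messages at once" and "unordered bag" rows concrete, here is a minimal single-threaded sketch written with the Scala standard library only. It is a toy model, not JoinRun code: the names `Molecule`, `ToySoup`, `inject`, and `contents` are assumptions made for this illustration, and the model has one hard-coded reaction, `sum(x) + add(y) => sum(x + y)`.

```scala
import scala.collection.mutable

// Toy model only; none of these names exist in JoinRun.
final case class Molecule(name: String, value: Int)

final class ToySoup {
  // The soup is a bag of molecules. A real JC runtime may take any matching
  // molecule; this toy simply takes the first match it finds.
  private val bag = mutable.ListBuffer.empty[Molecule]

  // Injecting a molecule adds it to the bag, then lets the reaction run.
  def inject(m: Molecule): Unit = {
    bag += m
    react()
  }

  // The single reaction of this model: sum(x) + add(y) => sum(x + y).
  // It starts only when BOTH input molecules are present, and consumes both.
  private def react(): Unit = {
    var progress = true
    while (progress) {
      (bag.find(_.name == "sum"), bag.find(_.name == "add")) match {
        case (Some(s), Some(a)) =>
          bag -= s
          bag -= a
          bag += Molecule("sum", s.value + a.value)
        case _ =>
          progress = false // some input is missing: nothing can start
      }
    }
  }

  def contents: List[Molecule] = bag.toList
}

object ToySoupDemo {
  def main(args: Array[String]): Unit = {
    val soup = new ToySoup
    soup.inject(Molecule("add", 2))  // nothing happens: no "sum" molecule yet
    soup.inject(Molecule("add", 3))  // still waiting
    soup.inject(Molecule("sum", 10)) // now the reaction runs twice
    println(soup.contents)           // prints List(Molecule(sum,15))
  }
}
```

An actor would have to receive `add` and `sum` one at a time and keep the partial state itself; in this model the waiting is done by the soup, and the reaction only ever sees a complete set of inputs.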

In talking about JoinRun and Chymyst, I follow the “chemical machine” metaphor and terminology, which differs from the terminology usually employed in academic papers on JC. Here is a dictionary:

| Chemical machine | Join Calculus | JoinRun |
|---|---|---|
| input molecule | message on channel | case a(123) => ... // pattern-matching |
| molecule injector | channel (port) name | val a : M[Int] |
| blocking injector | synchronous channel | val q : B[Unit, Int] |
| reaction | process | val r1 = run { case a(x) + ... => ... } |
| injecting an output molecule | sending a message | a(123) // side effect |
| injecting a blocking molecule | sending a synchronous message | q() // returns Int |
| join definition | join definition | join(r1, r2, ...) |

Comparison: JC vs. CSP

Similarities:

The channels of CSP are similar to JC's blocking molecules: sending a message will block until a process can be started that consumes the message and replies with a value.

Differences:

JC admits only one reply to a blocking channel; CSP can open a channel and send many messages to it.

JC will start processes automatically and concurrently whenever input molecules are available. In CSP, the user needs to create and manage new threads manually.

JC has non-blocking channels as a primitive construct. In CSP, non-blocking channels need to be simulated by additional user code.
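
The "only one reply" rule for blocking channels can be sketched with a standard-library `Promise`. This is an illustration under stated assumptions, not the mechanism JoinRun uses internally: `ReplySlot`, `reply`, and `await` are hypothetical names introduced here.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of JoinRun: the reply slot of ONE blocking call.
final class ReplySlot[R] {
  private val promise = Promise[R]()

  // Called by the reaction. Only the first reply is accepted;
  // every later call returns false and changes nothing.
  def reply(value: R): Boolean = promise.trySuccess(value)

  // Called by the sender: blocks until the reply arrives,
  // or throws a TimeoutException when the timeout expires.
  def await(timeout: FiniteDuration): R = Await.result(promise.future, timeout)
}

object ReplySlotDemo {
  def main(args: Array[String]): Unit = {
    val slot = new ReplySlot[Int]
    // A separate thread plays the role of the reaction that consumes the message.
    val reaction = new Thread(new Runnable {
      def run(): Unit = {
        slot.reply(42) // accepted
        slot.reply(43) // ignored: a blocking call gets exactly one reply
      }
    })
    reaction.start()
    println(slot.await(1.second)) // prints 42
    reaction.join()
  }
}
```

A CSP channel, by contrast, stays open and can carry any number of messages, which is why it corresponds to a queue rather than to a single-use slot.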

Main features of JoinRun

Compared to ScalaJoin (Jiansen He's 2011 implementation of JC), JoinRun offers the following improvements:

  • Lighter syntax for join definitions. Compare:

JoinRun:

val a = m[Int]
val c = b[Int, Int]
join(
  run { case a(x) + c(y, reply) =>
    a(x+y)
    reply(x)
  }
)
a(1)

ScalaJoin:

object join1 extends Join {
  object a extends AsyName[Int]
  object c extends SynName[Int, Int]

  join {
    case a(x) and c(y) =>
      a(x+y)
      c.reply(x)
  }
}
a(1)

As a baseline reference, the most concise syntax for JC is available in JoCaml, at the price of modifying the OCaml compiler:

def a(x) & c(y) =  
   a(x+y) & reply x to c
spawn a(1)

In the JoCaml syntax, a and c are declared implicitly, together with the reaction. This kind of implicit declaration is not possible in JoinRun because Scala macros do not allow us to insert a new top-level name declaration into the code. So, declarations of molecule injectors need to be explicit. Other than that, JoinRun's syntax is closely modeled on that of ScalaJoin and JoCaml.

  • Molecule injectors (or “channels”) are not singleton objects as in ScalaJoin but locally scoped values. This is how the semantics of JC is implemented in JoCaml. In this way, we get more flexibility in defining molecules.
  • Reactions are not merely case clauses but locally scoped values (instances of class Reaction). JoinRun uses macros to perform some static analysis of reactions at compile time and detect some errors.
  • Reactions and molecules are composable: a join definition can be constructed incrementally, so that it ends up with n reactions and n different molecules, where n is a run-time parameter. There is no limit on the number of reactions in one join definition, and no limit on the number of different molecules. (However, a join definition is immutable once it is written.)
  • Join definitions are instances of class JoinDefinition and are invisible to the user (as they should be according to the semantics of JC).
  • Some common cases of invalid join definitions are flagged (as run-time errors) before starting any processes; others are flagged when reactions are run (e.g. if a blocking molecule gets no reply).
  • Fine-grained threading control: each join definition and each reaction can be on a different, separate thread pool; we can use Akka actor-based or thread-based pools.
  • Fair nondeterminism: whenever a molecule can start several reactions, the reaction is chosen at random.
  • Fault tolerance: failed reactions are automatically restarted (when desired).
  • Tracing the execution via logging levels; automatic naming of molecules for debugging is available (via macro).
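
The "fair nondeterminism" item above amounts to a uniform random choice among the reactions that are currently able to start. The sketch below shows only that selection step, using the standard library; `FairChoice` and `choose` are names assumed for this example, and the strings stand in for real reaction values.

```scala
import scala.util.Random

// Illustration only; JoinRun's scheduler is not structured like this.
object FairChoice {
  // Picks uniformly at random among the candidates that are ready to start.
  // Returns None when no reaction can start.
  def choose[A](candidates: IndexedSeq[A], rng: Random): Option[A] =
    if (candidates.isEmpty) None
    else Some(candidates(rng.nextInt(candidates.length)))
}

object FairChoiceDemo {
  def main(args: Array[String]): Unit = {
    val rng = new Random()
    // Two reactions that both consume the same molecule and are both ready.
    val ready = Vector("counter + incr", "counter + decr")
    val counts = Seq.fill(10000)(FairChoice.choose(ready, rng)).flatten
      .groupBy(identity)
      .map { case (reaction, picks) => reaction -> picks.size }
    println(counts) // each reaction should be picked roughly half of the time
  }
}
```

Note that this covers the choice of the reaction only; choosing among alternative sets of input molecules is a separate question.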

Status

The current version is 0.1.0. The semantics of JC (restricted to a single machine) is fully implemented and tested.

Unit tests include examples such as concurrent counters, parallel “or”, concurrent merge-sort, and “dining philosophers”. JoinRun is about 50% faster than ScalaJoin on certain benchmarks that exercise a very large number of very short reactions. Performance tests indicate that the runtime can schedule about 300,000 reactions per second per CPU core, and the performance bottleneck is in submitting jobs to threads (a distant second bottleneck is pattern-matching in the internals of the library).

Known limitations:

  • JoinRun is about 3x slower than ScalaJoin on the blocking molecule benchmark.
  • JoinRun has no fairness with respect to the choice of molecules: if a reaction could proceed with many alternative sets of input molecules, the input molecules are not chosen at random.
  • JoinRun has no distributed execution (Jiansen He's Disjoin.scala is not ported to JoinRun, and probably will not be). Distributed computation should be implemented in a better way than posting channel names on an HTTP server. (However, JoinRun will use all cores on a single machine.)

Run unit tests

sbt test

The tests will produce some error messages and stack traces - this is normal, as long as all tests pass.

Some tests are timed and will fail on a slow machine.

Build the benchmark application

sbt benchmark/run will run the benchmark application.

Build the library

To build all JARs:

sbt assembly

This prepares the joinrun, benchmark, lib, and macros JAR assemblies.

The main library is in the joinrun JAR assembly. User code should depend on that JAR only.

Basic usage of JoinRun

Here is an example of a “single-access non-blocking counter”. There is an integer counter value, to which we have non-blocking access via the incr and decr molecules. We can also fetch the current counter value via the get molecule, which is blocking. The counter is initialized to the number we specify.

import code.winitzki.jc.JoinRun._
import code.winitzki.jc.Macros._

// Define the logic of the “non-blocking counter”.
def makeCounter(initCount: Int)
              : (M[Unit], M[Unit], B[Unit, Int]) = {
  val counter = m[Int] // non-blocking molecule with integer value
  val incr = m[Unit] // non-blocking molecule with empty value
  val decr = m[Unit] // empty non-blocking molecule
  val get = b[Unit, Int] // empty blocking molecule returning integer value

  join(
    run { case counter(n) + incr(_) => counter(n+1) },
    run { case counter(n) + decr(_) => counter(n-1) },
    run { case counter(n) + get(_,res) => counter(n) + res(n) }
  )

  counter(initCount) // inject a single “counter(initCount)” molecule

  (incr, decr, get) // return the molecule injectors
}

// make a new counter: get the injectors
val (inc, dec, get) = makeCounter(100)

// use the counter: we can be on any thread,
// we can increment and decrement multiple times,
// and there will be no race conditions

inc() // non-blocking increment
      // more code

dec() // non-blocking decrement
      // more code

val x = get() // blocking call, returns the current value of the counter
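
One way to see why the counter above has no race conditions: the soup contains exactly one counter(n) molecule, and every reaction must consume it before injecting the updated one, so updates are serialized. The following standard-library sketch models that single molecule as a one-element blocking queue. It is an analogy, not JoinRun code; `SlotCounter` and its methods are names assumed for this example, and unlike the real incr and decr molecules, these calls block briefly while another update is in progress.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Toy model, not JoinRun code: the queue holds the single counter(n) molecule.
final class SlotCounter(init: Int) {
  private val slot = new ArrayBlockingQueue[Int](1)
  slot.put(init) // inject the initial counter(init) molecule

  private def update(f: Int => Int): Unit = {
    val n = slot.take() // consume counter(n); waits while another update holds it
    slot.put(f(n))      // inject counter(f(n))
  }

  def incr(): Unit = update(_ + 1)
  def decr(): Unit = update(_ - 1)

  // Consume the molecule, put the same value back, and return it.
  def get(): Int = {
    val n = slot.take()
    slot.put(n)
    n
  }
}

object SlotCounterDemo {
  def main(args: Array[String]): Unit = {
    val c = new SlotCounter(100)
    // Four threads increment concurrently, 1000 times each.
    val workers = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to 1000).foreach(_ => c.incr())
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    println(c.get()) // prints 4100: no increment was lost
  }
}
```

In the JoinRun version the same guarantee comes from the join definition itself, with no queue or lock visible in user code.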

Debugging and macros

It is sometimes not easy to make sure that the reactions are correctly designed. The library offers some debugging facilities:

  • each molecule is named
  • a macro is available to assign names automatically
  • the user can set a log level on each join definition

Here are the typical results:

import code.winitzki.jc.JoinRun._
import code.winitzki.jc.Macros._

val counter = m[Int] // the name of this molecule is "counter"
val decr = m[Unit] // the name is "decr"
val get = b[Unit,Int] // the name is "get"

join (
    run { case counter(n) + decr(_) if n > 0 => counter(n-1) },
    run { case counter(n) + get(_, res) => res(n) + counter(n) }
)

counter(5)

/* Let's start debugging... */
counter.setLogLevel(2)

/* Each molecule is automatically named: */
counter.toString // returns the string "counter"

decr() + decr() + decr()
/* This prints:
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting decr() on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(5), decr()
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting decr() on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules decr()
Debug: In Join{counter + decr => ...; counter + get/S => ...}: starting reaction {counter + decr => ...} on thread pool code.winitzki.jc.JReactionPool@57efee08 while on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with inputs decr(), counter(5)
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting decr() on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules decr() * 2
Debug: In Join{counter + decr => ...; counter + get/S => ...}: reaction {counter + decr => ...} started on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with thread id 547
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting counter(4) on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(4), decr() * 2
Debug: In Join{counter + decr => ...; counter + get/S => ...}: starting reaction {counter + decr => ...} on thread pool code.winitzki.jc.JReactionPool@57efee08 while on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with inputs decr(), counter(4)
Debug: In Join{counter + decr => ...; counter + get/S => ...}: reaction {counter + decr => ...} started on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with thread id 548
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting counter(3) on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(3), decr()
Debug: In Join{counter + decr => ...; counter + get/S => ...}: starting reaction {counter + decr => ...} on thread pool code.winitzki.jc.JReactionPool@57efee08 while on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with inputs decr(), counter(3)
Debug: In Join{counter + decr => ...; counter + get/S => ...}: reaction {counter + decr => ...} started on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with thread id 549
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting counter(2) on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(2)

*/
println(counter.logSoup)
/* This prints:
 Join{counter + decr => ...; counter + get/S => ...}
 Molecules: counter(2)
 */
decr() + decr() + decr()
/* This prints:
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting decr() on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(2), decr()
Debug: In Join{counter + decr => ...; counter + get/S => ...}: starting reaction {counter + decr => ...} on thread pool code.winitzki.jc.JReactionPool@57efee08 while on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with inputs decr(), counter(2)
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting decr() on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules decr()
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting decr() on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules decr() * 2
Debug: In Join{counter + decr => ...; counter + get/S => ...}: reaction {counter + decr => ...} started on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with thread id 613
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting counter(1) on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(1), decr() * 2
Debug: In Join{counter + decr => ...; counter + get/S => ...}: starting reaction {counter + decr => ...} on thread pool code.winitzki.jc.JReactionPool@57efee08 while on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with inputs decr(), counter(1)
Debug: In Join{counter + decr => ...; counter + get/S => ...}: reaction {counter + decr => ...} started on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with thread id 548
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting counter(0) on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(0), decr()
*/
println(counter.logSoup)
/* This prints:
 Join{counter + decr => ...; counter + get/S => ...}
 Molecules: counter(0), decr()
 */

val x = get()
/* This results in x = 0 and prints:
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting get/S() on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(0), decr(), get/S()
Debug: In Join{counter + decr => ...; counter + get/S => ...}: starting reaction {counter + get/S => ...} on thread pool code.winitzki.jc.JReactionPool@57efee08 while on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with inputs counter(0), get/S()
Debug: In Join{counter + decr => ...; counter + get/S => ...}: reaction {counter + get/S => ...} started on thread pool code.winitzki.jc.JJoinPool@36ce2e5d with thread id 549
Debug: Join{counter + decr => ...; counter + get/S => ...} injecting counter(0) on thread pool code.winitzki.jc.JJoinPool@36ce2e5d, now have molecules counter(0), decr()
*/