From ada6a04e9f1a37bfff3bb197d125f1d83c92d62d Mon Sep 17 00:00:00 2001 From: Sergei Winitzki Date: Sun, 22 Jan 2017 19:31:33 -0800 Subject: [PATCH] implement pairing for dance --- docs/chymyst07.md | 79 +++++++++++- .../code/chymyst/test/Patterns01Spec.scala | 113 +++++++++++++----- 2 files changed, 162 insertions(+), 30 deletions(-) diff --git a/docs/chymyst07.md b/docs/chymyst07.md index 45bf3d08..863dceba 100644 --- a/docs/chymyst07.md +++ b/docs/chymyst07.md @@ -1037,7 +1037,7 @@ The reaction can start when a man and a woman are present. It is clear that we can simulate this via two molecules, `man` and `woman`, whose presence is required to start the reaction. ```scala -go { case man(_) + woman(_) => begin_dancing() } +go { case man(_) + woman(_) => beginDancing() } ``` @@ -1047,7 +1047,82 @@ The problem with the above reaction is that it does not respect the linear natur If processes emit several `man()` and `woman()` molecules quickly enough, they will be paired up in random order, rather than in the order of arrival in the queue. Also, nothing prevents several pairs to begin dancing at once, regardless of the dancer's positions in the queues. -TODO: expand +How can we enforce the order of arrival on the pairs? + +The only way to do that is to label each `man` and `woman` molecule with an integer that represents their position in the queue. +However, the external process that emits `man` and `woman` molecules does not know about our ordering requirements. +Therefore, to ensure that the labels are given out consistently, we need our own reaction that assigns the position labels. + +Let us define new molecules, `manL` representing "man with label" and `womanL` for "woman with label". +The dancing reaction will become +```scala +val manL = m[Int] +val womanL = m[Int] +go { case manL(m) + womanL(w) if m == w => beginDancing() } + +``` + +The last positions in the men's and women's queues should be maintained and updated as new dancers arrive. +Since the only way of keeping state is by putting data on molecules, we need new molecules that hold the state of the queue. +Let us call these molecules `queueMen` and `queueWomen`. +We can then define reactions that will produce new molecules, `manL` representing "man with label" and `womanL` for "woman with label": + +```scala +val man = m[Unit] +val manL = m[Int] +val queueMen = m[Int] +val woman = m[Unit] +val womanL = m[Int] +val queueWomen = m[Int] +val beginDancing = m[Unit] + +site( + go { case man(_) + queueMen(n) => queueMen(n+1) + manL(n+1) }, + go { case woman(_) + queueWomen(n) => queueWomen(n+1) + womanL(n+1) } +) + +``` + +The result of this chemistry is that a number of `manL` and `womanL` molecules may accumulate at the reaction site, each carrying their position label. +We now need to make sure they start dancing in the order of their position. + +For instance, it could be that we have `manL(0)`, `manL(1)`, `manL(2)`, `womanL(0)`, `womanL(1)`. +In that case, we should first let `manL(0)` and `womanL(0)` begin dancing, and only when they have done so, we may pair up `manL(1)` and `womanL(1)`. + +We might try writing the following reaction, + +```scala +go { case manL(m) + womanL(w) if m == w => beginDancing() } + +``` + +However, this reaction will not enforce the requirement that `manL(0)` and `womanL(0)` should begin dancing first. +How can we prevent the molecules `manL(1)` and `womanL(1)` from reacting if `manL(0)` and `womanL(0)` have not yet reacted? + +In the chemical machine, the only way to prevent reactions is to withhold some input molecules. +Therefore, the dancing reaction must have _another_ input molecule, say `mayBegin`. +If the dancing reaction has the form `manL + womanL + mayBegin => ...`, and if `mayBegin` carries value 0, +we can enforce the requirement that `manL(0)` and `womanL(0)` should begin dancing first. + +Now it is clear that the `mayBegin` molecule must carry the most recently used position label, and increment this label every time a new pair goes off to dance: + +```scala +go { case manL(m) + womanL(w) + mayBegin(l) if m == w && w == l => beginDancing(); mayBegin(l + 1) } + +``` + +In order to make sure that the previous pair has actually began dancing, let us make `beginDancing()` a _blocking_ molecule. +The next `mayBegin` will then be emitted only after `beginDancing` receives a reply, indicating that the dancing process has actually started. + +Finally, we must make sure that the auxiliary molecules are emitted only once and with correct values. +We can declare these molecules as singletons by writing a singleton reaction: + +```scala +go { case _ => queueMen(0) + queueWomen(0) + mayBegin(0) } + +``` + +The complete working code is found in `Patterns01Spec.scala`. ## Choose and reply to one of many blocking calls (Unix `select`, Actor Model's `receive`) diff --git a/joinrun/src/test/scala/code/chymyst/test/Patterns01Spec.scala b/joinrun/src/test/scala/code/chymyst/test/Patterns01Spec.scala index 746190e0..87d4e57c 100644 --- a/joinrun/src/test/scala/code/chymyst/test/Patterns01Spec.scala +++ b/joinrun/src/test/scala/code/chymyst/test/Patterns01Spec.scala @@ -23,8 +23,8 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { behavior of "Chymyst" it should "implement barrier (rendezvous without data exchange) for two processes" in { - val barrier1 = b[Unit,Unit] - val barrier2 = b[Unit,Unit] + val barrier1 = b[Unit, Unit] + val barrier2 = b[Unit, Unit] val begin1 = m[Unit] val begin2 = m[Unit] @@ -36,8 +36,11 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { val logFile = new ConcurrentLinkedQueue[String] def f1() = logFile.add("f1") + def f2() = logFile.add("f2") + def g1() = logFile.add("g1") + def g2() = logFile.add("g2") site(tp)( @@ -58,20 +61,22 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { } it should "implement exchanger (rendezvous with data exchange) for two processes" in { - val barrier1 = b[Int,Int] - val barrier2 = b[Int,Int] + val barrier1 = b[Int, Int] + val barrier2 = b[Int, Int] val begin1 = m[Unit] val begin2 = m[Unit] val end1 = m[Int] val end2 = m[Int] - val done = b[Unit,(Int,Int)] + val done = b[Unit, (Int, Int)] site(tp)( go { case begin1(_) => - val x1 = 123 // some computation - val y1 = barrier1(x1) // receive value from Process 2 + val x1 = 123 + // some computation + val y1 = barrier1(x1) + // receive value from Process 2 val z = y1 * y1 // further computation end1(z) } @@ -79,8 +84,10 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { site(tp)( go { case begin2(_) => - val x2 = 456 // some computation - val y2 = barrier2(x2) // receive value from Process 1 + val x2 = 456 + // some computation + val y2 = barrier2(x2) + // receive value from Process 1 val z = y2 * y2 // further computation end2(z) } @@ -92,20 +99,20 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { ) site(tp)( - go { case end1(x1) + end2(x2) + done(_, r) => r((x1,x2))} + go { case end1(x1) + end2(x2) + done(_, r) => r((x1, x2)) } ) begin1() + begin2() // emit both molecules to enable starting the two reactions val result = done() - result shouldEqual ((456*456,123*123)) + result shouldEqual ((456 * 456, 123 * 123)) } it should "implement barrier (rendezvous without data exchange) with 4 processes" in { - val barrier1 = b[Unit,Unit] - val barrier2 = b[Unit,Unit] - val barrier3 = b[Unit,Unit] - val barrier4 = b[Unit,Unit] + val barrier1 = b[Unit, Unit] + val barrier2 = b[Unit, Unit] + val barrier3 = b[Unit, Unit] + val barrier4 = b[Unit, Unit] val begin1 = m[Unit] val begin2 = m[Unit] @@ -121,12 +128,19 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { val logFile = new ConcurrentLinkedQueue[String] def f1() = logFile.add("f1") + def f2() = logFile.add("f2") + def f3() = logFile.add("f4") + def f4() = logFile.add("f3") + def g1() = logFile.add("g1") + def g2() = logFile.add("g2") + def g3() = logFile.add("g4") + def g4() = logFile.add("g3") site(tp)( @@ -153,48 +167,55 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { val n = 100 // The number of rendezvous participants needs to be known in advance, or else we don't know how long still to wait for rendezvous. // There will be 2*n blocked threads; the test will fail with FixedPool(2*n-1). - val pool = new FixedPool(2*n) + val pool = new FixedPool(2 * n) - val barrier = b[Unit,Unit] + val barrier = b[Unit, Unit] val counterInit = m[Unit] - val counter = b[Int,Unit] + val counter = b[Int, Unit] val endCounter = m[Int] - val begin = m[(()=>Unit, ()=>Unit)] + val begin = m[(() => Unit, () => Unit)] val end = m[Unit] val done = b[Unit, Unit] val logFile = new ConcurrentLinkedQueue[String] - def f(n: Int)(): Unit = { logFile.add(s"f$n"); () } - def g(n: Int)(): Unit = { logFile.add(s"g$n"); () } + def f(n: Int)(): Unit = { + logFile.add(s"f$n"); + () + } + + def g(n: Int)(): Unit = { + logFile.add(s"g$n"); + () + } site(pool)( - go { case begin((f,g)) => f(); barrier(); g(); end() }, // this reaction will be run n times because we emit n molecules `begin` with various `f` and `g` + go { case begin((f, g)) => f(); barrier(); g(); end() }, // this reaction will be run n times because we emit n molecules `begin` with various `f` and `g` go { case barrier(_, replyB) + counterInit(_) => // this reaction will consume the very first barrier molecule emitted counter(1) // one reaction has reached the rendezvous point replyB() }, go { case barrier(_, replyB) + counter(k, replyC) => // the `counter` molecule holds the number (k) of the reactions that have reached the rendezvous before this reaction started. - if (k + 1 < n) counter(k+1); else println(s"rendezvous passed by $n reactions") + if (k + 1 < n) counter(k + 1); else println(s"rendezvous passed by $n reactions") replyC() // `replyC()` must be here. Doing `replyC()` before emitting `counter(k+1)` would have unblocked some reactions and allowed them to proceed beyond the rendezvous point without waiting for all others. replyB() }, - go { case end(_) + endCounter(k) => endCounter(k-1) }, - go { case endCounter(0) + done(_, r) => r()} + go { case end(_) + endCounter(k) => endCounter(k - 1) }, + go { case endCounter(0) + done(_, r) => r() } ) - (1 to n).foreach(i => begin((f(i),g(i)))) + (1 to n).foreach(i => begin((f(i), g(i)))) counterInit() endCounter(n) done.timeout()(1000 millis) shouldEqual Some(()) val result: Seq[String] = logFile.iterator().asScala.toSeq - result.size shouldEqual 2*n + result.size shouldEqual 2 * n // Now, there must be f_1, ..., f_n (in any order) before g_1, ..., g_n (also in any order). // We use sets to verify this. val setF = (0 until n).map(result.apply).toSet - val setG = (n until 2*n).map(result.apply).toSet + val setG = (n until 2 * n).map(result.apply).toSet val expectedSetF = (1 to n).map(i => s"f$i").toSet val expectedSetG = (1 to n).map(i => s"g$i").toSet @@ -208,4 +229,40 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach { pool.shutdownNow() } + it should "implement dance pairing with queue labels" in { + val man = m[Unit] + val manL = m[Int] + val queueMen = m[Int] + val woman = m[Unit] + val womanL = m[Int] + val queueWomen = m[Int] + val beginDancing = b[Int, Unit] + val mayBegin = m[Int] + + val danceCounter = m[List[Int]] + val done = b[Unit, List[Int]] + + val pool = new FixedPool(2) + + site(pool)( + go { case man(_) + queueMen(n) => queueMen(n + 1) + manL(n) }, + go { case woman(_) + queueWomen(n) => queueWomen(n + 1) + womanL(n) }, + go { case manL(xy) + womanL(xx) + mayBegin(l) if xx == xy && xy == l => beginDancing(l); mayBegin(l + 1) }, + go { case _ => queueMen(0) + queueWomen(0) + mayBegin(0) } + ) + + val total = 100 + + site(pool)( + go { case danceCounter(x) + done(_, r) if x.size == total => r(x) + danceCounter(x) }, + go { case beginDancing(xy, r) + danceCounter(x) => danceCounter(x :+ xy) + r() }, + go { case _ => danceCounter(Nil) } + ) + + (1 to total).map(_ => ()).foreach(man) + danceCounter.volatileValue shouldEqual Nil + (1 to total).map(_ => ()).foreach(woman) + done() shouldEqual (0 until total).toList + } + }