Skip to content

Commit

Permalink
implement pairing for dance
Browse files Browse the repository at this point in the history
  • Loading branch information
winitzki committed Jan 23, 2017
1 parent 8af93d7 commit ada6a04
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 30 deletions.
79 changes: 77 additions & 2 deletions docs/chymyst07.md
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ The reaction can start when a man and a woman are present.
It is clear that we can simulate this via two molecules, `man` and `woman`, whose presence is required to start the reaction.

```scala
go { case man(_) + woman(_) => begin_dancing() }
go { case man(_) + woman(_) => beginDancing() }

```

Expand All @@ -1047,7 +1047,82 @@ The problem with the above reaction is that it does not respect the linear natur
If processes emit several `man()` and `woman()` molecules quickly enough, they will be paired up in random order, rather than in the order of arrival in the queue.
Also, nothing prevents several pairs to begin dancing at once, regardless of the dancer's positions in the queues.

TODO: expand
How can we enforce the order of arrival on the pairs?

The only way to do that is to label each `man` and `woman` molecule with an integer that represents their position in the queue.
However, the external process that emits `man` and `woman` molecules does not know about our ordering requirements.
Therefore, to ensure that the labels are given out consistently, we need our own reaction that assigns the position labels.

Let us define new molecules, `manL` representing "man with label" and `womanL` for "woman with label".
The dancing reaction will become
```scala
val manL = m[Int]
val womanL = m[Int]
go { case manL(m) + womanL(w) if m == w => beginDancing() }

```

The last positions in the men's and women's queues should be maintained and updated as new dancers arrive.
Since the only way of keeping state is by putting data on molecules, we need new molecules that hold the state of the queue.
Let us call these molecules `queueMen` and `queueWomen`.
We can then define reactions that will produce new molecules, `manL` representing "man with label" and `womanL` for "woman with label":

```scala
val man = m[Unit]
val manL = m[Int]
val queueMen = m[Int]
val woman = m[Unit]
val womanL = m[Int]
val queueWomen = m[Int]
val beginDancing = m[Unit]

site(
go { case man(_) + queueMen(n) => queueMen(n+1) + manL(n+1) },
go { case woman(_) + queueWomen(n) => queueWomen(n+1) + womanL(n+1) }
)

```

The result of this chemistry is that a number of `manL` and `womanL` molecules may accumulate at the reaction site, each carrying their position label.
We now need to make sure they start dancing in the order of their position.

For instance, it could be that we have `manL(0)`, `manL(1)`, `manL(2)`, `womanL(0)`, `womanL(1)`.
In that case, we should first let `manL(0)` and `womanL(0)` begin dancing, and only when they have done so, we may pair up `manL(1)` and `womanL(1)`.

We might try writing the following reaction,

```scala
go { case manL(m) + womanL(w) if m == w => beginDancing() }

```

However, this reaction will not enforce the requirement that `manL(0)` and `womanL(0)` should begin dancing first.
How can we prevent the molecules `manL(1)` and `womanL(1)` from reacting if `manL(0)` and `womanL(0)` have not yet reacted?

In the chemical machine, the only way to prevent reactions is to withhold some input molecules.
Therefore, the dancing reaction must have _another_ input molecule, say `mayBegin`.
If the dancing reaction has the form `manL + womanL + mayBegin => ...`, and if `mayBegin` carries value 0,
we can enforce the requirement that `manL(0)` and `womanL(0)` should begin dancing first.

Now it is clear that the `mayBegin` molecule must carry the most recently used position label, and increment this label every time a new pair goes off to dance:

```scala
go { case manL(m) + womanL(w) + mayBegin(l) if m == w && w == l => beginDancing(); mayBegin(l + 1) }

```

In order to make sure that the previous pair has actually began dancing, let us make `beginDancing()` a _blocking_ molecule.
The next `mayBegin` will then be emitted only after `beginDancing` receives a reply, indicating that the dancing process has actually started.

Finally, we must make sure that the auxiliary molecules are emitted only once and with correct values.
We can declare these molecules as singletons by writing a singleton reaction:

```scala
go { case _ => queueMen(0) + queueWomen(0) + mayBegin(0) }

```

The complete working code is found in `Patterns01Spec.scala`.

## Choose and reply to one of many blocking calls (Unix `select`, Actor Model's `receive`)

Expand Down
113 changes: 85 additions & 28 deletions joinrun/src/test/scala/code/chymyst/test/Patterns01Spec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach {
behavior of "Chymyst"

it should "implement barrier (rendezvous without data exchange) for two processes" in {
val barrier1 = b[Unit,Unit]
val barrier2 = b[Unit,Unit]
val barrier1 = b[Unit, Unit]
val barrier2 = b[Unit, Unit]

val begin1 = m[Unit]
val begin2 = m[Unit]
Expand All @@ -36,8 +36,11 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach {
val logFile = new ConcurrentLinkedQueue[String]

def f1() = logFile.add("f1")

def f2() = logFile.add("f2")

def g1() = logFile.add("g1")

def g2() = logFile.add("g2")

site(tp)(
Expand All @@ -58,29 +61,33 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach {
}

it should "implement exchanger (rendezvous with data exchange) for two processes" in {
val barrier1 = b[Int,Int]
val barrier2 = b[Int,Int]
val barrier1 = b[Int, Int]
val barrier2 = b[Int, Int]

val begin1 = m[Unit]
val begin2 = m[Unit]

val end1 = m[Int]
val end2 = m[Int]
val done = b[Unit,(Int,Int)]
val done = b[Unit, (Int, Int)]

site(tp)(
go { case begin1(_) =>
val x1 = 123 // some computation
val y1 = barrier1(x1) // receive value from Process 2
val x1 = 123
// some computation
val y1 = barrier1(x1)
// receive value from Process 2
val z = y1 * y1 // further computation
end1(z)
}
)

site(tp)(
go { case begin2(_) =>
val x2 = 456 // some computation
val y2 = barrier2(x2) // receive value from Process 1
val x2 = 456
// some computation
val y2 = barrier2(x2)
// receive value from Process 1
val z = y2 * y2 // further computation
end2(z)
}
Expand All @@ -92,20 +99,20 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach {
)

site(tp)(
go { case end1(x1) + end2(x2) + done(_, r) => r((x1,x2))}
go { case end1(x1) + end2(x2) + done(_, r) => r((x1, x2)) }
)

begin1() + begin2() // emit both molecules to enable starting the two reactions

val result = done()
result shouldEqual ((456*456,123*123))
result shouldEqual ((456 * 456, 123 * 123))
}

it should "implement barrier (rendezvous without data exchange) with 4 processes" in {
val barrier1 = b[Unit,Unit]
val barrier2 = b[Unit,Unit]
val barrier3 = b[Unit,Unit]
val barrier4 = b[Unit,Unit]
val barrier1 = b[Unit, Unit]
val barrier2 = b[Unit, Unit]
val barrier3 = b[Unit, Unit]
val barrier4 = b[Unit, Unit]

val begin1 = m[Unit]
val begin2 = m[Unit]
Expand All @@ -121,12 +128,19 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach {
val logFile = new ConcurrentLinkedQueue[String]

def f1() = logFile.add("f1")

def f2() = logFile.add("f2")

def f3() = logFile.add("f4")

def f4() = logFile.add("f3")

def g1() = logFile.add("g1")

def g2() = logFile.add("g2")

def g3() = logFile.add("g4")

def g4() = logFile.add("g3")

site(tp)(
Expand All @@ -153,48 +167,55 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach {
val n = 100 // The number of rendezvous participants needs to be known in advance, or else we don't know how long still to wait for rendezvous.

// There will be 2*n blocked threads; the test will fail with FixedPool(2*n-1).
val pool = new FixedPool(2*n)
val pool = new FixedPool(2 * n)

val barrier = b[Unit,Unit]
val barrier = b[Unit, Unit]
val counterInit = m[Unit]
val counter = b[Int,Unit]
val counter = b[Int, Unit]
val endCounter = m[Int]
val begin = m[(()=>Unit, ()=>Unit)]
val begin = m[(() => Unit, () => Unit)]
val end = m[Unit]
val done = b[Unit, Unit]

val logFile = new ConcurrentLinkedQueue[String]

def f(n: Int)(): Unit = { logFile.add(s"f$n"); () }
def g(n: Int)(): Unit = { logFile.add(s"g$n"); () }
def f(n: Int)(): Unit = {
logFile.add(s"f$n");
()
}

def g(n: Int)(): Unit = {
logFile.add(s"g$n");
()
}

site(pool)(
go { case begin((f,g)) => f(); barrier(); g(); end() }, // this reaction will be run n times because we emit n molecules `begin` with various `f` and `g`
go { case begin((f, g)) => f(); barrier(); g(); end() }, // this reaction will be run n times because we emit n molecules `begin` with various `f` and `g`
go { case barrier(_, replyB) + counterInit(_) => // this reaction will consume the very first barrier molecule emitted
counter(1) // one reaction has reached the rendezvous point
replyB()
},
go { case barrier(_, replyB) + counter(k, replyC) => // the `counter` molecule holds the number (k) of the reactions that have reached the rendezvous before this reaction started.
if (k + 1 < n) counter(k+1); else println(s"rendezvous passed by $n reactions")
if (k + 1 < n) counter(k + 1); else println(s"rendezvous passed by $n reactions")
replyC() // `replyC()` must be here. Doing `replyC()` before emitting `counter(k+1)` would have unblocked some reactions and allowed them to proceed beyond the rendezvous point without waiting for all others.
replyB()
},
go { case end(_) + endCounter(k) => endCounter(k-1) },
go { case endCounter(0) + done(_, r) => r()}
go { case end(_) + endCounter(k) => endCounter(k - 1) },
go { case endCounter(0) + done(_, r) => r() }
)

(1 to n).foreach(i => begin((f(i),g(i))))
(1 to n).foreach(i => begin((f(i), g(i))))
counterInit()
endCounter(n)
done.timeout()(1000 millis) shouldEqual Some(())

val result: Seq[String] = logFile.iterator().asScala.toSeq
result.size shouldEqual 2*n
result.size shouldEqual 2 * n
// Now, there must be f_1, ..., f_n (in any order) before g_1, ..., g_n (also in any order).
// We use sets to verify this.

val setF = (0 until n).map(result.apply).toSet
val setG = (n until 2*n).map(result.apply).toSet
val setG = (n until 2 * n).map(result.apply).toSet

val expectedSetF = (1 to n).map(i => s"f$i").toSet
val expectedSetG = (1 to n).map(i => s"g$i").toSet
Expand All @@ -208,4 +229,40 @@ class Patterns01Spec extends FlatSpec with Matchers with BeforeAndAfterEach {
pool.shutdownNow()
}

it should "implement dance pairing with queue labels" in {
val man = m[Unit]
val manL = m[Int]
val queueMen = m[Int]
val woman = m[Unit]
val womanL = m[Int]
val queueWomen = m[Int]
val beginDancing = b[Int, Unit]
val mayBegin = m[Int]

val danceCounter = m[List[Int]]
val done = b[Unit, List[Int]]

val pool = new FixedPool(2)

site(pool)(
go { case man(_) + queueMen(n) => queueMen(n + 1) + manL(n) },
go { case woman(_) + queueWomen(n) => queueWomen(n + 1) + womanL(n) },
go { case manL(xy) + womanL(xx) + mayBegin(l) if xx == xy && xy == l => beginDancing(l); mayBegin(l + 1) },
go { case _ => queueMen(0) + queueWomen(0) + mayBegin(0) }
)

val total = 100

site(pool)(
go { case danceCounter(x) + done(_, r) if x.size == total => r(x) + danceCounter(x) },
go { case beginDancing(xy, r) + danceCounter(x) => danceCounter(x :+ xy) + r() },
go { case _ => danceCounter(Nil) }
)

(1 to total).map(_ => ()).foreach(man)
danceCounter.volatileValue shouldEqual Nil
(1 to total).map(_ => ()).foreach(woman)
done() shouldEqual (0 until total).toList
}

}

0 comments on commit ada6a04

Please sign in to comment.