Minimal PoC #87

Merged (7 commits) on Nov 27, 2023
4 changes: 2 additions & 2 deletions core/src/main/scala/Block.scala
@@ -47,7 +47,7 @@ class Block(val records: LazyList[Record]) extends AnyVal {

   def writeTo(path: Path): File = {
     val file = File(path)
-    val writer = file.bufferedWriter()
+    val writer = file.bufferedOutput()

     try {
       toChars.foreach(writer.write(_))
@@ -65,7 +65,7 @@ class Block(val records: LazyList[Record]) extends AnyVal {
       .zipWithIndex
       .map({ case (records, index) =>
         val file = File(directory / s"partition.$index")
-        val writer = file.bufferedWriter()
+        val writer = file.bufferedOutput()

         try {
           records.foreach(_.toChars.foreach(writer.write(_)))
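The change above swaps `bufferedWriter()` (character-oriented) for `bufferedOutput()` (byte-oriented). A minimal sketch of why that distinction matters for raw record bytes, written against plain `java.io` rather than the file library the PR code uses; the object name and values below are illustrative only:

```scala
import java.io.{ByteArrayOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

object ByteVsCharOutput extends App {
  // A record containing a byte above 0x7f, e.g. from a binary sort key.
  val record: Array[Byte] = Array(0x41.toByte, 0xff.toByte)

  // Character path: each value is treated as a Char and re-encoded by the
  // charset, so 0xff comes out as the two-byte UTF-8 sequence c3 bf.
  val charSink = new ByteArrayOutputStream()
  val writer   = new OutputStreamWriter(charSink, StandardCharsets.UTF_8)
  record.foreach(b => writer.write((b & 0xff).toChar))
  writer.flush()
  println(charSink.toByteArray.map("%02x".format(_)).mkString(" ")) // 41 c3 bf

  // Byte path: OutputStream.write(Int) emits the low-order byte verbatim.
  val byteSink = new ByteArrayOutputStream()
  record.foreach(b => byteSink.write(b & 0xff))
  println(byteSink.toByteArray.map("%02x".format(_)).mkString(" ")) // 41 ff
}
```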
10 changes: 8 additions & 2 deletions core/src/main/scala/Key.scala
@@ -8,17 +8,23 @@ object Key {
   def fromByteString(byteString: ByteString): Key = new Key(
     byteString.toByteArray
   )
+
+  def min(size: Int = 10) = new Key(Array.fill[Byte](size)(0x00.toByte))
+
+  def max(size: Int = 10) = new Key(Array.fill[Byte](size)(0xff.toByte))
 }

-class Key(val underlying: Array[Byte]) extends AnyVal with Ordered[Key] {
+case class Key(underlying: Array[Byte]) extends AnyVal with Ordered[Key] {
   def is(that: Key): Boolean = underlying sameElements that.underlying

   def hex: String = underlying.map("%02x" format _).mkString

+  def prior: Key = new Key(underlying.init :+ (underlying.last - 1).toByte)
+
   override def compare(that: Key): Int = underlying
     .zip(that.underlying)
     .map { case (a, b) =>
-      a - b
+      a.toChar - b.toChar
     }
     .find(_ != 0)
     .getOrElse(0)
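The comparator fix is about signedness: Scala's `Byte` is signed, so `a - b` orders bytes `0x80`–`0xff` before `0x00`–`0x7f`, while `Byte#toChar` sign-extends to an `Int` and truncates to 16 bits, which preserves the unsigned order a byte-wise key comparison needs. A standalone check with illustrative values, not taken from the PR:

```scala
object ByteOrderingCheck extends App {
  val a = 0x10.toByte // 16, same signed and unsigned
  val b = 0x90.toByte // 144 unsigned, but -112 as a signed Byte

  println(a - b)                   // 128    -> positive, claims a > b (wrong for raw keys)
  println(a.toChar - b.toChar)     // -65408 -> negative, a < b, matching unsigned order
  println((a & 0xff) - (b & 0xff)) // -128   -> the more common masking idiom, same sign
}
```

Worth noting: turning `Key` into a case class does not give it array-aware equality, since `Array` fields still compare by reference, which is presumably why `is` keeps using `sameElements`.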
25 changes: 22 additions & 3 deletions docker-compose.yml
@@ -5,14 +5,33 @@ services:
       dockerfile: docker/master/Dockerfile
       args:
         - NUMBER_OF_WORKERS=2
-  worker:
+  worker-0:
     build:
       context: .
       dockerfile: docker/worker/Dockerfile
       args:
+        - WORKER_ID=0
         - MASTER_HOST=master
         - MASTER_PORT=50051
-    deploy:
-      replicas: 2
     depends_on:
       - master
+  worker-1:
+    build:
+      context: .
+      dockerfile: docker/worker/Dockerfile
+      args:
+        - WORKER_ID=1
+        - MASTER_HOST=master
+        - MASTER_PORT=50051
+    depends_on:
+      - master
+  worker-2:
+    build:
+      context: .
+      dockerfile: docker/worker/Dockerfile
+      args:
+        - WORKER_ID=2
+        - MASTER_HOST=master
+        - MASTER_PORT=50051
+    depends_on:
+      - master
6 changes: 4 additions & 2 deletions docker/worker/Dockerfile
@@ -1,5 +1,7 @@
 FROM sbtscala/scala-sbt:eclipse-temurin-jammy-20.0.2_9_1.9.6_2.13.12

+ARG WORKER_ID
+
 ARG MASTER_HOST
 ARG MASTER_PORT

@@ -10,7 +12,7 @@ ENV SBT_OPTS="-Xmx2G -Xss2M"

 RUN mkdir -p /app /data /output

-COPY docker/worker/data /data
+COPY docker/worker/data/${WORKER_ID} /data

 WORKDIR /app

@@ -32,4 +34,4 @@ COPY . .

 RUN sbt --batch compile

-ENTRYPOINT sbt --batch -v "worker/run ${MASTER_HOST}:${MASTER_PORT} -I /data/0 /data/1 /data/2 -O /output/"
+ENTRYPOINT sbt --batch -v "worker/run ${MASTER_HOST}:${MASTER_PORT} -I /data/0 /data/1 -O /output/"
500 changes: 500 additions & 0 deletions docker/worker/data/0/0/1 (large diff not rendered)
500 changes: 500 additions & 0 deletions docker/worker/data/0/0/2 (large diff not rendered)
500 changes: 500 additions & 0 deletions docker/worker/data/0/1/1 (large diff not rendered)
500 changes: 500 additions & 0 deletions docker/worker/data/0/1/2 (large diff not rendered)
500 changes: 500 additions & 0 deletions docker/worker/data/1/0/1 (large diff not rendered)
500 changes: 500 additions & 0 deletions docker/worker/data/1/0/2 (large diff not rendered)
500 changes: 500 additions & 0 deletions docker/worker/data/1/1/1 (large diff not rendered)
500 changes: 500 additions & 0 deletions docker/worker/data/1/1/2 (large diff not rendered)
256 changes: 0 additions & 256 deletions docker/worker/data/1/3 (file deleted)
500 changes: 500 additions & 0 deletions docker/worker/data/2/0/1 (large diff not rendered)
244 changes: 244 additions & 0 deletions docker/worker/data/1/2 → docker/worker/data/2/0/2 (100644 → 100755, large diff not rendered)
244 changes: 244 additions & 0 deletions docker/worker/data/0/1 → docker/worker/data/2/1/1 (100644 → 100755, large diff not rendered)
244 changes: 244 additions & 0 deletions docker/worker/data/0/0 → docker/worker/data/2/1/2 (100644 → 100755, large diff not rendered)
256 changes: 0 additions & 256 deletions docker/worker/data/2/4 (file deleted)
256 changes: 0 additions & 256 deletions docker/worker/data/2/5 (file deleted)

35 changes: 20 additions & 15 deletions master/src/main/scala/Master.scala
@@ -13,19 +13,25 @@ import scala.concurrent.ExecutionContextExecutor
 object Master extends Logging {
   private def workersWithKeyRange(
       keys: List[Key],
-      workers: List[WorkerMetadata]
-  ): List[WorkerMetadata] =
-    keys
-      .sliding(
-        keys.size / workers.size,
-        keys.size / workers.size
+      workers: List[WorkerMetadata],
+      min: Key,
+      max: Key
+  ): List[WorkerMetadata] = {
+    val keysWithLowerBound = keys :+ min
+
+    val startKeys = keysWithLowerBound.sorted
+      .grouped(
+        (keysWithLowerBound.size.toDouble / workers.size.ceil).ceil.toInt
       )
       .toList
-      .map(keys => KeyRange.tupled(keys.head, keys.last))
-      .zip(workers)
-      .map { case (keyRange, worker) =>
-        worker.copy(keyRange = Some(keyRange))
-      }
+      .map(_.head)
+
+    val pairs = startKeys.zip(startKeys.tail.map(_.prior) :+ max)
+
+    workers.zip(pairs).map { case (worker, (min, max)) =>
+      worker.copy(keyRange = Some(KeyRange(min, max)))
+    }
+  }

   def main(args: Array[String]): Unit = {
     val masterArguments = new MasterArguments(args)
@@ -73,11 +79,10 @@ object Master extends Logging {
       .flatMap(_.sampledKeys)
       .map(Key.fromByteString)

-    logger.info("[Master] Sampled")
-
-    val sortedSampledKeys = sampledKeys.sorted
+    logger.info(s"[Master] Sampled $sampledKeys")

-    val workers = workersWithKeyRange(sortedSampledKeys, workerInfo)
+    val workers =
+      workersWithKeyRange(sampledKeys, workerInfo, Key.min(), Key.max())

     logger.info(s"[Master] Key ranges with worker: $workers")

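The rewritten `workersWithKeyRange` assigns each worker a contiguous range: it prepends the global minimum to the sampled keys, sorts, splits them into roughly equal groups, takes each group's smallest key as a range start, and ends every range just before the next start (the last range ends at the global maximum). A standalone sketch of that idea with `Int` keys so the project's `Key`/`KeyRange`/`WorkerMetadata` types are not needed; the names and values here are illustrative:

```scala
object KeyRangeSketch extends App {
  // `prior` is modelled as `- 1`, mirroring Key#prior decrementing the last byte.
  def ranges(sampled: List[Int], workers: Int, min: Int, max: Int): List[(Int, Int)] = {
    val withLowerBound = sampled :+ min
    val groupSize      = (withLowerBound.size.toDouble / workers).ceil.toInt

    // One start key per worker: the smallest key of each sorted group.
    val startKeys = withLowerBound.sorted.grouped(groupSize).toList.map(_.head)

    // Each range ends just before the next range starts; the last one ends at max.
    startKeys.zip(startKeys.tail.map(_ - 1) :+ max)
  }

  println(ranges(List(40, 10, 25, 70, 55, 90), workers = 3, min = 0, max = 100))
  // List((0,39), (40,89), (90,100))
}
```

In the PR itself these (start, end) pairs are then zipped with the worker list to set each worker's `keyRange`; since `zip` truncates to the shorter side, the group count is expected to match the number of workers.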