Skip to content

Commit

Permalink
Merge pull request #85 from betarixm/refactor/services
Browse files Browse the repository at this point in the history
Minimal PoC
  • Loading branch information
betarixm authored Nov 27, 2023
2 parents 8aba854 + e106129 commit ae9ec83
Show file tree
Hide file tree
Showing 41 changed files with 2,423 additions and 492 deletions.
7 changes: 7 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.bsp
.idea
docs
**/.bloop
target
**/target
**/Dockerfile
9 changes: 4 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ lazy val commonSettings = Seq(
"org.scalactic" %% "scalactic" % "3.2.17",
"org.scalatest" %% "scalatest" % "3.2.17" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.17" % "test",
"io.reactivex.rxjava3" % "rxjava" % "3.0.4"
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
"org.apache.logging.log4j" %% "log4j-api-scala" % "13.0.0",
"org.apache.logging.log4j" % "log4j-core" % "2.22.0" % Runtime
)
)

Expand All @@ -42,10 +45,6 @@ lazy val rpc = (project in file("rpc"))
.settings(
commonSettings,
idePackagePrefix := Some("kr.ac.postech.paranode.rpc"),
libraryDependencies ++= Seq(
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/log4j2.properties
20 changes: 15 additions & 5 deletions core/src/main/scala/Block.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package kr.ac.postech.paranode.core

import org.apache.logging.log4j.scala.Logging

import java.io.BufferedOutputStream
import java.io.File
import java.io.FileOutputStream
import scala.io.Source
import scala.reflect.io.Path

object Block {
object Block extends Logging {

implicit class Blocks(blocks: List[Block]) {
def merged: Block = new Block(Record.merged(blocks.map(_.records)))
}

def fromBytes(
bytes: LazyList[Byte],
keyLength: Int = 10,
Expand All @@ -28,8 +35,11 @@ object Block {
path: Path,
keyLength: Int = 10,
valueLength: Int = 90
): Block =
): Block = {
logger.info(s"[Block] Reading block from $path")

Block.fromSource(Source.fromURI(path.toURI), keyLength, valueLength)
}

}

Expand Down Expand Up @@ -57,10 +67,10 @@ class Block(val records: LazyList[Record]) extends AnyVal {
def partition(keyRanges: List[KeyRange]): List[Partition] =
keyRanges.map(partition)

def sort(): Block =
def sorted: Block =
new Block(records.sortBy(_.key))

def sample(): LazyList[Key] =
Record.sampleWithInterval(records)
def sample(number: Int = 64): LazyList[Key] =
Record.sample(records, number)

}
8 changes: 8 additions & 0 deletions core/src/main/scala/Key.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package kr.ac.postech.paranode.core

import com.google.protobuf.ByteString

object Key {
def fromString(string: String): Key = new Key(string.getBytes())

def fromByteString(byteString: ByteString): Key = new Key(
byteString.toByteArray
)
}

class Key(val underlying: Array[Byte]) extends AnyVal with Ordered[Key] {
def is(that: Key): Boolean = underlying sameElements that.underlying

def hex: String = underlying.map("%02x" format _).mkString

override def compare(that: Key): Int = underlying
.zip(that.underlying)
.map { case (a, b) =>
Expand Down
53 changes: 34 additions & 19 deletions core/src/main/scala/Record.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kr.ac.postech.paranode.core

object Record {
import org.apache.logging.log4j.scala.Logging

object Record extends Logging {
def fromString(string: String, keyLength: Int = 10): Record =
Record.fromBytes(string.getBytes(), keyLength)

Expand All @@ -14,27 +16,40 @@ object Record {
keyLength: Int = 10,
valueLength: Int = 90
): LazyList[Record] = {
val recordLength = keyLength + valueLength
val (head, tail) = bytes.splitAt(recordLength)

Record.fromBytes(head.toArray, keyLength) #:: Record
.fromBytesToRecords(
tail,
keyLength,
valueLength
)
if (bytes.isEmpty) {
LazyList.empty
} else {
val recordLength = keyLength + valueLength
val (head, tail) = bytes.splitAt(recordLength)

Record.fromBytes(head.toArray, keyLength) #:: Record
.fromBytesToRecords(
tail,
keyLength,
valueLength
)
}
}

def sampleWithInterval(
def sample(
records: LazyList[Record],
interval: Int = 10
): LazyList[Key] = {
if (records.isEmpty)
LazyList.empty[Key]
else {
val (current, rest) = records.splitAt(interval)
val head = current.head.key
head #:: sampleWithInterval(rest, interval)
number: Int = 64
): LazyList[Key] = records.take(number).map(_.key)

def merged(
listOfRecords: List[LazyList[Record]]
): LazyList[Record] = {
if (listOfRecords.isEmpty) {
LazyList.empty
} else {
val sortedListOfRecords =
listOfRecords.sorted(Ordering.by((_: LazyList[Record]).head.key))

sortedListOfRecords.head.head #:: merged(
(sortedListOfRecords.head.tail :: sortedListOfRecords.tail).filter(
_.nonEmpty
)
)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/BlockSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class BlockSpec extends AnyFlatSpec {
)
)

val sortedBlock = block.sort()
val sortedBlock = block.sorted

val expectedBlock =
new Block(
Expand Down
18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
services:
master:
build:
context: .
dockerfile: docker/master/Dockerfile
args:
- NUMBER_OF_WORKERS=2
worker:
build:
context: .
dockerfile: docker/worker/Dockerfile
args:
- MASTER_HOST=master
- MASTER_PORT=50051
deploy:
replicas: 2
depends_on:
- master
29 changes: 29 additions & 0 deletions docker/master/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
FROM sbtscala/scala-sbt:eclipse-temurin-jammy-20.0.2_9_1.9.6_2.13.12

ARG NUMBER_OF_WORKERS

ENV NUMBER_OF_WORKERS=${NUMBER_OF_WORKERS}

RUN mkdir -p /app

WORKDIR /app

COPY build.sbt log4j2.properties ./
COPY core/build.sbt ./core/
COPY master/build.sbt ./master/
COPY rpc/build.sbt ./rpc/
COPY utils/build.sbt ./utils/
COPY worker/build.sbt ./worker/
COPY project/build.properties project/plugins.sbt project/scalapb.sbt ./project/

RUN sbt --batch compile

COPY rpc/src/main/protobuf ./rpc/src/main/protobuf

RUN sbt --batch compile

COPY . .

RUN sbt --batch compile

ENTRYPOINT sbt --batch -v "master/run ${NUMBER_OF_WORKERS}"
35 changes: 35 additions & 0 deletions docker/worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
FROM sbtscala/scala-sbt:eclipse-temurin-jammy-20.0.2_9_1.9.6_2.13.12

ARG MASTER_HOST
ARG MASTER_PORT

ENV MASTER_HOST=${MASTER_HOST}
ENV MASTER_PORT=${MASTER_PORT}

ENV SBT_OPTS="-Xmx2G -Xss2M"

RUN mkdir -p /app /data /output

COPY docker/worker/data /data

WORKDIR /app

COPY build.sbt log4j2.properties ./
COPY core/build.sbt ./core/
COPY master/build.sbt ./master/
COPY rpc/build.sbt ./rpc/
COPY utils/build.sbt ./utils/
COPY worker/build.sbt ./worker/
COPY project/build.properties project/plugins.sbt project/scalapb.sbt ./project/

RUN sbt --batch compile

COPY rpc/src/main/protobuf ./rpc/src/main/protobuf

RUN sbt --batch compile

COPY . .

RUN sbt --batch compile

ENTRYPOINT sbt --batch -v "worker/run ${MASTER_HOST}:${MASTER_PORT} -I /data/0 /data/1 /data/2 -O /output/"
Loading

0 comments on commit ae9ec83

Please sign in to comment.