Skip to content

Commit

Permalink
Merge pull request #69 from betarixm/develop
Browse files Browse the repository at this point in the history
Iteration 04
  • Loading branch information
leejiwon1125 authored Nov 16, 2023
2 parents c2a6edc + 3878c79 commit cc1320b
Show file tree
Hide file tree
Showing 18 changed files with 692 additions and 55 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,12 @@ lazy val worker = (project in file("worker"))
lazy val network = (project in file("rpc"))
.settings(
commonSettings,
idePackagePrefix := Some("kr.ac.postech.paranode.rpc")
idePackagePrefix := Some("kr.ac.postech.paranode.rpc"),
libraryDependencies ++= Seq(
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)
)
34 changes: 11 additions & 23 deletions core/src/main/scala/Block.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,20 @@ class Block(val records: LazyList[Record]) extends AnyVal {
} finally writer.close()
}

def partition(workers: List[WorkerMetadata]): List[Partition] = {
def isInKeyRange(key: Key, range: KeyRange): Boolean =
(key >= range._1) && (key <= range._2)
def filterByKeyRange(keyRange: KeyRange): Block = new Block(
records.filter(keyRange.includes)
)

val groupedRecords = for {
worker <- workers
keyRange = worker.keyRange.getOrElse(
throw new AssertionError("KeyRange must be defined")
)
filteredRecords = records.filter(record =>
isInKeyRange(record.key, keyRange)
)
} yield new Partition(
worker,
new Block(filteredRecords)
)
def partition(keyRange: KeyRange): Partition =
(keyRange, filterByKeyRange(keyRange))

groupedRecords
}
def partition(keyRanges: List[KeyRange]): List[Partition] =
keyRanges.map(partition)

def sort(block: Block): Block = {
val sortedRecords = block.records.sortBy(_.key)
new Block(sortedRecords)
}
def sort(): Block =
new Block(records.sortBy(_.key))

def sample(block: Block): LazyList[Key] =
Record.sampleWithInterval(block.records)
def sample(): LazyList[Key] =
Record.sampleWithInterval(records)

}
7 changes: 7 additions & 0 deletions core/src/main/scala/KeyRange.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kr.ac.postech.paranode.core

case class KeyRange(from: Key, to: Key) {
def includes(key: Key): Boolean = (from <= key) && (key <= to)

def includes(record: Record): Boolean = includes(record.key)
}
3 changes: 1 addition & 2 deletions core/src/main/scala/package.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package kr.ac.postech.paranode

package object core {
type KeyRange = (Key, Key)
type Partition = (WorkerMetadata, Block)
type Partition = (KeyRange, Block)
}
69 changes: 40 additions & 29 deletions core/src/test/scala/BlockSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ import java.io.File
import scala.io.Source

class BlockSpec extends AnyFlatSpec {
implicit class ComparableKeyRange(keyRange: KeyRange) {
def is(that: KeyRange): Boolean =
(keyRange.from is that.from) && (keyRange.to is that.to)
}

implicit class ComparablePartition(partition: Partition) {
def is(that: Partition): Boolean = {
(partition._1 is that._1) && (partition._2 is that._2)
}
}

implicit class ComparableBlock(block: Block) {
def is(that: Block): Boolean = {
block.records
Expand Down Expand Up @@ -107,44 +118,44 @@ class BlockSpec extends AnyFlatSpec {
}

it should "be able to make partition" in {
val keyStart1 = new Key(Array(0x0))
val keyEnd1 = new Key(Array(0x4))
val keyRange1 = new KeyRange(keyStart1, keyEnd1)
val workerMetadata1 = WorkerMetadata("1.1.1.1", 123, Option(keyRange1))

val keyStart2 = new Key(Array(0x5))
val keyEnd2 = new Key(Array(0x9))
val keyRange2 = new KeyRange(keyStart2, keyEnd2)
val workerMetadata2 = WorkerMetadata("2.2.2.2", 123, Option(keyRange2))
val firstKeyRange = KeyRange(new Key(Array(0x0)), new Key(Array(0x4)))
val secondKeyRange = KeyRange(new Key(Array(0x5)), new Key(Array(0x9)))

val workers = List(workerMetadata1, workerMetadata2)

val block1 = new Block(
LazyList(
new Record(new Key(Array(0x1)), Array(0x1, 0x2, 0x3)),
new Record(new Key(Array(0x2)), Array(0x8, 0x9, 0x7))
)
)
val block2 = new Block(
LazyList(
new Record(new Key(Array(0x6)), Array(0x5, 0x6, 0x7))
)
)
val blocks = new Block(
LazyList(
new Record(new Key(Array(0x1)), Array(0x1, 0x2, 0x3)),
new Record(new Key(Array(0x6)), Array(0x5, 0x6, 0x7)),
new Record(new Key(Array(0x2)), Array(0x8, 0x9, 0x7))
)
)
val result = blocks.partition(workers)

val answer = List(
new Partition(workerMetadata1, block1),
new Partition(workerMetadata2, block2)
val partitions = blocks.partition(List(firstKeyRange, secondKeyRange))

val expectedPartitions = List(
new Partition(
firstKeyRange,
new Block(
LazyList(
new Record(new Key(Array(0x1)), Array(0x1, 0x2, 0x3)),
new Record(new Key(Array(0x2)), Array(0x8, 0x9, 0x7))
)
)
),
new Partition(
secondKeyRange,
new Block(
LazyList(
new Record(new Key(Array(0x6)), Array(0x5, 0x6, 0x7))
)
)
)
)
assert(result.zip(answer).forall({ case ((_, a), (_, b)) => a is b }))

assert(
partitions
.zip(expectedPartitions)
.forall(partitions => partitions._1 is partitions._2)
)
}

it should "be sortable" in {
Expand All @@ -157,7 +168,7 @@ class BlockSpec extends AnyFlatSpec {
)
)

val sortedBlock = block.sort(block)
val sortedBlock = block.sort()

val expectedBlock =
new Block(
Expand All @@ -183,7 +194,7 @@ class BlockSpec extends AnyFlatSpec {
)
)

val sample = block.sample(block)
val sample = block.sample()

val key1 = new Key(Array(0x0))
val key2 = new Key(Array(0x4))
Expand Down
28 changes: 28 additions & 0 deletions docs/report-progress-04.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Progress Report

> 11/10/2023 - 11/16/2023
## Completed Tasks

- Make `Partition` into tuple of `KeyRange` and `Block` (#46)
- Refactor `block.partition` with `groupBy` (#47)
- Define `KeyRange` class (#48)
- Test `gensort` on linux and macOS (#49)
- Implement `Blocks` (#50)
- Setup ScalaPB (#51)
- Write gRPC `proto` (#52)
- Implement Exchange RPC Service (#54)
- Implement Master RPC Service (#55)

## Assigned Tasks

- Merge proto related branches and implement (#62)
- @betarixm
- Implement Worker Server (#63)
- @hataehyeok
- Implement Exchange Server (#64)
- @leejiwon1125
- Implement Master Server (#65)
- @leejiwon1125
- CLI and Initialize (#66)
- @leejiwon1125
3 changes: 3 additions & 0 deletions project/scalapb.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.11"
16 changes: 16 additions & 0 deletions rpc/src/main/protobuf/exchange.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package kr.ac.postech.paranode.rpc;

service Exchange {

rpc SaveRecords (GetMyRecordsRequest) returns (GetMyRecordsReply) {}
}

message GetMyRecordsRequest {
repeated string records = 1;
}

message GetMyRecordsReply {
bool isNice = 1;
}
19 changes: 19 additions & 0 deletions rpc/src/main/protobuf/hello.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";

package kr.ac.postech.paranode.rpc;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}
18 changes: 18 additions & 0 deletions rpc/src/main/protobuf/master.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package kr.ac.postech.paranode.rpc;

service Master {

rpc RegisterWorkerDirectory (RegisterRequest) returns (RegisterReply) {}
}

message RegisterRequest {
string ipAddress = 1;
repeated string inputDirectory = 2;
string outputDirectory = 3;
}

message RegisterReply {
bool isRegistered = 1;
}
47 changes: 47 additions & 0 deletions rpc/src/main/protobuf/worker.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
syntax = "proto3";

package kr.ac.postech.paranode.rpc;

service Worker {

rpc SampleKeys (SampleRequest) returns (SampleReply) {}
rpc MakePartitions (PartitionRequest) returns (PartitionReply) {}
rpc ExchangeWithOtherWorker (ExchangeRequest) returns (ExchangeReply) {}
rpc Merge (MergeRequest) returns (MergeReply) {}
}

message SampleRequest {
int32 numberOfKeys = 1;
}

message SampleReply {
repeated string sampledKeys = 1;
bool isNice = 2;
}

message PartitionRequest {

message WorkerPartition {
string workerIpAddress = 1;
string startKey = 2;
string endKey = 3;
}
repeated WorkerPartition partitions = 1;

}

message PartitionReply {
bool isNice = 1;
}

message ExchangeRequest { }

message ExchangeReply {
bool isNice = 1;
}

message MergeRequest { }

message MergeReply {
bool isNice = 1;
}
56 changes: 56 additions & 0 deletions rpc/src/main/scala/ExchangeClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package kr.ac.postech.paranode.rpc

import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc
import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc.ExchangeBlockingStub
import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsReply
import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsRequest

import java.util.concurrent.TimeUnit
import java.util.logging.Logger

object ExchangeClient {
def apply(host: String, port: Int): ExchangeClient = {
val channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.build
val blockingStub = ExchangeGrpc.blockingStub(channel)
new ExchangeClient(channel, blockingStub)
}

def main(args: Array[String]): Unit = {
val client = ExchangeClient("localhost", 30050)
try {
val response = client.SaveRecords()

println("My records is: " + response.isNice)
} finally {
client.shutdown()
}
}
}

class ExchangeClient private (
private val channel: ManagedChannel,
private val blockingStub: ExchangeBlockingStub
) {
private[this] val logger = Logger.getLogger(classOf[WorkerClient].getName)

def shutdown(): Unit = {
channel.shutdown.awaitTermination(5, TimeUnit.SECONDS)
}

def SaveRecords(): GetMyRecordsReply = {
logger.info(
"Try to save my records"
)

val request = GetMyRecordsRequest()
val response = blockingStub.saveRecords(request)
logger.info("My records is: " + response.isNice)

response
}
}
Loading

0 comments on commit cc1320b

Please sign in to comment.