Skip to content

Commit

Permalink
Merge pull request #83 from betarixm/develop
Browse files Browse the repository at this point in the history
Iteration 05
  • Loading branch information
betarixm authored Nov 23, 2023
2 parents cc1320b + bc0a272 commit 993094c
Show file tree
Hide file tree
Showing 16 changed files with 405 additions and 284 deletions.
34 changes: 19 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.17",
"org.scalatest" %% "scalatest" % "3.2.17" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.17" % "test"
"org.scalatest" %% "scalatest-flatspec" % "3.2.17" % "test",
"io.reactivex.rxjava3" % "rxjava" % "3.0.4"
)
)

lazy val root = (project in file("."))
.aggregate(core, utils, master, worker, network)
.aggregate(core, utils, master, worker, rpc)

lazy val utils = (project in file("utils"))
.settings(
Expand All @@ -37,29 +38,32 @@ lazy val core = (project in file("core"))
)
.dependsOn(utils)

lazy val master = (project in file("master"))
lazy val rpc = (project in file("rpc"))
.settings(
commonSettings,
idePackagePrefix := Some("kr.ac.postech.paranode.master")
idePackagePrefix := Some("kr.ac.postech.paranode.rpc"),
libraryDependencies ++= Seq(
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)
)
.dependsOn(core)

lazy val worker = (project in file("worker"))
lazy val master = (project in file("master"))
.settings(
commonSettings,
idePackagePrefix := Some("kr.ac.postech.paranode.worker")
idePackagePrefix := Some("kr.ac.postech.paranode.master")
)
.dependsOn(core)
.dependsOn(rpc)

lazy val network = (project in file("rpc"))
lazy val worker = (project in file("worker"))
.settings(
commonSettings,
idePackagePrefix := Some("kr.ac.postech.paranode.rpc"),
libraryDependencies ++= Seq(
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)
idePackagePrefix := Some("kr.ac.postech.paranode.worker")
)
.dependsOn(core)
.dependsOn(rpc)
43 changes: 43 additions & 0 deletions docs/report-progress-05.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Progress Report

> 11/17/2023 - 11/23/2023
## Discussion

### File Naming Convention

> For `/[INPUT DIR]/[BLOCK A]`
- Sorted Block A
- `/[INPUT DIR]/[BLOCK A]`
- A Partition of A
- `/[INPUT DIR]/[BLOCK A].[FROM].[TO]`
- Partition from Other Worker
- `/[OUTPUT DIR]/[UUID]`
- Merged
- `/[OUTPUT DIR]/result`

## Completed Tasks

- Merge proto related branches and implement (#62)
- Implement Worker Server (#63)
- Implement Exchange Server (#64)
- Implement Master Server (#65)
- CLI and Initialize (#66)

## Assigned Tasks

- Refactor Servers and Clients (#75)
- @betarixm
- Extract Key Range from Worker (#76)
- @leejiwon1125
- E2E Test (Small) (#77)
- @betarixm, @hataehyeok, @leejiwon1125
- Exchange Server and Client (#78)
- @leejiwon1125
- E2E Test (Real World) (#79)
- @betarixm, @hataehyeok, @leejiwon1125
- Grpc Test (#80)
- @hataehyeok
- Logging (#81)
- @betarixm
34 changes: 33 additions & 1 deletion master/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,39 @@
package kr.ac.postech.paranode.master

import kr.ac.postech.paranode.core.WorkerMetadata
import kr.ac.postech.paranode.rpc.MasterServer

import java.net._
import scala.util.Try

object Main {
def main(args: Array[String]): Unit = {
println("Hello world!")
val numberOfWorker = Try(args(0).toInt).getOrElse {
println("Invalid command")
return
}

val server = new MasterServer(scala.concurrent.ExecutionContext.global)
server.startServer()

while (server.getWorkerDetails.size < numberOfWorker) {
Thread.sleep(1000)
}

val workerInfo: List[WorkerMetadata] = server.getWorkerDetails

assert(workerInfo.size == numberOfWorker)

try {
val ipAddress = InetAddress.getLocalHost.getHostAddress

println(ipAddress + ":" + server.getPort)
println(workerInfo.map(_.host).mkString(", "))
} catch {
case e: Exception => e.printStackTrace()
}
// TODO: save workerInfo and start WorkerClient

}

}
18 changes: 18 additions & 0 deletions rpc/src/main/protobuf/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package kr.ac.postech.paranode.rpc;

message Node {
string host = 1;
int32 port = 2;
}

message KeyRange {
bytes from = 1;
bytes to = 2;
}

message WorkerMetadata {
Node node = 1;
KeyRange keyRange = 2;
}
11 changes: 4 additions & 7 deletions rpc/src/main/protobuf/exchange.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ syntax = "proto3";
package kr.ac.postech.paranode.rpc;

service Exchange {

rpc SaveRecords (GetMyRecordsRequest) returns (GetMyRecordsReply) {}
rpc SaveRecords (SaveRecordsRequest) returns (SaveRecordsReply) {}
}

message GetMyRecordsRequest {
repeated string records = 1;
message SaveRecordsRequest {
repeated bytes records = 1;
}

message GetMyRecordsReply {
bool isNice = 1;
}
message SaveRecordsReply {}
19 changes: 0 additions & 19 deletions rpc/src/main/protobuf/hello.proto

This file was deleted.

13 changes: 5 additions & 8 deletions rpc/src/main/protobuf/master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ syntax = "proto3";

package kr.ac.postech.paranode.rpc;

import "common.proto";

service Master {

rpc RegisterWorkerDirectory (RegisterRequest) returns (RegisterReply) {}
rpc Register (RegisterRequest) returns (RegisterReply) {}
}

message RegisterRequest {
string ipAddress = 1;
repeated string inputDirectory = 2;
string outputDirectory = 3;
Node worker = 1;
}

message RegisterReply {
bool isRegistered = 1;
}
message RegisterReply {}
39 changes: 14 additions & 25 deletions rpc/src/main/protobuf/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ syntax = "proto3";

package kr.ac.postech.paranode.rpc;

service Worker {
import "common.proto";

rpc SampleKeys (SampleRequest) returns (SampleReply) {}
rpc MakePartitions (PartitionRequest) returns (PartitionReply) {}
rpc ExchangeWithOtherWorker (ExchangeRequest) returns (ExchangeReply) {}
service Worker {
rpc Sample (SampleRequest) returns (SampleReply) {}
rpc Partition (PartitionRequest) returns (PartitionReply) {}
rpc Exchange (ExchangeRequest) returns (ExchangeReply) {}
rpc Merge (MergeRequest) returns (MergeReply) {}
}

Expand All @@ -15,33 +16,21 @@ message SampleRequest {
}

message SampleReply {
repeated string sampledKeys = 1;
bool isNice = 2;
repeated bytes sampledKeys = 1;
}

message PartitionRequest {

message WorkerPartition {
string workerIpAddress = 1;
string startKey = 2;
string endKey = 3;
}
repeated WorkerPartition partitions = 1;

repeated WorkerMetadata workers = 1;
}

message PartitionReply {
bool isNice = 1;
}

message ExchangeRequest { }
message PartitionReply {}

message ExchangeReply {
bool isNice = 1;
message ExchangeRequest {
repeated WorkerMetadata workers = 1;
}

message MergeRequest { }
message ExchangeReply {}

message MergeRequest {}

message MergeReply {
bool isNice = 1;
}
message MergeReply {}
41 changes: 14 additions & 27 deletions rpc/src/main/scala/ExchangeClient.scala
Original file line number Diff line number Diff line change
@@ -1,56 +1,43 @@
package kr.ac.postech.paranode.rpc

import com.google.protobuf.ByteString
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc
import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc.ExchangeBlockingStub
import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsReply
import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsRequest
import kr.ac.postech.paranode.core.Record

import java.util.concurrent.TimeUnit
import java.util.logging.Logger

import exchange.ExchangeGrpc.ExchangeBlockingStub
import exchange.{ExchangeGrpc, SaveRecordsReply, SaveRecordsRequest}

object ExchangeClient {
def apply(host: String, port: Int): ExchangeClient = {
val channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.build
.build()

val blockingStub = ExchangeGrpc.blockingStub(channel)
new ExchangeClient(channel, blockingStub)
}

def main(args: Array[String]): Unit = {
val client = ExchangeClient("localhost", 30050)
try {
val response = client.SaveRecords()

println("My records is: " + response.isNice)
} finally {
client.shutdown()
}
}
}

class ExchangeClient private (
private val channel: ManagedChannel,
private val blockingStub: ExchangeBlockingStub
) {
private[this] val logger = Logger.getLogger(classOf[WorkerClient].getName)

def shutdown(): Unit = {
channel.shutdown.awaitTermination(5, TimeUnit.SECONDS)
}

def SaveRecords(): GetMyRecordsReply = {
logger.info(
"Try to save my records"
)
def saveRecords(records: LazyList[Record]): SaveRecordsReply = {
val request =
SaveRecordsRequest(
records.map(x => ByteString.copyFrom(x.toChars.map(_.toByte)))
)

val request = GetMyRecordsRequest()
val response = blockingStub.saveRecords(request)
logger.info("My records is: " + response.isNice)
// TODO

response
blockingStub.saveRecords(request)
}
}
Loading

0 comments on commit 993094c

Please sign in to comment.