From 7eb51e68939df1927f0afae6ba51fb05a085b699 Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Sat, 18 Nov 2023 09:54:26 +0900 Subject: [PATCH 01/13] chore: remove the unused --- rpc/src/main/protobuf/hello.proto | 19 ------------------- rpc/src/proto-file.proto | 0 2 files changed, 19 deletions(-) delete mode 100644 rpc/src/main/protobuf/hello.proto delete mode 100644 rpc/src/proto-file.proto diff --git a/rpc/src/main/protobuf/hello.proto b/rpc/src/main/protobuf/hello.proto deleted file mode 100644 index c8c3b72..0000000 --- a/rpc/src/main/protobuf/hello.proto +++ /dev/null @@ -1,19 +0,0 @@ -syntax = "proto3"; - -package kr.ac.postech.paranode.rpc; - -// The greeting service definition. -service Greeter { - // Sends a greeting - rpc SayHello (HelloRequest) returns (HelloReply) {} -} - -// The request message containing the user's name. -message HelloRequest { - string name = 1; -} - -// The response message containing the greetings -message HelloReply { - string message = 1; -} diff --git a/rpc/src/proto-file.proto b/rpc/src/proto-file.proto deleted file mode 100644 index e69de29..0000000 From 2d1be280bcfd028948f4d09ed32d9b6a9e39a044 Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Sat, 18 Nov 2023 15:46:55 +0900 Subject: [PATCH 02/13] refactor: proto definitions along with skeletons --- build.sbt | 8 +- rpc/src/main/protobuf/common.proto | 18 ++++ rpc/src/main/protobuf/exchange.proto | 11 +-- rpc/src/main/protobuf/master.proto | 13 ++- rpc/src/main/protobuf/worker.proto | 39 +++----- rpc/src/main/scala/ExchangeClient.scala | 41 +++------ rpc/src/main/scala/ExchangeServer.scala | 32 ++++--- rpc/src/main/scala/MasterClient.scala | 47 +++------- rpc/src/main/scala/MasterServer.scala | 41 ++++----- rpc/src/main/scala/WorkerClient.scala | 114 +++++++++--------------- rpc/src/main/scala/WorkerServer.scala | 77 ++++++++-------- 11 files changed, 187 insertions(+), 254 deletions(-) create mode 100644 rpc/src/main/protobuf/common.proto diff --git a/build.sbt b/build.sbt index 3fe7c86..510ec1f 100644 --- a/build.sbt +++ b/build.sbt @@ -17,12 +17,13 @@ lazy val commonSettings = Seq( libraryDependencies ++= Seq( "org.scalactic" %% "scalactic" % "3.2.17", "org.scalatest" %% "scalatest" % "3.2.17" % "test", - "org.scalatest" %% "scalatest-flatspec" % "3.2.17" % "test" + "org.scalatest" %% "scalatest-flatspec" % "3.2.17" % "test", + "io.reactivex.rxjava3" % "rxjava" % "3.0.4" ) ) lazy val root = (project in file(".")) - .aggregate(core, utils, master, worker, network) + .aggregate(core, utils, master, worker, rpc) lazy val utils = (project in file("utils")) .settings( @@ -51,7 +52,7 @@ lazy val worker = (project in file("worker")) ) .dependsOn(core) -lazy val network = (project in file("rpc")) +lazy val rpc = (project in file("rpc")) .settings( commonSettings, idePackagePrefix := Some("kr.ac.postech.paranode.rpc"), @@ -63,3 +64,4 @@ lazy val network = (project in file("rpc")) scalapb.gen() -> (Compile / sourceManaged).value / "scalapb" ) ) + .dependsOn(core) diff --git a/rpc/src/main/protobuf/common.proto b/rpc/src/main/protobuf/common.proto new file mode 100644 index 0000000..89f014d --- /dev/null +++ b/rpc/src/main/protobuf/common.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package kr.ac.postech.paranode.rpc; + +message Node { + string host = 1; + int32 port = 2; +} + +message KeyRange { + bytes from = 1; + bytes to = 2; +} + +message WorkerMetadata { + Node node = 1; + KeyRange keyRange = 2; +} diff --git a/rpc/src/main/protobuf/exchange.proto b/rpc/src/main/protobuf/exchange.proto index b344a87..248b039 100644 --- a/rpc/src/main/protobuf/exchange.proto +++ b/rpc/src/main/protobuf/exchange.proto @@ -3,14 +3,11 @@ syntax = "proto3"; package kr.ac.postech.paranode.rpc; service Exchange { - - rpc SaveRecords (GetMyRecordsRequest) returns (GetMyRecordsReply) {} + rpc SaveRecords (SaveRecordsRequest) returns (SaveRecordsReply) {} } -message GetMyRecordsRequest { - repeated string records = 1; +message SaveRecordsRequest { + repeated bytes records = 1; } -message GetMyRecordsReply { - bool isNice = 1; -} \ No newline at end of file +message SaveRecordsReply {} diff --git a/rpc/src/main/protobuf/master.proto b/rpc/src/main/protobuf/master.proto index 8aa73e0..3cefd6f 100644 --- a/rpc/src/main/protobuf/master.proto +++ b/rpc/src/main/protobuf/master.proto @@ -2,17 +2,14 @@ syntax = "proto3"; package kr.ac.postech.paranode.rpc; +import "common.proto"; + service Master { - - rpc RegisterWorkerDirectory (RegisterRequest) returns (RegisterReply) {} + rpc Register (RegisterRequest) returns (RegisterReply) {} } message RegisterRequest { - string ipAddress = 1; - repeated string inputDirectory = 2; - string outputDirectory = 3; + Node worker = 1; } -message RegisterReply { - bool isRegistered = 1; -} \ No newline at end of file +message RegisterReply {} diff --git a/rpc/src/main/protobuf/worker.proto b/rpc/src/main/protobuf/worker.proto index c4f286f..d6b3e76 100644 --- a/rpc/src/main/protobuf/worker.proto +++ b/rpc/src/main/protobuf/worker.proto @@ -2,11 +2,12 @@ syntax = "proto3"; package kr.ac.postech.paranode.rpc; -service Worker { +import "common.proto"; - rpc SampleKeys (SampleRequest) returns (SampleReply) {} - rpc MakePartitions (PartitionRequest) returns (PartitionReply) {} - rpc ExchangeWithOtherWorker (ExchangeRequest) returns (ExchangeReply) {} +service Worker { + rpc Sample (SampleRequest) returns (SampleReply) {} + rpc Partition (PartitionRequest) returns (PartitionReply) {} + rpc Exchange (ExchangeRequest) returns (ExchangeReply) {} rpc Merge (MergeRequest) returns (MergeReply) {} } @@ -15,33 +16,21 @@ message SampleRequest { } message SampleReply { - repeated string sampledKeys = 1; - bool isNice = 2; + repeated bytes sampledKeys = 1; } message PartitionRequest { - - message WorkerPartition { - string workerIpAddress = 1; - string startKey = 2; - string endKey = 3; - } - repeated WorkerPartition partitions = 1; - + repeated WorkerMetadata workers = 1; } -message PartitionReply { - bool isNice = 1; -} - -message ExchangeRequest { } +message PartitionReply {} -message ExchangeReply { - bool isNice = 1; +message ExchangeRequest { + repeated WorkerMetadata workers = 1; } -message MergeRequest { } +message ExchangeReply {} + +message MergeRequest {} -message MergeReply { - bool isNice = 1; -} \ No newline at end of file +message MergeReply {} diff --git a/rpc/src/main/scala/ExchangeClient.scala b/rpc/src/main/scala/ExchangeClient.scala index 7758a8c..7281e9a 100644 --- a/rpc/src/main/scala/ExchangeClient.scala +++ b/rpc/src/main/scala/ExchangeClient.scala @@ -1,56 +1,43 @@ package kr.ac.postech.paranode.rpc +import com.google.protobuf.ByteString import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder -import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc -import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc.ExchangeBlockingStub -import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsReply -import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsRequest +import kr.ac.postech.paranode.core.Record import java.util.concurrent.TimeUnit -import java.util.logging.Logger + +import exchange.ExchangeGrpc.ExchangeBlockingStub +import exchange.{ExchangeGrpc, SaveRecordsReply, SaveRecordsRequest} object ExchangeClient { def apply(host: String, port: Int): ExchangeClient = { val channel = ManagedChannelBuilder .forAddress(host, port) .usePlaintext() - .build + .build() + val blockingStub = ExchangeGrpc.blockingStub(channel) new ExchangeClient(channel, blockingStub) } - - def main(args: Array[String]): Unit = { - val client = ExchangeClient("localhost", 30050) - try { - val response = client.SaveRecords() - - println("My records is: " + response.isNice) - } finally { - client.shutdown() - } - } } class ExchangeClient private ( private val channel: ManagedChannel, private val blockingStub: ExchangeBlockingStub ) { - private[this] val logger = Logger.getLogger(classOf[WorkerClient].getName) - def shutdown(): Unit = { channel.shutdown.awaitTermination(5, TimeUnit.SECONDS) } - def SaveRecords(): GetMyRecordsReply = { - logger.info( - "Try to save my records" - ) + def saveRecords(records: LazyList[Record]): SaveRecordsReply = { + val request = + SaveRecordsRequest( + records.map(x => ByteString.copyFrom(x.toChars.map(_.toByte))) + ) - val request = GetMyRecordsRequest() - val response = blockingStub.saveRecords(request) - logger.info("My records is: " + response.isNice) + // TODO - response + blockingStub.saveRecords(request) } } diff --git a/rpc/src/main/scala/ExchangeServer.scala b/rpc/src/main/scala/ExchangeServer.scala index 5893269..81694e9 100644 --- a/rpc/src/main/scala/ExchangeServer.scala +++ b/rpc/src/main/scala/ExchangeServer.scala @@ -2,13 +2,13 @@ package kr.ac.postech.paranode.rpc import io.grpc.Server import io.grpc.ServerBuilder -import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc -import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsReply -import kr.ac.postech.paranode.rpc.exchange.GetMyRecordsRequest import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.Promise + +import exchange.{ExchangeGrpc, SaveRecordsReply, SaveRecordsRequest} object ExchangeServer { private val logger = Logger.getLogger(classOf[ExchangeServer].getName) @@ -23,14 +23,13 @@ object ExchangeServer { } class ExchangeServer(executionContext: ExecutionContext) { self => - private[this] var server: Server = null + private[this] val server: Server = ServerBuilder + .forPort(ExchangeServer.port) + .addService(ExchangeGrpc.bindService(new ExchangeImpl, executionContext)) + .build() private def start(): Unit = { - server = ServerBuilder - .forPort(ExchangeServer.port) - .addService(ExchangeGrpc.bindService(new ExchangeImpl, executionContext)) - .build - .start + server.start() ExchangeServer.logger.info( "Server started, listening on " + ExchangeServer.port @@ -59,11 +58,16 @@ class ExchangeServer(executionContext: ExecutionContext) { self => private class ExchangeImpl extends ExchangeGrpc.Exchange { override def saveRecords( - request: GetMyRecordsRequest - ): Future[GetMyRecordsReply] = { - // TODO - val reply = GetMyRecordsReply(isNice = true) - Future.successful(reply) + request: SaveRecordsRequest + ): Future[SaveRecordsReply] = { + val promise = Promise[SaveRecordsReply] + + Future { + // TODO: Logic + promise.success(new SaveRecordsReply()) + }(executionContext) + + promise.future } } diff --git a/rpc/src/main/scala/MasterClient.scala b/rpc/src/main/scala/MasterClient.scala index 5b8afe6..d98f49c 100644 --- a/rpc/src/main/scala/MasterClient.scala +++ b/rpc/src/main/scala/MasterClient.scala @@ -2,15 +2,15 @@ package kr.ac.postech.paranode.rpc import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder -import io.grpc.StatusRuntimeException -import kr.ac.postech.paranode.rpc.master.MasterGrpc -import kr.ac.postech.paranode.rpc.master.MasterGrpc.MasterBlockingStub -import kr.ac.postech.paranode.rpc.master.RegisterRequest +import kr.ac.postech.paranode.core.WorkerMetadata import java.util.concurrent.TimeUnit -import java.util.logging.Level import java.util.logging.Logger +import common.Node +import master.MasterGrpc.MasterBlockingStub +import master.{MasterGrpc, RegisterRequest} + object MasterClient { def apply(host: String, port: Int): MasterClient = { val channel = @@ -18,25 +18,13 @@ object MasterClient { val blockingStub = MasterGrpc.blockingStub(channel) new MasterClient(channel, blockingStub) } - - def main(args: Array[String]): Unit = { - val client = MasterClient("localhost", 50051) - try { - val ipAddress = "1.2.3.4" - val inputDirectory = List("a/a", "b/b") - val outputDirectory = "c/c" - client.register(ipAddress, inputDirectory, outputDirectory) - } finally { - client.shutdown() - } - } } class MasterClient private ( private val channel: ManagedChannel, private val blockingStub: MasterBlockingStub ) { - private[this] val logger = Logger.getLogger(classOf[MasterClient].getName) + Logger.getLogger(classOf[MasterClient].getName) def shutdown(): Unit = { channel.shutdown.awaitTermination(5, TimeUnit.SECONDS) @@ -44,27 +32,12 @@ class MasterClient private ( /** Say hello to server. */ def register( - ipAddress: String, - inputDirectory: List[String], - outputDirectory: String + workerMetadata: WorkerMetadata ): Unit = { - logger.info( - "Try to register " + ipAddress + " | " + inputDirectory.mkString( - ", " - ) + " | " + outputDirectory + ", " - ) - val request = RegisterRequest( - ipAddress, - inputDirectory, - outputDirectory + Some(Node(workerMetadata.host, workerMetadata.port)) ) - try { - val response = blockingStub.registerWorkerDirectory(request) - logger.info("registering result: " + response.isRegistered) - } catch { - case e: StatusRuntimeException => - logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus) - } + + blockingStub.register(request) } } diff --git a/rpc/src/main/scala/MasterServer.scala b/rpc/src/main/scala/MasterServer.scala index 10cf2a8..fa8e5c6 100644 --- a/rpc/src/main/scala/MasterServer.scala +++ b/rpc/src/main/scala/MasterServer.scala @@ -2,13 +2,13 @@ package kr.ac.postech.paranode.rpc import io.grpc.Server import io.grpc.ServerBuilder -import kr.ac.postech.paranode.rpc.master.MasterGrpc -import kr.ac.postech.paranode.rpc.master.RegisterReply -import kr.ac.postech.paranode.rpc.master.RegisterRequest import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.Promise + +import master.{MasterGrpc, RegisterReply, RegisterRequest} object MasterServer { private val logger = Logger.getLogger(classOf[MasterServer].getName) @@ -23,17 +23,18 @@ object MasterServer { } class MasterServer(executionContext: ExecutionContext) { self => - private[this] var server: Server = null + private[this] val server: Server = ServerBuilder + .forPort(MasterServer.port) + .addService(MasterGrpc.bindService(new MasterImpl, executionContext)) + .build() private def start(): Unit = { - server = ServerBuilder - .forPort(MasterServer.port) - .addService(MasterGrpc.bindService(new MasterImpl, executionContext)) - .build - .start + server.start() + MasterServer.logger.info( "Server started, listening on " + MasterServer.port ) + sys.addShutdownHook { System.err.println( "*** shutting down gRPC server since JVM is shutting down" @@ -56,21 +57,15 @@ class MasterServer(executionContext: ExecutionContext) { self => } private class MasterImpl extends MasterGrpc.Master { - override def registerWorkerDirectory( - request: RegisterRequest - ): Future[RegisterReply] = { - System.err.println("*** server side code working") - System.err.println( - s"*** Received registration request: ipAddress = ${request.ipAddress}" - ) - System.err.println( - s"*** Input Directories: ${request.inputDirectory.mkString(", ")}" - ) - System.err.println(s"*** Output Directory: ${request.outputDirectory}") + override def register(request: RegisterRequest): Future[RegisterReply] = { + val promise = Promise[RegisterReply] - val reply = RegisterReply(isRegistered = true) - Future.successful(reply) + Future { + // TODO: Logic + promise.success(new RegisterReply()) + }(executionContext) + + promise.future } } - } diff --git a/rpc/src/main/scala/WorkerClient.scala b/rpc/src/main/scala/WorkerClient.scala index 9265012..4dcded4 100644 --- a/rpc/src/main/scala/WorkerClient.scala +++ b/rpc/src/main/scala/WorkerClient.scala @@ -1,22 +1,22 @@ package kr.ac.postech.paranode.rpc +import com.google.protobuf.ByteString import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder -import kr.ac.postech.paranode.rpc.worker.ExchangeReply -import kr.ac.postech.paranode.rpc.worker.ExchangeRequest -import kr.ac.postech.paranode.rpc.worker.MergeReply -import kr.ac.postech.paranode.rpc.worker.MergeRequest -import kr.ac.postech.paranode.rpc.worker.PartitionReply -import kr.ac.postech.paranode.rpc.worker.PartitionRequest -import kr.ac.postech.paranode.rpc.worker.PartitionRequest.WorkerPartition -import kr.ac.postech.paranode.rpc.worker.SampleReply -import kr.ac.postech.paranode.rpc.worker.SampleRequest -import kr.ac.postech.paranode.rpc.worker.WorkerGrpc -import kr.ac.postech.paranode.rpc.worker.WorkerGrpc.WorkerBlockingStub +import kr.ac.postech.paranode.core.KeyRange +import kr.ac.postech.paranode.core.WorkerMetadata import java.util.concurrent.TimeUnit import java.util.logging.Logger +import worker._ +import worker.WorkerGrpc.WorkerBlockingStub +import common.{ + Node, + KeyRange => RpcKeyRange, + WorkerMetadata => RpcWorkerMetadata +} + object WorkerClient { def apply(host: String, port: Int): WorkerClient = { val channel = ManagedChannelBuilder @@ -26,95 +26,61 @@ object WorkerClient { val blockingStub = WorkerGrpc.blockingStub(channel) new WorkerClient(channel, blockingStub) } - - def main(args: Array[String]): Unit = { - val client = WorkerClient("localhost", 30040) - try { - // TODO: later, numberOfKeys should be given by the master or we don't need this - val numberOfKeys = 3 - val responseOfSample = client.sample(numberOfKeys) - - println( - "Sampled keys: " + responseOfSample.sampledKeys.mkString(", ") - ) - - // TODO: Add more logic for link above sampled keys logic and below partition logic - val workerPartitions = List( - Tuple3("111.222.333.1", "startKey1", "endKey1"), - Tuple3("111.222.333.2", "startKey2", "endKey2") - ) - - val responseOfPartition = client.partition(workerPartitions) - - println("Partitioned is: " + responseOfPartition.isNice) - } finally { - client.shutdown() - } - } } class WorkerClient private ( private val channel: ManagedChannel, private val blockingStub: WorkerBlockingStub ) { - private[this] val logger = Logger.getLogger(classOf[WorkerClient].getName) + Logger.getLogger(classOf[WorkerClient].getName) def shutdown(): Unit = { channel.shutdown.awaitTermination(5, TimeUnit.SECONDS) } def sample(numberOfKeys: Int): SampleReply = { - logger.info( - "Try to sample " + numberOfKeys + " keys" - ) - val request = SampleRequest(numberOfKeys) - val response = blockingStub.sampleKeys(request) - logger.info("Sampled keys: " + response.sampledKeys.mkString(", ")) + val response = blockingStub.sample(request) response } def partition( - workerPartitions: List[(String, String, String)] + workers: List[(WorkerMetadata, KeyRange)] ): PartitionReply = { - logger.info( - "Try to partition" - ) - val partitions = workerPartitions.map { - case (ipAddress, startKey, endKey) => - WorkerPartition(ipAddress, startKey, endKey) - } - - val request = PartitionRequest(partitions) - val response = blockingStub.makePartitions(request) - logger.info("Partitioned is: " + response.isNice) + val request = PartitionRequest(workers.map({ case (worker, keyRange) => + RpcWorkerMetadata( + Some(Node(worker.host, worker.port)), + Some( + RpcKeyRange( + ByteString.copyFrom(keyRange.from.underlying), + ByteString.copyFrom(keyRange.to.underlying) + ) + ) + ) + })) - response + blockingStub.partition(request) } - def exchange(): ExchangeReply = { - logger.info( - "Try to exchange" - ) - - val request = ExchangeRequest() - val response = blockingStub.exchangeWithOtherWorker(request) - logger.info("Exchanged is: " + response.isNice) + def exchange(workers: List[(WorkerMetadata, KeyRange)]): ExchangeReply = { + val request = ExchangeRequest(workers.map({ case (worker, keyRange) => + RpcWorkerMetadata( + Some(Node(worker.host, worker.port)), + Some( + RpcKeyRange( + ByteString.copyFrom(keyRange.from.underlying), + ByteString.copyFrom(keyRange.to.underlying) + ) + ) + ) + })) - response + blockingStub.exchange(request) } def merge(): MergeReply = { - logger.info( - "Try to merge" - ) - val request = MergeRequest() - val response = blockingStub.merge(request) - logger.info("Merged is: " + response.isNice) - - response + blockingStub.merge(request) } - } diff --git a/rpc/src/main/scala/WorkerServer.scala b/rpc/src/main/scala/WorkerServer.scala index e087d6b..367f92d 100644 --- a/rpc/src/main/scala/WorkerServer.scala +++ b/rpc/src/main/scala/WorkerServer.scala @@ -2,19 +2,13 @@ package kr.ac.postech.paranode.rpc import io.grpc.Server import io.grpc.ServerBuilder -import kr.ac.postech.paranode.rpc.worker.ExchangeReply -import kr.ac.postech.paranode.rpc.worker.ExchangeRequest -import kr.ac.postech.paranode.rpc.worker.MergeReply -import kr.ac.postech.paranode.rpc.worker.MergeRequest -import kr.ac.postech.paranode.rpc.worker.PartitionReply -import kr.ac.postech.paranode.rpc.worker.PartitionRequest -import kr.ac.postech.paranode.rpc.worker.SampleReply -import kr.ac.postech.paranode.rpc.worker.SampleRequest -import kr.ac.postech.paranode.rpc.worker.WorkerGrpc import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.Promise + +import worker._ object WorkerServer { private val logger = Logger.getLogger(classOf[WorkerServer].getName) @@ -29,14 +23,13 @@ object WorkerServer { } class WorkerServer(executionContext: ExecutionContext) { self => - private[this] var server: Server = null + private[this] val server: Server = ServerBuilder + .forPort(WorkerServer.port) + .addService(WorkerGrpc.bindService(new WorkerImpl, executionContext)) + .build() private def start(): Unit = { - server = ServerBuilder - .forPort(WorkerServer.port) - .addService(WorkerGrpc.bindService(new WorkerImpl, executionContext)) - .build - .start + server.start() WorkerServer.logger.info( "Server started, listening on " + WorkerServer.port @@ -64,39 +57,51 @@ class WorkerServer(executionContext: ExecutionContext) { self => } private class WorkerImpl extends WorkerGrpc.Worker { - override def sampleKeys(request: SampleRequest): Future[SampleReply] = { + override def sample(request: SampleRequest): Future[SampleReply] = { + val promise = Promise[SampleReply] - // TODO: Implement the logic to sample keys from the input directory. - // val sampledKeys = blockOfWorker.sample(block).map(_.toString).toList - val sampledKeys = List("0x1", "0x2", "0x3") - // TODO: Implement the logic to check whether the sampled keys are nice + Future { + // TODO: Logic + promise.success(new SampleReply()) + }(executionContext) - val reply = SampleReply(sampledKeys, isNice = true) - Future.successful(reply) + promise.future } - override def makePartitions( + override def partition( request: PartitionRequest ): Future[PartitionReply] = { - // TODO - val reply = PartitionReply(isNice = true) - Future.successful(reply) + val promise = Promise[PartitionReply] + + Future { + // TODO: Logic + promise.success(new PartitionReply()) + }(executionContext) + + promise.future } - override def exchangeWithOtherWorker( - request: ExchangeRequest - ): Future[ExchangeReply] = { - // TODO - val reply = ExchangeReply(isNice = true) - Future.successful(reply) + override def exchange(request: ExchangeRequest): Future[ExchangeReply] = { + val promise = Promise[ExchangeReply] + + Future { + // TODO: Logic + promise.success(new ExchangeReply()) + }(executionContext) + + promise.future } override def merge(request: MergeRequest): Future[MergeReply] = { - // TODO - val reply = MergeReply(isNice = true) - Future.successful(reply) - } + val promise = Promise[MergeReply] + Future { + // TODO: Logic + promise.success(new MergeReply()) + }(executionContext) + + promise.future + } } } From 53b57772bd57a88b06e57aa6c69261ddc842e3c7 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 02:18:27 +0900 Subject: [PATCH 03/13] feat: make master server to register worker ip address and port --- rpc/src/main/scala/MasterServer.scala | 30 +++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/rpc/src/main/scala/MasterServer.scala b/rpc/src/main/scala/MasterServer.scala index fa8e5c6..d30e3db 100644 --- a/rpc/src/main/scala/MasterServer.scala +++ b/rpc/src/main/scala/MasterServer.scala @@ -3,6 +3,10 @@ package kr.ac.postech.paranode.rpc import io.grpc.Server import io.grpc.ServerBuilder +import java.io.File +import java.io.PrintWriter +import java.nio.file.Files +import java.nio.file.Paths import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -61,8 +65,30 @@ class MasterServer(executionContext: ExecutionContext) { self => val promise = Promise[RegisterReply] Future { - // TODO: Logic - promise.success(new RegisterReply()) + try { + val dirPath = Paths.get("worker_register") + if (!Files.exists(dirPath)) { + Files.createDirectories(dirPath) + } + val filePath = + dirPath.resolve(request.worker.get.host + ".txt").toString + + val writer = new PrintWriter(new File(filePath), "UTF-8") + try { + writer.println( + s"Worker Host: ${request.worker.get.host}, Worker Port: ${request.worker.get.port}" + ) + } finally { + writer.close() + } + promise.success(new RegisterReply()) + } catch { + case e: Exception => + MasterServer.logger.warning( + "Failed to write to file: " + e.getMessage + ) + promise.failure(e) + } }(executionContext) promise.future From 308903e3b446e6fdd3db47276aad9f3e71150652 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 15:28:20 +0900 Subject: [PATCH 04/13] feat: make master server to save requeste count --- rpc/src/main/scala/MasterServer.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/rpc/src/main/scala/MasterServer.scala b/rpc/src/main/scala/MasterServer.scala index d30e3db..1f53233 100644 --- a/rpc/src/main/scala/MasterServer.scala +++ b/rpc/src/main/scala/MasterServer.scala @@ -11,9 +11,10 @@ import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise - import master.{MasterGrpc, RegisterReply, RegisterRequest} +import kr.ac.postech.paranode.rpc.MasterServer.port + object MasterServer { private val logger = Logger.getLogger(classOf[MasterServer].getName) @@ -31,6 +32,15 @@ class MasterServer(executionContext: ExecutionContext) { self => .forPort(MasterServer.port) .addService(MasterGrpc.bindService(new MasterImpl, executionContext)) .build() + + private var requestCount = 0 + + def incrementRequestCount(): Unit = synchronized { + requestCount += 1 + } + + def getRequestCount: Int = requestCount + def getPort:String = port.toString private def start(): Unit = { server.start() @@ -48,6 +58,9 @@ class MasterServer(executionContext: ExecutionContext) { self => } } + def startServer(): Unit = this.start() + def stopServer(): Unit = this.stop() + private def stop(): Unit = { if (server != null) { server.shutdown() @@ -79,6 +92,7 @@ class MasterServer(executionContext: ExecutionContext) { self => s"Worker Host: ${request.worker.get.host}, Worker Port: ${request.worker.get.port}" ) } finally { + self.incrementRequestCount() writer.close() } promise.success(new RegisterReply()) From b0073cf7bac2a83213ef12ae7373420c0b450829 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 15:30:36 +0900 Subject: [PATCH 05/13] feat: make initialization code for master node --- master/src/main/scala/Main.scala | 67 +++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/master/src/main/scala/Main.scala b/master/src/main/scala/Main.scala index cfd7e1d..237b0d0 100644 --- a/master/src/main/scala/Main.scala +++ b/master/src/main/scala/Main.scala @@ -1,7 +1,72 @@ package kr.ac.postech.paranode.master +import kr.ac.postech.paranode.core.WorkerMetadata +import kr.ac.postech.paranode.rpc.MasterServer + +import scala.util.Try +import scala.io.Source +import java.net.URL +import java.nio.file.{Files, Paths} +import scala.collection.mutable.ListBuffer + object Main { def main(args: Array[String]): Unit = { - println("Hello world!") + val requestLimit = Try(args(1).toInt).getOrElse { + println("Invalid command") + return + } + + val server = new MasterServer(scala.concurrent.ExecutionContext.global) + server.startServer() + + while (server.getRequestCount < requestLimit) { + Thread.sleep(1000) + } + + val workerInfo: List[WorkerMetadata] = getWorkerDetails() + + assert(workerInfo.size == requestLimit) + + try { + val url = new URL("http://checkip.amazonaws.com") + val source = Source.fromURL(url) + val publicIpAddress = source.mkString.trim + source.close() + + println(publicIpAddress + ":" + server.getPort) + println(workerInfo.map(_.host).mkString(", ")) + } catch { + case e: Exception => e.printStackTrace() + } + + // TODO: start WorkerClient + + } +} + +def getWorkerDetails(): List[WorkerMetadata] = { + + val dirPath = Paths.get("worker_register") + val workerDetails = ListBuffer[WorkerMetadata]() + + if (Files.exists(dirPath)) { + val files = Files.list(dirPath).toArray + files.foreach { file => + val source = Source.fromFile(file.toString) + try { + for (line <- source.getLines) { + val parts = line.split(", ").map(_.split(": ").last) + if (parts.length == 2) { + val ip = parts(0) + val port = parts(1).toInt + workerDetails += (WorkerMetadata(ip, port, None)) + } + } + } finally { + source.close() + } + } } + + workerDetails.toList } From d1f1d25ca6430f766d64d839b0b3a215a236cc56 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 15:32:18 +0900 Subject: [PATCH 06/13] feat: add main function for testing purpose --- rpc/src/main/scala/MasterClient.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rpc/src/main/scala/MasterClient.scala b/rpc/src/main/scala/MasterClient.scala index d98f49c..0ecda08 100644 --- a/rpc/src/main/scala/MasterClient.scala +++ b/rpc/src/main/scala/MasterClient.scala @@ -18,6 +18,17 @@ object MasterClient { val blockingStub = MasterGrpc.blockingStub(channel) new MasterClient(channel, blockingStub) } + + def main(args: Array[String]): Unit = { + val client = MasterClient("localhost", 50051) + try { + val workerMetadata = WorkerMetadata("1.2.3.4", 56, None) + client.register(workerMetadata) + } finally { + client.shutdown() + } + } + } class MasterClient private ( From a46b0ab19c78fabb59d57f4430934933184798d3 Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Wed, 22 Nov 2023 17:16:57 +0900 Subject: [PATCH 07/13] fix: make master and worker depends on rpc --- build.sbt | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/build.sbt b/build.sbt index 510ec1f..5cddebe 100644 --- a/build.sbt +++ b/build.sbt @@ -38,30 +38,32 @@ lazy val core = (project in file("core")) ) .dependsOn(utils) -lazy val master = (project in file("master")) +lazy val rpc = (project in file("rpc")) .settings( commonSettings, - idePackagePrefix := Some("kr.ac.postech.paranode.master") + idePackagePrefix := Some("kr.ac.postech.paranode.rpc"), + libraryDependencies ++= Seq( + "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion, + "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion + ), + Compile / PB.targets := Seq( + scalapb.gen() -> (Compile / sourceManaged).value / "scalapb" + ) ) .dependsOn(core) -lazy val worker = (project in file("worker")) +lazy val master = (project in file("master")) .settings( commonSettings, - idePackagePrefix := Some("kr.ac.postech.paranode.worker") + idePackagePrefix := Some("kr.ac.postech.paranode.master") ) .dependsOn(core) + .dependsOn(rpc) -lazy val rpc = (project in file("rpc")) +lazy val worker = (project in file("worker")) .settings( commonSettings, - idePackagePrefix := Some("kr.ac.postech.paranode.rpc"), - libraryDependencies ++= Seq( - "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion, - "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion - ), - Compile / PB.targets := Seq( - scalapb.gen() -> (Compile / sourceManaged).value / "scalapb" - ) + idePackagePrefix := Some("kr.ac.postech.paranode.worker") ) .dependsOn(core) + .dependsOn(rpc) From f4d94e50a2de88d6b4d707e65a503c4e04f953df Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 20:47:34 +0900 Subject: [PATCH 08/13] feat: make initialization code for worker node --- master/src/main/scala/AuxFunction.scala | 39 +++++++++++++++++++++ master/src/main/scala/Main.scala | 37 +++----------------- rpc/src/main/scala/MasterServer.scala | 8 ++--- worker/src/main/scala/Main.scala | 46 ++++++++++++++++++++++++- 4 files changed, 92 insertions(+), 38 deletions(-) create mode 100644 master/src/main/scala/AuxFunction.scala diff --git a/master/src/main/scala/AuxFunction.scala b/master/src/main/scala/AuxFunction.scala new file mode 100644 index 0000000..621ed76 --- /dev/null +++ b/master/src/main/scala/AuxFunction.scala @@ -0,0 +1,39 @@ +package kr.ac.postech.paranode.master + +import kr.ac.postech.paranode.core.WorkerMetadata + +import java.nio.file.Files +import java.nio.file.Paths +import scala.collection.mutable.ListBuffer +import scala.io.Source + +object AuxFunction { + + def getWorkerDetails(): List[WorkerMetadata] = { + + val dirPath = Paths.get("worker_register") + val workerDetails = ListBuffer[WorkerMetadata]() + + if (Files.exists(dirPath)) { + val files = Files.list(dirPath).toArray + files.foreach { file => + val source = Source.fromFile(file.toString) + try { + for (line <- source.getLines) { + val parts = line.split(", ").map(_.split(": ").last) + if (parts.length == 2) { + val ip = parts(0) + val port = parts(1).toInt + workerDetails += (WorkerMetadata(ip, port, None)) + } + } + } finally { + source.close() + } + } + } + + workerDetails.toList + } + +} diff --git a/master/src/main/scala/Main.scala b/master/src/main/scala/Main.scala index 237b0d0..b51bb49 100644 --- a/master/src/main/scala/Main.scala +++ b/master/src/main/scala/Main.scala @@ -1,17 +1,16 @@ package kr.ac.postech.paranode.master import kr.ac.postech.paranode.core.WorkerMetadata +import kr.ac.postech.paranode.master.AuxFunction.getWorkerDetails import kr.ac.postech.paranode.rpc.MasterServer -import scala.util.Try -import scala.io.Source import java.net.URL -import java.nio.file.{Files, Paths} -import scala.collection.mutable.ListBuffer +import scala.io.Source +import scala.util.Try object Main { def main(args: Array[String]): Unit = { - val requestLimit = Try(args(1).toInt).getOrElse { + val requestLimit = Try(args(0).toInt).getOrElse { println("Invalid command") return } @@ -38,35 +37,7 @@ object Main { } catch { case e: Exception => e.printStackTrace() } - // TODO: start WorkerClient } } - -def getWorkerDetails(): List[WorkerMetadata] = { - - val dirPath = Paths.get("worker_register") - val workerDetails = ListBuffer[WorkerMetadata]() - - if (Files.exists(dirPath)) { - val files = Files.list(dirPath).toArray - files.foreach { file => - val source = Source.fromFile(file.toString) - try { - for (line <- source.getLines) { - val parts = line.split(", ").map(_.split(": ").last) - if (parts.length == 2) { - val ip = parts(0) - val port = parts(1).toInt - workerDetails += (WorkerMetadata(ip, port, None)) - } - } - } finally { - source.close() - } - } - } - - workerDetails.toList -} diff --git a/rpc/src/main/scala/MasterServer.scala b/rpc/src/main/scala/MasterServer.scala index 1f53233..fa9f91d 100644 --- a/rpc/src/main/scala/MasterServer.scala +++ b/rpc/src/main/scala/MasterServer.scala @@ -2,6 +2,7 @@ package kr.ac.postech.paranode.rpc import io.grpc.Server import io.grpc.ServerBuilder +import kr.ac.postech.paranode.rpc.MasterServer.port import java.io.File import java.io.PrintWriter @@ -11,9 +12,8 @@ import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise -import master.{MasterGrpc, RegisterReply, RegisterRequest} -import kr.ac.postech.paranode.rpc.MasterServer.port +import master.{MasterGrpc, RegisterReply, RegisterRequest} object MasterServer { private val logger = Logger.getLogger(classOf[MasterServer].getName) @@ -32,7 +32,7 @@ class MasterServer(executionContext: ExecutionContext) { self => .forPort(MasterServer.port) .addService(MasterGrpc.bindService(new MasterImpl, executionContext)) .build() - + private var requestCount = 0 def incrementRequestCount(): Unit = synchronized { @@ -40,7 +40,7 @@ class MasterServer(executionContext: ExecutionContext) { self => } def getRequestCount: Int = requestCount - def getPort:String = port.toString + def getPort: String = port.toString private def start(): Unit = { server.start() diff --git a/worker/src/main/scala/Main.scala b/worker/src/main/scala/Main.scala index bac6422..2e7e7f6 100644 --- a/worker/src/main/scala/Main.scala +++ b/worker/src/main/scala/Main.scala @@ -1,7 +1,51 @@ package kr.ac.postech.paranode.worker +import kr.ac.postech.paranode.core.WorkerMetadata +import kr.ac.postech.paranode.rpc.MasterClient + +import java.net.URL +import scala.io.Source +import scala.util.Try + object Main { def main(args: Array[String]): Unit = { - println("Hello world!") + if (args.length < 5) { + println("Usage: worker -I -O ") + return + } + + val Array(ip, portStr) = args(0).split(":") + val port = Try(portStr.toInt).getOrElse { + println("Invalid port number.") + return + } + + val inputDirIndex = args.indexOf("-I") + 1 + val outputDirIndex = args.indexOf("-O") + 1 + + if (inputDirIndex <= 0 || outputDirIndex <= 0) { + println("Input or Output directories not specified correctly.") + return + } + + args.slice(inputDirIndex, outputDirIndex - 1) + args(outputDirIndex) + + // Open MasterClient and request register + val client = MasterClient(ip, port) + try { + val url = new URL("http://checkip.amazonaws.com") + val source = Source.fromURL(url) + val publicIpAddress = source.mkString.trim + source.close() + + val workerMetadata = WorkerMetadata(publicIpAddress, -1, None) + client.register(workerMetadata) + + } finally { + client.shutdown() + } + } + } From b5db92ab7c09755f9de355b1f7906e129cf55542 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 23:29:16 +0900 Subject: [PATCH 09/13] feat: reflect suggested request 1. change value name 2. make not to use file 3. use InetAddress --- master/src/main/scala/AuxFunction.scala | 39 ------------------------ master/src/main/scala/Main.scala | 21 ++++++------- rpc/src/main/scala/MasterServer.scala | 40 +++++++------------------ worker/src/main/scala/Main.scala | 9 +++--- 4 files changed, 25 insertions(+), 84 deletions(-) delete mode 100644 master/src/main/scala/AuxFunction.scala diff --git a/master/src/main/scala/AuxFunction.scala b/master/src/main/scala/AuxFunction.scala deleted file mode 100644 index 621ed76..0000000 --- a/master/src/main/scala/AuxFunction.scala +++ /dev/null @@ -1,39 +0,0 @@ -package kr.ac.postech.paranode.master - -import kr.ac.postech.paranode.core.WorkerMetadata - -import java.nio.file.Files -import java.nio.file.Paths -import scala.collection.mutable.ListBuffer -import scala.io.Source - -object AuxFunction { - - def getWorkerDetails(): List[WorkerMetadata] = { - - val dirPath = Paths.get("worker_register") - val workerDetails = ListBuffer[WorkerMetadata]() - - if (Files.exists(dirPath)) { - val files = Files.list(dirPath).toArray - files.foreach { file => - val source = Source.fromFile(file.toString) - try { - for (line <- source.getLines) { - val parts = line.split(", ").map(_.split(": ").last) - if (parts.length == 2) { - val ip = parts(0) - val port = parts(1).toInt - workerDetails += (WorkerMetadata(ip, port, None)) - } - } - } finally { - source.close() - } - } - } - - workerDetails.toList - } - -} diff --git a/master/src/main/scala/Main.scala b/master/src/main/scala/Main.scala index b51bb49..1433f1c 100644 --- a/master/src/main/scala/Main.scala +++ b/master/src/main/scala/Main.scala @@ -1,16 +1,19 @@ package kr.ac.postech.paranode.master import kr.ac.postech.paranode.core.WorkerMetadata -import kr.ac.postech.paranode.master.AuxFunction.getWorkerDetails import kr.ac.postech.paranode.rpc.MasterServer +import java.nio.file.Files +import java.nio.file.Paths +import java.net._ + import java.net.URL import scala.io.Source import scala.util.Try object Main { def main(args: Array[String]): Unit = { - val requestLimit = Try(args(0).toInt).getOrElse { + val numberOfWorker = Try(args(0).toInt).getOrElse { println("Invalid command") return } @@ -18,26 +21,24 @@ object Main { val server = new MasterServer(scala.concurrent.ExecutionContext.global) server.startServer() - while (server.getRequestCount < requestLimit) { + while (server.getWorkerDetails.size < numberOfWorker) { Thread.sleep(1000) } - val workerInfo: List[WorkerMetadata] = getWorkerDetails() + val workerInfo: List[WorkerMetadata] = server.getWorkerDetails - assert(workerInfo.size == requestLimit) + assert(workerInfo.size == numberOfWorker) try { - val url = new URL("http://checkip.amazonaws.com") - val source = Source.fromURL(url) - val publicIpAddress = source.mkString.trim - source.close() + val publicIpAddress = InetAddress.getLocalHost.getHostAddress println(publicIpAddress + ":" + server.getPort) println(workerInfo.map(_.host).mkString(", ")) } catch { case e: Exception => e.printStackTrace() } - // TODO: start WorkerClient + // TODO: save workerInfo and start WorkerClient } + } diff --git a/rpc/src/main/scala/MasterServer.scala b/rpc/src/main/scala/MasterServer.scala index fa9f91d..44ade74 100644 --- a/rpc/src/main/scala/MasterServer.scala +++ b/rpc/src/main/scala/MasterServer.scala @@ -12,9 +12,11 @@ import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise - import master.{MasterGrpc, RegisterReply, RegisterRequest} +import scala.collection.mutable.{ListBuffer, WrappedArray} +import kr.ac.postech.paranode.core.WorkerMetadata + object MasterServer { private val logger = Logger.getLogger(classOf[MasterServer].getName) @@ -33,13 +35,14 @@ class MasterServer(executionContext: ExecutionContext) { self => .addService(MasterGrpc.bindService(new MasterImpl, executionContext)) .build() - private var requestCount = 0 + private val workerDetails:ListBuffer[WorkerMetadata] = ListBuffer() - def incrementRequestCount(): Unit = synchronized { - requestCount += 1 + def addWorkerInfo(workerMetadata: WorkerMetadata): Unit = synchronized { + workerDetails += workerMetadata } - def getRequestCount: Int = requestCount + def getWorkerDetails: List[WorkerMetadata] = workerDetails.toList + def getPort: String = port.toString private def start(): Unit = { @@ -78,31 +81,8 @@ class MasterServer(executionContext: ExecutionContext) { self => val promise = Promise[RegisterReply] Future { - try { - val dirPath = Paths.get("worker_register") - if (!Files.exists(dirPath)) { - Files.createDirectories(dirPath) - } - val filePath = - dirPath.resolve(request.worker.get.host + ".txt").toString - - val writer = new PrintWriter(new File(filePath), "UTF-8") - try { - writer.println( - s"Worker Host: ${request.worker.get.host}, Worker Port: ${request.worker.get.port}" - ) - } finally { - self.incrementRequestCount() - writer.close() - } - promise.success(new RegisterReply()) - } catch { - case e: Exception => - MasterServer.logger.warning( - "Failed to write to file: " + e.getMessage - ) - promise.failure(e) - } + val workerMetadata = WorkerMetadata(request.worker.get.host, request.worker.get.port, None) + addWorkerInfo(workerMetadata) }(executionContext) promise.future diff --git a/worker/src/main/scala/Main.scala b/worker/src/main/scala/Main.scala index 2e7e7f6..f5ac9ea 100644 --- a/worker/src/main/scala/Main.scala +++ b/worker/src/main/scala/Main.scala @@ -3,7 +3,7 @@ package kr.ac.postech.paranode.worker import kr.ac.postech.paranode.core.WorkerMetadata import kr.ac.postech.paranode.rpc.MasterClient -import java.net.URL +import java.net.{InetAddress, URL} import scala.io.Source import scala.util.Try @@ -34,14 +34,13 @@ object Main { // Open MasterClient and request register val client = MasterClient(ip, port) try { - val url = new URL("http://checkip.amazonaws.com") - val source = Source.fromURL(url) - val publicIpAddress = source.mkString.trim - source.close() + val publicIpAddress = InetAddress.getLocalHost.getHostAddress val workerMetadata = WorkerMetadata(publicIpAddress, -1, None) client.register(workerMetadata) + // doesn't come here. I think its because we use blockingStub. + } finally { client.shutdown() } From 5e04944846cc46b9880281e3de16844b7da49177 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 23:36:48 +0900 Subject: [PATCH 10/13] feat: 1. make blocking call to non blocking call so that now it works porperly 2. lint --- master/src/main/scala/Main.scala | 5 ----- rpc/src/main/scala/MasterClient.scala | 10 +++++----- rpc/src/main/scala/MasterServer.scala | 16 +++++++--------- worker/src/main/scala/Main.scala | 6 +----- 4 files changed, 13 insertions(+), 24 deletions(-) diff --git a/master/src/main/scala/Main.scala b/master/src/main/scala/Main.scala index 1433f1c..c25f58b 100644 --- a/master/src/main/scala/Main.scala +++ b/master/src/main/scala/Main.scala @@ -3,12 +3,7 @@ package kr.ac.postech.paranode.master import kr.ac.postech.paranode.core.WorkerMetadata import kr.ac.postech.paranode.rpc.MasterServer -import java.nio.file.Files -import java.nio.file.Paths import java.net._ - -import java.net.URL -import scala.io.Source import scala.util.Try object Main { diff --git a/rpc/src/main/scala/MasterClient.scala b/rpc/src/main/scala/MasterClient.scala index 0ecda08..872017c 100644 --- a/rpc/src/main/scala/MasterClient.scala +++ b/rpc/src/main/scala/MasterClient.scala @@ -8,15 +8,15 @@ import java.util.concurrent.TimeUnit import java.util.logging.Logger import common.Node -import master.MasterGrpc.MasterBlockingStub +import master.MasterGrpc.MasterStub import master.{MasterGrpc, RegisterRequest} object MasterClient { def apply(host: String, port: Int): MasterClient = { val channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build - val blockingStub = MasterGrpc.blockingStub(channel) - new MasterClient(channel, blockingStub) + val stub = MasterGrpc.stub(channel) + new MasterClient(channel, stub) } def main(args: Array[String]): Unit = { @@ -33,7 +33,7 @@ object MasterClient { class MasterClient private ( private val channel: ManagedChannel, - private val blockingStub: MasterBlockingStub + private val stub: MasterStub ) { Logger.getLogger(classOf[MasterClient].getName) @@ -49,6 +49,6 @@ class MasterClient private ( Some(Node(workerMetadata.host, workerMetadata.port)) ) - blockingStub.register(request) + stub.register(request) } } diff --git a/rpc/src/main/scala/MasterServer.scala b/rpc/src/main/scala/MasterServer.scala index 44ade74..2728a55 100644 --- a/rpc/src/main/scala/MasterServer.scala +++ b/rpc/src/main/scala/MasterServer.scala @@ -2,20 +2,17 @@ package kr.ac.postech.paranode.rpc import io.grpc.Server import io.grpc.ServerBuilder +import kr.ac.postech.paranode.core.WorkerMetadata import kr.ac.postech.paranode.rpc.MasterServer.port -import java.io.File -import java.io.PrintWriter -import java.nio.file.Files -import java.nio.file.Paths import java.util.logging.Logger +import scala.collection.mutable.ListBuffer +import scala.collection.mutable.WrappedArray import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise -import master.{MasterGrpc, RegisterReply, RegisterRequest} -import scala.collection.mutable.{ListBuffer, WrappedArray} -import kr.ac.postech.paranode.core.WorkerMetadata +import master.{MasterGrpc, RegisterReply, RegisterRequest} object MasterServer { private val logger = Logger.getLogger(classOf[MasterServer].getName) @@ -35,7 +32,7 @@ class MasterServer(executionContext: ExecutionContext) { self => .addService(MasterGrpc.bindService(new MasterImpl, executionContext)) .build() - private val workerDetails:ListBuffer[WorkerMetadata] = ListBuffer() + private val workerDetails: ListBuffer[WorkerMetadata] = ListBuffer() def addWorkerInfo(workerMetadata: WorkerMetadata): Unit = synchronized { workerDetails += workerMetadata @@ -81,7 +78,8 @@ class MasterServer(executionContext: ExecutionContext) { self => val promise = Promise[RegisterReply] Future { - val workerMetadata = WorkerMetadata(request.worker.get.host, request.worker.get.port, None) + val workerMetadata = + WorkerMetadata(request.worker.get.host, request.worker.get.port, None) addWorkerInfo(workerMetadata) }(executionContext) diff --git a/worker/src/main/scala/Main.scala b/worker/src/main/scala/Main.scala index f5ac9ea..cb24274 100644 --- a/worker/src/main/scala/Main.scala +++ b/worker/src/main/scala/Main.scala @@ -3,8 +3,7 @@ package kr.ac.postech.paranode.worker import kr.ac.postech.paranode.core.WorkerMetadata import kr.ac.postech.paranode.rpc.MasterClient -import java.net.{InetAddress, URL} -import scala.io.Source +import java.net.InetAddress import scala.util.Try object Main { @@ -35,12 +34,9 @@ object Main { val client = MasterClient(ip, port) try { val publicIpAddress = InetAddress.getLocalHost.getHostAddress - val workerMetadata = WorkerMetadata(publicIpAddress, -1, None) client.register(workerMetadata) - // doesn't come here. I think its because we use blockingStub. - } finally { client.shutdown() } From d9b36c0d04b0752cc0fd05247b55dd3e5e0bd575 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 23:44:45 +0900 Subject: [PATCH 11/13] chore: change value name --- master/src/main/scala/Main.scala | 4 ++-- worker/src/main/scala/Main.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/master/src/main/scala/Main.scala b/master/src/main/scala/Main.scala index c25f58b..08537a9 100644 --- a/master/src/main/scala/Main.scala +++ b/master/src/main/scala/Main.scala @@ -25,9 +25,9 @@ object Main { assert(workerInfo.size == numberOfWorker) try { - val publicIpAddress = InetAddress.getLocalHost.getHostAddress + val ipAddress = InetAddress.getLocalHost.getHostAddress - println(publicIpAddress + ":" + server.getPort) + println(ipAddress + ":" + server.getPort) println(workerInfo.map(_.host).mkString(", ")) } catch { case e: Exception => e.printStackTrace() diff --git a/worker/src/main/scala/Main.scala b/worker/src/main/scala/Main.scala index cb24274..2e87062 100644 --- a/worker/src/main/scala/Main.scala +++ b/worker/src/main/scala/Main.scala @@ -33,8 +33,8 @@ object Main { // Open MasterClient and request register val client = MasterClient(ip, port) try { - val publicIpAddress = InetAddress.getLocalHost.getHostAddress - val workerMetadata = WorkerMetadata(publicIpAddress, -1, None) + val ipAddress = InetAddress.getLocalHost.getHostAddress + val workerMetadata = WorkerMetadata(ipAddress, -1, None) client.register(workerMetadata) } finally { From ff302f96d6e46a31d5c6d7e01fb881dbcd2733ae Mon Sep 17 00:00:00 2001 From: hataehyeok <105369662+hataehyeok@users.noreply.github.com> Date: Thu, 23 Nov 2023 10:09:11 +0900 Subject: [PATCH 12/13] feat: implement worker server --- rpc/src/main/scala/WorkerServer.scala | 88 +++++++++++++++++++++++---- 1 file changed, 75 insertions(+), 13 deletions(-) diff --git a/rpc/src/main/scala/WorkerServer.scala b/rpc/src/main/scala/WorkerServer.scala index 367f92d..309bc75 100644 --- a/rpc/src/main/scala/WorkerServer.scala +++ b/rpc/src/main/scala/WorkerServer.scala @@ -1,12 +1,16 @@ package kr.ac.postech.paranode.rpc +import com.google.protobuf.ByteString import io.grpc.Server import io.grpc.ServerBuilder +import kr.ac.postech.paranode.core._ import java.util.logging.Logger import scala.concurrent.ExecutionContext +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.concurrent.Promise +import scala.reflect.io.Path import worker._ @@ -61,8 +65,20 @@ class WorkerServer(executionContext: ExecutionContext) { self => val promise = Promise[SampleReply] Future { - // TODO: Logic - promise.success(new SampleReply()) + try { + val sortedBlock = Block.fromPath(Path("data/block"), 10, 90).sort() + val sampledKeys = sortedBlock + .sample() + .map(key => ByteString.copyFrom(key.underlying)) + .toList + val reply = SampleReply(sampledKeys) + + promise.success(reply) + } catch { + case e: Exception => + println(e) + promise.failure(e) + } }(executionContext) promise.future @@ -74,30 +90,76 @@ class WorkerServer(executionContext: ExecutionContext) { self => val promise = Promise[PartitionReply] Future { - // TODO: Logic - promise.success(new PartitionReply()) + try { + val block = Block.fromPath(Path("data/block"), 10, 90) + request.workers + .map(workerMetadata => { + val keyRange = KeyRange( + Key.fromString(workerMetadata.keyRange.get.from.toStringUtf8), + Key.fromString(workerMetadata.keyRange.get.to.toStringUtf8) + ) + val partition = block.partition(keyRange) + val partitionPath = Path( + s"data/partition/${workerMetadata.node.get.host}:${workerMetadata.node.get.port}" + ) + partition._2.writeTo(partitionPath) + }) + + promise.success(new PartitionReply()) + } catch { + case e: Exception => + println(e) + promise.failure(e) + } }(executionContext) promise.future } override def exchange(request: ExchangeRequest): Future[ExchangeReply] = { - val promise = Promise[ExchangeReply] - - Future { - // TODO: Logic - promise.success(new ExchangeReply()) - }(executionContext) + val futures = request.workers.map(workerMetadata => + Future { + val host = workerMetadata.node.get.host + val port = workerMetadata.node.get.port + val partitionPath = Path(s"data/partition/${host}:${port}") + + try { + if (partitionPath.exists) { + val partition = Block.fromPath(partitionPath, 10, 90) + val exchangeClient = ExchangeClient.apply(host, port) + val reply = exchangeClient.saveRecords(partition.records) + Some(reply) + } else { + None + } + } finally { + if (partitionPath.exists) { + partitionPath.delete() + } + } + }(executionContext) + ) - promise.future + Future.sequence(futures).map(_ => new ExchangeReply()) } override def merge(request: MergeRequest): Future[MergeReply] = { val promise = Promise[MergeReply] Future { - // TODO: Logic - promise.success(new MergeReply()) + try { + val host = Path("data/host") + val port = Path("data/port") + val blockPath = Path(s"data/partition/${host}:${port}") + val mergedBlock = Block.fromPath(blockPath, 10, 90).sort() + mergedBlock.writeTo(blockPath) + + promise.success(new MergeReply()) + } catch { + case e: Exception => + println(e) + promise.failure(e) + } }(executionContext) promise.future From 37feaba20b686115fe5ad90de181c952644395aa Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Thu, 23 Nov 2023 20:19:49 +0900 Subject: [PATCH 13/13] docs: fifth progress report --- docs/report-progress-05.md | 43 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 docs/report-progress-05.md diff --git a/docs/report-progress-05.md b/docs/report-progress-05.md new file mode 100644 index 0000000..ad4c1cd --- /dev/null +++ b/docs/report-progress-05.md @@ -0,0 +1,43 @@ +# Progress Report + +> 11/17/2023 - 11/23/2023 + +## Discussion + +### File Naming Convention + +> For `/[INPUT DIR]/[BLOCK A]` + +- Sorted Block A + - `/[INPUT DIR]/[BLOCK A]` +- A Partition of A + - `/[INPUT DIR]/[BLOCK A].[FROM].[TO]` +- Partition from Other Worker + - `/[OUTPUT DIR]/[UUID]` +- Merged + - `/[OUTPUT DIR]/result` + +## Completed Tasks + +- Merge proto related branches and implement (#62) +- Implement Worker Server (#63) +- Implement Exchange Server (#64) +- Implement Master Server (#65) +- CLI and Initialize (#66) + +## Assigned Tasks + +- Refactor Servers and Clients (#75) + - @betarixm +- Extract Key Range from Worker (#76) + - @leejiwon1125 +- E2E Test (Small) (#77) + - @betarixm, @hataehyeok, @leejiwon1125 +- Exchange Server and Client (#78) + - @leejiwon1125 +- E2E Test (Real World) (#79) + - @betarixm, @hataehyeok, @leejiwon1125 +- Grpc Test (#80) + - @hataehyeok +- Logging (#81) + - @betarixm