From 5791381795aa6c3490ab312ad760ffe9e09c8712 Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Sun, 26 Nov 2023 21:07:00 +0900 Subject: [PATCH] feat: exchange --- master/src/main/scala/Master.scala | 11 ++++++ rpc/src/main/scala/WorkerClient.scala | 5 ++- rpc/src/main/scala/WorkerServer.scala | 50 ++++++++++++++++++++------- 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/master/src/main/scala/Master.scala b/master/src/main/scala/Master.scala index c3d55d9..9c7da97 100644 --- a/master/src/main/scala/Master.scala +++ b/master/src/main/scala/Master.scala @@ -92,6 +92,17 @@ object Master extends Logging { logger.debug("[Master] Partition finished") + logger.debug("[Master] Exchange started") + + Await.result( + Future.sequence( + clients.map(_.exchange(keyRangesWithWorker)) + ), + scala.concurrent.duration.Duration.Inf + ) + + logger.debug("[Master] Exchange finished") + server.blockUntilShutdown() } diff --git a/rpc/src/main/scala/WorkerClient.scala b/rpc/src/main/scala/WorkerClient.scala index 368bad5..4b71752 100644 --- a/rpc/src/main/scala/WorkerClient.scala +++ b/rpc/src/main/scala/WorkerClient.scala @@ -3,11 +3,14 @@ package kr.ac.postech.paranode.rpc import com.google.protobuf.ByteString import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder -import kr.ac.postech.paranode.core.{Block, KeyRange, WorkerMetadata} +import kr.ac.postech.paranode.core.Block +import kr.ac.postech.paranode.core.KeyRange +import kr.ac.postech.paranode.core.WorkerMetadata import java.util.concurrent.TimeUnit import java.util.logging.Logger import scala.concurrent.Future + import worker._ import worker.WorkerGrpc.WorkerStub import common.{ diff --git a/rpc/src/main/scala/WorkerServer.scala b/rpc/src/main/scala/WorkerServer.scala index fe0125e..f35b45c 100644 --- a/rpc/src/main/scala/WorkerServer.scala +++ b/rpc/src/main/scala/WorkerServer.scala @@ -6,15 +6,17 @@ import io.grpc.ServerBuilder import kr.ac.postech.paranode.core._ import org.apache.logging.log4j.scala.Logging +import java.util.UUID +import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.concurrent.Promise import scala.reflect.io.Directory import scala.reflect.io.Path + import common.{WorkerMetadata => RpcWorkerMetadata} import worker._ -import java.util.UUID class WorkerServer( executionContext: ExecutionContext, @@ -148,14 +150,6 @@ class WorkerServer( logger.debug( s"[WorkerServer] Wrote partition to $partitionPath" ) - - if (path.exists && path.isFile) { - val result = path.delete() - - logger.debug( - s"[WorkerServer] Deleted $path: $result" - ) - } }) }) @@ -167,10 +161,42 @@ class WorkerServer( } override def exchange(request: ExchangeRequest): Future[ExchangeReply] = { - val futures = - request.workers.map(_ => Future {}(executionContext)) + val promise = Promise[ExchangeReply] + + Future { + logger.debug(s"[WorkerServer] Exchange ($request)") + + val workers = toWorkerMetadata(request.workers) + + inputFiles.foreach(path => { + val block = Block.fromPath(path) + val targetWorkers = workers + .filter(_.keyRange.get.includes(block.records.head.key)) - Future.sequence(futures).map(_ => new ExchangeReply()) + logger.debug(s"[WorkerServer] Sending $block to $targetWorkers") + + Await.result( + Future.sequence( + targetWorkers + .map(worker => WorkerClient(worker.host, worker.port)) + .map(_.saveBlock(block)) + ), + scala.concurrent.duration.Duration.Inf + ) + + if (path.exists && path.isFile) { + val result = path.delete() + + logger.debug( + s"[WorkerServer] Deleted $path: $result" + ) + } + }) + + promise.success(new ExchangeReply()) + } + + promise.future } override def saveBlock(