diff --git a/rpc/src/main/protobuf/exchange.proto b/rpc/src/main/protobuf/exchange.proto new file mode 100644 index 0000000..b3924d0 --- /dev/null +++ b/rpc/src/main/protobuf/exchange.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package kr.ac.postech.paranode.rpc; + +import "common.proto"; + +service Exchange { + rpc Save (SaveRequest) returns (SaveReply) {} +} + + +message SaveRequest { + bytes block = 1; +} + +message SaveReply {} diff --git a/rpc/src/main/scala/ExchangeClient.scala b/rpc/src/main/scala/ExchangeClient.scala new file mode 100644 index 0000000..0175166 --- /dev/null +++ b/rpc/src/main/scala/ExchangeClient.scala @@ -0,0 +1,37 @@ +package kr.ac.postech.paranode.rpc + +import io.grpc.ManagedChannel +import io.grpc.ManagedChannelBuilder +import kr.ac.postech.paranode.core.Block +import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc.ExchangeStub +import kr.ac.postech.paranode.rpc.exchange.SaveReply +import kr.ac.postech.paranode.rpc.exchange.SaveRequest + +import java.util.concurrent.TimeUnit +import scala.concurrent.Future + +import Implicit._ + +object ExchangeClient { + def apply(host: String, port: Int): ExchangeClient = { + val channel = + ManagedChannelBuilder.forAddress(host, port).usePlaintext().build + val stub = exchange.ExchangeGrpc.stub(channel) + new ExchangeClient(channel, stub) + } +} + +class ExchangeClient private ( + private val channel: ManagedChannel, + private val stub: ExchangeStub +) { + def shutdown(): Unit = { + channel.shutdown.awaitTermination(5, TimeUnit.SECONDS) + } + + def save(block: Block): Future[SaveReply] = { + val request = SaveRequest(block) + + stub.save(request) + } +} diff --git a/rpc/src/main/scala/ExchangeService.scala b/rpc/src/main/scala/ExchangeService.scala new file mode 100644 index 0000000..ab36640 --- /dev/null +++ b/rpc/src/main/scala/ExchangeService.scala @@ -0,0 +1,41 @@ +package kr.ac.postech.paranode.rpc + +import kr.ac.postech.paranode.core.Block +import org.apache.logging.log4j.scala.Logging + +import java.util.UUID +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.reflect.io.Directory + +import exchange.{ExchangeGrpc, SaveReply, SaveRequest} +import Implicit._ + +class ExchangeService( + executionContext: ExecutionContext, + outputDirectory: Directory +) extends ExchangeGrpc.Exchange + with Logging { + override def save(request: SaveRequest): Future[SaveReply] = { + val promise = Promise[SaveReply] + + Future { + logger.info(s"[ExchangeServer] Save ($request)") + + val block: Block = request.block + + val path = outputDirectory / UUID.randomUUID().toString + + logger.info(s"[ExchangeServer] Writing block to $path") + + block.writeTo(path) + + logger.info(s"[ExchangeServer] Block written to $path") + + promise.success(new SaveReply()) + }(executionContext) + + promise.future + } +} diff --git a/rpc/src/main/scala/WorkerServer.scala b/rpc/src/main/scala/WorkerServer.scala index bc813b7..614ac7e 100644 --- a/rpc/src/main/scala/WorkerServer.scala +++ b/rpc/src/main/scala/WorkerServer.scala @@ -3,30 +3,47 @@ import io.grpc.Server import io.grpc.ServerBuilder import org.apache.logging.log4j.scala.Logging +import java.util.concurrent.Executors import scala.concurrent.ExecutionContext import scala.reflect.io.Directory -import worker._ -import kr.ac.postech.paranode.core.WorkerMetadata +import worker._ +import exchange.ExchangeGrpc class WorkerServer( - executionContext: ExecutionContext, - me: WorkerMetadata, port: Int, inputDirectories: Array[Directory], outputDirectory: Directory ) extends Logging { self => + private[this] val workerExecutionContext: ExecutionContext = + ExecutionContext.fromExecutor( + Executors.newCachedThreadPool() + ) + + private[this] val exchangeExecutionContext: ExecutionContext = + ExecutionContext.fromExecutor( + Executors.newCachedThreadPool() + ) + private[this] val server: Server = ServerBuilder .forPort(port) .addService( WorkerGrpc.bindService( new WorkerService( - executionContext, - me, + workerExecutionContext, inputDirectories, outputDirectory ), - executionContext + workerExecutionContext + ) + ) + .addService( + ExchangeGrpc.bindService( + new ExchangeService( + exchangeExecutionContext, + outputDirectory + ), + exchangeExecutionContext ) ) .build() diff --git a/rpc/src/main/scala/WorkerService.scala b/rpc/src/main/scala/WorkerService.scala index b5e1098..e32a2df 100644 --- a/rpc/src/main/scala/WorkerService.scala +++ b/rpc/src/main/scala/WorkerService.scala @@ -32,10 +32,10 @@ import worker.{ WorkerGrpc } import Implicit._ +import exchange.SaveReply class WorkerService( executionContext: ExecutionContext, - me: WorkerMetadata, inputDirectories: Array[Directory], outputDirectory: Directory ) extends WorkerGrpc.Worker @@ -168,7 +168,7 @@ class WorkerService( def sendBlock(block: Block)(worker: WorkerMetadata) = Future { Await.result( - WorkerClient(worker.host, worker.port).saveBlock(block), + ExchangeClient(worker.host, worker.port).save(block), scala.concurrent.duration.Duration.Inf ) } @@ -183,14 +183,13 @@ class WorkerService( val targetWorkers = workers .filter(_.keyRange.get.includes(block.records.head.key)) - .filter(_.host != me.host) .toList logger.info(s"[WorkerServer] Sending $block to $targetWorkers") Await.result( Future.traverse(targetWorkers)(sendBlock(block))( - GenericBuildFrom[WorkerMetadata, SaveBlockReply], + GenericBuildFrom[WorkerMetadata, SaveReply], executionContext ), scala.concurrent.duration.Duration.Inf diff --git a/worker/src/main/scala/Worker.scala b/worker/src/main/scala/Worker.scala index bce7135..f16c708 100644 --- a/worker/src/main/scala/Worker.scala +++ b/worker/src/main/scala/Worker.scala @@ -7,9 +7,7 @@ import org.apache.logging.log4j.scala.Logging import java.net.InetAddress import java.net.ServerSocket -import java.util.concurrent.Executors import scala.concurrent.Await -import scala.concurrent.ExecutionContext import scala.util.Using object Worker extends Logging { @@ -30,14 +28,7 @@ object Worker extends Logging { s"outputDirectory: ${workerArguments.outputDirectory}\n" ) - val executionContext = ExecutionContext.fromExecutor( - Executors - .newCachedThreadPool() - ) - val server = new WorkerServer( - executionContext, - workerMetadata, workerPort, workerArguments.inputDirectories, workerArguments.outputDirectory