Skip to content

Commit

Permalink
test: separate exchange service
Browse files Browse the repository at this point in the history
  • Loading branch information
betarixm committed Nov 27, 2023
1 parent d2b03e0 commit b55d83c
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 20 deletions.
16 changes: 16 additions & 0 deletions rpc/src/main/protobuf/exchange.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package kr.ac.postech.paranode.rpc;

import "common.proto";

service Exchange {
rpc Save (SaveRequest) returns (SaveReply) {}
}


message SaveRequest {
bytes block = 1;
}

message SaveReply {}
37 changes: 37 additions & 0 deletions rpc/src/main/scala/ExchangeClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kr.ac.postech.paranode.rpc

import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import kr.ac.postech.paranode.core.Block
import kr.ac.postech.paranode.rpc.exchange.ExchangeGrpc.ExchangeStub
import kr.ac.postech.paranode.rpc.exchange.SaveReply
import kr.ac.postech.paranode.rpc.exchange.SaveRequest

import java.util.concurrent.TimeUnit
import scala.concurrent.Future

import Implicit._

object ExchangeClient {
def apply(host: String, port: Int): ExchangeClient = {
val channel =
ManagedChannelBuilder.forAddress(host, port).usePlaintext().build
val stub = exchange.ExchangeGrpc.stub(channel)
new ExchangeClient(channel, stub)
}
}

class ExchangeClient private (
private val channel: ManagedChannel,
private val stub: ExchangeStub
) {
def shutdown(): Unit = {
channel.shutdown.awaitTermination(5, TimeUnit.SECONDS)
}

def save(block: Block): Future[SaveReply] = {
val request = SaveRequest(block)

stub.save(request)
}
}
41 changes: 41 additions & 0 deletions rpc/src/main/scala/ExchangeService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kr.ac.postech.paranode.rpc

import kr.ac.postech.paranode.core.Block
import org.apache.logging.log4j.scala.Logging

import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.reflect.io.Directory

import exchange.{ExchangeGrpc, SaveReply, SaveRequest}
import Implicit._

class ExchangeService(
executionContext: ExecutionContext,
outputDirectory: Directory
) extends ExchangeGrpc.Exchange
with Logging {
override def save(request: SaveRequest): Future[SaveReply] = {
val promise = Promise[SaveReply]

Future {
logger.info(s"[ExchangeServer] Save ($request)")

val block: Block = request.block

val path = outputDirectory / UUID.randomUUID().toString

logger.info(s"[ExchangeServer] Writing block to $path")

block.writeTo(path)

logger.info(s"[ExchangeServer] Block written to $path")

promise.success(new SaveReply())
}(executionContext)

promise.future
}
}
31 changes: 24 additions & 7 deletions rpc/src/main/scala/WorkerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,47 @@ import io.grpc.Server
import io.grpc.ServerBuilder
import org.apache.logging.log4j.scala.Logging

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.reflect.io.Directory
import worker._

import kr.ac.postech.paranode.core.WorkerMetadata
import worker._
import exchange.ExchangeGrpc

class WorkerServer(
executionContext: ExecutionContext,
me: WorkerMetadata,
port: Int,
inputDirectories: Array[Directory],
outputDirectory: Directory
) extends Logging { self =>
private[this] val workerExecutionContext: ExecutionContext =
ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)

private[this] val exchangeExecutionContext: ExecutionContext =
ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)

private[this] val server: Server = ServerBuilder
.forPort(port)
.addService(
WorkerGrpc.bindService(
new WorkerService(
executionContext,
me,
workerExecutionContext,
inputDirectories,
outputDirectory
),
executionContext
workerExecutionContext
)
)
.addService(
ExchangeGrpc.bindService(
new ExchangeService(
exchangeExecutionContext,
outputDirectory
),
exchangeExecutionContext
)
)
.build()
Expand Down
7 changes: 3 additions & 4 deletions rpc/src/main/scala/WorkerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import worker.{
WorkerGrpc
}
import Implicit._
import exchange.SaveReply

class WorkerService(
executionContext: ExecutionContext,
me: WorkerMetadata,
inputDirectories: Array[Directory],
outputDirectory: Directory
) extends WorkerGrpc.Worker
Expand Down Expand Up @@ -168,7 +168,7 @@ class WorkerService(

def sendBlock(block: Block)(worker: WorkerMetadata) = Future {
Await.result(
WorkerClient(worker.host, worker.port).saveBlock(block),
ExchangeClient(worker.host, worker.port).save(block),
scala.concurrent.duration.Duration.Inf
)
}
Expand All @@ -183,14 +183,13 @@ class WorkerService(

val targetWorkers = workers
.filter(_.keyRange.get.includes(block.records.head.key))
.filter(_.host != me.host)
.toList

logger.info(s"[WorkerServer] Sending $block to $targetWorkers")

Await.result(
Future.traverse(targetWorkers)(sendBlock(block))(
GenericBuildFrom[WorkerMetadata, SaveBlockReply],
GenericBuildFrom[WorkerMetadata, SaveReply],
executionContext
),
scala.concurrent.duration.Duration.Inf
Expand Down
9 changes: 0 additions & 9 deletions worker/src/main/scala/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import org.apache.logging.log4j.scala.Logging

import java.net.InetAddress
import java.net.ServerSocket
import java.util.concurrent.Executors
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.util.Using

object Worker extends Logging {
Expand All @@ -30,14 +28,7 @@ object Worker extends Logging {
s"outputDirectory: ${workerArguments.outputDirectory}\n"
)

val executionContext = ExecutionContext.fromExecutor(
Executors
.newCachedThreadPool()
)

val server = new WorkerServer(
executionContext,
workerMetadata,
workerPort,
workerArguments.inputDirectories,
workerArguments.outputDirectory
Expand Down

0 comments on commit b55d83c

Please sign in to comment.