Skip to content

Commit

Permalink
refactor: merge exchange service
Browse files Browse the repository at this point in the history
  • Loading branch information
betarixm committed Nov 26, 2023
1 parent dfcff33 commit 988008c
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 134 deletions.
13 changes: 0 additions & 13 deletions rpc/src/main/protobuf/exchange.proto

This file was deleted.

7 changes: 7 additions & 0 deletions rpc/src/main/protobuf/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ service Worker {
rpc Sort (SortRequest) returns (SortReply) {}
rpc Partition (PartitionRequest) returns (PartitionReply) {}
rpc Exchange (ExchangeRequest) returns (ExchangeReply) {}
rpc SaveBlock (SaveBlockRequest) returns (SaveBlockReply) {}
rpc Merge (MergeRequest) returns (MergeReply) {}
}

Expand Down Expand Up @@ -36,6 +37,12 @@ message ExchangeRequest {

message ExchangeReply {}

message SaveBlockRequest {
bytes block = 1;
}

message SaveBlockReply {}

message MergeRequest {}

message MergeReply {}
43 changes: 0 additions & 43 deletions rpc/src/main/scala/ExchangeClient.scala

This file was deleted.

74 changes: 0 additions & 74 deletions rpc/src/main/scala/ExchangeServer.scala

This file was deleted.

14 changes: 11 additions & 3 deletions rpc/src/main/scala/WorkerClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package kr.ac.postech.paranode.rpc
import com.google.protobuf.ByteString
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import kr.ac.postech.paranode.core.KeyRange
import kr.ac.postech.paranode.core.WorkerMetadata
import kr.ac.postech.paranode.core.{Block, KeyRange, WorkerMetadata}

import java.util.concurrent.TimeUnit
import java.util.logging.Logger
import scala.concurrent.Future

import worker._
import worker.WorkerGrpc.WorkerStub
import common.{
Expand Down Expand Up @@ -89,6 +87,16 @@ class WorkerClient private (
stub.exchange(request)
}

def saveBlock(
block: Block
): Future[SaveBlockReply] = {
val request = SaveBlockRequest(
ByteString.copyFrom(block.toChars.map(_.toByte).toArray)
)

stub.saveBlock(request)
}

def merge(): Future[MergeReply] = {
val request = MergeRequest()
stub.merge(request)
Expand Down
28 changes: 27 additions & 1 deletion rpc/src/main/scala/WorkerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import scala.concurrent.Future
import scala.concurrent.Promise
import scala.reflect.io.Directory
import scala.reflect.io.Path

import common.{WorkerMetadata => RpcWorkerMetadata}
import worker._
import java.util.UUID

class WorkerServer(
executionContext: ExecutionContext,
Expand Down Expand Up @@ -165,6 +165,32 @@ class WorkerServer(
Future.sequence(futures).map(_ => new ExchangeReply())
}

override def saveBlock(
request: SaveBlockRequest
): Future[SaveBlockReply] = {
val promise = Promise[SaveBlockReply]

Future {
logger.debug(s"[WorkerServer] SaveBlock ($request)")

val block = Block.fromBytes(
LazyList.from(request.block.toByteArray)
)

val path = outputDirectory / UUID.randomUUID().toString

logger.debug(s"[WorkerServer] Writing block to $path")

block.writeTo(path)

logger.debug(s"[WorkerServer] Wrote block to $path")

promise.success(new SaveBlockReply())
}

promise.future
}

override def merge(request: MergeRequest): Future[MergeReply] = {
val promise = Promise[MergeReply]

Expand Down

0 comments on commit 988008c

Please sign in to comment.