Skip to content

Commit

Permalink
feat: exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
betarixm committed Nov 26, 2023
1 parent 7b4ad23 commit 5791381
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 13 deletions.
11 changes: 11 additions & 0 deletions master/src/main/scala/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ object Master extends Logging {

logger.debug("[Master] Partition finished")

logger.debug("[Master] Exchange started")

Await.result(
Future.sequence(
clients.map(_.exchange(keyRangesWithWorker))
),
scala.concurrent.duration.Duration.Inf
)

logger.debug("[Master] Exchange finished")

server.blockUntilShutdown()
}

Expand Down
5 changes: 4 additions & 1 deletion rpc/src/main/scala/WorkerClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package kr.ac.postech.paranode.rpc
import com.google.protobuf.ByteString
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import kr.ac.postech.paranode.core.{Block, KeyRange, WorkerMetadata}
import kr.ac.postech.paranode.core.Block
import kr.ac.postech.paranode.core.KeyRange
import kr.ac.postech.paranode.core.WorkerMetadata

import java.util.concurrent.TimeUnit
import java.util.logging.Logger
import scala.concurrent.Future

import worker._
import worker.WorkerGrpc.WorkerStub
import common.{
Expand Down
50 changes: 38 additions & 12 deletions rpc/src/main/scala/WorkerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import io.grpc.ServerBuilder
import kr.ac.postech.paranode.core._
import org.apache.logging.log4j.scala.Logging

import java.util.UUID
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.reflect.io.Directory
import scala.reflect.io.Path

import common.{WorkerMetadata => RpcWorkerMetadata}
import worker._
import java.util.UUID

class WorkerServer(
executionContext: ExecutionContext,
Expand Down Expand Up @@ -148,14 +150,6 @@ class WorkerServer(
logger.debug(
s"[WorkerServer] Wrote partition to $partitionPath"
)

if (path.exists && path.isFile) {
val result = path.delete()

logger.debug(
s"[WorkerServer] Deleted $path: $result"
)
}
})

})
Expand All @@ -167,10 +161,42 @@ class WorkerServer(
}

override def exchange(request: ExchangeRequest): Future[ExchangeReply] = {
val futures =
request.workers.map(_ => Future {}(executionContext))
val promise = Promise[ExchangeReply]

Future {
logger.debug(s"[WorkerServer] Exchange ($request)")

val workers = toWorkerMetadata(request.workers)

inputFiles.foreach(path => {
val block = Block.fromPath(path)
val targetWorkers = workers
.filter(_.keyRange.get.includes(block.records.head.key))

Future.sequence(futures).map(_ => new ExchangeReply())
logger.debug(s"[WorkerServer] Sending $block to $targetWorkers")

Await.result(
Future.sequence(
targetWorkers
.map(worker => WorkerClient(worker.host, worker.port))
.map(_.saveBlock(block))
),
scala.concurrent.duration.Duration.Inf
)

if (path.exists && path.isFile) {
val result = path.delete()

logger.debug(
s"[WorkerServer] Deleted $path: $result"
)
}
})

promise.success(new ExchangeReply())
}

promise.future
}

override def saveBlock(
Expand Down

0 comments on commit 5791381

Please sign in to comment.