Skip to content

Commit

Permalink
Merge pull request #93 from betarixm/fix/setup-partition-max-size
Browse files Browse the repository at this point in the history
Set max size of sent partition
  • Loading branch information
hataehyeok authored Nov 29, 2023
2 parents 34d543c + 82c30fa commit 41006c4
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 15 deletions.
4 changes: 4 additions & 0 deletions core/src/main/scala/Block.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,8 @@ class Block(val records: LazyList[Record]) extends AnyVal {
/** Samples keys from this block's records.
  *
  * Forwards `records` and `number` unchanged to `Record.sample` and returns its result.
  *
  * @param number value handed to `Record.sample`; defaults to 64
  *               (presumably the number of keys to sample — confirm against `Record.sample`)
  * @return the `LazyList[Key]` produced by `Record.sample`
  */
def sample(number: Int = 64): LazyList[Key] =
Record.sample(records, number)

/** Splits this block into consecutive smaller blocks.
  *
  * The records are grouped in their existing order with `grouped(chunkSize)`, and each
  * group is wrapped in its own `Block`. The final block may contain fewer than
  * `chunkSize` records when the record count is not a multiple of `chunkSize`.
  *
  * @param chunkSize maximum number of records per resulting block
  * @return the chunked blocks as a `LazyList`, in the original record order
  */
def splitIntoChunks(chunkSize: Int): LazyList[Block] =
  LazyList.from(records.grouped(chunkSize).map(group => new Block(group)))

}
2 changes: 1 addition & 1 deletion master/src/main/scala/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object Master extends Logging {
logger.info("[Master] Sample Requested")

val sampledKeys = clients
.sample(64)(requestExecutionContext)
.sample(1024)(requestExecutionContext)
.flatMap(_.sampledKeys)
.map(Key.fromByteString)

Expand Down
32 changes: 18 additions & 14 deletions worker/src/main/scala/WorkerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,21 +200,25 @@ class WorkerService(
inputFiles.foreach(path => {
val block = Block.fromPath(path)

val targetClients = workersWithClients
.filter(
_._1.keyRange.get.includes(block.records.head.key)
val blockChunks = block.splitIntoChunks(1000)

blockChunks.foreach(chunk => {
val targetClients = workersWithClients
.filter(
_._1.keyRange.get.includes(chunk.records.head.key)
)
.map(_._2)

logger.info(s"[WorkerServer] Sending $chunk to $targetClients")

Await.result(
Future.traverse(targetClients)(_.saveBlock(chunk))(
GenericBuildFrom[WorkerClient, SaveBlockReply],
executionContext
),
scala.concurrent.duration.Duration.Inf
)
.map(_._2)

logger.info(s"[WorkerServer] Sending $block to $targetClients")

Await.result(
Future.traverse(targetClients)(_.saveBlock(block))(
GenericBuildFrom[WorkerClient, SaveBlockReply],
executionContext
),
scala.concurrent.duration.Duration.Inf
)
})
})

clients.foreach(_.shutdown())
Expand Down

0 comments on commit 41006c4

Please sign in to comment.