From 41107f04445a379cbf9e5a70b20d1c918ee90d05 Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Mon, 27 Nov 2023 20:43:18 +0900 Subject: [PATCH 1/3] feat: parallelize logics --- core/src/main/scala/Block.scala | 8 +- rpc/src/main/scala/WorkerService.scala | 130 ++++++++++++++++--------- 2 files changed, 89 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/Block.scala b/core/src/main/scala/Block.scala index 8d8b892..6dd85e4 100644 --- a/core/src/main/scala/Block.scala +++ b/core/src/main/scala/Block.scala @@ -2,10 +2,8 @@ package kr.ac.postech.paranode.core import org.apache.logging.log4j.scala.Logging -import java.io.BufferedOutputStream -import java.io.File -import java.io.FileOutputStream import scala.io.Source +import scala.reflect.io.File import scala.reflect.io.Path object Block extends Logging { @@ -47,8 +45,8 @@ class Block(val records: LazyList[Record]) extends AnyVal { def toChars: LazyList[Char] = records.flatMap(_.toChars) def writeTo(path: Path): File = { - val file = new File(path.toString) - val writer = new BufferedOutputStream(new FileOutputStream(file)) + val file = File(path) + val writer = file.bufferedWriter() try { toChars.foreach(writer.write(_)) diff --git a/rpc/src/main/scala/WorkerService.scala b/rpc/src/main/scala/WorkerService.scala index 028fd94..6bccbb9 100644 --- a/rpc/src/main/scala/WorkerService.scala +++ b/rpc/src/main/scala/WorkerService.scala @@ -3,12 +3,13 @@ package kr.ac.postech.paranode.rpc import com.google.protobuf.ByteString import kr.ac.postech.paranode.core.Block import kr.ac.postech.paranode.core.WorkerMetadata +import kr.ac.postech.paranode.utils.GenericBuildFrom import org.apache.logging.log4j.scala.Logging import java.util.UUID import scala.concurrent.Await import scala.concurrent.ExecutionContext -import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.ExecutionContextExecutor import scala.concurrent.Future import scala.concurrent.Promise import scala.reflect.io.Directory @@ -63,21 +64,31 @@ class WorkerService( override def sort(request: SortRequest): Future[SortReply] = { val promise = Promise[SortReply] - Future { - logger.info(s"[WorkerServer] Sort ($request)") - - inputFiles - .foreach(path => { - val block = Block.fromPath(path) + implicit val executionContext: ExecutionContextExecutor = + scala.concurrent.ExecutionContext.fromExecutor( + java.util.concurrent.Executors + .newCachedThreadPool() + ) - val sortedBlock = block.sorted + def sorted(path: Path) = Future { + logger.info(s"[WorkerServer] Sorting block $path") + val result = Block.fromPath(path).sorted.writeTo(path) + logger.info(s"[WorkerServer] Wrote sorted block $path") + result + } - logger.info(s"[WorkerServer] Writing sorted block to $path") + Future { + logger.info(s"[WorkerServer] Sort ($request)") - sortedBlock.writeTo(path) + Await.result( + Future.traverse(inputFiles.toList)(sorted)( + GenericBuildFrom[File, File], + executionContext + ), + scala.concurrent.duration.Duration.Inf + ) - logger.info(s"[WorkerServer] Wrote sorted block to $path") - }) + logger.info("[WorkerServer] Sorted") promise.success(new SortReply()) }(executionContext) @@ -90,39 +101,54 @@ class WorkerService( ): Future[PartitionReply] = { val promise = Promise[PartitionReply] - Future { - logger.info(s"[WorkerServer] Partition ($request)") + implicit val executionContext: ExecutionContextExecutor = + scala.concurrent.ExecutionContext.fromExecutor( + java.util.concurrent.Executors + .newCachedThreadPool() + ) + + val workers: Seq[WorkerMetadata] = request.workers - val workers: Seq[WorkerMetadata] = request.workers + def partition(path: Path) = Future { + val block = Block.fromPath(path) - inputFiles - .map(path => { - val block = Block.fromPath(path) - workers - .map(_.keyRange.get) - .map(block.partition) - .map({ case (keyRange, partition) => - val partitionPath = Path( - s"$path.${keyRange.from.hex}-${keyRange.to.hex}" - ) + logger.info("[WorkerServer] Partitioning block") - logger.info( - s"[WorkerServer] Writing partition to $partitionPath" - ) + val partitions = workers + .map(_.keyRange.get) + .map(keyRange => { + block.partition(keyRange) + }) - partition.writeTo(partitionPath) + logger.info("[WorkerServer] Partitioned block") - logger.info( - s"[WorkerServer] Wrote partition to $partitionPath" - ) + logger.info("[WorkerServer] Writing partitions") - if (path.exists && path.isFile) { - val result = path.delete() - logger.info(s"[WorkerServer] Deleted $path: $result") - } - }) + val result = partitions.map({ case (keyRange, partition) => + partition.writeTo( + Path( + s"$path.${keyRange.from.hex}-${keyRange.to.hex}" + ) + ) + }) - }) + logger.info("[WorkerServer] Wrote partitions") + + result + } + + Future { + logger.info(s"[WorkerServer] Partition ($request)") + + Await.result( + Future.traverse(inputFiles.toList)(partition)( + GenericBuildFrom[File, Seq[File]], + executionContext + ), + scala.concurrent.duration.Duration.Inf + ) + + logger.info("[WorkerServer] Partitioned") promise.success(new PartitionReply()) }(executionContext) @@ -133,28 +159,44 @@ class WorkerService( override def exchange(request: ExchangeRequest): Future[ExchangeReply] = { val promise = Promise[ExchangeReply] + implicit val executionContext: ExecutionContextExecutor = + scala.concurrent.ExecutionContext.fromExecutor( + java.util.concurrent.Executors + .newCachedThreadPool() + ) + + def sendBlock(block: Block)(worker: WorkerMetadata) = Future { + Await.result( + WorkerClient(worker.host, worker.port).saveBlock(block), + scala.concurrent.duration.Duration.Inf + ) + } + + val workers: Seq[WorkerMetadata] = request.workers + Future { logger.info(s"[WorkerServer] Exchange ($request)") - val workers: Seq[WorkerMetadata] = request.workers - inputFiles.foreach(path => { val block = Block.fromPath(path) + val targetWorkers = workers .filter(_.keyRange.get.includes(block.records.head.key)) + .toList logger.info(s"[WorkerServer] Sending $block to $targetWorkers") Await.result( - Future.sequence( - targetWorkers - .map(worker => WorkerClient(worker.host, worker.port)) - .map(_.saveBlock(block)) + Future.traverse(targetWorkers)(sendBlock(block))( + GenericBuildFrom[WorkerMetadata, SaveBlockReply], + executionContext ), scala.concurrent.duration.Duration.Inf ) }) + logger.info("[WorkerServer] Sent blocks") + promise.success(new ExchangeReply()) }(executionContext) From 39db36aea60bc7fea94332f03d3f902a272cfd5e Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Mon, 27 Nov 2023 20:45:40 +0900 Subject: [PATCH 2/3] fix: test --- core/src/test/scala/BlockSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/BlockSpec.scala b/core/src/test/scala/BlockSpec.scala index d1d6fab..47f6672 100644 --- a/core/src/test/scala/BlockSpec.scala +++ b/core/src/test/scala/BlockSpec.scala @@ -107,7 +107,7 @@ class BlockSpec extends AnyFlatSpec { val result = block.writeTo(temporaryFile.getPath) - val source = Source.fromFile(result) + val source = Source.fromFile(result.jfile) temporaryFile.deleteOnExit() From 04ccb5f12e96b5ab27cc475b6b2d3f57bd5111dd Mon Sep 17 00:00:00 2001 From: Minjae Gwon Date: Mon, 27 Nov 2023 21:10:43 +0900 Subject: [PATCH 3/3] feat: save merged records in partitions --- core/src/main/scala/Block.scala | 20 ++++++++++++++++++++ rpc/src/main/scala/WorkerService.scala | 8 +++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/Block.scala b/core/src/main/scala/Block.scala index 6dd85e4..fceb39a 100644 --- a/core/src/main/scala/Block.scala +++ b/core/src/main/scala/Block.scala @@ -3,6 +3,7 @@ package kr.ac.postech.paranode.core import org.apache.logging.log4j.scala.Logging import scala.io.Source +import scala.reflect.io.Directory import scala.reflect.io.File import scala.reflect.io.Path @@ -55,6 +56,25 @@ class Block(val records: LazyList[Record]) extends AnyVal { } finally writer.close() } + def writeToDirectory( + directory: Directory, + size: Int = 320000 + ): List[File] = + records + .grouped(size) + .zipWithIndex + .map({ case (records, index) => + val file = File(directory / s"partition.$index") + val writer = file.bufferedWriter() + + try { + records.foreach(_.toChars.foreach(writer.write(_))) + file + } finally writer.close() + + }) + .toList + def filterByKeyRange(keyRange: KeyRange): Block = new Block( records.filter(keyRange.includes) ) diff --git a/rpc/src/main/scala/WorkerService.scala b/rpc/src/main/scala/WorkerService.scala index 6bccbb9..a0745f4 100644 --- a/rpc/src/main/scala/WorkerService.scala +++ b/rpc/src/main/scala/WorkerService.scala @@ -240,7 +240,13 @@ class WorkerService( val mergedBlock = blocks.merged - val results = mergedBlock.writeTo(outputDirectory / "result") + logger.info("[WorkerServer] Merged blocks") + + logger.info("[WorkerServer] Writing merged block") + + val results = mergedBlock.writeToDirectory(outputDirectory) + + logger.info("[WorkerServer] Wrote merged block") targetFiles.foreach(file => { val result = file.delete()