Skip to content

Commit

Permalink
Merge pull request #86 from betarixm/feat/parallel
Browse files Browse the repository at this point in the history
Parallel logics
  • Loading branch information
betarixm authored Nov 27, 2023
2 parents ae9ec83 + 04ccb5f commit 08d1335
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 51 deletions.
28 changes: 23 additions & 5 deletions core/src/main/scala/Block.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package kr.ac.postech.paranode.core

import org.apache.logging.log4j.scala.Logging

import java.io.BufferedOutputStream
import java.io.File
import java.io.FileOutputStream
import scala.io.Source
import scala.reflect.io.Directory
import scala.reflect.io.File
import scala.reflect.io.Path

object Block extends Logging {
Expand Down Expand Up @@ -47,8 +46,8 @@ class Block(val records: LazyList[Record]) extends AnyVal {
/** Characters of every record in this block, concatenated in record order. */
def toChars: LazyList[Char] =
  records.flatMap(record => record.toChars)

def writeTo(path: Path): File = {
val file = new File(path.toString)
val writer = new BufferedOutputStream(new FileOutputStream(file))
val file = File(path)
val writer = file.bufferedWriter()

try {
toChars.foreach(writer.write(_))
Expand All @@ -57,6 +56,25 @@ class Block(val records: LazyList[Record]) extends AnyVal {
} finally writer.close()
}

/** Writes the records of this block into `directory` as numbered chunk files.
  *
  * Records are split, in order, into groups of at most `size` records; group
  * number `i` (starting at 0) is written to `directory / "partition.<i>"`.
  * The directory (including missing parents) is created first, so a
  * not-yet-existing output location does not fail on the first write.
  *
  * NOTE(review): characters are written through the file's buffered writer
  * and therefore through its codec; confirm that `Record.toChars` only yields
  * characters that round-trip unchanged through that codec.
  *
  * @param directory destination directory for the chunk files
  * @param size maximum number of records per chunk file; must be positive
  * @return the written files, in chunk order
  */
def writeToDirectory(
    directory: Directory,
    size: Int = 320000
): List[File] = {
  // Returns the existing directory unchanged when it is already present.
  val target = directory.createDirectory()

  records
    .grouped(size)
    .zipWithIndex
    .map({ case (chunk, index) =>
      val file = File(target / s"partition.$index")

      // Using.resource closes the writer on every path and, unlike a bare
      // try/finally, keeps the original failure if close() throws as well.
      scala.util.Using.resource(file.bufferedWriter()) { writer =>
        chunk.foreach(_.toChars.foreach(writer.write(_)))
      }

      file
    })
    .toList
}

/** Builds a new block holding only the records that `keyRange.includes` accepts. */
def filterByKeyRange(keyRange: KeyRange): Block = {
  val matching = records.filter(record => keyRange.includes(record))
  new Block(matching)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/BlockSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class BlockSpec extends AnyFlatSpec {

val result = block.writeTo(temporaryFile.getPath)

val source = Source.fromFile(result)
val source = Source.fromFile(result.jfile)

temporaryFile.deleteOnExit()

Expand Down
138 changes: 93 additions & 45 deletions rpc/src/main/scala/WorkerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package kr.ac.postech.paranode.rpc
import com.google.protobuf.ByteString
import kr.ac.postech.paranode.core.Block
import kr.ac.postech.paranode.core.WorkerMetadata
import kr.ac.postech.paranode.utils.GenericBuildFrom
import org.apache.logging.log4j.scala.Logging

import java.util.UUID
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.reflect.io.Directory
Expand Down Expand Up @@ -63,21 +64,31 @@ class WorkerService(
override def sort(request: SortRequest): Future[SortReply] = {
val promise = Promise[SortReply]

Future {
logger.info(s"[WorkerServer] Sort ($request)")

inputFiles
.foreach(path => {
val block = Block.fromPath(path)
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
)

val sortedBlock = block.sorted
def sorted(path: Path) = Future {
logger.info(s"[WorkerServer] Sorting block $path")
val result = Block.fromPath(path).sorted.writeTo(path)
logger.info(s"[WorkerServer] Wrote sorted block $path")
result
}

logger.info(s"[WorkerServer] Writing sorted block to $path")
Future {
logger.info(s"[WorkerServer] Sort ($request)")

sortedBlock.writeTo(path)
Await.result(
Future.traverse(inputFiles.toList)(sorted)(
GenericBuildFrom[File, File],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info(s"[WorkerServer] Wrote sorted block to $path")
})
logger.info("[WorkerServer] Sorted")

promise.success(new SortReply())
}(executionContext)
Expand All @@ -90,39 +101,54 @@ class WorkerService(
): Future[PartitionReply] = {
val promise = Promise[PartitionReply]

Future {
logger.info(s"[WorkerServer] Partition ($request)")
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
)

val workers: Seq[WorkerMetadata] = request.workers
val workers: Seq[WorkerMetadata] = request.workers

inputFiles
.map(path => {
val block = Block.fromPath(path)
workers
.map(_.keyRange.get)
.map(block.partition)
.map({ case (keyRange, partition) =>
val partitionPath = Path(
s"$path.${keyRange.from.hex}-${keyRange.to.hex}"
)
def partition(path: Path) = Future {
val block = Block.fromPath(path)

logger.info(
s"[WorkerServer] Writing partition to $partitionPath"
)
logger.info("[WorkerServer] Partitioning block")

partition.writeTo(partitionPath)
val partitions = workers
.map(_.keyRange.get)
.map(keyRange => {
block.partition(keyRange)
})

logger.info(
s"[WorkerServer] Wrote partition to $partitionPath"
)
logger.info("[WorkerServer] Partitioned block")

if (path.exists && path.isFile) {
val result = path.delete()
logger.info(s"[WorkerServer] Deleted $path: $result")
}
})
logger.info("[WorkerServer] Writing partitions")

})
val result = partitions.map({ case (keyRange, partition) =>
partition.writeTo(
Path(
s"$path.${keyRange.from.hex}-${keyRange.to.hex}"
)
)
})

logger.info("[WorkerServer] Wrote partitions")

result
}

Future {
logger.info(s"[WorkerServer] Partition ($request)")

Await.result(
Future.traverse(inputFiles.toList)(partition)(
GenericBuildFrom[File, Seq[File]],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info("[WorkerServer] Partitioned")

promise.success(new PartitionReply())
}(executionContext)
Expand All @@ -133,28 +159,44 @@ class WorkerService(
override def exchange(request: ExchangeRequest): Future[ExchangeReply] = {
val promise = Promise[ExchangeReply]

implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
)

def sendBlock(block: Block)(worker: WorkerMetadata) = Future {
Await.result(
WorkerClient(worker.host, worker.port).saveBlock(block),
scala.concurrent.duration.Duration.Inf
)
}

val workers: Seq[WorkerMetadata] = request.workers

Future {
logger.info(s"[WorkerServer] Exchange ($request)")

val workers: Seq[WorkerMetadata] = request.workers

inputFiles.foreach(path => {
val block = Block.fromPath(path)

val targetWorkers = workers
.filter(_.keyRange.get.includes(block.records.head.key))
.toList

logger.info(s"[WorkerServer] Sending $block to $targetWorkers")

Await.result(
Future.sequence(
targetWorkers
.map(worker => WorkerClient(worker.host, worker.port))
.map(_.saveBlock(block))
Future.traverse(targetWorkers)(sendBlock(block))(
GenericBuildFrom[WorkerMetadata, SaveBlockReply],
executionContext
),
scala.concurrent.duration.Duration.Inf
)
})

logger.info("[WorkerServer] Sent blocks")

promise.success(new ExchangeReply())
}(executionContext)

Expand Down Expand Up @@ -198,7 +240,13 @@ class WorkerService(

val mergedBlock = blocks.merged

val results = mergedBlock.writeTo(outputDirectory / "result")
logger.info("[WorkerServer] Merged blocks")

logger.info("[WorkerServer] Writing merged block")

val results = mergedBlock.writeToDirectory(outputDirectory)

logger.info("[WorkerServer] Wrote merged block")

targetFiles.foreach(file => {
val result = file.delete()
Expand Down

0 comments on commit 08d1335

Please sign in to comment.