Skip to content

Commit

Permalink
fix to newFixedThreadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
leejiwon1125 committed Nov 30, 2023
1 parent 02a7686 commit e2cfd58
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions worker/src/main/scala/WorkerService.scala
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

def sorted(path: Path) = Future {
@@ -84,15 +84,19 @@ class WorkerService(
Future {
try {
logger.info(s"[WorkerServer] Sort ($request)")

logger.info(
s"[WorkerServer] thread number in sort1 : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(inputFiles.toList)(sorted)(
GenericBuildFrom[File, File],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info(
s"[WorkerServer] thread number in sort2 : ${java.lang.Thread.activeCount()}"
)
logger.info("[WorkerServer] Sorted")

promise.success(new SortReply())
@@ -114,7 +118,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

val workers: Seq[WorkerMetadata] = request.workers
@@ -154,15 +158,19 @@ class WorkerService(
Future {
try {
logger.info(s"[WorkerServer] Partition ($request)")

logger.info(
s"[WorkerServer] thread number in partition1 : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(inputFiles.toList)(partition)(
GenericBuildFrom[File, Seq[File]],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info(
s"[WorkerServer] thread number in partition2 : ${java.lang.Thread.activeCount()}"
)
logger.info("[WorkerServer] Partitioned")

promise.success(new PartitionReply())
@@ -182,7 +190,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

val workers: Seq[WorkerMetadata] = request.workers
@@ -210,14 +218,19 @@ class WorkerService(
.map(_._2)

logger.info(s"[WorkerServer] Sending $chunk to $targetClients")

logger.info(
s"[WorkerServer] thread number in exchange before : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(targetClients)(_.saveBlock(chunk))(
GenericBuildFrom[WorkerClient, SaveBlockReply],
executionContext
),
scala.concurrent.duration.Duration.Inf
)
logger.info(
s"[WorkerServer] thread number in exchange end : ${java.lang.Thread.activeCount()}"
)
})
})

0 comments on commit e2cfd58

Please sign in to comment.