From e2cfd58ef809b6b208aaff9f727c84f87076ca34 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Thu, 30 Nov 2023 20:20:27 +0900 Subject: [PATCH] fix to newFixedThreadPool --- worker/src/main/scala/WorkerService.scala | 29 ++++++++++++++++------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/worker/src/main/scala/WorkerService.scala b/worker/src/main/scala/WorkerService.scala index a4cc259..db7402f 100644 --- a/worker/src/main/scala/WorkerService.scala +++ b/worker/src/main/scala/WorkerService.scala @@ -71,7 +71,7 @@ class WorkerService( implicit val executionContext: ExecutionContextExecutor = scala.concurrent.ExecutionContext.fromExecutor( java.util.concurrent.Executors - .newCachedThreadPool() + .newFixedThreadPool(15) ) def sorted(path: Path) = Future { @@ -84,7 +84,9 @@ class WorkerService( Future { try { logger.info(s"[WorkerServer] Sort ($request)") - + logger.info( + s"[WorkerServer] thread number in sort1 : ${java.lang.Thread.activeCount()}" + ) Await.result( Future.traverse(inputFiles.toList)(sorted)( GenericBuildFrom[File, File], @@ -92,7 +94,9 @@ class WorkerService( ), scala.concurrent.duration.Duration.Inf ) - + logger.info( + s"[WorkerServer] thread number in sort2 : ${java.lang.Thread.activeCount()}" + ) logger.info("[WorkerServer] Sorted") promise.success(new SortReply()) @@ -114,7 +118,7 @@ class WorkerService( implicit val executionContext: ExecutionContextExecutor = scala.concurrent.ExecutionContext.fromExecutor( java.util.concurrent.Executors - .newCachedThreadPool() + .newFixedThreadPool(15) ) val workers: Seq[WorkerMetadata] = request.workers @@ -154,7 +158,9 @@ class WorkerService( Future { try { logger.info(s"[WorkerServer] Partition ($request)") - + logger.info( + s"[WorkerServer] thread number in partition1 : ${java.lang.Thread.activeCount()}" + ) Await.result( Future.traverse(inputFiles.toList)(partition)( GenericBuildFrom[File, Seq[File]], @@ -162,7 +168,9 @@ class WorkerService( ), scala.concurrent.duration.Duration.Inf ) - + logger.info( + s"[WorkerServer] thread number in partition2 : ${java.lang.Thread.activeCount()}" + ) logger.info("[WorkerServer] Partitioned") promise.success(new PartitionReply()) @@ -182,7 +190,7 @@ class WorkerService( implicit val executionContext: ExecutionContextExecutor = scala.concurrent.ExecutionContext.fromExecutor( java.util.concurrent.Executors - .newCachedThreadPool() + .newFixedThreadPool(15) ) val workers: Seq[WorkerMetadata] = request.workers @@ -210,7 +218,9 @@ class WorkerService( .map(_._2) logger.info(s"[WorkerServer] Sending $chunk to $targetClients") - + logger.info( + s"[WorkerServer] thread number in exchange before : ${java.lang.Thread.activeCount()}" + ) Await.result( Future.traverse(targetClients)(_.saveBlock(chunk))( GenericBuildFrom[WorkerClient, SaveBlockReply], @@ -218,6 +228,9 @@ class WorkerService( ), scala.concurrent.duration.Duration.Inf ) + logger.info( + s"[WorkerServer] thread number in exchange end : ${java.lang.Thread.activeCount()}" + ) }) })