Skip to content

Commit

Permalink
Merge pull request #101 from betarixm/fix/fixed-thread-pool
Browse files Browse the repository at this point in the history
Make worker use FixedThreadPool
  • Loading branch information
betarixm authored Dec 5, 2023
2 parents d0b0723 + 51330f3 commit 37ce60f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
12 changes: 10 additions & 2 deletions log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@
status = warn

# Name of the configuration
name = ConsoleLogConfigDemo
name = ConsoleFileLogConfigDemo

# Console appender configuration
appender.console.type = Console
appender.console.name = consoleLogger
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# File appender configuration
appender.file.type = File
appender.file.name = fileLogger
appender.file.fileName = logs/app.log
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Root logger level
rootLogger.level = info

# Root logger referring to console appender
# Root logger referring to both console and file appenders
rootLogger.appenderRef.stdout.ref = consoleLogger
rootLogger.appenderRef.file.ref = fileLogger
29 changes: 21 additions & 8 deletions worker/src/main/scala/WorkerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

def sorted(path: Path) = Future {
Expand All @@ -84,15 +84,19 @@ class WorkerService(
Future {
try {
logger.info(s"[WorkerServer] Sort ($request)")

logger.info(
s"[WorkerServer] thread number in sort1 : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(inputFiles.toList)(sorted)(
GenericBuildFrom[File, File],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info(
s"[WorkerServer] thread number in sort2 : ${java.lang.Thread.activeCount()}"
)
logger.info("[WorkerServer] Sorted")

promise.success(new SortReply())
Expand All @@ -114,7 +118,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

val workers: Seq[WorkerMetadata] = request.workers
Expand Down Expand Up @@ -154,15 +158,19 @@ class WorkerService(
Future {
try {
logger.info(s"[WorkerServer] Partition ($request)")

logger.info(
s"[WorkerServer] thread number in partition1 : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(inputFiles.toList)(partition)(
GenericBuildFrom[File, Seq[File]],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info(
s"[WorkerServer] thread number in partition2 : ${java.lang.Thread.activeCount()}"
)
logger.info("[WorkerServer] Partitioned")

promise.success(new PartitionReply())
Expand All @@ -182,7 +190,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

val workers: Seq[WorkerMetadata] = request.workers
Expand Down Expand Up @@ -210,14 +218,19 @@ class WorkerService(
.map(_._2)

logger.info(s"[WorkerServer] Sending $chunk to $targetClients")

logger.info(
s"[WorkerServer] thread number in exchange before : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(targetClients)(_.saveBlock(chunk))(
GenericBuildFrom[WorkerClient, SaveBlockReply],
executionContext
),
scala.concurrent.duration.Duration.Inf
)
logger.info(
s"[WorkerServer] thread number in exchange end : ${java.lang.Thread.activeCount()}"
)
})
})

Expand Down

0 comments on commit 37ce60f

Please sign in to comment.