Skip to content

Commit

Permalink
feat: adjust executor's lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
betarixm committed Dec 11, 2023
1 parent babf34f commit f18c8e7
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 16 deletions.
30 changes: 25 additions & 5 deletions worker/src/main/scala/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import kr.ac.postech.paranode.utils.Hooks
import org.apache.logging.log4j.scala.Logging

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand All @@ -28,14 +29,27 @@ object Worker extends Logging {
workerArguments.outputDirectory
)

val executor = Executors.newCachedThreadPool()

val executionContext: ExecutionContext =
ExecutionContext.fromExecutor(
executor
)

try {
Await.result(
worker.run()(ExecutionContext.global),
worker.run()(executionContext),
scala.concurrent.duration.Duration.Inf
)
} catch {
case _: io.grpc.StatusRuntimeException => System.exit(0)
case _: Exception => System.exit(1)
case statusRuntimeException: io.grpc.StatusRuntimeException if {
val status = statusRuntimeException.getStatus
status.getCode == io.grpc.Status.Code.UNAVAILABLE && status.getDescription
.contains("debug data: app_requested")
} => // Suppress
} finally {
worker.shutdown()
executor.shutdown()
}
}

Expand All @@ -51,15 +65,21 @@ class Worker(
) extends Logging {
private val workerMetadata = WorkerMetadata(host, port, None)

private val serviceExecutor = Executors.newCachedThreadPool()

private val serviceExecutionContext: ExecutionContext =
ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
serviceExecutor
)

private val server = new GrpcServer(
WorkerService(
inputDirectories,
outputDirectory
outputDirectory,
onFinished = () => {
serviceExecutor.shutdown()
serviceExecutor.awaitTermination(3, TimeUnit.SECONDS)
}
)(serviceExecutionContext),
port
)
Expand Down
91 changes: 80 additions & 11 deletions worker/src/main/scala/WorkerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import kr.ac.postech.paranode.utils.GenericBuildFrom
import org.apache.logging.log4j.scala.Logging

import java.util.UUID
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import scala.concurrent._
import scala.reflect.io.Directory
import scala.reflect.io.File
Expand All @@ -20,13 +22,15 @@ import scala.util.hashing.MurmurHash3
object WorkerService {
def apply(
inputDirectories: Array[Directory],
outputDirectory: Directory
outputDirectory: Directory,
onFinished: () => Unit = () => {}
)(implicit executionContext: ExecutionContext): ServerServiceDefinition =
WorkerGrpc.bindService(
new WorkerService(
executionContext,
inputDirectories,
outputDirectory
outputDirectory,
onFinished
),
executionContext
)
Expand All @@ -35,7 +39,8 @@ object WorkerService {
class WorkerService(
executionContext: ExecutionContext,
inputDirectories: Array[Directory],
outputDirectory: Directory
outputDirectory: Directory,
onFinished: () => Unit = () => {}
) extends WorkerGrpc.Worker
with Logging {

Expand Down Expand Up @@ -69,10 +74,11 @@ class WorkerService(
override def sort(request: SortRequest): Future[SortReply] = {
val promise = Promise[SortReply]

val executor = Executors.newFixedThreadPool(15)

implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newFixedThreadPool(15)
ExecutionContext.fromExecutor(
executor
)

def sorted(path: Path) = Future {
Expand Down Expand Up @@ -108,6 +114,11 @@ class WorkerService(
}
}(executionContext)

promise.future.onComplete(_ => {
executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
})

promise.future
}

Expand All @@ -116,10 +127,11 @@ class WorkerService(
): Future[PartitionReply] = {
val promise = Promise[PartitionReply]

val executor = Executors.newFixedThreadPool(15)

implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newFixedThreadPool(15)
ExecutionContext.fromExecutor(
executor
)

val workers: Seq[WorkerMetadata] = request.workers
Expand Down Expand Up @@ -182,16 +194,22 @@ class WorkerService(
}
}(executionContext)

promise.future.onComplete(_ => {
executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
})

promise.future
}

override def exchange(request: ExchangeRequest): Future[ExchangeReply] = {
val promise = Promise[ExchangeReply]

val executor = Executors.newFixedThreadPool(15)

implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newFixedThreadPool(15)
executor
)

val workers: Seq[WorkerMetadata] = request.workers
Expand Down Expand Up @@ -247,6 +265,11 @@ class WorkerService(
}
}(executionContext)

promise.future.onComplete(_ => {
executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
})

promise.future
}

Expand All @@ -255,6 +278,12 @@ class WorkerService(
): Future[SaveBlockReply] = {
val promise = Promise[SaveBlockReply]

val executor = Executors.newFixedThreadPool(15)
implicit val executionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(
executor
)

Future {
try {
logger.info(s"[WorkerServer] SaveBlock ($request)")
Expand All @@ -277,12 +306,24 @@ class WorkerService(
}
}(executionContext)

promise.future.onComplete(_ => {
executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
})

promise.future
}

override def merge(request: MergeRequest): Future[MergeReply] = {
val promise = Promise[MergeReply]

val executor = Executors.newFixedThreadPool(15)

implicit val executionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(
executor
)

Future {
try {
logger.info(s"[WorkerServer] Merge ($request)")
Expand Down Expand Up @@ -322,7 +363,35 @@ class WorkerService(
}
}(executionContext)

promise.future.onComplete(_ => {
executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
})

promise.future
}

override def terminate(request: TerminateRequest): Future[TerminateReply] = {
val promise = Promise[TerminateReply]

val executor = Executors.newSingleThreadExecutor()

implicit val executionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(
executor
)

Future {
logger.info(s"[WorkerServer] Terminate ($request)")
promise.success(new TerminateReply())
}(executionContext)

promise.future.onComplete(_ => {
executor.shutdown()
executor.awaitTermination(5, TimeUnit.SECONDS)
onFinished()
})

promise.future
}
}

0 comments on commit f18c8e7

Please sign in to comment.