Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iteration 07 #109

Merged
merged 37 commits into from
Dec 7, 2023
Merged
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
03afa50
feat: add binaries
betarixm Dec 4, 2023
5ff74bb
refactor: remove hard-coded pathes
betarixm Dec 4, 2023
1d59ce2
fix: permission
betarixm Dec 4, 2023
18472ad
fix: remove hard-coded pathes
betarixm Dec 4, 2023
e33b0e0
fix: permission
betarixm Dec 4, 2023
d0b0723
Merge pull request #102 from betarixm/refactor/scripts
betarixm Dec 5, 2023
a729270
fix: use newFixedThreadPool
leejiwon1125 Nov 30, 2023
51330f3
feat: write logs to files
leejiwon1125 Nov 30, 2023
37ce60f
Merge pull request #101 from betarixm/fix/fixed-thread-pool
betarixm Dec 5, 2023
b46699e
feat: setup assembly
betarixm Dec 4, 2023
8984030
feat: wrap jar files as scripts
betarixm Dec 4, 2023
cb236cd
feat: use jar in containers
betarixm Dec 4, 2023
6c5385a
feat: make release automatic
betarixm Dec 4, 2023
7117ec4
chore: move wrappers into bin directory
betarixm Dec 5, 2023
b5f1577
Merge pull request #103 from betarixm/feat/packaging
betarixm Dec 7, 2023
b85ef56
chore: remove unused files
betarixm Dec 5, 2023
a3903ff
Merge pull request #104 from betarixm/chore/remove-the-unused
betarixm Dec 7, 2023
e5d9d21
feat: add hooks
betarixm Dec 5, 2023
b3d8c4c
refactor: split worker and master using companion
betarixm Dec 5, 2023
0421d33
feat: init subproject for e2e testing
betarixm Dec 5, 2023
42a3136
feat: make boilerplate for e2e testing
betarixm Dec 5, 2023
bee7cee
fix: await until task ends
betarixm Dec 5, 2023
3cc2cce
feat: test is sorted
betarixm Dec 5, 2023
45702f2
fix: length of test data
betarixm Dec 5, 2023
21e35c0
fix: check result with expected output
betarixm Dec 5, 2023
c3bfbf6
fix: use hash to avoid long filename
betarixm Dec 5, 2023
82251cc
feat: shutdown server
betarixm Dec 5, 2023
8cf0f7c
feat: log length of output records
betarixm Dec 5, 2023
55a7e94
ci: run e2e test on ci
betarixm Dec 5, 2023
925405d
Merge pull request #105 from betarixm/test/e2e
betarixm Dec 7, 2023
5b83507
fix: ensure that hooks return non-loopback address
betarixm Dec 7, 2023
686981e
Merge pull request #107 from betarixm/fix/address
betarixm Dec 7, 2023
5a8a2f2
config: disable stdout logging
betarixm Dec 7, 2023
9c5a9ed
docs: update readme
betarixm Dec 7, 2023
46257f9
Merge pull request #106 from betarixm/chore/clean-up
betarixm Dec 7, 2023
d40ab4c
docs: add progress-report-06
leejiwon1125 Dec 7, 2023
69d800d
Merge pull request #108 from betarixm/docs/progress-report-06
leejiwon1125 Dec 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: use newFixedThreadPool
  • Loading branch information
leejiwon1125 authored and betarixm committed Dec 5, 2023
commit a7292702b841b04fb73702fcdd8ce68f3d150983
29 changes: 21 additions & 8 deletions worker/src/main/scala/WorkerService.scala
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

def sorted(path: Path) = Future {
@@ -84,15 +84,19 @@ class WorkerService(
Future {
try {
logger.info(s"[WorkerServer] Sort ($request)")

logger.info(
s"[WorkerServer] thread number in sort1 : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(inputFiles.toList)(sorted)(
GenericBuildFrom[File, File],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info(
s"[WorkerServer] thread number in sort2 : ${java.lang.Thread.activeCount()}"
)
logger.info("[WorkerServer] Sorted")

promise.success(new SortReply())
@@ -114,7 +118,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

val workers: Seq[WorkerMetadata] = request.workers
@@ -154,15 +158,19 @@ class WorkerService(
Future {
try {
logger.info(s"[WorkerServer] Partition ($request)")

logger.info(
s"[WorkerServer] thread number in partition1 : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(inputFiles.toList)(partition)(
GenericBuildFrom[File, Seq[File]],
executionContext
),
scala.concurrent.duration.Duration.Inf
)

logger.info(
s"[WorkerServer] thread number in partition2 : ${java.lang.Thread.activeCount()}"
)
logger.info("[WorkerServer] Partitioned")

promise.success(new PartitionReply())
@@ -182,7 +190,7 @@ class WorkerService(
implicit val executionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newCachedThreadPool()
.newFixedThreadPool(15)
)

val workers: Seq[WorkerMetadata] = request.workers
@@ -210,14 +218,19 @@ class WorkerService(
.map(_._2)

logger.info(s"[WorkerServer] Sending $chunk to $targetClients")

logger.info(
s"[WorkerServer] thread number in exchange before : ${java.lang.Thread.activeCount()}"
)
Await.result(
Future.traverse(targetClients)(_.saveBlock(chunk))(
GenericBuildFrom[WorkerClient, SaveBlockReply],
executionContext
),
scala.concurrent.duration.Duration.Inf
)
logger.info(
s"[WorkerServer] thread number in exchange end : ${java.lang.Thread.activeCount()}"
)
})
})