From b5db92ab7c09755f9de355b1f7906e129cf55542 Mon Sep 17 00:00:00 2001 From: leejiwon1125 Date: Wed, 22 Nov 2023 23:29:16 +0900 Subject: [PATCH] feat: reflect suggested request 1. change value name 2. make not to use file 3. use InetAddress --- master/src/main/scala/AuxFunction.scala | 39 ------------------------ master/src/main/scala/Main.scala | 21 ++++++------- rpc/src/main/scala/MasterServer.scala | 40 +++++++------------------ worker/src/main/scala/Main.scala | 9 +++--- 4 files changed, 25 insertions(+), 84 deletions(-) delete mode 100644 master/src/main/scala/AuxFunction.scala diff --git a/master/src/main/scala/AuxFunction.scala b/master/src/main/scala/AuxFunction.scala deleted file mode 100644 index 621ed76..0000000 --- a/master/src/main/scala/AuxFunction.scala +++ /dev/null @@ -1,39 +0,0 @@ -package kr.ac.postech.paranode.master - -import kr.ac.postech.paranode.core.WorkerMetadata - -import java.nio.file.Files -import java.nio.file.Paths -import scala.collection.mutable.ListBuffer -import scala.io.Source - -object AuxFunction { - - def getWorkerDetails(): List[WorkerMetadata] = { - - val dirPath = Paths.get("worker_register") - val workerDetails = ListBuffer[WorkerMetadata]() - - if (Files.exists(dirPath)) { - val files = Files.list(dirPath).toArray - files.foreach { file => - val source = Source.fromFile(file.toString) - try { - for (line <- source.getLines) { - val parts = line.split(", ").map(_.split(": ").last) - if (parts.length == 2) { - val ip = parts(0) - val port = parts(1).toInt - workerDetails += (WorkerMetadata(ip, port, None)) - } - } - } finally { - source.close() - } - } - } - - workerDetails.toList - } - -} diff --git a/master/src/main/scala/Main.scala b/master/src/main/scala/Main.scala index b51bb49..1433f1c 100644 --- a/master/src/main/scala/Main.scala +++ b/master/src/main/scala/Main.scala @@ -1,16 +1,19 @@ package kr.ac.postech.paranode.master import kr.ac.postech.paranode.core.WorkerMetadata -import kr.ac.postech.paranode.master.AuxFunction.getWorkerDetails import kr.ac.postech.paranode.rpc.MasterServer +import java.nio.file.Files +import java.nio.file.Paths +import java.net._ + import java.net.URL import scala.io.Source import scala.util.Try object Main { def main(args: Array[String]): Unit = { - val requestLimit = Try(args(0).toInt).getOrElse { + val numberOfWorker = Try(args(0).toInt).getOrElse { println("Invalid command") return } @@ -18,26 +21,24 @@ object Main { val server = new MasterServer(scala.concurrent.ExecutionContext.global) server.startServer() - while (server.getRequestCount < requestLimit) { + while (server.getWorkerDetails.size < numberOfWorker) { Thread.sleep(1000) } - val workerInfo: List[WorkerMetadata] = getWorkerDetails() + val workerInfo: List[WorkerMetadata] = server.getWorkerDetails - assert(workerInfo.size == requestLimit) + assert(workerInfo.size == numberOfWorker) try { - val url = new URL("http://checkip.amazonaws.com") - val source = Source.fromURL(url) - val publicIpAddress = source.mkString.trim - source.close() + val publicIpAddress = InetAddress.getLocalHost.getHostAddress println(publicIpAddress + ":" + server.getPort) println(workerInfo.map(_.host).mkString(", ")) } catch { case e: Exception => e.printStackTrace() } - // TODO: start WorkerClient + // TODO: save workerInfo and start WorkerClient } + } diff --git a/rpc/src/main/scala/MasterServer.scala b/rpc/src/main/scala/MasterServer.scala index fa9f91d..44ade74 100644 --- a/rpc/src/main/scala/MasterServer.scala +++ b/rpc/src/main/scala/MasterServer.scala @@ -12,9 +12,11 @@ import java.util.logging.Logger import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise - import master.{MasterGrpc, RegisterReply, RegisterRequest} +import scala.collection.mutable.{ListBuffer, WrappedArray} +import kr.ac.postech.paranode.core.WorkerMetadata + object MasterServer { private val logger = Logger.getLogger(classOf[MasterServer].getName) @@ -33,13 +35,14 @@ class MasterServer(executionContext: ExecutionContext) { self => .addService(MasterGrpc.bindService(new MasterImpl, executionContext)) .build() - private var requestCount = 0 + private val workerDetails:ListBuffer[WorkerMetadata] = ListBuffer() - def incrementRequestCount(): Unit = synchronized { - requestCount += 1 + def addWorkerInfo(workerMetadata: WorkerMetadata): Unit = synchronized { + workerDetails += workerMetadata } - def getRequestCount: Int = requestCount + def getWorkerDetails: List[WorkerMetadata] = workerDetails.toList + def getPort: String = port.toString private def start(): Unit = { @@ -78,31 +81,8 @@ class MasterServer(executionContext: ExecutionContext) { self => val promise = Promise[RegisterReply] Future { - try { - val dirPath = Paths.get("worker_register") - if (!Files.exists(dirPath)) { - Files.createDirectories(dirPath) - } - val filePath = - dirPath.resolve(request.worker.get.host + ".txt").toString - - val writer = new PrintWriter(new File(filePath), "UTF-8") - try { - writer.println( - s"Worker Host: ${request.worker.get.host}, Worker Port: ${request.worker.get.port}" - ) - } finally { - self.incrementRequestCount() - writer.close() - } - promise.success(new RegisterReply()) - } catch { - case e: Exception => - MasterServer.logger.warning( - "Failed to write to file: " + e.getMessage - ) - promise.failure(e) - } + val workerMetadata = WorkerMetadata(request.worker.get.host, request.worker.get.port, None) + addWorkerInfo(workerMetadata) }(executionContext) promise.future diff --git a/worker/src/main/scala/Main.scala b/worker/src/main/scala/Main.scala index 2e7e7f6..f5ac9ea 100644 --- a/worker/src/main/scala/Main.scala +++ b/worker/src/main/scala/Main.scala @@ -3,7 +3,7 @@ package kr.ac.postech.paranode.worker import kr.ac.postech.paranode.core.WorkerMetadata import kr.ac.postech.paranode.rpc.MasterClient -import java.net.URL +import java.net.{InetAddress, URL} import scala.io.Source import scala.util.Try @@ -34,14 +34,13 @@ object Main { // Open MasterClient and request register val client = MasterClient(ip, port) try { - val url = new URL("http://checkip.amazonaws.com") - val source = Source.fromURL(url) - val publicIpAddress = source.mkString.trim - source.close() + val publicIpAddress = InetAddress.getLocalHost.getHostAddress val workerMetadata = WorkerMetadata(publicIpAddress, -1, None) client.register(workerMetadata) + // doesn't come here. I think its because we use blockingStub. + } finally { client.shutdown() }