Skip to content

Commit

Permalink
feat: reflect suggested request 1. change value name 2. make not to u…
Browse files Browse the repository at this point in the history
…se file 3. use InetAddress
  • Loading branch information
leejiwon1125 committed Nov 22, 2023
1 parent f4d94e5 commit b5db92a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 84 deletions.
39 changes: 0 additions & 39 deletions master/src/main/scala/AuxFunction.scala

This file was deleted.

21 changes: 11 additions & 10 deletions master/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -1,43 +1,44 @@
package kr.ac.postech.paranode.master

import kr.ac.postech.paranode.core.WorkerMetadata
import kr.ac.postech.paranode.master.AuxFunction.getWorkerDetails
import kr.ac.postech.paranode.rpc.MasterServer

import java.nio.file.Files
import java.nio.file.Paths
import java.net._

import java.net.URL
import scala.io.Source
import scala.util.Try

object Main {
def main(args: Array[String]): Unit = {
val requestLimit = Try(args(0).toInt).getOrElse {
val numberOfWorker = Try(args(0).toInt).getOrElse {
println("Invalid command")
return
}

val server = new MasterServer(scala.concurrent.ExecutionContext.global)
server.startServer()

while (server.getRequestCount < requestLimit) {
while (server.getWorkerDetails.size < numberOfWorker) {
Thread.sleep(1000)
}

val workerInfo: List[WorkerMetadata] = getWorkerDetails()
val workerInfo: List[WorkerMetadata] = server.getWorkerDetails

assert(workerInfo.size == requestLimit)
assert(workerInfo.size == numberOfWorker)

try {
val url = new URL("http://checkip.amazonaws.com")
val source = Source.fromURL(url)
val publicIpAddress = source.mkString.trim
source.close()
val publicIpAddress = InetAddress.getLocalHost.getHostAddress

println(publicIpAddress + ":" + server.getPort)
println(workerInfo.map(_.host).mkString(", "))
} catch {
case e: Exception => e.printStackTrace()
}
// TODO: start WorkerClient
// TODO: save workerInfo and start WorkerClient

}

}
40 changes: 10 additions & 30 deletions rpc/src/main/scala/MasterServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import java.util.logging.Logger
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise

import master.{MasterGrpc, RegisterReply, RegisterRequest}

import scala.collection.mutable.{ListBuffer, WrappedArray}
import kr.ac.postech.paranode.core.WorkerMetadata

object MasterServer {
private val logger = Logger.getLogger(classOf[MasterServer].getName)

Expand All @@ -33,13 +35,14 @@ class MasterServer(executionContext: ExecutionContext) { self =>
.addService(MasterGrpc.bindService(new MasterImpl, executionContext))
.build()

private var requestCount = 0
private val workerDetails:ListBuffer[WorkerMetadata] = ListBuffer()

def incrementRequestCount(): Unit = synchronized {
requestCount += 1
def addWorkerInfo(workerMetadata: WorkerMetadata): Unit = synchronized {
workerDetails += workerMetadata
}

def getRequestCount: Int = requestCount
def getWorkerDetails: List[WorkerMetadata] = workerDetails.toList

def getPort: String = port.toString

private def start(): Unit = {
Expand Down Expand Up @@ -78,31 +81,8 @@ class MasterServer(executionContext: ExecutionContext) { self =>
val promise = Promise[RegisterReply]

Future {
try {
val dirPath = Paths.get("worker_register")
if (!Files.exists(dirPath)) {
Files.createDirectories(dirPath)
}
val filePath =
dirPath.resolve(request.worker.get.host + ".txt").toString

val writer = new PrintWriter(new File(filePath), "UTF-8")
try {
writer.println(
s"Worker Host: ${request.worker.get.host}, Worker Port: ${request.worker.get.port}"
)
} finally {
self.incrementRequestCount()
writer.close()
}
promise.success(new RegisterReply())
} catch {
case e: Exception =>
MasterServer.logger.warning(
"Failed to write to file: " + e.getMessage
)
promise.failure(e)
}
val workerMetadata = WorkerMetadata(request.worker.get.host, request.worker.get.port, None)
addWorkerInfo(workerMetadata)
}(executionContext)

promise.future
Expand Down
9 changes: 4 additions & 5 deletions worker/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package kr.ac.postech.paranode.worker
import kr.ac.postech.paranode.core.WorkerMetadata
import kr.ac.postech.paranode.rpc.MasterClient

import java.net.URL
import java.net.{InetAddress, URL}
import scala.io.Source
import scala.util.Try

Expand Down Expand Up @@ -34,14 +34,13 @@ object Main {
// Open MasterClient and request register
val client = MasterClient(ip, port)
try {
val url = new URL("http://checkip.amazonaws.com")
val source = Source.fromURL(url)
val publicIpAddress = source.mkString.trim
source.close()
val publicIpAddress = InetAddress.getLocalHost.getHostAddress

val workerMetadata = WorkerMetadata(publicIpAddress, -1, None)
client.register(workerMetadata)

// doesn't come here. I think its because we use blockingStub.

} finally {
client.shutdown()
}
Expand Down

0 comments on commit b5db92a

Please sign in to comment.