Jong Wook Kim edited this page Feb 2, 2017 · 1 revision 패키지에는 병렬 프로그래밍을 위한 다음과 같은 도구가 포함되어 있습니다.


저장소에 대량으로 데이터를 넣는 경우 클라이언트단에서 스루풋을 제한해야 할 때가 있습니다. 그런 경우를 위해 만들어진 Throttled는 임의의 시퀀스를 원하는 속도로 탐색할 수 있게 해 줍니다.

for (i <- Throttled(1 to 100, 4.0)) {
  println(s"loop $i executed at ${System.currentTimeMillis}")

위 코드는 1 to 100이란 시퀀스를 초당 4.0회, 즉 250ms 간격으로 탐색하게 하며 출력 결과는 다음과 같습니다.

loop 1 executed at 1424772133000
loop 2 executed at 1424772133250
loop 3 executed at 1424772133500
loop 4 executed at 1424772133750
loop 5 executed at 1424772134000

내부적으로 필요한 시간간격에 따라 Thread.sleep과 busy wait을 혼합하여 사용하고 있으며, 경험적으로 초당 100만번까지는 1% 이내의, 초당 1000만번까지는 10% 이내의 오차로 사용할 수 있습니다.

Future 벗겨내기

Future를 리턴하는 비동기 구현을 테스트할 때 Await.result의 호출이 번거로운데, 이것을 간단히 sync() 호출로 해결할 수 있게 해 줍니다.


val future: Future[String] = someAsyncFunction()

이름 충돌로 인해 implicit conversion이 잘 동작하지 않을 수 있어서 await()과 같은 역할을 하는 get(), block(), sync() 메소드도 사용할 수 있으며, 비동기 API의 테스트케이스 작성이 간단해져서 편리합니다.

논블로킹 타임아웃

특정 스레드를 블록하지 않으면서 Future에 Timeout을 걸기 위해서는 .timeout() 메소드를 사용할 수 있습니다.

import scala.concurrent.duration._

Get("").timeout(5.millis).sync()  // 5 ms 안에 응답을 받지 못하면 실패

Netty의 HashedWheelTimer 구현을 빌려서 사용하기 때문에 ScheduledExecutorService 종류보다 가벼우며, 대신 100ms 정도의 오차가 있을 수 있습니다.


특정한 이름에 대해서 유일한 인스턴스가 필요할 때 NamedSingleton을 사용할 수 있습니다. 다음 예는 NamedSingleton을 이용하여 특정 접두어에 유일한 번호를 붙여서 얻는 thread-safe 코드입니다.


/** 주어진 이름에 대해 유일한 AtomicInteger를 제공하는 NamedSingleton */
val counter = NamedSingleton { new AtomicInteger }

/** 주어진 prefix로 시작하는 유일한 이름 리턴 */
def uniqueName(prefix: String) = s"$prefix-${counter(prefix).incrementAndGet()}"

비슷하게 String이 아닌 키를 사용할 수 있는 KeyedSingleton도 있습니다.


비슷한 방식으로, 특정한 이름에 대한 락이 필요할 때 NamedLock을 사용할 수 있습니다. 다음 예는 로딩이 오래 걸리는 리소스를 중복으로 로드되는 것을 방지하기 위해 리소스의 이름을 기준으로 락을 사용하는 예제입니다.

/** 주어진 이름을 가지는 리소스 로딩; 동시에 로드되지 않게 하기 위해 락을 사용한다 */
def loadResource(name: String) {
  NamedLock(name).synchronized {
    if (alreadyLoaded(name)) {
      // name으로 로딩된 리소스 리턴
    } else {
      // 리소스 로드 후 리턴


java.util.concurrent에 포함된 ExecutorService를 스칼라에서 쓰기 쉽게 감싼 도구입니다. Executor가 만들 스레드의 이름을 지정하거나 daemon 스레드를 사용할 지 여부를 결정할 수 있습니다. 디폴트로 daemon 스레드를 만듭니다.


val executor = NamedExecutors.scheduled("my-scheduler", daemon = false)

executor.withFixedRate(100.millis) {
  println("this message will be printed every 100 ms")

// scala.concurrent.Future 에서 사용할 수 있는 implicit ExecutionContext
implicit val context = ExecutionContext.fromExecutorService(executor)

위 코드의 마지막 라인은 NamedExecutors를 이용하여 Scala의 비동기 프로그래밍에 필요한 ExecutionContext를 만드는 예제입니다. 이렇게 하면 스레드 덤프에서 어떤 용도로 만들어진 스레드인지를 확인할 수 있어서 ExecutionContext.global보다 편리할 수 있습니다.

NamedExecutors 오브젝트에는 Executor의 종류에 따라 다음과 같은 메소드가 있습니다. 추가적으로 daemon = false 파라미터를 주면 daemon이 아닌 스레드를 만듭니다.

메소드 스케줄 가능 스레드
scheduled(name) 싱글 스레드
scheduledPool(name,size) 고정 크기 스레드풀
cached(name) 동적으로 리사이즈되는 스레드풀
fixed(name,size) 고정 크기 스레드풀
single(name) 싱글스레드
forkJoin(name,size) ForkJoin Pool

RichExecutorService, RichScheduledExecutorService

NamedExecutors의 메소드들이 리턴하는 ExecutorService들은 java.util.concurrent 패키지와 호환되면서도 스칼라를 위한 편리한 메소드 submit을 추가한 RichExecutorService 입니다.

val executor = NamedExecutors.single("hello")

executor.submit {
} // "hello-0" 출력

또한 스케줄 가능한 Executor를 요청할 경우 작업을 즉시 실행하는 것뿐만 아니라 특정 딜레이를 주거나 특정 시간간격으로 실행되는 작업을 실행하게 할 수 있습니다.

import scala.concurrent.duration._

val scheduler = NamedExecutors.scheduled("my-scheduler")

scheduler.scheduleIn(3.seconds) {
  println("이 메시지는 3초 후 출력됨")

scheduler.withFixedRate(3.seconds) {
  println("이 메시지는 매 3초마다 출력됨")

scheduler.withFixedRate(3.seconds) {
  println("이 메시지는 매 3초마다 출력됨")

scheduler.withFixedDelay(3.seconds) {
  println("이 메시지는 매 3초 간격으로 출력됨")

scheduler.withFixedRate(3.seconds, 2.seconds) {
  println("이 메시지는 2초 후부터 시작하여 매 3초마다 출력됨")

withFixedRatewithFixedDelay의 차이점은 일정한 실행 주기를 지정하는 것과 이전 작업이 끝난 뒤 다음 작업이 시작되기까지의 시간간격을 지정하는 것의 차이입니다. 작업의 실행시간이 짧지 않은 경우 차이가 생깁니다.

