Scheduler
Scheduler
Reactor 에서 Scheduler 는 스레드를 관리해주는 역할을 담당한다.
Scheduler 전용 Operator 에는 subscribeOn, publishOn, parallel 메서드가 있다.
subscribeOn
subscribeOn 은 구독이 발생한 직후 실행될 스레드를 지정한다. 원본 Publisher 의 동작을 처리하기 위한 스레드를 할당 한다.
Examples:
fun main() {
Flux.fromArray(arrayOf(1, 2, 3, 4, 5)) // 원본 Publisher = 원본 Flux
.subscribeOn(Schedulers.boundedElastic())
.doOnNext { i: Int -> printWithThread("# doOnNext: $i") } // emit 되는 데이터 로깅
.doOnSubscribe { i: Subscription ->
printWithThread(
"# doOnSubscribe: $i"
)
}
.subscribe { i: Int -> printWithThread("# onNext: $i") }
Thread.sleep(1000L)
}
Outputs:
main - # doOnSubscribe: reactor.core.publisher.FluxPeek$PeekSubscriber@6a1aab78
boundedElastic-1 - # doOnNext: 1
boundedElastic-1 - # onNext: 1
boundedElastic-1 - # doOnNext: 2
boundedElastic-1 - # onNext: 2
boundedElastic-1 - # doOnNext: 3
boundedElastic-1 - # onNext: 3
boundedElastic-1 - # doOnNext: 4
boundedElastic-1 - # onNext: 4
boundedElastic-1 - # doOnNext: 5
boundedElastic-1 - # onNext: 5
doOnSubscribe 는 구독이 발생한 시점에 추가적인 작업을 실행할 수 있다.
doOnSubscribe 시 로그에 찍히는 스레드는 main 이다. (최초 실행 스레드가 main) 그 이후부턴 subscribeOn 을 통해 원본 Publisher 를 처리할 스케줄러를 지정하였기 때문에 boundedElastic 으로 로그에 찍히게 된다.
publishOn
publishOn Operator 는 Downstream 으로 Signal 을 전송할 때 실행되는 스레드를 제어하는 역할을 하는 Operator 라고 할 수 있다.
Examples:
fun main() {
Flux.fromArray(arrayOf(1, 2, 3, 4, 5)) // 원본 Publisher = 원본 Flux
.doOnNext { i: Int -> printWithThread("# doOnNext: $i") } // emit 되는 데이터 로깅
.doOnSubscribe { i: Subscription ->
printWithThread(
"# doOnSubscribe: $i"
)
}
.publishOn(Schedulers.parallel()) // Downstream 으로 emit 하는 스레드를 변경
.subscribe { i: Int -> printWithThread("# onNext: $i") }
Thread.sleep(1000L)
}
Outputs:
main - # doOnSubscribe: reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@76a3e297
main - # doOnNext: 1
main - # doOnNext: 2
main - # doOnNext: 3
main - # doOnNext: 4
main - # doOnNext: 5
parallel-1 - # onNext: 1
parallel-1 - # onNext: 2
parallel-1 - # onNext: 3
parallel-1 - # onNext: 4
parallel-1 - # onNext: 5
publishOn 을 통해 Downstream 으로 emit 하는 스레드를 변경 하였다. 이 사실이 중요하다.
publishOn 을 여러개 사용하는 경우, publishOn 을 기준으로 사용되는 스레드가 변경된다.
parallel
subscribeOn 과 publishOn 은 동시성을 가지는 논리적인 스레드에 해당되고, parallel 은 물리적인 스레드에 해당된다. parallel 은 Round-Robin 방식으로 CPU 코어 개수 만큼의 스레드를 병렬로 실행 한다.
여기서 "CPU 코어 개수 만큼의 스레드란" 논리적인 코어 의 개수를 의미한다. CPU 에서 1개의 물리적인 코어는 2개의 논리적인 코어를 갖는다. 이 2개의 논리적인 코어를 물리적인 thread 라고 한다.
Examples:
fun main() {
Flux.fromArray(arrayOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) // 원본 Publisher = 원본 Flux
.parallel() // CPU 의 논리적인 코어(물리적인 스레드) 수에 맞게 작업을 골고루 분배
.runOn(Schedulers.parallel()) // 실제로 작업을 수행할 스레드 할당을 담당
.subscribe { i: Int -> printWithThread("# onNext: $i") }
Thread.sleep(1000L)
}
Outputs:
parallel-1 - # onNext: 1
parallel-5 - # onNext: 5
parallel-6 - # onNext: 6
parallel-7 - # onNext: 7
parallel-1 - # onNext: 9
parallel-4 - # onNext: 4
parallel-2 - # onNext: 2
parallel-2 - # onNext: 10
parallel-8 - # onNext: 8
parallel-3 - # onNext: 3
만약 runOn Operator 를 주석처리하면 병렬 처리가 적용되지 않고, main 스레드 위에서 처리되는 것을 볼 수 있다.
CPU 의 논리적인 코어 수(물리적인 Thread 수)에 맞게 데이터를 그룹화 한 것을 Reactor 에서는 rail 이라고 한다.
Kinds
Schedulers.immediate():
- 별도의 스레드를 추가 생성하지 않고 현재 스레드에서 작업을 처리할 때 사용
Schedulers.single():
- 단일 스레드를 내에서 처리할 때 사용
- 하나의 스레드를 재사용하여 작업을 처리하기 때문에 지연 시간이 짧은 작업 을 처리하는데 좋음
Schedulers.newSingle():
- 호출 시 마다 새로운 스레드를 하나 생성
- 두 번째 인자에 데몬 스레드(demon thread)로 동작하게 할지 여부를 정할 수 있음
- 데몬 스레드는 주 스레드가 종료되면 자동으로 종료됨
Schedulers.boundedElastic():
- ExecutorServicd 기반의 스레드 풀을 생성한 후, 그 안에서 정해진 수 만큼의 스레드를 사용하고 작업을 처리, 처리 후에는 스레드 반납
- 스레드 풀의 크기는 CPU 코어 수에 따라 자동으로 설정됨 (CPU 코어 수 * 10)
- 대기 가능한 스레드는 최대 100,000 개 이며 큐에서 대기함
- HTTP 요청 같은 실행 시간이 긴 Blocking I/O 작업에 효과적
Schedulers.parallel():
- Non-Blocking I/O 작업에 효과적
- CPU 의 논리적인 코어(물리적인 스레드) 수에 맞게 작업을 분배
Schedulers.fromExecutorService():
- 기존에 사용 중인 ExecutorService 가 있다면 ExecutorService 로 부터 Scheduler 생성
Summary
- subscribeOn 은 원본 Publisher 의 동작을 처리하기 위한 스레드를 할당한다.
- publishOn 은 Downstream 으로 emit 하는 스레드를 변경한다.
- publishOn 은 한 개 이상 사용할 수 있다.
- subscribeOn 과 publishOn 을 적절하게 사용하면 emit 하는 스레드와 emit 된 데이터를 처리하는 스레드를 분리할 수 있다.
- parallel 은 CPU 의 논리적인 코어(물리적인 스레드) 수에 맞게 작업을 골고루 분배한다.
References
- 스프링으로 시작하는 리액티브 프로그래밍 / 황정식 저 / 비제이퍼블릭