Context

Reactor 의 Context 는 스레드에 매핑되는 것이 아니라 Subscriber 에 매핑 된다. 즉, 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context 가 생긴다.

Write:

chain.filter()
    .contextWrite(Context.of(Contexts.MEMBER_ID, PK))

읽기의 경우에는 두 가지 방식이 있다.

  1. 원본 데이터 소스 레벨에서 읽는 방식
  2. Operator 체인 중간에서 읽는 방식

Read:

// 원본 데이터 소스 레벨에서 읽는 방식 -> deferContextual() Operator 사용
Flux.deferContextual { ctx ->
    // ContextView 를 통해서 데이터 읽기
    val memberId = ctx.getOrDefault(Contexts.MEMBER_ID, 0L)
    memberRepository.findById(memberId)
}

여기서 람다에 있는 ctx 파라미터는 ContextView 타입이다. Context 에 데이터를 쓸 때는 Context 를 사용하지만 데이터를 읽을 때는 ContextView 를 사용한다.

Reactor 에서는 Operator Chain 간의 서로 다른 스레드들이 Context 에 저장된 데이터에 손쉽게 접근할 수 있다. 매번 context.put() 을 통해 데이터를 쓰고 불변(immutable) 객체를 contextWrite() Operator 로 전달함으로써 Thread-safe 를 보장한다.

Characteristics

  • Context 는 구독이 발생할 때마다 하나의 Context 가 해당 구독에 연결된다.
mono.contextWrite(Context.of("key1", "TESLA"))
    .subscribe { value -> println("value: $value") }

mono.contextWrite(Context.of("key2", "HYUNDAI"))
    .subscribe { value -> println("value: $value") }

TESLA, HYUNDAI 가 하나의 Context 에 있을 것 같지만 위 특징으로 인해 각각의 구독에 연결된 Context 가 생기게 된다.

  • Context 는 Operator 체인의 아래에서 위로 전파(propagation) 된다.
  • 동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite() 저장한 값으로 덮어쓴다.

아래에서 위로 전파되기 때문에, 모든 Operator 에서 저장된 데이터를 읽을 수 있도록 contextWrite() 을 Chain 의 가장 마지막에 둔다.

Mono.deferContextual(ctx -> Mono.just(ctx.get(key1)))
    .publishOn(Schedulers.parallel())
    .contextWrite(Context.of(key1, "TESLA"))
    .transformDeferredContextual((mono, ctx) -> mono.map(value -> ctx.getOrDefault(key1, "HYUNDAI")))
    .contextWrite(ctx -> ctx.put(key1, "GOOGLE"))
    .subscribe(value -> System.out.println("value: " + value));

Thread.sleep(1000L);
  • Inner Sequence 내부에서는 외부 Context 에 저장된 데이터를 읽을 수 있다.
  • Inner Sequence 외부에서는 Inner Sequence 에서 저장한 데이터를 읽을 수 없다.

When use Context ?

인증 정보 같은 직교성(독립성)을 가지는 정보를 전송하는데 적합하다.

References

  • 스프링으로 시작하는 리액티브 프로그래밍 / 황정식 저 / 비제이퍼블릭