[Spring] 리액티브에서의 Scheduler와 Context 그리고 Testing

2025. 10. 20. 01:18·Framework & Library/Spring

Chapter 10. Scheduler

10.1 스레드(Thread)의 개념 이해

  • 물리적인 스레드(Pysical Thread) : CPU 코어를 논리적으로 나눈 것
    • 듀얼코어 4 스레드 (작업관리자 [성능]에서 확인 가능)

 

  • 논리적인 스레드(Logical Thread) : 프로세스 내에서 실행되는 세부 작업의 단위
    • 이론적으로는 제한이 없지만 실제로는 물리적인 스레드의 가용 범위내에서 생성 가능

 

<aside> 💡

  • 병렬성(Parallelism) : 물리적인 스레드가 실제로 동시에 실행됨
  • 동시성(Concurrency) : 동시에 실행되는 것처럼 보이지만 실제로는 아님
    • 무수히 많은 논리적인 스레드가 물리적인 스레드를 아주 빠른 속도로 번갈아 가면서 사용 </aside>

10.2 Scheduler란?

  • 어떤 스레드에서 무엇을 처리할지 제어(경쟁조건 등 고려)를 Scheduler가 개발자 대신 수행
    • 코드가 간결해짐
    • 개발자 부담 적어짐

10.3 Scheduler를 위한 전용 Operator

  • subscribeOn()
    • 구독 발생 직후 실행될 스레드 지정
    • 원본 Publisher 동작 수행을 위한 스레드
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .subscribeOn(Schedulers.boundedElastic())
                .doOnNext(data -> log.info("# doOnNext: {}", data))
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
    }
    08:35:03.363 [main] INFO - # doOnSubscribe // 최초 실행 스레드 : main
    08:35:03.371 [boundedElastic-1] INFO - # doOnNext: 1 // 구독 발생 직후 스레드 : boundedElastic
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 1
    08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 3
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 3
    08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 5
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 5
    08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 7
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 7
  • publishOn()
    • Downstream으로 Signal을 전송할 때 실행되는 스레드 제어
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .doOnNext(data -> log.info("# doOnNext: {}", data))
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
    }
    08:40:35.020 [main] INFO - # doOnSubscribe
    08:40:35.024 [main] INFO - # doOnNext: 1
    08:40:35.027 [main] INFO - # doOnNext: 3
    08:40:35.027 [parallel-1] INFO - # onNext: 1 // Downstream 실행 스레드 변경
    08:40:35.027 [main] INFO - # doOnNext: 5
    08:40:35.027 [main] INFO - # doOnNext: 7
    08:40:35.027 [parallel-1] INFO - # onNext: 3
    08:40:35.027 [parallel-1] INFO - # onNext: 5
    08:40:35.027 [parallel-1] INFO - # onNext: 7
  • parallel()
    • 병렬성을 가지는 물리적인 스레드
      • cf. subscribeOn(), publishOn()은 동시성을 가지는 논리적인 스레드
    • 라운드 로빈 방식으로 논리적인 코어(물리적인 스레드) 개수만큼의 스레드를 병렬로 실행
    • Example10_3

subscribeOn(), publishOn()를 활용하여 Publisher에서 데이터를 emit하는 스레드와 emit된 데이터를 가공 처리하는 스레드를 적절하게 분리할 수 있음

Scheduler의 종류

  • Schedulers.immediate() : 별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업 처리
  • Schedulers.single() : 스레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용
  • Schedulers.newSingle() : 매번 새로운 스레드 하나 생성
  • Schedulers.boundedElastic() : ExecutorService 기반의 스레드 풀 생성한 후, 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업 종료된 스레드는 반납하여 재사용
    • 기본적으로 CPU 코어 수 x 10만큼의 스레드 생성
    • 최대 100,000개의 작업 큐에서 대기 가능
    • Blocking I/O 작업을 효과적으로 처리하기 위한 방식
    • 다른 Blocking I/O 작업이 Non-Blocking 처리에 영향을 주지 않도록 전용 스레드 할당
  • Schedulers.parallel() : Non-Blocking I/O에 최적화되어 있는 Scheduler로서 CPU 코어 수만큼의 스레드 생성
  • Schedulers.fromExecutorService() : 기존에 이미 사용하고 있는 ExecutorService가 있다면 이 ExecutorService로부터 Scheduler를 생성하는 방식
  • Schedulers.newXXXX() : 스레드 이름, 생성 가능한 디폴트 스레드의 개수, 스레드의 유효 시간, 데몬 스레드로의 동작 여부 등 직접 지정해서 커스텀 스레드 풀 생성

Chapter 11. Context

11.1 Context란?

  • 어떠한 상황에서 그 상황을 처리하기 위해 필요한 정보
  • Reactor에서 Context의 정의
    • 전파 : Downstream에서 Upstream 체인상의 각 Operator가 해당 Context 정보를 동일하게 이용할 수 있음
  • Operator 같은 Reactor 구성요소 간에 전파되는 key/value 형태의 저장소
  • Reactor의 Context는 Subscriber와 매핑됨
    • cf. ThreadLocal의 경우 실행 스레드와 매핑됨
  • 즉, 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context가 생김
    public static void main(String[] args) throws InterruptedException {
        Mono
            .deferContextual(ctx ->
                Mono
                    .just("Hello" + " " + ctx.get("firstName"))
                    .doOnNext(data -> log.info("# just doOnNext : {}", data))
            )
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.parallel())
            .transformDeferredContextual(
                    (mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
            )
            .contextWrite(context -> context.put("lastName", "Jobs"))
            .contextWrite(context -> context.put("firstName", "Steve"))
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
  • contextWrite() Operator로 Context에 데이터 쓰기 작업을 할 수 있다.
  • Context.put()으로 Context에 데이터를 쓸 수 있다.
  • deferContextual() Operator로 Context에 데이터 읽기 작업을 할 수 있다.
  • Context.get()으로 Context에서 데이터를 읽을 수 있다.
  • transformDeferredContextual() Operator로 Operator 중간에서 Context에 데이터 읽기 작업을 수 있다.

11.2 자주 사용되는 Context 관련 API

  • Context API (쓰기)
    • put(key, value) : key/value 형태로 Context에 값을 쓴다.
    • of(key1, value1, key2, value2, ...) : key/value 형태로 Context에 여러 개의 값을 쓴다.
    • pulAll(ContextView) : 현재 Context와 파라미터로 입력된 ContextView를 merge한다.
    • delete(key) : Context에서 key에 해당하는 value를 삭제한다.
  • ContextView API (읽기)
    • get(key) : ContextView에서 key에 해당하는 value를 반환한다.
    • getOrEmpty(key) : ContextView에서 key에 해당하는 value를 Optional로 래핑해서 반환한다.
    • getOrDefault(key, default value) : ContextView에서 key에 해당하는 value를 가져온다. key에 해당하는 value가 없으면 default value를 가져온다.
    • hasKey(key) : ContextView에서 특정 key가 존재하는지를 확인한다.
    • isEmpty() : Context가 비어 있는지 확인한다.
    • size() : Context 내에 있는 key/value의 개수를 반환한다.

11.3 Context의 특징

  • Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다.
  • Context는 Operator 체인의 아래에서 위로 전파된다.
    • 모든 Operator에서 Context에 저장된 데이터를 읽어올 수 있으려면 contextWrite()을 Operator 체인의 맨 마지막에 둬야 한다.
    • 동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어쓴다.
  • Inner Sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있다.
  • Inner Sequence 외부에서는 Inner Sequence 내부 Context에 저장된 데이터를 읽을 수 없다.
  • 인증 정보 같은 직교성(독립성)을 가지는 정보를 전송하는 데 적합하다.

Chapter 12 Debugging

12.1 Reactor에서의 디버깅 방법

  • 비동기, 선언형 프로그래밍 방식의 Reactor는 디버깅이 쉽지 않음

12.1.1 Debug Mode를 사용한 디버깅

  • Hooks.onOperatorDebug()를 통해 디버그 모드 활성화 (Example12_1)
  • 체인상에서 에러가 시작된 지점부터 에러 전파 상태를 표시
  • 비용이 많이 드는 동작
    • 스택트레이스 캡처
    • 에러가 발생한 Assembly의 스택트레이스를 원본 스택트레이스 중간에 끼워 넣음
    • 운영 환경에서는 reactor.tools.agent.ReactorDebugAgent 이용
  • IntelliJ에서는 [Enable Reactor Debug mode]에서 Hooks.onOperatorDebug() 또는 ReactorDebugAgent.init() 중 택1 하여 활성화 가능

12.1.2 checkpoint( ) Operator를 사용한 디버깅

  • checkpoint() Operator를 사용하여 특정 Operator 체인 내의 스택트레이스를 캡처
    • 각 Operator 체인에 checkpoint()를 추가한 후, 범위를 좁혀 가면서 단계적으로 에러 발생 지점을 찾는다.
  • Traceback 출력 (Example12_2)
  • .checkpoint()
  • Traceback 출력 없이 식별자를 포함한 Description 출력 (Example12_4)
  • .checkpoint("Example12_4.zipWith.checkpoint")
  • Traceback과 Description 모두 출력 (Example12_5)
    • forceStackTrace=true로 지정
    • .checkpoint("Example12_4.zipWith.checkpoint", true)

12.1.3 log( ) Operator를 사용한 디버깅

  • log() Operator를 추가하여 Reactor Signal 출력 (Example12_7)

Chapter 13. Testing

  • StepVerifier - Operator 체인의 다양한 동작 방식 테스트
    • Flux 또는 Mono를 Reactor Sequence로 정의한 후, 구독 시점에 해당 Operator 체인이 사니리오대로 동자갛는지 테스트
    • Signal 이벤트 테스트
      • create() 통해 Sequence 생성
      • expectXXXX()를 통해 예상되는 Signal 기대값 평가
      • verifyXXXX()로 검증
  • TestPublisher - Signal을 발생시키며 원하는 상황을 미세하게 재연하며 테스트 진행
  • PublisherProbe - Sequence의 실행 경로를 테스트

'Framework & Library > Spring' 카테고리의 다른 글

[Spring] Spring Profile을 활용한 환경 구축  (1) 2026.01.18
[Spring] Spring Security 겉핥기  (0) 2025.12.06
[Spring] Backpressure와 Sinks 이해하기  (0) 2025.10.20
[Spring] 리액티브 시스템? 리액티브 프로그래밍?  (0) 2025.10.17
[Spring] 리액티브 프로그래밍을 들어가며  (0) 2025.10.12
'Framework & Library/Spring' 카테고리의 다른 글
  • [Spring] Spring Profile을 활용한 환경 구축
  • [Spring] Spring Security 겉핥기
  • [Spring] Backpressure와 Sinks 이해하기
  • [Spring] 리액티브 시스템? 리액티브 프로그래밍?
ryuwon
ryuwon
여러 개발 정보 끄적이고 있습니닷..
  • ryuwon
    이름 없는 블로그
    ryuwon
  • 글쓰기 관리
  • 전체
    오늘
    어제
    • 분류 전체보기 (34)
      • Series (0)
      • Programming (1)
        • Java (1)
        • C (0)
        • Swift (0)
      • Framework & Library (8)
        • Spring (6)
        • Spring Boot (2)
      • Data & ORM (0)
        • RDBMS (0)
        • NoSQL (0)
        • ORM (0)
      • Infra & DevOps (1)
        • Cloud (0)
        • DevOps (1)
        • Infra (0)
      • Knowledge (4)
        • 자료구조 (1)
        • 알고리즘 (3)
        • 운영체제 (2)
        • 네트워크 (1)
        • 아키텍쳐 및 디자인 패턴 (0)
        • 개발지식 (3)
      • Testing (0)
      • Security & System (0)
      • Project (5)
      • Writing (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    프로젝트
    OCI
    K3S
    찐빵
    Spring Profile
    네트워크
  • 최근 댓글

  • hELLO· Designed By정상우.v4.10.5
ryuwon
[Spring] 리액티브에서의 Scheduler와 Context 그리고 Testing
상단으로

티스토리툴바