Backend & Spring (스프링)/Reactive&Webflux

Project Reactor 디버깅하기 - Debug Project Reactor

jw92 2024. 3. 14. 23:15

디버깅의 어려움

기존의 Blocking 프로그래밍에서 Exception이 발생한 경우

Exception의 Stacktrace 를 출력하여 확인하거나

Breakpoint를 걸어서 문제가 발생한 원인을 단계적으로 찾아가면 되기 때문에 상대적으로 디버깅이 쉽다.

 

하지만 Reactor는 대부분 비동기적으로  실행되며 선언형 프로그래밍 방식으로 구성되므로 디버기이 쉽지 않다.

이를 위해 Reactor에서 몇 가지 방법을 제공한다.

 

디버그 모드 (Debug Mode)

Hooks.onOperatorDebug()

Debug Mode 활성화하는 코드

 

    static {
        fruits.put("banana", "바나나");
        fruits.put("apple", "사과");
        fruits.put("pear", "배");
        fruits.put("grape", "포도");
    }

    public static void main(String[] args) throws InterruptedException {
        Hooks.onOperatorDebug();

        Flux
                .fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
                .subscribeOn(Schedulers.boundedElastic())
                .publishOn(Schedulers.parallel())
                .map(String::toLowerCase)
                .map(fruit -> fruit.substring(0, fruit.length() - 1))
                .map(fruits::get)
                .map(translated -> "맛있는 " + translated)
                .subscribe(
                        log::info,
                        error -> log.error("# onError:", error));

        Thread.sleep(100L);
    }

위와 같은 코드를 실행한다고 해보자.

15:08:17.052 [main] DEBUG- Using Slf4j logging framework
15:08:17.158 [parallel-1] INFO - 맛있는 바나나
15:08:17.160 [parallel-1] INFO - 맛있는 사과
15:08:17.160 [parallel-1] INFO - 맛있는 배
15:08:17.170 [parallel-1] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$82/0x000000080018a780] returned a null value.
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Process finished with exit code 0

Debug Mode 활성화 전의 에러 메시지

이 에러코드 만으로는 어떤 Operator에서 에러가 발생했는지 확인하기 쉽지 않다.

 

 

Debug Mode가 활성화

15:12:21.502 [main] DEBUG- Enabling stacktrace debugging via onOperatorDebug
15:12:21.704 [parallel-1] INFO - 맛있는 바나나
15:12:21.705 [parallel-1] INFO - 맛있는 사과
15:12:21.706 [parallel-1] INFO - 맛있는 배
15:12:21.725 [parallel-1] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$74/0x0000000800182af0] returned a null value.
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
	reactor.core.publisher.Flux.map(Flux.java:6271)
	chapter12.Example12_1.main(Example12_1.java:35)
Error has been observed at the following site(s):
	*__Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:35)
	|_ Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:36)
Original Stack Trace:
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
		at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
		at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		at java.base/java.lang.Thread.run(Thread.java:833)

Debug Mode 활성화 후의 에러 메시지

Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
	reactor.core.publisher.Flux.map(Flux.java:6271)
	chapter12.Example12_1.main(Example12_1.java:35)
Error has been observed at the following site(s):
	*__Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:35)
	|_ Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:36)

자세히 살펴보면 에러 메시지와 스택트레이스 사이에 위 내용이 추가되었다.

1. 에러가 발생한 지점 Example12_1.java:35

2. 에러가 발생한 지점부터 에러 전파 상태 Example12_1.java:35에서 발생하여 Example12_1.java:36으로 전파

 

이 디버그 모드는 모든 Operator의 스택트레이스를 캡처하므로 처음부터 디버그 모드를 활성화하는 것은 권장하지 않음

 

checkpoint() Operator 사용

디버그 모드가 어플리케이션 내에 있는 모든 Operator의 스택트레이스를 캡처하는 반면

checkpoint() Operator는 특정 Operator 체인 내의 스택트레이스만 캡처

 

Traceback 출력

Assembly 란?

Operator들을 연결하는 과정에서 새로운 Mono 또는 Flux가 선언되는 지점

Flux<String> flux = Flux.just("Hello", "World")
                        .map(s -> s.toUpperCase())
                        .filter(s -> s.startsWith("H"));

예를 들면, 위에서 just, map, filter 모두 Assembly

 

Traceback 이란?

에러가 발생한 Operator의 Assembly에 대한 스택트레이스와 에러 메시지 등 디버깅 정보

 

checkpoint()를 사용하면 assembly 지점 또는 에러가 전파된 assembly 지점의 traceback이 추가됨.

    public static void main(String[] args) {
        Flux
            .just(2, 4, 6, 8)
            .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
            .map(num -> num + 2)
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error)
            );
    }

위 코드를 실행한다면,

실제 에러가 발생한 곳은 zipWith 이며 이것이 전파된 곳은 map이다.

 

15:48:27.738 [main] INFO - # onNext: 4
15:48:27.740 [main] INFO - # onNext: 4
15:48:27.740 [main] INFO - # onNext: 4
15:48:27.747 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
	at chapter12.Example12_2.lambda$main$0(Example12_2.java:15)
	at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)

기본 에러메시지

 

Assembly trace from producer [reactor.core.publisher.FluxMap] :
	reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
	chapter12.Example12_2.main(Example12_2.java:17)
Error has been observed at the following site(s):
	*__checkpoint() ⇢ at chapter12.Example12_2.main(Example12_2.java:17)

map 밑에 .checkpoint()를 추가한다면,

17번 라인인 checkpoint()까지는 에러가 전파되었음을 확인할 수 있다.

 

Error has been observed at the following site(s):
	*__checkpoint() ⇢ at chapter12.Example12_2.main(Example12_2.java:16)
	|_ checkpoint() ⇢ at chapter12.Example12_2.main(Example12_2.java:18)

zipWith 밑에 checkpoint()를 추가한다면,

zipWith와 map이 모두 에러와 연관됨을 확인할 수 있다.

Operator 순서에 따라 zipWith에서 에러가 발생해 map에 전파되었고,

따라서 zipWith()에서 직접적으로 에러가 발생했음을 예상할 수 있다.

 

Description

public class Example12_2 {
    public static void main(String[] args) {
        Flux
            .just(2, 4, 6, 8)
            .zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
            .checkpoint("zipWith")
            .map(num -> num + 2)
            .checkpoint("map")
            .subscribe(
                    data -> log.info("# onNext: {}", data),
                    error -> log.error("# onError:", error)
            );
    }
}

Traceback 대신 Description 출력을 원한다면 위와 같이 description parameter를 추가할 수 있다.

Error has been observed at the following site(s):
	*__checkpoint ⇢ zipWith
	|_ checkpoint ⇢ map

Description이 출력된다.

 

.checkpoint("zipWith", true)

Description과 Traceback을 모두 출력하고 싶다면, 위와 같이 forceStackTrace를 true로 변경해준다.

 

log() Operator

public class Example12_7 {
    public static Map<String, String> fruits = new HashMap<>();

    static {
        fruits.put("banana", "바나나");
        fruits.put("apple", "사과");
        fruits.put("pear", "배");
        fruits.put("grape", "포도");
    }

    public static void main(String[] args) {
        Flux.fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
                .map(String::toLowerCase)
                .map(fruit -> fruit.substring(0, fruit.length() - 1))
                .log()
                .map(fruits::get)
                .subscribe(
                        log::info,
                        error -> log.error("# onError:", error));
    }
}

map(fruit -> ...) 다음 라인에 log()를 추가한 경우,

15:56:10.012 [main] INFO - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
15:56:10.018 [main] INFO - | request(unbounded)
15:56:10.019 [main] INFO - | onNext(banana)
15:56:10.020 [main] INFO - 바나나
15:56:10.020 [main] INFO - | onNext(apple)
15:56:10.020 [main] INFO - 사과
15:56:10.020 [main] INFO - | onNext(pear)
15:56:10.021 [main] INFO - 배
15:56:10.021 [main] INFO - | onNext(melon)
15:56:10.029 [main] INFO - | cancel()
15:56:10.032 [main] ERROR- # onError:

바나나, 사과, 배 까지 실패한 이후에 onNext(melon)에서 에러가 발생했음을 알 수 있다.

.log() 바로 앞의 Operator만 로깅해주는 것으로 보인다.

checkpoint()는 스트림의 상태를 기록하여 Traceback을 찾기 쉽다면,

log()는 세부적인 로깅이나 특정 이벤트(Input)의 추적에는 적합한 것으로 보인다.

 

.log("category", Level.FINE)

등을 통해 log level을 DEBUG 등으로 변경해줄 수 있다.

'Backend & Spring (스프링) > Reactive&Webflux' 카테고리의 다른 글

Project Reactor 테스트  (0) 2024.03.14