Backend & Spring (스프링)/Reactive&Webflux

Project Reactor 테스트

jw92 2024. 3. 14. 23:23
TestPublisher<Integer> source =
        TestPublisher.createNoncompliant(TestPublisher.Violation.ALLOW_NULL);

StepVerifier

Operator 체인이 시나리오대로 동작하는지 테스트

StepVerifier
        .create(Mono.just("Hello Reactor")) // 테스트 대상 Sequence 생성
        .expectNext("Hello Reactor")    // emit 된 데이터 검증
        .as("expect Hello Reactor")
        .expectComplete()   // onComplete Signal 검증
        .verify();          // 검증 실행.

 

expectSubscription()
expectComplete()
expectError()
expectNext(T t) - onNext로 전달되는 값
expectNextCount(long count)
expectNextSequence(Iterable<? extends T> iterable)
expectNoEvent(Duration duration)
expectAccessibleContext() - 구독 시점 이후에 Context가 전파

verify()
verifyComplete() - onComplete
verifyError() - onError
verifyTimeout(Duration duration) - 검증 실행 후 duration이 지나도 Publisher가 종료되지 않음을 기대

as("description: expect abcd") - 기댓값 평가 단계에 대한 설명 추가

 

StepVerifierOptions

StepVerifier
        .create(GeneralTestExample.takeNumber(source, 500),
                StepVerifierOptions.create().scenarioName("Verify from 0 to 499"))

테스트 실패할 경우에 출력되는 시나리오 이름

 

Time-Based Test

public void getCOVID19CountTest() {
    StepVerifier
            .withVirtualTime(() -> TimeBasedTestExample.getCOVID19Count(
                            Flux.interval(Duration.ofHours(1)).take(1)
                    )
            )
            .expectSubscription()
            .then(() -> VirtualTimeScheduler
                                .get()
                                .advanceTimeBy(Duration.ofHours(1)))

미래에 실행되는 Reactor Sequence를 시간을 앞당겨 테스트

 

.expectNoEvent(Duration.ofMinutes(1))

아무런 이벤트가 일어나지 않음과 동시에 시간을 앞당김

 

BackPressure 테스트

public static Flux<Integer> generateNumber() {
    return Flux
            .create(emitter -> {
                for (int i = 1; i <= 100; i++) {
                    emitter.next(i);
                }
                emitter.complete();
            }, FluxSink.OverflowStrategy.ERROR);
}

 

@Test
public void generateNumberTest() {
    StepVerifier
            .create(generateNumber(), 1L)
            .thenConsumeWhile(num -> num >= 1)
            .verifyComplete();
}

 

테스트 결과는? failed

100개를 요청하는데 1개만 보냈기 때문에 Overflow

 

StepVerifier
        .create(BackpressureTestExample.generateNumber(), 1L)
        .thenConsumeWhile(num -> num >= 1)
        .expectError()
        .verifyThenAssertThat()
        .hasDroppedElements();

위와 같이 expectError()와 hasDroppedElements()로 내부적으로 drop된 데이터가 있음을 Assertions


Context 테스트

Context 테스트 역시 StepVerifer을 이용하여 테스트 가능

public static Mono<String> getSecretMessage(Mono<String> keySource) {
    return keySource
            .zipWith(Mono.deferContextual(ctx ->
                                           Mono.just((String)ctx.get("secretKey"))))
            .filter(tp ->
                        tp.getT1().equals(
                               new String(Base64Utils.decodeFromString(tp.getT2())))
            )
            .transformDeferredContextual(
                    (mono, ctx) -> mono.map(notUse -> ctx.get("secretMessage"))
            );
}

 

@Test
public void getSecretMessageTest() {
    Mono<String> source = Mono.just("hello");

    StepVerifier
            .create(
                ContextTestExample
                    .getSecretMessage(source)
                    .contextWrite(context ->
                                    context.put("secretMessage", "Hello, Reactor"))
                    .contextWrite(context -> context.put("secretKey", "aGVsbG8="))
            )
            .expectSubscription()
            .expectAccessibleContext()
            .hasKey("secretKey")
            .hasKey("secretMessage")
            .then()
            .expectNext("Hello, Reactor")
            .expectComplete()
            .verify();
}

 

Record 테스트

단순히 "값"만 검사하는 것이 아닌 좀 더 세밀한 조건으로 검사하기 위해

emit된 데이터를 기록하고 이를 Assertion

@Test
public void getCountryTest() {
    StepVerifier
            .create(RecordTestExample.getCapitalizedCountry(
                    Flux.just("korea", "england", "canada", "india")))
            .expectSubscription()
            .recordWith(ArrayList::new)
            .thenConsumeWhile(country -> !country.isEmpty())
            .consumeRecordedWith(countries -> {
                assertThat(
                        countries
                                .stream()
                                .allMatch(country ->
                                        Character.isUpperCase(country.charAt(0))),
                        is(true)
                );
            })
            .expectComplete()
            .verify();
}

recordWith(ArrayList::new) 로 기록
thenConsumeWhile()로 Predicate과 일치하는 데이터만 다음 단계에서 소비

consumeRecordedWith()로 기록된 데이터 소비

 

TestPublisher

Well-behaved TestPublihser

emit하는 데이터가 Null인지 요청 개수보다 더 많은 Data를 emit하는지 등 사양 위반 여부를 사전에 체크

@Test
public void divideByTwoTest() {
    TestPublisher<Integer> source = TestPublisher.create();

    StepVerifier
            .create(GeneralTestExample.divideByTwo(source.flux()))
            .expectSubscription()
            .then(() -> source.emit(2, 4, 6, 8, 10))
            .expectNext(1, 2, 3, 4)
            .expectError()
            .verify();
}

1. TestPublisher 생성

2. create()를 이용하여 flux로 변환

3. 필요한 Data를 Emit

Misbehaving TestPublihser

사양 위반 여부를 체크하지 않고 데이터를 emit

 

PublisherProbe

Sequence의 실행 경로를 테스트

public static Mono<String> processTask(Mono<String> main, Mono<String> standby) {
    return main
            .flatMap(massage -> Mono.just(massage))
            .switchIfEmpty(standby);
}

주전력이 끊겼을 때, 예비전력을 사용하는 로직

@Test
public void publisherProbeTest() {
    PublisherProbe<String> probe =
            PublisherProbe.of(Mono.just("# supply Standby Power"));

    StepVerifier
            .create(PublisherProbeTestExample
                    .processTask(
                            Mono.empty(),
                            probe.mono())
            )
            .expectNextCount(1)
            .verifyComplete();

    probe.assertWasSubscribed();
    probe.assertWasRequested();
    probe.assertWasNotCancelled();
}

 

  1. wasSubscribed(): 리액티브 시퀀스가 구독되었는지 확인합니다. 이 메서드는 리액티브 시퀀스가 어떤 구독도 받지 않았을 때 테스트를 실패시킵니다.
  2. wasRequested(): 구독된 리액티브 시퀀스가 요청을 받았는지 확인합니다. 이 메서드는 시퀀스가 요청되지 않았을 때 테스트를 실패시킵니다.
  3. assertWasSubscribed(): wasSubscribed()와 동일한 역할을 합니다. 리액티브 시퀀스가 구독되지 않은 경우 테스트를 실패시킵니다.
  4. assertWasNotSubscribed(): 리액티브 시퀀스가 구독되지 않은지 확인합니다. 시퀀스가 구독되었을 때 테스트를 실패시킵니다.
  5. assertWasRequested(): wasRequested()와 동일한 역할을 합니다. 구독된 리액티브 시퀀스가 요청을 받지 않았을 때 테스트를 실패시킵니다.
  6. assertWasNotRequested(): 구독된 리액티브 시퀀스가 요청을 받지 않았는지 확인합니다. 시퀀스가 요청을 받았을 때 테스트를 실패시킵니다.