Spring WebFlux – programowanie reaktywne w Springu


Spring WebFlux – programowanie reaktywne w Springu

Spring WebFlux (reaktywny odpowiednik Spring MVC) to moduł wprowadzony w Springu 5 (Spring Boot 2) który umożliwia pisanie aplikacji z użyciem podejścia reaktywnego. Dlaczego potrzebujemy reaktywności w Javie? Uzyskanie w czystej Javie kodu asynchronicznego który jest czytelny i łatwy w utrzymaniu to trudna sprawa. Programowanie reaktywne rozwiązuje ten problem. Jest to temat szczególnie istotny przy przetwarzaniu dużych zbiorów danych.

Przed Javą 9 programowanie reaktywne mogliśmy uzyskać za pomocą bibliotek takich jak RxJava czy Reactor (Spring WebFlux wykorzystuje bibliotekę Reactor). Zdecydowanie największą zaletą aplikacji napisanych reaktywnie jest zdolność do ich skalowania przy zachowaniu niedużej, stałej ilości dostępnych wątków i mniejszej ilości pamięci. Mogą natomiast pojawić się opóźnienia ze względu na sterowanie wątkami.

Kontener servletów Apache Tomcat z użyciem specyfikacji Java Servlet < 3.1 każdy request obsługuje w osobnym wątku (pula wątków dla Apache Tomcat wynosi 200). Wyczerpanie ilości dostępnych wątków z puli prowadzi do blokady – kolejne żądania muszą czekać na ‘wolny’ wątek w puli. Apache Tomcat który bazuje na specyfikacji Java Servlet >= 3.1 pozwala na obsługę requestów w sposób asynchroniczny! Co to oznacza? Jeśli kasjer potrzebuje pomocy przełożonego ze względu na reklamację klienta to zamiast blokować całą kolejkę (requesty) czekając na pomoc przełożonego obsługuje kolejne osoby stojące przy kasie nie blokując całej kolejki. Jeśli pojawi się przełożony to dopiero wtedy kasjer razem z przełożonym rozpatrują reklamację klienta. Jest to tzw. serwer nieblokujący w którym zazwyczaj jest 1-2 wątki na rdzeń procesora. Zazwyczaj liczba wątków nie jest większa niż liczba rdzeni procesora. Serwer nieblokujący daje możliwość przetworzenia większej liczby żądań bez utraty wydajności i stabilności!

Dokumentowa baza MongoDB posiada specjalny sterownik dla Javy, który implementuje nieblokujące API – mongodb-driver-reactivestream. Asynchroniczny sterownik do bazy nie blokuje żadnego wątku (JDBC do takich nie należy)!

Korzystając z modułu Spring WebFlux należy poznać dwa podstawowe pojęcia:

  • Flux<T> – do zwracania strumienia z 0..n danymi, implementuję interfejs Publisher<T>,
  • Mono<T> – do zwracania strumienia z 0..1 danymi, implementuję interfejs Publisher<T> np. Mono<Void> nic nie zwraca.

Do dzieła, tworzymy przykładową aplikację w oparciu o Spring WebFlux – plik pom.xml!

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
 
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
	</dependency>
 
	<dependency>
		<groupId>de.flapdoodle.embed</groupId>
		<artifactId>de.flapdoodle.embed.mongo</artifactId>
		<scope>runtime</scope>
	</dependency>
 
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
	</dependency>
 
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>

Zależność do de.flapdoodle.embed.mongo pozwala na symulację produkcyjnej bazy danych MongoDB. Zależność do spring-boot-starter-data-mongodb-reactive pozwala na wykorzystanie reaktywnych repozytoriów MongoDB. Testy jednostkowe możliwe są natomiast dzięki zależności do reactor-test.

Tworzymy model (dokument) – klasa Training:

@Document
public class Training {
 
    private String id;
    private String name;
 
    public Training() {
    }
 
    public Training(String id, String name) {
        this.id = id;
        this.name = name;
    }
 
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Training training = (Training) o;
        return Objects.equals(name, training.name);
    }
 
    @Override
    public int hashCode() {
        return Objects.hash(name);
    }
 
    public String getId() {
        return id;
    }
 
    public void setId(String id) {
        this.id = id;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
}

Tworzymy reaktywne repozytorium – TrainingRepository :

@Repository
public interface TrainingRepository extends ReactiveMongoRepository<Training, String> {
}

Tworzymy RestControllerTrainingController :

@RestController
public class TrainingController {
 
    private final TrainingRepository trainingRepository;
 
    public TrainingController(TrainingRepository bookRepository) {
        this.trainingRepository = bookRepository;
    }
 
    @GetMapping(value = "/trainings", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Training> getBooks() {
        return trainingRepository.findAll().delayElements(Duration.ofSeconds(1));
    }
}

APPLICATION_STREAM_JSON_VALUE – aplikacja zwraca strumień elementów, każdy element będzie zserializowany do formatu JSON.

Klasa konfiguracyjna – konieczne jest wskazanie lokalizacji reaktywnych repozytoriów – ReactiveMongoRepository:

@Configuration
@EnableReactiveMongoRepositories(basePackageClasses = TrainingRepository.class)
public class MongoConfiguration {
}

Zaraz po wystartowaniu plikacji uzupełniamy bazę danych MongoDB przykładowymi danymi:

@Component
public class SampleDataInitializer implements ApplicationListener<ApplicationReadyEvent> {
 
    private TrainingRepository trainingRepository;
 
    public SampleDataInitializer(TrainingRepository repository) {
        this.trainingRepository = repository;
    }
 
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
 
        Flux<Training>  trainingFlux = trainingRepository
                .findAll()
                .thenMany(
                        Flux
                                .just("JAVA", "SPRING")
                                .map(name -> new Training(UUID.randomUUID().toString(), name))
                                .flatMap(trainingRepository::save)
                );
 
        Mono<Void> all = Mono.when(trainingFlux);
        all.block();
 
        trainingRepository.findAll().subscribe(training -> System.out.println(training.getName()));
 
    }
}

W pliku application.properties dodajemy niezbędną konfigurację:

server.port             = 7774
spring.data.mongodb.uri = mongodb://localhost:27017/test

Po wejściu na adres:

http://localhost:7774/trainings

w odstępie 1 sekundy podawane są dane (dokumenty z bazy MongoDB):

Przykładowy test jednostkowy aplikacji w którym weryfikujemy czy dane zwracane są prawidłowo:

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureWebTestClient
public class TrainingControllerTest {
 
    @Autowired
    WebTestClient webTestClient;
 
    @Autowired
    private ApplicationContext context;
 
    @Autowired
    private  TrainingRepository trainingRepository;
 
    @Before
    public void setUp()  {
        webTestClient = webTestClient
                .mutate()
                .responseTimeout(Duration.ofSeconds(50))
                .build();
    }
 
    @Test
    public void trainingsTest() {
 
        trainingRepository.findAll().subscribe(training -> System.out.println("[TRAINING TEST] " + training.getName()));
 
        Flux<Training> trainingStreamFlux = webTestClient.get().uri("/trainings")
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .exchange()
                .expectStatus().isOk()
                .returnResult(Training.class)
                .getResponseBody();
 
        StepVerifier.create(trainingStreamFlux)
                .expectNext( new Training(UUID.randomUUID().toString(), "JAVA"))
                .expectNext( new Training(UUID.randomUUID().toString(), "SPRING"))
                .thenCancel()
                .verify();
    }
}

Jak wystawić usługę sieciową w sposób funkcyjny – czyli tworzymy uchwyt na usługę!

@Component
public class TrainingHandler {
 
    private final TrainingRepository trainingRepository;
 
    public TrainingHandler(TrainingRepository trainingRepository) {
        this.trainingRepository = trainingRepository;
    }
 
    public Mono<ServerResponse> getTrainings(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(trainingRepository.findAll().delayElements(Duration.ofSeconds(5)),
                  Training.class);
    }
}

Rejestrujemy beana RouterFunction w klasie konfiguracyjnej – łączenie kodu biznesowego z serwerem WWW:

@Configuration
@EnableReactiveMongoRepositories(basePackageClasses = TrainingRepository.class)
public class MongoConfiguration {
    @Bean
    public RouterFunction<ServerResponse> booksRoute(TrainingHandler trainingHandler) {
        return RouterFunctions.route(GET("/trainings-handler"), trainingHandler::getTrainings);
    }
}

Po wejściu na adres:

http://localhost:7774/trainings-handler

otrzymujemy wynik:

Moduł WebFlux daje możliwość wywoływania zewnętrznych usług restowych w sposób reaktywny:

@Test
public void trainingsTest() {
    WebClient.create("http://localhost:7774")
            .get()
            .uri("/trainings")
            .retrieve()
            .bodyToFlux(Training.class)
            .doOnNext(training -> System.out.println("[training]: " + training.getName()))
            .blockLast();
}

użycie metody blockLast() jest wymagane ze względu na to, że główny wątek aplikacji w testach może zakończyć się przed zwróceniem wyniku.

Zobacz kod na GitHubie i zapisz się na bezpłatny newsletter!

.

2 Comments

  1. Cześć!
    Fajny tutorial. Zaciekawiła mnnie struktura testów. W funkcji ‘trainingsTest()’ (tej pierwszej) mamy:
    .expectNext( new Training(UUID.randomUUID().toString(), “JAVA”))
    .expectNext( new Training(UUID.randomUUID().toString(), “SPRING”))

    Taki ‘assert’ nigdy nie przejdzie, ponieważ zawsze porównujemy 2 różne obiekty – pole ID będzie inne w każdym przypadku.
    Wydaje mi się, że moglibyśmy ten kod zastąpić takim:
    .expectNextMatches(training -> training.getName().equals(“JAVA”))
    .expectNextMatches(training -> training.getName().equals(“SPRING”))

    Jest to celowe uproszczenie czy coś przeoczyłem?

  2. marcin warycha - javaleader.pl 7 lutego 2020 at 14:54

    Cześć Michał,

    do metody .expectNext przekazujesz obiekt klasy Training:

    .expectNext( new Training(UUID.randomUUID().toString(), “JAVA”) )

    zatem tutaj Michał nie masz asercji tylko przekazanie nowego obiektu do metody .expectNext 😉

Leave a comment

Your email address will not be published.


*