diff --git a/README.md b/README.md index 7d4ad47e..e5daaffc 100644 --- a/README.md +++ b/README.md @@ -110,4 +110,4 @@ Spring reactive web - ##### Spring Data reactive -- +- \ No newline at end of file diff --git a/common-wiki/pom.xml b/common-wiki/pom.xml index 8a170fb0..4a647501 100644 --- a/common-wiki/pom.xml +++ b/common-wiki/pom.xml @@ -23,7 +23,7 @@ reactor-core - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java index 59e47392..9947e43f 100644 --- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java +++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java @@ -55,7 +55,7 @@ public Mono getArticleNonBlocking(String name) { return Mono.just(name) .map(this::buildQueryUrl) .flatMap(this.client::get) - .flatMap(this::httpResponse2String) + .flatMapMany(this::httpResponse2String) .reduce(String::concat) .map((json) -> parseJsonContend(json, name)); diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java index 5ab4b8eb..0f529e2c 100644 --- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java +++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java @@ -7,9 +7,10 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.time.Duration; import java.util.List; +import static java.time.Duration.ofMillis; + /** * @author Michael Omann * @author Andreas Keefer @@ -34,8 +35,8 @@ private DelayProxy(Object obj, DelayFunction delayFunction) { @Override protected Publisher handlePublisherReturnType(Publisher publisher, Method m, Object[] args) { - return Mono.defer(() -> Mono.just(1).delayElement(Duration.ofMillis(delayFunction.delay(m.getName())))) - .flatMap(next -> publisher); + return Mono.defer(() -> Mono.just(1).delayElement(ofMillis(delayFunction.delay(m.getName())))) + .flatMapMany(next -> publisher); } @Override diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java index 5f322421..370abd1e 100644 --- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java +++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java @@ -36,7 +36,7 @@ protected Publisher handlePublisherReturnType(Publisher publisher, Method return Mono.defer(() -> { flakinessFunction.failOrPass(m.getName()); return Mono.just(1); - }).flatMap(next -> publisher); + }).flatMapMany(next -> publisher); } @Override diff --git a/docs/stand_zero.md b/docs/stand_zero.md index 687beb8e..497019eb 100644 --- a/docs/stand_zero.md +++ b/docs/stand_zero.md @@ -1,5 +1,5 @@ # Stand ZERO -## webiste +## website - Website wird nicht gelöscht - als extra top 5 diff --git a/docs/uebungen.md b/docs/uebungen.md index 9e9eb4ef..b6e5add7 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -1,12 +1,4 @@ -1. article fetch - - as -- read event + frontend - - frontend read geht schon -- stats consumer -- stats endpoint -- rating wordcount endpoints -- optimieren mit batch -- backpressure handeln + Sprint 1 - your first nonblocking Endpoint ======== @@ -14,7 +6,7 @@ Sprint 1 - your first nonblocking Endpoint This endpoint signature is currently like an traditional Spring MVC endpoint. Change the signature to the 'reactive-way' and implement this service. Hint: use `ArticleService#fetchArticle` to fetch the article from Wikipedia. -- Write Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) +- Ensure Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) are still running - Start the spring boot application (see `services/README.md`) and try out your new endpoint and fetch an article, e.g with curl, an browser or an RESTClient @@ -23,43 +15,22 @@ Sprint 2 - stream articles to the frontend - Have a look at `WikiController#getReadStream`. This endpoint signature is currently like an traditional Spring MVC endpoint. Change the signature to the 'reactive-way' and implement this service. - For each call to `WikiController#fetchArticle` there should be emitted an Event on - this `#getReadStream`. - Hint: you need a `org.reactivestreams.Processor`, have a look what reactor offers. -- Write Unittests (`ReadEventTest`) and integration tests (`WikiControllerIntegrationTest`) -- start ms-wikiloader and ms-statistics. ms-statistics contains a basic frontend, which consumes - the `#getReadStream` endpoint and draws a diagram. The URL is printed out in the console after - startup of the service. -- execute some calls to the `#fetchArticle` endpoint and watch the diagram. -- Start jmeter (see `services/README.md`), open an provided testplan (*.jmx) and give some load on `#fetchArticle`. - At this point it is better to start the wikiloader service in mock mode - by activating the profile `mock` (see `services/README.md`). This should be the default - from now on. - -Sprint 3 - getReadStream optimization -======== -- Maybe you noticed some problems when jmeter put heavy load on `#fetchArticle` - When you publish every single 'articleRead'-Event on it's own, spring will flush - the HTTP connection to the frontend for each event, this produces a lot of overhead. - Now try to reduce the overhead and deliver the Events in batches. - Hint: A good batch size could be 250 milliseconds -> 4 flushes per second. -- Write tests -- Test it with jmeter and the frontend - -Sprint 4 -======== - + Hint: You have to use a Processor, which acts as a subscriber and as a publisher. + In `WikiController` there is already a Processor named `readArticles`, use this one. +- Activate the Unittests in `ReadEventTest` and `WikiControllerIntegrationTest` and look if they run successfully. +- Start the sprig boot application and also the ms-statistics service which contains a frontend (see `services/README.md`) + and try out your endpoints -Sprint 5 +Sprint 3 ======== +- Have a look at `WikiController#countWords`. + This Implementation is not too bad, but is's blocking. Change it to non-blocking. +- Ensure Unittests (`WikiControllerTest`) and are still running successfully + and active integration tests (`WikiControllerIntegrationTest`) - -Sprint 6 -======== - - -Sprint 7 +Sprint 4 ======== +? Sprint X.1 - improved caching (reactive datastore) @@ -72,9 +43,9 @@ Sprint X.1 - improved caching (reactive datastore) Use Spring Data to query the Database - After some time (e.g. 1 minute) you want to expire the data. Implement a reacive timed job which drops outdated data. - + Sprint X.2 - visualizes backpressure buffers ======== - add a diagram in the frontend which visualizes backpressure buffers in the streams. add a new statistics endpoint which collects and processes this data and delivers it to the frontend - + \ No newline at end of file diff --git a/kata-reactor/pom.xml b/kata-reactor/pom.xml index 64a25ee5..497099e2 100644 --- a/kata-reactor/pom.xml +++ b/kata-reactor/pom.xml @@ -31,7 +31,7 @@ reactor-core - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java index 4e49e184..691f2fa4 100644 --- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java +++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java @@ -58,7 +58,7 @@ private Mono queryTeamReport(String teamName) { .map(sum -> "The team members:\n" + team.stream().collect(joining(", ")) + "\nproduced per member:\n" + sum + '\n') - ).single(); + ).publishOn(Schedulers.single()); } private Mono querySalesReport() { @@ -137,7 +137,7 @@ public void errorAndSubscribePositionIsShown() { public void positionOfEveryOperatorIsShown() { WaitMonitor monitor = new WaitMonitor(); - Hooks.onOperator(Hooks.OperatorHook::operatorStacktrace); + Hooks.onOperatorDebug(); Disposable subscription = reportsStream() .subscribeOn(Schedulers.elastic()) diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java index 96ff1cbc..c9a57333 100644 --- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java +++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java @@ -128,7 +128,7 @@ public void ambiguous() throws Exception { .subscribeOn(Schedulers.elastic()) .retryWhen(retryWithDelay(3)); - StepVerifier.create(Flux.firstEmitting(Arrays.asList(timeout, error, ok)) + StepVerifier.create(Flux.first(timeout, error, ok) .subscribeOn(Schedulers.elastic())) .expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42")) .verifyComplete(); diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java index 27392636..8328d009 100644 --- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java +++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java @@ -32,7 +32,7 @@ public void backupOnError() throws Exception { String wikiArticle = "42"; StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle) - .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) + .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) .subscribeOn(Schedulers.elastic())) .expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42")) .verifyComplete(); @@ -50,7 +50,7 @@ public void defaultValueBackup() throws Exception { final String wikiArticle = "42"; StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle) - .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) + .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) .onErrorReturn(getCachedArticle(wikiArticle)) .subscribeOn(Schedulers.elastic())) .expectNext("{{Dieser Artikel|behandelt 42}} ") @@ -75,7 +75,7 @@ public void exponentialRetry() throws Exception { final String wikiArticle = "42"; StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle) .retryWhen(retryWithDelay(3, RetryDelay.exponential())) - .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) + .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) .onErrorReturn(getCachedArticle(wikiArticle)) .subscribeOn(Schedulers.elastic())) .expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42")) diff --git a/pom.xml b/pom.xml index 428a3dfc..c860925c 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ io.projectreactor reactor-bom - Aluminium-SR1 + Bismuth-SR5 pom import @@ -30,12 +30,12 @@ io.reactivex.rxjava2 rxjava - 2.0.7 + 2.1.9 com.github.akarnokd rxjava2-extensions - 0.16.0 + 0.18.5 com.github.davidmoten @@ -156,7 +156,7 @@ reactor-core - io.projectreactor.addons + io.projectreactor reactor-test test @@ -209,21 +209,6 @@ de.ruedigermoeller fst - - - io.vertx - vertx-core - - - io.vertx - vertx-core - tests - test - - - io.vertx - vertx-reactive-streams - @@ -297,7 +282,7 @@ netty-example kata-rxjava kata-reactor - kata-vertx + services diff --git a/services/README.md b/services/README.md index 410ee12f..e7e80812 100644 --- a/services/README.md +++ b/services/README.md @@ -8,6 +8,8 @@ Reactive Microservices which loads wiki articles from local files - statistics (running on port 8080):
`mvn spring-boot:run` +
+Frontend is available on http://localhost:8080/index.html ### jmeter (load/performance tests) - have a look at ./jmeter/README.md diff --git a/services/ms-statistics/pom.xml b/services/ms-statistics/pom.xml index f87e9027..39e4d4a1 100644 --- a/services/ms-statistics/pom.xml +++ b/services/ms-statistics/pom.xml @@ -25,7 +25,7 @@ - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java index 05cf0084..918e43f8 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static java.time.Duration.ofMillis; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE; /** @@ -49,7 +50,7 @@ public Flux> topArticle(@RequestParam(required = false, default @RequestParam(required = false, defaultValue = "5") int numberOfTopArticles) { return articleReadEventsService.readEvents() .doOnNext(StatisticsController::updateReadStatistic) - .sampleMillis(updateInterval) + .sample(ofMillis(updateInterval)) .map(readEvent -> createTopArticleList(numberOfTopArticles)) .retry(throwable -> { logger.warn("error on topArticle, retrying", throwable); @@ -62,7 +63,7 @@ public Flux> topArticle(@RequestParam(required = false, default public Flux fetchArticleStatistics(@RequestParam(required = false, defaultValue = "1000") int updateInterval) { return articleReadEventsService.readEvents() .onBackpressureDrop(articleReadEvent -> logger.warn("dropping articleReadEvent: " + articleReadEvent)) - .bufferMillis(updateInterval / 3) + .buffer(ofMillis(updateInterval / 3)) .filter(articleReadEvents -> !articleReadEvents.isEmpty()) .flatMap(articleReadEvent -> { Flux distinctArticleNames = Flux.fromIterable(articleReadEvent) @@ -82,7 +83,7 @@ public Flux fetchArticleStatistics(@RequestParam(required = f zip.getT3().get(zip.getT1().getArticleName()), zip.getT1().getFetchTimeInMillis())); }) - .bufferMillis(updateInterval) + .buffer(ofMillis(updateInterval)) .map(StatisticsController::calculateArticleStatistics) .retry(throwable -> { logger.warn("error on fetchArticleStatistics, retrying", throwable); diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java index cb8e64fe..04114ae0 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java @@ -12,6 +12,7 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * @author Daniel Heinrich @@ -44,7 +45,8 @@ public Mono fetchWordCount(String articleName) { .retry(1) .doOnNext(WrongStatusException.okFilter()) .flatMap(r -> r.bodyToMono(String.class)) - .map(Integer::parseInt).single(); + .map(Integer::parseInt) + .publishOn(Schedulers.single()); } @Override @@ -56,7 +58,8 @@ public Mono fetchRating(String articleName) { .retry(1) .doOnNext(WrongStatusException.okFilter()) .flatMap(r -> r.bodyToMono(String.class)) - .map(Integer::parseInt).single(); + .map(Integer::parseInt) + .publishOn(Schedulers.single()); } @Override @@ -64,11 +67,12 @@ public Flux fetchWordCounts(Flux articleNames) { return wikiServiceClient.post() .uri(ub -> ub.pathSegment(ARTICLE, WORD_COUNTS).build()) .accept(APPLICATION_STREAM_JSON_UTF8) - .exchange(articleNames, ArticleName.class) + .body(articleNames, ArticleName.class) + .exchange() .doOnError(e -> LOGGER.error(e.getMessage())) .retry(1) .doOnNext(WrongStatusException.okFilter()) - .flatMap(r -> r.bodyToFlux(WordCount.class)); + .flatMapMany(r -> r.bodyToFlux(WordCount.class)); } @Override @@ -76,10 +80,11 @@ public Flux fetchRatings(Flux articleNames) { return wikiServiceClient.post() .uri(ub -> ub.pathSegment(ARTICLE, RATINGS).build()) .accept(APPLICATION_STREAM_JSON_UTF8) - .exchange(articleNames, ArticleName.class) + .body(articleNames, ArticleName.class) + .exchange() .doOnError(e -> LOGGER.error(e.getMessage())) .retry(1) .doOnNext(WrongStatusException.okFilter()) - .flatMap(r -> r.bodyToFlux(Rating.class)); + .flatMapMany(r -> r.bodyToFlux(Rating.class)); } } diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java index 2d6dbde1..f1a8a59b 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java @@ -26,9 +26,8 @@ public Flux readEvents() { return webClient.get() .uri(ub -> ub.pathSegment(ARTICLE, READ_EVENTS).build()) .accept(MediaType.TEXT_EVENT_STREAM) - .contentType(MediaType.TEXT_EVENT_STREAM) .exchange() - .flatMap(response -> response.bodyToFlux(ArticleReadEvent[].class)) + .flatMapMany(response -> response.bodyToFlux(ArticleReadEvent[].class)) .flatMap(articleReadEvents -> Flux.fromArray(articleReadEvents)) .log(); } diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java index 4247d268..3ab5608b 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java @@ -29,7 +29,7 @@ public Mono
fetchArticle(String articleName) { .uri(ARTICLE_ENDPOINT + urlEncode(articleName)) .exchange() .doOnNext(WrongStatusException.okFilter()) - .flatMap(r -> r.bodyToFlux(Article.class)) + .flatMapMany(r -> r.bodyToFlux(Article.class)) .single(); } } diff --git a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java index cd88d106..3d613e11 100644 --- a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java +++ b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java @@ -7,12 +7,13 @@ import org.mockito.Mockito; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.HeaderSpec; +import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec; import org.springframework.web.reactive.function.client.WebClient.UriSpec; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -24,14 +25,14 @@ public class ArticleReadEventsServiceTest { private ArticleReadEventsService articleReadEventsService; - private HeaderSpec headerSpec; + private RequestBodySpec requestBodySpec; private UriSpec uriSpec; private WebClient webClient; @Before public void setUp() { - headerSpec = mock(HeaderSpec.class, Mockito.RETURNS_SELF); - uriSpec = mock(UriSpec.class, (invocation) -> headerSpec); + requestBodySpec = mock(RequestBodySpec.class, Mockito.RETURNS_SELF); + uriSpec = mock(UriSpec.class, (invocation) -> requestBodySpec); webClient = mock(WebClient.class, (invocation) -> uriSpec); articleReadEventsService = new ArticleReadEventsService(webClient); @@ -39,11 +40,11 @@ public void setUp() { @Test public void fetchReadEvents() { - Flux flux = Flux.intervalMillis(30).take(3) + Flux flux = Flux.interval(ofMillis(30)).take(3) .map(count -> asList(createReadEvent(count)).toArray(new ArticleReadEvent[]{})); ClientResponse clientResponse = mock(ClientResponse.class); when(clientResponse.bodyToFlux(ArticleReadEvent[].class)).thenReturn(flux); - when(headerSpec.exchange()).thenReturn(Mono.just(clientResponse)); + when(requestBodySpec.exchange()).thenReturn(Mono.just(clientResponse)); Flux result = articleReadEventsService.readEvents(); diff --git a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java index 9e917b20..1f0a9080 100644 --- a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java +++ b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java @@ -11,6 +11,7 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import static java.time.Duration.ofMillis; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -39,7 +40,7 @@ public void setUp() throws Exception { @Test public void fetchArticleStatisticsWithDefaultUpdateInterval() { - when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(100).take(6).map(count -> createReadEvent(count, 100)) + when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(100)).take(6).map(count -> createReadEvent(count, 100)) .doOnNext(next -> System.out.println("readEvent: " + next))); when(articleMetricsServiceMock.fetchRatings(any())).thenAnswer(invocation -> { Flux names = invocation.getArgument(0); @@ -55,8 +56,7 @@ public void fetchArticleStatisticsWithDefaultUpdateInterval() { .exchange() .expectStatus().isOk() .expectHeader().contentType(TEXT_EVENT_STREAM) - .expectBody(ArticleStatistics.class) - .returnResult(); + .returnResult(ArticleStatistics.class); StepVerifier.create(result.getResponseBody() .doOnNext(articleStatistics -> System.out.println("received on clientside: " + articleStatistics))) @@ -67,7 +67,7 @@ public void fetchArticleStatisticsWithDefaultUpdateInterval() { @Test public void fetchArticleStatisticsWithShortUpdateInterval() { - when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(310).take(4) + when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(310)).take(4) .map(count -> createReadEvent(count, count.intValue()))); when(articleMetricsServiceMock.fetchRatings(any())).thenAnswer(invocation -> { Flux names = invocation.getArgument(0); @@ -94,15 +94,14 @@ public void fetchArticleStatisticsWithShortUpdateInterval() { @Test public void fetchTopArticleWithDefaultQueryParams() { - when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(245).take(5) + when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(245)).take(5) .flatMap(count -> Flux.just(createReadEvent(count, count.intValue())).repeat(count * 2))); FluxExchangeResult result = testClient.get().uri("/top/article") .exchange() .expectStatus().isOk() .expectHeader().contentType(TEXT_EVENT_STREAM) - .expectBody(TopArticle[].class) - .returnResult(); + .returnResult(TopArticle[].class); StepVerifier.create(result.getResponseBody().flatMap(Flux::fromArray)) .expectNext(createTopArticle("3", 6)) diff --git a/services/ms-wikiloader/pom.xml b/services/ms-wikiloader/pom.xml index 2f81437a..55882ec6 100644 --- a/services/ms-wikiloader/pom.xml +++ b/services/ms-wikiloader/pom.xml @@ -14,6 +14,13 @@ com.senacor.codecamp.reactive common-wiki + + + + log4j + log4j + + org.apache.commons @@ -33,7 +40,7 @@ - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java index bd9ee0d5..0e215056 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java @@ -1,13 +1,22 @@ package com.senacor.codecamp.reactive.services.wikiloader; import com.fasterxml.jackson.annotation.JsonView; -import com.github.davidmoten.guavamini.Lists; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; import com.senacor.codecamp.reactive.services.wikiloader.model.Article; +import com.senacor.codecamp.reactive.services.wikiloader.model.ArticleName; +import com.senacor.codecamp.reactive.services.wikiloader.model.Rating; +import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount; import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @author Andreas Keefer @@ -16,7 +25,10 @@ @RequestMapping("/article") public class WikiController { + public static final int BUFFER_READ_EVENTS = 250; + private final ArticleService articleService; + private final DirectProcessor
readArticles = DirectProcessor.create(); @Autowired public WikiController(ArticleService articleService) { @@ -30,9 +42,17 @@ public WikiController(ArticleService articleService) { * @return article with media wiki as content */ @GetMapping("/{name}") - public Article fetchArticle(@PathVariable final String name) { - // DUMMY - return Article.newBuilder().withName(name).build(); + public Mono
fetchArticle(@PathVariable final String name) { + // TODO Sprint 1 + Stopwatch stopwatch = Stopwatch.createUnstarted(); + return articleService.fetchArticle(name) + .doOnSubscribe(subscription -> stopwatch.start()) + .map(content -> Article.newBuilder() + .withName(name) + .withContent(content) + .withFetchTimeInMillis((int) stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)) + .build()) + .log(); } /** @@ -43,7 +63,40 @@ public Article fetchArticle(@PathVariable final String name) { @GetMapping("/readevents") @JsonView(Article.NameOnly.class) public List
getReadStream() { - // DUMMY + // TODO Sprint2: return Lists.newArrayList(Article.newBuilder().withName("42").build()); } + + @GetMapping("/{name}/wordcount") + public Mono getWordCount(@PathVariable String name) { + return articleService.countWords(name) + .log(); + } + + @RequestMapping("/wordcounts") + public Flux countWords(@RequestBody Flux names) { + // TODO Sprint3 + List counts = names.toStream() + .map(articleName -> { + System.out.println("count words for " + articleName.getName()); + Integer count = articleService.countWords(articleName.getName()).block(); + System.out.println("count = " + count); + return new WordCount(articleName.getName(), count); + }) + .collect(Collectors.toList()); + return Flux.fromIterable(counts); + } + + @GetMapping("/{name}/rating") + public Mono getRating(@PathVariable String name) { + return articleService.rate(name) + .log(); + } + + @RequestMapping("/ratings") + public Flux ratings(@RequestBody Flux names) { + return names.flatMap(articleName -> articleService.rate(articleName.getName()) + .map(rating -> new Rating(articleName.getName(), rating))) + .log(); + } } diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java index 34ce4b1c..f937022d 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java @@ -37,6 +37,14 @@ public Mono fetchArticle(String articleName) { return cache.lookup(articleName); } + /** + * @param articleName article name + * @return fetched article from wikipedia as a media wiki formatted string + */ + public String fetchArticleNonReactive(String articleName) { + return fetchArticle(articleName).block(); + } + /** * @param articleName article name * @return a rating of the wiki article from 1 to 5 'stars' diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java index d89d2fde..9b152a91 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java @@ -32,7 +32,7 @@ public PublisherCache(Function> transformer, int cacheSize) { public Mono lookup(I input) { return Mono.justOrEmpty(cache.get(input)) .doOnNext(i -> print("cache hit for key '%s'", input)) - .otherwiseIfEmpty(transformer.apply(input) + .switchIfEmpty(transformer.apply(input) .doOnNext(o -> cache.put(input, o)) ); } diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java index d41e6206..9e2f5d7f 100644 --- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java +++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java @@ -1,32 +1,112 @@ package com.senacor.codecamp.reactive.services.wikiloader; +import com.senacor.codecamp.reactive.services.CountService; +import com.senacor.codecamp.reactive.services.RatingService; +import com.senacor.codecamp.reactive.services.WikiService; +import com.senacor.codecamp.reactive.services.wikiloader.model.Article; import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; + +import static com.senacor.codecamp.reactive.util.DelayFunction.withNoDelay; +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; /** * @author Daniel Heinrich * @since 08/03/2017 */ @RunWith(MockitoJUnitRunner.class) +@Ignore public class ReadEventTest { @Mock - private ArticleService service; + private WikiService service; + private WikiController wikiController; @Before public void setup() { - // setup mocks ... + Mockito.when(service.fetchArticleNonBlocking(Mockito.anyString())).thenReturn(Mono.just("")); + wikiController = new WikiController(new ArticleService(service, + CountService.create(withNoDelay()), + RatingService.create(withNoDelay()), + 10)); + } @Test + // TODO Sprint2: activate after signature change public void shouldEmitEventsOnRead() throws InterruptedException { +// ReplayProcessor replay = subscribe(); +// +// wikiController.fetchArticle("foo").subscribe(); +// wikiController.fetchArticle("bar").subscribe(); +// +// Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50); +// +// assertEvents(replay, "foo", "bar"); + } + + @Test + public void shouldntEmitEventsPerDefault() { + assertNoEvents(subscribe()); + } + @Test + // TODO Sprint2: activate after signature change + public void shouldntEmitOldEvents() { +// wikiController.fetchArticle("ha").subscribe(); +// wikiController.fetchArticle("haha").subscribe(); +// assertNoEvents(subscribe()); } - // ... + @Test + public void fetchingWordcountShouldNotEmitEvent() throws InterruptedException { + ReplayProcessor replay = subscribe(); + wikiController.getWordCount("foo").subscribe(); + + Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50); + + assertNoEvents(replay); + } + + @Test + public void fetchingRatingShouldNotEmitEvent() throws InterruptedException { + ReplayProcessor replay = subscribe(); + wikiController.getRating("foo").subscribe(); + + Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50); + + assertNoEvents(replay); + } + + private ReplayProcessor subscribe() { + ReplayProcessor articles = ReplayProcessor.create(); + // TODO Sprint2: remove Flux.just(...) after signature change + Flux.just( + wikiController.getReadStream() + ) + .flatMap(Flux::fromIterable) + .map(Article::getName) + .subscribe(articles); + return articles; + } + + private void assertNoEvents(ReplayProcessor articles) { + assertEvents(articles); + } + + private void assertEvents(ReplayProcessor articles, String... events) { + articles.onComplete(); + assertEquals(asList(events), articles.collectList().block()); + } } diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java index d2d5a22f..e64fef03 100644 --- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java +++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java @@ -1,14 +1,28 @@ package com.senacor.codecamp.reactive.services.wikiloader; +import com.senacor.codecamp.reactive.services.wikiloader.model.Article; +import com.senacor.codecamp.reactive.services.wikiloader.model.ArticleName; +import com.senacor.codecamp.reactive.services.wikiloader.model.Rating; +import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount; +import com.senacor.codecamp.reactive.util.ReactiveUtil; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.boot.context.embedded.LocalServerPort; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.http.MediaType; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import java.util.Map; + +import static java.time.Duration.ofMillis; +import static org.assertj.core.api.Assertions.assertThat; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; /** @@ -21,6 +35,8 @@ public class WikiControllerIntegrationTest { @LocalServerPort private int port; + // alternativ zu dem manuellen setUp einfach einen fertigen WebTestClient injizieren lassen + //@Autowired private WebTestClient testClient; private WebClient client; @@ -34,12 +50,94 @@ public void setUp() throws Exception { } @Test - public void fetchArticle() throws Exception { + public void testFetchArticle() throws Exception { + testClient.get().uri("/article/{name}", WikiControllerTest.EIGENWERT_ARTICLE.getName()) + .exchange() + .expectStatus().isOk() + .expectBody(Article.class) + .isEqualTo(WikiControllerTest.EIGENWERT_ARTICLE); + } + + @Test + public void getWordCount() throws Exception { + testClient.get().uri("/article/{name}/wordcount", WikiControllerTest.EIGENWERT_ARTICLE.getName()) + .exchange() + .expectStatus().isOk() + .expectBody(String.class) + .isEqualTo("2"); + } + @Test + @Ignore + // TODO Sprint3 + public void countWords() throws Exception { + MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8"); + Map stringIntegerMap = testClient.post().uri("/article/wordcounts") + //.contentType(mediaType) + .accept(mediaType) + .body(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName()) + .delayElements(ofMillis(50)), ArticleName.class) + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(mediaType) + .returnResult(WordCount.class) + .getResponseBody() + .collectMap(WordCount::getArticleName, WordCount::getCount) + .block(ofMillis(4000)); + assertThat(stringIntegerMap) + .containsOnlyKeys(WikiControllerTest.EIGENWERT_ARTICLE.getName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.getName()) + .containsValues(2); } @Test - public void getReadStream() throws Exception { + public void getRating() throws Exception { + testClient.get().uri("/article/{name}/rating", WikiControllerTest.EIGENWERT_ARTICLE.getName()) + .exchange() + .expectStatus().isOk() + .expectBody(String.class) + .isEqualTo("5"); + } + + @Test + public void ratings() throws Exception { + MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8"); + Map stringIntegerMap = testClient.post().uri("/article/ratings") + .contentType(MediaType.APPLICATION_JSON) + .accept(mediaType) + .body(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class) + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(mediaType) + .returnResult(Rating.class) + .getResponseBody() + .collectMap(Rating::getArticleName, Rating::getRating) + .block(ofMillis(4000)); + + assertThat(stringIntegerMap) + .containsOnlyKeys(WikiControllerTest.EIGENWERT_ARTICLE.getName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.getName()) + .containsValues(5); + } + @Test(timeout = 5000) + @Ignore + // TODO Sprint2 + public void readevents() throws Exception { + StepVerifier.create( + client.get().uri("/article/readevents", WikiControllerTest.EIGENWERT_ARTICLE.getName()) + .accept(MediaType.APPLICATION_STREAM_JSON) + .exchange() + .flatMapMany(clientResponse -> clientResponse.bodyToFlux(Article.class)) + .doOnNext(next -> ReactiveUtil.print("received readevent in testclient: %s", next)) + .next() + .doOnSubscribe(subscription -> { + // call fetchArticle + Mono.delay(ofMillis(50)) + .flatMap(delay -> client.get().uri("/article/{name}", WikiControllerTest.EIGENWERT_ARTICLE.getName()) + .exchange()) + .log() + .subscribe(); + }).log() + ).expectNextMatches(article -> article.getFetchTimeInMillis() != null) + .verifyComplete(); } -} \ No newline at end of file +} diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java index 3f2635a5..9c1e4ecd 100644 --- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java +++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java @@ -1,20 +1,43 @@ package com.senacor.codecamp.reactive.services.wikiloader; +import com.senacor.codecamp.reactive.services.wikiloader.model.Rating; +import com.senacor.codecamp.reactive.services.wikiloader.model.Article; +import com.senacor.codecamp.reactive.services.wikiloader.model.ArticleName; +import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount; import com.senacor.codecamp.reactive.services.CountService; import com.senacor.codecamp.reactive.services.RatingService; import com.senacor.codecamp.reactive.services.WikiService; import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService; +import com.senacor.codecamp.reactive.util.ReactiveUtil; import org.junit.Before; import org.junit.Test; +import org.springframework.http.MediaType; import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; import static com.senacor.codecamp.reactive.util.DelayFunction.withNoDelay; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; /** * @author Andreas Keefer */ public class WikiControllerTest { + static final Article EIGENWERT_ARTICLE = Article.newBuilder() + .withName("Eigenwert") + .withContent("#REDIRECT [[Eigenwertproblem]]") + .build(); + static final Article EIGENVEKTOR_ARTICLE = Article.newBuilder() + .withName("Eigenvektor") + .withContent("#REDIRECT [[Eigenwertproblem]]") + .build(); + private WebTestClient testClient; @Before @@ -28,7 +51,78 @@ public void setUp() throws Exception { @Test public void fetchArticle() throws Exception { + Article res = testClient.get().uri("/article/{name}", EIGENWERT_ARTICLE.getName()) + .exchange() + .expectStatus().isOk() + .expectBody(Article.class) + .isEqualTo(EIGENWERT_ARTICLE) + .returnResult() + .getResponseBody(); + ReactiveUtil.print(res); + assertThat(res.getFetchTimeInMillis()).isBetween(0, 1000); + } + + @Test + public void getWordCount() throws Exception { + testClient.get().uri("/article/{name}/wordcount", EIGENWERT_ARTICLE.getName()) + .exchange() + .expectStatus().isOk() + .expectBody(String.class) + .isEqualTo("2"); + } + + @Test + public void countWords() throws Exception { + MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8"); + Flux wordCountResult = testClient.post().uri("/article/wordcounts") + .accept(mediaType) + .body(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class) + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(mediaType) + .returnResult(WordCount.class) + .getResponseBody(); + // We have to transform the list into an set to expect the word counts without an order. + Flux resutAsSet = wordCountResult.bufferTimeout(2, Duration.ofMillis(100)).map(list -> new HashSet(list)); + + Set expectedResult = new HashSet<>(asList( + new WordCount(EIGENWERT_ARTICLE.getName(), 2), + new WordCount(EIGENVEKTOR_ARTICLE.getName(), 2))); + StepVerifier.create(resutAsSet) + .expectNext(expectedResult) + .verifyComplete(); + } + + @Test + public void getRating() throws Exception { + testClient.get().uri("/article/{name}/rating", EIGENWERT_ARTICLE.getName()) + .exchange() + .expectStatus().isOk() + .expectBody(String.class) + .isEqualTo("5"); } + @Test + public void ratings() throws Exception { + MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8"); + Flux ratingsResult = testClient.post().uri("/article/ratings") + .accept(mediaType) + .body(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class) + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(mediaType) + .returnResult(Rating.class) + .getResponseBody(); + + // We have to transform the list into an set to expect the word counts without an order. + Flux resutAsSet = ratingsResult.bufferTimeout(2, Duration.ofMillis(100)).map(list -> new HashSet(list)); + + Set expectedResult = new HashSet<>(asList( + new Rating(EIGENWERT_ARTICLE.getName(), 5), + new Rating(EIGENVEKTOR_ARTICLE.getName(), 5))); + StepVerifier.create(resutAsSet) + .expectNext(expectedResult) + .verifyComplete(); + } } diff --git a/services/pom.xml b/services/pom.xml index 9a7c1b34..8b31aa07 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 2.0.0.BUILD-SNAPSHOT + 2.0.0.M7 @@ -27,6 +27,13 @@ + + io.projectreactor + reactor-bom + Bismuth-SR5 + pom + import + com.senacor.codecamp.reactive common-wiki