From 9583ef0fdc88358017b5c43ff86686ab7f1c3231 Mon Sep 17 00:00:00 2001 From: Andri Bremm Date: Mon, 5 Jun 2017 00:08:26 +0200 Subject: [PATCH 01/14] Changed to first milestone version of Spring Boot 2.0.0. This includes also a change to: - Spring 5.0.0-RC1 - Reactor 3.1.0.M2 (Bismuth-M2) Adapted code to the API changes in Spring and Reactor. --- common-wiki/pom.xml | 2 +- .../integration/WikipediaServiceJapiImpl.java | 2 +- .../codecamp/reactive/util/DelayProxy.java | 6 +- .../codecamp/reactive/util/FlakyProxy.java | 2 +- kata-reactor/pom.xml | 2 +- .../example/error/DebugWithReactorTest.java | 2 +- .../reactor/solution/Kata7bResilience.java | 6 +- pom.xml | 46 ++++++++----- services/ms-statistics/pom.xml | 2 +- .../statistics/StatisticsController.java | 7 +- .../external/ArticleMetricsServiceImpl.java | 17 +++-- .../external/ArticleReadEventsService.java | 3 +- .../external/WikiLoaderServiceImpl.java | 2 +- .../ArticleReadEventsServiceTest.java | 13 ++-- .../statistics/StatisticsControllerTest.java | 13 ++-- services/ms-wikiloader/pom.xml | 2 +- .../services/wikiloader/WikiController.java | 4 +- .../wikiloader/service/PublisherCache.java | 2 +- .../WikiControllerIntegrationTest.java | 36 +++++----- .../wikiloader/WikiControllerTest.java | 67 +++++++++++-------- services/pom.xml | 9 ++- 21 files changed, 140 insertions(+), 105 deletions(-) diff --git a/common-wiki/pom.xml b/common-wiki/pom.xml index 8a170fb0..4a647501 100644 --- a/common-wiki/pom.xml +++ b/common-wiki/pom.xml @@ -23,7 +23,7 @@ reactor-core - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java index 59e47392..9947e43f 100644 --- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java +++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java @@ -55,7 +55,7 @@ public Mono getArticleNonBlocking(String name) { return Mono.just(name) .map(this::buildQueryUrl) .flatMap(this.client::get) - .flatMap(this::httpResponse2String) + .flatMapMany(this::httpResponse2String) .reduce(String::concat) .map((json) -> parseJsonContend(json, name)); diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java index 482a9e73..0f529e2c 100644 --- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java +++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java @@ -9,6 +9,8 @@ import java.lang.reflect.Method; import java.util.List; +import static java.time.Duration.ofMillis; + /** * @author Michael Omann * @author Andreas Keefer @@ -33,8 +35,8 @@ private DelayProxy(Object obj, DelayFunction delayFunction) { @Override protected Publisher handlePublisherReturnType(Publisher publisher, Method m, Object[] args) { - return Mono.defer(() -> Mono.just(1).delayElementMillis(delayFunction.delay(m.getName()))) - .flatMap(next -> publisher); + return Mono.defer(() -> Mono.just(1).delayElement(ofMillis(delayFunction.delay(m.getName())))) + .flatMapMany(next -> publisher); } @Override diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java index 5f322421..370abd1e 100644 --- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java +++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java @@ -36,7 +36,7 @@ protected Publisher handlePublisherReturnType(Publisher publisher, Method return Mono.defer(() -> { flakinessFunction.failOrPass(m.getName()); return Mono.just(1); - }).flatMap(next -> publisher); + }).flatMapMany(next -> publisher); } @Override diff --git a/kata-reactor/pom.xml b/kata-reactor/pom.xml index 64a25ee5..497099e2 100644 --- a/kata-reactor/pom.xml +++ b/kata-reactor/pom.xml @@ -31,7 +31,7 @@ reactor-core - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java index 4e49e184..a133cb5c 100644 --- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java +++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java @@ -58,7 +58,7 @@ private Mono queryTeamReport(String teamName) { .map(sum -> "The team members:\n" + team.stream().collect(joining(", ")) + "\nproduced per member:\n" + sum + '\n') - ).single(); + ).publishOn(Schedulers.single()); } private Mono querySalesReport() { diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java index 27392636..8328d009 100644 --- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java +++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java @@ -32,7 +32,7 @@ public void backupOnError() throws Exception { String wikiArticle = "42"; StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle) - .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) + .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) .subscribeOn(Schedulers.elastic())) .expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42")) .verifyComplete(); @@ -50,7 +50,7 @@ public void defaultValueBackup() throws Exception { final String wikiArticle = "42"; StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle) - .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) + .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) .onErrorReturn(getCachedArticle(wikiArticle)) .subscribeOn(Schedulers.elastic())) .expectNext("{{Dieser Artikel|behandelt 42}} ") @@ -75,7 +75,7 @@ public void exponentialRetry() throws Exception { final String wikiArticle = "42"; StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle) .retryWhen(retryWithDelay(3, RetryDelay.exponential())) - .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) + .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle)) .onErrorReturn(getCachedArticle(wikiArticle)) .subscribeOn(Schedulers.elastic())) .expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42")) diff --git a/pom.xml b/pom.xml index 428a3dfc..54fd0094 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ io.projectreactor reactor-bom - Aluminium-SR1 + Bismuth-M2 pom import @@ -156,7 +156,7 @@ reactor-core - io.projectreactor.addons + io.projectreactor reactor-test test @@ -209,21 +209,6 @@ de.ruedigermoeller fst - - - io.vertx - vertx-core - - - io.vertx - vertx-core - tests - test - - - io.vertx - vertx-reactive-streams - @@ -297,7 +282,32 @@ netty-example kata-rxjava kata-reactor - kata-vertx + services + + + + spring-snapshots + http://repo.spring.io/snapshot + + true + + + + + spring-milestones + http://repo.spring.io/milestone + + + + + spring-snapshots + http://repo.spring.io/snapshot + + + spring-milestones + http://repo.spring.io/milestone + + diff --git a/services/ms-statistics/pom.xml b/services/ms-statistics/pom.xml index f87e9027..39e4d4a1 100644 --- a/services/ms-statistics/pom.xml +++ b/services/ms-statistics/pom.xml @@ -25,7 +25,7 @@ - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java index 05cf0084..918e43f8 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static java.time.Duration.ofMillis; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE; /** @@ -49,7 +50,7 @@ public Flux> topArticle(@RequestParam(required = false, default @RequestParam(required = false, defaultValue = "5") int numberOfTopArticles) { return articleReadEventsService.readEvents() .doOnNext(StatisticsController::updateReadStatistic) - .sampleMillis(updateInterval) + .sample(ofMillis(updateInterval)) .map(readEvent -> createTopArticleList(numberOfTopArticles)) .retry(throwable -> { logger.warn("error on topArticle, retrying", throwable); @@ -62,7 +63,7 @@ public Flux> topArticle(@RequestParam(required = false, default public Flux fetchArticleStatistics(@RequestParam(required = false, defaultValue = "1000") int updateInterval) { return articleReadEventsService.readEvents() .onBackpressureDrop(articleReadEvent -> logger.warn("dropping articleReadEvent: " + articleReadEvent)) - .bufferMillis(updateInterval / 3) + .buffer(ofMillis(updateInterval / 3)) .filter(articleReadEvents -> !articleReadEvents.isEmpty()) .flatMap(articleReadEvent -> { Flux distinctArticleNames = Flux.fromIterable(articleReadEvent) @@ -82,7 +83,7 @@ public Flux fetchArticleStatistics(@RequestParam(required = f zip.getT3().get(zip.getT1().getArticleName()), zip.getT1().getFetchTimeInMillis())); }) - .bufferMillis(updateInterval) + .buffer(ofMillis(updateInterval)) .map(StatisticsController::calculateArticleStatistics) .retry(throwable -> { logger.warn("error on fetchArticleStatistics, retrying", throwable); diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java index cb8e64fe..04114ae0 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java @@ -12,6 +12,7 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * @author Daniel Heinrich @@ -44,7 +45,8 @@ public Mono fetchWordCount(String articleName) { .retry(1) .doOnNext(WrongStatusException.okFilter()) .flatMap(r -> r.bodyToMono(String.class)) - .map(Integer::parseInt).single(); + .map(Integer::parseInt) + .publishOn(Schedulers.single()); } @Override @@ -56,7 +58,8 @@ public Mono fetchRating(String articleName) { .retry(1) .doOnNext(WrongStatusException.okFilter()) .flatMap(r -> r.bodyToMono(String.class)) - .map(Integer::parseInt).single(); + .map(Integer::parseInt) + .publishOn(Schedulers.single()); } @Override @@ -64,11 +67,12 @@ public Flux fetchWordCounts(Flux articleNames) { return wikiServiceClient.post() .uri(ub -> ub.pathSegment(ARTICLE, WORD_COUNTS).build()) .accept(APPLICATION_STREAM_JSON_UTF8) - .exchange(articleNames, ArticleName.class) + .body(articleNames, ArticleName.class) + .exchange() .doOnError(e -> LOGGER.error(e.getMessage())) .retry(1) .doOnNext(WrongStatusException.okFilter()) - .flatMap(r -> r.bodyToFlux(WordCount.class)); + .flatMapMany(r -> r.bodyToFlux(WordCount.class)); } @Override @@ -76,10 +80,11 @@ public Flux fetchRatings(Flux articleNames) { return wikiServiceClient.post() .uri(ub -> ub.pathSegment(ARTICLE, RATINGS).build()) .accept(APPLICATION_STREAM_JSON_UTF8) - .exchange(articleNames, ArticleName.class) + .body(articleNames, ArticleName.class) + .exchange() .doOnError(e -> LOGGER.error(e.getMessage())) .retry(1) .doOnNext(WrongStatusException.okFilter()) - .flatMap(r -> r.bodyToFlux(Rating.class)); + .flatMapMany(r -> r.bodyToFlux(Rating.class)); } } diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java index 2d6dbde1..f1a8a59b 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java @@ -26,9 +26,8 @@ public Flux readEvents() { return webClient.get() .uri(ub -> ub.pathSegment(ARTICLE, READ_EVENTS).build()) .accept(MediaType.TEXT_EVENT_STREAM) - .contentType(MediaType.TEXT_EVENT_STREAM) .exchange() - .flatMap(response -> response.bodyToFlux(ArticleReadEvent[].class)) + .flatMapMany(response -> response.bodyToFlux(ArticleReadEvent[].class)) .flatMap(articleReadEvents -> Flux.fromArray(articleReadEvents)) .log(); } diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java index 4247d268..3ab5608b 100644 --- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java +++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java @@ -29,7 +29,7 @@ public Mono
fetchArticle(String articleName) { .uri(ARTICLE_ENDPOINT + urlEncode(articleName)) .exchange() .doOnNext(WrongStatusException.okFilter()) - .flatMap(r -> r.bodyToFlux(Article.class)) + .flatMapMany(r -> r.bodyToFlux(Article.class)) .single(); } } diff --git a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java index cd88d106..3d613e11 100644 --- a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java +++ b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java @@ -7,12 +7,13 @@ import org.mockito.Mockito; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.HeaderSpec; +import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec; import org.springframework.web.reactive.function.client.WebClient.UriSpec; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -24,14 +25,14 @@ public class ArticleReadEventsServiceTest { private ArticleReadEventsService articleReadEventsService; - private HeaderSpec headerSpec; + private RequestBodySpec requestBodySpec; private UriSpec uriSpec; private WebClient webClient; @Before public void setUp() { - headerSpec = mock(HeaderSpec.class, Mockito.RETURNS_SELF); - uriSpec = mock(UriSpec.class, (invocation) -> headerSpec); + requestBodySpec = mock(RequestBodySpec.class, Mockito.RETURNS_SELF); + uriSpec = mock(UriSpec.class, (invocation) -> requestBodySpec); webClient = mock(WebClient.class, (invocation) -> uriSpec); articleReadEventsService = new ArticleReadEventsService(webClient); @@ -39,11 +40,11 @@ public void setUp() { @Test public void fetchReadEvents() { - Flux flux = Flux.intervalMillis(30).take(3) + Flux flux = Flux.interval(ofMillis(30)).take(3) .map(count -> asList(createReadEvent(count)).toArray(new ArticleReadEvent[]{})); ClientResponse clientResponse = mock(ClientResponse.class); when(clientResponse.bodyToFlux(ArticleReadEvent[].class)).thenReturn(flux); - when(headerSpec.exchange()).thenReturn(Mono.just(clientResponse)); + when(requestBodySpec.exchange()).thenReturn(Mono.just(clientResponse)); Flux result = articleReadEventsService.readEvents(); diff --git a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java index 9e917b20..1f0a9080 100644 --- a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java +++ b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java @@ -11,6 +11,7 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import static java.time.Duration.ofMillis; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -39,7 +40,7 @@ public void setUp() throws Exception { @Test public void fetchArticleStatisticsWithDefaultUpdateInterval() { - when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(100).take(6).map(count -> createReadEvent(count, 100)) + when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(100)).take(6).map(count -> createReadEvent(count, 100)) .doOnNext(next -> System.out.println("readEvent: " + next))); when(articleMetricsServiceMock.fetchRatings(any())).thenAnswer(invocation -> { Flux names = invocation.getArgument(0); @@ -55,8 +56,7 @@ public void fetchArticleStatisticsWithDefaultUpdateInterval() { .exchange() .expectStatus().isOk() .expectHeader().contentType(TEXT_EVENT_STREAM) - .expectBody(ArticleStatistics.class) - .returnResult(); + .returnResult(ArticleStatistics.class); StepVerifier.create(result.getResponseBody() .doOnNext(articleStatistics -> System.out.println("received on clientside: " + articleStatistics))) @@ -67,7 +67,7 @@ public void fetchArticleStatisticsWithDefaultUpdateInterval() { @Test public void fetchArticleStatisticsWithShortUpdateInterval() { - when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(310).take(4) + when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(310)).take(4) .map(count -> createReadEvent(count, count.intValue()))); when(articleMetricsServiceMock.fetchRatings(any())).thenAnswer(invocation -> { Flux names = invocation.getArgument(0); @@ -94,15 +94,14 @@ public void fetchArticleStatisticsWithShortUpdateInterval() { @Test public void fetchTopArticleWithDefaultQueryParams() { - when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(245).take(5) + when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(245)).take(5) .flatMap(count -> Flux.just(createReadEvent(count, count.intValue())).repeat(count * 2))); FluxExchangeResult result = testClient.get().uri("/top/article") .exchange() .expectStatus().isOk() .expectHeader().contentType(TEXT_EVENT_STREAM) - .expectBody(TopArticle[].class) - .returnResult(); + .returnResult(TopArticle[].class); StepVerifier.create(result.getResponseBody().flatMap(Flux::fromArray)) .expectNext(createTopArticle("3", 6)) diff --git a/services/ms-wikiloader/pom.xml b/services/ms-wikiloader/pom.xml index 2f81437a..c682375a 100644 --- a/services/ms-wikiloader/pom.xml +++ b/services/ms-wikiloader/pom.xml @@ -33,7 +33,7 @@ - io.projectreactor.addons + io.projectreactor reactor-test test diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java index d8e7a26a..c2774dc0 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static java.time.Duration.ofMillis; + /** * @author Andreas Keefer */ @@ -56,7 +58,7 @@ public Mono
fetchArticle(@PathVariable final String name) { @JsonView(Article.NameOnly.class) public Flux> getReadStream() { return readArticles - .bufferMillis(BUFFER_READ_EVENTS) + .buffer(ofMillis(BUFFER_READ_EVENTS)) .filter(list -> !list.isEmpty()) .log(); } diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java index b60702c8..e127a776 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java @@ -33,7 +33,7 @@ public PublisherCache(Function> transformer, int cacheSize) { public Mono lookup(I input) { return Mono.justOrEmpty(cache.get(input)) .doOnNext(i -> print("cache hit for key '%s'", input)) - .otherwiseIfEmpty(transformer.apply(input) + .switchIfEmpty(transformer.apply(input) .doOnNext(o -> cache.put(input, o)) ); } diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java index 003b3805..8ac6b807 100644 --- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java +++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java @@ -8,8 +8,8 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.boot.context.embedded.LocalServerPort; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; import org.springframework.http.MediaType; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.reactive.server.WebTestClient; @@ -18,9 +18,9 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.time.Duration; import java.util.Map; +import static java.time.Duration.ofMillis; import static org.assertj.core.api.Assertions.assertThat; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; @@ -54,7 +54,6 @@ public void testFetchArticle() throws Exception { .exchange() .expectStatus().isOk() .expectBody(Article.class) - .value() .isEqualTo(WikiControllerTest.EIGENWERT_ARTICLE); } @@ -64,7 +63,7 @@ public void getWordCount() throws Exception { .exchange() .expectStatus().isOk() .expectBody(String.class) - .value().isEqualTo("2"); + .isEqualTo("2"); } @Test @@ -73,16 +72,15 @@ public void countWords() throws Exception { Map stringIntegerMap = testClient.post().uri("/article/wordcounts") //.contentType(mediaType) .accept(mediaType) - .exchange(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName()) - .delayElements(Duration.ofMillis(50)), ArticleName.class) + .body(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName()) + .delayElements(ofMillis(50)), ArticleName.class) + .exchange() .expectStatus().isOk() .expectHeader().contentType(mediaType) - .expectBody(WordCount.class) - .returnResult() + .returnResult(WordCount.class) .getResponseBody() - .cast(WordCount.class) .collectMap(WordCount::getArticleName, WordCount::getCount) - .blockMillis(4000); + .block(ofMillis(4000)); assertThat(stringIntegerMap) .containsOnlyKeys(WikiControllerTest.EIGENWERT_ARTICLE.getName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.getName()) .containsValues(2); @@ -94,7 +92,7 @@ public void getRating() throws Exception { .exchange() .expectStatus().isOk() .expectBody(String.class) - .value().isEqualTo("5"); + .isEqualTo("5"); } @Test @@ -103,15 +101,14 @@ public void ratings() throws Exception { Map stringIntegerMap = testClient.post().uri("/article/ratings") .contentType(MediaType.APPLICATION_JSON) .accept(mediaType) - .exchange(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class) + .body(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class) + .exchange() .expectStatus().isOk() .expectHeader().contentType(mediaType) - .expectBody(Rating.class) - .returnResult() + .returnResult(Rating.class) .getResponseBody() - .cast(Rating.class) .collectMap(Rating::getArticleName, Rating::getRating) - .blockMillis(4000); + .block(ofMillis(4000)); assertThat(stringIntegerMap) .containsOnlyKeys(WikiControllerTest.EIGENWERT_ARTICLE.getName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.getName()) @@ -122,15 +119,14 @@ public void ratings() throws Exception { public void readevents() throws Exception { StepVerifier.create( client.get().uri("/article/readevents", WikiControllerTest.EIGENWERT_ARTICLE.getName()) - .contentType(MediaType.APPLICATION_STREAM_JSON) .accept(MediaType.APPLICATION_STREAM_JSON) .exchange() - .flatMap(clientResponse -> clientResponse.bodyToFlux(Article.class)) + .flatMapMany(clientResponse -> clientResponse.bodyToFlux(Article.class)) .doOnNext(next -> ReactiveUtil.print("received readevent in testclient: %s", next)) .next() .doOnSubscribe(subscription -> { // call fetchArticle - Mono.delayMillis(50) + Mono.delay(ofMillis(50)) .flatMap(delay -> client.get().uri("/article/{name}", WikiControllerTest.EIGENWERT_ARTICLE.getName()) .exchange()) .log() @@ -139,4 +135,4 @@ public void readevents() throws Exception { ).expectNextMatches(article -> article.getFetchTimeInMillis() != null) .verifyComplete(); } -} \ No newline at end of file +} diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java index f246bebc..9c1e4ecd 100644 --- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java +++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java @@ -17,8 +17,11 @@ import reactor.test.StepVerifier; import java.time.Duration; +import java.util.HashSet; +import java.util.Set; import static com.senacor.codecamp.reactive.util.DelayFunction.withNoDelay; +import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; /** @@ -52,8 +55,8 @@ public void fetchArticle() throws Exception { .exchange() .expectStatus().isOk() .expectBody(Article.class) - .value() .isEqualTo(EIGENWERT_ARTICLE) + .returnResult() .getResponseBody(); ReactiveUtil.print(res); assertThat(res.getFetchTimeInMillis()).isBetween(0, 1000); @@ -65,24 +68,29 @@ public void getWordCount() throws Exception { .exchange() .expectStatus().isOk() .expectBody(String.class) - .value().isEqualTo("2"); + .isEqualTo("2"); } @Test public void countWords() throws Exception { MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8"); - StepVerifier.create( - testClient.post().uri("/article/wordcounts") - //.contentType(mediaType) - .accept(mediaType) - .exchange(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()) - .delayElements(Duration.ofMillis(10)), ArticleName.class) - .expectStatus().isOk() - .expectHeader().contentType(mediaType) - .expectBody(WordCount.class) - .returnResult() - .getResponseBody() - ).expectNext(new WordCount(EIGENWERT_ARTICLE.getName(), 2), new WordCount(EIGENVEKTOR_ARTICLE.getName(), 2)) + Flux wordCountResult = testClient.post().uri("/article/wordcounts") + .accept(mediaType) + .body(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class) + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(mediaType) + .returnResult(WordCount.class) + .getResponseBody(); + + // We have to transform the list into an set to expect the word counts without an order. + Flux resutAsSet = wordCountResult.bufferTimeout(2, Duration.ofMillis(100)).map(list -> new HashSet(list)); + + Set expectedResult = new HashSet<>(asList( + new WordCount(EIGENWERT_ARTICLE.getName(), 2), + new WordCount(EIGENVEKTOR_ARTICLE.getName(), 2))); + StepVerifier.create(resutAsSet) + .expectNext(expectedResult) .verifyComplete(); } @@ -92,24 +100,29 @@ public void getRating() throws Exception { .exchange() .expectStatus().isOk() .expectBody(String.class) - .value().isEqualTo("5"); + .isEqualTo("5"); } @Test public void ratings() throws Exception { MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8"); - StepVerifier.create( - testClient.post().uri("/article/ratings") - //.contentType(mediaType) - .accept(mediaType) - .exchange(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()) - .delayElements(Duration.ofMillis(10)), ArticleName.class) - .expectStatus().isOk() - .expectHeader().contentType(mediaType) - .expectBody(Rating.class) - .returnResult() - .getResponseBody() - ).expectNext(new Rating(EIGENWERT_ARTICLE.getName(), 5), new Rating(EIGENVEKTOR_ARTICLE.getName(), 5)) + Flux ratingsResult = testClient.post().uri("/article/ratings") + .accept(mediaType) + .body(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class) + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(mediaType) + .returnResult(Rating.class) + .getResponseBody(); + + // We have to transform the list into an set to expect the word counts without an order. + Flux resutAsSet = ratingsResult.bufferTimeout(2, Duration.ofMillis(100)).map(list -> new HashSet(list)); + + Set expectedResult = new HashSet<>(asList( + new Rating(EIGENWERT_ARTICLE.getName(), 5), + new Rating(EIGENVEKTOR_ARTICLE.getName(), 5))); + StepVerifier.create(resutAsSet) + .expectNext(expectedResult) .verifyComplete(); } } diff --git a/services/pom.xml b/services/pom.xml index 1dc9d0b6..f0d70d9d 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 2.0.0.BUILD-SNAPSHOT + 2.0.0.M1 @@ -27,6 +27,13 @@ + + io.projectreactor + reactor-bom + Bismuth-M2 + pom + import + com.senacor.codecamp.reactive common-wiki From 74167042d07333c30f76e2950f9431fa0d800b5b Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Fri, 10 Mar 2017 17:08:25 +0100 Subject: [PATCH 02/14] preparation for code-camp: Tasks/Sprint descriptions (cherry picked from commit 20109a2) --- docs/uebungen.md | 33 +++++++++++++++++++ .../services/wikiloader/WikiController.java | 10 ++++++ 2 files changed, 43 insertions(+) diff --git a/docs/uebungen.md b/docs/uebungen.md index e6b3dad4..0b26057a 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -7,3 +7,36 @@ - rating wordcount endpoints - optimieren mit batch - backpressure handeln + +Sprint 1 - your first nonblocking Endpoint +======== +- Have a look at `WikiController#fetchArticle`. + This endpoint signature is currently like an traditional Spring MVC endpoint. + Change the signature to the 'reactive-way' and implement this service. + Hint: use `ArticleService#fetchArticle` to fetch the article from Wikipedia. +- Write Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) +- Start the spring boot application (see `services/README.md`) and try out your new Endpoint, + e.g with curl, a Browser or an RESTClient + +Sprint 2 +======== + + +Sprint 3 +======== + + +Sprint 4 +======== + + +Sprint 5 +======== + + +Sprint 6 +======== + + +Sprint 7 +======== \ No newline at end of file diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java index c2774dc0..46119e03 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java @@ -39,6 +39,12 @@ public WikiController(ArticleService articleService) { this.articleService = articleService; } + /** + * This endpoint fetches an wikipedia article by name as media wiki text. + * + * @param name article name + * @return article with media wiki as content + */ @GetMapping("/{name}") public Mono
fetchArticle(@PathVariable final String name) { Stopwatch stopwatch = Stopwatch.createUnstarted(); @@ -53,6 +59,10 @@ public Mono
fetchArticle(@PathVariable final String name) { .log(); } + /** + * This endpoint streams an event for each article name, which is fetched by {@link #fetchArticle}. + * This is a HOT Source (infinite stream) and acts as a Publisher in a publish/subscribe scenario. + */ @CrossOrigin @GetMapping("/readevents") @JsonView(Article.NameOnly.class) From e1098e9093e1eb3e06957ea536811bfb081295a8 Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Fri, 10 Mar 2017 17:13:51 +0100 Subject: [PATCH 03/14] preparation for code-camp: Tasks/Sprint descriptions (cherry picked from commit 3860d1e) --- docs/uebungen.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/uebungen.md b/docs/uebungen.md index 0b26057a..22120ef2 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -15,11 +15,14 @@ Sprint 1 - your first nonblocking Endpoint Change the signature to the 'reactive-way' and implement this service. Hint: use `ArticleService#fetchArticle` to fetch the article from Wikipedia. - Write Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) -- Start the spring boot application (see `services/README.md`) and try out your new Endpoint, - e.g with curl, a Browser or an RESTClient +- Start the spring boot application (see `services/README.md`) and try out your new endpoint and fetch an article, + e.g with curl, an browser or an RESTClient -Sprint 2 +Sprint 2 - stream articles to the frontend ======== +- Have a look at `WikiController#getReadStream`. + This endpoint signature is currently like an traditional Spring MVC endpoint. + Change the signature to the 'reactive-way' and implement this service. Sprint 3 From adb27aa5579bcda9232641a92c15919b1f97e778 Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Fri, 10 Mar 2017 18:40:18 +0100 Subject: [PATCH 04/14] preparation for code-camp: Tasks/Sprint descriptions (cherry picked from commit beba5bb) --- README.md | 5 ++++- docs/uebungen.md | 13 ++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4d92b8c4..52380d5b 100644 --- a/README.md +++ b/README.md @@ -107,4 +107,7 @@ Spring reactive web - demos (looks a bit outdated) ##### Security / Context / ThreadLocal -- https://github.com/spring-projects/spring-security-reactive/ +- + +##### Spring Data reactive +- \ No newline at end of file diff --git a/docs/uebungen.md b/docs/uebungen.md index 22120ef2..be995af9 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -42,4 +42,15 @@ Sprint 6 Sprint 7 -======== \ No newline at end of file +======== + + +Sprint X.1 - improved caching (reactive datastore) +======== +- have a look at `PublisherCache`, this is a quite simple and small LRU cache + to cache the fetched content from wikipedia. You can extend this cache by an + in-memory Mongo/Casandra/Redis Database: + look in the LRU Cache, if not found, look in the Mongo/Casandra Database, + if not found, query wikipedia. + Use Spring Data to query the Database + From 889359a129a253ffcbc351f9b81ddf21eb2fda4f Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Fri, 10 Mar 2017 18:52:01 +0100 Subject: [PATCH 05/14] preparation for code-camp: Tasks/Sprint descriptions (cherry picked from commit fc6a23e) --- docs/uebungen.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/uebungen.md b/docs/uebungen.md index be995af9..a7f2ef85 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -53,4 +53,11 @@ Sprint X.1 - improved caching (reactive datastore) look in the LRU Cache, if not found, look in the Mongo/Casandra Database, if not found, query wikipedia. Use Spring Data to query the Database - +- After some time (e.g. 1 minute) you want to expire the data. + Implement a reacive timed job which drops outdated data. + +Sprint X.2 - visualizes backpressure buffers +======== +- add a diagram in the frontend which visualizes backpressure buffers in the streams. + add a new statistics endpoint which collects and processes this data and delivers it to the frontend + From b195459afeff9f4284f0eeb72750425edc062a4e Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Tue, 13 Jun 2017 16:46:49 +0200 Subject: [PATCH 06/14] tippfehler --- docs/stand_zero.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/stand_zero.md b/docs/stand_zero.md index 687beb8e..497019eb 100644 --- a/docs/stand_zero.md +++ b/docs/stand_zero.md @@ -1,5 +1,5 @@ # Stand ZERO -## webiste +## website - Website wird nicht gelöscht - als extra top 5 From 2b13de2d64bfb4bdc71baf9d971eec4ba46b16d9 Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Tue, 13 Jun 2017 18:10:34 +0200 Subject: [PATCH 07/14] - exercises described - Impl adapted to exercises --- docs/uebungen.md | 30 +++------ .../services/wikiloader/WikiController.java | 65 ++++++++++++------- .../wikiloader/service/ArticleService.java | 8 +++ .../services/wikiloader/ReadEventTest.java | 37 ++++++----- .../WikiControllerIntegrationTest.java | 5 ++ 5 files changed, 85 insertions(+), 60 deletions(-) diff --git a/docs/uebungen.md b/docs/uebungen.md index a7f2ef85..80d289dd 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -14,7 +14,7 @@ Sprint 1 - your first nonblocking Endpoint This endpoint signature is currently like an traditional Spring MVC endpoint. Change the signature to the 'reactive-way' and implement this service. Hint: use `ArticleService#fetchArticle` to fetch the article from Wikipedia. -- Write Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) +- Ensure Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) are still running - Start the spring boot application (see `services/README.md`) and try out your new endpoint and fetch an article, e.g with curl, an browser or an RESTClient @@ -23,26 +23,20 @@ Sprint 2 - stream articles to the frontend - Have a look at `WikiController#getReadStream`. This endpoint signature is currently like an traditional Spring MVC endpoint. Change the signature to the 'reactive-way' and implement this service. - + Hint: You have to use a Processor, which acts as a subscriber and as a publisher. + In `WikiController` there is already a Processor named `readArticles`, use this one. +- Activate the Unittests in `ReadEventTest` and `WikiControllerIntegrationTest` and look if they run successfully. Sprint 3 ======== - +- Have a look at `WikiController#countWords`. + This Implementation is not too bad, but is's blocking. Change it to non-blocking. +- Ensure Unittests (`WikiControllerTest`) and are still running successfully + and active integration tests (`WikiControllerIntegrationTest`) Sprint 4 ======== - - -Sprint 5 -======== - - -Sprint 6 -======== - - -Sprint 7 -======== +? Sprint X.1 - improved caching (reactive datastore) @@ -55,9 +49,3 @@ Sprint X.1 - improved caching (reactive datastore) Use Spring Data to query the Database - After some time (e.g. 1 minute) you want to expire the data. Implement a reacive timed job which drops outdated data. - -Sprint X.2 - visualizes backpressure buffers -======== -- add a diagram in the frontend which visualizes backpressure buffers in the streams. - add a new statistics endpoint which collects and processes this data and delivers it to the frontend - diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java index 46119e03..7425ae64 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java @@ -2,15 +2,12 @@ import com.fasterxml.jackson.annotation.JsonView; import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; import com.senacor.codecamp.reactive.services.wikiloader.model.Article; import com.senacor.codecamp.reactive.services.wikiloader.model.ArticleName; import com.senacor.codecamp.reactive.services.wikiloader.model.Rating; import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount; -import com.senacor.codecamp.reactive.services.CountService; -import com.senacor.codecamp.reactive.services.RatingService; -import com.senacor.codecamp.reactive.services.WikiService; import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService; -import de.tudarmstadt.ukp.wikipedia.parser.ParsedPage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.DirectProcessor; @@ -19,8 +16,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; - -import static java.time.Duration.ofMillis; +import java.util.stream.Collectors; /** * @author Andreas Keefer @@ -46,17 +42,15 @@ public WikiController(ArticleService articleService) { * @return article with media wiki as content */ @GetMapping("/{name}") - public Mono
fetchArticle(@PathVariable final String name) { + public Article fetchArticle(@PathVariable final String name) { + // TODO Sprint 1 Stopwatch stopwatch = Stopwatch.createUnstarted(); - return articleService.fetchArticle(name) - .doOnSubscribe(subscription -> stopwatch.start()) - .map(content -> Article.newBuilder() - .withName(name) - .withContent(content) - .withFetchTimeInMillis((int) stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)) - .build()) - .doOnNext(readArticles::onNext) - .log(); + stopwatch.start(); + String articleContent = articleService.fetchArticleNonReactive(name); + return Article.newBuilder().withName(name) + .withContent(articleContent) + .withFetchTimeInMillis((int) stopwatch.elapsed(TimeUnit.MILLISECONDS)) + .build(); } /** @@ -66,11 +60,9 @@ public Mono
fetchArticle(@PathVariable final String name) { @CrossOrigin @GetMapping("/readevents") @JsonView(Article.NameOnly.class) - public Flux> getReadStream() { - return readArticles - .buffer(ofMillis(BUFFER_READ_EVENTS)) - .filter(list -> !list.isEmpty()) - .log(); + public List
getReadStream() { + // TODO Sprint2: + return Lists.newArrayList(Article.newBuilder().withName("42").build()); } @GetMapping("/{name}/wordcount") @@ -79,11 +71,36 @@ public Mono getWordCount(@PathVariable String name) { .log(); } +// @RequestMapping("/wordcounts") +// public Flux countWords(@RequestBody Flux names) { +// return names.flatMap(articleName -> articleService.countWords(articleName.getName()) +// .map(count -> new WordCount(articleName.getName(), count))) +// .log(); +// } + +// @RequestMapping("/wordcounts") +// public Flux countWords(@RequestBody Flux names) { +// List counts = names.toStream() +// .map(articleName -> { +// Integer count = articleService.countWords(articleName.getName()).block(); +// return new WordCount(articleName.getName(), count); +// }) +// .collect(Collectors.toList()); +// return Flux.fromIterable(counts); +// } + @RequestMapping("/wordcounts") public Flux countWords(@RequestBody Flux names) { - return names.flatMap(articleName -> articleService.countWords(articleName.getName()) - .map(count -> new WordCount(articleName.getName(), count))) - .log(); + // TODO Sprint3 + List counts = names.toStream() + .map(articleName -> { + System.out.println("count words for " + articleName.getName()); + Integer count = articleService.countWords(articleName.getName()).block(); + System.out.println("count = " + count); + return new WordCount(articleName.getName(), count); + }) + .collect(Collectors.toList()); + return Flux.fromIterable(counts); } @GetMapping("/{name}/rating") diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java index 34ce4b1c..f937022d 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java @@ -37,6 +37,14 @@ public Mono fetchArticle(String articleName) { return cache.lookup(articleName); } + /** + * @param articleName article name + * @return fetched article from wikipedia as a media wiki formatted string + */ + public String fetchArticleNonReactive(String articleName) { + return fetchArticle(articleName).block(); + } + /** * @param articleName article name * @return a rating of the wiki article from 1 to 5 'stars' diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java index ee7410ba..9e2f5d7f 100644 --- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java +++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java @@ -6,6 +6,7 @@ import com.senacor.codecamp.reactive.services.wikiloader.model.Article; import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -24,6 +25,7 @@ * @since 08/03/2017 */ @RunWith(MockitoJUnitRunner.class) +@Ignore public class ReadEventTest { @Mock @@ -42,15 +44,16 @@ public void setup() { } @Test + // TODO Sprint2: activate after signature change public void shouldEmitEventsOnRead() throws InterruptedException { - ReplayProcessor replay = subscribe(); - - wikiController.fetchArticle("foo").subscribe(); - wikiController.fetchArticle("bar").subscribe(); - - Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50); - - assertEvents(replay, "foo", "bar"); +// ReplayProcessor replay = subscribe(); +// +// wikiController.fetchArticle("foo").subscribe(); +// wikiController.fetchArticle("bar").subscribe(); +// +// Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50); +// +// assertEvents(replay, "foo", "bar"); } @Test @@ -59,10 +62,11 @@ public void shouldntEmitEventsPerDefault() { } @Test + // TODO Sprint2: activate after signature change public void shouldntEmitOldEvents() { - wikiController.fetchArticle("ha").subscribe(); - wikiController.fetchArticle("haha").subscribe(); - assertNoEvents(subscribe()); +// wikiController.fetchArticle("ha").subscribe(); +// wikiController.fetchArticle("haha").subscribe(); +// assertNoEvents(subscribe()); } @Test @@ -87,10 +91,13 @@ public void fetchingRatingShouldNotEmitEvent() throws InterruptedException { private ReplayProcessor subscribe() { ReplayProcessor articles = ReplayProcessor.create(); - wikiController.getReadStream() - .flatMap(Flux::fromIterable) - .map(Article::getName) - .subscribe(articles); + // TODO Sprint2: remove Flux.just(...) after signature change + Flux.just( + wikiController.getReadStream() + ) + .flatMap(Flux::fromIterable) + .map(Article::getName) + .subscribe(articles); return articles; } diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java index 8ac6b807..e64fef03 100644 --- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java +++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java @@ -6,6 +6,7 @@ import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount; import com.senacor.codecamp.reactive.util.ReactiveUtil; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; @@ -67,6 +68,8 @@ public void getWordCount() throws Exception { } @Test + @Ignore + // TODO Sprint3 public void countWords() throws Exception { MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8"); Map stringIntegerMap = testClient.post().uri("/article/wordcounts") @@ -116,6 +119,8 @@ public void ratings() throws Exception { } @Test(timeout = 5000) + @Ignore + // TODO Sprint2 public void readevents() throws Exception { StepVerifier.create( client.get().uri("/article/readevents", WikiControllerTest.EIGENWERT_ARTICLE.getName()) From edbcd58a166d388a6dd8b60bce36788112637ca7 Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Tue, 13 Jun 2017 18:28:55 +0200 Subject: [PATCH 08/14] - exercises described --- docs/uebungen.md | 2 ++ services/README.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/uebungen.md b/docs/uebungen.md index 80d289dd..6d2e2d34 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -26,6 +26,8 @@ Sprint 2 - stream articles to the frontend Hint: You have to use a Processor, which acts as a subscriber and as a publisher. In `WikiController` there is already a Processor named `readArticles`, use this one. - Activate the Unittests in `ReadEventTest` and `WikiControllerIntegrationTest` and look if they run successfully. +- Start the sprig boot application and also the ms-statistics service which contains a frontend (see `services/README.md`) + and try out your endpoints Sprint 3 ======== diff --git a/services/README.md b/services/README.md index 410ee12f..e7e80812 100644 --- a/services/README.md +++ b/services/README.md @@ -8,6 +8,8 @@ Reactive Microservices which loads wiki articles from local files - statistics (running on port 8080):
`mvn spring-boot:run` +
+Frontend is available on http://localhost:8080/index.html ### jmeter (load/performance tests) - have a look at ./jmeter/README.md From 4afc979525141c507a27823c47d4336ed52dd81d Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Tue, 13 Jun 2017 18:35:20 +0200 Subject: [PATCH 09/14] solution deleted --- .../services/wikiloader/WikiController.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java index 7425ae64..39bfc22c 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java @@ -71,24 +71,6 @@ public Mono getWordCount(@PathVariable String name) { .log(); } -// @RequestMapping("/wordcounts") -// public Flux countWords(@RequestBody Flux names) { -// return names.flatMap(articleName -> articleService.countWords(articleName.getName()) -// .map(count -> new WordCount(articleName.getName(), count))) -// .log(); -// } - -// @RequestMapping("/wordcounts") -// public Flux countWords(@RequestBody Flux names) { -// List counts = names.toStream() -// .map(articleName -> { -// Integer count = articleService.countWords(articleName.getName()).block(); -// return new WordCount(articleName.getName(), count); -// }) -// .collect(Collectors.toList()); -// return Flux.fromIterable(counts); -// } - @RequestMapping("/wordcounts") public Flux countWords(@RequestBody Flux names) { // TODO Sprint3 From 633a4c2c8318280b703d841b8edf9dba87964658 Mon Sep 17 00:00:00 2001 From: Andri Bremm Date: Tue, 13 Jun 2017 22:45:04 +0200 Subject: [PATCH 10/14] Exclude log4j dependency provided by common-wiki, to use default Spring Boot logback settings. --- services/ms-wikiloader/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/services/ms-wikiloader/pom.xml b/services/ms-wikiloader/pom.xml index c682375a..55882ec6 100644 --- a/services/ms-wikiloader/pom.xml +++ b/services/ms-wikiloader/pom.xml @@ -14,6 +14,13 @@ com.senacor.codecamp.reactive common-wiki + + + + log4j + log4j + + org.apache.commons From 7cdf993b1e6f4884d1bf0cd92fe71782b4893a4c Mon Sep 17 00:00:00 2001 From: "Keefer, Andreas" Date: Fri, 23 Jun 2017 10:06:13 +0200 Subject: [PATCH 11/14] cleanup --- docs/uebungen.md | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/docs/uebungen.md b/docs/uebungen.md index 6d2e2d34..64f4e813 100644 --- a/docs/uebungen.md +++ b/docs/uebungen.md @@ -1,12 +1,4 @@ -1. article fetch - - as -- read event + frontend - - frontend read geht schon -- stats consumer -- stats endpoint -- rating wordcount endpoints -- optimieren mit batch -- backpressure handeln + Sprint 1 - your first nonblocking Endpoint ======== From 4f36f612ab0cda8dc9de5a87f0241468713794ac Mon Sep 17 00:00:00 2001 From: Andri Bremm Date: Fri, 23 Jun 2017 10:36:40 +0200 Subject: [PATCH 12/14] Solution Sprint 1 --- .../services/wikiloader/WikiController.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java index 39bfc22c..0e215056 100644 --- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java +++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java @@ -42,15 +42,17 @@ public WikiController(ArticleService articleService) { * @return article with media wiki as content */ @GetMapping("/{name}") - public Article fetchArticle(@PathVariable final String name) { + public Mono
fetchArticle(@PathVariable final String name) { // TODO Sprint 1 Stopwatch stopwatch = Stopwatch.createUnstarted(); - stopwatch.start(); - String articleContent = articleService.fetchArticleNonReactive(name); - return Article.newBuilder().withName(name) - .withContent(articleContent) - .withFetchTimeInMillis((int) stopwatch.elapsed(TimeUnit.MILLISECONDS)) - .build(); + return articleService.fetchArticle(name) + .doOnSubscribe(subscription -> stopwatch.start()) + .map(content -> Article.newBuilder() + .withName(name) + .withContent(content) + .withFetchTimeInMillis((int) stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)) + .build()) + .log(); } /** From 4ecb5f53252ee011d70d19ed6395c07937fa9eb1 Mon Sep 17 00:00:00 2001 From: Daniel Heinrich Date: Sat, 27 Jan 2018 17:32:48 +0100 Subject: [PATCH 13/14] update reactor to newest release --- .../example/error/DebugWithReactorTest.java | 2 +- .../reactor/solution/Kata7aResilience.java | 2 +- pom.xml | 31 ++----------------- 3 files changed, 5 insertions(+), 30 deletions(-) diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java index a133cb5c..691f2fa4 100644 --- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java +++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java @@ -137,7 +137,7 @@ public void errorAndSubscribePositionIsShown() { public void positionOfEveryOperatorIsShown() { WaitMonitor monitor = new WaitMonitor(); - Hooks.onOperator(Hooks.OperatorHook::operatorStacktrace); + Hooks.onOperatorDebug(); Disposable subscription = reportsStream() .subscribeOn(Schedulers.elastic()) diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java index 96ff1cbc..c9a57333 100644 --- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java +++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java @@ -128,7 +128,7 @@ public void ambiguous() throws Exception { .subscribeOn(Schedulers.elastic()) .retryWhen(retryWithDelay(3)); - StepVerifier.create(Flux.firstEmitting(Arrays.asList(timeout, error, ok)) + StepVerifier.create(Flux.first(timeout, error, ok) .subscribeOn(Schedulers.elastic())) .expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42")) .verifyComplete(); diff --git a/pom.xml b/pom.xml index 54fd0094..c860925c 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ io.projectreactor reactor-bom - Bismuth-M2 + Bismuth-SR5 pom import @@ -30,12 +30,12 @@ io.reactivex.rxjava2 rxjava - 2.0.7 + 2.1.9 com.github.akarnokd rxjava2-extensions - 0.16.0 + 0.18.5 com.github.davidmoten @@ -285,29 +285,4 @@ services - - - - spring-snapshots - http://repo.spring.io/snapshot - - true - - - - - spring-milestones - http://repo.spring.io/milestone - - - - - spring-snapshots - http://repo.spring.io/snapshot - - - spring-milestones - http://repo.spring.io/milestone - - From 99d39980d0794a864c400267e5019ab1974981d0 Mon Sep 17 00:00:00 2001 From: Daniel Heinrich Date: Sat, 27 Jan 2018 17:32:59 +0100 Subject: [PATCH 14/14] update spring boot --- services/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/pom.xml b/services/pom.xml index 6be68c77..8b31aa07 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 2.0.0.M1 + 2.0.0.M7 @@ -30,7 +30,7 @@ io.projectreactor reactor-bom - Bismuth-M2 + Bismuth-SR5 pom import