diff --git a/README.md b/README.md
index 7d4ad47e..e5daaffc 100644
--- a/README.md
+++ b/README.md
@@ -110,4 +110,4 @@ Spring reactive web
-
##### Spring Data reactive
--
+-
\ No newline at end of file
diff --git a/common-wiki/pom.xml b/common-wiki/pom.xml
index 8a170fb0..4a647501 100644
--- a/common-wiki/pom.xml
+++ b/common-wiki/pom.xml
@@ -23,7 +23,7 @@
reactor-core
- io.projectreactor.addons
+ io.projectreactor
reactor-test
test
diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java
index 59e47392..9947e43f 100644
--- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java
+++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/services/integration/WikipediaServiceJapiImpl.java
@@ -55,7 +55,7 @@ public Mono getArticleNonBlocking(String name) {
return Mono.just(name)
.map(this::buildQueryUrl)
.flatMap(this.client::get)
- .flatMap(this::httpResponse2String)
+ .flatMapMany(this::httpResponse2String)
.reduce(String::concat)
.map((json) -> parseJsonContend(json, name));
diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java
index 5ab4b8eb..0f529e2c 100644
--- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java
+++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/DelayProxy.java
@@ -7,9 +7,10 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.time.Duration;
import java.util.List;
+import static java.time.Duration.ofMillis;
+
/**
* @author Michael Omann
* @author Andreas Keefer
@@ -34,8 +35,8 @@ private DelayProxy(Object obj, DelayFunction delayFunction) {
@Override
protected Publisher> handlePublisherReturnType(Publisher> publisher, Method m, Object[] args) {
- return Mono.defer(() -> Mono.just(1).delayElement(Duration.ofMillis(delayFunction.delay(m.getName()))))
- .flatMap(next -> publisher);
+ return Mono.defer(() -> Mono.just(1).delayElement(ofMillis(delayFunction.delay(m.getName()))))
+ .flatMapMany(next -> publisher);
}
@Override
diff --git a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java
index 5f322421..370abd1e 100644
--- a/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java
+++ b/common-wiki/src/main/java/com/senacor/codecamp/reactive/util/FlakyProxy.java
@@ -36,7 +36,7 @@ protected Publisher> handlePublisherReturnType(Publisher> publisher, Method
return Mono.defer(() -> {
flakinessFunction.failOrPass(m.getName());
return Mono.just(1);
- }).flatMap(next -> publisher);
+ }).flatMapMany(next -> publisher);
}
@Override
diff --git a/docs/stand_zero.md b/docs/stand_zero.md
index 687beb8e..497019eb 100644
--- a/docs/stand_zero.md
+++ b/docs/stand_zero.md
@@ -1,5 +1,5 @@
# Stand ZERO
-## webiste
+## website
- Website wird nicht gelöscht
- als extra top 5
diff --git a/docs/uebungen.md b/docs/uebungen.md
index 9e9eb4ef..b6e5add7 100644
--- a/docs/uebungen.md
+++ b/docs/uebungen.md
@@ -1,12 +1,4 @@
-1. article fetch
- - as
-- read event + frontend
- - frontend read geht schon
-- stats consumer
-- stats endpoint
-- rating wordcount endpoints
-- optimieren mit batch
-- backpressure handeln
+
Sprint 1 - your first nonblocking Endpoint
========
@@ -14,7 +6,7 @@ Sprint 1 - your first nonblocking Endpoint
This endpoint signature is currently like an traditional Spring MVC endpoint.
Change the signature to the 'reactive-way' and implement this service.
Hint: use `ArticleService#fetchArticle` to fetch the article from Wikipedia.
-- Write Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`)
+- Ensure Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) are still running
- Start the spring boot application (see `services/README.md`) and try out your new endpoint and fetch an article,
e.g with curl, an browser or an RESTClient
@@ -23,43 +15,22 @@ Sprint 2 - stream articles to the frontend
- Have a look at `WikiController#getReadStream`.
This endpoint signature is currently like an traditional Spring MVC endpoint.
Change the signature to the 'reactive-way' and implement this service.
- For each call to `WikiController#fetchArticle` there should be emitted an Event on
- this `#getReadStream`.
- Hint: you need a `org.reactivestreams.Processor`, have a look what reactor offers.
-- Write Unittests (`ReadEventTest`) and integration tests (`WikiControllerIntegrationTest`)
-- start ms-wikiloader and ms-statistics. ms-statistics contains a basic frontend, which consumes
- the `#getReadStream` endpoint and draws a diagram. The URL is printed out in the console after
- startup of the service.
-- execute some calls to the `#fetchArticle` endpoint and watch the diagram.
-- Start jmeter (see `services/README.md`), open an provided testplan (*.jmx) and give some load on `#fetchArticle`.
- At this point it is better to start the wikiloader service in mock mode
- by activating the profile `mock` (see `services/README.md`). This should be the default
- from now on.
-
-Sprint 3 - getReadStream optimization
-========
-- Maybe you noticed some problems when jmeter put heavy load on `#fetchArticle`
- When you publish every single 'articleRead'-Event on it's own, spring will flush
- the HTTP connection to the frontend for each event, this produces a lot of overhead.
- Now try to reduce the overhead and deliver the Events in batches.
- Hint: A good batch size could be 250 milliseconds -> 4 flushes per second.
-- Write tests
-- Test it with jmeter and the frontend
-
-Sprint 4
-========
-
+ Hint: You have to use a Processor, which acts as a subscriber and as a publisher.
+ In `WikiController` there is already a Processor named `readArticles`, use this one.
+- Activate the Unittests in `ReadEventTest` and `WikiControllerIntegrationTest` and look if they run successfully.
+- Start the sprig boot application and also the ms-statistics service which contains a frontend (see `services/README.md`)
+ and try out your endpoints
-Sprint 5
+Sprint 3
========
+- Have a look at `WikiController#countWords`.
+ This Implementation is not too bad, but is's blocking. Change it to non-blocking.
+- Ensure Unittests (`WikiControllerTest`) and are still running successfully
+ and active integration tests (`WikiControllerIntegrationTest`)
-
-Sprint 6
-========
-
-
-Sprint 7
+Sprint 4
========
+?
Sprint X.1 - improved caching (reactive datastore)
@@ -72,9 +43,9 @@ Sprint X.1 - improved caching (reactive datastore)
Use Spring Data to query the Database
- After some time (e.g. 1 minute) you want to expire the data.
Implement a reacive timed job which drops outdated data.
-
+
Sprint X.2 - visualizes backpressure buffers
========
- add a diagram in the frontend which visualizes backpressure buffers in the streams.
add a new statistics endpoint which collects and processes this data and delivers it to the frontend
-
+
\ No newline at end of file
diff --git a/kata-reactor/pom.xml b/kata-reactor/pom.xml
index 64a25ee5..497099e2 100644
--- a/kata-reactor/pom.xml
+++ b/kata-reactor/pom.xml
@@ -31,7 +31,7 @@
reactor-core
- io.projectreactor.addons
+ io.projectreactor
reactor-test
test
diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java
index 4e49e184..691f2fa4 100644
--- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java
+++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/example/error/DebugWithReactorTest.java
@@ -58,7 +58,7 @@ private Mono queryTeamReport(String teamName) {
.map(sum -> "The team members:\n" +
team.stream().collect(joining(", ")) +
"\nproduced per member:\n" + sum + '\n')
- ).single();
+ ).publishOn(Schedulers.single());
}
private Mono querySalesReport() {
@@ -137,7 +137,7 @@ public void errorAndSubscribePositionIsShown() {
public void positionOfEveryOperatorIsShown() {
WaitMonitor monitor = new WaitMonitor();
- Hooks.onOperator(Hooks.OperatorHook::operatorStacktrace);
+ Hooks.onOperatorDebug();
Disposable subscription = reportsStream()
.subscribeOn(Schedulers.elastic())
diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java
index 96ff1cbc..c9a57333 100644
--- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java
+++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7aResilience.java
@@ -128,7 +128,7 @@ public void ambiguous() throws Exception {
.subscribeOn(Schedulers.elastic())
.retryWhen(retryWithDelay(3));
- StepVerifier.create(Flux.firstEmitting(Arrays.asList(timeout, error, ok))
+ StepVerifier.create(Flux.first(timeout, error, ok)
.subscribeOn(Schedulers.elastic()))
.expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42"))
.verifyComplete();
diff --git a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java
index 27392636..8328d009 100644
--- a/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java
+++ b/kata-reactor/src/test/java/com/senacor/codecamp/reactive/katas/codecamp/reactor/solution/Kata7bResilience.java
@@ -32,7 +32,7 @@ public void backupOnError() throws Exception {
String wikiArticle = "42";
StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle)
- .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
+ .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.subscribeOn(Schedulers.elastic()))
.expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42"))
.verifyComplete();
@@ -50,7 +50,7 @@ public void defaultValueBackup() throws Exception {
final String wikiArticle = "42";
StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle)
- .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
+ .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.onErrorReturn(getCachedArticle(wikiArticle))
.subscribeOn(Schedulers.elastic()))
.expectNext("{{Dieser Artikel|behandelt 42}} ")
@@ -75,7 +75,7 @@ public void exponentialRetry() throws Exception {
final String wikiArticle = "42";
StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle)
.retryWhen(retryWithDelay(3, RetryDelay.exponential()))
- .onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
+ .onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.onErrorReturn(getCachedArticle(wikiArticle))
.subscribeOn(Schedulers.elastic()))
.expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42"))
diff --git a/pom.xml b/pom.xml
index 428a3dfc..c860925c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
io.projectreactor
reactor-bom
- Aluminium-SR1
+ Bismuth-SR5
pom
import
@@ -30,12 +30,12 @@
io.reactivex.rxjava2
rxjava
- 2.0.7
+ 2.1.9
com.github.akarnokd
rxjava2-extensions
- 0.16.0
+ 0.18.5
com.github.davidmoten
@@ -156,7 +156,7 @@
reactor-core
- io.projectreactor.addons
+ io.projectreactor
reactor-test
test
@@ -209,21 +209,6 @@
de.ruedigermoeller
fst
-
-
- io.vertx
- vertx-core
-
-
- io.vertx
- vertx-core
- tests
- test
-
-
- io.vertx
- vertx-reactive-streams
-
@@ -297,7 +282,7 @@
netty-example
kata-rxjava
kata-reactor
- kata-vertx
+
services
diff --git a/services/README.md b/services/README.md
index 410ee12f..e7e80812 100644
--- a/services/README.md
+++ b/services/README.md
@@ -8,6 +8,8 @@ Reactive Microservices
which loads wiki articles from local files
- statistics (running on port 8080):
`mvn spring-boot:run`
+
+Frontend is available on http://localhost:8080/index.html
### jmeter (load/performance tests)
- have a look at ./jmeter/README.md
diff --git a/services/ms-statistics/pom.xml b/services/ms-statistics/pom.xml
index f87e9027..39e4d4a1 100644
--- a/services/ms-statistics/pom.xml
+++ b/services/ms-statistics/pom.xml
@@ -25,7 +25,7 @@
- io.projectreactor.addons
+ io.projectreactor
reactor-test
test
diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java
index 05cf0084..918e43f8 100644
--- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java
+++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/StatisticsController.java
@@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import static java.time.Duration.ofMillis;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;
/**
@@ -49,7 +50,7 @@ public Flux> topArticle(@RequestParam(required = false, default
@RequestParam(required = false, defaultValue = "5") int numberOfTopArticles) {
return articleReadEventsService.readEvents()
.doOnNext(StatisticsController::updateReadStatistic)
- .sampleMillis(updateInterval)
+ .sample(ofMillis(updateInterval))
.map(readEvent -> createTopArticleList(numberOfTopArticles))
.retry(throwable -> {
logger.warn("error on topArticle, retrying", throwable);
@@ -62,7 +63,7 @@ public Flux> topArticle(@RequestParam(required = false, default
public Flux fetchArticleStatistics(@RequestParam(required = false, defaultValue = "1000") int updateInterval) {
return articleReadEventsService.readEvents()
.onBackpressureDrop(articleReadEvent -> logger.warn("dropping articleReadEvent: " + articleReadEvent))
- .bufferMillis(updateInterval / 3)
+ .buffer(ofMillis(updateInterval / 3))
.filter(articleReadEvents -> !articleReadEvents.isEmpty())
.flatMap(articleReadEvent -> {
Flux distinctArticleNames = Flux.fromIterable(articleReadEvent)
@@ -82,7 +83,7 @@ public Flux fetchArticleStatistics(@RequestParam(required = f
zip.getT3().get(zip.getT1().getArticleName()),
zip.getT1().getFetchTimeInMillis()));
})
- .bufferMillis(updateInterval)
+ .buffer(ofMillis(updateInterval))
.map(StatisticsController::calculateArticleStatistics)
.retry(throwable -> {
logger.warn("error on fetchArticleStatistics, retrying", throwable);
diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java
index cb8e64fe..04114ae0 100644
--- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java
+++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleMetricsServiceImpl.java
@@ -12,6 +12,7 @@
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
/**
* @author Daniel Heinrich
@@ -44,7 +45,8 @@ public Mono fetchWordCount(String articleName) {
.retry(1)
.doOnNext(WrongStatusException.okFilter())
.flatMap(r -> r.bodyToMono(String.class))
- .map(Integer::parseInt).single();
+ .map(Integer::parseInt)
+ .publishOn(Schedulers.single());
}
@Override
@@ -56,7 +58,8 @@ public Mono fetchRating(String articleName) {
.retry(1)
.doOnNext(WrongStatusException.okFilter())
.flatMap(r -> r.bodyToMono(String.class))
- .map(Integer::parseInt).single();
+ .map(Integer::parseInt)
+ .publishOn(Schedulers.single());
}
@Override
@@ -64,11 +67,12 @@ public Flux fetchWordCounts(Flux articleNames) {
return wikiServiceClient.post()
.uri(ub -> ub.pathSegment(ARTICLE, WORD_COUNTS).build())
.accept(APPLICATION_STREAM_JSON_UTF8)
- .exchange(articleNames, ArticleName.class)
+ .body(articleNames, ArticleName.class)
+ .exchange()
.doOnError(e -> LOGGER.error(e.getMessage()))
.retry(1)
.doOnNext(WrongStatusException.okFilter())
- .flatMap(r -> r.bodyToFlux(WordCount.class));
+ .flatMapMany(r -> r.bodyToFlux(WordCount.class));
}
@Override
@@ -76,10 +80,11 @@ public Flux fetchRatings(Flux articleNames) {
return wikiServiceClient.post()
.uri(ub -> ub.pathSegment(ARTICLE, RATINGS).build())
.accept(APPLICATION_STREAM_JSON_UTF8)
- .exchange(articleNames, ArticleName.class)
+ .body(articleNames, ArticleName.class)
+ .exchange()
.doOnError(e -> LOGGER.error(e.getMessage()))
.retry(1)
.doOnNext(WrongStatusException.okFilter())
- .flatMap(r -> r.bodyToFlux(Rating.class));
+ .flatMapMany(r -> r.bodyToFlux(Rating.class));
}
}
diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java
index 2d6dbde1..f1a8a59b 100644
--- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java
+++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/ArticleReadEventsService.java
@@ -26,9 +26,8 @@ public Flux readEvents() {
return webClient.get()
.uri(ub -> ub.pathSegment(ARTICLE, READ_EVENTS).build())
.accept(MediaType.TEXT_EVENT_STREAM)
- .contentType(MediaType.TEXT_EVENT_STREAM)
.exchange()
- .flatMap(response -> response.bodyToFlux(ArticleReadEvent[].class))
+ .flatMapMany(response -> response.bodyToFlux(ArticleReadEvent[].class))
.flatMap(articleReadEvents -> Flux.fromArray(articleReadEvents))
.log();
}
diff --git a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java
index 4247d268..3ab5608b 100644
--- a/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java
+++ b/services/ms-statistics/src/main/java/com/senacor/codecamp/reactive/services/statistics/external/WikiLoaderServiceImpl.java
@@ -29,7 +29,7 @@ public Mono fetchArticle(String articleName) {
.uri(ARTICLE_ENDPOINT + urlEncode(articleName))
.exchange()
.doOnNext(WrongStatusException.okFilter())
- .flatMap(r -> r.bodyToFlux(Article.class))
+ .flatMapMany(r -> r.bodyToFlux(Article.class))
.single();
}
}
diff --git a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java
index cd88d106..3d613e11 100644
--- a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java
+++ b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/ArticleReadEventsServiceTest.java
@@ -7,12 +7,13 @@
import org.mockito.Mockito;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.HeaderSpec;
+import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec;
import org.springframework.web.reactive.function.client.WebClient.UriSpec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -24,14 +25,14 @@ public class ArticleReadEventsServiceTest {
private ArticleReadEventsService articleReadEventsService;
- private HeaderSpec headerSpec;
+ private RequestBodySpec requestBodySpec;
private UriSpec uriSpec;
private WebClient webClient;
@Before
public void setUp() {
- headerSpec = mock(HeaderSpec.class, Mockito.RETURNS_SELF);
- uriSpec = mock(UriSpec.class, (invocation) -> headerSpec);
+ requestBodySpec = mock(RequestBodySpec.class, Mockito.RETURNS_SELF);
+ uriSpec = mock(UriSpec.class, (invocation) -> requestBodySpec);
webClient = mock(WebClient.class, (invocation) -> uriSpec);
articleReadEventsService = new ArticleReadEventsService(webClient);
@@ -39,11 +40,11 @@ public void setUp() {
@Test
public void fetchReadEvents() {
- Flux flux = Flux.intervalMillis(30).take(3)
+ Flux flux = Flux.interval(ofMillis(30)).take(3)
.map(count -> asList(createReadEvent(count)).toArray(new ArticleReadEvent[]{}));
ClientResponse clientResponse = mock(ClientResponse.class);
when(clientResponse.bodyToFlux(ArticleReadEvent[].class)).thenReturn(flux);
- when(headerSpec.exchange()).thenReturn(Mono.just(clientResponse));
+ when(requestBodySpec.exchange()).thenReturn(Mono.just(clientResponse));
Flux result = articleReadEventsService.readEvents();
diff --git a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java
index 9e917b20..1f0a9080 100644
--- a/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java
+++ b/services/ms-statistics/src/test/java/com/senacor/codecamp/reactive/services/statistics/StatisticsControllerTest.java
@@ -11,6 +11,7 @@
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
+import static java.time.Duration.ofMillis;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -39,7 +40,7 @@ public void setUp() throws Exception {
@Test
public void fetchArticleStatisticsWithDefaultUpdateInterval() {
- when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(100).take(6).map(count -> createReadEvent(count, 100))
+ when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(100)).take(6).map(count -> createReadEvent(count, 100))
.doOnNext(next -> System.out.println("readEvent: " + next)));
when(articleMetricsServiceMock.fetchRatings(any())).thenAnswer(invocation -> {
Flux names = invocation.getArgument(0);
@@ -55,8 +56,7 @@ public void fetchArticleStatisticsWithDefaultUpdateInterval() {
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(TEXT_EVENT_STREAM)
- .expectBody(ArticleStatistics.class)
- .returnResult();
+ .returnResult(ArticleStatistics.class);
StepVerifier.create(result.getResponseBody()
.doOnNext(articleStatistics -> System.out.println("received on clientside: " + articleStatistics)))
@@ -67,7 +67,7 @@ public void fetchArticleStatisticsWithDefaultUpdateInterval() {
@Test
public void fetchArticleStatisticsWithShortUpdateInterval() {
- when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(310).take(4)
+ when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(310)).take(4)
.map(count -> createReadEvent(count, count.intValue())));
when(articleMetricsServiceMock.fetchRatings(any())).thenAnswer(invocation -> {
Flux names = invocation.getArgument(0);
@@ -94,15 +94,14 @@ public void fetchArticleStatisticsWithShortUpdateInterval() {
@Test
public void fetchTopArticleWithDefaultQueryParams() {
- when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.intervalMillis(245).take(5)
+ when(articleReadEventsServiceMock.readEvents()).thenReturn(Flux.interval(ofMillis(245)).take(5)
.flatMap(count -> Flux.just(createReadEvent(count, count.intValue())).repeat(count * 2)));
FluxExchangeResult result = testClient.get().uri("/top/article")
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(TEXT_EVENT_STREAM)
- .expectBody(TopArticle[].class)
- .returnResult();
+ .returnResult(TopArticle[].class);
StepVerifier.create(result.getResponseBody().flatMap(Flux::fromArray))
.expectNext(createTopArticle("3", 6))
diff --git a/services/ms-wikiloader/pom.xml b/services/ms-wikiloader/pom.xml
index 2f81437a..55882ec6 100644
--- a/services/ms-wikiloader/pom.xml
+++ b/services/ms-wikiloader/pom.xml
@@ -14,6 +14,13 @@
com.senacor.codecamp.reactive
common-wiki
+
+
+
+ log4j
+ log4j
+
+
org.apache.commons
@@ -33,7 +40,7 @@
- io.projectreactor.addons
+ io.projectreactor
reactor-test
test
diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java
index bd9ee0d5..0e215056 100644
--- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java
+++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/WikiController.java
@@ -1,13 +1,22 @@
package com.senacor.codecamp.reactive.services.wikiloader;
import com.fasterxml.jackson.annotation.JsonView;
-import com.github.davidmoten.guavamini.Lists;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
import com.senacor.codecamp.reactive.services.wikiloader.model.Article;
+import com.senacor.codecamp.reactive.services.wikiloader.model.ArticleName;
+import com.senacor.codecamp.reactive.services.wikiloader.model.Rating;
+import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount;
import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* @author Andreas Keefer
@@ -16,7 +25,10 @@
@RequestMapping("/article")
public class WikiController {
+ public static final int BUFFER_READ_EVENTS = 250;
+
private final ArticleService articleService;
+ private final DirectProcessor readArticles = DirectProcessor.create();
@Autowired
public WikiController(ArticleService articleService) {
@@ -30,9 +42,17 @@ public WikiController(ArticleService articleService) {
* @return article with media wiki as content
*/
@GetMapping("/{name}")
- public Article fetchArticle(@PathVariable final String name) {
- // DUMMY
- return Article.newBuilder().withName(name).build();
+ public Mono fetchArticle(@PathVariable final String name) {
+ // TODO Sprint 1
+ Stopwatch stopwatch = Stopwatch.createUnstarted();
+ return articleService.fetchArticle(name)
+ .doOnSubscribe(subscription -> stopwatch.start())
+ .map(content -> Article.newBuilder()
+ .withName(name)
+ .withContent(content)
+ .withFetchTimeInMillis((int) stopwatch.stop().elapsed(TimeUnit.MILLISECONDS))
+ .build())
+ .log();
}
/**
@@ -43,7 +63,40 @@ public Article fetchArticle(@PathVariable final String name) {
@GetMapping("/readevents")
@JsonView(Article.NameOnly.class)
public List getReadStream() {
- // DUMMY
+ // TODO Sprint2:
return Lists.newArrayList(Article.newBuilder().withName("42").build());
}
+
+ @GetMapping("/{name}/wordcount")
+ public Mono getWordCount(@PathVariable String name) {
+ return articleService.countWords(name)
+ .log();
+ }
+
+ @RequestMapping("/wordcounts")
+ public Flux countWords(@RequestBody Flux names) {
+ // TODO Sprint3
+ List counts = names.toStream()
+ .map(articleName -> {
+ System.out.println("count words for " + articleName.getName());
+ Integer count = articleService.countWords(articleName.getName()).block();
+ System.out.println("count = " + count);
+ return new WordCount(articleName.getName(), count);
+ })
+ .collect(Collectors.toList());
+ return Flux.fromIterable(counts);
+ }
+
+ @GetMapping("/{name}/rating")
+ public Mono getRating(@PathVariable String name) {
+ return articleService.rate(name)
+ .log();
+ }
+
+ @RequestMapping("/ratings")
+ public Flux ratings(@RequestBody Flux names) {
+ return names.flatMap(articleName -> articleService.rate(articleName.getName())
+ .map(rating -> new Rating(articleName.getName(), rating)))
+ .log();
+ }
}
diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java
index 34ce4b1c..f937022d 100644
--- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java
+++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/ArticleService.java
@@ -37,6 +37,14 @@ public Mono fetchArticle(String articleName) {
return cache.lookup(articleName);
}
+ /**
+ * @param articleName article name
+ * @return fetched article from wikipedia as a media wiki formatted string
+ */
+ public String fetchArticleNonReactive(String articleName) {
+ return fetchArticle(articleName).block();
+ }
+
/**
* @param articleName article name
* @return a rating of the wiki article from 1 to 5 'stars'
diff --git a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java
index d89d2fde..9b152a91 100644
--- a/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java
+++ b/services/ms-wikiloader/src/main/java/com/senacor/codecamp/reactive/services/wikiloader/service/PublisherCache.java
@@ -32,7 +32,7 @@ public PublisherCache(Function> transformer, int cacheSize) {
public Mono lookup(I input) {
return Mono.justOrEmpty(cache.get(input))
.doOnNext(i -> print("cache hit for key '%s'", input))
- .otherwiseIfEmpty(transformer.apply(input)
+ .switchIfEmpty(transformer.apply(input)
.doOnNext(o -> cache.put(input, o))
);
}
diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java
index d41e6206..9e2f5d7f 100644
--- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java
+++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/ReadEventTest.java
@@ -1,32 +1,112 @@
package com.senacor.codecamp.reactive.services.wikiloader;
+import com.senacor.codecamp.reactive.services.CountService;
+import com.senacor.codecamp.reactive.services.RatingService;
+import com.senacor.codecamp.reactive.services.WikiService;
+import com.senacor.codecamp.reactive.services.wikiloader.model.Article;
import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.ReplayProcessor;
+
+import static com.senacor.codecamp.reactive.util.DelayFunction.withNoDelay;
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
/**
* @author Daniel Heinrich
* @since 08/03/2017
*/
@RunWith(MockitoJUnitRunner.class)
+@Ignore
public class ReadEventTest {
@Mock
- private ArticleService service;
+ private WikiService service;
+
private WikiController wikiController;
@Before
public void setup() {
- // setup mocks ...
+ Mockito.when(service.fetchArticleNonBlocking(Mockito.anyString())).thenReturn(Mono.just(""));
+ wikiController = new WikiController(new ArticleService(service,
+ CountService.create(withNoDelay()),
+ RatingService.create(withNoDelay()),
+ 10));
+
}
@Test
+ // TODO Sprint2: activate after signature change
public void shouldEmitEventsOnRead() throws InterruptedException {
+// ReplayProcessor replay = subscribe();
+//
+// wikiController.fetchArticle("foo").subscribe();
+// wikiController.fetchArticle("bar").subscribe();
+//
+// Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50);
+//
+// assertEvents(replay, "foo", "bar");
+ }
+
+ @Test
+ public void shouldntEmitEventsPerDefault() {
+ assertNoEvents(subscribe());
+ }
+ @Test
+ // TODO Sprint2: activate after signature change
+ public void shouldntEmitOldEvents() {
+// wikiController.fetchArticle("ha").subscribe();
+// wikiController.fetchArticle("haha").subscribe();
+// assertNoEvents(subscribe());
}
- // ...
+ @Test
+ public void fetchingWordcountShouldNotEmitEvent() throws InterruptedException {
+ ReplayProcessor replay = subscribe();
+ wikiController.getWordCount("foo").subscribe();
+
+ Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50);
+
+ assertNoEvents(replay);
+ }
+
+ @Test
+ public void fetchingRatingShouldNotEmitEvent() throws InterruptedException {
+ ReplayProcessor replay = subscribe();
+ wikiController.getRating("foo").subscribe();
+
+ Thread.sleep(WikiController.BUFFER_READ_EVENTS + 50);
+
+ assertNoEvents(replay);
+ }
+
+ private ReplayProcessor subscribe() {
+ ReplayProcessor articles = ReplayProcessor.create();
+ // TODO Sprint2: remove Flux.just(...) after signature change
+ Flux.just(
+ wikiController.getReadStream()
+ )
+ .flatMap(Flux::fromIterable)
+ .map(Article::getName)
+ .subscribe(articles);
+ return articles;
+ }
+
+ private void assertNoEvents(ReplayProcessor articles) {
+ assertEvents(articles);
+ }
+
+ private void assertEvents(ReplayProcessor articles, String... events) {
+ articles.onComplete();
+ assertEquals(asList(events), articles.collectList().block());
+ }
}
diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java
index d2d5a22f..e64fef03 100644
--- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java
+++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerIntegrationTest.java
@@ -1,14 +1,28 @@
package com.senacor.codecamp.reactive.services.wikiloader;
+import com.senacor.codecamp.reactive.services.wikiloader.model.Article;
+import com.senacor.codecamp.reactive.services.wikiloader.model.ArticleName;
+import com.senacor.codecamp.reactive.services.wikiloader.model.Rating;
+import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount;
+import com.senacor.codecamp.reactive.util.ReactiveUtil;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import java.util.Map;
+
+import static java.time.Duration.ofMillis;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
@@ -21,6 +35,8 @@ public class WikiControllerIntegrationTest {
@LocalServerPort
private int port;
+ // alternativ zu dem manuellen setUp einfach einen fertigen WebTestClient injizieren lassen
+ //@Autowired
private WebTestClient testClient;
private WebClient client;
@@ -34,12 +50,94 @@ public void setUp() throws Exception {
}
@Test
- public void fetchArticle() throws Exception {
+ public void testFetchArticle() throws Exception {
+ testClient.get().uri("/article/{name}", WikiControllerTest.EIGENWERT_ARTICLE.getName())
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(Article.class)
+ .isEqualTo(WikiControllerTest.EIGENWERT_ARTICLE);
+ }
+
+ @Test
+ public void getWordCount() throws Exception {
+ testClient.get().uri("/article/{name}/wordcount", WikiControllerTest.EIGENWERT_ARTICLE.getName())
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(String.class)
+ .isEqualTo("2");
+ }
+ @Test
+ @Ignore
+ // TODO Sprint3
+ public void countWords() throws Exception {
+ MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8");
+ Map stringIntegerMap = testClient.post().uri("/article/wordcounts")
+ //.contentType(mediaType)
+ .accept(mediaType)
+ .body(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName())
+ .delayElements(ofMillis(50)), ArticleName.class)
+ .exchange()
+ .expectStatus().isOk()
+ .expectHeader().contentType(mediaType)
+ .returnResult(WordCount.class)
+ .getResponseBody()
+ .collectMap(WordCount::getArticleName, WordCount::getCount)
+ .block(ofMillis(4000));
+ assertThat(stringIntegerMap)
+ .containsOnlyKeys(WikiControllerTest.EIGENWERT_ARTICLE.getName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.getName())
+ .containsValues(2);
}
@Test
- public void getReadStream() throws Exception {
+ public void getRating() throws Exception {
+ testClient.get().uri("/article/{name}/rating", WikiControllerTest.EIGENWERT_ARTICLE.getName())
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(String.class)
+ .isEqualTo("5");
+ }
+
+ @Test
+ public void ratings() throws Exception {
+ MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8");
+ Map stringIntegerMap = testClient.post().uri("/article/ratings")
+ .contentType(MediaType.APPLICATION_JSON)
+ .accept(mediaType)
+ .body(Flux.just(WikiControllerTest.EIGENWERT_ARTICLE.toArticleName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class)
+ .exchange()
+ .expectStatus().isOk()
+ .expectHeader().contentType(mediaType)
+ .returnResult(Rating.class)
+ .getResponseBody()
+ .collectMap(Rating::getArticleName, Rating::getRating)
+ .block(ofMillis(4000));
+
+ assertThat(stringIntegerMap)
+ .containsOnlyKeys(WikiControllerTest.EIGENWERT_ARTICLE.getName(), WikiControllerTest.EIGENVEKTOR_ARTICLE.getName())
+ .containsValues(5);
+ }
+ @Test(timeout = 5000)
+ @Ignore
+ // TODO Sprint2
+ public void readevents() throws Exception {
+ StepVerifier.create(
+ client.get().uri("/article/readevents", WikiControllerTest.EIGENWERT_ARTICLE.getName())
+ .accept(MediaType.APPLICATION_STREAM_JSON)
+ .exchange()
+ .flatMapMany(clientResponse -> clientResponse.bodyToFlux(Article.class))
+ .doOnNext(next -> ReactiveUtil.print("received readevent in testclient: %s", next))
+ .next()
+ .doOnSubscribe(subscription -> {
+ // call fetchArticle
+ Mono.delay(ofMillis(50))
+ .flatMap(delay -> client.get().uri("/article/{name}", WikiControllerTest.EIGENWERT_ARTICLE.getName())
+ .exchange())
+ .log()
+ .subscribe();
+ }).log()
+ ).expectNextMatches(article -> article.getFetchTimeInMillis() != null)
+ .verifyComplete();
}
-}
\ No newline at end of file
+}
diff --git a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java
index 3f2635a5..9c1e4ecd 100644
--- a/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java
+++ b/services/ms-wikiloader/src/test/java/com/senacor/codecamp/reactive/services/wikiloader/WikiControllerTest.java
@@ -1,20 +1,43 @@
package com.senacor.codecamp.reactive.services.wikiloader;
+import com.senacor.codecamp.reactive.services.wikiloader.model.Rating;
+import com.senacor.codecamp.reactive.services.wikiloader.model.Article;
+import com.senacor.codecamp.reactive.services.wikiloader.model.ArticleName;
+import com.senacor.codecamp.reactive.services.wikiloader.model.WordCount;
import com.senacor.codecamp.reactive.services.CountService;
import com.senacor.codecamp.reactive.services.RatingService;
import com.senacor.codecamp.reactive.services.WikiService;
import com.senacor.codecamp.reactive.services.wikiloader.service.ArticleService;
+import com.senacor.codecamp.reactive.util.ReactiveUtil;
import org.junit.Before;
import org.junit.Test;
+import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
import static com.senacor.codecamp.reactive.util.DelayFunction.withNoDelay;
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Andreas Keefer
*/
public class WikiControllerTest {
+ static final Article EIGENWERT_ARTICLE = Article.newBuilder()
+ .withName("Eigenwert")
+ .withContent("#REDIRECT [[Eigenwertproblem]]")
+ .build();
+ static final Article EIGENVEKTOR_ARTICLE = Article.newBuilder()
+ .withName("Eigenvektor")
+ .withContent("#REDIRECT [[Eigenwertproblem]]")
+ .build();
+
private WebTestClient testClient;
@Before
@@ -28,7 +51,78 @@ public void setUp() throws Exception {
@Test
public void fetchArticle() throws Exception {
+ Article res = testClient.get().uri("/article/{name}", EIGENWERT_ARTICLE.getName())
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(Article.class)
+ .isEqualTo(EIGENWERT_ARTICLE)
+ .returnResult()
+ .getResponseBody();
+ ReactiveUtil.print(res);
+ assertThat(res.getFetchTimeInMillis()).isBetween(0, 1000);
+ }
+
+ @Test
+ public void getWordCount() throws Exception {
+ testClient.get().uri("/article/{name}/wordcount", EIGENWERT_ARTICLE.getName())
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(String.class)
+ .isEqualTo("2");
+ }
+
+ @Test
+ public void countWords() throws Exception {
+ MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8");
+ Flux wordCountResult = testClient.post().uri("/article/wordcounts")
+ .accept(mediaType)
+ .body(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class)
+ .exchange()
+ .expectStatus().isOk()
+ .expectHeader().contentType(mediaType)
+ .returnResult(WordCount.class)
+ .getResponseBody();
+ // We have to transform the list into an set to expect the word counts without an order.
+ Flux resutAsSet = wordCountResult.bufferTimeout(2, Duration.ofMillis(100)).map(list -> new HashSet(list));
+
+ Set expectedResult = new HashSet<>(asList(
+ new WordCount(EIGENWERT_ARTICLE.getName(), 2),
+ new WordCount(EIGENVEKTOR_ARTICLE.getName(), 2)));
+ StepVerifier.create(resutAsSet)
+ .expectNext(expectedResult)
+ .verifyComplete();
+ }
+
+ @Test
+ public void getRating() throws Exception {
+ testClient.get().uri("/article/{name}/rating", EIGENWERT_ARTICLE.getName())
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(String.class)
+ .isEqualTo("5");
}
+ @Test
+ public void ratings() throws Exception {
+ MediaType mediaType = MediaType.valueOf(MediaType.APPLICATION_STREAM_JSON_VALUE + ";charset=UTF-8");
+ Flux ratingsResult = testClient.post().uri("/article/ratings")
+ .accept(mediaType)
+ .body(Flux.just(EIGENWERT_ARTICLE.toArticleName(), EIGENVEKTOR_ARTICLE.toArticleName()), ArticleName.class)
+ .exchange()
+ .expectStatus().isOk()
+ .expectHeader().contentType(mediaType)
+ .returnResult(Rating.class)
+ .getResponseBody();
+
+ // We have to transform the list into an set to expect the word counts without an order.
+ Flux resutAsSet = ratingsResult.bufferTimeout(2, Duration.ofMillis(100)).map(list -> new HashSet(list));
+
+ Set expectedResult = new HashSet<>(asList(
+ new Rating(EIGENWERT_ARTICLE.getName(), 5),
+ new Rating(EIGENVEKTOR_ARTICLE.getName(), 5)));
+ StepVerifier.create(resutAsSet)
+ .expectNext(expectedResult)
+ .verifyComplete();
+ }
}
diff --git a/services/pom.xml b/services/pom.xml
index 9a7c1b34..8b31aa07 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -7,7 +7,7 @@
org.springframework.boot
spring-boot-starter-parent
- 2.0.0.BUILD-SNAPSHOT
+ 2.0.0.M7
@@ -27,6 +27,13 @@
+
+ io.projectreactor
+ reactor-bom
+ Bismuth-SR5
+ pom
+ import
+
com.senacor.codecamp.reactive
common-wiki