Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,4 @@ Spring reactive web
- <https://github.com/spring-projects/spring-security-reactive/>

##### Spring Data reactive
- <https://spring.io/blog/2016/11/28/going-reactive-with-spring-data>
- <https://spring.io/blog/2016/11/28/going-reactive-with-spring-data>
2 changes: 1 addition & 1 deletion common-wiki/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Mono<String> getArticleNonBlocking(String name) {
return Mono.just(name)
.map(this::buildQueryUrl)
.flatMap(this.client::get)
.flatMap(this::httpResponse2String)
.flatMapMany(this::httpResponse2String)
.reduce(String::concat)
.map((json) -> parseJsonContend(json, name));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;

import static java.time.Duration.ofMillis;

/**
* @author Michael Omann
* @author Andreas Keefer
Expand All @@ -34,8 +35,8 @@ private DelayProxy(Object obj, DelayFunction delayFunction) {

@Override
protected Publisher<?> handlePublisherReturnType(Publisher<?> publisher, Method m, Object[] args) {
return Mono.defer(() -> Mono.just(1).delayElement(Duration.ofMillis(delayFunction.delay(m.getName()))))
.flatMap(next -> publisher);
return Mono.defer(() -> Mono.just(1).delayElement(ofMillis(delayFunction.delay(m.getName()))))
.flatMapMany(next -> publisher);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected Publisher<?> handlePublisherReturnType(Publisher<?> publisher, Method
return Mono.defer(() -> {
flakinessFunction.failOrPass(m.getName());
return Mono.just(1);
}).flatMap(next -> publisher);
}).flatMapMany(next -> publisher);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion docs/stand_zero.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Stand ZERO
## webiste
## website
- Website wird nicht gelöscht
- als extra top 5

Expand Down
61 changes: 16 additions & 45 deletions docs/uebungen.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
1. article fetch
- as
- read event + frontend
- frontend read geht schon
- stats consumer
- stats endpoint
- rating wordcount endpoints
- optimieren mit batch
- backpressure handeln


Sprint 1 - your first nonblocking Endpoint
========
- Have a look at `WikiController#fetchArticle`.
This endpoint signature is currently like an traditional Spring MVC endpoint.
Change the signature to the 'reactive-way' and implement this service.
Hint: use `ArticleService#fetchArticle` to fetch the article from Wikipedia.
- Write Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`)
- Ensure Unittests (`WikiControllerTest`) and integration tests (`WikiControllerIntegrationTest`) are still running
- Start the spring boot application (see `services/README.md`) and try out your new endpoint and fetch an article,
e.g with curl, an browser or an RESTClient <http://localhost:8081/article/...>

Expand All @@ -23,43 +15,22 @@ Sprint 2 - stream articles to the frontend
- Have a look at `WikiController#getReadStream`.
This endpoint signature is currently like an traditional Spring MVC endpoint.
Change the signature to the 'reactive-way' and implement this service.
For each call to `WikiController#fetchArticle` there should be emitted an Event on
this `#getReadStream`.
Hint: you need a `org.reactivestreams.Processor`, have a look what reactor offers.
- Write Unittests (`ReadEventTest`) and integration tests (`WikiControllerIntegrationTest`)
- start ms-wikiloader and ms-statistics. ms-statistics contains a basic frontend, which consumes
the `#getReadStream` endpoint and draws a diagram. The URL is printed out in the console after
startup of the service.
- execute some calls to the `#fetchArticle` endpoint and watch the diagram.
- Start jmeter (see `services/README.md`), open an provided testplan (*.jmx) and give some load on `#fetchArticle`.
At this point it is better to start the wikiloader service in mock mode
by activating the profile `mock` (see `services/README.md`). This should be the default
from now on.

Sprint 3 - getReadStream optimization
========
- Maybe you noticed some problems when jmeter put heavy load on `#fetchArticle`
When you publish every single 'articleRead'-Event on it's own, spring will flush
the HTTP connection to the frontend for each event, this produces a lot of overhead.
Now try to reduce the overhead and deliver the Events in batches.
Hint: A good batch size could be 250 milliseconds -> 4 flushes per second.
- Write tests
- Test it with jmeter and the frontend

Sprint 4
========

Hint: You have to use a Processor, which acts as a subscriber and as a publisher.
In `WikiController` there is already a Processor named `readArticles`, use this one.
- Activate the Unittests in `ReadEventTest` and `WikiControllerIntegrationTest` and look if they run successfully.
- Start the sprig boot application and also the ms-statistics service which contains a frontend (see `services/README.md`)
and try out your endpoints

Sprint 5
Sprint 3
========
- Have a look at `WikiController#countWords`.
This Implementation is not too bad, but is's blocking. Change it to non-blocking.
- Ensure Unittests (`WikiControllerTest`) and are still running successfully
and active integration tests (`WikiControllerIntegrationTest`)


Sprint 6
========


Sprint 7
Sprint 4
========
?


Sprint X.1 - improved caching (reactive datastore)
Expand All @@ -72,9 +43,9 @@ Sprint X.1 - improved caching (reactive datastore)
Use Spring Data to query the Database <https://spring.io/blog/2016/11/28/going-reactive-with-spring-data>
- After some time (e.g. 1 minute) you want to expire the data.
Implement a reacive timed job which drops outdated data.

Sprint X.2 - visualizes backpressure buffers
========
- add a diagram in the frontend which visualizes backpressure buffers in the streams.
add a new statistics endpoint which collects and processes this data and delivers it to the frontend


2 changes: 1 addition & 1 deletion kata-reactor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private Mono<String> queryTeamReport(String teamName) {
.map(sum -> "The team members:\n" +
team.stream().collect(joining(", ")) +
"\nproduced per member:\n" + sum + '\n')
).single();
).publishOn(Schedulers.single());
}

private Mono<String> querySalesReport() {
Expand Down Expand Up @@ -137,7 +137,7 @@ public void errorAndSubscribePositionIsShown() {
public void positionOfEveryOperatorIsShown() {
WaitMonitor monitor = new WaitMonitor();

Hooks.onOperator(Hooks.OperatorHook::operatorStacktrace);
Hooks.onOperatorDebug();

Disposable subscription = reportsStream()
.subscribeOn(Schedulers.elastic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void ambiguous() throws Exception {
.subscribeOn(Schedulers.elastic())
.retryWhen(retryWithDelay(3));

StepVerifier.create(Flux.firstEmitting(Arrays.asList(timeout, error, ok))
StepVerifier.create(Flux.first(timeout, error, ok)
.subscribeOn(Schedulers.elastic()))
.expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42"))
.verifyComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void backupOnError() throws Exception {

String wikiArticle = "42";
StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle)
.onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.subscribeOn(Schedulers.elastic()))
.expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42"))
.verifyComplete();
Expand All @@ -50,7 +50,7 @@ public void defaultValueBackup() throws Exception {

final String wikiArticle = "42";
StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle)
.onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.onErrorReturn(getCachedArticle(wikiArticle))
.subscribeOn(Schedulers.elastic()))
.expectNext("{{Dieser Artikel|behandelt 42}} ")
Expand All @@ -75,7 +75,7 @@ public void exponentialRetry() throws Exception {
final String wikiArticle = "42";
StepVerifier.create(wikiService.fetchArticleFlux(wikiArticle)
.retryWhen(retryWithDelay(3, RetryDelay.exponential()))
.onErrorResumeWith(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.onErrorResume(error -> wikiServiceBackup.fetchArticleFlux(wikiArticle))
.onErrorReturn(getCachedArticle(wikiArticle))
.subscribeOn(Schedulers.elastic()))
.expectNextMatches(value -> value.startsWith("{{Dieser Artikel|behandelt das Jahr 42"))
Expand Down
25 changes: 5 additions & 20 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Aluminium-SR1</version>
<version>Bismuth-SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--RxJava 2-->
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.7</version>
<version>2.1.9</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.16.0</version>
<version>0.18.5</version>
</dependency>
<dependency>
<groupId>com.github.davidmoten</groupId>
Expand Down Expand Up @@ -156,7 +156,7 @@
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
Expand Down Expand Up @@ -209,21 +209,6 @@
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-reactive-streams</artifactId>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -297,7 +282,7 @@
<module>netty-example</module>
<module>kata-rxjava</module>
<module>kata-reactor</module>
<module>kata-vertx</module>
<!--<module>kata-vertx</module>-->
<module>services</module>
</modules>
</project>
2 changes: 2 additions & 0 deletions services/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Reactive Microservices
which loads wiki articles from local files
- statistics (running on port 8080):
<br> `mvn spring-boot:run`
<br>
Frontend is available on http://localhost:8080/index.html

### jmeter (load/performance tests)
- have a look at ./jmeter/README.md
Expand Down
2 changes: 1 addition & 1 deletion services/ms-statistics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</dependency>

<dependency>
<groupId>io.projectreactor.addons</groupId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static java.time.Duration.ofMillis;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;

/**
Expand Down Expand Up @@ -49,7 +50,7 @@ public Flux<List<TopArticle>> topArticle(@RequestParam(required = false, default
@RequestParam(required = false, defaultValue = "5") int numberOfTopArticles) {
return articleReadEventsService.readEvents()
.doOnNext(StatisticsController::updateReadStatistic)
.sampleMillis(updateInterval)
.sample(ofMillis(updateInterval))
.map(readEvent -> createTopArticleList(numberOfTopArticles))
.retry(throwable -> {
logger.warn("error on topArticle, retrying", throwable);
Expand All @@ -62,7 +63,7 @@ public Flux<List<TopArticle>> topArticle(@RequestParam(required = false, default
public Flux<ArticleStatistics> fetchArticleStatistics(@RequestParam(required = false, defaultValue = "1000") int updateInterval) {
return articleReadEventsService.readEvents()
.onBackpressureDrop(articleReadEvent -> logger.warn("dropping articleReadEvent: " + articleReadEvent))
.bufferMillis(updateInterval / 3)
.buffer(ofMillis(updateInterval / 3))
.filter(articleReadEvents -> !articleReadEvents.isEmpty())
.flatMap(articleReadEvent -> {
Flux<ArticleName> distinctArticleNames = Flux.fromIterable(articleReadEvent)
Expand All @@ -82,7 +83,7 @@ public Flux<ArticleStatistics> fetchArticleStatistics(@RequestParam(required = f
zip.getT3().get(zip.getT1().getArticleName()),
zip.getT1().getFetchTimeInMillis()));
})
.bufferMillis(updateInterval)
.buffer(ofMillis(updateInterval))
.map(StatisticsController::calculateArticleStatistics)
.retry(throwable -> {
logger.warn("error on fetchArticleStatistics, retrying", throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* @author Daniel Heinrich
Expand Down Expand Up @@ -44,7 +45,8 @@ public Mono<Integer> fetchWordCount(String articleName) {
.retry(1)
.doOnNext(WrongStatusException.okFilter())
.flatMap(r -> r.bodyToMono(String.class))
.map(Integer::parseInt).single();
.map(Integer::parseInt)
.publishOn(Schedulers.single());
}

@Override
Expand All @@ -56,30 +58,33 @@ public Mono<Integer> fetchRating(String articleName) {
.retry(1)
.doOnNext(WrongStatusException.okFilter())
.flatMap(r -> r.bodyToMono(String.class))
.map(Integer::parseInt).single();
.map(Integer::parseInt)
.publishOn(Schedulers.single());
}

@Override
public Flux<WordCount> fetchWordCounts(Flux<ArticleName> articleNames) {
return wikiServiceClient.post()
.uri(ub -> ub.pathSegment(ARTICLE, WORD_COUNTS).build())
.accept(APPLICATION_STREAM_JSON_UTF8)
.exchange(articleNames, ArticleName.class)
.body(articleNames, ArticleName.class)
.exchange()
.doOnError(e -> LOGGER.error(e.getMessage()))
.retry(1)
.doOnNext(WrongStatusException.okFilter())
.flatMap(r -> r.bodyToFlux(WordCount.class));
.flatMapMany(r -> r.bodyToFlux(WordCount.class));
}

@Override
public Flux<Rating> fetchRatings(Flux<ArticleName> articleNames) {
return wikiServiceClient.post()
.uri(ub -> ub.pathSegment(ARTICLE, RATINGS).build())
.accept(APPLICATION_STREAM_JSON_UTF8)
.exchange(articleNames, ArticleName.class)
.body(articleNames, ArticleName.class)
.exchange()
.doOnError(e -> LOGGER.error(e.getMessage()))
.retry(1)
.doOnNext(WrongStatusException.okFilter())
.flatMap(r -> r.bodyToFlux(Rating.class));
.flatMapMany(r -> r.bodyToFlux(Rating.class));
}
}
Loading