Skip to content

Commit d056192

Browse files
committed
coroutines post updated
1 parent 4ef5ec1 commit d056192

File tree

1 file changed

+144
-30
lines changed

1 file changed

+144
-30
lines changed

_drafts/2019-07-22-coroutines.md

Lines changed: 144 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
layout: post
33
title: "Coroutines"
4-
date: 2019-07-22
4+
date: 2019-07-22
55
categories: ["Kotlin"]
66
image: kotlin/coroutines
77
github: kotlin/blob/master/coroutines
@@ -24,7 +24,19 @@ suspend fun suspendingFunction() : Int {
2424
{% endhighlight %}
2525

2626
## Kontekst
27-
Kontekst współprogramu (`CoroutineContext`) jest zbiorem zasad i konfiguracji, która definiuje sposób w jaki coroutine będzie wykonywany (może być kombinacją różnych kontekstów `CombinedContext`). Jednym z możliwych sposobów dostarczenia kontekstu jest użycie klasy `Dispatchers` zawierającej zbiór implementacji różniących się wykorzystaniem wątków. `Dispatchers.Default` może działać na wielu wątkach i używany jest do kosztownych zadań o dużym zapotrzebowaniu mocy obliczeniowej jak np. algorytmy. `Dispatchers.Main` działa na głównym wątku co w przypadku Android pozwala na modyfikację interfejsu użytkownika. `Dispatchers.IO` jest używany przede wszystkim do prostych operacji typu wejście/wyjście jak np. zapytanie sieciowe, dostęp do bazy danych, plików czy sensorów. `Dispatchers.Unconfined` nie ogranicza się do żadnego wątku w wyniku czego jego zachowanie jest trudne do przewidzenia. Kontekst może być przekazany jawnie jako argument lub uzyskiwany niejawnie na podstawie zakresu w którym jest wykonywany.
27+
Kontekst współprogramu (`CoroutineContext`) jest zbiorem zasad i konfiguracji, która definiuje sposób w jaki coroutine będzie wykonywany. Może być także kombinacją obiektów różnych typów kontekstu (`CombinedContext`). Składa się przeważnie z obiektów typu `Job`, `CoroutineDispatcher` oraz `CoroutineExceptionHandler`. Coroutine zawsze wykonywany jest w ramach jakiegoś kontekstu, który może być przekazany jawnie jako argument lub uzyskiwany niejawnie na podstawie zakresu w którym jest wykonywany.
28+
29+
{% highlight kotlin %}
30+
val handler = CoroutineExceptionHandler { context, exception ->
31+
//manage caught exception like logger action
32+
}
33+
34+
//actually this is CombinedContext object
35+
val context : CoroutineContext = Dispatchers.Default + Job() + handler
36+
{% endhighlight %}
37+
38+
## Dispatcher
39+
Kontekts zawiera m.in. instancję `CoroutineDispatcher`, której zadaniem jest określenie wątków wykonawczych dla coroutine. `Dispatchers.Default` może działać na wielu wątkach i używany jest do kosztownych zadań o dużym zapotrzebowaniu mocy obliczeniowej jak np. algorytmy. `Dispatchers.Main` działa na głównym wątku co w przypadku Android pozwala na modyfikację interfejsu użytkownika. `Dispatchers.IO` jest używany przede wszystkim do prostych operacji typu wejście/wyjście jak np. zapytanie sieciowe, dostęp do bazy danych, plików czy sensorów. `Dispatchers.Unconfined` nie ogranicza się do żadnego wątku w wyniku czego jego zachowanie jest trudne do przewidzenia.
2840

2941
{% highlight kotlin %}
3042
//withContext is suspend functions itself so no need to declare in inside suspend function if used inside coroutine
@@ -52,31 +64,30 @@ fun testSuspendingWork() = runBlocking {
5264
`launch` jest często wykorzystywanym budowniczym, który w przeciwieństwie do `runBlocking` nie blokuje bieżącego wątku. Zwraca obiekt typu `Job` dzięki któremu możliwe jest manualne zarządzanie stanem zadań. Metoda `join` blokuje powiązany coroutine tak długo dopóki wszystkie jego zadania nie zostaną wykonane, natomiast `cancel` anuluje wszystkie zadania. Wykorzystywany do zadań typu `fire and forget` w których nie oczekuje się zwrócenia rezultatu.
5365

5466
{% highlight kotlin %}
55-
//coroutines must be called on some scope so just use main scope called GlobalScope
5667
suspend fun launchJobAndJoin() {
5768
//Job extends CoroutineContext to it's context itself
58-
val job = GlobalScope.launch(Dispatchers.Main) {
69+
val job = launch { //note that coroutines must be called on some scope
5970
//some work
6071
val result1 = suspendingWork1()
6172
//wait for result1
6273
val result2 = suspendingWork2()
6374
//wait for result2
6475
//process results
6576
}
66-
67-
//wait here until suspendingWork tasks finished
77+
78+
//wait here until child tasks finished
6879
job.join() //suspending function itself
6980
}
7081

7182
fun launchJobAndCancel() {
72-
val job = GlobalScope.launch(Dispatchers.Main) {
83+
val job = launch {
7384
//some work
7485
val result1 = suspendingWork1()
7586
val result2 = suspendingWork2()
7687
//process results
7788
}
78-
79-
//cancel all the jobs, so if suspendingWork is running then cancel it and pending tasks
89+
90+
//cancel all cancellable jobs, so if suspendingWork is running then cancel it and pending tasks
8091
job.cancel() //regular function so no need to run inside coroutine or suspend function
8192
}
8293
{% endhighlight %}
@@ -85,16 +96,21 @@ fun launchJobAndCancel() {
8596

8697
{% highlight kotlin %}
8798
fun launchAsync() {
88-
val job = GlobalScope.launch(Dispatchers.Main) {
99+
val job = launch {
89100
val result = suspendingWork()
101+
90102
val deferred1 = async {
91103
//some work
92104
return@async "result2"
93105
}
94-
val deferred2 = async {
106+
107+
//async can be lazy started when manual start or await called
108+
val deferred2 = async(start = CoroutineStart.LAZY) {
95109
//some work
96110
return@async "result3"
97111
}
112+
//note that if no start for lazy async called then behaviour is sequantial when await called
113+
deferred2.start()
98114

99115
//if in this place deferred1 and deferred2 not finished, wait for it
100116
val finalResult = "$result ${deferred1.await()} ${deferred2.await()}"
@@ -103,6 +119,71 @@ fun launchAsync() {
103119
}
104120
{% endhighlight %}
105121

122+
## Anulowanie
123+
Wszystkie funkcje zawieszenia w coroutine są `cancellable`, tzn. potrafią obsłużyć żądanie o anulowaniu pracy przez metodę `cancel`. Jeśli coroutine został anulowany to wyrzucany jest wyjątek `CancellationException` w wyniku czego następuje przerwanie działania. Jednakże jeśli blok kodu nie jest elementem funkcji zawieszenia wówczas nie dochodzi do automatycznego sprawdzania stanu pracy co sprawia, że kod nie reaguje na żądanie `cancel`. W takiej sytuacji należy ręcznie sprawdzać stan pracy poprzez właściwość `isActive` lub funkcję `yield` (okresowo zawiesza działanie funkcji) czy też ustawienie maksymalnego czasu wykonania za pomocą funkcji `withTimeout` i `withTimeoutOrNull`.
124+
125+
{% highlight kotlin %}
126+
fun cancellableSuspsend() = runBlocking {
127+
val job = launch(Dispatchers.Default) {
128+
repeat(100) {
129+
//this computation are suspend function, so it is cancellable
130+
suspendingWork()
131+
}
132+
}
133+
134+
delay(1) //allow to start coroutine before cancel
135+
job.cancel()
136+
}
137+
138+
fun notCancellableComputation() = runBlocking {
139+
val job = launch(Dispatchers.Default) {
140+
repeat(100) {
141+
//some intensive computation
142+
notSuspendingWork()
143+
}
144+
}
145+
146+
delay(1) //allow to start coroutine before cancel
147+
job.cancel() //this won't work
148+
}
149+
150+
fun cancellableComputation() = runBlocking {
151+
val job = launch(Dispatchers.Default) {
152+
//cancellable code throw CancellationException on cancel
153+
try {
154+
repeat(100) {
155+
//check periodically is scope active or has been cancelled
156+
if (isActive) {
157+
//do some intensive computation if coroutine is still active
158+
notSuspendingWork()
159+
}
160+
}
161+
}
162+
finally {
163+
//coroutine has been cancelled
164+
//run suspending function here will throw CancellationException
165+
withContext(NonCancellable) {
166+
//running suspending function is now possible
167+
}
168+
}
169+
}
170+
171+
delay(1) //allow to start coroutine before cancel
172+
job.cancel()
173+
}
174+
175+
//coroutines must be called on some scope so just use main scope called GlobalScope
176+
fun cancellableByTimeout() = runBlocking {
177+
//cancel when coroutine couldn't complete after 1 second
178+
val result = withTimeoutOrNull(1000) {
179+
repeat(100) {
180+
notSuspendingWork()
181+
}
182+
}
183+
//use withTime to do the same but throw TimeoutCancellationException instead of return null
184+
}
185+
{% endhighlight %}
186+
106187
## Zakres
107188
Coroutines działają w ramach zakresu (`scope`), który stanowi dla nich przestrzeń wykonawczą i jest realizacją struktury hierarchii. Odwołanie się do zakresu wpływa na wszystkie znajdujące się w nim coroutines. Ich wykorzystanie eliminuje problem manualnego zarządzania stanem zadań, których praca i oczekiwanie na rezultat nierzadko ma sens tylko dla bieżącego ekranu (np. ładowanie danych do wyświetlenia). Zamiast ręcznego anulowania wszystkich coroutines wystarczy odwołać je poprzez zakres. Dowolna klasa implementująca `CoroutineScope` oraz nadpisująca właściwość `coroutineContext` może stać się zakresem. Warto zauważyć, że funkcje budowniczego są funkcjami rozszerzającymi `CoroutineScope`. Przykładem zakresu może być `GlobalScope` stanowiący ogólny zakres aplikacji.
108189

@@ -118,14 +199,13 @@ class ScopeActivity : AppCompatActivity() {
118199
CoroutineScope(Dispatchers.Main).launch {
119200
//work
120201
}
121-
122-
val job = GlobalScope.launch(Dispatchers.Main) {
202+
203+
val job = GlobalScope.launch {
123204
//work
124205
async {
125206
//coroutine nested in coroutine
126207
}
127208
}
128-
//job.cancel() will cancel parent and childs
129209
}
130210
}
131211

@@ -139,12 +219,12 @@ class ScopeClassActivity : AppCompatActivity(), CoroutineScope {
139219
override fun onCreate(savedInstanceState: Bundle?) {
140220
super.onCreate(savedInstanceState)
141221
142-
//now calling launch { } is possible because Activity is CoroutineScope itself
222+
//now calling launch is possible because Activity is CoroutineScope itself
143223
launch {
144224
//work
145225
}
146226
}
147-
227+
148228
override fun onDestroy() {
149229
super.onDestroy()
150230
cancel() //cancel on scope so all coroutines inside scope are cancelling
@@ -157,39 +237,39 @@ class ScopeDelegateActivity : AppCompatActivity(), CoroutineScope by MainScope()
157237
override fun onCreate(savedInstanceState: Bundle?) {
158238
super.onCreate(savedInstanceState)
159239
}
160-
240+
161241
//this class is CoroutineScope itself
162242
}
163243
{% endhighlight %}
164244

165245
## Kanały
166-
Kanały są mechanizmem (podobnym do kolejki) pozwalającymi na przesyłanie i odbieranie potoku strumienia wartości między coroutines. W celu zbudowania kanału należy stworzyć instancję klasy `Channel`, która implementuje interfejsy zachowania zarówno nadawcy (`SendChannel`) jak i odbiorcy (`ReceiveChannel`). Opcjonalny parametr przekazany do metody wytwórczej odpowiada za wielkość bufora. Funkcja `send` i `receive` są funkcjami zawieszenia (zawieszają się w przypadku braku odbiorcy lub braku emisji) i umożliwiają odpowiednio emisję i odbiór wartości. Przetwarzanie wartości może także odbywać się poprzez iteracje kanału lub funkcje `consume`, `consumeEach`.
246+
Kanały są mechanizmem (podobnym do kolejki) pozwalającymi na przesyłanie i odbieranie potoku strumienia wartości między coroutines. W celu zbudowania kanału należy stworzyć instancję klasy `Channel`, która implementuje interfejsy zachowania zarówno nadawcy (`SendChannel`) jak i odbiorcy (`ReceiveChannel`). Opcjonalny parametr przekazany do metody wytwórczej odpowiada za wielkość bufora. Kanały bez buforowe przesyłają elementy dopiero wtedy gdy nadawca i odbiorca są gotowi do komunikacji (spotykają się), tzn. funkcja `send` oczekuje na odbiorcę aby dokonać emisji natomiast funkcja `receive` oczekuje na nadawcę aby rozpocząć odbieranie. W przypadku kanałów buforowych strategia zawieszenia pozwala na wcześniejszą emisję w zależności od wielkościu bufora. Przetwarzanie wartości może także odbywać się poprzez iteracje kanału lub funkcje `consume`, `consumeEach`.
167247

168248
{% highlight kotlin %}
169249
fun runChannel() = runBlocking {
170250
//create channel using factory method
171251
//use one of RENDEZVOUS, UNLIMITED, CONFLATED optional param to specify buffer
172-
val channel = Channel<Int>(RENDEZVOUS)
173-
252+
val channel = Channel<Int>(RENDEZVOUS) //it is unbuffered channel
253+
174254
launch {
175255
repeat(3) {
176256
//send some items from this coroutine
177257
channel.send(it)
178258
//suspend if no receivers
179259
}
180-
260+
181261
//close channel to stop emission, this guarantees to send pending items
182262
channel.close()
183263
}
184-
264+
185265
//receive emitted values by receive function
186266
val value = channel.receive()
187267

188268
//as alternative use channel's loop or consume by extension function of ReceiveChannel
189269
channel.consumeEach {
190270
//do something with received values: 1, 2, 3
191271
}
192-
272+
193273
//emitted items can be consumed single time, so trying to receive again will no result
194274
}
195275
{% endhighlight %}
@@ -199,22 +279,22 @@ Kanały typu `Channel` ograniczone są do jednorazowego przepływu informacji, t
199279
{% highlight kotlin %}
200280
fun runBroadcastChannel() = runBlocking {
201281
val channel = BroadcastChannel<Int>(UNLIMITED)
202-
launch {
282+
launch {
203283
repeat(3) { channel.send(it) } //doesn't supsend if no receivers
204284
channel.close()
205285
}
206-
286+
207287
//items can be consumed by multiple receivers
208288
channel.consumeEach {}
209-
channel.consumeEach {}
289+
channel.consumeEach {}
210290
}
211291
{% endhighlight %}
212292

213293
Możliwe jest także tworzenie coroutine z automatycznie załączonym kanałem nastawionym na emisje lub odbiór za pomocą funkcji budowniczych producenta i aktora. `produce` uruchamia coroutine, który emituje strumień danych do kanału (jeden nadawca, wielu odbiorców). Zwraca `ReceiveChannel` oraz należy do zakresu `ProducerScope`.
214294

215295
{% highlight kotlin %}
216296
//use it as some variable in real world
217-
fun runProducer() = GlobalScope.launch {
297+
fun runProducer() = launch {
218298
//produce is extension function of CoroutineScope
219299
val producer : ReceiveChannel<Int> = produce {
220300
repeat(3) { send(it) }
@@ -223,22 +303,56 @@ fun runProducer() = GlobalScope.launch {
223303
//use instance of ReceiveChannel to receive values emitted by produce
224304
producer.consumeEach {
225305
//do something
226-
}
306+
}
227307
}
228308
{% endhighlight %}
229309

230310
`actor` uruchamia coroutine, który odbiera wiadomości z kanału (jeden odbiorca, wielu nadawców). Zwraca `SendChannel` i należy do zakresu `ActorScope`.
231311

232312
{% highlight kotlin %}
233313
//use it as some variable in real world
234-
fun runActor() = GlobalScope.launch {
314+
fun runActor() = launch {
235315
//actor is extension function of CoroutineScope
236316
val actor : SendChannel<Int> = actor {
237317
for(item in channel) {
238-
//do something
318+
//often used with selead class to do something
239319
}
240320
}
241321

242322
repeat(3) { actor.send(it) }
243323
}
324+
{% endhighlight %}
325+
326+
Wyrażenie `select` umożliwia jednoczesne oczekiwanie na wiele funkcji zawieszenia i wybranie pierwszej, która stanie się dostępna. Metoda `onReceive` definiuje zachowania otrzymania wiadomości przez kanał, natomiast `onSend` dokonuje emisji wartości do kanału.
327+
328+
{% highlight kotlin %}
329+
fun runSelect() = launch {
330+
val producer = produce {
331+
repeat(5) { send(it) }
332+
}
333+
//more producers
334+
335+
val actor = actor<Int> {
336+
consumeEach {
337+
//do something
338+
}
339+
}
340+
//more actors
341+
342+
//select works on some channels
343+
repeat(5) {
344+
select<Unit> {
345+
//do only one action for first available supsend fun
346+
347+
//imagine the case when select must do receive action for multiple channels
348+
producer.onReceive { value ->
349+
//do something
350+
}
351+
//define more onReceive for other channels
352+
353+
//or to send value for multiple channels
354+
actor.onSend(100) {}
355+
}
356+
}
357+
}
244358
{% endhighlight %}

0 commit comments

Comments
 (0)