|
| 1 | +--- |
| 2 | +layout: post |
| 3 | +title: "Coroutines" |
| 4 | +date: 2019-07-22 |
| 5 | +categories: ["Kotlin"] |
| 6 | +image: kotlin/coroutines |
| 7 | +github: kotlin/blob/master/coroutines |
| 8 | +description: "Kotlin" |
| 9 | +version: Kotlin v1.3 |
| 10 | +keywords: "kotlin, coroutines, coroutine, scope, context, dispatcher, suspend, suspendig, async, withcontext, coroutinescope, runBlocking, launch, channel, actor, produce, android, programowanie, programming" |
| 11 | +--- |
| 12 | + |
| 13 | +## Problem |
| 14 | +Tradycyjny model tworzenia kodu asynchronicznego opartego o metody zwrotne (`Callback`) narażony jest na występowanie różnych trudności. W przypadku realizacji współbieżnych zadań zależnych może pojawić się problem komunikacji między zadaniami, który wymusza znalezienie sposobu na synchronizowanie rezultatów zwiększając tym samym złożoność kodu. Innym problemem jest zagnieżdżenie metod zwrotnych co z kolei sprawia, że w rzeczywistości zadania wykonywane są synchronicznie przez co wydłuża się czas ich przetwarzania. `Coroutines` (współprogramy) znacznie ułatwiają realizację zadań współbieżnych poprzez zmianę stylu pisania kodu asynchronicznego w sposób sekwencyjny. Dzięki takiemu podejściu kod jest bardziej czytelny i zrozumiały, a zarządzanie zadaniami staję się łatwiejsze. Ponadto jeden wątek potrafi obsługiwać jednocześnie wiele coroutines co przekłada się na znaczny wzrost wydajności. O `coroutine` można myśleć jako sekwencji podzadań wykonywanych wg określonej kolejności. |
| 15 | + |
| 16 | +## Funkcje zawieszania |
| 17 | +Zasada działania coroutine oparta jest o ideę funkcji zawieszania (`suspend function`). Funkcja ta potrafi zawiesić swoje działanie do późniejszego wykonania bez blokowania wątku (np. w oczekiwaniu na zakończenie innej funkcji), gdzie koszt zawieszenia w stosunku do blokowania wątku juz dużo niższy. Funkcje wstrzymania muszą być wywołane wewnątrz coroutine lub w innej funkcji zawieszenia i mogą być uruchamiane na tym samym lub różnych wątkach. Aby zadeklarować funkcje zawieszenia należy użyć słowa kluczowego `suspend`. |
| 18 | + |
| 19 | +{% highlight kotlin %} |
| 20 | +suspend fun suspendingFunction() : Int { |
| 21 | + //some task |
| 22 | + return 0 |
| 23 | +} |
| 24 | +{% endhighlight %} |
| 25 | + |
| 26 | +## Kontekst |
| 27 | +Kontekst współprogramu (`CoroutineContext`) jest zbiorem zasad i konfiguracji, która definiuje sposób w jaki coroutine będzie wykonywany (może być kombinacją różnych kontekstów `CombinedContext`). Jednym z możliwych sposobów dostarczenia kontekstu jest użycie klasy `Dispatchers` zawierającej zbiór implementacji różniących się wykorzystaniem wątków. `Dispatchers.Default` może działać na wielu wątkach i używany jest do kosztownych zadań o dużym zapotrzebowaniu mocy obliczeniowej jak np. algorytmy. `Dispatchers.Main` działa na głównym wątku co w przypadku Android pozwala na modyfikację interfejsu użytkownika. `Dispatchers.IO` jest używany przede wszystkim do prostych operacji typu wejście/wyjście jak np. zapytanie sieciowe, dostęp do bazy danych, plików czy sensorów. `Dispatchers.Unconfined` nie ogranicza się do żadnego wątku w wyniku czego jego zachowanie jest trudne do przewidzenia. Kontekst może być przekazany jawnie jako argument lub uzyskiwany niejawnie na podstawie zakresu w którym jest wykonywany. |
| 28 | + |
| 29 | +{% highlight kotlin %} |
| 30 | +//withContext is suspend functions itself so no need to declare in inside suspend function if used inside coroutine |
| 31 | +suspend fun suspendingWithContext() = |
| 32 | + //allows to change contex in coroutine for given block |
| 33 | + withContext(Dispatchers.Main) { |
| 34 | + //some work |
| 35 | + } |
| 36 | +{% endhighlight %} |
| 37 | + |
| 38 | +## Budowniczy |
| 39 | +Tworzenie i wykonanie coroutine może odbywać się przez jedną z funkcji budowniczego (`coroutine builder`) do której należą m.in.: `runBlocking`, `launch`, `async`. Funkcję te nie są funkcjami zawieszenia w związku z czym po wywołaniu kontynuowane jest działanie kolejnych instrukcji. Coroutines budowane są i działają w ramach struktury hierarchii (`structured concurrency`), tzn. rodzic (`parent coroutine`) ma zdolność do oddziaływania na cykl życia dzieci (`child coroutine`), a ich wykonanie przywiązane jest do danego zakresu (`scope`). |
| 40 | + |
| 41 | +`runBlocking` blokuje bieżący wątek dopóki wszystkie zadania w coroutine nie zostaną wykonane. Jest to przydatne w pisaniu testów wymagających wstrzymania. |
| 42 | + |
| 43 | +{% highlight kotlin %} |
| 44 | +//wait to finish this function to go further |
| 45 | +fun testSuspendingWork() = runBlocking { |
| 46 | + val result = suspendingFunction() |
| 47 | + //wait here for result |
| 48 | + assertEquals(0, result) |
| 49 | +} |
| 50 | +{% endhighlight %} |
| 51 | + |
| 52 | +`launch` jest często wykorzystywanym budowniczym, który w przeciwieństwie do `runBlocking` nie blokuje bieżącego wątku. Zwraca obiekt typu `Job` dzięki któremu możliwe jest manualne zarządzanie stanem zadań. Metoda `join` blokuje powiązany coroutine tak długo dopóki wszystkie jego zadania nie zostaną wykonane, natomiast `cancel` anuluje wszystkie zadania. Wykorzystywany do zadań typu `fire and forget` w których nie oczekuje się zwrócenia rezultatu. |
| 53 | + |
| 54 | +{% highlight kotlin %} |
| 55 | +//coroutines must be called on some scope so just use main scope called GlobalScope |
| 56 | +suspend fun launchJobAndJoin() { |
| 57 | + //Job extends CoroutineContext to it's context itself |
| 58 | + val job = GlobalScope.launch(Dispatchers.Main) { |
| 59 | + //some work |
| 60 | + val result1 = suspendingWork1() |
| 61 | + //wait for result1 |
| 62 | + val result2 = suspendingWork2() |
| 63 | + //wait for result2 |
| 64 | + //process results |
| 65 | + } |
| 66 | + |
| 67 | + //wait here until suspendingWork tasks finished |
| 68 | + job.join() //suspending function itself |
| 69 | +} |
| 70 | + |
| 71 | +fun launchJobAndCancel() { |
| 72 | + val job = GlobalScope.launch(Dispatchers.Main) { |
| 73 | + //some work |
| 74 | + val result1 = suspendingWork1() |
| 75 | + val result2 = suspendingWork2() |
| 76 | + //process results |
| 77 | + } |
| 78 | + |
| 79 | + //cancel all the jobs, so if suspendingWork is running then cancel it and pending tasks |
| 80 | + job.cancel() //regular function so no need to run inside coroutine or suspend function |
| 81 | +} |
| 82 | +{% endhighlight %} |
| 83 | + |
| 84 | +`async` podobnie jak `launch` pozwala na równoległe wykonanie zadań w tle, jednakże zwraca obiekt typu `Deferred`, który jest obietnicą przyszłego rezultatu. Metoda `await` w przypadku braku wyniku zawiesza dalsze wykonywanie instrukcji do momentu otrzymania rezultatu. |
| 85 | + |
| 86 | +{% highlight kotlin %} |
| 87 | +fun launchAsync() { |
| 88 | + val job = GlobalScope.launch(Dispatchers.Main) { |
| 89 | + val result = suspendingWork() |
| 90 | + val deferred1 = async { |
| 91 | + //some work |
| 92 | + return@async "result2" |
| 93 | + } |
| 94 | + val deferred2 = async { |
| 95 | + //some work |
| 96 | + return@async "result3" |
| 97 | + } |
| 98 | + |
| 99 | + //if in this place deferred1 and deferred2 not finished, wait for it |
| 100 | + val finalResult = "$result ${deferred1.await()} ${deferred2.await()}" |
| 101 | + } |
| 102 | + //run job.cancel() to cancel parent and all childs |
| 103 | +} |
| 104 | +{% endhighlight %} |
| 105 | + |
| 106 | +## Zakres |
| 107 | +Coroutines działają w ramach zakresu (`scope`), który stanowi dla nich przestrzeń wykonawczą i jest realizacją struktury hierarchii. Odwołanie się do zakresu wpływa na wszystkie znajdujące się w nim coroutines. Ich wykorzystanie eliminuje problem manualnego zarządzania stanem zadań, których praca i oczekiwanie na rezultat nierzadko ma sens tylko dla bieżącego ekranu (np. ładowanie danych do wyświetlenia). Zamiast ręcznego anulowania wszystkich coroutines wystarczy odwołać je poprzez zakres. Dowolna klasa implementująca `CoroutineScope` oraz nadpisująca właściwość `coroutineContext` może stać się zakresem. Warto zauważyć, że funkcje budowniczego są funkcjami rozszerzającymi `CoroutineScope`. Przykładem zakresu może być `GlobalScope` stanowiący ogólny zakres aplikacji. |
| 108 | + |
| 109 | +{% highlight kotlin %} |
| 110 | +class ScopeActivity : AppCompatActivity() { |
| 111 | + |
| 112 | + override fun onCreate(savedInstanceState: Bundle?) { |
| 113 | + super.onCreate(savedInstanceState) |
| 114 | + |
| 115 | + //calling just launch { } is not possible |
| 116 | + //launch must be called on some CoroutineScope |
| 117 | + |
| 118 | + CoroutineScope(Dispatchers.Main).launch { |
| 119 | + //work |
| 120 | + } |
| 121 | + |
| 122 | + val job = GlobalScope.launch(Dispatchers.Main) { |
| 123 | + //work |
| 124 | + async { |
| 125 | + //coroutine nested in coroutine |
| 126 | + } |
| 127 | + } |
| 128 | + //job.cancel() will cancel parent and childs |
| 129 | + } |
| 130 | +} |
| 131 | + |
| 132 | +//extend CoroutineScope |
| 133 | +class ScopeClassActivity : AppCompatActivity(), CoroutineScope { |
| 134 | + |
| 135 | + //can be combined with another context like contatenation with Job |
| 136 | + override val coroutineContext: CoroutineContext |
| 137 | + get() = Dispatchers.Main |
| 138 | + |
| 139 | + override fun onCreate(savedInstanceState: Bundle?) { |
| 140 | + super.onCreate(savedInstanceState) |
| 141 | + |
| 142 | + //now calling launch { } is possible because Activity is CoroutineScope itself |
| 143 | + launch { |
| 144 | + //work |
| 145 | + } |
| 146 | + } |
| 147 | + |
| 148 | + override fun onDestroy() { |
| 149 | + super.onDestroy() |
| 150 | + cancel() //cancel on scope so all coroutines inside scope are cancelling |
| 151 | + } |
| 152 | +} |
| 153 | + |
| 154 | +//or provide CoroutineScope by delegate |
| 155 | +class ScopeDelegateActivity : AppCompatActivity(), CoroutineScope by MainScope() { |
| 156 | + |
| 157 | + override fun onCreate(savedInstanceState: Bundle?) { |
| 158 | + super.onCreate(savedInstanceState) |
| 159 | + } |
| 160 | + |
| 161 | + //this class is CoroutineScope itself |
| 162 | +} |
| 163 | +{% endhighlight %} |
| 164 | + |
| 165 | +## Kanały |
| 166 | +Kanały są mechanizmem (podobnym do kolejki) pozwalającymi na przesyłanie i odbieranie potoku strumienia wartości między coroutines. W celu zbudowania kanału należy stworzyć instancję klasy `Channel`, która implementuje interfejsy zachowania zarówno nadawcy (`SendChannel`) jak i odbiorcy (`ReceiveChannel`). Opcjonalny parametr przekazany do metody wytwórczej odpowiada za wielkość bufora. Funkcja `send` i `receive` są funkcjami zawieszenia (zawieszają się w przypadku braku odbiorcy lub braku emisji) i umożliwiają odpowiednio emisję i odbiór wartości. Przetwarzanie wartości może także odbywać się poprzez iteracje kanału lub funkcje `consume`, `consumeEach`. |
| 167 | + |
| 168 | +{% highlight kotlin %} |
| 169 | +fun runChannel() = runBlocking { |
| 170 | + //create channel using factory method |
| 171 | + //use one of RENDEZVOUS, UNLIMITED, CONFLATED optional param to specify buffer |
| 172 | + val channel = Channel<Int>(RENDEZVOUS) |
| 173 | + |
| 174 | + launch { |
| 175 | + repeat(3) { |
| 176 | + //send some items from this coroutine |
| 177 | + channel.send(it) |
| 178 | + //suspend if no receivers |
| 179 | + } |
| 180 | + |
| 181 | + //close channel to stop emission, this guarantees to send pending items |
| 182 | + channel.close() |
| 183 | + } |
| 184 | + |
| 185 | + //receive emitted values by receive function |
| 186 | + val value = channel.receive() |
| 187 | + |
| 188 | + //as alternative use channel's loop or consume by extension function of ReceiveChannel |
| 189 | + channel.consumeEach { |
| 190 | + //do something with received values: 1, 2, 3 |
| 191 | + } |
| 192 | + |
| 193 | + //emitted items can be consumed single time, so trying to receive again will no result |
| 194 | +} |
| 195 | +{% endhighlight %} |
| 196 | + |
| 197 | +Kanały typu `Channel` ograniczone są do jednorazowego przepływu informacji, tzn. kanał może tylko raz odebrać wiadomości (w jednym miejscu). W sytuacji, gdy występuje wielu potencjalnych odbiorców należy użyć kanału typu `BroadcastChannel` lub `ConflatedBroadcastChannel`. Różnica między nimi polega na tym, że ten drugi informuje tylko o ostatniej wartości. |
| 198 | + |
| 199 | +{% highlight kotlin %} |
| 200 | +fun runBroadcastChannel() = runBlocking { |
| 201 | + val channel = BroadcastChannel<Int>(UNLIMITED) |
| 202 | + launch { |
| 203 | + repeat(3) { channel.send(it) } //doesn't supsend if no receivers |
| 204 | + channel.close() |
| 205 | + } |
| 206 | + |
| 207 | + //items can be consumed by multiple receivers |
| 208 | + channel.consumeEach {} |
| 209 | + channel.consumeEach {} |
| 210 | +} |
| 211 | +{% endhighlight %} |
| 212 | + |
| 213 | +Możliwe jest także tworzenie coroutine z automatycznie załączonym kanałem nastawionym na emisje lub odbiór za pomocą funkcji budowniczych producenta i aktora. `produce` uruchamia coroutine, który emituje strumień danych do kanału (jeden nadawca, wielu odbiorców). Zwraca `ReceiveChannel` oraz należy do zakresu `ProducerScope`. |
| 214 | + |
| 215 | +{% highlight kotlin %} |
| 216 | +//use it as some variable in real world |
| 217 | +fun runProducer() = GlobalScope.launch { |
| 218 | + //produce is extension function of CoroutineScope |
| 219 | + val producer : ReceiveChannel<Int> = produce { |
| 220 | + repeat(3) { send(it) } |
| 221 | + } |
| 222 | + |
| 223 | + //use instance of ReceiveChannel to receive values emitted by produce |
| 224 | + producer.consumeEach { |
| 225 | + //do something |
| 226 | + } |
| 227 | +} |
| 228 | +{% endhighlight %} |
| 229 | + |
| 230 | +`actor` uruchamia coroutine, który odbiera wiadomości z kanału (jeden odbiorca, wielu nadawców). Zwraca `SendChannel` i należy do zakresu `ActorScope`. |
| 231 | + |
| 232 | +{% highlight kotlin %} |
| 233 | +//use it as some variable in real world |
| 234 | +fun runActor() = GlobalScope.launch { |
| 235 | + //actor is extension function of CoroutineScope |
| 236 | + val actor : SendChannel<Int> = actor { |
| 237 | + for(item in channel) { |
| 238 | + //do something |
| 239 | + } |
| 240 | + } |
| 241 | + |
| 242 | + repeat(3) { actor.send(it) } |
| 243 | +} |
| 244 | +{% endhighlight %} |
0 commit comments