Skip to content

Commit 9a7b3ed

Browse files
committed
Show off restarting map_async when it stops
1 parent 96463b5 commit 9a7b3ed

File tree

1 file changed

+29
-12
lines changed

1 file changed

+29
-12
lines changed

examples/map_failure_modes.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import sys
23
from itertools import count
34
from streamz import Stream
45

@@ -9,37 +10,53 @@ async def flaky_async(x, from_where):
910

1011
def flaky_sync(x, from_where):
1112
if x % 5 == 4:
12-
raise ValueError(f"I flaked out on {from_where}")
13+
raise ValueError(f"I flaked out on {x} for {from_where}")
1314
return x
1415

1516

1617
def make_counter(name):
1718
return Stream.from_iterable(count(), asynchronous=True, stream_name=name)
1819

1920

20-
async def main():
21+
async def main(run_flags):
2122
async_non_stop_source = make_counter("async not stopping")
22-
s_async = async_non_stop_source.map_async(flaky_async, async_non_stop_source)
23-
s_async.rate_limit("500ms").sink(print, async_non_stop_source.name)
23+
s_async = async_non_stop_source.rate_limit("500ms").map_async(flaky_async, async_non_stop_source)
24+
s_async.sink(print, async_non_stop_source.name)
2425

2526
sync_source = make_counter("sync")
26-
s_sync = sync_source.map(flaky_sync, sync_source)
27-
s_sync.rate_limit("500ms").sink(print, sync_source.name)
27+
s_sync = sync_source.rate_limit("500ms").map(flaky_sync, sync_source)
28+
s_sync.sink(print, sync_source.name)
2829

2930
async_stopping_source = make_counter("async stopping")
30-
s_async = async_stopping_source.map_async(flaky_async, async_stopping_source, stop_on_exception=True)
31-
s_async.rate_limit("500ms").sink(print, async_stopping_source.name)
31+
s_async_stop = async_stopping_source.rate_limit("500ms").map_async(flaky_async, async_stopping_source, stop_on_exception=True)
32+
s_async_stop.sink(print, async_stopping_source.name)
33+
34+
if run_flags[0]:
35+
async_non_stop_source.start()
36+
if run_flags[1]:
37+
sync_source.start()
38+
if run_flags[2]:
39+
async_stopping_source.start()
3240

33-
async_non_stop_source.start()
34-
sync_source.start()
35-
async_stopping_source.start()
3641
print(f"{async_non_stop_source.started=}, {sync_source.started=}, {async_stopping_source.started=}")
3742
await asyncio.sleep(3)
3843
print(f"{async_non_stop_source.stopped=}, {sync_source.stopped=}, {async_stopping_source.stopped=}")
3944

45+
if run_flags[2]:
46+
print()
47+
print(f"Restarting {async_stopping_source}")
48+
async_stopping_source.start()
49+
print()
50+
await asyncio.sleep(2)
51+
print(f"{async_non_stop_source.stopped=}, {sync_source.stopped=}, {async_stopping_source.stopped=}")
52+
4053

4154
if __name__ == "__main__":
4255
try:
43-
asyncio.run(main())
56+
if len(sys.argv) > 1:
57+
flags = [char == "T" for char in sys.argv[1]]
58+
else:
59+
flags = [True, True, True]
60+
asyncio.run(main(flags))
4461
except KeyboardInterrupt:
4562
pass

0 commit comments

Comments
 (0)