Skip to content

Commit ffbd59d

Browse files
authored
Merge pull request #494 from dwskoog/example_fixes
Refresh examples to ensure that they run in the current version
2 parents 3c0f570 + 5ad905e commit ffbd59d

File tree

10 files changed

+103
-53
lines changed

10 files changed

+103
-53
lines changed

examples/fib_asyncio.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,17 @@
1-
from streamz import Stream
21
import asyncio
3-
from tornado.platform.asyncio import AsyncIOMainLoop
4-
AsyncIOMainLoop().install()
2+
from streamz import Stream
53

64

7-
source = Stream()
5+
source = Stream(asynchronous=True)
86
s = source.sliding_window(2).map(sum)
9-
L = s.sink_to_list() # store result in a list
10-
11-
s.rate_limit(0.5).sink(source.emit) # pipe output back to input
12-
s.rate_limit(1.0).sink(lambda x: print(L)) # print state of L every second
13-
14-
source.emit(0) # seed with initial values
15-
source.emit(1)
16-
7+
L = s.sink_to_list() # store result in a list
178

18-
def run_asyncio_loop():
19-
loop = asyncio.get_event_loop()
20-
try:
21-
loop.run_forever()
22-
except KeyboardInterrupt:
23-
pass
24-
finally:
25-
loop.close()
9+
s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second
10+
s.rate_limit('500ms').connect(source) # pipe output back to input
2611

12+
source.emit(1) # seed with initial value, does not block thread due to Future return
2713

28-
run_asyncio_loop()
14+
try:
15+
asyncio.get_event_loop().run_forever()
16+
except (KeyboardInterrupt, asyncio.CancelledError):
17+
pass

examples/fib_thread.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from streamz import Stream
2-
from tornado.ioloop import IOLoop
32

43
source = Stream()
54
s = source.sliding_window(2).map(sum)
6-
L = s.sink_to_list() # store result in a list
5+
L = s.sink_to_list() # store result in a list
76

8-
s.rate_limit('500ms').sink(source.emit) # pipe output back to input
97
s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second
8+
s.rate_limit('500ms').connect(source) # pipe output back to input
109

11-
source.emit(0) # seed with initial values
12-
source.emit(1)
10+
try:
11+
source.emit(1) # seed with initial value, blocks thread due to cycle in stream
12+
except KeyboardInterrupt:
13+
pass

examples/fib_tornado.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
source = Stream(asynchronous=True)
66
s = source.sliding_window(2).map(sum)
7-
L = s.sink_to_list() # store result in a list
7+
L = s.sink_to_list() # store result in a list
88

9-
s.rate_limit('500ms').sink(source.emit) # pipe output back to input
109
s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second
10+
s.rate_limit('500ms').connect(source) # pipe output back to input
1111

12-
source.emit(0) # seed with initial values
13-
source.emit(1)
12+
source.emit(1) # seed with initial value, does not block thread due to Future return
1413

15-
IOLoop.current().start()
14+
try:
15+
IOLoop.current().start()
16+
except KeyboardInterrupt:
17+
pass

examples/network_wordcount.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,9 @@
1818
)
1919

2020
s.start()
21-
time.sleep(600)
21+
22+
try:
23+
while True:
24+
time.sleep(600)
25+
except KeyboardInterrupt:
26+
pass

examples/scrape.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,23 @@
1-
from __future__ import print_function
2-
3-
from time import sleep
41
import sys
5-
6-
from BeautifulSoup import BeautifulSoup # Python 2 only, sorry.
2+
from urllib.parse import urlparse
73

84
import requests
9-
from streamz import Stream
105
import toolz
11-
import urlparse
6+
from bs4 import BeautifulSoup
127

8+
from streamz import Stream
139

1410

15-
def links_of_page((content, page)):
16-
uri = urlparse.urlparse(page)
11+
def links_of_page(content_page):
12+
(content, page) = content_page
13+
uri = urlparse(page)
1714
domain = '%s://%s' % (uri.scheme, uri.netloc)
1815
try:
19-
soup = BeautifulSoup(content)
16+
soup = BeautifulSoup(content, features="html.parser")
2017
except:
2118
return []
2219
else:
23-
links = [link.get('href') for link in soup.findAll('a')]
20+
links = [link.get('href') for link in soup.find_all('a')]
2421
return [domain + link
2522
for link in links
2623
if link
@@ -41,8 +38,8 @@ def topk_dict(d, k=10):
4138
.map(lambda x: x.content))
4239
links = (content.zip(pages)
4340
.map(links_of_page)
44-
.concat())
45-
links.sink(source.emit)
41+
.flatten())
42+
links.connect(source)
4643

4744
"""
4845
from nltk.corpus import stopwords
@@ -60,8 +57,7 @@ def topk_dict(d, k=10):
6057
"""
6158

6259
if len(sys.argv) > 1:
63-
source.emit(sys.argv[1])
64-
65-
66-
67-
#
60+
try:
61+
source.emit(sys.argv[1])
62+
except KeyboardInterrupt:
63+
pass

streamz/core.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1754,7 +1754,10 @@ class flatten(Stream):
17541754
def update(self, x, who=None, metadata=None):
17551755
L = []
17561756
items = chain(x)
1757-
item = next(items)
1757+
try:
1758+
item = next(items)
1759+
except StopIteration:
1760+
return L
17581761
for item_next in items:
17591762
y = self._emit(item)
17601763
item = item_next

streamz/sources.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import queue
44
import os
55
import time
6+
from inspect import isawaitable
7+
68
from tornado import gen
79
import weakref
810

@@ -252,7 +254,9 @@ async def handle_stream(self, stream, address):
252254
while not self.source.stopped:
253255
try:
254256
data = await stream.read_until(self.source.delimiter)
255-
await self.source._emit(data)
257+
result = self.source._emit(data)
258+
if isawaitable(result):
259+
await result
256260
except StreamClosedError:
257261
break
258262

streamz/tests/test_core.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,17 @@ def test_flatten(iterators):
886886
assert L == [1, 2, 3, 4, 5, 6, 7, 8]
887887

888888

889+
def test_flatten_empty():
890+
source = Stream()
891+
L = source.flatten().sink_to_list()
892+
893+
source.emit([1, 2])
894+
source.emit([])
895+
source.emit([3, 4])
896+
897+
assert L == [1, 2, 3, 4]
898+
899+
889900
def test_unique():
890901
source = Stream()
891902
L = source.unique().sink_to_list()

streamz/tests/test_sources.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from flaky import flaky
55
import pytest
66
from streamz import Source
7-
from streamz.utils_test import wait_for, await_for, gen_test
7+
from streamz.utils_test import free_port, wait_for, await_for, gen_test
88
import socket
99

1010

@@ -47,6 +47,37 @@ def test_tcp():
4747
sock2.close()
4848

4949

50+
@flaky(max_runs=3, min_passes=1)
51+
def test_tcp_word_count_example():
52+
port = free_port()
53+
s = Source.from_tcp(port)
54+
out = s.map(bytes.split).flatten().frequencies().sink_to_list()
55+
s.start()
56+
wait_for(lambda: s.server is not None, 2, period=0.02)
57+
58+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
59+
sock.connect(("localhost", port))
60+
sock.send(b'data\n')
61+
62+
with (socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock,
63+
socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock2):
64+
sock.connect(("localhost", port))
65+
sock2.connect(("localhost", port))
66+
sock.send(b'data\n')
67+
# regression test for a bug in from_tcp where a second packet from
68+
# the same socket is dropped due to the socket handler dying
69+
sock.send(b'data\n')
70+
sock2.send(b'data2\n')
71+
72+
expected = [{b"data": 1}, {b"data": 2}, {b"data": 3}, {b"data": 3, b"data2": 1}]
73+
74+
def fail_func():
75+
assert out == expected
76+
77+
wait_for(lambda: out == expected, 2, fail_func=fail_func, period=0.01)
78+
79+
80+
5081
@flaky(max_runs=3, min_passes=1)
5182
@gen_test(timeout=60)
5283
def test_tcp_async():

streamz/utils_test.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
import shutil
7+
import socket
78
import tempfile
89
from time import time, sleep
910

@@ -14,6 +15,13 @@
1415
from .core import _io_loops, Stream
1516

1617

18+
def free_port():
19+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
20+
s.bind(('localhost', 0))
21+
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
22+
return s.getsockname()[1]
23+
24+
1725
@contextmanager
1826
def tmpfile(extension=''):
1927
extension = '.' + extension.lstrip('.')

0 commit comments

Comments
 (0)