Skip to content

Commit c84a9dc

Browse files
committed
Hold messages some time, allow multiple clients
1 parent 6b797d7 commit c84a9dc

File tree

6 files changed

+94
-36
lines changed

6 files changed

+94
-36
lines changed

monitor/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "0.1.0"
1+
__version__ = "0.2.0"
22

33
__all__ = ['websocket_connections', 'zeromq_connection', 'config_manager', 'main']
44

monitor/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def main():
2929
# get logger
3030
logger = init_logger(*config.get_logger_settings())
3131
# here we'll store all active connections
32-
connections = ClientConnections(logger)
32+
connections = ClientConnections(logger, loop)
3333

3434
websock_server = None
3535
try:

monitor/test/test_ClientConnections.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,34 @@
99
class TestClientConnections(unittest.TestCase):
1010
def setUp(self):
1111
logger = MagicMock()
12-
self._connections = ClientConnections(logger)
12+
loop = MagicMock
13+
self._connections = ClientConnections(logger, loop)
1314

1415
def test_add_client(self):
1516
queue = self._connections.add_client("1234")
1617
self.assertNotEqual(queue, None, "No queue returned")
1718
self.assertIsInstance(queue, asyncio.Queue, "Wrong type of returned value")
18-
self.assertEqual(queue, self._connections._clients["1234"],
19+
self.assertEqual(queue, self._connections._clients["1234"][0],
1920
"Queue not present at requested id")
2021
queue2 = self._connections.add_client("456777")
2122
self.assertNotEqual(queue, queue2, "Different clients have same future")
23+
self._connections.add_client("1234")
24+
self.assertEqual(len(self._connections._clients["1234"]), 2, "No concurrent clients for same id")
25+
26+
def test_remove_channel(self):
27+
self._connections.add_client("1234")
28+
self._connections.remove_channel("4567")
29+
self.assertEqual(len(self._connections._clients), 1, "Nonexisting channel removed")
30+
self._connections.remove_channel("1234")
31+
self.assertEqual(len(self._connections._clients), 0, "Existing channel not removed")
2232

2333
def test_remove_client(self):
24-
fut = self._connections.add_client("1234")
25-
self._connections.remove_client("4567")
26-
self.assertEqual(len(self._connections._clients), 1, "Nonexisting client removed")
27-
self._connections.remove_client("1234")
28-
self.assertEqual(len(self._connections._clients), 0, "Existing client not removed")
34+
queue1 = self._connections.add_client("1234")
35+
queue2 = self._connections.add_client("1234")
36+
self._connections.remove_client("4567", queue1)
37+
self.assertEqual(len(self._connections._clients["1234"]), 2, "Nonexisting client removed")
38+
self._connections.remove_client("1234", queue2)
39+
self.assertEqual(len(self._connections._clients["1234"]), 1, "Existing client not removed")
2940

3041
def test_remove_all_clients(self):
3142
self._connections.add_client("1234")
@@ -37,11 +48,11 @@ def test_remove_all_clients(self):
3748

3849
def test_send_message(self):
3950
queue = self._connections.add_client("1234")
40-
ret = self._connections.send_message("1234", "testing message")
41-
self.assertTrue(ret, "Message not sent")
51+
self._connections.send_message("1234", "testing message")
4252
self.assertEqual(queue.get_nowait(), "testing message", "Result from queue differs")
43-
ret2 = self._connections.send_message("1234", "msg two")
44-
self.assertTrue(ret2, "Add second message to queue failed")
53+
self._connections.send_message("1234", "msg two")
4554
self.assertEqual(queue.get_nowait(), "msg two", "Queue has not second message")
46-
ret3 = self._connections.send_message("51236", "random string")
47-
self.assertFalse(ret3, "Message sent to nonexisting host id")
55+
56+
self._connections.send_message("51236", "random string")
57+
queue2 = self._connections.add_client("51236")
58+
self.assertEqual(queue2.get_nowait(), "random string", "Result from queue differs")

monitor/test/test_websock_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def test_connection_handler(self):
4848
websocket_mock.recv.assert_called_once_with()
4949
websocket_mock.send.assert_called_once_with('result text')
5050
connection_mock.add_client.assert_called_once_with("1234")
51-
connection_mock.remove_client.assert_called_once_with("1234")
51+
connection_mock.remove_client.assert_called_once_with("1234", queue)
5252

5353
@patch('asyncio.set_event_loop')
5454
def test_run(self, mock_set_loop):

monitor/websocket_connections.py

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,43 +16,79 @@ class ClientConnections:
1616
thread safe.
1717
"""
1818

19-
def __init__(self, logger):
19+
def __init__(self, logger, loop):
2020
"""
2121
Initialize empty client dictionary
22+
23+
:param logger: System logger.
24+
:param loop: Main asyncio event loop.
2225
"""
2326
self._clients = dict()
27+
self._saved_messages = dict()
2428
self._logger = logger
29+
self._loop = loop
2530

2631
def add_client(self, id):
2732
"""
2833
Register new client which wants to receive messages to identifier 'id'.
29-
Only one subscriber per stream is allowed. Latter overrides previous one.
34+
If there are any such messages, they are sent immediately. There can be
35+
more subscribers per stream.
3036
3137
:param id: Identifier of required stream of messages
3238
:return: Returns new asyncio.Queue on which can be wait for by
3339
'yield from' command.
3440
"""
3541
new_queue = asyncio.Queue()
36-
self._clients[id] = new_queue
42+
if id not in self._clients.keys():
43+
self._clients[id] = []
44+
self._clients[id].append(new_queue)
45+
46+
# if there are already any messages, send them
47+
if id in self._saved_messages.keys():
48+
for msg in self._saved_messages[id]:
49+
new_queue.put_nowait(msg)
50+
3751
self._logger.debug("client connection: new client '{}' registered".format(id))
3852
return new_queue
3953

40-
def remove_client(self, id):
54+
def remove_channel(self, id):
4155
"""
42-
Remove client listening on 'id' message stream. This means removing associated
43-
queue and deleting the entry from internal dictionary.
44-
If no such client exists, nothing is done.
56+
Remove all clients listening on 'id' channel. This means removing all associated
57+
queues and received messages. If no such channel exists, nothing is done.
58+
This method is called 5 minutes after last message of each channel.
4559
4660
:param id: Identifier of required stream of messages
4761
:return: Nothing
4862
"""
63+
if id in self._saved_messages.keys():
64+
del self._saved_messages[id]
65+
4966
if id in self._clients.keys():
5067
del self._clients[id]
68+
self._logger.debug("client connection: channel '{}' removed".format(id))
69+
else:
70+
self._logger.debug("client connection: channel '{}' removing failed - "
71+
" not present".format(id))
72+
73+
def remove_client(self, id, queue):
74+
"""
75+
Remove client listening on 'id' message stream with queue 'queue'.
76+
This means removing associated queue and deleting the entry from internal dictionary.
77+
If no such client exists, nothing is done.
78+
79+
:param id: Identifier of required stream of messages
80+
:param queue: Queue associated with client to be removed
81+
:return: Nothing
82+
"""
83+
if id in self._clients.keys():
84+
clients = self._clients[id]
85+
clients.remove(queue)
5186
self._logger.debug("client connection: client '{}' removed".format(id))
5287
else:
5388
self._logger.debug("client connection: client '{}' removing failed - "
5489
" not present".format(id))
5590

91+
5692
def remove_all_clients(self):
5793
"""
5894
Remove all registered clients. This method could be called on app
@@ -61,26 +97,31 @@ def remove_all_clients(self):
6197
:return: Nothing
6298
"""
6399
self._clients.clear()
100+
self._saved_messages.clear()
64101
self._logger.debug("client connection: all clients removed")
65102

66103
def send_message(self, id, message):
67104
"""
68105
Send 'message' to client listening on stream with 'id'. If 'id' is not
69-
known, the message is silently dropped. The message is put into queue,
70-
so no message will get lost.
106+
known, the message is saved for latter use. Messages for connected
107+
clients are put into queues, so no message will get lost.
71108
72109
:param id: Identifier of required stream of messages
73110
:param message: String containing text to be sent
74-
:return: Returns True if message was sent, False otherwise
111+
:return: Nothing
75112
"""
113+
114+
if id not in self._saved_messages.keys():
115+
self._saved_messages[id] = []
116+
self._saved_messages[id].append(message)
117+
76118
if id in self._clients.keys():
77-
queue = self._clients[id]
78-
queue.put_nowait(message)
79-
return True
80-
else:
81-
self._logger.warning("client connection: Dropping message '{}' for "
82-
"non-existing stream '{}'".format(message, id))
83-
return False
119+
for queue in self._clients[id]:
120+
queue.put_nowait(message)
121+
122+
# on last message schedule removing whole channel after 5 minute wait
123+
if message is None:
124+
self._loop.call_later(5*60, self.remove_channel, id)
84125

85126

86127
class WebsocketServer(threading.Thread):
@@ -118,8 +159,8 @@ def connection_handler(self, websocket, path):
118159
119160
:param websocket: Socket with request
120161
:param path: Requested path of socket (not used)
121-
:return: Returns when socket is closed or future from ClientConnections
122-
is cancelled.
162+
:return: Returns when socket is closed or poison pill is found in message queue
163+
from ClientConnections.
123164
"""
124165
wanted_id = None
125166
try:
@@ -138,7 +179,8 @@ def connection_handler(self, websocket, path):
138179
except websockets.ConnectionClosed:
139180
self._logger.info("websocket server: connection closed for channel '{}'". format(wanted_id))
140181
finally:
141-
self._connections.remove_client(wanted_id)
182+
self._connections.remove_client(wanted_id, queue)
183+
142184

143185
def run(self):
144186
"""

monitor/zeromq_connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,9 @@ def start(self, message_callback):
7070
break
7171
# call registered callback with given data
7272
message_callback(client_id, data)
73+
74+
# after last message (command FINISHED) send also poison pill
75+
# to close listening sockets
76+
if decoded_message[1] == "FINISHED":
77+
message_callback(client_id, None)
7378
return True

0 commit comments

Comments
 (0)