Skip to content

Commit 0152798

Browse files
brefrabouwew
authored andcommitted
Add locking protection
1 parent eaad422 commit 0152798

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

plugwise/controller.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, port: str, message_processor, node_state):
4949
self.connection = None
5050
self.discovery_finished = False
5151
self.expected_responses = {}
52+
self.lock_expected_responses = threading.Lock()
5253
self.init_callback = None
5354
self.last_seq_id = None
5455
self.message_processor = message_processor
@@ -151,6 +152,7 @@ def send(
151152
def resend(self, seq_id):
152153
"""Resend message."""
153154
_mac = "<unknown>"
155+
self.lock_expected_responses.acquire()
154156
if not self.expected_responses.get(seq_id):
155157
_LOGGER.warning(
156158
"Cannot resend unknown request %s",
@@ -211,6 +213,7 @@ def resend(self, seq_id):
211213
MESSAGE_RETRY + 1,
212214
)
213215
del self.expected_responses[seq_id]
216+
self.lock_expected_responses.release()
214217

215218
def _send_message_loop(self):
216219
"""Daemon to send messages waiting in queue."""
@@ -225,6 +228,7 @@ def _send_message_loop(self):
225228
# Calc next seq_id based last received ack message
226229
# if previous seq_id is unknown use fake b"0000"
227230
seq_id = inc_seq_id(self.last_seq_id)
231+
self.lock_expected_responses.acquire()
228232
self.expected_responses[seq_id] = request_set
229233
if self.expected_responses[seq_id][2] == 0:
230234
_LOGGER.info(
@@ -248,6 +252,7 @@ def _send_message_loop(self):
248252
self.expected_responses[seq_id][3] = datetime.now()
249253
# Send request
250254
self.connection.send(self.expected_responses[seq_id][0])
255+
self.lock_expected_responses.release()
251256
time.sleep(SLEEP_TIME)
252257
timeout_counter = 0
253258
# Wait max 1 second for acknowledge response from USB-stick
@@ -290,7 +295,8 @@ def message_handler(self, message):
290295
)
291296

292297
def _post_message_action(self, seq_id, ack_response=None, request="unknown"):
293-
"""Execute action if request has been successful.."""
298+
"""Execute action if request has been successful."""
299+
self.lock_expected_responses.acquire()
294300
if seq_id in self.expected_responses:
295301
if ack_response in (*REQUEST_SUCCESS, None):
296302
if self.expected_responses[seq_id][1]:
@@ -325,10 +331,12 @@ def _post_message_action(self, seq_id, ack_response=None, request="unknown"):
325331
request,
326332
str(seq_id),
327333
)
334+
self.lock_expected_responses.release()
328335

329336
def _receive_timeout_loop(self):
330337
"""Daemon to time out open requests without any (n)ack response message."""
331338
while self._receive_timeout_thread_state:
339+
self.lock_expected_responses.acquire()
332340
for seq_id in list(self.expected_responses.keys()):
333341
if self.expected_responses[seq_id][3] is not None:
334342
if self.expected_responses[seq_id][3] < (
@@ -347,6 +355,7 @@ def _receive_timeout_loop(self):
347355
str(seq_id),
348356
)
349357
self.resend(seq_id)
358+
self.lock_expected_responses.release()
350359
receive_timeout_checker = 0
351360
while (
352361
receive_timeout_checker < MESSAGE_TIME_OUT
@@ -367,6 +376,7 @@ def _log_status_message(self, message, status=None):
367376
str(message.seq_id),
368377
)
369378
else:
379+
self.lock_expected_responses.acquire()
370380
if self.expected_responses.get(message.seq_id):
371381
_LOGGER.warning(
372382
"Received unmanaged (%s) %s in response to %s with seq_id %s",
@@ -386,6 +396,7 @@ def _log_status_message(self, message, status=None):
386396
message.__class__.__name__,
387397
str(message.seq_id),
388398
)
399+
self.lock_expected_responses.release()
389400
else:
390401
_LOGGER.info(
391402
"Received %s from %s with sequence id %s",

0 commit comments

Comments
 (0)