From 1dfe73ad929396369533261bf0258cb046fa30c1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 16 Mar 2025 10:20:49 -0700 Subject: [PATCH] Add lock synchronization to Future success/failure --- kafka/future.py | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/kafka/future.py b/kafka/future.py index d0f3c6658..2af061ee7 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -2,6 +2,7 @@ import functools import logging +import threading log = logging.getLogger(__name__) @@ -15,6 +16,7 @@ def __init__(self): self.exception = None self._callbacks = [] self._errbacks = [] + self._lock = threading.Lock() def succeeded(self): return self.is_done and not bool(self.exception) @@ -30,37 +32,46 @@ def retriable(self): def success(self, value): assert not self.is_done, 'Future is already complete' - self.value = value - self.is_done = True + with self._lock: + self.value = value + self.is_done = True if self._callbacks: self._call_backs('callback', self._callbacks, self.value) return self def failure(self, e): assert not self.is_done, 'Future is already complete' - self.exception = e if type(e) is not type else e() - assert isinstance(self.exception, BaseException), ( + exception = e if type(e) is not type else e() + assert isinstance(exception, BaseException), ( 'future failed without an exception') - self.is_done = True + with self._lock: + self.exception = exception + self.is_done = True self._call_backs('errback', self._errbacks, self.exception) return self def add_callback(self, f, *args, **kwargs): if args or kwargs: f = functools.partial(f, *args, **kwargs) - if self.is_done and not self.exception: - self._call_backs('callback', [f], self.value) - else: - self._callbacks.append(f) + with self._lock: + if not self.is_done: + self._callbacks.append(f) + elif self.succeeded(): + self._lock.release() + self._call_backs('callback', [f], self.value) + self._lock.acquire() return self def add_errback(self, f, *args, **kwargs): if args or kwargs: f = functools.partial(f, *args, **kwargs) - if self.is_done and self.exception: - self._call_backs('errback', [f], self.exception) - else: - self._errbacks.append(f) + with self._lock: + if not self.is_done: + self._errbacks.append(f) + elif self.failed(): + self._lock.release() + self._call_backs('errback', [f], self.exception) + self._lock.acquire() return self def add_both(self, f, *args, **kwargs):