Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions kafka/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import functools
import logging
import threading

log = logging.getLogger(__name__)

Expand All @@ -15,6 +16,7 @@ def __init__(self):
self.exception = None
self._callbacks = []
self._errbacks = []
self._lock = threading.Lock()

def succeeded(self):
return self.is_done and not bool(self.exception)
Expand All @@ -30,37 +32,46 @@ def retriable(self):

def success(self, value):
assert not self.is_done, 'Future is already complete'
self.value = value
self.is_done = True
with self._lock:
self.value = value
self.is_done = True
if self._callbacks:
self._call_backs('callback', self._callbacks, self.value)
return self

def failure(self, e):
assert not self.is_done, 'Future is already complete'
self.exception = e if type(e) is not type else e()
assert isinstance(self.exception, BaseException), (
exception = e if type(e) is not type else e()
assert isinstance(exception, BaseException), (
'future failed without an exception')
self.is_done = True
with self._lock:
self.exception = exception
self.is_done = True
self._call_backs('errback', self._errbacks, self.exception)
return self

def add_callback(self, f, *args, **kwargs):
if args or kwargs:
f = functools.partial(f, *args, **kwargs)
if self.is_done and not self.exception:
self._call_backs('callback', [f], self.value)
else:
self._callbacks.append(f)
with self._lock:
if not self.is_done:
self._callbacks.append(f)
elif self.succeeded():
self._lock.release()
self._call_backs('callback', [f], self.value)
self._lock.acquire()
return self

def add_errback(self, f, *args, **kwargs):
if args or kwargs:
f = functools.partial(f, *args, **kwargs)
if self.is_done and self.exception:
self._call_backs('errback', [f], self.exception)
else:
self._errbacks.append(f)
with self._lock:
if not self.is_done:
self._errbacks.append(f)
elif self.failed():
self._lock.release()
self._call_backs('errback', [f], self.exception)
self._lock.acquire()
return self

def add_both(self, f, *args, **kwargs):
Expand Down
Loading