Skip to content

Commit e997423

Browse files
committed
fix: add a timeout to RedisSteamBroker xautoclaim lock to prevent infinite locking
1 parent 8bc1ed5 commit e997423

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

taskiq_redis/redis_broker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ def __init__(
161161
approximate: bool = True,
162162
idle_timeout: int = 600000, # 10 minutes
163163
unacknowledged_batch_size: int = 100,
164+
unacknowledged_lock_timeout: float | None = None,
164165
xread_count: int | None = 100,
165166
additional_streams: dict[str, str | int] | None = None,
166167
**connection_kwargs: Any,
@@ -188,8 +189,10 @@ def __init__(
188189
:param xread_count: number of messages to fetch from the stream at once.
189190
:param additional_streams: additional streams to read from.
190191
Each key is a stream name, value is a consumer id.
191-
:param redeliver_timeout: time in ms to wait before redelivering a message.
192192
:param unacknowledged_batch_size: number of unacknowledged messages to fetch.
193+
:param unacknowledged_lock_timeout: time in seconds before auto-releasing
194+
the lock. Useful when the worker crashes or gets killed.
195+
If not set, the lock can remain locked indefinitely.
193196
"""
194197
super().__init__(
195198
url,
@@ -209,6 +212,7 @@ def __init__(
209212
self.additional_streams = additional_streams or {}
210213
self.idle_timeout = idle_timeout
211214
self.unacknowledged_batch_size = unacknowledged_batch_size
215+
self.unacknowledged_lock_timeout = unacknowledged_lock_timeout
212216
self.count = xread_count
213217

214218
async def _declare_consumer_group(self) -> None:
@@ -290,6 +294,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
290294
for stream in [self.queue_name, *self.additional_streams.keys()]:
291295
lock = redis_conn.lock(
292296
f"autoclaim:{self.consumer_group_name}:{stream}",
297+
timeout=self.unacknowledged_lock_timeout,
293298
)
294299
if await lock.locked():
295300
continue

tests/test_broker.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,3 +432,38 @@ async def test_maxlen_in_sentinel_stream_broker(
432432
async with broker._acquire_master_conn() as redis_conn:
433433
assert await redis_conn.xlen(broker.queue_name) == maxlen
434434
await broker.shutdown()
435+
436+
437+
@pytest.mark.anyio
438+
async def test_unacknowledged_lock_timeout_in_stream_broker(
439+
redis_url: str,
440+
valid_broker_message: BrokerMessage,
441+
) -> None:
442+
unacknowledged_lock_timeout = 1
443+
queue_name = uuid.uuid4().hex
444+
consumer_group_name = uuid.uuid4().hex
445+
446+
broker = RedisStreamBroker(
447+
url=redis_url,
448+
approximate=False,
449+
queue_name=queue_name,
450+
consumer_group_name=consumer_group_name,
451+
unacknowledged_lock_timeout=unacknowledged_lock_timeout,
452+
)
453+
454+
await broker.startup()
455+
await broker.kick(valid_broker_message)
456+
457+
message = await get_message(broker)
458+
assert isinstance(message, AckableMessage)
459+
assert message.data == valid_broker_message.message
460+
461+
async with Redis(connection_pool=broker.connection_pool) as redis:
462+
lock_key = f"autoclaim:{consumer_group_name}:{queue_name}"
463+
await redis.exists(lock_key)
464+
await asyncio.sleep(unacknowledged_lock_timeout + 0.5)
465+
466+
lock_exists_after_timeout = await redis.exists(lock_key)
467+
assert lock_exists_after_timeout == 0, "Lock should be released after timeout"
468+
469+
await broker.shutdown()

0 commit comments

Comments
 (0)