Skip to content

Commit bad0414

Browse files
authored
feat: add negative acknowledge support via async constructs
feat: add negative acknowledge support via async constructs
2 parents def86a3 + 784b1ed commit bad0414

File tree

2 files changed

+7
-10
lines changed

2 files changed

+7
-10
lines changed

pulsar/asyncio.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -322,31 +322,28 @@ async def acknowledge_cumulative(
322322

323323
async def negative_acknowledge(
324324
self,
325-
message: Union[pulsar.Message, pulsar.MessageId,
326-
_pulsar.Message, _pulsar.MessageId]
325+
message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]
327326
) -> None:
328327
"""
329328
Acknowledge the failure to process a single message asynchronously.
330329
331330
When a message is "negatively acked" it will be marked for redelivery after
332331
some fixed delay. The delay is configurable when constructing the consumer
333332
with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
334-
335333
This call is not blocking.
336-
334+
337335
Parameters
338336
----------
339-
340337
message:
341338
The received message or message id.
342339
"""
343-
future = asyncio.get_running_loop().create_future()
344340
if isinstance(message, pulsar.Message):
345341
msg = message._message
342+
elif isinstance(message, pulsar.MessageId):
343+
msg = message._msg_id
346344
else:
347345
msg = message
348-
self._consumer.negative_acknowledge_async(msg, functools.partial(_set_future, future, value=None))
349-
await future
346+
await asyncio.to_thread(self._consumer.negative_acknowledge, msg)
350347

351348
async def unsubscribe(self) -> None:
352349
"""

src/consumer.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,12 @@ void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me
135135

136136
void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
137137
py::gil_scoped_release release;
138-
consumer.negativeAcknowledgeAsync(msg, callback);
138+
consumer.negativeAcknowledge(msg);
139139
}
140140

141141
void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, ResultCallback callback) {
142142
py::gil_scoped_release release;
143-
consumer.negativeAcknowledgeAsync(msg, callback);
143+
consumer.negativeAcknowledge(msgId);
144144
}
145145

146146
void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {

0 commit comments

Comments
 (0)