Skip to content

Commit 7cfdee7

Browse files
committed
feat: Support dead letter topic.
1 parent 2cb3cfe commit 7cfdee7

File tree

3 files changed

+121
-2
lines changed

3 files changed

+121
-2
lines changed

pulsar/__init__.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import _pulsar
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
51-
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, DeadLetterPolicy, DeadLetterPolicyBuilder # noqa: F401
5252

5353
from pulsar.__about__ import __version__
5454

@@ -374,6 +374,64 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str
374374
_check_type(str, method, 'method')
375375
self.auth = _pulsar.AuthenticationBasic.create(username, password, method)
376376

377+
class ConsumerDeadLetterPolicy:
378+
"""
379+
Configuration for the "dead letter queue" feature in consumer.
380+
"""
381+
def __init__(self, dead_letter_topic: str = None,
382+
max_redeliver_count: int = None,
383+
initial_subscription_name: str = None):
384+
"""
385+
Wrapper DeadLetterPolicy.
386+
387+
Parameters
388+
----------
389+
dead_letter_topic: Name of the dead topic where the failing messages are sent.
390+
The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ"
391+
max_redeliver_count: Maximum number of times that a message is redelivered before being sent to the dead letter queue.
392+
- The maxRedeliverCount must be greater than 0.
393+
- The default value is None (DLQ is not enabled)
394+
initial_subscription_name: Name of the initial subscription name of the dead letter topic.
395+
If this field is not set, the initial subscription for the dead letter topic is not created.
396+
If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
397+
fails to be created.
398+
"""
399+
builder = DeadLetterPolicyBuilder()
400+
if dead_letter_topic is not None:
401+
builder.deadLetterTopic(dead_letter_topic)
402+
if max_redeliver_count is not None:
403+
builder.maxRedeliverCount(max_redeliver_count)
404+
if initial_subscription_name is not None:
405+
builder.initialSubscriptionName(initial_subscription_name)
406+
self._policy = builder.build()
407+
408+
@property
409+
def dead_letter_topic(self) -> str:
410+
"""
411+
Return the dead letter topic for dead letter policy.
412+
"""
413+
return self._policy.getDeadLetterTopic()
414+
415+
@property
416+
def max_redeliver_count(self) -> int:
417+
"""
418+
Return the max redeliver count for dead letter policy.
419+
"""
420+
return self._policy.getMaxRedeliverCount()
421+
422+
@property
423+
def initial_subscription_name(self) -> str:
424+
"""
425+
Return the initial subscription name for dead letter policy.
426+
"""
427+
return self._policy.getInitialSubscriptionName()
428+
429+
def policy(self):
430+
"""
431+
Returns the actual one DeadLetterPolicy.
432+
"""
433+
return self._policy
434+
377435
class Client:
378436
"""
379437
The Pulsar client. A single client instance can be used to create producers
@@ -694,6 +752,7 @@ def subscribe(self, topic, subscription_name,
694752
batch_receive_policy=None,
695753
key_shared_policy=None,
696754
batch_index_ack_enabled=False,
755+
dead_letter_policy: ConsumerDeadLetterPolicy = None,
697756
):
698757
"""
699758
Subscribe to the given topic and subscription combination.
@@ -783,6 +842,12 @@ def my_listener(consumer, message):
783842
batch_index_ack_enabled: Enable the batch index acknowledgement.
784843
It should be noted that this option can only work when the broker side also enables the batch index
785844
acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`.
845+
dead_letter_policy: class ConsumerDeadLetterPolicy
846+
Set dead letter policy for consumer.
847+
By default, some messages are redelivered many times, even to the extent that they can never be
848+
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
849+
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
850+
automatically.
786851
"""
787852
_check_type(str, subscription_name, 'subscription_name')
788853
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -840,6 +905,8 @@ def my_listener(consumer, message):
840905
if key_shared_policy:
841906
conf.key_shared_policy(key_shared_policy.policy())
842907
conf.batch_index_ack_enabled(batch_index_ack_enabled)
908+
if dead_letter_policy:
909+
conf.dead_letter_policy(dead_letter_policy.policy())
843910

844911
c = Consumer()
845912
if isinstance(topic, str):

src/config.cc

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/ConsumerConfiguration.h>
2323
#include <pulsar/ProducerConfiguration.h>
2424
#include <pulsar/KeySharedPolicy.h>
25+
#include <pulsar/DeadLetterPolicyBuilder.h>
2526
#include <pybind11/functional.h>
2627
#include <pybind11/pybind11.h>
2728
#include <pybind11/stl.h>
@@ -231,6 +232,20 @@ void export_config(py::module_& m) {
231232
.def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
232233
.def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
233234

235+
class_<DeadLetterPolicy>(m, "DeadLetterPolicy")
236+
.def(init<>())
237+
.def("getDeadLetterTopic", &DeadLetterPolicy::getDeadLetterTopic)
238+
.def("getMaxRedeliverCount", &DeadLetterPolicy::getMaxRedeliverCount)
239+
.def("getInitialSubscriptionName", &DeadLetterPolicy::getInitialSubscriptionName);
240+
241+
class_<DeadLetterPolicyBuilder>(m, "DeadLetterPolicyBuilder")
242+
.def(init<>())
243+
.def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic, return_value_policy::reference)
244+
.def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount, return_value_policy::reference)
245+
.def("initialSubscriptionName", &DeadLetterPolicyBuilder::initialSubscriptionName, return_value_policy::reference)
246+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference)
247+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference);
248+
234249
class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, "ConsumerConfiguration")
235250
.def(init<>())
236251
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
@@ -283,7 +298,9 @@ void export_config(py::module_& m) {
283298
return_value_policy::reference)
284299
.def("batch_index_ack_enabled", &ConsumerConfiguration::isBatchIndexAckEnabled)
285300
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
286-
return_value_policy::reference);
301+
return_value_policy::reference)
302+
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
303+
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
287304

288305
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
289306
.def(init<>())

tests/pulsar_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
InitialPosition,
4343
CryptoKeyReader,
4444
ConsumerBatchReceivePolicy,
45+
ConsumerDeadLetterPolicy,
4546
)
4647
from pulsar.schema import JsonSchema, Record, Integer
4748

@@ -1657,6 +1658,40 @@ def test_batch_index_ack(self):
16571658
# assert no more msgs.
16581659
with self.assertRaises(pulsar.Timeout):
16591660
consumer.receive(timeout_millis=1000)
1661+
client.close()
1662+
1663+
def test_dead_letter_policy(self):
1664+
client = Client(self.serviceUrl)
1665+
topic = "my-python-topic-test-dlq" + str(time.time())
1666+
dlq_topic = 'dlq-' + topic
1667+
max_redeliver_count = 5
1668+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
1669+
dead_letter_policy=ConsumerDeadLetterPolicy(dlq_topic, max_redeliver_count, 'init-sub'))
1670+
dlq_consumer = client.subscribe(dlq_topic, "my-sub", consumer_type=ConsumerType.Shared)
1671+
1672+
# Sen num msgs.
1673+
producer = client.create_producer(topic)
1674+
num = 10
1675+
for i in range(num):
1676+
producer.send(b"hello-%d" % i)
1677+
producer.flush()
1678+
1679+
# Redelivery all messages maxRedeliverCountNum time.
1680+
for i in range(1, num * max_redeliver_count + num + 1):
1681+
msg = consumer.receive()
1682+
if i % num == 0:
1683+
consumer.redeliver_unacknowledged_messages()
1684+
print(f"Start redeliver msgs '{i}'")
1685+
with self.assertRaises(pulsar.Timeout):
1686+
consumer.receive(100)
1687+
1688+
for i in range(num):
1689+
msg = dlq_consumer.receive()
1690+
self.assertTrue(msg)
1691+
self.assertEqual(msg.data(), b"hello-%d" % i)
1692+
dlq_consumer.acknowledge(msg)
1693+
with self.assertRaises(pulsar.Timeout):
1694+
dlq_consumer.receive(100)
16601695

16611696
client.close()
16621697

0 commit comments

Comments
 (0)