Skip to content

Commit 5f0cb75

Browse files
committed
feat: Support dead letter topic.
1 parent 19df7c3 commit 5f0cb75

3 files changed

Lines changed: 122 additions & 2 deletions

File tree

pulsar/__init__.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
import _pulsar
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
51-
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
52+
DeadLetterPolicyBuilder # noqa: F401
5253

5354
from pulsar.__about__ import __version__
5455

@@ -374,6 +375,64 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str
374375
_check_type(str, method, 'method')
375376
self.auth = _pulsar.AuthenticationBasic.create(username, password, method)
376377

378+
class ConsumerDeadLetterPolicy:
379+
"""
380+
Configuration for the "dead letter queue" feature in consumer.
381+
"""
382+
def __init__(self, dead_letter_topic: str = None,
383+
max_redeliver_count: int = None,
384+
initial_subscription_name: str = None):
385+
"""
386+
Wrapper DeadLetterPolicy.
387+
388+
Parameters
389+
----------
390+
dead_letter_topic: Name of the dead topic where the failing messages are sent.
391+
The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ"
392+
max_redeliver_count: Maximum number of times that a message is redelivered before being sent to the dead letter queue.
393+
- The maxRedeliverCount must be greater than 0.
394+
- The default value is None (DLQ is not enabled)
395+
initial_subscription_name: Name of the initial subscription name of the dead letter topic.
396+
If this field is not set, the initial subscription for the dead letter topic is not created.
397+
If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
398+
fails to be created.
399+
"""
400+
builder = DeadLetterPolicyBuilder()
401+
if dead_letter_topic is not None:
402+
builder.deadLetterTopic(dead_letter_topic)
403+
if max_redeliver_count is not None:
404+
builder.maxRedeliverCount(max_redeliver_count)
405+
if initial_subscription_name is not None:
406+
builder.initialSubscriptionName(initial_subscription_name)
407+
self._policy = builder.build()
408+
409+
@property
410+
def dead_letter_topic(self) -> str:
411+
"""
412+
Return the dead letter topic for dead letter policy.
413+
"""
414+
return self._policy.getDeadLetterTopic()
415+
416+
@property
417+
def max_redeliver_count(self) -> int:
418+
"""
419+
Return the max redeliver count for dead letter policy.
420+
"""
421+
return self._policy.getMaxRedeliverCount()
422+
423+
@property
424+
def initial_subscription_name(self) -> str:
425+
"""
426+
Return the initial subscription name for dead letter policy.
427+
"""
428+
return self._policy.getInitialSubscriptionName()
429+
430+
def policy(self):
431+
"""
432+
Returns the actual one DeadLetterPolicy.
433+
"""
434+
return self._policy
435+
377436
class Client:
378437
"""
379438
The Pulsar client. A single client instance can be used to create producers
@@ -708,6 +767,7 @@ def subscribe(self, topic, subscription_name,
708767
key_shared_policy=None,
709768
batch_index_ack_enabled=False,
710769
regex_subscription_mode=RegexSubscriptionMode.PersistentOnly,
770+
dead_letter_policy: ConsumerDeadLetterPolicy = None,
711771
):
712772
"""
713773
Subscribe to the given topic and subscription combination.
@@ -805,6 +865,12 @@ def my_listener(consumer, message):
805865
* PersistentOnly: By default only subscribe to persistent topics.
806866
* NonPersistentOnly: Only subscribe to non-persistent topics.
807867
* AllTopics: Subscribe to both persistent and non-persistent topics.
868+
dead_letter_policy: class ConsumerDeadLetterPolicy
869+
Set dead letter policy for consumer.
870+
By default, some messages are redelivered many times, even to the extent that they can never be
871+
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
872+
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
873+
automatically.
808874
"""
809875
_check_type(str, subscription_name, 'subscription_name')
810876
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -864,6 +930,8 @@ def my_listener(consumer, message):
864930
if key_shared_policy:
865931
conf.key_shared_policy(key_shared_policy.policy())
866932
conf.batch_index_ack_enabled(batch_index_ack_enabled)
933+
if dead_letter_policy:
934+
conf.dead_letter_policy(dead_letter_policy.policy())
867935

868936
c = Consumer()
869937
if isinstance(topic, str):

src/config.cc

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/ConsumerConfiguration.h>
2323
#include <pulsar/ProducerConfiguration.h>
2424
#include <pulsar/KeySharedPolicy.h>
25+
#include <pulsar/DeadLetterPolicyBuilder.h>
2526
#include <pybind11/functional.h>
2627
#include <pybind11/pybind11.h>
2728
#include <pybind11/stl.h>
@@ -231,6 +232,20 @@ void export_config(py::module_& m) {
231232
.def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
232233
.def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
233234

235+
class_<DeadLetterPolicy>(m, "DeadLetterPolicy")
236+
.def(init<>())
237+
.def("getDeadLetterTopic", &DeadLetterPolicy::getDeadLetterTopic)
238+
.def("getMaxRedeliverCount", &DeadLetterPolicy::getMaxRedeliverCount)
239+
.def("getInitialSubscriptionName", &DeadLetterPolicy::getInitialSubscriptionName);
240+
241+
class_<DeadLetterPolicyBuilder>(m, "DeadLetterPolicyBuilder")
242+
.def(init<>())
243+
.def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic, return_value_policy::reference)
244+
.def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount, return_value_policy::reference)
245+
.def("initialSubscriptionName", &DeadLetterPolicyBuilder::initialSubscriptionName, return_value_policy::reference)
246+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference)
247+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference);
248+
234249
class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, "ConsumerConfiguration")
235250
.def(init<>())
236251
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
@@ -285,7 +300,9 @@ void export_config(py::module_& m) {
285300
return_value_policy::reference)
286301
.def("batch_index_ack_enabled", &ConsumerConfiguration::isBatchIndexAckEnabled)
287302
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
288-
return_value_policy::reference);
303+
return_value_policy::reference)
304+
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
305+
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
289306

290307
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
291308
.def(init<>())

tests/pulsar_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
CryptoKeyReader,
4545
ConsumerBatchReceivePolicy,
4646
ProducerAccessMode,
47+
ConsumerDeadLetterPolicy,
4748
)
4849
from pulsar.schema import JsonSchema, Record, Integer
4950

@@ -1714,6 +1715,40 @@ def test_batch_index_ack(self):
17141715
# assert no more msgs.
17151716
with self.assertRaises(pulsar.Timeout):
17161717
consumer.receive(timeout_millis=1000)
1718+
client.close()
1719+
1720+
def test_dead_letter_policy(self):
1721+
client = Client(self.serviceUrl)
1722+
topic = "my-python-topic-test-dlq" + str(time.time())
1723+
dlq_topic = 'dlq-' + topic
1724+
max_redeliver_count = 5
1725+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
1726+
dead_letter_policy=ConsumerDeadLetterPolicy(dlq_topic, max_redeliver_count, 'init-sub'))
1727+
dlq_consumer = client.subscribe(dlq_topic, "my-sub", consumer_type=ConsumerType.Shared)
1728+
1729+
# Sen num msgs.
1730+
producer = client.create_producer(topic)
1731+
num = 10
1732+
for i in range(num):
1733+
producer.send(b"hello-%d" % i)
1734+
producer.flush()
1735+
1736+
# Redelivery all messages maxRedeliverCountNum time.
1737+
for i in range(1, num * max_redeliver_count + num + 1):
1738+
msg = consumer.receive()
1739+
if i % num == 0:
1740+
consumer.redeliver_unacknowledged_messages()
1741+
print(f"Start redeliver msgs '{i}'")
1742+
with self.assertRaises(pulsar.Timeout):
1743+
consumer.receive(100)
1744+
1745+
for i in range(num):
1746+
msg = dlq_consumer.receive()
1747+
self.assertTrue(msg)
1748+
self.assertEqual(msg.data(), b"hello-%d" % i)
1749+
dlq_consumer.acknowledge(msg)
1750+
with self.assertRaises(pulsar.Timeout):
1751+
dlq_consumer.receive(100)
17171752

17181753
client.close()
17191754

0 commit comments

Comments
 (0)