Skip to content

Commit 3e2f6aa

Browse files
committed
Added support for KeySharedPolicy for consumers
1 parent a6476d9 commit 3e2f6aa

File tree

5 files changed

+236
-2
lines changed

5 files changed

+236
-2
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,9 @@ wheelhouse
1717
vcpkg_installed/
1818
*.pyd
1919
*.lib
20+
21+
22+
lib_pulsar.so
23+
tests/test.log
24+
.tests-container-id.txt
25+

pulsar/__init__.py

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@
4343
"""
4444

4545
import logging
46+
from typing import List, Tuple, Optional
47+
4648
import _pulsar
4749

4850
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
49-
LoggerLevel, BatchReceivePolicy # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401
5052

5153
from pulsar.__about__ import __version__
5254

@@ -660,7 +662,8 @@ def subscribe(self, topic, subscription_name,
660662
max_pending_chunked_message=10,
661663
auto_ack_oldest_chunked_message_on_queue_full=False,
662664
start_message_id_inclusive=False,
663-
batch_receive_policy=None
665+
batch_receive_policy=None,
666+
key_shared_policy=None
664667
):
665668
"""
666669
Subscribe to the given topic and subscription combination.
@@ -745,6 +748,8 @@ def my_listener(consumer, message):
745748
Set the consumer to include the given position of any reset operation like Consumer::seek.
746749
batch_receive_policy: class ConsumerBatchReceivePolicy
747750
Set the batch collection policy for batch receiving.
751+
key_shared_policy: class ConsumerKeySharedPolicy
752+
Set the key shared policy for use when the ConsumerType is KeyShared.
748753
"""
749754
_check_type(str, subscription_name, 'subscription_name')
750755
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -765,6 +770,7 @@ def my_listener(consumer, message):
765770
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
766771
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
767772
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
773+
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
768774

769775
conf = _pulsar.ConsumerConfiguration()
770776
conf.consumer_type(consumer_type)
@@ -797,6 +803,9 @@ def my_listener(consumer, message):
797803
if batch_receive_policy:
798804
conf.batch_receive_policy(batch_receive_policy.policy())
799805

806+
if key_shared_policy:
807+
conf.key_shared_policy(key_shared_policy.policy())
808+
800809
c = Consumer()
801810
if isinstance(topic, str):
802811
# Single topic
@@ -1408,6 +1417,73 @@ def policy(self):
14081417
"""
14091418
return self._policy
14101419

1420+
class ConsumerKeySharedPolicy:
1421+
"""
1422+
Consumer key shared policy is used to configure the consumer behaviour when the ConsumerType is KeyShared.
1423+
"""
1424+
def __init__(
1425+
self,
1426+
key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit,
1427+
allow_out_of_order_delivery: bool = False,
1428+
sticky_ranges: Optional[List[Tuple[int, int]]] = None,
1429+
):
1430+
"""
1431+
Wrapper KeySharedPolicy.
1432+
1433+
Parameters
1434+
----------
1435+
1436+
key_shared_mode: KeySharedMode, optional
1437+
Set the key shared mode. eg: KeySharedMode.Sticky or KeysharedMode.AutoSplit
1438+
1439+
allow_out_of_order_delivery: bool, optional
1440+
Set whether to allow for out of order delivery
1441+
If it is enabled, it relaxes the ordering requirement and allows the broker to send out-of-order
1442+
messages in case of failures. This makes it faster for new consumers to join without being stalled by
1443+
an existing slow consumer.
1444+
1445+
If this is True, a single consumer still receives all keys, but they may come in different orders.
1446+
1447+
sticky_ranges: List[Tuple[int, int]], optional
1448+
Set the ranges used with sticky mode. The integers can be from 0 to 2^16 (0 <= val < 65,536)
1449+
"""
1450+
if key_shared_mode == KeySharedMode.Sticky and sticky_ranges is None:
1451+
raise ValueError("When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges")
1452+
1453+
self._policy = KeySharedPolicy()
1454+
self._policy.setKeySharedMode(key_shared_mode)
1455+
self._policy.setAllowOutOfOrderDelivery(allow_out_of_order_delivery)
1456+
1457+
if sticky_ranges is not None:
1458+
self._policy.setStickyRanges(sticky_ranges)
1459+
1460+
@property
1461+
def key_shared_mode(self) -> KeySharedMode:
1462+
"""
1463+
Returns the key shared mode
1464+
"""
1465+
return self._policy.getKeySharedMode()
1466+
1467+
@property
1468+
def allow_out_of_order_delivery(self) -> bool:
1469+
"""
1470+
Returns whether out of order delivery is enabled
1471+
"""
1472+
return self._policy.isAllowOutOfOrderDelivery()
1473+
1474+
@property
1475+
def sticky_ranges(self) -> List[Tuple[int, int]]:
1476+
"""
1477+
Returns the actual sticky ranges
1478+
"""
1479+
return self._policy.getStickyRanges()
1480+
1481+
def policy(self):
1482+
"""
1483+
Returns the actual KeySharedPolicy.
1484+
"""
1485+
return self._policy
1486+
14111487
class Reader:
14121488
"""
14131489
Pulsar topic reader.

src/config.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
#include <pulsar/ConsoleLoggerFactory.h>
2222
#include <pulsar/ConsumerConfiguration.h>
2323
#include <pulsar/ProducerConfiguration.h>
24+
#include <pulsar/KeySharedPolicy.h>
2425
#include <pybind11/functional.h>
2526
#include <pybind11/pybind11.h>
27+
#include <pybind11/stl.h>
2628
#include <memory>
2729

2830
namespace py = pybind11;
@@ -118,9 +120,21 @@ static ClientConfiguration& ClientConfiguration_setFileLogger(ClientConfiguratio
118120
return conf;
119121
}
120122

123+
template <typename... Args>
124+
using overload_cast_ = pybind11::detail::overload_cast_impl<Args...>;
125+
121126
void export_config(py::module_& m) {
122127
using namespace py;
123128

129+
class_<KeySharedPolicy, std::shared_ptr<KeySharedPolicy>>(m, "KeySharedPolicy")
130+
.def(init<>())
131+
.def("setKeySharedMode", &KeySharedPolicy::setKeySharedMode, return_value_policy::reference)
132+
.def("getKeySharedMode", &KeySharedPolicy::getKeySharedMode)
133+
.def("setAllowOutOfOrderDelivery", &KeySharedPolicy::setAllowOutOfOrderDelivery, return_value_policy::reference)
134+
.def("isAllowOutOfOrderDelivery", &KeySharedPolicy::isAllowOutOfOrderDelivery)
135+
.def("setStickyRanges", overload_cast_<StickyRanges>()(&KeySharedPolicy::setStickyRanges), return_value_policy::reference)
136+
.def("getStickyRanges", &KeySharedPolicy::getStickyRanges);
137+
124138
class_<CryptoKeyReader, std::shared_ptr<CryptoKeyReader>>(m, "AbstractCryptoKeyReader")
125139
.def("getPublicKey", &CryptoKeyReader::getPublicKey)
126140
.def("getPrivateKey", &CryptoKeyReader::getPrivateKey);
@@ -222,6 +236,8 @@ void export_config(py::module_& m) {
222236

223237
class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, "ConsumerConfiguration")
224238
.def(init<>())
239+
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
240+
.def("key_shared_policy", &ConsumerConfiguration::setKeySharedPolicy, return_value_policy::reference)
225241
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
226242
.def("consumer_type", &ConsumerConfiguration::setConsumerType, return_value_policy::reference)
227243
.def("schema", &ConsumerConfiguration::getSchema, return_value_policy::copy)

src/enums.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <pulsar/CompressionType.h>
2121
#include <pulsar/ConsumerConfiguration.h>
2222
#include <pulsar/ProducerConfiguration.h>
23+
#include <pulsar/KeySharedPolicy.h>
2324
#include <pybind11/pybind11.h>
2425

2526
using namespace pulsar;
@@ -28,6 +29,10 @@ namespace py = pybind11;
2829
void export_enums(py::module_& m) {
2930
using namespace py;
3031

32+
enum_<KeySharedMode>(m, "KeySharedMode")
33+
.value("AutoSplit", AUTO_SPLIT)
34+
.value("Sticky", STICKY);
35+
3136
enum_<ProducerConfiguration::PartitionsRoutingMode>(m, "PartitionsRoutingMode")
3237
.value("UseSinglePartition", ProducerConfiguration::UseSinglePartition)
3338
.value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution)

tests/pulsar_test.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
MessageId,
3333
CompressionType,
3434
ConsumerType,
35+
KeySharedMode,
36+
ConsumerKeySharedPolicy,
3537
PartitionsRoutingMode,
3638
AuthenticationBasic,
3739
AuthenticationTLS,
@@ -1437,6 +1439,135 @@ def send_callback(res, msg):
14371439
producer.flush()
14381440
client.close()
14391441

1442+
def test_keyshare_policy(self):
1443+
with self.assertRaises(ValueError):
1444+
# Raise error because sticky ranges are not provided.
1445+
pulsar.ConsumerKeySharedPolicy(
1446+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1447+
allow_out_of_order_delivery=False,
1448+
)
1449+
1450+
expected_key_shared_mode = pulsar.KeySharedMode.Sticky
1451+
expected_allow_out_of_order_delivery = True
1452+
expected_sticky_ranges = [(0, 100), (101,200)]
1453+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1454+
key_shared_mode=expected_key_shared_mode,
1455+
allow_out_of_order_delivery=expected_allow_out_of_order_delivery,
1456+
sticky_ranges=expected_sticky_ranges
1457+
)
1458+
1459+
self.assertEqual(consumer_key_shared_policy.key_shared_mode, expected_key_shared_mode)
1460+
self.assertEqual(consumer_key_shared_policy.allow_out_of_order_delivery, expected_allow_out_of_order_delivery)
1461+
self.assertEqual(consumer_key_shared_policy.sticky_ranges, expected_sticky_ranges)
1462+
1463+
def test_keyshared_invalid_sticky_ranges(self):
1464+
client = Client(self.serviceUrl)
1465+
topic = "my-python-topic-keyshare-invalid-" + str(time.time())
1466+
with self.assertRaises(ValueError):
1467+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1468+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1469+
allow_out_of_order_delivery=False,
1470+
sticky_ranges=[(0,65536)]
1471+
)
1472+
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
1473+
start_message_id_inclusive=True,
1474+
key_shared_policy=consumer_key_shared_policy)
1475+
1476+
with self.assertRaises(ValueError):
1477+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1478+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1479+
allow_out_of_order_delivery=False,
1480+
sticky_ranges=[(0, 100), (50, 150)]
1481+
)
1482+
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
1483+
start_message_id_inclusive=True,
1484+
key_shared_policy=consumer_key_shared_policy)
1485+
1486+
1487+
def test_keyshared_autosplit(self):
1488+
client = Client(self.serviceUrl)
1489+
topic = "my-python-topic-keyshare-autosplit-" + str(time.time())
1490+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1491+
key_shared_mode=pulsar.KeySharedMode.AutoSplit,
1492+
allow_out_of_order_delivery=True,
1493+
)
1494+
1495+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-1',
1496+
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
1497+
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-2',
1498+
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
1499+
producer = client.create_producer(topic)
1500+
1501+
for i in range(10):
1502+
if i > 0:
1503+
time.sleep(0.02)
1504+
producer.send(b"hello-%d" % i)
1505+
1506+
msgs = []
1507+
while True:
1508+
try:
1509+
msg = consumer.receive(100)
1510+
except pulsar.Timeout:
1511+
break
1512+
msgs.append(msgs)
1513+
consumer.acknowledge(msg)
1514+
1515+
while True:
1516+
try:
1517+
msg = consumer2.receive(100)
1518+
except pulsar.Timeout:
1519+
break
1520+
msgs.append(msgs)
1521+
consumer2.acknowledge(msg)
1522+
1523+
self.assertEqual(len(msgs), 10)
1524+
client.close()
1525+
1526+
def test_sticky_autosplit(self):
1527+
client = Client(self.serviceUrl)
1528+
topic = "my-python-topic-keyshare-sticky-" + str(time.time())
1529+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1530+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1531+
allow_out_of_order_delivery=True,
1532+
sticky_ranges=[(0,30000)],
1533+
)
1534+
1535+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-1',
1536+
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
1537+
1538+
consumer2_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1539+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1540+
allow_out_of_order_delivery=True,
1541+
sticky_ranges=[(30001, 65535)],
1542+
)
1543+
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-2',
1544+
start_message_id_inclusive=True, key_shared_policy=consumer2_key_shared_policy)
1545+
producer = client.create_producer(topic)
1546+
1547+
for i in range(10):
1548+
if i > 0:
1549+
time.sleep(0.02)
1550+
producer.send(b"hello-%d" % i)
1551+
1552+
msgs = []
1553+
while True:
1554+
try:
1555+
msg = consumer.receive(100)
1556+
except pulsar.Timeout:
1557+
break
1558+
msgs.append(msgs)
1559+
consumer.acknowledge(msg)
1560+
1561+
while True:
1562+
try:
1563+
msg = consumer2.receive(100)
1564+
except pulsar.Timeout:
1565+
break
1566+
msgs.append(msgs)
1567+
consumer2.acknowledge(msg)
1568+
1569+
self.assertEqual(len(msgs), 10)
1570+
client.close()
14401571

14411572
if __name__ == "__main__":
14421573
main()

0 commit comments

Comments
 (0)