|
48 | 48 | import _pulsar |
49 | 49 |
|
50 | 50 | from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ |
51 | | - LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401 |
| 51 | + LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode # noqa: F401 |
52 | 52 |
|
53 | 53 | from pulsar.__about__ import __version__ |
54 | 54 |
|
@@ -523,7 +523,8 @@ def create_producer(self, topic, |
523 | 523 | properties=None, |
524 | 524 | batching_type=BatchingType.Default, |
525 | 525 | encryption_key=None, |
526 | | - crypto_key_reader=None |
| 526 | + crypto_key_reader=None, |
| 527 | + access_mode=ProducerAccessMode.Shared, |
527 | 528 | ): |
528 | 529 | """ |
529 | 530 | Create a new producer on a given topic. |
@@ -614,6 +615,15 @@ def create_producer(self, topic, |
614 | 615 | crypto_key_reader: CryptoKeyReader, optional |
615 | 616 | Symmetric encryption class implementation, configuring public key encryption messages for the producer |
616 | 617 | and private key decryption messages for the consumer |
| 618 | + access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared |
| 619 | + Set the type of access mode that the producer requires on the topic. |
| 620 | + Supported modes: |
| 621 | + * Shared: By default multiple producers can publish on a topic. |
| 622 | + * Exclusive: Require exclusive access for producer. |
| 623 | + Fail immediately if there's already a producer connected. |
| 624 | + * WaitForExclusive: Producer creation is pending until it can acquire exclusive access. |
| 625 | + * ExclusiveWithFencing: Acquire exclusive access for the producer. |
| 626 | + Any existing producer will be removed and invalidated immediately. |
617 | 627 | """ |
618 | 628 | _check_type(str, topic, 'topic') |
619 | 629 | _check_type_or_none(str, producer_name, 'producer_name') |
@@ -649,6 +659,7 @@ def create_producer(self, topic, |
649 | 659 | conf.batching_type(batching_type) |
650 | 660 | conf.chunking_enabled(chunking_enabled) |
651 | 661 | conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) |
| 662 | + conf.access_mode(access_mode) |
652 | 663 | if producer_name: |
653 | 664 | conf.producer_name(producer_name) |
654 | 665 | if initial_sequence_id: |
|
0 commit comments