[KIP-932] : Implement acknowledgement callback for Share Consumer#2253
[KIP-932] : Implement acknowledgement callback for Share Consumer#2253Kaushik Raina (k-raina) wants to merge 3 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
|
| return NULL; | ||
| } | ||
|
|
||
| if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws, &callback)) |
There was a problem hiding this comment.
Check for callback input as null
Pratyush Ranjan (PratRanj07)
left a comment
There was a problem hiding this comment.
Thanks Kaushik Raina (@k-raina) for the PR! Check the tests on if we should add more. Some I can think of :
- Checking if calling Share consumer functions from the other thread should raise exception. Take a look at librdkafka's rd_kafka_acquire and Java's acquire
- Write some tests like registring a callback and calling poll with explicit mode, then registering a different callbackand then checking if they are being invoked correctly when commiting
Take a look and improve nomenclatures in tests
| * @returns The new Python dict, or NULL on allocation failure with an | ||
| * exception set. | ||
| */ | ||
| PyObject *c_share_partition_offsets_list_to_py( |
There was a problem hiding this comment.
| PyObject *c_share_partition_offsets_list_to_py( | |
| PyObject *c_share_partition_offsets_list_to_py_dict( |
| crash: | ||
| CallState_fetch_exception(cs); | ||
| CallState_crash(cs); | ||
| rd_kafka_yield(NULL); |
There was a problem hiding this comment.
Should we pass here NULL or rk?
| const rd_kafka_share_partition_offsets_t *entry = | ||
| rd_kafka_share_partition_offsets_list_get(list, | ||
| partition_index); | ||
| const rd_kafka_topic_partition_t *rktpar = | ||
| rd_kafka_share_partition_offsets_partition(entry); | ||
| const int64_t *offsets = | ||
| rd_kafka_share_partition_offsets_offsets(entry); | ||
| size_t offsets_count = | ||
| rd_kafka_share_partition_offsets_offsets_cnt(entry); |
There was a problem hiding this comment.
We should add NULL checks here
| * | ||
| * | ||
| * Common instance handle for both Producer and Consumer | ||
| * Common instance handle for Producer, Consumer, and ShareConsumer |
There was a problem hiding this comment.
| * Common instance handle for Producer, Consumer, and ShareConsumer | |
| * Common instance handle for Producer, Consumer and ShareConsumer |
| batch = [] | ||
| deadline = time.time() + 20.0 | ||
| while time.time() < deadline and len(batch) < num_messages: | ||
| for m in sc.poll(timeout=0.5): |
There was a problem hiding this comment.
Change naming
| """Calling share-consumer APIs from inside the cb fails with _STATE — | ||
| librdkafka guards every entry point against reentrancy.""" |
There was a problem hiding this comment.
Check with Java if this is the correct error. We should match Java semantics


Summary
(offsets, exception) -> NoneorNoneto clear.ShareConsumer_acknowledgement_commit_cbplus helperc_share_partition_offsets_list_to_py()that converts librdkafka'srd_kafka_share_partition_offsets_list_tintoDict[TopicPartition, frozenset[int]].longer piggybacks on the Consumer fields. GC traverse/clear updated accordingly.
ests/integration/share_consumer/test_share_consumer_ack.pycovering success, error,clear-callback, and lifecycle paths.