Skip to content

[KIP-932] : Implement acknowledgement callback for Share Consumer#2253

Open
Kaushik Raina (k-raina) wants to merge 3 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_share_consumer_ack_cb
Open

[KIP-932] : Implement acknowledgement callback for Share Consumer#2253
Kaushik Raina (k-raina) wants to merge 3 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_share_consumer_ack_cb

Conversation

@k-raina
Copy link
Copy Markdown
Member

@k-raina Kaushik Raina (k-raina) commented May 26, 2026

Summary

  • Adds a runtime-registered acknowledgement-commit callback for ShareConsumer.
  • New Python method ShareConsumer.set_acknowledgement_commit_callback(callback) which accepts a callable (offsets, exception) -> None or None to clear.
  • New C trampoline ShareConsumer_acknowledgement_commit_cb plus helper c_share_partition_offsets_list_to_py() that converts librdkafka's rd_kafka_share_partition_offsets_list_t into Dict[TopicPartition, frozenset[int]].
  • Carves out a dedicated ShareConsumer arm in the Handle union so the share-consumer state no
    longer piggybacks on the Consumer fields. GC traverse/clear updated accordingly.
  • Adds integration tests in tests/integration/share_consumer/test_share_consumer_ack.py covering success, error,
    clear-callback, and lifecycle paths.

@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@sonarqube-confluent
Copy link
Copy Markdown

Quality Gate failed Quality Gate failed

Failed conditions
1.2% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube

return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws, &callback))
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for callback input as null

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Kaushik Raina (@k-raina) for the PR! Check the tests on if we should add more. Some I can think of :

  1. Checking if calling Share consumer functions from the other thread should raise exception. Take a look at librdkafka's rd_kafka_acquire and Java's acquire
  2. Write some tests like registring a callback and calling poll with explicit mode, then registering a different callbackand then checking if they are being invoked correctly when commiting

Take a look and improve nomenclatures in tests

* @returns The new Python dict, or NULL on allocation failure with an
* exception set.
*/
PyObject *c_share_partition_offsets_list_to_py(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PyObject *c_share_partition_offsets_list_to_py(
PyObject *c_share_partition_offsets_list_to_py_dict(

crash:
CallState_fetch_exception(cs);
CallState_crash(cs);
rd_kafka_yield(NULL);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass here NULL or rk?

Comment on lines +1659 to +1667
const rd_kafka_share_partition_offsets_t *entry =
rd_kafka_share_partition_offsets_list_get(list,
partition_index);
const rd_kafka_topic_partition_t *rktpar =
rd_kafka_share_partition_offsets_partition(entry);
const int64_t *offsets =
rd_kafka_share_partition_offsets_offsets(entry);
size_t offsets_count =
rd_kafka_share_partition_offsets_offsets_cnt(entry);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add NULL checks here

*
*
* Common instance handle for both Producer and Consumer
* Common instance handle for Producer, Consumer, and ShareConsumer
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Common instance handle for Producer, Consumer, and ShareConsumer
* Common instance handle for Producer, Consumer and ShareConsumer

batch = []
deadline = time.time() + 20.0
while time.time() < deadline and len(batch) < num_messages:
for m in sc.poll(timeout=0.5):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change naming

Comment on lines +1142 to +1143
"""Calling share-consumer APIs from inside the cb fails with _STATE —
librdkafka guards every entry point against reentrancy."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check with Java if this is the correct error. We should match Java semantics

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants