Skip to content

Implementation of the asyncio consumer#228

Closed
Nictec wants to merge 16 commits into
apache:mainfrom
Nictec:nl-full-asyncio
Closed

Implementation of the asyncio consumer#228
Nictec wants to merge 16 commits into
apache:mainfrom
Nictec:nl-full-asyncio

Conversation

@Nictec

@Nictec Nictec commented Sep 20, 2024

Copy link
Copy Markdown

Hi,
I implemented the asyncio consumer in a similar way as the producer. All missing async functions provided by the C++ library are now implemented in the pybind11 classes and I use futures in python to convert the functions with callbacks to async functions. The reason I started with this implementation is because I need the async consumer in a project with FastAPI myself.
Tests could be a little bit patchy, for that reason i would appreciate help if i missed something

@BewareMyPower BewareMyPower left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the binaries from the git commits

@BewareMyPower

Copy link
Copy Markdown
Contributor
Traceback (most recent call last):
  File "asyncio_test.py", line 27, in <module>
    from pulsar.asyncio import (
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pulsar/asyncio.py", line 165, in <module>
    class Consumer:
  File "/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pulsar/asyncio.py", line 216, in Consumer
    async def seek(self, position: tuple[int, int, int, int] | pulsar.MessageId):
TypeError: 'type' object is not subscriptable

Could you also make the tests pass for Python 3.8? Though Python 3.8 will reach EOL on the next month (2024-10)

@Nictec

Nictec commented Sep 30, 2024

Copy link
Copy Markdown
Author

I will try to fix the issues this week.

@Nictec

Nictec commented Oct 18, 2024

Copy link
Copy Markdown
Author

I found another issue with the schema system in my asyncio implementation, i will fix this too before the next commit.

@Nictec Nictec requested a review from BewareMyPower October 18, 2024 15:06
@Nictec Nictec removed their assignment Oct 18, 2024
@BewareMyPower

Copy link
Copy Markdown
Contributor

Let me fix the broken CI first

@BewareMyPower

Copy link
Copy Markdown
Contributor

Could you rebase to master to resolve the conflicts and have the CI fixed?

@BewareMyPower BewareMyPower added this to the 3.6.0 milestone Nov 4, 2024
Comment thread develop/install_manifest.txt Outdated
Comment thread src/client.cc Outdated
Comment thread src/consumer.cc

@BewareMyPower BewareMyPower left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

Please remove these binaries.

@merlimat

merlimat commented Dec 2, 2024

Copy link
Copy Markdown
Contributor

@BewareMyPower I removed the binary files

Comment thread pulsar/asyncio.py Outdated

@BewareMyPower BewareMyPower left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you wrap an xxxAsync API from pulsar-client-cpp, the correct way is:

// release the GIL because xxxAsync does not access any Python objects
py::gil_scoped_release release;
// Pass the callback directly because pybind11 acquires the GIL automatically when the callback is executed
xxx.xxxAsync(yyy, callback);

Comment thread tests/asyncio_test.py Outdated
Comment thread src/client.cc Outdated
Comment thread src/consumer.cc

@BewareMyPower BewareMyPower left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also fix the wrapper in consumer.cc

Comment thread src/consumer.cc

void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) {
consumer.unsubscribeAsync([callback] (Result result) {
py::gil_scoped_acquire acquire;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This GIL acquire is not necessary

Comment thread src/consumer.cc
}

void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
py::gil_scoped_acquire acquire;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should release the GIL rather than acquire the GIL

@BewareMyPower BewareMyPower modified the milestones: 3.6.0, 3.7.0 Jan 21, 2025
@shibd shibd modified the milestones: 3.7.0, 3.8.0 May 12, 2025
@BewareMyPower BewareMyPower modified the milestones: 3.8.0, 3.9.0 Jul 19, 2025
@BewareMyPower

Copy link
Copy Markdown
Contributor

I will continue the work in #277

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants