Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hazelcast/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
from hazelcast.internal.asyncio_proxy.map import Map, EntryEventCallable
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener
4 changes: 2 additions & 2 deletions hazelcast/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class MemberInfo:
def __init__(
self,
address: "Address",
member_uuid: uuid.UUID,
attributes: typing.Dict[str, str],
member_uuid: uuid.UUID | None,
attributes: typing.Dict[str, str] | None,
lite_member: bool,
version: "MemberVersion",
_,
Expand Down
26 changes: 26 additions & 0 deletions hazelcast/internal/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@
MULTI_MAP_SERVICE,
ProxyManager,
QUEUE_SERVICE,
RELIABLE_TOPIC_SERVICE,
REPLICATED_MAP_SERVICE,
RINGBUFFER_SERVICE,
VECTOR_SERVICE,
)
from hazelcast.internal.asyncio_proxy.list import List
from hazelcast.internal.asyncio_proxy.map import Map
from hazelcast.internal.asyncio_proxy.multi_map import MultiMap
from hazelcast.internal.asyncio_proxy.queue import Queue
from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
from hazelcast.internal.asyncio_proxy.ringbuffer import Ringbuffer
from hazelcast.internal.asyncio_reactor import AsyncioReactor
from hazelcast.serialization import SerializationServiceV1
from hazelcast.internal.asyncio_statistics import Statistics
Expand Down Expand Up @@ -310,6 +314,28 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp
"""
return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name)

async def get_reliable_topic(self, name: str) -> ReliableTopic:
"""Returns the ReliableTopic instance with the specified name.

Args:
name: Name of the ReliableTopic.

Returns:
Distributed ReliableTopic instance with the specified name.
"""
return await self._proxy_manager.get_or_create(RELIABLE_TOPIC_SERVICE, name)

async def get_ringbuffer(self, name: str) -> Ringbuffer:
"""Returns the distributed Ringbuffer instance with the specified name.

Args:
name: Name of the distributed ringbuffer.

Returns:
Distributed Ringbuffer instance with the specified name.
"""
return await self._proxy_manager.get_or_create(RINGBUFFER_SERVICE, name)

async def create_vector_collection_config(
self,
name: str,
Expand Down
40 changes: 27 additions & 13 deletions hazelcast/internal/asyncio_proxy/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,27 @@
from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy
from hazelcast.internal.asyncio_proxy.queue import create_queue_proxy
from hazelcast.internal.asyncio_proxy.vector_collection import (
VectorCollection,
create_vector_collection_proxy,
)
from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec
from hazelcast.internal.asyncio_invocation import Invocation
from hazelcast.internal.asyncio_proxy.base import Proxy
from hazelcast.internal.asyncio_proxy.map import create_map_proxy
from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic
from hazelcast.internal.asyncio_proxy.replicated_map import create_replicated_map_proxy
from hazelcast.internal.asyncio_proxy.ringbuffer import create_ringbuffer_proxy
from hazelcast.proxy.reliable_topic import _RINGBUFFER_PREFIX
from hazelcast.util import to_list

LIST_SERVICE = "hz:impl:listService"
MAP_SERVICE = "hz:impl:mapService"
MULTI_MAP_SERVICE = "hz:impl:multiMapService"
QUEUE_SERVICE = "hz:impl:queueService"
RELIABLE_TOPIC_SERVICE = "hz:impl:reliableTopicService"
REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService"
RINGBUFFER_SERVICE = "hz:impl:ringbufferService"
VECTOR_SERVICE = "hz:service:vector"

_proxy_init: typing.Dict[
str,
typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
] = {
LIST_SERVICE: create_list_proxy,
MAP_SERVICE: create_map_proxy,
MULTI_MAP_SERVICE: create_multi_map_proxy,
QUEUE_SERVICE: create_queue_proxy,
REPLICATED_MAP_SERVICE: create_replicated_map_proxy,
VECTOR_SERVICE: create_vector_collection_proxy,
}


class ProxyManager:
def __init__(self, context):
Expand Down Expand Up @@ -86,3 +78,25 @@ async def destroy_proxy(self, service_name, name, destroy_on_remote=True):

def get_distributed_objects(self):
return to_list(v for v in self._proxies.values() if not isinstance(v, asyncio.Future))


async def create_reliable_topic_proxy(service_name, name, context):
ringbuffer = await context.proxy_manager.get_or_create(
RINGBUFFER_SERVICE, _RINGBUFFER_PREFIX + name, create_on_remote=False
)
return ReliableTopic(service_name, name, context, ringbuffer)


_proxy_init: typing.Dict[
str,
typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
] = {
LIST_SERVICE: create_list_proxy,
MAP_SERVICE: create_map_proxy,
MULTI_MAP_SERVICE: create_multi_map_proxy,
QUEUE_SERVICE: create_queue_proxy,
RELIABLE_TOPIC_SERVICE: create_reliable_topic_proxy,
REPLICATED_MAP_SERVICE: create_replicated_map_proxy,
RINGBUFFER_SERVICE: create_ringbuffer_proxy,
VECTOR_SERVICE: create_vector_collection_proxy,
}
Loading
Loading