Skip to content

Commit 70701af

Browse files
committed
[AIT-316] feat: introduce support for message annotations
- Added `RealtimeAnnotations` class to manage annotation creation, deletion, and subscription on realtime channels. - Introduced `Annotation` and `AnnotationAction` types to encapsulate annotation details and actions. - Extended flags to include `ANNOTATION_PUBLISH` and `ANNOTATION_SUBSCRIBE`. - Refactored data encoding logic into `ably.util.encoding`. - Integrated annotation handling into `RealtimeChannel` and `RestChannel`.
1 parent 3d2c3c4 commit 70701af

File tree

4 files changed

+104
-118
lines changed

4 files changed

+104
-118
lines changed

ably/realtime/annotations.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def __init__(self, channel: RealtimeChannel, connection_manager: ConnectionManag
4040
self.__subscriptions = EventEmitter()
4141
self.__rest_annotations = RestAnnotations(channel)
4242

43-
async def publish(self, msg_or_serial, annotation: dict | Annotation, params: dict=None):
43+
async def publish(self, msg_or_serial, annotation: dict | Annotation, params: dict | None = None):
4444
"""
4545
Publish an annotation on a message via the realtime connection.
4646
@@ -84,7 +84,12 @@ async def publish(self, msg_or_serial, annotation: dict | Annotation, params: di
8484
# Send via WebSocket
8585
await self.__connection_manager.send_protocol_message(protocol_message)
8686

87-
async def delete(self, msg_or_serial, annotation: dict | Annotation, params=None, timeout=None):
87+
async def delete(
88+
self,
89+
msg_or_serial,
90+
annotation: dict | Annotation,
91+
params: dict | None = None,
92+
):
8893
"""
8994
Delete an annotation on a message.
9095
@@ -95,7 +100,6 @@ async def delete(self, msg_or_serial, annotation: dict | Annotation, params=None
95100
msg_or_serial: Either a message serial (string) or a Message object
96101
annotation: Dict containing annotation properties or Annotation object
97102
params: Optional dict of query parameters
98-
timeout: Optional timeout (not used for realtime, kept for compatibility)
99103
100104
Returns:
101105
None
@@ -161,13 +165,13 @@ async def subscribe(self, *args):
161165

162166
# Check if ANNOTATION_SUBSCRIBE mode is enabled
163167
if self.__channel.state == ChannelState.ATTACHED:
164-
if not Flag.ANNOTATION_SUBSCRIBE in self.__channel.modes:
168+
if Flag.ANNOTATION_SUBSCRIBE not in self.__channel.modes:
165169
raise AblyException(
166-
"You are trying to add an annotation listener, but you haven't requested the "
170+
message="You are trying to add an annotation listener, but you haven't requested the "
167171
"annotation_subscribe channel mode in ChannelOptions, so this won't do anything "
168172
"(we only deliver annotations to clients who have explicitly requested them)",
169-
93001,
170-
400
173+
code=93001,
174+
status_code=400,
171175
)
172176

173177
def unsubscribe(self, *args):
@@ -219,7 +223,7 @@ def _process_incoming(self, incoming_annotations):
219223
annotation_type = annotation.type or ''
220224
self.__subscriptions._emit(annotation_type, annotation)
221225

222-
async def get(self, msg_or_serial, params=None):
226+
async def get(self, msg_or_serial, params: dict | None = None):
223227
"""
224228
Retrieve annotations for a message with pagination support.
225229

ably/rest/annotations.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ably.http.paginatedresult import PaginatedResult, format_params
1010
from ably.types.annotation import (
1111
Annotation,
12+
AnnotationAction,
1213
make_annotation_response_handler,
1314
)
1415
from ably.types.message import Message
@@ -108,23 +109,27 @@ def __base_path_for_serial(self, serial):
108109
channel_path = '/channels/{}/'.format(parse.quote_plus(self.__channel.name, safe=':'))
109110
return channel_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/annotations'
110111

111-
async def publish(self, msg_or_serial, annotation_values, params=None, timeout=None):
112+
async def publish(
113+
self,
114+
msg_or_serial,
115+
annotation: dict | Annotation,
116+
params: dict | None = None,
117+
):
112118
"""
113119
Publish an annotation on a message.
114120
115121
Args:
116122
msg_or_serial: Either a message serial (string) or a Message object
117-
annotation_values: Dict containing annotation properties (type, name, data, etc.)
123+
annotation: Dict containing annotation properties (type, name, data, etc.) or Annotation object
118124
params: Optional dict of query parameters
119-
timeout: Optional timeout for the HTTP request
120125
121126
Returns:
122127
None
123128
124129
Raises:
125130
AblyException: If the request fails or inputs are invalid
126131
"""
127-
annotation = construct_validate_annotation(msg_or_serial, annotation_values)
132+
annotation = construct_validate_annotation(msg_or_serial, annotation)
128133

129134
# Convert to wire format
130135
request_body = annotation.as_dict(binary=self.__channel.ably.options.use_binary_protocol)
@@ -145,9 +150,14 @@ async def publish(self, msg_or_serial, annotation_values, params=None, timeout=N
145150
path += '?' + parse.urlencode(params)
146151

147152
# Send request
148-
await self.__channel.ably.http.post(path, body=request_body, timeout=timeout)
149-
150-
async def delete(self, msg_or_serial, annotation_values, params=None, timeout=None):
153+
await self.__channel.ably.http.post(path, body=request_body)
154+
155+
async def delete(
156+
self,
157+
msg_or_serial,
158+
annotation: dict | Annotation,
159+
params: dict | None = None,
160+
):
151161
"""
152162
Delete an annotation on a message.
153163
@@ -156,9 +166,8 @@ async def delete(self, msg_or_serial, annotation_values, params=None, timeout=No
156166
157167
Args:
158168
msg_or_serial: Either a message serial (string) or a Message object
159-
annotation_values: Dict containing annotation properties
169+
annotation: Dict containing annotation properties or Annotation object
160170
params: Optional dict of query parameters
161-
timeout: Optional timeout for the HTTP request
162171
163172
Returns:
164173
None
@@ -167,11 +176,14 @@ async def delete(self, msg_or_serial, annotation_values, params=None, timeout=No
167176
AblyException: If the request fails or inputs are invalid
168177
"""
169178
# Set action to delete
170-
annotation_values = annotation_values.copy()
171-
annotation_values['action'] = 'annotation.delete'
172-
return await self.publish(msg_or_serial, annotation_values, params, timeout)
179+
if isinstance(annotation, Annotation):
180+
annotation_values = annotation.as_dict()
181+
else:
182+
annotation_values = annotation.copy()
183+
annotation_values['action'] = AnnotationAction.ANNOTATION_DELETE
184+
return await self.publish(msg_or_serial, annotation_values, params)
173185

174-
async def get(self, msg_or_serial, params=None):
186+
async def get(self, msg_or_serial, params: dict | None = None):
175187
"""
176188
Retrieve annotations for a message with pagination support.
177189

test/ably/realtime/realtimeannotations_test.py

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@
55

66
from ably import AblyException
77
from ably.types.annotation import AnnotationAction
8+
from ably.types.channelmode import ChannelMode
89
from ably.types.channeloptions import ChannelOptions
9-
from ably.types.message import MessageAction
1010
from test.ably.testapp import TestApp
1111
from test.ably.utils import BaseAsyncTestCase, assert_waiter
12-
from ably.types.channelmode import ChannelMode
1312

1413
log = logging.getLogger(__name__)
1514

@@ -20,11 +19,17 @@ class TestRealtimeAnnotations(BaseAsyncTestCase):
2019
@pytest.fixture(autouse=True)
2120
async def setup(self, transport):
2221
self.test_vars = await TestApp.get_test_vars()
22+
# Use clientId to match JS tests (required for reaction:distinct.v1 annotation type)
23+
import random
24+
import string
25+
client_id = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
2326
self.ably = await TestApp.get_ably_realtime(
2427
use_binary_protocol=True if transport == 'msgpack' else False,
28+
client_id=client_id,
2529
)
2630
self.rest = await TestApp.get_ably_rest(
2731
use_binary_protocol=True if transport == 'msgpack' else False,
32+
client_id=client_id,
2833
)
2934

3035
async def test_publish_and_subscribe_annotations(self):
@@ -39,7 +44,6 @@ async def test_publish_and_subscribe_annotations(self):
3944
self.get_channel_name('mutable:publish_subscribe_annotation'),
4045
channel_options
4146
)
42-
rest_channel = self.rest.channels[channel.name]
4347
await channel.attach()
4448

4549
# Setup annotation listener
@@ -65,15 +69,15 @@ def on_message(msg):
6569

6670
# Publish annotation using realtime
6771
await channel.annotations.publish(publish_result.serials[0], {
68-
'type': 'reaction:multiple.v1',
72+
'type': 'reaction:distinct.v1',
6973
'name': '👍'
7074
})
7175

7276
# Wait for annotation
7377
annotation = await annotation_future
7478
assert annotation.action == AnnotationAction.ANNOTATION_CREATE
7579
assert annotation.message_serial == publish_result.serials[0]
76-
assert annotation.type == 'reaction:multiple.v1'
80+
assert annotation.type == 'reaction:distinct.v1'
7781
assert annotation.name == '👍'
7882
assert annotation.serial > annotation.message_serial
7983

@@ -92,22 +96,25 @@ def on_message(msg):
9296
# await channel.annotations.subscribe(on_annotation2)
9397
#
9498
# await rest_channel.annotations.publish(publish_result.serials[0], {
95-
# 'type': 'reaction:multiple.v1',
99+
# 'type': 'reaction:distinct.v1',
96100
# 'name': '😕'
97101
# })
98102
#
99103
# annotation = await annotation_future2
100104
# assert annotation.action == AnnotationAction.ANNOTATION_CREATE
101105
# assert annotation.message_serial == publish_result.serials[0]
102-
# assert annotation.type == 'reaction:multiple.v1'
106+
# assert annotation.type == 'reaction:distinct.v1'
103107
# assert annotation.name == '😕'
104108
# assert annotation.serial > annotation.message_serial
105109

106110
async def test_get_all_annotations_for_a_message(self):
107111
"""Test retrieving all annotations with pagination (matches JS test)"""
108-
channel_options = ChannelOptions(params={
109-
'modes': 'publish,subscribe,annotation_publish,annotation_subscribe'
110-
})
112+
channel_options = ChannelOptions(modes=[
113+
ChannelMode.PUBLISH,
114+
ChannelMode.SUBSCRIBE,
115+
ChannelMode.ANNOTATION_PUBLISH,
116+
ChannelMode.ANNOTATION_SUBSCRIBE
117+
])
111118
channel = self.ably.channels.get(
112119
self.get_channel_name('mutable:get_all_annotations_for_a_message'),
113120
channel_options
@@ -131,7 +138,7 @@ def on_message(msg):
131138
emojis = ['👍', '😕', '👎', '👍👍', '😕😕', '👎👎']
132139
for emoji in emojis:
133140
await channel.annotations.publish(message.serial, {
134-
'type': 'reaction:multiple.v1',
141+
'type': 'reaction:distinct.v1',
135142
'name': emoji
136143
})
137144

@@ -149,7 +156,7 @@ async def check_annotations():
149156
# Verify annotations
150157
assert annotations[0].action == AnnotationAction.ANNOTATION_CREATE
151158
assert annotations[0].message_serial == message.serial
152-
assert annotations[0].type == 'reaction:multiple.v1'
159+
assert annotations[0].type == 'reaction:distinct.v1'
153160
assert annotations[0].name == '👍'
154161
assert annotations[1].name == '😕'
155162
assert annotations[2].name == '👎'
@@ -176,9 +183,12 @@ async def check_annotations():
176183

177184
async def test_subscribe_by_annotation_type(self):
178185
"""Test subscribing to specific annotation types"""
179-
channel_options = ChannelOptions(params={
180-
'modes': 'publish,subscribe,annotation_publish,annotation_subscribe'
181-
})
186+
channel_options = ChannelOptions(modes=[
187+
ChannelMode.PUBLISH,
188+
ChannelMode.SUBSCRIBE,
189+
ChannelMode.ANNOTATION_PUBLISH,
190+
ChannelMode.ANNOTATION_SUBSCRIBE
191+
])
182192
channel = self.ably.channels.get(
183193
self.get_channel_name('mutable:subscribe_by_type'),
184194
channel_options
@@ -201,30 +211,31 @@ async def on_reaction(annotation):
201211
if not reaction_future.done():
202212
reaction_future.set_result(annotation)
203213

204-
await channel.annotations.subscribe('reaction:multiple.v1', on_reaction)
214+
await channel.annotations.subscribe('reaction:distinct.v1', on_reaction)
205215

206216
# Publish message and annotation
207217
await channel.publish('message', 'test')
208218
message = await message_future
209219

210-
# Temporary anti-flake measure (matches JS test)
211-
await asyncio.sleep(1)
212220

213221
await channel.annotations.publish(message.serial, {
214-
'type': 'reaction:multiple.v1',
222+
'type': 'reaction:distinct.v1',
215223
'name': '👍'
216224
})
217225

218226
# Should receive the annotation
219227
annotation = await reaction_future
220-
assert annotation.type == 'reaction:multiple.v1'
228+
assert annotation.type == 'reaction:distinct.v1'
221229
assert annotation.name == '👍'
222230

223231
async def test_unsubscribe_annotations(self):
224232
"""Test unsubscribing from annotations"""
225-
channel_options = ChannelOptions(params={
226-
'modes': 'publish,subscribe,annotation_publish,annotation_subscribe'
227-
})
233+
channel_options = ChannelOptions(modes=[
234+
ChannelMode.PUBLISH,
235+
ChannelMode.SUBSCRIBE,
236+
ChannelMode.ANNOTATION_PUBLISH,
237+
ChannelMode.ANNOTATION_SUBSCRIBE
238+
])
228239
channel = self.ably.channels.get(
229240
self.get_channel_name('mutable:unsubscribe_annotations'),
230241
channel_options
@@ -251,11 +262,8 @@ async def on_annotation(annotation):
251262
await channel.publish('message', 'test')
252263
message = await message_future
253264

254-
# Temporary anti-flake measure (matches JS test)
255-
await asyncio.sleep(1)
256-
257265
await channel.annotations.publish(message.serial, {
258-
'type': 'reaction:multiple.v1',
266+
'type': 'reaction:distinct.v1',
259267
'name': '👍'
260268
})
261269

@@ -267,7 +275,7 @@ async def on_annotation(annotation):
267275

268276
# Publish another annotation
269277
await channel.annotations.publish(message.serial, {
270-
'type': 'reaction:multiple.v1',
278+
'type': 'reaction:distinct.v1',
271279
'name': '😕'
272280
})
273281

@@ -276,9 +284,12 @@ async def on_annotation(annotation):
276284

277285
async def test_delete_annotation(self):
278286
"""Test deleting annotations"""
279-
channel_options = ChannelOptions(params={
280-
'modes': 'publish,subscribe,annotation_publish,annotation_subscribe'
281-
})
287+
channel_options = ChannelOptions(modes=[
288+
ChannelMode.PUBLISH,
289+
ChannelMode.SUBSCRIBE,
290+
ChannelMode.ANNOTATION_PUBLISH,
291+
ChannelMode.ANNOTATION_SUBSCRIBE
292+
])
282293
channel = self.ably.channels.get(
283294
self.get_channel_name('mutable:delete_annotation'),
284295
channel_options
@@ -305,11 +316,8 @@ async def on_annotation(annotation):
305316
await channel.publish('message', 'test')
306317
message = await message_future
307318

308-
# Temporary anti-flake measure (matches JS test)
309-
await asyncio.sleep(1)
310-
311319
await channel.annotations.publish(message.serial, {
312-
'type': 'reaction:multiple.v1',
320+
'type': 'reaction:distinct.v1',
313321
'name': '👍'
314322
})
315323

@@ -319,7 +327,7 @@ async def on_annotation(annotation):
319327

320328
# Delete the annotation
321329
await channel.annotations.delete(message.serial, {
322-
'type': 'reaction:multiple.v1',
330+
'type': 'reaction:distinct.v1',
323331
'name': '👍'
324332
})
325333

@@ -330,9 +338,10 @@ async def on_annotation(annotation):
330338
async def test_subscribe_without_annotation_mode_fails(self):
331339
"""Test that subscribing without annotation_subscribe mode raises an error"""
332340
# Create channel without annotation_subscribe mode
333-
channel_options = ChannelOptions(params={
334-
'modes': 'publish,subscribe'
335-
})
341+
channel_options = ChannelOptions(modes=[
342+
ChannelMode.PUBLISH,
343+
ChannelMode.SUBSCRIBE
344+
])
336345
channel = self.ably.channels.get(
337346
self.get_channel_name('mutable:no_annotation_mode'),
338347
channel_options

0 commit comments

Comments
 (0)