Skip to content

Commit f07c984

Browse files
committed
[ECO-5065] feat: add support for managing message annotations via REST and Realtime APIs
Introduced new classes and functionality to enable creating, retrieving, and deleting annotations on messages, both synchronously and asynchronously. Extended protocol support to handle annotation operations and added serialization/deserialization logic for annotations and summaries.
1 parent aedfd9b commit f07c984

File tree

15 files changed

+1249
-24
lines changed

15 files changed

+1249
-24
lines changed

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.ably.lib.http.Http;
1414
import io.ably.lib.http.HttpCore;
1515
import io.ably.lib.http.HttpUtils;
16+
import io.ably.lib.rest.RestAnnotation;
1617
import io.ably.lib.transport.ConnectionManager;
1718
import io.ably.lib.transport.ConnectionManager.QueuedMessage;
1819
import io.ably.lib.transport.Defaults;
@@ -91,6 +92,8 @@ public abstract class ChannelBase extends EventEmitter<ChannelEvent, ChannelStat
9192
*/
9293
private boolean released = false;
9394

95+
public final RealtimeAnnotation annotations;
96+
9497
/***
9598
* internal
9699
*
@@ -1295,6 +1298,10 @@ else if(stateChange.current.equals(failureState)) {
12951298
this.attachResume = false;
12961299
state = ChannelState.initialized;
12971300
this.decodingContext = new DecodingContext();
1301+
this.annotations = new RealtimeAnnotation(
1302+
this,
1303+
new RestAnnotation(name, ably.http, ably.options, options)
1304+
);
12981305
}
12991306

13001307
void onChannelMessage(ProtocolMessage msg) {
@@ -1361,6 +1368,9 @@ void onChannelMessage(ProtocolMessage msg) {
13611368
case error:
13621369
setFailed(msg.error);
13631370
break;
1371+
case annotation:
1372+
annotations.onAnnotation(msg);
1373+
break;
13641374
default:
13651375
Log.e(TAG, "onChannelMessage(): Unexpected message action (" + msg.action + ")");
13661376
}
@@ -1387,6 +1397,17 @@ public void once(ChannelState state, ChannelStateListener listener) {
13871397
super.once(state.getChannelEvent(), listener);
13881398
}
13891399

1400+
/**
1401+
* (Internal) Sends a protocol message and provides a callback for completion.
1402+
*
1403+
* @param protocolMessage the protocol message to be sent
1404+
* @param listener the listener to be notified upon completion of the message delivery
1405+
*/
1406+
public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException {
1407+
ConnectionManager connectionManager = ably.connection.connectionManager;
1408+
connectionManager.send(protocolMessage, ably.options.queueMessages, listener);
1409+
}
1410+
13901411
private static final String TAG = Channel.class.getName();
13911412
final AblyRealtime ably;
13921413
final String basePath;
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
package io.ably.lib.realtime;
2+
3+
import io.ably.lib.rest.RestAnnotation;
4+
import io.ably.lib.types.AblyException;
5+
import io.ably.lib.types.Annotation;
6+
import io.ably.lib.types.AnnotationAction;
7+
import io.ably.lib.types.AsyncPaginatedResult;
8+
import io.ably.lib.types.Callback;
9+
import io.ably.lib.types.ErrorInfo;
10+
import io.ably.lib.types.MessageDecodeException;
11+
import io.ably.lib.types.PaginatedResult;
12+
import io.ably.lib.types.Param;
13+
import io.ably.lib.types.ProtocolMessage;
14+
import io.ably.lib.util.Log;
15+
import io.ably.lib.util.Multicaster;
16+
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Locale;
21+
import java.util.Map;
22+
23+
/**
24+
* RealtimeAnnotation provides subscription capabilities for annotations received on a channel.
25+
* It allows adding or removing listeners to handle annotation events and facilitates broadcasting
26+
* those events to the appropriate listeners.
27+
* <p>
28+
* Note: This is an experimental API. While the underlying functionality is stable,
29+
* the public API may change in future releases.
30+
*/
31+
public class RealtimeAnnotation {
32+
33+
private static final String TAG = RealtimeAnnotation.class.getName();
34+
35+
private final ChannelBase channel;
36+
private final RestAnnotation restAnnotation;
37+
private final AnnotationMulticaster listeners = new AnnotationMulticaster();
38+
private final Map<String, AnnotationMulticaster> typeListeners = new HashMap<>();
39+
40+
public RealtimeAnnotation(ChannelBase channel, RestAnnotation restAnnotation) {
41+
this.channel = channel;
42+
this.restAnnotation = restAnnotation;
43+
}
44+
45+
public void publish(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
46+
Log.v(TAG, String.format("publish(MsgSerial, Annotation); channel = %s", channel.name));
47+
48+
throwIfNotAbleToSendOrQueueAnnotation();
49+
50+
try {
51+
annotation.encode(channel.options);
52+
} catch (MessageDecodeException e) {
53+
throw AblyException.fromThrowable(e);
54+
}
55+
56+
Log.v(TAG, String.format("RealtimeAnnotations.publish(): channelName = %s, sending annotation with messageSerial = %s, type = %s",
57+
channel.name, messageSerial, annotation.type));
58+
59+
ProtocolMessage protocolMessage = new ProtocolMessage();
60+
protocolMessage.action = ProtocolMessage.Action.annotation;
61+
protocolMessage.channel = channel.name;
62+
protocolMessage.annotations = new Annotation[]{annotation};
63+
64+
channel.sendProtocolMessage(protocolMessage, listener);
65+
}
66+
67+
public void publish(String messageSerial, Annotation annotation) throws AblyException {
68+
publish(messageSerial, annotation, null);
69+
}
70+
71+
private void throwIfNotAbleToSendOrQueueAnnotation() throws AblyException {
72+
if (channel.state == ChannelState.detached || channel.state == ChannelState.detaching || channel.state == ChannelState.failed) {
73+
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001));
74+
}
75+
}
76+
77+
public void delete(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
78+
Log.v(TAG, String.format("delete(MsgSerial, Annotation); channel = %s", channel.name));
79+
annotation.action = AnnotationAction.ANNOTATION_DELETE;
80+
publish(messageSerial, annotation, listener);
81+
}
82+
83+
public void delete(String messageSerial, Annotation annotation) throws AblyException {
84+
delete(messageSerial, annotation, null);
85+
}
86+
87+
/**
88+
* Retrieves a paginated list of annotations associated with the specified message serial.
89+
* <p>
90+
* Note: This is an experimental API. While the underlying functionality is stable,
91+
* the public API may change in future releases.
92+
*
93+
* @param messageSerial the unique serial identifier for the message being annotated.
94+
* @param params an array of query parameters for filtering or modifying the request.
95+
* @return a {@link PaginatedResult} containing the matching annotations.
96+
* @throws AblyException if an error occurs during the retrieval process.
97+
*/
98+
public PaginatedResult<Annotation> get(String messageSerial, Param[] params) throws AblyException {
99+
return restAnnotation.get(messageSerial, params);
100+
}
101+
102+
/**
103+
* Retrieves a paginated list of annotations associated with the specified message serial.
104+
* <p>
105+
* Note: This is an experimental API. While the underlying functionality is stable,
106+
* the public API may change in future releases.
107+
*
108+
* @param messageSerial the unique serial identifier for the message being annotated
109+
* @return a PaginatedResult containing the matching annotations
110+
* @throws AblyException if an error occurs during the retrieval process
111+
*/
112+
public PaginatedResult<Annotation> get(String messageSerial) throws AblyException {
113+
return restAnnotation.get(messageSerial, null);
114+
}
115+
116+
/**
117+
* Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
118+
* <p>
119+
* Note: This is an experimental API. While the underlying functionality is stable,
120+
* the public API may change in future releases.
121+
*
122+
* @param messageSerial the unique serial identifier for the message being annotated.
123+
* @param params an array of query parameters for filtering or modifying the request.
124+
* @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
125+
*/
126+
public void getAsync(String messageSerial, Param[] params, Callback<AsyncPaginatedResult<Annotation>> callback) {
127+
restAnnotation.getAsync(messageSerial, params, callback);
128+
}
129+
130+
/**
131+
* Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
132+
* <p>
133+
* Note: This is an experimental API. While the underlying functionality is stable,
134+
* the public API may change in future releases.
135+
*
136+
* @param messageSerial the unique serial identifier for the message being annotated.
137+
* @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
138+
*/
139+
public void getAsync(String messageSerial, Callback<AsyncPaginatedResult<Annotation>> callback) {
140+
restAnnotation.getAsync(messageSerial, null, callback);
141+
}
142+
143+
/**
144+
* Subscribes the given {@link AnnotationListener} to the channel, allowing it to receive annotations.
145+
* If the channel's attach on subscribe option is enabled, the channel is attached automatically.
146+
* <p>
147+
* Note: This is an experimental API. While the underlying functionality is stable,
148+
* the public API may change in future releases.
149+
*
150+
* @param listener the listener to be subscribed to the channel
151+
* @throws AblyException if an error occurs during channel attachment
152+
*/
153+
public synchronized void subscribe(AnnotationListener listener) throws AblyException {
154+
Log.v(TAG, String.format("subscribe(); annotations in channel = %s", channel.name));
155+
listeners.add(listener);
156+
if (channel.attachOnSubscribeEnabled()) {
157+
channel.attach();
158+
}
159+
}
160+
161+
/**
162+
* Unsubscribes the specified {@link AnnotationListener} from the channel, stopping it
163+
* from receiving further annotations. Any corresponding type-specific listeners
164+
* associated with the listener are also removed.
165+
* <p>
166+
* Note: This is an experimental API. While the underlying functionality is stable,
167+
* the public API may change in future releases.
168+
*
169+
* @param listener the {@link AnnotationListener} to be unsubscribed
170+
*/
171+
public synchronized void unsubscribe(AnnotationListener listener) {
172+
Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s", channel.name));
173+
listeners.remove(listener);
174+
for (AnnotationMulticaster multicaster : typeListeners.values()) {
175+
multicaster.remove(listener);
176+
}
177+
}
178+
179+
/**
180+
* Subscribes the given {@link AnnotationListener} to the channel for a specific annotation type,
181+
* allowing it to receive annotations of the specified type. If the channel's attach on subscribe
182+
* option is enabled, the channel is attached automatically.
183+
* <p>
184+
* Note: This is an experimental API. While the underlying functionality is stable,
185+
* the public API may change in future releases.
186+
*
187+
* @param type the specific annotation type to subscribe to; if null, subscribes to all types
188+
* @param listener the {@link AnnotationListener} to be subscribed
189+
*/
190+
public synchronized void subscribe(String type, AnnotationListener listener) throws AblyException {
191+
Log.v(TAG, String.format("subscribe(); annotations in channel = %s; single type = %s", channel.name, type));
192+
subscribeImpl(type, listener);
193+
if (channel.attachOnSubscribeEnabled()) {
194+
channel.attach();
195+
}
196+
}
197+
198+
/**
199+
* Unsubscribes the specified {@link AnnotationListener} from receiving annotations
200+
* of a particular type within the channel. If there are no remaining listeners
201+
* for the specified type, the type-specific listener collection is also removed.
202+
* <p>
203+
* Note: This is an experimental API. While the underlying functionality is stable,
204+
* the public API may change in future releases.
205+
*
206+
* @param type the specific annotation type to unsubscribe from; if null, unsubscribes
207+
* from all annotations associated with the listener
208+
* @param listener the {@link AnnotationListener} to be unsubscribed
209+
*/
210+
public synchronized void unsubscribe(String type, AnnotationListener listener) {
211+
Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s; single type = %s", channel.name, type));
212+
unsubscribeImpl(type, listener);
213+
}
214+
215+
/**
216+
* Internal method. Handles incoming annotation messages from the protocol layer.
217+
*
218+
* @param protocolMessage the protocol message containing annotation data
219+
*/
220+
public void onAnnotation(ProtocolMessage protocolMessage) {
221+
List<Annotation> annotations = new ArrayList<>();
222+
for (int i = 0; i < protocolMessage.annotations.length; i++) {
223+
Annotation annotation = protocolMessage.annotations[i];
224+
try {
225+
annotation.decode(channel.options);
226+
} catch (MessageDecodeException e) {
227+
Log.e(TAG, String.format(Locale.ROOT, "%s on channel %s", e.errorInfo.message, channel.name));
228+
}
229+
/* populate fields derived from protocol message */
230+
if (annotation.connectionId == null) annotation.connectionId = protocolMessage.connectionId;
231+
if (annotation.timestamp == 0) annotation.timestamp = protocolMessage.timestamp;
232+
if (annotation.id == null) annotation.id = protocolMessage.id + ':' + i;
233+
annotations.add(annotation);
234+
}
235+
broadcastAnnotation(annotations);
236+
}
237+
238+
private void broadcastAnnotation(List<Annotation> annotations) {
239+
for (Annotation annotation : annotations) {
240+
listeners.onAnnotation(annotation);
241+
242+
String type = annotation.type != null ? annotation.type : "";
243+
AnnotationMulticaster eventListener = typeListeners.get(type);
244+
if (eventListener != null) eventListener.onAnnotation(annotation);
245+
}
246+
}
247+
248+
private void subscribeImpl(String type, AnnotationListener listener) {
249+
String annotationType = type != null ? type : "";
250+
AnnotationMulticaster typeSpecificListeners = typeListeners.get(annotationType);
251+
if (typeSpecificListeners == null) {
252+
typeSpecificListeners = new AnnotationMulticaster();
253+
typeListeners.put(annotationType, typeSpecificListeners);
254+
}
255+
typeSpecificListeners.add(listener);
256+
}
257+
258+
private void unsubscribeImpl(String type, AnnotationListener listener) {
259+
AnnotationMulticaster listeners = typeListeners.get(type);
260+
if (listeners != null) {
261+
listeners.remove(listener);
262+
if (listeners.isEmpty()) {
263+
typeListeners.remove(type);
264+
}
265+
}
266+
}
267+
268+
public interface AnnotationListener {
269+
void onAnnotation(Annotation annotation);
270+
}
271+
272+
private static class AnnotationMulticaster extends Multicaster<AnnotationListener> implements AnnotationListener {
273+
@Override
274+
public void onAnnotation(Annotation annotation) {
275+
for (final AnnotationListener member : getMembers()) {
276+
try {
277+
member.onAnnotation(annotation);
278+
} catch (Exception e) {
279+
Log.e(TAG, e.getMessage(), e);
280+
}
281+
}
282+
}
283+
}
284+
}

lib/src/main/java/io/ably/lib/rest/ChannelBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ public class ChannelBase {
3838
*/
3939
public final Presence presence;
4040

41+
/**
42+
* Represents the annotations associated with a channel message.
43+
* This field provides functionality for managing annotations.
44+
*/
45+
public final RestAnnotation annotations;
46+
47+
4148
/**
4249
* Publish a message on this channel using the REST API.
4350
* Since the REST API is stateless, this request is made independently
@@ -315,6 +322,7 @@ private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Http http,
315322
this.options = options;
316323
this.basePath = "/channels/" + HttpUtils.encodeURIComponent(name);
317324
this.presence = new Presence();
325+
this.annotations = new RestAnnotation(name, ably.http, ably.options, options);
318326
}
319327

320328
private final AblyBase ably;

0 commit comments

Comments
 (0)