Skip to content

Commit e3a601c

Browse files
committed
[AIT-84] feat: add support for returning serials on publish
- Introduced `publishWithResult` API to include message serials in the response. - Added callback-based `publishAsync` method for asynchronous publishing with serials. - Deprecated older `CompletionListener`-based publish methods in favor of `Callback<PublishResult>`.
1 parent 47b958f commit e3a601c

8 files changed

Lines changed: 296 additions & 203 deletions

File tree

android/src/main/java/io/ably/lib/push/PushChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void subscribeClient() throws AblyException {
4949
* @throws AblyException
5050
*/
5151
public void subscribeClientAsync(CompletionListener listener) {
52-
subscribeClientImpl().async(new CompletionListener.ToCallback(listener));
52+
subscribeClientImpl().async(new CompletionListener.ToCallback<>(listener));
5353
}
5454

5555
protected Http.Request<Void> subscribeClientImpl() {
@@ -83,7 +83,7 @@ public void subscribeDevice() throws AblyException {
8383
* @throws AblyException
8484
*/
8585
public void subscribeDeviceAsync(CompletionListener listener) {
86-
subscribeDeviceImpl().async(new CompletionListener.ToCallback(listener));
86+
subscribeDeviceImpl().async(new CompletionListener.ToCallback<>(listener));
8787
}
8888

8989
protected Http.Request<Void> subscribeDeviceImpl() {
@@ -131,7 +131,7 @@ public void unsubscribeClient() throws AblyException {
131131
* @throws AblyException
132132
*/
133133
public void unsubscribeClientAsync(CompletionListener listener) {
134-
unsubscribeClientImpl().async(new CompletionListener.ToCallback(listener));
134+
unsubscribeClientImpl().async(new CompletionListener.ToCallback<>(listener));
135135
}
136136

137137
protected Http.Request<Void> unsubscribeClientImpl() {
@@ -163,7 +163,7 @@ public void unsubscribeDevice() throws AblyException {
163163
* @throws AblyException
164164
*/
165165
public void unsubscribeDeviceAsync(CompletionListener listener) {
166-
unsubscribeDeviceImpl().async(new CompletionListener.ToCallback(listener));
166+
unsubscribeDeviceImpl().async(new CompletionListener.ToCallback<>(listener));
167167
}
168168

169169
protected Http.Request<Void> unsubscribeDeviceImpl() {

lib/src/main/java/io/ably/lib/push/PushBase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void publish(Param[] recipient, JsonObject payload) throws AblyException
8080
* @throws AblyException
8181
*/
8282
public void publishAsync(Param[] recipient, JsonObject payload, final CompletionListener listener) {
83-
publishImpl(recipient, payload).async(new CompletionListener.ToCallback(listener));
83+
publishImpl(recipient, payload).async(new CompletionListener.ToCallback<>(listener));
8484
}
8585

8686
private Http.Request<Void> publishImpl(final Param[] recipient, final JsonObject payload) {
@@ -275,7 +275,7 @@ public void remove(String deviceId) throws AblyException {
275275
* @param listener A listener to be notified of success or failure.
276276
*/
277277
public void removeAsync(String deviceId, CompletionListener listener) {
278-
removeImpl(deviceId).async(new CompletionListener.ToCallback(listener));
278+
removeImpl(deviceId).async(new CompletionListener.ToCallback<>(listener));
279279
}
280280

281281
protected Http.Request<Void> removeImpl(final String deviceId) {
@@ -310,7 +310,7 @@ public void removeWhere(Param[] params) throws AblyException {
310310
* @param listener A listener to be notified of success or failure.
311311
*/
312312
public void removeWhereAsync(Param[] params, CompletionListener listener) {
313-
removeWhereImpl(params).async(new CompletionListener.ToCallback(listener));
313+
removeWhereImpl(params).async(new CompletionListener.ToCallback<>(listener));
314314
}
315315

316316
protected Http.Request<Void> removeWhereImpl(Param[] params) {
@@ -435,7 +435,7 @@ public void remove(ChannelSubscription subscription) throws AblyException {
435435
* @throws AblyException
436436
*/
437437
public void removeAsync(ChannelSubscription subscription, CompletionListener listener) {
438-
removeImpl(subscription).async(new CompletionListener.ToCallback(listener));
438+
removeImpl(subscription).async(new CompletionListener.ToCallback<>(listener));
439439
}
440440

441441
protected Http.Request<Void> removeImpl(ChannelSubscription subscription) {
@@ -476,7 +476,7 @@ public void removeWhere(Param[] params) throws AblyException {
476476
* @throws AblyException
477477
*/
478478
public void removeWhereAsync(Param[] params, CompletionListener listener) {
479-
removeWhereImpl(params).async(new CompletionListener.ToCallback(listener));
479+
removeWhereImpl(params).async(new CompletionListener.ToCallback<>(listener));
480480
}
481481

482482
protected Http.Request<Void> removeWhereImpl(Param[] params) {

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.ably.lib.types.ProtocolMessage;
4444
import io.ably.lib.types.ProtocolMessage.Action;
4545
import io.ably.lib.types.ProtocolMessage.Flag;
46+
import io.ably.lib.types.PublishResult;
4647
import io.ably.lib.types.Summary;
4748
import io.ably.lib.types.UpdateDeleteResult;
4849
import io.ably.lib.util.CollectionUtils;
@@ -443,6 +444,16 @@ private static void callCompletionListenerError(CompletionListener listener, Err
443444
}
444445
}
445446

447+
private static void callCompletionListenerError(Callback<PublishResult> listener, ErrorInfo err) {
448+
if(listener != null) {
449+
try {
450+
listener.onError(err);
451+
} catch(Throwable t) {
452+
Log.e(TAG, "Unexpected exception calling CompletionListener", t);
453+
}
454+
}
455+
}
456+
446457
private void setAttached(ProtocolMessage message) {
447458
clearAttachTimers();
448459
properties.attachSerial = message.channelSerial;
@@ -1025,7 +1036,7 @@ private void unsubscribeImpl(String name, MessageListener listener) {
10251036
* @throws AblyException
10261037
*/
10271038
public void publish(String name, Object data) throws AblyException {
1028-
publish(name, data, null);
1039+
publish(name, data, (Callback<PublishResult>) null);
10291040
}
10301041

10311042
/**
@@ -1037,7 +1048,7 @@ public void publish(String name, Object data) throws AblyException {
10371048
* @throws AblyException
10381049
*/
10391050
public void publish(Message message) throws AblyException {
1040-
publish(message, null);
1051+
publish(message, (Callback<PublishResult>) null);
10411052
}
10421053

10431054
/**
@@ -1049,7 +1060,7 @@ public void publish(Message message) throws AblyException {
10491060
* @throws AblyException
10501061
*/
10511062
public void publish(Message[] messages) throws AblyException {
1052-
publish(messages, null);
1063+
publish(messages, (Callback<PublishResult>) null);
10531064
}
10541065

10551066
/**
@@ -1065,12 +1076,34 @@ public void publish(Message[] messages) throws AblyException {
10651076
* <p>
10661077
* This listener is invoked on a background thread.
10671078
* @throws AblyException
1079+
* @deprecated Use {@link #publish(String, Object, Callback)} instead.
10681080
*/
1081+
@Deprecated
10691082
public void publish(String name, Object data, CompletionListener listener) throws AblyException {
10701083
Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name);
10711084
publish(new Message[] {new Message(name, data)}, listener);
10721085
}
10731086

1087+
/**
1088+
* Publishes a single message to the channel with the given event name and payload.
1089+
* When publish is called with this client library, it won't attempt to implicitly attach to the channel,
1090+
* so long as <a href="https://ably.com/docs/realtime/channels#transient-publish">transient publishing</a> is available in the library.
1091+
* Otherwise, the client will implicitly attach.
1092+
* <p>
1093+
* Spec: RTL6i
1094+
* @param name the event name
1095+
* @param data the message payload
1096+
* @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation,
1097+
* receiving a {@link PublishResult} with message serial(s) on success.
1098+
* <p>
1099+
* This callback is invoked on a background thread.
1100+
* @throws AblyException
1101+
*/
1102+
public void publish(String name, Object data, Callback<PublishResult> callback) throws AblyException {
1103+
Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name);
1104+
publish(new Message[] {new Message(name, data)}, callback);
1105+
}
1106+
10741107
/**
10751108
* Publishes a message to the channel.
10761109
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
@@ -1081,12 +1114,31 @@ public void publish(String name, Object data, CompletionListener listener) throw
10811114
* <p>
10821115
* This listener is invoked on a background thread.
10831116
* @throws AblyException
1117+
* @deprecated Use {@link #publish(Message, Callback)} instead.
10841118
*/
1119+
@Deprecated
10851120
public void publish(Message message, CompletionListener listener) throws AblyException {
10861121
Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name);
10871122
publish(new Message[] {message}, listener);
10881123
}
10891124

1125+
/**
1126+
* Publishes a message to the channel.
1127+
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
1128+
* <p>
1129+
* Spec: RTL6i
1130+
* @param message A {@link Message} object.
1131+
* @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation,
1132+
* receiving a {@link PublishResult} with message serial(s) on success.
1133+
* <p>
1134+
* This callback is invoked on a background thread.
1135+
* @throws AblyException
1136+
*/
1137+
public void publish(Message message, Callback<PublishResult> callback) throws AblyException {
1138+
Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name);
1139+
publish(new Message[] {message}, callback);
1140+
}
1141+
10901142
/**
10911143
* Publishes an array of messages to the channel.
10921144
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
@@ -1098,7 +1150,12 @@ public void publish(Message message, CompletionListener listener) throws AblyExc
10981150
* This listener is invoked on a background thread.
10991151
* @throws AblyException
11001152
*/
1153+
@Deprecated
11011154
public synchronized void publish(Message[] messages, CompletionListener listener) throws AblyException {
1155+
publish(messages, Listeners.fromCompletionListener(listener));
1156+
}
1157+
1158+
public synchronized void publish(Message[] messages, Callback<PublishResult> listener) throws AblyException {
11021159
Log.v(TAG, "publish(Message[]); channel = " + this.name);
11031160
ConnectionManager connectionManager = ably.connection.connectionManager;
11041161
ConnectionManager.State connectionState = connectionManager.getConnectionState();
@@ -1125,7 +1182,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener
11251182
case suspended:
11261183
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000));
11271184
default:
1128-
connectionManager.send(msg, queueMessages, Listeners.fromCompletionListener(listener));
1185+
connectionManager.send(msg, queueMessages, listener);
11291186
}
11301187
}
11311188

lib/src/main/java/io/ably/lib/realtime/CompletionListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ public void onError(ErrorInfo reason) {
4343
}
4444
}
4545

46-
class ToCallback implements Callback<Void> {
46+
class ToCallback<T> implements Callback<T> {
4747
private CompletionListener listener;
4848
public ToCallback(CompletionListener listener) {
4949
this.listener = listener;
5050
}
5151

5252
@Override
53-
public void onSuccess(Void v) {
53+
public void onSuccess(T v) {
5454
listener.onSuccess();
5555
}
5656

lib/src/main/java/io/ably/lib/rest/ChannelBase.java

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.ably.lib.types.Param;
1818
import io.ably.lib.types.PresenceMessage;
1919
import io.ably.lib.types.PresenceSerializer;
20+
import io.ably.lib.types.PublishResult;
2021
import io.ably.lib.types.UpdateDeleteResult;
2122
import io.ably.lib.util.Crypto;
2223

@@ -66,6 +67,23 @@ void publish(Http http, String name, Object data) throws AblyException {
6667
publishImpl(http, name, data).sync();
6768
}
6869

70+
/**
71+
* Publish a message on this channel using the REST API and return the result.
72+
* Since the REST API is stateless, this request is made independently
73+
* of any other request on this or any other channel.
74+
* @param name the event name
75+
* @param data the message payload;
76+
* @return A {@link PublishResult} containing the message serial(s)
77+
* @throws AblyException
78+
*/
79+
public PublishResult publishWithResult(String name, Object data) throws AblyException {
80+
return publishWithResult(ably.http, name, data);
81+
}
82+
83+
PublishResult publishWithResult(Http http, String name, Object data) throws AblyException {
84+
return publishImpl(http, name, data).sync();
85+
}
86+
6987
/**
7088
* Publish a message on this channel using the REST API.
7189
* Since the REST API is stateless, this request is made independently
@@ -76,16 +94,37 @@ void publish(Http http, String name, Object data) throws AblyException {
7694
* @param listener a listener to be notified of the outcome of this message.
7795
* <p>
7896
* This listener is invoked on a background thread.
97+
* @deprecated Use {@link #publishAsync(String, Object, Callback)} instead.
7998
*/
99+
@Deprecated
80100
public void publishAsync(String name, Object data, CompletionListener listener) {
81101
publishAsync(ably.http, name, data, listener);
82102
}
83103

84104
void publishAsync(Http http, String name, Object data, CompletionListener listener) {
85-
publishImpl(http, name, data).async(new CompletionListener.ToCallback(listener));
105+
publishImpl(http, name, data).async(new CompletionListener.ToCallback<>(listener));
86106
}
87107

88-
private Http.Request<Void> publishImpl(Http http, String name, Object data) {
108+
/**
109+
* Asynchronously publish a message on this channel using the REST API.
110+
* Since the REST API is stateless, this request is made independently
111+
* of any other request on this or any other channel.
112+
*
113+
* @param name the event name
114+
* @param data the message payload;
115+
* @param callback a callback to be notified of the outcome of this message with the {@link PublishResult}.
116+
* <p>
117+
* This callback is invoked on a background thread.
118+
*/
119+
public void publishAsync(String name, Object data, Callback<PublishResult> callback) {
120+
publishAsync(ably.http, name, data, callback);
121+
}
122+
123+
void publishAsync(Http http, String name, Object data, Callback<PublishResult> callback) {
124+
publishImpl(http, name, data).async(callback);
125+
}
126+
127+
private Http.Request<PublishResult> publishImpl(Http http, String name, Object data) {
89128
return publishImpl(http, new Message[] {new Message(name, data)});
90129
}
91130

@@ -105,26 +144,52 @@ void publish(Http http, final Message[] messages) throws AblyException {
105144
publishImpl(http, messages).sync();
106145
}
107146

147+
/**
148+
* Publish an array of messages on this channel. When there are
149+
* multiple messages to be sent, it is more efficient to use this
150+
* method to publish them in a single request, as compared with
151+
* publishing via multiple independent requests.
152+
* @param messages array of messages to publish.
153+
* @throws AblyException
154+
*/
155+
public PublishResult publishWithResult(final Message[] messages) throws AblyException {
156+
return publishImpl(ably.http, messages).sync();
157+
}
158+
108159
/**
109160
* Asynchronously publish an array of messages on this channel
110161
*
111162
* @param messages the message
112163
* @param listener a listener to be notified of the outcome of this message.
164+
* @deprecated Use {@link #publishAsync(Message[], Callback)} instead.
113165
* <p>
114166
* This listener is invoked on a background thread.
115167
*/
168+
@Deprecated
116169
public void publishAsync(final Message[] messages, final CompletionListener listener) {
117170
publishAsync(ably.http, messages, listener);
118171
}
119172

120173
void publishAsync(Http http, final Message[] messages, final CompletionListener listener) {
121-
publishImpl(http, messages).async(new CompletionListener.ToCallback(listener));
174+
publishImpl(http, messages).async(new CompletionListener.ToCallback<>(listener));
122175
}
123176

124-
private Http.Request<Void> publishImpl(Http http, final Message[] messages) {
125-
return http.request(new Http.Execute<Void>() {
177+
/**
178+
* Asynchronously publish an array of messages on this channel
179+
*
180+
* @param messages the message
181+
* @param listener a listener to be notified of the outcome of this message.
182+
* <p>
183+
* This listener is invoked on a background thread.
184+
*/
185+
public void publishAsync(final Message[] messages, final Callback<PublishResult> listener) {
186+
publishImpl(ably.http, messages).async(listener);
187+
}
188+
189+
private Http.Request<PublishResult> publishImpl(Http http, final Message[] messages) {
190+
return http.request(new Http.Execute<PublishResult>() {
126191
@Override
127-
public void execute(HttpScheduler http, final Callback<Void> callback) throws AblyException {
192+
public void execute(HttpScheduler http, final Callback<PublishResult> callback) throws AblyException {
128193
/* handle message ids */
129194
boolean hasClientSuppliedId = false;
130195
for(Message message : messages) {
@@ -145,7 +210,15 @@ public void execute(HttpScheduler http, final Callback<Void> callback) throws Ab
145210
HttpCore.RequestBody requestBody = ably.options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(messages) : MessageSerializer.asJsonRequest(messages);
146211
final Param[] params = ably.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c
147212

148-
http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, requestBody, null, true, callback);
213+
// Create ResponseHandler from BodyHandler
214+
HttpCore.BodyHandler<String> bodyHandler = PublishResult.getBodyHandler();
215+
HttpCore.ResponseHandler<PublishResult> responseHandler = (response, error) -> {
216+
if (error != null) throw AblyException.fromErrorInfo(error);
217+
String[] serials = bodyHandler.handleResponseBody(response.contentType, response.body);
218+
return new PublishResult(serials);
219+
};
220+
221+
http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, requestBody, responseHandler, true, callback);
149222
}
150223
});
151224
}

0 commit comments

Comments
 (0)