Skip to content

Commit 111e15a

Browse files
committed
[ECO-5076] Implemented code for getRoot method
1. Added getChannelModes and getChannelState methods to adapter 2. Added extension helper methods to adapter to ensure channel config. is valid
1 parent e808d38 commit 111e15a

8 files changed

Lines changed: 128 additions & 20 deletions

File tree

lib/src/main/java/io/ably/lib/objects/Adapter.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package io.ably.lib.objects;
22

33
import io.ably.lib.realtime.AblyRealtime;
4+
import io.ably.lib.realtime.ChannelState;
45
import io.ably.lib.realtime.CompletionListener;
56
import io.ably.lib.types.AblyException;
7+
import io.ably.lib.types.ChannelMode;
8+
import io.ably.lib.types.ChannelOptions;
69
import io.ably.lib.types.ProtocolMessage;
710
import io.ably.lib.util.Log;
811
import org.jetbrains.annotations.NotNull;
@@ -34,4 +37,32 @@ public void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener liste
3437
public int maxMessageSizeLimit() {
3538
return ably.connection.connectionManager.maxMessageSize;
3639
}
40+
41+
@Override
42+
public ChannelMode[] getChannelModes(@NotNull String channelName) {
43+
if (ably.channels.containsKey(channelName)) {
44+
// RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set
45+
ChannelMode[] modes = ably.channels.get(channelName).getModes();
46+
if (modes != null) {
47+
return modes;
48+
}
49+
// RTO2b - otherwise as a best effort use user provided channel options
50+
ChannelOptions options = ably.channels.get(channelName).getOptions();
51+
if (options != null && options.hasModes()) {
52+
return options.modes;
53+
}
54+
return null;
55+
}
56+
Log.e(TAG, "getChannelMode(): channel not found: " + channelName);
57+
return null;
58+
}
59+
60+
@Override
61+
public ChannelState getChannelState(@NotNull String channelName) {
62+
if (ably.channels.containsKey(channelName)) {
63+
return ably.channels.get(channelName).state;
64+
}
65+
Log.e(TAG, "getChannelState(): channel not found: " + channelName);
66+
return null;
67+
}
3768
}

lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package io.ably.lib.objects;
22

3+
import io.ably.lib.realtime.ChannelState;
34
import io.ably.lib.realtime.CompletionListener;
45
import io.ably.lib.types.AblyException;
6+
import io.ably.lib.types.ChannelMode;
57
import io.ably.lib.types.ProtocolMessage;
68
import org.jetbrains.annotations.NotNull;
9+
import org.jetbrains.annotations.Nullable;
710

811
public interface LiveObjectsAdapter {
912
/**
@@ -31,5 +34,24 @@ public interface LiveObjectsAdapter {
3134
* @return the maximum message size limit in bytes.
3235
*/
3336
int maxMessageSizeLimit();
37+
38+
/**
39+
* Retrieves the channel modes for a specific channel.
40+
* This method returns the modes that are set for the specified channel.
41+
*
42+
* @param channelName the name of the channel for which to retrieve the modes
43+
* @return the array of channel modes for the specified channel, or null if the channel is not found
44+
* Spec: RTO2a, RTO2b
45+
*/
46+
@Nullable ChannelMode[] getChannelModes(@NotNull String channelName);
47+
48+
/**
49+
* Retrieves the current state of a specific channel.
50+
* This method returns the state of the specified channel, which indicates its connection status.
51+
*
52+
* @param channelName the name of the channel for which to retrieve the state
53+
* @return the current state of the specified channel, or null if the channel is not found
54+
*/
55+
@Nullable ChannelState getChannelState(@NotNull String channelName);
3456
}
3557

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,10 @@ public ChannelMode[] getModes() {
12891289
return modes.toArray(new ChannelMode[modes.size()]);
12901290
}
12911291

1292+
public ChannelOptions getOptions() {
1293+
return options;
1294+
}
1295+
12921296
/************************************
12931297
* internal general
12941298
* @throws AblyException

live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
4141
private val sequentialScope =
4242
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(channelName) + SupervisorJob())
4343

44+
/**
45+
* Coroutine scope for handling callbacks asynchronously.
46+
*/
47+
private val callbackScope = CoroutineScope(Dispatchers.Default + CoroutineName("LiveObjectsCallback-$channelName"))
48+
4449
/**
4550
* Event bus for handling incoming object messages sequentially.
4651
*/
@@ -51,11 +56,12 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
5156
incomingObjectsHandler = initializeHandlerForIncomingObjectMessages()
5257
}
5358

54-
/**
55-
* @spec RTO1 - Returns the root LiveMap object with proper validation and sync waiting
56-
*/
5759
override fun getRoot(): LiveMap {
58-
TODO("Not yet implemented")
60+
return runBlocking { getRootAsync() }
61+
}
62+
63+
override fun getRootAsync(callback: Callback<LiveMap>) {
64+
callbackScope.with(callback) { getRootAsync() }
5965
}
6066

6167
override fun createMap(liveMap: LiveMap): LiveMap {
@@ -70,10 +76,6 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
7076
TODO("Not yet implemented")
7177
}
7278

73-
override fun getRootAsync(callback: Callback<LiveMap>) {
74-
TODO("Not yet implemented")
75-
}
76-
7779
override fun createMapAsync(liveMap: LiveMap, callback: Callback<LiveMap>) {
7880
TODO("Not yet implemented")
7981
}
@@ -94,6 +96,14 @@ internal class DefaultLiveObjects(private val channelName: String, internal val
9496
TODO("Not yet implemented")
9597
}
9698

99+
private suspend fun getRootAsync(): LiveMap {
100+
return sequentialScope.async {
101+
adapter.throwIfInvalidAccessApiConfiguration(channelName)
102+
// TODO - wait for state in synced state
103+
objectsPool.get(ROOT_OBJECT_ID) as LiveMap
104+
}.await()
105+
}
106+
97107
/**
98108
* Handles a ProtocolMessage containing proto action as `object` or `object_sync`.
99109
* @spec RTL1 - Processes incoming object messages and object sync messages

live-objects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ internal enum class ErrorCode(public val code: Int) {
88
// LiveMap specific error codes
99
MapKeyShouldBeString(40_003),
1010
MapValueDataTypeUnsupported(40_013),
11+
// Channel mode and state validation error codes
12+
ChannelModeRequired(40_024),
13+
ChannelStateError(90_001),
1114
}
1215

1316
internal enum class HttpStatusCode(public val code: Int) {

live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.ably.lib.objects
22

3+
import io.ably.lib.realtime.ChannelState
34
import io.ably.lib.realtime.CompletionListener
5+
import io.ably.lib.types.ChannelMode
46
import io.ably.lib.types.ErrorInfo
57
import io.ably.lib.types.ProtocolMessage
68
import kotlinx.coroutines.suspendCancellableCoroutine
@@ -39,6 +41,27 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe
3941
setChannelSerial(channelName, channelSerial)
4042
}
4143

44+
internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) {
45+
throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe)
46+
throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed))
47+
}
48+
49+
// Spec: RTO2
50+
internal fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, channelMode: ChannelMode) {
51+
val channelModes = getChannelModes(channelName)
52+
if (channelModes == null || !channelModes.contains(channelMode)) {
53+
// Spec: RTO2a2, RTO2b2
54+
throw ablyException("\"${channelMode.name}\" channel mode must be set for this operation", ErrorCode.ChannelModeRequired)
55+
}
56+
}
57+
58+
internal fun LiveObjectsAdapter.throwIfInChannelState(channelName: String, channelStates: Array<ChannelState>) {
59+
val currentState = getChannelState(channelName)
60+
if (currentState == null || channelStates.contains(currentState)) {
61+
throw ablyException("Channel is in invalid state: $currentState", ErrorCode.ChannelStateError)
62+
}
63+
}
64+
4265
internal enum class ProtocolMessageFormat(private val value: String) {
4366
Msgpack("msgpack"),
4467
Json("json");

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,12 @@ internal class ObjectsPool(
4848
private var gcJob: Job // Job for the garbage collection coroutine
4949

5050
init {
51-
// Initialize pool with root object
52-
createInitialPool()
51+
// RTO3b - Initialize pool with root object
52+
pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, adapter, this)
5353
// Start garbage collection coroutine
5454
gcJob = startGCJob()
5555
}
5656

57-
/**
58-
* Creates the initial pool with root object.
59-
*
60-
* @spec RTO3b - Creates root LiveMap object
61-
*/
62-
private fun createInitialPool() {
63-
val root = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, adapter, this)
64-
pool[ROOT_OBJECT_ID] = root
65-
}
66-
6757
/**
6858
* Gets a live object from the pool by object ID.
6959
*/

live-objects/src/main/kotlin/io/ably/lib/objects/Utils.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.ably.lib.objects
22

33
import io.ably.lib.types.AblyException
4+
import io.ably.lib.types.Callback
45
import io.ably.lib.types.ErrorInfo
6+
import kotlinx.coroutines.*
57

68
internal fun ablyException(
79
errorMessage: String,
@@ -44,3 +46,26 @@ internal fun objectError(errorMessage: String, cause: Throwable? = null): AblyEx
4446
*/
4547
internal val String.byteSize: Int
4648
get() = this.toByteArray(Charsets.UTF_8).size
49+
50+
/**
51+
* Executes a suspend function within a coroutine and handles the result via a callback.
52+
*
53+
* This utility bridges between coroutine-based implementation code and callback-based APIs.
54+
* It launches a coroutine in the current scope to execute the provided suspend block,
55+
* then routes the result or any error to the appropriate callback method.
56+
*
57+
* @param T The type of result expected from the operation
58+
* @param callback The callback to invoke with the operation result or error
59+
* @param block The suspend function to execute that returns a value of type T
60+
*/
61+
internal fun <T> CoroutineScope.with(callback: Callback<T>, block: suspend () -> T) {
62+
launch {
63+
try {
64+
val result = block()
65+
callback.onSuccess(result)
66+
} catch (throwable: Throwable) {
67+
val exception = throwable as? AblyException
68+
callback.onError(exception?.errorInfo)
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)