Skip to content

Commit aafa1a1

Browse files
committed
[ECO-5458] Updated interfaces for mapUpdate and counterUpdate to inherit LiveObjectUpdate
1. Updated mapManager to return LiveMapUpdate instead of Map 2. Updated counterManager to return LiveCounterUpdate instead of map 3. Updated BaseLiveObject accordingly
1 parent 819ec3d commit aafa1a1

File tree

14 files changed

+259
-77
lines changed

14 files changed

+259
-77
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.ably.lib.objects.type;
2+
3+
public abstract class LiveObjectUpdate {
4+
protected final Object update;
5+
6+
protected LiveObjectUpdate(Object update) {
7+
this.update = update;
8+
}
9+
}

lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* It allows incrementing, decrementing, and retrieving the current value of the counter,
1212
* both synchronously and asynchronously.
1313
*/
14-
public interface LiveCounter {
14+
public interface LiveCounter extends LiveCounterChange {
1515

1616
/**
1717
* Increments the value of the counter by 1.
Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,39 @@
11
package io.ably.lib.objects.type.counter;
22

3+
import io.ably.lib.objects.type.LiveObjectUpdate;
34
import org.jetbrains.annotations.NotNull;
45

56
/**
67
* Spec: RTLC11, RTLC11a
78
*/
8-
public class LiveCounterUpdate {
9-
@NotNull
10-
private final Long amount; // RTLC11b, RTLC11b1
9+
public class LiveCounterUpdate extends LiveObjectUpdate {
10+
11+
public LiveCounterUpdate() {
12+
super(null);
13+
}
1114

12-
public LiveCounterUpdate(@NotNull Long amount) {
13-
this.amount = amount;
15+
public LiveCounterUpdate(Long amount) {
16+
super(new Update(amount));
1417
}
1518

1619
@NotNull
17-
public Long getUpdate() {
18-
return amount;
20+
public LiveCounterUpdate.Update getUpdate() {
21+
return (Update) update;
22+
}
23+
24+
/**
25+
* Spec: RTLC11b, RTLC11b1
26+
*/
27+
public static class Update {
28+
@NotNull
29+
private final Long amount;
30+
31+
public Update(@NotNull Long amount) {
32+
this.amount = amount;
33+
}
34+
35+
public @NotNull Long getAmount() {
36+
return amount;
37+
}
1938
}
2039
}

lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* The LiveMap interface provides methods to interact with a live, real-time map structure.
1515
* It supports both synchronous and asynchronous operations for managing key-value pairs.
1616
*/
17-
public interface LiveMap {
17+
public interface LiveMap extends LiveMapChange {
1818

1919
/**
2020
* Retrieves the value associated with the specified key.

lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
package io.ably.lib.objects.type.map;
22

3+
import io.ably.lib.objects.type.LiveObjectUpdate;
34
import org.jetbrains.annotations.NotNull;
45

56
import java.util.Map;
67

78
/**
89
* Spec: RTLM18, RTLM18a
910
*/
10-
public class LiveMapUpdate {
11+
public class LiveMapUpdate extends LiveObjectUpdate {
1112

12-
@NotNull
13-
private final Map<String, Change> update;
13+
public LiveMapUpdate() {
14+
super(null);
15+
}
1416

1517
/**
1618
* Constructor for LiveMapUpdate
1719
* @param update The map of updates
1820
*/
1921
public LiveMapUpdate(@NotNull Map<String, Change> update) {
20-
this.update = update;
22+
super(update);
2123
}
2224

2325
/**
@@ -26,7 +28,7 @@ public LiveMapUpdate(@NotNull Map<String, Change> update) {
2628
*/
2729
@NotNull
2830
public Map<String, Change> getUpdate() {
29-
return update;
31+
return (Map<String, Change>) update;
3032
}
3133

3234
/**

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.ably.lib.objects
22

33
import io.ably.lib.objects.type.BaseLiveObject
4+
import io.ably.lib.objects.type.LiveObjectUpdate
45
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
56
import io.ably.lib.objects.type.livemap.DefaultLiveMap
67
import io.ably.lib.util.Log
@@ -126,7 +127,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
126127

127128
val receivedObjectIds = mutableSetOf<String>()
128129
// RTO5c1a2 - List to collect updates for existing objects
129-
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, Any>>()
130+
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, LiveObjectUpdate>>()
130131

131132
// RTO5c1
132133
for ((objectId, objectState) in syncObjectsDataPool) {

live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@ import io.ably.lib.objects.ObjectOperation
66
import io.ably.lib.objects.ObjectState
77
import io.ably.lib.objects.ObjectsPoolDefaults
88
import io.ably.lib.objects.objectError
9+
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
10+
import io.ably.lib.objects.type.livemap.noOpMapUpdate
911
import io.ably.lib.util.Log
1012

1113
internal enum class ObjectType(val value: String) {
1214
Map("map"),
1315
Counter("counter")
1416
}
1517

18+
internal val LiveObjectUpdate.noOp get() = this.update == null
19+
1620
/**
1721
* Base implementation of LiveObject interface.
1822
* Provides common functionality for all live objects.
@@ -43,7 +47,7 @@ internal abstract class BaseLiveObject(
4347
*
4448
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
4549
*/
46-
internal fun applyObjectSync(objectState: ObjectState): Map<String, Any> {
50+
internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate {
4751
if (objectState.objectId != objectId) {
4852
throw objectError("Invalid object state: object state objectId=${objectState.objectId}; $objectType objectId=$objectId")
4953
}
@@ -61,7 +65,10 @@ internal abstract class BaseLiveObject(
6165

6266
if (isTombstoned) {
6367
// this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing
64-
return mapOf()
68+
if (objectType == ObjectType.Map) {
69+
return noOpMapUpdate
70+
}
71+
return noOpCounterUpdate
6572
}
6673
return applyObjectState(objectState) // RTLM6, RTLC6
6774
}
@@ -102,11 +109,6 @@ internal abstract class BaseLiveObject(
102109
applyObjectOperation(objectOperation, objectMessage) // RTLC7d
103110
}
104111

105-
internal fun notifyUpdated(update: Any) {
106-
// TODO: Implement event emission for updates
107-
Log.v(tag, "Object $objectId updated: $update")
108-
}
109-
110112
/**
111113
* Checks if an operation can be applied based on serial comparison.
112114
*
@@ -126,7 +128,7 @@ internal abstract class BaseLiveObject(
126128
/**
127129
* Marks the object as tombstoned.
128130
*/
129-
internal fun tombstone(): Any {
131+
internal fun tombstone(): LiveObjectUpdate {
130132
isTombstoned = true
131133
tombstonedAt = System.currentTimeMillis()
132134
val update = clearData()
@@ -151,7 +153,7 @@ internal abstract class BaseLiveObject(
151153
* @return A map describing the changes made to the object's data
152154
*
153155
*/
154-
abstract fun applyObjectState(objectState: ObjectState): Map<String, Any>
156+
abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate
155157

156158
/**
157159
* Applies an operation to this live object.
@@ -177,7 +179,13 @@ internal abstract class BaseLiveObject(
177179
*
178180
* @return A map representing the diff of changes made
179181
*/
180-
abstract fun clearData(): Map<String, Any>
182+
abstract fun clearData(): LiveObjectUpdate
183+
184+
/**
185+
* Notifies subscribers about changes made to this live object. Propagates updates through the
186+
* appropriate manager after converting the generic update map to type-specific update objects.
187+
*/
188+
abstract fun notifyUpdated(update: LiveObjectUpdate)
181189

182190
/**
183191
* Called during garbage collection intervals to clean up expired entries.

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ import io.ably.lib.objects.*
44
import io.ably.lib.objects.ObjectOperation
55
import io.ably.lib.objects.ObjectState
66
import io.ably.lib.objects.type.BaseLiveObject
7+
import io.ably.lib.objects.type.LiveObjectUpdate
78
import io.ably.lib.objects.type.ObjectType
89
import io.ably.lib.objects.type.counter.LiveCounter
10+
import io.ably.lib.objects.type.counter.LiveCounterChange
11+
import io.ably.lib.objects.type.counter.LiveCounterUpdate
12+
import io.ably.lib.objects.type.noOp
913
import io.ably.lib.types.Callback
14+
import io.ably.lib.util.Log
1015
import java.util.concurrent.atomic.AtomicLong
1116

1217
/**
@@ -50,21 +55,38 @@ internal class DefaultLiveCounter private constructor(
5055
TODO("Not yet implemented")
5156
}
5257

58+
override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription {
59+
return liveCounterManager.subscribe(listener)
60+
}
61+
62+
override fun unsubscribe(listener: LiveCounterChange.Listener) = liveCounterManager.unsubscribe(listener)
63+
64+
override fun unsubscribeAll() = liveCounterManager.unsubscribeAll()
65+
5366
override fun value(): Long {
5467
adapter.throwIfInvalidAccessApiConfiguration(channelName)
5568
return data.get()
5669
}
5770

58-
override fun applyObjectState(objectState: ObjectState): Map<String, Long> {
71+
override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate {
5972
return liveCounterManager.applyState(objectState)
6073
}
6174

6275
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
6376
liveCounterManager.applyOperation(operation)
6477
}
6578

66-
override fun clearData(): Map<String, Long> {
67-
return mapOf("amount" to data.get()).apply { data.set(0) }
79+
override fun clearData(): LiveCounterUpdate {
80+
return LiveCounterUpdate(data.get()).apply { data.set(0) }
81+
}
82+
83+
override fun notifyUpdated(update: LiveObjectUpdate) {
84+
if (update.noOp) {
85+
return
86+
}
87+
val counterUpdate = update as LiveCounterUpdate
88+
Log.v(tag, "Object $objectId updated: $update")
89+
liveCounterManager.notify(counterUpdate)
6890
}
6991

7092
override fun onGCInterval() {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.ably.lib.objects.type.livecounter
2+
3+
import io.ably.lib.objects.ObjectsSubscription
4+
import io.ably.lib.objects.type.counter.LiveCounterChange
5+
import io.ably.lib.objects.type.counter.LiveCounterUpdate
6+
import io.ably.lib.util.EventEmitter
7+
import io.ably.lib.util.Log
8+
9+
internal val noOpCounterUpdate = LiveCounterUpdate()
10+
11+
internal interface HandlesLiveCounterChange {
12+
fun notify(update: LiveCounterUpdate)
13+
}
14+
15+
internal abstract class LiveCounterChangeCoordinator: LiveCounterChange, HandlesLiveCounterChange {
16+
private val tag = "DefaultLiveCounterChangeCoordinator"
17+
18+
private val counterChangeEmitter = LiveCounterChangeEmitter()
19+
20+
override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription {
21+
counterChangeEmitter.on(listener)
22+
return ObjectsSubscription {
23+
counterChangeEmitter.off(listener)
24+
}
25+
}
26+
27+
override fun unsubscribe(listener: LiveCounterChange.Listener) = counterChangeEmitter.off(listener)
28+
29+
override fun unsubscribeAll() = counterChangeEmitter.off()
30+
31+
override fun notify(update: LiveCounterUpdate) = counterChangeEmitter.emit(update)
32+
}
33+
34+
private class LiveCounterChangeEmitter : EventEmitter<LiveCounterUpdate, LiveCounterChange.Listener>() {
35+
private val tag = "LiveCounterChangeEmitter"
36+
37+
override fun apply(listener: LiveCounterChange.Listener?, event: LiveCounterUpdate?, vararg args: Any?) {
38+
try {
39+
listener?.onUpdated(event!!)
40+
} catch (t: Throwable) {
41+
Log.e(tag, "Error occurred while executing listener callback for event: $event", t)
42+
}
43+
}
44+
}

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@ import io.ably.lib.objects.ObjectOperation
55
import io.ably.lib.objects.ObjectOperationAction
66
import io.ably.lib.objects.ObjectState
77
import io.ably.lib.objects.objectError
8+
import io.ably.lib.objects.type.counter.LiveCounterUpdate
89
import io.ably.lib.util.Log
910

10-
internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
11+
internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): LiveCounterChangeCoordinator() {
12+
1113
private val objectId = liveCounter.objectId
1214

1315
private val tag = "LiveCounterManager"
1416

1517
/**
1618
* @spec RTLC6 - Overrides counter data with state from sync
1719
*/
18-
internal fun applyState(objectState: ObjectState): Map<String, Long> {
20+
internal fun applyState(objectState: ObjectState): LiveCounterUpdate {
1921
val previousData = liveCounter.data.get()
2022

2123
if (objectState.tombstone) {
@@ -31,7 +33,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
3133
}
3234
}
3335

34-
return mapOf("amount" to (liveCounter.data.get() - previousData))
36+
return LiveCounterUpdate(liveCounter.data.get() - previousData)
3537
}
3638

3739
/**
@@ -57,7 +59,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
5759
/**
5860
* @spec RTLC8 - Applies counter create operation
5961
*/
60-
private fun applyCounterCreate(operation: ObjectOperation): Map<String, Long> {
62+
private fun applyCounterCreate(operation: ObjectOperation): LiveCounterUpdate {
6163
if (liveCounter.createOperationIsMerged) {
6264
// RTLC8b
6365
// There can't be two different create operation for the same object id, because the object id
@@ -67,7 +69,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
6769
tag,
6870
"Skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=$objectId"
6971
)
70-
return mapOf()
72+
return noOpCounterUpdate // RTLC8c
7173
}
7274

7375
return mergeInitialDataFromCreateOperation(operation) // RTLC8c
@@ -76,17 +78,17 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
7678
/**
7779
* @spec RTLC9 - Applies counter increment operation
7880
*/
79-
private fun applyCounterInc(counterOp: ObjectCounterOp): Map<String, Long> {
81+
private fun applyCounterInc(counterOp: ObjectCounterOp): LiveCounterUpdate {
8082
val amount = counterOp.amount?.toLong() ?: 0
8183
val previousValue = liveCounter.data.get()
8284
liveCounter.data.set(previousValue + amount) // RTLC9b
83-
return mapOf("amount" to amount)
85+
return LiveCounterUpdate(amount)
8486
}
8587

8688
/**
8789
* @spec RTLC10 - Merges initial data from create operation
8890
*/
89-
private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map<String, Long> {
91+
private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): LiveCounterUpdate {
9092
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
9193
// note that it is intentional to SUM the incoming count from the create op.
9294
// if we got here, it means that current counter instance is missing the initial value in its data reference,
@@ -95,6 +97,6 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) {
9597
val previousValue = liveCounter.data.get()
9698
liveCounter.data.set(previousValue + count) // RTLC10a
9799
liveCounter.createOperationIsMerged = true // RTLC10b
98-
return mapOf("amount" to count)
100+
return LiveCounterUpdate(count)
99101
}
100102
}

0 commit comments

Comments
 (0)