|
| 1 | +# LiveObjects Implementation Summary |
| 2 | + |
| 3 | +## Overview |
| 4 | + |
| 5 | +This document summarizes the implementation of object message handling logic in the ably-java liveobjects module, |
| 6 | +based on the JavaScript implementation in ably-js. |
| 7 | + |
| 8 | +## JavaScript Implementation Analysis |
| 9 | + |
| 10 | +### Flow Overview |
| 11 | + |
| 12 | +The JavaScript implementation follows this flow: |
| 13 | + |
| 14 | +1. **Entry Point**: `RealtimeChannel.processMessage()` receives protocol messages with `OBJECT` or `OBJECT_SYNC` actions |
| 15 | +2. **Message Routing**: |
| 16 | + - `OBJECT` action → `this._objects.handleObjectMessages()` |
| 17 | + - `OBJECT_SYNC` action → `this._objects.handleObjectSyncMessages()` |
| 18 | +3. **State Management**: Objects have states: `initialized`, `syncing`, `synced` |
| 19 | +4. **Buffering**: Non-sync messages are buffered when state is not `synced` |
| 20 | +5. **Sync Processing**: Sync messages are applied to a data pool and then applied to objects |
| 21 | +6. **Operation Application**: Individual operations are applied to objects with serial-based conflict resolution |
| 22 | + |
| 23 | +### Key Components |
| 24 | + |
| 25 | +- **ObjectsPool**: Manages live objects by objectId |
| 26 | +- **SyncObjectsDataPool**: Temporarily stores sync data before applying to objects |
| 27 | +- **Buffered Operations**: Queues operations during sync sequences |
| 28 | +- **Serial-based Conflict Resolution**: Uses site serials to determine operation precedence |
| 29 | + |
| 30 | +## Kotlin Implementation |
| 31 | + |
| 32 | +### Files Modified/Created |
| 33 | + |
| 34 | +1. **DefaultLiveObjects.kt** - Main implementation with state management and message handling |
| 35 | +2. **LiveObjectImpl.kt** - Concrete implementations of LiveMap and LiveCounter |
| 36 | + |
| 37 | +### Key Features Implemented |
| 38 | + |
| 39 | +#### 1. State Management |
| 40 | +```kotlin |
| 41 | +private enum class ObjectsState { |
| 42 | + INITIALIZED, |
| 43 | + SYNCING, |
| 44 | + SYNCED |
| 45 | +} |
| 46 | +``` |
| 47 | + |
| 48 | +#### 2. Message Handling |
| 49 | +- **handle()**: Main entry point for protocol messages |
| 50 | +- **handleObjectMessages()**: Processes regular object messages |
| 51 | +- **handleObjectSyncMessages()**: Processes sync messages |
| 52 | + |
| 53 | +#### 3. Sync Processing |
| 54 | +- **parseSyncChannelSerial()**: Extracts syncId and syncCursor from channel serial |
| 55 | +- **startNewSync()**: Begins new sync sequence |
| 56 | +- **endSync()**: Completes sync sequence and applies buffered operations |
| 57 | +- **applySync()**: Applies sync data to objects pool |
| 58 | + |
| 59 | +#### 4. Object Operations |
| 60 | +- **applyObjectMessages()**: Applies individual operations to objects |
| 61 | +- **createZeroValueObjectIfNotExists()**: Creates placeholder objects for operations |
| 62 | +- **parseObjectId()**: Parses object IDs to determine type |
| 63 | + |
| 64 | +#### 5. LiveObject Implementations |
| 65 | +- **BaseLiveObject**: Abstract base class with common functionality |
| 66 | +- **LiveMapImpl**: Concrete implementation for map objects |
| 67 | +- **LiveCounterImpl**: Concrete implementation for counter objects |
| 68 | + |
| 69 | +### Implementation Details |
| 70 | + |
| 71 | +#### Serial-based Conflict Resolution |
| 72 | +```kotlin |
| 73 | +protected fun canApplyOperation(opSerial: String?, opSiteCode: String?): Boolean { |
| 74 | + val siteSerial = siteTimeserials[opSiteCode] |
| 75 | + return siteSerial == null || opSerial > siteSerial |
| 76 | +} |
| 77 | +``` |
| 78 | + |
| 79 | +#### CRDT Semantics for Maps |
| 80 | +```kotlin |
| 81 | +private fun canApplyMapOperation(mapEntrySerial: String?, opSerial: String?): Boolean { |
| 82 | + // For LWW CRDT semantics, operation should only be applied if its serial is strictly greater |
| 83 | + if (mapEntrySerial.isNullOrEmpty() && opSerial.isNullOrEmpty()) { |
| 84 | + return false |
| 85 | + } |
| 86 | + if (mapEntrySerial.isNullOrEmpty()) { |
| 87 | + return true |
| 88 | + } |
| 89 | + if (opSerial.isNullOrEmpty()) { |
| 90 | + return false |
| 91 | + } |
| 92 | + return opSerial > mapEntrySerial |
| 93 | +} |
| 94 | +``` |
| 95 | + |
| 96 | +#### State Transitions |
| 97 | +1. **INITIALIZED** → **SYNCING**: When first sync message received |
| 98 | +2. **SYNCING** → **SYNCED**: When sync sequence completes |
| 99 | +3. **SYNCED**: Normal operation state |
| 100 | + |
| 101 | +#### Buffering Strategy |
| 102 | +- Regular object messages are buffered during sync sequences |
| 103 | +- Buffered messages are applied after sync completion |
| 104 | +- This ensures consistency and prevents race conditions |
| 105 | + |
| 106 | +### Comparison with JavaScript |
| 107 | + |
| 108 | +| Feature | JavaScript | Kotlin | |
| 109 | +|---------|------------|--------| |
| 110 | +| State Management | `ObjectsState` enum | `ObjectsState` enum | |
| 111 | +| Object Pool | `ObjectsPool` class | `ConcurrentHashMap<String, LiveObject>` | |
| 112 | +| Sync Data Pool | `SyncObjectsDataPool` class | `ConcurrentHashMap<String, ObjectState>` | |
| 113 | +| Buffering | `_bufferedObjectOperations` array | `bufferedObjectOperations` list | |
| 114 | +| Serial Parsing | `_parseSyncChannelSerial()` | `parseSyncChannelSerial()` | |
| 115 | +| CRDT Logic | `_canApplyMapOperation()` | `canApplyMapOperation()` | |
| 116 | + |
| 117 | +### Thread Safety |
| 118 | + |
| 119 | +The Kotlin implementation uses: |
| 120 | +- `ConcurrentHashMap` for thread-safe collections |
| 121 | +- Immutable data structures where possible |
| 122 | +- Proper synchronization for state changes |
| 123 | + |
| 124 | +### Error Handling |
| 125 | + |
| 126 | +- Validates object IDs and operation parameters |
| 127 | +- Logs warnings for malformed messages |
| 128 | +- Throws appropriate exceptions for invalid states |
| 129 | +- Graceful handling of missing serials or site codes |
| 130 | + |
| 131 | +### Future Enhancements |
| 132 | + |
| 133 | +1. **Event Emission**: Implement proper event emission for object updates |
| 134 | +2. **Lifecycle Events**: Add support for object lifecycle events (created, deleted) |
| 135 | +3. **Garbage Collection**: Implement GC for tombstoned objects |
| 136 | +4. **Performance Optimization**: Add caching and optimization for frequently accessed objects |
| 137 | +5. **Testing**: Comprehensive unit and integration tests |
| 138 | + |
| 139 | +## Compliance with Specification |
| 140 | + |
| 141 | +The implementation follows the Ably LiveObjects specification (PR #333) and maintains compatibility with the JavaScript implementation while leveraging Kotlin's type safety and concurrency features. |
0 commit comments