Skip to content

Commit 8df8752

Browse files
authored
Merge pull request #904 from marcodejongh/fix/ghost-participants-cleanup
Fix ghost participants accumulating in sessions
2 parents bca9730 + bde7076 commit 8df8752

File tree

14 files changed

+16321
-121
lines changed

14 files changed

+16321
-121
lines changed

docs/websocket-implementation.md

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,19 @@ When Redis is unavailable, RoomManager falls back to **Postgres-only mode**:
863863
| **Warm** | Redis only | 4 hours | Recently inactive (users left) |
864864
| **Cold** | PostgreSQL | Indefinite | Historical sessions, dormant restoration |
865865

866+
### Participant Tracking (Postgres)
867+
868+
Session participation is tracked in two layers:
869+
870+
| Layer | Table / Key | Granularity | Lifetime |
871+
|-------|-------------|-------------|----------|
872+
| **Real-time** (Redis) | `boardsesh:session:{id}:members` | Per connection ID | Ephemeral (4h TTL) |
873+
| **Historical** (Postgres) | `board_session_participants` | Per authenticated user | Permanent |
874+
875+
**`board_session_participants`** records one row per (session_id, user_id) with a `joined_at` timestamp. It is upserted (`ON CONFLICT DO NOTHING`) when an authenticated user joins a session. Rows are never deleted on disconnect — they serve as a permanent historical record of who participated.
876+
877+
> **Note:** The legacy `board_session_clients` table (one row per WebSocket connection) is no longer written to. Leader state is managed exclusively in Redis via the `DistributedStateManager`.
878+
866879
### Key Redis Data Structures
867880

868881
**Session State (RedisSessionStore):**
@@ -877,11 +890,11 @@ boardsesh:lock:session:restore:{id} # String - distributed lock (10s TTL)
877890

878891
**Distributed State (DistributedStateManager):**
879892
```
880-
boardsesh:conn:{connectionId} # Hash - connection data (instanceId, sessionId, username, etc.)
881-
boardsesh:session:{id}:members # Set - connection IDs in session (cross-instance)
882-
boardsesh:session:{id}:leader # String - leader connection ID
883-
boardsesh:instance:{id}:conns # Set - connections owned by instance
884-
boardsesh:instance:{id}:heartbeat # String - instance heartbeat timestamp (60s TTL)
893+
boardsesh:conn:{connectionId} # Hash - connection data (1h TTL, refreshed on activity)
894+
boardsesh:session:{id}:members # Set - connection IDs in session (4h TTL)
895+
boardsesh:session:{id}:leader # String - leader connection ID (4h TTL)
896+
boardsesh:instance:{id}:conns # Set - connections owned by instance (2h TTL, refreshed on heartbeat)
897+
boardsesh:instance:{id}:heartbeat # String - instance heartbeat timestamp (60s TTL, refreshed every 30s)
885898
```
886899

887900
**Pub/Sub Channels:**
@@ -931,63 +944,50 @@ sequenceDiagram
931944
P->>P: Exit
932945
```
933946

934-
### Dead Instance Detection and TTL Cleanup
947+
### Dead Instance Detection and Active Cleanup
935948

936-
When an instance crashes or terminates without graceful shutdown, the system relies on TTL-based cleanup:
949+
When an instance crashes or terminates without graceful shutdown, the system uses a combination of active cleanup and TTL-based self-healing:
937950

938-
**1. Instance Heartbeat Expiry (60s TTL)**
951+
**1. Instance Heartbeat (60s TTL, refreshed every 30s)**
939952

940-
Each instance updates its heartbeat every 30 seconds:
941953
```
942954
boardsesh:instance:{id}:heartbeat = timestamp (60s TTL)
943955
```
944956

945-
When an instance dies unexpectedly:
946-
- The heartbeat key expires after 60 seconds
947-
- Redis automatically removes the heartbeat key
948-
949-
**2. Connection Data Expiry (1 hour TTL)**
950-
951-
Connection data has its own TTL:
952-
```
953-
boardsesh:conn:{connectionId} = {...} (1 hour TTL)
954-
```
955-
956-
Connections from dead instances:
957-
- Continue to exist until their 1-hour TTL expires
958-
- Are refreshed on client activity (extends TTL)
959-
- Eventually expire if no activity
957+
When an instance dies unexpectedly, its heartbeat key expires after 60 seconds.
960958

961-
**3. Session Member Sets (4 hour TTL)**
959+
**2. Active Dead Instance Cleanup**
962960

963-
Session member sets track all connection IDs:
964-
```
965-
boardsesh:session:{id}:members = Set of connectionIds (4 hour TTL)
966-
```
961+
The `DistributedStateManager` actively discovers and cleans up dead instances:
967962

968-
**Limitation: No Active Cleanup for Dead Instances**
963+
- **On startup**: `cleanupDeadInstanceConnections()` runs asynchronously after the first heartbeat
964+
- **Periodically**: Piggybacks on the 30s heartbeat cycle, running every 4th heartbeat (~2 minutes)
969965

970-
Currently, there is no background job that actively scans for dead instances and cleans up their orphaned connections. This means:
966+
The cleanup process:
967+
1. SCANs for `boardsesh:instance:*:conns` keys
968+
2. Checks if the corresponding heartbeat key exists (skip current instance)
969+
3. For each dead instance: fetches its connection IDs, groups by session
970+
4. Deletes all orphaned connection hashes and instance tracking keys
971+
5. Runs `PRUNE_STALE_SESSION_MEMBERS_SCRIPT` (Lua) per affected session to atomically remove stale members and re-elect leader if needed
971972

972-
- Session member sets may temporarily contain connection IDs from dead instances
973-
- `getSessionMembers()` may return connections that no longer exist (gracefully handled by filtering out missing connection data)
974-
- Leader election may initially select a connection from a dead instance (but will re-elect on next leader action)
975-
- Connection data remains until TTL expires naturally
973+
**3. Stale-Filtering at Read Time**
976974

977-
**Implications for Clients**
975+
Even between cleanup cycles, stale entries never inflate participant counts:
978976

979-
- Clients should handle the case where a session "member" is no longer reachable
980-
- Leader changes may occur when the elected leader from a dead instance is detected as unresponsive
981-
- User lists may temporarily show stale entries that get filtered out on refresh
977+
- `getSessionMemberCount()` pipelines `EXISTS` checks against each member's connection hash — only live connections are counted
978+
- `getSessionMembers()` filters out members whose connection data is missing
979+
- `hasSessionMembers()` delegates to the filtered count
982980

983-
**Future Improvement**
981+
**4. TTL Self-Healing (defense in depth)**
984982

985-
A background cleanup job could periodically:
986-
1. Scan for instances with expired heartbeats
987-
2. Remove their orphaned connections from session member sets
988-
3. Trigger leader re-election if the current leader's instance is dead
983+
| Key | TTL | Refreshed |
984+
|-----|-----|-----------|
985+
| `boardsesh:conn:{id}` | 1 hour | On client activity |
986+
| `boardsesh:instance:{id}:conns` | 2 hours | On every heartbeat (30s) |
987+
| `boardsesh:instance:{id}:heartbeat` | 60 seconds | Every 30s |
988+
| `boardsesh:session:{id}:members` | 4 hours | On join/leave/refresh |
989989

990-
This would reduce the window of stale data but is not currently implemented.
990+
Even if active cleanup fails, orphaned data expires naturally via these TTLs.
991991

992992
---
993993

packages/backend/src/__tests__/distributed-state.test.ts

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,3 +1090,249 @@ describe.skipIf(!redisAvailable)('DistributedStateManager - cleanup error handli
10901090
expect(Object.keys(conn).length).toBe(0);
10911091
});
10921092
});
1093+
1094+
describe.skipIf(!redisAvailable)('DistributedStateManager - Dead Instance Cleanup', () => {
1095+
let redis: Redis;
1096+
let liveManager: DistributedStateManager;
1097+
1098+
beforeAll(async () => {
1099+
redis = new Redis(REDIS_URL);
1100+
await new Promise<void>((resolve) => redis.once('ready', resolve));
1101+
});
1102+
1103+
afterAll(async () => {
1104+
forceResetDistributedState();
1105+
await redis.quit();
1106+
});
1107+
1108+
beforeEach(async () => {
1109+
forceResetDistributedState();
1110+
try {
1111+
const keys = await redis.keys('boardsesh:*');
1112+
if (keys.length > 0) {
1113+
await redis.del(...keys);
1114+
}
1115+
} catch (err) {
1116+
console.warn('Failed to clean up test keys:', err);
1117+
}
1118+
liveManager = new DistributedStateManager(redis, 'live-instance');
1119+
});
1120+
1121+
afterEach(async () => {
1122+
await liveManager.stop();
1123+
forceResetDistributedState();
1124+
});
1125+
1126+
describe('discoverDeadInstances', () => {
1127+
it('should return empty when no other instances exist', async () => {
1128+
liveManager.start();
1129+
// Give heartbeat time to run
1130+
await new Promise((r) => setTimeout(r, 100));
1131+
1132+
const dead = await liveManager.discoverDeadInstances();
1133+
expect(dead).toEqual([]);
1134+
});
1135+
1136+
it('should skip the current instance', async () => {
1137+
liveManager.start();
1138+
await new Promise((r) => setTimeout(r, 100));
1139+
1140+
// Even if we manually clear our own heartbeat, discoverDeadInstances
1141+
// skips the current instance ID.
1142+
await redis.del('boardsesh:instance:live-instance:heartbeat');
1143+
1144+
const dead = await liveManager.discoverDeadInstances();
1145+
expect(dead).toEqual([]);
1146+
});
1147+
1148+
it('should find instances whose heartbeat expired', async () => {
1149+
// Simulate a dead instance: create its conns key but no heartbeat
1150+
await redis.sadd('boardsesh:instance:dead-inst:conns', 'orphan-conn-1');
1151+
1152+
const dead = await liveManager.discoverDeadInstances();
1153+
expect(dead).toEqual(['dead-inst']);
1154+
});
1155+
1156+
it('should not report instances with active heartbeats', async () => {
1157+
// Create another instance with a valid heartbeat
1158+
await redis.sadd('boardsesh:instance:alive-inst:conns', 'conn-x');
1159+
await redis.setex('boardsesh:instance:alive-inst:heartbeat', 60, Date.now().toString());
1160+
1161+
const dead = await liveManager.discoverDeadInstances();
1162+
expect(dead).toEqual([]);
1163+
});
1164+
});
1165+
1166+
describe('cleanupDeadInstanceConnections', () => {
1167+
it('should remove orphaned connections from dead instances', async () => {
1168+
// Register a live connection
1169+
await liveManager.registerConnection('live-conn', 'LiveUser');
1170+
await liveManager.joinSession('live-conn', 'shared-session');
1171+
1172+
// Simulate a dead instance with orphaned connections in the same session
1173+
const deadInstanceId = 'dead-instance-abc';
1174+
// Create connection hashes for the dead connections
1175+
await redis.hmset('boardsesh:conn:orphan-1', {
1176+
connectionId: 'orphan-1',
1177+
instanceId: deadInstanceId,
1178+
sessionId: 'shared-session',
1179+
userId: '',
1180+
username: 'Ghost1',
1181+
avatarUrl: '',
1182+
isLeader: 'false',
1183+
connectedAt: (Date.now() - 60000).toString(),
1184+
});
1185+
await redis.hmset('boardsesh:conn:orphan-2', {
1186+
connectionId: 'orphan-2',
1187+
instanceId: deadInstanceId,
1188+
sessionId: 'shared-session',
1189+
userId: '',
1190+
username: 'Ghost2',
1191+
avatarUrl: '',
1192+
isLeader: 'false',
1193+
connectedAt: (Date.now() - 60000).toString(),
1194+
});
1195+
// Add to instance tracking (no heartbeat = dead)
1196+
await redis.sadd(`boardsesh:instance:${deadInstanceId}:conns`, 'orphan-1', 'orphan-2');
1197+
// Add to session members
1198+
await redis.sadd('boardsesh:session:shared-session:members', 'orphan-1', 'orphan-2');
1199+
1200+
// Verify inflated count before cleanup
1201+
const rawCount = await redis.scard('boardsesh:session:shared-session:members');
1202+
expect(rawCount).toBe(3); // 1 live + 2 ghosts
1203+
1204+
// Run cleanup
1205+
const result = await liveManager.cleanupDeadInstanceConnections();
1206+
1207+
expect(result.deadInstances).toEqual([deadInstanceId]);
1208+
expect(result.staleConnections.sort()).toEqual(['orphan-1', 'orphan-2']);
1209+
expect(result.sessionsAffected).toEqual(['shared-session']);
1210+
1211+
// Verify connection hashes are deleted
1212+
const conn1 = await redis.hgetall('boardsesh:conn:orphan-1');
1213+
const conn2 = await redis.hgetall('boardsesh:conn:orphan-2');
1214+
expect(Object.keys(conn1).length).toBe(0);
1215+
expect(Object.keys(conn2).length).toBe(0);
1216+
1217+
// Verify instance tracking key is deleted
1218+
const instanceConns = await redis.smembers(`boardsesh:instance:${deadInstanceId}:conns`);
1219+
expect(instanceConns.length).toBe(0);
1220+
1221+
// Verify session member count is correct (only live connection)
1222+
const members = await liveManager.getSessionMembers('shared-session');
1223+
expect(members.length).toBe(1);
1224+
expect(members[0].username).toBe('LiveUser');
1225+
});
1226+
1227+
it('should return empty summary when no dead instances exist', async () => {
1228+
const result = await liveManager.cleanupDeadInstanceConnections();
1229+
expect(result.deadInstances).toEqual([]);
1230+
expect(result.staleConnections).toEqual([]);
1231+
expect(result.sessionsAffected).toEqual([]);
1232+
});
1233+
1234+
it('should handle dead instance with no connections', async () => {
1235+
// Dead instance with empty conns set
1236+
await redis.sadd('boardsesh:instance:empty-dead:conns', '__placeholder__');
1237+
await redis.srem('boardsesh:instance:empty-dead:conns', '__placeholder__');
1238+
// Actually just create the key with an entry then remove it—
1239+
// simpler: just create it directly
1240+
await redis.del('boardsesh:instance:empty-dead:conns');
1241+
// Need the key to exist for SCAN to find it
1242+
await redis.sadd('boardsesh:instance:empty-dead:conns', 'temp');
1243+
await redis.srem('boardsesh:instance:empty-dead:conns', 'temp');
1244+
1245+
// The key may or may not exist (Redis deletes empty sets).
1246+
// Either way, cleanup should not error.
1247+
const result = await liveManager.cleanupDeadInstanceConnections();
1248+
// May or may not find the instance depending on whether Redis kept the empty set
1249+
expect(result.staleConnections).toEqual([]);
1250+
});
1251+
});
1252+
1253+
describe('cleanupStaleSessionMembers', () => {
1254+
it('should remove members whose connection hashes expired', async () => {
1255+
await liveManager.registerConnection('real-conn', 'RealUser');
1256+
await liveManager.joinSession('real-conn', 'prune-session');
1257+
1258+
// Add a stale member directly (no connection hash)
1259+
await redis.sadd('boardsesh:session:prune-session:members', 'stale-conn');
1260+
1261+
// Verify stale member is in the set
1262+
const rawCount = await redis.scard('boardsesh:session:prune-session:members');
1263+
expect(rawCount).toBe(2);
1264+
1265+
const removed = await liveManager.cleanupStaleSessionMembers('prune-session');
1266+
expect(removed).toBe(1);
1267+
1268+
// Only real connection remains
1269+
const count = await liveManager.getSessionMemberCount('prune-session');
1270+
expect(count).toBe(1);
1271+
});
1272+
1273+
it('should re-elect leader when stale leader is pruned', async () => {
1274+
await liveManager.registerConnection('member-conn', 'Member');
1275+
await liveManager.joinSession('member-conn', 'leader-prune-session');
1276+
1277+
// Add a stale leader directly
1278+
await redis.sadd('boardsesh:session:leader-prune-session:members', 'stale-leader');
1279+
await redis.set('boardsesh:session:leader-prune-session:leader', 'stale-leader', 'EX', 14400);
1280+
1281+
const removed = await liveManager.cleanupStaleSessionMembers('leader-prune-session');
1282+
expect(removed).toBe(1);
1283+
1284+
// member-conn should now be leader
1285+
const leader = await liveManager.getSessionLeader('leader-prune-session');
1286+
expect(leader).toBe('member-conn');
1287+
1288+
const conn = await liveManager.getConnection('member-conn');
1289+
expect(conn!.isLeader).toBe(true);
1290+
});
1291+
1292+
it('should clean up empty session when all members are stale', async () => {
1293+
// Add only stale members
1294+
await redis.sadd('boardsesh:session:all-stale:members', 'gone-1', 'gone-2');
1295+
await redis.set('boardsesh:session:all-stale:leader', 'gone-1', 'EX', 14400);
1296+
1297+
const removed = await liveManager.cleanupStaleSessionMembers('all-stale');
1298+
expect(removed).toBe(2);
1299+
1300+
// Session should be cleaned up
1301+
const exists = await redis.exists('boardsesh:session:all-stale:members');
1302+
expect(exists).toBe(0);
1303+
1304+
const leader = await liveManager.getSessionLeader('all-stale');
1305+
expect(leader).toBeNull();
1306+
});
1307+
1308+
it('should return 0 when no stale members exist', async () => {
1309+
await liveManager.registerConnection('healthy-conn', 'Healthy');
1310+
await liveManager.joinSession('healthy-conn', 'healthy-session');
1311+
1312+
const removed = await liveManager.cleanupStaleSessionMembers('healthy-session');
1313+
expect(removed).toBe(0);
1314+
});
1315+
});
1316+
1317+
describe('getSessionMemberCount - stale filtering', () => {
1318+
it('should not count members whose connection hashes are missing', async () => {
1319+
await liveManager.registerConnection('counted-conn', 'Counted');
1320+
await liveManager.joinSession('counted-conn', 'count-session');
1321+
1322+
// Add stale members directly to the session set
1323+
await redis.sadd('boardsesh:session:count-session:members', 'ghost-1', 'ghost-2');
1324+
1325+
// Raw SCARD would return 3, but getSessionMemberCount should return 1
1326+
const rawCount = await redis.scard('boardsesh:session:count-session:members');
1327+
expect(rawCount).toBe(3);
1328+
1329+
const filteredCount = await liveManager.getSessionMemberCount('count-session');
1330+
expect(filteredCount).toBe(1);
1331+
});
1332+
1333+
it('should return 0 for empty session', async () => {
1334+
const count = await liveManager.getSessionMemberCount('nonexistent-session');
1335+
expect(count).toBe(0);
1336+
});
1337+
});
1338+
});

packages/backend/src/db/schema.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export {
44
boardSessionClients as sessionClients,
55
boardSessionQueues as sessionQueues,
66
sessionBoards,
7+
boardSessionParticipants,
78
type BoardSession as Session,
89
type NewBoardSession as NewSession,
910
type BoardSessionClient as SessionClient,
@@ -12,4 +13,6 @@ export {
1213
type NewBoardSessionQueue as NewSessionQueue,
1314
type SessionBoard,
1415
type NewSessionBoard,
16+
type BoardSessionParticipant,
17+
type NewBoardSessionParticipant,
1518
} from '@boardsesh/db/schema/app';

0 commit comments

Comments
 (0)