Skip to content

Commit e5d9b98

Browse files
committed
use native api
1 parent d00ed95 commit e5d9b98

File tree

2 files changed

+46
-91
lines changed

2 files changed

+46
-91
lines changed

apps/sim/socket/handlers/subblocks.ts

Lines changed: 23 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,11 @@ async function flushSubblockUpdate(
133133

134134
if (workflowExists.length === 0) {
135135
pending.opToSocket.forEach((socketId, opId) => {
136-
const sock = io.sockets.sockets.get(socketId)
137-
if (sock) {
138-
sock.emit('operation-failed', {
139-
operationId: opId,
140-
error: 'Workflow not found',
141-
retryable: false,
142-
})
143-
}
136+
io.to(socketId).emit('operation-failed', {
137+
operationId: opId,
138+
error: 'Workflow not found',
139+
retryable: false,
140+
})
144141
})
145142
return
146143
}
@@ -173,68 +170,48 @@ async function flushSubblockUpdate(
173170
})
174171

175172
if (updateSuccessful) {
176-
// Collect all sender socket IDs to exclude from broadcast
173+
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
177174
const senderSocketIds = [...pending.opToSocket.values()]
178-
const firstSenderSocket =
179-
senderSocketIds.length > 0 ? io.sockets.sockets.get(senderSocketIds[0]) : null
180-
181-
if (firstSenderSocket) {
182-
// socket.to(room).emit() excludes sender and broadcasts across all pods via Redis adapter
183-
firstSenderSocket.to(workflowId).emit('subblock-update', {
184-
blockId,
185-
subblockId,
186-
value,
187-
timestamp,
188-
})
189-
} else if (senderSocketIds.length > 0) {
190-
// Senders disconnected but we should still exclude them in case they reconnected
191-
// Use io.except() to exclude all sender socket IDs
175+
if (senderSocketIds.length > 0) {
192176
io.to(workflowId).except(senderSocketIds).emit('subblock-update', {
193177
blockId,
194178
subblockId,
195179
value,
196180
timestamp,
197181
})
198182
} else {
199-
// No senders tracked (edge case) - broadcast to all
200-
roomManager.emitToWorkflow(workflowId, 'subblock-update', {
183+
io.to(workflowId).emit('subblock-update', {
201184
blockId,
202185
subblockId,
203186
value,
204187
timestamp,
205188
})
206189
}
207190

208-
// Confirm all coalesced operationIds (only to sockets still connected on this pod)
191+
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
209192
pending.opToSocket.forEach((socketId, opId) => {
210-
const sock = io.sockets.sockets.get(socketId)
211-
if (sock) {
212-
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
213-
}
193+
io.to(socketId).emit('operation-confirmed', {
194+
operationId: opId,
195+
serverTimestamp: Date.now(),
196+
})
214197
})
215198
} else {
216199
pending.opToSocket.forEach((socketId, opId) => {
217-
const sock = io.sockets.sockets.get(socketId)
218-
if (sock) {
219-
sock.emit('operation-failed', {
220-
operationId: opId,
221-
error: 'Block no longer exists',
222-
retryable: false,
223-
})
224-
}
200+
io.to(socketId).emit('operation-failed', {
201+
operationId: opId,
202+
error: 'Block no longer exists',
203+
retryable: false,
204+
})
225205
})
226206
}
227207
} catch (error) {
228208
logger.error('Error flushing subblock update:', error)
229209
pending.opToSocket.forEach((socketId, opId) => {
230-
const sock = io.sockets.sockets.get(socketId)
231-
if (sock) {
232-
sock.emit('operation-failed', {
233-
operationId: opId,
234-
error: error instanceof Error ? error.message : 'Unknown error',
235-
retryable: true,
236-
})
237-
}
210+
io.to(socketId).emit('operation-failed', {
211+
operationId: opId,
212+
error: error instanceof Error ? error.message : 'Unknown error',
213+
retryable: true,
214+
})
238215
})
239216
}
240217
}

apps/sim/socket/handlers/variables.ts

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,11 @@ async function flushVariableUpdate(
127127

128128
if (workflowExists.length === 0) {
129129
pending.opToSocket.forEach((socketId, opId) => {
130-
const sock = io.sockets.sockets.get(socketId)
131-
if (sock) {
132-
sock.emit('operation-failed', {
133-
operationId: opId,
134-
error: 'Workflow not found',
135-
retryable: false,
136-
})
137-
}
130+
io.to(socketId).emit('operation-failed', {
131+
operationId: opId,
132+
error: 'Workflow not found',
133+
retryable: false,
134+
})
138135
})
139136
return
140137
}
@@ -170,69 +167,50 @@ async function flushVariableUpdate(
170167
})
171168

172169
if (updateSuccessful) {
173-
// Collect all sender socket IDs to exclude from broadcast
170+
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
174171
const senderSocketIds = [...pending.opToSocket.values()]
175-
const firstSenderSocket =
176-
senderSocketIds.length > 0 ? io.sockets.sockets.get(senderSocketIds[0]) : null
177-
178-
if (firstSenderSocket) {
179-
// socket.to(room).emit() excludes sender and broadcasts across all pods via Redis adapter
180-
firstSenderSocket.to(workflowId).emit('variable-update', {
181-
variableId,
182-
field,
183-
value,
184-
timestamp,
185-
})
186-
} else if (senderSocketIds.length > 0) {
187-
// Senders disconnected but we should still exclude them in case they reconnected
188-
// Use io.except() to exclude all sender socket IDs
172+
if (senderSocketIds.length > 0) {
189173
io.to(workflowId).except(senderSocketIds).emit('variable-update', {
190174
variableId,
191175
field,
192176
value,
193177
timestamp,
194178
})
195179
} else {
196-
// No senders tracked (edge case) - broadcast to all
197-
roomManager.emitToWorkflow(workflowId, 'variable-update', {
180+
io.to(workflowId).emit('variable-update', {
198181
variableId,
199182
field,
200183
value,
201184
timestamp,
202185
})
203186
}
204187

188+
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
205189
pending.opToSocket.forEach((socketId, opId) => {
206-
const sock = io.sockets.sockets.get(socketId)
207-
if (sock) {
208-
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
209-
}
190+
io.to(socketId).emit('operation-confirmed', {
191+
operationId: opId,
192+
serverTimestamp: Date.now(),
193+
})
210194
})
211195

212196
logger.debug(`Flushed variable update ${workflowId}: ${variableId}.${field}`)
213197
} else {
214198
pending.opToSocket.forEach((socketId, opId) => {
215-
const sock = io.sockets.sockets.get(socketId)
216-
if (sock) {
217-
sock.emit('operation-failed', {
218-
operationId: opId,
219-
error: 'Variable no longer exists',
220-
retryable: false,
221-
})
222-
}
199+
io.to(socketId).emit('operation-failed', {
200+
operationId: opId,
201+
error: 'Variable no longer exists',
202+
retryable: false,
203+
})
223204
})
224205
}
225206
} catch (error) {
226207
logger.error('Error flushing variable update:', error)
227208
pending.opToSocket.forEach((socketId, opId) => {
228-
const sock = io.sockets.sockets.get(socketId)
229-
if (sock) {
230-
sock.emit('operation-failed', {
231-
operationId: opId,
232-
error: error instanceof Error ? error.message : 'Unknown error',
233-
retryable: true,
234-
})
235-
}
209+
io.to(socketId).emit('operation-failed', {
210+
operationId: opId,
211+
error: error instanceof Error ? error.message : 'Unknown error',
212+
retryable: true,
213+
})
236214
})
237215
}
238216
}

0 commit comments

Comments
 (0)