Skip to content

Commit fc1ca1e

Browse files
committed
improvment(sockets): migrate to redis
1 parent 2b026de commit fc1ca1e

File tree

24 files changed

+1527
-786
lines changed

24 files changed

+1527
-786
lines changed

apps/sim/app/api/auth/socket-token/route.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,22 @@ export async function POST() {
1414
headers: hdrs,
1515
})
1616

17-
if (!response) {
18-
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
17+
if (!response?.token) {
18+
// No token usually means invalid/expired session
19+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
1920
}
2021

2122
return NextResponse.json({ token: response.token })
2223
} catch (error) {
24+
// Check if it's an auth-related error
25+
const errorMessage = error instanceof Error ? error.message : String(error)
26+
if (
27+
errorMessage.includes('session') ||
28+
errorMessage.includes('unauthorized') ||
29+
errorMessage.includes('unauthenticated')
30+
) {
31+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
32+
}
2333
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
2434
}
2535
}

apps/sim/app/workspace/providers/socket-provider.tsx

Lines changed: 90 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ import { getEnv } from '@/lib/core/config/env'
1717

1818
const logger = createLogger('SocketContext')
1919

20+
const TAB_SESSION_ID_KEY = 'sim_tab_session_id'
21+
22+
function getTabSessionId(): string {
23+
if (typeof window === 'undefined') return ''
24+
25+
let tabSessionId = sessionStorage.getItem(TAB_SESSION_ID_KEY)
26+
if (!tabSessionId) {
27+
tabSessionId = crypto.randomUUID()
28+
sessionStorage.setItem(TAB_SESSION_ID_KEY, tabSessionId)
29+
}
30+
return tabSessionId
31+
}
32+
2033
interface User {
2134
id: string
2235
name?: string
@@ -36,11 +49,13 @@ interface SocketContextType {
3649
socket: Socket | null
3750
isConnected: boolean
3851
isConnecting: boolean
52+
authFailed: boolean
3953
currentWorkflowId: string | null
4054
currentSocketId: string | null
4155
presenceUsers: PresenceUser[]
4256
joinWorkflow: (workflowId: string) => void
4357
leaveWorkflow: () => void
58+
retryConnection: () => void
4459
emitWorkflowOperation: (
4560
operation: string,
4661
target: string,
@@ -63,8 +78,6 @@ interface SocketContextType {
6378

6479
onCursorUpdate: (handler: (data: any) => void) => void
6580
onSelectionUpdate: (handler: (data: any) => void) => void
66-
onUserJoined: (handler: (data: any) => void) => void
67-
onUserLeft: (handler: (data: any) => void) => void
6881
onWorkflowDeleted: (handler: (data: any) => void) => void
6982
onWorkflowReverted: (handler: (data: any) => void) => void
7083
onOperationConfirmed: (handler: (data: any) => void) => void
@@ -75,11 +88,13 @@ const SocketContext = createContext<SocketContextType>({
7588
socket: null,
7689
isConnected: false,
7790
isConnecting: false,
91+
authFailed: false,
7892
currentWorkflowId: null,
7993
currentSocketId: null,
8094
presenceUsers: [],
8195
joinWorkflow: () => {},
8296
leaveWorkflow: () => {},
97+
retryConnection: () => {},
8398
emitWorkflowOperation: () => {},
8499
emitSubblockUpdate: () => {},
85100
emitVariableUpdate: () => {},
@@ -90,8 +105,6 @@ const SocketContext = createContext<SocketContextType>({
90105
onVariableUpdate: () => {},
91106
onCursorUpdate: () => {},
92107
onSelectionUpdate: () => {},
93-
onUserJoined: () => {},
94-
onUserLeft: () => {},
95108
onWorkflowDeleted: () => {},
96109
onWorkflowReverted: () => {},
97110
onOperationConfirmed: () => {},
@@ -112,33 +125,42 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
112125
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
113126
const [currentSocketId, setCurrentSocketId] = useState<string | null>(null)
114127
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
128+
const [authFailed, setAuthFailed] = useState(false)
115129
const initializedRef = useRef(false)
116130

117131
const params = useParams()
118132
const urlWorkflowId = params?.workflowId as string | undefined
133+
const urlWorkflowIdRef = useRef(urlWorkflowId)
134+
urlWorkflowIdRef.current = urlWorkflowId
119135

120136
const eventHandlers = useRef<{
121137
workflowOperation?: (data: any) => void
122138
subblockUpdate?: (data: any) => void
123139
variableUpdate?: (data: any) => void
124-
125140
cursorUpdate?: (data: any) => void
126141
selectionUpdate?: (data: any) => void
127-
userJoined?: (data: any) => void
128-
userLeft?: (data: any) => void
129142
workflowDeleted?: (data: any) => void
130143
workflowReverted?: (data: any) => void
131144
operationConfirmed?: (data: any) => void
132145
operationFailed?: (data: any) => void
133146
}>({})
134147

148+
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
149+
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
150+
135151
const generateSocketToken = async (): Promise<string> => {
136152
const res = await fetch('/api/auth/socket-token', {
137153
method: 'POST',
138154
credentials: 'include',
139155
headers: { 'cache-control': 'no-store' },
140156
})
141-
if (!res.ok) throw new Error('Failed to generate socket token')
157+
if (!res.ok) {
158+
// 401/403 indicates session expiry - don't keep retrying
159+
if (res.status === 401 || res.status === 403) {
160+
throw new Error('Authentication required')
161+
}
162+
throw new Error('Failed to generate socket token')
163+
}
142164
const body = await res.json().catch(() => ({}))
143165
const token = body?.token
144166
if (!token || typeof token !== 'string') throw new Error('Invalid socket token')
@@ -148,6 +170,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
148170
useEffect(() => {
149171
if (!user?.id) return
150172

173+
if (authFailed) {
174+
logger.info('Socket initialization skipped - auth failed, waiting for retry')
175+
return
176+
}
177+
151178
if (initializedRef.current || socket || isConnecting) {
152179
logger.info('Socket already exists or is connecting, skipping initialization')
153180
return
@@ -194,26 +221,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
194221
connected: socketInstance.connected,
195222
transport: socketInstance.io.engine?.transport?.name,
196223
})
197-
198-
if (urlWorkflowId) {
199-
logger.info(`Joining workflow room after connection: ${urlWorkflowId}`)
200-
socketInstance.emit('join-workflow', {
201-
workflowId: urlWorkflowId,
202-
})
203-
setCurrentWorkflowId(urlWorkflowId)
204-
}
224+
// Note: join-workflow is handled by the useEffect watching isConnected
205225
})
206226

207227
socketInstance.on('disconnect', (reason) => {
208228
setIsConnected(false)
209229
setIsConnecting(false)
210230
setCurrentSocketId(null)
231+
setCurrentWorkflowId(null)
232+
setPresenceUsers([])
211233

212234
logger.info('Socket disconnected', {
213235
reason,
214236
})
215-
216-
setPresenceUsers([])
217237
})
218238

219239
socketInstance.on('connect_error', (error: any) => {
@@ -226,24 +246,34 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
226246
transport: error.transport,
227247
})
228248

229-
if (
249+
// Check if this is an authentication failure
250+
const isAuthError =
230251
error.message?.includes('Token validation failed') ||
231252
error.message?.includes('Authentication failed') ||
232253
error.message?.includes('Authentication required')
233-
) {
254+
255+
if (isAuthError) {
234256
logger.warn(
235-
'Authentication failed - this could indicate session expiry or token generation issues'
257+
'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.'
236258
)
259+
// Stop reconnection attempts to prevent infinite loop
260+
socketInstance.disconnect()
261+
// Reset state to allow re-initialization when session is restored
262+
setSocket(null)
263+
setAuthFailed(true)
264+
initializedRef.current = false
237265
}
238266
})
239267

240268
socketInstance.on('reconnect', (attemptNumber) => {
269+
setIsConnected(true)
241270
setCurrentSocketId(socketInstance.id ?? null)
242271
logger.info('Socket reconnected successfully', {
243272
attemptNumber,
244273
socketId: socketInstance.id,
245274
transport: socketInstance.io.engine?.transport?.name,
246275
})
276+
// Note: join-workflow is handled by the useEffect watching isConnected
247277
})
248278

249279
socketInstance.on('reconnect_attempt', (attemptNumber) => {
@@ -284,6 +314,15 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
284314
})
285315
})
286316

317+
// Handle join workflow success - confirms room membership with presence list
318+
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
319+
setCurrentWorkflowId(workflowId)
320+
setPresenceUsers(presenceUsers || [])
321+
logger.info(`Successfully joined workflow room: ${workflowId}`, {
322+
presenceCount: presenceUsers?.length || 0,
323+
})
324+
})
325+
287326
socketInstance.on('workflow-operation', (data) => {
288327
eventHandlers.current.workflowOperation?.(data)
289328
})
@@ -298,10 +337,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
298337

299338
socketInstance.on('workflow-deleted', (data) => {
300339
logger.warn(`Workflow ${data.workflowId} has been deleted`)
301-
if (currentWorkflowId === data.workflowId) {
302-
setCurrentWorkflowId(null)
303-
setPresenceUsers([])
304-
}
340+
setCurrentWorkflowId((current) => {
341+
if (current === data.workflowId) {
342+
setPresenceUsers([])
343+
return null
344+
}
345+
return current
346+
})
305347
eventHandlers.current.workflowDeleted?.(data)
306348
})
307349

@@ -446,10 +488,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
446488
logger.warn('Operation forbidden:', error)
447489
})
448490

449-
socketInstance.on('operation-confirmed', (data) => {
450-
logger.debug('Operation confirmed:', data)
451-
})
452-
453491
socketInstance.on('workflow-state', async (workflowData) => {
454492
logger.info('Received workflow state from server')
455493

@@ -478,11 +516,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
478516
positionUpdateTimeouts.current.clear()
479517
pendingPositionUpdates.current.clear()
480518
}
481-
}, [user?.id])
519+
}, [user?.id, authFailed])
482520

483521
useEffect(() => {
484522
if (!socket || !isConnected || !urlWorkflowId) return
485523

524+
// Skip if already in the correct room
486525
if (currentWorkflowId === urlWorkflowId) return
487526

488527
logger.info(
@@ -497,19 +536,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
497536
logger.info(`Joining workflow room: ${urlWorkflowId}`)
498537
socket.emit('join-workflow', {
499538
workflowId: urlWorkflowId,
539+
tabSessionId: getTabSessionId(),
500540
})
501-
setCurrentWorkflowId(urlWorkflowId)
502541
}, [socket, isConnected, urlWorkflowId, currentWorkflowId])
503542

504-
useEffect(() => {
505-
return () => {
506-
if (socket) {
507-
logger.info('Cleaning up socket connection on unmount')
508-
socket.disconnect()
509-
}
510-
}
511-
}, [])
512-
513543
const joinWorkflow = useCallback(
514544
(workflowId: string) => {
515545
if (!socket || !user?.id) {
@@ -530,8 +560,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
530560
logger.info(`Joining workflow: ${workflowId}`)
531561
socket.emit('join-workflow', {
532562
workflowId,
563+
tabSessionId: getTabSessionId(),
533564
})
534-
setCurrentWorkflowId(workflowId)
565+
// currentWorkflowId will be set by join-workflow-success handler
535566
},
536567
[socket, user, currentWorkflowId]
537568
)
@@ -555,8 +586,20 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
555586
}
556587
}, [socket, currentWorkflowId])
557588

558-
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
559-
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
589+
/**
590+
* Retry socket connection after auth failure.
591+
* Call this when user has re-authenticated (e.g., after login redirect).
592+
*/
593+
const retryConnection = useCallback(() => {
594+
if (!authFailed) {
595+
logger.info('retryConnection called but no auth failure - ignoring')
596+
return
597+
}
598+
logger.info('Retrying socket connection after auth failure')
599+
setAuthFailed(false)
600+
// initializedRef.current was already reset in connect_error handler
601+
// Effect will re-run and attempt connection
602+
}, [authFailed])
560603

561604
const emitWorkflowOperation = useCallback(
562605
(operation: string, target: string, payload: any, operationId?: string) => {
@@ -716,14 +759,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
716759
eventHandlers.current.selectionUpdate = handler
717760
}, [])
718761

719-
const onUserJoined = useCallback((handler: (data: any) => void) => {
720-
eventHandlers.current.userJoined = handler
721-
}, [])
722-
723-
const onUserLeft = useCallback((handler: (data: any) => void) => {
724-
eventHandlers.current.userLeft = handler
725-
}, [])
726-
727762
const onWorkflowDeleted = useCallback((handler: (data: any) => void) => {
728763
eventHandlers.current.workflowDeleted = handler
729764
}, [])
@@ -745,11 +780,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
745780
socket,
746781
isConnected,
747782
isConnecting,
783+
authFailed,
748784
currentWorkflowId,
749785
currentSocketId,
750786
presenceUsers,
751787
joinWorkflow,
752788
leaveWorkflow,
789+
retryConnection,
753790
emitWorkflowOperation,
754791
emitSubblockUpdate,
755792
emitVariableUpdate,
@@ -760,8 +797,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
760797
onVariableUpdate,
761798
onCursorUpdate,
762799
onSelectionUpdate,
763-
onUserJoined,
764-
onUserLeft,
765800
onWorkflowDeleted,
766801
onWorkflowReverted,
767802
onOperationConfirmed,
@@ -771,11 +806,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
771806
socket,
772807
isConnected,
773808
isConnecting,
809+
authFailed,
774810
currentWorkflowId,
775811
currentSocketId,
776812
presenceUsers,
777813
joinWorkflow,
778814
leaveWorkflow,
815+
retryConnection,
779816
emitWorkflowOperation,
780817
emitSubblockUpdate,
781818
emitVariableUpdate,
@@ -786,8 +823,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
786823
onVariableUpdate,
787824
onCursorUpdate,
788825
onSelectionUpdate,
789-
onUserJoined,
790-
onUserLeft,
791826
onWorkflowDeleted,
792827
onWorkflowReverted,
793828
onOperationConfirmed,

0 commit comments

Comments
 (0)