diff --git a/apps/sim/app/api/files/delete/route.test.ts b/apps/sim/app/api/files/delete/route.test.ts
index 977902d0be0..df3bad6170f 100644
--- a/apps/sim/app/api/files/delete/route.test.ts
+++ b/apps/sim/app/api/files/delete/route.test.ts
@@ -1,28 +1,25 @@
 /**
  * @vitest-environment node
  */
-import { authMockFns, hybridAuthMockFns } from '@sim/testing'
+import {
+  authMockFns,
+  hybridAuthMockFns,
+  storageServiceMock,
+  storageServiceMockFns,
+} from '@sim/testing'
 import { beforeEach, describe, expect, it, vi } from 'vitest'

 const mocks = vi.hoisted(() => {
   const mockVerifyFileAccess = vi.fn()
   const mockVerifyWorkspaceFileAccess = vi.fn()
-  const mockDeleteFile = vi.fn()
-  const mockHasCloudStorage = vi.fn()
   const mockGetStorageProvider = vi.fn()
   const mockIsUsingCloudStorage = vi.fn()
-  const mockUploadFile = vi.fn()
-  const mockDownloadFile = vi.fn()

   return {
     mockVerifyFileAccess,
     mockVerifyWorkspaceFileAccess,
-    mockDeleteFile,
-    mockHasCloudStorage,
     mockGetStorageProvider,
     mockIsUsingCloudStorage,
-    mockUploadFile,
-    mockDownloadFile,
   }
 })

@@ -68,23 +65,18 @@ vi.mock('@/lib/uploads', () => ({
   getStorageProvider: mocks.mockGetStorageProvider,
   isUsingCloudStorage: mocks.mockIsUsingCloudStorage,
   StorageService: {
-    uploadFile: mocks.mockUploadFile,
-    downloadFile: mocks.mockDownloadFile,
-    deleteFile: mocks.mockDeleteFile,
-    hasCloudStorage: mocks.mockHasCloudStorage,
+    uploadFile: storageServiceMockFns.mockUploadFile,
+    downloadFile: storageServiceMockFns.mockDownloadFile,
+    deleteFile: storageServiceMockFns.mockDeleteFile,
+    hasCloudStorage: storageServiceMockFns.mockHasCloudStorage,
   },
-  uploadFile: mocks.mockUploadFile,
-  downloadFile: mocks.mockDownloadFile,
-  deleteFile: mocks.mockDeleteFile,
-  hasCloudStorage: mocks.mockHasCloudStorage,
+  uploadFile: storageServiceMockFns.mockUploadFile,
+  downloadFile: storageServiceMockFns.mockDownloadFile,
+  deleteFile: storageServiceMockFns.mockDeleteFile,
+  hasCloudStorage: storageServiceMockFns.mockHasCloudStorage,
 }))

-vi.mock('@/lib/uploads/core/storage-service', () => ({
-  uploadFile: mocks.mockUploadFile,
-  downloadFile: mocks.mockDownloadFile,
-  deleteFile: mocks.mockDeleteFile,
-  hasCloudStorage: mocks.mockHasCloudStorage,
-}))
+vi.mock('@/lib/uploads/core/storage-service', () => storageServiceMock)

 vi.mock('@/lib/uploads/server/metadata', () => ({
   deleteFileMetadata: vi.fn().mockResolvedValue(undefined),
@@ -117,14 +109,14 @@ describe('File Delete API Route', () => {
     })
     mocks.mockVerifyFileAccess.mockResolvedValue(true)
     mocks.mockVerifyWorkspaceFileAccess.mockResolvedValue(true)
-    mocks.mockDeleteFile.mockResolvedValue(undefined)
-    mocks.mockHasCloudStorage.mockReturnValue(true)
+    storageServiceMockFns.mockDeleteFile.mockResolvedValue(undefined)
+    storageServiceMockFns.mockHasCloudStorage.mockReturnValue(true)
     mocks.mockGetStorageProvider.mockReturnValue('s3')
     mocks.mockIsUsingCloudStorage.mockReturnValue(true)
   })

   it('should handle local file deletion successfully', async () => {
-    mocks.mockHasCloudStorage.mockReturnValue(false)
+    storageServiceMockFns.mockHasCloudStorage.mockReturnValue(false)
     mocks.mockGetStorageProvider.mockReturnValue('local')
     mocks.mockIsUsingCloudStorage.mockReturnValue(false)

@@ -142,7 +134,7 @@ describe('File Delete API Route', () => {
   })

   it('should handle file not found gracefully', async () => {
-    mocks.mockHasCloudStorage.mockReturnValue(false)
+    storageServiceMockFns.mockHasCloudStorage.mockReturnValue(false)
     mocks.mockGetStorageProvider.mockReturnValue('local')
     mocks.mockIsUsingCloudStorage.mockReturnValue(false)

@@ -170,7 +162,7 @@ describe('File Delete API Route', () => {
     expect(data).toHaveProperty('success', true)
     expect(data).toHaveProperty('message', 'File deleted successfully')

-    expect(mocks.mockDeleteFile).toHaveBeenCalledWith({
+    expect(storageServiceMockFns.mockDeleteFile).toHaveBeenCalledWith({
       key: 'workspace/test-workspace-id/1234567890-test-file.txt',
       context: 'workspace',
     })
@@ -190,7 +182,7 @@ describe('File Delete API Route', () => {
     expect(data).toHaveProperty('success', true)
     expect(data).toHaveProperty('message', 'File deleted successfully')

-    expect(mocks.mockDeleteFile).toHaveBeenCalledWith({
+    expect(storageServiceMockFns.mockDeleteFile).toHaveBeenCalledWith({
       key: 'workspace/test-workspace-id/1234567890-test-document.pdf',
       context: 'workspace',
     })
diff --git a/apps/sim/app/api/files/multipart/route.test.ts b/apps/sim/app/api/files/multipart/route.test.ts
new file mode 100644
index 00000000000..b70fed81b82
--- /dev/null
+++ b/apps/sim/app/api/files/multipart/route.test.ts
@@ -0,0 +1,202 @@
+/**
+ * @vitest-environment node
+ */
+import { authMockFns, permissionsMock, permissionsMockFns } from '@sim/testing'
+import { NextRequest } from 'next/server'
+import { beforeEach, describe, expect, it, vi } from 'vitest'
+
+const {
+  mockIsUsingCloudStorage,
+  mockGetStorageProvider,
+  mockGetStorageConfig,
+  mockCompleteS3MultipartUpload,
+  mockCompleteBlobMultipartUpload,
+  mockDeriveBlobBlockId,
+  mockVerifyUploadToken,
+  mockSignUploadToken,
+} = vi.hoisted(() => ({
+  mockIsUsingCloudStorage: vi.fn(),
+  mockGetStorageProvider: vi.fn(),
+  mockGetStorageConfig: vi.fn(),
+  mockCompleteS3MultipartUpload: vi.fn(),
+  mockCompleteBlobMultipartUpload: vi.fn(),
+  mockDeriveBlobBlockId: vi.fn(),
+  mockVerifyUploadToken: vi.fn(),
+  mockSignUploadToken: vi.fn(),
+}))
+
+vi.mock('@/lib/uploads', () => ({
+  isUsingCloudStorage: mockIsUsingCloudStorage,
+  getStorageProvider: mockGetStorageProvider,
+  getStorageConfig: mockGetStorageConfig,
+}))
+
+vi.mock('@/lib/uploads/core/upload-token', () => ({
+  signUploadToken: mockSignUploadToken,
+  verifyUploadToken: mockVerifyUploadToken,
+}))
+
+vi.mock('@/lib/uploads/providers/s3/client', () => ({
+  completeS3MultipartUpload: mockCompleteS3MultipartUpload,
+  initiateS3MultipartUpload: vi.fn(),
+  getS3MultipartPartUrls: vi.fn(),
+  abortS3MultipartUpload: vi.fn(),
+}))
+
+vi.mock('@/lib/uploads/providers/blob/client', () => ({
+  completeMultipartUpload: mockCompleteBlobMultipartUpload,
+  deriveBlobBlockId: mockDeriveBlobBlockId,
+  initiateMultipartUpload: vi.fn(),
+  getMultipartPartUrls: vi.fn(),
+  abortMultipartUpload: vi.fn(),
+}))
+
+vi.mock('@/lib/workspaces/permissions/utils', () => permissionsMock)
+
+import { POST } from '@/app/api/files/multipart/route'
+
+const tokenPayload = {
+  uploadId: 'upload-1',
+  key: 'workspace/ws-1/123-abc-file.bin',
+  userId: 'user-1',
+  workspaceId: 'ws-1',
+  context: 'workspace' as const,
+}
+
+const makeRequest = (action: string, body: unknown) =>
+  new NextRequest(`http://localhost/api/files/multipart?action=${action}`, {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify(body),
+  })
+
+describe('POST /api/files/multipart action=complete', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+    authMockFns.mockGetSession.mockResolvedValue({ user: { id: 'user-1' } })
+    permissionsMockFns.mockGetUserEntityPermissions.mockResolvedValue('write')
+    mockIsUsingCloudStorage.mockReturnValue(true)
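+    // Happy-path defaults assumed by most cases below: cloud storage enabled,
+    // tokens verify, and both provider completions resolve; individual tests
+    // override the provider and the failure modes.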
+    mockGetStorageConfig.mockReturnValue({ bucket: 'b', region: 'r' })
+    mockVerifyUploadToken.mockReturnValue({ valid: true, payload: tokenPayload })
+    mockSignUploadToken.mockReturnValue('signed-token')
+    mockCompleteS3MultipartUpload.mockResolvedValue({
+      location: 'loc',
+      path: '/api/files/serve/...',
+      key: tokenPayload.key,
+    })
+    mockCompleteBlobMultipartUpload.mockResolvedValue({
+      location: 'loc',
+      path: '/api/files/serve/...',
+      key: tokenPayload.key,
+    })
+    mockDeriveBlobBlockId.mockImplementation(
+      (n: number) => `block-${n.toString().padStart(6, '0')}`
+    )
+  })
+
+  it('rejects parts without partNumber', async () => {
+    mockGetStorageProvider.mockReturnValue('s3')
+    const res = await POST(
+      makeRequest('complete', {
+        uploadToken: 'tok',
+        parts: [{ etag: 'abc' }],
+      })
+    )
+    expect(res.status).toBe(400)
+    expect(mockCompleteS3MultipartUpload).not.toHaveBeenCalled()
+  })
+
+  it('S3 path requires etag and forwards { ETag, PartNumber }', async () => {
+    mockGetStorageProvider.mockReturnValue('s3')
+
+    const missingEtag = await POST(
+      makeRequest('complete', {
+        uploadToken: 'tok',
+        parts: [{ partNumber: 1 }],
+      })
+    )
+    expect(missingEtag.status).toBe(500)
+
+    mockCompleteS3MultipartUpload.mockClear()
+
+    const ok = await POST(
+      makeRequest('complete', {
+        uploadToken: 'tok',
+        parts: [
+          { partNumber: 1, etag: 'aaa' },
+          { partNumber: 2, etag: 'bbb' },
+        ],
+      })
+    )
+    expect(ok.status).toBe(200)
+    expect(mockCompleteS3MultipartUpload).toHaveBeenCalledWith(
+      tokenPayload.key,
+      tokenPayload.uploadId,
+      [
+        { ETag: 'aaa', PartNumber: 1 },
+        { ETag: 'bbb', PartNumber: 2 },
+      ],
+      expect.any(Object)
+    )
+  })
+
+  it('Blob path derives blockId from partNumber and ignores etag', async () => {
+    mockGetStorageProvider.mockReturnValue('blob')
+    mockGetStorageConfig.mockReturnValue({
+      containerName: 'c',
+      accountName: 'a',
+      accountKey: 'k',
+    })
+
+    const res = await POST(
+      makeRequest('complete', {
+        uploadToken: 'tok',
+        parts: [{ partNumber: 1, etag: 'irrelevant' }, { partNumber: 2 }],
+      })
+    )
+
+    expect(res.status).toBe(200)
+    expect(mockDeriveBlobBlockId).toHaveBeenCalledWith(1)
+    expect(mockDeriveBlobBlockId).toHaveBeenCalledWith(2)
+    expect(mockCompleteBlobMultipartUpload).toHaveBeenCalledWith(
+      tokenPayload.key,
+      [
+        { partNumber: 1, blockId: 'block-000001' },
+        { partNumber: 2, blockId: 'block-000002' },
+      ],
+      expect.objectContaining({ containerName: 'c' })
+    )
+  })
+
+  it('returns 403 when token is invalid', async () => {
+    mockGetStorageProvider.mockReturnValue('s3')
+    mockVerifyUploadToken.mockReturnValueOnce({ valid: false })
+    const res = await POST(
+      makeRequest('complete', {
+        uploadToken: 'bad',
+        parts: [{ partNumber: 1, etag: 'a' }],
+      })
+    )
+    expect(res.status).toBe(403)
+  })
+
+  it('batch complete normalizes per upload', async () => {
+    mockGetStorageProvider.mockReturnValue('s3')
+    const res = await POST(
+      makeRequest('complete', {
+        uploads: [
+          {
+            uploadToken: 'tok-a',
+            parts: [{ partNumber: 1, etag: 'aaa' }],
+          },
+          {
+            uploadToken: 'tok-b',
+            parts: [{ partNumber: 1, etag: 'bbb' }],
+          },
+        ],
+      })
+    )
+    expect(res.status).toBe(200)
+    expect(mockCompleteS3MultipartUpload).toHaveBeenCalledTimes(2)
+  })
+})
diff --git a/apps/sim/app/api/files/multipart/route.ts b/apps/sim/app/api/files/multipart/route.ts
index 0b9ddf07c3d..80213d54cbb 100644
--- a/apps/sim/app/api/files/multipart/route.ts
+++ b/apps/sim/app/api/files/multipart/route.ts
@@ -22,6 +22,7 @@ import {
   type UploadTokenPayload,
   verifyUploadToken,
 } from '@/lib/uploads/core/upload-token'
+import type { StorageConfig } from '@/lib/uploads/shared/types'
 import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

 const logger = createLogger('MultipartUploadAPI')
@@ -39,6 +40,37 @@ const ALLOWED_UPLOAD_CONTEXTS = new Set([
   'workspace-logos',
 ])

+/**
+ * Unified part identity sent by the client when completing a multipart upload.
+ * `etag` is required for S3 (CompleteMultipartUpload). For Azure the server
+ * derives the block id from `partNumber` via {@link deriveBlobBlockId}.
+ */
+interface ClientCompletedPart {
+  partNumber: number
+  etag?: string
+}
+
+const isClientCompletedParts = (value: unknown): value is ClientCompletedPart[] =>
+  Array.isArray(value) &&
+  value.every(
+    (p) =>
+      p !== null &&
+      typeof p === 'object' &&
+      typeof (p as ClientCompletedPart).partNumber === 'number' &&
+      ((p as ClientCompletedPart).etag === undefined ||
+        typeof (p as ClientCompletedPart).etag === 'string')
+  )
+
+const buildS3CustomConfig = (config: StorageConfig) =>
+  config.bucket && config.region ? { bucket: config.bucket, region: config.region } : undefined
+
+const buildBlobCustomConfig = (config: StorageConfig) => ({
+  containerName: config.containerName!,
+  accountName: config.accountName!,
+  accountKey: config.accountKey,
+  connectionString: config.connectionString,
+})
+
 const verifyTokenForUser = (token: string | undefined, userId: string) => {
   if (!token || typeof token !== 'string') {
     return null
@@ -103,12 +135,44 @@ export const POST = withRouteHandler(async (request: NextRequest) => {

       const config = getStorageConfig(storageContext)

+      let customKey: string | undefined
+      if (context === 'workspace') {
+        const { MAX_WORKSPACE_FILE_SIZE } = await import('@/lib/uploads/shared/types')
+        if (typeof fileSize === 'number' && fileSize > MAX_WORKSPACE_FILE_SIZE) {
+          return NextResponse.json(
+            { error: `File size exceeds maximum of ${MAX_WORKSPACE_FILE_SIZE} bytes` },
+            { status: 413 }
+          )
+        }
+
+        const { generateWorkspaceFileKey } = await import(
+          '@/lib/uploads/contexts/workspace/workspace-file-manager'
+        )
+        customKey = generateWorkspaceFileKey(workspaceId, fileName)
+
+        const { checkStorageQuota } = await import('@/lib/billing/storage')
+        const quotaCheck = await checkStorageQuota(userId, fileSize)
+        if (!quotaCheck.allowed) {
+          return NextResponse.json(
+            { error: quotaCheck.error || 'Storage limit exceeded' },
+            { status: 413 }
+          )
+        }
+      }
+
       let uploadId: string
       let key: string

       if (storageProvider === 's3') {
         const { initiateS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
-        const result = await initiateS3MultipartUpload({ fileName, contentType, fileSize })
+        const result = await initiateS3MultipartUpload({
+          fileName,
+          contentType,
+          fileSize,
+          customConfig: buildS3CustomConfig(config),
+          customKey,
+          purpose: context,
+        })
         uploadId = result.uploadId
         key = result.key
       } else if (storageProvider === 'blob') {
@@ -117,12 +181,8 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
           fileName,
           contentType,
           fileSize,
-          customConfig: {
-            containerName: config.containerName!,
-            accountName: config.accountName!,
-            accountKey: config.accountKey,
-            connectionString: config.connectionString,
-          },
+          customConfig: buildBlobCustomConfig(config),
+          customKey,
         })
         uploadId = result.uploadId
         key = result.key
@@ -173,17 +233,21 @@ export const POST = withRouteHandler(async (request: NextRequest) => {

       if (storageProvider === 's3') {
         const { getS3MultipartPartUrls } = await import('@/lib/uploads/providers/s3/client')
-        const presignedUrls = await getS3MultipartPartUrls(key, uploadId, partNumbers)
+        const presignedUrls = await getS3MultipartPartUrls(
+          key,
+          uploadId,
+          partNumbers,
+          buildS3CustomConfig(config)
+        )
         return NextResponse.json({ presignedUrls })
       }

       if (storageProvider === 'blob') {
         const { getMultipartPartUrls } = await import('@/lib/uploads/providers/blob/client')
-        const presignedUrls = await getMultipartPartUrls(key, partNumbers, {
-          containerName: config.containerName!,
-          accountName: config.accountName!,
-          accountKey: config.accountKey,
-          connectionString: config.connectionString,
-        })
+        const presignedUrls = await getMultipartPartUrls(
+          key,
+          partNumbers,
+          buildBlobCustomConfig(config)
+        )
         return NextResponse.json({ presignedUrls })
       }

@@ -207,60 +271,83 @@ export const POST = withRouteHandler(async (request: NextRequest) => {

       const data: CompleteMultipartBody = parsed.data.body

-      if ('uploads' in data && Array.isArray(data.uploads)) {
-        const verified = data.uploads.map((upload) => {
-          const payload = verifyTokenForUser(upload.uploadToken, userId)
-          return payload ? { payload, parts: upload.parts } : null
-        })
+      const s3Module =
+        storageProvider === 's3' ? await import('@/lib/uploads/providers/s3/client') : null
+      const blobModule =
+        storageProvider === 'blob' ? await import('@/lib/uploads/providers/blob/client') : null
+
+      const completeOne = async (payload: UploadTokenPayload, parts: ClientCompletedPart[]) => {
+        const { uploadId, key, context } = payload
+        const config = getStorageConfig(context)

-        if (verified.some((entry) => entry === null)) {
-          return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
+        if (storageProvider === 's3' && s3Module) {
+          const { completeS3MultipartUpload } = s3Module
+          const s3Parts = parts.map((p) => {
+            if (!p.etag) {
+              throw new Error(`Missing etag for S3 part ${p.partNumber}`)
+            }
+            return { ETag: p.etag, PartNumber: p.partNumber }
+          })
+          const result = await completeS3MultipartUpload(
+            key,
+            uploadId,
+            s3Parts,
+            buildS3CustomConfig(config)
+          )
+          return {
+            success: true as const,
+            location: result.location,
+            path: result.path,
+            key: result.key,
+          }
         }

-        const verifiedEntries = verified.filter(
-          (entry): entry is { payload: UploadTokenPayload; parts: unknown } => entry !== null
-        )
+        if (storageProvider === 'blob' && blobModule) {
+          const { completeMultipartUpload, deriveBlobBlockId } = blobModule
+          const blobParts = parts.map((p) => ({
+            partNumber: p.partNumber,
+            blockId: deriveBlobBlockId(p.partNumber),
+          }))
+          const result = await completeMultipartUpload(
+            key,
+            blobParts,
+            buildBlobCustomConfig(config)
+          )
+          return {
+            success: true as const,
+            location: result.location,
+            path: result.path,
+            key: result.key,
+          }
+        }

-        const results = await Promise.all(
-          verifiedEntries.map(async ({ payload, parts }) => {
-            const { uploadId, key, context } = payload
-            const config = getStorageConfig(context)
-
-            if (storageProvider === 's3') {
-              const { completeS3MultipartUpload } = await import(
-                '@/lib/uploads/providers/s3/client'
-              )
-              const result = await completeS3MultipartUpload(key, uploadId, parts as any)
-              return {
-                success: true,
-                location: result.location,
-                path: result.path,
-                key: result.key,
-              }
-            }
-            if (storageProvider === 'blob') {
-              const { completeMultipartUpload } = await import(
-                '@/lib/uploads/providers/blob/client'
-              )
-              const result = await completeMultipartUpload(key, parts as any, {
-                containerName: config.containerName!,
-                accountName: config.accountName!,
-                accountKey: config.accountKey,
-                connectionString: config.connectionString,
-              })
-              return {
-                success: true,
-                location: result.location,
-                path: result.path,
-                key: result.key,
-              }
-            }
+        throw new Error(`Unsupported storage provider: ${storageProvider}`)
+      }

-            throw new Error(`Unsupported storage provider: ${storageProvider}`)
-          })
+      if ('uploads' in data && Array.isArray(data.uploads)) {
+        const verified: Array<{ payload: UploadTokenPayload; parts: ClientCompletedPart[] }> = []
+        for (const upload of data.uploads) {
+          const payload = verifyTokenForUser(upload.uploadToken, userId)
+          if (!payload) {
+            return NextResponse.json(
+              { error: 'Invalid or expired upload token' },
+              { status: 403 }
+            )
+          }
+          if (!isClientCompletedParts(upload.parts)) {
+            return NextResponse.json(
+              { error: 'Invalid parts payload: expected [{ partNumber, etag? }]' },
+              { status: 400 }
+            )
+          }
+          verified.push({ payload, parts: upload.parts })
+        }
+
+        const results = await Promise.all(
+          verified.map(({ payload, parts }) => completeOne(payload, parts))
         )

-        logger.info(`Completed ${verifiedEntries.length} multipart uploads`)
+        logger.info(`Completed ${verified.length} multipart uploads`)
         return NextResponse.json({ results })
       }

@@ -269,42 +356,18 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
       if (!tokenPayload) {
         return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
       }
-
-      const { uploadId, key, context } = tokenPayload
-      const config = getStorageConfig(context)
-
-      if (storageProvider === 's3') {
-        const { completeS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
-        const result = await completeS3MultipartUpload(key, uploadId, single.parts as any)
-        logger.info(`Completed S3 multipart upload for key ${key} (context: ${context})`)
-        return NextResponse.json({
-          success: true,
-          location: result.location,
-          path: result.path,
-          key: result.key,
-        })
-      }
-      if (storageProvider === 'blob') {
-        const { completeMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
-        const result = await completeMultipartUpload(key, single.parts as any, {
-          containerName: config.containerName!,
-          accountName: config.accountName!,
-          accountKey: config.accountKey,
-          connectionString: config.connectionString,
-        })
-        logger.info(`Completed Azure multipart upload for key ${key} (context: ${context})`)
-        return NextResponse.json({
-          success: true,
-          location: result.location,
-          path: result.path,
-          key: result.key,
-        })
+      if (!isClientCompletedParts(single.parts)) {
+        return NextResponse.json(
+          { error: 'Invalid parts payload: expected [{ partNumber, etag? }]' },
+          { status: 400 }
+        )
       }

-      return NextResponse.json(
-        { error: `Unsupported storage provider: ${storageProvider}` },
-        { status: 400 }
+      const result = await completeOne(tokenPayload, single.parts)
+      logger.info(
+        `Completed ${storageProvider} multipart upload for key ${tokenPayload.key} (context: ${tokenPayload.context})`
       )
+      return NextResponse.json(result)
     }

     case 'abort': {
@@ -330,16 +393,11 @@ export const POST = withRouteHandler(async (request: NextRequest) => {

       if (storageProvider === 's3') {
         const { abortS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
-        await abortS3MultipartUpload(key, uploadId)
+        await abortS3MultipartUpload(key, uploadId, buildS3CustomConfig(config))
         logger.info(`Aborted S3 multipart upload for key ${key} (context: ${context})`)
       } else if (storageProvider === 'blob') {
         const { abortMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
-        await abortMultipartUpload(key, {
-          containerName: config.containerName!,
-          accountName: config.accountName!,
-          accountKey: config.accountKey,
-          connectionString: config.connectionString,
-        })
+        await abortMultipartUpload(key, buildBlobCustomConfig(config))
         logger.info(`Aborted Azure multipart upload for key ${key} (context: ${context})`)
       } else {
         return NextResponse.json(
diff --git a/apps/sim/app/api/files/parse/route.test.ts b/apps/sim/app/api/files/parse/route.test.ts
index 8a2c06f19ff..e2c032b4718 100644
--- a/apps/sim/app/api/files/parse/route.test.ts
+++ b/apps/sim/app/api/files/parse/route.test.ts
@@ -10,6 +10,8 @@ import {
   inputValidationMock,
   permissionsMock,
   permissionsMockFns,
+  storageServiceMock,
+  storageServiceMockFns,
 } from '@sim/testing'
 import { NextRequest } from 'next/server'
 import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
@@ -22,8 +24,6 @@ const {
   mockIsSupportedFileType,
   mockParseFile,
   mockParseBuffer,
-  mockDownloadFile,
-  mockHasCloudStorage,
   mockFsAccess,
   mockFsStat,
   mockFsReadFile,
@@ -47,8 +47,6 @@ const {
     content: 'parsed buffer content',
     metadata: { pageCount: 1 },
   }),
-  mockDownloadFile: vi.fn(),
-  mockHasCloudStorage: vi.fn().mockReturnValue(true),
   mockFsAccess: vi.fn().mockResolvedValue(undefined),
   mockFsStat: vi.fn().mockImplementation(() => ({ isFile: () => true })),
   mockFsReadFile: vi.fn().mockResolvedValue(Buffer.from('test file content')),
@@ -79,10 +77,7 @@ vi.mock('@/lib/file-parsers', () => ({
   parseBuffer: mockParseBuffer,
 }))

-vi.mock('@/lib/uploads/core/storage-service', () => ({
-  downloadFile: mockDownloadFile,
-  hasCloudStorage: mockHasCloudStorage,
-}))
+vi.mock('@/lib/uploads/core/storage-service', () => storageServiceMock)

 vi.mock('path', () => ({
   default: actualPath,
@@ -176,6 +171,7 @@ describe('File Parse API Route', () => {
     })
     permissionsMockFns.mockGetUserEntityPermissions.mockResolvedValue({ canView: true })

+    storageServiceMockFns.mockHasCloudStorage.mockReturnValue(true)
     mockIsSupportedFileType.mockReturnValue(true)
     mockParseFile.mockResolvedValue({
       content: 'parsed content',
@@ -325,8 +321,8 @@ describe('File Parse API Route', () => {
         authenticated: true,
       })

-      mockDownloadFile.mockRejectedValue(new Error('Access denied'))
-      mockHasCloudStorage.mockReturnValue(true)
+      storageServiceMockFns.mockDownloadFile.mockRejectedValue(new Error('Access denied'))
+      storageServiceMockFns.mockHasCloudStorage.mockReturnValue(true)

       const req = new NextRequest('http://localhost:3000/api/files/parse', {
         method: 'POST',
diff --git a/apps/sim/app/api/files/presigned/route.test.ts b/apps/sim/app/api/files/presigned/route.test.ts
index f6641c07d9f..6ae6a10ed5e 100644
--- a/apps/sim/app/api/files/presigned/route.test.ts
+++ b/apps/sim/app/api/files/presigned/route.test.ts
@@ -4,7 +4,7 @@
  * @vitest-environment node
  */

-import { authMockFns } from '@sim/testing'
+import { authMockFns, storageServiceMock, storageServiceMockFns } from '@sim/testing'
 import { NextRequest } from 'next/server'
 import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

@@ -16,9 +16,6 @@ const {
   mockGetStorageConfig,
   mockIsUsingCloudStorage,
   mockGetStorageProvider,
-  mockHasCloudStorage,
-  mockGeneratePresignedUploadUrl,
-  mockGeneratePresignedDownloadUrl,
   mockValidateFileType,
   mockGenerateCopilotUploadUrl,
   mockIsImageFileType,
@@ -32,9 +29,6 @@ const {
   mockGetStorageConfig: vi.fn(),
   mockIsUsingCloudStorage: vi.fn(),
   mockGetStorageProvider: vi.fn(),
-  mockHasCloudStorage: vi.fn(),
-  mockGeneratePresignedUploadUrl: vi.fn(),
-  mockGeneratePresignedDownloadUrl: vi.fn().mockResolvedValue('https://example.com/presigned-url'),
   mockValidateFileType: vi.fn().mockReturnValue(null),
   mockGenerateCopilotUploadUrl: vi.fn().mockResolvedValue({
     url: 'https://example.com/presigned-url',
@@ -63,11 +57,7 @@ vi.mock('@/lib/uploads/config', () => ({
   getStorageProvider: mockGetStorageProvider,
 }))

-vi.mock('@/lib/uploads/core/storage-service', () => ({
-  hasCloudStorage: mockHasCloudStorage,
-  generatePresignedUploadUrl: mockGeneratePresignedUploadUrl,
-  generatePresignedDownloadUrl: mockGeneratePresignedDownloadUrl,
-}))
+vi.mock('@/lib/uploads/core/storage-service', () => storageServiceMock)

 vi.mock('@/lib/uploads/utils/validation', () => ({
   validateFileType: mockValidateFileType,
@@ -132,8 +122,8 @@ function setupFileApiMocks(
     storageProvider === 'blob' ? 'Azure Blob' : storageProvider === 's3' ? 'S3' : 'Local'
   )

-  mockHasCloudStorage.mockReturnValue(cloudEnabled)
-  mockGeneratePresignedUploadUrl.mockImplementation(
+  storageServiceMockFns.mockHasCloudStorage.mockReturnValue(cloudEnabled)
+  storageServiceMockFns.mockGeneratePresignedUploadUrl.mockImplementation(
     async (opts: { fileName: string; context: string }) => {
       const timestamp = Date.now()
       const safeFileName = opts.fileName.replace(/[^a-zA-Z0-9.-]/g, '_')
@@ -144,7 +134,9 @@ function setupFileApiMocks(
       }
     }
   )
-  mockGeneratePresignedDownloadUrl.mockResolvedValue('https://example.com/presigned-url')
+  storageServiceMockFns.mockGeneratePresignedDownloadUrl.mockResolvedValue(
+    'https://example.com/presigned-url'
+  )

   mockValidateFileType.mockReturnValue(null)

@@ -431,7 +423,7 @@ describe('/api/files/presigned', () => {
         storageProvider: 's3',
       })

-      mockGeneratePresignedUploadUrl.mockRejectedValue(
+      storageServiceMockFns.mockGeneratePresignedUploadUrl.mockRejectedValue(
         new Error('Unknown storage provider: unknown')
       )

@@ -458,7 +450,9 @@ describe('/api/files/presigned', () => {
         storageProvider: 's3',
       })

-      mockGeneratePresignedUploadUrl.mockRejectedValue(new Error('S3 service unavailable'))
+      storageServiceMockFns.mockGeneratePresignedUploadUrl.mockRejectedValue(
+        new Error('S3 service unavailable')
+      )

       const request = new NextRequest('http://localhost:3000/api/files/presigned?type=chat', {
         method: 'POST',
@@ -483,7 +477,9 @@ describe('/api/files/presigned', () => {
         storageProvider: 'blob',
       })

-      mockGeneratePresignedUploadUrl.mockRejectedValue(new Error('Azure service unavailable'))
+      storageServiceMockFns.mockGeneratePresignedUploadUrl.mockRejectedValue(
+        new Error('Azure service unavailable')
+      )

       const request = new NextRequest('http://localhost:3000/api/files/presigned?type=chat', {
         method: 'POST',
diff --git a/apps/sim/app/api/files/serve/[...path]/route.test.ts b/apps/sim/app/api/files/serve/[...path]/route.test.ts
index 17b7a8d2fda..f0e7738c8d0 100644
--- a/apps/sim/app/api/files/serve/[...path]/route.test.ts
+++ b/apps/sim/app/api/files/serve/[...path]/route.test.ts
@@ -3,7 +3,7 @@
  *
  * @vitest-environment node
  */
-import { hybridAuthMockFns } from '@sim/testing'
+import { hybridAuthMockFns, storageServiceMock, storageServiceMockFns } from '@sim/testing'
 import { NextRequest } from 'next/server'
 import { beforeEach, describe, expect, it, vi } from 'vitest'

@@ -11,7 +11,6 @@ const {
   mockVerifyFileAccess,
   mockReadFile,
   mockIsUsingCloudStorage,
-  mockDownloadFile,
   mockDownloadCopilotFile,
   mockInferContextFromKey,
   mockGetContentType,
@@ -30,7 +29,6 @@ const {
   mockVerifyFileAccess: vi.fn(),
   mockReadFile: vi.fn(),
   mockIsUsingCloudStorage: vi.fn(),
-  mockDownloadFile: vi.fn(),
   mockDownloadCopilotFile: vi.fn(),
   mockInferContextFromKey: vi.fn(),
   mockGetContentType: vi.fn(),
@@ -58,10 +56,7 @@ vi.mock('@/lib/uploads', () => ({
   isUsingCloudStorage: mockIsUsingCloudStorage,
 }))

-vi.mock('@/lib/uploads/core/storage-service', () => ({
-  downloadFile: mockDownloadFile,
-  hasCloudStorage: vi.fn().mockReturnValue(true),
-}))
+vi.mock('@/lib/uploads/core/storage-service', () => storageServiceMock)

 vi.mock('@/lib/uploads/utils/file-utils', () => ({
   inferContextFromKey: mockInferContextFromKey,
@@ -104,6 +99,7 @@ describe('File Serve API Route', () => {
     mockVerifyFileAccess.mockResolvedValue(true)
     mockReadFile.mockResolvedValue(Buffer.from('test content'))
     mockIsUsingCloudStorage.mockReturnValue(false)
+    storageServiceMockFns.mockHasCloudStorage.mockReturnValue(true)
     mockInferContextFromKey.mockReturnValue('workspace')
     mockGetContentType.mockReturnValue('text/plain')
     mockFindLocalFile.mockReturnValue('/test/uploads/test-file.txt')

@@ -161,7 +157,7 @@ describe('File Serve API Route', () => {
   it('should serve cloud file by downloading and proxying', async () => {
     mockIsUsingCloudStorage.mockReturnValue(true)
-    mockDownloadFile.mockResolvedValue(Buffer.from('test cloud file content'))
+    storageServiceMockFns.mockDownloadFile.mockResolvedValue(Buffer.from('test cloud file content'))
     mockGetContentType.mockReturnValue('image/png')

     const req = new NextRequest(
@@ -174,7 +170,7 @@
     expect(response.status).toBe(200)
     expect(response.headers.get('Content-Type')).toBe('image/png')

-    expect(mockDownloadFile).toHaveBeenCalledWith({
+    expect(storageServiceMockFns.mockDownloadFile).toHaveBeenCalledWith({
       key: 'workspace/test-workspace-id/1234567890-image.png',
       context: 'workspace',
     })
diff --git a/apps/sim/app/api/files/upload/route.test.ts b/apps/sim/app/api/files/upload/route.test.ts
index 8e9ff1dbe8b..f0ef4ede98b 100644
--- a/apps/sim/app/api/files/upload/route.test.ts
+++ b/apps/sim/app/api/files/upload/route.test.ts
@@ -3,7 +3,14 @@
  *
  * @vitest-environment node
  */
-import { authMockFns, hybridAuthMockFns, permissionsMock, permissionsMockFns } from '@sim/testing'
+import {
+  authMockFns,
+  hybridAuthMockFns,
+  permissionsMock,
+  permissionsMockFns,
+  storageServiceMock,
+  storageServiceMockFns,
+} from '@sim/testing'
 import { NextRequest } from 'next/server'
 import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

@@ -16,8 +23,6 @@ const mocks = vi.hoisted(() => {
   const mockGetStorageProvider = vi.fn()
   const mockIsUsingCloudStorage = vi.fn()
   const mockUploadFile = vi.fn()
-  const mockHasCloudStorage = vi.fn()
-  const mockStorageUploadFile = vi.fn()

   return {
     mockVerifyFileAccess,
@@ -28,8 +33,6 @@ const mocks = vi.hoisted(() => {
     mockGetStorageProvider,
     mockIsUsingCloudStorage,
     mockUploadFile,
-    mockHasCloudStorage,
-    mockStorageUploadFile,
   }
 })

@@ -85,10 +88,7 @@ vi.mock('@/lib/uploads', () => ({
   uploadFile: mocks.mockUploadFile,
 }))

-vi.mock('@/lib/uploads/core/storage-service', () => ({
-  uploadFile: mocks.mockStorageUploadFile,
-  hasCloudStorage: mocks.mockHasCloudStorage,
-}))
+vi.mock('@/lib/uploads/core/storage-service', () => storageServiceMock)

 vi.mock('@/lib/uploads/setup.server', () => ({
   UPLOAD_DIR_SERVER: '/tmp/test-uploads',
@@ -153,8 +153,8 @@ function setupFileApiMocks(
     type: 'text/plain',
   })

-  mocks.mockHasCloudStorage.mockReturnValue(cloudEnabled)
-  mocks.mockStorageUploadFile.mockResolvedValue({
+  storageServiceMockFns.mockHasCloudStorage.mockReturnValue(cloudEnabled)
+  storageServiceMockFns.mockUploadFile.mockResolvedValue({
     key: 'test-key',
     path: '/test/path',
   })
@@ -325,8 +325,8 @@ describe('File Upload Security Tests', () => {
       user: { id: 'test-user-id' },
     })

-    mocks.mockHasCloudStorage.mockReturnValue(false)
-    mocks.mockStorageUploadFile.mockResolvedValue({
+    storageServiceMockFns.mockHasCloudStorage.mockReturnValue(false)
+    storageServiceMockFns.mockUploadFile.mockResolvedValue({
       key: 'test-key',
       path: '/test/path',
     })
diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts
index 6a9808f1b0e..c7b86847f0b 100644
--- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts
+++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts
@@ -5,6 +5,7 @@ import {
   databaseMock,
   hybridAuthMockFns,
+  posthogServerMock,
   workflowAuthzMockFns,
   workflowsUtilsMock,
 } from '@sim/testing'
@@ -43,9 +44,7 @@ vi.mock('@/lib/workflows/executor/human-in-the-loop-manager', () => ({

 vi.mock('@/lib/workflows/utils', () => workflowsUtilsMock)

-vi.mock('@/lib/posthog/server', () => ({
-  captureServerEvent: vi.fn(),
-}))
+vi.mock('@/lib/posthog/server', () => posthogServerMock)

 vi.mock('@/lib/execution/event-buffer', () => ({
   setExecutionMeta: (...args: unknown[]) => mockSetExecutionMeta(...args),
diff --git a/apps/sim/app/api/workspaces/[id]/files/presigned/route.test.ts b/apps/sim/app/api/workspaces/[id]/files/presigned/route.test.ts
new file mode 100644
index 00000000000..8fee00a3f25
--- /dev/null
+++ b/apps/sim/app/api/workspaces/[id]/files/presigned/route.test.ts
@@ -0,0 +1,161 @@
+/**
+ * @vitest-environment node
+ */
+import {
+  authMockFns,
+  permissionsMock,
+  permissionsMockFns,
+  storageServiceMock,
+  storageServiceMockFns,
+} from '@sim/testing'
+import { NextRequest } from 'next/server'
+import { beforeEach, describe, expect, it, vi } from 'vitest'
+
+const { mockCheckStorageQuota, mockGenerateWorkspaceFileKey, mockUseBlobStorage } = vi.hoisted(
+  () => ({
+    mockCheckStorageQuota: vi.fn(),
+    mockGenerateWorkspaceFileKey: vi.fn(),
+    mockUseBlobStorage: { value: false },
+  })
+)
+
+vi.mock('@/lib/billing/storage', () => ({
+  checkStorageQuota: mockCheckStorageQuota,
+}))
+
+vi.mock('@/lib/uploads/core/storage-service', () => storageServiceMock)
+
+vi.mock('@/lib/uploads/contexts/workspace/workspace-file-manager', () => ({
+  generateWorkspaceFileKey: mockGenerateWorkspaceFileKey,
+}))
+
+vi.mock('@/lib/uploads/config', () => ({
+  get USE_BLOB_STORAGE() {
+    return mockUseBlobStorage.value
+  },
+}))
+
+vi.mock('@/lib/workspaces/permissions/utils', () => permissionsMock)
+
+const WS = '7727ef3f-8cf6-4686-b063-2bb006a10785'
+
+import { POST } from '@/app/api/workspaces/[id]/files/presigned/route'
+
+const params = (id = WS) => ({ params: Promise.resolve({ id }) })
+
+const makeRequest = (body: unknown) =>
+  new NextRequest(`http://localhost/api/workspaces/${WS}/files/presigned`, {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify(body),
+  })
+
+const validBody = {
+  fileName: 'video.mp4',
+  contentType: 'video/mp4',
+  fileSize: 10 * 1024 * 1024,
+}
+
+describe('POST /api/workspaces/[id]/files/presigned', () => {
+  beforeEach(() => {
+    vi.clearAllMocks()
+    authMockFns.mockGetSession.mockResolvedValue({ user: { id: 'user-1' } })
+    permissionsMockFns.mockGetUserEntityPermissions.mockResolvedValue('write')
+    mockCheckStorageQuota.mockResolvedValue({ allowed: true })
+    storageServiceMockFns.mockHasCloudStorage.mockReturnValue(true)
+    mockGenerateWorkspaceFileKey.mockReturnValue(`workspace/${WS}/123-abc-video.mp4`)
+    storageServiceMockFns.mockGeneratePresignedUploadUrl.mockResolvedValue({
+      url: 'https://s3/presigned',
+      key: `workspace/${WS}/123-abc-video.mp4`,
+      uploadHeaders: { 'Content-Type': 'video/mp4' },
+    })
+  })
+
+  it('returns 401 when unauthenticated', async () => {
+    authMockFns.mockGetSession.mockResolvedValueOnce(null)
+    const res = await POST(makeRequest(validBody), params())
+    expect(res.status).toBe(401)
+  })
+
+  it('returns 403 when user has read-only permission', async () => {
+    permissionsMockFns.mockGetUserEntityPermissions.mockResolvedValueOnce('read')
+    const res = await POST(makeRequest(validBody), params())
+    expect(res.status).toBe(403)
+  })
+
+  it('returns 400 for missing fileName', async () => {
+    const res = await POST(makeRequest({ ...validBody, fileName: '' }), params())
+    expect(res.status).toBe(400)
+  })
+import { generatePresignedUploadUrl, hasCloudStorage } from '@/lib/uploads/core/storage-service' +import { MAX_WORKSPACE_FILE_SIZE } from '@/lib/uploads/shared/types' +import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' + +const logger = createLogger('WorkspacePresignedAPI') + +/** + * POST /api/workspaces/[id]/files/presigned + * Returns a presigned PUT URL for a workspace-scoped object key. The client + * uploads the bytes directly to S3/Blob, then calls /files/register to + * insert metadata. + */ +export const POST = withRouteHandler( + async (request: NextRequest, context: { params: Promise<{ id: string }> }) => { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + const userId = session.user.id + + const parsed = await parseRequest(workspacePresignedUploadContract, request, context) + if (!parsed.success) return parsed.response + const { params, body } = parsed.data + const workspaceId = params.id + const { fileName, contentType, fileSize } = body + + const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId) + if (permission !== 'admin' && permission !== 'write') { + logger.warn(`User ${userId} lacks write permission for ${workspaceId}`) + return NextResponse.json({ error: 'Forbidden' }, { status: 403 }) + } + + if (fileSize > MAX_WORKSPACE_FILE_SIZE) { + return NextResponse.json( + { error: `File size exceeds maximum of ${MAX_WORKSPACE_FILE_SIZE} bytes` }, + { status: 413 } + ) + } + + if (!hasCloudStorage()) { + logger.info(`Local storage detected, signaling API fallback for ${fileName}`) + return NextResponse.json({ + fileName, + presignedUrl: '', + fileInfo: { path: '', key: '', name: fileName, size: fileSize, type: contentType }, + directUploadSupported: false, + }) + } + + const quotaCheck = await checkStorageQuota(userId, fileSize) + if (!quotaCheck.allowed) { + return NextResponse.json( + { error: quotaCheck.error || 'Storage limit exceeded' }, + { status: 413 } + ) + } + + const key = generateWorkspaceFileKey(workspaceId, fileName) + const presigned = await generatePresignedUploadUrl({ + fileName, + contentType, + fileSize, + context: 'workspace', + userId, + customKey: key, + expirationSeconds: 3600, + metadata: { workspaceId }, + }) + + const finalPath = `/api/files/serve/${USE_BLOB_STORAGE ? 
'blob' : 's3'}/${encodeURIComponent(key)}?context=workspace` + + logger.info(`Issued workspace presigned URL for ${fileName} -> ${key}`) + + return NextResponse.json({ + fileName, + presignedUrl: presigned.url, + fileInfo: { + path: finalPath, + key: presigned.key, + name: fileName, + size: fileSize, + type: contentType, + }, + uploadHeaders: presigned.uploadHeaders, + directUploadSupported: true, + }) + } +) diff --git a/apps/sim/app/api/workspaces/[id]/files/register/route.test.ts b/apps/sim/app/api/workspaces/[id]/files/register/route.test.ts new file mode 100644 index 00000000000..3d9fd1465e3 --- /dev/null +++ b/apps/sim/app/api/workspaces/[id]/files/register/route.test.ts @@ -0,0 +1,171 @@ +/** + * @vitest-environment node + */ +import { + auditMock, + auditMockFns, + authMockFns, + permissionsMock, + permissionsMockFns, + posthogServerMock, + posthogServerMockFns, +} from '@sim/testing' +import { NextRequest } from 'next/server' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockRegisterUploadedWorkspaceFile, mockParseWorkspaceFileKey, FileConflictErrorImpl } = + vi.hoisted(() => { + class FileConflictErrorImpl extends Error { + constructor(message: string) { + super(message) + this.name = 'FileConflictError' + } + } + return { + mockRegisterUploadedWorkspaceFile: vi.fn(), + mockParseWorkspaceFileKey: vi.fn(), + FileConflictErrorImpl, + } + }) + +vi.mock('@/lib/uploads/contexts/workspace', () => ({ + registerUploadedWorkspaceFile: mockRegisterUploadedWorkspaceFile, + parseWorkspaceFileKey: mockParseWorkspaceFileKey, + FileConflictError: FileConflictErrorImpl, +})) + +vi.mock('@/lib/posthog/server', () => posthogServerMock) +vi.mock('@/lib/workspaces/permissions/utils', () => permissionsMock) +vi.mock('@sim/audit', () => auditMock) + +const WS = '7727ef3f-8cf6-4686-b063-2bb006a10785' +const VALID_KEY = `workspace/${WS}/123-abc-video.mp4` + +import { POST } from '@/app/api/workspaces/[id]/files/register/route' + +const params = (id = WS) => ({ params: Promise.resolve({ id }) }) + +const makeRequest = (body: unknown) => + new NextRequest(`http://localhost/api/workspaces/${WS}/files/register`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }) + +const validBody = { + key: VALID_KEY, + name: 'video.mp4', + contentType: 'video/mp4', +} + +describe('POST /api/workspaces/[id]/files/register', () => { + beforeEach(() => { + vi.clearAllMocks() + authMockFns.mockGetSession.mockResolvedValue({ + user: { id: 'user-1', name: 'User One', email: 'u@example.com' }, + }) + permissionsMockFns.mockGetUserEntityPermissions.mockResolvedValue('write') + mockParseWorkspaceFileKey.mockImplementation((key: string) => { + const match = key.match(/^workspace\/([^/]+)\//) + return match ? 
match[1] : null + }) + mockRegisterUploadedWorkspaceFile.mockResolvedValue({ + file: { + id: 'wf_123', + name: 'video.mp4', + size: 10 * 1024 * 1024, + type: 'video/mp4', + url: '/api/files/serve/...', + key: VALID_KEY, + context: 'workspace', + }, + created: true, + }) + }) + + it('returns 401 when unauthenticated', async () => { + authMockFns.mockGetSession.mockResolvedValueOnce(null) + const res = await POST(makeRequest(validBody), params()) + expect(res.status).toBe(401) + }) + + it('returns 403 when user lacks write permission', async () => { + permissionsMockFns.mockGetUserEntityPermissions.mockResolvedValueOnce('read') + const res = await POST(makeRequest(validBody), params()) + expect(res.status).toBe(403) + }) + + it('rejects keys belonging to a different workspace', async () => { + const otherWsKey = `workspace/00000000-0000-0000-0000-000000000000/123-abc-video.mp4` + const res = await POST(makeRequest({ ...validBody, key: otherWsKey }), params()) + const body = await res.json() + expect(res.status).toBe(400) + expect(body.error).toContain('does not belong') + expect(mockRegisterUploadedWorkspaceFile).not.toHaveBeenCalled() + }) + + it('returns 400 for empty key/name', async () => { + const res = await POST(makeRequest({ ...validBody, key: '' }), params()) + expect(res.status).toBe(400) + }) + + it('returns 404 when storage object is missing', async () => { + mockRegisterUploadedWorkspaceFile.mockRejectedValueOnce( + new Error('Uploaded object not found in storage') + ) + const res = await POST(makeRequest(validBody), params()) + expect(res.status).toBe(404) + }) + + it('returns 409 on duplicate file conflict', async () => { + mockRegisterUploadedWorkspaceFile.mockRejectedValueOnce(new FileConflictErrorImpl('video.mp4')) + const res = await POST(makeRequest(validBody), params()) + const body = await res.json() + expect(res.status).toBe(409) + expect(body.isDuplicate).toBe(true) + }) + + it('skips audit + analytics on idempotent re-register (created=false)', async () => { + mockRegisterUploadedWorkspaceFile.mockResolvedValueOnce({ + file: { + id: 'wf_123', + name: 'video.mp4', + size: 10 * 1024 * 1024, + type: 'video/mp4', + url: '/api/files/serve/...', + key: VALID_KEY, + context: 'workspace', + }, + created: false, + }) + + const res = await POST(makeRequest(validBody), params()) + expect(res.status).toBe(200) + expect(posthogServerMockFns.mockCaptureServerEvent).not.toHaveBeenCalled() + expect(auditMockFns.mockRecordAudit).not.toHaveBeenCalled() + }) + + it('finalizes upload, records audit and analytics', async () => { + const res = await POST(makeRequest(validBody), params()) + const body = await res.json() + + expect(res.status).toBe(200) + expect(body.success).toBe(true) + expect(body.file).toMatchObject({ id: 'wf_123', key: VALID_KEY }) + + expect(mockRegisterUploadedWorkspaceFile).toHaveBeenCalledWith({ + workspaceId: WS, + userId: 'user-1', + key: VALID_KEY, + originalName: 'video.mp4', + contentType: 'video/mp4', + }) + + expect(posthogServerMockFns.mockCaptureServerEvent).toHaveBeenCalledWith( + 'user-1', + 'file_uploaded', + expect.objectContaining({ workspace_id: WS, file_type: 'video/mp4' }), + expect.any(Object) + ) + }) +}) diff --git a/apps/sim/app/api/workspaces/[id]/files/register/route.ts b/apps/sim/app/api/workspaces/[id]/files/register/route.ts new file mode 100644 index 00000000000..dfcaa537b5e --- /dev/null +++ b/apps/sim/app/api/workspaces/[id]/files/register/route.ts @@ -0,0 +1,101 @@ +import { AuditAction, AuditResourceType, recordAudit } from 
'@sim/audit' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { registerWorkspaceFileContract } from '@/lib/api/contracts/workspace-files' +import { parseRequest } from '@/lib/api/server' +import { getSession } from '@/lib/auth' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { captureServerEvent } from '@/lib/posthog/server' +import { + FileConflictError, + parseWorkspaceFileKey, + registerUploadedWorkspaceFile, +} from '@/lib/uploads/contexts/workspace' +import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' + +const logger = createLogger('WorkspaceRegisterAPI') + +/** + * POST /api/workspaces/[id]/files/register + * Finalize a direct-to-storage upload by inserting metadata, updating quota, + * and recording an audit log. Validates the storage key belongs to the + * caller's workspace to prevent cross-tenant key smuggling. + */ +export const POST = withRouteHandler( + async (request: NextRequest, context: { params: Promise<{ id: string }> }) => { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + const userId = session.user.id + + const parsed = await parseRequest(registerWorkspaceFileContract, request, context) + if (!parsed.success) return parsed.response + const { params, body } = parsed.data + const workspaceId = params.id + const { key, name, contentType } = body + + const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId) + if (permission !== 'admin' && permission !== 'write') { + logger.warn(`User ${userId} lacks write permission for ${workspaceId}`) + return NextResponse.json({ error: 'Forbidden' }, { status: 403 }) + } + + if (parseWorkspaceFileKey(key) !== workspaceId) { + logger.warn(`Key ${key} does not belong to workspace ${workspaceId}`) + return NextResponse.json( + { error: 'Storage key does not belong to this workspace' }, + { status: 400 } + ) + } + + try { + const { file: userFile, created } = await registerUploadedWorkspaceFile({ + workspaceId, + userId, + key, + originalName: name, + contentType, + }) + + if (created) { + logger.info(`Registered direct upload ${name} -> ${key}`) + + captureServerEvent( + userId, + 'file_uploaded', + { workspace_id: workspaceId, file_type: contentType }, + { groups: { workspace: workspaceId } } + ) + + recordAudit({ + workspaceId, + actorId: userId, + actorName: session.user.name, + actorEmail: session.user.email, + action: AuditAction.FILE_UPLOADED, + resourceType: AuditResourceType.FILE, + resourceId: userFile.id, + resourceName: name, + description: `Uploaded file "${name}"`, + metadata: { fileSize: userFile.size, fileType: contentType }, + request, + }) + } else { + logger.info(`Idempotent re-register for existing upload ${name} -> ${key}`) + } + + return NextResponse.json({ success: true, file: userFile }) + } catch (error) { + logger.error('Failed to register workspace file:', error) + + const errorMessage = error instanceof Error ? error.message : 'Failed to register file' + const isDuplicate = + error instanceof FileConflictError || errorMessage.includes('already exists') + const isMissing = errorMessage.includes('not found in storage') + + const status = isDuplicate ? 409 : isMissing ? 
404 : 500 + return NextResponse.json({ success: false, error: errorMessage, isDuplicate }, { status }) + } + } +) diff --git a/apps/sim/app/api/workspaces/[id]/files/route.ts b/apps/sim/app/api/workspaces/[id]/files/route.ts index 090e2aa1ee2..d89b12118e8 100644 --- a/apps/sim/app/api/workspaces/[id]/files/route.ts +++ b/apps/sim/app/api/workspaces/[id]/files/route.ts @@ -15,6 +15,7 @@ import { listWorkspaceFiles, uploadWorkspaceFile, } from '@/lib/uploads/contexts/workspace' +import { MAX_WORKSPACE_FORMDATA_FILE_SIZE } from '@/lib/uploads/shared/types' import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' @@ -129,13 +130,12 @@ export const POST = withRouteHandler( const fileName = rawFile.name || 'untitled.md' - const maxSize = 100 * 1024 * 1024 - if (rawFile.size > maxSize) { + if (rawFile.size > MAX_WORKSPACE_FORMDATA_FILE_SIZE) { return NextResponse.json( { - error: `File size exceeds 100MB limit (${(rawFile.size / (1024 * 1024)).toFixed(2)}MB)`, + error: `File size exceeds maximum of ${MAX_WORKSPACE_FORMDATA_FILE_SIZE} bytes (${(rawFile.size / (1024 * 1024)).toFixed(2)}MB)`, }, - { status: 400 } + { status: 413 } ) } diff --git a/apps/sim/app/api/workspaces/invitations/route.test.ts b/apps/sim/app/api/workspaces/invitations/route.test.ts index 979fe7523bc..c364d8228e4 100644 --- a/apps/sim/app/api/workspaces/invitations/route.test.ts +++ b/apps/sim/app/api/workspaces/invitations/route.test.ts @@ -8,6 +8,7 @@ import { createMockRequest, permissionsMock, permissionsMockFns, + posthogServerMock, schemaMock, } from '@sim/testing' import { beforeEach, describe, expect, it, vi } from 'vitest' @@ -94,9 +95,7 @@ vi.mock('@/ee/access-control/utils/permission-check', () => ({ vi.mock('@sim/audit', () => auditMock) -vi.mock('@/lib/posthog/server', () => ({ - captureServerEvent: vi.fn(), -})) +vi.mock('@/lib/posthog/server', () => posthogServerMock) vi.mock('@/lib/core/telemetry', () => ({ PlatformEvents: { diff --git a/apps/sim/app/workspace/[workspaceId]/files/files.tsx b/apps/sim/app/workspace/[workspaceId]/files/files.tsx index cd1cbdf499e..c599a313c2a 100644 --- a/apps/sim/app/workspace/[workspaceId]/files/files.tsx +++ b/apps/sim/app/workspace/[workspaceId]/files/files.tsx @@ -23,11 +23,13 @@ import { ModalHeader, Pencil, Trash, + toast, Upload, } from '@/components/emcn' import { File as FilesIcon } from '@/components/emcn/icons' import { getDocumentIcon } from '@/components/icons/document-icons' import type { WorkspaceFileRecord } from '@/lib/uploads/contexts/workspace' +import { MAX_WORKSPACE_FILE_SIZE } from '@/lib/uploads/shared/types' import { downloadWorkspaceFile, formatFileSize, @@ -180,7 +182,11 @@ export function Files() { filesRef.current = files const [uploading, setUploading] = useState(false) - const [uploadProgress, setUploadProgress] = useState({ completed: 0, total: 0 }) + const [uploadProgress, setUploadProgress] = useState({ + completed: 0, + total: 0, + currentPercent: 0, + }) const [isDraggingOver, setIsDraggingOver] = useState(false) const dragCounterRef = useRef(0) const [inputValue, setInputValue] = useState('') @@ -376,8 +382,24 @@ export function Files() { const uploadFiles = async (filesToUpload: File[]) => { if (!workspaceId || filesToUpload.length === 0) return + const oversized: string[] = [] + const sizeFiltered = filesToUpload.filter((f) => { + if (f.size > MAX_WORKSPACE_FILE_SIZE) { + oversized.push(f.name) + return false + } + return true + }) + if 
(oversized.length > 0) { + toast.error( + oversized.length === 1 + ? `${oversized[0]} exceeds the 5 GiB upload limit` + : `${oversized.length} files exceed the 5 GiB upload limit` + ) + } + const unsupported: string[] = [] - const allowedFiles = filesToUpload.filter((f) => { + const allowedFiles = sizeFiltered.filter((f) => { const ext = getFileExtension(f.name) const ok = SUPPORTED_EXTENSIONS.includes(ext as (typeof SUPPORTED_EXTENSIONS)[number]) if (!ok) unsupported.push(f.name) @@ -392,12 +414,22 @@ export function Files() { try { setUploading(true) - setUploadProgress({ completed: 0, total: allowedFiles.length }) + setUploadProgress({ completed: 0, total: allowedFiles.length, currentPercent: 0 }) for (let i = 0; i < allowedFiles.length; i++) { try { - await uploadFile.mutateAsync({ workspaceId, file: allowedFiles[i] }) - setUploadProgress({ completed: i + 1, total: allowedFiles.length }) + await uploadFile.mutateAsync({ + workspaceId, + file: allowedFiles[i], + onProgress: ({ percent }) => { + setUploadProgress((prev) => ({ ...prev, currentPercent: percent })) + }, + }) + setUploadProgress({ + completed: i + 1, + total: allowedFiles.length, + currentPercent: 0, + }) } catch (err) { logger.error('Error uploading file:', err) } @@ -406,7 +438,7 @@ export function Files() { logger.error('Error uploading file:', err) } finally { setUploading(false) - setUploadProgress({ completed: 0, total: 0 }) + setUploadProgress({ completed: 0, total: 0, currentPercent: 0 }) } } @@ -824,7 +856,9 @@ export function Files() { const uploadButtonLabel = uploading && uploadProgress.total > 0 - ? `${uploadProgress.completed}/${uploadProgress.total}` + ? uploadProgress.currentPercent > 0 && uploadProgress.currentPercent < 100 + ? `${uploadProgress.completed}/${uploadProgress.total} · ${uploadProgress.currentPercent}%` + : `${uploadProgress.completed}/${uploadProgress.total}` : uploading ? 'Uploading...' 
: 'Upload' diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts b/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts index bb7cab5a7d1..87ebc397b72 100644 --- a/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts +++ b/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts @@ -2,20 +2,36 @@ import { useCallback, useState } from 'react' import { createLogger } from '@sim/logger' import { sleep } from '@sim/utils/helpers' import { useQueryClient } from '@tanstack/react-query' -import { isApiClientError } from '@/lib/api/client/errors' -import { requestJson } from '@/lib/api/client/request' -import { createKnowledgeDocumentsContract } from '@/lib/api/contracts/knowledge/documents' -import { getFileExtension, getMimeTypeFromExtension } from '@/lib/uploads/utils/file-utils' +import { + calculateUploadTimeoutMs, + DirectUploadError, + isTransientUploadError, + LARGE_FILE_THRESHOLD, + MULTIPART_MAX_RETRIES, + MULTIPART_RETRY_BACKOFF, + MULTIPART_RETRY_DELAY_MS, + normalizePresignedData, + type PresignedUploadInfo, + runUploadStrategy, + runWithConcurrency, + type UploadProgressEvent, + WHOLE_FILE_PARALLEL_UPLOADS, +} from '@/lib/uploads/client/direct-upload' +import { getFileContentType, isAbortError, isNetworkError } from '@/lib/uploads/utils/file-utils' import { knowledgeKeys } from '@/hooks/queries/kb/knowledge' const logger = createLogger('KnowledgeUpload') +const KB_BATCH_PRESIGNED_ENDPOINT = '/api/files/presigned/batch?type=knowledge-base' +const KB_API_UPLOAD_ENDPOINT = '/api/files/upload' + +const BATCH_REQUEST_SIZE = 50 + export interface UploadedFile { filename: string fileUrl: string fileSize: number mimeType: string - // Document tags tag1?: string tag2?: string tag3?: string @@ -29,7 +45,7 @@ export interface FileUploadStatus { fileName: string fileSize: number status: 'pending' | 'uploading' | 'completed' | 'failed' - progress?: number // 0-100 percentage + progress?: number error?: string } @@ -38,15 +54,15 @@ export interface UploadProgress { filesCompleted: number totalFiles: number currentFile?: string - currentFileProgress?: number // 0-100 percentage for current file - fileStatuses?: FileUploadStatus[] // Track each file's status + currentFileProgress?: number + fileStatuses?: FileUploadStatus[] } export interface UploadError { message: string timestamp: number code?: string - details?: any + details?: unknown } export interface ProcessingOptions { @@ -69,323 +85,123 @@ class KnowledgeUploadError extends Error { } } -class PresignedUrlError extends KnowledgeUploadError { - constructor(message: string, details?: unknown) { - super(message, 'PRESIGNED_URL_ERROR', details) - } -} - -class DirectUploadError extends KnowledgeUploadError { - constructor(message: string, details?: unknown) { - super(message, 'DIRECT_UPLOAD_ERROR', details) - } -} - class ProcessingError extends KnowledgeUploadError { constructor(message: string, details?: unknown) { super(message, 'PROCESSING_ERROR', details) } } -/** - * Loosely-typed shape of a failed `Response` JSON body returned by routes - * in this codebase. Routes generally surface `{ error?, message?, details? }` - * where `details` is a Zod issue array (`{ message: string }[]`). Treated - * as a structural read-only view; missing/undefined fields are tolerated. 
- */ -interface ApiErrorBodyShape { - error?: unknown - message?: unknown - details?: unknown -} - -/** - * Reads a failed `Response`'s JSON body into a typed shape and returns the - * parsed body plus a human-readable error string that combines the - * top-level `error`/`message` with any Zod `details[].message` entries. - * Falls back to `statusText` then status code when the body is unreadable. - */ -async function readApiResponseError( - response: Response, - fallback = 'Unknown error' -): Promise<{ message: string; body: ApiErrorBodyShape | null }> { - let body: ApiErrorBodyShape | null = null - try { - const parsed: unknown = await response.json() - if (parsed && typeof parsed === 'object') { - body = parsed as ApiErrorBodyShape - } - } catch { - body = null - } - - const baseError = - (typeof body?.error === 'string' && body.error) || - (typeof body?.message === 'string' && body.message) || - response.statusText || - `HTTP ${response.status}` || - fallback - - const detailMessages = Array.isArray(body?.details) - ? body.details - .map((d) => - d && typeof d === 'object' && 'message' in d && typeof d.message === 'string' - ? d.message - : null - ) - .filter((m): m is string => Boolean(m)) - .join(', ') - : '' - - return { - message: detailMessages ? `${baseError}: ${detailMessages}` : baseError, - body, - } -} - -/** - * Configuration constants for file upload operations - */ -const UPLOAD_CONFIG = { - MAX_PARALLEL_UPLOADS: 3, - MAX_RETRIES: 3, - RETRY_DELAY_MS: 2000, - RETRY_BACKOFF: 2, - CHUNK_SIZE: 8 * 1024 * 1024, - DIRECT_UPLOAD_THRESHOLD: 4 * 1024 * 1024, - LARGE_FILE_THRESHOLD: 50 * 1024 * 1024, - BASE_TIMEOUT_MS: 2 * 60 * 1000, - TIMEOUT_PER_MB_MS: 1500, - MAX_TIMEOUT_MS: 10 * 60 * 1000, - MULTIPART_PART_CONCURRENCY: 3, - MULTIPART_MAX_RETRIES: 3, - BATCH_REQUEST_SIZE: 50, -} as const - -/** - * Calculates the upload timeout based on file size - */ -const calculateUploadTimeoutMs = (fileSize: number) => { - const sizeInMb = fileSize / (1024 * 1024) - const dynamicBudget = UPLOAD_CONFIG.BASE_TIMEOUT_MS + sizeInMb * UPLOAD_CONFIG.TIMEOUT_PER_MB_MS - return Math.min(dynamicBudget, UPLOAD_CONFIG.MAX_TIMEOUT_MS) -} - -/** - * Gets high resolution timestamp for performance measurements - */ -const getHighResTime = () => - typeof performance !== 'undefined' && typeof performance.now === 'function' - ? performance.now() - : Date.now() - -/** - * Formats bytes to megabytes with 2 decimal places - */ -const formatMegabytes = (bytes: number) => Number((bytes / (1024 * 1024)).toFixed(2)) - -/** - * Calculates throughput in Mbps - */ -const calculateThroughputMbps = (bytes: number, durationMs: number) => { - if (!bytes || !durationMs) return 0 - return Number((((bytes * 8) / durationMs) * 0.001).toFixed(2)) -} - -/** - * Formats duration from milliseconds to seconds - */ -const formatDurationSeconds = (durationMs: number) => Number((durationMs / 1000).toFixed(2)) +const getErrorMessage = (error: unknown): string => + error instanceof Error ? error.message : typeof error === 'string' ? 
error : 'Unknown error' -/** - * Gets the content type for a file, falling back to extension-based lookup if browser doesn't provide one - */ -const getFileContentType = (file: File): string => { - if (file.type?.trim()) { - return file.type - } - const extension = getFileExtension(file.name) - return getMimeTypeFromExtension(extension) +interface BatchPresignedFile { + fileName: string + contentType: string + fileSize: number } /** - * Runs async operations with concurrency limit + * Fetch presigned upload data for the small files in `files`. Returns a sparse + * array aligned with the input: entries for files >= LARGE_FILE_THRESHOLD are + * `undefined` because those uploads use multipart and never consume a presigned + * single-PUT URL. */ -const runWithConcurrency = async ( - items: T[], - limit: number, - worker: (item: T, index: number) => Promise -): Promise>> => { - const results: Array> = Array(items.length) - - if (items.length === 0) { - return results +const fetchBatchPresignedData = async ( + files: File[] +): Promise<(PresignedUploadInfo | undefined)[]> => { + const result: (PresignedUploadInfo | undefined)[] = new Array(files.length).fill(undefined) + const smallFileIndices: number[] = [] + for (let i = 0; i < files.length; i++) { + if (files[i].size <= LARGE_FILE_THRESHOLD) smallFileIndices.push(i) } + if (smallFileIndices.length === 0) return result - const concurrency = Math.max(1, Math.min(limit, items.length)) - let nextIndex = 0 - - const runners = Array.from({ length: concurrency }, async () => { - while (true) { - const currentIndex = nextIndex++ - if (currentIndex >= items.length) { - break - } - - try { - const value = await worker(items[currentIndex], currentIndex) - results[currentIndex] = { status: 'fulfilled', value } - } catch (error) { - results[currentIndex] = { status: 'rejected', reason: error } - } + for (let start = 0; start < smallFileIndices.length; start += BATCH_REQUEST_SIZE) { + const batchIndices = smallFileIndices.slice(start, start + BATCH_REQUEST_SIZE) + const batchFiles = batchIndices.map((i) => files[i]) + const body: { files: BatchPresignedFile[] } = { + files: batchFiles.map((file) => ({ + fileName: file.name, + contentType: getFileContentType(file), + fileSize: file.size, + })), } - }) - - await Promise.all(runners) - return results -} - -/** - * Extracts the error name from an unknown error object - */ -const getErrorName = (error: unknown) => - typeof error === 'object' && error !== null && 'name' in error ? String((error as any).name) : '' - -/** - * Extracts a human-readable message from an unknown error - */ -const getErrorMessage = (error: unknown) => - error instanceof Error ? error.message : typeof error === 'string' ? 
error : 'Unknown error' -/** - * Checks if an error is an abort error - */ -const isAbortError = (error: unknown) => getErrorName(error) === 'AbortError' - -/** - * Checks if an error is a network-related error - */ -const isNetworkError = (error: unknown) => { - if (!(error instanceof Error)) { - return false - } - - const message = error.message.toLowerCase() - return ( - message.includes('network') || - message.includes('fetch') || - message.includes('connection') || - message.includes('timeout') || - message.includes('timed out') || - message.includes('ecconnreset') - ) -} - -interface PresignedFileInfo { - path: string - key: string - name: string - size: number - type: string -} - -interface PresignedUploadInfo { - fileName: string - presignedUrl: string - fileInfo: PresignedFileInfo - uploadHeaders?: Record - directUploadSupported: boolean - presignedUrls?: any -} + const response = await fetch(KB_BATCH_PRESIGNED_ENDPOINT, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }) -/** - * Normalizes presigned URL response data into a consistent format - */ -const normalizePresignedData = (data: any, context: string): PresignedUploadInfo => { - const presignedUrl = data?.presignedUrl || data?.uploadUrl - const fileInfo = data?.fileInfo + if (!response.ok) { + throw new Error(`Batch presigned URL generation failed: ${response.statusText}`) + } - if (!presignedUrl || !fileInfo?.path) { - throw new PresignedUrlError(`Invalid presigned response for ${context}`, data) + const { files: presignedItems } = (await response.json()) as { files: unknown[] } + batchIndices.forEach((fileIdx, batchPos) => { + result[fileIdx] = normalizePresignedData(presignedItems[batchPos], batchFiles[batchPos].name) + }) } - return { - fileName: data.fileName || fileInfo.name || context, - presignedUrl, - fileInfo: { - path: fileInfo.path, - key: fileInfo.key, - name: fileInfo.name || context, - size: fileInfo.size || data.fileSize || 0, - type: fileInfo.type || data.contentType || '', - }, - uploadHeaders: data.uploadHeaders || undefined, - directUploadSupported: data.directUploadSupported !== false, - presignedUrls: data.presignedUrls, - } + return result } /** - * Fetches presigned URL data for file upload + * Server-proxied fallback used when cloud storage isn't configured. */ -const getPresignedData = async ( +const uploadFileThroughAPI = async ( file: File, - timeoutMs: number, - controller?: AbortController -): Promise => { - const localController = controller ?? 
new AbortController() - const timeoutId = setTimeout(() => localController.abort(), timeoutMs) - const startTime = getHighResTime() + workspaceId: string | undefined +): Promise<{ filePath: string }> => { + const formData = new FormData() + formData.append('file', file) + formData.append('context', 'knowledge-base') + if (workspaceId) formData.append('workspaceId', workspaceId) + + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), calculateUploadTimeoutMs(file.size)) try { - // boundary-raw-fetch: presigned URL coordination is part of the multipart-signed-url upload flow tracked together with XHR PUT progress; keeping this on raw fetch preserves a single retry/timeout AbortController shared with the direct upload XHR - const presignedResponse = await fetch('/api/files/presigned?type=knowledge-base', { + const response = await fetch(KB_API_UPLOAD_ENDPOINT, { method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - fileName: file.name, - contentType: getFileContentType(file), - fileSize: file.size, - }), - signal: localController.signal, + body: formData, + signal: controller.signal, }) - if (!presignedResponse.ok) { - const { message: fullError, body: errorDetails } = - await readApiResponseError(presignedResponse) - - logger.error('Presigned URL request failed', { - status: presignedResponse.status, - fileSize: file.size, - }) + if (!response.ok) { + let errorData: { message?: string; error?: string } | null = null + try { + errorData = (await response.json()) as { message?: string; error?: string } + } catch {} + throw new KnowledgeUploadError( + `Failed to upload ${file.name}: ${errorData?.message || errorData?.error || response.statusText}`, + 'API_UPLOAD_ERROR', + errorData + ) + } - throw new PresignedUrlError( - `Failed to get presigned URL for ${file.name}: ${fullError}`, - errorDetails + const result = (await response.json()) as { + fileInfo?: { path?: string } + path?: string + } + const filePath = result.fileInfo?.path ?? result.path + if (!filePath) { + throw new KnowledgeUploadError( + `Invalid upload response for ${file.name}: missing file path`, + 'API_UPLOAD_ERROR', + result ) } - const presignedData = await presignedResponse.json() - const durationMs = getHighResTime() - startTime - logger.info('Fetched presigned URL', { - fileName: file.name, - sizeMB: formatMegabytes(file.size), - durationMs: formatDurationSeconds(durationMs), - }) - return normalizePresignedData(presignedData, file.name) + return { filePath } } finally { clearTimeout(timeoutId) - if (!controller) { - localController.abort() - } } } -/** - * Hook for managing file uploads to knowledge bases - */ +const toAbsoluteUrl = (path: string): string => + path.startsWith('http') ? 
path : `${window.location.origin}${path}` + export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { const queryClient = useQueryClient() const [isUploading, setIsUploading] = useState(false) @@ -396,642 +212,145 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { }) const [uploadError, setUploadError] = useState(null) - /** - * Creates an UploadedFile object from file metadata - */ - const createUploadedFile = ( - filename: string, - fileUrl: string, - fileSize: number, - mimeType: string, - originalFile?: File - ): UploadedFile => ({ - filename, - fileUrl, - fileSize, - mimeType, - tag1: (originalFile as any)?.tag1, - tag2: (originalFile as any)?.tag2, - tag3: (originalFile as any)?.tag3, - tag4: (originalFile as any)?.tag4, - tag5: (originalFile as any)?.tag5, - tag6: (originalFile as any)?.tag6, - tag7: (originalFile as any)?.tag7, - }) - - /** - * Creates an UploadError from an exception - */ - const createErrorFromException = (error: unknown, defaultMessage: string): UploadError => { - if (error instanceof KnowledgeUploadError) { - return { - message: error.message, - code: error.code, - details: error.details, - timestamp: Date.now(), - } + const buildUploadedFile = (file: File, fileUrl: string): UploadedFile => { + const f = file as File & { + tag1?: string + tag2?: string + tag3?: string + tag4?: string + tag5?: string + tag6?: string + tag7?: string } - - if (error instanceof Error) { - return { - message: error.message, - timestamp: Date.now(), - } - } - return { - message: defaultMessage, - timestamp: Date.now(), - } - } - - /** - * Upload a single file with retry logic - */ - const uploadSingleFileWithRetry = async ( - file: File, - retryCount = 0, - fileIndex?: number, - presignedOverride?: PresignedUploadInfo - ): Promise => { - const timeoutMs = calculateUploadTimeoutMs(file.size) - let presignedData: PresignedUploadInfo | undefined - const attempt = retryCount + 1 - logger.info('Upload attempt started', { - fileName: file.name, - attempt, - sizeMB: formatMegabytes(file.size), - timeoutMs: formatDurationSeconds(timeoutMs), - }) - - try { - const controller = new AbortController() - const timeoutId = setTimeout(() => controller.abort(), timeoutMs) - - try { - if (file.size > UPLOAD_CONFIG.LARGE_FILE_THRESHOLD) { - presignedData = presignedOverride ?? (await getPresignedData(file, timeoutMs, controller)) - return await uploadFileInChunks(file, presignedData, timeoutMs, fileIndex) - } - - if (presignedOverride?.directUploadSupported && presignedOverride.presignedUrl) { - return await uploadFileDirectly(file, presignedOverride, timeoutMs, controller, fileIndex) - } - - return await uploadFileThroughAPI(file, timeoutMs) - } finally { - clearTimeout(timeoutId) - } - } catch (error) { - const isTimeout = isAbortError(error) - const isNetwork = isNetworkError(error) - - if (retryCount < UPLOAD_CONFIG.MAX_RETRIES) { - const delay = UPLOAD_CONFIG.RETRY_DELAY_MS * UPLOAD_CONFIG.RETRY_BACKOFF ** retryCount - if (isTimeout || isNetwork) { - logger.warn( - `Upload failed (${isTimeout ? 'timeout' : 'network'}), retrying in ${delay / 1000}s...`, - { - attempt: retryCount + 1, - fileSize: file.size, - delay: delay, - } - ) - } - - if (fileIndex !== undefined) { - setUploadProgress((prev) => ({ - ...prev, - fileStatuses: prev.fileStatuses?.map((fs, idx) => - idx === fileIndex ? 
{ ...fs, progress: 0, status: 'uploading' as const } : fs - ), - })) - } - - await sleep(delay) - const shouldReusePresigned = (isTimeout || isNetwork) && presignedData - return uploadSingleFileWithRetry( - file, - retryCount + 1, - fileIndex, - shouldReusePresigned ? presignedData : undefined - ) - } - - logger.error('Upload failed after retries', { - fileSize: file.size, - errorType: isTimeout ? 'timeout' : isNetwork ? 'network' : 'unknown', - attempts: UPLOAD_CONFIG.MAX_RETRIES + 1, - }) - throw error + filename: file.name, + fileUrl, + fileSize: file.size, + mimeType: getFileContentType(file), + tag1: f.tag1, + tag2: f.tag2, + tag3: f.tag3, + tag4: f.tag4, + tag5: f.tag5, + tag6: f.tag6, + tag7: f.tag7, } } - /** - * Upload file directly with timeout and progress tracking - */ - const uploadFileDirectly = async ( - file: File, - presignedData: PresignedUploadInfo, - timeoutMs: number, - outerController: AbortController, - fileIndex?: number - ): Promise => { - return new Promise((resolve, reject) => { - const xhr = new XMLHttpRequest() - let isCompleted = false - const startTime = getHighResTime() - - const timeoutId = setTimeout(() => { - if (!isCompleted) { - isCompleted = true - xhr.abort() - reject(new Error('Upload timeout')) - } - }, timeoutMs) - - const abortHandler = () => { - if (!isCompleted) { - isCompleted = true - clearTimeout(timeoutId) - xhr.abort() - reject(new DirectUploadError(`Upload aborted for ${file.name}`, {})) - } - } - - outerController.signal.addEventListener('abort', abortHandler) - - xhr.upload.addEventListener('progress', (event) => { - if (event.lengthComputable && fileIndex !== undefined && !isCompleted) { - const percentComplete = Math.round((event.loaded / event.total) * 100) - setUploadProgress((prev) => { - if (prev.fileStatuses?.[fileIndex]?.status === 'uploading') { - return { - ...prev, - fileStatuses: prev.fileStatuses?.map((fs, idx) => - idx === fileIndex ? { ...fs, progress: percentComplete } : fs - ), - } - } - return prev - }) - } - }) - - xhr.addEventListener('load', () => { - if (!isCompleted) { - isCompleted = true - clearTimeout(timeoutId) - outerController.signal.removeEventListener('abort', abortHandler) - const durationMs = getHighResTime() - startTime - if (xhr.status >= 200 && xhr.status < 300) { - const fullFileUrl = presignedData.fileInfo.path.startsWith('http') - ? 
presignedData.fileInfo.path - : `${window.location.origin}${presignedData.fileInfo.path}` - logger.info('Direct upload completed', { - fileName: file.name, - sizeMB: formatMegabytes(file.size), - durationMs: formatDurationSeconds(durationMs), - throughputMbps: calculateThroughputMbps(file.size, durationMs), - status: xhr.status, - }) - resolve( - createUploadedFile(file.name, fullFileUrl, file.size, getFileContentType(file), file) - ) - } else { - logger.error('S3 PUT request failed', { - status: xhr.status, - fileSize: file.size, - }) - reject( - new DirectUploadError( - `Direct upload failed for ${file.name}: ${xhr.status} ${xhr.statusText}`, - { - uploadResponse: xhr.statusText, - } - ) - ) - } - } - }) - - xhr.addEventListener('error', () => { - if (!isCompleted) { - isCompleted = true - clearTimeout(timeoutId) - outerController.signal.removeEventListener('abort', abortHandler) - const durationMs = getHighResTime() - startTime - logger.error('Direct upload network error', { - fileName: file.name, - sizeMB: formatMegabytes(file.size), - durationMs: formatDurationSeconds(durationMs), - }) - reject(new DirectUploadError(`Network error uploading ${file.name}`, {})) - } - }) - - xhr.addEventListener('abort', abortHandler) - - xhr.open('PUT', presignedData.presignedUrl) - - xhr.setRequestHeader('Content-Type', file.type) - if (presignedData.uploadHeaders) { - Object.entries(presignedData.uploadHeaders).forEach(([key, value]) => { - xhr.setRequestHeader(key, value as string) - }) - } - - xhr.send(file) - }) + const updateFileStatus = (fileIndex: number, patch: Partial) => { + setUploadProgress((prev) => ({ + ...prev, + fileStatuses: prev.fileStatuses?.map((fs, idx) => + idx === fileIndex ? { ...fs, ...patch } : fs + ), + })) } - /** - * Upload large file in chunks (multipart upload) - */ - const uploadFileInChunks = async ( + const uploadOneFile = async ( file: File, - presignedData: PresignedUploadInfo, - timeoutMs: number, - fileIndex?: number + fileIndex: number, + presigned: PresignedUploadInfo | undefined ): Promise => { - logger.info( - `Uploading large file ${file.name} (${(file.size / 1024 / 1024).toFixed(2)}MB) using multipart upload` - ) - const startTime = getHighResTime() + if (!options.workspaceId) { + throw new KnowledgeUploadError('workspaceId is required for upload', 'MISSING_WORKSPACE_ID') + } - try { - if (!options.workspaceId) { - throw new Error('workspaceId is required for multipart upload') - } + const onProgress = (event: UploadProgressEvent) => { + updateFileStatus(fileIndex, { progress: event.percent, status: 'uploading' }) + } - // boundary-raw-fetch: multipart-signed-url initiate step coordinates the cloud multipart upload token consumed by raw-fetch PUTs to provider-issued part URLs below - const initiateResponse = await fetch('/api/files/multipart?action=initiate', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - fileName: file.name, - contentType: getFileContentType(file), - fileSize: file.size, + let attempt = 0 + while (true) { + try { + const result = await runUploadStrategy({ + file, workspaceId: options.workspaceId, - }), - }) - - if (!initiateResponse.ok) { - throw new Error(`Failed to initiate multipart upload: ${initiateResponse.statusText}`) - } - - const { uploadId, key, uploadToken } = await initiateResponse.json() - logger.info(`Initiated multipart upload with ID: ${uploadId}`) - - const chunkSize = UPLOAD_CONFIG.CHUNK_SIZE - const numParts = Math.ceil(file.size / chunkSize) - const partNumbers = 
Array.from({ length: numParts }, (_, i) => i + 1) - - // boundary-raw-fetch: multipart-signed-url get-part-urls step issues provider-signed PUT URLs consumed by the raw-fetch PUT loop below; kept on raw fetch to preserve retry/abort coordination with the part PUTs - const partUrlsResponse = await fetch('/api/files/multipart?action=get-part-urls', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - uploadToken, - partNumbers, - }), - }) - - if (!partUrlsResponse.ok) { - // boundary-raw-fetch: multipart-signed-url abort step issued from inside the multipart upload error path; keeps cleanup on the same raw-fetch path as the rest of the multipart flow - await fetch('/api/files/multipart?action=abort', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ uploadToken }), + context: 'knowledge-base', + presignedEndpoint: '/api/files/presigned?type=knowledge-base', + presignedOverride: presigned, + onProgress, }) - throw new Error(`Failed to get part URLs: ${partUrlsResponse.statusText}`) - } - - const { presignedUrls } = await partUrlsResponse.json() - - const uploadedParts: Array<{ ETag: string; PartNumber: number }> = [] - - const controller = new AbortController() - const multipartTimeoutId = setTimeout(() => controller.abort(), timeoutMs) - - try { - const uploadPart = async ({ partNumber, url }: any) => { - const start = (partNumber - 1) * chunkSize - const end = Math.min(start + chunkSize, file.size) - const chunk = file.slice(start, end) - - for (let attempt = 0; attempt <= UPLOAD_CONFIG.MULTIPART_MAX_RETRIES; attempt++) { - try { - const partResponse = await fetch(url, { - method: 'PUT', - body: chunk, - signal: controller.signal, - headers: { - 'Content-Type': file.type, - }, - }) - - if (!partResponse.ok) { - throw new Error(`Failed to upload part ${partNumber}: ${partResponse.statusText}`) - } - - const etag = partResponse.headers.get('ETag') || '' - logger.info(`Uploaded part ${partNumber}/${numParts}`) - - if (fileIndex !== undefined) { - const partProgress = Math.min(100, Math.round((partNumber / numParts) * 100)) - setUploadProgress((prev) => ({ - ...prev, - fileStatuses: prev.fileStatuses?.map((fs, idx) => - idx === fileIndex ? 
{ ...fs, progress: partProgress } : fs - ), - })) - } - - return { ETag: etag.replace(/"/g, ''), PartNumber: partNumber } - } catch (partError) { - if (attempt >= UPLOAD_CONFIG.MULTIPART_MAX_RETRIES) { - throw partError - } - - const delay = UPLOAD_CONFIG.RETRY_DELAY_MS * UPLOAD_CONFIG.RETRY_BACKOFF ** attempt - logger.warn( - `Part ${partNumber} failed (attempt ${attempt + 1}), retrying in ${Math.round(delay / 1000)}s` - ) - await sleep(delay) - } - } - - throw new Error(`Retries exhausted for part ${partNumber}`) + return buildUploadedFile(file, toAbsoluteUrl(result.path)) + } catch (error) { + if (error instanceof DirectUploadError && error.code === 'FALLBACK_REQUIRED') { + const { filePath } = await uploadFileThroughAPI(file, options.workspaceId) + return buildUploadedFile(file, toAbsoluteUrl(filePath)) } - const partResults = await runWithConcurrency( - presignedUrls, - UPLOAD_CONFIG.MULTIPART_PART_CONCURRENCY, - uploadPart - ) - - partResults.forEach((result) => { - if (result?.status === 'fulfilled') { - uploadedParts.push(result.value) - } else if (result?.status === 'rejected') { - throw result.reason - } - }) - } finally { - clearTimeout(multipartTimeoutId) - } - - // boundary-raw-fetch: multipart-signed-url complete step finalizes the multipart upload coordinated with raw-fetch part PUTs above - const completeResponse = await fetch('/api/files/multipart?action=complete', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - uploadToken, - parts: uploadedParts, - }), - }) - - if (!completeResponse.ok) { - throw new Error(`Failed to complete multipart upload: ${completeResponse.statusText}`) - } - - const { path } = await completeResponse.json() - logger.info(`Completed multipart upload for ${file.name}`) - - const durationMs = getHighResTime() - startTime - logger.info('Multipart upload metrics', { - fileName: file.name, - sizeMB: formatMegabytes(file.size), - parts: uploadedParts.length, - durationMs: formatDurationSeconds(durationMs), - throughputMbps: calculateThroughputMbps(file.size, durationMs), - }) - - const fullFileUrl = path.startsWith('http') ? 
path : `${window.location.origin}${path}` - - return createUploadedFile(file.name, fullFileUrl, file.size, getFileContentType(file), file) - } catch (error) { - logger.error(`Multipart upload failed for ${file.name}:`, error) - const durationMs = getHighResTime() - startTime - logger.warn('Falling back to direct upload after multipart failure', { - fileName: file.name, - sizeMB: formatMegabytes(file.size), - durationMs: formatDurationSeconds(durationMs), - }) - return uploadFileDirectly(file, presignedData, timeoutMs, new AbortController(), fileIndex) - } - } - - /** - * Fallback upload through API - */ - const uploadFileThroughAPI = async (file: File, timeoutMs: number): Promise => { - const controller = new AbortController() - const timeoutId = setTimeout(() => controller.abort(), timeoutMs) - - try { - const formData = new FormData() - formData.append('file', file) - formData.append('context', 'knowledge-base') - - if (options.workspaceId) { - formData.append('workspaceId', options.workspaceId) - } - - // boundary-raw-fetch: multipart/form-data upload (FileUpload boundary), incompatible with requestJson which JSON-stringifies bodies - const uploadResponse = await fetch('/api/files/upload', { - method: 'POST', - body: formData, - signal: controller.signal, - }) - - if (!uploadResponse.ok) { - const { message: fullError, body: errorData } = await readApiResponseError(uploadResponse) - throw new DirectUploadError(`Failed to upload ${file.name}: ${fullError}`, errorData) - } - - const uploadResult = await uploadResponse.json() - - const filePath = uploadResult.fileInfo?.path || uploadResult.path + const retryable = isNetworkError(error) || isTransientUploadError(error) + if (isAbortError(error) || !retryable || attempt >= MULTIPART_MAX_RETRIES) { + throw error + } - if (!filePath) { - throw new DirectUploadError( - `Invalid upload response for ${file.name}: missing file path`, - uploadResult + const delay = MULTIPART_RETRY_DELAY_MS * MULTIPART_RETRY_BACKOFF ** attempt + attempt++ + logger.warn( + `Upload retry ${attempt}/${MULTIPART_MAX_RETRIES} for ${file.name} in ${Math.round(delay / 1000)}s` ) + updateFileStatus(fileIndex, { progress: 0, status: 'uploading' }) + await sleep(delay) } - - return createUploadedFile( - file.name, - filePath.startsWith('http') ? 
filePath : `${window.location.origin}${filePath}`, - file.size, - getFileContentType(file), - file - ) - } finally { - clearTimeout(timeoutId) } } - /** - * Uploads files in batches using presigned URLs - */ const uploadFilesInBatches = async (files: File[]): Promise => { - const results: UploadedFile[] = [] - const failedFiles: Array<{ file: File; error: Error }> = [] - const fileStatuses: FileUploadStatus[] = files.map((file) => ({ fileName: file.name, fileSize: file.size, - status: 'pending' as const, + status: 'pending', progress: 0, })) - setUploadProgress((prev) => ({ - ...prev, - fileStatuses, - })) + setUploadProgress((prev) => ({ ...prev, fileStatuses })) logger.info(`Starting batch upload of ${files.length} files`) - try { - const batches = [] - - for ( - let batchStart = 0; - batchStart < files.length; - batchStart += UPLOAD_CONFIG.BATCH_REQUEST_SIZE - ) { - const batchFiles = files.slice(batchStart, batchStart + UPLOAD_CONFIG.BATCH_REQUEST_SIZE) - const batchIndexOffset = batchStart - batches.push({ batchFiles, batchIndexOffset }) - } - - logger.info(`Starting parallel processing of ${batches.length} batches`) - - const presignedPromises = batches.map(async ({ batchFiles }, batchIndex) => { - logger.info( - `Getting presigned URLs for batch ${batchIndex + 1}/${batches.length} (${batchFiles.length} files)` - ) - - const batchRequest = { - files: batchFiles.map((file) => ({ - fileName: file.name, - contentType: getFileContentType(file), - fileSize: file.size, - })), - } - - // boundary-raw-fetch: signed-URL batch coordination for direct uploads handed to raw-fetch PUTs against provider URLs; kept on raw fetch to preserve the same controller/timeout used by the multipart flow - const batchResponse = await fetch('/api/files/presigned/batch?type=knowledge-base', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify(batchRequest), - }) - - if (!batchResponse.ok) { - const { message: fullError } = await readApiResponseError(batchResponse) - throw new Error(`Batch ${batchIndex + 1} presigned URL generation failed: ${fullError}`) - } - - const { files: presignedData } = await batchResponse.json() - return { batchFiles, presignedData, batchIndex } - }) - - const allPresignedData = await Promise.all(presignedPromises) - logger.info(`Got all presigned URLs, starting uploads`) - - const allUploads = allPresignedData.flatMap(({ batchFiles, presignedData, batchIndex }) => { - const batchIndexOffset = batchIndex * UPLOAD_CONFIG.BATCH_REQUEST_SIZE - - return batchFiles.map((file, batchFileIndex) => { - const fileIndex = batchIndexOffset + batchFileIndex - const presigned = presignedData[batchFileIndex] - - return { file, presigned, fileIndex } - }) - }) - - const uploadResults = await runWithConcurrency( - allUploads, - UPLOAD_CONFIG.MAX_PARALLEL_UPLOADS, - async ({ file, presigned, fileIndex }) => { - if (!presigned) { - throw new Error(`No presigned data for file ${file.name}`) - } + const presignedData = await fetchBatchPresignedData(files) + const settled = await runWithConcurrency( + files, + WHOLE_FILE_PARALLEL_UPLOADS, + async (file, index) => { + updateFileStatus(index, { status: 'uploading' }) + try { + const uploaded = await uploadOneFile(file, index, presignedData[index]) setUploadProgress((prev) => ({ ...prev, - fileStatuses: prev.fileStatuses?.map((fs, idx) => - idx === fileIndex ? 
{ ...fs, status: 'uploading' as const } : fs - ), + filesCompleted: prev.filesCompleted + 1, })) - - try { - const result = await uploadSingleFileWithRetry(file, 0, fileIndex, presigned) - - setUploadProgress((prev) => ({ - ...prev, - filesCompleted: prev.filesCompleted + 1, - fileStatuses: prev.fileStatuses?.map((fs, idx) => - idx === fileIndex ? { ...fs, status: 'completed' as const, progress: 100 } : fs - ), - })) - - return result - } catch (error) { - setUploadProgress((prev) => ({ - ...prev, - fileStatuses: prev.fileStatuses?.map((fs, idx) => - idx === fileIndex - ? { - ...fs, - status: 'failed' as const, - error: getErrorMessage(error), - } - : fs - ), - })) - throw error - } + updateFileStatus(index, { status: 'completed', progress: 100 }) + return uploaded + } catch (error) { + updateFileStatus(index, { status: 'failed', error: getErrorMessage(error) }) + throw error } - ) - - uploadResults.forEach((result, idx) => { - if (result?.status === 'fulfilled') { - results.push(result.value) - } else if (result?.status === 'rejected') { - failedFiles.push({ - file: allUploads[idx].file, - error: - result.reason instanceof Error ? result.reason : new Error(String(result.reason)), - }) - } - }) + } + ) - if (failedFiles.length > 0) { - logger.error(`Failed to upload ${failedFiles.length} files`) - throw new KnowledgeUploadError( - `Failed to upload ${failedFiles.length} file(s)`, - 'PARTIAL_UPLOAD_FAILURE', - { - failedFiles, - uploadedFiles: results, - } - ) + const succeeded: UploadedFile[] = [] + const failed: Array<{ file: File; error: Error }> = [] + settled.forEach((result, idx) => { + if (result?.status === 'fulfilled') { + succeeded.push(result.value) + } else if (result?.status === 'rejected') { + failed.push({ + file: files[idx], + error: result.reason instanceof Error ? result.reason : new Error(String(result.reason)), + }) } + }) - return results - } catch (error) { - logger.error('Batch upload failed:', error) - throw error + if (failed.length > 0) { + throw new KnowledgeUploadError( + `Failed to upload ${failed.length} file(s)`, + 'PARTIAL_UPLOAD_FAILURE', + { failedFiles: failed, uploadedFiles: succeeded } + ) } + + return succeeded } - /** - * Main upload function that handles file uploads and document processing - */ const uploadFiles = async ( files: File[], knowledgeBaseId: string, @@ -1040,7 +359,6 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { if (files.length === 0) { throw new KnowledgeUploadError('No files provided for upload', 'NO_FILES') } - if (!knowledgeBaseId?.trim()) { throw new KnowledgeUploadError('Knowledge base ID is required', 'INVALID_KB_ID') } @@ -1054,43 +372,49 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { setUploadProgress((prev) => ({ ...prev, stage: 'processing' })) - const processPayload = { - documents: uploadedFiles.map((file) => ({ - ...file, - })), - processingOptions: { - recipe: processingOptions.recipe ?? 'default', - lang: 'en', - }, - bulk: true, - } + // boundary-raw-fetch: bulk document-processing kickoff with dynamic recipe payload; response is consumed alongside the upload progress lifecycle and not modeled by a single contract + const processResponse = await fetch(`/api/knowledge/${knowledgeBaseId}/documents`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + documents: uploadedFiles.map((f) => ({ ...f })), + processingOptions: { + recipe: processingOptions.recipe ?? 
'default', + lang: 'en', + }, + bulk: true, + }), + }) - let processResult: Awaited< - ReturnType> - > - try { - processResult = await requestJson(createKnowledgeDocumentsContract, { - params: { id: knowledgeBaseId }, - body: processPayload, + if (!processResponse.ok) { + let errorData: { error?: string; message?: string } | null = null + try { + errorData = (await processResponse.json()) as { error?: string; message?: string } + } catch {} + logger.error('Document processing failed:', { + status: processResponse.status, + error: errorData, }) - } catch (err) { - if (isApiClientError(err)) { - logger.error('Document processing failed:', { - status: err.status, - error: err.body, - uploadedFiles: uploadedFiles.map((f) => ({ - filename: f.filename, - fileUrl: f.fileUrl, - fileSize: f.fileSize, - mimeType: f.mimeType, - })), - }) - throw new ProcessingError(`Failed to start document processing: ${err.message}`, err.body) - } - throw err + throw new ProcessingError( + `Failed to start document processing: ${errorData?.error || errorData?.message || 'Unknown error'}`, + errorData + ) } - if (!('documentsCreated' in processResult.data)) { + const processResult = (await processResponse.json()) as { + success?: boolean + error?: string + data?: { documentsCreated?: unknown } + } + + if (!processResult.success) { + throw new ProcessingError( + `Document processing failed: ${processResult.error || 'Unknown error'}`, + processResult + ) + } + + if (!processResult.data?.documentsCreated) { throw new ProcessingError( 'Invalid processing response: missing document data', processResult @@ -1098,23 +422,25 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) { } setUploadProgress((prev) => ({ ...prev, stage: 'completing' })) - logger.info(`Successfully started processing ${uploadedFiles.length} documents`) - await queryClient.invalidateQueries({ - queryKey: knowledgeKeys.detail(knowledgeBaseId), - }) + await queryClient.invalidateQueries({ queryKey: knowledgeKeys.detail(knowledgeBaseId) }) return uploadedFiles } catch (err) { logger.error('Error uploading documents:', err) - const error = createErrorFromException(err, 'Unknown error occurred during upload') + const error: UploadError = + err instanceof KnowledgeUploadError + ? { message: err.message, code: err.code, details: err.details, timestamp: Date.now() } + : err instanceof DirectUploadError + ? { message: err.message, code: err.code, details: err.details, timestamp: Date.now() } + : err instanceof Error + ? 
{ message: err.message, timestamp: Date.now() }
+            : { message: 'Unknown error occurred during upload', timestamp: Date.now() }
+
      setUploadError(error)
      options.onError?.(error)
-
-      logger.error('Document upload failed:', error.message)
-
      throw err
    } finally {
      setIsUploading(false)
@@ -1122,9 +448,6 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
    }
  }

-  /**
-   * Clears the current upload error
-   */
  const clearError = useCallback(() => {
    setUploadError(null)
  }, [])
diff --git a/apps/sim/hooks/queries/workspace-files.ts b/apps/sim/hooks/queries/workspace-files.ts
index aa3951ada26..a894c4988d7 100644
--- a/apps/sim/hooks/queries/workspace-files.ts
+++ b/apps/sim/hooks/queries/workspace-files.ts
@@ -1,17 +1,25 @@
import { createLogger } from '@sim/logger'
+import { sleep } from '@sim/utils/helpers'
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
import { toast } from '@/components/emcn'
-import { isApiClientError } from '@/lib/api/client/errors'
+import { ApiClientError, isApiClientError } from '@/lib/api/client/errors'
import { requestJson } from '@/lib/api/client/request'
import { getUsageLimitsContract } from '@/lib/api/contracts/usage-limits'
import {
  deleteWorkspaceFileContract,
  listWorkspaceFilesContract,
+  registerWorkspaceFileContract,
  renameWorkspaceFileContract,
  restoreWorkspaceFileContract,
  updateWorkspaceFileContentContract,
} from '@/lib/api/contracts/workspace-files'
+import {
+  DirectUploadError,
+  runUploadStrategy,
+  type UploadProgressEvent,
+} from '@/lib/uploads/client/direct-upload'
import type { WorkspaceFileRecord } from '@/lib/uploads/contexts/workspace'
+import type { UserFile } from '@/executor/types'

const logger = createLogger('WorkspaceFilesQuery')

@@ -195,36 +203,134 @@ export function useStorageInfo(enabled = true) {
interface UploadFileParams {
  workspaceId: string
  file: File
+  onProgress?: (event: UploadProgressEvent) => void
+  signal?: AbortSignal
}

-export function useUploadWorkspaceFile() {
-  const queryClient = useQueryClient()
+interface UploadFileResponse {
+  success: boolean
+  file: UserFile
+}

-  return useMutation({
-    mutationFn: async ({ workspaceId, file }: UploadFileParams) => {
-      const formData = new FormData()
-      formData.append('file', file)
-
-      // boundary-raw-fetch: multipart/form-data file upload, requestJson only supports JSON bodies
-      const response = await fetch(`/api/workspaces/${workspaceId}/files`, {
-        method: 'POST',
-        body: formData,
-      })
+async function uploadViaApiFallback(
+  workspaceId: string,
+  file: File,
+  signal?: AbortSignal
+): Promise<UploadFileResponse> {
+  const formData = new FormData()
+  formData.append('file', file)
+
+  // boundary-raw-fetch: multipart/form-data fallback upload, requestJson only supports JSON bodies
+  const response = await fetch(`/api/workspaces/${workspaceId}/files`, {
+    method: 'POST',
+    body: formData,
+    signal,
+  })
+
+  return parseUploadResponse(response, 'Upload failed')
+}

-      const data = await response.json()
+async function parseUploadResponse(
+  response: Response,
+  fallbackMessage: string
+): Promise<UploadFileResponse> {
+  let data: { success?: boolean; error?: string; file?: UserFile } | null = null
+  try {
+    data = await response.json()
+  } catch {}

-      if (!data.success) {
-        throw new Error(data.error || 'Upload failed')
-      }
+  if (!response.ok || !data?.success) {
+    throw new Error(data?.error || `${fallbackMessage} (${response.status})`)
+  }
+  return data as UploadFileResponse
+}

-      return data
-    },
+async function uploadWorkspaceFile(
+  workspaceId: string,
+  file: File,
+  onProgress?: (event: UploadProgressEvent) => void,
+  signal?: AbortSignal
+): Promise<UploadFileResponse> {
+  let result
+  try {
+    result = await runUploadStrategy({
+      file,
+      presignedEndpoint: `/api/workspaces/${workspaceId}/files/presigned`,
+      workspaceId,
+      context: 'workspace',
+      onProgress,
+      signal,
+    })
+  } catch (error) {
+    if (error instanceof DirectUploadError && error.code === 'FALLBACK_REQUIRED') {
+      return uploadViaApiFallback(workspaceId, file, signal)
+    }
+    throw error
+  }
+
+  const data = await registerWithRetry(workspaceId, result, signal)
+
+  if (!data.success || !data.file) {
+    throw new Error(data.error || 'Failed to register file')
+  }
+  return { success: true, file: data.file }
+}
+
+const REGISTER_MAX_ATTEMPTS = 3
+const REGISTER_RETRY_DELAY_MS = 500
+
+/**
+ * Register the uploaded object with bounded retries. The server-side handler
+ * is idempotent (existing-record short-circuit), so safely retrying handles
+ * dropped responses that would otherwise orphan the object in storage.
+ */
+async function registerWithRetry(
+  workspaceId: string,
+  result: { key: string; name: string; contentType: string },
+  signal?: AbortSignal
+) {
+  let lastError: unknown
+  for (let attempt = 1; attempt <= REGISTER_MAX_ATTEMPTS; attempt++) {
+    try {
+      return await requestJson(registerWorkspaceFileContract, {
+        params: { id: workspaceId },
+        body: {
+          key: result.key,
+          name: result.name,
+          contentType: result.contentType,
+        },
+        signal,
+      })
+    } catch (error) {
+      lastError = error
+      if (signal?.aborted) throw error
+      const isTransient =
+        !(error instanceof ApiClientError) || (error.status >= 500 && error.status < 600)
+      if (!isTransient || attempt === REGISTER_MAX_ATTEMPTS) throw error
+      await sleep(REGISTER_RETRY_DELAY_MS * attempt)
+    }
+  }
+  throw lastError
+}
+
+export function useUploadWorkspaceFile() {
+  const queryClient = useQueryClient()
+
+  return useMutation({
+    mutationFn: ({ workspaceId, file, onProgress, signal }: UploadFileParams) =>
+      uploadWorkspaceFile(workspaceId, file, onProgress, signal),
    onSettled: () => {
      queryClient.invalidateQueries({ queryKey: workspaceFilesKeys.lists() })
      queryClient.invalidateQueries({ queryKey: workspaceFilesKeys.storageInfo() })
    },
-    onError: (error) => {
+    onSuccess: (_data, variables) => {
+      toast.success(`Uploaded "${variables.file.name}"`)
+    },
+    onError: (error, variables) => {
      logger.error('Failed to upload file:', error)
+      toast.error(`Failed to upload "${variables.file.name}": ${error.message}`, {
+        duration: 5000,
+      })
    },
  })
}
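For review context, a minimal consumer-side sketch of the new two-phase path (direct PUT or multipart first, then register). The hook API and its { workspaceId, file, onProgress, signal } params come from the hunk above; the picker wrapper itself is hypothetical, and it assumes UserFile exposes a key field as the register response schema suggests.

import { useRef } from 'react'
import { useUploadWorkspaceFile } from '@/hooks/queries/workspace-files'

// Hypothetical picker handler; only useUploadWorkspaceFile is from this diff.
export function useWorkspaceFilePicker(workspaceId: string) {
  const uploadFile = useUploadWorkspaceFile()
  const abortRef = useRef<AbortController | null>(null)

  return async (file: File) => {
    abortRef.current = new AbortController()
    // Direct-to-storage upload runs first; the hook then registers the object,
    // or silently reroutes through the form-data API on FALLBACK_REQUIRED.
    const { file: registered } = await uploadFile.mutateAsync({
      workspaceId,
      file,
      onProgress: ({ percent }) => console.debug(`upload ${percent}%`),
      signal: abortRef.current.signal,
    })
    return registered
  }
}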
diff --git a/apps/sim/lib/api/contracts/workspace-files.ts b/apps/sim/lib/api/contracts/workspace-files.ts
index c7a3c9436c7..6aae8b2d2df 100644
--- a/apps/sim/lib/api/contracts/workspace-files.ts
+++ b/apps/sim/lib/api/contracts/workspace-files.ts
@@ -143,3 +143,76 @@ export const workspaceFileCompiledCheckContract = defineRouteContract({
    schema: compiledCheckResponseSchema,
  },
})
+
+export const workspacePresignedUploadBodySchema = z.object({
+  fileName: z.string().min(1, 'fileName is required'),
+  contentType: z.string().min(1, 'contentType is required'),
+  fileSize: z.number().nonnegative('fileSize must be a non-negative number'),
+})
+
+export type WorkspacePresignedUploadBody = z.input<typeof workspacePresignedUploadBodySchema>
+
+const workspacePresignedFileInfoSchema = z.object({
+  path: z.string(),
+  key: z.string(),
+  name: z.string(),
+  size: z.number(),
+  type: z.string(),
+})
+
+const workspacePresignedUploadResponseSchema = z.object({
+  fileName: z.string(),
+  presignedUrl: z.string(),
+  fileInfo: workspacePresignedFileInfoSchema,
+  uploadHeaders: z.record(z.string(), z.string()).optional(),
+  directUploadSupported: z.boolean(),
+})
+
+export const workspacePresignedUploadContract = defineRouteContract({
+  method: 'POST',
+  path: '/api/workspaces/[id]/files/presigned',
+  params: workspaceFilesParamsSchema,
+  body: workspacePresignedUploadBodySchema,
+  response: {
+    mode: 'json',
+    schema: workspacePresignedUploadResponseSchema,
+  },
+})
+
+export const registerWorkspaceFileBodySchema = z.object({
+  key: z.string().min(1, 'key is required'),
+  name: z.string().min(1, 'name is required'),
+  contentType: z.string().min(1, 'contentType is required'),
+})
+
+export type RegisterWorkspaceFileBody = z.input<typeof registerWorkspaceFileBodySchema>
+
+const registeredWorkspaceFileSchema = z.object({
+  id: z.string(),
+  name: z.string(),
+  url: z.string(),
+  size: z.number(),
+  type: z.string(),
+  key: z.string(),
+  context: z.string().optional(),
+})
+
+const registerWorkspaceFileResponseSchema = z.object({
+  success: z.boolean(),
+  file: registeredWorkspaceFileSchema.optional(),
+  error: z.string().optional(),
+  isDuplicate: z.boolean().optional(),
+})
+
+export type RegisterWorkspaceFileResponse = z.output<typeof registerWorkspaceFileResponseSchema>
+
+export const registerWorkspaceFileContract = defineRouteContract({
+  method: 'POST',
+  path: '/api/workspaces/[id]/files/register',
+  params: workspaceFilesParamsSchema,
+  body: registerWorkspaceFileBodySchema,
+  response: {
+    mode: 'json',
+    schema: registerWorkspaceFileResponseSchema,
+  },
+})
diff --git a/apps/sim/lib/uploads/client/direct-upload.test.ts b/apps/sim/lib/uploads/client/direct-upload.test.ts
new file mode 100644
index 00000000000..26c8bb99a1e
--- /dev/null
+++ b/apps/sim/lib/uploads/client/direct-upload.test.ts
@@ -0,0 +1,225 @@
+/**
+ * @vitest-environment jsdom
+ */
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
+import {
+  DirectUploadError,
+  type PresignedUploadInfo,
+  runUploadStrategy,
+} from '@/lib/uploads/client/direct-upload'
+
+const ONE_MB = 1024 * 1024
+const LARGE_THRESHOLD = 50 * ONE_MB
+
+const makeFile = (size: number, name = 'test.bin', type = 'application/octet-stream'): File => {
+  const file = new File([new Uint8Array(0)], name, { type })
+  Object.defineProperty(file, 'size', { value: size })
+  return file
+}
+
+const presigned = (overrides?: Partial<PresignedUploadInfo>): PresignedUploadInfo => ({
+  fileName: 'test.bin',
+  presignedUrl: 'https://s3/presigned',
+  fileInfo: {
+    path: '/api/files/serve/test',
+    key: 'workspace/ws-1/test.bin',
+    name: 'test.bin',
+    size: ONE_MB,
+    type: 'application/octet-stream',
+  },
+  uploadHeaders: undefined,
+  directUploadSupported: true,
+  ...overrides,
+})
+
+class MockXHR {
+  static instances: MockXHR[] = []
+  upload = { addEventListener: vi.fn() }
+  status = 200
+  statusText = 'OK'
+  private listeners: Record<string, Array<() => void>> = {}
+  open = vi.fn()
+  setRequestHeader = vi.fn()
+  abort = vi.fn()
+  send = vi.fn(() => {
+    queueMicrotask(() => this.listeners.load?.forEach((cb) => cb()))
+  })
+  addEventListener = (event: string, cb: () => void) => {
+    ;(this.listeners[event] ??= []).push(cb)
+  }
+  removeEventListener = vi.fn()
+  constructor() {
+    MockXHR.instances.push(this)
+  }
+}
+
+describe('runUploadStrategy', () => {
+  let originalXHR: typeof XMLHttpRequest
+
+  beforeEach(() => {
+    MockXHR.instances = []
+    originalXHR = globalThis.XMLHttpRequest
+    globalThis.XMLHttpRequest = MockXHR as unknown as typeof XMLHttpRequest
+  })
+
+  afterEach(() => {
+    globalThis.XMLHttpRequest = originalXHR
+    vi.restoreAllMocks()
+  })
+
+  it('uses presigned PUT for files at or below the multipart threshold', async () => {
+    const file = makeFile(LARGE_THRESHOLD)
+
+    const result = await runUploadStrategy({
+      file,
+      workspaceId: 'ws-1',
+      context: 'workspace',
+      presignedOverride: presigned(),
+    })
+
+    expect(result.key).toBe('workspace/ws-1/test.bin')
+    expect(MockXHR.instances).toHaveLength(1)
+    expect(MockXHR.instances[0].open).toHaveBeenCalledWith('PUT', 'https://s3/presigned')
+  })
+
+  it('throws FALLBACK_REQUIRED when server signals no cloud storage', async () => {
+    const file = makeFile(ONE_MB)
+
+    await expect(
+      runUploadStrategy({
+        file,
+        workspaceId: 'ws-1',
+        context: 'workspace',
+        presignedOverride: presigned({ presignedUrl: '', directUploadSupported: false }),
+      })
+    ).rejects.toMatchObject({
+      name: 'DirectUploadError',
+      code: 'FALLBACK_REQUIRED',
+    })
+  })
+
+  it('takes the multipart path for files larger than the threshold and posts unified parts', async () => {
+    const file = makeFile(LARGE_THRESHOLD + ONE_MB)
+    const calls: Array<{ url: string; body: unknown }> = []
+
+    const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
+      const url = typeof input === 'string' ? input : input.toString()
+      const rawBody = init?.body
+      const body = typeof rawBody === 'string' ? JSON.parse(rawBody) : undefined
+      calls.push({ url, body })
+
+      if (url.includes('action=initiate')) {
+        return new Response(
+          JSON.stringify({ uploadId: 'u1', key: 'workspace/ws-1/big.bin', uploadToken: 't' }),
+          { status: 200 }
+        )
+      }
+      if (url.includes('action=get-part-urls')) {
+        return new Response(
+          JSON.stringify({
+            presignedUrls: [
+              { partNumber: 1, url: 'https://s3/part1' },
+              { partNumber: 2, url: 'https://s3/part2' },
+            ],
+          }),
+          { status: 200 }
+        )
+      }
+      if (url.startsWith('https://s3/part')) {
+        return new Response(null, { status: 200, headers: { ETag: '"etag-x"' } })
+      }
+      if (url.includes('action=complete')) {
+        return new Response(JSON.stringify({ path: '/api/files/serve/big' }), { status: 200 })
+      }
+      throw new Error(`unexpected url ${url}`)
+    })
+
+    vi.stubGlobal('fetch', fetchMock)
+
+    const result = await runUploadStrategy({
+      file,
+      workspaceId: 'ws-1',
+      context: 'workspace',
+    })
+
+    expect(result.path).toBe('/api/files/serve/big')
+
+    const completeCall = calls.find((c) => c.url.includes('action=complete'))!
+    expect(completeCall.body).toMatchObject({
+      uploadToken: 't',
+      parts: [
+        { partNumber: 1, etag: 'etag-x' },
+        { partNumber: 2, etag: 'etag-x' },
+      ],
+    })
+  })
+
+  it('rejects with ABORTED when signal is already aborted before PUT begins', async () => {
+    const file = makeFile(ONE_MB)
+    const controller = new AbortController()
+    controller.abort()
+
+    await expect(
+      runUploadStrategy({
+        file,
+        workspaceId: 'ws-1',
+        context: 'workspace',
+        presignedOverride: presigned(),
+        signal: controller.signal,
+      })
+    ).rejects.toMatchObject({ name: 'DirectUploadError', code: 'ABORTED' })
+  })
+
+  it('fires action=abort when the multipart complete call fails', async () => {
+    const file = makeFile(LARGE_THRESHOLD + ONE_MB)
+    const calls: Array<{ url: string }> = []
+
+    const fetchMock = vi.fn(async (input: RequestInfo | URL) => {
+      const url = typeof input === 'string' ? input : input.toString()
+      calls.push({ url })
+
+      if (url.includes('action=initiate')) {
+        return new Response(
+          JSON.stringify({ uploadId: 'u1', key: 'workspace/ws-1/big.bin', uploadToken: 't' }),
+          { status: 200 }
+        )
+      }
+      if (url.includes('action=get-part-urls')) {
+        return new Response(
+          JSON.stringify({
+            presignedUrls: [
+              { partNumber: 1, url: 'https://s3/part1' },
+              { partNumber: 2, url: 'https://s3/part2' },
+            ],
+          }),
+          { status: 200 }
+        )
+      }
+      if (url.startsWith('https://s3/part')) {
+        return new Response(null, { status: 200, headers: { ETag: '"etag-x"' } })
+      }
+      if (url.includes('action=complete')) {
+        return new Response(JSON.stringify({ error: 'kaboom' }), { status: 500 })
+      }
+      if (url.includes('action=abort')) {
+        return new Response(null, { status: 200 })
+      }
+      throw new Error(`unexpected url ${url}`)
+    })
+
+    vi.stubGlobal('fetch', fetchMock)
+
+    await expect(
+      runUploadStrategy({ file, workspaceId: 'ws-1', context: 'workspace' })
+    ).rejects.toBeInstanceOf(DirectUploadError)
+
+    expect(calls.some((c) => c.url.includes('action=abort'))).toBe(true)
+  })
+
+  it('throws when neither presignedEndpoint nor presignedOverride is supplied', async () => {
+    const file = makeFile(ONE_MB)
+    await expect(
+      runUploadStrategy({ file, workspaceId: 'ws-1', context: 'workspace' })
+    ).rejects.toBeInstanceOf(DirectUploadError)
+  })
+})
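The tests above pin down the provider-agnostic wire shape that the complete step posts. A compact restatement under those assumptions; the interface name here is illustrative, only the JSON shape and the endpoint are asserted by the tests:

// Illustrative restatement of the complete-step payload asserted above.
interface CompleteMultipartRequestBody {
  uploadToken: string // opaque token minted by action=initiate
  parts: Array<{ partNumber: number; etag?: string }> // unified casing for S3 and Blob
}

const exampleBody: CompleteMultipartRequestBody = {
  uploadToken: 't',
  parts: [
    { partNumber: 1, etag: 'etag-x' },
    { partNumber: 2, etag: 'etag-x' },
  ],
}
// POSTed as JSON to /api/files/multipart?action=complete; a non-2xx response
// triggers the action=abort cleanup covered by the second-to-last test.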
diff --git a/apps/sim/lib/uploads/client/direct-upload.ts b/apps/sim/lib/uploads/client/direct-upload.ts
new file mode 100644
index 00000000000..07322b071f4
--- /dev/null
+++ b/apps/sim/lib/uploads/client/direct-upload.ts
@@ -0,0 +1,591 @@
+import { createLogger } from '@sim/logger'
+import { sleep } from '@sim/utils/helpers'
+import { getFileContentType, isAbortError } from '@/lib/uploads/utils/file-utils'
+
+const logger = createLogger('DirectUpload')
+
+const CHUNK_SIZE = 8 * 1024 * 1024
+export const LARGE_FILE_THRESHOLD = 50 * 1024 * 1024
+const BASE_TIMEOUT_MS = 2 * 60 * 1000
+const TIMEOUT_PER_MB_MS = 1500
+const MAX_TIMEOUT_MS = 10 * 60 * 1000
+export const MULTIPART_PART_CONCURRENCY = 3
+export const MULTIPART_MAX_RETRIES = 3
+export const MULTIPART_RETRY_DELAY_MS = 2000
+export const MULTIPART_RETRY_BACKOFF = 2
+export const WHOLE_FILE_PARALLEL_UPLOADS = 3
+
+export interface PresignedFileInfo {
+  path: string
+  key: string
+  name: string
+  size: number
+  type: string
+}
+
+export interface PresignedUploadInfo {
+  fileName: string
+  presignedUrl: string
+  fileInfo: PresignedFileInfo
+  uploadHeaders?: Record<string, string>
+  directUploadSupported: boolean
+}
+
+export interface UploadStrategyResult {
+  key: string
+  path: string
+  name: string
+  size: number
+  contentType: string
+}
+
+export interface UploadProgressEvent {
+  loaded: number
+  total: number
+  percent: number
+}
+
+export type DirectUploadErrorCode =
+  | 'PRESIGNED_URL_ERROR'
+  | 'DIRECT_UPLOAD_ERROR'
+  | 'MULTIPART_ERROR'
+  | 'ABORTED'
+  | 'FALLBACK_REQUIRED'
+
+export class DirectUploadError extends Error {
+  constructor(
+    message: string,
+    public code: DirectUploadErrorCode,
+    public details?: unknown,
+    public status?: number
+  ) {
+    super(message)
+    this.name = 'DirectUploadError'
+  }
+}
+
+/**
+ * Transport-level upload errors worth retrying at the outer level: timeouts,
+ * network failures, and 5xx from the storage backend. Excludes deterministic
+ * client failures (4xx, `PRESIGNED_URL_ERROR`, `FALLBACK_REQUIRED`) and aborts.
+ */
+export const isTransientUploadError = (error: unknown): boolean => {
+  if (!(error instanceof DirectUploadError)) return false
+  if (error.code !== 'DIRECT_UPLOAD_ERROR' && error.code !== 'MULTIPART_ERROR') return false
+  if (error.status === undefined) return true
+  return error.status >= 500 && error.status < 600
+}
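A hedged sketch of the caller-side contract these codes imply, mirroring the knowledge-upload and workspace-files call sites earlier in this diff; the wrapper function and the branch bodies are placeholders, only the error codes and helpers are from the module:

import {
  DirectUploadError,
  isTransientUploadError,
  runUploadStrategy,
} from '@/lib/uploads/client/direct-upload'

// Placeholder caller: only the error-code semantics are from this diff.
async function uploadWithPolicy(file: File, workspaceId: string) {
  try {
    return await runUploadStrategy({
      file,
      workspaceId,
      context: 'workspace',
      presignedEndpoint: `/api/workspaces/${workspaceId}/files/presigned`,
    })
  } catch (error) {
    if (error instanceof DirectUploadError && error.code === 'FALLBACK_REQUIRED') {
      // No cloud storage configured: proxy the bytes through the server route.
      return undefined
    }
    if (error instanceof DirectUploadError && error.code === 'ABORTED') {
      // User cancellation: propagate without retrying.
      throw error
    }
    if (isTransientUploadError(error)) {
      // Timeout / network / 5xx: eligible for backoff-and-retry at the call site.
    }
    throw error
  }
}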
+/**
+ * Transport-level upload errors worth retrying at the outer level: timeouts,
+ * network failures, and 5xx from the storage backend. Excludes deterministic
+ * client failures (4xx, `PRESIGNED_URL_ERROR`, `FALLBACK_REQUIRED`) and aborts.
+ */
+export const isTransientUploadError = (error: unknown): boolean => {
+  if (!(error instanceof DirectUploadError)) return false
+  if (error.code !== 'DIRECT_UPLOAD_ERROR' && error.code !== 'MULTIPART_ERROR') return false
+  if (error.status === undefined) return true
+  return error.status >= 500 && error.status < 600
+}
+
+export const calculateUploadTimeoutMs = (fileSize: number): number => {
+  const sizeInMb = fileSize / (1024 * 1024)
+  const dynamicBudget = BASE_TIMEOUT_MS + sizeInMb * TIMEOUT_PER_MB_MS
+  return Math.min(dynamicBudget, MAX_TIMEOUT_MS)
+}
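Concretely, the timeout budget grows linearly from a two-minute floor at 1.5 s per MB and saturates at ten minutes (values computed from the constants above):

```ts
calculateUploadTimeoutMs(10 * 1024 * 1024) // 120_000 + 10 * 1_500 = 135_000 ms (2 m 15 s)
calculateUploadTimeoutMs(200 * 1024 * 1024) // 120_000 + 200 * 1_500 = 420_000 ms (7 m)
calculateUploadTimeoutMs(1024 * 1024 * 1024) // 1_656_000 ms uncapped, clamped to 600_000 (10 m)
```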
+/**
+ * Run `worker` over `items` with at most `limit` concurrent invocations.
+ * Returns a settled result per item (never rejects), so callers can handle
+ * partial failures explicitly.
+ */
+export const runWithConcurrency = async <T, R>(
+  items: T[],
+  limit: number,
+  worker: (item: T, index: number) => Promise<R>
+): Promise<Array<PromiseSettledResult<R>>> => {
+  const results: Array<PromiseSettledResult<R>> = Array(items.length)
+  if (items.length === 0) return results
+
+  const concurrency = Math.max(1, Math.min(limit, items.length))
+  let nextIndex = 0
+
+  const runners = Array.from({ length: concurrency }, async () => {
+    while (true) {
+      const currentIndex = nextIndex++
+      if (currentIndex >= items.length) break
+      try {
+        const value = await worker(items[currentIndex], currentIndex)
+        results[currentIndex] = { status: 'fulfilled', value }
+      } catch (error) {
+        results[currentIndex] = { status: 'rejected', reason: error }
+      }
+    }
+  })
+
+  await Promise.all(runners)
+  return results
+}
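Because the helper returns settled results instead of rejecting, a single failed item does not cancel its in-flight siblings; the caller decides what failure means. A minimal sketch:

```ts
const results = await runWithConcurrency([1, 2, 3, 4, 5], 3, async (n) => {
  if (n === 4) throw new Error(`item ${n} failed`)
  return n * 2
})

// Results are indexed by item position, regardless of completion order:
// results[1] -> { status: 'fulfilled', value: 4 }
// results[3] -> { status: 'rejected', reason: Error('item 4 failed') }
const failures = results.filter((r) => r.status === 'rejected')
if (failures.length > 0) {
  // e.g. abort the surrounding operation, as the multipart path below does
}
```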
+/**
+ * Normalize a presigned-upload server response into a {@link PresignedUploadInfo}.
+ * Accepts both single (`/api/files/presigned`) and batch entry shapes, tolerates
+ * `presignedUrl` vs `uploadUrl` aliases, and short-circuits when the server
+ * signals no cloud storage (`directUploadSupported: false`) so callers can fall
+ * back to a server-proxied upload path.
+ *
+ * @throws {@link DirectUploadError} with code `PRESIGNED_URL_ERROR` if the
+ * response is missing a presigned URL or `fileInfo.path`.
+ */
+export const normalizePresignedData = (data: unknown, context: string): PresignedUploadInfo => {
+  const d = (data ?? {}) as Record<string, unknown>
+  const presignedUrl = (d.presignedUrl as string) || (d.uploadUrl as string) || ''
+  const fileInfo = d.fileInfo as Record<string, unknown> | undefined
+  const directUploadSupported = d.directUploadSupported !== false
+
+  if (!directUploadSupported) {
+    return {
+      fileName: (d.fileName as string) || context,
+      presignedUrl: '',
+      fileInfo: { path: '', key: '', name: context, size: 0, type: '' },
+      directUploadSupported: false,
+    }
+  }
+
+  if (!presignedUrl || !fileInfo?.path) {
+    throw new DirectUploadError(
+      `Invalid presigned response for ${context}`,
+      'PRESIGNED_URL_ERROR',
+      data
+    )
+  }
+
+  return {
+    fileName: (d.fileName as string) || (fileInfo.name as string) || context,
+    presignedUrl,
+    fileInfo: {
+      path: fileInfo.path as string,
+      key: (fileInfo.key as string) || '',
+      name: (fileInfo.name as string) || context,
+      size: (fileInfo.size as number) || (d.fileSize as number) || 0,
+      type: (fileInfo.type as string) || (d.contentType as string) || '',
+    },
+    uploadHeaders: (d.uploadHeaders as Record<string, string>) || undefined,
+    directUploadSupported: true,
+  }
+}
+
+export interface GetPresignedOptions {
+  endpoint: string
+  file: File
+  signal?: AbortSignal
+}
+
+/**
+ * Fetch a single presigned upload URL from a server endpoint that follows the
+ * `{ presignedUrl, fileInfo, uploadHeaders?, directUploadSupported }` contract.
+ */
+export const getPresignedUploadInfo = async (
+  opts: GetPresignedOptions
+): Promise<PresignedUploadInfo> => {
+  const { endpoint, file, signal } = opts
+  const response = await fetch(endpoint, {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify({
+      fileName: file.name,
+      contentType: getFileContentType(file),
+      fileSize: file.size,
+    }),
+    signal,
+  })
+
+  if (!response.ok) {
+    let errorDetails: unknown = null
+    try {
+      errorDetails = await response.json()
+    } catch {}
+    const serverMessage =
+      errorDetails != null &&
+      typeof errorDetails === 'object' &&
+      typeof (errorDetails as Record<string, unknown>).error === 'string'
+        ? ((errorDetails as Record<string, unknown>).error as string)
+        : null
+    throw new DirectUploadError(
+      serverMessage ||
+        `Failed to get presigned URL for ${file.name}: ${response.status} ${response.statusText}`,
+      'PRESIGNED_URL_ERROR',
+      errorDetails
+    )
+  }
+
+  return normalizePresignedData(await response.json(), file.name)
+}
+
+interface UploadViaPutOptions {
+  file: File
+  presignedUrl: string
+  uploadHeaders?: Record<string, string>
+  signal?: AbortSignal
+  onProgress?: (event: UploadProgressEvent) => void
+}
+
+const uploadViaPresignedPut = (opts: UploadViaPutOptions): Promise<void> => {
+  const { file, presignedUrl, uploadHeaders, signal, onProgress } = opts
+
+  return new Promise((resolve, reject) => {
+    const xhr = new XMLHttpRequest()
+    let isCompleted = false
+    const timeoutMs = calculateUploadTimeoutMs(file.size)
+
+    const timeoutId = setTimeout(() => {
+      if (isCompleted) return
+      isCompleted = true
+      signal?.removeEventListener('abort', abortHandler)
+      xhr.abort()
+      reject(new DirectUploadError(`Upload timeout for ${file.name}`, 'DIRECT_UPLOAD_ERROR'))
+    }, timeoutMs)
+
+    const abortHandler = () => {
+      if (isCompleted) return
+      isCompleted = true
+      clearTimeout(timeoutId)
+      xhr.abort()
+      reject(new DirectUploadError(`Upload aborted for ${file.name}`, 'ABORTED'))
+    }
+
+    if (signal) {
+      if (signal.aborted) {
+        abortHandler()
+        return
+      }
+      signal.addEventListener('abort', abortHandler)
+    }
+
+    xhr.upload.addEventListener('progress', (event) => {
+      if (event.lengthComputable && !isCompleted) {
+        onProgress?.({
+          loaded: event.loaded,
+          total: event.total,
+          percent: Math.round((event.loaded / event.total) * 100),
+        })
+      }
+    })
+
+    xhr.addEventListener('load', () => {
+      if (isCompleted) return
+      isCompleted = true
+      clearTimeout(timeoutId)
+      signal?.removeEventListener('abort', abortHandler)
+
+      if (xhr.status >= 200 && xhr.status < 300) {
+        onProgress?.({ loaded: file.size, total: file.size, percent: 100 })
+        resolve()
+      } else {
+        reject(
+          new DirectUploadError(
+            `Direct upload failed for ${file.name}: ${xhr.status} ${xhr.statusText}`,
+            'DIRECT_UPLOAD_ERROR',
+            undefined,
+            xhr.status
+          )
+        )
+      }
+    })
+
+    xhr.addEventListener('error', () => {
+      if (isCompleted) return
+      isCompleted = true
+      clearTimeout(timeoutId)
+      signal?.removeEventListener('abort', abortHandler)
+      reject(new DirectUploadError(`Network error uploading ${file.name}`, 'DIRECT_UPLOAD_ERROR'))
+    })
+
+    xhr.open('PUT', presignedUrl)
+    xhr.setRequestHeader('Content-Type', getFileContentType(file))
+    if (uploadHeaders) {
+      for (const [key, value] of Object.entries(uploadHeaders)) {
+        xhr.setRequestHeader(key, value)
+      }
+    }
+    xhr.send(file)
+  })
+}
+
+interface MultipartUploadOptions {
+  file: File
+  workspaceId: string
+  context: 'workspace' | 'knowledge-base'
+  signal?: AbortSignal
+  onProgress?: (event: UploadProgressEvent) => void
+}
+
+interface CompletedPart {
+  partNumber: number
+  etag?: string
+}
+
+interface PartUrl {
+  partNumber: number
+  url: string
+}
+
+const uploadViaMultipart = async (
+  opts: MultipartUploadOptions
+): Promise<{ key: string; path: string }> => {
+  const { file, workspaceId, context, signal, onProgress } = opts
+
+  // boundary-raw-fetch: multipart upload control plane uses action query strings; client lifecycle (initiate/get-part-urls/complete/abort) is sequenced manually and not modeled by a single contract
+  const initiateResponse = await fetch('/api/files/multipart?action=initiate', {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify({
+      fileName: file.name,
+      contentType: getFileContentType(file),
+      fileSize: file.size,
+      workspaceId,
+      context,
+    }),
+    signal,
+  })
+
+  if (!initiateResponse.ok) {
+    let errorBody: { error?: string } | null = null
+    try {
+      errorBody = (await initiateResponse.clone().json()) as { error?: string }
+    } catch {}
+    if (
+      initiateResponse.status === 400 &&
+      typeof errorBody?.error === 'string' &&
+      errorBody.error.toLowerCase().includes('cloud storage')
+    ) {
+      throw new DirectUploadError(
+        'Server signaled fallback to API upload',
+        'FALLBACK_REQUIRED',
+        errorBody
+      )
+    }
+    throw new DirectUploadError(
+      `Failed to initiate multipart upload: ${initiateResponse.statusText}`,
+      'MULTIPART_ERROR',
+      undefined,
+      initiateResponse.status
+    )
+  }
+
+  const { key, uploadToken } = (await initiateResponse.json()) as {
+    uploadId: string
+    key: string
+    uploadToken: string
+  }
+
+  const numParts = Math.ceil(file.size / CHUNK_SIZE)
+  const partNumbers = Array.from({ length: numParts }, (_, i) => i + 1)
+
+  const abortMultipart = async () => {
+    try {
+      // boundary-raw-fetch: fire-and-forget abort during multipart cleanup; intentionally avoids contract response parsing so cleanup cannot mask the original error
+      await fetch('/api/files/multipart?action=abort', {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify({ uploadToken }),
+      })
+    } catch (err) {
+      logger.warn('Failed to abort multipart upload:', err)
+    }
+  }
+
+  let presignedUrls: PartUrl[]
+  try {
+    // boundary-raw-fetch: multipart upload control plane uses action query strings; sequenced with initiate/complete/abort outside the contract layer
+    const partUrlsResponse = await fetch('/api/files/multipart?action=get-part-urls', {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json' },
+      body: JSON.stringify({ uploadToken, partNumbers }),
+      signal,
+    })
+
+    if (!partUrlsResponse.ok) {
+      throw new DirectUploadError(
+        `Failed to get part URLs: ${partUrlsResponse.statusText}`,
+        'MULTIPART_ERROR',
+        undefined,
+        partUrlsResponse.status
+      )
+    }
+
+    ;({ presignedUrls } = (await partUrlsResponse.json()) as { presignedUrls: PartUrl[] })
+  } catch (err) {
+    await abortMultipart()
+    throw err
+  }
+
+  const completedBytes = new Array(numParts).fill(0)
+  const reportProgress = () => {
+    const loaded = completedBytes.reduce((a, b) => a + b, 0)
+    onProgress?.({
+      loaded,
+      total: file.size,
+      percent: Math.min(100, Math.round((loaded / file.size) * 100)),
+    })
+  }
+
+  const uploadedParts: CompletedPart[] = []
+
+  try {
+    const uploadPart = async ({ partNumber, url }: PartUrl): Promise<CompletedPart> => {
+      const start = (partNumber - 1) * CHUNK_SIZE
+      const end = Math.min(start + CHUNK_SIZE, file.size)
+      const chunk = file.slice(start, end)
+
+      for (let attempt = 0; attempt <= MULTIPART_MAX_RETRIES; attempt++) {
+        try {
+          const partResponse = await fetch(url, {
+            method: 'PUT',
+            body: chunk,
+            signal,
+            headers: { 'Content-Type': getFileContentType(file) },
+          })
+
+          if (!partResponse.ok) {
+            throw new DirectUploadError(
+              `Failed to upload part ${partNumber}: ${partResponse.statusText}`,
+              'MULTIPART_ERROR',
+              undefined,
+              partResponse.status
+            )
+          }
+
+          const etag = partResponse.headers.get('ETag') || undefined
+          completedBytes[partNumber - 1] = end - start
+          reportProgress()
+
+          return { partNumber, etag: etag?.replace(/"/g, '') }
+        } catch (partError) {
+          const isClientError =
+            partError instanceof DirectUploadError &&
+            partError.status !== undefined &&
+            partError.status >= 400 &&
+            partError.status < 500
+          if (isAbortError(partError) || isClientError || attempt >= MULTIPART_MAX_RETRIES) {
+            throw partError
+          }
+          const delay = MULTIPART_RETRY_DELAY_MS * MULTIPART_RETRY_BACKOFF ** attempt
+          logger.warn(
+            `Part ${partNumber} failed (attempt ${attempt + 1}), retrying in ${Math.round(delay / 1000)}s`
+          )
+          await sleep(delay)
+        }
+      }
+
+      throw new DirectUploadError(`Retries exhausted for part ${partNumber}`, 'MULTIPART_ERROR')
+    }
+
+    const partResults = await runWithConcurrency(
+      presignedUrls,
+      MULTIPART_PART_CONCURRENCY,
+      uploadPart
+    )
+
+    for (const result of partResults) {
+      if (result?.status === 'fulfilled') {
+        uploadedParts.push(result.value)
+      } else if (result?.status === 'rejected') {
+        throw result.reason
+      }
+    }
+  } catch (error) {
+    await abortMultipart()
+    throw error
+  }
+
+  let path: string
+  try {
+    // boundary-raw-fetch: multipart upload control plane uses action query strings; sequenced with initiate/get-part-urls/abort outside the contract layer
+    const completeResponse = await fetch('/api/files/multipart?action=complete', {
+      method: 'POST',
+      headers: { 'Content-Type': 'application/json' },
+      body: JSON.stringify({ uploadToken, parts: uploadedParts }),
+      signal,
+    })
+
+    if (!completeResponse.ok) {
+      throw new DirectUploadError(
+        `Failed to complete multipart upload: ${completeResponse.statusText}`,
+        'MULTIPART_ERROR',
+        undefined,
+        completeResponse.status
+      )
+    }
+
+    ;({ path } = (await completeResponse.json()) as { path: string })
+  } catch (err) {
+    await abortMultipart()
+    throw err
+  }
+  return { key, path }
+}
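For a sense of scale under the constants above: a 51 MiB file crosses the threshold and is cut into 8 MiB parts, and each part gets up to four attempts with exponential backoff between them (a worked check, not new behavior):

```ts
// 51 MiB -> Math.ceil(51 / 8) = 7 parts: six full 8 MiB chunks plus a 3 MiB tail.
const numParts = Math.ceil((51 * 1024 * 1024) / (8 * 1024 * 1024)) // 7

// Sleeps between the four attempts: 2000 * 2 ** attempt for attempt = 0, 1, 2.
const delays = [0, 1, 2].map((attempt) => 2000 * 2 ** attempt) // [2000, 4000, 8000] ms
```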
+export interface RunUploadStrategyOptions {
+  file: File
+  workspaceId: string
+  context: 'workspace' | 'knowledge-base'
+  /** Endpoint to mint a presigned PUT URL. Required unless `presignedOverride` is provided. */
+  presignedEndpoint?: string
+  /** Pre-fetched presigned data (e.g. from a batch endpoint). Skips per-file fetch. */
+  presignedOverride?: PresignedUploadInfo
+  signal?: AbortSignal
+  onProgress?: (event: UploadProgressEvent) => void
+}
+
+/**
+ * Strategy ladder for client-side uploads:
+ * - Files larger than {@link LARGE_FILE_THRESHOLD} use multipart S3/Blob with chunked PUTs.
+ * - Smaller files use a presigned PUT URL (fetched per-file, or supplied via
+ *   `presignedOverride` for batched flows like KB).
+ * - If the server signals no cloud storage is configured, a {@link DirectUploadError}
+ *   with code `FALLBACK_REQUIRED` is thrown so callers can fall back to a server-proxied path.
+ */
+export const runUploadStrategy = async (
+  opts: RunUploadStrategyOptions
+): Promise<UploadStrategyResult> => {
+  const { file, presignedEndpoint, presignedOverride, workspaceId, context, signal, onProgress } =
+    opts
+  const contentType = getFileContentType(file)
+
+  if (presignedOverride && !presignedOverride.directUploadSupported) {
+    throw new DirectUploadError('Server signaled fallback to API upload', 'FALLBACK_REQUIRED')
+  }
+
+  if (file.size > LARGE_FILE_THRESHOLD) {
+    const { key, path } = await uploadViaMultipart({
+      file,
+      workspaceId,
+      context,
+      signal,
+      onProgress,
+    })
+    return { key, path, name: file.name, size: file.size, contentType }
+  }
+
+  let presigned: PresignedUploadInfo
+  if (presignedOverride) {
+    presigned = presignedOverride
+  } else {
+    if (!presignedEndpoint) {
+      throw new DirectUploadError(
+        'runUploadStrategy requires either presignedEndpoint or presignedOverride',
+        'PRESIGNED_URL_ERROR'
+      )
+    }
+    presigned = await getPresignedUploadInfo({ endpoint: presignedEndpoint, file, signal })
+  }
+
+  if (!presigned.directUploadSupported) {
+    throw new DirectUploadError('Server signaled fallback to API upload', 'FALLBACK_REQUIRED')
+  }
+
+  await uploadViaPresignedPut({
+    file,
+    presignedUrl: presigned.presignedUrl,
+    uploadHeaders: presigned.uploadHeaders,
+    signal,
+    onProgress,
+  })
+
+  return {
+    key: presigned.fileInfo.key,
+    path: presigned.fileInfo.path,
+    name: file.name,
+    size: file.size,
+    contentType,
+  }
+}
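`isTransientUploadError` from earlier in this module is the intended guard for an outer retry around the whole ladder. A sketch; the single re-attempt and fixed pause are assumptions, not part of this patch:

```ts
async function uploadWithRetry(opts: RunUploadStrategyOptions): Promise<UploadStrategyResult> {
  try {
    return await runUploadStrategy(opts)
  } catch (error) {
    // Deterministic failures (4xx, PRESIGNED_URL_ERROR, FALLBACK_REQUIRED) and
    // user aborts are not worth a second pass; transport-level ones are.
    if (!isTransientUploadError(error)) throw error
    await sleep(1000) // assumed pause; sleep comes from @sim/utils/helpers
    return runUploadStrategy(opts)
  }
}
```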
diff --git a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts
index d79798bc3b5..20780b71392 100644
--- a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts
+++ b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts
@@ -7,6 +7,7 @@ import { db } from '@sim/db'
 import { workspaceFiles } from '@sim/db/schema'
 import { createLogger } from '@sim/logger'
 import { getPostgresErrorCode } from '@sim/utils/errors'
+import { generateShortId } from '@sim/utils/id'
 import { and, eq, isNull, sql } from 'drizzle-orm'
 import {
   checkStorageQuota,
@@ -16,8 +17,15 @@ import {
 import { normalizeVfsSegment } from '@/lib/copilot/vfs/normalize-segment'
 import { generateRestoreName } from '@/lib/core/utils/restore-name'
 import { getServePathPrefix } from '@/lib/uploads'
-import { downloadFile, hasCloudStorage, uploadFile } from '@/lib/uploads/core/storage-service'
+import {
+  deleteFile,
+  downloadFile,
+  hasCloudStorage,
+  headObject,
+  uploadFile,
+} from '@/lib/uploads/core/storage-service'
 import { getFileMetadataByKey, insertFileMetadata } from '@/lib/uploads/server/metadata'
+import { MAX_WORKSPACE_FILE_SIZE } from '@/lib/uploads/shared/types'
 import { getWorkspaceWithOwner } from '@/lib/workspaces/permissions/utils'
 import { isUuid, sanitizeFileName } from '@/executor/constants'
 import type { UserFile } from '@/executor/types'
@@ -152,7 +160,7 @@ export async function uploadWorkspaceFile(
   for (let attempt = 0; attempt < MAX_UPLOAD_UNIQUE_RETRIES; attempt++) {
     const uniqueName = await allocateUniqueWorkspaceFileName(workspaceId, fileName)
     const storageKey = generateWorkspaceFileKey(workspaceId, uniqueName)
-    let fileId = `wf_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`
+    let fileId = `wf_${generateShortId()}`
 
     try {
       logger.info(`Generated storage key: ${storageKey}`)
@@ -167,12 +175,12 @@ export async function uploadWorkspaceFile(
 
       const uploadResult = await uploadFile({
         file: fileBuffer,
-        fileName: storageKey, // Use the full storageKey as fileName
+        fileName: storageKey,
         contentType,
         context: 'workspace',
-        preserveKey: true, // Don't add timestamp prefix
-        customKey: storageKey, // Explicitly set the key
-        metadata, // Pass metadata for cloud storage consistency
+        preserveKey: true,
+        customKey: storageKey,
+        metadata,
       })
 
       logger.info(`Upload returned key: ${uploadResult.key}`)
@@ -232,7 +240,7 @@
         name: uniqueName,
         size: fileBuffer.length,
         type: contentType,
-        url: serveUrl, // Use authenticated serve URL (enforces context)
+        url: serveUrl,
         key: uploadResult.key,
         context: 'workspace',
       }
@@ -261,6 +269,144 @@ export async function uploadWorkspaceFile(
   throw new FileConflictError(fileName)
 }
 
+export interface RegisterUploadedWorkspaceFileResult {
+  file: UserFile
+  /** True when a new metadata row was inserted; false when an existing row was reused. */
+  created: boolean
+}
+
+/**
+ * Finalize a workspace file that was uploaded directly to cloud storage
+ * (presigned PUT or completed multipart). Verifies the object exists,
+ * checks quota, allocates a non-colliding display name, inserts metadata,
+ * and increments storage usage.
+ *
+ * Throws if the object is missing in storage, quota is exceeded, or the
+ * caller cannot resolve a unique name within the retry budget.
+ */
+export async function registerUploadedWorkspaceFile(params: {
+  workspaceId: string
+  userId: string
+  key: string
+  originalName: string
+  contentType: string
+}): Promise<RegisterUploadedWorkspaceFileResult> {
+  const { workspaceId, userId, key, originalName, contentType } = params
+
+  if (!hasCloudStorage()) {
+    throw new Error('Direct-upload registration requires cloud storage')
+  }
+
+  if (parseWorkspaceFileKey(key) !== workspaceId) {
+    throw new Error('Storage key does not belong to this workspace')
+  }
+
+  const head = await headObject(key, 'workspace')
+  if (!head) {
+    throw new Error('Uploaded object not found in storage')
+  }
+  const verifiedSize = head.size
+
+  const cleanupOrphan = async (reason: string) => {
+    try {
+      await deleteFile({ key, context: 'workspace' })
+    } catch (deleteError) {
+      logger.error(`Failed to clean up orphaned object after ${reason}`, deleteError)
+    }
+  }
+
+  if (verifiedSize > MAX_WORKSPACE_FILE_SIZE) {
+    await cleanupOrphan('size-cap rejection')
+    throw new Error(`File size exceeds maximum of ${MAX_WORKSPACE_FILE_SIZE} bytes`)
+  }
+
+  /**
+   * Existence check precedes the quota guard so a network-dropped retry doesn't
+   * double-charge quota or orphan-cleanup an already-registered object.
+   */
+  const existing = await getFileMetadataByKey(key, 'workspace')
+
+  let fileId = existing?.id ?? ''
+  let displayName = existing?.originalName ?? ''
+  let created = false
+
+  if (!existing) {
+    const quotaCheck = await checkStorageQuota(userId, verifiedSize)
+    if (!quotaCheck.allowed) {
+      await cleanupOrphan('quota rejection')
+      throw new Error(quotaCheck.error || 'Storage limit exceeded')
+    }
+
+    let lastInsertError: unknown
+    for (let attempt = 0; attempt < MAX_UPLOAD_UNIQUE_RETRIES; attempt++) {
+      fileId = `wf_${generateShortId()}`
+      displayName = await allocateUniqueWorkspaceFileName(workspaceId, originalName)
+      try {
+        await insertFileMetadata({
+          id: fileId,
+          key,
+          userId,
+          workspaceId,
+          context: 'workspace',
+          originalName: displayName,
+          contentType,
+          size: verifiedSize,
+        })
+        created = true
+        lastInsertError = undefined
+        break
+      } catch (insertError) {
+        lastInsertError = insertError
+        if (getPostgresErrorCode(insertError) === '23505') {
+          logger.warn(
+            `Unique name conflict on register (attempt ${attempt + 1}/${MAX_UPLOAD_UNIQUE_RETRIES}), retrying with a new name`
+          )
+          continue
+        }
+        break
+      }
+    }
+
+    if (!created) {
+      logger.error(
+        'Failed to insert metadata after direct upload; cleaning up storage object',
+        lastInsertError
+      )
+      await cleanupOrphan('metadata insert failure')
+      if (getPostgresErrorCode(lastInsertError) === '23505') {
+        throw new FileConflictError(originalName)
+      }
+      throw lastInsertError instanceof Error
+        ? lastInsertError
+        : new Error('Failed to insert workspace file metadata')
+    }
+
+    try {
+      await incrementStorageUsage(userId, verifiedSize)
+    } catch (storageError) {
+      logger.error('Failed to update storage tracking:', storageError)
+    }
+  } else {
+    logger.info(`Using existing metadata record for direct upload: ${key}`)
+  }
+
+  const pathPrefix = getServePathPrefix()
+  const serveUrl = `${pathPrefix}${encodeURIComponent(key)}?context=workspace`
+
+  return {
+    file: {
+      id: fileId,
+      name: displayName,
+      size: verifiedSize,
+      type: contentType,
+      url: serveUrl,
+      key,
+      context: 'workspace',
+    },
+    created,
+  }
+}
+
 /**
  * Track a file that was already uploaded to workspace S3 as a chat-scoped upload.
  * Links the existing workspaceFiles metadata record (created by the storage service
@@ -293,7 +439,7 @@ export async function trackChatUpload(
     return
   }
 
-  const fileId = `wf_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`
+  const fileId = `wf_${generateShortId()}`
 
   await db.insert(workspaceFiles).values({
     id: fileId,
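The `created` flag is what makes the finalize step safe to retry from the client. A hypothetical complete-handler, with names and status codes assumed rather than taken from this patch's routes:

```ts
import { NextResponse } from 'next/server'
import { registerUploadedWorkspaceFile } from '@/lib/uploads/contexts/workspace/workspace-file-manager'

async function finalizeDirectUpload(params: {
  workspaceId: string
  userId: string
  key: string
  originalName: string
  contentType: string
}) {
  const { file, created } = await registerUploadedWorkspaceFile(params)
  // A retried call after a dropped response lands on the existing-metadata
  // branch: created === false, quota is not charged twice, same serve URL.
  return NextResponse.json({ path: file.url, created }, { status: created ? 201 : 200 })
}
```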
diff --git a/apps/sim/lib/uploads/core/storage-service.ts b/apps/sim/lib/uploads/core/storage-service.ts
index b504db175e8..93ee3c734b7 100644
--- a/apps/sim/lib/uploads/core/storage-service.ts
+++ b/apps/sim/lib/uploads/core/storage-service.ts
@@ -237,6 +237,30 @@ export async function deleteFile(options: DeleteFileOptions): Promise<void> {
   await unlink(filePath)
 }
 
+/**
+ * Check whether an object exists in the configured cloud storage provider.
+ * Returns object size and content-type when present, or null when missing.
+ * Throws on errors other than "not found". For local filesystem, returns null.
+ */
+export async function headObject(
+  key: string,
+  context: StorageContext
+): Promise<{ size: number; contentType?: string } | null> {
+  const config = getStorageConfig(context)
+
+  if (USE_BLOB_STORAGE) {
+    const { headBlobObject } = await import('@/lib/uploads/providers/blob/client')
+    return headBlobObject(key, createBlobConfig(config))
+  }
+
+  if (USE_S3_STORAGE) {
+    const { headS3Object } = await import('@/lib/uploads/providers/s3/client')
+    return headS3Object(key, createS3Config(config))
+  }
+
+  return null
+}
+
 /**
  * Generate a presigned URL for direct file upload
  */
@@ -251,6 +275,7 @@ export async function generatePresignedUploadUrl(
     userId,
     expirationSeconds = 3600,
     metadata = {},
+    customKey,
   } = options
 
   const allMetadata = {
@@ -263,10 +288,15 @@
 
   const config = getStorageConfig(context)
 
-  const timestamp = Date.now()
-  const uniqueId = Math.random().toString(36).substring(2, 9)
-  const safeFileName = fileName.replace(/[^a-zA-Z0-9.-]/g, '_')
-  const key = `${context}/${timestamp}-${uniqueId}-${safeFileName}`
+  let key: string
+  if (customKey) {
+    key = customKey
+  } else {
+    const timestamp = Date.now()
+    const uniqueId = Math.random().toString(36).substring(2, 9)
+    const safeFileName = fileName.replace(/[^a-zA-Z0-9.-]/g, '_')
+    key = `${context}/${timestamp}-${uniqueId}-${safeFileName}`
+  }
 
   if (USE_S3_STORAGE) {
     return generateS3PresignedUrl(
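With `customKey`, the caller owns key derivation end to end, which is what lets the workspace flow pre-compute `workspace/<id>/...` keys for presigned PUTs. A sketch; the field list is abridged to what this hunk shows, and the key convention is taken from the tests above:

```ts
const presigned = await generatePresignedUploadUrl({
  fileName: 'report.pdf',
  contentType: 'application/pdf',
  context: 'workspace',
  customKey: `workspace/${workspaceId}/${Date.now()}-report.pdf`, // caller-owned key
})
// Omitting customKey falls back to `${context}/${timestamp}-${uniqueId}-${safeFileName}`.
```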
diff --git a/apps/sim/lib/uploads/providers/blob/client.ts b/apps/sim/lib/uploads/providers/blob/client.ts
index b32f4c616fe..ee1c0505661 100644
--- a/apps/sim/lib/uploads/providers/blob/client.ts
+++ b/apps/sim/lib/uploads/providers/blob/client.ts
@@ -305,6 +305,58 @@ export async function downloadFromBlob(key: string, customConfig?: BlobConfig): Promise<Buffer> {
   return downloaded
 }
 
+/**
+ * Check whether a blob exists (and return its size when it does).
+ * Returns null when the blob is missing.
+ */
+export async function headBlobObject(
+  key: string,
+  customConfig?: BlobConfig
+): Promise<{ size: number; contentType?: string } | null> {
+  const { BlobServiceClient, StorageSharedKeyCredential } = await import('@azure/storage-blob')
+  let blobServiceClient: BlobServiceClientType
+  let containerName: string
+
+  if (customConfig) {
+    if (customConfig.connectionString) {
+      blobServiceClient = BlobServiceClient.fromConnectionString(customConfig.connectionString)
+    } else if (customConfig.accountName && customConfig.accountKey) {
+      const credential = new StorageSharedKeyCredential(
+        customConfig.accountName,
+        customConfig.accountKey
+      )
+      blobServiceClient = new BlobServiceClient(
+        `https://${customConfig.accountName}.blob.core.windows.net`,
+        credential
+      )
+    } else {
+      throw new Error('Invalid custom blob configuration')
+    }
+    containerName = customConfig.containerName
+  } else {
+    blobServiceClient = await getBlobServiceClient()
+    containerName = BLOB_CONFIG.containerName
+  }
+
+  const containerClient = blobServiceClient.getContainerClient(containerName)
+  const blockBlobClient = containerClient.getBlockBlobClient(key)
+
+  try {
+    const properties = await blockBlobClient.getProperties()
+    return {
+      size: properties.contentLength ?? 0,
+      contentType: properties.contentType,
+    }
+  } catch (err) {
+    const status = (err as { statusCode?: number }).statusCode
+    const code = (err as { code?: string }).code
+    if (status === 404 || code === 'BlobNotFound') {
+      return null
+    }
+    throw err
+  }
+}
+
 /**
  * Delete a file from Azure Blob Storage
  * @param key Blob name
@@ -366,6 +418,16 @@ async function streamToBuffer(readableStream: NodeJS.ReadableStream): Promise<Buffer> {
   const { BlobServiceClient, StorageSharedKeyCredential } = await import('@azure/storage-blob')
-  const { fileName, contentType, customConfig } = options
+  const { fileName, contentType, customConfig, customKey } = options
   let blobServiceClient: BlobServiceClientType
   let containerName: string
@@ -400,7 +462,7 @@
   }
 
   const safeFileName = sanitizeFileName(fileName)
-  const uniqueKey = `kb/${generateId()}-${safeFileName}`
+  const uniqueKey = customKey || `kb/${generateId()}-${safeFileName}`
 
   const uploadId = generateId()
@@ -473,9 +535,7 @@ export async function getMultipartPartUrls(
   const blockBlobClient = containerClient.getBlockBlobClient(key)
 
   return partNumbers.map((partNumber) => {
-    const blockId = Buffer.from(`block-${partNumber.toString().padStart(6, '0')}`).toString(
-      'base64'
-    )
+    const blockId = deriveBlobBlockId(partNumber)
 
     const sasOptions = {
       containerName,
diff --git a/apps/sim/lib/uploads/providers/blob/types.ts b/apps/sim/lib/uploads/providers/blob/types.ts
index 8223b689384..b024e2a6363 100644
--- a/apps/sim/lib/uploads/providers/blob/types.ts
+++ b/apps/sim/lib/uploads/providers/blob/types.ts
@@ -10,6 +10,11 @@ export interface AzureMultipartUploadInit {
   contentType: string
   fileSize: number
   customConfig?: BlobConfig
+  /**
+   * When provided, overrides the default `kb/${id}-${name}` key derivation.
+   * Caller is responsible for uniqueness and prefix conventions.
+   */
+  customKey?: string
 }
 
 export interface AzurePartUploadUrl {
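`deriveBlobBlockId` centralizes the derivation that previously lived inline above; the initiate and complete paths must produce byte-identical block IDs, so sharing one helper is the point. Assuming it mirrors the removed expression, it is roughly:

```ts
// Sketch mirroring the removed inline code, not the helper's confirmed body.
// Azure block IDs must be base64-encoded and equal-length across a blob,
// hence the zero-padded counter.
export function deriveBlobBlockId(partNumber: number): string {
  return Buffer.from(`block-${partNumber.toString().padStart(6, '0')}`).toString('base64')
}

// deriveBlobBlockId(2) === Buffer.from('block-000002').toString('base64') // 'YmxvY2stMDAwMDAy'
```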
diff --git a/apps/sim/lib/uploads/providers/s3/client.ts b/apps/sim/lib/uploads/providers/s3/client.ts
index 0f27ceed16b..6f347a4bdd6 100644
--- a/apps/sim/lib/uploads/providers/s3/client.ts
+++ b/apps/sim/lib/uploads/providers/s3/client.ts
@@ -4,6 +4,7 @@ import {
   CreateMultipartUploadCommand,
   DeleteObjectCommand,
   GetObjectCommand,
+  HeadObjectCommand,
   PutObjectCommand,
   S3Client,
   UploadPartCommand,
@@ -197,6 +198,35 @@ export async function downloadFromS3(key: string, customConfig?: S3Config): Promise<Buffer> {
   })
 }
 
+/**
+ * Check whether an object exists in S3 (and return its size when it does).
+ * Returns null when the object is missing.
+ */
+export async function headS3Object(
+  key: string,
+  customConfig?: S3Config
+): Promise<{ size: number; contentType?: string } | null> {
+  const config = customConfig || { bucket: S3_CONFIG.bucket, region: S3_CONFIG.region }
+
+  try {
+    const response = await getS3Client().send(
+      new HeadObjectCommand({ Bucket: config.bucket, Key: key })
+    )
+    return {
+      size: response.ContentLength ?? 0,
+      contentType: response.ContentType,
+    }
+  } catch (error) {
+    const code = (error as { name?: string; $metadata?: { httpStatusCode?: number } } | null)?.name
+    const status = (error as { $metadata?: { httpStatusCode?: number } } | null)?.$metadata
+      ?.httpStatusCode
+    if (code === 'NotFound' || code === 'NoSuchKey' || status === 404) {
+      return null
+    }
+    throw error
+  }
+}
+
 /**
  * Delete a file from S3
  * @param key S3 object key
@@ -227,13 +257,13 @@ export async function deleteFromS3(key: string, customConfig?: S3Config): Promise<void> {
 export async function initiateS3MultipartUpload(
   options: S3MultipartUploadInit
 ): Promise<{ uploadId: string; key: string }> {
-  const { fileName, contentType, customConfig } = options
+  const { fileName, contentType, customConfig, customKey, purpose } = options
 
   const config = customConfig || { bucket: S3_KB_CONFIG.bucket, region: S3_KB_CONFIG.region }
   const s3Client = getS3Client()
 
   const safeFileName = sanitizeFileName(fileName)
-  const uniqueKey = `kb/${generateId()}-${safeFileName}`
+  const uniqueKey = customKey || `kb/${generateId()}-${safeFileName}`
 
   const command = new CreateMultipartUploadCommand({
     Bucket: config.bucket,
@@ -242,7 +272,7 @@
     Metadata: {
       originalName: sanitizeFilenameForMetadata(fileName),
       uploadedAt: new Date().toISOString(),
-      purpose: 'knowledge-base',
+      purpose: purpose || 'knowledge-base',
     },
   })
 
diff --git a/apps/sim/lib/uploads/providers/s3/types.ts b/apps/sim/lib/uploads/providers/s3/types.ts
index a4d7cb155f8..266a86a862c 100644
--- a/apps/sim/lib/uploads/providers/s3/types.ts
+++ b/apps/sim/lib/uploads/providers/s3/types.ts
@@ -8,6 +8,16 @@ export interface S3MultipartUploadInit {
   contentType: string
   fileSize: number
   customConfig?: S3Config
+  /**
+   * When provided, overrides the default `kb/${id}-${name}` key derivation.
+   * Caller is responsible for uniqueness and prefix conventions.
+   */
+  customKey?: string
+  /**
+   * Storage purpose tag persisted as object metadata. Defaults to `knowledge-base`
+   * for backwards compatibility.
+   */
+  purpose?: string
 }
 
 export interface S3PartUploadUrl {
diff --git a/apps/sim/lib/uploads/shared/types.ts b/apps/sim/lib/uploads/shared/types.ts
index f5d8210db56..30b5cd6b7b6 100644
--- a/apps/sim/lib/uploads/shared/types.ts
+++ b/apps/sim/lib/uploads/shared/types.ts
@@ -1,3 +1,16 @@
+/**
+ * Defense-in-depth ceiling on the size of any single workspace file upload.
+ * Enforced both server-side (presigned route) and client-side (Files tab) so
+ * users get fast feedback before bytes are streamed.
+ */
+export const MAX_WORKSPACE_FILE_SIZE = 5 * 1024 * 1024 * 1024
+
+/**
+ * Cap on the legacy FormData upload route, which buffers the whole file in
+ * worker memory. Direct-to-storage uploads use {@link MAX_WORKSPACE_FILE_SIZE}.
+ */
+export const MAX_WORKSPACE_FORMDATA_FILE_SIZE = 100 * 1024 * 1024
+
 export type StorageContext =
   | 'knowledge-base'
   | 'chat'
@@ -55,6 +68,11 @@ export interface GeneratePresignedUrlOptions {
   userId?: string
   expirationSeconds?: number
   metadata?: Record<string, string>
+  /**
+   * When provided, overrides the default `${context}/${timestamp}-${id}-${name}` key derivation.
+   * The caller takes responsibility for uniqueness and prefix conventions.
+ */ + customKey?: string } export interface PresignedUrlResponse { diff --git a/apps/sim/lib/uploads/utils/file-utils.test.ts b/apps/sim/lib/uploads/utils/file-utils.test.ts new file mode 100644 index 00000000000..f5e3c45ebde --- /dev/null +++ b/apps/sim/lib/uploads/utils/file-utils.test.ts @@ -0,0 +1,39 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { isAbortError, isNetworkError } from '@/lib/uploads/utils/file-utils' + +describe('isAbortError', () => { + it('returns true for AbortError-named errors', () => { + const err = new Error('aborted') + err.name = 'AbortError' + expect(isAbortError(err)).toBe(true) + }) + + it('returns false for generic Errors', () => { + expect(isAbortError(new Error('boom'))).toBe(false) + expect(isAbortError(null)).toBe(false) + expect(isAbortError('AbortError')).toBe(false) + }) +}) + +describe('isNetworkError', () => { + it.each([ + 'fetch failed', + 'Network request failed', + 'connection reset', + 'request timeout', + 'operation timed out', + 'ECONNRESET while reading body', + ])('matches transient message %s', (msg) => { + expect(isNetworkError(new Error(msg))).toBe(true) + }) + + it('does not match deterministic errors', () => { + expect(isNetworkError(new Error('Forbidden'))).toBe(false) + expect(isNetworkError(new Error('Validation failed: name is required'))).toBe(false) + expect(isNetworkError('not an error')).toBe(false) + expect(isNetworkError(null)).toBe(false) + }) +}) diff --git a/apps/sim/lib/uploads/utils/file-utils.ts b/apps/sim/lib/uploads/utils/file-utils.ts index bed6d19d919..b625c22ea42 100644 --- a/apps/sim/lib/uploads/utils/file-utils.ts +++ b/apps/sim/lib/uploads/utils/file-utils.ts @@ -272,13 +272,63 @@ export function getMimeTypeFromExtension(extension: string): string { } /** - * Resolve a reliable MIME type from a file, falling back to extension - * when the browser reports empty or generic `application/octet-stream` + * Resolve a reliable MIME type from a file, falling back to the extension map + * when the browser reports an empty type. By default treats + * `application/octet-stream` as "unknown" and falls back to the extension — + * pass `{ preserveOctetStream: true }` for direct PUT uploads where the + * browser-supplied content-type must match the presigned handshake exactly. + */ +export function resolveFileType( + file: { type: string; name: string }, + options?: { preserveOctetStream?: boolean } +): string { + const browserType = file.type?.trim() + if (browserType) { + if (options?.preserveOctetStream || browserType !== 'application/octet-stream') { + return browserType + } + } + return getMimeTypeFromExtension(getFileExtension(file.name)) +} + +/** + * Upload `Content-Type` for direct PUT — preserves the browser's reported type + * verbatim (including `application/octet-stream`) so it matches the presigned + * URL's signed Content-Type header. + */ +export function getFileContentType(file: File): string { + return resolveFileType(file, { preserveOctetStream: true }) +} + +/** + * Whether `error` is a DOM `AbortError` (XHR `abort()`, fetch `signal.aborted`, + * etc). Used in upload retry loops so aborts short-circuit instead of retrying. */ -export function resolveFileType(file: { type: string; name: string }): string { - return file.type && file.type !== 'application/octet-stream' - ? 
file.type
-    : getMimeTypeFromExtension(getFileExtension(file.name))
+export function isAbortError(error: unknown): boolean {
+  return (
+    typeof error === 'object' &&
+    error !== null &&
+    'name' in error &&
+    String((error as { name?: unknown }).name) === 'AbortError'
+  )
+}
+
+/**
+ * Heuristic: whether `error` is a transient network/connection failure that's
+ * worth retrying (vs. a deterministic 4xx/auth/validation error). Sniffs the
+ * message because browsers and servers report these without standardized codes.
+ */
+export function isNetworkError(error: unknown): boolean {
+  if (!(error instanceof Error)) return false
+  const message = error.message.toLowerCase()
+  return (
+    message.includes('network') ||
+    message.includes('fetch') ||
+    message.includes('connection') ||
+    message.includes('timeout') ||
+    message.includes('timed out') ||
+    message.includes('econnreset')
+  )
+}
 
 const MIME_TO_EXTENSION: Record<string, string> = {
diff --git a/packages/testing/src/mocks/index.ts b/packages/testing/src/mocks/index.ts
index 22ecd6449df..c945ebf3bfc 100644
--- a/packages/testing/src/mocks/index.ts
+++ b/packages/testing/src/mocks/index.ts
@@ -86,6 +86,8 @@ export {
 } from './logging-session.mock'
 // Permission mocks
 export { permissionsMock, permissionsMockFns } from './permissions.mock'
+// PostHog server mocks (for @/lib/posthog/server)
+export { posthogServerMock, posthogServerMockFns } from './posthog-server.mock'
 // Redis client mocks (for Redis client objects)
 export { clearRedisMocks, createMockRedis, type MockRedis } from './redis.mock'
 // Redis config mocks (for @/lib/core/config/redis)
@@ -106,8 +108,10 @@ export {
   type MockSocket,
   type MockSocketServer,
 } from './socket.mock'
-// Storage mocks
+// Storage mocks (browser localStorage/sessionStorage)
 export { clearStorageMocks, createMockStorage, setupGlobalStorageMocks } from './storage.mock'
+// Storage service mocks (for @/lib/uploads/core/storage-service)
+export { storageServiceMock, storageServiceMockFns } from './storage-service.mock'
 // Stripe mocks
 export {
   createMockStripeEvent,
diff --git a/packages/testing/src/mocks/posthog-server.mock.ts b/packages/testing/src/mocks/posthog-server.mock.ts
new file mode 100644
index 00000000000..1c8a01716ea
--- /dev/null
+++ b/packages/testing/src/mocks/posthog-server.mock.ts
@@ -0,0 +1,30 @@
+import { vi } from 'vitest'
+
+/**
+ * Controllable mock functions for `@/lib/posthog/server`.
+ * All defaults are bare `vi.fn()` — configure per-test as needed.
+ *
+ * @example
+ * ```ts
+ * import { posthogServerMockFns } from '@sim/testing'
+ *
+ * expect(posthogServerMockFns.mockCaptureServerEvent).toHaveBeenCalledWith(...)
+ * ```
+ */
+export const posthogServerMockFns = {
+  mockCaptureServerEvent: vi.fn(),
+  mockGetPostHogClient: vi.fn(() => null),
+}
+
+/**
+ * Static mock module for `@/lib/posthog/server`.
+ *
+ * @example
+ * ```ts
+ * vi.mock('@/lib/posthog/server', () => posthogServerMock)
+ * ```
+ */
+export const posthogServerMock = {
+  captureServerEvent: posthogServerMockFns.mockCaptureServerEvent,
+  getPostHogClient: posthogServerMockFns.mockGetPostHogClient,
+}
diff --git a/packages/testing/src/mocks/storage-service.mock.ts b/packages/testing/src/mocks/storage-service.mock.ts
new file mode 100644
index 00000000000..effa9666f11
--- /dev/null
+++ b/packages/testing/src/mocks/storage-service.mock.ts
@@ -0,0 +1,47 @@
+import { vi } from 'vitest'
+
+/**
+ * Controllable mock functions for `@/lib/uploads/core/storage-service`.
+ * All defaults are bare `vi.fn()` — configure per-test as needed.
+ * + * @example + * ```ts + * import { storageServiceMockFns } from '@sim/testing' + * + * storageServiceMockFns.mockHasCloudStorage.mockReturnValue(true) + * storageServiceMockFns.mockGeneratePresignedUploadUrl.mockResolvedValue({ + * uploadUrl: 'https://s3/test', key: 'workspace/x/y', ... + * }) + * ``` + */ +export const storageServiceMockFns = { + mockUploadFile: vi.fn(), + mockDownloadFile: vi.fn(), + mockDeleteFile: vi.fn(), + mockHeadObject: vi.fn(), + mockGeneratePresignedUploadUrl: vi.fn(), + mockGenerateBatchPresignedUploadUrls: vi.fn(), + mockGeneratePresignedDownloadUrl: vi.fn(), + mockHasCloudStorage: vi.fn(() => false), + mockGetS3InfoForKey: vi.fn(), +} + +/** + * Static mock module for `@/lib/uploads/core/storage-service`. + * + * @example + * ```ts + * vi.mock('@/lib/uploads/core/storage-service', () => storageServiceMock) + * ``` + */ +export const storageServiceMock = { + uploadFile: storageServiceMockFns.mockUploadFile, + downloadFile: storageServiceMockFns.mockDownloadFile, + deleteFile: storageServiceMockFns.mockDeleteFile, + headObject: storageServiceMockFns.mockHeadObject, + generatePresignedUploadUrl: storageServiceMockFns.mockGeneratePresignedUploadUrl, + generateBatchPresignedUploadUrls: storageServiceMockFns.mockGenerateBatchPresignedUploadUrls, + generatePresignedDownloadUrl: storageServiceMockFns.mockGeneratePresignedDownloadUrl, + hasCloudStorage: storageServiceMockFns.mockHasCloudStorage, + getS3InfoForKey: storageServiceMockFns.mockGetS3InfoForKey, +} diff --git a/scripts/check-api-validation-contracts.ts b/scripts/check-api-validation-contracts.ts index 05dad203c3b..07f0e43802b 100644 --- a/scripts/check-api-validation-contracts.ts +++ b/scripts/check-api-validation-contracts.ts @@ -9,8 +9,8 @@ const QUERY_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/queries') const SELECTOR_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/selectors') const BASELINE = { - totalRoutes: 722, - zodRoutes: 722, + totalRoutes: 724, + zodRoutes: 724, nonZodRoutes: 0, } as const