Skip to content

Commit a1b2130

Browse files
authored
improvement(knowledge): batch trigger dispatch, prune redundant DB roundtrips (#4680)
* improvement(knowledge): batch trigger dispatch, prune redundant DB roundtrips Connector sync was dispatching Trigger.dev document-processing jobs one HTTP roundtrip at a time. processDocumentsWithQueue now uses tasks.batchTrigger when Trigger.dev is available, collapsing N roundtrips to ceil(N/1000). Idempotency keys protect against duplicate runs on retry. Also trims DB roundtrips inside the sync loop: - Per-batch isConnectorDeleted + isKnowledgeBaseDeleted collapsed into a single checkSyncLiveness JOIN (one SELECT instead of two per batch). - Dropped redundant pre-upload isKnowledgeBaseDeleted checks from addDocument/updateDocument: the batch-boundary liveness check already catches pre-batch deletions and the in-tx FOR UPDATE is authoritative for races during the batch. - Removed dead processDocumentsWithTrigger helper (never called). * refactor(knowledge): split dispatch helpers, drop dead trigger branch - Use the canonical DocumentProcessingPayload from the task module instead of the duplicate DocumentJobData interface in service.ts - Pass typeof processDocumentTask as a generic to tasks.batchTrigger so the payload shape is type-checked against the task definition - Inline TRIGGER_BATCH_SIZE provenance (Trigger.dev SDK 4.3.1+ doc'd cap, we're on 4.4.3) - Split direct vs trigger dispatch into dispatchInProcess and dispatchViaBatchTrigger; collapse the all-failed throw into a single check on the combined dispatched counter - Remove dispatchDocumentProcessingJob — its trigger branch is no longer reachable now that batchTrigger handles the trigger path, and the direct branch is inlined * improvement(knowledge): log Trigger.dev batchIds for audit trail tasks.batchTrigger returns a batchId per call. Collecting and logging them after dispatch makes it possible to look up or cancel batches in the Trigger.dev dashboard when investigating stuck or missing documents. * improvement(knowledge): thread requestId through direct dispatch logs Symmetry polish: dispatchInProcess now includes [requestId] in its error log so direct-mode failures are correlatable the same way trigger-mode failures already are. * improvement(knowledge): trim verbose comments Tightens TSDoc on processDocumentsWithQueue, TRIGGER_BATCH_SIZE, checkSyncLiveness, and the idempotency-key inline comment.
1 parent c381550 commit a1b2130

2 files changed

Lines changed: 120 additions & 123 deletions

File tree

apps/sim/lib/knowledge/connectors/sync-engine.ts

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,30 @@ type DocOp =
5858
| { type: 'add'; extDoc: ExternalDocument }
5959
| { type: 'update'; existingId: string; extDoc: ExternalDocument }
6060

61-
async function isConnectorDeleted(connectorId: string): Promise<boolean> {
61+
/** Single-roundtrip liveness check used between batches. */
62+
async function checkSyncLiveness(
63+
connectorId: string,
64+
knowledgeBaseId: string
65+
): Promise<{ connectorDeleted: boolean; knowledgeBaseDeleted: boolean }> {
6266
const rows = await db
63-
.select({ archivedAt: knowledgeConnector.archivedAt, deletedAt: knowledgeConnector.deletedAt })
67+
.select({
68+
connectorArchivedAt: knowledgeConnector.archivedAt,
69+
connectorDeletedAt: knowledgeConnector.deletedAt,
70+
kbDeletedAt: knowledgeBase.deletedAt,
71+
})
6472
.from(knowledgeConnector)
65-
.where(eq(knowledgeConnector.id, connectorId))
73+
.innerJoin(knowledgeBase, eq(knowledgeBase.id, knowledgeConnector.knowledgeBaseId))
74+
.where(and(eq(knowledgeConnector.id, connectorId), eq(knowledgeBase.id, knowledgeBaseId)))
6675
.limit(1)
67-
return rows.length === 0 || rows[0].archivedAt !== null || rows[0].deletedAt !== null
68-
}
6976

70-
async function isKnowledgeBaseDeleted(knowledgeBaseId: string): Promise<boolean> {
71-
const rows = await db
72-
.select({ deletedAt: knowledgeBase.deletedAt })
73-
.from(knowledgeBase)
74-
.where(eq(knowledgeBase.id, knowledgeBaseId))
75-
.limit(1)
76-
return rows.length === 0 || rows[0].deletedAt !== null
77+
if (rows.length === 0) {
78+
return { connectorDeleted: true, knowledgeBaseDeleted: true }
79+
}
80+
const row = rows[0]
81+
return {
82+
connectorDeleted: row.connectorArchivedAt !== null || row.connectorDeletedAt !== null,
83+
knowledgeBaseDeleted: row.kbDeletedAt !== null,
84+
}
7785
}
7886

7987
async function isKnowledgeBaseActiveInTx(
@@ -502,10 +510,11 @@ export async function executeSync(
502510
}
503511

504512
for (let i = 0; i < pendingOps.length; i += SYNC_BATCH_SIZE) {
505-
if (await isConnectorDeleted(connectorId)) {
513+
const liveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId)
514+
if (liveness.connectorDeleted) {
506515
throw new ConnectorDeletedException(connectorId)
507516
}
508-
if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) {
517+
if (liveness.knowledgeBaseDeleted) {
509518
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
510519
}
511520

@@ -642,11 +651,12 @@ export async function executeSync(
642651
}
643652
}
644653

645-
// Check if connector was deleted before retrying stuck documents
646-
if (await isConnectorDeleted(connectorId)) {
654+
// Check if connector/KB were deleted before retrying stuck documents
655+
const postBatchLiveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId)
656+
if (postBatchLiveness.connectorDeleted) {
647657
throw new ConnectorDeletedException(connectorId)
648658
}
649-
if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) {
659+
if (postBatchLiveness.knowledgeBaseDeleted) {
650660
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
651661
}
652662

@@ -881,9 +891,6 @@ async function addDocument(
881891
extDoc: ExternalDocument,
882892
sourceConfig?: Record<string, unknown>
883893
): Promise<DocumentData> {
884-
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
885-
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
886-
}
887894
const documentId = generateId()
888895
const contentBuffer = Buffer.from(extDoc.content, 'utf-8')
889896
const safeTitle = sanitizeStorageTitle(extDoc.title)
@@ -963,9 +970,6 @@ async function updateDocument(
963970
extDoc: ExternalDocument,
964971
sourceConfig?: Record<string, unknown>
965972
): Promise<DocumentData> {
966-
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
967-
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
968-
}
969973
// Fetch old file URL before uploading replacement
970974
const existingRows = await db
971975
.select({ fileUrl: document.fileUrl })

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 93 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators'
4848
import { deleteFile } from '@/lib/uploads/core/storage-service'
4949
import { extractStorageKey } from '@/lib/uploads/utils/file-utils'
5050
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
51-
import type { DocumentProcessingPayload } from '@/background/knowledge-processing'
51+
import type {
52+
DocumentProcessingPayload,
53+
processDocument as processDocumentTask,
54+
} from '@/background/knowledge-processing'
5255
import { calculateCost } from '@/providers/utils'
5356

5457
const logger = createLogger('DocumentService')
@@ -102,35 +105,6 @@ export interface ProcessingOptions {
102105
lang?: string
103106
}
104107

105-
interface DocumentJobData {
106-
knowledgeBaseId: string
107-
documentId: string
108-
docData: {
109-
filename: string
110-
fileUrl: string
111-
fileSize: number
112-
mimeType: string
113-
}
114-
processingOptions: ProcessingOptions
115-
requestId: string
116-
}
117-
118-
async function dispatchDocumentProcessingJob(payload: DocumentJobData): Promise<void> {
119-
if (isTriggerAvailable()) {
120-
await tasks.trigger('knowledge-process-document', payload, {
121-
tags: [`knowledgeBaseId:${payload.knowledgeBaseId}`, `documentId:${payload.documentId}`],
122-
})
123-
return
124-
}
125-
126-
await processDocumentAsync(
127-
payload.knowledgeBaseId,
128-
payload.documentId,
129-
payload.docData,
130-
payload.processingOptions
131-
)
132-
}
133-
134108
interface DocumentTagData {
135109
tagName: string
136110
fieldType: string
@@ -314,13 +288,16 @@ async function processDocumentTags(
314288
return result
315289
}
316290

317-
export async function processDocumentsWithQueue(
318-
createdDocuments: DocumentData[],
291+
/** Per-call cap for `tasks.batchTrigger` on Trigger.dev SDK 4.3.1+. */
292+
const TRIGGER_BATCH_SIZE = 1000
293+
294+
function buildJobPayload(
295+
doc: DocumentData,
319296
knowledgeBaseId: string,
320297
processingOptions: ProcessingOptions,
321298
requestId: string
322-
): Promise<void> {
323-
const jobPayloads = createdDocuments.map<DocumentJobData>((doc) => ({
299+
): DocumentProcessingPayload {
300+
return {
324301
knowledgeBaseId,
325302
documentId: doc.documentId,
326303
docData: {
@@ -331,35 +308,99 @@ export async function processDocumentsWithQueue(
331308
},
332309
processingOptions,
333310
requestId,
334-
}))
311+
}
312+
}
335313

336-
logger.info(
337-
`[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`,
338-
{
339-
backend: isTriggerAvailable() ? 'trigger-dev' : 'direct',
340-
}
314+
/**
315+
* Dispatches document processing jobs via Trigger.dev's `batchTrigger` when
316+
* available, or in-process otherwise. Throws only when every dispatch fails;
317+
* partial failures are logged and recovered by the next sync's stuck-doc pass.
318+
*/
319+
export async function processDocumentsWithQueue(
320+
createdDocuments: DocumentData[],
321+
knowledgeBaseId: string,
322+
processingOptions: ProcessingOptions,
323+
requestId: string
324+
): Promise<void> {
325+
if (createdDocuments.length === 0) return
326+
327+
const jobPayloads = createdDocuments.map((doc) =>
328+
buildJobPayload(doc, knowledgeBaseId, processingOptions, requestId)
341329
)
342330

343-
const results = await Promise.allSettled(
344-
jobPayloads.map((payload) => dispatchDocumentProcessingJob(payload))
331+
const useTrigger = isTriggerAvailable()
332+
logger.info(
333+
`[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`,
334+
{ backend: useTrigger ? 'trigger-dev' : 'direct' }
345335
)
346336

347-
const failures = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected')
348-
if (failures.length > 0) {
349-
logger.error(`[${requestId}] ${failures.length}/${results.length} document dispatches failed`, {
350-
errors: failures.map((f) => getErrorMessage(f.reason)),
351-
})
352-
}
337+
const dispatched = useTrigger
338+
? await dispatchViaBatchTrigger(jobPayloads, requestId)
339+
: await dispatchInProcess(jobPayloads, requestId)
353340

354341
logger.info(
355-
`[${requestId}] Document dispatch complete: ${results.length - failures.length}/${results.length} succeeded`
342+
`[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded`
356343
)
357344

358-
if (failures.length === results.length) {
359-
throw new Error(`All ${failures.length} document processing dispatches failed`)
345+
if (dispatched === 0) {
346+
throw new Error(`All ${jobPayloads.length} document processing dispatches failed`)
347+
}
348+
}
349+
350+
async function dispatchViaBatchTrigger(
351+
jobPayloads: DocumentProcessingPayload[],
352+
requestId: string
353+
): Promise<number> {
354+
let dispatched = 0
355+
const batchIds: string[] = []
356+
for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) {
357+
const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE)
358+
try {
359+
const result = await tasks.batchTrigger<typeof processDocumentTask>(
360+
'knowledge-process-document',
361+
chunk.map((payload) => ({
362+
payload,
363+
options: {
364+
// Scoped to (documentId, requestId): blocks intra-dispatch retries
365+
// from double-enqueuing; later syncs use a fresh requestId.
366+
idempotencyKey: `doc-process-${payload.documentId}-${requestId}`,
367+
tags: [
368+
`knowledgeBaseId:${payload.knowledgeBaseId}`,
369+
`documentId:${payload.documentId}`,
370+
],
371+
},
372+
}))
373+
)
374+
batchIds.push(result.batchId)
375+
dispatched += chunk.length
376+
} catch (error) {
377+
logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, {
378+
error: getErrorMessage(error),
379+
})
380+
}
381+
}
382+
if (batchIds.length > 0) {
383+
logger.info(`[${requestId}] Trigger.dev batches dispatched`, { batchIds })
360384
}
385+
return dispatched
386+
}
361387

362-
return
388+
async function dispatchInProcess(
389+
jobPayloads: DocumentProcessingPayload[],
390+
requestId: string
391+
): Promise<number> {
392+
const results = await Promise.allSettled(
393+
jobPayloads.map((p) =>
394+
processDocumentAsync(p.knowledgeBaseId, p.documentId, p.docData, p.processingOptions)
395+
)
396+
)
397+
let dispatched = 0
398+
for (const r of results) {
399+
if (r.status === 'fulfilled') dispatched++
400+
else
401+
logger.error(`[${requestId}] Document dispatch failed`, { error: getErrorMessage(r.reason) })
402+
}
403+
return dispatched
363404
}
364405

365406
export async function processDocumentAsync(
@@ -698,54 +739,6 @@ export function isTriggerAvailable(): boolean {
698739
return Boolean(env.TRIGGER_SECRET_KEY) && isTriggerDevEnabled
699740
}
700741

701-
async function processDocumentsWithTrigger(
702-
documents: DocumentProcessingPayload[],
703-
requestId: string
704-
): Promise<{ success: boolean; message: string; batchIds?: string[] }> {
705-
if (!isTriggerAvailable()) {
706-
throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
707-
}
708-
709-
try {
710-
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
711-
712-
const MAX_BATCH_SIZE = 1000
713-
const batchIds: string[] = []
714-
715-
for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) {
716-
const chunk = documents.slice(i, i + MAX_BATCH_SIZE)
717-
const batchResult = await tasks.batchTrigger(
718-
'knowledge-process-document',
719-
chunk.map((doc) => ({
720-
payload: doc,
721-
options: {
722-
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
723-
tags: [`knowledgeBaseId:${doc.knowledgeBaseId}`, `documentId:${doc.documentId}`],
724-
},
725-
}))
726-
)
727-
batchIds.push(batchResult.batchId)
728-
}
729-
730-
logger.info(
731-
`[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)`
732-
)
733-
734-
return {
735-
success: true,
736-
message: `${documents.length} document processing jobs triggered`,
737-
batchIds,
738-
}
739-
} catch (error) {
740-
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
741-
742-
return {
743-
success: false,
744-
message: getErrorMessage(error, 'Failed to trigger background jobs'),
745-
}
746-
}
747-
}
748-
749742
export async function createDocumentRecords(
750743
documents: Array<{
751744
filename: string

0 commit comments

Comments
 (0)