diff --git a/.server-changes/ck-index-master-queue-dedup.md b/.server-changes/ck-index-master-queue-dedup.md
new file mode 100644
index 00000000000..a2ff6495e61
--- /dev/null
+++ b/.server-changes/ck-index-master-queue-dedup.md
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Concurrency-keyed queues now use a single master queue entry per base queue instead of one entry per key. This prevents high-CK-count tenants from consuming the entire parentQueueLimit window and starving other tenants on the same shard.
diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts
index 7ebfaf660d6..c3b3dab8e84 100644
--- a/internal-packages/run-engine/src/run-queue/index.ts
+++ b/internal-packages/run-engine/src/run-queue/index.ts
@@ -636,6 +636,9 @@ export class RunQueue {
     const queueKey = this.keys.queueKey(env, message.queue, concurrencyKey);

+    // For CK queues, use wildcard dedup so all CKs share one worker queue processing job
+    const dedupQueueKey = concurrencyKey ? this.keys.toCkWildcard(queueKey) : queueKey;
+
     propagation.inject(context.active(), message);

     span.setAttributes({
@@ -656,10 +659,10 @@ export class RunQueue {
       if (!skipDequeueProcessing) {
         // This will move the message to the worker queue so it can be dequeued
         await this.worker.enqueueOnce({
-          id: queueKey, // dedupe by environment, queue, and concurrency key
+          id: dedupQueueKey, // dedupe by environment and base queue (CK wildcard for CK queues)
           job: "processQueueForWorkerQueue",
           payload: {
-            queueKey,
+            queueKey: dedupQueueKey,
             environmentId: env.id,
           },
           // Add a small delay to dedupe messages so at most one of these will be processed,
@@ -768,12 +771,17 @@ export class RunQueue {
       });

       if (!options?.skipDequeueProcessing) {
+        // For CK queues, use wildcard dedup so all CKs share one worker queue processing job
+        const dedupQueueKey = message.concurrencyKey
+          ? this.keys.toCkWildcard(message.queue)
+          : message.queue;
+
         // This will move the message to the worker queue so it can be dequeued
         await this.worker.enqueueOnce({
-          id: message.queue, // dedupe by environment, queue, and concurrency key
+          id: dedupQueueKey, // dedupe by environment and base queue
           job: "processQueueForWorkerQueue",
           payload: {
-            queueKey: message.queue,
+            queueKey: dedupQueueKey,
             environmentId: message.environmentId,
           },
           // Add a small delay to dedupe messages so at most one of these will be processed,
@@ -848,12 +856,17 @@ export class RunQueue {
       }

       if (!skipDequeueProcessing) {
+        // For CK queues, use wildcard dedup so all CKs share one worker queue processing job
+        const dedupQueueKey = message.concurrencyKey
+          ? this.keys.toCkWildcard(message.queue)
+          : message.queue;
+
         // This will move the message to the worker queue so it can be dequeued
         await this.worker.enqueueOnce({
-          id: message.queue, // dedupe by environment, queue, and concurrency key
+          id: dedupQueueKey, // dedupe by environment and base queue
           job: "processQueueForWorkerQueue",
           payload: {
-            queueKey: message.queue,
+            queueKey: dedupQueueKey,
             environmentId: message.environmentId,
           },
           // Add a small delay to dedupe messages so at most one of these will be processed,
@@ -1374,9 +1387,15 @@ export class RunQueue {
     const keyPrefix = this.options.redis.keyPrefix ?? "";
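     // keyPrefix may be ""; it is forwarded to the Lua script, presumably so it can rebuild fully-prefixed keys from the bare queue names below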
""; for (const [masterQueueKey, queueNames] of queuesByMasterKey) { - // Deduplicate queue names within each master queue shard - const uniqueQueueNames = [...new Set(queueNames)]; - pipeline.migrateLegacyMasterQueues(masterQueueKey, keyPrefix, ...uniqueQueueNames); + // For CK queues, skip the legacy rebalance — the CK index was already + // updated inside the Lua script, and migrateLegacyMasterQueues would + // re-add the concrete :ck:bar member to the master queue. + // Only rebalance non-CK queues here. + const nonCkQueues = queueNames.filter((q) => !q.includes(":ck:")); + if (nonCkQueues.length > 0) { + const uniqueQueueNames = [...new Set(nonCkQueues)]; + pipeline.migrateLegacyMasterQueues(masterQueueKey, keyPrefix, ...uniqueQueueNames); + } } await pipeline.exec(); @@ -1453,6 +1472,40 @@ export class RunQueue { return this.#processMasterQueueShard(shard, environmentId, maxCount); } + // Test-only: directly dequeue from a queue (CK or non-CK) without enqueuing to worker queue + async testDequeueFromMasterQueue( + shard: number, + environmentId: string, + maxCount: number = 10 + ): Promise { + const masterQueueKey = this.keys.masterQueueKeyForShard(shard); + const envQueues = await this.queueSelectionStrategy.distributeFairQueuesFromParentQueue( + masterQueueKey, + environmentId + ); + + const allMessages: DequeuedMessage[] = []; + + for (const env of envQueues) { + for (const queue of env.queues) { + const messages = this.keys.isCkWildcard(queue) + ? await this.#callDequeueMessagesFromCkQueue({ + ckWildcardQueue: queue, + shard, + maxCount, + }) + : await this.#callDequeueMessagesFromQueue({ + messageQueue: queue, + shard, + maxCount, + }); + allMessages.push(...messages); + } + } + + return allMessages; + } + async #processMasterQueueShard(shard: number, consumerId: string, maxCount: number = 10) { return this.#trace( "processMasterQueueShard", @@ -1498,14 +1551,20 @@ export class RunQueue { attemptedQueues++; // Attempt to dequeue from this queue - const [error, messages] = await tryCatch( - this.#callDequeueMessagesFromQueue({ - messageQueue: queue, - shard, - // TODO: make this configurable - maxCount, - }) - ); + // Dispatch CK wildcard queues to the CK-aware dequeue path + const dequeuePromise = this.keys.isCkWildcard(queue) + ? this.#callDequeueMessagesFromCkQueue({ + ckWildcardQueue: queue, + shard, + maxCount, + }) + : this.#callDequeueMessagesFromQueue({ + messageQueue: queue, + shard, + maxCount, + }); + + const [error, messages] = await tryCatch(dequeuePromise); if (error) { this.logger.error( @@ -1519,6 +1578,11 @@ export class RunQueue { continue; } + if (messages.length > 0) { + // Reset cooloff state on successful dequeue + this._queueCooloffStates.delete(queue); + } + if (messages.length === 0) { if (cooloffState._tag === "normal") { const cooloffCountThreshold = Math.max( @@ -1589,11 +1653,18 @@ export class RunQueue { service: this.name, }); - const messages = await this.#callDequeueMessagesFromQueue({ - messageQueue: queueKey, - shard, - maxCount: 10, - }); + // Dispatch CK wildcard queues to the CK-aware dequeue path + const messages = this.keys.isCkWildcard(queueKey) + ? 
+      const messages = this.keys.isCkWildcard(queueKey)
+        ? await this.#callDequeueMessagesFromCkQueue({
+            ckWildcardQueue: queueKey,
+            shard,
+            maxCount: 10,
+          })
+        : await this.#callDequeueMessagesFromQueue({
+            messageQueue: queueKey,
+            shard,
+            maxCount: 10,
+          });

      if (messages.length === 0) {
        return;
      }
@@ -1682,7 +1753,50 @@ export class RunQueue {
        service: this.name,
      });

-      if (ttlInfo) {
+      // Use CK-aware enqueue for messages with concurrency keys
+      if (message.concurrencyKey) {
+        const ckIndexKey = this.keys.ckIndexKeyFromQueue(message.queue);
+        const ckWildcardName = this.keys.toCkWildcard(message.queue);
+
+        if (ttlInfo) {
+          await this.redis.enqueueMessageWithTtlCk(
+            masterQueueKey,
+            queueKey,
+            messageKey,
+            queueCurrentConcurrencyKey,
+            envCurrentConcurrencyKey,
+            queueCurrentDequeuedKey,
+            envCurrentDequeuedKey,
+            envQueueKey,
+            ttlInfo.ttlQueueKey,
+            ckIndexKey,
+            queueName,
+            messageId,
+            messageData,
+            messageScore,
+            ttlInfo.ttlMember,
+            String(ttlInfo.ttlExpiresAt),
+            ckWildcardName
+          );
+        } else {
+          await this.redis.enqueueMessageCk(
+            masterQueueKey,
+            queueKey,
+            messageKey,
+            queueCurrentConcurrencyKey,
+            envCurrentConcurrencyKey,
+            queueCurrentDequeuedKey,
+            envCurrentDequeuedKey,
+            envQueueKey,
+            ckIndexKey,
+            queueName,
+            messageId,
+            messageData,
+            messageScore,
+            ckWildcardName
+          );
+        }
+      } else if (ttlInfo) {
         // Use the TTL-aware enqueue that atomically adds to both queues
         await this.redis.enqueueMessageWithTtl(
           masterQueueKey,
@@ -1843,6 +1957,117 @@ export class RunQueue {
     });
   }

+  async #callDequeueMessagesFromCkQueue({
+    ckWildcardQueue,
+    shard,
+    maxCount,
+  }: {
+    ckWildcardQueue: string;
+    shard: number;
+    maxCount: number;
+  }): Promise<DequeuedMessage[]> {
+    return this.#trace("callDequeueMessagesFromCkQueue", async (span) => {
+      span.setAttributes({
+        ckWildcardQueue,
+        shard,
+        maxCount,
+      });
+
+      const ckIndexKey = this.keys.ckIndexKeyFromQueue(ckWildcardQueue);
+      const queueConcurrencyLimitKey =
+        this.keys.queueConcurrencyLimitKeyFromQueue(ckWildcardQueue);
+      const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKeyFromQueue(ckWildcardQueue);
+      const envConcurrencyLimitBurstFactorKey =
+        this.keys.envConcurrencyLimitBurstFactorKeyFromQueue(ckWildcardQueue);
+      const envCurrentConcurrencyKey =
+        this.keys.envCurrentConcurrencyKeyFromQueue(ckWildcardQueue);
+      const messageKeyPrefix = this.keys.messageKeyPrefixFromQueue(ckWildcardQueue);
+      const envQueueKey = this.keys.envQueueKeyFromQueue(ckWildcardQueue);
+      const masterQueueKey = this.keys.masterQueueKeyForShard(shard);
+
+      // Get TTL queue key if TTL system is enabled
+      const ttlShardCount = this.options.ttlSystem?.shardCount ?? this.shardCount;
+      const ttlShard = this.keys.masterQueueShardForEnvironment(
+        this.keys.envIdFromQueue(ckWildcardQueue),
+        ttlShardCount
+      );
+      const ttlQueueKey = this.options.ttlSystem
+        ? this.keys.ttlQueueKeyForShard(ttlShard)
+        : "";
+
+      this.logger.debug("#callDequeueMessagesFromCkQueue", {
+        ckWildcardQueue,
+        ckIndexKey,
+        queueConcurrencyLimitKey,
+        envConcurrencyLimitKey,
+        envCurrentConcurrencyKey,
+        messageKeyPrefix,
+        envQueueKey,
+        masterQueueKey,
+        ttlQueueKey,
+        shard,
+        maxCount,
+      });
+
+      const result = await this.redis.dequeueMessagesFromCkQueue(
+        //keys
+        ckIndexKey,
+        queueConcurrencyLimitKey,
+        envConcurrencyLimitKey,
+        envConcurrencyLimitBurstFactorKey,
+        envCurrentConcurrencyKey,
+        messageKeyPrefix,
+        envQueueKey,
+        masterQueueKey,
+        ttlQueueKey,
+        //args
+        ckWildcardQueue,
+        String(Date.now()),
+        String(this.options.defaultEnvConcurrency),
+        String(this.options.defaultEnvConcurrencyBurstFactor ??
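+        // burst factor defaults to 1 (no burst headroom) when not configured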
1), + this.options.redis.keyPrefix ?? "", + String(maxCount) + ); + + if (!result) { + span.setAttribute("message_count", 0); + return []; + } + + this.logger.debug("dequeueMessagesFromCkQueue raw result", { + result, + service: this.name, + }); + + const messages = []; + for (let i = 0; i < result.length; i += 3) { + const messageId = result[i]; + const messageScore = result[i + 1]; + const rawMessage = result[i + 2]; + + const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage)); + if (!parsedMessage.success) { + this.logger.error(`[${this.name}] Failed to parse CK message`, { + messageId, + error: parsedMessage.error, + service: this.name, + }); + continue; + } + + messages.push({ + messageId, + messageScore, + message: parsedMessage.data, + }); + } + + const filteredMessages = messages.filter(Boolean) as DequeuedMessage[]; + span.setAttribute("message_count", filteredMessages.length); + return filteredMessages; + }); + } + async #callDequeueMessageFromWorkerQueue({ workerQueue, blockingPop, @@ -2026,6 +2251,29 @@ export class RunQueue { service: this.name, }); + if (message.concurrencyKey) { + const ckIndexKey = this.keys.ckIndexKeyFromQueue(message.queue); + const ckWildcardName = this.keys.toCkWildcard(message.queue); + + return this.redis.acknowledgeMessageCk( + masterQueueKey, + messageKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + workerQueueKey, + ckIndexKey, + messageId, + messageQueue, + messageKeyValue, + removeFromWorkerQueue ? "1" : "0", + ckWildcardName + ); + } + return this.redis.acknowledgeMessage( masterQueueKey, messageKey, @@ -2114,22 +2362,46 @@ export class RunQueue { service: this.name, }); - await this.redis.nackMessage( - //keys - masterQueueKey, - messageKey, - messageQueue, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - //args - messageId, - messageQueue, - JSON.stringify(message), - String(messageScore) - ); + if (message.concurrencyKey) { + const ckIndexKey = this.keys.ckIndexKeyFromQueue(message.queue); + const ckWildcardName = this.keys.toCkWildcard(message.queue); + + await this.redis.nackMessageCk( + //keys + masterQueueKey, + messageKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + ckIndexKey, + //args + messageId, + messageQueue, + JSON.stringify(message), + String(messageScore), + ckWildcardName + ); + } else { + await this.redis.nackMessage( + //keys + masterQueueKey, + messageKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + //args + messageId, + messageQueue, + JSON.stringify(message), + String(messageScore) + ); + } } async #callMoveToDeadLetterQueue({ message }: { message: OutputPayload }) { @@ -2147,19 +2419,40 @@ export class RunQueue { this.shardCount ); - await this.redis.moveToDeadLetterQueue( - masterQueueKey, - messageKey, - messageQueue, - queueCurrentConcurrencyKey, - envCurrentConcurrencyKey, - queueCurrentDequeuedKey, - envCurrentDequeuedKey, - envQueueKey, - deadLetterQueueKey, - messageId, - messageQueue - ); + if (message.concurrencyKey) { + const ckIndexKey = this.keys.ckIndexKeyFromQueue(message.queue); + const ckWildcardName = this.keys.toCkWildcard(message.queue); + + await this.redis.moveToDeadLetterQueueCk( + masterQueueKey, + messageKey, + 
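+        // the first ten positional args map to KEYS[1..10] of the moveToDeadLetterQueueCk Lua script; the rest are ARGV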
messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + deadLetterQueueKey, + ckIndexKey, + messageId, + messageQueue, + ckWildcardName + ); + } else { + await this.redis.moveToDeadLetterQueue( + masterQueueKey, + messageKey, + messageQueue, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + envQueueKey, + deadLetterQueueKey, + messageId, + messageQueue + ); + } } #callUpdateEnvironmentConcurrencyLimits({ @@ -2594,34 +2887,144 @@ redis.call('SREM', envCurrentDequeuedKey, messageId) `, }); - // Expire TTL runs - atomically removes from TTL set, acknowledges from normal queue, and enqueues to TTL worker - this.redis.defineCommand("expireTtlRuns", { - numberOfKeys: 1, + // CK-aware enqueue: adds to CK index + master queue with :ck:* member + this.redis.defineCommand("enqueueMessageCk", { + numberOfKeys: 9, lua: ` -local ttlQueueKey = KEYS[1] -local keyPrefix = ARGV[1] -local currentTime = tonumber(ARGV[2]) -local batchSize = tonumber(ARGV[3]) -local shardCount = tonumber(ARGV[4]) -local workerQueueKey = ARGV[5] -local workerItemsKey = ARGV[6] -local visibilityTimeoutMs = tonumber(ARGV[7]) +local masterQueueKey = KEYS[1] +local queueKey = KEYS[2] +local messageKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] +local ckIndexKey = KEYS[9] --- Get expired runs from TTL sorted set (score <= currentTime) -local expiredMembers = redis.call('ZRANGEBYSCORE', ttlQueueKey, '-inf', currentTime, 'LIMIT', 0, batchSize) +local queueName = ARGV[1] +local messageId = ARGV[2] +local messageData = ARGV[3] +local messageScore = ARGV[4] +local ckWildcardName = ARGV[5] -if #expiredMembers == 0 then - return {} +-- Write the message to the message key +redis.call('SET', messageKey, messageData) + +-- Add the message to the CK-specific queue +redis.call('ZADD', queueKey, messageScore, messageId) + +-- Add the message to the env queue +redis.call('ZADD', envQueueKey, messageScore, messageId) + +-- Rebalance CK index +local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') +if #earliest > 0 then + redis.call('ZADD', ckIndexKey, earliest[2], queueName) end -local time = redis.call('TIME') -local nowMs = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000) +-- Rebalance master queue with ck:* member +local earliestIdx = redis.call('ZRANGE', ckIndexKey, 0, 0, 'WITHSCORES') +if #earliestIdx > 0 then + redis.call('ZADD', masterQueueKey, earliestIdx[2], ckWildcardName) +end -local results = {} +-- Remove old-format entry from master queue (transition cleanup) +redis.call('ZREM', masterQueueKey, queueName) -for i, member in ipairs(expiredMembers) do - -- Parse member format: "queueKey|runId|orgId" - local pipePos1 = string.find(member, "|", 1, true) +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + `, + }); + + // CK-aware enqueue with TTL tracking + this.redis.defineCommand("enqueueMessageWithTtlCk", { + numberOfKeys: 10, + lua: ` +local masterQueueKey = KEYS[1] +local queueKey = KEYS[2] +local messageKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = 
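+-- KEYS[1..8] mirror enqueueMessageCk; KEYS[9] adds the TTL queue and KEYS[10] the CK index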
KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] +local ttlQueueKey = KEYS[9] +local ckIndexKey = KEYS[10] + +local queueName = ARGV[1] +local messageId = ARGV[2] +local messageData = ARGV[3] +local messageScore = ARGV[4] +local ttlMember = ARGV[5] +local ttlScore = ARGV[6] +local ckWildcardName = ARGV[7] + +-- Write the message to the message key +redis.call('SET', messageKey, messageData) + +-- Add the message to the CK-specific queue +redis.call('ZADD', queueKey, messageScore, messageId) + +-- Add the message to the env queue +redis.call('ZADD', envQueueKey, messageScore, messageId) + +-- Add to TTL sorted set +redis.call('ZADD', ttlQueueKey, ttlScore, ttlMember) + +-- Rebalance CK index +local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') +if #earliest > 0 then + redis.call('ZADD', ckIndexKey, earliest[2], queueName) +end + +-- Rebalance master queue with ck:* member +local earliestIdx = redis.call('ZRANGE', ckIndexKey, 0, 0, 'WITHSCORES') +if #earliestIdx > 0 then + redis.call('ZADD', masterQueueKey, earliestIdx[2], ckWildcardName) +end + +-- Remove old-format entry from master queue (transition cleanup) +redis.call('ZREM', masterQueueKey, queueName) + +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + `, + }); + + // Expire TTL runs - atomically removes from TTL set, acknowledges from normal queue, and enqueues to TTL worker + this.redis.defineCommand("expireTtlRuns", { + numberOfKeys: 1, + lua: ` +local ttlQueueKey = KEYS[1] +local keyPrefix = ARGV[1] +local currentTime = tonumber(ARGV[2]) +local batchSize = tonumber(ARGV[3]) +local shardCount = tonumber(ARGV[4]) +local workerQueueKey = ARGV[5] +local workerItemsKey = ARGV[6] +local visibilityTimeoutMs = tonumber(ARGV[7]) + +-- Get expired runs from TTL sorted set (score <= currentTime) +local expiredMembers = redis.call('ZRANGEBYSCORE', ttlQueueKey, '-inf', currentTime, 'LIMIT', 0, batchSize) + +if #expiredMembers == 0 then + return {} +end + +local time = redis.call('TIME') +local nowMs = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000) + +local results = {} + +for i, member in ipairs(expiredMembers) do + -- Parse member format: "queueKey|runId|orgId" + local pipePos1 = string.find(member, "|", 1, true) if pipePos1 then local pipePos2 = string.find(member, "|", pipePos1 + 1, true) if pipePos2 then @@ -2671,6 +3074,18 @@ for i, member in ipairs(expiredMembers) do redis.call('SREM', envConcurrencyKey, runId) redis.call('SREM', envDequeuedKey, runId) + -- Rebalance CK index if this is a CK queue + local ckMatch = string.match(rawQueueKey, "^(.+):ck:.+$") + if ckMatch then + local ckIndexKey = keyPrefix .. ckMatch .. ":ckIndex" + local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') + if #earliest == 0 then + redis.call('ZREM', ckIndexKey, rawQueueKey) + else + redis.call('ZADD', ckIndexKey, earliest[2], rawQueueKey) + end + end + -- Enqueue to TTL worker (runId is natural dedup key) local serializedItem = cjson.encode({ job = "expireTtlRun", @@ -2811,6 +3226,151 @@ else end -- Return results as a flat array: [messageId1, messageScore1, messagePayload1, messageId2, messageScore2, messagePayload2, ...] 
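+-- (the TS wrappers consume this flat array in triplets: id, score, payload)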
+return results + `, + }); + + // CK-aware dequeue: iterates CK sub-queues from CK index + this.redis.defineCommand("dequeueMessagesFromCkQueue", { + numberOfKeys: 9, + lua: ` +local ckIndexKey = KEYS[1] +local queueConcurrencyLimitKey = KEYS[2] +local envConcurrencyLimitKey = KEYS[3] +local envConcurrencyLimitBurstFactorKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local messageKeyPrefix = KEYS[6] +local envQueueKey = KEYS[7] +local masterQueueKey = KEYS[8] +local ttlQueueKey = KEYS[9] + +local ckWildcardName = ARGV[1] +local currentTime = tonumber(ARGV[2]) +local defaultEnvConcurrencyLimit = ARGV[3] +local defaultEnvConcurrencyBurstFactor = ARGV[4] +local keyPrefix = ARGV[5] +local maxCount = tonumber(ARGV[6] or '1') + +-- Check env concurrency +local envCurrentConcurrency = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') +local envConcurrencyLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) +local envConcurrencyLimitBurstFactor = tonumber(redis.call('GET', envConcurrencyLimitBurstFactorKey) or defaultEnvConcurrencyBurstFactor) +local envConcurrencyLimitWithBurstFactor = math.floor(envConcurrencyLimit * envConcurrencyLimitBurstFactor) + +if envCurrentConcurrency >= envConcurrencyLimitWithBurstFactor then + return nil +end + +-- Get base queue concurrency limit +local queueConcurrencyLimit = math.min(tonumber(redis.call('GET', queueConcurrencyLimitKey) or '1000000'), envConcurrencyLimit) + +-- Calculate env available capacity +local envAvailableCapacity = envConcurrencyLimitWithBurstFactor - envCurrentConcurrency +local actualMaxCount = math.min(maxCount, envAvailableCapacity) + +if actualMaxCount <= 0 then + return nil +end + +-- Get CK sub-queues from CK index with scores <= currentTime +local ckQueues = redis.call('ZRANGEBYSCORE', ckIndexKey, '-inf', tostring(currentTime), 'LIMIT', 0, actualMaxCount * 3) + +if #ckQueues == 0 then + -- Rebalance master queue in case CK index has future-scored entries + local anyIdx = redis.call('ZRANGE', ckIndexKey, 0, 0, 'WITHSCORES') + if #anyIdx == 0 then + redis.call('ZREM', masterQueueKey, ckWildcardName) + else + redis.call('ZADD', masterQueueKey, anyIdx[2], ckWildcardName) + end + return nil +end + +local results = {} +local dequeuedCount = 0 + +for _, ckQueueName in ipairs(ckQueues) do + if dequeuedCount >= actualMaxCount then + break + end + + local fullQueueKey = keyPrefix .. ckQueueName + + -- Check CK-specific concurrency + local ckConcurrencyKey = fullQueueKey .. ':currentConcurrency' + local ckCurrentConcurrency = tonumber(redis.call('SCARD', ckConcurrencyKey) or '0') + + if ckCurrentConcurrency < queueConcurrencyLimit then + -- Try to dequeue one message from this CK sub-queue + local messages = redis.call('ZRANGEBYSCORE', fullQueueKey, '-inf', tostring(currentTime), 'WITHSCORES', 'LIMIT', 0, 1) + + if #messages >= 2 then + local messageId = messages[1] + local messageScore = messages[2] + + local messageKey = messageKeyPrefix .. 
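+-- payloads live under messageKeyPrefix .. messageId; a nil GET result marks a stale index entry, pruned below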
messageId + local messagePayload = redis.call('GET', messageKey) + + if messagePayload then + local messageData = cjson.decode(messagePayload) + local ttlExpiresAt = messageData and messageData.ttlExpiresAt + + if ttlExpiresAt and ttlExpiresAt <= currentTime then + -- TTL expired - remove from queues + redis.call('ZREM', fullQueueKey, messageId) + redis.call('ZREM', envQueueKey, messageId) + else + -- Dequeue normally + redis.call('ZREM', fullQueueKey, messageId) + redis.call('ZREM', envQueueKey, messageId) + redis.call('SADD', ckConcurrencyKey, messageId) + redis.call('SADD', envCurrentConcurrencyKey, messageId) + + -- Remove from TTL set if applicable + if ttlQueueKey and ttlQueueKey ~= '' and ttlExpiresAt then + local ttlMember = ckQueueName .. '|' .. messageId .. '|' .. (messageData.orgId or '') + redis.call('ZREM', ttlQueueKey, ttlMember) + end + + table.insert(results, messageId) + table.insert(results, messageScore) + table.insert(results, messagePayload) + + dequeuedCount = dequeuedCount + 1 + end + else + -- Stale entry + redis.call('ZREM', fullQueueKey, messageId) + redis.call('ZREM', envQueueKey, messageId) + end + + -- Rebalance CK index for this sub-queue + local earliest = redis.call('ZRANGE', fullQueueKey, 0, 0, 'WITHSCORES') + if #earliest == 0 then + redis.call('ZREM', ckIndexKey, ckQueueName) + else + redis.call('ZADD', ckIndexKey, earliest[2], ckQueueName) + end + else + -- No messages available in score range, update CK index + local any = redis.call('ZRANGE', fullQueueKey, 0, 0, 'WITHSCORES') + if #any == 0 then + redis.call('ZREM', ckIndexKey, ckQueueName) + else + redis.call('ZADD', ckIndexKey, any[2], ckQueueName) + end + end + end +end + +-- Rebalance master queue +local earliestIdx = redis.call('ZRANGE', ckIndexKey, 0, 0, 'WITHSCORES') +if #earliestIdx == 0 then + redis.call('ZREM', masterQueueKey, ckWildcardName) +else + redis.call('ZADD', masterQueueKey, earliestIdx[2], ckWildcardName) +end + return results `, }); @@ -2990,6 +3550,177 @@ end -- Add the message to the dead letter queue redis.call('ZADD', deadLetterQueueKey, tonumber(redis.call('TIME')[1]), messageId) +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) +`, + }); + + // CK-aware acknowledge: rebalances CK index and master queue with :ck:* member + this.redis.defineCommand("acknowledgeMessageCk", { + numberOfKeys: 10, + lua: ` +-- Keys: +local masterQueueKey = KEYS[1] +local messageKey = KEYS[2] +local messageQueueKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] +local workerQueueKey = KEYS[9] +local ckIndexKey = KEYS[10] + +-- Args: +local messageId = ARGV[1] +local messageQueueName = ARGV[2] +local messageKeyValue = ARGV[3] +local removeFromWorkerQueue = ARGV[4] +local ckWildcardName = ARGV[5] + +-- Remove the message from the message key +redis.call('DEL', messageKey) + +-- Remove the message from the CK-specific queue +redis.call('ZREM', messageQueueKey, messageId) +redis.call('ZREM', envQueueKey, messageId) + +-- Rebalance CK index +local earliestInCkQueue = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES') +if #earliestInCkQueue == 0 then + redis.call('ZREM', ckIndexKey, messageQueueName) +else + redis.call('ZADD', ckIndexKey, 
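+-- ZRANGE ... WITHSCORES returns {member, score}, so [2] is the head message's score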
earliestInCkQueue[2], messageQueueName) +end + +-- Rebalance master queue with ck:* member +local earliestInCkIndex = redis.call('ZRANGE', ckIndexKey, 0, 0, 'WITHSCORES') +if #earliestInCkIndex == 0 then + redis.call('ZREM', masterQueueKey, ckWildcardName) +else + redis.call('ZADD', masterQueueKey, earliestInCkIndex[2], ckWildcardName) +end + +-- Remove old-format entry from master queue (transition cleanup) +redis.call('ZREM', masterQueueKey, messageQueueName) + +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + +-- Remove the message from the worker queue +if removeFromWorkerQueue == '1' then + redis.call('LREM', workerQueueKey, 0, messageKeyValue) +end +`, + }); + + // CK-aware nack: rebalances CK index and master queue with :ck:* member + this.redis.defineCommand("nackMessageCk", { + numberOfKeys: 9, + lua: ` +-- Keys: +local masterQueueKey = KEYS[1] +local messageKey = KEYS[2] +local messageQueueKey = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] +local ckIndexKey = KEYS[9] + +-- Args: +local messageId = ARGV[1] +local messageQueueName = ARGV[2] +local messageData = ARGV[3] +local messageScore = tonumber(ARGV[4]) +local ckWildcardName = ARGV[5] + +-- Update the message data +redis.call('SET', messageKey, messageData) + +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) + +-- Enqueue the message back into the CK-specific queue +redis.call('ZADD', messageQueueKey, messageScore, messageId) +redis.call('ZADD', envQueueKey, messageScore, messageId) + +-- Rebalance CK index +local earliest = redis.call('ZRANGE', messageQueueKey, 0, 0, 'WITHSCORES') +if #earliest > 0 then + redis.call('ZADD', ckIndexKey, earliest[2], messageQueueName) +end + +-- Rebalance master queue with ck:* member +local earliestIdx = redis.call('ZRANGE', ckIndexKey, 0, 0, 'WITHSCORES') +if #earliestIdx == 0 then + redis.call('ZREM', masterQueueKey, ckWildcardName) +else + redis.call('ZADD', masterQueueKey, earliestIdx[2], ckWildcardName) +end + +-- Remove old-format entry from master queue (transition cleanup) +redis.call('ZREM', masterQueueKey, messageQueueName) +`, + }); + + // CK-aware move to dead letter queue + this.redis.defineCommand("moveToDeadLetterQueueCk", { + numberOfKeys: 10, + lua: ` +-- Keys: +local masterQueueKey = KEYS[1] +local messageKey = KEYS[2] +local messageQueue = KEYS[3] +local queueCurrentConcurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local queueCurrentDequeuedKey = KEYS[6] +local envCurrentDequeuedKey = KEYS[7] +local envQueueKey = KEYS[8] +local deadLetterQueueKey = KEYS[9] +local ckIndexKey = KEYS[10] + +-- Args: +local messageId = ARGV[1] +local messageQueueName = ARGV[2] +local ckWildcardName = ARGV[3] + +-- Remove the message from the CK-specific queue +redis.call('ZREM', messageQueue, messageId) +redis.call('ZREM', envQueueKey, messageId) + +-- Rebalance CK index +local earliest = redis.call('ZRANGE', messageQueue, 0, 0, 'WITHSCORES') +if #earliest == 0 then + redis.call('ZREM', ckIndexKey, 
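+-- (an emptied sub-queue drops out of the CK index so later dequeue passes skip it)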
messageQueueName) +else + redis.call('ZADD', ckIndexKey, earliest[2], messageQueueName) +end + +-- Rebalance master queue with ck:* member +local earliestIdx = redis.call('ZRANGE', ckIndexKey, 0, 0, 'WITHSCORES') +if #earliestIdx == 0 then + redis.call('ZREM', masterQueueKey, ckWildcardName) +else + redis.call('ZADD', masterQueueKey, earliestIdx[2], ckWildcardName) +end + +-- Remove old-format entry from master queue (transition cleanup) +redis.call('ZREM', masterQueueKey, messageQueueName) + +-- Add the message to the dead letter queue +redis.call('ZADD', deadLetterQueueKey, tonumber(redis.call('TIME')[1]), messageId) + -- Update the concurrency keys redis.call('SREM', queueCurrentConcurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) @@ -3313,6 +4044,131 @@ declare module "@internal/redis" { maxCount: string, callback?: Callback ): Result; + + // CK-aware commands + enqueueMessageCk( + //keys + masterQueueKey: string, + queue: string, + messageKey: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + ckIndexKey: string, + //args + queueName: string, + messageId: string, + messageData: string, + messageScore: string, + ckWildcardName: string, + callback?: Callback + ): Result; + + enqueueMessageWithTtlCk( + //keys + masterQueueKey: string, + queue: string, + messageKey: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + ttlQueueKey: string, + ckIndexKey: string, + //args + queueName: string, + messageId: string, + messageData: string, + messageScore: string, + ttlMember: string, + ttlScore: string, + ckWildcardName: string, + callback?: Callback + ): Result; + + dequeueMessagesFromCkQueue( + //keys + ckIndexKey: string, + queueConcurrencyLimitKey: string, + envConcurrencyLimitKey: string, + envConcurrencyLimitBurstFactorKey: string, + envCurrentConcurrencyKey: string, + messageKeyPrefix: string, + envQueueKey: string, + masterQueueKey: string, + ttlQueueKey: string, + //args + ckWildcardName: string, + currentTime: string, + defaultEnvConcurrencyLimit: string, + defaultEnvConcurrencyBurstFactor: string, + keyPrefix: string, + maxCount: string, + callback?: Callback + ): Result; + + acknowledgeMessageCk( + // keys + masterQueueKey: string, + messageKey: string, + messageQueue: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + workerQueueKey: string, + ckIndexKey: string, + // args + messageId: string, + messageQueueName: string, + messageKeyValue: string, + removeFromWorkerQueue: string, + ckWildcardName: string, + callback?: Callback + ): Result; + + nackMessageCk( + // keys + masterQueueKey: string, + messageKey: string, + messageQueue: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + envQueueKey: string, + ckIndexKey: string, + // args + messageId: string, + messageQueueName: string, + messageData: string, + messageScore: string, + ckWildcardName: string, + callback?: Callback + ): Result; + + moveToDeadLetterQueueCk( + // keys + masterQueueKey: string, + messageKey: string, + messageQueue: string, + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + 
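+      // parameter order mirrors the Lua KEYS list; keep in sync with the defineCommand body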
+    queueCurrentDequeuedKey: string,
+    envCurrentDequeuedKey: string,
+    envQueueKey: string,
+    deadLetterQueueKey: string,
+    ckIndexKey: string,
+    // args
+    messageId: string,
+    messageQueueName: string,
+    ckWildcardName: string,
+    callback?: Callback
+  ): Result;
  }
}
diff --git a/internal-packages/run-engine/src/run-queue/keyProducer.ts b/internal-packages/run-engine/src/run-queue/keyProducer.ts
index f925f0e9579..a85dfc393e9 100644
--- a/internal-packages/run-engine/src/run-queue/keyProducer.ts
+++ b/internal-packages/run-engine/src/run-queue/keyProducer.ts
@@ -17,6 +17,7 @@ const constants = {
   DEAD_LETTER_QUEUE_PART: "deadLetter",
   MASTER_QUEUE_PART: "masterQueue",
   WORKER_QUEUE_PART: "workerQueue",
+  CK_INDEX_PART: "ckIndex",
 } as const;

 export class RunQueueFullKeyProducer implements RunQueueKeyProducer {
@@ -305,6 +306,23 @@ export class RunQueueFullKeyProducer implements RunQueueKeyProducer {
     return ["ttl", "shard", shard.toString()].join(":");
   }

+  ckIndexKeyFromQueue(queue: string): string {
+    const baseQueue = queue.replace(/:ck:.+$/, "");
+    return `${baseQueue}:${constants.CK_INDEX_PART}`;
+  }
+
+  baseQueueKeyFromQueue(queue: string): string {
+    return queue.replace(/:ck:.+$/, "");
+  }
+
+  isCkWildcard(queue: string): boolean {
+    return queue.endsWith(":ck:*");
+  }
+
+  toCkWildcard(queue: string): string {
+    return queue.replace(/:ck:.+$/, ":ck:*");
+  }
+
   descriptorFromQueue(queue: string): QueueDescriptor {
     const parts = queue.split(":");
     return {
diff --git a/internal-packages/run-engine/src/run-queue/tests/ckIndex.test.ts b/internal-packages/run-engine/src/run-queue/tests/ckIndex.test.ts
new file mode 100644
index 00000000000..3945b455a07
--- /dev/null
+++ b/internal-packages/run-engine/src/run-queue/tests/ckIndex.test.ts
@@ -0,0 +1,521 @@
+import { assertNonNullable, redisTest } from "@internal/testcontainers";
+import { trace } from "@internal/tracing";
+import { Logger } from "@trigger.dev/core/logger";
+import { describe } from "node:test";
+import { setTimeout } from "node:timers/promises";
+import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js";
+import { RunQueue } from "../index.js";
+import { RunQueueFullKeyProducer } from "../keyProducer.js";
+import { InputPayload } from "../types.js";
+import { Decimal } from "@trigger.dev/database";
+
+const testOptions = {
+  name: "rq",
+  tracer: trace.getTracer("rq"),
+  workers: 1,
+  defaultEnvConcurrency: 25,
+  logger: new Logger("RunQueue", "warn"),
+  retryOptions: {
+    maxAttempts: 5,
+    factor: 1.1,
+    minTimeoutInMs: 100,
+    maxTimeoutInMs: 1_000,
+    randomize: true,
+  },
+  keys: new RunQueueFullKeyProducer(),
+};
+
+const authenticatedEnvDev = {
+  id: "e1234",
+  type: "DEVELOPMENT" as const,
+  maximumConcurrencyLimit: 10,
+  concurrencyLimitBurstFactor: new Decimal(2.0),
+  project: { id: "p1234" },
+  organization: { id: "o1234" },
+};
+
+function createQueue(redisContainer: any) {
+  return new RunQueue({
+    ...testOptions,
+    queueSelectionStrategy: new FairQueueSelectionStrategy({
+      redis: {
+        keyPrefix: "runqueue:test:",
+        host: redisContainer.getHost(),
+        port: redisContainer.getPort(),
+      },
+      keys: testOptions.keys,
+    }),
+    redis: {
+      keyPrefix: "runqueue:test:",
+      host: redisContainer.getHost(),
+      port: redisContainer.getPort(),
+    },
+  });
+}
+
+function makeMessage(overrides: Partial<InputPayload> = {}): InputPayload {
+  return {
+    runId: "r1",
+    taskIdentifier: "task/my-task",
+    orgId: "o1234",
+    projectId: "p1234",
+    environmentId: "e1234",
+    environmentType: "DEVELOPMENT",
+    queue: "task/my-task",
+    timestamp:
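+    // enqueue time by default; tests pass past timestamps so messages are immediately dequeueable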
Date.now(), + attempt: 0, + ...overrides, + }; +} + +vi.setConfig({ testTimeout: 60_000 }); + +describe("CK Index", () => { + redisTest( + "enqueue with CK creates CK index entry and :ck:* master queue entry", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const msg = makeMessage({ runId: "r1", concurrencyKey: "ck-a" }); + + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + + // Check that the CK-specific sorted set has the message + const queueLength = await queue.lengthOfQueue( + authenticatedEnvDev, + msg.queue, + msg.concurrencyKey + ); + expect(queueLength).toBe(1); + + // Check master queue: should have :ck:* entry, not :ck:ck-a + const masterQueueKey = testOptions.keys.masterQueueKeyForShard( + testOptions.keys.masterQueueShardForEnvironment(msg.environmentId, 2) + ); + const masterMembers = await queue.redis.zrange( + masterQueueKey, + 0, + -1, + "WITHSCORES" + ); + // Should have exactly one member ending with :ck:* + const ckWildcardMembers = masterMembers.filter( + (m, i) => i % 2 === 0 && m.endsWith(":ck:*") + ); + expect(ckWildcardMembers.length).toBe(1); + + // Should NOT have :ck:ck-a member + const oldFormatMembers = masterMembers.filter( + (m, i) => i % 2 === 0 && m.endsWith(":ck:ck-a") + ); + expect(oldFormatMembers.length).toBe(0); + + // Check CK index has the CK queue + const ckIndexKey = testOptions.keys.ckIndexKeyFromQueue( + testOptions.keys.queueKey(authenticatedEnvDev, msg.queue, msg.concurrencyKey) + ); + const ckIndexMembers = await queue.redis.zrange(ckIndexKey, 0, -1); + expect(ckIndexMembers.length).toBe(1); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "multiple CKs result in single master queue entry", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const now = Date.now(); + const msg1 = makeMessage({ + runId: "r1", + concurrencyKey: "ck-a", + timestamp: now, + }); + const msg2 = makeMessage({ + runId: "r2", + concurrencyKey: "ck-b", + timestamp: now + 100, + }); + const msg3 = makeMessage({ + runId: "r3", + concurrencyKey: "ck-c", + timestamp: now + 200, + }); + + for (const msg of [msg1, msg2, msg3]) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + } + + // Master queue should have exactly ONE entry (the :ck:* wildcard) + const masterQueueKey = testOptions.keys.masterQueueKeyForShard( + testOptions.keys.masterQueueShardForEnvironment(msg1.environmentId, 2) + ); + const masterMembers = await queue.redis.zrange( + masterQueueKey, + 0, + -1 + ); + // Filter to only members for our queue + const ourMembers = masterMembers.filter((m) => + m.includes("queue:task/my-task") + ); + expect(ourMembers.length).toBe(1); + expect(ourMembers[0]).toContain(":ck:*"); + + // CK index should have 3 entries + const ckIndexKey = testOptions.keys.ckIndexKeyFromQueue( + testOptions.keys.queueKey(authenticatedEnvDev, msg1.queue, msg1.concurrencyKey) + ); + const ckIndexMembers = await queue.redis.zrange(ckIndexKey, 0, -1); + expect(ckIndexMembers.length).toBe(3); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "dequeue from CK queue distributes across sub-queues", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const now = Date.now() - 1000; // In the past so they're ready + const msg1 = makeMessage({ + runId: "r1", 
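+        // r1 and r3 share ck-a: the CK dequeue script pops at most one head message per sub-queue per pass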
+ concurrencyKey: "ck-a", + timestamp: now, + }); + const msg2 = makeMessage({ + runId: "r2", + concurrencyKey: "ck-b", + timestamp: now + 1, + }); + const msg3 = makeMessage({ + runId: "r3", + concurrencyKey: "ck-a", + timestamp: now + 2, + }); + + for (const msg of [msg1, msg2, msg3]) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + } + + // Dequeue via the master queue consumer + const shard = testOptions.keys.masterQueueShardForEnvironment( + msg1.environmentId, + 2 + ); + const messages = await queue.testDequeueFromMasterQueue(shard, msg1.environmentId, 10); + + // Should dequeue messages from both CK sub-queues + expect(messages).toBeDefined(); + // We should get at least 2 messages (one from each CK) + // The exact order depends on CK index scoring + expect(messages!.length).toBeGreaterThanOrEqual(2); + + const dequeuedRunIds = messages!.map((m: any) => m.messageId); + // r1 (ck-a, oldest) and r2 (ck-b) should be dequeued + expect(dequeuedRunIds).toContain("r1"); + expect(dequeuedRunIds).toContain("r2"); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "empty CK sub-queue is removed from CK index", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const now = Date.now() - 1000; + const msg1 = makeMessage({ + runId: "r1", + concurrencyKey: "ck-a", + timestamp: now, + }); + const msg2 = makeMessage({ + runId: "r2", + concurrencyKey: "ck-b", + timestamp: now + 1, + }); + + for (const msg of [msg1, msg2]) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + } + + // CK index should have 2 entries initially + const ckIndexKey = testOptions.keys.ckIndexKeyFromQueue( + testOptions.keys.queueKey(authenticatedEnvDev, msg1.queue, msg1.concurrencyKey) + ); + let ckIndexMembers = await queue.redis.zrange(ckIndexKey, 0, -1); + expect(ckIndexMembers.length).toBe(2); + + // Dequeue both messages + const shard = testOptions.keys.masterQueueShardForEnvironment( + msg1.environmentId, + 2 + ); + await queue.testDequeueFromMasterQueue(shard, msg1.environmentId, 10); + + // CK index should be empty (both sub-queues drained) + ckIndexMembers = await queue.redis.zrange(ckIndexKey, 0, -1); + expect(ckIndexMembers.length).toBe(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "empty CK index removes :ck:* from master queue", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const now = Date.now() - 1000; + const msg = makeMessage({ + runId: "r1", + concurrencyKey: "ck-a", + timestamp: now, + }); + + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + + const masterQueueKey = testOptions.keys.masterQueueKeyForShard( + testOptions.keys.masterQueueShardForEnvironment(msg.environmentId, 2) + ); + + // Master queue should have :ck:* entry + let masterMembers = await queue.redis.zrange(masterQueueKey, 0, -1); + expect(masterMembers.length).toBe(1); + + // Dequeue the message + const shard = testOptions.keys.masterQueueShardForEnvironment( + msg.environmentId, + 2 + ); + await queue.testDequeueFromMasterQueue(shard, msg.environmentId, 10); + + // Master queue should be empty + masterMembers = await queue.redis.zrange(masterQueueKey, 0, -1); + expect(masterMembers.length).toBe(0); + } finally { + 
await queue.quit(); + } + } + ); + + redisTest( + "mixed CK and non-CK queues in same shard", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const now = Date.now() - 1000; + + // Non-CK message + const msgNoCk = makeMessage({ + runId: "r-no-ck", + timestamp: now, + }); + + // CK messages + const msgCk1 = makeMessage({ + runId: "r-ck-1", + concurrencyKey: "ck-a", + timestamp: now + 1, + }); + const msgCk2 = makeMessage({ + runId: "r-ck-2", + concurrencyKey: "ck-b", + timestamp: now + 2, + }); + + for (const msg of [msgNoCk, msgCk1, msgCk2]) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + } + + // Master queue should have 2 entries: one non-CK queue and one :ck:* + const masterQueueKey = testOptions.keys.masterQueueKeyForShard( + testOptions.keys.masterQueueShardForEnvironment(msgNoCk.environmentId, 2) + ); + const masterMembers = await queue.redis.zrange(masterQueueKey, 0, -1); + expect(masterMembers.length).toBe(2); + + // One should be the non-CK queue, one should be :ck:* + const ckWildcard = masterMembers.filter((m) => m.endsWith(":ck:*")); + const nonCk = masterMembers.filter( + (m) => !m.includes(":ck:") + ); + expect(ckWildcard.length).toBe(1); + expect(nonCk.length).toBe(1); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "acknowledge CK message rebalances CK index and master queue", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const now = Date.now() - 1000; + const msg1 = makeMessage({ + runId: "r1", + concurrencyKey: "ck-a", + timestamp: now, + }); + const msg2 = makeMessage({ + runId: "r2", + concurrencyKey: "ck-a", + timestamp: now + 100, + }); + + for (const msg of [msg1, msg2]) { + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + } + + // Dequeue one message + const shard = testOptions.keys.masterQueueShardForEnvironment( + msg1.environmentId, + 2 + ); + const messages = await queue.testDequeueFromMasterQueue(shard, msg1.environmentId, 1); + expect(messages!.length).toBe(1); + expect(messages![0].messageId).toBe("r1"); + + // Acknowledge the dequeued message + await queue.acknowledgeMessage(msg1.orgId, "r1", { + skipDequeueProcessing: true, + }); + + // CK index should still have the ck-a entry (r2 is still queued) + const ckIndexKey = testOptions.keys.ckIndexKeyFromQueue( + testOptions.keys.queueKey(authenticatedEnvDev, msg1.queue, msg1.concurrencyKey) + ); + const ckIndexMembers = await queue.redis.zrange(ckIndexKey, 0, -1); + expect(ckIndexMembers.length).toBe(1); + + // Master queue should still have the :ck:* entry + const masterQueueKey = testOptions.keys.masterQueueKeyForShard(shard); + const masterMembers = await queue.redis.zrange(masterQueueKey, 0, -1); + expect(masterMembers.length).toBe(1); + expect(masterMembers[0]).toContain(":ck:*"); + + // Dequeue and ack the last message + const messages2 = await queue.testDequeueFromMasterQueue(shard, msg1.environmentId, 1); + expect(messages2!.length).toBe(1); + await queue.acknowledgeMessage(msg2.orgId, "r2", { + skipDequeueProcessing: true, + }); + + // CK index should be empty + const ckIndexMembers2 = await queue.redis.zrange(ckIndexKey, 0, -1); + expect(ckIndexMembers2.length).toBe(0); + + // Master queue should be empty + const masterMembers2 = await queue.redis.zrange(masterQueueKey, 0, -1); + 
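+        // acking the final ck-a message cascades: empty sub-queue -> ckIndex entry removed -> :ck:* removed from the master queue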
expect(masterMembers2.length).toBe(0); + } finally { + await queue.quit(); + } + } + ); + + redisTest( + "nack CK message rebalances CK index", + async ({ redisContainer }) => { + const queue = createQueue(redisContainer); + try { + const now = Date.now() - 1000; + const msg = makeMessage({ + runId: "r1", + concurrencyKey: "ck-a", + timestamp: now, + }); + + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: msg, + workerQueue: authenticatedEnvDev.id, + skipDequeueProcessing: true, + }); + + // Dequeue the message + const shard = testOptions.keys.masterQueueShardForEnvironment( + msg.environmentId, + 2 + ); + const messages = await queue.testDequeueFromMasterQueue(shard, msg.environmentId, 1); + expect(messages!.length).toBe(1); + + // Nack the message (re-enqueue) + await queue.nackMessage({ + orgId: msg.orgId, + messageId: "r1", + retryAt: Date.now() + 5000, + incrementAttemptCount: false, + skipDequeueProcessing: true, + }); + + // CK index should have the ck-a entry (message re-enqueued) + const ckIndexKey = testOptions.keys.ckIndexKeyFromQueue( + testOptions.keys.queueKey(authenticatedEnvDev, msg.queue, msg.concurrencyKey) + ); + const ckIndexMembers = await queue.redis.zrange(ckIndexKey, 0, -1); + expect(ckIndexMembers.length).toBe(1); + + // Master queue should have the :ck:* entry + const masterQueueKey = testOptions.keys.masterQueueKeyForShard(shard); + const masterMembers = await queue.redis.zrange(masterQueueKey, 0, -1); + expect(masterMembers.length).toBe(1); + expect(masterMembers[0]).toContain(":ck:*"); + + // No old-format entries + const oldFormatMembers = masterMembers.filter( + (m) => m.includes(":ck:") && !m.endsWith(":ck:*") + ); + expect(oldFormatMembers.length).toBe(0); + } finally { + await queue.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts b/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts index 8b980749eab..0f65b020885 100644 --- a/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/keyProducer.test.ts @@ -359,4 +359,73 @@ describe("KeyProducer", () => { concurrencyKey: "c1234", }); }); + + it("ckIndexKeyFromQueue", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = keyProducer.queueKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + "task/task-name", + "c1234" + ); + const key = keyProducer.ckIndexKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:ckIndex"); + }); + + it("ckIndexKeyFromQueue (from wildcard)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const key = keyProducer.ckIndexKeyFromQueue( + "{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:ck:*" + ); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name:ckIndex"); + }); + + it("baseQueueKeyFromQueue (with CK)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = keyProducer.queueKey( + { + id: "e1234", + type: "PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + "task/task-name", + "c1234" + ); + const key = keyProducer.baseQueueKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name"); + }); + + it("baseQueueKeyFromQueue (no CK)", () => { + const keyProducer = new RunQueueFullKeyProducer(); + const queueKey = keyProducer.queueKey( + { + id: "e1234", + type: 
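+      // the {org:o1234} segment in the expected key acts as a Redis Cluster hash tag, keeping an org's keys in one slot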
"PRODUCTION", + project: { id: "p1234" }, + organization: { id: "o1234" }, + }, + "task/task-name" + ); + const key = keyProducer.baseQueueKeyFromQueue(queueKey); + expect(key).toBe("{org:o1234}:proj:p1234:env:e1234:queue:task/task-name"); + }); + + it("isCkWildcard", () => { + const keyProducer = new RunQueueFullKeyProducer(); + expect(keyProducer.isCkWildcard("{org:o1234}:proj:p1234:env:e1234:queue:task/foo:ck:*")).toBe(true); + expect(keyProducer.isCkWildcard("{org:o1234}:proj:p1234:env:e1234:queue:task/foo:ck:bar")).toBe(false); + expect(keyProducer.isCkWildcard("{org:o1234}:proj:p1234:env:e1234:queue:task/foo")).toBe(false); + }); + + it("toCkWildcard", () => { + const keyProducer = new RunQueueFullKeyProducer(); + expect(keyProducer.toCkWildcard("{org:o1234}:proj:p1234:env:e1234:queue:task/foo:ck:bar")).toBe( + "{org:o1234}:proj:p1234:env:e1234:queue:task/foo:ck:*" + ); + }); }); diff --git a/internal-packages/run-engine/src/run-queue/types.ts b/internal-packages/run-engine/src/run-queue/types.ts index fd33e7e1925..1f16ccfa57a 100644 --- a/internal-packages/run-engine/src/run-queue/types.ts +++ b/internal-packages/run-engine/src/run-queue/types.ts @@ -125,6 +125,12 @@ export interface RunQueueKeyProducer { // TTL system methods ttlQueueKeyForShard(shard: number): string; + + // CK index methods + ckIndexKeyFromQueue(queue: string): string; + baseQueueKeyFromQueue(queue: string): string; + isCkWildcard(queue: string): boolean; + toCkWildcard(queue: string): string; } export type EnvQueues = { diff --git a/references/hello-world/src/trigger/concurrencyKeys.ts b/references/hello-world/src/trigger/concurrencyKeys.ts new file mode 100644 index 00000000000..7cdaf4847f9 --- /dev/null +++ b/references/hello-world/src/trigger/concurrencyKeys.ts @@ -0,0 +1,201 @@ +import { batch, logger, queue, task } from "@trigger.dev/sdk"; +import { setTimeout } from "node:timers/promises"; + +// Queue with concurrency limit for CK tests +const ckQueue = queue({ + name: "ck-test-queue", + concurrencyLimit: 2, +}); + +// Worker task: simulates work with a concurrency key +export const ckWorkerTask = task({ + id: "ck-worker-task", + queue: ckQueue, + retry: { maxAttempts: 1 }, + run: async (payload: { id: string; waitMs: number }) => { + const startedAt = Date.now(); + logger.info(`CK worker ${payload.id} started`); + await setTimeout(payload.waitMs); + const completedAt = Date.now(); + logger.info(`CK worker ${payload.id} completed`); + return { id: payload.id, startedAt, completedAt }; + }, +}); + +// Test 1: Multiple CKs should each get their own concurrency slot +export const ckBasicTest = task({ + id: "ck-basic-test", + retry: { maxAttempts: 1 }, + maxDuration: 120, + run: async () => { + logger.info("Testing basic CK behavior: multiple CKs run concurrently"); + + // Trigger 3 runs with different CKs - all should be able to run + // because each CK gets its own concurrency tracking + const results = await batch.triggerAndWait([ + { + id: ckWorkerTask.id, + payload: { id: "user-1", waitMs: 3000 }, + options: { concurrencyKey: "user-1" }, + }, + { + id: ckWorkerTask.id, + payload: { id: "user-2", waitMs: 3000 }, + options: { concurrencyKey: "user-2" }, + }, + { + id: ckWorkerTask.id, + payload: { id: "user-3", waitMs: 3000 }, + options: { concurrencyKey: "user-3" }, + }, + ]); + + if (!results.runs.every((r) => r.ok)) { + throw new Error("Not all CK runs completed successfully"); + } + + const executions = results.runs + .map((r) => r.output) + .sort((a, b) => a.startedAt - b.startedAt); + + 
logger.info("CK basic test executions", { executions }); + + return { executions }; + }, +}); + +// Test 2: Same CK should respect concurrency limit +export const ckSameConcurrencyTest = task({ + id: "ck-same-concurrency-test", + retry: { maxAttempts: 1 }, + maxDuration: 120, + run: async () => { + logger.info("Testing same CK concurrency: runs with same CK respect queue limit"); + + // Trigger 4 runs all with the same CK + // Queue limit is 2, so at most 2 should run concurrently + const results = await batch.triggerAndWait([ + { + id: ckWorkerTask.id, + payload: { id: "same-1", waitMs: 4000 }, + options: { concurrencyKey: "shared-key" }, + }, + { + id: ckWorkerTask.id, + payload: { id: "same-2", waitMs: 4000 }, + options: { concurrencyKey: "shared-key" }, + }, + { + id: ckWorkerTask.id, + payload: { id: "same-3", waitMs: 4000 }, + options: { concurrencyKey: "shared-key" }, + }, + { + id: ckWorkerTask.id, + payload: { id: "same-4", waitMs: 4000 }, + options: { concurrencyKey: "shared-key" }, + }, + ]); + + if (!results.runs.every((r) => r.ok)) { + throw new Error("Not all same-CK runs completed successfully"); + } + + const executions = results.runs + .map((r) => r.output) + .sort((a, b) => a.startedAt - b.startedAt); + + // Check max concurrent: with same CK and limit 2, should be <= 2 + let maxConcurrent = 0; + for (const current of executions) { + const concurrent = executions.filter( + (e) => + e.startedAt <= current.startedAt && + e.completedAt > current.startedAt + ).length; + maxConcurrent = Math.max(maxConcurrent, concurrent); + } + + logger.info("Same CK concurrency result", { maxConcurrent, executions }); + + if (maxConcurrent > 2) { + throw new Error(`Expected max 2 concurrent with same CK, got ${maxConcurrent}`); + } + + return { executions, maxConcurrent }; + }, +}); + +// Test 3: Many CKs - the scenario that motivated the CK index +export const ckManyKeysTest = task({ + id: "ck-many-keys-test", + retry: { maxAttempts: 1 }, + maxDuration: 180, + run: async () => { + logger.info("Testing many CKs: all should complete without starving"); + + // Trigger 20 runs each with a different CK + const items = Array.from({ length: 20 }, (_, i) => ({ + id: ckWorkerTask.id, + payload: { id: `prospect-${i}`, waitMs: 2000 }, + options: { concurrencyKey: `prospect-${i}` }, + })); + + const results = await batch.triggerAndWait(items); + + const succeeded = results.runs.filter((r) => r.ok).length; + const failed = results.runs.filter((r) => !r.ok).length; + + logger.info("Many CKs test result", { succeeded, failed, total: results.runs.length }); + + if (failed > 0) { + throw new Error(`${failed} of ${results.runs.length} runs failed`); + } + + return { succeeded, total: results.runs.length }; + }, +}); + +// Test 4: Mixed CK and non-CK triggers on same queue +export const ckMixedTest = task({ + id: "ck-mixed-test", + retry: { maxAttempts: 1 }, + maxDuration: 120, + run: async () => { + logger.info("Testing mixed CK and non-CK on same queue"); + + const results = await batch.triggerAndWait([ + // Non-CK runs + { + id: ckWorkerTask.id, + payload: { id: "no-ck-1", waitMs: 2000 }, + }, + { + id: ckWorkerTask.id, + payload: { id: "no-ck-2", waitMs: 2000 }, + }, + // CK runs + { + id: ckWorkerTask.id, + payload: { id: "with-ck-1", waitMs: 2000 }, + options: { concurrencyKey: "tenant-a" }, + }, + { + id: ckWorkerTask.id, + payload: { id: "with-ck-2", waitMs: 2000 }, + options: { concurrencyKey: "tenant-b" }, + }, + ]); + + const succeeded = results.runs.filter((r) => r.ok).length; + const failed = 
results.runs.filter((r) => !r.ok).length; + + logger.info("Mixed test result", { succeeded, failed }); + + if (failed > 0) { + throw new Error(`${failed} runs failed in mixed CK/non-CK test`); + } + + return { succeeded, total: results.runs.length }; + }, +});