@@ -1704,6 +1704,123 @@ describe("StreamBatchItemsService", () => {
17041704 }
17051705 ) ;
17061706
1707+ containerTest (
1708+ "perf: higher ingest concurrency processes a batch proportionally faster" ,
1709+ async ( { prisma, redisOptions } ) => {
1710+ const engine = buildEngine ( prisma , redisOptions ) ;
1711+ const environment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
1712+
1713+ const runCount = 150 ;
1714+
1715+ // A payload processor that holds each slot for a fixed duration, modelling
1716+ // the per-item object-store offload. perItemLatencyMs=0 models a small
1717+ // payload that never offloads, leaving the real per-item enqueueBatchItem
1718+ // (a Redis round-trip) as the only serialized cost. Local MinIO/Redis
1719+ // latency is sub-millisecond and too noisy to compare directly, so the
1720+ // offload case uses a fixed hold instead.
1721+ class FixedLatencyPayloadProcessor extends BatchPayloadProcessor {
1722+ constructor ( private readonly perItemLatencyMs : number ) {
1723+ super ( ) ;
1724+ }
1725+ async process ( payload : unknown , payloadType : string ) : Promise < BatchPayloadProcessResult > {
1726+ if ( this . perItemLatencyMs > 0 ) {
1727+ await sleep ( this . perItemLatencyMs ) ;
1728+ }
1729+ return { payload, payloadType, wasOffloaded : this . perItemLatencyMs > 0 , size : 0 } ;
1730+ }
1731+ }
1732+
1733+ async function timeIngest ( concurrency : number , perItemLatencyMs : number ) : Promise < number > {
1734+ // Each run needs its own batch: sealing mutates state and a re-stream
1735+ // of the same batch would dedup every item.
1736+ const batch = await createBatch ( prisma , environment . id , {
1737+ runCount,
1738+ status : "PENDING" ,
1739+ sealed : false ,
1740+ } ) ;
1741+ await engine . initializeBatch ( {
1742+ batchId : batch . id ,
1743+ friendlyId : batch . friendlyId ,
1744+ environmentId : environment . id ,
1745+ environmentType : environment . type ,
1746+ organizationId : environment . organizationId ,
1747+ projectId : environment . projectId ,
1748+ runCount,
1749+ processingConcurrency : 10 ,
1750+ } ) ;
1751+
1752+ const service = new StreamBatchItemsService ( {
1753+ prisma,
1754+ engine,
1755+ payloadProcessor : new FixedLatencyPayloadProcessor ( perItemLatencyMs ) ,
1756+ } ) ;
1757+
1758+ const start = performance . now ( ) ;
1759+ const result = await service . call (
1760+ environment ,
1761+ batch . friendlyId ,
1762+ itemsToAsyncIterable ( makeItems ( runCount ) ) ,
1763+ { maxItemBytes : 1024 * 1024 , concurrency }
1764+ ) ;
1765+ const elapsedMs = performance . now ( ) - start ;
1766+
1767+ // Correctness holds at every concurrency: all items accepted and sealed.
1768+ expect ( result . sealed ) . toBe ( true ) ;
1769+ expect ( result . itemsAccepted ) . toBe ( runCount ) ;
1770+ expect ( result . itemsDeduplicated ) . toBe ( 0 ) ;
1771+ expect ( await engine . getBatchEnqueuedCount ( batch . id ) ) . toBe ( runCount ) ;
1772+
1773+ return elapsedMs ;
1774+ }
1775+
1776+ // Scenario A: large payloads, where each item offloads to object storage.
1777+ const offloadLatencyMs = 10 ;
1778+ const offloadSeqMs = await timeIngest ( 1 , offloadLatencyMs ) ;
1779+ const offload10Ms = await timeIngest ( 10 , offloadLatencyMs ) ;
1780+ const offload50Ms = await timeIngest ( 50 , offloadLatencyMs ) ;
1781+
1782+ // Scenario B: small payloads (no offload). The only per-item cost is the
1783+ // real Redis enqueue, so this is the floor case that proves the speedup
1784+ // applies to all batch ingest, not just large-payload batches.
1785+ const enqueueSeqMs = await timeIngest ( 1 , 0 ) ;
1786+ const enqueue10Ms = await timeIngest ( 10 , 0 ) ;
1787+
1788+ // eslint-disable-next-line no-console
1789+ console . log (
1790+ `\n[streamBatchItems perf] runCount=${ runCount } \n` +
1791+ ` large payloads (${ offloadLatencyMs } ms/item offload):\n` +
1792+ ` concurrency=1 ${ offloadSeqMs . toFixed ( 0 ) } ms\n` +
1793+ ` concurrency=10 ${ offload10Ms . toFixed ( 0 ) } ms (${ (
1794+ offloadSeqMs / offload10Ms
1795+ ) . toFixed ( 1 ) } x faster)\n` +
1796+ ` concurrency=50 ${ offload50Ms . toFixed ( 0 ) } ms (${ (
1797+ offloadSeqMs / offload50Ms
1798+ ) . toFixed ( 1 ) } x faster)\n` +
1799+ ` small payloads (Redis enqueue only, no offload):\n` +
1800+ ` concurrency=1 ${ enqueueSeqMs . toFixed ( 0 ) } ms\n` +
1801+ ` concurrency=10 ${ enqueue10Ms . toFixed ( 0 ) } ms (${ (
1802+ enqueueSeqMs / enqueue10Ms
1803+ ) . toFixed ( 1 ) } x faster)\n`
1804+ ) ;
1805+
1806+ // Sequential floor: N items each held for offloadLatencyMs cannot finish
1807+ // faster than N x latency. Parallel ingest must beat that floor decisively.
1808+ const sequentialFloorMs = runCount * offloadLatencyMs ;
1809+ expect ( offloadSeqMs ) . toBeGreaterThan ( sequentialFloorMs * 0.8 ) ;
1810+
1811+ // 10x concurrency on a latency-bound workload should be well over 2x faster.
1812+ expect ( offload10Ms ) . toBeLessThan ( offloadSeqMs / 2 ) ;
1813+ // More concurrency keeps helping (or at least never regresses).
1814+ expect ( offload50Ms ) . toBeLessThanOrEqual ( offload10Ms * 1.2 ) ;
1815+
1816+ // Even with no offload, overlapping the per-item Redis enqueue is strictly
1817+ // faster than doing them one at a time.
1818+ expect ( enqueue10Ms ) . toBeLessThan ( enqueueSeqMs ) ;
1819+
1820+ await engine . quit ( ) ;
1821+ }
1822+ ) ;
1823+
17071824 containerTest (
17081825 "deduplicates already-enqueued items during concurrent ingest (Phase 2 retry)" ,
17091826 async ( { prisma, redisOptions } ) => {
0 commit comments