@@ -36,7 +36,6 @@ import {
3636import { EventBus , EventBusEvents } from "./eventBus.js" ;
3737import { RunLocker } from "./locking.js" ;
3838import { ReleaseConcurrencyTokenBucketQueue } from "./releaseConcurrencyTokenBucketQueue.js" ;
39- import { canReleaseConcurrency } from "./statuses.js" ;
4039import { BatchSystem } from "./systems/batchSystem.js" ;
4140import { CheckpointSystem } from "./systems/checkpointSystem.js" ;
4241import { DelayedRunSystem } from "./systems/delayedRunSystem.js" ;
@@ -46,13 +45,14 @@ import {
4645 ExecutionSnapshotSystem ,
4746 getLatestExecutionSnapshot ,
4847} from "./systems/executionSnapshotSystem.js" ;
48+ import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js" ;
4949import { RunAttemptSystem } from "./systems/runAttemptSystem.js" ;
5050import { SystemResources } from "./systems/systems.js" ;
5151import { TtlSystem } from "./systems/ttlSystem.js" ;
52+ import { WaitingForWorkerSystem } from "./systems/waitingForWorkerSystem.js" ;
5253import { WaitpointSystem } from "./systems/waitpointSystem.js" ;
5354import { EngineWorker , HeartbeatTimeouts , RunEngineOptions , TriggerParams } from "./types.js" ;
5455import { workerCatalog } from "./workerCatalog.js" ;
55- import { WaitingForWorkerSystem } from "./systems/waitingForWorkerSystem.js" ;
5656
5757export class RunEngine {
5858 private runLockRedis : Redis ;
@@ -63,7 +63,7 @@ export class RunEngine {
6363 private logger = new Logger ( "RunEngine" , "debug" ) ;
6464 private tracer : Tracer ;
6565 private heartbeatTimeouts : HeartbeatTimeouts ;
66- private releaseConcurrencyQueue : ReleaseConcurrencyTokenBucketQueue < {
66+ releaseConcurrencyQueue : ReleaseConcurrencyTokenBucketQueue < {
6767 orgId : string ;
6868 projectId : string ;
6969 envId : string ;
@@ -79,6 +79,7 @@ export class RunEngine {
7979 delayedRunSystem : DelayedRunSystem ;
8080 ttlSystem : TtlSystem ;
8181 waitingForWorkerSystem : WaitingForWorkerSystem ;
82+ releaseConcurrencySystem : ReleaseConcurrencySystem ;
8283
8384 constructor ( private readonly options : RunEngineOptions ) {
8485 this . prisma = options . prisma ;
@@ -188,7 +189,7 @@ export class RunEngine {
188189 redis : {
189190 ...options . queue . redis , // Use base queue redis options
190191 ...options . releaseConcurrency ?. redis , // Allow overrides
191- keyPrefix : `${ options . queue . redis . keyPrefix } release-concurrency:` ,
192+ keyPrefix : `${ options . queue . redis . keyPrefix ?? "" } release-concurrency:` ,
192193 } ,
193194 retry : {
194195 maxRetries : options . releaseConcurrency ?. maxRetries ?? 5 ,
@@ -201,8 +202,8 @@ export class RunEngine {
201202 consumersCount : options . releaseConcurrency ?. consumersCount ?? 1 ,
202203 pollInterval : options . releaseConcurrency ?. pollInterval ?? 1000 ,
203204 batchSize : options . releaseConcurrency ?. batchSize ?? 10 ,
204- executor : async ( descriptor , runId ) => {
205- await this . #executeReleasedConcurrencyFromQueue ( descriptor , runId ) ;
205+ executor : async ( descriptor , snapshotId ) => {
206+ await this . releaseConcurrencySystem . executeReleaseConcurrencyForSnapshot ( snapshotId ) ;
206207 } ,
207208 maxTokens : async ( descriptor ) => {
208209 const environment = await this . prisma . runtimeEnvironment . findFirstOrThrow ( {
@@ -239,6 +240,10 @@ export class RunEngine {
239240 releaseConcurrencyQueue : this . releaseConcurrencyQueue ,
240241 } ;
241242
243+ this . releaseConcurrencySystem = new ReleaseConcurrencySystem ( {
244+ resources,
245+ } ) ;
246+
242247 this . executionSnapshotSystem = new ExecutionSnapshotSystem ( {
243248 resources,
244249 heartbeatTimeouts : this . heartbeatTimeouts ,
@@ -251,6 +256,7 @@ export class RunEngine {
251256
252257 this . checkpointSystem = new CheckpointSystem ( {
253258 resources,
259+ releaseConcurrencySystem : this . releaseConcurrencySystem ,
254260 executionSnapshotSystem : this . executionSnapshotSystem ,
255261 enqueueSystem : this . enqueueSystem ,
256262 } ) ;
@@ -269,6 +275,7 @@ export class RunEngine {
269275 resources,
270276 executionSnapshotSystem : this . executionSnapshotSystem ,
271277 enqueueSystem : this . enqueueSystem ,
278+ releaseConcurrencySystem : this . releaseConcurrencySystem ,
272279 } ) ;
273280
274281 this . ttlSystem = new TtlSystem ( {
@@ -344,6 +351,7 @@ export class RunEngine {
344351 machine,
345352 workerId,
346353 runnerId,
354+ releaseConcurrency,
347355 } : TriggerParams ,
348356 tx ?: PrismaClientOrTransaction
349357 ) : Promise < TaskRun > {
@@ -435,6 +443,8 @@ export class RunEngine {
435443 runStatus : status ,
436444 environmentId : environment . id ,
437445 environmentType : environment . type ,
446+ projectId : environment . project . id ,
447+ organizationId : environment . organization . id ,
438448 workerId,
439449 runnerId,
440450 } ,
@@ -490,12 +500,11 @@ export class RunEngine {
490500 runId : parentTaskRunId ,
491501 waitpoints : associatedWaitpoint . id ,
492502 projectId : associatedWaitpoint . projectId ,
493- organizationId : environment . organization . id ,
494503 batch,
495504 workerId,
496505 runnerId,
497506 tx : prisma ,
498- releaseConcurrency : true , // TODO: This needs to use the release concurrency system
507+ releaseConcurrency,
499508 } ) ;
500509 }
501510
@@ -1003,7 +1012,6 @@ export class RunEngine {
10031012 runId,
10041013 waitpoints,
10051014 projectId,
1006- organizationId,
10071015 releaseConcurrency,
10081016 timeout,
10091017 spanIdToComplete,
@@ -1028,7 +1036,6 @@ export class RunEngine {
10281036 runId,
10291037 waitpoints,
10301038 projectId,
1031- organizationId,
10321039 releaseConcurrency,
10331040 timeout,
10341041 spanIdToComplete,
@@ -1039,35 +1046,6 @@ export class RunEngine {
10391046 } ) ;
10401047 }
10411048
1042- async #executeReleasedConcurrencyFromQueue(
1043- descriptor : { orgId : string ; projectId : string ; envId : string } ,
1044- runId : string
1045- ) {
1046- this . logger . debug ( "Executing released concurrency" , {
1047- descriptor,
1048- runId,
1049- } ) ;
1050-
1051- // - Runlock the run
1052- // - Get latest snapshot
1053- // - If the run is non suspended or going to be, then bail
1054- // - If the run is suspended or going to be, then release the concurrency
1055- await this . runLock . lock ( [ runId ] , 5_000 , async ( ) => {
1056- const snapshot = await getLatestExecutionSnapshot ( this . prisma , runId ) ;
1057-
1058- if ( ! canReleaseConcurrency ( snapshot . executionStatus ) ) {
1059- this . logger . debug ( "Run is not in a state to release concurrency" , {
1060- runId,
1061- snapshot,
1062- } ) ;
1063-
1064- return ;
1065- }
1066-
1067- return await this . runQueue . releaseConcurrency ( descriptor . orgId , snapshot . runId ) ;
1068- } ) ;
1069- }
1070-
10711049 /** This completes a waitpoint and updates all entries so the run isn't blocked,
10721050 * if they're no longer blocked. This doesn't suffer from race conditions. */
10731051 async completeWaitpoint ( {
@@ -1328,7 +1306,8 @@ export class RunEngine {
13281306 id : latestSnapshot . environmentId ,
13291307 type : latestSnapshot . environmentType ,
13301308 } ,
1331- orgId : run . runtimeEnvironment . organizationId ,
1309+ orgId : latestSnapshot . organizationId ,
1310+ projectId : latestSnapshot . projectId ,
13321311 error : {
13331312 type : "INTERNAL_ERROR" ,
13341313 code : "TASK_RUN_DEQUEUED_MAX_RETRIES" ,
0 commit comments