@@ -15,48 +15,88 @@ export type TaskRunProcessPoolOptions = {
1515} ;
1616
1717export class TaskRunProcessPool {
18- private availableProcesses : TaskRunProcess [ ] = [ ] ;
19- private busyProcesses : Set < TaskRunProcess > = new Set ( ) ;
18+ // Group processes by worker version
19+ private availableProcessesByVersion : Map < string , TaskRunProcess [ ] > = new Map ( ) ;
20+ private busyProcessesByVersion : Map < string , Set < TaskRunProcess > > = new Map ( ) ;
2021 private readonly options : TaskRunProcessPoolOptions ;
2122 private readonly maxPoolSize : number ;
2223 private readonly maxExecutionsPerProcess : number ;
24+ private readonly executionCountsPerProcess : Map < number , number > = new Map ( ) ;
25+ private readonly deprecatedVersions : Set < string > = new Set ( ) ;
2326
2427 constructor ( options : TaskRunProcessPoolOptions ) {
2528 this . options = options ;
2629 this . maxPoolSize = options . maxPoolSize ?? 3 ;
2730 this . maxExecutionsPerProcess = options . maxExecutionsPerProcess ?? 50 ;
2831 }
2932
33+ deprecateVersion ( version : string ) {
34+ this . deprecatedVersions . add ( version ) ;
35+
36+ logger . debug ( "[TaskRunProcessPool] Deprecating version" , { version } ) ;
37+
38+ const versionProcesses = this . availableProcessesByVersion . get ( version ) || [ ] ;
39+
40+ const processesToKill = versionProcesses . filter ( ( process ) => ! process . isExecuting ( ) ) ;
41+ Promise . all ( processesToKill . map ( ( process ) => this . killProcess ( process ) ) ) . then ( ( ) => {
42+ this . availableProcessesByVersion . delete ( version ) ;
43+ } ) ;
44+ }
45+
3046 async getProcess (
3147 workerManifest : WorkerManifest ,
3248 serverWorker : ServerBackgroundWorker ,
3349 machineResources : MachinePresetResources ,
3450 env ?: Record < string , string >
3551 ) : Promise < { taskRunProcess : TaskRunProcess ; isReused : boolean } > {
52+ const version = serverWorker . version || "unknown" ;
53+
3654 // Try to reuse an existing process if enabled
3755 if ( this . options . enableProcessReuse ) {
38- const reusableProcess = this . findReusableProcess ( ) ;
56+ const reusableProcess = this . findReusableProcess ( version ) ;
3957 if ( reusableProcess ) {
58+ const availableCount = this . availableProcessesByVersion . get ( version ) ?. length || 0 ;
59+ const busyCount = this . busyProcessesByVersion . get ( version ) ?. size || 0 ;
60+
4061 logger . debug ( "[TaskRunProcessPool] Reusing existing process" , {
41- availableCount : this . availableProcesses . length ,
42- busyCount : this . busyProcesses . size ,
62+ version,
63+ availableCount,
64+ busyCount,
4365 } ) ;
4466
45- this . availableProcesses = this . availableProcesses . filter ( ( p ) => p !== reusableProcess ) ;
46- this . busyProcesses . add ( reusableProcess ) ;
67+ // Remove from available and add to busy for this version
68+ const availableProcesses = this . availableProcessesByVersion . get ( version ) || [ ] ;
69+ this . availableProcessesByVersion . set (
70+ version ,
71+ availableProcesses . filter ( ( p ) => p !== reusableProcess )
72+ ) ;
73+
74+ if ( ! this . busyProcessesByVersion . has ( version ) ) {
75+ this . busyProcessesByVersion . set ( version , new Set ( ) ) ;
76+ }
77+ this . busyProcessesByVersion . get ( version ) ! . add ( reusableProcess ) ;
78+
4779 return { taskRunProcess : reusableProcess , isReused : true } ;
4880 } else {
81+ const availableCount = this . availableProcessesByVersion . get ( version ) ?. length || 0 ;
82+ const busyCount = this . busyProcessesByVersion . get ( version ) ?. size || 0 ;
83+
4984 logger . debug ( "[TaskRunProcessPool] No reusable process found" , {
50- availableCount : this . availableProcesses . length ,
51- busyCount : this . busyProcesses . size ,
85+ version,
86+ availableCount,
87+ busyCount,
5288 } ) ;
5389 }
5490 }
5591
5692 // Create new process
93+ const availableCount = this . availableProcessesByVersion . get ( version ) ?. length || 0 ;
94+ const busyCount = this . busyProcessesByVersion . get ( version ) ?. size || 0 ;
95+
5796 logger . debug ( "[TaskRunProcessPool] Creating new process" , {
58- availableCount : this . availableProcesses . length ,
59- busyCount : this . busyProcesses . size ,
97+ version,
98+ availableCount,
99+ busyCount,
60100 } ) ;
61101
62102 const newProcess = new TaskRunProcess ( {
@@ -70,60 +110,99 @@ export class TaskRunProcessPool {
70110 cwd : this . options . cwd ,
71111 } ) . initialize ( ) ;
72112
73- this . busyProcesses . add ( newProcess ) ;
113+ // Add to busy processes for this version
114+ if ( ! this . busyProcessesByVersion . has ( version ) ) {
115+ this . busyProcessesByVersion . set ( version , new Set ( ) ) ;
116+ }
117+ this . busyProcessesByVersion . get ( version ) ! . add ( newProcess ) ;
118+
74119 return { taskRunProcess : newProcess , isReused : false } ;
75120 }
76121
77- async returnProcess ( process : TaskRunProcess ) : Promise < void > {
78- this . busyProcesses . delete ( process ) ;
122+ async returnProcess ( process : TaskRunProcess , version : string ) : Promise < void > {
123+ // Remove from busy processes for this version
124+ const busyProcesses = this . busyProcessesByVersion . get ( version ) ;
125+ if ( busyProcesses ) {
126+ busyProcesses . delete ( process ) ;
127+ }
128+
129+ if ( process . pid ) {
130+ this . executionCountsPerProcess . set (
131+ process . pid ,
132+ ( this . executionCountsPerProcess . get ( process . pid ) ?? 0 ) + 1
133+ ) ;
134+ }
135+
136+ if ( this . shouldReuseProcess ( process , version ) ) {
137+ const availableCount = this . availableProcessesByVersion . get ( version ) ?. length || 0 ;
138+ const busyCount = this . busyProcessesByVersion . get ( version ) ?. size || 0 ;
79139
80- if ( this . shouldReuseProcess ( process ) ) {
81140 logger . debug ( "[TaskRunProcessPool] Returning process to pool" , {
82- availableCount : this . availableProcesses . length ,
83- busyCount : this . busyProcesses . size ,
141+ version,
142+ availableCount,
143+ busyCount,
84144 } ) ;
85145
86146 // Clean up but don't kill the process
87147 try {
88148 await process . cleanup ( false ) ;
89- this . availableProcesses . push ( process ) ;
149+
150+ // Add to available processes for this version
151+ if ( ! this . availableProcessesByVersion . has ( version ) ) {
152+ this . availableProcessesByVersion . set ( version , [ ] ) ;
153+ }
154+ this . availableProcessesByVersion . get ( version ) ! . push ( process ) ;
90155 } catch ( error ) {
91156 logger . debug ( "[TaskRunProcessPool] Failed to cleanup process for reuse, killing it" , {
92157 error,
93158 } ) ;
94159 await this . killProcess ( process ) ;
95160 }
96161 } else {
162+ const availableCount = this . availableProcessesByVersion . get ( version ) ?. length || 0 ;
163+ const busyCount = this . busyProcessesByVersion . get ( version ) ?. size || 0 ;
164+
97165 logger . debug ( "[TaskRunProcessPool] Killing process" , {
98- availableCount : this . availableProcesses . length ,
99- busyCount : this . busyProcesses . size ,
166+ version,
167+ availableCount,
168+ busyCount,
100169 } ) ;
101170 await this . killProcess ( process ) ;
102171 }
103172 }
104173
105- private findReusableProcess ( ) : TaskRunProcess | undefined {
106- return this . availableProcesses . find ( ( process ) => this . isProcessHealthy ( process ) ) ;
174+ private findReusableProcess ( version : string ) : TaskRunProcess | undefined {
175+ const availableProcesses = this . availableProcessesByVersion . get ( version ) || [ ] ;
176+ return availableProcesses . find ( ( process ) => this . isProcessHealthy ( process ) ) ;
107177 }
108178
109- private shouldReuseProcess ( process : TaskRunProcess ) : boolean {
179+ private shouldReuseProcess ( process : TaskRunProcess , version : string ) : boolean {
110180 const isHealthy = this . isProcessHealthy ( process ) ;
111181 const isBeingKilled = process . isBeingKilled ;
112182 const pid = process . pid ;
183+ const executionCount = this . executionCountsPerProcess . get ( pid ?? 0 ) ?? 0 ;
184+ const availableCount = this . availableProcessesByVersion . get ( version ) ?. length || 0 ;
185+ const busyCount = this . busyProcessesByVersion . get ( version ) ?. size || 0 ;
186+ const isDeprecated = this . deprecatedVersions . has ( version ) ;
113187
114188 logger . debug ( "[TaskRunProcessPool] Checking if process should be reused" , {
189+ version,
115190 isHealthy,
116191 isBeingKilled,
117192 pid,
118- availableCount : this . availableProcesses . length ,
119- busyCount : this . busyProcesses . size ,
193+ availableCount,
194+ busyCount,
120195 maxPoolSize : this . maxPoolSize ,
196+ executionCount,
197+ isDeprecated,
121198 } ) ;
122199
123200 return (
124201 this . options . enableProcessReuse &&
125202 this . isProcessHealthy ( process ) &&
126- this . availableProcesses . length < this . maxPoolSize
203+ availableCount < this . maxPoolSize &&
204+ executionCount < this . maxExecutionsPerProcess &&
205+ ! isDeprecated
127206 ) ;
128207 }
129208
@@ -141,25 +220,65 @@ export class TaskRunProcessPool {
141220 }
142221
143222 async shutdown ( ) : Promise < void > {
223+ const totalAvailable = Array . from ( this . availableProcessesByVersion . values ( ) ) . reduce (
224+ ( sum , processes ) => sum + processes . length ,
225+ 0
226+ ) ;
227+ const totalBusy = Array . from ( this . busyProcessesByVersion . values ( ) ) . reduce (
228+ ( sum , processes ) => sum + processes . size ,
229+ 0
230+ ) ;
231+
144232 logger . debug ( "[TaskRunProcessPool] Shutting down pool" , {
145- availableCount : this . availableProcesses . length ,
146- busyCount : this . busyProcesses . size ,
233+ availableCount : totalAvailable ,
234+ busyCount : totalBusy ,
235+ versions : Array . from ( this . availableProcessesByVersion . keys ( ) ) ,
147236 } ) ;
148237
149- // Kill all available processes
150- await Promise . all ( this . availableProcesses . map ( ( process ) => this . killProcess ( process ) ) ) ;
151- this . availableProcesses = [ ] ;
238+ // Kill all available processes across all versions
239+ const allAvailableProcesses = Array . from ( this . availableProcessesByVersion . values ( ) ) . flat ( ) ;
240+ await Promise . all ( allAvailableProcesses . map ( ( process ) => this . killProcess ( process ) ) ) ;
241+ this . availableProcessesByVersion . clear ( ) ;
152242
153- // Kill all busy processes
154- await Promise . all ( Array . from ( this . busyProcesses ) . map ( ( process ) => this . killProcess ( process ) ) ) ;
155- this . busyProcesses . clear ( ) ;
243+ // Kill all busy processes across all versions
244+ const allBusyProcesses = Array . from ( this . busyProcessesByVersion . values ( ) )
245+ . map ( ( processSet ) => Array . from ( processSet ) )
246+ . flat ( ) ;
247+ await Promise . all ( allBusyProcesses . map ( ( process ) => this . killProcess ( process ) ) ) ;
248+ this . busyProcessesByVersion . clear ( ) ;
156249 }
157250
158251 getStats ( ) {
252+ const totalAvailable = Array . from ( this . availableProcessesByVersion . values ( ) ) . reduce (
253+ ( sum , processes ) => sum + processes . length ,
254+ 0
255+ ) ;
256+ const totalBusy = Array . from ( this . busyProcessesByVersion . values ( ) ) . reduce (
257+ ( sum , processes ) => sum + processes . size ,
258+ 0
259+ ) ;
260+
261+ const statsByVersion : Record < string , { available : number ; busy : number } > = { } ;
262+ for ( const [ version , processes ] of this . availableProcessesByVersion . entries ( ) ) {
263+ statsByVersion [ version ] = {
264+ available : processes . length ,
265+ busy : this . busyProcessesByVersion . get ( version ) ?. size || 0 ,
266+ } ;
267+ }
268+ for ( const [ version , processes ] of this . busyProcessesByVersion . entries ( ) ) {
269+ if ( ! statsByVersion [ version ] ) {
270+ statsByVersion [ version ] = {
271+ available : 0 ,
272+ busy : processes . size ,
273+ } ;
274+ }
275+ }
276+
159277 return {
160- availableCount : this . availableProcesses . length ,
161- busyCount : this . busyProcesses . size ,
162- totalCount : this . availableProcesses . length + this . busyProcesses . size ,
278+ availableCount : totalAvailable ,
279+ busyCount : totalBusy ,
280+ totalCount : totalAvailable + totalBusy ,
281+ byVersion : statsByVersion ,
163282 } ;
164283 }
165284}
0 commit comments