Skip to content

Commit 2fcd0ff

Browse files
0xhzxcopybara-github
authored andcommitted
feat: Add max_concurrent_active_run_count support to Scheduled Pipelines client GA.
test: Add unit tests for `max_concurrent_active_run_count`. PiperOrigin-RevId: 869264271
1 parent 6ac28a5 commit 2fcd0ff

File tree

8 files changed

+138
-0
lines changed

8 files changed

+138
-0
lines changed

google/cloud/aiplatform/pipeline_job_schedules.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ def create(
129129
allow_queueing: bool = False,
130130
max_run_count: Optional[int] = None,
131131
max_concurrent_run_count: int = 1,
132+
max_concurrent_active_run_count: Optional[int] = None,
132133
service_account: Optional[str] = None,
133134
network: Optional[str] = None,
134135
create_request_timeout: Optional[float] = None,
@@ -155,6 +156,10 @@ def create(
155156
Must be positive and <= 2^63-1.
156157
max_concurrent_run_count (int):
157158
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
159+
max_concurrent_active_run_count (int):
160+
Optional. Maximum number of active runs that can be executed
161+
concurrently for this PipelineJobSchedule. Active runs are those
162+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
158163
service_account (str):
159164
Optional. Specifies the service account for workload run-as account.
160165
Users submitting jobs must have act-as permission on this run-as account.
@@ -176,6 +181,7 @@ def create(
176181
allow_queueing=allow_queueing,
177182
max_run_count=max_run_count,
178183
max_concurrent_run_count=max_concurrent_run_count,
184+
max_concurrent_active_run_count=max_concurrent_active_run_count,
179185
service_account=service_account,
180186
network=network,
181187
create_request_timeout=create_request_timeout,
@@ -189,6 +195,7 @@ def _create(
189195
allow_queueing: bool = False,
190196
max_run_count: Optional[int] = None,
191197
max_concurrent_run_count: int = 1,
198+
max_concurrent_active_run_count: Optional[int] = None,
192199
service_account: Optional[str] = None,
193200
network: Optional[str] = None,
194201
create_request_timeout: Optional[float] = None,
@@ -215,6 +222,10 @@ def _create(
215222
Must be positive and <= 2^63-1.
216223
max_concurrent_run_count (int):
217224
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
225+
max_concurrent_active_run_count (int):
226+
Optional. Maximum number of active runs that can be executed
227+
concurrently for this PipelineJobSchedule. Active runs are those
228+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
218229
service_account (str):
219230
Optional. Specifies the service account for workload run-as account.
220231
Users submitting jobs must have act-as permission on this run-as account.
@@ -239,6 +250,10 @@ def _create(
239250
self._gca_resource.max_run_count = max_run_count
240251
if max_concurrent_run_count:
241252
self._gca_resource.max_concurrent_run_count = max_concurrent_run_count
253+
if max_concurrent_active_run_count:
254+
self._gca_resource.max_concurrent_active_run_count = (
255+
max_concurrent_active_run_count
256+
)
242257

243258
service_account = service_account or initializer.global_config.service_account
244259
network = network or initializer.global_config.network
@@ -383,6 +398,7 @@ def update(
383398
allow_queueing: Optional[bool] = None,
384399
max_run_count: Optional[int] = None,
385400
max_concurrent_run_count: Optional[int] = None,
401+
max_concurrent_active_run_count: Optional[int] = None,
386402
) -> None:
387403
"""Update an existing PipelineJobSchedule.
388404
@@ -415,6 +431,10 @@ def update(
415431
Must be positive and <= 2^63-1.
416432
max_concurrent_run_count (int):
417433
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
434+
max_concurrent_active_run_count (int):
435+
Optional. Maximum number of active runs that can be executed
436+
concurrently for this PipelineJobSchedule. Active runs are those
437+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
418438
419439
Raises:
420440
RuntimeError: User tried to call update() before create().
@@ -451,6 +471,13 @@ def update(
451471
"max_concurrent_run_count",
452472
max_concurrent_run_count,
453473
)
474+
if max_concurrent_active_run_count is not None:
475+
updated_fields.append("max_concurrent_active_run_count")
476+
setattr(
477+
pipeline_job_schedule,
478+
"max_concurrent_active_run_count",
479+
max_concurrent_active_run_count,
480+
)
454481

455482
update_mask = field_mask.FieldMask(paths=updated_fields)
456483
self.api_client.update_schedule(

google/cloud/aiplatform/pipeline_jobs.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,7 @@ def create_schedule(
542542
allow_queueing: bool = False,
543543
max_run_count: Optional[int] = None,
544544
max_concurrent_run_count: int = 1,
545+
max_concurrent_active_run_count: Optional[int] = None,
545546
service_account: Optional[str] = None,
546547
network: Optional[str] = None,
547548
create_request_timeout: Optional[float] = None,
@@ -582,6 +583,10 @@ def create_schedule(
582583
Must be positive and <= 2^63-1.
583584
max_concurrent_run_count (int):
584585
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
586+
max_concurrent_active_run_count (int):
587+
Optional. Maximum number of active runs that can be executed
588+
concurrently for this PipelineJobSchedule. Active runs are those
589+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
585590
service_account (str):
586591
Optional. Specifies the service account for workload run-as account.
587592
Users submitting jobs must have act-as permission on this run-as account.
@@ -616,6 +621,7 @@ def create_schedule(
616621
allow_queueing=allow_queueing,
617622
max_run_count=max_run_count,
618623
max_concurrent_run_count=max_concurrent_run_count,
624+
max_concurrent_active_run_count=max_concurrent_active_run_count,
619625
service_account=service_account,
620626
network=network,
621627
create_request_timeout=create_request_timeout,

google/cloud/aiplatform/preview/pipelinejob/pipeline_jobs.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ def create_schedule(
295295
allow_queueing: bool = False,
296296
max_run_count: Optional[int] = None,
297297
max_concurrent_run_count: int = 1,
298+
max_concurrent_active_run_count: Optional[int] = None,
298299
service_account: Optional[str] = None,
299300
network: Optional[str] = None,
300301
create_request_timeout: Optional[float] = None,
@@ -335,6 +336,10 @@ def create_schedule(
335336
Must be positive and <= 2^63-1.
336337
max_concurrent_run_count (int):
337338
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
339+
max_concurrent_active_run_count (int):
340+
Optional. Maximum number of active runs that can be executed
341+
concurrently for this PipelineJobSchedule. Active runs are those
342+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
338343
service_account (str):
339344
Optional. Specifies the service account for workload run-as account.
340345
Users submitting jobs must have act-as permission on this run-as account.
@@ -358,6 +363,7 @@ def create_schedule(
358363
allow_queueing=allow_queueing,
359364
max_run_count=max_run_count,
360365
max_concurrent_run_count=max_concurrent_run_count,
366+
max_concurrent_active_run_count=max_concurrent_active_run_count,
361367
service_account=service_account,
362368
network=network,
363369
create_request_timeout=create_request_timeout,

google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def create(
7575
allow_queueing: bool = False,
7676
max_run_count: Optional[int] = None,
7777
max_concurrent_run_count: int = 1,
78+
max_concurrent_active_run_count: Optional[int] = None,
7879
service_account: Optional[str] = None,
7980
network: Optional[str] = None,
8081
create_request_timeout: Optional[float] = None,
@@ -101,6 +102,10 @@ def create(
101102
Must be positive and <= 2^63-1.
102103
max_concurrent_run_count (int):
103104
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
105+
max_concurrent_active_run_count (int):
106+
Optional. Maximum number of active runs that can be executed
107+
concurrently for this PipelineJobSchedule. Active runs are those
108+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
104109
service_account (str):
105110
Optional. Specifies the service account for workload run-as account.
106111
Users submitting jobs must have act-as permission on this run-as account.
@@ -120,6 +125,7 @@ def create(
120125
allow_queueing=allow_queueing,
121126
max_run_count=max_run_count,
122127
max_concurrent_run_count=max_concurrent_run_count,
128+
max_concurrent_active_run_count=max_concurrent_active_run_count,
123129
service_account=service_account,
124130
network=network,
125131
create_request_timeout=create_request_timeout,
@@ -190,6 +196,7 @@ def update(
190196
allow_queueing: Optional[bool] = None,
191197
max_run_count: Optional[int] = None,
192198
max_concurrent_run_count: Optional[int] = None,
199+
max_concurrent_active_run_count: Optional[int] = None,
193200
) -> None:
194201
"""Update an existing PipelineJobSchedule.
195202
@@ -222,6 +229,10 @@ def update(
222229
Must be positive and <= 2^63-1.
223230
max_concurrent_run_count (int):
224231
Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
232+
max_concurrent_active_run_count (int):
233+
Optional. Maximum number of active runs that can be executed
234+
concurrently for this PipelineJobSchedule. Active runs are those
235+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
225236
226237
Raises:
227238
RuntimeError: User tried to call update() before create().
@@ -234,4 +245,5 @@ def update(
234245
allow_queueing=allow_queueing,
235246
max_run_count=max_run_count,
236247
max_concurrent_run_count=max_concurrent_run_count,
248+
max_concurrent_active_run_count=max_concurrent_active_run_count,
237249
)

google/cloud/aiplatform/schedules.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,16 @@ def max_concurrent_run_count(self) -> int:
186186
self._sync_gca_resource()
187187
return self._gca_resource.max_concurrent_run_count
188188

189+
@property
190+
def max_concurrent_active_run_count(self) -> int:
191+
"""Current Schedule max_concurrent_active_run_count.
192+
193+
Returns:
194+
Schedule max_concurrent_active_run_count.
195+
"""
196+
self._sync_gca_resource()
197+
return self._gca_resource.max_concurrent_active_run_count
198+
189199
@property
190200
def allow_queueing(self) -> bool:
191201
"""Whether current Schedule allows queueing.

google/cloud/aiplatform_v1/types/schedule.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ class Schedule(proto.Message):
119119
the limit for starting the scheduled requests
120120
and not the execution of the operations/jobs
121121
created by the requests (if applicable).
122+
max_concurrent_active_run_count (int):
123+
Optional. Maximum number of active runs that can be executed
124+
concurrently for this PipelineJobSchedule. Active runs are those
125+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
122126
allow_queueing (bool):
123127
Optional. Whether new scheduled runs can be queued when
124128
max_concurrent_runs limit is reached. If set to true, new
@@ -265,6 +269,10 @@ class RunResponse(proto.Message):
265269
proto.INT64,
266270
number=11,
267271
)
272+
max_concurrent_active_run_count: int = proto.Field(
273+
proto.INT64,
274+
number=21,
275+
)
268276
allow_queueing: bool = proto.Field(
269277
proto.BOOL,
270278
number=12,

google/cloud/aiplatform_v1beta1/types/schedule.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ class Schedule(proto.Message):
125125
the limit for starting the scheduled requests
126126
and not the execution of the operations/jobs
127127
created by the requests (if applicable).
128+
max_concurrent_active_run_count (int):
129+
Optional. Maximum number of active runs that can be executed
130+
concurrently for this PipelineJobSchedule. Active runs are those
131+
in a non-terminal state (e.g., RUNNING, PENDING, or QUEUED).
128132
allow_queueing (bool):
129133
Optional. Whether new scheduled runs can be queued when
130134
max_concurrent_runs limit is reached. If set to true, new
@@ -279,6 +283,10 @@ class RunResponse(proto.Message):
279283
proto.INT64,
280284
number=11,
281285
)
286+
max_concurrent_active_run_count: int = proto.Field(
287+
proto.INT64,
288+
number=21,
289+
)
282290
allow_queueing: bool = proto.Field(
283291
proto.BOOL,
284292
number=12,

tests/unit/aiplatform/test_pipeline_job_schedules.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@
7272
_TEST_PIPELINE_JOB_SCHEDULE_CRON = "* * * * *"
7373
_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT = 1
7474
_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT = 2
75+
_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT = 10
7576

7677
_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON = "1 1 1 1 1"
7778
_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT = 5
79+
_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT = 3
7880

7981
_TEST_TEMPLATE_PATH = f"gs://{_TEST_GCS_BUCKET_NAME}/job_spec.json"
8082
_TEST_AR_TEMPLATE_PATH = "https://us-central1-kfp.pkg.dev/proj/repo/pack/latest"
@@ -472,6 +474,7 @@ def test_call_preview_schedule_service_create(
472474
cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
473475
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
474476
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
477+
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
475478
service_account=_TEST_SERVICE_ACCOUNT,
476479
network=_TEST_NETWORK,
477480
create_request_timeout=None,
@@ -494,6 +497,7 @@ def test_call_preview_schedule_service_create(
494497
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
495498
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
496499
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
500+
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
497501
create_pipeline_job_request={
498502
"parent": _TEST_PARENT,
499503
"pipeline_job": {
@@ -555,6 +559,7 @@ def test_call_schedule_service_create(
555559
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
556560
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
557561
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
562+
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
558563
service_account=_TEST_SERVICE_ACCOUNT,
559564
network=_TEST_NETWORK,
560565
create_request_timeout=None,
@@ -577,6 +582,7 @@ def test_call_schedule_service_create(
577582
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
578583
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
579584
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
585+
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
580586
create_pipeline_job_request={
581587
"parent": _TEST_PARENT,
582588
"pipeline_job": {
@@ -1947,6 +1953,7 @@ def test_call_schedule_service_update(
19471953
pipeline_job_schedule.update(
19481954
cron=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON,
19491955
max_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
1956+
max_concurrent_active_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
19501957
)
19511958

19521959
expected_gapic_pipeline_job_schedule = gca_schedule.Schedule(
@@ -1956,6 +1963,7 @@ def test_call_schedule_service_update(
19561963
cron=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_CRON,
19571964
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
19581965
max_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
1966+
max_concurrent_active_run_count=_TEST_UPDATED_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
19591967
create_pipeline_job_request=_TEST_CREATE_PIPELINE_JOB_REQUEST,
19601968
)
19611969
assert (
@@ -2268,3 +2276,56 @@ def test_get_allow_queueing_before_create(
22682276
)
22692277

22702278
pipeline_job_schedule.allow_queueing
2279+
2280+
@pytest.mark.parametrize(
2281+
"job_spec",
2282+
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
2283+
)
2284+
def test_get_max_concurrent_active_run_count_before_create(
2285+
self,
2286+
mock_schedule_service_create,
2287+
mock_schedule_service_get,
2288+
mock_schedule_bucket_exists,
2289+
job_spec,
2290+
mock_load_yaml_and_json,
2291+
):
2292+
"""Gets the PipelineJobSchedule max_concurrent_active_run_count before creating.
2293+
2294+
Raises error because PipelineJobSchedule should be created first.
2295+
"""
2296+
aiplatform.init(
2297+
project=_TEST_PROJECT,
2298+
staging_bucket=_TEST_GCS_BUCKET_NAME,
2299+
location=_TEST_LOCATION,
2300+
credentials=_TEST_CREDENTIALS,
2301+
)
2302+
2303+
job = pipeline_jobs.PipelineJob(
2304+
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
2305+
template_path=_TEST_TEMPLATE_PATH,
2306+
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
2307+
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
2308+
enable_caching=True,
2309+
)
2310+
2311+
pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule(
2312+
pipeline_job=job,
2313+
display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME,
2314+
)
2315+
2316+
with pytest.raises(RuntimeError) as e:
2317+
pipeline_job_schedule.max_concurrent_active_run_count
2318+
2319+
assert e.match(regexp=r"PipelineJobSchedule resource has not been created.")
2320+
2321+
pipeline_job_schedule.create(
2322+
cron=_TEST_PIPELINE_JOB_SCHEDULE_CRON,
2323+
max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT,
2324+
max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT,
2325+
max_concurrent_active_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_ACTIVE_RUN_COUNT,
2326+
service_account=_TEST_SERVICE_ACCOUNT,
2327+
network=_TEST_NETWORK,
2328+
create_request_timeout=None,
2329+
)
2330+
2331+
pipeline_job_schedule.max_concurrent_active_run_count

0 commit comments

Comments
 (0)