From bef56031908e24c340c87b416c0ff17a2d5b1f97 Mon Sep 17 00:00:00 2001 From: Stella-Maria Renucci Date: Wed, 26 Nov 2025 12:07:02 +0100 Subject: [PATCH 1/3] feat: create an ISB when submitting a job --- diracx-api/src/diracx/api/jobs.py | 29 ++++++++++++++++++- diracx-cli/src/diracx/cli/jobs.py | 6 ++-- .../src/diracx/logic/jobs/submission.py | 4 --- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/diracx-api/src/diracx/api/jobs.py b/diracx-api/src/diracx/api/jobs.py index 181b7b9b7..1ff64b1c0 100644 --- a/diracx-api/src/diracx/api/jobs.py +++ b/diracx-api/src/diracx/api/jobs.py @@ -1,6 +1,6 @@ from __future__ import annotations -__all__ = ("create_sandbox", "download_sandbox") +__all__ = ("create_sandbox", "download_sandbox", "submit_jobs") import hashlib import logging @@ -13,6 +13,8 @@ import httpx import zstandard +from DIRACCommon.Core.Utilities.ClassAd.ClassAdLight import ClassAd +from typer import FileText from diracx.client.aio import AsyncDiracClient from diracx.client.models import SandboxInfo @@ -123,3 +125,28 @@ async def download_sandbox(pfn: str, destination: Path, *, client: AsyncDiracCli with tarfile_open(fh) as tf: tf.extractall(path=destination, filter="data") logger.debug("Extracted %s to %s", pfn, destination) + + +@with_client +async def submit_jobs(jdls: list[FileText], *, client: AsyncDiracClient): + # Create and upload InputSandboxes from JDLs + for i, jdl in enumerate(jdls): + original_jdl = jdl.read() + + # Fix possible lack of brackets + if original_jdl.strip()[0] != "[": + original_jdl = f"[{original_jdl}]" + + class_ad_job = ClassAd(original_jdl) + if class_ad_job.lookupAttribute("InputSandbox"): + isb = class_ad_job.getListFromExpression("InputSandbox") + sandboxes_pfn = await create_sandbox( + paths=[Path(file_path) for file_path in isb] + ) + print(f"InputSandbox created: {sandboxes_pfn[13:]}") + class_ad_job.set_expression("InputSandbox", {sandboxes_pfn}) + + jdls[i] = class_ad_job.asJDL() + + jobs = await client.jobs.submit_jdl_jobs(list(jdls)) + return jobs diff --git a/diracx-cli/src/diracx/cli/jobs.py b/diracx-cli/src/diracx/cli/jobs.py index 8039363d0..041cf5488 100644 --- a/diracx-cli/src/diracx/cli/jobs.py +++ b/diracx-cli/src/diracx/cli/jobs.py @@ -12,6 +12,7 @@ from rich.table import Table from typer import FileText, Option +from diracx.api.jobs import submit_jobs from diracx.client.aio import AsyncDiracClient from diracx.core.models import ScalarSearchOperator, SearchSpec, VectorSearchOperator from diracx.core.preferences import OutputFormats, get_diracx_preferences @@ -150,9 +151,8 @@ def display_rich(data, content_range: ContentRange) -> None: @app.async_command() -async def submit(jdl: list[FileText]): - async with AsyncDiracClient() as api: - jobs = await api.jobs.submit_jdl_jobs([x.read() for x in jdl]) +async def submit(jdls: list[FileText]): + jobs = await submit_jobs(jdls) print( f"Inserted {len(jobs)} jobs with ids: {','.join(map(str, (job.job_id for job in jobs)))}" ) diff --git a/diracx-logic/src/diracx/logic/jobs/submission.py b/diracx-logic/src/diracx/logic/jobs/submission.py index efa2e4a60..bb60f1a38 100644 --- a/diracx-logic/src/diracx/logic/jobs/submission.py +++ b/diracx-logic/src/diracx/logic/jobs/submission.py @@ -184,10 +184,6 @@ async def create_jdl_jobs(jobs: list[JobSubmissionSpec], job_db: JobDB, config: ) ) - # Fix possible lack of brackets - if original_jdl.strip()[0] != "[": - original_jdl = f"[{original_jdl}]" - original_jdls.append( ( original_jdl, From 1338f8c70a3b82665b8e772ffd5583815fb8bc83 Mon Sep 17 00:00:00 2001 From: Stella-Maria Renucci Date: Wed, 26 Nov 2025 12:15:05 +0100 Subject: [PATCH 2/3] feat: assign input sandbox to job route --- diracx-logic/src/diracx/logic/jobs/sandboxes.py | 5 +++-- diracx-routers/src/diracx/routers/jobs/sandboxes.py | 9 ++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index 43856b849..f0c601cb5 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -153,14 +153,15 @@ async def assign_sandbox_to_job( job_id: int, pfn: str, sandbox_metadata_db: SandboxMetadataDB, + sandbox_type: Literal["input", "output"], settings: SandboxStoreSettings, ): - """Map the pfn as output sandbox to job.""" + """Map the pfn as input or output sandbox to job.""" short_pfn = pfn.split("|", 1)[-1] await sandbox_metadata_db.assign_sandbox_to_jobs( jobs_ids=[job_id], pfn=short_pfn, - sb_type=SandboxType.Output, + sb_type=SandboxType(sandbox_type.capitalize()), se_name=settings.se_name, ) diff --git a/diracx-routers/src/diracx/routers/jobs/sandboxes.py b/diracx-routers/src/diracx/routers/jobs/sandboxes.py index 15abbcfc2..b532a8a7c 100644 --- a/diracx-routers/src/diracx/routers/jobs/sandboxes.py +++ b/diracx-routers/src/diracx/routers/jobs/sandboxes.py @@ -129,19 +129,22 @@ async def get_job_sandbox( return await get_job_sandbox_bl(job_id, sandbox_metadata_db, sandbox_type) -@router.patch("/{job_id}/sandbox/output") +@router.patch("/{job_id}/sandbox/{sandbox_type}") async def assign_sandbox_to_job( job_id: int, pfn: Annotated[str, Body(max_length=256, pattern=SANDBOX_PFN_REGEX)], + sandbox_type: Literal["input", "output"], sandbox_metadata_db: SandboxMetadataDB, job_db: JobDB, settings: SandboxStoreSettings, check_permissions: CheckWMSPolicyCallable, ): - """Map the pfn as output sandbox to job.""" + """Map the pfn as input or output sandbox to job.""" await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]) try: - await assign_sandbox_to_job_bl(job_id, pfn, sandbox_metadata_db, settings) + await assign_sandbox_to_job_bl( + job_id, pfn, sandbox_metadata_db, sandbox_type, settings + ) except SandboxNotFoundError as e: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found" From 11f5e42f0ee8efde876343efcea2bba3427a5e42 Mon Sep 17 00:00:00 2001 From: Stella-Maria Renucci Date: Thu, 27 Nov 2025 09:54:48 +0100 Subject: [PATCH 3/3] fix: removed Typer types from api component --- diracx-api/src/diracx/api/jobs.py | 13 +++++-------- diracx-cli/src/diracx/cli/jobs.py | 4 +++- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/diracx-api/src/diracx/api/jobs.py b/diracx-api/src/diracx/api/jobs.py index 1ff64b1c0..2e3a15eda 100644 --- a/diracx-api/src/diracx/api/jobs.py +++ b/diracx-api/src/diracx/api/jobs.py @@ -14,7 +14,6 @@ import httpx import zstandard from DIRACCommon.Core.Utilities.ClassAd.ClassAdLight import ClassAd -from typer import FileText from diracx.client.aio import AsyncDiracClient from diracx.client.models import SandboxInfo @@ -128,22 +127,20 @@ async def download_sandbox(pfn: str, destination: Path, *, client: AsyncDiracCli @with_client -async def submit_jobs(jdls: list[FileText], *, client: AsyncDiracClient): +async def submit_jobs(jdls: list[str], *, client: AsyncDiracClient): # Create and upload InputSandboxes from JDLs for i, jdl in enumerate(jdls): - original_jdl = jdl.read() - # Fix possible lack of brackets - if original_jdl.strip()[0] != "[": - original_jdl = f"[{original_jdl}]" + if jdl.strip()[0] != "[": + jdl = f"[{jdl}]" - class_ad_job = ClassAd(original_jdl) + class_ad_job = ClassAd(jdl) if class_ad_job.lookupAttribute("InputSandbox"): isb = class_ad_job.getListFromExpression("InputSandbox") sandboxes_pfn = await create_sandbox( paths=[Path(file_path) for file_path in isb] ) - print(f"InputSandbox created: {sandboxes_pfn[13:]}") + logging.info(f"InputSandbox created: {sandboxes_pfn[13:]}") class_ad_job.set_expression("InputSandbox", {sandboxes_pfn}) jdls[i] = class_ad_job.asJDL() diff --git a/diracx-cli/src/diracx/cli/jobs.py b/diracx-cli/src/diracx/cli/jobs.py index 041cf5488..44fd614ad 100644 --- a/diracx-cli/src/diracx/cli/jobs.py +++ b/diracx-cli/src/diracx/cli/jobs.py @@ -152,7 +152,9 @@ def display_rich(data, content_range: ContentRange) -> None: @app.async_command() async def submit(jdls: list[FileText]): - jobs = await submit_jobs(jdls) + jdls_values = [jdl.read() for jdl in jdls] + + jobs = await submit_jobs(jdls_values) print( f"Inserted {len(jobs)} jobs with ids: {','.join(map(str, (job.job_id for job in jobs)))}" )