diff --git a/diracx-api/src/diracx/api/jobs.py b/diracx-api/src/diracx/api/jobs.py index 181b7b9b7..2e3a15eda 100644 --- a/diracx-api/src/diracx/api/jobs.py +++ b/diracx-api/src/diracx/api/jobs.py @@ -1,6 +1,6 @@ from __future__ import annotations -__all__ = ("create_sandbox", "download_sandbox") +__all__ = ("create_sandbox", "download_sandbox", "submit_jobs") import hashlib import logging @@ -13,6 +13,7 @@ import httpx import zstandard +from DIRACCommon.Core.Utilities.ClassAd.ClassAdLight import ClassAd from diracx.client.aio import AsyncDiracClient from diracx.client.models import SandboxInfo @@ -123,3 +124,26 @@ async def download_sandbox(pfn: str, destination: Path, *, client: AsyncDiracCli with tarfile_open(fh) as tf: tf.extractall(path=destination, filter="data") logger.debug("Extracted %s to %s", pfn, destination) + + +@with_client +async def submit_jobs(jdls: list[str], *, client: AsyncDiracClient): + # Create and upload InputSandboxes from JDLs + for i, jdl in enumerate(jdls): + # Fix possible lack of brackets + if jdl.strip()[0] != "[": + jdl = f"[{jdl}]" + + class_ad_job = ClassAd(jdl) + if class_ad_job.lookupAttribute("InputSandbox"): + isb = class_ad_job.getListFromExpression("InputSandbox") + sandboxes_pfn = await create_sandbox( + paths=[Path(file_path) for file_path in isb] + ) + logging.info(f"InputSandbox created: {sandboxes_pfn[13:]}") + class_ad_job.set_expression("InputSandbox", {sandboxes_pfn}) + + jdls[i] = class_ad_job.asJDL() + + jobs = await client.jobs.submit_jdl_jobs(list(jdls)) + return jobs diff --git a/diracx-cli/src/diracx/cli/jobs.py b/diracx-cli/src/diracx/cli/jobs.py index 8039363d0..44fd614ad 100644 --- a/diracx-cli/src/diracx/cli/jobs.py +++ b/diracx-cli/src/diracx/cli/jobs.py @@ -12,6 +12,7 @@ from rich.table import Table from typer import FileText, Option +from diracx.api.jobs import submit_jobs from diracx.client.aio import AsyncDiracClient from diracx.core.models import ScalarSearchOperator, SearchSpec, VectorSearchOperator from diracx.core.preferences import OutputFormats, get_diracx_preferences @@ -150,9 +151,10 @@ def display_rich(data, content_range: ContentRange) -> None: @app.async_command() -async def submit(jdl: list[FileText]): - async with AsyncDiracClient() as api: - jobs = await api.jobs.submit_jdl_jobs([x.read() for x in jdl]) +async def submit(jdls: list[FileText]): + jdls_values = [jdl.read() for jdl in jdls] + + jobs = await submit_jobs(jdls_values) print( f"Inserted {len(jobs)} jobs with ids: {','.join(map(str, (job.job_id for job in jobs)))}" ) diff --git a/diracx-logic/src/diracx/logic/jobs/sandboxes.py b/diracx-logic/src/diracx/logic/jobs/sandboxes.py index 43856b849..f0c601cb5 100644 --- a/diracx-logic/src/diracx/logic/jobs/sandboxes.py +++ b/diracx-logic/src/diracx/logic/jobs/sandboxes.py @@ -153,14 +153,15 @@ async def assign_sandbox_to_job( job_id: int, pfn: str, sandbox_metadata_db: SandboxMetadataDB, + sandbox_type: Literal["input", "output"], settings: SandboxStoreSettings, ): - """Map the pfn as output sandbox to job.""" + """Map the pfn as input or output sandbox to job.""" short_pfn = pfn.split("|", 1)[-1] await sandbox_metadata_db.assign_sandbox_to_jobs( jobs_ids=[job_id], pfn=short_pfn, - sb_type=SandboxType.Output, + sb_type=SandboxType(sandbox_type.capitalize()), se_name=settings.se_name, ) diff --git a/diracx-logic/src/diracx/logic/jobs/submission.py b/diracx-logic/src/diracx/logic/jobs/submission.py index efa2e4a60..bb60f1a38 100644 --- a/diracx-logic/src/diracx/logic/jobs/submission.py +++ b/diracx-logic/src/diracx/logic/jobs/submission.py @@ -184,10 +184,6 @@ async def create_jdl_jobs(jobs: list[JobSubmissionSpec], job_db: JobDB, config: ) ) - # Fix possible lack of brackets - if original_jdl.strip()[0] != "[": - original_jdl = f"[{original_jdl}]" - original_jdls.append( ( original_jdl, diff --git a/diracx-routers/src/diracx/routers/jobs/sandboxes.py b/diracx-routers/src/diracx/routers/jobs/sandboxes.py index 15abbcfc2..b532a8a7c 100644 --- a/diracx-routers/src/diracx/routers/jobs/sandboxes.py +++ b/diracx-routers/src/diracx/routers/jobs/sandboxes.py @@ -129,19 +129,22 @@ async def get_job_sandbox( return await get_job_sandbox_bl(job_id, sandbox_metadata_db, sandbox_type) -@router.patch("/{job_id}/sandbox/output") +@router.patch("/{job_id}/sandbox/{sandbox_type}") async def assign_sandbox_to_job( job_id: int, pfn: Annotated[str, Body(max_length=256, pattern=SANDBOX_PFN_REGEX)], + sandbox_type: Literal["input", "output"], sandbox_metadata_db: SandboxMetadataDB, job_db: JobDB, settings: SandboxStoreSettings, check_permissions: CheckWMSPolicyCallable, ): - """Map the pfn as output sandbox to job.""" + """Map the pfn as input or output sandbox to job.""" await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]) try: - await assign_sandbox_to_job_bl(job_id, pfn, sandbox_metadata_db, settings) + await assign_sandbox_to_job_bl( + job_id, pfn, sandbox_metadata_db, sandbox_type, settings + ) except SandboxNotFoundError as e: raise HTTPException( status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found"