class JobsHistorySummary(JobDBBase):
    """Summary table holding job counts grouped by a set of job attributes.

    The eight grouping columns together form the primary key, so the table
    holds at most one row per distinct combination of their values.
    """

    __tablename__ = "JobsHistorySummary"
    __table_args__ = (
        # Composite primary key over every grouping column (database column names).
        PrimaryKeyConstraint(
            "Status",
            "Site",
            "Owner",
            "OwnerGroup",
            "JobGroup",
            "JobType",
            "ApplicationStatus",
            "MinorStatus",
        ),
    )

    # --- grouping columns (members of the primary key) ---
    status = Column("Status", String(32))
    site = Column("Site", String(100))
    owner = Column("Owner", String(32))
    owner_group = Column("OwnerGroup", String(128))
    job_group = Column("JobGroup", String(32))
    job_type = Column("JobType", String(32))
    application_status = Column("ApplicationStatus", String(255))
    minor_status = Column("MinorStatus", String(128))

    # --- aggregated values for each group ---
    job_count = Column("JobCount", Integer)
    reschedule_sum = Column("RescheduleSum", Integer)
class PilotsHistorySummary(PilotAgentsDBBase):
    """Summary table holding a pilot count per (GridSite, GridType, Status).

    The three grouping columns form the composite primary key.
    """

    __tablename__ = "PilotsHistorySummary"

    # Grouping columns; together they identify a row.
    grid_site = Column("GridSite", String(128), primary_key=True)
    grid_type = Column("GridType", String(32), primary_key=True)
    status = Column("Status", String(32), primary_key=True)

    # Aggregated value for the group.
    pilot_count = Column("PilotCount", Integer)
async def fill_jobs_history_summary(self):
    """Rebuild the JobsHistorySummary table from the jobs that are in a final state.

    The summary is computed into ``JobsHistorySummary_staging`` and then swapped
    in by renaming: the current table becomes ``JobsHistorySummary_old``, the
    staging table becomes ``JobsHistorySummary`` and the old table is dropped.
    Only jobs whose ``LastUpdateTime`` is earlier than the database's current
    date are counted.

    Raises:
        ValueError: if the connection's dialect is not mysql, postgresql or sqlite.
    """
    # ``conn.dialect`` is a Dialect object; its ``name`` attribute is the
    # string that can be compared against the backend names below.
    dialect = self.conn.dialect.name

    # Per-dialect statement that creates an empty copy of the summary table.
    create_staging_sql_by_dialect = {
        "mysql": (
            "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging "
            "LIKE JobsHistorySummary"
        ),
        "postgresql": (
            "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging "
            "(LIKE JobsHistorySummary INCLUDING ALL)"
        ),
        # SQLite has no "CREATE TABLE ... LIKE"; copy the column layout with an
        # empty SELECT instead. NOTE(review): this form does not carry over
        # the primary key constraint - confirm that is acceptable.
        "sqlite": (
            "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging "
            "AS SELECT * FROM JobsHistorySummary WHERE 0"
        ),
    }
    # Per-dialect expression that evaluates to today's date.
    current_date_expr_by_dialect = {
        "mysql": "UTC_DATE()",
        "postgresql": "CURRENT_DATE",
        "sqlite": "DATE('now')",
    }

    # Reject unknown backends before any statement is executed.
    if dialect not in create_staging_sql_by_dialect:
        raise ValueError(f"Unsupported DB dialect: {dialect}")
    current_date_expr = current_date_expr_by_dialect[dialect]

    # NOTE(review): table and column names are unquoted in the raw SQL below.
    # PostgreSQL folds unquoted identifiers to lower case - confirm this
    # matches how the tables were created.
    await self.conn.execute(text(create_staging_sql_by_dialect[dialect]))
    # Because of IF NOT EXISTS, a staging table left behind by an earlier
    # failed run would still contain rows; start from an empty table.
    await self.conn.execute(text("DELETE FROM JobsHistorySummary_staging"))

    # Columns for grouping
    def_columns = "Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus"
    agg_columns = "COUNT(JobID), SUM(RescheduleCounter)"

    # Final states list (values come from code constants, not from user input)
    final_states = JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES
    final_states_sql = ", ".join(f"'{state}'" for state in final_states)

    # Name the target columns explicitly so the insert does not depend on the
    # physical column order of the staging table.
    insert_sql = f"""
        INSERT INTO JobsHistorySummary_staging ({def_columns}, JobCount, RescheduleSum)
        SELECT {def_columns}, {agg_columns}
        FROM Jobs
        WHERE Status IN ({final_states_sql})
        AND LastUpdateTime < {current_date_expr}
        GROUP BY {def_columns}
    """  # noqa: S608
    await self.conn.execute(text(insert_sql))

    # Swap the freshly filled staging table in, then discard the previous one.
    swap_statements = (
        "ALTER TABLE JobsHistorySummary RENAME TO JobsHistorySummary_old;",
        "ALTER TABLE JobsHistorySummary_staging RENAME TO JobsHistorySummary;",
        "DROP TABLE JobsHistorySummary_old;",
    )
    for stmt in swap_statements:
        await self.conn.execute(text(stmt))