diff --git a/diracx-db/src/diracx/db/sql/job/db.py b/diracx-db/src/diracx/db/sql/job/db.py index 89f2bb49d..88c033d8f 100644 --- a/diracx-db/src/diracx/db/sql/job/db.py +++ b/diracx-db/src/diracx/db/sql/job/db.py @@ -5,11 +5,13 @@ from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Iterable -from sqlalchemy import bindparam, case, delete, func, insert, select, update +from sqlalchemy import bindparam, case, delete, func, insert, select, text, update if TYPE_CHECKING: from sqlalchemy.sql.elements import BindParameter +from DIRAC.WorkloadManagementSystem.Client import JobStatus + from diracx.core.exceptions import InvalidQueryError from diracx.core.models import JobCommand, SearchSpec, SortSpec @@ -333,3 +335,60 @@ async def get_job_commands(self, job_ids: Iterable[int]) -> list[JobCommand]: JobCommand(job_id=cmd.JobID, command=cmd.Command, arguments=cmd.Arguments) for cmd in commands ] + + async def fill_jobs_history_summary(self): + """Fill the JobsHistorySummary table with the summary of the jobs in a final state.""" + # Create the staging table + dialect = self.conn.dialect + if dialect == "mysql": + create_staging_table_sql = "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging LIKE JobsHistorySummary" + elif dialect == "postgresql": + create_staging_table_sql = ( + "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging " + "(LIKE JobsHistorySummary INCLUDING ALL)" + ) + elif dialect == "sqlite": + create_staging_table_sql = "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging AS JobsHistorySummary" + await self.conn.execute(text(create_staging_table_sql)) + + if dialect == "mysql": + current_date_expr = "UTC_DATE()" + elif dialect == "postgresql": + current_date_expr = "CURRENT_DATE" + elif dialect == "sqlite": + current_date_expr = "DATE('now')" + else: + raise ValueError(f"Unsupported DB dialect: {dialect}") + + # Columns for grouping + def_columns = "Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus" + agg_columns = "COUNT(JobID), SUM(RescheduleCounter)" + + # Final states list + final_states = JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES + final_states_sql = ", ".join(f"'{state}'" for state in final_states) + + # Build SQL statement + insert_sql = f""" + INSERT INTO JobsHistorySummary_staging + SELECT {def_columns}, {agg_columns} + FROM Jobs + WHERE Status IN ({final_states_sql}) + AND LastUpdateTime < {current_date_expr} + GROUP BY {def_columns} + """ # noqa: S608 + await self.conn.execute(text(insert_sql)) + + stmts = [] + + if dialect in {"mysql", "sqlite", "postgresql"}: + stmts = [ + "ALTER TABLE JobsHistorySummary RENAME TO JobsHistorySummary_old;", + "ALTER TABLE JobsHistorySummary_staging RENAME TO JobsHistorySummary;", + "DROP TABLE JobsHistorySummary_old;", + ] + else: + raise ValueError(f"Unsupported DB dialect: {dialect}") + + for stmt in stmts: + await self.conn.execute(text(stmt)) diff --git a/diracx-db/src/diracx/db/sql/job/schema.py b/diracx-db/src/diracx/db/sql/job/schema.py index bb9f60bf1..b33f2e42f 100644 --- a/diracx-db/src/diracx/db/sql/job/schema.py +++ b/diracx-db/src/diracx/db/sql/job/schema.py @@ -6,6 +6,7 @@ ForeignKey, Index, Integer, + PrimaryKeyConstraint, String, Text, ) @@ -156,3 +157,29 @@ class JobCommands(JobDBBase): status = Column("Status", String(64), default="Received") reception_time = Column("ReceptionTime", DateTime, primary_key=True) execution_time = NullColumn("ExecutionTime", DateTime) + + +class JobsHistorySummary(JobDBBase): + __tablename__ = "JobsHistorySummary" + status = Column("Status", String(32)) + site = Column("Site", String(100)) + owner = Column("Owner", String(32)) + owner_group = Column("OwnerGroup", String(128)) + job_group = Column("JobGroup", String(32)) + job_type = Column("JobType", String(32)) + application_status = Column("ApplicationStatus", String(255)) + minor_status = Column("MinorStatus", String(128)) + job_count = Column("JobCount", Integer) + reschedule_sum = Column("RescheduleSum", Integer) + __table_args__ = ( + PrimaryKeyConstraint( + "Status", + "Site", + "Owner", + "OwnerGroup", + "JobGroup", + "JobType", + "ApplicationStatus", + "MinorStatus", + ), + ) diff --git a/diracx-db/src/diracx/db/sql/pilot_agents/schema.py b/diracx-db/src/diracx/db/sql/pilot_agents/schema.py index bff7c460c..04b2695f0 100644 --- a/diracx-db/src/diracx/db/sql/pilot_agents/schema.py +++ b/diracx-db/src/diracx/db/sql/pilot_agents/schema.py @@ -58,3 +58,12 @@ class PilotOutput(PilotAgentsDBBase): pilot_id = Column("PilotID", Integer, primary_key=True) std_output = Column("StdOutput", Text) std_error = Column("StdError", Text) + + +class PilotsHistorySummary(PilotAgentsDBBase): + __tablename__ = "PilotsHistorySummary" + + grid_site = Column("GridSite", String(128), primary_key=True) + grid_type = Column("GridType", String(32), primary_key=True) + status = Column("Status", String(32), primary_key=True) + pilot_count = Column("PilotCount", Integer)