Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
:caption: StatesAccountingAgent options
"""
import datetime
import threading

from DIRAC import S_ERROR, S_OK
from DIRAC.AccountingSystem.Client.DataStoreClient import DataStoreClient
Expand Down Expand Up @@ -49,6 +50,8 @@ def initialize(self):
"""Standard initialization"""
# This agent will always loop every 15 minutes
self.am_setOption("PollingTime", 900)
# This agent will restart every 24 hours
self.am_setOption("MaxCycles", 96)

# Check whether to send to Monitoring or Accounting or both
self.jobMonitoringOption = Operations().getMonitoringBackends(monitoringType="WMSHistory")
Expand All @@ -65,6 +68,18 @@ def initialize(self):
if "Monitoring" in self.pilotMonitoringOption:
self.pilotReporter = MonitoringReporter(monitoringType="PilotsHistory", failoverQueueName=messageQueue)

# self.fill_time: now -1h
now = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
self.fill_time = now.strftime("%Y-%m-%d %H:%M:%S")

threadJobDB = threading.Thread(target=lambda: JobDB().fillJobsHistorySummary(self.fill_time))
threadJobDB.daemon = True
threadJobDB.start()

threadPilotDB = threading.Thread(target=lambda: PilotAgentsDB().fillPilotsHistorySummary(self.fill_time))
threadPilotDB.daemon = True
threadPilotDB.start()

self.__jobDBFields = []
for field in self.__summaryKeyFieldsMapping:
if field == "User":
Expand All @@ -85,16 +100,15 @@ def execute(self):
# PilotsHistory to Monitoring
if "Monitoring" in self.pilotMonitoringOption:
self.log.info("Committing PilotsHistory to Monitoring")
result = PilotAgentsDB().getSummarySnapshot()
result = PilotAgentsDB().getSummarySnapshot(fill_time=self.fill_time)
now = datetime.datetime.utcnow()
if not result["OK"]:
self.log.error(
"Can't get the PilotAgentsDB summary",
f"{result['Message']}: won't commit PilotsHistory at this cycle",
)

values = result["Value"][1]
for record in values:
for record in result["Value"]:
rD = {}
for iP, _ in enumerate(self.__pilotsMapping):
rD[self.__pilotsMapping[iP]] = record[iP]
Expand All @@ -109,16 +123,14 @@ def execute(self):

# WMSHistory to Monitoring or Accounting
self.log.info(f"Committing WMSHistory to {'and '.join(self.jobMonitoringOption)} backend")
result = JobDB().getSummarySnapshot(self.__jobDBFields)
result = JobDB().getSummarySnapshot(fill_time=self.fill_time, requestedFields=self.__jobDBFields)
now = datetime.datetime.utcnow()
if not result["OK"]:
self.log.error("Can't get the JobDB summary", f"{result['Message']}: won't commit WMSHistory at this cycle")
return S_ERROR()

values = result["Value"][1]

self.log.info("Start sending WMSHistory records")
for record in values:
for record in result["Value"]:
rD = {}
for fV in self.__summaryDefinedFields:
rD[fV[0]] = fV[1]
Expand Down
63 changes: 55 additions & 8 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, SErrorException, convertToReturnValue, returnValueOrRaise
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
Expand Down Expand Up @@ -1510,17 +1510,64 @@ def setJobCommandStatus(self, jobID, command, status):
return self._update(f"UPDATE JobCommands SET Status={status} WHERE JobID={jobID} AND Command={command}")

#####################################################################################
def getSummarySnapshot(self, requestedFields=False):
def fillJobsHistorySummary(self, fill_time: str):
    """Fill the JobsHistorySummary table with the summary of the jobs in a final state.

    Jobs whose Status is a final state and whose LastUpdateTime is strictly older
    than *fill_time* are aggregated into JobsHistorySummary. The aggregation goes
    through a staging table that is atomically swapped in, so concurrent readers
    of JobsHistorySummary never observe a partially-filled table.

    :param str fill_time: cut-off timestamp formatted "%Y-%m-%d %H:%M:%S"
                          (callers pass UTC "now - 1h" — see StatesAccountingAgent)
    :return: S_OK() / S_ERROR() from the last executed statement
    """
    # Create the staging table with the same schema as the live table
    createStagingTable_sql = "CREATE TABLE IF NOT EXISTS JobsHistorySummary_staging LIKE JobsHistorySummary"
    if not (result := self._update(createStagingTable_sql))["OK"]:
        return result

    # Empty the staging table: a previous run may have failed between the
    # INSERT and the swap, leaving stale rows behind ("IF NOT EXISTS" above
    # would then keep them and the fresh INSERT would double-count or collide
    # on the primary key).
    if not (result := self._update("TRUNCATE TABLE JobsHistorySummary_staging"))["OK"]:
        return result

    # Aggregate finished jobs older than fill_time into the staging table.
    # The grouping fields match the primary key of JobsHistorySummary.
    defString = "Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus"
    valuesString = "COUNT(JobID), SUM(RescheduleCounter)"
    final_states = "', '".join(JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)
    final_states = f"'{final_states}'"
    query = (
        f"INSERT INTO JobsHistorySummary_staging SELECT {defString}, {valuesString} "
        f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime < '{fill_time}' "
        f"GROUP BY {defString}"
    )
    if not (result := self._update(query))["OK"]:
        return result

    # Atomic swap: a multi-table RENAME TABLE is a single atomic operation in
    # MySQL, so readers switch from the old snapshot to the new one instantly.
    sql = (
        "RENAME TABLE JobsHistorySummary TO JobsHistorySummary_old,"
        "JobsHistorySummary_staging TO JobsHistorySummary;"
        "DROP TABLE JobsHistorySummary_old;"
    )
    return self._update(sql)

def getSummarySnapshot(self, fill_time: str, requestedFields=False):
    """Get the summary snapshot for a given combination"""
    # Grouping fields: caller-supplied, or the default WMSHistory set
    fields = requestedFields or ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
    groupBy = ", ".join(fields)

    # Quoted, comma-separated list of all final job states
    final_states = ", ".join(f"'{state}'" for state in JobStatus.JOB_FINAL_STATES + JobStatus.JOB_REALLY_FINAL_STATES)

    # Live counts: every job NOT yet in a final state
    liveCounts = (
        f"SELECT {groupBy}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum "
        f"FROM Jobs WHERE STATUS NOT IN ({final_states}) "
        f"GROUP BY {groupBy} "
    )
    # Recent counts: final-state jobs updated at or after fill_time
    # (older final-state jobs live in the JobsHistorySummary cache)
    recentCounts = (
        f"SELECT {groupBy}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum "
        f"FROM Jobs WHERE Status IN ({final_states}) AND LastUpdateTime >= '{fill_time}' "
        f"GROUP BY {groupBy} "
    )
    # Cached counts: pre-aggregated history of final-state jobs
    cachedCounts = f"SELECT {groupBy}, JobCount, RescheduleSum FROM JobsHistorySummary"

    # Merge the three sources and re-aggregate per grouping combination
    query = (
        f"SELECT {groupBy}, SUM(JobCount) AS JobCount, SUM(RescheduleSum) AS RescheduleSum FROM ("
        f"{liveCounts}UNION ALL {recentCounts}UNION ALL {cachedCounts}) AS combined GROUP BY {groupBy}"
    )

    return self._query(query)

def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines):
"""Remove HeartBeatLoggingInfo from DB.
Expand Down
16 changes: 16 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,19 @@ CREATE TABLE `JobCommands` (
PRIMARY KEY (`JobID`,`Arguments`,`ReceptionTime`),
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- ------------------------------------------------------------------------------
-- Pre-aggregated summary of jobs in a final state, periodically rebuilt by
-- JobDB.fillJobsHistorySummary() (via a staging table + atomic RENAME swap)
-- and read by JobDB.getSummarySnapshot(). The primary key mirrors the GROUP BY
-- fields of the fill query; JobCount and RescheduleSum hold COUNT(JobID) and
-- SUM(RescheduleCounter) for each combination.
DROP TABLE IF EXISTS `JobsHistorySummary`;
CREATE TABLE `JobsHistorySummary` (
`Status` VARCHAR(32),
`Site` VARCHAR(100),
`Owner` VARCHAR(32),
`OwnerGroup` VARCHAR(128),
`JobGroup` VARCHAR(32),
`JobType` VARCHAR(32),
`ApplicationStatus` VARCHAR(255),
`MinorStatus` VARCHAR(128),
`JobCount` INT,
`RescheduleSum` INT,
-- NOTE(review): PK columns are implicitly forced NOT NULL by MySQL; the fill
-- query must therefore never produce NULL grouping values — confirm Jobs
-- columns all have defaults.
PRIMARY KEY (Status, Site, Owner, OwnerGroup, JobGroup, JobType, ApplicationStatus, MinorStatus)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
61 changes: 54 additions & 7 deletions src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,17 +1047,64 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems):

return S_OK(resultDict)

def getSummarySnapshot(self, requestedFields=False):
def fillPilotsHistorySummary(self, fill_time: str):
    """Fill the PilotsHistorySummary table with the summary of the pilots in a final state.

    Pilots whose Status is a final state and whose LastUpdateTime is strictly
    older than *fill_time* are aggregated into PilotsHistorySummary. The
    aggregation goes through a staging table that is atomically swapped in, so
    concurrent readers never observe a partially-filled table.

    :param str fill_time: cut-off timestamp formatted "%Y-%m-%d %H:%M:%S"
                          (callers pass UTC "now - 1h" — see StatesAccountingAgent)
    :return: S_OK() / S_ERROR() from the last executed statement
    """
    # Create the staging table with the same schema as the live table
    createStagingTable_sql = "CREATE TABLE IF NOT EXISTS PilotsHistorySummary_staging LIKE PilotsHistorySummary"
    if not (result := self._update(createStagingTable_sql))["OK"]:
        return result

    # Empty the staging table: a previous run may have failed between the
    # INSERT and the swap, leaving stale rows that would double-count or
    # collide on the primary key during the fresh fill.
    if not (result := self._update("TRUNCATE TABLE PilotsHistorySummary_staging"))["OK"]:
        return result

    # Aggregate finished pilots older than fill_time into the staging table.
    # The grouping fields match the primary key of PilotsHistorySummary.
    defString = "GridSite, GridType, Status"
    valuesString = "COUNT(PilotID)"
    final_states = "', '".join(PilotStatus.PILOT_FINAL_STATES)
    final_states = f"'{final_states}'"

    query = (
        f"INSERT INTO PilotsHistorySummary_staging SELECT {defString}, {valuesString} "
        f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime < '{fill_time}' "
        f"GROUP BY {defString}"
    )
    if not (result := self._update(query))["OK"]:
        return result

    # Atomic swap: a multi-table RENAME TABLE is a single atomic operation in
    # MySQL, so readers switch from the old snapshot to the new one instantly.
    sql = (
        "RENAME TABLE PilotsHistorySummary TO PilotsHistorySummary_old,"
        "PilotsHistorySummary_staging TO PilotsHistorySummary;"
        "DROP TABLE PilotsHistorySummary_old;"
    )
    return self._update(sql)

def getSummarySnapshot(self, fill_time: str, requestedFields=False):
    """Get the summary snapshot for a given combination.

    Combines three sources: live counts of pilots not yet in a final state,
    final-state pilots updated at or after *fill_time*, and the pre-aggregated
    PilotsHistorySummary cache filled by :meth:`fillPilotsHistorySummary`.

    :param str fill_time: cut-off timestamp ("%Y-%m-%d %H:%M:%S"); must be the
                          same value used to fill PilotsHistorySummary, otherwise
                          pilots are double-counted or missed
    :param list requestedFields: grouping fields (default: GridSite, GridType, Status)
    :return: S_OK with rows of (requestedFields..., PilotCount) / S_ERROR
    """
    if not requestedFields:
        requestedFields = ["GridSite", "GridType", "Status"]
    defString = ", ".join(requestedFields)
    valueString = "COUNT(PilotID) AS PilotCount"
    final_states = "', '".join(PilotStatus.PILOT_FINAL_STATES)
    final_states = f"'{final_states}'"

    query = f"SELECT {defString}, SUM(PilotCount) AS PilotCount FROM ("
    # All pilots that are NOT in a final state
    query += (
        f"SELECT {defString}, {valueString} "
        f"FROM PilotAgents WHERE Status NOT IN ({final_states}) "
        f"GROUP BY {defString} "
    )
    query += "UNION ALL "
    # Recent pilots only (updated at or after fill_time) that are in a final state
    query += (
        f"SELECT {defString}, {valueString} "
        f"FROM PilotAgents WHERE Status IN ({final_states}) AND LastUpdateTime >= '{fill_time}' "
        f"GROUP BY {defString} "
    )
    query += "UNION ALL "
    # Cached history (of pilots in a final state).
    # Select explicit columns instead of "SELECT *": the cache table stores
    # (GridSite, GridType, Status, PilotCount), so "*" only matches the UNION's
    # column count for the default requestedFields and breaks for any other
    # field list; this also mirrors JobDB.getSummarySnapshot.
    query += f"SELECT {defString}, PilotCount FROM PilotsHistorySummary) AS combined GROUP BY {defString}"

    return self._query(query)


class PivotedPilotSummaryTable:
Expand Down
9 changes: 9 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,12 @@ CREATE TABLE `PilotOutput` (
`StdError` MEDIUMTEXT,
PRIMARY KEY (`PilotID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Pre-aggregated summary of pilots in a final state, periodically rebuilt by
-- PilotAgentsDB.fillPilotsHistorySummary() (via a staging table + atomic
-- RENAME swap) and read by PilotAgentsDB.getSummarySnapshot(). The primary key
-- mirrors the GROUP BY fields of the fill query; PilotCount holds
-- COUNT(PilotID) per combination.
DROP TABLE IF EXISTS `PilotsHistorySummary`;
CREATE TABLE `PilotsHistorySummary` (
`GridSite` VARCHAR(128),
`GridType` VARCHAR(32),
`Status` VARCHAR(32),
`PilotCount` INT,
PRIMARY KEY (`GridSite`,`GridType`,`Status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
import tempfile
import time

import DIRAC
import pytest

import DIRAC

DIRAC.initialize() # Initialize configuration

from DIRAC import gLogger
Expand Down Expand Up @@ -406,7 +407,6 @@ def test_JobStateUpdateAndJobMonitoringMultiple(lfn: str) -> None:
try:
res = jobMonitoringClient.getSites()
assert res["OK"], res["Message"]
assert set(res["Value"]) <= {"ANY", "DIRAC.Jenkins.ch", "Site"}

res = jobMonitoringClient.getJobTypes()
assert res["OK"], res["Message"]
Expand Down
89 changes: 89 additions & 0 deletions tests/Integration/WorkloadManagementSystem/Test_JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# pylint: disable=wrong-import-position, missing-docstring

import csv
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -443,3 +444,91 @@ def test_attributes(jobDB):
res = jobDB.getJobsAttributes([jobID_1, jobID_2], ["Status"])
assert res["OK"], res["Message"]
assert res["Value"] == {jobID_1: {"Status": JobStatus.DONE}, jobID_2: {"Status": JobStatus.RUNNING}}


def process_data(jobIDs, data):
    """Prepend each JobID to its CSV row and coerce field types for insertion.

    After the JobID is prepended, positions 8-13 are date strings that become
    datetime objects (empty or unparsable values become None), and positions
    17-18 are integer strings that become ints (empty or invalid values become
    0). Returns a list of tuples matching the Jobs INSERT column order.
    """
    DATE_POSITIONS = (8, 9, 10, 11, 12, 13)
    INT_POSITIONS = (17, 18)

    def _to_date(value):
        # Empty or malformed dates map to None (SQL NULL)
        if value:
            try:
                return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                pass
        return None

    def _to_int(value):
        # Empty or malformed integers map to 0
        if value:
            try:
                return int(value)
            except ValueError:
                pass
        return 0

    converted_data = []
    for jobID, fields in zip(jobIDs, data):
        row = [jobID, *fields]
        for pos in DATE_POSITIONS:
            row[pos] = _to_date(row[pos])
        for pos in INT_POSITIONS:
            row[pos] = _to_int(row[pos])
        converted_data.append(tuple(row))
    return converted_data


def test_summarySnapshot():
    """Integration test: fillJobsHistorySummary + getSummarySnapshot must agree
    with a direct aggregation over the whole Jobs table.

    NOTE(review): relies on a "jobs.csv" fixture in the working directory and on
    a module-level ``jdl`` JDL string (defined elsewhere in this module) —
    confirm both are present before running.
    """
    jobDB = JobDB()
    # Wipe all job-related tables (children first, to satisfy foreign keys)
    # so the snapshot reflects only the fixture data
    for table in [
        "InputData",
        "JobParameters",
        "AtticJobParameters",
        "HeartBeatLoggingInfo",
        "OptimizerParameters",
        "JobCommands",
        "Jobs",
        "JobJDLs",
    ]:
        sqlCmd = f"DELETE from `{table}`"
        jobDB._update(sqlCmd)

    # insert some predefined jobs to test the summary snapshot
    with open("jobs.csv", newline="", encoding="utf-8") as csvfile:
        csvreader = csv.reader(csvfile)
        data = list(csvreader)

    # First inserting the JDLs (one identical JDL per CSV row)
    jdlData = [(jdl, "", "")] * len(data)
    res = jobDB._updatemany("INSERT INTO JobJDLs (JDL, JobRequirements, OriginalJDL) VALUES (%s,%s,%s)", jdlData)
    assert res["OK"], res["Message"]
    # Getting which JobIDs were inserted (auto-generated keys)
    res = jobDB._query("SELECT JobID FROM JobJDLs")
    assert res["OK"], res["Message"]
    jobIDs = [row[0] for row in res["Value"]][0 : len(data)]

    # Now inserting the jobs
    processed_data = process_data(jobIDs, data)
    placeholders = ",".join(["%s"] * len(processed_data[0]))
    sql = f"INSERT INTO Jobs (JobID, JobType, JobGroup, Site, JobName, Owner, OwnerGroup, VO, SubmissionTime, RescheduleTime, LastUpdateTime, StartExecTime, HeartBeatTime, EndExecTime, Status, MinorStatus, ApplicationStatus, UserPriority, RescheduleCounter, VerifiedFlag, AccountedFlag) VALUES ({placeholders})"
    res = jobDB._updatemany(sql, processed_data)
    assert res["OK"], res["Message"]
    # Act: cache final-state jobs older than one hour, then take the snapshot
    now = datetime.utcnow() - timedelta(hours=1)
    fill_time = now.strftime("%Y-%m-%d %H:%M:%S")
    res = jobDB.fillJobsHistorySummary(fill_time)
    assert res["OK"], res["Message"]
    res = jobDB.getSummarySnapshot(fill_time)
    assert res["OK"], res["Message"]
    # The snapshot (cache UNION live rows) must equal a plain full-table
    # aggregation; sorted() makes the comparison order-independent
    requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
    defString = ", ".join(requestedFields)
    simple_query = f"SELECT {defString}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum FROM Jobs GROUP BY {defString}"
    res_sq = jobDB._query(simple_query)
    assert res_sq["OK"], res_sq["Message"]
    assert sorted(res_sq["Value"]) == sorted(res["Value"])
Loading
Loading