Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion src/DIRAC/MonitoringSystem/Service/WebAppHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.JEncode import strToIntDict
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.Request import Request
Expand Down Expand Up @@ -334,6 +333,74 @@ def export_getSiteSummarySelectors(cls):

return S_OK(resultDict)

types_getApplicationStates = []

@classmethod
def export_getApplicationStates(cls, condDict=None, older=None, newer=None):
"""Return Distinct Values of ApplicationStatus job Attribute in WMS"""
return cls.jobDB.getDistinctJobAttributes("ApplicationStatus", condDict, older, newer)

types_getJobTypes = []

@classmethod
def export_getJobTypes(cls, condDict=None, older=None, newer=None):
"""Return Distinct Values of JobType job Attribute in WMS"""
return cls.jobDB.getDistinctJobAttributes("JobType", condDict, older, newer)

types_getOwners = []

@classmethod
def export_getOwners(cls, condDict=None, older=None, newer=None):
"""
Return Distinct Values of Owner job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("Owner", condDict, older, newer)

types_getOwnerGroup = []

@classmethod
def export_getOwnerGroup(cls):
"""
Return Distinct Values of OwnerGroup from the JobDB
"""
return cls.jobDB.getDistinctJobAttributes("OwnerGroup")

types_getJobGroups = []

@classmethod
def export_getJobGroups(cls, condDict=None, older=None, cutDate=None):
"""
Return Distinct Values of ProductionId job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("JobGroup", condDict, older, newer=cutDate)

types_getSites = []

@classmethod
def export_getSites(cls, condDict=None, older=None, newer=None):
"""
Return Distinct Values of Site job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("Site", condDict, older, newer)

types_getStates = []

@classmethod
def export_getStates(cls, condDict=None, older=None, newer=None):
"""
Return Distinct Values of Status job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("Status", condDict, older, newer)

types_getMinorStates = []

@classmethod
def export_getMinorStates(cls, condDict=None, older=None, newer=None):
"""
Return Distinct Values of Minor Status job Attribute in WMS
"""
return cls.jobDB.getDistinctJobAttributes("MinorStatus", condDict, older, newer)

##############################################################################
# Transformations
##############################################################################
Expand Down
23 changes: 10 additions & 13 deletions src/DIRAC/TransformationSystem/Agent/TaskManagerAgentBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
In case you want to further extend it you are required to follow the note on the
initialize method and on the _getClients method.
"""
import time
import datetime
import concurrent.futures
import datetime
import time

from DIRAC import S_OK

from DIRAC import S_OK, gConfig
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
from DIRAC.Core.Utilities.List import breakListIntoChunks
from DIRAC.Core.Utilities.Dictionaries import breakDictionaryIntoChunks
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.List import breakListIntoChunks
from DIRAC.TransformationSystem.Agent.TransformationAgentsUtilities import TransformationAgentsUtilities
from DIRAC.TransformationSystem.Client.FileReport import FileReport
from DIRAC.TransformationSystem.Client.WorkflowTasks import WorkflowTasks
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
from DIRAC.TransformationSystem.Agent.TransformationAgentsUtilities import TransformationAgentsUtilities
from DIRAC.TransformationSystem.Client.WorkflowTasks import WorkflowTasks
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient

Expand Down Expand Up @@ -193,11 +192,9 @@ def execute(self):
else:
# Get the transformations which should be submitted
self.tasksPerLoop = self.am_getOption("TasksPerLoop", self.tasksPerLoop)
res = self.jobManagerClient.getMaxParametricJobs()
if not res["OK"]:
self.log.warn("Could not get the maxParametricJobs from JobManager", res["Message"])
else:
self.maxParametricJobs = res["Value"]
self.maxParametricJobs = gConfig.getValue(
"/Systems/WorkloadManagement/Services/JobManager/MaxParametricJobs", self.maxParametricJobs
)

self._addOperationForTransformations(
self.operationsOnTransformationDict,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

# # from DIRAC
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.DErrno import cmpError
from DIRAC.Core.Utilities.List import breakListIntoChunks
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
Expand All @@ -36,6 +34,7 @@
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB

# # agent's name
AGENT_NAME = "Transformation/TransformationCleaningAgent"
Expand Down Expand Up @@ -65,6 +64,8 @@ def __init__(self, *args, **kwargs):
self.reqClient = None
# # file catalog client
self.metadataClient = None
# # JobDB
self.jobDB = None

# # transformations types
self.transformationTypes = None
Expand Down Expand Up @@ -127,6 +128,8 @@ def initialize(self):
self.metadataClient = FileCatalogClient()
# # job monitoring client
self.jobMonitoringClient = JobMonitoringClient()
# # job DB
self.jobDB = JobDB()

return S_OK()

Expand Down Expand Up @@ -224,7 +227,7 @@ def finalize(self):
So, we should just clean from time to time.
What I added here is done only when the agent finalize, and it's quite light-ish operation anyway.
"""
res = self.jobMonitoringClient.getJobGroups(None, datetime.utcnow() - timedelta(days=365))
res = self.jobDB.getDistinctJobAttributes("JobGroup", None, datetime.utcnow() - timedelta(days=365))
if not res["OK"]:
self.log.error("Failed to get job groups", res["Message"])
return res
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ def getJobParameters(self, jobIDs, parName=None):
res["Value"] = strToIntDict(res["Value"])
return res

@ignoreEncodeWarning
def getJobsParameters(self, jobIDs, parameters):
res = self._getRPC().getJobsParameters(jobIDs, parameters)

# Cast the str keys to int
if res["OK"]:
res["Value"] = strToIntDict(res["Value"])
return res

@ignoreEncodeWarning
def getJobsMinorStatus(self, jobIDs):
res = self._getRPC().getJobsMinorStatus(jobIDs)
Expand Down
58 changes: 0 additions & 58 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,64 +1190,6 @@ def rescheduleJob(self, jobID):

return retVal

#############################################################################
def getSiteSummary(self):
"""Get the summary of jobs in a given status on all the sites"""

waitingList = ['"Submitted"', '"Assigned"', '"Waiting"', '"Matched"']
waitingString = ",".join(waitingList)

result = self.getDistinctJobAttributes("Site")
if not result["OK"]:
return result

siteList = result["Value"]
siteDict = {}
totalDict = {
JobStatus.WAITING: 0,
JobStatus.RUNNING: 0,
JobStatus.STALLED: 0,
JobStatus.DONE: 0,
JobStatus.FAILED: 0,
}

for site in siteList:
if site == "ANY":
continue
# Waiting
siteDict[site] = {}
ret = self._escapeString(site)
if not ret["OK"]:
return ret
e_site = ret["Value"]

req = f"SELECT COUNT(JobID) FROM Jobs WHERE Status IN ({waitingString}) AND Site={e_site}"
result = self._query(req)
if result["OK"]:
count = result["Value"][0][0]
else:
return S_ERROR("Failed to get Site data from the JobDB")
siteDict[site][JobStatus.WAITING] = count
totalDict[JobStatus.WAITING] += count
# Running,Stalled,Done,Failed
for status in [
f'"{JobStatus.RUNNING}"',
f'"{JobStatus.STALLED}"',
f'"{JobStatus.DONE}"',
f'"{JobStatus.FAILED}"',
]:
req = f"SELECT COUNT(JobID) FROM Jobs WHERE Status={status} AND Site={e_site}"
result = self._query(req)
if result["OK"]:
count = result["Value"][0][0]
else:
return S_ERROR("Failed to get Site data from the JobDB")
siteDict[site][status.replace('"', "")] = count
totalDict[status.replace('"', "")] += count

siteDict["Total"] = totalDict
return S_OK(siteDict)

#################################################################################
def getSiteSummaryWeb(self, selectDict, sortList, startItem, maxItems):
"""Get the summary of jobs in a given status on all the sites in the standard Web form"""
Expand Down
10 changes: 0 additions & 10 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,6 @@ def __sendJobsToOptimizationMind(self, jids):
return
self.log.info("Optimize msg sent", f"for {len(jids)} jobs")

###########################################################################
types_getMaxParametricJobs = []

def export_getMaxParametricJobs(self):
"""Get the maximum number of parametric jobs

:return: S_OK()/S_ERROR()
"""
return S_OK(self.maxParametricJobs)

types_submitJob = [str]

def export_submitJob(self, jobDesc):
Expand Down
Loading
Loading