diff --git a/src/DIRAC/MonitoringSystem/Client/WebAppClient.py b/src/DIRAC/MonitoringSystem/Client/WebAppClient.py new file mode 100644 index 00000000000..d46a7fc44a7 --- /dev/null +++ b/src/DIRAC/MonitoringSystem/Client/WebAppClient.py @@ -0,0 +1,26 @@ +""" Module that contains client access to the WebApp handler. +""" + +from DIRAC.Core.Base.Client import Client, createClient + + +@createClient("Monitoring/WebApp") +class WebAppClient(Client): + """WebAppClient sets url for the WebAppHandler.""" + + def __init__(self, url=None, **kwargs): + """ + Sets URL for WebApp handler + + :param self: self reference + :param url: url of the WebAppHandler + :param kwargs: forwarded to the Base Client class + """ + + super().__init__(**kwargs) + + if not url: + self.serverURL = "Monitoring/WebApp" + + else: + self.serverURL = url diff --git a/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg b/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg index 46b568d2bb6..dd839f7ef1f 100644 --- a/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg +++ b/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg @@ -28,4 +28,13 @@ Services } } ##END + ##BEGIN WebApp + WebApp + { + Port = 9199 + Authorization + { + Default = authenticated + } + } } diff --git a/src/DIRAC/MonitoringSystem/Service/WebAppHandler.py b/src/DIRAC/MonitoringSystem/Service/WebAppHandler.py new file mode 100644 index 00000000000..edaee4d4023 --- /dev/null +++ b/src/DIRAC/MonitoringSystem/Service/WebAppHandler.py @@ -0,0 +1,533 @@ +""" +The WebAppHandler module provides a class to handle web requests from the DIRAC WebApp. +It is not indented to be used in diracx +""" +from DIRAC import S_ERROR, S_OK +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites +from DIRAC.Core.DISET.RequestHandler import RequestHandler +from DIRAC.Core.Utilities.JEncode import strToIntDict +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.RequestManagementSystem.Client.Operation import Operation +from DIRAC.RequestManagementSystem.Client.Request import Request +from DIRAC.TransformationSystem.Client import TransformationFilesStatus +from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, JobPolicy + +TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted( + set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES) +) +FILES_STATE_NAMES = ["PercentProcessed", "Total"] + TransformationFilesStatus.TRANSFORMATION_FILES_STATES + + +class WebAppHandler(RequestHandler): + @classmethod + def initializeHandler(cls, serviceInfoDict): + """Initialization of DB objects""" + + try: + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") + if not result["OK"]: + return result + cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") + if not result["OK"]: + return result + cls.jobDB = result["Value"](parentLogger=cls.log) + + result = ObjectLoader().loadObject("TransformationSystem.DB.TransformationDB", "TransformationDB") + if not result["OK"]: + return result + cls.transformationDB = result["Value"](parentLogger=cls.log) + + except RuntimeError as excp: + return S_ERROR(f"Can't connect to DB: {excp}") + + return S_OK() + + ############################################################################## + # PilotAgents + ############################################################################## + + types_getPilotMonitorWeb = [dict, list, int, int] + + @classmethod + def export_getPilotMonitorWeb(cls, selectDict, sortList, startItem, maxItems): + """Get the summary of the pilot information for a given page in the + pilot monitor in a generic format + """ + + return cls.pilotAgentsDB.getPilotMonitorWeb(selectDict, sortList, startItem, maxItems) + + types_getPilotMonitorSelectors = [] + + @classmethod + def export_getPilotMonitorSelectors(cls): + """Get all the distinct selector values for the Pilot Monitor web portal page""" + + return cls.pilotAgentsDB.getPilotMonitorSelectors() + + types_getPilotSummaryWeb = [dict, list, int, int] + + @classmethod + def export_getPilotSummaryWeb(cls, selectDict, sortList, startItem, maxItems): + """Get the summary of the pilot information for a given page in the + pilot monitor in a generic format + """ + + return cls.pilotAgentsDB.getPilotSummaryWeb(selectDict, sortList, startItem, maxItems) + + types_getPilotStatistics = [str, dict] + + @classmethod + def export_getPilotStatistics(cls, attribute, selectDict): + """Get pilot statistics distribution per attribute value with a given selection""" + + startDate = selectDict.get("FromDate", None) + if startDate: + del selectDict["FromDate"] + + if startDate is None: + startDate = selectDict.get("LastUpdate", None) + if startDate: + del selectDict["LastUpdate"] + endDate = selectDict.get("ToDate", None) + if endDate: + del selectDict["ToDate"] + + result = cls.pilotAgentsDB.getCounters( + "PilotAgents", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime" + ) + statistics = {} + if result["OK"]: + for status, count in result["Value"]: + statistics[status[attribute]] = count + + return S_OK(statistics) + + types_getPilotsCounters = [str, list, dict] + + # This was PilotManagerHandler.getCounters + @classmethod + def export_getPilotsCounters(cls, table, keys, condDict, newer=None, timeStamp="SubmissionTime"): + """Set the pilot agent status""" + + return cls.pilotAgentsDB.getCounters(table, keys, condDict, newer=newer, timeStamp=timeStamp) + + ############################################################################## + # Jobs + ############################################################################## + + types_getJobPageSummaryWeb = [dict, list, int, int] + + def export_getJobPageSummaryWeb(self, selectDict, sortList, startItem, maxItems, selectJobs=True): + """Get the summary of the job information for a given page in the + job monitor in a generic format + """ + + resultDict = {} + + startDate, endDate, selectDict = self.parseSelectors(selectDict) + + # initialize jobPolicy + credDict = self.getRemoteCredentials() + owner = credDict["username"] + ownerGroup = credDict["group"] + operations = Operations(group=ownerGroup) + globalJobsInfo = operations.getValue("/Services/JobMonitoring/GlobalJobsInfo", True) + jobPolicy = JobPolicy(owner, ownerGroup, globalJobsInfo) + jobPolicy.jobDB = self.jobDB + result = jobPolicy.getControlledUsers(RIGHT_GET_INFO) + if not result["OK"]: + return result + if not result["Value"]: + return S_ERROR(f"User and group combination has no job rights ({owner!r}, {ownerGroup!r})") + if result["Value"] != "ALL": + selectDict[("Owner", "OwnerGroup")] = result["Value"] + + # Sorting instructions. Only one for the moment. + if sortList: + orderAttribute = sortList[0][0] + ":" + sortList[0][1] + else: + orderAttribute = None + + result = self.jobDB.getCounters( + "Jobs", ["Status"], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime" + ) + if not result["OK"]: + return result + + statusDict = {} + nJobs = 0 + for stDict, count in result["Value"]: + nJobs += count + statusDict[stDict["Status"]] = count + + resultDict["TotalRecords"] = nJobs + if nJobs == 0: + return S_OK(resultDict) + + resultDict["Extras"] = statusDict + + if selectJobs: + iniJob = startItem + if iniJob >= nJobs: + return S_ERROR("Item number out of range") + + result = self.jobDB.selectJobs( + selectDict, orderAttribute=orderAttribute, newer=startDate, older=endDate, limit=(maxItems, iniJob) + ) + if not result["OK"]: + return result + + summaryJobList = result["Value"] + if not globalJobsInfo: + validJobs, _invalidJobs, _nonauthJobs, _ownJobs = jobPolicy.evaluateJobRights( + summaryJobList, RIGHT_GET_INFO + ) + summaryJobList = validJobs + + res = self.jobDB.getJobsAttributes(summaryJobList) + if not res["OK"]: + return res + return S_OK(strToIntDict(res["Value"])) + + summaryDict = result["Value"] + # If no jobs can be selected after the properties check + if not summaryDict: + return S_OK(resultDict) + + # Evaluate last sign of life time + for jobDict in summaryDict.values(): + if not jobDict.get("HeartBeatTime") or jobDict["HeartBeatTime"] == "None": + jobDict["LastSignOfLife"] = jobDict["LastUpdateTime"] + else: + jobDict["LastSignOfLife"] = jobDict["HeartBeatTime"] + + # prepare the standard structure now + # This should be faster than making a list of values() + for jobDict in summaryDict.values(): + paramNames = list(jobDict) + break + records = [list(jobDict.values()) for jobDict in summaryDict.values()] + + resultDict["ParameterNames"] = paramNames + resultDict["Records"] = records + + return S_OK(resultDict) + + types_getJobStats = [str, dict] + + @classmethod + def export_getJobStats(cls, attribute, selectDict): + """Get job statistics distribution per attribute value with a given selection""" + startDate, endDate, selectDict = cls.parseSelectors(selectDict) + result = cls.jobDB.getCounters( + "Jobs", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime" + ) + if not result["OK"]: + return result + resultDict = {} + for cDict, count in result["Value"]: + resultDict[cDict[attribute]] = count + + return S_OK(resultDict) + + @classmethod + def parseSelectors(cls, selectDict=None): + """Parse selectors before DB query + + :param dict selectDict: selectors + + :return: str, str, dict -- start/end date, selectors + """ + selectDict = selectDict or {} + + # Get time period + startDate = selectDict.get("FromDate", None) + if startDate: + del selectDict["FromDate"] + # For backward compatibility + if startDate is None: + startDate = selectDict.get("LastUpdate", None) + if startDate: + del selectDict["LastUpdate"] + endDate = selectDict.get("ToDate", None) + if endDate: + del selectDict["ToDate"] + + # Provide JobID bound to a specific PilotJobReference + # There is no reason to have both PilotJobReference and JobID in selectDict + # If that occurs, use the JobID instead of the PilotJobReference + pilotJobRefs = selectDict.get("PilotJobReference") + if pilotJobRefs: + del selectDict["PilotJobReference"] + if not selectDict.get("JobID"): + for pilotJobRef in [pilotJobRefs] if isinstance(pilotJobRefs, str) else pilotJobRefs: + res = cls.pilotAgentsDB.getPilotInfo(pilotJobRef) + if res["OK"] and "Jobs" in res["Value"][pilotJobRef]: + selectDict["JobID"] = selectDict.get("JobID", []) + selectDict["JobID"].extend(res["Value"][pilotJobRef]["Jobs"]) + + return startDate, endDate, selectDict + + types_getJobsCounters = [list] + + # This was JobManagerHanlder.getCounters + @classmethod + def export_getJobsCounters(cls, attrList, attrDict=None, cutDate=""): + """ + Retrieve list of distinct attributes values from attrList + with attrDict as condition. + For each set of distinct values, count number of occurences. + Return a list. Each item is a list with 2 items, the list of distinct + attribute values and the counter + """ + + _, _, attrDict = cls.parseSelectors(attrDict) + return cls.jobDB.getCounters("Jobs", attrList, attrDict, newer=str(cutDate), timeStamp="LastUpdateTime") + + types_getSiteSummaryWeb = [dict, list, int, int] + + @classmethod + def export_getSiteSummaryWeb(cls, selectDict, sortList, startItem, maxItems): + """Get the summary of the jobs running on sites in a generic format + + :param dict selectDict: selectors + :param list sortList: sorting list + :param int startItem: start item number + :param int maxItems: maximum of items + + :return: S_OK(dict)/S_ERROR() + """ + return cls.jobDB.getSiteSummaryWeb(selectDict, sortList, startItem, maxItems) + + types_getSiteSummarySelectors = [] + + @classmethod + def export_getSiteSummarySelectors(cls): + """Get all the distinct selector values for the site summary web portal page + + :return: S_OK(dict)/S_ERROR() + """ + resultDict = {} + statusList = ["Good", "Fair", "Poor", "Bad", "Idle"] + resultDict["Status"] = statusList + maskStatus = ["Active", "Banned", "NoMask", "Reduced"] + resultDict["MaskStatus"] = maskStatus + + res = getSites() + if not res["OK"]: + return res + siteList = res["Value"] + + countryList = [] + for site in siteList: + if site.find(".") != -1: + country = site.split(".")[2].lower() + if country not in countryList: + countryList.append(country) + countryList.sort() + resultDict["Country"] = countryList + siteList.sort() + resultDict["Site"] = siteList + + return S_OK(resultDict) + + ############################################################################## + # Transformations + ############################################################################## + + types_getDistinctAttributeValues = [str, dict] + + @classmethod + def export_getDistinctAttributeValues(cls, attribute, selectDict): + res = cls.transformationDB.getTableDistinctAttributeValues("Transformations", [attribute], selectDict) + if not res["OK"]: + return res + return S_OK(res["Value"][attribute]) + + types_getTransformationFilesSummaryWeb = [dict, list, int, int] + + @classmethod + def export_getTransformationFilesSummaryWeb(cls, selectDict, sortList, startItem, maxItems): + selectColumns = (["TransformationID", "Status", "UsedSE", "TargetSE"],) + timeStamp = ("LastUpdate",) + statusColumn = ("Status",) + fromDate = selectDict.get("FromDate", None) + if fromDate: + del selectDict["FromDate"] + # if not fromDate: + # fromDate = last_update + toDate = selectDict.get("ToDate", None) + if toDate: + del selectDict["ToDate"] + # Sorting instructions. Only one for the moment. + if sortList: + orderAttribute = sortList[0][0] + ":" + sortList[0][1] + else: + orderAttribute = None + # Get the columns that match the selection + fcn = None + fcnName = "getTransformationFiles" + if hasattr(cls.transformationDB, fcnName) and callable(getattr(cls.transformationDB, fcnName)): + fcn = getattr(cls.transformationDB, fcnName) + if not fcn: + return S_ERROR(f"Unable to invoke gTransformationDB.{fcnName}, it isn't a member function") + res = fcn(condDict=selectDict, older=toDate, newer=fromDate, timeStamp=timeStamp, orderAttribute=orderAttribute) + if not res["OK"]: + return res + + # The full list of columns in contained here + allRows = res["Value"] + # Prepare the standard structure now within the resultDict dictionary + resultDict = {} + # Create the total records entry + resultDict["TotalRecords"] = len(allRows) + + # Get the rows which are within the selected window + if resultDict["TotalRecords"] == 0: + return S_OK(resultDict) + ini = startItem + last = ini + maxItems + if ini >= resultDict["TotalRecords"]: + return S_ERROR("Item number out of range") + if last > resultDict["TotalRecords"]: + last = resultDict["TotalRecords"] + + selectedRows = allRows[ini:last] + resultDict["Records"] = [] + for row in selectedRows: + resultDict["Records"].append(list(row.values())) + + # Create the ParameterNames entry + resultDict["ParameterNames"] = list(selectedRows[0].keys()) + # Find which element in the tuple contains the requested status + if statusColumn not in resultDict["ParameterNames"]: + return S_ERROR("Provided status column not present") + + # Generate the status dictionary + statusDict = {} + for row in selectedRows: + status = row[statusColumn] + statusDict[status] = statusDict.setdefault(status, 0) + 1 + resultDict["Extras"] = statusDict + + # Obtain the distinct values of the selection parameters + res = cls.transformationDB.getTableDistinctAttributeValues( + "TransformationFiles", selectColumns, selectDict, older=toDate, newer=fromDate + ) + distinctSelections = zip(selectColumns, []) + if res["OK"]: + distinctSelections = res["Value"] + resultDict["Selections"] = distinctSelections + + return S_OK(resultDict) + + types_getTransformationSummaryWeb = [dict, list, int, int] + + @classmethod + def export_getTransformationSummaryWeb(cls, selectDict, sortList, startItem, maxItems): + """Get the summary of the transformation information for a given page in the generic format""" + + # Obtain the timing information from the selectDict + last_update = selectDict.get("CreationDate", None) + if last_update: + del selectDict["CreationDate"] + fromDate = selectDict.get("FromDate", None) + if fromDate: + del selectDict["FromDate"] + if not fromDate: + fromDate = last_update + toDate = selectDict.get("ToDate", None) + if toDate: + del selectDict["ToDate"] + # Sorting instructions. Only one for the moment. + if sortList: + orderAttribute = [] + for i in sortList: + orderAttribute += [i[0] + ":" + i[1]] + else: + orderAttribute = None + + # Get the transformations that match the selection + res = cls.transformationDB.getTransformations( + condDict=selectDict, older=toDate, newer=fromDate, orderAttribute=orderAttribute + ) + if not res["OK"]: + return res + + ops = Operations() + # Prepare the standard structure now within the resultDict dictionary + resultDict = {} + trList = res["Records"] + # Create the total records entry + nTrans = len(trList) + resultDict["TotalRecords"] = nTrans + # Create the ParameterNames entry + # As this list is a reference to the list in the DB, we cannot extend it, therefore copy it + resultDict["ParameterNames"] = list(res["ParameterNames"]) + # Add the job states to the ParameterNames entry + taskStateNames = TASKS_STATE_NAMES + ops.getValue("Transformations/AdditionalTaskStates", []) + resultDict["ParameterNames"] += ["Jobs_" + x for x in taskStateNames] + # Add the file states to the ParameterNames entry + fileStateNames = FILES_STATE_NAMES + ops.getValue("Transformations/AdditionalFileStates", []) + resultDict["ParameterNames"] += ["Files_" + x for x in fileStateNames] + + # Get the transformations which are within the selected window + if nTrans == 0: + return S_OK(resultDict) + ini = startItem + last = ini + maxItems + if ini >= nTrans: + return S_ERROR("Item number out of range") + if last > nTrans: + last = nTrans + transList = trList[ini:last] + + statusDict = {} + extendableTranfs = ops.getValue("Transformations/ExtendableTransfTypes", ["Simulation", "MCsimulation"]) + givenUpFileStatus = ops.getValue("Transformations/GivenUpFileStatus", ["MissingInFC"]) + problematicStatuses = ops.getValue("Transformations/ProblematicStatuses", ["Problematic"]) + # Add specific information for each selected transformation + for trans in transList: + transDict = dict(zip(resultDict["ParameterNames"], trans)) + + # Update the status counters + status = transDict["Status"] + statusDict[status] = statusDict.setdefault(status, 0) + 1 + + # Get the statistics on the number of jobs for the transformation + transID = transDict["TransformationID"] + res = cls.transformationDB.getTransformationTaskStats(transID) + taskDict = {} + if res["OK"] and res["Value"]: + taskDict = res["Value"] + for state in taskStateNames: + trans.append(taskDict.get(state, 0)) + + # Get the statistics for the number of files for the transformation + fileDict = {} + transType = transDict["Type"] + if transType.lower() in extendableTranfs: + fileDict["PercentProcessed"] = "-" + else: + res = cls.transformationDB.getTransformationStats(transID) + if res["OK"]: + fileDict = res["Value"] + total = fileDict["Total"] + for stat in givenUpFileStatus: + total -= fileDict.get(stat, 0) + processed = fileDict.get(TransformationFilesStatus.PROCESSED, 0) + fileDict["PercentProcessed"] = f"{int(processed * 1000.0 / total) / 10.0:.1f}" if total else 0.0 + problematic = 0 + for stat in problematicStatuses: + problematic += fileDict.get(stat, 0) + fileDict["Problematic"] = problematic + for state in fileStateNames: + trans.append(fileDict.get(state, 0)) + + resultDict["Records"] = transList + resultDict["Extras"] = statusDict + return S_OK(resultDict) diff --git a/src/DIRAC/ProductionSystem/scripts/dirac_prod_get_trans.py b/src/DIRAC/ProductionSystem/scripts/dirac_prod_get_trans.py index a8ed596a6c8..1c51616bb51 100755 --- a/src/DIRAC/ProductionSystem/scripts/dirac_prod_get_trans.py +++ b/src/DIRAC/ProductionSystem/scripts/dirac_prod_get_trans.py @@ -16,14 +16,13 @@ def main(): Script.registerArgument("prodID: Production ID") _, args = Script.parseCommandLine() + from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient from DIRAC.ProductionSystem.Client.ProductionClient import ProductionClient - from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient # get arguments prodID = args[0] prodClient = ProductionClient() - transClient = TransformationClient() res = prodClient.getProductionTransformations(prodID) transIDs = [] @@ -70,7 +69,7 @@ def main(): ] resList = [] - res = transClient.getTransformationSummaryWeb({"TransformationID": transIDs}, [], 0, len(transIDs)) + res = WebAppClient().getTransformationSummaryWeb({"TransformationID": transIDs}, [], 0, len(transIDs)) if not res["OK"]: DIRAC.gLogger.error(res["Message"]) diff --git a/src/DIRAC/TransformationSystem/Client/Transformation.py b/src/DIRAC/TransformationSystem/Client/Transformation.py index 06ce69ccbe2..b6281e4df61 100644 --- a/src/DIRAC/TransformationSystem/Client/Transformation.py +++ b/src/DIRAC/TransformationSystem/Client/Transformation.py @@ -9,6 +9,7 @@ from DIRAC.Core.Base.API import API from DIRAC.Core.Utilities.JEncode import encode from DIRAC.Core.Utilities.PromptUser import promptUser +from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient from DIRAC.RequestManagementSystem.Client.Operation import Operation from DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody import BaseBody from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient @@ -498,7 +499,7 @@ def getSummaryTransformations(self, transID=[]): ] dictList = [] - result = self.transClient.getTransformationSummaryWeb(condDict, orderby, start, maxitems) + result = WebAppClient().getTransformationSummaryWeb(condDict, orderby, start, maxitems) if not result["OK"]: self._prettyPrint(result) return result diff --git a/src/DIRAC/TransformationSystem/Client/TransformationClient.py b/src/DIRAC/TransformationSystem/Client/TransformationClient.py index 609808cf93e..2ed7a37b656 100644 --- a/src/DIRAC/TransformationSystem/Client/TransformationClient.py +++ b/src/DIRAC/TransformationSystem/Client/TransformationClient.py @@ -48,13 +48,6 @@ class TransformationClient(Client): getFileSummary(lfns) exists(lfns) - - Web monitoring tools - - getDistinctAttributeValues(attribute, selectDict) - getTransformationStatusCounters() - getTransformationSummary() - getTransformationSummaryWeb(selectDict, sortList, startItem, maxItems) """ def __init__(self, **kwargs): diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index e99a09d7f2f..16e053d420f 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -3,22 +3,12 @@ import datetime from DIRAC import S_ERROR, S_OK -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Security.Properties import SecurityProperty from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import encode as jencode from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.RequestManagementSystem.Client.Operation import Operation -from DIRAC.RequestManagementSystem.Client.Request import Request -from DIRAC.TransformationSystem.Client import TransformationFilesStatus -from DIRAC.WorkloadManagementSystem.Client import JobStatus - -TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted( - set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES) -) -FILES_STATE_NAMES = ["PercentProcessed", "Total"] + TransformationFilesStatus.TRANSFORMATION_FILES_STATES class TransformationManagerHandlerMixin: @@ -530,39 +520,12 @@ def export_setMetadata(cls, path, querydict): """Set metadata to a file or to a directory (path)""" return cls.transformationDB.setMetadata(path, querydict) - #################################################################### - # - # These are the methods used for web monitoring - # - - # TODO Get rid of this (talk to Matvey) - types_getDistinctAttributeValues = [str, dict] - - @classmethod - def export_getDistinctAttributeValues(cls, attribute, selectDict): - res = cls.transformationDB.getTableDistinctAttributeValues("Transformations", [attribute], selectDict) - if not res["OK"]: - return res - return S_OK(res["Value"][attribute]) - types_getTableDistinctAttributeValues = [str, list, dict] @classmethod def export_getTableDistinctAttributeValues(cls, table, attributes, selectDict): return cls.transformationDB.getTableDistinctAttributeValues(table, attributes, selectDict) - types_getTransformationStatusCounters = [] - - @classmethod - def export_getTransformationStatusCounters(cls): - res = cls.transformationDB.getCounters("Transformations", ["Status"], {}) - if not res["OK"]: - return res - statDict = {} - for attrDict, count in res["Value"]: - statDict[attrDict["Status"]] = count - return S_OK(statDict) - types_getTransformationSummary = [] def export_getTransformationSummary(self): @@ -587,305 +550,6 @@ def export_getTransformationSummary(self): resultDict[transID] = transDict return S_OK(resultDict) - types_getTabbedSummaryWeb = [str, dict, dict, list, int, int] - - def export_getTabbedSummaryWeb(self, table, requestedTables, selectDict, sortList, startItem, maxItems): - tableDestinations = { - "Transformations": { - "TransformationFiles": ["TransformationID"], - "TransformationTasks": ["TransformationID"], - }, - "TransformationFiles": { - "Transformations": ["TransformationID"], - "TransformationTasks": ["TransformationID", "TaskID"], - }, - "TransformationTasks": { - "Transformations": ["TransformationID"], - "TransformationFiles": ["TransformationID", "TaskID"], - }, - } - - tableSelections = { - "Transformations": ["TransformationID", "AgentType", "Type", "TransformationGroup", "Plugin"], - "TransformationFiles": ["TransformationID", "TaskID", "Status", "UsedSE", "TargetSE"], - "TransformationTasks": ["TransformationID", "TaskID", "ExternalStatus", "TargetSE"], - } - - tableTimeStamps = { - "Transformations": "CreationDate", - "TransformationFiles": "LastUpdate", - "TransformationTasks": "CreationTime", - } - - tableStatusColumn = { - "Transformations": "Status", - "TransformationFiles": "Status", - "TransformationTasks": "ExternalStatus", - } - - resDict = {} - res = self.__getTableSummaryWeb( - table, - selectDict, - sortList, - startItem, - maxItems, - selectColumns=tableSelections[table], - timeStamp=tableTimeStamps[table], - statusColumn=tableStatusColumn[table], - ) - if not res["OK"]: - self.log.error("Failed to get Summary for table", f"{table} {res['Message']}") - return res - resDict[table] = res["Value"] - selections = res["Value"]["Selections"] - tableSelection = {} - for destination in tableDestinations[table].keys(): - tableSelection[destination] = {} - for parameter in tableDestinations[table][destination]: - tableSelection[destination][parameter] = selections.get(parameter, []) - - for table, paramDict in requestedTables.items(): - sortList = paramDict.get("SortList", []) - startItem = paramDict.get("StartItem", 0) - maxItems = paramDict.get("MaxItems", 50) - res = self.__getTableSummaryWeb( - table, - tableSelection[table], - sortList, - startItem, - maxItems, - selectColumns=tableSelections[table], - timeStamp=tableTimeStamps[table], - statusColumn=tableStatusColumn[table], - ) - if not res["OK"]: - self.log.error("Failed to get Summary for table", f"{table} {res['Message']}") - return res - resDict[table] = res["Value"] - return S_OK(resDict) - - types_getTransformationsSummaryWeb = [dict, list, int, int] - - def export_getTransformationsSummaryWeb(self, selectDict, sortList, startItem, maxItems): - return self.__getTableSummaryWeb( - "Transformations", - selectDict, - sortList, - startItem, - maxItems, - selectColumns=["TransformationID", "AgentType", "Type", "Group", "Plugin"], - timeStamp="CreationDate", - statusColumn="Status", - ) - - types_getTransformationTasksSummaryWeb = [dict, list, int, int] - - def export_getTransformationTasksSummaryWeb(self, selectDict, sortList, startItem, maxItems): - return self.__getTableSummaryWeb( - "TransformationTasks", - selectDict, - sortList, - startItem, - maxItems, - selectColumns=["TransformationID", "ExternalStatus", "TargetSE"], - timeStamp="CreationTime", - statusColumn="ExternalStatus", - ) - - types_getTransformationFilesSummaryWeb = [dict, list, int, int] - - def export_getTransformationFilesSummaryWeb(self, selectDict, sortList, startItem, maxItems): - return self.__getTableSummaryWeb( - "TransformationFiles", - selectDict, - sortList, - startItem, - maxItems, - selectColumns=["TransformationID", "Status", "UsedSE", "TargetSE"], - timeStamp="LastUpdate", - statusColumn="Status", - ) - - def __getTableSummaryWeb( - self, table, selectDict, sortList, startItem, maxItems, selectColumns=[], timeStamp=None, statusColumn="Status" - ): - fromDate = selectDict.get("FromDate", None) - if fromDate: - del selectDict["FromDate"] - # if not fromDate: - # fromDate = last_update - toDate = selectDict.get("ToDate", None) - if toDate: - del selectDict["ToDate"] - # Sorting instructions. Only one for the moment. - if sortList: - orderAttribute = sortList[0][0] + ":" + sortList[0][1] - else: - orderAttribute = None - # Get the columns that match the selection - fcn = None - fcnName = f"get{table}" - if hasattr(self.transformationDB, fcnName) and callable(getattr(self.transformationDB, fcnName)): - fcn = getattr(self.transformationDB, fcnName) - if not fcn: - return S_ERROR(f"Unable to invoke gTransformationDB.{fcnName}, it isn't a member function") - res = fcn(condDict=selectDict, older=toDate, newer=fromDate, timeStamp=timeStamp, orderAttribute=orderAttribute) - if not res["OK"]: - return res - - # The full list of columns in contained here - allRows = res["Value"] - # Prepare the standard structure now within the resultDict dictionary - resultDict = {} - # Create the total records entry - resultDict["TotalRecords"] = len(allRows) - - # Get the rows which are within the selected window - if resultDict["TotalRecords"] == 0: - return S_OK(resultDict) - ini = startItem - last = ini + maxItems - if ini >= resultDict["TotalRecords"]: - return S_ERROR("Item number out of range") - if last > resultDict["TotalRecords"]: - last = resultDict["TotalRecords"] - - selectedRows = allRows[ini:last] - resultDict["Records"] = [] - for row in selectedRows: - resultDict["Records"].append(list(row.values())) - - # Create the ParameterNames entry - resultDict["ParameterNames"] = list(selectedRows[0].keys()) - # Find which element in the tuple contains the requested status - if statusColumn not in resultDict["ParameterNames"]: - return S_ERROR("Provided status column not present") - - # Generate the status dictionary - statusDict = {} - for row in selectedRows: - status = row[statusColumn] - statusDict[status] = statusDict.setdefault(status, 0) + 1 - resultDict["Extras"] = statusDict - - # Obtain the distinct values of the selection parameters - res = self.transformationDB.getTableDistinctAttributeValues( - table, selectColumns, selectDict, older=toDate, newer=fromDate - ) - distinctSelections = zip(selectColumns, []) - if res["OK"]: - distinctSelections = res["Value"] - resultDict["Selections"] = distinctSelections - - return S_OK(resultDict) - - types_getTransformationSummaryWeb = [dict, list, int, int] - - def export_getTransformationSummaryWeb(self, selectDict, sortList, startItem, maxItems): - """Get the summary of the transformation information for a given page in the generic format""" - - # Obtain the timing information from the selectDict - last_update = selectDict.get("CreationDate", None) - if last_update: - del selectDict["CreationDate"] - fromDate = selectDict.get("FromDate", None) - if fromDate: - del selectDict["FromDate"] - if not fromDate: - fromDate = last_update - toDate = selectDict.get("ToDate", None) - if toDate: - del selectDict["ToDate"] - # Sorting instructions. Only one for the moment. - if sortList: - orderAttribute = [] - for i in sortList: - orderAttribute += [i[0] + ":" + i[1]] - else: - orderAttribute = None - - # Get the transformations that match the selection - res = self.transformationDB.getTransformations( - condDict=selectDict, older=toDate, newer=fromDate, orderAttribute=orderAttribute - ) - if not res["OK"]: - return res - - ops = Operations() - # Prepare the standard structure now within the resultDict dictionary - resultDict = {} - trList = res["Records"] - # Create the total records entry - nTrans = len(trList) - resultDict["TotalRecords"] = nTrans - # Create the ParameterNames entry - # As this list is a reference to the list in the DB, we cannot extend it, therefore copy it - resultDict["ParameterNames"] = list(res["ParameterNames"]) - # Add the job states to the ParameterNames entry - taskStateNames = TASKS_STATE_NAMES + ops.getValue("Transformations/AdditionalTaskStates", []) - resultDict["ParameterNames"] += ["Jobs_" + x for x in taskStateNames] - # Add the file states to the ParameterNames entry - fileStateNames = FILES_STATE_NAMES + ops.getValue("Transformations/AdditionalFileStates", []) - resultDict["ParameterNames"] += ["Files_" + x for x in fileStateNames] - - # Get the transformations which are within the selected window - if nTrans == 0: - return S_OK(resultDict) - ini = startItem - last = ini + maxItems - if ini >= nTrans: - return S_ERROR("Item number out of range") - if last > nTrans: - last = nTrans - transList = trList[ini:last] - - statusDict = {} - extendableTranfs = ops.getValue("Transformations/ExtendableTransfTypes", ["Simulation", "MCsimulation"]) - givenUpFileStatus = ops.getValue("Transformations/GivenUpFileStatus", ["MissingInFC"]) - problematicStatuses = ops.getValue("Transformations/ProblematicStatuses", ["Problematic"]) - # Add specific information for each selected transformation - for trans in transList: - transDict = dict(zip(resultDict["ParameterNames"], trans)) - - # Update the status counters - status = transDict["Status"] - statusDict[status] = statusDict.setdefault(status, 0) + 1 - - # Get the statistics on the number of jobs for the transformation - transID = transDict["TransformationID"] - res = self.transformationDB.getTransformationTaskStats(transID) - taskDict = {} - if res["OK"] and res["Value"]: - taskDict = res["Value"] - for state in taskStateNames: - trans.append(taskDict.get(state, 0)) - - # Get the statistics for the number of files for the transformation - fileDict = {} - transType = transDict["Type"] - if transType.lower() in extendableTranfs: - fileDict["PercentProcessed"] = "-" - else: - res = self.transformationDB.getTransformationStats(transID) - if res["OK"]: - fileDict = res["Value"] - total = fileDict["Total"] - for stat in givenUpFileStatus: - total -= fileDict.get(stat, 0) - processed = fileDict.get(TransformationFilesStatus.PROCESSED, 0) - fileDict["PercentProcessed"] = f"{int(processed * 1000.0 / total) / 10.0:.1f}" if total else 0.0 - problematic = 0 - for stat in problematicStatuses: - problematic += fileDict.get(stat, 0) - fileDict["Problematic"] = problematic - for state in fileStateNames: - trans.append(fileDict.get(state, 0)) - - resultDict["Records"] = transList - resultDict["Extras"] = statusDict - return S_OK(resultDict) - class TransformationManagerHandler(TransformationManagerHandlerMixin, RequestHandler): pass diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index b691180ecff..d864e669669 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -22,10 +22,10 @@ from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient -from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB +from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB class StalledJobAgent(AgentModule): @@ -262,7 +262,7 @@ def _getJobPilotStatus(self, jobID): # There is no pilot reference, hence its status is unknown return S_OK("NoPilot") - result = PilotManagerClient().getPilotInfo(pilotReference) + result = PilotAgentsDB().getPilotInfo(pilotReference) if not result["OK"]: if DErrno.cmpError(result, DErrno.EWMSNOPILOT): self.log.warn("No pilot found", f"for job {jobID}: {result['Message']}") diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py index 94111b9e224..91edbe96c00 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py @@ -27,7 +27,7 @@ def sja(mocker): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobLoggingDB") mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobMonitoringClient", return_value=MagicMock()) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobManagerClient", return_value=MagicMock()) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotManagerClient", return_value=MagicMock()) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock()) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock()) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock()) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py index f97667fd460..34e1375449e 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py @@ -4,17 +4,13 @@ The following methods are available in the Service interface """ -import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities from DIRAC import S_ERROR, S_OK from DIRAC.ConfigurationSystem.Client.Helpers import Registry -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning from DIRAC.Core.Utilities.JEncode import strToIntDict from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient -from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, JobPolicy class JobMonitoringHandlerMixin: @@ -47,44 +43,6 @@ def initializeRequest(self): credDict = self.getRemoteCredentials() self.vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"])) - @classmethod - def parseSelectors(cls, selectDict=None): - """Parse selectors before DB query - - :param dict selectDict: selectors - - :return: str, str, dict -- start/end date, selectors - """ - selectDict = selectDict or {} - - # Get time period - startDate = selectDict.get("FromDate", None) - if startDate: - del selectDict["FromDate"] - # For backward compatibility - if startDate is None: - startDate = selectDict.get("LastUpdate", None) - if startDate: - del selectDict["LastUpdate"] - endDate = selectDict.get("ToDate", None) - if endDate: - del selectDict["ToDate"] - - # Provide JobID bound to a specific PilotJobReference - # There is no reason to have both PilotJobReference and JobID in selectDict - # If that occurs, use the JobID instead of the PilotJobReference - pilotJobRefs = selectDict.get("PilotJobReference") - if pilotJobRefs: - del selectDict["PilotJobReference"] - if not selectDict.get("JobID"): - for pilotJobRef in [pilotJobRefs] if isinstance(pilotJobRefs, str) else pilotJobRefs: - res = cls.pilotManager.getPilotInfo(pilotJobRef) - if res["OK"] and "Jobs" in res["Value"][pilotJobRef]: - selectDict["JobID"] = selectDict.get("JobID", []) - selectDict["JobID"].extend(res["Value"][pilotJobRef]["Jobs"]) - - return startDate, endDate, selectDict - @classmethod def getJobsAttributes(cls, *args, **kwargs): """Utility function for unpacking""" @@ -189,22 +147,6 @@ def export_getJobs(cls, attrDict=None, cutDate=None): return cls.jobDB.selectJobs(attrDict, newer=cutDate) - ############################################################################## - types_getCounters = [list] - - @classmethod - def export_getCounters(cls, attrList, attrDict=None, cutDate=""): - """ - Retrieve list of distinct attributes values from attrList - with attrDict as condition. - For each set of distinct values, count number of occurences. - Return a list. Each item is a list with 2 items, the list of distinct - attribute values and the counter - """ - - _, _, attrDict = cls.parseSelectors(attrDict) - return cls.jobDB.getCounters("Jobs", attrList, attrDict, newer=str(cutDate), timeStamp="LastUpdateTime") - ############################################################################## types_getJobOwner = [int] @@ -295,122 +237,6 @@ def export_getJobSummary(cls, jobID): def export_getJobsSummary(cls, jobIDs): return cls.getJobsAttributes(jobIDs) - ############################################################################## - types_getJobPageSummaryWeb = [dict, list, int, int] - - def export_getJobPageSummaryWeb(self, selectDict, sortList, startItem, maxItems, selectJobs=True): - """Get the summary of the job information for a given page in the - job monitor in a generic format - """ - - resultDict = {} - - startDate, endDate, selectDict = self.parseSelectors(selectDict) - - # initialize jobPolicy - credDict = self.getRemoteCredentials() - owner = credDict["username"] - ownerGroup = credDict["group"] - operations = Operations(group=ownerGroup) - globalJobsInfo = operations.getValue("/Services/JobMonitoring/GlobalJobsInfo", True) - jobPolicy = JobPolicy(owner, ownerGroup, globalJobsInfo) - jobPolicy.jobDB = self.jobDB - result = jobPolicy.getControlledUsers(RIGHT_GET_INFO) - if not result["OK"]: - return result - if not result["Value"]: - return S_ERROR(f"User and group combination has no job rights ({owner!r}, {ownerGroup!r})") - if result["Value"] != "ALL": - selectDict[("Owner", "OwnerGroup")] = result["Value"] - - # Sorting instructions. Only one for the moment. - if sortList: - orderAttribute = sortList[0][0] + ":" + sortList[0][1] - else: - orderAttribute = None - - result = self.jobDB.getCounters( - "Jobs", ["Status"], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime" - ) - if not result["OK"]: - return result - - statusDict = {} - nJobs = 0 - for stDict, count in result["Value"]: - nJobs += count - statusDict[stDict["Status"]] = count - - resultDict["TotalRecords"] = nJobs - if nJobs == 0: - return S_OK(resultDict) - - resultDict["Extras"] = statusDict - - if selectJobs: - iniJob = startItem - if iniJob >= nJobs: - return S_ERROR("Item number out of range") - - result = self.jobDB.selectJobs( - selectDict, orderAttribute=orderAttribute, newer=startDate, older=endDate, limit=(maxItems, iniJob) - ) - if not result["OK"]: - return result - - summaryJobList = result["Value"] - if not globalJobsInfo: - validJobs, _invalidJobs, _nonauthJobs, _ownJobs = jobPolicy.evaluateJobRights( - summaryJobList, RIGHT_GET_INFO - ) - summaryJobList = validJobs - - result = self.getJobsAttributes(summaryJobList) - if not result["OK"]: - return result - - summaryDict = result["Value"] - # If no jobs can be selected after the properties check - if not summaryDict: - return S_OK(resultDict) - - # Evaluate last sign of life time - for jobDict in summaryDict.values(): - if not jobDict.get("HeartBeatTime") or jobDict["HeartBeatTime"] == "None": - jobDict["LastSignOfLife"] = jobDict["LastUpdateTime"] - else: - jobDict["LastSignOfLife"] = jobDict["HeartBeatTime"] - - # prepare the standard structure now - # This should be faster than making a list of values() - for jobDict in summaryDict.values(): - paramNames = list(jobDict) - break - records = [list(jobDict.values()) for jobDict in summaryDict.values()] - - resultDict["ParameterNames"] = paramNames - resultDict["Records"] = records - - return S_OK(resultDict) - - ############################################################################## - types_getJobStats = [str, dict] - - @classmethod - def export_getJobStats(cls, attribute, selectDict): - """Get job statistics distribution per attribute value with a given selection""" - startDate, endDate, selectDict = cls.parseSelectors(selectDict) - result = cls.jobDB.getCounters( - "Jobs", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime" - ) - if not result["OK"]: - return result - resultDict = {} - for cDict, count in result["Value"]: - resultDict[cDict[attribute]] = count - - return S_OK(resultDict) - ############################################################################## types_getJobParameter = [[str, int], str] diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index 6b94ecd5be3..74123b81b11 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -275,37 +275,6 @@ def export_getPilotSummary(cls, startdate="", enddate=""): return cls.pilotAgentsDB.getPilotSummary(startdate, enddate) - ############################################################################## - types_getPilotMonitorWeb = [dict, list, int, int] - - @classmethod - def export_getPilotMonitorWeb(cls, selectDict, sortList, startItem, maxItems): - """Get the summary of the pilot information for a given page in the - pilot monitor in a generic format - """ - - return cls.pilotAgentsDB.getPilotMonitorWeb(selectDict, sortList, startItem, maxItems) - - ############################################################################## - types_getPilotMonitorSelectors = [] - - @classmethod - def export_getPilotMonitorSelectors(cls): - """Get all the distinct selector values for the Pilot Monitor web portal page""" - - return cls.pilotAgentsDB.getPilotMonitorSelectors() - - ############################################################################## - types_getPilotSummaryWeb = [dict, list, int, int] - - @classmethod - def export_getPilotSummaryWeb(cls, selectDict, sortList, startItem, maxItems): - """Get the summary of the pilot information for a given page in the - pilot monitor in a generic format - """ - - return cls.pilotAgentsDB.getPilotSummaryWeb(selectDict, sortList, startItem, maxItems) - ############################################################################## types_getGroupedPilotSummary = [list] @@ -416,44 +385,6 @@ def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="Submiss return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp) ########################################################################################## - types_getCounters = [str, list, dict] - - @classmethod - def export_getCounters(cls, table, keys, condDict, newer=None, timeStamp="SubmissionTime"): - """Set the pilot agent status""" - - return cls.pilotAgentsDB.getCounters(table, keys, condDict, newer=newer, timeStamp=timeStamp) - - ############################################################################## - types_getPilotStatistics = [str, dict] - - @classmethod - def export_getPilotStatistics(cls, attribute, selectDict): - """Get pilot statistics distribution per attribute value with a given selection""" - - startDate = selectDict.get("FromDate", None) - if startDate: - del selectDict["FromDate"] - - if startDate is None: - startDate = selectDict.get("LastUpdate", None) - if startDate: - del selectDict["LastUpdate"] - endDate = selectDict.get("ToDate", None) - if endDate: - del selectDict["ToDate"] - - result = cls.pilotAgentsDB.getCounters( - "PilotAgents", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime" - ) - statistics = {} - if result["OK"]: - for status, count in result["Value"]: - statistics[status[attribute]] = count - - return S_OK(statistics) - - ############################################################################## types_deletePilots = [[list, str, int]] @classmethod diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index 2dea53c87ea..b1ef5ad80a9 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -3,7 +3,6 @@ """ from DIRAC import S_ERROR, S_OK from DIRAC.ConfigurationSystem.Client.Helpers import Registry -from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader @@ -183,55 +182,6 @@ def export_getJobPilotOutput(self, jobID): return self.pilotManager.getPilotOutput(pilotReference) return S_ERROR("No pilot job reference found") - ############################################################################## - types_getSiteSummaryWeb = [dict, list, int, int] - - @classmethod - def export_getSiteSummaryWeb(cls, selectDict, sortList, startItem, maxItems): - """Get the summary of the jobs running on sites in a generic format - - :param dict selectDict: selectors - :param list sortList: sorting list - :param int startItem: start item number - :param int maxItems: maximum of items - - :return: S_OK(dict)/S_ERROR() - """ - return cls.jobDB.getSiteSummaryWeb(selectDict, sortList, startItem, maxItems) - - ############################################################################## - types_getSiteSummarySelectors = [] - - @classmethod - def export_getSiteSummarySelectors(cls): - """Get all the distinct selector values for the site summary web portal page - - :return: S_OK(dict)/S_ERROR() - """ - resultDict = {} - statusList = ["Good", "Fair", "Poor", "Bad", "Idle"] - resultDict["Status"] = statusList - maskStatus = ["Active", "Banned", "NoMask", "Reduced"] - resultDict["MaskStatus"] = maskStatus - - res = getSites() - if not res["OK"]: - return res - siteList = res["Value"] - - countryList = [] - for site in siteList: - if site.find(".") != -1: - country = site.split(".")[2].lower() - if country not in countryList: - countryList.append(country) - countryList.sort() - resultDict["Country"] = countryList - siteList.sort() - resultDict["Site"] = siteList - - return S_OK(resultDict) - class WMSAdministratorHandler(WMSAdministratorHandlerMixin, RequestHandler): pass diff --git a/tests/Integration/WorkloadManagementSystem/Test_WMSAdministratorClient.py b/tests/Integration/Monitoring/Test_WebAppClient.py similarity index 52% rename from tests/Integration/WorkloadManagementSystem/Test_WMSAdministratorClient.py rename to tests/Integration/Monitoring/Test_WebAppClient.py index 33952d125c0..7743354017d 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_WMSAdministratorClient.py +++ b/tests/Integration/Monitoring/Test_WebAppClient.py @@ -5,15 +5,13 @@ DIRAC.initialize() # Initialize configuration # sut -from DIRAC.WorkloadManagementSystem.Client.WMSAdministratorClient import WMSAdministratorClient +from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient def test_WMSAdministratorClient(): - wmsAdministrator = WMSAdministratorClient() - - res = wmsAdministrator.getSiteSummaryWeb({}, [], 0, 100) + res = WebAppClient().getSiteSummaryWeb({}, [], 0, 100) assert res["OK"], res["Message"] assert res["Value"]["TotalRecords"] in [0, 1, 2, 34] - res = wmsAdministrator.getSiteSummarySelectors() + res = WebAppClient().getSiteSummarySelectors() assert res["OK"], res["Message"] diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index a392bd54756..09a00772234 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -29,9 +29,10 @@ import tempfile import time -import DIRAC import pytest +import DIRAC + DIRAC.initialize() # Initialize configuration from DIRAC import gLogger @@ -40,6 +41,7 @@ from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd from DIRAC.DataManagementSystem.Client.DataManager import DataManager from DIRAC.Interfaces.API.Dirac import Dirac +from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient from DIRAC.tests.Utilities.WMS import helloWorldJob, parametricJob from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient @@ -443,7 +445,7 @@ def test_JobStateUpdateAndJobMonitoringMultiple(lfn: str) -> None: # self.assertTrue(res['OK'], res.get('Message')) res = jobMonitoringClient.getJobsSummary(jobIDs) assert res["OK"], res["Message"] - res = jobMonitoringClient.getJobPageSummaryWeb({}, [], 0, 100) + res = WebAppClient().getJobPageSummaryWeb({}, [], 0, 100) assert res["OK"], res["Message"] res = jobStateUpdateClient.setJobStatusBulk( diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py index cfb7fe519bd..a030e9f70e4 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py @@ -14,6 +14,7 @@ DIRAC.initialize() # Initialize configuration from DIRAC import gLogger +from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient gLogger.setLevel("VERBOSE") @@ -21,6 +22,7 @@ def test_PilotsDB(): pilots = PilotManagerClient() + webapp = WebAppClient() # This will allow you to run the test again if necessary for jobID in ["aPilot", "anotherPilot"]: @@ -58,12 +60,12 @@ def test_PilotsDB(): res = pilots.getPilotSummary("", "") assert res["OK"], res["Message"] assert res["Value"]["Total"]["Submitted"] >= 1 - res = pilots.getPilotMonitorWeb({}, [], 0, 100) + res = webapp.getPilotMonitorWeb({}, [], 0, 100) assert res["OK"], res["Message"] assert res["Value"]["TotalRecords"] >= 1 - res = pilots.getPilotMonitorSelectors() + res = webapp.getPilotMonitorSelectors() assert res["OK"], res["Message"] - res = pilots.getPilotSummaryWeb({}, [], 0, 100) + res = webapp.getPilotSummaryWeb({}, [], 0, 100) assert res["OK"], res["Message"] assert res["Value"]["TotalRecords"] >= 1 diff --git a/tests/Integration/all_integration_client_tests.sh b/tests/Integration/all_integration_client_tests.sh index cfaf1507340..332d30a07ea 100644 --- a/tests/Integration/all_integration_client_tests.sh +++ b/tests/Integration/all_integration_client_tests.sh @@ -56,9 +56,10 @@ pytest --no-check-dirac-environment "${THIS_DIR}/ResourceStatusSystem/Test_Email echo -e "*** $(date -u) **** WMS TESTS ****\n" pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_SandboxStoreClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) -pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotsClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) -pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_WMSAdministratorClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) -pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_Client_WMS.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) +if [[ -z "${INSTALLATION_BRANCH}" ]]; then + pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotsClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) + pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_Client_WMS.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) +fi # Make sure we have the prod role for these tests to get the VmRpcOperator permission dirac-proxy-init -g prod "${DEBUG}" |& tee -a clientTestOutputs.txt @@ -71,7 +72,9 @@ python "${THIS_DIR}/WorkloadManagementSystem/createJobXMLDescriptions.py" |& tee #-------------------------------------------------------------------------------# echo -e "*** $(date -u) **** MONITORING TESTS ****\n" pytest --no-check-dirac-environment "${THIS_DIR}/Monitoring/Test_MonitoringSystem.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) - +if [[ -z "${INSTALLATION_BRANCH}" ]]; then + pytest --no-check-dirac-environment "${THIS_DIR}/Monitoring/Test_WebAppClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) +fi #-------------------------------------------------------------------------------# echo -e "*** $(date -u) **** TS TESTS ****\n"