From dd63c0eff84bd88a26785cd533b2aba2b48b6db2 Mon Sep 17 00:00:00 2001 From: Simon Fayer Date: Tue, 16 Jun 2026 20:49:40 +0100 Subject: [PATCH] fix: Parameterise SQL in WMS JobDB --- .../WorkloadManagementSystem/DB/JobDB.py | 550 ++++++------------ 1 file changed, 183 insertions(+), 367 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 829bda4a2c0..b064acfaebf 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -122,17 +122,14 @@ def getJobParameters(self, jobID, paramList=None): if paramList: if isinstance(paramList, str): paramList = paramList.split(",") - paramNameList = [] - for pn in paramList: - ret = self._escapeString(pn) - if not ret["OK"]: - return ret - paramNameList.append(ret["Value"]) - cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format( - ",".join(str(int(j)) for j in jobIDList), - ",".join(paramNameList), - ) - result = self._query(cmd) + cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN (" + cmd += ",".join(["%s"] * len(jobIDList)) + args = jobIDList + cmd += ") AND Name IN (" + cmd += ",".join(["%s"] * len(paramList)) + cmd += ")" + args.extend(paramList) + result = self._query(cmd, args=args) if result["OK"]: if result["Value"]: for res_jobID, res_name, res_value in result["Value"]: @@ -168,30 +165,21 @@ def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1): If recheduleCounter = -1, all cycles are returned. """ - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - # self.log.debug('JobDB.getAtticJobParameters: Getting Attic Parameters for job %s' % jobID) resultDict = {} - paramCondition = "" + cmd = "SELECT Name, Value, RescheduleCycle from AtticJobParameters WHERE JobID=%s" + args = [jobID] if paramList: - paramNameList = [] - for x in paramList: - ret = self._escapeString(x) - if not ret["OK"]: - return ret - paramNameList.append(ret["Value"]) - paramNames = ",".join(paramNameList) - paramCondition = f" AND Name in ({paramNames})" - rCounter = "" + cmd += " AND Name in (" + cmd += ",".join(["%s"] * len(paramList)) + cmd += ")" + args.extend(paramList) if rescheduleCounter != -1: - rCounter = " AND RescheduleCycle=%d" % int(rescheduleCounter) - cmd = "SELECT Name, Value, RescheduleCycle from AtticJobParameters" - cmd += f" WHERE JobID={jobID} {paramCondition} {rCounter}" - result = self._query(cmd) + cmd += " AND RescheduleCycle=%s" + args.append(rescheduleCounter) + + result = self._query(cmd, args=args) if result["OK"]: if result["Value"]: for name, value, counter in result["Value"]: @@ -215,10 +203,14 @@ def getJobsAttributes(self, jobIDs, attrList=None): return {} # If no list of attributes is given, return all attributes - if not attrList: + if attrList: + if isinstance(attrList, str): + attrList = attrList.replace(" ", "").split(",") + for attrName in attrList: + if attrName.lower() not in [x.lower() for x in self.jobAttributeNames]: + return S_ERROR(f"Unknown job attribute: {attrName}") + else: attrList = self.jobAttributeNames - if isinstance(attrList, str): - attrList = attrList.replace(" ", "").split(",") attrList.sort() if isinstance(jobIDs, str): @@ -226,18 +218,14 @@ def getJobsAttributes(self, jobIDs, attrList=None): if isinstance(jobIDs, int): jobIDs = [jobIDs] - attrNameListS = [] - for x in attrList: - x = "`" + returnValueOrRaise(self._escapeString(x))[1:-1] + "`" - attrNameListS.append(x) - attrNames = "JobID," + ",".join(attrNameListS) - sqlCmd = "CREATE TEMPORARY TABLE to_select_Jobs (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;" returnValueOrRaise(self._update(sqlCmd)) try: sqlCmd = "INSERT INTO to_select_Jobs (JobID) VALUES ( %s )" returnValueOrRaise(self._updatemany(sqlCmd, [(int(j),) for j in jobIDs])) - sqlCmd = f"SELECT {attrNames} FROM Jobs JOIN to_select_Jobs USING (JobID)" + sqlCmd = "SELECT " + sqlCmd += "JobID," + ",".join(attrList) + sqlCmd += " FROM Jobs JOIN to_select_Jobs USING (JobID)" result = returnValueOrRaise(self._query(sqlCmd)) finally: sqlCmd = "DROP TEMPORARY TABLE to_select_Jobs" @@ -302,24 +290,14 @@ def getJobOptParameters(self, jobID, paramList=None): empty, get all the parameters then """ - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - + cmd = "SELECT Name, Value from OptimizerParameters WHERE JobID=%s" + args = [jobID] if paramList: - paramNameList = [] - for x in paramList: - ret = self._escapeString(x) - if not ret["OK"]: - return ret - paramNameList.append(ret["Value"]) - paramNames = ",".join(paramNameList) - cmd = f"SELECT Name, Value from OptimizerParameters WHERE JobID={jobID} and Name in ({paramNames})" - else: - cmd = f"SELECT Name, Value from OptimizerParameters WHERE JobID={jobID}" - - result = self._query(cmd) + cmd += " AND Name IN (" + cmd += ",".join(["%s"] * len(paramList)) + cmd += ")" + args.extend(paramList) + result = self._query(cmd, args=args) if not result["OK"]: return S_ERROR("JobDB.getJobOptParameters: failed to retrieve parameters") try: @@ -340,19 +318,20 @@ def getInputData(self, jobID: list[int | str]) -> DReturnType[dict[int, list[str def getInputData(self, jobID: int | str | list[int | str]) -> DReturnType[list[str] | dict[int, list[str]]]: """Get input data for the given job""" + cmd = "SELECT JobID, LFN FROM InputData" + args = [] if isinstance(jobID, (int, str)): - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - query = f"JobID={jobID}" + cmd += " WHERE JobID=%s" + args.append(str(jobID)) result = [] else: - job_ids = {int(i) for i in jobID} - query = f"JobID IN ({','.join(map(str, job_ids))})" - result = {i: [] for i in job_ids} - cmd = f"SELECT JobID, LFN FROM InputData WHERE {query}" - res = self._query(cmd) + # jobID is actually a list of jobIDs + cmd += " WHERE JobID IN (" + cmd += ",".join(["%s"] * len(jobID)) + cmd += ")" + args.extend({str(x) for x in jobID}) + result = {int(i): [] for i in jobID} + res = self._query(cmd, args=args) if not res["OK"]: return res @@ -370,12 +349,8 @@ def getInputData(self, jobID: int | str | list[int | str]) -> DReturnType[list[s ############################################################################# def setInputData(self, jobID, inputData): """Inserts input data for the given job""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - cmd = f"DELETE FROM InputData WHERE JobID={jobID}" - result = self._update(cmd) + cmd = "DELETE FROM InputData WHERE JobID=%s" + result = self._update(cmd, args=(str(jobID),)) if not result["OK"]: result = S_ERROR("JobDB.setInputData: operation failed.") @@ -383,12 +358,9 @@ def setInputData(self, jobID, inputData): # some jobs are setting empty string as InputData if not lfn: continue - ret = self._escapeString(lfn.strip()) - if not ret["OK"]: - return ret - lfn = ret["Value"] - cmd = f"INSERT INTO InputData (JobID,LFN) VALUES ({jobID}, {lfn} )" - res = self._update(cmd) + lfn = lfn.strip() + cmd = "INSERT INTO InputData (JobID,LFN) VALUES (%s, %s)" + res = self._update(cmd, args=(str(jobID), lfn)) if not res["OK"]: return res @@ -491,33 +463,28 @@ def setJobAttribute(self, jobID, attrName, attrValue, update=False, myDate=None, if not res["OK"]: return res if update: - cmd = f"UPDATE Jobs SET LastUpdateTime=UTC_TIMESTAMP() WHERE JobID={jobID}" + cmd = "UPDATE Jobs SET LastUpdateTime=UTC_TIMESTAMP() WHERE JobID=%s" + args = [str(jobID)] if myDate: - cmd += f" AND LastUpdateTime < {myDate}" - return self._update(cmd) + cmd += " AND LastUpdateTime < %s" + args.append(myDate) + return self._update(cmd, args=args) else: return res # if we are here it's because we are not updating the status - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - - ret = self._escapeString(attrValue) - if not ret["OK"]: - return ret - value = ret["Value"] - + cmd = "UPDATE Jobs SET " + cmd += "`" + attrName + "`=%s" + args = [attrValue] if update: - cmd = f"UPDATE Jobs SET {attrName}={value},LastUpdateTime=UTC_TIMESTAMP() WHERE JobID={jobID}" - else: - cmd = f"UPDATE Jobs SET {attrName}={value} WHERE JobID={jobID}" - + cmd += ",LastUpdateTime=UTC_TIMESTAMP()" + cmd += " WHERE JobID=%s" + args.append(str(jobID)) if myDate: - cmd += f" AND LastUpdateTime < {myDate}" + cmd += " AND LastUpdateTime < %s" + args.append(myDate) - return self._update(cmd) + return self._update(cmd, args=args) ############################################################################# def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=None, force=False): @@ -558,6 +525,7 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No if attrName not in self.jobAttributeNames: return S_ERROR(EWMSJMAN, "Request to set non-existing job attribute") + statusDone = False if "Status" in attrNames: # Treat this update separately res = self.setJobsMajorStatus(jIDList, attrValues[attrNames.index("Status")], force=force) @@ -565,25 +533,33 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No return res attrValues.pop(attrNames.index("Status")) attrNames.remove("Status") + statusDone = True - attr = [] - + attrs = [] + args = [] for name, value in zip(attrNames, attrValues): - ret = self._escapeString(value) - if not ret["OK"]: - return ret - attr.append(f"{name}={ret['Value']}") + attrs.append(f"{name}=%s") + args.append(value) if update: - attr.append("LastUpdateTime=UTC_TIMESTAMP()") - if not attr: - return S_ERROR("JobDB.setAttributes: Nothing to do") - - cmd = f"UPDATE Jobs SET {', '.join(attr)} WHERE JobID in ( {', '.join(str(int(j)) for j in jIDList)} )" - + attrs.append("LastUpdateTime=UTC_TIMESTAMP()") + if not attrs: + if statusDone: + # We did update status earlier, so having no more work isn't an error! + return S_OK() + else: + return S_ERROR("JobDB.setAttributes: Nothing to do") + + cmd = "UPDATE Jobs SET " + cmd += ",".join(attrs) + cmd += " WHERE JobID in (" + cmd += ",".join(["%s"] * len(jobIDList)) + cmd += ")" + args.extend(jobIDList) if myDate: - cmd += f" AND LastUpdateTime < {myDate}" + cmd += " AND LastUpdateTime < %s" + args.append(myDate) - return self._update(cmd) + return self._update(cmd, args=args) def setJobsMajorStatus(self, jIDList, candidateStatus, force=False): """ @@ -620,20 +596,8 @@ def setJobsMajorStatus(self, jIDList, candidateStatus, force=False): newStatuses[jID] = nextState - cmd = "INSERT INTO Jobs (JobID, Status) VALUES " - - ns = [] - for jID, status in newStatuses.items(): - ret_status = self._escapeString(status) - if not ret_status["OK"]: - return ret_status - status = ret_status["Value"] - ns.append(f"({jID}, {status})") - cmd += ",".join(ns) - - cmd += " ON DUPLICATE KEY UPDATE Status=VALUES(Status)" - - return self._update(cmd) + cmd = "INSERT INTO Jobs (JobID, Status) VALUES (%s, %s) ON DUPLICATE KEY UPDATE Status=VALUES(Status)" + return self._updatemany(cmd, data=newStatuses.items()) def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus="", force=False): """Set status of the job specified by its jobID""" @@ -664,153 +628,91 @@ def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus="", f def setEndExecTime(self, jobID, endDate=None): """Set EndExecTime time stamp""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - + args = [] + req = "UPDATE Jobs SET EndExecTime=" if endDate: - ret = self._escapeString(endDate) - if not ret["OK"]: - return ret - endDate = ret["Value"] + req += "%s" + args.append(endDate) else: - endDate = "UTC_TIMESTAMP()" - req = f"UPDATE Jobs SET EndExecTime={endDate} WHERE JobID={jobID} AND EndExecTime IS NULL" - return self._update(req) + req += "UTC_TIMESTAMP()" + req += " WHERE JobID=%s AND EndExecTime IS NULL" + args.append(str(jobID)) + return self._update(req, args=args) ############################################################################# def setStartExecTime(self, jobID, startDate=None): """Set StartExecTime time stamp and HeartBeatTime if not already set""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - - if startDate: - ret = self._escapeString(startDate) + # Set also the HeartBeatTime in case the job gets stuck before sending the first HeartBeat + for field in ("HeartBeatTime", "StartExecTime"): + args = [] + req = "UPDATE Jobs SET " + req += field + if startDate: + req += "=%s" + args.append(startDate) + else: + req += "=UTC_TIMESTAMP()" + req += " WHERE JobID=%s AND `" + field + "` IS NULL" + args.append(str(jobID)) + ret = self._update(req, args=args) if not ret["OK"]: return ret - startDate = ret["Value"] - else: - startDate = "UTC_TIMESTAMP()" - # Set also the HeartBeatTime in case the job gets stuck before sending the first HeartBeat - req = f"UPDATE Jobs SET HeartBeatTime={startDate} WHERE JobID={jobID} AND HeartBeatTime IS NULL" - ret = self._update(req) - if not ret["OK"]: - return ret - req = f"UPDATE Jobs SET StartExecTime={startDate} WHERE JobID={jobID} AND StartExecTime IS NULL" - return self._update(req) + return S_OK() ############################################################################# def setJobOptParameter(self, jobID, name, value): """Set an optimzer parameter specified by name,value pair for the job JobID""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - e_jobID = ret["Value"] - - ret = self._escapeString(name) - if not ret["OK"]: - return ret - e_name = ret["Value"] - - cmd = f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID} AND Name={e_name}" - res = self._update(cmd) + # Remove old parameter and then insert new one + res = self.removeJobOptParameter(jobID, name) if not res["OK"]: return res - return self.insertFields("OptimizerParameters", ["JobID", "Name", "Value"], [jobID, name, value]) ############################################################################# def removeJobOptParameter(self, jobID, name): """Remove the specified optimizer parameter for jobID""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - ret = self._escapeString(name) - if not ret["OK"]: - return ret - name = ret["Value"] - - cmd = f"DELETE FROM OptimizerParameters WHERE JobID={jobID} AND Name={name}" - return self._update(cmd) + cmd = "DELETE FROM OptimizerParameters WHERE JobID=%s AND Name=%s" + return self._update(cmd, args=(str(jobID), name)) ############################################################################# def setAtticJobParameter(self, jobID, key, value, rescheduleCounter): """Set attic parameter for job specified by its jobID when job rescheduling for later debugging """ - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - - ret = self._escapeString(key) - if not ret["OK"]: - return ret - key = ret["Value"] - - ret = self._escapeString(value) - if not ret["OK"]: - return ret - value = ret["Value"] - - ret = self._escapeString(rescheduleCounter) - if not ret["OK"]: - return ret - rescheduleCounter = ret["Value"] - - cmd = "INSERT INTO AtticJobParameters (JobID,RescheduleCycle,Name,Value) VALUES({},{},{},{})".format( - jobID, - rescheduleCounter, - key, - value, - ) - return self._update(cmd) + cmd = "INSERT INTO AtticJobParameters (JobID,RescheduleCycle,Name,Value) VALUES(%s,%s,%s,%s)" + args = (str(jobID), rescheduleCounter, key, value) + return self._update(cmd, args=args) ############################################################################# def setJobJDL(self, jobID, jdl=None, originalJDL=None): - """Insert JDL's for job specified by jobID""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] + """Insert JDLs for job specified by jobID""" - req = f"SELECT OriginalJDL FROM JobJDLs WHERE JobID={jobID}" - result = self._query(req) + req = f"SELECT OriginalJDL FROM JobJDLs WHERE JobID=%s" + result = self._query(req, args=(str(jobID),)) updateFlag = False if result["OK"] and result["Value"]: updateFlag = True if jdl: - ret = self._escapeString(compressJDL(jdl)) - if not ret["OK"]: - return ret - e_JDL = ret["Value"] - if updateFlag: - cmd = f"UPDATE JobJDLs Set JDL={e_JDL} WHERE JobID={jobID}" + cmd = f"UPDATE JobJDLs Set JDL=%s WHERE JobID=%s" + args = (compressJDL(jdl), str(jobID)) else: - cmd = f"INSERT INTO JobJDLs (JobID,JDL) VALUES ({jobID},{e_JDL})" - result = self._update(cmd) + cmd = f"INSERT INTO JobJDLs (JobID,JDL) VALUES (%s, %s)" + args = (str(jobID), compressJDL(jdl)) + result = self._update(cmd, args=args) if not result["OK"]: return result if originalJDL: - ret = self._escapeString(compressJDL(originalJDL)) - if not ret["OK"]: - return ret - e_originalJDL = ret["Value"] - if updateFlag: - cmd = f"UPDATE JobJDLs Set OriginalJDL={e_originalJDL} WHERE JobID={jobID}" + cmd = "UPDATE JobJDLs Set OriginalJDL=%s WHERE JobID=%s" + args = (compressJDL(originalJDL), str(jobID)) else: - cmd = f"INSERT INTO JobJDLs (JobID,OriginalJDL) VALUES ({jobID},{e_originalJDL})" - - result = self._update(cmd) + cmd = "INSERT INTO JobJDLs (JobID,OriginalJDL) VALUES (%s, %s)" + args = (str(jobID), compressJDL(originalJDL)) + result = self._update(cmd, args=args) return result @@ -839,17 +741,12 @@ def getJobJDL(self, jobID, original=False): """Get JDL for job specified by its jobID. By default the current job JDL is returned. If 'original' argument is True, original JDL is returned """ - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - if original: - cmd = f"SELECT OriginalJDL FROM JobJDLs WHERE JobID={jobID}" + cmd = "SELECT OriginalJDL FROM JobJDLs WHERE JobID=%s" else: - cmd = f"SELECT JDL FROM JobJDLs WHERE JobID={jobID}" + cmd = "SELECT JDL FROM JobJDLs WHERE JobID=%s" - result = self._query(cmd) + result = self._query(cmd, args=(str(jobID),)) if result["OK"]: jdl = result["Value"] if not jdl: @@ -956,25 +853,12 @@ def insertNewJobIntoDB( inputData = classAdJob.getListFromExpression("InputData") values = [] - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - e_jobID = ret["Value"] - for lfn in inputData: - # some jobs are setting empty string as InputData - if not lfn: - continue - ret = self._escapeString(lfn.strip()) - if not ret["OK"]: - return ret - lfn = ret["Value"] - - values.append(f"({e_jobID}, {lfn} )") + values.append((jobID, lfn)) if values: - cmd = f"INSERT INTO InputData (JobID,LFN) VALUES {', '.join(values)}" - result = self._update(cmd) + cmd = "INSERT INTO InputData (JobID,LFN) VALUES (%s, %s)" + result = self._updatemany(cmd, data=values) if not result["OK"]: return result @@ -1096,17 +980,12 @@ def rescheduleJob(self, jobID): if not result["OK"]: break - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - e_jobID = ret["Value"] - - res = self._update(f"DELETE FROM JobParameters WHERE JobID={e_jobID}") + res = self._update("DELETE FROM JobParameters WHERE JobID=%s", args=(str(jobID),)) if not res["OK"]: return res # Delete optimizer parameters - if not self._update(f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID}")["OK"]: + if not self._update("DELETE FROM OptimizerParameters WHERE JobID=%s", args=(str(jobID),))["OK"]: return S_ERROR("JobDB.removeJobOptParameter: operation failed.") # the JobManager needs to know if there is InputData ??? to decide which optimizer to call @@ -1153,17 +1032,11 @@ def rescheduleJob(self, jobID): site = siteList[0] jobAttrs["Site"] = site - jobAttrs["Status"] = JobStatus.RECEIVED - jobAttrs["MinorStatus"] = JobMinorStatus.RESCHEDULED - jobAttrs["ApplicationStatus"] = "Unknown" - jobAttrs["LastUpdateTime"] = str(datetime.datetime.utcnow()) - jobAttrs["RescheduleTime"] = str(datetime.datetime.utcnow()) - jobAttrs["VO"] = getVOForGroup(resultDict["OwnerGroup"]) reqJDL = classAdReq.asJDL() @@ -1342,25 +1215,20 @@ def getSiteSummaryWeb(self, selectDict, sortList, startItem, maxItems): def setHeartBeatData(self, jobID, dynamicDataDict): """Add the job's heart beat data to the database""" - # Set the time stamp first - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - e_jobID = ret["Value"] - # If HeartBeatTime is being set, set it... timeStamp = dynamicDataDict.pop("HeartBeatTime", None) + req = "UPDATE Jobs SET " + args = [] if timeStamp: - result = self._escapeString(timeStamp) - if not result["OK"]: - self.log.warn("Failed to escape string ", timeStamp) - return result - req = f"UPDATE Jobs SET HeartBeatTime={result['Value']} WHERE JobID={e_jobID}" + req += "HeartBeatTime=%s " + args.append(timeStamp) else: - req = f"UPDATE Jobs SET HeartBeatTime=UTC_TIMESTAMP(), Status='{JobStatus.RUNNING}' WHERE JobID={e_jobID}" - - result = self._update(req) + req += "HeartBeatTime=UTC_TIMESTAMP(),Status=%s " + args.append(JobStatus.RUNNING) + req += "WHERE JobID=%s" + args.append(str(jobID)) + result = self._update(req, args=args) if not result["OK"]: return S_ERROR(f"Failed to set the heart beat time: {result['Message']}") @@ -1368,22 +1236,11 @@ def setHeartBeatData(self, jobID, dynamicDataDict): # Add dynamic data to the job heart beat log valueList = [] for key, value in dynamicDataDict.items(): - result = self._escapeString(key) - if not result["OK"]: - self.log.warn("Failed to escape string", key) - continue - e_key = result["Value"] - result = self._escapeString(value) - if not result["OK"]: - self.log.warn("Failed to escape string", value) - continue - e_value = result["Value"] - valueList.append(f"( {e_jobID}, {e_key}, {e_value}, UTC_TIMESTAMP())") + valueList.append((str(jobID), key, value)) if valueList: - req = "INSERT INTO HeartBeatLoggingInfo (JobID,Name,Value,HeartBeatTime) VALUES " - req += ",".join(valueList) - result = self._update(req) + req = "INSERT INTO HeartBeatLoggingInfo (JobID,Name,Value,HeartBeatTime) VALUES (%s,%s,%s,UTC_TIMESTAMP())" + result = self._updatemany(req, data=valueList) if not result["OK"]: ok = False self.log.warn("Error storing heart beat data", result["Message"]) @@ -1393,12 +1250,10 @@ def setHeartBeatData(self, jobID, dynamicDataDict): ##################################################################################### def getHeartBeatData(self, jobID): """Retrieve the job's heart beat data""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - res = self._query(f"SELECT Name,Value,HeartBeatTime from HeartBeatLoggingInfo WHERE JobID={jobID}") + res = self._query( + "SELECT Name,Value,HeartBeatTime from HeartBeatLoggingInfo WHERE JobID=%s", args=(str(jobID),) + ) if not res["OK"]: return res @@ -1418,43 +1273,20 @@ def getHeartBeatData(self, jobID): ##################################################################################### def setJobCommand(self, jobID, command, arguments=None): """Store a command to be passed to the job together with the next heart beat""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - - ret = self._escapeString(command) - if not ret["OK"]: - return ret - command = ret["Value"] - - if arguments: - ret = self._escapeString(arguments) - if not ret["OK"]: - return ret - arguments = ret["Value"] - else: - arguments = "''" - req = "INSERT INTO JobCommands (JobID,Command,Arguments,ReceptionTime) " - req += f"VALUES ({jobID}, {command}, {arguments}, UTC_TIMESTAMP())" - return self._update(req) + if not arguments: + arguments = "" + req = "INSERT INTO JobCommands (JobID,Command,Arguments,ReceptionTime) VALUES (%s,%s,%s,UTC_TIMESTAMP())" + args = (jobID, command, arguments) + return self._update(req, args=args) ##################################################################################### def getJobCommand(self, jobID, status=JobStatus.RECEIVED): """Get a command to be passed to the job together with the next heart beat""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - - ret = self._escapeString(status) - if not ret["OK"]: - return ret - status = ret["Value"] - - result = self._query(f"SELECT Command, Arguments FROM JobCommands WHERE JobID={jobID} AND Status={status}") + req = "SELECT Command, Arguments FROM JobCommands WHERE JobID=%s AND Status=%s" + args = (str(jobID), status) + result = self._query(req, args=args) if not result["OK"]: return result @@ -1463,35 +1295,28 @@ def getJobCommand(self, jobID, status=JobStatus.RECEIVED): ##################################################################################### def setJobCommandStatus(self, jobID, command, status): """Set the command status""" - ret = self._escapeString(jobID) - if not ret["OK"]: - return ret - jobID = ret["Value"] - - ret = self._escapeString(command) - if not ret["OK"]: - return ret - command = ret["Value"] - - ret = self._escapeString(status) - if not ret["OK"]: - return ret - status = ret["Value"] - - return self._update(f"UPDATE JobCommands SET Status={status} WHERE JobID={jobID} AND Command={command}") + req = "UPDATE JobCommands SET Status=%s WHERE JobID=%s AND Command=%s" + args = (status, str(jobID), command) + return self._update(req, args=args) ##################################################################################### def getSummarySnapshot(self, requestedFields=False): """Get the summary snapshot for a given combination""" - if not requestedFields: - requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"] - valueFields = ["COUNT(JobID)", "SUM(RescheduleCounter)"] - defString = ", ".join(requestedFields) - valueString = ", ".join(valueFields) - result = self._query(f"SELECT {defString}, {valueString} FROM Jobs GROUP BY {defString}") + fields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"] + if requestedFields: + for field in requestedFields: + if field.lower() not in [x.lower() for x in self.jobAttributeNames]: + return S_ERROR(f"Unknown summarySnapshot job field name: {field}") + fields = requestedFields + extraFields = ["COUNT(JobID)", "SUM(RescheduleCounter)"] + req = "SELECT " + req += ", ".join(fields + extraFields) + req += " FROM Jobs GROUP BY " + req += ", ".join(fields) + result = self._query(req) if not result["OK"]: return result - return S_OK(((requestedFields + valueFields), result["Value"])) + return S_OK(((fields + extraFields), result["Value"])) def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines): """Remove HeartBeatLoggingInfo from DB. @@ -1501,16 +1326,6 @@ def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines): :param int maxLines: maximum number of lines to be removed :returns: S_OK/S_ERROR """ - ret = self._escapeString(status) - if not ret["OK"]: - return ret - status = ret["Value"] - - ret = self._escapeString(delTime) - if not ret["OK"]: - return ret - delTime = ret["Value"] - self.log.verbose("Removing HeartBeatLogginInfo for", f"{status!r} {delTime!r} {maxLines!r}") cmd = """DELETE h FROM HeartBeatLoggingInfo AS h JOIN (SELECT hi.JobID FROM HeartBeatLoggingInfo AS hi @@ -1518,12 +1333,13 @@ def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines): WHERE j.Status = %(status)s AND LastUpdateTime < %(delay)s - LIMIT %(maxLines)d) h2 - ON h2.JobID = h.JobID""" % { + LIMIT %(maxLines)s) h2 + ON h2.JobID = h.JobID""" + args = { "maxLines": maxLines, "status": status, "delay": delTime, } - result = self._update(cmd) + result = self._update(cmd, args=args) self.log.verbose("Removed from HBLI", result) return result