Skip to content

Commit c5ca107

Browse files
antonio falabella authored and aldbr committed
fix(resources): condor command line call compatible with v24
1 parent 44ac56e commit c5ca107

File tree

2 files changed

+87
-121
lines changed

2 files changed

+87
-121
lines changed

src/DIRAC/Resources/Computing/BatchSystems/Condor.py

Lines changed: 69 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import print_function
77
from __future__ import absolute_import
88
from __future__ import division
9+
import json
910
import re
1011
import tempfile
1112
import subprocess
@@ -87,63 +88,34 @@
8788
"""
8889

8990

90-
def parseCondorStatus(lines, jobID):
91+
def getCondorStatus(jobMetadata):
9192
"""parse the condor_q or condor_history output for the job status
9293
93-
:param lines: list of lines from the output of the condor commands, each line is a tuple of jobID, statusID, and holdReasonCode
94-
:type lines: python:list
95-
:param str jobID: jobID of condor job, e.g.: 123.53
94+
:param jobMetadata: dict with job metadata
95+
:type lines: dict[str, str | int]
9696
:returns: Status as known by DIRAC, and a reason if the job is being held
9797
"""
98-
jobID = str(jobID)
99-
100-
holdReason = ""
101-
status = None
102-
for line in lines:
103-
l = line.strip().split()
104-
105-
# Make sure the job ID exists
106-
if len(l) < 1 or l[0] != jobID:
107-
continue
108-
109-
# Make sure the status is present and is an integer
110-
try:
111-
status = int(l[1])
112-
except (ValueError, IndexError):
113-
break
114-
115-
# Stop here if the status is not held (5): result should be found in STATES_MAP
116-
if status != 5:
117-
break
118-
119-
# A job can be held for various reasons,
120-
# we need to further investigate with the holdReasonCode & holdReasonSubCode
121-
# Details in:
122-
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode
123-
124-
# By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
125-
status = 3
126-
try:
127-
holdReasonCode = l[2]
128-
holdReasonSubcode = l[3]
129-
holdReason = " ".join(l[4:])
130-
except IndexError:
131-
# This should not happen in theory
132-
# Just set the status to unknown such as
133-
status = None
134-
holdReasonCode = "undefined"
135-
holdReasonSubcode = "undefined"
136-
break
137-
138-
# If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
139-
# And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
140-
if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE:
141-
status = 5
142-
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
143-
elif holdReasonCode == "16":
144-
status = 1
145-
146-
return (STATES_MAP.get(status, "Unknown"), holdReason)
98+
if jobMetadata["JobStatus"] != 5:
99+
# If the job is not held, we can return the status directly
100+
return (STATES_MAP.get(jobMetadata["JobStatus"], "Unknown"), "")
101+
102+
# A job can be held for various reasons,
103+
# we need to further investigate with the holdReasonCode & holdReasonSubCode
104+
# Details in:
105+
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode
106+
107+
# By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
108+
status = 3
109+
110+
# If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
111+
# And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
112+
if jobMetadata["HoldReasonCode"] == "3" and jobMetadata["HoldReasonSubcode"] == HOLD_REASON_SUBCODE:
113+
status = 5
114+
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
115+
elif jobMetadata["HoldReasonCode"] == "16":
116+
status = 1
117+
118+
return (STATES_MAP.get(status, "Unknown"), jobMetadata["HoldReason"])
147119

148120

149121
class Condor(object):
@@ -283,7 +255,6 @@ def killJob(self, **kwargs):
283255

284256
def getJobStatus(self, **kwargs):
285257
"""Get status of the jobs in the given list"""
286-
287258
resultDict = {}
288259

289260
MANDATORY_PARAMETERS = ["JobIDList"]
@@ -299,15 +270,12 @@ def getJobStatus(self, **kwargs):
299270
resultDict["Message"] = "Empty job list"
300271
return resultDict
301272

302-
user = kwargs.get("User")
303-
if not user:
304-
user = os.environ.get("USER")
305-
if not user:
306-
resultDict["Status"] = -1
307-
resultDict["Message"] = "No user name"
308-
return resultDict
273+
# Prepare the command to get the status of the jobs
274+
cmdJobs = " ".join(str(jobID) for jobID in jobIDList)
275+
attributes = "ClusterId,ProcId,JobStatus,HoldReasonCode,HoldReasonSubCode,HoldReason"
309276

310-
cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user
277+
# Get the status of the jobs currently active
278+
cmd = f"condor_q {cmdJobs} -attributes {attributes} -json"
311279
sp = subprocess.Popen(
312280
shlex.split(cmd),
313281
stdout=subprocess.PIPE,
@@ -317,16 +285,15 @@ def getJobStatus(self, **kwargs):
317285
output, error = sp.communicate()
318286
status = sp.returncode
319287

320-
if status != 0:
288+
if status != 0 or not output:
321289
resultDict["Status"] = status
322290
resultDict["Message"] = error
323291
return resultDict
324292

325-
qList = output.strip().split("\n")
293+
jobMetadata = json.loads(output)
326294

327-
condorHistCall = (
328-
"condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user
329-
)
295+
# Get the status of the jobs in the history
296+
condorHistCall = f"condor_history {cmdJobs} -attributes {attributes} -json"
330297
sp = subprocess.Popen(
331298
shlex.split(condorHistCall),
332299
stdout=subprocess.PIPE,
@@ -335,15 +302,18 @@ def getJobStatus(self, **kwargs):
335302
)
336303
output, _ = sp.communicate()
337304
status = sp.returncode
338-
if status == 0:
339-
for line in output.split("\n"):
340-
qList.append(line)
305+
306+
if status != 0 or not output:
307+
resultDict["Status"] = status
308+
resultDict["Message"] = error
309+
return resultDict
310+
311+
jobMetadata += json.loads(output)
341312

342313
statusDict = {}
343-
if len(qList):
344-
for job in jobIDList:
345-
job = str(job)
346-
statusDict[job], _ = parseCondorStatus(qList, job)
314+
for jobDict in jobMetadata:
315+
jobID = f"{jobDict['ClusterId']}.{jobDict['ProcId']}"
316+
statusDict[jobID], _ = getCondorStatus(jobDict)
347317

348318
# Final output
349319
status = 0
@@ -355,54 +325,45 @@ def getCEStatus(self, **kwargs):
355325
"""Get the overall status of the CE"""
356326
resultDict = {}
357327

358-
user = kwargs.get("User")
359-
if not user:
360-
user = os.environ.get("USER")
361-
if not user:
328+
cmd = "condor_q -totals -json"
329+
sp = subprocess.Popen(
330+
shlex.split(cmd),
331+
stdout=subprocess.PIPE,
332+
stderr=subprocess.PIPE,
333+
universal_newlines=True,
334+
)
335+
output, error = sp.communicate()
336+
status = sp.returncode
337+
338+
if status != 0 or not output:
362339
resultDict["Status"] = -1
363-
resultDict["Message"] = "No user name"
340+
resultDict["Message"] = error
364341
return resultDict
365342

366-
waitingJobs = 0
367-
runningJobs = 0
343+
jresult = json.loads(output)
344+
resultDict["Status"] = 0
345+
resultDict["Waiting"] = jresult[0]["Idle"]
346+
resultDict["Running"] = jresult[0]["Running"]
368347

348+
# We also need to check the hold jobs, some of them are actually waiting (e.g. for input files)
349+
cmd = 'condor_q -json -constraint "JobStatus == 5" -attributes HoldReasonCode'
369350
sp = subprocess.Popen(
370-
shlex.split("condor_q -submitter %s" % user),
351+
shlex.split(cmd),
371352
stdout=subprocess.PIPE,
372353
stderr=subprocess.PIPE,
373354
universal_newlines=True,
374355
)
375356
output, error = sp.communicate()
376357
status = sp.returncode
377358

378-
if status != 0:
379-
if "no record" in output:
380-
resultDict["Status"] = 0
381-
resultDict["Waiting"] = waitingJobs
382-
resultDict["Running"] = runningJobs
383-
return resultDict
384-
resultDict["Status"] = status
359+
if status != 0 or not output:
360+
resultDict["Status"] = -1
385361
resultDict["Message"] = error
386362
return resultDict
387363

388-
if "no record" in output:
389-
resultDict["Status"] = 0
390-
resultDict["Waiting"] = waitingJobs
391-
resultDict["Running"] = runningJobs
392-
return resultDict
393-
394-
if output:
395-
lines = output.split("\n")
396-
for line in lines:
397-
if not line.strip():
398-
continue
399-
if " I " in line:
400-
waitingJobs += 1
401-
elif " R " in line:
402-
runningJobs += 1
364+
jresult = json.loads(output)
365+
for job_metadata in jresult:
366+
if job_metadata["HoldReasonCode"] == 16:
367+
resultDict["Waiting"] += 1
403368

404-
# Final output
405-
resultDict["Status"] = 0
406-
resultDict["Waiting"] = waitingJobs
407-
resultDict["Running"] = runningJobs
408369
return resultDict

src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import datetime
5252
import errno
53+
import json
5354
import os
5455
import subprocess
5556
import tempfile
@@ -63,7 +64,7 @@
6364
from DIRAC.Core.Utilities.List import breakListIntoChunks
6465
from DIRAC.Core.Utilities.Subprocess import systemCall
6566
from DIRAC.FrameworkSystem.private.authorization.utils.Tokens import writeToTokenFile
66-
from DIRAC.Resources.Computing.BatchSystems.Condor import HOLD_REASON_SUBCODE, parseCondorStatus, subTemplate
67+
from DIRAC.Resources.Computing.BatchSystems.Condor import HOLD_REASON_SUBCODE, getCondorStatus, subTemplate
6768
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
6869
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
6970

@@ -400,45 +401,49 @@ def getJobStatus(self, jobIDList):
400401
if isinstance(jobIDList, str):
401402
jobIDList = [jobIDList]
402403

404+
self.tokenFile = None
403405
resultDict = {}
404406
condorIDs = {}
405407
# Get all condorIDs so we can just call condor_q and condor_history once
406408
for jobReference in jobIDList:
407409
jobReference = jobReference.split(":::")[0]
408-
condorIDs[jobReference] = self._jobReferenceToCondorID(jobReference)
410+
condorIDs[self._jobReferenceToCondorID(jobReference)] = jobReference
409411

410-
self.tokenFile = None
412+
attributes = "ClusterId,ProcId,JobStatus,HoldReasonCode,HoldReasonSubCode,HoldReason"
411413

412414
qList = []
413-
for _condorIDs in breakListIntoChunks(condorIDs.values(), 100):
414-
# This will return a list of 1245.75 3 undefined undefined undefined
415+
for _condorIDs in breakListIntoChunks(condorIDs.keys(), 100):
415416
cmd = ["condor_q"]
416417
cmd.extend(self.remoteScheddOptions.strip().split(" "))
417418
cmd.extend(_condorIDs)
418-
cmd.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"])
419+
cmd.extend(["-attributes", attributes])
420+
cmd.extend(["-json"])
419421
result = self._executeCondorCommand(cmd, keepTokenFile=True)
420422
if not result["OK"]:
421423
return result
422424

423-
qList.extend(result["Value"].split("\n"))
425+
qList.extend(json.loads(result["Value"]))
424426

425427
condorHistCall = ["condor_history"]
426428
condorHistCall.extend(self.remoteScheddOptions.strip().split(" "))
427429
condorHistCall.extend(_condorIDs)
428-
condorHistCall.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"])
430+
condorHistCall.extend(["-attributes", attributes])
431+
condorHistCall.extend(["-json"])
429432
result = self._executeCondorCommand(cmd, keepTokenFile=True)
430433
if not result["OK"]:
431434
return result
432435

433-
qList.extend(result["Value"].split("\n"))
436+
qList.extend(json.loads(result["Value"]))
434437

435-
for job, jobID in condorIDs.items():
436-
jobStatus, reason = parseCondorStatus(qList, jobID)
438+
for jobMetadata in qList:
439+
jobStatus, reason = getCondorStatus(jobMetadata)
440+
condorId = f"{jobMetadata['ClusterId']}.{jobMetadata['ProcId']}"
441+
jobReference = condorIDs.get(condorId)
437442

438443
if jobStatus == PilotStatus.ABORTED:
439-
self.log.verbose("Job", f"{jobID} held: {reason}")
444+
self.log.verbose("Job", f"{jobReference} held: {reason}")
440445

441-
resultDict[job] = jobStatus
446+
resultDict[jobReference] = jobStatus
442447

443448
self.tokenFile = None
444449

0 commit comments

Comments
 (0)