Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def monitor(self, context=None, ftsServer=None, ucert=None):


"""
isMultiHop = False

if not self.ftsGUID:
return S_ERROR("FTSGUID not set, FTS job not submitted?")
Expand All @@ -221,6 +222,10 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
except FTS3ClientException as e:
return S_ERROR(f"Error getting the job status {e}")

job_metadata = jobStatusDict["job_metadata"]
if isinstance(job_metadata, dict):
isMultiHop = job_metadata.get("isMultiHop", False)

now = datetime.datetime.utcnow().replace(microsecond=0)
self.lastMonitor = now

Expand All @@ -230,19 +235,21 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
self.lastUpdate = now
self.error = jobStatusDict["reason"]

if newStatus in self.FINAL_STATES:
self._fillAccountingDicts(jobStatusDict)
# if newStatus in self.FINAL_STATES:
# self._fillAccountingDicts(jobStatusDict)

filesInfoList = jobStatusDict["files"]
# Make a copy, since we are potentially
# deleting objects or editing
orig_filesInfoList = list(filesInfoList) # Make a copy
filesStatus = {}
statusSummary = {}

# Make a copy, since we are potentially

# deleting objects
for fileDict in list(filesInfoList):
for fileDict in orig_filesInfoList:
file_state = fileDict["file_state"].capitalize()
file_metadata = fileDict["file_metadata"]
file_error = fileDict["reason"]
is_recoverable = fileDict["recoverable"]

# previous version of the code did not have dictionary as
# file_metadata
Expand All @@ -260,19 +267,40 @@ def monitor(self, context=None, ftsServer=None, ucert=None):
filesInfoList.remove(fileDict)
continue

file_error = fileDict["reason"]
# We know this is a multihop transfer, and we have a file_id,
# this means that this is the final hop, and that there are only
# 2 transfers in this job.
# If the FTS server has the "CancelUnusedMultihopFiles" settings enabled (default False)
# all NOT_USED files in a failed multihop job are canceled by FTS.
# In this case, we will never enter the following if statement.
# Otherwise, a multihop job that failed the first leg needs special handling:
# its final hop remains in the Not_used state, so we recover the error details
# from the first leg below.

if isMultiHop and file_state == "Not_used":
# First, make sure that we only have 2 transfers, and the first one is failed
if len(orig_filesInfoList) != 2 or orig_filesInfoList[0]["file_state"].capitalize() != "Failed":
return S_ERROR(
errno.EDEADLK,
f"Multihop job {self.ftsGUID} {self.status}has unexpected file states: {[f['file_state'] for f in orig_filesInfoList]}",
)
# Next, get the file_state, file_error and recoverable state from the first leg, as it is not propagated to the second one
# Note that this will only change the values for the local loop, but not the values passed to _fillAccountingDicts.
# That's okay though in this specific case, because we know we are in a failed status, and we will effectively only count
# one failed transfer.
file_state = orig_filesInfoList[0]["file_state"].capitalize()
file_error = orig_filesInfoList[0]["reason"]
is_recoverable = orig_filesInfoList[0].get("recoverable", True)

filesStatus[file_id] = {"status": file_state, "error": file_error}

# If the state of the file is final for FTS, set ftsGUID of the file to None,
# such that it is "released" from this job and not updated anymore in future
# monitoring calls
if file_state in FTS3File.FTS_FINAL_STATES:
filesStatus[file_id]["ftsGUID"] = None
# TODO: update status to defunct if not recoverable here ?

# If the file is failed, check if it is recoverable
if file_state in FTS3File.FTS_FAILED_STATES:
if not fileDict.get("Recoverable", True):
if not is_recoverable:
filesStatus[file_id]["status"] = "Defunct"

# If the file is not in a final state, but the job is, we return an error
Expand Down Expand Up @@ -643,6 +671,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
"sourceSE": self.sourceSE,
"targetSE": self.targetSE,
"useTokens": useTokens, # Store the information here to propagate it to submission
"isMultiHop": isMultiHop,
}

if self.activity:
Expand Down Expand Up @@ -994,6 +1023,10 @@ def _fillAccountingDicts(self, jobStatusDict):
if file_state in FTS3File.FTS_SUCCESS_STATES:
successfulFiles.append(fileDict)
else:
# Note that this will also catch the Not_used status
# in case the job is a multihop that failed on the
# first leg, but that's ok, because we just count
# the failed files, we do not use their metadata
failedFiles.append(fileDict)

job_metadata = jobStatusDict["job_metadata"]
Expand Down
Loading