From d2b508e390bc4786ca219819b5d935bdaca37a24 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Tue, 25 Nov 2025 18:13:18 +0100 Subject: [PATCH] feat (FTS3): support failure for the first leg of a multihop --- .../DataManagementSystem/Client/FTS3Job.py | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py index e7df97986cb..725ec96e0f1 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py @@ -202,6 +202,7 @@ def monitor(self, context=None, ftsServer=None, ucert=None): """ + isMultiHop = False if not self.ftsGUID: return S_ERROR("FTSGUID not set, FTS job not submitted?") @@ -221,6 +222,10 @@ def monitor(self, context=None, ftsServer=None, ucert=None): except FTS3ClientException as e: return S_ERROR(f"Error getting the job status {e}") + job_metadata = jobStatusDict["job_metadata"] + if isinstance(job_metadata, dict): + isMultiHop = job_metadata.get("isMultiHop", False) + now = datetime.datetime.utcnow().replace(microsecond=0) self.lastMonitor = now @@ -230,19 +235,21 @@ def monitor(self, context=None, ftsServer=None, ucert=None): self.lastUpdate = now self.error = jobStatusDict["reason"] - if newStatus in self.FINAL_STATES: - self._fillAccountingDicts(jobStatusDict) + # if newStatus in self.FINAL_STATES: + # self._fillAccountingDicts(jobStatusDict) filesInfoList = jobStatusDict["files"] + # Make a copy, since we are potentially + # deleting objects or editing + orig_filesInfoList = list(filesInfoList) # Make a copy filesStatus = {} statusSummary = {} - # Make a copy, since we are potentially - - # deleting objects - for fileDict in list(filesInfoList): + for fileDict in orig_filesInfoList: file_state = fileDict["file_state"].capitalize() file_metadata = fileDict["file_metadata"] + file_error = fileDict["reason"] + is_recoverable = fileDict["recoverable"] # previous version of the code did not have dictionary as # file_metadata @@ -260,7 +267,29 @@ def monitor(self, context=None, ftsServer=None, ucert=None): filesInfoList.remove(fileDict) continue - file_error = fileDict["reason"] + # We know this is a multihop transfer, and we have a file_id, + # this means that this is the final hop, and that there are only + # 2 transfers in this job. + # If the FTS server has the "CancelUnusedMultihopFiles" settings enabled (default False) + # all NOT_USED files in a failed multihop job are canceled by FTS. + # In this case, we will never enter the following if statement. + # Otherwise, a multihop job that failed the first leg needs. + + if isMultiHop and file_state == "Not_used": + # First, make sure that we only have 2 transfers, and the first one is failed + if len(orig_filesInfoList) != 2 or orig_filesInfoList[0]["file_state"].capitalize() != "Failed": + return S_ERROR( + errno.EDEADLK, + f"Multihop job {self.ftsGUID} {self.status}has unexpected file states: {[f['file_state'] for f in orig_filesInfoList]}", + ) + # Next, get the file_state, file_error and recoverable state from the first leg, as it is not propagated to the second one + # Note that this will only change the values for the local loop, but not the values passed to _fillAccountingDicts. + # That's okay though in this specific case, because we know we are in a failed status, and we will effectively only count + # one failed transfer. + file_state = orig_filesInfoList[0]["file_state"].capitalize() + file_error = orig_filesInfoList[0]["reason"] + is_recoverable = orig_filesInfoList[0].get("recoverable", True) + filesStatus[file_id] = {"status": file_state, "error": file_error} # If the state of the file is final for FTS, set ftsGUID of the file to None, @@ -268,11 +297,10 @@ def monitor(self, context=None, ftsServer=None, ucert=None): # monitoring calls if file_state in FTS3File.FTS_FINAL_STATES: filesStatus[file_id]["ftsGUID"] = None - # TODO: update status to defunct if not recoverable here ? # If the file is failed, check if it is recoverable if file_state in FTS3File.FTS_FAILED_STATES: - if not fileDict.get("Recoverable", True): + if not is_recoverable: filesStatus[file_id]["status"] = "Defunct" # If the file is not in a final state, but the job is, we return an error @@ -643,6 +671,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N "sourceSE": self.sourceSE, "targetSE": self.targetSE, "useTokens": useTokens, # Store the information here to propagate it to submission + "isMultiHop": isMultiHop, } if self.activity: @@ -994,6 +1023,10 @@ def _fillAccountingDicts(self, jobStatusDict): if file_state in FTS3File.FTS_SUCCESS_STATES: successfulFiles.append(fileDict) else: + # Not that this will also catch the Not_used status + # in case the job is a multihop that failed on the + # first leg, but that's ok, because we just count + # the failed failes, we do not use their metadata failedFiles.append(fileDict) job_metadata = jobStatusDict["job_metadata"]