diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 64d1411f..082ebdcd 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -51,6 +51,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${{ matrix.dirac_branch }}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -107,6 +108,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${version}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -158,6 +160,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${version}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -205,6 +208,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/integration/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -260,6 +264,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${version}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -311,6 +316,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${version}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -373,6 +379,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${version}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -424,6 +431,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${version}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" @@ -474,6 +482,7 @@ jobs: sed -i "s/VAR_JENKINS_QUEUE/jenkins-queue_not_important/g" pilot.json sed -i "s/VAR_DIRAC_VERSION/${version}/g" pilot.json sed -i "s#VAR_CS#https://lbcertifdirac70.cern.ch:9135/Configuration/Server#g" pilot.json + sed -i "s#VAR_PREF_URL_PATTERN#.*\.cern\.ch/.*#g" sed -i "s#VAR_USERDN#/DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=alboyer/CN=819281/CN=Alexandre Franck Boyer#g" pilot.json sed -i "s#VAR_USERDN_GRIDPP#${DIRACUSERDN_GRIDPP}#g" pilot.json g_job="testintegrationworkflow${GITHUB_JOB//-/}" diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 8afe0f62..578463c5 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -2,10 +2,12 @@ from __future__ import absolute_import, division, print_function +import copy import fcntl import getopt import json import os +import random import re import select import signal @@ -85,7 +87,11 @@ def load_module_from_path(module_name, path_to_module): IsADirectoryError = IOError # Timer 2.7 and < 3.3 versions issue where Timer is a function -if sys.version_info.major == 2 or sys.version_info.major == 3 and sys.version_info.minor < 3: +if ( + sys.version_info.major == 2 + or sys.version_info.major == 3 + and sys.version_info.minor < 3 +): from threading import _Timer as Timer # pylint: disable=no-name-in-module else: from threading import Timer @@ -98,7 +104,9 @@ def parseVersion(releaseVersion): :param str releaseVersion: The software version to use """ - VERSION_PATTERN = re.compile(r"^(?:v)?(\d+)[r\.](\d+)(?:[p\.](\d+))?(?:(?:-pre|a)?(\d+))?$") + VERSION_PATTERN = re.compile( + r"^(?:v)?(\d+)[r\.](\d+)(?:[p\.](\d+))?(?:(?:-pre|a)?(\d+))?$" + ) match = VERSION_PATTERN.match(releaseVersion) # If the regex fails just return the original version @@ -188,11 +196,17 @@ def retrieveUrlTimeout(url, fileName, log, timeout=0): signal.alarm(0) return False except URLError: - log.error('Timeout after %s seconds on transfer request for "%s"' % (str(timeout), url)) + log.error( + 'Timeout after %s seconds on transfer request for "%s"' + % (str(timeout), url) + ) return False except Exception as x: if x == "Timeout": - log.error('Timeout after %s seconds on transfer request for "%s"' % (str(timeout), url)) + log.error( + 'Timeout after %s seconds on transfer request for "%s"' + % (str(timeout), url) + ) if timeout: signal.alarm(0) raise x @@ -274,7 +288,9 @@ def getSubmitterInfo(ceName): if "SGE_TASK_ID" in os.environ: batchSystemType = "SGE" batchSystemJobID = os.environ["JOB_ID"] - batchSystemParameters["BinaryPath"] = os.environ.get("SGE_BINARY_PATH", "Unknown") + batchSystemParameters["BinaryPath"] = os.environ.get( + "SGE_BINARY_PATH", "Unknown" + ) batchSystemParameters["Queue"] = os.environ.get("QUEUE", "Unknown") flavour = "SSH%s" % batchSystemType @@ -307,7 +323,12 @@ def getSubmitterInfo(ceName): batchSystemParameters["InfoPath"] = os.environ["_CONDOR_JOB_AD"] flavour = "SSH%s" % batchSystemType - pilotReference = "sshcondor://" + ceName + "/" + os.environ.get("CONDOR_JOBID", pilotReference) + pilotReference = ( + "sshcondor://" + + ceName + + "/" + + os.environ.get("CONDOR_JOBID", pilotReference) + ) # # Local/SSH @@ -325,7 +346,12 @@ def getSubmitterInfo(ceName): if "SSHBATCH_JOBID" in os.environ and "SSH_NODE_HOST" in os.environ: flavour = "SSHBATCH" pilotReference = ( - "sshbatchhost://" + ceName + "/" + os.environ["SSH_NODE_HOST"] + "/" + os.environ["SSHBATCH_JOBID"] + "sshbatchhost://" + + ceName + + "/" + + os.environ["SSH_NODE_HOST"] + + "/" + + os.environ["SSHBATCH_JOBID"] ) # # CEs @@ -348,17 +374,50 @@ def getSubmitterInfo(ceName): return ( flavour, pilotReference, - {"Type": batchSystemType, "JobID": batchSystemJobID, "Parameters": batchSystemParameters}, + { + "Type": batchSystemType, + "JobID": batchSystemJobID, + "Parameters": batchSystemParameters, + }, ) +def _findURLPriority(preferredURLPatterns, url): + """Find which preferred URL pattern the URL matches. + + :param str preferredURLPatterns: patterns to check in ranked order + :param str url: URL to check + + :return: int -- index of the pattern that matched, smallest is the most preferred + """ + for i, pattern in enumerate(preferredURLPatterns): + if re.match(pattern, url): + return i + return len(preferredURLPatterns) + + +def orderCSByPatterns(configServerList, preferredURLPatterns): + """enters a list of CS URLs, and a list of strings with preferredURLPatterns""" + csList = [] + preferredURLPatterns = [re.compile(pattern) for pattern in preferredURLPatterns] + urlGroups = [set() for _ in range(len(preferredURLPatterns) + 1)] + for cs in copy.deepcopy(configServerList): + urlGroups[_findURLPriority(preferredURLPatterns, cs)].add(cs) + for urlGroup in urlGroups: + random.shuffle(list(urlGroup)) + csList.extend(urlGroup) + return csList + + def getFlavour(ceName): """Old method to get the flavour of the pilot. Deprecated. Please use getSubmitterInfo instead. """ warnings.warn( - "getFlavour() is deprecated. Please use getSubmitterInfo() instead.", category=DeprecationWarning, stacklevel=2 + "getFlavour() is deprecated. Please use getSubmitterInfo() instead.", + category=DeprecationWarning, + stacklevel=2, ) flavour, pilotReference, _ = getSubmitterInfo(ceName) return flavour, pilotReference @@ -387,7 +446,9 @@ def loadModule(self, modName, hideExceptions=False): if rootModule: impName = "%s.%s" % (rootModule, impName) self.log.debug("Trying to load %s" % impName) - module, parentPath = self.__recurseImport(impName, hideExceptions=hideExceptions) + module, parentPath = self.__recurseImport( + impName, hideExceptions=hideExceptions + ) # Error. Something cannot be imported. Return error if module is None: return None, None @@ -409,13 +470,18 @@ def __recurseImport(self, modName, parentModule=None, hideExceptions=False): except ImportError as excp: if str(excp).find("No module named %s" % modName[0]) == 0: return None, None - errMsg = "Can't load %s in %s" % (".".join(modName), parentModule.__path__[0]) + errMsg = "Can't load %s in %s" % ( + ".".join(modName), + parentModule.__path__[0], + ) if not hideExceptions: self.log.exception(errMsg) return None, None if len(modName) == 1: return impModule, parentModule.__path__[0] - return self.__recurseImport(modName[1:], impModule, hideExceptions=hideExceptions) + return self.__recurseImport( + modName[1:], impModule, hideExceptions=hideExceptions + ) def loadObject(self, package, moduleName, command): """Load an object from inside a module""" @@ -483,7 +549,9 @@ def __outputMessage(self, msg, level, header): with open(self.out, "a") as outputFile: for _line in str(msg).split("\n"): if header: - outLine = self.messageTemplate.format(level=level, message=_line) + outLine = self.messageTemplate.format( + level=level, message=_line + ) print(outLine) if self.out: outputFile.write(outLine + "\n") @@ -539,7 +607,9 @@ def __init__( self.wnVO = wnVO self.isPilotLoggerOn = isPilotLoggerOn sendToURL = partial(sendMessage, url, pilotUUID, wnVO, "sendMessage") - self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval) + self.buffer = FixedSizeBuffer( + sendToURL, bufsize=bufsize, autoflush=flushInterval + ) def debug(self, msg, header=True, _sendPilotLog=False): # TODO: Send pilot log remotely? @@ -710,7 +780,9 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage): context.load_cert_chain(cert) # this is a proxy raw_data = {"method": method, "args": message} except IsADirectoryError: # assuming it'a dir containing cert and key - context.load_cert_chain(os.path.join(cert, "hostcert.pem"), os.path.join(cert, "hostkey.pem")) + context.load_cert_chain( + os.path.join(cert, "hostcert.pem"), os.path.join(cert, "hostkey.pem") + ) raw_data = {"method": method, "args": message, "extraCredentials": '"hosts"'} if sys.version_info.major == 3: @@ -770,7 +842,12 @@ def executeAndGetOutput(self, cmd, environDict=None): self.log.info("Executing command %s" % cmd) _p = subprocess.Popen( - cmd, shell=True, env=environDict, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False + cmd, + shell=True, + env=environDict, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=False, ) # Use non-blocking I/O on the process pipes @@ -793,11 +870,13 @@ def executeAndGetOutput(self, cmd, environDict=None): outChunk = outChunk.decode("utf-8") # Strip unicode replacement characters # Ensure correct type conversion in Python 2 - outChunk = str(outChunk.replace(u"\ufffd", "")) + outChunk = str(outChunk.replace("\ufffd", "")) # Avoid potential str() issues in Py2 outChunk = unicode(outChunk) # pylint: disable=undefined-variable else: - outChunk = str(outChunk.replace("\ufffd", "")) # Python 3: Ensure it's a string + outChunk = str( + outChunk.replace("\ufffd", "") + ) # Python 3: Ensure it's a string if stream == _p.stderr: sys.stderr.write(outChunk) @@ -852,7 +931,12 @@ def forkAndExecute(self, cmd, logFile, environDict=None): with open(logFile, "a+", 0) as fpLogFile: try: _p = subprocess.Popen( - "%s" % cmd, shell=True, env=environDict, close_fds=False, stdout=fpLogFile, stderr=fpLogFile + "%s" % cmd, + shell=True, + env=environDict, + close_fds=False, + stdout=fpLogFile, + stderr=fpLogFile, ) # return code @@ -963,7 +1047,9 @@ def __init__(self): # Set number of allocatable processors from MJF if available try: - self.pilotProcessors = int(urlopen(os.path.join(os.environ["JOBFEATURES"], "allocated_cpu")).read()) + self.pilotProcessors = int( + urlopen(os.path.join(os.environ["JOBFEATURES"], "allocated_cpu")).read() + ) except Exception: self.pilotProcessors = 1 @@ -980,7 +1066,11 @@ def __init__(self): ("l:", "project=", "Project to install"), ("n:", "name=", "Set as Site Name"), ("o:", "option=", "Option=value to add"), - ("m:", "maxNumberOfProcessors=", "specify a max number of processors to use by the payload inside a pilot"), + ( + "m:", + "maxNumberOfProcessors=", + "specify a max number of processors to use by the payload inside a pilot", + ), ("", "modules=", "for installing non-released code"), ( "", @@ -1003,7 +1093,11 @@ def __init__(self): ("K:", "certLocation=", "Specify server certificate location"), ("M:", "MaxCycles=", "Maximum Number of JobAgent cycles to run"), ("", "PollingTime=", "JobAgent execution frequency"), - ("", "StopOnApplicationFailure=", "Stop Job Agent when encounter an application failure"), + ( + "", + "StopOnApplicationFailure=", + "Stop Job Agent when encounter an application failure", + ), ("", "StopAfterFailedMatches=", "Stop Job Agent after N failed matches"), ("N:", "Name=", "CE Name"), ("O:", "OwnerDN=", "Pilot OwnerDN (for private pilots)"), @@ -1013,12 +1107,20 @@ def __init__(self): ("R:", "reference=", "Use this pilot reference"), ("S:", "setup=", "DIRAC Setup to use"), ("T:", "CPUTime=", "Requested CPU Time"), - ("W:", "gateway=", "Configure as DIRAC Gateway during installation"), + ( + "W:", + "gateway=", + "Configure as DIRAC Gateway during installation", + ), ("X:", "commands=", "Pilot commands to execute"), ("Z:", "commandOptions=", "Options parsed by command modules"), ("", "pilotUUID=", "pilot UUID"), ("", "preinstalledEnv=", "preinstalled pilot environment script location"), - ("", "preinstalledEnvPrefix=", "preinstalled pilot environment area prefix"), + ( + "", + "preinstalledEnvPrefix=", + "preinstalled pilot environment area prefix", + ), ("", "architectureScript=", "architecture script to use"), ("", "CVMFS_locations=", "comma-separated list of CVMS locations"), ) @@ -1094,7 +1196,8 @@ def __checkSecurityDir(self, envName, dirName): # If so, just return if envName in os.environ and safe_listdir(os.environ[envName]): self.log.debug( - "%s is set in the host environment as %s, aligning installEnv to it" % (envName, os.environ[envName]) + "%s is set in the host environment as %s, aligning installEnv to it" + % (envName, os.environ[envName]) ) else: # None of the candidates exists, stop the program. @@ -1105,7 +1208,9 @@ def __initCommandLine1(self): """Parses and interpret options on the command line: first pass (essential things)""" self.optList, __args__ = getopt.getopt( - sys.argv[1:], "".join([opt[0] for opt in self.cmdOpts]), [opt[1] for opt in self.cmdOpts] + sys.argv[1:], + "".join([opt[0] for opt in self.cmdOpts]), + [opt[1] for opt in self.cmdOpts], ) self.log.debug("Options list: %s" % self.optList) for o, v in self.optList: @@ -1131,7 +1236,9 @@ def __initCommandLine2(self): """ self.optList, __args__ = getopt.getopt( - sys.argv[1:], "".join([opt[0] for opt in self.cmdOpts]), [opt[1] for opt in self.cmdOpts] + sys.argv[1:], + "".join([opt[0] for opt in self.cmdOpts]), + [opt[1] for opt in self.cmdOpts], ) for o, v in self.optList: if o == "-E" or o == "--commandExtensions": @@ -1140,7 +1247,9 @@ def __initCommandLine2(self): self.commands = v.split(",") elif o == "-Z" or o == "--commandOptions": for i in v.split(","): - self.commandOptions[i.split("=", 1)[0].strip()] = i.split("=", 1)[1].strip() + self.commandOptions[i.split("=", 1)[0].strip()] = i.split("=", 1)[ + 1 + ].strip() elif o == "-e" or o == "--extraPackages": self.extensions = v.split(",") elif o == "-n" or o == "--name": @@ -1259,27 +1368,40 @@ def __initJSON2(self): self.pilotLogging = pilotLogging.upper() == "TRUE" self.loggerURL = pilotOptions.get("RemoteLoggerURL") # logger buffer flush interval in seconds. - self.loggerTimerInterval = int(pilotOptions.get("RemoteLoggerTimerInterval", self.loggerTimerInterval)) + self.loggerTimerInterval = int( + pilotOptions.get("RemoteLoggerTimerInterval", self.loggerTimerInterval) + ) # logger buffer size in lines: - self.loggerBufsize = max(1, int(pilotOptions.get("RemoteLoggerBufsize", self.loggerBufsize))) + self.loggerBufsize = max( + 1, int(pilotOptions.get("RemoteLoggerBufsize", self.loggerBufsize)) + ) # logger CE white list loggerCEsWhiteList = pilotOptions.get("RemoteLoggerCEsWhiteList") # restrict remote logging to whitelisted CEs ([] or None => no restriction) self.log.debug("JSON: Remote logging CE white list: %s" % loggerCEsWhiteList) if loggerCEsWhiteList is not None: if not isinstance(loggerCEsWhiteList, list): - loggerCEsWhiteList = [elem.strip() for elem in loggerCEsWhiteList.split(",")] + loggerCEsWhiteList = [ + elem.strip() for elem in loggerCEsWhiteList.split(",") + ] if self.ceName not in loggerCEsWhiteList: self.pilotLogging = False - self.log.debug("JSON: Remote logging disabled for this CE: %s" % self.ceName) + self.log.debug( + "JSON: Remote logging disabled for this CE: %s" % self.ceName + ) pilotLogLevel = pilotOptions.get("PilotLogLevel", "INFO") if pilotLogLevel.lower() == "debug": self.debugFlag = True self.log.debug("JSON: Remote logging: %s" % self.pilotLogging) self.log.debug("JSON: Remote logging URL: %s" % self.loggerURL) - self.log.debug("JSON: Remote logging buffer flush interval in sec.(0: disabled): %s" % self.loggerTimerInterval) + self.log.debug( + "JSON: Remote logging buffer flush interval in sec.(0: disabled): %s" + % self.loggerTimerInterval + ) self.log.debug("JSON: Remote/local logging debug flag: %s" % self.debugFlag) - self.log.debug("JSON: Remote logging buffer size (lines): %s" % self.loggerBufsize) + self.log.debug( + "JSON: Remote logging buffer size (lines): %s" % self.loggerBufsize + ) # CE type if present, then Defaults, otherwise as defined in the code: if "Commands" in pilotOptions: @@ -1291,7 +1413,9 @@ def __initJSON2(self): else: # TODO: This is a workaround until the pilot JSON syncroniser is fixed self.commands = [elem.strip() for elem in commands.split(",")] - self.log.debug("Selecting commands from JSON for Grid CE type %s" % key) + self.log.debug( + "Selecting commands from JSON for Grid CE type %s" % key + ) break else: key = "CodeDefaults" @@ -1301,10 +1425,20 @@ def __initJSON2(self): # Command extensions for the commands above: commandExtOptions = pilotOptions.get("CommandExtensions") if commandExtOptions: - self.commandExtensions = [elem.strip() for elem in commandExtOptions.split(",")] + self.commandExtensions = [ + elem.strip() for elem in commandExtOptions.split(",") + ] # Configuration server (the synchroniser looks into gConfig.getServersList(), as before # the generic one (a list): - self.configServer = ",".join([str(pv).strip() for pv in self.pilotJSON["ConfigurationServers"]]) + self.configServer = ",".join( + [str(pv).strip() for pv in self.pilotJSON["ConfigurationServers"]] + ) + + preferredURLPatterns = self.pilotJSON.get("PreferredURLPatterns") + if preferredURLPatterns: + self.configServer = orderCSByPatterns( + self.configServer, preferredURLPatterns + ) # version(a comma separated values in a string). We take the first one. (the default value defined in the code) dVersion = pilotOptions.get("Version", self.releaseVersion) @@ -1314,13 +1448,19 @@ def __initJSON2(self): else: self.log.warn("Could not find a version in the JSON file configuration") - self.log.debug("Version: %s -> (release) %s" % (str(dVersion), self.releaseVersion)) + self.log.debug( + "Version: %s -> (release) %s" % (str(dVersion), self.releaseVersion) + ) - self.releaseProject = pilotOptions.get("Project", self.releaseProject) # default from the code. + self.releaseProject = pilotOptions.get( + "Project", self.releaseProject + ) # default from the code. self.log.debug("Release project: %s" % self.releaseProject) if "CVMFS_locations" in pilotOptions: - self.CVMFS_locations = pilotOptions["CVMFS_locations"].replace(" ", "").split(",") + self.CVMFS_locations = ( + pilotOptions["CVMFS_locations"].replace(" ", "").split(",") + ) self.log.debug("CVMFS locations: %s" % self.CVMFS_locations) def getPilotOptionsDict(self): @@ -1358,7 +1498,10 @@ def __getVO(self): with open(cert, "rb") as fp: return getVO(fp.read()) except IOError as err: - self.log.error("Could not read a proxy, setting vo to 'unknown': %s" % os.strerror(err.errno)) + self.log.error( + "Could not read a proxy, setting vo to 'unknown': %s" + % os.strerror(err.errno) + ) else: self.log.error("Could not locate a proxy via X509_USER_PROXY") @@ -1455,46 +1598,81 @@ def __initJSON(self): # Commands first # FIXME: pilotSynchronizer() should publish these as comma-separated lists. We are ready for that. try: - if isinstance(self.pilotJSON["Setups"][self.setup]["Commands"][self.gridCEType], basestring): + if isinstance( + self.pilotJSON["Setups"][self.setup]["Commands"][self.gridCEType], + basestring, + ): self.commands = [ str(pv).strip() - for pv in self.pilotJSON["Setups"][self.setup]["Commands"][self.gridCEType].split(",") + for pv in self.pilotJSON["Setups"][self.setup]["Commands"][ + self.gridCEType + ].split(",") ] else: self.commands = [ - str(pv).strip() for pv in self.pilotJSON["Setups"][self.setup]["Commands"][self.gridCEType] + str(pv).strip() + for pv in self.pilotJSON["Setups"][self.setup]["Commands"][ + self.gridCEType + ] ] except KeyError: try: - if isinstance(self.pilotJSON["Setups"][self.setup]["Commands"]["Defaults"], basestring): + if isinstance( + self.pilotJSON["Setups"][self.setup]["Commands"]["Defaults"], + basestring, + ): self.commands = [ str(pv).strip() - for pv in self.pilotJSON["Setups"][self.setup]["Commands"]["Defaults"].split(",") + for pv in self.pilotJSON["Setups"][self.setup]["Commands"][ + "Defaults" + ].split(",") ] else: self.commands = [ - str(pv).strip() for pv in self.pilotJSON["Setups"][self.setup]["Commands"]["Defaults"] + str(pv).strip() + for pv in self.pilotJSON["Setups"][self.setup]["Commands"][ + "Defaults" + ] ] except KeyError: try: - if isinstance(self.pilotJSON["Setups"]["Defaults"]["Commands"][self.gridCEType], basestring): + if isinstance( + self.pilotJSON["Setups"]["Defaults"]["Commands"][ + self.gridCEType + ], + basestring, + ): self.commands = [ str(pv).strip() - for pv in self.pilotJSON["Setups"]["Defaults"]["Commands"][self.gridCEType].split(",") + for pv in self.pilotJSON["Setups"]["Defaults"]["Commands"][ + self.gridCEType + ].split(",") ] else: self.commands = [ - str(pv).strip() for pv in self.pilotJSON["Setups"]["Defaults"]["Commands"][self.gridCEType] + str(pv).strip() + for pv in self.pilotJSON["Setups"]["Defaults"]["Commands"][ + self.gridCEType + ] ] except KeyError: try: - if isinstance(self.pilotJSON["Defaults"]["Commands"]["Defaults"], basestring): + if isinstance( + self.pilotJSON["Defaults"]["Commands"]["Defaults"], + basestring, + ): self.commands = [ - str(pv).strip() for pv in self.pilotJSON["Defaults"]["Commands"]["Defaults"].split(",") + str(pv).strip() + for pv in self.pilotJSON["Defaults"]["Commands"][ + "Defaults" + ].split(",") ] else: self.commands = [ - str(pv).strip() for pv in self.pilotJSON["Defaults"]["Commands"]["Defaults"] + str(pv).strip() + for pv in self.pilotJSON["Defaults"]["Commands"][ + "Defaults" + ] ] except KeyError: pass @@ -1507,27 +1685,38 @@ def __initJSON(self): self.pilotJSON["Setups"][self.setup]["CommandExtensions"], basestring ): # In the specific setup? self.commandExtensions = [ - str(pv).strip() for pv in self.pilotJSON["Setups"][self.setup]["CommandExtensions"].split(",") + str(pv).strip() + for pv in self.pilotJSON["Setups"][self.setup][ + "CommandExtensions" + ].split(",") ] else: self.commandExtensions = [ - str(pv).strip() for pv in self.pilotJSON["Setups"][self.setup]["CommandExtensions"] + str(pv).strip() + for pv in self.pilotJSON["Setups"][self.setup]["CommandExtensions"] ] except KeyError: try: if isinstance( - self.pilotJSON["Setups"]["Defaults"]["CommandExtensions"], basestring + self.pilotJSON["Setups"]["Defaults"]["CommandExtensions"], + basestring, ): # Or in the defaults section? self.commandExtensions = [ - str(pv).strip() for pv in self.pilotJSON["Setups"]["Defaults"]["CommandExtensions"].split(",") + str(pv).strip() + for pv in self.pilotJSON["Setups"]["Defaults"][ + "CommandExtensions" + ].split(",") ] else: self.commandExtensions = [ - str(pv).strip() for pv in self.pilotJSON["Setups"]["Defaults"]["CommandExtensions"] + str(pv).strip() + for pv in self.pilotJSON["Setups"]["Defaults"][ + "CommandExtensions" + ] ] except KeyError: pass - self.log.debug("Commands extesions: %s" % self.commandExtensions) + self.log.debug("Commands extensions: %s" % self.commandExtensions) # CS URL(s) # pilotSynchronizer() can publish this as a comma separated list. We are ready for that @@ -1536,10 +1725,15 @@ def __initJSON(self): self.pilotJSON["ConfigurationServers"], basestring ): # Generic, there may also be setup-specific ones self.configServer = ",".join( - [str(pv).strip() for pv in self.pilotJSON["ConfigurationServers"].split(",")] + [ + str(pv).strip() + for pv in self.pilotJSON["ConfigurationServers"].split(",") + ] ) else: # it's a list, we suppose - self.configServer = ",".join([str(pv).strip() for pv in self.pilotJSON["ConfigurationServers"]]) + self.configServer = ",".join( + [str(pv).strip() for pv in self.pilotJSON["ConfigurationServers"]] + ) except KeyError: pass try: # now trying to see if there is setup-specific ones @@ -1547,39 +1741,71 @@ def __initJSON(self): self.pilotJSON["Setups"][self.setup]["ConfigurationServer"], basestring ): # In the specific setup? self.configServer = ",".join( - [str(pv).strip() for pv in self.pilotJSON["Setups"][self.setup]["ConfigurationServer"].split(",")] + [ + str(pv).strip() + for pv in self.pilotJSON["Setups"][self.setup][ + "ConfigurationServer" + ].split(",") + ] ) else: # it's a list, we suppose self.configServer = ",".join( - [str(pv).strip() for pv in self.pilotJSON["Setups"][self.setup]["ConfigurationServer"]] + [ + str(pv).strip() + for pv in self.pilotJSON["Setups"][self.setup][ + "ConfigurationServer" + ] + ] ) except KeyError: # and if it doesn't exist try: if isinstance( - self.pilotJSON["Setups"]["Defaults"]["ConfigurationServer"], basestring + self.pilotJSON["Setups"]["Defaults"]["ConfigurationServer"], + basestring, ): # Is there one in the defaults section? self.configServer = ",".join( [ str(pv).strip() - for pv in self.pilotJSON["Setups"]["Defaults"]["ConfigurationServer"].split(",") + for pv in self.pilotJSON["Setups"]["Defaults"][ + "ConfigurationServer" + ].split(",") ] ) else: # it's a list, we suppose self.configServer = ",".join( - [str(pv).strip() for pv in self.pilotJSON["Setups"]["Defaults"]["ConfigurationServer"]] + [ + str(pv).strip() + for pv in self.pilotJSON["Setups"]["Defaults"][ + "ConfigurationServer" + ] + ] ) except KeyError: pass + + preferredURLPatterns = self.pilotJSON.get("preferredURLPatterns") + if preferredURLPatterns: + self.configServer = orderCSByPatterns( + self.configServer, preferredURLPatterns + ) self.log.debug("CS list: %s" % self.configServer) # Version # There may be a list of versions specified (in a string, comma separated). We just want the first one. dVersion = None try: - dVersion = [dv.strip() for dv in self.pilotJSON["Setups"][self.setup]["Version"].split(",", 1)] + dVersion = [ + dv.strip() + for dv in self.pilotJSON["Setups"][self.setup]["Version"].split(",", 1) + ] except KeyError: try: - dVersion = [dv.strip() for dv in self.pilotJSON["Setups"]["Defaults"]["Version"].split(",", 1)] + dVersion = [ + dv.strip() + for dv in self.pilotJSON["Setups"]["Defaults"]["Version"].split( + ",", 1 + ) + ] except KeyError: self.log.warn("Could not find a version in the JSON file configuration") if dVersion is not None: @@ -1590,7 +1816,9 @@ def __initJSON(self): self.releaseProject = str(self.pilotJSON["Setups"][self.setup]["Project"]) except KeyError: try: - self.releaseProject = str(self.pilotJSON["Setups"]["Defaults"]["Project"]) + self.releaseProject = str( + self.pilotJSON["Setups"]["Defaults"]["Project"] + ) except KeyError: pass self.log.debug("Release project: %s" % self.releaseProject) @@ -1610,7 +1838,9 @@ def __ceType(self): try: if not self.gridCEType: # We don't override a grid CEType given on the command line! - self.gridCEType = str(self.pilotJSON["CEs"][self.ceName]["GridCEType"]) + self.gridCEType = str( + self.pilotJSON["CEs"][self.ceName]["GridCEType"] + ) except KeyError: pass # This LocalCEType is like 'InProcess' or 'Pool' or 'Pool/Singularity' etc. @@ -1620,7 +1850,9 @@ def __ceType(self): except KeyError: pass try: - self.ceType = str(self.pilotJSON["CEs"][self.ceName][self.queueName]["LocalCEType"]) + self.ceType = str( + self.pilotJSON["CEs"][self.ceName][self.queueName]["LocalCEType"] + ) except KeyError: pass diff --git a/Pilot/tests/Test_Pilot.py b/Pilot/tests/Test_Pilot.py index 8a1b75a1..eb42020b 100644 --- a/Pilot/tests/Test_Pilot.py +++ b/Pilot/tests/Test_Pilot.py @@ -13,7 +13,7 @@ import unittest from Pilot.pilotCommands import CheckWorkerNode, ConfigureSite, NagiosProbes -from Pilot.pilotTools import PilotParams +from Pilot.pilotTools import PilotParams, orderCSByPatterns class PilotTestCase(unittest.TestCase): @@ -33,7 +33,12 @@ def setUp(self): "Version": "v1r1, v2r2", } }, - "CEs": {"grid1.example.com": {"GridCEType": "cetype1", "Site": "site.example.com"}}, + "CEs": { + "grid1.example.com": { + "GridCEType": "cetype1", + "Site": "site.example.com", + } + }, "DefaultSetup": "TestSetup", }, fp, @@ -69,7 +74,14 @@ class CommandsTestCase(PilotTestCase): def test_InitJSON(self): """Test the pilot.json and command line parsing""" - sys.argv[1:] = ["--Name", "grid1.example.com", "--commandOptions", "a=1,b=2", "-Z", "c=3"] + sys.argv[1:] = [ + "--Name", + "grid1.example.com", + "--commandOptions", + "a=1,b=2", + "-Z", + "c=3", + ] pp = PilotParams() self.assertEqual(pp.commands, ["x", "y", "z"]) @@ -93,7 +105,13 @@ def test_InitJSON(self): self.assertEqual(pp.commandOptions["b"], "2") self.assertEqual(pp.commandOptions["c"], "3") - sys.argv[1:] = ["--Name", "grid1.example.com", "--commandOptions=a = 1, b=2", "-Z", " c=3"] # spaces and '='' + sys.argv[1:] = [ + "--Name", + "grid1.example.com", + "--commandOptions=a = 1, b=2", + "-Z", + " c=3", + ] # spaces and '='' pp = PilotParams() self.assertEqual(pp.commandOptions["a"], "1") @@ -133,6 +151,57 @@ def test_NagiosProbes(self): self.assertEqual(nagios.nagiosProbes, ["Nagios1", "Nagios2"]) self.assertEqual(nagios.nagiosPutURL, "https://127.0.0.2/") + def test_orderCSByPatterns(self): + cs_list = orderCSByPatterns(["dips://some.site:9132"], ["dips://.*\.site:."]) + assert cs_list == ["dips://some.site:9132"] + + cs_list = orderCSByPatterns( + ["dips://some.anotherSite:9132", "dips://some.site:9132"], + ["dips://.*\.site:."], + ) + assert cs_list == ["dips://some.site:9132", "dips://some.anotherSite:9132"] + + cs_list = orderCSByPatterns( + [ + "dips://another.site:9132", + "dips://some.another:9132", + "dips://some.site:9132", + ], + ["dips://.*\.site:."], + ) + assert cs_list[-1] == "dips://some.another:9132" + + cs_list = orderCSByPatterns( + [ + "dips://another.site:9132", + "dips://some.site:9132", + "dips://some.anotherSite:9132", + "dips://anothe.anotherSite:9132", + ], + ["dips://.*\.site:."], + ) + assert set(cs_list[0:2]) == { + "dips://some.site:9132", + "dips://another.site:9132", + } + + cs_list = orderCSByPatterns( + [ + "dips://another.site:9132", + "dips://some.exclude:9132", + "dips://some.anotherSite:9132", + "dips://anothe.anotherSite:9132", + ], + ["^(?!.*some\.exclude).*dips*"], + ) + assert set(cs_list) == { + "dips://another.site:9132", + "dips://some.anotherSite:9132", + "dips://anothe.anotherSite:9132", + "dips://some.exclude:9132", + } + assert cs_list[-1] == "dips://some.exclude:9132" + ############################################################################# # Test Suite run diff --git a/tests/CI/pilot_newSchema.json b/tests/CI/pilot_newSchema.json index 0c8c1d32..cf3c1c1c 100644 --- a/tests/CI/pilot_newSchema.json +++ b/tests/CI/pilot_newSchema.json @@ -153,5 +153,8 @@ }, "ConfigurationServers": [ "VAR_CS" - ] + ], + "PreferredURLPatterns": [ + "VAR_PREF_URL_PATTERN" + ] }