diff --git a/src/DIRAC/Resources/Computing/PoolComputingElement.py b/src/DIRAC/Resources/Computing/PoolComputingElement.py
index fed710d4433..e09d1b5d26f 100644
--- a/src/DIRAC/Resources/Computing/PoolComputingElement.py
+++ b/src/DIRAC/Resources/Computing/PoolComputingElement.py
@@ -64,11 +64,8 @@ def __init__(self, ceUniqueID):
         self.pPool = None
         self.taskID = 0
         self.processorsPerTask = {}
-        self.ram = (
-            1024
-            * 1024  # Available RAM for the node, in MB. The default value is an arbitrary large value in case of no limit
-        )
         self.ramPerTask = {}
+        self.ram = 0  # effectively this means "no limits"
 
         # This CE will effectively submit to another "Inner"CE
         # (by default to the InProcess CE)
@@ -82,7 +79,7 @@ def _reset(self):
 
         self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
         self.ceParameters["MaxTotalJobs"] = self.processors
-        if self.ceParameters.get("MaxRAM", 0):
+        if self.ceParameters.get("MaxRAM", 0):  # if there's a limit, we set it
             self.ram = int(self.ceParameters["MaxRAM"])
         # Indicates that the submission is done asynchronously
         # The result is not immediately available
@@ -113,6 +110,7 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
             return S_OK(taskID)
 
         memoryForJob = self._getMemoryForJobs(kwargs)
+
         if memoryForJob is None:
             self.taskResults[self.taskID] = S_ERROR("Not enough memory for the job")
             taskID = self.taskID
@@ -201,7 +199,10 @@ def _getMemoryForJobs(self, kwargs):
         """
 
         # # job requirements
-        requestedMemory = kwargs.get("MinRAM", 0)
+        requestedMemory = kwargs.get("MinRAM", kwargs.get("MaxRAM", 0))
+        # if there's no limit, we just let it match the maximum
+        if not self.ram:
+            return max(requestedMemory, kwargs.get("MaxRAM", 0))
 
         # # now check what the slot can provide
         # Do we have enough memory?
@@ -258,7 +259,10 @@ def getCEStatus(self):
         result["AvailableProcessors"] = self.processors - processorsInUse
         # dealing with RAM
         result["UsedRAM"] = sum(self.ramPerTask.values())
-        result["AvailableRAM"] = self.ram - sum(self.ramPerTask.values())
+        if self.ram:
+            result["AvailableRAM"] = self.ram - sum(self.ramPerTask.values())
+        else:
+            result["AvailableRAM"] = 0  # no RAM limit configured: report 0
         return result
 
 
diff --git a/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
index fc18fcd497d..cac93fa054d 100644
--- a/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
+++ b/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
@@ -200,13 +200,20 @@ def test_executeJob_wholeNode4(createAndDelete):
 
 
 @pytest.mark.slow
-def test_executeJob_wholeNode8(createAndDelete):
+@pytest.mark.parametrize(
+    "ce_parameters",
+    [
+        ({"NumberOfProcessors": 8}),
+        ({"NumberOfProcessors": 8, "MaxRAM": 32000}),
+        ({"WholeNode": True, "NumberOfProcessors": 8, "MaxRAM": 32000}),
+    ],
+)
+def test_executeJob_wholeNode8(createAndDelete, ce_parameters):
     time.sleep(0.5)
     taskIDs = {}
 
-    ceParameters = {"WholeNode": True, "NumberOfProcessors": 8, "MaxRAM": 32000}
     ce = PoolComputingElement("TestPoolCE")
-    ce.setParameters(ceParameters)
+    ce.setParameters(ce_parameters)
 
     jobParams = {"mpTag": True, "numberOfProcessors": 2, "maxNumberOfProcessors": 2}
     result = ce.submitJob("testPoolCEJob_2.py", None, **jobParams)
@@ -217,6 +224,8 @@ def test_executeJob_wholeNode8(createAndDelete):
 
     result = ce.getCEStatus()
     assert result["UsedProcessors"] == 2
+    assert result["UsedRAM"] == 0
+    assert result["AvailableRAM"] == ce_parameters.get("MaxRAM", 0)
 
     jobParams = {"mpTag": True, "numberOfProcessors": 1, "maxNumberOfProcessors": 3}
     result = ce.submitJob("testPoolCEJob_3.py", None, **jobParams)
@@ -228,7 +237,7 @@ def test_executeJob_wholeNode8(createAndDelete):
     result = ce.getCEStatus()
     assert result["UsedProcessors"] == 5
     assert result["UsedRAM"] == 0
-    assert result["AvailableRAM"] == 32000
+    assert result["AvailableRAM"] == ce_parameters.get("MaxRAM", 0)
 
     jobParams = {"numberOfProcessors": 2, "MinRAM": 4000, "MaxRAM": 8000}  # This is same as asking for SP
     result = ce.submitJob("testPoolCEJob_4.py", None, **jobParams)
@@ -240,7 +249,9 @@ def test_executeJob_wholeNode8(createAndDelete):
     result = ce.getCEStatus()
     assert result["UsedProcessors"] == 6
     assert result["UsedRAM"] == 8000
-    assert result["AvailableRAM"] == 24000
+    assert result["AvailableRAM"] == (
+        ce_parameters.get("MaxRAM") - result["UsedRAM"] if ce_parameters.get("MaxRAM") else 0
+    )
 
     jobParams = {"MinRAM": 8000, "MaxRAM": 8000}  # This is same as asking for SP
     result = ce.submitJob("testPoolCEJob_5.py", None, **jobParams)
@@ -252,19 +263,24 @@ def test_executeJob_wholeNode8(createAndDelete):
     result = ce.getCEStatus()
     assert result["UsedProcessors"] == 7
     assert result["UsedRAM"] == 16000
-    assert result["AvailableRAM"] == 16000
+    assert result["AvailableRAM"] == (
+        ce_parameters.get("MaxRAM") - result["UsedRAM"] if ce_parameters.get("MaxRAM") else 0
+    )
 
-    jobParams = {"MaxRAM": 24000}  # This will fail
+    jobParams = {"MaxRAM": 24000}  # This fails only when the CE has a RAM limit (MaxRAM) set
     result = ce.submitJob("testPoolCEJob_6.py", None, **jobParams)
     assert result["OK"] is True
     taskID = result["Value"]
     assert taskID == 4
-    taskIDs[taskID] = False
+    if ce_parameters.get("MaxRAM"):
+        assert ce.taskResults[taskID]["OK"] is False
 
     result = ce.getCEStatus()
-    assert result["UsedProcessors"] == 7
-    assert result["UsedRAM"] == 16000
-    assert result["AvailableRAM"] == 16000
+    assert result["UsedProcessors"] == (7 if ce_parameters.get("MaxRAM") else 8)
+    assert result["UsedRAM"] == (16000 if ce_parameters.get("MaxRAM") else 40000)
+    assert result["AvailableRAM"] == (
+        ce_parameters.get("MaxRAM") - result["UsedRAM"] if ce_parameters.get("MaxRAM") else 0
+    )
 
     # now trying again would fail
     jobParams = {"mpTag": True, "numberOfProcessors": 3}