Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/DIRAC/Resources/Computing/PoolComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ def __init__(self, ceUniqueID):
self.pPool = None
self.taskID = 0
self.processorsPerTask = {}
self.ram = (
1024
* 1024 # Available RAM for the node, in MB. The default value is an arbitrary large value in case of no limit
)
self.ramPerTask = {}
self.ram = 0 # effectively this means "no limits"

# This CE will effectively submit to another "Inner"CE
# (by default to the InProcess CE)
Expand All @@ -82,7 +79,7 @@ def _reset(self):

self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
self.ceParameters["MaxTotalJobs"] = self.processors
if self.ceParameters.get("MaxRAM", 0):
if self.ceParameters.get("MaxRAM", 0): # if there's a limit, we set it
self.ram = int(self.ceParameters["MaxRAM"])
# Indicates that the submission is done asynchronously
# The result is not immediately available
Expand Down Expand Up @@ -113,6 +110,7 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
return S_OK(taskID)

memoryForJob = self._getMemoryForJobs(kwargs)

if memoryForJob is None:
self.taskResults[self.taskID] = S_ERROR("Not enough memory for the job")
taskID = self.taskID
Expand Down Expand Up @@ -201,7 +199,10 @@ def _getMemoryForJobs(self, kwargs):
"""

# # job requirements
requestedMemory = kwargs.get("MinRAM", 0)
requestedMemory = kwargs.get("MinRAM", kwargs.get("MaxRAM", 0))
# if there's no limit, we just let it match the maximum
if not self.ram:
return max(requestedMemory, kwargs.get("MaxRAM", 0))

# # now check what the slot can provide
# Do we have enough memory?
Expand Down Expand Up @@ -258,7 +259,10 @@ def getCEStatus(self):
result["AvailableProcessors"] = self.processors - processorsInUse
# dealing with RAM
result["UsedRAM"] = sum(self.ramPerTask.values())
result["AvailableRAM"] = self.ram - sum(self.ramPerTask.values())
if self.ram:
result["AvailableRAM"] = self.ram - sum(self.ramPerTask.values())
else:
result["AvailableRAM"] = 0

return result

Expand Down
38 changes: 27 additions & 11 deletions src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,20 @@ def test_executeJob_wholeNode4(createAndDelete):


@pytest.mark.slow
def test_executeJob_wholeNode8(createAndDelete):
@pytest.mark.parametrize(
"ce_parameters",
[
({"NumberOfProcessors": 8}),
({"NumberOfProcessors": 8, "MaxRAM": 32000}),
({"WholeNode": True, "NumberOfProcessors": 8, "MaxRAM": 32000}),
],
)
def test_executeJob_wholeNode8(createAndDelete, ce_parameters):
time.sleep(0.5)
taskIDs = {}

ceParameters = {"WholeNode": True, "NumberOfProcessors": 8, "MaxRAM": 32000}
ce = PoolComputingElement("TestPoolCE")
ce.setParameters(ceParameters)
ce.setParameters(ce_parameters)

jobParams = {"mpTag": True, "numberOfProcessors": 2, "maxNumberOfProcessors": 2}
result = ce.submitJob("testPoolCEJob_2.py", None, **jobParams)
Expand All @@ -217,6 +224,8 @@ def test_executeJob_wholeNode8(createAndDelete):

result = ce.getCEStatus()
assert result["UsedProcessors"] == 2
assert result["UsedRAM"] == 0
assert result["AvailableRAM"] == ce_parameters.get("MaxRAM", 0)

jobParams = {"mpTag": True, "numberOfProcessors": 1, "maxNumberOfProcessors": 3}
result = ce.submitJob("testPoolCEJob_3.py", None, **jobParams)
Expand All @@ -228,7 +237,7 @@ def test_executeJob_wholeNode8(createAndDelete):
result = ce.getCEStatus()
assert result["UsedProcessors"] == 5
assert result["UsedRAM"] == 0
assert result["AvailableRAM"] == 32000
assert result["AvailableRAM"] == ce_parameters.get("MaxRAM", 0)

jobParams = {"numberOfProcessors": 2, "MinRAM": 4000, "MaxRAM": 8000} # This is the same as asking for SP
result = ce.submitJob("testPoolCEJob_4.py", None, **jobParams)
Expand All @@ -240,7 +249,9 @@ def test_executeJob_wholeNode8(createAndDelete):
result = ce.getCEStatus()
assert result["UsedProcessors"] == 6
assert result["UsedRAM"] == 8000
assert result["AvailableRAM"] == 24000
assert result["AvailableRAM"] == (
ce_parameters.get("MaxRAM") - result["UsedRAM"] if ce_parameters.get("MaxRAM") else 0
)

jobParams = {"MinRAM": 8000, "MaxRAM": 8000} # This is the same as asking for SP
result = ce.submitJob("testPoolCEJob_5.py", None, **jobParams)
Expand All @@ -252,19 +263,24 @@ def test_executeJob_wholeNode8(createAndDelete):
result = ce.getCEStatus()
assert result["UsedProcessors"] == 7
assert result["UsedRAM"] == 16000
assert result["AvailableRAM"] == 16000
assert result["AvailableRAM"] == (
ce_parameters.get("MaxRAM") - result["UsedRAM"] if ce_parameters.get("MaxRAM") else 0
)

jobParams = {"MaxRAM": 24000} # This will fail
jobParams = {"MaxRAM": 24000} # This will fail only when the CE has a RAM limit (MaxRAM) set
result = ce.submitJob("testPoolCEJob_6.py", None, **jobParams)
assert result["OK"] is True
taskID = result["Value"]
assert taskID == 4
taskIDs[taskID] = False
if ce_parameters.get("MaxRAM"):
assert ce.taskResults[taskID]["OK"] is False

result = ce.getCEStatus()
assert result["UsedProcessors"] == 7
assert result["UsedRAM"] == 16000
assert result["AvailableRAM"] == 16000
# The conditional must be parenthesised: `assert a == b if c else d` parses as
# `assert ((a == b) if c else d)`, so without MaxRAM the assertion would only
# evaluate the (always truthy) constant and never check the reported value.
# Without a CE RAM limit the 24000 MB job is expected to be accepted, which
# occupies one more processor and adds its memory to the used RAM.
assert result["UsedProcessors"] == (7 if ce_parameters.get("MaxRAM") else 8)
assert result["UsedRAM"] == (16000 if ce_parameters.get("MaxRAM") else 40000)
assert result["AvailableRAM"] == (
ce_parameters.get("MaxRAM") - result["UsedRAM"] if ce_parameters.get("MaxRAM") else 0
)

# now trying again would fail
jobParams = {"mpTag": True, "numberOfProcessors": 3}
Expand Down
Loading