diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 3550803141c..bba349c5151 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -4,7 +4,7 @@ # type: docs, feat, fix, refactor, style or test # scope (optional): any extra info, (like DMS or whatever) -name: 'Commit Message Check' +name: "Commit Message Check" on: pull_request jobs: @@ -15,19 +15,19 @@ jobs: - name: Check Commit Format uses: gsactions/commit-message-checker@v2 with: - pattern: '^((docs|feat|fix|refactor|style|test|sweep)( ?\(.*\))?: .+|Revert ".+")$' - excludeDescription: 'true' # optional: this excludes the description body of a pull request - excludeTitle: 'true' # optional: this excludes the title of a pull request - checkAllCommitMessages: 'true' # optional: this checks all commits associated with a pull request + pattern: '^((docs|feat|fix|chore|refactor|style|test|sweep)( ?\(.*\))?: .+|Revert ".+")$' + excludeDescription: "true" # optional: this excludes the description body of a pull request + excludeTitle: "true" # optional: this excludes the title of a pull request + checkAllCommitMessages: "true" # optional: this checks all commits associated with a pull request accessToken: ${{ secrets.GITHUB_TOKEN }} # github access token is only required if checkAllCommitMessages is true - flags: 'gim' + flags: "gim" error: 'Your commit has to follow the format "(): "".' - name: Check Commit Length uses: gsactions/commit-message-checker@v2 with: - pattern: '^.{20,150}$' - error: 'Commit messages should be between 20 and 150 chars' - excludeDescription: 'true' # optional: this excludes the description body of a pull request - excludeTitle: 'true' # optional: this excludes the title of a pull request - checkAllCommitMessages: 'true' # optional: this checks all commits associated with a pull request + pattern: "^.{20,150}$" + error: "Commit messages should be between 20 and 150 chars" + excludeDescription: "true" # optional: this excludes the description body of a pull request + excludeTitle: "true" # optional: this excludes the title of a pull request + checkAllCommitMessages: "true" # optional: this checks all commits associated with a pull request accessToken: ${{ secrets.GITHUB_TOKEN }} # github access token is only required if checkAllCommitMessages is true diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000000..df574f8a519 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,17 @@ +# DIRAC Agent Guidelines + +## Build/Lint/Test Commands +- **Build**: `pip install -e .` +- **Lint**: `ruff check src/ && pylint src/` +- **Test**: `pytest tests/` +- **Single test**: `pytest src/DIRAC/path/to/test.py::test_function` + +## Code Style Guidelines +- **Formatting**: Use `black` with line length 120 (configured in pyproject.toml) +- **Imports**: Absolute imports only; sort with `isort` (black profile) +- **Naming**: CamelCase for classes, snake_case for functions/variables +- **Types**: Use type hints; run `mypy` for strict checking +- **Error handling**: Return `S_OK(result)` or `S_ERROR(message)` from DIRAC.Core.Utilities.ReturnValues +- **Logging**: Use `gLogger.info/warn/error` (from DIRAC import gLogger) +- **Docstrings**: Follow Google/NumPy style where present +- **Security**: Never log secrets; validate inputs diff --git a/src/DIRAC/Resources/Computing/InProcessComputingElement.py b/src/DIRAC/Resources/Computing/InProcessComputingElement.py index 96969335c30..5d8e5a20e64 100755 --- a/src/DIRAC/Resources/Computing/InProcessComputingElement.py +++ b/src/DIRAC/Resources/Computing/InProcessComputingElement.py @@ -1,15 +1,15 @@ -""" The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot) +"""The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot) - A "InProcess" CE instance submits jobs in the current process. - This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE +A "InProcess" CE instance submits jobs in the current process. +This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE """ + import os import stat -from DIRAC import S_OK, S_ERROR -from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler +from DIRAC import S_ERROR, S_OK from DIRAC.Core.Utilities.CGroups2 import CG2Manager - +from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler from DIRAC.Resources.Computing.ComputingElement import ComputingElement @@ -21,8 +21,8 @@ def __init__(self, ceUniqueID): self.submittedJobs = 0 self.runningJobs = 0 - self.processors = int(self.ceParameters.get("NumberOfProcessors", 1)) - self.maxRAM = int(self.ceParameters.get("MaxRAM", 0)) + self.processors = 1 + self.maxRAM = 0 self.ceParameters["MaxTotalJobs"] = 1 def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs): @@ -34,6 +34,16 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs): :param list inputs: dependencies of executableFile :return: S_OK(payload exit code) / S_ERROR() if submission issue """ + self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors)) + self.maxRAM = int(self.ceParameters.get("MaxRAM", self.maxRAM)) + + if "numberOfProcessors" in kwargs: + if self.processors < int(kwargs["numberOfProcessors"]): + return S_ERROR("Requesting processors not available") + if "MaxRAM" in kwargs: + if self.maxRAM < int(kwargs["MaxRAM"]): + return S_ERROR("Requesting RAM not available") + payloadEnv = dict(os.environ) payloadProxy = "" renewTask = None diff --git a/src/DIRAC/Resources/Computing/test/Test_InProcessComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_InProcessComputingElement.py index 3b8be05700a..b746b0d9f1f 100644 --- a/src/DIRAC/Resources/Computing/test/Test_InProcessComputingElement.py +++ b/src/DIRAC/Resources/Computing/test/Test_InProcessComputingElement.py @@ -8,25 +8,41 @@ import pytest -from DIRAC.Resources.Computing.test.Test_PoolComputingElement import jobScript, _stopJob -from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper - # sut from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement +from DIRAC.Resources.Computing.test.Test_PoolComputingElement import _stopJob, jobScript +from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper @pytest.mark.slow -def test_submitJob(): +@pytest.mark.parametrize( + "ce_parameters, available_processors, ram", + [ + ({}, 1, 0), + ({"NumberOfProcessors": 8}, 8, 0), + ({"MaxRAM": 2048}, 1, 2048), + ({"NumberOfProcessors": 8, "MaxRAM": 2048}, 8, 2048), + ], +) +def test_submitJob(ce_parameters, available_processors, ram): + # initialization + ce = InProcessComputingElement("InProcessCE") + ce.ceParameters = ce_parameters + + # simple with open("testJob.py", "w") as execFile: execFile.write(jobScript % "1") os.chmod("testJob.py", 0o755) - ce = InProcessComputingElement("InProcessCE") res = ce.submitJob("testJob.py", None) assert res["OK"] is True res = ce.getCEStatus() assert res["OK"] is True assert res["SubmittedJobs"] == 1 + assert res["RunningJobs"] == 0 + assert res["WaitingJobs"] == 0 + assert res["AvailableProcessors"] == available_processors + assert res["AvailableRAM"] == ram _stopJob(1) for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]: if os.path.isfile(ff): @@ -50,23 +66,66 @@ def test_submitJob(): res = ce.submitJob( wrapperFile, proxy=None, - numberOfProcessors=4, - maxNumberOfProcessors=8, + numberOfProcessors=available_processors, + maxNumberOfProcessors=available_processors, wholeNode=False, mpTag=True, - MinRAM=2500, - MaxRAM=4000, + MinRAM=ram, + MaxRAM=ram, jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams}, ) assert res["OK"] is True + _stopJob(2) res = ce.getCEStatus() assert res["OK"] is True assert res["SubmittedJobs"] == 2 + assert res["RunningJobs"] == 0 + assert res["WaitingJobs"] == 0 + assert res["AvailableProcessors"] == available_processors + assert res["AvailableRAM"] == ram - _stopJob(2) for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]: if os.path.isfile(ff): os.remove(ff) if os.path.isdir("job"): shutil.rmtree("job") + + # failing + with open("testJob.py", "w") as execFile: + execFile.write(jobScript % "3") + os.chmod("testJob.py", 0o755) + + jobParams = {"JobType": "User", "Executable": "testJob.py"} + resourceParams = {"GridCE": "some_CE"} + optimizerParams = {} + + wrapperFile = createJobWrapper( + jobID=3, jobParams=jobParams, resourceParams=resourceParams, optimizerParams=optimizerParams, logLevel="DEBUG" + )["Value"][ + "JobExecutablePath" + ] # This is not under test, assuming it works fine + + res = ce.submitJob( + wrapperFile, + proxy=None, + numberOfProcessors=4 + available_processors, + maxNumberOfProcessors=8 + available_processors, + wholeNode=False, + mpTag=True, + MinRAM=2500, + MaxRAM=4000, + jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams}, + ) + assert res["OK"] is False + res = ce.getCEStatus() + assert res["OK"] is True + assert res["SubmittedJobs"] == 2 + assert res["RunningJobs"] == 0 + assert res["WaitingJobs"] == 0 + assert res["AvailableProcessors"] == available_processors + assert res["AvailableRAM"] == ram + _stopJob(1) + for ff in ["testJob.py", "stop_job_3", "job.info", "std.out"]: + if os.path.isfile(ff): + os.remove(ff) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py index e86e429c3e6..0c530c261ae 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py @@ -229,7 +229,7 @@ def getAvailableRAM(siteName=None, gridCE=None, queue=None): if not queue: queue = gConfig.getValue("/LocalSite/CEQueue", "") if not (siteName and gridCE and queue): - gLogger.error("Could not find AvailableRAM: missing siteName or gridCE or queue. Returning 0") + gLogger.warn("Could not find AvailableRAM: missing siteName or gridCE or queue. Returning 0") return 0 grid = siteName.split(".")[0] diff --git a/src/DIRAC/tests/Utilities/testJobDefinitions.py b/src/DIRAC/tests/Utilities/testJobDefinitions.py index 61064d4b9e3..8d5708155f2 100644 --- a/src/DIRAC/tests/Utilities/testJobDefinitions.py +++ b/src/DIRAC/tests/Utilities/testJobDefinitions.py @@ -239,22 +239,6 @@ def helloWorldSSHBatch(): return endOfAllJobs(J) -def helloWorldARM(): - """simple hello world job to DIRAC.ARM.ch""" - - J = baseToAllJobs("helloWorldARM") - try: - J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]]) - except IndexError: - try: - J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]]) - except IndexError: # we are in Jenkins - J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]]) - J.setExecutable("exe-script.py", "", "helloWorld.log") - J.setDestination("DIRAC.ARM.ch") - return endOfAllJobs(J) - - def helloWorldCloudCE(): """simple hello world job to Cloud at Imperial College using SiteDirector""" @@ -340,39 +324,54 @@ def wholeNodeJob(): def memory_4GB(): - """simple hello world job, with a memory requirement of 4 GB and MultiProcessor tags""" + """simple hello world job, with a memory requirement of max 4 GB""" J = baseToAllJobs("memory_4GB") try: - J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]]) + J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]]) except IndexError: try: - J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]]) + J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]]) except IndexError: # we are in Jenkins - J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]]) - - J.setExecutable("mpTest.py") - J.setNumberOfProcessors(numberOfProcessors=2) - J.setRAMRequirements(ramRequired=2500, maxRAM=4000) + J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]]) + J.setExecutable("exe-script.py", "", "helloWorld.log") + J.setRAMRequirements(maxRAM=4000) return endOfAllJobs(J) def memory_2_to4GB(): - """simple hello world job, with a memory requirement of 2 to 4 GB and MultiProcessor tags""" + """simple hello world job, with a memory requirement of 2 to 4 GB""" J = baseToAllJobs("memory_2_to_4GB") + try: + J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]]) + except IndexError: + try: + J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]]) + except IndexError: # we are in Jenkins + J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]]) + + J.setExecutable("exe-script.py") + J.setRAMRequirements(ramRequired=2500, maxRAM=4000) + return endOfAllJobs(J) + + +def memory_2_to4GB_MP(): + """simple hello world job, with a memory requirement of 2 to 4 GB and MultiProcessor tags""" + + J = baseToAllJobs("memory_2_to_4GB_MP") try: J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]]) except IndexError: try: J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]]) except IndexError: # we are in Jenkins - J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]]) + J.setInputSandbox([find_all("mpTest.py", "/home/dirac", "DIRAC/tests/Utilities")[0]]) J.setExecutable("mpTest.py") J.setNumberOfProcessors(numberOfProcessors=2) - J.setRAMRequirements(ramRequired=4000, maxRAM=4000) + J.setRAMRequirements(ramRequired=2500, maxRAM=4000) return endOfAllJobs(J) diff --git a/tests/System/unitTestUserJobs.py b/tests/System/unitTestUserJobs.py index b922062cd2a..08d66c3708d 100644 --- a/tests/System/unitTestUserJobs.py +++ b/tests/System/unitTestUserJobs.py @@ -1,5 +1,5 @@ -""" Collection of user jobs for testing purposes -""" +"""Collection of user jobs for testing purposes""" + # pylint: disable=wrong-import-position, invalid-name import sys import time @@ -102,10 +102,6 @@ def test_submit(self): self.assertTrue(res["OK"]) jobsSubmittedList.append(res["Value"]) - res = helloWorldARM() - self.assertTrue(res["OK"]) - jobsSubmittedList.append(res["Value"]) - res = mpJob() self.assertTrue(res["OK"]) jobsSubmittedList.append(res["Value"]) @@ -130,6 +126,10 @@ def test_submit(self): self.assertTrue(res["OK"]) jobsSubmittedList.append(res["Value"]) + res = memory_2_to4GB_MP() + self.assertTrue(res["OK"]) + jobsSubmittedList.append(res["Value"]) + res = parametricJob() self.assertTrue(res["OK"]) jobsSubmittedList.append(res["Value"])