Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# type: docs, feat, fix, refactor, style or test
# scope (optional): any extra info, (like DMS or whatever)

name: 'Commit Message Check'
name: "Commit Message Check"
on: pull_request

jobs:
Expand All @@ -15,19 +15,19 @@ jobs:
- name: Check Commit Format
uses: gsactions/commit-message-checker@v2
with:
pattern: '^((docs|feat|fix|refactor|style|test|sweep)( ?\(.*\))?: .+|Revert ".+")$'
excludeDescription: 'true' # optional: this excludes the description body of a pull request
excludeTitle: 'true' # optional: this excludes the title of a pull request
checkAllCommitMessages: 'true' # optional: this checks all commits associated with a pull request
pattern: '^((docs|feat|fix|chore|refactor|style|test|sweep)( ?\(.*\))?: .+|Revert ".+")$'
excludeDescription: "true" # optional: this excludes the description body of a pull request
excludeTitle: "true" # optional: this excludes the title of a pull request
checkAllCommitMessages: "true" # optional: this checks all commits associated with a pull request
accessToken: ${{ secrets.GITHUB_TOKEN }} # github access token is only required if checkAllCommitMessages is true
flags: 'gim'
flags: "gim"
error: 'Your commit has to follow the format "<type>(<scope>): <subject>"".'
- name: Check Commit Length
uses: gsactions/commit-message-checker@v2
with:
pattern: '^.{20,150}$'
error: 'Commit messages should be between 20 and 150 chars'
excludeDescription: 'true' # optional: this excludes the description body of a pull request
excludeTitle: 'true' # optional: this excludes the title of a pull request
checkAllCommitMessages: 'true' # optional: this checks all commits associated with a pull request
pattern: "^.{20,150}$"
error: "Commit messages should be between 20 and 150 chars"
excludeDescription: "true" # optional: this excludes the description body of a pull request
excludeTitle: "true" # optional: this excludes the title of a pull request
checkAllCommitMessages: "true" # optional: this checks all commits associated with a pull request
accessToken: ${{ secrets.GITHUB_TOKEN }} # github access token is only required if checkAllCommitMessages is true
17 changes: 17 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# DIRAC Agent Guidelines

## Build/Lint/Test Commands
- **Build**: `pip install -e .`
- **Lint**: `ruff check src/ && pylint src/`
- **Test**: `pytest tests/`
- **Single test**: `pytest src/DIRAC/path/to/test.py::test_function`

## Code Style Guidelines
- **Formatting**: Use `black` with line length 120 (configured in pyproject.toml)
- **Imports**: Absolute imports only; sort with `isort` (black profile)
- **Naming**: CamelCase for classes, snake_case for functions/variables
- **Types**: Use type hints; run `mypy` for strict checking
- **Error handling**: Return `S_OK(result)` or `S_ERROR(message)` from DIRAC.Core.Utilities.ReturnValues
- **Logging**: Use `gLogger.info/warn/error` (from DIRAC import gLogger)
- **Docstrings**: Follow Google/NumPy style where present
- **Security**: Never log secrets; validate inputs
26 changes: 18 additions & 8 deletions src/DIRAC/Resources/Computing/InProcessComputingElement.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
""" The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot)
"""The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot)

A "InProcess" CE instance submits jobs in the current process.
This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE
A "InProcess" CE instance submits jobs in the current process.
This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE
"""

import os
import stat

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Utilities.CGroups2 import CG2Manager

from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.Resources.Computing.ComputingElement import ComputingElement


Expand All @@ -21,8 +21,8 @@ def __init__(self, ceUniqueID):
self.submittedJobs = 0
self.runningJobs = 0

self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
self.maxRAM = int(self.ceParameters.get("MaxRAM", 0))
self.processors = 1
self.maxRAM = 0
self.ceParameters["MaxTotalJobs"] = 1

def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
Expand All @@ -34,6 +34,16 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
:param list inputs: dependencies of executableFile
:return: S_OK(payload exit code) / S_ERROR() if submission issue
"""
self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
self.maxRAM = int(self.ceParameters.get("MaxRAM", self.maxRAM))

if "numberOfProcessors" in kwargs:
if self.processors < int(kwargs["numberOfProcessors"]):
return S_ERROR("Requesting processors not available")
if "MaxRAM" in kwargs:
if self.maxRAM < int(kwargs["MaxRAM"]):
return S_ERROR("Requesting RAM not available")

payloadEnv = dict(os.environ)
payloadProxy = ""
renewTask = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,41 @@

import pytest

from DIRAC.Resources.Computing.test.Test_PoolComputingElement import jobScript, _stopJob
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper

# sut
from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import _stopJob, jobScript
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper


@pytest.mark.slow
def test_submitJob():
@pytest.mark.parametrize(
"ce_parameters, available_processors, ram",
[
({}, 1, 0),
({"NumberOfProcessors": 8}, 8, 0),
({"MaxRAM": 2048}, 1, 2048),
({"NumberOfProcessors": 8, "MaxRAM": 2048}, 8, 2048),
],
)
def test_submitJob(ce_parameters, available_processors, ram):
# initialization
ce = InProcessComputingElement("InProcessCE")
ce.ceParameters = ce_parameters

# simple
with open("testJob.py", "w") as execFile:
execFile.write(jobScript % "1")
os.chmod("testJob.py", 0o755)

ce = InProcessComputingElement("InProcessCE")
res = ce.submitJob("testJob.py", None)
assert res["OK"] is True
res = ce.getCEStatus()
assert res["OK"] is True
assert res["SubmittedJobs"] == 1
assert res["RunningJobs"] == 0
assert res["WaitingJobs"] == 0
assert res["AvailableProcessors"] == available_processors
assert res["AvailableRAM"] == ram
_stopJob(1)
for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]:
if os.path.isfile(ff):
Expand All @@ -50,23 +66,66 @@ def test_submitJob():
res = ce.submitJob(
wrapperFile,
proxy=None,
numberOfProcessors=4,
maxNumberOfProcessors=8,
numberOfProcessors=available_processors,
maxNumberOfProcessors=available_processors,
wholeNode=False,
mpTag=True,
MinRAM=2500,
MaxRAM=4000,
MinRAM=ram,
MaxRAM=ram,
jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams},
)
assert res["OK"] is True
_stopJob(2)

res = ce.getCEStatus()
assert res["OK"] is True
assert res["SubmittedJobs"] == 2
assert res["RunningJobs"] == 0
assert res["WaitingJobs"] == 0
assert res["AvailableProcessors"] == available_processors
assert res["AvailableRAM"] == ram

_stopJob(2)
for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]:
if os.path.isfile(ff):
os.remove(ff)
if os.path.isdir("job"):
shutil.rmtree("job")

# failing
with open("testJob.py", "w") as execFile:
execFile.write(jobScript % "3")
os.chmod("testJob.py", 0o755)

jobParams = {"JobType": "User", "Executable": "testJob.py"}
resourceParams = {"GridCE": "some_CE"}
optimizerParams = {}

wrapperFile = createJobWrapper(
jobID=3, jobParams=jobParams, resourceParams=resourceParams, optimizerParams=optimizerParams, logLevel="DEBUG"
)["Value"][
"JobExecutablePath"
] # This is not under test, assuming it works fine

res = ce.submitJob(
wrapperFile,
proxy=None,
numberOfProcessors=4 + available_processors,
maxNumberOfProcessors=8 + available_processors,
wholeNode=False,
mpTag=True,
MinRAM=2500,
MaxRAM=4000,
jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams},
)
assert res["OK"] is False
res = ce.getCEStatus()
assert res["OK"] is True
assert res["SubmittedJobs"] == 2
assert res["RunningJobs"] == 0
assert res["WaitingJobs"] == 0
assert res["AvailableProcessors"] == available_processors
assert res["AvailableRAM"] == ram
_stopJob(1)
for ff in ["testJob.py", "stop_job_3", "job.info", "std.out"]:
if os.path.isfile(ff):
os.remove(ff)
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def getAvailableRAM(siteName=None, gridCE=None, queue=None):
if not queue:
queue = gConfig.getValue("/LocalSite/CEQueue", "")
if not (siteName and gridCE and queue):
gLogger.error("Could not find AvailableRAM: missing siteName or gridCE or queue. Returning 0")
gLogger.warn("Could not find AvailableRAM: missing siteName or gridCE or queue. Returning 0")
return 0

grid = siteName.split(".")[0]
Expand Down
53 changes: 26 additions & 27 deletions src/DIRAC/tests/Utilities/testJobDefinitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,6 @@ def helloWorldSSHBatch():
return endOfAllJobs(J)


def helloWorldARM():
"""simple hello world job to DIRAC.ARM.ch"""

J = baseToAllJobs("helloWorldARM")
try:
J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]])
except IndexError:
try:
J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]])
except IndexError: # we are in Jenkins
J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]])
J.setExecutable("exe-script.py", "", "helloWorld.log")
J.setDestination("DIRAC.ARM.ch")
return endOfAllJobs(J)


def helloWorldCloudCE():
"""simple hello world job to Cloud at Imperial College using SiteDirector"""

Expand Down Expand Up @@ -340,39 +324,54 @@ def wholeNodeJob():


def memory_4GB():
"""simple hello world job, with a memory requirement of 4 GB and MultiProcessor tags"""
"""simple hello world job, with a memory requirement of max 4 GB"""

J = baseToAllJobs("memory_4GB")
try:
J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]])
J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]])
except IndexError:
try:
J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]])
J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]])
except IndexError: # we are in Jenkins
J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]])

J.setExecutable("mpTest.py")
J.setNumberOfProcessors(numberOfProcessors=2)
J.setRAMRequirements(ramRequired=2500, maxRAM=4000)
J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]])

J.setExecutable("exe-script.py", "", "helloWorld.log")
J.setRAMRequirements(maxRAM=4000)
return endOfAllJobs(J)


def memory_2_to4GB():
"""simple hello world job, with a memory requirement of 2 to 4 GB and MultiProcessor tags"""
"""simple hello world job, with a memory requirement of 2 to 4 GB"""

J = baseToAllJobs("memory_2_to_4GB")
try:
J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]])
except IndexError:
try:
J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]])
except IndexError: # we are in Jenkins
J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]])

J.setExecutable("exe-script.py")
J.setRAMRequirements(ramRequired=2500, maxRAM=4000)
return endOfAllJobs(J)


def memory_2_to4GB_MP():
"""simple hello world job, with a memory requirement of 2 to 4 GB and MultiProcessor tags"""

J = baseToAllJobs("memory_2_to_4GB_MP")
try:
J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]])
except IndexError:
try:
J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]])
except IndexError: # we are in Jenkins
J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]])
J.setInputSandbox([find_all("mpTest.py", "/home/dirac", "DIRAC/tests/Utilities")[0]])

J.setExecutable("mpTest.py")
J.setNumberOfProcessors(numberOfProcessors=2)
J.setRAMRequirements(ramRequired=4000, maxRAM=4000)
J.setRAMRequirements(ramRequired=2500, maxRAM=4000)
return endOfAllJobs(J)


Expand Down
12 changes: 6 additions & 6 deletions tests/System/unitTestUserJobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" Collection of user jobs for testing purposes
"""
"""Collection of user jobs for testing purposes"""

# pylint: disable=wrong-import-position, invalid-name
import sys
import time
Expand Down Expand Up @@ -102,10 +102,6 @@ def test_submit(self):
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])

res = helloWorldARM()
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])

res = mpJob()
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])
Expand All @@ -130,6 +126,10 @@ def test_submit(self):
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])

res = memory_2_to4GB_MP()
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])

res = parametricJob()
self.assertTrue(res["OK"])
jobsSubmittedList.append(res["Value"])
Expand Down
Loading