diff --git a/dask_jobqueue/__init__.py b/dask_jobqueue/__init__.py index 4bd11a26..7b925309 100644 --- a/dask_jobqueue/__init__.py +++ b/dask_jobqueue/__init__.py @@ -8,6 +8,7 @@ from .lsf import LSFCluster from .oar import OARCluster from .htcondor import HTCondorCluster +from .flux import FluxCluster from . import _version diff --git a/dask_jobqueue/flux.py b/dask_jobqueue/flux.py new file mode 100644 index 00000000..eecff0f8 --- /dev/null +++ b/dask_jobqueue/flux.py @@ -0,0 +1,178 @@ +import logging +import shlex + +import dask + +from .core import Job, JobQueueCluster, job_parameters, cluster_parameters + +logger = logging.getLogger(__name__) + + +def _without_arg(command, option): + command_args = shlex.split(command) + filtered_args = [] + skip_next = False + + for arg in command_args: + if skip_next: + skip_next = False + continue + + if arg == option: + skip_next = True + continue + + filtered_args.append(arg) + + return " ".join(filtered_args) + + +def _flux_walltime(value): + if value is None: + return None + + if isinstance(value, str) and value.count(":") == 2: + hours, minutes, seconds = value.split(":") + total_seconds = (int(hours) * 3600) + (int(minutes) * 60) + int(seconds) + + if total_seconds % 3600 == 0: + return f"{total_seconds // 3600}h" + if total_seconds % 60 == 0: + return f"{total_seconds // 60}m" + return f"{total_seconds}s" + + return value + + +class FluxJob(Job): + submit_command = "flux batch" + cancel_command = "flux cancel" + config_name = "flux" + + def __init__( + self, + scheduler=None, + name=None, + queue=None, + account=None, + walltime=None, + job_cpu=None, + job_nodes=None, + config_name=None, + **base_class_kwargs + ): + super().__init__( + scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs + ) + + if queue is None: + queue = dask.config.get("jobqueue.%s.queue" % self.config_name) + if account is None: + account = dask.config.get("jobqueue.%s.account" % self.config_name) + if walltime is None: + walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name) + walltime = _flux_walltime(walltime) + if job_cpu is None: + job_cpu = dask.config.get("jobqueue.%s.job-cpu" % self.config_name) + if job_nodes is None: + job_nodes = dask.config.get("jobqueue.%s.job-nodes" % self.config_name) + if job_nodes is None: + job_nodes = 1 + if job_nodes < 1: + raise ValueError("job_nodes must be at least 1") + + self.job_nodes = job_nodes + + header_lines = [] + if self.job_name is not None: + header_lines.append("#flux: --job-name=%s" % self.job_name) + if queue is not None: + header_lines.append("#flux: -q %s" % queue) + if account is not None: + header_lines.append("#flux: -B %s" % account) + if walltime is not None: + header_lines.append("#flux: -t %s" % walltime) + + if self.job_nodes > 1: + header_lines.append("#flux: -N %d" % self.job_nodes) + header_lines.append("#flux: -n %d" % self.worker_processes) + if job_cpu is None: + job_cpu = self.worker_process_threads + else: + header_lines.append("#flux: -N 1") + header_lines.append("#flux: -n 1") + if job_cpu is None: + job_cpu = self.worker_cores + + if job_cpu is not None: + header_lines.append("#flux: -c %d" % job_cpu) + + header_lines = list( + filter( + lambda line: not any(skip in line for skip in self.job_directives_skip), + header_lines, + ) + ) + header_lines.extend(["#flux: %s" % arg for arg in self.job_extra_directives]) + self.job_header = "\n".join(header_lines) + + logger.debug("Job script: \n %s", self.job_script()) + + def job_script(self): + worker_command = self._command_template + if self.job_nodes > 1: + worker_command = "flux run -N {nodes} -n {tasks} {command}".format( + nodes=self.job_nodes, + tasks=self.worker_processes, + command=_without_arg(self._command_template, "--nworkers"), + ) + + pieces = { + "shebang": self.shebang, + "job_header": self.job_header, + "job_script_prologue": "\n".join(filter(None, self._job_script_prologue)), + "worker_command": worker_command, + "job_script_epilogue": "\n".join(filter(None, self._job_script_epilogue)), + } + return self._script_template % pieces + + def _job_id_from_submit_output(self, out): + return out.strip() + + +class FluxCluster(JobQueueCluster): + __doc__ = """ Launch Dask on a Flux cluster + + Parameters + ---------- + queue : str + Destination queue for each worker job. Passed to ``flux batch -q``. + account : str + Bank associated with each worker job. Passed to ``flux batch -B``. + {job} + {cluster} + walltime : str + Walltime for each worker job in Flux standard duration syntax. + memory : str + Total memory budget exposed to Dask for each submitted job. + This value is used to derive the Dask worker ``--memory-limit`` setting, + but Flux itself does not enforce or reserve memory from this parameter. + job_cpu : int + Number of CPU cores to allocate per Flux slot. + job_nodes : int + Number of nodes to allocate per submitted job. + For multi-node jobs, one Dask worker process is launched per Flux slot. + job_extra : list + Deprecated: use ``job_extra_directives`` instead. This parameter will be removed in a future version. + job_extra_directives : list + List of other Flux options. Each option will be prepended with the ``#flux:`` prefix. + + Examples + -------- + >>> from dask_jobqueue import FluxCluster + >>> cluster = FluxCluster(queue='pdebug', cores=4, memory="16 GB") + >>> cluster.scale(jobs=2) + """.format( + job=job_parameters, cluster=cluster_parameters + ) + job_cls = FluxJob diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index becccd5c..7008a053 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -137,6 +137,41 @@ jobqueue: # Scheduler options scheduler-options: {} + flux: + name: dask-worker + + # Dask worker options + cores: null # Total number of cores per job + memory: null # Total amount of memory per job + processes: null # Number of Python processes per job + + python: null # Python executable + interface: null # Network interface to use like eth0 or ib0 + death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler + local-directory: null # Location of fast local storage like /scratch or $TMPDIR + shared-temp-directory: null # Shared directory currently used to dump temporary security objects for workers + extra: null # deprecated: use worker-extra-args + worker-command: "distributed.cli.dask_worker" # Command to launch a worker + worker-extra-args: [] # Additional arguments to pass to `dask-worker` + + # Flux resource manager options + shebang: "#!/usr/bin/env bash" + queue: null + account: null + walltime: '30m' + env-extra: null + job-script-prologue: [] + job-script-epilogue: [] + job-cpu: null + job-nodes: 1 + job-extra: null + job-extra-directives: [] + job-directives-skip: [] + log-directory: null + + # Scheduler options + scheduler-options: {} + moab: name: dask-worker diff --git a/dask_jobqueue/tests/test_flux.py b/dask_jobqueue/tests/test_flux.py new file mode 100644 index 00000000..12974057 --- /dev/null +++ b/dask_jobqueue/tests/test_flux.py @@ -0,0 +1,269 @@ +import sys +import warnings + +import dask +import pytest +from dask.utils import format_bytes, parse_bytes + +from dask_jobqueue import FluxCluster +from dask_jobqueue.flux import FluxJob, _flux_walltime, _without_arg + + +def test_without_arg_removes_only_requested_option(): + command = ( + "python -m distributed.cli.dask_worker tcp://127.0.0.1:8786 " + "--name worker-0 --nthreads 2 --nworkers 4 --memory-limit 1GiB" + ) + + assert _without_arg(command, "--nworkers") == ( + "python -m distributed.cli.dask_worker tcp://127.0.0.1:8786 " + "--name worker-0 --nthreads 2 --memory-limit 1GiB" + ) + assert "--nworkers 4" in _without_arg(command, "--no-such-option") + + +@pytest.mark.parametrize( + ("raw", "expected"), + [ + (None, None), + ("45m", "45m"), + ("00:10:00", "10m"), + ("01:00:00", "1h"), + ("00:00:59", "59s"), + ("01:01:01", "3661s"), + ], +) +def test_flux_walltime_normalization(raw, expected): + assert _flux_walltime(raw) == expected + + +def test_flux_job_rejects_invalid_job_nodes(): + with pytest.raises(ValueError, match="job_nodes must be at least 1"): + FluxJob( + scheduler="tcp://127.0.0.1:8786", + cores=2, + memory="4GB", + processes=1, + job_nodes=0, + ) + + +def test_header(): + with FluxCluster( + walltime="45m", + processes=4, + cores=8, + memory="28GB", + name="dask-worker", + ) as cluster: + assert "#flux: --job-name=dask-worker" in cluster.job_header + assert "#flux: -N 1" in cluster.job_header + assert "#flux: -n 1" in cluster.job_header + assert "#flux: -c 8" in cluster.job_header + assert "#flux: -t 45m" in cluster.job_header + assert "#flux: -q" not in cluster.job_header + assert "#flux: -B" not in cluster.job_header + + with FluxCluster( + queue="debug", + account="project123", + walltime="30m", + processes=2, + cores=4, + memory="8GB", + job_cpu=6, + job_nodes=3, + job_extra_directives=["--exclusive"], + ) as cluster: + assert "#flux: -q debug" in cluster.job_header + assert "#flux: -B project123" in cluster.job_header + assert "#flux: -N 3" in cluster.job_header + assert "#flux: -n 2" in cluster.job_header + assert "#flux: -c 6" in cluster.job_header + assert "#flux: --exclusive" in cluster.job_header + + +def test_job_script(): + with FluxCluster( + walltime="45m", + processes=4, + cores=8, + memory="28GB", + ) as cluster: + job_script = cluster.job_script() + formatted_bytes = format_bytes(parse_bytes("7GB")).replace(" ", "") + + assert "#flux: --job-name=dask-worker" in job_script + assert "#flux: -N 1" in job_script + assert "#flux: -n 1" in job_script + assert "#flux: -c 8" in job_script + assert "#flux: -t 45m" in job_script + assert "#flux: -q" not in job_script + assert "#flux: -B" not in job_script + assert f"{sys.executable} -m distributed.cli.dask_worker tcp://" in job_script + assert "--nthreads 2" in job_script + assert "--nworkers 4" in job_script + assert f"--memory-limit {formatted_bytes}" in job_script + assert "flux run" not in job_script + + with FluxCluster( + queue="debug", + account="project123", + walltime="30m", + processes=2, + cores=4, + memory="8GB", + job_nodes=3, + job_script_prologue=['echo "starting"'], + job_script_epilogue=['echo "done"'], + ) as cluster: + job_script = cluster.job_script() + formatted_bytes = format_bytes(parse_bytes("4GB")).replace(" ", "") + + assert "#flux: -q debug" in job_script + assert "#flux: -B project123" in job_script + assert "#flux: -N 3" in job_script + assert "#flux: -n 2" in job_script + assert "#flux: -c 2" in job_script + assert "flux run -N 3 -n 2" in job_script + assert "--nworkers" not in job_script + assert "--nthreads 2" in job_script + assert f"--memory-limit {formatted_bytes}" in job_script + assert 'echo "starting"' in job_script + assert 'echo "done"' in job_script + + +def test_job_script_normalizes_hms_walltime(): + with FluxCluster( + queue="pdebug", + walltime="00:10:00", + processes=1, + cores=2, + memory="4GB", + ) as cluster: + job_script = cluster.job_script() + + assert "#flux: -t 10m" in job_script + assert "#flux: -t 00:10:00" not in job_script + + +def test_header_lines_skip(): + job = FluxJob(cores=1, memory="1GB", job_name="foobar") + assert "foobar" in job.job_script() + + job = FluxJob(cores=1, memory="1GB", job_name="foobar", job_directives_skip=["--job-name"]) + assert "foobar" not in job.job_script() + + + +def test_header_lines_dont_skip_extra_directives(): + job = FluxJob( + cores=1, memory="1GB", job_name="foobar", job_extra_directives=["--job-name=custom"] + ) + assert "foobar" in job.job_script() + assert "--job-name=custom" in job.job_script() + + job = FluxJob( + cores=1, + memory="1GB", + job_name="foobar", + job_directives_skip=["--job-name"], + job_extra_directives=["--job-name=custom"], + ) + assert "foobar" not in job.job_script() + assert "--job-name=custom" in job.job_script() + + + +def test_deprecation_header_skip(): + warnings.simplefilter("ignore", UserWarning) + + with warnings.catch_warnings(record=True) as caught: + FluxJob(cores=1, memory="1 GB", header_skip=["old_param"]) + assert len(caught) == 1 + assert issubclass(caught[0].category, FutureWarning) + assert "header_skip has been renamed" in str(caught[0].message) + + with warnings.catch_warnings(record=True) as caught: + FluxJob( + cores=1, + memory="1 GB", + header_skip=["old_param"], + job_directives_skip=["new_param"], + ) + assert len(caught) == 1 + assert issubclass(caught[0].category, FutureWarning) + assert "header_skip has been renamed" in str(caught[0].message) + + with warnings.catch_warnings(record=True) as caught: + FluxJob( + cores=1, + memory="1 GB", + job_directives_skip=["new_param"], + ) + assert len(caught) == 0 + + warnings.simplefilter("ignore") + job = FluxJob( + cores=1, + memory="1 GB", + job_name="jobname", + header_skip=["jobname"], + job_directives_skip=["new_param"], + ) + assert "jobname" in job.job_script() + + job = FluxJob( + cores=1, + memory="1 GB", + job_name="jobname", + header_skip=["jobname"], + ) + assert "jobname" not in job.job_script() + + job = FluxJob( + cores=1, + memory="1 GB", + job_name="jobname", + header_skip=["jobname"], + job_directives_skip=(), + ) + assert "jobname" not in job.job_script() + + + +def test_config_name_flux_takes_custom_config(): + conf = { + "name": "myname", + "cores": 1, + "memory": "2 GB", + "processes": 1, + "python": None, + "interface": None, + "death-timeout": None, + "local-directory": "/foo", + "shared-temp-directory": None, + "extra": None, + "worker-command": None, + "worker-extra-args": [], + "queue": "myqueue", + "account": "myaccount", + "walltime": "00:02:00", + "env-extra": None, + "job-script-prologue": [], + "job-script-epilogue": [], + "job-extra": None, + "job-extra-directives": [], + "job-directives-skip": [], + "log-directory": None, + "shebang": "#!/usr/bin/env bash", + "job-cpu": None, + "job-nodes": 1, + } + + with dask.config.set({"jobqueue.flux-config-name": conf}): + with FluxCluster(config_name="flux-config-name") as cluster: + assert cluster.job_name == "myname" + assert "#flux: -q myqueue" in cluster.job_header + assert "#flux: -B myaccount" in cluster.job_header + assert "#flux: -t 2m" in cluster.job_header diff --git a/docs/source/clusters-api.rst b/docs/source/clusters-api.rst index 1f6f427c..4fbf92c0 100644 --- a/docs/source/clusters-api.rst +++ b/docs/source/clusters-api.rst @@ -16,3 +16,4 @@ Clusters API PBSCluster SGECluster SLURMCluster + FluxCluster diff --git a/docs/source/clusters-configuration.rst b/docs/source/clusters-configuration.rst index 644ee6c4..d68d6e2d 100644 --- a/docs/source/clusters-configuration.rst +++ b/docs/source/clusters-configuration.rst @@ -32,6 +32,11 @@ Note that the ``cores`` and ``memory`` keywords above correspond not to your full desired deployment, but rather to the size of a *single job* which should be no larger than the size of a single machine in your cluster. +For most schedulers both values influence the scheduler request. Flux is the +main exception: ``memory`` is still required because it controls the Dask worker +``--memory-limit``, but Flux itself does not currently reserve or enforce memory +from that parameter. + Separately you will specify how many jobs to deploy using the scale method. You can either specify the number of workers, or the total number of cores or memory that you want.