diff --git a/doc/source/contents.rst b/doc/source/contents.rst
index cac6d59a..bd909225 100644
--- a/doc/source/contents.rst
+++ b/doc/source/contents.rst
@@ -36,6 +36,7 @@ The Kernel Tuner documentation
optimization
metrics
observers
+ parallel
.. toctree::
:maxdepth: 1
diff --git a/doc/source/launch_ray.sh b/doc/source/launch_ray.sh
new file mode 100644
index 00000000..954d5f11
--- /dev/null
+++ b/doc/source/launch_ray.sh
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Get SLURM variables
+NODELIST="${SLURM_STEP_NODELIST:-${SLURM_JOB_NODELIST:-}}"
+NUM_NODES="${SLURM_STEP_NUM_NODES:-${SLURM_JOB_NUM_NODES:-}}"
+
+if [[ -z "$NODELIST" || -z "$NUM_NODES" ]]; then
+ echo "ERROR: Not running under Slurm (missing SLURM_* vars)."
+ exit 1
+fi
+
+# Get head node
+NODES=$(scontrol show hostnames "$NODELIST")
+NODES_ARRAY=($NODES)
+RAY_IP="${NODES_ARRAY[0]}"
+RAY_PORT="${RAY_PORT:-6379}"
+RAY_ADDRESS="${RAY_IP}:${RAY_PORT}"
+
+# Ensure command exists (Ray >= 2.49 per docs)
+if ! ray symmetric-run --help >/dev/null 2>&1; then
+ echo "ERROR: 'ray symmetric-run' not available. Check Ray installation (needs Ray 2.49+)."
+ exit 1
+fi
+
+# Launch cluster!
+echo "Ray head node: $RAY_ADDRESS"
+
+exec ray symmetric-run \
+ --address "$RAY_ADDRESS" \
+ --min-nodes "$NUM_NODES" \
+ -- \
+ "$@"
+
diff --git a/doc/source/parallel.rst b/doc/source/parallel.rst
new file mode 100644
index 00000000..f1e83f09
--- /dev/null
+++ b/doc/source/parallel.rst
@@ -0,0 +1,143 @@
+Parallel and Remote Tuning
+==========================
+
+By default, Kernel Tuner benchmarks GPU kernel configurations sequentially on a single local GPU.
+While this works well for small tuning problems, it can become a bottleneck for larger search spaces.
+
+.. image:: parallel_runner.png
+ :width: 700px
+ :alt: Example of sequential versus parallel tuning.
+
+
+Kernel Tuner also supports **parallel tuning**, allowing multiple GPUs to evaluate kernel configurations in parallel.
+The same mechanism can be used for **remote tuning**, where Kernel Tuner runs on a host system while one or more GPUs are located on remote machines.
+
+Parallel/remote tuning is implemented using `Ray <https://www.ray.io/>`_ and works on both local multi-GPU systems and distributed clusters.
+
+How to use
+----------
+
+To enable parallel tuning, pass the ``parallel_workers`` argument to ``tune_kernel``:
+
+.. code-block:: python
+
+ kernel_tuner.tune_kernel(
+ "vector_add",
+ kernel_string,
+ size,
+ args,
+ tune_params,
+ parallel_workers=True,
+ )
+
+If ``parallel_workers`` is set to ``True``, Kernel Tuner will use all available Ray workers for tuning.
+Alternatively, ``parallel_workers`` can be set to an integer ``n`` to use exactly ``n`` workers.
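+
+For example, the snippet below (a minimal sketch; the value ``4`` is only an illustration) limits tuning to four workers:
+
+.. code-block:: python
+
+ # use at most 4 Ray workers (GPUs) for this tuning run
+ kernel_tuner.tune_kernel(
+ "vector_add",
+ kernel_string,
+ size,
+ args,
+ tune_params,
+ parallel_workers=4,
+ )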
+
+
+Parallel tuning and optimization strategies
+-------------------------------------------
+
+The achievable speedup from using multiple GPUs depends in part on the **optimization strategy** used during tuning.
+
+Some optimization strategies support **maximum parallelism** and can evaluate all configurations independently.
+Other strategies support **limited parallelism**, typically by repeatedly evaluating a fixed-size population of configurations in parallel.
+Finally, some strategies are **inherently sequential** and always evaluate configurations one by one, providing no parallelism.
+
+The current optimization strategies can be grouped as follows:
+
+* **Maximum parallelism**:
+ ``brute_force``, ``random_sample``
+
+* **Limited parallelism**:
+ ``genetic_algorithm``, ``pso``, ``diff_evo``, ``firefly_algorithm``
+
+* **No parallelism**:
+ ``minimize``, ``basinhopping``, ``greedy_mls``, ``ordered_greedy_mls``,
+ ``greedy_ils``, ``dual_annealing``, ``mls``,
+ ``simulated_annealing``, ``bayes_opt``
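+
+For strategies with limited parallelism, such as ``genetic_algorithm``, each generation of the population is evaluated as one parallel batch.
+The sketch below combines such a strategy with parallel tuning; the ``popsize`` option and its value of 16 are only an illustration, check the strategy documentation for the exact option names.
+
+.. code-block:: python
+
+ kernel_tuner.tune_kernel(
+ "vector_add",
+ kernel_string,
+ size,
+ args,
+ tune_params,
+ strategy="genetic_algorithm",
+ strategy_options={"popsize": 16},
+ parallel_workers=True,
+ )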
+
+
+
+Setting up Ray
+--------------
+
+Kernel Tuner uses `Ray <https://www.ray.io/>`_ to distribute kernel evaluations across multiple GPUs.
+Ray is an open-source framework for distributed computing in Python.
+
+To use parallel tuning, you must first install Ray itself:
+
+.. code-block:: bash
+
+ $ pip install ray
+
+Next, you must set up a Ray cluster.
+Kernel Tuner will internally attempt to connect to an existing cluster by calling:
+
+.. code-block:: python
+
+ ray.init(address="auto")
+
+Refer to the `Ray documentation <https://docs.ray.io/>`_ for details on how ``ray.init()`` connects to a local or remote cluster.
+For example, you can set the ``RAY_ADDRESS`` environment variable to point to the address of a remote Ray head node.
+Alternatively, you may manually call ``ray.init(address="your_head_node_ip:6379")`` before calling ``tune_kernel``.
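+
+A minimal sketch of the manual approach (the address below is a placeholder for your own head node):
+
+.. code-block:: python
+
+ import ray
+ import kernel_tuner
+
+ # connect to an already running Ray cluster before tuning
+ ray.init(address="your_head_node_ip:6379")
+
+ kernel_tuner.tune_kernel(
+ "vector_add",
+ kernel_string,
+ size,
+ args,
+ tune_params,
+ parallel_workers=True,
+ )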
+
+Here are some common ways to set up your cluster:
+
+
+Local multi-GPU machine
+***********************
+
+By default, on a machine with multiple GPUs, Ray will start a temporary local cluster and automatically detect all available GPUs.
+Kernel Tuner can then use these GPUs in parallel for tuning.
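+
+To check which resources Ray detects on the local machine, you can inspect the cluster resources yourself (a small sketch using only standard Ray calls):
+
+.. code-block:: python
+
+ import ray
+
+ # start (or connect to) a local Ray instance and report the detected GPUs
+ ray.init()
+ print("GPUs available to Ray:", ray.cluster_resources().get("GPU", 0))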
+
+
+Distributed cluster with SLURM (easy, Ray ≥2.49)
+************************************************
+
+The most straightforward way to use Ray on a SLURM cluster is to use the ``ray symmetric-run`` command, available from Ray **2.49** onwards.
+This launches a Ray environment, runs your script, and then shuts it down again.
+
+Consider the following script ``launch_ray.sh``.
+
+.. literalinclude:: launch_ray.sh
+ :language: bash
+
+Next, run your Kernel Tuner script using ``srun``.
+The exact command depends on your cluster.
+In the example below, ``-N4`` indicates 4 nodes and ``--gres=gpu:1`` indicates 1 GPU per node.
+
+.. code-block:: bash
+
+ $ srun -N4 --gres=gpu:1 launch_ray.sh python3 my_tuning_script.py
+
+
+Distributed cluster with SLURM (manual, Ray <2.49)
+**************************************************
+
+An alternative way to use Ray on SLURM is to launch a Ray cluster manually, obtain the IP address of the head node, and then connect to it remotely.
+
+Consider the following sbatch script ``submit_ray.sh``.
+
+.. literalinclude:: submit_ray.sh
+ :language: bash
+
+Next, submit your job using ``sbatch``.
+
+.. code-block:: bash
+
+ $ sbatch submit_ray.sh
+ Submitted batch job 1223577
+
+After this, inspect the file ``slurm-1223577.out`` and search for the following line:
+
+.. code-block:: bash
+
+ $ grep RAY_ADDRESS slurm-1223577.out
+ Launching head node: RAY_ADDRESS=145.184.221.164:6379
+
+Finally, launch your application using:
+
+.. code-block:: bash
+
+ RAY_ADDRESS=145.184.221.164:6379 python my_tuning_script.py
diff --git a/doc/source/parallel_runner.png b/doc/source/parallel_runner.png
new file mode 100644
index 00000000..30785a63
Binary files /dev/null and b/doc/source/parallel_runner.png differ
diff --git a/doc/source/submit_ray.sh b/doc/source/submit_ray.sh
new file mode 100644
index 00000000..63eec967
--- /dev/null
+++ b/doc/source/submit_ray.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+#SBATCH --time=00:10:00
+#SBATCH --nodes=2
+#SBATCH --ntasks-per-node=1
+#SBATCH --gpus-per-task=1
+set -euo pipefail
+
+HEAD_NODE=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n1)
+HEAD_NODE_IP=$(srun -N1 -n1 -w "$HEAD_NODE" bash -lc 'hostname -I | awk "{print \$1}"')
+RAY_PORT=6379
+RAY_ADDRESS="${HEAD_NODE_IP}:${RAY_PORT}"
+
+echo "Launching head node: RAY_ADDRESS=$RAY_ADDRESS"
+srun --nodes=1 --ntasks=1 -w "$HEAD_NODE" \
+ ray start --head --node-ip-address="$HEAD_NODE_IP" --port="$RAY_PORT" --block &
+sleep 5
+
+NUM_WORKERS=$((SLURM_JOB_NUM_NODES - 1))
+echo "Launching ${NUM_WORKERS} worker node(s)"
+if [[ "$NUM_WORKERS" -gt 0 ]]; then
+ srun -n "$NUM_WORKERS" --nodes="$NUM_WORKERS" --ntasks-per-node=1 --exclude "$HEAD_NODE" \
+ ray start --address "$RAY_ADDRESS" --block &
+fi
+
+# Keep job alive (or replace with running your workload on the head)
+wait
diff --git a/examples/cuda/sepconv_parallel.py b/examples/cuda/sepconv_parallel.py
new file mode 100644
index 00000000..074200e1
--- /dev/null
+++ b/examples/cuda/sepconv_parallel.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+import numpy
+from kernel_tuner import tune_kernel
+from collections import OrderedDict
+
+
+def tune():
+ with open("convolution.cu", "r") as f:
+ kernel_string = f.read()
+
+ # setup tunable parameters
+ tune_params = OrderedDict()
+ tune_params["filter_height"] = [i for i in range(3, 19, 2)]
+ tune_params["filter_width"] = [i for i in range(3, 19, 2)]
+ tune_params["block_size_x"] = [16 * i for i in range(1, 65)]
+ tune_params["block_size_y"] = [2**i for i in range(6)]
+ tune_params["tile_size_x"] = [i for i in range(1, 11)]
+ tune_params["tile_size_y"] = [i for i in range(1, 11)]
+
+ tune_params["use_padding"] = [0, 1] # toggle the insertion of padding in shared memory
+ tune_params["read_only"] = [0, 1] # toggle using the read-only cache
+
+ # limit the search to only use padding when it's effective, and at least 32 threads in a block
+ restrict = ["use_padding==0 or (block_size_x % 32 != 0)", "block_size_x*block_size_y >= 32"]
+
+ # setup input and output dimensions
+ problem_size = (4096, 4096)
+ size = numpy.prod(problem_size)
+ largest_fh = max(tune_params["filter_height"])
+ largest_fw = max(tune_params["filter_width"])
+ input_size = (problem_size[0] + largest_fw - 1) * (problem_size[1] + largest_fh - 1)
+
+ # create input data
+ output_image = numpy.zeros(size).astype(numpy.float32)
+ input_image = numpy.random.randn(input_size).astype(numpy.float32)
+ filter_weights = numpy.random.randn(largest_fh * largest_fw).astype(numpy.float32)
+
+ # setup kernel arguments
+ cmem_args = {"d_filter": filter_weights}
+ args = [output_image, input_image, filter_weights]
+
+ # tell the Kernel Tuner how to compute grid dimensions
+ grid_div_x = ["block_size_x", "tile_size_x"]
+ grid_div_y = ["block_size_y", "tile_size_y"]
+
+ # start tuning separable convolution (row)
+ tune_params["filter_height"] = [1]
+ tune_params["tile_size_y"] = [1]
+ results_row = tune_kernel(
+ "convolution_kernel",
+ kernel_string,
+ problem_size,
+ args,
+ tune_params,
+ grid_div_y=grid_div_y,
+ grid_div_x=grid_div_x,
+ cmem_args=cmem_args,
+ verbose=False,
+ restrictions=restrict,
+ parallel_workers=True,
+ cache="convolution_kernel_row",
+ )
+
+ # start tuning separable convolution (col)
+ tune_params["filter_height"] = tune_params["filter_width"][:]
+ tune_params["file_size_y"] = tune_params["tile_size_x"][:]
+ tune_params["filter_width"] = [1]
+ tune_params["tile_size_x"] = [1]
+ results_col = tune_kernel(
+ "convolution_kernel",
+ kernel_string,
+ problem_size,
+ args,
+ tune_params,
+ grid_div_y=grid_div_y,
+ grid_div_x=grid_div_x,
+ cmem_args=cmem_args,
+ verbose=False,
+ restrictions=restrict,
+ parallel_workers=True,
+ cache="convolution_kernel_col",
+ )
+
+ return results_row, results_col
+
+
+if __name__ == "__main__":
+ results_row, results_col = tune()
diff --git a/examples/cuda/vector_add_parallel.py b/examples/cuda/vector_add_parallel.py
new file mode 100644
index 00000000..8d35ce7c
--- /dev/null
+++ b/examples/cuda/vector_add_parallel.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+
+import numpy
+from kernel_tuner import tune_kernel
+
+
+def tune():
+ kernel_string = """
+ __global__ void vector_add(float *c, float *a, float *b, int n) {
+ int i = (blockIdx.x * block_size_x) + threadIdx.x;
+ if ( i < n ) {
+ c[i] = a[i] + b[i];
+ }
+ }
+ """
+
+ size = 10000000
+
+ a = numpy.random.randn(size).astype(numpy.float32)
+ b = numpy.random.randn(size).astype(numpy.float32)
+ c = numpy.zeros_like(b)
+ n = numpy.int32(size)
+
+ args = [c, a, b, n]
+
+ tune_params = dict()
+ tune_params["block_size_x"] = [32 * i for i in range(1, 33)]
+
+ results, env = tune_kernel("vector_add", kernel_string, size, args, tune_params, parallel_workers=True)
+ print(env)
+ return results
+
+
+if __name__ == "__main__":
+ tune()
diff --git a/kernel_tuner/backends/cupy.py b/kernel_tuner/backends/cupy.py
index 51613be7..87ba1514 100644
--- a/kernel_tuner/backends/cupy.py
+++ b/kernel_tuner/backends/cupy.py
@@ -73,6 +73,7 @@ def __init__(self, device=0, iterations=7, compiler_options=None, observers=None
s.split(":")[0].strip(): s.split(":")[1].strip() for s in cupy_info
}
env["device_name"] = info_dict[f"Device {device} Name"]
+ env["pci_bus_id"] = info_dict[f"Device {device} PCI Bus ID"]
env["cuda_version"] = cp.cuda.runtime.driverGetVersion()
env["compute_capability"] = self.cc
diff --git a/kernel_tuner/backends/nvcuda.py b/kernel_tuner/backends/nvcuda.py
index 6729e683..c151e6a9 100644
--- a/kernel_tuner/backends/nvcuda.py
+++ b/kernel_tuner/backends/nvcuda.py
@@ -100,6 +100,7 @@ def __init__(self, device=0, iterations=7, compiler_options=None, observers=None
cuda_error_check(err)
env = dict()
env["device_name"] = device_properties.name.decode()
+ env["pci_bus_id"] = device_properties.pciBusID
env["cuda_version"] = driver.CUDA_VERSION
env["compute_capability"] = self.cc
env["iterations"] = self.iterations
diff --git a/kernel_tuner/backends/pycuda.py b/kernel_tuner/backends/pycuda.py
index c8f3e689..8f9326c2 100644
--- a/kernel_tuner/backends/pycuda.py
+++ b/kernel_tuner/backends/pycuda.py
@@ -139,6 +139,7 @@ def _finish_up():
# collect environment information
env = dict()
env["device_name"] = self.context.get_device().name()
+ env["pci_bus_id"] = self.context.get_device().pci_bus_id()
env["cuda_version"] = ".".join([str(i) for i in drv.get_version()])
env["compute_capability"] = self.cc
env["iterations"] = self.iterations
diff --git a/kernel_tuner/interface.py b/kernel_tuner/interface.py
index 0641eb7e..b05fd535 100644
--- a/kernel_tuner/interface.py
+++ b/kernel_tuner/interface.py
@@ -39,8 +39,6 @@
import kernel_tuner.util as util
from kernel_tuner.file_utils import get_input_file, get_t4_metadata, get_t4_results, import_class_from_file
from kernel_tuner.integration import get_objective_defaults
-from kernel_tuner.runners.sequential import SequentialRunner
-from kernel_tuner.runners.simulation import SimulationRunner
from kernel_tuner.searchspace import Searchspace
try:
@@ -476,6 +474,7 @@ def __deepcopy__(self, _):
),
("metrics", ("specifies user-defined metrics, please see :ref:`metrics`.", "dict")),
("simulation_mode", ("Simulate an auto-tuning search from an existing cachefile", "bool")),
+ ("parallel_workers", ("Set to `True` or an integer to enable parallel tuning. If set to an integer, this will be the number of parallel workers.", "int|bool")),
("observers", ("""A list of Observers to use during tuning, please see :ref:`observers`.""", "list")),
]
)
@@ -587,6 +586,7 @@ def tune_kernel(
cache=None,
metrics=None,
simulation_mode=False,
+ parallel_workers=None,
observers=None,
objective=None,
objective_higher_is_better=None,
@@ -616,18 +616,23 @@ def tune_kernel(
kernel_options = Options([(k, opts[k]) for k in _kernel_options.keys()])
tuning_options = Options([(k, opts[k]) for k in _tuning_options.keys()])
device_options = Options([(k, opts[k]) for k in _device_options.keys()])
- tuning_options["unique_results"] = {}
# copy some values from strategy_options
searchspace_construction_options = {}
+ max_fevals = None
+ time_limit = None
+
if strategy_options:
if "max_fevals" in strategy_options:
- tuning_options["max_fevals"] = strategy_options["max_fevals"]
+ max_fevals = strategy_options["max_fevals"]
+ tuning_options["max_fevals"] = max_fevals # TODO: Is this used?
if "time_limit" in strategy_options:
- tuning_options["time_limit"] = strategy_options["time_limit"]
+ time_limit = strategy_options["time_limit"]
+ tuning_options["time_limit"] = time_limit # TODO: Is this used?
if "searchspace_construction_options" in strategy_options:
searchspace_construction_options = strategy_options["searchspace_construction_options"]
+
# log the user inputs
logging.debug("tune_kernel called")
logging.debug("kernel_options: %s", util.get_config_string(kernel_options))
@@ -654,9 +659,22 @@ def tune_kernel(
strategy = brute_force
# select the runner for this job based on input
- selected_runner = SimulationRunner if simulation_mode else SequentialRunner
+ # TODO: we could use the "match case" syntax when removing support for 3.9
tuning_options.simulated_time = 0
- runner = selected_runner(kernelsource, kernel_options, device_options, iterations, observers)
+
+ if parallel_workers and simulation_mode:
+ raise ValueError("Enabling `parallel_workers` and `simulation_mode` together is not supported")
+ elif simulation_mode:
+ from kernel_tuner.runners.simulation import SimulationRunner
+ runner = SimulationRunner(kernelsource, kernel_options, device_options, iterations, observers)
+ elif parallel_workers:
+ from kernel_tuner.runners.parallel import ParallelRunner
+ num_workers = None if parallel_workers is True else parallel_workers
+ runner = ParallelRunner(kernelsource, kernel_options, device_options, tuning_options, iterations, observers, num_workers=num_workers)
+ else:
+ from kernel_tuner.runners.sequential import SequentialRunner
+ runner = SequentialRunner(kernelsource, kernel_options, device_options, iterations, observers)
+
# the user-specified function may or may not have an optional atol argument;
# we normalize it so that it always accepts atol.
@@ -672,33 +690,50 @@ def preprocess_cache(filepath):
# process cache
if cache:
cache = preprocess_cache(cache)
- util.process_cache(cache, kernel_options, tuning_options, runner)
+ tuning_options.cachefile = cache
+ tuning_options.cache = util.process_cache(cache, kernel_options, tuning_options, runner)
else:
- tuning_options.cache = {}
tuning_options.cachefile = None
+ tuning_options.cache = {}
# create search space
tuning_options.restrictions_unmodified = deepcopy(restrictions)
- searchspace = Searchspace(tune_params, restrictions, runner.dev.max_threads, **searchspace_construction_options)
+ device_info = runner.get_device_info()
+ searchspace = Searchspace(tune_params, restrictions, device_info.max_threads, **searchspace_construction_options)
+
restrictions = searchspace._modified_restrictions
tuning_options.restrictions = restrictions
+
if verbose:
print(f"Searchspace has {searchspace.size} configurations after restrictions.")
# register the times and raise an exception if the budget is exceeded
- if "time_limit" in tuning_options:
- tuning_options["startup_time"] = perf_counter() - start_overhead_time
- if tuning_options["startup_time"] > tuning_options["time_limit"]:
+ startup_time = perf_counter() - start_overhead_time
+
+ if time_limit is not None:
+ if startup_time > time_limit:
raise RuntimeError(
- f"The startup time of the tuning process ({tuning_options['startup_time']} seconds) has exceeded the time limit ({tuning_options['time_limit']} seconds). "
+ f"The startup time of the tuning process ({startup_time} seconds) has exceeded the time limit ({time_limit} seconds). "
"Please increase the time limit or decrease the size of the search space."
)
- tuning_options["start_time"] = perf_counter()
+
+ time_limit -= startup_time
+
+ if max_fevals is None or max_fevals > searchspace.size:
+ logging.info(f"evaluation limit has been adjusted from {max_fevals} to {searchspace.size} (search space size)")
+ max_fevals = searchspace.size
+
+ # Create the budget
+ tuning_options["budget"] = util.TuningBudget(time_limit, max_fevals)
+
# call the strategy to execute the tuning process
results = strategy.tune(searchspace, runner, tuning_options)
env = runner.get_environment(tuning_options)
+ # Shut down the runner
+ runner.shutdown()
+
# finished iterating over search space
if results: # checks if results is not empty
best_config = util.get_best_config(results, objective, objective_higher_is_better)
diff --git a/kernel_tuner/runners/parallel.py b/kernel_tuner/runners/parallel.py
new file mode 100644
index 00000000..0a8edcf8
--- /dev/null
+++ b/kernel_tuner/runners/parallel.py
@@ -0,0 +1,355 @@
+"""A specialized runner that tunes in parallel the parameter space."""
+from collections import deque
+import logging
+import socket
+from time import perf_counter
+from typing import List, Optional
+from kernel_tuner.core import DeviceInterface
+from kernel_tuner.interface import Options
+from kernel_tuner.runners.runner import Runner
+from kernel_tuner.util import (
+ BudgetExceededConfig,
+ ErrorConfig,
+ TuningBudget,
+ print_config_output,
+ process_metrics,
+ store_cache,
+)
+from datetime import datetime, timezone
+
+logger = logging.getLogger(__name__)
+
+try:
+ import ray
+except ImportError as e:
+ raise ImportError(f"unable to initialize the parallel runner: {e}") from e
+
+
+@ray.remote(num_gpus=1)
+class DeviceActor:
+ def __init__(
+ self, kernel_source, kernel_options, device_options, tuning_options, iterations, observers
+ ):
+ # detect language and create high-level device interface
+ self.dev = DeviceInterface(
+ kernel_source, iterations=iterations, observers=observers, **device_options
+ )
+
+ self.units = self.dev.units
+ self.quiet = device_options.quiet
+ self.kernel_source = kernel_source
+ self.warmed_up = not self.dev.requires_warmup
+ self.kernel_options = kernel_options
+ self.tuning_options = tuning_options
+
+ # move data to the GPU
+ self.gpu_args = self.dev.ready_argument_list(kernel_options.arguments)
+
+ def shutdown(self):
+ ray.actor.exit_actor()
+
+ def get_environment(self):
+ # Get the device properties
+ env = dict(self.dev.get_environment())
+
+ # Get the host name
+ env["host_name"] = socket.gethostname()
+
+ # Get info about the ray instance
+ ctx = ray.get_runtime_context()
+ env["ray"] = {
+ "node_id": ctx.get_node_id(),
+ "worker_id": ctx.get_worker_id(),
+ "actor_id": ctx.get_actor_id(),
+ }
+
+ return env
+
+ def run(self, key, element):
+ # TODO: logging.debug("sequential runner started for " + self.kernel_options.kernel_name)
+ params = dict(element)
+ result = None
+ warmup_time = 0
+
+ # attempt to warmup the GPU by running the first config in the parameter space and ignoring the result
+ if not self.warmed_up:
+ warmup_time = perf_counter()
+ self.dev.compile_and_benchmark(
+ self.kernel_source, self.gpu_args, params, self.kernel_options, self.tuning_options
+ )
+ self.warmed_up = True
+ warmup_time = 1e3 * (perf_counter() - warmup_time)
+
+ result = self.dev.compile_and_benchmark(
+ self.kernel_source, self.gpu_args, params, self.kernel_options, self.tuning_options
+ )
+
+ params.update(result)
+
+ params["timestamp"] = datetime.now(timezone.utc).isoformat()
+ params["ray_actor_id"] = ray.get_runtime_context().get_actor_id()
+ params["host_name"] = socket.gethostname()
+
+ # all visited configurations are added to results to provide a trace for optimization strategies
+ return key, params
+
+
+class DeviceActorState:
+ def __init__(self, index, actor):
+ self.index = index
+ self.actor = actor
+ self.running_jobs = []
+ self.maximum_running_jobs = 1
+ self.is_running = True
+ self.env = ray.get(actor.get_environment.remote())
+
+ def __repr__(self):
+ actor_id = self.env["ray"]["actor_id"]
+ host_name = self.env["host_name"]
+ return f"{self.index} ({host_name}, {actor_id})"
+
+ def shutdown(self):
+ if not self.is_running:
+ return
+
+ self.is_running = False
+
+ try:
+ self.actor.shutdown.remote()
+ except Exception:
+ logger.exception("Failed to request actor shutdown: %s", self)
+
+ def submit(self, key, config):
+ logger.info(f"job submitted to worker {self}: {key}")
+ job = self.actor.run.remote(key, config)
+ self.running_jobs.append(job)
+ return job
+
+ def is_available(self):
+ if not self.is_running:
+ return False
+
+ # Check for ready jobs, but do not block
+ ready_jobs, self.running_jobs = ray.wait(self.running_jobs, timeout=0)
+
+ for job in ready_jobs:
+ try:
+ key, _result = ray.get(job)
+ logger.info(f"job finished on worker {self}: {key}")
+ except Exception:
+ logger.exception(f"job failed on worker {self}")
+
+ # Available if this actor can now run another job
+ return len(self.running_jobs) < self.maximum_running_jobs
+
+
+class ParallelRunner(Runner):
+ def __init__(
+ self,
+ kernel_source,
+ kernel_options,
+ device_options,
+ tuning_options,
+ iterations,
+ observers,
+ num_workers=None,
+ ):
+ if not ray.is_initialized():
+ ray.init()
+
+ if num_workers is None:
+ num_workers = int(ray.cluster_resources().get("GPU", 0))
+
+ if num_workers == 0:
+ raise RuntimeError("failed to initialize parallel runner: no GPUs found")
+
+ if num_workers < 1:
+ raise RuntimeError(
+ f"failed to initialize parallel runner: invalid number of GPUs specified: {num_workers}"
+ )
+
+ self.workers = []
+
+ try:
+ # Start workers
+ for index in range(num_workers):
+ actor = DeviceActor.remote(
+ kernel_source,
+ kernel_options,
+ device_options,
+ tuning_options,
+ iterations,
+ observers,
+ )
+ worker = DeviceActorState(index, actor)
+ self.workers.append(worker)
+
+ logger.info(f"connected to worker {worker}")
+
+ # Check if all workers have the same device
+ device_names = {w.env.get("device_name") for w in self.workers}
+ if len(device_names) != 1:
+ raise RuntimeError(
+ f"failed to initialize parallel runner: workers have different devices: {sorted(device_names)}"
+ )
+ except:
+ # If an exception occurs, shut down the worker and reraise error
+ self.shutdown()
+ raise
+
+ self.device_name = device_names.pop()
+
+ # TODO: Get units from the device?
+ self.start_time = perf_counter()
+ # timing attributes expected by the strategies / cost function
+ self.last_strategy_start_time = self.start_time
+ self.last_strategy_time = 0
+ self.units = {"time": "ms"}
+ self.quiet = device_options.quiet
+
+ def get_device_info(self):
+ # TODO: Get this from the device?
+ return Options({"max_threads": 1024})
+
+ def get_environment(self, tuning_options):
+ return {"device_name": self.device_name, "workers": [w.env for w in self.workers]}
+
+ def shutdown(self):
+ for worker in self.workers:
+ try:
+ worker.shutdown()
+ except Exception as err:
+ logger.exception(f"error while shutting down worker {worker}")
+
+ def available_parallelism(self):
+ return len(self.workers)
+
+ def submit_jobs(self, jobs, budget: TuningBudget):
+ pending_jobs = deque(jobs)
+ running_jobs = []
+
+ while pending_jobs and not budget.is_done():
+ job_was_submitted = False
+
+ # If there is still work left, submit it now
+ for i, worker in enumerate(list(self.workers)):
+ if worker.is_available():
+ # Push worker to back of list
+ self.workers.pop(i)
+ self.workers.append(worker)
+
+ # Pop job and submit it
+ key, config = pending_jobs.popleft()
+ ref = worker.submit(key, config)
+ running_jobs.append(ref)
+
+ job_was_submitted = True
+ budget.add_evaluations(1)
+ break
+
+ # If no work was submitted, wait until a worker is available
+ if not job_was_submitted:
+ if not running_jobs:
+ raise RuntimeError("invalid state: no ray workers available")
+
+ ready_jobs, running_jobs = ray.wait(running_jobs, num_returns=1)
+
+ for result in ready_jobs:
+ yield ray.get(result)
+
+ # If there are still pending jobs, then the budget has been exceeded.
+ # We return `None` to indicate that no result is available for these jobs.
+ while pending_jobs:
+ key, _ = pending_jobs.popleft()
+ yield (key, None)
+
+ # Wait until running jobs complete
+ while running_jobs:
+ ready_jobs, running_jobs = ray.wait(running_jobs, num_returns=1)
+
+ for result in ready_jobs:
+ yield ray.get(result)
+
+ def run(self, parameter_space, tuning_options) -> List[Optional[dict]]:
+ metrics = tuning_options.metrics
+ objective = tuning_options.objective
+
+ jobs = [] # Jobs that need to be executed
+ results = [] # Results that will be returned at the end
+ key2index = dict() # Used to insert job result back into `results`
+
+ total_worker_time = 0
+
+ # Select jobs which are not in the cache
+ for index, config in enumerate(parameter_space):
+ params = dict(zip(tuning_options.tune_params.keys(), config))
+ key = ",".join([str(i) for i in config])
+
+ if key in tuning_options.cache:
+ params.update(tuning_options.cache[key])
+
+ # Simulate compile, verification, and benchmark time
+ tuning_options.budget.add_time(milliseconds=params["compile_time"])
+ tuning_options.budget.add_time(milliseconds=params["verification_time"])
+ tuning_options.budget.add_time(milliseconds=params["benchmark_time"])
+ results.append(params)
+ else:
+ assert key not in key2index, "duplicate jobs submitted"
+ key2index[key] = index
+
+ jobs.append((key, params))
+ results.append(None)
+
+
+ # Submit jobs and wait for them to finish
+ for key, result in self.submit_jobs(jobs, tuning_options.budget):
+ # `None` indicates that no result is available since the budget is exceeded.
+ # We can skip it, meaning that `results` contains `None`s for these entries
+ if result is None:
+ continue
+
+ # Store the result into the output array
+ results[key2index[key]] = result
+
+ # Collect total time spent by worker
+ total_worker_time += (
+ result["compile_time"] + result["verification_time"] + result["benchmark_time"]
+ )
+
+ if isinstance(result.get(objective), ErrorConfig):
+ logger.error("kernel configuration %s was skipped silently due to compile or runtime failure", key)
+
+ # print configuration to the console
+ print_config_output(
+ tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units
+ )
+
+ # add configuration to cache
+ store_cache(key, result, tuning_options.cachefile, tuning_options.cache)
+
+ total_time = 1000 * (perf_counter() - self.start_time)
+ self.start_time = perf_counter()
+
+ strategy_time = self.last_strategy_time
+ self.last_strategy_time = 0
+
+ runner_time = total_time - strategy_time
+ framework_time = max(runner_time * len(self.workers) - total_worker_time, 0)
+
+ num_valid_results = sum(bool(r) for r in results) # Count the number of valid results
+
+ # Post-process all the results
+ for result in results:
+ # Skip missing results
+ if not result:
+ continue
+
+ # Amortize the time over all the results
+ result["strategy_time"] = strategy_time / num_valid_results
+ result["framework_time"] = framework_time / num_valid_results
+
+ # only compute metrics on configs that have not errored
+ if not isinstance(result.get(objective), ErrorConfig):
+ result = process_metrics(result, metrics)
+
+ return results
diff --git a/kernel_tuner/runners/runner.py b/kernel_tuner/runners/runner.py
index 80ab3214..e95b7811 100644
--- a/kernel_tuner/runners/runner.py
+++ b/kernel_tuner/runners/runner.py
@@ -13,8 +13,19 @@ def __init__(
):
pass
+ def shutdown(self):
+ pass
+
+ def available_parallelism(self):
+ """ Gives an indication of how many jobs this runner can execute in parallel. """
+ return 1
+
+ @abstractmethod
+ def get_device_info(self):
+ pass
+
@abstractmethod
- def get_environment(self):
+ def get_environment(self, tuning_options):
pass
@abstractmethod
diff --git a/kernel_tuner/runners/sequential.py b/kernel_tuner/runners/sequential.py
index 5e53093b..1814cefe 100644
--- a/kernel_tuner/runners/sequential.py
+++ b/kernel_tuner/runners/sequential.py
@@ -20,15 +20,13 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob
:param kernel_options: A dictionary with all options for the kernel.
:type kernel_options: kernel_tuner.interface.Options
- :param device_options: A dictionary with all options for the device
- on which the kernel should be tuned.
+ :param device_options: A dictionary with all options for the device on which the kernel should be tuned.
:type device_options: kernel_tuner.interface.Options
- :param iterations: The number of iterations used for benchmarking
- each kernel instance.
+ :param iterations: The number of iterations used for benchmarking each kernel instance.
:type iterations: int
"""
- #detect language and create high-level device interface
+ # detect language and create high-level device interface
self.dev = DeviceInterface(kernel_source, iterations=iterations, observers=observers, **device_options)
self.units = self.dev.units
@@ -41,9 +39,12 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob
self.last_strategy_time = 0
self.kernel_options = kernel_options
- #move data to the GPU
+ # move data to the GPU
self.gpu_args = self.dev.ready_argument_list(kernel_options.arguments)
+ def get_device_info(self):
+ return self.dev
+
def get_environment(self, tuning_options):
return self.dev.get_environment()
@@ -53,21 +54,29 @@ def run(self, parameter_space, tuning_options):
:param parameter_space: The parameter space as an iterable.
:type parameter_space: iterable
- :param tuning_options: A dictionary with all options regarding the tuning
- process.
- :type tuning_options: kernel_tuner.iterface.Options
+ :param tuning_options: A dictionary with all options regarding the tuning process.
+ :type tuning_options: kernel_tuner.interface.Options
- :returns: A list of dictionaries for executed kernel configurations and their
- execution times.
- :rtype: dict())
+ :returns: A list of dictionaries for executed kernel configurations and their execution times.
+ :rtype: dict()
"""
- logging.debug('sequential runner started for ' + self.kernel_options.kernel_name)
+ logging.debug("sequential runner started for " + self.kernel_options.kernel_name)
results = []
+ # self.last_strategy_time is set by cost_func
+ strategy_time_per_config = self.last_strategy_time / len(parameter_space) if len(parameter_space) > 0 else 0
+
# iterate over parameter space
for element in parameter_space:
+ # If the time limit is exceeded, just skip this element. Add `None` to
+ # indicate to CostFunc that no result is available for this config.
+ if tuning_options.budget.is_done():
+ results.append(None)
+ continue
+
+ tuning_options.budget.add_evaluations(1)
params = dict(zip(tuning_options.tune_params.keys(), element))
result = None
@@ -77,33 +86,47 @@ def run(self, parameter_space, tuning_options):
x_int = ",".join([str(i) for i in element])
if tuning_options.cache and x_int in tuning_options.cache:
params.update(tuning_options.cache[x_int])
- params['compile_time'] = 0
- params['verification_time'] = 0
- params['benchmark_time'] = 0
+
+ # Simulate compile, verification, and benchmark time
+ tuning_options.budget.add_time(milliseconds=params["compile_time"])
+ tuning_options.budget.add_time(milliseconds=params["verification_time"])
+ tuning_options.budget.add_time(milliseconds=params["benchmark_time"])
else:
# attempt to warmup the GPU by running the first config in the parameter space and ignoring the result
if not self.warmed_up:
warmup_time = perf_counter()
- self.dev.compile_and_benchmark(self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options)
+ self.dev.compile_and_benchmark(
+ self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options
+ )
self.warmed_up = True
warmup_time = 1e3 * (perf_counter() - warmup_time)
- result = self.dev.compile_and_benchmark(self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options)
+ result = self.dev.compile_and_benchmark(
+ self.kernel_source, self.gpu_args, params, self.kernel_options, tuning_options
+ )
params.update(result)
if tuning_options.objective in result and isinstance(result[tuning_options.objective], ErrorConfig):
- logging.debug('kernel configuration was skipped silently due to compile or runtime failure')
+ logging.debug("kernel configuration was skipped silently due to compile or runtime failure")
# only compute metrics on configs that have not errored
if tuning_options.metrics and not isinstance(params.get(tuning_options.objective), ErrorConfig):
params = process_metrics(params, tuning_options.metrics)
# get the framework time by estimating based on other times
- total_time = 1000 * ((perf_counter() - self.start_time) - warmup_time)
- params['strategy_time'] = self.last_strategy_time
- params['framework_time'] = max(total_time - (params['compile_time'] + params['verification_time'] + params['benchmark_time'] + params['strategy_time']), 0)
- params['timestamp'] = str(datetime.now(timezone.utc))
+ total_time = 1000 * (perf_counter() - self.start_time) - warmup_time
+ params["strategy_time"] = strategy_time_per_config
+ params["framework_time"] = max(
+ total_time
+ - (
+ params["compile_time"]
+ + params["verification_time"]
+ + params["benchmark_time"]
+ ),
+ 0,
+ )
+ params["timestamp"] = str(datetime.now(timezone.utc))
self.start_time = perf_counter()
if result:
@@ -111,7 +134,7 @@ def run(self, parameter_space, tuning_options):
print_config_output(tuning_options.tune_params, params, self.quiet, tuning_options.metrics, self.units)
# add configuration to cache
- store_cache(x_int, params, tuning_options)
+ store_cache(x_int, params, tuning_options.cachefile, tuning_options.cache)
# all visited configurations are added to results to provide a trace for optimization strategies
results.append(params)
diff --git a/kernel_tuner/runners/simulation.py b/kernel_tuner/runners/simulation.py
index 9695879d..0c0affd7 100644
--- a/kernel_tuner/runners/simulation.py
+++ b/kernel_tuner/runners/simulation.py
@@ -16,11 +16,11 @@ class SimulationDevice(_SimulationDevice):
@property
def name(self):
- return self.env['device_name']
+ return self.env["device_name"]
@name.setter
def name(self, value):
- self.env['device_name'] = value
+ self.env["device_name"] = value
if not self.quiet:
print("Simulating: " + value)
@@ -40,12 +40,10 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob
:param kernel_options: A dictionary with all options for the kernel.
:type kernel_options: kernel_tuner.interface.Options
- :param device_options: A dictionary with all options for the device
- on which the kernel should be tuned.
+ :param device_options: A dictionary with all options for the device on which the kernel should be tuned.
:type device_options: kernel_tuner.interface.Options
- :param iterations: The number of iterations used for benchmarking
- each kernel instance.
+ :param iterations: The number of iterations used for benchmarking each kernel instance.
:type iterations: int
"""
self.quiet = device_options.quiet
@@ -56,14 +54,18 @@ def __init__(self, kernel_source, kernel_options, device_options, iterations, ob
self.kernel_options = kernel_options
self.start_time = perf_counter()
+ self.total_simulated_time = 0
self.last_strategy_start_time = self.start_time
self.last_strategy_time = 0
self.units = {}
+ def get_device_info(self):
+ return self.dev
+
def get_environment(self, tuning_options):
env = self.dev.get_environment()
env["simulation"] = True
- env["simulated_time"] = tuning_options.simulated_time
+ env["simulated_time"] = self.total_simulated_time
return env
def run(self, parameter_space, tuning_options):
@@ -72,64 +74,62 @@ def run(self, parameter_space, tuning_options):
:param parameter_space: The parameter space as an iterable.
:type parameter_space: iterable
- :param tuning_options: A dictionary with all options regarding the tuning
- process.
+ :param tuning_options: A dictionary with all options regarding the tuning process.
:type tuning_options: kernel_tuner.iterface.Options
- :returns: A list of dictionaries for executed kernel configurations and their
- execution times.
+ :returns: A list of dictionaries for executed kernel configurations and their execution times.
:rtype: dict()
"""
- logging.debug('simulation runner started for ' + self.kernel_options.kernel_name)
+ logging.debug("simulation runner started for " + self.kernel_options.kernel_name)
results = []
- # iterate over parameter space
+ # self.last_strategy_time is set by cost_func
+ strategy_time_per_config = self.last_strategy_time / len(parameter_space) if len(parameter_space) > 0 else 0
+
+ # iterate over parameter space
for element in parameter_space:
+ # Append `None` to indicate that the tuning budget has been exceeded
+ if tuning_options.budget.is_done():
+ results.append(None)
+ continue
+
# check if element is in the cache
- x_int = ",".join([str(i) for i in element])
- if tuning_options.cache and x_int in tuning_options.cache:
- result = tuning_options.cache[x_int].copy()
+ key = ",".join([str(i) for i in element])
+
+ if key in tuning_options.cache:
+ # Get from cache and create a copy
+ result = dict(tuning_options.cache[key])
# only compute metrics on configs that have not errored
if tuning_options.metrics and not isinstance(result.get(tuning_options.objective), util.ErrorConfig):
result = util.process_metrics(result, tuning_options.metrics)
-
- # Simulate behavior of sequential runner that when a configuration is
- # served from the cache by the sequential runner, the compile_time,
- # verification_time, and benchmark_time are set to 0.
- # This step is only performed in the simulation runner when a configuration
- # is served from the cache beyond the first timel. That is, when the
- # configuration is already counted towards the unique_results.
- # It is the responsibility of cost_func to add configs to unique_results.
- if x_int in tuning_options.unique_results:
-
- result['compile_time'] = 0
- result['verification_time'] = 0
- result['benchmark_time'] = 0
-
- else:
- # configuration is evaluated for the first time, print to the console
- util.print_config_output(tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units)
+ # print the configuration to the console
+ util.print_config_output(
+ tuning_options.tune_params, result, self.quiet, tuning_options.metrics, self.units
+ )
# Everything but the strategy time and framework time are simulated,
- # self.last_strategy_time is set by cost_func
- result['strategy_time'] = self.last_strategy_time
+ result["strategy_time"] = strategy_time_per_config
+
+ # Simulate the evaluation of this configuration
+ tuning_options.budget.add_evaluations(1)
+ tuning_options.budget.add_time(milliseconds=result["compile_time"])
+ tuning_options.budget.add_time(milliseconds=result["verification_time"])
+ tuning_options.budget.add_time(milliseconds=result["benchmark_time"])
try:
- simulated_time = result['compile_time'] + result['verification_time'] + result['benchmark_time']
- tuning_options.simulated_time += simulated_time
+ self.total_simulated_time += result["compile_time"] + result["verification_time"] + result["benchmark_time"]
except KeyError:
- if "time_limit" in tuning_options:
- raise RuntimeError(
- "Cannot use simulation mode with a time limit on a cache file that does not have full compile, verification, and benchmark timings on all configurations"
- )
+ raise RuntimeError(
+ "Cannot use simulation mode with a time limit on a cache file that does not have full compile, verification, and benchmark timings on all configurations"
+ )
total_time = 1000 * (perf_counter() - self.start_time)
self.start_time = perf_counter()
- result['framework_time'] = total_time - self.last_strategy_time
+ result["framework_time"] = total_time
results.append(result)
continue
@@ -142,11 +142,11 @@ def run(self, parameter_space, tuning_options):
result['compile_time'] = 0
result['verification_time'] = 0
result['benchmark_time'] = 0
- result['strategy_time'] = self.last_strategy_time
+ result['strategy_time'] = strategy_time_per_config
total_time = 1000 * (perf_counter() - self.start_time)
self.start_time = perf_counter()
- result['framework_time'] = total_time - self.last_strategy_time
+ result['framework_time'] = total_time
result[tuning_options.objective] = util.InvalidConfig()
results.append(result)
diff --git a/kernel_tuner/strategies/bayes_opt.py b/kernel_tuner/strategies/bayes_opt.py
index a814e7ce..64d4c623 100644
--- a/kernel_tuner/strategies/bayes_opt.py
+++ b/kernel_tuner/strategies/bayes_opt.py
@@ -455,6 +455,33 @@ def fit_observations_to_model(self):
"""Update the model based on the current list of observations."""
self.__model.fit(self.__valid_params, self.__valid_observations)
+ def evaluate_parallel_objective_function(self, param_configs: list[tuple]) -> list[float]:
+ """Evaluates the objective function for multiple configurations in parallel."""
+ results = []
+ valid_param_configs = []
+ valid_indices = []
+
+ # Extract the valid configurations
+ for param_config in param_configs:
+ param_config = self.unprune_param_config(param_config)
+ denormalized_param_config = self.denormalize_param_config(param_config)
+ if not self.__searchspace_obj.is_param_config_valid(denormalized_param_config):
+ results.append(self.invalid_value)
+ else:
+ valid_indices.append(len(results))
+ results.append(None)
+ valid_param_configs.append(param_config)
+
+ # Run valid configurations in parallel
+ scores = self.cost_func.eval_all(valid_param_configs)
+
+ # Put the scores at the right location in the result
+ for idx, score in zip(valid_indices, scores):
+ results[idx] = score
+
+ self.fevals += len(scores)
+ return results
+
def evaluate_objective_function(self, param_config: tuple) -> float:
"""Evaluates the objective function."""
param_config = self.unprune_param_config(param_config)
diff --git a/kernel_tuner/strategies/common.py b/kernel_tuner/strategies/common.py
index b51274ce..0964597d 100644
--- a/kernel_tuner/strategies/common.py
+++ b/kernel_tuner/strategies/common.py
@@ -84,102 +84,174 @@ def __init__(
scaling: whether to internally scale parameter values. Defaults to False.
snap: whether to snap given configurations to their closests equivalent in the space. Defaults to True.
return_invalid: whether to return the util.ErrorConfig of an invalid configuration. Defaults to False.
- return_raw: returns (result, results[raw]). Key inferred from objective if set to True. Defaults to None.
"""
self.searchspace = searchspace
self.tuning_options = tuning_options
- if isinstance(self.tuning_options, dict):
- self.tuning_options["max_fevals"] = min(
- tuning_options["max_fevals"] if "max_fevals" in tuning_options else np.inf, searchspace.size
- )
+ self.objective = tuning_options.objective
+ self.objective_higher_is_better = tuning_options.objective_higher_is_better
+ self.constraint_aware = bool(tuning_options.strategy_options.get("constraint_aware"))
self.runner = runner
self.scaling = scaling
self.snap = snap
self.return_invalid = return_invalid
- self.return_raw = return_raw
- if return_raw is True:
- self.return_raw = f"{tuning_options['objective']}s"
+ self.unique_results = dict()
self.results = []
self.budget_spent_fraction = 0.0
self.invalid_return_value = invalid_value
+ def _normalize_and_validate_config(self, x, check_restrictions=True):
+ # snap values in x to nearest actual value for each parameter, unscale x if needed
+ if self.snap:
+ if self.scaling:
+ config = unscale_and_snap_to_nearest(x, self.searchspace.tune_params, self.tuning_options.eps)
+ else:
+ config = snap_to_nearest_config(x, self.searchspace.tune_params)
+ else:
+ config = x
- def __call__(self, x, check_restrictions=True):
- """Cost function used by almost all strategies."""
+ is_legal = True
+
+ # else check if this is a legal (non-restricted) configuration
+ if check_restrictions:
+ is_legal = self.searchspace.is_param_config_valid(tuple(config))
+
+ # Attempt to repare the config
+ if not is_legal and self.constraint_aware:
+ # attempt to repair
+ new_config = unscale_and_snap_to_nearest_valid(x, config, self.searchspace, self.tuning_options.eps)
+
+ if new_config:
+ config = new_config
+ is_legal = True
+
+ return config, is_legal
+
+
+ def _run_configs(self, xs, check_restrictions=True):
+ """ Takes a list of Euclidian coordinates and evaluates the configurations at those points. """
self.runner.last_strategy_time = 1000 * (perf_counter() - self.runner.last_strategy_start_time)
+ self.runner.start_time = perf_counter() # start framework time
# error value to return for numeric optimizers that need a numerical value
logging.debug("_cost_func called")
- logging.debug("x: %s", str(x))
# check if max_fevals is reached or time limit is exceeded
- self.budget_spent_fraction = util.check_stop_criterion(self.tuning_options)
+ self.tuning_options.budget.raise_exception_if_done()
+
+ batch_configs = [] # The configs to run
+ batch_keys = [] # The keys of the configs to run
+ pending_indices_by_key = dict() # Maps key => where to store result in `final_results`
+ final_results = [] # List returned to the user
+ legal_indices = [] # Indices in `final_results` that are legal
+
+ # Loop over all configurations. For each configuration there are four cases:
+ # 1. The configuration is invalid, we can skip it
+ # 2. The configuration is in `unique_results`, we can get it from there
+ # 3. The configuration is in `pending_indices_by_key`, it is a duplicate in `xs`
+ # 4. The configuration must be evaluated by the runner.
+ for index, x in enumerate(xs):
+ config, is_legal = self._normalize_and_validate_config(x, check_restrictions=check_restrictions)
+ logging.debug("normalize config: %s -> %s (legal: %s)", str(x), str(config), is_legal)
+ key = ",".join([str(i) for i in config])
+
+ # 1. Not legal, just return `InvalidConfig`
+ if not is_legal:
+ result = dict(zip(self.searchspace.tune_params.keys(), config))
+ result[self.objective] = util.InvalidConfig()
+ final_results.append(result)
+
+ # 2. Attempt to retrieve from `unique_results`
+ elif key in self.unique_results:
+ result = dict(self.unique_results[key])
+ legal_indices.append(index)
+ final_results.append(result)
+
+ # 3. We have already seen this config in the current batch
+ elif key in pending_indices_by_key:
+ pending_indices_by_key[key].append(index)
+ final_results.append(None)
+
+ # 4. A new config, we must evaluate this
+ else:
+ batch_keys.append(key)
+ batch_configs.append(config)
+ pending_indices_by_key[key] = [index]
+ final_results.append(None)
- # snap values in x to nearest actual value for each parameter, unscale x if needed
- if self.snap:
- if self.scaling:
- params = unscale_and_snap_to_nearest(x, self.searchspace.tune_params, self.tuning_options.eps)
+ # compile and benchmark the batch
+ batch_results = self.runner.run(batch_configs, self.tuning_options)
+
+ for key, result in zip(batch_keys, batch_results):
+ # Skip. Result is missing because the runner has exhausted the budget
+ if result is None:
+ continue
+
+ # set in the results array
+ for index in pending_indices_by_key[key]:
+ legal_indices.append(index)
+ final_results[index] = dict(result)
+
+ # Disable the timings. Only the first result must get these.
+ result["compile_time"] = 0
+ result["verification_time"] = 0
+ result["benchmark_time"] = 0
+
+ # Put result in `unique_results`
+ self.unique_results[key] = result
+
+ # Only things in `legal_indices` are valid results
+ for index in sorted(legal_indices):
+ self.results.append(final_results[index])
+
+ # upon returning from this function control will be given back to the strategy, so reset the start time
+ self.runner.last_strategy_start_time = perf_counter()
+
+ # this check is necessary because some strategies cannot handle partially completed requests
+ # for example when only half of the configs in a population have been evaluated
+ self.tuning_options.budget.raise_exception_if_done()
+ self.budget_spent_fraction = self.tuning_options.budget.get_fraction_consumed()
+
+ # If some results are missing (`None`), then the runner did not return all results
+ # because the budget has been exceed or some other reason causing the runner to fail.
+ if not all(final_results):
+ raise util.StopCriterionReached("runner did not evaluate all given configurations")
+
+ return final_results
+
+ def eval_all(self, xs, check_restrictions=True):
+ """Cost function used by almost all strategies."""
+ results = self._run_configs(xs, check_restrictions=check_restrictions)
+ return_values = []
+
+ for result in results:
+ # get numerical return value, taking optimization direction into account
+ return_value = result[self.objective]
+
+ if not isinstance(return_value, util.ErrorConfig):
+ # this is a valid configuration, so invert value in case of maximization
+ if self.objective_higher_is_better:
+ return_value = -return_value
else:
- params = snap_to_nearest_config(x, self.searchspace.tune_params)
- else:
- params = x
- logging.debug("params %s", str(params))
+ # this is not a valid configuration, replace with the configured invalid value if needed
+ if not self.return_invalid:
+ return_value = self.invalid_return_value
- legal = True
- result = {}
- x_int = ",".join([str(i) for i in params])
+ # include raw data in return if requested
+ return_values.append(return_value)
- # else check if this is a legal (non-restricted) configuration
- if check_restrictions and self.searchspace.restrictions:
- legal = self.searchspace.is_param_config_valid(tuple(params))
-
-
- if not legal:
- if "constraint_aware" in self.tuning_options.strategy_options and self.tuning_options.strategy_options["constraint_aware"]:
- # attempt to repair
- new_params = unscale_and_snap_to_nearest_valid(x, params, self.searchspace, self.tuning_options.eps)
- if new_params:
- params = new_params
- legal = True
- x_int = ",".join([str(i) for i in params])
-
- if not legal:
- params_dict = dict(zip(self.searchspace.tune_params.keys(), params))
- result = params_dict
- result[self.tuning_options.objective] = util.InvalidConfig()
-
- if legal:
- # compile and benchmark this instance
- res = self.runner.run([params], self.tuning_options)
- result = res[0]
-
- # append to tuning results
- if x_int not in self.tuning_options.unique_results:
- self.tuning_options.unique_results[x_int] = result
-
- self.results.append(result)
-
- # upon returning from this function control will be given back to the strategy, so reset the start time
- self.runner.last_strategy_start_time = perf_counter()
-
- # get numerical return value, taking optimization direction into account
- return_value = result[self.tuning_options.objective]
- if not isinstance(return_value, util.ErrorConfig):
- # this is a valid configuration, so invert value in case of maximization
- return_value = -return_value if self.tuning_options.objective_higher_is_better else return_value
- else:
- # this is not a valid configuration, replace with float max if needed
- if not self.return_invalid:
- return_value = self.invalid_return_value
+ return return_values
- # include raw data in return if requested
- if self.return_raw is not None:
- try:
- return return_value, result[self.return_raw]
- except KeyError:
- return return_value, [np.nan]
+ def eval(self, x, check_restrictions=True):
+ return self.eval_all([x], check_restrictions=check_restrictions)[0]
- return return_value
+ def __call__(self, x, check_restrictions=True):
+ return self.eval(x, check_restrictions=check_restrictions)
+
+ def get_results(self):
+ return self.results
+
+ def get_num_unique_results(self):
+ return len(self.unique_results)
def get_start_pos(self):
"""Get starting position for optimization."""
@@ -234,6 +306,11 @@ def get_bounds(self):
return bounds
+def _get_nth_true(lst, n):
+ # Returns the index of the nth True value in a list
+ return [i for i, x in enumerate(lst) if x][n-1]
+
+
def setup_method_arguments(method, bounds):
"""Prepare method specific arguments."""
kwargs = {}
diff --git a/kernel_tuner/strategies/diff_evo.py b/kernel_tuner/strategies/diff_evo.py
index 29577581..ad1cd57c 100644
--- a/kernel_tuner/strategies/diff_evo.py
+++ b/kernel_tuner/strategies/diff_evo.py
@@ -115,7 +115,7 @@ def generate_population(tune_params, max_idx, popsize, searchspace, constraint_a
return population
-def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F, CR, method, constraint_aware, verbose):
+def differential_evolution(searchspace, cost_func: CostFunc, bounds, popsize, maxiter, F, CR, method, constraint_aware, verbose):
"""
A basic implementation of the Differential Evolution algorithm.
@@ -139,7 +139,7 @@ def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F,
population[0] = cost_func.get_start_pos()
# Calculate the initial cost for each individual in the population
- population_cost = np.array([cost_func(ind) for ind in population])
+ population_cost = np.array(cost_func.eval_all(population))
# Keep track of the best solution found so far
best_idx = np.argmin(population_cost)
@@ -208,7 +208,7 @@ def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F,
# --- c. Selection ---
# Calculate the cost of the new trial vectors
- trial_population_cost = np.array([cost_func(ind) for ind in trial_population])
+ trial_population_cost = np.array(cost_func.eval_all(trial_population))
# Keep track of whether population changes over time
no_change = True
@@ -244,7 +244,7 @@ def differential_evolution(searchspace, cost_func, bounds, popsize, maxiter, F,
print(f"Generation {generation + 1}, Best Cost: {best_cost:.6f}")
if verbose:
- print(f"Differential Evolution completed fevals={len(cost_func.tuning_options.unique_results)}")
+ print(f"Differential Evolution completed fevals={cost_func.get_num_unique_results()}")
return {"solution": best_solution, "cost": best_cost}
diff --git a/kernel_tuner/strategies/firefly_algorithm.py b/kernel_tuner/strategies/firefly_algorithm.py
index a732d404..861c5f86 100644
--- a/kernel_tuner/strategies/firefly_algorithm.py
+++ b/kernel_tuner/strategies/firefly_algorithm.py
@@ -44,13 +44,14 @@ def tune(searchspace: Searchspace, runner, tuning_options):
swarm[0].position = x0
# compute initial intensities
- for j in range(num_particles):
- try:
+ try:
+ for j in range(num_particles):
swarm[j].compute_intensity(cost_func)
- except StopCriterionReached as e:
- if tuning_options.verbose:
- print(e)
- return cost_func.results
+ except StopCriterionReached as e:
+ if tuning_options.verbose:
+ print(e)
+ return cost_func.results
+ for j in range(num_particles):
if swarm[j].score <= best_score_global:
best_position_global = swarm[j].position
best_score_global = swarm[j].score
diff --git a/kernel_tuner/strategies/genetic_algorithm.py b/kernel_tuner/strategies/genetic_algorithm.py
index 804758ee..fa1f6cc9 100644
--- a/kernel_tuner/strategies/genetic_algorithm.py
+++ b/kernel_tuner/strategies/genetic_algorithm.py
@@ -43,19 +43,17 @@ def tune(searchspace: Searchspace, runner, tuning_options):
# determine fitness of population members
weighted_population = []
- for dna in population:
- try:
- # if we are not constraint-aware we should check restrictions upon evaluation
- time = cost_func(dna, check_restrictions=not constraint_aware)
- num_evaluated += 1
- except StopCriterionReached as e:
- if tuning_options.verbose:
- print(e)
- return cost_func.results
-
- weighted_population.append((dna, time))
+ try:
+ # if we are not constraint-aware we should check restrictions upon evaluation
+ times = cost_func.eval_all(population, check_restrictions=not constraint_aware)
+ num_evaluated += len(population)
+ except StopCriterionReached as e:
+ if tuning_options.verbose:
+ print(e)
+ return cost_func.results
# population is sorted such that better configs have higher chance of reproducing
+ weighted_population = list(zip(population, times))
weighted_population.sort(key=lambda x: x[1])
# 'best_score' is used only for printing
diff --git a/kernel_tuner/strategies/greedy_ils.py b/kernel_tuner/strategies/greedy_ils.py
index d9cf67ec..fde8bb13 100644
--- a/kernel_tuner/strategies/greedy_ils.py
+++ b/kernel_tuner/strategies/greedy_ils.py
@@ -37,16 +37,15 @@ def tune(searchspace: Searchspace, runner, tuning_options):
last_improvement = 0
while fevals < max_fevals:
-
try:
- candidate = base_hillclimb(candidate, neighbor, max_fevals, searchspace, tuning_options, cost_func, restart=restart, randomize=True)
+ candidate = base_hillclimb(candidate, neighbor, max_fevals, searchspace, cost_func, restart=restart, randomize=True)
new_score = cost_func(candidate, check_restrictions=False)
except StopCriterionReached as e:
if tuning_options.verbose:
print(e)
return cost_func.results
- fevals = len(tuning_options.unique_results)
+ fevals = cost_func.get_num_unique_results()
if new_score < best_score:
last_improvement = 0
else:
diff --git a/kernel_tuner/strategies/greedy_mls.py b/kernel_tuner/strategies/greedy_mls.py
index 4edd2f0a..dd02ff44 100644
--- a/kernel_tuner/strategies/greedy_mls.py
+++ b/kernel_tuner/strategies/greedy_mls.py
@@ -29,15 +29,14 @@ def tune(searchspace: Searchspace, runner, tuning_options):
#while searching
while fevals < max_fevals:
try:
- base_hillclimb(candidate, neighbor, max_fevals, searchspace, tuning_options, cost_func, restart=restart, randomize=randomize, order=order)
+ base_hillclimb(candidate, neighbor, max_fevals, searchspace, cost_func, restart=restart, randomize=randomize, order=order)
except StopCriterionReached as e:
if tuning_options.verbose:
print(e)
return cost_func.results
candidate = searchspace.get_random_sample(1)[0]
-
- fevals = len(tuning_options.unique_results)
+ fevals = cost_func.get_num_unique_results()
return cost_func.results
diff --git a/kernel_tuner/strategies/hillclimbers.py b/kernel_tuner/strategies/hillclimbers.py
index 120bc926..a4632801 100644
--- a/kernel_tuner/strategies/hillclimbers.py
+++ b/kernel_tuner/strategies/hillclimbers.py
@@ -4,7 +4,7 @@
from kernel_tuner.strategies.common import CostFunc
-def base_hillclimb(base_sol: tuple, neighbor_method: str, max_fevals: int, searchspace: Searchspace, tuning_options,
+def base_hillclimb(base_sol: tuple, neighbor_method: str, max_fevals: int, searchspace: Searchspace,
cost_func: CostFunc, restart=True, randomize=True, order=None):
"""Hillclimbing search until max_fevals is reached or no improvement is found.
@@ -25,10 +25,6 @@ def base_hillclimb(base_sol: tuple, neighbor_method: str, max_fevals: int, searc
:params searchspace: The searchspace object.
:type searchspace: Seachspace
- :param tuning_options: A dictionary with all options regarding the tuning
- process.
- :type tuning_options: dict
-
:param cost_func: An instance of `kernel_tuner.strategies.common.CostFunc`
:type runner: kernel_tuner.strategies.common.CostFunc
@@ -72,33 +68,39 @@ def base_hillclimb(base_sol: tuple, neighbor_method: str, max_fevals: int, searc
if randomize:
random.shuffle(indices)
+ children = []
+
# in each dimension see the possible values
for index in indices:
neighbors = searchspace.get_param_neighbors(tuple(child), index, neighbor_method, randomize)
# for each value in this dimension
for val in neighbors:
- orig_val = child[index]
+ child = list(child)
child[index] = val
+ children.append(child)
+ if restart:
+ for child in children:
# get score for this position
score = cost_func(child)
- # generalize this to other tuning objectives
if score < best_score:
best_score = score
base_sol = child[:]
found_improved = True
- if restart:
- break
- else:
- child[index] = orig_val
+ break
+ else:
+ # get score for all positions in parallel
+ scores = cost_func.eval_all(children, check_restrictions=False)
- fevals = len(tuning_options.unique_results)
- if fevals >= max_fevals:
- return base_sol
+ for child, score in zip(children, scores):
+ if score < best_score:
+ best_score = score
+ base_sol = child[:]
+ found_improved = True
- if found_improved and restart:
- break
+ if found_improved and restart:
+ break
return base_sol
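
For readability, a standalone sketch (not part of the patch) of the two neighbour-evaluation modes the rewritten ``base_hillclimb`` switches between; ``cost_func`` is any object exposing the ``__call__``/``eval_all`` interface above, and the helper name is hypothetical.

.. code-block:: python

    # Illustrative only: greedy first-improvement vs. batched neighbourhood evaluation.
    def evaluate_neighbours(children, cost_func, best_score, restart=True):
        """Return (best_score, best_child, improved) over a list of candidate configs."""
        best_child, improved = None, False
        if restart:
            # sequential first-improvement: stop at the first better neighbour
            for child in children:
                score = cost_func(child)
                if score < best_score:
                    return score, child, True
        else:
            # evaluate the whole neighbourhood in one (possibly parallel) batch
            scores = cost_func.eval_all(children, check_restrictions=False)
            for child, score in zip(children, scores):
                if score < best_score:
                    best_score, best_child, improved = score, child, True
        return best_score, best_child, improved
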
diff --git a/kernel_tuner/strategies/pso.py b/kernel_tuner/strategies/pso.py
index e8489d12..eefbc866 100644
--- a/kernel_tuner/strategies/pso.py
+++ b/kernel_tuner/strategies/pso.py
@@ -51,24 +51,26 @@ def tune(searchspace: Searchspace, runner, tuning_options):
if tuning_options.verbose:
print("start iteration ", i, "best time global", best_score_global)
+ try:
+ scores = cost_func.eval_all([p.position for p in swarm])
+ except StopCriterionReached as e:
+ if tuning_options.verbose:
+ print(e)
+ return cost_func.results
+
# evaluate particle positions
- for j in range(num_particles):
- try:
- swarm[j].evaluate(cost_func)
- except StopCriterionReached as e:
- if tuning_options.verbose:
- print(e)
- return cost_func.results
+ for p, score in zip(swarm, scores):
+ p.set_score(score)
# update global best if needed
- if swarm[j].score <= best_score_global:
- best_position_global = swarm[j].position
- best_score_global = swarm[j].score
+ if score <= best_score_global:
+ best_position_global = p.position
+ best_score_global = score
# update particle velocities and positions
- for j in range(0, num_particles):
- swarm[j].update_velocity(best_position_global, w, c1, c2)
- swarm[j].update_position(bounds)
+ for p in swarm:
+ p.update_velocity(best_position_global, w, c1, c2)
+ p.update_position(bounds)
if tuning_options.verbose:
print("Final result:")
@@ -92,7 +94,10 @@ def __init__(self, bounds):
self.score = sys.float_info.max
def evaluate(self, cost_func):
- self.score = cost_func(self.position)
+ self.set_score(cost_func(self.position))
+
+ def set_score(self, score):
+ self.score = score
# update best_pos if needed
if self.score < self.best_score:
self.best_pos = self.position
diff --git a/kernel_tuner/strategies/pyatf_strategies.py b/kernel_tuner/strategies/pyatf_strategies.py
index d0d67778..1b82391c 100644
--- a/kernel_tuner/strategies/pyatf_strategies.py
+++ b/kernel_tuner/strategies/pyatf_strategies.py
@@ -85,7 +85,7 @@ def tune(searchspace: Searchspace, runner, tuning_options):
try:
# optimization loop (KT-compatible re-implementation of `make_step` from TuningRun)
- while len(tuning_options.unique_results) < searchspace.size:
+ while cost_func.get_num_unique_results() < searchspace.size:
# get new coordinates
if not coordinates_or_indices:
diff --git a/kernel_tuner/strategies/random_sample.py b/kernel_tuner/strategies/random_sample.py
index 33b5075d..19440149 100644
--- a/kernel_tuner/strategies/random_sample.py
+++ b/kernel_tuner/strategies/random_sample.py
@@ -20,16 +20,13 @@ def tune(searchspace: Searchspace, runner, tuning_options):
num_samples = min(tuning_options.max_fevals, searchspace.size)
samples = searchspace.get_random_sample(num_samples)
-
cost_func = CostFunc(searchspace, tuning_options, runner)
- for sample in samples:
- try:
- cost_func(sample, check_restrictions=False)
- except StopCriterionReached as e:
- if tuning_options.verbose:
- print(e)
- return cost_func.results
+ try:
+ cost_func.eval_all(samples, check_restrictions=False)
+ except StopCriterionReached as e:
+ if tuning_options.verbose:
+ print(e)
return cost_func.results
diff --git a/kernel_tuner/strategies/simulated_annealing.py b/kernel_tuner/strategies/simulated_annealing.py
index 962a1e34..b73bf0d6 100644
--- a/kernel_tuner/strategies/simulated_annealing.py
+++ b/kernel_tuner/strategies/simulated_annealing.py
@@ -68,7 +68,7 @@ def tune(searchspace: Searchspace, runner, tuning_options):
pos = new_pos
old_cost = new_cost
- c = len(tuning_options.unique_results)
+ c = cost_func.get_num_unique_results()
T = T_start * alpha**(max_iter/max_fevals*c)
# check if solver gets stuck and if so restart from random position
diff --git a/kernel_tuner/util.py b/kernel_tuner/util.py
index 2c50bd6c..99c2a2f6 100644
--- a/kernel_tuner/util.py
+++ b/kernel_tuner/util.py
@@ -1,5 +1,6 @@
"""Module for kernel tuner utility functions."""
import ast
+from datetime import timedelta
import errno
import json
import logging
@@ -187,28 +188,75 @@ def check_argument_list(kernel_name, kernel_string, args):
warnings.warn(errors[0], UserWarning)
-def check_stop_criterion(to: dict) -> float:
- """Check if the stop criterion is reached.
+class TuningBudget:
+ def __init__(self, time_limit=None, max_fevals=None):
+ if time_limit is not None and not isinstance(time_limit, timedelta):
+ time_limit = timedelta(seconds=time_limit)
- Args:
- to (dict): tuning options.
+ if max_fevals is not None and max_fevals <= 0:
+ raise ValueError("max_fevals must be greater than zero")
+
+ if time_limit is not None and time_limit <= timedelta(seconds=0):
+ raise ValueError("time_limit must be greater than zero")
- Raises:
- StopCriterionReached: if the max_fevals is reached or time limit is exceeded.
+ self.start_time_seconds = time.perf_counter()
+ self.time_spent_extra = timedelta()
+ self.time_limit = time_limit
+ self.num_fevals = 0
+ self.max_fevals = max_fevals
+
+ def add_evaluations(self, n=1):
+ self.num_fevals += n
+
+ def add_time(self, seconds=0, milliseconds=0):
+ self.time_spent_extra += timedelta(seconds=seconds, milliseconds=milliseconds)
+
+ def get_time_spent(self) -> timedelta:
+ seconds_passed = time.perf_counter() - self.start_time_seconds
+ return timedelta(seconds=seconds_passed) + self.time_spent_extra
+
+ def get_time_remaining(self) -> timedelta:
+ if self.time_limit is not None:
+ return max(self.time_limit - self.get_time_spent(), timedelta(seconds=0))
+ else:
+ return timedelta.max
+
+ def get_evaluations_spent(self) -> int:
+ return self.num_fevals
+
+    def get_evaluations_remaining(self) -> float:
+ if self.max_fevals is not None:
+ return max(self.max_fevals - self.num_fevals, 0)
+ else:
+ return float("inf")
+
+ def is_done(self) -> bool:
+ if self.max_fevals is not None and self.num_fevals >= self.max_fevals:
+ return True
- Returns:
- float: fraction of budget spent. If both max_fevals and time_limit are set, it returns the fraction of time.
- """
- if "max_fevals" in to:
- if len(to.unique_results) >= to.max_fevals:
- raise StopCriterionReached(f"max_fevals ({to.max_fevals}) reached")
- if not "time_limit" in to:
- return len(to.unique_results) / to.max_fevals
- if "time_limit" in to:
- time_spent = (time.perf_counter() - to.start_time) + (to.simulated_time * 1e-3) + to.startup_time
- if time_spent > to.time_limit:
+ if self.time_limit is not None and self.get_time_spent() > self.time_limit:
+ return True
+
+ return False
+
+ def raise_exception_if_done(self):
+ if self.max_fevals is not None and self.num_fevals >= self.max_fevals:
+ raise StopCriterionReached(f"max_fevals ({self.max_fevals}) reached")
+
+ if self.time_limit is not None and self.get_time_spent() > self.time_limit:
raise StopCriterionReached("time limit exceeded")
- return time_spent / to.time_limit
+
+ def get_fraction_consumed(self) -> float:
+ if self.max_fevals is not None and self.time_limit is not None:
+ time_spent = self.get_time_spent()
+ return min(1.0, time_spent / self.time_limit, self.num_fevals / self.max_fevals)
+ elif self.max_fevals is not None:
+ return min(1.0, self.num_fevals / self.max_fevals)
+ elif self.time_limit is not None:
+ return min(1.0, self.get_time_spent() / self.time_limit)
+ else:
+ return 0.0
+
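
As a quick illustration of the new budget object (the tests added to ``test_util_functions.py`` further down exercise the same API), a minimal usage sketch; the 60-second/100-evaluation limits are arbitrary.

.. code-block:: python

    # Illustrative only: polling and exception-based use of TuningBudget.
    from kernel_tuner.util import StopCriterionReached, TuningBudget

    budget = TuningBudget(time_limit=60, max_fevals=100)

    while not budget.is_done():
        # ... benchmark one configuration here ...
        budget.add_evaluations(1)
        # simulated or startup time can be accounted for explicitly
        budget.add_time(milliseconds=250)
        print(f"budget consumed: {budget.get_fraction_consumed():.2f}")

    # alternatively, let the budget raise instead of polling is_done()
    try:
        budget.raise_exception_if_done()
    except StopCriterionReached as e:
        print(e)
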
@@ -684,17 +732,18 @@ def process_metrics(params, metrics):
:rtype: dict
"""
- if not isinstance(metrics, dict):
- raise ValueError("metrics should be a dictionary to preserve order and support composability")
- for k, v in metrics.items():
- if isinstance(v, str):
- value = eval(replace_param_occurrences(v, params))
- elif callable(v):
- value = v(params)
- else:
- raise ValueError("metric dicts values should be strings or callable")
- # We overwrite any existing values for the given key
- params[k] = value
+ if metrics is not None:
+ if not isinstance(metrics, dict):
+ raise ValueError("metrics should be a dictionary to preserve order and support composability")
+ for k, v in metrics.items():
+ if isinstance(v, str):
+ value = eval(replace_param_occurrences(v, params))
+ elif callable(v):
+ value = v(params)
+ else:
+ raise ValueError("metric dicts values should be strings or callable")
+ # We overwrite any existing values for the given key
+ params[k] = value
return params
@@ -1140,7 +1189,7 @@ def check_matching_problem_size(cached_problem_size, problem_size):
if cached_problem_size_arr.size != problem_size_arr.size or not (cached_problem_size_arr == problem_size_arr).all():
raise ValueError(f"Cannot load cache which contains results for different problem_size, cache: {cached_problem_size}, requested: {problem_size}")
-def process_cache(cache, kernel_options, tuning_options, runner):
+def process_cache(cachefile, kernel_options, tuning_options, runner):
"""Cache file for storing tuned configurations.
the cache file is stored using JSON and uses the following format:
@@ -1169,9 +1218,9 @@ def process_cache(cache, kernel_options, tuning_options, runner):
raise ValueError("Caching only works correctly when tunable parameters are stored in a dictionary")
# if file does not exist, create new cache
- if not os.path.isfile(cache):
+ if not os.path.isfile(cachefile):
if tuning_options.simulation_mode:
- raise ValueError(f"Simulation mode requires an existing cachefile: file {cache} does not exist")
+ raise ValueError(f"Simulation mode requires an existing cachefile: file {cachefile} does not exist")
c = dict()
c["device_name"] = runner.dev.name
@@ -1185,15 +1234,14 @@ def process_cache(cache, kernel_options, tuning_options, runner):
contents = json.dumps(c, cls=NpEncoder, indent="")[:-3] # except the last "}\n}"
# write the header to the cachefile
- with open(cache, "w") as cachefile:
- cachefile.write(contents)
+ with open(cachefile, "w") as f:
+ f.write(contents)
- tuning_options.cachefile = cache
- tuning_options.cache = {}
+ return {}
# if file exists
else:
- cached_data = read_cache(cache, open_cache=not tuning_options.simulation_mode)
+ cached_data = read_cache(cachefile, open_cache=not tuning_options.simulation_mode)
# if in simulation mode, use the device name from the cache file as the runner device name
if runner.simulation_mode:
@@ -1219,17 +1267,16 @@ def process_cache(cache, kernel_options, tuning_options, runner):
)
raise ValueError(
f"Cannot load cache which contains results obtained with different tunable parameters. \
- Cache at '{cache}' has: {cached_data['tune_params_keys']}, tuning_options has: {list(tuning_options.tune_params.keys())}"
+ Cache at '{cachefile}' has: {cached_data['tune_params_keys']}, tuning_options has: {list(tuning_options.tune_params.keys())}"
)
- tuning_options.cachefile = cache
- tuning_options.cache = cached_data["cache"]
+ return cached_data["cache"]
-def correct_open_cache(cache, open_cache=True):
+def correct_open_cache(cachefile, open_cache=True):
"""If cache file was not properly closed, pretend it was properly closed."""
- with open(cache, "r") as cachefile:
- filestr = cachefile.read().strip()
+ with open(cachefile, "r") as f:
+ filestr = f.read().strip()
# if file was not properly closed, pretend it was properly closed
if len(filestr) > 0 and filestr[-3:] not in ["}\n}", "}}}"]:
@@ -1241,15 +1288,15 @@ def correct_open_cache(cache, open_cache=True):
else:
if open_cache:
# if it was properly closed, open it for appending new entries
- with open(cache, "w") as cachefile:
- cachefile.write(filestr[:-3] + ",")
+ with open(cachefile, "w") as f:
+ f.write(filestr[:-3] + ",")
return filestr
-def read_cache(cache, open_cache=True):
+def read_cache(cachefile, open_cache=True):
"""Read the cachefile into a dictionary, if open_cache=True prepare the cachefile for appending."""
- filestr = correct_open_cache(cache, open_cache)
+ filestr = correct_open_cache(cachefile, open_cache)
error_configs = {
"InvalidConfig": InvalidConfig(),
@@ -1267,25 +1314,25 @@ def read_cache(cache, open_cache=True):
return cache_data
-def close_cache(cache):
- if not os.path.isfile(cache):
+def close_cache(cachefile):
+ if not os.path.isfile(cachefile):
raise ValueError("close_cache expects cache file to exist")
- with open(cache, "r") as fh:
+ with open(cachefile, "r") as fh:
contents = fh.read()
# close to file to make sure it can be read by JSON parsers
if contents[-1] == ",":
- with open(cache, "w") as fh:
+ with open(cachefile, "w") as fh:
fh.write(contents[:-1] + "}\n}")
-def store_cache(key, params, tuning_options):
+def store_cache(key, params, cachefile, cache):
"""Stores a new entry (key, params) to the cachefile."""
# logging.debug('store_cache called, cache=%s, cachefile=%s' % (tuning_options.cache, tuning_options.cachefile))
- if isinstance(tuning_options.cache, dict):
- if key not in tuning_options.cache:
- tuning_options.cache[key] = params
+ if isinstance(cache, dict):
+ if key not in cache:
+ cache[key] = params
# Convert ErrorConfig objects to string, wanted to do this inside the JSONconverter but couldn't get it to work
output_params = params.copy()
@@ -1293,9 +1340,9 @@ def store_cache(key, params, tuning_options):
if isinstance(v, ErrorConfig):
output_params[k] = str(v)
- if tuning_options.cachefile:
- with open(tuning_options.cachefile, "a") as cachefile:
- cachefile.write("\n" + json.dumps({key: output_params}, cls=NpEncoder)[1:-1] + ",")
+ if cachefile:
+ with open(cachefile, "a") as f:
+ f.write("\n" + json.dumps({key: output_params}, cls=NpEncoder)[1:-1] + ",")
def dump_cache(obj: str, tuning_options):
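
Finally, since ``process_cache`` now returns the cache dictionary instead of attaching it to ``tuning_options``, a short sketch of the new calling convention (mirroring the test changes below); the wrapper function and cache key are hypothetical.

.. code-block:: python

    # Illustrative only: the cache dict is returned by process_cache() and
    # passed explicitly to store_cache(), rather than living on tuning_options.
    from kernel_tuner.util import close_cache, process_cache, store_cache


    def record_one_entry(cachefile, kernel_options, tuning_options, runner):
        # returns {} for a fresh file, or the parsed entries of an existing one
        cache = process_cache(cachefile, kernel_options, tuning_options, runner)

        # append one benchmarked configuration to both the dict and the file
        params = {"block_size_x": 128, "time": 0.1234}
        store_cache("128", params, cachefile, cache)

        # terminate the JSON so other tools can read the file
        close_cache(cachefile)
        return cache
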
diff --git a/test/strategies/test_bayesian_optimization.py b/test/strategies/test_bayesian_optimization.py
index f8c889aa..5c7bab32 100644
--- a/test/strategies/test_bayesian_optimization.py
+++ b/test/strategies/test_bayesian_optimization.py
@@ -19,6 +19,8 @@
strategy_options = dict(popsize=0, max_fevals=10)
tuning_options = Options(dict(restrictions=[], tune_params=tune_params, strategy_options=strategy_options))
tuning_options["scaling"] = True
+tuning_options["objective"] = "time"
+tuning_options["objective_higher_is_better"] = False
tuning_options["snap"] = True
max_threads = 1024
searchspace = Searchspace(tune_params, [], max_threads)
diff --git a/test/strategies/test_common.py b/test/strategies/test_common.py
index 90f6c63e..54ecde6f 100644
--- a/test/strategies/test_common.py
+++ b/test/strategies/test_common.py
@@ -7,7 +7,7 @@
from kernel_tuner.searchspace import Searchspace
from kernel_tuner.strategies import common
from kernel_tuner.strategies.common import CostFunc
-from kernel_tuner.util import StopCriterionReached
+from kernel_tuner.util import StopCriterionReached, TuningBudget
try:
from mock import Mock
@@ -30,7 +30,7 @@ def fake_runner():
def test_cost_func():
x = [1, 4]
- tuning_options = Options(scaling=False, snap=False, tune_params=tune_params,
+ tuning_options = Options(tune_params=tune_params, budget=TuningBudget(),
restrictions=None, strategy_options={}, cache={}, unique_results={},
objective="time", objective_higher_is_better=False, metrics=None)
runner = fake_runner()
@@ -41,14 +41,13 @@ def test_cost_func():
# check if restrictions are properly handled
def restrictions(x, y):
return False
- tuning_options = Options(scaling=False, snap=False, tune_params=tune_params,
+ tuning_options = Options(tune_params=tune_params, budget=TuningBudget(),
restrictions=restrictions, strategy_options={},
verbose=True, cache={}, unique_results={},
objective="time", objective_higher_is_better=False, metrics=None)
- with raises(StopCriterionReached):
- time = CostFunc(Searchspace(tune_params, restrictions, 1024), tuning_options, runner)(x)
- assert time == sys.float_info.max
+ time = CostFunc(Searchspace(tune_params, restrictions, 1024), tuning_options, runner)(x)
+ assert time == sys.float_info.max
def test_setup_method_arguments():
diff --git a/test/strategies/test_strategies.py b/test/strategies/test_strategies.py
index ea5a2994..63d01dbd 100644
--- a/test/strategies/test_strategies.py
+++ b/test/strategies/test_strategies.py
@@ -53,6 +53,7 @@ def vector_add():
strategies.append(pytest.param(s, marks=skip_if_no_pyatf))
else:
strategies.append(s)
+
@pytest.mark.parametrize('strategy', strategies)
def test_strategies(vector_add, strategy):
options = dict(popsize=5, neighbor='adjacent')
@@ -96,7 +97,7 @@ def test_strategies(vector_add, strategy):
tune_params = vector_add[-1]
unique_results = {}
for result in results:
- x_int = ",".join([str(v) for k, v in result.items() if k in tune_params])
+ x_int = ",".join([str(v) for k, v in result.items() if k in tune_params.keys()])
if not isinstance(result["time"], InvalidConfig):
unique_results[x_int] = result["time"]
assert len(unique_results) <= filter_options["max_fevals"]
diff --git a/test/test_common.py b/test/test_common.py
index 7c1bd683..e23a5588 100644
--- a/test/test_common.py
+++ b/test/test_common.py
@@ -1,20 +1,28 @@
import random
import numpy as np
+import pytest
import kernel_tuner.strategies.common as common
+import kernel_tuner.util
from kernel_tuner.interface import Options
from kernel_tuner.searchspace import Searchspace
+@pytest.fixture
+def tuning_options():
+ tuning_options = Options()
+ tuning_options["strategy_options"] = {}
+ tuning_options["objective"] = "time"
+ tuning_options["objective_higher_is_better"] = False
+ tuning_options["budget"] = kernel_tuner.util.TuningBudget()
+ return tuning_options
+
-def test_get_bounds_x0_eps():
+def test_get_bounds_x0_eps(tuning_options):
tune_params = dict()
tune_params['x'] = [0, 1, 2, 3, 4]
searchspace = Searchspace(tune_params, [], 1024)
- tuning_options = Options()
- tuning_options["strategy_options"] = {}
-
bounds, x0, eps = common.CostFunc(searchspace, tuning_options, None, scaling=True).get_bounds_x0_eps()
assert bounds == [(0.0, 1.0)]
@@ -27,7 +35,7 @@ def test_get_bounds_x0_eps():
assert eps == 1.0
-def test_get_bounds():
+def test_get_bounds(tuning_options):
tune_params = dict()
tune_params['x'] = [0, 1, 2, 3, 4]
@@ -39,7 +47,7 @@ def test_get_bounds():
expected = [(0, 4), (0, 9900), (-11.2, 123.27)]
searchspace = Searchspace(tune_params, None, 1024)
- cost_func = common.CostFunc(searchspace, None, None)
+ cost_func = common.CostFunc(searchspace, tuning_options, None)
answer = cost_func.get_bounds()
assert answer == expected
diff --git a/test/test_runners.py b/test/test_runners.py
index 3a0a26e2..609ccad3 100644
--- a/test/test_runners.py
+++ b/test/test_runners.py
@@ -163,8 +163,8 @@ def test_time_keeping(env):
answer = [args[1] + args[2], None, None, None]
options = dict(method="uniform",
- popsize=10,
- maxiter=1,
+ popsize=5,
+ maxiter=50,
mutation_chance=1,
max_fevals=10)
start = time.perf_counter()
@@ -287,6 +287,7 @@ def test_runner(env):
device_options = Options([(k, opts.get(k, None))
for k in _device_options.keys()])
tuning_options.cachefile = None
+ tuning_options.unique_results = {}
# create runner
runner = SequentialRunner(kernelsource,
diff --git a/test/test_util_functions.py b/test/test_util_functions.py
index e785f415..4b63e864 100644
--- a/test/test_util_functions.py
+++ b/test/test_util_functions.py
@@ -3,6 +3,7 @@
import json
import os
import warnings
+import datetime
import numpy as np
import pytest
@@ -429,6 +430,92 @@ def test_check_argument_list7():
assert_user_warning(check_argument_list, [kernel_name, kernel_string, args])
+def test_tuning_budget1():
+ budget = TuningBudget()
+ assert budget.get_evaluations_spent() == 0
+ assert budget.get_evaluations_remaining() == float("inf")
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == 0.0
+
+ budget.add_evaluations(9000)
+ assert budget.get_evaluations_spent() == 9000
+ assert budget.get_evaluations_remaining() == float("inf")
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == 0.0
+
+ budget.add_time(seconds=9000)
+ assert budget.get_evaluations_spent() == 9000
+ assert budget.get_evaluations_remaining() == float("inf")
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == 0.0
+
+def test_tuning_budget2():
+ budget = TuningBudget(max_fevals=5)
+ assert budget.get_evaluations_spent() == 0
+ assert budget.get_evaluations_remaining() == 5
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == 0.0
+
+ budget.add_evaluations(4)
+ assert budget.get_evaluations_spent() == 4
+ assert budget.get_evaluations_remaining() == 1
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == 4/5
+
+ budget.add_evaluations(1)
+ assert budget.get_evaluations_spent() == 5
+ assert budget.get_evaluations_remaining() == 0
+ assert budget.is_done()
+    pytest.raises(StopCriterionReached, budget.raise_exception_if_done)
+ assert budget.get_fraction_consumed() == 1.0
+
+
+def test_tuning_budget3():
+ # Two values are similar if they are within 0.01
+ approx = lambda x: pytest.approx(x, abs=0.01)
+
+ budget = TuningBudget(time_limit=5)
+ assert budget.get_time_spent().total_seconds() == approx(0)
+ assert budget.get_time_remaining().total_seconds() == approx(5)
+ assert budget.get_evaluations_spent() == 0
+ assert budget.get_evaluations_remaining() == float("inf")
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == approx(0.0)
+
+ budget.add_evaluations(1)
+ assert budget.get_time_spent().total_seconds() == approx(0)
+ assert budget.get_time_remaining().total_seconds() == approx(5)
+ assert budget.get_evaluations_spent() == 1
+ assert budget.get_evaluations_remaining() == float("inf")
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == approx(0.0)
+
+ budget.add_time(seconds=2)
+ assert budget.get_time_spent().total_seconds() == approx(2)
+ assert budget.get_time_remaining().total_seconds() == approx(3)
+ assert budget.get_evaluations_spent() == 1
+ assert budget.get_evaluations_remaining() == float("inf")
+ assert not budget.is_done()
+ budget.raise_exception_if_done() # Should not raise
+ assert budget.get_fraction_consumed() == approx(2/5)
+
+ budget.add_time(seconds=4)
+ assert budget.get_time_spent().total_seconds() == approx(6)
+ assert budget.get_time_remaining().total_seconds() == approx(0)
+ assert budget.get_evaluations_spent() == 1
+ assert budget.get_evaluations_remaining() == float("inf")
+ assert budget.is_done()
+    pytest.raises(StopCriterionReached, budget.raise_exception_if_done)
+ assert budget.get_fraction_consumed() == 1.0
+
+
def test_check_tune_params_list():
tune_params = dict(
zip(
@@ -611,25 +698,25 @@ def assert_open_cachefile_is_correctly_parsed(cache):
try:
# call process_cache without pre-existing cache
- process_cache(cache, kernel_options, tuning_options, runner)
+ tuning_options.cachefile = cache
+ tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner)
# check if file has been created
assert os.path.isfile(cache)
assert_open_cachefile_is_correctly_parsed(cache)
- assert tuning_options.cachefile == cache
assert isinstance(tuning_options.cache, dict)
assert len(tuning_options.cache) == 0
# store one entry in the cache
params = {"x": 4, "time": np.float32(0.1234)}
- store_cache("4", params, tuning_options)
+ store_cache("4", params, cache, tuning_options.cache)
assert len(tuning_options.cache) == 1
# close the cache
close_cache(cache)
# now test process cache with a pre-existing cache file
- process_cache(cache, kernel_options, tuning_options, runner)
+ tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner)
assert_open_cachefile_is_correctly_parsed(cache)
assert tuning_options.cache["4"]["time"] == params["time"]
@@ -638,7 +725,7 @@ def assert_open_cachefile_is_correctly_parsed(cache):
# a different kernel, device, or parameter set
with pytest.raises(ValueError) as excep:
kernel_options.kernel_name = "wrong_kernel"
- process_cache(cache, kernel_options, tuning_options, runner)
+ tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner)
assert "kernel" in str(excep.value)
# correct the kernel name from last test
@@ -646,7 +733,7 @@ def assert_open_cachefile_is_correctly_parsed(cache):
with pytest.raises(ValueError) as excep:
runner.dev.name = "wrong_device"
- process_cache(cache, kernel_options, tuning_options, runner)
+ tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner)
assert "device" in str(excep.value)
# correct the device from last test
@@ -654,7 +741,7 @@ def assert_open_cachefile_is_correctly_parsed(cache):
with pytest.raises(ValueError) as excep:
tuning_options.tune_params["y"] = ["a", "b"]
- process_cache(cache, kernel_options, tuning_options, runner)
+ tuning_options.cache = process_cache(cache, kernel_options, tuning_options, runner)
assert "parameter" in str(excep.value)
finally: