Skip to content

Commit e59bb19

Browse files
committed
runtime(lava): Add methods to get queue size and all device names for device type
Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent f4f2fe8 commit e59bb19

File tree

2 files changed

+228
-0
lines changed

2 files changed

+228
-0
lines changed

kernelci/runtime/lava.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,100 @@ def _connect(self):
403403
}
404404
return rest_api
405405

406+
def _get_response(self, url, params=None):
407+
resp = self._server.session.get(url, params=params, timeout=30)
408+
resp.raise_for_status()
409+
return resp.json()
410+
411+
def _get_all(self, url, params=None):
412+
resp = self._get_response(url, params=params)
413+
results = resp.get('results', [])
414+
next_url = resp.get('next')
415+
while next_url:
416+
resp = self._get_response(next_url)
417+
results.extend(resp.get('results', []))
418+
next_url = resp.get('next')
419+
return results
420+
421+
def get_devicetype_job_count(self, device_types):
422+
"""Get queued job counts per requested device type.
423+
424+
*device_types* can be a device type name or list of device type names.
425+
This uses scheduler.jobs.queue which is the only reliable way to query
426+
queue sizes per device type.
427+
"""
428+
if self._server.url is None:
429+
raise ValueError("LAVA server URL is not configured")
430+
431+
single_type = isinstance(device_types, str)
432+
if single_type:
433+
requested_types = [device_types]
434+
else:
435+
requested_types = list(device_types or [])
436+
if not requested_types:
437+
return 0 if single_type else {}
438+
439+
queue_url = urljoin(self._server.url, 'scheduler/jobs/queue/')
440+
counts = {device_type: 0 for device_type in requested_types}
441+
start = 0
442+
limit = 100
443+
while True:
444+
params = {
445+
'device_types': requested_types,
446+
'start': start,
447+
'limit': limit,
448+
}
449+
jobs = self._get_response(queue_url, params=params)
450+
for job in jobs:
451+
requested_type = job.get('requested_device_type')
452+
if requested_type in counts:
453+
counts[requested_type] += 1
454+
if len(jobs) < limit:
455+
break
456+
start += limit
457+
458+
if single_type:
459+
return counts.get(requested_types[0], 0)
460+
return counts
461+
462+
def get_device_names_by_type(self, device_type, online_only=False):
463+
"""Get device names for a given LAVA device type.
464+
465+
*device_type* can be a string or list of device type names.
466+
*online_only* filters devices with health == 'Good' when available.
467+
Use this with get_devicetype_job_count() to gate submissions when the
468+
queue per device type exceeds a threshold.
469+
"""
470+
if self._server.url is None:
471+
raise ValueError("LAVA server URL is not configured")
472+
473+
single_type = isinstance(device_type, str)
474+
if single_type:
475+
device_types = [device_type]
476+
else:
477+
device_types = list(device_type or [])
478+
if not device_types:
479+
return [] if single_type else {}
480+
481+
devices_url = urljoin(self._server.url, 'devices/')
482+
result = {}
483+
for dev_type in device_types:
484+
params = {'device_type': dev_type}
485+
devices = self._get_all(devices_url, params=params)
486+
names = []
487+
for device in devices:
488+
if device.get('device_type') != dev_type:
489+
continue
490+
if online_only and device.get('health') not in (None, 'Good'):
491+
continue
492+
hostname = device.get('hostname') or device.get('name')
493+
if hostname:
494+
names.append(hostname)
495+
result[dev_type] = names
496+
if single_type:
497+
return result.get(device_types[0], [])
498+
return result
499+
406500
def _submit(self, job):
407501
if self._server.url is None:
408502
return self._store_job_in_external_storage(job)

tests/test_runtime.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,53 @@
1212
# implementation.
1313
# pylint: disable=protected-access
1414

15+
import types
16+
1517
import kernelci.config
1618
import kernelci.runtime
1719

1820

21+
class _FakeResponse:
22+
def __init__(self, payload, status_code=200, text=""):
23+
"""Initialize a fake response with payload and status."""
24+
self._payload = payload
25+
self.status_code = status_code
26+
self.text = text
27+
28+
def raise_for_status(self):
29+
"""Raise an error when the response indicates failure."""
30+
if self.status_code >= 400:
31+
raise RuntimeError(f"HTTP {self.status_code}")
32+
33+
def json(self):
34+
"""Return the preloaded JSON payload."""
35+
return self._payload
36+
37+
38+
class _FakeSession:
39+
def __init__(self, get_handler=None, post_handler=None):
40+
"""Initialize a fake session with optional handlers."""
41+
self._get_handler = get_handler
42+
self._post_handler = post_handler
43+
self.calls = []
44+
45+
def get(self, url, params=None, timeout=30): # pylint: disable=unused-argument
46+
"""Invoke the GET handler and return a fake response."""
47+
if not self._get_handler:
48+
raise AssertionError("GET handler not set")
49+
self.calls.append((url, params))
50+
return _FakeResponse(self._get_handler(url, params))
51+
52+
def post( # pylint: disable=unused-argument
53+
self, url, json=None, allow_redirects=False, timeout=30
54+
):
55+
"""Invoke the POST handler and return its response."""
56+
if not self._post_handler:
57+
raise AssertionError("POST handler not set")
58+
self.calls.append((url, json))
59+
return self._post_handler(url, json)
60+
61+
1962
def test_runtimes_init():
2063
"""Test that all the runtimes can be initialised (offline)"""
2164
config = kernelci.config.load('tests/configs/runtimes.yaml')
@@ -66,3 +109,94 @@ def test_lava_priority_scale():
66109
spec_priority = int(priority)
67110
print(f"* {plan_name:12s} {lab_priority:3d} {spec_priority:3d}")
68111
assert lab_priority == spec_priority
112+
113+
114+
def test_lava_get_devicetype_job_count():
115+
"""Test queued job count aggregation via scheduler.jobs.queue."""
116+
config = kernelci.config.load('tests/configs/lava-runtimes.yaml')
117+
runtime_config = config['runtimes']['lab-min-12-max-40-new-runtime']
118+
lab = kernelci.runtime.get_runtime(runtime_config)
119+
120+
def handler(url, params):
121+
assert url.endswith('scheduler/jobs/queue/')
122+
assert params.get('device_types') == ['type-a', 'type-b']
123+
assert params.get('limit') == 100
124+
if params.get('start') == 0:
125+
jobs = (
126+
[{'requested_device_type': 'type-a'} for _ in range(60)] +
127+
[{'requested_device_type': 'type-b'} for _ in range(40)]
128+
)
129+
return jobs
130+
if params.get('start') == 100:
131+
return [{'requested_device_type': 'type-a'}]
132+
raise AssertionError(f"Unexpected request: {url} {params}")
133+
134+
lab._server = types.SimpleNamespace(
135+
url='http://lava/api/v0.2/',
136+
session=_FakeSession(get_handler=handler),
137+
)
138+
139+
counts = lab.get_devicetype_job_count(['type-a', 'type-b'])
140+
assert counts == {'type-a': 61, 'type-b': 40}
141+
142+
143+
def test_lava_get_device_names_by_type():
144+
"""Test device name lookups with type filtering and health checks."""
145+
config = kernelci.config.load('tests/configs/lava-runtimes.yaml')
146+
runtime_config = config['runtimes']['lab-min-12-max-40-new-runtime']
147+
lab = kernelci.runtime.get_runtime(runtime_config)
148+
149+
def handler(url, params):
150+
if url.endswith('devices/'):
151+
dev_type = params.get('device_type')
152+
if dev_type == 'type-a':
153+
return {
154+
'results': [
155+
{'device_type': 'type-a', 'hostname': 'dev-1', 'health': 'Good'},
156+
{'device_type': 'type-a', 'hostname': 'dev-2', 'health': 'Bad'},
157+
{'device_type': 'type-b', 'hostname': 'dev-x', 'health': 'Good'},
158+
],
159+
'next': None,
160+
}
161+
if dev_type == 'type-b':
162+
return {
163+
'results': [
164+
{'device_type': 'type-b', 'hostname': 'dev-3', 'health': 'Good'},
165+
],
166+
'next': None,
167+
}
168+
raise AssertionError(f"Unexpected request: {url} {params}")
169+
170+
lab._server = types.SimpleNamespace(
171+
url='http://lava/api/v0.2/',
172+
session=_FakeSession(get_handler=handler),
173+
)
174+
175+
names = lab.get_device_names_by_type('type-a', online_only=True)
176+
assert names == ['dev-1']
177+
178+
names_by_type = lab.get_device_names_by_type(['type-a', 'type-b'])
179+
assert names_by_type == {'type-a': ['dev-1', 'dev-2'], 'type-b': ['dev-3']}
180+
181+
182+
def test_lava_submit_rest():
183+
"""Test LAVA REST submission builds a job with expected payload."""
184+
config = kernelci.config.load('tests/configs/lava-runtimes.yaml')
185+
runtime_config = config['runtimes']['lab-min-12-max-40-new-runtime']
186+
lab = kernelci.runtime.get_runtime(runtime_config)
187+
188+
captured = {}
189+
190+
def post_handler(url, payload):
191+
assert url.endswith('jobs/')
192+
captured['json'] = payload
193+
return _FakeResponse({'job_ids': [123]})
194+
195+
lab._server = types.SimpleNamespace(
196+
url='http://lava/api/v0.2/',
197+
session=_FakeSession(post_handler=post_handler),
198+
)
199+
200+
job_id = lab._submit("jobdef")
201+
assert job_id == 123
202+
assert captured['json']['definition'] == "jobdef"

0 commit comments

Comments
 (0)