From 2fb2c24c1fa26252ec68738c6febd396bb05f54c Mon Sep 17 00:00:00 2001
From: PGijsbers
Date: Tue, 24 Mar 2026 16:07:02 +0100
Subject: [PATCH 1/3] Parallelize independent async calls
---
src/routers/openml/datasets.py | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/src/routers/openml/datasets.py b/src/routers/openml/datasets.py
index 1b6bc52..98263ca 100644
--- a/src/routers/openml/datasets.py
+++ b/src/routers/openml/datasets.py
@@ -1,3 +1,4 @@
+import asyncio
import re
from datetime import datetime
from enum import StrEnum
@@ -298,8 +299,10 @@ async def get_dataset_features(
) -> list[Feature]:
assert expdb is not None # noqa: S101
await _get_dataset_raise_otherwise(dataset_id, user, expdb)
- features = await database.datasets.get_features(dataset_id, expdb)
- ontologies = await database.datasets.get_feature_ontologies(dataset_id, expdb)
+ features, ontologies = await asyncio.gather(
+ database.datasets.get_features(dataset_id, expdb),
+ database.datasets.get_feature_ontologies(dataset_id, expdb),
+ )
for feature in features:
feature.ontology = ontologies.get(feature.index)
From a8f313d965531d3a04b5c8ec40d463e708a70e26 Mon Sep 17 00:00:00 2001
From: PGijsbers
Date: Tue, 24 Mar 2026 16:21:07 +0100
Subject: [PATCH 2/3] Fire off async jobs together if possible
---
src/routers/mldcat_ap/dataset.py | 17 ++++++++++-------
src/routers/openml/datasets.py | 10 ++++++----
src/routers/openml/flows.py | 10 ++++++----
src/routers/openml/setups.py | 17 +++++++++++------
src/routers/openml/tasks.py | 22 +++++++++++++++-------
5 files changed, 48 insertions(+), 28 deletions(-)
diff --git a/src/routers/mldcat_ap/dataset.py b/src/routers/mldcat_ap/dataset.py
index 4ee465d..998f940 100644
--- a/src/routers/mldcat_ap/dataset.py
+++ b/src/routers/mldcat_ap/dataset.py
@@ -4,6 +4,7 @@
Specific queries could be written to fetch e.g., a single feature or quality.
"""
+import asyncio
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
@@ -46,13 +47,16 @@ async def get_mldcat_ap_distribution(
) -> JsonLDGraph:
assert user_db is not None # noqa: S101
assert expdb is not None # noqa: S101
- oml_dataset = await get_dataset(
- dataset_id=distribution_id,
- user=user,
- user_db=user_db,
- expdb_db=expdb,
+ oml_dataset, openml_features, oml_qualities = await asyncio.gather(
+ get_dataset(
+ dataset_id=distribution_id,
+ user=user,
+ user_db=user_db,
+ expdb_db=expdb,
+ ),
+ get_dataset_features(distribution_id, user, expdb),
+ get_qualities(distribution_id, user, expdb),
)
- openml_features = await get_dataset_features(distribution_id, user, expdb)
features = [
Feature(
id_=f"{_server_url}/feature/{distribution_id}/{feature.index}",
@@ -61,7 +65,6 @@ async def get_mldcat_ap_distribution(
)
for feature in openml_features
]
- oml_qualities = await get_qualities(distribution_id, user, expdb)
qualities = [
Quality(
id_=f"{_server_url}/quality/{quality.name}/{distribution_id}",
diff --git a/src/routers/openml/datasets.py b/src/routers/openml/datasets.py
index 98263ca..bf16d73 100644
--- a/src/routers/openml/datasets.py
+++ b/src/routers/openml/datasets.py
@@ -405,10 +405,12 @@ async def get_dataset(
msg = f"No data file found for dataset {dataset_id}."
raise DatasetNoDataFileError(msg)
- tags = await database.datasets.get_tags_for(dataset_id, expdb_db)
- description = await database.datasets.get_description(dataset_id, expdb_db)
- processing_result = await _get_processing_information(dataset_id, expdb_db)
- status = await database.datasets.get_status(dataset_id, expdb_db)
+ tags, description, processing_result, status = await asyncio.gather(
+ database.datasets.get_tags_for(dataset_id, expdb_db),
+ database.datasets.get_description(dataset_id, expdb_db),
+ _get_processing_information(dataset_id, expdb_db),
+ database.datasets.get_status(dataset_id, expdb_db),
+ )
status_ = DatasetStatus(status.status) if status else DatasetStatus.IN_PREPARATION
diff --git a/src/routers/openml/flows.py b/src/routers/openml/flows.py
index 4125486..44c2338 100644
--- a/src/routers/openml/flows.py
+++ b/src/routers/openml/flows.py
@@ -1,3 +1,4 @@
+import asyncio
from typing import Annotated, Literal
from fastapi import APIRouter, Depends
@@ -40,7 +41,11 @@ async def get_flow(
msg = f"Flow with id {flow_id} not found."
raise FlowNotFoundError(msg)
- parameter_rows = await database.flows.get_parameters(flow_id, expdb)
+ parameter_rows, tags, subflow_rows = await asyncio.gather(
+ database.flows.get_parameters(flow_id, expdb),
+ database.flows.get_tags(flow_id, expdb),
+ database.flows.get_subflows(flow_id, expdb),
+ )
parameters = [
Parameter(
name=parameter.name,
@@ -53,9 +58,6 @@ async def get_flow(
)
for parameter in parameter_rows
]
-
- tags = await database.flows.get_tags(flow_id, expdb)
- subflow_rows = await database.flows.get_subflows(flow_id, expdb)
subflows = []
for subflow in subflow_rows:
subflows.append( # noqa: PERF401
diff --git a/src/routers/openml/setups.py b/src/routers/openml/setups.py
index 65d2d53..52a134e 100644
--- a/src/routers/openml/setups.py
+++ b/src/routers/openml/setups.py
@@ -1,5 +1,6 @@
"""All endpoints that relate to setups."""
+import asyncio
from typing import Annotated
from fastapi import APIRouter, Body, Depends
@@ -27,11 +28,13 @@ async def tag_setup(
expdb_db: Annotated[AsyncConnection, Depends(expdb_connection)],
) -> dict[str, dict[str, str | list[str]]]:
"""Add tag `tag` to setup with id `setup_id`."""
- if not await database.setups.get(setup_id, expdb_db):
+ setup, setup_tags = await asyncio.gather(
+ database.setups.get(setup_id, expdb_db),
+ database.setups.get_tags(setup_id, expdb_db),
+ )
+ if not setup:
msg = f"Setup {setup_id} not found."
raise SetupNotFoundError(msg)
-
- setup_tags = await database.setups.get_tags(setup_id, expdb_db)
matched_tag_row = next((t for t in setup_tags if t.tag.casefold() == tag.casefold()), None)
if matched_tag_row:
@@ -51,11 +54,13 @@ async def untag_setup(
expdb_db: Annotated[AsyncConnection, Depends(expdb_connection)],
) -> dict[str, dict[str, str | list[str]]]:
"""Remove tag `tag` from setup with id `setup_id`."""
- if not await database.setups.get(setup_id, expdb_db):
+ setup, setup_tags = await asyncio.gather(
+ database.setups.get(setup_id, expdb_db),
+ database.setups.get_tags(setup_id, expdb_db),
+ )
+ if not setup:
msg = f"Setup {setup_id} not found."
raise SetupNotFoundError(msg)
-
- setup_tags = await database.setups.get_tags(setup_id, expdb_db)
matched_tag_row = next((t for t in setup_tags if t.tag.casefold() == tag.casefold()), None)
if not matched_tag_row:
diff --git a/src/routers/openml/tasks.py b/src/routers/openml/tasks.py
index 788cd80..d746448 100644
--- a/src/routers/openml/tasks.py
+++ b/src/routers/openml/tasks.py
@@ -1,3 +1,4 @@
+import asyncio
import json
import re
from typing import Annotated, cast
@@ -169,23 +170,30 @@ async def get_task(
msg = f"Task {task_id} has task type {task.ttid}, but task type {task.ttid} is not found."
raise InternalError(msg)
+ task_input_rows, ttios, tags = await asyncio.gather(
+ database.tasks.get_input_for_task(task_id, expdb),
+ database.tasks.get_task_type_inout_with_template(task_type.ttid, expdb),
+ database.tasks.get_tags(task_id, expdb),
+ )
task_inputs = {
- row.input: int(row.value) if row.value.isdigit() else row.value
- for row in await database.tasks.get_input_for_task(task_id, expdb)
+ row.input: int(row.value) if row.value.isdigit() else row.value for row in task_input_rows
}
- ttios = await database.tasks.get_task_type_inout_with_template(task_type.ttid, expdb)
templates = [(tt_io.name, tt_io.io, tt_io.requirement, tt_io.template_api) for tt_io in ttios]
+ input_templates = [
+ (name, template) for name, io, required, template in templates if io == "input"
+ ]
+ filled_templates = await asyncio.gather(
+ *[fill_template(template, task, task_inputs, expdb) for name, template in input_templates],
+ )
inputs = [
- await fill_template(template, task, task_inputs, expdb) | {"name": name}
- for name, io, required, template in templates
- if io == "input"
+ filled | {"name": name}
+ for (name, _), filled in zip(input_templates, filled_templates, strict=True)
]
outputs = [
convert_template_xml_to_json(template) | {"name": name}
for name, io, required, template in templates
if io == "output"
]
- tags = await database.tasks.get_tags(task_id, expdb)
name = f"Task {task_id} ({task_type.name})"
dataset_id = task_inputs.get("source_data")
if isinstance(dataset_id, int) and (dataset := await database.datasets.get(dataset_id, expdb)):
From e5b354ecdf52ef5a5bc599cf63e5cf7bbc3eadec Mon Sep 17 00:00:00 2001
From: PGijsbers
Date: Tue, 24 Mar 2026 16:33:06 +0100
Subject: [PATCH 3/3] Update tests for better use of concurrency
---
.../openml/migration/setups_migration_test.py | 71 ++++++++++---------
1 file changed, 38 insertions(+), 33 deletions(-)
diff --git a/tests/routers/openml/migration/setups_migration_test.py b/tests/routers/openml/migration/setups_migration_test.py
index b33742e..58dd369 100644
--- a/tests/routers/openml/migration/setups_migration_test.py
+++ b/tests/routers/openml/migration/setups_migration_test.py
@@ -1,3 +1,4 @@
+import asyncio
import contextlib
import re
from collections.abc import AsyncGenerator, Callable, Iterable
@@ -114,14 +115,15 @@ async def test_setup_untag_response_is_identical_setup_doesnt_exist(
tag = "totally_new_tag_for_migration_testing"
api_key = ApiKey.SOME_USER
- original = await php_api.post(
- "/setup/untag",
- data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
- )
-
- new = await py_api.post(
- f"/setup/untag?api_key={api_key}",
- json={"setup_id": setup_id, "tag": tag},
+ original, new = await asyncio.gather(
+ php_api.post(
+ "/setup/untag",
+ data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
+ ),
+ py_api.post(
+ f"/setup/untag?api_key={api_key}",
+ json={"setup_id": setup_id, "tag": tag},
+ ),
)
assert original.status_code == HTTPStatus.PRECONDITION_FAILED
@@ -142,14 +144,15 @@ async def test_setup_untag_response_is_identical_tag_doesnt_exist(
tag = "totally_new_tag_for_migration_testing"
api_key = ApiKey.SOME_USER
- original = await php_api.post(
- "/setup/untag",
- data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
- )
-
- new = await py_api.post(
- f"/setup/untag?api_key={api_key}",
- json={"setup_id": setup_id, "tag": tag},
+ original, new = await asyncio.gather(
+ php_api.post(
+ "/setup/untag",
+ data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
+ ),
+ py_api.post(
+ f"/setup/untag?api_key={api_key}",
+ json={"setup_id": setup_id, "tag": tag},
+ ),
)
assert original.status_code == HTTPStatus.PRECONDITION_FAILED
@@ -223,14 +226,15 @@ async def test_setup_tag_response_is_identical_setup_doesnt_exist(
tag = "totally_new_tag_for_migration_testing"
api_key = ApiKey.SOME_USER
- original = await php_api.post(
- "/setup/tag",
- data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
- )
-
- new = await py_api.post(
- f"/setup/tag?api_key={api_key}",
- json={"setup_id": setup_id, "tag": tag},
+ original, new = await asyncio.gather(
+ php_api.post(
+ "/setup/tag",
+ data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
+ ),
+ py_api.post(
+ f"/setup/tag?api_key={api_key}",
+ json={"setup_id": setup_id, "tag": tag},
+ ),
)
assert original.status_code == HTTPStatus.PRECONDITION_FAILED
@@ -253,15 +257,16 @@ async def test_setup_tag_response_is_identical_tag_already_exists(
api_key = ApiKey.SOME_USER
async with temporary_tags(tags=[tag], setup_id=setup_id, persist=True):
- original = await php_api.post(
- "/setup/tag",
- data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
- )
-
- # In Python, since PHP committed it, it's also there for Python test context
- new = await py_api.post(
- f"/setup/tag?api_key={api_key}",
- json={"setup_id": setup_id, "tag": tag},
+ # Both APIs can be tested in parallel since the tag is already persisted
+ original, new = await asyncio.gather(
+ php_api.post(
+ "/setup/tag",
+ data={"api_key": api_key, "tag": tag, "setup_id": setup_id},
+ ),
+ py_api.post(
+ f"/setup/tag?api_key={api_key}",
+ json={"setup_id": setup_id, "tag": tag},
+ ),
)
assert original.status_code == HTTPStatus.INTERNAL_SERVER_ERROR