From 2fb2c24c1fa26252ec68738c6febd396bb05f54c Mon Sep 17 00:00:00 2001 From: PGijsbers Date: Tue, 24 Mar 2026 16:07:02 +0100 Subject: [PATCH 1/3] Parallelize independent async calls --- src/routers/openml/datasets.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/routers/openml/datasets.py b/src/routers/openml/datasets.py index 1b6bc52..98263ca 100644 --- a/src/routers/openml/datasets.py +++ b/src/routers/openml/datasets.py @@ -1,3 +1,4 @@ +import asyncio import re from datetime import datetime from enum import StrEnum @@ -298,8 +299,10 @@ async def get_dataset_features( ) -> list[Feature]: assert expdb is not None # noqa: S101 await _get_dataset_raise_otherwise(dataset_id, user, expdb) - features = await database.datasets.get_features(dataset_id, expdb) - ontologies = await database.datasets.get_feature_ontologies(dataset_id, expdb) + features, ontologies = await asyncio.gather( + database.datasets.get_features(dataset_id, expdb), + database.datasets.get_feature_ontologies(dataset_id, expdb), + ) for feature in features: feature.ontology = ontologies.get(feature.index) From a8f313d965531d3a04b5c8ec40d463e708a70e26 Mon Sep 17 00:00:00 2001 From: PGijsbers Date: Tue, 24 Mar 2026 16:21:07 +0100 Subject: [PATCH 2/3] Fire off async jobs together if possible --- src/routers/mldcat_ap/dataset.py | 17 ++++++++++------- src/routers/openml/datasets.py | 10 ++++++---- src/routers/openml/flows.py | 10 ++++++---- src/routers/openml/setups.py | 17 +++++++++++------ src/routers/openml/tasks.py | 22 +++++++++++++++------- 5 files changed, 48 insertions(+), 28 deletions(-) diff --git a/src/routers/mldcat_ap/dataset.py b/src/routers/mldcat_ap/dataset.py index 4ee465d..998f940 100644 --- a/src/routers/mldcat_ap/dataset.py +++ b/src/routers/mldcat_ap/dataset.py @@ -4,6 +4,7 @@ Specific queries could be written to fetch e.g., a single feature or quality. """ +import asyncio from typing import Annotated from fastapi import APIRouter, Depends, HTTPException @@ -46,13 +47,16 @@ async def get_mldcat_ap_distribution( ) -> JsonLDGraph: assert user_db is not None # noqa: S101 assert expdb is not None # noqa: S101 - oml_dataset = await get_dataset( - dataset_id=distribution_id, - user=user, - user_db=user_db, - expdb_db=expdb, + oml_dataset, openml_features, oml_qualities = await asyncio.gather( + get_dataset( + dataset_id=distribution_id, + user=user, + user_db=user_db, + expdb_db=expdb, + ), + get_dataset_features(distribution_id, user, expdb), + get_qualities(distribution_id, user, expdb), ) - openml_features = await get_dataset_features(distribution_id, user, expdb) features = [ Feature( id_=f"{_server_url}/feature/{distribution_id}/{feature.index}", @@ -61,7 +65,6 @@ async def get_mldcat_ap_distribution( ) for feature in openml_features ] - oml_qualities = await get_qualities(distribution_id, user, expdb) qualities = [ Quality( id_=f"{_server_url}/quality/{quality.name}/{distribution_id}", diff --git a/src/routers/openml/datasets.py b/src/routers/openml/datasets.py index 98263ca..bf16d73 100644 --- a/src/routers/openml/datasets.py +++ b/src/routers/openml/datasets.py @@ -405,10 +405,12 @@ async def get_dataset( msg = f"No data file found for dataset {dataset_id}." raise DatasetNoDataFileError(msg) - tags = await database.datasets.get_tags_for(dataset_id, expdb_db) - description = await database.datasets.get_description(dataset_id, expdb_db) - processing_result = await _get_processing_information(dataset_id, expdb_db) - status = await database.datasets.get_status(dataset_id, expdb_db) + tags, description, processing_result, status = await asyncio.gather( + database.datasets.get_tags_for(dataset_id, expdb_db), + database.datasets.get_description(dataset_id, expdb_db), + _get_processing_information(dataset_id, expdb_db), + database.datasets.get_status(dataset_id, expdb_db), + ) status_ = DatasetStatus(status.status) if status else DatasetStatus.IN_PREPARATION diff --git a/src/routers/openml/flows.py b/src/routers/openml/flows.py index 4125486..44c2338 100644 --- a/src/routers/openml/flows.py +++ b/src/routers/openml/flows.py @@ -1,3 +1,4 @@ +import asyncio from typing import Annotated, Literal from fastapi import APIRouter, Depends @@ -40,7 +41,11 @@ async def get_flow( msg = f"Flow with id {flow_id} not found." raise FlowNotFoundError(msg) - parameter_rows = await database.flows.get_parameters(flow_id, expdb) + parameter_rows, tags, subflow_rows = await asyncio.gather( + database.flows.get_parameters(flow_id, expdb), + database.flows.get_tags(flow_id, expdb), + database.flows.get_subflows(flow_id, expdb), + ) parameters = [ Parameter( name=parameter.name, @@ -53,9 +58,6 @@ async def get_flow( ) for parameter in parameter_rows ] - - tags = await database.flows.get_tags(flow_id, expdb) - subflow_rows = await database.flows.get_subflows(flow_id, expdb) subflows = [] for subflow in subflow_rows: subflows.append( # noqa: PERF401 diff --git a/src/routers/openml/setups.py b/src/routers/openml/setups.py index 65d2d53..52a134e 100644 --- a/src/routers/openml/setups.py +++ b/src/routers/openml/setups.py @@ -1,5 +1,6 @@ """All endpoints that relate to setups.""" +import asyncio from typing import Annotated from fastapi import APIRouter, Body, Depends @@ -27,11 +28,13 @@ async def tag_setup( expdb_db: Annotated[AsyncConnection, Depends(expdb_connection)], ) -> dict[str, dict[str, str | list[str]]]: """Add tag `tag` to setup with id `setup_id`.""" - if not await database.setups.get(setup_id, expdb_db): + setup, setup_tags = await asyncio.gather( + database.setups.get(setup_id, expdb_db), + database.setups.get_tags(setup_id, expdb_db), + ) + if not setup: msg = f"Setup {setup_id} not found." raise SetupNotFoundError(msg) - - setup_tags = await database.setups.get_tags(setup_id, expdb_db) matched_tag_row = next((t for t in setup_tags if t.tag.casefold() == tag.casefold()), None) if matched_tag_row: @@ -51,11 +54,13 @@ async def untag_setup( expdb_db: Annotated[AsyncConnection, Depends(expdb_connection)], ) -> dict[str, dict[str, str | list[str]]]: """Remove tag `tag` from setup with id `setup_id`.""" - if not await database.setups.get(setup_id, expdb_db): + setup, setup_tags = await asyncio.gather( + database.setups.get(setup_id, expdb_db), + database.setups.get_tags(setup_id, expdb_db), + ) + if not setup: msg = f"Setup {setup_id} not found." raise SetupNotFoundError(msg) - - setup_tags = await database.setups.get_tags(setup_id, expdb_db) matched_tag_row = next((t for t in setup_tags if t.tag.casefold() == tag.casefold()), None) if not matched_tag_row: diff --git a/src/routers/openml/tasks.py b/src/routers/openml/tasks.py index 788cd80..d746448 100644 --- a/src/routers/openml/tasks.py +++ b/src/routers/openml/tasks.py @@ -1,3 +1,4 @@ +import asyncio import json import re from typing import Annotated, cast @@ -169,23 +170,30 @@ async def get_task( msg = f"Task {task_id} has task type {task.ttid}, but task type {task.ttid} is not found." raise InternalError(msg) + task_input_rows, ttios, tags = await asyncio.gather( + database.tasks.get_input_for_task(task_id, expdb), + database.tasks.get_task_type_inout_with_template(task_type.ttid, expdb), + database.tasks.get_tags(task_id, expdb), + ) task_inputs = { - row.input: int(row.value) if row.value.isdigit() else row.value - for row in await database.tasks.get_input_for_task(task_id, expdb) + row.input: int(row.value) if row.value.isdigit() else row.value for row in task_input_rows } - ttios = await database.tasks.get_task_type_inout_with_template(task_type.ttid, expdb) templates = [(tt_io.name, tt_io.io, tt_io.requirement, tt_io.template_api) for tt_io in ttios] + input_templates = [ + (name, template) for name, io, required, template in templates if io == "input" + ] + filled_templates = await asyncio.gather( + *[fill_template(template, task, task_inputs, expdb) for name, template in input_templates], + ) inputs = [ - await fill_template(template, task, task_inputs, expdb) | {"name": name} - for name, io, required, template in templates - if io == "input" + filled | {"name": name} + for (name, _), filled in zip(input_templates, filled_templates, strict=True) ] outputs = [ convert_template_xml_to_json(template) | {"name": name} for name, io, required, template in templates if io == "output" ] - tags = await database.tasks.get_tags(task_id, expdb) name = f"Task {task_id} ({task_type.name})" dataset_id = task_inputs.get("source_data") if isinstance(dataset_id, int) and (dataset := await database.datasets.get(dataset_id, expdb)): From e5b354ecdf52ef5a5bc599cf63e5cf7bbc3eadec Mon Sep 17 00:00:00 2001 From: PGijsbers Date: Tue, 24 Mar 2026 16:33:06 +0100 Subject: [PATCH 3/3] Update tests for better use of concurrency --- .../openml/migration/setups_migration_test.py | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/tests/routers/openml/migration/setups_migration_test.py b/tests/routers/openml/migration/setups_migration_test.py index b33742e..58dd369 100644 --- a/tests/routers/openml/migration/setups_migration_test.py +++ b/tests/routers/openml/migration/setups_migration_test.py @@ -1,3 +1,4 @@ +import asyncio import contextlib import re from collections.abc import AsyncGenerator, Callable, Iterable @@ -114,14 +115,15 @@ async def test_setup_untag_response_is_identical_setup_doesnt_exist( tag = "totally_new_tag_for_migration_testing" api_key = ApiKey.SOME_USER - original = await php_api.post( - "/setup/untag", - data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, - ) - - new = await py_api.post( - f"/setup/untag?api_key={api_key}", - json={"setup_id": setup_id, "tag": tag}, + original, new = await asyncio.gather( + php_api.post( + "/setup/untag", + data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, + ), + py_api.post( + f"/setup/untag?api_key={api_key}", + json={"setup_id": setup_id, "tag": tag}, + ), ) assert original.status_code == HTTPStatus.PRECONDITION_FAILED @@ -142,14 +144,15 @@ async def test_setup_untag_response_is_identical_tag_doesnt_exist( tag = "totally_new_tag_for_migration_testing" api_key = ApiKey.SOME_USER - original = await php_api.post( - "/setup/untag", - data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, - ) - - new = await py_api.post( - f"/setup/untag?api_key={api_key}", - json={"setup_id": setup_id, "tag": tag}, + original, new = await asyncio.gather( + php_api.post( + "/setup/untag", + data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, + ), + py_api.post( + f"/setup/untag?api_key={api_key}", + json={"setup_id": setup_id, "tag": tag}, + ), ) assert original.status_code == HTTPStatus.PRECONDITION_FAILED @@ -223,14 +226,15 @@ async def test_setup_tag_response_is_identical_setup_doesnt_exist( tag = "totally_new_tag_for_migration_testing" api_key = ApiKey.SOME_USER - original = await php_api.post( - "/setup/tag", - data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, - ) - - new = await py_api.post( - f"/setup/tag?api_key={api_key}", - json={"setup_id": setup_id, "tag": tag}, + original, new = await asyncio.gather( + php_api.post( + "/setup/tag", + data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, + ), + py_api.post( + f"/setup/tag?api_key={api_key}", + json={"setup_id": setup_id, "tag": tag}, + ), ) assert original.status_code == HTTPStatus.PRECONDITION_FAILED @@ -253,15 +257,16 @@ async def test_setup_tag_response_is_identical_tag_already_exists( api_key = ApiKey.SOME_USER async with temporary_tags(tags=[tag], setup_id=setup_id, persist=True): - original = await php_api.post( - "/setup/tag", - data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, - ) - - # In Python, since PHP committed it, it's also there for Python test context - new = await py_api.post( - f"/setup/tag?api_key={api_key}", - json={"setup_id": setup_id, "tag": tag}, + # Both APIs can be tested in parallel since the tag is already persisted + original, new = await asyncio.gather( + php_api.post( + "/setup/tag", + data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, + ), + py_api.post( + f"/setup/tag?api_key={api_key}", + json={"setup_id": setup_id, "tag": tag}, + ), ) assert original.status_code == HTTPStatus.INTERNAL_SERVER_ERROR