-
-
Notifications
You must be signed in to change notification settings - Fork 53
Add tag/untag endpoints for tasks, flows, and runs #250
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| from collections.abc import Awaitable, Callable | ||
| from typing import Any | ||
|
|
||
| from sqlalchemy import Row | ||
| from sqlalchemy.ext.asyncio import AsyncConnection | ||
|
|
||
| from core.errors import TagAlreadyExistsError, TagNotFoundError, TagNotOwnedError | ||
| from database.users import User, UserGroup | ||
|
|
||
|
|
||
| async def tag_entity( | ||
| entity_id: int, | ||
| tag: str, | ||
| user: User, | ||
| expdb: AsyncConnection, | ||
| *, | ||
| get_tags_fn: Callable[[int, AsyncConnection], Awaitable[list[str]]], | ||
| tag_fn: Callable[..., Awaitable[None]], | ||
| response_key: str, | ||
| ) -> dict[str, dict[str, Any]]: | ||
| tags = await get_tags_fn(entity_id, expdb) | ||
| if tag.casefold() in (t.casefold() for t in tags): | ||
| msg = f"Entity {entity_id} already tagged with {tag!r}." | ||
| raise TagAlreadyExistsError(msg) | ||
| await tag_fn(entity_id, tag, user_id=user.user_id, expdb=expdb) | ||
| tags = await get_tags_fn(entity_id, expdb) | ||
| return {response_key: {"id": str(entity_id), "tag": tags}} | ||
|
|
||
|
|
||
| async def untag_entity( | ||
| entity_id: int, | ||
| tag: str, | ||
| user: User, | ||
| expdb: AsyncConnection, | ||
| *, | ||
| get_tag_fn: Callable[[int, str, AsyncConnection], Awaitable[Row | None]], | ||
| delete_tag_fn: Callable[[int, str, AsyncConnection], Awaitable[None]], | ||
| get_tags_fn: Callable[[int, AsyncConnection], Awaitable[list[str]]], | ||
| response_key: str, | ||
| ) -> dict[str, dict[str, Any]]: | ||
| existing = await get_tag_fn(entity_id, tag, expdb) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's better for now to forgo the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this would also result in the |
||
| if existing is None: | ||
| msg = f"Tag {tag!r} not found on entity {entity_id}." | ||
| raise TagNotFoundError(msg) | ||
| groups = await user.get_groups() | ||
| if existing.uploader != user.user_id and UserGroup.ADMIN not in groups: | ||
| msg = f"Tag {tag!r} on entity {entity_id} is not owned by you." | ||
| raise TagNotOwnedError(msg) | ||
| await delete_tag_fn(entity_id, tag, expdb) | ||
| tags = await get_tags_fn(entity_id, expdb) | ||
| return {response_key: {"id": str(entity_id), "tag": tags}} | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,6 +4,11 @@ | |||||||||
| from sqlalchemy import Row, text | ||||||||||
| from sqlalchemy.ext.asyncio import AsyncConnection | ||||||||||
|
|
||||||||||
| from database.tagging import insert_tag, remove_tag, select_tag, select_tags | ||||||||||
|
|
||||||||||
| _TABLE = "implementation_tag" | ||||||||||
| _ID_COLUMN = "id" | ||||||||||
|
Comment on lines
+9
to
+10
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Would be better because it's clear what kind of a table or column they refer to. |
||||||||||
|
|
||||||||||
|
|
||||||||||
| async def get_subflows(for_flow: int, expdb: AsyncConnection) -> Sequence[Row]: | ||||||||||
| rows = await expdb.execute( | ||||||||||
|
|
@@ -23,18 +28,7 @@ async def get_subflows(for_flow: int, expdb: AsyncConnection) -> Sequence[Row]: | |||||||||
|
|
||||||||||
|
|
||||||||||
| async def get_tags(flow_id: int, expdb: AsyncConnection) -> list[str]: | ||||||||||
| rows = await expdb.execute( | ||||||||||
| text( | ||||||||||
| """ | ||||||||||
| SELECT tag | ||||||||||
| FROM implementation_tag | ||||||||||
| WHERE id = :flow_id | ||||||||||
| """, | ||||||||||
| ), | ||||||||||
| parameters={"flow_id": flow_id}, | ||||||||||
| ) | ||||||||||
| tag_rows = rows.all() | ||||||||||
| return [tag.tag for tag in tag_rows] | ||||||||||
| return await select_tags(table=_TABLE, id_column=_ID_COLUMN, id_=flow_id, expdb=expdb) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| async def get_parameters(flow_id: int, expdb: AsyncConnection) -> Sequence[Row]: | ||||||||||
|
|
@@ -54,6 +48,25 @@ async def get_parameters(flow_id: int, expdb: AsyncConnection) -> Sequence[Row]: | |||||||||
| ) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| async def tag(id_: int, tag_: str, *, user_id: int, expdb: AsyncConnection) -> None: | ||||||||||
| await insert_tag( | ||||||||||
| table=_TABLE, | ||||||||||
| id_column=_ID_COLUMN, | ||||||||||
| id_=id_, | ||||||||||
| tag_=tag_, | ||||||||||
| user_id=user_id, | ||||||||||
| expdb=expdb, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| async def get_tag(id_: int, tag_: str, expdb: AsyncConnection) -> Row | None: | ||||||||||
| return await select_tag(table=_TABLE, id_column=_ID_COLUMN, id_=id_, tag_=tag_, expdb=expdb) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| async def delete_tag(id_: int, tag_: str, expdb: AsyncConnection) -> None: | ||||||||||
| await remove_tag(table=_TABLE, id_column=_ID_COLUMN, id_=id_, tag_=tag_, expdb=expdb) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| async def get_by_name(name: str, external_version: str, expdb: AsyncConnection) -> Row | None: | ||||||||||
| """Get flow by name and external version.""" | ||||||||||
| row = await expdb.execute( | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| from sqlalchemy import Row, text | ||
| from sqlalchemy.ext.asyncio import AsyncConnection | ||
|
|
||
| from database.tagging import insert_tag, remove_tag, select_tag, select_tags | ||
|
|
||
| _TABLE = "run_tag" | ||
| _ID_COLUMN = "id" | ||
|
|
||
|
|
||
| async def get(id_: int, expdb: AsyncConnection) -> Row | None: | ||
| row = await expdb.execute( | ||
| text( | ||
| """ | ||
| SELECT * | ||
| FROM run | ||
| WHERE `id` = :run_id | ||
| """, | ||
| ), | ||
| parameters={"run_id": id_}, | ||
| ) | ||
| return row.one_or_none() | ||
|
|
||
|
|
||
| async def get_tags(id_: int, expdb: AsyncConnection) -> list[str]: | ||
| return await select_tags(table=_TABLE, id_column=_ID_COLUMN, id_=id_, expdb=expdb) | ||
|
|
||
|
|
||
| async def tag(id_: int, tag_: str, *, user_id: int, expdb: AsyncConnection) -> None: | ||
| await insert_tag( | ||
| table=_TABLE, | ||
| id_column=_ID_COLUMN, | ||
| id_=id_, | ||
| tag_=tag_, | ||
| user_id=user_id, | ||
| expdb=expdb, | ||
| ) | ||
|
|
||
|
|
||
| async def get_tag(id_: int, tag_: str, expdb: AsyncConnection) -> Row | None: | ||
| return await select_tag(table=_TABLE, id_column=_ID_COLUMN, id_=id_, tag_=tag_, expdb=expdb) | ||
|
|
||
|
|
||
| async def delete_tag(id_: int, tag_: str, expdb: AsyncConnection) -> None: | ||
| await remove_tag(table=_TABLE, id_column=_ID_COLUMN, id_=id_, tag_=tag_, expdb=expdb) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| from sqlalchemy import Row, text | ||
| from sqlalchemy.ext.asyncio import AsyncConnection | ||
|
|
||
|
|
||
| async def insert_tag( | ||
| *, | ||
| table: str, | ||
| id_column: str, | ||
|
Comment on lines
+7
to
+8
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Constrain These helpers are now the reusable tagging boundary, but they accept arbitrary identifier strings and splice them straight into every query. The current callers pass constants, yet one future non-constant call turns this module into an injection sink. Please validate against a closed set of supported identifier combinations here, instead of relying on every caller to stay disciplined. Also applies to: 16-19, 35-39, 56-59, 74-78 🤖 Prompt for AI Agents
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a good comment.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: Length of output: 1700
from sqlalchemy import Row, text
from sqlalchemy.ext.asyncio import AsyncConnection
# Allowlist of valid (table → allowed id_columns) combinations.
_ALLOWED_IDENTIFIERS: dict[str, frozenset[str]] = {
"dataset_tag": frozenset({"id"}),
"implementation_tag": frozenset({"id"}),
"run_tag": frozenset({"id"}), # used by database/runs.py — confirm if needed
"setup_tag": frozenset({"id"}),
"study_tag": frozenset({"study_id"}),
"task_tag": frozenset({"id"}),
}
def _validate_identifiers(table: str, id_column: str) -> None:
allowed_columns = _ALLOWED_IDENTIFIERS.get(table)
if allowed_columns is None:
raise ValueError(f"Unknown tag table: {table!r}")
if id_column not in allowed_columns:
raise ValueError(
f"Unknown id_column {id_column!r} for table {table!r}. "
f"Allowed: {sorted(allowed_columns)}"
)
async def insert_tag(
*,
table: str,
id_column: str,
id_: int,
tag_: str,
user_id: int,
expdb: AsyncConnection,
) -> None:
_validate_identifiers(table, id_column)
await expdb.execute(
text(
f"""
INSERT INTO {table}(`{id_column}`, `tag`, `uploader`)
VALUES (:id, :tag, :user_id)
""",
),
parameters={"id": id_, "tag": tag_, "user_id": user_id},
)
async def select_tag(
*,
table: str,
id_column: str,
id_: int,
tag_: str,
expdb: AsyncConnection,
) -> Row | None:
_validate_identifiers(table, id_column)
result = await expdb.execute(
text(
f"""
SELECT `{id_column}` as id, `tag`, `uploader`
FROM {table}
WHERE `{id_column}` = :id AND `tag` = :tag
""",
),
parameters={"id": id_, "tag": tag_},
)
return result.one_or_none()
async def remove_tag(
*,
table: str,
id_column: str,
id_: int,
tag_: str,
expdb: AsyncConnection,
) -> None:
_validate_identifiers(table, id_column)
await expdb.execute(
text(
f"""
DELETE FROM {table}
WHERE `{id_column}` = :id AND `tag` = :tag
""",
),
parameters={"id": id_, "tag": tag_},
)
async def select_tags(
*,
table: str,
id_column: str,
id_: int,
expdb: AsyncConnection,
) -> list[str]:
_validate_identifiers(table, id_column)
result = await expdb.execute(
text(
f"""
SELECT `tag`
FROM {table}
WHERE `{id_column}` = :id
""",
),
parameters={"id": id_},
)
return [row.tag for row in result.all()]Key design decisions:
✏️ Learnings added
🧠 Learnings used |
||
| id_: int, | ||
| tag_: str, | ||
| user_id: int, | ||
| expdb: AsyncConnection, | ||
| ) -> None: | ||
| await expdb.execute( | ||
| text( | ||
| f""" | ||
| INSERT INTO {table}(`{id_column}`, `tag`, `uploader`) | ||
| VALUES (:id, :tag, :user_id) | ||
| """, | ||
| ), | ||
| parameters={"id": id_, "tag": tag_, "user_id": user_id}, | ||
| ) | ||
|
|
||
|
|
||
| async def select_tag( | ||
| *, | ||
| table: str, | ||
| id_column: str, | ||
| id_: int, | ||
| tag_: str, | ||
| expdb: AsyncConnection, | ||
| ) -> Row | None: | ||
| result = await expdb.execute( | ||
| text( | ||
| f""" | ||
| SELECT `{id_column}` as id, `tag`, `uploader` | ||
| FROM {table} | ||
| WHERE `{id_column}` = :id AND `tag` = :tag | ||
| """, | ||
| ), | ||
| parameters={"id": id_, "tag": tag_}, | ||
| ) | ||
| return result.one_or_none() | ||
|
|
||
|
|
||
| async def remove_tag( | ||
| *, | ||
| table: str, | ||
| id_column: str, | ||
| id_: int, | ||
| tag_: str, | ||
| expdb: AsyncConnection, | ||
| ) -> None: | ||
| await expdb.execute( | ||
| text( | ||
| f""" | ||
| DELETE FROM {table} | ||
| WHERE `{id_column}` = :id AND `tag` = :tag | ||
| """, | ||
| ), | ||
| parameters={"id": id_, "tag": tag_}, | ||
| ) | ||
|
Comment on lines
+46
to
+62
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bind the DELETE to the checked owner. The new 🔒 Suggested direction async def remove_tag(
*,
table: str,
id_column: str,
id_: int,
tag_: str,
+ uploader: int | None = None,
expdb: AsyncConnection,
) -> None:
+ uploader_clause = " AND `uploader` = :uploader" if uploader is not None else ""
await expdb.execute(
text(
f"""
DELETE FROM {table}
- WHERE `{id_column}` = :id AND `tag` = :tag
+ WHERE `{id_column}` = :id AND `tag` = :tag{uploader_clause}
""",
),
- parameters={"id": id_, "tag": tag_},
+ parameters={
+ "id": id_,
+ "tag": tag_,
+ **({"uploader": uploader} if uploader is not None else {}),
+ },
)Then thread the expected uploader through 🧰 Tools🪛 Ruff (0.15.6)[error] 56-59: Possible SQL injection vector through string-based query construction (S608) 🤖 Prompt for AI Agents
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can ignore this one, because we would also need to account for requests by administrators to remove the tag. This isn't a very impactful race condition, but I'll make a note of it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
🧠 Learnings used |
||
|
|
||
|
|
||
| async def select_tags( | ||
| *, | ||
| table: str, | ||
| id_column: str, | ||
| id_: int, | ||
| expdb: AsyncConnection, | ||
| ) -> list[str]: | ||
| result = await expdb.execute( | ||
| text( | ||
| f""" | ||
| SELECT `tag` | ||
| FROM {table} | ||
| WHERE `{id_column}` = :id | ||
| """, | ||
| ), | ||
| parameters={"id": id_}, | ||
| ) | ||
| return [row.tag for row in result.all()] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine to just return
[*tags, tag]and forgo making a database request to fetch all tags again. It will be more efficient. In either case the chance that there are concurrent requests that result in returning an outdated set of tags exists, but that's ok.