Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""materialized view for variant statistics
Revision ID: b85bc7b1bec7
Revises: c404b6719110
Create Date: 2025-03-14 01:53:19.898198
"""

from alembic import op
from alembic_utils.pg_materialized_view import PGMaterializedView
from sqlalchemy.dialects import postgresql

from mavedb.models.published_variant import signature, definition


# revision identifiers, used by Alembic.
revision = "b85bc7b1bec7"
down_revision = "c404b6719110"
branch_labels = None
depends_on = None


def upgrade():
op.create_entity(
PGMaterializedView(
schema="public",
signature=signature,
definition=definition.compile(dialect=postgresql.dialect()).string,
with_data=True,
)
)
op.create_index(
f"idx_{signature}_variant_id",
signature,
["variant_id"],
unique=False,
)
op.create_index(
f"idx_{signature}_variant_urn",
signature,
["variant_urn"],
unique=False,
)
op.create_index(
f"idx_{signature}_score_set_id",
signature,
["score_set_id"],
unique=False,
)
op.create_index(
f"idx_{signature}_score_set_urn",
signature,
["score_set_urn"],
unique=False,
)
op.create_index(
f"idx_{signature}_mapped_variant_id",
signature,
["mapped_variant_id"],
unique=True,
)


def downgrade():
op.drop_index(f"idx_{signature}_variant_id", signature)
op.drop_index(f"idx_{signature}_variant_urn", signature)
op.drop_index(f"idx_{signature}_mapped_variant_id", signature)
op.drop_index(f"idx_{signature}_score_set_id", signature)
op.drop_index(f"idx_{signature}_score_set_urn", signature)
op.drop_entity(
PGMaterializedView(
schema="public",
signature=signature,
definition=definition.compile(dialect=postgresql.dialect()).string,
with_data=True,
)
)
2,524 changes: 1,293 additions & 1,231 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ SQLAlchemy = "~2.0.0"

# Optional dependencies for running this application as a server
alembic = { version = "~1.7.6", optional = true }
alembic-utils = { version = "0.8.1", optional = true }
arq = { version = "~0.25.0", optional = true }
authlib = { version = "~1.3.1", optional = true }
boto3 = { version = "~1.34.97", optional = true }
Expand All @@ -51,7 +52,7 @@ fastapi = { version = "~0.95.0", optional = true }
hgvs = { version = "~1.5.4", optional = true }
orcid = { version = "~1.0.3", optional = true }
psycopg2 = { version = "~2.9.3", optional = true }
python-jose = { extras = ["cryptography"], version = "~3.3.0", optional = true }
python-jose = { extras = ["cryptography"], version = "~3.4.0", optional = true }
python-multipart = { version = "~0.0.5", optional = true }
requests = { version = "~2.32.0", optional = true }
starlette = { version = "~0.27.0", optional = true }
Expand Down Expand Up @@ -85,7 +86,7 @@ SQLAlchemy = { extras = ["mypy"], version = "~2.0.0" }


[tool.poetry.extras]
server = ["alembic", "arq", "authlib", "biocommons", "boto3", "cdot", "cryptography", "fastapi", "hgvs", "orcid", "psycopg2", "python-jose", "python-multipart", "requests", "starlette", "starlette-context", "slack-sdk", "uvicorn", "watchtower"]
server = ["alembic", "alembic-utils", "arq", "authlib", "biocommons", "boto3", "cdot", "cryptography", "fastapi", "hgvs", "orcid", "psycopg2", "python-jose", "python-multipart", "requests", "starlette", "starlette-context", "slack-sdk", "uvicorn", "watchtower"]


[tool.mypy]
Expand Down
146 changes: 146 additions & 0 deletions src/mavedb/db/view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""
Utilities for managing views via SQLAlchemy.
"""

from functools import partial

import sqlalchemy as sa
from sqlalchemy.ext import compiler
from sqlalchemy.schema import DDLElement, MetaData
from sqlalchemy.orm import Session

from mavedb.db.base import Base

# See: https://github.com/sqlalchemy/sqlalchemy/wiki/Views, https://github.com/jeffwidman/sqlalchemy-postgresql-materialized-views?tab=readme-ov-file


class CreateView(DDLElement):
def __init__(self, name: str, selectable: sa.Select, materialized: bool):
self.name = name
self.selectable = selectable
self.materialized = materialized


class DropView(DDLElement):
def __init__(self, name: str, materialized: bool):
self.name = name
self.materialized = materialized


class MaterializedView(Base):
__abstract__ = True

@classmethod
def refresh(cls, connection, concurrently=True):
"""Refresh this materialized view."""
refresh_mat_view(connection, cls.__table__.fullname, concurrently)


@compiler.compiles(CreateView)
def _create_view(element: CreateView, compiler, **kw):
return "CREATE %s %s AS %s" % (
"MATERIALIZED VIEW" if element.materialized else "VIEW",
element.name,
compiler.sql_compiler.process(element.selectable, literal_binds=True),
)


@compiler.compiles(DropView)
def _drop_view(element: DropView, compiler, **kw):
return "DROP %s %s" % ("MATERIALIZED VIEW" if element.materialized else "VIEW", element.name)


def view_exists(ddl: CreateView, target, connection: sa.Connection, materialized: bool, **kw):
inspector = sa.inspect(connection)
if inspector is None:
return False

view_names = inspector.get_materialized_view_names() if ddl.materialized else inspector.get_view_names()
return ddl.name in view_names


def view_doesnt_exist(ddl: CreateView, target, connection: sa.Connection, materialized: bool, **kw):
return not view_exists(ddl, target, connection, materialized, **kw)


def view(name: str, selectable: sa.Select, metadata: MetaData = Base.metadata, materialized: bool = False):
"""
Register a view or materialized view to SQLAlchemy. Use this function to define a view on some arbitrary
model class.

```
class MyView(Base):
__table__ = view(
"my_view",
select(
MyModel.id.label("id"),
MyModel.name.label("name"),
),
materialized=False,
)
```

When registered in this manner, SQLAlchemy will create and destroy the view along with other tables. You can
then query this view as if it were an ORM object.

```
results = db.query(select(MyView.col1).where(MyView.col2)).all()
```
"""
t = sa.table(
name,
*(sa.Column(c.name, c.type, primary_key=c.primary_key) for c in selectable.selected_columns),
)
t.primary_key.update(c for c in t.c if c.primary_key) # type: ignore

# TODO: Figure out indices.
if materialized:
sa.event.listen(
metadata,
"after_create",
CreateView(name, selectable, True).execute_if(callable_=partial(view_doesnt_exist, materialized=True)),
)
sa.event.listen(
metadata,
"before_drop",
DropView(name, True).execute_if(callable_=partial(view_exists, materialized=True)),
)

else:
sa.event.listen(
metadata,
"after_create",
CreateView(name, selectable, False).execute_if(callable_=partial(view_doesnt_exist, materialized=False)),
)
sa.event.listen(
metadata,
"before_drop",
DropView(name, False).execute_if(callable_=partial(view_exists, materialized=False)),
)

return t


def refresh_mat_view(session: Session, name: str, concurrently=True):
"""
Refreshes a single materialized view, given by `name`.
"""
# since session.execute() bypasses autoflush, must manually flush in order
# to include newly-created/modified objects in the refresh
session.flush()
_con = "CONCURRENTLY " if concurrently else ""
session.execute(sa.text("REFRESH MATERIALIZED VIEW " + _con + name))


def refresh_all_mat_views(session: Session, concurrently=True):
"""
Refreshes all materialized views. Views are refreshed in non-deterministic order,
so view definitions can't depend on each other.
"""
inspector = sa.inspect(session.connection())

if not inspector:
return

for mv in inspector.get_materialized_view_names():
refresh_mat_view(session, mv, concurrently)
1 change: 1 addition & 0 deletions src/mavedb/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"license",
"mapped_variant",
"publication_identifier",
"published_variant",
"raw_read_identifier",
"refseq_identifier",
"refseq_offset",
Expand Down
45 changes: 45 additions & 0 deletions src/mavedb/models/published_variant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from sqlalchemy import select, join

from mavedb.db.view import MaterializedView, view

from mavedb.models.score_set import ScoreSet
from mavedb.models.variant import Variant
from mavedb.models.mapped_variant import MappedVariant


signature = "published_variants_materialized_view"
definition = (
select(
Variant.id.label("variant_id"),
Variant.urn.label("variant_urn"),
MappedVariant.id.label("mapped_variant_id"),
ScoreSet.id.label("score_set_id"),
ScoreSet.urn.label("score_set_urn"),
ScoreSet.published_date.label("published_date"),
MappedVariant.current.label("current_mapped_variant"),
)
.select_from(
join(Variant, MappedVariant, Variant.id == MappedVariant.variant_id, isouter=True).join(
ScoreSet, ScoreSet.id == Variant.score_set_id
)
)
.where(
ScoreSet.published_date.is_not(None),
)
)


class PublishedVariantsMV(MaterializedView):
__table__ = view(
signature,
definition,
materialized=True,
)

variant_id = __table__.c.variant_id
variant_urn = __table__.c.variant_urn
mapped_variant_id = __table__.c.mapped_variant_id
score_set_id = __table__.c.score_set_id
score_set_urn = __table__.c.score_set_urn
published_date = __table__.c.published_date
current_mapped_variant = __table__.c.current_mapped_variant
18 changes: 16 additions & 2 deletions src/mavedb/routers/score_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ async def upload_score_set_variant_data(
except UnicodeDecodeError as e:
raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.")


if scores_file:
# Although this is also updated within the variant creation job, update it here
# as well so that we can display the proper UI components (queue invocation delay
Expand Down Expand Up @@ -1016,11 +1015,12 @@ async def delete_score_set(
response_model=score_set.ScoreSet,
response_model_exclude_none=True,
)
def publish_score_set(
async def publish_score_set(
*,
urn: str,
db: Session = Depends(deps.get_db),
user_data: UserData = Depends(require_current_user),
worker: ArqRedis = Depends(deps.get_worker),
) -> Any:
"""
Publish a score set.
Expand Down Expand Up @@ -1097,4 +1097,18 @@ def publish_score_set(
db.commit()
db.refresh(item)

# await the insertion of this job into the worker queue, not the job itself.
job = await worker.enqueue_job(
"refresh_published_variants_view",
correlation_id_for_context(),
user_data.user.id,
)
if job is not None:
save_to_logging_context({"worker_job_id": job.job_id})
logger.info(msg="Enqueud published variant materialized view refresh job.", extra=logging_context())
else:
logger.warning(
msg="Failed to enqueue published variant materialized view refresh job.", extra=logging_context()
)

return item
Loading