diff --git a/alembic/versions/2025_05_03_0956-028565b77b9e_add_manual_strategy_to_batch_strategy_.py b/alembic/versions/2025_05_03_0956-028565b77b9e_add_manual_strategy_to_batch_strategy_.py
index c5af4d33..9ec86fee 100644
--- a/alembic/versions/2025_05_03_0956-028565b77b9e_add_manual_strategy_to_batch_strategy_.py
+++ b/alembic/versions/2025_05_03_0956-028565b77b9e_add_manual_strategy_to_batch_strategy_.py
@@ -38,6 +38,18 @@ def upgrade() -> None:
 
 
 def downgrade() -> None:
+    # Delete all batches with manual strategy. Their URLs are removed first
+    # so the delete cannot violate the urls.batch_id foreign key if it lacks
+    # ON DELETE CASCADE.
+    op.execute("""
+        DELETE FROM urls
+        WHERE batch_id IN (SELECT id FROM batches WHERE strategy = 'manual')
+    """)
+    op.execute("""
+        DELETE FROM batches
+        WHERE strategy = 'manual'
+    """)
+
     switch_enum_type(
         table_name="batches",
         column_name="strategy",
diff --git a/alembic/versions/2025_05_06_0816-e55e16e0738f_create_backlogsnapshot_table.py b/alembic/versions/2025_05_06_0816-e55e16e0738f_create_backlogsnapshot_table.py
new file mode 100644
index 00000000..4d2fe7c5
--- /dev/null
+++ b/alembic/versions/2025_05_06_0816-e55e16e0738f_create_backlogsnapshot_table.py
@@ -0,0 +1,31 @@
+"""Create BacklogSnapshot Table
+
+Revision ID: e55e16e0738f
+Revises: 028565b77b9e
+Create Date: 2025-05-06 08:16:29.385305
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = 'e55e16e0738f'
+down_revision: Union[str, None] = '028565b77b9e'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        'backlog_snapshot',
+        sa.Column('id', sa.Integer(), nullable=False, primary_key=True),
+        sa.Column('count_pending_total', sa.Integer(), nullable=False),
+        sa.Column('created_at', sa.DateTime(), nullable=False),
+    )
+
+
+def downgrade() -> None:
+    op.drop_table('backlog_snapshot')
diff --git a/alembic/versions/2025_05_06_0919-f25852e17c04_create_url_annotation_flags_view.py b/alembic/versions/2025_05_06_0919-f25852e17c04_create_url_annotation_flags_view.py
new file mode 100644
index 00000000..09f8d825
--- /dev/null
+++ b/alembic/versions/2025_05_06_0919-f25852e17c04_create_url_annotation_flags_view.py
@@ -0,0 +1,50 @@
+"""Create URL Annotation Flags View
+
+Revision ID: f25852e17c04
+Revises: e55e16e0738f
+Create Date: 2025-05-06 09:19:54.000410
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
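+# Reporting view: one row per URL with boolean flags for each annotation type.
+# Because it is built from plain LEFT JOINs, a URL with several suggestions of
+# the same kind can fan out into duplicate rows; treat flags as existence checks.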
+revision: str = 'f25852e17c04'
+down_revision: Union[str, None] = 'e55e16e0738f'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    op.execute("""
+    CREATE OR REPLACE VIEW url_annotation_flags AS
+    (
+        SELECT u.id,
+            CASE WHEN arts.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS has_auto_record_type_suggestion,
+            CASE WHEN ars.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS has_auto_relevant_suggestion,
+            CASE WHEN auas.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS has_auto_agency_suggestion,
+            CASE WHEN urts.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS has_user_record_type_suggestion,
+            CASE WHEN urs.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS has_user_relevant_suggestion,
+            CASE WHEN uuas.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS has_user_agency_suggestion,
+            CASE WHEN cua.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS has_confirmed_agency,
+            CASE WHEN ruu.url_id IS NOT NULL THEN TRUE ELSE FALSE END AS was_reviewed
+        FROM urls u
+        LEFT JOIN public.auto_record_type_suggestions arts ON u.id = arts.url_id
+        LEFT JOIN public.auto_relevant_suggestions ars ON u.id = ars.url_id
+        LEFT JOIN public.automated_url_agency_suggestions auas ON u.id = auas.url_id
+        LEFT JOIN public.user_record_type_suggestions urts ON u.id = urts.url_id
+        LEFT JOIN public.user_relevant_suggestions urs ON u.id = urs.url_id
+        LEFT JOIN public.user_url_agency_suggestions uuas ON u.id = uuas.url_id
+        LEFT JOIN public.confirmed_url_agency cua ON u.id = cua.url_id
+        LEFT JOIN public.reviewing_user_url ruu ON u.id = ruu.url_id
+    )
+    """)
+
+
+def downgrade() -> None:
+    op.execute("DROP VIEW url_annotation_flags;")
diff --git a/alembic/versions/2025_05_06_1115-6f2007bbcce3_create_url_data_sources_table.py b/alembic/versions/2025_05_06_1115-6f2007bbcce3_create_url_data_sources_table.py
new file mode 100644
index 00000000..499de2e4
--- /dev/null
+++ b/alembic/versions/2025_05_06_1115-6f2007bbcce3_create_url_data_sources_table.py
@@ -0,0 +1,87 @@
+"""Create url_data_sources table
+
+Revision ID: 6f2007bbcce3
+Revises: f25852e17c04
+Create Date: 2025-05-06 11:15:24.485465
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
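+# This migration replaces the urls.data_source_id column with a dedicated
+# url_data_sources association table, backfilling existing values, so each
+# URL -> data source link also records when it was created.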
+revision: str = '6f2007bbcce3'
+down_revision: Union[str, None] = 'f25852e17c04'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # Create url_data_sources table
+    op.create_table(
+        'url_data_sources',
+        sa.Column(
+            'id',
+            sa.Integer(),
+            primary_key=True
+        ),
+        sa.Column(
+            'url_id',
+            sa.Integer(),
+            sa.ForeignKey(
+                'urls.id',
+                ondelete='CASCADE'
+            ),
+            nullable=False
+        ),
+        sa.Column(
+            'data_source_id',
+            sa.Integer(),
+            nullable=False
+        ),
+        sa.Column(
+            'created_at',
+            sa.TIMESTAMP(),
+            nullable=False,
+            server_default=sa.text('now()')
+        ),
+        sa.UniqueConstraint('url_id', name='uq_url_data_sources_url_id'),
+        sa.UniqueConstraint('data_source_id', name='uq_url_data_sources_data_source_id')
+    )
+
+    # Migrate existing urls with a data source ID
+    op.execute("""
+        INSERT INTO url_data_sources
+        (url_id, data_source_id)
+        SELECT id, data_source_id
+        FROM urls
+        WHERE data_source_id IS NOT NULL
+    """)
+
+    # Drop existing data source ID column from urls table
+    op.drop_column('urls', 'data_source_id')
+
+
+def downgrade() -> None:
+    # Restore urls.data_source_id and backfill it from url_data_sources
+    # before dropping the association table, so downgrading does not lose
+    # existing URL -> data source links.
+    op.add_column(
+        'urls',
+        sa.Column(
+            'data_source_id',
+            sa.Integer(),
+            nullable=True
+        )
+    )
+    op.execute("""
+        UPDATE urls
+        SET data_source_id = uds.data_source_id
+        FROM url_data_sources uds
+        WHERE urls.id = uds.url_id
+    """)
+    op.drop_table('url_data_sources')
diff --git a/api/main.py b/api/main.py
index 1b80716e..94b52cd2 100644
--- a/api/main.py
+++ b/api/main.py
@@ -9,6 +9,7 @@
 from api.routes.annotate import annotate_router
 from api.routes.batch import batch_router
 from api.routes.collector import collector_router
+from api.routes.metrics import metrics_router
 from api.routes.review import review_router
 from api.routes.root import root_router
 from api.routes.search import search_router
@@ -130,7 +131,8 @@ async def redirect_docs():
     url_router,
     task_router,
     review_router,
-    search_router
+    search_router,
+    metrics_router
 ]
 
 for router in routers:
diff --git a/api/routes/metrics.py b/api/routes/metrics.py
new file mode 100644
index 00000000..d81aa2e6
--- /dev/null
+++ b/api/routes/metrics.py
@@ -0,0 +1,64 @@
+from fastapi import APIRouter
+from fastapi.params import Query, Depends
+
+from api.dependencies import get_async_core
+from core.AsyncCore import AsyncCore
+from core.DTOs.GetMetricsBacklogResponse import GetMetricsBacklogResponseDTO
+from core.DTOs.GetMetricsBatchesAggregatedResponseDTO import GetMetricsBatchesAggregatedResponseDTO
+from core.DTOs.GetMetricsBatchesBreakdownResponseDTO import GetMetricsBatchesBreakdownResponseDTO
+from core.DTOs.GetMetricsURLsAggregatedResponseDTO import GetMetricsURLsAggregatedResponseDTO
+from core.DTOs.GetMetricsURLsBreakdownPendingResponseDTO import GetMetricsURLsBreakdownPendingResponseDTO
+from core.DTOs.GetMetricsURLsBreakdownSubmittedResponseDTO import GetMetricsURLsBreakdownSubmittedResponseDTO
+from security_manager.SecurityManager import AccessInfo, get_access_info
+
+metrics_router = APIRouter(
+    prefix="/metrics",
+    tags=["Metrics"],
+)
+
+
+@metrics_router.get("/batches/aggregated")
+async def get_batches_aggregated_metrics(
+    core: AsyncCore = Depends(get_async_core),
+    access_info: AccessInfo = Depends(get_access_info)
+) -> GetMetricsBatchesAggregatedResponseDTO:
+    return await core.get_batches_aggregated_metrics()
+
+@metrics_router.get("/batches/breakdown")
+async def get_batches_breakdown_metrics(
+    core: AsyncCore = Depends(get_async_core),
+    access_info: AccessInfo = Depends(get_access_info),
+    page: int = Query(
+        description="The page number",
+        default=1
+    )
+) -> GetMetricsBatchesBreakdownResponseDTO:
+    return 
await core.get_batches_breakdown_metrics(page=page) + +@metrics_router.get("/urls/aggregate") +async def get_urls_aggregated_metrics( + core: AsyncCore = Depends(get_async_core), + access_info: AccessInfo = Depends(get_access_info) +) -> GetMetricsURLsAggregatedResponseDTO: + return await core.get_urls_aggregated_metrics() + +@metrics_router.get("/urls/breakdown/submitted") +async def get_urls_breakdown_submitted_metrics( + core: AsyncCore = Depends(get_async_core), + access_info: AccessInfo = Depends(get_access_info) +) -> GetMetricsURLsBreakdownSubmittedResponseDTO: + return await core.get_urls_breakdown_submitted_metrics() + +@metrics_router.get("/urls/breakdown/pending") +async def get_urls_breakdown_pending_metrics( + core: AsyncCore = Depends(get_async_core), + access_info: AccessInfo = Depends(get_access_info) +) -> GetMetricsURLsBreakdownPendingResponseDTO: + return await core.get_urls_breakdown_pending_metrics() + +@metrics_router.get("/backlog") +async def get_backlog_metrics( + core: AsyncCore = Depends(get_async_core), + access_info: AccessInfo = Depends(get_access_info) +) -> GetMetricsBacklogResponseDTO: + return await core.get_backlog_metrics() \ No newline at end of file diff --git a/collector_db/AsyncDatabaseClient.py b/collector_db/AsyncDatabaseClient.py index 85d74146..e9438c5b 100644 --- a/collector_db/AsyncDatabaseClient.py +++ b/collector_db/AsyncDatabaseClient.py @@ -3,10 +3,12 @@ from typing import Optional, Type, Any, List from fastapi import HTTPException -from sqlalchemy import select, exists, func, case, desc, Select, not_, and_, update, asc, delete +from sqlalchemy import select, exists, func, case, desc, Select, not_, and_, update, asc, delete, insert, CTE +from sqlalchemy.dialects import postgresql from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.orm import selectinload, joinedload, QueryableAttribute, aliased +from sqlalchemy.sql.functions import coalesce from starlette import status from collector_db.ConfigManager import ConfigManager @@ -26,10 +28,21 @@ from collector_db.models import URL, URLErrorInfo, URLHTMLContent, Base, \ RootURL, Task, TaskError, LinkTaskURL, Batch, Agency, AutomatedUrlAgencySuggestion, \ UserUrlAgencySuggestion, AutoRelevantSuggestion, AutoRecordTypeSuggestion, UserRelevantSuggestion, \ - UserRecordTypeSuggestion, ReviewingUserURL, URLOptionalDataSourceMetadata, ConfirmedURLAgency, Duplicate, Log + UserRecordTypeSuggestion, ReviewingUserURL, URLOptionalDataSourceMetadata, ConfirmedURLAgency, Duplicate, Log, \ + BacklogSnapshot, URLDataSource from collector_manager.enums import URLStatus, CollectorType from core.DTOs.AllAnnotationPostInfo import AllAnnotationPostInfo from core.DTOs.FinalReviewApprovalInfo import FinalReviewApprovalInfo +from core.DTOs.GetMetricsBacklogResponse import GetMetricsBacklogResponseDTO, GetMetricsBacklogResponseInnerDTO +from core.DTOs.GetMetricsBatchesAggregatedResponseDTO import GetMetricsBatchesAggregatedResponseDTO, \ + GetMetricsBatchesAggregatedInnerResponseDTO +from core.DTOs.GetMetricsBatchesBreakdownResponseDTO import GetMetricsBatchesBreakdownResponseDTO, \ + GetMetricsBatchesBreakdownInnerResponseDTO +from core.DTOs.GetMetricsURLsAggregatedResponseDTO import GetMetricsURLsAggregatedResponseDTO +from core.DTOs.GetMetricsURLsBreakdownPendingResponseDTO import GetMetricsURLsBreakdownPendingResponseDTO, \ + GetMetricsURLsBreakdownPendingResponseInnerDTO +from 
core.DTOs.GetMetricsURLsBreakdownSubmittedResponseDTO import GetMetricsURLsBreakdownSubmittedResponseDTO, \ + GetMetricsURLsBreakdownSubmittedInnerDTO from core.DTOs.GetNextRecordTypeAnnotationResponseInfo import GetNextRecordTypeAnnotationResponseInfo from core.DTOs.GetNextRelevanceAnnotationResponseInfo import GetNextRelevanceAnnotationResponseInfo from core.DTOs.GetNextURLForAgencyAnnotationResponse import GetNextURLForAgencyAnnotationResponse, \ @@ -1313,6 +1326,8 @@ async def insert_url(self, session: AsyncSession, url_info: URLInfo) -> int: collector_metadata=url_info.collector_metadata, outcome=url_info.outcome.value ) + if url_info.created_at is not None: + url_entry.created_at = url_info.created_at session.add(url_entry) await session.flush() return url_entry.id @@ -1359,6 +1374,8 @@ async def insert_batch(self, session: AsyncSession, batch_info: BatchInfo) -> in record_type_match_rate=batch_info.record_type_match_rate, record_category_match_rate=batch_info.record_category_match_rate, ) + if batch_info.date_generated is not None: + batch.date_generated = batch_info.date_generated session.add(batch) await session.flush() return batch.id @@ -1474,14 +1491,24 @@ async def mark_urls_as_submitted(self, session: AsyncSession, infos: list[Submit for info in infos: url_id = info.url_id data_source_id = info.data_source_id + query = ( update(URL) .where(URL.id == url_id) .values( - data_source_id=data_source_id, outcome=URLStatus.SUBMITTED.value ) ) + + url_data_source_object = URLDataSource( + url_id=url_id, + data_source_id=data_source_id + ) + if info.submitted_at is not None: + url_data_source_object.created_at = info.submitted_at + session.add(url_data_source_object) + + await session.execute(query) @session_manager @@ -1794,3 +1821,389 @@ async def search_for_url(self, session: AsyncSession, url: str) -> SearchURLResp url_id=url.id ) + @session_manager + async def get_batches_aggregated_metrics(self, session: AsyncSession) -> GetMetricsBatchesAggregatedResponseDTO: + sc = StatementComposer + + # First, get all batches broken down by collector type and status + def batch_column(status: BatchStatus, label): + return sc.count_distinct( + case( + (Batch.status == status.value, + Batch.id) + ), + label=label + ) + + batch_count_subquery = select( + batch_column(BatchStatus.READY_TO_LABEL, label="done_count"), + batch_column(BatchStatus.ERROR, label="error_count"), + Batch.strategy, + ).group_by(Batch.strategy).subquery("batch_count") + + def url_column(status: URLStatus, label): + return sc.count_distinct( + case( + (URL.outcome == status.value, + URL.id) + ), + label=label + ) + + # Next, count urls + url_count_subquery = select( + Batch.strategy, + url_column(URLStatus.PENDING, label="pending_count"), + url_column(URLStatus.ERROR, label="error_count"), + url_column(URLStatus.VALIDATED, label="validated_count"), + url_column(URLStatus.SUBMITTED, label="submitted_count"), + url_column(URLStatus.REJECTED, label="rejected_count"), + + ).outerjoin( + Batch, Batch.id == URL.batch_id + ).group_by( + Batch.strategy + ).subquery("url_count") + + # Combine + query = select( + Batch.strategy, + batch_count_subquery.c.done_count.label("batch_done_count"), + batch_count_subquery.c.error_count.label("batch_error_count"), + coalesce(url_count_subquery.c.pending_count, 0).label("pending_count"), + coalesce(url_count_subquery.c.error_count, 0).label("error_count"), + coalesce(url_count_subquery.c.submitted_count, 0).label("submitted_count"), + coalesce(url_count_subquery.c.rejected_count, 
0).label("rejected_count"),
+            coalesce(url_count_subquery.c.validated_count, 0).label("validated_count")
+        ).join(
+            batch_count_subquery,
+            Batch.strategy == batch_count_subquery.c.strategy
+        ).outerjoin(
+            url_count_subquery,
+            Batch.strategy == url_count_subquery.c.strategy
+        )
+        raw_results = await session.execute(query)
+        results = raw_results.all()
+        d: dict[CollectorType, GetMetricsBatchesAggregatedInnerResponseDTO] = {}
+        for result in results:
+            d[CollectorType(result.strategy)] = GetMetricsBatchesAggregatedInnerResponseDTO(
+                count_successful_batches=result.batch_done_count,
+                count_failed_batches=result.batch_error_count,
+                count_urls=result.pending_count + result.submitted_count
+                           + result.rejected_count + result.error_count
+                           + result.validated_count,
+                count_urls_pending=result.pending_count,
+                count_urls_validated=result.validated_count,
+                count_urls_submitted=result.submitted_count,
+                count_urls_rejected=result.rejected_count,
+                count_urls_errors=result.error_count
+            )
+
+        total_batch_query = await session.execute(
+            select(
+                sc.count_distinct(Batch.id, label="count")
+            )
+        )
+        total_batch_count = total_batch_query.scalars().one_or_none()
+        if total_batch_count is None:
+            total_batch_count = 0
+
+        return GetMetricsBatchesAggregatedResponseDTO(
+            total_batches=total_batch_count,
+            by_strategy=d
+        )
+
+    @session_manager
+    async def get_batches_breakdown_metrics(
+        self,
+        session: AsyncSession,
+        page: int
+    ) -> GetMetricsBatchesBreakdownResponseDTO:
+        sc = StatementComposer
+
+        # Built as a subquery (rather than a bare select) so its columns can
+        # be referenced via .c in the combined query below.
+        main_query = select(
+            Batch.strategy,
+            Batch.id,
+            Batch.status,
+            Batch.date_generated.label("created_at"),
+        ).subquery("batch_info")
+
+        def url_column(status: URLStatus, label):
+            return sc.count_distinct(
+                case(
+                    (URL.outcome == status.value,
+                     URL.id)
+                ),
+                label=label
+            )
+
+        count_query = select(
+            URL.batch_id,
+            sc.count_distinct(URL.id, label="count_total"),
+            url_column(URLStatus.PENDING, label="count_pending"),
+            url_column(URLStatus.SUBMITTED, label="count_submitted"),
+            url_column(URLStatus.REJECTED, label="count_rejected"),
+            url_column(URLStatus.ERROR, label="count_error"),
+            url_column(URLStatus.VALIDATED, label="count_validated"),
+        ).group_by(
+            URL.batch_id
+        ).subquery("url_count")
+
+        page_size = 100
+        query = (select(
+            main_query.c.strategy,
+            main_query.c.id,
+            main_query.c.created_at,
+            main_query.c.status,
+            coalesce(count_query.c.count_total, 0).label("count_total"),
+            coalesce(count_query.c.count_pending, 0).label("count_pending"),
+            coalesce(count_query.c.count_submitted, 0).label("count_submitted"),
+            coalesce(count_query.c.count_rejected, 0).label("count_rejected"),
+            coalesce(count_query.c.count_error, 0).label("count_error"),
+            coalesce(count_query.c.count_validated, 0).label("count_validated"),
+        ).outerjoin(
+            count_query,
+            main_query.c.id == count_query.c.batch_id
+        ).order_by(
+            main_query.c.created_at.asc()
+        ).offset(
+            (page - 1) * page_size
+        ).limit(
+            # Without a limit, every page would return all remaining rows.
+            page_size
+        ))
+
+        raw_results = await session.execute(query)
+        results = raw_results.all()
+        batches: list[GetMetricsBatchesBreakdownInnerResponseDTO] = []
+        for result in results:
+            dto = GetMetricsBatchesBreakdownInnerResponseDTO(
+                batch_id=result.id,
+                strategy=CollectorType(result.strategy),
+                status=BatchStatus(result.status),
+                created_at=result.created_at,
+                count_url_total=result.count_total,
+                count_url_pending=result.count_pending,
+                count_url_submitted=result.count_submitted,
+                count_url_rejected=result.count_rejected,
+                count_url_error=result.count_error,
+                count_url_validated=result.count_validated
+            )
+            batches.append(dto)
+        return 
GetMetricsBatchesBreakdownResponseDTO( + batches=batches, + ) + + @session_manager + async def get_urls_breakdown_submitted_metrics( + self, + session: AsyncSession + ) -> GetMetricsURLsBreakdownSubmittedResponseDTO: + + # Build the query + week = func.date_trunc('week', URLDataSource.created_at) + query = ( + select( + week.label('week'), + func.count(URLDataSource.id).label('count_submitted'), + ) + .group_by(week) + .order_by(week.asc()) + ) + + # Execute the query + raw_results = await session.execute(query) + results = raw_results.all() + final_results: list[GetMetricsURLsBreakdownSubmittedInnerDTO] = [] + for result in results: + dto = GetMetricsURLsBreakdownSubmittedInnerDTO( + week_of=result.week, + count_submitted=result.count_submitted + ) + final_results.append(dto) + return GetMetricsURLsBreakdownSubmittedResponseDTO( + entries=final_results + ) + + @session_manager + async def get_urls_aggregated_metrics( + self, + session: AsyncSession + ) -> GetMetricsURLsAggregatedResponseDTO: + sc = StatementComposer + + oldest_pending_url_query = select( + URL.id, + URL.created_at + ).where( + URL.outcome == URLStatus.PENDING.value + ).order_by( + URL.created_at.asc() + ).limit(1) + + oldest_pending_url = await session.execute(oldest_pending_url_query) + oldest_pending_url = oldest_pending_url.one_or_none() + if oldest_pending_url is None: + oldest_pending_url_id = None + oldest_pending_created_at = None + else: + oldest_pending_url_id = oldest_pending_url.id + oldest_pending_created_at = oldest_pending_url.created_at + + def case_column(status: URLStatus, label): + return sc.count_distinct( + case( + (URL.outcome == status.value, + URL.id) + ), + label=label + ) + + count_query = select( + sc.count_distinct(URL.id, label="count"), + case_column(URLStatus.PENDING, label="count_pending"), + case_column(URLStatus.SUBMITTED, label="count_submitted"), + case_column(URLStatus.VALIDATED, label="count_validated"), + case_column(URLStatus.REJECTED, label="count_rejected"), + case_column(URLStatus.ERROR, label="count_error"), + ) + raw_results = await session.execute(count_query) + results = raw_results.all() + + return GetMetricsURLsAggregatedResponseDTO( + count_urls_total=results[0].count, + count_urls_pending=results[0].count_pending, + count_urls_submitted=results[0].count_submitted, + count_urls_validated=results[0].count_validated, + count_urls_rejected=results[0].count_rejected, + count_urls_errors=results[0].count_error, + oldest_pending_url_id=oldest_pending_url_id, + oldest_pending_url_created_at=oldest_pending_created_at, + ) + + def compile(self, statement): + compiled_sql = statement.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}) + return compiled_sql + + @session_manager + async def get_urls_breakdown_pending_metrics( + self, + session: AsyncSession + ) -> GetMetricsURLsBreakdownPendingResponseDTO: + sc = StatementComposer + + flags: CTE = sc.url_annotation_flags_query( + status=URLStatus.PENDING + ) + + + week = func.date_trunc('week', URL.created_at) + + # Build the query + query = ( + select( + week.label('week'), + func.count(URL.id).label('count_total'), + func.count(case( + (flags.c.has_user_record_type_annotation == True, 1)) + ).label('user_record_type_count'), + func.count(case( + (flags.c.has_user_relevant_annotation == True, 1)) + ).label('user_relevant_count'), + func.count(case( + (flags.c.has_user_agency_annotation == True, 1)) + ).label('user_agency_count'), + ) + .join(flags, flags.c.url_id == URL.id) + .group_by(week) + 
.order_by(week.asc()) + ) + + # Execute the query and return the results + results = await session.execute(query) + all_results = results.all() + final_results: list[GetMetricsURLsBreakdownPendingResponseInnerDTO] = [] + + for result in all_results: + dto = GetMetricsURLsBreakdownPendingResponseInnerDTO( + week_created_at=result.week, + count_pending_total=result.count_total, + count_pending_relevant_user=result.user_relevant_count, + count_pending_record_type_user=result.user_record_type_count, + count_pending_agency_user=result.user_agency_count, + ) + final_results.append(dto) + return GetMetricsURLsBreakdownPendingResponseDTO( + entries=final_results, + ) + + @session_manager + async def get_backlog_metrics( + self, + session: AsyncSession + ) -> GetMetricsBacklogResponseDTO: + # 1. Create a subquery that assigns row_number() partitioned by week + weekly_snapshots_subq = ( + select( + BacklogSnapshot.id, + BacklogSnapshot.created_at, + BacklogSnapshot.count_pending_total, + func.date_trunc('week', BacklogSnapshot.created_at).label("week_start"), + func.row_number() + .over( + partition_by=func.date_trunc('week', BacklogSnapshot.created_at), + order_by=BacklogSnapshot.created_at.desc() + ) + .label("row_number") + ) + .subquery() + ) + + # 2. Filter for the top (most recent) row in each week + stmt = ( + select( + weekly_snapshots_subq.c.week_start, + weekly_snapshots_subq.c.created_at, + weekly_snapshots_subq.c.count_pending_total + ) + .where(weekly_snapshots_subq.c.row_number == 1) + .order_by(weekly_snapshots_subq.c.week_start) + ) + + raw_result = await session.execute(stmt) + results = raw_result.all() + final_results = [] + for result in results: + final_results.append( + GetMetricsBacklogResponseInnerDTO( + week_of=result.week_start, + count_pending_total=result.count_pending_total, + ) + ) + + return GetMetricsBacklogResponseDTO(entries=final_results) + + + @session_manager + async def populate_backlog_snapshot( + self, + session: AsyncSession, + dt: Optional[datetime] = None + ): + sc = StatementComposer + # Get count of pending URLs + query = select( + sc.count_distinct(URL.id, label="count") + ).where( + URL.outcome == URLStatus.PENDING.value + ) + + raw_result = await session.execute(query) + count = raw_result.one()[0] + + # insert count into snapshot + snapshot = BacklogSnapshot( + count_pending_total=count + ) + if dt is not None: + snapshot.created_at = dt + + session.add(snapshot) + + diff --git a/collector_db/DTOs/URLInfo.py b/collector_db/DTOs/URLInfo.py index c47d2830..5a1d2221 100644 --- a/collector_db/DTOs/URLInfo.py +++ b/collector_db/DTOs/URLInfo.py @@ -13,4 +13,5 @@ class URLInfo(BaseModel): collector_metadata: Optional[dict] = None outcome: URLStatus = URLStatus.PENDING updated_at: Optional[datetime.datetime] = None + created_at: Optional[datetime.datetime] = None name: Optional[str] = None diff --git a/collector_db/DatabaseClient.py b/collector_db/DatabaseClient.py index 94320fbc..8bd8105f 100644 --- a/collector_db/DatabaseClient.py +++ b/collector_db/DatabaseClient.py @@ -1,10 +1,10 @@ from functools import wraps from typing import Optional, List -from sqlalchemy import create_engine +from sqlalchemy import create_engine, update from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy.orm import sessionmaker, scoped_session, Session from collector_db.ConfigManager import ConfigManager from collector_db.DTOs.BatchInfo import BatchInfo @@ -13,10 +13,11 @@ 
from collector_db.DTOs.LogInfo import LogInfo from collector_db.DTOs.URLInfo import URLInfo from collector_db.DTOs.URLMapping import URLMapping -from collector_db.models import Base, Batch, URL, Log, Duplicate -from collector_manager.enums import CollectorType +from collector_db.models import Base, Batch, URL, Log, Duplicate, URLDataSource +from collector_manager.enums import CollectorType, URLStatus from core.DTOs.ManualBatchInputDTO import ManualBatchInputDTO from core.DTOs.ManualBatchResponseDTO import ManualBatchResponseDTO +from core.DTOs.task_data_objects.SubmitApprovedURLTDO import SubmittedURLInfo from core.EnvVarManager import EnvVarManager from core.enums import BatchStatus @@ -72,6 +73,8 @@ def insert_batch(self, session, batch_info: BatchInfo) -> int: record_type_match_rate=batch_info.record_type_match_rate, record_category_match_rate=batch_info.record_category_match_rate, ) + if batch_info.date_generated is not None: + batch.date_generated = batch_info.date_generated session.add(batch) session.commit() session.refresh(batch) @@ -109,6 +112,8 @@ def insert_url(self, session, url_info: URLInfo) -> int: outcome=url_info.outcome.value, name=url_info.name ) + if url_info.created_at is not None: + url_entry.created_at = url_info.created_at session.add(url_entry) session.commit() session.refresh(url_entry) @@ -164,6 +169,34 @@ def update_url(self, session, url_info: URLInfo): url = session.query(URL).filter_by(id=url_info.id).first() url.collector_metadata = url_info.collector_metadata + @session_manager + def mark_urls_as_submitted( + self, + session: Session, + infos: list[SubmittedURLInfo] + ): + for info in infos: + url_id = info.url_id + data_source_id = info.data_source_id + + query = ( + update(URL) + .where(URL.id == url_id) + .values( + outcome=URLStatus.SUBMITTED.value + ) + ) + + url_data_source_object = URLDataSource( + url_id=url_id, + data_source_id=data_source_id + ) + if info.submitted_at is not None: + url_data_source_object.created_at = info.submitted_at + session.add(url_data_source_object) + + session.execute(query) + if __name__ == "__main__": client = DatabaseClient() print("Database client initialized.") diff --git a/collector_db/StatementComposer.py b/collector_db/StatementComposer.py index ca66f6ba..23c817b1 100644 --- a/collector_db/StatementComposer.py +++ b/collector_db/StatementComposer.py @@ -1,11 +1,12 @@ -from typing import Any +from typing import Any, Optional -from sqlalchemy import Select, select, exists, Table, func, Subquery, and_, not_, ColumnElement +from sqlalchemy import Select, select, exists, Table, func, Subquery, and_, not_, ColumnElement, case, literal, CTE from sqlalchemy.orm import aliased from collector_db.enums import URLMetadataAttributeType, ValidationStatus, TaskType from collector_db.models import URL, URLHTMLContent, AutomatedUrlAgencySuggestion, URLOptionalDataSourceMetadata, Batch, \ - ConfirmedURLAgency, LinkTaskURL, Task, UserUrlAgencySuggestion, UserRecordTypeSuggestion, UserRelevantSuggestion + ConfirmedURLAgency, LinkTaskURL, Task, UserUrlAgencySuggestion, UserRecordTypeSuggestion, UserRelevantSuggestion, \ + AutoRecordTypeSuggestion, AutoRelevantSuggestion, ReviewingUserURL from collector_manager.enums import URLStatus, CollectorType from core.enums import BatchStatus @@ -114,4 +115,54 @@ def user_suggestion_not_exists( ) ) - return subquery \ No newline at end of file + return subquery + + @staticmethod + def count_distinct(field, label): + return func.count(func.distinct(field)).label(label) + + @staticmethod + def 
sum_distinct(field, label): + return func.sum(func.distinct(field)).label(label) + + @staticmethod + def url_annotation_flags_query( + status: Optional[URLStatus] = None + ) -> CTE: + stmt = ( + select( + URL.id.label("url_id"), + case((AutoRecordTypeSuggestion.url_id != None, literal(True)), else_=literal(False)).label( + "has_auto_record_type_annotation" + ), + case((AutoRelevantSuggestion.url_id != None, literal(True)), else_=literal(False)).label( + "has_auto_relevant_annotation" + ), + case((AutomatedUrlAgencySuggestion.url_id != None, literal(True)), else_=literal(False)).label( + "has_auto_agency_annotation" + ), + case((UserRecordTypeSuggestion.url_id != None, literal(True)), else_=literal(False)).label( + "has_user_record_type_annotation" + ), + case((UserRelevantSuggestion.url_id != None, literal(True)), else_=literal(False)).label( + "has_user_relevant_annotation" + ), + case((UserUrlAgencySuggestion.url_id != None, literal(True)), else_=literal(False)).label( + "has_user_agency_annotation" + ), + case((ReviewingUserURL.url_id != None, literal(True)), else_=literal(False)).label("was_reviewed"), + ) + .outerjoin(AutoRecordTypeSuggestion, URL.id == AutoRecordTypeSuggestion.url_id) + .outerjoin(AutoRelevantSuggestion, URL.id == AutoRelevantSuggestion.url_id) + .outerjoin(AutomatedUrlAgencySuggestion, URL.id == AutomatedUrlAgencySuggestion.url_id) + .outerjoin(UserRecordTypeSuggestion, URL.id == UserRecordTypeSuggestion.url_id) + .outerjoin(UserRelevantSuggestion, URL.id == UserRelevantSuggestion.url_id) + .outerjoin(UserUrlAgencySuggestion, URL.id == UserUrlAgencySuggestion.url_id) + .outerjoin(ReviewingUserURL, URL.id == ReviewingUserURL.url_id) + ) + if status is not None: + stmt = stmt.where( + URL.outcome == status.value + ) + + return stmt.cte("url_annotation_flags") \ No newline at end of file diff --git a/collector_db/models.py b/collector_db/models.py index b5e70cdc..b38243dd 100644 --- a/collector_db/models.py +++ b/collector_db/models.py @@ -106,7 +106,6 @@ class URL(Base): record_type = Column(postgresql.ENUM(*record_type_values, name='record_type'), nullable=True) created_at = get_created_at_column() updated_at = get_updated_at_column() - data_source_id = Column(Integer, nullable=True) # Relationships batch = relationship("Batch", back_populates="urls") @@ -137,6 +136,11 @@ class URL(Base): confirmed_agencies = relationship( "ConfirmedURLAgency", ) + data_source = relationship( + "URLDataSource", + back_populates="url", + uselist=False + ) class URLOptionalDataSourceMetadata(Base): @@ -452,4 +456,26 @@ class UserRecordTypeSuggestion(Base): # Relationships - url = relationship("URL", back_populates="user_record_type_suggestion") \ No newline at end of file + url = relationship("URL", back_populates="user_record_type_suggestion") + +class BacklogSnapshot(Base): + __tablename__ = "backlog_snapshot" + + id = Column(Integer, primary_key=True, autoincrement=True) + count_pending_total = Column(Integer, nullable=False) + created_at = get_created_at_column() + +class URLDataSource(Base): + __tablename__ = "url_data_sources" + + id = Column(Integer, primary_key=True, autoincrement=True) + url_id = Column(Integer, ForeignKey("urls.id"), nullable=False) + data_source_id = Column(Integer, nullable=False) + created_at = get_created_at_column() + + # Relationships + url = relationship( + "URL", + back_populates="data_source", + uselist=False + ) \ No newline at end of file diff --git a/core/AsyncCore.py b/core/AsyncCore.py index 46ccca0d..e7b7f534 100644 --- a/core/AsyncCore.py +++ 
b/core/AsyncCore.py @@ -15,6 +15,12 @@ from core.DTOs.GetBatchLogsResponse import GetBatchLogsResponse from core.DTOs.GetBatchStatusResponse import GetBatchStatusResponse from core.DTOs.GetDuplicatesByBatchResponse import GetDuplicatesByBatchResponse +from core.DTOs.GetMetricsBacklogResponse import GetMetricsBacklogResponseDTO +from core.DTOs.GetMetricsBatchesAggregatedResponseDTO import GetMetricsBatchesAggregatedResponseDTO +from core.DTOs.GetMetricsBatchesBreakdownResponseDTO import GetMetricsBatchesBreakdownResponseDTO +from core.DTOs.GetMetricsURLsAggregatedResponseDTO import GetMetricsURLsAggregatedResponseDTO +from core.DTOs.GetMetricsURLsBreakdownPendingResponseDTO import GetMetricsURLsBreakdownPendingResponseDTO +from core.DTOs.GetMetricsURLsBreakdownSubmittedResponseDTO import GetMetricsURLsBreakdownSubmittedResponseDTO from core.DTOs.GetNextRecordTypeAnnotationResponseInfo import GetNextRecordTypeAnnotationResponseOuterInfo from core.DTOs.GetNextRelevanceAnnotationResponseInfo import GetNextRelevanceAnnotationResponseOuterInfo from core.DTOs.GetNextURLForAgencyAnnotationResponse import GetNextURLForAgencyAnnotationResponse, \ @@ -299,3 +305,21 @@ async def upload_manual_batch( async def search_for_url(self, url: str) -> SearchURLResponse: return await self.adb_client.search_for_url(url) + + async def get_batches_aggregated_metrics(self) -> GetMetricsBatchesAggregatedResponseDTO: + return await self.adb_client.get_batches_aggregated_metrics() + + async def get_batches_breakdown_metrics(self, page: int) -> GetMetricsBatchesBreakdownResponseDTO: + return await self.adb_client.get_batches_breakdown_metrics(page=page) + + async def get_urls_breakdown_submitted_metrics(self) -> GetMetricsURLsBreakdownSubmittedResponseDTO: + return await self.adb_client.get_urls_breakdown_submitted_metrics() + + async def get_urls_aggregated_metrics(self) -> GetMetricsURLsAggregatedResponseDTO: + return await self.adb_client.get_urls_aggregated_metrics() + + async def get_urls_breakdown_pending_metrics(self) -> GetMetricsURLsBreakdownPendingResponseDTO: + return await self.adb_client.get_urls_breakdown_pending_metrics() + + async def get_backlog_metrics(self) -> GetMetricsBacklogResponseDTO: + return await self.adb_client.get_backlog_metrics() \ No newline at end of file diff --git a/core/DTOs/GetMetricsBacklogResponse.py b/core/DTOs/GetMetricsBacklogResponse.py new file mode 100644 index 00000000..0df38324 --- /dev/null +++ b/core/DTOs/GetMetricsBacklogResponse.py @@ -0,0 +1,10 @@ +import datetime + +from pydantic import BaseModel + +class GetMetricsBacklogResponseInnerDTO(BaseModel): + week_of: datetime.date + count_pending_total: int + +class GetMetricsBacklogResponseDTO(BaseModel): + entries: list[GetMetricsBacklogResponseInnerDTO] \ No newline at end of file diff --git a/core/DTOs/GetMetricsBatchesAggregatedResponseDTO.py b/core/DTOs/GetMetricsBatchesAggregatedResponseDTO.py new file mode 100644 index 00000000..37535f2d --- /dev/null +++ b/core/DTOs/GetMetricsBatchesAggregatedResponseDTO.py @@ -0,0 +1,25 @@ +from typing import Dict + +from pydantic import BaseModel + +from collector_manager.enums import CollectorType + + +class GetMetricsBatchesAggregatedInnerResponseDTO(BaseModel): + count_successful_batches: int + count_failed_batches: int + count_urls: int + count_urls_pending: int + count_urls_validated: int + count_urls_submitted: int + count_urls_rejected: int + count_urls_errors: int + + + +class GetMetricsBatchesAggregatedResponseDTO(BaseModel): + total_batches: int + by_strategy: Dict[ + 
CollectorType,
+        GetMetricsBatchesAggregatedInnerResponseDTO
+    ]
\ No newline at end of file
diff --git a/core/DTOs/GetMetricsBatchesBreakdownResponseDTO.py b/core/DTOs/GetMetricsBatchesBreakdownResponseDTO.py
new file mode 100644
index 00000000..6572f49f
--- /dev/null
+++ b/core/DTOs/GetMetricsBatchesBreakdownResponseDTO.py
@@ -0,0 +1,22 @@
+from datetime import datetime
+
+from pydantic import BaseModel
+
+from collector_manager.enums import CollectorType
+from core.enums import BatchStatus
+
+
+class GetMetricsBatchesBreakdownInnerResponseDTO(BaseModel):
+    batch_id: int
+    strategy: CollectorType
+    status: BatchStatus
+    created_at: datetime
+    count_url_total: int
+    count_url_pending: int
+    count_url_submitted: int
+    count_url_rejected: int
+    count_url_error: int
+    count_url_validated: int
+
+class GetMetricsBatchesBreakdownResponseDTO(BaseModel):
+    batches: list[GetMetricsBatchesBreakdownInnerResponseDTO]
\ No newline at end of file
diff --git a/core/DTOs/GetMetricsURLsAggregatedResponseDTO.py b/core/DTOs/GetMetricsURLsAggregatedResponseDTO.py
new file mode 100644
index 00000000..66009223
--- /dev/null
+++ b/core/DTOs/GetMetricsURLsAggregatedResponseDTO.py
@@ -0,0 +1,16 @@
+import datetime
+from typing import Optional
+
+from pydantic import BaseModel
+
+
+class GetMetricsURLsAggregatedResponseDTO(BaseModel):
+    count_urls_total: int
+    count_urls_pending: int
+    count_urls_submitted: int
+    count_urls_rejected: int
+    count_urls_validated: int
+    count_urls_errors: int
+    # Optional: both are None when no URLs are pending.
+    oldest_pending_url_created_at: Optional[datetime.datetime]
+    oldest_pending_url_id: Optional[int]
\ No newline at end of file
diff --git a/core/DTOs/GetMetricsURLsBreakdownPendingResponseDTO.py b/core/DTOs/GetMetricsURLsBreakdownPendingResponseDTO.py
new file mode 100644
index 00000000..22235e45
--- /dev/null
+++ b/core/DTOs/GetMetricsURLsBreakdownPendingResponseDTO.py
@@ -0,0 +1,12 @@
+from pydantic import BaseModel
+from datetime import datetime
+
+class GetMetricsURLsBreakdownPendingResponseInnerDTO(BaseModel):
+    week_created_at: datetime
+    count_pending_total: int
+    count_pending_relevant_user: int
+    count_pending_record_type_user: int
+    count_pending_agency_user: int
+
+class GetMetricsURLsBreakdownPendingResponseDTO(BaseModel):
+    entries: list[GetMetricsURLsBreakdownPendingResponseInnerDTO]
\ No newline at end of file
diff --git a/core/DTOs/GetMetricsURLsBreakdownSubmittedResponseDTO.py b/core/DTOs/GetMetricsURLsBreakdownSubmittedResponseDTO.py
new file mode 100644
index 00000000..d5c1dde5
--- /dev/null
+++ b/core/DTOs/GetMetricsURLsBreakdownSubmittedResponseDTO.py
@@ -0,0 +1,10 @@
+from datetime import date
+
+from pydantic import BaseModel
+
+class GetMetricsURLsBreakdownSubmittedInnerDTO(BaseModel):
+    week_of: date
+    count_submitted: int
+
+class GetMetricsURLsBreakdownSubmittedResponseDTO(BaseModel):
+    entries: list[GetMetricsURLsBreakdownSubmittedInnerDTO]
\ No newline at end of file
diff --git a/core/DTOs/task_data_objects/SubmitApprovedURLTDO.py b/core/DTOs/task_data_objects/SubmitApprovedURLTDO.py
index c5b002d0..be26d3a8 100644
--- a/core/DTOs/task_data_objects/SubmitApprovedURLTDO.py
+++ b/core/DTOs/task_data_objects/SubmitApprovedURLTDO.py
@@ -3,7 +3,7 @@
 from pydantic import BaseModel
 
 from core.enums import RecordType
-
+from datetime import datetime
 
 class SubmitApprovedURLTDO(BaseModel):
     url_id: int
@@ -22,4 +22,5 @@ class SubmitApprovedURLTDO(BaseModel):
 class SubmittedURLInfo(BaseModel):
     url_id: int
     data_source_id: Optional[int]
-    request_error: Optional[str]
\ No newline at end of file
+    request_error: Optional[str]
+    submitted_at: Optional[datetime] = 
None \ No newline at end of file diff --git a/core/ScheduledTaskManager.py b/core/ScheduledTaskManager.py index 0a407d9e..e0b87247 100644 --- a/core/ScheduledTaskManager.py +++ b/core/ScheduledTaskManager.py @@ -18,6 +18,7 @@ def __init__(self, async_core: AsyncCore): # Jobs self.run_cycles_job = None self.delete_logs_job = None + self.populate_backlog_snapshot_job = None def add_scheduled_tasks(self): self.run_cycles_job = self.scheduler.add_job( @@ -35,6 +36,13 @@ def add_scheduled_tasks(self): start_date=datetime.now() + timedelta(minutes=10) ) ) + self.populate_backlog_snapshot_job = self.scheduler.add_job( + self.async_core.adb_client.populate_backlog_snapshot, + trigger=IntervalTrigger( + days=1, + start_date=datetime.now() + timedelta(minutes=20) + ) + ) def shutdown(self): if self.scheduler.running: diff --git a/pyproject.toml b/pyproject.toml index de0abfcd..5d2269c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "ckanapi~=4.8", "datasets~=2.19.1", "docker~=7.1.0", + "environs>=14.1.1", "fastapi[standard]~=0.115.6", "from-root~=1.3.0", "google-api-python-client>=2.156.0", @@ -43,6 +44,7 @@ dependencies = [ [dependency-groups] dev = [ "docker>=7.1.0", + "pendulum>=3.1.0", "pytest>=7.2.2", "pytest-asyncio~=0.25.2", "pytest-mock==3.12.0", diff --git a/tests/conftest.py b/tests/conftest.py index d7b1bce7..c8f4bd64 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,13 +62,24 @@ def setup_and_teardown(): try: runner.upgrade("head") except Exception as e: + print("Exception while upgrading: ", e) + print("Resetting schema") runner.reset_schema() runner.stamp("base") runner.upgrade("head") - live_connection.close() - engine.dispose() + yield + try: + runner.downgrade("base") + except Exception as e: + print("Exception while downgrading: ", e) + print("Resetting schema") + runner.reset_schema() + runner.stamp("base") + finally: + live_connection.close() + engine.dispose() @pytest.fixture def wipe_database(): diff --git a/tests/helpers/DBDataCreator.py b/tests/helpers/DBDataCreator.py index 695a3c7a..71338d84 100644 --- a/tests/helpers/DBDataCreator.py +++ b/tests/helpers/DBDataCreator.py @@ -1,4 +1,5 @@ import asyncio +from datetime import datetime from random import randint from typing import List, Optional @@ -11,15 +12,28 @@ from collector_db.DTOs.URLErrorInfos import URLErrorPydanticInfo from collector_db.DTOs.URLHTMLContentInfo import URLHTMLContentInfo, HTMLContentType from collector_db.DTOs.URLInfo import URLInfo +from collector_db.DTOs.URLMapping import URLMapping from collector_db.DatabaseClient import DatabaseClient from collector_db.enums import TaskType from collector_manager.enums import CollectorType, URLStatus +from core.DTOs.FinalReviewApprovalInfo import FinalReviewApprovalInfo from core.DTOs.URLAgencySuggestionInfo import URLAgencySuggestionInfo +from core.DTOs.task_data_objects.SubmitApprovedURLTDO import SubmittedURLInfo from core.DTOs.task_data_objects.URLMiscellaneousMetadataTDO import URLMiscellaneousMetadataTDO from core.enums import BatchStatus, SuggestionType, RecordType +from tests.helpers.test_batch_creation_parameters import TestBatchCreationParameters, AnnotationInfo from tests.helpers.simple_test_data_functions import generate_test_urls +class URLCreationInfo(BaseModel): + url_mappings: list[URLMapping] + outcome: URLStatus + annotation_info: AnnotationInfo + +class BatchURLCreationInfoV2(BaseModel): + batch_id: int + url_creation_infos: dict[URLStatus, URLCreationInfo] + class BatchURLCreationInfo(BaseModel): 
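+    # v1 payload with a flat url_ids list; batch_v2() returns the richer
+    # BatchURLCreationInfoV2 above.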
batch_id: int
     url_ids: list[int]
 
@@ -37,9 +51,10 @@ def __init__(self, db_client: Optional[DatabaseClient] = None):
         self.adb_client: AsyncDatabaseClient = AsyncDatabaseClient()
 
     def batch(
-        self,
-        strategy: CollectorType = CollectorType.EXAMPLE,
-        batch_status: BatchStatus = BatchStatus.IN_PROCESS
+        self,
+        strategy: CollectorType = CollectorType.EXAMPLE,
+        batch_status: BatchStatus = BatchStatus.IN_PROCESS,
+        created_at: Optional[datetime] = None
     ) -> int:
         return self.db_client.insert_batch(
             BatchInfo(
@@ -47,7 +62,8 @@ def batch(
                 status=batch_status,
                 total_url_count=1,
                 parameters={"test_key": "test_value"},
-                user_id=1
+                user_id=1,
+                date_generated=created_at
             )
         )
 
@@ -57,6 +73,49 @@ async def task(self, url_ids: Optional[list[int]] = None) -> int:
         await self.adb_client.link_urls_to_task(task_id=task_id, url_ids=url_ids)
         return task_id
 
+    async def batch_v2(
+        self,
+        parameters: TestBatchCreationParameters
+    ) -> BatchURLCreationInfoV2:
+        # Create a batch, then URLs (and their annotations) per the parameters.
+        batch_id = self.batch(
+            strategy=parameters.strategy,
+            batch_status=parameters.outcome,
+            created_at=parameters.created_at
+        )
+        if parameters.outcome in (BatchStatus.ERROR, BatchStatus.ABORTED):
+            return BatchURLCreationInfoV2(
+                batch_id=batch_id,
+                url_creation_infos={}
+            )
+
+        d: dict[URLStatus, URLCreationInfo] = {}
+        for url_parameters in parameters.urls:
+            iui: InsertURLsInfo = self.urls(
+                batch_id=batch_id,
+                url_count=url_parameters.count,
+                outcome=url_parameters.status,
+                created_at=parameters.created_at
+            )
+            url_ids = [mapping.url_id for mapping in iui.url_mappings]
+            if url_parameters.with_html_content:
+                await self.html_data(url_ids)
+            if url_parameters.annotation_info.has_annotations():
+                for url_id in url_ids:
+                    await self.annotate(
+                        url_id=url_id,
+                        annotation_info=url_parameters.annotation_info
+                    )
+
+            d[url_parameters.status] = URLCreationInfo(
+                url_mappings=iui.url_mappings,
+                outcome=url_parameters.status,
+                annotation_info=url_parameters.annotation_info
+            )
+        return BatchURLCreationInfoV2(
+            batch_id=batch_id,
+            url_creation_infos=d
+        )
+
     async def batch_and_urls(
         self,
         strategy: CollectorType = CollectorType.EXAMPLE,
@@ -113,6 +172,41 @@ async def auto_relevant_suggestions(self, url_id: int, relevant: bool = True):
             relevant=relevant
         )
 
+    async def annotate(self, url_id: int, annotation_info: AnnotationInfo):
+        info = annotation_info
+        if info.user_relevant is not None:
+            await self.user_relevant_suggestion(url_id=url_id, relevant=info.user_relevant)
+        if info.auto_relevant is not None:
+            await self.auto_relevant_suggestions(url_id=url_id, relevant=info.auto_relevant)
+        if info.user_record_type is not None:
+            await self.user_record_type_suggestion(url_id=url_id, record_type=info.user_record_type)
+        if info.auto_record_type is not None:
+            await self.auto_record_type_suggestions(url_id=url_id, record_type=info.auto_record_type)
+        if info.user_agency is not None:
+            await self.agency_user_suggestions(url_id=url_id, agency_id=info.user_agency)
+        if info.auto_agency is not None:
+            await self.agency_auto_suggestions(url_id=url_id, count=1, suggestion_type=SuggestionType.AUTO_SUGGESTION)
+        if info.confirmed_agency is not None:
+            await self.agency_auto_suggestions(url_id=url_id, count=1, suggestion_type=SuggestionType.CONFIRMED)
+        if info.final_review_approved is not None:
+            if info.final_review_approved:
+                final_review_approval_info = FinalReviewApprovalInfo(
+                    url_id=url_id,
+                    record_type=annotation_info.user_record_type,
+                    agency_ids=[annotation_info.user_agency] if annotation_info.user_agency is not None else None,
+                    description="Test 
Description", + ) + await self.adb_client.approve_url( + approval_info=final_review_approval_info, + user_id=1 + ) + else: + await self.adb_client.reject_url( + url_id=url_id, + user_id=1 + ) + + async def user_relevant_suggestion( self, url_id: int, @@ -204,7 +298,8 @@ def urls( batch_id: int, url_count: int, collector_metadata: Optional[dict] = None, - outcome: URLStatus = URLStatus.PENDING + outcome: URLStatus = URLStatus.PENDING, + created_at: Optional[datetime] = None ) -> InsertURLsInfo: raw_urls = generate_test_urls(url_count) url_infos: List[URLInfo] = [] @@ -214,15 +309,32 @@ def urls( url=url, outcome=outcome, name="Test Name" if outcome == URLStatus.VALIDATED else None, - collector_metadata=collector_metadata + collector_metadata=collector_metadata, + created_at=created_at ) ) - return self.db_client.insert_urls( + url_insert_info = self.db_client.insert_urls( url_infos=url_infos, batch_id=batch_id, ) + # If outcome is submitted, also add entry to DataSourceURL + if outcome == URLStatus.SUBMITTED: + submitted_url_infos = [] + for url_id in url_insert_info.url_ids: + submitted_url_info = SubmittedURLInfo( + url_id=url_id, + data_source_id=url_id, # Use same ID for convenience, + request_error=None, + submitted_at=created_at + ) + submitted_url_infos.append(submitted_url_info) + self.db_client.mark_urls_as_submitted(submitted_url_infos) + + + return url_insert_info + async def url_miscellaneous_metadata( self, url_id: int, diff --git a/tests/helpers/complex_test_data_functions.py b/tests/helpers/complex_test_data_functions.py index 6f9ca7c3..bc03020f 100644 --- a/tests/helpers/complex_test_data_functions.py +++ b/tests/helpers/complex_test_data_functions.py @@ -121,3 +121,4 @@ async def add_relevant_suggestion(relevant: bool): url_mapping=url_mapping, user_agency_id=user_agency_id ) + diff --git a/tests/helpers/test_batch_creation_parameters.py b/tests/helpers/test_batch_creation_parameters.py new file mode 100644 index 00000000..cfb4805e --- /dev/null +++ b/tests/helpers/test_batch_creation_parameters.py @@ -0,0 +1,71 @@ +import datetime +from typing import Optional + +from pydantic import BaseModel, model_validator + +from collector_manager.enums import URLStatus, CollectorType +from core.enums import BatchStatus, AnnotationType, RecordType + + +class AnnotationInfo(BaseModel): + user_relevant: Optional[bool] = None + auto_relevant: Optional[bool] = None + user_record_type: Optional[RecordType] = None + auto_record_type: Optional[RecordType] = None + user_agency: Optional[int] = None + auto_agency: Optional[int] = None + confirmed_agency: Optional[int] = None + final_review_approved: Optional[bool] = None + + def has_annotations(self): + return any(value is not None for value in [ + self.user_relevant, + self.auto_relevant, + self.user_record_type, + self.auto_record_type, + self.user_agency, + self.auto_agency, + self.confirmed_agency, + self.final_review_approved + ]) + +class TestURLCreationParameters(BaseModel): + count: int + status: URLStatus = URLStatus.PENDING + with_html_content: bool = False + annotation_info: AnnotationInfo = AnnotationInfo() + + @model_validator(mode='after') + def validate_annotation_info(self): + if self.status == URLStatus.REJECTED: + self.annotation_info.final_review_approved = False + return self + if self.status != URLStatus.VALIDATED: + return self + + # Assume is validated + self.annotation_info.final_review_approved = True + if self.annotation_info.user_record_type is None: + self.annotation_info.user_record_type = RecordType.ARREST_RECORDS 
+ if self.annotation_info.user_agency is None: + self.annotation_info.user_agency = 1 + + + return self + +class TestBatchCreationParameters(BaseModel): + created_at: Optional[datetime.datetime] = None + outcome: BatchStatus = BatchStatus.READY_TO_LABEL + strategy: CollectorType = CollectorType.EXAMPLE + urls: Optional[list[TestURLCreationParameters]] = None + + @model_validator(mode='after') + def validate_urls(self): + if self.outcome != BatchStatus.READY_TO_LABEL: + if self.urls is not None: + raise ValueError('URLs cannot be provided if outcome is not READY_TO_LABEL') + return self + + if self.urls is None: + self.urls = [TestURLCreationParameters(count=1)] + return self \ No newline at end of file diff --git a/tests/test_automated/integration/api/helpers/RequestValidator.py b/tests/test_automated/integration/api/helpers/RequestValidator.py index 91d27729..9207305a 100644 --- a/tests/test_automated/integration/api/helpers/RequestValidator.py +++ b/tests/test_automated/integration/api/helpers/RequestValidator.py @@ -16,6 +16,12 @@ from core.DTOs.GetBatchLogsResponse import GetBatchLogsResponse from core.DTOs.GetBatchStatusResponse import GetBatchStatusResponse from core.DTOs.GetDuplicatesByBatchResponse import GetDuplicatesByBatchResponse +from core.DTOs.GetMetricsBacklogResponse import GetMetricsBacklogResponseDTO +from core.DTOs.GetMetricsBatchesAggregatedResponseDTO import GetMetricsBatchesAggregatedResponseDTO +from core.DTOs.GetMetricsBatchesBreakdownResponseDTO import GetMetricsBatchesBreakdownResponseDTO +from core.DTOs.GetMetricsURLsAggregatedResponseDTO import GetMetricsURLsAggregatedResponseDTO +from core.DTOs.GetMetricsURLsBreakdownPendingResponseDTO import GetMetricsURLsBreakdownPendingResponseDTO +from core.DTOs.GetMetricsURLsBreakdownSubmittedResponseDTO import GetMetricsURLsBreakdownSubmittedResponseDTO from core.DTOs.GetNextRecordTypeAnnotationResponseInfo import GetNextRecordTypeAnnotationResponseOuterInfo from core.DTOs.GetNextRelevanceAnnotationResponseInfo import GetNextRelevanceAnnotationResponseOuterInfo from core.DTOs.GetNextURLForAgencyAnnotationResponse import GetNextURLForAgencyAnnotationResponse, \ @@ -140,6 +146,19 @@ def post_v2( **kwargs ) + def get_v2( + self, + url: str, + params: Optional[dict] = None, + **kwargs + ) -> dict: + return self.open_v2( + method="GET", + url=url, + params=params, + **kwargs + ) + def put( self, @@ -393,4 +412,41 @@ async def search_url(self, url: str) -> SearchURLResponse: url=f"/search/url", params={"url": url} ) - return SearchURLResponse(**data) \ No newline at end of file + return SearchURLResponse(**data) + + async def get_batches_aggregated_metrics(self) -> GetMetricsBatchesAggregatedResponseDTO: + data = self.get_v2( + url="/metrics/batches/aggregated" + ) + return GetMetricsBatchesAggregatedResponseDTO(**data) + + async def get_batches_breakdown_metrics(self, page: int) -> GetMetricsBatchesBreakdownResponseDTO: + data = self.get_v2( + url="/metrics/batches/breakdown", + params={"page": page} + ) + return GetMetricsBatchesBreakdownResponseDTO(**data) + + async def get_urls_breakdown_submitted_metrics(self) -> GetMetricsURLsBreakdownSubmittedResponseDTO: + data = self.get_v2( + url="/metrics/urls/breakdown/submitted" + ) + return GetMetricsURLsBreakdownSubmittedResponseDTO(**data) + + async def get_urls_breakdown_pending_metrics(self) -> GetMetricsURLsBreakdownPendingResponseDTO: + data = self.get_v2( + url="/metrics/urls/breakdown/pending" + ) + return GetMetricsURLsBreakdownPendingResponseDTO(**data) + + async def 
get_backlog_metrics(self) -> GetMetricsBacklogResponseDTO: + data = self.get_v2( + url="/metrics/backlog" + ) + return GetMetricsBacklogResponseDTO(**data) + + async def get_urls_aggregated_metrics(self) -> GetMetricsURLsAggregatedResponseDTO: + data = self.get_v2( + url="/metrics/urls/aggregate", + ) + return GetMetricsURLsAggregatedResponseDTO(**data) \ No newline at end of file diff --git a/tests/test_automated/integration/api/test_metrics.py b/tests/test_automated/integration/api/test_metrics.py new file mode 100644 index 00000000..fc45ad0b --- /dev/null +++ b/tests/test_automated/integration/api/test_metrics.py @@ -0,0 +1,478 @@ +import pendulum +import pytest + +from collector_manager.enums import URLStatus, CollectorType +from core.enums import BatchStatus, RecordType +from tests.helpers.test_batch_creation_parameters import TestBatchCreationParameters, TestURLCreationParameters, \ + AnnotationInfo + + +@pytest.mark.asyncio +async def test_get_batches_aggregated_metrics(api_test_helper): + ath = api_test_helper + # Create successful batches with URLs of different statuses + all_params = [] + for i in range(3): + params = TestBatchCreationParameters( + strategy=CollectorType.MANUAL, + urls=[ + TestURLCreationParameters( + count=1, + status=URLStatus.PENDING + ), + TestURLCreationParameters( + count=2, + status=URLStatus.SUBMITTED + ), + TestURLCreationParameters( + count=3, + status=URLStatus.REJECTED + ), + TestURLCreationParameters( + count=4, + status=URLStatus.ERROR + ), + TestURLCreationParameters( + count=5, + status=URLStatus.VALIDATED + ) + ] + ) + all_params.append(params) + + + # Create failed batches + for i in range(2): + params = TestBatchCreationParameters( + outcome=BatchStatus.ERROR + ) + all_params.append(params) + + for params in all_params: + await ath.db_data_creator.batch_v2(params) + + dto = await ath.request_validator.get_batches_aggregated_metrics() + assert dto.total_batches == 5 + inner_dto_example = dto.by_strategy[CollectorType.EXAMPLE] + assert inner_dto_example.count_urls == 0 + assert inner_dto_example.count_successful_batches == 0 + assert inner_dto_example.count_failed_batches == 2 + assert inner_dto_example.count_urls_pending == 0 + assert inner_dto_example.count_urls_submitted == 0 + assert inner_dto_example.count_urls_rejected == 0 + assert inner_dto_example.count_urls_errors == 0 + assert inner_dto_example.count_urls_validated == 0 + + inner_dto_manual = dto.by_strategy[CollectorType.MANUAL] + assert inner_dto_manual.count_urls == 45 + assert inner_dto_manual.count_successful_batches == 3 + assert inner_dto_manual.count_failed_batches == 0 + assert inner_dto_manual.count_urls_pending == 3 + assert inner_dto_manual.count_urls_submitted == 6 + assert inner_dto_manual.count_urls_rejected == 9 + assert inner_dto_manual.count_urls_errors == 12 + assert inner_dto_manual.count_urls_validated == 15 + + +@pytest.mark.asyncio +async def test_get_batches_breakdown_metrics(api_test_helper): + # Create a different batch for each week, with different URLs + today = pendulum.today() + ath = api_test_helper + + batch_1_params = TestBatchCreationParameters( + strategy=CollectorType.MANUAL, + urls=[ + TestURLCreationParameters( + count=1, + status=URLStatus.PENDING + ), + TestURLCreationParameters( + count=2, + status=URLStatus.SUBMITTED + ), + ] + ) + batch_1 = await ath.db_data_creator.batch_v2(batch_1_params) + batch_2_params = TestBatchCreationParameters( + strategy=CollectorType.EXAMPLE, + outcome=BatchStatus.ERROR, + created_at=today.subtract(weeks=1), + ) + 
+    batch_2 = await ath.db_data_creator.batch_v2(batch_2_params)
+    batch_3_params = TestBatchCreationParameters(
+        strategy=CollectorType.AUTO_GOOGLER,
+        created_at=today.subtract(weeks=2),
+        urls=[
+            TestURLCreationParameters(
+                count=3,
+                status=URLStatus.REJECTED
+            ),
+            TestURLCreationParameters(
+                count=4,
+                status=URLStatus.ERROR
+            ),
+            TestURLCreationParameters(
+                count=5,
+                status=URLStatus.VALIDATED
+            ),
+        ]
+    )
+    batch_3 = await ath.db_data_creator.batch_v2(batch_3_params)
+
+    dto_1 = await ath.request_validator.get_batches_breakdown_metrics(
+        page=1
+    )
+    assert len(dto_1.batches) == 3
+    dto_batch_1 = dto_1.batches[2]
+    assert dto_batch_1.batch_id == batch_1.batch_id
+    assert dto_batch_1.strategy == CollectorType.MANUAL
+    assert dto_batch_1.status == BatchStatus.READY_TO_LABEL
+    assert pendulum.instance(dto_batch_1.created_at) > today
+    assert dto_batch_1.count_url_total == 3
+    assert dto_batch_1.count_url_pending == 1
+    assert dto_batch_1.count_url_submitted == 2
+    assert dto_batch_1.count_url_rejected == 0
+    assert dto_batch_1.count_url_error == 0
+    assert dto_batch_1.count_url_validated == 0
+
+    dto_batch_2 = dto_1.batches[1]
+    assert dto_batch_2.batch_id == batch_2.batch_id
+    assert dto_batch_2.status == BatchStatus.ERROR
+    assert dto_batch_2.strategy == CollectorType.EXAMPLE
+    assert pendulum.instance(dto_batch_2.created_at) == today.subtract(weeks=1)
+    assert dto_batch_2.count_url_total == 0
+    assert dto_batch_2.count_url_submitted == 0
+    assert dto_batch_2.count_url_pending == 0
+    assert dto_batch_2.count_url_rejected == 0
+    assert dto_batch_2.count_url_error == 0
+    assert dto_batch_2.count_url_validated == 0
+
+    dto_batch_3 = dto_1.batches[0]
+    assert dto_batch_3.batch_id == batch_3.batch_id
+    assert dto_batch_3.status == BatchStatus.READY_TO_LABEL
+    assert dto_batch_3.strategy == CollectorType.AUTO_GOOGLER
+    assert pendulum.instance(dto_batch_3.created_at) == today.subtract(weeks=2)
+    assert dto_batch_3.count_url_total == 12
+    assert dto_batch_3.count_url_pending == 0
+    assert dto_batch_3.count_url_submitted == 0
+    assert dto_batch_3.count_url_rejected == 3
+    assert dto_batch_3.count_url_error == 4
+    assert dto_batch_3.count_url_validated == 5
+
+    dto_2 = await ath.request_validator.get_batches_breakdown_metrics(
+        page=2
+    )
+    assert len(dto_2.batches) == 0
+
+@pytest.mark.asyncio
+async def test_get_urls_breakdown_submitted_metrics(api_test_helper):
+    # Create URLs with submitted status, broken down in different amounts by different weeks,
+    # and ensure the URLs are grouped into the correct weekly entries
+    today = pendulum.today()
+    ath = api_test_helper
+
+    batch_1_params = TestBatchCreationParameters(
+        strategy=CollectorType.MANUAL,
+        urls=[
+            TestURLCreationParameters(
+                count=1,
+                status=URLStatus.PENDING
+            ),
+            TestURLCreationParameters(
+                count=2,
+                status=URLStatus.SUBMITTED
+            ),
+        ]
+    )
+    batch_1 = await ath.db_data_creator.batch_v2(batch_1_params)
+    batch_2_params = TestBatchCreationParameters(
+        strategy=CollectorType.EXAMPLE,
+        urls=[
+            TestURLCreationParameters(
+                count=3,
+                status=URLStatus.SUBMITTED
+            )
+        ],
+        created_at=today.subtract(weeks=1),
+    )
+    batch_2 = await ath.db_data_creator.batch_v2(batch_2_params)
+    batch_3_params = TestBatchCreationParameters(
+        strategy=CollectorType.AUTO_GOOGLER,
+        created_at=today.subtract(weeks=1),
+        urls=[
+            TestURLCreationParameters(
+                count=3,
+                status=URLStatus.SUBMITTED
+            ),
+            TestURLCreationParameters(
+                count=4,
+                status=URLStatus.ERROR
+            ),
+            TestURLCreationParameters(
+                count=5,
+                status=URLStatus.VALIDATED
+            ),
+        ]
+    )
+    batch_3 = await ath.db_data_creator.batch_v2(batch_3_params)
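+    # batch_2 and batch_3 were both created one week ago, so that week's
+    # entry counts 3 + 3 = 6 submitted URLs; the current week counts 2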
+
+    dto = await ath.request_validator.get_urls_breakdown_submitted_metrics()
+    assert len(dto.entries) == 2
+
+    entry_1 = dto.entries[0]
+    assert entry_1.count_submitted == 6
+
+    entry_2 = dto.entries[1]
+    assert entry_2.count_submitted == 2
+
+
+@pytest.mark.asyncio
+async def test_get_urls_breakdown_pending_metrics(api_test_helper):
+    # Build URLs, broken down into two separate weeks,
+    # with each week having a different number of pending URLs
+    # and a different mix of annotations per URL
+
+
+    today = pendulum.today()
+    ath = api_test_helper
+
+    agency_id = await ath.db_data_creator.agency()
+    # Additionally, add some URLs that are submitted,
+    # validated, or errored, and ensure they are not counted
+    batch_1_params = TestBatchCreationParameters(
+        strategy=CollectorType.MANUAL,
+        urls=[
+            TestURLCreationParameters(
+                count=1,
+                status=URLStatus.PENDING,
+                annotation_info=AnnotationInfo(
+                    user_relevant=False
+                )
+            ),
+            TestURLCreationParameters(
+                count=2,
+                status=URLStatus.SUBMITTED
+            ),
+        ]
+    )
+    batch_1 = await ath.db_data_creator.batch_v2(batch_1_params)
+    batch_2_params = TestBatchCreationParameters(
+        strategy=CollectorType.EXAMPLE,
+        urls=[
+            TestURLCreationParameters(
+                count=3,
+                status=URLStatus.PENDING,
+                annotation_info=AnnotationInfo(
+                    user_relevant=True,
+                    user_record_type=RecordType.CALLS_FOR_SERVICE
+                )
+            )
+        ],
+        created_at=today.subtract(weeks=1),
+    )
+    batch_2 = await ath.db_data_creator.batch_v2(batch_2_params)
+    batch_3_params = TestBatchCreationParameters(
+        strategy=CollectorType.AUTO_GOOGLER,
+        created_at=today.subtract(weeks=1),
+        urls=[
+            TestURLCreationParameters(
+                count=3,
+                status=URLStatus.SUBMITTED
+            ),
+            TestURLCreationParameters(
+                count=4,
+                status=URLStatus.ERROR
+            ),
+            TestURLCreationParameters(
+                count=5,
+                status=URLStatus.PENDING,
+                annotation_info=AnnotationInfo(
+                    user_relevant=True,
+                    user_record_type=RecordType.INCARCERATION_RECORDS,
+                    user_agency=agency_id
+                )
+            ),
+        ]
+    )
+    batch_3 = await ath.db_data_creator.batch_v2(batch_3_params)
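+    # The week-old entry has 3 + 5 = 8 pending URLs, all with user relevance
+    # and record type annotations, but only batch_3's 5 carry a user agency;
+    # the current week has a single pending URL annotated as not relevant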
+
+    dto = await ath.request_validator.get_urls_breakdown_pending_metrics()
+    assert len(dto.entries) == 2
+
+    entry_1 = dto.entries[0]
+    assert entry_1.count_pending_total == 8
+    assert entry_1.count_pending_relevant_user == 8
+    assert entry_1.count_pending_record_type_user == 8
+    assert entry_1.count_pending_agency_user == 5
+
+    entry_2 = dto.entries[1]
+    assert entry_2.count_pending_total == 1
+    assert entry_2.count_pending_relevant_user == 1
+    assert entry_2.count_pending_record_type_user == 0
+    assert entry_2.count_pending_agency_user == 0
+
+@pytest.mark.asyncio
+async def test_get_urls_aggregate_metrics(api_test_helper):
+    ath = api_test_helper
+    today = pendulum.today()
+
+    batch_0_params = TestBatchCreationParameters(
+        strategy=CollectorType.MANUAL,
+        created_at=today.subtract(days=1),
+        urls=[
+            TestURLCreationParameters(
+                count=1,
+                status=URLStatus.PENDING,
+            ),
+        ]
+    )
+    batch_0 = await ath.db_data_creator.batch_v2(batch_0_params)
+    oldest_url_id = batch_0.url_creation_infos[URLStatus.PENDING].url_mappings[0].url_id
+
+
+    batch_1_params = TestBatchCreationParameters(
+        strategy=CollectorType.MANUAL,
+        urls=[
+            TestURLCreationParameters(
+                count=1,
+                status=URLStatus.PENDING,
+            ),
+            TestURLCreationParameters(
+                count=2,
+                status=URLStatus.SUBMITTED
+            ),
+        ]
+    )
+    batch_1 = await ath.db_data_creator.batch_v2(batch_1_params)
+
+    batch_2_params = TestBatchCreationParameters(
+        strategy=CollectorType.AUTO_GOOGLER,
+        urls=[
+            TestURLCreationParameters(
+                count=4,
+                status=URLStatus.PENDING,
+            ),
+            TestURLCreationParameters(
+                count=2,
+                status=URLStatus.ERROR
+            ),
+            TestURLCreationParameters(
+                count=1,
+                status=URLStatus.VALIDATED
+            ),
+            TestURLCreationParameters(
+                count=5,
+                status=URLStatus.REJECTED
+            ),
+        ]
+    )
+    batch_2 = await ath.db_data_creator.batch_v2(batch_2_params)
+
+    dto = await ath.request_validator.get_urls_aggregated_metrics()
+
+    # Totals across all three batches: 6 pending, 2 submitted,
+    # 5 rejected, 2 errored, 1 validated = 16 URLs overall
+    assert dto.oldest_pending_url_id == oldest_url_id
+    assert dto.oldest_pending_url_created_at == today.subtract(days=1).in_timezone('UTC').naive()
+    assert dto.count_urls_pending == 6
+    assert dto.count_urls_rejected == 5
+    assert dto.count_urls_errors == 2
+    assert dto.count_urls_validated == 1
+    assert dto.count_urls_submitted == 2
+    assert dto.count_urls_total == 16
+
+
+
+@pytest.mark.asyncio
+async def test_get_backlog_metrics(api_test_helper):
+    today = pendulum.today()
+
+    ath = api_test_helper
+    adb_client = ath.adb_client()
+
+
+    # Populate the backlog table and test that backlog metrics are returned on a weekly basis
+    # Ensure that multiple days in each week are added to the backlog table, with different values
+
+
+    batch_1_params = TestBatchCreationParameters(
+        strategy=CollectorType.MANUAL,
+        urls=[
+            TestURLCreationParameters(
+                count=1,
+                status=URLStatus.PENDING,
+                annotation_info=AnnotationInfo(
+                    user_relevant=False
+                )
+            ),
+            TestURLCreationParameters(
+                count=2,
+                status=URLStatus.SUBMITTED
+            ),
+        ]
+    )
+    batch_1 = await ath.db_data_creator.batch_v2(batch_1_params)
+
+    await adb_client.populate_backlog_snapshot(
+        dt=today.subtract(weeks=3).naive()
+    )
+
+    await adb_client.populate_backlog_snapshot(
+        dt=today.subtract(weeks=2, days=3).naive()
+    )
+
+    batch_2_params = TestBatchCreationParameters(
+        strategy=CollectorType.AUTO_GOOGLER,
+        urls=[
+            TestURLCreationParameters(
+                count=4,
+                status=URLStatus.PENDING,
+                annotation_info=AnnotationInfo(
+                    user_relevant=False
+                )
+            ),
+            TestURLCreationParameters(
+                count=2,
+                status=URLStatus.ERROR
+            ),
+        ]
+    )
+    batch_2 = await ath.db_data_creator.batch_v2(batch_2_params)
+
+    await adb_client.populate_backlog_snapshot(
+        dt=today.subtract(weeks=2).naive()
+    )
+
+    await adb_client.populate_backlog_snapshot(
+        dt=today.subtract(weeks=1, days=4).naive()
+    )
+
+    batch_3_params = TestBatchCreationParameters(
+        strategy=CollectorType.AUTO_GOOGLER,
+        urls=[
+            TestURLCreationParameters(
+                count=7,
+                status=URLStatus.PENDING,
+                annotation_info=AnnotationInfo(
+                    user_relevant=False
+                )
+            ),
+            TestURLCreationParameters(
+                count=5,
+                status=URLStatus.VALIDATED
+            ),
+        ]
+    )
+    batch_3 = await ath.db_data_creator.batch_v2(batch_3_params)
+
+    await adb_client.populate_backlog_snapshot(
+        dt=today.subtract(weeks=1).naive()
+    )
+
+    dto = await ath.request_validator.get_backlog_metrics()
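+    # Expected snapshot values: 1 pending after batch_1,
+    # 1 + 4 = 5 after batch_2, and 5 + 7 = 12 after batch_3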
+
+    assert len(dto.entries) == 3
+
+    # Test that the count closest to the beginning of the week is returned for each week
+    assert dto.entries[0].count_pending_total == 1
+    assert dto.entries[1].count_pending_total == 5
+    assert dto.entries[2].count_pending_total == 12
\ No newline at end of file
diff --git a/tests/test_automated/integration/tasks/test_submit_approved_url_task.py b/tests/test_automated/integration/tasks/test_submit_approved_url_task.py
index 2d3aa192..32dc765c 100644
--- a/tests/test_automated/integration/tasks/test_submit_approved_url_task.py
+++ b/tests/test_automated/integration/tasks/test_submit_approved_url_task.py
@@ -4,7 +4,7 @@
 import pytest
 
 from collector_db.enums import TaskType
-from collector_db.models import URL, URLErrorInfo
+from collector_db.models import URL, URLErrorInfo, URLDataSource
 from collector_manager.enums import URLStatus
 from core.DTOs.FinalReviewApprovalInfo import FinalReviewApprovalInfo
 from core.DTOs.TaskOperatorRunInfo import TaskOperatorOutcome
@@ -152,10 +152,18 @@ async def test_submit_approved_url_task(
     assert url_2.outcome == URLStatus.SUBMITTED.value
     assert url_3.outcome == URLStatus.ERROR.value
 
-    # Check URLs now have data source ids
-    assert url_1.data_source_id == 21
-    assert url_2.data_source_id == 34
-    assert url_3.data_source_id is None
+    # Get URL Data Source Links
+    url_data_sources = await db_data_creator.adb_client.get_all(URLDataSource)
+    assert len(url_data_sources) == 2
+
+    url_data_source_1 = url_data_sources[0]
+    url_data_source_2 = url_data_sources[1]
+
+    assert url_data_source_1.url_id == url_1.id
+    assert url_data_source_1.data_source_id == 21
+
+    assert url_data_source_2.url_id == url_2.id
+    assert url_data_source_2.data_source_id == 34
 
     # Check that errored URL has entry in url_error_info
     url_errors = await db_data_creator.adb_client.get_all(URLErrorInfo)
diff --git a/uv.lock b/uv.lock
index f2ea60ae..bb269479 100644
--- a/uv.lock
+++ b/uv.lock
@@ -348,6 +348,7 @@ dependencies = [
     { name = "ckanapi" },
     { name = "datasets" },
     { name = "docker" },
+    { name = "environs" },
     { name = "fastapi", extra = ["standard"] },
     { name = "from-root" },
     { name = "google-api-python-client" },
@@ -379,6 +380,7 @@
 [package.dev-dependencies]
 dev = [
     { name = "docker" },
+    { name = "pendulum" },
     { name = "pytest" },
     { name = "pytest-asyncio" },
     { name = "pytest-mock" },
@@ -396,6 +398,7 @@ requires-dist = [
     { name = "ckanapi", specifier = "~=4.8" },
     { name = "datasets", specifier = "~=2.19.1" },
     { name = "docker", specifier = "~=7.1.0" },
+    { name = "environs", specifier = ">=14.1.1" },
     { name = "fastapi", extras = ["standard"], specifier = "~=0.115.6" },
     { name = "from-root", specifier = "~=1.3.0" },
     { name = "google-api-python-client", specifier = ">=2.156.0" },
@@ -427,6 +430,7 @@
 [package.metadata.requires-dev]
 dev = [
     { name = "docker", specifier = ">=7.1.0" },
+    { name = "pendulum", specifier = ">=3.1.0" },
     { name = "pytest", specifier = ">=7.2.2" },
     { name = "pytest-asyncio", specifier = "~=0.25.2" },
     { name = "pytest-mock", specifier = "==3.12.0" },
@@ -519,6 +523,19 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/d7/ee/bf0adb559ad3c786f12bcbc9296b3f5675f529199bef03e2df281fa1fadb/email_validator-2.2.0-py3-none-any.whl", hash = "sha256:561977c2d73ce3611850a06fa56b414621e0c8faa9d66f2611407d87465da631", size = 33521, upload_time = "2024-06-20T11:30:28.248Z" },
 ]
 
+[[package]]
+name = "environs"
+version = "14.1.1"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "marshmallow" },
+    { name = "python-dotenv" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/31/d3/e82bdbb8cc332e751f67a3f668c5d134d57f983497d9f3a59a375b6e8fd8/environs-14.1.1.tar.gz", hash = "sha256:03db7ee2d50ec697b68814cd175a3a05a7c7954804e4e419ca8b570dc5a835cf", size = 32050, upload_time = "2025-02-10T20:24:26.437Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/f4/1c/ab9752f02d32d981d647c05822be9ff93809be8953dacea2da2bec9a9de9/environs-14.1.1-py3-none-any.whl", hash = "sha256:45bc56f1d53bbc59d8dd69bba97377dd88ec28b8229d81cedbd455b21789445b", size = 15566, upload_time = "2025-02-10T20:24:22.116Z" },
+]
+
 [[package]]
 name = "fastapi"
 version = "0.115.12"
@@ -1441,6 +1458,49 @@ wheels = [
 { url = 
"https://files.pythonhosted.org/packages/ab/5f/b38085618b950b79d2d9164a711c52b10aefc0ae6833b96f626b7021b2ed/pandas-2.2.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:ad5b65698ab28ed8d7f18790a0dc58005c7629f227be9ecc1072aa74c0c1d43a", size = 13098436, upload_time = "2024-09-20T13:09:48.112Z" }, ] +[[package]] +name = "pendulum" +version = "3.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, + { name = "tzdata" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/23/7c/009c12b86c7cc6c403aec80f8a4308598dfc5995e5c523a5491faaa3952e/pendulum-3.1.0.tar.gz", hash = "sha256:66f96303560f41d097bee7d2dc98ffca716fbb3a832c4b3062034c2d45865015", size = 85930, upload_time = "2025-04-19T14:30:01.675Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5e/6e/d28d3c22e6708b819a94c05bd05a3dfaed5c685379e8b6dc4b34b473b942/pendulum-3.1.0-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:61a03d14f8c64d13b2f7d5859e4b4053c4a7d3b02339f6c71f3e4606bfd67423", size = 338596, upload_time = "2025-04-19T14:01:11.306Z" }, + { url = "https://files.pythonhosted.org/packages/e1/e6/43324d58021d463c2eeb6146b169d2c935f2f840f9e45ac2d500453d954c/pendulum-3.1.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e674ed2d158afa5c361e60f1f67872dc55b492a10cacdaa7fcd7b7da5f158f24", size = 325854, upload_time = "2025-04-19T14:01:13.156Z" }, + { url = "https://files.pythonhosted.org/packages/b0/a7/d2ae79b960bfdea94dab67e2f118697b08bc9e98eb6bd8d32c4d99240da3/pendulum-3.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7c75377eb16e58bbe7e03ea89eeea49be6fc5de0934a4aef0e263f8b4fa71bc2", size = 344334, upload_time = "2025-04-19T14:01:15.151Z" }, + { url = "https://files.pythonhosted.org/packages/96/94/941f071212e23c29aae7def891fb636930c648386e059ce09ea0dcd43933/pendulum-3.1.0-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:656b8b0ce070f0f2e5e2668247d3c783c55336534aa1f13bd0969535878955e1", size = 382259, upload_time = "2025-04-19T14:01:16.924Z" }, + { url = "https://files.pythonhosted.org/packages/51/ad/a78a701656aec00d16fee636704445c23ca11617a0bfe7c3848d1caa5157/pendulum-3.1.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48962903e6c1afe1f13548cb6252666056086c107d59e3d64795c58c9298bc2e", size = 436361, upload_time = "2025-04-19T14:01:18.796Z" }, + { url = "https://files.pythonhosted.org/packages/da/93/83f59ccbf4435c29dca8c63a6560fcbe4783079a468a5f91d9f886fd21f0/pendulum-3.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d364ec3f8e65010fefd4b0aaf7be5eb97e5df761b107a06f5e743b7c3f52c311", size = 353653, upload_time = "2025-04-19T14:01:20.159Z" }, + { url = "https://files.pythonhosted.org/packages/6f/0f/42d6644ec6339b41066f594e52d286162aecd2e9735aaf994d7e00c9e09d/pendulum-3.1.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:dd52caffc2afb86612ec43bbeb226f204ea12ebff9f3d12f900a7d3097210fcc", size = 524567, upload_time = "2025-04-19T14:01:21.457Z" }, + { url = "https://files.pythonhosted.org/packages/de/45/d84d909202755ab9d3379e5481fdf70f53344ebefbd68d6f5803ddde98a6/pendulum-3.1.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d439fccaa35c91f686bd59d30604dab01e8b5c1d0dd66e81648c432fd3f8a539", size = 525571, upload_time = "2025-04-19T14:01:23.329Z" }, + { url = "https://files.pythonhosted.org/packages/0d/e0/4de160773ce3c2f7843c310db19dd919a0cd02cc1c0384866f63b18a6251/pendulum-3.1.0-cp311-cp311-win_amd64.whl", hash = 
"sha256:43288773a86d9c5c0ddb645f88f615ff6bd12fd1410b34323662beccb18f3b49", size = 260259, upload_time = "2025-04-19T14:01:24.689Z" }, + { url = "https://files.pythonhosted.org/packages/c1/7f/ffa278f78112c6c6e5130a702042f52aab5c649ae2edf814df07810bbba5/pendulum-3.1.0-cp311-cp311-win_arm64.whl", hash = "sha256:569ea5072ae0f11d625e03b36d865f8037b76e838a3b621f6967314193896a11", size = 253899, upload_time = "2025-04-19T14:01:26.442Z" }, + { url = "https://files.pythonhosted.org/packages/7a/d7/b1bfe15a742f2c2713acb1fdc7dc3594ff46ef9418ac6a96fcb12a6ba60b/pendulum-3.1.0-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:4dfd53e7583ccae138be86d6c0a0b324c7547df2afcec1876943c4d481cf9608", size = 336209, upload_time = "2025-04-19T14:01:27.815Z" }, + { url = "https://files.pythonhosted.org/packages/eb/87/0392da0c603c828b926d9f7097fbdddaafc01388cb8a00888635d04758c3/pendulum-3.1.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6a6e06a28f3a7d696546347805536f6f38be458cb79de4f80754430696bea9e6", size = 323130, upload_time = "2025-04-19T14:01:29.336Z" }, + { url = "https://files.pythonhosted.org/packages/c0/61/95f1eec25796be6dddf71440ee16ec1fd0c573fc61a73bd1ef6daacd529a/pendulum-3.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7e68d6a51880708084afd8958af42dc8c5e819a70a6c6ae903b1c4bfc61e0f25", size = 341509, upload_time = "2025-04-19T14:01:31.1Z" }, + { url = "https://files.pythonhosted.org/packages/b5/7b/eb0f5e6aa87d5e1b467a1611009dbdc92f0f72425ebf07669bfadd8885a6/pendulum-3.1.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9e3f1e5da39a7ea7119efda1dd96b529748c1566f8a983412d0908455d606942", size = 378674, upload_time = "2025-04-19T14:01:32.974Z" }, + { url = "https://files.pythonhosted.org/packages/29/68/5a4c1b5de3e54e16cab21d2ec88f9cd3f18599e96cc90a441c0b0ab6b03f/pendulum-3.1.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:e9af1e5eeddb4ebbe1b1c9afb9fd8077d73416ade42dd61264b3f3b87742e0bb", size = 436133, upload_time = "2025-04-19T14:01:34.349Z" }, + { url = "https://files.pythonhosted.org/packages/87/5d/f7a1d693e5c0f789185117d5c1d5bee104f5b0d9fbf061d715fb61c840a8/pendulum-3.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:20f74aa8029a42e327bfc150472e0e4d2358fa5d795f70460160ba81b94b6945", size = 351232, upload_time = "2025-04-19T14:01:35.669Z" }, + { url = "https://files.pythonhosted.org/packages/30/77/c97617eb31f1d0554edb073201a294019b9e0a9bd2f73c68e6d8d048cd6b/pendulum-3.1.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:cf6229e5ee70c2660148523f46c472e677654d0097bec010d6730f08312a4931", size = 521562, upload_time = "2025-04-19T14:01:37.05Z" }, + { url = "https://files.pythonhosted.org/packages/76/22/0d0ef3393303877e757b848ecef8a9a8c7627e17e7590af82d14633b2cd1/pendulum-3.1.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:350cabb23bf1aec7c7694b915d3030bff53a2ad4aeabc8c8c0d807c8194113d6", size = 523221, upload_time = "2025-04-19T14:01:38.444Z" }, + { url = "https://files.pythonhosted.org/packages/99/f3/aefb579aa3cebd6f2866b205fc7a60d33e9a696e9e629024752107dc3cf5/pendulum-3.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:42959341e843077c41d47420f28c3631de054abd64da83f9b956519b5c7a06a7", size = 260502, upload_time = "2025-04-19T14:01:39.814Z" }, + { url = "https://files.pythonhosted.org/packages/02/74/4332b5d6e34c63d4df8e8eab2249e74c05513b1477757463f7fdca99e9be/pendulum-3.1.0-cp312-cp312-win_arm64.whl", hash = 
"sha256:006758e2125da2e624493324dfd5d7d1b02b0c44bc39358e18bf0f66d0767f5f", size = 253089, upload_time = "2025-04-19T14:01:41.171Z" }, + { url = "https://files.pythonhosted.org/packages/8e/1f/af928ba4aa403dac9569f787adcf024005e7654433d71f7a84e608716837/pendulum-3.1.0-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:28658b0baf4b30eb31d096a375983cfed033e60c0a7bbe94fa23f06cd779b50b", size = 336209, upload_time = "2025-04-19T14:01:42.775Z" }, + { url = "https://files.pythonhosted.org/packages/b6/16/b010643007ba964c397da7fa622924423883c1bbff1a53f9d1022cd7f024/pendulum-3.1.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:b114dcb99ce511cb8f5495c7b6f0056b2c3dba444ef1ea6e48030d7371bd531a", size = 323132, upload_time = "2025-04-19T14:01:44.577Z" }, + { url = "https://files.pythonhosted.org/packages/64/19/c3c47aeecb5d9bceb0e89faafd800d39809b696c5b7bba8ec8370ad5052c/pendulum-3.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2404a6a54c80252ea393291f0b7f35525a61abae3d795407f34e118a8f133a18", size = 341509, upload_time = "2025-04-19T14:01:46.084Z" }, + { url = "https://files.pythonhosted.org/packages/38/cf/c06921ff6b860ff7e62e70b8e5d4dc70e36f5abb66d168bd64d51760bc4e/pendulum-3.1.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d06999790d9ee9962a1627e469f98568bf7ad1085553fa3c30ed08b3944a14d7", size = 378674, upload_time = "2025-04-19T14:01:47.727Z" }, + { url = "https://files.pythonhosted.org/packages/62/0b/a43953b9eba11e82612b033ac5133f716f1b76b6108a65da6f408b3cc016/pendulum-3.1.0-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:94751c52f6b7c306734d1044c2c6067a474237e1e5afa2f665d1fbcbbbcf24b3", size = 436133, upload_time = "2025-04-19T14:01:49.126Z" }, + { url = "https://files.pythonhosted.org/packages/eb/a0/ec3d70b3b96e23ae1d039f132af35e17704c22a8250d1887aaefea4d78a6/pendulum-3.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5553ac27be05e997ec26d7f004cf72788f4ce11fe60bb80dda604a64055b29d0", size = 351232, upload_time = "2025-04-19T14:01:50.575Z" }, + { url = "https://files.pythonhosted.org/packages/f4/97/aba23f1716b82f6951ba2b1c9178a2d107d1e66c102762a9bf19988547ea/pendulum-3.1.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:f8dee234ca6142bf0514368d01a72945a44685aaa2fc4c14c98d09da9437b620", size = 521563, upload_time = "2025-04-19T14:01:51.9Z" }, + { url = "https://files.pythonhosted.org/packages/01/33/2c0d5216cc53d16db0c4b3d510f141ee0a540937f8675948541190fbd48b/pendulum-3.1.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:7378084fe54faab4ee481897a00b710876f2e901ded6221671e827a253e643f2", size = 523221, upload_time = "2025-04-19T14:01:53.275Z" }, + { url = "https://files.pythonhosted.org/packages/51/89/8de955c339c31aeae77fd86d3225509b998c81875e9dba28cb88b8cbf4b3/pendulum-3.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:8539db7ae2c8da430ac2515079e288948c8ebf7eb1edd3e8281b5cdf433040d6", size = 260501, upload_time = "2025-04-19T14:01:54.749Z" }, + { url = "https://files.pythonhosted.org/packages/15/c3/226a3837363e94f8722461848feec18bfdd7d5172564d53aa3c3397ff01e/pendulum-3.1.0-cp313-cp313-win_arm64.whl", hash = "sha256:1ce26a608e1f7387cd393fba2a129507c4900958d4f47b90757ec17656856571", size = 253087, upload_time = "2025-04-19T14:01:55.998Z" }, + { url = "https://files.pythonhosted.org/packages/6e/23/e98758924d1b3aac11a626268eabf7f3cf177e7837c28d47bf84c64532d0/pendulum-3.1.0-py3-none-any.whl", hash = 
"sha256:f9178c2a8e291758ade1e8dd6371b1d26d08371b4c7730a6e9a3ef8b16ebae0f", size = 111799, upload_time = "2025-04-19T14:02:34.739Z" }, +] + [[package]] name = "playwright" version = "1.49.1"