From c18bd686865312c44c9c97d35eccc1a90722209d Mon Sep 17 00:00:00 2001 From: Max Chis Date: Thu, 17 Apr 2025 14:33:33 -0400 Subject: [PATCH] feat(app): Add `/batch` filter for batches with pending URLs --- api/routes/batch.py | 14 +++- collector_db/AsyncDatabaseClient.py | 28 ++++++- core/AsyncCore.py | 4 +- tests/helpers/DBDataCreator.py | 31 ++++++-- .../api/helpers/RequestValidator.py | 10 ++- .../integration/api/test_batch.py | 77 +++++++++++++++++++ .../collector_db/test_db_client.py | 4 - 7 files changed, 148 insertions(+), 20 deletions(-) diff --git a/api/routes/batch.py b/api/routes/batch.py index 9d4b62cc..2c791503 100644 --- a/api/routes/batch.py +++ b/api/routes/batch.py @@ -1,11 +1,10 @@ from typing import Optional -from fastapi import Path, APIRouter, HTTPException +from fastapi import Path, APIRouter from fastapi.params import Query, Depends from api.dependencies import get_core, get_async_core from collector_db.DTOs.BatchInfo import BatchInfo -from collector_manager.CollectorManager import InvalidCollectorError from collector_manager.enums import CollectorType from core.AsyncCore import AsyncCore from core.DTOs.GetBatchLogsResponse import GetBatchLogsResponse @@ -34,6 +33,10 @@ async def get_batch_status( description="Filter by status", default=None ), + has_pending_urls: Optional[bool] = Query( + description="Filter by whether the batch has pending URLs", + default=None + ), page: int = Query( description="The page number", default=1 @@ -44,7 +47,12 @@ async def get_batch_status( """ Get the status of recent batches """ - return await core.get_batch_statuses(collector_type=collector_type, status=status, page=page) + return await core.get_batch_statuses( + collector_type=collector_type, + status=status, + has_pending_urls=has_pending_urls, + page=page + ) @batch_router.get("/{batch_id}") diff --git a/collector_db/AsyncDatabaseClient.py b/collector_db/AsyncDatabaseClient.py index 8ceda774..957a4eb6 100644 --- a/collector_db/AsyncDatabaseClient.py +++ b/collector_db/AsyncDatabaseClient.py @@ -1567,17 +1567,37 @@ async def get_recent_batch_status_info( page: int, collector_type: Optional[CollectorType] = None, status: Optional[BatchStatus] = None, + has_pending_urls: Optional[bool] = None ) -> List[BatchInfo]: # Get only the batch_id, collector_type, status, and created_at limit = 100 - query = (Select(Batch) - .order_by(Batch.date_generated.desc())) + query = Select(Batch) + if has_pending_urls is not None: + if has_pending_urls: + # Query for all that have pending URLs + query = query.join(URL, Batch.id == URL.batch_id).filter(URL.outcome == URLStatus.PENDING.value) + else: + # Query for all that DO NOT have pending URLs + # (or that have no URLs at all) + query = query.join( + URL, + Batch.id == URL.batch_id, + isouter=True + ).filter( + or_( + URL.outcome != URLStatus.PENDING.value, + URL.outcome.is_(None) + ) + ) if collector_type: query = query.filter(Batch.strategy == collector_type.value) if status: query = query.filter(Batch.status == status.value) - query = (query.limit(limit) - .offset((page - 1) * limit)) + + query = (query. + order_by(Batch.date_generated.desc()). + limit(limit). + offset((page - 1) * limit)) raw_results = await session.execute(query) batches = raw_results.scalars().all() return [BatchInfo(**batch.__dict__) for batch in batches] diff --git a/core/AsyncCore.py b/core/AsyncCore.py index cb9a80bc..d436d3c9 100644 --- a/core/AsyncCore.py +++ b/core/AsyncCore.py @@ -67,12 +67,14 @@ async def get_batch_statuses( self, collector_type: Optional[CollectorType], status: Optional[BatchStatus], + has_pending_urls: Optional[bool], page: int ) -> GetBatchStatusResponse: results = await self.adb_client.get_recent_batch_status_info( collector_type=collector_type, status=status, - page=page + page=page, + has_pending_urls=has_pending_urls ) return GetBatchStatusResponse(results=results) diff --git a/tests/helpers/DBDataCreator.py b/tests/helpers/DBDataCreator.py index 613bfe4d..28d8a573 100644 --- a/tests/helpers/DBDataCreator.py +++ b/tests/helpers/DBDataCreator.py @@ -36,11 +36,15 @@ def __init__(self, db_client: Optional[DatabaseClient] = None): self.db_client = DatabaseClient() self.adb_client: AsyncDatabaseClient = AsyncDatabaseClient() - def batch(self, strategy: CollectorType = CollectorType.EXAMPLE) -> int: + def batch( + self, + strategy: CollectorType = CollectorType.EXAMPLE, + batch_status: BatchStatus = BatchStatus.IN_PROCESS + ) -> int: return self.db_client.insert_batch( BatchInfo( strategy=strategy.value, - status=BatchStatus.IN_PROCESS, + status=batch_status, total_url_count=1, parameters={"test_key": "test_value"}, user_id=1 @@ -56,11 +60,26 @@ async def task(self, url_ids: Optional[list[int]] = None) -> int: async def batch_and_urls( self, strategy: CollectorType = CollectorType.EXAMPLE, - url_count: int = 1, - with_html_content: bool = False + url_count: int = 3, + with_html_content: bool = False, + batch_status: BatchStatus = BatchStatus.READY_TO_LABEL, + url_status: URLStatus = URLStatus.PENDING ) -> BatchURLCreationInfo: - batch_id = self.batch(strategy=strategy) - iuis: InsertURLsInfo = self.urls(batch_id=batch_id, url_count=url_count) + batch_id = self.batch( + strategy=strategy, + batch_status=batch_status + ) + if batch_status in (BatchStatus.ERROR, BatchStatus.ABORTED): + return BatchURLCreationInfo( + batch_id=batch_id, + url_ids=[], + urls=[] + ) + iuis: InsertURLsInfo = self.urls( + batch_id=batch_id, + url_count=url_count, + outcome=url_status + ) url_ids = [iui.url_id for iui in iuis.url_mappings] if with_html_content: await self.html_data(url_ids) diff --git a/tests/test_automated/integration/api/helpers/RequestValidator.py b/tests/test_automated/integration/api/helpers/RequestValidator.py index f8ada6ae..4a12bb0e 100644 --- a/tests/test_automated/integration/api/helpers/RequestValidator.py +++ b/tests/test_automated/integration/api/helpers/RequestValidator.py @@ -120,13 +120,19 @@ def delete( expected_response=expected_response, **kwargs) - def get_batch_statuses(self, collector_type: Optional[CollectorType] = None, status: Optional[BatchStatus] = None) -> GetBatchStatusResponse: + def get_batch_statuses( + self, + collector_type: Optional[CollectorType] = None, + status: Optional[BatchStatus] = None, + has_pending_urls: Optional[bool] = None + ) -> GetBatchStatusResponse: params = {} update_if_not_none( target=params, source={ "collector_type": collector_type.value if collector_type else None, - "status": status.value if status else None + "status": status.value if status else None, + "has_pending_urls": has_pending_urls } ) data = self.get( diff --git a/tests/test_automated/integration/api/test_batch.py b/tests/test_automated/integration/api/test_batch.py index 604e2d67..bc86dfec 100644 --- a/tests/test_automated/integration/api/test_batch.py +++ b/tests/test_automated/integration/api/test_batch.py @@ -1,11 +1,88 @@ import asyncio import time +import pytest + from collector_db.DTOs.BatchInfo import BatchInfo from collector_db.DTOs.InsertURLsInfo import InsertURLsInfo from collector_manager.DTOs.ExampleInputDTO import ExampleInputDTO +from collector_manager.enums import CollectorType, URLStatus from core.enums import BatchStatus +@pytest.mark.asyncio +async def test_get_batch_status_pending_url_filter(api_test_helper): + ath = api_test_helper + + # Add an errored out batch + batch_error = await ath.db_data_creator.batch_and_urls( + strategy=CollectorType.EXAMPLE, + url_count=1, + batch_status=BatchStatus.ERROR + ) + + # Add a batch with pending urls + batch_pending = await ath.db_data_creator.batch_and_urls( + strategy=CollectorType.EXAMPLE, + url_count=1, + batch_status=BatchStatus.READY_TO_LABEL, + with_html_content=True, + url_status=URLStatus.PENDING + ) + + # Add a batch with submitted URLs + batch_submitted = await ath.db_data_creator.batch_and_urls( + strategy=CollectorType.EXAMPLE, + url_count=1, + batch_status=BatchStatus.READY_TO_LABEL, + with_html_content=True, + url_status=URLStatus.SUBMITTED + ) + + # Add an aborted batch + batch_aborted = await ath.db_data_creator.batch_and_urls( + strategy=CollectorType.EXAMPLE, + url_count=1, + batch_status=BatchStatus.ABORTED + ) + + # Add a batch with validated URLs + batch_validated = await ath.db_data_creator.batch_and_urls( + strategy=CollectorType.EXAMPLE, + url_count=1, + batch_status=BatchStatus.READY_TO_LABEL, + with_html_content=True, + url_status=URLStatus.VALIDATED + ) + + # Test filter for pending URLs and only retrieve the second batch + pending_urls_results = ath.request_validator.get_batch_statuses( + has_pending_urls=True + ) + + assert len(pending_urls_results.results) == 1 + assert pending_urls_results.results[0].id == batch_pending.batch_id + + # Test filter without pending URLs and retrieve the other four batches + no_pending_urls_results = ath.request_validator.get_batch_statuses( + has_pending_urls=False + ) + + assert len(no_pending_urls_results.results) == 4 + for result in no_pending_urls_results.results: + assert result.id in [ + batch_error.batch_id, + batch_submitted.batch_id, + batch_validated.batch_id, + batch_aborted.batch_id + ] + + # Test no filter for pending URLs and retrieve all batches + no_filter_results = ath.request_validator.get_batch_statuses() + + assert len(no_filter_results.results) == 5 + + + def test_abort_batch(api_test_helper): ath = api_test_helper diff --git a/tests/test_automated/integration/collector_db/test_db_client.py b/tests/test_automated/integration/collector_db/test_db_client.py index 5ea0bee2..5560577e 100644 --- a/tests/test_automated/integration/collector_db/test_db_client.py +++ b/tests/test_automated/integration/collector_db/test_db_client.py @@ -269,10 +269,6 @@ async def test_get_next_url_for_final_review_favor_more_components(db_data_creat assert result.id == url_mapping_with_user_anno.url_id - - - - @pytest.mark.asyncio async def test_get_next_url_for_final_review_no_annotations(db_data_creator: DBDataCreator): """