Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ENV.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ Please ensure these are properly defined in a `.env` file in the root directory.
|`POSTGRES_DB` | The database name for the test database | `source_collector_test_db` |
|`POSTGRES_HOST` | The host for the test database | `127.0.0.1` |
|`POSTGRES_PORT` | The port for the test database | `5432` |
|`DS_APP_SECRET_KEY`| The secret key used for decoding JWT tokens produced by the Data Sources App. Must match the secret token `JWT_SECRET_KEY` that is used in the Data Sources App for encoding. | `abc123` |
|`DS_APP_SECRET_KEY`| The secret key used for decoding JWT tokens produced by the Data Sources App. Must match the secret token `JWT_SECRET_KEY` that is used in the Data Sources App for encoding. | `abc123` |
|`DEV`| Set to any value to run the application in development mode. | `true` |
|`DEEPSEEK_API_KEY`| The API key required for accessing the DeepSeek API. | `abc123` |
|`OPENAI_API_KEY`| The API key required for accessing the OpenAI API. | `abc123` |
|`PDAP_EMAIL`| An email address for accessing the PDAP API. | `abc123@test.com` |
|`PDAP_PASSWORD`| A password for accessing the PDAP API. | `abc123` |
|`PDAP_EMAIL`| An email address for accessing the PDAP API.[^1] | `abc123@test.com` |
|`PDAP_PASSWORD`| A password for accessing the PDAP API.[^1] | `abc123` |
|`PDAP_API_KEY`| An API key for accessing the PDAP API. | `abc123` |
|`PDAP_API_URL`| The URL for the PDAP API | `https://data-sources-v2.pdap.dev/api`|
|`DISCORD_WEBHOOK_URL`| The URL for the Discord webhook used for notifications| `abc123` |

[^1]: The user account in question will require elevated permissions to access certain endpoints. At a minimum, the user will require the `source_collector` and `db_write` permissions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Add data source ID column to URL table

Revision ID: 33a546c93441
Revises: 45271f8fe75d
Create Date: 2025-03-29 17:16:11.863064

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '33a546c93441'
down_revision: Union[str, None] = '45271f8fe75d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add a nullable ``data_source_id`` integer column to ``urls``.

    A unique constraint is added so that each data source ID can be
    associated with at most one URL row.
    """
    op.add_column(
        'urls',
        sa.Column('data_source_id', sa.Integer(), nullable=True)
    )
    # Add unique constraint to data_source_id column
    op.create_unique_constraint('uq_data_source_id', 'urls', ['data_source_id'])


def downgrade() -> None:
    """Drop the ``data_source_id`` column from ``urls``.

    Dropping the column also removes its unique constraint
    (``uq_data_source_id``), so no separate drop is needed.
    """
    op.drop_column('urls', 'data_source_id')
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Add Submit URL Task Type Enum

Revision ID: b363794fa4e9
Revises: 33a546c93441
Create Date: 2025-04-15 13:38:58.293627

"""
from typing import Sequence, Union


from util.alembic_helpers import switch_enum_type

# revision identifiers, used by Alembic.
revision: str = 'b363794fa4e9'
down_revision: Union[str, None] = '33a546c93441'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Extend the ``task_type`` enum with the ``Submit Approved URLs`` value.

    Rebuilds the enum on ``tasks.task_type`` via ``switch_enum_type`` so the
    new value is available alongside the existing ones.
    """
    switch_enum_type(
        table_name='tasks',
        column_name='task_type',
        enum_name='task_type',
        new_enum_values=[
            "HTML",
            "Relevancy",
            "Record Type",
            "Agency Identification",
            "Misc Metadata",
            "Submit Approved URLs"
        ]
    )


def downgrade() -> None:
    """Remove ``Submit Approved URLs`` from the ``task_type`` enum.

    NOTE(review): this will fail if any ``tasks`` row still uses the
    removed value — presumably those rows must be cleaned up first;
    confirm against ``switch_enum_type``'s behavior.
    """
    switch_enum_type(
        table_name='tasks',
        column_name='task_type',
        enum_name='task_type',
        new_enum_values=[
            "HTML",
            "Relevancy",
            "Record Type",
            "Agency Identification",
            "Misc Metadata",
        ]
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Revert to pending validated URLs without name and add constraint

Revision ID: ed06a5633d2e
Revises: b363794fa4e9
Create Date: 2025-04-15 15:32:26.465488

"""
from typing import Sequence, Union

from alembic import op


# revision identifiers, used by Alembic.
revision: str = 'ed06a5633d2e'
down_revision: Union[str, None] = 'b363794fa4e9'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Revert name-less validated URLs to pending and enforce the invariant.

    First, any ``urls`` row whose outcome is ``validated`` but whose name
    is NULL is moved back to ``pending`` (data cleanup so the constraint
    below can be created). Then a check constraint guarantees that every
    ``validated`` row carries a non-NULL name going forward.
    """
    op.execute(
        """
        UPDATE public.urls
        SET OUTCOME = 'pending'
        WHERE OUTCOME = 'validated' AND NAME IS NULL
        """
    )

    op.create_check_constraint(
        'url_name_not_null_when_validated',
        'urls',
        "NAME IS NOT NULL OR OUTCOME != 'validated'"
    )


def downgrade() -> None:
    """Drop the ``url_name_not_null_when_validated`` check constraint.

    The data change performed in ``upgrade`` (validated → pending) is not
    reversed; only the constraint is removed.
    """
    op.drop_constraint(
        'url_name_not_null_when_validated',
        'urls',
        type_='check'
    )
32 changes: 26 additions & 6 deletions api/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from contextlib import asynccontextmanager

import aiohttp
import uvicorn
from fastapi import FastAPI

Expand All @@ -15,25 +16,35 @@
from collector_manager.AsyncCollectorManager import AsyncCollectorManager
from core.AsyncCore import AsyncCore
from core.AsyncCoreLogger import AsyncCoreLogger
from core.EnvVarManager import EnvVarManager
from core.ScheduledTaskManager import AsyncScheduledTaskManager
from core.SourceCollectorCore import SourceCollectorCore
from core.TaskManager import TaskManager
from html_tag_collector.ResponseParser import HTMLResponseParser
from html_tag_collector.RootURLCache import RootURLCache
from html_tag_collector.URLRequestInterface import URLRequestInterface
from hugging_face.HuggingFaceInterface import HuggingFaceInterface
from pdap_api_client.AccessManager import AccessManager
from pdap_api_client.PDAPClient import PDAPClient
from util.DiscordNotifier import DiscordPoster
from util.helper_functions import get_from_env



@asynccontextmanager
async def lifespan(app: FastAPI):
env_var_manager = EnvVarManager.get()

# Initialize shared dependencies
db_client = DatabaseClient()
adb_client = AsyncDatabaseClient()
db_client = DatabaseClient(
db_url=env_var_manager.get_postgres_connection_string()
)
adb_client = AsyncDatabaseClient(
db_url=env_var_manager.get_postgres_connection_string(is_async=True)
)
await setup_database(db_client)
core_logger = AsyncCoreLogger(adb_client=adb_client)

session = aiohttp.ClientSession()

source_collector_core = SourceCollectorCore(
db_client=DatabaseClient(),
Expand All @@ -46,7 +57,15 @@ async def lifespan(app: FastAPI):
root_url_cache=RootURLCache()
),
discord_poster=DiscordPoster(
webhook_url=get_from_env("DISCORD_WEBHOOK_URL")
webhook_url=env_var_manager.discord_webhook_url
),
pdap_client=PDAPClient(
access_manager=AccessManager(
email=env_var_manager.pdap_email,
password=env_var_manager.pdap_password,
api_key=env_var_manager.pdap_api_key,
session=session
)
)
)
async_collector_manager = AsyncCollectorManager(
Expand All @@ -72,17 +91,17 @@ async def lifespan(app: FastAPI):
yield # Code here runs before shutdown

# Shutdown logic (if needed)
# Clean up resources, close connections, etc.
await core_logger.shutdown()
await async_core.shutdown()
source_collector_core.shutdown()
# Clean up resources, close connections, etc.
await session.close()
pass


async def setup_database(db_client):
# Initialize database if dev environment, otherwise apply migrations
try:
get_from_env("DEV")
db_client.init_db()
except Exception as e:
return
Expand All @@ -95,6 +114,7 @@ async def setup_database(db_client):
lifespan=lifespan
)


routers = [
root_router,
collector_router,
Expand Down
18 changes: 9 additions & 9 deletions api/routes/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


@batch_router.get("")
def get_batch_status(
async def get_batch_status(
collector_type: Optional[CollectorType] = Query(
description="Filter by collector type",
default=None
Expand All @@ -38,13 +38,13 @@
description="The page number",
default=1
),
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> GetBatchStatusResponse:
"""
Get the status of recent batches
"""
return core.get_batch_statuses(collector_type=collector_type, status=status, page=page)
return await core.get_batch_statuses(collector_type=collector_type, status=status, page=page)


@batch_router.get("/{batch_id}")
Expand All @@ -69,28 +69,28 @@
return await core.get_urls_by_batch(batch_id, page=page)

@batch_router.get("/{batch_id}/duplicates")
async def get_duplicates_by_batch(
    batch_id: int = Path(description="The batch id"),
    page: int = Query(
        description="The page number",
        default=1
    ),
    core: AsyncCore = Depends(get_async_core),
    access_info: AccessInfo = Depends(get_access_info),
) -> GetDuplicatesByBatchResponse:
    """
    Get the URLs flagged as duplicates for the given batch, paginated.
    """
    return await core.get_duplicate_urls_by_batch(batch_id, page=page)

@batch_router.get("/{batch_id}/logs")
async def get_batch_logs(
    batch_id: int = Path(description="The batch id"),
    async_core: AsyncCore = Depends(get_async_core),
    access_info: AccessInfo = Depends(get_access_info),
) -> GetBatchLogsResponse:
    """
    Retrieve the logs for a recent batch.
    Note that for later batches, the logs may not be available.
    """
    return await async_core.get_batch_logs(batch_id)

@batch_router.post("/{batch_id}/abort")
async def abort_batch(
Expand Down
Loading