diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..dfdb8b77 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.sh text eol=lf diff --git a/.github/workflows/test_app.yml b/.github/workflows/test_app.yml index e16d1771..c869304a 100644 --- a/.github/workflows/test_app.yml +++ b/.github/workflows/test_app.yml @@ -1,21 +1,6 @@ -# This workflow will test the Source Collector App -# Utilizing the docker-compose file in the root directory name: Test Source Collector App -on: pull_request -#jobs: -# build: -# runs-on: ubuntu-latest -# steps: -# - name: Checkout repository -# uses: actions/checkout@v4 -# - name: Run docker-compose -# uses: hoverkraft-tech/compose-action@v2.0.1 -# with: -# compose-file: "docker-compose.yml" -# - name: Execute tests in the running service -# run: | -# docker ps -a && docker exec data-source-identification-app-1 pytest /app/tests/test_automated +on: pull_request jobs: container-job: @@ -34,22 +19,41 @@ jobs: --health-timeout 5s --health-retries 5 + env: # <-- Consolidated env block here + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + POSTGRES_DB: source_collector_test_db + POSTGRES_HOST: postgres + POSTGRES_PORT: 5432 + DATA_SOURCES_HOST: postgres + DATA_SOURCES_PORT: 5432 + DATA_SOURCES_USER: postgres + DATA_SOURCES_PASSWORD: postgres + DATA_SOURCES_DB: test_data_sources_db + FDW_DATA_SOURCES_HOST: postgres + FDW_DATA_SOURCES_PORT: 5432 + FDW_DATA_SOURCES_USER: postgres + FDW_DATA_SOURCES_PASSWORD: postgres + FDW_DATA_SOURCES_DB: test_data_sources_db + GOOGLE_API_KEY: TEST + GOOGLE_CSE_ID: TEST + steps: - name: Checkout repository uses: actions/checkout@v4 + + - name: Install PostgreSQL client tools + run: | + apt-get update + apt-get install -y postgresql-client + - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt + python -m local_database.create_database --use-shell + - name: Run tests run: | pytest tests/test_automated pytest tests/test_alembic - env: - POSTGRES_PASSWORD: postgres - POSTGRES_USER: postgres - POSTGRES_DB: postgres - POSTGRES_HOST: postgres - POSTGRES_PORT: 5432 - GOOGLE_API_KEY: TEST - GOOGLE_CSE_ID: TEST diff --git a/ENV.md b/ENV.md index 5292320b..fdd7d029 100644 --- a/ENV.md +++ b/ENV.md @@ -21,4 +21,24 @@ Please ensure these are properly defined in a `.env` file in the root directory. |`PDAP_API_URL`| The URL for the PDAP API| `https://data-sources-v2.pdap.dev/api`| |`DISCORD_WEBHOOK_URL`| The URL for the Discord webhook used for notifications| `abc123` | -[^1:] The user account in question will require elevated permissions to access certain endpoints. At a minimum, the user will require the `source_collector` and `db_write` permissions. \ No newline at end of file +[^1:] The user account in question will require elevated permissions to access certain endpoints. At a minimum, the user will require the `source_collector` and `db_write` permissions. 
+ +## Foreign Data Wrapper (FDW) +``` +FDW_DATA_SOURCES_HOST=127.0.0.1 # The host of the Data Sources Database, used for FDW setup +FDW_DATA_SOURCES_PORT=1234 # The port of the Data Sources Database, used for FDW setup +FDW_DATA_SOURCES_USER=fdw_user # The username for the Data Sources Database, used for FDW setup +FDW_DATA_SOURCES_PASSWORD=password # The password for the Data Sources Database, used for FDW setup +FDW_DATA_SOURCES_DB=db_name # The database name for the Data Sources Database, used for FDW setup + +``` + +## Data Dumper + +``` +PROD_DATA_SOURCES_HOST=127.0.0.1 # The host of the production Data Sources Database, used for Data Dumper +PROD_DATA_SOURCES_PORT=1234 # The port of the production Data Sources Database, used for Data Dumper +PROD_DATA_SOURCES_USER=dump_user # The username for the production Data Sources Database, used for Data Dumper +PROD_DATA_SOURCES_PASSWORD=password # The password for the production Data Sources Database, used for Data Dumper +PROD_DATA_SOURCES_DB=db_name # The database name for the production Data Sources Database, used for Data Dumper +``` \ No newline at end of file diff --git a/alembic/versions/2025_04_21_1817-13f1272f94b9_set_up_foreign_data_wrapper.py b/alembic/versions/2025_04_21_1817-13f1272f94b9_set_up_foreign_data_wrapper.py new file mode 100644 index 00000000..1b73f5f4 --- /dev/null +++ b/alembic/versions/2025_04_21_1817-13f1272f94b9_set_up_foreign_data_wrapper.py @@ -0,0 +1,250 @@ +"""Set up foreign data wrapper + +Revision ID: 13f1272f94b9 +Revises: e285e6e7cf71 +Create Date: 2025-04-21 18:17:34.593973 + +""" +import os +from typing import Sequence, Union + +from alembic import op +from dotenv import load_dotenv + +# revision identifiers, used by Alembic. +revision: str = '13f1272f94b9' +down_revision: Union[str, None] = 'e285e6e7cf71' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + + load_dotenv() + remote_host = os.getenv("FDW_DATA_SOURCES_HOST") + user = os.getenv("FDW_DATA_SOURCES_USER") + password = os.getenv("FDW_DATA_SOURCES_PASSWORD") + db_name = os.getenv("FDW_DATA_SOURCES_DB") + port = os.getenv("FDW_DATA_SOURCES_PORT") + + op.execute(f"CREATE EXTENSION IF NOT EXISTS postgres_fdw;") + + op.execute(f""" + CREATE SERVER data_sources_server + FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (host '{remote_host}', dbname '{db_name}', port '{port}'); + """) + + op.execute(f""" + CREATE USER MAPPING FOR {user} + SERVER data_sources_server + OPTIONS (user '{user}', password '{password}'); + """) + + op.execute('CREATE SCHEMA if not exists "remote";') + + # Users table + op.execute(""" + CREATE FOREIGN TABLE IF NOT EXISTS "remote".users + ( + id bigint, + created_at timestamp with time zone, + updated_at timestamp with time zone, + email text, + password_digest text, + api_key character varying, + role text + ) + SERVER data_sources_server + OPTIONS ( + schema_name 'public', + table_name 'users' + ); + """) + + # Agencies + # -Enums + # --Jurisdiction Type + op.execute(""" + CREATE TYPE jurisdiction_type AS ENUM + ('school', 'county', 'local', 'port', 'tribal', 'transit', 'state', 'federal'); + """) + # --Agency Type + op.execute(""" + CREATE TYPE agency_type AS ENUM + ('incarceration', 'law enforcement', 'aggregated', 'court', 'unknown'); + """) + + # -Table + op.execute(""" + CREATE FOREIGN TABLE IF NOT EXISTS "remote".agencies + ( + name character , + homepage_url character , + jurisdiction_type jurisdiction_type , + lat double precision, + lng 
double precision, + defunct_year character , + airtable_uid character , + agency_type agency_type , + multi_agency boolean , + no_web_presence boolean , + airtable_agency_last_modified timestamp with time zone, + rejection_reason character , + last_approval_editor character , + submitter_contact character, + agency_created timestamp with time zone, + id integer, + approval_status text, + creator_user_id integer + ) + SERVER data_sources_server + OPTIONS ( + schema_name 'public', + table_name 'agencies' + ); + """) + + # Locations Table + # -Enums + # --Location Type + op.execute(""" + CREATE TYPE location_type AS ENUM + ('State', 'County', 'Locality'); + """) + + # -Table + op.execute(""" + CREATE FOREIGN TABLE IF NOT EXISTS "remote".locations + ( + id bigint, + type location_type, + state_id bigint, + county_id bigint, + locality_id bigint + ) + SERVER data_sources_server + OPTIONS ( + schema_name 'public', + table_name 'locations' + ); + """) + + # Data Sources Table + + # -Enums + # -- access_type + op.execute(""" + CREATE TYPE access_type AS ENUM + ('Download', 'Webpage', 'API'); + """) + + # -- agency_aggregation + op.execute(""" + CREATE TYPE agency_aggregation AS ENUM + ('county', 'local', 'state', 'federal'); + """) + # -- update_method + op.execute(""" + CREATE TYPE update_method AS ENUM + ('Insert', 'No updates', 'Overwrite'); + """) + + # -- detail_level + op.execute(""" + CREATE TYPE detail_level AS ENUM + ('Individual record', 'Aggregated records', 'Summarized totals'); + """) + + # -- retention_schedule + op.execute(""" + CREATE TYPE retention_schedule AS ENUM + ('< 1 day', '1 day', '< 1 week', '1 week', '1 month', '< 1 year', '1-10 years', '> 10 years', 'Future only'); + """) + + # -Table + op.execute(""" + CREATE FOREIGN TABLE IF NOT EXISTS "remote".data_sources + ( + name character varying , + description character , + source_url character , + agency_supplied boolean, + supplying_entity character , + agency_originated boolean, + agency_aggregation agency_aggregation, + coverage_start date, + coverage_end date, + updated_at timestamp with time zone , + detail_level detail_level, + record_download_option_provided boolean, + data_portal_type character , + update_method update_method, + readme_url character , + originating_entity character , + retention_schedule retention_schedule, + airtable_uid character , + scraper_url character , + created_at timestamp with time zone , + submission_notes character , + rejection_note character , + submitter_contact_info character , + agency_described_not_in_database character , + data_portal_type_other character , + data_source_request character , + broken_source_url_as_of timestamp with time zone, + access_notes text , + url_status text , + approval_status text , + record_type_id integer, + access_types access_type[], + tags text[] , + record_formats text[] , + id integer, + approval_status_updated_at timestamp with time zone , + last_approval_editor bigint + ) + SERVER data_sources_server + OPTIONS ( + schema_name 'public', + table_name 'data_sources' + ); + """) + + + +def downgrade() -> None: + # Drop foreign schema + op.execute('DROP SCHEMA IF EXISTS "remote" CASCADE;') + + # Drop enums + enums = [ + "jurisdiction_type", + "agency_type", + "location_type", + "access_type", + "agency_aggregation", + "update_method", + "detail_level", + "retention_schedule", + ] + for enum in enums: + op.execute(f""" + DROP TYPE IF EXISTS {enum}; + """) + + # Drop user mapping + user = os.getenv("DATA_SOURCES_USER") + op.execute(f""" + DROP USER MAPPING 
FOR {user} SERVER data_sources_server; + """) + + # Drop server + op.execute(""" + DROP SERVER IF EXISTS data_sources_server CASCADE; + """) + + # Drop FDW + op.execute(""" + DROP EXTENSION IF EXISTS postgres_fdw CASCADE; + """) diff --git a/local_database/DTOs.py b/local_database/DTOs.py new file mode 100644 index 00000000..f222e5ba --- /dev/null +++ b/local_database/DTOs.py @@ -0,0 +1,52 @@ +from typing import Annotated, Optional + +from pydantic import BaseModel, AfterValidator + +from local_database.local_db_util import is_absolute_path, get_absolute_path + + +class VolumeInfo(BaseModel): + host_path: str + container_path: Annotated[str, AfterValidator(is_absolute_path)] + + def build_volumes(self): + return { + self.host_path: { + "bind": self.container_path, + "mode": "rw" + } + } + + +class DockerfileInfo(BaseModel): + image_tag: str + dockerfile_directory: Optional[str] = None + + +class HealthCheckInfo(BaseModel): + test: list[str] + interval: int + timeout: int + retries: int + start_period: int + + def build_healthcheck(self) -> dict: + multiplicative_factor = 1000000000 # Assume 1 second + return { + "test": self.test, + "interval": self.interval * multiplicative_factor, + "timeout": self.timeout * multiplicative_factor, + "retries": self.retries, + "start_period": self.start_period * multiplicative_factor + } + + +class DockerInfo(BaseModel): + dockerfile_info: DockerfileInfo + volume_info: Optional[VolumeInfo] = None + name: str + ports: Optional[dict] = None + environment: Optional[dict] + command: Optional[str] = None + entrypoint: Optional[list[str]] = None + health_check_info: Optional[HealthCheckInfo] = None diff --git a/local_database/DataDumper/dump.sh b/local_database/DataDumper/dump.sh index 9c07c0ca..482a3ca1 100644 --- a/local_database/DataDumper/dump.sh +++ b/local_database/DataDumper/dump.sh @@ -1,15 +1,28 @@ #!/bin/bash #set -e + # Variables (customize these or pass them as environment variables) DB_HOST=${DUMP_HOST:-"postgres_container"} DB_USER=${DUMP_USER:-"your_user"} -DB_PORT=${DUMP_PORT:-"5432"} # Default to 5432 if not provided +DB_PORT=${DUMP_PORT:-"5432"} DB_PASSWORD=${DUMP_PASSWORD:-"your_password"} DB_NAME=${DUMP_NAME:-"your_database"} -DUMP_FILE="/dump/db_dump.sql" +DUMP_FILE=${DUMP_FILE:-"/dump/db_dump.sql"} +DUMP_SCHEMA_ONLY=${DUMP_SCHEMA_ONLY:-false} # Set to "true" to dump only schema + # Export password for pg_dump export PGPASSWORD=$DB_PASSWORD -# Dump the database -echo "Dumping database $DB_NAME from $DB_HOST:$DB_PORT..." -pg_dump -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME --no-owner --no-acl -F c -f $DUMP_FILE -echo "Dump completed. File saved to $DUMP_FILE." \ No newline at end of file + +# Determine pg_dump flags +PG_DUMP_FLAGS="--no-owner --no-acl -F c" +if [[ "$DUMP_SCHEMA_ONLY" == "true" ]]; then + PG_DUMP_FLAGS="$PG_DUMP_FLAGS --schema-only" + echo "Dumping schema only..." +else + echo "Dumping full database..." +fi + +# Run pg_dump +pg_dump -h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME $PG_DUMP_FLAGS -f $DUMP_FILE + +echo "Dump completed. File saved to $DUMP_FILE." 
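For illustration, a minimal sketch of exercising the new `DUMP_SCHEMA_ONLY` toggle that `dump.sh` now reads. All connection values below are placeholders, and running the script directly like this assumes `bash`, `pg_dump`, and network access to the source database are available where it executes; in this PR the script is normally invoked inside the DataDumper container.

```python
# Minimal sketch: drive dump.sh with schema-only dumping enabled.
# Assumes bash, pg_dump, and network access to the source database;
# every connection value below is a placeholder, not a real credential.
import os
import subprocess

env = {
    **os.environ,
    "DUMP_HOST": "postgres_container",
    "DUMP_USER": "your_user",
    "DUMP_PASSWORD": "your_password",
    "DUMP_NAME": "your_database",
    "DUMP_PORT": "5432",
    "DUMP_FILE": "/dump/db_dump.sql",
    "DUMP_SCHEMA_ONLY": "true",  # set to "false" (or omit) for a full dump
}

subprocess.run(
    ["bash", "local_database/DataDumper/dump.sh"],
    env=env,
    check=True,
)
```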
diff --git a/local_database/DataDumper/dump/data_sources_db_dump.sql b/local_database/DataDumper/dump/data_sources_db_dump.sql new file mode 100644 index 00000000..aa27b60a Binary files /dev/null and b/local_database/DataDumper/dump/data_sources_db_dump.sql differ diff --git a/local_database/DockerInfos.py b/local_database/DockerInfos.py new file mode 100644 index 00000000..3b1c071b --- /dev/null +++ b/local_database/DockerInfos.py @@ -0,0 +1,97 @@ +from local_database.DTOs import DockerInfo, DockerfileInfo, HealthCheckInfo, VolumeInfo +from local_database.constants import LOCAL_DATA_SOURCES_DB_NAME +from util.helper_functions import get_from_env, project_path + + +def get_database_docker_info() -> DockerInfo: + return DockerInfo( + dockerfile_info=DockerfileInfo( + image_tag="postgres:15", + ), + name="data_source_identification_db", + ports={ + "5432/tcp": 5432 + }, + environment={ + "POSTGRES_PASSWORD": "HanviliciousHamiltonHilltops", + "POSTGRES_USER": "test_source_collector_user", + "POSTGRES_DB": "source_collector_test_db" + }, + health_check_info=HealthCheckInfo( + test=["pg_isready", "-U", "test_source_collector_user", "-h", "127.0.0.1", "-p", "5432"], + interval=1, + timeout=3, + retries=30, + start_period=2 + ) + ) + + +def get_data_sources_data_dumper_info() -> DockerInfo: + return DockerInfo( + dockerfile_info=DockerfileInfo( + image_tag="datadumper", + dockerfile_directory=str(project_path( + "local_database", + "DataDumper" + )) + ), + volume_info=VolumeInfo( + host_path=str(project_path( + "local_database", + "DataDumper", + "dump" + )), + container_path="/dump" + ), + name="datadumper", + environment={ + "DUMP_HOST": get_from_env("PROD_DATA_SOURCES_HOST"), + "DUMP_USER": get_from_env("PROD_DATA_SOURCES_USER"), + "DUMP_PASSWORD": get_from_env("PROD_DATA_SOURCES_PASSWORD"), + "DUMP_NAME": get_from_env("PROD_DATA_SOURCES_DB"), + "DUMP_PORT": get_from_env("PROD_DATA_SOURCES_PORT"), + "RESTORE_HOST": get_from_env("POSTGRES_HOST"), + "RESTORE_USER": get_from_env("POSTGRES_USER"), + "RESTORE_PORT": get_from_env("POSTGRES_PORT"), + "RESTORE_DB_NAME": LOCAL_DATA_SOURCES_DB_NAME, + "RESTORE_PASSWORD": get_from_env("POSTGRES_PASSWORD"), + "DUMP_FILE": "/dump/data_sources_db_dump.sql", + "DUMP_SCHEMA_ONLY": "true" + }, + command="bash" + ) + + +def get_source_collector_data_dumper_info() -> DockerInfo: + return DockerInfo( + dockerfile_info=DockerfileInfo( + image_tag="datadumper", + dockerfile_directory=str(project_path( + "local_database", + "DataDumper" + )) + ), + volume_info=VolumeInfo( + host_path=str(project_path( + "local_database", + "DataDumper", + "dump" + )), + container_path="/dump" + ), + name="datadumper", + environment={ + "DUMP_HOST": get_from_env("DUMP_HOST"), + "DUMP_USER": get_from_env("DUMP_USER"), + "DUMP_PASSWORD": get_from_env("DUMP_PASSWORD"), + "DUMP_NAME": get_from_env("DUMP_DB_NAME"), + "DUMP_PORT": get_from_env("DUMP_PORT"), + "RESTORE_HOST": "data_source_identification_db", + "RESTORE_USER": "test_source_collector_user", + "RESTORE_PORT": "5432", + "RESTORE_DB_NAME": "source_collector_test_db", + "RESTORE_PASSWORD": "HanviliciousHamiltonHilltops", + }, + command="bash" + ) diff --git a/local_database/classes/DockerClient.py b/local_database/classes/DockerClient.py new file mode 100644 index 00000000..ca9d535b --- /dev/null +++ b/local_database/classes/DockerClient.py @@ -0,0 +1,118 @@ +import docker +from docker.errors import NotFound, APIError + +from local_database.DTOs import DockerfileInfo, DockerInfo + + +class DockerClient: + + def __init__(self): + 
self.client = docker.from_env() + + def run_command(self, command: str, container_id: str): + exec_id = self.client.api.exec_create( + container_id, + cmd=command, + tty=False, + stdin=False + ) + output_stream = self.client.api.exec_start(exec_id=exec_id, stream=True) + for line in output_stream: + print(line.decode().rstrip()) + + def start_network(self, network_name): + try: + self.client.networks.create(network_name, driver="bridge") + except APIError as e: + # Assume already exists + if e.response.status_code != 409: + raise e + print("Network already exists") + return self.client.networks.get(network_name) + + def stop_network(self, network_name): + self.client.networks.get(network_name).remove() + + def get_image( + self, + dockerfile_info: DockerfileInfo, + force_rebuild: bool = False + ): + if dockerfile_info.dockerfile_directory: + # Build image from Dockerfile + self.client.images.build( + path=dockerfile_info.dockerfile_directory, + tag=dockerfile_info.image_tag, + nocache=force_rebuild, + rm=True # Remove intermediate images + ) + return + + if force_rebuild: + # Even if not from Dockerfile, re-pull to ensure freshness + self.client.images.pull(dockerfile_info.image_tag) + return + + try: + self.client.images.get(dockerfile_info.image_tag) + except NotFound: + self.client.images.pull(dockerfile_info.image_tag) + + def get_existing_container(self, docker_info_name: str): + try: + return self.client.containers.get(docker_info_name) + except NotFound: + return None + + def create_container(self, docker_info: DockerInfo, network_name: str, force_rebuild: bool = False): + self.get_image( + docker_info.dockerfile_info, + force_rebuild=force_rebuild + ) + + container = self.client.containers.run( + image=docker_info.dockerfile_info.image_tag, + volumes=docker_info.volume_info.build_volumes() if docker_info.volume_info is not None else None, + command=docker_info.command, + entrypoint=docker_info.entrypoint, + detach=True, + name=docker_info.name, + ports=docker_info.ports, + network=network_name, + environment=docker_info.environment, + stdout=True, + stderr=True, + tty=True, + healthcheck=docker_info.health_check_info.build_healthcheck() if docker_info.health_check_info is not None else None + ) + return container + + + def run_container( + self, + docker_info: DockerInfo, + network_name: str, + force_rebuild: bool = False + ): + print(f"Running container {docker_info.name}") + container = self.get_existing_container(docker_info.name) + if container is None: + return self.create_container( + docker_info=docker_info, + network_name=network_name, + force_rebuild=force_rebuild + ) + if force_rebuild: + print("Rebuilding container...") + container.remove(force=True) + return self.create_container( + docker_info=docker_info, + network_name=network_name, + force_rebuild=force_rebuild + ) + if container.status == 'running': + print(f"Container '{docker_info.name}' is already running") + return container + container.start() + return container + diff --git a/local_database/classes/DockerContainer.py b/local_database/classes/DockerContainer.py new file mode 100644 index 00000000..ee2ecba9 --- /dev/null +++ b/local_database/classes/DockerContainer.py @@ -0,0 +1,28 @@ +import time + +from docker.models.containers import Container + +from local_database.classes.DockerClient import DockerClient + + +class DockerContainer: + + def __init__(self, dc: DockerClient, container: Container): + self.dc = dc + self.container = container + + def run_command(self, command: str): + 
self.dc.run_command(command, self.container.id) + + def stop(self): + self.container.stop() + + def wait_for_pg_to_be_ready(self): + for i in range(30): + exit_code, output = self.container.exec_run("pg_isready") + print(output) + if exit_code == 0: + return + time.sleep(1) + raise Exception("Timed out waiting for postgres to be ready") + diff --git a/local_database/classes/DockerManager.py b/local_database/classes/DockerManager.py new file mode 100644 index 00000000..ac294dc1 --- /dev/null +++ b/local_database/classes/DockerManager.py @@ -0,0 +1,78 @@ +import platform +import subprocess +import sys + +import docker +from docker.errors import APIError + +from local_database.DTOs import DockerfileInfo, DockerInfo +from local_database.classes.DockerClient import DockerClient +from local_database.classes.DockerContainer import DockerContainer + + +class DockerManager: + def __init__(self): + if not self.is_docker_running(): + self.start_docker_engine() + + self.client = DockerClient() + self.network_name = "my_network" + self.network = self.start_network() + + @staticmethod + def start_docker_engine(): + system = platform.system() + + match system: + case "Windows": + # Use PowerShell to start Docker Desktop on Windows + subprocess.run([ + "powershell", "-Command", + "Start-Process 'Docker Desktop' -Verb RunAs" + ]) + case "Darwin": + # MacOS: Docker Desktop must be started manually or with open + subprocess.run(["open", "-a", "Docker"]) + case "Linux": + # Most Linux systems use systemctl to manage Docker + subprocess.run(["sudo", "systemctl", "start", "docker"]) + case _: + print(f"Unsupported OS: {system}") + sys.exit(1) + + @staticmethod + def is_docker_running(): + try: + client = docker.from_env() + client.ping() + return True + except docker.errors.DockerException as e: + print(f"Docker is not running: {e}") + return False + + def run_command(self, command: str, container_id: str): + self.client.run_command(command, container_id) + + def start_network(self): + return self.client.start_network(self.network_name) + + def stop_network(self): + self.client.stop_network(self.network_name) + + def get_image(self, dockerfile_info: DockerfileInfo): + self.client.get_image(dockerfile_info) + + def run_container( + self, + docker_info: DockerInfo, + force_rebuild: bool = False + ) -> DockerContainer: + raw_container = self.client.run_container( + docker_info, + network_name=self.network_name, + force_rebuild=force_rebuild + ) + return DockerContainer(self.client, raw_container) + + def get_containers(self): + return self.client.client.containers.list() \ No newline at end of file diff --git a/local_database/classes/TimestampChecker.py b/local_database/classes/TimestampChecker.py new file mode 100644 index 00000000..56779fd4 --- /dev/null +++ b/local_database/classes/TimestampChecker.py @@ -0,0 +1,32 @@ +import datetime +import os +from typing import Optional + + +class TimestampChecker: + def __init__(self): + self.last_run_time: Optional[datetime.datetime] = self.load_last_run_time() + + def load_last_run_time(self) -> Optional[datetime.datetime]: + # Check if file `last_run.txt` exists + # If it does, load the last run time + if os.path.exists("local_state/last_run.txt"): + with open("local_state/last_run.txt", "r") as f: + return datetime.datetime.strptime( + f.read(), + "%Y-%m-%d %H:%M:%S" + ) + return None + + def last_run_within_24_hours(self): + if self.last_run_time is None: + return False + return datetime.datetime.now() - self.last_run_time < datetime.timedelta(days=1) + + def 
set_last_run_time(self): + # If directory `local_state` doesn't exist, create it + if not os.path.exists("local_state"): + os.makedirs("local_state") + + with open("local_state/last_run.txt", "w") as f: + f.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) diff --git a/local_database/classes/__init__.py b/local_database/classes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/local_database/constants.py b/local_database/constants.py new file mode 100644 index 00000000..d5c96e72 --- /dev/null +++ b/local_database/constants.py @@ -0,0 +1,5 @@ +LOCAL_DATA_SOURCES_DB_NAME = "test_data_sources_db" +LOCAL_SOURCE_COLLECTOR_DB_NAME = "source_collector_test_db" + +DUMP_SH_DOCKER_PATH = "/usr/local/bin/dump.sh" +RESTORE_SH_DOCKER_PATH = "/usr/local/bin/restore.sh" \ No newline at end of file diff --git a/local_database/create_database.py b/local_database/create_database.py new file mode 100644 index 00000000..58b15508 --- /dev/null +++ b/local_database/create_database.py @@ -0,0 +1,98 @@ +import argparse +import os +import subprocess + +import psycopg2 +from psycopg2 import sql + +from local_database.DockerInfos import get_data_sources_data_dumper_info +from local_database.classes.DockerManager import DockerManager +from local_database.constants import LOCAL_DATA_SOURCES_DB_NAME, LOCAL_SOURCE_COLLECTOR_DB_NAME, RESTORE_SH_DOCKER_PATH + +# Defaults (can be overridden via environment variables) +POSTGRES_HOST = os.getenv("POSTGRES_HOST", "host.docker.internal") +POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432")) +POSTGRES_USER = os.getenv("POSTGRES_USER", "test_source_collector_user") +POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "HanviliciousHamiltonHilltops") + + +# Connect to the default 'postgres' database to create other databases +def connect(database="postgres", autocommit=True): + conn = psycopg2.connect( + dbname=database, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT + ) + if autocommit: + conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + return conn + +def create_database(db_name): + conn = connect("postgres") + with conn.cursor() as cur: + cur.execute(sql.SQL(""" + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE datname = %s AND pid <> pg_backend_pid() + """), [db_name]) + + # Drop the database if it exists + cur.execute(sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(db_name))) + print(f"🗑️ Dropped existing database: {db_name}") + + try: + cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name))) + print(f"✅ Created database: {db_name}") + except psycopg2.errors.DuplicateDatabase: + print(f"⚠️ Database {db_name} already exists") + except Exception as e: + print(f"❌ Failed to create {db_name}: {e}") + +def main(): + print("Creating databases...") + create_database(LOCAL_DATA_SOURCES_DB_NAME) + create_database(LOCAL_SOURCE_COLLECTOR_DB_NAME) + +if __name__ == "__main__": + main() + parser = argparse.ArgumentParser() + + parser.add_argument( + "--use-shell", + action="store_true", + help="Use shell to run restore script" + ) + + args = parser.parse_args() + + if args.use_shell: + subprocess.run( + [ + "bash", + "-c", + RESTORE_SH_DOCKER_PATH + ], + env={ + "RESTORE_HOST": POSTGRES_HOST, + "RESTORE_USER": POSTGRES_USER, + "RESTORE_PORT": str(POSTGRES_PORT), + "RESTORE_DB_NAME": LOCAL_DATA_SOURCES_DB_NAME, + "RESTORE_PASSWORD": POSTGRES_PASSWORD + } + ) + os.system(RESTORE_SH_DOCKER_PATH) + exit(0) + + docker_manager = 
DockerManager() + data_sources_docker_info = get_data_sources_data_dumper_info() + container = docker_manager.run_container( + data_sources_docker_info, + force_rebuild=True + ) + try: + container.run_command(RESTORE_SH_DOCKER_PATH) + finally: + container.stop() + diff --git a/local_database/dump_data_sources_schema.py b/local_database/dump_data_sources_schema.py new file mode 100644 index 00000000..65079f53 --- /dev/null +++ b/local_database/dump_data_sources_schema.py @@ -0,0 +1,21 @@ +from local_database.DockerInfos import get_data_sources_data_dumper_info +from local_database.classes.DockerManager import DockerManager +from local_database.constants import DUMP_SH_DOCKER_PATH + + +def main(): + docker_manager = DockerManager() + data_sources_docker_info = get_data_sources_data_dumper_info() + container = docker_manager.run_container( + data_sources_docker_info, + force_rebuild=True + ) + try: + container.run_command(DUMP_SH_DOCKER_PATH) + finally: + container.stop() + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/local_database/local_db_util.py b/local_database/local_db_util.py new file mode 100644 index 00000000..7bc5bb12 --- /dev/null +++ b/local_database/local_db_util.py @@ -0,0 +1,18 @@ +from pathlib import Path + + +def get_absolute_path(relative_path: str) -> str: + """ + Get absolute path, using the current file as the point of reference + """ + current_dir = Path(__file__).parent + absolute_path = (current_dir / relative_path).resolve() + return str(absolute_path) + + +def is_absolute_path(path: str) -> str: + if len(path) == 0: + raise ValueError("Path is required") + if path[0] != "/": + raise ValueError("Container path must be absolute") + return path diff --git a/local_database/setup.py b/local_database/setup.py new file mode 100644 index 00000000..99ff1da9 --- /dev/null +++ b/local_database/setup.py @@ -0,0 +1,53 @@ +import subprocess +import time +import sys + +POSTGRES_SERVICE_NAME = "postgres" +FOLLOWUP_SCRIPT = "py create_database.py" +MAX_RETRIES = 20 +SLEEP_SECONDS = 1 + +def run_command(cmd, check=True, capture_output=False, **kwargs): + try: + return subprocess.run(cmd, shell=True, check=check, capture_output=capture_output, text=True, **kwargs) + except subprocess.CalledProcessError as e: + print(f"Command '{cmd}' failed: {e}") + sys.exit(1) + +def get_postgres_container_id(): + result = run_command(f"docker-compose ps -q {POSTGRES_SERVICE_NAME}", capture_output=True) + container_id = result.stdout.strip() + if not container_id: + print("Error: Could not find Postgres container.") + sys.exit(1) + return container_id + +def wait_for_postgres(container_id): + print("Waiting for Postgres to be ready...") + for i in range(MAX_RETRIES): + try: + run_command(f"docker exec {container_id} pg_isready -U postgres", check=True) + print("Postgres is ready!") + return + except subprocess.CalledProcessError as e: + print(f"Still waiting... 
({i + 1}/{MAX_RETRIES}) Exit code: {e.returncode}") + print(f"Output: {e.output if hasattr(e, 'output') else 'N/A'}") + time.sleep(SLEEP_SECONDS) + print("Postgres did not become ready in time.") + sys.exit(1) + +def main(): + print("Stopping Docker Compose...") + run_command("docker-compose down") + + print("Starting Docker Compose...") + run_command("docker-compose up -d") + + container_id = get_postgres_container_id() + wait_for_postgres(container_id) + + print("Running follow-up script...") + run_command(FOLLOWUP_SCRIPT) + +if __name__ == "__main__": + main() diff --git a/start_mirrored_local_app.py b/start_mirrored_local_app.py index 940c372e..236030d0 100644 --- a/start_mirrored_local_app.py +++ b/start_mirrored_local_app.py @@ -6,336 +6,42 @@ requirements.txt, and must be installed separately via `pip install docker` """ -import datetime -import os -import platform -import subprocess -import sys -import time -from pathlib import Path -from typing import Optional, Annotated import uvicorn -import docker -from docker.errors import APIError, NotFound -from docker.models.containers import Container -from pydantic import BaseModel, AfterValidator - from apply_migrations import apply_migrations -from util.helper_functions import get_from_env - -def is_absolute_path(path: str) -> str: - if len(path) == 0: - raise ValueError("Path is required") - if path[0] != "/": - raise ValueError("Container path must be absolute") - return path - -class VolumeInfo(BaseModel): - host_path: str - container_path: Annotated[str, AfterValidator(is_absolute_path)] - - def build_volumes(self): - return { - get_absolute_path(self.host_path): { - "bind": self.container_path, - "mode": "rw" - } - } - -def wait_for_pg_to_be_ready(container: Container): - for i in range(30): - exit_code, output = container.exec_run("pg_isready") - print(output) - if exit_code == 0: - return - time.sleep(1) - raise Exception("Timed out waiting for postgres to be ready") - -def get_absolute_path(relative_path: str) -> str: - """ - Get absolute path, using the current file as the point of reference - """ - current_dir = Path(__file__).parent - absolute_path = (current_dir / relative_path).resolve() - return str(absolute_path) - - -class DockerfileInfo(BaseModel): - image_tag: str - dockerfile_directory: Optional[str] = None - - - -class HealthCheckInfo(BaseModel): - test: list[str] - interval: int - timeout: int - retries: int - start_period: int - - def build_healthcheck(self) -> dict: - multiplicative_factor = 1000000000 # Assume 1 second - return { - "test": self.test, - "interval": self.interval * multiplicative_factor, - "timeout": self.timeout * multiplicative_factor, - "retries": self.retries, - "start_period": self.start_period * multiplicative_factor - } - -class DockerInfo(BaseModel): - dockerfile_info: DockerfileInfo - volume_info: Optional[VolumeInfo] = None - name: str - ports: Optional[dict] = None - environment: Optional[dict] - command: Optional[str] = None - entrypoint: Optional[list[str]] = None - health_check_info: Optional[HealthCheckInfo] = None - -def run_command_checked(command: list[str] or str, shell=False): - result = subprocess.run( - command, - check=True, - capture_output=True, - text=True, - shell=shell - ) - return result - -def is_docker_running(): - try: - client = docker.from_env() - client.ping() - return True - except docker.errors.DockerException as e: - print(f"Docker is not running: {e}") - return False - -def wait_for_health(container, timeout=30): - start = time.time() - while time.time() - 
start < timeout: - container.reload() # Refresh container state - state = container.attrs.get("State") - print(state) - health = container.attrs.get("State", {}).get("Health", {}) - status = health.get("Status") - print(f"Health status: {status}") - if status == "healthy": - print("Postgres is healthy.") - return - elif status == "unhealthy": - raise Exception("Postgres container became unhealthy.") - time.sleep(1) - raise TimeoutError("Timed out waiting for Postgres to become healthy.") - -def start_docker_engine(): - system = platform.system() - - match system: - case "Windows": - # Use PowerShell to start Docker Desktop on Windows - subprocess.run([ - "powershell", "-Command", - "Start-Process 'Docker Desktop' -Verb RunAs" - ]) - case "Darwin": - # MacOS: Docker Desktop must be started manually or with open - subprocess.run(["open", "-a", "Docker"]) - case "Linux": - # Most Linux systems use systemctl to manage Docker - subprocess.run(["sudo", "systemctl", "start", "docker"]) - case _: - print(f"Unsupported OS: {system}") - sys.exit(1) - -class DockerManager: - def __init__(self): - self.client = docker.from_env() - self.network_name = "my_network" - self.network = self.start_network() +from local_database.DockerInfos import get_database_docker_info, get_source_collector_data_dumper_info +from local_database.classes.DockerManager import DockerManager +from local_database.classes.TimestampChecker import TimestampChecker +from local_database.constants import RESTORE_SH_DOCKER_PATH, DUMP_SH_DOCKER_PATH - def run_command(self, command: str, container_id: str): - exec_id = self.client.api.exec_create( - container_id, - cmd=command, - tty=True, - stdin=False - ) - output_stream = self.client.api.exec_start(exec_id=exec_id, stream=True) - for line in output_stream: - print(line.decode().rstrip()) - - def start_network(self): - try: - self.client.networks.create(self.network_name, driver="bridge") - except APIError as e: - # Assume already exists - print(e) - return self.client.networks.get("my_network") - - def stop_network(self): - self.client.networks.get("my_network").remove() - - def get_image(self, dockerfile_info: DockerfileInfo): - if dockerfile_info.dockerfile_directory: - # Build image from Dockerfile - self.client.images.build( - path=get_absolute_path(dockerfile_info.dockerfile_directory), - tag=dockerfile_info.image_tag - ) - else: - # Pull or use existing image - self.client.images.pull(dockerfile_info.image_tag) - - - def run_container( - self, - docker_info: DockerInfo, - ) -> Container: - print(f"Running container {docker_info.name}") - try: - container = self.client.containers.get(docker_info.name) - if container.status == 'running': - print(f"Container '{docker_info.name}' is already running") - return container - print("Restarting container...") - container.start() - return container - except NotFound: - # Container does not exist; proceed to build/pull image and run - pass - - self.get_image(docker_info.dockerfile_info) - - container = self.client.containers.run( - image=docker_info.dockerfile_info.image_tag, - volumes=docker_info.volume_info.build_volumes() if docker_info.volume_info is not None else None, - command=docker_info.command, - entrypoint=docker_info.entrypoint, - detach=True, - name=docker_info.name, - ports=docker_info.ports, - network=self.network_name, - environment=docker_info.environment, - stdout=True, - stderr=True, - tty=True, - healthcheck=docker_info.health_check_info.build_healthcheck() if docker_info.health_check_info is not None else None - ) - 
return container - - -class TimestampChecker: - def __init__(self): - self.last_run_time: Optional[datetime.datetime] = self.load_last_run_time() - - def load_last_run_time(self) -> Optional[datetime.datetime]: - # Check if file `last_run.txt` exists - # If it does, load the last run time - if os.path.exists("local_state/last_run.txt"): - with open("local_state/last_run.txt", "r") as f: - return datetime.datetime.strptime( - f.read(), - "%Y-%m-%d %H:%M:%S" - ) - return None - - def last_run_within_24_hours(self): - if self.last_run_time is None: - return False - return datetime.datetime.now() - self.last_run_time < datetime.timedelta(days=1) - - def set_last_run_time(self): - # If directory `local_state` doesn't exist, create it - if not os.path.exists("local_state"): - os.makedirs("local_state") - - with open("local_state/last_run.txt", "w") as f: - f.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - -def get_database_docker_info() -> DockerInfo: - return DockerInfo( - dockerfile_info=DockerfileInfo( - image_tag="postgres:15", - ), - name="data_source_identification_db", - ports={ - "5432/tcp": 5432 - }, - environment={ - "POSTGRES_PASSWORD": "HanviliciousHamiltonHilltops", - "POSTGRES_USER": "test_source_collector_user", - "POSTGRES_DB": "source_collector_test_db" - }, - health_check_info=HealthCheckInfo( - test=["pg_isready", "-U", "test_source_collector_user", "-h", "127.0.0.1", "-p", "5432"], - interval=1, - timeout=3, - retries=30, - start_period=2 - ) - ) - -def get_data_dumper_docker_info() -> DockerInfo: - return DockerInfo( - dockerfile_info=DockerfileInfo( - image_tag="datadumper", - dockerfile_directory="local_database/DataDumper" - ), - volume_info=VolumeInfo( - host_path="./local_database/DataDumper/dump", - container_path="/dump" - ), - name="datadumper", - environment={ - "DUMP_HOST": get_from_env("DUMP_HOST"), - "DUMP_USER": get_from_env("DUMP_USER"), - "DUMP_PASSWORD": get_from_env("DUMP_PASSWORD"), - "DUMP_NAME": get_from_env("DUMP_DB_NAME"), - "DUMP_PORT": get_from_env("DUMP_PORT"), - "RESTORE_HOST": "data_source_identification_db", - "RESTORE_USER": "test_source_collector_user", - "RESTORE_PORT": "5432", - "RESTORE_DB_NAME": "source_collector_test_db", - "RESTORE_PASSWORD": "HanviliciousHamiltonHilltops", - }, - command="bash" - ) def main(): docker_manager = DockerManager() - # Ensure docker is running, and start if not - if not is_docker_running(): - start_docker_engine() # Ensure Dockerfile for database is running, and if not, start it database_docker_info = get_database_docker_info() - container = docker_manager.run_container(database_docker_info) - wait_for_pg_to_be_ready(container) + db_container = docker_manager.run_container(database_docker_info) + db_container.wait_for_pg_to_be_ready() # Start dockerfile for Datadumper - data_dumper_docker_info = get_data_dumper_docker_info() + data_dumper_docker_info = get_source_collector_data_dumper_info() # If not last run within 24 hours, run dump operation in Datadumper # Check cache if exists and checker = TimestampChecker() - container = docker_manager.run_container(data_dumper_docker_info) + data_dump_container = docker_manager.run_container(data_dumper_docker_info) if checker.last_run_within_24_hours(): print("Last run within 24 hours, skipping dump...") else: - docker_manager.run_command( - '/usr/local/bin/dump.sh', - container.id + data_dump_container.run_command( + DUMP_SH_DOCKER_PATH, ) - docker_manager.run_command( - "/usr/local/bin/restore.sh", - container.id + data_dump_container.run_command( + 
RESTORE_SH_DOCKER_PATH, ) print("Stopping datadumper container") - container.stop() + data_dump_container.stop() checker.set_last_run_time() # Upgrade using alembic @@ -351,7 +57,7 @@ def main(): finally: # Add feature to stop all running containers print("Stopping containers...") - for container in docker_manager.client.containers.list(): + for container in docker_manager.get_containers(): container.stop() print("Containers stopped.") diff --git a/util/helper_functions.py b/util/helper_functions.py index 7d6c7f8d..deb6830b 100644 --- a/util/helper_functions.py +++ b/util/helper_functions.py @@ -1,10 +1,20 @@ import os from enum import Enum +from pathlib import Path from typing import Type from dotenv import load_dotenv from pydantic import BaseModel +def get_project_root(marker_files=(".project-root",)) -> Path: + current = Path(__file__).resolve() + for parent in [current] + list(current.parents): + if any((parent / marker).exists() for marker in marker_files): + return parent + raise FileNotFoundError("No project root found (missing marker files)") + +def project_path(*parts: str) -> Path: + return get_project_root().joinpath(*parts) def get_enum_values(enum: Type[Enum]): return [item.value for item in enum]
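For context on the new path helpers added to `util/helper_functions.py`, a minimal usage sketch, assuming a `.project-root` marker file exists at the repository root (which `get_project_root` looks for):

```python
# Minimal sketch of the new helpers; assumes a `.project-root` marker
# file sits at the repository root, as get_project_root() requires.
from util.helper_functions import project_path

# Resolve a path relative to the repository root, independent of the
# current working directory.
dump_dir = project_path("local_database", "DataDumper", "dump")
print(dump_dir)           # absolute pathlib.Path ending in local_database/DataDumper/dump
print(dump_dir.is_dir())  # pathlib methods are available on the returned Path
```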