diff --git a/.env.example b/.env.example index d3fc87e..eb2229d 100644 --- a/.env.example +++ b/.env.example @@ -13,6 +13,6 @@ SPATIALITE_LIB="C:/OSGeo4W/bin/mod_spatialite.dll" # ========================= # Working Directories # ========================= -# Your local data folder - Windows or Mac +# Your local data folder - Windows or Mac (choose one) DATA_ROOT="F:/nardata/work/rme_extraction" DATA_ROOT="/Users/philipbailey/GISData/riverscapes" diff --git a/.vscode/launch.json b/.vscode/launch.json index b9998d4..c840e3f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -39,7 +39,7 @@ "program": "${file}", "env": { "PYTHONPATH": "${workspaceFolder}" - }, + }, "console": "integratedTerminal", "args": "${command:pickArgs}" }, @@ -318,11 +318,7 @@ // Working folder (downloads + parquet output) "{env:DATA_ROOT}/rme-athena", // Optional filters / flags - "--tags", - "2025CONUS", - // "--huc_filter", "160", // "--delete", - // "--force_update" ] }, { @@ -474,7 +470,8 @@ "False", "c7d8c487-c377-42b0-a5b6-4c16db18fb41", // "--visit_id", "744" - "--watershed", "Yankee Fork", + "--watershed", + "Yankee Fork", ] }, // { diff --git a/pipelines/rme_to_athena/rme_to_athena_parquet.py b/pipelines/rme_to_athena/rme_to_athena_parquet.py index 6588ea7..c15970f 100644 --- a/pipelines/rme_to_athena/rme_to_athena_parquet.py +++ b/pipelines/rme_to_athena/rme_to_athena_parquet.py @@ -10,6 +10,7 @@ import argparse import logging import os +from pathlib import Path import re import shutil import warnings @@ -22,10 +23,10 @@ from semver import Version from rsxml.util import safe_makedirs -from rsxml import dotenv, Logger +from rsxml import dotenv, Logger, ProgressBar -from pydex import RiverscapesAPI, RiverscapesSearchParams, RiverscapesProject -from pydex.lib.athena import athena_query_get_parsed +from pydex import RiverscapesAPI, RiverscapesProject +from pydex.lib.athena import query_to_dataframe # Environment-configurable buckets. These represent stable infrastructure and # should not vary run-to-run, so we prefer environment variables over CLI args. @@ -36,6 +37,29 @@ DATA_BUCKET = os.getenv(DATA_BUCKET_ENV_VAR, DEFAULT_DATA_BUCKET) ATHENA_OUTPUT_BUCKET = os.getenv(OUTPUT_BUCKET_ENV_VAR, DATA_BUCKET) # fallback to data bucket if not set +# Query to identify projects to add/replace. No semicolon allowed. 
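+# (query_to_dataframe wraps this in an Athena UNLOAD statement, which breaks on a trailing semicolon.)
+# The left join keeps Data Exchange projects with no scraped rows yet (scr.huc10 is null)
+# or whose scraped timestamp predates the project's created_on. created_on is epoch
+# milliseconds while rme_date_created_ts only carries whole-second precision, hence the
+# truncate() on created_on before comparing.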
+missing_projects_query = """
+with huc_projects_dex as
+    (select project_id,
+            huc,
+            created_on
+     from vw_projects
+     where project_type_id = 'rs_metric_engine'
+         and owner = 'a52b8094-7a1d-4171-955c-ad30ae935296'
+         and created_on >= 1735689600000 -- 2025-01-01 UTC; created_on is epoch milliseconds
+         and (contains(tags, '2025CONUS')
+             or contains(tags, 'conus_athena'))),
+huc_projects_scraped as
+    (select substr(huc12, 1, 10) as huc10,
+            raw_rme_pq2.rme_date_created_ts
+     from raw_rme_pq2)
+select distinct project_id, huc, created_on, rme_date_created_ts
+from huc_projects_dex dex
+    left join huc_projects_scraped scr on dex.huc = scr.huc10
+where scr.huc10 is null
+    or scr.rme_date_created_ts < truncate(dex.created_on/1000)*1000
+"""
+
 
 def semver_to_int(version: Version) -> int:
     """convert to integer for easier comparisons
@@ -51,30 +75,6 @@ def semver_to_int(version: Version) -> int:
 
     return version.major * MAJOR + version.minor * MINOR + version.patch
 
 
-def get_athena_rme_projects(output_bucket: str) -> dict[str, int]:
-    """
-    Query Athena for existing RME projects
-    return: lookup dict consisting of watershedID (ie huc10,str) and timestamp (integer)
-    FUTURE ENHANCEMENT: if we're only interested in updating a subset, no need to return everything in rme
-    """
-    # FUTURE ENHANCEMENT - unless watershed_id a global id across countries we need something better
-    existing_rme = athena_query_get_parsed(output_bucket, 'SELECT DISTINCT watershed_id, rme_date_created_ts FROM raw_rme_pq2')
-    # this should look like:
-    # [{'rme_date_created_ts': '1752810123000', 'watershed_id': '1704020402'},
-    # {'rme_date_created_ts': '1756512492000', 'watershed_id': '1030010112'},
-    # ...
-    # ]
-    if not existing_rme:
-        print("got nothing back! failure?!")
-        raise NotImplementedError
-
-    # Convert list of dicts to a dict keyed by watershed_id. assumes no null values
-    return {
-        row['watershed_id']: int(row['rme_date_created_ts'])
-        for row in existing_rme
-    }
-
-
 def download_file(rs_api: RiverscapesAPI, project_id: str, download_dir: str, regex: str) -> str:
     """
     Download files from a project on Data Exchange that match the regex string
@@ -84,7 +84,7 @@ def download_file(rs_api: RiverscapesAPI, project_id: str, download_dir: str, re
     log = Logger('download RS DEX file')
     gpkg_path = get_matching_file(download_dir, regex)
     if gpkg_path is not None and os.path.isfile(gpkg_path):
-        log.debug(f'file for {project_id} previously downloaded')
+        log.debug(f'file matching {regex} previously downloaded')
         return gpkg_path
 
     rs_api.download_files(project_id, download_dir, [regex])
@@ -127,6 +127,7 @@ def download_rme_geopackage(
     """
     # RegEx string for finding RME output GeoPackages
     RME_SCRAPE_GPKG_REGEX = r'.*riverscapes_metrics.gpkg'
+    # NOTE: an existing matching file is reused rather than re-downloaded, so a stale GeoPackage persists until the download folder is cleared.
     rme_gpkg = download_file(rs_api, project.id, huc_dir, RME_SCRAPE_GPKG_REGEX)  # pyright: ignore[reportArgumentType]
     return rme_gpkg
 
@@ -223,12 +224,9 @@ def upload_to_s3(
 def scrape_rme(
     rs_api: RiverscapesAPI,
     spatialite_path: str,
-    search_params: RiverscapesSearchParams,
-    download_dir: str,
+    download_dir: str | Path,
     data_bucket: str,
-    athena_output_bucket: str,
     delete_downloads_when_done: bool,
-    force_update: bool,
 ) -> None:
     """
     Orchestrate the scraping, processing, and uploading of RME projects.
@@ -244,23 +242,23 @@ def scrape_rme(
 
     log = Logger('Scrape RME')
 
-    rme_in_athena = get_athena_rme_projects(athena_output_bucket)
-    log.debug(f'{len(rme_in_athena)} existing rme projects found in athena')
-    # loop through data exchange projects
+    # Instead of paging through Data Exchange search results, run an Athena query
+    # to find all eligible projects that are newer than what has already been scraped.
+    projects_to_add_df = query_to_dataframe(missing_projects_query, 'identify new projects')
+    if projects_to_add_df.empty:
+        log.info("Query to identify projects to scrape returned no results.")
+        return
+
     count = 0
-    for project, _stats, _searchtotal, prg in rs_api.search(search_params, progress_bar=True, page_size=100):
+    prg = ProgressBar(projects_to_add_df.shape[0], text="Scrape Progress")
+    for project_id in projects_to_add_df['project_id']:
+        project = rs_api.get_project_full(project_id)
         if project.huc is None or project.huc == '':
            log.warning(f'Project {project.id} does not have a HUC. Skipping.')
            continue
 
-        # check whether the project is already in Athena with the same or newer date
+        # truncate to whole seconds, then to epoch ms - matching the whole-second precision used in the Athena comparison
         project_created_date_ts = int(project.created_date.timestamp()) * 1000  # pyright: ignore[reportOptionalMemberAccess] Projects always have a created_date
-        if project.huc in rme_in_athena and rme_in_athena[project.huc] <= project_created_date_ts:
-            if force_update:
-                log.info(f'Force update project {project.id} as {project.huc} is already in Athena with the same or newer date. DEX ts = {project_created_date_ts}; Athena ts={rme_in_athena[project.huc]}')
-            else:
-                log.info(f'Skipping project {project.id} as {project.huc} is already in Athena with the same or newer date. DEX ts = {project_created_date_ts}; Athena ts={rme_in_athena[project.huc]}')
-                continue
 
         if project.model_version is None:
             log.warning(f'Project {project.id} does not have a model version. Skipping.')
@@ -296,6 +294,7 @@ def scrape_rme(
         except Exception as e:
             log.error(f'Error scraping HUC {project.huc}: {e}')
             raise
+    prg.finish()
 
 
 def main():
@@ -303,14 +302,8 @@ def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('stage', help='Environment: staging or production', type=str)
     parser.add_argument('spatialite_path', help='Path to the mod_spatialite library', type=str)
-    # Bucket configuration now via environment variables (RME_DATA_BUCKET, RME_ATHENA_OUTPUT_BUCKET)
-    # Remove old CLI bucket arguments to reduce noise and accidental misconfiguration.
     parser.add_argument('working_folder', help='top level folder for downloads and output', type=str)
-    parser.add_argument('--tags', help='Data Exchange tags to search for projects', type=str)
-    parser.add_argument('--collection', help='Collection GUID', type=str)
     parser.add_argument('--delete', help='Whether or not to delete downloaded GeoPackages', action='store_true', default=False)
-    parser.add_argument('--huc_filter', help='HUC filter SQL prefix ("17%")', type=str, default='')
-    parser.add_argument('--force_update', help='Generate and upload new parquet even if data exists', action='store_true', default=False)
     args = dotenv.parse_args_env(parser)
 
     # Set up some reasonable folders to store things
@@ -323,20 +316,6 @@ def main():
 
     log.title("rme scrape to parquet to athena")
 
-    # Data Exchange Search Params
-    search_params = RiverscapesSearchParams({
-        'projectTypeId': 'rs_metric_engine',
-    })
-
-    if args.collection != '.':
-        search_params.collection = args.collection
-
-    if args.tags is not None and args.tags != '.':
-        search_params.tags = args.tags.split(',')
-
-    if args.huc_filter != '' and args.huc_filter != '.':
-        search_params.meta = {'HUC': args.huc_filter}
-
     # Log bucket resolution
     if ATHENA_OUTPUT_BUCKET == DATA_BUCKET:
         log.warning(f"Using single bucket for data & Athena output: {DATA_BUCKET} (override with {OUTPUT_BUCKET_ENV_VAR})")
@@ -347,12 +326,9 @@ def main():
         scrape_rme(
             api,
             args.spatialite_path,
-            search_params,
             download_folder,
             DATA_BUCKET,
-            ATHENA_OUTPUT_BUCKET,
-            args.delete,
-            args.force_update,
+            args.delete
         )
 
     log.info('Process complete')
diff --git a/pydex/lib/athena.py b/pydex/lib/athena.py
index 632f8ce..2dd2bed 100644
--- a/pydex/lib/athena.py
+++ b/pydex/lib/athena.py
@@ -5,13 +5,63 @@
 * athena
 * cybercastor_scripts
 
-Consider porting any improvements to these other repositories.
+Consider porting any improvements to/from these other repositories.
 """
 import time
 import re
+import uuid
+# 3rd party
 import boto3
+import awswrangler as wr
+import pandas as pd
 from rsxml import Logger
 
+S3_ATHENA_BUCKET = "riverscapes-athena-output"
+
+
+def query_to_dataframe(query: str, querylabel: str = "") -> pd.DataFrame:
+    """Use awswrangler to run a query on Athena and return the result as a DataFrame.
+    Args:
+        query: the query to execute
+        querylabel: optional label for log messages, handy when queries are running in parallel
+
+    ctas_approach=False with unload_approach=True is "approach 2" in the awswrangler docs:
+    it runs an UNLOAD query on Athena and parses the Parquet results on S3.
+    See https://aws-sdk-pandas.readthedocs.io/en/3.14.0/stubs/awswrangler.athena.read_sql_query.html
+
+    PROS:
+    * Faster for mid-size and large result sets.
+    * Can handle some level of nested types.
+    * Does not modify the Glue Data Catalog.
+
+    CONS:
+    * Output S3 path must be empty (hence the UUID).
+    * Does not support timestamp with time zone.
+    * Does not support columns with repeated names.
+    * Does not support columns with undefined data types.
+    * Data has to fit into RAM memory; 
do not use for results with millions of rows + """ + log = Logger("Athena unload query to DF") + s3_output = f's3://{S3_ATHENA_BUCKET}/athena_unload/{uuid.uuid4()}/' + + query_bytes = len(query.encode('utf-8')) + if query_bytes > 262144: + raise ValueError(f"Query exceeds Athena's 256 KB limit ({query_bytes} bytes).") + + log.debug(f"Query {querylabel}:\n{query}") + try: + df = wr.athena.read_sql_query( + query, + database='default', + ctas_approach=False, + unload_approach=True, # only PARQUET format is supported with this option + s3_output=s3_output, + ) + log.debug(f"Query {querylabel} to dataframe completed.") + return df + except Exception as e: + log.warning(f"Query {querylabel} failed or returned no results: {e}") + return pd.DataFrame() # Return empty DataFrame for downstream code def fix_s3_uri(argstr: str) -> str: diff --git a/pyproject.toml b/pyproject.toml index ad80aef..a0f50be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,9 @@ dependencies = [ "six==1.17.0", "termcolor==2.5.0", "urllib3>=2.3", - "riverscapes-metadata @ git+https://github.com/Riverscapes/RiverscapesXML.git@master#subdirectory=riverscapes_metadata" + "riverscapes-metadata @ git+https://github.com/Riverscapes/RiverscapesXML.git@master#subdirectory=riverscapes_metadata", + "awswrangler>=3.10.1", + "pandas>=2.3.3", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index f6ca05a..0e77427 100644 --- a/uv.lock +++ b/uv.lock @@ -1,6 +1,11 @@ version = 1 revision = 3 requires-python = ">=3.12" +resolution-markers = [ + "python_full_version >= '3.13' and platform_machine == 'x86_64' and sys_platform == 'darwin'", + "python_full_version < '3.13' and platform_machine == 'x86_64' and sys_platform == 'darwin'", + "platform_machine != 'x86_64' or sys_platform != 'darwin'", +] [[package]] name = "ansicon" @@ -80,6 +85,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3a/2a/7cc015f5b9f5db42b7d48157e23356022889fc354a2813c15934b7cb5c0e/attrs-25.4.0-py3-none-any.whl", hash = "sha256:adcf7e2a1fb3b36ac48d97835bb6d8ade15b8dcce26aba8bf1d14847b57a3373", size = 67615, upload-time = "2025-10-06T13:54:43.17Z" }, ] +[[package]] +name = "awswrangler" +version = "3.10.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "boto3" }, + { name = "botocore" }, + { name = "numpy" }, + { name = "packaging" }, + { name = "pandas" }, + { name = "pyarrow" }, + { name = "setuptools" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/83/1e60d8a85e1db7203fb33b749666edff29a96f77ecccf219ce4bb2587adc/awswrangler-3.10.1.tar.gz", hash = "sha256:2c12c522ddf4f59214bb7b79ecab585fa8bb96ea6300e48126962485f1598ddf", size = 279223, upload-time = "2024-12-04T10:39:31.85Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e2/c3/f429b1e76a26e7a19474a0b56e3b76b403a860725a7e618b96754ebd70aa/awswrangler-3.10.1-py3-none-any.whl", hash = "sha256:050e9190e1211f6bb901f44241237e7310a8a71191bb31cb35779f795644209f", size = 378943, upload-time = "2024-12-04T10:39:29.714Z" }, +] + [[package]] name = "blessed" version = "1.23.0" @@ -685,11 +709,11 @@ wheels = [ [[package]] name = "packaging" -version = "25.0" +version = "24.2" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = 
"2025-04-19T11:48:59.673Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d0/63/68dbb6eb2de9cb10ee4c9c14a0148804425e13c4fb20d61cce69f53106da/packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f", size = 163950, upload-time = "2024-11-08T09:47:47.202Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" }, + { url = "https://files.pythonhosted.org/packages/88/ef/eb23f262cca3c0c4eb7ab1933c3b1f03d021f2c48f54763065b6f0e321be/packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759", size = 65451, upload-time = "2024-11-08T09:47:44.722Z" }, ] [[package]] @@ -897,6 +921,7 @@ version = "0.1.0" source = { virtual = "." } dependencies = [ { name = "apsw" }, + { name = "awswrangler" }, { name = "boto3" }, { name = "geopandas" }, { name = "graphql-core" }, @@ -904,6 +929,7 @@ dependencies = [ { name = "jsonschema" }, { name = "lxml" }, { name = "matplotlib" }, + { name = "pandas" }, { name = "pyarrow" }, { name = "pyathena" }, { name = "python-dateutil" }, @@ -926,6 +952,7 @@ geo = [ [package.metadata] requires-dist = [ { name = "apsw", specifier = ">=3.49.1.0" }, + { name = "awswrangler", specifier = ">=3.10.1" }, { name = "boto3", specifier = ">=1.7.84" }, { name = "gdal", marker = "extra == 'geo'", specifier = "==3.11.4" }, { name = "geopandas", specifier = ">=1.0.1" }, @@ -934,6 +961,7 @@ requires-dist = [ { name = "jsonschema", specifier = ">=4.17.3" }, { name = "lxml", specifier = "==5.3.1" }, { name = "matplotlib", specifier = ">=3.9.4" }, + { name = "pandas", specifier = ">=2.3.3" }, { name = "pyarrow", specifier = ">=21.0.0" }, { name = "pyathena", specifier = ">=2.9.4" }, { name = "python-dateutil", specifier = "==2.9.0.post0" },