Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ SPATIALITE_LIB="C:/OSGeo4W/bin/mod_spatialite.dll"
# =========================
# Working Directories
# =========================
# Your local data folder - Windows or Mac
# Your local data folder - Windows or Mac (keep the line for your platform and comment out or delete the other)
DATA_ROOT="F:/nardata/work/rme_extraction"
DATA_ROOT="/Users/philipbailey/GISData/riverscapes"
9 changes: 3 additions & 6 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"program": "${file}",
"env": {
"PYTHONPATH": "${workspaceFolder}"
},
},
"console": "integratedTerminal",
"args": "${command:pickArgs}"
},
Expand Down Expand Up @@ -318,11 +318,7 @@
// Working folder (downloads + parquet output)
"{env:DATA_ROOT}/rme-athena",
// Optional filters / flags
"--tags",
"2025CONUS",
// "--huc_filter", "160",
// "--delete",
// "--force_update"
]
},
{
Expand Down Expand Up @@ -474,7 +470,8 @@
"False",
"c7d8c487-c377-42b0-a5b6-4c16db18fb41",
// "--visit_id", "744"
"--watershed", "Yankee Fork",
"--watershed",
"Yankee Fork",
]
},
// {
Expand Down
110 changes: 43 additions & 67 deletions pipelines/rme_to_athena/rme_to_athena_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import argparse
import logging
import os
from pathlib import Path
import re
import shutil
import warnings
Expand All @@ -22,10 +23,10 @@
from semver import Version

from rsxml.util import safe_makedirs
from rsxml import dotenv, Logger
from rsxml import dotenv, Logger, ProgressBar

from pydex import RiverscapesAPI, RiverscapesSearchParams, RiverscapesProject
from pydex.lib.athena import athena_query_get_parsed
from pydex import RiverscapesAPI, RiverscapesProject
from pydex.lib.athena import query_to_dataframe

# Environment-configurable buckets. These represent stable infrastructure and
# should not vary run-to-run, so we prefer environment variables over CLI args.
Expand All @@ -36,6 +37,29 @@
DATA_BUCKET = os.getenv(DATA_BUCKET_ENV_VAR, DEFAULT_DATA_BUCKET)
ATHENA_OUTPUT_BUCKET = os.getenv(OUTPUT_BUCKET_ENV_VAR, DATA_BUCKET) # fallback to data bucket if not set

# Athena query used to decide which RME projects to add or replace.
# CTE 1 (huc_projects_dex): Data Exchange projects (vw_projects) of type
#   'rs_metric_engine' from a single owner, created on/after 1735689600 and
#   tagged '2025CONUS' or 'conus_athena'.
# CTE 2 (huc_projects_scraped): HUC10s already scraped into raw_rme_pq2
#   (first 10 chars of huc12) with their scrape timestamps.
# Result: projects whose HUC is absent from the scrape, or whose scraped copy
#   predates the project's created_on (truncated to whole seconds, in ms).
# NOTE(review): the `created_on >= 1735689600` cutoff reads like an epoch
#   *seconds* value (2025-01-01 UTC), while `truncate(dex.created_on/1000)*1000`
#   treats created_on as epoch *milliseconds*. If created_on is milliseconds,
#   the cutoff matches essentially every row — confirm whether the intended
#   threshold is 1735689600000.
# No trailing semicolon allowed (Athena rejects it with this query path).
missing_projects_query = """
with huc_projects_dex as
(select project_id,
huc,
created_on
from vw_projects
WHERE project_type_id = 'rs_metric_engine'
and owner = 'a52b8094-7a1d-4171-955c-ad30ae935296'
AND created_on >= 1735689600
AND (contains(tags, '2025CONUS')
OR contains(tags, 'conus_athena'))),
huc_projects_scraped as
(select substr(huc12, 1, 10) as huc10,
raw_rme_pq2.rme_date_created_ts
from raw_rme_pq2)
select distinct project_id, huc, created_on, rme_date_created_ts
from huc_projects_dex dex
left join huc_projects_scraped scr on dex.huc = scr.huc10
where scr.huc10 is null
or scr.rme_date_created_ts < truncate(dex.created_on/1000)*1000
"""


def semver_to_int(version: Version) -> int:
"""convert to integer for easier comparisons
Expand All @@ -51,30 +75,6 @@ def semver_to_int(version: Version) -> int:
return version.major * MAJOR + version.minor * MINOR + version.patch


def get_athena_rme_projects(output_bucket: str) -> dict[str, int]:
    """
    Query Athena for existing RME projects.

    :param output_bucket: bucket passed through to athena_query_get_parsed
        (presumably where Athena writes its query results — confirm in that helper)
    :return: lookup dict of watershed_id (i.e. huc10, str) to
        rme_date_created_ts (int; the sample values below look like epoch
        milliseconds — TODO confirm)
    :raises NotImplementedError: when the query returns no rows (treated as a
        failure rather than an empty catalog)
    FUTURE ENHANCEMENT: if we're only interested in updating a subset, no need to return everything in rme
    """
    # FUTURE ENHANCEMENT - unless watershed_id a global id across countries we need something better
    existing_rme = athena_query_get_parsed(output_bucket, 'SELECT DISTINCT watershed_id, rme_date_created_ts FROM raw_rme_pq2')
    # this should look like:
    # [{'rme_date_created_ts': '1752810123000', 'watershed_id': '1704020402'},
    # {'rme_date_created_ts': '1756512492000', 'watershed_id': '1030010112'},
    # ...
    # ]
    if not existing_rme:
        # An empty result is unexpected here; bail out rather than silently
        # treating every project as "not yet in Athena".
        print("got nothing back! failure?!")
        raise NotImplementedError

    # Convert list of dicts to a dict keyed by watershed_id. assumes no null values
    return {
        row['watershed_id']: int(row['rme_date_created_ts'])
        for row in existing_rme
    }


def download_file(rs_api: RiverscapesAPI, project_id: str, download_dir: str, regex: str) -> str:
"""
Download files from a project on Data Exchange that match the regex string
Expand All @@ -84,7 +84,7 @@ def download_file(rs_api: RiverscapesAPI, project_id: str, download_dir: str, re
log = Logger('download RS DEX file')
gpkg_path = get_matching_file(download_dir, regex)
if gpkg_path is not None and os.path.isfile(gpkg_path):
log.debug(f'file for {project_id} previously downloaded')
log.debug(f'file for matching {regex} previously downloaded')
return gpkg_path

rs_api.download_files(project_id, download_dir, [regex])
Expand Down Expand Up @@ -127,6 +127,7 @@ def download_rme_geopackage(
"""
# RegEx string for finding RME output GeoPackages
RME_SCRAPE_GPKG_REGEX = r'.*riverscapes_metrics.gpkg'
# NOTE: will not overwrite existing files - which can be a problem.
rme_gpkg = download_file(rs_api, project.id, huc_dir, RME_SCRAPE_GPKG_REGEX) # pyright: ignore[reportArgumentType]
return rme_gpkg

Expand Down Expand Up @@ -223,12 +224,9 @@ def upload_to_s3(
def scrape_rme(
rs_api: RiverscapesAPI,
spatialite_path: str,
search_params: RiverscapesSearchParams,
download_dir: str,
download_dir: str | Path,
data_bucket: str,
athena_output_bucket: str,
delete_downloads_when_done: bool,
force_update: bool,
) -> None:
"""
Orchestrate the scraping, processing, and uploading of RME projects.
Expand All @@ -244,23 +242,23 @@ def scrape_rme(

log = Logger('Scrape RME')

rme_in_athena = get_athena_rme_projects(athena_output_bucket)
log.debug(f'{len(rme_in_athena)} existing rme projects found in athena')
# loop through data exchange projects
# NEW WAY
# run Athena query to find all eligible projects that are newer than what is already scraped
projects_to_add_df = query_to_dataframe(missing_projects_query, 'identify new projects')
if projects_to_add_df.empty:
log.info("Query to identify projects to scrape returned no results.")
return

count = 0
for project, _stats, _searchtotal, prg in rs_api.search(search_params, progress_bar=True, page_size=100):
prg = ProgressBar(projects_to_add_df.shape[0], text="Scrape Progress")
for project_id in projects_to_add_df['project_id']:
project = rs_api.get_project_full(project_id)
if project.huc is None or project.huc == '':
log.warning(f'Project {project.id} does not have a HUC. Skipping.')
continue

# check whether the project is already in Athena with the same or newer date
# this truncates to nearest second, for whatever reason
project_created_date_ts = int(project.created_date.timestamp()) * 1000 # pyright: ignore[reportOptionalMemberAccess] Projects always have a created_date
if project.huc in rme_in_athena and rme_in_athena[project.huc] <= project_created_date_ts:
if force_update:
log.info(f'Force update project {project.id} as {project.huc} is already in Athena with the same or newer date. DEX ts = {project_created_date_ts}; Athena ts={rme_in_athena[project.huc]}')
else:
log.info(f'Skipping project {project.id} as {project.huc} is already in Athena with the same or newer date. DEX ts = {project_created_date_ts}; Athena ts={rme_in_athena[project.huc]}')
continue

if project.model_version is None:
log.warning(f'Project {project.id} does not have a model version. Skipping.')
Expand Down Expand Up @@ -296,21 +294,16 @@ def scrape_rme(
except Exception as e:
log.error(f'Error scraping HUC {project.huc}: {e}')
raise
prg.finish()


def main():
"""Process arguments, set up logs and orchestrate call to other functions"""
parser = argparse.ArgumentParser()
parser.add_argument('stage', help='Environment: staging or production', type=str)
parser.add_argument('spatialite_path', help='Path to the mod_spatialite library', type=str)
# Bucket configuration now via environment variables (RME_DATA_BUCKET, RME_ATHENA_OUTPUT_BUCKET)
# Remove old CLI bucket arguments to reduce noise and accidental misconfiguration.
parser.add_argument('working_folder', help='top level folder for downloads and output', type=str)
parser.add_argument('--tags', help='Data Exchange tags to search for projects', type=str)
parser.add_argument('--collection', help='Collection GUID', type=str)
parser.add_argument('--delete', help='Whether or not to delete downloaded GeoPackages', action='store_true', default=False)
parser.add_argument('--huc_filter', help='HUC filter SQL prefix ("17%")', type=str, default='')
parser.add_argument('--force_update', help='Generate and upload new parquet even if data exists', action='store_true', default=False)
args = dotenv.parse_args_env(parser)

# Set up some reasonable folders to store things
Expand All @@ -323,20 +316,6 @@ def main():

log.title("rme scrape to parquet to athena")

# Data Exchange Search Params
search_params = RiverscapesSearchParams({
'projectTypeId': 'rs_metric_engine',
})

if args.collection != '.':
search_params.collection = args.collection

if args.tags is not None and args.tags != '.':
search_params.tags = args.tags.split(',')

if args.huc_filter != '' and args.huc_filter != '.':
search_params.meta = {'HUC': args.huc_filter}

# Log bucket resolution
if ATHENA_OUTPUT_BUCKET == DATA_BUCKET:
log.warning(f"Using single bucket for data & Athena output: {DATA_BUCKET} (override with {OUTPUT_BUCKET_ENV_VAR})")
Expand All @@ -347,12 +326,9 @@ def main():
scrape_rme(
api,
args.spatialite_path,
search_params,
download_folder,
DATA_BUCKET,
ATHENA_OUTPUT_BUCKET,
args.delete,
args.force_update,
args.delete
)

log.info('Process complete')
Expand Down
52 changes: 51 additions & 1 deletion pydex/lib/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,63 @@
* athena
* cybercastor_scripts

Consider porting any improvements to these other repositories.
Consider porting any improvements to/from these other repositories.
"""
import time
import re
import uuid
# 3rd party
import boto3
import awswrangler as wr
import pandas as pd

from rsxml import Logger
S3_ATHENA_BUCKET = "riverscapes-athena-output"


def query_to_dataframe(query: str, querylabel: str = "") -> pd.DataFrame:
    """Run an Athena query via awswrangler's UNLOAD approach and return a DataFrame.

    args:
        query : the query to execute
        querylabel : optional label for log messages, handy when queries are running in parallel

    Returns an empty DataFrame when the query fails or returns no results, so
    downstream code can test ``df.empty`` instead of catching exceptions.
    Raises ValueError (before touching Athena) when the query text exceeds
    Athena's query-string size limit.

    * ctas False, unload True is "approach 2"
      Does an UNLOAD query on Athena and parse the Parquet result on s3
      see docs for details https://aws-sdk-pandas.readthedocs.io/en/3.14.0/stubs/awswrangler.athena.read_sql_query.html

    PROS:
    * Faster for mid and big result sizes
    * Can handle some level of nested types.
    * Does not modify Glue Data Catalog

    CONS:
    * Output S3 path must be empty (thus use UUID).
    * Does not support timestamp with time zone.
    * Does not support columns with repeated names.
    * Does not support columns with undefined data types.
    * data has to fit into RAM memory. do not use for results with millions of rows
    """
    log = Logger("Athena unload query to DF")
    # UNLOAD requires an empty output location, hence a unique prefix per call.
    s3_output = f's3://{S3_ATHENA_BUCKET}/athena_unload/{uuid.uuid4()}/'

    # Athena caps query strings at 256 KB (262,144 bytes); fail fast with a
    # clear error instead of a service-side rejection.
    max_query_bytes = 262144
    query_bytes = len(query.encode('utf-8'))
    if query_bytes > max_query_bytes:
        raise ValueError(f"Query exceeds Athena's 256 KB limit ({query_bytes} bytes).")

    log.debug(f"Query {querylabel}:\n{query}")
    try:
        df = wr.athena.read_sql_query(
            query,
            database='default',
            ctas_approach=False,
            unload_approach=True,  # only PARQUET format is supported with this option
            s3_output=s3_output,
        )
        log.debug(f"Query {querylabel} to dataframe completed.")
        return df
    except Exception as e:
        log.warning(f"Query {querylabel} failed or returned no results: {e}")
        return pd.DataFrame()  # Return empty DataFrame for downstream code
    finally:
        # The UNLOAD Parquet output is a throwaway artifact once it has been
        # parsed into the DataFrame. Previously these files were never removed,
        # so the bucket accumulated one orphaned UUID prefix per query. Clean
        # up best-effort; never mask the query outcome with a cleanup failure.
        try:
            wr.s3.delete_objects(s3_output)
        except Exception as cleanup_err:
            log.warning(f"Could not clean up Athena unload output {s3_output}: {cleanup_err}")


def fix_s3_uri(argstr: str) -> str:
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ dependencies = [
"six==1.17.0",
"termcolor==2.5.0",
"urllib3>=2.3",
"riverscapes-metadata @ git+https://github.com/Riverscapes/RiverscapesXML.git@master#subdirectory=riverscapes_metadata"
"riverscapes-metadata @ git+https://github.com/Riverscapes/RiverscapesXML.git@master#subdirectory=riverscapes_metadata",
"awswrangler>=3.10.1",
"pandas>=2.3.3",
]

[project.optional-dependencies]
Expand Down
Loading