diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index f5e1496b..93b8d19b 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -74,3 +74,62 @@ "count", ], } + + +# --- CF / xarray vocabulary mappings --------------------------------------- +# Lookup tables used by :mod:`dataretrieval.waterdata.xarray` to translate +# USGS terms into CF-conventions metadata. Each is intentionally partial: +# anything not listed falls back to a sensible default (raw unit string kept +# verbatim; no standard_name emitted) rather than guessing a wrong CF term. +# They are plain data, so they live here rather than in the (xarray-optional) +# converter module and can be extended without importing xarray. + +# USGS unit strings -> UDUNITS / CF-canonical form. +CF_UNIT_MAP = { + "ft^3/s": "ft3 s-1", + "ft3/s": "ft3 s-1", + "ft": "ft", + "in": "in", + "degC": "degC", + "deg C": "degC", + "uS/cm": "uS/cm", + "mg/l": "mg L-1", + "mg/L": "mg L-1", + # UDUNITS 'ton' is the US short ton; 'short_ton' is not a valid UDUNITS name. + "tons/day": "ton day-1", + "%": "percent", +} + +# USGS statistic_id -> the operator in a CF ``cell_methods`` string. +CF_CELL_METHODS = { + "00001": "maximum", + "00002": "minimum", + "00003": "mean", + "00006": "sum", + "00008": "median", + "00011": "point", # instantaneous +} + +# USGS 5-digit parameter code -> CF standard_name. Deliberately conservative; +# codes without a confident match are left without a standard_name. +CF_STANDARD_NAMES = { + "00060": "water_volume_transport_in_river_channel", + # 00010 (water temperature) is intentionally omitted: ``water_temperature`` + # is NOT a CF standard name, and the only valid CF water-temperature name, + # ``sea_water_temperature``, is wrong-domain for USGS freshwater/groundwater. + # Leaving it unmapped keeps the variable's ``long_name`` without emitting an + # invalid or misleading ``standard_name``. + "00065": "water_surface_height_above_reference_datum", + "63160": "water_surface_height_above_reference_datum", + "00045": "lwe_thickness_of_precipitation_amount", +} + +# USGS parameter code -> vertical reference datum, attached as a +# ``vertical_datum`` attribute. The two water-surface-height parameters share +# the CF standard_name water_surface_height_above_reference_datum, so the datum +# distinguishes them: gage height (00065) is measured from a local site (gage) +# datum, while stream water level (63160) is referenced to NAVD88. +CF_VERTICAL_DATUM = { + "00065": "local site datum", + "63160": "NAVD88", +} diff --git a/dataretrieval/waterdata/xarray.py b/dataretrieval/waterdata/xarray.py new file mode 100644 index 00000000..2362181f --- /dev/null +++ b/dataretrieval/waterdata/xarray.py @@ -0,0 +1,909 @@ +"""xarray-returning wrappers for the Water Data getters. + +Each public function here mirrors the same-named function in +:mod:`dataretrieval.waterdata`, but returns a CF-conventions +:class:`xarray.Dataset` instead of a :class:`pandas.DataFrame`. + +By default the data is returned as a CF *contiguous ragged array* +(``featureType = "timeSeries"``): every observation is concatenated along a +single ``obs`` dimension, and each (monitoring location, parameter, +statistic) series is one instance along a ``timeseries`` dimension whose +``row_size`` records how many observations it contributes. This stores only +real observations -- no NaN fill -- so it scales to large, very ragged +multi-site pulls where record lengths differ by decades. Parameter, +statistic, unit, and location identity become per-instance metadata, and +``time`` is a 1-D coordinate along ``obs``. The trade-off is that ``time`` +is no longer a dimension you can index directly: to select by time you +first regroup the flat ``obs`` back into per-series views (e.g. with +``cf-xarray``, or via the offsets implied by ``row_size``). + +Pass ``dense=True`` for the alternative gridded layout: one named data +variable per parameter on a ``(monitoring_location_id, time)`` grid, NaN +where a series has no observation. This is ergonomic for a few overlapping +series (``ds["discharge"].sel(time=...)`` just works) but the union time +axis and NaN fill make it memory-costly for large ragged collections. + +Either way the CF metadata is derived from columns the getter already +returns (``unit_of_measure`` -> ``units``, ``statistic_id`` -> +``cell_methods``, ``parameter_code`` -> ``standard_name``), plus the +human-readable parameter name from a small cached metadata lookup; the +monitoring location carries ``cf_role = "timeseries_id"`` with ``longitude`` +/ ``latitude`` (and ``hydrologic_unit_code`` / ``state_name`` when the +metadata call already provides them), and dataset attributes carry +``Conventions``, provenance, the request URL, and ``date_modified``. + +This module requires the optional ``xarray`` dependency:: + + pip install dataretrieval[xarray] +""" + +from __future__ import annotations + +import datetime as _dt +import inspect as _inspect +import re as _re +import warnings as _warnings +from collections.abc import Callable +from dataclasses import dataclass, field, replace +from functools import wraps as _wraps + +import pandas as _pd + +try: + import xarray as _xr +except ModuleNotFoundError as _exc: # pragma: no cover - exercised only sans xarray + raise ModuleNotFoundError( + "dataretrieval.waterdata.xarray requires the optional 'xarray' " + "dependency. Install it with: pip install dataretrieval[xarray]" + ) from _exc + +from . import api as _api +from .nearest import get_nearest_continuous as _get_nearest_continuous +from .types import ( + CF_CELL_METHODS, + CF_STANDARD_NAMES, + CF_UNIT_MAP, + CF_VERTICAL_DATUM, +) + +__all__ = [ + "get_continuous", + "get_daily", + "get_field_measurements", + "get_latest_continuous", + "get_latest_daily", + "get_nearest_continuous", + "get_peaks", + "get_samples", + "get_stats_date_range", + "get_stats_por", +] + + +# The CF vocabulary lookups (USGS units -> UDUNITS, statistic_id -> +# cell_methods operator, parameter_code -> standard_name) are plain data and +# live in ``types`` -- imported as CF_UNIT_MAP / CF_CELL_METHODS / +# CF_STANDARD_NAMES at the top of this module. + +# Columns kept off the value pivot but surfaced as ancillary (flag) variables. +_ANCILLARY = ("qualifier", "approval_status") + + +# --- metadata lookups (cached per process) --------------------------------- + +_TS_META_CACHE: dict[str, dict] = {} +_FIELD_META_CACHE: dict[str, dict] = {} + +# Only the human-readable name is sourced from the metadata endpoint; units, +# statistic, and parameter code all come from the values frame itself. +_NAME_DESCRIPTORS = ("parameter_name", "parameter_description") + +# Per-site descriptors that ride along on the same metadata call (no extra +# request) and are surfaced as instance-indexed auxiliary coordinates. Only +# fields the endpoint already returns are used -- station name/altitude live on +# the monitoring-locations endpoint and would need a separate call, so they are +# intentionally not fetched here. +_SITE_DESCRIPTORS = ("hydrologic_unit_code", "state_name") + +# CF attributes for the instance-indexed site coordinates built from the +# descriptors above. +_SITE_COORD_ATTRS = { + "hydrologic_unit_code": {"long_name": "hydrologic unit code (HUC)"}, + "state_name": {"long_name": "state name"}, +} + + +def _lookup(site_ids, cache, getter): + """Fetch and cache the series + site descriptors for ``site_ids``. + + Returns ``(param_meta, site_meta)``: ``param_meta`` is + ``{parameter_code: {name descriptors}}`` (keyed by the stable parameter + code, so no hash id is needed) and ``site_meta`` is + ``{monitoring_location_id: {site descriptors present in the response}}``. + One metadata call per not-yet-cached batch of sites; the cache is keyed by + site so repeated getter calls reuse it. + """ + sites = sorted({str(s) for s in site_ids if _pd.notna(s)}) + todo = [s for s in sites if s not in cache] + if todo: + meta, _ = getter(monitoring_location_id=todo) + for s in todo: + cache[s] = {"params": {}, "site": {}} + if not meta.empty: + name_cols = [c for c in _NAME_DESCRIPTORS if c in meta.columns] + site_cols = [c for c in _SITE_DESCRIPTORS if c in meta.columns] + has_pcode = "parameter_code" in meta.columns + for _, row in meta.iterrows(): + site = row.get("monitoring_location_id") + if site not in cache: + continue + if has_pcode: + pcode = row.get("parameter_code") + if _pd.notna(pcode): + cache[site]["params"][str(pcode)] = { + c: row.get(c) for c in name_cols + } + if not cache[site]["site"]: + desc = {c: row.get(c) for c in site_cols if _pd.notna(row.get(c))} + if desc: + cache[site]["site"] = desc + param_meta: dict[str, dict] = {} + site_meta: dict[str, dict] = {} + for s in sites: + entry = cache.get(s, {}) + param_meta.update(entry.get("params", {})) + if entry.get("site"): + site_meta[s] = entry["site"] + return param_meta, site_meta + + +def _timeseries_metadata(site_ids): + return _lookup(site_ids, _TS_META_CACHE, _api.get_time_series_metadata) + + +def _field_metadata(site_ids): + return _lookup(site_ids, _FIELD_META_CACHE, _api.get_field_measurements_metadata) + + +# --- helpers --------------------------------------------------------------- + + +def _slug(name) -> str: + """A lower_snake_case, identifier-safe variable name.""" + s = _re.sub(r"[^0-9a-zA-Z]+", "_", str(name).strip().lower()).strip("_") + return s or "value" + + +def _first(series): + """First non-null value of a column, or None.""" + nonnull = series.dropna() + return nonnull.iloc[0] if len(nonnull) else None + + +def _date_modified(df): + """Most recent upstream record-refresh time in the frame, ISO-8601 or None. + + ``last_modified`` reflects when each record was last refreshed in the USGS + database; the maximum is surfaced as the dataset's ``date_modified`` (ACDD) + so a reader knows how current the pull is. + """ + if df is None or "last_modified" not in getattr(df, "columns", ()): + return None + ts = _pd.to_datetime(df["last_modified"], errors="coerce", utc=True).dropna() + return ts.max().isoformat() if len(ts) else None + + +def _prepare_values(df, group_cols, ancillary_cols): + """Slim the frame and coerce types, shared by the dense / ragged builders. + + Keeps only the columns we convert, parses ``time`` to naive-UTC + ``datetime64`` (xarray has no tz dtype), coerces ``value`` to numeric, and + drops rows whose ``time`` is unparseable/missing (with a warning, so + observations are never lost without a trace). Returns + ``(work, group_cols, ancillary, has_unit)`` filtered to columns present. + """ + group_cols = [c for c in group_cols if c in df.columns] + ancillary = [c for c in ancillary_cols if c in df.columns] + has_unit = "unit_of_measure" in df.columns + cols = [_INSTANCE, "time", "value", *group_cols, *ancillary] + if has_unit: + cols.append("unit_of_measure") + present = [c for c in dict.fromkeys(cols) if c in df.columns] + work = df.loc[:, present].copy() + # Instance id, time, and value are mandatory to build a series. A response + # that lacks any of them (e.g. a non-result Samples profile) has nothing to + # convert, so return an empty frame -> empty Dataset rather than KeyError. + if not {_INSTANCE, "time", "value"}.issubset(work.columns): + return work.iloc[0:0], group_cols, ancillary, has_unit + work["time"] = _pd.to_datetime( + work["time"], errors="coerce", utc=True + ).dt.tz_localize(None) + work["value"] = _pd.to_numeric(work["value"], errors="coerce") + n_before = len(work) + work = work[work["time"].notna()] + dropped = n_before - len(work) + if dropped: + _warnings.warn( + f"dropped {dropped} row(s) with an unparseable or missing time.", + stacklevel=3, + ) + return work, group_cols, ancillary, has_unit + + +def _var_attrs(desc, *, unit, pcode, stat, default_cell_method, ancillary, name): + """Build the CF attribute dict for one data variable. + + ``unit``, ``pcode`` and ``stat`` are read from the values frame; ``desc`` + supplies only the human-readable name from the metadata lookup. + """ + attrs: dict[str, str] = {} + long_name = desc.get("parameter_description") or desc.get("parameter_name") + if long_name and _pd.notna(long_name): + attrs["long_name"] = str(long_name) + + if unit is not None and _pd.notna(unit): + attrs["units"] = CF_UNIT_MAP.get(str(unit), str(unit)) + + op = ( + CF_CELL_METHODS.get(str(stat)) if stat is not None and _pd.notna(stat) else None + ) + op = op or default_cell_method + if op: + attrs["cell_methods"] = f"time: {op}" + + if pcode is not None and _pd.notna(pcode): + sn = CF_STANDARD_NAMES.get(str(pcode)) + if sn: + attrs["standard_name"] = sn + datum = CF_VERTICAL_DATUM.get(str(pcode)) + if datum: + attrs["vertical_datum"] = datum + attrs["usgs_parameter_code"] = str(pcode) + + if stat is not None and _pd.notna(stat): + attrs["usgs_statistic_id"] = str(stat) + + if ancillary: + attrs["ancillary_variables"] = " ".join(f"{name}_{c}" for c in ancillary) + return attrs + + +def _dataset_attrs(service, base_meta): + attrs = { + "Conventions": "CF-1.11", + "institution": "U.S. Geological Survey", + "source": f"USGS Water Data API ({service})", + "featureType": "timeSeries", + "history": ( + f"{_dt.datetime.now(_dt.timezone.utc).isoformat(timespec='seconds')} " + "created by dataretrieval.waterdata.xarray" + ), + } + url = getattr(base_meta, "url", None) + if url: + attrs["references"] = str(url) + return attrs + + +def _empty_dataset(service, base_meta): + ds = _xr.Dataset() + ds.attrs = _dataset_attrs(service, base_meta) + return ds + + +def _lonlat(geom): + """``(lon, lat)`` from a geometry, or None if it isn't a point. + + Geometry comes back as a shapely ``Point`` when geopandas is installed but + as a plain ``[lon, lat]`` list when it is not (the OGC GeoJSON coordinates + are flattened into a list). Handle both so the spatial coordinates survive + either way; anything else (a polygon, a malformed cell) is skipped rather + than guessed. + """ + x, y = getattr(geom, "x", None), getattr(geom, "y", None) + if x is not None and y is not None: + return x, y + if isinstance(geom, (list, tuple)) and len(geom) >= 2: + try: + return float(geom[0]), float(geom[1]) + except (TypeError, ValueError): + return None + return None + + +def _point_coords(df, inst): + """lon/lat per instance from point geometry, or None.""" + if "geometry" not in df.columns: + return None + geo = df.dropna(subset=["geometry"]).drop_duplicates(inst) + if geo.empty: + return None + lon, lat = {}, {} + for _, row in geo.iterrows(): + xy = _lonlat(row["geometry"]) + if xy is not None: + lon[row[inst]], lat[row[inst]] = xy + if not lon: + return None # no point geometry; skip rather than guess + return lon, lat + + +_INSTANCE = "monitoring_location_id" + + +# --- column vocabulary ----------------------------------------------------- + +# Water-quality samples (Samples DB / WQX) speak a different column vocabulary +# than the time-series getters; map their tidy backbone onto the canonical +# names the builders understand. +_SAMPLES_RENAME = { + "Location_Identifier": _INSTANCE, + "Activity_StartDateTime": "time", + "Result_Measure": "value", + "Result_MeasureUnit": "unit_of_measure", + "Result_Characteristic": "characteristic", + "Result_SampleFraction": "sample_fraction", + "Result_ResultDetectionCondition": "detection_condition", + "Result_MeasureStatusIdentifier": "status", +} +_CANONICAL_COORD_ATTRS = { + "parameter_code": {"long_name": "USGS parameter code"}, + "statistic_id": {"long_name": "USGS statistic code"}, + "unit_of_measure": {"long_name": "unit of measurement"}, +} +_SAMPLES_COORD_ATTRS = { + "characteristic": {"long_name": "characteristic name"}, + "sample_fraction": {"long_name": "sample fraction"}, + "unit_of_measure": {"long_name": "unit of measurement"}, + "detection_condition": {"long_name": "result detection condition"}, + "status": {"long_name": "result status"}, +} + + +@dataclass(frozen=True) +class _Schema: + """How a service's columns map onto the canonical builder vocabulary. + + One object describes a column dialect so a single set of builders serves + both the time-series getters and the (differently-named) Samples data: + + * ``rename`` -- source -> canonical column names (empty for the getters + that already use the canonical names); + * ``group_cols`` -- columns that identify one series (an instance); + * ``ancillary`` -- per-observation flag columns carried alongside ``value``; + * ``label_col`` -- the column whose value names the variable / ``long_name`` + (``parameter_code`` for the getters, ``characteristic`` for samples); + * ``infer_standard_name`` -- whether ``label_col`` is a USGS parameter code + eligible for a CF ``standard_name`` lookup (False for free-text + characteristics); + * ``coord_attrs`` -- ``long_name`` etc. for the per-instance metadata vars. + """ + + rename: dict = field(default_factory=dict) + group_cols: tuple = ("parameter_code", "statistic_id") + ancillary: tuple = _ANCILLARY + label_col: str = "parameter_code" + infer_standard_name: bool = True + coord_attrs: dict = field(default_factory=dict) + + +_CANONICAL = _Schema(coord_attrs=_CANONICAL_COORD_ATTRS) +_FIELD = replace(_CANONICAL, group_cols=("parameter_code",)) +_SAMPLES = _Schema( + rename=_SAMPLES_RENAME, + group_cols=("characteristic", "sample_fraction"), + ancillary=("detection_condition", "status"), + label_col="characteristic", + infer_standard_name=False, + coord_attrs=_SAMPLES_COORD_ATTRS, +) + + +def _build_dense( + df, + base_meta, + *, + service, + schema=_CANONICAL, + series_meta=None, + site_meta=None, + default_cell_method=None, +): + """Hash-free values frame -> dense CF timeSeries Dataset (one var/parameter). + + Parameters are pivoted onto a shared ``(monitoring_location_id, time)`` + grid, NaN where a series has no observation. Every CF attribute is derived + from ``parameter_code`` / ``statistic_id`` / ``unit_of_measure`` already + present, plus the human-readable name from ``series_meta`` keyed by + ``parameter_code``. Ergonomic for a few overlapping series but memory-costly + for large ragged collections; see :func:`_build_ragged` for the default. + """ + series_meta = series_meta or {} + if df is None or len(df) == 0: + return _empty_dataset(service, base_meta) + df = df.rename(columns=schema.rename) if schema.rename else df + work, group_cols, ancillary, has_unit = _prepare_values( + df, schema.group_cols, schema.ancillary + ) + if work.empty: + return _empty_dataset(service, base_meta) + + datasets, used = [], set() + for _, group in work.groupby(group_cols, dropna=False): + pcode = _first(group["parameter_code"]) if "parameter_code" in group else None + stat = _first(group["statistic_id"]) if "statistic_id" in group else None + group_units = group["unit_of_measure"].dropna().unique() if has_unit else () + unit = group_units[0] if len(group_units) else None + desc = series_meta.get(str(pcode), {}) if pcode is not None else {} + + name = _slug(desc.get("parameter_name") or pcode) + if name in used: # same parameter, different statistic -> distinct var + op = CF_CELL_METHODS.get(str(stat)) or (str(stat) if stat else None) + name = f"{name}_{_slug(op)}" if op else name + while name in used: + name += "_x" + used.add(name) + + if len(group_units) > 1: + # One variable can carry only one ``units`` attr; surface the + # mix instead of silently labeling every value with the first. + _warnings.warn( + f"'{name}' spans multiple units {list(group_units)}; labeling " + f"with '{unit}'. Filter by site/parameter to avoid mixing " + "units in one variable.", + stacklevel=3, + ) + + sub = group.set_index([_INSTANCE, "time"])[["value", *ancillary]] + if not sub.index.is_unique: + _warnings.warn( + f"'{name}' has multiple values per (site, time) -- two series " + "share this (site, parameter, statistic); keeping the first. " + "Filter the query to separate them.", + stacklevel=3, + ) + sub = sub[~sub.index.duplicated(keep="first")] + ds_g = sub.to_xarray().rename( + {"value": name, **{c: f"{name}_{c}" for c in ancillary}} + ) + ds_g[name].attrs = _var_attrs( + desc, + unit=unit, + pcode=pcode, + stat=stat, + default_cell_method=default_cell_method, + ancillary=ancillary, + name=name, + ) + datasets.append(ds_g) + + # Outer join on time: parameters sampled on different clocks share a + # union time axis, NaN where a given parameter has no observation. + ds = _xr.merge(datasets, combine_attrs="drop_conflicts", join="outer") + ds.attrs = _dataset_attrs(service, base_meta) + dm = _date_modified(df) + if dm: + ds.attrs["date_modified"] = dm + ds["time"].attrs.setdefault("standard_name", "time") + if _INSTANCE in ds.coords: + ds[_INSTANCE].attrs.setdefault("cf_role", "timeseries_id") + + coords = _point_coords(df, _INSTANCE) + if coords is not None: + lon, lat = coords + order = list(ds[_INSTANCE].values) + ds = ds.assign_coords( + longitude=(_INSTANCE, [lon.get(k) for k in order]), + latitude=(_INSTANCE, [lat.get(k) for k in order]), + ) + ds["longitude"].attrs = {"standard_name": "longitude", "units": "degrees_east"} + ds["latitude"].attrs = {"standard_name": "latitude", "units": "degrees_north"} + + # Instance-indexed site descriptors carried along on the metadata call + # (HUC, state); added only where present so absent fields don't appear. + if site_meta and _INSTANCE in ds.coords: + order = list(ds[_INSTANCE].values) + for col, col_attrs in _SITE_COORD_ATTRS.items(): + vals = [site_meta.get(str(k), {}).get(col) for k in order] + if any(v is not None for v in vals): + ds = ds.assign_coords({col: (_INSTANCE, vals)}) + ds[col].attrs.update(col_attrs) + + return ds + + +# --- ragged (CF contiguous ragged array) builders -------------------------- + + +def _assemble_ragged(work, *, inst_cols, ancillary, inst_first=()): + """Cleaned values frame -> CF contiguous-ragged Dataset skeleton. + + ``work`` must already be time-parsed (naive UTC), value-coerced, and free + of NaT times. Observations are concatenated along a single ``obs`` + dimension, sorted so each instance's rows are contiguous; ``row_size`` + records how many obs each instance contributes (the CF ``sample_dimension`` + link). ``inst_cols`` define instance identity; ``inst_first`` columns + (e.g. ``unit_of_measure``) are carried as one value per instance. Returns + ``(ds, inst_frame)`` where ``inst_frame`` is one row per instance. + """ + work = work.sort_values([*inst_cols, "time"], kind="stable") + grp = work.groupby(inst_cols, dropna=False, sort=False) + row_size = grp.size() + idx = row_size.index + inst_frame = ( + idx.to_frame(index=False) + if isinstance(idx, _pd.MultiIndex) + else _pd.DataFrame({inst_cols[0]: idx}) + ) + data_vars = { + "value": ("obs", work["value"].to_numpy()), + "row_size": ("timeseries", row_size.to_numpy().astype("int32")), + } + for c in ancillary: + data_vars[c] = ("obs", work[c].to_numpy()) + coords = {"time": ("obs", work["time"].to_numpy())} + for c in inst_cols: + coords[c] = ("timeseries", inst_frame[c].to_numpy()) + for c in inst_first: + coords[c] = ("timeseries", grp[c].first().to_numpy()) + return _xr.Dataset(data_vars, coords=coords), inst_frame + + +def _finalize_ragged( + ds, geo_df, inst_frame, *, service, base_meta, ancillary, value_attrs +): + """Attach the structural + provenance CF metadata shared by ragged builders. + + Sets the ``value`` attrs (with ``ancillary_variables`` linked), dataset + attributes (``Conventions`` / provenance / ``date_modified``), the + ``row_size`` ``sample_dimension`` link, a per-instance ``timeseries_id`` + carrying ``cf_role`` (a site alone isn't unique once it has several + series), and ``longitude`` / ``latitude`` per instance from ``geo_df``. + """ + if ancillary: + value_attrs = {**value_attrs, "ancillary_variables": " ".join(ancillary)} + ds["value"].attrs = value_attrs + ds.attrs = _dataset_attrs(service, base_meta) + dm = _date_modified(geo_df) + if dm: + ds.attrs["date_modified"] = dm + ds["time"].attrs.setdefault("standard_name", "time") + ds["row_size"].attrs = { + "long_name": "number of observations per time series", + "sample_dimension": "obs", + } + # Join the instance keys into a cf_role id, skipping null parts so a + # missing key (e.g. a characteristic with no sample fraction) doesn't + # render as a literal "nan" token. + ts_id = inst_frame.apply( + lambda r: ":".join(str(x) for x in r if _pd.notna(x)), axis=1 + ).to_numpy() + ds = ds.assign_coords(timeseries_id=("timeseries", ts_id)) + ds["timeseries_id"].attrs["cf_role"] = "timeseries_id" + ds[_INSTANCE].attrs.setdefault("long_name", "monitoring location identifier") + + coords = _point_coords(geo_df, _INSTANCE) + if coords is not None: + lon, lat = coords + sites = inst_frame[_INSTANCE].to_numpy() + ds = ds.assign_coords( + longitude=("timeseries", [lon.get(s) for s in sites]), + latitude=("timeseries", [lat.get(s) for s in sites]), + ) + ds["longitude"].attrs = {"standard_name": "longitude", "units": "degrees_east"} + ds["latitude"].attrs = {"standard_name": "latitude", "units": "degrees_north"} + return ds + + +def _build_ragged( + df, + base_meta, + *, + service, + schema=_CANONICAL, + series_meta=None, + site_meta=None, + default_cell_method=None, +): + """Hash-free values frame -> CF *contiguous ragged array* timeSeries Dataset. + + All observations are concatenated into one ``obs`` dimension; each series (a + ``schema.group_cols`` combination at a site) is an instance along + ``timeseries`` with ``row_size`` giving its length. Only real observations + are stored (no NaN fill), so this scales to large, very ragged multi-site + pulls. Per-series parameter / statistic / unit are instance coordinates; + descriptors homogeneous across instances are also written onto ``value``. + """ + series_meta = series_meta or {} + if df is None or len(df) == 0: + return _empty_dataset(service, base_meta) + work_df = df.rename(columns=schema.rename) if schema.rename else df + work, group_cols, ancillary, has_unit = _prepare_values( + work_df, schema.group_cols, schema.ancillary + ) + if work.empty: + return _empty_dataset(service, base_meta) + + inst_cols = [_INSTANCE, *group_cols] + ds, inst_frame = _assemble_ragged( + work, + inst_cols=inst_cols, + ancillary=ancillary, + inst_first=(["unit_of_measure"] if has_unit else []), + ) + + # Descriptors go on ``value`` only when homogeneous across instances; + # otherwise they vary per series and live on the instance coordinates. + labels = ( + inst_frame[schema.label_col].dropna().unique() + if schema.label_col in inst_frame + else [] + ) + stats = ( + inst_frame["statistic_id"].dropna().unique() + if "statistic_id" in inst_frame + else [] + ) + units = ( + _pd.unique(ds["unit_of_measure"].to_series().dropna()) + if "unit_of_measure" in ds.coords + else [] + ) + unit = units[0] if len(units) == 1 else None + if len(labels) == 1: + if schema.infer_standard_name: + desc, pcode = series_meta.get(str(labels[0]), {}), labels[0] + else: # free-text label (e.g. a characteristic): it *is* the name + desc, pcode = {"parameter_name": str(labels[0])}, None + value_attrs = _var_attrs( + desc, + unit=unit, + pcode=pcode, + stat=stats[0] if len(stats) == 1 else None, + default_cell_method=default_cell_method, + ancillary=(), + name="value", + ) + else: + value_attrs = { + "long_name": "measured value", + "comment": ( + "multiple series with differing metadata are stacked here; see " + "the per-timeseries coordinates for each series' identity" + ), + } + if unit is not None: + value_attrs["units"] = CF_UNIT_MAP.get(str(unit), str(unit)) + # A service-wide cell method (e.g. samples are instantaneous grabs) + # still applies when the statistic doesn't vary; the time-series getters + # leave per-parameter cell methods to the per-instance coordinates. + if default_cell_method and not schema.infer_standard_name: + value_attrs["cell_methods"] = f"time: {default_cell_method}" + + ds = _finalize_ragged( + ds, + work_df, + inst_frame, + service=service, + base_meta=base_meta, + ancillary=ancillary, + value_attrs=value_attrs, + ) + + for col, col_attrs in schema.coord_attrs.items(): + if col in ds.variables: + ds[col].attrs.update(col_attrs) + + if site_meta: + sites = inst_frame[_INSTANCE].to_numpy() + for col, col_attrs in _SITE_COORD_ATTRS.items(): + vals = [site_meta.get(str(s), {}).get(col) for s in sites] + if any(v is not None for v in vals): + ds = ds.assign_coords({col: ("timeseries", vals)}) + ds[col].attrs.update(col_attrs) + + return ds + + +def _build_stats(df, base_meta, service): + """Best-effort, preliminary conversion of the statistics tables. + + The statistics service returns percentile tables keyed by time-of-year + rather than a (time, value) series, so this produces a flat Dataset (one + variable per column over an ``index`` dimension) with dataset-level + provenance. A richer percentile/day-of-year layout is future work. + """ + if df is None or len(df) == 0: + return _empty_dataset(service, base_meta) + # The timeseries/samples builders surface only the columns they convert, so + # opaque hash IDs never reach those datasets. This flat path keeps every + # column, so drop the stats service's hash-valued IDs (and geometry) here to + # keep the CF dataset free of per-record UUID coordinates. + drop = ("geometry", "computation_id", "parent_time_series_id", "time_series_id") + flat = df.drop(columns=[c for c in drop if c in df.columns]) + ds = _xr.Dataset.from_dataframe(flat.reset_index(drop=True)) + ds.attrs = _dataset_attrs(service, base_meta) + ds.attrs["comment"] = "preliminary flat conversion; see module docs" + dm = _date_modified(df) + if dm: + ds.attrs["date_modified"] = dm + return ds + + +# --- public getters -------------------------------------------------------- + + +def _sites(df): + """Unique monitoring-location ids present in a values frame.""" + if "monitoring_location_id" in df: + return df["monitoring_location_id"].unique() + return [] + + +def _fetch(func, args, kwargs): + """Call the underlying getter, dropping a stray ``include_hash`` kwarg. + + The xarray builders surface only the columns they convert, so the opaque + hash-valued ID columns (per-record UUIDs, per-series join keys) never reach + the dataset regardless of what the getter returns. ``include_hash`` is not a + parameter of the plain getters, so it is swallowed here to keep passing it + to an xarray wrapper harmless. + """ + kwargs.pop("include_hash", None) + return func(*args, **kwargs) + + +def _xr_doc(func, *, cf_metadata=True, allow_dense=True): + """Prepend an xarray note to the wrapped getter's docstring. + + ``cf_metadata=False`` describes the preliminary stats path (a flat Dataset + without per-variable CF attributes); ``allow_dense=False`` describes a + ragged-only path (samples), where ``dense=`` does not apply. + """ + if not cf_metadata: + returns = ( + "a preliminary, flat ``xarray.Dataset`` (dataset-level provenance " + "only; per-variable CF metadata is not yet populated)" + ) + elif allow_dense: + returns = ( + "a CF-conventions ``xarray.Dataset`` as a contiguous ragged array " + "(pass ``dense=True`` for the NaN-filled (site, time) grid with one " + "named variable per parameter)" + ) + else: + returns = ( + "a CF-conventions ``xarray.Dataset`` as a contiguous ragged array " + "(always ragged; discrete samples are too sparse for a dense grid, " + "so ``dense=`` does not apply)" + ) + # A column-0 summary paragraph + a cleandoc'd body keeps the combined + # docstring valid numpydoc (the wrapped getter's first line is unindented + # but its body is source-indented, which would otherwise break dedent and + # over-indent the Parameters/Returns sections). + note = ( + "xarray wrapper: same arguments as " + f"``dataretrieval.waterdata.{func.__name__}``, but returns {returns}. " + "Hash-valued ID columns are always omitted here; the ``include_hash`` " + "flag does not apply." + ) + body = _inspect.cleandoc(func.__doc__) if func.__doc__ else "" + return f"{note}\n\n{body}" if body else note + + +@dataclass(frozen=True) +class _Service: + """Per-service configuration that drives one public xarray getter. + + The variation across services is data, not behaviour: which getter to + call, how to look up series metadata, the column ``schema``, the fallback + ``cell_methods`` operator, whether the result is a time series or the + preliminary stats table, and whether a dense grid is offered. + """ + + getter: Callable + service: str + metadata: str | None = None # "timeseries" | "field" | None + schema: _Schema = _CANONICAL + default_cell_method: str | None = None + layout: str = "series" # "series" | "stats" + allow_dense: bool = True + + +def _series_metadata(source, df): + """Resolve the named metadata source at call time (stays monkeypatchable).""" + if source == "timeseries": + return _timeseries_metadata(_sites(df)) + if source == "field": + return _field_metadata(_sites(df)) + return {}, {} + + +def _make_getter(spec): + """Build the public getter for one ``_Service`` spec.""" + + has_dense = spec.layout != "stats" and spec.allow_dense + + @_wraps(spec.getter) # carry the real signature/__wrapped__ for help()/IDEs + def getter(*args, dense=False, **kwargs): + if dense and not has_dense: + _warnings.warn( + f"{spec.getter.__name__} has no dense layout; ignoring dense=True.", + stacklevel=2, + ) + dense = False + df, base_meta = _fetch(spec.getter, args, kwargs) + if spec.layout == "stats": + return _build_stats(df, base_meta, spec.service) + series_meta, site_meta = _series_metadata(spec.metadata, df) + build = _build_dense if dense else _build_ragged + return build( + df, + base_meta, + service=spec.service, + schema=spec.schema, + series_meta=series_meta, + site_meta=site_meta, + default_cell_method=spec.default_cell_method, + ) + + # ``@_wraps`` copied the getter's docstring; replace it with the xarray note. + getter.__doc__ = _xr_doc( + spec.getter, + cf_metadata=(spec.layout != "stats"), + allow_dense=spec.allow_dense, + ) + return getter + + +get_daily = _make_getter(_Service(_api.get_daily, "daily", "timeseries")) +get_continuous = _make_getter(_Service(_api.get_continuous, "continuous", "timeseries")) +get_latest_continuous = _make_getter( + _Service( + _api.get_latest_continuous, + "latest-continuous", + "timeseries", + default_cell_method="point", + ) +) +get_latest_daily = _make_getter( + _Service( + _api.get_latest_daily, + "latest-daily", + "timeseries", + default_cell_method="point", + ) +) +get_nearest_continuous = _make_getter( + _Service( + _get_nearest_continuous, + "continuous", + "timeseries", + default_cell_method="point", + ) +) +get_peaks = _make_getter( + _Service(_api.get_peaks, "peaks", "timeseries", default_cell_method="maximum") +) +get_field_measurements = _make_getter( + _Service( + _api.get_field_measurements, + "field-measurements", + "field", + schema=_FIELD, + default_cell_method="point", + ) +) +get_stats_por = _make_getter(_Service(_api.get_stats_por, "statistics", layout="stats")) +get_stats_date_range = _make_getter( + _Service(_api.get_stats_date_range, "statistics", layout="stats") +) +get_samples = _make_getter( + _Service( + _api.get_samples, + "samples", + schema=_SAMPLES, + default_cell_method="point", + allow_dense=False, + ) +) diff --git a/demos/waterdata_xarray_demo.ipynb b/demos/waterdata_xarray_demo.ipynb new file mode 100644 index 00000000..ab9d521f --- /dev/null +++ b/demos/waterdata_xarray_demo.ipynb @@ -0,0 +1,368 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# CF-conventions `xarray` datasets from the `waterdata` module\n", + "\n", + "The `dataretrieval.waterdata.xarray` module mirrors the `waterdata` getters\n", + "(`get_daily`, `get_continuous`, `get_peaks`, `get_samples`, …) but returns a\n", + "[CF-conventions](https://cfconventions.org/) [`xarray.Dataset`](https://docs.xarray.dev/)\n", + "instead of a `pandas.DataFrame`. Parameter names, units, statistics, and station\n", + "metadata are looked up and written onto the dataset as CF attributes, so the result\n", + "is self-describing and ready to write to netCDF.\n", + "\n", + "This notebook covers:\n", + "\n", + "1. the default **ragged** layout and how to read it;\n", + "2. **why** ragged is the default (and the `dense=True` opt-out);\n", + "3. the one trade-off you need to know — **selecting by time**;\n", + "4. multiple parameters in one pull;\n", + "5. water-quality samples (`get_samples`);\n", + "6. writing CF netCDF." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "The xarray helpers are an optional extra:\n", + "\n", + "```bash\n", + "pip install dataretrieval[xarray]\n", + "```\n", + "\n", + "As with the rest of `waterdata`, signing up for a free\n", + "[API key](https://api.waterdata.usgs.gov/signup/) gives you higher rate limits.\n", + "Import the xarray wrappers under a short alias so they don't shadow the plain\n", + "`DataFrame`-returning getters:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "from dataretrieval.waterdata import xarray as wdx" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. A single time series\n", + "\n", + "Pull a year of daily mean discharge (parameter `00060`, statistic `00003`) at one\n", + "gage. The wrapper takes the same arguments as `waterdata.get_daily`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ds = wdx.get_daily(\n", + " monitoring_location_id=\"USGS-05407000\",\n", + " parameter_code=\"00060\",\n", + " statistic_id=\"00003\",\n", + " time=\"2023-01-01/2024-01-01\",\n", + ")\n", + "ds" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note the shape of the result — this is a CF **contiguous ragged array**\n", + "(`featureType = \"timeSeries\"`):\n", + "\n", + "* every observation lives along a single `obs` dimension;\n", + "* each *series* — one `(monitoring location, parameter, statistic)` — is one\n", + " instance along the `timeseries` dimension;\n", + "* `row_size` records how many observations each series contributes (the CF\n", + " `sample_dimension` link), and `timeseries_id` carries `cf_role`;\n", + "* because there is a single, homogeneous parameter here, the descriptors land\n", + " directly on `value` (`long_name`, `units`, `cell_methods`, `standard_name`).\n", + "\n", + "The flag columns (`qualifier`, `approval_status`) are linked as\n", + "`ancillary_variables`, and dataset attributes carry provenance:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(ds[\"value\"].attrs)\n", + "print({k: ds.attrs[k] for k in (\"Conventions\", \"featureType\", \"references\", \"date_modified\")})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Why ragged is the default\n", + "\n", + "Real collections are *ragged*: some gages have a century of record, others a few\n", + "years. Pull discharge at two gages with very different start dates and look at\n", + "`row_size`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sites = [\"USGS-05407000\", \"USGS-02238500\"] # since 1913 vs since 2008\n", + "ragged = wdx.get_daily(\n", + " monitoring_location_id=sites, parameter_code=\"00060\", statistic_id=\"00003\"\n", + ")\n", + "print(\"dims :\", dict(ragged.sizes))\n", + "print(\"row_size :\", dict(zip(ragged[\"monitoring_location_id\"].values, ragged[\"row_size\"].values)))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The alternative is a **dense** `(monitoring_location_id, time)` grid — one named\n", + "variable per parameter, NaN where a series has no observation. It is convenient\n", + "(see the next section) but pays for a union time axis and NaN fill. Pass\n", + "`dense=True` to get it, and compare the in-memory size:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dense = wdx.get_daily(\n", + " monitoring_location_id=sites, parameter_code=\"00060\", statistic_id=\"00003\",\n", + " dense=True,\n", + ")\n", + "print(\"dense dims :\", dict(dense.sizes))\n", + "print(\"dense var :\", list(dense.data_vars))\n", + "print(f\"ragged {ragged.nbytes/1e6:6.2f} MB dense {dense.nbytes/1e6:6.2f} MB\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "With only two gages the gap is modest, but it grows fast: a whole state's daily\n", + "discharge can be 15–30% dense, where the gridded layout balloons to hundreds of\n", + "megabytes (mostly NaN) while the ragged array stores only real observations.\n", + "That is why ragged is the default." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. The trade-off: selecting by time\n", + "\n", + "This is the one thing to internalize about the ragged layout.\n", + "\n", + "In the **dense** dataset, `time` is a real dimension with an index, so\n", + "label-based selection just works:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# all sites on a given day, addressed as a (site, time) grid\n", + "dense[\"discharge\"].sel(time=\"2020-06-01\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In the **ragged** dataset, `value` is one flat list along `obs`, and `time` is\n", + "*just another variable* riding along `obs` — not a dimension. So `value` cannot\n", + "be addressed as a `(site, time)` grid, and a time selection returns whatever\n", + "observations happen to match, **mixed across sites with no labels** (and\n", + "identical dates across sites collide onto the same axis):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"value dims:\", ragged[\"value\"].dims) # ('obs',) -- not (site, time)\n", + "# .sel(time=...) on the flat obs axis can't give you a per-series slice\n", + "ragged[\"value\"].sel(time=\"2020-06-01\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To recover the convenient, time-indexed view you first **regroup** the flat\n", + "`obs` back into per-series pieces using the offsets implied by `row_size`. A\n", + "tiny helper:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def series(ds, i):\n", + " \"\"\"Instance ``i`` of a ragged dataset as a time-indexed DataArray.\"\"\"\n", + " starts = np.concatenate([[0], np.cumsum(ds[\"row_size\"].values)])\n", + " sl = slice(int(starts[i]), int(starts[i + 1]))\n", + " da = ds[\"value\"].isel(obs=sl)\n", + " return da.assign_coords(time=ds[\"time\"].isel(obs=sl)).swap_dims(obs=\"time\")\n", + "\n", + "\n", + "s0 = series(ragged, 0)\n", + "print(ragged[\"timeseries_id\"].values[0])\n", + "s0.sel(time=\"2020-06-01\") # now .sel(time=...) works on a single series" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "For anything beyond a one-off slice, the [`cf-xarray`](https://cf-xarray.readthedocs.io/)\n", + "package understands the CF ragged encoding (`cf_role`, `sample_dimension`) and\n", + "will regroup/decode for you. Or, when you know the pull is small and overlapping,\n", + "just ask for `dense=True` up front and use `.sel(time=...)` directly.\n", + "\n", + "**Rule of thumb:** ragged for storage and large multi-site pulls; `dense=True`\n", + "for ergonomic time-based slicing of a few overlapping series." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Multiple parameters in one pull\n", + "\n", + "When a pull mixes parameters (and units), the ragged layout keeps a single\n", + "`value` and records the parameter/unit per *instance* — so nothing is\n", + "mislabeled. The homogeneous descriptors drop off `value` and live on the\n", + "`parameter_code` / `unit_of_measure` coordinates instead:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "multi = wdx.get_daily(\n", + " monitoring_location_id=\"USGS-05407000\",\n", + " parameter_code=[\"00060\", \"00045\"], # discharge + precipitation\n", + " time=\"2023-06-01/2023-07-01\",\n", + ")\n", + "print(\"value long_name:\", multi[\"value\"].attrs.get(\"long_name\"))\n", + "print(\"per-instance parameter_code:\", multi[\"parameter_code\"].values)\n", + "print(\"per-instance unit_of_measure:\", multi[\"unit_of_measure\"].values)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In the `dense=True` form each parameter instead becomes its own named variable\n", + "(`discharge`, `precipitation`, …) on the shared time grid." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Water-quality samples\n", + "\n", + "`get_samples` returns discrete water-quality results in the same ragged shape:\n", + "one instance per `(monitoring location, characteristic, sample fraction)`, with\n", + "the result value plus `detection_condition` and `status` as ancillary\n", + "(censoring) variables. Characteristics are free text, so no CF `standard_name`\n", + "is inferred and non-numeric results coerce to NaN (the `detection_condition`\n", + "variable preserves non-detects)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wq = wdx.get_samples(\n", + " service=\"results\",\n", + " profile=\"basicphyschem\",\n", + " monitoringLocationIdentifier=\"USGS-05406500\",\n", + " activityStartDateLower=\"2019-01-01\",\n", + " activityStartDateUpper=\"2020-01-01\",\n", + ")\n", + "print(\"dims:\", dict(wq.sizes))\n", + "print(\"characteristics:\", sorted(set(wq[\"characteristic\"].values))[:8], \"...\")\n", + "print(\"ancillary:\", wq[\"value\"].attrs.get(\"ancillary_variables\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Writing CF netCDF\n", + "\n", + "Because the dataset already carries CF metadata in the standard ragged-array\n", + "encoding, it serializes straight to a self-describing netCDF file that\n", + "CF-aware tools (THREDDS, `cf-xarray`, Panoply, …) can read back (requires a\n", + "netCDF backend, e.g. `pip install netCDF4`):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ragged.to_netcdf(\"daily_discharge.nc\") # CF-1.11 contiguous ragged array" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## References\n", + "\n", + "* CF conventions — [Discrete Sampling Geometries](https://cfconventions.org/cf-conventions/cf-conventions.html#discrete-sampling-geometries)\n", + " (contiguous ragged array representation)\n", + "* [`cf-xarray`](https://cf-xarray.readthedocs.io/) — decode/group CF DSG datasets\n", + "* [`xarray`](https://docs.xarray.dev/) documentation" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/source/examples/index.rst b/docs/source/examples/index.rst index 6011fc4b..369aff01 100644 --- a/docs/source/examples/index.rst +++ b/docs/source/examples/index.rst @@ -15,6 +15,18 @@ covers a basic introduction to module functions and usage. WaterData_demo +CF-conventions ``xarray`` datasets from the ``waterdata`` module +---------------------------------------------------------------- +The ``waterdata.xarray`` wrappers return CF-conventions ``xarray.Dataset`` +objects (a ragged array by default, with a ``dense=True`` gridded opt-out). +This notebook demonstrates the layouts, the time-selection trade-off, and +writing CF netCDF. + +.. toctree:: + :maxdepth: 1 + + waterdata_xarray_demo + Simple uses of the ``dataretrieval`` package -------------------------------------------- diff --git a/docs/source/examples/waterdata_xarray_demo.nblink b/docs/source/examples/waterdata_xarray_demo.nblink new file mode 100644 index 00000000..9c886545 --- /dev/null +++ b/docs/source/examples/waterdata_xarray_demo.nblink @@ -0,0 +1,3 @@ +{ + "path": "../../../demos/waterdata_xarray_demo.ipynb" +} diff --git a/pyproject.toml b/pyproject.toml index 65b1ae68..2ffc823a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,9 @@ doc = [ nldi = [ 'geopandas>=0.10' ] +xarray = [ + 'xarray>=2023.1.0' +] [project.urls] homepage = "https://github.com/DOI-USGS/dataretrieval-python" diff --git a/tests/waterdata_xarray_test.py b/tests/waterdata_xarray_test.py new file mode 100644 index 00000000..19b20087 --- /dev/null +++ b/tests/waterdata_xarray_test.py @@ -0,0 +1,719 @@ +"""Offline unit tests for dataretrieval.waterdata.xarray converters. + +These exercise the pure DataFrame -> xarray.Dataset converters with synthetic +frames, so they run without network access. Live end-to-end behavior is +covered by the waterdata getters' own tests. +""" + +from types import SimpleNamespace + +import pandas as pd +import pytest + +xr = pytest.importorskip("xarray") +from dataretrieval.waterdata import xarray as wdx # noqa: E402 + + +def _meta(url="https://example.test/items"): + return SimpleNamespace(url=url) + + +def _daily_frame( + time_series_id="A", + site="USGS-1", + values=(100, 110), + times=("2024-06-01", "2024-06-02"), +): + n = len(values) + return pd.DataFrame( + { + "time": list(times), + "value": list(values), + "monitoring_location_id": [site] * n, + "parameter_code": ["00060"] * n, + "statistic_id": ["00003"] * n, + "unit_of_measure": ["ft^3/s"] * n, + "qualifier": [None] * n, + "approval_status": ["Approved"] * n, + "time_series_id": [time_series_id] * n, + } + ) + + +# series_meta is keyed by parameter_code and supplies only the readable name; +# units/statistic/parameter_code come from the values frame itself. +_DISCHARGE_META = { + "00060": { + "parameter_name": "Discharge", + "parameter_description": "Discharge, cubic feet per second", + } +} + + +def test_build_timeseries_cf_attributes(): + ds = wdx._build_dense( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert isinstance(ds, xr.Dataset) + assert "discharge" in ds.data_vars + v = ds["discharge"] + assert v.attrs["long_name"] == "Discharge, cubic feet per second" + assert v.attrs["units"] == "ft3 s-1" # UDUNITS-normalized from "ft^3/s" + assert v.attrs["cell_methods"] == "time: mean" + assert v.attrs["standard_name"] == "water_volume_transport_in_river_channel" + assert v.attrs["usgs_parameter_code"] == "00060" + assert v.attrs["usgs_statistic_id"] == "00003" + # dataset-level provenance + assert ds.attrs["Conventions"] == "CF-1.11" + assert ds.attrs["featureType"] == "timeSeries" + assert ds.attrs["references"] == "https://example.test/items" + # site is the DSG instance dimension + assert ds["monitoring_location_id"].attrs.get("cf_role") == "timeseries_id" + assert ds.sizes["time"] == 2 + + +def test_ancillary_variables_linked(): + ds = wdx._build_dense( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "discharge_qualifier" in ds.data_vars + assert "discharge_approval_status" in ds.data_vars + assert ds["discharge"].attrs["ancillary_variables"] == ( + "discharge_qualifier discharge_approval_status" + ) + + +def test_unknown_unit_passes_through_verbatim(): + df = _daily_frame() + df["unit_of_measure"] = "widgets/s" # units are read from the frame + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert ds["discharge"].attrs["units"] == "widgets/s" + + +def test_missing_standard_name_is_omitted(): + # parameter_code with no curated CF mapping -> no standard_name key + meta = {"99999": {"parameter_name": "Mystery", "parameter_description": "Mystery"}} + df = _daily_frame() + df["parameter_code"] = "99999" + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=meta) + assert "standard_name" not in ds["mystery"].attrs + assert ds["mystery"].attrs["usgs_parameter_code"] == "99999" + + +def test_vertical_datum_distinguishes_stage_parameters(): + # 00065 (gage height) and 63160 (water level above NAVD88) share the CF + # standard_name water_surface_height_above_reference_datum; the vertical_datum + # attribute records the differing reference datum so they're distinguishable. + for pcode, datum in (("00065", "local site datum"), ("63160", "NAVD88")): + df = _daily_frame() + df["parameter_code"] = pcode + ds = wdx._build_ragged(df, _meta(), service="continuous", series_meta={}) + v = ds["value"] + assert v.attrs["standard_name"] == "water_surface_height_above_reference_datum" + assert v.attrs["vertical_datum"] == datum + # a parameter with no datum mapping gets no vertical_datum attr + ds = wdx._build_ragged( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "vertical_datum" not in ds["value"].attrs + + +def test_multiple_parameters_outer_join_on_time(): + # discharge at t1,t2 ; temperature at t2,t3 -> union time, NaN fill + q = _daily_frame(values=(100, 110), times=("2024-06-01", "2024-06-02")) + t = pd.DataFrame( + { + "time": ["2024-06-02", "2024-06-03"], + "value": [18.0, 19.0], + "monitoring_location_id": ["USGS-1", "USGS-1"], + "parameter_code": ["00010", "00010"], + "statistic_id": ["00011", "00011"], + "unit_of_measure": ["degC", "degC"], + "qualifier": [None, None], + "approval_status": ["Approved", "Approved"], + "time_series_id": ["B", "B"], + } + ) + meta = dict(_DISCHARGE_META) + meta["00010"] = { + "parameter_name": "Temperature, water", + "parameter_description": "Temperature, water, degrees Celsius", + } + ds = wdx._build_dense( + pd.concat([q, t]), _meta(), service="continuous", series_meta=meta + ) + assert {"discharge", "temperature_water"} <= set(ds.data_vars) + assert ds.sizes["time"] == 3 # union of {t1,t2,t3} + # temperature has no value at t1 -> NaN + assert pd.isna(ds["temperature_water"].sel(time="2024-06-01").item()) + # cell_methods derived from statistic_id 00011 (instantaneous) -> point + assert ds["temperature_water"].attrs["cell_methods"] == "time: point" + + +def test_collision_dedups_with_warning(): + # two values for the same (site, parameter, statistic, time) are ambiguous + # without the hash key -> keep the first and warn; site stays the dim. + a = _daily_frame(values=(100,), times=("2024-06-01",)) + b = _daily_frame(values=(200,), times=("2024-06-01",)) + with pytest.warns(UserWarning, match="multiple values per"): + ds = wdx._build_dense( + pd.concat([a, b]), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "monitoring_location_id" in ds.dims + assert ds.sizes["time"] == 1 + assert ( + ds["discharge"].sel(monitoring_location_id="USGS-1", time="2024-06-01").item() + == 100 + ) + + +def test_empty_frame_returns_dataset_with_conventions(): + ds = wdx._build_dense(pd.DataFrame(), _meta(), service="daily", series_meta={}) + assert isinstance(ds, xr.Dataset) + assert list(ds.data_vars) == [] + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_build_stats_flat_dataset(): + df = pd.DataFrame( + { + "monitoring_location_id": ["USGS-1", "USGS-1"], + "parameter_code": ["00060", "00060"], + "month": [1, 2], + "p50_va": [120.0, 130.0], + } + ) + ds = wdx._build_stats(df, _meta(), "statistics") + assert isinstance(ds, xr.Dataset) + assert "p50_va" in ds.data_vars + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_build_stats_drops_hash_columns(): + # The plain getters no longer drop hash IDs, so the flat stats builder must + # drop the stats service's per-record / per-series UUIDs itself to keep the + # CF dataset free of opaque coordinates. + df = pd.DataFrame( + { + "monitoring_location_id": ["USGS-1"], + "parameter_code": ["00060"], + "computation_id": ["7d70379f-8452-44cd-b026-24dfa11f8503"], + "parent_time_series_id": ["9cca880dec4846ec8cbdd05f3e22603e"], + "time_series_id": ["b026-24dfa11f8503"], + "p50_va": [120.0], + } + ) + ds = wdx._build_stats(df, _meta(), "statistics") + for hash_col in ("computation_id", "parent_time_series_id", "time_series_id"): + assert hash_col not in ds.variables + assert "p50_va" in ds.data_vars + + +def test_ragged_omits_hash_columns(): + # The synthetic daily frame carries a time_series_id hash column; the ragged + # builder whitelists the columns it converts, so the hash never surfaces in + # the dataset (the timeseries path stays hash-free without the getter's + # help). + ds = wdx._build_ragged( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "time_series_id" not in ds.variables + + +def test_public_wrappers_exist_and_are_documented(): + for name in wdx.__all__: + fn = getattr(wdx, name) + assert callable(fn) + # _make_getter carries the wrapped getter's name through, so the public + # symbol, its __name__, and the docstring reference all agree. + assert fn.__name__ == name + assert "xarray wrapper" in (fn.__doc__ or "") + assert f"dataretrieval.waterdata.{name}" in (fn.__doc__ or "") + + +def test_fetch_strips_include_hash(): + # The xarray path never surfaces hash columns, so include_hash must be + # dropped before the underlying getter is called (no wasted fetch). + captured = {} + + def fake_getter(*args, **kwargs): + captured.update(kwargs) + return "df", "meta" + + df, meta = wdx._fetch( + fake_getter, (), {"include_hash": True, "parameter_code": "00060"} + ) + assert (df, meta) == ("df", "meta") + assert "include_hash" not in captured + assert captured == {"parameter_code": "00060"} + + +def test_every_wrapper_routes_through_fetch(monkeypatch): + # Pin the wiring: each public wrapper must delegate the underlying fetch to + # _fetch (which is what strips include_hash). Guards against a wrapper + # quietly reverting to calling the getter directly and leaking the flag. + seen = [] + + def spy(func, args, kwargs): + seen.append(dict(kwargs)) + return pd.DataFrame(), SimpleNamespace(url=None) # empty -> no network + + monkeypatch.setattr(wdx, "_fetch", spy) + for name in wdx.__all__: + seen.clear() + ds = getattr(wdx, name)(monitoring_location_id="USGS-1", include_hash=True) + assert isinstance(ds, xr.Dataset) + assert len(seen) == 1, f"{name} did not route through _fetch" + # the wrapper hands include_hash to _fetch; _fetch is what drops it + # (asserted in test_fetch_strips_include_hash) + assert seen[0].get("include_hash") is True + + +def test_unparseable_time_dropped_with_warning(): + # A bad/missing time must be dropped explicitly (with a warning), not + # silently swallowed by to_xarray. + df = _daily_frame( + values=(100, 110, 120), + times=("2024-06-01", "not-a-date", "2024-06-03"), + ) + with pytest.warns(UserWarning, match="unparseable or missing time"): + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert ds.sizes["time"] == 2 # the bad-time row is gone, the good ones stay + assert 110 not in ds["discharge"].values # the dropped value did not survive + + +def test_all_unparseable_time_returns_empty_dataset(): + df = _daily_frame(values=(1, 2), times=("bad-a", "bad-b")) + with pytest.warns(UserWarning, match="unparseable or missing time"): + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert list(ds.data_vars) == [] + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_mixed_units_in_one_variable_warns(): + # Same (parameter, statistic) across two sites but different units -> one + # variable can hold only one units attr; warn instead of silently mislabeling. + a = _daily_frame(site="USGS-1", values=(100,), times=("2024-06-01",)) + b = _daily_frame(site="USGS-2", values=(3,), times=("2024-06-01",)) + b["unit_of_measure"] = "m3 s-1" + with pytest.warns(UserWarning, match="spans multiple units"): + ds = wdx._build_dense( + pd.concat([a, b]), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert ds.sizes["monitoring_location_id"] == 2 + assert ds["discharge"].attrs["units"] == "ft3 s-1" # first unit (ft^3/s) + + +def test_point_coords_from_list_geometry(): + # Without geopandas the OGC geometry is a plain [lon, lat] list, not a + # shapely Point. lon/lat must still be extracted (regression: the old + # ``.x``/``.y`` access raised AttributeError and silently dropped them). + df = _daily_frame() + df["geometry"] = [[-90.44, 43.19]] * len(df) + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert "longitude" in ds.coords and "latitude" in ds.coords + assert ds["longitude"].sel(monitoring_location_id="USGS-1").item() == -90.44 + assert ds["latitude"].sel(monitoring_location_id="USGS-1").item() == 43.19 + assert ds["longitude"].attrs["units"] == "degrees_east" + assert ds["latitude"].attrs["standard_name"] == "latitude" + + +def test_point_coords_from_pointlike_geometry(): + # The shapely-Point path (geopandas installed) still works: any object + # exposing .x/.y is read directly. + df = _daily_frame() + df["geometry"] = [SimpleNamespace(x=-90.44, y=43.19)] * len(df) + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert ds["longitude"].sel(monitoring_location_id="USGS-1").item() == -90.44 + assert ds["latitude"].sel(monitoring_location_id="USGS-1").item() == 43.19 + + +def test_non_point_geometry_skipped(): + # A non-point geometry (no .x/.y, not a [lon, lat] pair) is skipped, not + # guessed -- no lon/lat coords are added. + df = _daily_frame() + df["geometry"] = [object()] * len(df) + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert "longitude" not in ds.coords and "latitude" not in ds.coords + + +def test_date_modified_from_last_modified(): + # The newest last_modified becomes the dataset-level date_modified (ACDD). + df = _daily_frame() + df["last_modified"] = ["2024-06-01T00:00:00Z", "2024-06-10T12:00:00Z"] + ds = wdx._build_dense(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert ds.attrs["date_modified"].startswith("2024-06-10") + + +def test_no_date_modified_without_last_modified(): + # The default frame has no last_modified column -> no date_modified attr. + ds = wdx._build_dense( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "date_modified" not in ds.attrs + + +def test_site_metadata_coordinates(): + # HUC / state_name ride along on the metadata call and become + # instance-indexed auxiliary coordinates. + site_meta = { + "USGS-1": {"hydrologic_unit_code": "07070005", "state_name": "Wisconsin"} + } + ds = wdx._build_dense( + _daily_frame(), + _meta(), + service="daily", + series_meta=_DISCHARGE_META, + site_meta=site_meta, + ) + huc = ds["hydrologic_unit_code"].sel(monitoring_location_id="USGS-1").item() + assert huc == "07070005" + assert ds["state_name"].sel(monitoring_location_id="USGS-1").item() == "Wisconsin" + assert ds["hydrologic_unit_code"].attrs["long_name"] == "hydrologic unit code (HUC)" + + +def test_site_metadata_absent_adds_no_coords(): + # No site_meta -> no HUC/state coords (back-compat with the old signature). + ds = wdx._build_dense( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert "hydrologic_unit_code" not in ds.coords + assert "state_name" not in ds.coords + + +# --- ragged layout (the default) ------------------------------------------- + + +def test_ragged_structure_and_cf_attrs(): + # Single (site, parameter, statistic) -> one instance; value attrs are set + # because the descriptors are homogeneous across the (one) instance. + ds = wdx._build_ragged( + _daily_frame(), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert set(ds.sizes) == {"obs", "timeseries"} + assert ds.sizes == {"obs": 2, "timeseries": 1} + assert ds["value"].dims == ("obs",) + assert list(ds["value"].values) == [100, 110] + assert ds["row_size"].dims == ("timeseries",) + assert int(ds["row_size"][0]) == 2 + assert ds["row_size"].attrs["sample_dimension"] == "obs" + # per-instance identity + cf_role on the synthesized timeseries_id + assert ds["monitoring_location_id"].dims == ("timeseries",) + assert ds["timeseries_id"].attrs["cf_role"] == "timeseries_id" + assert ds["timeseries_id"].values[0] == "USGS-1:00060:00003" + # homogeneous descriptors land on value + v = ds["value"] + assert v.attrs["long_name"] == "Discharge, cubic feet per second" + assert v.attrs["units"] == "ft3 s-1" + assert v.attrs["cell_methods"] == "time: mean" + assert v.attrs["standard_name"] == "water_volume_transport_in_river_channel" + assert v.attrs["ancillary_variables"] == "qualifier approval_status" + assert ds.attrs["featureType"] == "timeSeries" + # ancillary flags are per-observation; metadata is per-instance + assert ds["qualifier"].dims == ("obs",) + assert ds["approval_status"].dims == ("obs",) + assert ds["parameter_code"].dims == ("timeseries",) + assert ds["parameter_code"].values.tolist() == ["00060"] + assert ds["statistic_id"].values.tolist() == ["00003"] + assert ds["unit_of_measure"].values.tolist() == ["ft^3/s"] + assert ds["parameter_code"].attrs["long_name"] == "USGS parameter code" + + +def test_ragged_stores_only_real_observations(): + # Two sites of very different length: obs == sum of lengths, no NaN fill. + a = _daily_frame( + site="USGS-A", + values=(1, 2, 3), + times=("2024-06-01", "2024-06-02", "2024-06-03"), + ) + b = _daily_frame(site="USGS-B", values=(9,), times=("2024-06-03",)) + ds = wdx._build_ragged( + pd.concat([a, b]), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + assert ds.sizes == {"obs": 4, "timeseries": 2} # 3 + 1, not 2 x 3 grid + assert not pd.isna(ds["value"].values).any() + assert sorted(ds["row_size"].values.tolist()) == [1, 3] + + +def test_ragged_mixed_parameters_value_is_generic(): + # Mixed parameters/units -> value carries no single units/standard_name; + # the per-instance parameter_code coordinate disambiguates. + q = _daily_frame(values=(100, 110), times=("2024-06-01", "2024-06-02")) + t = pd.DataFrame( + { + "time": ["2024-06-02", "2024-06-03"], + "value": [18.0, 19.0], + "monitoring_location_id": ["USGS-1", "USGS-1"], + "parameter_code": ["00010", "00010"], + "statistic_id": ["00011", "00011"], + "unit_of_measure": ["degC", "degC"], + "qualifier": [None, None], + "approval_status": ["Approved", "Approved"], + "time_series_id": ["B", "B"], + } + ) + meta = dict(_DISCHARGE_META) + meta["00010"] = { + "parameter_name": "Temperature, water", + "parameter_description": "Temperature, water, degrees Celsius", + } + ds = wdx._build_ragged( + pd.concat([q, t]), _meta(), service="continuous", series_meta=meta + ) + assert ds.sizes == {"obs": 4, "timeseries": 2} + assert ds["value"].attrs["long_name"] == "measured value" + assert "units" not in ds["value"].attrs # ft^3/s vs degC -> not homogeneous + assert "standard_name" not in ds["value"].attrs + assert set(ds["parameter_code"].values) == {"00060", "00010"} + + +def test_ragged_keeps_duplicate_observations_without_warning(): + # The dense path warns + dedups colliding (site, time); ragged just keeps + # both observations -- no grid, so no ambiguity. Assert specifically that + # neither dense-path diagnostic fires (not "no warning at all", so an + # unrelated pandas warning can't fail the test for the wrong reason). + a = _daily_frame(values=(100,), times=("2024-06-01",)) + b = _daily_frame(values=(200,), times=("2024-06-01",)) + import warnings as _w + + with _w.catch_warnings(record=True) as caught: + _w.simplefilter("always") + ds = wdx._build_ragged( + pd.concat([a, b]), _meta(), service="daily", series_meta=_DISCHARGE_META + ) + msgs = [str(w.message) for w in caught] + assert not any( + "multiple values per" in m or "spans multiple units" in m for m in msgs + ), msgs + assert ds.sizes["obs"] == 2 + assert sorted(ds["value"].values.tolist()) == [100, 200] + + +def test_ragged_lonlat_and_date_modified_per_instance(): + df = _daily_frame() + df["geometry"] = [[-90.44, 43.19]] * len(df) + df["last_modified"] = ["2024-06-01T00:00:00Z", "2024-06-10T12:00:00Z"] + ds = wdx._build_ragged(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert ds["longitude"].dims == ("timeseries",) + assert float(ds["longitude"][0]) == -90.44 + assert float(ds["latitude"][0]) == 43.19 + assert ds.attrs["date_modified"].startswith("2024-06-10") + + +def _instance_blocks(ds): + """Map each instance's (parameter, statistic) -> (time-sorted values, unit) + by walking the row_size offsets -- the reader's view of a ragged array.""" + starts, acc = [], 0 + for n in ds["row_size"].values.tolist(): + starts.append(acc) + acc += int(n) + starts.append(acc) + blocks = {} + for i in range(ds.sizes["timeseries"]): + sl = slice(starts[i], starts[i + 1]) + key = (str(ds["parameter_code"].values[i]), str(ds["statistic_id"].values[i])) + blocks[key] = ( + ds["value"].values[sl].tolist(), + str(ds["unit_of_measure"].values[i]), + ) + return blocks + + +def test_ragged_alignment_survives_interleaved_input(): + # The critical invariant: row_size, the contiguous obs blocks, and the + # per-instance metadata must all stay aligned even when the input rows are + # interleaved across instances and out of time order. A regression in the + # sort/group ordering would silently map values to the wrong series. + cols = [ + "monitoring_location_id", + "parameter_code", + "statistic_id", + "unit_of_measure", + "time", + "value", + ] + # Values are deliberately NON-monotonic with time within each instance, so + # a regression that ordered obs by value (instead of time) would produce a + # different sequence and fail -- "sorted values" alone wouldn't catch that. + rows = [ + ("USGS-1", "00060", "00003", "ft^3/s", "2024-06-02", 100), # later, smaller + ("USGS-1", "00010", "00011", "degC", "2024-06-01", 19), + ("USGS-1", "00060", "00001", "ft^3/s", "2024-06-03", 500), + ("USGS-1", "00060", "00003", "ft^3/s", "2024-06-01", 150), # earlier, larger + ("USGS-1", "00010", "00011", "degC", "2024-06-03", 18), + ("USGS-1", "00060", "00001", "ft^3/s", "2024-06-01", 480), + ] + df = pd.DataFrame(rows, columns=cols) + ds = wdx._build_ragged(df, _meta(), service="daily", series_meta={}) + + assert ds.sizes == {"obs": 6, "timeseries": 3} + assert int(ds["row_size"].sum()) == ds.sizes["obs"] + blocks = _instance_blocks(ds) + # each instance's values + unit are the ones that belong to it, in TIME order + # (not value order: 150 precedes 100, and 19 precedes 18) + assert blocks[("00060", "00003")] == ([150, 100], "ft^3/s") + assert blocks[("00060", "00001")] == ([480, 500], "ft^3/s") + assert blocks[("00010", "00011")] == ([19, 18], "degC") + + +def test_ragged_field_schema_without_statistic(): + # The field-measurements schema groups by parameter_code only (no + # statistic_id): the instance has no statistic segment, and the service + # default cell method fills in. + df = pd.DataFrame( + { + "monitoring_location_id": ["USGS-1", "USGS-1"], + "parameter_code": ["00060", "00060"], + "time": ["2024-06-01", "2024-06-02"], + "value": [100, 110], + "unit_of_measure": ["ft^3/s", "ft^3/s"], + } + ) + ds = wdx._build_ragged( + df, + _meta(), + service="field-measurements", + schema=wdx._FIELD, + series_meta=_DISCHARGE_META, + default_cell_method="point", + ) + assert ds.sizes == {"obs": 2, "timeseries": 1} + assert "statistic_id" not in ds.coords + assert ds["timeseries_id"].values[0] == "USGS-1:00060" # no stat segment + assert ds["value"].attrs["cell_methods"] == "time: point" + assert ds["value"].attrs["long_name"] == "Discharge, cubic feet per second" + + +# --- dense opt-out wiring --------------------------------------------------- + + +def test_wrapper_defaults_to_ragged_and_dense_opt_out(monkeypatch): + # Default wrapper output is ragged; dense=True returns the gridded layout. + monkeypatch.setattr(wdx, "_fetch", lambda func, a, k: (_daily_frame(), _meta())) + monkeypatch.setattr( + wdx, "_timeseries_metadata", lambda sites: (_DISCHARGE_META, {}) + ) + + ragged = wdx.get_daily(monitoring_location_id="USGS-1") + assert "obs" in ragged.sizes and "value" in ragged.data_vars + assert "discharge" not in ragged.data_vars + + dense = wdx.get_daily(monitoring_location_id="USGS-1", dense=True) + assert "discharge" in dense.data_vars + assert "obs" not in dense.sizes + + +# --- water-quality samples -------------------------------------------------- + + +def _samples_frame(characteristics=("Temperature, water",), units=("deg C",)): + rows = [] + for ch, u in zip(characteristics, units): + rows.append( + { + "Location_Identifier": "USGS-1", + "Activity_StartDateTime": "2020-07-10T12:00:00Z", + "Result_Characteristic": ch, + "Result_SampleFraction": "Total", + "Result_Measure": 12.5, + "Result_MeasureUnit": u, + "Result_ResultDetectionCondition": None, + "Result_MeasureStatusIdentifier": "Provisional", + } + ) + return pd.DataFrame(rows) + + +def _samples_ds(frame): + """Build a samples ragged Dataset the way the get_samples service does.""" + return wdx._build_ragged( + frame, + _meta(), + service="samples", + schema=wdx._SAMPLES, + default_cell_method="point", + ) + + +def test_build_samples_single_characteristic(): + ds = _samples_ds(_samples_frame()) + assert set(ds.sizes) == {"obs", "timeseries"} + assert ds["value"].dims == ("obs",) + assert ds["characteristic"].values[0] == "Temperature, water" + assert ds["value"].attrs["long_name"] == "Temperature, water" + assert ds["value"].attrs["cell_methods"] == "time: point" + # censoring columns are ancillary, linked from value + assert "detection_condition" in ds.variables and "status" in ds.variables + assert ds["value"].attrs["ancillary_variables"] == "detection_condition status" + assert ds["timeseries_id"].attrs["cf_role"] == "timeseries_id" + + +def test_build_samples_mixed_characteristics_generic_value(): + ds = _samples_ds( + _samples_frame(("Temperature, water", "pH"), ("deg C", "std units")) + ) + assert ds.sizes["timeseries"] == 2 + assert ds["value"].attrs["long_name"] == "measured value" + assert set(ds["characteristic"].values) == {"Temperature, water", "pH"} + + +def test_get_samples_wrapper_builds_ragged(monkeypatch): + monkeypatch.setattr(wdx, "_fetch", lambda func, a, k: (_samples_frame(), _meta())) + ds = wdx.get_samples(monitoringLocationIdentifier="USGS-1", include_hash=True) + assert "obs" in ds.sizes and "value" in ds.data_vars + assert ds["characteristic"].values[0] == "Temperature, water" + + +def test_prepare_values_missing_required_column_returns_empty(): + # A frame lacking a mandatory column (here: no value) must degrade to an + # empty Dataset, not raise KeyError from the column slim. + df = _daily_frame().drop(columns=["value"]) + ds = wdx._build_ragged(df, _meta(), service="daily", series_meta=_DISCHARGE_META) + assert isinstance(ds, xr.Dataset) + assert list(ds.data_vars) == [] + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_build_samples_missing_value_returns_empty(): + # A non-result Samples profile (no Result_Measure -> no "value") must not + # crash; it has nothing to convert. + df = pd.DataFrame( + { + "Location_Identifier": ["USGS-1"], + "Activity_StartDateTime": ["2020-07-10T12:00:00Z"], + "Result_Characteristic": ["pH"], + } + ) + ds = _samples_ds(df) + assert list(ds.data_vars) == [] + assert ds.attrs["Conventions"] == "CF-1.11" + + +def test_get_samples_ignores_dense_with_warning(monkeypatch): + # dense=True is advertised generically; get_samples is always ragged, so it + # must accept-and-ignore (with a warning) rather than leak dense= to the + # underlying getter (which would TypeError). + monkeypatch.setattr(wdx, "_fetch", lambda func, a, k: (_samples_frame(), _meta())) + with pytest.warns(UserWarning, match="no dense layout"): + ds = wdx.get_samples(monitoringLocationIdentifier="USGS-1", dense=True) + assert "obs" in ds.sizes # still ragged + + +def test_get_stats_ignores_dense_with_warning(monkeypatch): + # The stats layout has no dense grid either; dense=True is ignored with the + # same warning (consistent with samples), not silently swallowed. + frame = pd.DataFrame({"monitoring_location_id": ["USGS-1"], "p50_va": [120.0]}) + monkeypatch.setattr(wdx, "_fetch", lambda func, a, k: (frame, _meta())) + with pytest.warns(UserWarning, match="no dense layout"): + ds = wdx.get_stats_por(monitoring_location_id="USGS-1", dense=True) + assert isinstance(ds, xr.Dataset) + + +def test_timeseries_id_skips_missing_instance_key(): + # A NaN instance key (e.g. a characteristic with no sample fraction) must + # not render as a literal "nan" token in the cf_role timeseries_id. + df = _samples_frame() + df["Result_SampleFraction"] = None + ds = _samples_ds(df) + assert all("nan" not in tid for tid in ds["timeseries_id"].values) + assert ds["timeseries_id"].values[0] == "USGS-1:Temperature, water"