Add initial CENACE pipeline integration #4
Conversation
Pull request overview
Adds an initial end-to-end CENACE dataset integration to the existing forecasting/evaluation codebase, including data partitioning, simple baseline models, and a dataset-dispatched pipeline entrypoint.
Changes:
- Introduces a CENACE dataset pipeline (aggregate → forecast → evaluate) and wires it into a dataset registry + top-level CLI entrypoint.
- Adds a CENACE hourly partition builder + DuckDB-based loader (`CENACEData`) for training windows and forecast actuals.
- Implements initial CENACE models (naive baselines + AutoARIMA adapter) and a simple metric suite (MAE/RMSE/sMAPE).
Reviewed changes
Copilot reviewed 10 out of 14 changed files in this pull request and generated 6 comments.
Summary per file:
| File | Description |
|---|---|
| src/pipeline.py | Adds a top-level CLI to run a dataset pipeline via dataset_registry. |
| src/dataset_registry.py | Registers the new cenace dataset runner and exposes choices for CLI. |
| src/cenace_pipeline.py | Orchestrates CENACE aggregation, forecasting, and evaluation into one pipeline call. |
| src/data/cenace/config.py | Defines CENACE data/forecast/eval directory layout. |
| src/data/cenace/aggregate/core.py | Builds daily parquet partitions from the processed CENACE CSV. |
| src/data/cenace/utils/cenace_data.py | Adds a DuckDB-backed loader for training windows and horizon actuals. |
| src/forecast/cenace/models.py | Adds baseline models and an AutoARIMA adapter emitting y_hat. |
| src/forecast/cenace/core.py | Adds forecast runner that loads training data and writes partitioned forecasts. |
| src/evaluation/cenace/metrics.py | Adds MAE/RMSE/sMAPE and per-series/overall aggregation. |
| src/evaluation/cenace/core.py | Adds evaluation runner that merges forecasts with actuals and writes metrics. |
| src/forecast/cenace/__init__.py | Package marker for CENACE forecasting modules. |
| src/evaluation/cenace/__init__.py | Package marker for CENACE evaluation modules. |
| src/data/cenace/__init__.py | Package marker for CENACE data modules. |
| src/data/cenace/transform/core.py | (Empty in diff) Placeholder for future transform logic. |
```python
out = []
for ds in future_ds:
    tmp = last_values.copy()
    tmp["ds"] = ds
    out.append(tmp)

fcst = pd.concat(out, ignore_index=True)
```
naive_last_value builds the forecast by copying last_values once per step and concatenating (O(h) DataFrame copies + concat). This can become a bottleneck for many series or large horizons. Prefer a vectorized Cartesian product (e.g., repeat last_values for h rows per id and assign ds via np.tile / merge on a future index) to reduce memory churn.
```diff
-out = []
-for ds in future_ds:
-    tmp = last_values.copy()
-    tmp["ds"] = ds
-    out.append(tmp)
-fcst = pd.concat(out, ignore_index=True)
+fcst = last_values.loc[last_values.index.repeat(h)].copy()
+fcst["ds"] = list(future_ds) * len(last_values)
```
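As a sanity check, the loop form and the vectorized form produce the same rows; this sketch uses made-up `last_values` and `future_ds` stand-ins for the PR's data:

```python
import pandas as pd

# Illustrative stand-ins: two series with their last observed values.
last_values = pd.DataFrame({"unique_id": ["a", "b"], "y_hat": [10.0, 20.0]})
h = 3
future_ds = pd.date_range("2024-01-01", periods=h, freq="h")

# Loop version from the PR: O(h) DataFrame copies plus a concat.
out = []
for ds in future_ds:
    tmp = last_values.copy()
    tmp["ds"] = ds
    out.append(tmp)
loop_fcst = pd.concat(out, ignore_index=True)

# Vectorized version: repeat each row h times, then tile the future timestamps.
vec_fcst = last_values.loc[last_values.index.repeat(h)].copy()
vec_fcst["ds"] = list(future_ds) * len(last_values)

# Both yield the same (unique_id, ds, y_hat) rows, just in a different order.
key = ["unique_id", "ds"]
cols = ["unique_id", "ds", "y_hat"]
same = loop_fcst.sort_values(key).reset_index(drop=True)[cols].equals(
    vec_fcst.sort_values(key).reset_index(drop=True)[cols]
)
print(same)  # True
```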
```python
def run_cenace_pipeline(
    cutoff: str,
    model: str,
    h: int = 24,
    max_window_size: int = 48,
    skip_aggregate: bool = False,
) -> tuple[str, str]:
    try:
        cutoff_ts = pd.Timestamp(cutoff)
    except Exception as exc:
        raise ValueError(f"Invalid cutoff timestamp: {cutoff}") from exc

    if not skip_aggregate:
        n_written = build_hourly_partitions()
        print(f"Aggregated {n_written} partitions")

    forecast_path = run_forecast(
        cutoff=cutoff_ts,
        model=model,
        h=h,
        max_window_size=max_window_size,
    )
    print(f"Forecasts saved to: {forecast_path}")

    eval_path = run_evaluation(
        cutoff=cutoff_ts,
        model=model,
        h=h,
        max_window_size=max_window_size,
    )
    print(f"Metrics saved to: {eval_path}")

    return str(forecast_path), str(eval_path)
```
This PR introduces a new dataset pipeline (aggregation → forecasting → evaluation) but adds no tests. The repo already has pytest coverage for analogous GH Archive components (data loaders/partition logic/aggregation), so adding at least unit tests for CENACEData.get_df/get_actuals (partition selection + DuckDB query), baseline models output shape/columns, and evaluate_forecasts would help prevent regressions.
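As a sketch of what such a test could look like — the baseline below is a stand-in mirroring the assumed `(unique_id, ds, y_hat)` contract, not the PR's actual `src/forecast/cenace/models.py` implementation:

```python
import pandas as pd

def naive_last_value(train_df: pd.DataFrame, h: int) -> pd.DataFrame:
    """Stand-in baseline: repeat each series' last observed y for h steps."""
    last = (
        train_df.sort_values("ds")
        .groupby("unique_id")
        .tail(1)
        .rename(columns={"y": "y_hat"})
    )
    future_ds = pd.date_range(train_df["ds"].max(), periods=h + 1, freq="h")[1:]
    fcst = last.loc[last.index.repeat(h)].copy()
    fcst["ds"] = list(future_ds) * len(last)
    return fcst[["unique_id", "ds", "y_hat"]].reset_index(drop=True)

def test_naive_last_value_shape_and_columns():
    train = pd.DataFrame(
        {
            "unique_id": ["a"] * 4 + ["b"] * 4,
            "ds": list(pd.date_range("2024-01-01", periods=4, freq="h")) * 2,
            "y": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0],
        }
    )
    fcst = naive_last_value(train, h=24)
    assert list(fcst.columns) == ["unique_id", "ds", "y_hat"]
    assert len(fcst) == 2 * 24               # one row per series per step
    assert fcst["ds"].min() > train["ds"].max()  # strictly in the future

test_naive_last_value_shape_and_columns()
print("ok")
```

Analogous fixture-based tests for `CENACEData.get_df`/`get_actuals` could write a few tiny parquet partitions to `tmp_path` and assert on partition selection.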
```python
def evaluate_forecasts(merged: pd.DataFrame) -> pd.DataFrame:
    per_uid = (
        merged.groupby("unique_id", as_index=False)
        .apply(
            lambda g: pd.Series(
                {
                    "mae": mae(g["y"], g["y_hat"]),
                    "rmse": rmse(g["y"], g["y_hat"]),
                    "smape": smape(g["y"], g["y_hat"]),
                }
            )
        )
        .reset_index(drop=True)
    )
```
evaluate_forecasts groups by unique_id and then calls .apply(...).reset_index(drop=True). With pandas groupby.apply, the group key is often placed in the index; dropping the index here can remove unique_id entirely from the returned DataFrame, producing metrics with no identifier. Prefer groupby('unique_id').agg(...) or reset_index() (without drop=True) to guarantee unique_id is preserved.
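A hedged sketch of the named-aggregation alternative; MAE/RMSE are computed inline here rather than via the PR's metric helpers, and sMAPE is omitted for brevity:

```python
import pandas as pd

# Toy merged forecasts/actuals frame standing in for the PR's `merged`.
merged = pd.DataFrame(
    {
        "unique_id": ["a", "a", "b", "b"],
        "y": [1.0, 2.0, 10.0, 12.0],
        "y_hat": [1.0, 3.0, 11.0, 11.0],
    }
)

# Named aggregation keeps unique_id as a regular column, so the identifier
# cannot be lost the way groupby(...).apply(...).reset_index(drop=True) loses it.
per_uid = (
    merged.assign(
        abs_err=lambda d: (d["y"] - d["y_hat"]).abs(),
        sq_err=lambda d: (d["y"] - d["y_hat"]) ** 2,
    )
    .groupby("unique_id", as_index=False)
    .agg(mae=("abs_err", "mean"), mse=("sq_err", "mean"))
    .assign(rmse=lambda d: d["mse"] ** 0.5)
    .drop(columns="mse")
)
print(per_uid)
```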
```python
paths = self._paths_for_range(start, cutoff)

query = f"""
    SELECT unique_id, ds, y
    FROM read_parquet({paths})
    WHERE ds >= TIMESTAMP '{start}'
      AND ds <= TIMESTAMP '{cutoff}'
"""

df = duckdb.sql(query).df()
df["ds"] = pd.to_datetime(df["ds"])
```
DuckDB parquet reads are built via duckdb.sql(query) with read_parquet({paths}), where {paths} is a Python list repr. This is brittle (path escaping/backslashes on Windows, quoting, and no explicit INSTALL/LOAD parquet like other modules) and can break unexpectedly. Build a connection (duckdb.connect(':memory:')), INSTALL/LOAD parquet, and pass an explicit SQL list of Path(...).as_posix() strings (as done in GH Archive code) to make reads robust.
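A hedged sketch of the suggested path handling; only the SQL-list construction is shown runnable (the connection/`INSTALL parquet; LOAD parquet` step, following the GH Archive modules, is sketched in comments):

```python
from pathlib import Path

def read_parquet_sql(paths: list[Path]) -> str:
    """Build an explicit, quoted SQL list for read_parquet(...).

    as_posix() yields forward slashes even for Windows-built paths, and quoting
    each entry avoids relying on Python's list repr inside the SQL text.
    """
    quoted = ", ".join(f"'{p.as_posix()}'" for p in paths)
    return f"read_parquet([{quoted}])"

paths = [
    Path("data", "cenace", "ds=2024-01-01", "part.parquet"),
    Path("data", "cenace", "ds=2024-01-02", "part.parquet"),
]
fragment = read_parquet_sql(paths)
print(fragment)
# → read_parquet(['data/cenace/ds=2024-01-01/part.parquet', 'data/cenace/ds=2024-01-02/part.parquet'])

# The DuckDB side would then look roughly like (untested sketch):
# con = duckdb.connect(":memory:")
# con.execute("INSTALL parquet; LOAD parquet;")
# df = con.execute(f"SELECT unique_id, ds, y FROM {fragment}").df()
```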
```python
f"No parquet files found between {start} and \
{end} under {self.base_path}"
```
The FileNotFoundError message uses a line-continuation backslash inside the f-string, which splices the next line's indentation spaces into the exception text. Format this as a single-line f-string (or use textwrap.dedent) so the error message is stable and readable.
```diff
-f"No parquet files found between {start} and \
-{end} under {self.base_path}"
+f"No parquet files found between {start} and {end} under {self.base_path}"
```
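For illustration (placeholder values), here is what the continuation actually does to the message:

```python
start, end, base_path = "2024-01-01", "2024-01-02", "data/cenace"

# Backslash-newline inside a string literal elides the newline but keeps the
# next line's leading indentation, so extra spaces land in the message.
broken = f"No parquet files found between {start} and \
    {end} under {base_path}"

fixed = f"No parquet files found between {start} and {end} under {base_path}"

print(repr(broken))  # note the run of spaces before the end date
print(repr(fixed))
```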
```python
def auto_arima(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame:
    cutoff = pd.Timestamp(cutoff)
```
In auto_arima, cutoff = pd.Timestamp(cutoff) is unused (the value is never referenced). Either remove it to avoid confusion, or use it to validate that the training data max ds matches the requested cutoff (as the GH Archive forecaster does).
```diff
 cutoff = pd.Timestamp(cutoff)
+train_max_ds = pd.Timestamp(train_df["ds"].max())
+if train_max_ds != cutoff:
+    raise ValueError(
+        "auto_arima requires training data to end at the requested cutoff. "
+        f"Expected max ds {cutoff}, found {train_max_ds}."
+    )
```
Adds an initial native CENACE pipeline integration to `impermanent`.
Included:
Validated:
TODO: allow more models to be plugged into the dataset and enable monthly or weekly price predictions.