
Add initial CENACE pipeline integration#4

Open
elmartinj wants to merge 1 commit into TimeCopilot:main from elmartinj:cenace-integration

Conversation

@elmartinj

Adds an initial native CENACE pipeline integration to TimeCopilot.

Included:

  • CENACE dataset config and data loader
  • hourly partition builder
  • CENACE forecast/evaluation modules
  • top-level pipeline entrypoint with dataset dispatch
  • working baseline models: naive_last_value, seasonal_naive_24
  • AutoARIMA adapter through the existing forecasting stack

Validated:

  • end-to-end forecast + evaluation runs for CENACE
  • tested on Jan/Feb 2023 data
  • comparison run completed for cutoff 2023-01-31 23:00:00, horizon 240

TODO: allow more models to be plugged into the dataset, and enable monthly or weekly price predictions.
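A minimal sketch of how the dataset-dispatched entrypoint could be wired; the names here (`DATASET_REGISTRY`, `register_dataset`, the runner's signature and return value) are illustrative assumptions, not the PR's actual code:

```python
from __future__ import annotations

import argparse
from typing import Callable

# Hypothetical registry mapping dataset names to pipeline runners.
DATASET_REGISTRY: dict[str, Callable[..., tuple[str, str]]] = {}

def register_dataset(name: str):
    """Register a pipeline runner under a dataset name."""
    def wrap(fn):
        DATASET_REGISTRY[name] = fn
        return fn
    return wrap

@register_dataset("cenace")
def run_cenace_pipeline(cutoff: str, model: str, h: int = 24) -> tuple[str, str]:
    # Stand-in body: the real runner aggregates, forecasts, then evaluates.
    return f"forecasts/{model}/{cutoff}", f"metrics/{model}/{cutoff}"

def main(argv: list[str] | None = None) -> tuple[str, str]:
    parser = argparse.ArgumentParser(description="Run a dataset pipeline")
    parser.add_argument("--dataset", choices=sorted(DATASET_REGISTRY), required=True)
    parser.add_argument("--cutoff", required=True)
    parser.add_argument("--model", default="naive_last_value")
    args = parser.parse_args(argv)
    return DATASET_REGISTRY[args.dataset](cutoff=args.cutoff, model=args.model)
```

Registering runners through a decorator keeps the CLI's `choices` list and the dispatch table in sync automatically as new datasets are added.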


Copilot AI left a comment


Pull request overview

Adds an initial end-to-end CENACE dataset integration to the existing forecasting/evaluation codebase, including data partitioning, simple baseline models, and a dataset-dispatched pipeline entrypoint.

Changes:

  • Introduces a CENACE dataset pipeline (aggregate → forecast → evaluate) and wires it into a dataset registry + top-level CLI entrypoint.
  • Adds CENACE hourly partition builder + DuckDB-based loader (CENACEData) for training windows and forecast actuals.
  • Implements initial CENACE models (naive baselines + AutoARIMA adapter) and a simple metric suite (MAE/RMSE/sMAPE).

Reviewed changes

Copilot reviewed 10 out of 14 changed files in this pull request and generated 6 comments.

File Description
src/pipeline.py Adds a top-level CLI to run a dataset pipeline via dataset_registry.
src/dataset_registry.py Registers the new cenace dataset runner and exposes choices for CLI.
src/cenace_pipeline.py Orchestrates CENACE aggregation, forecasting, and evaluation into one pipeline call.
src/data/cenace/config.py Defines CENACE data/forecast/eval directory layout.
src/data/cenace/aggregate/core.py Builds daily parquet partitions from the processed CENACE CSV.
src/data/cenace/utils/cenace_data.py Adds a DuckDB-backed loader for training windows and horizon actuals.
src/forecast/cenace/models.py Adds baseline models and an AutoARIMA adapter emitting y_hat.
src/forecast/cenace/core.py Adds forecast runner that loads training data and writes partitioned forecasts.
src/evaluation/cenace/metrics.py Adds MAE/RMSE/sMAPE and per-series/overall aggregation.
src/evaluation/cenace/core.py Adds evaluation runner that merges forecasts with actuals and writes metrics.
src/forecast/cenace/__init__.py Package marker for CENACE forecasting modules.
src/evaluation/cenace/__init__.py Package marker for CENACE evaluation modules.
src/data/cenace/__init__.py Package marker for CENACE data modules.
src/data/cenace/transform/core.py (Empty in diff) Placeholder for future transform logic.


Comment on lines +24 to +30
out = []
for ds in future_ds:
    tmp = last_values.copy()
    tmp["ds"] = ds
    out.append(tmp)

fcst = pd.concat(out, ignore_index=True)

Copilot AI Apr 15, 2026


naive_last_value builds the forecast by copying last_values once per step and concatenating (O(h) DataFrame copies + concat). This can become a bottleneck for many series or large horizons. Prefer a vectorized Cartesian product (e.g., repeat last_values for h rows per id and assign ds via np.tile / merge on a future index) to reduce memory churn.

Suggested change

out = []
for ds in future_ds:
    tmp = last_values.copy()
    tmp["ds"] = ds
    out.append(tmp)
fcst = pd.concat(out, ignore_index=True)

with:

fcst = last_values.loc[last_values.index.repeat(h)].copy()
fcst["ds"] = list(future_ds) * len(last_values)

Comment thread src/cenace_pipeline.py
Comment on lines +12 to +44
def run_cenace_pipeline(
    cutoff: str,
    model: str,
    h: int = 24,
    max_window_size: int = 48,
    skip_aggregate: bool = False,
) -> tuple[str, str]:
    try:
        cutoff_ts = pd.Timestamp(cutoff)
    except Exception as exc:
        raise ValueError(f"Invalid cutoff timestamp: {cutoff}") from exc

    if not skip_aggregate:
        n_written = build_hourly_partitions()
        print(f"Aggregated {n_written} partitions")

    forecast_path = run_forecast(
        cutoff=cutoff_ts,
        model=model,
        h=h,
        max_window_size=max_window_size,
    )
    print(f"Forecasts saved to: {forecast_path}")

    eval_path = run_evaluation(
        cutoff=cutoff_ts,
        model=model,
        h=h,
        max_window_size=max_window_size,
    )
    print(f"Metrics saved to: {eval_path}")

    return str(forecast_path), str(eval_path)

Copilot AI Apr 15, 2026


This PR introduces a new dataset pipeline (aggregation → forecasting → evaluation) but adds no tests. The repo already has pytest coverage for analogous GH Archive components (data loaders/partition logic/aggregation), so adding at least unit tests for CENACEData.get_df/get_actuals (partition selection + DuckDB query), baseline models output shape/columns, and evaluate_forecasts would help prevent regressions.
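A sketch of what one such pytest case could look like. `naive_last_value` is reimplemented inline here because the PR's module layout is assumed; a real test would instead import it from src/forecast/cenace/models.py:

```python
import pandas as pd

def naive_last_value(train_df: pd.DataFrame, h: int = 24) -> pd.DataFrame:
    """Minimal inline reference baseline: repeat each series' last value for h steps."""
    last = (
        train_df.sort_values("ds")
        .groupby("unique_id", as_index=False)
        .tail(1)[["unique_id", "y"]]
        .rename(columns={"y": "y_hat"})
        .reset_index(drop=True)
    )
    future_ds = pd.date_range(
        train_df["ds"].max() + pd.Timedelta(hours=1), periods=h, freq="h"
    )
    fcst = last.loc[last.index.repeat(h)].copy()
    fcst["ds"] = list(future_ds) * len(last)
    return fcst.reset_index(drop=True)

def test_naive_last_value_shape_and_columns():
    train = pd.DataFrame(
        {
            "unique_id": ["a"] * 4 + ["b"] * 4,
            "ds": list(pd.date_range("2023-01-01", periods=4, freq="h")) * 2,
            "y": [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0, 40.0],
        }
    )
    fcst = naive_last_value(train, h=6)
    assert set(fcst.columns) == {"unique_id", "ds", "y_hat"}
    assert len(fcst) == 2 * 6
    assert (fcst.loc[fcst["unique_id"] == "a", "y_hat"] == 4.0).all()
    assert (fcst.loc[fcst["unique_id"] == "b", "y_hat"] == 40.0).all()
```

Shape-and-columns checks like this are cheap and catch the most common regressions (dropped series, wrong horizon length, renamed output columns) without needing real CENACE data.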

Comment on lines +21 to +34
def evaluate_forecasts(merged: pd.DataFrame) -> pd.DataFrame:
    per_uid = (
        merged.groupby("unique_id", as_index=False)
        .apply(
            lambda g: pd.Series(
                {
                    "mae": mae(g["y"], g["y_hat"]),
                    "rmse": rmse(g["y"], g["y_hat"]),
                    "smape": smape(g["y"], g["y_hat"]),
                }
            )
        )
        .reset_index(drop=True)
    )

Copilot AI Apr 15, 2026


evaluate_forecasts groups by unique_id and then calls .apply(...).reset_index(drop=True). With pandas groupby.apply, the group key is often placed in the index; dropping the index here can remove unique_id entirely from the returned DataFrame, producing metrics with no identifier. Prefer groupby('unique_id').agg(...) or reset_index() (without drop=True) to guarantee unique_id is preserved.
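One way to realize this with named aggregation, on toy data; the metric formulas here (mean absolute error, root mean squared error, mean of per-row sMAPE terms) are assumed to match the PR's metric suite:

```python
import pandas as pd

merged = pd.DataFrame(
    {
        "unique_id": ["a", "a", "b", "b"],
        "y": [1.0, 2.0, 3.0, 4.0],
        "y_hat": [1.5, 2.5, 2.0, 5.0],
    }
)

err = merged["y"] - merged["y_hat"]
per_uid = (
    merged.assign(
        abs_err=err.abs(),
        sq_err=err**2,
        smape_term=2 * err.abs() / (merged["y"].abs() + merged["y_hat"].abs()),
    )
    # as_index=False keeps unique_id as a regular column; no index juggling needed
    .groupby("unique_id", as_index=False)
    .agg(mae=("abs_err", "mean"), mse=("sq_err", "mean"), smape=("smape_term", "mean"))
    .assign(rmse=lambda d: d["mse"] ** 0.5)
    .drop(columns="mse")
)
assert "unique_id" in per_uid.columns
```

Precomputing per-row error columns and then calling `agg` avoids `groupby.apply` entirely, which is both faster and free of the index-placement ambiguity the comment describes.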

Comment on lines +50 to +60
paths = self._paths_for_range(start, cutoff)

query = f"""
    SELECT unique_id, ds, y
    FROM read_parquet({paths})
    WHERE ds >= TIMESTAMP '{start}'
      AND ds <= TIMESTAMP '{cutoff}'
"""

df = duckdb.sql(query).df()
df["ds"] = pd.to_datetime(df["ds"])

Copilot AI Apr 15, 2026


DuckDB parquet reads are built via duckdb.sql(query) with read_parquet({paths}), where {paths} is a Python list repr. This is brittle (path escaping/backslashes on Windows, quoting, and no explicit INSTALL/LOAD parquet like other modules) and can break unexpectedly. Build a connection (duckdb.connect(':memory:')), INSTALL/LOAD parquet, and pass an explicit SQL list of Path(...).as_posix() strings (as done in GH Archive code) to make reads robust.
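The quoting part of that suggestion can be sketched without DuckDB itself; a real loader would still open duckdb.connect(':memory:') and INSTALL/LOAD parquet before running the query. `parquet_list_sql` is a hypothetical helper, and `PureWindowsPath` is used only so the Windows-backslash case is reproducible on any OS:

```python
from pathlib import PureWindowsPath

def parquet_list_sql(paths) -> str:
    """Render files as an explicit SQL list of single-quoted POSIX paths,
    so Windows backslashes cannot leak into the query text."""
    quoted = ", ".join(f"'{p.as_posix()}'" for p in paths)
    return f"read_parquet([{quoted}])"

paths = [
    PureWindowsPath(r"data\cenace\hourly\2023-01-01.parquet"),
    PureWindowsPath("data/cenace/hourly/2023-01-02.parquet"),
]
query = f"SELECT unique_id, ds, y FROM {parquet_list_sql(paths)}"
```

Interpolating a Python list repr would instead produce `['data\\cenace\\...']`, which SQL does not parse the way Python does; normalizing through `as_posix()` sidesteps the escaping question entirely.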

Comment on lines +35 to +36
f"No parquet files found between {start} and \
{end} under {self.base_path}"

Copilot AI Apr 15, 2026


The FileNotFoundError message uses a line-continuation backslash inside the f-string, which embeds the next line's indentation spaces into the exception text (the backslash removes the newline but keeps the leading whitespace). Format this as a single-line f-string (or use implicit string concatenation across lines) so the error message is stable and readable.

Suggested change

f"No parquet files found between {start} and \
    {end} under {self.base_path}"

with:

f"No parquet files found between {start} and {end} under {self.base_path}"



def auto_arima(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame:
    cutoff = pd.Timestamp(cutoff)

Copilot AI Apr 15, 2026


In auto_arima, cutoff = pd.Timestamp(cutoff) is unused (the value is never referenced). Either remove it to avoid confusion, or use it to validate that the training data max ds matches the requested cutoff (as the GH Archive forecaster does).

Suggested change

cutoff = pd.Timestamp(cutoff)

with:

cutoff = pd.Timestamp(cutoff)
train_max_ds = pd.Timestamp(train_df["ds"].max())
if train_max_ds != cutoff:
    raise ValueError(
        "auto_arima requires training data to end at the requested cutoff. "
        f"Expected max ds {cutoff}, found {train_max_ds}."
    )
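The suggested guard can be exercised in isolation; `validate_cutoff` here is a hypothetical standalone version of the check, run against toy hourly data:

```python
import pandas as pd

def validate_cutoff(train_df: pd.DataFrame, cutoff: str) -> pd.Timestamp:
    """Raise if the training window does not end exactly at the cutoff."""
    cutoff_ts = pd.Timestamp(cutoff)
    train_max_ds = pd.Timestamp(train_df["ds"].max())
    if train_max_ds != cutoff_ts:
        raise ValueError(
            "training data must end at the requested cutoff: "
            f"expected max ds {cutoff_ts}, found {train_max_ds}"
        )
    return cutoff_ts

train = pd.DataFrame(
    {
        "unique_id": "a",
        "ds": pd.date_range("2023-01-31 20:00", periods=4, freq="h"),
        "y": [1.0, 2.0, 3.0, 4.0],
    }
)
ok = validate_cutoff(train, "2023-01-31 23:00:00")   # matches the last training hour
try:
    validate_cutoff(train, "2023-02-01 00:00:00")    # one hour past the data
    raised = False
except ValueError:
    raised = True
```

Failing fast here turns a silent misalignment between training window and forecast origin into an explicit error before any model is fit.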
