Merged

38 commits
0b362a1
Clean up _evaluate_fold and optimize joblib memory usage
bruAristimunha Feb 9, 2026
2dcb887
Add splitter-based evaluation, batch results check, and tests
bruAristimunha Feb 9, 2026
d203141
Fix 5 equivalence issues in parallel evaluation path
bruAristimunha Feb 9, 2026
0806c10
Fix parallel evaluation: thread safety, equivalence, and cleanup
bruAristimunha Feb 9, 2026
5c7b4fe
Merge branch 'develop' into full-parallel
bruAristimunha Feb 9, 2026
08d775f
Fix WithinSessionSplitter RNG to use single shared instance
bruAristimunha Feb 9, 2026
0bf4075
Simplify parallel evaluation code: config dict, unified scoring, clas…
bruAristimunha Feb 9, 2026
d268e12
Fix WithinSessionSplitter RNG to use independent per-session state
bruAristimunha Feb 9, 2026
a896dde
Address PR review feedback: fix unused import, optimize param_grid co…
bruAristimunha Feb 10, 2026
ffe5b12
Fix race condition in parallel sphinx-gallery doc builds
bruAristimunha Feb 10, 2026
bec8527
Repair Zhou2016 incomplete downloads and document fix
bruAristimunha Feb 11, 2026
6d8ae8c
Atomic cache lookup and improved HDF5 locking for parallel evaluation
bruAristimunha Feb 11, 2026
085db72
Fix double score_ prefix and include error folds in aggregation
bruAristimunha Feb 11, 2026
dfd07c5
Merge branch 'develop' into full-parallel
bruAristimunha Feb 11, 2026
9bfd3a7
Merge branch 'develop' into full-parallel
bruAristimunha Mar 1, 2026
0a12a2d
Simplify evaluation logic: unify grid search, deduplicate imports
bruAristimunha Mar 1, 2026
0a80287
Fix N+1 HDF5 writes by batch-resizing datasets in Results.add()
bruAristimunha Mar 1, 2026
0e368c4
Merge branch 'develop' into full-parallel
bruAristimunha Mar 1, 2026
26c4d98
Simplify evaluation code using pandas groupby, defaultdict, and chain
bruAristimunha Mar 1, 2026
501940e
Add WithinSubjectEvaluation for k-fold CV pooling all sessions per su…
bruAristimunha Mar 1, 2026
d277f1c
Merge branch 'develop' into full-parallel
bruAristimunha Mar 1, 2026
b5783fc
Merge branch 'develop' into full-parallel
bruAristimunha Mar 1, 2026
c1de940
Address PR #967 review comments from Copilot review
bruAristimunha Mar 2, 2026
024440b
Merge branch 'develop' into full-parallel
bruAristimunha Mar 3, 2026
5bc2576
Fix carbon_emission tuple crash and restore push_result() in legacy path
bruAristimunha Mar 3, 2026
2d234e0
Simplify test code: extract helpers and parametrize duplicated tests
bruAristimunha Mar 3, 2026
6183fce
Add cross-dataset splitter and explicit split metadata flow
bruAristimunha Mar 3, 2026
e8227af
Deprecate legacy evaluation fallback in process
bruAristimunha Mar 3, 2026
295c46f
Disable WithinSubject legacy evaluate path
bruAristimunha Mar 3, 2026
e2d9f59
Polish cross-dataset splitter style and simplify group access
bruAristimunha Mar 3, 2026
8d4aa8c
Add Mainsah2025 BigP3BCI dataset (326 subjects, P300 speller)
bruAristimunha Mar 5, 2026
f8f04dc
Improve Mainsah2025: lazy loading, log montage errors
bruAristimunha Mar 5, 2026
94ac3eb
Fix Mainsah2025 subject mapping to use actual PhysioNet IDs
bruAristimunha Mar 5, 2026
3daa3ae
Merge develop into full-parallel: resolve 4 conflict files
bruAristimunha Mar 10, 2026
446d968
Restore WithinSubject evaluate API
bruAristimunha Mar 14, 2026
3be3b8e
Merge develop into full-parallel: resolve 8 conflict files
bruAristimunha Mar 18, 2026
54040bf
Add type hints to WithinSubjectEvaluation and fix _open_lock_hdf5 doc…
bruAristimunha Mar 18, 2026
73a893c
ci: retrigger CI after codecarbon 3.2.4 fix
bruAristimunha Mar 20, 2026
6 changes: 6 additions & 0 deletions .github/workflows/docs.yml
@@ -59,6 +59,12 @@ jobs:
echo "Removing incomplete BrainForm dataset"
rm -rf mne_data/MNE-RomaniBF2025ERP-data
fi
# Remove stale evaluation results to avoid race conditions
# between parallel sphinx-gallery examples sharing HDF5 files
if [ -d ~/mne_data/results ]; then
echo "Removing stale evaluation results"
rm -rf ~/mne_data/results
fi

- name: Cache docs build
id: cache-docs
4 changes: 4 additions & 0 deletions docs/source/whats_new.rst
@@ -47,6 +47,7 @@ Enhancements
- Implement :class:`moabb.evaluations.WithinSubjectSplitter` for k-fold cross-validation within each subject across all sessions (by `Bruno Aristimunha`_)
- Add ``cv_class`` and ``cv_kwargs`` parameters to all evaluation classes (WithinSessionEvaluation, CrossSessionEvaluation, CrossSubjectEvaluation) for custom cross-validation strategies (:gh:`963` by `Bruno Aristimunha`_)
- Implement :class:`moabb.evaluations.splitters.LearningCurveSplitter` as a dedicated sklearn-compatible cross-validator for learning curves, enabling learning curve analysis with any evaluation type (:gh:`963` by `Bruno Aristimunha`_)
- Flattened parallel evaluation: CV folds are now evaluated in parallel within each dataset via ``_process_parallel()``, replacing per-fold sequential evaluation while preserving per-dataset scheduling (by `Bruno Aristimunha`_)
- Auto-generate dataset documentation admonitions (Participants, Equipment, Preprocessing, Data Access, Experimental Protocol) from class-level ``METADATA`` when missing, while preserving manually written sections (:gh:`960` by `Bruno Aristimunha`_)
- Add a "Report an Issue on GitHub" feedback section to all dataset docstrings so users can easily report dataset problems (:gh:`982` by `Bruno Aristimunha`_)
- Add ``additional_metadata`` parameter to ``paradigm.get_data()`` to fetch additional metadata columns from BIDS ``events.tsv`` files. Supports ``"all"`` to load all columns or a list of specific column names (:gh:`744` by `Matthias Dold`_)
@@ -115,6 +116,7 @@ Bugs
- Fixing option to pickle model (:gh:`870` by `Ethan Davis`_)
- Normalize Zenodo download paths and add a custom user-agent to improve download robustness (:gh:`946` by `Bruno Aristimunha`_)
- Use the BNCI mirror host to avoid download timeouts (:gh:`946` by `Bruno Aristimunha`_)
- Repair incomplete or corrupted :class:`moabb.datasets.Zhou2016` subject downloads by validating extracted EEG/events files and re-downloading under a subject-level lock, preventing empty-session failures during parallel docs/CI runs (by `Bruno Aristimunha`_)
- Prevent Python mutable default argument when defining CodeCarbon configurations (:gh:`956` by `Ethan Davis`_)
- Fix copytree FileExistsError in BrainInvaders2013a download by adding dirs_exist_ok=True (by `Bruno Aristimunha`_)
- Ensure optional additional scoring columns in evaluation results (:gh:`957` by `Ethan Davis`_)
@@ -172,6 +174,8 @@ Code health
- Remove redundant learning curve methods (``get_data_size_subsets()``, ``score_explicit()``, ``_evaluate_learning_curve()``) from WithinSessionEvaluation in favor of unified splitter-based approach (:gh:`963` by `Bruno Aristimunha`_)
- Generic metadata column registration: ``LearningCurveSplitter`` declares a ``metadata_columns`` class attribute, and ``BaseEvaluation`` auto-detects it via ``hasattr(cv_class, "metadata_columns")`` instead of hardcoding class checks, making it extensible to future custom splitters (:gh:`963` by `Bruno Aristimunha`_)
- Fix ``get_n_splits()`` delegation in ``WithinSessionSplitter`` and ``WithinSubjectSplitter`` to properly forward to the inner ``cv_class.get_n_splits()`` instead of hardcoding ``n_folds``, giving correct split counts when using custom CV classes like ``LearningCurveSplitter`` (:gh:`963` by `Bruno Aristimunha`_)
- Remove dead ``_fit_and_score()`` function and unused ``paradigm``/``mne_labels`` parameters from ``_evaluate_fold()`` in ``evaluations/base.py`` (by `Bruno Aristimunha`_)
- Memory optimization in ``_process_parallel()``: pass ``X``, ``y``, ``metadata`` as top-level positional args to ``joblib.delayed()`` so the loky backend can auto-mmap large numpy arrays, avoiding N full copies for N parallel tasks; see the sketch after this file's diff (by `Bruno Aristimunha`_)
- Remove duplicate ``get_inner_splitter_metadata()`` from ``WithinSessionSplitter``, ``WithinSubjectSplitter``, and ``CrossSubjectSplitter``. All splitters now store a ``_current_splitter`` reference, and ``BaseEvaluation._build_scored_result()`` reads metadata generically from it (:gh:`963` by `Bruno Aristimunha`_)
- Extract ``_fit_cv()``, ``_maybe_save_model_cv()``, and ``_attach_emissions()`` into ``BaseEvaluation``, removing duplicated model-fitting, model-saving, and carbon-tracking boilerplate from ``WithinSessionEvaluation``, ``CrossSessionEvaluation``, and ``CrossSubjectEvaluation`` (:gh:`963` by `Bruno Aristimunha`_)
- Extract ``_load_data()`` helper into ``BaseEvaluation`` to centralize data loading logic (epoch requirement checking and ``paradigm.get_data()`` call) that was duplicated across all three evaluation classes (:gh:`963` by `Bruno Aristimunha`_)
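The memory note in the Code health entry above is easier to see standalone. The sketch below is illustrative only: the fold function, shapes, and names are placeholders rather than MOABB's actual ``_evaluate_fold``, but it shows the mechanism the entry describes. Numpy arrays passed as top-level arguments to ``joblib.delayed()`` are auto-memmapped by the loky backend when they exceed ``max_nbytes`` (1 MB by default), so N parallel tasks share one on-disk copy instead of each receiving a pickled copy.

```python
import numpy as np
from joblib import Parallel, delayed


def evaluate_fold(X, y, train, test):
    # Under loky, X arrives as a read-only memmap shared across workers,
    # not a per-task pickled copy.
    return float(X[train].mean()), int(y[test].sum())


X = np.random.randn(500, 64, 256)  # large epochs array (~65 MB)
y = np.random.randint(0, 2, size=500)
folds = [(np.arange(0, 400), np.arange(400, 500)) for _ in range(4)]

# X and y are top-level arguments of the delayed call, so joblib's
# auto-memmapping can intercept them; hiding them inside a closure or
# a custom object would defeat it.
scores = Parallel(n_jobs=4)(
    delayed(evaluate_fold)(X, y, train, test) for train, test in folds
)
```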
173 changes: 132 additions & 41 deletions moabb/analysis/results.py
@@ -125,10 +125,12 @@ def __init__(
os.makedirs(osp.dirname(self.filepath), exist_ok=True)
self.filepath = self.filepath

if overwrite and osp.isfile(self.filepath):
os.remove(self.filepath)

if not osp.isfile(self.filepath):
if overwrite:
with _open_lock_hdf5(self.filepath, "w") as f:
f.attrs["create_time"] = np.bytes_(
"{:%Y-%m-%d, %H:%M}".format(datetime.now())
)
elif not osp.isfile(self.filepath):
with _open_lock_hdf5(self.filepath, "w") as f:
f.attrs["create_time"] = np.bytes_(
"{:%Y-%m-%d, %H:%M}".format(datetime.now())
@@ -201,12 +203,19 @@ def to_list(res):
dset = ppline_grp[dname]
# Backward compat: existing dataset may have fewer columns
n_existing = dset["data"].shape[1]
for d in dlist:
# add id and scores to group
length = len(dset["id"]) + 1
dset["id"].resize(length, 0)
dset["data"].resize(length, 0)
dset["id"][-1, :] = np.asarray([str(d["subject"]), str(d["session"])])
n_new = len(dlist)
old_len = len(dset["id"])
new_len = old_len + n_new
dset["id"].resize(new_len, 0)
dset["data"].resize(new_len, 0)
if _carbonfootprint and "codecarbon_task_name" in dset:
dset["codecarbon_task_name"].resize(new_len, 0)

for i, d in enumerate(dlist):
row = old_len + i
dset["id"][row, :] = np.asarray(
[str(d["subject"]), str(d["session"])]
)
try:
add_cols = [d[ac] for ac in self.additional_columns]
except KeyError:
@@ -235,19 +244,50 @@

# Save unique CodeCarbon task name (only if dataset exists)
if "codecarbon_task_name" in dset:
dset["codecarbon_task_name"].resize(length, 0)
dset["codecarbon_task_name"][-1] = str(
dset["codecarbon_task_name"][row] = str(
d.get("codecarbon_task_name", "")
)

all_cols = np.asarray([*cols, *add_cols])
dset["data"][-1, :] = all_cols[:n_existing]
dset["data"][row, :] = all_cols[:n_existing]

def to_dataframe(self, pipelines=None, process_pipeline=None):
@staticmethod
def _to_dataframe_from_file(f, digests=None):
df_list = []
allowed = set(digests) if digests is not None else None

for digest, p_group in f.items():
if (allowed is not None) and (digest not in allowed):
continue

name = p_group.attrs["name"]
for dname, dset in p_group.items():
array = np.array(dset["data"])
ids = np.array(dset["id"])
df = pd.DataFrame(array, columns=dset.attrs["columns"])
df["subject"] = [s.decode() for s in ids[:, 0]]
df["session"] = [s.decode() for s in ids[:, 1]]
df["channels"] = dset.attrs["channels"]
df["n_sessions"] = dset.attrs["n_sessions"]
df["dataset"] = dname
df["pipeline"] = name
if _carbonfootprint and "codecarbon_task_name" in dset:
df["codecarbon_task_name"] = np.array(
dset["codecarbon_task_name"]
).astype(str)
df_list.append(df)

if not df_list:
return pd.DataFrame()
result = pd.concat(df_list, ignore_index=True)
for col in ("samples_test", "n_classes"):
if col not in result.columns:
result[col] = np.nan
return result

def to_dataframe(self, pipelines=None, process_pipeline=None):
# get the list of pipeline hash
digests = []
digests = None
if pipelines is not None and process_pipeline is not None:
digests = [
get_pipeline_digest(process_pipeline, pipelines[name])
@@ -259,33 +299,84 @@ def to_dataframe(self, pipelines=None, process_pipeline=None):
)

with _open_lock_hdf5(self.filepath, "r") as f:
for digest, p_group in f.items():
# skip if not in pipeline list
if (pipelines is not None) and (digest not in digests):
continue

name = p_group.attrs["name"]
for dname, dset in p_group.items():
array = np.array(dset["data"])
ids = np.array(dset["id"])
df = pd.DataFrame(array, columns=dset.attrs["columns"])
df["subject"] = [s.decode() for s in ids[:, 0]]
df["session"] = [s.decode() for s in ids[:, 1]]
df["channels"] = dset.attrs["channels"]
df["n_sessions"] = dset.attrs["n_sessions"]
df["dataset"] = dname
df["pipeline"] = name
if _carbonfootprint and "codecarbon_task_name" in dset:
df["codecarbon_task_name"] = np.array(
dset["codecarbon_task_name"]
).astype(str)
df_list.append(df)
return self._to_dataframe_from_file(f, digests=digests)

result = pd.concat(df_list, ignore_index=True)
for col in ("samples_test", "n_classes"):
if col not in result.columns:
result[col] = np.nan
return result
def batch_not_yet_computed_or_cached_df(
self, pipelines, dataset, subjects, process_pipeline
):
"""Atomically compute work_plan and, if complete, return cached dataframe.

Returns
-------
work_plan : dict
Same format as :meth:`batch_not_yet_computed`.
cached_df : pd.DataFrame | None
Dataframe for selected pipelines when ``work_plan`` is empty.
``None`` when there is still work to do.
"""
digests = {
name: get_pipeline_digest(process_pipeline, pipeline)
for name, pipeline in pipelines.items()
}
with _open_lock_hdf5(self.filepath, "r") as f:
computed_subjects = {}
for name, digest in digests.items():
if digest in f.keys():
pipe_grp = f[digest]
if dataset.code in pipe_grp.keys():
dset = pipe_grp[dataset.code]
computed_subjects[name] = set(dset["id"][:, 0])
else:
computed_subjects[name] = set()
else:
computed_subjects[name] = set()

work_plan = {}
for subject in subjects:
subj_encoded = str(subject).encode("utf-8")
missing = {
name: pipelines[name]
for name in pipelines
if subj_encoded not in computed_subjects[name]
}
if missing:
work_plan[subject] = missing

if work_plan:
return work_plan, None

cached_df = self._to_dataframe_from_file(f, digests=list(digests.values()))
# Filter to current dataset to avoid mixing rows from other datasets
# that share the same pipeline digest.
if cached_df is not None and not cached_df.empty:
cached_df = cached_df[cached_df["dataset"] == dataset.code]
return work_plan, cached_df

def batch_not_yet_computed(self, pipelines, dataset, subjects, process_pipeline):
"""Check all subjects at once with a single HDF5 read.

Parameters
----------
pipelines : dict of pipeline instances.
A dict containing the sklearn pipelines to evaluate.
dataset : Dataset instance
The dataset to check for.
subjects : list
List of subjects to check.
process_pipeline : Pipeline | None
The processing pipeline.

Returns
-------
dict
A dict mapping subject -> {pipeline_name: pipeline} for subjects
that still need computation. Subjects with all pipelines computed
are omitted.
"""
work_plan, _ = self.batch_not_yet_computed_or_cached_df(
pipelines, dataset, subjects, process_pipeline
)
return work_plan

def not_yet_computed(self, pipelines, dataset, subj, process_pipeline):
"""Check if a results is missing.
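For context on the ``Results.add()`` hunk above: resizing an HDF5 dataset is comparatively costly, so growing it once per batch and writing rows by absolute index removes the N+1 pattern of one resize per result. A minimal standalone sketch with ``h5py`` (the file name and shapes here are invented for illustration):

```python
import h5py
import numpy as np

new_rows = np.random.randn(8, 3)  # a batch of score rows

with h5py.File("results_demo.h5", "a") as f:
    if "data" not in f:
        f.create_dataset("data", shape=(0, 3), maxshape=(None, 3))
    dset = f["data"]

    old_len = dset.shape[0]
    dset.resize(old_len + len(new_rows), axis=0)  # one resize per batch

    for i, row in enumerate(new_rows):
        dset[old_len + i, :] = row  # write by absolute row index
```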
71 changes: 57 additions & 14 deletions moabb/datasets/Zhou2016.py
@@ -5,8 +5,9 @@

import json
import logging
import shutil
from pathlib import Path
from zipfile import ZipFile
from zipfile import BadZipFile, ZipFile

import requests
from mne.utils import _open_lock
@@ -235,6 +236,15 @@ def __init__(self, subjects=None, sessions=None):
)
self.zenodo_record_id = ZENODO_RECORD_ID

@staticmethod
def _subject_has_downloaded_data(folder_path: Path) -> bool:
"""Check whether a subject directory contains minimally valid EEG BIDS data."""
if not folder_path.exists() or not folder_path.is_dir():
return False
has_eeg = any(folder_path.rglob("*_eeg.edf"))
has_events = any(folder_path.rglob("*_events.tsv"))
return has_eeg and has_events

def _download_subject(self, subject, path, force_update, update_path, verbose) -> str:
"""Download the subject data."""
if subject not in self.subject_list:
@@ -258,21 +268,54 @@ def _download_subject(self, subject, path, force_update, update_path, verbose) -> str:
# Check if the file corresponds to the current subject
if file_name == f"sub-{subject}.zip":
folder_path = file_path.with_suffix("")
lock_path = dataset_path / f"sub-{subject}.download.lock"
with _open_lock(lock_path, "w"):
if force_update:
if folder_path.exists():
shutil.rmtree(folder_path)
if file_path.exists():
file_path.unlink()
elif folder_path.exists() and (
not self._subject_has_downloaded_data(folder_path)
):
log.warning(
"Found incomplete Zhou2016 data at %s; repairing subject %s",
folder_path,
subject,
)
shutil.rmtree(folder_path)

if not folder_path.exists():
log.info(
f"Downloading {file_name} for subject {subject} to {file_path}"
)
download_if_missing(
file_path=file_path,
url=file_url,
warn_missing=False,
verbose=verbose,
)
if not self._subject_has_downloaded_data(folder_path):
log.info(
f"Downloading {file_name} for subject {subject} to {file_path}"
)
download_if_missing(
file_path=file_path,
url=file_url,
warn_missing=False,
verbose=verbose,
)

log.info(f"Extracting {file_name} to {folder_path}")
with ZipFile(str(file_path), "r") as zip_ref:
zip_ref.extractall(folder_path.parent)
log.info(f"Extracting {file_name} to {folder_path}")
try:
with ZipFile(str(file_path), "r") as zip_ref:
zip_ref.extractall(folder_path.parent)
except BadZipFile:
log.warning(
"Corrupted archive at %s; redownloading %s",
file_path,
file_name,
)
if file_path.exists():
file_path.unlink()
download_if_missing(
file_path=file_path,
url=file_url,
warn_missing=False,
verbose=verbose,
)
with ZipFile(str(file_path), "r") as zip_ref:
zip_ref.extractall(folder_path.parent)

else:
download_if_missing(
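One adjacent standard-library tool worth knowing (an alternative sketch, not what this diff does): ``zipfile.ZipFile.testzip()`` CRC-checks every archive member and returns the name of the first corrupt one, or ``None`` when the archive is sound, which allows validating a download before extracting it.

```python
from zipfile import BadZipFile, ZipFile


def archive_is_valid(path: str) -> bool:
    """Return True if the zip opens and every member passes its CRC check."""
    try:
        with ZipFile(path, "r") as zf:
            return zf.testzip() is None
    except (BadZipFile, OSError):
        return False
```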
2 changes: 2 additions & 0 deletions moabb/evaluations/__init__.py
@@ -8,8 +8,10 @@
CrossSessionEvaluation,
CrossSubjectEvaluation,
WithinSessionEvaluation,
WithinSubjectEvaluation,
)
from .splitters import (
CrossDatasetSplitter,
CrossSessionSplitter,
CrossSubjectSplitter,
LearningCurveSplitter,
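With ``WithinSubjectEvaluation`` now exported from ``moabb.evaluations``, it plugs into the usual MOABB workflow. A hedged usage sketch follows; the constructor arguments mirror the existing evaluation classes and are an assumption here, not verified against the new class:

```python
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.pipeline import make_pipeline

from mne.decoding import Vectorizer

from moabb.datasets import Zhou2016
from moabb.evaluations import WithinSubjectEvaluation
from moabb.paradigms import LeftRightImagery

# k-fold CV pooling all sessions per subject, per the commit message above
evaluation = WithinSubjectEvaluation(
    paradigm=LeftRightImagery(),
    datasets=[Zhou2016()],
    overwrite=False,  # reuse cached results when available
)
pipelines = {"Vect+LDA": make_pipeline(Vectorizer(), LinearDiscriminantAnalysis())}
results = evaluation.process(pipelines)  # a pandas DataFrame of per-fold scores
```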