Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion esmvalcore/_recipe/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,12 @@ def _update_multiproduct(

if step == "ensemble_statistics":
check.ensemble_statistics_preproc(settings)
grouping = ["project", "dataset", "exp", "sub_experiment"]
grouping: tuple[str, ...] | None = (
"project",
"dataset",
"exp",
"sub_experiment",
)
else:
check.multimodel_statistics_preproc(settings)
grouping = settings.get("groupby", None)
Expand Down
14 changes: 13 additions & 1 deletion esmvalcore/io/intake_esgf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from __future__ import annotations

import copy
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand All @@ -47,6 +48,8 @@
"IntakeESGFDataset",
]

logger = logging.getLogger(__name__)


class _CachingCatalog(intake_esgf.ESGFCatalog):
"""An ESGF catalog that caches to_path_dict results."""
Expand Down Expand Up @@ -122,7 +125,16 @@ def __hash__(self) -> int:

def prepare(self) -> None:
"""Prepare the data for access."""
self.catalog.to_path_dict(minimal_keys=False)
try:
self.catalog.to_path_dict(minimal_keys=False, quiet=True)
except intake_esgf.exceptions.DatasetLoadError:
logger.error(
"Failed to download dataset '%s' from the ESGF. Error messages:\n%s",
self.name,
self.catalog.session_log(),
)
raise

for index in self.catalog.indices:
# Set the sessions to None to avoid issues with pickling
# requests_cache.CachedSession objects when max_parallel_tasks > 1.
Expand Down
19 changes: 18 additions & 1 deletion esmvalcore/preprocessor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
extract_levels,
extract_location,
extract_point,
is_dataset,
regrid,
)
from esmvalcore.preprocessor._rolling_window import rolling_window_statistics
Expand Down Expand Up @@ -619,7 +620,7 @@ def __init__(
self,
filename: Path,
attributes: dict[str, Any] | None = None,
settings: dict[str, Any] | None = None,
settings: dict[str, dict[str, Any]] | None = None,
datasets: list[Dataset] | None = None,
) -> None:
if datasets is not None:
Expand All @@ -644,6 +645,22 @@ def __init__(
# Set some preprocessor settings (move all defaults here?)
if settings is None:
settings = {}

# Create a copy of any datasets in settings. This drops the information
# in Dataset.files and avoids issues with deepcopying and pickling
# those files. This is needed because
# esmvalcore.io.intake_esgf.IntakeESGFDataset objects use a
            # requests_cache.CachedSession object that cannot be deepcopied or
# pickled.
settings = {
fn: {
arg: (
value.copy() if is_dataset(value) else copy.deepcopy(value)
)
for arg, value in kwargs.items()
}
for fn, kwargs in settings.items()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't you dump to a json payload that can be serialized and passed around?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use requests_cache.CachedSession objects with intake-esgf to make repeat searches fast. Those aren't trivially serializable (see requests-cache/requests-cache#707 for some background), regardless of how you serialize them. We don't need to serialize Dataset.files because we can just search for them again. Therefore, this seems the simplest solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very interesting - thanks for the pointer, bud! Why do I have the feeling that a Dataset can be serialized since it's a dictionary of simple objects (not too complicated for JSON) and also the Session can be serialized since the connection configuration is simple - OK no other stuff like backends and more complicated things though - question is, would that help vs searching for the file again? or am I trying to fit a square peg in a round hole? I am not a fan of pickles though, security-wise πŸ₯’

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't think this is worth optimizing. It should be no problem to search for the same file more than once occasionally, with the current code this only happens when the target dataset for the regrid preprocessor function is a dataset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's why I decided to dump (pun intended) my idea above - just realized it's a corner case anyway πŸ˜€

self.settings = copy.deepcopy(settings)
if attributes is None:
attributes = {}
Expand Down
5 changes: 5 additions & 0 deletions tests/integration/recipe/test_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,11 @@ def test_reference_dataset(tmp_path, patched_datafinder, session, monkeypatch):
)

assert product.settings["regrid"]["target_grid"] == reference.datasets[0]
# Check that the target dataset does not have files, to prevent pickling
# errors: https://github.com/ESMValGroup/ESMValCore/issues/2989.
# The files can be found again at load time.
assert product.settings["regrid"]["target_grid"]._files is None

assert product.settings["extract_levels"]["levels"] == levels

get_reference_levels.assert_called_once_with(reference.datasets[0])
Expand Down
25 changes: 24 additions & 1 deletion tests/unit/io/test_intake_esgf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import TYPE_CHECKING

import intake_esgf
import intake_esgf.exceptions
import iris.cube
import pandas as pd
import pytest
Expand Down Expand Up @@ -35,7 +36,29 @@ def test_prepare(mocker: MockerFixture) -> None:
dataset = IntakeESGFDataset(name="id", facets={}, catalog=cat)

dataset.prepare()
to_path_mock.assert_called_once_with(minimal_keys=False)
to_path_mock.assert_called_once_with(minimal_keys=False, quiet=True)


def test_prepare_fails(mocker: MockerFixture) -> None:
    """IntakeESGFDataset.prepare should log catalog.session_log() on failure."""
    cat = intake_esgf.ESGFCatalog()
    # Simulate the ESGF index failing to load a dataset.
    exc = intake_esgf.exceptions.DatasetLoadError(
        ["CMCC.CMCC - CMS.historical.day.atmos.day.r1i1p1.sfcWind"],
        None,
    )
    # Make the download step raise, so the error-handling path is exercised.
    to_path_mock = mocker.patch.object(
        cat,
        "to_path_dict",
        autospec=True,
        side_effect=exc,
    )
    session_log_mock = mocker.patch.object(cat, "session_log", autospec=True)
    dataset = IntakeESGFDataset(name="id", facets={}, catalog=cat)

    # The exception must still propagate to the caller after being logged.
    with pytest.raises(intake_esgf.exceptions.DatasetLoadError):
        dataset.prepare()
    to_path_mock.assert_called_once_with(minimal_keys=False, quiet=True)
    # session_log() is consulted to build the logged error message.
    session_log_mock.assert_called_once_with()


def test_attributes_raises_before_to_iris() -> None:
Expand Down