Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 99 additions & 5 deletions cloudnetpy/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from cloudnetpy import concat_lib, instruments
from cloudnetpy.categorize import CategorizeInput, generate_categorize
from cloudnetpy.exceptions import PlottingError
from cloudnetpy.exceptions import ModelDataError, PlottingError
from cloudnetpy.plotting import PlotParameters, generate_figure

if TYPE_CHECKING:
Expand All @@ -29,6 +29,17 @@

cloudnet_api_url: Final = "https://cloudnet.fmi.fi/api/"

# Model-evaluation L3 products and the observation product they downsample.
# The API does not declare these source products, so they are mapped here.
L3_SOURCE_PRODUCTS: Final = {
"l3-cf": "categorize",
"l3-iwc": "iwc",
"l3-lwc": "lwc",
}

# Model used for L3 products when none is given with --model.
DEFAULT_L3_MODEL: Final = "ecmwf"


def run(args: argparse.Namespace, tmpdir: str, client: APIClient) -> None:
cat_files = {}
Expand Down Expand Up @@ -96,6 +107,16 @@ def run(args: argparse.Namespace, tmpdir: str, client: APIClient) -> None:
if epsilon_filepath is not None:
_plot(epsilon_filepath, "epsilon-radar", args)

# Model evaluation L3 products (e.g. l3-cf)
for product in args.products:
base_product = _parse_instrument(product)[0]
if base_product in L3_SOURCE_PRODUCTS:
model = args.model or DEFAULT_L3_MODEL
if args.model is None:
logging.info("No model specified, using default '%s'", model)
l3_filepath = _process_l3_product(base_product, args, client, model)
_plot_l3(l3_filepath, base_product, args)


def _process_epsilon_radar(
cat_files: dict, args: argparse.Namespace, client: APIClient
Expand All @@ -118,6 +139,47 @@ def _process_epsilon_radar(
return str(output_file)


def _process_l3_product(
product: str, args: argparse.Namespace, client: APIClient, model: str
) -> str | None:
obs = product.removeprefix("l3-")
source_product = L3_SOURCE_PRODUCTS[product]
product_file = _fetch_product(args, source_product, client)
if product_file is None:
logging.info("No %s data available for %s", source_product, product)
return None
model_file = _fetch_model(args, client, model)
if model_file is None:
logging.info("No model data available for %s", product)
return None
filename = f"{args.date.replace('-', '')}_{args.site}_{model}_{product}.nc"
output_file = _create_output_folder("evaluation", args) / filename
model_name = next((m.name for m in client.models() if m.id == model), model)
site_name = next(
(s.human_readable_name for s in client.sites() if s.id == args.site),
args.site,
)
module = importlib.import_module(
"cloudnetpy.model_evaluation.products.product_resampling"
)
try:
module.process_L3_day_product(
model,
obs,
model_file,
product_file,
str(output_file),
model_name=model_name,
site_name=site_name,
overwrite=True,
)
except ModelDataError as e:
logging.info("Failed to process %s: %s", product, e)
return None
logging.info("Processed %s: %s", product, output_file)
return str(output_file)


def _process_categorize(
input_files: dict,
instrument_prefs: dict[str, list[str]],
Expand Down Expand Up @@ -536,9 +598,14 @@ def _fetch_product(
return _download_product_file(meta, folder, client, force=args.force_download)


def _fetch_model(args: argparse.Namespace, client: APIClient) -> str | None:
def _fetch_model(
args: argparse.Namespace, client: APIClient, model_id: str | None = None
) -> str | None:
files = client.files(
product_id="model", model_id=args.model, date=args.date, site_id=args.site
product_id="model",
model_id=model_id or args.model,
date=args.date,
site_id=args.site,
)
if not files:
logging.info("No model data available for this date")
Expand Down Expand Up @@ -623,6 +690,33 @@ def _plot(
logging.info("Plotted %s: %s", product, image_name)


def _plot_l3(
filepath: PathLike | str | None,
product: str,
args: argparse.Namespace,
) -> None:
if filepath is None or (not args.plot and not args.show):
return
obs = product.removeprefix("l3-")
save_path = f"{Path(filepath).parent}/" if args.plot else None
var_list = args.variables.split(",") if args.variables is not None else None
module = importlib.import_module("cloudnetpy.model_evaluation.plotting.plotting")
try:
module.generate_L3_day_plots(
str(filepath),
obs,
var_list=var_list,
save_path=save_path,
show=args.show,
include_advection=False,
)
except (PlottingError, ValueError, KeyError) as e:
logging.info("Failed to plot %s: %s", product, e)
return
if args.plot:
logging.info("Plotted %s to %s", product, save_path)


def _process_cat_product(product: str, categorize_file: str) -> str:
output_file = categorize_file.replace("categorize", product)
module = importlib.import_module("cloudnetpy.products")
Expand All @@ -648,7 +742,7 @@ def _parse_products(product_argument: str, client: APIClient) -> list[str]:
valid_products = []
for product in products:
prod, _ = _parse_instrument(product)
if prod in valid_options:
if prod in valid_options or prod in L3_SOURCE_PRODUCTS:
valid_products.append(product)
return valid_products

Expand Down Expand Up @@ -704,7 +798,7 @@ def main() -> None:
"-m",
"--model",
type=lambda arg: _parse_model(arg, client),
help="Model to use in categorize.",
help="Model to use in categorize and model evaluation (l3-*) products.",
)
parser.add_argument(
"-i",
Expand Down
187 changes: 71 additions & 116 deletions cloudnetpy/model_evaluation/file_handler.py
Original file line number Diff line number Diff line change
@@ -1,160 +1,115 @@
import os
from datetime import datetime
from os import PathLike
from typing import TYPE_CHECKING
from uuid import UUID

import netCDF4
import numpy.typing as npt

from cloudnetpy import output, utils
from cloudnetpy import output
from cloudnetpy.model_evaluation.model_metadata import MODEL_PREFIX, PRODUCT_NAMES

from .metadata import (
CYCLE_ATTRIBUTES,
MODEL_ATTRIBUTES,
MODEL_L3_ATTRIBUTES,
REGRID_PRODUCT_ATTRIBUTES,
)
from .products.model_products import ModelManager

if TYPE_CHECKING:
from cloudnetpy.model_evaluation.products.model_products import ModelManager
from cloudnetpy.model_evaluation.products.observation_products import (
ObservationManager,
)


def update_attributes(model_downsample_variables: dict, attributes: dict) -> None:
"""Overrides existing Cloudnet-ME Array-attributes.
Overrides existing attributes using hard-coded values.
New attributes are added.
"""Sets variable attributes for the L3 downsampled file.

Model (simulated) fields are prefixed with ``model_``; observation fields
downsampled to the model grid use the bare product key.

Args:
model_downsample_variables (dict): Array instances.
attributes (dict): Product-specific attributes.
attributes (dict): Product-specific attributes (e.g. time units).
"""
for key in model_downsample_variables:
x = len(key.split("_")) - 1
key_parts = key.split("_", x)
if key in list(attributes.keys()):
model_downsample_variables[key].set_attributes(attributes[key])
for key, variable in model_downsample_variables.items():
if key in attributes:
variable.set_attributes(attributes[key])
if key in MODEL_ATTRIBUTES:
model_downsample_variables[key].set_attributes(MODEL_ATTRIBUTES[key])
elif "_".join(key_parts[0:-1]) in REGRID_PRODUCT_ATTRIBUTES:
model_downsample_variables[key].set_attributes(
REGRID_PRODUCT_ATTRIBUTES["_".join(key_parts[0:-1])],
)
elif "_".join(key_parts[0:-2]) in REGRID_PRODUCT_ATTRIBUTES:
model_downsample_variables[key].set_attributes(
REGRID_PRODUCT_ATTRIBUTES["_".join(key_parts[0:-2])],
)
elif (
"_".join(key_parts[1:]) in MODEL_L3_ATTRIBUTES
or "_".join(key_parts[2:]) in MODEL_L3_ATTRIBUTES
):
try:
model_downsample_variables[key].set_attributes(
MODEL_L3_ATTRIBUTES["_".join(key_parts[1:])],
)
except KeyError:
model_downsample_variables[key].set_attributes(
MODEL_L3_ATTRIBUTES["_".join(key_parts[2:])],
)
elif "_".join(key_parts[1:]) in CYCLE_ATTRIBUTES:
model_downsample_variables[key].set_attributes(
CYCLE_ATTRIBUTES["_".join(key_parts[1:])],
)
elif "_".join(key_parts[2:]) in CYCLE_ATTRIBUTES:
model_downsample_variables[key].set_attributes(
CYCLE_ATTRIBUTES["_".join(key_parts[2:])],
)
variable.set_attributes(MODEL_ATTRIBUTES[key])
elif key.startswith(MODEL_PREFIX):
base = key.removeprefix(MODEL_PREFIX)
if base in MODEL_L3_ATTRIBUTES:
variable.set_attributes(MODEL_L3_ATTRIBUTES[base])
elif base in CYCLE_ATTRIBUTES:
variable.set_attributes(CYCLE_ATTRIBUTES[base])
elif key in REGRID_PRODUCT_ATTRIBUTES:
variable.set_attributes(REGRID_PRODUCT_ATTRIBUTES[key])


def save_downsampled_file(
id_mark: str,
product: str,
file_name: str | PathLike,
objects: tuple,
files: tuple[list[str | PathLike], str | PathLike],
model_obj: "ModelManager",
obs_obj: "ObservationManager",
model_files: list[str | PathLike],
product_file: str | PathLike,
uuid: UUID,
model_name: str | None = None,
site_name: str | None = None,
) -> None:
"""Saves a standard downsampled day product file.

Args:
id_mark (str): File identifier, format "(product name)_(model name)"
file_name (str): Name of the output file to be generated
objects (tuple): Include two objects: The :class:'ModelManager' and
The :class:'ObservationManager.
files (tuple): Includes two sourcefile group: List of model file(s) used
for processing output file and Cloudnet L2 product file
keep_uuid (bool): If True, keeps the UUID of the old file, if that exists.
Default is False when new UUID is generated.
product (str): Product name, e.g. "cf".
file_name (str): Name of the output file to be generated.
model_obj (ModelManager): Model fields downsampled to the model grid.
obs_obj (ObservationManager): Cloudnet L2 observation product.
model_files (list): Model file(s) used for processing the output file.
product_file (str): Cloudnet L2 product file.
uuid (str): Set specific UUID for the file.
model_name (str): Human-readable model name for plot titles. Falls back
to the model id when not given.
site_name (str): Human-readable site name for the location attribute and
plot subtitle. Falls back to the source file's location
when not given.
"""
obj = objects[0]
dimensions = {"time": len(obj.time), "level": len(obj.data["level"][:])}
with output.init_file(file_name, dimensions, obj.data, uuid) as root_group:
n_levels = model_obj.data[model_obj.keys["height"]][:].shape[-1]
dimensions = {"time": len(model_obj.time), "level": n_levels}
location = site_name or model_obj.dataset.location
with output.init_file(file_name, dimensions, model_obj.data, uuid) as root_group:
_augment_global_attributes(root_group)
root_group.cloudnet_file_type = "l3-" + id_mark.split("_", maxsplit=1)[0]
root_group.title = (
f"Downsampled {id_mark.capitalize().replace('_', ' of ')} "
f"from {obj.dataset.location}"
)
_add_source(root_group, objects, files)
output.copy_global(
obj.dataset, root_group, ("location", "day", "month", "year")
)
if not hasattr(obj.dataset, "day"):
root_group.year, root_group.month, root_group.day = obj.date
output.merge_history(root_group, id_mark, obj)


def add_var2ncfile(obj: ModelManager, file_name: str | PathLike) -> None:
with netCDF4.Dataset(file_name, "r+", format="NETCDF4_CLASSIC") as nc_file:
_write_vars2nc(nc_file, obj.data)


def _write_vars2nc(rootgrp: netCDF4.Dataset, cloudnet_variables: dict) -> None:
"""Iterates over Cloudnet-ME instances and write to given rootgrp."""

def _get_dimensions(array: npt.NDArray) -> tuple:
"""Finds correct dimensions for a variable."""
if utils.isscalar(array):
return ()
variable_size: tuple = ()
file_dims = rootgrp.dimensions
array_dims = array.shape
for length in array_dims:
dim = [key for key in file_dims if file_dims[key].size == length][0] # noqa: RUF015
variable_size = (*variable_size, dim)
return variable_size

for key in cloudnet_variables:
obj = cloudnet_variables[key]
size = _get_dimensions(obj.data)
try:
nc_variable = rootgrp.createVariable(
obj.name,
obj.data_type,
size,
zlib=True,
)
nc_variable[:] = obj.data
for attr in obj.fetch_attributes():
setattr(nc_variable, attr, getattr(obj, attr))
except RuntimeError:
continue
root_group.cloudnet_file_type = "l3-" + product
product_name = PRODUCT_NAMES.get(product, product)
root_group.title = f"Observed and modeled {product_name} over {location}"
root_group.model_id = model_obj.model
root_group.model_name = model_name or model_obj.model
_add_source(root_group, model_obj, obs_obj, model_files, product_file)
output.copy_global(model_obj.dataset, root_group, ("day", "month", "year"))
root_group.location = location
if not hasattr(model_obj.dataset, "day"):
root_group.year, root_group.month, root_group.day = model_obj.date
output.merge_history(root_group, f"L3 {product_name}", model_obj)


def _augment_global_attributes(root_group: netCDF4.Dataset) -> None:
root_group.Conventions = "CF-1.8"


def _add_source(root_ground: netCDF4.Dataset, objects: tuple, files: tuple) -> None:
def _add_source(
root_group: netCDF4.Dataset,
model_obj: "ModelManager",
obs_obj: "ObservationManager",
model_files: list[str | PathLike],
product_file: str | PathLike,
) -> None:
"""Generates source info for multiple files."""
model, obs = objects
model_files, obs_file = files
source = f"Observation file: {os.path.basename(obs_file)}"
source += "\n"
source += f"{model.model} file(s): "
for i, f in enumerate(model_files):
source += f"{os.path.basename(f)}"
if i < len(model_files) - 1:
source += "\n"
root_ground.source = source
root_ground.source_file_uuids = output.get_source_uuids([model, obs])
filenames = [os.path.basename(product_file)] + [
os.path.basename(f) for f in model_files
]
root_group.source = "\n".join(filenames)
root_group.source_file_uuids = output.get_source_uuids([model_obj, obs_obj])


def add_time_attribute(date: datetime) -> dict:
Expand Down
Loading