diff --git a/imap_processing/cli.py b/imap_processing/cli.py index cece0fd92..c827c7500 100644 --- a/imap_processing/cli.py +++ b/imap_processing/cli.py @@ -56,7 +56,7 @@ from imap_processing.glows.l1a.glows_l1a import glows_l1a from imap_processing.glows.l1b.glows_l1b import glows_l1b, glows_l1b_de from imap_processing.glows.l2.glows_l2 import glows_l2 -from imap_processing.hi import hi_l1a, hi_l1b, hi_l1c, hi_l2 +from imap_processing.hi import hi_goodtimes, hi_l1a, hi_l1b, hi_l1c, hi_l2 from imap_processing.hit.l1a.hit_l1a import hit_l1a from imap_processing.hit.l1b.hit_l1b import hit_l1b from imap_processing.hit.l2.hit_l2 import hit_l2 @@ -770,7 +770,7 @@ def do_processing( class Hi(ProcessInstrument): """Process IMAP-Hi.""" - def do_processing( + def do_processing( # noqa: PLR0912 self, dependencies: ProcessingInputCollection ) -> list[xr.Dataset]: """ @@ -789,6 +789,10 @@ def do_processing( print(f"Processing IMAP-Hi {self.data_level}") datasets: list[xr.Dataset] = [] + # Check self.repointing is not None (for mypy type checking) + if self.repointing is None: + raise ValueError("Repointing must be provided for Hi processing.") + if self.data_level == "l1a": science_files = dependencies.get_file_paths(source="hi") if len(science_files) != 1: @@ -813,17 +817,60 @@ def do_processing( load_cdf(l1a_de_file), load_cdf(l1b_hk_file), esa_energies_csv ) elif self.data_level == "l1c": - science_paths = dependencies.get_file_paths(source="hi", data_type="l1b") - if len(science_paths) != 1: - raise ValueError( - f"Expected only one science dependency. Got {science_paths}" + if "goodtimes" in self.descriptor: + # Goodtimes processing + l1b_de_paths = dependencies.get_file_paths( + source="hi", data_type="l1b", descriptor="de" ) - anc_paths = dependencies.get_file_paths(data_type="ancillary") - if len(anc_paths) != 1: - raise ValueError( - f"Expected only one ancillary dependency. Got {anc_paths}" + if not l1b_de_paths: + raise ValueError("No L1B DE files found for goodtimes processing") + + l1b_hk_paths = dependencies.get_file_paths( + source="hi", data_type="l1b", descriptor="hk" ) - datasets = hi_l1c.hi_l1c(load_cdf(science_paths[0]), anc_paths[0]) + if len(l1b_hk_paths) != 1: + raise ValueError( + f"Expected one L1B HK file, got {len(l1b_hk_paths)}" + ) + + cal_prod_paths = dependencies.get_file_paths( + data_type="ancillary", descriptor="cal-prod" + ) + if len(cal_prod_paths) != 1: + raise ValueError( + f"Expected one cal-prod ancillary file, " + f"got {len(cal_prod_paths)}" + ) + + # Load CDFs before passing to hi_goodtimes + l1b_de_datasets = [load_cdf(path) for path in l1b_de_paths] + l1b_hk = load_cdf(l1b_hk_paths[0]) + + output_paths = hi_goodtimes.hi_goodtimes( + l1b_de_datasets, + self.repointing, + l1b_hk, + cal_prod_paths[0], + Path(imap_data_access.config["DATA_DIR"]), + self.start_date, + self.version, + ) + return output_paths + elif "pset" in self.descriptor: + # L1C PSET processing + science_paths = dependencies.get_file_paths( + source="hi", data_type="l1b" + ) + if len(science_paths) != 1: + raise ValueError( + f"Expected only one science dependency. Got {science_paths}" + ) + anc_paths = dependencies.get_file_paths(data_type="ancillary") + if len(anc_paths) != 1: + raise ValueError( + f"Expected only one ancillary dependency. Got {anc_paths}" + ) + datasets = hi_l1c.hi_l1c(load_cdf(science_paths[0]), anc_paths[0]) elif self.data_level == "l2": science_paths = dependencies.get_file_paths(source="hi", data_type="l1c") anc_dependencies = dependencies.get_processing_inputs(data_type="ancillary") diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index b7131b605..2a519bbf7 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -10,8 +10,14 @@ import xarray as xr from scipy.ndimage import convolve1d -from imap_processing.hi.utils import CoincidenceBitmap, HiConstants, parse_sensor_number +from imap_processing.hi.utils import ( + CalibrationProductConfig, + CoincidenceBitmap, + HiConstants, + parse_sensor_number, +) from imap_processing.quality_flags import ImapHiL1bDeFlags +from imap_processing.spice.repoint import get_repoint_data logger = logging.getLogger(__name__) @@ -35,6 +41,294 @@ class CullCode(IntEnum): LOOSE = 1 +def hi_goodtimes( + l1b_de_datasets: list[xr.Dataset], + current_repointing: str, + l1b_hk: xr.Dataset, + cal_product_config_path: Path, + output_dir: Path, + start_date: str, + version: str, +) -> list[Path]: + """ + Generate goodtimes file for IMAP-Hi L1C processing. + + This is the top-level function that orchestrates all goodtimes culling + operations for a single pointing. It applies the following filters in order: + + 1. mark_incomplete_spin_sets - Remove incomplete 8-spin histogram periods + 2. mark_drf_times - Remove times during spacecraft drift restabilization + 3. mark_overflow_packets - Remove times when DE packets overflow + 4. mark_statistical_filter_0 - Detect drastic penetrating background changes + 5. mark_statistical_filter_1 - Detect isotropic count rate increases + 6. mark_statistical_filter_2 - Detect short-lived event pulses + + Parameters + ---------- + l1b_de_datasets : list[xr.Dataset] + L1B DE datasets for surrounding pointings. Typically includes + current plus 3 preceding and 3 following pointings (7 total). + Statistical filters 0 and 1 use all datasets; other filters use + only the current pointing. + current_repointing : str + Repointing identifier for the current pointing (e.g., "repoint00001"). + Used to identify which dataset in l1b_de_datasets is the current one. + l1b_hk : xr.Dataset + L1B housekeeping dataset containing DRF status. + cal_product_config_path : Path + Path to calibration product configuration CSV file. + output_dir : Path + Directory where the goodtimes text file will be written. + start_date : str + Start date in YYYYMMDD format for the output filename. + version : str + Version string (e.g., "v001") for the output filename. + + Returns + ------- + list[Path] + List containing the path to the generated goodtimes text file, + or an empty list if processing cannot proceed yet. + + Notes + ----- + The output filename follows the pattern: + imap_hi_sensor{45|90}-goodtimes_{start_date}_{start_date}_{version}.txt + + TODO: Add repointing to filename when it is allowed in ancillary filenames. + + See IMAP-Hi Algorithm Document Sections 2.2.4 and 2.3.2 for details + on each culling algorithm. + + Processing requires that repointing + 3 has occurred (so that statistical + filters can use surrounding pointings). Due to challenges with dependency + management in the batch starter, it was decided to design the Hi goodtimes + to set the L1B DE dependencies as not required and handle the final logic for + checking L1B DE dependencies in this function. If repointing + 3 has not yet + completed, an empty list is returned. If repointing + 3 has occurred but + not all 7 DE files are available, all times are marked as bad. + """ + logger.info("Starting Hi goodtimes processing") + + # Parse the current repoint ID and check if we can process yet + current_repoint_id = int(current_repointing.replace("repoint", "")) + future_repoint_id = current_repoint_id + 3 + + # Check if the future repointing has finished by checking that the next + # repoint is in the repoint dataframe. + repoint_df = get_repoint_data() + required_repoints_complete = ( + future_repoint_id + 1 in repoint_df["repoint_id"].values + ) + + if not required_repoints_complete: + logger.info( + f"Goodtimes cannot yet be processed for {current_repointing}: " + f"repoint{future_repoint_id:05d} has not yet been completed." + ) + return [] + + # Find the current pointing index in the datasets + current_index = _find_current_pointing_index(l1b_de_datasets, current_repointing) + current_l1b_de = l1b_de_datasets[current_index] + + # Create the goodtimes dataset from the current pointing + goodtimes_ds = create_goodtimes_dataset(current_l1b_de) + + # Check if we have the full set of 7 DE files for nominal processing + if len(l1b_de_datasets) == 7: + _apply_goodtimes_filters( + goodtimes_ds, + l1b_de_datasets, + current_index, + l1b_hk, + cal_product_config_path, + ) + else: + # Incomplete DE file set - mark all times as bad + logger.warning( + f"Incomplete DE file set for {current_repointing}: " + f"expected 7 files, got {len(l1b_de_datasets)}. " + "Marking all times as bad." + ) + goodtimes_ds["cull_flags"][:, :] = CullCode.LOOSE + + # Generate output + output_path = _write_goodtimes_output(goodtimes_ds, output_dir, start_date, version) + return [output_path] + + +def _find_current_pointing_index( + l1b_de_datasets: list[xr.Dataset], + current_repointing: str, +) -> int: + """ + Find the index of the current pointing in the datasets list. + + Parameters + ---------- + l1b_de_datasets : list[xr.Dataset] + L1B DE datasets. + current_repointing : str + Repointing identifier for the current pointing. + + Returns + ------- + current_index : int + Index of the current pointing in the datasets list. + + Raises + ------ + ValueError + If the current repointing is not found in the datasets. + """ + for i, ds in enumerate(l1b_de_datasets): + if ds.attrs.get("Repointing") == current_repointing: + logger.info(f"Current pointing index: {i} of {len(l1b_de_datasets)}") + return i + + raise ValueError( + f"Could not find current repointing {current_repointing} " + f"in L1B DE datasets. Available repointings: " + f"{[ds.attrs.get('Repointing') for ds in l1b_de_datasets]}" + ) + + +def _apply_goodtimes_filters( + goodtimes_ds: xr.Dataset, + l1b_de_datasets: list[xr.Dataset], + current_index: int, + l1b_hk: xr.Dataset, + cal_product_config_path: Path, +) -> None: + """ + Apply all goodtimes culling filters to the dataset. + + Modifies goodtimes_ds in place by applying filters 1-6. + + Parameters + ---------- + goodtimes_ds : xr.Dataset + Goodtimes dataset to modify. + l1b_de_datasets : list[xr.Dataset] + All L1B DE datasets (current + surrounding pointings). + current_index : int + Index of the current pointing in l1b_de_datasets. + l1b_hk : xr.Dataset + L1B housekeeping dataset. + cal_product_config_path : Path + Path to calibration product configuration CSV file. + """ + current_l1b_de = l1b_de_datasets[current_index] + + # Load calibration product config + logger.info(f"Loading cal product config: {cal_product_config_path}") + cal_product_config = CalibrationProductConfig.from_csv(cal_product_config_path) + + # Log initial statistics + stats = goodtimes_ds.goodtimes.get_cull_statistics() + logger.info(f"Initial good bins: {stats['good_bins']}/{stats['total_bins']}") + + # Build set of qualified coincidence types from calibration product config + qualified_coincidence_types: set[int] = set() + for coin_types in cal_product_config["coincidence_type_values"]: + qualified_coincidence_types.update(coin_types) + logger.debug(f"Qualified coincidence types: {qualified_coincidence_types}") + + # === Apply culling filters === + + # 1. Mark incomplete spin sets + logger.info("Applying filter: mark_incomplete_spin_sets") + mark_incomplete_spin_sets(goodtimes_ds, current_l1b_de) + + # 2. Mark DRF times (drift restabilization) + logger.info("Applying filter: mark_drf_times") + mark_drf_times(goodtimes_ds, l1b_hk) + + # 3. Mark overflow packets + logger.info("Applying filter: mark_overflow_packets") + mark_overflow_packets(goodtimes_ds, current_l1b_de, cal_product_config) + + # 4. Statistical Filter 0 - drastic background changes + logger.info("Applying filter: mark_statistical_filter_0") + try: + mark_statistical_filter_0(goodtimes_ds, l1b_de_datasets, current_index) + except ValueError as e: + logger.warning(f"Skipping Statistical Filter 0: {e}") + + # 5. Statistical Filter 1 - isotropic count rate increases + logger.info("Applying filter: mark_statistical_filter_1") + try: + mark_statistical_filter_1( + goodtimes_ds, + l1b_de_datasets, + current_index, + qualified_coincidence_types, + ) + except ValueError as e: + logger.warning(f"Skipping Statistical Filter 1: {e}") + + # 6. Statistical Filter 2 - short-lived event pulses + logger.info("Applying filter: mark_statistical_filter_2") + mark_statistical_filter_2( + goodtimes_ds, + current_l1b_de, + qualified_coincidence_types, + ) + + +def _write_goodtimes_output( + goodtimes_ds: xr.Dataset, + output_dir: Path, + start_date: str, + version: str, +) -> Path: + """ + Write goodtimes dataset to output file. + + Parameters + ---------- + goodtimes_ds : xr.Dataset + Goodtimes dataset to write. + output_dir : Path + Directory where the output file will be written. + start_date : str + Start date in YYYYMMDD format. + version : str + Version string (e.g., "v001"). + + Returns + ------- + Path + Path to the generated goodtimes text file. + """ + # Log final statistics + stats = goodtimes_ds.goodtimes.get_cull_statistics() + logger.info( + f"Final statistics: {stats['good_bins']}/{stats['total_bins']} good " + f"({stats['fraction_good'] * 100:.1f}%)" + ) + if stats["cull_code_counts"]: + logger.info(f"Cull code counts: {stats['cull_code_counts']}") + + # Generate output filename + # Pattern: imap_hi_{sensor}-goodtimes_{YYYYMMDD}_{YYYYMMDD}_{version}.txt + # TODO: Add repointing to filename when it is allowed in ancillary filenames. + # Pattern should be: + # imap_hi_{sensor}-goodtimes_{YYYYMMDD}_{YYYYMMDD}_{repointing}_{version}.txt + sensor = goodtimes_ds.attrs["sensor"] + output_filename = ( + f"imap_hi_{sensor}-goodtimes_{start_date}_{start_date}_{version}.txt" + ) + output_path = output_dir / output_filename + + # Write goodtimes to text file + goodtimes_ds.goodtimes.write_txt(output_path) + + logger.info(f"Hi goodtimes processing complete: {output_path}") + return output_path + + def create_goodtimes_dataset(l1b_de: xr.Dataset) -> xr.Dataset: """ Create goodtimes dataset from L1B Direct Event data. @@ -100,7 +394,7 @@ def create_goodtimes_dataset(l1b_de: xr.Dataset) -> xr.Dataset: f"attribute: {l1b_de.attrs['Repointing']}" ) attrs = { - "sensor": f"Hi{sensor_number}", + "sensor": f"sensor{sensor_number}", "pointing": int(match["pointing_num"]), } diff --git a/imap_processing/tests/hi/test_hi_goodtimes.py b/imap_processing/tests/hi/test_hi_goodtimes.py index c2c8c8acb..1b28eaf76 100644 --- a/imap_processing/tests/hi/test_hi_goodtimes.py +++ b/imap_processing/tests/hi/test_hi_goodtimes.py @@ -1,5 +1,7 @@ """Test coverage for imap_processing.hi.hi_goodtimes.py""" +from unittest.mock import MagicMock, patch + import numpy as np import pandas as pd import pytest @@ -9,15 +11,19 @@ INTERVAL_DTYPE, CullCode, _add_sweep_indices, + _apply_goodtimes_filters, _build_per_sweep_datasets, _compute_bins_for_cluster, _compute_median_and_sigma_per_esa, _compute_normalized_counts_per_sweep, _compute_qualified_counts_per_sweep, + _find_current_pointing_index, _find_event_clusters, _get_sweep_indices, _identify_cull_pattern, + _write_goodtimes_output, create_goodtimes_dataset, + hi_goodtimes, mark_drf_times, mark_incomplete_spin_sets, mark_overflow_packets, @@ -140,7 +146,7 @@ def test_from_l1b_de_esa_step_preserved(self, mock_l1b_de, goodtimes_instance): def test_from_l1b_de_attributes(self, goodtimes_instance): """Test that attributes are set correctly.""" - assert goodtimes_instance.attrs["sensor"] == "Hi45" + assert goodtimes_instance.attrs["sensor"] == "sensor45" assert goodtimes_instance.attrs["pointing"] == 42 @@ -358,7 +364,7 @@ def test_get_good_intervals_empty(self): "esa_step": xr.DataArray(np.array([], dtype=np.uint8), dims=["met"]), }, coords={"met": np.array([]), "spin_bin": np.arange(90)}, - attrs={"sensor": "Hi45", "pointing": 0}, + attrs={"sensor": "sensor45", "pointing": 0}, ) intervals = gt.goodtimes.get_good_intervals() @@ -450,7 +456,7 @@ def test_to_txt_format(self, goodtimes_instance, tmp_path): parts = lines[0].strip().split() assert len(parts) == 7 assert parts[0] == "00042" # pointing - assert parts[5] == "Hi45" # sensor + assert parts[5] == "sensor45" # sensor def test_to_txt_values(self, goodtimes_instance, tmp_path): """Test the values in the output file.""" @@ -468,7 +474,7 @@ def test_to_txt_values(self, goodtimes_instance, tmp_path): assert int(met_end) == int(goodtimes_instance.coords["met"].values[0]) assert int(bin_low) == 0 assert int(bin_high) == 89 - assert sensor == "Hi45" + assert sensor == "sensor45" assert int(esa_step) == goodtimes_instance["esa_step"].values[0] def test_to_txt_with_culled_bins(self, goodtimes_instance, tmp_path): @@ -882,7 +888,7 @@ def goodtimes_for_drf(self): "esa_step": xr.DataArray(np.ones(n_mets, dtype=np.uint8), dims=["met"]), }, coords={"met": met_values, "spin_bin": np.arange(90)}, - attrs={"sensor": "Hi45", "pointing": 1}, + attrs={"sensor": "sensor45", "pointing": 1}, ) return gt @@ -1098,7 +1104,7 @@ def test_mark_drf_times_transition_at_start(self): ), }, coords={"met": met_values, "spin_bin": np.arange(90)}, - attrs={"sensor": "Hi45", "pointing": 1}, + attrs={"sensor": "sensor45", "pointing": 1}, ) # HK with DRF active for first 30 samples, then transition @@ -1145,7 +1151,7 @@ def test_mark_drf_times_transition_at_end(self): ), }, coords={"met": met_values, "spin_bin": np.arange(90)}, - attrs={"sensor": "Hi45", "pointing": 1}, + attrs={"sensor": "sensor45", "pointing": 1}, ) # HK with DRF becoming active mid-way, then transition at end @@ -1216,7 +1222,7 @@ def mock_goodtimes(self): ), }, coords={"met": met_values, "spin_bin": np.arange(90)}, - attrs={"sensor": "Hi45", "pointing": 1}, + attrs={"sensor": "sensor45", "pointing": 1}, ) def test_no_full_packets(self, mock_goodtimes, mock_config_df): @@ -1687,7 +1693,7 @@ def goodtimes_for_filter(self): ), }, coords={"met": met_values, "spin_bin": np.arange(90)}, - attrs={"sensor": "Hi45", "pointing": 1}, + attrs={"sensor": "sensor45", "pointing": 1}, ) return gt @@ -2336,7 +2342,7 @@ def goodtimes_for_filter1(self): ), }, coords={"met": met_values, "spin_bin": np.arange(90)}, - attrs={"sensor": "Hi45", "pointing": 1}, + attrs={"sensor": "sensor45", "pointing": 1}, ) return gt @@ -2627,7 +2633,7 @@ def goodtimes_for_filter2(self): "met": met_values, "spin_bin": np.arange(90), }, - attrs={"sensor": "Hi45", "pointing": 1}, + attrs={"sensor": "sensor45", "pointing": 1}, ) return ds @@ -2952,3 +2958,454 @@ def test_custom_parameters(self, goodtimes_for_filter2): cull_flags = goodtimes_for_filter2["cull_flags"].sel(met=1000.0).values assert np.all(cull_flags[39:45] == CullCode.LOOSE) + + +class TestFindCurrentPointingIndex: + """Test suite for _find_current_pointing_index helper function.""" + + def test_finds_current_index(self): + """Test that current index is found correctly.""" + ds1 = MagicMock() + ds1.attrs = {"Repointing": "repoint00001"} + ds2 = MagicMock() + ds2.attrs = {"Repointing": "repoint00002"} + ds3 = MagicMock() + ds3.attrs = {"Repointing": "repoint00003"} + + datasets = [ds1, ds2, ds3] + current_index = _find_current_pointing_index(datasets, "repoint00002") + + assert current_index == 1 + + def test_finds_first_matching_repointing(self): + """Test that the first matching repointing is returned.""" + ds1 = MagicMock() + ds1.attrs = {"Repointing": "repoint00005"} + ds2 = MagicMock() + ds2.attrs = {"Repointing": "repoint00005"} + + datasets = [ds1, ds2] + current_index = _find_current_pointing_index(datasets, "repoint00005") + + assert current_index == 0 + + def test_raises_when_repointing_not_found(self): + """Test that ValueError is raised when repointing not found.""" + ds1 = MagicMock() + ds1.attrs = {"Repointing": "repoint00001"} + ds2 = MagicMock() + ds2.attrs = {"Repointing": "repoint00002"} + + datasets = [ds1, ds2] + with pytest.raises(ValueError, match="Could not find current repointing"): + _find_current_pointing_index(datasets, "repoint00099") + + +class TestWriteGoodtimesOutput: + """Test suite for _write_goodtimes_output helper function.""" + + def test_generates_correct_filename_sensor45(self, tmp_path): + """Test filename generation for sensor45.""" + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor45"} + mock_goodtimes.goodtimes.get_cull_statistics.return_value = { + "good_bins": 100, + "total_bins": 100, + "fraction_good": 1.0, + "cull_code_counts": {}, + } + + output_path = _write_goodtimes_output( + mock_goodtimes, + tmp_path, + start_date="20260115", + version="v001", + ) + + assert ( + output_path.name == "imap_hi_sensor45-goodtimes_20260115_20260115_v001.txt" + ) + mock_goodtimes.goodtimes.write_txt.assert_called_once_with(output_path) + + def test_generates_correct_filename_sensor90(self, tmp_path): + """Test filename generation for sensor90.""" + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor90"} + mock_goodtimes.goodtimes.get_cull_statistics.return_value = { + "good_bins": 50, + "total_bins": 100, + "fraction_good": 0.5, + "cull_code_counts": {1: 50}, + } + + output_path = _write_goodtimes_output( + mock_goodtimes, + tmp_path, + start_date="20260201", + version="v002", + ) + + assert ( + output_path.name == "imap_hi_sensor90-goodtimes_20260201_20260201_v002.txt" + ) + + def test_returns_path_in_output_dir(self, tmp_path): + """Test that returned path is in the output directory.""" + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor45"} + mock_goodtimes.goodtimes.get_cull_statistics.return_value = { + "good_bins": 100, + "total_bins": 100, + "fraction_good": 1.0, + "cull_code_counts": {}, + } + + output_path = _write_goodtimes_output( + mock_goodtimes, + tmp_path, + start_date="20260101", + version="v001", + ) + + assert output_path.parent == tmp_path + assert output_path.suffix == ".txt" + + def test_calls_write_txt(self, tmp_path): + """Test that write_txt accessor is called with correct path.""" + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor45"} + mock_goodtimes.goodtimes.get_cull_statistics.return_value = { + "good_bins": 100, + "total_bins": 100, + "fraction_good": 1.0, + "cull_code_counts": {}, + } + + output_path = _write_goodtimes_output( + mock_goodtimes, + tmp_path, + start_date="20260101", + version="v001", + ) + + mock_goodtimes.goodtimes.write_txt.assert_called_once_with(output_path) + + +class TestApplyGoodtimesFilters: + """Test suite for _apply_goodtimes_filters helper function.""" + + def test_loads_cal_config(self, tmp_path): + """Test that cal config is loaded.""" + mock_goodtimes = MagicMock() + mock_goodtimes.goodtimes.get_cull_statistics.return_value = { + "good_bins": 100, + "total_bins": 100, + } + mock_l1b_de = MagicMock() + mock_hk = MagicMock() + mock_cal = {"coincidence_type_values": [{12}]} + + cal_path = tmp_path / "cal.csv" + + with ( + patch( + "imap_processing.hi.utils.CalibrationProductConfig.from_csv" + ) as mock_cal_load, + patch("imap_processing.hi.hi_goodtimes.mark_incomplete_spin_sets"), + patch("imap_processing.hi.hi_goodtimes.mark_drf_times"), + patch("imap_processing.hi.hi_goodtimes.mark_overflow_packets"), + patch("imap_processing.hi.hi_goodtimes.mark_statistical_filter_0"), + patch("imap_processing.hi.hi_goodtimes.mark_statistical_filter_1"), + patch("imap_processing.hi.hi_goodtimes.mark_statistical_filter_2"), + ): + mock_cal_load.return_value = mock_cal + + _apply_goodtimes_filters( + mock_goodtimes, + [mock_l1b_de], + current_index=0, + l1b_hk=mock_hk, + cal_product_config_path=cal_path, + ) + + mock_cal_load.assert_called_once_with(cal_path) + + def test_calls_all_filters(self, tmp_path): + """Test that all 6 filters are called.""" + mock_goodtimes = MagicMock() + mock_goodtimes.goodtimes.get_cull_statistics.return_value = { + "good_bins": 100, + "total_bins": 100, + } + mock_l1b_de = MagicMock() + mock_hk = MagicMock() + mock_cal = {"coincidence_type_values": [{12}]} + + with ( + patch( + "imap_processing.hi.utils.CalibrationProductConfig.from_csv", + return_value=mock_cal, + ), + patch( + "imap_processing.hi.hi_goodtimes.mark_incomplete_spin_sets" + ) as mock_f1, + patch("imap_processing.hi.hi_goodtimes.mark_drf_times") as mock_f2, + patch("imap_processing.hi.hi_goodtimes.mark_overflow_packets") as mock_f3, + patch( + "imap_processing.hi.hi_goodtimes.mark_statistical_filter_0" + ) as mock_f4, + patch( + "imap_processing.hi.hi_goodtimes.mark_statistical_filter_1" + ) as mock_f5, + patch( + "imap_processing.hi.hi_goodtimes.mark_statistical_filter_2" + ) as mock_f6, + ): + _apply_goodtimes_filters( + mock_goodtimes, + [mock_l1b_de], + current_index=0, + l1b_hk=mock_hk, + cal_product_config_path=tmp_path / "cal.csv", + ) + + mock_f1.assert_called_once() + mock_f2.assert_called_once() + mock_f3.assert_called_once() + mock_f4.assert_called_once() + mock_f5.assert_called_once() + mock_f6.assert_called_once() + + def test_handles_statistical_filter_errors(self, tmp_path): + """Test that ValueError from statistical filters is caught.""" + mock_goodtimes = MagicMock() + mock_goodtimes.goodtimes.get_cull_statistics.return_value = { + "good_bins": 100, + "total_bins": 100, + } + mock_l1b_de = MagicMock() + mock_hk = MagicMock() + mock_cal = {"coincidence_type_values": [{12}]} + + with ( + patch( + "imap_processing.hi.utils.CalibrationProductConfig.from_csv", + return_value=mock_cal, + ), + patch("imap_processing.hi.hi_goodtimes.mark_incomplete_spin_sets"), + patch("imap_processing.hi.hi_goodtimes.mark_drf_times"), + patch("imap_processing.hi.hi_goodtimes.mark_overflow_packets"), + patch( + "imap_processing.hi.hi_goodtimes.mark_statistical_filter_0", + side_effect=ValueError("test"), + ), + patch( + "imap_processing.hi.hi_goodtimes.mark_statistical_filter_1", + side_effect=ValueError("test"), + ), + patch("imap_processing.hi.hi_goodtimes.mark_statistical_filter_2"), + ): + # Should not raise - errors are caught and logged + _apply_goodtimes_filters( + mock_goodtimes, + [mock_l1b_de], + current_index=0, + l1b_hk=mock_hk, + cal_product_config_path=tmp_path / "cal.csv", + ) + + +class TestHiGoodtimes: + """Test suite for hi_goodtimes top-level function.""" + + def test_returns_empty_list_when_repoint_not_complete(self, tmp_path): + """Test that empty list is returned when repoint+3 has not occurred.""" + mock_repoint_df = pd.DataFrame( + { + "repoint_id": [1, 2, 3], + } + ) + mock_de = MagicMock() + mock_hk = MagicMock() + + with patch( + "imap_processing.hi.hi_goodtimes.get_repoint_data" + ) as mock_get_repoint: + mock_get_repoint.return_value = mock_repoint_df + + result = hi_goodtimes( + l1b_de_datasets=[mock_de], + current_repointing="repoint00001", + l1b_hk=mock_hk, + cal_product_config_path=tmp_path / "cal.csv", + output_dir=tmp_path, + start_date="20260101", + version="v001", + ) + + assert result == [] + + def test_calls_find_current_index_when_repoint_complete(self, tmp_path): + """Test that _find_current_pointing_index is called when repoint passes.""" + mock_repoint_df = pd.DataFrame({"repoint_id": list(range(1, 10))}) + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor45"} + mock_goodtimes.__getitem__ = MagicMock() + mock_datasets = [MagicMock() for _ in range(7)] + mock_hk = MagicMock() + + with ( + patch( + "imap_processing.hi.hi_goodtimes.get_repoint_data", + return_value=mock_repoint_df, + ), + patch( + "imap_processing.hi.hi_goodtimes._find_current_pointing_index", + return_value=3, + ) as mock_find, + patch( + "imap_processing.hi.hi_goodtimes.create_goodtimes_dataset", + return_value=mock_goodtimes, + ), + patch("imap_processing.hi.hi_goodtimes._apply_goodtimes_filters"), + patch( + "imap_processing.hi.hi_goodtimes._write_goodtimes_output", + return_value=tmp_path / "out.txt", + ), + ): + hi_goodtimes( + l1b_de_datasets=mock_datasets, + current_repointing="repoint00004", + l1b_hk=mock_hk, + cal_product_config_path=tmp_path / "cal.csv", + output_dir=tmp_path, + start_date="20260101", + version="v001", + ) + + mock_find.assert_called_once_with(mock_datasets, "repoint00004") + + def test_marks_all_bad_when_incomplete_de_set(self, tmp_path): + """Test that cull_flags are set when DE set is incomplete.""" + mock_repoint_df = pd.DataFrame({"repoint_id": list(range(1, 10))}) + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor45"} + mock_cull_flags = MagicMock() + mock_goodtimes.__getitem__ = MagicMock(return_value=mock_cull_flags) + mock_datasets = [MagicMock() for _ in range(3)] # Less than 7 + mock_hk = MagicMock() + + with ( + patch( + "imap_processing.hi.hi_goodtimes.get_repoint_data", + return_value=mock_repoint_df, + ), + patch( + "imap_processing.hi.hi_goodtimes._find_current_pointing_index", + return_value=0, + ), + patch( + "imap_processing.hi.hi_goodtimes.create_goodtimes_dataset", + return_value=mock_goodtimes, + ), + patch( + "imap_processing.hi.hi_goodtimes._write_goodtimes_output", + return_value=tmp_path / "out.txt", + ), + ): + hi_goodtimes( + l1b_de_datasets=mock_datasets, + current_repointing="repoint00001", + l1b_hk=mock_hk, + cal_product_config_path=tmp_path / "cal.csv", + output_dir=tmp_path, + start_date="20260101", + version="v001", + ) + + # Verify cull_flags were set to LOOSE (all bad) + mock_goodtimes.__getitem__.assert_called_with("cull_flags") + + def test_calls_apply_filters_when_full_de_set(self, tmp_path): + """Test that _apply_goodtimes_filters is called with 7 DE datasets.""" + mock_repoint_df = pd.DataFrame({"repoint_id": list(range(1, 10))}) + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor45"} + mock_datasets = [MagicMock() for _ in range(7)] + mock_hk = MagicMock() + + with ( + patch( + "imap_processing.hi.hi_goodtimes.get_repoint_data", + return_value=mock_repoint_df, + ), + patch( + "imap_processing.hi.hi_goodtimes._find_current_pointing_index", + return_value=3, + ), + patch( + "imap_processing.hi.hi_goodtimes.create_goodtimes_dataset", + return_value=mock_goodtimes, + ), + patch( + "imap_processing.hi.hi_goodtimes._apply_goodtimes_filters" + ) as mock_apply, + patch( + "imap_processing.hi.hi_goodtimes._write_goodtimes_output", + return_value=tmp_path / "out.txt", + ), + ): + hi_goodtimes( + l1b_de_datasets=mock_datasets, + current_repointing="repoint00004", + l1b_hk=mock_hk, + cal_product_config_path=tmp_path / "cal.csv", + output_dir=tmp_path, + start_date="20260101", + version="v001", + ) + + mock_apply.assert_called_once() + + def test_calls_write_output(self, tmp_path): + """Test that _write_goodtimes_output is called.""" + mock_repoint_df = pd.DataFrame({"repoint_id": list(range(1, 10))}) + mock_goodtimes = MagicMock() + mock_goodtimes.attrs = {"sensor": "sensor45"} + expected_path = tmp_path / "output.txt" + mock_datasets = [MagicMock() for _ in range(7)] + mock_hk = MagicMock() + + with ( + patch( + "imap_processing.hi.hi_goodtimes.get_repoint_data", + return_value=mock_repoint_df, + ), + patch( + "imap_processing.hi.hi_goodtimes._find_current_pointing_index", + return_value=3, + ), + patch( + "imap_processing.hi.hi_goodtimes.create_goodtimes_dataset", + return_value=mock_goodtimes, + ), + patch("imap_processing.hi.hi_goodtimes._apply_goodtimes_filters"), + patch( + "imap_processing.hi.hi_goodtimes._write_goodtimes_output", + return_value=expected_path, + ) as mock_write, + ): + result = hi_goodtimes( + l1b_de_datasets=mock_datasets, + current_repointing="repoint00004", + l1b_hk=mock_hk, + cal_product_config_path=tmp_path / "cal.csv", + output_dir=tmp_path, + start_date="20260115", + version="v002", + ) + + mock_write.assert_called_once_with( + mock_goodtimes, tmp_path, "20260115", "v002" + ) + assert result == [expected_path] diff --git a/imap_processing/tests/test_cli.py b/imap_processing/tests/test_cli.py index b4040604f..ec7cc6c3c 100644 --- a/imap_processing/tests/test_cli.py +++ b/imap_processing/tests/test_cli.py @@ -281,11 +281,12 @@ def test_post_processing_returns_empty_list_if_invoked_with_no_data( @pytest.mark.parametrize( - "data_level, function_name, science_input, anc_input, n_prods", + "data_level, data_descriptor, function_name, science_input, anc_input, n_prods", [ - ("l1a", "hi_l1a", ["imap_hi_l0_raw_20231212_v001.pkts"], [], 2), + ("l1a", "sci", "hi_l1a", ["imap_hi_l0_raw_20231212_v001.pkts"], [], 2), ( "l1b", + "90sensor-de", "annotate_direct_events", [ "imap_hi_l1a_90sensor-de_20241105_v001.cdf", @@ -294,9 +295,10 @@ def test_post_processing_returns_empty_list_if_invoked_with_no_data( ["imap_hi_90sensor-esa-energies_20240101_v001.csv"], 1, ), - ("l1b", "housekeeping", ["imap_hi_l0_raw_20231212_v001.pkts"], [], 2), + ("l1b", "sci", "housekeeping", ["imap_hi_l0_raw_20231212_v001.pkts"], [], 2), ( "l1c", + "45sensor-pset", "hi_l1c", ["imap_hi_l1b_45sensor-de_20250415_v001.cdf"], ["imap_hi_calibration-prod-config_20240101_v001.csv"], @@ -304,6 +306,7 @@ def test_post_processing_returns_empty_list_if_invoked_with_no_data( ), ( "l2", + "h90-ena-h-sf-nsp-full-hae-4deg-3mo", "hi_l2", [ "imap_hi_l1c_90sensor-pset_20250415_v001.cdf", @@ -321,6 +324,7 @@ def test_post_processing_returns_empty_list_if_invoked_with_no_data( def test_hi( mock_instrument_dependencies, data_level, + data_descriptor, function_name, science_input, anc_input, @@ -346,7 +350,13 @@ def test_hi( '[{"type": "science","files": ["imap_hi_l0_raw_20231212_v001.pkts"]}]' ) instrument = Hi( - data_level, "sci", dependency_str, "20231212", "20231213", "v005", False + data_level, + data_descriptor, + dependency_str, + "20231212", + "repoint00001", + "v005", + False, ) instrument.process() @@ -354,6 +364,79 @@ def test_hi( assert mock_instrument_dependencies["mock_write_cdf"].call_count == n_prods +@mock.patch("imap_processing.cli.hi_goodtimes.hi_goodtimes", autospec=True) +def test_hi_l1c_goodtimes(mock_hi_goodtimes, mock_instrument_dependencies): + """Test coverage for cli.Hi class with l1c goodtimes descriptor""" + mocks = mock_instrument_dependencies + # goodtimes returns paths directly, not datasets + expected_output_path = Path("/path/to/goodtimes_output.txt") + mock_hi_goodtimes.return_value = [expected_output_path] + + # Mock load_cdf to return xr.Dataset objects + mock_de_dataset = xr.Dataset() + mock_hk_dataset = xr.Dataset() + # 7 DE files + 1 HK file = 8 total calls to load_cdf + mocks["mock_load_cdf"].side_effect = [ + mock_de_dataset, + mock_de_dataset, + mock_de_dataset, + mock_de_dataset, + mock_de_dataset, + mock_de_dataset, + mock_de_dataset, + mock_hk_dataset, + ] + + # Set up the input collection with required dependencies + input_collection = ProcessingInputCollection( + ScienceInput("imap_hi_l1b_45sensor-de_20250415-repoint00001_v001.cdf"), + ScienceInput("imap_hi_l1b_45sensor-de_20250415-repoint00002_v001.cdf"), + ScienceInput("imap_hi_l1b_45sensor-de_20250415-repoint00003_v001.cdf"), + ScienceInput("imap_hi_l1b_45sensor-de_20250415-repoint00004_v001.cdf"), + ScienceInput("imap_hi_l1b_45sensor-de_20250415-repoint00005_v001.cdf"), + ScienceInput("imap_hi_l1b_45sensor-de_20250415-repoint00006_v001.cdf"), + ScienceInput("imap_hi_l1b_45sensor-de_20250415-repoint00007_v001.cdf"), + ScienceInput("imap_hi_l1b_45sensor-hk_20250415-repoint00004_v001.cdf"), + AncillaryInput("imap_hi_45sensor-cal-prod_20240101_v001.json"), + ) + mocks["mock_pre_processing"].return_value = input_collection + + dependency_str = input_collection.serialize() + instrument = Hi( + "l1c", + "goodtimes", + dependency_str, + "20250415", + "repoint00004", + "v005", + False, + ) + + instrument.process() + + # Verify load_cdf was called for DE files and HK file + assert mocks["mock_load_cdf"].call_count == 8 # 7 DE + 1 HK + + # Verify hi_goodtimes was called with correct arguments + assert mock_hi_goodtimes.call_count == 1 + call_args = mock_hi_goodtimes.call_args + + # Check that datasets (not paths) were passed for l1b_de_datasets and l1b_hk + assert isinstance(call_args.args[0], list) # l1b_de_datasets is a list + assert len(call_args.args[0]) == 7 # 7 DE datasets + assert isinstance(call_args.args[2], xr.Dataset) # l1b_hk is a dataset + + # Check that start_date and version were passed correctly + assert call_args.args[5] == "20250415" # start_date + assert call_args.args[6] == "v005" # version + assert call_args.args[1] == "repoint00004" # current_repointing + + # goodtimes returns paths directly, so write_cdf should not be called for + # the goodtimes output itself, but post_processing handles Path objects + # by just passing them through for upload + assert mocks["mock_write_cdf"].call_count == 0 + + @mock.patch("imap_processing.cli.lo_l2.lo_l2", autospec=True) def test_lo_l2(mock_lo_l2, mock_instrument_dependencies): mocks = mock_instrument_dependencies