From 0b215dc5ce3350a283b0330b2d710cba8d63c377 Mon Sep 17 00:00:00 2001 From: Tim Plummer Date: Mon, 23 Feb 2026 16:58:14 -0700 Subject: [PATCH 1/6] Add statistical filter 2 config values to utils.HiConstants Move DE_CLOCK_TICK constants inot HiConstants --- imap_processing/hi/hi_l1a.py | 13 ++--------- imap_processing/hi/hi_l1b.py | 8 ++++--- imap_processing/hi/hi_l1c.py | 13 +++++------ imap_processing/hi/utils.py | 29 +++++++++++++++++++++++++ imap_processing/tests/hi/test_hi_l1c.py | 9 ++++---- 5 files changed, 45 insertions(+), 27 deletions(-) diff --git a/imap_processing/hi/hi_l1a.py b/imap_processing/hi/hi_l1a.py index 1015ee4490..72a7e819cc 100644 --- a/imap_processing/hi/hi_l1a.py +++ b/imap_processing/hi/hi_l1a.py @@ -11,19 +11,10 @@ from imap_processing import imap_module_directory from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes -from imap_processing.hi.utils import HIAPID +from imap_processing.hi.utils import HIAPID, HiConstants from imap_processing.spice.time import met_to_ttj2000ns from imap_processing.utils import packet_file_to_datasets -# TODO: read DE_CLOCK_TICK_US from -# instrument status summary later. This value -# is rarely change but want to be able to change -# it if needed. It stores information about how -# fast the time was ticking. It is in microseconds. -DE_CLOCK_TICK_US = 1999 -DE_CLOCK_TICK_S = DE_CLOCK_TICK_US / 1e6 -HALF_CLOCK_TICK_S = DE_CLOCK_TICK_S / 2 - MILLISECOND_TO_S = 1e-3 # define the names of the 24 counter arrays @@ -293,7 +284,7 @@ def create_de_dataset(de_data_dict: dict[str, npt.ArrayLike]) -> xr.Dataset: # See Hi Algorithm Document section 2.2.5 event_met_array = np.array( meta_event_met[de_data_dict["ccsds_index"]] - + np.array(de_data_dict["de_tag"]) * DE_CLOCK_TICK_S, + + np.array(de_data_dict["de_tag"]) * HiConstants.DE_CLOCK_TICK_S, dtype=event_met_dtype, ) diff --git a/imap_processing/hi/hi_l1b.py b/imap_processing/hi/hi_l1b.py index 43feae00e7..a7a648eb53 100644 --- a/imap_processing/hi/hi_l1b.py +++ b/imap_processing/hi/hi_l1b.py @@ -11,7 +11,7 @@ from imap_processing import imap_module_directory from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes from imap_processing.cdf.utils import parse_filename_like -from imap_processing.hi.hi_l1a import HALF_CLOCK_TICK_S, MILLISECOND_TO_S +from imap_processing.hi.hi_l1a import MILLISECOND_TO_S from imap_processing.hi.utils import ( HIAPID, CoincidenceBitmap, @@ -335,7 +335,7 @@ def de_nominal_bin_and_spin_phase(dataset: xr.Dataset) -> dict[str, xr.DataArray # be binned into in the histogram packet. The Hi histogram data is binned by # spacecraft spin-phase, not instrument spin-phase, so the same is done here. # We have to add 1/2 clock tick to MET time before getting spin phase - met_seconds = dataset.event_met.values + HALF_CLOCK_TICK_S + met_seconds = dataset.event_met.values + HiConstants.HALF_CLOCK_TICK_S imap_spin_phase = get_spacecraft_spin_phase(met_seconds) new_vars["nominal_bin"].values = np.asarray(imap_spin_phase * 360 / 4).astype( np.uint8 @@ -380,7 +380,9 @@ def compute_hae_coordinates(dataset: xr.Dataset) -> dict[str, xr.DataArray]: # Per Section 2.2.5 of Algorithm Document, add 1/2 of tick duration # to MET before computing pointing. - sclk_ticks = met_to_sclkticks(dataset.event_met.values + HALF_CLOCK_TICK_S) + sclk_ticks = met_to_sclkticks( + dataset.event_met.values + HiConstants.HALF_CLOCK_TICK_S + ) et = sct_to_et(sclk_ticks) sensor_number = parse_sensor_number(dataset.attrs["Logical_source"]) # TODO: For now, we are using SPICE to compute the look direction for each diff --git a/imap_processing/hi/hi_l1c.py b/imap_processing/hi/hi_l1c.py index c9473ddec8..c4802c2352 100644 --- a/imap_processing/hi/hi_l1c.py +++ b/imap_processing/hi/hi_l1c.py @@ -14,12 +14,9 @@ from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes from imap_processing.cdf.utils import parse_filename_like -from imap_processing.hi.hi_l1a import ( - DE_CLOCK_TICK_S, - HALF_CLOCK_TICK_S, -) from imap_processing.hi.utils import ( CalibrationProductConfig, + HiConstants, create_dataset_variables, full_dataarray, parse_sensor_number, @@ -557,7 +554,7 @@ def pset_exposure( # for a given clock tick, add 1/2 clock tick and compute spin-phase. spin_phases = np.atleast_1d( get_instrument_spin_phase( - clock_tick_mets + HALF_CLOCK_TICK_S, + clock_tick_mets + HiConstants.HALF_CLOCK_TICK_S, SpiceFrame[f"IMAP_HI_{sensor_number}"], ) ) @@ -581,7 +578,7 @@ def pset_exposure( exposure_var["exposure_times"].values[:, i_esa] += new_exposure_times # Convert exposure clock ticks to seconds - exposure_var["exposure_times"].values *= DE_CLOCK_TICK_S + exposure_var["exposure_times"].values *= HiConstants.DE_CLOCK_TICK_S return exposure_var @@ -693,7 +690,7 @@ def get_de_clock_ticks_for_esa_step( clock_tick_mets = np.arange( spin_start_mets[end_time_ind - 8], spin_start_mets[end_time_ind], - DE_CLOCK_TICK_S, + HiConstants.DE_CLOCK_TICK_S, dtype=float, ) # The final clock-tick bin has less exposure time because the next spin @@ -705,7 +702,7 @@ def get_de_clock_ticks_for_esa_step( clock_tick_weights = np.ones_like(clock_tick_mets, dtype=float) clock_tick_weights[-1] = ( spin_start_mets[end_time_ind] - clock_tick_mets[-1] - ) / DE_CLOCK_TICK_S + ) / HiConstants.DE_CLOCK_TICK_S return clock_tick_mets, clock_tick_weights diff --git a/imap_processing/hi/utils.py b/imap_processing/hi/utils.py index a547a67b10..a8152ae228 100644 --- a/imap_processing/hi/utils.py +++ b/imap_processing/hi/utils.py @@ -51,6 +51,16 @@ class HiConstants: Attributes ---------- + DE_CLOCK_TICK_US : int + Duration of Direct Event clock tick in microseconds. This is the time + resolution of the Direct Event time tags. See IMAP-Hi Algorithm Document + Section 2.2.5 Annotated Direct Events for more details. + DE_CLOCK_TICK_S : float + Duration of Direct Event clock tick in seconds. + This is derived from DE_CLOCK_TICK_US. + HALF_CLOCK_TICK_S : float + Half of the Direct Event clock tick duration in seconds. This is derived + from DE_CLOCK_TICK_S. TOF1_TICK_DUR : int Duration of Time-of-Flight 1 clock tick in nanoseconds. TOF2_TICK_DUR : int @@ -76,8 +86,24 @@ class HiConstants: Sigma multiplier for extreme outlier check in Filter 1. STAT_FILTER_1_MIN_CONSECUTIVE : int Minimum consecutive intervals above threshold in Filter 1. + STAT_FILTER_2_MIN_EVENTS : int + Minimum events to form a pulse cluster in Filter 2. + STAT_FILTER_2_MAX_TIME_DELTA : float + Maximum time span in seconds for events to be considered clustered + in Filter 2. + STAT_FILTER_2_BIN_PADDING : int + Number of bins to add on each side of pulse angle range in Filter 2. """ + # TODO: read DE_CLOCK_TICK_US from + # instrument status summary later. This value + # is rarely change but want to be able to change + # it if needed. It stores information about how + # fast the time was ticking. It is in microseconds. + DE_CLOCK_TICK_US = 1999 + DE_CLOCK_TICK_S = DE_CLOCK_TICK_US / 1e6 + HALF_CLOCK_TICK_S = DE_CLOCK_TICK_S / 2 + TOF1_TICK_DUR = 1 # 1 ns TOF2_TICK_DUR = 1 # 1 ns TOF3_TICK_DUR = 0.5 # 0.5 ns @@ -97,6 +123,9 @@ class HiConstants: STAT_FILTER_1_CONSECUTIVE_SIGMA = 1.8 STAT_FILTER_1_EXTREME_SIGMA = 5.0 STAT_FILTER_1_MIN_CONSECUTIVE = 3 + STAT_FILTER_2_MIN_EVENTS = 6 + STAT_FILTER_2_MAX_TIME_DELTA = 5000 * DE_CLOCK_TICK_S + STAT_FILTER_2_BIN_PADDING = 1 def parse_sensor_number(full_string: str) -> int: diff --git a/imap_processing/tests/hi/test_hi_l1c.py b/imap_processing/tests/hi/test_hi_l1c.py index 1b393eda27..f0fad87072 100644 --- a/imap_processing/tests/hi/test_hi_l1c.py +++ b/imap_processing/tests/hi/test_hi_l1c.py @@ -14,8 +14,7 @@ from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes from imap_processing.cdf.utils import load_cdf, write_cdf from imap_processing.hi import hi_l1c -from imap_processing.hi.hi_l1a import DE_CLOCK_TICK_S -from imap_processing.hi.utils import HIAPID +from imap_processing.hi.utils import HIAPID, HiConstants from imap_processing.spice.time import met_to_ttj2000ns, ttj2000ns_to_et @@ -511,11 +510,11 @@ def test_pset_exposure( ] ).astype(float)[None, :, :] # Convert expected clock ticks to seconds - expected_values *= DE_CLOCK_TICK_S + expected_values *= HiConstants.DE_CLOCK_TICK_S np.testing.assert_allclose( exposure_dict["exposure_times"].data, expected_values, - atol=DE_CLOCK_TICK_S / 100, + atol=HiConstants.DE_CLOCK_TICK_S / 100, ) @@ -596,7 +595,7 @@ def test_get_de_clock_ticks_for_esa_step(fake_spin_df): np.absolute( fake_spin_df.spin_start_met.to_numpy() - clock_tick_mets[-1] ).min() - / DE_CLOCK_TICK_S + / HiConstants.DE_CLOCK_TICK_S ) assert clock_tick_weights[-1] == exp_final_weight assert np.all(clock_tick_weights[:-1] == 1) From 9dd320dd9c004fa72d0ec3a6898810942edbec0a Mon Sep 17 00:00:00 2001 From: Tim Plummer Date: Mon, 23 Feb 2026 17:01:21 -0700 Subject: [PATCH 2/6] Add statistical filter 2 specific functions --- imap_processing/hi/hi_goodtimes.py | 250 ++++++++++ imap_processing/tests/hi/test_hi_goodtimes.py | 465 ++++++++++++++++++ 2 files changed, 715 insertions(+) diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index 725fc9e613..79946472ee 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -1495,3 +1495,253 @@ def mark_statistical_filter_1( ) else: logger.info("Statistical Filter 1: No bad intervals identified") + + +def _find_event_clusters( + event_times: np.ndarray, + min_events: int, + max_time_delta: float, +) -> list[tuple[int, int]]: + """ + Find clusters of events that occur within a maximum time window. + + Uses vectorized numpy operations to find groups of min_events or more + events that all occur within max_time_delta seconds of each other. + + Parameters + ---------- + event_times : np.ndarray + Sorted array of event times (event_met values in seconds). + min_events : int + Minimum number of events to form a cluster. + max_time_delta : float + Maximum time span in seconds for events to be considered clustered. + + Returns + ------- + list[tuple[int, int]] + List of (start_idx, end_idx) tuples marking cluster boundaries + in the input array. Indices are inclusive. + """ + if len(event_times) < min_events: + return [] + + # Compute time span for each window of min_events consecutive events + # window_spans[i] = event_times[i + min_events - 1] - event_times[i] + window_spans = event_times[min_events - 1 :] - event_times[: -(min_events - 1)] + + # Mask where windows fit within max_time_delta + cluster_mask = window_spans <= max_time_delta + + if not np.any(cluster_mask): + return [] + + # Find contiguous regions of True in the mask. Each contiguous region + # [i, j] in the mask corresponds to cluster [i, j + min_events - 1] + + # Pad with False to handle edge cases + padded = np.concatenate(([False], cluster_mask, [False])) + + # Find transitions: +1 = start of group, -1 = end of group + diff = np.diff(padded.astype(int)) + starts = np.flatnonzero(diff == 1) + ends = np.flatnonzero(diff == -1) + min_events - 2 # Adjust for window size + + return list(zip(starts.tolist(), ends.tolist(), strict=False)) + + +def _compute_bins_for_cluster( + nominal_bins: np.ndarray, + cluster_start: int, + cluster_end: int, + bin_padding: int, + n_bins: int = 90, +) -> np.ndarray: + """ + Compute the spin bins to cull for a cluster of events, with wrapping. + + Parameters + ---------- + nominal_bins : np.ndarray + Array of nominal_bin values for events. + cluster_start : int + Start index of cluster in nominal_bins array. + cluster_end : int + End index of cluster (inclusive). + bin_padding : int + Number of bins to add on each side. + n_bins : int + Total number of spin bins (default 90). + + Returns + ------- + np.ndarray + Array of bin indices to cull, with wrapping handled. + For example, if cluster spans bins 88-91 with n_bins=90, + returns [87, 88, 89, 0, 1, 2] (with padding=1). + """ + cluster_bins = nominal_bins[cluster_start : cluster_end + 1].astype(np.int32) + + # Unwrap to handle clusters spanning the 0/n_bins boundary + unwrapped = np.unwrap(cluster_bins, period=n_bins) + bin_min = int(np.min(unwrapped)) + bin_max = int(np.max(unwrapped)) + + # Add padding + bin_low = bin_min - bin_padding + bin_high = bin_max + bin_padding + + # Generate bin indices with wrapping using modulo + bins_to_mark = np.arange(bin_low, bin_high + 1) % n_bins + + return bins_to_mark + + +def mark_statistical_filter_2( + goodtimes_ds: xr.Dataset, + l1b_de: xr.Dataset, + qualified_coincidence_types: set[int], + min_events: int = HiConstants.STAT_FILTER_2_MIN_EVENTS, + max_time_delta: float = HiConstants.STAT_FILTER_2_MAX_TIME_DELTA, + bin_padding: int = HiConstants.STAT_FILTER_2_BIN_PADDING, + cull_code: int = CullCode.LOOSE, +) -> None: + """ + Apply Statistical Filter 2 to detect short-lived event pulses. + + Statistical Filter 2 from Algorithm Document Section 2.3.2.3 removes + occasional short-lived "pulses" of qualified counts that may be + temporally correlated between sensors. These pulses are usually visible + only at the highest few energy steps and are not caught by Filter 1. + + For each 8-spin set (grouped by esa_sweep and esa_step), this filter: + 1. Keeps only events qualifying as calibration product 1 or 2 + 2. Sorts events by event_met + 3. Finds time ranges where min_events or more events occur within + max_time_delta seconds + 4. Marks the angle range covered by each pulse (plus bin_padding bins + on each side, with wrapping) as not good for all METs in that 8-spin set + + Parameters + ---------- + goodtimes_ds : xr.Dataset + Goodtimes dataset for the current Pointing to update. + l1b_de : xr.Dataset + L1B Direct Event dataset for the current Pointing containing: + - ccsds_index: packet index for each event + - ccsds_met: MET timestamp for each packet + - event_met: MET timestamp for each event + - coincidence_type: detector coincidence bitmap + - nominal_bin: spacecraft spin bin (0-89) + - esa_step: ESA energy step for each packet + qualified_coincidence_types : set[int] + Set of coincidence type integers qualifying as calibration + products 1 or 2. + min_events : int, optional + Minimum events to form a pulse cluster. + Default is HiConstants.STAT_FILTER_2_MIN_EVENTS. + max_time_delta : float, optional + Maximum time span in seconds for events to be considered clustered. + Default is HiConstants.STAT_FILTER_2_MAX_TIME_DELTA. + bin_padding : int, optional + Number of 4-degree bins to add on each side of the pulse angle range. + Default is HiConstants.STAT_FILTER_2_BIN_PADDING. + cull_code : int, optional + Cull code to use for marking bad times. Default is CullCode.LOOSE. + + Notes + ----- + This function modifies goodtimes_ds in place. It marks specific spin + bins as bad, unlike Filters 0 and 1 which mark entire time intervals. + Bin marking wraps around (e.g., bin 91 becomes bin 1). + + The default parameters (min_events=6, max_time_delta=10.0) imply that + seeing 6+ qualified events in a 10-second window has probability + < 0.06% under normal conditions (background rate ~0.1/s). + """ + logger.info("Running mark_statistical_filter_2 culling") + + # Add esa_sweep coordinate to group packets into 8-spin sets + l1b_de_with_sweep = _add_sweep_indices(l1b_de) + + # Extract required variables + ccsds_index = l1b_de_with_sweep["ccsds_index"].values + coincidence_type = l1b_de_with_sweep["coincidence_type"].values + nominal_bin = l1b_de_with_sweep["nominal_bin"].values + l1b_event_met = l1b_de_with_sweep["event_met"].values + + # Get esa_sweep and esa_step for each event via ccsds_index + esa_sweep = l1b_de_with_sweep.coords["esa_sweep"].values + esa_step = l1b_de_with_sweep["esa_step"].values + ccsds_met = l1b_de_with_sweep["ccsds_met"].values + + # Map events to their packet's (esa_sweep, esa_step) + event_sweep = esa_sweep[ccsds_index] + event_step = esa_step[ccsds_index] + + # Filter to qualified events + is_qualified = np.isin(coincidence_type, list(qualified_coincidence_types)) + + if not np.any(is_qualified): + logger.info("Statistical Filter 2: No qualified events found") + return + + # Get unique (esa_sweep, esa_step) combinations + sweep_step_pairs = np.unique(np.column_stack([event_sweep, event_step]), axis=0) + + n_clusters_found = 0 + n_bins_marked = 0 + + # Process each 8-spin set + for sweep_idx, step_idx in sweep_step_pairs: + # Get qualified events for this 8-spin set + set_mask = (event_sweep == sweep_idx) & (event_step == step_idx) & is_qualified + if np.sum(set_mask) < min_events: + continue + + # Get event data for this 8-spin set + set_event_mets = l1b_event_met[set_mask] + set_bins = nominal_bin[set_mask] + + # Sort by event_met + sort_order = np.argsort(set_event_mets) + sorted_mets = set_event_mets[sort_order] + sorted_bins = set_bins[sort_order] + + # Find clusters + clusters = _find_event_clusters(sorted_mets, min_events, max_time_delta) + + if not clusters: + continue + + # Get all METs for this 8-spin set (to mark all packets in the set) + set_mets = ccsds_met[(esa_sweep == sweep_idx) & (esa_step == step_idx)] + + # Mark bins for each cluster + for cluster_start, cluster_end in clusters: + bins_to_mark = _compute_bins_for_cluster( + sorted_bins, cluster_start, cluster_end, bin_padding + ) + + # Mark the bins as bad for all METs in this 8-spin set + for met in set_mets: + goodtimes_ds.goodtimes.mark_bad_times( + met=met, bins=bins_to_mark, cull=cull_code + ) + + n_clusters_found += 1 + n_bins_marked += len(bins_to_mark) * len(set_mets) + + logger.debug( + f"Statistical Filter 2: ESA sweep={sweep_idx}, step={step_idx}, " + f"cluster of {cluster_end - cluster_start + 1} events, " + f"marking {len(bins_to_mark)} bins across {len(set_mets)} METs" + ) + + if n_clusters_found > 0: + logger.info( + f"Statistical Filter 2: Found {n_clusters_found} pulse cluster(s), " + f"marked {n_bins_marked} bin-intervals as bad" + ) + else: + logger.info("Statistical Filter 2: No pulse clusters identified") diff --git a/imap_processing/tests/hi/test_hi_goodtimes.py b/imap_processing/tests/hi/test_hi_goodtimes.py index fcb1f88825..6f463c1b99 100644 --- a/imap_processing/tests/hi/test_hi_goodtimes.py +++ b/imap_processing/tests/hi/test_hi_goodtimes.py @@ -10,9 +10,11 @@ CullCode, _add_sweep_indices, _build_per_sweep_datasets, + _compute_bins_for_cluster, _compute_median_and_sigma_per_esa, _compute_normalized_counts_per_sweep, _compute_qualified_counts_per_sweep, + _find_event_clusters, _get_sweep_indices, _identify_cull_pattern, create_goodtimes_dataset, @@ -21,6 +23,7 @@ mark_overflow_packets, mark_statistical_filter_0, mark_statistical_filter_1, + mark_statistical_filter_2, ) from imap_processing.quality_flags import ImapHiL1bDeFlags @@ -2484,3 +2487,465 @@ def test_current_index_out_of_range(self, goodtimes_for_filter1): current_index=10, qualified_coincidence_types=qualified_types, ) + + +class TestFindEventClusters: + """Test suite for _find_event_clusters() helper function.""" + + def test_empty_array(self): + """Test with empty input.""" + result = _find_event_clusters(np.array([]), min_events=3, max_time_delta=100) + assert result == [] + + def test_too_few_events(self): + """Test with fewer events than min_events.""" + de_tags = np.array([10, 50]) + result = _find_event_clusters(de_tags, min_events=3, max_time_delta=100) + assert result == [] + + def test_events_too_spread(self): + """Test with events spread beyond max_time_delta.""" + de_tags = np.array([0, 1000, 2000, 3000, 4000, 5000]) + result = _find_event_clusters(de_tags, min_events=3, max_time_delta=100) + assert result == [] + + def test_single_cluster(self): + """Test detection of a single cluster.""" + de_tags = np.array([100, 110, 120, 130, 140, 150]) + result = _find_event_clusters(de_tags, min_events=3, max_time_delta=100) + assert len(result) == 1 + assert result[0] == (0, 5) + + def test_multiple_clusters(self): + """Test detection of multiple separate clusters.""" + de_tags = np.array([100, 110, 120, 1000, 1010, 1020]) + result = _find_event_clusters(de_tags, min_events=3, max_time_delta=50) + assert len(result) == 2 + assert result[0] == (0, 2) + assert result[1] == (3, 5) + + def test_cluster_merge(self): + """Test that overlapping clusters are merged.""" + # Events that form overlapping windows + de_tags = np.array([0, 10, 20, 30, 40, 50]) + result = _find_event_clusters(de_tags, min_events=3, max_time_delta=30) + # All events should merge into one cluster + assert len(result) == 1 + assert result[0] == (0, 5) + + def test_exact_threshold(self): + """Test cluster detection at exact min_events threshold.""" + de_tags = np.array([0, 10, 20]) # Exactly 3 events within 20 ticks + result = _find_event_clusters(de_tags, min_events=3, max_time_delta=20) + assert len(result) == 1 + assert result[0] == (0, 2) + + +class TestComputeBinsForCluster: + """Test suite for _compute_bins_for_cluster() helper function.""" + + def test_basic_range(self): + """Test basic bin range computation.""" + nominal_bins = np.array([40, 42, 44, 46]) + bins = _compute_bins_for_cluster( + nominal_bins, cluster_start=0, cluster_end=3, bin_padding=1 + ) + expected = np.arange(39, 48) # 40-1 to 46+1 + np.testing.assert_array_equal(bins, expected) + + def test_wrapping_at_zero(self): + """Test that bins wrap around at 0.""" + nominal_bins = np.array([0, 1, 2]) + bins = _compute_bins_for_cluster( + nominal_bins, cluster_start=0, cluster_end=2, bin_padding=2, n_bins=90 + ) + # Should wrap: -2, -1, 0, 1, 2, 3, 4 -> 88, 89, 0, 1, 2, 3, 4 + expected = np.array([88, 89, 0, 1, 2, 3, 4]) + np.testing.assert_array_equal(bins, expected) + + def test_wrapping_at_max(self): + """Test that bins wrap around at n_bins.""" + nominal_bins = np.array([87, 88, 89]) + bins = _compute_bins_for_cluster( + nominal_bins, cluster_start=0, cluster_end=2, bin_padding=2, n_bins=90 + ) + # Should wrap: 85, 86, 87, 88, 89, 90, 91 -> 85, 86, 87, 88, 89, 0, 1 + expected = np.array([85, 86, 87, 88, 89, 0, 1]) + np.testing.assert_array_equal(bins, expected) + + def test_partial_cluster(self): + """Test range computation for partial cluster.""" + nominal_bins = np.array([10, 20, 30, 40, 50]) + bins = _compute_bins_for_cluster( + nominal_bins, cluster_start=1, cluster_end=3, bin_padding=1 + ) + expected = np.arange(19, 42) # 20-1 to 40+1 + np.testing.assert_array_equal(bins, expected) + + def test_no_wrapping_needed(self): + """Test middle bins that don't need wrapping.""" + nominal_bins = np.array([44, 45, 46]) + bins = _compute_bins_for_cluster( + nominal_bins, cluster_start=0, cluster_end=2, bin_padding=1 + ) + expected = np.arange(43, 48) # 44-1 to 46+1 + np.testing.assert_array_equal(bins, expected) + + def test_cluster_spanning_zero_boundary(self): + """Test cluster that spans across the 0/89 boundary.""" + # Cluster bins span from 87 to 2 (wrapping around) + nominal_bins = np.array([87, 88, 89, 0, 1, 2]) + bins = _compute_bins_for_cluster( + nominal_bins, cluster_start=0, cluster_end=5, bin_padding=1, n_bins=90 + ) + # Should mark 86-89 and 0-3 (cluster 87-2 plus padding of 1) + expected = np.array([86, 87, 88, 89, 0, 1, 2, 3]) + np.testing.assert_array_equal(bins, expected) + + +class TestStatisticalFilter2: + """Test suite for mark_statistical_filter_2() function.""" + + @pytest.fixture + def goodtimes_for_filter2(self): + """Create a goodtimes dataset for filter 2 testing.""" + n_mets = 10 + met_values = np.arange(1000.0, 1000.0 + n_mets * 120, 120) + + ds = xr.Dataset( + { + "cull_flags": xr.DataArray( + np.zeros((n_mets, 90), dtype=np.uint8), + dims=["met", "spin_bin"], + ), + "esa_step": xr.DataArray( + np.tile(np.arange(1, 11), 1)[:n_mets].astype(np.uint8), + dims=["met"], + ), + }, + coords={ + "met": met_values, + "spin_bin": np.arange(90), + }, + attrs={"sensor": "Hi45", "pointing": 1}, + ) + return ds + + def _create_l1b_de_for_filter2( + self, + n_packets: int = 10, + events_per_packet: int = 20, + base_met: float = 1000.0, + esa_step: int = 1, + ) -> xr.Dataset: + """Create L1B DE dataset for filter 2 testing. + + Creates a dataset with proper epoch and event_met dimensions: + - epoch: packet-level variables (ccsds_met, esa_step) + - event_met: event-level variables (ccsds_index, event_met, etc.) + """ + n_events = n_packets * events_per_packet + + # Spread events across packets + ccsds_index = np.repeat(np.arange(n_packets), events_per_packet).astype( + np.uint16 + ) + + # Packet-level METs (120 seconds apart) + packet_mets = np.arange(base_met, base_met + n_packets * 120, 120) + + # Default: events spread out in time within each packet (no clusters) + # Each packet spans ~120 seconds, events spread across that time + event_met_values = np.zeros(n_events, dtype=np.float64) + for i in range(n_packets): + start_idx = i * events_per_packet + end_idx = start_idx + events_per_packet + # Events spread 0-100 seconds within each packet + event_met_values[start_idx:end_idx] = packet_mets[i] + np.linspace( + 0, 100, events_per_packet + ) + + # All events are qualified type 12 + coincidence_type = np.full(n_events, 12, dtype=np.uint8) + + # Spread events across spin bins + nominal_bin = np.tile( + np.linspace(0, 89, events_per_packet).astype(np.uint8), n_packets + ) + + # All packets at same ESA step (single 8-spin set) + packet_esa_steps = np.full(n_packets, esa_step, dtype=np.uint8) + + return xr.Dataset( + { + # Event-level variables (event dimension) + "ccsds_index": (["event"], ccsds_index), + "event_met": (["event"], event_met_values), + "coincidence_type": (["event"], coincidence_type), + "nominal_bin": (["event"], nominal_bin), + # Packet-level variables (epoch dimension) + "ccsds_met": (["epoch"], packet_mets), + "esa_step": (["epoch"], packet_esa_steps), + }, + coords={ + "event": np.arange(n_events), + "epoch": np.arange(n_packets), + }, + ) + + def test_no_qualified_events(self, goodtimes_for_filter2): + """Test with no qualified events.""" + l1b_de = self._create_l1b_de_for_filter2() + # Change all events to unqualified type + l1b_de["coincidence_type"] = xr.DataArray( + np.full(len(l1b_de["event"]), 4, dtype=np.uint8), + dims=["event"], + ) + + qualified_types = {12} # Type 12 is qualified, but no events have it + + mark_statistical_filter_2( + goodtimes_for_filter2, + l1b_de, + qualified_types, + min_events=6, + max_time_delta=10.0, + ) + + # No bins should be marked + assert np.all(goodtimes_for_filter2["cull_flags"].values == 0) + + def test_no_clusters(self, goodtimes_for_filter2): + """Test with qualified events but no clusters.""" + # Create l1b_de with different esa_steps per packet + # This ensures events are in different 8-spin sets and don't get pooled + n_packets = 10 + events_per_packet = 20 + n_events = n_packets * events_per_packet + base_met = 1000.0 + + ccsds_index = np.repeat(np.arange(n_packets), events_per_packet).astype( + np.uint16 + ) + # Events spread out in time within each packet (no clusters) + # Events at 0, 5, 10, ... 95 seconds - more than 0.2s apart + packet_mets = np.arange(base_met, base_met + n_packets * 120, 120) + event_met_values = np.zeros(n_events, dtype=np.float64) + for i in range(n_packets): + start_idx = i * events_per_packet + end_idx = start_idx + events_per_packet + event_met_values[start_idx:end_idx] = packet_mets[i] + np.linspace( + 0, 95, events_per_packet + ) + coincidence_type = np.full(n_events, 12, dtype=np.uint8) + nominal_bin = np.tile( + np.linspace(0, 89, events_per_packet).astype(np.uint8), n_packets + ) + # Each packet has a different esa_step (different 8-spin sets) + packet_esa_steps = np.arange(1, n_packets + 1, dtype=np.uint8) + + l1b_de = xr.Dataset( + { + "ccsds_index": (["event"], ccsds_index), + "event_met": (["event"], event_met_values), + "coincidence_type": (["event"], coincidence_type), + "nominal_bin": (["event"], nominal_bin), + "ccsds_met": (["epoch"], packet_mets), + "esa_step": (["epoch"], packet_esa_steps), + }, + coords={ + "event": np.arange(n_events), + "epoch": np.arange(n_packets), + }, + ) + + qualified_types = {12} + + mark_statistical_filter_2( + goodtimes_for_filter2, + l1b_de, + qualified_types, + min_events=6, + max_time_delta=0.2, + ) + + # Events are spread out within each 8-spin set, no clusters should form + assert np.all(goodtimes_for_filter2["cull_flags"].values == 0) + + def test_cluster_detected(self, goodtimes_for_filter2): + """Test that a cluster is detected and bins are marked.""" + l1b_de = self._create_l1b_de_for_filter2(n_packets=1, events_per_packet=10) + + # Create a cluster: 6 events within 0.1 seconds, all at bins 40-45 + # Events at 0.01, 0.02, ..., 0.06 seconds (cluster) and + # 10, 11, 12, 13 seconds (spread out) + l1b_de["event_met"] = xr.DataArray( + np.array( + [ + 1000.01, + 1000.02, + 1000.03, + 1000.04, + 1000.05, + 1000.06, + 1010.0, + 1011.0, + 1012.0, + 1013.0, + ], + dtype=np.float64, + ), + dims=["event"], + ) + l1b_de["nominal_bin"] = xr.DataArray( + np.array([40, 41, 42, 43, 44, 45, 10, 20, 30, 50], dtype=np.uint8), + dims=["event"], + ) + + qualified_types = {12} + + mark_statistical_filter_2( + goodtimes_for_filter2, + l1b_de, + qualified_types, + min_events=6, + max_time_delta=0.1, + bin_padding=1, + ) + + # Bins 39-46 should be marked for MET 1000.0 (first MET) + cull_flags = goodtimes_for_filter2["cull_flags"].sel(met=1000.0).values + assert np.all(cull_flags[39:47] == CullCode.LOOSE) + # Other bins should be unmarked + assert np.all(cull_flags[:39] == 0) + assert np.all(cull_flags[47:] == 0) + + def test_multiple_clusters_same_packet(self, goodtimes_for_filter2): + """Test detection of multiple clusters in the same packet.""" + l1b_de = self._create_l1b_de_for_filter2(n_packets=1, events_per_packet=12) + + # Two clusters: one at 0.01-0.06s (bins 10-15), one at 10.0-10.05s (bins 70-75) + l1b_de["event_met"] = xr.DataArray( + np.array( + [ + 1000.01, + 1000.02, + 1000.03, + 1000.04, + 1000.05, + 1000.06, + 1010.00, + 1010.01, + 1010.02, + 1010.03, + 1010.04, + 1010.05, + ], + dtype=np.float64, + ), + dims=["event"], + ) + l1b_de["nominal_bin"] = xr.DataArray( + np.array([10, 11, 12, 13, 14, 15, 70, 71, 72, 73, 74, 75], dtype=np.uint8), + dims=["event"], + ) + + qualified_types = {12} + + mark_statistical_filter_2( + goodtimes_for_filter2, + l1b_de, + qualified_types, + min_events=6, + max_time_delta=0.1, + bin_padding=1, + ) + + cull_flags = goodtimes_for_filter2["cull_flags"].sel(met=1000.0).values + # First cluster: bins 9-16 + assert np.all(cull_flags[9:17] == CullCode.LOOSE) + # Second cluster: bins 69-76 + assert np.all(cull_flags[69:77] == CullCode.LOOSE) + # Middle bins should be unmarked + assert np.all(cull_flags[17:69] == 0) + + def test_bin_padding_with_wrapping(self, goodtimes_for_filter2): + """Test that bin padding wraps at array boundaries.""" + l1b_de = self._create_l1b_de_for_filter2(n_packets=1, events_per_packet=6) + + # Cluster at bins 0-2 with padding=2: bins -2 to 4 wrap to + # [88, 89, 0, 1, 2, 3, 4] + # 6 events clustered within 0.1 seconds + l1b_de["event_met"] = xr.DataArray( + np.array( + [1000.01, 1000.02, 1000.03, 1000.04, 1000.05, 1000.06], dtype=np.float64 + ), + dims=["event"], + ) + l1b_de["nominal_bin"] = xr.DataArray( + np.array([0, 0, 1, 1, 2, 2], dtype=np.uint8), + dims=["event"], + ) + + qualified_types = {12} + + mark_statistical_filter_2( + goodtimes_for_filter2, + l1b_de, + qualified_types, + min_events=6, + max_time_delta=0.1, + bin_padding=2, + ) + + cull_flags = goodtimes_for_filter2["cull_flags"].sel(met=1000.0).values + # Bins 0-4 should be marked (cluster at 0-2 + padding of 2) + assert np.all(cull_flags[0:5] == CullCode.LOOSE) + # Bins 88-89 should also be marked due to wrapping (bin -2 and -1) + assert np.all(cull_flags[88:90] == CullCode.LOOSE) + # Middle bins should be unmarked + assert np.all(cull_flags[5:88] == 0) + + def test_custom_parameters(self, goodtimes_for_filter2): + """Test with custom min_events and max_time_delta.""" + l1b_de = self._create_l1b_de_for_filter2(n_packets=1, events_per_packet=10) + + # Create events: 4 close together (not enough for default min_events=6) + # First 4 events at 0.01-0.04s (cluster), rest spread out + l1b_de["event_met"] = xr.DataArray( + np.array( + [ + 1000.01, + 1000.02, + 1000.03, + 1000.04, + 1010.0, + 1011.0, + 1012.0, + 1013.0, + 1014.0, + 1015.0, + ], + dtype=np.float64, + ), + dims=["event"], + ) + l1b_de["nominal_bin"] = xr.DataArray( + np.array([40, 41, 42, 43, 10, 20, 30, 50, 60, 70], dtype=np.uint8), + dims=["event"], + ) + + qualified_types = {12} + + # With min_events=4, should detect cluster + mark_statistical_filter_2( + goodtimes_for_filter2, + l1b_de, + qualified_types, + min_events=4, + max_time_delta=0.1, + bin_padding=1, + ) + + cull_flags = goodtimes_for_filter2["cull_flags"].sel(met=1000.0).values + assert np.all(cull_flags[39:45] == CullCode.LOOSE) From f8ca5a421acaae3ec946ee18e7bd2dd1a108a3a3 Mon Sep 17 00:00:00 2001 From: Tim Plummer Date: Wed, 25 Feb 2026 09:55:08 -0700 Subject: [PATCH 3/6] Make statistical filter 2 more idiomatic by using xarray coordinates for events --- imap_processing/hi/hi_goodtimes.py | 44 +++++++++++++----------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index 79946472ee..ac27a4dcef 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -1664,49 +1664,41 @@ def mark_statistical_filter_2( # Add esa_sweep coordinate to group packets into 8-spin sets l1b_de_with_sweep = _add_sweep_indices(l1b_de) - # Extract required variables + # Get packet-level arrays ccsds_index = l1b_de_with_sweep["ccsds_index"].values - coincidence_type = l1b_de_with_sweep["coincidence_type"].values - nominal_bin = l1b_de_with_sweep["nominal_bin"].values - l1b_event_met = l1b_de_with_sweep["event_met"].values - - # Get esa_sweep and esa_step for each event via ccsds_index esa_sweep = l1b_de_with_sweep.coords["esa_sweep"].values esa_step = l1b_de_with_sweep["esa_step"].values - ccsds_met = l1b_de_with_sweep["ccsds_met"].values - # Map events to their packet's (esa_sweep, esa_step) - event_sweep = esa_sweep[ccsds_index] - event_step = esa_step[ccsds_index] + # Add event-level coordinates for grouping + l1b_de_with_sweep = l1b_de_with_sweep.assign_coords( + event_sweep=("event", esa_sweep[ccsds_index]), + event_step=("event", esa_step[ccsds_index]), + ) # Filter to qualified events + coincidence_type = l1b_de_with_sweep["coincidence_type"].values is_qualified = np.isin(coincidence_type, list(qualified_coincidence_types)) if not np.any(is_qualified): logger.info("Statistical Filter 2: No qualified events found") return - # Get unique (esa_sweep, esa_step) combinations - sweep_step_pairs = np.unique(np.column_stack([event_sweep, event_step]), axis=0) + qualified_events = l1b_de_with_sweep.isel(event=is_qualified) n_clusters_found = 0 n_bins_marked = 0 - # Process each 8-spin set - for sweep_idx, step_idx in sweep_step_pairs: - # Get qualified events for this 8-spin set - set_mask = (event_sweep == sweep_idx) & (event_step == step_idx) & is_qualified - if np.sum(set_mask) < min_events: + # Process each 8-spin set using xarray groupby + for (sweep_idx, step_idx), group in qualified_events.groupby( + ["event_sweep", "event_step"] + ): + if len(group.event) < min_events: continue - # Get event data for this 8-spin set - set_event_mets = l1b_event_met[set_mask] - set_bins = nominal_bin[set_mask] - # Sort by event_met - sort_order = np.argsort(set_event_mets) - sorted_mets = set_event_mets[sort_order] - sorted_bins = set_bins[sort_order] + sorted_group = group.sortby("event_met") + sorted_mets = sorted_group["event_met"].values + sorted_bins = sorted_group["nominal_bin"].values # Find clusters clusters = _find_event_clusters(sorted_mets, min_events, max_time_delta) @@ -1715,7 +1707,9 @@ def mark_statistical_filter_2( continue # Get all METs for this 8-spin set (to mark all packets in the set) - set_mets = ccsds_met[(esa_sweep == sweep_idx) & (esa_step == step_idx)] + set_mets = l1b_de_with_sweep["ccsds_met"].values[ + (esa_sweep == sweep_idx) & (esa_step == step_idx) + ] # Mark bins for each cluster for cluster_start, cluster_end in clusters: From 1c844fca3587cc56c9206274ca2bb4ef4166c78f Mon Sep 17 00:00:00 2001 From: Tim Plummer Date: Wed, 25 Feb 2026 09:56:23 -0700 Subject: [PATCH 4/6] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- imap_processing/hi/hi_goodtimes.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index ac27a4dcef..f4c69b0b46 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -1655,9 +1655,11 @@ def mark_statistical_filter_2( bins as bad, unlike Filters 0 and 1 which mark entire time intervals. Bin marking wraps around (e.g., bin 91 becomes bin 1). - The default parameters (min_events=6, max_time_delta=10.0) imply that - seeing 6+ qualified events in a 10-second window has probability - < 0.06% under normal conditions (background rate ~0.1/s). + The default parameters (min_events=6, + max_time_delta=HiConstants.STAT_FILTER_2_MAX_TIME_DELTA ≈ 9.995 s) + imply that seeing 6+ qualified events in an approximately 10-second + window has probability < 0.06% under normal conditions + (background rate ~0.1/s). """ logger.info("Running mark_statistical_filter_2 culling") From c7ac602b35447c295de90d7b6b092327ebbdde25 Mon Sep 17 00:00:00 2001 From: Tim Plummer Date: Thu, 26 Feb 2026 09:02:03 -0700 Subject: [PATCH 5/6] Remove duplicate check on list length, letting find_event_clusters do the check --- imap_processing/hi/hi_goodtimes.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index f4c69b0b46..b7131b6053 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -1694,9 +1694,6 @@ def mark_statistical_filter_2( for (sweep_idx, step_idx), group in qualified_events.groupby( ["event_sweep", "event_step"] ): - if len(group.event) < min_events: - continue - # Sort by event_met sorted_group = group.sortby("event_met") sorted_mets = sorted_group["event_met"].values From 30be3c2bbd4ebfa5a8cc6060d637f4a5a77fc9bd Mon Sep 17 00:00:00 2001 From: Tim Plummer Date: Thu, 26 Feb 2026 09:15:26 -0700 Subject: [PATCH 6/6] Add check that other mets were not culled in stat filter 2 test --- imap_processing/tests/hi/test_hi_goodtimes.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/imap_processing/tests/hi/test_hi_goodtimes.py b/imap_processing/tests/hi/test_hi_goodtimes.py index 6f463c1b99..c2c8c8acb0 100644 --- a/imap_processing/tests/hi/test_hi_goodtimes.py +++ b/imap_processing/tests/hi/test_hi_goodtimes.py @@ -2905,6 +2905,9 @@ def test_bin_padding_with_wrapping(self, goodtimes_for_filter2): assert np.all(cull_flags[88:90] == CullCode.LOOSE) # Middle bins should be unmarked assert np.all(cull_flags[5:88] == 0) + # Check that no cull_flags were set on any other METs + other_mets = goodtimes_for_filter2["cull_flags"].drop_sel(met=1000.0) + assert np.all(other_mets.values == 0) def test_custom_parameters(self, goodtimes_for_filter2): """Test with custom min_events and max_time_delta."""