Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions imap_processing/hi/hi_goodtimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1495,3 +1495,246 @@ def mark_statistical_filter_1(
)
else:
logger.info("Statistical Filter 1: No bad intervals identified")


def _find_event_clusters(
event_times: np.ndarray,
min_events: int,
max_time_delta: float,
) -> list[tuple[int, int]]:
"""
Find clusters of events that occur within a maximum time window.

Uses vectorized numpy operations to find groups of min_events or more
events that all occur within max_time_delta seconds of each other.

Parameters
----------
event_times : np.ndarray
Sorted array of event times (event_met values in seconds).
min_events : int
Minimum number of events to form a cluster.
max_time_delta : float
Maximum time span in seconds for events to be considered clustered.

Returns
-------
list[tuple[int, int]]
List of (start_idx, end_idx) tuples marking cluster boundaries
in the input array. Indices are inclusive.
"""
if len(event_times) < min_events:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnt this a duplicate of lines 1697 and 1698?

return []

# Compute time span for each window of min_events consecutive events
# window_spans[i] = event_times[i + min_events - 1] - event_times[i]
window_spans = event_times[min_events - 1 :] - event_times[: -(min_events - 1)]

# Mask where windows fit within max_time_delta
cluster_mask = window_spans <= max_time_delta

if not np.any(cluster_mask):
return []

# Find contiguous regions of True in the mask. Each contiguous region
# [i, j] in the mask corresponds to cluster [i, j + min_events - 1]

# Pad with False to handle edge cases
padded = np.concatenate(([False], cluster_mask, [False]))

# Find transitions: +1 = start of group, -1 = end of group
diff = np.diff(padded.astype(int))
starts = np.flatnonzero(diff == 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need flatnonzero? Isnt diff 1d?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flatnonzero returns the array of indices directly instead of a tuple of array of indices. So, it avoids having to add the [0] on the end to index the first returned element.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh - I see.

ends = np.flatnonzero(diff == -1) + min_events - 2 # Adjust for window size

return list(zip(starts.tolist(), ends.tolist(), strict=False))


def _compute_bins_for_cluster(
nominal_bins: np.ndarray,
cluster_start: int,
cluster_end: int,
bin_padding: int,
n_bins: int = 90,
) -> np.ndarray:
"""
Compute the spin bins to cull for a cluster of events, with wrapping.

Parameters
----------
nominal_bins : np.ndarray
Array of nominal_bin values for events.
cluster_start : int
Start index of cluster in nominal_bins array.
cluster_end : int
End index of cluster (inclusive).
bin_padding : int
Number of bins to add on each side.
n_bins : int
Total number of spin bins (default 90).

Returns
-------
np.ndarray
Array of bin indices to cull, with wrapping handled.
For example, if cluster spans bins 88-91 with n_bins=90,
returns [87, 88, 89, 0, 1, 2] (with padding=1).
"""
cluster_bins = nominal_bins[cluster_start : cluster_end + 1].astype(np.int32)

# Unwrap to handle clusters spanning the 0/n_bins boundary
unwrapped = np.unwrap(cluster_bins, period=n_bins)
bin_min = int(np.min(unwrapped))
bin_max = int(np.max(unwrapped))

# Add padding
bin_low = bin_min - bin_padding
bin_high = bin_max + bin_padding

# Generate bin indices with wrapping using modulo
bins_to_mark = np.arange(bin_low, bin_high + 1) % n_bins

return bins_to_mark


def mark_statistical_filter_2(
    goodtimes_ds: xr.Dataset,
    l1b_de: xr.Dataset,
    qualified_coincidence_types: set[int],
    min_events: int = HiConstants.STAT_FILTER_2_MIN_EVENTS,
    max_time_delta: float = HiConstants.STAT_FILTER_2_MAX_TIME_DELTA,
    bin_padding: int = HiConstants.STAT_FILTER_2_BIN_PADDING,
    cull_code: int = CullCode.LOOSE,
) -> None:
    """
    Apply Statistical Filter 2 to detect short-lived event pulses.

    Implements Statistical Filter 2 from Algorithm Document Section 2.3.2.3.
    It removes occasional short-lived "pulses" of qualified counts that can
    be temporally correlated between sensors, are usually visible only at
    the highest few energy steps, and slip past Filter 1.

    Per 8-spin set (one esa_sweep/esa_step combination) the procedure is:
    keep only events qualifying as calibration product 1 or 2, sort them by
    event_met, locate time ranges where at least min_events events fall
    within max_time_delta seconds, and then flag the angular range each
    pulse covers (padded by bin_padding bins on each side, wrapping at the
    spin boundary) as not good for every MET in that 8-spin set.

    Parameters
    ----------
    goodtimes_ds : xr.Dataset
        Goodtimes dataset for the current Pointing to update.
    l1b_de : xr.Dataset
        L1B Direct Event dataset for the current Pointing containing:
        - ccsds_index: packet index for each event
        - ccsds_met: MET timestamp for each packet
        - event_met: MET timestamp for each event
        - coincidence_type: detector coincidence bitmap
        - nominal_bin: spacecraft spin bin (0-89)
        - esa_step: ESA energy step for each packet
    qualified_coincidence_types : set[int]
        Set of coincidence type integers qualifying as calibration
        products 1 or 2.
    min_events : int, optional
        Minimum events to form a pulse cluster.
        Default is HiConstants.STAT_FILTER_2_MIN_EVENTS.
    max_time_delta : float, optional
        Maximum time span in seconds for events to be considered clustered.
        Default is HiConstants.STAT_FILTER_2_MAX_TIME_DELTA.
    bin_padding : int, optional
        Number of 4-degree bins to add on each side of the pulse angle range.
        Default is HiConstants.STAT_FILTER_2_BIN_PADDING.
    cull_code : int, optional
        Cull code to use for marking bad times. Default is CullCode.LOOSE.

    Notes
    -----
    Modifies goodtimes_ds in place. Unlike Filters 0 and 1, which cull whole
    time intervals, this filter marks individual spin bins as bad, and the
    bin marking wraps around (e.g., bin 91 becomes bin 1).

    With the defaults (min_events=6,
    max_time_delta=HiConstants.STAT_FILTER_2_MAX_TIME_DELTA ≈ 9.995 s),
    observing 6+ qualified events inside an approximately 10-second window
    has probability < 0.06% under normal conditions
    (background rate ~0.1/s).
    """
    logger.info("Running mark_statistical_filter_2 culling")

    # Attach the esa_sweep coordinate so packets group into 8-spin sets
    ds = _add_sweep_indices(l1b_de)

    # Packet-level arrays used both for event grouping and MET lookup below
    packet_index = ds["ccsds_index"].values
    sweep_per_packet = ds.coords["esa_sweep"].values
    step_per_packet = ds["esa_step"].values

    # Broadcast packet-level sweep/step down to the event dimension so
    # events can be grouped into their 8-spin sets
    ds = ds.assign_coords(
        event_sweep=("event", sweep_per_packet[packet_index]),
        event_step=("event", step_per_packet[packet_index]),
    )

    # Keep only events whose coincidence type qualifies as cal product 1 or 2
    qualified_mask = np.isin(
        ds["coincidence_type"].values, list(qualified_coincidence_types)
    )
    if not qualified_mask.any():
        logger.info("Statistical Filter 2: No qualified events found")
        return

    qualified = ds.isel(event=qualified_mask)

    total_clusters = 0
    total_bins = 0

    # Examine each 8-spin set (sweep, step combination) independently
    for (sweep_id, step_id), spin_set in qualified.groupby(
        ["event_sweep", "event_step"]
    ):
        # Cluster detection requires events in time order
        ordered = spin_set.sortby("event_met")
        ordered_mets = ordered["event_met"].values
        ordered_bins = ordered["nominal_bin"].values

        clusters = _find_event_clusters(ordered_mets, min_events, max_time_delta)
        if not clusters:
            continue

        # Every packet MET in this 8-spin set gets marked for each cluster
        in_set = (sweep_per_packet == sweep_id) & (step_per_packet == step_id)
        set_mets = ds["ccsds_met"].values[in_set]

        for start_idx, end_idx in clusters:
            cull_bins = _compute_bins_for_cluster(
                ordered_bins, start_idx, end_idx, bin_padding
            )

            # Flag the culled bins as bad for all METs in this 8-spin set
            for met in set_mets:
                goodtimes_ds.goodtimes.mark_bad_times(
                    met=met, bins=cull_bins, cull=cull_code
                )

            total_clusters += 1
            total_bins += len(cull_bins) * len(set_mets)

            logger.debug(
                f"Statistical Filter 2: ESA sweep={sweep_id}, step={step_id}, "
                f"cluster of {end_idx - start_idx + 1} events, "
                f"marking {len(cull_bins)} bins across {len(set_mets)} METs"
            )

    if total_clusters > 0:
        logger.info(
            f"Statistical Filter 2: Found {total_clusters} pulse cluster(s), "
            f"marked {total_bins} bin-intervals as bad"
        )
    else:
        logger.info("Statistical Filter 2: No pulse clusters identified")
13 changes: 2 additions & 11 deletions imap_processing/hi/hi_l1a.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,10 @@

from imap_processing import imap_module_directory
from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.hi.utils import HIAPID
from imap_processing.hi.utils import HIAPID, HiConstants
from imap_processing.spice.time import met_to_ttj2000ns
from imap_processing.utils import packet_file_to_datasets

# TODO: read DE_CLOCK_TICK_US from
# instrument status summary later. This value
# is rarely change but want to be able to change
# it if needed. It stores information about how
# fast the time was ticking. It is in microseconds.
DE_CLOCK_TICK_US = 1999
DE_CLOCK_TICK_S = DE_CLOCK_TICK_US / 1e6
HALF_CLOCK_TICK_S = DE_CLOCK_TICK_S / 2

MILLISECOND_TO_S = 1e-3

# define the names of the 24 counter arrays
Expand Down Expand Up @@ -293,7 +284,7 @@ def create_de_dataset(de_data_dict: dict[str, npt.ArrayLike]) -> xr.Dataset:
# See Hi Algorithm Document section 2.2.5
event_met_array = np.array(
meta_event_met[de_data_dict["ccsds_index"]]
+ np.array(de_data_dict["de_tag"]) * DE_CLOCK_TICK_S,
+ np.array(de_data_dict["de_tag"]) * HiConstants.DE_CLOCK_TICK_S,
dtype=event_met_dtype,
)

Expand Down
8 changes: 5 additions & 3 deletions imap_processing/hi/hi_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from imap_processing import imap_module_directory
from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.cdf.utils import parse_filename_like
from imap_processing.hi.hi_l1a import HALF_CLOCK_TICK_S, MILLISECOND_TO_S
from imap_processing.hi.hi_l1a import MILLISECOND_TO_S
from imap_processing.hi.utils import (
HIAPID,
CoincidenceBitmap,
Expand Down Expand Up @@ -335,7 +335,7 @@ def de_nominal_bin_and_spin_phase(dataset: xr.Dataset) -> dict[str, xr.DataArray
# be binned into in the histogram packet. The Hi histogram data is binned by
# spacecraft spin-phase, not instrument spin-phase, so the same is done here.
# We have to add 1/2 clock tick to MET time before getting spin phase
met_seconds = dataset.event_met.values + HALF_CLOCK_TICK_S
met_seconds = dataset.event_met.values + HiConstants.HALF_CLOCK_TICK_S
imap_spin_phase = get_spacecraft_spin_phase(met_seconds)
new_vars["nominal_bin"].values = np.asarray(imap_spin_phase * 360 / 4).astype(
np.uint8
Expand Down Expand Up @@ -380,7 +380,9 @@ def compute_hae_coordinates(dataset: xr.Dataset) -> dict[str, xr.DataArray]:

# Per Section 2.2.5 of Algorithm Document, add 1/2 of tick duration
# to MET before computing pointing.
sclk_ticks = met_to_sclkticks(dataset.event_met.values + HALF_CLOCK_TICK_S)
sclk_ticks = met_to_sclkticks(
dataset.event_met.values + HiConstants.HALF_CLOCK_TICK_S
)
et = sct_to_et(sclk_ticks)
sensor_number = parse_sensor_number(dataset.attrs["Logical_source"])
# TODO: For now, we are using SPICE to compute the look direction for each
Expand Down
13 changes: 5 additions & 8 deletions imap_processing/hi/hi_l1c.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.cdf.utils import parse_filename_like
from imap_processing.hi.hi_l1a import (
DE_CLOCK_TICK_S,
HALF_CLOCK_TICK_S,
)
from imap_processing.hi.utils import (
CalibrationProductConfig,
HiConstants,
create_dataset_variables,
full_dataarray,
parse_sensor_number,
Expand Down Expand Up @@ -557,7 +554,7 @@ def pset_exposure(
# for a given clock tick, add 1/2 clock tick and compute spin-phase.
spin_phases = np.atleast_1d(
get_instrument_spin_phase(
clock_tick_mets + HALF_CLOCK_TICK_S,
clock_tick_mets + HiConstants.HALF_CLOCK_TICK_S,
SpiceFrame[f"IMAP_HI_{sensor_number}"],
)
)
Expand All @@ -581,7 +578,7 @@ def pset_exposure(
exposure_var["exposure_times"].values[:, i_esa] += new_exposure_times

# Convert exposure clock ticks to seconds
exposure_var["exposure_times"].values *= DE_CLOCK_TICK_S
exposure_var["exposure_times"].values *= HiConstants.DE_CLOCK_TICK_S

return exposure_var

Expand Down Expand Up @@ -693,7 +690,7 @@ def get_de_clock_ticks_for_esa_step(
clock_tick_mets = np.arange(
spin_start_mets[end_time_ind - 8],
spin_start_mets[end_time_ind],
DE_CLOCK_TICK_S,
HiConstants.DE_CLOCK_TICK_S,
dtype=float,
)
# The final clock-tick bin has less exposure time because the next spin
Expand All @@ -705,7 +702,7 @@ def get_de_clock_ticks_for_esa_step(
clock_tick_weights = np.ones_like(clock_tick_mets, dtype=float)
clock_tick_weights[-1] = (
spin_start_mets[end_time_ind] - clock_tick_mets[-1]
) / DE_CLOCK_TICK_S
) / HiConstants.DE_CLOCK_TICK_S
return clock_tick_mets, clock_tick_weights


Expand Down
Loading