diff --git a/imap_processing/idex/idex_constants.py b/imap_processing/idex/idex_constants.py index 880110ecc..8d50ddcd9 100644 --- a/imap_processing/idex/idex_constants.py +++ b/imap_processing/idex/idex_constants.py @@ -55,6 +55,8 @@ class IdexConstants: SECONDS_IN_DAY = 86400 # Nanoseconds in day NANOSECONDS_IN_DAY = SECONDS_IN_DAY * int(1e9) +# Picocoulombs to coulombs conversion factor +PICOCOULOMB_TO_COULOMB = 1e-12 # fg to kg conversion factor FG_TO_KG = 1e-15 diff --git a/imap_processing/idex/idex_l2a.py b/imap_processing/idex/idex_l2a.py index 11182c5b2..99d90a326 100644 --- a/imap_processing/idex/idex_l2a.py +++ b/imap_processing/idex/idex_l2a.py @@ -70,10 +70,10 @@ def load_calibration_files(ancillary_files: dict) -> tuple[NDArray, NDArray]: """ # Load calibration coefficients from ancillary files t_rise_params = pd.read_csv( - ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None + ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None ).values.flatten()[:8] yield_params = pd.read_csv( - ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None + ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None ).values.flatten()[:8] return t_rise_params, yield_params @@ -319,11 +319,11 @@ def calculate_velocity_and_mass( Parameters ---------- sig_amp : float - Signal amplitude. + Signal amplitude (pC). t_rise : float - T_rise fit parameter from the target fit. + T_rise fit parameter from the target fit (us). t_rise_params : np.ndarray - Calibration parameters for rise time. + Calibration parameters for rise time (us). yield_params : np.ndarray Calibration parameters for yield. @@ -334,7 +334,7 @@ def calculate_velocity_and_mass( mass_est : float Estimated mass. """ - log_a_t: float = np.log10(t_rise_params[0]) + log_a_t: float = float(t_rise_params[0]) try: root = root_scalar( lambda lv: ( @@ -351,41 +351,51 @@ def calculate_velocity_and_mass( ) return np.nan, np.nan - log_a_y: float = np.log10(yield_params[0]) + log_a_y: float = float(yield_params[0]) yield_val = 10 ** log_smooth_powerlaw(np.log10(v_est), log_a_y, yield_params[1:]) - mass_est = sig_amp / yield_val + sig_amp_coulombs = sig_amp * idex_constants.PICOCOULOMB_TO_COULOMB + mass_est = sig_amp_coulombs / yield_val return v_est, mass_est def log_smooth_powerlaw(log_v: float, log_a: float, params: np.ndarray) -> float: """ - Define a smoothly transitioning power law to fit the calibration curve to. + Define a smoothly transitioning power law used by the IDEX calibration curves. + + This helper is used in two ways: + - rise-time calibration: log10(rise_time [us]) to log10(velocity [km/s]) + - yield calibration: log10(velocity [km/s]) to log10(charge_yield [C/kg]) Parameters ---------- log_v : float - Velocity. + The log10 input to the calibration curve. + This is either log10(rise_time [us]) for the rise-time case or + Log10(velocity [km/s]) for the yield case. log_a : float - Scale factor. + Log10 of the calibration scale factor A. params : np.ndarray - Calibration parameters for the power law. + Calibration parameters for the power law + [a1, a2, a3, vb, vc, k, m]. Returns ------- float - The value of the power law at the given velocity. + The calibrated log10 output. + This is either log10(velocity [km/s]) for the rise-time case or + log10(charge_yield [C/kg]) for the yield case. """ # Unpack the rest of the calibration parameters # a1, a2, and a3 are the power law exponents for the low, medium, and high-velocity # segments. # vb and vc are the characteristic speeds where the slope transition happens, and k # setting the sharpness of the transitions. - a1, a2, a3, vb, vc, _k, m = params + a1, a2, a3, vb, vc, k, _m = params v = 10**log_v base = log_a + a1 * log_v - transition1 = (1 + (v / vb) ** m) ** ((a2 - a1) / m) - transition2 = (1 + (v / vc) ** m) ** ((a3 - a2) / m) + transition1 = (1 + (v / vb) ** k) ** ((a2 - a1) / k) + transition2 = (1 + (v / vc) ** k) ** ((a3 - a2) / k) return base + np.log10(transition1 * transition2) @@ -759,7 +769,7 @@ def estimate_dust_mass( Parameters ---------- low_sampling_time : xarray.DataArray - The low sampling time array. + The low sampling time array in microseconds. target_signal : xarray.DataArray Target signal data. remove_noise : bool @@ -824,8 +834,8 @@ def estimate_dust_mass( if channel_name != "Ion_Grid" and amplitude <= 0.0: amplitude = float(np.max(signal)) - rise_time_0 = 0.371 # How fast the signal rises (s) - discharge_time_0 = 37.1 # How fast signal decays (s) + rise_time_0 = 0.371 # How fast the signal rises (us) + discharge_time_0 = 37.1 # How fast signal decays (us) p0 = [time_of_impact, constant_offset, amplitude, rise_time_0, discharge_time_0] positive_min = float(np.finfo(float).eps) @@ -893,13 +903,13 @@ def fit_impact( Parameters ---------- time : np.ndarray - Time values for the signal. + Time values for the signal (us). time_of_impact : float - Time of dust impact. + Time of dust impact (us). constant_offset : float Initial baseline noise. amplitude : float - Signal height. + Signal height (pC). rise_time : float How fast the signal rises (s). discharge_time : float diff --git a/imap_processing/tests/idex/test_idex_l2a.py b/imap_processing/tests/idex/test_idex_l2a.py index ba04f2bd9..826bb327f 100644 --- a/imap_processing/tests/idex/test_idex_l2a.py +++ b/imap_processing/tests/idex/test_idex_l2a.py @@ -22,6 +22,8 @@ estimate_dust_mass, fit_impact, idex_l2a, + load_calibration_files, + log_smooth_powerlaw, remove_signal_noise, sine_fit, time_to_mass, @@ -66,6 +68,13 @@ def mock_microphonics_noise(time: np.ndarray) -> np.ndarray: return combined_sig +def _write_calibration_csv(path, values): + """Write a one-row calibration CSV with the ancillary-file structure.""" + header = "A,a1,a2,a3,v_b,v_c,k,sigma,delta\n" + row = ",".join(str(value) for value in values) + "\n" + path.write_text(header + row) + + @pytest.mark.external_test_data def test_l2a_logical_source_and_cdf(l2a_dataset: xr.Dataset): """Tests that the ``idex_l2a`` function generates datasets @@ -275,19 +284,100 @@ def test_analyze_peaks_warning(caplog): np.testing.assert_array_equal(area_under_curve, np.zeros(area_under_curve.shape)) +def test_load_calibration_files_returns_expected_t_rise_params(tmp_path): + """Tests that t-rise ancillary values are loaded into t_rise_params.""" + expected_t_rise_params = np.array([3.6, -0.2, -2.0, 0.38, 5.1, 13.7, 13.3, 0.28]) + yield_values = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40, 1.47]) + + t_rise_path = tmp_path / "t_rise.csv" + yield_path = tmp_path / "yield.csv" + _write_calibration_csv(t_rise_path, expected_t_rise_params) + _write_calibration_csv(yield_path, yield_values) + + t_rise_params, _yield_params = load_calibration_files( + { + "l2a-calibration-curve-t-rise": t_rise_path, + "l2a-calibration-curve-yield-params": yield_path, + } + ) + + np.testing.assert_allclose(t_rise_params, expected_t_rise_params) + + +def test_load_calibration_files_returns_expected_yield_params(tmp_path): + """Tests that yield ancillary values are loaded into yield_params.""" + t_rise_values = np.array([3.6, -0.2, -2.0, 0.38, 5.1, 13.7, 13.3, 0.28, 1.33]) + expected_yield_params = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40]) + + t_rise_path = tmp_path / "t_rise.csv" + yield_path = tmp_path / "yield.csv" + _write_calibration_csv(t_rise_path, t_rise_values) + _write_calibration_csv(yield_path, expected_yield_params) + + _t_rise_params, yield_params = load_calibration_files( + { + "l2a-calibration-curve-t-rise": t_rise_path, + "l2a-calibration-curve-yield-params": yield_path, + } + ) + + np.testing.assert_allclose(yield_params, expected_yield_params) + + +def test_log_smooth_powerlaw_yield_curve_at_10_km_s(): + """Tests that the yield calibration returns the expected value at 10 km/s.""" + yield_params = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40]) + + log_yield = log_smooth_powerlaw(np.log10(10.0), yield_params[0], yield_params[1:]) + yield_value = 10**log_yield + + assert yield_value == pytest.approx(755.0, rel=1e-3) + + +def test_calculate_velocity_and_mass_at_10_km_s(): + """Tests mass estimation using a mocked 10 km/s velocity solution.""" + t_rise_params = np.array([3.6, -0.2, -2.0, 0.38, 5.1, 13.7, 13.3, 0.28]) + yield_params = np.array([0.06, 2.8, 5.9, 4.1, 13.0, 22.7, 8.2, 0.40]) + sig_amp_pc = 10.0 + + # This test intentionally bypasses the t_rise -> velocity inversion. + # The t_rise calibration path is currently under review and will be + # covered by a dedicated follow-up test once that behavior is finalized. + mocked_root = mock.Mock() + mocked_root.root = 1.0 # 10**1.0 == 10 km/s + + with mock.patch( + "imap_processing.idex.idex_l2a.root_scalar", return_value=mocked_root + ): + velocity_estimate, mass_estimate = calculate_velocity_and_mass( + sig_amp_pc, 2.0, t_rise_params, yield_params + ) + + expected_yield = 755.0090524738858 + expected_mass_kg = sig_amp_pc * 1e-12 / expected_yield + + assert velocity_estimate == pytest.approx(10.0, rel=1e-12) + assert mass_estimate == pytest.approx(expected_mass_kg, rel=1e-12) + + @pytest.mark.external_test_data def test_velocity_and_mass_estimate(ancillary_files): """Tests that the velocity and mass estimate function.""" # Load calibration coefficients from ancillary files t_rise_params = pd.read_csv( - ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None + ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None ).values.flatten()[:8] yield_params = pd.read_csv( - ancillary_files["l2a-calibration-curve-t-rise"], skiprows=1, header=None + ancillary_files["l2a-calibration-curve-yield-params"], skiprows=1, header=None ).values.flatten()[:8] - estimates = calculate_velocity_and_mass(10, 2, t_rise_params, yield_params) + expected_velocity = 5.0 + t_rise = 10 ** log_smooth_powerlaw( + np.log10(expected_velocity), float(t_rise_params[0]), t_rise_params[1:] + ) + estimates = calculate_velocity_and_mass(10, t_rise, t_rise_params, yield_params) assert len(estimates) == 2 assert not np.any(np.isnan(estimates)) + assert estimates[0] == pytest.approx(expected_velocity, rel=1e-12) def test_analyze_peaks_perfect_fits():