diff --git a/src/mritk/__init__.py b/src/mritk/__init__.py index 45be4fb..d805c50 100644 --- a/src/mritk/__init__.py +++ b/src/mritk/__init__.py @@ -16,6 +16,7 @@ statistics, utils, ) +from .data import MRIData meta = metadata("mritk") __version__ = meta["Version"] @@ -35,4 +36,5 @@ "hybrid", "r1", "statistics", + "MRIData", ] diff --git a/src/mritk/concentration.py b/src/mritk/concentration.py index cf1960c..61be638 100644 --- a/src/mritk/concentration.py +++ b/src/mritk/concentration.py @@ -51,6 +51,42 @@ def concentration_from_R1_expr(r1_map: np.ndarray, r1_0_map: np.ndarray, r1: flo return (1.0 / r1) * (r1_map - r1_0_map) +def R1_from_concentration_expr(c: np.ndarray, r1_0: np.ndarray, r1: float) -> np.ndarray: + """ + Computes post-contrast R1 relaxation rates from tracer concentration. + + Formula: R1 = C * r1 + R1_0 + + Args: + c (np.ndarray): Array of tracer concentrations. + r1_0 (np.ndarray): Array of pre-contrast (baseline) R1 relaxation rates. + r1 (float): Relaxivity of the contrast agent. + + Returns: + np.ndarray: Computed post-contrast R1 array. + """ + return c * r1 + r1_0 + + +def T1_from_concentration_expr(c: np.ndarray, t1_0: np.ndarray, r1: float) -> np.ndarray: + """ + Computes post-contrast T1 relaxation times from tracer concentration. + + Formula: T1 = 1 / (C * r1 + (1 / T1_0)) + + Args: + c (np.ndarray): Array of tracer concentrations. + t1_0 (np.ndarray): Array of pre-contrast (baseline) T1 relaxation times. + r1 (float): Relaxivity of the contrast agent. + + Returns: + np.ndarray: Computed post-contrast T1 array. + """ + # Note: In a robust pipeline, you might want to mask or handle cases + # where t1_0 is 0 to avoid division by zero errors. + return 1.0 / (c * r1 + (1.0 / t1_0)) + + def compute_concentration_from_T1_array( t1_data: np.ndarray, t10_data: np.ndarray, r1: float, mask: np.ndarray | None = None ) -> np.ndarray: diff --git a/src/mritk/looklocker.py b/src/mritk/looklocker.py index c799444..f69af81 100644 --- a/src/mritk/looklocker.py +++ b/src/mritk/looklocker.py @@ -132,7 +132,7 @@ def compute_looklocker_t1_array(data: np.ndarray, time_s: np.ndarray, t1_roof: f logger.debug(f"Starting fitting for {len(d_masked)} voxels.") with tqdm.tqdm(total=len(d_masked), desc="Fitting Look-Locker Voxels") as pbar: - voxel_fitter = partial(fit_voxel, time_s, pbar) + voxel_fitter = partial(fit_voxel, time_s=time_s, pbar=pbar) vfunc = np.vectorize(voxel_fitter, signature="(n) -> (3)") fitted_coefficients = vfunc(d_masked) diff --git a/src/mritk/mixed.py b/src/mritk/mixed.py index 186ee65..77abfde 100644 --- a/src/mritk/mixed.py +++ b/src/mritk/mixed.py @@ -101,7 +101,12 @@ def extract_single_volume(D: np.ndarray, frame_fg) -> MRIData: def mixed_t1map( - SE_nii_path: Path, IR_nii_path: Path, meta_path: Path, T1_low: float, T1_high: float, output: Path | None = None + SE_nii_path: Path, + IR_nii_path: Path, + meta_path: Path, + T1_low: float = 500.0, + T1_high: float = 5000.0, + output: Path | None = None, ) -> nibabel.nifti1.Nifti1Image: """ Generates a T1 relaxation map by combining Spin-Echo (SE) and Inversion-Recovery (IR) acquisitions. @@ -116,8 +121,8 @@ def mixed_t1map( IR_nii_path (Path): Path to the Inversion-Recovery corrected real NIfTI file. meta_path (Path): Path to the JSON file containing the sequence parameters ('TR_SE', 'TI', 'TE', 'ETL'). - T1_low (float): Lower bound for the T1 interpolation grid (in ms). - T1_high (float): Upper bound for the T1 interpolation grid (in ms). + T1_low (float): Lower bound for the T1 interpolation grid (in ms). Defaults to 500 ms. + T1_high (float): Upper bound for the T1 interpolation grid (in ms). Defaults to 5000 ms. output (Path | None, optional): Path to save the resulting T1 map NIfTI file. Defaults to None. Returns: @@ -187,7 +192,9 @@ def mixed_t1map_postprocessing(SE_nii_path: Path, T1_path: Path, output: Path | return masked_t1map_nii -def compute_mixed_t1_array(se_data: np.ndarray, ir_data: np.ndarray, meta: dict, t1_low: float, t1_high: float) -> np.ndarray: +def compute_mixed_t1_array( + se_data: np.ndarray, ir_data: np.ndarray, meta: dict, t1_low: float = 500.0, t1_high: float = 5000.0 +) -> np.ndarray: """ Computes a Mixed T1 array from Spin-Echo and Inversion-Recovery volumes using a lookup table. @@ -195,8 +202,8 @@ def compute_mixed_t1_array(se_data: np.ndarray, ir_data: np.ndarray, meta: dict, se_data (np.ndarray): 3D numpy array of the Spin-Echo modulus data. ir_data (np.ndarray): 3D numpy array of the Inversion-Recovery corrected real data. meta (dict): Dictionary containing sequence parameters ('TR_SE', 'TI', 'TE', 'ETL'). - t1_low (float): Lower bound for T1 generation grid. - t1_high (float): Upper bound for T1 generation grid. + t1_low (float): Lower bound for T1 generation grid. Defaults to 500 ms. + t1_high (float): Upper bound for T1 generation grid. Defaults to 5000 ms. Returns: np.ndarray: Computed T1 map as a 3D float32 array. diff --git a/src/mritk/utils.py b/src/mritk/utils.py index bd32b8c..a3f3a73 100644 --- a/src/mritk/utils.py +++ b/src/mritk/utils.py @@ -95,7 +95,7 @@ def curve_fit_wrapper(f, t: np.ndarray, y: np.ndarray, p0: np.ndarray): return popt -def fit_voxel(time_s: np.ndarray, pbar, m: np.ndarray) -> np.ndarray: +def fit_voxel(time_s: np.ndarray, m: np.ndarray, pbar=None) -> np.ndarray: """ Fits the Look-Locker relaxation curve for a single voxel's time series. @@ -105,8 +105,8 @@ def fit_voxel(time_s: np.ndarray, pbar, m: np.ndarray) -> np.ndarray: Args: time_s (np.ndarray): 1D array of trigger times in seconds. - pbar: A tqdm progress bar instance (or None) to update incrementally. m (np.ndarray): 1D array of signal magnitudes over time for the voxel. + pbar: A tqdm progress bar instance (or None) to update incrementally. Returns: np.ndarray: A 3-element array containing the fitted parameters `[x1, x2, x3]`. diff --git a/tests/test_concentration.py b/tests/test_concentration.py index 29a7435..a50909c 100644 --- a/tests/test_concentration.py +++ b/tests/test_concentration.py @@ -9,13 +9,8 @@ import numpy as np import mritk.cli -from mritk.concentration import ( - compute_concentration_from_R1_array, - compute_concentration_from_T1_array, - concentration_from_R1_expr, - concentration_from_T1, - concentration_from_T1_expr, -) +import mritk.concentration +import mritk.data from mritk.testing import compare_nifti_images @@ -36,7 +31,7 @@ def test_intracranial_concentration(tmp_path, mri_data_dir: Path): test_outputs = [tmp_path / f"output_ses-0{i}_concentration.nii.gz" for i in sessions] for i, s in enumerate(sessions): - concentration_from_T1( + mritk.concentration.concentration_from_T1( input_path=images_path[i], reference_path=baseline_path, output_path=test_outputs[i], @@ -46,13 +41,68 @@ def test_intracranial_concentration(tmp_path, mri_data_dir: Path): compare_nifti_images(test_outputs[i], ref_outputs[i], data_tolerance=1e-12) +def test_inverse_intracranial_concentration(tmp_path, mri_data_dir: Path): + """ + Tests that taking a generated concentration map and applying the inverse + T1 formula recovers the original post-contrast T1 map within the valid mask. + """ + baseline_path = mri_data_dir / "mri-processed/mri_processed_data/sub-01/T1maps/sub-01_ses-01_T1map_hybrid.nii.gz" + sessions = [1, 2] + + # These are the original T1 maps we want to recover + original_t1_paths = [ + mri_data_dir / f"mri-processed/mri_processed_data/sub-01/T1maps/sub-01_ses-0{i}_T1map_hybrid.nii.gz" for i in sessions + ] + + # These are the concentration maps generated by the forward pass + concentration_paths = [ + mri_data_dir / f"mri-processed/mri_processed_data/sub-01/concentrations/sub-01_ses-0{i}_concentration.nii.gz" + for i in sessions + ] + + r1 = 0.0032 + + # Load the baseline T1 map (T1_0) once + t1_0_mri = mritk.data.MRIData.from_file(baseline_path, dtype=np.single) + t1_0_data = t1_0_mri.data + + for i, s in enumerate(sessions): + # 1. Load the pre-calculated concentration map + conc_mri = mritk.data.MRIData.from_file(concentration_paths[i], dtype=np.single) + conc_data = conc_mri.data + + # 2. Load the original post-contrast T1 map (our ground truth) + orig_t1_mri = mritk.data.MRIData.from_file(original_t1_paths[i], dtype=np.single) + orig_t1_data = orig_t1_mri.data + + # 3. Create a mask of valid voxels. + # The forward function sets unmasked/invalid voxels to NaN. + valid_mask = ~np.isnan(conc_data) + + # 4. Compute the inverse T1 only on valid voxels + recovered_t1_data = np.full_like(conc_data, np.nan) + + # Formula: T1 = 1 / (C * r1 + (1 / T1_0)) + recovered_t1_data[valid_mask] = 1.0 / (conc_data[valid_mask] * r1 + (1.0 / t1_0_data[valid_mask])) + + # 5. Assert that the recovered T1 matches the original T1 strictly within the mask. + # We use assert_allclose because floating point math (1/x) will create minor precision differences. + np.testing.assert_allclose( + recovered_t1_data[valid_mask], + orig_t1_data[valid_mask], + rtol=1e-5, + atol=1e-5, + err_msg=f"Inverse T1 mismatch for session {s}", + ) + + def test_compute_concentration_array_no_mask(): """Test the array computation correctly defaults to keeping all valid positive T1s when no mask is provided.""" t1_data = np.array([1000.0, 1000.0]) t10_data = np.array([2000.0, 2000.0]) r1 = 0.005 - result = compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=None) + result = mritk.concentration.compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=None) assert np.isclose(result[0], 0.1) assert np.isclose(result[1], 0.1) @@ -69,7 +119,7 @@ def test_concentration_from_t1_expr(): # C = 200 * 0.0005 = 0.1 expected = np.array([0.1]) - result = concentration_from_T1_expr(t1, t1_0, r1) + result = mritk.concentration.concentration_from_T1_expr(t1, t1_0, r1) np.testing.assert_array_almost_equal(result, expected) @@ -83,7 +133,7 @@ def test_concentration_from_r1_expr(): # C = 200 * 1.0 = 200.0 expected = np.array([200.0]) - result = concentration_from_R1_expr(r1_map, r1_0_map, r1) + result = mritk.concentration.concentration_from_R1_expr(r1_map, r1_0_map, r1) np.testing.assert_array_almost_equal(result, expected) @@ -96,7 +146,7 @@ def test_compute_concentration_from_T1_array_masking(): mask = np.array([True, True, True, False]) r1 = 0.005 - result = compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=mask) + result = mritk.concentration.compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=mask) # Expectations: # Voxel 0: Valid, should be 0.1 @@ -119,7 +169,7 @@ def test_compute_concentration_from_R1_array_masking(): mask = np.array([True, True, True, False]) r1 = 0.005 - result = compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=mask) + result = mritk.concentration.compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=mask) # Expectations: # Voxel 0: Valid, should be 200.0 @@ -139,12 +189,54 @@ def test_compute_concentration_from_R1_array_no_mask(): r10_data = np.array([1.0, 1.0]) r1 = 0.005 - result = compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=None) + result = mritk.concentration.compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=None) assert np.isclose(result[0], 200.0) assert np.isclose(result[1], 200.0) +def test_r1_concentration_roundtrip(): + """Test that Signal -> Concentration -> Signal roundtrips perfectly for R1.""" + np.random.seed(42) + + # Generate synthetic baseline R1 and post-contrast R1 + # Post-contrast R1 is typically higher than baseline R1 + shape = (10, 10, 10) + r1_0 = np.random.uniform(0.5, 1.5, size=shape) + r1_post = r1_0 + np.random.uniform(0.1, 2.0, size=shape) + r1_relaxivity = 0.0045 + + # 1. Forward: Calculate concentration + concentration = mritk.concentration.concentration_from_R1_expr(r1_post, r1_0, r1_relaxivity) + + # 2. Inverse: Calculate R1 back from concentration + r1_recovered = mritk.concentration.R1_from_concentration_expr(concentration, r1_0, r1_relaxivity) + + # 3. Assert they are effectively identical + np.testing.assert_allclose(r1_recovered, r1_post, rtol=1e-5, atol=1e-8, err_msg="R1 round-trip failed!") + + +def test_t1_concentration_roundtrip(): + """Test that Signal -> Concentration -> Signal roundtrips perfectly for T1.""" + np.random.seed(42) + + # Generate synthetic baseline T1 and post-contrast T1 + # Post-contrast T1 is typically lower than baseline T1 + shape = (10, 10, 10) + t1_0 = np.random.uniform(1000, 2000, size=shape) # milliseconds, for instance + t1_post = t1_0 - np.random.uniform(100, 500, size=shape) + r1_relaxivity = 0.0045 + + # 1. Forward: Calculate concentration + concentration = mritk.concentration.concentration_from_T1_expr(t1_post, t1_0, r1_relaxivity) + + # 2. Inverse: Calculate T1 back from concentration + t1_recovered = mritk.concentration.T1_from_concentration_expr(concentration, t1_0, r1_relaxivity) + + # 3. Assert they are effectively identical + np.testing.assert_allclose(t1_recovered, t1_post, rtol=1e-5, atol=1e-8, err_msg="T1 round-trip failed!") + + @patch("mritk.concentration.concentration_from_T1") def test_dispatch_concentration_t1_defaults(mock_conc_t1): """Test the T1 concentration command with minimum required arguments."""