Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mritk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
statistics,
utils,
)
from .data import MRIData

meta = metadata("mritk")
__version__ = meta["Version"]
Expand All @@ -35,4 +36,5 @@
"hybrid",
"r1",
"statistics",
"MRIData",
]
36 changes: 36 additions & 0 deletions src/mritk/concentration.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,42 @@ def concentration_from_R1_expr(r1_map: np.ndarray, r1_0_map: np.ndarray, r1: flo
return (1.0 / r1) * (r1_map - r1_0_map)


def R1_from_concentration_expr(c: np.ndarray, r1_0: np.ndarray, r1: float) -> np.ndarray:
"""
Computes post-contrast R1 relaxation rates from tracer concentration.

Formula: R1 = C * r1 + R1_0

Args:
c (np.ndarray): Array of tracer concentrations.
r1_0 (np.ndarray): Array of pre-contrast (baseline) R1 relaxation rates.
r1 (float): Relaxivity of the contrast agent.

Returns:
np.ndarray: Computed post-contrast R1 array.
"""
return c * r1 + r1_0


def T1_from_concentration_expr(c: np.ndarray, t1_0: np.ndarray, r1: float) -> np.ndarray:
"""
Computes post-contrast T1 relaxation times from tracer concentration.

Formula: T1 = 1 / (C * r1 + (1 / T1_0))

Args:
c (np.ndarray): Array of tracer concentrations.
t1_0 (np.ndarray): Array of pre-contrast (baseline) T1 relaxation times.
r1 (float): Relaxivity of the contrast agent.

Returns:
np.ndarray: Computed post-contrast T1 array.
"""
# Note: In a robust pipeline, you might want to mask or handle cases
# where t1_0 is 0 to avoid division by zero errors.
return 1.0 / (c * r1 + (1.0 / t1_0))


def compute_concentration_from_T1_array(
t1_data: np.ndarray, t10_data: np.ndarray, r1: float, mask: np.ndarray | None = None
) -> np.ndarray:
Expand Down
2 changes: 1 addition & 1 deletion src/mritk/looklocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def compute_looklocker_t1_array(data: np.ndarray, time_s: np.ndarray, t1_roof: f

logger.debug(f"Starting fitting for {len(d_masked)} voxels.")
with tqdm.tqdm(total=len(d_masked), desc="Fitting Look-Locker Voxels") as pbar:
voxel_fitter = partial(fit_voxel, time_s, pbar)
voxel_fitter = partial(fit_voxel, time_s=time_s, pbar=pbar)
vfunc = np.vectorize(voxel_fitter, signature="(n) -> (3)")
fitted_coefficients = vfunc(d_masked)

Expand Down
19 changes: 13 additions & 6 deletions src/mritk/mixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ def extract_single_volume(D: np.ndarray, frame_fg) -> MRIData:


def mixed_t1map(
SE_nii_path: Path, IR_nii_path: Path, meta_path: Path, T1_low: float, T1_high: float, output: Path | None = None
SE_nii_path: Path,
IR_nii_path: Path,
meta_path: Path,
T1_low: float = 500.0,
T1_high: float = 5000.0,
output: Path | None = None,
) -> nibabel.nifti1.Nifti1Image:
"""
Generates a T1 relaxation map by combining Spin-Echo (SE) and Inversion-Recovery (IR) acquisitions.
Expand All @@ -116,8 +121,8 @@ def mixed_t1map(
IR_nii_path (Path): Path to the Inversion-Recovery corrected real NIfTI file.
meta_path (Path): Path to the JSON file containing the sequence parameters
('TR_SE', 'TI', 'TE', 'ETL').
T1_low (float): Lower bound for the T1 interpolation grid (in ms).
T1_high (float): Upper bound for the T1 interpolation grid (in ms).
T1_low (float): Lower bound for the T1 interpolation grid (in ms). Defaults to 500 ms.
T1_high (float): Upper bound for the T1 interpolation grid (in ms). Defaults to 5000 ms.
output (Path | None, optional): Path to save the resulting T1 map NIfTI file. Defaults to None.

Returns:
Expand Down Expand Up @@ -187,16 +192,18 @@ def mixed_t1map_postprocessing(SE_nii_path: Path, T1_path: Path, output: Path |
return masked_t1map_nii


def compute_mixed_t1_array(se_data: np.ndarray, ir_data: np.ndarray, meta: dict, t1_low: float, t1_high: float) -> np.ndarray:
def compute_mixed_t1_array(
se_data: np.ndarray, ir_data: np.ndarray, meta: dict, t1_low: float = 500.0, t1_high: float = 5000.0
) -> np.ndarray:
"""
Computes a Mixed T1 array from Spin-Echo and Inversion-Recovery volumes using a lookup table.

Args:
se_data (np.ndarray): 3D numpy array of the Spin-Echo modulus data.
ir_data (np.ndarray): 3D numpy array of the Inversion-Recovery corrected real data.
meta (dict): Dictionary containing sequence parameters ('TR_SE', 'TI', 'TE', 'ETL').
t1_low (float): Lower bound for T1 generation grid.
t1_high (float): Upper bound for T1 generation grid.
t1_low (float): Lower bound for T1 generation grid. Defaults to 500 ms.
t1_high (float): Upper bound for T1 generation grid. Defaults to 5000 ms.

Returns:
np.ndarray: Computed T1 map as a 3D float32 array.
Expand Down
4 changes: 2 additions & 2 deletions src/mritk/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def curve_fit_wrapper(f, t: np.ndarray, y: np.ndarray, p0: np.ndarray):
return popt


def fit_voxel(time_s: np.ndarray, pbar, m: np.ndarray) -> np.ndarray:
def fit_voxel(time_s: np.ndarray, m: np.ndarray, pbar=None) -> np.ndarray:
"""
Fits the Look-Locker relaxation curve for a single voxel's time series.

Expand All @@ -105,8 +105,8 @@ def fit_voxel(time_s: np.ndarray, pbar, m: np.ndarray) -> np.ndarray:

Args:
time_s (np.ndarray): 1D array of trigger times in seconds.
pbar: A tqdm progress bar instance (or None) to update incrementally.
m (np.ndarray): 1D array of signal magnitudes over time for the voxel.
pbar: A tqdm progress bar instance (or None) to update incrementally.

Returns:
np.ndarray: A 3-element array containing the fitted parameters `[x1, x2, x3]`.
Expand Down
120 changes: 106 additions & 14 deletions tests/test_concentration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@
import numpy as np

import mritk.cli
from mritk.concentration import (
compute_concentration_from_R1_array,
compute_concentration_from_T1_array,
concentration_from_R1_expr,
concentration_from_T1,
concentration_from_T1_expr,
)
import mritk.concentration
import mritk.data
from mritk.testing import compare_nifti_images


Expand All @@ -36,7 +31,7 @@ def test_intracranial_concentration(tmp_path, mri_data_dir: Path):
test_outputs = [tmp_path / f"output_ses-0{i}_concentration.nii.gz" for i in sessions]

for i, s in enumerate(sessions):
concentration_from_T1(
mritk.concentration.concentration_from_T1(
input_path=images_path[i],
reference_path=baseline_path,
output_path=test_outputs[i],
Expand All @@ -46,13 +41,68 @@ def test_intracranial_concentration(tmp_path, mri_data_dir: Path):
compare_nifti_images(test_outputs[i], ref_outputs[i], data_tolerance=1e-12)


def test_inverse_intracranial_concentration(tmp_path, mri_data_dir: Path):
"""
Tests that taking a generated concentration map and applying the inverse
T1 formula recovers the original post-contrast T1 map within the valid mask.
"""
baseline_path = mri_data_dir / "mri-processed/mri_processed_data/sub-01/T1maps/sub-01_ses-01_T1map_hybrid.nii.gz"
sessions = [1, 2]

# These are the original T1 maps we want to recover
original_t1_paths = [
mri_data_dir / f"mri-processed/mri_processed_data/sub-01/T1maps/sub-01_ses-0{i}_T1map_hybrid.nii.gz" for i in sessions
]

# These are the concentration maps generated by the forward pass
concentration_paths = [
mri_data_dir / f"mri-processed/mri_processed_data/sub-01/concentrations/sub-01_ses-0{i}_concentration.nii.gz"
for i in sessions
]

r1 = 0.0032

# Load the baseline T1 map (T1_0) once
t1_0_mri = mritk.data.MRIData.from_file(baseline_path, dtype=np.single)
t1_0_data = t1_0_mri.data

for i, s in enumerate(sessions):
# 1. Load the pre-calculated concentration map
conc_mri = mritk.data.MRIData.from_file(concentration_paths[i], dtype=np.single)
conc_data = conc_mri.data

# 2. Load the original post-contrast T1 map (our ground truth)
orig_t1_mri = mritk.data.MRIData.from_file(original_t1_paths[i], dtype=np.single)
orig_t1_data = orig_t1_mri.data

# 3. Create a mask of valid voxels.
# The forward function sets unmasked/invalid voxels to NaN.
valid_mask = ~np.isnan(conc_data)

# 4. Compute the inverse T1 only on valid voxels
recovered_t1_data = np.full_like(conc_data, np.nan)

# Formula: T1 = 1 / (C * r1 + (1 / T1_0))
recovered_t1_data[valid_mask] = 1.0 / (conc_data[valid_mask] * r1 + (1.0 / t1_0_data[valid_mask]))

# 5. Assert that the recovered T1 matches the original T1 strictly within the mask.
# We use assert_allclose because floating point math (1/x) will create minor precision differences.
np.testing.assert_allclose(
recovered_t1_data[valid_mask],
orig_t1_data[valid_mask],
rtol=1e-5,
atol=1e-5,
err_msg=f"Inverse T1 mismatch for session {s}",
)


def test_compute_concentration_array_no_mask():
"""Test the array computation correctly defaults to keeping all valid positive T1s when no mask is provided."""
t1_data = np.array([1000.0, 1000.0])
t10_data = np.array([2000.0, 2000.0])
r1 = 0.005

result = compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=None)
result = mritk.concentration.compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=None)

assert np.isclose(result[0], 0.1)
assert np.isclose(result[1], 0.1)
Expand All @@ -69,7 +119,7 @@ def test_concentration_from_t1_expr():
# C = 200 * 0.0005 = 0.1
expected = np.array([0.1])

result = concentration_from_T1_expr(t1, t1_0, r1)
result = mritk.concentration.concentration_from_T1_expr(t1, t1_0, r1)
np.testing.assert_array_almost_equal(result, expected)


Expand All @@ -83,7 +133,7 @@ def test_concentration_from_r1_expr():
# C = 200 * 1.0 = 200.0
expected = np.array([200.0])

result = concentration_from_R1_expr(r1_map, r1_0_map, r1)
result = mritk.concentration.concentration_from_R1_expr(r1_map, r1_0_map, r1)
np.testing.assert_array_almost_equal(result, expected)


Expand All @@ -96,7 +146,7 @@ def test_compute_concentration_from_T1_array_masking():
mask = np.array([True, True, True, False])
r1 = 0.005

result = compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=mask)
result = mritk.concentration.compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=mask)

# Expectations:
# Voxel 0: Valid, should be 0.1
Expand All @@ -119,7 +169,7 @@ def test_compute_concentration_from_R1_array_masking():
mask = np.array([True, True, True, False])
r1 = 0.005

result = compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=mask)
result = mritk.concentration.compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=mask)

# Expectations:
# Voxel 0: Valid, should be 200.0
Expand All @@ -139,12 +189,54 @@ def test_compute_concentration_from_R1_array_no_mask():
r10_data = np.array([1.0, 1.0])
r1 = 0.005

result = compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=None)
result = mritk.concentration.compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=None)

assert np.isclose(result[0], 200.0)
assert np.isclose(result[1], 200.0)


def test_r1_concentration_roundtrip():
"""Test that Signal -> Concentration -> Signal roundtrips perfectly for R1."""
np.random.seed(42)

# Generate synthetic baseline R1 and post-contrast R1
# Post-contrast R1 is typically higher than baseline R1
shape = (10, 10, 10)
r1_0 = np.random.uniform(0.5, 1.5, size=shape)
r1_post = r1_0 + np.random.uniform(0.1, 2.0, size=shape)
r1_relaxivity = 0.0045

# 1. Forward: Calculate concentration
concentration = mritk.concentration.concentration_from_R1_expr(r1_post, r1_0, r1_relaxivity)

# 2. Inverse: Calculate R1 back from concentration
r1_recovered = mritk.concentration.R1_from_concentration_expr(concentration, r1_0, r1_relaxivity)

# 3. Assert they are effectively identical
np.testing.assert_allclose(r1_recovered, r1_post, rtol=1e-5, atol=1e-8, err_msg="R1 round-trip failed!")


def test_t1_concentration_roundtrip():
"""Test that Signal -> Concentration -> Signal roundtrips perfectly for T1."""
np.random.seed(42)

# Generate synthetic baseline T1 and post-contrast T1
# Post-contrast T1 is typically lower than baseline T1
shape = (10, 10, 10)
t1_0 = np.random.uniform(1000, 2000, size=shape) # milliseconds, for instance
t1_post = t1_0 - np.random.uniform(100, 500, size=shape)
r1_relaxivity = 0.0045

# 1. Forward: Calculate concentration
concentration = mritk.concentration.concentration_from_T1_expr(t1_post, t1_0, r1_relaxivity)

# 2. Inverse: Calculate T1 back from concentration
t1_recovered = mritk.concentration.T1_from_concentration_expr(concentration, t1_0, r1_relaxivity)

# 3. Assert they are effectively identical
np.testing.assert_allclose(t1_recovered, t1_post, rtol=1e-5, atol=1e-8, err_msg="T1 round-trip failed!")


@patch("mritk.concentration.concentration_from_T1")
def test_dispatch_concentration_t1_defaults(mock_conc_t1):
"""Test the T1 concentration command with minimum required arguments."""
Expand Down
Loading