From 3501890a4155a1feacd53efcc784063df597d9ba Mon Sep 17 00:00:00 2001 From: Steffen Bollmann Date: Fri, 5 Dec 2025 11:41:03 -0800 Subject: [PATCH 1/2] adding a nifti2mrd converter for testing open recon without dicom data and only nifti data --- nifti2mrd.py | 614 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 614 insertions(+) create mode 100644 nifti2mrd.py diff --git a/nifti2mrd.py b/nifti2mrd.py new file mode 100644 index 0000000..49781fd --- /dev/null +++ b/nifti2mrd.py @@ -0,0 +1,614 @@ +#!/usr/bin/env python3 +""" +NIfTI to ISMRMRD Converter +Converts NIfTI files to ISMRMRD format for testing the OpenRecon pipeline +adapted from: https://github.com/jlautman1/open-recon-fetal-brain-measurements/blob/main/nifti_to_ismrmrd_converter.py +""" + +import os +import sys +import argparse +import re +import numpy as np +import nibabel as nib +import json +from pathlib import Path + +try: + import ismrmrd + print("โœ… Successfully imported ismrmrd module") +except ImportError as e: + print(f"โŒ Failed to import ismrmrd: {e}") + print(" Creating mock ISMRMRD classes for testing...") + + # Create mock ISMRMRD classes for testing + class MockImage: + def __init__(self, data): + # Keep data as-is, don't convert to complex + self.data = data + self.meta = {} + self.attribute_string = "" + self.image_type = 1 # IMTYPE_MAGNITUDE + self.image_index = 0 + self.image_series_index = 1 + + @classmethod + def from_array(cls, data, transpose=True): + return cls(data) + + class MockMeta: + def __init__(self): + self._data = {} + + def __setitem__(self, key, value): + self._data[key] = value + + def __getitem__(self, key): + return self._data[key] + + def get(self, key, default=None): + return self._data.get(key, default) + + def serialize(self): + return json.dumps(self._data) + + # Create a mock ismrmrd module + import types + ismrmrd = types.ModuleType('ismrmrd') + ismrmrd.Image = MockImage + ismrmrd.Meta = MockMeta + IMTYPE_MAGNITUDE = 1 + + +def extract_orientation_from_affine(affine, shape): + """ + Extract position and direction vectors from NIfTI affine matrix + + The affine matrix transforms from voxel coordinates to world coordinates: + [x_world] [r11 r12 r13 tx] [i] + [y_world] = [r21 r22 r23 ty] * [j] + [z_world] [r31 r32 r33 tz] [k] + [ 1 ] [ 0 0 0 1] [1] + + Returns: + position: [x, y, z] position of the first voxel center + read_dir: [x, y, z] direction vector for readout (columns) + phase_dir: [x, y, z] direction vector for phase encoding (rows) + slice_dir: [x, y, z] direction vector for slice (slices) + """ + print("๐Ÿงญ Extracting orientation from affine matrix...") + print(f" Affine matrix:\n{affine}") + + # Extract the rotation/scaling part and translation + # First 3 columns are the direction vectors scaled by voxel size + rotation_scale = affine[:3, :3] + translation = affine[:3, 3] + + # Extract direction vectors (columns of the rotation matrix) + # These need to be normalized to get unit direction vectors + col0 = rotation_scale[:, 0] # First axis (usually X/readout) + col1 = rotation_scale[:, 1] # Second axis (usually Y/phase) + col2 = rotation_scale[:, 2] # Third axis (usually Z/slice) + + # Calculate voxel sizes from the direction vectors + voxel_size_x = np.linalg.norm(col0) + voxel_size_y = np.linalg.norm(col1) + voxel_size_z = np.linalg.norm(col2) + + print(f" Voxel sizes from affine: [{voxel_size_x:.4f}, {voxel_size_y:.4f}, {voxel_size_z:.4f}] mm") + + # Normalize to get unit direction vectors + read_dir = col0 / voxel_size_x if voxel_size_x > 0 else col0 
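+    # Worked example (assumed values, for illustration only): for a 0.8 mm
+    # isotropic axial volume the first affine column might be [-0.8, 0.0, 0.0],
+    # giving voxel_size_x = 0.8 and a unit read_dir of [-1, 0, 0] prior to the
+    # RAS->LPS flip applied further down. If a column has zero norm, the guard
+    # above keeps the raw (degenerate) column unchanged.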
+ phase_dir = col1 / voxel_size_y if voxel_size_y > 0 else col1 + slice_dir = col2 / voxel_size_z if voxel_size_z > 0 else col2 + + # Position is the translation (position of first voxel) + position = translation.copy() + + # Convert from RAS (NIfTI) to LPS (ISMRMRD/DICOM) + # Flip X and Y coordinates + print("๐Ÿ”„ Converting from RAS to LPS orientation (flipping X and Y)...") + position[0] = -position[0] + position[1] = -position[1] + + read_dir[0] = -read_dir[0] + read_dir[1] = -read_dir[1] + + phase_dir[0] = -phase_dir[0] + phase_dir[1] = -phase_dir[1] + + slice_dir[0] = -slice_dir[0] + slice_dir[1] = -slice_dir[1] + + print(f" Position: [{position[0]:.4f}, {position[1]:.4f}, {position[2]:.4f}] mm") + print(f" Read direction: [{read_dir[0]:.4f}, {read_dir[1]:.4f}, {read_dir[2]:.4f}]") + print(f" Phase direction: [{phase_dir[0]:.4f}, {phase_dir[1]:.4f}, {phase_dir[2]:.4f}]") + print(f" Slice direction: [{slice_dir[0]:.4f}, {slice_dir[1]:.4f}, {slice_dir[2]:.4f}]") + + return { + 'position': position.tolist(), + 'read_dir': read_dir.tolist(), + 'phase_dir': phase_dir.tolist(), + 'slice_dir': slice_dir.tolist(), + 'voxel_size': [voxel_size_x, voxel_size_y, voxel_size_z] + } + + +def extract_metadata_from_filename(nifti_path): + """Extract patient and series metadata from NIfTI filename""" + filename = os.path.basename(nifti_path) + print(f"๐Ÿท๏ธ Extracting metadata from filename: {filename}") + + # Default metadata + metadata = { + 'config': 'openrecon', + 'enable_measurements': True, + 'enable_reporting': True, + 'confidence_threshold': 0.5, + 'PatientName': 'TEST^PATIENT', + 'StudyDescription': 'OPENRECON TEST', + 'SeriesDescription': 'TEST_SERIES', + 'PixelSpacing': [0.8, 0.8], + 'SliceThickness': 0.8, + 'PatientID': 'TESTPAT001', + 'SeriesNumber': 1 + } + + # Try to parse filename format: Pat[PatientID]_Se[SeriesNumber]_Res[X]_[Y]_Spac[Z].nii.gz + if filename.startswith('Pat') and '_Se' in filename: + try: + # Remove either .nii.gz or .nii extension flexibly + base_name = re.sub(r'\.nii(\.gz)?$', '', filename, flags=re.IGNORECASE) + parts = base_name.split('_') + for part in parts: + if part.startswith('Pat'): + patient_id = part[3:] # Remove 'Pat' prefix + metadata['PatientID'] = patient_id + metadata['PatientName'] = f'PATIENT^{patient_id}' + elif part.startswith('Se'): + series_num = int(part[2:]) # Remove 'Se' prefix + metadata['SeriesNumber'] = series_num + elif part.startswith('Res'): + # Next part should be the Y resolution + idx = parts.index(part) + if idx + 1 < len(parts): + x_res = float(part[3:]) # Remove 'Res' prefix + y_res = float(parts[idx + 1]) + metadata['PixelSpacing'] = [x_res, y_res] + elif part.startswith('Spac'): + slice_thickness = float(part[4:]) # Remove 'Spac' prefix + metadata['SliceThickness'] = slice_thickness + + print(f"โœ… Parsed metadata from filename:") + print(f" Patient ID: {metadata['PatientID']}") + print(f" Series: {metadata['SeriesNumber']}") + print(f" Resolution: {metadata['PixelSpacing']}") + print(f" Slice thickness: {metadata['SliceThickness']}") + + except Exception as e: + print(f"โš ๏ธ Warning: Could not parse filename completely: {e}") + print(" Using default metadata values") + + return metadata + + +def convert_nifti_to_ismrmrd(nifti_path, output_path=None): + """Convert NIfTI file to ISMRMRD format""" + + if not os.path.exists(nifti_path): + raise FileNotFoundError(f"NIfTI file not found: {nifti_path}") + + print(f"๐Ÿ”„ Converting NIfTI to ISMRMRD format") + print(f" Input: {nifti_path}") + + # Load NIfTI data + print("๐Ÿ“– 
Loading NIfTI file...") + nii = nib.load(nifti_path) + data = nii.get_fdata() + affine = nii.affine + + print(f"๐Ÿ“ Original data shape: {data.shape}") + print(f"๐Ÿ”ข Value range: {data.min():.2f} - {data.max():.2f}") + print(f"๐Ÿ“Š Data type: {data.dtype}") + + # Extract orientation information from affine matrix + orientation_info = extract_orientation_from_affine(affine, data.shape) + + # Normalize data to reasonable range for medical imaging + if data.max() > 4095: # If values are very high, normalize + data = (data / data.max()) * 4095 + print(f"๐Ÿ”ง Normalized data to range: {data.min():.2f} - {data.max():.2f}") + + # Ensure we have 3D data + if len(data.shape) == 2: + data = data[:, :, np.newaxis] + print(f"๐Ÿ“ Expanded 2D to 3D: {data.shape}") + elif len(data.shape) == 4: + data = data[:, :, :, 0] # Take first volume + print(f"๐Ÿ“ Reduced 4D to 3D: {data.shape}") + + # Create ISMRMRD Image object + print("๐Ÿ—๏ธ Creating ISMRMRD Image object...") + + # For magnitude/T2W images, keep as real float32 data + if np.iscomplexobj(data): + # If complex, take magnitude + ismrmrd_data = np.abs(data).astype(np.float32) + print("๐Ÿ”ง Converted complex data to magnitude (float32)") + else: + # Keep as real float32 + ismrmrd_data = data.astype(np.float32) + + print(f"๐Ÿ“Š ISMRMRD data type: {ismrmrd_data.dtype}") + print(f"๐Ÿ“ NIfTI data shape: {ismrmrd_data.shape}") + + # Create ISMRMRD image - no transpose, keep data as-is + # Let ISMRMRD handle storage format internally + try: + ismrmrd_image = ismrmrd.Image.from_array(ismrmrd_data, transpose=False) + except TypeError: + # Older versions don't support transpose parameter + ismrmrd_image = ismrmrd.Image.from_array(ismrmrd_data) + + print(f"๐Ÿ“ ISMRMRD image data shape: {ismrmrd_image.data.shape}") + + # Set basic image properties + if hasattr(ismrmrd_image, 'image_type'): + ismrmrd_image.image_type = IMTYPE_MAGNITUDE if 'IMTYPE_MAGNITUDE' in globals() else 1 + if hasattr(ismrmrd_image, 'image_series_index'): + ismrmrd_image.image_series_index = 1 + if hasattr(ismrmrd_image, 'image_index'): + ismrmrd_image.image_index = 0 + + # Extract metadata from filename + metadata = extract_metadata_from_filename(nifti_path) + + # Add orientation information to metadata + metadata['position'] = orientation_info['position'] + metadata['read_dir'] = orientation_info['read_dir'] + metadata['phase_dir'] = orientation_info['phase_dir'] + metadata['slice_dir'] = orientation_info['slice_dir'] + + # Update pixel spacing from actual affine-derived voxel sizes + voxel_size = orientation_info['voxel_size'] + print(f"๐Ÿ”ง Voxel spacing from affine matrix: {voxel_size}") + + print(f"๐Ÿ“ Original data shape: {ismrmrd_data.shape}") + print(f"๐Ÿ“ ISMRMRD image.data shape: {ismrmrd_image.data.shape}") + + # Check if ISMRMRD image has matrix_size attribute + if hasattr(ismrmrd_image, 'matrix_size'): + print(f"๐Ÿ“ ISMRMRD matrix_size attribute: {ismrmrd_image.matrix_size}") + + # CRITICAL: ISMRMRD stores data in reversed/transposed order (column-major Fortran order) + # Original data: [624, 512, 416] but ISMRMRD reads it as [416, 512, 624] + # So we need to reverse the FOV array to match: [Z_fov, Y_fov, X_fov] + # Correction: Based on testing, the order should be [Y_fov, Z_fov, X_fov] + field_of_view = [ + ismrmrd_data.shape[1] * voxel_size[1], # Y + ismrmrd_data.shape[2] * voxel_size[2], # Z + ismrmrd_data.shape[0] * voxel_size[0] # X + ] + + metadata['PixelSpacing'] = [voxel_size[0], voxel_size[1]] + metadata['SliceThickness'] = voxel_size[2] + 
metadata['field_of_view'] = field_of_view + + print(f"๐Ÿ“ Voxel spacing: [{voxel_size[0]}, {voxel_size[1]}, {voxel_size[2]}] mm") + print(f"๐Ÿ“ Field of view (reversed for ISMRMRD): {field_of_view} mm") + print(f"๐Ÿ“ Target matrix: {ismrmrd_data.shape[0]} x {ismrmrd_data.shape[1]} x {ismrmrd_data.shape[2]}") + # print(f"๐Ÿ“ ISMRMRD stores as: 416 x 512 x 624") + print(f"๐Ÿ“ Expected voxel: [{field_of_view[0]/ismrmrd_data.shape[1]:.8f}, {field_of_view[1]/ismrmrd_data.shape[2]:.8f}, {field_of_view[2]/ismrmrd_data.shape[0]:.8f}]") + + # Set image metadata + if hasattr(ismrmrd_image, 'meta'): + ismrmrd_image.meta = metadata + + # Set field_of_view on the image header if available + if hasattr(ismrmrd_image, 'field_of_view'): + ismrmrd_image.field_of_view[:] = field_of_view + + # Set position and orientation on the image header if available + if hasattr(ismrmrd_image, 'position'): + ismrmrd_image.position[:] = orientation_info['position'] + print(f"โœ… Set image position: {orientation_info['position']}") + + if hasattr(ismrmrd_image, 'read_dir'): + ismrmrd_image.read_dir[:] = orientation_info['read_dir'] + print(f"โœ… Set read direction: {orientation_info['read_dir']}") + + if hasattr(ismrmrd_image, 'phase_dir'): + ismrmrd_image.phase_dir[:] = orientation_info['phase_dir'] + print(f"โœ… Set phase direction: {orientation_info['phase_dir']}") + + if hasattr(ismrmrd_image, 'slice_dir'): + ismrmrd_image.slice_dir[:] = orientation_info['slice_dir'] + print(f"โœ… Set slice direction: {orientation_info['slice_dir']}") + + # Create XML metadata string for ISMRMRD + meta_obj = ismrmrd.Meta() + for key, value in metadata.items(): + if isinstance(value, (list, tuple)): + meta_obj[key] = list(value) + else: + meta_obj[key] = str(value) + + meta_obj['DataRole'] = 'Image' + meta_obj['ImageProcessingHistory'] = ['NIfTI_CONVERSION'] + meta_obj['Keep_image_geometry'] = 1 + meta_obj['orientation_extracted'] = 'true' + + if hasattr(ismrmrd_image, 'attribute_string'): + ismrmrd_image.attribute_string = meta_obj.serialize() + + print(f"โœ… Successfully created ISMRMRD Image") + print(f" Data shape: {ismrmrd_image.data.shape}") + print(f" Data type: {ismrmrd_image.data.dtype}") + + # Save to file if requested + if output_path: + print(f"๐Ÿ’พ Saving to: {output_path}") + + # Remove existing file if it exists to avoid corruption + if os.path.exists(output_path): + os.remove(output_path) + print(f"๐Ÿ—‘๏ธ Removed existing file: {output_path}") + + try: + # Check if we have the real ismrmrd module (not mock) + if hasattr(ismrmrd, 'Dataset'): + print(f"๐Ÿ“ Creating proper ISMRMRD Dataset file...") + + # Ensure parent directory exists + os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) + + # Create ISMRMRD Dataset (this creates proper structure) + mrdDset = ismrmrd.Dataset(output_path, 'dataset') + mrdDset._file.require_group('dataset') + + # Create XML header with proper metadata + print(f"๐Ÿ“‹ Creating ISMRMRD XML header...") + mrdHead = ismrmrd.xsd.ismrmrdHeader() + + # Study Information + mrdHead.studyInformation = ismrmrd.xsd.studyInformationType() + mrdHead.studyInformation.studyDescription = metadata.get('StudyDescription', 'NIFTI_CONVERSION') + + # Patient Information + mrdHead.subjectInformation = ismrmrd.xsd.subjectInformationType() + mrdHead.subjectInformation.patientName = metadata.get('PatientName', 'TEST^PATIENT') + mrdHead.subjectInformation.patientID = metadata.get('PatientID', 'TEST001') + + # Acquisition System Information + mrdHead.acquisitionSystemInformation = 
ismrmrd.xsd.acquisitionSystemInformationType() + mrdHead.acquisitionSystemInformation.systemVendor = 'NIfTI_Converter' + mrdHead.acquisitionSystemInformation.systemModel = 'Virtual' + mrdHead.acquisitionSystemInformation.institutionName = 'Test' + + # Encoding information + encoding = ismrmrd.xsd.encodingType() + encoding.trajectory = ismrmrd.xsd.trajectoryType.CARTESIAN + + # Get voxel size and FOV from metadata + voxel_size = orientation_info['voxel_size'] + + # Encoded space (acquisition space) + encoding.encodedSpace = ismrmrd.xsd.encodingSpaceType() + encoding.encodedSpace.matrixSize = ismrmrd.xsd.matrixSizeType() + encoding.encodedSpace.matrixSize.x = int(ismrmrd_data.shape[0]) + encoding.encodedSpace.matrixSize.y = int(ismrmrd_data.shape[1]) + encoding.encodedSpace.matrixSize.z = int(ismrmrd_data.shape[2]) + + encoding.encodedSpace.fieldOfView_mm = ismrmrd.xsd.fieldOfViewMm() + encoding.encodedSpace.fieldOfView_mm.x = float(ismrmrd_data.shape[0] * voxel_size[0]) + encoding.encodedSpace.fieldOfView_mm.y = float(ismrmrd_data.shape[1] * voxel_size[1]) + encoding.encodedSpace.fieldOfView_mm.z = float(ismrmrd_data.shape[2] * voxel_size[2]) + + # Recon space (same as encoded for NIfTI conversion) + encoding.reconSpace = ismrmrd.xsd.encodingSpaceType() + encoding.reconSpace.matrixSize = ismrmrd.xsd.matrixSizeType() + encoding.reconSpace.matrixSize.x = int(ismrmrd_data.shape[0]) + encoding.reconSpace.matrixSize.y = int(ismrmrd_data.shape[1]) + encoding.reconSpace.matrixSize.z = int(ismrmrd_data.shape[2]) + + encoding.reconSpace.fieldOfView_mm = ismrmrd.xsd.fieldOfViewMm() + encoding.reconSpace.fieldOfView_mm.x = float(ismrmrd_data.shape[0] * voxel_size[0]) + encoding.reconSpace.fieldOfView_mm.y = float(ismrmrd_data.shape[1] * voxel_size[1]) + encoding.reconSpace.fieldOfView_mm.z = float(ismrmrd_data.shape[2] * voxel_size[2]) + + # Encoding limits + encoding.encodingLimits = ismrmrd.xsd.encodingLimitsType() + + mrdHead.encoding.append(encoding) + + # Sequence parameters + mrdHead.sequenceParameters = ismrmrd.xsd.sequenceParametersType() + mrdHead.sequenceParameters.TR = [1.0] + mrdHead.sequenceParameters.TE = [1.0] + + print(f" Matrix size: {ismrmrd_data.shape[0]} x {ismrmrd_data.shape[1]} x {ismrmrd_data.shape[2]}") + print(f" FOV: {encoding.encodedSpace.fieldOfView_mm.x:.2f} x {encoding.encodedSpace.fieldOfView_mm.y:.2f} x {encoding.encodedSpace.fieldOfView_mm.z:.2f} mm") + print(f" Voxel size: {voxel_size[0]:.4f} x {voxel_size[1]:.4f} x {voxel_size[2]:.4f} mm") + + # Write XML header + mrdDset.write_xml_header(mrdHead.toXML('utf-8')) + print(f"โœ… Written XML header") + + # Create image with proper metadata + tmpMeta = ismrmrd.Meta() + for key, value in metadata.items(): + if isinstance(value, (list, tuple)): + tmpMeta[key] = list(value) + else: + tmpMeta[key] = str(value) + + tmpMeta['DataRole'] = 'Image' + tmpMeta['ImageProcessingHistory'] = ['NIFTI_CONVERSION'] + tmpMeta['Keep_image_geometry'] = 1 + + ismrmrd_image.attribute_string = tmpMeta.serialize() + + # Set image series index + ismrmrd_image.image_series_index = metadata.get('SeriesNumber', 1) + + # Write each slice as a separate image + print(f"๐Ÿ’พ Writing {ismrmrd_data.shape[2]} slices as separate images...") + + for slice_idx in range(ismrmrd_data.shape[2]): + # Extract 2D slice [X, Y] + slice_data = ismrmrd_data[:, :, slice_idx].astype(np.float32) + + # Create ISMRMRD image for this slice + try: + slice_image = ismrmrd.Image.from_array(slice_data, transpose=False) + except TypeError: + slice_image = 
ismrmrd.Image.from_array(slice_data) + + # Set image properties + slice_image.image_type = IMTYPE_MAGNITUDE if 'IMTYPE_MAGNITUDE' in globals() else 1 + slice_image.image_series_index = metadata.get('SeriesNumber', 1) + slice_image.image_index = slice_idx + + # Calculate position for this slice + # Position of slice = position of first slice + (slice_idx * slice_spacing * slice_direction) + slice_position = ( + np.array(orientation_info['position']) + + slice_idx * voxel_size[2] * np.array(orientation_info['slice_dir']) + ) + + # Set position and orientation on the slice + if hasattr(slice_image, 'position'): + slice_image.position[:] = slice_position.tolist() + + if hasattr(slice_image, 'read_dir'): + slice_image.read_dir[:] = orientation_info['read_dir'] + + if hasattr(slice_image, 'phase_dir'): + slice_image.phase_dir[:] = orientation_info['phase_dir'] + + if hasattr(slice_image, 'slice_dir'): + slice_image.slice_dir[:] = orientation_info['slice_dir'] + + # Set field_of_view for 2D slice (only X and Y, Z is single slice) + # Correction: Based on testing, the order should be [Y_fov, X_fov, Thickness] + slice_fov = [ + ismrmrd_data.shape[1] * voxel_size[1], + ismrmrd_data.shape[0] * voxel_size[0], + voxel_size[2] # Single slice thickness + ] + + if hasattr(slice_image, 'field_of_view'): + slice_image.field_of_view[:] = slice_fov + + # Create metadata for this slice + slice_meta = ismrmrd.Meta() + for key, value in metadata.items(): + if isinstance(value, (list, tuple)): + slice_meta[key] = list(value) + else: + slice_meta[key] = str(value) + + slice_meta['DataRole'] = 'Image' + slice_meta['ImageProcessingHistory'] = ['NIFTI_CONVERSION'] + slice_meta['Keep_image_geometry'] = 1 + slice_meta['slice_number'] = slice_idx + slice_meta['position'] = slice_position.tolist() + + slice_image.attribute_string = slice_meta.serialize() + + # Set slice and phase index + slice_image.slice = slice_idx + slice_image.phase = 0 + slice_image.acquisition_time_stamp = 0 + + # Write slice to dataset + # Use series index as key to group all slices in one series + series_index = metadata.get('SeriesNumber', 1) + mrdDset.append_image("image_%d" % series_index, slice_image) + + if (slice_idx + 1) % 50 == 0 or slice_idx == ismrmrd_data.shape[2] - 1: + print(f" Written {slice_idx + 1}/{ismrmrd_data.shape[2]} slices...") + + # Close dataset + mrdDset.close() + + print(f"โœ… Saved {ismrmrd_data.shape[2]} slices to {output_path}") + else: + raise ImportError("ismrmrd.Dataset not available - cannot create proper ISMRMRD file") + + except (ImportError, AttributeError, Exception) as e: + print(f"โŒ Could not save as ISMRMRD file: {e}") + import traceback + traceback.print_exc() + raise + + return ismrmrd_image, metadata + + +def main(): + """Main function to test the converter""" + print("๐Ÿงช NIfTI to ISMRMRD Converter") + print("=" * 50) + + # CLI arguments + parser = argparse.ArgumentParser(description="Convert a NIfTI file to ISMRMRD format for OpenRecon testing") + parser.add_argument( + "-i", "--input", + dest="nifti_file", + help="Path to the NIfTI file to convert, e.g. 
Pat[PatientID]_Se[SeriesNumber]_Res[X]_[Y]_Spac[Z].nii.gz", + default="Pat[PatientID]_Se[SeriesNumber]_Res[X]_[Y]_Spac[Z].nii.gz" + ) + parser.add_argument( + "-o", "--output", + dest="output_path", + help="Optional output path for serialized ISMRMRD data (pickle)", + default="test_ismrmrd_output.h5" + ) + args = parser.parse_args() + + # Resolve inputs + nifti_file = args.nifti_file + output_path = args.output_path + + if not os.path.exists(nifti_file): + print(f"โŒ Test file not found: {nifti_file}") + print(" Please check the file path") + return False + + try: + # Convert NIfTI to ISMRMRD + print(f"โžก๏ธ Using input: {nifti_file}") + print(f"โžก๏ธ Output path: {output_path}") + ismrmrd_image, metadata = convert_nifti_to_ismrmrd(nifti_file, output_path) + + print("\n๐Ÿ“‹ Conversion Summary:") + print(f" Input file: {nifti_file}") + print(f" Original 3D volume shape: {ismrmrd_image.data.shape}") + print(f" Number of 2D slices saved: {ismrmrd_image.data.shape[2]}") + print(f" Each slice shape: [{ismrmrd_image.data.shape[0]}, {ismrmrd_image.data.shape[1]}]") + print(f" Patient ID: {metadata.get('PatientID', 'Unknown')}") + print(f" Series: {metadata.get('SeriesNumber', 'Unknown')}") + print(f" PixelSpacing: {metadata.get('PixelSpacing', 'Unknown')}") + print(f" SliceThickness: {metadata.get('SliceThickness', 'Unknown')}") + print(f"\n๐Ÿงญ Orientation Information:") + print(f" First slice position: {metadata.get('position', 'Unknown')}") + print(f" Read direction: {metadata.get('read_dir', 'Unknown')}") + print(f" Phase direction: {metadata.get('phase_dir', 'Unknown')}") + print(f" Slice direction: {metadata.get('slice_dir', 'Unknown')}") + print(f" Slice spacing: {metadata.get('SliceThickness', 'Unknown')} mm") + + return ismrmrd_image, metadata + + except Exception as e: + print(f"โŒ Error during conversion: {e}") + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + result = main() + if result: + print("\n๐ŸŽ‰ Conversion completed successfully!") + else: + print("\n๐Ÿ’ฅ Conversion failed!") From 044bad19e9c8814f7d5629a29f540ff84d40b410 Mon Sep 17 00:00:00 2001 From: Steffen Bollmann Date: Thu, 19 Feb 2026 19:21:00 +0000 Subject: [PATCH 2/2] fixed orientation issues --- enhanceddicom2mrd.py | 579 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 579 insertions(+) create mode 100644 enhanceddicom2mrd.py diff --git a/enhanceddicom2mrd.py b/enhanceddicom2mrd.py new file mode 100644 index 0000000..01435ab --- /dev/null +++ b/enhanceddicom2mrd.py @@ -0,0 +1,579 @@ +import pydicom +import argparse +import ismrmrd +import ismrmrd.xsd +import numpy as np +import os +import ctypes +import re +import base64 + +# Defaults for input arguments +defaults = { + 'outGroup': 'dataset', +} + +# Lookup table between DICOM and MRD image types +imtype_map = {'M': ismrmrd.IMTYPE_MAGNITUDE, + 'P': ismrmrd.IMTYPE_PHASE, + 'R': ismrmrd.IMTYPE_REAL, + 'I': ismrmrd.IMTYPE_IMAG} + +# Lookup table between DICOM and Siemens flow directions +venc_dir_map = {'rl' : 'FLOW_DIR_R_TO_L', + 'lr' : 'FLOW_DIR_L_TO_R', + 'ap' : 'FLOW_DIR_A_TO_P', + 'pa' : 'FLOW_DIR_P_TO_A', + 'fh' : 'FLOW_DIR_F_TO_H', + 'hf' : 'FLOW_DIR_H_TO_F', + 'in' : 'FLOW_DIR_TP_IN', + 'out' : 'FLOW_DIR_TP_OUT'} + +class DicomImage: + def __init__(self, dset, frame_idx=0): + self.dset = dset + self.frame_idx = frame_idx + self.is_enhanced = (dset.SOPClassUID.name == 'Enhanced MR Image Storage') + + def get_group(self, group_name): + if not self.is_enhanced: + return None + + # Check PerFrame + if 
group_name in self.dset.PerFrameFunctionalGroupsSequence[self.frame_idx]: + return self.dset.PerFrameFunctionalGroupsSequence[self.frame_idx][group_name][0] + # Check Shared + if group_name in self.dset.SharedFunctionalGroupsSequence[0]: + return self.dset.SharedFunctionalGroupsSequence[0][group_name][0] + return None + + @property + def pixel_array(self): + if self.is_enhanced: + return self.dset.pixel_array[self.frame_idx] + else: + return self.dset.pixel_array + + @property + def PixelSpacing(self): + if self.is_enhanced: + measure = self.get_group('PixelMeasuresSequence') + if measure: + return measure.PixelSpacing + return [1.0, 1.0] + else: + return self.dset.PixelSpacing + + @property + def SliceThickness(self): + if self.is_enhanced: + measure = self.get_group('PixelMeasuresSequence') + if measure: + return measure.SliceThickness + return 1.0 + else: + return self.dset.SliceThickness + + @property + def ImagePositionPatient(self): + if self.is_enhanced: + pos = self.get_group('PlanePositionSequence') + if pos: + return pos.ImagePositionPatient + return [0.0, 0.0, 0.0] + else: + return self.dset.ImagePositionPatient + + @property + def ImageOrientationPatient(self): + if self.is_enhanced: + orient = self.get_group('PlaneOrientationSequence') + if orient: + return orient.ImageOrientationPatient + return [1, 0, 0, 0, 1, 0] + else: + return self.dset.ImageOrientationPatient + + @property + def SliceLocation(self): + # Calculate SliceLocation from Position and Orientation to be consistent + # SliceLocation is the projection of Position onto the normal vector of the slice. + try: + pos = np.array(self.ImagePositionPatient, dtype=float) + orient = np.array(self.ImageOrientationPatient, dtype=float) + normal = np.cross(orient[0:3], orient[3:6]) + return np.dot(pos, normal) + except: + if not self.is_enhanced and 'SliceLocation' in self.dset: + return self.dset.SliceLocation + return 0.0 + + @property + def TemporalPositionIndex(self): + if self.is_enhanced: + content = self.get_group('FrameContentSequence') + if content and 'TemporalPositionIndex' in content: + return content.TemporalPositionIndex + return 0 + + @property + def InStackPositionNumber(self): + if self.is_enhanced: + content = self.get_group('FrameContentSequence') + if content and 'InStackPositionNumber' in content: + return int(content.InStackPositionNumber) + return None + + @property + def StackID(self): + if self.is_enhanced: + content = self.get_group('FrameContentSequence') + if content and 'StackID' in content: + return str(content.StackID) + return None + + @property + def TriggerTime(self): + if self.is_enhanced: + cardiac = self.get_group('CardiacSynchronizationSequence') + if cardiac and 'NominalCardiacTriggerDelayTime' in cardiac: + return float(cardiac.NominalCardiacTriggerDelayTime) + + # Fallback to FrameContentSequence -> TemporalPositionIndex if needed? 
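+            # The value returned here later feeds physiology_time_stamp in main(),
+            # which divides by 2.5 to convert milliseconds into 2.5 ms MRD clock
+            # ticks, so both the enhanced and classic branches are expected to
+            # return milliseconds.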
+ # Or just return 0.0 + return 0.0 + else: + return float(self.dset.get('TriggerTime', 0.0)) + + @property + def AcquisitionTime(self): + if self.is_enhanced: + content = self.get_group('FrameContentSequence') + if content and 'FrameAcquisitionDateTime' in content: + dt = content.FrameAcquisitionDateTime + # DT is YYYYMMDDHHMMSS.FFFFFF + if len(dt) > 8: + return dt[8:] + return self.dset.get('AcquisitionTime', '000000.00') + else: + return self.dset.get('AcquisitionTime', '000000.00') + + @property + def InstanceNumber(self): + if self.is_enhanced: + return (self.dset.InstanceNumber * 10000) + self.frame_idx + 1 + else: + return self.dset.InstanceNumber + + @property + def SeriesNumber(self): + return self.dset.SeriesNumber + + @property + def SeriesDescription(self): + return self.dset.get('SeriesDescription', '') + + @property + def ImageType(self): + if self.is_enhanced: + content = self.get_group('MRImageFrameTypeSequence') + if content: + return content.FrameType + return self.dset.get('ImageType', ['','','MAGNITUDE']) + else: + return self.dset.get('ImageType', ['','','MAGNITUDE']) + + @property + def Rows(self): + return self.dset.Rows + + @property + def Columns(self): + return self.dset.Columns + + @property + def ImageComments(self): + return self.dset.get('ImageComments', '') + + @property + def SequenceName(self): + return self.dset.get('SequenceName', '') + + def get_private_item(self, group, element, creator): + try: + return self.dset.get_private_item(group, element, creator) + except: + return None + + def to_json(self): + return self.dset.to_json() + + +def _normalize(vec): + vec = np.asarray(vec, dtype=float) + norm = np.linalg.norm(vec) + if norm == 0: + return vec + return vec / norm + + +def CreateMrdHeader(dset): + """Create MRD XML header from a DICOM file""" + + mrdHead = ismrmrd.xsd.ismrmrdHeader() + + mrdHead.measurementInformation = ismrmrd.xsd.measurementInformationType() + mrdHead.measurementInformation.measurementID = dset.SeriesInstanceUID + mrdHead.measurementInformation.patientPosition = dset.PatientPosition + mrdHead.measurementInformation.protocolName = dset.SeriesDescription + mrdHead.measurementInformation.frameOfReferenceUID = dset.FrameOfReferenceUID + + mrdHead.acquisitionSystemInformation = ismrmrd.xsd.acquisitionSystemInformationType() + mrdHead.acquisitionSystemInformation.systemVendor = dset.Manufacturer + mrdHead.acquisitionSystemInformation.systemModel = dset.ManufacturerModelName + try: + mrdHead.acquisitionSystemInformation.systemFieldStrength_T = float(dset.MagneticFieldStrength) + except: + pass + + try: + mrdHead.acquisitionSystemInformation.institutionName = dset.InstitutionName + except: + mrdHead.acquisitionSystemInformation.institutionName = 'Virtual' + try: + mrdHead.acquisitionSystemInformation.stationName = dset.StationName + except: + pass + + mrdHead.experimentalConditions = ismrmrd.xsd.experimentalConditionsType() + try: + mrdHead.experimentalConditions.H1resonanceFrequency_Hz = int(dset.MagneticFieldStrength*4258e4) + except: + pass + + enc = ismrmrd.xsd.encodingType() + enc.trajectory = ismrmrd.xsd.trajectoryType('cartesian') + encSpace = ismrmrd.xsd.encodingSpaceType() + encSpace.matrixSize = ismrmrd.xsd.matrixSizeType() + encSpace.matrixSize.x = dset.Columns + encSpace.matrixSize.y = dset.Rows + encSpace.matrixSize.z = 1 + encSpace.fieldOfView_mm = ismrmrd.xsd.fieldOfViewMm() + + if dset.SOPClassUID.name == 'Enhanced MR Image Storage': + # Use first frame as reference + groups = dset.SharedFunctionalGroupsSequence[0] + if 
'PixelMeasuresSequence' not in groups: + groups = dset.PerFrameFunctionalGroupsSequence[0] + + # PixelSpacing is [RowSpacing, ColSpacing] -> [Y_spacing, X_spacing] + pixel_spacing = groups.PixelMeasuresSequence[0].PixelSpacing + slice_thickness = float(groups.PixelMeasuresSequence[0].SliceThickness) + + encSpace.fieldOfView_mm.x = pixel_spacing[1]*dset.Columns + encSpace.fieldOfView_mm.y = pixel_spacing[0]*dset.Rows + encSpace.fieldOfView_mm.z = slice_thickness + else: + pixel_spacing = dset.PixelSpacing + slice_thickness = float(dset.SliceThickness) + + encSpace.fieldOfView_mm.x = pixel_spacing[1]*dset.Columns + encSpace.fieldOfView_mm.y = pixel_spacing[0]*dset.Rows + encSpace.fieldOfView_mm.z = slice_thickness + enc.encodedSpace = encSpace + enc.reconSpace = encSpace + enc.encodingLimits = ismrmrd.xsd.encodingLimitsType() + enc.parallelImaging = ismrmrd.xsd.parallelImagingType() + + enc.parallelImaging.accelerationFactor = ismrmrd.xsd.accelerationFactorType() + if dset.SOPClassUID.name == 'Enhanced MR Image Storage': + # Try to find MRModifierSequence + found_accel = False + if 'MRModifierSequence' in dset.SharedFunctionalGroupsSequence[0]: + mod = dset.SharedFunctionalGroupsSequence[0].MRModifierSequence[0] + if 'ParallelReductionFactorInPlane' in mod: + enc.parallelImaging.accelerationFactor.kspace_encoding_step_1 = mod.ParallelReductionFactorInPlane + enc.parallelImaging.accelerationFactor.kspace_encoding_step_2 = mod.ParallelReductionFactorOutOfPlane + found_accel = True + + if not found_accel: + enc.parallelImaging.accelerationFactor.kspace_encoding_step_1 = 1 + enc.parallelImaging.accelerationFactor.kspace_encoding_step_2 = 1 + else: + enc.parallelImaging.accelerationFactor.kspace_encoding_step_1 = 1 + enc.parallelImaging.accelerationFactor.kspace_encoding_step_2 = 1 + + mrdHead.encoding.append(enc) + + mrdHead.sequenceParameters = ismrmrd.xsd.sequenceParametersType() + + return mrdHead + +def GetDicomFiles(directory): + """Get path to all DICOMs in a directory and its sub-directories""" + for entry in os.scandir(directory): + if entry.is_file() and (entry.path.lower().endswith(".dcm") or entry.path.lower().endswith(".ima")): + yield entry.path + elif entry.is_dir(): + yield from GetDicomFiles(entry.path) + + +def main(args): + dsetsAll = [] + for entryPath in GetDicomFiles(args.folder): + try: + dset = pydicom.dcmread(entryPath) + dsetsAll.append(dset) + except Exception as e: + print(f"Error reading {entryPath}: {e}") + + if not dsetsAll: + print(f"No DICOM files found in {args.folder}") + return + + # Group by series number + uSeriesNum = np.unique([dset.SeriesNumber for dset in dsetsAll]) + + # Re-group series that were split during conversion from multi-frame to single-frame DICOMs + if all(uSeriesNum > 1000): + for i in range(len(dsetsAll)): + dsetsAll[i].SeriesNumber = int(np.floor(dsetsAll[i].SeriesNumber / 1000)) + uSeriesNum = np.unique([dset.SeriesNumber for dset in dsetsAll]) + + print("Found %d unique series from %d files in folder %s" % (len(uSeriesNum), len(dsetsAll), args.folder)) + + print("Creating MRD XML header from file %s" % dsetsAll[0].filename) + mrdHead = CreateMrdHeader(dsetsAll[0]) + print(mrdHead.toXML()) + + imgAll = [None]*len(uSeriesNum) + + for iSer in range(len(uSeriesNum)): + # Get all files for this series + series_dsets = [dset for dset in dsetsAll if dset.SeriesNumber == uSeriesNum[iSer]] + + # Expand to DicomImage objects (handling Enhanced DICOM frames) + images = [] + for dset in series_dsets: + if dset.SOPClassUID.name == 'Enhanced MR Image 
Storage': + nFrames = getattr(dset, 'NumberOfFrames', 1) + for i in range(nFrames): + images.append(DicomImage(dset, i)) + else: + images.append(DicomImage(dset)) + + # Group by Phase (TemporalPositionIndex for Enhanced, TriggerTime for others) + is_enhanced_series = any(img.is_enhanced for img in images) + + if is_enhanced_series: + key_func = lambda img: img.TemporalPositionIndex + else: + key_func = lambda img: img.TriggerTime + + uKeys = sorted(list(set(key_func(img) for img in images))) + + imgAll[iSer] = [] + + for iPhase, key in enumerate(uKeys): + # Get images for this phase + phase_imgs = [img for img in images if key_func(img) == key] + + # Sort slices robustly: + # 1) Enhanced frame stack coordinates when available + # 2) Geometric slice position along slice normal + # 3) InstanceNumber as final tie-breaker + def sort_key(img): + in_stack = img.InStackPositionNumber + stack_id = img.StackID or '' + try: + slice_loc = float(img.SliceLocation) + except Exception: + slice_loc = 0.0 + return ( + 0 if in_stack is not None else 1, + stack_id, + int(in_stack) if in_stack is not None else 0, + slice_loc, + int(img.InstanceNumber), + ) + + phase_imgs.sort(key=sort_key) + + # Ensure slice index increases in the same physical direction as slice_dir. + if len(phase_imgs) > 1: + row_dir = _normalize(phase_imgs[0].ImageOrientationPatient[0:3]) + col_dir = _normalize(phase_imgs[0].ImageOrientationPatient[3:6]) + slice_dir = _normalize(np.cross(row_dir, col_dir)) + + for i_check in range(1, len(phase_imgs)): + pos0 = np.asarray(phase_imgs[0].ImagePositionPatient, dtype=float) + pos1 = np.asarray(phase_imgs[i_check].ImagePositionPatient, dtype=float) + step = float(np.dot(pos1 - pos0, slice_dir)) + if step < 0: + phase_imgs.reverse() + break + if step > 0: + break + + if not phase_imgs: + continue + + # Calculate total FOV for the volume + refImg = phase_imgs[0] + num_slices = len(phase_imgs) + total_fov_z = float(refImg.SliceThickness * num_slices) + + # Create separate MRD Image for each slice + for iSlice, sliceImg in enumerate(phase_imgs): + # Create MRD Image from single slice. + # Keep DICOM's [rows, cols] layout to match direction vectors and spacing. + tmpMrdImg = ismrmrd.Image.from_array(sliceImg.pixel_array, transpose=False) + tmpMeta = ismrmrd.Meta() + + try: + # ImageType is a list/tuple. Index 2 is usually M/P/R/I + # Enhanced: FrameType + itype = sliceImg.ImageType + if len(itype) > 2: + tmpMrdImg.image_type = imtype_map.get(itype[2], ismrmrd.IMTYPE_MAGNITUDE) + elif len(itype) > 0: + # Try to guess from first char of first element? + tmpMrdImg.image_type = imtype_map.get(itype[0][0], ismrmrd.IMTYPE_MAGNITUDE) + else: + tmpMrdImg.image_type = ismrmrd.IMTYPE_MAGNITUDE + except: + print("Unsupported ImageType %s -- defaulting to IMTYPE_MAGNITUDE" % sliceImg.ImageType) + tmpMrdImg.image_type = ismrmrd.IMTYPE_MAGNITUDE + + # DICOM PixelSpacing is [row_spacing, col_spacing] => [Y, X]. 
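+                # The resulting MRD field_of_view is therefore (x, y, z) =
+                # (Columns * column_spacing, Rows * row_spacing, SliceThickness),
+                # all in mm.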
+ tmpMrdImg.field_of_view = ( + float(sliceImg.PixelSpacing[1] * sliceImg.Columns), + float(sliceImg.PixelSpacing[0] * sliceImg.Rows), + float(sliceImg.SliceThickness), + ) + + # Note: matrix_size is read-only and derived from data shape (cols, rows, 1 for 2D slices) + # The slice index indicates position within the volume + + row_dir = _normalize(sliceImg.ImageOrientationPatient[0:3]) + col_dir = _normalize(sliceImg.ImageOrientationPatient[3:6]) + slice_dir = _normalize(np.cross(row_dir, col_dir)) + + tmpMrdImg.position = tuple(np.asarray(sliceImg.ImagePositionPatient, dtype=float)) + tmpMrdImg.read_dir = tuple(row_dir) + tmpMrdImg.phase_dir = tuple(col_dir) + tmpMrdImg.slice_dir = tuple(slice_dir) + + # AcquisitionTime HHMMSS.FFFFFF + acq_time = sliceImg.AcquisitionTime + try: + # Handle potential empty or malformed time + if acq_time and len(acq_time) >= 6: + h = int(acq_time[0:2]) + m = int(acq_time[2:4]) + s = int(acq_time[4:6]) + f = float(acq_time[6:]) if len(acq_time) > 6 else 0.0 + tmpMrdImg.acquisition_time_stamp = round((h*3600 + m*60 + s + f)*1000/2.5) + else: + tmpMrdImg.acquisition_time_stamp = 0 + except: + tmpMrdImg.acquisition_time_stamp = 0 + + try: + tmpMrdImg.physiology_time_stamp[0] = int(round(sliceImg.TriggerTime / 2.5)) + except: + pass + + try: + item = sliceImg.get_private_item(0x0019, 0x13, 'SIEMENS MR HEADER') + if item: + ImaAbsTablePosition = item.value + tmpMrdImg.patient_table_position = (ctypes.c_float(ImaAbsTablePosition[0]), ctypes.c_float(ImaAbsTablePosition[1]), ctypes.c_float(ImaAbsTablePosition[2])) + except: + pass + + tmpMrdImg.image_series_index = uSeriesNum.tolist().index(sliceImg.SeriesNumber) + tmpMrdImg.image_index = sliceImg.InstanceNumber + + tmpMrdImg.slice = iSlice + tmpMrdImg.phase = iPhase + + # Store original DICOM ImageType in metadata + try: + itype = sliceImg.ImageType + if itype: + tmpMeta['ImageType'] = '\\'.join(str(x) for x in itype) + except: + tmpMeta['ImageType'] = '' + + try: + seq_name = sliceImg.SequenceName + res = re.search(r'(?<=_v).*$', seq_name) + if res: + venc = re.search(r'^\d+', res.group(0)) + dir = re.search(r'(?<=\d)[^\d]*$', res.group(0)) + if venc and dir: + tmpMeta['FlowVelocity'] = float(venc.group(0)) + tmpMeta['FlowDirDisplay'] = venc_dir_map.get(dir.group(0), '') + except: + pass + + try: + tmpMeta['ImageComments'] = sliceImg.ImageComments + except: + pass + + tmpMeta['SeriesDescription'] = sliceImg.SeriesDescription + tmpMeta['SequenceDescription'] = sliceImg.SeriesDescription + tmpMeta['Keep_image_geometry'] = 1 + + tmpMrdImg.attribute_string = tmpMeta.serialize() + + imgAll[iSer].append(tmpMrdImg) + + print("Series %d: Created %d 2D images (slices x phases)" % (uSeriesNum[iSer], len(imgAll[iSer]))) + + # Create an MRD file + print("Creating MRD file %s with group %s" % (args.outFile, args.outGroup)) + mrdDset = ismrmrd.Dataset(args.outFile, args.outGroup) + mrdDset._file.require_group(args.outGroup) + + # Write MRD Header + mrdDset.write_xml_header(bytes(mrdHead.toXML(), 'utf-8')) + + # Write all images + for iSer in range(len(imgAll)): + for iImg in range(len(imgAll[iSer])): + if imgAll[iSer][iImg] is not None: + mrdDset.append_image("image_%d" % imgAll[iSer][iImg].image_series_index, imgAll[iSer][iImg]) + + if len(imgAll) > 0 and len(imgAll[0]) > 0 and imgAll[0][0] is not None: + first_img = imgAll[0][0] + print(f"First Image FOV: {first_img.field_of_view}") + print(f"First Image Matrix Size: {first_img.matrix_size}") + + print(f"Header FOV: {mrdHead.encoding[0].encodedSpace.fieldOfView_mm.x} 
{mrdHead.encoding[0].encodedSpace.fieldOfView_mm.y} {mrdHead.encoding[0].encodedSpace.fieldOfView_mm.z}") + + fov = mrdHead.encoding[0].encodedSpace.fieldOfView_mm + matrix = mrdHead.encoding[0].encodedSpace.matrixSize + print(f"Computed Voxel Size: {fov.x/matrix.x} {fov.y/matrix.y} {fov.z/matrix.z}") + + mrdDset.close() + +if __name__ == '__main__': + """Basic conversion of a folder of DICOM files to MRD .h5 format""" + + parser = argparse.ArgumentParser(description='Convert DICOMs to MRD file', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('folder', help='Input folder of DICOMs') + parser.add_argument('-o', '--outFile', help='Output MRD file') + parser.add_argument('-g', '--outGroup', help='Group name in output MRD file') + + parser.set_defaults(**defaults) + + args = parser.parse_args() + + if args.outFile is None: + args.outFile = os.path.basename(args.folder) + '.h5' + + main(args)
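
Example invocations for the two converters added in this series (the NIfTI file
name below is only a placeholder following the Pat*/Se*/Res*/Spac* pattern that
nifti2mrd.py parses; the DICOM folder path is likewise illustrative):

    python nifti2mrd.py -i Pat001_Se2_Res0.8_0.8_Spac0.8.nii.gz -o test_ismrmrd_output.h5
    python enhanceddicom2mrd.py /path/to/dicom_folder -o dicoms.h5 -g dataset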