|
| 1 | +"""Low-level MIF format parsing helpers.""" |
| 2 | + |
| 3 | +import sys |
| 4 | + |
| 5 | +import numpy as np |
| 6 | + |
| 7 | + |
| 8 | +def _readline(fileobj) -> bytes: |
| 9 | + """Read one newline-terminated line from *fileobj* using only ``read(1)``. |
| 10 | +
|
| 11 | + This works with any object that implements ``read(n)``, including |
| 12 | + nibabel's ``ImageOpener`` and gzip file objects that lack ``readline``. |
| 13 | + """ |
| 14 | + buf = bytearray() |
| 15 | + while True: |
| 16 | + ch = fileobj.read(1) |
| 17 | + if not ch: |
| 18 | + break |
| 19 | + buf.extend(ch if isinstance(ch, (bytes, bytearray)) else ch.encode('latin-1')) |
| 20 | + if buf[-1:] == b'\n': |
| 21 | + break |
| 22 | + return bytes(buf) |
| 23 | + |
| 24 | + |
| 25 | +_MIF_DTYPE_MAP: dict[str, str] = { |
| 26 | + 'Int8': 'i1', |
| 27 | + 'UInt8': 'u1', |
| 28 | + 'Int16': 'i2', |
| 29 | + 'UInt16': 'u2', |
| 30 | + 'Int32': 'i4', |
| 31 | + 'UInt32': 'u4', |
| 32 | + 'Int64': 'i8', |
| 33 | + 'UInt64': 'u8', |
| 34 | + 'Float32': 'f4', |
| 35 | + 'Float64': 'f8', |
| 36 | + 'CFloat32': 'c8', |
| 37 | + 'CFloat64': 'c16', |
| 38 | +} |
| 39 | + |
| 40 | +_NUMPY_TO_MIF_BASE: dict[tuple[str, int], str] = { |
| 41 | + ('i', 1): 'Int8', |
| 42 | + ('u', 1): 'UInt8', |
| 43 | + ('i', 2): 'Int16', |
| 44 | + ('u', 2): 'UInt16', |
| 45 | + ('i', 4): 'Int32', |
| 46 | + ('u', 4): 'UInt32', |
| 47 | + ('i', 8): 'Int64', |
| 48 | + ('u', 8): 'UInt64', |
| 49 | + ('f', 4): 'Float32', |
| 50 | + ('f', 8): 'Float64', |
| 51 | + ('c', 8): 'CFloat32', |
| 52 | + ('c', 16): 'CFloat64', |
| 53 | +} |
| 54 | + |
| 55 | + |
| 56 | +def _mif_parse_dtype(dtype_str: str) -> np.dtype: |
| 57 | + """Convert a MIF datatype string (e.g. ``'Float32LE'``) to a numpy dtype.""" |
| 58 | + dtype_str = dtype_str.strip() |
| 59 | + if dtype_str.endswith('LE'): |
| 60 | + endian, base = '<', dtype_str[:-2] |
| 61 | + elif dtype_str.endswith('BE'): |
| 62 | + endian, base = '>', dtype_str[:-2] |
| 63 | + else: |
| 64 | + endian = '<' if sys.byteorder == 'little' else '>' |
| 65 | + base = dtype_str |
| 66 | + |
| 67 | + if base not in _MIF_DTYPE_MAP: |
| 68 | + raise ValueError(f'Unknown MIF datatype: {dtype_str!r}') |
| 69 | + |
| 70 | + type_char = _MIF_DTYPE_MAP[base] |
| 71 | + if type_char in ('i1', 'u1'): # single-byte types have no endianness |
| 72 | + return np.dtype(type_char) |
| 73 | + return np.dtype(endian + type_char) |
| 74 | + |
| 75 | + |
| 76 | +def _mif_dtype_to_str(dtype: np.dtype) -> str: |
| 77 | + """Convert a numpy dtype to a MIF datatype string.""" |
| 78 | + dtype = np.dtype(dtype) |
| 79 | + base_name = _NUMPY_TO_MIF_BASE.get((dtype.kind, dtype.itemsize)) |
| 80 | + if base_name is None: |
| 81 | + raise ValueError(f'Cannot represent numpy dtype {dtype!r} in MIF format') |
| 82 | + if dtype.itemsize == 1: |
| 83 | + return base_name |
| 84 | + |
| 85 | + byte_order = dtype.byteorder |
| 86 | + if byte_order == '=': |
| 87 | + byte_order = '<' if sys.byteorder == 'little' else '>' |
| 88 | + elif byte_order == '|': |
| 89 | + return base_name |
| 90 | + return base_name + ('LE' if byte_order == '<' else 'BE') |
| 91 | + |
| 92 | + |
| 93 | +def _mif_parse_layout(layout_str: str, ndim: int) -> list[int]: |
| 94 | + """Parse a MIF layout string to a list of symbolic strides (1-indexed, signed). |
| 95 | +
|
| 96 | + For example ``'-0,-1,+2'`` becomes ``[-1, -2, 3]``. The absolute value |
| 97 | + encodes ordering (1 = fastest-varying axis) and the sign encodes direction. |
| 98 | + """ |
| 99 | + strides = [] |
| 100 | + for token in layout_str.strip().split(','): |
| 101 | + token = token.strip() |
| 102 | + if token.startswith('+'): |
| 103 | + sign, val = 1, int(token[1:]) |
| 104 | + elif token.startswith('-'): |
| 105 | + sign, val = -1, int(token[1:]) |
| 106 | + else: |
| 107 | + sign, val = 1, int(token) |
| 108 | + strides.append(sign * (val + 1)) # convert 0-indexed to 1-indexed |
| 109 | + if len(strides) != ndim: |
| 110 | + raise ValueError(f'Layout has {len(strides)} axes but dim has {ndim}: {layout_str!r}') |
| 111 | + return strides |
| 112 | + |
| 113 | + |
| 114 | +def _mif_layout_to_str(layout: list[int]) -> str: |
| 115 | + """Convert symbolic strides list to a MIF layout string.""" |
| 116 | + tokens = [] |
| 117 | + for s in layout: |
| 118 | + sign = '+' if s >= 0 else '-' |
| 119 | + val = abs(s) - 1 # convert 1-indexed back to 0-indexed |
| 120 | + tokens.append(f'{sign}{val}') |
| 121 | + return ','.join(tokens) |
| 122 | + |
| 123 | + |
| 124 | +def _mif_apply_layout(raw_flat: np.ndarray, shape: tuple, layout: list[int]) -> np.ndarray: |
| 125 | + """Reorder flat MIF disk data into a canonical (positive-stride) numpy array. |
| 126 | +
|
| 127 | + MIF stores data with the axis whose ``|layout[i]|`` equals 1 varying |
| 128 | + fastest on disk, and axes with a negative ``layout[i]`` stored reversed. |
| 129 | + This function returns the array in the logical (mrtrix-canonical) order: |
| 130 | + output index ``(i, j, k, ...)`` corresponds to mrtrix image coordinate |
| 131 | + ``(i, j, k, ...)``, matching what ``mrconvert <in> <out> -strides 1,2,3,...`` |
| 132 | + would produce and what MRtrix tools (``fixelcfestats``, ``mrstats``, ...) |
| 133 | + see when they access the image via the ``Image`` API. |
| 134 | + """ |
| 135 | + ndim = len(shape) |
| 136 | + # Sort axes from fastest (|layout|=1) to slowest |
| 137 | + order = sorted(range(ndim), key=lambda i: abs(layout[i])) |
| 138 | + # Disk layout in C-order: [slowest, ..., fastest] |
| 139 | + disk_axes = list(reversed(order)) |
| 140 | + disk_shape = tuple(shape[i] for i in disk_axes) |
| 141 | + |
| 142 | + data = raw_flat.reshape(disk_shape) |
| 143 | + |
| 144 | + # Transpose: output axis i came from disk position inv_perm[i] |
| 145 | + inv_perm = [0] * ndim |
| 146 | + for disk_pos, orig_axis in enumerate(disk_axes): |
| 147 | + inv_perm[orig_axis] = disk_pos |
| 148 | + data = data.transpose(inv_perm) |
| 149 | + |
| 150 | + # Flip any axis with a negative symbolic stride so the result is in |
| 151 | + # mrtrix-canonical positive-stride order. |
| 152 | + slicer = tuple(slice(None, None, -1) if layout[i] < 0 else slice(None) for i in range(ndim)) |
| 153 | + data = data[slicer] |
| 154 | + |
| 155 | + return np.ascontiguousarray(data) |
| 156 | + |
| 157 | + |
| 158 | +def _mif_apply_layout_for_write(data: np.ndarray, layout: list[int]) -> np.ndarray: |
| 159 | + """Reorder a canonical numpy array into MIF disk layout for writing. |
| 160 | +
|
| 161 | + Inverse of :func:`_mif_apply_layout`: first flip each axis whose |
| 162 | + symbolic stride is negative, then transpose to the disk axis order |
| 163 | + (``[slowest, ..., fastest]`` in C-order). |
| 164 | + """ |
| 165 | + ndim = len(data.shape) |
| 166 | + |
| 167 | + # Flip axes that will be stored reversed on disk |
| 168 | + slicer = tuple(slice(None, None, -1) if layout[i] < 0 else slice(None) for i in range(ndim)) |
| 169 | + data = data[slicer] |
| 170 | + |
| 171 | + # Transpose to disk order: [slowest, ..., fastest] in C-order |
| 172 | + order = sorted(range(ndim), key=lambda i: abs(layout[i])) |
| 173 | + disk_axes = list(reversed(order)) |
| 174 | + data = data.transpose(disk_axes) |
| 175 | + return np.ascontiguousarray(data) |
0 commit comments