diff --git a/.github/workflows/tox.yml b/.github/workflows/tox.yml
index 0ba105f..4db05c1 100644
--- a/.github/workflows/tox.yml
+++ b/.github/workflows/tox.yml
@@ -44,6 +44,12 @@ jobs:
with:
fetch-depth: 0
+ - name: Install mrtrix3
+ run: |
+ sudo apt-get update
+ sudo apt-get install -y --no-install-recommends mrtrix3
+ mrconvert -version | head -1
+
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v8.0.0
diff --git a/docs/examples/plot_fixel_workflow.py b/docs/examples/plot_fixel_workflow.py
index f06a6de..dcef1f7 100644
--- a/docs/examples/plot_fixel_workflow.py
+++ b/docs/examples/plot_fixel_workflow.py
@@ -6,7 +6,7 @@
use the ``modelarrayio to-modelarray`` command to convert the MIF files to the HDF5 format
(``.h5``) used by **ModelArray**,
and ``modelarrayio export-results`` to export results back to MIF.
-This guide assumes **ModelArrayIO** and **MRtrix** are already installed.
+This guide assumes **ModelArrayIO** is already installed.
"""
# %%
@@ -132,8 +132,8 @@
#
# .. warning::
#
-# **Existing files are not overwritten.** ``modelarrayio export-results`` calls ``mrconvert`` without
-# ``-force``, so any ``.mif`` file already present in ``--output-dir`` with the same name
+# **Existing files are not overwritten.** With ``modelarrayio export-results``, any ``.mif`` file
+# already present in ``--output-dir`` with the same name
# will be left unchanged. If ``--output-dir`` itself already exists you will see a
# ``WARNING: Output directory exists`` message, but no files will be deleted. To start
# fresh, manually remove the output directory before re-running ``modelarrayio export-results``.
diff --git a/docs/installation.rst b/docs/installation.rst
index 1d01c75..335a977 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -14,16 +14,6 @@ If you want to use the most up-to-date version, you can install from the ``main`
pip install git+https://github.com/PennLINC/ModelArrayIO.git
-MRtrix (required for fixel ``.mif`` only)
------------------------------------------
-
-Fixel-wise ``.mif`` conversion tools use MRtrix ``mrconvert``.
-Install MRtrix from `MRtrix's webpage `_ if needed.
-Run ``mrview`` in the terminal to verify the installation.
-
-If your data are in NIfTI or CIFTI format only, you can skip this step.
-
-
What Next?
----------
diff --git a/pyproject.toml b/pyproject.toml
index 6771069..6ccb90c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -182,6 +182,7 @@ exclude_lines = [
[tool.pytest.ini_options]
markers = [
"s3: tests that require network access to public S3 (deselect with '-m not s3')",
+ "downloaded_data: tests that download external fixture data before running",
]
log_cli = true
diff --git a/src/modelarrayio/cli/h5_to_mif.py b/src/modelarrayio/cli/h5_to_mif.py
index 603a827..8ac189e 100644
--- a/src/modelarrayio/cli/h5_to_mif.py
+++ b/src/modelarrayio/cli/h5_to_mif.py
@@ -6,10 +6,10 @@
from pathlib import Path
import h5py
-import nibabel as nb
+import numpy as np
from modelarrayio.cli import utils as cli_utils
-from modelarrayio.utils.mif import mif_to_nifti2, nifti2_to_mif
+from modelarrayio.utils.mif import MifImage, mif_to_image
logger = logging.getLogger(__name__)
@@ -24,10 +24,9 @@ def h5_to_mif(example_mif, in_file, analysis_name, compress, output_dir):
named ``results/has_names``. This data can be of any type and does not need to contain
more than a single row of data. Instead, its attributes are read to get column names
for the data represented in ``results/results_matrix``.
- The function takes the example mif file and converts it to Nifti2 to get a header.
- Then each column in ``results/results_matrix`` is extracted to fill the data of a
- new Nifti2 file that gets converted to mif and named according to the corresponding
- item in ``results/has_names``.
+ The function takes the example mif file as a template header. Then each column in
+ ``results/results_matrix`` is extracted to fill the data of a new ``MifImage`` and
+ named according to the corresponding item in ``results/has_names``.
Parameters
----------
@@ -47,9 +46,10 @@ def h5_to_mif(example_mif, in_file, analysis_name, compress, output_dir):
-------
None
"""
- # Get a template nifti image.
- nifti2_img, _ = mif_to_nifti2(example_mif)
+ # Use the example MIF as the template so layout and metadata stay native to MIF.
+ template_img, _ = mif_to_image(example_mif)
output_path = Path(output_dir)
+ output_path.mkdir(parents=True, exist_ok=True)
ext = '.mif.gz' if compress else '.mif'
with h5py.File(in_file, 'r') as h5_data:
results_matrix = h5_data[f'results/{analysis_name}/results_matrix']
@@ -60,22 +60,50 @@ def h5_to_mif(example_mif, in_file, analysis_name, compress, output_dir):
for result_col, result_name in enumerate(results_names):
valid_result_name = cli_utils.sanitize_result_name(result_name)
out_mif = output_path / f'{analysis_name}_{valid_result_name}{ext}'
- temp_nifti2 = nb.Nifti2Image(
- results_matrix[result_col, :].reshape(-1, 1, 1),
- nifti2_img.affine,
- header=nifti2_img.header,
+ write_mif(
+ arr=np.asarray(results_matrix[result_col, :], dtype=np.float32),
+ template_img=template_img,
+ out_file=out_mif,
)
- nifti2_to_mif(temp_nifti2, out_mif)
if 'p.value' not in valid_result_name:
continue
valid_result_name_1mpvalue = valid_result_name.replace('p.value', '1m.p.value')
out_mif_1mpvalue = output_path / f'{analysis_name}_{valid_result_name_1mpvalue}{ext}'
- output_mifvalues_1mpvalue = 1 - results_matrix[result_col, :]
- temp_nifti2_1mpvalue = nb.Nifti2Image(
- output_mifvalues_1mpvalue.reshape(-1, 1, 1),
- nifti2_img.affine,
- header=nifti2_img.header,
+ output_mifvalues_1mpvalue = np.asarray(
+ 1 - results_matrix[result_col, :],
+ dtype=np.float32,
)
- nifti2_to_mif(temp_nifti2_1mpvalue, out_mif_1mpvalue)
+ write_mif(
+ arr=output_mifvalues_1mpvalue,
+ template_img=template_img,
+ out_file=out_mif_1mpvalue,
+ )
+
+ return 0
+
+
+def write_mif(arr, template_img, out_file):
+ """Write array to MIF file.
+
+ Parameters
+ ----------
+ arr : :obj:`numpy.ndarray`
+ Array to write to file.
+ template_img : :obj:`MifImage`
+ Template MIF image.
+ out_file : :obj:`pathlib.Path`
+ Output file to write.
+ If it already exists, this function will raise a warning and not overwrite.
+ """
+ if out_file.exists():
+ logger.warning('Output file already exists. Not overwriting. %s', out_file)
+ return
+
+ result_data = arr.reshape(template_img.shape)
+ result_header = template_img.header.copy()
+ result_header.set_data_shape(result_data.shape)
+ result_header.set_data_dtype(result_data.dtype)
+ result_img = MifImage(result_data, template_img.affine, header=result_header)
+ result_img.to_filename(out_file)
diff --git a/src/modelarrayio/utils/_mif_format.py b/src/modelarrayio/utils/_mif_format.py
new file mode 100644
index 0000000..1aa8a64
--- /dev/null
+++ b/src/modelarrayio/utils/_mif_format.py
@@ -0,0 +1,175 @@
+"""Low-level MIF format parsing helpers."""
+
+import sys
+
+import numpy as np
+
+
+def _readline(fileobj) -> bytes:
+ """Read one newline-terminated line from *fileobj* using only ``read(1)``.
+
+ This works with any object that implements ``read(n)``, including
+ nibabel's ``ImageOpener`` and gzip file objects that lack ``readline``.
+ """
+ buf = bytearray()
+ while True:
+ ch = fileobj.read(1)
+ if not ch:
+ break
+ buf.extend(ch if isinstance(ch, (bytes, bytearray)) else ch.encode('latin-1'))
+ if buf[-1:] == b'\n':
+ break
+ return bytes(buf)
+
+
+_MIF_DTYPE_MAP: dict[str, str] = {
+ 'Int8': 'i1',
+ 'UInt8': 'u1',
+ 'Int16': 'i2',
+ 'UInt16': 'u2',
+ 'Int32': 'i4',
+ 'UInt32': 'u4',
+ 'Int64': 'i8',
+ 'UInt64': 'u8',
+ 'Float32': 'f4',
+ 'Float64': 'f8',
+ 'CFloat32': 'c8',
+ 'CFloat64': 'c16',
+}
+
+_NUMPY_TO_MIF_BASE: dict[tuple[str, int], str] = {
+ ('i', 1): 'Int8',
+ ('u', 1): 'UInt8',
+ ('i', 2): 'Int16',
+ ('u', 2): 'UInt16',
+ ('i', 4): 'Int32',
+ ('u', 4): 'UInt32',
+ ('i', 8): 'Int64',
+ ('u', 8): 'UInt64',
+ ('f', 4): 'Float32',
+ ('f', 8): 'Float64',
+ ('c', 8): 'CFloat32',
+ ('c', 16): 'CFloat64',
+}
+
+
+def _mif_parse_dtype(dtype_str: str) -> np.dtype:
+ """Convert a MIF datatype string (e.g. ``'Float32LE'``) to a numpy dtype."""
+ dtype_str = dtype_str.strip()
+ if dtype_str.endswith('LE'):
+ endian, base = '<', dtype_str[:-2]
+ elif dtype_str.endswith('BE'):
+ endian, base = '>', dtype_str[:-2]
+ else:
+ endian = '<' if sys.byteorder == 'little' else '>'
+ base = dtype_str
+
+ if base not in _MIF_DTYPE_MAP:
+ raise ValueError(f'Unknown MIF datatype: {dtype_str!r}')
+
+ type_char = _MIF_DTYPE_MAP[base]
+ if type_char in ('i1', 'u1'): # single-byte types have no endianness
+ return np.dtype(type_char)
+ return np.dtype(endian + type_char)
+
+
+def _mif_dtype_to_str(dtype: np.dtype) -> str:
+ """Convert a numpy dtype to a MIF datatype string."""
+ dtype = np.dtype(dtype)
+ base_name = _NUMPY_TO_MIF_BASE.get((dtype.kind, dtype.itemsize))
+ if base_name is None:
+ raise ValueError(f'Cannot represent numpy dtype {dtype!r} in MIF format')
+ if dtype.itemsize == 1:
+ return base_name
+
+ byte_order = dtype.byteorder
+ if byte_order == '=':
+ byte_order = '<' if sys.byteorder == 'little' else '>'
+ elif byte_order == '|':
+ return base_name
+ return base_name + ('LE' if byte_order == '<' else 'BE')
+
+
+def _mif_parse_layout(layout_str: str, ndim: int) -> list[int]:
+ """Parse a MIF layout string to a list of symbolic strides (1-indexed, signed).
+
+ For example ``'-0,-1,+2'`` becomes ``[-1, -2, 3]``. The absolute value
+ encodes ordering (1 = fastest-varying axis) and the sign encodes direction.
+ """
+ strides = []
+ for token in layout_str.strip().split(','):
+ token = token.strip()
+ if token.startswith('+'):
+ sign, val = 1, int(token[1:])
+ elif token.startswith('-'):
+ sign, val = -1, int(token[1:])
+ else:
+ sign, val = 1, int(token)
+ strides.append(sign * (val + 1)) # convert 0-indexed to 1-indexed
+ if len(strides) != ndim:
+ raise ValueError(f'Layout has {len(strides)} axes but dim has {ndim}: {layout_str!r}')
+ return strides
+
+
+def _mif_layout_to_str(layout: list[int]) -> str:
+ """Convert symbolic strides list to a MIF layout string."""
+ tokens = []
+ for s in layout:
+ sign = '+' if s >= 0 else '-'
+ val = abs(s) - 1 # convert 1-indexed back to 0-indexed
+ tokens.append(f'{sign}{val}')
+ return ','.join(tokens)
+
+
+def _mif_apply_layout(raw_flat: np.ndarray, shape: tuple, layout: list[int]) -> np.ndarray:
+ """Reorder flat MIF disk data into a canonical (positive-stride) numpy array.
+
+ MIF stores data with the axis whose ``|layout[i]|`` equals 1 varying
+ fastest on disk, and axes with a negative ``layout[i]`` stored reversed.
+ This function returns the array in the logical (mrtrix-canonical) order:
+ output index ``(i, j, k, ...)`` corresponds to mrtrix image coordinate
+ ``(i, j, k, ...)``, matching what ``mrconvert -strides 1,2,3,...``
+ would produce and what MRtrix tools (``fixelcfestats``, ``mrstats``, ...)
+ see when they access the image via the ``Image`` API.
+ """
+ ndim = len(shape)
+ # Sort axes from fastest (|layout|=1) to slowest
+ order = sorted(range(ndim), key=lambda i: abs(layout[i]))
+ # Disk layout in C-order: [slowest, ..., fastest]
+ disk_axes = list(reversed(order))
+ disk_shape = tuple(shape[i] for i in disk_axes)
+
+ data = raw_flat.reshape(disk_shape)
+
+ # Transpose: output axis i came from disk position inv_perm[i]
+ inv_perm = [0] * ndim
+ for disk_pos, orig_axis in enumerate(disk_axes):
+ inv_perm[orig_axis] = disk_pos
+ data = data.transpose(inv_perm)
+
+ # Flip any axis with a negative symbolic stride so the result is in
+ # mrtrix-canonical positive-stride order.
+ slicer = tuple(slice(None, None, -1) if layout[i] < 0 else slice(None) for i in range(ndim))
+ data = data[slicer]
+
+ return np.ascontiguousarray(data)
+
+
+def _mif_apply_layout_for_write(data: np.ndarray, layout: list[int]) -> np.ndarray:
+ """Reorder a canonical numpy array into MIF disk layout for writing.
+
+ Inverse of :func:`_mif_apply_layout`: first flip each axis whose
+ symbolic stride is negative, then transpose to the disk axis order
+ (``[slowest, ..., fastest]`` in C-order).
+ """
+ ndim = len(data.shape)
+
+ # Flip axes that will be stored reversed on disk
+ slicer = tuple(slice(None, None, -1) if layout[i] < 0 else slice(None) for i in range(ndim))
+ data = data[slicer]
+
+ # Transpose to disk order: [slowest, ..., fastest] in C-order
+ order = sorted(range(ndim), key=lambda i: abs(layout[i]))
+ disk_axes = list(reversed(order))
+ data = data.transpose(disk_axes)
+ return np.ascontiguousarray(data)
diff --git a/src/modelarrayio/utils/mif.py b/src/modelarrayio/utils/mif.py
index aa9d913..5a93fc0 100644
--- a/src/modelarrayio/utils/mif.py
+++ b/src/modelarrayio/utils/mif.py
@@ -1,73 +1,20 @@
"""Utility functions for MIF data."""
-import shutil
-import subprocess
-import tempfile
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
-import nibabel as nb
import numpy as np
import pandas as pd
from tqdm import tqdm
+from modelarrayio.utils.mif_image import MifHeader, MifImage
-def find_mrconvert():
- """Find the mrconvert executable on the system.
+__all__ = ['MifHeader', 'MifImage', 'gather_fixels', 'load_cohort_mif', 'mif_to_image']
- Returns
- -------
- :obj:`str`
- Path to the mrconvert executable.
- """
- return shutil.which('mrconvert')
-
-
-def _require_mrconvert() -> str:
- mrconvert = find_mrconvert()
- if mrconvert is None:
- raise FileNotFoundError('The mrconvert executable could not be found on $PATH.')
- return mrconvert
-
-
-def _run_mrconvert(source_file: Path, output_file: Path) -> None:
- try:
- subprocess.run(
- [_require_mrconvert(), str(source_file), str(output_file)],
- check=True,
- capture_output=True,
- text=True,
- )
- except subprocess.CalledProcessError as exc:
- message = exc.stderr.strip() or exc.stdout.strip() or 'mrconvert failed.'
- raise RuntimeError(
- f'mrconvert failed while converting {source_file} to {output_file}: {message}'
- ) from exc
-
-
-def nifti2_to_mif(nifti2_image, mif_file):
- """Convert a .nii file to a .mif file.
-
- Parameters
- ----------
- nifti2_image : :obj:`nibabel.Nifti2Image`
- Nifti2 image
- mif_file : :obj:`str`
- Path to a .mif file
- """
- output_path = Path(mif_file)
- with tempfile.TemporaryDirectory() as temp_dir:
- temp_nii = Path(temp_dir) / 'mrconvert_input.nii'
- nifti2_image.to_filename(temp_nii)
- _run_mrconvert(temp_nii, output_path)
- if not output_path.exists():
- raise RuntimeError(f'mrconvert did not create expected output file: {output_path}')
-
-
-def mif_to_nifti2(mif_file):
- """Convert a .mif file to a .nii file.
+def mif_to_image(mif_file):
+ """Load a `.mif` file into a :class:`MifImage`.
Parameters
----------
@@ -76,29 +23,15 @@ def mif_to_nifti2(mif_file):
Returns
-------
- nifti2_img : :obj:`nibabel.Nifti2Image`
- Nifti2 image
+ mif_img : :obj:`MifImage`
+ Loaded MIF image
data : :obj:`numpy.ndarray`
- Data from the nifti2 image
+ Data from the MIF image
"""
input_path = Path(mif_file)
- if input_path.suffix == '.nii':
- nifti2_img = nb.load(input_path)
- data = nifti2_img.get_fdata(dtype=np.float32).squeeze()
- return nifti2_img, data
-
- with tempfile.TemporaryDirectory() as temp_dir:
- nii_path = Path(temp_dir) / 'mif.nii'
- _run_mrconvert(input_path, nii_path)
- if not nii_path.exists():
- raise RuntimeError(f'mrconvert did not create expected output file: {nii_path}')
-
- loaded_img = nb.load(nii_path)
- in_memory_data = np.asanyarray(loaded_img.dataobj)
- nifti2_img = nb.Nifti2Image(in_memory_data, loaded_img.affine, header=loaded_img.header)
- data = loaded_img.get_fdata(dtype=np.float32).squeeze()
-
- return nifti2_img, data
+ mif_img = MifImage.from_filename(str(input_path))
+ data = mif_img.get_fdata(dtype=np.float32).squeeze()
+ return mif_img, data
def load_cohort_mif(cohort_long, s3_workers):
@@ -138,7 +71,7 @@ def load_cohort_mif(cohort_long, s3_workers):
def _worker(job):
sn, subj_idx, src = job
- _img, data = mif_to_nifti2(src)
+ _img, data = mif_to_image(src)
return sn, subj_idx, data
if s3_workers > 1:
@@ -181,7 +114,7 @@ def gather_fixels(index_file, directions_file):
voxel_table : :obj:`pandas.DataFrame`
DataFrame with voxel_id, i, j, k
"""
- _index_img, index_data = mif_to_nifti2(index_file)
+ _index_img, index_data = mif_to_image(index_file)
# number of fixels in each voxel; by index.mif definition
count_vol = index_data[..., 0].astype(np.uint32)
# index of the first fixel in this voxel, in the list of all fixels
@@ -189,7 +122,10 @@ def gather_fixels(index_file, directions_file):
id_vol = index_data[..., 1]
max_id = id_vol.max()
# = the maximum id of fixels + 1 = # of fixels in entire image
- max_fixel_id = max_id + int(count_vol[id_vol == max_id])
+ terminal_counts = count_vol[id_vol == max_id]
+ if terminal_counts.size == 0:
+ raise ValueError('Could not determine the final fixel count from the index image.')
+ max_fixel_id = int(max_id) + int(terminal_counts.max())
voxel_mask = count_vol > 0 # voxels that contains fixel(s), =1
masked_ids = id_vol[voxel_mask] # 1D array, len = # of voxels with fixel(s), value see id_vol
masked_counts = count_vol[voxel_mask] # dim as masked_ids; value see count_vol
@@ -221,7 +157,7 @@ def gather_fixels(index_file, directions_file):
}
)
- _directions_img, directions_data = mif_to_nifti2(directions_file)
+ _directions_img, directions_data = mif_to_image(directions_file)
fixel_table = pd.DataFrame(
{
'fixel_id': fixel_ids,
diff --git a/src/modelarrayio/utils/mif_image.py b/src/modelarrayio/utils/mif_image.py
new file mode 100644
index 0000000..da6f822
--- /dev/null
+++ b/src/modelarrayio/utils/mif_image.py
@@ -0,0 +1,400 @@
+"""Nibabel-compatible image classes for MIF (.mif / .mif.gz) files."""
+
+from copy import deepcopy
+from typing import Self
+
+import numpy as np
+from nibabel.filebasedimages import FileBasedHeader
+from nibabel.spatialimages import SpatialImage
+
+from modelarrayio.utils._mif_format import (
+ _mif_apply_layout,
+ _mif_apply_layout_for_write,
+ _mif_dtype_to_str,
+ _mif_layout_to_str,
+ _mif_parse_dtype,
+ _mif_parse_layout,
+ _readline,
+)
+
+
+class MifHeader(FileBasedHeader):
+ """Header for MIF (.mif / .mif.gz) image files.
+
+ The MIF format uses a text header with ``key: value`` pairs followed by
+ ``END``, then binary image data at the byte offset given by the
+ ``file: . `` entry.
+
+ The transform stored in the file contains *unit direction cosines* for
+ each voxel axis; voxel sizes are stored separately in the ``vox`` field.
+ The nibabel 4x4 affine is reconstructed as::
+
+ affine[:3, :3] = transform[:3, :3] * zooms # column-wise scaling
+ affine[:3, 3] = transform[:3, 3] # translation unchanged
+ """
+
+ def __init__(
+ self,
+ shape: tuple = (1,),
+ zooms: tuple | None = None,
+ layout: list[int] | None = None,
+ dtype: np.dtype | None = None,
+ transform: np.ndarray | None = None,
+ intensity_offset: float = 0.0,
+ intensity_scale: float = 1.0,
+ keyval: dict | None = None,
+ ) -> None:
+ self._shape = tuple(int(s) for s in shape)
+ ndim = len(self._shape)
+ self._zooms = tuple(float(z) for z in zooms) if zooms is not None else (1.0,) * ndim
+ self._layout = list(layout) if layout is not None else list(range(1, ndim + 1))
+ self._dtype = np.dtype(dtype) if dtype is not None else np.dtype('f4')
+ if transform is not None:
+ self._transform = np.array(transform, dtype=np.float64).reshape(3, 4)
+ else:
+ self._transform = np.eye(3, 4, dtype=np.float64)
+ self._intensity_offset = float(intensity_offset)
+ self._intensity_scale = float(intensity_scale)
+ self._keyval: dict[str, str] = dict(keyval) if keyval is not None else {}
+ self._data_offset: int | None = None # populated by from_fileobj
+
+ @classmethod
+ def from_header(cls, header=None):
+ if header is None:
+ return cls()
+ if type(header) is cls:
+ return header.copy()
+ raise NotImplementedError(f'Cannot convert {type(header)} to {cls}')
+
+ @classmethod
+ def from_fileobj(cls, fileobj) -> Self:
+ """Read a MIF header from a binary file-like object.
+
+ Uses only ``read(1)`` internally so it works with nibabel's
+ ``ImageOpener`` and gzip streams as well as regular file objects.
+ """
+ first_line = _readline(fileobj).decode('latin-1').rstrip('\n\r')
+ if first_line != 'mrtrix image':
+ raise ValueError(f'Not a MIF file (expected "mrtrix image", got {first_line!r})')
+
+ shape = None
+ zooms = None
+ layout_str = None
+ dtype_str = None
+ transform_rows: list[list[float]] = []
+ scaling = None
+ keyval: dict[str, str] = {}
+ file_entry = None
+
+ while True:
+ line = _readline(fileobj).decode('latin-1')
+ line = line.rstrip('\n\r')
+ if line == 'END' or not line:
+ break
+ comment_pos = line.find('#')
+ if comment_pos >= 0:
+ line = line[:comment_pos]
+ line = line.strip()
+ if not line or ':' not in line:
+ continue
+
+ colon = line.index(':')
+ key = line[:colon].strip()
+ value = line[colon + 1 :].strip()
+ if not key or not value:
+ continue
+
+ lkey = key.lower()
+ if lkey == 'dim':
+ shape = tuple(int(x.strip()) for x in value.split(','))
+ elif lkey == 'vox':
+ zooms = tuple(float(x.strip()) for x in value.split(','))
+ elif lkey == 'layout':
+ layout_str = value
+ elif lkey == 'datatype':
+ dtype_str = value
+ elif lkey == 'transform':
+ transform_rows.append([float(x.strip()) for x in value.split(',')])
+ elif lkey == 'scaling':
+ scaling = [float(x.strip()) for x in value.split(',')]
+ elif lkey == 'file':
+ file_entry = value
+ else:
+ # Preserve case and accumulate multi-line values
+ keyval[key] = (keyval[key] + '\n' + value) if key in keyval else value
+
+ if shape is None:
+ raise ValueError('Missing "dim" in MIF header')
+ if zooms is None:
+ raise ValueError('Missing "vox" in MIF header')
+ if dtype_str is None:
+ raise ValueError('Missing "datatype" in MIF header')
+ if layout_str is None:
+ raise ValueError('Missing "layout" in MIF header')
+
+ dtype = _mif_parse_dtype(dtype_str)
+ layout = _mif_parse_layout(layout_str, len(shape))
+
+ transform = np.eye(3, 4, dtype=np.float64)
+ if len(transform_rows) >= 3:
+ for r in range(3):
+ for c in range(min(4, len(transform_rows[r]))):
+ transform[r, c] = transform_rows[r][c]
+
+ intensity_offset, intensity_scale = 0.0, 1.0
+ if scaling is not None and len(scaling) == 2:
+ intensity_offset, intensity_scale = scaling[0], scaling[1]
+
+ hdr = cls(
+ shape=shape,
+ zooms=zooms,
+ layout=layout,
+ dtype=dtype,
+ transform=transform,
+ intensity_offset=intensity_offset,
+ intensity_scale=intensity_scale,
+ keyval=keyval,
+ )
+
+ if file_entry is not None:
+ parts = file_entry.split()
+ if len(parts) >= 2:
+ hdr._data_offset = int(parts[1])
+ elif len(parts) == 1 and parts[0] != '.':
+ hdr._data_offset = 0 # external data file (MIH format)
+
+ return hdr
+
+ def write_to(self, fileobj, data_offset: int | None = None) -> int:
+ """Write the MIF header to *fileobj*, returning the data byte offset.
+
+ The ``file: . \\nEND\\n`` footer and any alignment padding are
+ written so that the caller can immediately append the binary data.
+ """
+ lines = ['mrtrix image']
+ lines.append(f'dim: {",".join(str(s) for s in self._shape)}')
+ lines.append(f'vox: {",".join(str(float(z)) for z in self._zooms)}')
+ lines.append(f'layout: {_mif_layout_to_str(self._layout)}')
+ lines.append(f'datatype: {_mif_dtype_to_str(self._dtype)}')
+
+ for row in range(3):
+ row_vals = ', '.join(repr(float(self._transform[row, col])) for col in range(4))
+ lines.append(f'transform: {row_vals}')
+
+ if self._intensity_offset != 0.0 or self._intensity_scale != 1.0:
+ lines.append(f'scaling: {self._intensity_offset},{self._intensity_scale}')
+
+ for key, value in self._keyval.items():
+ lines.extend(f'{key}: {line_val}' for line_val in value.split('\n'))
+
+ pre_file_bytes = ('\n'.join(lines) + '\n').encode('latin-1')
+ pre_file_pos = len(pre_file_bytes)
+
+ if data_offset is None:
+ # Iteratively compute the offset so that the file: line fits exactly.
+ file_prefix = b'file: . '
+ end_suffix = b'\nEND\n'
+ offset = pre_file_pos + len(file_prefix) + 5 + len(end_suffix)
+ offset += (4 - offset % 4) % 4
+ for _ in range(5):
+ file_line = file_prefix + str(offset).encode() + end_suffix
+ total = pre_file_pos + len(file_line)
+ new_offset = total + (4 - total % 4) % 4
+ if new_offset == offset:
+ break
+ offset = new_offset
+ data_offset = offset
+
+ file_line = f'file: . {data_offset}\nEND\n'.encode('latin-1')
+ fileobj.write(pre_file_bytes)
+ fileobj.write(file_line)
+
+ current_pos = pre_file_pos + len(file_line)
+ padding = data_offset - current_pos
+ if padding > 0:
+ fileobj.write(b'\x00' * padding)
+
+ return data_offset
+
+ def copy(self) -> Self:
+ return deepcopy(self)
+
+ # ------------------------------------------------------------------
+ # Nibabel SpatialHeader protocol
+ # ------------------------------------------------------------------
+
+ def get_data_shape(self) -> tuple:
+ return self._shape
+
+ def set_data_shape(self, shape) -> None:
+ self._shape = tuple(int(s) for s in shape)
+
+ def get_zooms(self) -> tuple:
+ return self._zooms
+
+ def set_zooms(self, zooms) -> None:
+ self._zooms = tuple(float(z) for z in zooms)
+
+ def get_data_dtype(self) -> np.dtype:
+ return self._dtype
+
+ def set_data_dtype(self, dtype) -> None:
+ self._dtype = np.dtype(dtype)
+
+ def get_layout(self) -> list[int]:
+ return list(self._layout)
+
+ def get_transform(self) -> np.ndarray:
+ """Return a copy of the 3x4 direction-cosine + translation matrix."""
+ return self._transform.copy()
+
+ def get_best_affine(self) -> np.ndarray:
+ """Return the 4x4 affine mapping canonical voxel indices to scanner space (mm).
+
+ The image data returned by :meth:`MifImage.get_fdata` is always in
+ mrtrix-canonical (positive-stride) order, so the affine is simply
+ built from the MIF ``transform`` and ``vox`` fields:
+
+ affine[:3, :3] = transform[:, :3] * vox # column-wise scale
+ affine[:3, 3] = transform[:, 3]
+ """
+ affine = np.eye(4, dtype=np.float64)
+ n_spatial = min(3, len(self._zooms), len(self._shape))
+ zooms = np.ones(3, dtype=np.float64)
+ zooms[:n_spatial] = self._zooms[:n_spatial]
+
+ affine[:3, :3] = self._transform[:, :3] * zooms
+ affine[:3, 3] = self._transform[:, 3]
+ return affine
+
+ def get_intensity_scaling(self) -> tuple[float, float]:
+ """Return ``(offset, scale)`` for intensity rescaling."""
+ return self._intensity_offset, self._intensity_scale
+
+ def get_keyval(self) -> dict[str, str]:
+ return dict(self._keyval)
+
+ __hash__ = None # required because __eq__ is defined
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, MifHeader):
+ return NotImplemented
+ return (
+ self._shape == other._shape
+ and self._zooms == other._zooms
+ and self._layout == other._layout
+ and self._dtype == other._dtype
+ and np.allclose(self._transform, other._transform)
+ and self._intensity_offset == other._intensity_offset
+ and self._intensity_scale == other._intensity_scale
+ and self._keyval == other._keyval
+ )
+
+
+class MifImage(SpatialImage):
+ """Nibabel-style image class for MIF (.mif / .mif.gz) files.
+
+ Supports reading and writing the MRtrix Image Format, including gzip
+ compression. The public API mirrors standard nibabel images::
+
+ img = MifImage.load('image.mif')
+ data = img.get_fdata()
+ affine = img.affine
+
+ new_img = MifImage(data, affine)
+ new_img.to_filename('output.mif')
+ new_img.to_filename('output.mif.gz')
+
+ The MIF *layout* field (e.g. ``-0,-1,+2``) describes which axis varies
+ fastest on disk and in which direction. :meth:`get_fdata` always returns
+ a C-contiguous array indexed as ``data[x, y, z, ...]`` regardless of the
+ on-disk layout.
+ """
+
+ header_class = MifHeader
+ files_types = (('image', '.mif'),)
+ valid_exts = ('.mif',)
+ _compressed_suffixes = ('.gz',)
+
+ def __init__(self, dataobj, affine, header=None, extra=None, file_map=None):
+ super().__init__(dataobj, affine, header=header, extra=extra, file_map=file_map)
+ # Ensure layout has the right number of axes for freshly created images
+ if header is None and hasattr(dataobj, 'shape') and len(dataobj.shape) > 0:
+ ndim = len(dataobj.shape)
+ if len(self._header._layout) != ndim:
+ self._header._layout = list(range(1, ndim + 1))
+ self._header.set_data_dtype(np.asarray(dataobj).dtype)
+
+ @classmethod
+ def from_file_map(cls, file_map, *, mmap=False, keep_file_open=None):
+ """Load a MIF image from a nibabel *file_map* dict."""
+ img_fh = file_map['image']
+ with img_fh.get_prepare_fileobj(mode='rb') as fileobj:
+ header = cls.header_class.from_fileobj(fileobj)
+ data_offset = header._data_offset
+ if data_offset is None:
+ raise ValueError('Could not determine data offset from MIF header')
+
+ current_pos = fileobj.tell()
+ skip = data_offset - current_pos
+ if skip > 0:
+ fileobj.read(skip) # works for both seekable and gzip streams
+ elif skip < 0:
+ fileobj.seek(data_offset)
+
+ shape = header.get_data_shape()
+ dtype = header.get_data_dtype()
+ n_bytes = int(np.prod(shape)) * dtype.itemsize
+ raw = np.frombuffer(fileobj.read(n_bytes), dtype=dtype)
+
+ data = _mif_apply_layout(raw, shape, header.get_layout())
+
+ affine = header.get_best_affine()
+ off, scale = header.get_intensity_scaling()
+ if scale != 1.0 or off != 0.0:
+ data = data.astype(np.float64) * scale + off
+
+ img = cls(data, affine, header=header, file_map=file_map)
+ img._affine = affine # keep the exact affine from the header
+ return img
+
+ def to_file_map(self, file_map=None, dtype=None):
+ """Save the image to the files described by *file_map*."""
+ if file_map is None:
+ file_map = self.file_map
+
+ self.update_header()
+ header = self._header
+
+ if dtype is not None:
+ header.set_data_dtype(np.dtype(dtype))
+
+ data = np.asanyarray(self._dataobj)
+
+ img_fh = file_map['image']
+ with img_fh.get_prepare_fileobj(mode='wb') as fileobj:
+ header.write_to(fileobj)
+ disk_data = _mif_apply_layout_for_write(data, header.get_layout())
+ fileobj.write(disk_data.astype(header.get_data_dtype()).tobytes())
+
+ def _affine2header(self):
+ """Sync the nibabel affine back into the MIF header fields."""
+ if self._affine is None:
+ return
+ hdr = self._header
+ affine = self._affine
+ # Extract voxel sizes as column norms of the rotation+scale part
+ zooms = np.sqrt(np.sum(affine[:3, :3] ** 2, axis=0))
+ ndim = len(hdr.get_data_shape())
+
+ zooms_list = list(hdr.get_zooms())
+ n_spatial = min(3, ndim)
+ zooms_list[:n_spatial] = zooms[:n_spatial].tolist()
+ hdr.set_zooms(zooms_list)
+
+ # Store unit direction cosines and translation
+ transform = np.zeros((3, 4), dtype=np.float64)
+ safe_zooms = np.where(zooms > 0, zooms, 1.0)
+ transform[:, :3] = affine[:3, :3] / safe_zooms
+ transform[:, 3] = affine[:3, 3]
+ hdr._transform = transform
diff --git a/test/conftest.py b/test/conftest.py
index 69c4ff5..7b0c290 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -3,7 +3,12 @@
from __future__ import annotations
import os
+import tarfile
from pathlib import Path
+from urllib.error import URLError
+from urllib.request import urlretrieve
+
+import pytest
# CLI tests run subprocesses with cwd under tmp_path (outside the repo). Coverage
# discovers [tool.coverage] from the process cwd, so those children would default to
@@ -11,3 +16,39 @@
# the parent. Point subprocess coverage at the project config explicitly.
_ROOT = Path(__file__).resolve().parents[1]
os.environ.setdefault('COVERAGE_RCFILE', str(_ROOT / 'pyproject.toml'))
+
+_FIXEL_DATA_URL = 'https://upenn.box.com/shared/static/aaauvwrsrvsj3yvdkl1b899wx1s8ih2f.tar.gz'
+_FIXEL_DATA_URL_ENV = 'MODELARRAYIO_FIXEL_TEST_DATA_URL'
+_FIXEL_DATA_DIR_ENV = 'MODELARRAYIO_FIXEL_TEST_DATA_DIR'
+
+
+def _download_and_extract_fixel_test_data(destination_dir: Path) -> Path:
+ """Download and extract the fixel test dataset archive."""
+ source_dir = os.environ.get(_FIXEL_DATA_DIR_ENV)
+ if source_dir:
+ source_path = Path(source_dir).expanduser().resolve()
+ if not source_path.exists():
+ raise FileNotFoundError(
+ f'{_FIXEL_DATA_DIR_ENV} points to a missing path: {source_path}'
+ )
+ return source_path
+
+ destination_dir.mkdir(parents=True, exist_ok=True)
+ archive_path = destination_dir / 'mif_test_data.tar.gz'
+ data_url = os.environ.get(_FIXEL_DATA_URL_ENV, _FIXEL_DATA_URL)
+ urlretrieve(data_url, archive_path) # noqa: S310
+
+ with tarfile.open(archive_path, mode='r:gz') as archive:
+ archive.extractall(destination_dir) # noqa: S202
+
+ return destination_dir
+
+
+@pytest.fixture(scope='session')
+def downloaded_fixel_data_dir(tmp_path_factory: pytest.TempPathFactory) -> Path:
+ """Provide the downloaded fixel dataset directory for tests."""
+ destination_dir = tmp_path_factory.mktemp('downloaded_fixel_data')
+ try:
+ return _download_and_extract_fixel_test_data(destination_dir)
+ except (FileNotFoundError, OSError, URLError, tarfile.TarError) as exc:
+ raise RuntimeError(f'Downloaded fixel test data unavailable: {exc}') from exc
diff --git a/test/test_mif_strides.py b/test/test_mif_strides.py
new file mode 100644
index 0000000..29dd671
--- /dev/null
+++ b/test/test_mif_strides.py
@@ -0,0 +1,216 @@
+"""Verify that MIF loading produces mrtrix-canonical (positive-stride) data."""
+
+from __future__ import annotations
+
+import shutil
+import subprocess
+from pathlib import Path
+
+import numpy as np
+import pytest
+
+from modelarrayio.utils._mif_format import _mif_apply_layout, _mif_apply_layout_for_write
+from modelarrayio.utils.mif import mif_to_image
+
+# ---------------------------------------------------------------------------
+# Pure-Python synthetic fixtures: independent of downloaded data and mrconvert
+# ---------------------------------------------------------------------------
+
+
+def _write_mif(
+ path: Path,
+ data: np.ndarray,
+ layout: list[int],
+ scaling: tuple[float, float] | None = None,
+) -> None:
+ """Write a minimal MIF file with the given canonical-order *data* and *layout*."""
+ shape = data.shape
+ ndim = len(shape)
+ assert len(layout) == ndim
+
+ # Convert canonical-order data into the on-disk byte layout.
+ disk = _mif_apply_layout_for_write(data, layout)
+
+ layout_str = ','.join(('+' if s > 0 else '-') + str(abs(s) - 1) for s in layout)
+
+ header_lines = [
+ 'mrtrix image',
+ f'dim: {",".join(str(s) for s in shape)}',
+ f'vox: {",".join("1.0" for _ in shape)}',
+ f'layout: {layout_str}',
+ 'datatype: Float32LE',
+ 'transform: 1.0, 0.0, 0.0, 0.0',
+ 'transform: 0.0, 1.0, 0.0, 0.0',
+ 'transform: 0.0, 0.0, 1.0, 0.0',
+ ]
+ if scaling is not None:
+ header_lines.append(f'scaling: {scaling[0]},{scaling[1]}')
+
+ pre = ('\n'.join(header_lines) + '\n').encode('latin-1')
+ # Compute offset the same way MifHeader.write_to does.
+ file_prefix = b'file: . '
+ end_suffix = b'\nEND\n'
+ offset = len(pre) + len(file_prefix) + 5 + len(end_suffix)
+ offset += (4 - offset % 4) % 4
+ for _ in range(5):
+ file_line = file_prefix + str(offset).encode() + end_suffix
+ total = len(pre) + len(file_line)
+ new_offset = total + (4 - total % 4) % 4
+ if new_offset == offset:
+ break
+ offset = new_offset
+ file_line = f'file: . {offset}\nEND\n'.encode('latin-1')
+
+ payload = disk.astype(' None:
+ """Layout ``-0`` means disk bytes are stored reversed; loader must un-reverse."""
+ canonical = np.arange(7, dtype=np.float32)
+ path = tmp_path / 'neg_1d.mif'
+ _write_mif(path, canonical, layout=[-1])
+
+ # Inspect the raw bytes: they should be reversed relative to canonical.
+ raw = path.read_bytes()[-canonical.nbytes :]
+ on_disk = np.frombuffer(raw, dtype=' None:
+ """Layout ``-0,+1,-2`` should produce canonical-order numpy output on read."""
+ canonical = np.arange(2 * 3 * 4, dtype=np.float32).reshape(2, 3, 4)
+ path = tmp_path / 'neg_3d.mif'
+ _write_mif(path, canonical, layout=[-1, 2, -3])
+
+ _, loaded = mif_to_image(str(path))
+ np.testing.assert_array_equal(loaded, canonical)
+
+
+def test_layout_axis_reorder_only(tmp_path: Path) -> None:
+ """Layout ``+1,+2,+0`` reorders axes on disk; loader returns canonical order."""
+ canonical = np.arange(2 * 3 * 4, dtype=np.float32).reshape(2, 3, 4)
+ path = tmp_path / 'reorder.mif'
+ _write_mif(path, canonical, layout=[2, 3, 1])
+
+ _, loaded = mif_to_image(str(path))
+ np.testing.assert_array_equal(loaded, canonical)
+
+
+def test_apply_layout_round_trips_through_write_and_read() -> None:
+ """_mif_apply_layout is the inverse of _mif_apply_layout_for_write."""
+ rng = np.random.default_rng(0)
+ canonical = rng.random((3, 2, 4), dtype=np.float32)
+ for layout in ([1, 2, 3], [-1, 2, 3], [-1, -2, 3], [3, -1, 2], [-3, 2, -1]):
+ disk = _mif_apply_layout_for_write(canonical, layout)
+ restored = _mif_apply_layout(disk.ravel(), canonical.shape, layout)
+ np.testing.assert_array_equal(restored, canonical)
+
+
+def test_intensity_scaling_applied_once(tmp_path: Path) -> None:
+ """Verify the read path applies ``offset + scale * disk_value`` once."""
+ disk_values = np.arange(5, dtype=np.float32)
+ offset, scale = 2.5, 0.25
+ path = tmp_path / 'scaled.mif'
+ _write_mif(path, disk_values, layout=[1], scaling=(offset, scale))
+
+ _, loaded = mif_to_image(str(path))
+ np.testing.assert_allclose(loaded, offset + scale * disk_values, rtol=1e-6)
+
+
+# ---------------------------------------------------------------------------
+# mrconvert-based equivalence: loader matches canonical-stride output
+# ---------------------------------------------------------------------------
+
+
+_MRCONVERT = shutil.which('mrconvert')
+
+
+def _canonical_strides(ndim: int) -> str:
+ return ','.join(str(i + 1) for i in range(ndim))
+
+
+def _downloaded_scalar_file(data_dir: Path) -> Path | None:
+ """Return one fixel-scalar .mif from the cohort CSV, or ``None`` if unavailable."""
+ import pandas as pd
+
+ cohort_csv = data_dir / 'stat-alpha_cohort.csv'
+ if not cohort_csv.exists():
+ return None
+ cohort = pd.read_csv(cohort_csv)
+ candidate = data_dir / str(cohort['source_file'].iloc[0])
+ return candidate if candidate.exists() else None
+
+
+@pytest.mark.skipif(_MRCONVERT is None, reason='mrconvert not available on PATH')
+@pytest.mark.downloaded_data
+@pytest.mark.parametrize('filename', ['directions.mif', 'index.mif', '__scalar__'])
+def test_loader_matches_mrconvert_canonical_strides(
+ tmp_path: Path, downloaded_fixel_data_dir: Path, filename: str
+) -> None:
+ """Non-canonical stride files should load identically to their canonical-stride twin."""
+ if filename == '__scalar__':
+ src = _downloaded_scalar_file(downloaded_fixel_data_dir)
+ if src is None:
+ pytest.skip('no fixel scalar file listed in downloaded cohort CSV')
+ else:
+ src = downloaded_fixel_data_dir / filename
+ if not src.exists():
+ pytest.skip(f'{filename} missing from downloaded fixture directory')
+
+ _, data_src = mif_to_image(str(src))
+ ndim = data_src.ndim
+
+ canonical = tmp_path / f'canonical_{src.name}'
+ subprocess.run(
+ [
+ _MRCONVERT,
+ str(src),
+ str(canonical),
+ '-strides',
+ _canonical_strides(ndim),
+ '-force',
+ '-quiet',
+ ],
+ check=True,
+ )
+ _, data_can = mif_to_image(str(canonical))
+ np.testing.assert_array_equal(data_src, data_can)
+
+
+@pytest.mark.downloaded_data
+def test_fixel_id_points_to_matching_scalar(downloaded_fixel_data_dir: Path) -> None:
+ """The first_fixel_id stored in index.mif must index correctly into a fixel scalar.
+
+ This fails when the loader returns disk-ordered data because ``scalar[id]``
+ would fetch the reversed fixel.
+ """
+ index_file = downloaded_fixel_data_dir / 'index.mif'
+ scalar_file = _downloaded_scalar_file(downloaded_fixel_data_dir)
+ if not index_file.exists() or scalar_file is None:
+ pytest.skip('fixel fixtures missing from downloaded data')
+
+ _, index_data = mif_to_image(str(index_file))
+ _, scalar = mif_to_image(str(scalar_file))
+
+ count_vol = index_data[..., 0].astype(np.int64)
+ id_vol = index_data[..., 1].astype(np.int64)
+ n_fixels = scalar.shape[0]
+
+ # Total count must equal the number of fixels in the scalar file.
+ assert int(count_vol.sum()) == n_fixels
+
+ # Every (count, id) pair must fit inside the scalar array.
+ nonzero = count_vol > 0
+ ids = id_vol[nonzero]
+ counts = count_vol[nonzero]
+ assert int((ids + counts).max()) == n_fixels
+ assert int(ids.min()) >= 0
diff --git a/test/test_mif_utils.py b/test/test_mif_utils.py
index c76348c..f05688e 100644
--- a/test/test_mif_utils.py
+++ b/test/test_mif_utils.py
@@ -2,27 +2,92 @@
from __future__ import annotations
-import nibabel as nb
+from pathlib import Path
+
+import h5py
import numpy as np
+import pandas as pd
import pytest
+from modelarrayio.cli.h5_to_mif import h5_to_mif
+from modelarrayio.cli.mif_to_h5 import mif_to_h5
from modelarrayio.utils import mif
+from modelarrayio.utils.misc import load_and_normalize_cohort
+
+
+@pytest.mark.downloaded_data
+def test_mif_to_h5_results(
+ tmp_path_factory: pytest.TempPathFactory, downloaded_fixel_data_dir: Path
+) -> None:
+ """Test mif-to-h5 and h5-to-mif conversion, mimicking a ModelArray analysis."""
+ import os
+
+ # Step 1: Prepare inputs for conversion
+ out_dir = tmp_path_factory.mktemp('data_fixel_toy')
+ in_dir = downloaded_fixel_data_dir
+ index_file = in_dir / 'index.mif'
+ directions_file = in_dir / 'directions.mif'
+ cohort_file = in_dir / 'stat-alpha_cohort.csv'
+ if not index_file.exists():
+ raise FileNotFoundError(f'Contents of {in_dir}:\n{os.listdir(in_dir)}')
+ # Prepend absolute path to source files in cohort file
+ cohort_df = pd.read_csv(cohort_file)
+ cohort_df['source_file'] = cohort_df['source_file'].map(lambda path: str(in_dir / path))
+ temp_cohort_file = out_dir / 'stat-alpha_cohort.csv'
+ cohort_df.to_csv(temp_cohort_file, index=False)
+ cohort_long, _ = load_and_normalize_cohort(temp_cohort_file)
-def _make_nifti2(shape=(2, 1, 1)) -> nb.Nifti2Image:
- data = np.zeros(shape, dtype=np.float32)
- return nb.Nifti2Image(data, affine=np.eye(4))
+ # Step 2: Convert MIF to HDF5
+ h5_file = out_dir / 'stat-alpha.h5'
+ assert (
+ mif_to_h5(
+ index_file=index_file,
+ directions_file=directions_file,
+ cohort_long=cohort_long,
+ output=h5_file,
+ )
+ == 0
+ )
+ # Step 3: Add a result (element-wise mean across files) to the HDF5 file
+ with h5py.File(h5_file, 'a') as h5:
+ alpha_values = h5['scalars/alpha/values'][...]
+ mean_values = np.mean(alpha_values, axis=0, dtype=np.float32)
+ results_group = h5.require_group('results/lm')
+ results_group.create_dataset('results_matrix', data=mean_values[np.newaxis, :])
+ results_group.create_dataset(
+ 'column_names',
+ data=np.array(['mean'], dtype=h5py.string_dtype('utf-8')),
+ )
-def test_nifti2_to_mif_raises_when_mrconvert_missing(tmp_path, monkeypatch) -> None:
- monkeypatch.setattr(mif, 'find_mrconvert', lambda: None)
+ # Step 4: Convert HDF5 to MIF
+ output_dir = out_dir / 'mif_results'
+ assert (
+ h5_to_mif(
+ example_mif=cohort_df['source_file'].iloc[0],
+ in_file=h5_file,
+ analysis_name='lm',
+ compress=False,
+ output_dir=output_dir,
+ )
+ == 0
+ )
- with pytest.raises(FileNotFoundError, match='mrconvert'):
- mif.nifti2_to_mif(_make_nifti2(), tmp_path / 'out.mif')
+ # Step 5: Calculate mean directly from MIF files
+ source_arrays = [
+ mif.mif_to_image(source_file)[1].astype(np.float32)
+ for source_file in cohort_df['source_file']
+ ]
+ direct_mean = np.mean(np.stack(source_arrays, axis=0), axis=0, dtype=np.float32)
+ # Step 6: Compare the mean from the HDF5 file to the mean from the MIF files
+ output_mif = output_dir / 'lm_mean.mif'
+ assert output_mif.exists()
-def test_mif_to_nifti2_raises_when_mrconvert_missing(monkeypatch) -> None:
- monkeypatch.setattr(mif, 'find_mrconvert', lambda: None)
+ result_img, result_data = mif.mif_to_image(output_mif)
+ template_img, _ = mif.mif_to_image(cohort_df['source_file'].iloc[0])
- with pytest.raises(FileNotFoundError, match='mrconvert'):
- mif.mif_to_nifti2('missing_input.mif')
+ assert isinstance(result_img, mif.MifImage)
+ np.testing.assert_allclose(result_img.affine, template_img.affine)
+ np.testing.assert_allclose(result_data, direct_mean)
diff --git a/tox.ini b/tox.ini
index e422b2f..a6e4de2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,7 +20,7 @@ DEPENDS =
latest: latest
[testenv]
-description = Pytest with coverage (excludes network S3 tests)
+description = Pytest with coverage (excludes network S3 and downloaded-data tests)
labels = test
setenv =
COVERAGE_FILE = {toxinidir}/.tox/.coverage.{envname}
@@ -44,6 +44,11 @@ runner =
uv_resolution =
min: lowest-direct
+commands =
+ pytest -m "not s3 and not downloaded_data" --cov=modelarrayio --cov-config={toxinidir}/pyproject.toml --cov-report=term-missing --cov-report=xml {posargs:test}
+
+[testenv:py312-latest]
+description = Pytest with coverage including downloaded-data tests
commands =
pytest -m "not s3" --cov=modelarrayio --cov-config={toxinidir}/pyproject.toml --cov-report=term-missing --cov-report=xml {posargs:test}