diff --git a/doc/api.rst b/doc/api.rst index 4fe3921f..bd7dccd4 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -36,6 +36,7 @@ Coordinate refinement trackpy.refine_com trackpy.refine_leastsq + trackpy.spiff.apply_spiff Linking ------- diff --git a/trackpy/api.py b/trackpy/api.py index 8611bf70..fc3521ee 100644 --- a/trackpy/api.py +++ b/trackpy/api.py @@ -24,5 +24,6 @@ from . import predict from . import utils from . import artificial +from .spiff import apply_spiff from .utils import handle_logging, ignore_logging, quiet from .try_numba import try_numba_jit, enable_numba, disable_numba diff --git a/trackpy/feature.py b/trackpy/feature.py index 94edae58..831c78a3 100644 --- a/trackpy/feature.py +++ b/trackpy/feature.py @@ -15,6 +15,7 @@ from .masks import (binary_mask, N_binary_mask, r_squared_mask, x_squared_masks, cosmask, sinmask) from .uncertainty import _static_error, measure_noise +from .spiff import apply_spiff import trackpy # to get trackpy.__version__ logger = logging.getLogger(__name__) @@ -175,14 +176,14 @@ def local_maxima(image, radius, percentile=64, margin=None): def estimate_mass(image, radius, coord): "Compute the total brightness in the neighborhood of a local maximum." square = [slice(c - rad, c + rad + 1) for c, rad in zip(coord, radius)] - neighborhood = binary_mask(radius, image.ndim)*image[square] + neighborhood = binary_mask(radius, image.ndim) * image[square] return np.sum(neighborhood) def estimate_size(image, radius, coord, estimated_mass): "Compute the total brightness in the neighborhood of a local maximum." square = [slice(c - rad, c + rad + 1) for c, rad in zip(coord, radius)] - neighborhood = binary_mask(radius, image.ndim)*image[square] + neighborhood = binary_mask(radius, image.ndim) * image[square] Rg = np.sqrt(np.sum(r_squared_mask(radius, image.ndim) * neighborhood) / estimated_mass) return Rg @@ -205,7 +206,7 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, noise_size=1, smoothing_size=None, threshold=None, invert=False, percentile=64, topn=None, preprocess=True, max_iterations=10, filter_before=None, filter_after=None, - characterize=True, engine='auto'): + characterize=True, engine='auto', spiff=False): """Locate Gaussian-like blobs of some approximate size in an image. Preprocess the image by performing a band pass and a threshold. @@ -272,6 +273,15 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, characterize : boolean Compute "extras": eccentricity, signal, ep. True by default. engine : {'auto', 'python', 'numba'} + spiff : boolean or 'auto' + Apply the SPIFF sub-pixel bias correction + (``trackpy.spiff.apply_spiff``) to the located features + before returning. False by default. If True, the correction is + applied and a warning is emitted if there are too few features for + a reliable correction. If ``'auto'``, the correction is applied + silently when there are enough features, and skipped otherwise. + Note that SPIFF generally works best when applied to features + pooled from many frames at once; see ``batch``. Returns ------- @@ -308,7 +318,7 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, if filter_before is not None: raise ValueError("The filter_before argument is no longer supported as " "it does not improve performance. Features are " - "filtered after refine.") # see GH issue #141 + "filtered after refine.") # see GH issue #141 if filter_after is not None: warnings.warn("The filter_after argument has been deprecated: it is " "always on, unless minmass = None and maxsize = None.", @@ -323,7 +333,7 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, diameter = tuple([int(x) for x in diameter]) if not np.all([x & 1 for x in diameter]): raise ValueError("Feature diameter must be an odd integer. Round up.") - radius = tuple([x//2 for x in diameter]) + radius = tuple([x // 2 for x in diameter]) isotropic = np.all(radius[1:] == radius[:-1]) if (not isotropic) and (maxsize is not None): @@ -352,11 +362,11 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, dim = raw_image.ndim warnings.warn("I am interpreting the image as {}-dimensional. " "If it is actually a {}-dimensional color image, " - "convert it to grayscale first.".format(dim, dim-1)) + "convert it to grayscale first.".format(dim, dim - 1)) if threshold is None: if is_float_image: - threshold = 1/255. + threshold = 1 / 255. else: threshold = 1 @@ -373,7 +383,7 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, # For optimal performance, performance, coerce the image dtype to integer. if is_float_image: # For float images, assume bitdepth of 8. dtype = np.uint8 - else: # For integer images, take original dtype + else: # For integer images, take original dtype dtype = raw_image.dtype # Normalize_to_int does nothing if image is already of integer type. scale_factor, image = convert_to_int(image, dtype) @@ -452,6 +462,12 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, ep = pd.DataFrame(ep, columns=['ep_' + cc for cc in pos_columns]) refined_coords = pandas_concat([refined_coords, ep], axis=1) + # Optionally apply the SPIFF sub-pixel bias correction. + if spiff: + refined_coords = apply_spiff( + refined_coords, pos_columns=pos_columns, + warn_if_insufficient=(spiff != 'auto')) + # If this is a pims Frame object, it has a frame number. # Tag it on; this is helpful for parallelization. if hasattr(raw_image, 'frame_no') and raw_image.frame_no is not None: @@ -460,7 +476,7 @@ def locate(raw_image, diameter, minmass=None, maxsize=None, separation=None, def batch(frames, diameter, output=None, meta=None, processes='auto', - after_locate=None, **kwargs): + after_locate=None, spiff=False, **kwargs): """Locate Gaussian-like blobs of some approximate size in a set of images. Preprocess the image by performing a band pass and a threshold. @@ -497,6 +513,16 @@ def batch(frames, diameter, output=None, meta=None, processes='auto', - ``features``: a DataFrame containing the detected features. Furthermore it must return a DataFrame like ``features``. + spiff : boolean or 'auto' + Apply the SPIFF sub-pixel bias correction + (``trackpy.spiff.apply_spiff``). False by default. + If True, the correction is applied and a warning is emitted if + there are too few features for a reliable correction. If + ``'auto'``, the correction is applied silently when there are + enough features, and skipped otherwise. + + When ``output`` is None, the correction is computed once for all features in all frames (recommended). + When ``output`` is specified, the correction is computed for each frame, which may be less accurate. **kwargs : Keyword arguments that are passed to the wrapped `trackpy.locate`. Refer to its docstring for further details. @@ -523,6 +549,12 @@ def batch(frames, diameter, output=None, meta=None, processes='auto', "provided internally by `frames`") # Add required keyword argument kwargs["diameter"] = diameter + # SPIFF is most accurate when applied to features pooled from all + # frames at once. When output is None we can pool, so override any + # per-frame spiff request and apply the correction below. When + # output is specified, pooling isn't possible, so fall back to + # applying the correction frame-by-frame inside locate(). + kwargs["spiff"] = spiff if output is not None else False if meta: # Gather meta information and save as YAML in current directory. @@ -582,7 +614,11 @@ def after_locate(frame_no, features): if output is None: if len(all_features) > 0: - return pandas_concat(all_features).reset_index(drop=True) + result = pandas_concat(all_features).reset_index(drop=True) + if spiff: + result = apply_spiff( + result, warn_if_insufficient=(spiff != 'auto')) + return result else: # return empty DataFrame warnings.warn("No maxima found in any frame.") return pd.DataFrame(columns=list(features.columns) + ['frame']) @@ -630,8 +666,8 @@ def characterize(coords, image, radius, scale_factor=1.): mass[feat])[::-1] # change order yx -> xy # I only know how to measure eccentricity in 2D. if ndim == 2: - ecc[feat] = np.sqrt(np.sum(neighborhood*cosmask(radius))**2 + - np.sum(neighborhood*sinmask(radius))**2) + ecc[feat] = np.sqrt(np.sum(neighborhood * cosmask(radius)) ** 2 + + np.sum(neighborhood * sinmask(radius)) ** 2) ecc[feat] /= (mass[feat] - neighborhood[radius] + 1e-6) result = dict(mass=mass, signal=signal, ecc=ecc) diff --git a/trackpy/spiff.py b/trackpy/spiff.py new file mode 100644 index 00000000..a17d684a --- /dev/null +++ b/trackpy/spiff.py @@ -0,0 +1,96 @@ +import numpy as np +from pandas import DataFrame +import warnings + +# The minimum number of features required to apply SPIFF correction. +# This is a conservative requirement (likely higher than it needs to be) and is +# subject to further optimization. +MIN_FEATURES = 50 + + +def apply_spiff(f: DataFrame, pos_columns=None, + warn_if_insufficient=True) -> DataFrame: + """ + Removes pixel bias in a given list of features (using a single-pixel interior filling function), + thereby improving sub-pixel accuracy. + + Parameters + ---------- + f : DataFrame + Features as returned by ``trackpy.locate`` or ``trackpy.batch``. + pos_columns : list of column names, optional + The position columns to correct. If None, defaults to ``['x', 'y']`` + (and ``'z'`` if present in ``f``). + warn_if_insufficient : boolean, optional + If True (default), emit a warning when ``f`` contains fewer than + ``MIN_FEATURES`` rows and the correction is skipped. Set to False + for silent skipping (e.g. when called automatically via the + ``spiff='auto'`` option of ``locate`` or ``batch``). + + Returns + ------- + DataFrame + A copy of ``f`` with corrected positions, or ``f`` unchanged if + there are too few features. + + Notes + ----- + The algorithm used is inspired by "Analysis and correction of errors in + nanoscale particle tracking using the Single-pixel interior filling function + (SPIFF) algorithm" paper (see below). + The accuracy of this algorithm improves with the number of features. When + tracking features across multiple frames (e.g. in a video), consider locating + the features across all frames first (using tp.batch) before applying this function + (as opposed to applying this function for each individual frame). + If f contains fewer than ``MIN_FEATURES`` features, f is returned as-is, + due to lack of data. + + Citations + ----- + Yifat, Y., Sule, N., Lin, Y. et al. + Analysis and correction of errors in nanoscale particle tracking using the + Single-pixel interior filling function (SPIFF) algorithm. Sci Rep 7, 16553 (2017). + https://doi.org/10.1038/s41598-017-14166-6 + """ + if len(f) < MIN_FEATURES: + if warn_if_insufficient: + warnings.warn( + "Not enough features ({n} < {min_n}) to apply SPIFF " + "sub-pixel bias correction; returning features unchanged. " + "Consider running on a larger batch of frames.".format( + n=len(f), min_n=MIN_FEATURES)) + return f + + if pos_columns is None: + if 'z' in f: + pos_columns = ['x', 'y', 'z'] + else: + pos_columns = ['x', 'y'] + + f = f.copy() + + # Correct each column individually (subject to further optimization, + # by assuming sub-pixel bias is equal across certain dimensions). + for col in pos_columns: + # Get the values as a numpy array + x = np.array(f[col]) + + # Get the sub-pixel values + spiff = x % 1 + + # Mirror the values around the center of the pixel + spiff_mirrored = np.where(spiff < 0.5, spiff, 1 - spiff) + + # Sort the values for efficient search + spiff_sorted = np.sort(spiff_mirrored) + + # Reverse any sub-pixel bias + spiff_corrected_low = np.searchsorted(spiff_sorted, spiff) / len(x) / 2 + spiff_corrected_high = 1 - np.searchsorted(spiff_sorted, 1 - spiff) / len(x) / 2 + spiff_corrected = np.where(spiff < 0.5, spiff_corrected_low, spiff_corrected_high) + + # Add the sub-pixel value back to the original pixel position + x_corrected = np.floor(x) + spiff_corrected + f[col] = x_corrected + + return f diff --git a/trackpy/tests/test_spiff.py b/trackpy/tests/test_spiff.py new file mode 100644 index 00000000..548e71bb --- /dev/null +++ b/trackpy/tests/test_spiff.py @@ -0,0 +1,143 @@ +import unittest +import warnings + +import numpy as np +import pandas as pd + +import trackpy as tp +from trackpy.utils import default_pos_columns +from trackpy.artificial import draw_array +from trackpy.spiff import apply_spiff, MIN_FEATURES +from trackpy.tests.common import StrictTestCase, sort_positions + + +def _subpix_bias(features, columns): + """Histogram-based measure of sub-pixel bias across position columns.""" + return np.std( + np.histogram(features[columns].values % 1, bins=10, range=(0, 1))[0] + ) + + +class TestSpiff(StrictTestCase): + def _test_spiff(self, ndim): + # Draw an image with 200 features and some noise + expected, image = draw_array(200, 2, noise_level=1, ndim=ndim) + + columns = default_pos_columns(ndim) + + # Locate the features and calculate the deviation + features = tp.locate(image, diameter=5) + _, actual = sort_positions(features[columns].values, expected) + deviation = np.sqrt(np.mean(np.sum((actual - expected) ** 2, 1))) + + # Apply SPIFF correction and calculate the deviation + corrected_features = apply_spiff(features) + _, corrected = sort_positions(corrected_features[columns].values, expected) + deviation_corrected = np.sqrt(np.mean(np.sum((corrected - expected) ** 2, 1))) + + # Verify that the SPIFF correction improves accuracy + assert (deviation_corrected < deviation) + + # Verify that the SPIFF corrected features have less subpixel bias + hist_dev = _subpix_bias(features, columns) + corrected_hist_dev = _subpix_bias(corrected_features, columns) + assert (corrected_hist_dev < hist_dev) + + def test_spiff_2d(self): + self._test_spiff(ndim=2) + + def test_spiff_3d(self): + self._test_spiff(ndim=3) + + +class TestSpiffOption(StrictTestCase): + """Tests for the ``spiff`` option of ``locate`` and ``batch``.""" + + def _make_image(self, n=200, ndim=2): + return draw_array(n, 2, noise_level=1, ndim=ndim) + + def test_locate_spiff_false_default(self): + # By default, locate must not modify features via SPIFF. + expected, image = self._make_image() + baseline = tp.locate(image, diameter=5) + corrected = tp.locate(image, diameter=5, spiff=False) + pd.testing.assert_frame_equal(baseline, corrected) + + def test_locate_spiff_true_reduces_bias(self): + expected, image = self._make_image() + columns = default_pos_columns(2) + baseline = tp.locate(image, diameter=5) + corrected = tp.locate(image, diameter=5, spiff=True) + # Same number of features, but corrected positions should have + # less sub-pixel bias. + assert len(baseline) == len(corrected) + assert _subpix_bias(corrected, columns) < _subpix_bias(baseline, columns) + # And the values should actually be different. + assert not np.allclose(baseline[columns].values, + corrected[columns].values) + + def test_locate_spiff_true_warns_on_few_features(self): + # A small image yields very few features; spiff=True should warn. + expected, image = self._make_image(n=4) + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + result = tp.locate(image, diameter=5, spiff=True) + messages = [str(w.message) for w in caught] + assert any("SPIFF" in m for m in messages), messages + assert len(result) < MIN_FEATURES + + def test_locate_spiff_auto_silent_on_few_features(self): + expected, image = self._make_image(n=4) + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + tp.locate(image, diameter=5, spiff='auto') + messages = [str(w.message) for w in caught] + assert not any("SPIFF" in m for m in messages), messages + + def test_batch_spiff_pools_across_frames(self): + # Build a small "video" of identical frames so SPIFF gets enough + # features when pooled across all of them. + expected, image = self._make_image(n=50) + frames = [image, image, image, image] + columns = default_pos_columns(2) + baseline = tp.batch(frames, diameter=5) + corrected = tp.batch(frames, diameter=5, spiff=True) + assert len(baseline) == len(corrected) + assert _subpix_bias(corrected, columns) < _subpix_bias(baseline, columns) + + def test_batch_spiff_default_is_off(self): + expected, image = self._make_image(n=50) + frames = [image, image] + baseline = tp.batch(frames, diameter=5) + explicit = tp.batch(frames, diameter=5, spiff=False) + pd.testing.assert_frame_equal(baseline, explicit) + + def test_batch_spiff_with_output_applies_per_frame(self): + # When `output` is specified, features cannot be pooled across + # frames, so SPIFF should fall back to per-frame application via + # the underlying `locate` call. + expected, image = self._make_image(n=50) + frames = [image, image] + columns = default_pos_columns(2) + + class Collector: + def __init__(self): + self.frames = [] + + def put(self, features): + self.frames.append(features) + + baseline = Collector() + tp.batch(frames, diameter=5, output=baseline) + corrected = Collector() + tp.batch(frames, diameter=5, spiff=True, output=corrected) + + baseline_all = pd.concat(baseline.frames, ignore_index=True) + corrected_all = pd.concat(corrected.frames, ignore_index=True) + assert len(baseline_all) == len(corrected_all) + assert (_subpix_bias(corrected_all, columns) + < _subpix_bias(baseline_all, columns)) + + +if __name__ == '__main__': + unittest.main()