diff --git a/HISTORY.rst b/HISTORY.rst
index 467e5c3..7b7c5ce 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -6,6 +6,9 @@ History
Future minor version (release soon)
-----------------------------------
+* Set native thread environment variable to number of cores from config if not set in the environment.
+* Improve parallelization to reduce risk of computational overhead.
+
1.3.0 (2026-02-23)
------------------
diff --git a/docs/index.rst b/docs/index.rst
index cf0b562..450d8a2 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -12,6 +12,7 @@ SARvey documentation
usage
preparation
processing
+ parallelization
visualization
demo_datasets
modules
diff --git a/docs/parallelization.rst b/docs/parallelization.rst
new file mode 100644
index 0000000..100618e
--- /dev/null
+++ b/docs/parallelization.rst
@@ -0,0 +1,66 @@
+Parallel processing (user guide)
+================================
+
+This page explains how to tune parallel processing in SARvey from a user perspective.
+
+Main settings
+-------------
+
+``general:num_cores``
+ Controls process-level parallelism in the CPU-heavy processing stages.
+
+``general:num_patches``
+ Splits the image stack into spatial patches to reduce memory pressure.
+ This is mostly a memory and I/O control, not a direct multiplier for CPU parallelism.
+
+How SARvey controls native thread libraries
+-------------------------------------------
+
+At startup, SARvey applies an *unset-only* policy for native thread environment variables.
+If a variable is not set, SARvey sets it to the value configured in ``general:num_cores``. If a variable is already set in your shell,
+job scheduler, or container environment, SARvey keeps the existing value.
+
+Variables handled:
+
+* ``OMP_NUM_THREADS``
+* ``OPENBLAS_NUM_THREADS``
+* ``MKL_NUM_THREADS``
+* ``NUMEXPR_NUM_THREADS``
+
+Why this matters
+----------------
+
+Some SARvey stages use Python multiprocessing while numerical libraries may also use internal
+native threads. If both layers are aggressive, CPU oversubscription can happen and runtime may get
+worse even with high CPU utilization.
+
+Recommended tuning workflow
+---------------------------
+
+1. Start conservative:
+
+ * set ``general:num_cores`` to about half of physical cores
+ * keep ``general:num_patches = 1`` unless memory is insufficient
+
+2. Increase ``general:num_cores`` stepwise and compare runtime for the same step and dataset.
+
+3. Increase ``general:num_patches`` only when memory is a bottleneck.
+
+4. On shared systems, prefer explicit environment settings in your job script if your site requires them.
+
+Troubleshooting high CPU load
+-----------------------------
+
+If CPU usage is too high:
+
+1. Lower ``general:num_cores`` first.
+2. Keep ``general:num_patches`` low unless needed for memory.
+3. Check whether your environment defines high values for BLAS/OpenMP variables.
+4. Re-run one processing step and compare wall-clock time, not only CPU percent.
+
+Quick notes
+-----------
+
+* Different processing steps scale differently with core count.
+* Higher CPU usage does not always mean faster processing.
+* Patching can increase I/O overhead; use it primarily for memory control.
diff --git a/docs/processing.rst b/docs/processing.rst
index fc98795..2fd551f 100644
--- a/docs/processing.rst
+++ b/docs/processing.rst
@@ -320,6 +320,8 @@ By setting **general:num_patches** the data is split into spatial patches and pr
However, only use the patching option if the memory is not sufficient to process the data in one go.
Using multiple patches will slow down the processing due to the overhead of loading and saving the data multiple times.
+For practical user-side tuning guidance of **general:num_cores** and **general:num_patches**, see :doc:`parallel_processing`.
+
Processing steps for one-step unwrapping workflow
-------------------------------------------------
diff --git a/sarvey/coherence.py b/sarvey/coherence.py
index 8a9e292..46708c1 100644
--- a/sarvey/coherence.py
+++ b/sarvey/coherence.py
@@ -110,6 +110,7 @@ def computeIfgsAndTemporalCoherence(*, path_temp_coh: str, path_ifgs: str, path_
avg_neighbours[:, :, i] = convolve2d(in1=ifgs[:, :, i], in2=filter_kernel, mode='same', boundary="symm")
else:
+ num_cores = min(num_cores, num_ifgs)
args = [(
idx,
ifgs[:, :, idx],
diff --git a/sarvey/densification.py b/sarvey/densification.py
index 7bfed6c..ff85e5e 100644
--- a/sarvey/densification.py
+++ b/sarvey/densification.py
@@ -141,7 +141,7 @@ def launchDensifyNetworkConsistencyCheck(args: tuple):
design_mat=design_mat
)
- prog_bar.update(counter + 1, every=np.int16(200),
+ prog_bar.update(counter + 1, every=200,
suffix='{}/{} points'.format(counter + 1, num_points))
counter += 1
@@ -224,10 +224,9 @@ def densifyNetwork(*, point1_obj: Points, vel_p1: np.ndarray, demerr_p1: np.ndar
velocity_bound, demerr_bound, num_samples)
idx_range, demerr_p2, vel_p2, gamma_p2 = launchDensifyNetworkConsistencyCheck(args)
else:
+ num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=point2_obj.num_points)
with multiprocessing.Pool(num_cores, initializer=densificationInitializer, initargs=init_args) as pool:
logger.info(msg="start parallel processing with {} cores.".format(num_cores))
- num_cores = point2_obj.num_points if num_cores > point2_obj.num_points else num_cores
- # avoids having less samples than cores
idx = ut.splitDatasetForParallelProcessing(num_samples=point2_obj.num_points, num_cores=num_cores)
args = [(
idx_range,
diff --git a/sarvey/filtering.py b/sarvey/filtering.py
index a5f0616..324cc6f 100644
--- a/sarvey/filtering.py
+++ b/sarvey/filtering.py
@@ -218,7 +218,7 @@ def estimateAtmosphericPhaseScreen(*, residuals: np.ndarray, coord_utm1: np.ndar
aps1 = np.zeros((num_points1, num_time), dtype=np.float32)
aps2 = np.zeros((num_points2, num_time), dtype=np.float32)
- num_cores = num_time if num_cores > num_time else num_cores # avoids having more samples than cores
+ num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=num_time)
idx = ut.splitDatasetForParallelProcessing(num_samples=num_time, num_cores=num_cores)
args = [(
diff --git a/sarvey/processing.py b/sarvey/processing.py
index c29fdcf..2884da9 100644
--- a/sarvey/processing.py
+++ b/sarvey/processing.py
@@ -504,8 +504,8 @@ def runUnwrappingTimeAndSpace(self):
phase=unw_phase,
num_points=point_obj.num_points,
ifg_net_obj=point_obj.ifg_net_obj,
- num_cores=1, # self.config.general.num_cores,
ref_idx=0,
+ num_cores=self.config.general.num_cores,
logger=self.logger
)
point_obj = Points(file_path=join(self.path, "p1_ts.h5"), logger=self.logger)
@@ -575,8 +575,8 @@ def runUnwrappingSpace(self):
# for sbas the ifg network needs to be inverted to get the phase time series
phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points,
ifg_net_obj=point_obj.ifg_net_obj,
- num_cores=1, # self.config.general.num_cores,
ref_idx=0,
+ num_cores=self.config.general.num_cores,
logger=self.logger)
point_obj.phase = phase_ts
@@ -1071,7 +1071,7 @@ def runDensificationTimeAndSpace(self):
phase=unw_phase,
num_points=point2_obj.num_points,
ifg_net_obj=point2_obj.ifg_net_obj,
- num_cores=1, # self.config.general.num_cores,
+ num_cores=self.config.general.num_cores,
ref_idx=0,
logger=self.logger)
@@ -1131,8 +1131,8 @@ def runDensificationSpace(self):
phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points,
ifg_net_obj=point_obj.ifg_net_obj,
- num_cores=1, # self.config.general.num_cores,
ref_idx=0,
+ num_cores=self.config.general.num_cores,
logger=self.logger)
point_obj.phase = phase_ts
diff --git a/sarvey/sarvey_mti.py b/sarvey/sarvey_mti.py
index be45ff9..e366cca 100755
--- a/sarvey/sarvey_mti.py
+++ b/sarvey/sarvey_mti.py
@@ -35,7 +35,6 @@
from os.path import join
import json5
-import matplotlib
import sys
import logging
import time
@@ -46,12 +45,7 @@
from sarvey.console import printStep, printCurrentConfig, showLogoSARvey
from sarvey.processing import Processing
from sarvey.config import Config, loadConfiguration
-from sarvey.utils import checkIfRequiredFilesExist
-
-try:
- matplotlib.use('QtAgg')
-except ImportError as e:
- print(e)
+from sarvey.utils import checkIfRequiredFilesExist, setNativeThreadEnvIfUnset
EXAMPLE = """Example:
sarvey -f config.json -g # create default config file with the name config.json and exit
@@ -321,6 +315,18 @@ def main(iargs=None):
config = loadConfiguration(path=config_file_path)
+ # Apply unset-only native-thread defaults based on user configuration to reduce nested parallelism.
+ native_thread_updates = setNativeThreadEnvIfUnset(num_threads=1)
+ if native_thread_updates:
+ logger.info(msg=(
+ "Native thread limits set by SARvey (unset-only, using general:num_cores={0}): {1}"
+ ).format(config.general.num_cores,
+ ", ".join(f"{k}={v}" for k, v in sorted(native_thread_updates.items()))))
+ else:
+ logger.info(msg=(
+ "Native thread limits unchanged (unset-only): existing environment values preserved."
+ ))
+
current_datetime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
log_filename = f"sarvey_{current_datetime}.log"
logpath = config.general.logfile_path
diff --git a/sarvey/unwrapping.py b/sarvey/unwrapping.py
index f89222e..1de5010 100644
--- a/sarvey/unwrapping.py
+++ b/sarvey/unwrapping.py
@@ -350,8 +350,7 @@ def temporalUnwrapping(*, ifg_net_obj: IfgNetwork, net_obj: Network, wavelength:
vel = np.zeros((net_obj.num_arcs, 1), dtype=np.float32)
gamma = np.zeros((net_obj.num_arcs, 1), dtype=np.float32)
- num_cores = net_obj.num_arcs if num_cores > net_obj.num_arcs else num_cores # avoids having more samples
- # then cores
+ num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=net_obj.num_arcs)
idx = ut.splitDatasetForParallelProcessing(num_samples=net_obj.num_arcs, num_cores=num_cores)
args = [(
@@ -476,8 +475,7 @@ def spatialUnwrapping(*, num_ifgs: int, num_points: int, phase: np.ndarray, edge
logger.info(msg="start parallel processing with {} cores.".format(num_cores))
unw_phase = np.zeros((num_points, num_ifgs), dtype=np.float32)
- num_cores = num_ifgs if num_cores > num_ifgs else num_cores
- # avoids having more samples than cores
+ num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=num_ifgs)
idx = ut.splitDatasetForParallelProcessing(num_samples=num_ifgs, num_cores=num_cores)
args = [(
diff --git a/sarvey/utils.py b/sarvey/utils.py
index b5c96c4..af9a78a 100644
--- a/sarvey/utils.py
+++ b/sarvey/utils.py
@@ -28,9 +28,10 @@
# with this program. If not, see .
"""Utils module for SARvey."""
-import multiprocessing
+import os
import time
from os.path import exists, join
+import multiprocessing
import numpy as np
from scipy.sparse.linalg import lsqr
@@ -53,6 +54,23 @@ def convertBboxToBlock(*, bbox: tuple):
return block
+def clampNumCores(*, requested_cores: int, num_samples: int) -> int:
+ """Clamp the requested number of worker processes to a safe usable range."""
+ if requested_cores <= 0:
+ raise ValueError("Number of cores must be greater than zero.")
+ if num_samples <= 0:
+ raise ValueError("Number of samples must be greater than zero.")
+ return min(requested_cores, num_samples)
+
+
+def scaleNumCores(*, requested_cores: int, num_samples: int, scale: float = 1.0) -> int:
+ """Scale and clamp worker count while guaranteeing at least one worker."""
+ if scale <= 0:
+ raise ValueError("Scale must be greater than zero.")
+ scaled_cores = max(1, int(np.floor(requested_cores * scale)))
+ return clampNumCores(requested_cores=scaled_cores, num_samples=num_samples)
+
+
def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetwork, num_cores: int, ref_idx: int,
logger: Logger):
"""Wrap the ifg network inversion running in parallel.
@@ -89,13 +107,11 @@ def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetw
args = (np.arange(num_points), num_points, phase, design_mat, ifg_net_obj.num_images, ref_idx)
idx_range, phase_ts = launchInvertIfgNetwork(parameters=args)
else:
- # use only 10 percent of the cores, because scipy.sparse.linalg.lsqr is already running in parallel
- num_cores = int(np.floor(num_cores / 10))
+ num_cores = clampNumCores(requested_cores=num_cores, num_samples=num_points)
logger.info(msg="start parallel processing with {} cores.".format(num_cores))
phase_ts = np.zeros((num_points, ifg_net_obj.num_images), dtype=np.float32)
- num_cores = num_points if num_cores > num_points else num_cores # avoids having more samples than cores
idx = splitDatasetForParallelProcessing(num_samples=num_points, num_cores=num_cores)
args = [(
idx_range,
@@ -527,6 +543,7 @@ def splitDatasetForParallelProcessing(*, num_samples: int, num_cores: int):
idx: list
list of sample ranges for each core
"""
+ num_cores = clampNumCores(requested_cores=num_cores, num_samples=num_samples)
rest = np.mod(num_samples, num_cores)
avg_num_samples_per_core = int((num_samples - rest) / num_cores)
num_samples_per_core = np.zeros((num_cores,), dtype=np.int64)
@@ -867,3 +884,28 @@ def checkIfRequiredFilesExist(*, path_to_files: str, required_files: list, logge
if not exists(join(path_to_files, file)):
logger.error(f"File from previous step(s) is missing: {file}.")
raise FileNotFoundError
+
+
+def setNativeThreadEnvIfUnset(*, num_threads: int = 1, logger: Logger = None) -> dict:
+ """Set native library thread limits only if environment variables are unset."""
+ if num_threads <= 0:
+ raise ValueError("Number of native threads must be greater than zero.")
+
+ env_vars = (
+ "OMP_NUM_THREADS",
+ "OPENBLAS_NUM_THREADS",
+ "MKL_NUM_THREADS",
+ "NUMEXPR_NUM_THREADS",
+ )
+
+ updates = {}
+ for var_name in env_vars:
+ if os.environ.get(var_name) is None:
+ os.environ[var_name] = str(num_threads)
+ updates[var_name] = str(num_threads)
+
+ if (logger is not None) and updates:
+ logger.info(msg="Set native thread limits for unset environment variables: {}"
+ .format(", ".join(f"{k}={v}" for k, v in updates.items())))
+
+ return updates
diff --git a/tests/test_parallelization.py b/tests/test_parallelization.py
new file mode 100644
index 0000000..75ea64e
--- /dev/null
+++ b/tests/test_parallelization.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python
+
+# SARvey - A multitemporal InSAR time series tool for the derivation of displacements.
+#
+# Copyright (C) 2021-2026 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de)
+#
+# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context
+# of the SAR4Infra project with funds of the German Federal Ministry for Digital and
+# Transport and contributions from Landesamt fuer Vermessung und Geoinformation
+# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein.
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your option) any later
+# version.
+#
+# Important: This package uses PyMaxFlow. The core of PyMaxflows library is the C++
+# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you
+# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes.
+# This requirement extends to SARvey.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with this program. If not, see .
+
+"""Tests for worker-count utilities used by SARvey parallel processing."""
+
+import os
+import unittest
+import numpy as np
+
+from sarvey.utils import clampNumCores, scaleNumCores, splitDatasetForParallelProcessing, setNativeThreadEnvIfUnset
+
+
+class TestParallelizationUtils(unittest.TestCase):
+ """Test safety guards for parallel worker allocation."""
+
+ _native_thread_vars = (
+ "OMP_NUM_THREADS",
+ "OPENBLAS_NUM_THREADS",
+ "MKL_NUM_THREADS",
+ "NUMEXPR_NUM_THREADS",
+ )
+
+ def setUp(self):
+ self._env_backup = {k: os.environ.get(k) for k in self._native_thread_vars}
+
+ def tearDown(self):
+ for key, val in self._env_backup.items():
+ if val is None:
+ os.environ.pop(key, None)
+ else:
+ os.environ[key] = val
+
+ def testClampNumCores_caps_to_num_samples(self):
+ """Clamp worker count to available samples."""
+ self.assertEqual(clampNumCores(requested_cores=16, num_samples=3), 3)
+ self.assertEqual(clampNumCores(requested_cores=4, num_samples=10), 4)
+
+ def testClampNumCores_rejects_invalid_values(self):
+ """Reject non-positive workers or samples."""
+ with self.assertRaises(ValueError):
+ clampNumCores(requested_cores=0, num_samples=5)
+ with self.assertRaises(ValueError):
+ clampNumCores(requested_cores=2, num_samples=0)
+
+ def testScaleNumCores_keeps_at_least_one_worker(self):
+ """Avoid the old floor-to-zero behavior for requested cores below ten."""
+ self.assertEqual(scaleNumCores(requested_cores=2, num_samples=50, scale=0.1), 1)
+ self.assertEqual(scaleNumCores(requested_cores=9, num_samples=50, scale=0.1), 1)
+ self.assertEqual(scaleNumCores(requested_cores=10, num_samples=50, scale=0.1), 1)
+ self.assertEqual(scaleNumCores(requested_cores=20, num_samples=50, scale=0.1), 2)
+
+ def testSplitDatasetForParallelProcessing_clamps_num_cores(self):
+ """Split logic should automatically cap workers to sample count."""
+ idx = splitDatasetForParallelProcessing(num_samples=3, num_cores=8)
+ self.assertEqual(len(idx), 3)
+ np.testing.assert_array_equal(idx[0], np.array([0]))
+ np.testing.assert_array_equal(idx[1], np.array([1]))
+ np.testing.assert_array_equal(idx[2], np.array([2]))
+
+ def testSplitDatasetForParallelProcessing_balances_ranges(self):
+ """Chunks should remain balanced when sample count is not divisible by workers."""
+ idx = splitDatasetForParallelProcessing(num_samples=10, num_cores=3)
+ lengths = [cur.shape[0] for cur in idx]
+ self.assertEqual(lengths, [4, 3, 3])
+ np.testing.assert_array_equal(np.concatenate(idx), np.arange(10))
+
+ def testSetNativeThreadEnvIfUnset_sets_only_missing(self):
+ """Unset variables are initialized while pre-set variables are kept."""
+ os.environ.pop("OMP_NUM_THREADS", None)
+ os.environ["OPENBLAS_NUM_THREADS"] = "7"
+ os.environ.pop("MKL_NUM_THREADS", None)
+ os.environ.pop("NUMEXPR_NUM_THREADS", None)
+
+ updates = setNativeThreadEnvIfUnset(num_threads=1)
+
+ self.assertEqual(os.environ["OMP_NUM_THREADS"], "1")
+ self.assertEqual(os.environ["OPENBLAS_NUM_THREADS"], "7")
+ self.assertEqual(os.environ["MKL_NUM_THREADS"], "1")
+ self.assertEqual(os.environ["NUMEXPR_NUM_THREADS"], "1")
+ self.assertEqual(updates["OMP_NUM_THREADS"], "1")
+ self.assertEqual(updates["MKL_NUM_THREADS"], "1")
+ self.assertEqual(updates["NUMEXPR_NUM_THREADS"], "1")
+ self.assertTrue("OPENBLAS_NUM_THREADS" not in updates)
+
+ def testSetNativeThreadEnvIfUnset_rejects_invalid_threads(self):
+ """Native thread limit must be positive."""
+ with self.assertRaises(ValueError):
+ setNativeThreadEnvIfUnset(num_threads=0)
+
+
+if __name__ == '__main__':
+ unittest.main()