3 changes: 3 additions & 0 deletions HISTORY.rst
@@ -6,6 +6,9 @@ History
Future minor version (release soon)
-----------------------------------

* Set the native thread environment variables to the number of cores from the config if they are not already set in the environment.
* Improve parallelization to reduce the risk of computational overhead from CPU oversubscription.

1.3.0 (2026-02-23)
------------------

1 change: 1 addition & 0 deletions docs/index.rst
@@ -12,6 +12,7 @@ SARvey documentation
usage
preparation
processing
parallelization
visualization
demo_datasets
modules
66 changes: 66 additions & 0 deletions docs/parallelization.rst
@@ -0,0 +1,66 @@
Parallel processing (user guide)
================================

This page explains how to tune parallel processing in SARvey from a user perspective.

Main settings
-------------

``general:num_cores``
Controls process-level parallelism in the CPU-heavy processing stages.

``general:num_patches``
Splits the image stack into spatial patches to reduce memory pressure.
This is mostly a memory and I/O control, not a direct multiplier for CPU parallelism.

How SARvey controls native thread libraries
-------------------------------------------

At startup, SARvey applies an *unset-only* policy for native thread environment variables.
If a variable is not set, SARvey sets it to the value configured in ``general:num_cores``. If a variable is already set in your shell,
job scheduler, or container environment, SARvey keeps the existing value.

Variables handled:

* ``OMP_NUM_THREADS``
* ``OPENBLAS_NUM_THREADS``
* ``MKL_NUM_THREADS``
* ``NUMEXPR_NUM_THREADS``

Why this matters
----------------

Some SARvey stages use Python multiprocessing, while the numerical libraries they call may also
start their own native threads. If both layers start many workers at the same time, the CPU is
oversubscribed: more threads are runnable than there are cores, so runtime can get worse even
though CPU utilization is high.
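
The arithmetic behind oversubscription can be sketched as follows. This is only an illustration, not SARvey code: the helper names ``estimate_runnable_threads`` and ``oversubscription_factor`` are hypothetical, and the model simply assumes that every worker process lets its numerical library start the same number of native threads.

```python
import os


def estimate_runnable_threads(num_processes: int, threads_per_process: int) -> int:
    """Upper bound on simultaneously runnable threads (hypothetical helper)."""
    return num_processes * threads_per_process


def oversubscription_factor(num_processes: int, threads_per_process: int,
                            available_cores: int = None) -> float:
    """Ratio of runnable threads to available cores; values above 1 mean oversubscription."""
    # Fall back to the logical core count reported by the OS if none is given.
    cores = available_cores or os.cpu_count() or 1
    return estimate_runnable_threads(num_processes, threads_per_process) / cores


# Assumed scenario: 8 worker processes, each library using 8 threads, on a 16-core machine.
print(oversubscription_factor(8, 8, available_cores=16))  # 4.0
```

In this assumed scenario, 64 threads compete for 16 cores, which is the situation the unset-only policy is meant to make less likely.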

Recommended tuning workflow
---------------------------

1. Start conservative:

* set ``general:num_cores`` to about half the number of physical cores
* keep ``general:num_patches`` at 1 unless memory is insufficient

2. Increase ``general:num_cores`` stepwise and compare runtime for the same step and dataset.

3. Increase ``general:num_patches`` only when memory is a bottleneck.

4. On shared systems, prefer explicit environment settings in your job script if your site requires them.
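
The stepwise comparison in this workflow can be sketched with a small timing harness. This is a minimal sketch under assumptions: ``run_step`` is a hypothetical callable that you would write yourself to run one processing step with a given ``general:num_cores`` value on a fixed dataset; neither helper below is part of SARvey.

```python
import time
from typing import Callable, Dict, Iterable


def time_step_for_core_counts(run_step: Callable[[int], None],
                              core_counts: Iterable[int]) -> Dict[int, float]:
    """Run the same step once per core count and record wall-clock seconds."""
    timings = {}
    for num_cores in core_counts:
        start = time.perf_counter()
        run_step(num_cores)  # hypothetical: runs one step with this num_cores value
        timings[num_cores] = time.perf_counter() - start
    return timings


def fastest_core_count(timings: Dict[int, float]) -> int:
    """Return the core count with the smallest measured wall-clock time."""
    return min(timings, key=timings.get)


# Example with made-up timings: more cores is not automatically faster.
print(fastest_core_count({2: 310.0, 4: 180.0, 8: 195.0}))  # 4
```

Measuring wall-clock time per core count, rather than watching CPU percent, matches the advice in the troubleshooting section.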

Troubleshooting high CPU load
-----------------------------

If CPU usage is too high:

1. Lower ``general:num_cores`` first.
2. Keep ``general:num_patches`` low unless needed for memory.
3. Check whether your environment defines high values for BLAS/OpenMP variables.
4. Re-run one processing step and compare wall-clock time, not only CPU percent.
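
To check step 3, the four variables listed earlier can be inspected before starting a run. The snippet below is a small sketch; the helper name ``report_native_thread_env`` is hypothetical and not part of SARvey.

```python
import os

# The variables handled by SARvey's unset-only policy, as listed in this guide.
NATIVE_THREAD_VARS = (
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "MKL_NUM_THREADS",
    "NUMEXPR_NUM_THREADS",
)


def report_native_thread_env(environ=None) -> dict:
    """Return the current value (or None if unset) of each native thread variable."""
    env = os.environ if environ is None else environ
    return {name: env.get(name) for name in NATIVE_THREAD_VARS}


if __name__ == "__main__":
    for name, value in report_native_thread_env().items():
        print(f"{name}={value if value is not None else '<unset>'}")
```

Any variable that prints a value here is left untouched by SARvey, so a high value inherited from the shell or job scheduler stays in effect.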

Quick notes
-----------

* Different processing steps scale differently with core count.
* Higher CPU usage does not always mean faster processing.
* Patching can increase I/O overhead; use it primarily for memory control.
2 changes: 2 additions & 0 deletions docs/processing.rst
@@ -320,6 +320,8 @@ By setting **general:num_patches** the data is split into spatial patches and pr
However, only use the patching option if the memory is not sufficient to process the data in one go.
Using multiple patches will slow down the processing due to the overhead of loading and saving the data multiple times.

For practical guidance on tuning **general:num_cores** and **general:num_patches**, see :doc:`parallelization`.


Processing steps for one-step unwrapping workflow
-------------------------------------------------
1 change: 1 addition & 0 deletions sarvey/coherence.py
@@ -110,6 +110,7 @@ def computeIfgsAndTemporalCoherence(*, path_temp_coh: str, path_ifgs: str, path_
avg_neighbours[:, :, i] = convolve2d(in1=ifgs[:, :, i], in2=filter_kernel, mode='same', boundary="symm")
else:

num_cores = min(num_cores, num_ifgs)
args = [(
idx,
ifgs[:, :, idx],
5 changes: 2 additions & 3 deletions sarvey/densification.py
@@ -141,7 +141,7 @@ def launchDensifyNetworkConsistencyCheck(args: tuple):
design_mat=design_mat
)

prog_bar.update(counter + 1, every=np.int16(200),
prog_bar.update(counter + 1, every=200,
suffix='{}/{} points'.format(counter + 1, num_points))
counter += 1

@@ -224,10 +224,9 @@ def densifyNetwork(*, point1_obj: Points, vel_p1: np.ndarray, demerr_p1: np.ndar
velocity_bound, demerr_bound, num_samples)
idx_range, demerr_p2, vel_p2, gamma_p2 = launchDensifyNetworkConsistencyCheck(args)
else:
num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=point2_obj.num_points)
with multiprocessing.Pool(num_cores, initializer=densificationInitializer, initargs=init_args) as pool:
logger.info(msg="start parallel processing with {} cores.".format(num_cores))
num_cores = point2_obj.num_points if num_cores > point2_obj.num_points else num_cores
# avoids having less samples than cores
idx = ut.splitDatasetForParallelProcessing(num_samples=point2_obj.num_points, num_cores=num_cores)
args = [(
idx_range,
2 changes: 1 addition & 1 deletion sarvey/filtering.py
@@ -218,7 +218,7 @@ def estimateAtmosphericPhaseScreen(*, residuals: np.ndarray, coord_utm1: np.ndar
aps1 = np.zeros((num_points1, num_time), dtype=np.float32)
aps2 = np.zeros((num_points2, num_time), dtype=np.float32)

num_cores = num_time if num_cores > num_time else num_cores # avoids having more samples than cores
num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=num_time)
idx = ut.splitDatasetForParallelProcessing(num_samples=num_time, num_cores=num_cores)

args = [(
8 changes: 4 additions & 4 deletions sarvey/processing.py
@@ -504,8 +504,8 @@ def runUnwrappingTimeAndSpace(self):
phase=unw_phase,
num_points=point_obj.num_points,
ifg_net_obj=point_obj.ifg_net_obj,
num_cores=1, # self.config.general.num_cores,
ref_idx=0,
num_cores=self.config.general.num_cores,
logger=self.logger
)
point_obj = Points(file_path=join(self.path, "p1_ts.h5"), logger=self.logger)
@@ -575,8 +575,8 @@ def runUnwrappingSpace(self):
# for sbas the ifg network needs to be inverted to get the phase time series
phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points,
ifg_net_obj=point_obj.ifg_net_obj,
num_cores=1, # self.config.general.num_cores,
ref_idx=0,
num_cores=self.config.general.num_cores,
logger=self.logger)

point_obj.phase = phase_ts
@@ -1071,7 +1071,7 @@ def runDensificationTimeAndSpace(self):
phase=unw_phase,
num_points=point2_obj.num_points,
ifg_net_obj=point2_obj.ifg_net_obj,
num_cores=1, # self.config.general.num_cores,
num_cores=self.config.general.num_cores,
ref_idx=0,
logger=self.logger)

@@ -1131,8 +1131,8 @@ def runDensificationSpace(self):

phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points,
ifg_net_obj=point_obj.ifg_net_obj,
num_cores=1, # self.config.general.num_cores,
ref_idx=0,
num_cores=self.config.general.num_cores,
logger=self.logger)

point_obj.phase = phase_ts
20 changes: 13 additions & 7 deletions sarvey/sarvey_mti.py
@@ -35,7 +35,6 @@
from os.path import join

import json5
import matplotlib
import sys
import logging
import time
@@ -46,12 +45,7 @@
from sarvey.console import printStep, printCurrentConfig, showLogoSARvey
from sarvey.processing import Processing
from sarvey.config import Config, loadConfiguration
from sarvey.utils import checkIfRequiredFilesExist

try:
matplotlib.use('QtAgg')
except ImportError as e:
print(e)
from sarvey.utils import checkIfRequiredFilesExist, setNativeThreadEnvIfUnset

EXAMPLE = """Example:
sarvey -f config.json -g # create default config file with the name config.json and exit
@@ -321,6 +315,18 @@ def main(iargs=None):

config = loadConfiguration(path=config_file_path)

# Apply unset-only native-thread defaults based on user configuration to reduce nested parallelism.
native_thread_updates = setNativeThreadEnvIfUnset(num_threads=config.general.num_cores)
if native_thread_updates:
    logger.info(msg=(
        "Native thread limits set by SARvey (unset-only, using general:num_cores={0}): {1}"
    ).format(config.general.num_cores,
             ", ".join(f"{k}={v}" for k, v in sorted(native_thread_updates.items()))))
else:
    logger.info(msg=(
        "Native thread limits unchanged (unset-only): existing environment values preserved."
    ))

current_datetime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
log_filename = f"sarvey_{current_datetime}.log"
logpath = config.general.logfile_path
6 changes: 2 additions & 4 deletions sarvey/unwrapping.py
@@ -350,8 +350,7 @@ def temporalUnwrapping(*, ifg_net_obj: IfgNetwork, net_obj: Network, wavelength:
vel = np.zeros((net_obj.num_arcs, 1), dtype=np.float32)
gamma = np.zeros((net_obj.num_arcs, 1), dtype=np.float32)

num_cores = net_obj.num_arcs if num_cores > net_obj.num_arcs else num_cores # avoids having more samples
# then cores
num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=net_obj.num_arcs)
idx = ut.splitDatasetForParallelProcessing(num_samples=net_obj.num_arcs, num_cores=num_cores)

args = [(
@@ -476,8 +475,7 @@ def spatialUnwrapping(*, num_ifgs: int, num_points: int, phase: np.ndarray, edge
logger.info(msg="start parallel processing with {} cores.".format(num_cores))

unw_phase = np.zeros((num_points, num_ifgs), dtype=np.float32)
num_cores = num_ifgs if num_cores > num_ifgs else num_cores
# avoids having more samples than cores
num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=num_ifgs)
idx = ut.splitDatasetForParallelProcessing(num_samples=num_ifgs, num_cores=num_cores)

args = [(
50 changes: 46 additions & 4 deletions sarvey/utils.py
@@ -28,9 +28,10 @@
# with this program. If not, see <https://www.gnu.org/licenses/>.

"""Utils module for SARvey."""
import multiprocessing
import os
import time
from os.path import exists, join
import multiprocessing

import numpy as np
from scipy.sparse.linalg import lsqr
@@ -53,6 +54,23 @@ def convertBboxToBlock(*, bbox: tuple):
return block


def clampNumCores(*, requested_cores: int, num_samples: int) -> int:
    """Clamp the requested number of worker processes to a safe usable range."""
    if requested_cores <= 0:
        raise ValueError("Number of cores must be greater than zero.")
    if num_samples <= 0:
        raise ValueError("Number of samples must be greater than zero.")
    return min(requested_cores, num_samples)


def scaleNumCores(*, requested_cores: int, num_samples: int, scale: float = 1.0) -> int:
    """Scale and clamp worker count while guaranteeing at least one worker."""
    if scale <= 0:
        raise ValueError("Scale must be greater than zero.")
    scaled_cores = max(1, int(np.floor(requested_cores * scale)))
    return clampNumCores(requested_cores=scaled_cores, num_samples=num_samples)
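
The behaviour of the two helpers can be illustrated with a few calls. The sketch below restates both functions so that it runs on its own; as an assumption for portability it uses `math.floor` in place of `np.floor`, which gives the same integer result for these inputs.

```python
import math


def clampNumCores(*, requested_cores: int, num_samples: int) -> int:
    """Never use more workers than there are samples to distribute."""
    if requested_cores <= 0:
        raise ValueError("Number of cores must be greater than zero.")
    if num_samples <= 0:
        raise ValueError("Number of samples must be greater than zero.")
    return min(requested_cores, num_samples)


def scaleNumCores(*, requested_cores: int, num_samples: int, scale: float = 1.0) -> int:
    """Scale the worker count, keep at least one worker, then clamp to the sample count."""
    if scale <= 0:
        raise ValueError("Scale must be greater than zero.")
    scaled_cores = max(1, math.floor(requested_cores * scale))
    return clampNumCores(requested_cores=scaled_cores, num_samples=num_samples)


print(clampNumCores(requested_cores=16, num_samples=5))                 # 5
print(scaleNumCores(requested_cores=16, num_samples=1000, scale=0.5))   # 8
print(scaleNumCores(requested_cores=4, num_samples=1000, scale=0.1))    # 1
```

The last call shows why the `max(1, ...)` guard matters: a plain `int(np.floor(4 / 10))`, as in the replaced 10-percent rule, evaluates to 0 workers.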


def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetwork, num_cores: int, ref_idx: int,
logger: Logger):
"""Wrap the ifg network inversion running in parallel.
@@ -89,13 +107,11 @@ def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetw
args = (np.arange(num_points), num_points, phase, design_mat, ifg_net_obj.num_images, ref_idx)
idx_range, phase_ts = launchInvertIfgNetwork(parameters=args)
else:
# use only 10 percent of the cores, because scipy.sparse.linalg.lsqr is already running in parallel
num_cores = int(np.floor(num_cores / 10))
num_cores = clampNumCores(requested_cores=num_cores, num_samples=num_points)
logger.info(msg="start parallel processing with {} cores.".format(num_cores))

phase_ts = np.zeros((num_points, ifg_net_obj.num_images), dtype=np.float32)

num_cores = num_points if num_cores > num_points else num_cores # avoids having more samples than cores
idx = splitDatasetForParallelProcessing(num_samples=num_points, num_cores=num_cores)
args = [(
idx_range,
@@ -527,6 +543,7 @@ def splitDatasetForParallelProcessing(*, num_samples: int, num_cores: int):
idx: list
list of sample ranges for each core
"""
num_cores = clampNumCores(requested_cores=num_cores, num_samples=num_samples)
rest = np.mod(num_samples, num_cores)
avg_num_samples_per_core = int((num_samples - rest) / num_cores)
num_samples_per_core = np.zeros((num_cores,), dtype=np.int64)
@@ -867,3 +884,28 @@ def checkIfRequiredFilesExist(*, path_to_files: str, required_files: list, logge
if not exists(join(path_to_files, file)):
logger.error(f"File from previous step(s) is missing: {file}.")
raise FileNotFoundError


def setNativeThreadEnvIfUnset(*, num_threads: int = 1, logger: Logger = None) -> dict:
    """Set native library thread limits only if environment variables are unset."""
    if num_threads <= 0:
        raise ValueError("Number of native threads must be greater than zero.")

    env_vars = (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
    )

    updates = {}
    for var_name in env_vars:
        if os.environ.get(var_name) is None:
            os.environ[var_name] = str(num_threads)
            updates[var_name] = str(num_threads)

    if (logger is not None) and updates:
        logger.info(msg="Set native thread limits for unset environment variables: {}"
                    .format(", ".join(f"{k}={v}" for k, v in updates.items())))

    return updates
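
The unset-only behaviour can be demonstrated in isolation. The sketch below restates the core loop without the logger (the name `set_native_thread_env_if_unset` is a stand-in, not the function from the diff) and uses `unittest.mock.patch.dict` so that the real process environment is restored afterwards.

```python
import os
from unittest import mock

ENV_VARS = ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS", "NUMEXPR_NUM_THREADS")


def set_native_thread_env_if_unset(*, num_threads: int = 1) -> dict:
    """Stand-in for the function above: set each variable only if it is currently unset."""
    if num_threads <= 0:
        raise ValueError("Number of native threads must be greater than zero.")
    updates = {}
    for var_name in ENV_VARS:
        if os.environ.get(var_name) is None:
            os.environ[var_name] = str(num_threads)
            updates[var_name] = str(num_threads)
    return updates


# Simulate an environment in which only OMP_NUM_THREADS was set by the user.
with mock.patch.dict(os.environ, {"OMP_NUM_THREADS": "2"}, clear=True):
    updates = set_native_thread_env_if_unset(num_threads=4)
    print(sorted(updates))                # the three variables that were unset
    print(os.environ["OMP_NUM_THREADS"])  # 2 (the user's value is preserved)
```

One caveat, stated as a general observation rather than something verified for SARvey: native libraries usually read these variables when they initialise, so the call tends to have the most effect when it happens before the numerical libraries are first loaded.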