From 18bf63c736e611bbeb3cb4fb43cd03824aa9566a Mon Sep 17 00:00:00 2001 From: piter Date: Thu, 19 Mar 2026 15:28:23 +0100 Subject: [PATCH 1/8] Add user documentation about parallel processing. --- docs/index.rst | 1 + docs/parallelization.rst | 66 ++++++++++++++++++++++++++++++++++++++++ docs/processing.rst | 2 ++ sarvey/processing.py | 4 --- 4 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 docs/parallelization.rst diff --git a/docs/index.rst b/docs/index.rst index cf0b562b..450d8a2c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -12,6 +12,7 @@ SARvey documentation usage preparation processing + parallelization visualization demo_datasets modules diff --git a/docs/parallelization.rst b/docs/parallelization.rst new file mode 100644 index 00000000..100618e0 --- /dev/null +++ b/docs/parallelization.rst @@ -0,0 +1,66 @@ +Parallel processing (user guide) +================================ + +This page explains how to tune parallel processing in SARvey from a user perspective. + +Main settings +------------- + +``general:num_cores`` + Controls process-level parallelism in the CPU-heavy processing stages. + +``general:num_patches`` + Splits the image stack into spatial patches to reduce memory pressure. + This is mostly a memory and I/O control, not a direct multiplier for CPU parallelism. + +How SARvey controls native thread libraries +------------------------------------------- + +At startup, SARvey applies an *unset-only* policy for native thread environment variables. +If a variable is not set, SARvey sets it to the value configured in ``general:num_cores``. If a variable is already set in your shell, +job scheduler, or container environment, SARvey keeps the existing value. + +Variables handled: + +* ``OMP_NUM_THREADS`` +* ``OPENBLAS_NUM_THREADS`` +* ``MKL_NUM_THREADS`` +* ``NUMEXPR_NUM_THREADS`` + +Why this matters +---------------- + +Some SARvey stages use Python multiprocessing while numerical libraries may also use internal +native threads. If both layers are aggressive, CPU oversubscription can happen and runtime may get +worse even with high CPU utilization. + +Recommended tuning workflow +--------------------------- + +1. Start conservative: + + * set ``general:num_cores`` to about half of physical cores + * keep ``general:num_patches = 1`` unless memory is insufficient + +2. Increase ``general:num_cores`` stepwise and compare runtime for the same step and dataset. + +3. Increase ``general:num_patches`` only when memory is a bottleneck. + +4. On shared systems, prefer explicit environment settings in your job script if your site requires them. + +Troubleshooting high CPU load +----------------------------- + +If CPU usage is too high: + +1. Lower ``general:num_cores`` first. +2. Keep ``general:num_patches`` low unless needed for memory. +3. Check whether your environment defines high values for BLAS/OpenMP variables. +4. Re-run one processing step and compare wall-clock time, not only CPU percent. + +Quick notes +----------- + +* Different processing steps scale differently with core count. +* Higher CPU usage does not always mean faster processing. +* Patching can increase I/O overhead; use it primarily for memory control. diff --git a/docs/processing.rst b/docs/processing.rst index fc98795c..2fd551f7 100644 --- a/docs/processing.rst +++ b/docs/processing.rst @@ -320,6 +320,8 @@ By setting **general:num_patches** the data is split into spatial patches and pr However, only use the patching option if the memory is not sufficient to process the data in one go. Using multiple patches will slow down the processing due to the overhead of loading and saving the data multiple times. +For practical user-side tuning guidance of **general:num_cores** and **general:num_patches**, see :doc:`parallel_processing`. + Processing steps for one-step unwrapping workflow ------------------------------------------------- diff --git a/sarvey/processing.py b/sarvey/processing.py index c29fdcfd..edb5e80e 100644 --- a/sarvey/processing.py +++ b/sarvey/processing.py @@ -504,7 +504,6 @@ def runUnwrappingTimeAndSpace(self): phase=unw_phase, num_points=point_obj.num_points, ifg_net_obj=point_obj.ifg_net_obj, - num_cores=1, # self.config.general.num_cores, ref_idx=0, logger=self.logger ) @@ -575,7 +574,6 @@ def runUnwrappingSpace(self): # for sbas the ifg network needs to be inverted to get the phase time series phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points, ifg_net_obj=point_obj.ifg_net_obj, - num_cores=1, # self.config.general.num_cores, ref_idx=0, logger=self.logger) @@ -1071,7 +1069,6 @@ def runDensificationTimeAndSpace(self): phase=unw_phase, num_points=point2_obj.num_points, ifg_net_obj=point2_obj.ifg_net_obj, - num_cores=1, # self.config.general.num_cores, ref_idx=0, logger=self.logger) @@ -1131,7 +1128,6 @@ def runDensificationSpace(self): phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points, ifg_net_obj=point_obj.ifg_net_obj, - num_cores=1, # self.config.general.num_cores, ref_idx=0, logger=self.logger) From ee53f928b45b925b0bab3efb2443f2681e66075e Mon Sep 17 00:00:00 2001 From: piter Date: Thu, 19 Mar 2026 15:31:07 +0100 Subject: [PATCH 2/8] Clamp the number of cores to avoid requesting more threads than needed. --- sarvey/coherence.py | 1 + sarvey/densification.py | 3 +-- sarvey/filtering.py | 2 +- sarvey/unwrapping.py | 6 ++---- sarvey/utils.py | 20 +++++++++++++++++++- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/sarvey/coherence.py b/sarvey/coherence.py index 8a9e2925..46708c1c 100644 --- a/sarvey/coherence.py +++ b/sarvey/coherence.py @@ -110,6 +110,7 @@ def computeIfgsAndTemporalCoherence(*, path_temp_coh: str, path_ifgs: str, path_ avg_neighbours[:, :, i] = convolve2d(in1=ifgs[:, :, i], in2=filter_kernel, mode='same', boundary="symm") else: + num_cores = min(num_cores, num_ifgs) args = [( idx, ifgs[:, :, idx], diff --git a/sarvey/densification.py b/sarvey/densification.py index 7bfed6ca..ce523819 100644 --- a/sarvey/densification.py +++ b/sarvey/densification.py @@ -224,10 +224,9 @@ def densifyNetwork(*, point1_obj: Points, vel_p1: np.ndarray, demerr_p1: np.ndar velocity_bound, demerr_bound, num_samples) idx_range, demerr_p2, vel_p2, gamma_p2 = launchDensifyNetworkConsistencyCheck(args) else: + num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=point2_obj.num_points) with multiprocessing.Pool(num_cores, initializer=densificationInitializer, initargs=init_args) as pool: logger.info(msg="start parallel processing with {} cores.".format(num_cores)) - num_cores = point2_obj.num_points if num_cores > point2_obj.num_points else num_cores - # avoids having less samples than cores idx = ut.splitDatasetForParallelProcessing(num_samples=point2_obj.num_points, num_cores=num_cores) args = [( idx_range, diff --git a/sarvey/filtering.py b/sarvey/filtering.py index a5f06161..324cc6fd 100644 --- a/sarvey/filtering.py +++ b/sarvey/filtering.py @@ -218,7 +218,7 @@ def estimateAtmosphericPhaseScreen(*, residuals: np.ndarray, coord_utm1: np.ndar aps1 = np.zeros((num_points1, num_time), dtype=np.float32) aps2 = np.zeros((num_points2, num_time), dtype=np.float32) - num_cores = num_time if num_cores > num_time else num_cores # avoids having more samples than cores + num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=num_time) idx = ut.splitDatasetForParallelProcessing(num_samples=num_time, num_cores=num_cores) args = [( diff --git a/sarvey/unwrapping.py b/sarvey/unwrapping.py index f89222e9..1de50104 100644 --- a/sarvey/unwrapping.py +++ b/sarvey/unwrapping.py @@ -350,8 +350,7 @@ def temporalUnwrapping(*, ifg_net_obj: IfgNetwork, net_obj: Network, wavelength: vel = np.zeros((net_obj.num_arcs, 1), dtype=np.float32) gamma = np.zeros((net_obj.num_arcs, 1), dtype=np.float32) - num_cores = net_obj.num_arcs if num_cores > net_obj.num_arcs else num_cores # avoids having more samples - # then cores + num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=net_obj.num_arcs) idx = ut.splitDatasetForParallelProcessing(num_samples=net_obj.num_arcs, num_cores=num_cores) args = [( @@ -476,8 +475,7 @@ def spatialUnwrapping(*, num_ifgs: int, num_points: int, phase: np.ndarray, edge logger.info(msg="start parallel processing with {} cores.".format(num_cores)) unw_phase = np.zeros((num_points, num_ifgs), dtype=np.float32) - num_cores = num_ifgs if num_cores > num_ifgs else num_cores - # avoids having more samples than cores + num_cores = ut.clampNumCores(requested_cores=num_cores, num_samples=num_ifgs) idx = ut.splitDatasetForParallelProcessing(num_samples=num_ifgs, num_cores=num_cores) args = [( diff --git a/sarvey/utils.py b/sarvey/utils.py index b5c96c4d..a395d236 100644 --- a/sarvey/utils.py +++ b/sarvey/utils.py @@ -53,7 +53,24 @@ def convertBboxToBlock(*, bbox: tuple): return block -def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetwork, num_cores: int, ref_idx: int, +def clampNumCores(*, requested_cores: int, num_samples: int) -> int: + """Clamp the requested number of worker processes to a safe usable range.""" + if requested_cores <= 0: + raise ValueError("Number of cores must be greater than zero.") + if num_samples <= 0: + raise ValueError("Number of samples must be greater than zero.") + return min(requested_cores, num_samples) + + +def scaleNumCores(*, requested_cores: int, num_samples: int, scale: float = 1.0) -> int: + """Scale and clamp worker count while guaranteeing at least one worker.""" + if scale <= 0: + raise ValueError("Scale must be greater than zero.") + scaled_cores = max(1, int(np.floor(requested_cores * scale))) + return clampNumCores(requested_cores=scaled_cores, num_samples=num_samples) + + +def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetwork, ref_idx: int, logger: Logger): """Wrap the ifg network inversion running in parallel. @@ -527,6 +544,7 @@ def splitDatasetForParallelProcessing(*, num_samples: int, num_cores: int): idx: list list of sample ranges for each core """ + num_cores = clampNumCores(requested_cores=num_cores, num_samples=num_samples) rest = np.mod(num_samples, num_cores) avg_num_samples_per_core = int((num_samples - rest) / num_cores) num_samples_per_core = np.zeros((num_cores,), dtype=np.int64) From a553af7cf6c380e1f6b25772457b0f35775e1d25 Mon Sep 17 00:00:00 2001 From: piter Date: Thu, 19 Mar 2026 15:33:37 +0100 Subject: [PATCH 3/8] Set native-thread defaults if it was not set by the user. --- sarvey/sarvey_mti.py | 14 +++++++++++++- sarvey/utils.py | 26 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/sarvey/sarvey_mti.py b/sarvey/sarvey_mti.py index be45ff9f..66b6af41 100755 --- a/sarvey/sarvey_mti.py +++ b/sarvey/sarvey_mti.py @@ -46,7 +46,7 @@ from sarvey.console import printStep, printCurrentConfig, showLogoSARvey from sarvey.processing import Processing from sarvey.config import Config, loadConfiguration -from sarvey.utils import checkIfRequiredFilesExist +from sarvey.utils import checkIfRequiredFilesExist, setNativeThreadEnvIfUnset try: matplotlib.use('QtAgg') @@ -321,6 +321,18 @@ def main(iargs=None): config = loadConfiguration(path=config_file_path) + # Apply unset-only native-thread defaults based on user configuration to reduce nested parallelism. + native_thread_updates = setNativeThreadEnvIfUnset(num_threads=config.general.num_cores) + if native_thread_updates: + logger.info(msg=( + "Native thread limits set by SARvey (unset-only, using general:num_cores={0}): {1}" + ).format(config.general.num_cores, + ", ".join(f"{k}={v}" for k, v in sorted(native_thread_updates.items())))) + else: + logger.info(msg=( + "Native thread limits unchanged (unset-only): existing environment values preserved." + )) + current_datetime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) log_filename = f"sarvey_{current_datetime}.log" logpath = config.general.logfile_path diff --git a/sarvey/utils.py b/sarvey/utils.py index a395d236..e95114df 100644 --- a/sarvey/utils.py +++ b/sarvey/utils.py @@ -29,6 +29,7 @@ """Utils module for SARvey.""" import multiprocessing +import os import time from os.path import exists, join @@ -885,3 +886,28 @@ def checkIfRequiredFilesExist(*, path_to_files: str, required_files: list, logge if not exists(join(path_to_files, file)): logger.error(f"File from previous step(s) is missing: {file}.") raise FileNotFoundError + + +def setNativeThreadEnvIfUnset(*, num_threads: int = 1, logger: Logger = None) -> dict: + """Set native library thread limits only if environment variables are unset.""" + if num_threads <= 0: + raise ValueError("Number of native threads must be greater than zero.") + + env_vars = ( + "OMP_NUM_THREADS", + "OPENBLAS_NUM_THREADS", + "MKL_NUM_THREADS", + "NUMEXPR_NUM_THREADS", + ) + + updates = {} + for var_name in env_vars: + if os.environ.get(var_name) is None: + os.environ[var_name] = str(num_threads) + updates[var_name] = str(num_threads) + + if (logger is not None) and updates: + logger.info(msg="Set native thread limits for unset environment variables: {}" + .format(", ".join(f"{k}={v}" for k, v in updates.items()))) + + return updates From 7d45f21dc9b516813728b37e5c8aafd874c44840 Mon Sep 17 00:00:00 2001 From: piter Date: Thu, 19 Mar 2026 15:35:16 +0100 Subject: [PATCH 4/8] Refactor invertIfgNetwork to use remove multiprocessing and use only native threading of scipy. --- sarvey/utils.py | 75 ++++--------------------------------------------- 1 file changed, 6 insertions(+), 69 deletions(-) diff --git a/sarvey/utils.py b/sarvey/utils.py index e95114df..5e3452e8 100644 --- a/sarvey/utils.py +++ b/sarvey/utils.py @@ -83,8 +83,6 @@ def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetw number of points. ifg_net_obj: IfgNetwork instance of class IfgNetwork. - num_cores: int - number of cores to use for multiprocessing. ref_idx: int index of temporal reference date for interferogram network inversion. logger: Logger @@ -103,74 +101,10 @@ def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetw start_time = time.time() design_mat = ifg_net_obj.getDesignMatrix() - if num_cores == 1: - args = (np.arange(num_points), num_points, phase, design_mat, ifg_net_obj.num_images, ref_idx) - idx_range, phase_ts = launchInvertIfgNetwork(parameters=args) - else: - # use only 10 percent of the cores, because scipy.sparse.linalg.lsqr is already running in parallel - num_cores = int(np.floor(num_cores / 10)) - logger.info(msg="start parallel processing with {} cores.".format(num_cores)) - - phase_ts = np.zeros((num_points, ifg_net_obj.num_images), dtype=np.float32) - - num_cores = num_points if num_cores > num_points else num_cores # avoids having more samples than cores - idx = splitDatasetForParallelProcessing(num_samples=num_points, num_cores=num_cores) - args = [( - idx_range, - idx_range.shape[0], - phase[idx_range, :], - design_mat, - ifg_net_obj.num_images, - ref_idx) for idx_range in idx] - - with multiprocessing.Pool(processes=num_cores) as pool: - results = pool.map(func=launchInvertIfgNetwork, iterable=args) - - # retrieve results - for i, phase_i in results: - phase_ts[i, :] = phase_i - - m, s = divmod(time.time() - start_time, 60) - logger.debug(msg='time used: {:02.0f} mins {:02.1f} secs.'.format(m, s)) - return phase_ts - - -def launchInvertIfgNetwork(parameters: tuple): - """Launch the inversion of the interferogram network in parallel. - - Parameters - ---------- - parameters: tuple - parameters for inversion - - Tuple contains: - idx_range: np.ndarray - range of point indices to be processed - num_points: int - number of points - phase: np.ndarray - interferometric phases of the points - design_mat: np.ndarray - design matrix - num_images: int - number of images - ref_idx: int - index of temporal reference date for interferogram network inversion - - Returns - ------- - idx_range: np.ndarray - range of indices of the points processed - phase_ts: np.ndarray - inverted phase time series - """ - # Unpack the parameters - (idx_range, num_points, phase, design_mat, num_images, ref_idx) = parameters - design_mat = np.delete(arr=design_mat, obj=ref_idx, axis=1) # remove reference date - idx = np.ones((num_images,), dtype=np.bool_) + idx = np.ones((ifg_net_obj.num_images,), dtype=np.bool_) idx[ref_idx] = False - phase_ts = np.zeros((num_points, num_images), dtype=np.float32) + phase_ts = np.zeros((num_points, ifg_net_obj.num_images), dtype=np.float32) prog_bar = ptime.progressBar(maxValue=num_points) for i in range(num_points): @@ -178,7 +112,10 @@ def launchInvertIfgNetwork(parameters: tuple): prog_bar.update(value=i + 1, every=np.ceil(num_points / 100), suffix='{}/{} points'.format(i + 1, num_points)) - return idx_range, phase_ts + m, s = divmod(time.time() - start_time, 60) + logger.debug(msg='time used: {:02.0f} mins {:02.1f} secs.'.format(m, s)) + + return phase_ts def predictPhase(*, obj: [NetworkParameter, Points], vel: np.ndarray = None, demerr: np.ndarray = None, From 1d46b82fba9902f70b29f285cfebc8f1498f8352 Mon Sep 17 00:00:00 2001 From: piter Date: Thu, 19 Mar 2026 15:35:28 +0100 Subject: [PATCH 5/8] Add tests for parallelization. --- tests/test_parallelization.py | 118 ++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 tests/test_parallelization.py diff --git a/tests/test_parallelization.py b/tests/test_parallelization.py new file mode 100644 index 00000000..75ea64ed --- /dev/null +++ b/tests/test_parallelization.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python + +# SARvey - A multitemporal InSAR time series tool for the derivation of displacements. +# +# Copyright (C) 2021-2026 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de) +# +# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context +# of the SAR4Infra project with funds of the German Federal Ministry for Digital and +# Transport and contributions from Landesamt fuer Vermessung und Geoinformation +# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# Important: This package uses PyMaxFlow. The core of PyMaxflows library is the C++ +# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you +# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes. +# This requirement extends to SARvey. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program. If not, see . + +"""Tests for worker-count utilities used by SARvey parallel processing.""" + +import os +import unittest +import numpy as np + +from sarvey.utils import clampNumCores, scaleNumCores, splitDatasetForParallelProcessing, setNativeThreadEnvIfUnset + + +class TestParallelizationUtils(unittest.TestCase): + """Test safety guards for parallel worker allocation.""" + + _native_thread_vars = ( + "OMP_NUM_THREADS", + "OPENBLAS_NUM_THREADS", + "MKL_NUM_THREADS", + "NUMEXPR_NUM_THREADS", + ) + + def setUp(self): + self._env_backup = {k: os.environ.get(k) for k in self._native_thread_vars} + + def tearDown(self): + for key, val in self._env_backup.items(): + if val is None: + os.environ.pop(key, None) + else: + os.environ[key] = val + + def testClampNumCores_caps_to_num_samples(self): + """Clamp worker count to available samples.""" + self.assertEqual(clampNumCores(requested_cores=16, num_samples=3), 3) + self.assertEqual(clampNumCores(requested_cores=4, num_samples=10), 4) + + def testClampNumCores_rejects_invalid_values(self): + """Reject non-positive workers or samples.""" + with self.assertRaises(ValueError): + clampNumCores(requested_cores=0, num_samples=5) + with self.assertRaises(ValueError): + clampNumCores(requested_cores=2, num_samples=0) + + def testScaleNumCores_keeps_at_least_one_worker(self): + """Avoid the old floor-to-zero behavior for requested cores below ten.""" + self.assertEqual(scaleNumCores(requested_cores=2, num_samples=50, scale=0.1), 1) + self.assertEqual(scaleNumCores(requested_cores=9, num_samples=50, scale=0.1), 1) + self.assertEqual(scaleNumCores(requested_cores=10, num_samples=50, scale=0.1), 1) + self.assertEqual(scaleNumCores(requested_cores=20, num_samples=50, scale=0.1), 2) + + def testSplitDatasetForParallelProcessing_clamps_num_cores(self): + """Split logic should automatically cap workers to sample count.""" + idx = splitDatasetForParallelProcessing(num_samples=3, num_cores=8) + self.assertEqual(len(idx), 3) + np.testing.assert_array_equal(idx[0], np.array([0])) + np.testing.assert_array_equal(idx[1], np.array([1])) + np.testing.assert_array_equal(idx[2], np.array([2])) + + def testSplitDatasetForParallelProcessing_balances_ranges(self): + """Chunks should remain balanced when sample count is not divisible by workers.""" + idx = splitDatasetForParallelProcessing(num_samples=10, num_cores=3) + lengths = [cur.shape[0] for cur in idx] + self.assertEqual(lengths, [4, 3, 3]) + np.testing.assert_array_equal(np.concatenate(idx), np.arange(10)) + + def testSetNativeThreadEnvIfUnset_sets_only_missing(self): + """Unset variables are initialized while pre-set variables are kept.""" + os.environ.pop("OMP_NUM_THREADS", None) + os.environ["OPENBLAS_NUM_THREADS"] = "7" + os.environ.pop("MKL_NUM_THREADS", None) + os.environ.pop("NUMEXPR_NUM_THREADS", None) + + updates = setNativeThreadEnvIfUnset(num_threads=1) + + self.assertEqual(os.environ["OMP_NUM_THREADS"], "1") + self.assertEqual(os.environ["OPENBLAS_NUM_THREADS"], "7") + self.assertEqual(os.environ["MKL_NUM_THREADS"], "1") + self.assertEqual(os.environ["NUMEXPR_NUM_THREADS"], "1") + self.assertEqual(updates["OMP_NUM_THREADS"], "1") + self.assertEqual(updates["MKL_NUM_THREADS"], "1") + self.assertEqual(updates["NUMEXPR_NUM_THREADS"], "1") + self.assertTrue("OPENBLAS_NUM_THREADS" not in updates) + + def testSetNativeThreadEnvIfUnset_rejects_invalid_threads(self): + """Native thread limit must be positive.""" + with self.assertRaises(ValueError): + setNativeThreadEnvIfUnset(num_threads=0) + + +if __name__ == '__main__': + unittest.main() From c53a8704987ff753630a3109d51cdaee4412f026 Mon Sep 17 00:00:00 2001 From: piter Date: Thu, 19 Mar 2026 15:42:28 +0100 Subject: [PATCH 6/8] Remove interactive matplotlib and error message. --- sarvey/sarvey_mti.py | 6 ------ sarvey/utils.py | 1 - 2 files changed, 7 deletions(-) diff --git a/sarvey/sarvey_mti.py b/sarvey/sarvey_mti.py index 66b6af41..6f0391a8 100755 --- a/sarvey/sarvey_mti.py +++ b/sarvey/sarvey_mti.py @@ -35,7 +35,6 @@ from os.path import join import json5 -import matplotlib import sys import logging import time @@ -48,11 +47,6 @@ from sarvey.config import Config, loadConfiguration from sarvey.utils import checkIfRequiredFilesExist, setNativeThreadEnvIfUnset -try: - matplotlib.use('QtAgg') -except ImportError as e: - print(e) - EXAMPLE = """Example: sarvey -f config.json -g # create default config file with the name config.json and exit sarvey -f config.json 0 0 # run step 0 (preparation) diff --git a/sarvey/utils.py b/sarvey/utils.py index 5e3452e8..5bebae17 100644 --- a/sarvey/utils.py +++ b/sarvey/utils.py @@ -28,7 +28,6 @@ # with this program. If not, see . """Utils module for SARvey.""" -import multiprocessing import os import time from os.path import exists, join From 10e69797260ee7aa9491453a8ea20ba07ee2798b Mon Sep 17 00:00:00 2001 From: piter Date: Thu, 19 Mar 2026 16:03:50 +0100 Subject: [PATCH 7/8] Update history. --- HISTORY.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/HISTORY.rst b/HISTORY.rst index 467e5c35..7b7c5ce2 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,6 +6,9 @@ History Future minor version (release soon) ----------------------------------- +* Set native thread environment variable to number of cores from config if not set in the environment. +* Improve parallelization to reduce risk of computational overhead. + 1.3.0 (2026-02-23) ------------------ From 69a3de85195ec2b0604613c21a0b4334f8bc6e29 Mon Sep 17 00:00:00 2001 From: piter Date: Fri, 20 Mar 2026 15:50:31 +0100 Subject: [PATCH 8/8] Revert parallelization of invertIfgNetwork with multiprocessing. Set native thread default to 1. Fix bug in densification. --- sarvey/densification.py | 2 +- sarvey/processing.py | 4 +++ sarvey/sarvey_mti.py | 2 +- sarvey/utils.py | 76 +++++++++++++++++++++++++++++++++++++---- 4 files changed, 75 insertions(+), 9 deletions(-) diff --git a/sarvey/densification.py b/sarvey/densification.py index ce523819..ff85e5ef 100644 --- a/sarvey/densification.py +++ b/sarvey/densification.py @@ -141,7 +141,7 @@ def launchDensifyNetworkConsistencyCheck(args: tuple): design_mat=design_mat ) - prog_bar.update(counter + 1, every=np.int16(200), + prog_bar.update(counter + 1, every=200, suffix='{}/{} points'.format(counter + 1, num_points)) counter += 1 diff --git a/sarvey/processing.py b/sarvey/processing.py index edb5e80e..2884da94 100644 --- a/sarvey/processing.py +++ b/sarvey/processing.py @@ -505,6 +505,7 @@ def runUnwrappingTimeAndSpace(self): num_points=point_obj.num_points, ifg_net_obj=point_obj.ifg_net_obj, ref_idx=0, + num_cores=self.config.general.num_cores, logger=self.logger ) point_obj = Points(file_path=join(self.path, "p1_ts.h5"), logger=self.logger) @@ -575,6 +576,7 @@ def runUnwrappingSpace(self): phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points, ifg_net_obj=point_obj.ifg_net_obj, ref_idx=0, + num_cores=self.config.general.num_cores, logger=self.logger) point_obj.phase = phase_ts @@ -1069,6 +1071,7 @@ def runDensificationTimeAndSpace(self): phase=unw_phase, num_points=point2_obj.num_points, ifg_net_obj=point2_obj.ifg_net_obj, + num_cores=self.config.general.num_cores, ref_idx=0, logger=self.logger) @@ -1129,6 +1132,7 @@ def runDensificationSpace(self): phase_ts = ut.invertIfgNetwork(phase=unw_phase, num_points=point_obj.num_points, ifg_net_obj=point_obj.ifg_net_obj, ref_idx=0, + num_cores=self.config.general.num_cores, logger=self.logger) point_obj.phase = phase_ts diff --git a/sarvey/sarvey_mti.py b/sarvey/sarvey_mti.py index 6f0391a8..e366ccad 100755 --- a/sarvey/sarvey_mti.py +++ b/sarvey/sarvey_mti.py @@ -316,7 +316,7 @@ def main(iargs=None): config = loadConfiguration(path=config_file_path) # Apply unset-only native-thread defaults based on user configuration to reduce nested parallelism. - native_thread_updates = setNativeThreadEnvIfUnset(num_threads=config.general.num_cores) + native_thread_updates = setNativeThreadEnvIfUnset(num_threads=1) if native_thread_updates: logger.info(msg=( "Native thread limits set by SARvey (unset-only, using general:num_cores={0}): {1}" diff --git a/sarvey/utils.py b/sarvey/utils.py index 5bebae17..af9a78a7 100644 --- a/sarvey/utils.py +++ b/sarvey/utils.py @@ -31,6 +31,7 @@ import os import time from os.path import exists, join +import multiprocessing import numpy as np from scipy.sparse.linalg import lsqr @@ -70,7 +71,7 @@ def scaleNumCores(*, requested_cores: int, num_samples: int, scale: float = 1.0) return clampNumCores(requested_cores=scaled_cores, num_samples=num_samples) -def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetwork, ref_idx: int, +def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetwork, num_cores: int, ref_idx: int, logger: Logger): """Wrap the ifg network inversion running in parallel. @@ -82,6 +83,8 @@ def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetw number of points. ifg_net_obj: IfgNetwork instance of class IfgNetwork. + num_cores: int + number of cores to use for multiprocessing. ref_idx: int index of temporal reference date for interferogram network inversion. logger: Logger @@ -100,10 +103,72 @@ def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetw start_time = time.time() design_mat = ifg_net_obj.getDesignMatrix() + if num_cores == 1: + args = (np.arange(num_points), num_points, phase, design_mat, ifg_net_obj.num_images, ref_idx) + idx_range, phase_ts = launchInvertIfgNetwork(parameters=args) + else: + num_cores = clampNumCores(requested_cores=num_cores, num_samples=num_points) + logger.info(msg="start parallel processing with {} cores.".format(num_cores)) + + phase_ts = np.zeros((num_points, ifg_net_obj.num_images), dtype=np.float32) + + idx = splitDatasetForParallelProcessing(num_samples=num_points, num_cores=num_cores) + args = [( + idx_range, + idx_range.shape[0], + phase[idx_range, :], + design_mat, + ifg_net_obj.num_images, + ref_idx) for idx_range in idx] + + with multiprocessing.Pool(processes=num_cores) as pool: + results = pool.map(func=launchInvertIfgNetwork, iterable=args) + + # retrieve results + for i, phase_i in results: + phase_ts[i, :] = phase_i + + m, s = divmod(time.time() - start_time, 60) + logger.debug(msg='time used: {:02.0f} mins {:02.1f} secs.'.format(m, s)) + return phase_ts + + +def launchInvertIfgNetwork(parameters: tuple): + """Launch the inversion of the interferogram network in parallel. + + Parameters + ---------- + parameters: tuple + parameters for inversion + + Tuple contains: + idx_range: np.ndarray + range of point indices to be processed + num_points: int + number of points + phase: np.ndarray + interferometric phases of the points + design_mat: np.ndarray + design matrix + num_images: int + number of images + ref_idx: int + index of temporal reference date for interferogram network inversion + + Returns + ------- + idx_range: np.ndarray + range of indices of the points processed + phase_ts: np.ndarray + inverted phase time series + """ + # Unpack the parameters + (idx_range, num_points, phase, design_mat, num_images, ref_idx) = parameters + design_mat = np.delete(arr=design_mat, obj=ref_idx, axis=1) # remove reference date - idx = np.ones((ifg_net_obj.num_images,), dtype=np.bool_) + idx = np.ones((num_images,), dtype=np.bool_) idx[ref_idx] = False - phase_ts = np.zeros((num_points, ifg_net_obj.num_images), dtype=np.float32) + phase_ts = np.zeros((num_points, num_images), dtype=np.float32) prog_bar = ptime.progressBar(maxValue=num_points) for i in range(num_points): @@ -111,10 +176,7 @@ def invertIfgNetwork(*, phase: np.ndarray, num_points: int, ifg_net_obj: IfgNetw prog_bar.update(value=i + 1, every=np.ceil(num_points / 100), suffix='{}/{} points'.format(i + 1, num_points)) - m, s = divmod(time.time() - start_time, 60) - logger.debug(msg='time used: {:02.0f} mins {:02.1f} secs.'.format(m, s)) - - return phase_ts + return idx_range, phase_ts def predictPhase(*, obj: [NetworkParameter, Points], vel: np.ndarray = None, demerr: np.ndarray = None,