Skip to content

feat: add Gdal process pool#1192

Open
jdroenner wants to merge 31 commits into
mainfrom
jd_1
Open

feat: add Gdal process pool#1192
jdroenner wants to merge 31 commits into
mainfrom
jd_1

Conversation

@jdroenner

Copy link
Copy Markdown
Member
  • I added an entry to CHANGELOG.md if knowledge of this change could be valuable to users.

Here is a brief summary of what I did:

I created a shared pool of gdal workers using independent processes

Comment thread geoengine/justfile Outdated
run: common::_clear
cargo run
run mode="": common::_clear
cargo build --bin gdalsource-process {{ mode }} && cargo run {{ mode }}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cargo build --bin gdalsource-process {{ mode }} && cargo run {{ mode }}
cargo build --bin gdalsource-process {{ mode }}
cargo run {{ mode }}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+ TypedRasterConversion<GridShape2D>
+ TypedRasterConversion<GridShape3D>
+ SaturatingOps
+ bytemuck::Pod

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wir haben doch schon Primitive-Casts für alles implementiert.

@jdroenner jdroenner Jun 3, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ja aber ich nutze das Crate auch nicht dafür sondern um einen Vec<T> nach Vec<u8> zu casten, das über IPC zu versenden und dann aus den Bytes wieder den Vec<T> zu machen ohne großen overhead. Primitive-Cast von einzelnen Werten nach u8 verliert ja Daten. Das hier ist aber einfach die billigste (nicht) Serialisierung.

@coveralls

Copy link
Copy Markdown
Collaborator

Coverage Report for CI Build 26897733292

Coverage decreased (-0.2%) to 87.02%

Details

  • Coverage decreased (-0.2%) from the base build.
  • Patch coverage: 478 uncovered changes across 15 files (1479 of 1957 lines covered, 75.57%).
  • 18 coverage regressions across 7 files.

Uncovered Changes

Top 10 Files by Coverage Impact Changed Covered %
geoengine/operators/src/source/gdal_source/process.rs 329 186 56.53%
geoengine/operators/src/source/gdal_source/process_pool_7.rs 450 338 75.11%
geoengine/operators/src/bin/gdalsource_process_main.rs 79 0 0.0%
geoengine/operators/src/source/gdal_source/mod.rs 771 714 92.61%
geoengine/operators/src/engine/execution_context.rs 35 12 34.29%
geoengine/operators/src/source/multi_band_gdal_source/mod.rs 74 59 79.73%
geoengine/services/src/contexts/postgres.rs 23 8 34.78%
geoengine/operators/src/util/retry.rs 21 14 66.67%
geoengine/operators/src/source/gdal_source/error.rs 6 0 0.0%
geoengine/operators/src/engine/query.rs 12 7 58.33%
Total (28 files) 1957 1479 75.57%

Coverage Regressions

18 previously-covered lines in 7 files lost coverage.

File Lines Losing Coverage Coverage
geoengine/datatypes/src/raster/no_data_value_grid.rs 12 52.5%
geoengine/operators/src/adapters/raster_subquery/raster_subquery_reprojection.rs 1 95.6%
geoengine/operators/src/source/gdal_source/mod.rs 1 92.15%
geoengine/operators/src/util/input/float_with_nan_serde.rs 1 81.82%
geoengine/services/src/contexts/postgres.rs 1 96.48%
geoengine/services/src/datasets/external/aruna/mock_grpc_server.rs 1 83.12%
geoengine/services/src/server.rs 1 0.0%

Coverage Stats

Coverage Status
Relevant Lines: 135320
Covered Lines: 117755
Line Coverage: 87.02%
Coverage Strength: 478060.06 hits per line

💛 - Coveralls

@jdroenner jdroenner marked this pull request as ready for review June 3, 2026 20:33
Err(e)
|| {
// 1. Attempt to grab or open the dataset
let ds = match cache.get_or_open(dp) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is there still dataset handling outside the gdal process?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is this done inside the gdal process? maybe move all the code thats not executed inside the main geo engine process to a separate module

})
}

/*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented out code?

exe_path.pop();
}

let binary_name = if cfg!(windows) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cannot really build on windows anyway or can we? haven't been able to get proj running under windows.

}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum GdalDataByteVariant {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put "Grid" into the name?

}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum GdalDataVariant<T> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put "grid" into the name?

// [`IpcError`] does not implement the serde traits, and thus cant be send
// via the ipc_channels

// --- 1. STRONGLY TYPED SUB-CATEGORIES (Must be Serialize + Deserialize + Clone) ---

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe remove copilot comments

let child = Command::new(exe_path)
.arg(token)
.arg("debug") // FIXME: paste log level here!
.env_remove("LLVM_PROFILE_FILE")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe explain these removals. it's for llcov right? does it need to be removed in production too?

.map(|o| o.iter().map(String::as_str).collect::<Vec<_>>());

// reverts the thread local configs on drop
let thread_local_configs: Option<TemporaryGdalThreadLocalConfigOptions> = dataset_params

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do we need thead local configs for now? I guess the configs should all be global now because e.g. JP2000 driver has problems with thread local configs without setting num cpus to 1

}
}

/// Retrieves the cached dataset if the parameters match, otherwise opens a new dataset with the given parameters, updates the cache, and returns it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it really a cache if there is only one entry? isn't this just the current dataset?

fn is_hit(params: Option<&GdalDatasetParameters>, other: &GdalDatasetParameters) -> bool {
// TODO: we could optimize this by hashing the parameters and comparing the hash for a quick check before doing the full equality check, if it turns out to be a bottleneck.
if let Some(cached) = params {
cached.file_path == other.file_path

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the other properties like geotransform etc. don't they need to be the same as well?


impl WorkerAffinity {
/// Computes the affinity score, decaying the reward linearly over time.
/// `now` is passed down to bypass the expensive vDSO clock lookup bottleneck inside hot loops.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is vDSO?

let idle_duration = now.saturating_duration_since(self.timestamp).as_secs_f64();

if idle_duration > CACHE_TTL_SECS {
return 0.0; // Cache expired, connections likely closed or dead

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the TTL and the matching be really connected this way?
cache expiration and score computation could be be two separate things. Evict elements once they are expired, and then match against all not expired entries.

let decay = 1.0 - (idle_duration / CACHE_TTL_SECS);
let mut score = 0.0;

if self.dataset_hash == dataset_hash {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at some point we should check if the dataset is the same and not only the hash

/// Computes the affinity score, decaying the reward linearly over time.
/// `now` is passed down to bypass the expensive vDSO clock lookup bottleneck inside hot loops.
#[inline]
pub fn calculate_score(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add some tests for the score.

.spawn(move || {
Self::worker_companion_loop(id, &tx, &rx, &mut job_rx, &b_tx_worker);
})
.expect("Failed to spawn persistent GDAL companion thread");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the companion thread used for?

job_rx: &mut mpsc::UnboundedReceiver<WorkerJob>,
broker_tx: &mpsc::Sender<BrokerCommand>,
) {
while let Some(job) = job_rx.blocking_recv() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we not use tokio runtime and async recv instead of creating a new thread and blocking it?

let (job_tx, mut job_rx) = mpsc::unbounded_channel();
let b_tx_worker = b_tx.clone();

std::thread::Builder::new()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reuse code from new here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, the code for spawning and companion creation is duplicated here

let mut best_dataset_active_idx = 0;
let mut datasets_scanned = 0;

// Inlined, highly optimized single pass across eligible datasets

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe remove copilot comment 😅


/// High-affinity immediate dispatch cutoff threshold. If a candidate worker's affinity score
/// matches or exceeds this, we short-circuit the matrix evaluation instantly.
const IMMEDIATE_DISPATCH_THRESHOLD: f64 = 11000.0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea but the number seems pretty unintuitive for me at first. its just same dataset and band right? maybe write SCORE_DATASET_MATCH + SCORE_BAND_MATCH?

break;
}

// Perform atomic dispatch operation

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is this atomic?

}
}

state.try_dispatch();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should be done in a sepeate worker thread/green thread/tokio task

pub struct GdalProcessPool {
pub max_processes: u64,
pub global_active_worker: u64,
pub worker_per_dataset: u64,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max worker per dataset?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also what is a dataset: worker_per_gdal_dataset
one geo engine dataset can have multiple gdal datasets, e.g. one for each tile. we do not load balance for geo engine datasets but only for gdal datasets right?


#[derive(Debug, Deserialize)]
pub struct GdalProcessPool {
pub max_processes: u64,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker processes? it's a bit confusing what's the differences between worker and process?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants