From 1c54f36061cfe94c64b9b8e474ec0854bdf9837e Mon Sep 17 00:00:00 2001 From: Noah Kim Date: Tue, 7 Apr 2026 18:43:36 +0900 Subject: [PATCH 1/2] feat: add wasm32 single-thread fallback for threadpool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On wasm32 targets, OS thread spawning is not supported. This adds a single-threaded fallback that executes all operations sequentially on the calling thread when compiled for any wasm32 target (the code is gated on cfg(target_arch = "wasm32"), e.g. wasm32-unknown-unknown). Changes: - threadpool.rs: split into native (yastl Pool) and wasm (WasmScope, WasmPool) modules, conditionally compiled via cfg(target_arch) - multiexp.rs: use WasmScope on wasm32 instead of yastl::Scope - WasmPool provides scoped() for fft.rs compatibility - WasmScope provides execute() matching yastl::Scope API - Worker API unchanged — callers don't need modification This enables bellperson Groth16 proof generation in the browser via WebAssembly. --- ec-gpu-gen/src/multiexp.rs | 3 + ec-gpu-gen/src/threadpool.rs | 328 +++++++++++++++++++++++------------ 2 files changed, 222 insertions(+), 109 deletions(-) diff --git a/ec-gpu-gen/src/multiexp.rs b/ec-gpu-gen/src/multiexp.rs index 24780b8..f31c2ef 100644 --- a/ec-gpu-gen/src/multiexp.rs +++ b/ec-gpu-gen/src/multiexp.rs @@ -6,7 +6,10 @@ use ff::PrimeField; use group::{prime::PrimeCurveAffine, Group}; use log::{error, info}; use rust_gpu_tools::{program_closures, Device, Program}; +#[cfg(not(target_arch = "wasm32"))] use yastl::Scope; +#[cfg(target_arch = "wasm32")] +use crate::threadpool::WasmScope as Scope; use crate::{ error::{EcError, EcResult}, diff --git a/ec-gpu-gen/src/threadpool.rs b/ec-gpu-gen/src/threadpool.rs index 459f29e..31cc802 100644 --- a/ec-gpu-gen/src/threadpool.rs +++ b/ec-gpu-gen/src/threadpool.rs @@ -1,137 +1,246 @@ //! An interface for dealing with the kinds of parallel computations involved. 
-use std::env; - -use crossbeam_channel::{bounded, Receiver, SendError}; -use log::trace; -use once_cell::sync::Lazy; -use yastl::Pool; - -/// The number of threads the thread pool should use. -/// -/// By default it's equal to the number of CPUs, but it can be changed with the -/// `EC_GPU_NUM_THREADS` environment variable. -static NUM_THREADS: Lazy = Lazy::new(read_num_threads); - -/// The thread pool that is used for the computations. -/// -/// By default, it's size is equal to the number of CPUs. It can be set to a different value with -/// the `EC_GPU_NUM_THREADS` environment variable. -pub static THREAD_POOL: Lazy = Lazy::new(|| Pool::new(*NUM_THREADS)); - -/// Returns the number of threads. -/// -/// The number can be set with the `EC_GPU_NUM_THREADS` environment variable. If it isn't set, it -/// defaults to the number of CPUs the system has. -fn read_num_threads() -> usize { - env::var("EC_GPU_NUM_THREADS") - .ok() - .and_then(|num| num.parse::().ok()) - .unwrap_or_else(num_cpus::get) -} +//! +//! On native targets, uses a thread pool (yastl) for parallel computation. +//! On `wasm32` targets, executes everything on the main thread since WASM +//! does not support spawning OS threads. -/// A worker operates on a pool of threads. -#[derive(Clone, Default)] -pub struct Worker {} +// ════════════════════════════════════════════════════════════════ +// Native implementation (multi-threaded via yastl) +// ════════════════════════════════════════════════════════════════ -impl Worker { - /// Returns a new worker. - pub fn new() -> Worker { - Worker {} - } +#[cfg(not(target_arch = "wasm32"))] +mod native { + use std::env; - /// Returns binary logarithm (floored) of the number of threads. - /// - /// This means, the number of threads is `2^log_num_threads()`. 
- pub fn log_num_threads(&self) -> u32 { - log2_floor(*NUM_THREADS) + use crossbeam_channel::{bounded, Receiver, SendError}; + use log::trace; + use once_cell::sync::Lazy; + use yastl::Pool; + + static NUM_THREADS: Lazy = Lazy::new(read_num_threads); + + pub static THREAD_POOL: Lazy = Lazy::new(|| Pool::new(*NUM_THREADS)); + + pub(crate) fn read_num_threads() -> usize { // crate-visible: the sibling `tests` module calls `native::read_num_threads()` + env::var("EC_GPU_NUM_THREADS") + .ok() + .and_then(|num| num.parse::().ok()) + .unwrap_or_else(num_cpus::get) } - /// Executes a function in a thread and returns a [`Waiter`] immediately. - pub fn compute(&self, f: F) -> Waiter - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let (sender, receiver) = bounded(1); - - THREAD_POOL.spawn(move || { - let res = f(); - // Best effort. We run it in a separate thread, so the receiver might not exist - // anymore, but that's OK. It only means that we are not interested in the result. - // A message is logged though, as concurrency issues are hard to debug and this might - // help in such cases. 
- if let Err(SendError(_)) = sender.send(res) { - trace!("Cannot send result"); - } - }); + #[derive(Clone, Default)] + pub struct Worker {} + + impl Worker { + pub fn new() -> Worker { + Worker {} + } + + pub fn log_num_threads(&self) -> u32 { + log2_floor(*NUM_THREADS) + } + + pub fn compute(&self, f: F) -> Waiter + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (sender, receiver) = bounded(1); - Waiter { receiver } + THREAD_POOL.spawn(move || { + let res = f(); + if let Err(SendError(_)) = sender.send(res) { + trace!("Cannot send result"); + } + }); + + Waiter { receiver } + } + + pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R + where + F: FnOnce(&yastl::Scope<'a>, usize) -> R, + { + let chunk_size = if elements < *NUM_THREADS { + 1 + } else { + elements / *NUM_THREADS + }; + + THREAD_POOL.scoped(|scope| f(scope, chunk_size)) + } + + pub fn scoped<'a, F, R>(&self, f: F) -> R + where + F: FnOnce(&yastl::Scope<'a>) -> R, + { + let (sender, receiver) = bounded(1); + THREAD_POOL.scoped(|s| { + let res = f(s); + sender.send(res).unwrap(); + }); + + receiver.recv().unwrap() + } } - /// Executes a function and returns the result once it is finished. - /// - /// The function gets the [`yastl::Scope`] as well as the `chunk_size` as parameters. THe - /// `chunk_size` is number of elements per thread. - pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R - where - F: FnOnce(&yastl::Scope<'a>, usize) -> R, - { - let chunk_size = if elements < *NUM_THREADS { - 1 - } else { - elements / *NUM_THREADS - }; - - THREAD_POOL.scoped(|scope| f(scope, chunk_size)) + pub struct Waiter { + receiver: Receiver, } - /// Executes the passed in function, and returns the result once it is finished. 
- pub fn scoped<'a, F, R>(&self, f: F) -> R - where - F: FnOnce(&yastl::Scope<'a>) -> R, - { - let (sender, receiver) = bounded(1); - THREAD_POOL.scoped(|s| { - let res = f(s); - sender.send(res).unwrap(); - }); + impl Waiter { + pub fn wait(&self) -> T { + self.receiver.recv().unwrap() + } - receiver.recv().unwrap() + pub fn done(val: T) -> Self { + let (sender, receiver) = bounded(1); + sender.send(val).unwrap(); + Waiter { receiver } + } } -} -/// A future that is waiting for a result. -pub struct Waiter { - receiver: Receiver, + pub(crate) fn log2_floor(num: usize) -> u32 { + assert!(num > 0); + let mut pow = 0; + while (1 << (pow + 1)) <= num { + pow += 1; + } + pow + } } -impl Waiter { - /// Wait for the result. - pub fn wait(&self) -> T { - self.receiver.recv().unwrap() +// ════════════════════════════════════════════════════════════════ +// WASM implementation (single-threaded fallback) +// ════════════════════════════════════════════════════════════════ + +#[cfg(target_arch = "wasm32")] +mod wasm { + /// A worker that executes everything on the main thread. + /// + /// WASM does not support spawning OS threads (std::thread::spawn panics). + /// This fallback runs all computations sequentially on the calling thread. + /// Performance is lower than native multi-threaded execution but correctness + /// is preserved — all bellperson operations are safe to run single-threaded. + #[derive(Clone, Default)] + pub struct Worker {} + + impl Worker { + pub fn new() -> Worker { + Worker {} + } + + /// Returns 0 (single-threaded = 2^0 = 1 thread). + pub fn log_num_threads(&self) -> u32 { + 0 + } + + /// Executes the function immediately on the current thread. + pub fn compute(&self, f: F) -> Waiter + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let result = f(); + Waiter { value: Some(result) } + } + + /// Executes with chunk_size = elements (single thread processes everything). 
+ pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R + where + F: FnOnce(&WasmScope<'a>, usize) -> R, + { + let scope = WasmScope::new(); + f(&scope, elements.max(1)) // keep chunk_size >= 1 like the native path; a zero chunk size makes slice `chunks()` panic + } + + /// Executes the function immediately. + pub fn scoped<'a, F, R>(&self, f: F) -> R + where + F: FnOnce(&WasmScope<'a>) -> R, + { + let scope = WasmScope::new(); + f(&scope) + } } - /// One off sending. - pub fn done(val: T) -> Self { - let (sender, receiver) = bounded(1); - sender.send(val).unwrap(); + /// A minimal scope that executes spawned closures immediately (sequentially). + /// + /// Mimics the `yastl::Scope<'a>` API but runs everything on the current thread. + /// The lifetime parameter matches yastl::Scope's signature for API compatibility. + pub struct WasmScope<'a> { + _marker: std::marker::PhantomData<&'a ()>, + } - Waiter { receiver } + impl<'a> WasmScope<'a> { + fn new() -> Self { + WasmScope { _marker: std::marker::PhantomData } + } + + /// "Spawns" a closure by executing it immediately on the current thread. + pub fn execute(&self, f: F) + where + F: FnOnce() + Send, + { + f(); + } } -} -fn log2_floor(num: usize) -> u32 { - assert!(num > 0); + /// A future that already has its value (computed synchronously). + pub struct Waiter { + value: Option, + } - let mut pow = 0; + impl Waiter { + pub fn wait(&self) -> T + where + T: Clone, + { + self.value.as_ref().expect("Waiter::wait called on empty waiter").clone() + } - while (1 << (pow + 1)) <= num { - pow += 1; + pub fn done(val: T) -> Self { + Waiter { value: Some(val) } + } } - pow + /// A single-threaded "pool" for WASM — executes closures inline. + pub struct WasmPool; + + impl WasmPool { + pub fn scoped<'a, F, R>(&self, f: F) -> R + where + F: FnOnce(&WasmScope<'a>) -> R, + { + let scope = WasmScope::new(); + f(&scope) + } + } + + /// Global "thread pool" for WASM — single-threaded. 
+ pub static THREAD_POOL: WasmPool = WasmPool; + + pub(crate) fn log2_floor(num: usize) -> u32 { + assert!(num > 0); + let mut pow = 0; + while (1 << (pow + 1)) <= num { + pow += 1; + } + pow + } } +// ════════════════════════════════════════════════════════════════ +// Re-exports — callers use Worker, Waiter, THREAD_POOL without +// knowing which implementation is active. +// ════════════════════════════════════════════════════════════════ + +#[cfg(not(target_arch = "wasm32"))] +pub use native::*; + +#[cfg(target_arch = "wasm32")] +pub use wasm::*; + #[cfg(test)] mod tests { use super::*; @@ -147,12 +256,13 @@ mod tests { assert_eq!(log2_floor(8), 3); } + #[cfg(not(target_arch = "wasm32"))] #[test] fn test_read_num_threads() { let num_cpus = num_cpus::get(); temp_env::with_var("EC_GPU_NUM_THREADS", None::<&str>, || { assert_eq!( - read_num_threads(), + native::read_num_threads(), num_cpus, "By default the number of threads matches the number of CPUs." ); @@ -160,7 +270,7 @@ mod tests { temp_env::with_var("EC_GPU_NUM_THREADS", Some("1234"), || { assert_eq!( - read_num_threads(), + native::read_num_threads(), 1234, "Number of threads matches the environment variable." ); From 21fa71ae87a5bda839425bbd911a1a001fe98752 Mon Sep 17 00:00:00 2001 From: Noah Kim Date: Tue, 7 Apr 2026 18:46:05 +0900 Subject: [PATCH 2/2] fix: use RefCell for Waiter to avoid Clone requirement --- ec-gpu-gen/src/threadpool.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/ec-gpu-gen/src/threadpool.rs b/ec-gpu-gen/src/threadpool.rs index 31cc802..74f4515 100644 --- a/ec-gpu-gen/src/threadpool.rs +++ b/ec-gpu-gen/src/threadpool.rs @@ -142,7 +142,7 @@ mod wasm { R: Send + 'static, { let result = f(); - Waiter { value: Some(result) } + Waiter { value: std::cell::RefCell::new(Some(result)) } } /// Executes with chunk_size = elements (single thread processes everything). 
@@ -188,19 +188,18 @@ mod wasm { /// A future that already has its value (computed synchronously). pub struct Waiter { - value: Option, + value: std::cell::RefCell>, } impl Waiter { - pub fn wait(&self) -> T - where - T: Clone, - { - self.value.as_ref().expect("Waiter::wait called on empty waiter").clone() + /// Takes the value. Panics if called more than once. + pub fn wait(&self) -> T { + self.value.borrow_mut().take() + .expect("Waiter::wait called on already-consumed waiter") } pub fn done(val: T) -> Self { - Waiter { value: Some(val) } + Waiter { value: std::cell::RefCell::new(Some(val)) } } }