Skip to content

Commit db740f7

Browse files
author
Grok Compression
committed
scheduling: in single threaded mode, use one inline executor per calling thread
1 parent 3eb845e commit db740f7

4 files changed

Lines changed: 96 additions & 1 deletion

File tree

src/lib/core/codestream/decompress/CodeStreamDecompress.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <chrono>
1919
#include <cmath>
2020
#include <functional>
21+
#include <optional>
2122

2223
#include "TFSingleton.h"
2324
#include "grk_fseek.h"
@@ -132,6 +133,12 @@ CodeStreamDecompress::CodeStreamDecompress(IStream* stream)
132133
tileMarkerParsers_.resize(TFSingleton::num_threads());
133134
std::generate(tileMarkerParsers_.begin(), tileMarkerParsers_.end(),
134135
[]() { return std::make_unique<MarkerParser>(); });
136+
137+
// Single-threaded mode: give this codec its own inline (0-worker) executor so
138+
// concurrent decodes run on their own thread instead of contending on the
139+
// global singleton. Multi-threaded mode keeps using the shared pool.
140+
if(TFSingleton::isSingleThreaded())
141+
localExecutor_ = std::make_unique<tf::Executor>(0);
135142
}
136143
void CodeStreamDecompress::init(grk_decompress_parameters* parameters)
137144
{
@@ -166,6 +173,12 @@ void CodeStreamDecompress::setBandCallback(grk_io_band_callback callback, void*
166173

167174
bool CodeStreamDecompress::decompress(grk_plugin_tile* tile)
168175
{
176+
// Route all scheduling/wavelet work onto this codec's own executor while
177+
// decoding (single-threaded mode only; no-op when localExecutor_ is null).
178+
std::optional<TFSingleton::ScopedExecutor> scopedExec;
179+
if(localExecutor_)
180+
scopedExec.emplace(localExecutor_.get(), 1);
181+
169182
current_plugin_tile = tile;
170183

171184
// GPU plugin T2-only fast path: skip heavy infrastructure
@@ -928,6 +941,12 @@ bool CodeStreamDecompress::sequentialParseAndSchedule(bool multiTile)
928941

929942
bool CodeStreamDecompress::decompressTile(uint16_t tileIndex)
930943
{
944+
// Route all scheduling/wavelet work onto this codec's own executor while
945+
// decoding (single-threaded mode only; no-op when localExecutor_ is null).
946+
std::optional<TFSingleton::ScopedExecutor> scopedExec;
947+
if(localExecutor_)
948+
scopedExec.emplace(localExecutor_.get(), 1);
949+
931950
multiTileComposite_->postReadHeader(&cp_);
932951

933952
// 1. sanity check on tile index

src/lib/core/codestream/decompress/CodeStreamDecompress.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ class CodeStreamDecompress final : public CodeStream, public IDecompressor
5353
decompressConsumer_.join();
5454
if(decompressWorker_.joinable())
5555
decompressWorker_.join();
56-
TFSingleton::get().wait_for_all();
56+
// Wait on this codec's own executor when it owns one (single-threaded mode);
57+
// the decode scope has already exited so the thread-local override is gone.
58+
if(localExecutor_)
59+
localExecutor_->wait_for_all();
60+
else
61+
TFSingleton::get().wait_for_all();
5762
}
5863

5964
void init(grk_decompress_parameters* param) override;
@@ -590,6 +595,16 @@ class CodeStreamDecompress final : public CodeStream, public IDecompressor
590595

591596
std::vector<std::unique_ptr<MarkerParser>> tileMarkerParsers_;
592597

598+
/**
599+
* @brief Per-codec inline executor, owned only in single-threaded mode.
600+
*
601+
* When set, the decode runs on this executor (activated via
602+
* TFSingleton::ScopedExecutor for the duration of decompress/decompressTile)
603+
* instead of the process-global singleton, so concurrent single-threaded
604+
* decodes are independent and lock-free. Null in multi-threaded mode.
605+
*/
606+
std::unique_ptr<tf::Executor> localExecutor_;
607+
593608
/**
594609
* @brief global HT flag
595610
*

src/lib/core/scheduling/CodecScheduler.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ struct ITileProcessor;
6161
std::unique_ptr<tf::Executor> TFSingleton::instance_ = nullptr;
6262
std::mutex TFSingleton::mutex_;
6363
size_t TFSingleton::numThreads_;
64+
thread_local tf::Executor* TFSingleton::tlsExec_ = nullptr;
65+
thread_local size_t TFSingleton::tlsNumThreads_ = 0;
66+
thread_local bool TFSingleton::tlsActive_ = false;
6467

6568
namespace grk
6669
{

src/lib/core/scheduling/TFSingleton.h

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,15 @@ class TFSingleton
5252
/**
5353
* @brief Gets current instance of the Singleton (creates with full hardware concurrency if null)
5454
* @return Taskflow Executor
55+
*
56+
* When a per-codec executor is active on this thread (see ScopedExecutor)
57+
* it is returned lock-free, so concurrent single-threaded decodes do not
58+
* contend on mutex_. Otherwise the process-global executor is used.
5559
*/
5660
static tf::Executor& get(void)
5761
{
62+
if(tlsActive_)
63+
return *tlsExec_;
5864
std::lock_guard<std::mutex> lock(mutex_);
5965
if(!instance_)
6066
{
@@ -73,6 +79,8 @@ class TFSingleton
7379
*/
7480
static size_t num_threads()
7581
{
82+
if(tlsActive_)
83+
return tlsNumThreads_;
7684
std::lock_guard<std::mutex> lock(mutex_);
7785
return numThreads_;
7886
}
@@ -82,6 +90,8 @@ class TFSingleton
8290
*/
8391
static bool isSingleThreaded()
8492
{
93+
if(tlsActive_)
94+
return tlsNumThreads_ == 1;
8595
std::lock_guard<std::mutex> lock(mutex_);
8696
return numThreads_ == 1;
8797
}
@@ -107,6 +117,39 @@ class TFSingleton
107117
return (id >= 0) ? (uint32_t)id : 0u;
108118
}
109119

120+
/**
121+
* @brief Scoped override that makes get()/num_threads()/workerId() resolve to a
122+
* caller-owned executor on the current thread instead of the global singleton.
123+
*
124+
* Used in single-threaded mode so each codec runs on its own inline executor:
125+
* concurrent decodes become independent and lock-free. Saves and restores the
126+
* previous thread-local values, so nesting (e.g. overviews) is safe.
127+
*/
128+
class ScopedExecutor
129+
{
130+
public:
131+
ScopedExecutor(tf::Executor* exec, size_t numThreads)
132+
: prevExec_(tlsExec_), prevNumThreads_(tlsNumThreads_), prevActive_(tlsActive_)
133+
{
134+
tlsExec_ = exec;
135+
tlsNumThreads_ = numThreads;
136+
tlsActive_ = true;
137+
}
138+
~ScopedExecutor()
139+
{
140+
tlsExec_ = prevExec_;
141+
tlsNumThreads_ = prevNumThreads_;
142+
tlsActive_ = prevActive_;
143+
}
144+
ScopedExecutor(const ScopedExecutor&) = delete;
145+
ScopedExecutor& operator=(const ScopedExecutor&) = delete;
146+
147+
private:
148+
tf::Executor* prevExec_;
149+
size_t prevNumThreads_;
150+
bool prevActive_;
151+
};
152+
110153
private:
111154
// Deleted copy constructor and assignment operator
112155
TFSingleton(const TFSingleton&) = delete;
@@ -130,4 +173,19 @@ class TFSingleton
130173
* @brief total number of threads
131174
*/
132175
static size_t numThreads_;
176+
177+
/**
178+
* @brief Per-thread override executor (set by ScopedExecutor), or nullptr.
179+
*/
180+
static thread_local tf::Executor* tlsExec_;
181+
182+
/**
183+
* @brief Thread count reported while a ScopedExecutor override is active.
184+
*/
185+
static thread_local size_t tlsNumThreads_;
186+
187+
/**
188+
* @brief True while a ScopedExecutor override is active on this thread.
189+
*/
190+
static thread_local bool tlsActive_;
133191
};

0 commit comments

Comments
 (0)