From 7b2380c4a9ef63f58ceffda35a86f8afe2ae499b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Micha=C3=ABl=20Celerier?= Date: Wed, 15 Jan 2025 22:57:15 -0500 Subject: [PATCH 1/2] osc: work on adding more features to rate limiter --- .../network/rate_limiter_configuration.hpp | 25 ++ src/ossia/network/rate_limiting_protocol.cpp | 229 ++++++++++++------ src/ossia/network/rate_limiting_protocol.hpp | 20 +- 3 files changed, 197 insertions(+), 77 deletions(-) create mode 100644 src/ossia/network/rate_limiter_configuration.hpp diff --git a/src/ossia/network/rate_limiter_configuration.hpp b/src/ossia/network/rate_limiter_configuration.hpp new file mode 100644 index 00000000000..8ec4a5713bd --- /dev/null +++ b/src/ossia/network/rate_limiter_configuration.hpp @@ -0,0 +1,25 @@ +#pragma once +#include + +#include + +namespace ossia::net +{ +struct rate_limiter_configuration +{ + using clock = std::chrono::steady_clock; + + // What is the rate limit + std::chrono::milliseconds duration{}; + + // Put things in e.g. OSC bundles + bool bundle{}; + + // When sending, send every parameters from the device, not just the last ones + bool send_all{}; + + // Always send the last sent values on every tick + bool repeat{}; +}; + +} diff --git a/src/ossia/network/rate_limiting_protocol.cpp b/src/ossia/network/rate_limiting_protocol.cpp index 405f843080f..ae00fc6597a 100644 --- a/src/ossia/network/rate_limiting_protocol.cpp +++ b/src/ossia/network/rate_limiting_protocol.cpp @@ -8,83 +8,162 @@ #if defined(__cpp_exceptions) namespace ossia::net { +// FIXME refactor with coalescing_queue +// FIXME refactor with sleep_accurate (MIDISync.hpp) +// FIXME refactor with exp wait in audio_spin_mutex + struct rate_limiter { rate_limiting_protocol& self; - void operator()() const noexcept + std::chrono::steady_clock::duration duration; + std::chrono::steady_clock::duration time_to_sleep{duration}; + + std::chrono::steady_clock::time_point sleep_before() { - ossia::set_thread_name("ossia ratelim"); using namespace std::literals; using clock = rate_limiting_protocol::clock; - const auto duration = self.m_duration.load(); - thread_local auto time_to_sleep = duration; - while(self.m_running) + auto prev_time = clock::now(); + if(time_to_sleep > 1ms) + std::this_thread::sleep_for(time_to_sleep); + return prev_time; + } + + void sleep_after(std::chrono::steady_clock::time_point prev_time) + { + using namespace std::literals; + using clock = rate_limiting_protocol::clock; + + auto new_time = clock::now(); + auto observed_duration + = std::chrono::duration_cast(new_time - prev_time); + if(observed_duration > duration) + { + if(observed_duration >= 2 * duration) + time_to_sleep = 0ms; + else + time_to_sleep = 2 * duration - observed_duration; + } + else + { + time_to_sleep = duration; + } + } +}; + +template +struct rate_limiter_impl; + +template <> +struct rate_limiter_impl : rate_limiter +{ + void operator()() + { + // TODO find safe way to handle if a parameter is removed + // TODO instead we should do the value filtering in the parameter ... + // but still have to handle cases that can be optimized, such as midi + { + std::lock_guard lock{self.m_msgMutex}; + std::swap(self.m_buffer, self.m_userMessages); + } + + // Copy newest messages in local map + for(auto& msg : self.m_buffer) + { + if(msg.second.first.valid()) + { + self.m_threadMessages[msg.first] = std::move(msg.second); + msg.second.first = ossia::value{}; + } + } + + // Push the actual messages + for(auto& v : self.m_threadMessages) + { + auto val = v.second.first; + if(val.valid()) + { + self.m_protocol->push(*v.first, v.second.first); + } + } + + // Clear both containers (while keeping memory allocated for sent + // messages so that it stays fast) + for(auto& v : self.m_buffer) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + + for(auto& v : self.m_threadMessages) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + } +}; + +template <> +struct rate_limiter_impl : rate_limiter +{ + void operator()() + { + std::vector> vec; + ossia::iterate_all_children(this->self.m_device->get_root_node()); + + // bundle + // send all + // repeat + + // TODO find safe way to handle if a parameter is removed + // TODO instead we should do the value filtering in the parameter ... + // but still have to handle cases that can be optimized, such as midi + { + std::lock_guard lock{self.m_msgMutex}; + std::swap(self.m_buffer, self.m_userMessages); + } + + // Copy newest messages in local map + for(auto& msg : self.m_buffer) + { + if(msg.second.first.valid()) + { + self.m_threadMessages[msg.first] = std::move(msg.second); + msg.second.first = ossia::value{}; + } + } + + // Push the actual messages + for(auto& v : self.m_threadMessages) + { + auto val = v.second.first; + if(val.valid()) + { + self.m_protocol->push(*v.first, v.second.first); + } + } + + // Clear both containers (while keeping memory allocated for sent + // messages so that it stays fast) + for(auto& v : self.m_buffer) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + + for(auto& v : self.m_threadMessages) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + } +}; + +template +struct rate_limiter_concrete : rate_limiter_impl +{ + void operator()() + { + ossia::set_thread_name("ossia ratelim"); + + while(this->self.m_running) { try { - auto prev_time = clock::now(); - if(time_to_sleep > 1ms) - std::this_thread::sleep_for(time_to_sleep); - - // TODO find safe way to handle if a parameter is removed - // TODO instead we should do the value filtering in the parameter ... - // but still have to handle cases that can be optimized, such as midi - { - std::lock_guard lock{self.m_msgMutex}; - std::swap(self.m_buffer, self.m_userMessages); - } - - // Copy newest messages in local map - for(auto& msg : self.m_buffer) - { - if(msg.second.first.valid()) - { - self.m_threadMessages[msg.first] = std::move(msg.second); - msg.second.first = ossia::value{}; - } - } - - // Push the actual messages - for(auto& v : self.m_threadMessages) - { - auto val = v.second.first; - if(val.valid()) - { - self.m_protocol->push(*v.first, v.second.first); - } - } - - // Clear both containers (while keeping memory allocated for sent - // messages so that it stays fast) - for(auto& v : self.m_buffer) - { - if(v.second.first.valid()) - { - v.second.first = ossia::value{}; - } - } - - for(auto& v : self.m_threadMessages) - { - if(v.second.first.valid()) - { - v.second.first = ossia::value{}; - } - } - auto new_time = clock::now(); - auto observed_duration = std::chrono::duration_cast( - new_time - prev_time); - if(observed_duration > duration) - { - if(observed_duration >= 2 * duration) - time_to_sleep = 0ms; - else - time_to_sleep = 2 * duration - observed_duration; - } - else - { - time_to_sleep = duration; - } + const auto prev_time = this->sleep_before(); + rate_limiter_impl::operator()(); + sleep_after(prev_time); } catch(...) { @@ -94,17 +173,17 @@ struct rate_limiter }; rate_limiting_protocol::rate_limiting_protocol( - rate_limiting_protocol::duration d, std::unique_ptr arg) + rate_limiter_configuration d, std::unique_ptr arg) : protocol_base{flags{SupportsMultiplex}} - , m_duration{d} + , m_duration{d.duration} + , m_bundle{d.bundle} + , m_repeat{d.repeat} + , m_send_all{d.send_all} , m_protocol{std::move(arg)} - { m_userMessages.reserve(4096); m_buffer.reserve(4096); m_threadMessages.reserve(4096); - m_lastTime = clock::now(); - m_thread = std::thread{rate_limiter{*this}}; } rate_limiting_protocol::~rate_limiting_protocol() @@ -116,6 +195,7 @@ rate_limiting_protocol::~rate_limiting_protocol() void rate_limiting_protocol::set_duration(rate_limiting_protocol::duration d) { m_duration = d; + // FIXME update thread } bool rate_limiting_protocol::pull(ossia::net::parameter_base& address) @@ -165,7 +245,8 @@ void rate_limiting_protocol::set_device(device_base& dev) { m_device = &dev; m_protocol->set_device(dev); + m_lastTime = clock::now(); + m_thread = std::thread{rate_limiter_concrete{*this, m_duration}}; } - } #endif diff --git a/src/ossia/network/rate_limiting_protocol.hpp b/src/ossia/network/rate_limiting_protocol.hpp index 64dddfa7bc7..28b226c755d 100644 --- a/src/ossia/network/rate_limiting_protocol.hpp +++ b/src/ossia/network/rate_limiting_protocol.hpp @@ -4,22 +4,29 @@ #include #include #include +#include #include #include #include -#include namespace ossia::net { struct rate_limiter; + +template +struct rate_limiter_impl; +template +struct rate_limiter_concrete; + class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_base { public: - using clock = std::chrono::high_resolution_clock; + using clock = std::chrono::steady_clock; using duration = clock::duration; - rate_limiting_protocol(duration d, std::unique_ptr arg); + rate_limiting_protocol( + rate_limiter_configuration conf, std::unique_ptr arg); ~rate_limiting_protocol() override; void set_duration(duration d); @@ -46,8 +53,15 @@ class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_ba rate_limiting_protocol& operator=(rate_limiting_protocol&&) = delete; friend struct rate_limiter; + template + friend struct rate_limiter_impl; + template + friend struct rate_limiter_concrete; std::atomic m_duration{}; + bool m_bundle{}; + bool m_send_all{}; + bool m_repeat{}; std::unique_ptr m_protocol; ossia::net::device_base* m_device{}; From 11d007443223257c194f46565aaecdb0d35c4781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Micha=C3=ABl=20Celerier?= Date: Sun, 23 Mar 2025 09:54:43 -0400 Subject: [PATCH 2/2] rate limiter: implement all the configuration options --- src/ossia/network/osc/detail/bundle.hpp | 17 ++- src/ossia/network/rate_limiting_protocol.cpp | 152 ++++++++++--------- 2 files changed, 89 insertions(+), 80 deletions(-) diff --git a/src/ossia/network/osc/detail/bundle.hpp b/src/ossia/network/osc/detail/bundle.hpp index d66055e9ec0..5570df9069f 100644 --- a/src/ossia/network/osc/detail/bundle.hpp +++ b/src/ossia/network/osc/detail/bundle.hpp @@ -22,7 +22,7 @@ struct bundle_common_policy void operator()(oscpack::OutboundPacketStream& str, ossia::value& val, const Addr_T& addr) { - if(val = bound_value(addr, val); val.valid()) + if(val = bound_value(addr, std::move(val)); val.valid()) { str << oscpack::BeginMessageN(osc_address(addr)); val.apply(typename OscPolicy::dynamic_policy{{str, addr.get_unit()}}); @@ -111,13 +111,18 @@ try oscpack::OutboundPacketStream str(ret.data.data(), max_osc_message_size); str << oscpack::BeginBundleImmediate(); - ossia::value val; + static thread_local ossia::value val; for(const auto& a : addresses) { auto& param = access_parameter(a); ret.critical |= param.get_critical(); - val = param.value(); - add_element_to_bundle(str, val, param); + + if constexpr(requires { + { a.value } -> std::same_as; + }) + add_element_to_bundle(str, val = a.value, param); + else + add_element_to_bundle(str, val = param.value(), param); } str << oscpack::EndBundle(); ret.data.resize(str.Size()); @@ -150,13 +155,13 @@ std::optional make_bundle( NetworkPolicy add_element_to_bundle, const ossia::net::full_parameter_data& param) try { + static thread_local ossia::value val; bundle ret{ ossia::buffer_pool::instance().acquire(max_osc_message_size), param.critical}; { oscpack::OutboundPacketStream str(ret.data.data(), max_osc_message_size); str << oscpack::BeginBundleImmediate(); - auto val = param.value(); - add_element_to_bundle(str, val, param); + add_element_to_bundle(str, val = param.value(), param); str << oscpack::EndBundle(); ret.data.resize(str.Size()); diff --git a/src/ossia/network/rate_limiting_protocol.cpp b/src/ossia/network/rate_limiting_protocol.cpp index ae00fc6597a..78c1c17be90 100644 --- a/src/ossia/network/rate_limiting_protocol.cpp +++ b/src/ossia/network/rate_limiting_protocol.cpp @@ -1,10 +1,11 @@ // This is an open source non-commercial project. Dear PVS-Studio, please check // it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com #include +#include +#include #include #include #include - #if defined(__cpp_exceptions) namespace ossia::net { @@ -51,90 +52,60 @@ struct rate_limiter }; template -struct rate_limiter_impl; - -template <> -struct rate_limiter_impl : rate_limiter +struct rate_limiter_concrete : rate_limiter { - void operator()() + void send() { // TODO find safe way to handle if a parameter is removed // TODO instead we should do the value filtering in the parameter ... // but still have to handle cases that can be optimized, such as midi + if constexpr(SendAll) { - std::lock_guard lock{self.m_msgMutex}; - std::swap(self.m_buffer, self.m_userMessages); + ossia::net::iterate_all_children( + &this->self.m_device->get_root_node(), [&](ossia::net::parameter_base& ptr) { + self.m_threadMessages[&ptr].first = ptr.value(); + }); } - - // Copy newest messages in local map - for(auto& msg : self.m_buffer) + else { - if(msg.second.first.valid()) { - self.m_threadMessages[msg.first] = std::move(msg.second); - msg.second.first = ossia::value{}; + std::lock_guard lock{self.m_msgMutex}; + std::swap(self.m_buffer, self.m_userMessages); } - } - // Push the actual messages - for(auto& v : self.m_threadMessages) - { - auto val = v.second.first; - if(val.valid()) + for(auto& msg : self.m_buffer) { - self.m_protocol->push(*v.first, v.second.first); + if(msg.second.first.valid()) + { + self.m_threadMessages[msg.first] = std::move(msg.second); + msg.second.first = ossia::value{}; + } } } - // Clear both containers (while keeping memory allocated for sent - // messages so that it stays fast) - for(auto& v : self.m_buffer) - if(v.second.first.valid()) - v.second.first = ossia::value{}; - - for(auto& v : self.m_threadMessages) - if(v.second.first.valid()) - v.second.first = ossia::value{}; - } -}; - -template <> -struct rate_limiter_impl : rate_limiter -{ - void operator()() - { - std::vector> vec; - ossia::iterate_all_children(this->self.m_device->get_root_node()); - - // bundle - // send all - // repeat - - // TODO find safe way to handle if a parameter is removed - // TODO instead we should do the value filtering in the parameter ... - // but still have to handle cases that can be optimized, such as midi - { - std::lock_guard lock{self.m_msgMutex}; - std::swap(self.m_buffer, self.m_userMessages); - } - - // Copy newest messages in local map - for(auto& msg : self.m_buffer) + // Push the actual messages + if constexpr(Bundle) { - if(msg.second.first.valid()) + // FIXME add a push_bundle overload that takes a std::generator + thread_local std::vector e; + e.clear(); + for(auto& v : self.m_threadMessages) { - self.m_threadMessages[msg.first] = std::move(msg.second); - msg.second.first = ossia::value{}; + if(v.second.first.valid()) + e.push_back(bundle_element{ + const_cast(v.first), v.second.first}); } + if(!e.empty()) + self.m_protocol->push_bundle(e); } - - // Push the actual messages - for(auto& v : self.m_threadMessages) + else { - auto val = v.second.first; - if(val.valid()) + for(auto& v : self.m_threadMessages) { - self.m_protocol->push(*v.first, v.second.first); + if(v.second.first.valid()) + { + self.m_protocol->push(*v.first, v.second.first); + } } } @@ -144,15 +115,14 @@ struct rate_limiter_impl : rate_limiter if(v.second.first.valid()) v.second.first = ossia::value{}; - for(auto& v : self.m_threadMessages) - if(v.second.first.valid()) - v.second.first = ossia::value{}; + if constexpr(!Repeat) + { + for(auto& v : self.m_threadMessages) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + } } -}; -template -struct rate_limiter_concrete : rate_limiter_impl -{ void operator()() { ossia::set_thread_name("ossia ratelim"); @@ -162,8 +132,8 @@ struct rate_limiter_concrete : rate_limiter_impl try { const auto prev_time = this->sleep_before(); - rate_limiter_impl::operator()(); - sleep_after(prev_time); + this->send(); + this->sleep_after(prev_time); } catch(...) { @@ -206,6 +176,9 @@ bool rate_limiting_protocol::pull(ossia::net::parameter_base& address) bool rate_limiting_protocol::push( const ossia::net::parameter_base& address, const ossia::value& v) { + if(m_send_all) + return true; + std::lock_guard lock{m_msgMutex}; m_userMessages[&address] = {v, {}}; return true; @@ -213,6 +186,7 @@ bool rate_limiting_protocol::push( bool rate_limiting_protocol::push_raw(const full_parameter_data& address) { + // FIXME return m_protocol->push_raw(address); } @@ -246,7 +220,37 @@ void rate_limiting_protocol::set_device(device_base& dev) m_device = &dev; m_protocol->set_device(dev); m_lastTime = clock::now(); - m_thread = std::thread{rate_limiter_concrete{*this, m_duration}}; + uint8_t flags = 0; + flags |= m_send_all ? 0b001 : 0b000; + flags |= m_repeat ? 0b010 : 0b000; + flags |= m_bundle ? 0b100 : 0b000; + switch(flags) + { + case 0b000: + m_thread = std::thread{rate_limiter_concrete<0, 0, 0>{*this, m_duration}}; + break; + case 0b001: + m_thread = std::thread{rate_limiter_concrete<0, 0, 1>{*this, m_duration}}; + break; + case 0b010: + m_thread = std::thread{rate_limiter_concrete<0, 1, 0>{*this, m_duration}}; + break; + case 0b011: + m_thread = std::thread{rate_limiter_concrete<0, 1, 1>{*this, m_duration}}; + break; + case 0b100: + m_thread = std::thread{rate_limiter_concrete<1, 0, 0>{*this, m_duration}}; + break; + case 0b101: + m_thread = std::thread{rate_limiter_concrete<1, 0, 1>{*this, m_duration}}; + break; + case 0b110: + m_thread = std::thread{rate_limiter_concrete<1, 1, 0>{*this, m_duration}}; + break; + case 0b111: + m_thread = std::thread{rate_limiter_concrete<1, 1, 1>{*this, m_duration}}; + break; + } } } #endif