diff --git a/src/ossia/network/osc/detail/bundle.hpp b/src/ossia/network/osc/detail/bundle.hpp index d66055e9ec0..5570df9069f 100644 --- a/src/ossia/network/osc/detail/bundle.hpp +++ b/src/ossia/network/osc/detail/bundle.hpp @@ -22,7 +22,7 @@ struct bundle_common_policy void operator()(oscpack::OutboundPacketStream& str, ossia::value& val, const Addr_T& addr) { - if(val = bound_value(addr, val); val.valid()) + if(val = bound_value(addr, std::move(val)); val.valid()) { str << oscpack::BeginMessageN(osc_address(addr)); val.apply(typename OscPolicy::dynamic_policy{{str, addr.get_unit()}}); @@ -111,13 +111,18 @@ try oscpack::OutboundPacketStream str(ret.data.data(), max_osc_message_size); str << oscpack::BeginBundleImmediate(); - ossia::value val; + static thread_local ossia::value val; for(const auto& a : addresses) { auto& param = access_parameter(a); ret.critical |= param.get_critical(); - val = param.value(); - add_element_to_bundle(str, val, param); + + if constexpr(requires { + { a.value } -> std::same_as; + }) + add_element_to_bundle(str, val = a.value, param); + else + add_element_to_bundle(str, val = param.value(), param); } str << oscpack::EndBundle(); ret.data.resize(str.Size()); @@ -150,13 +155,13 @@ std::optional make_bundle( NetworkPolicy add_element_to_bundle, const ossia::net::full_parameter_data& param) try { + static thread_local ossia::value val; bundle ret{ ossia::buffer_pool::instance().acquire(max_osc_message_size), param.critical}; { oscpack::OutboundPacketStream str(ret.data.data(), max_osc_message_size); str << oscpack::BeginBundleImmediate(); - auto val = param.value(); - add_element_to_bundle(str, val, param); + add_element_to_bundle(str, val = param.value(), param); str << oscpack::EndBundle(); ret.data.resize(str.Size()); diff --git a/src/ossia/network/rate_limiter_configuration.hpp b/src/ossia/network/rate_limiter_configuration.hpp new file mode 100644 index 00000000000..8ec4a5713bd --- /dev/null +++ b/src/ossia/network/rate_limiter_configuration.hpp @@ -0,0 +1,25 @@ +#pragma once +#include + +#include + +namespace ossia::net +{ +struct rate_limiter_configuration +{ + using clock = std::chrono::steady_clock; + + // What is the rate limit + std::chrono::milliseconds duration{}; + + // Put things in e.g. OSC bundles + bool bundle{}; + + // When sending, send every parameters from the device, not just the last ones + bool send_all{}; + + // Always send the last sent values on every tick + bool repeat{}; +}; + +} diff --git a/src/ossia/network/rate_limiting_protocol.cpp b/src/ossia/network/rate_limiting_protocol.cpp index 405f843080f..78c1c17be90 100644 --- a/src/ossia/network/rate_limiting_protocol.cpp +++ b/src/ossia/network/rate_limiting_protocol.cpp @@ -1,90 +1,139 @@ // This is an open source non-commercial project. Dear PVS-Studio, please check // it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com #include +#include +#include #include #include #include - #if defined(__cpp_exceptions) namespace ossia::net { +// FIXME refactor with coalescing_queue +// FIXME refactor with sleep_accurate (MIDISync.hpp) +// FIXME refactor with exp wait in audio_spin_mutex + struct rate_limiter { rate_limiting_protocol& self; - void operator()() const noexcept + std::chrono::steady_clock::duration duration; + std::chrono::steady_clock::duration time_to_sleep{duration}; + + std::chrono::steady_clock::time_point sleep_before() { - ossia::set_thread_name("ossia ratelim"); using namespace std::literals; using clock = rate_limiting_protocol::clock; - const auto duration = self.m_duration.load(); - thread_local auto time_to_sleep = duration; - while(self.m_running) + auto prev_time = clock::now(); + if(time_to_sleep > 1ms) + std::this_thread::sleep_for(time_to_sleep); + return prev_time; + } + + void sleep_after(std::chrono::steady_clock::time_point prev_time) + { + using namespace std::literals; + using clock = rate_limiting_protocol::clock; + + auto new_time = clock::now(); + auto observed_duration + = std::chrono::duration_cast(new_time - prev_time); + if(observed_duration > duration) + { + if(observed_duration >= 2 * duration) + time_to_sleep = 0ms; + else + time_to_sleep = 2 * duration - observed_duration; + } + else + { + time_to_sleep = duration; + } + } +}; + +template +struct rate_limiter_concrete : rate_limiter +{ + void send() + { + // TODO find safe way to handle if a parameter is removed + // TODO instead we should do the value filtering in the parameter ... + // but still have to handle cases that can be optimized, such as midi + if constexpr(SendAll) + { + ossia::net::iterate_all_children( + &this->self.m_device->get_root_node(), [&](ossia::net::parameter_base& ptr) { + self.m_threadMessages[&ptr].first = ptr.value(); + }); + } + else { - try { - auto prev_time = clock::now(); - if(time_to_sleep > 1ms) - std::this_thread::sleep_for(time_to_sleep); + std::lock_guard lock{self.m_msgMutex}; + std::swap(self.m_buffer, self.m_userMessages); + } - // TODO find safe way to handle if a parameter is removed - // TODO instead we should do the value filtering in the parameter ... - // but still have to handle cases that can be optimized, such as midi + for(auto& msg : self.m_buffer) + { + if(msg.second.first.valid()) { - std::lock_guard lock{self.m_msgMutex}; - std::swap(self.m_buffer, self.m_userMessages); + self.m_threadMessages[msg.first] = std::move(msg.second); + msg.second.first = ossia::value{}; } + } + } - // Copy newest messages in local map - for(auto& msg : self.m_buffer) + // Push the actual messages + if constexpr(Bundle) + { + // FIXME add a push_bundle overload that takes a std::generator + thread_local std::vector e; + e.clear(); + for(auto& v : self.m_threadMessages) + { + if(v.second.first.valid()) + e.push_back(bundle_element{ + const_cast(v.first), v.second.first}); + } + if(!e.empty()) + self.m_protocol->push_bundle(e); + } + else + { + for(auto& v : self.m_threadMessages) + { + if(v.second.first.valid()) { - if(msg.second.first.valid()) - { - self.m_threadMessages[msg.first] = std::move(msg.second); - msg.second.first = ossia::value{}; - } + self.m_protocol->push(*v.first, v.second.first); } + } + } - // Push the actual messages - for(auto& v : self.m_threadMessages) - { - auto val = v.second.first; - if(val.valid()) - { - self.m_protocol->push(*v.first, v.second.first); - } - } + // Clear both containers (while keeping memory allocated for sent + // messages so that it stays fast) + for(auto& v : self.m_buffer) + if(v.second.first.valid()) + v.second.first = ossia::value{}; - // Clear both containers (while keeping memory allocated for sent - // messages so that it stays fast) - for(auto& v : self.m_buffer) - { - if(v.second.first.valid()) - { - v.second.first = ossia::value{}; - } - } + if constexpr(!Repeat) + { + for(auto& v : self.m_threadMessages) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + } + } - for(auto& v : self.m_threadMessages) - { - if(v.second.first.valid()) - { - v.second.first = ossia::value{}; - } - } - auto new_time = clock::now(); - auto observed_duration = std::chrono::duration_cast( - new_time - prev_time); - if(observed_duration > duration) - { - if(observed_duration >= 2 * duration) - time_to_sleep = 0ms; - else - time_to_sleep = 2 * duration - observed_duration; - } - else - { - time_to_sleep = duration; - } + void operator()() + { + ossia::set_thread_name("ossia ratelim"); + + while(this->self.m_running) + { + try + { + const auto prev_time = this->sleep_before(); + this->send(); + this->sleep_after(prev_time); } catch(...) { @@ -94,17 +143,17 @@ struct rate_limiter }; rate_limiting_protocol::rate_limiting_protocol( - rate_limiting_protocol::duration d, std::unique_ptr arg) + rate_limiter_configuration d, std::unique_ptr arg) : protocol_base{flags{SupportsMultiplex}} - , m_duration{d} + , m_duration{d.duration} + , m_bundle{d.bundle} + , m_repeat{d.repeat} + , m_send_all{d.send_all} , m_protocol{std::move(arg)} - { m_userMessages.reserve(4096); m_buffer.reserve(4096); m_threadMessages.reserve(4096); - m_lastTime = clock::now(); - m_thread = std::thread{rate_limiter{*this}}; } rate_limiting_protocol::~rate_limiting_protocol() @@ -116,6 +165,7 @@ rate_limiting_protocol::~rate_limiting_protocol() void rate_limiting_protocol::set_duration(rate_limiting_protocol::duration d) { m_duration = d; + // FIXME update thread } bool rate_limiting_protocol::pull(ossia::net::parameter_base& address) @@ -126,6 +176,9 @@ bool rate_limiting_protocol::pull(ossia::net::parameter_base& address) bool rate_limiting_protocol::push( const ossia::net::parameter_base& address, const ossia::value& v) { + if(m_send_all) + return true; + std::lock_guard lock{m_msgMutex}; m_userMessages[&address] = {v, {}}; return true; @@ -133,6 +186,7 @@ bool rate_limiting_protocol::push( bool rate_limiting_protocol::push_raw(const full_parameter_data& address) { + // FIXME return m_protocol->push_raw(address); } @@ -165,7 +219,38 @@ void rate_limiting_protocol::set_device(device_base& dev) { m_device = &dev; m_protocol->set_device(dev); + m_lastTime = clock::now(); + uint8_t flags = 0; + flags |= m_send_all ? 0b001 : 0b000; + flags |= m_repeat ? 0b010 : 0b000; + flags |= m_bundle ? 0b100 : 0b000; + switch(flags) + { + case 0b000: + m_thread = std::thread{rate_limiter_concrete<0, 0, 0>{*this, m_duration}}; + break; + case 0b001: + m_thread = std::thread{rate_limiter_concrete<0, 0, 1>{*this, m_duration}}; + break; + case 0b010: + m_thread = std::thread{rate_limiter_concrete<0, 1, 0>{*this, m_duration}}; + break; + case 0b011: + m_thread = std::thread{rate_limiter_concrete<0, 1, 1>{*this, m_duration}}; + break; + case 0b100: + m_thread = std::thread{rate_limiter_concrete<1, 0, 0>{*this, m_duration}}; + break; + case 0b101: + m_thread = std::thread{rate_limiter_concrete<1, 0, 1>{*this, m_duration}}; + break; + case 0b110: + m_thread = std::thread{rate_limiter_concrete<1, 1, 0>{*this, m_duration}}; + break; + case 0b111: + m_thread = std::thread{rate_limiter_concrete<1, 1, 1>{*this, m_duration}}; + break; + } } - } #endif diff --git a/src/ossia/network/rate_limiting_protocol.hpp b/src/ossia/network/rate_limiting_protocol.hpp index 64dddfa7bc7..28b226c755d 100644 --- a/src/ossia/network/rate_limiting_protocol.hpp +++ b/src/ossia/network/rate_limiting_protocol.hpp @@ -4,22 +4,29 @@ #include #include #include +#include #include #include #include -#include namespace ossia::net { struct rate_limiter; + +template +struct rate_limiter_impl; +template +struct rate_limiter_concrete; + class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_base { public: - using clock = std::chrono::high_resolution_clock; + using clock = std::chrono::steady_clock; using duration = clock::duration; - rate_limiting_protocol(duration d, std::unique_ptr arg); + rate_limiting_protocol( + rate_limiter_configuration conf, std::unique_ptr arg); ~rate_limiting_protocol() override; void set_duration(duration d); @@ -46,8 +53,15 @@ class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_ba rate_limiting_protocol& operator=(rate_limiting_protocol&&) = delete; friend struct rate_limiter; + template + friend struct rate_limiter_impl; + template + friend struct rate_limiter_concrete; std::atomic m_duration{}; + bool m_bundle{}; + bool m_send_all{}; + bool m_repeat{}; std::unique_ptr m_protocol; ossia::net::device_base* m_device{};