Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/ossia/network/osc/detail/bundle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct bundle_common_policy
void
operator()(oscpack::OutboundPacketStream& str, ossia::value& val, const Addr_T& addr)
{
if(val = bound_value(addr, val); val.valid())
if(val = bound_value(addr, std::move(val)); val.valid())
{
str << oscpack::BeginMessageN(osc_address(addr));
val.apply(typename OscPolicy::dynamic_policy{{str, addr.get_unit()}});
Expand Down Expand Up @@ -111,13 +111,18 @@ try
oscpack::OutboundPacketStream str(ret.data.data(), max_osc_message_size);
str << oscpack::BeginBundleImmediate();

ossia::value val;
static thread_local ossia::value val;
for(const auto& a : addresses)
{
auto& param = access_parameter(a);
ret.critical |= param.get_critical();
val = param.value();
add_element_to_bundle(str, val, param);

if constexpr(requires {
{ a.value } -> std::same_as<ossia::value>;
})
add_element_to_bundle(str, val = a.value, param);
else
add_element_to_bundle(str, val = param.value(), param);
}
str << oscpack::EndBundle();
ret.data.resize(str.Size());
Expand Down Expand Up @@ -150,13 +155,13 @@ std::optional<bundle> make_bundle(
NetworkPolicy add_element_to_bundle, const ossia::net::full_parameter_data& param)
try
{
static thread_local ossia::value val;
bundle ret{
ossia::buffer_pool::instance().acquire(max_osc_message_size), param.critical};
{
oscpack::OutboundPacketStream str(ret.data.data(), max_osc_message_size);
str << oscpack::BeginBundleImmediate();
auto val = param.value();
add_element_to_bundle(str, val, param);
add_element_to_bundle(str, val = param.value(), param);
str << oscpack::EndBundle();
ret.data.resize(str.Size());

Expand Down
25 changes: 25 additions & 0 deletions src/ossia/network/rate_limiter_configuration.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once
#include <ossia/detail/config.hpp>

#include <chrono>

namespace ossia::net
{
struct rate_limiter_configuration
{
using clock = std::chrono::steady_clock;

// What is the rate limit
std::chrono::milliseconds duration{};

// Put things in e.g. OSC bundles
bool bundle{};

// When sending, send every parameters from the device, not just the last ones
bool send_all{};

// Always send the last sent values on every tick
bool repeat{};
};

}
219 changes: 152 additions & 67 deletions src/ossia/network/rate_limiting_protocol.cpp
Original file line number Diff line number Diff line change
@@ -1,90 +1,139 @@
// This is an open source non-commercial project. Dear PVS-Studio, please check
// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
#include <ossia/detail/thread.hpp>
#include <ossia/network/base/bundle.hpp>
#include <ossia/network/base/node_functions.hpp>
#include <ossia/network/generic/generic_device.hpp>
#include <ossia/network/generic/generic_parameter.hpp>
#include <ossia/network/rate_limiting_protocol.hpp>

#if defined(__cpp_exceptions)
namespace ossia::net
{
// FIXME refactor with coalescing_queue
// FIXME refactor with sleep_accurate (MIDISync.hpp)
// FIXME refactor with exp wait in audio_spin_mutex

struct rate_limiter
{
rate_limiting_protocol& self;
void operator()() const noexcept
std::chrono::steady_clock::duration duration;
std::chrono::steady_clock::duration time_to_sleep{duration};

std::chrono::steady_clock::time_point sleep_before()
{
ossia::set_thread_name("ossia ratelim");
using namespace std::literals;
using clock = rate_limiting_protocol::clock;
const auto duration = self.m_duration.load();
thread_local auto time_to_sleep = duration;
while(self.m_running)
auto prev_time = clock::now();
if(time_to_sleep > 1ms)
std::this_thread::sleep_for(time_to_sleep);
return prev_time;
}

void sleep_after(std::chrono::steady_clock::time_point prev_time)
{
using namespace std::literals;
using clock = rate_limiting_protocol::clock;

auto new_time = clock::now();
auto observed_duration
= std::chrono::duration_cast<std::chrono::milliseconds>(new_time - prev_time);
if(observed_duration > duration)
{
if(observed_duration >= 2 * duration)
time_to_sleep = 0ms;
else
time_to_sleep = 2 * duration - observed_duration;
}
else
{
time_to_sleep = duration;
}
}
};

template <bool Bundle, bool Repeat, bool SendAll>
struct rate_limiter_concrete : rate_limiter
{
void send()
{
// TODO find safe way to handle if a parameter is removed
// TODO instead we should do the value filtering in the parameter ...
// but still have to handle cases that can be optimized, such as midi
if constexpr(SendAll)
{
ossia::net::iterate_all_children(
&this->self.m_device->get_root_node(), [&](ossia::net::parameter_base& ptr) {
self.m_threadMessages[&ptr].first = ptr.value();
});
}
else
{
try
{
auto prev_time = clock::now();
if(time_to_sleep > 1ms)
std::this_thread::sleep_for(time_to_sleep);
std::lock_guard lock{self.m_msgMutex};
std::swap(self.m_buffer, self.m_userMessages);
}

// TODO find safe way to handle if a parameter is removed
// TODO instead we should do the value filtering in the parameter ...
// but still have to handle cases that can be optimized, such as midi
for(auto& msg : self.m_buffer)
{
if(msg.second.first.valid())
{
std::lock_guard lock{self.m_msgMutex};
std::swap(self.m_buffer, self.m_userMessages);
self.m_threadMessages[msg.first] = std::move(msg.second);
msg.second.first = ossia::value{};
}
}
}

// Copy newest messages in local map
for(auto& msg : self.m_buffer)
// Push the actual messages
if constexpr(Bundle)
{
// FIXME add a push_bundle overload that takes a std::generator
thread_local std::vector<bundle_element> e;
e.clear();
for(auto& v : self.m_threadMessages)
{
if(v.second.first.valid())
e.push_back(bundle_element{
const_cast<ossia::net::parameter_base*>(v.first), v.second.first});
}
if(!e.empty())
self.m_protocol->push_bundle(e);
}
else
{
for(auto& v : self.m_threadMessages)
{
if(v.second.first.valid())
{
if(msg.second.first.valid())
{
self.m_threadMessages[msg.first] = std::move(msg.second);
msg.second.first = ossia::value{};
}
self.m_protocol->push(*v.first, v.second.first);
}
}
}

// Push the actual messages
for(auto& v : self.m_threadMessages)
{
auto val = v.second.first;
if(val.valid())
{
self.m_protocol->push(*v.first, v.second.first);
}
}
// Clear both containers (while keeping memory allocated for sent
// messages so that it stays fast)
for(auto& v : self.m_buffer)
if(v.second.first.valid())
v.second.first = ossia::value{};

// Clear both containers (while keeping memory allocated for sent
// messages so that it stays fast)
for(auto& v : self.m_buffer)
{
if(v.second.first.valid())
{
v.second.first = ossia::value{};
}
}
if constexpr(!Repeat)
{
for(auto& v : self.m_threadMessages)
if(v.second.first.valid())
v.second.first = ossia::value{};
}
}

for(auto& v : self.m_threadMessages)
{
if(v.second.first.valid())
{
v.second.first = ossia::value{};
}
}
auto new_time = clock::now();
auto observed_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
new_time - prev_time);
if(observed_duration > duration)
{
if(observed_duration >= 2 * duration)
time_to_sleep = 0ms;
else
time_to_sleep = 2 * duration - observed_duration;
}
else
{
time_to_sleep = duration;
}
void operator()()
{
ossia::set_thread_name("ossia ratelim");

while(this->self.m_running)
{
try
{
const auto prev_time = this->sleep_before();
this->send();
this->sleep_after(prev_time);
}
catch(...)
{
Expand All @@ -94,17 +143,17 @@ struct rate_limiter
};

rate_limiting_protocol::rate_limiting_protocol(
rate_limiting_protocol::duration d, std::unique_ptr<protocol_base> arg)
rate_limiter_configuration d, std::unique_ptr<protocol_base> arg)
: protocol_base{flags{SupportsMultiplex}}
, m_duration{d}
, m_duration{d.duration}
, m_bundle{d.bundle}
, m_repeat{d.repeat}
, m_send_all{d.send_all}
, m_protocol{std::move(arg)}

{
m_userMessages.reserve(4096);
m_buffer.reserve(4096);
m_threadMessages.reserve(4096);
m_lastTime = clock::now();
m_thread = std::thread{rate_limiter{*this}};
}

rate_limiting_protocol::~rate_limiting_protocol()
Expand All @@ -116,6 +165,7 @@ rate_limiting_protocol::~rate_limiting_protocol()
void rate_limiting_protocol::set_duration(rate_limiting_protocol::duration d)
{
m_duration = d;
// FIXME update thread
}

bool rate_limiting_protocol::pull(ossia::net::parameter_base& address)
Expand All @@ -126,13 +176,17 @@ bool rate_limiting_protocol::pull(ossia::net::parameter_base& address)
bool rate_limiting_protocol::push(
const ossia::net::parameter_base& address, const ossia::value& v)
{
if(m_send_all)
return true;

std::lock_guard lock{m_msgMutex};
m_userMessages[&address] = {v, {}};
return true;
}

bool rate_limiting_protocol::push_raw(const full_parameter_data& address)
{
// FIXME
return m_protocol->push_raw(address);
}

Expand Down Expand Up @@ -165,7 +219,38 @@ void rate_limiting_protocol::set_device(device_base& dev)
{
m_device = &dev;
m_protocol->set_device(dev);
m_lastTime = clock::now();
uint8_t flags = 0;
flags |= m_send_all ? 0b001 : 0b000;
flags |= m_repeat ? 0b010 : 0b000;
flags |= m_bundle ? 0b100 : 0b000;
switch(flags)
{
case 0b000:
m_thread = std::thread{rate_limiter_concrete<0, 0, 0>{*this, m_duration}};
break;
case 0b001:
m_thread = std::thread{rate_limiter_concrete<0, 0, 1>{*this, m_duration}};
break;
case 0b010:
m_thread = std::thread{rate_limiter_concrete<0, 1, 0>{*this, m_duration}};
break;
case 0b011:
m_thread = std::thread{rate_limiter_concrete<0, 1, 1>{*this, m_duration}};
break;
case 0b100:
m_thread = std::thread{rate_limiter_concrete<1, 0, 0>{*this, m_duration}};
break;
case 0b101:
m_thread = std::thread{rate_limiter_concrete<1, 0, 1>{*this, m_duration}};
break;
case 0b110:
m_thread = std::thread{rate_limiter_concrete<1, 1, 0>{*this, m_duration}};
break;
case 0b111:
m_thread = std::thread{rate_limiter_concrete<1, 1, 1>{*this, m_duration}};
break;
}
}

}
#endif
Loading
Loading