Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/ossia-qt/serial/serial_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include <QObject>
#include <QQmlComponent>
#include <QSerialPort>
#include <QThread>

#include <span>
Expand Down
198 changes: 171 additions & 27 deletions src/ossia/audio/pipewire_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <stdexcept>

Expand Down Expand Up @@ -58,6 +59,7 @@ class libpipewire
decltype(&::pw_filter_add_port) filter_add_port{};
decltype(&::pw_filter_destroy) filter_destroy{};
decltype(&::pw_filter_connect) filter_connect{};
decltype(&::pw_filter_disconnect) filter_disconnect{};
decltype(&::pw_filter_get_dsp_buffer) filter_get_dsp_buffer{};

static const libpipewire& instance()
Expand Down Expand Up @@ -114,6 +116,8 @@ class libpipewire
= library.symbol<decltype(&::pw_filter_add_port)>("pw_filter_add_port");
filter_destroy = library.symbol<decltype(&::pw_filter_destroy)>("pw_filter_destroy");
filter_connect = library.symbol<decltype(&::pw_filter_connect)>("pw_filter_connect");
filter_disconnect
= library.symbol<decltype(&::pw_filter_disconnect)>("pw_filter_disconnect");
filter_get_dsp_buffer = library.symbol<decltype(&::pw_filter_get_dsp_buffer)>(
"pw_filter_get_dsp_buffer");

Expand Down Expand Up @@ -144,6 +148,7 @@ class libpipewire
assert(filter_add_port);
assert(filter_destroy);
assert(filter_connect);
assert(filter_disconnect);
assert(filter_get_dsp_buffer);
}
};
Expand Down Expand Up @@ -323,20 +328,40 @@ struct pipewire_context
pw_registry_add_listener(
this->registry, &this->registry_listener, &registry_events, this);

synchronize();
if(!synchronize())
{
ossia::logger().error(
"PipeWire: initial synchronize() failed — context marked as broken");
// m_broken is already set by synchronize(); leave members as-is so the
// destructor still cleans them up, but the factory will see broken()
// and rebuild the context rather than use this one.
return;
}
}

int pending{};
int done{};
void synchronize()
int error_state{};
bool m_broken{};

// Once broken, the shared core connection is considered unrecoverable:
// the factory will tear this context down and build a new one rather
// than attempting to reuse it.
bool broken() const noexcept { return m_broken; }

// Round-trip to the PipeWire daemon with a hard deadline. Returns true
// on success; on timeout or protocol error, logs, marks the context as
// broken, and returns false. Callers must check the return value and
// bail out of their own wait loops — we must never spin forever against
// a wedged daemon.
bool synchronize()
{
pending = 0;
done = 0;
error_state = 0;

if(!core)
return;

spa_hook core_listener;
if(!core || m_broken)
return false;

static constexpr struct pw_core_events core_events = {
.version = PW_VERSION_CORE_EVENTS,
Expand All @@ -347,11 +372,17 @@ struct pipewire_context
if(id == PW_ID_CORE && seq == self.pending)
{
self.done = 1;
libpipewire::instance().main_loop_quit(self.main_loop);
}
},
.ping = {},
.error = {},
.error =
[](void* object, uint32_t id, int seq, int res, const char* message) {
auto& self = *(pipewire_context*)object;
ossia::logger().error(
"PipeWire: core error id={} seq={} res={} ({}): {}", id, seq, res,
spa_strerror(res), message ? message : "");
self.error_state = 1;
},
.remove_id = {},
.bound_id = {},
.add_mem = {},
Expand All @@ -361,19 +392,63 @@ struct pipewire_context
#endif
};

spa_hook core_listener;
spa_zero(core_listener);
pw_core_add_listener(core, &core_listener, &core_events, this);

// RAII: guarantee the listener is removed on every exit path so the
// core never calls into a dead stack frame (the hook lives on the stack).
struct hook_guard
{
spa_hook* h;
~hook_guard() { spa_hook_remove(h); }
} guard{&core_listener};

pending = pw_core_sync(core, PW_ID_CORE, 0);
while(!done)

using clk = std::chrono::steady_clock;
constexpr auto timeout = std::chrono::seconds(2);
const auto deadline = clk::now() + timeout;

while(!done && !error_state)
{
pw.main_loop_run(this->main_loop);
const auto now = clk::now();
if(now >= deadline)
{
ossia::logger().error(
"PipeWire: synchronize() timed out after {} s waiting for core sync",
int(std::chrono::duration_cast<std::chrono::seconds>(timeout).count()));
error_state = 1;
break;
}

const auto remaining
= std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now).count();
const int iter_ms = int(remaining < 50 ? remaining : 50);

const int r = pw_loop_iterate(lp, iter_ms);
if(r < 0)
{
ossia::logger().error(
"PipeWire: pw_loop_iterate failed: {}", spa_strerror(r));
error_state = 1;
break;
}
}

if(error_state || !done)
{
m_broken = true;
return false;
}
spa_hook_remove(&core_listener);
return true;
}

pw_proxy* link_ports(uint32_t out_port, uint32_t in_port)
{
if(m_broken)
return nullptr;

auto props = pw.properties_new(
PW_KEY_LINK_OUTPUT_PORT, std::to_string(out_port).c_str(),
PW_KEY_LINK_INPUT_PORT, std::to_string(in_port).c_str(), nullptr);
Expand All @@ -389,7 +464,14 @@ struct pipewire_context
return nullptr;
}

synchronize();
if(!synchronize())
{
// sync failed: the daemon may or may not have committed the create.
// Destroy our local proxy (best-effort, won't hang) and bail out.
pw.proxy_destroy(proxy);
pw.properties_free(props);
return nullptr;
}
pw.properties_free(props);
return proxy;
}
Expand Down Expand Up @@ -623,14 +705,27 @@ class pipewire_audio_protocol : public audio_engine
throw std::runtime_error("PipeWire: cannot connect");
}

// Wait until everything is registered with PipeWire
this->loop->synchronize();
// Wait until everything is registered with PipeWire. Every synchronize()
// call is bounded by a deadline inside the context; if any of them fail
// we abort the construction without setting activated, leaving an inert
// engine that running() will report as false.
if(!this->loop->synchronize())
{
ossia::logger().error(
"PipeWire: synchronize() failed after filter_connect — engine inactive");
return;
}
{
int k = 0;
auto node_id = filter_node_id();
while(node_id == 4294967295)
{
this->loop->synchronize();
if(!this->loop->synchronize())
{
ossia::logger().error(
"PipeWire: synchronize() failed while waiting for node id");
return;
}
node_id = filter_node_id();

if(k++; k > 100)
Expand All @@ -645,7 +740,12 @@ class pipewire_audio_protocol : public audio_engine
while(this_node.inputs.size() < num_local_ins
|| this_node.outputs.size() < num_local_outs)
{
this->loop->synchronize();
if(!this->loop->synchronize())
{
ossia::logger().error(
"PipeWire: synchronize() failed while waiting for ports");
return;
}
if(k++; k > 100)
return;
}
Expand Down Expand Up @@ -779,10 +879,50 @@ class pipewire_audio_protocol : public audio_engine

auto& pw = libpipewire::instance();

// Tear-down ordering matters here. pw_filter_connect with
// PW_FILTER_FLAG_RT_PROCESS schedules on_process on PipeWire's data
// (RT) loop, which is a different thread from the main/control loop
// we're running on. If we destroy link proxies or the filter while
// the data loop is still dispatching on_process against them, we race
// the RT thread and corrupt PipeWire's internal state — which can
// wedge the shared core connection and leave the app unrecoverable
// short of a full restart.
//
// Correct sequence:
// 1. pw_filter_disconnect: synchronously deactivates the node and
// tears down its data-loop scheduling, so on_process will no
// longer fire for this filter.
// 2. synchronize: round-trip with the daemon so the disconnect has
// fully propagated before we touch any proxies the RT path may
// have referenced.
// 3. destroy link proxies — now safe.
// 4. pw_filter_destroy — frees the local filter impl. It detects
// the prior disconnect and skips re-doing it.
// 5. synchronize — make sure the destroys have landed before we
// return and potentially let the shared context keep running.
if(this->filter)
{
if(int res = pw.filter_disconnect(this->filter); res < 0)
{
ossia::logger().warn(
"PipeWire: filter_disconnect failed: {}", spa_strerror(res));
}

// Best-effort sync; if this fails the context will be marked broken
// and the factory will rebuild it on the next make_engine call. We
// still continue the tear-down to free our local resources.
loop->synchronize();
}

for(auto link : this->links)
pw.proxy_destroy(link);
this->links.clear();

pw.filter_destroy(this->filter);
if(this->filter)
{
pw.filter_destroy(this->filter);
this->filter = nullptr;
}

loop->synchronize();
activated = false;
Expand Down Expand Up @@ -859,19 +999,23 @@ class pipewire_audio_protocol : public audio_engine
return;

auto& self = *(pipewire_audio_protocol*)userdata;
uint32_t nframes = position->clock.duration;
double current_time_ns = position->clock.nsec * 1e-9;
while(nframes >= self.effective_buffer_size)
const uint32_t nframes = position->clock.duration;
const double current_time_ns = position->clock.nsec * 1e-9;

// ossia's audio graph runs at a fixed block size; since we force the quantum
// via PW_KEY_NODE_FORCE_QUANTUM + PW_KEY_NODE_LOCK_QUANTUM the server should
// always call us with exactly effective_buffer_size frames. If it doesn't,
// skip the cycle rather than overrunning the DSP buffer or feeding the tick
// a mismatched block size.
if(nframes != static_cast<uint32_t>(self.effective_buffer_size))
{
self.do_process(self.effective_buffer_size, current_time_ns);
nframes -= self.effective_buffer_size;
current_time_ns += double(self.effective_buffer_size) / self.effective_sample_rate;
ossia::logger().warn(
"PipeWire: unexpected block size {} (expected {}), skipping cycle",
nframes, self.effective_buffer_size);
return;
}

if(nframes > 0)
{
self.do_process(self.effective_buffer_size, current_time_ns);
}
self.do_process(nframes, current_time_ns);
}

std::vector<port*> input_ports;
Expand Down
20 changes: 15 additions & 5 deletions src/ossia/dataflow/data_copy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,39 +188,49 @@ struct copy_data_pos
const std::size_t pos;

template <typename T, typename U>
void operator()(const T&, const U&) const
bool operator()(const T&, const U&) const
{
return false;
}

void operator()(const value_delay_line& out, value_port& in)
bool operator()(const value_delay_line& out, value_port& in)
{
if(pos < out.data.size())
{
copy_data{}(out.data[pos], in);
return true;
}
return false;
}

void operator()(const audio_delay_line& out, audio_port& in)
bool operator()(const audio_delay_line& out, audio_port& in)
{
if(pos < out.samples.size())
{
mix(out.samples[pos], in.get());
return true;
}
return false;
}

void operator()(const midi_delay_line& out, midi_port& in)
bool operator()(const midi_delay_line& out, midi_port& in)
{
if(pos < out.messages.size())
{
copy_data{}(out.messages[pos], in);
return true;
}
return false;
}
void operator()(const geometry_delay_line& out, geometry_port& in)

bool operator()(const geometry_delay_line& out, geometry_port& in)
{
if(pos < out.geometries.size())
{
copy_data{}(out.geometries[pos], in);
return true;
}
return false;
}
};
}
Loading
Loading