Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions src/MoqxRelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,9 +1110,22 @@ folly::coro::Task<void> MoqxRelay::addSubscriberAndPublishViaLocalForwarder(
}
}

// Capture largest/extensions on publisherExec; reading them on subscriberExec would
// race the publisher advancing largest_.
std::optional<AbsoluteLocation> seedLargest;
Extensions seedExtensions;
co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(publisherExec),
[&]() -> folly::coro::Task<void> {
seedLargest = publisherFwd->largest();
seedExtensions = publisherFwd->extensions();
co_return;
}()
);

auto [localFwd, isNew, localReg] = acquireLocalForwarder(ftn, [&] {
auto fwd = std::make_shared<MoQForwarder>(ftn, publisherFwd->largest());
fwd->setExtensions(publisherFwd->extensions());
auto fwd = std::make_shared<MoQForwarder>(ftn, seedLargest);
fwd->setExtensions(seedExtensions);
return fwd;
});

Expand Down Expand Up @@ -2466,23 +2479,34 @@ void MoqxRelay::onTrackEvicted(const FullTrackName& ftn, std::shared_ptr<MoQSess
return;
}

auto forwarder = registry_.getForwarder(ftn);
if (!forwarder) {
return;
}
auto sub = forwarder->getSubscriber(session.get());
if (!sub || sub->isPinned()) {
XLOG(DBG4) << "onTrackEvicted: pinned subscriber, skipping";
// PublishDone makes removeSubscriber notify the downstream via publishDone() rather
// than silently dropping it.
auto evict = [session](const std::shared_ptr<MoQForwarder>& fwd) {
if (!fwd) {
return;
}
auto sub = fwd->getSubscriber(session.get());
if (!sub || sub->isPinned()) {
XLOG(DBG4) << "onTrackEvicted: pinned/missing subscriber, skipping";
return;
}
fwd->removeSubscriber(
session,
PublishDone{RequestID(0), PublishDoneStatusCode::SUBSCRIPTION_ENDED, 0, "evicted"},
"onTrackEvicted"
);
};

if (mode() == Mode::LocalForwarder) {
// The subscriber lives on the per-thread local forwarder, not the registry's
// publisher forwarder; evict it on its owning exec.
folly::via(session->getExecutor(), [this, ftn, evict = std::move(evict)]() {
auto* localReg = tlForwarders_.get();
evict(localReg ? localReg->get(ftn) : nullptr);
});
return;
}
// Pass PublishDone so removeSubscriber calls trackConsumer->publishDone() before
// removing the subscriber. Passing nullopt would silently drop the subscriber
// without notifying the downstream session that its subscription ended.
forwarder->removeSubscriber(
session,
PublishDone{RequestID(0), PublishDoneStatusCode::SUBSCRIPTION_ENDED, 0, "evicted"},
"onTrackEvicted"
);
evict(registry_.getForwarder(ftn));
}

void MoqxRelay::dumpState(RelayStateVisitor& visitor) const {
Expand Down
Loading