diff --git a/src/MoqxRelay.cpp b/src/MoqxRelay.cpp index 2a6bdf3f..7430ec02 100644 --- a/src/MoqxRelay.cpp +++ b/src/MoqxRelay.cpp @@ -1110,9 +1110,22 @@ folly::coro::Task MoqxRelay::addSubscriberAndPublishViaLocalForwarder( } } + // Capture largest/extensions on publisherExec; reading them on subscriberExec would + // race the publisher advancing largest_. + std::optional seedLargest; + Extensions seedExtensions; + co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(publisherExec), + [&]() -> folly::coro::Task { + seedLargest = publisherFwd->largest(); + seedExtensions = publisherFwd->extensions(); + co_return; + }() + ); + auto [localFwd, isNew, localReg] = acquireLocalForwarder(ftn, [&] { - auto fwd = std::make_shared(ftn, publisherFwd->largest()); - fwd->setExtensions(publisherFwd->extensions()); + auto fwd = std::make_shared(ftn, seedLargest); + fwd->setExtensions(seedExtensions); return fwd; }); @@ -2466,23 +2479,34 @@ void MoqxRelay::onTrackEvicted(const FullTrackName& ftn, std::shared_ptrgetSubscriber(session.get()); - if (!sub || sub->isPinned()) { - XLOG(DBG4) << "onTrackEvicted: pinned subscriber, skipping"; + // PublishDone makes removeSubscriber notify the downstream via publishDone() rather + // than silently dropping it. + auto evict = [session](const std::shared_ptr& fwd) { + if (!fwd) { + return; + } + auto sub = fwd->getSubscriber(session.get()); + if (!sub || sub->isPinned()) { + XLOG(DBG4) << "onTrackEvicted: pinned/missing subscriber, skipping"; + return; + } + fwd->removeSubscriber( + session, + PublishDone{RequestID(0), PublishDoneStatusCode::SUBSCRIPTION_ENDED, 0, "evicted"}, + "onTrackEvicted" + ); + }; + + if (mode() == Mode::LocalForwarder) { + // The subscriber lives on the per-thread local forwarder, not the registry's + // publisher forwarder; evict it on its owning exec. + folly::via(session->getExecutor(), [this, ftn, evict = std::move(evict)]() { + auto* localReg = tlForwarders_.get(); + evict(localReg ? localReg->get(ftn) : nullptr); + }); return; } - // Pass PublishDone so removeSubscriber calls trackConsumer->publishDone() before - // removing the subscriber. Passing nullopt would silently drop the subscriber - // without notifying the downstream session that its subscription ended. - forwarder->removeSubscriber( - session, - PublishDone{RequestID(0), PublishDoneStatusCode::SUBSCRIPTION_ENDED, 0, "evicted"}, - "onTrackEvicted" - ); + evict(registry_.getForwarder(ftn)); } void MoqxRelay::dumpState(RelayStateVisitor& visitor) const {