feat(api): XIP-83 d14n binding — bidirectional Subscribe on QueryApi#2020
Conversation
Summary by OctaneNew Contracts
Updated Contracts
🔗 Commit Hash: c44b65c |
| m.l.topicsMu.Lock() | ||
| defer m.l.topicsMu.Unlock() | ||
| if m.l.closed { | ||
| return | ||
| } | ||
| m.worker.topicListeners.addListener(keys, m.l) | ||
| for k := range keys { | ||
| m.l.topics[k] = struct{}{} | ||
| } | ||
| } |
There was a problem hiding this comment.
🔴 Critical message/mutable_subscription.go:47
mutableSubscription.addTopics checks m.l.closed while holding topicsMu, but subscribeWorker.closeListener sets l.closed = true without taking that lock. A concurrent mutate can pass the stale closed check and re-add the listener to topicListeners after its channel is closed, causing dispatchToListeners to panic on send.
func (m *mutableSubscription) addTopics(keys map[string]struct{}) {
if len(keys) == 0 {
return
}
- m.l.topicsMu.Lock()
- defer m.l.topicsMu.Unlock()
- if m.l.closed {
+ m.worker.topicListeners.mu.RLock()
+ defer m.worker.topicListeners.mu.RUnlock()
+ m.l.topicsMu.Lock()
+ defer m.l.topicsMu.Unlock()
+ if m.l.closed {
return
}
m.worker.topicListeners.addListener(keys, m.l)🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/mutable_subscription.go around lines 47-56:
`mutableSubscription.addTopics` checks `m.l.closed` while holding `topicsMu`, but `subscribeWorker.closeListener` sets `l.closed = true` without taking that lock. A concurrent mutate can pass the stale `closed` check and re-add the listener to `topicListeners` after its channel is closed, causing `dispatchToListeners` to panic on send.
There was a problem hiding this comment.
Fixed. closeListener now sets l.closed under l.topicsMu, so a concurrent mutableSubscription.addTopics observes it atomically with its addListener call and can no longer re-register a listener after its channel is closed. Added a -race regression test (TestMutableSubscriptionAddRaceWithClose) that hammers addTopics against the reap.
| wire := make([][]byte, 0, len(w.topics)) | ||
| for _, t := range w.topics { | ||
| if w.historyOnly { | ||
| wire = append(wire, t.wire) | ||
| continue | ||
| } |
There was a problem hiding this comment.
🟡 Medium message/subscribe.go:368
handleCatchUp appends history_only topics to wire at line 371 and emits TopicsLive for them at line 392, but these topics are never added to liveTopics so no live envelopes will actually arrive. A client receiving the TopicsLive frame incorrectly believes these topics are now live. Consider skipping the TopicsLive announcement entirely for history_only waves.
wire := make([][]byte, 0, len(w.topics))
for _, t := range w.topics {
if w.historyOnly {
- wire = append(wire, t.wire)
continue
}🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @pkg/api/message/subscribe.go around lines 368-373:
`handleCatchUp` appends `history_only` topics to `wire` at line 371 and emits `TopicsLive` for them at line 392, but these topics are never added to `liveTopics` so no live envelopes will actually arrive. A client receiving the `TopicsLive` frame incorrectly believes these topics are now live. Consider skipping the `TopicsLive` announcement entirely for `history_only` waves.
There was a problem hiding this comment.
Working as specified, leaving as-is. The proto contract for the history_only field explicitly mandates these markers — it says the adds are caught up via "history, TopicsLive markers, and the wave's CatchupComplete" — and TopicsLive is documented informational-only ("delivery correctness (no duplicates, no gaps) never depends on it"). A client that opted into history_only interprets TopicsLive as the "everything as of now" boundary, not "a live tail follows"; it learns there is no live tail because it sent history_only (and typically half-closes). Suppressing the marker here would make this binding diverge from the field contract and from the v3 MlsApi binding.
ApprovabilityVerdict: Needs human review 3 blocking correctness issues found. This PR introduces a significant new bidirectional Subscribe RPC feature (~1500 lines) with complex concurrency patterns. Additionally, there are unresolved review comments including a critical race condition that could cause panics, and the author does not own any of the modified files. You can customize Macroscope's approvability policy. Learn more. |
Overview
Detailed findings
|
c44b65c to
c8131f9
Compare
|
Pushed an update addressing the Macroscope findings (3 fixed, 1 working-as-specified per the Beyond those, I did a deeper self-review of the concurrent paths and hardened several edge cases, each with a regression test:
|
c8131f9 to
a58bba6
Compare
Response to the Octane security reviewThanks — these were valuable. Note Octane scanned commit ✅ Fixed in this push (in direct response to Octane + Macroscope)
✅ Already fixed in the current commit (Octane scanned the pre-fix
|
…lace topic add/remove Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
a58bba6 to
eccc068
Compare
Handler state-management redesigned to make the recurring bug-classes structurally impossibleAcross the prior review rounds (Macroscope ×3 + a deep multi-agent pass), the findings kept clustering into the same two structural causes rather than independent bugs. Instead of patching symptoms again, I restructured the handler so those classes can't be expressed: 1. Per-topic state was four parallel maps ( 2. The sender result was a All previously-flagged fixes are folded in: ReviewTwo independent multi-agent review rounds (5 dimensions each; every finding adversarially verified by 3 skeptics). The
Build clean, tests race-clean, golangci-lint v2.10.1: 0 issues. Note: the line anchors on the earlier inline comments have moved with the rewrite — replies inline. |
…4 protos Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…for tests Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
eccc068 to
c44972f
Compare
XIP-83 d14n binding — bidirectional
QueryApi.SubscribeImplements the decentralized (d14n) binding of XIP-83 mutable subscription streams on xmtpd. A single long-lived bidirectional stream lets a client mutate its topic set in place (add/remove subscriptions) and keep the connection alive with ping/pong — instead of tearing down and reopening a stream every time group membership changes.
This is the xmtpd counterpart to the node-go v3
MlsApi.Subscribebinding. Same control protocol (Mutate/Started/CatchupComplete/TopicsLive/Ping/Pong); the d14n binding deliversOriginatorEnvelopes and uses per-originator vector cursors.Design: gated async catch-up
advanceTopicCursors). Anything that lands during catch-up is either in the snapshot or arrives live; the dedup drops the overlap.maxActiveSubscribeTopics) — ~16–32MB for a consumer that large, deliberately favored over forcing clients into multiple connections (which would invite rate-limiting and wreck some topologies).Ping; the receiver mustPongthe nonce. No pong within the deadline → the node reaps the vanished peer (e.g. a mobile client the OS suspended behind a proxy that still ACKs the transport).Commits
feat(api): add mutableSubscription handle to subscribeWorker— in-place topic add/remove on the existing subscribe worker, guarded bytopicsMu.feat(api): XIP-83 bidirectional Subscribe handler on QueryApi + xmtpv4 protos— the handler (subscribe.go) + regenerated xmtpv4 protos.test(api): Subscribe handler tests + configurable keepalive interval for tests— 5 race-tested scenarios + a test-only knob to shorten the keepalive cadence.Tests
go test -race ./pkg/api/message— all green:TestSubscribe_CatchUpThenLiveTestSubscribe_MutateRemoveStopsDeliveryTestSubscribe_HalfCloseHistoryOnlyDrainsTestSubscribe_HistoryOnlyOnLiveRejectedTestSubscribe_NoPongIsReapedDependency & merge ordering
Built on xmtp/proto#338 (
QueryApi.Subscribe). The committed.pb.gohere is generated from that branch viadev/gen/protos; once #338 merges, regenerating from protomainproduces byte-identical output.🤖 Generated with Claude Code
Note
Add bidirectional Subscribe RPC to QueryApi with mutable topic membership and catch-up history
Subscribebidi-streaming RPC inpkg/api/message/subscribe.go: clients sendMutateframes to add/remove topics at runtime; the server delivers history catch-up,TopicsLive, andCatchupCompletemarkers per mutate, then routes live envelopes.mutableSubscriptioninpkg/api/message/mutable_subscription.goso topic registration is updated in-place without reconnecting; add/remove are idempotent and safe under concurrent worker reaping.subscribeSessionstate machine managing ping/pong keepalive, backpressure-bounded sending, pending-byte caps, in-flight wave limits, and graceful half-close drain.closeListenerinpkg/api/message/subscribe_worker.goto set the closed flag undertopicsMubefore channel close, preventing re-registration races and send-on-closed-channel bugs.SubscribeRequest/SubscribeResponseand related message types acrosspkg/proto/.Macroscope summarized c44972f.