From 6ccd5781b0c851faaa143dc37b0cfedf20f1cb19 Mon Sep 17 00:00:00 2001 From: Tyler Hawkes Date: Tue, 16 Jun 2026 14:32:57 -0600 Subject: [PATCH] =?UTF-8?q?xmtp=5Fproto:=20XIP-83=20d14n=20binding=20?= =?UTF-8?q?=E2=80=94=20QueryApi.Subscribe=20(bidi=20mutable=20subscription?= =?UTF-8?q?;=20vector-cursor=20topics;=20OriginatorEnvelope=20delivery;=20?= =?UTF-8?q?mutate=5Fid=20waves;=20Started/CatchupComplete/TopicsLive;=20pi?= =?UTF-8?q?ng/pong;=20history=5Fonly)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Fable 5 --- proto/xmtpv4/message_api/message_api.proto | 134 +++++++++++++++++++++ proto/xmtpv4/message_api/query_api.proto | 7 ++ 2 files changed, 141 insertions(+) diff --git a/proto/xmtpv4/message_api/message_api.proto b/proto/xmtpv4/message_api/message_api.proto index f16d3baf..0a67dffd 100644 --- a/proto/xmtpv4/message_api/message_api.proto +++ b/proto/xmtpv4/message_api/message_api.proto @@ -61,6 +61,140 @@ message SubscribeEnvelopesResponse { repeated xmtp.xmtpv4.envelopes.OriginatorEnvelope envelopes = 1; } +// ---- XIP-83 bidirectional mutable subscription (d14n binding) ---- +// +// QueryApi.Subscribe is the bidirectional evolution of SubscribeTopics: one +// long-lived stream the client mutates in place (no reconnect on group +// join/leave) with a WebSocket-style ping/pong so silent stream death is +// detected on both ends. It mirrors the v3 MlsApi.Subscribe control protocol +// (mls/api/v1/mls.proto), adapted to the decentralized data model: each +// subscription resumes from a per-originator vector Cursor, and delivery is the +// unified OriginatorEnvelope stream rather than typed group/welcome messages +// (the client demuxes by each envelope's target topic). SubscribeTopics remains +// the server-streaming, immutable ancestor for clients that cannot do bidi +// (grpc-web / connect-web). Request and response are wrapped in `oneof version` +// and pinned per stream: V1 requests receive only V1 responses. + +// Client -> node. Sent one or more times over the life of the stream. +message SubscribeRequest { + oneof version { + V1 v1 = 1; + } + + message V1 { + // Each frame is exactly one of: a mutation, a Ping, or a Pong. + oneof request { + Mutate mutate = 1; + Ping ping = 2; // liveness challenge (e.g. probe the link after resuming) + Pong pong = 3; // answer to a node Ping + } + + // Add and/or remove subscriptions in place (applied atomically per frame). + // Topics use the kind-prefixed binary representation (XIP-49 §3.3.2): the + // first byte is the topic kind, the remainder is the identifier. A topic + // whose kind the node does not serve fails the stream with INVALID_ARGUMENT. + message Mutate { + repeated Subscription adds = 1; // begin delivering these topics + repeated bytes removes = 2; // topics to stop delivering + + // Catch this Mutate's adds up to the live edge — history, TopicsLive + // markers, and the wave's CatchupComplete — but do NOT register them for + // live delivery. The markers then mean "you have everything as of now". + // Combined with half-closing the request stream, this is the bounded + // catch-up ("sync") mode: the node finishes the wave then closes the + // stream itself. Removals in the Mutate are unaffected. + bool history_only = 3; + + // Client-chosen correlation id, echoed on this wave's CatchupComplete so + // completions are attributable when waves overlap. SHOULD be unique per + // stream; 0 = no correlation requested (still echoed as 0). + uint64 mutate_id = 4; + + // A topic to subscribe, with the vector cursor to resume from. + message Subscription { + bytes topic = 1; + // Resume point: deliver envelopes beyond this per-originator vector + // cursor. Omitted/empty = from the beginning. Originators absent from + // the cursor map are treated as sequence 0 (the node fills them in), + // mirroring SubscribeTopics.TopicFilter.last_seen. + xmtp.xmtpv4.envelopes.Cursor last_seen = 2; + } + } + } +} + +// Node -> client. +message SubscribeResponse { + oneof version { + V1 v1 = 1; + } + + message V1 { + oneof response { + Envelopes envelopes = 1; + Started started = 2; // sent once, immediately on open, before any catch-up + Ping ping = 3; // idle liveness challenge; receiver MUST answer with Pong + Pong pong = 4; // answer to a client Ping + TopicsLive topics_live = 5; // these topics just crossed from catch-up to live + CatchupComplete catchup_complete = 6; // a Mutate's adds are fully delivered + } + + // A batch of envelopes across the active subscriptions; the client demuxes + // by each envelope's target topic. + message Envelopes { + repeated xmtp.xmtpv4.envelopes.OriginatorEnvelope envelopes = 1; + } + + // The first frame on every stream. + message Started { + // The node's ping cadence (ms): the basis for the client's staleness + // threshold and the node's reap deadline. + uint32 keepalive_interval_ms = 1; + // Optional protocol features the node supports on this stream. The node + // silently ignores request types it does not understand, so a client MUST + // NOT send an optional request type whose capability the node did not + // advertise (it would hang waiting on a response that never comes). + repeated Capability capabilities = 2; + } + + // Sent once per Mutate that adds subscriptions (a catch-up "wave"), after + // the wave's last TopicsLive: everything the Mutate asked for is delivered. + message CatchupComplete { + uint64 mutate_id = 1; // echoes the Mutate that started this wave (0 if none given) + } + + // Emitted when topics finish catch-up, AFTER the last history frame for + // them — including any live envelopes that queued behind the catch-up, + // which were equally historical from the client's perspective — so every + // later frame for a listed topic is live tail. Informational only: delivery + // correctness (no duplicates, no gaps) never depends on it. Re-adding a + // topic re-runs catch-up and re-emits it; receivers treat it idempotently. + message TopicsLive { + repeated bytes topics = 1; // kind-prefixed topics now tailing live + } + + // Optional per-stream protocol features (none defined yet; future revisions + // add values, e.g. fetch-over-stream lookups answered with the same read + // view that feeds the stream, or new streamable topic kinds). + enum Capability { + CAPABILITY_UNSPECIFIED = 0; + } + } +} + +// Liveness challenge/response for Subscribe, shared across versions. Either peer +// MAY send a Ping; the receiver MUST reply with a Pong echoing the nonce. The +// sender closes the stream if no Pong arrives within its deadline — how a node +// reaps a vanished peer (e.g. a mobile client the OS suspended behind a proxy +// that still ACKs the transport). +message Ping { + uint64 nonce = 1; +} + +message Pong { + uint64 nonce = 1; // echoes the nonce of the Ping it answers +} + // Batch subscribe to all envelopes message SubscribeAllEnvelopesRequest {} diff --git a/proto/xmtpv4/message_api/query_api.proto b/proto/xmtpv4/message_api/query_api.proto index 6e166b5e..0db72693 100644 --- a/proto/xmtpv4/message_api/query_api.proto +++ b/proto/xmtpv4/message_api/query_api.proto @@ -14,6 +14,13 @@ service QueryApi { rpc SubscribeTopics(SubscribeTopicsRequest) returns (stream SubscribeTopicsResponse) {} + // XIP-83 bidirectional mutable subscription: a single long-lived stream the + // client mutates in place (add/remove topics) with ping/pong liveness, in + // contrast to SubscribeTopics' fixed, immutable, server-streaming filter set. + // Bidi streaming requires HTTP/2 (not grpc-web / connect-web); browser + // clients stay on SubscribeTopics. + rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) {} + rpc GetInboxIds(GetInboxIdsRequest) returns (GetInboxIdsResponse) {} rpc GetNewestEnvelope(GetNewestEnvelopeRequest)