Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions proto/xmtpv4/message_api/message_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,140 @@ message SubscribeEnvelopesResponse {
repeated xmtp.xmtpv4.envelopes.OriginatorEnvelope envelopes = 1;
}

// ---- XIP-83 bidirectional mutable subscription (d14n binding) ----
//
// QueryApi.Subscribe is the bidirectional evolution of SubscribeTopics: one
// long-lived stream the client mutates in place (no reconnect on group
// join/leave) with a WebSocket-style ping/pong so silent stream death is
// detected on both ends. It mirrors the v3 MlsApi.Subscribe control protocol
// (mls/api/v1/mls.proto), adapted to the decentralized data model: each
// subscription resumes from a per-originator vector Cursor, and delivery is the
// unified OriginatorEnvelope stream rather than typed group/welcome messages
// (the client demuxes by each envelope's target topic). SubscribeTopics remains
// the server-streaming, immutable ancestor for clients that cannot do bidi
// (grpc-web / connect-web). Request and response are wrapped in `oneof version`
// and pinned per stream: V1 requests receive only V1 responses.

// Client -> node. Sent one or more times over the life of the stream.
message SubscribeRequest {
oneof version {
V1 v1 = 1;
}

message V1 {
// Each frame is exactly one of: a mutation, a Ping, or a Pong.
oneof request {
Mutate mutate = 1;
Ping ping = 2; // liveness challenge (e.g. probe the link after resuming)
Pong pong = 3; // answer to a node Ping
}

// Add and/or remove subscriptions in place (applied atomically per frame).
// Topics use the kind-prefixed binary representation (XIP-49 §3.3.2): the
// first byte is the topic kind, the remainder is the identifier. A topic
// whose kind the node does not serve fails the stream with INVALID_ARGUMENT.
message Mutate {
repeated Subscription adds = 1; // begin delivering these topics
repeated bytes removes = 2; // topics to stop delivering

// Catch this Mutate's adds up to the live edge — history, TopicsLive
// markers, and the wave's CatchupComplete — but do NOT register them for
// live delivery. The markers then mean "you have everything as of now".
// Combined with half-closing the request stream, this is the bounded
// catch-up ("sync") mode: the node finishes the wave then closes the
// stream itself. Removals in the Mutate are unaffected.
bool history_only = 3;

// Client-chosen correlation id, echoed on this wave's CatchupComplete so
// completions are attributable when waves overlap. SHOULD be unique per
// stream; 0 = no correlation requested (still echoed as 0).
uint64 mutate_id = 4;

// A topic to subscribe, with the vector cursor to resume from.
message Subscription {
bytes topic = 1;
// Resume point: deliver envelopes beyond this per-originator vector
// cursor. Omitted/empty = from the beginning. Originators absent from
// the cursor map are treated as sequence 0 (the node fills them in),
// mirroring SubscribeTopics.TopicFilter.last_seen.
xmtp.xmtpv4.envelopes.Cursor last_seen = 2;
}
}
}
}

// Node -> client.
message SubscribeResponse {
oneof version {
V1 v1 = 1;
}

message V1 {
oneof response {
Envelopes envelopes = 1;
Started started = 2; // sent once, immediately on open, before any catch-up
Ping ping = 3; // idle liveness challenge; receiver MUST answer with Pong
Pong pong = 4; // answer to a client Ping
TopicsLive topics_live = 5; // these topics just crossed from catch-up to live
CatchupComplete catchup_complete = 6; // a Mutate's adds are fully delivered
}

// A batch of envelopes across the active subscriptions; the client demuxes
// by each envelope's target topic.
message Envelopes {
repeated xmtp.xmtpv4.envelopes.OriginatorEnvelope envelopes = 1;
}

// The first frame on every stream.
message Started {
// The node's ping cadence (ms): the basis for the client's staleness
// threshold and the node's reap deadline.
uint32 keepalive_interval_ms = 1;
// Optional protocol features the node supports on this stream. The node
// silently ignores request types it does not understand, so a client MUST
// NOT send an optional request type whose capability the node did not
// advertise (it would hang waiting on a response that never comes).
repeated Capability capabilities = 2;
}

// Sent once per Mutate that adds subscriptions (a catch-up "wave"), after
// the wave's last TopicsLive: everything the Mutate asked for is delivered.
message CatchupComplete {
uint64 mutate_id = 1; // echoes the Mutate that started this wave (0 if none given)
}

// Emitted when topics finish catch-up, AFTER the last history frame for
// them — including any live envelopes that queued behind the catch-up,
// which were equally historical from the client's perspective — so every
// later frame for a listed topic is live tail. Informational only: delivery
// correctness (no duplicates, no gaps) never depends on it. Re-adding a
// topic re-runs catch-up and re-emits it; receivers treat it idempotently.
message TopicsLive {
repeated bytes topics = 1; // kind-prefixed topics now tailing live
}

// Optional per-stream protocol features (none defined yet; future revisions
// add values, e.g. fetch-over-stream lookups answered with the same read
// view that feeds the stream, or new streamable topic kinds).
enum Capability {
CAPABILITY_UNSPECIFIED = 0;
}
}
}

// Liveness challenge/response for Subscribe, shared across versions. Either peer
// MAY send a Ping; the receiver MUST reply with a Pong echoing the nonce. The
// sender closes the stream if no Pong arrives within its deadline — how a node
// reaps a vanished peer (e.g. a mobile client the OS suspended behind a proxy
// that still ACKs the transport).
message Ping {
uint64 nonce = 1;
}

message Pong {
uint64 nonce = 1; // echoes the nonce of the Ping it answers
}

// Batch subscribe to all envelopes
message SubscribeAllEnvelopesRequest {}

Expand Down
7 changes: 7 additions & 0 deletions proto/xmtpv4/message_api/query_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ service QueryApi {
rpc SubscribeTopics(SubscribeTopicsRequest)
returns (stream SubscribeTopicsResponse) {}

// XIP-83 bidirectional mutable subscription: a single long-lived stream the
// client mutates in place (add/remove topics) with ping/pong liveness, in
// contrast to SubscribeTopics' fixed, immutable, server-streaming filter set.
// Bidi streaming requires HTTP/2 (not grpc-web / connect-web); browser
// clients stay on SubscribeTopics.
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) {}

rpc GetInboxIds(GetInboxIdsRequest) returns (GetInboxIdsResponse) {}

rpc GetNewestEnvelope(GetNewestEnvelopeRequest)
Expand Down
Loading