Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions proto/mls/api/v1/mls.proto
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ service MlsApi {
};
}

// Bidirectional subscription (XIP-83). One long-lived stream the client mutates
// in place via add/remove topic deltas, with WebSocket-style liveness ping/pong.
// A single stream MAY carry both group-message and welcome topics.
// gRPC-only: bidirectional streaming has no HTTP/grpc-gateway mapping.
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) {}

rpc BatchPublishCommitLog(BatchPublishCommitLogRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/mls/v1/batch-publish-commit-log"
Expand Down Expand Up @@ -404,6 +410,149 @@ message SubscribeWelcomeMessagesRequest {
repeated Filter filters = 1;
}

// --- XIP-83: bidirectional mutable subscription with liveness ----------------
//
// A single long-lived `Subscribe` stream replaces repeated server-streaming
// subscriptions. The client mutates its subscribed set in place (no reconnect on
// membership change), and the two sides exchange a WebSocket-style ping/pong so
// the client detects silent stream death and the node reaps a vanished peer.
//
// Subscriptions are expressed as kind-prefixed binary topics (XIP-49 §3.3.2):
// a leading kind byte plus the identifier, the same representation the
// decentralized backend uses, so one topic format spans both backends and a
// single stream can carry several topic kinds (group messages, welcomes).
// Request and response are wrapped in `oneof version` (cf. GroupMessage /
// WelcomeMessage) to leave room for future revisions. The versions are pinned
// per stream: a stream whose requests are V1 receives only V1 responses, so a
// client never has to handle a response version it did not speak first.

// Client -> server. Sent one or more times over the life of the stream.
message SubscribeRequest {
oneof version {
V1 v1 = 1;
}

message V1 {
// Each frame is exactly one of: a mutation, a Ping, or a Pong.
oneof request {
Mutate mutate = 1;
Ping ping = 2; // liveness challenge (e.g. probe the link after resuming)
Pong pong = 3; // answer to a server Ping
}

// Add and/or remove subscriptions in place (applied atomically per frame).
// Topics use the kind-prefixed binary representation shared with the
// decentralized backend (XIP-49 §3.3.2): the first byte is the topic kind,
// the remainder is the identifier. This RPC initially serves
// TOPIC_KIND_GROUP_MESSAGES_V1 (0x00, identifier = group_id) and
// TOPIC_KIND_WELCOME_MESSAGES_V1 (0x01, identifier = installation_key);
// a topic whose kind the node does not serve fails the stream with
// INVALID_ARGUMENT. Future kinds (key packages, identity updates) are
// adopted via the capabilities advertised on Started.
message Mutate {
repeated Subscription adds = 1; // begin delivering these topics
repeated bytes removes = 2; // topics to stop delivering

// Catch this Mutate's adds up to the live edge — history, TopicsLive
// markers, and the wave's CatchupComplete — but do NOT register them
// for live delivery. The markers then mean "you have everything as of
// now". Combined with half-closing the request stream, this is the
// bounded catch-up ("sync") mode: the server finishes the wave and then
// closes the stream itself. Removals in the Mutate are unaffected.
bool history_only = 3;

// Client-chosen correlation id, echoed on this wave's CatchupComplete
// so completions are attributable when waves overlap. SHOULD be unique
// per stream; 0 = no correlation requested (still echoed as 0).
uint64 mutate_id = 4;

// A topic to subscribe, with the cursor to resume from.
message Subscription {
bytes topic = 1;
// Deliver ids greater than this; 0 = from the beginning. For a newly
// joined group, a client SHOULD seed this from the welcome's encrypted
// WelcomeMetadata.message_cursor so a new membership does not refetch
// pre-join history it cannot decrypt; for a new installation's welcome
// topic, 0 is how pending welcomes are collected.
uint64 id_cursor = 2;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not want this to be a multi-cursor to make it easier to migrate to V4?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's something that each client can handle since we have a v3/d14n separation already.

}
}
}
}

// Server -> client.
message SubscribeResponse {
oneof version {
V1 v1 = 1;
}

message V1 {
oneof response {
Messages messages = 1;
Started started = 2; // sent once, immediately on open, before any catch-up
Ping ping = 3; // idle liveness challenge; receiver MUST answer with Pong
Pong pong = 4; // answer to a client Ping
TopicsLive topics_live = 5; // these topics just crossed from catch-up to live
CatchupComplete catchup_complete = 6; // a Mutate's adds are fully delivered
}

// A batch of new messages; group and welcome messages share the stream,
// depending on which subscriptions are active.
message Messages {
repeated GroupMessage group_messages = 1;
repeated WelcomeMessage welcome_messages = 2;
}

// The first frame on every stream.
message Started {
// The server's ping cadence (ms): the basis for the client's staleness
// threshold and the server's reap deadline.
uint32 keepalive_interval_ms = 1;
// Optional protocol features the node supports on this stream. The node
// silently ignores request types it does not understand, so a client
// MUST NOT send an optional request type whose capability the node did
// not advertise (it would hang waiting on a response that never comes).
repeated Capability capabilities = 2;
}

// Sent once per Mutate that adds subscriptions (a catch-up "wave"), after
// the wave's last TopicsLive: everything the Mutate asked for is delivered.
message CatchupComplete {
uint64 mutate_id = 1; // echoes the Mutate that started this wave (0 if none given)
}

// Emitted when topics finish catch-up, AFTER the last history frame for
// them — including any live messages that queued up behind the catch-up,
// which were equally historical from the client's perspective — so every
// later frame for a listed topic is live tail. Informational only: delivery
// correctness (no duplicates, no gaps) never depends on it. Re-adding a
// topic re-runs catch-up and re-emits it; receivers treat it idempotently.
message TopicsLive {
repeated bytes topics = 1; // kind-prefixed topics now tailing live
}

// Optional per-stream protocol features (none defined yet; future
// revisions add values, e.g. fetch-over-stream lookups answered with the
// same read view that feeds the stream, or new streamable topic kinds).
enum Capability {
CAPABILITY_UNSPECIFIED = 0;
}
}
}

// Liveness challenge/response, shared across versions. Either peer MAY send a
// Ping; the receiver MUST reply with a Pong echoing the nonce. The sender closes
// the stream if no Pong arrives within its deadline — how a node reaps a vanished
// peer (e.g. a mobile client the OS suspended behind a proxy that still ACKs the
// transport).
message Ping {
uint64 nonce = 1;
}

message Pong {
uint64 nonce = 1; // echoes the nonce of the Ping it answers
}

message BatchPublishCommitLogRequest {
repeated PublishCommitLogRequest requests = 1;
}
Expand Down
Loading