Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@ All notable changes to this project will be documented in this file.

Each entry lists the date and the crate versions that were released.

## 2026-05-29 — mqdb-core 0.7.3, mqdb-agent 0.8.7

### Fixed

- Optimistic-lock (CAS) conflicts on `Database::update`/`delete` could stall single-process embedded callers and, on the persistent backend, silently drop writes. `update` reads a record, merges fields across several `.await` points (schema, constraint, index locks), then commits under a compare-and-swap on the originally-read bytes; two concurrent writers to the same key both read version N, the first commits, and the second's swap failed with `Error::Conflict`. `update` is now a thin wrapper over `try_update_once` that retries on `Error::Conflict` (bounded at 32 attempts), re-reading the latest committed value and re-applying the partial-field merge each time, so concurrent same-process updates converge to a field-level last-writer-wins instead of erroring. Retry is skipped when `update` carries precomputed vault-constraint plaintext (`update_constraint_data`), which was derived upstream against a now-stale read and cannot be re-merged here; those still surface `Error::Conflict` for the caller to retry. The typed `Error::Conflict` variant is unchanged and remains the backstop after attempts are exhausted.
- `Database::delete` left stale secondary-index entries under concurrent load (the `index pointed to non-existent entity` warning). `delete` had no compare-and-swap, so a delete that read an older version committed unconditionally; if a concurrent `update` had already changed an indexed field and committed first, the delete removed only the old index key plus the data row and stranded the new index key. `delete` now CAS-guards the primary entity and retries, re-reading the fresh record so it removes the current index keys. (Cascade/set-null child rows are not individually CAS-guarded; the reported case was the primary entity.)
- `mqdb-core` fjall backend: the batch `commit` checked CAS preconditions against a snapshot and then applied the writes in a separate batch — a check-then-act that was not atomic. Under concurrency two commits could both pass their precondition check and both apply, producing a silent lost update with no `Error::Conflict` and, via the same race, stale index entries. The in-memory backend was already correct (its data write lock spans check and apply); the encrypted backend delegates the CAS to its inner backend. The fjall `commit` now serializes the precondition check and apply under a per-backend lock (the fsync stays outside the lock, so durability cost is unchanged). A before/after benchmark (agent mode, `--durability none`, concurrency=1, 20k ops/run) showed no write-throughput regression: insert ~5.5k ops/s and update ~6.9k ops/s in both builds, within run-to-run noise. New `mqdb-agent` concurrency tests cover update convergence and delete/index consistency.

## 2026-05-28 — mqdb-agent 0.8.7, mqdb-cli 0.8.8

### Added

- Recipient-scoped event confidentiality (diagram sharing phase 3, #78), behind a new opt-in `--scoped-events` flag (env `MQDB_SCOPED_EVENTS`, default off). Until now every authenticated subscriber to `$DB/{entity}/events/#` received all change events, including for records they cannot read. With the flag on, change events for ownership-enabled and derived (child) entities are published once per recipient to `$DB/u/{recipient}/events/{entity}/{id}`, where recipients are the resource owner plus its share grantees (child events resolve recipients through the parent diagram). Recipients are resolved at write time and travel with the change event, so cascade deletes still reach the owner and grantees after the parent record and its shares are removed. Global entities (no ownership/derivation) keep the single broadcast publish, unchanged. The broker enforces that a user may only subscribe to their own `$DB/u/{me}/events/#` namespace (admins may read any; only the internal event service may publish there); the legacy `$DB/+/events/#` topic is unaffected because owned-entity events are no longer published to it. Clients must subscribe to `$DB/u/{me}/events/#` instead of `$DB/{entity}/events/#` — a breaking change gated behind the flag. Authorization core verified in TLA+ (`specs/DiagramSharing.tla`: `InvEventConfidentiality` and `InvEventCompleteness` hold across 52,488 states, including reparented child states). Agent mode only; cluster parity tracked in #75; public/anonymous event topics tracked with phase 4 (#79).

## 2026-05-28 — mqdb-core 0.7.2, mqdb-agent 0.8.6, mqdb-cli 0.8.7

### Added
Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ An owner can share any ownership-enabled entity (the motivating case is diagrams

**Child-entity derivation.** Child records (e.g. a diagram's nodes/edges) can inherit access from their parent via `--ownership-derive` (env `MQDB_OWNERSHIP_DERIVE`), a comma-separated map of `child=fk_field>parent_entity` (e.g. `nodes=diagramId>diagrams,edges=diagramId>diagrams`). A derived child's read requires `view` on the parent and create/update/delete require `edit`; the parent reference is immutable on update, so an editor cannot move a child into a diagram they cannot edit. Without a mapping a child entity is unrestricted (default), so derivation is opt-in per deployment.

**Event confidentiality.** By default change events broadcast on `$DB/{entity}/events/#` to every authenticated subscriber. Enabling `--scoped-events` (env `MQDB_SCOPED_EVENTS`) routes events for ownership-enabled and derived entities to per-recipient topics `$DB/u/{recipient}/events/{entity}/{id}` — the owner plus its share grantees (children resolve recipients through the parent). The broker only lets a user subscribe to their own `$DB/u/{me}/events/#`. Global entities keep the broadcast topic. **This is a breaking change for subscribers** (subscribe to `$DB/u/{me}/events/#` instead of `$DB/{entity}/events/#`), so it is opt-in; enable it on the broker and the client together.

#### Admin Operations

| Topic | Action |
Expand Down
2 changes: 1 addition & 1 deletion crates/mqdb-agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqdb-agent"
version = "0.8.6"
version = "0.8.7"
edition.workspace = true
license = "Apache-2.0"
authors.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion crates/mqdb-agent/src/agent/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(super) struct AuthProviderConfig<'a> {
pub acl_file: Option<&'a std::path::Path>,
pub admin_users: &'a HashSet<String>,
pub allow_anonymous: bool,
pub scoped_events: bool,
}

impl MqdbAgent {
Expand Down Expand Up @@ -182,7 +183,8 @@ impl MqdbAgent {
let protected_provider =
TopicProtectionAuthProvider::new(current_provider, config.admin_users.clone())
.with_internal_service_username(config.service_username.cloned())
.with_all_users_admin(config.allow_anonymous && config.admin_users.is_empty());
.with_all_users_admin(config.allow_anonymous && config.admin_users.is_empty())
.with_scoped_events(config.scoped_events);
broker = broker.with_auth_provider(Arc::new(protected_provider));
if config.admin_users.is_empty() {
info!("topic protection enabled (no admin users configured)");
Expand Down
10 changes: 10 additions & 0 deletions crates/mqdb-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct MqdbAgent {
pub(super) http_config: std::sync::Mutex<Option<crate::http::HttpServerConfig>>,
pub(super) ownership_config: Arc<mqdb_core::types::OwnershipConfig>,
pub(super) scope_config: Arc<mqdb_core::types::ScopeConfig>,
pub(super) scoped_events: bool,
pub(super) vault_backend: Arc<dyn VaultBackend>,
#[cfg(feature = "http-api")]
pub(super) auth_rate_limiter: Arc<RateLimiter>,
Expand Down Expand Up @@ -74,6 +75,7 @@ impl MqdbAgent {
http_config: std::sync::Mutex::new(None),
ownership_config: Arc::new(mqdb_core::types::OwnershipConfig::default()),
scope_config: Arc::new(mqdb_core::types::ScopeConfig::default()),
scoped_events: false,
vault_backend: Arc::new(NoopVaultBackend),
#[cfg(feature = "http-api")]
auth_rate_limiter: Arc::new(RateLimiter::new(10)),
Expand Down Expand Up @@ -199,6 +201,12 @@ impl MqdbAgent {
self
}

#[must_use]
pub fn with_scoped_events(mut self, enabled: bool) -> Self {
self.scoped_events = enabled;
self
}

#[must_use]
pub fn with_license_expiry(mut self, expires_at: u64) -> Self {
self.license_expires_at = Some(expires_at);
Expand Down Expand Up @@ -258,6 +266,7 @@ impl MqdbAgent {
acl_file: self.auth_setup.acl_file.as_deref(),
admin_users: &admin_users,
allow_anonymous: self.auth_setup.allow_anonymous,
scoped_events: self.scoped_events,
},
)
.await?;
Expand Down Expand Up @@ -339,6 +348,7 @@ impl MqdbAgent {
acl_file: self.auth_setup.acl_file.as_deref(),
admin_users: &admin_users,
allow_anonymous: self.auth_setup.allow_anonymous,
scoped_events: self.scoped_events,
},
)
.await?;
Expand Down
91 changes: 79 additions & 12 deletions crates/mqdb-agent/src/agent/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ impl MqdbAgent {
let event_db = Arc::clone(&self.db);
let mut event_shutdown_rx = self.shutdown_tx.subscribe();
let num_partitions = self.db.num_partitions();
let scoped_events = self.scoped_events;
let ownership_config = Arc::clone(&self.ownership_config);

tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(200)).await;
Expand Down Expand Up @@ -191,7 +193,6 @@ impl MqdbAgent {
event = event_rx.recv() => {
match event {
Ok(change_event) => {
let topic = change_event.event_topic(num_partitions);
let client_id = change_event.client_id.clone();

let payload = match serde_json::to_vec(&change_event) {
Expand All @@ -202,18 +203,35 @@ impl MqdbAgent {
}
};

let mut options = mqtt5::types::PublishOptions {
qos: mqtt5::QoS::AtLeastOnce,
..Default::default()
let Some(topics) = event_publish_topics(
&event_db,
&ownership_config,
scoped_events,
num_partitions,
&change_event,
)
.await
else {
continue;
};
if let Some(ref cid) = client_id {
options.properties.user_properties.push((
"x-origin-client-id".to_string(),
cid.clone(),
));
}
if let Err(e) = client.publish_with_options(&topic, payload, options).await {
warn!("Failed to publish event: {e}");

for topic in topics {
let mut options = mqtt5::types::PublishOptions {
qos: mqtt5::QoS::AtLeastOnce,
..Default::default()
};
if let Some(ref cid) = client_id {
options.properties.user_properties.push((
"x-origin-client-id".to_string(),
cid.clone(),
));
}
if let Err(e) = client
.publish_with_options(&topic, payload.clone(), options)
.await
{
warn!("Failed to publish event: {e}");
}
}
}
Err(e) => {
Expand Down Expand Up @@ -280,3 +298,52 @@ impl MqdbAgent {
}))
}
}

async fn event_publish_topics(
db: &crate::database::Database,
ownership: &mqdb_core::types::OwnershipConfig,
scoped_events: bool,
num_partitions: u8,
event: &mqdb_core::events::ChangeEvent,
) -> Option<Vec<String>> {
if !scoped_events {
return Some(vec![event.event_topic(num_partitions)]);
}
let recipients = if let Some(precomputed) = event.recipients.clone() {
Some(precomputed)
} else {
match db
.event_recipients(ownership, &event.entity, &event.id, event.data.as_ref())
.await
{
Ok(recipients) => recipients,
Err(e) => {
warn!(
entity = %event.entity,
id = %event.id,
error = %e,
"failed to compute event recipients; dropping event"
);
return None;
}
}
};
match recipients {
Some(recipients) => {
if recipients.is_empty() {
debug!(
entity = %event.entity,
id = %event.id,
"scoped event has no recipients; dropping"
);
}
Some(
recipients
.iter()
.map(|r| format!("$DB/u/{r}/events/{}/{}", event.entity, event.id))
.collect(),
)
}
None => Some(vec![event.event_topic(num_partitions)]),
}
}
Loading