diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 316e19b..99a23d9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,3 +38,35 @@ jobs: - name: Run tests run: cargo test --workspace --exclude wasm-edge + + typecheck-react: + name: Typecheck @recached/react + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "20.x" + + - name: Generate SDK type declarations + run: | + cd wasm-edge && npm install + # wasm-pack isn't available in this job, so create a minimal stub that + # satisfies the TypeScript import so tsc can emit sdk.d.ts for the react step. + mkdir -p pkg + node -e " + require('fs').writeFileSync('pkg/recached_edge.d.ts', + 'export class RecachedCache {}\nexport default function init(): Promise;\n' + ); + " + npx tsc + + - name: Install dependencies + run: cd recached-react && npm install --legacy-peer-deps + + - name: Typecheck + run: cd recached-react && npm run typecheck diff --git a/.github/workflows/npm.yml b/.github/workflows/npm.yml index c135188..6256067 100644 --- a/.github/workflows/npm.yml +++ b/.github/workflows/npm.yml @@ -1,4 +1,4 @@ -name: Publish Wasm Package to NPM +name: Publish Packages to NPM on: push: @@ -9,7 +9,7 @@ env: FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true jobs: - build-and-publish: + publish-wasm: name: Build and publish recached-edge to NPM runs-on: ubuntu-latest @@ -59,3 +59,35 @@ jobs: env: NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} run: cd wasm-edge/pkg && npm publish --access public + + publish-react: + name: Build and publish @recached/react to NPM + runs-on: ubuntu-latest + needs: publish-wasm + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "20.x" + registry-url: "https://registry.npmjs.org" + + - name: Install dependencies + run: cd recached-react && npm install --legacy-peer-deps + + - name: Typecheck + run: cd recached-react && npm run typecheck + + - name: Build + run: cd recached-react && npm run build + + - name: Copy LICENSE into package + run: cp LICENSE.md recached-react/LICENSE.md + + - name: Publish to NPM + env: + NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }} + run: cd recached-react && npm publish --access public diff --git a/.gitignore b/.gitignore index c5b5c37..d1bd249 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ Thumbs.db # committing Cargo.lock is best practice to ensure reproducible builds! PLAN.md +NOTES.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 9907c3e..bd41436 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,45 @@ All notable changes to Recached are documented here. --- +## [0.1.4] — 2026-05-10 + +### Added + +**Snapshot persistence** (`server-native`) +- New `SAVE` command: blocks until the snapshot is written to disk and returns `+OK`. +- New `BGSAVE` command: spawns a background Tokio task to write the snapshot and immediately returns `+Background saving started`. The server continues accepting connections during the save. +- New `LASTSAVE` command: returns the Unix timestamp (seconds) of the most recent successful save as an integer. +- On startup, the server loads the snapshot from disk before accepting connections. Expired keys are silently skipped during restore. +- On clean shutdown (SIGTERM or Ctrl-C), a final snapshot is saved before the process exits. +- Periodic autosave runs every `RECACHED_SAVE_INTERVAL` seconds (default: 900 = 15 min). Set to `0` to disable autosave while keeping `SAVE`/`BGSAVE`/`LASTSAVE` available. +- Snapshot path is controlled by `RECACHED_SAVE_PATH` (default: `recached.rdb` in the working directory). +- Snapshot format: [MessagePack](https://msgpack.org/) via `rmp-serde`. Atomic write: data is written to a `.tmp` file then renamed, so a crash mid-save cannot corrupt the previous snapshot. +- All data types are preserved: strings, hashes, lists, sets, sorted sets, and TTLs. +- `Command::Save`, `Command::BgSave`, `Command::LastSave` added to `core-engine`; handled by the server before reaching `execute_and_record` since they require async filesystem I/O. +- `SnapshotEntry` and `SnapshotValue` public types added to `core-engine::store`. +- `KeyValueStore::snapshot()` and `KeyValueStore::restore()` methods added. + +**AOF persistence** (`server-native`) +- New `RECACHED_AOF_PATH` env var. When set, every successful write is appended to the file as a normalized RESP command immediately after execution, in addition to periodic snapshot saves. +- New `RECACHED_AOF_SYNC` env var controlling fsync policy: `always` (after every write), `everysec` (background flush once per second, default), `no` (OS-managed). +- On startup: snapshot is loaded first, then AOF commands are replayed for the delta — recovering writes made after the last snapshot. +- After each successful snapshot save, the AOF is automatically truncated. The snapshot subsumes the log, so on the next startup only the post-snapshot delta is replayed. +- Combined with snapshots, the maximum data loss window is bounded by the AOF sync interval (≤1 second with `everysec`) rather than the snapshot interval. +- `APPEND` command added to the write broadcast path so it is captured by AOF and replication. + +**Leader-follower replication** (`server-native`) +- New `RECACHED_REPLICAOF=host:port` env var. When set, the server runs as a read-only replica: it connects to the primary, loads a full snapshot, then streams all subsequent write commands in real time. +- New `RECACHED_REPL_PORT` env var (default: `6381`). The primary listens on this port for incoming replica connections. +- Initial sync protocol: the primary registers the replica's write channel first (so writes during snapshot serialization are buffered), serializes the full store to MessagePack, sends it length-prefixed over TCP, then streams subsequent writes as length-prefixed RESP strings. +- Replicas reject all write commands with `-READONLY You can't write against a read only replica.` +- Replicas reconnect automatically with exponential backoff (2 s → 4 s → … → 30 s cap) if the primary is temporarily unavailable. +- All writes on the primary flow through the unified `ServerState::on_write()` path, which handles AOF append and replica fan-out in a single call. + +**Configurable connection limit** (`server-native`) +- New `RECACHED_MAX_CONNECTIONS` env var (default: `1024`). Raising this allows high-traffic deployments to accept more concurrent clients without rebuilding from source. + +--- + ## [0.1.3] — 2026-05-09 ### Added @@ -30,7 +69,7 @@ All notable changes to Recached are documented here. --- -## [0.1.2] — 2026-05-08 +## [0.1.2] — 2026-05-02 ### Added diff --git a/Cargo.lock b/Cargo.lock index 1a8eb35..84760a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + [[package]] name = "aws-lc-rs" version = "1.16.3" @@ -101,10 +107,11 @@ dependencies = [ [[package]] name = "core-engine" -version = "0.1.3" +version = "0.1.4" dependencies = [ "dashmap", "rand", + "serde", ] [[package]] @@ -616,6 +623,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -807,6 +823,25 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rmp" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c" +dependencies = [ + "num-traits", +] + +[[package]] +name = "rmp-serde" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155" +dependencies = [ + "rmp", + "serde", +] + [[package]] name = "rustls" version = "0.23.40" @@ -908,15 +943,47 @@ dependencies = [ "libc", ] +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "server-native" -version = "0.1.3" +version = "0.1.4" dependencies = [ "core-engine", "futures-util", "metrics", "metrics-exporter-prometheus", + "rmp-serde", "rustls-pemfile", + "serde", "tokio", "tokio-rustls", "tokio-tungstenite", @@ -1317,7 +1384,7 @@ dependencies = [ [[package]] name = "wasm-edge" -version = "0.1.3" +version = "0.1.4" dependencies = [ "core-engine", "getrandom 0.3.4", diff --git a/Cargo.toml b/Cargo.toml index 0f2ff11..99c5d79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ resolver = "2" # ── Single source of truth for all crate versions ──────────────────────────── # Members inherit with: version.workspace = true / edition.workspace = true [workspace.package] -version = "0.1.3" +version = "0.1.4" edition = "2024" license = "MIT" authors = ["ThinkGrid Labs"] @@ -36,6 +36,10 @@ metrics-exporter-prometheus = { version = "0.16", features = ["http-listener"] tokio-rustls = "0.26" rustls-pemfile = "2" +# serialization +serde = { version = "1", features = ["derive"] } +rmp-serde = "1" + # wasm wasm-bindgen = "0.2.92" js-sys = "0.3.69" diff --git a/core-engine/Cargo.toml b/core-engine/Cargo.toml index 58fdd9b..65dc385 100644 --- a/core-engine/Cargo.toml +++ b/core-engine/Cargo.toml @@ -6,3 +6,4 @@ edition.workspace = true [dependencies] dashmap.workspace = true rand.workspace = true +serde.workspace = true diff --git a/core-engine/src/cmd.rs b/core-engine/src/cmd.rs index f9bc088..ca68b1f 100644 --- a/core-engine/src/cmd.rs +++ b/core-engine/src/cmd.rs @@ -147,6 +147,12 @@ pub enum Command { // ── Observable keys ─────────────────────────────────────────────────────── Watch(Vec), Unwatch(Vec), + // ── Persistence ─────────────────────────────────────────────────────────── + Save, + BgSave, + LastSave, + // ── Replication ─────────────────────────────────────────────────────────── + ReplicaOfNoOne, Unknown(String), } @@ -966,6 +972,24 @@ impl Command { arr[1..].iter().filter_map(extract_string).collect(), )), + // ── Persistence ─────────────────────────────────────────── + "SAVE" => Ok(Command::Save), + "BGSAVE" => Ok(Command::BgSave), + "LASTSAVE" => Ok(Command::LastSave), + + // ── Replication ─────────────────────────────────────────── + "REPLICAOF" => { + need!(3); + let arg1 = extract_string(&arr[1]).unwrap_or_default().to_uppercase(); + let arg2 = extract_string(&arr[2]).unwrap_or_default().to_uppercase(); + if arg1 == "NO" && arg2 == "ONE" { + Ok(Command::ReplicaOfNoOne) + } else { + Err("ERR REPLICAOF supports only 'REPLICAOF NO ONE' at runtime" + .to_string()) + } + } + _ => Ok(Command::Unknown(cmd_name.to_owned())), } } @@ -1703,4 +1727,22 @@ mod tests { Command::ZCount("z".into(), "-inf".into(), "+inf".into()) ); } + + // ── Persistence ─────────────────────────────────────────────────────────── + + #[test] + fn save_bgsave_lastsave_parse() { + assert_eq!( + Command::from_value(array(&["SAVE"])).unwrap(), + Command::Save + ); + assert_eq!( + Command::from_value(array(&["BGSAVE"])).unwrap(), + Command::BgSave + ); + assert_eq!( + Command::from_value(array(&["LASTSAVE"])).unwrap(), + Command::LastSave + ); + } } diff --git a/core-engine/src/store.rs b/core-engine/src/store.rs index a440f3a..31c17e5 100644 --- a/core-engine/src/store.rs +++ b/core-engine/src/store.rs @@ -2,6 +2,7 @@ use crate::cmd::{Command, SetCondition, SetExpiry, ZAddOptions}; use crate::resp::Value; use dashmap::DashMap; use rand::seq::IteratorRandom; +use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -258,10 +259,29 @@ pub enum EvictionPolicy { // ── KeyValueStore ───────────────────────────────────────────────────────────── +// ── Snapshot types ──────────────────────────────────────────────────────────── + +#[derive(Serialize, Deserialize)] +pub enum SnapshotValue { + Str(String), + Hash(HashMap), + List(Vec), + Set(Vec), + ZSet(Vec<(String, f64)>), +} + +#[derive(Serialize, Deserialize)] +pub struct SnapshotEntry { + pub key: String, + pub value: SnapshotValue, + pub expires_at_ms: Option, +} + #[derive(Clone)] pub struct KeyValueStore { data: Arc>, max_keys: Option, + max_memory_bytes: Option, eviction_policy: EvictionPolicy, } @@ -276,6 +296,7 @@ impl KeyValueStore { Self { data: Arc::new(DashMap::new()), max_keys: None, + max_memory_bytes: None, eviction_policy: EvictionPolicy::NoEviction, } } @@ -284,18 +305,59 @@ impl KeyValueStore { Self { data: Arc::new(DashMap::new()), max_keys: Some(max), + max_memory_bytes: None, eviction_policy: EvictionPolicy::NoEviction, } } - pub fn with_config(max_keys: Option, eviction_policy: EvictionPolicy) -> Self { + pub fn with_config( + max_keys: Option, + max_memory_bytes: Option, + eviction_policy: EvictionPolicy, + ) -> Self { Self { data: Arc::new(DashMap::new()), max_keys, + max_memory_bytes, eviction_policy, } } + /// Approximate heap usage in bytes — key+value sizes plus a fixed overhead per entry. + pub fn approximate_memory_bytes(&self) -> usize { + self.data + .iter() + .map(|r| { + let val_size = match &r.value().value { + EntryValue::Str(s) => s.len(), + EntryValue::Hash(m) => m.iter().map(|(k, v)| k.len() + v.len()).sum(), + EntryValue::List(l) => l.iter().map(|s| s.len()).sum(), + EntryValue::Set(s) => s.iter().map(|m| m.len()).sum::(), + EntryValue::ZSet(z) => z.scores.keys().map(|m| m.len() + 8).sum(), + }; + r.key().len() + val_size + 64 + }) + .sum() + } + + /// Evict entries until memory usage is below `max_memory_bytes`, or the + /// eviction policy cannot free any more. Returns true if under limit. + pub fn try_evict_for_memory(&self) -> bool { + let limit = match self.max_memory_bytes { + Some(l) => l, + None => return true, + }; + let now = now_ms(); + loop { + if self.approximate_memory_bytes() <= limit { + return true; + } + if !self.evict_one(now) { + return false; + } + } + } + /// Returns the current value of a key for watch push notifications. /// Strings are returned as bulk strings. Complex types return nil — the /// watcher must use a type-specific command (HGETALL, LRANGE, etc.) to @@ -388,6 +450,58 @@ impl KeyValueStore { } } + pub fn snapshot(&self) -> Vec { + let now = now_ms(); + self.data + .iter() + .filter(|e| !e.is_expired(now)) + .map(|e| { + let value = match &e.value { + EntryValue::Str(s) => SnapshotValue::Str(s.clone()), + EntryValue::Hash(m) => SnapshotValue::Hash(m.clone()), + EntryValue::List(l) => SnapshotValue::List(l.iter().cloned().collect()), + EntryValue::Set(s) => SnapshotValue::Set(s.iter().cloned().collect()), + EntryValue::ZSet(z) => { + SnapshotValue::ZSet(z.scores.iter().map(|(k, &v)| (k.clone(), v)).collect()) + } + }; + SnapshotEntry { + key: e.key().clone(), + value, + expires_at_ms: e.expires_at_ms, + } + }) + .collect() + } + + pub fn restore(&self, entries: Vec) { + let now = now_ms(); + for e in entries { + if let Some(exp) = e.expires_at_ms + && now >= exp + { + continue; + } + let value = match e.value { + SnapshotValue::Str(s) => EntryValue::Str(s), + SnapshotValue::Hash(m) => EntryValue::Hash(m), + SnapshotValue::List(l) => EntryValue::List(l.into_iter().collect()), + SnapshotValue::Set(s) => EntryValue::Set(s.into_iter().collect()), + SnapshotValue::ZSet(pairs) => EntryValue::ZSet(ZSetInner { + scores: pairs.into_iter().collect(), + }), + }; + self.data.insert( + e.key, + Entry { + value, + expires_at_ms: e.expires_at_ms, + written_at_ms: now, + }, + ); + } + } + pub fn execute(&self, cmd: Command) -> Value { match cmd { // ── Core ───────────────────────────────────────────────────────── @@ -1780,6 +1894,12 @@ impl KeyValueStore { Command::Watch(_) | Command::Unwatch(_) => { Value::Error("ERR WATCH/UNWATCH only supported over WebSocket".to_string()) } + Command::Save | Command::BgSave | Command::LastSave => { + Value::Error("ERR persistence commands must be handled by the server".to_string()) + } + Command::ReplicaOfNoOne => { + Value::Error("ERR REPLICAOF NO ONE must be handled by the server".to_string()) + } } } } @@ -3208,4 +3328,93 @@ mod tests { let res = s.execute(Command::ZRevRange("z".into(), 0, -1, true)); assert_eq!(res, arr(&["b", "2", "a", "1"])); } + + // ── Snapshot / Restore ──────────────────────────────────────────────────── + + #[test] + fn snapshot_round_trip_all_types() { + let s = store(); + s.execute(Command::Set( + "str".into(), + "hello".into(), + SetOptions::default(), + )); + s.execute(Command::HSet("hash".into(), vec![("f".into(), "v".into())])); + s.execute(Command::LPush("list".into(), vec!["a".into(), "b".into()])); + s.execute(Command::SAdd("set".into(), vec!["x".into()])); + s.execute(Command::ZAdd( + "zset".into(), + ZAddOptions::default(), + vec![(1.5, "m".into())], + )); + + let entries = s.snapshot(); + assert_eq!(entries.len(), 5); + + let s2 = store(); + s2.restore(entries); + + assert_eq!(s2.execute(Command::Get("str".into())), bulk("hello")); + assert_eq!( + s2.execute(Command::HGet("hash".into(), "f".into())), + bulk("v") + ); + assert_eq!( + s2.execute(Command::LRange("list".into(), 0, -1)), + arr(&["b", "a"]) + ); + assert_eq!( + s2.execute(Command::SIsMember("set".into(), "x".into())), + int(1) + ); + assert_eq!( + s2.execute(Command::ZScore("zset".into(), "m".into())), + bulk("1.5") + ); + } + + #[test] + fn snapshot_skips_expired_keys() { + use std::time::Duration; + let s = store(); + s.execute(Command::Set( + "live".into(), + "v".into(), + SetOptions::default(), + )); + s.execute(Command::PSetEx("dead".into(), 1, "v".into())); + std::thread::sleep(Duration::from_millis(10)); + + let entries = s.snapshot(); + assert!(entries.iter().any(|e| e.key == "live")); + assert!(!entries.iter().any(|e| e.key == "dead")); + } + + #[test] + fn restore_skips_already_expired() { + let s = store(); + let entry = SnapshotEntry { + key: "ghost".into(), + value: SnapshotValue::Str("v".into()), + expires_at_ms: Some(1), + }; + s.restore(vec![entry]); + assert_eq!(s.execute(Command::DbSize), int(0)); + } + + #[test] + fn snapshot_preserves_ttl() { + let s = store(); + s.execute(Command::SetEx("k".into(), 60, "v".into())); + + let entries = s.snapshot(); + assert_eq!(entries.len(), 1); + assert!(entries[0].expires_at_ms.is_some()); + + let s2 = store(); + s2.restore(entries); + + let ttl = s2.execute(Command::Ttl("k".into())); + assert!(matches!(ttl, Value::Integer(n) if n > 0 && n <= 60)); + } } diff --git a/docs/.vitepress/config.ts b/docs/.vitepress/config.ts index 87e2860..005bba3 100644 --- a/docs/.vitepress/config.ts +++ b/docs/.vitepress/config.ts @@ -31,6 +31,7 @@ export default defineConfig({ { text: 'Guide', link: '/guide/introduction' }, { text: 'Server', link: '/server/installation' }, { text: 'Browser', link: '/browser/getting-started' }, + { text: 'React', link: '/react/getting-started' }, { text: 'Roadmap', link: '/roadmap' }, ], @@ -65,6 +66,15 @@ export default defineConfig({ ], }, ], + '/react/': [ + { + text: 'React', + items: [ + { text: 'Getting Started', link: '/react/getting-started' }, + { text: 'Hooks Reference', link: '/react/hooks-reference' }, + ], + }, + ], }, socialLinks: [ diff --git a/docs/react/getting-started.md b/docs/react/getting-started.md new file mode 100644 index 0000000..78616b8 --- /dev/null +++ b/docs/react/getting-started.md @@ -0,0 +1,164 @@ +# Getting Started + +`@recached/react` is the official React hooks package for Recached. It wraps the `recached-edge` WASM SDK and bridges it to React's rendering model — components re-render automatically whenever a key changes, whether the mutation came from the same component, another tab, or the server. + +## Requirements + +- React 18 or later +- `recached-edge` 0.1.4 or later + +## Install + +```bash +npm install @recached/react recached-edge +# or +pnpm add @recached/react recached-edge +# or +yarn add @recached/react recached-edge +``` + +--- + +## Setup + +Mount `` once near the root of your app. Every hook must be a descendant of it. + +```tsx +// main.tsx / app.tsx +import { RecachedProvider } from '@recached/react' + +export default function App() { + return ( + + + + ) +} +``` + +`` renders `null` until the WASM module has initialized and (optionally) replayed the IndexedDB WAL. Your app appears only when the cache is ready — no blank-state flicker on refresh. + +### Options + +| Option | Description | +|--------|-------------| +| `persistence` | Load the IndexedDB WAL on startup and write every mutation through. The cache survives page refreshes with no server round-trip. | +| `broadcastChannel` | Name for cross-tab sync. Tabs with the same name share mutations automatically via the BroadcastChannel API, no server needed. | +| `connect.url` | WebSocket URL of the Recached server (`ws://` or `wss://`). Mutations are pushed to the server and server-side mutations are pushed down. | +| `connect.password` | Server password when `RECACHED_PASSWORD` is configured. | + +All options are optional. Omit `connect` for a local-only in-memory cache. + +--- + +## First component + +```tsx +import { useKey, useRecached } from '@recached/react' + +function ThemeToggle() { + const cache = useRecached() + const theme = useKey('theme') ?? 'light' + + return ( + + ) +} +``` + +`useKey('theme')` re-renders `ThemeToggle` every time the `theme` key changes — from any source. Click in one tab, every other tab updates instantly. + +--- + +## With Next.js App Router + +The WASM module and WebSocket must run in a client component. Wrap your providers in a `'use client'` boundary: + +```tsx +// app/providers.tsx +'use client' + +import { RecachedProvider } from '@recached/react' +import type { ReactNode } from 'react' + +export function Providers({ children }: { children: ReactNode }) { + return ( + + {children} + + ) +} +``` + +```tsx +// app/layout.tsx +import { Providers } from './providers' + +export default function RootLayout({ children }: { children: React.ReactNode }) { + return ( + + + {children} + + + ) +} +``` + +Individual components that use `useKey` or `useRecached` must also be client components (`'use client'`). + +--- + +## With Vite + +No extra config needed — Vite handles WASM imports natively. If you encounter issues with the WASM file not being served correctly, add to `vite.config.ts`: + +```ts +import { defineConfig } from 'vite' + +export default defineConfig({ + optimizeDeps: { + exclude: ['recached-edge'], + }, +}) +``` + +--- + +## With a pre-built cache instance + +If you need to create the cache outside React (e.g. to read from it before the tree renders), pass it via the `cache` prop instead of `options`: + +```ts +// lib/cache.ts +import { createCache } from 'recached-edge' + +export const cache = await createCache({ + persistence: true, + connect: { url: 'ws://localhost:6380' }, +}) +``` + +```tsx +// main.tsx +import { RecachedProvider } from '@recached/react' +import { cache } from './lib/cache' + + + + +``` + +When the `cache` prop is set, `` skips initialization and uses the instance directly. diff --git a/docs/react/hooks-reference.md b/docs/react/hooks-reference.md new file mode 100644 index 0000000..366f2bc --- /dev/null +++ b/docs/react/hooks-reference.md @@ -0,0 +1,221 @@ +# Hooks Reference + +## `` + +```tsx + + {children} + +``` + +Context provider that initializes the cache and makes it available to all descendant hooks. Mount it once near the root of your app. + +Renders `null` until the cache is ready (WASM init + optional persistence hydration), then renders `children`. + +### Props + +| Prop | Type | Description | +|------|------|-------------| +| `options` | `CacheOptions` | Passed directly to `createCache`. Controls persistence, BroadcastChannel, and server connection. | +| `cache` | `Cache` | A pre-built `Cache` instance. When provided, `options` is ignored and initialization is skipped. | +| `children` | `ReactNode` | Your app tree. Rendered only after the cache is ready. | + +### Examples + +```tsx +// With server connection + + +// With persistence only (no server) + + +// With cross-tab sync only + + +// Full setup + +``` + +--- + +## `useRecached()` + +```ts +function useRecached(): Cache +``` + +Returns the `Cache` instance from the nearest ``. Use it to call write and imperative methods: `set`, `setEx`, `setJSON`, `del`, `publish`, `subscribe`, `clearPersistence`. + +Throws if called outside a ``. + +### Example + +```tsx +function SaveButton() { + const cache = useRecached() + + async function save() { + cache.setJSON('user:42', { id: 42, name: 'Alice' }, 300) + } + + return +} +``` + +### Writing from event handlers + +Reads (`useKey`, `useKeyJSON`) are reactive. Writes are always done imperatively via `useRecached()`. This keeps the component API simple: read reactively, write explicitly. + +```tsx +function Counter() { + const cache = useRecached() + const count = useKey('count') + const n = Number(count ?? 0) + + return ( +
+

{n}

+ + +
+ ) +} +``` + +--- + +## `useKey(key)` + +```ts +function useKey(key: string): string | null +``` + +Reactively reads a string value from the cache. Returns `null` when the key does not exist or has expired. + +The component re-renders automatically whenever `key` is mutated — from any source: + +- A write in the same component or another component in the same tab +- A write received from the server (WebSocket fan-out) +- A write from another tab (BroadcastChannel sync) + +Built on React 18's [`useSyncExternalStore`](https://react.dev/reference/react/useSyncExternalStore) — safe with concurrent rendering and Strict Mode. + +### Example + +```tsx +function StatusBadge() { + const status = useKey('system:status') + return {status ?? 'unknown'} +} +``` + +### Dynamic keys + +The `key` argument is tracked. Changing it reads the new key immediately: + +```tsx +function UserStatus({ userId }: { userId: string }) { + const status = useKey(`user:${userId}:status`) + return {status ?? 'offline'} +} +``` + +### SSR + +On the server, `useKey` always returns `null` (there is no WASM store server-side). Handle it like any initially-absent value: + +```tsx +const theme = useKey('theme') ?? 'light' +``` + +--- + +## `useKeyJSON(key)` + +```ts +function useKeyJSON(key: string): T | null +``` + +Same as `useKey` but JSON-parses the stored string. Returns `null` when the key is missing, expired, or the stored value is not valid JSON. + +### Example + +```tsx +interface CartItem { + id: string + name: string + qty: number + price: number +} + +function CartSummary() { + const cache = useRecached() + const items = useKeyJSON('cart') ?? [] + const total = items.reduce((sum, item) => sum + item.price * item.qty, 0) + + function removeItem(id: string) { + cache.setJSON('cart', items.filter((i) => i.id !== id)) + } + + return ( +
+
    + {items.map((item) => ( +
  • + {item.name} × {item.qty} + +
  • + ))} +
+

Total: ${total.toFixed(2)}

+
+ ) +} +``` + +### Writing JSON + +Use `cache.setJSON()` from `useRecached()` to write structured data back: + +```tsx +cache.setJSON('cart', updatedItems) // no expiry +cache.setJSON('cart', updatedItems, 3600) // expires in 1 hour +``` + +--- + +## Reactivity model + +Every mutation — regardless of source — fires the internal notification bus, which triggers all `useKey` and `useKeyJSON` subscribers to re-read their key and schedule a re-render if the value changed. + +``` +cache.set('key', value) ← local write + │ + ├─ WASM store updated + ├─ notify all useKey('key') subscribers → re-render + ├─ WebSocket send → server → fan-out to other clients + │ └─ WASM store updated → notify → re-render + └─ BroadcastChannel post → other tabs + └─ WASM store updated → notify → re-render +``` + +`useSyncExternalStore` ensures React reads a consistent snapshot — no tearing, no stale closures, compatible with concurrent features like `` and `startTransition`. + +--- + +## TypeScript + +All hooks are fully typed. `useKeyJSON` infers the generic `T` from the type argument: + +```ts +const user = useKeyJSON('user:42') // User | null +const items = useKeyJSON('tags') // string[] | null +``` + +`useRecached()` returns the full `Cache` type with all methods typed. diff --git a/docs/roadmap.md b/docs/roadmap.md index 09ceb59..45118c5 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -1,53 +1,7 @@ # Roadmap -## What shipped - -The following features are implemented and stable. - -**Protocol & server** - -- RESP protocol — full parser and serializer, fragmentation handling, depth-limited (no stack-overflow DoS) -- TCP server (port 6379) — compatible with any Redis client (`ioredis`, `node-redis`, `redis-py`, `Jedis`, etc.) -- WebSocket sync (port 6380) — real-time mutation broadcast between server and browser WASM instances -- Sender-ID dedup filter — browser clients do not double-apply their own mutations -- `RECACHED_PASSWORD` + brute-force lockout after 5 consecutive failed `AUTH` attempts -- `RECACHED_ALLOW_IPS` — comma-separated IP allowlist with validated IP parsing -- `RECACHED_MAX_KEYS` — hard key cap -- Connection semaphore (max 1024 concurrent connections) -- Eviction policies: `noeviction`, `lru`, `allkeys-random`, `volatile-lru`, `volatile-ttl` -- Background active eviction (1s sweep) + lazy eviction on every read -- TLS on both ports (`RECACHED_TLS_CERT` + `RECACHED_TLS_KEY`) -- Prometheus metrics (`RECACHED_METRICS_PORT`, scrape at `/metrics`) -- Structured `tracing` logs with configurable level via `RUST_LOG` -- Docker image (`ghcr.io/thinkgrid-labs/recached`) -- Homebrew formula - -**Commands** - -See [Commands](/server/commands) for the full list. In summary: `PING`, `AUTH`, all String commands, Expiry commands, Key management (`DEL`, `EXISTS`, `TYPE`, `RENAME`, `KEYS`, `SCAN`, `DBSIZE`, `FLUSHDB`), Hash, List, Set, Sorted Set, Transactions (`MULTI`/`EXEC`/`DISCARD`), Pub/Sub (`SUBSCRIBE`, `UNSUBSCRIBE`, `PSUBSCRIBE`, `PUNSUBSCRIBE`, `PUBLISH`), and WebSocket-only observable keys (`WATCH`/`UNWATCH`). - -**Browser (WASM)** - -- `recached-edge` npm package — TypeScript SDK for the browser -- `RecachedCache` class — zero-latency local reads, all cache types -- WebSocket sync — connect to port 6380 and receive server mutations automatically -- Observable keys (`cache.watch()`) — callbacks on key change from any source -- Pub/Sub over WebSocket — `subscribe()` and `publish()` in the browser -- IndexedDB WAL persistence — cache survives page refresh -- BroadcastChannel cross-tab sync — all tabs in the same origin share mutations - ---- - ## Planned -### Delta sync (`SYNC `) - -Currently, when a WebSocket client reconnects after a gap, the server performs a full resync: it sends the current value of all keys. For most applications this is fine, but for high write-rate deployments with large key sets, a full resync on every reconnect adds unnecessary data transfer. - -The plan: the server maintains an in-memory ring buffer of recent mutations (configurable depth, e.g. last 10,000 writes). Each mutation has a sequence number. On reconnect, the client sends `SYNC ` and the server replays only the mutations that occurred after that sequence number. If the client's sequence number is older than the ring buffer, the server falls back to a full resync. - -The `wasm-edge` module already tracks a WAL sequence number — this becomes the cursor for delta sync. - ### WASI target A `wasm32-wasip1` build of `wasm-edge` for Cloudflare Workers and Deno Deploy. This would allow the Recached WASM module to run at the edge as a cache layer between origin and CDN, with the same API as the browser client. diff --git a/docs/server/commands.md b/docs/server/commands.md index 57f6341..cb54f5c 100644 --- a/docs/server/commands.md +++ b/docs/server/commands.md @@ -232,6 +232,32 @@ await pub.publish('events:orders', JSON.stringify({ id: 123, status: 'shipped' } --- +## Persistence + +Snapshot commands write the in-memory store to disk in MessagePack format. The snapshot path and autosave interval are controlled by [`RECACHED_SAVE_PATH` and `RECACHED_SAVE_INTERVAL`](/server/configuration#environment-variable-reference). + +| Command | Description | +|---|---| +| `SAVE` | Synchronously writes a snapshot to disk. Blocks until the file is written. Returns `OK` on success. | +| `BGSAVE` | Triggers a background snapshot. Returns immediately; the save runs in a background task while the server continues accepting connections. | +| `LASTSAVE` | Returns the Unix timestamp (seconds) of the most recent successful snapshot. Returns the server start time if no save has completed yet. | + +### Example + +```bash +# Trigger a background save and check when it completed +BGSAVE # +Background saving started +# ... time passes ... +LASTSAVE # (integer) 1746794400 +``` + +```bash +# Force a synchronous save (blocks until done — use BGSAVE in production) +SAVE # +OK +``` + +--- + ## Observable Keys (WebSocket-only) `WATCH` and `UNWATCH` are Recached-specific commands available only over WebSocket connections (port 6380). They have different semantics from Redis's `WATCH` (which is used for optimistic locking with transactions). diff --git a/docs/server/configuration.md b/docs/server/configuration.md index 28d5182..cdd894b 100644 --- a/docs/server/configuration.md +++ b/docs/server/configuration.md @@ -11,6 +11,13 @@ Recached is configured entirely through environment variables. There is no confi | `RECACHED_MAX_KEYS` | _(unlimited)_ | Maximum number of keys in the store. When this limit is reached, behavior depends on `RECACHED_EVICTION`. If set to `noeviction` (the default), write commands that would exceed the cap return an error. | | `RECACHED_EVICTION` | `noeviction` | Eviction policy when `RECACHED_MAX_KEYS` is reached. See eviction policies below. | | `RECACHED_METRICS_PORT` | `9091` | Port for the Prometheus metrics HTTP server. Metrics are available at `/metrics`. Set to `0` to disable. | +| `RECACHED_SAVE_PATH` | `recached.rdb` | Path to the snapshot file. The server loads this file on startup and writes to it on `SAVE`, `BGSAVE`, autosave, and clean shutdown. | +| `RECACHED_SAVE_INTERVAL` | `900` | Autosave interval in seconds. The server automatically saves a snapshot in the background at this interval. Set to `0` to disable autosave (manual `SAVE`/`BGSAVE` still work). | +| `RECACHED_AOF_PATH` | _(disabled)_ | Path to the append-only file. When set, every write command is appended to this file in addition to snapshot saves. On startup the snapshot is loaded first, then AOF commands are replayed for the delta. The AOF is truncated after each successful snapshot save. | +| `RECACHED_AOF_SYNC` | `everysec` | AOF fsync policy. `always`: fsync after every write (safest, slowest). `everysec`: fsync once per second (default, good balance). `no`: let the OS decide (fastest, least safe). | +| `RECACHED_MAX_CONNECTIONS` | `1024` | Maximum number of concurrent client connections (TCP + WebSocket combined). New connections are dropped when the limit is reached. | +| `RECACHED_REPL_PORT` | `6381` | TCP port the primary listens on for incoming replica connections. Only active when `RECACHED_REPLICAOF` is not set (i.e. this server is a primary). | +| `RECACHED_REPLICAOF` | _(none)_ | Set to `host:port` to run this server as a read-only replica. On startup it connects to the primary, receives a full snapshot, and then streams all subsequent writes. Reconnects automatically with exponential backoff on disconnect. | | `RECACHED_TLS_CERT` | _(none)_ | Path to a PEM-encoded TLS certificate file. TLS is enabled on both ports when this and `RECACHED_TLS_KEY` are set. | | `RECACHED_TLS_KEY` | _(none)_ | Path to a PEM-encoded TLS private key file. If either `RECACHED_TLS_CERT` or `RECACHED_TLS_KEY` is missing, the server falls back to plain TCP/WS. | | `RUST_LOG` | `info` | Log level. Accepts `error`, `warn`, `info`, `debug`, `trace`. Module-specific: `RUST_LOG=recached=debug,tokio=warn`. | @@ -111,6 +118,56 @@ scrape_configs: Available metrics include key count, command latency histograms, active connections, and WebSocket client count. +### With AOF + snapshot (strong durability) + +Combining snapshots with an append-only file means you lose at most a few writes on crash, rather than up to 15 minutes of writes with snapshots alone. + +```bash +RECACHED_SAVE_PATH="/data/recached.rdb" \ +RECACHED_AOF_PATH="/data/recached.aof" \ +RECACHED_AOF_SYNC="everysec" \ +recached-server +``` + +On startup: snapshot is loaded first, then any AOF commands written after the snapshot are replayed. The AOF is automatically truncated after each successful snapshot save. + +### With leader-follower replication + +Run a primary and one or more read-only replicas. Replicas receive a full snapshot on connect, then stream every subsequent write in real time. + +```bash +# Primary (default replication port 6381) +RECACHED_SAVE_PATH="/data/recached.rdb" \ +RECACHED_REPL_PORT="6381" \ +recached-server + +# Replica (connects to primary, rejects writes) +RECACHED_REPLICAOF="primary-host:6381" \ +recached-server +``` + +Replicas reconnect automatically with exponential backoff (2s → 4s → … → 30s cap) if the primary is temporarily unavailable. Write commands sent to a replica return `-READONLY`. + +### With snapshot persistence + +By default the server saves a snapshot every 15 minutes to `recached.rdb` in the working directory. On startup it restores from that file automatically. + +```bash +# Custom path and 5-minute autosave +RECACHED_SAVE_PATH="/var/lib/recached/dump.rdb" \ +RECACHED_SAVE_INTERVAL="300" \ +recached-server +``` + +```bash +# Disable autosave — trigger saves manually with BGSAVE +RECACHED_SAVE_PATH="/data/recached.rdb" \ +RECACHED_SAVE_INTERVAL="0" \ +recached-server +``` + +The snapshot file is written atomically: the server writes to a `.tmp` file and renames it into place, so a crash mid-save cannot corrupt the previous snapshot. Expired keys are silently skipped on restore. On a clean shutdown (SIGTERM or Ctrl-C), a final snapshot is saved before the process exits. + ### High-connection workloads The server accepts up to 1024 concurrent connections by default (enforced with a connection semaphore). This limit is not currently configurable via environment variable — if you need more, build from source and adjust the constant in `server-native/src/main.rs`. diff --git a/recached-react/.gitignore b/recached-react/.gitignore new file mode 100644 index 0000000..62ccde4 --- /dev/null +++ b/recached-react/.gitignore @@ -0,0 +1,4 @@ +node_modules/ +dist/ +*.tsbuildinfo +.DS_Store diff --git a/recached-react/README.md b/recached-react/README.md new file mode 100644 index 0000000..4812f6c --- /dev/null +++ b/recached-react/README.md @@ -0,0 +1,208 @@ +# @recached/react + +Official React hooks for [Recached](https://github.com/thinkgrid-labs/recached) — zero-latency reactive cache with automatic server sync and cross-tab sharing. + +## Features + +- **Zero-latency reads** — all reads are served from local WASM memory, no network round-trip +- **Automatic re-renders** — components update when a key changes from any source: local writes, server WebSocket push, or BroadcastChannel cross-tab sync +- **React 18 concurrent-safe** — built on `useSyncExternalStore`, no tearing +- **TypeScript-first** — full type inference including `useKeyJSON` + +## Requirements + +- React 18 or later +- `recached-edge` 0.1.4 or later (peer dependency) + +## Installation + +```bash +npm install @recached/react recached-edge +``` + +## Quick start + +Mount `` once near the root of your app, then use `useKey` anywhere inside it. + +```tsx +import { RecachedProvider, useKey, useRecached } from '@recached/react'; + +function App() { + return ( + + + + ); +} + +function Counter() { + const cache = useRecached(); + const count = useKey('count'); + + return ( + + ); +} +``` + +Clicking the button updates `count` in the WASM store, notifies all `useKey('count')` subscribers in the same tab, syncs to the server, and fans out to all other connected tabs and clients — all without a page reload. + +## API + +### `` + +```tsx + + {children} + +``` + +Creates and provides a `Cache` instance to all descendants. Renders `null` until the cache is ready (WASM init + optional persistence hydration). + +| Prop | Type | Description | +|------|------|-------------| +| `options` | `CacheOptions` | Passed to `createCache`. Controls persistence, BroadcastChannel, and server connection. | +| `cache` | `Cache` | Pass a pre-built `Cache` instance instead of creating one. When set, `options` is ignored. | + +```tsx +// With server connection + + +// With persistence (survives page refresh) + + +// Cross-tab sync only (no server) + + +// Pre-built instance (advanced) +const cache = await createCache({ ... }); + +``` + +### `useRecached()` + +```ts +function useRecached(): Cache +``` + +Returns the `Cache` instance from the nearest ``. Use this to call `set`, `setEx`, `setJSON`, `del`, `publish`, and other write or imperative methods. + +Throws if called outside a ``. + +```tsx +function SaveButton() { + const cache = useRecached(); + return ( + + ); +} +``` + +### `useKey(key)` + +```ts +function useKey(key: string): string | null +``` + +Reactively reads a string value. Returns `null` when the key does not exist or has expired. Re-renders the component automatically whenever the key changes — from any mutation source. + +```tsx +const theme = useKey('theme'); // "dark" | "light" | null +``` + +### `useKeyJSON(key)` + +```ts +function useKeyJSON(key: string): T | null +``` + +Same as `useKey` but JSON-parses the value. Returns `null` on a missing key, expired key, or invalid JSON. + +```tsx +interface User { id: number; name: string } + +function UserCard() { + const user = useKeyJSON('user:42'); + if (!user) return ; + return

{user.name}

; +} +``` + +## Reactivity model + +Every write — whether it comes from the same component, another component in the same tab, another tab via BroadcastChannel, or another client via the server — fires the mutation bus, which causes all `useKey` / `useKeyJSON` subscribers to re-read their key and re-render if the value changed. + +``` +Local write (cache.set) + └─▶ WASM store update + └─▶ notify_mutation → re-render all useKey subscribers + └─▶ WebSocket send → server fan-out → other clients + └─▶ BroadcastChannel post → other tabs + └─▶ WASM store update → notify_mutation → re-render +``` + +## Examples + +### Theme toggle + +```tsx +function ThemeToggle() { + const cache = useRecached(); + const theme = useKey('theme') ?? 'light'; + return ( + + ); +} +``` + +### Shared shopping cart + +```tsx +interface CartItem { id: string; qty: number } + +function Cart() { + const cache = useRecached(); + const items = useKeyJSON('cart') ?? []; + + function addItem(id: string) { + const updated = [...items, { id, qty: 1 }]; + cache.setJSON('cart', updated, 3600); + } + + return ( +
    + {items.map((item) =>
  • {item.id} × {item.qty}
  • )} +
+ ); +} +``` + +### With expiry + +```tsx +function SessionBanner() { + const cache = useRecached(); + const session = useKey('session'); + + if (!session) return ; + return

Logged in — session expires soon

; +} + +// Elsewhere: set with 30-minute TTL +cache.setEx('session', userId, 1800); +``` + +## License + +MIT diff --git a/recached-react/package-lock.json b/recached-react/package-lock.json new file mode 100644 index 0000000..0e6ba7f --- /dev/null +++ b/recached-react/package-lock.json @@ -0,0 +1,60 @@ +{ + "name": "@recached/react", + "version": "0.1.4", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "@recached/react", + "version": "0.1.4", + "license": "MIT", + "devDependencies": { + "@types/react": "^18", + "typescript": "^5.9.3" + }, + "peerDependencies": { + "react": ">=18", + "recached-edge": ">=0.1.4" + } + }, + "node_modules/@types/prop-types": { + "version": "15.7.15", + "resolved": "https://registry.npmjs.org/@types/prop-types/-/prop-types-15.7.15.tgz", + "integrity": "sha512-F6bEyamV9jKGAFBEmlQnesRPGOQqS2+Uwi0Em15xenOxHaf2hv6L8YCVn3rPdPJOiJfPiCnLIRyvwVaqMY3MIw==", + "dev": true, + "license": "MIT" + }, + "node_modules/@types/react": { + "version": "18.3.28", + "resolved": "https://registry.npmjs.org/@types/react/-/react-18.3.28.tgz", + "integrity": "sha512-z9VXpC7MWrhfWipitjNdgCauoMLRdIILQsAEV+ZesIzBq/oUlxk0m3ApZuMFCXdnS4U7KrI+l3WRUEGQ8K1QKw==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/prop-types": "*", + "csstype": "^3.2.2" + } + }, + "node_modules/csstype": { + "version": "3.2.3", + "resolved": "https://registry.npmjs.org/csstype/-/csstype-3.2.3.tgz", + "integrity": "sha512-z1HGKcYy2xA8AGQfwrn0PAy+PB7X/GSj3UVJW9qKyn43xWa+gl5nXmU4qqLMRzWVLFC8KusUX8T/0kCiOYpAIQ==", + "dev": true, + "license": "MIT" + }, + "node_modules/typescript": { + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", + "dev": true, + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + } + } +} diff --git a/recached-react/package.json b/recached-react/package.json new file mode 100644 index 0000000..3241662 --- /dev/null +++ b/recached-react/package.json @@ -0,0 +1,50 @@ +{ + "name": "@recached/react", + "version": "0.1.4", + "description": "Official React hooks for Recached \u2014 zero-latency reactive cache", + "type": "module", + "main": "./dist/index.js", + "module": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js", + "default": "./dist/index.js" + } + }, + "files": [ + "dist/" + ], + "scripts": { + "build": "tsc", + "typecheck": "tsc --noEmit", + "version:patch": "npm version patch --no-git-tag-version", + "version:minor": "npm version minor --no-git-tag-version", + "version:major": "npm version major --no-git-tag-version" + }, + "peerDependencies": { + "react": ">=18", + "recached-edge": ">=0.1.4" + }, + "devDependencies": { + "@types/react": "^18", + "typescript": "^5.9.3" + }, + "keywords": [ + "recached", + "react", + "hooks", + "cache", + "wasm", + "real-time", + "zero-latency" + ], + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/thinkgrid-labs/recached" + }, + "homepage": "https://github.com/thinkgrid-labs/recached/tree/main/recached-react", + "bugs": "https://github.com/thinkgrid-labs/recached/issues" +} diff --git a/recached-react/src/context.tsx b/recached-react/src/context.tsx new file mode 100644 index 0000000..ce00edd --- /dev/null +++ b/recached-react/src/context.tsx @@ -0,0 +1,66 @@ +import { + createContext, + useContext, + useEffect, + useState, + type ReactNode, +} from 'react'; +import { createCache, type Cache, type CacheOptions } from 'recached-edge'; + +const RecachedCtx = createContext(null); + +interface ProviderProps { + /** + * Options passed to `createCache`. Ignored when `cache` is provided. + */ + options?: CacheOptions; + /** + * Pass a pre-built `Cache` instance (e.g. one you created outside React). + * When provided, `options` is ignored and no lifecycle management is done. + */ + cache?: Cache; + children: ReactNode; +} + +/** + * Mount this once near the root of your app. All `useKey`, `useKeyJSON`, and + * `usePubSub` calls inside must be descendants of this provider. + * + * ```tsx + * + * + * + * ``` + */ +export function RecachedProvider({ options, cache: prebuilt, children }: ProviderProps) { + const [cache, setCache] = useState(prebuilt ?? null); + + useEffect(() => { + if (prebuilt) return; + let cancelled = false; + createCache(options).then((c) => { + if (!cancelled) setCache(c); + }); + return () => { + cancelled = true; + }; + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + if (!cache) return null; + return {children}; +} + +/** + * Return the raw `Cache` instance from the nearest ``. + * Use this when you need to call `set`, `del`, `publish`, etc. imperatively. + * + * Throws if called outside a ``. + */ +export function useRecached(): Cache { + const cache = useContext(RecachedCtx); + if (!cache) { + throw new Error('useRecached must be called inside a .'); + } + return cache; +} diff --git a/recached-react/src/index.ts b/recached-react/src/index.ts new file mode 100644 index 0000000..e5687a2 --- /dev/null +++ b/recached-react/src/index.ts @@ -0,0 +1,2 @@ +export { RecachedProvider, useRecached } from './context'; +export { useKey, useKeyJSON } from './useKey'; diff --git a/recached-react/src/useKey.ts b/recached-react/src/useKey.ts new file mode 100644 index 0000000..7bc9a4b --- /dev/null +++ b/recached-react/src/useKey.ts @@ -0,0 +1,59 @@ +import { useSyncExternalStore } from 'react'; +import { useRecached } from './context'; + +/** + * Reactively read a string key from the Recached store. + * + * The component re-renders automatically whenever the key is written or + * deleted — whether the mutation originated locally, from another tab via + * BroadcastChannel, or from another client via the server WebSocket. + * + * Returns `null` when the key does not exist or has expired. + * + * Built on React 18's `useSyncExternalStore` — safe with concurrent features. + * + * ```tsx + * function ThemeButton() { + * const cache = useRecached(); + * const theme = useKey('theme'); + * return ( + * + * ); + * } + * ``` + */ +export function useKey(key: string): string | null { + const cache = useRecached(); + return useSyncExternalStore( + (cb) => cache.onMutation(cb), + () => cache.get(key), + () => null, + ); +} + +/** + * Reactively read a JSON-parsed value from the Recached store. + * + * Behaves identically to {@link useKey} but parses the stored string as JSON. + * Returns `null` when the key is missing, expired, or contains invalid JSON. + * + * ```tsx + * interface User { id: number; name: string } + * + * function UserCard() { + * const user = useKeyJSON('user:42'); + * if (!user) return ; + * return

{user.name}

; + * } + * ``` + */ +export function useKeyJSON(key: string): T | null { + const cache = useRecached(); + return useSyncExternalStore( + (cb) => cache.onMutation(cb), + () => cache.getJSON(key), + () => null, + ); +} diff --git a/recached-react/tsconfig.json b/recached-react/tsconfig.json new file mode 100644 index 0000000..41f3013 --- /dev/null +++ b/recached-react/tsconfig.json @@ -0,0 +1,20 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "ESNext", + "moduleResolution": "bundler", + "jsx": "react-jsx", + "strict": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "outDir": "./dist", + "rootDir": "./src", + "skipLibCheck": true, + "baseUrl": ".", + "paths": { + "recached-edge": ["../wasm-edge/sdk.d.ts"] + } + }, + "include": ["src"] +} diff --git a/scripts/bump-version.sh b/scripts/bump-version.sh index ab1b25f..cd0f860 100755 --- a/scripts/bump-version.sh +++ b/scripts/bump-version.sh @@ -31,6 +31,25 @@ EOF echo "Bumped $CURRENT → $NEW_VERSION in $CARGO_TOML" +# Update package.json files. +update_package_json() { + local pkg="$1" + python3 - "$pkg" "$NEW_VERSION" <<'PYEOF' +import json, sys +path, version = sys.argv[1], sys.argv[2] +with open(path) as f: + data = json.load(f) +data['version'] = version +with open(path, 'w') as f: + json.dump(data, f, indent=2) + f.write('\n') +PYEOF + echo "Bumped $NEW_VERSION in $pkg" +} + +update_package_json "$ROOT/wasm-edge/package.json" +update_package_json "$ROOT/recached-react/package.json" + # Verify the workspace resolves cleanly. echo "Verifying workspace..." cargo check --workspace --exclude wasm-edge --quiet diff --git a/server-native/Cargo.toml b/server-native/Cargo.toml index d215fe8..8bbd2d2 100644 --- a/server-native/Cargo.toml +++ b/server-native/Cargo.toml @@ -14,3 +14,5 @@ metrics.workspace = true metrics-exporter-prometheus.workspace = true tokio-rustls.workspace = true rustls-pemfile.workspace = true +serde.workspace = true +rmp-serde.workspace = true diff --git a/server-native/src/main.rs b/server-native/src/main.rs index e0a0018..e1409aa 100644 --- a/server-native/src/main.rs +++ b/server-native/src/main.rs @@ -1,16 +1,19 @@ use core_engine::cmd::{Command, SetExpiry, ZAddCondition}; use core_engine::resp::Value; -use core_engine::store::{EvictionPolicy, KeyValueStore}; +use core_engine::store::{EvictionPolicy, KeyValueStore, SnapshotEntry}; use futures_util::{SinkExt, StreamExt}; use metrics::{counter, gauge}; use rustls_pemfile::{certs, private_key}; use std::collections::{HashMap, HashSet}; +use std::io::ErrorKind; use std::net::IpAddr; +use std::path::PathBuf; use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{Semaphore, broadcast, mpsc}; use tokio_rustls::TlsAcceptor; use tokio_rustls::rustls::ServerConfig; @@ -141,6 +144,10 @@ fn command_name(cmd: &Command) -> &'static str { Command::Publish(_, _) => "publish", Command::Watch(_) => "watch", Command::Unwatch(_) => "unwatch", + Command::Save => "save", + Command::BgSave => "bgsave", + Command::LastSave => "lastsave", + Command::ReplicaOfNoOne => "replicaof", Command::Unknown(_) => "unknown", } } @@ -201,10 +208,434 @@ const MAX_TCP_READ_BUFFER_BYTES: usize = 64 * 1024 * 1024; // 64 MB per connecti const MAX_MULTI_QUEUE_LEN: usize = 10_000; const MAX_WATCHES_PER_CONN: usize = 1_024; const BROADCAST_CHANNEL_CAPACITY: usize = 512; -const MAX_CONNECTIONS: usize = 1024; +const DEFAULT_MAX_CONNECTIONS: usize = 1024; const MAX_AUTH_FAILURES: u32 = 5; const EVICTION_INTERVAL_SECS: u64 = 1; +// ── snapshot persistence ────────────────────────────────────────────────────── + +struct SnapshotConfig { + path: PathBuf, + last_save: AtomicI64, +} + +fn now_unix_secs() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64 +} + +/// Parse a human-readable memory size string (e.g. "512mb", "1gb", "262144") +/// into a byte count. Returns None on parse failure. +fn parse_memory_bytes(s: &str) -> Option { + let s = s.trim().to_lowercase(); + if let Some(n) = s.strip_suffix("gb") { + n.trim() + .parse::() + .ok() + .map(|n| n * 1024 * 1024 * 1024) + } else if let Some(n) = s.strip_suffix("mb") { + n.trim().parse::().ok().map(|n| n * 1024 * 1024) + } else if let Some(n) = s.strip_suffix("kb") { + n.trim().parse::().ok().map(|n| n * 1024) + } else { + s.parse().ok() + } +} + +async fn save_snapshot(store: &KeyValueStore, cfg: &SnapshotConfig) { + let entries = store.snapshot(); + let count = entries.len(); + let tmp = cfg.path.with_extension("tmp"); + match rmp_serde::to_vec(&entries) { + Err(e) => warn!("Snapshot serialize failed: {}", e), + Ok(bytes) => match tokio::fs::write(&tmp, &bytes).await { + Err(e) => warn!("Snapshot write failed: {}", e), + Ok(()) => match tokio::fs::rename(&tmp, &cfg.path).await { + Err(e) => warn!("Snapshot rename failed: {}", e), + Ok(()) => { + cfg.last_save.store(now_unix_secs(), Ordering::Relaxed); + info!("Snapshot saved: {} entries → {:?}", count, cfg.path); + } + }, + }, + } +} + +async fn load_snapshot(store: &KeyValueStore, path: &std::path::Path) -> bool { + match tokio::fs::read(path).await { + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + info!("No snapshot at {:?}, starting fresh", path); + false + } + Err(e) => { + warn!("Snapshot read failed: {}", e); + false + } + Ok(bytes) => match rmp_serde::from_slice::>(&bytes) { + Err(e) => { + warn!("Snapshot deserialize failed: {}", e); + false + } + Ok(entries) => { + let count = entries.len(); + store.restore(entries); + info!("Snapshot loaded: {} entries ← {:?}", count, path); + true + } + }, + } +} + +// ── AOF ─────────────────────────────────────────────────────────────────────── + +#[derive(Clone, Copy, PartialEq)] +enum AofSync { + Always, + EverySec, + No, +} + +struct AofWriter { + #[allow(dead_code)] + path: PathBuf, + file: tokio::sync::Mutex, + sync: AofSync, +} + +impl AofWriter { + async fn open(path: PathBuf, sync: AofSync) -> std::io::Result { + let file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await?; + Ok(Self { + path, + file: tokio::sync::Mutex::new(file), + sync, + }) + } + + async fn append(&self, resp: &str) { + let mut f = self.file.lock().await; + if f.write_all(resp.as_bytes()).await.is_err() { + warn!("AOF write failed"); + return; + } + if self.sync == AofSync::Always { + let _ = f.flush().await; + } + } + + async fn flush(&self) { + let _ = self.file.lock().await.flush().await; + } + + async fn truncate(&self) { + match self.file.lock().await.set_len(0).await { + Ok(()) => info!("AOF truncated after snapshot save"), + Err(e) => warn!("AOF truncate failed: {}", e), + } + } +} + +async fn replay_aof(store: &KeyValueStore, path: &std::path::Path) -> usize { + let bytes = match tokio::fs::read(path).await { + Err(e) if e.kind() == ErrorKind::NotFound => return 0, + Err(e) => { + warn!("AOF read failed: {}", e); + return 0; + } + Ok(b) => b, + }; + let mut replayed = 0usize; + let mut offset = 0; + while offset < bytes.len() { + match Value::parse(&bytes[offset..]) { + Ok((value, consumed)) => { + offset += consumed; + if let Ok(cmd) = Command::from_value(value) { + store.execute(cmd); + replayed += 1; + } + } + Err(ref e) if e == "Incomplete" => break, + Err(_) => { + warn!("AOF corrupted at offset {}, stopping replay", offset); + break; + } + } + } + if replayed > 0 { + info!("AOF replayed: {} commands ← {:?}", replayed, path); + } + replayed +} + +// ── Replication ─────────────────────────────────────────────────────────────── + +type ReplSender = mpsc::UnboundedSender>; +type ReplRegistry = Arc>>; + +// ── Server state ────────────────────────────────────────────────────────────── + +struct ServerState { + snap: Arc, + aof: Option>, + replicas: ReplRegistry, + /// true = currently acting as a read-only replica + is_replica: std::sync::atomic::AtomicBool, +} + +impl ServerState { + fn is_replica(&self) -> bool { + self.is_replica.load(Ordering::Relaxed) + } + + fn promote_to_primary(&self) { + self.is_replica.store(false, Ordering::Relaxed); + info!("REPLICAOF NO ONE: promoted to primary — writes now accepted"); + } + + /// Called after every successful write: appends to AOF and fans out to replicas. + async fn on_write(&self, resp: &str) { + if let Some(aof) = &self.aof { + aof.append(resp).await; + } + let bytes = resp.as_bytes().to_vec(); + let mut reg = self.replicas.lock().await; + reg.retain(|tx| tx.send(bytes.clone()).is_ok()); + } + + /// Save snapshot then truncate AOF (snapshot subsumes the log). + async fn save(&self, store: &KeyValueStore) { + save_snapshot(store, &self.snap).await; + if let Some(aof) = &self.aof { + aof.truncate().await; + } + } +} + +fn is_write_command(cmd: &Command) -> bool { + matches!( + cmd, + Command::Set(..) + | Command::Del(..) + | Command::Unlink(..) + | Command::Append(..) + | Command::GetSet(..) + | Command::MSet(..) + | Command::SetNx(..) + | Command::SetEx(..) + | Command::PSetEx(..) + | Command::Incr(..) + | Command::Decr(..) + | Command::IncrBy(..) + | Command::DecrBy(..) + | Command::Expire(..) + | Command::PExpire(..) + | Command::ExpireAt(..) + | Command::PExpireAt(..) + | Command::Persist(..) + | Command::FlushDb + | Command::Rename(..) + | Command::HSet(..) + | Command::HDel(..) + | Command::HIncrBy(..) + | Command::HIncrByFloat(..) + | Command::HSetNx(..) + | Command::LPush(..) + | Command::RPush(..) + | Command::LPushX(..) + | Command::RPushX(..) + | Command::LPop(..) + | Command::RPop(..) + | Command::LSet(..) + | Command::LRem(..) + | Command::LTrim(..) + | Command::SAdd(..) + | Command::SRem(..) + | Command::SInterStore(..) + | Command::SUnionStore(..) + | Command::SDiffStore(..) + | Command::SPop(..) + | Command::SMove(..) + | Command::ZAdd(..) + | Command::ZRem(..) + | Command::ZIncrBy(..) + ) +} + +// ── Replication server (primary side) ──────────────────────────────────────── + +async fn run_repl_server( + port: u16, + store: Arc, + snap_cfg: Arc, + replicas: ReplRegistry, + repl_password: Option>, +) { + let listener = match TcpListener::bind(format!("0.0.0.0:{}", port)).await { + Ok(l) => l, + Err(e) => { + warn!("Replication listener failed to bind :{}: {}", port, e); + return; + } + }; + info!("Replication server listening on 0.0.0.0:{}", port); + loop { + match listener.accept().await { + Ok((socket, addr)) => { + info!("Replica connected from {}", addr); + let store = Arc::clone(&store); + let snap_cfg = Arc::clone(&snap_cfg); + let replicas = Arc::clone(&replicas); + let pwd = repl_password.clone(); + tokio::spawn(async move { + if let Err(e) = handle_replica(socket, store, snap_cfg, replicas, pwd).await { + info!("Replica {} disconnected: {}", addr, e); + } + }); + } + Err(e) => warn!("Replication accept error: {}", e), + } + } +} + +async fn handle_replica( + mut socket: TcpStream, + store: Arc, + _snap_cfg: Arc, + replicas: ReplRegistry, + repl_password: Option>, +) -> std::io::Result<()> { + // 0. Auth handshake — replica must send "\n" before anything else + if let Some(pwd) = &repl_password { + let mut auth_buf = vec![0u8; pwd.len() + 1]; + socket.read_exact(&mut auth_buf).await?; + let received_pwd = &auth_buf[..pwd.len()]; + let terminator = auth_buf[pwd.len()]; + if received_pwd != pwd.as_bytes() || terminator != b'\n' { + let _ = socket + .write_all(b"-ERR invalid replication password\n") + .await; + return Err(std::io::Error::new( + ErrorKind::PermissionDenied, + "replication auth failed", + )); + } + socket.write_all(b"+OK\n").await?; + socket.flush().await?; + } + + // 1. Register channel first so subsequent writes are buffered + let (tx, mut rx) = mpsc::unbounded_channel::>(); + replicas.lock().await.push(tx); + + // 2. Take snapshot and send (writes since snapshot are in channel) + let snap_bytes = + rmp_serde::to_vec(&store.snapshot()).map_err(|e| std::io::Error::other(e.to_string()))?; + let len = snap_bytes.len() as u32; + socket.write_all(&len.to_le_bytes()).await?; + socket.write_all(&snap_bytes).await?; + socket.flush().await?; + + // 3. Stream buffered + ongoing writes + while let Some(bytes) = rx.recv().await { + let len = bytes.len() as u32; + socket.write_all(&len.to_le_bytes()).await?; + socket.write_all(&bytes).await?; + socket.flush().await?; + } + Ok(()) +} + +// ── Replication client (replica side) ──────────────────────────────────────── + +async fn run_repl_client( + primary_addr: String, + store: Arc, + repl_password: Option, +) { + let mut backoff_secs = 2u64; + loop { + info!("Replica: connecting to primary at {}", primary_addr); + match TcpStream::connect(&primary_addr).await { + Err(e) => warn!("Replica: connect failed: {}", e), + Ok(mut socket) => { + backoff_secs = 2; + if let Err(e) = + sync_from_primary(&mut socket, &store, repl_password.as_deref()).await + { + warn!("Replica: sync ended: {}", e); + } + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(30); + } +} + +async fn sync_from_primary( + socket: &mut TcpStream, + store: &KeyValueStore, + repl_password: Option<&str>, +) -> std::io::Result<()> { + // 0. Send auth password if configured + if let Some(pwd) = repl_password { + let msg = format!("{}\n", pwd); + socket.write_all(msg.as_bytes()).await?; + socket.flush().await?; + // Read "+OK\n" (4 bytes) + let mut resp = [0u8; 4]; + socket.read_exact(&mut resp).await?; + if &resp != b"+OK\n" { + return Err(std::io::Error::new( + ErrorKind::PermissionDenied, + "replication auth rejected by primary", + )); + } + } + + // 1. Receive full snapshot + let mut len_buf = [0u8; 4]; + socket.read_exact(&mut len_buf).await?; + let snap_len = u32::from_le_bytes(len_buf) as usize; + let mut snap_bytes = vec![0u8; snap_len]; + socket.read_exact(&mut snap_bytes).await?; + + match rmp_serde::from_slice::>(&snap_bytes) { + Ok(entries) => { + let count = entries.len(); + store.restore(entries); + info!("Replica: snapshot loaded ({} entries)", count); + } + Err(e) => { + return Err(std::io::Error::new(ErrorKind::InvalidData, e.to_string())); + } + } + + // 2. Stream write commands from primary + loop { + let mut len_buf = [0u8; 4]; + socket.read_exact(&mut len_buf).await?; + let cmd_len = u32::from_le_bytes(len_buf) as usize; + let mut cmd_bytes = vec![0u8; cmd_len]; + socket.read_exact(&mut cmd_bytes).await?; + + match Value::parse(&cmd_bytes) { + Ok((value, _)) => { + if let Ok(cmd) = Command::from_value(value) { + store.execute(cmd); + } + } + Err(e) => warn!("Replica: bad command from primary: {}", e), + } + } +} + // ── connection identity ────────────────────────────────────────────────────── // TCP mutation broadcasts use id=0; WS/TCP pubsub connections get ids ≥ 1. @@ -532,6 +963,10 @@ fn broadcast_for(cmd: &Command, response: &Value) -> Option { let ms_s = ms.to_string(); Some(resp_command(&["SET", k, v, "PX", &ms_s])) } + Command::Append(k, v) => match response { + Value::Integer(_) => Some(resp_command(&["APPEND", k, v])), + _ => None, + }, Command::GetSet(k, v) => Some(resp_command(&["SET", k, v])), Command::Incr(k) | Command::Decr(k) => match response { Value::Integer(n) => { @@ -905,6 +1340,10 @@ async fn main() -> Result<(), Box> { .ok() .and_then(|v| v.parse::().ok()); + let max_memory_bytes = std::env::var("RECACHED_MAX_MEMORY") + .ok() + .and_then(|v| parse_memory_bytes(&v)); + let eviction_policy = match std::env::var("RECACHED_EVICTION") .unwrap_or_default() .to_lowercase() @@ -917,11 +1356,148 @@ async fn main() -> Result<(), Box> { _ => EvictionPolicy::NoEviction, }; - if max_keys.is_some() { - info!("Key limit: {:?}, eviction: {:?}", max_keys, eviction_policy); + if max_keys.is_some() || max_memory_bytes.is_some() { + info!( + "Key limit: {:?}, memory limit: {:?} bytes, eviction: {:?}", + max_keys, max_memory_bytes, eviction_policy + ); + } + + let store = Arc::new(KeyValueStore::with_config( + max_keys, + max_memory_bytes, + eviction_policy, + )); + + // ── snapshot persistence ────────────────────────────────────────────── + let save_path = PathBuf::from( + std::env::var("RECACHED_SAVE_PATH").unwrap_or_else(|_| "recached.rdb".to_string()), + ); + let save_interval_secs: u64 = std::env::var("RECACHED_SAVE_INTERVAL") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(900); + + load_snapshot(&store, &save_path).await; + + let snap_cfg = Arc::new(SnapshotConfig { + path: save_path, + last_save: AtomicI64::new(now_unix_secs()), + }); + + // ── AOF ─────────────────────────────────────────────────────────────── + let aof_path = std::env::var("RECACHED_AOF_PATH").ok().map(PathBuf::from); + let aof_sync = match std::env::var("RECACHED_AOF_SYNC") + .unwrap_or_default() + .to_lowercase() + .as_str() + { + "always" => AofSync::Always, + "no" => AofSync::No, + _ => AofSync::EverySec, + }; + + let aof: Option> = if let Some(path) = aof_path { + match AofWriter::open(path.clone(), aof_sync).await { + Ok(w) => { + replay_aof(&store, &path).await; + let writer = Arc::new(w); + if aof_sync == AofSync::EverySec { + let w2 = Arc::clone(&writer); + tokio::spawn(async move { + let mut interval = + tokio::time::interval(tokio::time::Duration::from_secs(1)); + loop { + interval.tick().await; + w2.flush().await; + } + }); + } + info!( + "AOF enabled: {:?} (sync={})", + path, + match aof_sync { + AofSync::Always => "always", + AofSync::EverySec => "everysec", + AofSync::No => "no", + } + ); + Some(writer) + } + Err(e) => { + warn!("AOF open failed: {} — running without AOF", e); + None + } + } + } else { + None + }; + + // ── Replication ─────────────────────────────────────────────────────── + let replicaof = std::env::var("RECACHED_REPLICAOF").ok(); + let repl_port: u16 = std::env::var("RECACHED_REPL_PORT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(6381); + let repl_password: Option = std::env::var("RECACHED_REPL_PASSWORD").ok(); + + if repl_password.is_some() { + info!("Replication auth ENABLED (RECACHED_REPL_PASSWORD is set)."); + } else { + warn!( + "Replication auth DISABLED. Set RECACHED_REPL_PASSWORD to secure the replication port." + ); + } + + let is_replica_start = replicaof.is_some(); + let replicas: ReplRegistry = Arc::new(tokio::sync::Mutex::new(Vec::new())); + + // ── server state ────────────────────────────────────────────────────── + let state = Arc::new(ServerState { + snap: Arc::clone(&snap_cfg), + aof, + replicas: Arc::clone(&replicas), + is_replica: std::sync::atomic::AtomicBool::new(is_replica_start), + }); + + // ── autosave ────────────────────────────────────────────────────────── + if save_interval_secs > 0 { + let store_snap = Arc::clone(&store); + let state_snap = Arc::clone(&state); + tokio::spawn(async move { + let mut interval = + tokio::time::interval(tokio::time::Duration::from_secs(save_interval_secs)); + interval.tick().await; // skip immediate first tick + loop { + interval.tick().await; + state_snap.save(&store_snap).await; + } + }); + info!( + "Autosave every {}s → {:?}", + save_interval_secs, snap_cfg.path + ); + } else { + info!("Autosave disabled (RECACHED_SAVE_INTERVAL=0). Use SAVE or BGSAVE manually."); } - let store = Arc::new(KeyValueStore::with_config(max_keys, eviction_policy)); + // ── start replication ───────────────────────────────────────────────── + if !is_replica_start { + let store_r = Arc::clone(&store); + let snap_r = Arc::clone(&snap_cfg); + let reg_r = Arc::clone(&replicas); + let pwd_r = repl_password.clone().map(Arc::new); + tokio::spawn(async move { + run_repl_server(repl_port, store_r, snap_r, reg_r, pwd_r).await; + }); + } else if let Some(primary_addr) = replicaof { + let store_r = Arc::clone(&store); + let pwd_r = repl_password.clone(); + tokio::spawn(async move { + run_repl_client(primary_addr, store_r, pwd_r).await; + }); + info!("Running as replica — write commands will be rejected"); + } // ── background eviction ─────────────────────────────────────────────── { @@ -932,6 +1508,7 @@ async fn main() -> Result<(), Box> { loop { interval.tick().await; store_sweep.sweep_expired(); + store_sweep.try_evict_for_memory(); } }); } @@ -947,7 +1524,12 @@ async fn main() -> Result<(), Box> { let watch_registry: WatchRegistry = Arc::new(Mutex::new(HashMap::new())); // ── connection limiter ──────────────────────────────────────────────── - let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS)); + let max_connections = std::env::var("RECACHED_MAX_CONNECTIONS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_MAX_CONNECTIONS); + info!("Max connections: {}", max_connections); + let semaphore = Arc::new(Semaphore::new(max_connections)); // ── TLS ─────────────────────────────────────────────────────────────── let tls_acceptor: Option = load_tls_acceptor(); @@ -977,6 +1559,7 @@ async fn main() -> Result<(), Box> { let pubsub_tcp = Arc::clone(&pubsub); let tls_tcp = Arc::clone(&tls_acceptor); let watch_tcp = Arc::clone(&watch_registry); + let snap_tcp = Arc::clone(&state); tokio::spawn(async move { loop { @@ -1001,15 +1584,16 @@ async fn main() -> Result<(), Box> { let ps = Arc::clone(&pubsub_tcp); let wr = Arc::clone(&watch_tcp); let tls = Arc::clone(&tls_tcp); + let sc = Arc::clone(&snap_tcp); tokio::spawn(async move { let _permit = permit; if let Some(acc) = tls.as_ref() { match acc.accept(socket).await { - Ok(tls_stream) => handle_tcp(tls_stream, s, t, p, ps, wr).await, + Ok(tls_stream) => handle_tcp(tls_stream, s, t, p, ps, wr, sc).await, Err(e) => warn!("TCP TLS handshake failed from {}: {}", addr, e), } } else { - handle_tcp(socket, s, t, p, ps, wr).await; + handle_tcp(socket, s, t, p, ps, wr, sc).await; } }); } @@ -1018,44 +1602,80 @@ async fn main() -> Result<(), Box> { } }); + // ── graceful shutdown via oneshot channel ──────────────────────────── + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + tokio::spawn(async move { + #[cfg(unix)] + { + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to register SIGTERM handler"); + tokio::select! { + _ = tokio::signal::ctrl_c() => {}, + _ = sigterm.recv() => {}, + } + } + #[cfg(not(unix))] + { + let _ = tokio::signal::ctrl_c().await; + } + let _ = shutdown_tx.send(()); + }); + loop { - match ws_listener.accept().await { - Ok((socket, addr)) => { - if let Some(allowed) = &allowed_ips - && !allowed.contains(&addr.ip()) - { - debug!("WS: rejected IP {}", addr.ip()); - continue; - } - let permit = match Arc::clone(&semaphore).try_acquire_owned() { - Ok(p) => p, - Err(_) => { - warn!("WS: connection limit reached, dropping {}", addr); - continue; - } - }; - let s = Arc::clone(&store); - let t = tx.clone(); - let p = Arc::clone(&global_password); - let ps = Arc::clone(&pubsub); - let wr = Arc::clone(&watch_registry); - let tls = Arc::clone(&tls_acceptor); - let id = next_conn_id(); - tokio::spawn(async move { - let _permit = permit; - if let Some(acc) = tls.as_ref() { - match acc.accept(socket).await { - Ok(tls_stream) => handle_ws(tls_stream, s, t, p, id, ps, wr).await, - Err(e) => warn!("WS TLS handshake failed from {}: {}", addr, e), + tokio::select! { + biased; + + res = ws_listener.accept() => { + match res { + Ok((socket, addr)) => { + if let Some(allowed) = &allowed_ips + && !allowed.contains(&addr.ip()) + { + debug!("WS: rejected IP {}", addr.ip()); + continue; } - } else { - handle_ws(socket, s, t, p, id, ps, wr).await; + let permit = match Arc::clone(&semaphore).try_acquire_owned() { + Ok(p) => p, + Err(_) => { + warn!("WS: connection limit reached, dropping {}", addr); + continue; + } + }; + let s = Arc::clone(&store); + let t = tx.clone(); + let p = Arc::clone(&global_password); + let ps = Arc::clone(&pubsub); + let wr = Arc::clone(&watch_registry); + let tls = Arc::clone(&tls_acceptor); + let sc = Arc::clone(&state); + let id = next_conn_id(); + tokio::spawn(async move { + let _permit = permit; + if let Some(acc) = tls.as_ref() { + match acc.accept(socket).await { + Ok(tls_stream) => handle_ws(tls_stream, s, t, p, id, ps, wr, sc).await, + Err(e) => warn!("WS TLS handshake failed from {}: {}", addr, e), + } + } else { + handle_ws(socket, s, t, p, id, ps, wr, sc).await; + } + }); } - }); + Err(e) => warn!("WS accept error: {}", e), + } + } + + _ = &mut shutdown_rx => { + info!("Shutdown signal received, saving final snapshot..."); + state.save(&store).await; + info!("Done. Goodbye."); + break; } - Err(e) => warn!("WS accept error: {}", e), } } + + Ok(()) } // ── TCP handler ─────────────────────────────────────────────────────────────── @@ -1067,6 +1687,7 @@ async fn handle_tcp( password: Arc>, pubsub: SharedPubSub, watch_registry: WatchRegistry, + state: Arc, ) where S: AsyncRead + AsyncWrite + Unpin + Send, { @@ -1156,7 +1777,8 @@ async fn handle_tcp( for qcmd in queue { let resp = execute_and_record(&store, &qcmd); if let Some(msg) = broadcast_for(&qcmd, &resp) { - let _ = tx.send((0, msg)); + let _ = tx.send((0, msg.clone())); + state.on_write(&msg).await; } notify_watchers(&watch_registry, &qcmd, &resp, &store); results.push(resp); @@ -1260,11 +1882,42 @@ async fn handle_tcp( if writer.write_all(err).await.is_err() { break 'outer; } continue 'parse; } + // Replica: reject writes + if state.is_replica() && is_write_command(&cmd) { + let err = b"-READONLY You can't write against a read only replica.\r\n"; + if writer.write_all(err).await.is_err() { break 'outer; } + continue 'parse; + } + // Snapshot commands — handled here (async I/O, not in execute()) + match &cmd { + Command::Save => { + state.save(&store).await; + if writer.write_all(b"+OK\r\n").await.is_err() { break 'outer; } + continue 'parse; + } + Command::BgSave => { + let s = Arc::clone(&store); + let st = Arc::clone(&state); + tokio::spawn(async move { st.save(&s).await; }); + if writer.write_all(b"+Background saving started\r\n").await.is_err() { break 'outer; } + continue 'parse; + } + Command::LastSave => { + let ts = state.snap.last_save.load(Ordering::Relaxed); + if writer.write_all(&Value::Integer(ts).serialize()).await.is_err() { break 'outer; } + continue 'parse; + } + Command::ReplicaOfNoOne => { + state.promote_to_primary(); + if writer.write_all(b"+OK\r\n").await.is_err() { break 'outer; } + continue 'parse; + } + _ => {} + } let response = execute_and_record(&store, &cmd); - if let Some(msg) = broadcast_for(&cmd, &response) - && let Err(e) = tx.send((0, msg)) - { - debug!("TCP broadcast had no WS receivers: {}", e); + if let Some(msg) = broadcast_for(&cmd, &response) { + let _ = tx.send((0, msg.clone())); + state.on_write(&msg).await; } notify_watchers(&watch_registry, &cmd, &response, &store); if writer.write_all(&response.serialize()).await.is_err() { @@ -1310,6 +1963,7 @@ async fn handle_tcp( // ── WebSocket handler ───────────────────────────────────────────────────────── +#[allow(clippy::too_many_arguments)] async fn handle_ws( socket: S, store: Arc, @@ -1318,6 +1972,7 @@ async fn handle_ws( conn_id: u64, pubsub: SharedPubSub, watch_registry: WatchRegistry, + state: Arc, ) where S: AsyncRead + AsyncWrite + Unpin + Send, { @@ -1422,7 +2077,8 @@ async fn handle_ws( for qcmd in queue { let resp = execute_and_record(&store, &qcmd); if let Some(msg) = broadcast_for(&qcmd, &resp) { - let _ = tx.send((conn_id, msg)); + let _ = tx.send((conn_id, msg.clone())); + state.on_write(&msg).await; } notify_watchers(&watch_registry, &qcmd, &resp, &store); results.push(resp); @@ -1555,11 +2211,43 @@ async fn handle_ws( ws_send!(b"-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in subscribe mode\r\n"); continue 'outer; } + // Replica: reject writes + if state.is_replica() && is_write_command(&cmd) { + ws_send!(b"-READONLY You can't write against a read only replica.\r\n"); + continue 'outer; + } + // Snapshot commands + match &cmd { + Command::Save => { + state.save(&store).await; + ws_send!(b"+OK\r\n"); + continue 'outer; + } + Command::BgSave => { + let s = Arc::clone(&store); + let st = Arc::clone(&state); + tokio::spawn(async move { st.save(&s).await; }); + ws_send!(b"+Background saving started\r\n"); + continue 'outer; + } + Command::LastSave => { + let ts = state.snap.last_save.load(Ordering::Relaxed); + ws_send!(&Value::Integer(ts).serialize()); + continue 'outer; + } + Command::ReplicaOfNoOne => { + state.promote_to_primary(); + ws_send!(b"+OK\r\n"); + continue 'outer; + } + _ => {} + } let response = execute_and_record(&store, &cmd); - if let Some(b_msg) = broadcast_for(&cmd, &response) - && let Err(e) = tx.send((conn_id, b_msg)) - { - debug!("WS broadcast on conn {} had no receivers: {}", conn_id, e); + if let Some(b_msg) = broadcast_for(&cmd, &response) { + if let Err(e) = tx.send((conn_id, b_msg.clone())) { + debug!("WS broadcast on conn {} had no receivers: {}", conn_id, e); + } + state.on_write(&b_msg).await; } notify_watchers(&watch_registry, &cmd, &response, &store); ws_send!(&response.serialize()); @@ -1631,3 +2319,125 @@ async fn handle_ws( } } } + +#[cfg(test)] +mod tests { + use super::*; + use core_engine::cmd::{SetOptions, ZAddOptions}; + use core_engine::resp::Value; + use core_engine::store::KeyValueStore; + use std::sync::atomic::AtomicI64; + + fn tmp_path(name: &str) -> PathBuf { + std::env::temp_dir().join(format!("recached_test_{name}_{}", std::process::id())) + } + + // ── is_write_command ────────────────────────────────────────────────────── + + #[test] + fn is_write_command_classifies_correctly() { + assert!(is_write_command(&Command::Set( + "k".into(), + "v".into(), + SetOptions::default() + ))); + assert!(is_write_command(&Command::Del(vec!["k".into()]))); + assert!(is_write_command(&Command::Incr("k".into()))); + assert!(is_write_command(&Command::FlushDb)); + assert!(is_write_command(&Command::HSet( + "h".into(), + vec![("f".into(), "v".into())] + ))); + assert!(is_write_command(&Command::LPush( + "l".into(), + vec!["v".into()] + ))); + assert!(is_write_command(&Command::SAdd( + "s".into(), + vec!["m".into()] + ))); + assert!(is_write_command(&Command::ZAdd( + "z".into(), + ZAddOptions::default(), + vec![(1.0, "m".into())] + ))); + // reads + assert!(!is_write_command(&Command::Get("k".into()))); + assert!(!is_write_command(&Command::HGet("h".into(), "f".into()))); + assert!(!is_write_command(&Command::LRange("l".into(), 0, -1))); + assert!(!is_write_command(&Command::SMembers("s".into()))); + assert!(!is_write_command(&Command::DbSize)); + assert!(!is_write_command(&Command::Ping(None))); + assert!(!is_write_command(&Command::Publish( + "ch".into(), + "msg".into() + ))); + } + + // ── AOF replay ──────────────────────────────────────────────────────────── + + #[tokio::test] + async fn replay_aof_missing_file() { + let store = KeyValueStore::new(); + let path = tmp_path("aof_missing"); + let count = replay_aof(&store, &path).await; + assert_eq!(count, 0); + } + + #[tokio::test] + async fn replay_aof_basic() { + let store = KeyValueStore::new(); + let path = tmp_path("aof_basic.aof"); + let resp = "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n\ + *3\r\n$3\r\nSET\r\n$3\r\nbaz\r\n$3\r\nqux\r\n"; + tokio::fs::write(&path, resp.as_bytes()).await.unwrap(); + let count = replay_aof(&store, &path).await; + assert_eq!(count, 2); + assert_eq!(store.execute(Command::DbSize), Value::Integer(2)); + let _ = tokio::fs::remove_file(&path).await; + } + + // ── Snapshot save / load ────────────────────────────────────────────────── + + #[tokio::test] + async fn snapshot_save_and_load() { + let store = KeyValueStore::new(); + store.execute(Command::Set( + "hello".into(), + "world".into(), + SetOptions::default(), + )); + let path = tmp_path("snap.rdb"); + let cfg = Arc::new(SnapshotConfig { + path: path.clone(), + last_save: AtomicI64::new(0), + }); + save_snapshot(&store, &cfg).await; + assert!(path.exists()); + let store2 = KeyValueStore::new(); + let loaded = load_snapshot(&store2, &path).await; + assert!(loaded); + assert_eq!( + store2.execute(Command::Get("hello".into())), + Value::BulkString(Some(b"world".to_vec())) + ); + let _ = tokio::fs::remove_file(&path).await; + } + + // ── AofWriter append / truncate ─────────────────────────────────────────── + + #[tokio::test] + async fn aof_writer_append_and_truncate() { + let path = tmp_path("aof_writer.aof"); + let aof = AofWriter::open(path.clone(), AofSync::No).await.unwrap(); + aof.append("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n") + .await; + aof.flush().await; + let len_before = tokio::fs::metadata(&path).await.unwrap().len(); + assert!(len_before > 0); + aof.truncate().await; + let len_after = tokio::fs::metadata(&path).await.unwrap().len(); + assert_eq!(len_after, 0); + let _ = tokio::fs::remove_file(&path).await; + } +} diff --git a/wasm-edge/node_modules/.package-lock.json b/wasm-edge/node_modules/.package-lock.json index f5467cc..6caa5ef 100644 --- a/wasm-edge/node_modules/.package-lock.json +++ b/wasm-edge/node_modules/.package-lock.json @@ -1,6 +1,6 @@ { "name": "recached-edge", - "version": "0.1.3", + "version": "0.1.4", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/wasm-edge/package-lock.json b/wasm-edge/package-lock.json index af9c559..584c942 100644 --- a/wasm-edge/package-lock.json +++ b/wasm-edge/package-lock.json @@ -1,12 +1,12 @@ { "name": "recached-edge", - "version": "0.1.3", + "version": "0.1.4", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "recached-edge", - "version": "0.1.3", + "version": "0.1.4", "license": "MIT", "devDependencies": { "typescript": "^5.9.3" diff --git a/wasm-edge/package.json b/wasm-edge/package.json index 8774fb9..0ce8da7 100644 --- a/wasm-edge/package.json +++ b/wasm-edge/package.json @@ -1,7 +1,7 @@ { "name": "recached-edge", - "description": "Browser and edge WebAssembly client for Recached — zero-latency local cache with automatic server sync", - "version": "0.1.3", + "description": "Browser and edge WebAssembly client for Recached \u2014 zero-latency local cache with automatic server sync", + "version": "0.1.4", "type": "module", "main": "sdk.js", "module": "sdk.js", @@ -21,7 +21,10 @@ ], "scripts": { "build": "wasm-pack build --target web --out-dir pkg && tsc", - "typecheck": "tsc --noEmit" + "typecheck": "tsc --noEmit", + "version:patch": "npm version patch --no-git-tag-version", + "version:minor": "npm version minor --no-git-tag-version", + "version:major": "npm version major --no-git-tag-version" }, "devDependencies": { "typescript": "^5.9.3" diff --git a/wasm-edge/sdk.d.ts b/wasm-edge/sdk.d.ts new file mode 100644 index 0000000..8d750a8 --- /dev/null +++ b/wasm-edge/sdk.d.ts @@ -0,0 +1,200 @@ +export interface ConnectOptions { + /** WebSocket URL, e.g. `"ws://localhost:6380"` or `"wss://cache.example.com"`. */ + url: string; + /** + * Server password. Required when the server has `RECACHED_PASSWORD` set. + * Sent immediately after the socket opens via `AUTH`. + */ + password?: string; +} +export interface CacheOptions { + /** + * Load the IndexedDB WAL and enable write-through persistence so the cache + * survives page refreshes. + * + * Gracefully ignored if IndexedDB is unavailable (e.g. some private-browsing + * modes or non-browser environments). + * + * @default false + */ + persistence?: boolean; + /** + * BroadcastChannel name for cross-tab mutation sharing. + * All tabs that open with the same name see each other's writes automatically, + * with no server connection required. + */ + broadcastChannel?: string; + /** + * Connect to a Recached server immediately after the cache is created. + * Once connected, writes are pushed to the server and server-side mutations + * are pushed down to the local WASM store automatically. + */ + connect?: ConnectOptions; +} +interface RawCache { + enable_persistence(): Promise; + clear_persistence(): Promise; + broadcast(channel_name: string): void; + connect(url: string): void; + auth(password: string): string; + set(key: string, value: string): string; + set_ex(key: string, value: string, seconds: number): string; + get(key: string): string | undefined; + del(key: string): number; + ttl(key: string): number; + exists(key: string): boolean; + publish(channel: string, message: string): void; + subscribe(channel: string): void; + unsubscribe(channel: string): void; + set_mutation_callback(cb: () => void): void; + free(): void; +} +/** + * A typed wrapper around the Recached WASM cache. + * + * Obtain an instance via {@link createCache} rather than `new Cache()`. + * + * ```ts + * const cache = await createCache({ persistence: true }); + * cache.set('theme', 'dark'); + * cache.get('theme'); // "dark" + * ``` + */ +export declare class Cache { + /** @internal */ + readonly raw: RawCache; + private readonly _mutationListeners; + /** @internal Arrow function so `this` is always bound when passed as a callback. */ + private readonly _notifyMutation; + /** @internal */ + constructor(raw: RawCache); + /** + * Subscribe to store mutations from any source — local writes, server + * WebSocket push, and BroadcastChannel cross-tab sync. + * + * Returns an unsubscribe function. Pass directly to React's + * `useSyncExternalStore` `subscribe` parameter. + * + * ```ts + * useSyncExternalStore( + * cache.onMutation.bind(cache), + * () => cache.get('key'), + * () => null, + * ); + * ``` + */ + onMutation(cb: () => void): () => void; + /** + * Return the value for `key`, or `null` if the key does not exist or has expired. + * + * Always served from local WASM memory — zero network latency. + */ + get(key: string): string | null; + /** + * Return a JSON-parsed value stored under `key`, or `null` if the key is + * missing, expired, or not valid JSON. + * + * ```ts + * interface User { id: number; name: string } + * const user = cache.getJSON('user:42'); // User | null + * ``` + */ + getJSON(key: string): T | null; + /** Return `true` if `key` exists and has not expired. */ + exists(key: string): boolean; + /** + * Return the remaining TTL in seconds. + * - `-1` — key exists with no expiry + * - `-2` — key does not exist + */ + ttl(key: string): number; + /** + * Store a string value. Syncs to the server and other tabs when connected. + */ + set(key: string, value: string): void; + /** + * Store a string value with a TTL (seconds). The key is deleted automatically + * once the TTL elapses. + */ + setEx(key: string, value: string, seconds: number): void; + /** + * Serialize `value` as JSON and store it under `key`. + * Pass `ttl` (seconds) to have the key expire automatically. + * + * ```ts + * cache.setJSON('user:42', { id: 42, name: 'Alice' }, 300); // expires in 5 min + * ``` + */ + setJSON(key: string, value: T, ttl?: number): void; + /** + * Delete `key`. + * + * Returns `true` if the key existed, `false` if it did not. + * Syncs to the server and other tabs when connected. + */ + del(key: string): boolean; + /** + * Subscribe to a server pub/sub channel. Push messages arrive via the + * WebSocket `onmessage` callback. + */ + subscribe(channel: string): void; + /** Unsubscribe from a server pub/sub channel. */ + unsubscribe(channel: string): void; + /** + * Publish a message to a server pub/sub channel. All subscribers — browser + * and server-side — receive the message. + */ + publish(channel: string, message: string): void; + /** + * Erase the IndexedDB WAL. The in-memory store is not affected. + * + * Use on sign-out so the next session starts with an empty cache. + */ + clearPersistence(): Promise; +} +/** + * Initialise the WASM module eagerly. + * + * `createCache` calls this automatically on first use, so calling `init()` + * directly is only necessary when you want to front-load the WASM download + * (e.g. during a loading screen). + * + * ```ts + * import { init, createCache } from 'recached-edge'; + * + * await init(); // start downloading WASM now + * // ... other app setup ... + * const cache = await createCache(); // resolves immediately — already loaded + * ``` + */ +export declare function init(): Promise; +/** + * Create a {@link Cache} instance. + * + * Loads and initialises the WASM module on first call (subsequent calls reuse + * the existing module). Options are applied in order: + * `persistence` → `broadcastChannel` → `connect` + `auth`. + * + * ```ts + * import { createCache } from 'recached-edge'; + * + * // Local-only, survives refresh + * const cache = await createCache({ persistence: true }); + * + * // With server sync + * const cache = await createCache({ + * persistence: true, + * connect: { url: 'ws://localhost:6380', password: 'secret' }, + * }); + * + * cache.set('theme', 'dark'); + * cache.getJSON('user:42'); // User | null + * + * // --- page refresh --- + * const cache = await createCache({ persistence: true }); + * cache.get('theme'); // "dark" — restored from IndexedDB, zero network + * ``` + */ +export declare function createCache(options?: CacheOptions): Promise; +export {}; +//# sourceMappingURL=sdk.d.ts.map \ No newline at end of file diff --git a/wasm-edge/sdk.d.ts.map b/wasm-edge/sdk.d.ts.map new file mode 100644 index 0000000..27d3b07 --- /dev/null +++ b/wasm-edge/sdk.d.ts.map @@ -0,0 +1 @@ +{"version":3,"file":"sdk.d.ts","sourceRoot":"","sources":["sdk.ts"],"names":[],"mappings":"AAEA,MAAM,WAAW,cAAc;IAC7B,kFAAkF;IAClF,GAAG,EAAE,MAAM,CAAC;IACZ;;;OAGG;IACH,QAAQ,CAAC,EAAE,MAAM,CAAC;CACnB;AAED,MAAM,WAAW,YAAY;IAC3B;;;;;;;;OAQG;IACH,WAAW,CAAC,EAAE,OAAO,CAAC;IACtB;;;;OAIG;IACH,gBAAgB,CAAC,EAAE,MAAM,CAAC;IAC1B;;;;OAIG;IACH,OAAO,CAAC,EAAE,cAAc,CAAC;CAC1B;AAKD,UAAU,QAAQ;IAChB,kBAAkB,IAAI,OAAO,CAAC,IAAI,CAAC,CAAC;IACpC,iBAAiB,IAAI,OAAO,CAAC,IAAI,CAAC,CAAC;IACnC,SAAS,CAAC,YAAY,EAAE,MAAM,GAAG,IAAI,CAAC;IACtC,OAAO,CAAC,GAAG,EAAE,MAAM,GAAG,IAAI,CAAC;IAC3B,IAAI,CAAC,QAAQ,EAAE,MAAM,GAAG,MAAM,CAAC;IAC/B,GAAG,CAAC,GAAG,EAAE,MAAM,EAAE,KAAK,EAAE,MAAM,GAAG,MAAM,CAAC;IACxC,MAAM,CAAC,GAAG,EAAE,MAAM,EAAE,KAAK,EAAE,MAAM,EAAE,OAAO,EAAE,MAAM,GAAG,MAAM,CAAC;IAC5D,GAAG,CAAC,GAAG,EAAE,MAAM,GAAG,MAAM,GAAG,SAAS,CAAC;IACrC,GAAG,CAAC,GAAG,EAAE,MAAM,GAAG,MAAM,CAAC;IACzB,GAAG,CAAC,GAAG,EAAE,MAAM,GAAG,MAAM,CAAC;IACzB,MAAM,CAAC,GAAG,EAAE,MAAM,GAAG,OAAO,CAAC;IAC7B,OAAO,CAAC,OAAO,EAAE,MAAM,EAAE,OAAO,EAAE,MAAM,GAAG,IAAI,CAAC;IAChD,SAAS,CAAC,OAAO,EAAE,MAAM,GAAG,IAAI,CAAC;IACjC,WAAW,CAAC,OAAO,EAAE,MAAM,GAAG,IAAI,CAAC;IACnC,qBAAqB,CAAC,EAAE,EAAE,MAAM,IAAI,GAAG,IAAI,CAAC;IAC5C,IAAI,IAAI,IAAI,CAAC;CACd;AAwBD;;;;;;;;;;GAUG;AACH,qBAAa,KAAK;IAChB,gBAAgB;IAChB,QAAQ,CAAC,GAAG,EAAE,QAAQ,CAAC;IAEvB,OAAO,CAAC,QAAQ,CAAC,kBAAkB,CAAyB;IAE5D,oFAAoF;IACpF,OAAO,CAAC,QAAQ,CAAC,eAAe,CAE9B;IAEF,gBAAgB;gBACJ,GAAG,EAAE,QAAQ;IAKzB;;;;;;;;;;;;;;OAcG;IACH,UAAU,CAAC,EAAE,EAAE,MAAM,IAAI,GAAG,MAAM,IAAI;IAOtC;;;;OAIG;IACH,GAAG,CAAC,GAAG,EAAE,MAAM,GAAG,MAAM,GAAG,IAAI;IAI/B;;;;;;;;OAQG;IACH,OAAO,CAAC,CAAC,EAAE,GAAG,EAAE,MAAM,GAAG,CAAC,GAAG,IAAI;IAUjC,yDAAyD;IACzD,MAAM,CAAC,GAAG,EAAE,MAAM,GAAG,OAAO;IAI5B;;;;OAIG;IACH,GAAG,CAAC,GAAG,EAAE,MAAM,GAAG,MAAM;IAMxB;;OAEG;IACH,GAAG,CAAC,GAAG,EAAE,MAAM,EAAE,KAAK,EAAE,MAAM,GAAG,IAAI;IAKrC;;;OAGG;IACH,KAAK,CAAC,GAAG,EAAE,MAAM,EAAE,KAAK,EAAE,MAAM,EAAE,OAAO,EAAE,MAAM,GAAG,IAAI;IAKxD;;;;;;;OAOG;IACH,OAAO,CAAC,CAAC,EAAE,GAAG,EAAE,MAAM,EAAE,KAAK,EAAE,CAAC,EAAE,GAAG,CAAC,EAAE,MAAM,GAAG,IAAI;IAUrD;;;;;OAKG;IACH,GAAG,CAAC,GAAG,EAAE,MAAM,GAAG,OAAO;IAQzB;;;OAGG;IACH,SAAS,CAAC,OAAO,EAAE,MAAM,GAAG,IAAI;IAIhC,iDAAiD;IACjD,WAAW,CAAC,OAAO,EAAE,MAAM,GAAG,IAAI;IAIlC;;;OAGG;IACH,OAAO,CAAC,OAAO,EAAE,MAAM,EAAE,OAAO,EAAE,MAAM,GAAG,IAAI;IAM/C;;;;OAIG;IACH,gBAAgB,IAAI,OAAO,CAAC,IAAI,CAAC;CAGlC;AAID;;;;;;;;;;;;;;GAcG;AACH,wBAAsB,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC,CAE1C;AAED;;;;;;;;;;;;;;;;;;;;;;;;;;GA0BG;AACH,wBAAsB,WAAW,CAAC,OAAO,GAAE,YAAiB,GAAG,OAAO,CAAC,KAAK,CAAC,CAkB5E"} \ No newline at end of file diff --git a/wasm-edge/sdk.js b/wasm-edge/sdk.js new file mode 100644 index 0000000..c5bbe2c --- /dev/null +++ b/wasm-edge/sdk.js @@ -0,0 +1,237 @@ +// ── Types ───────────────────────────────────────────────────────────────────── +let _module = null; +let _initPromise = null; +async function ensureModule() { + if (_module) + return _module; + if (!_initPromise) { + _initPromise = (async () => { + const mod = await import('./pkg/recached_edge.js'); + await mod.default(); // initialise WASM — idempotent on repeated calls + _module = mod; + return mod; + })(); + } + return _initPromise; +} +// ── Cache ───────────────────────────────────────────────────────────────────── +/** + * A typed wrapper around the Recached WASM cache. + * + * Obtain an instance via {@link createCache} rather than `new Cache()`. + * + * ```ts + * const cache = await createCache({ persistence: true }); + * cache.set('theme', 'dark'); + * cache.get('theme'); // "dark" + * ``` + */ +export class Cache { + /** @internal */ + constructor(raw) { + this._mutationListeners = new Set(); + /** @internal Arrow function so `this` is always bound when passed as a callback. */ + this._notifyMutation = () => { + for (const cb of this._mutationListeners) + cb(); + }; + this.raw = raw; + raw.set_mutation_callback(this._notifyMutation); + } + /** + * Subscribe to store mutations from any source — local writes, server + * WebSocket push, and BroadcastChannel cross-tab sync. + * + * Returns an unsubscribe function. Pass directly to React's + * `useSyncExternalStore` `subscribe` parameter. + * + * ```ts + * useSyncExternalStore( + * cache.onMutation.bind(cache), + * () => cache.get('key'), + * () => null, + * ); + * ``` + */ + onMutation(cb) { + this._mutationListeners.add(cb); + return () => this._mutationListeners.delete(cb); + } + // ── Reads ───────────────────────────────────────────────────────────────── + /** + * Return the value for `key`, or `null` if the key does not exist or has expired. + * + * Always served from local WASM memory — zero network latency. + */ + get(key) { + return this.raw.get(key) ?? null; + } + /** + * Return a JSON-parsed value stored under `key`, or `null` if the key is + * missing, expired, or not valid JSON. + * + * ```ts + * interface User { id: number; name: string } + * const user = cache.getJSON('user:42'); // User | null + * ``` + */ + getJSON(key) { + const raw = this.get(key); + if (raw === null) + return null; + try { + return JSON.parse(raw); + } + catch { + return null; + } + } + /** Return `true` if `key` exists and has not expired. */ + exists(key) { + return this.raw.exists(key); + } + /** + * Return the remaining TTL in seconds. + * - `-1` — key exists with no expiry + * - `-2` — key does not exist + */ + ttl(key) { + return this.raw.ttl(key); + } + // ── Writes ──────────────────────────────────────────────────────────────── + /** + * Store a string value. Syncs to the server and other tabs when connected. + */ + set(key, value) { + this.raw.set(key, value); + this._notifyMutation(); + } + /** + * Store a string value with a TTL (seconds). The key is deleted automatically + * once the TTL elapses. + */ + setEx(key, value, seconds) { + this.raw.set_ex(key, value, seconds); + this._notifyMutation(); + } + /** + * Serialize `value` as JSON and store it under `key`. + * Pass `ttl` (seconds) to have the key expire automatically. + * + * ```ts + * cache.setJSON('user:42', { id: 42, name: 'Alice' }, 300); // expires in 5 min + * ``` + */ + setJSON(key, value, ttl) { + const serialized = JSON.stringify(value); + if (ttl !== undefined) { + this.raw.set_ex(key, serialized, ttl); + } + else { + this.raw.set(key, serialized); + } + this._notifyMutation(); + } + /** + * Delete `key`. + * + * Returns `true` if the key existed, `false` if it did not. + * Syncs to the server and other tabs when connected. + */ + del(key) { + const existed = this.raw.del(key) === 1; + this._notifyMutation(); + return existed; + } + // ── Pub/sub ─────────────────────────────────────────────────────────────── + /** + * Subscribe to a server pub/sub channel. Push messages arrive via the + * WebSocket `onmessage` callback. + */ + subscribe(channel) { + this.raw.subscribe(channel); + } + /** Unsubscribe from a server pub/sub channel. */ + unsubscribe(channel) { + this.raw.unsubscribe(channel); + } + /** + * Publish a message to a server pub/sub channel. All subscribers — browser + * and server-side — receive the message. + */ + publish(channel, message) { + this.raw.publish(channel, message); + } + // ── Persistence ─────────────────────────────────────────────────────────── + /** + * Erase the IndexedDB WAL. The in-memory store is not affected. + * + * Use on sign-out so the next session starts with an empty cache. + */ + clearPersistence() { + return this.raw.clear_persistence(); + } +} +// ── Public API ──────────────────────────────────────────────────────────────── +/** + * Initialise the WASM module eagerly. + * + * `createCache` calls this automatically on first use, so calling `init()` + * directly is only necessary when you want to front-load the WASM download + * (e.g. during a loading screen). + * + * ```ts + * import { init, createCache } from 'recached-edge'; + * + * await init(); // start downloading WASM now + * // ... other app setup ... + * const cache = await createCache(); // resolves immediately — already loaded + * ``` + */ +export async function init() { + await ensureModule(); +} +/** + * Create a {@link Cache} instance. + * + * Loads and initialises the WASM module on first call (subsequent calls reuse + * the existing module). Options are applied in order: + * `persistence` → `broadcastChannel` → `connect` + `auth`. + * + * ```ts + * import { createCache } from 'recached-edge'; + * + * // Local-only, survives refresh + * const cache = await createCache({ persistence: true }); + * + * // With server sync + * const cache = await createCache({ + * persistence: true, + * connect: { url: 'ws://localhost:6380', password: 'secret' }, + * }); + * + * cache.set('theme', 'dark'); + * cache.getJSON('user:42'); // User | null + * + * // --- page refresh --- + * const cache = await createCache({ persistence: true }); + * cache.get('theme'); // "dark" — restored from IndexedDB, zero network + * ``` + */ +export async function createCache(options = {}) { + const mod = await ensureModule(); + const raw = new mod.RecachedCache(); + if (options.persistence) { + await raw.enable_persistence(); + } + if (options.broadcastChannel) { + raw.broadcast(options.broadcastChannel); + } + if (options.connect) { + raw.connect(options.connect.url); + if (options.connect.password) { + raw.auth(options.connect.password); + } + } + return new Cache(raw); +} diff --git a/wasm-edge/sdk.ts b/wasm-edge/sdk.ts index 13249d4..9df0122 100644 --- a/wasm-edge/sdk.ts +++ b/wasm-edge/sdk.ts @@ -53,6 +53,7 @@ interface RawCache { publish(channel: string, message: string): void; subscribe(channel: string): void; unsubscribe(channel: string): void; + set_mutation_callback(cb: () => void): void; free(): void; } @@ -93,9 +94,37 @@ export class Cache { /** @internal */ readonly raw: RawCache; + private readonly _mutationListeners = new Set<() => void>(); + + /** @internal Arrow function so `this` is always bound when passed as a callback. */ + private readonly _notifyMutation = (): void => { + for (const cb of this._mutationListeners) cb(); + }; + /** @internal */ constructor(raw: RawCache) { this.raw = raw; + raw.set_mutation_callback(this._notifyMutation); + } + + /** + * Subscribe to store mutations from any source — local writes, server + * WebSocket push, and BroadcastChannel cross-tab sync. + * + * Returns an unsubscribe function. Pass directly to React's + * `useSyncExternalStore` `subscribe` parameter. + * + * ```ts + * useSyncExternalStore( + * cache.onMutation.bind(cache), + * () => cache.get('key'), + * () => null, + * ); + * ``` + */ + onMutation(cb: () => void): () => void { + this._mutationListeners.add(cb); + return () => this._mutationListeners.delete(cb); } // ── Reads ───────────────────────────────────────────────────────────────── @@ -149,6 +178,7 @@ export class Cache { */ set(key: string, value: string): void { this.raw.set(key, value); + this._notifyMutation(); } /** @@ -157,6 +187,7 @@ export class Cache { */ setEx(key: string, value: string, seconds: number): void { this.raw.set_ex(key, value, seconds); + this._notifyMutation(); } /** @@ -174,6 +205,7 @@ export class Cache { } else { this.raw.set(key, serialized); } + this._notifyMutation(); } /** @@ -183,7 +215,9 @@ export class Cache { * Syncs to the server and other tabs when connected. */ del(key: string): boolean { - return this.raw.del(key) === 1; + const existed = this.raw.del(key) === 1; + this._notifyMutation(); + return existed; } // ── Pub/sub ─────────────────────────────────────────────────────────────── diff --git a/wasm-edge/src/lib.rs b/wasm-edge/src/lib.rs index 4f91358..96ba51f 100644 --- a/wasm-edge/src/lib.rs +++ b/wasm-edge/src/lib.rs @@ -84,6 +84,8 @@ pub struct RecachedCache { idb: Rc>>, /// Monotonically-increasing WAL sequence counter. seq: Rc>, + /// JS callback invoked after every mutation (local or server/BC-pushed). + on_mutation: Rc>>, _onmessage: Option>, _onbc: Option>, } @@ -108,6 +110,12 @@ fn persist_cmd(idb: &Rc>>, seq: &Rc>, encoded: } } +fn notify_mutation(on_mut: &Rc>>) { + if let Some(f) = on_mut.borrow().as_ref() { + let _ = f.call0(&JsValue::NULL); + } +} + // ── public API ──────────────────────────────────────────────────────────────── #[wasm_bindgen] @@ -120,11 +128,19 @@ impl RecachedCache { bc: None, idb: Rc::new(RefCell::new(None)), seq: Rc::new(Cell::new(0)), + on_mutation: Rc::new(RefCell::new(None)), _onmessage: None, _onbc: None, } } + /// Register a JS callback invoked after every mutation from any source + /// (local write, server WebSocket push, or BroadcastChannel sync). + /// The SDK's `onMutation()` wires this up automatically. + pub fn set_mutation_callback(&mut self, cb: js_sys::Function) { + *self.on_mutation.borrow_mut() = Some(cb); + } + /// Open the IndexedDB WAL, replay all stored commands into the in-memory /// store, and enable persistence for future writes. /// @@ -194,6 +210,7 @@ impl RecachedCache { pub fn broadcast(&mut self, channel_name: &str) -> Result<(), JsValue> { let bc = BroadcastChannel::new(channel_name)?; let store_clone = Arc::clone(&self.store); + let on_mut = Rc::clone(&self.on_mutation); let onbc = Closure::wrap(Box::new(move |e: MessageEvent| { if let Ok(text) = e.data().dyn_into::() { @@ -233,6 +250,7 @@ impl RecachedCache { | Command::ZRem(_, _) | Command::ZIncrBy(_, _, _) => { store_clone.execute(cmd); + notify_mutation(&on_mut); } _ => {} } @@ -252,6 +270,7 @@ impl RecachedCache { pub fn connect(&mut self, url: &str) -> Result<(), JsValue> { let ws = WebSocket::new(url)?; let store_clone = Arc::clone(&self.store); + let on_mut = Rc::clone(&self.on_mutation); let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| { if let Ok(text) = e.data().dyn_into::() { @@ -291,6 +310,7 @@ impl RecachedCache { | Command::ZRem(_, _) | Command::ZIncrBy(_, _, _) => { store_clone.execute(cmd); + notify_mutation(&on_mut); } _ => {} } @@ -333,6 +353,7 @@ impl RecachedCache { let _ = bc.post_message(&JsValue::from_str(&encoded)); } persist_cmd(&self.idb, &self.seq, &encoded); + notify_mutation(&self.on_mutation); match resp { Value::SimpleString(s) => s, @@ -361,6 +382,7 @@ impl RecachedCache { let _ = bc.post_message(&JsValue::from_str(&encoded)); } persist_cmd(&self.idb, &self.seq, &encoded); + notify_mutation(&self.on_mutation); match resp { Value::SimpleString(s) => s, @@ -391,6 +413,7 @@ impl RecachedCache { let _ = bc.post_message(&JsValue::from_str(&encoded)); } persist_cmd(&self.idb, &self.seq, &encoded); + notify_mutation(&self.on_mutation); match resp { Value::Integer(i) => i as i32,