From bb1b31ee141ba19f781bcbf0e35632dd8478fd2e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 4 Jun 2026 18:18:41 +0200 Subject: [PATCH 1/8] First initial working WebUI --- Cargo.lock | 312 ++++++++++++ Cargo.toml | 3 + breakwater/Cargo.toml | 6 +- breakwater/src/cli_args.rs | 38 +- breakwater/src/main.rs | 18 + breakwater/src/sinks/egui/mod.rs | 24 +- breakwater/src/sinks/mod.rs | 2 + breakwater/src/sinks/web.rs | 268 +++++++++++ breakwater/src/sinks/web_index.html | 716 ++++++++++++++++++++++++++++ 9 files changed, 1362 insertions(+), 25 deletions(-) create mode 100644 breakwater/src/sinks/web.rs create mode 100644 breakwater/src/sinks/web_index.html diff --git a/Cargo.lock b/Cargo.lock index 79f9b7f..d655d21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -326,6 +326,61 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core", + "base64", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper", + "tokio", + "tokio-tungstenite", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -341,6 +396,12 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bincode" version = "2.0.1" @@ -429,6 +490,15 @@ dependencies = [ "no_std_io2", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block2" version = "0.5.1" @@ -452,15 +522,18 @@ name = "breakwater" version = "0.20.0" dependencies = [ "async-trait", + "axum", "breakwater-egui-overlay", "breakwater-parser", "bytemuck", + "bytes", "chrono", "clap", "color-eyre", "const_format", "eframe", "egui", + "flate2", "futures", "libloading 0.9.0", "local-ip-address", @@ -921,6 +994,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -997,6 +1079,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "ctor" version = "0.10.1" @@ -1093,6 +1185,16 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dispatch" version = "0.2.0" @@ -1689,6 +1791,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "gethostname" version = "1.1.0" @@ -1899,6 +2011,86 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "http" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "bytes", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -2481,6 +2673,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "maybe-rayon" version = "0.1.1" @@ -2536,6 +2734,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3924,6 +4128,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "same-file" version = "1.0.6" @@ -4013,6 +4223,40 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -4290,6 +4534,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.2" @@ -4533,6 +4783,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "toml_datetime" version = "1.1.1+spec-1.1.0" @@ -4563,6 +4825,34 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.44" @@ -4693,6 +4983,22 @@ version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2df906b07856748fa3f6e0ad0cbaa047052d4a7dd609e231c4f72cee8c36f31" +[[package]] +name = "tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.4", + "sha1", + "thiserror 2.0.18", +] + [[package]] name = "type-map" version = "0.5.1" @@ -4708,6 +5014,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" +[[package]] +name = "typenum" +version = "1.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6f5e870be6c3b371b77fe0ee0bafb859fa4964b4404c27de1d380043c4dda20" + [[package]] name = "unicode-bidi" version = "0.3.18" diff --git a/Cargo.toml b/Cargo.toml index a3a7cf9..8f5b49d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,9 @@ repository = "https://github.com/sernauer/breakwater" [workspace.dependencies] async-trait = "0.1" +axum = { version = "0.8", features = ["ws"] } bytemuck = { version = "1.21" } +bytes = "1.0" chrono = "0.4" clap = { version = "4.5", features = ["derive"] } color-eyre = "0.6" @@ -24,6 +26,7 @@ const_format = "0.2" criterion = { version = "0.8", features = ["async_tokio"] } eframe = { version = "0.34", default-features = false, features = ["glow", "default_fonts", "persistence", "wayland", "x11"] } egui = "0.34" +flate2 = "1.0" futures = "0.3" libloading = "0.9" local-ip-address = "0.6.3" diff --git a/breakwater/Cargo.toml b/breakwater/Cargo.toml index 99b4378..05fb97f 100644 --- a/breakwater/Cargo.toml +++ b/breakwater/Cargo.toml @@ -16,13 +16,16 @@ breakwater-parser.workspace = true breakwater-egui-overlay = { workspace = true, optional = true } async-trait.workspace = true +axum = { workspace = true, optional = true } bytemuck = { workspace = true, optional = true } +bytes = { workspace = true, optional = true } chrono.workspace = true clap.workspace = true color-eyre.workspace = true const_format.workspace = true eframe = { workspace = true, optional = true } egui = { workspace = true, optional = true } +flate2 = { workspace = true, optional = true } libloading = { workspace = true, optional = true } local-ip-address.workspace = true memadvise.workspace = true @@ -48,7 +51,7 @@ rstest.workspace = true [features] # We don't enable binary-sync-pixels and binary-set-pixel by default to make it a bit harder for clients ;) -default = ["egui", "vnc"] +default = ["egui", "vnc", "web"] alpha = ["breakwater-parser/alpha"] binary-set-pixel = ["breakwater-parser/binary-set-pixel"] @@ -56,6 +59,7 @@ binary-sync-pixels = ["breakwater-parser/binary-sync-pixels"] egui = ["dep:breakwater-egui-overlay", "dep:bytemuck", "dep:eframe", "dep:egui", "dep:libloading"] native-display = ["dep:softbuffer", "dep:winit"] vnc = ["dep:vncserver"] +web = ["dep:axum", "dep:bytes", "dep:flate2"] [lints] workspace = true diff --git a/breakwater/src/cli_args.rs b/breakwater/src/cli_args.rs index 99fc1c7..3ee9335 100644 --- a/breakwater/src/cli_args.rs +++ b/breakwater/src/cli_args.rs @@ -100,7 +100,7 @@ pub struct CliArgs { pub viewport: Vec, /// Specify one or more pixelflut endpoints to display. - #[cfg(feature = "egui")] + #[cfg(any(feature = "egui", feature = "web"))] #[clap(long)] pub advertised_endpoints: Vec, @@ -112,6 +112,12 @@ pub struct CliArgs { #[clap(long)] pub ui: Option, + /// Listen address for the WebUI, e.g. `[::]:8080`. + /// Serves a small website that streams the canvas to web browsers over a WebSocket. + #[cfg(feature = "web")] + #[clap(long)] + pub web_listen_address: Option, + /// Create (or use an existing) shared memory region for the framebuffer. /// This enables other applications to read and write Pixel values to the framebuffer or can be /// used to persist the canvas across restarts. @@ -120,6 +126,36 @@ pub struct CliArgs { } impl CliArgs { + /// Resolves the Pixelflut endpoints to advertise to users (so they know where to connect). + /// + /// If `--advertised-endpoints` is set, those are returned verbatim. Otherwise we make a best + /// effort to guess: for a single listener we resolve the local v4 + v6 IPs and append the port, + /// for multiple listeners we just list them. + #[cfg(any(feature = "egui", feature = "web"))] + pub fn resolve_advertised_endpoints(&self) -> Vec { + if !self.advertised_endpoints.is_empty() { + return self.advertised_endpoints.clone(); + } + + match &self.listen_addresses[..] { + // No listeners given, so also no endpoints to advertise + [] => vec![], + // In case of a single listener we get the local IPs (v4 + v6) and concat them with the + // port + [single_listener] => { + let port = single_listener.port(); + + [local_ip_address::local_ip(), local_ip_address::local_ipv6()] + .into_iter() + .filter_map(Result::ok) + .map(|ip| format!("{ip}:{port}")) + .collect() + } + // If multiple listeners are used it's complicated, so we just print them + multiple_listeners => multiple_listeners.iter().map(ToString::to_string).collect(), + } + } + /// Checks that at most one IP per version (v4/v6) is configured. /// Returns the (optional) v4 address and (optional) v6 address. #[cfg(feature = "vnc")] diff --git a/breakwater/src/main.rs b/breakwater/src/main.rs index d4bf178..9651669 100644 --- a/breakwater/src/main.rs +++ b/breakwater/src/main.rs @@ -140,6 +140,24 @@ async fn main() -> eyre::Result<()> { } } + #[cfg(feature = "web")] + { + use crate::sinks::web::WebSink; + + if let Some(web_sink) = WebSink::new( + fb.clone(), + &args, + statistics_tx.clone(), + statistics_information_rx.resubscribe(), + terminate_signal_rx.resubscribe(), + ) + .await + .context("failed to create web sink")? + { + display_sinks.push(Box::new(web_sink)); + } + } + let mut ffmpeg_thread_present = false; if let Some(ffmpeg_sink) = FfmpegSink::new( fb.clone(), diff --git a/breakwater/src/sinks/egui/mod.rs b/breakwater/src/sinks/egui/mod.rs index 0bbed4a..020dda3 100644 --- a/breakwater/src/sinks/egui/mod.rs +++ b/breakwater/src/sinks/egui/mod.rs @@ -104,29 +104,7 @@ impl DisplaySink for EguiSink { } }); - let advertised_endpoints = if cli_args.advertised_endpoints.is_empty() { - // In case no advertised endpoints to display are given, we calculate the most likely - // endpoint(s) to display. - match &cli_args.listen_addresses[..] { - // No listeners given, so also no endpoints to advertise - [] => vec![], - // In case of a single listener we get the local IPs (v4 + v6) and concat them with - // the port - [single_listener] => { - let port = single_listener.port(); - - [local_ip_address::local_ip(), local_ip_address::local_ipv6()] - .into_iter() - .filter_map(Result::ok) - .map(|ip| format!("{ip}:{port}")) - .collect() - } - // If multiple listeners are used it's complicated, so we just print them - multiple_listeners => multiple_listeners.iter().map(ToString::to_string).collect(), - } - } else { - cli_args.advertised_endpoints.clone() - }; + let advertised_endpoints = cli_args.resolve_advertised_endpoints(); Ok(Some(Self { framebuffer: fb, diff --git a/breakwater/src/sinks/mod.rs b/breakwater/src/sinks/mod.rs index 4ee53f7..ff6a1ca 100644 --- a/breakwater/src/sinks/mod.rs +++ b/breakwater/src/sinks/mod.rs @@ -16,6 +16,8 @@ pub mod ffmpeg; pub mod native_display; #[cfg(feature = "vnc")] pub mod vnc; +#[cfg(feature = "web")] +pub mod web; // The stabilization of async functions in traits in Rust 1.75 did not include support for using traits containing async // functions as dyn Trait, so we still need to use async_trait here. diff --git a/breakwater/src/sinks/web.rs b/breakwater/src/sinks/web.rs new file mode 100644 index 0000000..7d9cf6e --- /dev/null +++ b/breakwater/src/sinks/web.rs @@ -0,0 +1,268 @@ +use std::{io::Write, net::SocketAddr, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use axum::{ + Router, + extract::{ + State, + ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade}, + }, + response::{Html, Response}, + routing::get, +}; +use breakwater_parser::{FB_BYTES_PER_PIXEL, FrameBuffer}; +use bytes::Bytes; +use color_eyre::eyre::{self, Context}; +use flate2::{Compression, write::ZlibEncoder}; +use tokio::{ + sync::{broadcast, mpsc}, + time, +}; +use tracing::{debug, instrument, trace, warn}; + +use crate::{ + cli_args::CliArgs, + sinks::DisplaySink, + statistics::{StatisticsEvent, StatisticsInformationEvent}, +}; + +const INDEX_HTML: &str = include_str!("web_index.html"); + +/// Number of compressed frames buffered for each connected client. Kept small on purpose: +/// a client that can't drain the buffer in time receives a [`broadcast::error::RecvError::Lagged`] +/// and simply skips ahead to the newest frame, which reduces its effective frame rate. +const FRAME_BUFFER_SIZE: usize = 2; + +/// Number of stats messages buffered per client. Stats are produced roughly once per second, so a +/// small buffer is plenty; a client that lags simply skips the missed updates. As of writing no +/// data is lost in case a stats message is missed, so we don't need to be super careful about that. +const STATS_BUFFER_SIZE: usize = 3; + +#[derive(Clone)] +struct WebState { + /// Carries the latest frame already serialized to binary BLOB, ready to send to every client. + frame_tx: broadcast::Sender, + /// Carries the latest statistics already serialized to JSON, ready to send to every client. + stats_tx: broadcast::Sender, + width: usize, + height: usize, + /// Pixelflut endpoints to advertise to users, sent once on connect. + advertised_endpoints: Vec, +} + +pub struct WebSink { + fb: Arc, + statistics_information_rx: broadcast::Receiver, + terminate_signal_rx: broadcast::Receiver<()>, + + listen_address: SocketAddr, + fps: u32, + frame_tx: broadcast::Sender, + stats_tx: broadcast::Sender, + advertised_endpoints: Vec, + + /// Reused scratch buffer holding one RGBA frame, so we don't reallocate every tick. + frame_buf: Vec, +} + +#[async_trait] +impl DisplaySink for WebSink { + #[instrument(skip_all, err)] + async fn new( + fb: Arc, + cli_args: &CliArgs, + _statistics_tx: mpsc::Sender, + statistics_information_rx: broadcast::Receiver, + terminate_signal_rx: broadcast::Receiver<()>, + ) -> eyre::Result> { + let Some(listen_address) = cli_args.web_listen_address else { + debug!("Web sink not enabled as no web listen address is specified"); + return Ok(None); + }; + + let (frame_tx, _) = broadcast::channel(FRAME_BUFFER_SIZE); + let (stats_tx, _) = broadcast::channel(STATS_BUFFER_SIZE); + let frame_buf = vec![0; fb.get_size() * FB_BYTES_PER_PIXEL]; + + Ok(Some(Self { + fb, + statistics_information_rx, + terminate_signal_rx, + listen_address, + fps: cli_args.fps, + frame_tx, + stats_tx, + advertised_endpoints: cli_args.resolve_advertised_endpoints(), + frame_buf, + })) + } + + #[instrument(skip(self), err)] + async fn run(&mut self) -> eyre::Result<()> { + let state = WebState { + frame_tx: self.frame_tx.clone(), + stats_tx: self.stats_tx.clone(), + width: self.fb.get_width(), + height: self.fb.get_height(), + advertised_endpoints: self.advertised_endpoints.clone(), + }; + + // Dedicated task: serialize every incoming statistics event to JSON (once, not per client) + // and broadcast it. The full per-IP maps are included so the frontend can build show + // traffic per IP. + let mut statistics_information_rx = self.statistics_information_rx.resubscribe(); + let stats_tx = self.stats_tx.clone(); + let stats_task = tokio::spawn(async move { + loop { + match statistics_information_rx.recv().await { + Ok(info) => match serde_json::to_value(&info) { + Ok(mut value) => { + if let Some(object) = value.as_object_mut() { + object.insert("type".to_owned(), "stats".into()); + } + // Ignore the error: it only means no clients are currently connected. + let _ = stats_tx.send(Utf8Bytes::from(value.to_string())); + } + Err(err) => warn!(%err, "failed to serialize statistics to JSON"), + }, + // We fell behind on statistics events; just continue with the next one. + Err(broadcast::error::RecvError::Lagged(_)) => {} + // The statistics thread shut down, so will we. + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); + + let app = Router::new() + .route("/", get(index)) + .route("/ws", get(ws_handler)) + .with_state(state); + + let listener = tokio::net::TcpListener::bind(self.listen_address) + .await + .with_context(|| format!("failed to bind web server to {}", self.listen_address))?; + tracing::info!( + "Web UI available at http://{}", + listener.local_addr().unwrap_or(self.listen_address) + ); + + // Shut the HTTP server down gracefully once we receive the terminate signal. + let mut server_terminate_rx = self.terminate_signal_rx.resubscribe(); + let server = tokio::spawn(async move { + let shutdown = async move { + let _ = server_terminate_rx.recv().await; + }; + if let Err(err) = axum::serve(listener, app) + .with_graceful_shutdown(shutdown) + .await + { + warn!(%err, "web server stopped unexpectedly"); + } + }); + + // Encoder loop: compress the framebuffer once per tick and broadcast the bytes to every + // connected client. The expensive work (copy + compress) happens a single time regardless + // of the number of viewers. + let mut interval = time::interval(Duration::from_micros(1_000_000 / u64::from(self.fps))); + loop { + if self.terminate_signal_rx.try_recv().is_ok() { + break; + } + + // No point spending CPU on compression while nobody is watching. + if self.frame_tx.receiver_count() > 0 { + let frame = self.encode_frame()?; + // Ignore the error: it only means all receivers disconnected between the check above + // and here. + let _ = self.frame_tx.send(frame); + } + + interval.tick().await; + } + + server.abort(); + stats_task.abort(); + Ok(()) + } +} + +impl WebSink { + /// Copies the current framebuffer into the scratch buffer, forces the alpha channel to opaque + /// (the framebuffer stores `rgb0`, but the browser's `ImageData` expects a meaningful alpha), + /// and zlib-compresses the result. + fn encode_frame(&mut self) -> eyre::Result { + self.frame_buf.copy_from_slice(self.fb.as_bytes()); + for pixel in self.frame_buf.chunks_exact_mut(FB_BYTES_PER_PIXEL) { + pixel[3] = 0xff; + } + + // `Compression::fast()` (level 1) keeps CPU usage low; Pixelflut battles are high-entropy, + // so a higher level would mostly burn CPU for little gain. + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::fast()); + encoder + .write_all(&self.frame_buf) + .context("failed to compress frame")?; + let compressed = encoder.finish().context("failed to finish compression")?; + + trace!( + raw_bytes = self.frame_buf.len(), + compressed_bytes = compressed.len(), + "encoded web frame" + ); + + Ok(Bytes::from(compressed)) + } +} + +async fn index() -> Html<&'static str> { + Html(INDEX_HTML) +} + +async fn ws_handler(ws: WebSocketUpgrade, State(state): State) -> Response { + ws.on_upgrade(move |socket| handle_socket(socket, state)) +} + +async fn handle_socket(mut socket: WebSocket, state: WebState) { + // Tell the client the canvas dimensions (so it can size the `` and allocate + // `ImageData`) and the Pixelflut endpoints to advertise. + let hello = serde_json::json!({ + "type": "hello", + "width": state.width, + "height": state.height, + "advertised_endpoints": state.advertised_endpoints, + }) + .to_string(); + if socket.send(Message::Text(hello.into())).await.is_err() { + return; + } + + let mut frame_rx = state.frame_tx.subscribe(); + let mut stats_rx = state.stats_tx.subscribe(); + loop { + tokio::select! { + frame = frame_rx.recv() => match frame { + Ok(frame) => { + if socket.send(Message::Binary(frame)).await.is_err() { + // Client disconnected. + break; + } + } + // This client fell behind: skip the dropped frames and continue with the newest one. + // This is what throttles slow clients to a lower frame rate. + Err(broadcast::error::RecvError::Lagged(skipped)) => { + trace!(skipped, "web client lagging behind, dropping frames"); + } + Err(broadcast::error::RecvError::Closed) => break, + }, + stats_msg = stats_rx.recv() => match stats_msg { + Ok(json) => { + if socket.send(Message::Text(json)).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + }, + } + } +} diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html new file mode 100644 index 0000000..da16521 --- /dev/null +++ b/breakwater/src/sinks/web_index.html @@ -0,0 +1,716 @@ + + + + + + Pixelflut viewer + + + +
+ +
+ +
+ +
connecting…
+ +
+ Pixelflut canvas (breakwater) + + +
+
+ + + +
+ + + + From 3acd8db891577e49797fe4f867d4bc7c37452d34 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 5 Jun 2026 00:56:12 +0200 Subject: [PATCH 2/8] Implement basic chat --- breakwater/src/cli_args.rs | 5 + breakwater/src/sinks/web.rs | 197 ++++++++++++++++++++++++---- breakwater/src/sinks/web_index.html | 90 +++++++++---- 3 files changed, 238 insertions(+), 54 deletions(-) diff --git a/breakwater/src/cli_args.rs b/breakwater/src/cli_args.rs index 3ee9335..4457491 100644 --- a/breakwater/src/cli_args.rs +++ b/breakwater/src/cli_args.rs @@ -118,6 +118,11 @@ pub struct CliArgs { #[clap(long)] pub web_listen_address: Option, + /// Maximum number of chat messages a single IP address may send per minute in the WebUI. + #[cfg(feature = "web")] + #[clap(long, default_value_t = 10)] + pub chat_messages_per_minute: u32, + /// Create (or use an existing) shared memory region for the framebuffer. /// This enables other applications to read and write Pixel values to the framebuffer or can be /// used to persist the canvas across restarts. diff --git a/breakwater/src/sinks/web.rs b/breakwater/src/sinks/web.rs index 7d9cf6e..d280037 100644 --- a/breakwater/src/sinks/web.rs +++ b/breakwater/src/sinks/web.rs @@ -1,10 +1,16 @@ -use std::{io::Write, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + io::Write, + net::{IpAddr, SocketAddr}, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; use async_trait::async_trait; use axum::{ Router, extract::{ - State, + ConnectInfo, State, ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade}, }, response::{Html, Response}, @@ -14,6 +20,7 @@ use breakwater_parser::{FB_BYTES_PER_PIXEL, FrameBuffer}; use bytes::Bytes; use color_eyre::eyre::{self, Context}; use flate2::{Compression, write::ZlibEncoder}; +use futures::{SinkExt, StreamExt, stream::SplitSink}; use tokio::{ sync::{broadcast, mpsc}, time, @@ -38,12 +45,35 @@ const FRAME_BUFFER_SIZE: usize = 2; /// data is lost in case a stats message is missed, so we don't need to be super careful about that. const STATS_BUFFER_SIZE: usize = 3; +/// Number of chat messages buffered per client. A client that lags this far behind will miss some +/// chat messages. As they should be cheap to send we try to deliver all of them. +const CHAT_BUFFER_SIZE: usize = 1024; + +/// The window over which the per-IP chat rate limit is applied. +const CHAT_RATE_LIMIT_WINDOW: Duration = Duration::from_mins(1); + +/// Maximum length (in characters) of a chat username and message. Enforced server-side so a crafted +/// client can't bypass the frontend's `maxlength` and blow up the UI. +/// +/// Note: If you change this value, please also change it in the frontend. +const MAX_CHAT_NAME_LEN: usize = 20; +const MAX_CHAT_MESSAGE_LEN: usize = 256; + +/// Tracks the timestamps of recent chat messages per IP address, shared across all connections so +/// the rate limit applies per IP rather than per connection. +type ChatRateLimiter = Arc>>>; + #[derive(Clone)] struct WebState { /// Carries the latest frame already serialized to binary BLOB, ready to send to every client. frame_tx: broadcast::Sender, /// Carries the latest statistics already serialized to JSON, ready to send to every client. stats_tx: broadcast::Sender, + /// Carries chat messages (already serialized to JSON) to every connected client. + chat_tx: broadcast::Sender, + /// Maximum number of chat messages a single IP may send per [`CHAT_RATE_LIMIT_WINDOW`]. + chat_rate_limit: u32, + chat_rate_limiter: ChatRateLimiter, width: usize, height: usize, /// Pixelflut endpoints to advertise to users, sent once on connect. @@ -57,9 +87,10 @@ pub struct WebSink { listen_address: SocketAddr, fps: u32, - frame_tx: broadcast::Sender, - stats_tx: broadcast::Sender, - advertised_endpoints: Vec, + + /// Shared state handed to every connection handler (channels, rate limiter, canvas size, ...). + /// The sink keeps its own copy to feed the encoder loop and stats task. + state: WebState, /// Reused scratch buffer holding one RGBA frame, so we don't reallocate every tick. frame_buf: Vec, @@ -82,36 +113,40 @@ impl DisplaySink for WebSink { let (frame_tx, _) = broadcast::channel(FRAME_BUFFER_SIZE); let (stats_tx, _) = broadcast::channel(STATS_BUFFER_SIZE); + let (chat_tx, _) = broadcast::channel(CHAT_BUFFER_SIZE); let frame_buf = vec![0; fb.get_size() * FB_BYTES_PER_PIXEL]; + let state = WebState { + frame_tx, + stats_tx, + chat_tx, + chat_rate_limit: cli_args.chat_messages_per_minute, + chat_rate_limiter: Arc::new(Mutex::new(HashMap::new())), + width: fb.get_width(), + height: fb.get_height(), + advertised_endpoints: cli_args.resolve_advertised_endpoints(), + }; + Ok(Some(Self { fb, statistics_information_rx, terminate_signal_rx, listen_address, fps: cli_args.fps, - frame_tx, - stats_tx, - advertised_endpoints: cli_args.resolve_advertised_endpoints(), + state, frame_buf, })) } #[instrument(skip(self), err)] async fn run(&mut self) -> eyre::Result<()> { - let state = WebState { - frame_tx: self.frame_tx.clone(), - stats_tx: self.stats_tx.clone(), - width: self.fb.get_width(), - height: self.fb.get_height(), - advertised_endpoints: self.advertised_endpoints.clone(), - }; + let state = self.state.clone(); // Dedicated task: serialize every incoming statistics event to JSON (once, not per client) // and broadcast it. The full per-IP maps are included so the frontend can build show // traffic per IP. let mut statistics_information_rx = self.statistics_information_rx.resubscribe(); - let stats_tx = self.stats_tx.clone(); + let stats_tx = self.state.stats_tx.clone(); let stats_task = tokio::spawn(async move { loop { match statistics_information_rx.recv().await { @@ -152,9 +187,14 @@ impl DisplaySink for WebSink { let shutdown = async move { let _ = server_terminate_rx.recv().await; }; - if let Err(err) = axum::serve(listener, app) - .with_graceful_shutdown(shutdown) - .await + // `into_make_service_with_connect_info` makes the peer `SocketAddr` available to + // handlers via `ConnectInfo`, which we use for the per-IP chat rate limit. + if let Err(err) = axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .with_graceful_shutdown(shutdown) + .await { warn!(%err, "web server stopped unexpectedly"); } @@ -170,11 +210,11 @@ impl DisplaySink for WebSink { } // No point spending CPU on compression while nobody is watching. - if self.frame_tx.receiver_count() > 0 { + if self.state.frame_tx.receiver_count() > 0 { let frame = self.encode_frame()?; // Ignore the error: it only means all receivers disconnected between the check above // and here. - let _ = self.frame_tx.send(frame); + let _ = self.state.frame_tx.send(frame); } interval.tick().await; @@ -218,11 +258,18 @@ async fn index() -> Html<&'static str> { Html(INDEX_HTML) } -async fn ws_handler(ws: WebSocketUpgrade, State(state): State) -> Response { - ws.on_upgrade(move |socket| handle_socket(socket, state)) +async fn ws_handler( + ws: WebSocketUpgrade, + ConnectInfo(who): ConnectInfo, + State(state): State, +) -> Response { + ws.on_upgrade(move |socket| handle_socket(socket, who.ip(), state)) } -async fn handle_socket(mut socket: WebSocket, state: WebState) { +async fn handle_socket(socket: WebSocket, ip: IpAddr, state: WebState) { + // Split so we can read incoming chat messages and write outgoing frames/stats/chat concurrently. + let (mut sender, mut receiver) = socket.split(); + // Tell the client the canvas dimensions (so it can size the `` and allocate // `ImageData`) and the Pixelflut endpoints to advertise. let hello = serde_json::json!({ @@ -232,17 +279,18 @@ async fn handle_socket(mut socket: WebSocket, state: WebState) { "advertised_endpoints": state.advertised_endpoints, }) .to_string(); - if socket.send(Message::Text(hello.into())).await.is_err() { + if sender.send(Message::Text(hello.into())).await.is_err() { return; } let mut frame_rx = state.frame_tx.subscribe(); let mut stats_rx = state.stats_tx.subscribe(); + let mut chat_rx = state.chat_tx.subscribe(); loop { tokio::select! { frame = frame_rx.recv() => match frame { Ok(frame) => { - if socket.send(Message::Binary(frame)).await.is_err() { + if sender.send(Message::Binary(frame)).await.is_err() { // Client disconnected. break; } @@ -256,13 +304,108 @@ async fn handle_socket(mut socket: WebSocket, state: WebState) { }, stats_msg = stats_rx.recv() => match stats_msg { Ok(json) => { - if socket.send(Message::Text(json)).await.is_err() { + if sender.send(Message::Text(json)).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + }, + chat_msg = chat_rx.recv() => match chat_msg { + Ok(json) => { + if sender.send(Message::Text(json)).await.is_err() { break; } } Err(broadcast::error::RecvError::Lagged(_)) => {} Err(broadcast::error::RecvError::Closed) => break, }, + incoming = receiver.next() => match incoming { + Some(Ok(Message::Text(text))) => handle_incoming_chat(&text, ip, &state, &mut sender).await, + // Client closed the connection or errored. + Some(Ok(Message::Close(_)) | Err(_)) | None => break, + // Ignore anything else the client might send (binary, ping, pong). + Some(Ok(_)) => {} + }, } } } + +/// Parses, validates and rate-limits an incoming chat message. On success it is broadcast to all +/// clients; if the sender hit the rate limit, a `chat_error` is sent back only to them. +async fn handle_incoming_chat( + text: &str, + ip: IpAddr, + state: &WebState, + sender: &mut SplitSink, +) { + let Ok(value) = serde_json::from_str::(text) else { + return; + }; + if value.get("type").and_then(serde_json::Value::as_str) != Some("chat") { + return; + } + + let name = value + .get("name") + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .trim(); + let message = value + .get("text") + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .trim(); + if name.is_empty() || message.is_empty() { + return; + } + + // Basic sanity caps so a single message can't blow up the UI. + let name: String = name.chars().take(MAX_CHAT_NAME_LEN).collect(); + let message: String = message.chars().take(MAX_CHAT_MESSAGE_LEN).collect(); + + match check_rate_limit(&state.chat_rate_limiter, ip, state.chat_rate_limit) { + Ok(()) => { + let json = + serde_json::json!({ "type": "chat", "name": name, "text": message, "ip": ip }); + let _ = state.chat_tx.send(Utf8Bytes::from(json.to_string())); + } + Err(recent) => { + let json = serde_json::json!({ + "type": "chat_error", + "text": format!( + "Your IP {ip} already sent {recent} messages in the last minute, limit is {}", + state.chat_rate_limit, + ), + }); + let _ = sender + .send(Message::Text(Utf8Bytes::from(json.to_string()))) + .await; + } + } +} + +/// Records a chat message for `ip` if it is within the per-IP rate limit. +/// +/// Returns `Ok(())` if allowed (and records the message), or `Err(recent)` with the number of +/// messages already sent within [`CHAT_RATE_LIMIT_WINDOW`] if the limit has been reached. +fn check_rate_limit(limiter: &ChatRateLimiter, ip: IpAddr, limit: u32) -> Result<(), usize> { + let now = Instant::now(); + let mut limiter = limiter.lock().expect("chat rate limiter mutex poisoned"); + let timestamps = limiter.entry(ip).or_default(); + + // Drop timestamps that have aged out of the window. + while timestamps + .front() + .is_some_and(|&t| now.duration_since(t) > CHAT_RATE_LIMIT_WINDOW) + { + timestamps.pop_front(); + } + + if timestamps.len() >= limit as usize { + Err(timestamps.len()) + } else { + timestamps.push_back(now); + Ok(()) + } +} diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html index da16521..1204d42 100644 --- a/breakwater/src/sinks/web_index.html +++ b/breakwater/src/sinks/web_index.html @@ -260,17 +260,20 @@ scrollbar-color: var(--border) transparent; } .msg { font-size: 13px; line-height: 1.35; word-wrap: break-word; } - .msg .author { font-weight: 600; margin-right: 6px; } + .msg-head { display: flex; align-items: baseline; gap: 6px; } + .msg .author { font-weight: 600; } + .msg-ip { color: var(--text-dim); font-size: 11px; font-family: monospace; } + .msg-text { word-wrap: break-word; overflow-wrap: anywhere; } .msg.system { color: var(--text-dim); font-style: italic; } .chat-input { display: flex; + flex-direction: column; gap: 8px; padding: 10px 0 14px; border-top: 1px solid var(--border); } .chat-input input { - flex: 1; background: var(--panel-2); border: 1px solid var(--border); border-radius: 18px; @@ -280,6 +283,11 @@ outline: none; } .chat-input input:focus { border-color: var(--accent); } + .chat-send-row { + display: flex; + gap: 8px; + } + .chat-send-row input { flex: 1; min-width: 0; } .chat-input button { background: var(--accent); color: #000; @@ -292,17 +300,6 @@ } .chat-input button:disabled { opacity: .4; cursor: default; } - .mock-tag { - font-size: 10px; - color: var(--text-dim); - border: 1px solid var(--border); - border-radius: 4px; - padding: 1px 5px; - margin-left: 6px; - vertical-align: middle; - text-transform: uppercase; - letter-spacing: .04em; - } /* ---------- Responsive ---------- */ @media (max-width: 820px) { @@ -391,11 +388,14 @@

Top traffic

-

Chat mock

+

Chat

- - + +
+ + +
@@ -417,6 +417,7 @@

Chat mock

let latest = null; // most recent compressed frame (ArrayBuffer); older ones are dropped let inflating = false; let framesThisSecond = 0; + let activeSocket = null; // current WebSocket, used to send chat messages // Decompress a zlib (deflate) frame using only the browser-native DecompressionStream. // No third-party JavaScript required. @@ -429,6 +430,7 @@

Chat mock

const proto = location.protocol === "https:" ? "wss" : "ws"; const ws = new WebSocket(`${proto}://${location.host}/ws`); ws.binaryType = "arraybuffer"; + activeSocket = ws; ws.onmessage = (event) => { if (typeof event.data === "string") { @@ -442,6 +444,10 @@

Chat mock

renderEndpoints(msg.advertised_endpoints); } else if (msg.type === "stats") { updateStats(msg); + } else if (msg.type === "chat") { + addMessage(msg.name, msg.text, false, msg.ip); + } else if (msg.type === "chat_error") { + addMessage("", msg.text, true); } } else { // Keep only the newest frame; if we're behind, intermediate frames are skipped. @@ -450,6 +456,7 @@

Chat mock

}; ws.onclose = () => { + activeSocket = null; fpsEl.textContent = "disconnected"; setTimeout(connect, 1000); }; @@ -648,7 +655,7 @@

Chat mock

.slice(0, 5); if (top.length === 0) { - topTraffic.innerHTML = '
  • no traffic yet
  • '; + topTraffic.innerHTML = '
  • no traffic currently
  • '; return; } @@ -672,28 +679,48 @@

    Chat mock

    } // ===================================================================== - // Chat (MOCK — local only, replace with a real chat channel later) + // Chat // ===================================================================== const chatMessages = document.getElementById("chat-messages"); const chatForm = document.getElementById("chat-form"); + const chatName = document.getElementById("chat-name"); const chatText = document.getElementById("chat-text"); - function addMessage(author, text, system = false) { + // Remember the chosen name across reloads; it can be changed at any time. + chatName.value = localStorage.getItem("chatName") || ""; + chatName.addEventListener("input", () => { + localStorage.setItem("chatName", chatName.value.trim()); + }); + + function addMessage(author, text, system = false, ip = null) { const el = document.createElement("div"); el.className = system ? "msg system" : "msg"; if (system) { el.textContent = text; } else { + // First line: author name + originating IP (so name spoofing is easy to spot). + const head = document.createElement("div"); + head.className = "msg-head"; const a = document.createElement("span"); a.className = "author"; - a.textContent = author + ":"; + a.textContent = author; a.style.color = colorFor(author); - el.appendChild(a); - el.appendChild(document.createTextNode(text)); + head.appendChild(a); + if (ip) { + const ipEl = document.createElement("span"); + ipEl.className = "msg-ip"; + ipEl.textContent = ip; + head.appendChild(ipEl); + } + // Second line: the message text. + const body = document.createElement("div"); + body.className = "msg-text"; + body.textContent = text; + el.appendChild(head); + el.appendChild(body); } - const atBottom = chatMessages.scrollTop + chatMessages.clientHeight >= chatMessages.scrollHeight - 20; chatMessages.appendChild(el); - if (atBottom) chatMessages.scrollTop = chatMessages.scrollHeight; + chatMessages.scrollTop = chatMessages.scrollHeight; } function colorFor(name) { @@ -704,13 +731,22 @@

    Chat mock

    chatForm.addEventListener("submit", (e) => { e.preventDefault(); + const name = chatName.value.trim(); const text = chatText.value.trim(); + if (!name) { + addMessage("", "Please enter your name above before chatting.", true); + chatName.focus(); + return; + } if (!text) return; - addMessage("you", text); + if (!activeSocket || activeSocket.readyState !== WebSocket.OPEN) { + addMessage("", "Not connected — please wait a moment and try again.", true); + return; + } + // The server echoes the message back to everyone (including us), so we don't add it locally. + activeSocket.send(JSON.stringify({ type: "chat", name, text })); chatText.value = ""; }); - - addMessage("developer", "Psssst it's a mock. Need more Tschunk :)"); From 81b671f548ab4a295afc6c9d0dce170b5ac83117 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 5 Jun 2026 02:32:28 +0200 Subject: [PATCH 3/8] Print more compression details --- breakwater/src/sinks/web.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/breakwater/src/sinks/web.rs b/breakwater/src/sinks/web.rs index d280037..0ad6e92 100644 --- a/breakwater/src/sinks/web.rs +++ b/breakwater/src/sinks/web.rs @@ -238,15 +238,19 @@ impl WebSink { // `Compression::fast()` (level 1) keeps CPU usage low; Pixelflut battles are high-entropy, // so a higher level would mostly burn CPU for little gain. + let start = Instant::now(); let mut encoder = ZlibEncoder::new(Vec::new(), Compression::fast()); encoder .write_all(&self.frame_buf) .context("failed to compress frame")?; let compressed = encoder.finish().context("failed to finish compression")?; + let compression_time = start.elapsed(); trace!( raw_bytes = self.frame_buf.len(), compressed_bytes = compressed.len(), + compression_factor = self.frame_buf.len() as f64 / compressed.len() as f64, + ?compression_time, "encoded web frame" ); From db1f53b5cbf040514e33f87f478440520a2dd957 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 5 Jun 2026 16:48:17 +0200 Subject: [PATCH 4/8] perf: Split the frame into chunks and compress them individually --- breakwater/src/sinks/web.rs | 116 +++++++++++++++++++++++++--- breakwater/src/sinks/web_index.html | 36 ++++++++- 2 files changed, 137 insertions(+), 15 deletions(-) diff --git a/breakwater/src/sinks/web.rs b/breakwater/src/sinks/web.rs index 0ad6e92..aafe055 100644 --- a/breakwater/src/sinks/web.rs +++ b/breakwater/src/sinks/web.rs @@ -21,6 +21,7 @@ use bytes::Bytes; use color_eyre::eyre::{self, Context}; use flate2::{Compression, write::ZlibEncoder}; use futures::{SinkExt, StreamExt, stream::SplitSink}; +use simple_moving_average::{SMA, SingleSumSMA}; use tokio::{ sync::{broadcast, mpsc}, time, @@ -35,6 +36,18 @@ use crate::{ const INDEX_HTML: &str = include_str!("web_index.html"); +/// Number of independently-compressed chunks per frame. The framebuffer is split into this many +/// contiguous byte ranges that are zlib-compressed in parallel, drastically cutting the wall-clock +/// time spent compressing a single frame. All chunks are packed into one websocket message (see +/// [`WebSink::encode_frame`]), so a client never renders a partially-updated frame. +/// +/// Note: If you change this value, the frontend adapts automatically as the chunk count is encoded +/// in the message header. +const FRAME_COMPRESSION_CHUNKS: usize = 16; + +/// Number of recent frames over which the average compression duration is computed for logging. +const COMPRESSION_TIME_WINDOW_SIZE: usize = 100; + /// Number of compressed frames buffered for each connected client. Kept small on purpose: /// a client that can't drain the buffer in time receives a [`broadcast::error::RecvError::Lagged`] /// and simply skips ahead to the newest frame, which reduces its effective frame rate. @@ -94,6 +107,10 @@ pub struct WebSink { /// Reused scratch buffer holding one RGBA frame, so we don't reallocate every tick. frame_buf: Vec, + + /// Rolling average of the per-frame compression duration (in microseconds), logged alongside + /// the instantaneous duration to give a more stable picture. + compression_time_window: SingleSumSMA, } #[async_trait] @@ -135,6 +152,7 @@ impl DisplaySink for WebSink { fps: cli_args.fps, state, frame_buf, + compression_time_window: SingleSumSMA::new(), })) } @@ -204,6 +222,9 @@ impl DisplaySink for WebSink { // connected client. The expensive work (copy + compress) happens a single time regardless // of the number of viewers. let mut interval = time::interval(Duration::from_micros(1_000_000 / u64::from(self.fps))); + // In case we delayed a frame, there is no point in trying to get the following frames + // quicker as a compensation. + interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); loop { if self.terminate_signal_rx.try_recv().is_ok() { break; @@ -211,7 +232,7 @@ impl DisplaySink for WebSink { // No point spending CPU on compression while nobody is watching. if self.state.frame_tx.receiver_count() > 0 { - let frame = self.encode_frame()?; + let frame = self.encode_frame().await?; // Ignore the error: it only means all receivers disconnected between the check above // and here. let _ = self.state.frame_tx.send(frame); @@ -230,34 +251,105 @@ impl WebSink { /// Copies the current framebuffer into the scratch buffer, forces the alpha channel to opaque /// (the framebuffer stores `rgb0`, but the browser's `ImageData` expects a meaningful alpha), /// and zlib-compresses the result. - fn encode_frame(&mut self) -> eyre::Result { + /// + /// Compression is the single most expensive part of serving the web UI, so the buffer is split + /// into [`FRAME_COMPRESSION_CHUNKS`] contiguous byte ranges that are compressed in parallel. + /// The compressed chunks are concatenated into one message, prefixed with a small header so the + /// client can split them apart again: + /// + /// ```text + /// u32le chunk_count + /// u32le compressed_len × chunk_count + /// bytes compressed chunk data, back-to-back, in order + /// ``` + /// + /// Because the chunks are simply consecutive slices of the framebuffer, the client reproduces + /// the full frame by decompressing each chunk and concatenating the output in order. Keeping + /// everything in a single websocket message guarantees a client never renders a half-updated + /// frame (which would show as a visible tear/artefact). + async fn encode_frame(&mut self) -> eyre::Result { self.frame_buf.copy_from_slice(self.fb.as_bytes()); for pixel in self.frame_buf.chunks_exact_mut(FB_BYTES_PER_PIXEL) { pixel[3] = 0xff; } - // `Compression::fast()` (level 1) keeps CPU usage low; Pixelflut battles are high-entropy, - // so a higher level would mostly burn CPU for little gain. let start = Instant::now(); - let mut encoder = ZlibEncoder::new(Vec::new(), Compression::fast()); - encoder - .write_all(&self.frame_buf) - .context("failed to compress frame")?; - let compressed = encoder.finish().context("failed to finish compression")?; + + let len = self.frame_buf.len(); + // Round up so we never end up with more than `FRAME_COMPRESSION_CHUNKS` chunks. The exact + // split points don't matter for correctness as the client reassembles the chunks in order. + let chunk_size = len.div_ceil(FRAME_COMPRESSION_CHUNKS).max(1); + + // Compress each chunk on Tokio's blocking thread pool. `spawn_blocking` requires `'static` + // closures, so we temporarily move the scratch buffer into an `Arc` that every task shares; + // it is reclaimed below to keep reusing the same allocation across frames. + let frame = Arc::new(std::mem::take(&mut self.frame_buf)); + let mut tasks = Vec::with_capacity(FRAME_COMPRESSION_CHUNKS); + let mut offset = 0; + while offset < len { + let end = (offset + chunk_size).min(len); + let frame = Arc::clone(&frame); + tasks.push(tokio::task::spawn_blocking(move || { + compress_chunk(&frame[offset..end]) + })); + offset = end; + } + + // Collect in spawn order, so the chunks stay in framebuffer order. + let mut compressed_chunks: Vec> = Vec::with_capacity(tasks.len()); + for task in tasks { + compressed_chunks.push(task.await.context("compression task panicked")??); + } + + // All tasks have finished, so we are the sole owner again: reclaim the buffer for reuse. + self.frame_buf = + Arc::try_unwrap(frame).expect("frame buffer still shared after compression"); + let compression_time = start.elapsed(); + self.compression_time_window + .add_sample(compression_time.as_micros() as u64); + let avg_compression_time = + Duration::from_micros(self.compression_time_window.get_average()); + + // Assemble the framed message: header (chunk count + per-chunk lengths) followed by the + // compressed bytes. + let compressed_bytes: usize = compressed_chunks.iter().map(Vec::len).sum(); + let header_len = (1 + compressed_chunks.len()) * size_of::(); + let mut message = Vec::with_capacity(header_len + compressed_bytes); + message.extend_from_slice(&(compressed_chunks.len() as u32).to_le_bytes()); + for chunk in &compressed_chunks { + message.extend_from_slice(&(chunk.len() as u32).to_le_bytes()); + } + for chunk in &compressed_chunks { + message.extend_from_slice(chunk); + } trace!( raw_bytes = self.frame_buf.len(), - compressed_bytes = compressed.len(), - compression_factor = self.frame_buf.len() as f64 / compressed.len() as f64, + compressed_bytes, + chunks = compressed_chunks.len(), + compression_factor = self.frame_buf.len() as f64 / compressed_bytes as f64, ?compression_time, + ?avg_compression_time, "encoded web frame" ); - Ok(Bytes::from(compressed)) + Ok(Bytes::from(message)) } } +/// Zlib-compresses a single chunk of the framebuffer. +/// +/// `Compression::fast()` (level 1) keeps CPU usage low; Pixelflut battles are high-entropy, so a +/// higher level would mostly burn CPU for little gain. +fn compress_chunk(data: &[u8]) -> eyre::Result> { + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::fast()); + encoder + .write_all(data) + .context("failed to compress frame chunk")?; + encoder.finish().context("failed to finish compression") +} + async fn index() -> Html<&'static str> { Html(INDEX_HTML) } diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html index 1204d42..32f7c9f 100644 --- a/breakwater/src/sinks/web_index.html +++ b/breakwater/src/sinks/web_index.html @@ -419,13 +419,39 @@

    Chat

    let framesThisSecond = 0; let activeSocket = null; // current WebSocket, used to send chat messages - // Decompress a zlib (deflate) frame using only the browser-native DecompressionStream. + // Decompress a single zlib (deflate) chunk using only the browser-native DecompressionStream. // No third-party JavaScript required. async function inflate(buffer) { const stream = new Response(buffer).body.pipeThrough(new DecompressionStream("deflate")); return new Uint8Array(await new Response(stream).arrayBuffer()); } + // A frame arrives as several zlib-compressed chunks packed into one message, so the server can + // compress them in parallel. Layout (little-endian): + // u32 chunkCount + // u32 compressedLen × chunkCount + // compressed bytes (chunks back-to-back, in order) + // Decompressing each chunk and concatenating the output in order reproduces the full RGBA + // framebuffer. Returns the decompressed chunks (Uint8Array[]) in order. + async function inflateFrame(buffer) { + const view = new DataView(buffer); + const chunkCount = view.getUint32(0, true); + let offset = 4; + const slices = []; + for (let i = 0; i < chunkCount; i++) { + const len = view.getUint32(offset, true); + offset += 4; + slices.push(len); + } + let dataOffset = offset; + // Decompress all chunks concurrently; the browser can run these off the main thread. + return Promise.all(slices.map((len) => { + const slice = buffer.slice(dataOffset, dataOffset + len); + dataOffset += len; + return inflate(slice); + })); + } + function connect() { const proto = location.protocol === "https:" ? "wss" : "ws"; const ws = new WebSocket(`${proto}://${location.host}/ws`); @@ -471,8 +497,12 @@

    Chat

    const buffer = latest; latest = null; try { - const rgba = await inflate(buffer); - imageData.data.set(rgba); + const chunks = await inflateFrame(buffer); + let dataOffset = 0; + for (const rgba of chunks) { + imageData.data.set(rgba, dataOffset); + dataOffset += rgba.length; + } ctx.putImageData(imageData, 0, 0); framesThisSecond++; updateCoordReadout(); From cfe9b4adf8d5c7d1c788225e5f65115166bdee61 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Sat, 6 Jun 2026 22:58:47 +0200 Subject: [PATCH 5/8] fix: Calculate clients bytes/s on the server to avoid wrong numbers in the UI --- breakwater/src/sinks/web_index.html | 23 ++++------------------- breakwater/src/statistics.rs | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html index 32f7c9f..9e3c4a2 100644 --- a/breakwater/src/sinks/web_index.html +++ b/breakwater/src/sinks/web_index.html @@ -645,11 +645,6 @@

    Chat

    return `${value} ${prefixes[i]}${unit}`; } - // The server only sends cumulative byte counts; we derive the current per-IP rate by diffing - // consecutive snapshots here, which keeps the Rust side minimal. - let prevBytesForIp = null; - let prevStatsTime = 0; - function updateStats(s) { const v4 = s.ips_v4 || 0; const v6 = s.ips_v6 || 0; @@ -659,20 +654,10 @@

    Chat

    stTraffic.innerHTML = fmtSI(s.bytes_per_s * 8, "bit/s"); stTotal.innerHTML = fmtSI(s.bytes, "B"); - const totals = s.bytes_for_ip || {}; - const now = performance.now(); - const elapsed = prevStatsTime ? (now - prevStatsTime) / 1000 : 0; - const rates = {}; - if (elapsed > 0 && prevBytesForIp) { - for (const ip in totals) { - const delta = totals[ip] - (prevBytesForIp[ip] || 0); - if (delta > 0) rates[ip] = delta / elapsed; - } - } - prevBytesForIp = totals; - prevStatsTime = now; - - renderTopTraffic(rates, totals, s.connections_for_ip || {}); + // Per-IP rates are computed server-side (`bytes_per_s_for_ip`). Diffing the cumulative + // `bytes_for_ip` against wall-clock time on the client produced bogus spikes (TBit/s) when + // the tab is throttled, e.g. on mobile running at <= 1 fps. + renderTopTraffic(s.bytes_per_s_for_ip || {}, s.bytes_for_ip || {}, s.connections_for_ip || {}); // Also available for future views: // s.ips_v4, s.ips_v6, s.fps, s.denied_connections_for_ip } diff --git a/breakwater/src/statistics.rs b/breakwater/src/statistics.rs index 85f1288..5317c08 100644 --- a/breakwater/src/statistics.rs +++ b/breakwater/src/statistics.rs @@ -58,6 +58,7 @@ pub struct StatisticsInformationEvent { pub connections_for_ip: HashMap, pub denied_connections_for_ip: HashMap, pub bytes_for_ip: HashMap, + pub bytes_per_s_for_ip: HashMap, pub statistic_events: u64, } @@ -74,6 +75,7 @@ pub struct Statistics { bytes_for_ip: HashMap, bytes_per_s_window: SingleSumSMA, + bytes_per_s_for_ip_window: HashMap>, fps_window: SingleSumSMA, statistics_save_mode: StatisticsSaveMode, @@ -114,6 +116,7 @@ impl Statistics { denied_connections_for_ip: HashMap::new(), bytes_for_ip: HashMap::new(), bytes_per_s_window: SingleSumSMA::new(), + bytes_per_s_for_ip_window: HashMap::new(), fps_window: SingleSumSMA::new(), statistics_save_mode, }; @@ -222,6 +225,19 @@ impl Statistics { let bytes = self.bytes_for_ip.values().sum(); self.bytes_per_s_window .add_sample((bytes - prev.bytes) * 1000 / elapsed_ms); + for (ip, bytes) in &self.bytes_for_ip { + if let Some(prev_bytes) = prev.bytes_for_ip.get(ip) { + self.bytes_per_s_for_ip_window + .entry(*ip) + .or_insert_with(SingleSumSMA::new) + .add_sample((bytes - prev_bytes) * 1000 / elapsed_ms); + } + } + // Drop windows that have decayed to zero, so the map — and the JSON we broadcast every + // second — stays bounded to recently-active IPs rather than every IP ever seen. + self.bytes_per_s_for_ip_window + .retain(|_, window| window.get_average() > 0); + self.fps_window .add_sample((frame - prev.frame) * 1000 / elapsed_ms); let statistic_events = self.statistic_events; @@ -237,6 +253,11 @@ impl Statistics { connections_for_ip: self.connections_for_ip.clone(), denied_connections_for_ip: self.denied_connections_for_ip.clone(), bytes_for_ip: self.bytes_for_ip.clone(), + bytes_per_s_for_ip: self + .bytes_per_s_for_ip_window + .iter() + .map(|(ip, bytes_per_s)| (*ip, bytes_per_s.get_average())) + .collect(), statistic_events, } } From ae898586d6110dd40864c4a383fff5f6ed96204a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Sat, 6 Jun 2026 23:08:34 +0200 Subject: [PATCH 6/8] fix: Side panel scrolling. Now show top 10 IPs --- breakwater/src/sinks/web_index.html | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html index 9e3c4a2..28a17a8 100644 --- a/breakwater/src/sinks/web_index.html +++ b/breakwater/src/sinks/web_index.html @@ -111,8 +111,17 @@ /* ---------- Sidebar ---------- */ .sidebar { display: grid; - grid-template-rows: auto auto auto 1fr; + /* `minmax(220px, 1fr)` keeps the chat usable: on tall screens it fills the leftover space + (and `.chat-messages` scrolls internally); on short screens it stops shrinking at 220px so + the sections above (connect/stats) no longer get squeezed off-screen — instead the whole + sidebar overflows and `overflow-y: auto` makes every section reachable by scrolling. */ + grid-template-rows: auto auto auto minmax(440px, 1fr); min-height: 0; + /* Vertical scroll only — `overflow-x: hidden` stops a stray nowrap element (e.g. a long IP) + or the scrollbar gutter from triggering an annoying little horizontal scroll. */ + overflow: hidden auto; + scrollbar-width: thin; + scrollbar-color: var(--border) transparent; background: var(--panel); border-left: 1px solid var(--border); } @@ -662,12 +671,12 @@

    Chat

    // s.ips_v4, s.ips_v6, s.fps, s.denied_connections_for_ip } - // Top 5 IPs by current send rate. `bytes_for_ip` already contains both IPv4 and IPv6 + // Top 10 IPs by current send rate. `bytes_for_ip` already contains both IPv4 and IPv6 // addresses, so they are ranked together in a single list. function renderTopTraffic(rates, totals, conns) { const top = Object.entries(rates) .sort((a, b) => b[1] - a[1]) - .slice(0, 5); + .slice(0, 10); if (top.length === 0) { topTraffic.innerHTML = '
  • no traffic currently
  • '; From 2c3c071576a94159387aeadb5c98a1a99017c073 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Sat, 6 Jun 2026 23:21:45 +0200 Subject: [PATCH 7/8] feat: Let users pick to sort by traffic or total --- breakwater/src/sinks/web_index.html | 96 ++++++++++++++++++++++++----- 1 file changed, 80 insertions(+), 16 deletions(-) diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html index 28a17a8..b4fa783 100644 --- a/breakwater/src/sinks/web_index.html +++ b/breakwater/src/sinks/web_index.html @@ -14,7 +14,7 @@ --text-dim: #aaa; --accent: #3ea6ff; --live: #ff4444; - --sidebar-w: 340px; + --sidebar-w: 380px; } * { box-sizing: border-box; } @@ -116,6 +116,10 @@ the sections above (connect/stats) no longer get squeezed off-screen — instead the whole sidebar overflows and `overflow-y: auto` makes every section reachable by scrolling. */ grid-template-rows: auto auto auto minmax(440px, 1fr); + /* Pin the single column to the panel width (`minmax(0, …)` lets it shrink below its content's + intrinsic size) so wide content — e.g. a long IPv6 endpoint set to `white-space: nowrap` — + gets ellipsized inside the panel instead of widening it and being clipped on the right. */ + grid-template-columns: minmax(0, 1fr); min-height: 0; /* Vertical scroll only — `overflow-x: hidden` stops a stray nowrap element (e.g. a long IP) or the scrollbar gutter from triggering an annoying little horizontal scroll. */ @@ -163,9 +167,9 @@ border: 1px solid var(--border); border-radius: 6px; padding: 6px 9px; - overflow: hidden; - text-overflow: ellipsis; - white-space: nowrap; + /* Wrap long endpoints (e.g. IPv6 addresses) across lines instead of truncating them, so the + full address is always readable. `anywhere` lets the break fall mid-token when needed. */ + overflow-wrap: anywhere; } .endpoint-list .empty { color: var(--text-dim); font-style: italic; } .endpoints h2, @@ -213,6 +217,35 @@ letter-spacing: .06em; color: var(--text-dim); } + /* Heading row holding the "Top …" title and the traffic/total switch. */ + .top-head { + display: flex; + align-items: center; + justify-content: space-between; + gap: 8px; + margin: 16px 0 8px; + } + .top-head .subhead { margin: 0; } + .seg { + display: inline-flex; + border: 1px solid var(--border); + border-radius: 6px; + overflow: hidden; + } + .seg-btn { + background: var(--panel-2); + color: var(--text-dim); + border: none; + padding: 4px 9px; + font: inherit; + font-size: 11px; + font-weight: 600; + text-transform: uppercase; + letter-spacing: .04em; + cursor: pointer; + } + .seg-btn + .seg-btn { border-left: 1px solid var(--border); } + .seg-btn.active { background: var(--accent); color: #000; } .top-list { list-style: none; margin: 0; @@ -235,9 +268,7 @@ .top-list .entry { flex: 1; min-width: 0; } .top-list .ip { font-family: monospace; - overflow: hidden; - text-overflow: ellipsis; - white-space: nowrap; + overflow-wrap: anywhere; } .top-list .meta { display: flex; @@ -390,7 +421,13 @@

    Stats

    -

    Top traffic

    +
    +

    Top traffic

    +
    + + +
    +
    1. waiting for data…
    @@ -666,32 +703,47 @@

    Chat

    // Per-IP rates are computed server-side (`bytes_per_s_for_ip`). Diffing the cumulative // `bytes_for_ip` against wall-clock time on the client produced bogus spikes (TBit/s) when // the tab is throttled, e.g. on mobile running at <= 1 fps. - renderTopTraffic(s.bytes_per_s_for_ip || {}, s.bytes_for_ip || {}, s.connections_for_ip || {}); + latestStats = s; + renderTopList(); // Also available for future views: // s.ips_v4, s.ips_v6, s.fps, s.denied_connections_for_ip } - // Top 10 IPs by current send rate. `bytes_for_ip` already contains both IPv4 and IPv6 - // addresses, so they are ranked together in a single list. - function renderTopTraffic(rates, totals, conns) { - const top = Object.entries(rates) + // How the "Top" list is ranked: "traffic" = current send rate (bytes/s), "total" = cumulative + // bytes. Defaults to traffic. The latest stats snapshot is cached so toggling re-renders the + // list immediately instead of waiting for the next (~1/s) stats message. + let topSortMode = "traffic"; + let latestStats = null; + + // Top 10 IPs ranked by the selected mode. `bytes_for_ip` already contains both IPv4 and IPv6 + // addresses, so they are ranked together in a single list. The per-row metrics shown (rate, + // connections, total) stay the same regardless of mode — only the ranking key changes. + function renderTopList() { + if (!latestStats) return; + const rates = latestStats.bytes_per_s_for_ip || {}; + const totals = latestStats.bytes_for_ip || {}; + const conns = latestStats.connections_for_ip || {}; + + const rankBy = topSortMode === "total" ? totals : rates; + const top = Object.entries(rankBy) + .filter(([, value]) => value > 0) .sort((a, b) => b[1] - a[1]) .slice(0, 10); if (top.length === 0) { - topTraffic.innerHTML = '
  • no traffic currently
  • '; + topTraffic.innerHTML = `
  • ${topSortMode === "total" ? "no data yet" : "no traffic currently"}
  • `; return; } topTraffic.innerHTML = top - .map(([ip, rate], i) => + .map(([ip], i) => `
  • ` + `${i + 1}` + `
    ` + `
    ${ip}
    ` + `
    ` + `` + - `${fmtSI(rate * 8, "bit/s")} ` + + `${fmtSI((rates[ip] || 0) * 8, "bit/s")} ` + `(${conns[ip] || 0} conn)` + `` + `${fmtSI(totals[ip] || 0, "B")} total` + @@ -702,6 +754,18 @@

    Chat

    .join(""); } + // Wire up the traffic/total switch: update the mode, the active button, the heading label, and + // re-render straight away from the cached snapshot. + const topSortLabel = document.getElementById("top-sort-label"); + document.querySelectorAll(".seg-btn").forEach((btn) => { + btn.addEventListener("click", () => { + topSortMode = btn.dataset.mode; + document.querySelectorAll(".seg-btn").forEach((b) => b.classList.toggle("active", b === btn)); + topSortLabel.textContent = topSortMode === "total" ? "total" : "traffic"; + renderTopList(); + }); + }); + // ===================================================================== // Chat // ===================================================================== From 876d2aa75f188949fd3bac1554d6156084c34c1c Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Sun, 7 Jun 2026 00:48:05 +0200 Subject: [PATCH 8/8] feat: Various UI features, e.g. blocked connections --- breakwater/src/sinks/web_index.html | 183 ++++++++++++++++++++++++---- 1 file changed, 161 insertions(+), 22 deletions(-) diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html index b4fa783..7463e1c 100644 --- a/breakwater/src/sinks/web_index.html +++ b/breakwater/src/sinks/web_index.html @@ -65,6 +65,34 @@ user-select: none; } + /* Connection-state pill (top-left of the player). Hidden while connected; shown with a pulsing + dot while (re)connecting, so a dropped websocket is obvious — important on flaky mobile links + where the canvas would otherwise just silently freeze on its last frame. */ + #conn-status { + position: absolute; + top: 12px; + left: 14px; + display: none; + align-items: center; + gap: 7px; + font: 13px/1.2 system-ui, -apple-system, sans-serif; + color: #fff; + background: rgba(0, 0, 0, .55); + padding: 5px 10px; + border-radius: 4px; + user-select: none; + } + #conn-status.show { display: inline-flex; } + #conn-status::before { + content: ""; + width: 8px; + height: 8px; + border-radius: 50%; + background: var(--live); + animation: conn-pulse 1s ease-in-out infinite; + } + @keyframes conn-pulse { 0%, 100% { opacity: 1; } 50% { opacity: .25; } } + #coord-readout { position: fixed; z-index: 10; @@ -172,6 +200,23 @@ overflow-wrap: anywhere; } .endpoint-list .empty { color: var(--text-dim); font-style: italic; } + /* Click-to-copy affordance for endpoints. The transient `.copied` flash overlays a small badge + on the right; it covers the tail of a (wrapped) address only for ~1s, which is acceptable. */ + .endpoint-list li.copyable { cursor: pointer; position: relative; } + .endpoint-list li.copyable:hover { border-color: var(--accent); } + .endpoint-list li.copied::after { + content: "copied ✓"; + position: absolute; + top: 50%; + right: 8px; + transform: translateY(-50%); + font-family: system-ui, -apple-system, sans-serif; + font-size: 11px; + color: #000; + background: var(--accent); + padding: 2px 7px; + border-radius: 4px; + } .endpoints h2, .stats h2, .chat h2 { @@ -358,6 +403,7 @@
    +
    Connecting…
    connecting…
    @@ -426,6 +472,7 @@

    Top traffic

    +
      @@ -498,12 +545,31 @@

      Chat

      })); } + // Connection-state pill. `everConnected` distinguishes the first connect ("Connecting…") from a + // later drop ("Reconnecting…"); the pill is hidden entirely once the socket is open. + const connStatusEl = document.getElementById("conn-status"); + let everConnected = false; + function setConnState(connected) { + if (connected) { + connStatusEl.classList.remove("show"); + } else { + connStatusEl.textContent = everConnected ? "Reconnecting…" : "Connecting…"; + connStatusEl.classList.add("show"); + } + } + function connect() { + setConnState(false); const proto = location.protocol === "https:" ? "wss" : "ws"; const ws = new WebSocket(`${proto}://${location.host}/ws`); ws.binaryType = "arraybuffer"; activeSocket = ws; + ws.onopen = () => { + everConnected = true; + setConnState(true); + }; + ws.onmessage = (event) => { if (typeof event.data === "string") { const msg = JSON.parse(event.data); @@ -529,7 +595,7 @@

      Chat

      ws.onclose = () => { activeSocket = null; - fpsEl.textContent = "disconnected"; + setConnState(false); setTimeout(connect, 1000); }; ws.onerror = () => ws.close(); @@ -593,13 +659,17 @@

      Chat

      // ===================================================================== // Cursor coordinate + color readout (on by default) // ===================================================================== - let showCoords = true; + // Restore the saved preference (defaults to on when never set). + let showCoords = localStorage.getItem("showCoords") !== "false"; let cursorX = null; // last cursor position over the player, in client coordinates let cursorY = null; const coordReadout = document.getElementById("coord-readout"); - document.getElementById("toggle-coords").addEventListener("change", (e) => { + const toggleCoordsEl = document.getElementById("toggle-coords"); + toggleCoordsEl.checked = showCoords; + toggleCoordsEl.addEventListener("change", (e) => { showCoords = e.target.checked; + localStorage.setItem("showCoords", showCoords); updateCoordReadout(); }); @@ -649,10 +719,48 @@

      Chat

      // ===================================================================== const endpointsEl = document.getElementById("endpoints"); + // Copy `text` to the clipboard. The async Clipboard API only works in secure contexts (https or + // localhost); breakwater is often viewed over plain http on a LAN, so fall back to the legacy + // textarea + execCommand path there. Returns whether the copy succeeded. + async function copyText(text) { + if (navigator.clipboard && window.isSecureContext) { + try { + await navigator.clipboard.writeText(text); + return true; + } catch (e) { /* fall through to the legacy path */ } + } + try { + const ta = document.createElement("textarea"); + ta.value = text; + ta.style.position = "fixed"; + ta.style.opacity = "0"; + document.body.appendChild(ta); + ta.select(); + const ok = document.execCommand("copy"); + document.body.removeChild(ta); + return ok; + } catch (e) { + return false; + } + } + + // Turn an endpoint
    1. into a click-to-copy control with a brief "copied ✓" flash. + function makeCopyable(li, text) { + li.classList.add("copyable"); + li.title = `Click to copy: ${text}`; + li.addEventListener("click", async () => { + if (await copyText(text)) { + li.classList.add("copied"); + clearTimeout(li._copiedTimer); + li._copiedTimer = setTimeout(() => li.classList.remove("copied"), 1200); + } + }); + } + // The URL this viewer is reachable at (per-viewer, e.g. localhost vs the public host). const viewUrl = document.getElementById("view-url"); viewUrl.textContent = location.origin; - viewUrl.title = location.origin; + makeCopyable(viewUrl, location.origin); function renderEndpoints(endpoints) { endpointsEl.innerHTML = ""; @@ -667,7 +775,7 @@

      Chat

      for (const endpoint of endpoints) { const li = document.createElement("li"); li.textContent = endpoint; - li.title = endpoint; + makeCopyable(li, endpoint); endpointsEl.appendChild(li); } } @@ -706,13 +814,17 @@

      Chat

      latestStats = s; renderTopList(); // Also available for future views: - // s.ips_v4, s.ips_v6, s.fps, s.denied_connections_for_ip + // s.ips_v4, s.ips_v6, s.fps } // How the "Top" list is ranked: "traffic" = current send rate (bytes/s), "total" = cumulative - // bytes. Defaults to traffic. The latest stats snapshot is cached so toggling re-renders the - // list immediately instead of waiting for the next (~1/s) stats message. - let topSortMode = "traffic"; + // bytes, "denied" = denied connection count. Defaults to traffic. The latest stats snapshot is + // cached so toggling re-renders the list immediately instead of waiting for the next (~1/s) + // stats message. + const TOP_SORT_MODES = ["traffic", "total", "denied"]; + let topSortMode = TOP_SORT_MODES.includes(localStorage.getItem("topSortMode")) + ? localStorage.getItem("topSortMode") + : "traffic"; let latestStats = null; // Top 10 IPs ranked by the selected mode. `bytes_for_ip` already contains both IPv4 and IPv6 @@ -723,48 +835,75 @@

      Chat

      const rates = latestStats.bytes_per_s_for_ip || {}; const totals = latestStats.bytes_for_ip || {}; const conns = latestStats.connections_for_ip || {}; + const denied = latestStats.denied_connections_for_ip || {}; - const rankBy = topSortMode === "total" ? totals : rates; + const rankBy = topSortMode === "total" ? totals + : topSortMode === "denied" ? denied + : rates; const top = Object.entries(rankBy) .filter(([, value]) => value > 0) .sort((a, b) => b[1] - a[1]) .slice(0, 10); if (top.length === 0) { - topTraffic.innerHTML = `
    2. ${topSortMode === "total" ? "no data yet" : "no traffic currently"}
    3. `; + const empty = topSortMode === "denied" ? "no denied connections" + : topSortMode === "total" ? "no data yet" + : "no traffic currently"; + topTraffic.innerHTML = `
    4. ${empty}
    5. `; return; } + // The accented primary metric reflects the active ranking; the right column shows a + // complementary figure so each row still carries both traffic and size context in every mode. topTraffic.innerHTML = top - .map(([ip], i) => - `
    6. ` + + .map(([ip], i) => { + const rate = fmtSI((rates[ip] || 0) * 8, "bit/s"); + const total = fmtSI(totals[ip] || 0, "B"); + let primary, right; + if (topSortMode === "denied") { + primary = `${denied[ip] || 0} denied`; + right = `${total} total`; + } else if (topSortMode === "total") { + primary = total; + right = rate; + } else { + primary = rate; + right = `${total} total`; + } + return `
    7. ` + `${i + 1}` + `
      ` + `
      ${ip}
      ` + `
      ` + `` + - `${fmtSI((rates[ip] || 0) * 8, "bit/s")} ` + + `${primary} ` + `(${conns[ip] || 0} conn)` + `` + - `${fmtSI(totals[ip] || 0, "B")} total` + + `${right}` + `
      ` + `
      ` + - `
    8. ` - ) + ``; + }) .join(""); } - // Wire up the traffic/total switch: update the mode, the active button, the heading label, and - // re-render straight away from the cached snapshot. + // Wire up the traffic/total switch: persist the mode, update the active button + heading label, + // and re-render straight away from the cached snapshot. const topSortLabel = document.getElementById("top-sort-label"); - document.querySelectorAll(".seg-btn").forEach((btn) => { + const segBtns = document.querySelectorAll(".seg-btn"); + function applyTopSortMode() { + segBtns.forEach((b) => b.classList.toggle("active", b.dataset.mode === topSortMode)); + topSortLabel.textContent = topSortMode; // "traffic" | "total" | "denied" + } + segBtns.forEach((btn) => { btn.addEventListener("click", () => { topSortMode = btn.dataset.mode; - document.querySelectorAll(".seg-btn").forEach((b) => b.classList.toggle("active", b === btn)); - topSortLabel.textContent = topSortMode === "total" ? "total" : "traffic"; + localStorage.setItem("topSortMode", topSortMode); + applyTopSortMode(); renderTopList(); }); }); + applyTopSortMode(); // reflect the restored preference on load // ===================================================================== // Chat