diff --git a/Cargo.lock b/Cargo.lock index 79f9b7f..d655d21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -326,6 +326,61 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core", + "base64", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper", + "tokio", + "tokio-tungstenite", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -341,6 +396,12 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bincode" version = "2.0.1" @@ -429,6 +490,15 @@ dependencies = [ "no_std_io2", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block2" version = "0.5.1" @@ -452,15 +522,18 @@ name = "breakwater" version = "0.20.0" dependencies = [ "async-trait", + "axum", "breakwater-egui-overlay", "breakwater-parser", "bytemuck", + "bytes", "chrono", "clap", "color-eyre", "const_format", "eframe", "egui", + "flate2", "futures", "libloading 0.9.0", "local-ip-address", @@ -921,6 +994,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -997,6 +1079,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "ctor" version = "0.10.1" @@ -1093,6 +1185,16 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dispatch" version = "0.2.0" @@ -1689,6 +1791,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "gethostname" version = "1.1.0" @@ -1899,6 +2011,86 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "http" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "bytes", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -2481,6 +2673,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "maybe-rayon" version = "0.1.1" @@ -2536,6 +2734,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3924,6 +4128,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "same-file" version = "1.0.6" @@ -4013,6 +4223,40 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -4290,6 +4534,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "synstructure" version = "0.13.2" @@ -4533,6 +4783,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "toml_datetime" version = "1.1.1+spec-1.1.0" @@ -4563,6 +4825,34 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.44" @@ -4693,6 +4983,22 @@ version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2df906b07856748fa3f6e0ad0cbaa047052d4a7dd609e231c4f72cee8c36f31" +[[package]] +name = "tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.4", + "sha1", + "thiserror 2.0.18", +] + [[package]] name = "type-map" version = "0.5.1" @@ -4708,6 +5014,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" +[[package]] +name = "typenum" +version = "1.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6f5e870be6c3b371b77fe0ee0bafb859fa4964b4404c27de1d380043c4dda20" + [[package]] name = "unicode-bidi" version = "0.3.18" diff --git a/Cargo.toml b/Cargo.toml index a3a7cf9..8f5b49d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,9 @@ repository = "https://github.com/sernauer/breakwater" [workspace.dependencies] async-trait = "0.1" +axum = { version = "0.8", features = ["ws"] } bytemuck = { version = "1.21" } +bytes = "1.0" chrono = "0.4" clap = { version = "4.5", features = ["derive"] } color-eyre = "0.6" @@ -24,6 +26,7 @@ const_format = "0.2" criterion = { version = "0.8", features = ["async_tokio"] } eframe = { version = "0.34", default-features = false, features = ["glow", "default_fonts", "persistence", "wayland", "x11"] } egui = "0.34" +flate2 = "1.0" futures = "0.3" libloading = "0.9" local-ip-address = "0.6.3" diff --git a/breakwater/Cargo.toml b/breakwater/Cargo.toml index 99b4378..05fb97f 100644 --- a/breakwater/Cargo.toml +++ b/breakwater/Cargo.toml @@ -16,13 +16,16 @@ breakwater-parser.workspace = true breakwater-egui-overlay = { workspace = true, optional = true } async-trait.workspace = true +axum = { workspace = true, optional = true } bytemuck = { workspace = true, optional = true } +bytes = { workspace = true, optional = true } chrono.workspace = true clap.workspace = true color-eyre.workspace = true const_format.workspace = true eframe = { workspace = true, optional = true } egui = { workspace = true, optional = true } +flate2 = { workspace = true, optional = true } libloading = { workspace = true, optional = true } local-ip-address.workspace = true memadvise.workspace = true @@ -48,7 +51,7 @@ rstest.workspace = true [features] # We don't enable binary-sync-pixels and binary-set-pixel by default to make it a bit harder for clients ;) -default = ["egui", "vnc"] +default = ["egui", "vnc", "web"] alpha = ["breakwater-parser/alpha"] binary-set-pixel = ["breakwater-parser/binary-set-pixel"] @@ -56,6 +59,7 @@ binary-sync-pixels = ["breakwater-parser/binary-sync-pixels"] egui = ["dep:breakwater-egui-overlay", "dep:bytemuck", "dep:eframe", "dep:egui", "dep:libloading"] native-display = ["dep:softbuffer", "dep:winit"] vnc = ["dep:vncserver"] +web = ["dep:axum", "dep:bytes", "dep:flate2"] [lints] workspace = true diff --git a/breakwater/src/cli_args.rs b/breakwater/src/cli_args.rs index 99fc1c7..4457491 100644 --- a/breakwater/src/cli_args.rs +++ b/breakwater/src/cli_args.rs @@ -100,7 +100,7 @@ pub struct CliArgs { pub viewport: Vec, /// Specify one or more pixelflut endpoints to display. - #[cfg(feature = "egui")] + #[cfg(any(feature = "egui", feature = "web"))] #[clap(long)] pub advertised_endpoints: Vec, @@ -112,6 +112,17 @@ pub struct CliArgs { #[clap(long)] pub ui: Option, + /// Listen address for the WebUI, e.g. `[::]:8080`. + /// Serves a small website that streams the canvas to web browsers over a WebSocket. + #[cfg(feature = "web")] + #[clap(long)] + pub web_listen_address: Option, + + /// Maximum number of chat messages a single IP address may send per minute in the WebUI. + #[cfg(feature = "web")] + #[clap(long, default_value_t = 10)] + pub chat_messages_per_minute: u32, + /// Create (or use an existing) shared memory region for the framebuffer. /// This enables other applications to read and write Pixel values to the framebuffer or can be /// used to persist the canvas across restarts. @@ -120,6 +131,36 @@ pub struct CliArgs { } impl CliArgs { + /// Resolves the Pixelflut endpoints to advertise to users (so they know where to connect). + /// + /// If `--advertised-endpoints` is set, those are returned verbatim. Otherwise we make a best + /// effort to guess: for a single listener we resolve the local v4 + v6 IPs and append the port, + /// for multiple listeners we just list them. + #[cfg(any(feature = "egui", feature = "web"))] + pub fn resolve_advertised_endpoints(&self) -> Vec { + if !self.advertised_endpoints.is_empty() { + return self.advertised_endpoints.clone(); + } + + match &self.listen_addresses[..] { + // No listeners given, so also no endpoints to advertise + [] => vec![], + // In case of a single listener we get the local IPs (v4 + v6) and concat them with the + // port + [single_listener] => { + let port = single_listener.port(); + + [local_ip_address::local_ip(), local_ip_address::local_ipv6()] + .into_iter() + .filter_map(Result::ok) + .map(|ip| format!("{ip}:{port}")) + .collect() + } + // If multiple listeners are used it's complicated, so we just print them + multiple_listeners => multiple_listeners.iter().map(ToString::to_string).collect(), + } + } + /// Checks that at most one IP per version (v4/v6) is configured. /// Returns the (optional) v4 address and (optional) v6 address. #[cfg(feature = "vnc")] diff --git a/breakwater/src/main.rs b/breakwater/src/main.rs index fef3f70..595ef33 100644 --- a/breakwater/src/main.rs +++ b/breakwater/src/main.rs @@ -140,6 +140,24 @@ async fn main() -> eyre::Result<()> { } } + #[cfg(feature = "web")] + { + use crate::sinks::web::WebSink; + + if let Some(web_sink) = WebSink::new( + fb.clone(), + &args, + statistics_tx.clone(), + statistics_information_rx.resubscribe(), + terminate_signal_rx.resubscribe(), + ) + .await + .context("failed to create web sink")? + { + display_sinks.push(Box::new(web_sink)); + } + } + let mut ffmpeg_thread_present = false; if let Some(ffmpeg_sink) = FfmpegSink::new( fb.clone(), diff --git a/breakwater/src/sinks/egui/mod.rs b/breakwater/src/sinks/egui/mod.rs index 0bbed4a..020dda3 100644 --- a/breakwater/src/sinks/egui/mod.rs +++ b/breakwater/src/sinks/egui/mod.rs @@ -104,29 +104,7 @@ impl DisplaySink for EguiSink { } }); - let advertised_endpoints = if cli_args.advertised_endpoints.is_empty() { - // In case no advertised endpoints to display are given, we calculate the most likely - // endpoint(s) to display. - match &cli_args.listen_addresses[..] { - // No listeners given, so also no endpoints to advertise - [] => vec![], - // In case of a single listener we get the local IPs (v4 + v6) and concat them with - // the port - [single_listener] => { - let port = single_listener.port(); - - [local_ip_address::local_ip(), local_ip_address::local_ipv6()] - .into_iter() - .filter_map(Result::ok) - .map(|ip| format!("{ip}:{port}")) - .collect() - } - // If multiple listeners are used it's complicated, so we just print them - multiple_listeners => multiple_listeners.iter().map(ToString::to_string).collect(), - } - } else { - cli_args.advertised_endpoints.clone() - }; + let advertised_endpoints = cli_args.resolve_advertised_endpoints(); Ok(Some(Self { framebuffer: fb, diff --git a/breakwater/src/sinks/mod.rs b/breakwater/src/sinks/mod.rs index 4ee53f7..ff6a1ca 100644 --- a/breakwater/src/sinks/mod.rs +++ b/breakwater/src/sinks/mod.rs @@ -16,6 +16,8 @@ pub mod ffmpeg; pub mod native_display; #[cfg(feature = "vnc")] pub mod vnc; +#[cfg(feature = "web")] +pub mod web; // The stabilization of async functions in traits in Rust 1.75 did not include support for using traits containing async // functions as dyn Trait, so we still need to use async_trait here. diff --git a/breakwater/src/sinks/web.rs b/breakwater/src/sinks/web.rs new file mode 100644 index 0000000..aafe055 --- /dev/null +++ b/breakwater/src/sinks/web.rs @@ -0,0 +1,507 @@ +use std::{ + collections::{HashMap, VecDeque}, + io::Write, + net::{IpAddr, SocketAddr}, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use axum::{ + Router, + extract::{ + ConnectInfo, State, + ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade}, + }, + response::{Html, Response}, + routing::get, +}; +use breakwater_parser::{FB_BYTES_PER_PIXEL, FrameBuffer}; +use bytes::Bytes; +use color_eyre::eyre::{self, Context}; +use flate2::{Compression, write::ZlibEncoder}; +use futures::{SinkExt, StreamExt, stream::SplitSink}; +use simple_moving_average::{SMA, SingleSumSMA}; +use tokio::{ + sync::{broadcast, mpsc}, + time, +}; +use tracing::{debug, instrument, trace, warn}; + +use crate::{ + cli_args::CliArgs, + sinks::DisplaySink, + statistics::{StatisticsEvent, StatisticsInformationEvent}, +}; + +const INDEX_HTML: &str = include_str!("web_index.html"); + +/// Number of independently-compressed chunks per frame. The framebuffer is split into this many +/// contiguous byte ranges that are zlib-compressed in parallel, drastically cutting the wall-clock +/// time spent compressing a single frame. All chunks are packed into one websocket message (see +/// [`WebSink::encode_frame`]), so a client never renders a partially-updated frame. +/// +/// Note: If you change this value, the frontend adapts automatically as the chunk count is encoded +/// in the message header. +const FRAME_COMPRESSION_CHUNKS: usize = 16; + +/// Number of recent frames over which the average compression duration is computed for logging. +const COMPRESSION_TIME_WINDOW_SIZE: usize = 100; + +/// Number of compressed frames buffered for each connected client. Kept small on purpose: +/// a client that can't drain the buffer in time receives a [`broadcast::error::RecvError::Lagged`] +/// and simply skips ahead to the newest frame, which reduces its effective frame rate. +const FRAME_BUFFER_SIZE: usize = 2; + +/// Number of stats messages buffered per client. Stats are produced roughly once per second, so a +/// small buffer is plenty; a client that lags simply skips the missed updates. As of writing no +/// data is lost in case a stats message is missed, so we don't need to be super careful about that. +const STATS_BUFFER_SIZE: usize = 3; + +/// Number of chat messages buffered per client. A client that lags this far behind will miss some +/// chat messages. As they should be cheap to send we try to deliver all of them. +const CHAT_BUFFER_SIZE: usize = 1024; + +/// The window over which the per-IP chat rate limit is applied. +const CHAT_RATE_LIMIT_WINDOW: Duration = Duration::from_mins(1); + +/// Maximum length (in characters) of a chat username and message. Enforced server-side so a crafted +/// client can't bypass the frontend's `maxlength` and blow up the UI. +/// +/// Note: If you change this value, please also change it in the frontend. +const MAX_CHAT_NAME_LEN: usize = 20; +const MAX_CHAT_MESSAGE_LEN: usize = 256; + +/// Tracks the timestamps of recent chat messages per IP address, shared across all connections so +/// the rate limit applies per IP rather than per connection. +type ChatRateLimiter = Arc>>>; + +#[derive(Clone)] +struct WebState { + /// Carries the latest frame already serialized to binary BLOB, ready to send to every client. + frame_tx: broadcast::Sender, + /// Carries the latest statistics already serialized to JSON, ready to send to every client. + stats_tx: broadcast::Sender, + /// Carries chat messages (already serialized to JSON) to every connected client. + chat_tx: broadcast::Sender, + /// Maximum number of chat messages a single IP may send per [`CHAT_RATE_LIMIT_WINDOW`]. + chat_rate_limit: u32, + chat_rate_limiter: ChatRateLimiter, + width: usize, + height: usize, + /// Pixelflut endpoints to advertise to users, sent once on connect. + advertised_endpoints: Vec, +} + +pub struct WebSink { + fb: Arc, + statistics_information_rx: broadcast::Receiver, + terminate_signal_rx: broadcast::Receiver<()>, + + listen_address: SocketAddr, + fps: u32, + + /// Shared state handed to every connection handler (channels, rate limiter, canvas size, ...). + /// The sink keeps its own copy to feed the encoder loop and stats task. + state: WebState, + + /// Reused scratch buffer holding one RGBA frame, so we don't reallocate every tick. + frame_buf: Vec, + + /// Rolling average of the per-frame compression duration (in microseconds), logged alongside + /// the instantaneous duration to give a more stable picture. + compression_time_window: SingleSumSMA, +} + +#[async_trait] +impl DisplaySink for WebSink { + #[instrument(skip_all, err)] + async fn new( + fb: Arc, + cli_args: &CliArgs, + _statistics_tx: mpsc::Sender, + statistics_information_rx: broadcast::Receiver, + terminate_signal_rx: broadcast::Receiver<()>, + ) -> eyre::Result> { + let Some(listen_address) = cli_args.web_listen_address else { + debug!("Web sink not enabled as no web listen address is specified"); + return Ok(None); + }; + + let (frame_tx, _) = broadcast::channel(FRAME_BUFFER_SIZE); + let (stats_tx, _) = broadcast::channel(STATS_BUFFER_SIZE); + let (chat_tx, _) = broadcast::channel(CHAT_BUFFER_SIZE); + let frame_buf = vec![0; fb.get_size() * FB_BYTES_PER_PIXEL]; + + let state = WebState { + frame_tx, + stats_tx, + chat_tx, + chat_rate_limit: cli_args.chat_messages_per_minute, + chat_rate_limiter: Arc::new(Mutex::new(HashMap::new())), + width: fb.get_width(), + height: fb.get_height(), + advertised_endpoints: cli_args.resolve_advertised_endpoints(), + }; + + Ok(Some(Self { + fb, + statistics_information_rx, + terminate_signal_rx, + listen_address, + fps: cli_args.fps, + state, + frame_buf, + compression_time_window: SingleSumSMA::new(), + })) + } + + #[instrument(skip(self), err)] + async fn run(&mut self) -> eyre::Result<()> { + let state = self.state.clone(); + + // Dedicated task: serialize every incoming statistics event to JSON (once, not per client) + // and broadcast it. The full per-IP maps are included so the frontend can build show + // traffic per IP. + let mut statistics_information_rx = self.statistics_information_rx.resubscribe(); + let stats_tx = self.state.stats_tx.clone(); + let stats_task = tokio::spawn(async move { + loop { + match statistics_information_rx.recv().await { + Ok(info) => match serde_json::to_value(&info) { + Ok(mut value) => { + if let Some(object) = value.as_object_mut() { + object.insert("type".to_owned(), "stats".into()); + } + // Ignore the error: it only means no clients are currently connected. + let _ = stats_tx.send(Utf8Bytes::from(value.to_string())); + } + Err(err) => warn!(%err, "failed to serialize statistics to JSON"), + }, + // We fell behind on statistics events; just continue with the next one. + Err(broadcast::error::RecvError::Lagged(_)) => {} + // The statistics thread shut down, so will we. + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); + + let app = Router::new() + .route("/", get(index)) + .route("/ws", get(ws_handler)) + .with_state(state); + + let listener = tokio::net::TcpListener::bind(self.listen_address) + .await + .with_context(|| format!("failed to bind web server to {}", self.listen_address))?; + tracing::info!( + "Web UI available at http://{}", + listener.local_addr().unwrap_or(self.listen_address) + ); + + // Shut the HTTP server down gracefully once we receive the terminate signal. + let mut server_terminate_rx = self.terminate_signal_rx.resubscribe(); + let server = tokio::spawn(async move { + let shutdown = async move { + let _ = server_terminate_rx.recv().await; + }; + // `into_make_service_with_connect_info` makes the peer `SocketAddr` available to + // handlers via `ConnectInfo`, which we use for the per-IP chat rate limit. + if let Err(err) = axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .with_graceful_shutdown(shutdown) + .await + { + warn!(%err, "web server stopped unexpectedly"); + } + }); + + // Encoder loop: compress the framebuffer once per tick and broadcast the bytes to every + // connected client. The expensive work (copy + compress) happens a single time regardless + // of the number of viewers. + let mut interval = time::interval(Duration::from_micros(1_000_000 / u64::from(self.fps))); + // In case we delayed a frame, there is no point in trying to get the following frames + // quicker as a compensation. + interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); + loop { + if self.terminate_signal_rx.try_recv().is_ok() { + break; + } + + // No point spending CPU on compression while nobody is watching. + if self.state.frame_tx.receiver_count() > 0 { + let frame = self.encode_frame().await?; + // Ignore the error: it only means all receivers disconnected between the check above + // and here. + let _ = self.state.frame_tx.send(frame); + } + + interval.tick().await; + } + + server.abort(); + stats_task.abort(); + Ok(()) + } +} + +impl WebSink { + /// Copies the current framebuffer into the scratch buffer, forces the alpha channel to opaque + /// (the framebuffer stores `rgb0`, but the browser's `ImageData` expects a meaningful alpha), + /// and zlib-compresses the result. + /// + /// Compression is the single most expensive part of serving the web UI, so the buffer is split + /// into [`FRAME_COMPRESSION_CHUNKS`] contiguous byte ranges that are compressed in parallel. + /// The compressed chunks are concatenated into one message, prefixed with a small header so the + /// client can split them apart again: + /// + /// ```text + /// u32le chunk_count + /// u32le compressed_len × chunk_count + /// bytes compressed chunk data, back-to-back, in order + /// ``` + /// + /// Because the chunks are simply consecutive slices of the framebuffer, the client reproduces + /// the full frame by decompressing each chunk and concatenating the output in order. Keeping + /// everything in a single websocket message guarantees a client never renders a half-updated + /// frame (which would show as a visible tear/artefact). + async fn encode_frame(&mut self) -> eyre::Result { + self.frame_buf.copy_from_slice(self.fb.as_bytes()); + for pixel in self.frame_buf.chunks_exact_mut(FB_BYTES_PER_PIXEL) { + pixel[3] = 0xff; + } + + let start = Instant::now(); + + let len = self.frame_buf.len(); + // Round up so we never end up with more than `FRAME_COMPRESSION_CHUNKS` chunks. The exact + // split points don't matter for correctness as the client reassembles the chunks in order. + let chunk_size = len.div_ceil(FRAME_COMPRESSION_CHUNKS).max(1); + + // Compress each chunk on Tokio's blocking thread pool. `spawn_blocking` requires `'static` + // closures, so we temporarily move the scratch buffer into an `Arc` that every task shares; + // it is reclaimed below to keep reusing the same allocation across frames. + let frame = Arc::new(std::mem::take(&mut self.frame_buf)); + let mut tasks = Vec::with_capacity(FRAME_COMPRESSION_CHUNKS); + let mut offset = 0; + while offset < len { + let end = (offset + chunk_size).min(len); + let frame = Arc::clone(&frame); + tasks.push(tokio::task::spawn_blocking(move || { + compress_chunk(&frame[offset..end]) + })); + offset = end; + } + + // Collect in spawn order, so the chunks stay in framebuffer order. + let mut compressed_chunks: Vec> = Vec::with_capacity(tasks.len()); + for task in tasks { + compressed_chunks.push(task.await.context("compression task panicked")??); + } + + // All tasks have finished, so we are the sole owner again: reclaim the buffer for reuse. + self.frame_buf = + Arc::try_unwrap(frame).expect("frame buffer still shared after compression"); + + let compression_time = start.elapsed(); + self.compression_time_window + .add_sample(compression_time.as_micros() as u64); + let avg_compression_time = + Duration::from_micros(self.compression_time_window.get_average()); + + // Assemble the framed message: header (chunk count + per-chunk lengths) followed by the + // compressed bytes. + let compressed_bytes: usize = compressed_chunks.iter().map(Vec::len).sum(); + let header_len = (1 + compressed_chunks.len()) * size_of::(); + let mut message = Vec::with_capacity(header_len + compressed_bytes); + message.extend_from_slice(&(compressed_chunks.len() as u32).to_le_bytes()); + for chunk in &compressed_chunks { + message.extend_from_slice(&(chunk.len() as u32).to_le_bytes()); + } + for chunk in &compressed_chunks { + message.extend_from_slice(chunk); + } + + trace!( + raw_bytes = self.frame_buf.len(), + compressed_bytes, + chunks = compressed_chunks.len(), + compression_factor = self.frame_buf.len() as f64 / compressed_bytes as f64, + ?compression_time, + ?avg_compression_time, + "encoded web frame" + ); + + Ok(Bytes::from(message)) + } +} + +/// Zlib-compresses a single chunk of the framebuffer. +/// +/// `Compression::fast()` (level 1) keeps CPU usage low; Pixelflut battles are high-entropy, so a +/// higher level would mostly burn CPU for little gain. +fn compress_chunk(data: &[u8]) -> eyre::Result> { + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::fast()); + encoder + .write_all(data) + .context("failed to compress frame chunk")?; + encoder.finish().context("failed to finish compression") +} + +async fn index() -> Html<&'static str> { + Html(INDEX_HTML) +} + +async fn ws_handler( + ws: WebSocketUpgrade, + ConnectInfo(who): ConnectInfo, + State(state): State, +) -> Response { + ws.on_upgrade(move |socket| handle_socket(socket, who.ip(), state)) +} + +async fn handle_socket(socket: WebSocket, ip: IpAddr, state: WebState) { + // Split so we can read incoming chat messages and write outgoing frames/stats/chat concurrently. + let (mut sender, mut receiver) = socket.split(); + + // Tell the client the canvas dimensions (so it can size the `` and allocate + // `ImageData`) and the Pixelflut endpoints to advertise. + let hello = serde_json::json!({ + "type": "hello", + "width": state.width, + "height": state.height, + "advertised_endpoints": state.advertised_endpoints, + }) + .to_string(); + if sender.send(Message::Text(hello.into())).await.is_err() { + return; + } + + let mut frame_rx = state.frame_tx.subscribe(); + let mut stats_rx = state.stats_tx.subscribe(); + let mut chat_rx = state.chat_tx.subscribe(); + loop { + tokio::select! { + frame = frame_rx.recv() => match frame { + Ok(frame) => { + if sender.send(Message::Binary(frame)).await.is_err() { + // Client disconnected. + break; + } + } + // This client fell behind: skip the dropped frames and continue with the newest one. + // This is what throttles slow clients to a lower frame rate. + Err(broadcast::error::RecvError::Lagged(skipped)) => { + trace!(skipped, "web client lagging behind, dropping frames"); + } + Err(broadcast::error::RecvError::Closed) => break, + }, + stats_msg = stats_rx.recv() => match stats_msg { + Ok(json) => { + if sender.send(Message::Text(json)).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + }, + chat_msg = chat_rx.recv() => match chat_msg { + Ok(json) => { + if sender.send(Message::Text(json)).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + }, + incoming = receiver.next() => match incoming { + Some(Ok(Message::Text(text))) => handle_incoming_chat(&text, ip, &state, &mut sender).await, + // Client closed the connection or errored. + Some(Ok(Message::Close(_)) | Err(_)) | None => break, + // Ignore anything else the client might send (binary, ping, pong). + Some(Ok(_)) => {} + }, + } + } +} + +/// Parses, validates and rate-limits an incoming chat message. On success it is broadcast to all +/// clients; if the sender hit the rate limit, a `chat_error` is sent back only to them. +async fn handle_incoming_chat( + text: &str, + ip: IpAddr, + state: &WebState, + sender: &mut SplitSink, +) { + let Ok(value) = serde_json::from_str::(text) else { + return; + }; + if value.get("type").and_then(serde_json::Value::as_str) != Some("chat") { + return; + } + + let name = value + .get("name") + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .trim(); + let message = value + .get("text") + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .trim(); + if name.is_empty() || message.is_empty() { + return; + } + + // Basic sanity caps so a single message can't blow up the UI. + let name: String = name.chars().take(MAX_CHAT_NAME_LEN).collect(); + let message: String = message.chars().take(MAX_CHAT_MESSAGE_LEN).collect(); + + match check_rate_limit(&state.chat_rate_limiter, ip, state.chat_rate_limit) { + Ok(()) => { + let json = + serde_json::json!({ "type": "chat", "name": name, "text": message, "ip": ip }); + let _ = state.chat_tx.send(Utf8Bytes::from(json.to_string())); + } + Err(recent) => { + let json = serde_json::json!({ + "type": "chat_error", + "text": format!( + "Your IP {ip} already sent {recent} messages in the last minute, limit is {}", + state.chat_rate_limit, + ), + }); + let _ = sender + .send(Message::Text(Utf8Bytes::from(json.to_string()))) + .await; + } + } +} + +/// Records a chat message for `ip` if it is within the per-IP rate limit. +/// +/// Returns `Ok(())` if allowed (and records the message), or `Err(recent)` with the number of +/// messages already sent within [`CHAT_RATE_LIMIT_WINDOW`] if the limit has been reached. +fn check_rate_limit(limiter: &ChatRateLimiter, ip: IpAddr, limit: u32) -> Result<(), usize> { + let now = Instant::now(); + let mut limiter = limiter.lock().expect("chat rate limiter mutex poisoned"); + let timestamps = limiter.entry(ip).or_default(); + + // Drop timestamps that have aged out of the window. + while timestamps + .front() + .is_some_and(|&t| now.duration_since(t) > CHAT_RATE_LIMIT_WINDOW) + { + timestamps.pop_front(); + } + + if timestamps.len() >= limit as usize { + Err(timestamps.len()) + } else { + timestamps.push_back(now); + Ok(()) + } +} diff --git a/breakwater/src/sinks/web_index.html b/breakwater/src/sinks/web_index.html new file mode 100644 index 0000000..7463e1c --- /dev/null +++ b/breakwater/src/sinks/web_index.html @@ -0,0 +1,979 @@ + + + + + + Pixelflut viewer + + + +
+ +
+ +
+ +
Connecting…
+
connecting…
+ +
+ Pixelflut canvas (breakwater) + + +
+
+ + + +
+ + + + diff --git a/breakwater/src/statistics.rs b/breakwater/src/statistics.rs index 7d76fef..d6cabfb 100644 --- a/breakwater/src/statistics.rs +++ b/breakwater/src/statistics.rs @@ -59,6 +59,7 @@ pub struct StatisticsInformationEvent { pub connections_for_ip: HashMap, pub denied_connections_for_ip: HashMap, pub bytes_for_ip: HashMap, + pub bytes_per_s_for_ip: HashMap, pub statistic_events: u64, } @@ -75,6 +76,7 @@ pub struct Statistics { bytes_for_ip: HashMap, bytes_per_s_window: SingleSumSMA, + bytes_per_s_for_ip_window: HashMap>, fps_window: SingleSumSMA, statistics_save_mode: StatisticsSaveMode, @@ -130,6 +132,7 @@ impl Statistics { denied_connections_for_ip: HashMap::new(), bytes_for_ip: HashMap::new(), bytes_per_s_window: SingleSumSMA::new(), + bytes_per_s_for_ip_window: HashMap::new(), fps_window: SingleSumSMA::new(), statistics_save_mode, }; @@ -240,6 +243,19 @@ impl Statistics { let bytes = self.bytes_for_ip.values().sum(); self.bytes_per_s_window .add_sample((bytes - prev.bytes) * 1000 / elapsed_ms); + for (ip, bytes) in &self.bytes_for_ip { + if let Some(prev_bytes) = prev.bytes_for_ip.get(ip) { + self.bytes_per_s_for_ip_window + .entry(*ip) + .or_insert_with(SingleSumSMA::new) + .add_sample((bytes - prev_bytes) * 1000 / elapsed_ms); + } + } + // Drop windows that have decayed to zero, so the map — and the JSON we broadcast every + // second — stays bounded to recently-active IPs rather than every IP ever seen. + self.bytes_per_s_for_ip_window + .retain(|_, window| window.get_average() > 0); + self.fps_window .add_sample((frame - prev.frame) * 1000 / elapsed_ms); let statistic_events = self.statistic_events; @@ -255,6 +271,11 @@ impl Statistics { connections_for_ip: self.connections_for_ip.clone(), denied_connections_for_ip: self.denied_connections_for_ip.clone(), bytes_for_ip: self.bytes_for_ip.clone(), + bytes_per_s_for_ip: self + .bytes_per_s_for_ip_window + .iter() + .map(|(ip, bytes_per_s)| (*ip, bytes_per_s.get_average())) + .collect(), statistic_events, } }