diff --git a/crates/weather-ingest/src/gfs.rs b/crates/weather-ingest/src/gfs.rs index 34c6fdf..1372071 100644 --- a/crates/weather-ingest/src/gfs.rs +++ b/crates/weather-ingest/src/gfs.rs @@ -4,9 +4,11 @@ //! (721×1440 = 1.04M points), so any number of cities sample for free. use std::io::Cursor; +use std::time::Duration; use anyhow::{bail, ensure, Context, Result}; use grib::{from_reader, Grib2SubmessageDecoder}; +use tracing::warn; const NODD: &str = "https://noaa-gfs-bdp-pds.s3.amazonaws.com"; // GFS pgrb2.0p25: 0.25° global grid, scanning +i (W→E) / −j (N→S) from (90N, 0E). @@ -89,6 +91,12 @@ pub fn k_to_f(k: f32) -> f32 { /// must be the native 0.25° grid (they are, straight from [`Field::decode`]). pub fn encode_uv_png(u: &Field, v: &Field) -> Result> { let norm = |x: f32| { + // A missing/NaN cell must not become a false gale: `NaN as u8` evaluates + // to byte 0, which the web denormalizes to -WIND_MS_MAX (a ~40 m/s wind). + // Map non-finite to the mid-byte (≈ 0 m/s) instead. + if !x.is_finite() { + return 128u8; + } (((x + WIND_MS_MAX) / (2.0 * WIND_MS_MAX)) * 255.0) .round() .clamp(0.0, 255.0) as u8 @@ -117,7 +125,14 @@ pub fn encode_uv_png(u: &Field, v: &Field) -> Result> { /// Used for the REFC precip texture (dBZ over [`REFC_DBZ_MIN`, `REFC_DBZ_MAX`]). pub fn encode_scalar_png(field: &Field, min: f32, max: f32) -> Result> { let span = (max - min).max(f32::EPSILON); - let norm = |x: f32| (((x - min) / span) * 255.0).round().clamp(0.0, 255.0) as u8; + // Non-finite (missing) cells map to byte 0 = `min` — which the web renders at + // the floor (transparent) — never a spurious mid-range value.
+ let norm = |x: f32| { + if !x.is_finite() { + return 0u8; + } + (((x - min) / span) * 255.0).round().clamp(0.0, 255.0) as u8 + }; let gray: Vec = field.values.iter().map(|&x| norm(x)).collect(); let mut out = Vec::new(); { @@ -143,10 +158,19 @@ fn idx_byte_range(idx: &str, var: &str, level: &str) -> Result<(u64, Option let f: Vec<&str> = line.split(':').collect(); if f.len() >= 5 && f[3] == var && f[4] == level { let start = f[1].parse().context("idx byte offset")?; - let end = lines - .get(i + 1) - .and_then(|l| l.split(':').nth(1)) - .and_then(|o| o.parse().ok()); + // The next record's offset bounds this one; its absence means this is + // the trailing field (read to EOF). A present-but-unparseable next + // offset is corruption, not EOF — surface it rather than over-reading. + let end = match lines.get(i + 1) { + None => None, + Some(next) => Some( + next.split(':') + .nth(1) + .context("malformed .idx: next line has no offset")? + .parse() + .context("idx next byte offset")?, + ), + }; return Ok((start, end)); } } @@ -154,6 +178,8 @@ fn idx_byte_range(idx: &str, var: &str, level: &str) -> Result<(u64, Option } /// Fetch one field: read the `.idx`, byte-range the message, decode the grid. +/// The planet-wide fan-out is dozens of GETs per refresh, so a single flaky +/// request retries with backoff rather than aborting the whole job. pub async fn fetch_field( http: &reqwest::Client, date: &str, @@ -161,6 +187,36 @@ pub async fn fetch_field( fhour: u16, var: &str, level: &str, +) -> Result { + const ATTEMPTS: u32 = 3; + let mut last: Option = None; + for attempt in 0..ATTEMPTS { + if attempt > 0 { + // 300ms, 600ms backoff. 
+ tokio::time::sleep(Duration::from_millis(300 * (1u64 << (attempt - 1)))).await; + } + match fetch_field_once(http, date, hour, fhour, var, level).await { + Ok(field) => return Ok(field), + Err(e) => { + warn!( + "fetch_field {var}:{level} f{fhour:03} attempt {} failed: {e:#}", + attempt + 1 + ); + last = Some(e); + } + } + } + Err(last.expect("loop runs at least once")) + .with_context(|| format!("fetch_field {var}:{level} f{fhour:03} after {ATTEMPTS} attempts")) +} + +async fn fetch_field_once( + http: &reqwest::Client, + date: &str, + hour: u8, + fhour: u16, + var: &str, + level: &str, ) -> Result { let base = field_url(date, hour, fhour); let idx = http @@ -172,7 +228,10 @@ pub async fn fetch_field( .await?; let (start, end) = idx_byte_range(&idx, var, level)?; let range = match end { - Some(e) => format!("bytes={start}-{}", e - 1), + Some(e) => format!( + "bytes={start}-{}", + e.checked_sub(1).context("idx end offset underflow")? + ), None => format!("bytes={start}-"), }; let bytes = http diff --git a/crates/weather-ingest/src/main.rs b/crates/weather-ingest/src/main.rs index 1525b8a..58541b2 100644 --- a/crates/weather-ingest/src/main.rs +++ b/crates/weather-ingest/src/main.rs @@ -42,9 +42,18 @@ struct Config { impl Config { fn from_env() -> Result { - Ok(Self { - nws_area: std::env::var("NWS_AREA").unwrap_or_default(), - }) + let nws_area = std::env::var("NWS_AREA").unwrap_or_default(); + // This goes straight into the alerts query string; constrain it to NWS + // area codes (comma-separated 2-letter, e.g. "TX,OK") so nothing else can + // be injected into the URL. + if !nws_area.is_empty() + && !nws_area + .split(',') + .all(|c| c.len() == 2 && c.bytes().all(|b| b.is_ascii_uppercase())) + { + bail!("NWS_AREA must be comma-separated 2-letter codes (e.g. 
TX,OK), got {nws_area:?}"); + } + Ok(Self { nws_area }) } } @@ -290,18 +299,9 @@ async fn fetch_temps(http: &reqwest::Client, sink: &Sink) -> Result<()> { res??; } } - let index = CityTileIndex { - snapshot_ms, - hours: hours.clone(), - min_zoom: CITYTILE_MIN_Z, - max_zoom: CITYTILE_MAX_Z, - }; - sink.publish("citytile/latest.json", &serde_json::to_value(index)?, 300) - .await?; - // Whole-planet lattice (zoomed out) as a single file — same snapshot + hours // as the tiles, so the timeline stays in lockstep. `i`/`j` let the web thin - // the grid at low zoom. + // the grid at low zoom. Written before the latest.json pointer below. let lattice_count = lattice.len(); let lattice_feats: Vec> = lattice .into_iter() @@ -317,9 +317,22 @@ async fn fetch_temps(http: &reqwest::Client, sink: &Sink) -> Result<()> { ) }) .collect(); - let lattice_body = serde_json::to_value(CityTile::new(snapshot_ms, hours, lattice_feats))?; + let lattice_body = + serde_json::to_value(CityTile::new(snapshot_ms, hours.clone(), lattice_feats))?; sink.publish("lattice.json", &lattice_body, 3600).await?; + // The latest.json pointer commits the snapshot, so write it LAST — after the + // immutable tiles and the lattice — so that a consumer that reads it never races + // a not-yet-written artifact. + let index = CityTileIndex { + snapshot_ms, + hours, + min_zoom: CITYTILE_MIN_Z, + max_zoom: CITYTILE_MAX_Z, + }; + sink.publish("citytile/latest.json", &serde_json::to_value(index)?, 300) + .await?; + info!( "temps: {} cities → {tile_count} tiles (z{CITYTILE_MIN_Z}-{CITYTILE_MAX_Z}) + {lattice_count} lattice points in {:.1?}", cities.len(),