Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 65 additions & 6 deletions crates/weather-ingest/src/gfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
//! (721×1440 = 1.04M points), so any number of cities sample for free.

use std::io::Cursor;
use std::time::Duration;

use anyhow::{bail, ensure, Context, Result};
use grib::{from_reader, Grib2SubmessageDecoder};
use tracing::warn;

const NODD: &str = "https://noaa-gfs-bdp-pds.s3.amazonaws.com";
// GFS pgrb2.0p25: 0.25° global grid, scanning +i (W→E) / −j (N→S) from (90N, 0E).
Expand Down Expand Up @@ -89,6 +91,12 @@ pub fn k_to_f(k: f32) -> f32 {
/// must be the native 0.25° grid (they are, straight from [`Field::decode`]).
pub fn encode_uv_png(u: &Field, v: &Field) -> Result<Vec<u8>> {
let norm = |x: f32| {
// A missing/NaN cell must not become a false gale: `NaN as u8` saturates
// to byte 0, which the web denormalizes to -WIND_MS_MAX (a ~40 m/s wind).
// Map non-finite to the mid-byte (≈ 0 m/s) instead.
if !x.is_finite() {
return 128u8;
}
(((x + WIND_MS_MAX) / (2.0 * WIND_MS_MAX)) * 255.0)
.round()
.clamp(0.0, 255.0) as u8
Expand Down Expand Up @@ -117,7 +125,14 @@ pub fn encode_uv_png(u: &Field, v: &Field) -> Result<Vec<u8>> {
/// Used for the REFC precip texture (dBZ over [`REFC_DBZ_MIN`, `REFC_DBZ_MAX`]).
pub fn encode_scalar_png(field: &Field, min: f32, max: f32) -> Result<Vec<u8>> {
let span = (max - min).max(f32::EPSILON);
let norm = |x: f32| (((x - min) / span) * 255.0).round().clamp(0.0, 255.0) as u8;
// Non-finite (missing) cells map to byte 0 = `min` — which the web renders at
// the floor (transparent) — never a spurious mid-range value.
let norm = |x: f32| {
if !x.is_finite() {
return 0u8;
}
(((x - min) / span) * 255.0).round().clamp(0.0, 255.0) as u8
};
let gray: Vec<u8> = field.values.iter().map(|&x| norm(x)).collect();
let mut out = Vec::new();
{
Expand All @@ -143,24 +158,65 @@ fn idx_byte_range(idx: &str, var: &str, level: &str) -> Result<(u64, Option<u64>
let f: Vec<&str> = line.split(':').collect();
if f.len() >= 5 && f[3] == var && f[4] == level {
let start = f[1].parse().context("idx byte offset")?;
let end = lines
.get(i + 1)
.and_then(|l| l.split(':').nth(1))
.and_then(|o| o.parse().ok());
// The next record's offset bounds this one; its absence means this is
// the trailing field (read to EOF). A present-but-unparseable next
// offset is corruption, not EOF — surface it rather than over-reading.
let end = match lines.get(i + 1) {
None => None,
Some(next) => Some(
next.split(':')
.nth(1)
.context("malformed .idx: next line has no offset")?
.parse()
.context("idx next byte offset")?,
),
};
return Ok((start, end));
}
}
bail!("{var}:{level} not in .idx")
}

/// Fetch one field: read the `.idx`, byte-range the message, decode the grid.
/// The planet-wide fan-out is dozens of GETs per refresh, so a single flaky
/// request retries with backoff rather than aborting the whole job.
pub async fn fetch_field(
http: &reqwest::Client,
date: &str,
hour: u8,
fhour: u16,
var: &str,
level: &str,
) -> Result<Field> {
const ATTEMPTS: u32 = 3;
let mut last: Option<anyhow::Error> = None;
for attempt in 0..ATTEMPTS {
if attempt > 0 {
// 300ms, 600ms backoff.
tokio::time::sleep(Duration::from_millis(300 * (1u64 << (attempt - 1)))).await;
}
match fetch_field_once(http, date, hour, fhour, var, level).await {
Ok(field) => return Ok(field),
Err(e) => {
warn!(
"fetch_field {var}:{level} f{fhour:03} attempt {} failed: {e:#}",
attempt + 1
);
last = Some(e);
}
}
}
Err(last.expect("loop runs at least once"))
.with_context(|| format!("fetch_field {var}:{level} f{fhour:03} after {ATTEMPTS} attempts"))
}

async fn fetch_field_once(
http: &reqwest::Client,
date: &str,
hour: u8,
fhour: u16,
var: &str,
level: &str,
) -> Result<Field> {
let base = field_url(date, hour, fhour);
let idx = http
Expand All @@ -172,7 +228,10 @@ pub async fn fetch_field(
.await?;
let (start, end) = idx_byte_range(&idx, var, level)?;
let range = match end {
Some(e) => format!("bytes={start}-{}", e - 1),
Some(e) => format!(
"bytes={start}-{}",
e.checked_sub(1).context("idx end offset underflow")?
),
None => format!("bytes={start}-"),
};
let bytes = http
Expand Down
41 changes: 27 additions & 14 deletions crates/weather-ingest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,18 @@ struct Config {

impl Config {
fn from_env() -> Result<Self> {
Ok(Self {
nws_area: std::env::var("NWS_AREA").unwrap_or_default(),
})
let nws_area = std::env::var("NWS_AREA").unwrap_or_default();
// This goes straight into the alerts query string; constrain it to NWS
// area codes (comma-separated 2-letter, e.g. "TX,OK") so nothing else can
// be injected into the URL.
if !nws_area.is_empty()
&& !nws_area
.split(',')
.all(|c| c.len() == 2 && c.bytes().all(|b| b.is_ascii_uppercase()))
{
bail!("NWS_AREA must be comma-separated 2-letter codes (e.g. TX,OK), got {nws_area:?}");
}
Ok(Self { nws_area })
}
}

Expand Down Expand Up @@ -290,18 +299,9 @@ async fn fetch_temps(http: &reqwest::Client, sink: &Sink) -> Result<()> {
res??;
}
}
let index = CityTileIndex {
snapshot_ms,
hours: hours.clone(),
min_zoom: CITYTILE_MIN_Z,
max_zoom: CITYTILE_MAX_Z,
};
sink.publish("citytile/latest.json", &serde_json::to_value(index)?, 300)
.await?;

// Whole-planet lattice (zoomed out) as a single file — same snapshot + hours
// as the tiles, so the timeline stays in lockstep. `i`/`j` let the web thin
// the grid at low zoom.
// the grid at low zoom. Written before the latest.json pointer below.
let lattice_count = lattice.len();
let lattice_feats: Vec<Feature<Point, LatticeForecast>> = lattice
.into_iter()
Expand All @@ -317,9 +317,22 @@ async fn fetch_temps(http: &reqwest::Client, sink: &Sink) -> Result<()> {
)
})
.collect();
let lattice_body = serde_json::to_value(CityTile::new(snapshot_ms, hours, lattice_feats))?;
let lattice_body =
serde_json::to_value(CityTile::new(snapshot_ms, hours.clone(), lattice_feats))?;
sink.publish("lattice.json", &lattice_body, 3600).await?;

// The latest.json pointer commits the snapshot, so write it LAST — after the
// immutable tiles and the lattice — and a consumer that reads it never races
// a not-yet-written artifact.
let index = CityTileIndex {
snapshot_ms,
hours,
min_zoom: CITYTILE_MIN_Z,
max_zoom: CITYTILE_MAX_Z,
};
sink.publish("citytile/latest.json", &serde_json::to_value(index)?, 300)
.await?;

info!(
"temps: {} cities → {tile_count} tiles (z{CITYTILE_MIN_Z}-{CITYTILE_MAX_Z}) + {lattice_count} lattice points in {:.1?}",
cities.len(),
Expand Down