diff --git a/README.md b/README.md
index 9015bb8b..53e8c1e8 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
Sync OCI container images across registries - efficiently.
-
+
[](https://github.com/clowdhaus/ocync/actions/workflows/ci.yml)
diff --git a/crates/ocync-distribution/src/auth/docker.rs b/crates/ocync-distribution/src/auth/docker.rs
index 46e401f2..ace7aca2 100644
--- a/crates/ocync-distribution/src/auth/docker.rs
+++ b/crates/ocync-distribution/src/auth/docker.rs
@@ -238,7 +238,7 @@ async fn run_credential_helper(helper: &str, registry: &str) -> Result Result Result Result) -> TargetCh
while let Some((target_name, target_client, batch_checker, result)) = head_checks.next().await {
match result {
Ok(Some(head)) if head.digest == *compare_digest => {
- info!(
+ debug!(
source_repo = %source.repo,
source_tag = %source.tag,
target_repo = %target.repo,
@@ -1574,7 +1574,7 @@ async fn full_pull_and_build_tasks(params: FullPullParams<'_>) -> DiscoveryOutco
while let Some((target_name, target_client, batch_checker, result)) = head_checks.next().await {
match result {
Ok(Some(head)) if head.digest == *source_digest => {
- info!(
+ debug!(
source_repo = %source.repo,
source_tag = %source.tag,
target_repo = %target.repo,
@@ -1763,7 +1763,7 @@ async fn execute_item(
.notify_repo_failed(&item.target_name, &item.target.repo);
if is_immutable_tag_error(&err) {
- info!(
+ debug!(
source_repo = %item.source.repo,
target_repo = %item.target.repo,
target_tag = %item.target.tag,
@@ -2868,7 +2868,7 @@ async fn discover_referrers(
}
},
Err(e) if e.is_not_found() => {
- info!(
+ debug!(
repo = %source_repo,
digest = %parent_digest,
"no referrers found (API 404, tag fallback 404)"
@@ -2877,7 +2877,7 @@ async fn discover_referrers(
}
Err(e) => {
// Non-404 error on fallback is not fatal; log and continue.
- info!(
+ debug!(
repo = %source_repo,
digest = %parent_digest,
error = %e,
@@ -2891,7 +2891,7 @@ async fn discover_referrers(
Err(e) => {
// Non-404 error from referrers API. Log and continue rather
// than failing the entire image sync for an artifact query.
- info!(
+ debug!(
repo = %source_repo,
digest = %parent_digest,
error = %e,
diff --git a/crates/ocync-sync/src/filter.rs b/crates/ocync-sync/src/filter.rs
index 1acaa97c..ed1a63ff 100644
--- a/crates/ocync-sync/src/filter.rs
+++ b/crates/ocync-sync/src/filter.rs
@@ -6,7 +6,7 @@ use std::sync::OnceLock;
use globset::{Glob, GlobBuilder, GlobSet, GlobSetBuilder};
use serde::{Deserialize, Serialize};
-use tracing::warn;
+use tracing::debug;
use crate::Error;
@@ -111,6 +111,48 @@ impl FilterConfig {
self.run_pipeline(tags, true)
}
+ /// One-line summary of the active filter clauses, e.g.
+ /// `semver >=1.0.0, latest=5`. Returns `None` when no filtering applies.
+ ///
+ /// Sole formatter for filter rationale shown in non-dry-run logs; uses
+ /// the same `FilterConfig` fields the pipeline operates on so adding a
+ /// new field to the config will fail tests here before it ships.
+ pub fn describe(&self) -> Option {
+ let mut parts = Vec::new();
+ if !self.glob.is_empty() {
+ parts.push(format!("glob {}", self.glob.join(",")));
+ }
+ if let Some(ref s) = self.semver {
+ parts.push(format!("semver {s}"));
+ }
+ if !self.exclude.is_empty() {
+ parts.push(format!("exclude {}", self.exclude.join(",")));
+ }
+ if !self.include.is_empty() {
+ parts.push(format!("include {}", self.include.join(",")));
+ }
+ if let Some(order) = self.sort {
+ parts.push(format!(
+ "sort {}",
+ match order {
+ SortOrder::Semver => "semver",
+ SortOrder::Alpha => "alpha",
+ }
+ ));
+ }
+ if let Some(n) = self.latest {
+ parts.push(format!("latest={n}"));
+ }
+ if let Some(n) = self.min_tags {
+ parts.push(format!("min_tags={n}"));
+ }
+ if parts.is_empty() {
+ None
+ } else {
+ Some(parts.join(", "))
+ }
+ }
+
/// Shared pipeline implementation. When `track` is false (the real-sync
/// hot path), per-stage `StageDelta` and per-reason `DropReason` are not
/// constructed; the resulting `Filtered.report` carries empty vectors.
@@ -171,18 +213,18 @@ impl FilterConfig {
range: range.to_owned(),
reason: e.to_string(),
})?;
- let (kept, dropped) =
- partition_with_drop(
- &pipeline,
- track,
- |t| match crate::version::TagVersion::parse(t) {
- Some(ver) => req.matches(&ver),
- None => {
- warn!(tag = t, "tag is not parseable as a version, dropping");
- false
- }
- },
- );
+ let (kept, dropped) = partition_with_drop(&pipeline, track, |t| {
+ if is_referrers_fallback_tag(t) {
+ return false;
+ }
+ match crate::version::TagVersion::parse(t) {
+ Some(ver) => req.matches(&ver),
+ None => {
+ debug!(tag = t, "tag is not parseable as a version, dropping");
+ false
+ }
+ }
+ });
push_drop_reason(
&mut drop_reasons,
track,
@@ -248,7 +290,19 @@ impl FilterConfig {
}
if let Some(order) = self.sort {
+ let before = pipeline.len();
sort_tags_in_place(&mut pipeline, order);
+ if track {
+ let label = match order {
+ SortOrder::Semver => "sort semver desc",
+ SortOrder::Alpha => "sort alpha desc",
+ };
+ pipeline_stages.push(StageDelta {
+ label: label.to_string(),
+ count_in: before,
+ count_out: pipeline.len(),
+ });
+ }
}
if let Some(n) = self.latest {
let before = pipeline.len();
@@ -265,13 +319,8 @@ impl FilterConfig {
pipeline.truncate(n);
}
if track {
- let label = match self.sort {
- Some(SortOrder::Semver) => format!("sort semver desc, latest {n}"),
- Some(SortOrder::Alpha) => format!("sort alpha desc, latest {n}"),
- None => format!("latest {n}"),
- };
pipeline_stages.push(StageDelta {
- label,
+ label: format!("keep latest {n}"),
count_in: before,
count_out: pipeline.len(),
});
@@ -477,6 +526,18 @@ fn push_drop_reason(
// Individual stages
// ---------------------------------------------------------------------------
+/// True for OCI 1.1 referrers fallback tags (`-` and the cosign
+/// `.sig`/`.sbom`/`.att` variants). These are pointers to artifacts, not image
+/// versions, and will never satisfy a semver range -- skip parsing so they do
+/// not appear in the unparseable-tag log channel.
+///
+/// Public so that observability/UX code outside the filter pipeline (e.g. the
+/// CLI's "no tags matched" warn) can partition source tag lists without
+/// reintroducing a duplicate prefix check that would drift over time.
+pub fn is_referrers_fallback_tag(tag: &str) -> bool {
+ tag.starts_with("sha256-") || tag.starts_with("sha512-")
+}
+
/// Build a [`GlobSet`] from patterns, returning an error on invalid patterns.
pub fn build_glob_set(patterns: &[String]) -> Result {
let mut builder = GlobSetBuilder::new();
@@ -520,8 +581,11 @@ fn filter_semver<'a>(tags: &[&'a str], range: &str) -> Result, Erro
.iter()
.copied()
.filter(|tag| {
+ if is_referrers_fallback_tag(tag) {
+ return false;
+ }
let Some(ver) = crate::version::TagVersion::parse(tag) else {
- warn!(tag, "tag is not parseable as a version, dropping");
+ debug!(tag, "tag is not parseable as a version, dropping");
return false;
};
req.matches(&ver)
@@ -728,6 +792,24 @@ mod tests {
assert_eq!(result, vec!["1.0.0"]);
}
+ /// Referrers fallback tags (cosign signatures, SBOMs, attestations) bypass
+ /// the version parser. They drop silently so noisy unparseable-tag logs do
+ /// not fire once per artifact tag per image.
+ #[test]
+ fn semver_skips_referrers_fallback_tags() {
+ let tags = vec![
+ "1.0.0",
+ "sha256-abc123def456.sig",
+ "sha256-abc123def456.sbom",
+ "sha256-abc123def456.att",
+ "sha256-abc123def456",
+ "sha512-deadbeef.sig",
+ "2.0.0",
+ ];
+ let result = filter_semver(&tags, ">=1.0.0").unwrap();
+ assert_eq!(result, vec!["1.0.0", "2.0.0"]);
+ }
+
// - pipeline tests ----------------------------------------------------
#[test]
@@ -1378,4 +1460,68 @@ mod tests {
assert_eq!(reason.count, reason.samples.len());
}
}
+
+ // - describe ----------------------------------------------------------
+
+ #[test]
+ fn describe_default_returns_none() {
+ assert!(FilterConfig::default().describe().is_none());
+ }
+
+ #[test]
+ fn describe_combines_clauses_in_pipeline_order() {
+ let config = FilterConfig {
+ glob: vec!["1.*".into()],
+ semver: Some(">=1.0.0".into()),
+ exclude: vec!["*-rc*".into()],
+ sort: Some(SortOrder::Semver),
+ latest: Some(5),
+ min_tags: Some(1),
+ ..FilterConfig::default()
+ };
+ assert_eq!(
+ config.describe().as_deref(),
+ Some("glob 1.*, semver >=1.0.0, exclude *-rc*, sort semver, latest=5, min_tags=1")
+ );
+ }
+
+ /// Regression guard: every `FilterConfig` field that influences selection
+ /// must contribute to `describe()`. Adding a new field without updating
+ /// `describe()` would silently leave INFO-line filter rationale stale.
+ #[test]
+ fn describe_covers_every_selection_field() {
+ let cfg = FilterConfig {
+ include: vec!["latest".into()],
+ glob: vec!["v*".into()],
+ semver: Some("^1".into()),
+ exclude: vec!["nightly".into()],
+ sort: Some(SortOrder::Alpha),
+ latest: Some(3),
+ min_tags: Some(2),
+ };
+ let desc = cfg.describe().expect("non-empty config describes");
+ for needle in [
+ "include latest",
+ "glob v*",
+ "semver ^1",
+ "exclude nightly",
+ "sort alpha",
+ "latest=3",
+ "min_tags=2",
+ ] {
+ assert!(desc.contains(needle), "missing {needle:?} in {desc:?}");
+ }
+ }
+
+ // - is_referrers_fallback_tag ----------------------------------------
+
+ #[test]
+ fn referrers_fallback_detection() {
+ assert!(is_referrers_fallback_tag("sha256-abcdef"));
+ assert!(is_referrers_fallback_tag("sha256-deadbeef.sig"));
+ assert!(is_referrers_fallback_tag("sha512-cafef00d.sbom"));
+ assert!(!is_referrers_fallback_tag("v1.0.0"));
+ assert!(!is_referrers_fallback_tag("latest"));
+ assert!(!is_referrers_fallback_tag("sha256")); // no dash
+ }
}
diff --git a/crates/ocync-sync/src/lib.rs b/crates/ocync-sync/src/lib.rs
index 3cf88e26..579cf6be 100644
--- a/crates/ocync-sync/src/lib.rs
+++ b/crates/ocync-sync/src/lib.rs
@@ -184,7 +184,7 @@ impl std::fmt::Display for ErrorKind {
}
/// Aggregate statistics for a sync run.
-#[derive(Debug, Default, Serialize)]
+#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)]
pub struct SyncStats {
/// Number of images successfully synced.
pub images_synced: u64,
diff --git a/docs/public/ecr-banner.png b/docs/public/ecr-banner.png
new file mode 100644
index 00000000..bf47edf9
Binary files /dev/null and b/docs/public/ecr-banner.png differ
diff --git a/docs/src/content/cli-reference.md b/docs/src/content/cli-reference.md
index c96e076e..f54dd58f 100644
--- a/docs/src/content/cli-reference.md
+++ b/docs/src/content/cli-reference.md
@@ -48,11 +48,11 @@ ocync sync -c config.yaml --json
`--dry-run` runs the full filter pipeline against each mapping's source tags and prints, per mapping:
-- **`source candidates: N`** -- the number of tags fetched from the source.
+- **`source tags: N`** -- the number of tags fetched from the source.
- **`include path:`** -- tags rescued via `include:` (bypasses `glob:`/`semver:` and the system-exclude defaults). Default cap is 5 names; `-v` removes the cap.
-- **`pipeline:`** -- per-stage attrition (`glob`, `semver`, `exclude`, `latest`). Each row shows count_in -> count_out and the drop count.
+- **`filter:`** -- per-stage attrition (`glob`, `semver`, `exclude`, `sort`, `keep latest`). Each row shows count_in -> count_out and the drop count.
- **`kept (N):`** -- the final tags. When `include:` is used, rescued tags are listed first and tagged `[via include]` so the rescue path is visible.
-- **`dropped N:`** -- Pareto-sorted drop attribution (largest cause first), with sample tag names per reason. Default cap is 5 names per reason; `-v` removes the cap.
+- **`dropped (N):`** -- Pareto-sorted drop attribution (largest cause first), with sample tag names per reason. Default cap is 5 names per reason; `-v` removes the cap.
- **`min_tags: N`** -- when `min_tags:` is configured, the line prints `kept M, satisfied` or `kept M, real sync will FAIL with BelowMinTags`. Real-sync (no `--dry-run`) errors out below `min_tags`; dry-run shows the report and surfaces the gap so the configuration can be fixed before running.
## copy
diff --git a/src/cli/commands/analyze.rs b/src/cli/commands/analyze.rs
index d5fdd09f..c0974b9a 100644
--- a/src/cli/commands/analyze.rs
+++ b/src/cli/commands/analyze.rs
@@ -18,7 +18,7 @@ use ocync_distribution::{Digest, RepositoryName};
use ocync_sync::ShutdownSignal;
-use crate::cli::commands::synchronize::{build_clients, resolve_mapping};
+use crate::cli::commands::synchronize::{MappingResolution, build_clients, resolve_mapping};
use crate::cli::config::load_config;
use crate::cli::output::format_bytes;
use crate::cli::{CliError, ExitCode};
@@ -68,8 +68,8 @@ pub(crate) async fn run(
let resolved =
match resolve_mapping(mapping, &config, &clients, &no_checkers, false).await? {
- Some(r) => r,
- None => continue,
+ MappingResolution::Resolved(r) => r,
+ MappingResolution::NoMatchingTags(_) => continue,
};
for tag_pair in &resolved.tags {
diff --git a/src/cli/commands/dry_run.rs b/src/cli/commands/dry_run.rs
index d807cb82..69bd83fa 100644
--- a/src/cli/commands/dry_run.rs
+++ b/src/cli/commands/dry_run.rs
@@ -49,7 +49,7 @@ fn write_mapping(w: &mut W, m: &ResolvedMapping, verbose: bool) -> io:
let target_names: Vec<&str> = m.targets.iter().map(|t| &*t.name).collect();
writeln!(
w,
- "dry-run: {} -> {} => [{}]",
+ "dry-run: {} -> {} [{}]",
m.source_repo,
m.target_repo,
target_names.join(", ")
@@ -61,7 +61,7 @@ fn write_mapping(w: &mut W, m: &ResolvedMapping, verbose: bool) -> io:
return write_simple_tag_list(w, &m.tags);
};
- writeln!(w, " source candidates: {}", report.candidate_count)?;
+ writeln!(w, " source tags: {}", report.candidate_count)?;
writeln!(w)?;
if !report.include_kept.is_empty() {
@@ -107,7 +107,7 @@ fn write_include_path(w: &mut W, report: &FilterReport, verbose: bool)
}
fn write_pipeline(w: &mut W, report: &FilterReport) -> io::Result<()> {
- writeln!(w, " pipeline:")?;
+ writeln!(w, " filter:")?;
for stage in &report.pipeline {
let delta = stage.count_in as isize - stage.count_out as isize;
let delta_str = if delta != 0 {
@@ -115,11 +115,11 @@ fn write_pipeline(w: &mut W, report: &FilterReport) -> io::Result<()>
} else {
String::new()
};
- writeln!(
- w,
+ let line = format!(
" {:<28} {:>4} -> {:<4}{}",
stage.label, stage.count_in, stage.count_out, delta_str
- )?;
+ );
+ writeln!(w, "{}", line.trim_end())?;
}
Ok(())
}
@@ -150,7 +150,7 @@ fn write_dropped(w: &mut W, report: &FilterReport, verbose: bool) -> i
if total == 0 {
return Ok(false);
}
- writeln!(w, " dropped {total}:")?;
+ writeln!(w, " dropped ({total}):")?;
for reason in &report.dropped {
let samples_display = render_samples(&reason.samples, verbose);
// `LatestCap` reads as a complete clause ("over latest=N limit"); every
@@ -167,8 +167,7 @@ fn write_dropped(w: &mut W, report: &FilterReport, verbose: bool) -> i
if matches!(reason.kind, DropKind::SystemExclude) {
writeln!(
w,
- " {:<28}to keep prereleases, list patterns under include: (globs supported)",
- ""
+ " hint: to keep prereleases, list patterns under include: (globs supported)"
)?;
}
}
@@ -394,8 +393,8 @@ mod tests {
assert!(out.contains("tags (2):"), "{out}");
assert!(out.contains(" v1.0.0\n"), "{out}");
assert!(out.contains(" v1.1.0\n"), "{out}");
- // No pipeline/kept/dropped sections appear.
- assert!(!out.contains("pipeline:"), "{out}");
+ // No filter/kept/dropped sections appear.
+ assert!(!out.contains("filter:"), "{out}");
assert!(!out.contains("kept ("), "{out}");
assert!(!out.contains("dropped"), "{out}");
}
diff --git a/src/cli/commands/synchronize.rs b/src/cli/commands/synchronize.rs
index 498c05b3..210c87b7 100644
--- a/src/cli/commands/synchronize.rs
+++ b/src/cli/commands/synchronize.rs
@@ -16,7 +16,7 @@ use ocync_sync::engine::{
DEFAULT_MAX_CONCURRENT_TRANSFERS, RegistryAlias, ResolvedArtifacts, ResolvedMapping,
SyncEngine, TagPair, TargetEntry,
};
-use ocync_sync::filter::{FilterConfig, build_glob_set};
+use ocync_sync::filter::{FilterConfig, build_glob_set, is_referrers_fallback_tag};
use ocync_sync::retry::RetryConfig;
use ocync_sync::shutdown::ShutdownSignal;
use ocync_sync::staging::BlobStage;
@@ -25,6 +25,7 @@ use crate::SyncArgs;
use crate::cli::config::{
AuthType, Config, GlobOrList, MappingConfig, TagsConfig, load_config, resolve_target_names,
};
+use crate::cli::output::{format_bytes, format_duration};
use crate::cli::{CliError, ExitCode, bare_hostname, build_registry_client};
/// Default cache TTL: 12 hours.
@@ -33,6 +34,174 @@ pub(crate) const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(12 * 3600);
/// Default cache file name within the cache directory.
const CACHE_FILE_NAME: &str = "transfer_state.bin";
+/// Sample cap for the source-tag list shown in the no-tags-matched WARN.
+/// Mirrors `dry_run::SAMPLE_CAP` so both surfaces show the same depth of
+/// example data without overwhelming the log line.
+const NO_TAGS_SAMPLE_CAP: usize = 5;
+
+/// Outcome of resolving a single mapping. Either the mapping is ready for the
+/// engine, or no source tag survived filtering and the caller decides whether
+/// to log a WARN (sync mode: always; watch mode: only on transition).
+///
+/// The size disparity between variants is intentional: `ResolvedMapping` flows
+/// directly into `Vec` for the engine, so boxing it would
+/// just add a heap round-trip per success. The error variant is rare; we pay
+/// the disparity instead of the allocation traffic.
+#[allow(clippy::large_enum_variant)]
+pub(crate) enum MappingResolution {
+ Resolved(ResolvedMapping),
+ NoMatchingTags(NoTagsInfo),
+}
+
+/// Diagnostic context for a mapping whose filter rejected every source tag.
+///
+/// Fields together let an operator see, in one log line, the size and
+/// composition of the source repo (image tags vs OCI 1.1 referrer fallbacks),
+/// the active filter clauses, and example image tag names so the cause is
+/// obvious without spelunking.
+pub(crate) struct NoTagsInfo {
+ pub from: String,
+ pub image_count: usize,
+ pub artifact_count: usize,
+ /// Active filter clauses (e.g. `semver >=1.0.0, latest=5`). `None` only
+ /// when no filter is configured -- distinct from "filter description
+ /// missing" so the formatter can render an explicit fallback string.
+ pub filter_desc: Option,
+ /// Up to [`NO_TAGS_SAMPLE_CAP`] image-tag names. Excludes referrer
+ /// fallback tags so the example list is meaningful on cosign-heavy
+ /// repos like `cgr.dev/chainguard/*` (otherwise dominated by
+ /// `sha256-(.sig|.sbom|.att)` entries).
+ pub samples: Vec,
+}
+
+impl NoTagsInfo {
+ /// Total tags returned by `/v2//tags/list`. Derived: image + artifact.
+ fn source_total(&self) -> usize {
+ self.image_count + self.artifact_count
+ }
+
+ /// True when the source had more image tags than `samples` shows.
+ fn samples_truncated(&self) -> bool {
+ self.image_count > self.samples.len()
+ }
+}
+
+impl std::fmt::Display for NoTagsInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let total = self.source_total();
+ let total_phrase = if self.artifact_count > 0 {
+ format!(
+ "{total} source tags ({} image tags, {} referrer artifacts)",
+ self.image_count, self.artifact_count
+ )
+ } else {
+ format!("{total} source tags")
+ };
+ let filter = self
+ .filter_desc
+ .as_deref()
+ .unwrap_or("no filter configured");
+ let samples = if self.samples.is_empty() {
+ "".to_string()
+ } else if self.samples_truncated() {
+ format!("[{}, ...]", self.samples.join(", "))
+ } else {
+ format!("[{}]", self.samples.join(", "))
+ };
+ write!(
+ f,
+ "{}: 0 of {total_phrase} matched filter ({filter}); skipping. Source: {samples}",
+ self.from
+ )
+ }
+}
+
+/// Per-process state that lets watch-mode log on transitions instead of
+/// every cycle. Sync mode passes `None`. State lives in `watch::run` so it
+/// spans loop iterations.
+///
+/// Tracks three pieces of cross-cycle context:
+///
+/// 1. No-tags failure set: mappings whose filter rejected every source tag.
+/// 2. Per-mapping outcomes: the prior cycle's [`MappingOutcome`] keyed by
+/// `mapping.from`, used to detect both repeated and recovery transitions.
+/// 3. Per-cycle emit counter: bumped by every `observe_*` method that
+/// reports a transition; the watch loop reads it to gate the idle
+/// heartbeat.
+#[derive(Debug, Default)]
+pub(crate) struct WatchLogState {
+ warned_no_tags: HashSet,
+ last_outcomes: HashMap,
+ cycle_emit_count: u32,
+}
+
+impl WatchLogState {
+ pub(crate) fn begin_cycle(&mut self) {
+ self.cycle_emit_count = 0;
+ }
+
+ pub(crate) fn cycle_emit_count(&self) -> u32 {
+ self.cycle_emit_count
+ }
+
+ /// Record a no-match observation. Returns `true` on transition into the
+ /// failure state (caller emits a WARN); `false` when already failing.
+ fn observe_no_match(&mut self, from: &str) -> bool {
+ let changed = self.warned_no_tags.insert(from.to_string());
+ if changed {
+ self.cycle_emit_count = self.cycle_emit_count.saturating_add(1);
+ }
+ changed
+ }
+
+ /// Record a successful resolution. Returns `true` when the mapping was
+ /// previously in the failure set (caller emits a recovery INFO).
+ fn observe_resolved(&mut self, from: &str) -> bool {
+ let changed = self.warned_no_tags.remove(from);
+ if changed {
+ self.cycle_emit_count = self.cycle_emit_count.saturating_add(1);
+ }
+ changed
+ }
+
+ /// Record `outcome` as the latest result for `from`.
+ ///
+ /// Returns:
+ /// - `None` when the outcome is identical to the prior cycle (suppress).
+ /// - `Some(false)` on a non-recovery transition (emit normally).
+ /// - `Some(true)` when transitioning from `failed > 0` to `failed == 0`
+ /// (emit with `[recovered]` marker).
+ fn observe_mapping_outcome(&mut self, from: &str, outcome: &MappingOutcome) -> Option {
+ use std::collections::hash_map::Entry;
+ match self.last_outcomes.entry(from.to_string()) {
+ Entry::Occupied(mut slot) => {
+ let prev = *slot.get();
+ if &prev == outcome {
+ return None;
+ }
+ slot.insert(*outcome);
+ self.cycle_emit_count = self.cycle_emit_count.saturating_add(1);
+ Some(prev.failed > 0 && outcome.failed == 0)
+ }
+ Entry::Vacant(slot) => {
+ slot.insert(*outcome);
+ self.cycle_emit_count = self.cycle_emit_count.saturating_add(1);
+ Some(false)
+ }
+ }
+ }
+
+ /// Drop entries for mappings no longer in the active set so the state
+ /// does not grow unbounded across edits to the config.
+ fn retain_active<'a>(&mut self, active: impl IntoIterator- ) {
+ let active_set: HashSet<&str> = active.into_iter().collect();
+ self.warned_no_tags
+ .retain(|k| active_set.contains(k.as_str()));
+ self.last_outcomes
+ .retain(|k, _| active_set.contains(k.as_str()));
+ }
+}
+
/// Resolve the cache directory and file path from config.
///
/// Uses `global.cache_dir` if configured, otherwise places the cache
@@ -78,6 +247,7 @@ pub(crate) async fn run(
shutdown: Option<&ShutdownSignal>,
external_cache: Option>>,
verbose: bool,
+ mut watch_log: Option<&mut WatchLogState>,
) -> Result {
let config = load_config(&args.config)?;
@@ -86,14 +256,32 @@ pub(crate) async fn run(
let mut mappings = Vec::new();
for mapping in &config.mappings {
- match resolve_mapping(mapping, &config, &clients, &batch_checkers, args.dry_run).await {
- Ok(Some(resolved)) => mappings.push(resolved),
- Ok(None) => {} // no tags after filtering, logged inside
- Err(err) => return Err(err),
+ match resolve_mapping(mapping, &config, &clients, &batch_checkers, args.dry_run).await? {
+ MappingResolution::Resolved(resolved) => mappings.push(resolved),
+ MappingResolution::NoMatchingTags(info) => {
+ let should_warn = match watch_log.as_mut() {
+ Some(state) => state.observe_no_match(&info.from),
+ None => true,
+ };
+ if should_warn {
+ emit_no_tags_warn(&info);
+ }
+ }
}
}
- log_resolved_mappings(&mappings);
+ if let Some(state) = watch_log.as_mut() {
+ for resolved in &mappings {
+ let from = resolved.source_repo.as_str();
+ if state.observe_resolved(from) {
+ tracing::info!(
+ from = %from,
+ "{from}: filter now matches at least one tag; resuming sync"
+ );
+ }
+ }
+ state.retain_active(config.mappings.iter().map(|m| m.from.as_str()));
+ }
if args.dry_run {
crate::cli::commands::dry_run::print(&mappings, verbose);
@@ -157,6 +345,18 @@ pub(crate) async fn run(
.map_or(DEFAULT_MAX_CONCURRENT_TRANSFERS, |g| {
g.max_concurrent_transfers
});
+ // Capture per-mapping metadata before the engine consumes `mappings`.
+ // Used to emit one INFO line per mapping after the engine returns,
+ // grouped from the report's per-image outcomes.
+ let descriptors: Vec = mappings
+ .iter()
+ .map(|m| MappingDescriptor {
+ from: m.source_repo.as_str().to_string(),
+ target_repo: m.target_repo.as_str().to_string(),
+ target_names: m.targets.iter().map(|t| (*t.name).to_string()).collect(),
+ })
+ .collect();
+
let engine = SyncEngine::new(RetryConfig::default(), max_concurrent);
let report = engine
.run(mappings, cache.clone(), staging, progress, shutdown)
@@ -169,11 +369,180 @@ pub(crate) async fn run(
}
}
+ emit_mapping_outcomes(&descriptors, &report, watch_log.as_deref_mut());
+ // Watch mode: suppress the cycle tail when no per-mapping line emitted
+ // (steady-state idle); sync mode: always emit as the final marker.
+ let cycle_had_activity = watch_log
+ .as_deref()
+ .is_none_or(|s| s.cycle_emit_count() > 0);
+ if cycle_had_activity {
+ emit_cycle_tail(&descriptors, &report);
+ }
+
write_output(&report, args.json)?;
Ok(ExitCode::from_report(report.exit_code()))
}
+/// Per-mapping metadata captured before the engine consumes `mappings`,
+/// so we can join it with the engine's per-image report after the fact
+/// to emit one log line per mapping (with source/target context).
+struct MappingDescriptor {
+ from: String,
+ target_repo: String,
+ target_names: Vec,
+}
+
+/// Per-mapping aggregated outcome derived from [`SyncReport.images`].
+/// Used for log emission and watch-mode change detection.
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct MappingOutcome {
+ pub synced: u64,
+ pub skipped: u64,
+ pub failed: u64,
+ pub bytes: u64,
+}
+
+impl MappingOutcome {
+ fn is_empty(&self) -> bool {
+ self.synced == 0 && self.skipped == 0 && self.failed == 0
+ }
+}
+
+/// Emit one INFO (or WARN, on failures) per mapping summarizing what its
+/// configured tags did this cycle. In watch mode (when `watch_log` is
+/// `Some`), suppress mappings whose outcome is unchanged from the prior
+/// cycle so steady-state pods log only on transition.
+fn emit_mapping_outcomes(
+ descriptors: &[MappingDescriptor],
+ report: &SyncReport,
+ mut watch_log: Option<&mut WatchLogState>,
+) {
+ for d in descriptors {
+ let outcome = aggregate_mapping_outcome(&d.from, &d.target_repo, report);
+ // No images for this mapping in the report (e.g. the mapping was
+ // resolved to zero tags by an upstream filter that the engine
+ // never saw). The no-tags WARN already covered it; skip here.
+ if outcome.is_empty() {
+ continue;
+ }
+ let recovered = match watch_log.as_deref_mut() {
+ Some(state) => match state.observe_mapping_outcome(&d.from, &outcome) {
+ Some(r) => r,
+ None => continue,
+ },
+ None => false,
+ };
+ let line = format_mapping_outcome(d, &outcome, recovered);
+ // `from` / `to` are intentionally NOT structured fields here -- the
+ // message already names them as `from -> to`, and tracing's text
+ // formatter would otherwise tail the line with a redundant
+ // `from=... to=...` block. The count fields remain because they
+ // carry zero values the terse message elides.
+ //
+ // The two arms differ only in level. `tracing::event!` would let
+ // us pick at runtime, but it requires a const-expression level.
+ if outcome.failed > 0 {
+ tracing::warn!(
+ synced = outcome.synced,
+ skipped = outcome.skipped,
+ failed = outcome.failed,
+ bytes = outcome.bytes,
+ recovered,
+ "{line}"
+ );
+ } else {
+ tracing::info!(
+ synced = outcome.synced,
+ skipped = outcome.skipped,
+ failed = outcome.failed,
+ bytes = outcome.bytes,
+ recovered,
+ "{line}"
+ );
+ }
+ }
+}
+
+fn aggregate_mapping_outcome(
+ source_repo: &str,
+ target_repo: &str,
+ report: &SyncReport,
+) -> MappingOutcome {
+ let src_prefix = format!("{source_repo}:");
+ let tgt_prefix = format!("{target_repo}:");
+ let mut o = MappingOutcome::default();
+ for r in &report.images {
+ if !(r.source.starts_with(&src_prefix) && r.target.starts_with(&tgt_prefix)) {
+ continue;
+ }
+ match r.status {
+ ocync_sync::ImageStatus::Synced => {
+ o.synced += 1;
+ o.bytes += r.bytes_transferred;
+ }
+ ocync_sync::ImageStatus::Skipped { .. } => o.skipped += 1,
+ ocync_sync::ImageStatus::Failed { .. } => o.failed += 1,
+ }
+ }
+ o
+}
+
+fn format_mapping_outcome(d: &MappingDescriptor, o: &MappingOutcome, recovered: bool) -> String {
+ let mut parts = Vec::with_capacity(3);
+ if o.synced > 0 {
+ parts.push(format!("synced {}", o.synced));
+ }
+ if o.skipped > 0 {
+ parts.push(format!("skipped {}", o.skipped));
+ }
+ if o.failed > 0 {
+ parts.push(format!("failed {}", o.failed));
+ }
+ let counts = parts.join(", ");
+ let bytes_clause = if o.bytes > 0 {
+ format!(" ({})", format_bytes(o.bytes))
+ } else {
+ String::new()
+ };
+ let recovered_clause = if recovered { " [recovered]" } else { "" };
+ // Multi-target mappings need the bracket to disambiguate which targets
+ // the line refers to. Single-target mappings: omit -- the destination
+ // is already in the `from -> to` arrow.
+ let targets_clause = if d.target_names.len() > 1 {
+ format!(" [{}]", d.target_names.join(", "))
+ } else {
+ String::new()
+ };
+ format!(
+ "{} -> {}{targets_clause}: {counts}{bytes_clause}{recovered_clause}",
+ d.from, d.target_repo
+ )
+}
+
+/// One-line cycle tail rolling up totals across all mappings. The caller
+/// is responsible for gating this in watch mode (skip on idle cycles).
+fn emit_cycle_tail(descriptors: &[MappingDescriptor], report: &SyncReport) {
+ let s = &report.stats;
+ let line = format!(
+ "summary: {} mappings | {} synced, {} skipped, {} failed | {} in {}",
+ descriptors.len(),
+ s.images_synced,
+ s.images_skipped,
+ s.images_failed,
+ format_bytes(s.bytes_transferred),
+ format_duration(report.duration),
+ );
+ // Counts are already in the message; structured fields would be a
+ // verbatim restatement in text output. JSON aggregators parse the
+ // message (or use the SyncReport via `--json`).
+ if s.images_failed > 0 {
+ tracing::warn!("{line}");
+ } else {
+ tracing::info!("{line}");
+ }
+}
+
/// Parse a human-readable duration string into a [`Duration`].
///
/// Accepts:
@@ -273,18 +642,19 @@ async fn build_batch_checkers(
Ok(checkers)
}
-/// Resolve a single mapping config into a `ResolvedMapping`, or `None` if no
-/// tags survive filtering.
+/// Resolve a single mapping config into a [`MappingResolution`].
///
-/// Falls back to `defaults.source`, `defaults.targets`, and `defaults.tags`
-/// when the mapping does not specify its own values.
+/// Returns [`MappingResolution::Resolved`] when at least one tag survives the
+/// filter pipeline, or [`MappingResolution::NoMatchingTags`] carrying the
+/// diagnostic context the caller needs to render a WARN. Pulls fallbacks from
+/// `defaults.source`, `defaults.targets`, and `defaults.tags`.
pub(crate) async fn resolve_mapping(
mapping: &MappingConfig,
config: &Config,
clients: &HashMap>,
batch_checkers: &HashMap>,
with_report: bool,
-) -> Result