Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions connectors/ncbivirus/ncbivirus.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.

searchPath := len(st.IDs) == 0

// limit is the caller's hard ceiling (NormalizedQuery.Limit), honored as a WALK
// CEILING per the contract: a huge upstream is never fully fetched for a small
// limit. A limit below the authoritative total makes the result necessarily
// incomplete (the Complete gate below still requires fetched == authority), so a
// limit can never produce a false Complete — only a BestEffort partial. 0 = unlimited.
limit := p.Query.Limit

var (
ids []string
startCount, endCount, authority int64
Expand Down Expand Up @@ -294,11 +301,21 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
seen[id] = true
ids = append(ids, id)
}
// Terminate as soon as the distinct set covers the authoritative total, or a
// page returns no rows and no next token.
if authority > 0 && int64(len(ids)) >= authority {
// Walk ceiling: once the distinct set reaches the caller's limit, no further
// pages are needed — but ONLY when the limit is STRICTLY below the authoritative
// total. That guard is load-bearing: it guarantees a limited walk is always
// incomplete (fetched <= limit < authority => BestEffort), so the limit can
// never mask an over-observation into a false Complete. When limit >= authority
// the limit is non-constraining and the normal cover/total logic applies.
if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) >= limit {
break
}
// Drain the cursor to genuine exhaustion (no next-page token) rather than
// stopping as soon as the distinct set covers the authoritative total: a total
// that UNDER-reports while the cursor keeps emitting pages would otherwise yield
// a false Complete (and a hash that depends on which records the server placed
// before the token). The total-derived pageBudget bounds a runaway cursor, and
// the observedDistinct == authority gate downgrades any over-/under-enumeration.
if nextToken == "" {
break
}
Expand All @@ -313,8 +330,26 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
// list before fetching. This (with Normalize's slice sorting + the per-accession
// re-keying below) discards the server's release-date-desc list order and its
// multi-accession reordering before anything is hashed.
// observedDistinct is the number of DISTINCT accessions the list walk enumerated,
// captured BEFORE any limit truncation. A Complete requires this to equal the
// authoritative total: enumerating MORE distinct accessions than the total claims is
// an inconsistency (an under-reported total) that must downgrade even when 404-skipped
// fetches happen to bring the retrieved count back down to the total.
observedDistinct := int64(len(ids))

sort.Strings(ids)

// Truncate to the first `limit` accessions in the imposed (sorted) order, but ONLY
// when limit is STRICTLY below the authoritative total (the same guard as the walk
// ceiling). This bounds the per-accession fetch for a small limit while guaranteeing
// the result stays incomplete (BestEffort). Truncating when limit == authority could
// drop an OVER-observed id (the page returned more rows than the total claims) and
// fake fetched == authority — a false Complete. When limit >= authority we keep every
// observed id so an under-reported total still downgrades.
if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) > limit {
ids = ids[:limit]
}

// Fetch one verbatim per-accession ndjson report line per DISTINCT accession.
var entryVersion string // first per-accession version seen (the get/resolve authority + search-path fallback)
raws := make([]contract.RawRecord, 0, len(ids))
Expand Down Expand Up @@ -370,7 +405,10 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
}

fetched := int64(len(raws))
if st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !versionDrift && fetched == authority {
// limitApplied: the caller's limit constrained the walk below the authoritative
// total. Reported as the BestEffort reason (the count is still authoritative).
limitApplied := limit > 0 && totalKnown && authority > 0 && limit < authority
if st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !versionDrift && observedDistinct == authority && fetched == authority {
// Reconciled: the authoritative total was KNOWN and stable across the whole
// (bounded, re-list-free) walk, the version was coherent, and every record was
// fetched. Only then is the result a true Complete.
Expand All @@ -383,6 +421,8 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
reason = fmt.Sprintf("ncbivirus: reconcile_count disabled; fetched %d (no authoritative total to reconcile)", fetched)
case !totalKnown:
reason = fmt.Sprintf("ncbivirus: upstream reported no authoritative x-ncbi-total-count; fetched %d — total unknown, cannot claim Complete", fetched)
case limitApplied:
reason = fmt.Sprintf("ncbivirus: stopped at the requested limit of %d (authoritative total %d) — partial by request, count not reconciled", limit, authority)
case walkTruncated:
reason = truncReason
case sawDuplicate:
Expand All @@ -391,10 +431,21 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
reason = fmt.Sprintf("ncbivirus: x-ncbi-total-count mutated mid-walk (start=%d end=%d); fetched %d — count not reconciled", startCount, endCount, fetched)
case versionDrift:
reason = fmt.Sprintf("ncbivirus: x-datasets-version drifted mid-walk; fetched %d — version not coherent", fetched)
case observedDistinct != authority:
reason = fmt.Sprintf("ncbivirus: enumerated %d distinct accessions vs an authoritative total of %d — inconsistent enumeration, count not reconciled", observedDistinct, authority)
default:
reason = fmt.Sprintf("ncbivirus: fetched %d of %d (some accessions missing/404) — count not reconciled", fetched, authority)
}
return rs, contract.NewBestEffort(fetched, reason), nil
comp := contract.NewBestEffort(fetched, reason)
if limitApplied {
// The authoritative total IS known (the page-0 x-ncbi-total-count header); the
// walk was stopped at the caller's limit by request, not because the total is
// untrustworthy. Expose it so a caller asking only for a count gets the real
// total from a cheap limited fetch. (Other BestEffort cases leave it zero
// because their total is absent or proven unstable.)
comp.AuthoritativeCount = authority
}
return rs, comp, nil
}

// listPage performs one dataset_report list GET under a Governor permit and returns
Expand Down
166 changes: 166 additions & 0 deletions connectors/ncbivirus/ncbivirus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,172 @@ func TestRetrieve_CompleteReconcilesFromHeaderTotal(t *testing.T) {
}
}

// TestRetrieve_LimitBoundsWalk: a Query.Limit below the authoritative total is honored
// as a walk ceiling. The result is BestEffort (NOT a false Complete) truncated to Limit
// records, but still reports the authoritative total so a caller asking only for a count
// gets the real number from a cheap limited fetch.
func TestRetrieve_LimitBoundsWalk(t *testing.T) {
f := stdListDoer(t) // single page, 3 accessions, x-ncbi-total-count = 3
c := newConn(t, f)
q := searchQuery()
q.Limit = 1
p := retrievePlan(t, c, q)

rs, comp, err := c.Retrieve(p, noGov())
if err != nil {
t.Fatalf("Retrieve: %v", err)
}
if comp.State != contract.BestEffortState {
t.Fatalf("a Limit below the total must NOT Complete; want BestEffort, got %s (%s)", comp.State, comp.Reason)
}
if comp.ReconciledCount != 1 {
t.Fatalf("expected 1 record reconciled (the limit), got %d", comp.ReconciledCount)
}
if comp.AuthoritativeCount != 3 {
t.Fatalf("the authoritative total (3) must be reported even on a limited fetch, got %d", comp.AuthoritativeCount)
}
if len(rs.Records) != 1 {
t.Fatalf("records must be truncated to the limit (1), got %d", len(rs.Records))
}
}

// TestRetrieve_LimitAtOrAboveTotalStillCompletes is the over-rejection guard: a Limit
// that does not constrain the result (>= the authoritative total) must NOT downgrade a
// legitimately complete walk.
//
// Both sides of ">=" are exercised. Limit == total is the boundary at which the
// connector's "limit < authority" guards must stay disengaged; Limit > total is the
// plainly non-constraining case. Each case runs as its own subtest against a fresh
// fake so recorded calls never leak between cases.
func TestRetrieve_LimitAtOrAboveTotalStillCompletes(t *testing.T) {
	// Limit is typed int64 to match how Retrieve compares it against the int64
	// authoritative total.
	cases := []struct {
		name  string
		limit int64
	}{
		{name: "limit equals total", limit: 3}, // == total of 3 (boundary)
		{name: "limit above total", limit: 5},  // > total of 3
	}

	for _, tc := range cases {
		tc := tc // pin the loop variable for toolchains older than Go 1.22
		t.Run(tc.name, func(t *testing.T) {
			f := stdListDoer(t) // single page, 3 accessions, x-ncbi-total-count = 3
			c := newConn(t, f)
			q := searchQuery()
			q.Limit = tc.limit
			p := retrievePlan(t, c, q)

			rs, comp, err := c.Retrieve(p, noGov())
			if err != nil {
				t.Fatalf("Retrieve: %v", err)
			}
			if comp.State != contract.CompleteState || comp.ReconciledCount != 3 || comp.AuthoritativeCount != 3 {
				t.Fatalf("a non-constraining limit (%d) must still Complete(3,3), got %s %d/%d", tc.limit, comp.State, comp.ReconciledCount, comp.AuthoritativeCount)
			}
			if len(rs.Records) != 3 {
				t.Fatalf("expected all 3 records with limit %d, got %d", tc.limit, len(rs.Records))
			}
		})
	}
}

// TestRetrieve_LimitStopsPaginationEarly shows that the walk ceiling bounds the LIST
// walk itself. Against a multi-page upstream with Limit=1 the connector must stop after
// page 1, requesting neither page 2 nor the per-accession reports that truncation
// dropped.
func TestRetrieve_LimitStopsPaginationEarly(t *testing.T) {
	const (
		nextToken = "TOKEN2"
		version   = "18.30.1"
	)

	doer := newFakeDoer()
	firstPage := listURLForTaxon(t, "2697049", 1000)
	// Page 1 lists NC_045512.2 + NC_002549.1 (server order), claims a total of 3 and
	// hands back a next-page token.
	doer.set("GET", firstPage, 200, loadFixture(t, "list_page1.ndjson"), hdr("3", nextToken, version))
	// Page 2 is registered, yet it must NEVER be requested once page 1 satisfies the limit.
	secondPage := appendPageToken(firstPage, nextToken)
	doer.set("GET", secondPage, 200, loadFixture(t, "list_page2.ndjson"), hdr("3", "", version))
	// Sorting and truncating to 1 leaves NC_002549.1 as the only accession fetched.
	doer.set("GET", reportURL("NC_002549.1"), 200, loadFixture(t, "report_NC_002549.1.ndjson"), hdr("", "", version))

	conn := newConn(t, doer)
	query := searchQuery()
	query.Limit = 1
	plan := retrievePlan(t, conn, query)

	result, completeness, err := conn.Retrieve(plan, noGov())
	if err != nil {
		t.Fatalf("Retrieve: %v", err)
	}

	limitedBestEffort := completeness.State == contract.BestEffortState &&
		completeness.AuthoritativeCount == 3 &&
		len(result.Records) == 1
	if !limitedBestEffort {
		t.Fatalf("want BestEffort with authoritative 3 and 1 record, got %s auth=%d n=%d", completeness.State, completeness.AuthoritativeCount, len(result.Records))
	}

	// Exactly two upstream calls are expected: the page-1 list and the single surviving
	// report. Anything more means the ceiling failed to stop pagination early.
	if calls := doer.callCount(); calls != 2 {
		t.Fatalf("walk ceiling must stop after page 1 (want 2 calls: list + 1 report), got %d", calls)
	}
}

// TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades covers an under-reported
// total combined with a Limit equal to it. The list page carries MORE distinct
// accessions (3) than x-ncbi-total-count claims (2). Truncation must not silently drop
// the surplus id and fake fetched == authority; the over-observation has to surface as
// BestEffort. Because truncation only engages when limit < authority, the fetch stays
// full and honest, exposing the inconsistency.
func TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades(t *testing.T) {
	const version = "18.30.1"

	doer := newFakeDoer()
	// The page holds 3 distinct accessions while the header total says only 2.
	doer.set("GET", listURLForTaxon(t, "2697049", 1000), 200, loadFixture(t, "list_3ids.ndjson"), hdr("2", "", version))
	// Every listed accession has a servable report.
	for _, accession := range []string{"NC_045512.2", "NC_002549.1", "NC_001498.1"} {
		doer.set("GET", reportURL(accession), 200, loadFixture(t, "report_"+accession+".ndjson"), hdr("", "", version))
	}

	conn := newConn(t, doer)
	query := searchQuery()
	query.Limit = 2 // == the under-reported total; truncation must NOT engage
	plan := retrievePlan(t, conn, query)

	result, completeness, err := conn.Retrieve(plan, noGov())
	switch {
	case err != nil:
		t.Fatalf("Retrieve: %v", err)
	case completeness.State != contract.BestEffortState:
		t.Fatalf("a limit equal to an under-reported total must not let truncation fake a Complete; want BestEffort, got %s (%s)", completeness.State, completeness.Reason)
	case len(result.Records) != 3:
		t.Fatalf("every observed accession must be kept (no truncation at limit==authority), got %d records", len(result.Records))
	}
}

// TestRetrieve_OverListedWith404StillDowngrades guards the core complete-or-fail gate,
// independent of Limit. The list page enumerates 3 distinct accessions while the
// authoritative total claims 2, and one over-listed accession 404s on fetch, so the
// retrieved count (2) coincidentally matches the total. Observing more distinct
// accessions than the total claims is an inconsistency: the Complete gate also demands
// that the observed distinct count equal the authoritative total, so this must
// downgrade.
func TestRetrieve_OverListedWith404StillDowngrades(t *testing.T) {
	const version = "18.30.1"

	doer := newFakeDoer()
	// 3 ids on the page, header total claims 2.
	doer.set("GET", listURLForTaxon(t, "2697049", 1000), 200, loadFixture(t, "list_3ids.ndjson"), hdr("2", "", version))
	// Two accessions resolve normally...
	for _, accession := range []string{"NC_045512.2", "NC_002549.1"} {
		doer.set("GET", reportURL(accession), 200, loadFixture(t, "report_"+accession+".ndjson"), hdr("", "", version))
	}
	// ...and the third, over-listed one 404s.
	doer.set("GET", reportURL("NC_001498.1"), 404, nil, hdr("", "", version))

	conn := newConn(t, doer)
	plan := retrievePlan(t, conn, searchQuery()) // no Limit: the gate itself is under test

	_, completeness, err := conn.Retrieve(plan, noGov())
	switch {
	case err != nil:
		t.Fatalf("Retrieve: %v", err)
	case completeness.State != contract.BestEffortState:
		t.Fatalf("observing 3 distinct accessions when the total claims 2 must downgrade even if a 404 makes fetched==total; got %s (%s)", completeness.State, completeness.Reason)
	}
}

// TestRetrieve_TotalCoveredButCursorNotExhaustedDowngrades covers a total that is
// already "covered" while the cursor is still live. Page 1's accessions reach the
// header total (2), yet page 1 ALSO returns a next-page token and page 2 enumerates a
// 3rd matching accession, meaning the total under-reported. The walk has to drain the
// cursor to exhaustion instead of stopping at len(ids) >= total, and the result must be
// a downgrade, never Complete(2,2).
func TestRetrieve_TotalCoveredButCursorNotExhaustedDowngrades(t *testing.T) {
	const (
		nextToken = "TOKEN2"
		version   = "18.30.1"
	)

	doer := newFakeDoer()
	firstPage := listURLForTaxon(t, "2697049", 1000)
	// Page 1: 2 accessions and a claimed total of 2, BUT a next-page token is present.
	doer.set("GET", firstPage, 200, loadFixture(t, "list_page1.ndjson"), hdr("2", nextToken, version))
	// Page 2: a 3rd accession, total still 2, no further token (last page).
	secondPage := appendPageToken(firstPage, nextToken)
	doer.set("GET", secondPage, 200, loadFixture(t, "list_page2.ndjson"), hdr("2", "", version))
	// All three accessions have a servable report.
	for _, accession := range []string{"NC_045512.2", "NC_002549.1", "NC_001498.1"} {
		doer.set("GET", reportURL(accession), 200, loadFixture(t, "report_"+accession+".ndjson"), hdr("", "", version))
	}

	conn := newConn(t, doer)
	plan := retrievePlan(t, conn, searchQuery()) // no Limit

	_, completeness, err := conn.Retrieve(plan, noGov())
	switch {
	case err != nil:
		t.Fatalf("Retrieve: %v", err)
	case completeness.State != contract.BestEffortState:
		t.Fatalf("a covered-but-not-exhausted cursor (under-reported total) must downgrade, not Complete; got %s (%s)", completeness.State, completeness.Reason)
	}
}

func TestRetrieve_TokenWalkCollectsAllPages(t *testing.T) {
f := newFakeDoer()
listURL := listURLForTaxon(t, "2697049", 1000)
Expand Down