diff --git a/connectors/ncbivirus/ncbivirus.go b/connectors/ncbivirus/ncbivirus.go index 89cf6ac..8a64f98 100644 --- a/connectors/ncbivirus/ncbivirus.go +++ b/connectors/ncbivirus/ncbivirus.go @@ -192,6 +192,13 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. searchPath := len(st.IDs) == 0 + // limit is the caller's hard ceiling (NormalizedQuery.Limit), honored as a WALK + // CEILING per the contract: a huge upstream is never fully fetched for a small + // limit. A limit below the authoritative total makes the result necessarily + // incomplete (the Complete gate below still requires fetched == authority), so a + // limit can never produce a false Complete — only a BestEffort partial. 0 = unlimited. + limit := p.Query.Limit + var ( ids []string startCount, endCount, authority int64 @@ -294,11 +301,21 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. seen[id] = true ids = append(ids, id) } - // Terminate as soon as the distinct set covers the authoritative total, or a - // page returns no rows and no next token. - if authority > 0 && int64(len(ids)) >= authority { + // Walk ceiling: once the distinct set reaches the caller's limit, no further + // pages are needed — but ONLY when the limit is STRICTLY below the authoritative + // total. That guard is load-bearing: it guarantees a limited walk is always + // incomplete (fetched <= limit < authority => BestEffort), so the limit can + // never mask an over-observation into a false Complete. When limit >= authority + // the limit is non-constraining and the normal cover/total logic applies. + if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) >= limit { break } + // Drain the cursor to genuine exhaustion (no next-page token) rather than + // stopping as soon as the distinct set covers the authoritative total: a total + // that UNDER-reports while the cursor keeps emitting pages would otherwise yield + // a false Complete (and a hash that depends on which records the server placed + // before the token). The total-derived pageBudget bounds a runaway cursor, and + // the observedDistinct == authority gate downgrades any over-/under-enumeration. if nextToken == "" { break } @@ -313,8 +330,26 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. // list before fetching. This (with Normalize's slice sorting + the per-accession // re-keying below) discards the server's release-date-desc list order and its // multi-accession reordering before anything is hashed. + // observedDistinct is the number of DISTINCT accessions the list walk enumerated, + // captured BEFORE any limit truncation. A Complete requires this to equal the + // authoritative total: enumerating MORE distinct accessions than the total claims is + // an inconsistency (an under-reported total) that must downgrade even when 404-skipped + // fetches happen to bring the retrieved count back down to the total. + observedDistinct := int64(len(ids)) + sort.Strings(ids) + // Truncate to the first `limit` accessions in the imposed (sorted) order, but ONLY + // when limit is STRICTLY below the authoritative total (the same guard as the walk + // ceiling). This bounds the per-accession fetch for a small limit while guaranteeing + // the result stays incomplete (BestEffort). Truncating when limit == authority could + // drop an OVER-observed id (the page returned more rows than the total claims) and + // fake fetched == authority — a false Complete. When limit >= authority we keep every + // observed id so an under-reported total still downgrades. + if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) > limit { + ids = ids[:limit] + } + // Fetch one verbatim per-accession ndjson report line per DISTINCT accession. var entryVersion string // first per-accession version seen (the get/resolve authority + search-path fallback) raws := make([]contract.RawRecord, 0, len(ids)) @@ -370,7 +405,10 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. } fetched := int64(len(raws)) - if st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !versionDrift && fetched == authority { + // limitApplied: the caller's limit constrained the walk below the authoritative + // total. Reported as the BestEffort reason (the count is still authoritative). + limitApplied := limit > 0 && totalKnown && authority > 0 && limit < authority + if st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !versionDrift && observedDistinct == authority && fetched == authority { // Reconciled: the authoritative total was KNOWN and stable across the whole // (bounded, re-list-free) walk, the version was coherent, and every record was // fetched. Only then is the result a true Complete. @@ -383,6 +421,8 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. reason = fmt.Sprintf("ncbivirus: reconcile_count disabled; fetched %d (no authoritative total to reconcile)", fetched) case !totalKnown: reason = fmt.Sprintf("ncbivirus: upstream reported no authoritative x-ncbi-total-count; fetched %d — total unknown, cannot claim Complete", fetched) + case limitApplied: + reason = fmt.Sprintf("ncbivirus: stopped at the requested limit of %d (authoritative total %d) — partial by request, count not reconciled", limit, authority) case walkTruncated: reason = truncReason case sawDuplicate: @@ -391,10 +431,21 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. reason = fmt.Sprintf("ncbivirus: x-ncbi-total-count mutated mid-walk (start=%d end=%d); fetched %d — count not reconciled", startCount, endCount, fetched) case versionDrift: reason = fmt.Sprintf("ncbivirus: x-datasets-version drifted mid-walk; fetched %d — version not coherent", fetched) + case observedDistinct != authority: + reason = fmt.Sprintf("ncbivirus: enumerated %d distinct accessions vs an authoritative total of %d — inconsistent enumeration, count not reconciled", observedDistinct, authority) default: reason = fmt.Sprintf("ncbivirus: fetched %d of %d (some accessions missing/404) — count not reconciled", fetched, authority) } - return rs, contract.NewBestEffort(fetched, reason), nil + comp := contract.NewBestEffort(fetched, reason) + if limitApplied { + // The authoritative total IS known (the page-0 x-ncbi-total-count header); the + // walk was stopped at the caller's limit by request, not because the total is + // untrustworthy. Expose it so a caller asking only for a count gets the real + // total from a cheap limited fetch. (Other BestEffort cases leave it zero + // because their total is absent or proven unstable.) + comp.AuthoritativeCount = authority + } + return rs, comp, nil } // listPage performs one dataset_report list GET under a Governor permit and returns diff --git a/connectors/ncbivirus/ncbivirus_test.go b/connectors/ncbivirus/ncbivirus_test.go index ccad71b..3e74908 100644 --- a/connectors/ncbivirus/ncbivirus_test.go +++ b/connectors/ncbivirus/ncbivirus_test.go @@ -330,6 +330,172 @@ func TestRetrieve_CompleteReconcilesFromHeaderTotal(t *testing.T) { } } +// TestRetrieve_LimitBoundsWalk: a Query.Limit below the authoritative total is honored +// as a walk ceiling. The result is BestEffort (NOT a false Complete) truncated to Limit +// records, but still reports the authoritative total so a caller asking only for a count +// gets the real number from a cheap limited fetch. +func TestRetrieve_LimitBoundsWalk(t *testing.T) { + f := stdListDoer(t) // single page, 3 accessions, x-ncbi-total-count = 3 + c := newConn(t, f) + q := searchQuery() + q.Limit = 1 + p := retrievePlan(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState { + t.Fatalf("a Limit below the total must NOT Complete; want BestEffort, got %s (%s)", comp.State, comp.Reason) + } + if comp.ReconciledCount != 1 { + t.Fatalf("expected 1 record reconciled (the limit), got %d", comp.ReconciledCount) + } + if comp.AuthoritativeCount != 3 { + t.Fatalf("the authoritative total (3) must be reported even on a limited fetch, got %d", comp.AuthoritativeCount) + } + if len(rs.Records) != 1 { + t.Fatalf("records must be truncated to the limit (1), got %d", len(rs.Records)) + } +} + +// TestRetrieve_LimitAtOrAboveTotalStillCompletes is the over-rejection guard: a Limit +// that does not constrain the result (>= the authoritative total) must NOT downgrade a +// legitimately complete walk. +func TestRetrieve_LimitAtOrAboveTotalStillCompletes(t *testing.T) { + f := stdListDoer(t) + c := newConn(t, f) + q := searchQuery() + q.Limit = 5 // > total of 3 + p := retrievePlan(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.CompleteState || comp.ReconciledCount != 3 || comp.AuthoritativeCount != 3 { + t.Fatalf("a non-constraining limit must still Complete(3,3), got %s %d/%d", comp.State, comp.ReconciledCount, comp.AuthoritativeCount) + } + if len(rs.Records) != 3 { + t.Fatalf("expected all 3 records, got %d", len(rs.Records)) + } +} + +// TestRetrieve_LimitStopsPaginationEarly proves the walk ceiling bounds the LIST walk +// itself: with a multi-page upstream and Limit=1, the connector stops after page 1 and +// never requests page 2 nor the skipped per-accession reports. +func TestRetrieve_LimitStopsPaginationEarly(t *testing.T) { + f := newFakeDoer() + listURL := listURLForTaxon(t, "2697049", 1000) + // page1 lists NC_045512.2 + NC_002549.1 (server order), total 3, a next-page token. + f.set("GET", listURL, 200, loadFixture(t, "list_page1.ndjson"), hdr("3", "TOKEN2", "18.30.1")) + // page2 is registered but must NEVER be fetched once the limit is reached on page 1. + page2URL := appendPageToken(listURL, "TOKEN2") + f.set("GET", page2URL, 200, loadFixture(t, "list_page2.ndjson"), hdr("3", "", "18.30.1")) + // After sort+truncate to 1, the single fetched accession is NC_002549.1. + f.set("GET", reportURL("NC_002549.1"), 200, loadFixture(t, "report_NC_002549.1.ndjson"), hdr("", "", "18.30.1")) + c := newConn(t, f) + q := searchQuery() + q.Limit = 1 + p := retrievePlan(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState || comp.AuthoritativeCount != 3 || len(rs.Records) != 1 { + t.Fatalf("want BestEffort with authoritative 3 and 1 record, got %s auth=%d n=%d", comp.State, comp.AuthoritativeCount, len(rs.Records)) + } + // Only page 1 + the one truncated report were fetched (2 calls): page 2 and the other + // reports were never requested — the walk ceiling stopped pagination early. + if f.callCount() != 2 { + t.Fatalf("walk ceiling must stop after page 1 (want 2 calls: list + 1 report), got %d", f.callCount()) + } +} + +// TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades is the cross-model gate's +// catch: the list page returns MORE distinct accessions (3) than the authoritative total +// claims (2). A Limit equal to that under-reported total must NOT let truncation drop the +// extra id and fake fetched == authority — the over-observation must still downgrade to +// BestEffort. (Truncation only applies when limit < authority, so this stays a full, +// honest fetch that exposes the inconsistency.) +func TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades(t *testing.T) { + f := newFakeDoer() + // 3 distinct accessions on the page, but x-ncbi-total-count claims only 2. + f.set("GET", listURLForTaxon(t, "2697049", 1000), 200, loadFixture(t, "list_3ids.ndjson"), hdr("2", "", "18.30.1")) + f.set("GET", reportURL("NC_045512.2"), 200, loadFixture(t, "report_NC_045512.2.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_002549.1"), 200, loadFixture(t, "report_NC_002549.1.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_001498.1"), 200, loadFixture(t, "report_NC_001498.1.ndjson"), hdr("", "", "18.30.1")) + c := newConn(t, f) + q := searchQuery() + q.Limit = 2 // == the under-reported total; truncation must NOT engage + p := retrievePlan(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState { + t.Fatalf("a limit equal to an under-reported total must not let truncation fake a Complete; want BestEffort, got %s (%s)", comp.State, comp.Reason) + } + if len(rs.Records) != 3 { + t.Fatalf("every observed accession must be kept (no truncation at limit==authority), got %d records", len(rs.Records)) + } +} + +// TestRetrieve_OverListedWith404StillDowngrades is the round-2 gate's catch — a +// pre-existing complete-or-fail gap, independent of Limit. The list page enumerates 3 +// distinct accessions but the authoritative total claims only 2, and one over-listed +// accession 404s on fetch, so the retrieved count (2) coincidentally equals the total. +// Enumerating more distinct accessions than the total claims is an inconsistency that +// must downgrade: the Complete gate also requires the observed distinct count to equal +// the authoritative total. +func TestRetrieve_OverListedWith404StillDowngrades(t *testing.T) { + f := newFakeDoer() + f.set("GET", listURLForTaxon(t, "2697049", 1000), 200, loadFixture(t, "list_3ids.ndjson"), hdr("2", "", "18.30.1")) // 3 ids, total claims 2 + f.set("GET", reportURL("NC_045512.2"), 200, loadFixture(t, "report_NC_045512.2.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_002549.1"), 200, loadFixture(t, "report_NC_002549.1.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_001498.1"), 404, nil, hdr("", "", "18.30.1")) // one over-listed accession 404s + c := newConn(t, f) + p := retrievePlan(t, c, searchQuery()) // no Limit — this guards the core complete-or-fail gate + + _, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState { + t.Fatalf("observing 3 distinct accessions when the total claims 2 must downgrade even if a 404 makes fetched==total; got %s (%s)", comp.State, comp.Reason) + } +} + +// TestRetrieve_TotalCoveredButCursorNotExhaustedDowngrades is the round-3 gate's catch: +// page 1's accessions already "cover" the header total (2), but page 1 ALSO emits a +// next-page token and page 2 enumerates a 3rd matching accession (the total under- +// reported). The walk must drain the cursor to exhaustion rather than stopping at +// len(ids) >= total, observe 3 distinct accessions, and downgrade — never Complete(2,2). +func TestRetrieve_TotalCoveredButCursorNotExhaustedDowngrades(t *testing.T) { + f := newFakeDoer() + listURL := listURLForTaxon(t, "2697049", 1000) + // page1: 2 accessions, total claims 2, BUT a next-page token is present. + f.set("GET", listURL, 200, loadFixture(t, "list_page1.ndjson"), hdr("2", "TOKEN2", "18.30.1")) + // page2: a 3rd accession, still total 2, last page. + page2URL := appendPageToken(listURL, "TOKEN2") + f.set("GET", page2URL, 200, loadFixture(t, "list_page2.ndjson"), hdr("2", "", "18.30.1")) + f.set("GET", reportURL("NC_045512.2"), 200, loadFixture(t, "report_NC_045512.2.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_002549.1"), 200, loadFixture(t, "report_NC_002549.1.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_001498.1"), 200, loadFixture(t, "report_NC_001498.1.ndjson"), hdr("", "", "18.30.1")) + c := newConn(t, f) + p := retrievePlan(t, c, searchQuery()) // no Limit + + _, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState { + t.Fatalf("a covered-but-not-exhausted cursor (under-reported total) must downgrade, not Complete; got %s (%s)", comp.State, comp.Reason) + } +} + func TestRetrieve_TokenWalkCollectsAllPages(t *testing.T) { f := newFakeDoer() listURL := listURLForTaxon(t, "2697049", 1000)