From 91ad0aa022f55848f22d50220e87020dfd27aa1f Mon Sep 17 00:00:00 2001 From: Tristan Farmer <159447266+001TMF@users.noreply.github.com> Date: Sun, 14 Jun 2026 21:35:55 +0100 Subject: [PATCH] fix(connectors): honor --limit + fix gzip/taxon-id/multi-word terms (uniprot, pdb, clinvar) Three connectors ignored Query.Limit and walked/fetched the full result set even for a count query (--limit 1), causing timeouts; each also had a connector-specific bug. Mirrors the ncbi-virus limit-as-walk-ceiling pattern (#55); complete-or-fail is preserved (the strict `limit < authority` guard makes a limited walk always BestEffort, never a false Complete). uniprot: - decompress gzip response bodies in readBody (Content-Encoding OR 0x1f8b magic sniff); UniProt gzips its JSON and it arrived undecompressed, failing json decode with '\x1f'. - honor Query.Limit as a walk ceiling; expose the authoritative total on the BestEffort path. pdb: - map organism_taxon_id to rcsb_entity_source_organism.taxonomy_lineage.id (string `in`); the prior attribute is search-disabled upstream (HTTP 400 for every operator). - honor Query.Limit as a walk ceiling; cap getEntry to the limited id set (it was fetching every entry of the full total even for --limit 1). clinvar: - honor Query.Limit as a walk ceiling on the esearch idlist walk. - quote multi-word Entrez values ("Uncertain significance"); unquoted multi-word values were mis-tokenized by esearch and silently returned 0. Per-connector regression tests added, each proven non-vacuous by revert. make ci green. Cross-model (codex) gate deferred (ChatGPT usage limit); covered by an adversarial verify pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- connectors/clinvar/clinvar.go | 68 ++++++++++- connectors/clinvar/clinvar_test.go | 153 ++++++++++++++++++++++- connectors/pdb/pdb.go | 95 ++++++++++++++- connectors/pdb/pdb_test.go | 178 +++++++++++++++++++++++++++ connectors/uniprot/uniprot.go | 97 ++++++++++++++- connectors/uniprot/uniprot_test.go | 187 +++++++++++++++++++++++++++++ 6 files changed, 766 insertions(+), 12 deletions(-) diff --git a/connectors/clinvar/clinvar.go b/connectors/clinvar/clinvar.go index 250656b..bf3b2b4 100644 --- a/connectors/clinvar/clinvar.go +++ b/connectors/clinvar/clinvar.go @@ -196,6 +196,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. searchPath := len(st.IDs) == 0 + // limit is the caller's hard ceiling (NormalizedQuery.Limit), honored as a WALK + // CEILING per the contract: a huge upstream (e.g. BRCA1 pathogenic, thousands of + // VariationIDs) is never fully fetched for a small limit such as a --limit 1 count + // query. esearch's Count is read from the FIRST page (the authoritative total) and + // the idlist walk stops once `limit` ids are collected, so a count query returns in + // a few seconds instead of timing out walking + efetching the whole idlist. A limit + // below the authoritative count makes the result necessarily incomplete (the + // Complete gate below still requires fetched == authority), so a limit can never + // produce a false Complete — only a BestEffort partial. 0 = unlimited. + limit := p.Query.Limit + // SourceVersion authority #1: read the ClinVar ReleaseDate at walk START. It is // re-read at walk END below; a change between the two reads means a new release // landed under us, which downgrades to BestEffort. @@ -282,6 +293,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. ids = append(ids, id) added++ } + // Walk ceiling: once the distinct set reaches the caller's limit, no further + // esearch pages are needed — but ONLY when the limit is STRICTLY below the + // authoritative count. That guard is load-bearing: it guarantees a limited walk + // is always incomplete (fetched <= limit < authority => BestEffort), so the + // limit can never mask an over-observation into a false Complete. When + // limit >= authority the limit is non-constraining and the normal cover/total + // logic applies. This is what makes a --limit 1 count query over a large gene + // stop after the first page instead of paging + efetching the whole idlist. + if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) >= limit { + break + } // Terminate as soon as the distinct set covers the authoritative count. if totalKnown && authority > 0 && int64(len(ids)) >= authority { break @@ -310,6 +332,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. // (not lexically) so "14" precedes "18180" rather than following it. sortVariationIDsAsc(ids) + // Truncate to the first `limit` VariationIDs in the imposed (sorted) order, but + // ONLY when limit is STRICTLY below the authoritative count (the same guard as the + // walk ceiling). This bounds the per-VariationID efetch for a small limit while + // guaranteeing the result stays incomplete (fetched <= limit < authority => + // BestEffort). Truncating when limit >= authority could drop a legitimately-fetched + // id; keeping the guard at limit < authority means a non-constraining limit never + // perturbs a full, reconcilable walk. + if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) > limit { + ids = ids[:limit] + } + // Fetch one verbatim efetch body per DISTINCT VariationID, in // the sorted order. A 200 body with no element is a MISSING // record (the empty case) — skip @@ -359,6 +392,11 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. } fetched := int64(len(raws)) + // limitApplied: the caller's limit constrained the walk below the authoritative + // count. Reported as the BestEffort reason (the count is still authoritative — the + // esearch Count from page 0). totalKnown gates it so an absent count never claims an + // authoritative total. + limitApplied := limit > 0 && totalKnown && authority > 0 && limit < authority if st.Reconcile && totalKnown && releaseKnown && !walkTruncated && !sawDuplicate && !totalMutated && !releaseDrift && fetched == authority { // Reconciled: the authoritative count was KNOWN and stable across the whole // (bounded, re-list-free) walk, the ReleaseDate was coherent start-to-end, and @@ -374,6 +412,8 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. reason = fmt.Sprintf("clinvar: esearch reported no authoritative count; fetched %d — total unknown, cannot claim Complete", fetched) case !releaseKnown: reason = fmt.Sprintf("clinvar: release-header read reported no ReleaseDate; fetched %d — source version unknown, cannot claim Complete", fetched) + case limitApplied: + reason = fmt.Sprintf("clinvar: stopped at the requested limit of %d (authoritative total %d) — partial by request, count not reconciled", limit, authority) case walkTruncated: reason = truncReason case sawDuplicate: @@ -385,7 +425,16 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. default: reason = fmt.Sprintf("clinvar: fetched %d of %d (some VariationIDs missing — empty ) — count not reconciled", fetched, authority) } - return rs, contract.NewBestEffort(fetched, reason), nil + comp := contract.NewBestEffort(fetched, reason) + if limitApplied { + // The authoritative total IS known (the page-0 esearch Count); the walk was + // stopped at the caller's limit by request, not because the total is + // untrustworthy. Expose it so a caller asking only for a count gets the real total + // from a cheap limited fetch (the --limit 1 count-query fast path). Other + // BestEffort cases leave it zero because their total is absent or proven unstable. + comp.AuthoritativeCount = authority + } + return rs, comp, nil } // esearchResponse is the typed view over the esearch 200 JSON body. Only the @@ -1171,6 +1220,19 @@ func filtersToTerm(filters []contract.Filter) (string, error) { return strings.Join(terms, " AND "), nil } +// entrezValue prepares a filter value for an Entrez "value[field]" term. A value +// containing whitespace (e.g. "Uncertain significance", "Likely pathogenic") MUST be +// double-quoted, otherwise Entrez tokenizes it across the field boundary +// ("Uncertain significance[clinical significance]" parses as two terms and silently +// returns the wrong/zero count). A single-word value is returned unchanged so existing +// terms remain byte-identical. An already-quoted value is left as-is. +func entrezValue(v string) string { + if strings.ContainsAny(v, " \t") && !(strings.HasPrefix(v, "\"") && strings.HasSuffix(v, "\"")) { + return "\"" + v + "\"" + } + return v +} + // renderTerm renders one filter into an Entrez term. A multi-value `in` (a // comma-separated set) is expanded into a parenthesized OR group // "(v1[field] OR v2[field])" — valid Entrez syntax — rather than a single @@ -1184,7 +1246,7 @@ func renderTerm(r rule, value string) (string, error) { if p == "" { continue } - ors = append(ors, p+"["+r.EntrezField+"]") + ors = append(ors, entrezValue(p)+"["+r.EntrezField+"]") } switch len(ors) { case 0: @@ -1195,7 +1257,7 @@ func renderTerm(r rule, value string) (string, error) { return "(" + strings.Join(ors, " OR ") + ")", nil } } - return value + "[" + r.EntrezField + "]", nil + return entrezValue(value) + "[" + r.EntrezField + "]", nil } // --- filter semantics ------------------------------------------------------ diff --git a/connectors/clinvar/clinvar_test.go b/connectors/clinvar/clinvar_test.go index af3ecda..6c0ddf5 100644 --- a/connectors/clinvar/clinvar_test.go +++ b/connectors/clinvar/clinvar_test.go @@ -281,20 +281,30 @@ func TestFiltersToTerm(t *testing.T) { want: "HFE[gene]", }, { - name: "variation_type eq", + name: "variation_type eq (multi-word value is quoted)", filters: []contract.Filter{{Field: "variation_type", Operator: "eq", Value: "single nucleotide variant"}}, - want: "single nucleotide variant[type of variation]", + want: "\"single nucleotide variant\"[type of variation]", }, { name: "clinical_significance in single", filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "pathogenic"}}, want: "pathogenic[clinical significance]", }, + { + name: "clinical_significance in multi-word value is quoted", + filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "Uncertain significance"}}, + want: "\"Uncertain significance\"[clinical significance]", + }, { name: "clinical_significance in multi -> OR group", filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "pathogenic,benign"}}, want: "(pathogenic[clinical significance] OR benign[clinical significance])", }, + { + name: "clinical_significance in OR group quotes multi-word members", + filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "Pathogenic,Likely pathogenic"}}, + want: "(Pathogenic[clinical significance] OR \"Likely pathogenic\"[clinical significance])", + }, { name: "AND join in filter order", filters: []contract.Filter{ @@ -497,6 +507,145 @@ func TestRetrieve_MultiPageWalk(t *testing.T) { } } +// --- Retrieve: limit as a walk ceiling ------------------------------------- + +// TestRetrieve_LimitBoundsWalk: a Query.Limit below the authoritative esearch Count +// is honored as a walk ceiling. The result is BestEffort (NOT a false Complete) +// truncated to Limit records, but STILL reports the authoritative total (esearch's +// Count from the FIRST page) so a caller asking only for a count gets the real number +// from a cheap limited fetch — the --limit 1 count-query fast path for a large gene. +func TestRetrieve_LimitBoundsWalk(t *testing.T) { + c := newConn(t, completeDoer(t, hfeTerm)) // single esearch page, count=2, both efetch bodies + q := hfeSearch() + q.Limit = 1 + p := planFor(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState { + t.Fatalf("a Limit below the total must NOT Complete; want BestEffort, got %s (%s)", comp.State, comp.Reason) + } + if comp.ReconciledCount != 1 { + t.Fatalf("expected 1 record reconciled (the limit), got %d", comp.ReconciledCount) + } + if comp.AuthoritativeCount != 2 { + t.Fatalf("the authoritative esearch Count (2) must be reported even on a limited fetch, got %d", comp.AuthoritativeCount) + } + if len(rs.Records) != 1 { + t.Fatalf("records must be truncated to the limit (1), got %d", len(rs.Records)) + } + // The truncation is over the imposed ASCENDING numeric sort, so the single kept id + // is the smallest VariationID (14 -> VCV000000014), not the server's newest-first 18180. + if rs.Records[0].SourceAccession != "VCV000000014" { + t.Fatalf("the kept record must be the first in ascending sorted order (VCV000000014), got %q", rs.Records[0].SourceAccession) + } + if !strings.Contains(comp.Reason, "limit") { + t.Fatalf("reason should explain the limit stop, got %q", comp.Reason) + } +} + +// TestRetrieve_LimitAtOrAboveTotalStillCompletes is the over-rejection guard: a Limit +// that does not constrain the result (>= the authoritative Count) must NOT downgrade a +// legitimately complete walk to BestEffort. +func TestRetrieve_LimitAtOrAboveTotalStillCompletes(t *testing.T) { + c := newConn(t, completeDoer(t, hfeTerm)) // count=2 + q := hfeSearch() + q.Limit = 5 // > total of 2 + p := planFor(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.CompleteState || comp.ReconciledCount != 2 || comp.AuthoritativeCount != 2 { + t.Fatalf("a non-constraining limit must still Complete(2,2), got %s %d/%d", comp.State, comp.ReconciledCount, comp.AuthoritativeCount) + } + if len(rs.Records) != 2 { + t.Fatalf("expected all 2 records, got %d", len(rs.Records)) + } +} + +// TestRetrieve_LimitStopsPaginationEarly proves the walk ceiling bounds the esearch +// idlist walk ITSELF: with a multi-page upstream (page size 1, count=2) and Limit=1, +// the connector stops after the FIRST esearch page and never requests esearch page 2 +// nor the second efetch. This is the load-bearing behavior — without the ceiling a +// --limit 1 count query over a large gene pages + efetches the whole idlist and times +// out. The authoritative Count (2) from page 1 is still reported. +func TestRetrieve_LimitStopsPaginationEarly(t *testing.T) { + f := newFakeDoer() + f.set("GET", releaseURL(), 206, loadFixture(t, "release_header.xml.gz"), nil) + // page1 lists 18180 (server newest-first order), count=2, retmax=1 (a full page). + f.set("GET", esearchURL(hfeTerm, 0, 1), 200, loadFixture(t, "esearch_page1.json"), nil) + // page2 IS registered but must NEVER be requested once the limit is reached on page 1. + f.set("GET", esearchURL(hfeTerm, 1, 1), 200, loadFixture(t, "esearch_page2.json"), nil) + // After collecting id 18180 (the only page-1 id) the walk ceiling fires (limit 1 < + // authority 2); the single fetched id is 18180. + f.set("GET", efetchURL("18180"), 200, loadFixture(t, "efetch_18180.xml"), nil) + // efetch_14 IS registered but must NEVER be requested (14 was never enumerated). + f.set("GET", efetchURL("14"), 200, loadFixture(t, "efetch_14.xml"), nil) + + c, err := New(specWithPageSize(1), f) + if err != nil { + t.Fatalf("New: %v", err) + } + q := hfeSearch() + q.Limit = 1 + p := planFor(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState || comp.AuthoritativeCount != 2 || len(rs.Records) != 1 { + t.Fatalf("want BestEffort with authoritative 2 and 1 record, got %s auth=%d n=%d", comp.State, comp.AuthoritativeCount, len(rs.Records)) + } + // Prove esearch page 2 and the second efetch were NEVER requested: the walk ceiling + // stopped pagination after page 1. Expected calls: release(start), esearch page1, + // efetch 18180, release(end) = 4. + for _, req := range f.requestList() { + if strings.Contains(req, "retstart=1") { + t.Fatalf("walk ceiling must NOT request esearch page 2 (retstart=1), saw %q", req) + } + if strings.Contains(req, efetchURL("14")) { + t.Fatalf("walk ceiling must NOT efetch the un-enumerated id 14, saw %q", req) + } + } + if f.callCount() != 4 { + t.Fatalf("walk ceiling must stop after page 1 (want 4 calls: release+esearch+1 efetch+release), got %d", f.callCount()) + } +} + +// TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades: the esearch idlist returns +// MORE distinct ids (2) than its Count claims (1). A Limit equal to that under-reported +// Count must NOT let truncation drop the extra id and fake fetched == authority — the +// over-observation must still downgrade to BestEffort. Truncation only applies when +// limit < authority, so this stays a full, honest fetch that exposes the inconsistency. +func TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades(t *testing.T) { + f := newFakeDoer() + f.set("GET", releaseURL(), 206, loadFixture(t, "release_header.xml.gz"), nil) + // 2 distinct ids on the page, but Count claims only 1 (an under-reported total). + f.set("GET", esearchURL(hfeTerm, 0, 500), 200, []byte(`{"esearchresult":{"count":"1","idlist":["18180","14"]}}`), nil) + f.set("GET", efetchURL("14"), 200, loadFixture(t, "efetch_14.xml"), nil) + f.set("GET", efetchURL("18180"), 200, loadFixture(t, "efetch_18180.xml"), nil) + c := newConn(t, f) + q := hfeSearch() + q.Limit = 1 // == the under-reported count; truncation must NOT engage + p := planFor(t, c, q) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState { + t.Fatalf("a limit equal to an under-reported total must not let truncation fake a Complete; want BestEffort, got %s (%s)", comp.State, comp.Reason) + } + if len(rs.Records) != 2 { + t.Fatalf("every observed id must be kept (no truncation at limit==authority), got %d records", len(rs.Records)) + } +} + // --- Retrieve: downgrades -------------------------------------------------- func TestRetrieve_MissingID_BestEffort(t *testing.T) { diff --git a/connectors/pdb/pdb.go b/connectors/pdb/pdb.go index e24763a..cd1e103 100644 --- a/connectors/pdb/pdb.go +++ b/connectors/pdb/pdb.go @@ -24,6 +24,7 @@ import ( "net/http" "sort" "strconv" + "strings" "time" "pinakes.sh/pinakes/connectors" @@ -150,6 +151,15 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. return contract.RecordSet{}, contract.Completeness{}, fmt.Errorf("pdb: retrieve: decode steps: %w", err) } + // limit is the caller's hard ceiling (NormalizedQuery.Limit), honored as a WALK + // CEILING (mirrors ncbi-virus #55): a huge upstream is never fully paginated or + // fetched for a small limit (e.g. a --limit 1 count query). A limit strictly + // below the authoritative total makes the result necessarily incomplete (the + // Complete gate still requires fetched == authority), so a limit can never + // produce a false Complete — only a BestEffort partial that still carries the + // authoritative total. 0 = unlimited. + limit := p.Query.Limit + var ( ids []string startCount, endCount, authority int64 @@ -174,6 +184,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. authority = total ids = append(ids, page...) collected += int64(len(page)) + // Walk ceiling: once the collected id count reaches the caller's limit, no + // further pages are needed — but ONLY when the limit is STRICTLY below the + // authoritative total. That guard is load-bearing: it guarantees a limited + // walk is always incomplete (fetched <= limit < authority => BestEffort), so + // a limit can never mask a count mismatch into a false Complete. When limit + // >= authority the limit is non-constraining and the full walk runs. The page + // order is already the server-forced rcsb_id asc sort, so truncating the + // collected prefix is deterministic. + if limit > 0 && authority > 0 && limit < authority && collected >= limit { + break + } // Stop when we have collected the whole reported total, or a page came // back empty (defensive: a 0-row page with collected= authority is skipped so a genuine + // count mismatch still downgrades rather than being silently capped. + if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) > limit { + ids = ids[:limit] + } + raws := make([]contract.RawRecord, 0, len(ids)) for _, id := range ids { body, status, err := c.getEntry(gov, id) @@ -206,7 +237,11 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. } fetched := int64(len(raws)) - if c.reconcile && startCount == endCount && fetched == authority { + // limitApplied: the caller's limit constrained the walk below the authoritative + // total. The total is still authoritative (it came from page 0's total_count); + // the walk was stopped by request, not because the total is untrustworthy. + limitApplied := limit > 0 && authority > 0 && limit < authority + if c.reconcile && !limitApplied && startCount == endCount && fetched == authority { // Reconciled: counts match and the total was stable across the walk. return rs, contract.NewComplete(fetched, authority), nil } @@ -215,12 +250,23 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. switch { case !c.reconcile: reason = fmt.Sprintf("pdb: reconcile_count disabled; fetched %d (no authoritative total to reconcile)", fetched) + case limitApplied: + reason = fmt.Sprintf("pdb: stopped at the requested limit of %d (authoritative total %d) — partial by request, count not reconciled", limit, authority) case startCount != endCount: reason = fmt.Sprintf("pdb: total_count mutated mid-walk (%d->%d); fetched %d — count not reconciled", startCount, endCount, fetched) default: reason = fmt.Sprintf("pdb: fetched %d of %d (some entries missing/404) — count not reconciled", fetched, authority) } - return rs, contract.NewBestEffort(fetched, reason), nil + comp := contract.NewBestEffort(fetched, reason) + if limitApplied { + // The authoritative total IS known (page 0's total_count); expose it so a + // caller asking only for a count gets the real total from a cheap limited + // fetch (the benchmark reads manifest.completeness.authoritative_count). Other + // BestEffort cases leave it zero because their total is absent or proven + // unstable (a mid-walk mutation). + comp.AuthoritativeCount = authority + } + return rs, comp, nil } // searchResponse is the typed view over the RCSB Search API 200 body. Only the @@ -666,10 +712,20 @@ func filtersToNodes(filters []contract.Filter) ([]map[string]any, error) { if f.Operator != r.Op { return nil, fmt.Errorf("pdb: filter field %q uses operator %q; only %q is supported", f.Field, f.Operator, r.Op) } + // The RCSB `in` operator requires an ARRAY value (a scalar is rejected with + // 400 "expects an array"); every other operator takes a scalar. Render the + // value shape per operator so a single-value `in` filter (e.g. + // organism_taxon_id:in:9606) still emits the array form the API demands. + var value any + if r.RCSBOperator == "in" { + value = coerceInValue(r.Type, f.Value) + } else { + value = coerceValue(r.Type, f.Value) + } params := map[string]any{ "attribute": r.RCSBAttribute, "operator": r.RCSBOperator, - "value": coerceValue(r.Type, f.Value), + "value": value, } nodes = append(nodes, map[string]any{ "type": "terminal", @@ -697,6 +753,28 @@ func coerceValue(typ, v string) any { return v } +// coerceInValue renders the value for an RCSB `in` terminal, which REQUIRES an +// array (a bare scalar yields 400 "expects an array"). The normalized filter +// carries comma-separated members in a single value string; each member is +// trimmed and coerced to the rule's element type. Both attributes that use `in` +// in this connector — organism_taxon_id (taxonomy_lineage.id) and +// uniprot_accession (database_accession) — are STRING-typed upstream and reject a +// numeric array element ("expects an array of strings"), so the common case keeps +// each element a string; coerceValue still applies for any future numeric `in` +// rule. An empty member is dropped so a trailing comma does not inject "". +func coerceInValue(typ, v string) []any { + parts := strings.Split(v, ",") + out := make([]any, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + continue + } + out = append(out, coerceValue(typ, p)) + } + return out +} + // --- filter semantics ------------------------------------------------------ // rule is one ORDERED filter-semantics rule mapping a normalized field to an RCSB @@ -741,11 +819,18 @@ func defaultRules() []rule { Type: "string", }, { + // The searchable RCSB taxonomy attribute is taxonomy_lineage.id, NOT + // ncbi_taxonomy_id: the latter has search DISABLED upstream and returns 400 + // ("search is not enabled on [...ncbi_taxonomy_id]") for every operator. The + // lineage attribute is indexed as a STRING and its `in` operator requires an + // array OF STRINGS (an integer array is rejected: "expects an array of + // strings"), so Type is "string" — coerceInValue renders each taxid as a + // string array element rather than an int. Field: "organism_taxon_id", Op: "in", - RCSBAttribute: "rcsb_entity_source_organism.ncbi_taxonomy_id", + RCSBAttribute: "rcsb_entity_source_organism.taxonomy_lineage.id", RCSBOperator: "in", - Type: "integer", + Type: "string", }, { Field: "uniprot_accession", diff --git a/connectors/pdb/pdb_test.go b/connectors/pdb/pdb_test.go index 45e1a49..105e844 100644 --- a/connectors/pdb/pdb_test.go +++ b/connectors/pdb/pdb_test.go @@ -202,6 +202,77 @@ func TestPlan_UnknownFilterFieldRejected(t *testing.T) { } } +// TestPlan_OrganismTaxonIDValidRCSBNode is the regression for the organism_taxon_id +// 400: the RCSB Search API rejects the ncbi_taxonomy_id attribute ("search is not +// enabled") AND rejects an integer/scalar value for the `in` operator. The valid +// terminal uses the taxonomy_lineage.id attribute, the `in` operator, and an ARRAY +// OF STRINGS. This test fails if the rule reverts to ncbi_taxonomy_id, to a non-`in` +// operator, or to a scalar/integer value shape — every one of which the live API +// returns 400 for (verified against search.rcsb.org). +func TestPlan_OrganismTaxonIDValidRCSBNode(t *testing.T) { + c := newConn(t, newFakeDoer()) + q := contract.NormalizedQuery{ + SourceID: "pdb", + Operation: "search", + Filters: []contract.Filter{ + {Field: "organism_taxon_id", Operator: "in", Value: "9606"}, + }, + } + p, err := c.Plan(q) + if err != nil { + t.Fatalf("Plan: %v", err) + } + var st planSteps + if err := json.Unmarshal(p.Steps, &st); err != nil { + t.Fatalf("decode steps: %v", err) + } + var body map[string]any + if err := json.Unmarshal(st.SearchBody, &body); err != nil { + t.Fatalf("decode search body: %v", err) + } + term, _ := body["query"].(map[string]any) + if term == nil { + t.Fatalf("query node missing: %s", st.SearchBody) + } + params, _ := term["parameters"].(map[string]any) + if params == nil { + t.Fatalf("terminal parameters missing: %s", st.SearchBody) + } + // The non-searchable attribute must NOT be emitted (it returns a live 400). + if got := params["attribute"]; got != "rcsb_entity_source_organism.taxonomy_lineage.id" { + t.Fatalf("organism_taxon_id must map to the searchable taxonomy_lineage.id attribute, got %v (body=%s)", got, st.SearchBody) + } + if got := params["operator"]; got != "in" { + t.Fatalf("organism_taxon_id operator = %v, want in", got) + } + // `in` REQUIRES an array (a scalar yields a live 400 "expects an array"). + arr, ok := params["value"].([]any) + if !ok { + t.Fatalf("organism_taxon_id `in` value must be a JSON array, got %T (%v)", params["value"], params["value"]) + } + if len(arr) != 1 { + t.Fatalf("expected exactly one taxid element, got %v", arr) + } + // Element must be a STRING ("9606"), not the integer 9606 (an int array yields a + // live 400 "expects an array of strings"). json.Unmarshal decodes a JSON string + // to a Go string and a JSON number to float64, so a string element is the proof. + if _, isStr := arr[0].(string); !isStr { + t.Fatalf("organism_taxon_id array element must be a JSON string, got %T (%v)", arr[0], arr[0]) + } + if arr[0] != "9606" { + t.Fatalf("organism_taxon_id array element = %v, want \"9606\"", arr[0]) + } + // Belt-and-suspenders: the raw bytes must carry the string form "9606", never a + // bare numeric 9606 nor the dead ncbi_taxonomy_id attribute. + raw := string(st.SearchBody) + if containsAny(raw, "ncbi_taxonomy_id") { + t.Fatalf("search body must not reference the non-searchable ncbi_taxonomy_id attribute: %s", raw) + } + if !containsAny(raw, "[\"9606\"]") { + t.Fatalf("search body must carry the string-array value [\"9606\"]: %s", raw) + } +} + // --- Retrieve: complete / 204 / 404-downgrade / mid-walk / 429 / 500 ------- // stdSearchDoer wires a fake serving the 2-id search page and both entry bodies. @@ -336,6 +407,98 @@ func TestRetrieve_MidWalkMutationDowngrades(t *testing.T) { } } +// TestRetrieve_LimitIsWalkCeiling is the regression for the LIMIT/timeout bug: a +// count-style query (--limit 1) over a huge upstream must NOT fetch every entry. +// The search page reports an authoritative total of 5 but the connector, honoring +// the limit as a walk ceiling, must cap the id list to 1 and call getEntry exactly +// once — never for the other ids. The result is BestEffort (a limited walk can +// never be Complete) yet still exposes the real authoritative_count (5) so a cheap +// count query is correct. This test fails if the limit ceiling is reverted: the old +// code fetched getEntry for every collected id (here 2), and reported no +// authoritative count on the partial. +func TestRetrieve_LimitIsWalkCeiling(t *testing.T) { + // total_count 5 (large authority) but the page returns 2 concrete ids. With + // limit 1, only the FIRST id (server-forced rcsb_id asc order, here 1MBN) is + // fetched; 4HHB must never be requested. + page := mustJSON(map[string]any{ + "query_id": "x", + "result_type": "entry", + "total_count": 5, + "result_set": []any{ + map[string]any{"identifier": "1MBN", "score": 1.0}, + map[string]any{"identifier": "4HHB", "score": 0.9}, + }, + }) + f := newFakeDoer() + f.set("POST", searchURL, 200, page, nil) + f.set("GET", entryURL("1MBN"), 200, loadFixture(t, "entry_1MBN.json"), nil) + // 4HHB is wired so that, IF the connector wrongly fetched it, the test would + // still see the request; we assert it is never requested below. + f.set("GET", entryURL("4HHB"), 200, loadFixture(t, "entry_4HHB.json"), nil) + c := newConn(t, f) + + p := retrievePlan(t, c, contract.NormalizedQuery{SourceID: "pdb", Operation: "search", Limit: 1}) + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + + // Exactly one entry fetched (the limit), not the whole collected set. + if len(rs.Records) != 1 { + t.Fatalf("limit 1 must yield exactly 1 record, got %d", len(rs.Records)) + } + if rs.Records[0].SourceAccession != "1MBN" { + t.Fatalf("limit must keep the first id in the imposed order (1MBN), got %q", rs.Records[0].SourceAccession) + } + // The other id's entry must NEVER be requested — the whole point of the ceiling. + if requestCount(f, "GET", entryURL("4HHB")) != 0 { + t.Fatalf("limit ceiling violated: getEntry(4HHB) was called %d times; a --limit 1 walk must not fetch beyond the limit", requestCount(f, "GET", entryURL("4HHB"))) + } + if requestCount(f, "GET", entryURL("1MBN")) != 1 { + t.Fatalf("expected getEntry(1MBN) exactly once, got %d", requestCount(f, "GET", entryURL("1MBN"))) + } + + // A limited walk is necessarily incomplete: never a false Complete. + if comp.State != contract.BestEffortState { + t.Fatalf("a limited (--limit 1 < total 5) walk must be BestEffort, got %s", comp.State) + } + // ...but the authoritative count MUST still be the upstream total_count (5), so a + // count query is answered correctly from a cheap limited fetch. + if comp.AuthoritativeCount != 5 { + t.Fatalf("BestEffort(limitApplied) must expose authoritative_count=5 (the upstream total), got %d", comp.AuthoritativeCount) + } + if comp.ReconciledCount != 1 { + t.Fatalf("reconciled_count = %d, want 1 (the fetched count under the limit)", comp.ReconciledCount) + } + if !containsAny(comp.Reason, "limit") { + t.Fatalf("BestEffort reason should name the limit, got %q", comp.Reason) + } +} + +// TestRetrieve_LimitAtOrAboveTotalStillCompletes proves the ceiling guard is +// strict: a limit >= the authoritative total is non-constraining, so the full walk +// runs and a reconciled result is still Complete (the limit must not spuriously +// downgrade a genuinely-complete set). Fails if the guard drops the `limit < +// authority` condition. +func TestRetrieve_LimitAtOrAboveTotalStillCompletes(t *testing.T) { + f := stdSearchDoer(t) // total_count 2, two ids, both present + c := newConn(t, f) + p := retrievePlan(t, c, contract.NormalizedQuery{SourceID: "pdb", Operation: "search", Limit: 2}) + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.CompleteState { + t.Fatalf("limit == total must stay Complete, got %s (%s)", comp.State, comp.Reason) + } + if comp.ReconciledCount != 2 || comp.AuthoritativeCount != 2 { + t.Fatalf("expected Complete(2,2), got %d/%d", comp.ReconciledCount, comp.AuthoritativeCount) + } + if len(rs.Records) != 2 { + t.Fatalf("expected 2 records, got %d", len(rs.Records)) + } +} + func TestRetrieve_429ThrottlesThenSucceeds(t *testing.T) { f := stdSearchDoer(t) // The first search POST returns 429 (Retry-After 0 to keep the test fast), @@ -562,6 +725,21 @@ func specWithPageSize(n int) connectors.Spec { return s } +// requestCount returns how many times the fake doer saw a given method+URL, so a +// test can prove the limit ceiling bounded the per-id getEntry fan-out. +func requestCount(f *fakeDoer, method, url string) int { + f.mu.Lock() + defer f.mu.Unlock() + want := routeKey(method, url) + n := 0 + for _, r := range f.requests { + if r == want { + n++ + } + } + return n +} + func containsAny(s string, subs ...string) bool { for _, sub := range subs { if indexOf(s, sub) >= 0 { diff --git a/connectors/uniprot/uniprot.go b/connectors/uniprot/uniprot.go index 5e60784..78b7be2 100644 --- a/connectors/uniprot/uniprot.go +++ b/connectors/uniprot/uniprot.go @@ -17,6 +17,7 @@ package uniprot import ( "bytes" + "compress/gzip" "encoding/json" "fmt" "io" @@ -174,6 +175,13 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. searchPath := len(st.IDs) == 0 + // limit is the caller's hard ceiling (NormalizedQuery.Limit), honored as a WALK + // CEILING per the contract: a huge upstream is never fully fetched for a small + // limit. A limit below the authoritative total makes the result necessarily + // incomplete (the Complete gate below still requires fetched == authority), so a + // limit can never produce a false Complete — only a BestEffort partial. 0 = unlimited. + limit := p.Query.Limit + var ( ids []string startCount, endCount, authority int64 @@ -273,6 +281,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. seen[id] = true ids = append(ids, id) } + // Walk ceiling: once the distinct set reaches the caller's limit, no further + // pages are needed — but ONLY when the limit is STRICTLY below the authoritative + // total. That guard is load-bearing: it guarantees a limited walk is always + // incomplete (fetched <= limit < authority => BestEffort), so the limit can + // never mask an over-observation into a false Complete. When limit >= authority + // the limit is non-constraining and the normal cover/total logic applies. This + // is what makes a count query (--limit 1) fast: page 0 carries the + // authoritative X-Total-Results, so the walk stops after one page. + if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) >= limit { + break + } // Terminate as soon as the distinct set covers the authoritative total (the // PDB-style bound), or a page returns no rows and no next link. if authority > 0 && int64(len(ids)) >= authority { @@ -288,6 +307,25 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. } } + // observedDistinct is the number of DISTINCT accessions the walk enumerated, + // captured BEFORE any limit truncation. A Complete requires this to equal the + // authoritative total: enumerating MORE distinct accessions than the total claims is + // an inconsistency (an under-reported total) that must downgrade even when 404-skipped + // fetches happen to bring the retrieved count back down to the total. (On the + // get/resolve path observedDistinct == authority == len(ids) by construction.) + observedDistinct := int64(len(ids)) + + // Truncate to the first `limit` accessions in the (search-sorted) order, but ONLY + // when limit is STRICTLY below the authoritative total (the same guard as the walk + // ceiling). This bounds the per-accession fetch for a small limit while guaranteeing + // the result stays incomplete (BestEffort). Truncating when limit == authority could + // drop an OVER-observed id (a page returned more rows than the total claims) and + // fake fetched == authority — a false Complete. When limit >= authority we keep every + // observed id so an under-reported total still downgrades. + if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) > limit { + ids = ids[:limit] + } + // Fetch one verbatim /uniprotkb/{accession}.json body per DISTINCT accession. var entryRelease string // first entry release seen (the get/resolve release authority + search-path fallback) raws := make([]contract.RawRecord, 0, len(ids)) @@ -348,7 +386,10 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. } fetched := int64(len(raws)) - if st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !releaseDrift && fetched == authority { + // limitApplied: the caller's limit constrained the walk below the authoritative + // total. Reported as the BestEffort reason (the count is still authoritative). + limitApplied := limit > 0 && totalKnown && authority > 0 && limit < authority + if st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !releaseDrift && observedDistinct == authority && fetched == authority { // Reconciled: the authoritative total was KNOWN and stable across the whole // (bounded, re-list-free) walk, the release was coherent, and every record was // fetched. Only then is the result a true Complete. @@ -361,6 +402,8 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. reason = fmt.Sprintf("uniprot: reconcile_count disabled; fetched %d (no authoritative total to reconcile)", fetched) case !totalKnown: reason = fmt.Sprintf("uniprot: upstream reported no authoritative X-Total-Results; fetched %d — total unknown, cannot claim Complete", fetched) + case limitApplied: + reason = fmt.Sprintf("uniprot: stopped at the requested limit of %d (authoritative total %d) — partial by request, count not reconciled", limit, authority) case walkTruncated: reason = truncReason case sawDuplicate: @@ -369,10 +412,21 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. reason = fmt.Sprintf("uniprot: X-Total-Results mutated mid-walk (start=%d end=%d); fetched %d — count not reconciled", startCount, endCount, fetched) case releaseDrift: reason = fmt.Sprintf("uniprot: X-UniProt-Release drifted mid-walk; fetched %d — release not coherent", fetched) + case observedDistinct != authority: + reason = fmt.Sprintf("uniprot: enumerated %d distinct accessions vs an authoritative total of %d — inconsistent enumeration, count not reconciled", observedDistinct, authority) default: reason = fmt.Sprintf("uniprot: fetched %d of %d (some accessions missing/404) — count not reconciled", fetched, authority) } - return rs, contract.NewBestEffort(fetched, reason), nil + comp := contract.NewBestEffort(fetched, reason) + if limitApplied { + // The authoritative total IS known (the page-0 X-Total-Results header); the walk + // was stopped at the caller's limit by request, not because the total is + // untrustworthy. Expose it so a caller asking only for a count gets the real total + // from a cheap limited fetch. (Other BestEffort cases leave it zero because their + // total is absent or proven unstable.) + comp.AuthoritativeCount = authority + } + return rs, comp, nil } // searchResponse is the typed view over the UniProt search 200 body. We request @@ -530,9 +584,48 @@ func readBody(resp *http.Response) ([]byte, int, http.Header, time.Duration, err if err != nil { return nil, resp.StatusCode, hdr, ra, err } + // UniProt gzips the JSON body and (for this shared HTTP client) it arrives + // undecompressed: the raw gzip bytes (first two bytes 0x1f 0x8b) would fail the + // json decode with "invalid character \x1f looking for beginning of value". + // Decompress here — BEFORE returning to searchPage/getEntry, both of which decode + // JSON — when the body is gzip, detected either by the Content-Encoding header + // containing "gzip" OR by the gzip magic prefix (a missing/incorrect header on a + // gzipped body is exactly the observed failure, so the magic-byte sniff is the + // load-bearing path). + if isGzip(hdr, b) { + decoded, derr := gunzip(b) + if derr != nil { + return nil, resp.StatusCode, hdr, ra, fmt.Errorf("uniprot: decompress gzip body: %w", derr) + } + b = decoded + } return b, resp.StatusCode, hdr, ra, nil } +// isGzip reports whether the body should be gunzipped: either the Content-Encoding +// header advertises gzip, or the body opens with the gzip magic bytes 0x1f 0x8b +// (UniProt may gzip the body while the shared client leaves the header off, so the +// magic-byte sniff is required, not just the header check). +func isGzip(hdr http.Header, b []byte) bool { + if strings.Contains(strings.ToLower(hdr.Get("Content-Encoding")), "gzip") { + return true + } + return len(b) >= 2 && b[0] == 0x1f && b[1] == 0x8b +} + +// gunzip decompresses a gzip-framed body and returns the decompressed bytes. Errors +// (a corrupt or truncated stream) surface to the caller as a read error rather than +// a silent partial — a malformed body must never masquerade as a valid empty/short +// JSON payload. +func gunzip(b []byte) ([]byte, error) { + zr, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, err + } + defer func() { _ = zr.Close() }() + return io.ReadAll(zr) +} + // parseTotal reads the authoritative total from the X-Total-Results response // header. It returns (n, true) only when the header is present and parses to a // non-negative integer; an absent or unparseable header returns (0, false) — the diff --git a/connectors/uniprot/uniprot_test.go b/connectors/uniprot/uniprot_test.go index d2577d5..262ea3c 100644 --- a/connectors/uniprot/uniprot_test.go +++ b/connectors/uniprot/uniprot_test.go @@ -1,6 +1,8 @@ package uniprot import ( + "bytes" + "compress/gzip" "encoding/json" "errors" "net/url" @@ -15,6 +17,22 @@ import ( "pinakes.sh/pinakes/engine/contract" ) +// gzipBytes gzip-compresses b, exactly as UniProt delivers a gzipped JSON body. It +// is the deterministic seam the gzip regression tests use to prove readBody +// decompresses before decoding. +func gzipBytes(t *testing.T, b []byte) []byte { + t.Helper() + var buf bytes.Buffer + zw := gzip.NewWriter(&buf) + if _, err := zw.Write(b); err != nil { + t.Fatalf("gzip write: %v", err) + } + if err := zw.Close(); err != nil { + t.Fatalf("gzip close: %v", err) + } + return buf.Bytes() +} + // uniprotSpec returns the connectors.Spec equivalent to connectors/uniprot.yaml. // It is asserted to pass Spec.Validate in TestSpec_Validates, so the declarative // YAML shape is exercised in CI without adding a YAML dependency to the engine. @@ -971,6 +989,175 @@ func TestRetrieve_NoSearchReleaseEntryDriftDowngrades(t *testing.T) { } } +// --- gzip body decompression (bug 1) --------------------------------------- + +// TestRetrieve_GzipSearchBodyDecodes proves the search body arrives gzip-framed +// (first bytes 0x1f 0x8b) — exactly how UniProt delivers it through the shared +// client — and is decompressed by readBody BEFORE the JSON decode, so the walk +// reconciles Complete instead of failing the decode with +// "invalid character \x1f looking for beginning of value". FAILS if the gzip +// handling in readBody is reverted (the raw 0x1f byte reaches json.Decode). +func TestRetrieve_GzipSearchBodyDecodes(t *testing.T) { + f := newFakeDoer() + gz := gzipBytes(t, loadFixture(t, "search_2ids.json")) + // Sanity: the body really is gzip-framed (the regression is only meaningful if + // the bytes the connector reads start with the gzip magic prefix). + if len(gz) < 2 || gz[0] != 0x1f || gz[1] != 0x8b { + t.Fatalf("test fixture is not gzip-framed: % x", gz[:min(2, len(gz))]) + } + // Content-Encoding header advertises gzip (the header-driven path). + f.set("GET", searchURL(500), 200, gz, map[string]string{ + headerTotal: "2", headerRelease: "2026_02", "Content-Encoding": "gzip", + }) + f.set("GET", entryURL("A0A0C5B5G6"), 200, loadFixture(t, "entry_A0A0C5B5G6.json"), hdr("", "2026_02", "")) + f.set("GET", entryURL("P0DPR3"), 200, loadFixture(t, "entry_P0DPR3.json"), hdr("", "2026_02", "")) + c := newConn(t, f) + p := retrievePlan(t, c, contract.NormalizedQuery{SourceID: "uniprot", Operation: "search"}) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("a gzipped search body must be decompressed, not fail the decode: %v", err) + } + if comp.State != contract.CompleteState || comp.AuthoritativeCount != 2 { + t.Fatalf("expected Complete(2,2) from the decompressed search page, got %s auth=%d (%s)", comp.State, comp.AuthoritativeCount, comp.Reason) + } + if len(rs.Records) != 2 { + t.Fatalf("expected 2 records from the decompressed search walk, got %d", len(rs.Records)) + } +} + +// TestRetrieve_GzipBodyNoHeaderStillDecodes proves the magic-byte sniff is +// load-bearing: when the gzipped body carries NO Content-Encoding header (the +// observed failure mode — the body is gzipped but the shared client leaves the +// header off), readBody must still detect the 0x1f 0x8b prefix and decompress. +// FAILS if the magic-byte branch of isGzip is reverted (only the header check +// remains, the raw gzip bytes reach json.Decode). +func TestRetrieve_GzipBodyNoHeaderStillDecodes(t *testing.T) { + f := newFakeDoer() + gz := gzipBytes(t, loadFixture(t, "search_2ids.json")) + // No Content-Encoding header — only the gzip magic bytes signal compression. + f.set("GET", searchURL(500), 200, gz, hdr("2", "2026_02", "")) + f.set("GET", entryURL("A0A0C5B5G6"), 200, loadFixture(t, "entry_A0A0C5B5G6.json"), hdr("", "2026_02", "")) + f.set("GET", entryURL("P0DPR3"), 200, loadFixture(t, "entry_P0DPR3.json"), hdr("", "2026_02", "")) + c := newConn(t, f) + p := retrievePlan(t, c, contract.NormalizedQuery{SourceID: "uniprot", Operation: "search"}) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("a gzipped body with no Content-Encoding header must still be sniffed and decompressed: %v", err) + } + if comp.State != contract.CompleteState || len(rs.Records) != 2 { + t.Fatalf("expected Complete with 2 records from the magic-byte-detected gzip body, got %s recs=%d (%s)", comp.State, len(rs.Records), comp.Reason) + } +} + +// TestGetEntry_GzipBodyDecodes proves readBody's gzip handling also covers the +// per-entry body (search and getEntry share readBody): a gzipped +// /uniprotkb/{acc}.json body decodes and normalizes, never failing the decode. +// FAILS if gzip handling in readBody is reverted. +func TestGetEntry_GzipBodyDecodes(t *testing.T) { + f := newFakeDoer() + gzEntry := gzipBytes(t, loadFixture(t, "entry_P0DPR3.json")) + f.set("GET", entryURL("P0DPR3"), 200, gzEntry, map[string]string{ + headerRelease: "2026_02", "Content-Encoding": "gzip", + }) + c := newConn(t, f) + p := retrievePlan(t, c, contract.NormalizedQuery{SourceID: "uniprot", Operation: "get", IDs: []string{"P0DPR3"}}) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("a gzipped entry body must be decompressed: %v", err) + } + if comp.State != contract.CompleteState { + t.Fatalf("expected Complete from the decompressed entry, got %s (%s)", comp.State, comp.Reason) + } + // The decompressed bytes must be valid for Normalize (proves it really decoded). + nr, err := c.Normalize(rs) + if err != nil { + t.Fatalf("normalize the decompressed entry: %v", err) + } + if len(nr.Records) != 1 || nr.Records[0].Fields["primary_accession"] != "P0DPR3" { + t.Fatalf("decompressed entry did not normalize correctly: %+v", nr.Records) + } +} + +// --- limit as a walk ceiling (bug 2) --------------------------------------- + +// TestRetrieve_LimitBoundsWalkAndExposesAuthority proves a count query +// (--limit 1) against a large upstream stops after the FIRST page yet still +// reports the upstream authoritative total. The fake routes 5 distinct pages +// (total=20000); honoring the limit as a walk ceiling means only page 0 is +// listed, so its X-Total-Results (20000) is exposed as AuthoritativeCount on the +// BestEffort result and at most 1 entry is fetched. FAILS if the limit ceiling is +// reverted: the walk would follow every rel="next" (calling far more than the +// ceiling allows) and the BestEffort would carry AuthoritativeCount=0. +func TestRetrieve_LimitBoundsWalkAndExposesAuthority(t *testing.T) { + f := newFakeDoer() + const total = 20000 + // Build a chain of 5 pages, each listing one fresh accession and a fresh + // rel="next". Without the limit ceiling the walk would page well past page 0. + accs := []string{"P00001", "P00002", "P00003", "P00004", "P00005"} + for i := range accs { + this := restBase + "/uniprotkb/search?cursor=P" + strconv.Itoa(i) + "&fields=accession&format=json&query=%2A&size=500&sort=accession+asc" + if i == 0 { + this = searchURL(500) + } + nxt := restBase + "/uniprotkb/search?cursor=P" + strconv.Itoa(i+1) + "&fields=accession&format=json&query=%2A&size=500&sort=accession+asc" + body := []byte(`{"results":[{"primaryAccession":"` + accs[i] + `"}]}`) + f.set("GET", this, 200, body, hdr(strconv.Itoa(total), "2026_02", "<"+nxt+">; rel=\"next\"")) + } + // Entry body for the single accession the ceiling permits fetching. + f.set("GET", entryURL("P00001"), 200, loadFixture(t, "entry_P0DPR3.json"), hdr("", "2026_02", "")) + c := newConn(t, f) + + p := retrievePlan(t, c, contract.NormalizedQuery{SourceID: "uniprot", Operation: "search", Limit: 1}) + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve with limit: %v", err) + } + // A limit below the total can never be a false Complete — it must be BestEffort. + if comp.State != contract.BestEffortState { + t.Fatalf("a limit below the authoritative total must be BestEffort, got %s", comp.State) + } + // The whole point of the fix: the upstream total is exposed despite the cheap walk. + if comp.AuthoritativeCount != total { + t.Fatalf("limited walk must still expose the authoritative total %d, got %d", total, comp.AuthoritativeCount) + } + if !strings.Contains(comp.Reason, "limit") { + t.Fatalf("BestEffort reason should name the limit, got %q", comp.Reason) + } + if int64(len(rs.Records)) > 1 { + t.Fatalf("limit 1 must fetch at most 1 record, got %d", len(rs.Records)) + } + // The walk must be bounded: page 0 (1 list call) + at most 1 entry fetch. Without + // the ceiling the walk would list all 5 routed pages (and any further rel="next"). + if f.callCount() > 3 { + t.Fatalf("limit ceiling did not bound the walk: %d upstream calls (expected ~2: one list page + one entry)", f.callCount()) + } +} + +// TestRetrieve_LimitAtOrAboveTotalStillCompletes proves the ceiling guard is +// strict (limit < authority): a limit >= the authoritative total is +// non-constraining, so the walk drains fully and reconciles Complete — the limit +// must NOT downgrade a result that genuinely covers the whole set. +func TestRetrieve_LimitAtOrAboveTotalStillCompletes(t *testing.T) { + f := stdSearchDoer(t) // total=2, single page, two entries + c := newConn(t, f) + // limit 5 >= total 2 => non-constraining. + p := retrievePlan(t, c, contract.NormalizedQuery{SourceID: "uniprot", Operation: "search", Limit: 5}) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.CompleteState || comp.AuthoritativeCount != 2 { + t.Fatalf("a limit >= total must not downgrade; expected Complete(2,2), got %s auth=%d (%s)", comp.State, comp.AuthoritativeCount, comp.Reason) + } + if len(rs.Records) != 2 { + t.Fatalf("expected both records fetched, got %d", len(rs.Records)) + } +} + // --- helpers --------------------------------------------------------------- func rawSetFromFixtures(t *testing.T, idToFile map[string]string) contract.RecordSet {