From a05d19cf4bf46e9005fb6b1106a0aad1b8f5f194 Mon Sep 17 00:00:00 2001 From: Tristan Farmer <159447266+001TMF@users.noreply.github.com> Date: Mon, 15 Jun 2026 10:10:14 +0100 Subject: [PATCH 1/2] feat(ncbivirus): native local filters + Filtered completeness; fix multi-page total reconciliation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The NCBI Datasets v2 virus API cannot evaluate sequence-length, collection-date range, ambiguous-character, or lab-passaged predicates server-side — exactly the filters VirBench's hard queries exercise. Previously such a query could only be run partially (post-hoc filtering outside the engine). This adds them as deterministic LOCAL predicates: the connector scans the COMPLETE, count-reconciled candidate set and applies the predicates exhaustively, reporting a new FilteredState whose AuthoritativeCount is the EXACT match count (computed over a whole reconciled scan, not sampled). The caller's limit caps only the returned records, so `search --limit 1` still yields the exact total. The ambiguous-character count uses the sequence from a new efetch endpoint (the sequence never enters a RawRecord or the hash; it is immutable per accession.version, so the count is reproducible). Cheap report-only predicates run first; sequences are fetched only for survivors. Also fixes a latent multi-page reconciliation bug surfaced by this work: NCBI returns x-ncbi-total-count ONLY on the first page of a token walk; continuation pages omit it (but keep x-datasets-version). The walk previously read that absence as total=0 / a mid-walk mutation and downgraded EVERY >1000-record query to best-effort. The page-0 total is now authoritative for the whole walk; only a PRESENT, differing total signals a real mutation. (Prior multi-page fixtures set the total on every page, masking this; a new test reflects real NCBI behavior.) 
And corrects a latent contract bug: CompletenessState now serializes as its string token (matching schema/manifest.schema.json) rather than the raw uint8 ordinal, with a backward-compatible reader. The manifest is not content-addressed, so no snapshot hash changes. Review fixes (cross-model: codex/GPT-5.5 + adversarial): reject a repeated (field, operator) pair (last-write-wins could relax a bound and corrupt the exact count); validate calendar days per-month with leap years in normalizeDate; and propagate the FASTA scanner error so a truncated read cannot undercount N. - engine/contract: FilteredState + NewFiltered + IsExact + JSON token codec - connectors/ncbivirus: sequence_length, collection_date (gte/lte), max_ambiguous_chars, exclude_lab_passaged; multi-operator-per-field rule table; efetch endpoint; first-page-total reconciliation fix - schema: add "filtered" to the completeness state enum + reason rule - case-studies/ebola: refresh filter_logic_hash (semantics changed; intended drift) Tests cover the full local-filter path, the multi-page total-header behavior, predicate units, and the contract codec — all proven-by-revert. make ci green. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- case-studies/ebola/manifest.json | 4 +- connectors/ncbivirus/fake_test.go | 4 +- connectors/ncbivirus/localfilter_test.go | 497 +++++++++++++ connectors/ncbivirus/ncbivirus.go | 672 ++++++++++++++++-- connectors/ncbivirus/ncbivirus_test.go | 34 + engine/contract/completeness.go | 139 +++- engine/contract/completeness_filtered_test.go | 128 ++++ schema/manifest.schema.json | 12 +- surface/provider/specs/ncbi-virus.yaml | 8 + 9 files changed, 1436 insertions(+), 62 deletions(-) create mode 100644 connectors/ncbivirus/localfilter_test.go create mode 100644 engine/contract/completeness_filtered_test.go diff --git a/case-studies/ebola/manifest.json b/case-studies/ebola/manifest.json index a73b3e3..77baa91 100644 --- a/case-studies/ebola/manifest.json +++ b/case-studies/ebola/manifest.json @@ -2,7 +2,7 @@ "manifest_schema_version": "1.0.0", "engine_version": "pinakes/0.1.0", "connector_spec_version": "ncbi-virus/1.0.0", - "filter_logic_hash": "sha256:089fe8548c81dc1e871e08175f7a567e7cb9e94c7f6e32f6f1f8e597d198c4ca", + "filter_logic_hash": "sha256:82248fc9f8a959e1e53172618961d757ceb65afd2d8f5f9c98b9bb1f80a91039", "serializer_codec": { "serializer_version": "1.0.0", "digest_algorithm": "sha256", @@ -49,7 +49,7 @@ "logical_record_hash": "sha256:01fffd0c228c9f46e28f472a75b2e58b978ef9e565909f2838aa507bd44120e9", "schema_version": "sequences/1.0.0", "completeness": { - "state": 0, + "state": "complete", "reconciled_count": 32, "authoritative_count": 32 }, diff --git a/connectors/ncbivirus/fake_test.go b/connectors/ncbivirus/fake_test.go index 4ec9027..a102f95 100644 --- a/connectors/ncbivirus/fake_test.go +++ b/connectors/ncbivirus/fake_test.go @@ -144,9 +144,9 @@ const restBase = "https://api.ncbi.nlm.nih.gov" func listURLForTaxon(t *testing.T, taxon string, pageSize int64) string { t.Helper() c := &Connector{rest: restBase, pageSize: pageSize} - u, err := c.buildListURL([]contract.Filter{{Field: "organism_taxon_id", Operator: 
"eq", Value: taxon}}) + u, _, err := c.planSearch([]contract.Filter{{Field: "organism_taxon_id", Operator: "eq", Value: taxon}}) if err != nil { - t.Fatalf("buildListURL: %v", err) + t.Fatalf("planSearch: %v", err) } return u } diff --git a/connectors/ncbivirus/localfilter_test.go b/connectors/ncbivirus/localfilter_test.go new file mode 100644 index 0000000..72091b7 --- /dev/null +++ b/connectors/ncbivirus/localfilter_test.go @@ -0,0 +1,497 @@ +package ncbivirus + +// Tests for the LOCAL-filter path: predicates the NCBI Datasets API cannot evaluate +// server-side (sequence_length, collection_date range, max_ambiguous_chars, +// exclude_lab_passaged), which Retrieve applies to the COMPLETE reconciled candidate +// set and reports as a FilteredState result with an exact match count. Fixtures are +// synthesized inline so each case is self-contained and readable. + +import ( + "bytes" + "encoding/json" + "net/url" + "strings" + "testing" + + "pinakes.sh/pinakes/connectors" + "pinakes.sh/pinakes/engine/contract" +) + +// --- inline fixture builders ----------------------------------------------- + +// reportLine builds one per-accession dataset_report ndjson object. collDate "" omits +// the collection_date so the record is undated. +func reportLine(acc string, length int, name, collDate string) []byte { + iso := map[string]any{"name": name} + if collDate != "" { + iso["collection_date"] = collDate + } + b, err := json.Marshal(map[string]any{ + "accession": acc, + "length": length, + "isolate": iso, + }) + if err != nil { + panic(err) + } + return b +} + +// listPageNDJSON builds a list-page body: one {"accession":...} line per accession. +func listPageNDJSON(accs ...string) []byte { + var b bytes.Buffer + for _, a := range accs { + b.WriteString(`{"accession":"`) + b.WriteString(a) + b.WriteString(`"}` + "\n") + } + return b.Bytes() +} + +// efetchURL is the exact URL the connector forms for one accession's FASTA. 
+func efetchURL(acc string) string { + v := url.Values{} + v.Set("db", "nuccore") + v.Set("id", acc) + v.Set("rettype", "fasta") + v.Set("retmode", "text") + return "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?" + v.Encode() +} + +// fastaBody builds a FASTA payload with `ambig` ambiguous (N) bases padded with A's. +func fastaBody(acc string, ambig int) []byte { + seq := strings.Repeat("N", ambig) + strings.Repeat("A", 100) + return []byte(">" + acc + " synthetic\n" + seq + "\n") +} + +func specWithEfetch() connectors.Spec { + s := ncbiVirusSpec() + s.Endpoints["efetch"] = "https://eutils.ncbi.nlm.nih.gov" + return s +} + +func newConnEfetch(t *testing.T, doer httpDoer) *Connector { + t.Helper() + c, err := New(specWithEfetch(), doer) + if err != nil { + t.Fatalf("New: %v", err) + } + return c +} + +func accsOf(rs contract.RecordSet) []string { + out := make([]string, 0, len(rs.Records)) + for _, r := range rs.Records { + out = append(out, r.SourceAccession) + } + return out +} + +// --- Plan: server vs local separation -------------------------------------- + +func TestPlan_LocalFiltersSeparatedFromServerParams(t *testing.T) { + c := newConnEfetch(t, newFakeDoer()) + q := contract.NormalizedQuery{ + SourceID: "ncbi-virus", + Operation: "search", + Filters: []contract.Filter{ + {Field: "organism_taxon_id", Operator: "eq", Value: "3052462"}, + {Field: "host", Operator: "eq", Value: "human"}, + {Field: "sequence_length", Operator: "gte", Value: "15200"}, + {Field: "collection_date", Operator: "gte", Value: "2014-01-01"}, + {Field: "collection_date", Operator: "lte", Value: "2014-06-20"}, + {Field: "max_ambiguous_chars", Operator: "lte", Value: "1900"}, + {Field: "exclude_lab_passaged", Operator: "eq", Value: "true"}, + }, + } + p, err := c.Plan(q) + if err != nil { + t.Fatalf("Plan: %v", err) + } + var st planSteps + if err := json.Unmarshal(p.Steps, &st); err != nil { + t.Fatalf("decode steps: %v", err) + } + // The taxon path + the server-side host 
filter belong in the URL; the four local + // predicates must NOT leak into the upstream query string. + if !strings.Contains(st.ListURL, "/taxon/3052462/") { + t.Fatalf("taxon must scope the list URL, got %q", st.ListURL) + } + if !strings.Contains(st.ListURL, "filter.host=human") { + t.Fatalf("host is server-side and must be in the list URL, got %q", st.ListURL) + } + for _, leaked := range []string{"sequence_length", "collection_date", "max_ambiguous", "exclude_lab"} { + if strings.Contains(st.ListURL, leaked) { + t.Fatalf("local predicate %q leaked into the upstream URL %q", leaked, st.ListURL) + } + } + if len(st.LocalFilters) != 5 { // 2 length-range? no: 1 length + 2 date + 1 ambig + 1 lab = 5 + t.Fatalf("expected 5 local predicates carried in the plan, got %d: %+v", len(st.LocalFilters), st.LocalFilters) + } +} + +func TestPlan_MaxAmbiguousRequiresEfetchEndpoint(t *testing.T) { + // A connector WITHOUT the efetch endpoint must reject max_ambiguous_chars as a + // caller-fixable (unsatisfiable) query, not fail at retrieve time. + c := newConn(t, newFakeDoer()) // ncbiVirusSpec has no efetch endpoint + _, err := c.Plan(contract.NormalizedQuery{ + SourceID: "ncbi-virus", + Operation: "search", + Filters: []contract.Filter{ + {Field: "organism_taxon_id", Operator: "eq", Value: "3052462"}, + {Field: "max_ambiguous_chars", Operator: "lte", Value: "1900"}, + }, + }) + if err == nil { + t.Fatal("max_ambiguous_chars without an efetch endpoint must be rejected") + } +} + +func TestPlan_RangeOperatorsOnOneFieldAccepted(t *testing.T) { + // A two-sided range uses the same field under two operators; both must be accepted. 
+ c := newConnEfetch(t, newFakeDoer()) + _, err := c.Plan(contract.NormalizedQuery{ + SourceID: "ncbi-virus", + Operation: "search", + Filters: []contract.Filter{ + {Field: "organism_taxon_id", Operator: "eq", Value: "3052462"}, + {Field: "sequence_length", Operator: "gte", Value: "15200"}, + {Field: "sequence_length", Operator: "lte", Value: "20000"}, + }, + }) + if err != nil { + t.Fatalf("two-sided sequence_length range must be accepted: %v", err) + } +} + +func TestPlan_DuplicateFieldOperatorRejected(t *testing.T) { + // Two bounds under the SAME (field, operator) would silently overwrite (last + // wins) and could relax a constraint, corrupting the exact match count. + c := newConnEfetch(t, newFakeDoer()) + _, err := c.Plan(contract.NormalizedQuery{ + SourceID: "ncbi-virus", + Operation: "search", + Filters: []contract.Filter{ + {Field: "organism_taxon_id", Operator: "eq", Value: "3052462"}, + {Field: "sequence_length", Operator: "gte", Value: "15200"}, + {Field: "sequence_length", Operator: "gte", Value: "10000"}, // duplicate (field,op) + }, + }) + if err == nil { + t.Fatal("a repeated (field, operator) must be rejected, not silently overwritten") + } +} + +func TestPlan_BadCollectionDateRejected(t *testing.T) { + c := newConnEfetch(t, newFakeDoer()) + _, err := c.Plan(contract.NormalizedQuery{ + SourceID: "ncbi-virus", + Operation: "search", + Filters: []contract.Filter{ + {Field: "organism_taxon_id", Operator: "eq", Value: "3052462"}, + {Field: "collection_date", Operator: "gte", Value: "not-a-date"}, + }, + }) + if err == nil { + t.Fatal("an unparseable collection_date must be rejected at plan time") + } +} + +// --- Retrieve: FilteredState with an exact match count ---------------------- + +// localQuery is a search over taxon 3052462 with the given local filters. 
+func localQuery(local ...contract.Filter) contract.NormalizedQuery { + return contract.NormalizedQuery{ + SourceID: "ncbi-virus", + Operation: "search", + Filters: append([]contract.Filter{ + {Field: "organism_taxon_id", Operator: "eq", Value: "3052462"}, + }, local...), + } +} + +func TestRetrieve_LocalFilter_FilteredStateExactCount(t *testing.T) { + f := newFakeDoer() + // 6 candidates; only A1 and A5 satisfy length>=15200 AND date in [2014-01-01,2014-06-20]. + f.set("GET", listURLForTaxon(t, "3052462", 1000), 200, + listPageNDJSON("A1", "A2", "A3", "A4", "A5", "A6"), hdr("6", "", "18.30.1")) + f.set("GET", reportURL("A1"), 200, reportLine("A1", 18000, "x", "2014-03-15"), hdr("", "", "18.30.1")) // match + f.set("GET", reportURL("A2"), 200, reportLine("A2", 15000, "x", "2014-03-15"), hdr("", "", "18.30.1")) // too short + f.set("GET", reportURL("A3"), 200, reportLine("A3", 16000, "x", "2013-12-01"), hdr("", "", "18.30.1")) // before window + f.set("GET", reportURL("A4"), 200, reportLine("A4", 16000, "x", "2014-07-01"), hdr("", "", "18.30.1")) // after window + f.set("GET", reportURL("A5"), 200, reportLine("A5", 16000, "x", "2014-02"), hdr("", "", "18.30.1")) // partial date, in window + f.set("GET", reportURL("A6"), 200, reportLine("A6", 16000, "x", ""), hdr("", "", "18.30.1")) // undated -> excluded + + c := newConnEfetch(t, f) + q := localQuery( + contract.Filter{Field: "sequence_length", Operator: "gte", Value: "15200"}, + contract.Filter{Field: "collection_date", Operator: "gte", Value: "2014-01-01"}, + contract.Filter{Field: "collection_date", Operator: "lte", Value: "2014-06-20"}, + ) + p := retrievePlan(t, c, q) + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.FilteredState { + t.Fatalf("want FilteredState, got %s (%s)", comp.State, comp.Reason) + } + if comp.AuthoritativeCount != 2 { + t.Fatalf("exact match count must be 2 (A1,A5), got %d — reason: %s", comp.AuthoritativeCount, 
comp.Reason) + } + if comp.ReconciledCount != 2 { + t.Fatalf("returned count must be 2, got %d", comp.ReconciledCount) + } + if got := accsOf(rs); len(got) != 2 || got[0] != "A1" || got[1] != "A5" { + t.Fatalf("matched records must be [A1 A5] (accession-sorted), got %v", got) + } + if !comp.IsExact() { + t.Fatal("a FilteredState result must report IsExact()=true") + } + if comp.IsComplete() { + t.Fatal("a FilteredState result must NOT report IsComplete()=true") + } +} + +// TestRetrieve_LocalFilter_FullScanIgnoresLimit is the load-bearing benchmark case: a +// `--limit 1` query still scans the WHOLE candidate set so the exact match count is +// reported (via AuthoritativeCount), with only the returned records truncated. +func TestRetrieve_LocalFilter_FullScanIgnoresLimit(t *testing.T) { + f := newFakeDoer() + f.set("GET", listURLForTaxon(t, "3052462", 1000), 200, + listPageNDJSON("A1", "A2", "A3"), hdr("3", "", "18.30.1")) + f.set("GET", reportURL("A1"), 200, reportLine("A1", 18000, "x", "2014-03-15"), hdr("", "", "18.30.1")) // match + f.set("GET", reportURL("A2"), 200, reportLine("A2", 18000, "x", "2014-03-16"), hdr("", "", "18.30.1")) // match + f.set("GET", reportURL("A3"), 200, reportLine("A3", 10000, "x", "2014-03-17"), hdr("", "", "18.30.1")) // too short + + c := newConnEfetch(t, f) + q := localQuery(contract.Filter{Field: "sequence_length", Operator: "gte", Value: "15200"}) + q.Limit = 1 + p := retrievePlan(t, c, q) + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.FilteredState { + t.Fatalf("want FilteredState even under a limit, got %s", comp.State) + } + if comp.AuthoritativeCount != 2 { + t.Fatalf("the FULL match count (2) must survive the limit, got %d", comp.AuthoritativeCount) + } + if comp.ReconciledCount != 1 || len(rs.Records) != 1 { + t.Fatalf("the output (not the count) must be limited to 1, got reconciled=%d records=%d", comp.ReconciledCount, len(rs.Records)) + } + if 
accsOf(rs)[0] != "A1" { + t.Fatalf("the single returned record must be the first sorted match (A1), got %v", accsOf(rs)) + } +} + +// TestRetrieve_LocalFilter_MaxAmbiguousFetchesOnlySurvivors proves the sequence is +// fetched ONLY for records that pass the cheap report-only predicates, and the +// ambiguous-base ceiling drops the noisy one. +func TestRetrieve_LocalFilter_MaxAmbiguousFetchesOnlySurvivors(t *testing.T) { + f := newFakeDoer() + f.set("GET", listURLForTaxon(t, "3052462", 1000), 200, + listPageNDJSON("B1", "B2", "B3"), hdr("3", "", "18.30.1")) + f.set("GET", reportURL("B1"), 200, reportLine("B1", 18000, "x", "2014-03-15"), hdr("", "", "18.30.1")) // passes report-only + f.set("GET", reportURL("B2"), 200, reportLine("B2", 18000, "x", "2014-03-15"), hdr("", "", "18.30.1")) // passes report-only + f.set("GET", reportURL("B3"), 200, reportLine("B3", 10000, "x", "2014-03-15"), hdr("", "", "18.30.1")) // too short -> no efetch + // B1 has 2 N's (<=5, kept), B2 has 10 N's (>5, dropped). B3's efetch is deliberately + // NOT registered: if the connector fetched it, the unrouted 404 would fail the test. + f.set("GET", efetchURL("B1"), 200, fastaBody("B1", 2), nil) + f.set("GET", efetchURL("B2"), 200, fastaBody("B2", 10), nil) + + c := newConnEfetch(t, f) + q := localQuery( + contract.Filter{Field: "sequence_length", Operator: "gte", Value: "15200"}, + contract.Filter{Field: "max_ambiguous_chars", Operator: "lte", Value: "5"}, + ) + p := retrievePlan(t, c, q) + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.FilteredState || comp.AuthoritativeCount != 1 { + t.Fatalf("want Filtered with 1 match (B1), got %s count=%d (%s)", comp.State, comp.AuthoritativeCount, comp.Reason) + } + if got := accsOf(rs); len(got) != 1 || got[0] != "B1" { + t.Fatalf("only B1 should survive the ambiguous-base ceiling, got %v", got) + } + // Exactly two efetch calls (B1, B2). 
B3 was dropped by the cheap predicate first. + var efetchCalls int + for _, r := range f.requests { + if strings.Contains(r, "efetch.fcgi") { + efetchCalls++ + } + } + if efetchCalls != 2 { + t.Fatalf("sequence must be fetched only for report-survivors (B1,B2) => 2 calls, got %d", efetchCalls) + } +} + +func TestRetrieve_LocalFilter_ExcludeLabPassaged(t *testing.T) { + f := newFakeDoer() + f.set("GET", listURLForTaxon(t, "3052462", 1000), 200, + listPageNDJSON("C1", "C2"), hdr("2", "", "18.30.1")) + f.set("GET", reportURL("C1"), 200, reportLine("C1", 18000, "Ebola/field/2014", "2014-03-15"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("C2"), 200, reportLine("C2", 18000, "Ebola/Vero-passage/2014", "2014-03-15"), hdr("", "", "18.30.1")) + + c := newConnEfetch(t, f) + q := localQuery(contract.Filter{Field: "exclude_lab_passaged", Operator: "eq", Value: "true"}) + p := retrievePlan(t, c, q) + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.AuthoritativeCount != 1 || accsOf(rs)[0] != "C1" { + t.Fatalf("the lab-passaged isolate (C2) must be excluded; want [C1], got %v (count=%d)", accsOf(rs), comp.AuthoritativeCount) + } +} + +// TestRetrieve_LocalFilter_UnreconciledCandidateDowngrades: if the candidate scan +// itself does not reconcile (a report 404s), the filtered result cannot claim to be +// exhaustive, so it downgrades to BestEffort rather than Filtered. 
+func TestRetrieve_LocalFilter_UnreconciledCandidateDowngrades(t *testing.T) { + f := newFakeDoer() + f.set("GET", listURLForTaxon(t, "3052462", 1000), 200, + listPageNDJSON("A1", "A2", "A3"), hdr("3", "", "18.30.1")) // claims 3 + f.set("GET", reportURL("A1"), 200, reportLine("A1", 18000, "x", "2014-03-15"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("A2"), 200, reportLine("A2", 18000, "x", "2014-03-15"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("A3"), 404, nil, hdr("", "", "18.30.1")) // missing -> candidate scan != authoritative + + c := newConnEfetch(t, f) + q := localQuery(contract.Filter{Field: "sequence_length", Operator: "gte", Value: "15200"}) + p := retrievePlan(t, c, q) + _, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.BestEffortState { + t.Fatalf("an unreconciled candidate scan must downgrade a filtered result to BestEffort, got %s", comp.State) + } +} + +// TestRetrieve_NoLocalFilters_UnchangedComplete confirms the legacy path is untouched +// when no local predicates are present (a plain reconciled Complete). 
+func TestRetrieve_NoLocalFilters_UnchangedComplete(t *testing.T) { + f := newFakeDoer() + f.set("GET", listURLForTaxon(t, "3052462", 1000), 200, + listPageNDJSON("A1", "A2"), hdr("2", "", "18.30.1")) + f.set("GET", reportURL("A1"), 200, reportLine("A1", 18000, "x", "2014-03-15"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("A2"), 200, reportLine("A2", 15000, "x", "2014-03-15"), hdr("", "", "18.30.1")) + + c := newConnEfetch(t, f) + p := retrievePlan(t, c, localQuery()) // taxon only, no local filters + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.CompleteState || comp.ReconciledCount != 2 || len(rs.Records) != 2 { + t.Fatalf("no-local-filter path must stay a plain Complete(2), got %s reconciled=%d records=%d", comp.State, comp.ReconciledCount, len(rs.Records)) + } +} + +// --- predicate unit tests -------------------------------------------------- + +func TestNormalizeDate(t *testing.T) { + cases := []struct { + in string + lo, hi int + ok bool + }{ + {"2014-06-20", 20140620, 20140620, true}, + {"2014-02", 20140201, 20140228, true}, // non-leap Feb widens to the 28th + {"2016-02", 20160201, 20160229, true}, // leap Feb widens to the 29th + {"2014", 20140101, 20141231, true}, + {"2014-03-15T12:00:00Z", 20140315, 20140315, true}, + {"2016-02-29", 20160229, 20160229, true}, // valid leap day + {"", 0, 0, false}, + {"not-a-date", 0, 0, false}, + {"2014-13", 0, 0, false}, + {"2014-00", 0, 0, false}, + {"2014-06-32", 0, 0, false}, + {"2014-02-31", 0, 0, false}, // impossible day rejected + {"2014-04-31", 0, 0, false}, // April has 30 days + {"2014-02-29", 0, 0, false}, // 2014 is not a leap year + {"2014-06-", 0, 0, false}, + } + for _, tc := range cases { + lo, hi, ok := normalizeDate(tc.in) + if ok != tc.ok || (ok && (lo != tc.lo || hi != tc.hi)) { + t.Errorf("normalizeDate(%q) = (%d,%d,%v), want (%d,%d,%v)", tc.in, lo, hi, ok, tc.lo, tc.hi, tc.ok) + } + } +} + +func TestDateMatches(t 
*testing.T) { + gte := 20140101 + lte := 20140620 + cases := []struct { + date string + want bool + }{ + {"2014-03-15", true}, + {"2014-01-01", true}, // inclusive lower + {"2014-06-20", true}, // inclusive upper + {"2013-12-31", false}, // before + {"2014-06-21", false}, // after + {"2014-02", true}, // partial overlaps + {"2014", true}, // whole-year interval overlaps the window + {"2015", false}, // whole-year after the window + {"", false}, // undated excluded when a bound is present + } + for _, tc := range cases { + if got := dateMatches(tc.date, &gte, &lte); got != tc.want { + t.Errorf("dateMatches(%q) = %v, want %v", tc.date, got, tc.want) + } + } + // No bounds => always matches, even undated. + if !dateMatches("", nil, nil) { + t.Error("dateMatches with no bounds must accept any value") + } +} + +func TestLengthMatches(t *testing.T) { + min := int64(15200) + if lengthMatches("", &min, nil) { + t.Error("a missing length cannot satisfy a length bound") + } + if !lengthMatches("15200", &min, nil) { + t.Error("length == min must satisfy gte") + } + if lengthMatches("15199", &min, nil) { + t.Error("length < min must fail gte") + } + max := int64(20000) + if !lengthMatches("18000", &min, &max) { + t.Error("length within [min,max] must match") + } + if lengthMatches("21000", &min, &max) { + t.Error("length > max must fail lte") + } + if !lengthMatches("", nil, nil) { + t.Error("no bound => unconstrained, even for a missing length") + } +} + +func TestCountAmbiguous(t *testing.T) { + if n := countAmbiguous("ACGTACGT"); n != 0 { + t.Errorf("clean sequence => 0 ambiguous, got %d", n) + } + if n := countAmbiguous("ACGTNNRYacgtn"); n != 5 { // N,N,R,Y + lowercase n + t.Errorf("expected 5 ambiguous (NNRY + n), got %d", n) + } +} + +func TestIsLabPassaged(t *testing.T) { + for _, name := range []string{"Ebola/Vero/2014", "isolate passaged x", "tissue culture clone", "Cultured-2014"} { + if !isLabPassaged(name) { + t.Errorf("%q should be detected as lab-passaged", name) + } 
+ } + for _, name := range []string{"Ebola/H.sapiens/SLE/2014", "Makona-G3686"} { + if isLabPassaged(name) { + t.Errorf("%q should NOT be flagged as lab-passaged", name) + } + } +} diff --git a/connectors/ncbivirus/ncbivirus.go b/connectors/ncbivirus/ncbivirus.go index 8a64f98..ffb8331 100644 --- a/connectors/ncbivirus/ncbivirus.go +++ b/connectors/ncbivirus/ncbivirus.go @@ -75,6 +75,7 @@ type httpDoer interface { // lifetime of the connector. type Connector struct { rest string // REST base, e.g. https://api.ncbi.nlm.nih.gov + efetch string // E-utilities base for the sequence fetch, e.g. https://eutils.ncbi.nlm.nih.gov ("" if not configured) doer httpDoer pageSize int64 reconcile bool @@ -93,12 +94,17 @@ func New(s connectors.Spec, doer httpDoer) (*Connector, error) { return nil, fmt.Errorf("ncbivirus: spec must declare endpoints[\"rest\"] (got %q)", rest) } rest = strings.TrimRight(rest, "/") + // efetch is OPTIONAL: it is only needed to evaluate the local max_ambiguous_chars + // predicate. Absent, that one filter is rejected at plan time; every other filter + // (including the other local ones) works without it. + efetch := strings.TrimRight(s.Endpoints["efetch"], "/") ps := int64(s.Pagination.PageSize) if ps <= 0 || ps > maxPageSize { ps = maxPageSize } return &Connector{ rest: rest, + efetch: efetch, doer: doer, pageSize: ps, reconcile: s.Pagination.ReconcileCount, @@ -119,11 +125,27 @@ type planSteps struct { ListURL string `json:"list_url,omitempty"` // IDs is the explicit accession list for a `get`/`resolve`. Empty for `search`. IDs []string `json:"ids,omitempty"` + // LocalFilters are the predicates the upstream API cannot evaluate server-side + // (sequence_length, collection_date range, max_ambiguous_chars, + // exclude_lab_passaged). Retrieve applies them to the COMPLETE reconciled + // candidate set after the walk; the server-side filters live in ListURL. 
Empty + // for the common case (no local predicates) — then Retrieve's behavior is + // byte-for-byte the legacy path. + LocalFilters []localFilter `json:"local_filters,omitempty"` // PageSize / Reconcile snapshot the policy at plan time. PageSize int64 `json:"page_size"` Reconcile bool `json:"reconcile"` } +// localFilter is one normalized predicate evaluated locally by Retrieve (not sent +// upstream). Field/Op/Value mirror contract.Filter; the value is interpreted per +// the field's declared type (see applyLocalFilters). +type localFilter struct { + Field string `json:"field"` + Op string `json:"op"` + Value string `json:"value"` +} + // Plan resolves a NormalizedQuery into an execution Plan. It MUST NOT retrieve. // // For get/resolve the accession list is authoritative (total = len(IDs)). For @@ -138,13 +160,16 @@ func (c *Connector) Plan(q contract.NormalizedQuery) (contract.Plan, error) { case "get", "resolve": st.IDs = append([]string(nil), q.IDs...) default: // "search" (any unknown verb is treated as a filter-driven search) - u, err := c.buildListURL(q.Filters) + u, local, err := c.planSearch(q.Filters) if err != nil { - // Caller-fixable: a missing required organism_taxon_id, or a blank/multi - // taxon value. 422, not 500 — so an agent fixes the query, not retries it. + // Caller-fixable: a missing required organism_taxon_id, a blank/multi taxon + // value, or a local predicate (max_ambiguous_chars) requiring an endpoint + // this deployment did not configure. 422, not 500 — so an agent fixes the + // query, not retries it. return contract.Plan{}, contract.UnsatisfiableQuery(fmt.Errorf("ncbivirus: plan: %w", err)) } st.ListURL = u + st.LocalFilters = local } steps, err := json.Marshal(st) if err != nil { @@ -192,12 +217,28 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. 
searchPath := len(st.IDs) == 0 + // hasLocal: the query carries predicates the upstream cannot evaluate (sequence + // length / collection-date range / ambiguous-char ceiling / lab-passaged). When + // set, the candidate set must be scanned IN FULL (the match count is unknowable + // without examining every candidate), so the limit becomes an OUTPUT cap only — it + // must not short-circuit the walk or truncate the candidate accession list, or the + // local match count would be wrong. + hasLocal := len(st.LocalFilters) > 0 + // limit is the caller's hard ceiling (NormalizedQuery.Limit), honored as a WALK // CEILING per the contract: a huge upstream is never fully fetched for a small // limit. A limit below the authoritative total makes the result necessarily // incomplete (the Complete gate below still requires fetched == authority), so a // limit can never produce a false Complete — only a BestEffort partial. 0 = unlimited. + // + // walkLimit is the ceiling applied to the WALK/candidate fetch. With local filters + // it is forced to 0 (unlimited) so the whole candidate set is scanned; the caller's + // `limit` is then applied only to the matched OUTPUT (see the hasLocal branch). limit := p.Query.Limit + walkLimit := limit + if hasLocal { + walkLimit = 0 + } var ( ids []string @@ -271,6 +312,8 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. startCount = total startVersion = version totalKnown = known + endCount = total + authority = total if known && total > 0 { // A total-derived page budget (+2 margin for a slightly-growing index), // used only when it is below the hard cap. Guarding on the QUOTIENT @@ -281,18 +324,25 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. } } } else { - // Compare EVERY page against page 0, so a mid-walk mutation is caught even - // when a later page returns to the original value (an A->B->A walk). 
- if !known || total != startCount { - totalMutated = true + // NCBI Datasets returns x-ncbi-total-count ONLY on the FIRST page; on a + // continuation page its ABSENCE is expected and must NOT be read as a + // mutation or reset the authoritative total to zero (doing so wrongly + // downgraded every multi-page walk). Only a total that is PRESENT and + // DIFFERS from the first-page total is a genuine mid-walk index mutation + // (caught even on an A->B->A walk because every present page is compared to + // page 0). + if known { + if total != startCount { + totalMutated = true + } + endCount = total + authority = total } if version != startVersion { versionDrift = true } } - endCount = total listVersion = version - authority = total for _, id := range pageIDs { if seen[id] { sawDuplicate = true @@ -307,7 +357,9 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. // incomplete (fetched <= limit < authority => BestEffort), so the limit can // never mask an over-observation into a false Complete. When limit >= authority // the limit is non-constraining and the normal cover/total logic applies. - if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) >= limit { + // walkLimit (not limit) is used so a local-filter query — which forces + // walkLimit=0 — always drains the full candidate set for an exact match count. + if walkLimit > 0 && authority > 0 && walkLimit < authority && int64(len(ids)) >= walkLimit { break } // Drain the cursor to genuine exhaustion (no next-page token) rather than @@ -346,8 +398,10 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. // drop an OVER-observed id (the page returned more rows than the total claims) and // fake fetched == authority — a false Complete. When limit >= authority we keep every // observed id so an under-reported total still downgrades. 
- if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) > limit { - ids = ids[:limit] + // walkLimit (not limit) is used so a local-filter query keeps the FULL candidate + // list (walkLimit=0) and applies the caller's limit only to the matched output. + if walkLimit > 0 && authority > 0 && walkLimit < authority && int64(len(ids)) > walkLimit { + ids = ids[:walkLimit] } // Fetch one verbatim per-accession ndjson report line per DISTINCT accession. @@ -405,13 +459,24 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract. } fetched := int64(len(raws)) + // candidateReconciled: the authoritative total was KNOWN and stable across the + // whole (bounded, re-list-free) walk, the version was coherent, and every record + // was fetched. It gates both a true Complete (no local filters) and a trustworthy + // Filtered (local filters applied to a fully-scanned, reconciled candidate set). + candidateReconciled := st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !versionDrift && observedDistinct == authority && fetched == authority + + // Local-filter path: apply the predicates the upstream cannot to the fully-scanned + // candidate set and report a Filtered result (exact match count) when the scan + // reconciled, else a BestEffort (the scan itself was incomplete, so exhaustiveness + // cannot be claimed). + if hasLocal { + return c.finishLocal(gov, rs, raws, st.LocalFilters, candidateReconciled, authority, fetched, limit) + } + // limitApplied: the caller's limit constrained the walk below the authoritative // total. Reported as the BestEffort reason (the count is still authoritative). 
limitApplied := limit > 0 && totalKnown && authority > 0 && limit < authority - if st.Reconcile && totalKnown && !walkTruncated && !sawDuplicate && !totalMutated && !versionDrift && observedDistinct == authority && fetched == authority { - // Reconciled: the authoritative total was KNOWN and stable across the whole - // (bounded, re-list-free) walk, the version was coherent, and every record was - // fetched. Only then is the result a true Complete. + if candidateReconciled { return rs, contract.NewComplete(fetched, authority), nil } @@ -841,61 +906,105 @@ func jsonNumberOrNil(n json.Number) any { // --- list URL construction ------------------------------------------------- -// buildListURL constructs the first-page dataset_report list URL from the normalized -// filters. organism_taxon_id selects the /taxon/{id} path; every other filter renders -// a filter.* query param via the same ordered rule table FilterSemantics publishes. -// An empty filter set with no taxon is rejected — a taxon (path or filter) is required -// to scope the list. page_size is always pinned. There is NO sort to inject: the virus -// dataset_report exposes none, so reproducibility is imposed downstream. -func (c *Connector) buildListURL(filters []contract.Filter) (string, error) { - taxon, params, err := filtersToListParams(filters) +// planSearch constructs the first-page dataset_report list URL from the SERVER-side +// filters and returns the LOCAL predicates separately (for Retrieve to apply after +// the walk). organism_taxon_id selects the /taxon/{id} path; every other server +// filter renders a filter.* query param via the same ordered rule table +// FilterSemantics publishes; local rules (no upstream param) are collected. An empty +// filter set with no taxon is rejected — a taxon is required to scope the list. +// page_size is always pinned. There is NO sort to inject: the virus dataset_report +// exposes none, so reproducibility is imposed downstream. 
+func (c *Connector) planSearch(filters []contract.Filter) (listURL string, local []localFilter, err error) { + taxon, params, local, err := filtersToPlan(filters) if err != nil { - return "", err + return "", nil, err } if taxon == "" { - return "", fmt.Errorf("ncbivirus: a search requires an organism_taxon_id filter (the taxon path segment)") + return "", nil, fmt.Errorf("ncbivirus: a search requires an organism_taxon_id filter (the taxon path segment)") + } + // Validate local predicate VALUES at plan time (caller-fixable). The surface + // catalog already checked field/operator/JSON-type, but not domain specifics: + // max_ambiguous_chars needs the efetch endpoint, collection_date needs a parseable + // date, and the integer bounds must be non-negative integers. + for _, lf := range local { + switch lf.Field { + case fieldMaxAmbiguous: + if c.efetch == "" { + return "", nil, fmt.Errorf("ncbivirus: filter %q requires the efetch endpoint, which is not configured for this source", fieldMaxAmbiguous) + } + if _, ok := parseNonNegInt(lf.Value); !ok { + return "", nil, fmt.Errorf("ncbivirus: filter %q value %q must be a non-negative integer", lf.Field, lf.Value) + } + case fieldSeqLength: + if _, ok := parseNonNegInt(lf.Value); !ok { + return "", nil, fmt.Errorf("ncbivirus: filter %q value %q must be a non-negative integer", lf.Field, lf.Value) + } + case fieldCollectionDate: + if _, _, ok := normalizeDate(lf.Value); !ok { + return "", nil, fmt.Errorf("ncbivirus: filter %q value %q is not a parseable date (want YYYY, YYYY-MM, or YYYY-MM-DD)", lf.Field, lf.Value) + } + } } v := url.Values{} for k, val := range params { v.Set(k, val) } v.Set("page_size", strconv.FormatInt(c.pageSize, 10)) - return c.rest + "/datasets/v2/virus/taxon/" + url.PathEscape(taxon) + "/dataset_report?" + v.Encode(), nil + return c.rest + "/datasets/v2/virus/taxon/" + url.PathEscape(taxon) + "/dataset_report?" 
+ v.Encode(), local, nil } -// filtersToListParams translates normalized filters into the taxon path segment plus -// a map of filter.* query params, using the same ordered rule table FilterSemantics -// publishes. An unknown field or disallowed operator is a hard error so a caller's -// filter is never silently dropped. The taxon rule maps to the path segment, not a -// query param. Multi-value `in` (taxon) is rejected — the path accepts one taxon. -func filtersToListParams(filters []contract.Filter) (taxon string, params map[string]string, err error) { +// filtersToPlan translates normalized filters into the taxon path segment, a map of +// server-side filter.* query params, and the list of LOCAL predicates, using the +// ordered rule table FilterSemantics publishes. Rules are matched by (field, +// operator) so a field may legitimately appear with more than one operator (e.g. a +// two-sided range: sequence_length gte AND sequence_length lte). An unknown field or +// an unsupported operator for a known field is a hard error so a caller's filter is +// never silently dropped. The taxon rule maps to the path segment; server rules map +// to query params; local rules are returned for post-walk evaluation. 
+func filtersToPlan(filters []contract.Filter) (taxon string, params map[string]string, local []localFilter, err error) { params = make(map[string]string) - byField := make(map[string]rule, len(defaultRules())) - for _, r := range defaultRules() { - byField[r.Field] = r + rules := defaultRules() + byFieldOp := make(map[string]rule, len(rules)) + known := make(map[string]bool, len(rules)) + for _, r := range rules { + byFieldOp[r.Field+"\x00"+r.Op] = r + known[r.Field] = true } + seen := make(map[string]bool, len(filters)) for _, f := range filters { - r, ok := byField[f.Field] + key := f.Field + "\x00" + f.Operator + r, ok := byFieldOp[key] if !ok { - return "", nil, fmt.Errorf("ncbivirus: unknown filter field %q", f.Field) + if known[f.Field] { + return "", nil, nil, fmt.Errorf("ncbivirus: filter field %q does not support operator %q", f.Field, f.Operator) + } + return "", nil, nil, fmt.Errorf("ncbivirus: unknown filter field %q", f.Field) } - if f.Operator != r.Op { - return "", nil, fmt.Errorf("ncbivirus: filter field %q uses operator %q; only %q is supported", f.Field, f.Operator, r.Op) + // Reject a repeated (field, operator): a second bound under the SAME operator + // would silently overwrite the first (last-write-wins) and could RELAX a + // constraint, corrupting the "exact" local-match count. A two-sided range uses + // two DIFFERENT operators (gte + lte) and is unaffected. 
+ if seen[key] { + return "", nil, nil, fmt.Errorf("ncbivirus: filter field %q specified more than once with operator %q", f.Field, f.Operator) } - if r.PathTaxon { + seen[key] = true + switch { + case r.PathTaxon: val := strings.TrimSpace(f.Value) if val == "" || strings.Contains(val, ",") { - return "", nil, fmt.Errorf("ncbivirus: filter field %q must carry exactly one taxon value (the path accepts one taxon)", f.Field) + return "", nil, nil, fmt.Errorf("ncbivirus: filter field %q must carry exactly one taxon value (the path accepts one taxon)", f.Field) } if taxon != "" { - return "", nil, fmt.Errorf("ncbivirus: filter field %q specified more than once", f.Field) + return "", nil, nil, fmt.Errorf("ncbivirus: filter field %q specified more than once", f.Field) } taxon = val - continue + case r.Local: + local = append(local, localFilter{Field: r.Field, Op: r.Op, Value: f.Value}) + default: + params[r.QueryParam] = coerceValue(r.Type, f.Value) } - params[r.QueryParam] = coerceValue(r.Type, f.Value) } - return taxon, params, nil + return taxon, params, local, nil } // coerceValue maps a normalized filter value to the token NCBI expects for the rule's @@ -923,11 +1032,22 @@ type rule struct { Field string `json:"field"` // normalized field the caller uses Op string `json:"op"` // normalized operator the caller uses PathTaxon bool `json:"path_taxon,omitempty"` // true => maps to the /taxon/{v} path segment - QueryParam string `json:"query_param,omitempty"` // upstream filter.* query param (when not PathTaxon) + QueryParam string `json:"query_param,omitempty"` // upstream filter.* query param (when server-side) + Local bool `json:"local,omitempty"` // true => evaluated locally by Retrieve, NOT sent upstream Type string `json:"type"` // JSON-Schema type for catalog Enum []string `json:"enum,omitempty"` } +// Local-filter field names: predicates the NCBI Datasets dataset_report cannot +// evaluate server-side, so Retrieve applies them to the complete reconciled +// 
candidate set. These are the filters VirBench's hard queries exercise. +const ( + fieldSeqLength = "sequence_length" // nucleotide length bound (gte/lte), from the report's length + fieldCollectionDate = "collection_date" // sample collection-date range (gte/lte), from isolate.collection_date + fieldMaxAmbiguous = "max_ambiguous_chars" // ceiling on ambiguous (non-ACGTU) bases (lte), counted from the sequence + fieldExcludeLab = "exclude_lab_passaged" // drop lab-passaged isolates (eq true), by isolate-name heuristic +) + // defaultRules is the ORDERED rule table. Order is frozen; changing it (or any rule) // changes the semantics Hash, which is the intended drift-detection signal. The virus // dataset_report has no sort knob, so no rule encodes one. organism_taxon_id is the @@ -985,6 +1105,45 @@ func defaultRules() []rule { QueryParam: "filter.geo_location", Type: "string", }, + // --- LOCAL predicates (evaluated by Retrieve over the complete reconciled + // candidate set; the Datasets dataset_report has no server-side equivalent). --- + { + Field: fieldSeqLength, + Op: "gte", + Local: true, + Type: "integer", + }, + { + Field: fieldSeqLength, + Op: "lte", + Local: true, + Type: "integer", + }, + { + Field: fieldCollectionDate, + Op: "gte", + Local: true, + Type: "string", + }, + { + Field: fieldCollectionDate, + Op: "lte", + Local: true, + Type: "string", + }, + { + Field: fieldMaxAmbiguous, + Op: "lte", + Local: true, + Type: "integer", + }, + { + Field: fieldExcludeLab, + Op: "eq", + Local: true, + Type: "boolean", + Enum: []string{"true", "false"}, + }, } } @@ -1008,20 +1167,433 @@ func buildSemantics() contract.SemanticsSpec { } // rulesToFields projects the ordered rule table into the catalog SemanticField list -// (used to publish the per-source JSON Schema). It is derived from the same ordered -// slice so it never leaks map order. +// (used to publish the per-source JSON Schema). A field that appears under more than +// one operator (e.g. 
a two-sided range) is emitted ONCE with its operators merged, +// in first-seen order, so the published catalog lists each field exactly once with +// every operator it accepts. Order is taken from the ordered rule slice so it never +// leaks map order. func rulesToFields(rules []rule) []contract.SemanticField { out := make([]contract.SemanticField, 0, len(rules)) + idx := make(map[string]int, len(rules)) for _, r := range rules { + if i, ok := idx[r.Field]; ok { + f := &out[i] + if !containsString(f.Operators, r.Op) { + f.Operators = append(f.Operators, r.Op) + } + for _, e := range r.Enum { + if !containsString(f.Enum, e) { + f.Enum = append(f.Enum, e) + } + } + continue + } + idx[r.Field] = len(out) out = append(out, contract.SemanticField{ Name: r.Field, Type: r.Type, Operators: []string{r.Op}, - Enum: r.Enum, + Enum: append([]string(nil), r.Enum...), }) } return out } +// containsString reports whether ss contains s. +func containsString(ss []string, s string) bool { + for _, x := range ss { + if x == s { + return true + } + } + return false +} + +// --- local filtering (post-walk predicates the upstream cannot evaluate) ---- + +// finishLocal applies the local predicates to the fully-scanned candidate set and +// builds the final result. The whole candidate set was scanned (the walk ignored the +// limit), so the matched set is EXHAUSTIVE and the match count is EXACT. When the +// candidate scan reconciled against the authoritative total, the result is Filtered +// (an exact, reproducible match count); otherwise it is BestEffort, because +// exhaustiveness cannot be claimed over a scan that did not itself reconcile. The +// caller's limit caps only the RETURNED records — never the match count — so a +// `--limit 1` count query still reports the true total via AuthoritativeCount. 
+func (c *Connector) finishLocal(gov *contract.Governor, rs contract.RecordSet, candidates []contract.RawRecord, filters []localFilter, candidateReconciled bool, candidateTotal, candidateFetched, limit int64) (contract.RecordSet, contract.Completeness, error) { + matched, err := c.applyLocalFilters(gov, candidates, filters) + if err != nil { + return contract.RecordSet{}, contract.Completeness{}, err + } + matchedCount := int64(len(matched)) + returned := matched + if limit > 0 && matchedCount > limit { + returned = matched[:limit] + } + rs.Records = returned + desc := describeLocalFilters(filters) + if candidateReconciled { + reason := fmt.Sprintf("ncbivirus: scanned the complete reconciled candidate set of %d record(s); %d matched the local predicate(s) [%s]; returned %d (exhaustive and reproducible)", candidateTotal, matchedCount, desc, len(returned)) + return rs, contract.NewFiltered(matchedCount, int64(len(returned)), reason), nil + } + reason := fmt.Sprintf("ncbivirus: local predicate(s) [%s] applied to a candidate set that did NOT reconcile (scanned %d of an authoritative %d); %d of those scanned matched — not trustworthy as an exhaustive count", desc, candidateFetched, candidateTotal, matchedCount) + return rs, contract.NewBestEffort(int64(len(returned)), reason), nil +} + +// applyLocalFilters returns the subset of candidates passing EVERY local predicate. +// The cheap report-only predicates (length, collection-date, lab-passaged) are +// evaluated first; the sequence is fetched (for max_ambiguous_chars) ONLY for the +// records that survive them, so a narrow query fetches few sequences. The input order +// (already accession-sorted) is preserved. 
+func (c *Connector) applyLocalFilters(gov *contract.Governor, candidates []contract.RawRecord, filters []localFilter) ([]contract.RawRecord, error) { + pred, err := parsePredicates(filters) + if err != nil { + return nil, err + } + out := make([]contract.RawRecord, 0, len(candidates)) + for _, rr := range candidates { + var d reportDoc + dec := json.NewDecoder(bytes.NewReader(rr.Raw)) + dec.UseNumber() + if err := dec.Decode(&d); err != nil { + return nil, fmt.Errorf("ncbivirus: local filter: decode report %q: %w", rr.SourceAccession, err) + } + if !lengthMatches(d.Length, pred.seqMin, pred.seqMax) { + continue + } + if !dateMatches(d.Isolate.CollectionDate, pred.dateGteLo, pred.dateLteHi) { + continue + } + if pred.excludeLab && isLabPassaged(d.Isolate.Name) { + continue + } + if pred.maxAmbiguous != nil { + seq, err := c.fetchSequence(gov, rr.SourceAccession) + if err != nil { + return nil, err + } + if countAmbiguous(seq) > *pred.maxAmbiguous { + continue + } + } + out = append(out, rr) + } + return out, nil +} + +// localPredicates is the parsed, typed form of the local filters. nil pointers mean +// "no constraint of this kind". +type localPredicates struct { + seqMin, seqMax *int64 + dateGteLo *int // record's latest possible day (hi) must be >= this yyyymmdd + dateLteHi *int // record's earliest possible day (lo) must be <= this yyyymmdd + maxAmbiguous *int64 + excludeLab bool +} + +// parsePredicates parses the normalized local filters into typed bounds. Values were +// already validated at plan time, but it re-checks defensively and fails loudly on a +// shape it does not recognize (rather than silently ignoring a predicate). 
+func parsePredicates(filters []localFilter) (localPredicates, error) { + var p localPredicates + for _, f := range filters { + switch { + case f.Field == fieldSeqLength && f.Op == "gte": + n, ok := parseNonNegInt(f.Value) + if !ok { + return p, fmt.Errorf("ncbivirus: %q gte value %q is not a non-negative integer", f.Field, f.Value) + } + v := n + p.seqMin = &v + case f.Field == fieldSeqLength && f.Op == "lte": + n, ok := parseNonNegInt(f.Value) + if !ok { + return p, fmt.Errorf("ncbivirus: %q lte value %q is not a non-negative integer", f.Field, f.Value) + } + v := n + p.seqMax = &v + case f.Field == fieldCollectionDate && f.Op == "gte": + lo, _, ok := normalizeDate(f.Value) + if !ok { + return p, fmt.Errorf("ncbivirus: %q gte value %q is not a parseable date", f.Field, f.Value) + } + v := lo + p.dateGteLo = &v + case f.Field == fieldCollectionDate && f.Op == "lte": + _, hi, ok := normalizeDate(f.Value) + if !ok { + return p, fmt.Errorf("ncbivirus: %q lte value %q is not a parseable date", f.Field, f.Value) + } + v := hi + p.dateLteHi = &v + case f.Field == fieldMaxAmbiguous && f.Op == "lte": + n, ok := parseNonNegInt(f.Value) + if !ok { + return p, fmt.Errorf("ncbivirus: %q lte value %q is not a non-negative integer", f.Field, f.Value) + } + v := n + p.maxAmbiguous = &v + case f.Field == fieldExcludeLab && f.Op == "eq": + p.excludeLab = isTruthy(f.Value) + default: + return p, fmt.Errorf("ncbivirus: unsupported local predicate %q %q", f.Field, f.Op) + } + } + return p, nil +} + +// lengthMatches reports whether a record length satisfies the (optional) bounds. With +// no bound it is unconstrained; with a bound, a missing/unparseable length cannot +// satisfy it (the record is excluded). 
+func lengthMatches(n json.Number, min, max *int64) bool { + if min == nil && max == nil { + return true + } + if n == "" { + return false + } + v, err := strconv.ParseInt(n.String(), 10, 64) + if err != nil { + return false + } + if min != nil && v < *min { + return false + } + if max != nil && v > *max { + return false + } + return true +} + +// dateMatches reports whether a collection_date overlaps the requested range. A +// partial date is treated as the interval of days it could denote (e.g. "2014" => +// [2014-01-01, 2014-12-31]); the record matches `gte X` if its latest possible day is +// on/after X, and `lte Y` if its earliest possible day is on/before Y. With no date +// bound it is unconstrained; with a bound, an undated/unparseable record is excluded. +func dateMatches(date string, gteLo, lteHi *int) bool { + if gteLo == nil && lteHi == nil { + return true + } + lo, hi, ok := normalizeDate(date) + if !ok { + return false + } + if gteLo != nil && hi < *gteLo { + return false + } + if lteHi != nil && lo > *lteHi { + return false + } + return true +} + +// normalizeDate parses YYYY, YYYY-MM, or YYYY-MM-DD (ignoring any trailing time +// component) into the inclusive [lo, hi] interval of yyyymmdd integers it could +// denote. ok is false for an empty or unparseable value. A missing month widens to +// the whole year; a missing day widens to the whole month (using that month's real +// length, leap-year aware); an explicit day is validated against the real number of +// days in its month, so impossible dates like 2014-02-31 are rejected. 
+func normalizeDate(s string) (lo, hi int, ok bool) { + s = strings.TrimSpace(s) + if s == "" { + return 0, 0, false + } + if i := strings.IndexAny(s, "Tt "); i >= 0 { + s = s[:i] + } + parts := strings.Split(s, "-") + if len(parts) == 0 || len(parts) > 3 { + return 0, 0, false + } + y, err := strconv.Atoi(parts[0]) + if err != nil || y < 1000 || y > 9999 { + return 0, 0, false + } + loM, hiM, loD := 1, 12, 1 + hiD := daysInMonth(y, 12) + if len(parts) >= 2 { + m, err := strconv.Atoi(parts[1]) + if err != nil || m < 1 || m > 12 { + return 0, 0, false + } + loM, hiM = m, m + hiD = daysInMonth(y, m) + } + if len(parts) == 3 { + d, err := strconv.Atoi(parts[2]) + if err != nil || d < 1 || d > daysInMonth(y, hiM) { + return 0, 0, false + } + loD, hiD = d, d + } + return y*10000 + loM*100 + loD, y*10000 + hiM*100 + hiD, true +} + +// daysInMonth returns the number of days in month m (1-12) of year y, accounting +// for leap years. m is assumed already range-checked to 1-12. +func daysInMonth(y, m int) int { + switch m { + case 1, 3, 5, 7, 8, 10, 12: + return 31 + case 4, 6, 9, 11: + return 30 + default: // February + if y%4 == 0 && (y%100 != 0 || y%400 == 0) { + return 29 + } + return 28 + } +} + +// countAmbiguous counts ambiguous nucleotide characters — anything other than an +// unambiguous A/C/G/T/U base (case-insensitive), i.e. N and the IUPAC degenerate +// codes. The input is expected pre-stripped of whitespace (see fastaResidues). +func countAmbiguous(seq string) int64 { + var n int64 + for _, r := range seq { + switch r { + case 'A', 'C', 'G', 'T', 'U', 'a', 'c', 'g', 't', 'u': + default: + n++ + } + } + return n +} + +// labPassageMarkers are case-insensitive substrings of an isolate name that signal a +// lab-passaged / cell-cultured sample. 
The dataset_report does not expose the GenBank +// passage qualifier, so exclude_lab_passaged is a documented metadata heuristic over +// the isolate name — deterministic and reproducible, but conservative (it will not +// catch a passaged sample whose name omits these markers). +var labPassageMarkers = []string{ + "passage", "passaged", "vero", "cell culture", "cell-culture", + "tissue culture", "tissue-culture", "cultured", +} + +// isLabPassaged reports whether an isolate name matches a lab-passage marker. +func isLabPassaged(isolateName string) bool { + n := strings.ToLower(isolateName) + for _, m := range labPassageMarkers { + if strings.Contains(n, m) { + return true + } + } + return false +} + +// parseNonNegInt parses a base-10 non-negative int64; ok is false otherwise. +func parseNonNegInt(s string) (int64, bool) { + n, err := strconv.ParseInt(strings.TrimSpace(s), 10, 64) + if err != nil || n < 0 { + return 0, false + } + return n, true +} + +// isTruthy maps a normalized boolean filter value to a bool (default false). +func isTruthy(s string) bool { + switch strings.ToLower(strings.TrimSpace(s)) { + case "true", "1", "yes": + return true + } + return false +} + +// describeLocalFilters renders the predicates for the completeness Reason. +func describeLocalFilters(filters []localFilter) string { + parts := make([]string, 0, len(filters)) + for _, f := range filters { + parts = append(parts, fmt.Sprintf("%s %s %s", f.Field, f.Op, f.Value)) + } + return strings.Join(parts, ", ") +} + +// fetchSequence retrieves one accession's nucleotide sequence from the efetch +// (E-utilities) endpoint as FASTA, returning the concatenated, upper-cased, +// whitespace-free residue string. It is used ONLY to evaluate max_ambiguous_chars; +// the sequence NEVER enters a RawRecord, the logical hash, or a snapshot. Sequences +// are immutable per accession.version, so the result is a pure function of the +// accession (the determinism the local filter relies on). 
It runs under a Governor +// permit and honors 429 backoff like every other fetch. +func (c *Connector) fetchSequence(gov *contract.Governor, id string) (string, error) { + if c.efetch == "" { + return "", fmt.Errorf("ncbivirus: fetch sequence %q: efetch endpoint not configured", id) + } + v := url.Values{} + v.Set("db", "nuccore") + v.Set("id", id) + v.Set("rettype", "fasta") + v.Set("retmode", "text") + reqURL := c.efetch + "/entrez/eutils/efetch.fcgi?" + v.Encode() + + for attempt := 0; ; attempt++ { + rep, err := gov.AcquireWithOutcome(sourceID, 1) + if err != nil { + return "", fmt.Errorf("ncbivirus: fetch sequence %q: governor acquire: %w", id, err) + } + req, rerr := http.NewRequest(http.MethodGet, reqURL, nil) + if rerr != nil { + rep(contract.OutcomeFailed, 0) + return "", fmt.Errorf("ncbivirus: fetch sequence %q: build request: %w", id, rerr) + } + resp, derr := c.doer.Do(req) + if derr != nil { + rep(contract.OutcomeFailed, 0) + return "", fmt.Errorf("ncbivirus: fetch sequence %q: transport: %w", id, derr) + } + payload, status, _, retryAfter, berr := readBody(resp) + if berr != nil { + rep(contract.OutcomeFailed, 0) + return "", fmt.Errorf("ncbivirus: fetch sequence %q: read body: %w", id, berr) + } + switch { + case status == http.StatusOK: + rep(contract.OutcomeSuccess, 0) + seq, ferr := fastaResidues(payload) + if ferr != nil { + return "", fmt.Errorf("ncbivirus: fetch sequence %q: parse FASTA: %w", id, ferr) + } + if seq == "" { + return "", fmt.Errorf("ncbivirus: fetch sequence %q: empty FASTA body", id) + } + return seq, nil + case status == http.StatusTooManyRequests: + rep(contract.OutcomeThrottled, retryAfter) + if attempt >= maxThrottleRetries { + return "", fmt.Errorf("ncbivirus: fetch sequence %q: throttled (429) after %d retries", id, attempt) + } + backoff(attempt, retryAfter) + continue + case status >= 500: + rep(contract.OutcomeFailed, 0) + return "", fmt.Errorf("ncbivirus: fetch sequence %q: upstream %d", id, status) + default: + 
rep(contract.OutcomeFailed, 0) + return "", fmt.Errorf("ncbivirus: fetch sequence %q: unexpected status %d", id, status) + } + } +} + +// fastaResidues concatenates all non-header lines of a FASTA payload into one +// upper-cased residue string with surrounding whitespace removed. It returns the +// scanner error if the read was truncated (e.g. a line exceeding the buffer): a +// partial sequence would undercount ambiguous bases and could wrongly admit a +// record, so the caller must treat it as a fetch failure rather than a short read. +func fastaResidues(payload []byte) (string, error) { + var b strings.Builder + sc := bufio.NewScanner(bytes.NewReader(payload)) + sc.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + if line == "" || strings.HasPrefix(line, ">") { + continue + } + b.WriteString(strings.ToUpper(line)) + } + if err := sc.Err(); err != nil { + return "", err + } + return b.String(), nil +} + // Ensure *Connector satisfies the frozen contract at compile time. var _ contract.Connector = (*Connector)(nil) diff --git a/connectors/ncbivirus/ncbivirus_test.go b/connectors/ncbivirus/ncbivirus_test.go index 3e74908..f586a08 100644 --- a/connectors/ncbivirus/ncbivirus_test.go +++ b/connectors/ncbivirus/ncbivirus_test.go @@ -526,6 +526,40 @@ func TestRetrieve_TokenWalkCollectsAllPages(t *testing.T) { } } +// TestRetrieve_TotalHeaderOnlyOnFirstPage reflects REAL NCBI Datasets behavior: the +// x-ncbi-total-count header is returned ONLY on the first page; continuation pages +// omit it (but keep x-datasets-version). The page-0 total is authoritative for the +// whole walk, so its absence on a later page must NOT be read as a mutation that +// downgrades an otherwise-complete multi-page walk. (The other multi-page fixtures +// set the total on every page, which masked this; real NCBI does not.) 
+func TestRetrieve_TotalHeaderOnlyOnFirstPage(t *testing.T) { + f := newFakeDoer() + listURL := listURLForTaxon(t, "2697049", 1000) + f.set("GET", listURL, 200, loadFixture(t, "list_page1.ndjson"), hdr("3", "TOKEN2", "18.30.1")) + page2URL := appendPageToken(listURL, "TOKEN2") + // page2: total header ABSENT (as real NCBI returns it), version present, no token. + f.set("GET", page2URL, 200, loadFixture(t, "list_page2.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_045512.2"), 200, loadFixture(t, "report_NC_045512.2.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_002549.1"), 200, loadFixture(t, "report_NC_002549.1.ndjson"), hdr("", "", "18.30.1")) + f.set("GET", reportURL("NC_001498.1"), 200, loadFixture(t, "report_NC_001498.1.ndjson"), hdr("", "", "18.30.1")) + c := newConn(t, f) + p := retrievePlan(t, c, searchQuery()) + + rs, comp, err := c.Retrieve(p, noGov()) + if err != nil { + t.Fatalf("Retrieve: %v", err) + } + if comp.State != contract.CompleteState { + t.Fatalf("a multi-page walk whose continuation page omits the total header must still reconcile Complete, got %s (%s)", comp.State, comp.Reason) + } + if comp.ReconciledCount != 3 || comp.AuthoritativeCount != 3 { + t.Fatalf("the page-0 total (3) must stay authoritative across the walk, got %d/%d", comp.ReconciledCount, comp.AuthoritativeCount) + } + if len(rs.Records) != 3 { + t.Fatalf("expected all 3 records collected, got %d", len(rs.Records)) + } +} + func TestRetrieve_204IsZeroNotError(t *testing.T) { f := newFakeDoer() f.set("GET", listURLForTaxon(t, "2697049", 1000), 204, nil, hdr("0", "", "18.30.1")) diff --git a/engine/contract/completeness.go b/engine/contract/completeness.go index 6e77e03..fb2f633 100644 --- a/engine/contract/completeness.go +++ b/engine/contract/completeness.go @@ -1,5 +1,12 @@ package contract +import ( + "bytes" + "encoding/json" + "fmt" + "strconv" +) + // Completeness is the result of the complete-or-fail invariant — the single // most important 
correctness property in Pinakes (PRD section 6.2; Build // Handoff invariant 3). An agent cannot eyeball a truncated result, so the @@ -29,6 +36,15 @@ type Completeness struct { // source (PRD section 6.2). It is meaningful only for CompleteState, where it // must equal ReconciledCount. Left zero/unused for BestEffort when no // trustworthy total exists. + // + // For FilteredState it carries a DIFFERENT, equally-trustworthy total: the exact + // number of records that matched the query's local predicates, computed by + // EXHAUSTIVELY scanning a COMPLETE, count-reconciled candidate set (see + // FilteredState and NewFiltered). It is authoritative because it is derived + // deterministically from a whole reconciled scan, not sampled — it is the honest + // answer to "how many match", even though the upstream API cannot report it + // server-side. ReconciledCount may be below it only when an output Limit + // truncated the returned records. AuthoritativeCount int64 `json:"authoritative_count,omitempty" yaml:"authoritative_count,omitempty"` // Reason is the mandatory human-readable explanation for any non-complete @@ -57,7 +73,7 @@ type SourceCompleteness struct { Reason string `json:"reason,omitempty" yaml:"reason,omitempty"` } -// CompletenessState enumerates the three completeness classifications. +// CompletenessState enumerates the completeness classifications. type CompletenessState uint8 const ( @@ -77,6 +93,28 @@ const ( // for the survivable case, carrying a per-source breakdown and a resumable // cursor (PRD section 9.6). PartialState + + // FilteredState means the result is the EXHAUSTIVE, deterministic subset of a + // COMPLETE, count-reconciled candidate set that matched the query's local + // predicates — predicates the upstream API cannot evaluate server-side (e.g. a + // sequence-length bound, an ambiguous-character ceiling, a collection-date + // range), so the engine retrieves the whole reconciled candidate set and applies + // them locally. 
+ // + // It is strictly stronger than BestEffort: there is no sampling and no + // truncation of the scan — every candidate was examined, so the match count is + // EXACT and the selection is REPRODUCIBLE (the same inputs yield the same set). + // It is distinct from CompleteState because the returned count is not an + // upstream-reported authoritative total; ReconciledCount is the number of + // matching records returned and AuthoritativeCount is the exact match count + // (they differ only when an output Limit truncated the returned records). A + // non-empty Reason records the candidate scope and the predicates applied. + // + // Like a limited result, a FilteredState set is served live and is not (yet) + // content-addressed as a reproducible snapshot, because the upstream "snapshot + // => reconciled-complete" invariant the Verify path trusts must stay untouched; + // the determinism is real but is re-derived by re-running, not by snapshot pin. + FilteredState ) // String returns the canonical token for the state. @@ -88,13 +126,73 @@ func (c CompletenessState) String() string { return "best_effort" case PartialState: return "partial" + case FilteredState: + return "filtered" default: return "unknown" } } // Valid reports whether c is a defined completeness state. -func (c CompletenessState) Valid() bool { return c <= PartialState } +func (c CompletenessState) Valid() bool { return c <= FilteredState } + +// stateFromToken maps a canonical token back to its state, the inverse of String +// over the defined states. ok is false for an unknown token. +func stateFromToken(s string) (CompletenessState, bool) { + switch s { + case "complete": + return CompleteState, true + case "best_effort": + return BestEffortState, true + case "partial": + return PartialState, true + case "filtered": + return FilteredState, true + default: + return 0, false + } +} + +// MarshalJSON emits the state as its canonical string token (e.g. 
"complete"), +// matching the published manifest JSON schema (schema/manifest.schema.json), +// rather than the raw uint8 ordinal. The manifest is a JSON wire contract and is +// never content-addressed, so the token encoding affects only the wire form, not +// any snapshot hash. +func (c CompletenessState) MarshalJSON() ([]byte, error) { + if !c.Valid() { + return nil, fmt.Errorf("contract: cannot marshal invalid completeness state %d", uint8(c)) + } + return []byte(strconv.Quote(c.String())), nil +} + +// UnmarshalJSON accepts the canonical string token AND, for backward +// compatibility with manifests written before the token encoding, a bare integer +// ordinal. An unknown token or out-of-range ordinal is rejected. +func (c *CompletenessState) UnmarshalJSON(data []byte) error { + trimmed := bytes.TrimSpace(data) + if len(trimmed) > 0 && trimmed[0] == '"' { + var s string + if err := json.Unmarshal(trimmed, &s); err != nil { + return err + } + st, ok := stateFromToken(s) + if !ok { + return fmt.Errorf("contract: unknown completeness state %q", s) + } + *c = st + return nil + } + var n uint8 + if err := json.Unmarshal(trimmed, &n); err != nil { + return fmt.Errorf("contract: completeness state must be a token string or an integer ordinal: %w", err) + } + st := CompletenessState(n) + if !st.Valid() { + return fmt.Errorf("contract: completeness state ordinal %d out of range", n) + } + *c = st + return nil +} // NewComplete constructs a Complete result. It is the caller's responsibility // (the resolver) to have reconciled count against authoritative BEFORE calling @@ -137,9 +235,46 @@ func NewPartial(count int64, reason, resumeCursor string, perSource ...SourceCom } } +// NewFiltered constructs a Filtered result: the exhaustive, deterministic subset +// of a COMPLETE, count-reconciled candidate set that matched the query's local +// predicates (see FilteredState). 
matched is the EXACT number of records that +// matched (the whole candidate set was scanned, so it is not a sample); returned +// is how many of those are carried in the result (below matched only when an +// output Limit truncated them). reason MUST be non-empty and SHOULD name the +// candidate scope and the predicates applied. It panics on an impossible shape +// (negative counts, returned > matched, or an empty reason) so a Filtered result +// can only describe a real exhaustive scan. +func NewFiltered(matched, returned int64, reason string) Completeness { + if reason == "" { + panic("contract: NewFiltered requires a non-empty reason") + } + if matched < 0 || returned < 0 { + panic("contract: NewFiltered counts must be non-negative") + } + if returned > matched { + panic("contract: NewFiltered called with returned > matched — a filtered result cannot return more than it matched") + } + return Completeness{ + State: FilteredState, + ReconciledCount: returned, + AuthoritativeCount: matched, + Reason: reason, + } +} + // IsComplete reports whether the result asserts a reconciled, whole answer. func (c Completeness) IsComplete() bool { return c.State == CompleteState } +// IsExact reports whether the result carries an exact, trustworthy answer count: +// a reconciled-complete upstream total (CompleteState) or an exhaustive local +// match count over a complete reconciled candidate scan (FilteredState). It is +// false for BestEffort/Partial, where the count is not a proven total. Callers +// that only need a verified count (not a proven whole record set) can trust the +// count when IsExact is true. +func (c Completeness) IsExact() bool { + return c.State == CompleteState || c.State == FilteredState +} + // Reconciled reports whether the reconciled count equals the authoritative // total. It is the predicate the resolver checks before emitting Complete; if it // is false the engine must fail or downgrade, never emit Complete. 
diff --git a/engine/contract/completeness_filtered_test.go b/engine/contract/completeness_filtered_test.go new file mode 100644 index 0000000..21f14e8 --- /dev/null +++ b/engine/contract/completeness_filtered_test.go @@ -0,0 +1,128 @@ +package contract + +import ( + "encoding/json" + "strings" + "testing" +) + +func TestFilteredState_StringAndPredicates(t *testing.T) { + if FilteredState.String() != "filtered" { + t.Fatalf("FilteredState token = %q, want filtered", FilteredState.String()) + } + if !FilteredState.Valid() { + t.Fatal("FilteredState must be Valid") + } + c := NewFiltered(2, 1, "scanned 6, matched 2, returned 1") + if c.State != FilteredState { + t.Fatalf("state = %s, want filtered", c.State) + } + if c.AuthoritativeCount != 2 || c.ReconciledCount != 1 { + t.Fatalf("want authoritative=2 reconciled=1, got %d/%d", c.AuthoritativeCount, c.ReconciledCount) + } + if !c.IsExact() { + t.Fatal("a Filtered result reports an exact count => IsExact() must be true") + } + if c.IsComplete() { + t.Fatal("Filtered is not Complete") + } + if c.Reconciled() { + t.Fatal("Filtered must not report Reconciled() (its count is not an upstream total)") + } +} + +func TestIsExact_OnlyCompleteAndFiltered(t *testing.T) { + if !NewComplete(3, 3).IsExact() { + t.Error("Complete must be exact") + } + if !NewFiltered(3, 3, "r").IsExact() { + t.Error("Filtered must be exact") + } + if NewBestEffort(2, "r").IsExact() { + t.Error("BestEffort must not be exact") + } + if NewPartial(2, "r", "cur").IsExact() { + t.Error("Partial must not be exact") + } +} + +func TestNewFiltered_PanicsOnImpossibleShape(t *testing.T) { + mustPanic(t, "returned > matched", func() { NewFiltered(1, 2, "r") }) + mustPanic(t, "empty reason", func() { NewFiltered(2, 1, "") }) + mustPanic(t, "negative", func() { NewFiltered(-1, 0, "r") }) +} + +func mustPanic(t *testing.T, name string, fn func()) { + t.Helper() + defer func() { + if recover() == nil { + t.Fatalf("%s: expected panic, got none", name) + } + }() + 
fn() +} + +func TestCompletenessState_MarshalJSON_Token(t *testing.T) { + cases := map[CompletenessState]string{ + CompleteState: `"complete"`, + BestEffortState: `"best_effort"`, + PartialState: `"partial"`, + FilteredState: `"filtered"`, + } + for st, want := range cases { + b, err := json.Marshal(st) + if err != nil { + t.Fatalf("marshal %s: %v", st, err) + } + if string(b) != want { + t.Fatalf("marshal %s = %s, want %s", st, b, want) + } + } + // An invalid state must not marshal. + if _, err := json.Marshal(CompletenessState(99)); err == nil { + t.Fatal("marshaling an invalid state must error") + } +} + +func TestCompletenessState_UnmarshalJSON_TokenAndOrdinal(t *testing.T) { + var s CompletenessState + // Token form (the wire format). + if err := json.Unmarshal([]byte(`"filtered"`), &s); err != nil || s != FilteredState { + t.Fatalf("unmarshal token: got %s err=%v", s, err) + } + // Backward-compatible integer ordinal (manifests written before the token codec). + if err := json.Unmarshal([]byte(`0`), &s); err != nil || s != CompleteState { + t.Fatalf("unmarshal ordinal 0: got %s err=%v", s, err) + } + if err := json.Unmarshal([]byte(`1`), &s); err != nil || s != BestEffortState { + t.Fatalf("unmarshal ordinal 1: got %s err=%v", s, err) + } + // Unknown token and out-of-range ordinal are rejected. 
+ if err := json.Unmarshal([]byte(`"nonsense"`), &s); err == nil { + t.Fatal("unknown token must be rejected") + } + if err := json.Unmarshal([]byte(`99`), &s); err == nil { + t.Fatal("out-of-range ordinal must be rejected") + } +} + +func TestCompleteness_JSONRoundTripFiltered(t *testing.T) { + in := NewFiltered(266, 1, "scanned 3279; 266 matched [collection_date in window]; returned 1") + b, err := json.Marshal(in) + if err != nil { + t.Fatalf("marshal: %v", err) + } + if !strings.Contains(string(b), `"state":"filtered"`) { + t.Fatalf("manifest JSON must carry the token state, got %s", b) + } + if !strings.Contains(string(b), `"authoritative_count":266`) { + t.Fatalf("the exact match count must serialize as authoritative_count, got %s", b) + } + var out Completeness + if err := json.Unmarshal(b, &out); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if out.State != FilteredState || out.AuthoritativeCount != 266 || out.ReconciledCount != 1 { + t.Fatalf("round-trip mismatch: %+v", out) + } +} diff --git a/schema/manifest.schema.json b/schema/manifest.schema.json index 4c933a1..766ecbe 100644 --- a/schema/manifest.schema.json +++ b/schema/manifest.schema.json @@ -171,7 +171,7 @@ "description": "The complete-or-fail verdict (PRD section 6.2). The engine never emits state:complete without reconciling reconciled_count against authoritative_count.", "required": ["state", "reconciled_count"], "properties": { - "state": { "type": "string", "enum": ["complete", "best_effort", "partial"] }, + "state": { "type": "string", "enum": ["complete", "best_effort", "partial", "filtered"] }, "reconciled_count": { "type": "integer", "minimum": 0, @@ -180,9 +180,9 @@ "authoritative_count": { "type": "integer", "minimum": 0, - "description": "Source's authoritative total; meaningful for state:complete, where it must equal reconciled_count." 
+ "description": "Source's authoritative total; meaningful for state:complete (where it must equal reconciled_count) and state:filtered (where it is the exact local-match count from an exhaustive scan of the complete reconciled candidate set, and reconciled_count may be below it only when an output limit truncated the returned records)." }, - "reason": { "type": "string", "description": "Mandatory explanation for best_effort and partial; absent for complete." }, + "reason": { "type": "string", "description": "Mandatory explanation for best_effort, partial and filtered; absent for complete." }, "resume_cursor": { "type": "string", "description": "Opaque resumable cursor for a partial result (PRD section 9.6)." }, "per_source": { "type": "array", @@ -196,8 +196,8 @@ "then": { "required": ["state", "reconciled_count", "authoritative_count"] } }, { - "$comment": "best_effort and partial require a non-empty reason.", - "if": { "properties": { "state": { "enum": ["best_effort", "partial"] } } }, + "$comment": "best_effort, partial and filtered require a non-empty reason.", + "if": { "properties": { "state": { "enum": ["best_effort", "partial", "filtered"] } } }, "then": { "required": ["reason"], "properties": { "reason": { "minLength": 1 } } } } ] @@ -208,7 +208,7 @@ "required": ["source_id", "state", "reconciled_count"], "properties": { "source_id": { "type": "string" }, - "state": { "type": "string", "enum": ["complete", "best_effort", "partial"] }, + "state": { "type": "string", "enum": ["complete", "best_effort", "partial", "filtered"] }, "reconciled_count": { "type": "integer", "minimum": 0 }, "authoritative_count": { "type": "integer", "minimum": 0 }, "reason": { "type": "string" } diff --git a/surface/provider/specs/ncbi-virus.yaml b/surface/provider/specs/ncbi-virus.yaml index 2b32ce2..30d725b 100644 --- a/surface/provider/specs/ncbi-virus.yaml +++ b/surface/provider/specs/ncbi-virus.yaml @@ -22,6 +22,14 @@ endpoints: # one bare report JSON object per line (no 
reports[] envelope) + # x-datasets-version header rest: https://api.ncbi.nlm.nih.gov + # efetch (E-utilities) serves the raw nucleotide sequence used ONLY to evaluate the + # local max_ambiguous_chars predicate (the Datasets dataset_report exposes + # nucleotide.sequence_hash but not the sequence, and no ambiguous-base count). It is + # never the RawRecord source and never enters the logical hash; sequences are + # immutable per accession.version, so the count is a pure, reproducible function of + # the record. Keyless works (5 rps, shared governor budget). + # GET /entrez/eutils/efetch.fcgi?db=nuccore&id={acc}&rettype=fasta&retmode=text + efetch: https://eutils.ncbi.nlm.nih.gov auth: api_key # OPTIONAL key: works fully without one (5 rps). A loaded NCBI key # raises this source to 10 rps and is attached ONLY to the outbound *http.Request # ("api-key:" header) by the credential RoundTripper — it never reaches a URL From 0a37d077d57c590d9707ff0526d94cd3b3ffd1fc Mon Sep 17 00:00:00 2001 From: Tristan Farmer <159447266+001TMF@users.noreply.github.com> Date: Mon, 15 Jun 2026 19:14:49 +0100 Subject: [PATCH 2/2] feat(ncbivirus): local FASTA export via SequenceExporter capability Add an optional, off-the-frozen-Connector SequenceExporter capability so `export --output-format fasta` materializes the verified set's sequences through the engine instead of a side fetch. ncbivirus implements ExportFASTA via batched efetch, emitting canonical deterministic FASTA (>ACCESSION + wrapped residues, in canonical record order) under strict complete-or-fail: every requested accession must come back exactly once or it errors. snapshot.Store.WriteExport writes the artifact atomically under /exports/ (key/ext path-traversal guarded); resolver.MaterializeExport type-asserts the capability for sequence formats and keeps the legacy locator for the rest. ctx threaded for cancellation. 
Cross-model reviewed (codex/GPT-5.5): no P0; all findings folded in (set verification, canonical bytes, ctx, ext validation, empty-before-efetch). 20 new tests, key guarantees proven by revert. Live-validated: the exact ZEBOV query exports 261 records, byte-identical residues to the reference set. Co-Authored-By: Claude Opus 4.8 (1M context) --- connectors/ncbivirus/export_test.go | 249 ++++++++++++++++++++++++++++ connectors/ncbivirus/ncbivirus.go | 237 ++++++++++++++++++++++++-- engine/contract/connector.go | 28 ++++ engine/resolver/export.go | 72 ++++++++ engine/resolver/export_test.go | 124 ++++++++++++++ engine/snapshot/export.go | 71 ++++++++ engine/snapshot/export_test.go | 78 +++++++++ surface/core/handler.go | 30 ++-- 8 files changed, 861 insertions(+), 28 deletions(-) create mode 100644 connectors/ncbivirus/export_test.go create mode 100644 engine/resolver/export.go create mode 100644 engine/resolver/export_test.go create mode 100644 engine/snapshot/export.go create mode 100644 engine/snapshot/export_test.go diff --git a/connectors/ncbivirus/export_test.go b/connectors/ncbivirus/export_test.go new file mode 100644 index 0000000..30f5d22 --- /dev/null +++ b/connectors/ncbivirus/export_test.go @@ -0,0 +1,249 @@ +package ncbivirus + +import ( + "bytes" + "context" + "fmt" + "net/url" + "strings" + "testing" + + "pinakes.sh/pinakes/engine/contract" +) + +// efetchBatchURL is the exact URL the connector forms for one efetch batch (a +// comma-joined accession list), mirroring ExportFASTA's url.Values construction. +func efetchBatchURL(accs ...string) string { + v := url.Values{} + v.Set("db", "nuccore") + v.Set("id", strings.Join(accs, ",")) + v.Set("rettype", "fasta") + v.Set("retmode", "text") + return "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?" + v.Encode() +} + +// recs builds a normalized record set carrying just the source accessions, which is +// all ExportFASTA reads. 
+func recs(accs ...string) []contract.NormalizedRecord { + out := make([]contract.NormalizedRecord, 0, len(accs)) + for _, a := range accs { + out = append(out, contract.NormalizedRecord{LogicalID: "ncbi-virus:" + a, SourceAccession: a}) + } + return out +} + +// fastaMulti concatenates one synthetic FASTA record per accession into a single +// efetch payload, the way efetch returns a multi-id request. Each accession's +// residues are a stable function of the accession (NOT its position in the +// payload), so a scrambled return order does not change any record's sequence. +func fastaMulti(accs ...string) []byte { + var b bytes.Buffer + for _, a := range accs { + reps := 15 + int(a[len(a)-1])%4 // stable per accession + b.WriteString(">" + a + " synthetic\n" + strings.Repeat("ACGT", reps) + "\n") + } + return b.Bytes() +} + +// headerCount counts FASTA header lines in out. +func headerCount(out []byte) int { + n := bytes.Count(out, []byte("\n>")) + if len(out) > 0 && out[0] == '>' { + n++ + } + return n +} + +func TestExportFASTA_CanonicalAndOrdered(t *testing.T) { + f := newFakeDoer() + f.set("GET", efetchBatchURL("KM1", "KM2", "KM3"), 200, fastaMulti("KM1", "KM2", "KM3"), nil) + c := newConnEfetch(t, f) + + out, err := c.ExportFASTA(context.Background(), recs("KM1", "KM2", "KM3"), noGov()) + if err != nil { + t.Fatalf("ExportFASTA: %v", err) + } + // One batched efetch call, not one per accession. + var efetchCalls int + for _, r := range f.requests { + if strings.Contains(r, "efetch.fcgi") { + efetchCalls++ + } + } + if efetchCalls != 1 { + t.Fatalf("expected 1 batched efetch call, got %d", efetchCalls) + } + if hc := headerCount(out); hc != 3 { + t.Fatalf("expected 3 FASTA records, got %d", hc) + } + // Canonical headers: ">ACCESSION" exactly (no upstream free-text description). 
+ for _, want := range []string{">KM1\n", ">KM2\n", ">KM3\n"} { + if !bytes.Contains(out, []byte(want)) { + t.Fatalf("canonical header %q missing from output:\n%s", want, out) + } + } + if bytes.Contains(out, []byte("synthetic")) { + t.Fatalf("export must drop upstream header free-text, got:\n%s", out) + } + // Records preserved in canonical (input) order. + iKM1, iKM2, iKM3 := bytes.Index(out, []byte(">KM1")), bytes.Index(out, []byte(">KM2")), bytes.Index(out, []byte(">KM3")) + if !(iKM1 >= 0 && iKM1 < iKM2 && iKM2 < iKM3) { + t.Fatalf("records out of canonical order: KM1@%d KM2@%d KM3@%d", iKM1, iKM2, iKM3) + } +} + +func TestExportFASTA_OutputDeterministicRegardlessOfReturnOrder(t *testing.T) { + // efetch returns the records in a DIFFERENT order than requested; the export + // must still emit them in requested order, byte-identical. + f1 := newFakeDoer() + f1.set("GET", efetchBatchURL("KM1", "KM2", "KM3"), 200, fastaMulti("KM1", "KM2", "KM3"), nil) + out1, err := newConnEfetch(t, f1).ExportFASTA(context.Background(), recs("KM1", "KM2", "KM3"), noGov()) + if err != nil { + t.Fatalf("ExportFASTA 1: %v", err) + } + f2 := newFakeDoer() + f2.set("GET", efetchBatchURL("KM1", "KM2", "KM3"), 200, fastaMulti("KM3", "KM1", "KM2"), nil) // scrambled + out2, err := newConnEfetch(t, f2).ExportFASTA(context.Background(), recs("KM1", "KM2", "KM3"), noGov()) + if err != nil { + t.Fatalf("ExportFASTA 2: %v", err) + } + if !bytes.Equal(out1, out2) { + t.Fatalf("export must be canonical regardless of efetch return order:\n%s\n---\n%s", out1, out2) + } +} + +func TestExportFASTA_Batches(t *testing.T) { + // 250 accessions => two efetch batches (200 + 50), and every record returned. 
+ all := make([]string, 0, 250) + for i := 0; i < 250; i++ { + all = append(all, fmt.Sprintf("A%03d", i)) + } + f := newFakeDoer() + f.set("GET", efetchBatchURL(all[:200]...), 200, fastaMulti(all[:200]...), nil) + f.set("GET", efetchBatchURL(all[200:]...), 200, fastaMulti(all[200:]...), nil) + c := newConnEfetch(t, f) + + out, err := c.ExportFASTA(context.Background(), recs(all...), noGov()) + if err != nil { + t.Fatalf("ExportFASTA: %v", err) + } + var efetchCalls int + for _, r := range f.requests { + if strings.Contains(r, "efetch.fcgi") { + efetchCalls++ + } + } + if efetchCalls != 2 { + t.Fatalf("expected 2 batched efetch calls for 250 records, got %d", efetchCalls) + } + if hc := headerCount(out); hc != 250 { + t.Fatalf("expected 250 FASTA records across batches, got %d", hc) + } +} + +func TestExportFASTA_IncompleteIsFailure(t *testing.T) { + // efetch returns only two of the three requested accessions: a short export must + // fail (complete-or-fail), never silently return a short set. + f := newFakeDoer() + f.set("GET", efetchBatchURL("KM1", "KM2", "KM3"), 200, fastaMulti("KM1", "KM2"), nil) + c := newConnEfetch(t, f) + + _, err := c.ExportFASTA(context.Background(), recs("KM1", "KM2", "KM3"), noGov()) + if err == nil { + t.Fatal("a short efetch payload (2 of 3 records) must fail the export") + } + if !strings.Contains(err.Error(), "incomplete") || !strings.Contains(err.Error(), "KM3") { + t.Fatalf("error must name the incompleteness and the missing accession, got: %v", err) + } +} + +func TestExportFASTA_UnrequestedAccessionIsFailure(t *testing.T) { + // efetch returns a record that was never requested: reject it. 
+ f := newFakeDoer() + f.set("GET", efetchBatchURL("KM1", "KM2"), 200, fastaMulti("KM1", "KM9"), nil) + c := newConnEfetch(t, f) + + _, err := c.ExportFASTA(context.Background(), recs("KM1", "KM2"), noGov()) + if err == nil || !strings.Contains(err.Error(), "unrequested") { + t.Fatalf("an unrequested returned accession must fail, got: %v", err) + } +} + +func TestExportFASTA_DuplicateReturnedAccessionIsFailure(t *testing.T) { + f := newFakeDoer() + f.set("GET", efetchBatchURL("KM1", "KM2"), 200, fastaMulti("KM1", "KM1"), nil) // KM2 missing, KM1 twice + c := newConnEfetch(t, f) + + _, err := c.ExportFASTA(context.Background(), recs("KM1", "KM2"), noGov()) + if err == nil { + t.Fatal("a duplicate returned accession must fail") + } +} + +func TestExportFASTA_DuplicateRequestedAccessionIsFailure(t *testing.T) { + c := newConnEfetch(t, newFakeDoer()) + _, err := c.ExportFASTA(context.Background(), recs("KM1", "KM1"), noGov()) + if err == nil || !strings.Contains(err.Error(), "duplicate") { + t.Fatalf("a duplicate requested accession must fail, got: %v", err) + } +} + +func TestExportFASTA_VersionInsensitiveMatch(t *testing.T) { + // Requested bare accession; efetch returns the versioned form. They must match. 
+ f := newFakeDoer() + f.set("GET", efetchBatchURL("KM1"), 200, []byte(">KM1.2 synthetic\nACGTACGT\n"), nil) + c := newConnEfetch(t, f) + + out, err := c.ExportFASTA(context.Background(), recs("KM1"), noGov()) + if err != nil { + t.Fatalf("version-insensitive match must succeed: %v", err) + } + if !bytes.HasPrefix(out, []byte(">KM1\n")) { + t.Fatalf("export header must be the requested accession, got:\n%s", out) + } +} + +func TestExportFASTA_NoEfetchEndpoint(t *testing.T) { + c := newConn(t, newFakeDoer()) // base spec: no efetch endpoint configured + _, err := c.ExportFASTA(context.Background(), recs("KM1"), noGov()) + if err == nil { + t.Fatal("ExportFASTA without an efetch endpoint must error") + } +} + +func TestExportFASTA_MissingAccessionIsFailure(t *testing.T) { + c := newConnEfetch(t, newFakeDoer()) + bad := []contract.NormalizedRecord{{LogicalID: "ncbi-virus:x"}} // no SourceAccession + _, err := c.ExportFASTA(context.Background(), bad, noGov()) + if err == nil { + t.Fatal("a record without a source accession must fail the export") + } +} + +func TestExportFASTA_EmptySetNeedsNoEndpoint(t *testing.T) { + // A valid empty verified set yields empty bytes and no efetch call — even with no + // efetch endpoint configured (no upstream access is required). + c := newConn(t, newFakeDoer()) + out, err := c.ExportFASTA(context.Background(), nil, noGov()) + if err != nil { + t.Fatalf("empty set: %v", err) + } + if len(out) != 0 { + t.Fatalf("empty set must yield no bytes, got %d", len(out)) + } +} + +func TestExportFASTA_CancelledContext(t *testing.T) { + f := newFakeDoer() + f.set("GET", efetchBatchURL("KM1"), 200, fastaMulti("KM1"), nil) + c := newConnEfetch(t, f) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := c.ExportFASTA(ctx, recs("KM1"), noGov()) + if err == nil { + t.Fatal("a cancelled context must abort the export") + } +} + +// Compile-time proof the connector advertises the optional capability. 
+var _ contract.SequenceExporter = (*Connector)(nil) diff --git a/connectors/ncbivirus/ncbivirus.go b/connectors/ncbivirus/ncbivirus.go index ffb8331..7f28503 100644 --- a/connectors/ncbivirus/ncbivirus.go +++ b/connectors/ncbivirus/ncbivirus.go @@ -20,6 +20,7 @@ package ncbivirus import ( "bufio" "bytes" + "context" "encoding/json" "fmt" "io" @@ -1523,56 +1524,254 @@ func (c *Connector) fetchSequence(gov *contract.Governor, id string) (string, er v.Set("id", id) v.Set("rettype", "fasta") v.Set("retmode", "text") - reqURL := c.efetch + "/entrez/eutils/efetch.fcgi?" + v.Encode() + // Retrieve (and thus the max_ambiguous N-count fetch) is ctx-free in the frozen + // Connector interface, so this path uses a background context; the export path + // (ExportFASTA) threads the request's real context for cancellation. + payload, err := c.efetchFASTA(context.Background(), gov, v, fmt.Sprintf("fetch sequence %q", id)) + if err != nil { + return "", err + } + seq, ferr := fastaResidues(payload) + if ferr != nil { + return "", fmt.Errorf("ncbivirus: fetch sequence %q: parse FASTA: %w", id, ferr) + } + if seq == "" { + return "", fmt.Errorf("ncbivirus: fetch sequence %q: empty FASTA body", id) + } + return seq, nil +} +// efetchFASTA performs one efetch (E-utilities) GET for the given query values and +// returns the raw FASTA payload on success. It acquires a Governor permit per +// attempt, honors 429 backoff, and fails on 5xx / unexpected status — the same +// policy every other fetch uses. label appears only in error messages (e.g. +// `fetch sequence "X"` or `fasta export batch 0-199`). +func (c *Connector) efetchFASTA(ctx context.Context, gov *contract.Governor, v url.Values, label string) ([]byte, error) { + reqURL := c.efetch + "/entrez/eutils/efetch.fcgi?" 
+ v.Encode() for attempt := 0; ; attempt++ { + if err := ctx.Err(); err != nil { + return nil, fmt.Errorf("ncbivirus: %s: %w", label, err) + } rep, err := gov.AcquireWithOutcome(sourceID, 1) if err != nil { - return "", fmt.Errorf("ncbivirus: fetch sequence %q: governor acquire: %w", id, err) + return nil, fmt.Errorf("ncbivirus: %s: governor acquire: %w", label, err) } - req, rerr := http.NewRequest(http.MethodGet, reqURL, nil) + req, rerr := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) if rerr != nil { rep(contract.OutcomeFailed, 0) - return "", fmt.Errorf("ncbivirus: fetch sequence %q: build request: %w", id, rerr) + return nil, fmt.Errorf("ncbivirus: %s: build request: %w", label, rerr) } resp, derr := c.doer.Do(req) if derr != nil { rep(contract.OutcomeFailed, 0) - return "", fmt.Errorf("ncbivirus: fetch sequence %q: transport: %w", id, derr) + return nil, fmt.Errorf("ncbivirus: %s: transport: %w", label, derr) } payload, status, _, retryAfter, berr := readBody(resp) if berr != nil { rep(contract.OutcomeFailed, 0) - return "", fmt.Errorf("ncbivirus: fetch sequence %q: read body: %w", id, berr) + return nil, fmt.Errorf("ncbivirus: %s: read body: %w", label, berr) } switch { case status == http.StatusOK: rep(contract.OutcomeSuccess, 0) - seq, ferr := fastaResidues(payload) - if ferr != nil { - return "", fmt.Errorf("ncbivirus: fetch sequence %q: parse FASTA: %w", id, ferr) - } - if seq == "" { - return "", fmt.Errorf("ncbivirus: fetch sequence %q: empty FASTA body", id) - } - return seq, nil + return payload, nil case status == http.StatusTooManyRequests: rep(contract.OutcomeThrottled, retryAfter) if attempt >= maxThrottleRetries { - return "", fmt.Errorf("ncbivirus: fetch sequence %q: throttled (429) after %d retries", id, attempt) + return nil, fmt.Errorf("ncbivirus: %s: throttled (429) after %d retries", label, attempt) } backoff(attempt, retryAfter) continue case status >= 500: rep(contract.OutcomeFailed, 0) - return "", fmt.Errorf("ncbivirus: 
fetch sequence %q: upstream %d", id, status) + return nil, fmt.Errorf("ncbivirus: %s: upstream %d", label, status) default: rep(contract.OutcomeFailed, 0) - return "", fmt.Errorf("ncbivirus: fetch sequence %q: unexpected status %d", id, status) + return nil, fmt.Errorf("ncbivirus: %s: unexpected status %d", label, status) } } } +// efetchBatch is the number of accessions requested per efetch call during a FASTA +// export. NCBI's E-utilities accept a comma-separated id list; 200 keeps each +// request URL well within limits while minimizing round-trips. +const efetchBatch = 200 + +// fastaLineWidth is the residue wrap width of the canonical export. +const fastaLineWidth = 70 + +// ExportFASTA implements contract.SequenceExporter: it materializes the nucleotide +// FASTA for a verified, normalized record set by fetching sequences from the +// efetch endpoint in batches and emitting them in the records' canonical order. +// +// The output is CANONICAL and deterministic: one record per requested accession as +// `>ACCESSION` (the verified accession, not upstream free-text) followed by the +// upper-cased residues wrapped at a fixed width. The bytes are therefore a pure +// function of (verified accession set, immutable per-accession.version residues), +// not of NCBI's mutable header/wrapping. They are DERIVED — never a RawRecord, the +// logical hash, or a snapshot. +// +// It is strict complete-or-fail: every requested accession must come back exactly +// once, every returned accession must have been requested, and no residues may be +// empty — otherwise it errors rather than emitting a short or scrambled set. efetch +// is NOT assumed to preserve input order or de-duplicate. +func (c *Connector) ExportFASTA(ctx context.Context, records []contract.NormalizedRecord, gov *contract.Governor) ([]byte, error) { + // Requested order + a version-insensitive lookup so a returned accession.version + // matches its requested record even if one side omits the version suffix. 
+ ids := make([]string, 0, len(records)) // full accessions, requested order + keyForIdx := make([]string, 0, len(records)) // stripped key per requested record + wantKey := make(map[string]bool, len(records)) + for _, rec := range records { + if rec.SourceAccession == "" { + return nil, fmt.Errorf("ncbivirus: fasta export: record %q has no source accession", rec.LogicalID) + } + key := accessionKey(rec.SourceAccession) + if wantKey[key] { + return nil, fmt.Errorf("ncbivirus: fasta export: duplicate accession %q in record set", rec.SourceAccession) + } + wantKey[key] = true + ids = append(ids, rec.SourceAccession) + keyForIdx = append(keyForIdx, key) + } + // A valid empty verified set needs no upstream access. + if len(ids) == 0 { + return []byte{}, nil + } + if c.efetch == "" { + return nil, fmt.Errorf("ncbivirus: fasta export: efetch endpoint not configured") + } + + // Fetch in batches; index residues by stripped accession key. + seqs := make(map[string]string, len(ids)) + for start := 0; start < len(ids); start += efetchBatch { + end := start + efetchBatch + if end > len(ids) { + end = len(ids) + } + v := url.Values{} + v.Set("db", "nuccore") + v.Set("id", strings.Join(ids[start:end], ",")) + v.Set("rettype", "fasta") + v.Set("retmode", "text") + payload, err := c.efetchFASTA(ctx, gov, v, fmt.Sprintf("fasta export batch %d-%d", start, end-1)) + if err != nil { + return nil, err + } + recs, perr := parseFASTARecords(payload) + if perr != nil { + return nil, fmt.Errorf("ncbivirus: fasta export: parse batch %d-%d: %w", start, end-1, perr) + } + for _, fr := range recs { + key := accessionKey(fr.accession) + if !wantKey[key] { + return nil, fmt.Errorf("ncbivirus: fasta export: efetch returned unrequested accession %q", fr.accession) + } + if _, dup := seqs[key]; dup { + return nil, fmt.Errorf("ncbivirus: fasta export: efetch returned accession %q twice", fr.accession) + } + if fr.residues == "" { + return nil, fmt.Errorf("ncbivirus: fasta export: empty sequence 
for %q", fr.accession) + } + seqs[key] = fr.residues + } + } + + // Complete-or-fail: every requested accession must have a sequence. + if len(seqs) != len(ids) { + for i, key := range keyForIdx { + if _, ok := seqs[key]; !ok { + return nil, fmt.Errorf("ncbivirus: fasta export incomplete: requested %d sequences, got %d (missing %q)", len(ids), len(seqs), ids[i]) + } + } + return nil, fmt.Errorf("ncbivirus: fasta export incomplete: requested %d sequences, got %d", len(ids), len(seqs)) + } + + // Emit canonical FASTA in requested order. + var out bytes.Buffer + for i, key := range keyForIdx { + out.WriteString(">") + out.WriteString(ids[i]) + out.WriteByte('\n') + writeWrapped(&out, seqs[key], fastaLineWidth) + } + return out.Bytes(), nil +} + +// accessionKey strips the version suffix (".N") from an accession so the requested +// accession and efetch's returned accession.version compare equal regardless of +// which side carries the version. +func accessionKey(acc string) string { + if i := strings.IndexByte(acc, '.'); i >= 0 { + return acc[:i] + } + return acc +} + +// fastaRecord is one parsed FASTA record: its accession (the first whitespace- +// delimited token of the header) and concatenated, upper-cased residues. +type fastaRecord struct { + accession string + residues string +} + +// parseFASTARecords splits a (possibly multi-record) FASTA payload into records, +// returning the scanner error on a truncated read so a partial sequence is treated +// as a failure rather than silently admitted. 
+func parseFASTARecords(payload []byte) ([]fastaRecord, error) { + var ( + recs []fastaRecord + cur *fastaRecord + seq strings.Builder + ) + flush := func() { + if cur != nil { + cur.residues = seq.String() + recs = append(recs, *cur) + } + seq.Reset() + } + sc := bufio.NewScanner(bytes.NewReader(payload)) + sc.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + if line == "" { + continue + } + if strings.HasPrefix(line, ">") { + flush() + header := strings.TrimSpace(line[1:]) + acc := header + if i := strings.IndexAny(header, " \t"); i >= 0 { + acc = header[:i] + } + cur = &fastaRecord{accession: acc} + continue + } + if cur == nil { + return nil, fmt.Errorf("residues before any FASTA header") + } + seq.WriteString(strings.ToUpper(line)) + } + if err := sc.Err(); err != nil { + return nil, err + } + flush() + return recs, nil +} + +// writeWrapped writes seq to out in fixed-width residue lines (the canonical FASTA +// body), each terminated by a newline. +func writeWrapped(out *bytes.Buffer, seq string, width int) { + for i := 0; i < len(seq); i += width { + end := i + width + if end > len(seq) { + end = len(seq) + } + out.WriteString(seq[i:end]) + out.WriteByte('\n') + } +} + // fastaResidues concatenates all non-header lines of a FASTA payload into one // upper-cased residue string with surrounding whitespace removed. It returns the // scanner error if the read was truncated (e.g. a line exceeding the buffer): a @@ -1597,3 +1796,7 @@ func fastaResidues(payload []byte) (string, error) { // Ensure *Connector satisfies the frozen contract at compile time. var _ contract.Connector = (*Connector)(nil) + +// Ensure *Connector also satisfies the optional sequence-export capability, so +// `export --output-format fasta` materializes sequences through the engine. 
+var _ contract.SequenceExporter = (*Connector)(nil) diff --git a/engine/contract/connector.go b/engine/contract/connector.go index 22fe6c4..2e52df9 100644 --- a/engine/contract/connector.go +++ b/engine/contract/connector.go @@ -1,5 +1,7 @@ package contract +import "context" + // Connector is the FROZEN interface every source connector implements (PRD // section 7.1). It is the single most important signature in the project: the // declarative YAML spec covers the uniform part (auth, limits, license, @@ -50,3 +52,29 @@ type Connector interface { // drifted manifest (PRD sections 6.1, 7.1). FilterSemantics() SemanticsSpec } + +// SequenceExporter is an OPTIONAL connector capability, kept OFF the frozen +// Connector interface so adding it touches no existing connector. A connector +// whose records reference an external, immutable sequence payload (nucleotide / +// protein FASTA fetched by accession) implements it so `export --output-format +// fasta` can materialize real bytes through the engine rather than a side fetch. +// +// The resolver type-asserts a source's connector to SequenceExporter when an +// export requests a sequence format; a connector that does not implement it makes +// that format unavailable (the export refuses with a clear error rather than +// silently returning a metadata-only locator). +// +// The returned FASTA bytes are DERIVED data: they are NOT part of the logical- +// record digest, the manifest, or any snapshot. Sequences are immutable per +// accession.version, so the export is a pure function of the verified accession +// set the records carry — its determinism rides on the already-reconciled set, +// not on a second reproducibility anchor. The implementation MUST preserve the +// records' canonical order and MUST be complete-or-fail: if it cannot return a +// sequence for every requested record it returns an error, never a short set. 
+type SequenceExporter interface { + // ExportFASTA returns the FASTA bytes for records, in their given (canonical) + // order, fetching every sequence through gov permits like any other call. ctx + // carries the request deadline/cancellation so a long multi-batch export aborts + // when the caller disconnects. + ExportFASTA(ctx context.Context, records []NormalizedRecord, gov *Governor) ([]byte, error) +} diff --git a/engine/resolver/export.go b/engine/resolver/export.go new file mode 100644 index 0000000..4ab96d5 --- /dev/null +++ b/engine/resolver/export.go @@ -0,0 +1,72 @@ +package resolver + +// export.go materializes an export artifact for an already-resolved result. The +// surface Export verb resolves the query (Resolve → records + manifest), then +// calls MaterializeExport to turn the requested output format into a concrete +// local locator. +// +// For a sequence-bearing format (currently "fasta") the bytes are fetched through +// the source connector's OPTIONAL contract.SequenceExporter capability and written +// to a real file under the snapshot store, so `export --output-format fasta` +// produces sequences THROUGH the engine rather than a side fetch. For every other +// format the locator is the legacy content-addressed logical locator +// (file://exports/) — bytes for those formats are not yet +// materialized, preserving prior behavior. + +import ( + "context" + "fmt" + + "pinakes.sh/pinakes/engine/contract" +) + +// sequenceExportExt maps a sequence export format token to the on-disk extension +// the materialized artifact is written with. A format absent here is not a +// sequence format: MaterializeExport returns the legacy logical locator for it. +var sequenceExportExt = map[string]string{ + "fasta": ".fasta", +} + +// MaterializeExport returns the local locator for an export of records in the +// given output format. 
logicalHash is the resolved manifest's reproducibility +// anchor; it both names the materialized artifact (so re-exporting the same +// verified set is idempotent) and is the legacy logical locator's content key. +// +// For a sequence format the source connector MUST implement +// contract.SequenceExporter and a snapshot store MUST be configured; the bytes are +// fetched (under ctx) in canonical record order and written atomically, and the +// returned locator is a file:// path to those bytes. For any other format the bytes +// are not materialized and the legacy file://exports/ locator is returned +// unchanged. +func (r *Resolver) MaterializeExport(ctx context.Context, format, logicalHash string, records *contract.NormalizedRecords) (string, error) { + ext, isSeq := sequenceExportExt[format] + if !isSeq { + // Unchanged Foundation behavior: a content-addressed logical locator with + // no materialized bytes (the JSON/other-format export path is deferred). + return "file://exports/" + logicalHash, nil + } + if records == nil { + return "", fmt.Errorf("resolver: %q export has no resolved records", format) + } + src, ok := r.sources[records.SourceID] + if !ok { + return "", fmt.Errorf("%w: %q", ErrUnknownSource, records.SourceID) + } + exporter, ok := src.Connector.(contract.SequenceExporter) + if !ok { + return "", fmt.Errorf("resolver: source %q does not support %q export", records.SourceID, format) + } + if r.store == nil { + return "", fmt.Errorf("resolver: %q export requires a configured snapshot store", format) + } + + data, err := exporter.ExportFASTA(ctx, records.Records, r.gov) + if err != nil { + return "", fmt.Errorf("resolver: materialize %q export for %q: %w", format, records.SourceID, err) + } + path, err := r.store.WriteExport(logicalHash, ext, data) + if err != nil { + return "", fmt.Errorf("resolver: write %q export: %w", format, err) + } + return "file://" + path, nil +} diff --git a/engine/resolver/export_test.go 
b/engine/resolver/export_test.go new file mode 100644 index 0000000..9c6f7f1 --- /dev/null +++ b/engine/resolver/export_test.go @@ -0,0 +1,124 @@ +package resolver + +import ( + "context" + "os" + "strings" + "testing" + + "pinakes.sh/pinakes/engine/contract" +) + +// exportingConn is a mockConnector that also advertises the optional +// contract.SequenceExporter capability, returning canned bytes (or an error) from +// ExportFASTA so MaterializeExport's materialization path is testable without a +// network. +type exportingConn struct { + *mockConnector + fasta []byte + err error +} + +func (e *exportingConn) ExportFASTA(ctx context.Context, records []contract.NormalizedRecord, gov *contract.Governor) ([]byte, error) { + return e.fasta, e.err +} + +func exportSource(id string, c contract.Connector) Source { + return Source{ + ID: id, + Connector: c, + Maturity: contract.L2Normalized, + License: contract.License{ + SPDX: "CC0-1.0", Posture: contract.HostServe, Redistribute: true, Snapshot: true, + }, + ConnectorSpecVersion: "mock/1.0.0", + } +} + +func TestMaterializeExport_NonSequenceFormatLegacyLocator(t *testing.T) { + store, _ := newMemStore(t) + src := mockSource("pdb", contract.L2Normalized, true) + r := NewResolver(map[string]Source{"pdb": src}, testGovernor(), store, nil, testEngineVersion) + + loc, err := r.MaterializeExport(context.Background(), "json", "abc123", &contract.NormalizedRecords{SourceID: "pdb"}) + if err != nil { + t.Fatalf("MaterializeExport(json): %v", err) + } + if loc != "file://exports/abc123" { + t.Fatalf("non-sequence format must return the legacy logical locator, got %q", loc) + } +} + +func TestMaterializeExport_FASTAWritesRealFile(t *testing.T) { + store, _ := newMemStore(t) + want := []byte(">KM1 synthetic\nACGTACGT\n>KM2 synthetic\nTTTTGGGG\n") + conn := &exportingConn{mockConnector: newMockConnector("ncbi-virus", true), fasta: want} + r := NewResolver(map[string]Source{"ncbi-virus": exportSource("ncbi-virus", conn)}, + 
testGovernor(), store, nil, testEngineVersion) + + loc, err := r.MaterializeExport(context.Background(), "fasta", "deadbeef", &contract.NormalizedRecords{ + SourceID: "ncbi-virus", + Records: []contract.NormalizedRecord{{SourceAccession: "KM1"}, {SourceAccession: "KM2"}}, + }) + if err != nil { + t.Fatalf("MaterializeExport(fasta): %v", err) + } + if !strings.HasPrefix(loc, "file://") { + t.Fatalf("fasta export must return a file:// locator, got %q", loc) + } + path := strings.TrimPrefix(loc, "file://") + got, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read materialized export %q: %v", path, err) + } + if string(got) != string(want) { + t.Fatalf("materialized bytes mismatch:\n got %q\nwant %q", got, want) + } + if !strings.HasSuffix(path, "deadbeef.fasta") { + t.Fatalf("export file must be keyed by the logical hash, got %q", path) + } +} + +func TestMaterializeExport_UnknownSource(t *testing.T) { + store, _ := newMemStore(t) + r := NewResolver(map[string]Source{}, testGovernor(), store, nil, testEngineVersion) + _, err := r.MaterializeExport(context.Background(), "fasta", "h", &contract.NormalizedRecords{SourceID: "missing"}) + if err == nil { + t.Fatal("fasta export for an unknown source must error") + } +} + +func TestMaterializeExport_ConnectorWithoutCapability(t *testing.T) { + store, _ := newMemStore(t) + // A plain mockConnector does NOT implement SequenceExporter. 
+ src := mockSource("pdb", contract.L2Normalized, true) + r := NewResolver(map[string]Source{"pdb": src}, testGovernor(), store, nil, testEngineVersion) + _, err := r.MaterializeExport(context.Background(), "fasta", "h", &contract.NormalizedRecords{SourceID: "pdb"}) + if err == nil || !strings.Contains(err.Error(), "does not support") { + t.Fatalf("a connector without the capability must refuse fasta export, got: %v", err) + } +} + +func TestMaterializeExport_NilRecords(t *testing.T) { + store, _ := newMemStore(t) + conn := &exportingConn{mockConnector: newMockConnector("ncbi-virus", true)} + r := NewResolver(map[string]Source{"ncbi-virus": exportSource("ncbi-virus", conn)}, + testGovernor(), store, nil, testEngineVersion) + _, err := r.MaterializeExport(context.Background(), "fasta", "h", nil) + if err == nil { + t.Fatal("fasta export with nil records must error") + } +} + +func TestMaterializeExport_NoStore(t *testing.T) { + conn := &exportingConn{mockConnector: newMockConnector("ncbi-virus", true), fasta: []byte(">x\nACGT\n")} + r := NewResolver(map[string]Source{"ncbi-virus": exportSource("ncbi-virus", conn)}, + testGovernor(), nil /* no store */, nil, testEngineVersion) + _, err := r.MaterializeExport(context.Background(), "fasta", "h", &contract.NormalizedRecords{ + SourceID: "ncbi-virus", + Records: []contract.NormalizedRecord{{SourceAccession: "x"}}, + }) + if err == nil || !strings.Contains(err.Error(), "snapshot store") { + t.Fatalf("fasta export without a store must error, got: %v", err) + } +} diff --git a/engine/snapshot/export.go b/engine/snapshot/export.go new file mode 100644 index 0000000..77ab424 --- /dev/null +++ b/engine/snapshot/export.go @@ -0,0 +1,71 @@ +package snapshot + +// export.go materializes DERIVED export artifacts (e.g. a FASTA file built from a +// verified accession set) under the store root's "exports" tree. 
An export is not +// a content-addressed snapshot object and never enters the version index or the +// reproducibility anchor: it is a pure function of the verified logical-record set +// it was built from (the resolver keys it by that set's logical-record hash, so a +// re-export of the same verified result is idempotent). Keeping it under the store +// root — a sibling of objects/ — means a store is still fully self-describing on +// disk and the same retention root covers derived artifacts. + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" +) + +// WriteExport writes data as a derived export artifact named key+ext under +// /exports/ and returns its absolute path. The write is atomic (temp file in +// the same directory, fsync, rename), matching the object backend's durability. +// key must be non-empty and a single path element (no separators) so a caller +// hash can never escape the exports tree; ext is an optional extension including +// the leading dot (e.g. ".fasta"). +func (s *Store) WriteExport(key, ext string, data []byte) (string, error) { + if key == "" { + return "", errors.New("snapshot: WriteExport requires a non-empty key") + } + if strings.ContainsAny(key, `/\`) || key == "." || key == ".." { + return "", fmt.Errorf("snapshot: WriteExport key %q must be a single path element", key) + } + // ext must be empty or a dot-prefixed, separator-free suffix so it cannot escape + // the exports tree (e.g. "../../etc") when joined onto key. 
+ if ext != "" && (!strings.HasPrefix(ext, ".") || strings.ContainsAny(ext, `/\`) || strings.Contains(ext, "..")) { + return "", fmt.Errorf("snapshot: WriteExport ext %q must be empty or a dot-prefixed extension", ext) + } + + dir := filepath.Join(s.root, "exports") + if err := os.MkdirAll(dir, 0o755); err != nil { + return "", fmt.Errorf("snapshot: create exports dir: %w", err) + } + path := filepath.Join(dir, key+ext) + + tmp, err := os.CreateTemp(dir, ".tmp-export-*") + if err != nil { + return "", fmt.Errorf("snapshot: create temp export: %w", err) + } + tmpName := tmp.Name() + defer os.Remove(tmpName) // no-op after a successful rename + if _, err := tmp.Write(data); err != nil { + tmp.Close() + return "", fmt.Errorf("snapshot: write temp export: %w", err) + } + if err := tmp.Sync(); err != nil { + tmp.Close() + return "", fmt.Errorf("snapshot: sync temp export: %w", err) + } + if err := tmp.Close(); err != nil { + return "", fmt.Errorf("snapshot: close temp export: %w", err) + } + if err := os.Rename(tmpName, path); err != nil { + return "", fmt.Errorf("snapshot: rename export into place: %w", err) + } + + abs, err := filepath.Abs(path) + if err != nil { + return path, nil // path is already usable; absolute form is best-effort + } + return abs, nil +} diff --git a/engine/snapshot/export_test.go b/engine/snapshot/export_test.go new file mode 100644 index 0000000..31588aa --- /dev/null +++ b/engine/snapshot/export_test.go @@ -0,0 +1,78 @@ +package snapshot + +import ( + "os" + "path/filepath" + "testing" +) + +func TestWriteExport_WritesContentAddressedFile(t *testing.T) { + root := t.TempDir() + s, err := NewStore(root) + if err != nil { + t.Fatalf("NewStore: %v", err) + } + data := []byte(">KM1\nACGTACGT\n") + path, err := s.WriteExport("abc123", ".fasta", data) + if err != nil { + t.Fatalf("WriteExport: %v", err) + } + if !filepath.IsAbs(path) { + t.Fatalf("WriteExport must return an absolute path, got %q", path) + } + want := filepath.Join(root, "exports", 
"abc123.fasta") + if abs, _ := filepath.Abs(want); path != abs { + t.Fatalf("export path = %q, want %q", path, abs) + } + got, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read export: %v", err) + } + if string(got) != string(data) { + t.Fatalf("export content = %q, want %q", got, data) + } +} + +func TestWriteExport_Idempotent(t *testing.T) { + s, err := NewStore(t.TempDir()) + if err != nil { + t.Fatalf("NewStore: %v", err) + } + p1, err := s.WriteExport("k", ".fasta", []byte("first")) + if err != nil { + t.Fatalf("WriteExport 1: %v", err) + } + p2, err := s.WriteExport("k", ".fasta", []byte("second")) + if err != nil { + t.Fatalf("WriteExport 2: %v", err) + } + if p1 != p2 { + t.Fatalf("same key must map to the same path: %q vs %q", p1, p2) + } + got, _ := os.ReadFile(p2) + if string(got) != "second" { + t.Fatalf("re-export must overwrite, got %q", got) + } +} + +func TestWriteExport_RejectsBadKey(t *testing.T) { + s, err := NewStore(t.TempDir()) + if err != nil { + t.Fatalf("NewStore: %v", err) + } + for _, k := range []string{"", "a/b", "../escape", ".", ".."} { + if _, err := s.WriteExport(k, ".fasta", []byte("x")); err == nil { + t.Fatalf("WriteExport must reject key %q", k) + } + } + // A malicious extension must not escape the exports tree either. + for _, ext := range []string{"fasta", "../../etc/x", ".a/b", "..fa"} { + if _, err := s.WriteExport("k", ext, []byte("x")); err == nil { + t.Fatalf("WriteExport must reject ext %q", ext) + } + } + // Empty ext is allowed (no extension). 
+ if _, err := s.WriteExport("noext", "", []byte("x")); err != nil { + t.Fatalf("WriteExport must allow empty ext: %v", err) + } +} diff --git a/surface/core/handler.go b/surface/core/handler.go index 89a2bb5..f8ce2a4 100644 --- a/surface/core/handler.go +++ b/surface/core/handler.go @@ -249,29 +249,37 @@ func (h *Handler) Export(ctx context.Context, req *idl.ExportRequest) (*idl.Expo resp.Error = errResp(err) return resp, nil } - _, manifest, err := h.provider.GetResolver().Resolve(ctx, coerceExport(req)) + records, manifest, err := h.provider.GetResolver().Resolve(ctx, coerceExport(req)) if err != nil { resp.Error = errResp(err) return resp, nil } resp.Manifest = manifest resp.OutputFormat = req.OutputFormat - // Synchronous, in-process export: the locator is a local path derived from the - // reproducibility anchor (the logical-record hash), so the same result always - // locates the same export. No JobID — nothing ran asynchronously. - resp.Location = exportLocation(manifest) + // Synchronous, in-process export. For a sequence format (fasta) MaterializeExport + // fetches the bytes through the source connector and writes a real local file, + // returning a file:// path to it; for any other format it returns the legacy + // content-addressed logical locator derived from the reproducibility anchor, so + // the same result always locates the same export. No JobID — nothing ran + // asynchronously. + loc, err := h.provider.GetResolver().MaterializeExport(ctx, req.OutputFormat, manifestHash(manifest), records) + if err != nil { + resp.Error = errResp(err) + return resp, nil + } + resp.Location = loc return resp, nil } -// exportLocation derives the synchronous export's local locator from the manifest's -// logical-record hash, so an export's location is reproducible and tied to its -// content. A nil manifest (defensive; Resolve returns non-nil on success) yields an -// empty locator. 
-func exportLocation(m *contract.Manifest) string { +// manifestHash returns the manifest's logical-record hash (the reproducibility +// anchor), which both names a materialized export artifact and keys the legacy +// content-addressed locator. A nil manifest (defensive; Resolve returns non-nil on +// success) yields an empty key. +func manifestHash(m *contract.Manifest) string { if m == nil { return "" } - return "file://exports/" + m.LogicalRecordHash + return m.LogicalRecordHash } // Jobs is a control-plane stub at Foundation time: the async job lifecycle is