Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions connectors/clinvar/clinvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.

searchPath := len(st.IDs) == 0

// limit is the caller's hard ceiling (NormalizedQuery.Limit), honored as a WALK
// CEILING per the contract: a huge upstream (e.g. BRCA1 pathogenic, thousands of
// VariationIDs) is never fully fetched for a small limit such as a --limit 1 count
// query. esearch's Count is read from the FIRST page (the authoritative total) and
// the idlist walk stops once `limit` ids are collected, so a count query returns in
// a few seconds instead of timing out walking + efetching the whole idlist. A limit
// below the authoritative count makes the result necessarily incomplete (the
// Complete gate below still requires fetched == authority), so a limit can never
// produce a false Complete — only a BestEffort partial. 0 = unlimited.
limit := p.Query.Limit

// SourceVersion authority #1: read the ClinVar ReleaseDate at walk START. It is
// re-read at walk END below; a change between the two reads means a new release
// landed under us, which downgrades to BestEffort.
Expand Down Expand Up @@ -282,6 +293,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
ids = append(ids, id)
added++
}
// Walk ceiling: once the distinct set reaches the caller's limit, no further
// esearch pages are needed — but ONLY when the limit is STRICTLY below the
// authoritative count. That guard is load-bearing: it guarantees a limited walk
// is always incomplete (fetched <= limit < authority => BestEffort), so the
// limit can never mask an over-observation into a false Complete. When
// limit >= authority the limit is non-constraining and the normal cover/total
// logic applies. This is what makes a --limit 1 count query over a large gene
// stop after the first page instead of paging + efetching the whole idlist.
if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) >= limit {
break
}
// Terminate as soon as the distinct set covers the authoritative count.
if totalKnown && authority > 0 && int64(len(ids)) >= authority {
break
Expand Down Expand Up @@ -310,6 +332,17 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
// (not lexically) so "14" precedes "18180" rather than following it.
sortVariationIDsAsc(ids)

// Truncate to the first `limit` VariationIDs in the imposed (sorted) order, but
// ONLY when limit is STRICTLY below the authoritative count (the same guard as the
// walk ceiling). This bounds the per-VariationID efetch for a small limit while
// guaranteeing the result stays incomplete (fetched <= limit < authority =>
// BestEffort). Truncating when limit >= authority could drop a legitimately-fetched
// id; keeping the guard at limit < authority means a non-constraining limit never
// perturbs a full, reconcilable walk.
if limit > 0 && authority > 0 && limit < authority && int64(len(ids)) > limit {
ids = ids[:limit]
}

// Fetch one verbatim efetch <VariationArchive> body per DISTINCT VariationID, in
// the sorted order. A 200 body with no <VariationArchive> element is a MISSING
// record (the empty <ClinVarResult-Set><set/></ClinVarResult-Set> case) — skip
Expand Down Expand Up @@ -359,6 +392,11 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
}

fetched := int64(len(raws))
// limitApplied: the caller's limit constrained the walk below the authoritative
// count. Reported as the BestEffort reason (the count is still authoritative — the
// esearch Count from page 0). totalKnown gates it so an absent count never claims an
// authoritative total.
limitApplied := limit > 0 && totalKnown && authority > 0 && limit < authority
if st.Reconcile && totalKnown && releaseKnown && !walkTruncated && !sawDuplicate && !totalMutated && !releaseDrift && fetched == authority {
// Reconciled: the authoritative count was KNOWN and stable across the whole
// (bounded, re-list-free) walk, the ReleaseDate was coherent start-to-end, and
Expand All @@ -374,6 +412,8 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
reason = fmt.Sprintf("clinvar: esearch reported no authoritative count; fetched %d — total unknown, cannot claim Complete", fetched)
case !releaseKnown:
reason = fmt.Sprintf("clinvar: release-header read reported no ReleaseDate; fetched %d — source version unknown, cannot claim Complete", fetched)
case limitApplied:
reason = fmt.Sprintf("clinvar: stopped at the requested limit of %d (authoritative total %d) — partial by request, count not reconciled", limit, authority)
case walkTruncated:
reason = truncReason
case sawDuplicate:
Expand All @@ -385,7 +425,16 @@ func (c *Connector) Retrieve(p contract.Plan, gov *contract.Governor) (contract.
default:
reason = fmt.Sprintf("clinvar: fetched %d of %d (some VariationIDs missing — empty <ClinVarResult-Set>) — count not reconciled", fetched, authority)
}
return rs, contract.NewBestEffort(fetched, reason), nil
comp := contract.NewBestEffort(fetched, reason)
if limitApplied {
// The authoritative total IS known (the page-0 esearch Count); the walk was
// stopped at the caller's limit by request, not because the total is
// untrustworthy. Expose it so a caller asking only for a count gets the real total
// from a cheap limited fetch (the --limit 1 count-query fast path). Other
// BestEffort cases leave it zero because their total is absent or proven unstable.
comp.AuthoritativeCount = authority
}
return rs, comp, nil
}

// esearchResponse is the typed view over the esearch 200 JSON body. Only the
Expand Down Expand Up @@ -1171,6 +1220,19 @@ func filtersToTerm(filters []contract.Filter) (string, error) {
return strings.Join(terms, " AND "), nil
}

// entrezValue returns v in the form it must take on the left-hand side of an
// Entrez "value[field]" term.
//
// A value with no space or tab is returned unchanged, so single-word terms stay
// byte-identical. A value that contains a space or tab (e.g. "Uncertain
// significance", "Likely pathogenic") is wrapped in double quotes so that Entrez
// does not tokenize it across the field boundary — unquoted,
// "Uncertain significance[clinical significance]" parses as two terms and silently
// yields the wrong/zero count. A value that already begins and ends with a double
// quote is assumed to be quoted by the caller and is passed through untouched.
func entrezValue(v string) string {
	if !strings.ContainsAny(v, " \t") {
		// Single token: nothing to protect.
		return v
	}
	alreadyQuoted := strings.HasPrefix(v, `"`) && strings.HasSuffix(v, `"`)
	if alreadyQuoted {
		return v
	}
	return `"` + v + `"`
}

// renderTerm renders one filter into an Entrez term. A multi-value `in` (a
// comma-separated set) is expanded into a parenthesized OR group
// "(v1[field] OR v2[field])" — valid Entrez syntax — rather than a single
Expand All @@ -1184,7 +1246,7 @@ func renderTerm(r rule, value string) (string, error) {
if p == "" {
continue
}
ors = append(ors, p+"["+r.EntrezField+"]")
ors = append(ors, entrezValue(p)+"["+r.EntrezField+"]")
}
switch len(ors) {
case 0:
Expand All @@ -1195,7 +1257,7 @@ func renderTerm(r rule, value string) (string, error) {
return "(" + strings.Join(ors, " OR ") + ")", nil
}
}
return value + "[" + r.EntrezField + "]", nil
return entrezValue(value) + "[" + r.EntrezField + "]", nil
}

// --- filter semantics ------------------------------------------------------
Expand Down
153 changes: 151 additions & 2 deletions connectors/clinvar/clinvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,20 +281,30 @@ func TestFiltersToTerm(t *testing.T) {
want: "HFE[gene]",
},
{
name: "variation_type eq",
name: "variation_type eq (multi-word value is quoted)",
filters: []contract.Filter{{Field: "variation_type", Operator: "eq", Value: "single nucleotide variant"}},
want: "single nucleotide variant[type of variation]",
want: "\"single nucleotide variant\"[type of variation]",
},
{
name: "clinical_significance in single",
filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "pathogenic"}},
want: "pathogenic[clinical significance]",
},
{
name: "clinical_significance in multi-word value is quoted",
filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "Uncertain significance"}},
want: "\"Uncertain significance\"[clinical significance]",
},
{
name: "clinical_significance in multi -> OR group",
filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "pathogenic,benign"}},
want: "(pathogenic[clinical significance] OR benign[clinical significance])",
},
{
name: "clinical_significance in OR group quotes multi-word members",
filters: []contract.Filter{{Field: "clinical_significance", Operator: "in", Value: "Pathogenic,Likely pathogenic"}},
want: "(Pathogenic[clinical significance] OR \"Likely pathogenic\"[clinical significance])",
},
{
name: "AND join in filter order",
filters: []contract.Filter{
Expand Down Expand Up @@ -497,6 +507,145 @@ func TestRetrieve_MultiPageWalk(t *testing.T) {
}
}

// --- Retrieve: limit as a walk ceiling -------------------------------------

// TestRetrieve_LimitBoundsWalk verifies that a Query.Limit smaller than the
// authoritative esearch Count acts as a walk ceiling. The outcome must be
// BestEffort (never a false Complete), carry exactly Limit records, and still
// expose the authoritative total — esearch's Count — so a caller that only wants
// a count gets the real number from a cheap limited fetch (the --limit 1
// count-query fast path for a large gene).
func TestRetrieve_LimitBoundsWalk(t *testing.T) {
	// Fixture doer: single esearch page, count=2, both efetch bodies.
	conn := newConn(t, completeDoer(t, hfeTerm))
	query := hfeSearch()
	query.Limit = 1
	plan := planFor(t, conn, query)

	rs, comp, err := conn.Retrieve(plan, noGov())
	if err != nil {
		t.Fatalf("Retrieve: %v", err)
	}

	// The cases are evaluated top to bottom and the first mismatch aborts the test.
	// The Records[0] access is only reached once len(rs.Records) == 1 has held.
	switch {
	case comp.State != contract.BestEffortState:
		t.Fatalf("a Limit below the total must NOT Complete; want BestEffort, got %s (%s)", comp.State, comp.Reason)
	case comp.ReconciledCount != 1:
		t.Fatalf("expected 1 record reconciled (the limit), got %d", comp.ReconciledCount)
	case comp.AuthoritativeCount != 2:
		t.Fatalf("the authoritative esearch Count (2) must be reported even on a limited fetch, got %d", comp.AuthoritativeCount)
	case len(rs.Records) != 1:
		t.Fatalf("records must be truncated to the limit (1), got %d", len(rs.Records))
	case rs.Records[0].SourceAccession != "VCV000000014":
		// Truncation happens after the imposed ASCENDING numeric sort, so the one
		// surviving id is the smallest VariationID (14 -> VCV000000014), not the
		// server's newest-first 18180.
		t.Fatalf("the kept record must be the first in ascending sorted order (VCV000000014), got %q", rs.Records[0].SourceAccession)
	case !strings.Contains(comp.Reason, "limit"):
		t.Fatalf("reason should explain the limit stop, got %q", comp.Reason)
	}
}

// TestRetrieve_LimitAtOrAboveTotalStillCompletes is the over-rejection guard. A
// Limit that does not constrain the result (it is >= the authoritative Count)
// must leave a legitimately complete walk as Complete rather than downgrading it
// to BestEffort.
func TestRetrieve_LimitAtOrAboveTotalStillCompletes(t *testing.T) {
	conn := newConn(t, completeDoer(t, hfeTerm)) // fixture reports count=2
	query := hfeSearch()
	query.Limit = 5 // above the total of 2, so non-constraining
	plan := planFor(t, conn, query)

	rs, comp, err := conn.Retrieve(plan, noGov())
	if err != nil {
		t.Fatalf("Retrieve: %v", err)
	}

	completedTwoOfTwo := comp.State == contract.CompleteState &&
		comp.ReconciledCount == 2 &&
		comp.AuthoritativeCount == 2
	if !completedTwoOfTwo {
		t.Fatalf("a non-constraining limit must still Complete(2,2), got %s %d/%d", comp.State, comp.ReconciledCount, comp.AuthoritativeCount)
	}
	if got := len(rs.Records); got != 2 {
		t.Fatalf("expected all 2 records, got %d", got)
	}
}

// TestRetrieve_LimitStopsPaginationEarly shows that the walk ceiling bounds the
// esearch idlist walk ITSELF. Against a multi-page upstream (page size 1,
// count=2) with Limit=1, the connector must stop after the FIRST esearch page:
// neither esearch page 2 nor the second efetch may be requested. This is the
// load-bearing behavior — without the ceiling a --limit 1 count query over a
// large gene pages + efetches the whole idlist and times out. The authoritative
// Count (2) read from the first page must still be reported.
func TestRetrieve_LimitStopsPaginationEarly(t *testing.T) {
	doer := newFakeDoer()
	doer.set("GET", releaseURL(), 206, loadFixture(t, "release_header.xml.gz"), nil)
	// First page lists 18180 (server newest-first order), count=2, retmax=1 (a full page).
	doer.set("GET", esearchURL(hfeTerm, 0, 1), 200, loadFixture(t, "esearch_page1.json"), nil)
	// Second page IS registered but must NEVER be requested once the limit is hit on page 1.
	doer.set("GET", esearchURL(hfeTerm, 1, 1), 200, loadFixture(t, "esearch_page2.json"), nil)
	// Once id 18180 (the only first-page id) is collected the walk ceiling fires
	// (limit 1 < authority 2), so 18180 is the single id that gets fetched.
	doer.set("GET", efetchURL("18180"), 200, loadFixture(t, "efetch_18180.xml"), nil)
	// efetch_14 IS registered but must NEVER be requested (14 was never enumerated).
	doer.set("GET", efetchURL("14"), 200, loadFixture(t, "efetch_14.xml"), nil)

	conn, err := New(specWithPageSize(1), doer)
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	query := hfeSearch()
	query.Limit = 1
	plan := planFor(t, conn, query)

	rs, comp, err := conn.Retrieve(plan, noGov())
	if err != nil {
		t.Fatalf("Retrieve: %v", err)
	}

	limitedAsExpected := comp.State == contract.BestEffortState &&
		comp.AuthoritativeCount == 2 &&
		len(rs.Records) == 1
	if !limitedAsExpected {
		t.Fatalf("want BestEffort with authoritative 2 and 1 record, got %s auth=%d n=%d", comp.State, comp.AuthoritativeCount, len(rs.Records))
	}

	// Scan the recorded requests: esearch page 2 and the second efetch must be
	// absent because the ceiling halted pagination after page 1.
	for _, req := range doer.requestList() {
		if strings.Contains(req, "retstart=1") {
			t.Fatalf("walk ceiling must NOT request esearch page 2 (retstart=1), saw %q", req)
		}
		if strings.Contains(req, efetchURL("14")) {
			t.Fatalf("walk ceiling must NOT efetch the un-enumerated id 14, saw %q", req)
		}
	}

	// Expected traffic: release(start), esearch page 1, efetch 18180, release(end) = 4.
	if doer.callCount() != 4 {
		t.Fatalf("walk ceiling must stop after page 1 (want 4 calls: release+esearch+1 efetch+release), got %d", doer.callCount())
	}
}

// TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades covers an esearch
// response whose idlist holds MORE distinct ids (2) than its Count claims (1).
// A Limit equal to that under-reported Count must not let truncation discard the
// extra id and fake fetched == authority: the over-observation still has to
// downgrade the result to BestEffort. Truncation engages only when
// limit < authority, so this remains a full, honest fetch that exposes the
// inconsistency.
func TestRetrieve_LimitEqualUnderReportedTotalStillDowngrades(t *testing.T) {
	doer := newFakeDoer()
	doer.set("GET", releaseURL(), 206, loadFixture(t, "release_header.xml.gz"), nil)
	// The page carries 2 distinct ids while Count claims only 1 (an under-reported total).
	doer.set("GET", esearchURL(hfeTerm, 0, 500), 200, []byte(`{"esearchresult":{"count":"1","idlist":["18180","14"]}}`), nil)
	doer.set("GET", efetchURL("14"), 200, loadFixture(t, "efetch_14.xml"), nil)
	doer.set("GET", efetchURL("18180"), 200, loadFixture(t, "efetch_18180.xml"), nil)

	conn := newConn(t, doer)
	query := hfeSearch()
	query.Limit = 1 // equals the under-reported count; truncation must NOT engage
	plan := planFor(t, conn, query)

	rs, comp, err := conn.Retrieve(plan, noGov())
	if err != nil {
		t.Fatalf("Retrieve: %v", err)
	}

	// First mismatch aborts: the state check precedes the record-count check.
	switch {
	case comp.State != contract.BestEffortState:
		t.Fatalf("a limit equal to an under-reported total must not let truncation fake a Complete; want BestEffort, got %s (%s)", comp.State, comp.Reason)
	case len(rs.Records) != 2:
		t.Fatalf("every observed id must be kept (no truncation at limit==authority), got %d records", len(rs.Records))
	}
}

// --- Retrieve: downgrades --------------------------------------------------

func TestRetrieve_MissingID_BestEffort(t *testing.T) {
Expand Down
Loading
Loading