feat(DENG-11205): Productionizing Probe Fetcher Pipeline#9557
feat(DENG-11205): Productionizing Probe Fetcher Pipeline#9557phil-lee70 wants to merge 3 commits into
Conversation
There was a problem hiding this comment.
This PR adds two new weekly query.py tasks under data_governance_metadata_derived — lineage_mapping_v1 (walks DataHub upstream lineage to resolve each profiled table's source ping) and probe_definitions_v1 (fetches probe definitions per ping from the Glean Dictionary or a _stable table's DataHub schema), chained downstream of column_profiles_v1 via referenced_tables.
The code closely follows the established column_profiles_v1 pattern: self-contained vendored helpers, explicit BigQuery schemas matching schema.yaml, partition-decorator WRITE_TRUNCATE loads for idempotency, and the secrets:/scheduling blocks match existing conventions. The requests dependency is fine — it's a pinned direct dep in requirements.txt, present in the runner image. Schemas, field modes, and partition/clustering config are all consistent between the Python schemas, schema.yaml, and metadata.yaml. The two inline notes are minor robustness/determinism observations, not blockers.
| SELECT | ||
| ping_platform, | ||
| source_ping, | ||
| ANY_VALUE(stable_urn) AS stable_urn |
There was a problem hiding this comment.
suggestion: ANY_VALUE(stable_urn) silently picks an arbitrary stable_urn when a legacy source_ping resolves to more than one stable table across the profiled derived tables. For legacy telemetry, source_ping is just the bare ping name (e.g. main), with no app prefix, so two derived tables that both trace back to a main ping but through different _stable tables would collapse into one group and the winning stable_urn is non-deterministic. Since fetch_legacy_probes reads the field schema of exactly that stable_urn, the probe set for the ping would then depend on which row ANY_VALUE happened to pick. If a single canonical stable table per ping name isn't guaranteed, consider grouping by stable_urn too (or MIN/MAX for determinism) so the choice is at least stable run-to-run.
There was a problem hiding this comment.
This may be worth considering
There was a problem hiding this comment.
agree, replaced with MAX(stable_urn) for deterministic answer
| f"Lineage failed for {project}.{dataset}.{table_name}: {e}. " | ||
| f"Skipping row — not caching as NULL." | ||
| ) | ||
| return None |
There was a problem hiding this comment.
suggestion: On a DataHub transport error this skips the row, and save_lineage does a WRITE_TRUNCATE of the whole partition with only the successfully-resolved rows — so a skipped table is simply absent from this run's partition. The module docstring frames this as "the next run retries," but with the weekly cadence and probe_definitions_v1 consuming the same partition on the same ds, a transient blip drops that table's ping (and therefore its probes) for the entire week, not just for a moment. A bounded retry/backoff around the resolve_ping/_datahub_graphql call would close that gap and keep the "skip rather than poison the cache" semantics intact for genuinely persistent failures.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
| if not rows: | ||
| logger.warning("No lineage rows produced; nothing to write.") | ||
| return |
There was a problem hiding this comment.
This appears to indicate a more serious error. Perhaps consider raising for errors when all attempts (retries) fail? (examples that i can think: DataHub outage, token expiration, etc)
Something similar to this
There was a problem hiding this comment.
agreed, implemented fail loud pattern
| if not rows: | ||
| logger.warning("No probe rows produced; nothing to write.") | ||
| return |
There was a problem hiding this comment.
Similar to the other one...raising for serious total failure errors or every fetch returns empty.
There was a problem hiding this comment.
agreed, implemented fail loud pattern
gkatre
left a comment
There was a problem hiding this comment.
@phil-lee70 Nicely done!
I left a couple of comments perhaps worth considering. I also see a couple comments from the bot. Otherwise this is good to go.
Integration report
|
Description
Productionizes the lineage + probe-fetch portion of the schema_enricher metadata-completeness pipeline as two new weekly bqetl tasks under
data_governance_metadata_derived, downstream of Gaurang'scolumn_profiles_v1.New tables
lineage_mapping_v1— for each table profiled incolumn_profiles_v1, walks DataHub upstream lineage to identify the source telemetry ping (Glean or Legacy Telemetry). One row per source table.probe_definitions_v1— for each distinct ping inlineage_mapping_v1, fetches probe definitions from the Glean Dictionary (Glean) or the closest upstream _stable table's DataHub schema (Legacy Telemetry). One row per probe.Both tasks run on the existing
bqetl_data_governance_metadataDAG (weekly, Mondays 04:00 UTC). Dependency chain is expressed viareferenced_tablesin eachmetadata.yaml:column_profiles_v1 → lineage_mapping_v1 → probe_definitions_v1Design notes
data-shared-llm-agents/agents/schema_enricher/src/schema_enricher/tools/probe_fetcher.pyso the scripts depend only ongoogle-cloud-bigquery+requests. Helper functions (_select_ping_from_lineage,fetch_glean_probes,fetch_legacy_probes) are preserved byte-equivalent — including the multi-app aggregation branch for cross-app Glean tables.One pair of tables, not per-source-dataset. Lineage and probe data is small (one row per table, one row per probe), so the per-dataset split Gaurang used for column_profiles_v1 isn't needed here.
resolved_at/fetched_atwith 90-day retention. Older partitions reflect DataHub/Glean state at the time of run, not historical state.Verification
python -m py_compileclean on bothquery.pyfiles.bqetl dag generate bqetl_data_governance_metadataproduces the expectedset_upstreamchain.--dry-runand ad-hoc single-input flags (--tableson lineage,--pingson probes) for verification against the agent'sprobe_fetcher.py.Related Tickets & Documents
Reviewer, please follow this checklist