From 66d4455653eb48e0d740d84b977227d02e8a7100 Mon Sep 17 00:00:00 2001 From: gkabbz Date: Mon, 13 Apr 2026 10:43:16 -0400 Subject: [PATCH 1/7] feat: Add three phase BQ field profiling and metadata description pipeline --- script/metadata/description_reconciler.py | 534 +++++++++++++++++++++ script/metadata/field_profiler.py | 553 ++++++++++++++++++++++ script/metadata/lineage_probe_fetcher.py | 548 +++++++++++++++++++++ 3 files changed, 1635 insertions(+) create mode 100644 script/metadata/description_reconciler.py create mode 100644 script/metadata/field_profiler.py create mode 100644 script/metadata/lineage_probe_fetcher.py diff --git a/script/metadata/description_reconciler.py b/script/metadata/description_reconciler.py new file mode 100644 index 00000000000..2dc40389f9a --- /dev/null +++ b/script/metadata/description_reconciler.py @@ -0,0 +1,534 @@ +"""Reconcile Phase 1 data-driven descriptions with Phase 2 probe definitions. + +For each profiled column, finds candidate probes from the source ping, +then calls Claude to reconcile observed data (Phase 1) with source intent (Phase 2). +For columns with no probe match, fetches the ETL query from DataHub to provide +the actual computation logic (e.g. COUNTIF(is_dau) for a dau column). +Produces a final description and routing hint (global vs dataset-scoped). 
+""" + +import base64 +import json +import logging +import os +import re +from argparse import ArgumentParser +from datetime import datetime, timezone + +import anthropic +import requests +from google.api_core.exceptions import NotFound +from google.cloud import bigquery + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s: %(levelname)s: %(message)s" +) + +PHASE1_TABLE = "mozdata-nonprod.analysis.gkabbz_data_profiling_test_v1" +MAPPING_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase2_table_pings_v1" +PROBE_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase2_ping_probes_v1" +DEST_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase3_reconciled_v1" +DEST_PROJECT = "mozdata-nonprod" +CLAUDE_MODEL = "claude-sonnet-4-6" +TOP_N_PROBES = 10 + +DEST_SCHEMA = [ + bigquery.SchemaField( + "source_project", + "STRING", + mode="REQUIRED", + description="GCP project of the profiled BQ table.", + ), + bigquery.SchemaField( + "source_dataset", + "STRING", + mode="REQUIRED", + description="BQ dataset of the profiled table.", + ), + bigquery.SchemaField( + "source_table", + "STRING", + mode="REQUIRED", + description="BQ table name of the profiled table.", + ), + bigquery.SchemaField( + "column_name", + "STRING", + mode="REQUIRED", + description="Column name; dot notation for nested STRUCT leaf fields (e.g. client_info.app_channel).", + ), + bigquery.SchemaField( + "data_type", + "STRING", + mode="NULLABLE", + description="BQ data type of the column.", + ), + bigquery.SchemaField( + "source_ping", + "STRING", + mode="NULLABLE", + description="Name of the source telemetry ping, from Phase 2 lineage mapping. Null if no ping found.", + ), + bigquery.SchemaField( + "ping_platform", + "STRING", + mode="NULLABLE", + description="Telemetry platform: 'glean' or 'legacy_telemetry'. 
def fetch_github_query(project, dataset, table):
    """Fetch query.sql or query.py from GitHub for a BQ table.

    Tries query.sql first, then query.py. Uses the GITHUB_TOKEN env var if set
    (required for private repos; optional for public ones).

    Args:
        project: GCP project ID, used as the first path segment under ``sql/``.
        dataset: BigQuery dataset name.
        table: BigQuery table name.

    Returns:
        ``(content, filename)`` with the decoded file text, or ``(None, None)``
        when neither file exists, the request fails, the API answers with an
        unexpected status, or the response body cannot be decoded.
    """
    token = os.environ.get("GITHUB_TOKEN")
    headers = {"Accept": "application/vnd.github.v3+json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    for filename in ("query.sql", "query.py"):
        path = f"sql/{project}/{dataset}/{table}/{filename}"
        url = GITHUB_API.format(repo=GITHUB_REPO, path=path)
        try:
            response = requests.get(url, headers=headers, timeout=30)
        except requests.RequestException as e:
            # A transport failure is unlikely to succeed for the second
            # filename either, so give up on this table.
            logging.warning(f"GitHub request failed for {path}: {e}")
            return None, None

        if response.status_code == 404:
            continue  # this filename does not exist; try the next one
        if response.status_code != 200:
            logging.warning(f"GitHub API returned {response.status_code} for {path}")
            return None, None

        try:
            content = base64.b64decode(response.json()["content"]).decode("utf-8")
        except (KeyError, TypeError, ValueError) as e:
            # Covers a non-JSON body, a payload without "content" (e.g. the
            # path resolved to a directory listing), invalid base64 and
            # non-UTF-8 bytes. The lookup is best-effort, so degrade to
            # "no query" instead of aborting the whole reconciliation run.
            logging.warning(f"Could not decode GitHub response for {path}: {e}")
            return None, None

        logging.info(f" Fetched {filename} from GitHub ({len(content)} chars)")
        return content, filename

    return None, None
def build_reconcile_prompt(
    column_name,
    data_type,
    table,
    source_ping,
    ping_platform,
    pass1_description,
    matching_probes,
    github_query=None,
    github_filename=None,
):
    """Build a Claude prompt to reconcile Phase 1 and Phase 2 descriptions.

    Args:
        column_name: Name of the column being reconciled.
        data_type: Data type string shown to the model.
        table: Table label shown to the model.
        source_ping: Source ping name, or a falsy value when no lineage exists.
        ping_platform: Platform label shown next to the ping name.
        pass1_description: Phase 1 (observed-data) description of the column.
        matching_probes: Candidate probe dicts with ``probe_name``,
            ``probe_type`` and ``probe_description`` keys. May be empty.
        github_query: Optional ETL source text, used only when there are no
            candidate probes.
        github_filename: File name the ETL source came from; selects the code
            fence language (``python`` for ``*.py``, otherwise ``sql``).

    Returns:
        The complete prompt string.
    """
    ping_info = (
        f"{source_ping} ({ping_platform})"
        if source_ping
        else "unknown (no lineage resolved)"
    )

    if matching_probes:
        probe_lines = []
        for p in matching_probes:
            desc = p["probe_description"] or "(no description)"
            probe_lines.append(
                f" - {p['probe_name']} ({p['probe_type'] or 'unknown type'}): {desc}"
            )
        source_section = (
            "Candidate probe definitions from the source ping:\n"
            + "\n".join(probe_lines)
        )
    elif github_query:
        # query.py files are Python, not SQL; label the fence accordingly so
        # the model is not told to read Python source as SQL.
        fence_lang = "python" if (github_filename or "").endswith(".py") else "sql"
        source_section = (
            "No matching probe definitions found.\n\n"
            f"ETL query ({github_filename}) from the bqetl GitHub repo:\n"
            f"```{fence_lang}\n{github_query}\n```"
        )
    else:
        source_section = "No matching probe definitions found — this column is likely computed or aggregated."

    return (
        "You are a data documentation expert for Mozilla's BigQuery data warehouse.\n\n"
        f"Column: {column_name}\n"
        f"Table: {table}\n"
        f"Data type: {data_type}\n"
        f"Source ping: {ping_info}\n\n"
        f"Phase 1 description (based on observed data values):\n{pass1_description}\n\n"
        f"{source_section}\n\n"
        "Task:\n"
        "1. Identify which probe this column maps to (if any). Probe names may use dots instead of underscores.\n"
        " If an ETL query is provided, find how this column is computed in the SELECT clause.\n"
        "2. Write a final 1-2 sentence description combining observed data (Phase 1) with source intent.\n"
        "3. Note any contradiction between Phase 1 and the source definition.\n"
        "4. Routing hint: 'global' if this field is generic across Mozilla products,"
        " 'dataset' if specific to this dataset or product, 'unknown' if unclear.\n\n"
        "Respond with a JSON object only (no markdown fences):\n"
        '{"matched_probe": "", "final_description": "", '
        '"contradiction": "", "routing_hint": "global|dataset|unknown"}'
    )
def load_ping_mapping(bq_client):
    """Read the Phase 2 table-to-ping mapping into a dict.

    Returns:
        A dict keyed by ``(source_project, source_dataset, source_table)``
        whose values are ``{"source_ping": ..., "ping_platform": ...}``.
        When the mapping table holds several rows for one key, the last row
        read wins.
    """
    sql = f"""
        SELECT source_project, source_dataset, source_table, source_ping, ping_platform
        FROM `{MAPPING_TABLE}`
    """
    return {
        (r.source_project, r.source_dataset, r.source_table): {
            "source_ping": r.source_ping,
            "ping_platform": r.ping_platform,
        }
        for r in bq_client.query(sql).result()
    }
def get_already_reconciled(bq_client):
    """Collect the columns that already have a row in the Phase 3 table.

    Returns:
        A set of ``(project, dataset, table, column)`` tuples. The set is
        empty when the query raises ``NotFound`` (for example on a first run,
        before the destination table exists).
    """
    sql = f"""
        SELECT source_project, source_dataset, source_table, column_name
        FROM `{DEST_TABLE}`
    """
    done = set()
    try:
        for row in bq_client.query(sql).result():
            done.add(
                (
                    row.source_project,
                    row.source_dataset,
                    row.source_table,
                    row.column_name,
                )
            )
    except NotFound:
        return set()
    return done
def main():
    """Reconcile Phase 1 data descriptions with Phase 2 probe definitions.

    For every Phase 1 table (or only the one given with ``--table``):
    looks up the source ping and its probes, skips columns already present in
    the Phase 3 table, asks Claude to reconcile each remaining column, and
    appends one record per column to the Phase 3 table.

    Raises:
        ValueError: if ``--table`` is given but is not of the form
            ``project.dataset.table``.
    """
    args = parse_args()

    project, dataset, table = (None, None, None)
    if args.table:
        parts = args.table.split(".")
        # An empty segment (e.g. "proj..tbl") would make load_phase1 drop its
        # filter and silently process every table, so reject it up front.
        if len(parts) != 3 or not all(parts):
            raise ValueError(
                f"--table must be project.dataset.table, got {args.table!r}"
            )
        project, dataset, table = parts

    bq_client = bigquery.Client(project=DEST_PROJECT)
    claude_client = anthropic.Anthropic()

    logging.info("Loading Phase 1, Phase 2 mapping and probes...")
    phase1 = load_phase1(bq_client, project, dataset, table)
    ping_mapping = load_ping_mapping(bq_client)
    probes_by_ping = load_probes_by_ping(bq_client)
    already_done = get_already_reconciled(bq_client)
    logging.info(
        f"{len(phase1)} tables to process, {len(already_done)} columns already reconciled"
    )

    now = datetime.now(timezone.utc).isoformat()

    for (proj, ds, tbl), columns in phase1.items():
        ping_info = ping_mapping.get((proj, ds, tbl), {})
        source_ping = ping_info.get("source_ping")
        ping_platform = ping_info.get("ping_platform")
        ping_probes = (
            probes_by_ping.get((ping_platform, source_ping), []) if source_ping else []
        )

        pending = [
            c for c in columns if (proj, ds, tbl, c["column_name"]) not in already_done
        ]
        if not pending:
            logging.info(
                f"Skipping {ds}.{tbl} — all {len(columns)} columns already reconciled"
            )
            continue

        # Match probes once per column; the result is needed both to decide
        # whether to fetch the ETL query and again when reconciling.
        candidates = [
            (c, find_matching_probes(c["column_name"], ping_probes)) for c in pending
        ]

        # Fetch GitHub query once per table — used for columns with no probe match
        github_query, github_filename = None, None
        if any(not matches for _, matches in candidates):
            logging.info(f" Fetching query from GitHub for {ds}.{tbl}")
            github_query, github_filename = fetch_github_query(proj, ds, tbl)
            if not github_query:
                logging.info(f" No GitHub query found for {ds}.{tbl}")

        logging.info(
            f"--- {ds}.{tbl} | ping: {source_ping or 'none'} | {len(ping_probes)} probes | {len(pending)} columns ---"
        )

        records = []
        for col, matching_probes in candidates:
            col_name = col["column_name"]
            pass1 = col["pass1_description"]

            # Only pass GitHub query for columns with no probe match
            col_github_query = github_query if not matching_probes else None
            col_github_filename = github_filename if not matching_probes else None
            logging.info(
                f" {col_name} → {len(matching_probes)} probes {'(+GitHub query)' if col_github_query else ''}"
            )

            result = reconcile_column(
                claude_client,
                col_name,
                col["data_type"],
                f"{ds}.{tbl}",
                source_ping,
                ping_platform,
                pass1,
                matching_probes,
                github_query=col_github_query,
                github_filename=col_github_filename,
            )
            if not isinstance(result, dict):
                # Valid JSON that is not an object (list, string, ...) cannot
                # be read with .get(); fall back to the Phase 1 description.
                logging.error(
                    f"Claude returned a non-object response for {col_name}; using Phase 1 description"
                )
                result = {
                    "matched_probe": None,
                    "final_description": pass1,
                    "contradiction": None,
                    "routing_hint": "unknown",
                }

            # The destination schema documents these fields as NULL when
            # absent, so store empty strings as NULL.
            matched_probe = result.get("matched_probe") or None
            contradiction = result.get("contradiction") or None

            probe_desc = None
            if matched_probe:
                probe_match = next(
                    (p for p in ping_probes if p["probe_name"] == matched_probe), None
                )
                probe_desc = probe_match["probe_description"] if probe_match else None

            records.append(
                {
                    "source_project": proj,
                    "source_dataset": ds,
                    "source_table": tbl,
                    "column_name": col_name,
                    "data_type": col["data_type"],
                    "source_ping": source_ping,
                    "ping_platform": ping_platform,
                    "pass1_description": pass1,
                    "matched_probe": matched_probe,
                    "probe_description": probe_desc,
                    "final_description": result.get("final_description"),
                    "contradiction": contradiction,
                    "routing_hint": result.get("routing_hint"),
                    "processed_at": now,
                }
            )

        save_to_bq(bq_client, records)
        logging.info(f" Done {ds}.{tbl} ({len(records)} columns)")
timezone + +import anthropic +from google.cloud import bigquery + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s: %(levelname)s: %(message)s" +) + +# Cardinality threshold: fields with <= 50 distinct values store the full value + frequency list. +# Fields above the threshold store only the distinct count and one example value (shape, not enumeration) +HIGH_CARDINALITY_THRESHOLD = 50 +SKIP_DISTINCT_TYPES = { + "STRUCT", + "JSON", + "RECORD", + "ARRAY", +} # COUNT(DISTINCT ...) not supported for these types +TIER3_COLUMNS = { + "metrics" +} # wide/repeated STRUCTs deferred to a future job. Not currently profiled. +SOURCE_PROJECT = "moz-fx-data-shared-prod" +DEST_TABLE = "mozdata-nonprod.analysis.gkabbz_data_profiling_test_v1" +CLAUDE_MODEL = "claude-sonnet-4-6" + +DEST_SCHEMA = [ + bigquery.SchemaField( + "source_project", + "STRING", + mode="REQUIRED", + description="GCP project of the profiled table (e.g. moz-fx-data-shared-prod).", + ), + bigquery.SchemaField( + "source_dataset", + "STRING", + mode="REQUIRED", + description="BigQuery dataset of the profiled table.", + ), + bigquery.SchemaField( + "source_table", + "STRING", + mode="REQUIRED", + description="Name of the profiled table (without project or dataset).", + ), + bigquery.SchemaField( + "column_name", + "STRING", + mode="REQUIRED", + description="Field path of the profiled column, using dot notation for nested fields (e.g. client_info.app_channel).", + ), + bigquery.SchemaField( + "data_type", + "STRING", + mode="REQUIRED", + description="BigQuery data type of the column as reported by INFORMATION_SCHEMA.", + ), + bigquery.SchemaField( + "null_rate", + "FLOAT", + mode="NULLABLE", + description="Percentage of rows where this column is NULL, rounded to one decimal place. Null for undocumented (Tier 3) columns.", + ), + bigquery.SchemaField( + "distinct_count", + "INTEGER", + mode="NULLABLE", + description=( + "Number of distinct non-null values observed. 
Null for high-cardinality columns" + " where COUNT(DISTINCT) is unsupported (STRUCT, ARRAY, JSON) or for undocumented columns." + ), + ), + bigquery.SchemaField( + "is_high_cardinality", + "BOOLEAN", + mode="NULLABLE", + description=( + "True if distinct_count exceeds the cardinality threshold (50)." + " High-cardinality columns store only an example value;" + " low-cardinality columns store the full value/frequency list." + ), + ), + bigquery.SchemaField( + "example_value", + "STRING", + mode="NULLABLE", + description=( + "A single representative value sampled from the column. Only populated for high-cardinality columns." + " PII-like values (emails, UUIDs, etc.) are stored as-is here but should not be surfaced in descriptions." + ), + ), + bigquery.SchemaField( + "values", + "RECORD", + mode="REPEATED", + description="Full value/frequency list for low-cardinality columns (distinct_count <= 50). Empty for high-cardinality or undocumented columns.", + fields=[ + bigquery.SchemaField( + "value", + "STRING", + mode="NULLABLE", + description="A distinct value observed in the column, cast to STRING.", + ), + bigquery.SchemaField( + "frequency", + "INTEGER", + mode="NULLABLE", + description="Number of rows containing this value in the profiled sample.", + ), + ], + ), + bigquery.SchemaField( + "pass1_description", + "STRING", + mode="NULLABLE", + description="Data-driven description generated by Claude based solely on observed column statistics (Pass 1). Null for undocumented (Tier 3) columns.", + ), + bigquery.SchemaField( + "column_tier", + "STRING", + mode="NULLABLE", + description=( + "Profiling tier: 'scalar' (top-level non-STRUCT column)," + " 'leaf' (leaf field inside a simple non-repeated STRUCT, profiled using dot notation)," + " or 'undocumented' (ARRAY/REPEATED fields or deferred columns e.g. metrics," + " stored as a placeholder with no profiling data)." 
def get_columns(client, table):
    """Return (field_path, data_type, tier) tuples for the given table.

    Queries INFORMATION_SCHEMA to collect all field paths and assigns each a
    tier. The tier decides what is profiled and described downstream.

    Tiers:
        scalar       - top-level non-STRUCT column
        leaf         - leaf field inside a simple (non-repeated) STRUCT
        undocumented - top-level ARRAY columns or TIER3_COLUMNS (not profiled;
                       stored with a null description)

    Args:
        client: BigQuery client used to run the INFORMATION_SCHEMA query.
        table: Fully qualified table id, ``project.dataset.table``.

    Returns:
        A list of ``(field_path, data_type, tier)`` tuples ordered by column
        position, then field path.
    """
    project, dataset, table_name = table.split(".")
    query = f"""
        SELECT
            fp.field_path,
            fp.data_type,
            fp.column_name,
            c.data_type AS top_level_data_type
        FROM `{project}`.`{dataset}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS AS fp
        JOIN `{project}`.`{dataset}`.INFORMATION_SCHEMA.COLUMNS AS c
            ON fp.table_name = c.table_name AND fp.column_name = c.column_name
        WHERE fp.table_name = '{table_name}'
        ORDER BY c.ordinal_position, fp.field_path
    """
    rows = list(client.query(query).result())

    # Fields nested below an ARRAY (e.g. `a.b.c` where `a.b` is
    # ARRAY<STRUCT<c ...>>) cannot be referenced with dot notation; selecting
    # them would make the whole profile query fail. Collect the prefixes so
    # their descendants can be skipped.
    array_prefixes = tuple(
        f"{row.field_path}."
        for row in rows
        if row.data_type.upper().startswith("ARRAY")
    )

    results = []
    for row in rows:
        field_path = row.field_path
        data_type = row.data_type
        column_name = row.column_name
        top_level_data_type = row.top_level_data_type

        is_tier3 = (
            top_level_data_type.upper().startswith("ARRAY")
            or column_name in TIER3_COLUMNS
        )
        if is_tier3:
            # Keep one placeholder row for the top-level column and drop
            # every nested path below it.
            if field_path == column_name:
                results.append((field_path, top_level_data_type, "undocumented"))
            continue

        if field_path.startswith(array_prefixes):
            continue  # lives inside a nested repeated field; not addressable

        if data_type.upper().startswith("STRUCT"):
            continue  # intermediate STRUCT node, not a leaf

        tier = "leaf" if "." in field_path else "scalar"
        results.append((field_path, data_type, tier))

    return results
def check_table_has_rows(client, table, region="us"):
    """Return True if the table has at least one row, False if empty or unknown.

    Reads ``total_rows`` from ``INFORMATION_SCHEMA.TABLE_STORAGE`` so the
    check does not scan the table itself.

    Args:
        client: BigQuery client used to run the metadata query.
        table: Fully qualified table id, ``project.dataset.table``.
        region: Region qualifier for the INFORMATION_SCHEMA view, without the
            ``region-`` prefix. Defaults to ``"us"``.

    Returns:
        True when the view reports a non-zero row count; False when the table
        is not listed or reports no rows.

    Raises:
        ValueError: if ``table`` is not of the form ``project.dataset.table``.
    """
    parts = table.split(".")
    if len(parts) != 3:
        raise ValueError(f"Expected project.dataset.table, got {table!r}")
    project, dataset, table_name = parts

    query = f"""
        SELECT total_rows
        FROM `{project}.region-{region}`.INFORMATION_SCHEMA.TABLE_STORAGE
        WHERE project_id = '{project}'
            AND table_schema = '{dataset}'
            AND table_name = '{table_name}'
    """
    rows = list(client.query(query).result())
    if not rows:
        logging.warning(f"Table not found in INFORMATION_SCHEMA: {table}")
        return False

    total_rows = rows[0].total_rows
    # `not total_rows` also covers a NULL count, which would otherwise break
    # the thousands-separator format below.
    if not total_rows:
        logging.warning(f"Skipping {table} — table is empty (total_rows=0)")
        return False

    logging.info(f"Pre-flight check passed: {total_rows:,} rows in {table}")
    return True
def build_description_prompt(table, col, stats):
    """Build a prompt for Pass 1 description generation.

    Args:
        table: Table label shown to the model.
        col: Column (field path) being described.
        stats: Profile dict for the column. Must contain ``type`` and
            ``data_type``; ``low_cardinality`` and ``high_cardinality``
            entries also carry ``distinct_count``, ``values`` and (for high
            cardinality) ``example``. Any other ``type`` is rendered as a
            skipped complex type.

    Returns:
        The complete prompt string, ending with ``"Description:"``.
    """
    null_rate = stats.get("null_rate")
    lines = [
        f"Table: {table}",
        f"Column: {col}",
        f"Data type: {stats['data_type']}",
        # Unprofiled columns carry no null rate; avoid rendering "None%".
        f"Null rate: {null_rate}%" if null_rate is not None else "Null rate: unknown",
    ]

    kind = stats["type"]
    if kind == "low_cardinality":
        # Cap the enumeration at 10 values to keep the prompt short.
        values_str = ", ".join(f"{str(v)!r} ({f:,})" for v, f in stats["values"][:10])
        lines.append(f"Distinct values: {stats['distinct_count']}")
        lines.append(f"Values (value: frequency): {values_str}")
    elif kind == "high_cardinality":
        dc = stats["distinct_count"]
        lines.append(
            f"Distinct count: {dc:,} (high cardinality)"
            if dc is not None
            else "Distinct count: unknown (ARRAY type)"
        )
        if stats.get("values"):
            top_str = ", ".join(f"{str(v)!r} ({f:,})" for v, f in stats["values"])
            lines.append(f"Top values (value: frequency): {top_str}")
        else:
            lines.append(f"Example value: {stats['example']}")
    else:
        lines.append("Note: Complex type (STRUCT/JSON) — profiling skipped.")
        # Placeholder entries have no example; only show one when present.
        if stats.get("example") is not None:
            lines.append(f"Example: {stats['example']}")

    context = "\n".join(lines)
    return (
        "You are a data analyst writing field descriptions for a BigQuery data warehouse. "
        "Based only on the observed data below, write a single concise sentence describing what this column contains. "
        "Be specific — mention the type of values and what a high null rate implies about usage. "
        "Do not speculate beyond what the data shows.\n\n"
        "Important: if any example or observed value appears to be PII (e.g. an email address, UUID, user ID, IP address, or other personal identifier), "
        "do not include the actual value in your description — use a randomly generated placeholder that looks like the email, uuid, or user_id instead.\n\n"
        f"{context}\n\n"
        "Description:"
    )
in stats["values"] + ] + else: + record["distinct_count"] = stats["distinct_count"] + record["is_high_cardinality"] = True + record["example_value"] = stats.get("example") + record["values"] = [ + {"value": str(v) if v is not None else None, "frequency": f} + for v, f in stats.get("values", []) + ] + + records.append(record) + + return records + + +def save_to_bq(records, dest_table): + """Write profile records to the destination BigQuery table, creating it if needed.""" + client = bigquery.Client(project="mozdata-nonprod") + job_config = bigquery.LoadJobConfig( + schema=DEST_SCHEMA, + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION], + ) + job = client.load_table_from_json(records, dest_table, job_config=job_config) + job.result() + logging.info(f"Saved {len(records)} rows to {dest_table}") + + +def log_profile(table, profile): + """Log the table profile in a readable format.""" + logging.info("=" * 70) + logging.info(f"TABLE: {table}") + logging.info("=" * 70) + for col, stats in profile.items(): + logging.info(f" {col}") + logging.info(f" null rate: {stats['null_rate']}%") + if stats["type"] == "undocumented": + logging.info( + f" type: {stats['data_type']} (undocumented — deferred to future job)" + ) + elif stats["type"] == "low_cardinality": + logging.info(f" distinct count: {stats['distinct_count']}") + for value, freq in stats["values"]: + logging.info(f" {str(value)!r:40} {freq:>10,}") + else: + dc = stats["distinct_count"] + logging.info( + f" distinct count: {dc:,} (high cardinality)" + if dc is not None + else " distinct count: unknown (ARRAY type)" + ) + logging.info(f" example: {stats['example']}") + if stats.get("pass1_description"): + logging.info(f" description: {stats['pass1_description']}") + + +def main(): + """Profile every column in a BigQuery table and save results to BigQuery.""" + args = 
parse_args() + client = bigquery.Client(project=SOURCE_PROJECT) + if not check_table_has_rows(client, args.table): + return + logging.info(f"Profiling {args.table}") + start = time.time() + profile = profile_table(client, args.table) + profile = generate_descriptions(args.table, profile) + elapsed = time.time() - start + log_profile(args.table, profile) + records = to_bq_records(args.table, profile) + save_to_bq(records, DEST_TABLE) + logging.info(f"Done in {elapsed:.1f}s") + + +if __name__ == "__main__": + import sys + + # --- TEST BLOCK — only used when no args are passed on the command line --- + if len(sys.argv) == 1: + sys.argv = [ + "gk_field_sampler.py", + "--table", + "moz-fx-data-shared-prod.firefox_desktop_derived.newtab_items_daily_v1", + ] + # ---------------------------------------- + main() diff --git a/script/metadata/lineage_probe_fetcher.py b/script/metadata/lineage_probe_fetcher.py new file mode 100644 index 00000000000..7743b51f238 --- /dev/null +++ b/script/metadata/lineage_probe_fetcher.py @@ -0,0 +1,548 @@ +"""Resolve upstream lineage for profiled tables and fetch probe definitions from source pings. 
+ +Two output tables: + gkabbz_phase2_table_pings_v1 — lineage mapping: one row per derived table + gkabbz_phase2_ping_probes_v1 — probe definitions: one row per probe in a ping (fetched once per ping) +""" + +import json +import logging +import os +from argparse import ArgumentParser +from datetime import datetime, timezone + +import requests +from google.api_core.exceptions import NotFound +from google.cloud import bigquery + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s: %(levelname)s: %(message)s" +) + +DATAHUB_URL = "https://mozilla.acryl.io/api/graphql" +GLEAN_DICT_URL = "https://dictionary.telemetry.mozilla.org/data/{app}/pings/{ping}.json" +PHASE1_TABLE = "mozdata-nonprod.analysis.gkabbz_data_profiling_test_v1" +MAPPING_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase2_table_pings_v1" +PROBE_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase2_ping_probes_v1" +DEST_PROJECT = "mozdata-nonprod" + +MAPPING_SCHEMA = [ + bigquery.SchemaField( + "source_project", + "STRING", + mode="REQUIRED", + description="GCP project of the derived BQ table.", + ), + bigquery.SchemaField( + "source_dataset", + "STRING", + mode="REQUIRED", + description="BQ dataset of the derived table.", + ), + bigquery.SchemaField( + "source_table", + "STRING", + mode="REQUIRED", + description="BQ table name of the derived table.", + ), + bigquery.SchemaField( + "source_ping", + "STRING", + mode="NULLABLE", + description="Name of the source telemetry ping resolved via DataHub lineage (e.g. first-shutdown, baseline). Null if no ping node found in lineage.", + ), + bigquery.SchemaField( + "ping_platform", + "STRING", + mode="NULLABLE", + description="Telemetry platform of the source ping: 'glean' or 'legacy_telemetry'. Null if no ping found.", + ), + bigquery.SchemaField( + "stable_urn", + "STRING", + mode="NULLABLE", + description="DataHub URN of the closest _stable.* BQ table in the lineage chain. 
Legacy telemetry only; used to fetch probe schema fields.", + ), + bigquery.SchemaField( + "processed_at", + "TIMESTAMP", + mode="REQUIRED", + description="Timestamp when lineage was resolved for this table.", + ), +] + +PROBE_SCHEMA = [ + bigquery.SchemaField( + "ping_platform", + "STRING", + mode="REQUIRED", + description="Telemetry platform: 'glean' or 'legacy_telemetry'.", + ), + bigquery.SchemaField( + "source_ping", + "STRING", + mode="REQUIRED", + description="Name of the telemetry ping (e.g. first-shutdown, baseline).", + ), + bigquery.SchemaField( + "probe_name", + "STRING", + mode="NULLABLE", + description=( + "Name of the probe/metric as defined in the source." + " Legacy probes use dot notation (e.g. environment.system.os); Glean probes use snake_case." + ), + ), + bigquery.SchemaField( + "probe_description", + "STRING", + mode="NULLABLE", + description="Description from the source artifact: Glean Dictionary for Glean pings, stable table schema fields for Legacy Telemetry.", + ), + bigquery.SchemaField( + "probe_type", + "STRING", + mode="NULLABLE", + description="Data type or kind of the probe (e.g. 
string, counter, labeled_counter, uint32).", + ), + bigquery.SchemaField( + "probe_raw", + "STRING", + mode="NULLABLE", + description="Full probe definition as JSON from the source artifact.", + ), + bigquery.SchemaField( + "processed_at", + "TIMESTAMP", + mode="REQUIRED", + description="Timestamp when probe definitions were fetched for this ping.", + ), +] + + +# --- DataHub GraphQL client --- + + +def datahub_graphql(query, variables): + """Execute a DataHub GraphQL query and return the response data dict.""" + token = os.environ["DATAHUB_GMS_TOKEN"] + response = requests.post( + DATAHUB_URL, + json={"query": query, "variables": variables}, + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + }, + timeout=30, + ) + response.raise_for_status() + body = response.json() + if "errors" in body: + raise RuntimeError(f"GraphQL errors: {body['errors']}") + return body["data"] + + +LINEAGE_QUERY = """ +query GetUpstreamLineage($urn: String!) { + searchAcrossLineage(input: { + urn: $urn + direction: UPSTREAM + count: 100 + }) { + total + searchResults { + degree + entity { + urn + type + ... on Dataset { + name + platform { urn } + subTypes { typeNames } + } + } + } + } +} +""" + +SCHEMA_FIELDS_QUERY = """ +query GetSchemaFields($urn: String!) 
{ + dataset(urn: $urn) { + schemaMetadata { + fields { + fieldPath + nativeDataType + description + } + } + } +} +""" + + +def get_lineage(table_urn): + """Return upstream lineage search results for a dataset URN.""" + data = datahub_graphql(LINEAGE_QUERY, {"urn": table_urn}) + return data["searchAcrossLineage"]["searchResults"] + + +def get_schema_fields(dataset_urn): + """Return schema fields for a dataset URN as a list of dicts.""" + data = datahub_graphql(SCHEMA_FIELDS_QUERY, {"urn": dataset_urn}) + schema = data.get("dataset", {}).get("schemaMetadata") + if not schema: + return [] + return schema["fields"] + + +# --- Lineage resolver --- + + +def table_to_urn(project, dataset, table): + """Build a DataHub dataset URN for a BigQuery table.""" + return f"urn:li:dataset:(urn:li:dataPlatform:bigquery,{project}.{dataset}.{table},PROD)" + + +def resolve_ping(project, dataset, table): + """Walk upstream lineage and return ping metadata for a BQ table. + + Returns a dict with keys: + ping_platform - 'glean' or 'legacy_telemetry' + source_ping - ping name, e.g. 'new-profile' + stable_urn - DataHub URN of the closest stable BQ table (Legacy only, else None) + Returns None if no ping node is found in the lineage. 
+ """ + urn = table_to_urn(project, dataset, table) + logging.info(f"Fetching lineage for {project}.{dataset}.{table}") + results = get_lineage(urn) + logging.info(f"Found {len(results)} upstream entities") + + ping_node = None + ping_platform = None + stable_urn = None + + sorted_results = sorted(results, key=lambda r: r["degree"], reverse=True) + + for result in sorted_results: + entity = result["entity"] + platform_urn = entity.get("platform", {}).get("urn", "") + sub_types = entity.get("subTypes", {}).get("typeNames", []) + + if "LegacyTelemetry" in platform_urn and "Ping" in sub_types: + ping_node = entity + ping_platform = "legacy_telemetry" + break + if "Glean" in platform_urn and "Ping" in sub_types: + ping_node = entity + ping_platform = "glean" + break + + if not ping_node: + logging.warning( + f"No ping node found in lineage for {project}.{dataset}.{table}" + ) + return None + + ping_name = ping_node["name"] + logging.info(f"Found ping: {ping_name} (platform: {ping_platform})") + + # For legacy telemetry, find the closest stable BQ table in the chain + # Chain: derived → ... → stable → live → ping + if ping_platform == "legacy_telemetry": + ping_degree = next( + r["degree"] + for r in sorted_results + if r["entity"]["urn"] == ping_node["urn"] + ) + stable_candidates = [ + r + for r in sorted_results + if ( + r["degree"] < ping_degree + and "bigquery" in r["entity"].get("platform", {}).get("urn", "") + and "_stable." in r["entity"].get("urn", "") + ) + ] + if stable_candidates: + best = max(stable_candidates, key=lambda r: r["degree"]) + stable_urn = best["entity"]["urn"] + logging.info(f"Found stable table: {best['entity']['name']}") + + return { + "ping_platform": ping_platform, + "source_ping": ping_name, + "stable_urn": stable_urn, + } + + +# --- Probe definition fetchers --- + + +def infer_glean_app(dataset): + """Infer Glean Dictionary app name from a BQ dataset name. + + Strips known suffixes to get the app slug used in the Glean Dictionary, + e.g. 
firefox_desktop_derived → firefox_desktop. + """ + for suffix in ("_derived", "_stable", "_live"): + if dataset.endswith(suffix): + return dataset[: -len(suffix)] + return dataset + + +def fetch_glean_probes( + _ping_platform, source_ping, stable_urn=None, dataset=None +): # noqa: stable_urn unused (shared signature) + """Fetch probe definitions for a Glean ping from the Glean Dictionary JSON API.""" + app = infer_glean_app(dataset or "") + url = GLEAN_DICT_URL.format(app=app, ping=source_ping) + logging.info(f"Fetching Glean probes from {url}") + response = requests.get(url, timeout=30) + if response.status_code == 404: + logging.warning(f"Glean Dictionary entry not found: {url}") + return [] + response.raise_for_status() + data = response.json() + probes = [] + for metric in data.get("metrics", []): + probes.append( + { + "probe_name": metric.get("name"), + "probe_description": metric.get("description"), + "probe_type": metric.get("type"), + "probe_raw": json.dumps(metric), + } + ) + logging.info(f"Found {len(probes)} Glean probes for {app}/{source_ping}") + return probes + + +def fetch_legacy_probes( + _ping_platform, source_ping, stable_urn, dataset=None +): # noqa: dataset unused (shared signature) + """Fetch probe definitions from the schema of a Legacy Telemetry stable table.""" + logging.info(f"Fetching schema fields from {stable_urn}") + fields = get_schema_fields(stable_urn) + probes = [] + for field in fields: + probes.append( + { + "probe_name": field.get("fieldPath"), + "probe_description": field.get("description"), + "probe_type": field.get("nativeDataType"), + "probe_raw": json.dumps(field), + } + ) + logging.info(f"Found {len(probes)} legacy probe fields for {source_ping}") + return probes + + +PROBE_FETCHERS = { + "glean": fetch_glean_probes, + "legacy_telemetry": fetch_legacy_probes, +} + + +# --- BQ helpers --- + + +def bq_delete(bq_client, table, where_clause): + """Delete rows from a BQ table matching a WHERE clause, ignoring NotFound.""" + 
try: + bq_client.query(f"DELETE FROM `{table}` WHERE {where_clause}").result() + except NotFound: + pass # Table doesn't exist yet on first run + + +def bq_insert(bq_client, records, dest_table, schema): + """Append records to a BQ table, creating it if needed.""" + job_config = bigquery.LoadJobConfig( + schema=schema, + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + ) + job = bq_client.load_table_from_json(records, dest_table, job_config=job_config) + job.result() + + +# --- Step 1: Lineage resolution → MAPPING_TABLE --- + + +def get_already_mapped_tables(bq_client): + """Return set of (project, dataset, table) already in the mapping table.""" + try: + query = f"SELECT source_project, source_dataset, source_table FROM `{MAPPING_TABLE}`" + return { + (r.source_project, r.source_dataset, r.source_table) + for r in bq_client.query(query).result() + } + except NotFound: + return set() + + +def resolve_lineage_for_tables(bq_client, tables): + """Resolve lineage for tables not yet in the mapping table and write results.""" + already_mapped = get_already_mapped_tables(bq_client) + pending = [(p, d, t) for p, d, t in tables if (p, d, t) not in already_mapped] + logging.info( + f"{len(pending)} tables need lineage resolution (skipping {len(tables) - len(pending)} already mapped)" + ) + + now = datetime.now(timezone.utc).isoformat() + for project, dataset, table in pending: + logging.info(f" Resolving {dataset}.{table}") + try: + ping_info = resolve_ping(project, dataset, table) + except Exception as e: + logging.error(f" Lineage failed for {dataset}.{table}: {e}") + ping_info = None + + record = { + "source_project": project, + "source_dataset": dataset, + "source_table": table, + "source_ping": ping_info["source_ping"] if ping_info else None, + "ping_platform": ping_info["ping_platform"] if ping_info else None, + "stable_urn": ping_info["stable_urn"] if ping_info else None, + "processed_at": now, + 
} + bq_insert(bq_client, [record], MAPPING_TABLE, MAPPING_SCHEMA) + + +# --- Step 2: Probe fetch → PROBE_TABLE --- + + +def get_unique_pings(bq_client): + """Return unique (ping_platform, source_ping, stable_urn, sample_dataset) from mapping table.""" + query = f""" + SELECT + ping_platform, + source_ping, + ANY_VALUE(stable_urn) AS stable_urn, + ANY_VALUE(source_dataset) AS sample_dataset + FROM `{MAPPING_TABLE}` + WHERE source_ping IS NOT NULL + GROUP BY ping_platform, source_ping + """ + return [ + { + "ping_platform": r.ping_platform, + "source_ping": r.source_ping, + "stable_urn": r.stable_urn, + "sample_dataset": r.sample_dataset, + } + for r in bq_client.query(query).result() + ] + + +def get_already_fetched_pings(bq_client): + """Return set of (ping_platform, source_ping) already in the probe table.""" + try: + query = f"SELECT DISTINCT ping_platform, source_ping FROM `{PROBE_TABLE}`" + return { + (r.ping_platform, r.source_ping) for r in bq_client.query(query).result() + } + except NotFound: + return set() + + +def fetch_probes_for_pings(bq_client): + """Fetch probe definitions for unique pings not yet in the probe table.""" + unique_pings = get_unique_pings(bq_client) + already_fetched = get_already_fetched_pings(bq_client) + pending = [ + p + for p in unique_pings + if (p["ping_platform"], p["source_ping"]) not in already_fetched + ] + logging.info( + f"{len(pending)} pings need probe fetch (skipping {len(unique_pings) - len(pending)} already fetched)" + ) + + now = datetime.now(timezone.utc).isoformat() + for ping in pending: + platform = ping["ping_platform"] + source_ping = ping["source_ping"] + stable_urn = ping["stable_urn"] + dataset = ping["sample_dataset"] + + logging.info(f" Fetching probes for {platform}/{source_ping}") + try: + fetcher = PROBE_FETCHERS[platform] + probes = fetcher( + platform, source_ping, stable_urn=stable_urn, dataset=dataset + ) + except Exception as e: + logging.error(f" Probe fetch failed for {platform}/{source_ping}: 
{e}") + continue + + if not probes: + logging.warning(f" No probes found for {platform}/{source_ping}") + continue + + records = [ + { + "ping_platform": platform, + "source_ping": source_ping, + "probe_name": probe["probe_name"], + "probe_description": probe["probe_description"], + "probe_type": probe["probe_type"], + "probe_raw": probe["probe_raw"], + "processed_at": now, + } + for probe in probes + ] + bq_insert(bq_client, records, PROBE_TABLE, PROBE_SCHEMA) + logging.info(f" Saved {len(records)} probes for {platform}/{source_ping}") + + +# --- Main --- + + +def parse_args(): + """Parse command line arguments.""" + parser = ArgumentParser( + description="Resolve lineage and fetch probe definitions for Phase 1 profiled tables." + ) + parser.add_argument( + "--table", + help="Fully qualified BQ table (project.dataset.table). If omitted, processes all Phase 1 tables.", + ) + return parser.parse_args() + + +def get_phase1_tables(bq_client): + """Return distinct (project, dataset, table) tuples from the Phase 1 profiling table.""" + query = f""" + SELECT DISTINCT source_project, source_dataset, source_table + FROM `{PHASE1_TABLE}` + ORDER BY source_dataset, source_table + """ + return [ + (row.source_project, row.source_dataset, row.source_table) + for row in bq_client.query(query).result() + ] + + +def main(): + """Resolve lineage and fetch probe definitions for all Phase 1 profiled tables.""" + args = parse_args() + bq_client = bigquery.Client(project=DEST_PROJECT) + + if args.table: + project, dataset, table = args.table.split(".") + tables = [(project, dataset, table)] + else: + tables = get_phase1_tables(bq_client) + logging.info(f"Found {len(tables)} Phase 1 tables") + + logging.info("=== Step 1: Lineage resolution ===") + resolve_lineage_for_tables(bq_client, tables) + + logging.info("=== Step 2: Probe fetch ===") + fetch_probes_for_pings(bq_client) + + +if __name__ == "__main__": + main() From 2bc154f85eb7c60893d02816c98e4da24135dda3 Mon Sep 17 00:00:00 2001 
From: gkabbz Date: Thu, 16 Apr 2026 07:47:47 -0400 Subject: [PATCH 2/7] Add performance tracking and fix nested ARRAY column detection in field_profiler Adds per-table performance logging (BQ job ID, table size, GB scanned, elapsed time) to a CSV so profiling cost and runtime can be tracked and extrapolated to the full warehouse. Also fixes a bug where leaf fields nested inside ARRAY columns were not detected as Tier 3 and caused profile queries to fail on tables with complex nested types. --- script/metadata/field_profiler.py | 156 +++++++++++++++++++++--------- 1 file changed, 109 insertions(+), 47 deletions(-) diff --git a/script/metadata/field_profiler.py b/script/metadata/field_profiler.py index f06c3732bc3..f2f2853d3cd 100644 --- a/script/metadata/field_profiler.py +++ b/script/metadata/field_profiler.py @@ -2,7 +2,6 @@ import logging import random -import re import time from argparse import ArgumentParser from datetime import datetime, timezone @@ -176,8 +175,16 @@ def get_columns(client, table): WHERE fp.table_name = '{table_name}' ORDER BY c.ordinal_position, fp.field_path """ + rows = list(client.query(query).result()) + + # Collect all field paths that are ARRAY types so we can detect nested ARRAYs. + # A leaf like `col.items.name` must be skipped if `col.items` is ARRAY. + array_paths = { + row.field_path for row in rows if row.data_type.upper().startswith("ARRAY") + } + results = [] - for row in client.query(query).result(): + for row in rows: field_path = row.field_path data_type = row.data_type column_name = row.column_name @@ -187,6 +194,16 @@ def get_columns(client, table): top_level_data_type.upper().startswith("ARRAY") or column_name in TIER3_COLUMNS ) + + # Also catch leaves nested inside a deeper ARRAY — e.g. col.items.name + # where col.items is ARRAY>. top_level_data_type is STRUCT so the + # check above misses it; verify no ancestor path is an ARRAY. + if not is_tier3 and "." 
in field_path: + parts = field_path.split(".") + ancestors = [".".join(parts[:i]) for i in range(1, len(parts))] + if any(a in array_paths for a in ancestors): + is_tier3 = True + if is_tier3: if field_path == column_name: results.append((field_path, top_level_data_type, "undocumented")) @@ -236,7 +253,7 @@ def build_profile_query(table, columns, partition_filter=None): if any(t in data_type.upper() for t in SKIP_DISTINCT_TYPES): col_stats.append(f"NULL AS `{alias}__distinct`") else: - col_stats.append(f"COUNT(DISTINCT {sql_ref}) AS `{alias}__distinct`") + col_stats.append(f"APPROX_COUNT_DISTINCT({sql_ref}) AS `{alias}__distinct`") col_stats.append( f"APPROX_TOP_COUNT({sql_ref}, 50) AS `{alias}__top_values`" ) @@ -258,11 +275,29 @@ def build_profile_query(table, columns, partition_filter=None): return f"SELECT {select} FROM `{table}` {where}" +def get_partition_filter(client, table): + """Return a WHERE clause filtering to a recent partition, or None if not partitioned. + + Uses table metadata to detect the partition column rather than waiting for BQ + to throw a "partition filter required" error. Always applied when a partition exists, + even when the filter is not strictly required, to minimize bytes scanned. 
+ """ + table_ref = client.get_table(table) + tp = table_ref.time_partitioning + if tp is None: + return None + field = tp.field # None means ingestion-time partitioning (_PARTITIONTIME) + if field: + return f"DATE(`{field}`) = DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)" + else: + return "_PARTITIONDATE = DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)" + + def check_table_has_rows(client, table): - """Return True if the table has at least one row, False if empty.""" + """Return (total_rows, total_gb) if non-empty, (0, 0) if empty, (-1, 0) if not found.""" project, dataset, table_name = table.split(".") query = f""" - SELECT total_rows + SELECT total_rows, total_logical_bytes FROM `{project}.region-us`.INFORMATION_SCHEMA.TABLE_STORAGE WHERE project_id = '{project}' AND table_schema = '{dataset}' @@ -271,13 +306,16 @@ def check_table_has_rows(client, table): rows = list(client.query(query).result()) if not rows: logging.warning(f"Table not found in INFORMATION_SCHEMA: {table}") - return False + return -1, 0 total_rows = rows[0].total_rows + total_gb = rows[0].total_logical_bytes / 1e9 if total_rows == 0: logging.warning(f"Skipping {table} — table is empty (total_rows=0)") - return False - logging.info(f"Pre-flight check passed: {total_rows:,} rows in {table}") - return True + return 0, 0 + logging.info( + f"Pre-flight check passed: {total_rows:,} rows, {total_gb:.2f} GB in {table}" + ) + return total_rows, total_gb def profile_table(client, table): @@ -286,44 +324,23 @@ def profile_table(client, table): columns = get_columns(client, table) logging.info(f"Found {len(columns)} columns in {table}") - # Run profile query without partition filter - profile_query = build_profile_query(table, columns) - partition_filter = None + # Detect partition column upfront and always apply a date filter when one exists. + # This limits the scan to a single recent day even when the filter is not required. 
+ partition_filter = get_partition_filter(client, table) + if partition_filter: + logging.info(f"Partition filter applied: {partition_filter}") + + profile_query = build_profile_query(table, columns, partition_filter) try: job = client.query(profile_query) rows = list(job.result()) - logging.info( - f"Profile query processed {job.total_bytes_processed / 1e9:.2f} GB" - ) - # if BQ throws "partition filter required" error, parses the partition column and retries with a 1 day window + bq_job_id = job.job_id + gb_scanned = job.total_bytes_processed / 1e9 + logging.info(f"Profile query processed {gb_scanned:.4f} GB (job: {bq_job_id})") except Exception as e: - error_msg = str(e) - if "without a filter over column(s)" in error_msg: - match = re.search(r"filter over column\(s\) '([^']+)'", error_msg) - if not match: - logging.error(f"Could not parse partition column from error: {e}") - return {} - partition_col = match.group(1) - logging.info( - f"Partition filter required on '{partition_col}', retrying with 1-day window from 7 days ago" - ) - partition_filter = ( - f"DATE(`{partition_col}`) = DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)" - ) - profile_query = build_profile_query(table, columns, partition_filter) - try: - job = client.query(profile_query) - rows = list(job.result()) - logging.info( - f"Profile query processed {job.total_bytes_processed / 1e9:.2f} GB" - ) - except Exception as retry_e: - logging.error(f"Retry failed: {retry_e}") - return {} - else: - logging.error(f"Failed to profile {table}: {e}") - return {} + logging.error(f"Failed to profile {table}: {e}") + return {}, None, None row = rows[0] total_rows = row.total_rows @@ -369,7 +386,7 @@ def profile_table(client, table): "values": top_values[:5], } - return results + return results, bq_job_id, gb_scanned def build_description_prompt(table, col, stats): @@ -522,20 +539,65 @@ def log_profile(table, profile): logging.info(f" description: {stats['pass1_description']}") +PERF_LOG = "/tmp/gk_perf_phase1.csv" 
+ + +def write_perf_row( + table, bq_job_id, total_rows, table_gb, gb_scanned, column_count, elapsed_s +): + """Append one row to the Phase 1 performance CSV.""" + import csv + import os + from datetime import datetime, timezone + + write_header = not os.path.exists(PERF_LOG) + with open(PERF_LOG, "a", newline="") as f: + writer = csv.writer(f) + if write_header: + writer.writerow( + [ + "timestamp_utc", + "table", + "bq_job_id", + "total_rows", + "table_gb", + "gb_scanned", + "column_count", + "elapsed_s", + ] + ) + writer.writerow( + [ + datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + table, + bq_job_id or "", + total_rows, + f"{table_gb:.2f}", + f"{gb_scanned:.4f}" if gb_scanned is not None else "", + column_count, + f"{elapsed_s:.1f}", + ] + ) + + def main(): """Profile every column in a BigQuery table and save results to BigQuery.""" args = parse_args() client = bigquery.Client(project=SOURCE_PROJECT) - if not check_table_has_rows(client, args.table): + total_rows, table_gb = check_table_has_rows(client, args.table) + if total_rows <= 0: return logging.info(f"Profiling {args.table}") start = time.time() - profile = profile_table(client, args.table) + profile, bq_job_id, gb_scanned = profile_table(client, args.table) profile = generate_descriptions(args.table, profile) elapsed = time.time() - start log_profile(args.table, profile) records = to_bq_records(args.table, profile) save_to_bq(records, DEST_TABLE) + write_perf_row( + args.table, bq_job_id, total_rows, table_gb, gb_scanned, len(profile), elapsed + ) logging.info(f"Done in {elapsed:.1f}s") @@ -545,9 +607,9 @@ def main(): # --- TEST BLOCK — only used when no args are passed on the command line --- if len(sys.argv) == 1: sys.argv = [ - "gk_field_sampler.py", + "field_profiler.py", "--table", - "moz-fx-data-shared-prod.firefox_desktop_derived.newtab_items_daily_v1", + "moz-fx-data-shared-prod.firefox_accounts_derived.fxa_content_events_v1", ] # ---------------------------------------- main() 
From 6aed6aaaa5351f3a65c131f83fb434a839083785 Mon Sep 17 00:00:00 2001 From: Arkadiusz Komarzewski Date: Wed, 22 Apr 2026 18:17:57 +0200 Subject: [PATCH 3/7] PoC data classification --- script/metadata/README.md | 35 ++ script/metadata/classification/.gitignore | 2 + .../metadata/classification/build_taxonomy.py | 95 ++++ script/metadata/field_classifier.py | 404 ++++++++++++++++++ script/metadata/field_profiler.py | 4 +- script/metadata/lineage_probe_fetcher.py | 37 +- 6 files changed, 570 insertions(+), 7 deletions(-) create mode 100644 script/metadata/README.md create mode 100644 script/metadata/classification/.gitignore create mode 100644 script/metadata/classification/build_taxonomy.py create mode 100644 script/metadata/field_classifier.py diff --git a/script/metadata/README.md b/script/metadata/README.md new file mode 100644 index 00000000000..55e3744e99b --- /dev/null +++ b/script/metadata/README.md @@ -0,0 +1,35 @@ +# Metadata scripts + +Tooling for enriching BigQuery column metadata — descriptions and data-type +classifications — from table samples, telemetry lineage, and LLM calls. + +## Pipeline + +| Script | Phase | Output table (`mozdata-nonprod.analysis.*`) | +|---|---|---| +| `field_profiler.py` | **1.** Profile every column (null rate, distinct count, top values) and generate a pass-1 description from observed data. | `akomar_data_profiling_v1` | +| `lineage_probe_fetcher.py` | **2.** Walk DataHub upstream lineage to the source ping, then fetch probe definitions (Glean Dictionary or legacy stable-table schema). Captures `data_sensitivity`, `send_in_pings`, and `tags` for Glean probes. | `akomar_metadata_phase2_table_pings_v1`, `akomar_metadata_phase2_ping_probes_v1` | +| `description_reconciler.py` | **3.** *(original PoC, not used by the classifier)* Reconcile phase-1 data-driven descriptions with phase-2 probe intent, emit a final description. 
| `gkabbz_metadata_phase3_reconciled_v1` | +| `field_classifier.py` | **4.** Classify each column against Mozilla's data taxonomy — primary + secondary labels, confidence, reasoning. Reads phases 1 & 2 directly. | `akomar_field_classifications_v1` | + +All scripts accept `--table project.dataset.table` and are idempotent +(already-processed rows are skipped on re-run). + +## Data classification + +The classification PoC lives in [`classification/`](classification/) — it +holds the taxonomy source CSV, preprocessed `taxonomy.json`, and the +`build_taxonomy.py` tool that maintains them. See +[`classification/PLAN.md`](classification/PLAN.md) for design, usage, and +non-goals. + +**One-liner to classify a table end-to-end:** +```bash +TABLE=moz-fx-data-shared-prod.search_derived.search_clients_daily_v8 +python script/metadata/field_profiler.py --table "$TABLE" +python script/metadata/lineage_probe_fetcher.py --table "$TABLE" +python script/metadata/field_classifier.py --table "$TABLE" +``` + +Requires `ANTHROPIC_API_KEY` and `DATAHUB_GMS_TOKEN` env vars and +`pip install anthropic`. diff --git a/script/metadata/classification/.gitignore b/script/metadata/classification/.gitignore new file mode 100644 index 00000000000..a96ac28911b --- /dev/null +++ b/script/metadata/classification/.gitignore @@ -0,0 +1,2 @@ +taxonomy.json +Taxonomy* diff --git a/script/metadata/classification/build_taxonomy.py b/script/metadata/classification/build_taxonomy.py new file mode 100644 index 00000000000..d19fc3ce2ab --- /dev/null +++ b/script/metadata/classification/build_taxonomy.py @@ -0,0 +1,95 @@ +"""Preprocess the Mozilla data taxonomy CSV into a clean JSON file for the classifier. + +Reads `Taxonomy overview - Data Types.csv`, normalizes labels, fixes known typos, +and emits a flat list of {label, parent, display_name, description, examples} +as `taxonomy.json`. 
import csv
import json
import re
from pathlib import Path

HERE = Path(__file__).parent
CSV_PATH = HERE / "Taxonomy overview - Data Types.csv"
JSON_PATH = HERE / "taxonomy.json"

# Known typos in the CSV → canonical label.
# Keys are matched case-sensitively against the whitespace-stripped raw label.
LABEL_FIXES = {
    "user.behaviour.media_consumption": "user.behavior.media_consumption",
    "user.behaviour.search.term": "user.behavior.search.term",
    "personnel.demographic.Marital_status_orientation": "personnel.demographic.sexual_orientation",
    "personnel.human_resouces.operations": "personnel.human_resources.operations",
}

# Top-level subject headers (column 0) → synthesized label.
SUBJECT_LABELS = {
    "System": "system",
    "User": "user",
    "company": "company",
    "Personnel": "personnel",
    "Job applicants": "jobapplicants",
    "Other": "other",
}


def canonical_label(raw):
    """Normalize a raw taxonomy label from the CSV.

    Removes all whitespace (including internal whitespace), drops trailing
    dots, then either returns the replacement from LABEL_FIXES or the
    lowercased label.
    """
    label = re.sub(r"\s+", "", raw).rstrip(".")
    # The typo lookup runs before lowercasing because LABEL_FIXES keys keep
    # the CSV's original casing.
    if label in LABEL_FIXES:
        return LABEL_FIXES[label]
    return label.lower()


def parent_of(label):
    """Return the dotted parent of a label, or None for top-level labels."""
    if "." not in label:
        return None
    return label.rsplit(".", 1)[0]


def parse_row(row):
    """Convert one CSV row into a taxonomy entry.

    The row is padded/truncated to seven cells: subject, level1, level2,
    display name, description, examples, and one ignored cell. The most
    specific non-blank label cell wins (level2, then level1, then a known
    subject header).

    Returns a dict with keys label, parent, display_name, description and
    examples (blank text fields become None), or None when the row carries
    no usable label.
    """
    padded = (row + [""] * 7)[:7]
    subject, level1, level2, name, description, examples, _ = padded

    if level2.strip():
        label = canonical_label(level2)
    elif level1.strip():
        label = canonical_label(level1)
    elif subject.strip() in SUBJECT_LABELS:
        label = SUBJECT_LABELS[subject.strip()]
    else:
        return None

    return {
        "label": label,
        "parent": parent_of(label),
        "display_name": name.strip() or None,
        "description": description.strip() or None,
        "examples": examples.strip() or None,
    }


def main():
    """Read the taxonomy CSV and write the de-duplicated entries to taxonomy.json.

    The first CSV row is treated as a header and dropped. When a label occurs
    more than once, only its first row is kept.
    """
    rows = []
    seen = set()
    # Pin the encoding so parsing does not depend on the machine's locale.
    # NOTE(review): assumes the CSV export is UTF-8 — confirm against the source sheet.
    with open(CSV_PATH, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader)  # drop header
        for raw_row in reader:
            entry = parse_row(raw_row)
            if entry is None or entry["label"] in seen:
                continue
            seen.add(entry["label"])
            rows.append(entry)

    JSON_PATH.write_text(json.dumps(rows, indent=2) + "\n", encoding="utf-8")
    print(f"Wrote {len(rows)} taxonomy entries → {JSON_PATH}")


if __name__ == "__main__":
    main()
+ +Reads: + - akomar_data_profiling_v1 (Phase 1: profile + pass1 description) + - akomar_metadata_phase2_table_pings_v1 (Phase 2: source ping per table) + - akomar_metadata_phase2_ping_probes_v1 (Phase 2: probes w/ data_sensitivity, tags) + - classification/taxonomy.json (preprocessed taxonomy) + +Writes: + - akomar_field_classifications_v1 — one row per column per table +""" + +import json +import logging +import re +from argparse import ArgumentParser +from datetime import datetime, timezone +from pathlib import Path + +import anthropic +from google.api_core.exceptions import NotFound +from google.cloud import bigquery + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s: %(levelname)s: %(message)s" +) + +PHASE1_TABLE = "mozdata-nonprod.analysis.akomar_data_profiling_v1" +MAPPING_TABLE = "mozdata-nonprod.analysis.akomar_metadata_phase2_table_pings_v1" +PROBE_TABLE = "mozdata-nonprod.analysis.akomar_metadata_phase2_ping_probes_v1" +DEST_TABLE = "mozdata-nonprod.analysis.akomar_field_classifications_v1" +DEST_PROJECT = "mozdata-nonprod" +CLAUDE_MODEL = "claude-sonnet-4-6" +TAXONOMY_PATH = Path(__file__).parent / "classification" / "taxonomy.json" +TOP_N_PROBES = 3 + +DEST_SCHEMA = [ + bigquery.SchemaField("source_project", "STRING", mode="REQUIRED"), + bigquery.SchemaField("source_dataset", "STRING", mode="REQUIRED"), + bigquery.SchemaField("source_table", "STRING", mode="REQUIRED"), + bigquery.SchemaField("column_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("data_type", "STRING", mode="NULLABLE"), + bigquery.SchemaField( + "primary_label", + "STRING", + mode="NULLABLE", + description="Most specific matching taxonomy label (e.g. 
# --- Taxonomy ---


def load_taxonomy():
    """Load the preprocessed taxonomy JSON from TAXONOMY_PATH."""
    # Explicit encoding keeps the read independent of the machine's locale.
    return json.loads(TAXONOMY_PATH.read_text(encoding="utf-8"))


def taxonomy_prompt_block(taxonomy):
    """Compact the taxonomy into a single JSON string for the prompt.

    Each entry is reduced to label/name/desc/examples; missing or None text
    fields become empty strings. The output uses no whitespace between JSON
    tokens.
    """
    compact = [
        {
            "label": e["label"],
            "name": e.get("display_name") or "",
            "desc": e.get("description") or "",
            "examples": e.get("examples") or "",
        }
        for e in taxonomy
    ]
    return json.dumps(compact, separators=(",", ":"))


# --- Probe matching (lifted from description_reconciler.py, top-3 only) ---


def normalize_name(name):
    """Strip all non-alphanumeric characters and lowercase for fuzzy matching.

    None (or any falsy value) normalizes to the empty string.
    """
    return re.sub(r"[^a-z0-9]", "", (name or "").lower())


def find_matching_probes(column_name, probes, top_n=None):
    """Return the best-scoring candidate probes for a column name.

    Names are compared after normalize_name(). Scores:
      3 - normalized names are equal
      2 - the probe name starts or ends with the column name
      1 - either normalized name contains the other

    Args:
        column_name: Column name to match; may be None.
        probes: Iterable of mappings that expose ``probe_name`` via ``.get``.
        top_n: Maximum number of probes to return. Defaults to the
            module-level TOP_N_PROBES when omitted.

    Returns:
        Up to ``top_n`` probes, highest score first; probes with equal scores
        keep their input order. Empty when the column name has no
        alphanumeric characters.
    """
    if top_n is None:
        top_n = TOP_N_PROBES

    col_norm = normalize_name(column_name)
    if not col_norm:
        # An empty needle is a prefix/suffix of every string, so without this
        # guard every probe would be reported as a candidate.
        return []

    scored = []
    for probe in probes:
        pname_norm = normalize_name(probe.get("probe_name"))
        if not pname_norm:
            # Covers missing names and names made only of punctuation; an
            # empty probe name would otherwise be "contained" in every column.
            continue
        if col_norm == pname_norm:
            score = 3
        elif pname_norm.endswith(col_norm) or pname_norm.startswith(col_norm):
            score = 2
        elif col_norm in pname_norm or pname_norm in col_norm:
            score = 1
        else:
            continue
        scored.append((score, probe))

    # list.sort is stable, so ties stay in input order.
    scored.sort(key=lambda x: -x[0])
    return [p for _, p in scored[:top_n]]