diff --git a/requirements.in b/requirements.in index 2135f1b3c17..c3100382ee0 100644 --- a/requirements.in +++ b/requirements.in @@ -6,6 +6,7 @@ # the private file second). If you see dependency weirdness in the private # image, check for version overlaps between requirements.txt and # requirements-private.txt first. +anthropic attrs==25.4.0 authlib==1.6.12 beautifulsoup4==4.14.3 diff --git a/requirements.txt b/requirements.txt index 3733c0ab8b9..20512548a8c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -140,10 +140,15 @@ annotated-types==0.7.0 \ --hash=sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53 \ --hash=sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89 # via pydantic +anthropic==0.109.2 \ + --hash=sha256:d37db299597c7bc124b49b767ff135f1e6456b64af2b2fad4b63b2a1df333cf0 \ + --hash=sha256:e0fb4ca5df0ed983248c9c6c3242adc81d9cfddb8725902da53698554117abac + # via -r requirements.in anyio==4.11.0 \ --hash=sha256:0287e96f4d26d4149305414d4e3bc32f0dcd0862365a4bddea19d7a1ec38c4fc \ --hash=sha256:82a8d0b81e318cc5ce71a5f1f8b5c4e63619620b63141ef8c995fa0db95a57c4 # via + # anthropic # google-genai # httpx # openai @@ -518,8 +523,13 @@ distro==1.9.0 \ --hash=sha256:2fa77c6fd8940f116ee1d6b94a2f90b13b5ea8d019b98bc8bafdcabcdd9bdbed \ --hash=sha256:7bffd925d65168f85027d8da9af6bddab658135b840670a223589bc0c8ef02b2 # via + # anthropic # google-genai # openai +docstring-parser==0.18.0 \ + --hash=sha256:292510982205c12b1248696f44959db3cdd1740237a968ea1e2e7a900eeb2015 \ + --hash=sha256:b3fcbed555c47d8479be0796ef7e19c2670d428d72e96da63f3a40122860374b + # via anthropic exceptiongroup==1.3.1 \ --hash=sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219 \ --hash=sha256:a7a39a3bd276781e98394987d3a5701d0c4edffb633bb7a5144577f82c773598 @@ -1009,6 +1019,7 @@ httpx==0.28.1 \ --hash=sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc \ --hash=sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad # via + # anthropic # google-genai # openai hyperframe==6.1.0 \ @@ -1054,9 +1065,7 @@ jaraco-classes==3.4.0 \ jeepney==0.8.0 \ --hash=sha256:5efe48d255973902f6badc3ce55e2aa6c5c3b3bc642059ef3a91247bcfcc5806 \ --hash=sha256:c0a454ad016ca575060802ee4d590dd912e35c122fa04e70306de3d076cce755 - # via - # keyring - # secretstorage + # via secretstorage jinja2==3.1.6 \ --hash=sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d \ --hash=sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67 @@ -1169,7 +1178,9 @@ jiter==0.11.1 \ --hash=sha256:fa992af648fcee2b850a3286a35f62bbbaeddbb6dbda19a00d8fbc846a947b6e \ --hash=sha256:fe04ea475392a91896d1936367854d346724a1045a247e5d1c196410473b8869 \ --hash=sha256:fe4a431c291157e11cee7c34627990ea75e8d153894365a3bc84b7a959d23ca8 - # via openai + # via + # anthropic + # openai jmespath==1.0.1 \ --hash=sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980 \ --hash=sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe @@ -1923,6 +1934,7 @@ pydantic==2.9.1 \ --hash=sha256:1363c7d975c7036df0db2b4a61f2e062fbc0aa5ab5f2772e0ffc7191a4f4bce2 \ --hash=sha256:7aff4db5fdf3cf573d4b3c30926a510a10e19a0774d38fc4967f78beb6deb612 # via + # anthropic # bigeye-sdk # google-genai # mozilla-nimbus-schemas @@ -2590,9 +2602,7 @@ s3transfer==0.13.0 \ secretstorage==3.3.3 \ --hash=sha256:2403533ef369eca6d2ba81718576c5e0f564d5cca1b58f73a8b23e7d4eeebd77 \ --hash=sha256:f356e6628222568e3af06f2eba8df495efa13b3b63081dafd4f7d9a7b7bc9f99 - # via - # bigeye-sdk - # keyring + # via bigeye-sdk shellingham==1.5.4 \ --hash=sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686 \ --hash=sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de @@ -2625,6 +2635,7 @@ sniffio==1.3.1 \ --hash=sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2 \ --hash=sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc # via + # anthropic # anyio # google-genai # openai @@ -2747,6 +2758,7 @@ typing-extensions==4.15.0 \ --hash=sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548 # via # aiosignal + # anthropic # anyio # beautifulsoup4 # cattrs diff --git a/script/metadata/README.md b/script/metadata/README.md new file mode 100644 index 00000000000..83038569460 --- /dev/null +++ b/script/metadata/README.md @@ -0,0 +1,219 @@ +# Metadata scripts - data classification PoC + +Tooling that profiles BigQuery columns and assigns each one a data-type label +from Mozilla's data taxonomy (`classification/Taxonomy overview - Data +Types.csv`). It reuses a field-profiling + telemetry-lineage pipeline to feed an +LLM classifier. + +**Scope**: PoC. Speed over correctness. Single-table runs, eyeball the output, +iterate. + +## Pipeline + +| Script | Phase | Output table (`mozdata-nonprod.analysis.*`) | +|---|---|---| +| `field_profiler.py` | **1.** Profile every column (null rate, distinct count, top values) and generate a pass-1 description from observed data. | `akomar_data_profiling_v1` | +| `lineage_probe_fetcher.py` | **2.** Walk DataHub upstream lineage to the source ping, then fetch probe definitions (Glean Dictionary or legacy stable-table schema). Captures `data_sensitivity`, `send_in_pings`, and `tags` for Glean probes. | `akomar_metadata_phase2_table_pings_v1`, `akomar_metadata_phase2_ping_probes_v1` | +| `description_reconciler.py` | **3.** *(original PoC, not used by the classifier)* Reconcile phase-1 data-driven descriptions with phase-2 probe intent. | `gkabbz_metadata_phase3_reconciled_v1` | +| `field_classifier.py` | **4.** Classify each column against the taxonomy: primary + secondary labels, confidence, reasoning, data collection category. Reads phases 1 and 2 directly. | `akomar_field_classifications_v1` | + +All scripts accept `--table project.dataset.table` and are idempotent +(already-processed rows are skipped on re-run). + +## Layout + +``` +script/metadata/ + field_profiler.py - Phase 1: profile every column + pass1 description + lineage_probe_fetcher.py - Phase 2: resolve source ping + fetch probe defs + (Glean probes include data_sensitivity, + send_in_pings, tags) + description_reconciler.py - original Phase 3 for descriptions (unused by the classifier) + field_classifier.py - Phase 4: classify columns against the taxonomy + classify_table.sh - wrapper: runs phases 1->2->4 per table, per model + classification/ + Taxonomy overview - Data Types.csv - source of truth (from legal/privacy) + build_taxonomy.py - CSV -> taxonomy.json + taxonomy.json - preprocessed, what the classifier reads + compare_models.py - diff Claude vs Gemini classifications + export_to_sheet.py - export classifications as CSV for Google Sheets +``` + +## Output tables (`mozdata-nonprod.analysis`) + +| Table | Written by | Contents | +|---|---|---| +| `akomar_data_profiling_v1` | `field_profiler.py` | One row per column: null rate, distinct count, top values, pass1 description | +| `akomar_metadata_phase2_table_pings_v1` | `lineage_probe_fetcher.py` | Table -> source ping mapping (via DataHub lineage) | +| `akomar_metadata_phase2_ping_probes_v1` | `lineage_probe_fetcher.py` | Probe defs per ping, incl. `data_sensitivity`, `send_in_pings`, `tags` for Glean | +| `akomar_field_classifications_v1` | `field_classifier.py` | Final classification: `primary_label`, `secondary_labels`, `confidence`, `reasoning`, `needs_review`, `data_collection_category` (technical / interaction / web_activity / highly_sensitive), `model` (full model name) | + +## Setup (one-time) + +```bash +pip install -r requirements.txt # adds anthropic + google-genai to the venv +export DATAHUB_GMS_TOKEN=... +# Claude backend: +export ANTHROPIC_API_KEY=... +# Gemini backend - uses Vertex AI on the `mozdata` project: +gcloud auth application-default login +python script/metadata/classification/build_taxonomy.py # regenerate taxonomy.json +``` + +## Usage + +**Classify one or more tables end-to-end** with the wrapper: + +```bash +script/metadata/classify_table.sh \ + moz-fx-data-shared-prod.search_derived.search_clients_daily_v8 \ + moz-fx-data-shared-prod.ads_backend_stable.interaction_v1 +``` + +It runs three phases per table (profile -> lineage/probes -> classify) with each +model in `$MODELS`. Default is a single Gemini run +(`gemini-3.1-flash-lite-preview`). To also classify with Claude and diff the two +afterward: + +```bash +MODELS="claude-sonnet-4-6 gemini-3.1-flash-lite-preview" \ + script/metadata/classify_table.sh "$TABLE" +python script/metadata/classification/compare_models.py --table "$TABLE" +``` + +**Or run the phases manually** (default model for `field_classifier.py` is +`claude-sonnet-4-6`): + +```bash +TABLE=moz-fx-data-shared-prod.search_derived.search_clients_daily_v8 +python script/metadata/field_profiler.py --table "$TABLE" +python script/metadata/lineage_probe_fetcher.py --table "$TABLE" +python script/metadata/field_classifier.py --table "$TABLE" +python script/metadata/field_classifier.py --table "$TABLE" --model gemini-3.1-flash-lite-preview +python script/metadata/classification/compare_models.py --table "$TABLE" +``` + +**Inspect results:** + +```sql +SELECT column_name, model, primary_label, confidence, needs_review, reasoning, data_sensitivity +FROM `mozdata-nonprod.analysis.akomar_field_classifications_v1` +WHERE source_table = 'search_clients_daily_v8' +ORDER BY column_name, model; +``` + +## Model selection + +`field_classifier.py --model ` picks the LLM; the backend is +inferred from the name prefix: + +- `claude-*` -> Anthropic API (e.g. `claude-sonnet-4-6`, `claude-opus-4-7`). + Requires `ANTHROPIC_API_KEY`. +- `gemini-*` -> Vertex AI on project `mozdata` (e.g. + `gemini-3.1-flash-lite-preview`). Requires application-default credentials. + +Default is `claude-sonnet-4-6`; any other prefix is rejected. The destination +table has a `model` column storing the full model name, and the idempotency key +is `(project, dataset, table, column, model)`, so multiple models can be run on +the same table and all rows are kept (including version-to-version comparisons +within a family). + +## Comparing models + +After classifying a table with two models: + +```bash +python script/metadata/classification/compare_models.py --table "$TABLE" +``` + +If the table has exactly two distinct `model` values they are auto-picked; +otherwise pass them explicitly with `--left` / `--right`. Prints: + +- agreement rate on `primary_label` +- per-model confidence distribution (high/medium/low counts) +- side-by-side reasoning for every disagreement, including each model's matched + probe + +Add `--show-agreements` to also dump the agreed rows. Omit `--table` to compare +across all classified tables. + +## Exporting for Legal review + +`export_to_sheet.py` produces a CSV of classifications for manual paste into a +Google Sheet. The source-table list and model are hardcoded at the top of the +script; edit them for a different scope. (Writing directly to Sheets via the API +is blocked by Mozilla's Workspace policy on the gcloud OAuth client's Sheets +scope, so CSV-and-paste is the path of least resistance.) + +```bash +python script/metadata/classification/export_to_sheet.py +# writes script/metadata/classification/classifications.csv + +# or pipe straight to the macOS clipboard: +python script/metadata/classification/export_to_sheet.py --stdout | pbcopy +``` + +In Google Sheets: open a fresh sheet, click cell A1, paste; Sheets auto-splits +the CSV. Re-runs overwrite the file (gitignored), so just paste again to refresh. + +Output columns: `dataset, table, column_name, category, category_simple, +data_collection_category, confidence, reasoning, needs_review`. `category_simple` +rolls the assigned `primary_label` up to the closest "Data type" entry from the +taxonomy (e.g. `user.behavior.search.term` -> `user.behavior.search`, +`user.unique_id.client_id` -> `user.unique_id`). + +## Taxonomy preprocessing + +`build_taxonomy.py` parses the CSV and normalizes it: + +- Strips blank/header-only rows. +- Fixes typos: `user.behaviour.*` -> `user.behavior.*`, + `personnel.demographic.Marital_status _orientation` -> + `personnel.demographic.sexual_orientation`, + `personnel.human_resouces.*` -> `personnel.human_resources.*`. +- Synthesizes top-level subject labels (`system`, `user`, `company`, + `personnel`, `jobapplicants`, `other`) from CSV section headers. + +Emits ~133 entries of `{label, parent, level, display_name, description, +examples}`, where `level` is one of `subject` / `data_type` / `subcategory` +(which CSV column the entry came from). + +## Classifier design + +For each profiled column: + +1. Fuzzy-match the column name against probes from the source ping (top 3). +2. Build an LLM prompt with: column name, data type, null rate, pass1 + description, matched probe (name + description + `data_sensitivity` + `tags`), + and the full `taxonomy.json` compacted (~6k tokens, fits easily). +3. The model returns JSON: `{primary_label, secondary_labels, confidence, + reasoning, needs_review, data_collection_category}`. + - `data_collection_category` is one of `technical` / `interaction` / + `web_activity` / `highly_sensitive` (Mozilla's [4 data collection + categories](https://wiki.mozilla.org/Data_Collection#Data_Collection_Categories), + the same scale as Glean's `data_sensitivity`). Always emitted, including + when no probe matched. The model is told to defer to a Glean-declared + `data_sensitivity` unless the column's observed content overrides it. +4. Write to BQ. + +## Explicit non-goals for the PoC + +- No ground-truth eval set, no accuracy measurement. +- No Phase 3 description reconciliation reuse - classifier reads Phase 1 + Phase 2 directly. +- No Tier 3 (`REPEATED RECORD`, `metrics STRUCT`) handling. +- No writeback to `schema.yaml` / `global.yaml` / DataHub tags - output stays in BQ for review. +- No batching / parallelism beyond what the existing scripts already do. +- No retries on LLM JSON parse failures - log and skip. + +## Plans / in progress + +Forward-looking work (these supersede some of the design above as they land): + +- [`classification/profiler_productionization_plan.md`](classification/profiler_productionization_plan.md) + - replace Phase 1 with the productionized profiler from bigquery-etl PR #9503, + feed raw profile stats into the classifier prompt, and make descriptions + optional. Revisits the "no Tier 3" non-goal (the production profiler adds + nested/array tiers). +- [`classification/fxa_classification_plan.md`](classification/fxa_classification_plan.md) + - test classifying all FxA (Mozilla Accounts) data. Revisits the + "no ground-truth eval" non-goal and raises restricted-PII handling. diff --git a/script/metadata/classification/.gitignore b/script/metadata/classification/.gitignore new file mode 100644 index 00000000000..66767c7edc8 --- /dev/null +++ b/script/metadata/classification/.gitignore @@ -0,0 +1,3 @@ +taxonomy.json +Taxonomy* +*.csv diff --git a/script/metadata/classification/build_taxonomy.py b/script/metadata/classification/build_taxonomy.py new file mode 100644 index 00000000000..d66ac664853 --- /dev/null +++ b/script/metadata/classification/build_taxonomy.py @@ -0,0 +1,102 @@ +"""Preprocess the Mozilla data taxonomy CSV into a clean JSON file for the classifier. + +Reads `Taxonomy overview - Data Types.csv`, normalizes labels, fixes known typos, +and emits a flat list of {label, parent, level, display_name, description, examples} +as `taxonomy.json`. The `level` field tags which CSV column the entry came from: + - "subject" — Data subject (col 0): user, system, personnel, ... + - "data_type" — Data type (col 1): user.unique_id, user.behavior.search, ... + - "subcategory" — Subcategory 1 (col 2): user.unique_id.client_id, ... +""" + +import csv +import json +import re +from pathlib import Path + +HERE = Path(__file__).parent +CSV_PATH = HERE / "Taxonomy overview - Data Types.csv" +JSON_PATH = HERE / "taxonomy.json" + +# Known typos in the CSV → canonical label. +LABEL_FIXES = { + "user.behaviour.media_consumption": "user.behavior.media_consumption", + "user.behaviour.search.term": "user.behavior.search.term", + "personnel.demographic.Marital_status_orientation": "personnel.demographic.sexual_orientation", + "personnel.human_resouces.operations": "personnel.human_resources.operations", +} + +# Top-level subject headers (column 0) → synthesized label. +SUBJECT_LABELS = { + "System": "system", + "User": "user", + "company": "company", + "Personnel": "personnel", + "Job applicants": "jobapplicants", + "Other": "other", +} + + +def canonical_label(raw): + """Strip whitespace (including internal), apply known typo fixes, lowercase.""" + label = re.sub(r"\s+", "", raw).rstrip(".") + # Keep casing as-is for the lookup, then return fixed version if matched + if label in LABEL_FIXES: + return LABEL_FIXES[label] + return label.lower() + + +def parent_of(label): + """Return dotted parent, or None for top-level labels.""" + if "." not in label: + return None + return label.rsplit(".", 1)[0] + + +def parse_row(row): + """Return a taxonomy entry or None for blank/unusable rows.""" + padded = (row + [""] * 7)[:7] + subject, level1, level2, name, description, examples, _ = padded + + if level2.strip(): + label = canonical_label(level2) + level = "subcategory" + elif level1.strip(): + label = canonical_label(level1) + level = "data_type" + elif subject.strip() in SUBJECT_LABELS: + label = SUBJECT_LABELS[subject.strip()] + level = "subject" + else: + return None + + return { + "label": label, + "parent": parent_of(label), + "level": level, + "display_name": name.strip() or None, + "description": description.strip() or None, + "examples": examples.strip() or None, + } + + +def main(): + rows = [] + seen = set() + with open(CSV_PATH, newline="") as f: + reader = csv.reader(f) + next(reader) # drop header + for raw_row in reader: + entry = parse_row(raw_row) + if entry is None: + continue + if entry["label"] in seen: + continue + seen.add(entry["label"]) + rows.append(entry) + + JSON_PATH.write_text(json.dumps(rows, indent=2) + "\n") + print(f"Wrote {len(rows)} taxonomy entries → {JSON_PATH}") + + +if __name__ == "__main__": + main() diff --git a/script/metadata/classification/compare_models.py b/script/metadata/classification/compare_models.py new file mode 100644 index 00000000000..72e14d6b7a2 --- /dev/null +++ b/script/metadata/classification/compare_models.py @@ -0,0 +1,253 @@ +"""Side-by-side diff of two model runs of the field classifier. + +Reads `mozdata-nonprod.analysis.akomar_field_classifications_v1` (the table +written by `field_classifier.py`), pairs rows by (project, dataset, table, +column) across two `model` values, and prints: + + - overall agreement rate on `primary_label` + - per-model confidence distribution + - per-row disagreements with reasoning from both sides + +Use `--left` and `--right` to pick the two model names to compare. If +omitted, the script uses the two distinct model values present in the data +(and errors out if there are zero or more than two). +""" + +from argparse import ArgumentParser +from collections import Counter + +from google.cloud import bigquery + +DEST_TABLE = "mozdata-nonprod.analysis.akomar_field_classifications_v1" +DEST_PROJECT = "mozdata-nonprod" + + +def parse_args(): + """Parse command line arguments.""" + parser = ArgumentParser(description=__doc__) + parser.add_argument( + "--table", + help="Restrict to a single source table (project.dataset.table).", + ) + parser.add_argument( + "--left", + help="Model name for the left-hand column (full name, e.g. claude-sonnet-4-6).", + ) + parser.add_argument( + "--right", + help="Model name for the right-hand column (full name, e.g. gemini-3.1-flash-lite-preview).", + ) + parser.add_argument( + "--show-agreements", + action="store_true", + help="Also print rows where both models agreed (default: disagreements only).", + ) + return parser.parse_args() + + +def _table_filter_clause(table): + if not table: + return "" + project, dataset, source_table = table.split(".") + return ( + f"WHERE source_project = '{project}' " + f"AND source_dataset = '{dataset}' " + f"AND source_table = '{source_table}'" + ) + + +def resolve_models(bq_client, table, left, right): + """Resolve --left/--right against models actually present in the data.""" + if left and right: + return left, right + + where = _table_filter_clause(table) + query = f""" + SELECT DISTINCT model + FROM `{DEST_TABLE}` + {where} + ORDER BY model + """ + found = [r.model for r in bq_client.query(query).result()] + if len(found) == 0: + raise SystemExit( + f"No classification rows found{f' for {table}' if table else ''}." + ) + if len(found) > 2 and not (left and right): + raise SystemExit( + f"Found {len(found)} distinct models ({', '.join(found)}). " + "Pass --left and --right to pick two." + ) + if len(found) == 1: + raise SystemExit( + f"Only one model present: {found[0]}. Need a second run to compare." + ) + return found[0], found[1] + + +def fetch_pairs(bq_client, table, left_model, right_model): + """Return rows joining left + right model classifications on (table, column).""" + where = _table_filter_clause(table) + + query = f""" + WITH base AS ( + SELECT + source_project, source_dataset, source_table, column_name, + model, + primary_label, secondary_labels, confidence, reasoning, + needs_review, matched_probe, data_sensitivity, + data_collection_category + FROM `{DEST_TABLE}` + {where} + ), + l AS (SELECT * FROM base WHERE model = @left_model), + r AS (SELECT * FROM base WHERE model = @right_model) + SELECT + COALESCE(l.source_project, r.source_project) AS source_project, + COALESCE(l.source_dataset, r.source_dataset) AS source_dataset, + COALESCE(l.source_table, r.source_table) AS source_table, + COALESCE(l.column_name, r.column_name) AS column_name, + l.primary_label AS left_label, + l.confidence AS left_confidence, + l.reasoning AS left_reasoning, + l.needs_review AS left_needs_review, + l.matched_probe AS left_matched_probe, + l.data_collection_category AS left_category, + r.primary_label AS right_label, + r.confidence AS right_confidence, + r.reasoning AS right_reasoning, + r.needs_review AS right_needs_review, + r.matched_probe AS right_matched_probe, + r.data_collection_category AS right_category + FROM l + FULL OUTER JOIN r USING (source_project, source_dataset, source_table, column_name) + ORDER BY source_dataset, source_table, column_name + """ + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ScalarQueryParameter("left_model", "STRING", left_model), + bigquery.ScalarQueryParameter("right_model", "STRING", right_model), + ] + ) + return list(bq_client.query(query, job_config=job_config).result()) + + +def summarize(pairs): + """Compute agreement rate and per-model confidence distributions.""" + both = [p for p in pairs if p.left_label and p.right_label] + only_left = [p for p in pairs if p.left_label and not p.right_label] + only_right = [p for p in pairs if p.right_label and not p.left_label] + agree = [p for p in both if p.left_label == p.right_label] + disagree = [p for p in both if p.left_label != p.right_label] + + both_cat = [p for p in pairs if p.left_category and p.right_category] + cat_agree = [p for p in both_cat if p.left_category == p.right_category] + + return { + "total_rows": len(pairs), + "both": len(both), + "only_left": len(only_left), + "only_right": len(only_right), + "agree": len(agree), + "disagree": len(disagree), + "left_confidence": Counter(p.left_confidence for p in pairs if p.left_label), + "right_confidence": Counter(p.right_confidence for p in pairs if p.right_label), + "left_category": Counter(p.left_category for p in pairs if p.left_category), + "right_category": Counter(p.right_category for p in pairs if p.right_category), + "category_both": len(both_cat), + "category_agree": len(cat_agree), + "agree_rows": agree, + "disagree_rows": disagree, + } + + +def fmt_conf(counter): + """Format Counter as 'high=12 medium=3 low=1'.""" + if not counter: + return "(no rows)" + return " ".join(f"{k}={v}" for k, v in sorted(counter.items(), key=lambda x: -x[1])) + + +def print_summary(s, left_model, right_model): + """Print the headline numbers.""" + print("=" * 72) + print(f"MODEL COMPARISON: {left_model} vs {right_model}") + print("=" * 72) + print(f"Rows joined: {s['total_rows']}") + print(f" Classified by both: {s['both']}") + print(f" Only {left_model}: {s['only_left']}") + print(f" Only {right_model}: {s['only_right']}") + print() + if s["both"]: + rate = 100.0 * s["agree"] / s["both"] + print(f"Agreement on primary_label: {s['agree']}/{s['both']} ({rate:.1f}%)") + if s["category_both"]: + cat_rate = 100.0 * s["category_agree"] / s["category_both"] + print( + f"Agreement on data_collection_category: " + f"{s['category_agree']}/{s['category_both']} ({cat_rate:.1f}%)" + ) + print() + print(f"{left_model} confidence: {fmt_conf(s['left_confidence'])}") + print(f"{right_model} confidence: {fmt_conf(s['right_confidence'])}") + print() + print(f"{left_model} data_collection_category: {fmt_conf(s['left_category'])}") + print(f"{right_model} data_collection_category: {fmt_conf(s['right_category'])}") + print() + + +def print_row(p, header, left_model, right_model): + """Print a single side-by-side comparison row.""" + fq = f"{p.source_dataset}.{p.source_table}.{p.column_name}" + print("-" * 72) + print(f"{header}: {fq}") + print( + f" {left_model}: {p.left_label or ''} " + f"({p.left_confidence or '?'}, " + f"category={p.left_category or '?'}, " + f"needs_review={p.left_needs_review}, " + f"probe={p.left_matched_probe or 'none'})" + ) + if p.left_reasoning: + print(f" {p.left_reasoning}") + print( + f" {right_model}: {p.right_label or ''} " + f"({p.right_confidence or '?'}, " + f"category={p.right_category or '?'}, " + f"needs_review={p.right_needs_review}, " + f"probe={p.right_matched_probe or 'none'})" + ) + if p.right_reasoning: + print(f" {p.right_reasoning}") + + +def main(): + """Print the comparison report.""" + args = parse_args() + bq_client = bigquery.Client(project=DEST_PROJECT) + + left_model, right_model = resolve_models( + bq_client, args.table, args.left, args.right + ) + pairs = fetch_pairs(bq_client, args.table, left_model, right_model) + s = summarize(pairs) + print_summary(s, left_model, right_model) + + if s["disagree_rows"]: + print("=" * 72) + print(f"DISAGREEMENTS ({len(s['disagree_rows'])})") + print("=" * 72) + for p in s["disagree_rows"]: + print_row(p, "DISAGREE", left_model, right_model) + print() + + if args.show_agreements and s["agree_rows"]: + print("=" * 72) + print(f"AGREEMENTS ({len(s['agree_rows'])})") + print("=" * 72) + for p in s["agree_rows"]: + print_row(p, "AGREE", left_model, right_model) + + +if __name__ == "__main__": + main() diff --git a/script/metadata/classification/export_to_sheet.py b/script/metadata/classification/export_to_sheet.py new file mode 100644 index 00000000000..f7c5c281e0a --- /dev/null +++ b/script/metadata/classification/export_to_sheet.py @@ -0,0 +1,160 @@ +"""Export field classifications as CSV for Legal review. + +Pulls rows from `mozdata-nonprod.analysis.akomar_field_classifications_v1` +for a hardcoded list of source tables and a single model, and writes a CSV +file. Paste the CSV contents into a Google Sheet manually. + +`category_simple` is the assigned `primary_label` rolled up to the closest +ancestor that is a "Data type" in `Taxonomy overview - Data Types.csv` +(taxonomy.json entries with `level == "data_type"`). Most data types are +2-segment (`user.unique_id`) but some are 3-segment (`user.behavior.search`), +so this is a longest-ancestor lookup, not a fixed truncation. + +Usage: + python script/metadata/classification/export_to_sheet.py + # or pipe to clipboard on macOS: + python script/metadata/classification/export_to_sheet.py --stdout | pbcopy +""" + +import argparse +import csv +import json +import logging +import sys +from pathlib import Path + +from google.cloud import bigquery + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s: %(levelname)s: %(message)s" +) + +DEST_TABLE = "mozdata-nonprod.analysis.akomar_field_classifications_v1" +DEST_PROJECT = "mozdata-nonprod" +TAXONOMY_PATH = Path(__file__).parent / "taxonomy.json" +DEFAULT_OUTPUT_PATH = Path(__file__).parent / "classifications.csv" + +MODEL = "gemini-3.1-flash-lite-preview" +TABLES = [ + "ads_backend_stable.interaction_v1", + "ads_derived.ad_metrics_v1", + "firefox_desktop_stable.newtab_v1", + "firefox_desktop_derived.newtab_clients_daily_v2", + "firefox_desktop_stable.quick_suggest_v1", + "search_terms_derived.suggest_impression_sanitized_v3", + "contextual_services_derived.event_aggregates_suggest_v1", + "contextual_services_derived.request_payload_suggest_v2", + "search_terms_derived.adm_daily_aggregates_v1", +] + +HEADER_ROW = [ + "dataset", + "table", + "column_name", + "category", + "category_simple", + "data_collection_category", + "confidence", + "reasoning", + "needs_review", +] + + +def load_data_types(): + """Return the set of taxonomy labels that are CSV-level 'Data type' entries.""" + taxonomy = json.loads(TAXONOMY_PATH.read_text()) + return {e["label"] for e in taxonomy if e.get("level") == "data_type"} + + +def category_simple(label, data_types): + """Walk dot-ancestors of `label` to find the closest Data type match.""" + if not label: + return "" + parts = label.split(".") + for i in range(len(parts), 0, -1): + candidate = ".".join(parts[:i]) + if candidate in data_types: + return candidate + return label # bare subject (e.g. "user") — keep as-is + + +def fetch_rows(bq_client): + """Query the classifications table for the hardcoded scope.""" + query = f""" + SELECT + source_dataset, source_table, column_name, + primary_label, data_collection_category, + confidence, reasoning, needs_review + FROM `{DEST_TABLE}` + WHERE model = @model + AND CONCAT(source_dataset, '.', source_table) IN UNNEST(@tables) + ORDER BY source_dataset, source_table, column_name + """ + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ScalarQueryParameter("model", "STRING", MODEL), + bigquery.ArrayQueryParameter("tables", "STRING", TABLES), + ] + ) + return list(bq_client.query(query, job_config=job_config).result()) + + +def build_rows(bq_rows, data_types): + """Map BQ rows into the output CSV layout.""" + out = [] + for r in bq_rows: + out.append([ + r.source_dataset or "", + r.source_table or "", + r.column_name or "", + r.primary_label or "", + category_simple(r.primary_label, data_types), + r.data_collection_category or "", + r.confidence or "", + r.reasoning or "", + "TRUE" if r.needs_review else "FALSE" if r.needs_review is False else "", + ]) + return out + + +def write_csv(rows, fh): + """Write header + data rows to a file handle as CSV.""" + writer = csv.writer(fh) + writer.writerow(HEADER_ROW) + writer.writerows(rows) + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--stdout", + action="store_true", + help=f"Write CSV to stdout instead of {DEFAULT_OUTPUT_PATH.name}.", + ) + return parser.parse_args() + + +def main(): + """Pull classifications from BQ and write a CSV for manual paste into Sheets.""" + args = parse_args() + + data_types = load_data_types() + logging.info(f"Loaded {len(data_types)} 'Data type' entries from taxonomy") + + bq_client = bigquery.Client(project=DEST_PROJECT) + bq_rows = fetch_rows(bq_client) + logging.info(f"Fetched {len(bq_rows)} rows from BQ") + + rows = build_rows(bq_rows, data_types) + + if args.stdout: + write_csv(rows, sys.stdout) + else: + with open(DEFAULT_OUTPUT_PATH, "w", newline="") as fh: + write_csv(rows, fh) + logging.info(f"Wrote {len(rows)} rows to {DEFAULT_OUTPUT_PATH}") + + +if __name__ == "__main__": + main() diff --git a/script/metadata/classification/fxa_classification_plan.md b/script/metadata/classification/fxa_classification_plan.md new file mode 100644 index 00000000000..55437fa6edf --- /dev/null +++ b/script/metadata/classification/fxa_classification_plan.md @@ -0,0 +1,119 @@ +# Plan: classify all FxA (Mozilla Accounts) data + +**Goal.** Run the classification pipeline over *all* Firefox Accounts / Mozilla +Accounts data as a test, once the productionized profiler is integrated (see +[`profiler_productionization_plan.md`](profiler_productionization_plan.md)). + +**Prerequisite.** This depends on the profiler-integration work being done first +(vendored profiler -> `akomar_column_profiles_v1`, classifier reading raw stats +with optional descriptions). Do that first; this doc is the FxA-specific layer on +top. + +## Why FxA is a different (and harder) test than the ads tables + +The ads test was mostly Glean-sourced tables with probe definitions. FxA is +**mixed-provenance and PII-dense**, which stresses exactly the parts of the +classifier that the Glean path papers over. + +## Scope: what "all FxA data" actually is + +Discovered from `sql/moz-fx-data-shared-prod/` (verify table lists at runtime via +`INFORMATION_SCHEMA` — counts below are repo dir entries, approximate): + +| Dataset | ~Tables | Provenance | Phase-2 probes? | Notes | +|---|---|---|---|---| +| `accounts_db_external` | ~42 | **Fivetran MySQL mirror** of the FxA backend DB | **No** (not telemetry) | **ACL-restricted** to `accounts-confidential`. The PII core: accounts, emails, tokens, devices, oauth, carts. | +| `accounts_db_nonprod_external` | ~42 | nonprod mirror | No | Likely synthetic/test data — probably **exclude** (confirm it isn't real). | +| `firefox_accounts_derived` | ~51 | Legacy server-side FxA events (auth/content/delete events, amplitude exports, sanitized docker logs) | **Sparse/none** (legacy, not Glean pings) | The legacy events core. | +| `firefox_accounts` | ~24 | Views over `firefox_accounts_derived` | n/a | Views — the profiler skips VIEW/MATERIALIZED VIEW, so these are auto-excluded. | +| `accounts_backend` + `_derived` + `_external` | ~15 | **Glean** (server-side Glean app) | **Yes** | Probe matching + `data_sensitivity` available here. | +| `accounts_frontend` + `_derived` | ~6 | **Glean** (JS Glean on accounts.firefox.com) | **Yes** | Probe matching available. | + +There are also `accounts_db`, `accounts_db_derived` (small) — include if they hold +real data. + +**Action:** before running, enumerate the live datasets/tables with +`INFORMATION_SCHEMA.SCHEMATA` / `.TABLES` rather than trusting this list — dataset +membership drifts. Decide explicitly whether to include the `*_nonprod_*` mirrors. + +## The central challenge: partial probe coverage + +The classifier was built around Glean: it fuzzy-matches columns to source-ping +probes and is told to defer to a declared `data_sensitivity`. For the +**non-Glean majority of FxA** (`accounts_db_external`, `firefox_accounts_derived`), +Phase 2 will resolve **no source ping**, so there are **no probes and no +`data_sensitivity`** to lean on. Those columns get classified from **column name ++ data type + profiled stats + the `pii_suppressed` tier** alone. + +Implications: +- This is actually the most valuable signal-quality test we can run — it shows how + the classifier does when the Glean crutch is gone. +- The raw-stats-into-prompt change from the profiler-integration plan is **load + bearing** here, not optional. Without it, non-Glean FxA columns would reach the + model as name + type only. +- Expect more `needs_review=true` / `low` confidence on the DB-mirror tables. That + is correct behavior, not a bug — track the rate as a result. + +## The governance blocker: restricted PII must not leak + +`accounts_db_external` is `dataset_base_acl: restricted` (accounts-confidential +workgroup) because it contains raw PII. Two concrete risks: + +1. **Access.** The profiler/classifier credentials must be in (or granted to) the + accounts-confidential workgroup to read `accounts_db_external` at all. + Verify before running, or these tables silently fail/skip. +2. **Example-value leakage.** The productionized profiler stores `example_value` + and top `values` for every *non-suppressed* column. Its PII suppression list + (`_PII_LEAF_NAMES`: account, email, fxa, ip, password, dob, ... + `_email` + suffix) is **narrow** and misses many FxA-sensitive columns (`uid`, + `auth_salt`, `verify_hash`, `kA`/`wrap_wrap_kb`, `recovery_data`, `flow_id`, + session tokens, etc.). So raw PII would land in the profiling table — and if + that table lives in `mozdata-nonprod.analysis`, **restricted PII has leaked + into a less-restricted dataset.** + +Mitigations (pick before running, do not skip): +- **Restrict the destination.** Write FxA profiling + classification output to a + table whose ACL matches accounts-confidential, not the open + `mozdata-nonprod.analysis`. (Possibly the whole FxA test should target a + restricted dataset.) +- **And/or suppress values for the DB mirror.** Run the profiler with + value/example capture disabled for `accounts_db_external`, or extend + `_PII_LEAF_NAMES` with the FxA column vocabulary. Classification mostly needs + names + null_rate + cardinality, not literal example values — dropping example + values costs little classification signal and removes the leak. +- Confirm with data stewards (FxA / data-review) that classifying these tables + into the chosen destination is acceptable. + +## Run approach + +The profiler is **dataset-scoped**, which fits "all FxA" better than the ads +per-table approach: run it **once per FxA dataset with no `--tables`** (profiles +every base table in the dataset, one `WRITE_TRUNCATE` partition, no clobber). +Give each dataset its own partition date (as in the ads plan) so several FxA +datasets coexist in one destination table. Views are auto-skipped. + +Then per table: `lineage_probe_fetcher.py --table` (will no-op probes for the +non-Glean ones) + `field_classifier.py --table`. + +## Eval angle (worth adding for FxA specifically) + +Unlike the open-ended PoC, FxA has **knowable ground truth**: the FxA DB schema is +documented (ecosystem-platform DB reference) and many columns are unambiguously +PII. Cheap, high-value validation: +- Hand-label a set of known-sensitive FxA columns (email, uid, ip_address, + tokens, recovery keys) and check the classifier tags them `highly_sensitive` / + the right `user.unique_id.*` labels. +- Report precision on the `pii_suppressed` columns (these *should* all come out + highly_sensitive) and the `needs_review` rate on the DB-mirror tables. + +This would be the first real accuracy signal for the classifier — the PoC's +explicit non-goal was "no ground-truth eval," and FxA is the natural place to +start one. + +## Open questions + +- Destination ACL: restricted dataset vs. nonprod analysis with values suppressed? +- Include `accounts_db_nonprod_external` and other `*_nonprod_*`? (Probably no.) +- Do we have accounts-confidential read access under the credentials we'll run as? +- Is there value in classifying the legacy `firefox_accounts_derived` sanitized / + amplitude-export tables, or scope to the live DB mirror + Glean apps? diff --git a/script/metadata/classification/profiler_productionization_plan.md b/script/metadata/classification/profiler_productionization_plan.md new file mode 100644 index 00000000000..79892f43da9 --- /dev/null +++ b/script/metadata/classification/profiler_productionization_plan.md @@ -0,0 +1,190 @@ +# Plan: consume the productionized profiler in the classification pipeline + +**Goal.** Replace this PoC's home-grown Phase 1 (`field_profiler.py`) with the +productionized column profiler from +[bigquery-etl PR #9503](https://github.com/mozilla/bigquery-etl/pull/9503) +(DENG-11204, merged 2026-06-08), writing to **our own** nonprod table until the +rest of the pipeline is productionized. We want **classification only** — no +description generation. + +This plan covers Phase 1 only. Phases 2 (lineage/probes) and 4 (classifier) keep +running from this branch's scripts, with the small classifier edits in §4. + +## Background / findings + +These were established by reading both code bases and adversarially verifying the +two load-bearing claims (both **confirmed**): + +- **The classifier's only profiling-derived prompt signal is `pass1_description`.** + `field_classifier.load_phase1()` selects exactly `source_project, + source_dataset, source_table, column_name, data_type, pass1_description` + (`WHERE column_tier != 'undocumented' AND pass1_description IS NOT NULL`), and + `build_classification_prompt()` interpolates only `table`, `column_name`, + `data_type`, `pass1_description`, plus the Phase-2 probe block. `null_rate`, + `distinct_count`, `is_high_cardinality`, `example_value`, and `values` are + never selected and never reach the model. + - Consequence: dropping descriptions removes the classifier's *only* + data-driven signal unless we replace it. The production table carries all the + raw stats, so we feed those into the prompt instead — strictly richer than + the lossy description summary, and it removes the dependency on the + description-generation LLM step entirely. + +- **The production `query.py` does not create its destination table.** + `save_profiles()` always loads into a partition decorator (`table$YYYYMMDD`) + with `WRITE_TRUNCATE`. A decorator load into a non-existent table **fails** + (`CREATE_IF_NEEDED` only auto-creates through a *plain* table reference, never + through `$`). In production the table pre-exists because bqetl deploys it from + `schema.yaml`; our nonprod table has no such deploy step, so we must pre-create + it (§2). + +- **Only two scripts read the Phase-1 table.** `field_classifier.py` (the + classifier) and `lineage_probe_fetcher.py` (only a `SELECT DISTINCT` of the + three identity columns, and only in its no-`--table` path — `classify_table.sh` + always passes `--table`, so the swap is invisible to it). Nothing else depends + on the profiling schema. + +- **The production profiler is self-contained**: imports only + `google.cloud.bigquery` + stdlib (no `anthropic`). Destination is fully + configurable via `--destination-project/-dataset/-table`. It adds PII + suppression (`pii_suppressed` tier — column name matched a PII pattern, never + scanned) and nested/array tiers (`nested_leaf`, `scalar_array`) beyond our + `scalar`/`leaf`/`undocumented`. `profiled_at` is `DATE` (not `TIMESTAMP`), and + there is **no** `pass1_description` column. + +## Work items + +### 1. Vendor the script (it is not on this branch) + +`column_profiles_v1/query.py` merged to `main` after this branch forked. + +```bash +git fetch origin main +git show origin/main:sql/moz-fx-data-shared-prod/data_governance_metadata_derived/column_profiles_v1/query.py \ + > script/metadata/column_profiler.py +``` + +Vendoring a copy under `script/metadata/` keeps the PoC self-contained until the +full pipeline lands. (Alternatively, run it directly from a `main` checkout.) + +### 2. Pre-create the destination table + +A decorator load will **not** auto-create the table, so create it once using the +script's own schema constant to guarantee an exact match (partitioning and +clustering must match or the loads error): + +```python +from google.cloud import bigquery +import column_profiler as cp # the vendored script + +client = bigquery.Client(project="mozdata-nonprod") +t = bigquery.Table("mozdata-nonprod.analysis.akomar_column_profiles_v1", + schema=cp._COLUMN_PROFILES_SCHEMA) +t.time_partitioning = bigquery.TimePartitioning( + type_=bigquery.TimePartitioningType.DAY, field="profiled_at") +t.clustering_fields = ["source_dataset", "source_table", "column_name"] +client.create_table(t) +``` + +### 3. Run the profiler + +Two constraints interact: + +- `--tables` (subset) requires **exactly one** `--source-datasets` entry. The 9 + target tables span **6 datasets**, so they cannot be done in one subset run. +- Each run does `WRITE_TRUNCATE` on `table$`, so two same-date runs into + the same table **clobber** each other. + +Recommended zero-edit path: **6 per-dataset runs, each into its own partition +date** of the one table. The datasets are disjoint, so no column collides across +partitions. + +```bash +P=script/metadata/column_profiler.py +D=mozdata-nonprod; DS=analysis; T=akomar_column_profiles_v1 +run() { python $P --destination-project $D --destination-dataset $DS --destination-table $T \ + --source-project moz-fx-data-shared-prod --source-datasets "$1" --tables "${@:3}" --date "$2"; } + +run ads_backend_stable 2026-06-10 interaction_v1 +run ads_derived 2026-06-09 ad_metrics_v1 +run firefox_desktop_stable 2026-06-08 newtab_v1 quick_suggest_v1 +run firefox_desktop_derived 2026-06-07 newtab_clients_daily_v2 +run search_terms_derived 2026-06-06 suggest_impression_sanitized_v3 adm_daily_aggregates_v1 +run contextual_services_derived 2026-06-05 event_aggregates_suggest_v1 request_payload_suggest_v2 +``` + +Use `--dry-run` first to confirm the work list. + +Alternatives, if the synthetic dates are undesirable: +- **One whole-dataset run** (all 6 datasets, no `--tables`, single real date, one + `WRITE_TRUNCATE`, no clobber) — but it profiles *every* table in those + datasets; `firefox_desktop_stable` alone is dozens of large ping tables, so + this is expensive. Not recommended. +- **One-line edit** to the vendored copy (`WRITE_TRUNCATE` -> `WRITE_APPEND` and + drop the `$` decorator) so all 6 runs append to one real-date partition. Clean, + but it's a fork of the production script. + +### 4. Adapt `field_classifier.py` + +The only real code change (~40 lines, mostly `load_phase1` + the prompt): + +- **Point `PHASE1_TABLE`** at + `mozdata-nonprod.analysis.akomar_column_profiles_v1`. +- **Drop `AND pass1_description IS NOT NULL`** and stop unconditionally selecting + that column (it does not exist in this schema -> would error). Make it optional: + at startup, probe `INFORMATION_SCHEMA.COLUMNS` for `pass1_description` and only + select it when present. (Satisfies "use descriptions if they exist, ignore if + not.") +- **Add latest-snapshot dedup** — the table is weekly snapshots, so without this + each column gets classified once per snapshot: + ```sql + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY source_project, source_dataset, source_table, column_name + ORDER BY profiled_at DESC) = 1 + ``` +- **Handle the new tiers.** Keep excluding `undocumented`. **Keep + `pii_suppressed`** — a column whose name matched a PII pattern is the + highest-signal thing to classify (almost certainly a `user.*` identifier / + highly_sensitive). Also keep `scalar_array` and `nested_leaf`. +- **Feed raw stats into the prompt** in place of the description line: + `null_rate`, `distinct_count`, `is_high_cardinality`, `example_value` / top + `values`, and a "PII-suppressed (name matched a PII pattern)" note when + applicable. Include `pass1_description` too *if* it was present. + +Phases 2 and 4 are otherwise untouched. Orchestration splits into: profile up +front (§3) -> then per-table `lineage_probe_fetcher.py --table` + +`field_classifier.py --table` (a trimmed `classify_table.sh` that drops its +`[1/3]` profiling step). + +## Gotcha: cross-project billing + +The production script creates its client with `project=--source-project` (our +sources, `moz-fx-data-shared-prod`), and that governs job execution/billing — +there is no separate destination-job-project flag. The load writes to +`mozdata-nonprod`. So the running credentials need **BQ jobUser on +`moz-fx-data-shared-prod`** + **dataEditor on `mozdata-nonprod.analysis`**. This +is a different job-project than the current `field_profiler.py` (which bills in +`mozdata-nonprod`). + +## Future awareness: PR #9557 (Phase 2 productionization) + +[PR #9557](https://github.com/mozilla/bigquery-etl/pull/9557) (DENG-11205, by +phil-lee70, **OPEN/unmerged** as of 2026-06-16) productionizes Phase 2 — this +branch's `lineage_probe_fetcher.py` — as two tables in +`data_governance_metadata_derived`: `lineage_mapping_v1` (table -> ping) and +`probe_definitions_v1` (probe defs per ping). The chain is +`column_profiles_v1 -> lineage_mapping_v1 -> probe_definitions_v1`; +`lineage_mapping_v1` reads its driving table list straight out of +`column_profiles_v1`. + +**Blocker for adopting it in the classifier later:** `probe_definitions_v1` keeps +only `probe_name`/`description`/`type` and **drops `data_sensitivity`, `tags`, +and `send_in_pings`** — exactly the Glean signals the classifier leans on (it is +told to defer to a declared `data_sensitivity`). Adopting #9557's probe table +will require extending its schema + Glean fetch first. No action now; just don't +treat it as a drop-in for Phase 2. + +## Net effort + +One vendored script, one `bq mk` / `create_table`, six profiling runs, and ~40 +lines of edits to `field_classifier.py`. Descriptions disappear as a dependency +and the classifier gains richer raw-stat signal. diff --git a/script/metadata/classify_table.sh b/script/metadata/classify_table.sh new file mode 100755 index 00000000000..22f95c39d5e --- /dev/null +++ b/script/metadata/classify_table.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash +# +# Run the classification sequence (profile → lineage/probes → classify with +# each model) for one or more BigQuery tables. Run +# `script/metadata/classification/compare_models.py --table ` +# separately to diff two model runs. +# +# Usage: +# script/metadata/classify_table.sh [ ...] +# +# Required env vars: +# ANTHROPIC_API_KEY — for Claude models +# DATAHUB_GMS_TOKEN — for lineage_probe_fetcher +# Required for Gemini models: +# `gcloud auth application-default login` (Vertex AI) +# +# Override the model list via $MODELS (space-separated). Default is a single +# Gemini run. To also classify with Claude (and enable a model comparison +# afterwards), set: +# MODELS="claude-sonnet-4-6 gemini-3.1-flash-lite-preview" + +set -euo pipefail + +MODELS="${MODELS:-gemini-3.1-flash-lite-preview}" + +if [[ $# -lt 1 ]]; then + echo "Usage: $0 [ ...]" >&2 + echo "Set \$MODELS to override the model list (default: $MODELS)." >&2 + exit 1 +fi + +# Resolve repo root so this works no matter where it's invoked from. +REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +cd "$REPO_ROOT" + +banner() { + printf '\n========================================================================\n' + printf '== %s\n' "$1" + printf '========================================================================\n' +} + +for TABLE in "$@"; do + banner "TABLE: $TABLE" + + banner "[1/3] field_profiler.py" + python script/metadata/field_profiler.py --table "$TABLE" + + banner "[2/3] lineage_probe_fetcher.py" + python script/metadata/lineage_probe_fetcher.py --table "$TABLE" + + for MODEL in $MODELS; do + banner "[3/3] field_classifier.py --model $MODEL" + python script/metadata/field_classifier.py --table "$TABLE" --model "$MODEL" + done + + banner "DONE: $TABLE" +done diff --git a/script/metadata/description_reconciler.py b/script/metadata/description_reconciler.py new file mode 100644 index 00000000000..2dc40389f9a --- /dev/null +++ b/script/metadata/description_reconciler.py @@ -0,0 +1,534 @@ +"""Reconcile Phase 1 data-driven descriptions with Phase 2 probe definitions. + +For each profiled column, finds candidate probes from the source ping, +then calls Claude to reconcile observed data (Phase 1) with source intent (Phase 2). +For columns with no probe match, fetches the ETL query from DataHub to provide +the actual computation logic (e.g. COUNTIF(is_dau) for a dau column). +Produces a final description and routing hint (global vs dataset-scoped). +""" + +import base64 +import json +import logging +import os +import re +from argparse import ArgumentParser +from datetime import datetime, timezone + +import anthropic +import requests +from google.api_core.exceptions import NotFound +from google.cloud import bigquery + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s: %(levelname)s: %(message)s" +) + +PHASE1_TABLE = "mozdata-nonprod.analysis.gkabbz_data_profiling_test_v1" +MAPPING_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase2_table_pings_v1" +PROBE_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase2_ping_probes_v1" +DEST_TABLE = "mozdata-nonprod.analysis.gkabbz_metadata_phase3_reconciled_v1" +DEST_PROJECT = "mozdata-nonprod" +CLAUDE_MODEL = "claude-sonnet-4-6" +TOP_N_PROBES = 10 + +DEST_SCHEMA = [ + bigquery.SchemaField( + "source_project", + "STRING", + mode="REQUIRED", + description="GCP project of the profiled BQ table.", + ), + bigquery.SchemaField( + "source_dataset", + "STRING", + mode="REQUIRED", + description="BQ dataset of the profiled table.", + ), + bigquery.SchemaField( + "source_table", + "STRING", + mode="REQUIRED", + description="BQ table name of the profiled table.", + ), + bigquery.SchemaField( + "column_name", + "STRING", + mode="REQUIRED", + description="Column name; dot notation for nested STRUCT leaf fields (e.g. client_info.app_channel).", + ), + bigquery.SchemaField( + "data_type", + "STRING", + mode="NULLABLE", + description="BQ data type of the column.", + ), + bigquery.SchemaField( + "source_ping", + "STRING", + mode="NULLABLE", + description="Name of the source telemetry ping, from Phase 2 lineage mapping. Null if no ping found.", + ), + bigquery.SchemaField( + "ping_platform", + "STRING", + mode="NULLABLE", + description="Telemetry platform: 'glean' or 'legacy_telemetry'. Null if no ping found.", + ), + bigquery.SchemaField( + "pass1_description", + "STRING", + mode="NULLABLE", + description="Phase 1 data-driven description based on observed values in the BQ table.", + ), + bigquery.SchemaField( + "matched_probe", + "STRING", + mode="NULLABLE", + description="Probe name Claude identified as the best match for this column. Null if no matching probe found.", + ), + bigquery.SchemaField( + "probe_description", + "STRING", + mode="NULLABLE", + description="Description of the matched probe from the source artifact (Glean Dictionary or stable table schema).", + ), + bigquery.SchemaField( + "final_description", + "STRING", + mode="NULLABLE", + description="Claude-reconciled description combining observed data (Phase 1) with source intent (Phase 2). Falls back to pass1_description on error.", + ), + bigquery.SchemaField( + "contradiction", + "STRING", + mode="NULLABLE", + description="Note flagging any contradiction between the Phase 1 observed behavior and the probe or source definition. Null if no contradiction found.", + ), + bigquery.SchemaField( + "routing_hint", + "STRING", + mode="NULLABLE", + description="Routing recommendation: 'global' (consistent across datasets, candidate for global.yaml), 'dataset' (dataset-specific), or 'unknown'.", + ), + bigquery.SchemaField( + "processed_at", + "TIMESTAMP", + mode="REQUIRED", + description="Timestamp when this column was reconciled.", + ), +] + + +GITHUB_REPO = "mozilla/bigquery-etl" +GITHUB_API = "https://api.github.com/repos/{repo}/contents/{path}" + + +def fetch_github_query(project, dataset, table): + """Fetch query.sql or query.py from GitHub for a BQ table. + + Tries query.sql first, then query.py. Uses GITHUB_TOKEN env var if set + (required for private repos; optional for public ones). + Returns (content, filename) or (None, None) if not found. + """ + token = os.environ.get("GITHUB_TOKEN") + headers = {"Accept": "application/vnd.github.v3+json"} + if token: + headers["Authorization"] = f"Bearer {token}" + + for filename in ("query.sql", "query.py"): + path = f"sql/{project}/{dataset}/{table}/{filename}" + url = GITHUB_API.format(repo=GITHUB_REPO, path=path) + try: + response = requests.get(url, headers=headers, timeout=30) + except Exception as e: + logging.warning(f"GitHub request failed for {path}: {e}") + return None, None + if response.status_code == 200: + content = base64.b64decode(response.json()["content"]).decode("utf-8") + logging.info(f" Fetched {filename} from GitHub ({len(content)} chars)") + return content, filename + elif response.status_code == 404: + continue + else: + logging.warning(f"GitHub API returned {response.status_code} for {path}") + return None, None + + return None, None + + +# --- Probe matching --- + + +def normalize_name(name): + """Strip all non-alphanumeric characters and lowercase for fuzzy matching.""" + return re.sub(r"[^a-z0-9]", "", name.lower()) + + +def find_matching_probes(column_name, probes): + """Return up to TOP_N_PROBES candidate probes for a column name. + + Scores probes by how closely their name matches the column name: + 3 — exact normalized match + 2 — probe name ends or starts with the column name (e.g. app_name → application.name) + 1 — substring match in either direction + """ + col_norm = normalize_name(column_name) + scored = [] + for probe in probes: + pname = probe.get("probe_name") or "" + if not pname: + continue + pname_norm = normalize_name(pname) + if col_norm == pname_norm: + score = 3 + elif pname_norm.endswith(col_norm) or pname_norm.startswith(col_norm): + score = 2 + elif col_norm in pname_norm or pname_norm in col_norm: + score = 1 + else: + continue + scored.append((score, probe)) + + scored.sort(key=lambda x: -x[0]) + return [p for _, p in scored[:TOP_N_PROBES]] + + +# --- Claude reconciler --- + + +def build_reconcile_prompt( + column_name, + data_type, + table, + source_ping, + ping_platform, + pass1_description, + matching_probes, + github_query=None, + github_filename=None, +): + """Build a Claude prompt to reconcile Phase 1 and Phase 2 descriptions.""" + ping_info = ( + f"{source_ping} ({ping_platform})" + if source_ping + else "unknown (no lineage resolved)" + ) + + if matching_probes: + probe_lines = [] + for p in matching_probes: + desc = p["probe_description"] or "(no description)" + probe_lines.append( + f" - {p['probe_name']} ({p['probe_type'] or 'unknown type'}): {desc}" + ) + source_section = ( + "Candidate probe definitions from the source ping:\n" + + "\n".join(probe_lines) + ) + elif github_query: + source_section = ( + f"No matching probe definitions found.\n\n" + f"ETL query ({github_filename}) from the bqetl GitHub repo:\n" + f"```sql\n{github_query}\n```" + ) + else: + source_section = "No matching probe definitions found — this column is likely computed or aggregated." + + return ( + "You are a data documentation expert for Mozilla's BigQuery data warehouse.\n\n" + f"Column: {column_name}\n" + f"Table: {table}\n" + f"Data type: {data_type}\n" + f"Source ping: {ping_info}\n\n" + f"Phase 1 description (based on observed data values):\n{pass1_description}\n\n" + f"{source_section}\n\n" + "Task:\n" + "1. Identify which probe this column maps to (if any). Probe names may use dots instead of underscores.\n" + " If an ETL query is provided, find how this column is computed in the SELECT clause.\n" + "2. Write a final 1-2 sentence description combining observed data (Phase 1) with source intent.\n" + "3. Note any contradiction between Phase 1 and the source definition.\n" + "4. Routing hint: 'global' if this field is generic across Mozilla products," + " 'dataset' if specific to this dataset or product, 'unknown' if unclear.\n\n" + "Respond with a JSON object only (no markdown fences):\n" + '{"matched_probe": "", "final_description": "", ' + '"contradiction": "", "routing_hint": "global|dataset|unknown"}' + ) + + +def call_claude(claude_client, prompt): + """Call Claude and parse the JSON response.""" + response = claude_client.messages.create( + model=CLAUDE_MODEL, + max_tokens=1024, + messages=[{"role": "user", "content": prompt}], + ) + text = response.content[0].text.strip() + # Strip markdown fences if Claude wraps the JSON despite being told not to + text = re.sub(r"^```(?:json)?\s*", "", text) + text = re.sub(r"\s*```$", "", text) + return json.loads(text) + + +def reconcile_column( + claude_client, + column_name, + data_type, + table, + source_ping, + ping_platform, + pass1_description, + matching_probes, + github_query=None, + github_filename=None, +): + """Reconcile one column and return a result dict, falling back to Phase 1 on error.""" + prompt = build_reconcile_prompt( + column_name, + data_type, + table, + source_ping, + ping_platform, + pass1_description, + matching_probes, + github_query, + github_filename, + ) + try: + return call_claude(claude_client, prompt) + except Exception as e: + logging.error(f"Claude call failed for {column_name}: {e}") + return { + "matched_probe": None, + "final_description": pass1_description, + "contradiction": None, + "routing_hint": "unknown", + } + + +# --- BQ loaders --- + + +def load_phase1(bq_client, project=None, dataset=None, table=None): + """Load Phase 1 columns grouped by (project, dataset, table). + + Skips undocumented columns (no pass1_description). + """ + where = "WHERE column_tier != 'undocumented' AND pass1_description IS NOT NULL" + if project and dataset and table: + where += f" AND source_project = '{project}' AND source_dataset = '{dataset}' AND source_table = '{table}'" + query = f""" + SELECT source_project, source_dataset, source_table, column_name, data_type, pass1_description + FROM `{PHASE1_TABLE}` + {where} + ORDER BY source_dataset, source_table, column_name + """ + rows_by_table = {} + for row in bq_client.query(query).result(): + key = (row.source_project, row.source_dataset, row.source_table) + if key not in rows_by_table: + rows_by_table[key] = [] + rows_by_table[key].append( + { + "column_name": row.column_name, + "data_type": row.data_type, + "pass1_description": row.pass1_description, + } + ) + return rows_by_table + + +def load_ping_mapping(bq_client): + """Load table-to-ping mapping from Phase 2 mapping table.""" + query = f""" + SELECT source_project, source_dataset, source_table, source_ping, ping_platform + FROM `{MAPPING_TABLE}` + """ + mapping = {} + for row in bq_client.query(query).result(): + key = (row.source_project, row.source_dataset, row.source_table) + mapping[key] = { + "source_ping": row.source_ping, + "ping_platform": row.ping_platform, + } + return mapping + + +def load_probes_by_ping(bq_client): + """Load all probe definitions from Phase 2, grouped by (ping_platform, source_ping).""" + query = f""" + SELECT ping_platform, source_ping, probe_name, probe_description, probe_type + FROM `{PROBE_TABLE}` + """ + probes_by_ping = {} + for row in bq_client.query(query).result(): + key = (row.ping_platform, row.source_ping) + if key not in probes_by_ping: + probes_by_ping[key] = [] + probes_by_ping[key].append( + { + "probe_name": row.probe_name, + "probe_description": row.probe_description, + "probe_type": row.probe_type, + } + ) + return probes_by_ping + + +def get_already_reconciled(bq_client): + """Return set of (project, dataset, table, column) already in the Phase 3 table.""" + try: + query = f""" + SELECT source_project, source_dataset, source_table, column_name + FROM `{DEST_TABLE}` + """ + return { + (r.source_project, r.source_dataset, r.source_table, r.column_name) + for r in bq_client.query(query).result() + } + except NotFound: + return set() + + +# --- BQ writer --- + + +def save_to_bq(bq_client, records): + """Append reconciled records to the Phase 3 table.""" + job_config = bigquery.LoadJobConfig( + schema=DEST_SCHEMA, + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + ) + job = bq_client.load_table_from_json(records, DEST_TABLE, job_config=job_config) + job.result() + logging.info(f"Saved {len(records)} rows to {DEST_TABLE}") + + +# --- Main --- + + +def parse_args(): + """Parse command line arguments.""" + parser = ArgumentParser( + description="Reconcile Phase 1 and Phase 2 descriptions to produce final field descriptions." + ) + parser.add_argument( + "--table", + help="Fully qualified BQ table (project.dataset.table). If omitted, processes all Phase 1 tables.", + ) + return parser.parse_args() + + +def main(): + """Reconcile Phase 1 data descriptions with Phase 2 probe definitions for all profiled tables.""" + args = parse_args() + bq_client = bigquery.Client(project=DEST_PROJECT) + claude_client = anthropic.Anthropic() + + project, dataset, table = (None, None, None) + if args.table: + project, dataset, table = args.table.split(".") + + logging.info("Loading Phase 1, Phase 2 mapping and probes...") + phase1 = load_phase1(bq_client, project, dataset, table) + ping_mapping = load_ping_mapping(bq_client) + probes_by_ping = load_probes_by_ping(bq_client) + already_done = get_already_reconciled(bq_client) + logging.info( + f"{len(phase1)} tables to process, {len(already_done)} columns already reconciled" + ) + + now = datetime.now(timezone.utc).isoformat() + + for (proj, ds, tbl), columns in phase1.items(): + ping_info = ping_mapping.get((proj, ds, tbl), {}) + source_ping = ping_info.get("source_ping") + ping_platform = ping_info.get("ping_platform") + ping_probes = ( + probes_by_ping.get((ping_platform, source_ping), []) if source_ping else [] + ) + + pending = [ + c for c in columns if (proj, ds, tbl, c["column_name"]) not in already_done + ] + if not pending: + logging.info( + f"Skipping {ds}.{tbl} — all {len(columns)} columns already reconciled" + ) + continue + + # Fetch GitHub query once per table — used for columns with no probe match + has_unmatched = any( + not find_matching_probes(c["column_name"], ping_probes) for c in pending + ) + github_query, github_filename = None, None + if has_unmatched: + logging.info(f" Fetching query from GitHub for {ds}.{tbl}") + github_query, github_filename = fetch_github_query(proj, ds, tbl) + if not github_query: + logging.info(f" No GitHub query found for {ds}.{tbl}") + + logging.info( + f"--- {ds}.{tbl} | ping: {source_ping or 'none'} | {len(ping_probes)} probes | {len(pending)} columns ---" + ) + + records = [] + for col in pending: + col_name = col["column_name"] + pass1 = col["pass1_description"] + matching_probes = find_matching_probes(col_name, ping_probes) + + # Only pass GitHub query for columns with no probe match + col_github_query = github_query if not matching_probes else None + col_github_filename = github_filename if not matching_probes else None + logging.info( + f" {col_name} → {len(matching_probes)} probes {'(+GitHub query)' if col_github_query else ''}" + ) + + result = reconcile_column( + claude_client, + col_name, + col["data_type"], + f"{ds}.{tbl}", + source_ping, + ping_platform, + pass1, + matching_probes, + github_query=col_github_query, + github_filename=col_github_filename, + ) + + matched_probe = result.get("matched_probe") + probe_desc = None + if matched_probe: + probe_match = next( + (p for p in ping_probes if p["probe_name"] == matched_probe), None + ) + probe_desc = probe_match["probe_description"] if probe_match else None + + records.append( + { + "source_project": proj, + "source_dataset": ds, + "source_table": tbl, + "column_name": col_name, + "data_type": col["data_type"], + "source_ping": source_ping, + "ping_platform": ping_platform, + "pass1_description": pass1, + "matched_probe": matched_probe, + "probe_description": probe_desc, + "final_description": result.get("final_description"), + "contradiction": result.get("contradiction"), + "routing_hint": result.get("routing_hint"), + "processed_at": now, + } + ) + + save_to_bq(bq_client, records) + logging.info(f" Done {ds}.{tbl} ({len(records)} columns)") + + +if __name__ == "__main__": + main() diff --git a/script/metadata/field_classifier.py b/script/metadata/field_classifier.py new file mode 100644 index 00000000000..dae0b6c2ee6 --- /dev/null +++ b/script/metadata/field_classifier.py @@ -0,0 +1,501 @@ +"""Classify each profiled column against the Mozilla data taxonomy. + +Reads: + - akomar_data_profiling_v1 (Phase 1: profile + pass1 description) + - akomar_metadata_phase2_table_pings_v1 (Phase 2: source ping per table) + - akomar_metadata_phase2_ping_probes_v1 (Phase 2: probes w/ data_sensitivity, tags) + - classification/taxonomy.json (preprocessed taxonomy) + +Writes: + - akomar_field_classifications_v1 — one row per column per table +""" + +import json +import logging +import re +from argparse import ArgumentParser +from datetime import datetime, timezone +from pathlib import Path + +import anthropic +from google import genai +from google.api_core.exceptions import NotFound +from google.cloud import bigquery +from google.genai.types import HttpOptions + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s: %(levelname)s: %(message)s" +) + +PHASE1_TABLE = "mozdata-nonprod.analysis.akomar_data_profiling_v1" +MAPPING_TABLE = "mozdata-nonprod.analysis.akomar_metadata_phase2_table_pings_v1" +PROBE_TABLE = "mozdata-nonprod.analysis.akomar_metadata_phase2_ping_probes_v1" +DEST_TABLE = "mozdata-nonprod.analysis.akomar_field_classifications_v1" +DEST_PROJECT = "mozdata-nonprod" +DEFAULT_MODEL = "claude-sonnet-4-6" +GEMINI_VERTEX_PROJECT = "mozdata" +GEMINI_VERTEX_LOCATION = "global" +TAXONOMY_PATH = Path(__file__).parent / "classification" / "taxonomy.json" +TOP_N_PROBES = 3 + + +def is_claude_model(name): + """Anthropic-hosted Claude model name (e.g. claude-sonnet-4-6).""" + return name.startswith("claude-") + + +def is_gemini_model(name): + """Vertex-hosted Gemini model name (e.g. gemini-3.1-flash-lite-preview).""" + return name.startswith("gemini-") + + +DEST_SCHEMA = [ + bigquery.SchemaField("source_project", "STRING", mode="REQUIRED"), + bigquery.SchemaField("source_dataset", "STRING", mode="REQUIRED"), + bigquery.SchemaField("source_table", "STRING", mode="REQUIRED"), + bigquery.SchemaField("column_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("data_type", "STRING", mode="NULLABLE"), + bigquery.SchemaField( + "primary_label", + "STRING", + mode="NULLABLE", + description="Most specific matching taxonomy label (e.g. user.unique_id.client_id).", + ), + bigquery.SchemaField( + "secondary_labels", + "STRING", + mode="REPEATED", + description="Additional taxonomy labels that also apply.", + ), + bigquery.SchemaField( + "confidence", + "STRING", + mode="NULLABLE", + description="high | medium | low — the model's self-reported confidence.", + ), + bigquery.SchemaField( + "reasoning", + "STRING", + mode="NULLABLE", + description="1-2 sentence justification referencing the signals used.", + ), + bigquery.SchemaField( + "needs_review", + "BOOLEAN", + mode="NULLABLE", + description="True when confidence is low or signals conflict.", + ), + bigquery.SchemaField("matched_probe", "STRING", mode="NULLABLE"), + bigquery.SchemaField( + "data_sensitivity", + "STRING", + mode="REPEATED", + description="Glean data_sensitivity labels from the matched probe, if any.", + ), + bigquery.SchemaField( + "data_collection_category", + "STRING", + mode="NULLABLE", + description=( + "LLM-inferred Mozilla data collection category — one of " + "'technical', 'interaction', 'web_activity', 'highly_sensitive' " + "(see https://wiki.mozilla.org/Data_Collection#Data_Collection_Categories). " + "Same scale as Glean's data_sensitivity but emitted for every row, " + "including columns with no probe match or legacy telemetry." + ), + ), + bigquery.SchemaField( + "model", + "STRING", + mode="NULLABLE", + description="Full LLM model name that produced the row, e.g. claude-sonnet-4-6 or gemini-3.1-flash-lite-preview.", + ), + bigquery.SchemaField("classified_at", "TIMESTAMP", mode="REQUIRED"), +] + + +# --- Taxonomy --- + + +def load_taxonomy(): + """Load the preprocessed taxonomy JSON.""" + return json.loads(TAXONOMY_PATH.read_text()) + + +def taxonomy_prompt_block(taxonomy): + """Compact the taxonomy into a single JSON block for the prompt.""" + compact = [ + { + "label": e["label"], + "name": e.get("display_name") or "", + "desc": e.get("description") or "", + "examples": e.get("examples") or "", + } + for e in taxonomy + ] + return json.dumps(compact, separators=(",", ":")) + + +# --- Probe matching (lifted from description_reconciler.py, top-3 only) --- + + +def normalize_name(name): + """Strip all non-alphanumeric characters and lowercase for fuzzy matching.""" + return re.sub(r"[^a-z0-9]", "", (name or "").lower()) + + +def find_matching_probes(column_name, probes): + """Return up to TOP_N_PROBES candidate probes for a column name.""" + col_norm = normalize_name(column_name) + scored = [] + for probe in probes: + pname = probe.get("probe_name") or "" + if not pname: + continue + pname_norm = normalize_name(pname) + if col_norm == pname_norm: + score = 3 + elif pname_norm.endswith(col_norm) or pname_norm.startswith(col_norm): + score = 2 + elif col_norm in pname_norm or pname_norm in col_norm: + score = 1 + else: + continue + scored.append((score, probe)) + scored.sort(key=lambda x: -x[0]) + return [p for _, p in scored[:TOP_N_PROBES]] + + +# --- Prompt --- + + +def build_classification_prompt( + column_name, data_type, table, pass1_description, matching_probes, taxonomy_json +): + """Build a prompt asking the LLM to assign a taxonomy label.""" + if matching_probes: + probe_lines = [] + for p in matching_probes: + parts = [f"name={p['probe_name']}"] + if p.get("probe_type"): + parts.append(f"type={p['probe_type']}") + if p.get("data_sensitivity"): + parts.append(f"data_sensitivity={p['data_sensitivity']}") + if p.get("tags"): + parts.append(f"tags={p['tags']}") + if p.get("probe_description"): + parts.append(f"desc={p['probe_description']}") + probe_lines.append(" - " + " | ".join(parts)) + probes_section = "Candidate probes:\n" + "\n".join(probe_lines) + else: + probes_section = "Candidate probes: none matched." + + return ( + "You are classifying a BigQuery column against Mozilla's data taxonomy.\n\n" + f"Table: {table}\n" + f"Column: {column_name}\n" + f"Data type: {data_type}\n" + f"Observed-data description (from profiling): {pass1_description}\n\n" + f"{probes_section}\n\n" + "Taxonomy (JSON list of {label, name, desc, examples}):\n" + f"{taxonomy_json}\n\n" + "Pick the single most specific taxonomy label that fits. If multiple apply," + " list the extras in secondary_labels. Use the Glean data_sensitivity signal" + " to disambiguate when present (e.g. highly_sensitive strongly implies" + " user.behavior, user.content, user.location.precise, etc.).\n\n" + "Also assign a Mozilla data collection category — exactly one of" + " 'technical', 'interaction', 'web_activity', 'highly_sensitive' (per" + " https://wiki.mozilla.org/Data_Collection#Data_Collection_Categories," + " same scale as Glean's data_sensitivity). Definitions:\n" + " - technical: build, environment, version, performance counters; no user content.\n" + " - interaction: how users interact with the product (clicks, sessions, feature usage).\n" + " - web_activity: web/search activity — URLs, search terms, visited domains.\n" + " - highly_sensitive: anything else of high sensitivity — precise location, free-form\n" + " user content, communications, demographic data, identifiers tied to a person.\n" + "If a Glean data_sensitivity is declared on the matched probe, you should usually agree," + " but the column's observed content takes precedence when they conflict (e.g. a" + " 'technical' probe carrying user content). Pick the single highest applicable category.\n\n" + "Respond with a JSON object only (no markdown fences):\n" + '{"primary_label": "