
eduso/databricks-metadata-orchestration-demo


Metadata-Driven Pipeline Orchestration on Databricks

A reference implementation of the metadata-driven orchestration pattern (a.k.a. "the ADF malla pattern") built on Databricks Jobs + Lakeflow + Unity Catalog.

A single SQL table defines every pipeline — its dependencies, conditions, windows, SLAs — and a 150-line Python orchestrator reads it and builds the DAG dynamically via the Jobs API v2.2.

Why this exists. Many enterprise data teams — especially in banking — have built a "metadata framework" on top of Azure Data Factory: a control table lists pipelines, an executor loops through it. The pattern is solid; the tool is expensive and hard to observe. This repo shows the same pattern, same ergonomics, running on Databricks with native observability, lineage and governance.


What you get

  • 10 representative banking pipelines — streaming ingestion, CDC, DLT silver/gold, conditional EOD, multi-prerequisite regulatory reporting, intraday wealth positions, cross-LoB reconciliation
  • Dynamic DAG construction from a metadata table (Delta in Unity Catalog)
  • 3 control patterns demonstrated: simple dependency, multi-prerequisite, conditional execution via SQL expressions
  • Synthetic banking data (~280K rows across 4 bronze tables) — no real customer data required
  • Databricks Asset Bundle (DAB) for one-command deploy
  • Observability dashboard spec (queries) using system.lakeflow.*
  • Runs end-to-end in ~2 minutes on serverless jobs compute — cost < $1 per full demo

Architecture

flowchart TB
  subgraph UC ["Unity Catalog (configurable catalog, default: banking_demo_metadata)"]
    M[(control.pipeline_metadata<br/>10 pipelines)]
    B[bronze.*<br/>card_transactions<br/>core_banking_cdc<br/>wealth_positions<br/>regulatory_refdata]
    S[silver.*<br/>transactions<br/>core_banking_accounts<br/>wealth_positions_hourly]
    G[gold.*<br/>eod_balances<br/>customer_360]
  end

  MO["02_master_orchestrator (notebook)<br/>reads metadata → builds tasks[]<br/>submits to /api/2.2/jobs/runs/submit"]

  subgraph DAG ["Dynamically-generated Jobs DAG"]
    direction LR
    T1[ingest_*] --> T2[dlt_silver_*]
    T2 --> T3[eod_*]
    T2 --> T4[dlt_gold_*]
    T3 --> T5[regulatory_*]
    GATE["condition_task gates<br/>(for pipelines with run_condition)"]
  end

  SYSLAKE[(system.lakeflow.*<br/>job_run_timeline<br/>job_task_run_timeline)]
  DASH["Lakeview dashboard<br/>KPIs · SLA · Timeline"]

  M --> MO
  MO --> DAG
  DAG --> B
  B --> S
  S --> G
  DAG -.run logs.-> SYSLAKE
  SYSLAKE --> DASH

The metadata table (source of truth)

Columns mapped to Databricks Jobs API primitives:

| Metadata column | Jobs API v2.2 equivalent |
| --- | --- |
| pipeline_id | task_key |
| pipeline_type (NOTEBOOK / SQL / DLT / PYTHON / LAKEFLOW_CONNECT) | notebook_task / sql_task / pipeline_task / … |
| depends_on (array) | depends_on: [{task_key: …}] |
| run_condition (SQL expr) | injected evaluate_condition task + condition_task gate |
| schedule_cron | schedule.quartz_cron_expression |
| timeout_min | timeout_seconds |
| max_retries / retry_backoff_sec | max_retries / min_retry_interval_millis |
| parameters (map) | notebook_task.base_parameters |
| owner (email) | email_notifications.on_failure |
| sla_min | cross-referenced against system.lakeflow.job_task_run_timeline.execution_duration_seconds |

Full reference in docs/metadata_schema.md.
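
As a rough illustration of the mapping above, one metadata row could be translated into a Jobs API task dictionary along these lines. This is a sketch, not the code in 02_master_orchestrator.py: the function name `row_to_task`, the `notebook_path` argument, the example values, and the simplification that every pipeline runs as a notebook task are all assumptions.

```python
from typing import Any


def row_to_task(row: dict[str, Any], notebook_path: str) -> dict[str, Any]:
    """Translate one pipeline_metadata row into a Jobs API task dict (sketch)."""
    task: dict[str, Any] = {
        "task_key": row["pipeline_id"],
        # depends_on is stored as an array of parent pipeline_ids
        "depends_on": [{"task_key": parent} for parent in row.get("depends_on") or []],
        # the table stores minutes and seconds; the API expects seconds and millis
        "timeout_seconds": row["timeout_min"] * 60,
        "max_retries": row["max_retries"],
        "min_retry_interval_millis": row["retry_backoff_sec"] * 1000,
        "notebook_task": {
            "notebook_path": notebook_path,
            "base_parameters": dict(row.get("parameters") or {}),
        },
    }
    if row.get("owner"):
        task["email_notifications"] = {"on_failure": [row["owner"]]}
    return task


# Hypothetical row; the values are illustrative, not copied from the seeded metadata.
example_row = {
    "pipeline_id": "p_dlt_silver_transactions",
    "depends_on": ["p_ingest_cards_kafka"],
    "timeout_min": 30,
    "max_retries": 2,
    "retry_backoff_sec": 60,
    "parameters": {"param1": "value1"},
    "owner": "you@example.com",
}
task = row_to_task(example_row, "/placeholder/pipelines/p_dlt_silver_transactions")
```

sla_min is deliberately absent from the task: per the table, it is only compared against run durations after the fact.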


Quick start

Prerequisites

  • Databricks workspace with Unity Catalog enabled
  • A SQL warehouse (Serverless preferred) — get its ID from workspace UI
  • Permission to create a catalog and schemas
  • Databricks CLI v0.240+ authenticated against your workspace

# Sanity check
databricks current-user me
databricks warehouses list

Deploy and run in 4 commands

git clone <this-repo>
cd databricks-metadata-orchestration-demo

# 1. Validate the bundle
databricks bundle validate \
  --var warehouse_id=<your_warehouse_id>

# 2. Deploy (creates schemas, uploads notebooks)
databricks bundle deploy \
  --var warehouse_id=<your_warehouse_id>

# 3. Run the setup job (populates data + metadata)
databricks bundle run setup \
  --var warehouse_id=<your_warehouse_id>

# 4. Run the master orchestrator (builds and submits the dynamic DAG)
databricks bundle run run_master_orchestrator \
  --var warehouse_id=<your_warehouse_id>

The master orchestrator prints a URL to the submitted child run. Open it to watch the dynamic DAG execute (~20 tasks, 90-120 seconds wall time on serverless).
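
For orientation, the submission step might look roughly like the sketch below. It only builds the HTTP request for POST /api/2.2/jobs/runs/submit and leaves sending it to the caller. The helper name, run name, host, token, and notebook path are placeholders, not values from the repo.

```python
import json
import urllib.request


def build_submit_request(
    host: str, token: str, tasks: list[dict], run_name: str
) -> urllib.request.Request:
    """Build (but do not send) a one-time run submission request."""
    payload = {"run_name": run_name, "tasks": tasks}
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.2/jobs/runs/submit",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_submit_request(
    host="https://example.cloud.databricks.com",  # placeholder workspace URL
    token="placeholder-token",                    # placeholder credential
    tasks=[
        {
            "task_key": "p_ingest_cards_kafka",
            "notebook_task": {"notebook_path": "/placeholder/01_generic_transform"},
        }
    ],
    run_name="metadata_dag_demo",
)

# Sending is left out so the sketch runs offline. Against a real workspace:
# with urllib.request.urlopen(req) as resp:
#     run_id = json.load(resp)["run_id"]
```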

Custom catalog name

Default catalog is banking_demo_metadata. Override with:

databricks bundle deploy \
  --var catalog_name=my_demo_catalog \
  --var warehouse_id=<your_warehouse_id>

You'll need CREATE CATALOG permission. If you cannot create a catalog, ask an admin to create it first, then deploy — the bundle will only create schemas inside the existing catalog.


What's in the repo

databricks-metadata-orchestration-demo/
├── databricks.yml                     # DAB root — variables + targets
├── resources/
│   ├── schemas.yml                    # UC schemas: control, bronze, silver, gold
│   ├── job_setup.yml                  # SQL job: generate data + seed metadata
│   └── job_demo.yml                   # notebook job: run master_orchestrator
├── src/
│   ├── sql/
│   │   ├── 00_generate_bronze_data.sql
│   │   ├── 01_setup_metadata.sql
│   │   └── 02_setup_permissions.sql   # optional — grant read to account users
│   └── notebooks/
│       ├── 00_evaluate_condition.py   # generic condition evaluator
│       ├── 01_generic_transform.py    # fallback stub for pipelines without a dedicated notebook
│       ├── 02_master_orchestrator.py  # the core — reads metadata, builds DAG, submits
│       └── pipelines/
│           ├── p_dlt_silver_transactions.py
│           ├── p_dlt_silver_core_banking.py
│           ├── p_wealth_positions_intraday.py
│           ├── p_eod_balances_aggregation.py
│           └── p_dlt_gold_customer_360.py
├── docs/
│   ├── architecture.md                # the pattern in detail
│   ├── metadata_schema.md             # every column explained
│   └── adf_vs_databricks.md           # honest 5-dim comparison
└── README.md

The 10 seeded pipelines

| ID | Type | LoB | Pattern it demonstrates |
| --- | --- | --- | --- |
| p_ingest_cards_kafka | PYTHON | RETAIL | streaming, no deps |
| p_ingest_cdc_core_banking | LAKEFLOW_CONNECT | RETAIL | scheduled every 15 min |
| p_dlt_silver_transactions | DLT | RETAIL | single-parent dep |
| p_dlt_silver_core_banking | DLT | RETAIL | single-parent dep, SCD2-style |
| p_eod_balances_aggregation | SQL | RETAIL | conditional (hour=23 AND business day) |
| p_regulatory_report | SQL | RETAIL | multi-prerequisite + monthly window |
| p_stress_test_credit_portfolio | PYTHON | RETAIL | time-window (02:00–05:00) |
| p_wealth_positions_intraday | PYTHON | WEALTH | intraday + market-hours condition |
| p_dlt_gold_customer_360 | DLT | GROUP | multi-LoB join (3 parents) |
| p_daily_reconciliation | SQL | GROUP | multi-prerequisite + late-night window |

Five of these have real implementation notebooks (the silver/gold pipelines under src/notebooks/pipelines/). The other five use the generic stub 01_generic_transform.py, which works as a placeholder for ingestion (in real life that would be Lakeflow Connect or streaming) and for the conditional pipelines that usually skip.


Running after deploy

Run once manually

databricks bundle run run_master_orchestrator \
  --var warehouse_id=<your_warehouse_id>

Inspect the metadata

SELECT pipeline_id, pipeline_type, depends_on, run_condition, subsidiary, criticality
FROM banking_demo_metadata.control.pipeline_metadata
ORDER BY subsidiary, pipeline_id;

See the DAG in the UI

The master job prints ✔ submitted one-time run: <url> — open that. You'll see 20 tasks (10 pipelines + 5 condition evaluators + 5 gates).
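
The 10 + 5 + 5 breakdown follows from injecting two extra tasks for each conditional pipeline. Below is a minimal sketch of that expansion. It assumes a task-value key named `result`, `eval_`/`gate_` key prefixes, a placeholder notebook path, and that a run_condition of 'TRUE' means "no gate needed"; the real orchestrator may differ on all of these.

```python
def expand_with_gates(pipelines: list[dict]) -> list[dict]:
    """Add an evaluator task and a condition_task gate per conditional pipeline."""
    tasks = []
    for p in pipelines:
        pid = p["pipeline_id"]
        deps = [{"task_key": d} for d in p.get("depends_on", [])]
        condition = p.get("run_condition")
        if condition and condition.upper() != "TRUE":
            eval_key, gate_key = f"eval_{pid}", f"gate_{pid}"
            # 1) Evaluator notebook: runs the SQL expression, publishes a task value.
            tasks.append({
                "task_key": eval_key,
                "depends_on": deps,
                "notebook_task": {
                    "notebook_path": "/placeholder/00_evaluate_condition",
                    "base_parameters": {"condition": condition},
                },
            })
            # 2) Gate: compares the published task value against "true".
            tasks.append({
                "task_key": gate_key,
                "depends_on": [{"task_key": eval_key}],
                "condition_task": {
                    "op": "EQUAL_TO",
                    "left": f"{{{{tasks.{eval_key}.values.result}}}}",
                    "right": "true",
                },
            })
            # 3) The pipeline itself only runs on the gate's "true" branch.
            deps = [{"task_key": gate_key, "outcome": "true"}]
        tasks.append({"task_key": pid, "depends_on": deps})
    return tasks


demo = [
    {"pipeline_id": "p_ingest", "run_condition": "TRUE"},
    {
        "pipeline_id": "p_eod",
        "depends_on": ["p_ingest"],
        "run_condition": "hour(current_timestamp()) = 23",
    },
]
print([t["task_key"] for t in expand_with_gates(demo)])
# ['p_ingest', 'eval_p_eod', 'gate_p_eod', 'p_eod']
```

With 10 pipelines of which 5 carry a real condition, the same function yields 20 tasks.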

Observability (build your own Lakeview dashboard)

Base queries against system.lakeflow.job_task_run_timeline:

-- Last 7 days summary
SELECT
  count(*) AS total_runs,
  round(sum(CASE WHEN result_state='SUCCEEDED' THEN 1 ELSE 0 END) * 1.0 / count(*), 3) AS success_rate,
  round(avg(execution_duration_seconds), 1) AS avg_duration_sec
FROM system.lakeflow.job_task_run_timeline
WHERE period_start_time > date_sub(current_date(), 7)
  AND task_key IN (SELECT pipeline_id FROM banking_demo_metadata.control.pipeline_metadata);

-- Duration by pipeline
SELECT task_key, round(avg(execution_duration_seconds), 2) AS avg_duration_sec, count(*) AS runs
FROM system.lakeflow.job_task_run_timeline
WHERE period_start_time > date_sub(current_date(), 7)
  AND task_key IN (SELECT pipeline_id FROM banking_demo_metadata.control.pipeline_metadata)
  AND result_state = 'SUCCEEDED'
GROUP BY task_key
ORDER BY avg_duration_sec DESC;

-- SLA breach detection
SELECT tt.task_key, m.pipeline_name, tt.execution_duration_seconds, m.sla_min,
       CASE WHEN tt.execution_duration_seconds > m.sla_min * 60 THEN 'BREACH' ELSE 'OK' END AS sla_status
FROM system.lakeflow.job_task_run_timeline tt
JOIN banking_demo_metadata.control.pipeline_metadata m ON tt.task_key = m.pipeline_id
WHERE tt.period_start_time > date_sub(current_date(), 7)
ORDER BY tt.period_start_time DESC
LIMIT 100;

Customization

Add a new pipeline

Insert a row into pipeline_metadata. If the pipeline_id is listed in the IMPLEMENTED_PIPELINES set in 02_master_orchestrator.py, the orchestrator expects a dedicated notebook at pipelines/<pipeline_id>. Otherwise, the generic stub runs.

INSERT INTO banking_demo_metadata.control.pipeline_metadata VALUES
  ('p_new_pipeline',
   'My new pipeline',
   'SQL', 'RETAIL', 'P2', 'Description here',
   array('source.table'),
   'silver.my_target',
   array('p_some_parent'),   -- depends_on
   'TRUE',                   -- run_condition
   '0 0 * * * ?',           -- schedule_cron
   NULL, NULL,
   30, 2, 60,
   map('param1','value1'),
   '/path/to/code',
   'you@example.com', 20, TRUE,
   current_timestamp(), current_timestamp());

Re-run the master orchestrator — the new pipeline is now in the DAG.
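
The dispatch rule for a new row (dedicated notebook if the ID is in IMPLEMENTED_PIPELINES, generic stub otherwise) can be pictured as a simple set lookup. In this sketch the set contents mirror the five notebooks listed under src/notebooks/pipelines/, while the function name and the notebooks root path are hypothetical.

```python
# Mirrors the five files under src/notebooks/pipelines/ in the repo tree.
IMPLEMENTED_PIPELINES = {
    "p_dlt_silver_transactions",
    "p_dlt_silver_core_banking",
    "p_wealth_positions_intraday",
    "p_eod_balances_aggregation",
    "p_dlt_gold_customer_360",
}


def resolve_notebook_path(pipeline_id: str, notebooks_root: str) -> str:
    """Pick the dedicated notebook if one exists, else the generic stub."""
    root = notebooks_root.rstrip("/")
    if pipeline_id in IMPLEMENTED_PIPELINES:
        return f"{root}/pipelines/{pipeline_id}"
    return f"{root}/01_generic_transform"


print(resolve_notebook_path("p_dlt_gold_customer_360", "/placeholder/notebooks"))
print(resolve_notebook_path("p_new_pipeline", "/placeholder/notebooks"))
```

Under this rule, a newly inserted p_new_pipeline falls through to the stub until its ID is added to the set and a matching notebook exists.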

Change the execution pattern

| Want | Change |
| --- | --- |
| Only run on weekdays | Set run_condition = 'dayofweek(current_date()) BETWEEN 2 AND 6' |
| Limit to business hours | Set run_condition = 'hour(current_timestamp()) BETWEEN 9 AND 17' |
| Pause without deleting | UPDATE ... SET enabled = FALSE WHERE pipeline_id = '...' |
| Chain two pipelines | Add the parent's pipeline_id to the child's depends_on array |
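
Spark SQL's dayofweek() numbers days from 1 (Sunday) to 7 (Saturday), which is why the weekday condition uses BETWEEN 2 AND 6. The sketch below reproduces that numbering in plain Python so a condition can be sanity-checked without a cluster. The helper names are illustrative only.

```python
from datetime import date


def spark_dayofweek(d: date) -> int:
    """Mimic Spark SQL dayofweek(): 1 = Sunday ... 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so rotate by one day.
    return d.isoweekday() % 7 + 1


def is_weekday(d: date) -> bool:
    """Python equivalent of dayofweek(current_date()) BETWEEN 2 AND 6."""
    return 2 <= spark_dayofweek(d) <= 6


print(spark_dayofweek(date(2024, 1, 1)), is_weekday(date(2024, 1, 1)))  # a Monday
print(spark_dayofweek(date(2024, 1, 6)), is_weekday(date(2024, 1, 6)))  # a Saturday
print(spark_dayofweek(date(2024, 1, 7)), is_weekday(date(2024, 1, 7)))  # a Sunday
```

Note that the business-hours example, BETWEEN 9 AND 17, is inclusive on both ends, so it also matches 17:00 through 17:59.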

Cost

Full demo run on Azure Databricks serverless jobs compute is under $1:

| Component | DBU cost |
| --- | --- |
| SQL warehouse (setup SQL), runs ~1 min | ~$0.05 |
| Serverless jobs (master + 20 tasks, ~2 min total) | ~$0.30 |
| Dashboard queries (when viewed) | <$0.10 |
| Total per run | < $0.50 |

Idle cost: zero (serverless + warehouse auto-stop).


FAQ

Why not just use Lakeflow Declarative Pipelines?

This pattern complements DLT — many of the 10 pipelines are DLT type and would be implemented as real DLT pipelines in production. The orchestrator handles the cross-pipeline concerns (dependencies, conditions, schedules, SLAs) that DLT alone doesn't address. It's the missing layer between "one DLT pipeline" and "hundreds of pipelines orchestrated together".

Is this a replacement for ADF?

It's a pattern migration aid. The control table + executor model maps 1:1 from ADF metadata frameworks. See docs/adf_vs_databricks.md for a dimension-by-dimension comparison.

Can I use a persistent job instead of one-time submit?

Yes — run the master with mode=create_job instead of mode=submit_once. This creates a permanent Databricks job you can schedule, re-run, and manage in the UI.
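
In Jobs API terms, the two modes correspond to different endpoints with slightly different payloads: a one-time run takes `run_name`, while a persistent job takes `name` and can carry a schedule. The sketch below is hedged accordingly: the function name, the UTC default, and the way the `mode` value is handled are assumptions about the orchestrator, not its actual code.

```python
from typing import Optional


def build_request(
    mode: str, tasks: list[dict], label: str, cron: Optional[str] = None
) -> tuple[str, dict]:
    """Return (endpoint path, JSON payload) for the chosen mode."""
    if mode == "submit_once":
        # One-time run: not listed as a job, nothing to schedule.
        return "/api/2.2/jobs/runs/submit", {"run_name": label, "tasks": tasks}
    if mode == "create_job":
        payload: dict = {"name": label, "tasks": tasks}
        if cron:
            # Quartz syntax, same format as the schedule_cron metadata column.
            payload["schedule"] = {
                "quartz_cron_expression": cron,
                "timezone_id": "UTC",  # assumption; pick your own time zone
            }
        return "/api/2.2/jobs/create", payload
    raise ValueError(f"unknown mode: {mode!r}")


tasks = [{"task_key": "p_ingest_cards_kafka"}]
print(build_request("submit_once", tasks, "metadata_dag_demo")[0])
print(build_request("create_job", tasks, "metadata_dag_demo", "0 0 * * * ?")[0])
```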

How do I version the metadata?

The metadata table is a Delta table, so you can use time travel (SELECT * FROM ... VERSION AS OF 42) for audit. In production, teams usually populate it via a DAB or a SQL MERGE from a git-versioned source.

Does this work with non-Azure workspaces (AWS, GCP)?

Yes — nothing Azure-specific in the core pattern. The LAKEFLOW_CONNECT example mentions Azure SQL, but the connector is just metadata — swap for any supported source on your cloud.

What about secrets (connection strings, API keys)?

Use Databricks Secrets. Reference them as {{secrets/<scope>/<key>}} in parameters or as dbutils.secrets.get(...) inside the pipeline notebook.


Contributing

PRs welcome. Ideas for extension:

  • Add real Lakeflow Connect ingestion (replace the stub for p_ingest_cdc_core_banking)
  • Split the metadata into a git-sourced YAML → Delta sync
  • Add a Databricks SQL Alert definition for SLA breaches
  • Add a cycle detection test in CI
  • Add cross-workspace or cross-region extension (via Delta Sharing or UC federation)
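
For the cycle detection idea, one possible starting point is Python's standard-library graphlib, which raises CycleError when a dependency graph cannot be ordered. This is a sketch of what such a CI check could look like, not an existing test in the repo. The function name and the sample graphs are made up.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Optional


def find_cycle(depends_on: dict[str, list[str]]) -> Optional[list[str]]:
    """Return one dependency cycle if present, else None.

    `depends_on` maps each pipeline_id to the list of its parent pipeline_ids,
    i.e. the same shape as the depends_on column in the metadata table.
    """
    try:
        TopologicalSorter(depends_on).prepare()
    except CycleError as exc:
        # graphlib puts the offending cycle in the second exception argument.
        return list(exc.args[1])
    return None


acyclic = {"p_ingest": [], "p_silver": ["p_ingest"], "p_gold": ["p_silver"]}
cyclic = {"p_a": ["p_b"], "p_b": ["p_a"]}

print(find_cycle(acyclic))
print(find_cycle(cyclic))
```

In CI, the mapping would be loaded from wherever the metadata is versioned and the test would assert that find_cycle returns None.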

License

Apache 2.0 — see LICENSE.


Acknowledgments

Pattern shaped by collaboration with banking data platform teams in LatAm. Built by Eduardo Sojo — Databricks Solution Architect.
