[BREAKING] refactor(data-plane): route all data access through lumid-data-app#41

Merged
timzsu merged 7 commits into main from zsu/lumid-data-app on Jun 12, 2026

@timzsu timzsu commented Jun 10, 2026

Collaborator

Purpose

Routes every Lumilake data access through lumid-data-app's HTTP API. Direct psycopg connections to the compute Postgres and direct minio calls to S3 are removed: every DataRetrievalOp mode (sql / s3 / agent), the data-profile preflight, the archive layer (job records + runtime artifacts), and server-side input/output resolution now go through one client. LUMID_DATA_URL is required; LUMID_DATA_TOKEN falls back to LUMILAKE_RUNTIME_TOKEN when unset.

Breaking surface:

  • Removed env vars: DATABASE_URL, S3_URL, S3_ENDPOINT, S3_ACCESS_KEY, S3_CONNECTION_VALUE, S3_CERT_FILE, S3_WORKER_URL, LUMILAKE_DATA_PROFILE_CONNECT_TIMEOUT_S, LUMILAKE_DATA_PROFILE_STATEMENT_TIMEOUT_S.
  • DBLocation outputs return 422 at submit (inputs still work via the catalog endpoint).
  • S3Location.connection_string removed.
  • utils/delta.py, utils/s3.py deleted.
  • Server-runtime deps drop psycopg, psycopg-pool, minio.
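
For anyone migrating a deployment, the env contract above can be checked with a small script. This is a minimal sketch, not the code in main.py or doctor.py: `resolve_data_env` and `REMOVED_ENV_VARS` are hypothetical names, and only the variable names themselves come from this PR.

```python
import os
import warnings

# Names copied from the "Removed env vars" bullet above.
REMOVED_ENV_VARS = (
    "DATABASE_URL", "S3_URL", "S3_ENDPOINT", "S3_ACCESS_KEY",
    "S3_CONNECTION_VALUE", "S3_CERT_FILE", "S3_WORKER_URL",
    "LUMILAKE_DATA_PROFILE_CONNECT_TIMEOUT_S",
    "LUMILAKE_DATA_PROFILE_STATEMENT_TIMEOUT_S",
)


def resolve_data_env(environ=None):
    """Hypothetical helper: return (url, token, stale_vars) for the new contract."""
    env = os.environ if environ is None else environ

    url = env.get("LUMID_DATA_URL", "").strip()
    if not url:
        raise RuntimeError("LUMID_DATA_URL is required")

    # LUMID_DATA_TOKEN wins; otherwise fall back to the runtime token and warn.
    token = env.get("LUMID_DATA_TOKEN")
    if not token:
        token = env.get("LUMILAKE_RUNTIME_TOKEN")
        if token:
            warnings.warn(
                "LUMID_DATA_TOKEN unset; falling back to LUMILAKE_RUNTIME_TOKEN"
            )

    # Removed variables that are still set are reported, not treated as errors.
    stale = [name for name in REMOVED_ENV_VARS if name in env]
    return url.rstrip("/"), token, stale
```

Calling `resolve_data_env()` with no argument reads `os.environ`; a non-empty `stale` list points at settings that can be dropped from `.env`.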

Changes

Migration core

  • utils/lumid_data_client.py (new): HTTP client covering profile, retrieve_sample, list_blobs, alist_blob_keys, acatalog_column_exists, blob get/put. URL-encodes keys, validates SQL identifiers, disables redirects, forwards X-Request-ID, raises on truncated listings.
  • utils/job_storage.py: PersistentJobStorage inlined onto lumid_data_client; BlobNotFound → ArchiveNotFound.
  • runtime/data_profile_utils.py: preflight pre-collects the union of folders the batch's s3 retrievals reference and lists each with limit=10000 + truncation guard. Decoupled from S3_DATA_PREFIX so template: is always treated as an absolute blob key (matches the worker's interpretation).
  • utils/data_profile_offload.py::_estimate_plan_variants: POST /profile instead of psycopg EXPLAIN.
  • runtime/runtime_graph.py: _default_sql_sampler routes through lumid_retrieve_sample; _attach_s3_cfg → _attach_lumid_cfg.
  • routes/jobs.py: _validate_db_location_live via catalog endpoint; _resolve_s3_input_values + _dump_output_locations via lumid_data_client; DBLocation outputs blocked at submit; sync _job_storage calls wrapped with asyncio.to_thread.
  • main.py, packages/sdk/.../envs.py, packages/deploy/.../doctor.py: env contract updated; positive-float validation on both LUMID_DATA_TIMEOUT_SECONDS and LUMILAKE_HTTP_TIMEOUT_SECONDS; token-fallback warning.
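
Three of the client safeguards listed for utils/lumid_data_client.py (key URL-encoding, SQL identifier validation, the truncation guard) can be illustrated in isolation. The sketch below is not the module's code: the function names, the `/blobs/<key>` route, the identifier regex, and the `truncated` response field are all assumptions. Only the `/blobs` listing endpoint and its `objects[].key` shape appear in this PR (in the E2E commands).

```python
import re
from urllib.parse import quote

# Assumed rule: plain unquoted identifiers only.
_IDENT_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def blob_url(base_url: str, key: str) -> str:
    """Build a blob URL; the /blobs/<key> route is an assumption."""
    # Percent-encode everything except "/" so "?", "#" and spaces stay in the path.
    return f"{base_url.rstrip('/')}/blobs/{quote(key, safe='/')}"


def check_identifier(name: str) -> str:
    """Reject anything that is not a bare SQL identifier."""
    if not _IDENT_RE.fullmatch(name):
        raise ValueError(f"invalid SQL identifier: {name!r}")
    return name


def keys_from_listing(payload: dict, limit: int) -> list[str]:
    """Extract keys from a listing response, refusing partial results."""
    objects = payload.get("objects", [])
    # "truncated" is an assumed field name; reaching the limit is also
    # treated as truncation because the listing may be incomplete.
    if payload.get("truncated") or len(objects) >= limit:
        raise RuntimeError(f"listing truncated at limit={limit}")
    return [obj["key"] for obj in objects]
```

Raising on a truncated listing, rather than returning the partial result, keeps the preflight from silently profiling only part of a folder. With `requests`, redirects are disabled per call via `allow_redirects=False`.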

Migration-adjacent fixes

  • OutputOp now accepts both LLMOp and DataRetrievalOp sources + an optional path: selector. The aggregator walks dotted paths (items.table.symbol) and JSON-decodes intermediate string fields so it handles the DataFrame-serialized shape the SQL/agent workers emit. Malformed paths raise; default is mode-derived (sql + agent → items.table, s3 → items.content).
  • Scheduler _batch_requires_gpu peeks the batch and requests gpu_group_size=0 for CPU-only batches. The check delegates to FlowmeshRuntimeManager._runtime_op_requires_gpu so the scheduler peek and FlowMesh dispatch can't drift on backend vs task_type rules.
  • BaseJobManager two-phase API: reserve_batch / commit_reservation / abort_reservation. Scheduler wraps the reserve → workers → commit window in try/finally so worker-acquisition timeout (or any exception) returns the batch to the queue instead of dropping it. User-RR rotation deferred to commit so abort truly leaves queue state unchanged.
  • utils/io_locations.py::normalize_s3_literal rejects .., empty inner segments, ?, #, NUL.
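
The dotted-path walk described for OutputOp can be sketched as follows. This is an illustration of the technique under stated assumptions, not the aggregator's code: `walk_path` and `default_path` are hypothetical names, and the result payload is assumed to be a nested dict.

```python
import json
from typing import Any

# Mode-derived defaults as stated in the bullet above.
_DEFAULT_PATHS = {"sql": "items.table", "agent": "items.table", "s3": "items.content"}


def default_path(mode: str) -> str:
    """Return the default selector for a retrieval mode."""
    if mode not in _DEFAULT_PATHS:
        raise ValueError(f"unknown retrieval mode: {mode!r}")
    return _DEFAULT_PATHS[mode]


def walk_path(payload: Any, path: str) -> Any:
    """Resolve a dotted path, JSON-decoding string fields met along the way."""
    segments = path.split(".")
    if any(not seg for seg in segments):
        raise ValueError(f"malformed path: {path!r}")

    current = payload
    for seg in segments:
        # A worker may emit a nested object as a JSON string (for example a
        # serialized DataFrame); decode it before descending further.
        if isinstance(current, str):
            current = json.loads(current)
        if not isinstance(current, dict) or seg not in current:
            raise KeyError(f"segment {seg!r} not found while resolving {path!r}")
        current = current[seg]
    return current
```

Only intermediate strings are decoded; a string reached at the final segment is returned as-is, so `items.table` still yields the serialized table while `items.table.symbol` reaches inside it.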

Docs + examples

  • docs/{ARCHITECTURE,ENV,CLI,WORKFLOWS,E2E_DEMO}.md, README.md, packages/sdk/README.md, compose.yml, .env.example updated to reflect lumid-data-app routing; deleted-env-var migration notes in docs/ENV.md.
  • examples/templates/yaml/data-retrieval.yaml (new): three-mode smoke; uses ${NEWS_KEY} placeholder rendered with envsubst before submission.

Design

  • One source of truth for endpoint + bearer: server stamps envs.LUMID_DATA_URL / envs.LUMID_DATA_TOKEN onto every data_spec at runtime-graph build time, so workflows don't carry these fields and can't drift between submission and dispatch.
  • Two-phase batch reservation (instead of peek-only) lets the scheduler inspect batch contents (e.g. for _batch_requires_gpu) before committing the queue mutation, preserving the "selected batches never drop" invariant even with finite poll timeouts.
  • Data-profile preflight lists each template folder by absolute key (matching the worker) rather than scoping to S3_DATA_PREFIX, eliminating the surprise where a workflow's template was outside the listing root.
  • Cross-package coupling explicitly flagged: when a type: list data_spec carries s3:// items, Lumilake stamps lumid_cfg and logs a warning naming the FlowMesh-version-pairing requirement (older workers that only read s3_cfg will fail at retrieval).
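
The two-phase reservation idea can be illustrated with a toy queue. The method names match the ones this PR adds to BaseJobManager, but `TinyJobManager` and `schedule_once` are hypothetical stand-ins: the real manager also handles User-RR rotation and concurrency, which this sketch omits.

```python
from collections import deque


class TinyJobManager:
    """Toy stand-in for the two-phase API; not the real BaseJobManager."""

    def __init__(self, batches):
        self._queue = deque(batches)
        self._reserved = None

    def reserve_batch(self):
        # Phase 1: expose the head batch without mutating the queue.
        if self._reserved is not None or not self._queue:
            return None
        self._reserved = self._queue[0]
        return self._reserved

    def commit_reservation(self):
        # Phase 2a: only now does the batch leave the queue.
        if self._reserved is None:
            raise RuntimeError("no active reservation")
        self._queue.popleft()
        self._reserved = None

    def abort_reservation(self):
        # Phase 2b: drop the reservation; queue state is unchanged.
        self._reserved = None

    def pending(self):
        return list(self._queue)


def schedule_once(manager, acquire_workers):
    """Reserve, acquire workers, commit; abort on any failure in between."""
    batch = manager.reserve_batch()
    if batch is None:
        return None
    committed = False
    try:
        workers = acquire_workers(batch)  # may raise, e.g. on a timeout
        manager.commit_reservation()
        committed = True
        return batch, workers
    finally:
        if not committed:
            manager.abort_reservation()
```

Because the queue is only mutated in `commit_reservation`, a failed worker acquisition leaves the batch at the head of the queue for the next scheduling pass.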

Test Plan

uv run pytest tests/ --ignore=tests/cli       # full suite
uv run pytest tests/cli/                      # CLI suite
uv run mypy <touched files>
uv run ruff check src/ packages/ tests/
uv run black --check src/ packages/ tests/
uv run codespell src/ packages/ tests/ docs/ examples/

E2E against a real lumid-data-app:

export LUMID_DATA_URL=http://<lumid>:5101 LUMID_DATA_TOKEN=<your-pat>
export NEWS_KEY=$(curl -sf -H "Authorization: Bearer $LUMID_DATA_TOKEN" "$LUMID_DATA_URL/blobs?prefix=demo/unstructured/news-html/&limit=1" | jq -r '.objects[0].key')
envsubst < examples/templates/yaml/data-retrieval.yaml > /tmp/dr.yaml
lumilake deploy build && lumilake deploy restart
lumilake job submit /tmp/dr.yaml --format yaml --input 'Stock=NVDA' --output-prefix demo/data-retrieval
lumilake job watch <job-id>

Test Result

pytest:           465 passed, 1 skipped
mypy:             clean on every touched module
ruff:             clean
black --check:    197 files would be left unchanged
codespell:        clean

E2E (lk_test_v0 PAT against 192.168.6.181:5101):

| Job | Workflow | Walltime | Result |
|---|---|---|---|
| req-3SGkzdMG7PFjmkEt5cy6d3 | 3-mode (sql + s3 + agent) | 40s | ✅ completed |
| req-eB5yrzo54tVyR8c3pmcMYu | 3-mode | 52s | ✅ completed |
| req-kbet4j9NXniNpKJuxcWmCd | 2-mode (sql + s3) | 6s | ✅ completed |

Agent retrievals occasionally failed with the error "agent returned no retrieval result" from lumid-data-app's LLM backend; retries succeed. This is an upstream issue, independent of this PR.

Follow-up (out of scope)

  • aiohttp.ClientSession module-level singleton + retry/backoff on idempotent GETs.
  • Translate requests.HTTPError 401/413/5xx into typed HTTPException with actionable hints.
  • Parallelize _list_blobs_for_folders with bounded concurrency.
  • Typed pydantic models replacing dict-shaped data_spec.
  • Rename S3_* env + S3LocationBlob* (nothing points to S3 anymore).
  • jobs_index.json / output_index.json GET-modify-PUT race (needs ETag/CAS on lumid-data-app).
  • HTTP raise_for_status echoing response body into exception messages.
  • packages/deploy/.../stop.py COMPOSE_PROFILES=("postgres","minio","server") and _demo_data.py minio loader — separate cleanup PR.

Pre-submission Checklist
  • I have read CONTRIBUTING.md.
  • I have run uv run pre-commit run --all-files and fixed any issues.
  • I have added or updated tests covering my changes (if applicable).
  • I have verified that uv run pytest tests/ passes locally.
  • If I changed the SDK or CLI, I have verified the affected interface works locally.
  • If this is a breaking change, I have prefixed the PR title with [BREAKING] and described migration steps above.
  • I have updated documentation or config examples if user-facing behavior changed.

@timzsu timzsu mentioned this pull request Jun 11, 2026
16 tasks
timzsu added 3 commits June 11, 2026 22:27
…data-app

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu force-pushed the zsu/lumid-data-app branch from 97a2d8a to 48a9757 Compare June 11, 2026 15:47
timzsu added 3 commits June 12, 2026 11:41
…n corners

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…v dep

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
…lign doctor with FlowMesh

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu force-pushed the zsu/lumid-data-app branch from 393d1f0 to 87de8d4 Compare June 12, 2026 09:49
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu merged commit 40f3834 into main Jun 12, 2026
12 checks passed
@timzsu timzsu deleted the zsu/lumid-data-app branch June 12, 2026 11:20