feat(data-drains): add GCS, Azure Blob, BigQuery, Snowflake, and Datadog destinations #4552

Open
waleedlatif1 wants to merge 10 commits into staging from waleedlatif1/abuja-v1

Conversation

@waleedlatif1
Collaborator

Summary

  • Add 5 new data-drain destinations: Google Cloud Storage, Azure Blob Storage, Google BigQuery, Snowflake, and Datadog Logs
  • Each destination implements test() plus openSession()/deliver(), with spec-correct per-provider auth, retries, byte-accurate size guards, and abort-signal forwarding
  • Snowflake: key-pair JWT (with account-suffix stripping), 202-async polling, identifier quoting, PARSE_JSON bindings, 16 MB VARIANT guard
  • BigQuery: tabledata.insertAll with drainId-prefixed insertId dedup, partial-failure surfacing, 401 token refresh + 5xx/429 retry
  • Datadog: v2 logs intake with gzip, per-entry 1 MB + per-request 5 MB / 1000-entry guards, all sites including ap2
  • GCS: JSON API media uploads with shared retry helper, GCS-spec bucket-name validation (rejects goog/google)
  • Azure Blob: @azure/storage-blob SDK with sovereign-cloud endpointSuffix support
  • Settings UI: form specs for each destination, icons, search/UI polish
  • Docs: full destination sections in enterprise/data-drains.mdx
  • 57/57 destination tests pass; `tsc --noEmit` clean
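The "byte-accurate" wording in the summary matters because JavaScript string `.length` counts UTF-16 code units, not encoded bytes. A minimal sketch of such a guard (the `exceedsLimit` name and the 1 MB constant are illustrative, chosen to match Datadog's per-entry limit mentioned above):

```typescript
// Illustrative sketch: a byte-accurate payload guard. String .length counts
// UTF-16 code units, so multi-byte characters would slip past a naive check;
// measuring the UTF-8 encoding gives the true wire size.
const MAX_BYTES = 1_000_000; // e.g. a 1 MB per-entry limit

function exceedsLimit(payload: string, maxBytes: number = MAX_BYTES): boolean {
  return new TextEncoder().encode(payload).byteLength > maxBytes;
}
```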

Type of Change

  • New feature

Testing

Tested manually. New unit tests cover schema validation, retry paths, byte-accurate size guards, gzip, sovereign-cloud routing, and partial-failure handling per destination.

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel

vercel Bot commented May 11, 2026

The latest updates on your projects.

1 Skipped Deployment: docs — Skipped (May 11, 2026 6:04pm UTC)


@cursor

cursor Bot commented May 11, 2026

PR Summary

High Risk
Adds multiple new external delivery destinations with new auth flows, retry/timeout behavior, and data-shaping logic, plus a DB enum migration; failures could impact drain reliability or duplicate/lose exports if edge cases are wrong.

Overview
Extends Data Drains beyond s3/webhook to support Google Cloud Storage, Azure Blob Storage, BigQuery, Snowflake, and Datadog Logs end-to-end (API contracts, server destination registry, and DB enum values).

Adds new destination implementations with provider-specific auth and delivery semantics (object-store uploads, BigQuery insertAll with insertId dedupe + partial-failure surfacing, Snowflake SQL API w/ key-pair JWT + 202 polling, Datadog intake w/ gzip + size limits) and broad unit-test coverage for retries, validation, and error handling.

Updates the enterprise settings UI to configure these destinations (new form specs, destination labels/icons) and expands enterprise docs to describe the new destination options and their configuration.

Reviewed by Cursor Bugbot for commit 98ca6e5.

Comment thread apps/sim/lib/api/contracts/data-drains.ts Outdated
@greptile-apps
Contributor

greptile-apps Bot commented May 11, 2026

Greptile Summary

This PR adds five new data-drain destinations — GCS, Azure Blob Storage, BigQuery, Snowflake, and Datadog — with a shared utils.ts layer (sleepUntilAborted, backoffWithJitter, parseRetryAfter, buildObjectKey, parseServiceAccount) that unifies retry, key-building, and credential-parsing logic previously duplicated in s3.ts and webhook.ts.
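The PR doesn't show the helper bodies, but the behavior described later in this thread (capped at 30 s, returns number | null, handles both delta-seconds and HTTP-date forms) can be sketched as follows; the exact implementation in utils.ts may differ:

```typescript
// Sketch of a Retry-After parser matching the behavior described in this
// thread: accepts delta-seconds ("3") or an HTTP-date, returns a delay in
// milliseconds capped at 30 s, or null when absent or unparseable.
const MAX_RETRY_AFTER_MS = 30_000;

function parseRetryAfter(header: string | null): number | null {
  if (!header) return null;
  const seconds = Number(header);
  if (Number.isFinite(seconds) && seconds >= 0) {
    return Math.min(seconds * 1000, MAX_RETRY_AFTER_MS);
  }
  const date = Date.parse(header); // HTTP-date form
  if (!Number.isNaN(date)) {
    return Math.min(Math.max(date - Date.now(), 0), MAX_RETRY_AFTER_MS);
  }
  return null;
}
```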

  • GCS: JSON API media uploads, service-account JWT auth, GCS bucket-name validation (goog/google/IP guards), test-probe upload + delete pattern matching the existing S3 style.
  • Azure Blob: @azure/storage-blob SDK with StorageSharedKeyCredential, sovereign-cloud endpointSuffix, correct CONTAINER_NAME_RE with negative-lookahead double-hyphen prevention.
  • BigQuery: tabledata.insertAll with drainId-prefixed insertId dedup, 401 token-refresh + 5xx/429 retry, partial-failure surfacing in both logs and the thrown error.
  • Snowflake: Key-pair JWT (RS256, account-suffix stripping, public-key fingerprint), 202 async polling with exponential back-off, PARSE_JSON(?) bindings, 16 MiB VARIANT guard.
  • Datadog: v2 logs intake with conditional gzip, per-entry 1 MB + per-request 5 MB uncompressed / 6 MB compressed guards, CRLF-safe NDJSON parsing, all sites including ap2.

Confidence Score: 5/5

Safe to merge — all destination implementations are additive, the DB migration is a non-destructive enum extension, and the previous round of reviewer feedback has been addressed.

The remaining findings are non-blocking: the Snowflake retry loop skips jitter, the BigQuery 401 path leaves an unconsumed response body, and both the GCS and Snowflake API contract schemas omit the regex validators present in the runtime schemas — all producing suboptimal-but-recoverable behaviour rather than data loss or incorrect delivery.

The API contract file (lib/api/contracts/data-drains.ts) is worth a second look — the GCS and Snowflake contract schemas still don't mirror the regex validations enforced at drain-run time, meaning invalid identifiers or reserved bucket names can be persisted before being rejected.

Important Files Changed

  • apps/sim/lib/data-drains/destinations/snowflake.ts — New Snowflake destination: JWT key-pair auth, 202 async polling, VARIANT 16MB guard, sleepUntilAborted in both retry loops; local computeBackoffDelay omits jitter unlike the shared backoffWithJitter helper
  • apps/sim/lib/data-drains/destinations/bigquery.ts — New BigQuery destination: streaming insertAll with insertId dedup, 401 token-refresh, partial-failure surfacing; the 401 response body is not drained before the refresh-retry, which may hinder HTTP connection reuse
  • apps/sim/lib/data-drains/destinations/gcs.ts — New GCS destination: JSON API media uploads with shared retry helper, GCS bucket-name validation (rejects goog/google), service-account JSON parsing via shared utils
  • apps/sim/lib/data-drains/destinations/datadog.ts — New Datadog destination: gzip compression, per-entry 1MB + per-request 5MB uncompressed / 6MB compressed guards, CRLF-safe NDJSON parsing, all sites including ap2
  • apps/sim/lib/data-drains/destinations/azure_blob.ts — New Azure Blob destination: @azure/storage-blob SDK, sovereign-cloud endpointSuffix, correctly structured CONTAINER_NAME_RE with negative lookahead for double hyphens
  • apps/sim/lib/data-drains/destinations/utils.ts — New shared utilities: sleepUntilAborted, backoffWithJitter (with ±20% jitter), parseRetryAfter (delta-seconds and HTTP-date), buildObjectKey, parseServiceAccount; well-extracted from s3.ts and webhook.ts
  • apps/sim/lib/api/contracts/data-drains.ts — API contract schemas for all 5 new destinations; gcsConfigBodySchema and snowflakeConfigBodySchema omit the regex validations present in runtime schemas, so invalid values can be persisted and surface errors only at drain-run time
  • packages/db/migrations/0205_public_lord_hawal.sql — Additive ALTER TYPE migration adding 5 new enum values before webhook; correct and idempotent for PostgreSQL

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Drain Run] --> B[NDJSON Body]
    B --> C{Destination Type}
    C -->|s3 / gcs / azure_blob| D[Object Store]
    C -->|bigquery| E[BigQuery tabledata.insertAll]
    C -->|snowflake| F[Snowflake SQL API v2]
    C -->|datadog| G[Datadog v2 Logs]
    C -->|webhook| H[HTTPS Webhook]

    D -->|GCS| D1[google-auth-library JWT + fetchWithRetry]
    D -->|Azure| D2[azure/storage-blob SDK]
    D -->|S3| D3[AWS SDK PutObjectCommand]

    E --> E1[google-auth-library JWT, 401 token refresh, sleepUntilAborted on 5xx/429]
    F --> F1[jose SignJWT RS256, 202 poll loop, VARIANT 16MB guard]
    G --> G1[gzipSync, 5MB uncompressed + 6MB wire guards, all sites]

    D1 & D2 & D3 --> OBJ[date-partitioned NDJSON object key]
    E1 & F1 & G1 & H --> DONE[DeliveryResult with locator]
    OBJ --> DONE
```

Reviews (9). Last reviewed commit: "improvement(data-drains): consolidate ba..."

Comment thread apps/sim/ee/data-drains/destinations/registry.tsx Outdated
Comment thread apps/sim/lib/data-drains/destinations/bigquery.ts Outdated
Comment thread apps/sim/lib/data-drains/destinations/gcs.ts Outdated
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/data-drains/destinations/snowflake.ts
Comment thread apps/sim/lib/data-drains/destinations/datadog.ts
Comment thread apps/sim/lib/data-drains/destinations/gcs.ts Outdated
Comment thread apps/sim/lib/data-drains/destinations/snowflake.ts Outdated
Comment thread apps/sim/lib/data-drains/destinations/snowflake.ts Outdated
Comment thread apps/sim/lib/data-drains/destinations/bigquery.ts Outdated
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/api/contracts/data-drains.ts Outdated
Comment thread apps/sim/lib/data-drains/destinations/gcs.ts Outdated
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/api/contracts/data-drains.ts
Comment thread apps/sim/lib/api/contracts/data-drains.ts
Comment thread apps/sim/lib/api/contracts/data-drains.ts
Comment thread apps/sim/lib/data-drains/destinations/gcs.ts Outdated
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/data-drains/destinations/bigquery.ts Outdated
Comment thread apps/sim/lib/api/contracts/data-drains.ts
…tKey contract

- BigQuery insertAll now wraps the fetch in try/catch inside the retry loop so DNS failures, socket resets, and timeouts are retried with backoff instead of propagating immediately.
- Align azureBlobCredentialsBodySchema with the runtime schema (min 64 / max 120 / base64 regex) so obviously invalid keys are rejected at the API boundary rather than at drain-run time.
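The first bullet above describes a standard pattern: only by catching the rejected fetch promise inside the loop do network-level failures participate in the backoff. A simplified sketch under assumed names (`fetchWithRetry` and its signature are illustrative, not the PR's actual helper):

```typescript
// Hypothetical sketch of the retry pattern described above: network-level
// failures (DNS errors, socket resets, timeouts) reject the fetch promise,
// so they must be caught inside the loop to be retried with backoff.
async function fetchWithRetry(
  doFetch: () => Promise<{ ok: boolean; status: number }>,
  maxAttempts = 3,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms))
): Promise<{ ok: boolean; status: number }> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const res = await doFetch();
      // Retry only on 5xx / 429; all other statuses return to the caller.
      if (res.ok || (res.status !== 429 && res.status < 500)) return res;
      lastError = new Error(`HTTP ${res.status}`);
    } catch (err) {
      lastError = err; // DNS failure, socket reset, timeout, etc.
    }
    await sleep(500 * 2 ** attempt);
  }
  throw lastError;
}
```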
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/data-drains/destinations/datadog.ts
Comment thread apps/sim/lib/data-drains/destinations/datadog.ts Outdated
…JSON line context

- Extract a single parseRetryAfter helper (capped at 30s, returns number | null) into lib/data-drains/destinations/utils.ts and remove the five local copies in bigquery, datadog, gcs, snowflake, and webhook.
- Datadog parseNdjson now wraps JSON.parse in try/catch and surfaces the failing line index, matching BigQuery's parser.
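A parser matching that description (CRLF-safe splitting, with `JSON.parse` wrapped so a bad line reports its index) might look like this; the function name and error message are illustrative, not the PR's exact code:

```typescript
// Sketch of an NDJSON parser as described above: tolerates \r\n line
// endings, skips blank lines, and surfaces the failing line index
// instead of an opaque JSON.parse error.
function parseNdjson(body: string): unknown[] {
  return body
    .split("\n")
    .map((line) => line.replace(/\r$/, ""))
    .filter((line) => line.length > 0)
    .map((line, index) => {
      try {
        return JSON.parse(line);
      } catch {
        throw new Error(`Invalid JSON on NDJSON line ${index}`);
      }
    });
}
```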
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/data-drains/destinations/datadog.ts
Comment thread apps/sim/lib/data-drains/destinations/snowflake.ts Outdated
Comment thread apps/sim/lib/data-drains/destinations/datadog.ts
- Datadog payload guard now checks the uncompressed size against the 5 MB limit and the wire size against the 6 MB compressed limit, so gzip cannot smuggle an oversized body past the client-side check.
- Snowflake VARIANT limit is 16 MiB (16,777,216 bytes), not 16,000,000 bytes — small payloads between 16 MB and 16 MiB were being rejected unnecessarily.
- Drop the unused apiKey field on Datadog PostInput; the key is already embedded in the prepared request headers.
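The dual-guard fix in the first bullet can be sketched as below. The 5 MB / 6 MB constants come from the limits named above; `prepareBody` and the error messages are hypothetical, and the real code may stream rather than buffer:

```typescript
import { gzipSync } from "node:zlib";

// Sketch of the dual guard described above: the uncompressed body is
// checked against the 5 MB intake limit, and the gzipped wire body is
// checked against the 6 MB compressed limit, so compression cannot
// smuggle an oversized payload past the client-side check.
const MAX_UNCOMPRESSED = 5 * 1024 * 1024;
const MAX_COMPRESSED = 6 * 1024 * 1024;

function prepareBody(body: string): Buffer {
  const raw = Buffer.from(body, "utf8");
  if (raw.byteLength > MAX_UNCOMPRESSED) {
    throw new Error(`payload ${raw.byteLength} B exceeds 5 MB uncompressed limit`);
  }
  const wire = gzipSync(raw);
  if (wire.byteLength > MAX_COMPRESSED) {
    throw new Error(`payload ${wire.byteLength} B exceeds 6 MB compressed limit`);
  }
  return wire;
}
```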
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/data-drains/destinations/datadog.ts Outdated
…tils

Datadog, GCS, and webhook each had byte-identical backoff helpers (BASE 500ms, MAX 30s, jitter ±20%, Retry-After floor). Lift the helper into lib/data-drains/destinations/utils.ts alongside parseRetryAfter and sleepUntilAborted, and drop the per-file copies and their BASE_BACKOFF_MS/MAX_BACKOFF_MS constants.
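The consolidated helper's stated parameters (base 500 ms, 30 s cap, ±20% jitter, Retry-After floor) translate into roughly the following; this is a sketch of the described behavior, not the file's verbatim contents:

```typescript
// Sketch of the shared backoff helper described above: exponential delay
// starting at 500 ms, doubling per attempt, capped at 30 s, with ±20%
// jitter; a server-supplied Retry-After (in ms) acts as a floor.
const BASE_BACKOFF_MS = 500;
const MAX_BACKOFF_MS = 30_000;

function backoffWithJitter(attempt: number, retryAfterMs: number | null = null): number {
  const exp = Math.min(BASE_BACKOFF_MS * 2 ** attempt, MAX_BACKOFF_MS);
  const jittered = exp * (0.8 + Math.random() * 0.4); // ±20% jitter
  return retryAfterMs !== null ? Math.max(jittered, retryAfterMs) : jittered;
}
```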
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review


@cursor cursor Bot left a comment


✅ Bugbot reviewed your changes and found no new issues!

Comment @cursor review or bugbot run to trigger another review on this PR

Reviewed by Cursor Bugbot for commit 98ca6e5.
