A Snowflake-native pipeline that automatically ingests documents from an external stage, parses them with Reducto, and surfaces the raw chunks plus a full-document markdown view.
Everything runs inside Snowflake — no external orchestrator, no dbt, no Python environment on your machine.
- Detects new PDF uploads on a Snowflake external stage via a directory table stream.
- Sends each file to Reducto for async document parsing (via Python UDFs with external network access).
- Polls Reducto for completed results on a 5-minute schedule.
- Stores raw parse results (chunks, metadata, usage) in collected_results.
- Provides a document_markdown_v view that assembles chunks into full-document markdown text.
- Alerts on stale jobs and logs warnings to a pipeline_warnings table.
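
The chunk assembly behind document_markdown_v can be illustrated outside Snowflake. The following is a minimal Python sketch, not the view's actual definition (that lives in setup.sql). It assumes each chunk carries its text in a content field, as the raw-chunk query later in this README suggests. The blank-line separator, the skipping of empty chunks, and the function name assemble_markdown are assumptions made for illustration.

```python
import json


def assemble_markdown(parse_result: dict, separator: str = "\n\n") -> str:
    """Join chunk contents, in order, into one markdown string."""
    chunks = parse_result.get("chunks") or []
    parts = [chunk.get("content", "") for chunk in chunks]
    # Assumption: empty chunks are skipped so they do not produce runs of blank lines.
    return separator.join(part for part in parts if part.strip())


# Hypothetical parse result; real results also carry metadata and usage fields.
sample = json.loads(
    '{"chunks": [{"content": "# Title"}, {"content": ""}, {"content": "Body text."}]}'
)
full_text = assemble_markdown(sample)
print(full_text)
```

Joining in chunk order matters: the chunks array is positional, so any reordering would scramble the reconstructed document.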
S3 bucket
  └── External stage (directory table, auto-refresh)
        └── Stream (new_documents_stream)
              └── Task: submit_task (triggered by stream)
                    └── submitted_jobs table

submitted_jobs
  └── Task: collect_task (every 5 min, polls Reducto API)
        └── collected_results table
              └── View: document_markdown_v (full-doc markdown)

submitted_jobs + collected_results
  └── Alert: stale_pending_jobs (hourly)
        └── pipeline_warnings table
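
The stale-job check in the last tree amounts to an anti-join: jobs present in submitted_jobs with no matching row in collected_results, filtered by age. Here is a rough Python sketch of that logic. The column names job_id, source_path, and submitted_at come from the monitoring queries in this README. The one-hour staleness threshold and the function name find_stale_jobs are assumptions; the real condition is defined by the alert in setup.sql.

```python
from datetime import datetime, timedelta, timezone


def find_stale_jobs(submitted, collected_job_ids, now, max_age=timedelta(hours=1)):
    """Return submitted jobs that have no collected result and are older than max_age."""
    return [
        job
        for job in submitted
        if job["job_id"] not in collected_job_ids
        and now - job["submitted_at"] > max_age
    ]


# Hypothetical rows standing in for submitted_jobs and collected_results.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
submitted = [
    {"job_id": "a", "source_path": "one.pdf", "submitted_at": now - timedelta(hours=3)},
    {"job_id": "b", "source_path": "two.pdf", "submitted_at": now - timedelta(minutes=4)},
    {"job_id": "c", "source_path": "three.pdf", "submitted_at": now - timedelta(hours=2)},
]
collected_job_ids = {"c"}

stale = find_stale_jobs(submitted, collected_job_ids, now)
print([job["job_id"] for job in stale])
```

Job b is pending but recent, and job c is old but already collected, so only job a would be reported.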
- A Snowflake account with ACCOUNTADMIN or equivalent privileges (needed for creating network rules, secrets, integrations, UDFs, and tasks).
- A Reducto API key (sign up at reducto.ai and generate one).
- An S3 bucket (or other cloud storage) that Snowflake can access as an external stage, where you will upload PDFs.
- An existing external stage in Snowflake pointing to that bucket.
Open setup.sql in a Snowflake worksheet (or your SQL editor of choice).
Search for >>> CONFIGURE comments and replace the placeholder values:
| Placeholder | What to set |
|---|---|
| Database name | The database to create objects in (default: raw) |
| Schema name | The schema to use (default: documents) |
| SECRET_STRING | Your Reducto API key (or update it after running; see Step 3) |
| Stage name | Your external stage name if different from raw.documents.document_stage |
Select all and run setup.sql in a Snowflake worksheet. It creates everything in order:
- Network rule, secret, and external access integration for Reducto API calls
- Directory table auto-refresh and stream on the external stage
- Python UDFs (reducto_submit, reducto_collect)
- Tables (submitted_jobs, collected_results, pipeline_warnings)
- Stored procedures (submit_new_documents, collect_results)
- A view (document_markdown_v) assembling chunks into full-document markdown
- Two serverless tasks (submit, collect), resumed automatically
- One data quality alert, resumed automatically
If you didn't set it in Step 1, update the secret now:
ALTER SECRET raw.documents.reducto_api_key
  SET SECRET_STRING = 'your-actual-reducto-api-key';

So that new files are detected automatically:
- Run DESC STAGE raw.documents.document_stage; and copy the SQS queue ARN from directory_notification_channel.
- In AWS, set up an SNS topic (or direct SQS subscription) on your S3 bucket for s3:ObjectCreated:* events, pointing to that SQS ARN.
See Snowflake docs on auto-refreshing directory tables for details.
For Azure or GCP storage, follow the equivalent Snowflake guide for your cloud provider.
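
For the S3 case with a direct SQS subscription, the bucket notification configuration might look like the output of the sketch below. The dictionary follows the shape accepted by S3's PutBucketNotificationConfiguration operation (for example via aws s3api put-bucket-notification-configuration). The ARN shown is a placeholder, and the Id label and the function name build_notification_config are hypothetical; substitute the ARN you copied from directory_notification_channel.

```python
import json


def build_notification_config(queue_arn: str) -> dict:
    """Build an S3 bucket notification configuration for a direct SQS subscription."""
    return {
        "QueueConfigurations": [
            {
                "Id": "snowflake-directory-auto-refresh",  # hypothetical label
                "QueueArn": queue_arn,
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }


# Placeholder ARN; use the value from directory_notification_channel instead.
config = build_notification_config("arn:aws:sqs:us-east-1:123456789012:example-queue")
print(json.dumps(config, indent=2))
```

Applying a notification configuration replaces whatever the bucket already has, so merge this entry with any existing notifications rather than overwriting them.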
Upload PDFs to your S3 bucket (or whatever storage backs the stage). The pipeline picks them up automatically:
- ~30 seconds: stream detects new files, submit_task fires, files are submitted to Reducto
- ~1-2 minutes: Reducto finishes parsing
- Next collect cycle (up to 5 min): collect_task picks up completed results
Total end-to-end latency: ~2-7 minutes per document.
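
The total is simply the sum of the three stage estimates. As a quick check of the arithmetic, the sketch below adds the lower and upper bounds of each stage. It assumes detection takes a flat 30 seconds and that a finished job waits anywhere from zero to one full collect interval; the constant names are illustrative only.

```python
# Stage durations in seconds as (low, high), taken from the estimates above.
DETECT_AND_SUBMIT = (30, 30)
REDUCTO_PARSE = (60, 120)
COLLECT_WAIT = (0, 300)  # zero up to one full 5-minute collect interval

stages = [DETECT_AND_SUBMIT, REDUCTO_PARSE, COLLECT_WAIT]
best_case = sum(low for low, _ in stages)
worst_case = sum(high for _, high in stages)

print(f"best: {best_case / 60:.1f} min, worst: {worst_case / 60:.1f} min")
```

This gives 1.5 to 7.5 minutes, which rounds to the quoted ~2-7 minutes. The collect wait dominates the upper bound, which is why shortening the collect schedule has the largest effect on latency.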
Full-document markdown:
SELECT source_path, num_pages, full_text
FROM raw.documents.document_markdown_v
ORDER BY collected_at DESC;

Raw chunks (for fine-grained access):
SELECT
c.source_path,
f.index AS chunk_index,
f.value:content::string AS chunk_text
FROM raw.documents.collected_results c,
LATERAL FLATTEN(input => c.parse_result:chunks) f
ORDER BY c.collected_at DESC, f.index;

Check data quality warnings:
SELECT * FROM raw.documents.pipeline_warnings
ORDER BY detected_at DESC;

Recent task runs:
SELECT name AS task_name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
RESULT_LIMIT => 50
))
ORDER BY scheduled_time DESC;

Hourly throughput and Reducto credits:
SELECT
DATE_TRUNC('hour', collected_at) AS hour,
COUNT(*) AS documents_processed,
SUM(credits_used) AS total_reducto_credits
FROM raw.documents.collected_results
GROUP BY 1
ORDER BY 1 DESC;

Jobs still pending:
SELECT s.job_id, s.source_path, s.submitted_at,
DATEDIFF('minute', s.submitted_at, CURRENT_TIMESTAMP()) AS minutes_pending
FROM raw.documents.submitted_jobs s
LEFT JOIN raw.documents.collected_results c ON s.job_id = c.job_id
WHERE c.job_id IS NULL
ORDER BY s.submitted_at DESC;

For the initial load of many documents, the pipeline processes them through the normal stream + task flow. To speed things up:
Shorten the collect interval temporarily:
ALTER TASK collect_task SUSPEND;
ALTER TASK collect_task SET SCHEDULE = '1 MINUTE';
ALTER TASK collect_task RESUME;

After the backfill completes, restore the default:
ALTER TASK collect_task SUSPEND;
ALTER TASK collect_task SET SCHEDULE = '5 MINUTE';
ALTER TASK collect_task RESUME;

Or run the procedures manually in a loop:
CALL submit_new_documents();
-- wait a few minutes for Reducto to process
CALL collect_results();

Tasks are not running:
- Check that tasks are resumed: SHOW TASKS; and look at the state column.
- Check task history for errors: SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) ORDER BY SCHEDULED_TIME DESC LIMIT 10;
- Tasks auto-suspend after 10 consecutive failures. Resume with ALTER TASK <name> RESUME;
submit_task never fires:
- The stream may be empty. Upload a file, then manually refresh: ALTER STAGE raw.documents.document_stage REFRESH;
- Verify the stream has data: SELECT SYSTEM$STREAM_HAS_DATA('raw.documents.new_documents_stream');
collect_results always returns 0 jobs:
- Reducto jobs may still be processing. Wait a few minutes.
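- Check that the secret exists: DESC SECRET raw.documents.reducto_api_key; (requires ACCOUNTADMIN or a role with access to the secret). Snowflake does not display secret values, so if in doubt, reset the key with ALTER SECRET ... SET SECRET_STRING.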
Data quality alert is not firing:
- Check alert state: SHOW ALERTS;
- Check alert history: SELECT * FROM TABLE(INFORMATION_SCHEMA.ALERT_HISTORY(RESULT_LIMIT => 20)) ORDER BY SCHEDULED_TIME DESC;
To remove all pipeline objects (preserves the external stage):
-- Run teardown.sql in a Snowflake worksheet

See teardown.sql for the full script.
├── setup.sql # Creates all Snowflake objects (run once)
├── teardown.sql # Drops all pipeline objects (for reset/cleanup)
└── README.md
| Type | Name | Purpose |
|---|---|---|
| Network Rule | reducto_egress | Allows egress to platform.reducto.ai |
| Secret | reducto_api_key | Stores the Reducto API key |
| Integration | reducto_access | Wires the network rule and secret for UDFs |
| Stream | new_documents_stream | Detects new files on the stage |
| Function | reducto_submit(url) | Submits a document to Reducto for async parsing |
| Function | reducto_collect(job_id) | Polls Reducto for a completed parse result |
| Table | submitted_jobs | Tracks files submitted to Reducto |
| Table | collected_results | Stores raw Reducto parse results (chunks + metadata) |
| Table | pipeline_warnings | Data quality warnings from alerts |
| Procedure | submit_new_documents() | Reads stream, submits new files |
| Procedure | collect_results() | Polls for completed Reducto jobs |
| View | document_markdown_v | Full-document markdown assembled from chunks |
| Task | submit_task | Stream-triggered, calls submit_new_documents() |
| Task | collect_task | Scheduled every 5 min, calls collect_results() |
| Alert | stale_pending_jobs_alert | Hourly check for stuck Reducto jobs |