indexwave-substreams-pipeline

Replayable, high-throughput blockchain indexing pipeline. Ingests blocks (mock or real chain), transforms them into domain tables, and writes to ClickHouse with checkpointing and replay support.

Architecture

  • Source (src/source/): Block ingestion. Default is a deterministic mock generator for local dev; optional RPC adapter for real chains.
  • Processor (src/processor/): Parse → normalize → enrich. Deterministic transforms and stable sorting.
  • Sink (src/sink/): Batch writes to ClickHouse with retries and backpressure; idempotent upserts via ReplacingMergeTree.
  • Checkpoint (src/checkpoint/): Last processed block height and timestamp, used by ingest --resume and the replay command.
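
The stages above can be sketched with the standard library alone. This is a minimal illustration, not the repository's code: the Block type, mock_block, process, and run_pipeline are hypothetical names, and the sink is reduced to collecting results in memory. It assumes a bounded channel sits between source and processor (as channel_cap in the config suggests), so a slow consumer makes the producer wait.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical block shape; the real pipeline's types may differ.
#[derive(Debug, Clone, PartialEq)]
pub struct Block {
    pub height: u64,
    pub tx_hashes: Vec<String>,
}

/// Source: deterministic mock generator (same height, same block).
pub fn mock_block(height: u64) -> Block {
    // Hashes are emitted in descending order so the processor's sort is visible.
    let tx_hashes = (0..3).rev().map(|i| format!("tx-{height:08}-{i}")).collect();
    Block { height, tx_hashes }
}

/// Processor: a stand-in for parse -> normalize -> enrich.
pub fn process(mut block: Block) -> Block {
    block.tx_hashes.sort(); // slice::sort is a stable sort
    block
}

/// Runs source -> processor -> sink over [from, to] and returns what the sink received.
pub fn run_pipeline(from: u64, to: u64, channel_cap: usize) -> Vec<Block> {
    let (tx, rx) = sync_channel::<Block>(channel_cap);
    let producer = thread::spawn(move || {
        for height in from..=to {
            // send() blocks while the channel is full: this is the backpressure.
            if tx.send(mock_block(height)).is_err() {
                break; // receiver went away, stop producing
            }
        }
    });
    // Sink stand-in: collect processed blocks in arrival order.
    let received: Vec<Block> = rx.into_iter().map(process).collect();
    producer.join().expect("producer thread panicked");
    received
}
```

Calling run_pipeline(1, 50, 10000) twice yields identical output, which is the property that makes replays safe.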

Data model (ClickHouse)

Table               | Key fields                                   | Engine
blocks              | height, hash, ts                             | MergeTree
transactions        | tx_hash, block_height, from, to, value, fee  | ReplacingMergeTree
transfers           | tx_hash, from, to, asset, amount             | ReplacingMergeTree
contracts           | address, creator, created_height             | ReplacingMergeTree
events              | tx_hash, event_index, kind, data_json        | ReplacingMergeTree
indexwave_checkpoint | last_block_height, last_committed_ts        | ReplacingMergeTree

Run locally

  1. Start ClickHouse:

    make up
  2. Ingest a range (creates DB and tables if missing):

    cargo run -- ingest --config config.toml --from-height 1 --to-height 50
  3. Stats:

    cargo run -- stats --config config.toml
  4. Replay a range (re-upsert):

    cargo run -- replay --config config.toml --from 1 --to 50
  5. Resume from checkpoint:

    cargo run -- ingest --config config.toml --resume
  6. Doctor (config + DB check):

    cargo run -- doctor --config config.toml

Replay and checkpoints

  • Checkpoint: Stored in indexwave_checkpoint (latest row by last_block_height). Updated after each successful batch.
  • --resume: Loads checkpoint and continues from last_block_height + 1 (respects --from-height / --to-height if set).
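  • replay --from N --to M: Reprocesses blocks N–M and re-inserts them into ClickHouse. ReplacingMergeTree collapses rows that share the same sorting key, e.g. (block_height, tx_hash, event_index), during background merges, so replays are idempotent once merges complete or when queried with FINAL.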
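
How --resume combines the checkpoint with explicit bounds is not spelled out above, so the following is only one plausible reading: start at the later of last_block_height + 1 and --from-height, and stop at --to-height if given. The Checkpoint struct, resume_range, the chain_tip parameter, and the default start of 0 when nothing is stored are all assumptions made for illustration.

```rust
/// Hypothetical in-memory view of the latest indexwave_checkpoint row.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Checkpoint {
    pub last_block_height: u64,
    pub last_committed_ts: u64,
}

/// Returns the inclusive block range to ingest, or None if there is nothing to do.
///
/// Assumptions: with no checkpoint we start at height 0; `chain_tip` is the
/// upper bound used when --to-height is not set.
pub fn resume_range(
    checkpoint: Option<Checkpoint>,
    from_height: Option<u64>,
    to_height: Option<u64>,
    chain_tip: u64,
) -> Option<(u64, u64)> {
    // Never re-ingest blocks the checkpoint already covers.
    let after_checkpoint = checkpoint.map_or(0, |c| c.last_block_height + 1);
    // An explicit --from-height can only move the start forward.
    let start = after_checkpoint.max(from_height.unwrap_or(0));
    let end = to_height.unwrap_or(chain_tip);
    if start <= end {
        Some((start, end))
    } else {
        None
    }
}
```

With a checkpoint at height 50 and --to-height 60, this sketch yields the range 51 to 60; with --to-height 50 it yields None because the range is already covered.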

Config

config.toml:

[database]
url = "http://localhost:18123"
database = "indexwave"
batch_size = 1000
max_retries = 3

[ingest]
channel_cap = 10000

[metrics]
port = 9090
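
As a rough sketch of how batch_size and max_retries might interact in the sink, the function below splits rows into batches and retries each failed insert. DbConfig and write_batches are hypothetical names, the insert callback stands in for the real ClickHouse write, and treating max_retries as "retries after the first attempt" is an assumption. A real sink would likely also sleep or back off between attempts.

```rust
/// Hypothetical settings mirroring the [database] keys batch_size and max_retries.
pub struct DbConfig {
    pub batch_size: usize,
    pub max_retries: u32,
}

/// Writes `rows` in batches of `cfg.batch_size`, retrying each batch up to
/// `cfg.max_retries` times after its first failure.
/// Returns the number of batches written, or the last error of a batch that
/// exhausted its retries.
pub fn write_batches<T, E>(
    rows: &[T],
    cfg: &DbConfig,
    mut insert: impl FnMut(&[T]) -> Result<(), E>,
) -> Result<usize, E> {
    let mut batches = 0;
    // chunks(0) would panic, so clamp the batch size to at least 1.
    for batch in rows.chunks(cfg.batch_size.max(1)) {
        let mut attempt = 0;
        loop {
            match insert(batch) {
                Ok(()) => break,
                // Retry budget left: try the same batch again.
                Err(_) if attempt < cfg.max_retries => attempt += 1,
                // Budget exhausted: surface the error to the caller.
                Err(e) => return Err(e),
            }
        }
        batches += 1;
    }
    Ok(batches)
}
```

Because a retried batch may have been partially applied, this pattern relies on the inserts being idempotent, which is the role ReplacingMergeTree plays here.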

Makefile

  • make up – start ClickHouse (and wait for health)
  • make down – stop services
  • make lint – cargo fmt --check and cargo clippy --all-targets -- -D warnings
  • make test – cargo test
  • make run – up + ingest 1–50
  • make e2e – up + ingest 1–30 + stats + replay 1–30 + stats

Troubleshooting

  • DB connection refused: Ensure ClickHouse is up (make up, then curl -s http://localhost:8123/ping, substituting the host port from database.url if it differs, e.g. 18123 in the sample config).
  • Port 8123 in use: Change database.url in config and the ports in docker-compose.yml.
  • Database / table errors: Run doctor; ensure_schema creates the database and tables on first ingest.
  • Replay row count: ReplacingMergeTree merges in the background; immediate count() may show duplicates until merge. Use FINAL or wait for background merge for exact counts.
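
The replay row-count behaviour can be modelled in a few lines. This is a simulation, not ClickHouse code: EventRow, raw_count, and merged are hypothetical, and the key is the (block_height, tx_hash, event_index) tuple mentioned in the replay section. It relies on the documented ReplacingMergeTree behaviour that, without a version column, the most recently inserted row for a key is the one kept.

```rust
use std::collections::BTreeMap;

/// Hypothetical event row; only the key fields and one payload field are modelled.
#[derive(Debug, Clone, PartialEq)]
pub struct EventRow {
    pub block_height: u64,
    pub tx_hash: String,
    pub event_index: u32,
    pub kind: String,
}

/// Dedup key: (block_height, tx_hash, event_index).
pub type Key = (u64, String, u32);

/// What a plain count() may report before background merges have run:
/// every inserted row, duplicates included.
pub fn raw_count(inserted: &[EventRow]) -> usize {
    inserted.len()
}

/// What a query with FINAL (or a fully merged table) would see:
/// one row per key, with the last inserted row winning.
pub fn merged(inserted: &[EventRow]) -> BTreeMap<Key, EventRow> {
    let mut out = BTreeMap::new();
    for row in inserted {
        let key = (row.block_height, row.tx_hash.clone(), row.event_index);
        out.insert(key, row.clone()); // a later insert replaces the earlier one
    }
    out
}
```

Ingesting a range and then replaying it doubles raw_count while merged(...).len() stays the same, which mirrors the gap between count() and a FINAL count right after a replay.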

License

MIT. See LICENSE.
