diff --git a/Cargo.lock b/Cargo.lock index d5fbacdf..d8d79e51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,9 +462,11 @@ dependencies = [ "config", "crossbeam-channel", "dhat", + "futures", "hex", "hiro-system-kit", "lazy_static", + "redis", "reqwest 0.12.15", "rocket", "schemars", @@ -752,6 +754,20 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "config" version = "1.0.0" @@ -810,6 +826,12 @@ dependencies = [ "libc", ] +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32fast" version = "1.4.2" @@ -2062,9 +2084,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.171" +version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" [[package]] name = "libloading" @@ -2340,6 +2362,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -2356,6 +2388,15 @@ dependencies = [ "itoa", ] +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -2891,6 +2932,32 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redis" +version = "0.32.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd3650deebc68526b304898b192fa4102a4ef0b9ada24da096559cb60e0eef8" +dependencies = [ + "bytes", + "cfg-if", + "combine", + "crc16", + "futures-sink", + "futures-util", + "itoa", + "log", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "rand 0.9.0", + "ryu", + "sha1_smol", + "socket2 0.6.0", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.10" @@ -3582,6 +3649,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -3731,6 +3804,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "spin" version = "0.9.8" diff --git a/components/bitcoind/Cargo.toml b/components/bitcoind/Cargo.toml index 17f0a195..609e6604 100644 --- a/components/bitcoind/Cargo.toml +++ b/components/bitcoind/Cargo.toml @@ -30,6 +30,8 @@ lazy_static = "1.4.0" schemars = { version = "0.8.16", git = "https://github.com/hirosystems/schemars.git", branch = "feat-chainhook-fixes" } strum = { version = "0.23.0", features = ["derive"] } dhat = { version = "0.3.3", optional = true } +redis = { version = "0.32.5", features = ["aio", "tokio-comp", "cluster-async", "sentinel"] } +futures = "0.3" [dev-dependencies] assert-json-diff = "2.0.2" diff --git a/components/bitcoind/src/utils/mod.rs b/components/bitcoind/src/utils/mod.rs index 0071a0ef..be034048 100644 --- a/components/bitcoind/src/utils/mod.rs +++ b/components/bitcoind/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod bitcoind; +pub mod redis_notifier; use std::{ collections::{BTreeSet, VecDeque}, diff --git a/components/bitcoind/src/utils/redis_notifier.rs b/components/bitcoind/src/utils/redis_notifier.rs new file mode 100644 index 00000000..cd403dc8 --- /dev/null +++ b/components/bitcoind/src/utils/redis_notifier.rs @@ -0,0 +1,560 @@ +use std::sync::{Mutex, OnceLock}; + +use bitcoin::Network; +use config::{BitcoindConfig, RedisConfig}; +use redis::{aio::MultiplexedConnection, cluster::ClusterClient, Client, IntoConnectionInfo}; +use serde::Serialize; + +use crate::types::BlockIdentifier; + +#[derive(Serialize)] +struct IndexProgressPayload<'a> { + chain: &'a str, + network: &'a str, + indexer: &'a str, + apply_blocks: Vec, + rollback_blocks: Vec, +} + +#[derive(Serialize)] +struct Message<'a> { + id: String, + payload: IndexProgressPayload<'a>, +} + +#[derive(Serialize)] +struct BlockIndexRef { + hash: String, + index: u64, +} + +pub struct RedisNotifier { + pub queue: String, + client: RedisClientKind, + sentinel_cfg: Option<(Vec, String)>, + retry_attempts: u32, + retry_backoff_ms: u64, + database: Option, +} + +enum RedisClientKind { + Single(Client), + Cluster(ClusterClient), +} + +// No separate connection enum; we connect per attempt in `attempt_push` + +impl RedisNotifier { + pub fn new(redis: &RedisConfig) -> Result { + let client = build_redis_client(redis)?; + let sentinel_cfg = match (&redis.sentinel_nodes, &redis.sentinel_master_name) { + (Some(nodes), Some(master)) if !nodes.is_empty() => { + Some((nodes.clone(), master.clone())) + } + _ => None, + }; + Ok(Self { + queue: redis.queue.clone(), + client, + sentinel_cfg, + retry_attempts: redis.retry_attempts.unwrap_or(3), + retry_backoff_ms: redis.retry_backoff_ms.unwrap_or(200), + database: redis.database, + }) + } + + fn network_str(bitcoind: &BitcoindConfig) -> &'static str { + match bitcoind.network { + Network::Bitcoin => "mainnet", + Network::Testnet => "testnet", + Network::Regtest => todo!(), + Network::Signet => todo!(), + _ => todo!(), + } + } + + pub async fn notify( + &self, + indexer: &str, + bitcoind: &BitcoindConfig, + apply_blocks: &[BlockIdentifier], + rollback_blocks: &[BlockIdentifier], + ) -> Result<(), String> { + let payload = build_message_json(indexer, bitcoind, apply_blocks, rollback_blocks)?; + // In-test capture path (no real Redis required) + if std::env::var("REDIS_TEST_CAPTURE").ok().as_deref() == Some("1") { + capture_messages().lock().unwrap().push(payload); + return Ok(()); + } + + let mut attempts = 0; + loop { + if let Err(e) = self.attempt_push(&payload).await { + attempts += 1; + if attempts >= self.retry_attempts { + return Err(format!( + "unable to rpush redis message after {attempts} attempts: {e}" + )); + } + // If configured for Sentinel, attempt a fresh master resolution and push immediately + if let Some((nodes, master)) = &self.sentinel_cfg { + if let Some(master_url) = + resolve_sentinel_master(nodes, master, self.database).await + { + if let Ok(client) = Client::open(master_url) { + // Create a scoped connection that will be automatically dropped + let push_result = { + let mut single_conn = client + .get_multiplexed_tokio_connection() + .await + .map_err(|e| format!("unable to get sentinel connection: {e}")); + match single_conn { + Ok(ref mut conn) => redis::cmd("RPUSH") + .arg(&self.queue) + .arg(&payload) + .query_async(conn) + .await + .map(|_: ()| ()), + Err(e) => Err(redis::RedisError::from(( + redis::ErrorKind::IoError, + "connection error", + e, + ))), + } + }; // Connection is dropped here + if push_result.is_ok() { + return Ok(()); + } + } + } + } + tokio::time::sleep(std::time::Duration::from_millis( + self.retry_backoff_ms * attempts as u64, + )) + .await; + continue; + } + return Ok(()); + } + } + + async fn attempt_push(&self, payload: &str) -> Result<(), String> { + // Use scoped connections to ensure they're properly dropped after use + match &self.client { + RedisClientKind::Single(client) => { + // Connection is scoped and dropped at the end of this block + let mut conn: MultiplexedConnection = client + .get_multiplexed_tokio_connection() + .await + .map_err(|e| format!("unable to get redis connection: {e}"))?; + redis::cmd("RPUSH") + .arg(&self.queue) + .arg(payload) + .query_async(&mut conn) + .await + .map(|_: ()| ()) + .map_err(|e| format!("unable to rpush redis message: {e}")) + } + RedisClientKind::Cluster(client) => { + // Connection is scoped and dropped at the end of this block + let mut conn = client + .get_async_connection() + .await + .map_err(|e| format!("unable to get redis cluster connection: {e}"))?; + redis::cmd("RPUSH") + .arg(&self.queue) + .arg(payload) + .query_async(&mut conn) + .await + .map(|_: ()| ()) + .map_err(|e| format!("unable to rpush redis message: {e}")) + } + } + } +} + +fn build_redis_client(redis: &RedisConfig) -> Result { + // Prefer cluster when cluster_nodes set + if let Some(nodes) = &redis.cluster_nodes { + if !nodes.is_empty() { + let client = ClusterClient::new(nodes.clone()) + .map_err(|e| format!("unable to create redis cluster client: {e}"))?; + return Ok(RedisClientKind::Cluster(client)); + } + } + // If sentinel provided, resolve master and connect as Single + if let (Some(sentinels), Some(master)) = (&redis.sentinel_nodes, &redis.sentinel_master_name) { + if !sentinels.is_empty() { + if let Some(master_url) = futures::executor::block_on(resolve_sentinel_master( + sentinels, + master, + redis.database, + )) { + let client = Client::open(master_url) + .map_err(|e| format!("unable to connect to sentinel-resolved master: {e}"))?; + return Ok(RedisClientKind::Single(client)); + } else { + return Err(format!("unable to resolve sentinel master '{}'", master)); + } + } + } + // Fallbacks via comma-separated URL(s) + let endpoints: Vec<&str> = redis + .url + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect(); + let endpoints = if endpoints.is_empty() { + vec![redis.url.as_str()] + } else { + endpoints + }; + for ep in endpoints { + if let Ok(info) = ep.into_connection_info() { + if let Ok(client) = Client::open(info) { + return Ok(RedisClientKind::Single(client)); + } + } + } + Err("unable to connect to any configured redis endpoint".to_string()) +} + +async fn resolve_sentinel_master( + sentinels: &Vec, + master: &str, + database: Option, +) -> Option { + for s in sentinels { + if let Ok(client) = Client::open(s.as_str()) { + if let Ok(mut conn) = client.get_multiplexed_tokio_connection().await { + let res: redis::RedisResult> = redis::cmd("SENTINEL") + .arg("get-master-addr-by-name") + .arg(master) + .query_async(&mut conn) + .await; + if let Ok(v) = res { + if v.len() == 2 { + let host = &v[0]; + let port = &v[1]; + let db = database.unwrap_or(0); + return Some(format!("redis://{}:{}/{}", host, port, db)); + } + } + } + } + } + None +} + +pub fn ensure_0x>(s: S) -> String { + let v = s.as_ref(); + if v.starts_with("0x") || v.starts_with("0X") { + v.to_string() + } else { + format!("0x{}", v) + } +} + +static CAPTURED_MESSAGES: OnceLock>> = OnceLock::new(); + +fn capture_messages() -> &'static Mutex> { + CAPTURED_MESSAGES.get_or_init(|| Mutex::new(Vec::new())) +} + +#[cfg(test)] +fn take_captured_messages() -> Vec { + let mut guard = capture_messages().lock().unwrap(); + let messages = guard.clone(); + guard.clear(); + messages +} + +fn js_sys_time_ms() -> u128 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or(0) +} + +pub fn build_message_json( + indexer: &str, + bitcoind: &BitcoindConfig, + apply_blocks: &[BlockIdentifier], + rollback_blocks: &[BlockIdentifier], +) -> Result { + let network = RedisNotifier::network_str(bitcoind); + let id = format!( + "bitcoin-{}-{}-{}", + indexer, + apply_blocks + .last() + .map(|b| b.index.to_string()) + .unwrap_or_else(|| "na".to_string()), + js_sys_time_ms() + ); + let message = Message { + id, + payload: IndexProgressPayload { + chain: "bitcoin", + network, + indexer, + apply_blocks: apply_blocks + .iter() + .map(|b| BlockIndexRef { + hash: ensure_0x(&b.hash), + index: b.index, + }) + .collect(), + rollback_blocks: rollback_blocks + .iter() + .map(|b| BlockIndexRef { + hash: ensure_0x(&b.hash), + index: b.index, + }) + .collect(), + }, + }; + serde_json::to_string(&message).map_err(|e| format!("unable to serialize redis message: {e}")) +} + +#[cfg(test)] +mod tests { + use bitcoin::Network; + use config::{BitcoindConfig, RedisConfig}; + use tokio::runtime::Runtime; + + use super::{build_message_json, ensure_0x, take_captured_messages, RedisNotifier}; + use crate::types::BlockIdentifier; + + fn sample_bitcoind(network: Network) -> BitcoindConfig { + BitcoindConfig { + network, + rpc_url: "http://localhost:18443".into(), + rpc_username: "user".into(), + rpc_password: "pass".into(), + zmq_url: "tcp://127.0.0.1:28332".into(), + } + } + + #[test] + fn test_build_message_json_for_runes() { + let bitcoind = sample_bitcoind(Network::Testnet); + let apply = vec![ + BlockIdentifier { + index: 100, + hash: ensure_0x("abc"), + }, + BlockIdentifier { + index: 101, + hash: ensure_0x("def"), + }, + ]; + let rollback = vec![BlockIdentifier { + index: 99, + hash: ensure_0x("999"), + }]; + let json = build_message_json("runes", &bitcoind, &apply, &rollback).unwrap(); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert!(v["id"].as_str().unwrap().starts_with("bitcoin-runes-")); + assert_eq!(v["payload"]["chain"], "bitcoin"); + assert_eq!(v["payload"]["indexer"], "runes"); + assert_eq!(v["payload"]["network"], "testnet"); + let apply_blocks = v["payload"]["apply_blocks"].as_array().unwrap(); + assert_eq!(apply_blocks.len(), 2); + assert_eq!(apply_blocks[0]["hash"], "0xabc"); + assert_eq!(apply_blocks[0]["index"], 100); + assert_eq!(apply_blocks[1]["hash"], "0xdef"); + assert_eq!(apply_blocks[1]["index"], 101); + let rollback_blocks = v["payload"]["rollback_blocks"].as_array().unwrap(); + assert_eq!(rollback_blocks.len(), 1); + assert_eq!(rollback_blocks[0]["hash"], "0x999"); + assert_eq!(rollback_blocks[0]["index"], 99); + } + + #[test] + fn test_build_message_json_for_ordinals() { + let bitcoind = sample_bitcoind(Network::Testnet); + let apply = vec![BlockIdentifier { + index: 800_000, + hash: ensure_0x("feed"), + }]; + let rollback: Vec = vec![]; + let json = build_message_json("ordinals", &bitcoind, &apply, &rollback).unwrap(); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert!(v["id"].as_str().unwrap().starts_with("bitcoin-ordinals-")); + assert_eq!(v["payload"]["chain"], "bitcoin"); + assert_eq!(v["payload"]["indexer"], "ordinals"); + assert_eq!(v["payload"]["network"], "testnet"); + let apply_blocks = v["payload"]["apply_blocks"].as_array().unwrap(); + assert_eq!(apply_blocks.len(), 1); + assert_eq!(apply_blocks[0]["hash"], "0xfeed"); + assert_eq!(apply_blocks[0]["index"], 800_000); + let rollback_blocks = v["payload"]["rollback_blocks"].as_array().unwrap(); + assert!(rollback_blocks.is_empty()); + } + + #[test] + fn test_build_message_json_for_runes_reorg() { + let bitcoind = sample_bitcoind(Network::Testnet); + let apply = vec![ + BlockIdentifier { + index: 2, + hash: ensure_0x("1235aa"), + }, + BlockIdentifier { + index: 3, + hash: ensure_0x("1236"), + }, + ]; + let rollback = vec![BlockIdentifier { + index: 2, + hash: ensure_0x("1235"), + }]; + let json = build_message_json("runes", &bitcoind, &apply, &rollback).unwrap(); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert!(v["id"].as_str().unwrap().starts_with("bitcoin-runes-")); + assert_eq!(v["payload"]["chain"], "bitcoin"); + assert_eq!(v["payload"]["indexer"], "runes"); + assert_eq!(v["payload"]["network"], "testnet"); + let apply_blocks = v["payload"]["apply_blocks"].as_array().unwrap(); + assert_eq!(apply_blocks.len(), 2); + assert_eq!(apply_blocks[0]["hash"], "0x1235aa"); + assert_eq!(apply_blocks[0]["index"], 2); + assert_eq!(apply_blocks[1]["hash"], "0x1236"); + assert_eq!(apply_blocks[1]["index"], 3); + let rollback_blocks = v["payload"]["rollback_blocks"].as_array().unwrap(); + assert_eq!(rollback_blocks.len(), 1); + assert_eq!(rollback_blocks[0]["hash"], "0x1235"); + assert_eq!(rollback_blocks[0]["index"], 2); + } + + #[test] + fn test_build_message_json_for_ordinals_reorg() { + let bitcoind = sample_bitcoind(Network::Testnet); + let apply = vec![ + BlockIdentifier { + index: 820_001, + hash: ensure_0x("abc1"), + }, + BlockIdentifier { + index: 820_002, + hash: ensure_0x("abc2"), + }, + ]; + let rollback = vec![BlockIdentifier { + index: 820_001, + hash: ensure_0x("old1"), + }]; + let json = build_message_json("ordinals", &bitcoind, &apply, &rollback).unwrap(); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert!(v["id"].as_str().unwrap().starts_with("bitcoin-ordinals-")); + assert_eq!(v["payload"]["chain"], "bitcoin"); + assert_eq!(v["payload"]["indexer"], "ordinals"); + assert_eq!(v["payload"]["network"], "testnet"); + let apply_blocks = v["payload"]["apply_blocks"].as_array().unwrap(); + assert_eq!(apply_blocks.len(), 2); + assert_eq!(apply_blocks[0]["hash"], "0xabc1"); + assert_eq!(apply_blocks[0]["index"], 820_001); + assert_eq!(apply_blocks[1]["hash"], "0xabc2"); + assert_eq!(apply_blocks[1]["index"], 820_002); + let rollback_blocks = v["payload"]["rollback_blocks"].as_array().unwrap(); + assert_eq!(rollback_blocks.len(), 1); + assert_eq!(rollback_blocks[0]["hash"], "0xold1"); + assert_eq!(rollback_blocks[0]["index"], 820_001); + } + + #[test] + fn test_notifier_pushes_captured_message() { + // Enable capture via env var and run notify; ensure a payload is captured + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + std::env::set_var("REDIS_TEST_CAPTURE", "1"); + let notifier = RedisNotifier::new(&RedisConfig { + enabled: true, + url: "redis://127.0.0.1:6379/0".into(), + queue: "test-queue".into(), + database: None, + cluster_nodes: None, + sentinel_nodes: None, + sentinel_master_name: None, + username: None, + password: None, + retry_attempts: None, + retry_backoff_ms: None, + connection_timeout_ms: None, + command_timeout_ms: None, + }) + .unwrap(); + let bitcoind = sample_bitcoind(Network::Testnet); + let apply = vec![BlockIdentifier { + index: 1, + hash: ensure_0x("1234"), + }]; + let rollback: Vec = vec![]; + notifier + .notify("runes", &bitcoind, &apply, &rollback) + .await + .unwrap(); + std::env::remove_var("REDIS_TEST_CAPTURE"); + }); + let captured = take_captured_messages(); + assert_eq!(captured.len(), 1); + let v: serde_json::Value = serde_json::from_str(&captured[0]).unwrap(); + assert_eq!(v["payload"]["chain"], "bitcoin"); + assert_eq!(v["payload"]["indexer"], "runes"); + assert_eq!(v["payload"]["network"], "testnet"); + let apply_blocks = v["payload"]["apply_blocks"].as_array().unwrap(); + assert_eq!(apply_blocks.len(), 1); + assert_eq!(apply_blocks[0]["hash"], "0x1234"); + assert_eq!(apply_blocks[0]["index"], 1); + } + + #[test] + #[ignore = "requires local Redis at redis://127.0.0.1:6379/0"] + fn notify_pushes_to_redis() { + use tokio::runtime::Runtime; + let rt = Runtime::new().unwrap(); + rt.block_on(async { + let queue = "it_test_q"; + let notifier = RedisNotifier::new(&RedisConfig { + enabled: true, + url: "redis://127.0.0.1:6379/0".into(), + queue: queue.into(), + database: None, + cluster_nodes: None, + sentinel_nodes: None, + sentinel_master_name: None, + username: None, + password: None, + retry_attempts: None, + retry_backoff_ms: None, + connection_timeout_ms: None, + command_timeout_ms: None, + }) + .unwrap(); + + let bitcoind = sample_bitcoind(Network::Testnet); + let apply = vec![BlockIdentifier { + index: 2, + hash: "0xabc".into(), + }]; + notifier + .notify("ordinals", &bitcoind, &apply, &[]) + .await + .unwrap(); + + let client = redis::Client::open("redis://127.0.0.1:6379/0").unwrap(); + let mut conn = client.get_multiplexed_tokio_connection().await.unwrap(); + let msg: String = redis::cmd("RPOP") + .arg(queue) + .query_async(&mut conn) + .await + .unwrap(); + let v: serde_json::Value = serde_json::from_str(&msg).unwrap(); + assert_eq!(v["payload"]["indexer"], "ordinals"); + assert_eq!(v["payload"]["apply_blocks"][0]["hash"], "0xabc"); + }); + } +} diff --git a/components/config/src/config.rs b/components/config/src/config.rs index 7905da05..fe707d43 100644 --- a/components/config/src/config.rs +++ b/components/config/src/config.rs @@ -20,6 +20,7 @@ pub struct Config { pub resources: ResourcesConfig, pub storage: StorageConfig, pub metrics: Option, + pub redis: Option, } #[derive(Clone, Debug)] @@ -78,6 +79,23 @@ pub struct MetricsConfig { pub prometheus_port: u16, } +#[derive(Clone, Debug, Default)] +pub struct RedisConfig { + pub enabled: bool, + pub url: String, + pub queue: String, + pub database: Option, // Redis database number (0-15) + pub cluster_nodes: Option>, // for Redis Cluster + pub sentinel_nodes: Option>, // for Redis Sentinel + pub sentinel_master_name: Option, + pub username: Option, + pub password: Option, + pub retry_attempts: Option, + pub retry_backoff_ms: Option, + pub connection_timeout_ms: Option, + pub command_timeout_ms: Option, +} + #[derive(Deserialize, Debug, Clone)] pub struct ResourcesConfig { pub ulimit: usize, @@ -156,6 +174,21 @@ impl Config { enabled: true, prometheus_port: 9153, }), + redis: Some(RedisConfig { + enabled: true, + url: "redis://127.0.0.1:6379/0".into(), + queue: "bitcoin:index-progress".into(), + database: None, // defaults to 0 + cluster_nodes: None, + sentinel_nodes: None, + sentinel_master_name: None, + username: None, + password: None, + retry_attempts: None, + retry_backoff_ms: None, + connection_timeout_ms: None, + command_timeout_ms: None, + }), } } diff --git a/components/config/src/generator.rs b/components/config/src/generator.rs index 756be7f8..08f5e244 100644 --- a/components/config/src/generator.rs +++ b/components/config/src/generator.rs @@ -49,6 +49,11 @@ memory_available = 16 bitcoind_rpc_threads = 2 bitcoind_rpc_timeout = 15 indexer_channel_capacity = 10 + +[redis] +enabled = true +url = "redis://127.0.0.1:6379/0" +queue = "bitcoin:index-progress" "#, network = network.to_lowercase(), ); diff --git a/components/config/src/toml.rs b/components/config/src/toml.rs index 7de33d76..5fc4bdc7 100644 --- a/components/config/src/toml.rs +++ b/components/config/src/toml.rs @@ -7,9 +7,10 @@ use bitcoin::Network; use crate::{ BitcoindConfig, Config, MetricsConfig, OrdinalsBrc20Config, OrdinalsConfig, - OrdinalsMetaProtocolsConfig, PgDatabaseConfig, ResourcesConfig, RunesConfig, StorageConfig, - DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT, DEFAULT_INDEXER_CHANNEL_CAPACITY, - DEFAULT_LRU_CACHE_SIZE, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT, DEFAULT_WORKING_DIR, + OrdinalsMetaProtocolsConfig, PgDatabaseConfig, RedisConfig, ResourcesConfig, RunesConfig, + StorageConfig, DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT, + DEFAULT_INDEXER_CHANNEL_CAPACITY, DEFAULT_LRU_CACHE_SIZE, DEFAULT_MEMORY_AVAILABLE, + DEFAULT_ULIMIT, DEFAULT_WORKING_DIR, }; #[derive(Deserialize, Clone, Debug)] @@ -91,6 +92,23 @@ pub struct MetricsConfigToml { pub prometheus_port: u16, } +#[derive(Deserialize, Debug, Clone)] +pub struct RedisConfigToml { + pub enabled: bool, + pub url: String, + pub queue: String, + pub database: Option, // Redis database number (0-15) + pub cluster_nodes: Option>, // e.g. ["redis://host1:6379", "redis://host2:6379"] + pub sentinel_nodes: Option>, // e.g. ["redis://sentinel1:26379", "redis://sentinel2:26379"] + pub sentinel_master_name: Option, // e.g. "mymaster" + pub username: Option, + pub password: Option, + pub retry_attempts: Option, + pub retry_backoff_ms: Option, + pub connection_timeout_ms: Option, + pub command_timeout_ms: Option, +} + #[derive(Deserialize, Debug, Clone)] pub struct ConfigToml { pub storage: StorageConfigToml, @@ -99,6 +117,7 @@ pub struct ConfigToml { pub bitcoind: BitcoindConfigToml, pub resources: ResourcesConfigToml, pub metrics: Option, + pub redis: Option, } impl ConfigToml { @@ -160,6 +179,24 @@ impl ConfigToml { enabled: metrics.enabled, prometheus_port: metrics.prometheus_port, }); + let redis = match toml.redis { + Some(redis) => Some(RedisConfig { + enabled: true, + url: redis.url, + queue: redis.queue, + database: redis.database, + cluster_nodes: redis.cluster_nodes, + sentinel_nodes: redis.sentinel_nodes, + sentinel_master_name: redis.sentinel_master_name, + username: redis.username, + password: redis.password, + retry_attempts: redis.retry_attempts, + retry_backoff_ms: redis.retry_backoff_ms, + connection_timeout_ms: redis.connection_timeout_ms, + command_timeout_ms: redis.command_timeout_ms, + }), + None => None, + }; let config = Config { storage: StorageConfig { @@ -198,6 +235,7 @@ impl ConfigToml { zmq_url: toml.bitcoind.zmq_url, }, metrics, + redis, }; Ok(config) } diff --git a/components/ordinals/src/lib.rs b/components/ordinals/src/lib.rs index 68ec4756..049b206a 100644 --- a/components/ordinals/src/lib.rs +++ b/components/ordinals/src/lib.rs @@ -19,7 +19,7 @@ use std::{ use bitcoind::{ start_bitcoin_indexer, try_debug, try_info, types::BlockIdentifier, - utils::{future_block_on, Context}, + utils::{future_block_on, redis_notifier::RedisNotifier, Context}, Indexer, IndexerCommand, }; use config::Config; @@ -83,6 +83,13 @@ async fn new_ordinals_indexer_runloop( let mut sequence_cursor = SequenceCursor::new(); let mut brc20_cache: Option = brc20_new_cache(&config_moved); + + let redis_notifier = if let Some(redis_cfg) = &config_moved.redis { + Some(RedisNotifier::new(redis_cfg)?) + } else { + None + }; + loop { if abort_signal_moved.load(Ordering::SeqCst) { break; @@ -148,6 +155,22 @@ async fn new_ordinals_indexer_runloop( cache_l2.clear(); garbage_collect_nth_block = 0; } + // Notify redis with both apply and rollback blocks, if enabled + if let Some(notifier) = &redis_notifier { + let apply_refs: Vec = apply_blocks + .iter() + .map(|b| b.block_identifier.clone()) + .collect(); + let rollback_refs = rollback_block_ids.to_vec(); + notifier + .notify( + "ordinals", + &config_moved.bitcoind, + &apply_refs, + &rollback_refs, + ) + .await?; + } } IndexerCommand::Terminate => { break; diff --git a/components/runes/src/db/index.rs b/components/runes/src/db/index.rs index 38e9b00a..6fc274b3 100644 --- a/components/runes/src/db/index.rs +++ b/components/runes/src/db/index.rs @@ -22,7 +22,7 @@ use crate::{ pub fn get_rune_genesis_block_height(network: Network) -> u64 { match network { Network::Bitcoin => 840_000, - Network::Testnet => todo!(), + Network::Testnet => 0, Network::Signet => todo!(), Network::Regtest => todo!(), _ => todo!(), diff --git a/components/runes/src/lib.rs b/components/runes/src/lib.rs index 796d023c..e3e45680 100644 --- a/components/runes/src/lib.rs +++ b/components/runes/src/lib.rs @@ -9,7 +9,7 @@ use std::{ use bitcoind::{ start_bitcoin_indexer, try_error, try_info, try_warn, types::BlockIdentifier, - utils::{future_block_on, Context}, + utils::{future_block_on, redis_notifier::RedisNotifier, Context}, Indexer, IndexerCommand, }; use config::Config; @@ -52,6 +52,13 @@ async fn new_runes_indexer_runloop( let _profiler = dhat::Profiler::new_heap(); let mut index_cache = IndexCache::new(&config_moved, &pg_pool_moved).await; + + let redis_notifier = if let Some(redis_cfg) = &config_moved.redis { + Some(RedisNotifier::new(redis_cfg)?) + } else { + None + }; + loop { if abort_signal_moved.load(Ordering::SeqCst) { break; @@ -87,6 +94,22 @@ async fn new_runes_indexer_runloop( ) .await; } + // Notify redis with both apply and rollback blocks, if enabled + if let Some(notifier) = &redis_notifier { + let apply_refs: Vec = apply_blocks + .iter() + .map(|b| b.block_identifier.clone()) + .collect(); + let rollback_refs = rollback_block_ids.to_vec(); + notifier + .notify( + "runes", + &config_moved.bitcoind, + &apply_refs, + &rollback_refs, + ) + .await?; + } } IndexerCommand::Terminate => { break;