Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
270e6ae
ref(options): migrate enable_any_attribute_filter to sentry-options
claude Jun 23, 2026
16891b4
ref(options): migrate read-only RPC/query runtime configs to sentry-o…
claude Jun 23, 2026
3462c2b
ref(options): migrate static Rust consumer runtime configs to sentry-…
claude Jun 23, 2026
88bbe22
ref(options): migrate db_query cache flags to sentry-options
claude Jun 23, 2026
fd5e12d
ref(options): harden get_option against unexpected client errors
claude Jun 24, 2026
5edba9d
ref(options): migrate clickhouse-http and utils config knobs
claude Jun 24, 2026
68f61da
ref(options): migrate query-processor and RPC flags to sentry-options
claude Jun 24, 2026
43b439d
ref(options): migrate storage-selector and consumer flags to sentry-o…
claude Jun 24, 2026
c4300a3
ref(options): keep http_batch_join_timeout on runtime config
claude Jun 24, 2026
65afd2b
ref(options): migrate read-through cache flags to sentry-options
claude Jun 24, 2026
c5d35d1
ref(options): migrate deletions subsystem flags to sentry-options
claude Jun 24, 2026
453e82e
ref(options): migrate routing/scheduler/validation flags to sentry-op…
claude Jun 24, 2026
a3f41e2
ref(options): migrate replacements subsystem flags to sentry-options
claude Jun 24, 2026
db76ae6
ref(options): migrate generic_metrics_use_case_killswitch to sentry-o…
claude Jun 24, 2026
7659d7a
ref(options): use get_bool_option for enable_any_attribute_filter
claude Jun 24, 2026
d147a49
Merge remote-tracking branch 'origin/master' into claude/upbeat-newto…
claude Jun 24, 2026
9d8db3c
style(rust): fix sentry_options import ordering for rustfmt
claude Jun 24, 2026
6d6e5aa
ref(options): support dynamic option names via dict-typed options
claude Jun 25, 2026
e49a098
fix(types): resolve pre-existing mypy errors in files touched by the …
claude Jun 25, 2026
81a5ede
fix(tests): force SENTRY_OPTIONS_DIR to in-repo schemas in conftest
claude Jun 25, 2026
2f39e3d
fix(tests): override max_group_ids_exclude via sentry-options in repl…
claude Jun 25, 2026
7763d9a
ref(options): migrate slicing_mega_cluster_partitions dynamic key to …
claude Jun 25, 2026
6a25d47
fix(tests): override storage-routing config via sentry-options in sel…
claude Jun 25, 2026
e43e7d7
ref(options): migrate MappingOptimizer hashmap killswitches to sentry…
claude Jun 25, 2026
f94aebf
ref(options): migrate per-storage Rust consumer configs to sentry-opt…
claude Jun 25, 2026
f676c09
ref(options): migrate query_settings to sentry-options dict options
claude Jun 25, 2026
e10a061
test(rust): cover dict-option read in get_dlq_grace_period_min
claude Jun 25, 2026
351352d
ref(options): migrate remaining read-only runtime configs to sentry-o…
claude Jun 25, 2026
f6c9db0
Merge remote-tracking branch 'origin/master' into claude/upbeat-newto…
claude Jun 25, 2026
43b0cc1
fix(options): migrate lightweight_delete_mode/lightweight_deletes_syn…
claude Jun 25, 2026
822c23e
ref(options): migrate replacements_expiry_window_minutes to sentry-op…
claude Jun 25, 2026
bae0bc1
ref(options): migrate ConfigurableComponent + routing-strategy config…
claude Jun 25, 2026
524da53
Merge remote-tracking branch 'origin/master' into claude/upbeat-newto…
claude Jun 25, 2026
edfdee3
fix(options): migrate use_array_map_columns_timestamp_seconds after m…
claude Jun 25, 2026
833e6a0
fix(tests): patch get_int_option in replacements-expiry test after mi…
claude Jun 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"sentry-arroyo>=2.39.2",
"sentry-conventions>=0.13.0",
"sentry-kafka-schemas>=2.1.36",
"sentry-options>=1.1.1",
"sentry-protos>=0.34.0",
"sentry-redis-tools>=0.5.1",
"sentry-relay>=0.9.25",
Expand Down
47 changes: 42 additions & 5 deletions rust_snuba/src/processors/eap_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use uuid::Uuid;

use sentry_arroyo::backends::kafka::types::KafkaPayload;
use sentry_arroyo::counter;
use sentry_options::options;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{ArrayValue, TraceItem, TraceItemType};

Expand All @@ -18,7 +19,6 @@ use crate::processors::utils::{
enforce_retention, get_drop_invalid_timestamps_enabled, out_of_valid_interval_secs,
record_invalid_timestamp_metric, SilencedDLQMessage,
};
use crate::runtime_config::get_str_config;
use crate::strategies::clickhouse::rowbinary;
use crate::types::CogsData;
use crate::types::{item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata};
Expand Down Expand Up @@ -153,10 +153,10 @@ fn get_dlq_grace_period_min(storage_name: &str) -> Option<i64> {
if storage_name.is_empty() {
return None;
}
get_str_config(&format!("{DLQ_GRACE_PERIOD_MIN_KEY}:{storage_name}"))
options("snuba")
.ok()
.flatten()
.and_then(|s| s.parse::<i64>().ok())
.and_then(|o| o.get(DLQ_GRACE_PERIOD_MIN_KEY).ok())
.and_then(|v| v.get(storage_name).and_then(|n| n.as_i64()))
.filter(|&n| n >= 0)
Comment thread
sentry[bot] marked this conversation as resolved.
}

Expand Down Expand Up @@ -662,12 +662,21 @@ mod tests {
use std::time::SystemTime;

use prost_types::Timestamp;
use sentry_options::init_with_schemas;
use sentry_options::testing::override_options;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItemType};
use serde::Deserialize;
use serde_json::json;
use std::sync::Once;

use super::*;

static INIT: Once = Once::new();
fn init_options() {
INIT.call_once(|| init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap());
}

fn generate_trace_item(item_id: Uuid) -> TraceItem {
TraceItem {
attributes: Default::default(),
Expand Down Expand Up @@ -1121,10 +1130,38 @@ mod tests {

#[test]
fn test_get_dlq_grace_period_min_unset_storage_returns_none() {
// Empty storage_name short-circuits to None without hitting Python.
// Empty storage_name short-circuits to None without reading options.
assert_eq!(get_dlq_grace_period_min(""), None);
}

#[test]
fn test_get_dlq_grace_period_min_reads_dict_option() {
init_options();
{
let _guard = override_options(&[(
"snuba",
"eap_items_dlq_grace_period_min",
json!({ "eap_items_dlq_test": 45 }),
)])
.unwrap();
// Reads the per-storage entry out of the dict option (the nested get
// on the serde_json::Value returned by options(...).get(...)).
assert_eq!(get_dlq_grace_period_min("eap_items_dlq_test"), Some(45));
// A storage with no entry in the dict falls back to None.
assert_eq!(get_dlq_grace_period_min("eap_items_dlq_other"), None);
}
{
// Negative values are rejected.
let _guard = override_options(&[(
"snuba",
"eap_items_dlq_grace_period_min",
json!({ "eap_items_dlq_test": -1 }),
)])
.unwrap();
assert_eq!(get_dlq_grace_period_min("eap_items_dlq_test"), None);
}
}

#[test]
fn test_row_binary_basic_processing() {
let item_id = Uuid::new_v4();
Expand Down
25 changes: 14 additions & 11 deletions rust_snuba/src/processors/generic_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use adler::Adler32;
use anyhow::{anyhow, Context, Error};
use anyhow::{anyhow, Context};
use chrono::DateTime;
use sentry_options::options;
use serde::{
de::value::{MapAccessDeserializer, SeqAccessDeserializer},
Deserialize, Deserializer, Serialize,
};
use std::{collections::BTreeMap, marker::PhantomData, vec};

use crate::{
runtime_config::get_str_config,
types::{CogsData, InsertBatch, RowData},
KafkaMessageMetadata, ProcessorConfig,
};
Expand Down Expand Up @@ -339,8 +339,8 @@ impl Parse for CountersRawRow {
}
}

fn should_use_killswitch(config: Result<Option<String>, Error>, use_case: &MessageUseCase) -> bool {
if let Some(killswitch) = config.ok().flatten() {
fn should_use_killswitch(config: Option<String>, use_case: &MessageUseCase) -> bool {
if let Some(killswitch) = config {
return killswitch.contains(use_case.use_case_id.as_str());
}

Expand All @@ -355,7 +355,10 @@ where
T: Parse + Serialize,
{
let payload_bytes = payload.payload().context("Expected payload")?;
let killswitch_config = get_str_config("generic_metrics_use_case_killswitch");
let killswitch_config = options("snuba")
.ok()
.and_then(|o| o.get("generic_metrics_use_case_killswitch").ok())
.and_then(|v| v.as_str().map(String::from));
Comment thread
cursor[bot] marked this conversation as resolved.
let use_case: MessageUseCase = serde_json::from_slice(payload_bytes)?;

if should_use_killswitch(killswitch_config, &use_case) {
Expand Down Expand Up @@ -1358,7 +1361,7 @@ mod tests {

#[test]
fn test_shouldnt_killswitch() {
let fake_config = Ok(Some("[custom]".to_string()));
let fake_config = Some("[custom]".to_string());
let use_case = MessageUseCase {
use_case_id: "transactions".to_string(),
};
Expand All @@ -1371,7 +1374,7 @@ mod tests {
let use_case = MessageUseCase {
use_case_id: "transactions".to_string(),
};
let fake_config = Ok(Some("[transactions]".to_string()));
let fake_config = Some("[transactions]".to_string());

assert!(should_use_killswitch(fake_config, &use_case));
}
Expand All @@ -1381,7 +1384,7 @@ mod tests {
let use_case = MessageUseCase {
use_case_id: "transactions".to_string(),
};
let fake_config = Ok(Some("[transactions, custom]".to_string()));
let fake_config = Some("[transactions, custom]".to_string());

assert!(should_use_killswitch(fake_config, &use_case));
}
Expand All @@ -1391,7 +1394,7 @@ mod tests {
let use_case = MessageUseCase {
use_case_id: "transactions".to_string(),
};
let fake_config = Ok(Some("[]".to_string()));
let fake_config = Some("[]".to_string());

assert!(!should_use_killswitch(fake_config, &use_case));
}
Expand All @@ -1401,7 +1404,7 @@ mod tests {
let use_case = MessageUseCase {
use_case_id: "transactions".to_string(),
};
let fake_config = Ok(Some("".to_string()));
let fake_config = Some("".to_string());

assert!(!should_use_killswitch(fake_config, &use_case));
}
Expand All @@ -1411,7 +1414,7 @@ mod tests {
let use_case = MessageUseCase {
use_case_id: "transactions".to_string(),
};
let fake_config = Ok(None);
let fake_config: Option<String> = None;

assert!(!should_use_killswitch(fake_config, &use_case));
}
Expand Down
10 changes: 5 additions & 5 deletions rust_snuba/src/processors/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::config::EnvConfig;
use crate::runtime_config::get_str_config;
use crate::types::item_type_name;
use chrono::{DateTime, NaiveDateTime, Utc};
use schemars::JsonSchema;
use sentry_arroyo::counter;
use sentry_options::options;
use sentry_protos::snuba::v1::TraceItemType;
use serde::{Deserialize, Deserializer, Serialize};

Expand All @@ -16,7 +16,7 @@ pub const INVALID_TIMESTAMP_FUTURE_INTERVAL_SECONDS: i64 = 7 * 24 * 60 * 60;
/// timestamp dropping is enabled.
pub const INVALID_TIMESTAMP_PAST_INTERVAL_SECONDS: i64 = 30 * 24 * 60 * 60;

/// Runtime config key. When set to `"1"`, the eap-items consumer skips messages
/// sentry-options key. When `true`, the eap-items consumer skips messages
/// whose event `timestamp` is more than one week in the future or more than
/// thirty days in the past (see `out_of_valid_interval_secs`).
pub const DROP_INVALID_TIMESTAMPS_KEY: &str = "eap_items_drop_invalid_timestamps";
Expand All @@ -34,10 +34,10 @@ pub fn out_of_valid_interval_secs(ts: DateTime<Utc>, now: DateTime<Utc>) -> bool
}

pub fn get_drop_invalid_timestamps_enabled() -> bool {
get_str_config(DROP_INVALID_TIMESTAMPS_KEY)
options("snuba")
.ok()
.flatten()
.map(|s| s == "1")
.and_then(|o| o.get(DROP_INVALID_TIMESTAMPS_KEY).ok())
.and_then(|v| v.as_bool())
.unwrap_or(false)
}

Expand Down
65 changes: 33 additions & 32 deletions rust_snuba/src/rebalancing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime_config;
use sentry_options::options;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -29,45 +29,46 @@ pub fn delay_kafka_rebalance(configured_delay_secs: u64) {
}

pub fn get_rebalance_delay_secs(consumer_group: &str) -> Option<u64> {
runtime_config::get_str_config(
format!("quantized_rebalance_consumer_group_delay_secs__{consumer_group}").as_str(),
)
.ok()??
.parse()
.ok()
// Migrated from the per-group runtime config key
// `quantized_rebalance_consumer_group_delay_secs__<consumer_group>` to a
// single `snuba` sentry-options dict keyed by consumer group. A group with
// no entry (or a non-integer value) yields no delay.
options("snuba")
.ok()
.and_then(|o| o.get("quantized_rebalance_consumer_group_delay_secs").ok())
.and_then(|v| v.get(consumer_group).and_then(|n| n.as_u64()))
}

#[cfg(test)]
mod tests {
use super::*;
use sentry_options::init_with_schemas;
use sentry_options::testing::override_options;
use serde_json::json;
use std::sync::Once;

static INIT: Once = Once::new();

fn init_options() {
INIT.call_once(|| init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap());
}

#[test]
fn test_delay_config() {
// teardown, even when the test fails
let _guard = scopeguard::guard((), |_| {
runtime_config::patch_str_config_for_test(
"quantized_rebalance_consumer_group_delay_secs__spans",
None,
);
});
init_options();

// A consumer group with no entry in the dict yields no delay.
assert_eq!(get_rebalance_delay_secs("spans"), None);

let _guard = override_options(&[(
"snuba",
"quantized_rebalance_consumer_group_delay_secs",
json!({ "spans": 420 }),
)])
.unwrap();

runtime_config::patch_str_config_for_test(
"quantized_rebalance_consumer_group_delay_secs__spans",
None,
);
let delay_secs = get_rebalance_delay_secs("spans");
assert_eq!(delay_secs, None);
runtime_config::patch_str_config_for_test(
"quantized_rebalance_consumer_group_delay_secs__spans",
Some("420"),
);
let delay_secs = get_rebalance_delay_secs("spans");
assert_eq!(delay_secs, Some(420));
runtime_config::patch_str_config_for_test(
"quantized_rebalance_consumer_group_delay_secs__spans",
Some("garbage"),
);
let delay_secs = get_rebalance_delay_secs("spans");
assert_eq!(delay_secs, None);
// The configured group reads its delay; an unconfigured one stays None.
assert_eq!(get_rebalance_delay_secs("spans"), Some(420));
assert_eq!(get_rebalance_delay_secs("transactions"), None);
}
}
Loading
Loading