From 7f57e3960b603b54aaf6d49416ff3b6ece793891 Mon Sep 17 00:00:00 2001 From: Nathan Baker Date: Mon, 30 Mar 2026 16:27:39 -0400 Subject: [PATCH 1/3] Add ACR IPC protocol messages and extend check data types Protocol-level messages: - Hello - HelloResp - ConfigData - CheckDataMsg - CheckDataAck - CheckResultMsg Add Histogram and EventPlatformEvent data types to the CheckData oneof. Extend MetricType with MonotonicCount and Historate. Remove gRPC service definition as it is no longer needed. --- lib/protos/datadog/build.rs | 8 +- .../datadog/proto/checks/v1/acr_ipc.proto | 73 +++++++++++++++++++ .../datadog/proto/checks/v1/checks.proto | 14 +--- .../checks/v1/event_platform_event.proto | 8 ++ .../datadog/proto/checks/v1/histogram.proto | 12 +++ .../datadog/proto/checks/v1/metric.proto | 2 + 6 files changed, 105 insertions(+), 12 deletions(-) create mode 100644 lib/protos/datadog/proto/checks/v1/acr_ipc.proto create mode 100644 lib/protos/datadog/proto/checks/v1/event_platform_event.proto create mode 100644 lib/protos/datadog/proto/checks/v1/histogram.proto diff --git a/lib/protos/datadog/build.rs b/lib/protos/datadog/build.rs index 4c576e001b7..eebd344f233 100644 --- a/lib/protos/datadog/build.rs +++ b/lib/protos/datadog/build.rs @@ -146,17 +146,21 @@ fn main() { .expect("Failed to build gRPC service definitions for Datadog Agent."); tonic_prost_build::configure() - .build_server(true) + .build_server(false) + .build_client(false) .include_file("checks.mod.rs") .compile_protos( &[ "proto/checks/v1/checks.proto", + "proto/checks/v1/acr_ipc.proto", "proto/checks/v1/metric.proto", "proto/checks/v1/log.proto", "proto/checks/v1/service_check.proto", "proto/checks/v1/event.proto", + "proto/checks/v1/histogram.proto", + "proto/checks/v1/event_platform_event.proto", ], &["proto"], ) - .expect("Failed to build gRPC service definitions for Checks IPC."); + .expect("Failed to build checks proto definitions."); } diff --git a/lib/protos/datadog/proto/checks/v1/acr_ipc.proto b/lib/protos/datadog/proto/checks/v1/acr_ipc.proto new file mode 100644 index 00000000000..e274540af98 --- /dev/null +++ b/lib/protos/datadog/proto/checks/v1/acr_ipc.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package datadog.checks.v1; + +import "checks/v1/checks.proto"; + +// Handshake: client -> server +message Hello { + uint32 protocol_version = 1; + string client_id = 2; + string client_version = 3; +} + +// Handshake: server -> client +message HelloResp { + uint32 protocol_version = 1; + string server_id = 2; + bool accepted = 3; + string reject_reason = 4; +} + +// Configuration: server -> client (unsolicited) +message ConfigData { + EnrichmentData enrichment = 1; + repeated CheckInstance checks = 2; + map agent_config = 3; +} + +message EnrichmentData { + string hostname = 1; + map host_tags = 2; + string cluster_name = 3; + string agent_version = 4; + map config_values = 5; + uint64 process_start_time = 6; + K8sConnectionInfo k8s_connection_info = 7; +} + +message K8sConnectionInfo { + string api_server_url = 1; + string bearer_token = 2; +} + +message CheckInstance { + string check_name = 1; + string instance_name = 2; + string id = 3; + bytes config = 4; +} + +// Check data: client -> server (batched) +message CheckDataMsg { + uint64 sequence_id = 1; + string check_name = 2; + string instance_name = 3; + string check_id = 4; + repeated CheckData data = 5; +} + +// Check data acknowledgement: server -> client +message CheckDataAck { + uint64 sequence_id = 1; + bool success = 2; + string error = 3; +} + +// Check result: client -> server +message CheckResultMsg { + string check_name = 1; + string instance_name = 2; + string check_id = 3; + string error = 4; +} diff --git a/lib/protos/datadog/proto/checks/v1/checks.proto b/lib/protos/datadog/proto/checks/v1/checks.proto index 395f2c7395e..fde9f67beaa 100644 --- a/lib/protos/datadog/proto/checks/v1/checks.proto +++ b/lib/protos/datadog/proto/checks/v1/checks.proto @@ -3,6 +3,8 @@ syntax = "proto3"; package datadog.checks.v1; import "checks/v1/event.proto"; +import "checks/v1/event_platform_event.proto"; +import "checks/v1/histogram.proto"; import "checks/v1/log.proto"; import "checks/v1/metric.proto"; import "checks/v1/service_check.proto"; @@ -13,15 +15,7 @@ message CheckData { datadog.checks.v1.log.Log log = 2; datadog.checks.v1.service_check.ServiceCheck service_check = 3; datadog.checks.v1.event.Event event = 4; + datadog.checks.v1.histogram.Histogram histogram = 5; + datadog.checks.v1.event_platform_event.EventPlatformEvent event_platform_event = 6; } } - -message SendCheckPayloadRequest { - repeated CheckData data = 1; -} - -message SendCheckPayloadResponse {} - -service Checks { - rpc SendCheckPayload(SendCheckPayloadRequest) returns (SendCheckPayloadResponse); -} diff --git a/lib/protos/datadog/proto/checks/v1/event_platform_event.proto b/lib/protos/datadog/proto/checks/v1/event_platform_event.proto new file mode 100644 index 00000000000..b0f6bd5b517 --- /dev/null +++ b/lib/protos/datadog/proto/checks/v1/event_platform_event.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package datadog.checks.v1.event_platform_event; + +message EventPlatformEvent { + string event_data = 1; + string event_type = 2; +} diff --git a/lib/protos/datadog/proto/checks/v1/histogram.proto b/lib/protos/datadog/proto/checks/v1/histogram.proto new file mode 100644 index 00000000000..185885a808a --- /dev/null +++ b/lib/protos/datadog/proto/checks/v1/histogram.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package datadog.checks.v1.histogram; + +message Histogram { + string metric_name = 1; + int64 value = 2; + float lower_bound = 3; + float upper_bound = 4; + int32 monotonic = 5; + repeated string tags = 6; +} diff --git a/lib/protos/datadog/proto/checks/v1/metric.proto b/lib/protos/datadog/proto/checks/v1/metric.proto index f6c06aa1284..92af273a153 100644 --- a/lib/protos/datadog/proto/checks/v1/metric.proto +++ b/lib/protos/datadog/proto/checks/v1/metric.proto @@ -8,6 +8,8 @@ enum MetricType { METRIC_TYPE_RATE = 2; METRIC_TYPE_GAUGE = 3; METRIC_TYPE_HISTOGRAM = 4; + METRIC_TYPE_MONOTONIC_COUNT = 5; + METRIC_TYPE_HISTORATE = 6; } message Metric { From 550525e8353af4c7b3f9ae56cf33ac7fe4fc21ae Mon Sep 17 00:00:00 2001 From: Nathan Baker Date: Wed, 8 Apr 2026 17:07:38 -0400 Subject: [PATCH 2/3] Add gRPC service definition for ACR<->ADP IPC Restore the AcrIpc gRPC service with four RPCs: - Handshake (unary) - SendCheckData (unary with ACK) - SendCheckResult (unary) - StreamConfig (server streaming for config push). Re-enable server and client codegen for the checks proto compilation. --- lib/protos/datadog/build.rs | 4 ++-- .../datadog/proto/checks/v1/acr_ipc.proto | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/lib/protos/datadog/build.rs b/lib/protos/datadog/build.rs index eebd344f233..ec5c02d35a3 100644 --- a/lib/protos/datadog/build.rs +++ b/lib/protos/datadog/build.rs @@ -146,8 +146,8 @@ fn main() { .expect("Failed to build gRPC service definitions for Datadog Agent."); tonic_prost_build::configure() - .build_server(false) - .build_client(false) + .build_server(true) + .build_client(true) .include_file("checks.mod.rs") .compile_protos( &[ diff --git a/lib/protos/datadog/proto/checks/v1/acr_ipc.proto b/lib/protos/datadog/proto/checks/v1/acr_ipc.proto index e274540af98..165d63be364 100644 --- a/lib/protos/datadog/proto/checks/v1/acr_ipc.proto +++ b/lib/protos/datadog/proto/checks/v1/acr_ipc.proto @@ -71,3 +71,22 @@ message CheckResultMsg { string check_id = 3; string error = 4; } + +// Trivial request/response types for RPCs that don't need a body. +message StreamConfigRequest {} +message SendCheckResultResponse {} + +// gRPC service for ACR <-> ADP IPC. +service AcrIpc { + // Handshake — first call after connecting. + rpc Handshake(Hello) returns (HelloResp); + + // Check data submission with acknowledgement. + rpc SendCheckData(CheckDataMsg) returns (CheckDataAck); + + // Check result reporting. + rpc SendCheckResult(CheckResultMsg) returns (SendCheckResultResponse); + + // Server pushes config updates to the client as a stream. + rpc StreamConfig(StreamConfigRequest) returns (stream ConfigData); +} From 40ce8a839ba4e27205b3c6906234f5527f7a1fc2 Mon Sep 17 00:00:00 2001 From: Nathan Baker Date: Tue, 28 Apr 2026 14:03:02 -0400 Subject: [PATCH 3/3] extend Log; histogram -> DDSketch, openmetrics histogram bucket; add init_config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit extend Log: - Add fields to the Log message to support all the metadata the agent and ADP support - The fields map 1:1 onto ADP's internal Log struct Histogram / sketch: - Renamed histogram.proto to sketch.proto and replaced its contents with the DDSketch wire format used by the agent's SketchPayload - CheckData oneof slot 5 is repurposed to carry pre-aggregated sketches Note: lib/saluki-components/src/sources/checks_ipc/mod.rs is left broken — it still imports the old Checks service. init_config: - The check API has both `config` and `init_config`, and many integrations rely on the init_config block for shared defaults - The previous CheckInstance message only carried the per-instance config, so an ACR-scheduled check would receive an empty init_config openmetrics: - Add support for the new openmetrics histogram bucket type --- lib/protos/datadog/build.rs | 3 +- .../datadog/proto/checks/v1/acr_ipc.proto | 7 + .../datadog/proto/checks/v1/checks.proto | 11 +- .../datadog/proto/checks/v1/histogram.proto | 12 - .../proto/checks/v1/histogram_bucket.proto | 49 + lib/protos/datadog/proto/checks/v1/log.proto | 30 + .../datadog/proto/checks/v1/sketch.proto | 46 + .../src/sources/checks_ipc/mod.rs | 848 ++++++------------ 8 files changed, 418 insertions(+), 588 deletions(-) delete mode 100644 lib/protos/datadog/proto/checks/v1/histogram.proto create mode 100644 lib/protos/datadog/proto/checks/v1/histogram_bucket.proto create mode 100644 lib/protos/datadog/proto/checks/v1/sketch.proto diff --git a/lib/protos/datadog/build.rs b/lib/protos/datadog/build.rs index ec5c02d35a3..96562ce91f3 100644 --- a/lib/protos/datadog/build.rs +++ b/lib/protos/datadog/build.rs @@ -157,8 +157,9 @@ fn main() { "proto/checks/v1/log.proto", "proto/checks/v1/service_check.proto", "proto/checks/v1/event.proto", - "proto/checks/v1/histogram.proto", + "proto/checks/v1/sketch.proto", "proto/checks/v1/event_platform_event.proto", + "proto/checks/v1/histogram_bucket.proto", ], &["proto"], ) diff --git a/lib/protos/datadog/proto/checks/v1/acr_ipc.proto b/lib/protos/datadog/proto/checks/v1/acr_ipc.proto index 165d63be364..a9687154050 100644 --- a/lib/protos/datadog/proto/checks/v1/acr_ipc.proto +++ b/lib/protos/datadog/proto/checks/v1/acr_ipc.proto @@ -45,7 +45,14 @@ message CheckInstance { string check_name = 1; string instance_name = 2; string id = 3; + // The per-instance YAML/JSON block (a single entry from the + // integration's `instances:` list). bytes config = 4; + // The integration-level YAML/JSON `init_config:` block, shared + // across every instance of this integration. Required for any check + // that reads defaults at Configure time (credentials, base URLs, + // shared tags, etc.). + bytes init_config = 5; } // Check data: client -> server (batched) diff --git a/lib/protos/datadog/proto/checks/v1/checks.proto b/lib/protos/datadog/proto/checks/v1/checks.proto index fde9f67beaa..32c7b1b4da0 100644 --- a/lib/protos/datadog/proto/checks/v1/checks.proto +++ b/lib/protos/datadog/proto/checks/v1/checks.proto @@ -4,10 +4,11 @@ package datadog.checks.v1; import "checks/v1/event.proto"; import "checks/v1/event_platform_event.proto"; -import "checks/v1/histogram.proto"; +import "checks/v1/histogram_bucket.proto"; import "checks/v1/log.proto"; import "checks/v1/metric.proto"; import "checks/v1/service_check.proto"; +import "checks/v1/sketch.proto"; message CheckData { oneof data { @@ -15,7 +16,13 @@ message CheckData { datadog.checks.v1.log.Log log = 2; datadog.checks.v1.service_check.ServiceCheck service_check = 3; datadog.checks.v1.event.Event event = 4; - datadog.checks.v1.histogram.Histogram histogram = 5; + // Slot 5 was previously `histogram` (an unused stub). Repurposed + // to carry pre-aggregated DDSketches; the wire bytes from any + // legacy producer of the old Histogram message would be + // mis-parsed here, so don't rely on cross-version compatibility + // for slot 5 specifically. + datadog.checks.v1.sketch.Sketch sketch = 5; datadog.checks.v1.event_platform_event.EventPlatformEvent event_platform_event = 6; + datadog.checks.v1.histogram_bucket.HistogramBucket histogram_bucket = 7; } } diff --git a/lib/protos/datadog/proto/checks/v1/histogram.proto b/lib/protos/datadog/proto/checks/v1/histogram.proto deleted file mode 100644 index 185885a808a..00000000000 --- a/lib/protos/datadog/proto/checks/v1/histogram.proto +++ /dev/null @@ -1,12 +0,0 @@ -syntax = "proto3"; - -package datadog.checks.v1.histogram; - -message Histogram { - string metric_name = 1; - int64 value = 2; - float lower_bound = 3; - float upper_bound = 4; - int32 monotonic = 5; - repeated string tags = 6; -} diff --git a/lib/protos/datadog/proto/checks/v1/histogram_bucket.proto b/lib/protos/datadog/proto/checks/v1/histogram_bucket.proto new file mode 100644 index 00000000000..6f41c6af148 --- /dev/null +++ b/lib/protos/datadog/proto/checks/v1/histogram_bucket.proto @@ -0,0 +1,49 @@ +syntax = "proto3"; + +package datadog.checks.v1.histogram_bucket; + +// HistogramBucket carries a pre-aggregated histogram bucket produced +// by an OpenMetrics / Prometheus-style integration. It is the wire +// shape for both the agent's `Sender.HistogramBucket` and +// `Sender.OpenmetricsBucket` upcalls (see +// pkg/aggregator/sender/sender.go on the receiving side); the two +// differ only in whether the producer is emitting many buckets per +// (name, tags) context (`HistogramBucket`, multiple_buckets=true) or +// exactly one (`OpenmetricsBucket`, multiple_buckets=false, the +// default for Python `aggregator.submit_histogram_bucket`). +message HistogramBucket { + // Metric name (e.g. "http.request.duration.count"). + string name = 1; + + // Number of observations counted by this bucket. + int64 value = 2; + + // Bucket bounds. For point observations (single-value buckets), + // both bounds carry the same value. + double lower_bound = 3; + double upper_bound = 4; + + // True when value increases monotonically across calls and the + // receiver should compute per-bucket deltas client-side. False + // when the producer already emits deltas. + bool monotonic = 5; + + // Hostname this bucket was produced on. Empty means "use the + // receiving agent's default hostname". + string hostname = 6; + + // Tags in `key:value` form. + repeated string tags = 7; + + // When `monotonic=true`, indicates the receiver should emit the + // first observed value rather than wait for a second to compute a + // delta. Producers set this when they know the bucket started from + // zero recently (e.g. a freshly-scraped target). + bool flush_first_value = 8; + + // Distinguishes the two sender upcalls. False (the default) + // routes to `Sender.OpenmetricsBucket`; true routes to + // `Sender.HistogramBucket`. Python `submit_histogram_bucket` + // always lands as false. + bool multiple_buckets = 9; +} diff --git a/lib/protos/datadog/proto/checks/v1/log.proto b/lib/protos/datadog/proto/checks/v1/log.proto index baa60795090..79b912cb87b 100644 --- a/lib/protos/datadog/proto/checks/v1/log.proto +++ b/lib/protos/datadog/proto/checks/v1/log.proto @@ -12,7 +12,37 @@ enum LogLevel { LOG_LEVEL_CRITICAL = 50; } +// Log carries a single log record produced by an integration check. +// +// Fields 1 and 2 mirror the original minimal shape and stay for backward +// compatibility. Fields 3-8 are additive: proto3 default-empty values are +// wire-compatible with peers that don't populate them. message Log { string message = 1; LogLevel level = 2; + + // Integration source name (e.g. "mysql", "http_check"). Used by the + // logs intake to route and tag. + string source = 3; + + // Hostname the log was produced on. Empty means "use the receiving + // agent's default hostname". + string hostname = 4; + + // Service the log relates to. Surfaces as `service` in the intake. + string service = 5; + + // Log-level tags, in `key:value` form (matches the agent's wire + // format used elsewhere). Combined with host / integration tags + // downstream. + repeated string tags = 6; + + // Unix-nanosecond timestamp at which the producer believes the log + // event happened. Zero means "stamp at receive time". + int64 timestamp = 7; + + // Free-form additional structured fields. Each value is a JSON-encoded + // string so producers can convey arbitrary scalar / array / object + // shapes through a stable wire type. + map additional_properties = 8; } diff --git a/lib/protos/datadog/proto/checks/v1/sketch.proto b/lib/protos/datadog/proto/checks/v1/sketch.proto new file mode 100644 index 00000000000..4d2d2c6ab07 --- /dev/null +++ b/lib/protos/datadog/proto/checks/v1/sketch.proto @@ -0,0 +1,46 @@ +syntax = "proto3"; + +package datadog.checks.v1.sketch; + +// Sketch carries a pre-aggregated DDSketch produced by a check. +// +// The wire format is aligned with the Datadog Agent's sketch_payload +// format (see DataDog/agent-payload v5 SketchPayload.Sketch.Dogsketch) +// so receivers can convert directly to the agent's internal Sketch +// type without a separate sketches-go round-trip. +message Sketch { + // Metric name (e.g. "http.request.duration"). + string name = 1; + + // Tags in `key:value` form. + repeated string tags = 2; + + // Hostname this sketch was produced on. Empty means "use the + // receiving agent's default hostname". + string hostname = 3; + + // Unix-second timestamp at which the producer flushed the sketch. + int64 timestamp = 4; + + // Flush interval the producer used; informational only — the + // receiving aggregator stamps its own flush boundary. + uint64 interval_secs = 5; + + // The DDSketch body itself, in the agent's bin-encoded shape. + Dogsketch dogsketch = 6; +} + +// Dogsketch is the bin-encoded DDSketch payload. Bin keys (k) and +// counts (n) are parallel arrays — k[i] is the bin index and n[i] the +// number of observations in that bin. This matches the Datadog Agent's +// SketchPayload.Sketch.Dogsketch definition exactly. +message Dogsketch { + int64 ts = 1; + int64 cnt = 2; + double min = 3; + double max = 4; + double avg = 5; + double sum = 6; + repeated sint32 k = 7; + repeated uint32 n = 8; +} diff --git a/lib/saluki-components/src/sources/checks_ipc/mod.rs b/lib/saluki-components/src/sources/checks_ipc/mod.rs index acc3dd05387..0125f248a20 100644 --- a/lib/saluki-components/src/sources/checks_ipc/mod.rs +++ b/lib/saluki-components/src/sources/checks_ipc/mod.rs @@ -1,22 +1,25 @@ -use std::sync::{Arc, LazyLock}; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::LazyLock; use std::time::Duration; use async_trait::async_trait; use datadog_protos::checks::{ + acr_ipc_server::{AcrIpc, AcrIpcServer}, check_data::Data, - checks_server::{Checks, ChecksServer}, - event::{AlertType as ProtoAlertType, Event as ProtoEvent, Priority as ProtoPriority}, - log::{Log as ProtoLog, LogLevel}, - metric::{Metric as ProtoMetric, MetricType}, - service_check::{ServiceCheck as ProtoServiceCheck, Status as ServiceCheckStatus}, - SendCheckPayloadRequest, SendCheckPayloadResponse, + log::LogLevel, + metric::MetricType, + service_check::Status as SvcStatus, + CheckDataAck, CheckDataMsg, CheckResultMsg, ConfigData, Hello, HelloResp, SendCheckResultResponse, + StreamConfigRequest, }; +use futures::stream::{self, Stream}; use resource_accounting::{MemoryBounds, MemoryBoundsBuilder}; use saluki_common::task::HandleExt as _; use saluki_config::GenericConfiguration; use saluki_context::tags::{Tag, TagSet}; use saluki_context::Context; -use saluki_core::data_model::event::eventd::{AlertType, EventD, Priority}; +use saluki_core::data_model::event::eventd::EventD; use saluki_core::data_model::event::log::Log; use saluki_core::data_model::event::metric::Metric; use saluki_core::data_model::event::service_check::{CheckStatus, ServiceCheck}; @@ -26,24 +29,41 @@ use saluki_core::{ components::{sources::*, ComponentContext}, data_model::event::log::LogStatus, }; -use saluki_error::{generic_error, GenericError}; +use saluki_error::{generic_error, ErrorContext as _, GenericError}; use saluki_io::net::ListenAddress; use serde::Deserialize; +use serde_json::Value as JsonValue; use stringtheory::MetaString; use tokio::select; use tokio::sync::mpsc; use tonic::transport::Server; use tonic::{Response, Status}; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, warn}; const fn default_grpc_endpoint() -> ListenAddress { ListenAddress::any_tcp(5105) } +/// AcrIpc protocol version this server speaks. Bumped on +/// wire-incompatible changes; today the protocol is wire-additive so +/// we stay on 1. +const PROTOCOL_VERSION: u32 = 1; + +/// Identifier surfaced in the Handshake response so a client can tell +/// which server flavor it connected to. +const SERVER_ID: &str = "saluki-checks-ipc"; + +// Named outputs the source dispatches into. Defined as constants so the +// `outputs()` declaration and the per-event dispatch match can't drift. +const OUTPUT_METRICS: &str = "metrics"; +const OUTPUT_LOGS: &str = "logs"; +const OUTPUT_EVENTS: &str = "events"; +const OUTPUT_SERVICE_CHECKS: &str = "service_checks"; + /// Checks IPC source. #[derive(Debug, Deserialize)] pub struct ChecksIPCConfiguration { - #[serde(rename = "checks_ipc_endpoint", default = "default_grpc_endpoint")] + #[serde(default = "default_grpc_endpoint")] grpc_endpoint: ListenAddress, } @@ -59,10 +79,10 @@ impl SourceBuilder for ChecksIPCConfiguration { fn outputs(&self) -> &[OutputDefinition] { static OUTPUTS: LazyLock>> = LazyLock::new(|| { vec![ - OutputDefinition::named_output("metrics", EventType::Metric), - OutputDefinition::named_output("logs", EventType::Log), - OutputDefinition::named_output("events", EventType::EventD), - OutputDefinition::named_output("service_checks", EventType::ServiceCheck), + OutputDefinition::named_output(OUTPUT_METRICS, EventType::Metric), + OutputDefinition::named_output(OUTPUT_LOGS, EventType::Log), + OutputDefinition::named_output(OUTPUT_EVENTS, EventType::EventD), + OutputDefinition::named_output(OUTPUT_SERVICE_CHECKS, EventType::ServiceCheck), ] }); @@ -95,11 +115,11 @@ impl Source for ChecksIPC { let (events_tx, mut events_rx) = mpsc::channel(16); - let grpc_server = Server::builder().add_service(ChecksServer::new(ChecksService { events_tx })); + let grpc_server = Server::builder().add_service(AcrIpcServer::new(AcrIpcService { events_tx })); let grpc_socket_addr = match self.grpc_endpoint { ListenAddress::Tcp(addr) => addr, - _ => return Err(generic_error!("OTLP gRPC endpoint must be a TCP address.")), + _ => return Err(generic_error!("AcrIpc gRPC endpoint must be a TCP address.")), }; context .topology_context() @@ -118,14 +138,15 @@ impl Source for ChecksIPC { _ = health.live() => continue, Some(event) = events_rx.recv() => { let output_name = match &event { - Event::Metric(_) => "metrics", - Event::Log(_) => "logs", - Event::EventD(_) => "events", - Event::ServiceCheck(_) => "service_checks", + Event::Metric(_) => OUTPUT_METRICS, + Event::Log(_) => OUTPUT_LOGS, + Event::EventD(_) => OUTPUT_EVENTS, + Event::ServiceCheck(_) => OUTPUT_SERVICE_CHECKS, _ => continue, }; - - if let Err(e) = context.dispatcher().dispatch_one_named(output_name, event).await { + let buffered = context.dispatcher().buffered_named(output_name) + .error_context("Failed to get buffered dispatcher")?; + if let Err(e) = buffered.send_all([event]).await { warn!("Failed to dispatch {output_name} event: {:?}", e); } }, @@ -137,589 +158,270 @@ impl Source for ChecksIPC { } } -struct ChecksService { +/// Server-side implementation of the AcrIpc gRPC service. ACR-shaped +/// clients (the agent-check-runner crate's IPC destination, the in-tree +/// fake test server, etc.) connect, hand off check data, and receive +/// per-batch acks. This service pushes received events through +/// `events_tx` to the source's run loop, which fans them out to the +/// topology's named outputs. +struct AcrIpcService { events_tx: mpsc::Sender, } #[async_trait] -impl Checks for ChecksService { - async fn send_check_payload( - &self, request: tonic::Request, - ) -> Result, Status> { - trace!("Received check payload."); - - let payload = request.into_inner(); - for check_data in payload.data.into_iter().filter_map(|data| data.data) { - let Some(event) = check_data_to_event(check_data) else { - continue; - }; - - if let Err(e) = self.events_tx.send(event).await { - warn!("Failed to send check event: {:?}", e); - } - } - - Ok(Response::new(SendCheckPayloadResponse {})) +impl AcrIpc for AcrIpcService { + async fn handshake(&self, request: tonic::Request) -> Result, Status> { + let hello = request.into_inner(); + info!( + client_id = %hello.client_id, + client_version = %hello.client_version, + client_protocol_version = hello.protocol_version, + "ACR client connected.", + ); + Ok(Response::new(HelloResp { + protocol_version: PROTOCOL_VERSION, + server_id: SERVER_ID.to_string(), + accepted: true, + reject_reason: String::new(), + })) } -} -fn check_data_to_event(check_data: Data) -> Option { - // Each arm exhaustively destructures its proto message (no `..`) so adding a new field - // upstream becomes a compile error here until it's mapped or explicitly ignored. - match check_data { - Data::Metric(metric) => { - let ProtoMetric { - r#type, - name, - value, - timestamp, - tags, - hostname, - interval_secs, - } = metric; - - let metric_type = MetricType::try_from(r#type).ok()?; - - let tags = tags.into_iter().map(Tag::from).collect::(); - let context = Context::from_parts(name, tags.into_shared()); - let mut metric = match metric_type { - MetricType::Counter => Metric::counter(context, (timestamp, value)), - MetricType::Gauge => Metric::gauge(context, (timestamp, value)), - MetricType::Rate => { - if interval_secs == 0 { - warn!("Received rate metric from check with interval of zero. Skipping."); - return None; - } - Metric::rate(context, (timestamp, value), Duration::from_secs(interval_secs)) - } - MetricType::Histogram => Metric::histogram(context, (timestamp, value)), - MetricType::Unspecified => { - warn!("Received metric with unspecified type. Skipping."); - return None; - } + async fn send_check_data( + &self, request: tonic::Request, + ) -> Result, Status> { + let msg = request.into_inner(); + let sequence_id = msg.sequence_id; + + for check_data in msg.data.into_iter().filter_map(|d| d.data) { + let event = match data_to_event(check_data) { + Some(e) => e, + None => continue, }; - if !hostname.is_empty() { - metric.metadata_mut().set_hostname(Arc::from(hostname)); + if let Err(e) = self.events_tx.send(event).await { + warn!("Failed to forward event to source pipeline: {:?}", e); + return Ok(Response::new(CheckDataAck { + sequence_id, + success: false, + error: format!("forward failed: {}", e), + })); } - Some(Event::Metric(metric)) } - Data::Log(log) => { - let ProtoLog { message, level } = log; - let level = LogLevel::try_from(level).ok()?; - let status = log_level_to_log_status(level); + Ok(Response::new(CheckDataAck { + sequence_id, + success: true, + error: String::new(), + })) + } - Some(Event::Log(Log::new(message).with_status(status))) - } - Data::Event(event) => { - let ProtoEvent { - title, - text, - priority, - hostname, - tags, - alert_type, - aggregation_key, - source_type_name, - timestamp, - } = event; - - let tags = tags.into_iter().map(Tag::from).collect::(); - let mut eventd = EventD::new(title, text) - .with_timestamp(timestamp) - .with_tags(tags.into_shared()); - - if !hostname.is_empty() { - eventd.set_hostname(MetaString::from(hostname)); - } - if !aggregation_key.is_empty() { - eventd.set_aggregation_key(MetaString::from(aggregation_key)); - } - if !source_type_name.is_empty() { - eventd.set_source_type_name(MetaString::from(source_type_name)); - } - if let Some(p) = ProtoPriority::try_from(priority) - .ok() - .and_then(proto_priority_to_priority) - { - eventd.set_priority(p); - } - if let Some(a) = ProtoAlertType::try_from(alert_type) - .ok() - .and_then(proto_alert_type_to_alert_type) - { - eventd.set_alert_type(a); - } - Some(Event::EventD(eventd)) - } - Data::ServiceCheck(sc) => { - let ProtoServiceCheck { - status, - name, - message, - tags, - hostname, - } = sc; - - let Some(status) = ServiceCheckStatus::try_from(status) - .ok() - .and_then(service_check_status_to_check_status) - else { - warn!( - "Received service check with unspecified or invalid status: {}. Skipping.", - status - ); - return None; - }; - let tags = tags.into_iter().map(Tag::from).collect::(); - let mut service_check = ServiceCheck::new(name, status) - .with_message(MetaString::from(message)) - .with_tags(tags.into_shared()); - if !hostname.is_empty() { - service_check.set_hostname(MetaString::from(hostname)); - } - Some(Event::ServiceCheck(service_check)) + async fn send_check_result( + &self, request: tonic::Request, + ) -> Result, Status> { + let msg = request.into_inner(); + if msg.error.is_empty() { + debug!( + check_name = %msg.check_name, + check_id = %msg.check_id, + "Check completed successfully." + ); + } else { + warn!( + check_name = %msg.check_name, + check_id = %msg.check_id, + error = %msg.error, + "Check completed with error." + ); } + Ok(Response::new(SendCheckResultResponse {})) } -} -fn log_level_to_log_status(log_level: LogLevel) -> LogStatus { - match log_level { - LogLevel::Trace => LogStatus::Trace, - LogLevel::Debug => LogStatus::Debug, - LogLevel::Info => LogStatus::Info, - LogLevel::Warning => LogStatus::Warning, - LogLevel::Error => LogStatus::Error, - LogLevel::Critical => LogStatus::Emergency, - _ => LogStatus::Info, - } -} + type StreamConfigStream = Pin> + Send + 'static>>; -fn service_check_status_to_check_status(status: ServiceCheckStatus) -> Option { - match status { - ServiceCheckStatus::Ok => Some(CheckStatus::Ok), - ServiceCheckStatus::Warning => Some(CheckStatus::Warning), - ServiceCheckStatus::Critical => Some(CheckStatus::Critical), - ServiceCheckStatus::Unknown => Some(CheckStatus::Unknown), - ServiceCheckStatus::Unspecified => None, + async fn stream_config( + &self, _request: tonic::Request, + ) -> Result, Status> { + // ADP is purely a check-data sink; it has no autodiscovery + // configs to push back to the client. Hold the stream open + // (forever pending) so the client doesn't tight-loop reconnect + // — its `Ok(None)` arm treats stream completion as a signal to + // reconnect. The stream stays alive until the underlying + // connection drops, at which point the gRPC layer terminates + // the call. + Ok(Response::new(Box::pin(stream::pending()))) } } -fn proto_priority_to_priority(priority: ProtoPriority) -> Option { - match priority { - ProtoPriority::Normal => Some(Priority::Normal), - ProtoPriority::Low => Some(Priority::Low), - ProtoPriority::Unspecified => None, +/// Translates a single received `CheckData` payload into a saluki `Event`, +/// returning `None` for variants that the source intentionally drops +/// (unsupported payload shapes, malformed enums, etc). +fn data_to_event(data: Data) -> Option { + match data { + Data::Metric(metric) => metric_to_event(metric), + Data::Log(log) => Some(log_to_event(log)), + Data::Event(event) => Some(eventd_to_event(event)), + Data::ServiceCheck(sc) => service_check_to_event(sc), + Data::Sketch(_) => { + // DDSketch reception requires a public `DDSketch::from_bins` + // (or equivalent) constructor in saluki's ddsketch crate; + // today `insert_raw_bin` is `pub(crate)` only. Drop the + // payload for now — the wire format and ACR-side encoder + // are in place so the unblock is purely a follow-up on the + // ddsketch crate's surface area. + debug!("AcrIpc Sketch payload received; sketch ingestion not yet implemented."); + None + } + Data::EventPlatformEvent(_) => { + // Event-platform events are an Agent-internal event-pipeline + // concept (DBM, NPM, etc.) with no native saluki Event + // equivalent. ADP intentionally doesn't surface them. + debug!("AcrIpc EventPlatformEvent payload received; not handled by checks_ipc source."); + None + } + Data::HistogramBucket(_) => { + // Pre-aggregated buckets target the agent's + // `Sender.HistogramBucket` / `Sender.OpenmetricsBucket` + // upcalls; saluki's metric pipeline has no equivalent + // ingestion path today. + debug!("AcrIpc HistogramBucket payload received; not handled by checks_ipc source."); + None + } } } -fn proto_alert_type_to_alert_type(alert_type: ProtoAlertType) -> Option { - match alert_type { - ProtoAlertType::Info => Some(AlertType::Info), - ProtoAlertType::Error => Some(AlertType::Error), - ProtoAlertType::Warning => Some(AlertType::Warning), - ProtoAlertType::Success => Some(AlertType::Success), - ProtoAlertType::Unspecified => None, - } -} +fn metric_to_event(metric: datadog_protos::checks::metric::Metric) -> Option { + let metric_type = MetricType::try_from(metric.r#type).ok()?; + let context = Context::from_parts(metric.name, proto_tags_to_tagset(metric.tags)); -#[cfg(test)] -mod tests { - use datadog_protos::checks::{ - check_data::Data, - event::Event as ProtoEvent, - log::Log as ProtoLog, - metric::{Metric as ProtoMetric, MetricType as ProtoMetricType}, - service_check::{ServiceCheck as ProtoServiceCheck, Status as ProtoServiceCheckStatus}, + let event = match metric_type { + MetricType::Counter => Metric::counter(context, (metric.timestamp, metric.value)), + MetricType::Gauge => Metric::gauge(context, (metric.timestamp, metric.value)), + MetricType::Rate => { + if metric.interval_secs == 0 { + warn!("Received rate metric from check with interval of zero. Skipping."); + return None; + } + Metric::rate( + context, + (metric.timestamp, metric.value), + Duration::from_secs(metric.interval_secs), + ) + } + MetricType::Histogram => Metric::histogram(context, (metric.timestamp, metric.value)), + MetricType::MonotonicCount => { + // Monotonic counts ship absolute readings (e.g. /proc/stat + // ticks); the receiver is responsible for diffing + // successive samples to produce the delta. Saluki's + // aggregator has no monotonic-aware sink — forwarding as a + // Counter would sum absolute values into nonsense. Drop + // until ADP grows a monotonic ingest path. + debug!("AcrIpc MonotonicCount metric received; not yet supported by checks_ipc source."); + return None; + } + MetricType::Historate => { + // Historate is a rate over histogram buckets; degrading to + // Histogram loses the rate semantics. No native saluki + // equivalent today. + debug!("AcrIpc Historate metric received; not yet supported by checks_ipc source."); + return None; + } + MetricType::Unspecified => { + warn!("Received metric with unspecified type. Skipping."); + return None; + } }; - use saluki_core::data_model::event::metric::MetricValues; - - use super::*; - - fn metric_data( - r#type: i32, name: &str, value: f64, timestamp: u64, interval_secs: u64, tags: &[&str], hostname: &str, - ) -> Data { - Data::Metric(ProtoMetric { - r#type, - name: name.to_string(), - value, - timestamp, - tags: tags.iter().map(|t| (*t).to_string()).collect(), - hostname: hostname.to_string(), - interval_secs, - }) - } - fn log_data(level: i32, message: &str) -> Data { - Data::Log(ProtoLog { - message: message.to_string(), - level, - }) - } + Some(Event::Metric(event)) +} - fn event_data(title: &str, text: &str, timestamp: u64, tags: &[&str], hostname: &str) -> Data { - Data::Event(ProtoEvent { - title: title.to_string(), - text: text.to_string(), - priority: 0, - hostname: hostname.to_string(), - tags: tags.iter().map(|t| (*t).to_string()).collect(), - alert_type: 0, - aggregation_key: String::new(), - source_type_name: String::new(), - timestamp, - }) - } +fn log_to_event(log: datadog_protos::checks::log::Log) -> Event { + let status = match LogLevel::try_from(log.level) { + Ok(level) => Some(log_level_to_log_status(level)), + Err(_) => None, + }; - fn service_check_data(status: i32, name: &str, message: &str, tags: &[&str], hostname: &str) -> Data { - Data::ServiceCheck(ProtoServiceCheck { - status, - name: name.to_string(), - message: message.to_string(), - tags: tags.iter().map(|t| (*t).to_string()).collect(), - hostname: hostname.to_string(), + // ACR encodes additional_properties values as JSON-encoded strings + // (since proto's map can't carry typed payloads + // directly); decode each back to a JsonValue for saluki's typed + // map. Entries that fail to parse are dropped — losing one + // attribute is preferable to losing the whole log. + let additional_properties: HashMap = log + .additional_properties + .into_iter() + .filter_map(|(k, v)| { + serde_json::from_str::(&v) + .ok() + .map(|val| (MetaString::from(k), val)) }) - } - - #[test] - fn metric_counter_conversion() { - let event = check_data_to_event(metric_data( - ProtoMetricType::Counter as i32, - "my_counter", - 1.0, - 1234, - 0, - &["tag1:value1", "tag2:value2"], - "", - )) - .expect("counter should convert"); - - let Event::Metric(metric) = event else { - panic!("expected Metric event"); - }; - assert_eq!(metric.context().name().as_ref(), "my_counter"); - assert!(metric.context().tags().has_tag("tag1:value1")); - assert!(metric.context().tags().has_tag("tag2:value2")); - assert!(matches!(metric.values(), MetricValues::Counter(_))); - } - - #[test] - fn metric_gauge_conversion() { - let event = check_data_to_event(metric_data( - ProtoMetricType::Gauge as i32, - "my_gauge", - 42.0, - 1234, - 0, - &[], - "", - )) - .expect("gauge should convert"); - let Event::Metric(metric) = event else { - panic!("expected Metric event"); - }; - assert!(matches!(metric.values(), MetricValues::Gauge(_))); - } - - #[test] - fn metric_histogram_conversion() { - let event = check_data_to_event(metric_data( - ProtoMetricType::Histogram as i32, - "my_hist", - 1.0, - 1234, - 0, - &[], - "", - )) - .expect("histogram should convert"); - let Event::Metric(metric) = event else { - panic!("expected Metric event"); - }; - assert!(matches!(metric.values(), MetricValues::Histogram(_))); - } - - #[test] - fn metric_rate_conversion_uses_interval() { - let event = check_data_to_event(metric_data( - ProtoMetricType::Rate as i32, - "my_rate", - 10.0, - 1234, - 60, - &[], - "", - )) - .expect("rate should convert"); - let Event::Metric(metric) = event else { - panic!("expected Metric event"); - }; - match metric.values() { - MetricValues::Rate(_, interval) => assert_eq!(*interval, Duration::from_secs(60)), - other => panic!("expected Rate values, got {other:?}"), - } - } - - #[test] - fn metric_rate_with_zero_interval_is_skipped() { - let event = check_data_to_event(metric_data( - ProtoMetricType::Rate as i32, - "my_rate", - 10.0, - 1234, - 0, - &[], - "", - )); - assert!(event.is_none(), "rate with zero interval must be skipped"); - } - - #[test] - fn metric_unspecified_type_is_skipped() { - let event = check_data_to_event(metric_data( - ProtoMetricType::Unspecified as i32, - "x", - 1.0, - 1234, - 0, - &[], - "", - )); - assert!(event.is_none(), "unspecified metric type must be skipped"); - } - - #[test] - fn metric_unknown_type_is_skipped() { - // Any i32 outside the proto enum range fails MetricType::try_from. - let event = check_data_to_event(metric_data(99, "x", 1.0, 1234, 0, &[], "")); - assert!(event.is_none(), "unknown metric type must be skipped"); - } - - #[test] - fn log_unknown_level_is_skipped() { - // 99 is not part of the LogLevel proto enum, so try_from returns Err. - let event = check_data_to_event(log_data(99, "hello")); - assert!(event.is_none(), "unknown log level must be skipped"); - } + .collect(); + + let mut out = Log::new(log.message) + .with_status(status) + .with_source(string_to_meta_opt(log.source)) + .with_hostname(string_to_meta_opt(log.hostname)) + .with_service(string_to_meta_opt(log.service)) + .with_tags(Some(proto_tags_to_tagset(log.tags))); + if !additional_properties.is_empty() { + out = out.with_additional_properties(Some(additional_properties)); + } + Event::Log(out) +} - #[test] - fn event_conversion_preserves_fields() { - let event = check_data_to_event(event_data("title", "body", 1234, &["env:prod", "team:foo"], "")) - .expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.title(), "title"); - assert_eq!(ev.text(), "body"); - assert_eq!(ev.timestamp(), Some(1234)); - assert!(ev.tags().has_tag("env:prod")); - assert!(ev.tags().has_tag("team:foo")); - } +fn eventd_to_event(event: datadog_protos::checks::event::Event) -> Event { + Event::EventD( + EventD::new(event.title, event.text) + .with_timestamp(event.timestamp) + .with_tags(proto_tags_to_tagset(event.tags)), + ) +} - #[test] - fn service_check_status_mapping() { - let cases = [ - (ProtoServiceCheckStatus::Ok, CheckStatus::Ok), - (ProtoServiceCheckStatus::Warning, CheckStatus::Warning), - (ProtoServiceCheckStatus::Critical, CheckStatus::Critical), - (ProtoServiceCheckStatus::Unknown, CheckStatus::Unknown), - ]; - - for (proto_status, expected) in cases { - let event = check_data_to_event(service_check_data(proto_status as i32, "n", "m", &[], "")) - .unwrap_or_else(|| panic!("status {proto_status:?} should convert")); - let Event::ServiceCheck(sc) = event else { - panic!("expected ServiceCheck event for {proto_status:?}"); - }; - assert_eq!(sc.status(), expected, "status {proto_status:?}"); +fn service_check_to_event(sc: datadog_protos::checks::service_check::ServiceCheck) -> Option { + let status = match SvcStatus::try_from(sc.status) { + Ok(SvcStatus::Ok) => CheckStatus::Ok, + Ok(SvcStatus::Warning) => CheckStatus::Warning, + Ok(SvcStatus::Critical) => CheckStatus::Critical, + Ok(SvcStatus::Unknown) => CheckStatus::Unknown, + Ok(SvcStatus::Unspecified) | Err(_) => { + warn!( + "Received service check with unspecified/invalid status: {}. Skipping.", + sc.status + ); + return None; } - } - - #[test] - fn service_check_unspecified_status_is_skipped() { - let event = check_data_to_event(service_check_data( - ProtoServiceCheckStatus::Unspecified as i32, - "n", - "m", - &[], - "", - )); - assert!(event.is_none(), "service check with unspecified status must be skipped"); - } - - #[test] - fn service_check_unknown_status_value_is_skipped() { - // 99 is outside the proto Status enum, so try_from returns Err. - let event = check_data_to_event(service_check_data(99, "n", "m", &[], "")); - assert!( - event.is_none(), - "service check with out-of-range status must be skipped" - ); - } - - #[test] - fn service_check_preserves_name_message_and_tags() { - let event = check_data_to_event(service_check_data( - ProtoServiceCheckStatus::Ok as i32, - "my.check", - "all good", - &["env:prod"], - "", - )) - .expect("service check should convert"); - let Event::ServiceCheck(sc) = event else { - panic!("expected ServiceCheck event"); - }; - assert_eq!(sc.name(), "my.check"); - assert_eq!(sc.status(), CheckStatus::Ok); - assert_eq!(sc.message(), Some("all good")); - assert!(sc.tags().has_tag("env:prod")); - } - - #[test] - fn metric_hostname_propagates() { - let event = check_data_to_event(metric_data( - ProtoMetricType::Counter as i32, - "n", - 1.0, - 0, - 0, - &[], - "host-a", - )) - .expect("metric should convert"); - let Event::Metric(m) = event else { - panic!("expected Metric event"); - }; - assert_eq!(m.metadata().hostname(), Some("host-a")); - } - - #[test] - fn metric_empty_hostname_stays_unset() { - let event = check_data_to_event(metric_data(ProtoMetricType::Counter as i32, "n", 1.0, 0, 0, &[], "")) - .expect("metric should convert"); - let Event::Metric(m) = event else { - panic!("expected Metric event"); - }; - assert_eq!(m.metadata().hostname(), None); - } - - #[test] - fn eventd_hostname_propagates() { - let event = check_data_to_event(event_data("title", "body", 0, &[], "host-b")).expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.hostname(), Some("host-b")); - } - - #[test] - fn eventd_empty_hostname_stays_unset() { - let event = check_data_to_event(event_data("title", "body", 0, &[], "")).expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.hostname(), None); - } - - #[test] - fn service_check_hostname_propagates() { - let event = check_data_to_event(service_check_data( - ProtoServiceCheckStatus::Ok as i32, - "n", - "m", - &[], - "host-c", - )) - .expect("service check should convert"); - let Event::ServiceCheck(sc) = event else { - panic!("expected ServiceCheck event"); - }; - assert_eq!(sc.hostname(), Some("host-c")); - } - - #[test] - fn service_check_empty_hostname_stays_unset() { - let event = check_data_to_event(service_check_data( - ProtoServiceCheckStatus::Ok as i32, - "n", - "m", - &[], - "", - )) - .expect("service check should convert"); - let Event::ServiceCheck(sc) = event else { - panic!("expected ServiceCheck event"); - }; - assert_eq!(sc.hostname(), None); - } - - #[test] - fn eventd_priority_propagates() { - let event = check_data_to_event(Data::Event(ProtoEvent { - priority: ProtoPriority::Low as i32, - ..Default::default() - })) - .expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.priority(), Some(Priority::Low)); - } - - #[test] - fn eventd_alert_type_propagates() { - let event = check_data_to_event(Data::Event(ProtoEvent { - alert_type: ProtoAlertType::Warning as i32, - ..Default::default() - })) - .expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.alert_type(), Some(AlertType::Warning)); - } + }; + Some(Event::ServiceCheck( + ServiceCheck::new(sc.name, status) + .with_message(MetaString::from(sc.message)) + .with_hostname(string_to_meta_opt(sc.hostname)) + .with_tags(proto_tags_to_tagset(sc.tags)), + )) +} - #[test] - fn eventd_aggregation_key_propagates() { - let event = check_data_to_event(Data::Event(ProtoEvent { - aggregation_key: "agg-key-1".to_string(), - ..Default::default() - })) - .expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.aggregation_key(), Some("agg-key-1")); +/// Empty proto strings represent "field not set" in proto3; map them to +/// `None` so saluki's optional-MetaString builders stay accurate rather +/// than treating empty as a real value. +fn string_to_meta_opt(s: String) -> Option { + if s.is_empty() { + None + } else { + Some(MetaString::from(s)) } +} - #[test] - fn eventd_source_type_name_propagates() { - let event = check_data_to_event(Data::Event(ProtoEvent { - source_type_name: "my-source".to_string(), - ..Default::default() - })) - .expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.source_type_name(), Some("my-source")); - } +/// Converts the proto's `repeated string` tag list into a saluki +/// `TagSet`, consuming each owned string into a `Tag` without an +/// intermediate clone. +fn proto_tags_to_tagset(tags: Vec) -> TagSet { + tags.into_iter().map(Tag::from).collect() +} - #[test] - fn eventd_unspecified_proto_keeps_saluki_defaults() { - // A default-initialized ProtoEvent has priority=0 (Unspecified), alert_type=0 (Unspecified), - // and all strings empty. Our mapping treats Unspecified as "source did not set it", so - // `EventD::new`'s defaults (priority=Normal, alert_type=Info) survive, while the empty - // string fields stay unset. - let event = check_data_to_event(Data::Event(ProtoEvent::default())).expect("event should convert"); - let Event::EventD(ev) = event else { - panic!("expected EventD event"); - }; - assert_eq!(ev.priority(), Some(Priority::Normal)); - assert_eq!(ev.alert_type(), Some(AlertType::Info)); - assert_eq!(ev.aggregation_key(), None); - assert_eq!(ev.source_type_name(), None); - assert_eq!(ev.hostname(), None); +fn log_level_to_log_status(log_level: LogLevel) -> LogStatus { + match log_level { + LogLevel::Trace => LogStatus::Trace, + LogLevel::Debug => LogStatus::Debug, + LogLevel::Info => LogStatus::Info, + LogLevel::Warning => LogStatus::Warning, + LogLevel::Error => LogStatus::Error, + LogLevel::Critical => LogStatus::Emergency, + _ => LogStatus::Info, } }