From 931caeffe2a932eadad2d2b1a59a90162936605c Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 16 Apr 2026 14:17:27 +0200 Subject: [PATCH 01/11] refactor: remove dead code The fact that the batcher is always initialized with default values highlights that something is wrong. It's really difficult to pick sensible values. And in practice nobody care. Signed-off-by: Didier Wenzek --- crates/common/batcher/src/batcher.rs | 12 ++++++++++ crates/common/batcher/src/lib.rs | 36 +--------------------------- 2 files changed, 13 insertions(+), 35 deletions(-) diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs index 9fa7f24d834..b4fd6fd65e2 100644 --- a/crates/common/batcher/src/batcher.rs +++ b/crates/common/batcher/src/batcher.rs @@ -2,6 +2,7 @@ use crate::batch::Batch; use crate::batch::BatchAdd; use crate::batchable::Batchable; use crate::config::BatchConfig; +use crate::BatchConfigBuilder; use time::OffsetDateTime; #[derive(Debug, Eq, PartialEq)] @@ -17,6 +18,17 @@ pub struct Batcher { batches: Vec>, } +impl Default for Batcher { + fn default() -> Self { + let batch_config = BatchConfigBuilder::new() + .event_jitter(500) + .delivery_jitter(400) // Heuristic delay that should work out well on a Rpi + .message_leap_limit(0) + .build(); + Batcher::new(batch_config) + } +} + impl Batcher { /// Create a Batcher with the specified config. pub fn new(config: BatchConfig) -> Batcher { diff --git a/crates/common/batcher/src/lib.rs b/crates/common/batcher/src/lib.rs index 39b91d551f9..f40e287d2dd 100644 --- a/crates/common/batcher/src/lib.rs +++ b/crates/common/batcher/src/lib.rs @@ -27,46 +27,17 @@ use tedge_actors::RuntimeRequestSink; use tedge_actors::SimpleMessageBoxBuilder; pub struct BatchingActorBuilder { - batching_window: u32, - maximum_message_delay: u32, - message_leap_limit: u32, message_box: SimpleMessageBoxBuilder, BatchDriverOutput>, } impl Default for BatchingActorBuilder { fn default() -> Self { BatchingActorBuilder { - batching_window: 500, - maximum_message_delay: 400, // Heuristic delay that should work out well on an Rpi - message_leap_limit: 0, message_box: SimpleMessageBoxBuilder::new("Event batcher", 16), } } } -impl BatchingActorBuilder { - pub fn with_batching_window(self, batching_window: u32) -> Self { - Self { - batching_window, - ..self - } - } - - pub fn with_maximum_message_delay(self, maximum_message_delay: u32) -> Self { - Self { - maximum_message_delay, - ..self - } - } - - pub fn with_message_leap_limit(self, message_leap_limit: u32) -> Self { - Self { - message_leap_limit, - ..self - } - } -} - impl MessageSink> for BatchingActorBuilder { fn get_sender(&self) -> DynSender> { self.message_box.get_sender() @@ -93,12 +64,7 @@ impl Builder> for BatchingActorBuilder { } fn build(self) -> BatchDriver { - let batch_config = BatchConfigBuilder::new() - .event_jitter(self.batching_window) - .delivery_jitter(self.maximum_message_delay) - .message_leap_limit(self.message_leap_limit) - .build(); - let batcher = Batcher::new(batch_config); + let batcher = Batcher::default(); let message_box = self.message_box.build(); BatchDriver::new(batcher, message_box) } From 4e1bc9061fcd075b34672b2e288a4bb05d4a651c Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 16 Apr 2026 17:59:15 +0200 Subject: [PATCH 02/11] The batcher flow transformer batch messages over time windows Signed-off-by: Didier Wenzek --- Cargo.lock | 2 + crates/common/batcher/Cargo.toml | 2 + crates/common/batcher/src/batcher.rs | 2 +- crates/common/batcher/src/flows.rs | 210 +++++++++++++++++++++++++++ crates/common/batcher/src/lib.rs | 2 + crates/core/tedge_mapper/src/lib.rs | 2 + 6 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 crates/common/batcher/src/flows.rs diff --git a/Cargo.lock b/Cargo.lock index ad56bcad075..5b803cad4f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -678,7 +678,9 @@ name = "batcher" version = "2.0.1" dependencies = [ "async-trait", + "serde_json", "tedge_actors", + "tedge_flows", "time", "tokio", ] diff --git a/crates/common/batcher/Cargo.toml b/crates/common/batcher/Cargo.toml index 1a20711f61a..30bc9ecf9ed 100644 --- a/crates/common/batcher/Cargo.toml +++ b/crates/common/batcher/Cargo.toml @@ -10,7 +10,9 @@ repository = { workspace = true } [dependencies] async-trait = { workspace = true } +serde_json = { workspace = true } tedge_actors = { workspace = true } +tedge_flows = { workspace = true } time = { workspace = true } tokio = { workspace = true, features = ["time"] } diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs index b4fd6fd65e2..cd4a147d4c8 100644 --- a/crates/common/batcher/src/batcher.rs +++ b/crates/common/batcher/src/batcher.rs @@ -23,7 +23,7 @@ impl Default for Batcher { let batch_config = BatchConfigBuilder::new() .event_jitter(500) .delivery_jitter(400) // Heuristic delay that should work out well on a Rpi - .message_leap_limit(0) + .message_leap_limit(500) .build(); Batcher::new(batch_config) } diff --git a/crates/common/batcher/src/flows.rs b/crates/common/batcher/src/flows.rs new file mode 100644 index 00000000000..80e90ca7f4b --- /dev/null +++ b/crates/common/batcher/src/flows.rs @@ -0,0 +1,210 @@ +use crate::batcher::BatcherOutput; +use crate::Batchable; +use crate::Batcher; +use serde_json::json; +use serde_json::Value; +use std::time::SystemTime; +use tedge_flows::ConfigError; +use tedge_flows::FlowContextHandle; +use tedge_flows::FlowError; +use tedge_flows::JsonValue; +use tedge_flows::Message; +use tedge_flows::Transformer; +use time::OffsetDateTime; + +#[derive(Default)] +pub struct MessageBatcher { + batcher: Batcher, + batch_topic: String, +} + +impl Batchable for Message { + type Key = String; + + fn key(&self) -> Self::Key { + self.topic.clone() + } + + fn event_time(&self) -> OffsetDateTime { + self.timestamp + .map(|t| t.into()) + .unwrap_or_else(OffsetDateTime::now_utc) + } +} + +impl Clone for MessageBatcher { + fn clone(&self) -> MessageBatcher { + MessageBatcher::default() + } +} + +impl Transformer for MessageBatcher { + fn name(&self) -> &str { + "time-window-batcher" + } + + fn set_config(&mut self, config: JsonValue) -> Result<(), ConfigError> { + // TODO: We should expose : event_jitter, delivery_jitter and message_leap_limit + if let Some(topic) = config.string_property("topic") { + self.batch_topic = topic.to_owned(); + } + Ok(()) + } + + fn on_message( + &mut self, + timestamp: SystemTime, + message: &Message, + _context: &FlowContextHandle, + ) -> Result, FlowError> { + let batches = self + .batcher + .event(timestamp.into(), message.clone()) + .into_iter() + .filter_map(|action| match action { + BatcherOutput::Batch(batch) => Some(batch), + BatcherOutput::Timer(_) => None, + }); + self.batch_message_batches(batches) + } + + fn is_periodic(&self) -> bool { + true + } + + fn on_interval( + &mut self, + timestamp: SystemTime, + _context: &FlowContextHandle, + ) -> Result, FlowError> { + let batches = self.batcher.time(timestamp.into()); + self.batch_message_batches(batches) + } +} + +impl MessageBatcher { + /// Build a message from a batch of messages + /// + /// Assume each message payload can be translated to JSON + /// Build a message which payload is a JSON array of all the messages + fn batch_messages(&self, messages: Vec) -> Result { + let mut batch = vec![]; + + for message in messages { + let Some(utf8_payload) = message.payload_str() else { + return Err(FlowError::UnsupportedMessage( + "Cannot batch non UTF-8 message".to_owned(), + )); + }; + let payload: Value = match serde_json::from_str(utf8_payload) { + Ok(payload) => payload, + Err(_) => json!(utf8_payload), + }; + batch.push(json!({ + "topic": message.topic, + "payload": payload, + })) + } + + Ok(Message::new( + self.batch_topic.clone(), + Value::Array(batch).to_string(), + )) + } + + fn batch_message_batches( + &self, + batches: impl IntoIterator>, + ) -> Result, FlowError> { + batches + .into_iter() + .map(|batch| self.batch_messages(batch)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn single_event_batch() { + let context = FlowContextHandle::default(); + let mut batcher = MessageBatcher::default(); + batcher + .set_config(json!({"topic": "batch"}).into()) + .unwrap(); + + let now = SystemTime::now(); + let msg = Message::with_timestamp("a/b", "42", now); + assert!(batcher.on_message(now, &msg, &context).unwrap().is_empty()); + + let later = now + Duration::from_secs(5); + let batch = batcher.on_interval(later, &context).unwrap(); + assert_batch_eq( + batch, + "batch", + json!([ + {"topic": "a/b", "payload": 42}, + ]), + ); + } + + #[test] + fn multi_event_batch() { + let context = FlowContextHandle::default(); + let mut batcher = MessageBatcher::default(); + batcher + .set_config(json!({"topic": "batch"}).into()) + .unwrap(); + + let now = SystemTime::now(); + let msg = Message::with_timestamp("payload/num", "42", now); + assert!(batcher.on_message(now, &msg, &context).unwrap().is_empty()); + + let later = now + Duration::from_millis(5); + let msg = Message::with_timestamp("payload/string", r#"124|456.789"#, now); + assert!(batcher + .on_message(later, &msg, &context) + .unwrap() + .is_empty()); + + let later = now + Duration::from_millis(10); + let msg = Message::with_timestamp("payload/json", r#"{"foo": "bar"}"#, now); + assert!(batcher + .on_message(later, &msg, &context) + .unwrap() + .is_empty()); + + let later = now + Duration::from_secs(5); + let batch = batcher.on_interval(later, &context).unwrap(); + assert_batch_eq( + batch, + "batch", + json!([ + {"topic": "payload/num", "payload": 42}, + {"topic": "payload/string", "payload": "124|456.789"}, + {"topic": "payload/json", "payload": {"foo": "bar"}}, + ]), + ); + } + + fn assert_batch_eq(batch: Vec, topic: &str, mut expected_payload: Value) { + expected_payload + .as_array_mut() + .unwrap() + .sort_by_key(|msg| msg.get("topic").unwrap().to_string()); + + assert_eq!(batch.len(), 1); + assert_eq!(batch[0].topic, topic); + let mut actual_payload: Value = + serde_json::from_slice(batch[0].payload.as_slice()).unwrap(); + actual_payload + .as_array_mut() + .unwrap() + .sort_by_key(|msg| msg.get("topic").unwrap().to_string()); + + assert_eq!(actual_payload, expected_payload); + } +} diff --git a/crates/common/batcher/src/lib.rs b/crates/common/batcher/src/lib.rs index f40e287d2dd..b1d082bd6bf 100644 --- a/crates/common/batcher/src/lib.rs +++ b/crates/common/batcher/src/lib.rs @@ -5,6 +5,7 @@ mod batchable; mod batcher; mod config; mod driver; +mod flows; pub use crate::batchable::Batchable; pub use crate::batcher::Batcher; @@ -16,6 +17,7 @@ pub use crate::config::EventBatchConfigBuilder; pub use crate::driver::BatchDriver; pub use crate::driver::BatchDriverInput; pub use crate::driver::BatchDriverOutput; +pub use crate::flows::MessageBatcher; use std::convert::Infallible; use tedge_actors::Builder; use tedge_actors::DynSender; diff --git a/crates/core/tedge_mapper/src/lib.rs b/crates/core/tedge_mapper/src/lib.rs index 37c6cc845ef..4e1f2c3be62 100644 --- a/crates/core/tedge_mapper/src/lib.rs +++ b/crates/core/tedge_mapper/src/lib.rs @@ -326,6 +326,8 @@ fn load_builtin_transformers(flows: &mut impl FlowRegistryExt) { az_mapper_ext::load_builtin_transformers(flows); #[cfg(feature = "aws")] aws_mapper_ext::load_builtin_transformers(flows); + + flows.register_builtin(batcher::MessageBatcher::default()); } pub(crate) async fn mapper_flow_registry( From 81a77dfa28caedc63db72add7bc243a20578fe86 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 21 Apr 2026 13:34:37 +0200 Subject: [PATCH 03/11] breaking change! deprecate tedge-mapper-collectd Signed-off-by: Didier Wenzek --- Cargo.lock | 2 - crates/core/tedge/src/supervisor.rs | 4 +- crates/core/tedge_mapper/Cargo.toml | 2 - .../core/tedge_mapper/src/collectd/mapper.rs | 59 ------------------- crates/core/tedge_mapper/src/collectd/mod.rs | 1 - crates/core/tedge_mapper/src/lib.rs | 6 -- 6 files changed, 2 insertions(+), 72 deletions(-) delete mode 100644 crates/core/tedge_mapper/src/collectd/mapper.rs delete mode 100644 crates/core/tedge_mapper/src/collectd/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 5b803cad4f8..6d79ba1f909 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5051,8 +5051,6 @@ dependencies = [ "camino", "certificate", "clap", - "clock", - "collectd_ext", "flockfile", "mqtt_channel", "rcgen", diff --git a/crates/core/tedge/src/supervisor.rs b/crates/core/tedge/src/supervisor.rs index e007a1de185..577a2dc4795 100644 --- a/crates/core/tedge/src/supervisor.rs +++ b/crates/core/tedge/src/supervisor.rs @@ -561,12 +561,12 @@ mod tests { #[test] fn run_all_logging_considers_supervisor_agent_and_mapper_services() { assert_eq!( - log_service_names(Some(&MapperName::Collectd)), + log_service_names(Some(&MapperName::UserDefined(vec!["local".to_string()]))), vec![ "tedge".to_string(), tedge_agent::AGENT_NAME.to_string(), "tedge-mapper".to_string(), - "tedge-mapper-collectd".to_string(), + "tedge-mapper".to_string(), ] ); } diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index 48373c6dad3..ba71829a669 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -21,8 +21,6 @@ c8y_mapper_ext = { workspace = true, optional = true } camino = { workspace = true } certificate = { workspace = true } clap = { workspace = true } -clock = { workspace = true } -collectd_ext = { workspace = true } flockfile = { workspace = true } mqtt_channel = { workspace = true } serde = { workspace = true } diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs deleted file mode 100644 index 0f5bfda74ec..00000000000 --- a/crates/core/tedge_mapper/src/collectd/mapper.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::core::component::TEdgeComponent; -use crate::core::mapper::start_basic_actors; -use async_trait::async_trait; -use batcher::BatchingActorBuilder; -use collectd_ext::actor::CollectdActorBuilder; -use mqtt_channel::QoS; -use mqtt_channel::Topic; -use mqtt_channel::TopicFilter; -use tedge_actors::MessageSink; -use tedge_actors::NoConfig; -use tedge_actors::Runtime; -use tedge_config::TEdgeConfig; -use tedge_utils::paths::TedgePaths; - -const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd"; -const COLLECTD_INPUT_TOPICS: &str = "collectd/#"; -const COLLECTD_OUTPUT_TOPIC: &str = "te/device/main///m/"; - -pub struct CollectdMapper; - -impl CollectdMapper { - fn input_topics() -> TopicFilter { - TopicFilter::new_unchecked(COLLECTD_INPUT_TOPICS).with_qos(QoS::AtMostOnce) - } - - fn output_topic() -> Topic { - Topic::new_unchecked(COLLECTD_OUTPUT_TOPIC) - } -} - -#[async_trait] -impl TEdgeComponent for CollectdMapper { - async fn build( - &self, - tedge_config: TEdgeConfig, - _config_dir: &TedgePaths, - ) -> Result { - let (mut runtime, mut mqtt_actor) = - start_basic_actors(COLLECTD_MAPPER_NAME, &tedge_config).await?; - - let input_topic = CollectdMapper::input_topics(); - let output_topic = CollectdMapper::output_topic(); - - let mut batching_actor = BatchingActorBuilder::default(); - let mut collectd_actor = CollectdActorBuilder::new(input_topic); - - collectd_actor.add_input(&mut mqtt_actor); - batching_actor.connect_source(NoConfig, &mut collectd_actor); - mqtt_actor.connect_mapped_source(NoConfig, &mut batching_actor, move |batch| { - collectd_ext::converter::batch_into_mqtt_messages(&output_topic, batch) - }); - - runtime.spawn(collectd_actor).await?; - runtime.spawn(batching_actor).await?; - runtime.spawn(mqtt_actor).await?; - - Ok(runtime) - } -} diff --git a/crates/core/tedge_mapper/src/collectd/mod.rs b/crates/core/tedge_mapper/src/collectd/mod.rs deleted file mode 100644 index 9b60821cb13..00000000000 --- a/crates/core/tedge_mapper/src/collectd/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod mapper; diff --git a/crates/core/tedge_mapper/src/lib.rs b/crates/core/tedge_mapper/src/lib.rs index 4e1f2c3be62..d3e3e2b6f4e 100644 --- a/crates/core/tedge_mapper/src/lib.rs +++ b/crates/core/tedge_mapper/src/lib.rs @@ -4,7 +4,6 @@ use crate::aws::mapper::AwsMapper; use crate::az::mapper::AzureMapper; #[cfg(feature = "c8y")] use crate::c8y::mapper::CumulocityMapper; -use crate::collectd::mapper::CollectdMapper; use crate::core::component::TEdgeComponent; use crate::custom::mapper::CustomMapper; use anyhow::bail; @@ -70,7 +69,6 @@ pub mod aws; pub mod az; #[cfg(feature = "c8y")] pub mod c8y; -mod collectd; mod core; mod custom; use crate::custom_mapper_resolve::EffectiveMapperConfig; @@ -110,7 +108,6 @@ fn lookup_component(component_name: MapperName) -> anyhow::Result Box::new(AwsMapper { profile: read_and_set_var!(profile, "TEDGE_CLOUD_PROFILE"), }), - MapperName::Collectd => Box::new(CollectdMapper), #[cfg(feature = "c8y")] MapperName::C8y { profile } => Box::new(CumulocityMapper { profile: read_and_set_var!(profile, "TEDGE_CLOUD_PROFILE"), @@ -173,7 +170,6 @@ pub enum MapperName { #[clap(long)] profile: Option, }, - Collectd, /// Run a user-defined mapper from `/etc/tedge/mappers/{name}/`. /// /// The mapper name must match `[a-z][a-z0-9-]*`. @@ -202,7 +198,6 @@ impl fmt::Display for MapperName { MapperName::C8y { profile: Some(profile), } => write!(f, "tedge-mapper-c8y@{profile}"), - MapperName::Collectd => write!(f, "tedge-mapper-collectd"), MapperName::UserDefined(args) => write!( f, "tedge-mapper-{}", @@ -221,7 +216,6 @@ impl MapperName { MapperName::Aws { .. } => "tedge-mapper-aws", #[cfg(feature = "c8y")] MapperName::C8y { .. } => "tedge-mapper-c8y", - MapperName::Collectd => "tedge-mapper-collectd", MapperName::UserDefined(_) => "tedge-mapper", } } From f0b950c76f37f8c42fce493e5e76ee745e32caa2 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 21 Apr 2026 13:41:00 +0200 Subject: [PATCH 04/11] breaking change! deprecate collectd_ext Signed-off-by: Didier Wenzek --- Cargo.lock | 18 - Cargo.toml | 1 - crates/extensions/collectd_ext/Cargo.toml | 31 -- crates/extensions/collectd_ext/src/actor.rs | 91 ----- crates/extensions/collectd_ext/src/batcher.rs | 125 ------ .../extensions/collectd_ext/src/collectd.rs | 373 ------------------ .../extensions/collectd_ext/src/converter.rs | 26 -- crates/extensions/collectd_ext/src/error.rs | 29 -- crates/extensions/collectd_ext/src/lib.rs | 5 - 9 files changed, 699 deletions(-) delete mode 100644 crates/extensions/collectd_ext/Cargo.toml delete mode 100644 crates/extensions/collectd_ext/src/actor.rs delete mode 100644 crates/extensions/collectd_ext/src/batcher.rs delete mode 100644 crates/extensions/collectd_ext/src/collectd.rs delete mode 100644 crates/extensions/collectd_ext/src/converter.rs delete mode 100644 crates/extensions/collectd_ext/src/error.rs delete mode 100644 crates/extensions/collectd_ext/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 6d79ba1f909..19699b6a3e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,24 +1197,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" -[[package]] -name = "collectd_ext" -version = "2.0.1" -dependencies = [ - "anyhow", - "assert_matches", - "async-trait", - "batcher", - "clock", - "log", - "tedge_actors", - "tedge_api", - "tedge_mqtt_ext", - "thiserror 2.0.12", - "time", - "tokio", -] - [[package]] name = "colorchoice" version = "1.0.3" diff --git a/Cargo.toml b/Cargo.toml index 5413dc5d2da..cdff9f912f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,6 @@ c8y_http_proxy = { path = "crates/extensions/c8y_http_proxy" } c8y_mapper_ext = { path = "crates/extensions/c8y_mapper_ext" } certificate = { path = "crates/common/certificate" } clock = { path = "crates/common/clock" } -collectd_ext = { path = "crates/extensions/collectd_ext" } download = { path = "crates/common/download" } flockfile = { path = "crates/common/flockfile" } json-writer = { path = "crates/common/json_writer" } diff --git a/crates/extensions/collectd_ext/Cargo.toml b/crates/extensions/collectd_ext/Cargo.toml deleted file mode 100644 index 01eddee6096..00000000000 --- a/crates/extensions/collectd_ext/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "collectd_ext" -description = "thin-edge extension adding support for collectd" -version = { workspace = true } -authors = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } -license = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = { workspace = true } -batcher = { workspace = true } -clock = { workspace = true } -log = { workspace = true } -tedge_actors = { workspace = true } -tedge_api = { workspace = true } -tedge_mqtt_ext = { workspace = true } -thiserror = { workspace = true } -time = { workspace = true } -tokio = { workspace = true, features = ["sync", "time"] } - -[dev-dependencies] -anyhow = { workspace = true } -assert_matches = { workspace = true } -time = { workspace = true, features = ["macros"] } - -[lints] -workspace = true diff --git a/crates/extensions/collectd_ext/src/actor.rs b/crates/extensions/collectd_ext/src/actor.rs deleted file mode 100644 index d410435b6f2..00000000000 --- a/crates/extensions/collectd_ext/src/actor.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::collectd::CollectdMessage; -use async_trait::async_trait; -use log::error; -use std::convert::Infallible; -use tedge_actors::Actor; -use tedge_actors::Builder; -use tedge_actors::DynSender; -use tedge_actors::MessageReceiver; -use tedge_actors::MessageSink; -use tedge_actors::MessageSource; -use tedge_actors::NoConfig; -use tedge_actors::RuntimeError; -use tedge_actors::RuntimeRequest; -use tedge_actors::RuntimeRequestSink; -use tedge_actors::Sender; -use tedge_actors::SimpleMessageBox; -use tedge_actors::SimpleMessageBoxBuilder; -use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::TopicFilter; - -/// An actor that collects measurements from collectd over MQTT -pub struct CollectdActor { - messages: SimpleMessageBox, -} - -#[async_trait] -impl Actor for CollectdActor { - fn name(&self) -> &str { - "collectd" - } - - async fn run(mut self) -> Result<(), RuntimeError> { - while let Some(message) = self.messages.recv().await { - match CollectdMessage::parse_from(&message) { - Ok(collectd_message) => { - for msg in collectd_message { - self.messages.send(msg).await? - } - } - Err(err) => { - error!("Error while decoding a collectd message: {}", err); - } - } - } - Ok(()) - } -} - -pub struct CollectdActorBuilder { - topics: TopicFilter, - message_box: SimpleMessageBoxBuilder, -} - -impl CollectdActorBuilder { - pub fn new(topics: TopicFilter) -> Self { - CollectdActorBuilder { - topics, - message_box: SimpleMessageBoxBuilder::new("Collectd", 16), - } - } - - pub fn add_input(&mut self, source: &mut impl MessageSource) { - source.connect_sink(self.topics.clone(), &self.message_box) - } -} - -impl RuntimeRequestSink for CollectdActorBuilder { - fn get_signal_sender(&self) -> DynSender { - self.message_box.get_signal_sender() - } -} - -impl MessageSource for CollectdActorBuilder { - fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink) { - self.message_box.connect_sink(config, peer) - } -} - -impl Builder for CollectdActorBuilder { - type Error = Infallible; - - fn try_build(self) -> Result { - Ok(self.build()) - } - - fn build(self) -> CollectdActor { - CollectdActor { - messages: self.message_box.build(), - } - } -} diff --git a/crates/extensions/collectd_ext/src/batcher.rs b/crates/extensions/collectd_ext/src/batcher.rs deleted file mode 100644 index 094af4bc487..00000000000 --- a/crates/extensions/collectd_ext/src/batcher.rs +++ /dev/null @@ -1,125 +0,0 @@ -use clock::Timestamp; -use tedge_api::measurement::MeasurementGroup; -use tedge_api::measurement::MeasurementGrouper; -use tedge_api::measurement::MeasurementGrouperError; -use tedge_api::measurement::MeasurementVisitor; -use tedge_api::measurement::ThinEdgeJsonSerializer; -use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::Topic; - -use super::collectd::CollectdMessage; -use super::error::DeviceMonitorError; - -#[derive(Debug)] -pub struct MessageBatch { - message_grouper: MeasurementGrouper, -} - -impl MessageBatch { - pub fn thin_edge_json( - output_topic: &Topic, - messages: Vec, - ) -> Result { - let mut messages = messages.into_iter(); - - if let Some(first_message) = messages.next() { - let timestamp = first_message.timestamp; - let mut batch = MessageBatch::start_batch(first_message, timestamp)?; - for message in messages { - batch.add_to_batch(message)?; - } - let measurements = batch.end_batch()?; - - let mut tedge_json_serializer = ThinEdgeJsonSerializer::new(); - measurements.accept(&mut tedge_json_serializer)?; - - let payload = tedge_json_serializer.bytes()?; - Ok(MqttMessage::new(output_topic, payload)) - } else { - Err(DeviceMonitorError::FromInvalidThinEdgeJson( - MeasurementGrouperError::UnexpectedEnd, - )) - } - } - - fn start_batch( - collectd_message: CollectdMessage, - timestamp: Timestamp, - ) -> Result { - let mut message_grouper = MeasurementGrouper::new(); - message_grouper.visit_timestamp(timestamp)?; - - let mut message_batch = Self { message_grouper }; - - message_batch.add_to_batch(collectd_message)?; - - Ok(message_batch) - } - - fn add_to_batch( - &mut self, - collectd_message: CollectdMessage, - ) -> Result<(), DeviceMonitorError> { - collectd_message.accept(&mut self.message_grouper)?; - Ok(()) - } - - fn end_batch(self) -> Result { - Ok(self.message_grouper.end()?) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use assert_matches::assert_matches; - use clock::Clock; - use clock::WallClock; - use time::macros::datetime; - - #[test] - fn test_message_batch_processor() -> anyhow::Result<()> { - let timestamp = datetime!(2015-05-15 0:00:01.444 UTC); - let collectd_message = CollectdMessage::new("temperature", "value", 32.5, timestamp); - let mut message_batch = MessageBatch::start_batch(collectd_message, WallClock.now())?; - - let collectd_message = CollectdMessage::new("coordinate", "x", 50.0, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let collectd_message = CollectdMessage::new("coordinate", "y", 70.0, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let collectd_message = CollectdMessage::new("pressure", "value", 98.2, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let collectd_message = CollectdMessage::new("coordinate", "z", 90.0, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let message_group = message_batch.end_batch()?; - - assert_matches!(message_group.timestamp(), Some(_)); - - assert_eq!( - message_group.get_measurement_value(Some("temperature"), "value"), - Some(32.5) - ); - assert_eq!( - message_group.get_measurement_value(Some("pressure"), "value"), - Some(98.2) - ); - assert_eq!( - message_group.get_measurement_value(Some("coordinate"), "x"), - Some(50.0) - ); - assert_eq!( - message_group.get_measurement_value(Some("coordinate"), "y"), - Some(70.0) - ); - assert_eq!( - message_group.get_measurement_value(Some("coordinate"), "z"), - Some(90.0) - ); - - Ok(()) - } -} diff --git a/crates/extensions/collectd_ext/src/collectd.rs b/crates/extensions/collectd_ext/src/collectd.rs deleted file mode 100644 index 6176cf9452e..00000000000 --- a/crates/extensions/collectd_ext/src/collectd.rs +++ /dev/null @@ -1,373 +0,0 @@ -use batcher::Batchable; -use tedge_api::measurement::MeasurementVisitor; -use tedge_mqtt_ext::MqttMessage; -use time::Duration; -use time::OffsetDateTime; - -#[derive(Debug)] -pub struct CollectdMessage { - pub metric_group_key: String, - pub metric_key: String, - pub timestamp: OffsetDateTime, - pub metric_value: f64, -} - -#[derive(thiserror::Error, Debug)] -pub enum CollectdError { - #[error( - "Message received on invalid collectd topic: {0}. \ - Collectd message topics must be in the format collectd///" - )] - InvalidMeasurementTopic(String), - - #[error("Invalid payload received on topic: {0}. Error: {1}")] - InvalidMeasurementPayload(String, CollectdPayloadError), - - #[error("Non UTF-8 payload: {0:?}")] - NonUTF8MeasurementPayload(Vec), -} - -impl CollectdMessage { - pub fn accept(&self, visitor: &mut T) -> Result<(), T::Error> - where - T: MeasurementVisitor, - { - visitor.visit_grouped_measurement( - &self.metric_group_key, - &self.metric_key, - self.metric_value, - ) - } - - #[cfg(test)] - pub fn new( - metric_group_key: &str, - metric_key: &str, - metric_value: f64, - timestamp: OffsetDateTime, - ) -> Self { - Self { - metric_group_key: metric_group_key.to_string(), - metric_key: metric_key.to_string(), - timestamp, - metric_value, - } - } - - pub fn parse_from(mqtt_message: &MqttMessage) -> Result, CollectdError> { - let topic = mqtt_message.topic.name.as_str(); - let collectd_topic = match CollectdTopic::from_str(topic) { - Ok(collectd_topic) => collectd_topic, - Err(_) => { - return Err(CollectdError::InvalidMeasurementTopic(topic.into())); - } - }; - - let payload = mqtt_message.payload_str().map_err(|_err| { - CollectdError::NonUTF8MeasurementPayload(mqtt_message.payload_bytes().into()) - })?; - - let collectd_payload = CollectdPayload::parse_from(payload) - .map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?; - - let num_measurements = collectd_payload.metric_values.len(); - let mut collectd_messages: Vec = Vec::with_capacity(num_measurements); - - for (i, value) in collectd_payload.metric_values.iter().enumerate() { - let mut metric_key = collectd_topic.metric_key.to_string(); - // If there are multiple values, then create unique keys metric_key_val1, metric_key_val2 etc. - if num_measurements > 1 { - metric_key = format!("{}_val{}", metric_key, i + 1); - } - collectd_messages.push(CollectdMessage { - metric_group_key: collectd_topic.metric_group_key.to_string(), - metric_key, - timestamp: collectd_payload.timestamp(), - metric_value: *value, - }); - } - Ok(collectd_messages) - } -} - -#[derive(Debug, Eq, PartialEq, Hash)] -pub struct CollectdTopic<'a> { - metric_group_key: &'a str, - metric_key: &'a str, -} - -#[derive(Debug)] -struct InvalidCollectdTopicName; - -impl<'a> CollectdTopic<'a> { - fn from_str(topic_name: &'a str) -> Result { - let mut iter = topic_name.split('/'); - let _collectd_prefix = iter.next().ok_or(InvalidCollectdTopicName)?; - let _hostname = iter.next().ok_or(InvalidCollectdTopicName)?; - let metric_group_key = iter.next().ok_or(InvalidCollectdTopicName)?; - let metric_key = iter.next().ok_or(InvalidCollectdTopicName)?; - - match iter.next() { - None => Ok(CollectdTopic { - metric_group_key, - metric_key, - }), - Some(_) => Err(InvalidCollectdTopicName), - } - } -} - -#[derive(Debug)] -struct CollectdPayload { - timestamp: f64, - metric_values: Vec, -} - -#[derive(thiserror::Error, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum CollectdPayloadError { - #[error("Invalid payload: {0}. Expected payload format: :")] - InvalidMeasurementPayloadFormat(String), - - #[error("Invalid measurement timestamp: {0}. Epoch time value expected")] - InvalidMeasurementTimestamp(String), - - #[error("Invalid measurement value: {0}. Must be a number")] - InvalidMeasurementValue(String), -} - -impl CollectdPayload { - fn parse_from(payload: &str) -> Result { - let msg: Vec<&str> = payload.split(':').collect(); - let vec_len = msg.len(); - - if vec_len <= 1 { - return Err(CollectdPayloadError::InvalidMeasurementPayloadFormat( - payload.to_string(), - )); - } - - // First element is always the timestamp - let timestamp = msg[0].parse::().map_err(|_err| { - CollectdPayloadError::InvalidMeasurementTimestamp(msg[0].to_string()) - })?; - - let metric_values = msg - .into_iter() - .skip(1) - .map(|m| { - m.parse::() - .map_err(|_err| CollectdPayloadError::InvalidMeasurementValue(m.to_string())) - }) - .collect::, _>>()?; - - Ok(CollectdPayload { - timestamp, - metric_values, - }) - } - - pub fn timestamp(&self) -> OffsetDateTime { - let timestamp = self.timestamp.trunc() as i64; - let nanoseconds = (self.timestamp.fract() * 1.0e9) as u32; - OffsetDateTime::from_unix_timestamp(timestamp).unwrap() - + Duration::nanoseconds(nanoseconds as i64) - } -} - -impl Batchable for CollectdMessage { - type Key = String; - - fn key(&self) -> Self::Key { - format!("{}/{}", &self.metric_group_key, &self.metric_key) - } - - fn event_time(&self) -> OffsetDateTime { - self.timestamp - } -} - -#[cfg(test)] -mod tests { - use assert_matches::assert_matches; - use std::ops::Index; - use tedge_mqtt_ext::MqttMessage; - use tedge_mqtt_ext::Topic; - use time::macros::datetime; - - use super::*; - - #[test] - fn collectd_message_parsing() { - let topic = Topic::new_unchecked("collectd/localhost/temperature/value"); - let mqtt_message = MqttMessage::new(&topic, "123456789:32.5"); - - let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value, - } = collectd_message.index(0); - assert_eq!(metric_group_key, "temperature"); - - assert_eq!(metric_key, "value"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.0 UTC)); - assert_eq!(*metric_value, 32.5); - } - - #[test] - fn collectd_message_parsing_multi_valued_measurement() { - let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); - let mqtt_message = MqttMessage::new(&topic, "123456789:32.5:45.2"); - - let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value: _, - } = collectd_message.index(0); - assert_eq!(metric_group_key, "temperature"); - - assert_eq!(metric_key, "value_val1"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.0 UTC)); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value, - } = collectd_message.index(1); - - assert_eq!(metric_group_key, "temperature"); - assert_eq!(metric_key, "value_val2"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.0 UTC)); - assert_eq!(*metric_value, 45.2); - } - - #[test] - fn collectd_null_terminated_message_parsing() { - let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); - let mqtt_message = MqttMessage::new(&topic, "123456789.125:32.5\u{0}"); - - let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value, - } = collectd_message.index(0); - - assert_eq!(metric_group_key, "temperature"); - assert_eq!(metric_key, "value"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.125 UTC)); - assert_eq!(*metric_value, 32.5); - } - - #[test] - fn invalid_collectd_message_topic() { - let topic = Topic::new("collectd/less/level").unwrap(); - let mqtt_message = MqttMessage::new(&topic, "123456789:32.5"); - - let result = CollectdMessage::parse_from(&mqtt_message); - - assert_matches!(result, Err(CollectdError::InvalidMeasurementTopic(_))); - } - - #[test] - fn invalid_collectd_message_payload() { - let topic = Topic::new("collectd/host/group/key").unwrap(); - let invalid_collectd_message = MqttMessage::new(&topic, "123456789"); - - let result = CollectdMessage::parse_from(&invalid_collectd_message); - - assert_matches!(result, Err(CollectdError::InvalidMeasurementPayload(_, _))); - } - - #[test] - fn invalid_collectd_topic_less_levels() { - let result = CollectdTopic::from_str("collectd/less/levels"); - - assert_matches!(result, Err(InvalidCollectdTopicName)); - } - - #[test] - fn invalid_collectd_topic_more_levels() { - let result = CollectdTopic::from_str("collectd/more/levels/than/needed"); - - assert_matches!(result, Err(InvalidCollectdTopicName)); - } - - #[test] - fn invalid_collectd_payload_no_separator() { - let payload = "123456789"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_)) - ); - } - - #[test] - fn invalid_collectd_metric_value() { - let payload = "123456789:abc"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementValue(_)) - ); - } - - #[test] - fn invalid_collectd_metric_multi_value() { - let payload = "123456789:96.6:abc"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementValue(_)) - ); - } - - #[test] - fn valid_collectd_multivalue_metric() { - let payload = "123456789:1234:5678"; - let result = CollectdPayload::parse_from(payload).unwrap(); - - assert_eq!(result.timestamp, 123456789.0); - assert_eq!(result.metric_values, vec![1234.0, 5678.0]); - } - - #[test] - fn invalid_collectd_metric_timestamp() { - let payload = "abc:98.6"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementTimestamp(_)) - ); - } - - #[test] - fn very_large_metric_value() { - let payload: String = format!("123456789:{}", u128::MAX); - let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap(); - - assert_eq!(*collectd_payload.metric_values.index(0), u128::MAX as f64); - } - - #[test] - fn very_small_metric_value() { - let payload: String = format!("123456789:{}", i128::MIN); - let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap(); - - assert_eq!(*collectd_payload.metric_values.index(0), i128::MIN as f64); - } -} diff --git a/crates/extensions/collectd_ext/src/converter.rs b/crates/extensions/collectd_ext/src/converter.rs deleted file mode 100644 index aebd384dd0b..00000000000 --- a/crates/extensions/collectd_ext/src/converter.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::batcher::MessageBatch; -use crate::collectd::CollectdMessage; -use batcher::BatchDriverOutput; -use log::error; -use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::Topic; - -pub fn batch_into_mqtt_messages( - output_topic: &Topic, - in_message: BatchDriverOutput, -) -> Vec { - match in_message { - BatchDriverOutput::Batch(measurements) => { - match MessageBatch::thin_edge_json(output_topic, measurements) { - Ok(message) => { - vec![message] - } - Err(err) => { - error!("Error while encoding a thin-edge json message: {}", err); - vec![] - } - } - } - BatchDriverOutput::Flush => vec![], - } -} diff --git a/crates/extensions/collectd_ext/src/error.rs b/crates/extensions/collectd_ext/src/error.rs deleted file mode 100644 index df4133b50ab..00000000000 --- a/crates/extensions/collectd_ext/src/error.rs +++ /dev/null @@ -1,29 +0,0 @@ -use tedge_actors::RuntimeError; -use tokio::sync::mpsc::error::SendError; - -#[derive(thiserror::Error, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum DeviceMonitorError { - #[error(transparent)] - FromMqttClient(#[from] tedge_mqtt_ext::MqttError), - - #[error(transparent)] - FromInvalidCollectdMeasurement(#[from] crate::collectd::CollectdError), - - #[error(transparent)] - FromInvalidThinEdgeJson(#[from] tedge_api::measurement::MeasurementGrouperError), - - #[error(transparent)] - FromThinEdgeJsonSerializationError( - #[from] tedge_api::measurement::ThinEdgeJsonSerializationError, - ), - - #[error(transparent)] - FromBatchingError(#[from] SendError), -} - -impl From for RuntimeError { - fn from(error: DeviceMonitorError) -> Self { - Box::new(error).into() - } -} diff --git a/crates/extensions/collectd_ext/src/lib.rs b/crates/extensions/collectd_ext/src/lib.rs deleted file mode 100644 index f6c3fcf54ad..00000000000 --- a/crates/extensions/collectd_ext/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod actor; -pub mod batcher; -pub mod collectd; -pub mod converter; -pub mod error; From c3a1c5cee601e9fa4f051c44fc32c6383cabdad1 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 21 Apr 2026 13:54:06 +0200 Subject: [PATCH 05/11] breaking change! deprecate tedge-mapper-collectd packaging Signed-off-by: Didier Wenzek --- .../systemd/tedge-mapper-collectd.service | 14 ----- .../package_manifests/nfpm.tedge-mapper.yaml | 13 +--- .../_generated/tedge-mapper/apk/postinst | 6 -- .../_generated/tedge-mapper/apk/postrm | 3 +- .../_generated/tedge-mapper/deb/postinst | 10 +--- .../_generated/tedge-mapper/deb/postrm | 3 +- .../_generated/tedge-mapper/rpm/postinst | 8 +-- .../_generated/tedge-mapper/rpm/postrm | 3 +- .../_generated/tedge/apk/preinst | 4 -- .../_generated/tedge/deb/preinst | 4 -- .../_generated/tedge/rpm/preinst | 4 -- configuration/package_scripts/packages.json | 2 +- .../package_scripts/tedge-mapper/postinst | 6 -- .../package_scripts/tedge-mapper/postrm | 3 +- configuration/package_scripts/tedge/preinst | 4 -- .../health_tedge-mapper-collectd.robot | 60 ------------------- 16 files changed, 9 insertions(+), 138 deletions(-) delete mode 100644 configuration/init/systemd/tedge-mapper-collectd.service delete mode 100644 tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot diff --git a/configuration/init/systemd/tedge-mapper-collectd.service b/configuration/init/systemd/tedge-mapper-collectd.service deleted file mode 100644 index b7193db0746..00000000000 --- a/configuration/init/systemd/tedge-mapper-collectd.service +++ /dev/null @@ -1,14 +0,0 @@ -[Unit] -Description=tedge-mapper-collectd converts Thin Edge JSON measurements to Cumulocity JSON format. -After=syslog.target network.target mosquitto.service - -[Service] -User=tedge -ExecStartPre=+-/usr/bin/tedge init -ExecStart=/usr/bin/tedge-mapper collectd -Restart=on-failure -RestartPreventExitStatus=255 -RestartSec=5 - -[Install] -WantedBy=multi-user.target diff --git a/configuration/package_manifests/nfpm.tedge-mapper.yaml b/configuration/package_manifests/nfpm.tedge-mapper.yaml index bc0f24ba480..8b8fb2f905e 100644 --- a/configuration/package_manifests/nfpm.tedge-mapper.yaml +++ b/configuration/package_manifests/nfpm.tedge-mapper.yaml @@ -132,17 +132,6 @@ contents: mode: 0644 packager: rpm - - src: ./configuration/init/systemd/tedge-mapper-collectd.service - dst: /lib/systemd/system/tedge-mapper-collectd.service - file_info: - mode: 0644 - packager: deb - - src: ./configuration/init/systemd/tedge-mapper-collectd.service - dst: /lib/systemd/system/tedge-mapper-collectd.service - file_info: - mode: 0644 - packager: rpm - - src: ./configuration/init/systemd/tedge-mapper-local.service dst: /lib/systemd/system/tedge-mapper-local.service file_info: @@ -264,4 +253,4 @@ overrides: preinstall: configuration/package_scripts/_generated/tedge-mapper/deb/preinst postinstall: configuration/package_scripts/_generated/tedge-mapper/deb/postinst preremove: configuration/package_scripts/_generated/tedge-mapper/deb/prerm - postremove: configuration/package_scripts/_generated/tedge-mapper/deb/postrm \ No newline at end of file + postremove: configuration/package_scripts/_generated/tedge-mapper/deb/postrm diff --git a/configuration/package_scripts/_generated/tedge-mapper/apk/postinst b/configuration/package_scripts/_generated/tedge-mapper/apk/postinst index b5d579124cc..e553a9a021d 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/apk/postinst +++ b/configuration/package_scripts/_generated/tedge-mapper/apk/postinst @@ -51,12 +51,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/_generated/tedge-mapper/apk/postrm b/configuration/package_scripts/_generated/tedge-mapper/apk/postrm index 11dbc79c731..1c018d8f11c 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/apk/postrm +++ b/configuration/package_scripts/_generated/tedge-mapper/apk/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/_generated/tedge-mapper/deb/postinst b/configuration/package_scripts/_generated/tedge-mapper/deb/postinst index 23e5c8c5d9a..531addb1f41 100755 --- a/configuration/package_scripts/_generated/tedge-mapper/deb/postinst +++ b/configuration/package_scripts/_generated/tedge-mapper/deb/postinst @@ -230,9 +230,9 @@ if [ "$1" = "configure" ] || [ "$1" = "abort-upgrade" ] || [ "$1" = "abort-decon systemctl --system daemon-reload >/dev/null || true if [ -n "$2" ]; then if command -v deb-systemd-invoke >/dev/null 2>&1; then - deb-systemd-invoke try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service tedge-mapper-collectd.service >/dev/null || true + deb-systemd-invoke try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service >/dev/null || true else - systemctl try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service tedge-mapper-collectd.service >/dev/null || true + systemctl try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service >/dev/null || true fi fi fi @@ -273,12 +273,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/_generated/tedge-mapper/deb/postrm b/configuration/package_scripts/_generated/tedge-mapper/deb/postrm index 0a69c0bfc49..710a02b83cc 100755 --- a/configuration/package_scripts/_generated/tedge-mapper/deb/postrm +++ b/configuration/package_scripts/_generated/tedge-mapper/deb/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst b/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst index 09b4a55f156..e464b6bf180 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst +++ b/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst @@ -78,7 +78,7 @@ fi if [ $1 -eq 2 ]; then if [ -d /run/systemd/system ]; then systemctl --system daemon-reload >/dev/null || true - systemctl restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service tedge-mapper-collectd.service >/dev/null || true + systemctl restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service >/dev/null || true fi fi # End automatically added section @@ -117,12 +117,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm b/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm index 3dcfd3cf68a..2a9c531e76c 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm +++ b/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/_generated/tedge/apk/preinst b/configuration/package_scripts/_generated/tedge/apk/preinst index aa437772049..573e00fa434 100644 --- a/configuration/package_scripts/_generated/tedge/apk/preinst +++ b/configuration/package_scripts/_generated/tedge/apk/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/configuration/package_scripts/_generated/tedge/deb/preinst b/configuration/package_scripts/_generated/tedge/deb/preinst index aa437772049..573e00fa434 100755 --- a/configuration/package_scripts/_generated/tedge/deb/preinst +++ b/configuration/package_scripts/_generated/tedge/deb/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/configuration/package_scripts/_generated/tedge/rpm/preinst b/configuration/package_scripts/_generated/tedge/rpm/preinst index aa437772049..573e00fa434 100644 --- a/configuration/package_scripts/_generated/tedge/rpm/preinst +++ b/configuration/package_scripts/_generated/tedge/rpm/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/configuration/package_scripts/packages.json b/configuration/package_scripts/packages.json index 549bcc445e9..15df42400e6 100644 --- a/configuration/package_scripts/packages.json +++ b/configuration/package_scripts/packages.json @@ -36,7 +36,7 @@ {"name": "tedge-mapper-az", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-c8y", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-local", "enable": true, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, - {"name": "tedge-mapper-collectd", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, + {"name": "tedge-mapper-collectd", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true, "deprecated": true}, {"name": "tedge-mapper-aws.target", "enable": true, "start": true, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-az.target", "enable": true, "start": true, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-c8y.target", "enable": true, "start": true, "restart_after_upgrade": true, "stop_on_upgrade": true} diff --git a/configuration/package_scripts/tedge-mapper/postinst b/configuration/package_scripts/tedge-mapper/postinst index d846b12e946..1877f591932 100644 --- a/configuration/package_scripts/tedge-mapper/postinst +++ b/configuration/package_scripts/tedge-mapper/postinst @@ -51,12 +51,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/tedge-mapper/postrm b/configuration/package_scripts/tedge-mapper/postrm index 20a11ec9142..0784763fc02 100644 --- a/configuration/package_scripts/tedge-mapper/postrm +++ b/configuration/package_scripts/tedge-mapper/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/tedge/preinst b/configuration/package_scripts/tedge/preinst index aa437772049..573e00fa434 100644 --- a/configuration/package_scripts/tedge/preinst +++ b/configuration/package_scripts/tedge/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot deleted file mode 100644 index 437de9fe672..00000000000 --- a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot +++ /dev/null @@ -1,60 +0,0 @@ -*** Settings *** -Resource ../../resources/common.resource -Library ThinEdgeIO - -Suite Setup Setup -Suite Teardown Get Suite Logs - -Test Tags theme:monitoring - - -*** Test Cases *** -Stop tedge-mapper-collectd - Execute Command sudo systemctl stop tedge-mapper-collectd.service - -Update the service file - Execute Command cmd=sudo sed -i '10iWatchdogSec=30' /lib/systemd/system/tedge-mapper-collectd.service - -Reload systemd files - Execute Command sudo systemctl daemon-reload - -Start tedge-mapper-collectd - Execute Command sudo systemctl start tedge-mapper-collectd.service - -Start watchdog service - Execute Command sudo systemctl start tedge-watchdog.service - Sleep 10s - -Check PID of tedge-mapper-collectd - ${pid}= Service Should Be Running tedge-mapper-collectd - Set Suite Variable ${pid} - -Kill the PID - Kill Process ${pid} - -Recheck PID of tedge-mapper-collectd - ${pid1}= Service Should Be Running tedge-mapper-collectd - Set Suite Variable ${pid1} - -Compare PID change - Should Not Be Equal ${pid} ${pid1} - -Stop watchdog service - Execute Command sudo systemctl stop tedge-watchdog.service - -Remove entry from service file - Execute Command sudo sed -i '10d' /lib/systemd/system/tedge-mapper-collectd.service - Execute Command sudo systemctl daemon-reload - -tedge-collectd-mapper health status - Execute Command sudo systemctl start tedge-mapper-collectd.service - - Sleep 5s reason=It fails without this! It needs a better way of queuing requests - ${pid}= Service Should Be Running tedge-mapper-collectd - Execute Command sudo tedge mqtt pub 'te/device/main/service/tedge-mapper-collectd/cmd/health/check' '' - ${messages}= Should Have MQTT Messages - ... te/device/main/service/tedge-mapper-collectd/status/health - ... minimum=1 - ... maximum=2 - Should Contain ${messages[0]} "pid":${pid} - Should Contain ${messages[0]} "status":"up" From d4e73b469215a276e56ace3799963317aa1c9ba6 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 21 Apr 2026 14:49:58 +0200 Subject: [PATCH 06/11] Remove batcher dependency on actors Signed-off-by: Didier Wenzek --- Cargo.lock | 1 - crates/common/batcher/Cargo.toml | 2 - crates/common/batcher/src/batcher.rs | 1 + crates/common/batcher/src/driver.rs | 294 --------------------------- crates/common/batcher/src/lib.rs | 57 ------ 5 files changed, 1 insertion(+), 354 deletions(-) delete mode 100644 crates/common/batcher/src/driver.rs diff --git a/Cargo.lock b/Cargo.lock index 19699b6a3e9..d8ea767de17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -679,7 +679,6 @@ version = "2.0.1" dependencies = [ "async-trait", "serde_json", - "tedge_actors", "tedge_flows", "time", "tokio", diff --git a/crates/common/batcher/Cargo.toml b/crates/common/batcher/Cargo.toml index 30bc9ecf9ed..dc277fe9cc5 100644 --- a/crates/common/batcher/Cargo.toml +++ b/crates/common/batcher/Cargo.toml @@ -11,13 +11,11 @@ repository = { workspace = true } [dependencies] async-trait = { workspace = true } serde_json = { workspace = true } -tedge_actors = { workspace = true } tedge_flows = { workspace = true } time = { workspace = true } tokio = { workspace = true, features = ["time"] } [dev-dependencies] -tedge_actors = { workspace = true, features = ["test-helpers"] } tokio = { workspace = true, features = ["rt", "macros"] } [lints] diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs index cd4a147d4c8..1132772dc96 100644 --- a/crates/common/batcher/src/batcher.rs +++ b/crates/common/batcher/src/batcher.rs @@ -109,6 +109,7 @@ impl Batcher { batch.batch_end() + self.config.delivery_jitter() > time } + #[cfg(test)] pub(crate) fn flush(&mut self) -> Vec> { let mut batches = Vec::with_capacity(self.batches.len()); diff --git a/crates/common/batcher/src/driver.rs b/crates/common/batcher/src/driver.rs deleted file mode 100644 index 2dfd7003cb7..00000000000 --- a/crates/common/batcher/src/driver.rs +++ /dev/null @@ -1,294 +0,0 @@ -use crate::batchable::Batchable; -use crate::batcher::Batcher; -use crate::batcher::BatcherOutput; -use async_trait::async_trait; -use std::collections::BTreeSet; -use std::time::Duration; -use tedge_actors::Actor; -use tedge_actors::ChannelError; -use tedge_actors::MessageReceiver; -use tedge_actors::RuntimeError; -use tedge_actors::Sender; -use tedge_actors::SimpleMessageBox; -use time::OffsetDateTime; - -/// Input message to the BatchDriver's input channel. -#[derive(Debug)] -pub enum BatchDriverInput { - /// Message representing a new item to batch. - Event(B), - /// Message representing that the batching should finish and that - /// any remaining batches should be immediately closed and sent to the output. - Flush, -} - -impl From for BatchDriverInput { - fn from(event: B) -> Self { - BatchDriverInput::Event(event) - } -} - -/// Output message from the BatchDriver's output channel. -#[derive(Debug)] -pub enum BatchDriverOutput { - /// Message representing a batch of items. - Batch(Vec), - /// Message representing that batching has finished. - Flush, -} - -impl From> for Vec { - fn from(value: BatchDriverOutput) -> Self { - match value { - BatchDriverOutput::Batch(events) => events, - BatchDriverOutput::Flush => vec![], - } - } -} - -/// The central API for using the batching algorithm. -/// Send items in, get batches out. -pub struct BatchDriver { - batcher: Batcher, - message_box: SimpleMessageBox, BatchDriverOutput>, - timers: BTreeSet, -} - -enum TimeTo { - Unbounded, - Future(std::time::Duration), - Past(OffsetDateTime), -} - -#[async_trait] -impl Actor for BatchDriver { - fn name(&self) -> &str { - "Event batcher" - } - - /// Start the batching - runs until receiving a Flush message - async fn run(mut self) -> Result<(), RuntimeError> { - loop { - let message = match self.time_to_next_timer() { - TimeTo::Unbounded => self.recv(None), - TimeTo::Future(timeout) => self.recv(Some(timeout)), - TimeTo::Past(timer) => { - self.timers.remove(&timer); - self.time(OffsetDateTime::now_utc()).await?; - continue; - } - }; - - match message.await { - Err(_) => continue, // timer timeout expired - Ok(None) => break, // input channel closed - Ok(Some(BatchDriverInput::Flush)) => break, // we've been told to stop - Ok(Some(BatchDriverInput::Event(event))) => self.event(event).await?, - }; - } - - Ok(self.flush().await?) - } -} - -impl BatchDriver { - /// Define the batching process and channels to interact with it. - pub fn new( - batcher: Batcher, - message_box: SimpleMessageBox, BatchDriverOutput>, - ) -> BatchDriver { - BatchDriver { - batcher, - message_box, - timers: BTreeSet::new(), - } - } - - async fn recv( - &mut self, - timeout: Option, - ) -> Result>, tokio::time::error::Elapsed> { - match timeout { - None => Ok(self.message_box.recv().await), - Some(timeout) => tokio::time::timeout(timeout, self.message_box.recv()).await, - } - } - - fn time_to_next_timer(&self) -> TimeTo { - match self.timers.iter().next() { - None => TimeTo::Unbounded, - Some(timer) => { - let signed_duration = *timer - OffsetDateTime::now_utc(); - if signed_duration.is_negative() { - return TimeTo::Past(*timer); - } - TimeTo::Future(std::time::Duration::new( - signed_duration.abs().whole_seconds() as u64, - 0, - )) - } - } - } - - async fn event(&mut self, event: B) -> Result<(), ChannelError> { - for action in self.batcher.event(OffsetDateTime::now_utc(), event) { - match action { - BatcherOutput::Batch(batch) => { - self.message_box - .send(BatchDriverOutput::Batch(batch)) - .await?; - } - BatcherOutput::Timer(t) => { - self.timers.insert(t); - } - }; - } - - Ok(()) - } - - async fn time(&mut self, timer: OffsetDateTime) -> Result<(), ChannelError> { - for batch in self.batcher.time(timer) { - self.message_box - .send(BatchDriverOutput::Batch(batch)) - .await?; - } - - Ok(()) - } - - async fn flush(&mut self) -> Result<(), ChannelError> { - for batch in self.batcher.flush() { - self.message_box - .send(BatchDriverOutput::Batch(batch)) - .await?; - } - - self.message_box.send(BatchDriverOutput::Flush).await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::batchable::Batchable; - use crate::batcher::Batcher; - use crate::config::BatchConfigBuilder; - use crate::driver::BatchDriver; - use std::time::Duration; - use tedge_actors::Builder; - use tedge_actors::MessageSource; - use tedge_actors::NoConfig; - use tedge_actors::SimpleMessageBoxBuilder; - use tokio::time::timeout; - - type TestBox = - SimpleMessageBox, BatchDriverInput>; - - #[tokio::test] - async fn flush_empty() -> Result<(), ChannelError> { - let mut test_box = spawn_driver(); - test_box.send(BatchDriverInput::Flush).await?; - assert_recv_flush(&mut test_box).await; - Ok(()) - } - - #[tokio::test] - async fn flush_one_batch() -> Result<(), ChannelError> { - let mut test_box = spawn_driver(); - - let event1 = TestBatchEvent::new(1, OffsetDateTime::now_utc()); - test_box.send(BatchDriverInput::Event(event1)).await?; - test_box.send(BatchDriverInput::Flush).await?; - - assert_recv_batch(&mut test_box, vec![event1]).await; - assert_recv_flush(&mut test_box).await; - - Ok(()) - } - - #[tokio::test] - async fn two_batches_with_timer() -> Result<(), ChannelError> { - let mut test_box = spawn_driver(); - - let event1 = TestBatchEvent::new(1, OffsetDateTime::now_utc()); - test_box.send(BatchDriverInput::Event(event1)).await?; - - assert_recv_batch(&mut test_box, vec![event1]).await; - - let event2 = TestBatchEvent::new(2, OffsetDateTime::now_utc()); - test_box.send(BatchDriverInput::Event(event2)).await?; - - assert_recv_batch(&mut test_box, vec![event2]).await; - - Ok(()) - } - - async fn assert_recv_batch(test_box: &mut TestBox, expected: Vec) { - match timeout(Duration::from_secs(10), test_box.recv()).await { - Ok(Some(BatchDriverOutput::Batch(batch))) => assert_batch(batch, expected), - other => panic!("Failed to receive batch: {:?}", other), - } - } - - fn assert_batch(batch: Vec, expected: Vec) { - assert_eq!(batch.len(), expected.len()); - - for event in &batch { - if !expected.contains(event) { - panic!("Failed to find: {:?}", event); - } - } - } - - async fn assert_recv_flush(test_box: &mut TestBox) { - match timeout(Duration::from_secs(10), test_box.recv()).await { - Ok(Some(BatchDriverOutput::Flush)) => {} - other => panic!("Failed to receive flush: {:?}", other), - } - } - - fn spawn_driver() -> TestBox { - let config = BatchConfigBuilder::new() - .event_jitter(50) - .delivery_jitter(20) - .message_leap_limit(0) - .build(); - let batcher = Batcher::new(config); - let mut box_builder = SimpleMessageBoxBuilder::new("SUT", 1); - let mut test_box_builder = SimpleMessageBoxBuilder::new("Test", 1); - box_builder.connect_sink(NoConfig, &test_box_builder); - test_box_builder.connect_sink(NoConfig, &box_builder); - let test_box = test_box_builder.build(); - let driver_box = box_builder.build(); - - let driver = BatchDriver::new(batcher, driver_box); - tokio::spawn(async move { driver.run().await }); - - test_box - } - - #[derive(Debug, Copy, Clone, Eq, PartialEq)] - struct TestBatchEvent { - key: u64, - event_time: OffsetDateTime, - } - - impl TestBatchEvent { - fn new(key: u64, event_time: OffsetDateTime) -> TestBatchEvent { - TestBatchEvent { key, event_time } - } - } - - impl Batchable for TestBatchEvent { - type Key = u64; - - fn key(&self) -> Self::Key { - self.key - } - - fn event_time(&self) -> OffsetDateTime { - self.event_time - } - } -} diff --git a/crates/common/batcher/src/lib.rs b/crates/common/batcher/src/lib.rs index b1d082bd6bf..355ce6deb45 100644 --- a/crates/common/batcher/src/lib.rs +++ b/crates/common/batcher/src/lib.rs @@ -4,7 +4,6 @@ mod batch; mod batchable; mod batcher; mod config; -mod driver; mod flows; pub use crate::batchable::Batchable; @@ -14,60 +13,4 @@ pub use crate::config::BatchConfigBuilder; pub use crate::config::BuildableBatchConfigBuilder; pub use crate::config::DeliveryBatchConfigBuilder; pub use crate::config::EventBatchConfigBuilder; -pub use crate::driver::BatchDriver; -pub use crate::driver::BatchDriverInput; -pub use crate::driver::BatchDriverOutput; pub use crate::flows::MessageBatcher; -use std::convert::Infallible; -use tedge_actors::Builder; -use tedge_actors::DynSender; -use tedge_actors::MessageSink; -use tedge_actors::MessageSource; -use tedge_actors::NoConfig; -use tedge_actors::RuntimeRequest; -use tedge_actors::RuntimeRequestSink; -use tedge_actors::SimpleMessageBoxBuilder; - -pub struct BatchingActorBuilder { - message_box: SimpleMessageBoxBuilder, BatchDriverOutput>, -} - -impl Default for BatchingActorBuilder { - fn default() -> Self { - BatchingActorBuilder { - message_box: SimpleMessageBoxBuilder::new("Event batcher", 16), - } - } -} - -impl MessageSink> for BatchingActorBuilder { - fn get_sender(&self) -> DynSender> { - self.message_box.get_sender() - } -} - -impl MessageSource, NoConfig> for BatchingActorBuilder { - fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink>) { - self.message_box.connect_sink(config, peer) - } -} - -impl RuntimeRequestSink for BatchingActorBuilder { - fn get_signal_sender(&self) -> DynSender { - self.message_box.get_signal_sender() - } -} - -impl Builder> for BatchingActorBuilder { - type Error = Infallible; - - fn try_build(self) -> Result, Self::Error> { - Ok(self.build()) - } - - fn build(self) -> BatchDriver { - let batcher = Batcher::default(); - let message_box = self.message_box.build(); - BatchDriver::new(batcher, message_box) - } -} From 55d5baebcd50225ddde18461632b07834580049c Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 21 Apr 2026 15:33:59 +0200 Subject: [PATCH 07/11] remove collectd from tedge diag plugins Signed-off-by: Didier Wenzek --- tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot b/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot index f6faf0688f0..e9d8ed2eba4 100644 --- a/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot +++ b/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot @@ -14,7 +14,6 @@ Test Tags theme:troubleshooting theme:cli theme:plugins ... output.log ... tedge-agent.log ... tedge-mapper-c8y.log - ... tedge-mapper-collectd.log ... tedge-config-list.log ... tedge.toml ... mappers/c8y/mapper.toml @@ -25,7 +24,6 @@ Test Tags theme:troubleshooting theme:cli theme:plugins Log Should Contain tedge-agent.log Starting tedge-agent.service Log Should Contain tedge-mapper-c8y.log Starting tedge-mapper-c8y.service - Log Should Contain tedge-mapper-collectd.log Starting tedge-mapper-collectd.service Log Should Contain tedge-config-list.log c8y.url Log Should Contain mappers/c8y/mapper.toml url Log Should Contain mappers/tb/mapper.toml mqtt.azure.com @@ -106,7 +104,6 @@ Log Should Contain Custom Suite Setup Setup Execute Command mkdir -p /results - Start Service tedge-mapper-collectd Execute Command mkdir -p /etc/tedge/mappers/tb Execute Command printf 'url \= "mqtt.azure.com:1883"\n' > /etc/tedge/mappers/tb/mapper.toml Execute Command tedge diag collect --keep-dir --output-dir /results --name test From 509369ac598a4629e3c2a25b5aa91fdfdbf56c00 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 21 Apr 2026 16:31:25 +0200 Subject: [PATCH 08/11] docs: remove references to tedge-mapper-collectd The following sections have still to be deeply rewritten: - Getting Started / A tour of thin-edge.io / Monitor the device - Getting Started / Monitoring - Operate Devices / Troubleshooting / Device Monitoring Signed-off-by: Didier Wenzek --- .../configuration/mosquitto-configuration.md | 38 ++++--------------- .../operate/monitoring/systemd-watchdog.md | 2 +- docs/src/operate/troubleshooting/log-files.md | 16 +------- .../monitoring-service-health.md | 1 - docs/src/references/mappers/mqtt-topics.md | 16 -------- docs/src/references/supported-platforms.md | 1 - 6 files changed, 10 insertions(+), 64 deletions(-) diff --git a/docs/src/operate/configuration/mosquitto-configuration.md b/docs/src/operate/configuration/mosquitto-configuration.md index 16c76520aa7..c7559e0ff8f 100644 --- a/docs/src/operate/configuration/mosquitto-configuration.md +++ b/docs/src/operate/configuration/mosquitto-configuration.md @@ -80,36 +80,14 @@ tedge connect aws This will configure all the services (mosquitto, tedge-mapper-c8y.service, tedge-mapper-az.service, tedge-mapper-aws.service, tedge-agent.service) to use the newly set port and the bind address. -## Common Errors +### Step 4: Restart the local services using MQTT -The below example shows that we cannot set a string value for the port number. +After changing the mqtt port and host, all the services using MQTT have to be restarted. -```sh -tedge config set mqtt.bind.port '"1234"' -``` - -```text title="Output" -Error: failed to set the configuration key: mqtt.bind.port with value: "1234". - -Caused by: - Conversion from String failed -``` - -## Updating the mqtt port and bind address (host) in collectd and for collectd-mapper - -Update the `collectd.conf` with the new port and host in ``. - -Then, restart the collectd service. +- For the Cumulocity, Azure and AWS mappers, this is done running the appropriate `tedge connect` command. +- For user-configured mapper, this has to be done manually, for instance by restarting `tedge-mapper-local`. + ```sh + sudo systemctl restart tedge-mapper-local + ``` +- For other services, say `collectd`, please refer to their documentation to update their configuration and restart the service. -```sh -sudo systemctl restart collectd -``` - -After changing the mqtt port and host, then connect to the cloud using `tedge connect c8y/az`. -Then (Steps 1-3) the collectd-mapper has to be restarted to use the newly set port and bind address (host). - -Restart the tedge-mapper-collectd service. - -```sh -sudo systemctl restart tedge-mapper-collectd -``` diff --git a/docs/src/operate/monitoring/systemd-watchdog.md b/docs/src/operate/monitoring/systemd-watchdog.md index 00d1587ac31..4eceb8a728a 100644 --- a/docs/src/operate/monitoring/systemd-watchdog.md +++ b/docs/src/operate/monitoring/systemd-watchdog.md @@ -18,7 +18,7 @@ This document describes how the systemd watchdog mechanism can be enabled for %% ## Enabling the systemd watchdog feature for a tedge service -Enabling systemd watchdog for a %%te%% service (tedge-agent, tedge-mapper-c8y/az/collectd) is a two-step process. +Enabling systemd watchdog for a %%te%% service (tedge-agent, tedge-mapper-c8y/az) is a two-step process. ### Step 1: Enable the watchdog feature in the systemd service file diff --git a/docs/src/operate/troubleshooting/log-files.md b/docs/src/operate/troubleshooting/log-files.md index 5a3c63912e5..c8f1279e912 100644 --- a/docs/src/operate/troubleshooting/log-files.md +++ b/docs/src/operate/troubleshooting/log-files.md @@ -47,21 +47,7 @@ journalctl -u tedge-mapper-aws ``` :::note -Run `tedge_mapper --debug aws` to log more debug messages -::: - -### Device monitoring logs {#device-logs} -The %%te%% device monitoring component logs can be found as below - -#### Collectd mapper logs {#collectd-mapper} -The log messages of the collectd mapper that sends the monitoring data to the cloud can be accessed as below - -```sh -journalctl -u tedge-mapper-collectd -``` - -:::note -Run `tedge-mapper --debug collectd` to log more debug messages +Run `tedge-mapper --debug aws` to log more debug messages ::: ### Software Management logs {#software-management} diff --git a/docs/src/operate/troubleshooting/monitoring-service-health.md b/docs/src/operate/troubleshooting/monitoring-service-health.md index cee97d4170f..5dd06734e9e 100644 --- a/docs/src/operate/troubleshooting/monitoring-service-health.md +++ b/docs/src/operate/troubleshooting/monitoring-service-health.md @@ -62,7 +62,6 @@ The following endpoints are currently supported: * `te/device/main/service/tedge-mapper-c8y/status/health` * `te/device/main/service/tedge-mapper-az/status/health` * `te/device/main/service/tedge-mapper-aws/status/health` -* `te/device/main/service/tedge-mapper-collectd/status/health` All future tedge services will also follow the same topic naming scheme convention. diff --git a/docs/src/references/mappers/mqtt-topics.md b/docs/src/references/mappers/mqtt-topics.md index a06f6f6249a..55d708d9a21 100644 --- a/docs/src/references/mappers/mqtt-topics.md +++ b/docs/src/references/mappers/mqtt-topics.md @@ -82,19 +82,3 @@ The AWS topics are prefixed by `aws/`. * `aws/shadow/#` Use this topic to interact with unnamed and named shadows of the device. It's mapped to `$aws/things/{device_id}/shadow`. -## Collectd topics - -When the [device monitoring feature is enabled](../../start/device-monitoring.md), -monitoring metrics are emitted by `collectd` on a hierarchy of MQTT topics. - -* `collectd/$HOSTNAME/#` - All the metrics collected on the device (which hostname is `$HOSTNAME`). -* `collectd/$HOSTNAME/$PLUGIN/#` - All the metrics collected by a given collectd plugin, named `$PLUGIN`. -* `collectd/$HOSTNAME/$PLUGIN/$METRIC` - The topic for a given metric, named `$METRIC`. - All the measurements are published as a pair of a Unix timestamp in milliseconds and a numeric value - in the format `$TIMESTAMP:$VALUE`. For example, `1623155717:98.6`. - -The `collectd-mapper` daemon process ingests these measurements and emits translated messages -to the measurement topic. - -* This process groups the atomic measurements that have been received during the same time-window (currently 200 ms) -* and produces a single %%te%% JSON for the whole group of measurements. diff --git a/docs/src/references/supported-platforms.md b/docs/src/references/supported-platforms.md index 701d9361ff4..fe6b496df6a 100644 --- a/docs/src/references/supported-platforms.md +++ b/docs/src/references/supported-platforms.md @@ -90,7 +90,6 @@ In this scenario, all of the %%te%% components are running on the gateway device |Name|Typical Memory Usage (MiB)| |--|--| |tedge-mapper c8y (Cumulocity)|8| -|tedge-mapper collectd |8| |tedge-agent|8| |mosquitto|10| |**Total**|34| From 414d387e68d3ac9bd296a8220011e4798a04bedb Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Tue, 26 May 2026 20:41:12 +0200 Subject: [PATCH 09/11] fully deprecate common batcher Signed-off-by: Didier Wenzek --- Cargo.lock | 12 - Cargo.toml | 1 - crates/common/batcher/Cargo.toml | 22 - crates/common/batcher/src/batch.rs | 177 ------- crates/common/batcher/src/batchable.rs | 18 - crates/common/batcher/src/batcher.rs | 644 ------------------------- crates/common/batcher/src/config.rs | 117 ----- crates/common/batcher/src/flows.rs | 210 -------- crates/common/batcher/src/lib.rs | 16 - crates/core/tedge_mapper/Cargo.toml | 1 - crates/core/tedge_mapper/src/lib.rs | 2 - 11 files changed, 1220 deletions(-) delete mode 100644 crates/common/batcher/Cargo.toml delete mode 100644 crates/common/batcher/src/batch.rs delete mode 100644 crates/common/batcher/src/batchable.rs delete mode 100644 crates/common/batcher/src/batcher.rs delete mode 100644 crates/common/batcher/src/config.rs delete mode 100644 crates/common/batcher/src/flows.rs delete mode 100644 crates/common/batcher/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index d8ea767de17..6c831f5dd1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -673,17 +673,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "batcher" -version = "2.0.1" -dependencies = [ - "async-trait", - "serde_json", - "tedge_flows", - "time", - "tokio", -] - [[package]] name = "bindgen" version = "0.72.1" @@ -5025,7 +5014,6 @@ dependencies = [ "async-trait", "aws_mapper_ext", "az_mapper_ext", - "batcher", "c8y_api", "c8y_auth_proxy", "c8y_mapper_ext", diff --git a/Cargo.toml b/Cargo.toml index cdff9f912f5..f7e4d9d1897 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ repository = "https://github.com/thin-edge/thin-edge.io" aws_mapper_ext = { path = "crates/extensions/aws_mapper_ext" } axum_tls = { path = "crates/common/axum_tls" } az_mapper_ext = { path = "crates/extensions/az_mapper_ext" } -batcher = { path = "crates/common/batcher" } c8y-firmware-plugin = { path = "plugins/c8y_firmware_plugin" } c8y-remote-access-plugin = { path = "plugins/c8y_remote_access_plugin" } c8y_api = { path = "crates/core/c8y_api" } diff --git a/crates/common/batcher/Cargo.toml b/crates/common/batcher/Cargo.toml deleted file mode 100644 index dc277fe9cc5..00000000000 --- a/crates/common/batcher/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "batcher" -version = { workspace = true } -authors = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } -license = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } - -[dependencies] -async-trait = { workspace = true } -serde_json = { workspace = true } -tedge_flows = { workspace = true } -time = { workspace = true } -tokio = { workspace = true, features = ["time"] } - -[dev-dependencies] -tokio = { workspace = true, features = ["rt", "macros"] } - -[lints] -workspace = true diff --git a/crates/common/batcher/src/batch.rs b/crates/common/batcher/src/batch.rs deleted file mode 100644 index 8e7bdb9361d..00000000000 --- a/crates/common/batcher/src/batch.rs +++ /dev/null @@ -1,177 +0,0 @@ -use crate::batchable::Batchable; -use std::collections::HashMap; -use std::iter::once; -use time::OffsetDateTime; - -#[must_use] -#[derive(Debug)] -pub enum BatchAdd { - Added, - Duplicate, - Split(Batch), -} - -#[derive(Debug)] -pub struct Batch { - batch_start: OffsetDateTime, - batch_end: OffsetDateTime, - events: HashMap, -} - -impl Batch { - pub fn new(batch_start: OffsetDateTime, batch_end: OffsetDateTime, event: B) -> Batch { - let mut events = HashMap::new(); - events.insert(event.key(), event); - - Batch { - batch_start, - batch_end, - events, - } - } - - pub fn batch_start(&self) -> OffsetDateTime { - self.batch_start - } - - pub fn batch_end(&self) -> OffsetDateTime { - self.batch_end - } - - pub fn add(&mut self, event: B) -> BatchAdd { - let key = event.key(); - if let Some(existing_event) = self.events.get(&key) { - let existing_event_time = existing_event.event_time(); - - if event.event_time() == existing_event_time { - return BatchAdd::Duplicate; - } - - return BatchAdd::Split(self.split(existing_event_time, event)); - } - - self.events.insert(key, event); - - BatchAdd::Added - } - - fn split(&mut self, existing_event_time: OffsetDateTime, event: B) -> Batch { - let split_point = midpoint(existing_event_time, event.event_time()); - - let mut new_batch_events = HashMap::new(); - let new_batch_end = self.batch_end; - - let all_events = std::mem::take(&mut self.events); - self.batch_end = split_point; - - // Go over all the events in this batch plus the new event and allocate them, - // either the existing batch or the new batch. - for event in all_events.into_values().chain(once(event)) { - let event_time = event.event_time(); - - if event_time < split_point { - self.events.insert(event.key(), event); - } else { - new_batch_events.insert(event.key(), event); - } - } - - Batch { - batch_start: split_point, - batch_end: new_batch_end, - events: new_batch_events, - } - } - - pub fn into_vec(self) -> Vec { - self.events.into_values().collect() - } -} - -fn midpoint(event_time1: OffsetDateTime, event_time2: OffsetDateTime) -> OffsetDateTime { - let gap = event_time1 - event_time2; - event_time2 + gap / 2 -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn add() { - let batch_start = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batch_end = OffsetDateTime::from_unix_timestamp(100).unwrap(); - let event1 = TestBatchEvent::new(1, 40); - let event2 = TestBatchEvent::new(2, 60); - - let mut batch = Batch::new(batch_start, batch_end, event1.clone()); - assert!(matches!(batch.add(event2.clone()), BatchAdd::Added)); - - let result = batch.into_vec(); - assert_eq!(result.len(), 2); - assert!(result.contains(&event1)); - assert!(result.contains(&event2)); - } - - #[test] - fn split() { - let batch_start = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batch_end = OffsetDateTime::from_unix_timestamp(100).unwrap(); - let event1 = TestBatchEvent::new(1, 40); - let event2 = TestBatchEvent::new(1, 60); - - let mut batch1 = Batch::new(batch_start, batch_end, event1.clone()); - match batch1.add(event2.clone()) { - BatchAdd::Split(batch2) => { - let result1 = batch1.into_vec(); - assert_eq!(result1.len(), 1); - assert!(result1.contains(&event1)); - - let result2 = batch2.into_vec(); - assert_eq!(result2.len(), 1); - assert!(result2.contains(&event2)); - } - _ => panic!("Expected split"), - } - } - - #[test] - fn duplicate() { - let batch_start = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batch_end = OffsetDateTime::from_unix_timestamp(100).unwrap(); - let event1 = TestBatchEvent::new(1, 40); - let event2 = TestBatchEvent::new(1, 40); - - let mut batch = Batch::new(batch_start, batch_end, event1.clone()); - assert!(matches!(batch.add(event2), BatchAdd::Duplicate)); - - let result = batch.into_vec(); - assert_eq!(result.len(), 1); - assert!(result.contains(&event1)); - } - - #[derive(Debug, Clone, Eq, PartialEq)] - struct TestBatchEvent { - key: u64, - event_time: OffsetDateTime, - } - - impl TestBatchEvent { - fn new(key: u64, event_time: i64) -> TestBatchEvent { - let event_time = OffsetDateTime::from_unix_timestamp(event_time).unwrap(); - TestBatchEvent { key, event_time } - } - } - - impl Batchable for TestBatchEvent { - type Key = u64; - - fn key(&self) -> Self::Key { - self.key - } - - fn event_time(&self) -> OffsetDateTime { - self.event_time - } - } -} diff --git a/crates/common/batcher/src/batchable.rs b/crates/common/batcher/src/batchable.rs deleted file mode 100644 index 02ff0266a2a..00000000000 --- a/crates/common/batcher/src/batchable.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::fmt::Debug; -use std::hash::Hash; -use time::OffsetDateTime; - -/// Implement this interface for the items that you want batched. -/// -/// No items with the same key will go in the same batch. -/// The event_time of the item will determine how items are grouped, -/// dependent on how the batcher is configured. -pub trait Batchable: 'static + Debug + Send + Sync { - type Key: Eq + Hash + Debug + Send + Sync; - - /// Define the uniqueness within a batch. - fn key(&self) -> Self::Key; - - /// The time at which this item was created. This time is used to group items into a batch. - fn event_time(&self) -> OffsetDateTime; -} diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs deleted file mode 100644 index 1132772dc96..00000000000 --- a/crates/common/batcher/src/batcher.rs +++ /dev/null @@ -1,644 +0,0 @@ -use crate::batch::Batch; -use crate::batch::BatchAdd; -use crate::batchable::Batchable; -use crate::config::BatchConfig; -use crate::BatchConfigBuilder; -use time::OffsetDateTime; - -#[derive(Debug, Eq, PartialEq)] -pub(crate) enum BatcherOutput { - Batch(Vec), - Timer(OffsetDateTime), -} - -/// Provides the core implementation of the batching algorithm. -#[derive(Debug)] -pub struct Batcher { - config: BatchConfig, - batches: Vec>, -} - -impl Default for Batcher { - fn default() -> Self { - let batch_config = BatchConfigBuilder::new() - .event_jitter(500) - .delivery_jitter(400) // Heuristic delay that should work out well on a Rpi - .message_leap_limit(500) - .build(); - Batcher::new(batch_config) - } -} - -impl Batcher { - /// Create a Batcher with the specified config. - pub fn new(config: BatchConfig) -> Batcher { - Batcher { - config, - batches: vec![], - } - } - - pub(crate) fn event( - &mut self, - processing_time: OffsetDateTime, - event: B, - ) -> Vec> { - let event_time = event.event_time(); - - if event_time < processing_time - self.config.delivery_jitter() { - // Discard event because it is too old - return vec![]; - } - - if event_time > processing_time + self.config.message_leap_limit() { - // Discard event because it is too futuristic - return vec![]; - } - - match self.find_target_batch(event_time) { - None => { - let new_batch = self.make_new_batch(event); - let new_batch_end = new_batch.batch_end(); - self.batches.push(new_batch); - self.output_for_batch_end(processing_time, new_batch_end) - } - Some(target_batch) => match target_batch.add(event) { - BatchAdd::Added => vec![], - BatchAdd::Duplicate => vec![], - BatchAdd::Split(new_batch) => { - let split_batch_end = target_batch.batch_end(); - self.batches.push(new_batch); - self.output_for_batch_end(processing_time, split_batch_end) - } - }, - } - } - - fn output_for_batch_end( - &mut self, - processing_time: OffsetDateTime, - batch_end: OffsetDateTime, - ) -> Vec> { - let batch_timeout = batch_end + self.config.delivery_jitter(); - if processing_time < batch_timeout { - vec![BatcherOutput::Timer(batch_timeout)] - } else { - self.time(processing_time) - .into_iter() - .map(BatcherOutput::Batch) - .collect() - } - } - - pub(crate) fn time(&mut self, time: OffsetDateTime) -> Vec> { - let batches = std::mem::take(&mut self.batches); - - let (open_batches, closed_batches) = batches - .into_iter() - .partition(|batch| self.is_open(batch, time)); - - self.batches = open_batches; - - closed_batches - .into_iter() - .map(|batch| batch.into_vec()) - .collect() - } - - fn is_open(&self, batch: &Batch, time: OffsetDateTime) -> bool { - batch.batch_end() + self.config.delivery_jitter() > time - } - - #[cfg(test)] - pub(crate) fn flush(&mut self) -> Vec> { - let mut batches = Vec::with_capacity(self.batches.len()); - - while let Some(batch) = self.batches.pop() { - batches.push(batch.into_vec()) - } - - batches - } - - fn find_target_batch(&mut self, event_time: OffsetDateTime) -> Option<&mut Batch> { - self.batches - .iter_mut() - .find(|batch| batch.batch_start() <= event_time && event_time <= batch.batch_end()) - } - - fn make_new_batch(&self, event: B) -> Batch { - let event_time = event.event_time(); - let mut batch_start = event_time; - let mut batch_end = batch_start + self.config.event_jitter(); - - if let Some(previous_batch) = self.previous_batch(event_time) { - batch_start = batch_start.max(previous_batch.batch_end()) - } - if let Some(next_batch) = self.next_batch(event_time) { - batch_end = batch_end.min(next_batch.batch_start()) - } - - Batch::new(batch_start, batch_end, event) - } - - fn previous_batch(&self, event_time: OffsetDateTime) -> Option<&Batch> { - self.batches - .iter() - .filter(|batch| batch.batch_end() < event_time) - .max_by(|batch1, batch2| batch1.batch_end().cmp(&batch2.batch_end())) - } - - fn next_batch(&self, event_time: OffsetDateTime) -> Option<&Batch> { - self.batches - .iter() - .filter(|batch| batch.batch_start() > event_time) - .min_by(|batch1, batch2| batch1.batch_start().cmp(&batch2.batch_start())) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::batchable::Batchable; - use crate::config::BatchConfigBuilder; - use std::collections::BTreeMap; - use time::Duration; - - #[test] - fn single_event_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - - test.event(1, &event1); - test.expect_batch(70, vec![event1]); - - test.run(); - } - - #[test] - fn multi_event_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "b", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.expect_batch(70, vec![event1, event2]); - - test.run(); - } - - #[test] - // The same behavior as for `multi_event_batch` is expected - // Since we just change how long we wait for an event - fn multi_event_batch_with_long_delivery_jitter() { - let mut test = BatcherTest::new(50, 50, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "b", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.expect_batch(100, vec![event1, event2]); - - test.run(); - } - - #[test] - fn multi_event_batch_with_long_delivery_jitter_and_delayed_message() { - let mut test = BatcherTest::new(50, 50, 0); - - let event1 = test.create_event(5, "a", 2); - let event2 = test.create_event(10, "b", 1); - - test.event(11, &event2); - test.event(25, &event1); // late, but not too late - - test.expect_batch(60, vec![event1]); - test.expect_batch(110, vec![event2]); - - test.run(); - } - - #[test] - fn split_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "a", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.expect_batch(25, vec![event1]); // why 25? - test.expect_batch(70, vec![event2]); - - test.run(); - } - - #[test] - fn allocate_to_earlier_split_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "a", 2); - let event3 = test.create_event(2, "b", 3); - - test.event(1, &event1); - test.event(11, &event2); - test.event(12, &event3); - test.expect_batch(25, vec![event1, event3]); - test.expect_batch(70, vec![event2]); - - test.run(); - } - - #[test] - fn allocate_to_later_split_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "a", 2); - let event3 = test.create_event(9, "b", 3); - - test.event(1, &event1); - test.event(11, &event2); - test.event(12, &event3); - test.expect_batch(25, vec![event1]); - test.expect_batch(70, vec![event2, event3]); - - test.run(); - } - - #[test] - fn flush_no_batches() { - let mut test = BatcherTest::new(50, 20, 0); - test.flush(100); - test.run(); - } - - #[test] - fn flush_one_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "b", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.flush(20); - test.expect_batch(20, vec![event1, event2]); - - test.run(); - } - - #[test] - fn flush_two_batches() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(3, "b", 2); - let event3 = test.create_event(10, "a", 3); - - test.event(1, &event1); - test.event(4, &event2); - test.event(11, &event3); - test.flush(20); - test.expect_batch(20, vec![event1, event2]); - test.expect_batch(20, vec![event3]); - - test.run(); - } - - // The following tests are taken from the diagrams on the specification: - // https://github.com/albinsuresh/thin-edge.io-specs/blob/main/src/telemetry-data/message-batching/message-batching.md - - #[test] - fn simple_batching_with_batching_window() { - let mut test = BatcherTest::new(50, 20, 0); - - let a = test.create_event(115, "a", 1); - let b = test.create_event(120, "b", 2); - let c = test.create_event(145, "c", 3); - let d = test.create_event(160, "d", 4); - let e = test.create_event(175, "e", 5); - let f = test.create_event(215, "f", 6); - let g = test.create_event(240, "g", 7); - - test.event(125, &b); - test.event(135, &a); // order inversion - test.event(150, &c); - test.event(165, &d); - test.event(189, &e); - test.event(250, &g); - test.event(260, &f); // too late - test.expect_batch(140, vec![a]); - test.expect_batch(190, vec![b, c, d]); - test.expect_batch(245, vec![e]); - test.expect_batch(310, vec![g]); - - test.run(); - } - - #[test] - fn simple_batching_with_batching_timeout() { - let mut test = BatcherTest::new(50, 20, 0); - - let a = test.create_event(120, "a", 1); - let b = test.create_event(130, "b", 2); - let c = test.create_event(145, "c", 3); - let d = test.create_event(180, "d", 4); - let e = test.create_event(190, "e", 5); - - test.event(130, &a); - test.event(140, &b); - test.event(150, &c); - test.event(189, &d); - test.event(210, &e); - test.expect_batch(190, vec![a, b, c]); - test.expect_batch(250, vec![d, e]); - - test.run(); - } - - #[test] - fn batch_split_due_to_conflicting_measurements() { - let mut test = BatcherTest::new(50, 20, 0); - - let a1 = test.create_event(120, "a", 1); - let b1 = test.create_event(125, "b", 2); - let a2 = test.create_event(140, "a", 3); - let c1 = test.create_event(150, "c", 4); - let a3 = test.create_event(170, "a", 5); - - test.event(125, &a1); - test.event(140, &b1); - test.event(150, &a2); - test.event(170, &c1); - test.event(180, &a3); - test.expect_batch(150, vec![a1, b1]); - test.expect_batch(180, vec![a2, c1]); - test.expect_batch(190, vec![a3]); - - test.run(); - } - - #[test] - fn receiving_older_already_batched_messages_after_starting_a_new_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let a = test.create_event(120, "a", 1); - let b = test.create_event(130, "b", 2); - let c = test.create_event(140, "c", 3); - let d = test.create_event(190, "d", 4); - let e = test.create_event(210, "e", 5); - - test.event(130, &a); - test.event(140, &b); - test.event(150, &c); - test.event(160, &c); - test.event(175, &c); - test.event(210, &d); - test.event(220, &c); - test.event(230, &e); - test.expect_batch(190, vec![a, b, c]); - test.expect_batch(260, vec![d, e]); - - test.run(); - } - - #[test] - fn receiving_older_unbatched_messages_after_starting_a_new_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let a1 = test.create_event(120, "a", 1); - let b1 = test.create_event(130, "b", 2); - let c1 = test.create_event(140, "c", 3); - let d1 = test.create_event(145, "d", 4); - let a2 = test.create_event(180, "a", 5); - let b2 = test.create_event(200, "b", 6); - - test.event(130, &a1); - test.event(140, &b1); - test.event(150, &c1); - test.event(189, &a2); - test.event(205, &b2); - test.event(215, &d1); - test.expect_batch(190, vec![a1, b1, c1]); - test.expect_batch(250, vec![a2, b2]); - - test.run(); - } - - #[derive(Debug, Clone, Eq, PartialEq)] - struct TestBatchEvent { - event_time: OffsetDateTime, - key: String, - value: u64, - } - - impl Batchable for TestBatchEvent { - type Key = String; - - fn key(&self) -> Self::Key { - self.key.clone() - } - - fn event_time(&self) -> OffsetDateTime { - self.event_time - } - } - - #[derive(Debug)] - enum EventOrTimer { - Event(TestBatchEvent), - Timer(), - } - - struct BatcherTest { - start_time: OffsetDateTime, - batcher: Batcher, - inputs: BTreeMap, - flush_time: Option, - expected_batches: BTreeMap>>, - } - - impl BatcherTest { - fn new(event_jitter: u32, delivery_jitter: u32, message_leap_limit: u32) -> BatcherTest { - let batcher_config = BatchConfigBuilder::new() - .event_jitter(event_jitter) - .delivery_jitter(delivery_jitter) - .message_leap_limit(message_leap_limit) - .build(); - - let start_time = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batcher = Batcher::new(batcher_config); - - BatcherTest { - start_time, - batcher, - inputs: BTreeMap::new(), - flush_time: None, - expected_batches: BTreeMap::new(), - } - } - - fn create_event(&mut self, event_time: i64, key: &str, value: u64) -> TestBatchEvent { - let event_time = self.create_instant(event_time); - let key = key.into(); - TestBatchEvent { - event_time, - key, - value, - } - } - - fn event(&mut self, processed_time: i64, event: &TestBatchEvent) { - let processed_time = self.create_instant(processed_time); - if let Some(_existing) = self - .inputs - .insert(processed_time, EventOrTimer::Event(event.clone())) - { - panic!("Two events with same processing time") - } - } - - fn flush(&mut self, flush_time: i64) { - self.flush_time = Some(self.create_instant(flush_time)); - } - - fn expect_batch(&mut self, batch_close_time: i64, batch: Vec) { - let batch_close_time = self.create_instant(batch_close_time); - let batches_at_time = self.expected_batches.entry(batch_close_time).or_default(); - batches_at_time.push(batch); - } - - fn run(mut self) { - let mut actual_batches = BTreeMap::new(); - - if let Some(flush_time) = self.flush_time { - if !self.inputs.split_off(&flush_time).is_empty() { - panic!("Flush must be the last test action"); - } - } - - while let Some((t, action)) = pop_first(&mut self.inputs) { - match action { - EventOrTimer::Event(event) => { - let outputs = self.batcher.event(t, event); - self.handle_outputs(t, outputs, &mut actual_batches, self.flush_time); - } - EventOrTimer::Timer() => { - actual_batches.insert(t, self.batcher.time(t)); - } - }; - } - - if let Some(t) = self.flush_time { - let batches = self.batcher.flush(); - if !batches.is_empty() { - actual_batches.insert(t, batches); - } - } - - verify(self.expected_batches, actual_batches); - } - - fn handle_outputs( - &mut self, - t: OffsetDateTime, - outputs: Vec>, - all_batches: &mut BTreeMap>>, - flush_time: Option, - ) { - let mut batches = vec![]; - - for output in outputs { - match output { - BatcherOutput::Batch(batch) => batches.push(batch), - BatcherOutput::Timer(timer) => { - if timer <= t { - panic!( - "Batcher requested non-future timer. Input: {}, timer: {}", - t, timer - ); - } - let add_timer = match flush_time { - None => true, - Some(flush_time) => timer < flush_time, - }; - if add_timer { - if let Some(existing) = self.inputs.insert(timer, EventOrTimer::Timer()) - { - panic!( - "Timer at the same time as existing event/timer: {}: {:?}", - timer, existing - ); - } - } - } - } - } - - if !batches.is_empty() { - all_batches.insert(t, batches); - } - } - - fn create_instant(&self, time: i64) -> OffsetDateTime { - self.start_time + Duration::milliseconds(time) - } - } - - fn verify( - expected_batches: BTreeMap>>, - mut actual_batches: BTreeMap>>, - ) { - assert_eq!( - actual_batches.keys().collect::>(), - expected_batches.keys().collect::>() - ); - - for (time, timed_expected_batches) in expected_batches { - let mut timed_actual_batches = actual_batches.remove(&time).unwrap(); - - for timed_expected_batch in &timed_expected_batches { - let found = - timed_actual_batches - .iter() - .enumerate() - .find(|(_index, timed_actual_batch)| { - match_batches(timed_actual_batch, timed_expected_batch) - }); - - match found { - None => panic!( - "Failed to match batch @ {}: {:?}", - time, timed_actual_batches - ), - Some((index, _batch)) => timed_actual_batches.remove(index), - }; - } - } - } - - fn match_batches(batch1: &[TestBatchEvent], batch2: &[TestBatchEvent]) -> bool { - if batch1.len() != batch2.len() { - return false; - } - - for event in batch1 { - if !batch2.contains(event) { - return false; - } - } - - true - } - - fn pop_first(map: &mut BTreeMap) -> Option<(K, V)> { - let (&key, _value) = map.iter().next()?; - map.remove_entry(&key) - } -} diff --git a/crates/common/batcher/src/config.rs b/crates/common/batcher/src/config.rs deleted file mode 100644 index 9c70279ab54..00000000000 --- a/crates/common/batcher/src/config.rs +++ /dev/null @@ -1,117 +0,0 @@ -use time::Duration; - -/// The parameters for the batching process. -#[derive(Debug, Clone)] -pub struct BatchConfig { - event_jitter: Duration, - delivery_jitter: Duration, - message_leap_limit: Duration, -} - -impl BatchConfig { - /// Get the largest expected variation in event times. - pub fn event_jitter(&self) -> Duration { - self.event_jitter - } - - /// Get the largest expected variation in delivery times. - pub fn delivery_jitter(&self) -> Duration { - self.delivery_jitter - } - - /// Get the largest expected time discontinuity. - pub fn message_leap_limit(&self) -> Duration { - self.message_leap_limit - } -} - -/// Used to configure the parameters for batching. Start here. -#[derive(Debug, Default)] -pub struct BatchConfigBuilder {} - -impl BatchConfigBuilder { - /// Start configuring the batching parameters. - pub fn new() -> BatchConfigBuilder { - BatchConfigBuilder {} - } - - /// Set the largest expected variation in event times, in milliseconds. - pub fn event_jitter(self, event_jitter: u32) -> EventBatchConfigBuilder { - EventBatchConfigBuilder { event_jitter } - } -} - -/// Used to configure the parameters for batching. -#[derive(Debug)] -pub struct EventBatchConfigBuilder { - event_jitter: u32, -} - -impl EventBatchConfigBuilder { - /// Set the largest expected variation in delivery times, in milliseconds. - pub fn delivery_jitter(self, delivery_jitter: u32) -> DeliveryBatchConfigBuilder { - DeliveryBatchConfigBuilder { - event_jitter: self.event_jitter, - delivery_jitter, - } - } -} - -/// Used to configure the parameters for batching. -#[derive(Debug)] -pub struct DeliveryBatchConfigBuilder { - event_jitter: u32, - delivery_jitter: u32, -} - -impl DeliveryBatchConfigBuilder { - /// Set the largest expected time discontinuity, in milliseconds. - pub fn message_leap_limit(self, message_leap_limit: u32) -> BuildableBatchConfigBuilder { - BuildableBatchConfigBuilder { - event_jitter: self.event_jitter, - delivery_jitter: self.delivery_jitter, - message_leap_limit, - } - } -} - -/// Used to configure the parameters for batching. -#[derive(Debug)] -pub struct BuildableBatchConfigBuilder { - event_jitter: u32, - delivery_jitter: u32, - message_leap_limit: u32, -} - -impl BuildableBatchConfigBuilder { - /// Finalise the batching parameters. - pub fn build(self) -> BatchConfig { - let event_jitter = Duration::milliseconds(self.event_jitter as i64); - let delivery_jitter = Duration::milliseconds(self.delivery_jitter as i64); - let message_leap_limit = Duration::milliseconds(self.message_leap_limit as i64); - - BatchConfig { - event_jitter, - delivery_jitter, - message_leap_limit, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn build_config() { - let config = BatchConfigBuilder::new() - .event_jitter(1) - .delivery_jitter(2) - .message_leap_limit(3) - .build(); - - assert_eq!(config.event_jitter(), Duration::milliseconds(1)); - assert_eq!(config.delivery_jitter(), Duration::milliseconds(2)); - assert_eq!(config.message_leap_limit(), Duration::milliseconds(3)); - } -} diff --git a/crates/common/batcher/src/flows.rs b/crates/common/batcher/src/flows.rs deleted file mode 100644 index 80e90ca7f4b..00000000000 --- a/crates/common/batcher/src/flows.rs +++ /dev/null @@ -1,210 +0,0 @@ -use crate::batcher::BatcherOutput; -use crate::Batchable; -use crate::Batcher; -use serde_json::json; -use serde_json::Value; -use std::time::SystemTime; -use tedge_flows::ConfigError; -use tedge_flows::FlowContextHandle; -use tedge_flows::FlowError; -use tedge_flows::JsonValue; -use tedge_flows::Message; -use tedge_flows::Transformer; -use time::OffsetDateTime; - -#[derive(Default)] -pub struct MessageBatcher { - batcher: Batcher, - batch_topic: String, -} - -impl Batchable for Message { - type Key = String; - - fn key(&self) -> Self::Key { - self.topic.clone() - } - - fn event_time(&self) -> OffsetDateTime { - self.timestamp - .map(|t| t.into()) - .unwrap_or_else(OffsetDateTime::now_utc) - } -} - -impl Clone for MessageBatcher { - fn clone(&self) -> MessageBatcher { - MessageBatcher::default() - } -} - -impl Transformer for MessageBatcher { - fn name(&self) -> &str { - "time-window-batcher" - } - - fn set_config(&mut self, config: JsonValue) -> Result<(), ConfigError> { - // TODO: We should expose : event_jitter, delivery_jitter and message_leap_limit - if let Some(topic) = config.string_property("topic") { - self.batch_topic = topic.to_owned(); - } - Ok(()) - } - - fn on_message( - &mut self, - timestamp: SystemTime, - message: &Message, - _context: &FlowContextHandle, - ) -> Result, FlowError> { - let batches = self - .batcher - .event(timestamp.into(), message.clone()) - .into_iter() - .filter_map(|action| match action { - BatcherOutput::Batch(batch) => Some(batch), - BatcherOutput::Timer(_) => None, - }); - self.batch_message_batches(batches) - } - - fn is_periodic(&self) -> bool { - true - } - - fn on_interval( - &mut self, - timestamp: SystemTime, - _context: &FlowContextHandle, - ) -> Result, FlowError> { - let batches = self.batcher.time(timestamp.into()); - self.batch_message_batches(batches) - } -} - -impl MessageBatcher { - /// Build a message from a batch of messages - /// - /// Assume each message payload can be translated to JSON - /// Build a message which payload is a JSON array of all the messages - fn batch_messages(&self, messages: Vec) -> Result { - let mut batch = vec![]; - - for message in messages { - let Some(utf8_payload) = message.payload_str() else { - return Err(FlowError::UnsupportedMessage( - "Cannot batch non UTF-8 message".to_owned(), - )); - }; - let payload: Value = match serde_json::from_str(utf8_payload) { - Ok(payload) => payload, - Err(_) => json!(utf8_payload), - }; - batch.push(json!({ - "topic": message.topic, - "payload": payload, - })) - } - - Ok(Message::new( - self.batch_topic.clone(), - Value::Array(batch).to_string(), - )) - } - - fn batch_message_batches( - &self, - batches: impl IntoIterator>, - ) -> Result, FlowError> { - batches - .into_iter() - .map(|batch| self.batch_messages(batch)) - .collect() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Duration; - - #[test] - fn single_event_batch() { - let context = FlowContextHandle::default(); - let mut batcher = MessageBatcher::default(); - batcher - .set_config(json!({"topic": "batch"}).into()) - .unwrap(); - - let now = SystemTime::now(); - let msg = Message::with_timestamp("a/b", "42", now); - assert!(batcher.on_message(now, &msg, &context).unwrap().is_empty()); - - let later = now + Duration::from_secs(5); - let batch = batcher.on_interval(later, &context).unwrap(); - assert_batch_eq( - batch, - "batch", - json!([ - {"topic": "a/b", "payload": 42}, - ]), - ); - } - - #[test] - fn multi_event_batch() { - let context = FlowContextHandle::default(); - let mut batcher = MessageBatcher::default(); - batcher - .set_config(json!({"topic": "batch"}).into()) - .unwrap(); - - let now = SystemTime::now(); - let msg = Message::with_timestamp("payload/num", "42", now); - assert!(batcher.on_message(now, &msg, &context).unwrap().is_empty()); - - let later = now + Duration::from_millis(5); - let msg = Message::with_timestamp("payload/string", r#"124|456.789"#, now); - assert!(batcher - .on_message(later, &msg, &context) - .unwrap() - .is_empty()); - - let later = now + Duration::from_millis(10); - let msg = Message::with_timestamp("payload/json", r#"{"foo": "bar"}"#, now); - assert!(batcher - .on_message(later, &msg, &context) - .unwrap() - .is_empty()); - - let later = now + Duration::from_secs(5); - let batch = batcher.on_interval(later, &context).unwrap(); - assert_batch_eq( - batch, - "batch", - json!([ - {"topic": "payload/num", "payload": 42}, - {"topic": "payload/string", "payload": "124|456.789"}, - {"topic": "payload/json", "payload": {"foo": "bar"}}, - ]), - ); - } - - fn assert_batch_eq(batch: Vec, topic: &str, mut expected_payload: Value) { - expected_payload - .as_array_mut() - .unwrap() - .sort_by_key(|msg| msg.get("topic").unwrap().to_string()); - - assert_eq!(batch.len(), 1); - assert_eq!(batch[0].topic, topic); - let mut actual_payload: Value = - serde_json::from_slice(batch[0].payload.as_slice()).unwrap(); - actual_payload - .as_array_mut() - .unwrap() - .sort_by_key(|msg| msg.get("topic").unwrap().to_string()); - - assert_eq!(actual_payload, expected_payload); - } -} diff --git a/crates/common/batcher/src/lib.rs b/crates/common/batcher/src/lib.rs deleted file mode 100644 index 355ce6deb45..00000000000 --- a/crates/common/batcher/src/lib.rs +++ /dev/null @@ -1,16 +0,0 @@ -//! Group together events that are close in time. - -mod batch; -mod batchable; -mod batcher; -mod config; -mod flows; - -pub use crate::batchable::Batchable; -pub use crate::batcher::Batcher; -pub use crate::config::BatchConfig; -pub use crate::config::BatchConfigBuilder; -pub use crate::config::BuildableBatchConfigBuilder; -pub use crate::config::DeliveryBatchConfigBuilder; -pub use crate::config::EventBatchConfigBuilder; -pub use crate::flows::MessageBatcher; diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index ba71829a669..a717b6a840c 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -14,7 +14,6 @@ anyhow = { workspace = true } async-trait = { workspace = true } aws_mapper_ext = { workspace = true, optional = true } az_mapper_ext = { workspace = true, optional = true } -batcher = { workspace = true } c8y_api = { workspace = true, optional = true } c8y_auth_proxy = { workspace = true, optional = true } c8y_mapper_ext = { workspace = true, optional = true } diff --git a/crates/core/tedge_mapper/src/lib.rs b/crates/core/tedge_mapper/src/lib.rs index d3e3e2b6f4e..7e12954e580 100644 --- a/crates/core/tedge_mapper/src/lib.rs +++ b/crates/core/tedge_mapper/src/lib.rs @@ -320,8 +320,6 @@ fn load_builtin_transformers(flows: &mut impl FlowRegistryExt) { az_mapper_ext::load_builtin_transformers(flows); #[cfg(feature = "aws")] aws_mapper_ext::load_builtin_transformers(flows); - - flows.register_builtin(batcher::MessageBatcher::default()); } pub(crate) async fn mapper_flow_registry( From 121eaf1342e3584c37319423954b51ff796f4032 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Thu, 28 May 2026 16:36:44 +0200 Subject: [PATCH 10/11] Docs: removing sections related to the deprecated collectd mapper Signed-off-by: Didier Wenzek --- .../images/collectd-metrics.png | Bin .../monitoring}/device-monitoring.md | 10 +-- .../troubleshooting/device-monitoring.md | 67 -------------- docs/src/operate/troubleshooting/log-files.md | 11 +-- docs/src/references/mappers/flows.md | 3 +- docs/src/start/getting-started.md | 83 ++---------------- 6 files changed, 12 insertions(+), 162 deletions(-) rename docs/src/{start => operate}/images/collectd-metrics.png (100%) rename docs/src/{start => operate/monitoring}/device-monitoring.md (96%) delete mode 100644 docs/src/operate/troubleshooting/device-monitoring.md diff --git a/docs/src/start/images/collectd-metrics.png b/docs/src/operate/images/collectd-metrics.png similarity index 100% rename from docs/src/start/images/collectd-metrics.png rename to docs/src/operate/images/collectd-metrics.png diff --git a/docs/src/start/device-monitoring.md b/docs/src/operate/monitoring/device-monitoring.md similarity index 96% rename from docs/src/start/device-monitoring.md rename to docs/src/operate/monitoring/device-monitoring.md index 026c925c8de..b246824a6cb 100644 --- a/docs/src/start/device-monitoring.md +++ b/docs/src/operate/monitoring/device-monitoring.md @@ -1,7 +1,7 @@ --- title: Monitoring -tags: [Getting Started, Monitoring, Collectd] -sidebar_position: 8 +tags: [Monitoring, Collectd] +sidebar_position: 1 description: Monitoring your device with collectd --- @@ -20,7 +20,7 @@ and then into the [cloud-vendor specific format](../understand/tedge-mapper.md). -![device monitoring with collectd](images/collectd-metrics.png) +![device monitoring with collectd](../images/collectd-metrics.png) @@ -156,7 +156,3 @@ tedge mqtt sub 'c8y/#' [c8y/measurement/measurements/create] {"type": "ThinEdgeMeasurement","time":"2021-06-07T15:40:30.155037451+01:00","cpu":{"percent-active": {"value": 0.753768844221106}},"memory":{"percent-used": {"value": 1.16587699972141}},"df-root":{"percent_bytes-used": {"value": 71.3117904663086}}} [c8y/measurement/measurements/create] {"type": "ThinEdgeMeasurement","time":"2021-06-07T15:40:31.154898577+01:00","cpu":{"percent-active": {"value": 0.5}},"memory":{"percent-used": {"value": 1.16608109197519}}} ``` - -## Troubleshooting - -For troubleshooting tips, check out the [device monitoring](../operate/troubleshooting/device-monitoring.md) section. diff --git a/docs/src/operate/troubleshooting/device-monitoring.md b/docs/src/operate/troubleshooting/device-monitoring.md deleted file mode 100644 index 9d6f3adfbc9..00000000000 --- a/docs/src/operate/troubleshooting/device-monitoring.md +++ /dev/null @@ -1,67 +0,0 @@ ---- -title: Device Monitoring -tags: [Operate, Monitoring] -sidebar_position: 1 -description: How to troubleshoot device monitoring ---- - -To install and configure monitoring on your device, -see the tutorial [Monitor your device with collectd](../../start/device-monitoring.md). - -## Is collectd running? - -```sh -sudo systemctl status collectd -``` - -If not, launch collected - -```sh -sudo systemctl start collectd -``` - -## Is collectd publishing MQTT messages? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'collectd/#' -``` - -If no metrics are collected, please check the [MQTT configuration](../../start/device-monitoring.md#collectd-configuration) - -:::note -The `collectd.conf` file included with %%te%% is configured for conservative interval times, e.g. 10 mins to 1 hour depending on the metric. This is done so that the metrics don't consume unnecessary IoT resources both on the device and in the cloud. If you want to push the metrics more frequently then you will have to adjust the `Interval` settings either globally or on the individual plugins. Make sure you restart the collectd service after making any changes to the configuration. -::: - -## Is the tedge-mapper-collectd running? - -```sh -sudo systemctl status tedge-mapper-collectd -``` - -If not, launch tedge-mapper-collectd.service as below - -```sh -sudo systemctl start tedge-mapper-collectd -``` - -## Are the collectd metrics published in Thin Edge JSON format? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'te/device/main///m/+' -``` - -## Are the collectd metrics published to Cumulocity? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'c8y/#' -``` - -If not see how to [connect a device to Cumulocity](../../start/connect-c8y.md). - -## Are the collectd metrics published to Azure IoT? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'az/#' -``` - -If not see how to [connect a device to Azure IoT](../../start/connect-azure.md). diff --git a/docs/src/operate/troubleshooting/log-files.md b/docs/src/operate/troubleshooting/log-files.md index c8f1279e912..0759755b530 100644 --- a/docs/src/operate/troubleshooting/log-files.md +++ b/docs/src/operate/troubleshooting/log-files.md @@ -77,7 +77,7 @@ Run `tedge-agent --debug` to log more debug messages ::: ## Third-party component logs {#thirdparty} -%%te%% uses the third-party components `Mosquitto` as the mqtt broker and `Collectd` for monitoring purpose. +%%te%% uses the third-party components `Mosquitto` as the mqtt broker. The logs that are created by these components can be accessed on a %%te%% device as below. ### Mosquitto logs {#mosquitto} @@ -89,15 +89,6 @@ The `Mosquitto` logs can be found in `/var/log/mosquitto/mosquitto.log`. Set `log_type debug` or `log_type all` on `/etc/mosquitto/mosquitto.conf`, to capture more debug information. ::: -### Collectd logs {#collectd} -`Collectd` is used for monitoring the resource status of a %%te%% device. -Collectd logs all the messages at `/var/log/syslog`. -So, the collectd specific logs can be accessed using the `journalctl` as below - -```sh -journalctl -u collectd -``` - ## Configuring log levels in %%te%% {#configure-log-levels} The log levels can be configured for %%te%% services using either by command line or setting the required log diff --git a/docs/src/references/mappers/flows.md b/docs/src/references/mappers/flows.md index eda0879be0e..de55513c023 100644 --- a/docs/src/references/mappers/flows.md +++ b/docs/src/references/mappers/flows.md @@ -568,7 +568,8 @@ __Note__ that when the input of a test is received from its stdin, the topic is given using a bracket syntax `[] ` similar to the output of `tedge mqtt sub` and `tedge flows test` itself. -This can be used to chain tests: +This can be used to chain tests. +For example, assuming a flow has been installed to transform `collectd` messages into %%te%% measurements: ```shell $ tedge flows test collectd/mandarine/cpu/percent-active '1754571280.572:2.07156308851224' | tedge flows test diff --git a/docs/src/start/getting-started.md b/docs/src/start/getting-started.md index dba095c6d20..36d4f2fd3fb 100644 --- a/docs/src/start/getting-started.md +++ b/docs/src/start/getting-started.md @@ -49,10 +49,9 @@ This tutorial is divided into small steps. The first three steps are needed to i - [Step 1 Install %%te%%](#step-1-install-thin-edgeio) - [Step 2 Configure and Connect to Cumulocity](#step-2-configure-and-connect-to-cumulocity) - [Step 3 Sending Device Data](#step-3-sending-device-data) -- [Step 4 Monitor the device](#step-4-monitor-the-device) -- [Step 5 Add software management](#step-5-add-software-management) -- [Step 6 Manage configuration files](#step-6-manage-configuration-files) -- [Step 7 Manage Log Files](#step-7-manage-log-files) +- [Step 4 Add software management](#step-5-add-software-management) +- [Step 5 Manage configuration files](#step-6-manage-configuration-files) +- [Step 6 Manage Log Files](#step-7-manage-log-files) ## Step 1 Install %%te%% {#step-1-install-thin-edgeio} @@ -264,77 +263,7 @@ When you go to events (`Device management` → `your device` → `events`) -## Step 4 Monitor the device - -With %%te%% device monitoring, you can collect metrics from the device and forward these device metrics to Cumulocity. - -Device monitoring can be enabled by installing a community package, [tedge-collectd-setup](https://cloudsmith.io/~thinedge/repos/community/packages/?q=name%3A%27%5Etedge-collectd-setup%24%27), which will install [collectd](https://www.collectd.org/) and configure some sensible defaults including monitoring of cpu, memory and disk metrics. - -```sh tab={"label":"Debian/Ubuntu"} -sudo apt-get install tedge-collectd-setup -``` - -```sh tab={"label":"RHEL/Fedora/RockyLinux"} -sudo dnf install tedge-collectd-setup -``` - -```sh tab={"label":"Alpine"} -sudo apk add tedge-collectd-setup -``` - -What you should see by now is that data arrives on the `collectd/#` topics. You can check that via: - -```sh te2mqtt formats=v1 -tedge mqtt sub 'collectd/#' -``` - -The output will be similar like: - -```log title="Output" -INFO: Connected -[collectd/raspberrypi/df-root/percent_bytes-used] 1667205183.407:11.7998857498169 -[collectd/raspberrypi/memory/percent-used] 1667205183.408:4.87045198079293 -[collectd/raspberrypi/cpu/percent-active] 1667205184.398:1.52284263959391 -``` - -:::note -The default collectd settings, `/etc/collectd/collectd.conf`, use conservative interval times, e.g. 10 mins to 1 hour depending on the metric. This is done so that the metrics don't consume unnecessary IoT resources both on the device and in the cloud. If you want to push the metrics more frequently then you will have to adjust the `Interval` settings either globally or on the individual plugins. Make sure you restart the collectd service after making any changes to the configuration. -::: - -The `tedge-mapper-collectd` service subscribes to the `collectd/#` topics and translates them to the tedge payloads, then the respective cloud mappers will translate the %%te%% messages to the format dictated by each cloud. - -As an example, you can inspect the Cumulocity translated metrics using the following command: - -```sh te2mqtt formats=v1 -tedge mqtt sub 'c8y/#' -``` - -The output will be similar like: - -```log title="Output" -INFO: Connected -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:44.398000001Z","cpu":{"percent-active":{"value":1.26262626262626}},"memory":{"percent-used":{"value":4.87024847292786}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:45.398000001Z","memory":{"percent-used":{"value":4.87024847292786}},"cpu":{"percent-active":{"value":1.01522842639594}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:46.398000001Z","memory":{"percent-used":{"value":4.87024847292786}},"cpu":{"percent-active":{"value":0.759493670886076}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:47.398000001Z","memory":{"percent-used":{"value":4.87024847292786}},"cpu":{"percent-active":{"value":2.01005025125628}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:48.398000001Z","memory":{"percent-used":{"value":4.87004496506279}},"cpu":{"percent-active":{"value":0.254452926208651}}} -``` - -The monitoring data will appear in Cumulocity on the device in the measurement section. - - - -![CollectdMeasurements](./images/collectd-metrics.png) - - - - -### Edit Collectd - -To change the monitored data, it is needed to change the collectd.conf. This can be done via Cumulocity, and [step 6](#change-collectd-configuration) explains how to do it. - - -## Step 5 Add software management +## Step 4 Add software management Software management takes care of allowing installation and management of any type of software from Cumulocity. Since the type is generic, any type of software can be managed. In %%te%% this can be extended with plugins. For every software type, a particular plugin is needed. @@ -399,7 +328,7 @@ Find more information about [how to manage the software](https://cumulocity.com/ How to [develop your own plugins](../extend/software-management.md) is described here. -## Step 6 Manage configuration files +## Step 5 Manage configuration files With %%te%% it is possible to manage config files on a device by using the Cumulocity configuration management feature as a part of Device Management. @@ -482,7 +411,7 @@ To change the collectd metrics of the device, which are displayed in Cumulocity, 10. If you then click on get snapshot from device (select the right configuration file in device supported configurations), you will see the change of the configuration file. -## Step 7 Manage Log Files +## Step 6 Manage Log Files With %%te%% it is possible to request log files from a device by using the Cumulocity log request feature as a part of Device Management. From 24ad17cea054a51cd87959ddab8abd3f46fa0338 Mon Sep 17 00:00:00 2001 From: reubenmiller Date: Thu, 4 Jun 2026 11:34:20 +0200 Subject: [PATCH 11/11] support deprecating a service from maintainer scripts Signed-off-by: reubenmiller --- configuration/package_scripts/generate.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/configuration/package_scripts/generate.py b/configuration/package_scripts/generate.py index 1ddc96a7d74..166c0b390bf 100755 --- a/configuration/package_scripts/generate.py +++ b/configuration/package_scripts/generate.py @@ -58,6 +58,11 @@ class Service: stop_on_upgrade: bool = True restart_after_upgrade: bool = True + # a deprecated service should not be automatically enabled or started, + # but it should still be removed on upgrade. + # Eventually the service may be completely removed from the services list in the packages.json file + deprecated: bool = False + def get_template(name, default=""): file = Path(name) @@ -200,20 +205,21 @@ def append_matching_services( # Special case for rpm packages: # By default rpm maintainer scripts restart mark a service to be restarted in the postrm script # unlike debian which does this in the postinst. + # Note: if the service is deprecated, it should still be removed if package_type == "deb": append_matching_services( postrm, "postrm-systemd", lambda x: True, variables ) elif package_type == "rpm": append_matching_services( - postrm, "postrm-systemd", lambda x: service.stop_on_upgrade, variables + postrm, "postrm-systemd", lambda x: service and service.stop_on_upgrade, variables ) ## postinst: restart after upgrade and start append_matching_services( postinst, "postinst-systemd-restart", - lambda x: x.restart_after_upgrade and x.start, + lambda x: x.restart_after_upgrade and x.start and not x.deprecated, { **variables, "RESTART_ACTION": "restart", @@ -224,7 +230,7 @@ def append_matching_services( append_matching_services( postinst, "postinst-systemd-restartnostart", - lambda x: x.restart_after_upgrade and not x.start, + lambda x: x.restart_after_upgrade and not x.start and not x.deprecated, { **variables, "RESTART_ACTION": "try-restart", @@ -235,15 +241,16 @@ def append_matching_services( append_matching_services( postinst, "postinst-systemd-start", - lambda x: not x.restart_after_upgrade and x.start, + lambda x: not x.restart_after_upgrade and x.start and not x.deprecated, variables, ) # prerm: stop_on_upgrade=false or restart_after_upgrade=true + # Note: if the service is deprecated, it should still be removed append_matching_services( prerm, "prerm-systemd-restart", - lambda x: not x.stop_on_upgrade or x.restart_after_upgrade, + lambda x: (not x.stop_on_upgrade or x.restart_after_upgrade), variables, ) @@ -252,7 +259,7 @@ def append_matching_services( prerm, "prerm-systemd", lambda x: not (not x.stop_on_upgrade or x.restart_after_upgrade) - and x.start, + and x.start and not x.deprecated, variables, )