diff --git a/Cargo.lock b/Cargo.lock index ad56bcad075..6c831f5dd1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -673,16 +673,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "batcher" -version = "2.0.1" -dependencies = [ - "async-trait", - "tedge_actors", - "time", - "tokio", -] - [[package]] name = "bindgen" version = "0.72.1" @@ -1195,24 +1185,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" -[[package]] -name = "collectd_ext" -version = "2.0.1" -dependencies = [ - "anyhow", - "assert_matches", - "async-trait", - "batcher", - "clock", - "log", - "tedge_actors", - "tedge_api", - "tedge_mqtt_ext", - "thiserror 2.0.12", - "time", - "tokio", -] - [[package]] name = "colorchoice" version = "1.0.3" @@ -5042,15 +5014,12 @@ dependencies = [ "async-trait", "aws_mapper_ext", "az_mapper_ext", - "batcher", "c8y_api", "c8y_auth_proxy", "c8y_mapper_ext", "camino", "certificate", "clap", - "clock", - "collectd_ext", "flockfile", "mqtt_channel", "rcgen", diff --git a/Cargo.toml b/Cargo.toml index 5413dc5d2da..f7e4d9d1897 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ repository = "https://github.com/thin-edge/thin-edge.io" aws_mapper_ext = { path = "crates/extensions/aws_mapper_ext" } axum_tls = { path = "crates/common/axum_tls" } az_mapper_ext = { path = "crates/extensions/az_mapper_ext" } -batcher = { path = "crates/common/batcher" } c8y-firmware-plugin = { path = "plugins/c8y_firmware_plugin" } c8y-remote-access-plugin = { path = "plugins/c8y_remote_access_plugin" } c8y_api = { path = "crates/core/c8y_api" } @@ -37,7 +36,6 @@ c8y_http_proxy = { path = "crates/extensions/c8y_http_proxy" } c8y_mapper_ext = { path = "crates/extensions/c8y_mapper_ext" } certificate = { path = "crates/common/certificate" } clock = { path = "crates/common/clock" } -collectd_ext = { path = "crates/extensions/collectd_ext" } download = { path = "crates/common/download" } flockfile = { path = "crates/common/flockfile" } json-writer = { path = "crates/common/json_writer" } diff --git a/configuration/init/systemd/tedge-mapper-collectd.service b/configuration/init/systemd/tedge-mapper-collectd.service deleted file mode 100644 index b7193db0746..00000000000 --- a/configuration/init/systemd/tedge-mapper-collectd.service +++ /dev/null @@ -1,14 +0,0 @@ -[Unit] -Description=tedge-mapper-collectd converts Thin Edge JSON measurements to Cumulocity JSON format. -After=syslog.target network.target mosquitto.service - -[Service] -User=tedge -ExecStartPre=+-/usr/bin/tedge init -ExecStart=/usr/bin/tedge-mapper collectd -Restart=on-failure -RestartPreventExitStatus=255 -RestartSec=5 - -[Install] -WantedBy=multi-user.target diff --git a/configuration/package_manifests/nfpm.tedge-mapper.yaml b/configuration/package_manifests/nfpm.tedge-mapper.yaml index bc0f24ba480..8b8fb2f905e 100644 --- a/configuration/package_manifests/nfpm.tedge-mapper.yaml +++ b/configuration/package_manifests/nfpm.tedge-mapper.yaml @@ -132,17 +132,6 @@ contents: mode: 0644 packager: rpm - - src: ./configuration/init/systemd/tedge-mapper-collectd.service - dst: /lib/systemd/system/tedge-mapper-collectd.service - file_info: - mode: 0644 - packager: deb - - src: ./configuration/init/systemd/tedge-mapper-collectd.service - dst: /lib/systemd/system/tedge-mapper-collectd.service - file_info: - mode: 0644 - packager: rpm - - src: ./configuration/init/systemd/tedge-mapper-local.service dst: /lib/systemd/system/tedge-mapper-local.service file_info: @@ -264,4 +253,4 @@ overrides: preinstall: configuration/package_scripts/_generated/tedge-mapper/deb/preinst postinstall: configuration/package_scripts/_generated/tedge-mapper/deb/postinst preremove: configuration/package_scripts/_generated/tedge-mapper/deb/prerm - postremove: configuration/package_scripts/_generated/tedge-mapper/deb/postrm \ No newline at end of file + postremove: configuration/package_scripts/_generated/tedge-mapper/deb/postrm diff --git a/configuration/package_scripts/_generated/tedge-mapper/apk/postinst b/configuration/package_scripts/_generated/tedge-mapper/apk/postinst index b5d579124cc..e553a9a021d 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/apk/postinst +++ b/configuration/package_scripts/_generated/tedge-mapper/apk/postinst @@ -51,12 +51,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/_generated/tedge-mapper/apk/postrm b/configuration/package_scripts/_generated/tedge-mapper/apk/postrm index 11dbc79c731..1c018d8f11c 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/apk/postrm +++ b/configuration/package_scripts/_generated/tedge-mapper/apk/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/_generated/tedge-mapper/deb/postinst b/configuration/package_scripts/_generated/tedge-mapper/deb/postinst index 23e5c8c5d9a..531addb1f41 100755 --- a/configuration/package_scripts/_generated/tedge-mapper/deb/postinst +++ b/configuration/package_scripts/_generated/tedge-mapper/deb/postinst @@ -230,9 +230,9 @@ if [ "$1" = "configure" ] || [ "$1" = "abort-upgrade" ] || [ "$1" = "abort-decon systemctl --system daemon-reload >/dev/null || true if [ -n "$2" ]; then if command -v deb-systemd-invoke >/dev/null 2>&1; then - deb-systemd-invoke try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service tedge-mapper-collectd.service >/dev/null || true + deb-systemd-invoke try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service >/dev/null || true else - systemctl try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service tedge-mapper-collectd.service >/dev/null || true + systemctl try-restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service >/dev/null || true fi fi fi @@ -273,12 +273,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/_generated/tedge-mapper/deb/postrm b/configuration/package_scripts/_generated/tedge-mapper/deb/postrm index 0a69c0bfc49..710a02b83cc 100755 --- a/configuration/package_scripts/_generated/tedge-mapper/deb/postrm +++ b/configuration/package_scripts/_generated/tedge-mapper/deb/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst b/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst index 09b4a55f156..e464b6bf180 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst +++ b/configuration/package_scripts/_generated/tedge-mapper/rpm/postinst @@ -78,7 +78,7 @@ fi if [ $1 -eq 2 ]; then if [ -d /run/systemd/system ]; then systemctl --system daemon-reload >/dev/null || true - systemctl restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service tedge-mapper-collectd.service >/dev/null || true + systemctl restart tedge-mapper-aws.service tedge-mapper-az.service tedge-mapper-c8y.service tedge-mapper-local.service >/dev/null || true fi fi # End automatically added section @@ -117,12 +117,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm b/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm index 3dcfd3cf68a..2a9c531e76c 100644 --- a/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm +++ b/configuration/package_scripts/_generated/tedge-mapper/rpm/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/_generated/tedge/apk/preinst b/configuration/package_scripts/_generated/tedge/apk/preinst index aa437772049..573e00fa434 100644 --- a/configuration/package_scripts/_generated/tedge/apk/preinst +++ b/configuration/package_scripts/_generated/tedge/apk/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/configuration/package_scripts/_generated/tedge/deb/preinst b/configuration/package_scripts/_generated/tedge/deb/preinst index aa437772049..573e00fa434 100755 --- a/configuration/package_scripts/_generated/tedge/deb/preinst +++ b/configuration/package_scripts/_generated/tedge/deb/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/configuration/package_scripts/_generated/tedge/rpm/preinst b/configuration/package_scripts/_generated/tedge/rpm/preinst index aa437772049..573e00fa434 100644 --- a/configuration/package_scripts/_generated/tedge/rpm/preinst +++ b/configuration/package_scripts/_generated/tedge/rpm/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/configuration/package_scripts/generate.py b/configuration/package_scripts/generate.py index 1ddc96a7d74..166c0b390bf 100755 --- a/configuration/package_scripts/generate.py +++ b/configuration/package_scripts/generate.py @@ -58,6 +58,11 @@ class Service: stop_on_upgrade: bool = True restart_after_upgrade: bool = True + # a deprecated service should not be automatically enabled or started, + # but it should still be removed on upgrade. + # Eventually the service may be completely removed from the services list in the packages.json file + deprecated: bool = False + def get_template(name, default=""): file = Path(name) @@ -200,20 +205,21 @@ def append_matching_services( # Special case for rpm packages: # By default rpm maintainer scripts restart mark a service to be restarted in the postrm script # unlike debian which does this in the postinst. + # Note: if the service is deprecated, it should still be removed if package_type == "deb": append_matching_services( postrm, "postrm-systemd", lambda x: True, variables ) elif package_type == "rpm": append_matching_services( - postrm, "postrm-systemd", lambda x: service.stop_on_upgrade, variables + postrm, "postrm-systemd", lambda x: service and service.stop_on_upgrade, variables ) ## postinst: restart after upgrade and start append_matching_services( postinst, "postinst-systemd-restart", - lambda x: x.restart_after_upgrade and x.start, + lambda x: x.restart_after_upgrade and x.start and not x.deprecated, { **variables, "RESTART_ACTION": "restart", @@ -224,7 +230,7 @@ def append_matching_services( append_matching_services( postinst, "postinst-systemd-restartnostart", - lambda x: x.restart_after_upgrade and not x.start, + lambda x: x.restart_after_upgrade and not x.start and not x.deprecated, { **variables, "RESTART_ACTION": "try-restart", @@ -235,15 +241,16 @@ def append_matching_services( append_matching_services( postinst, "postinst-systemd-start", - lambda x: not x.restart_after_upgrade and x.start, + lambda x: not x.restart_after_upgrade and x.start and not x.deprecated, variables, ) # prerm: stop_on_upgrade=false or restart_after_upgrade=true + # Note: if the service is deprecated, it should still be removed append_matching_services( prerm, "prerm-systemd-restart", - lambda x: not x.stop_on_upgrade or x.restart_after_upgrade, + lambda x: (not x.stop_on_upgrade or x.restart_after_upgrade), variables, ) @@ -252,7 +259,7 @@ def append_matching_services( prerm, "prerm-systemd", lambda x: not (not x.stop_on_upgrade or x.restart_after_upgrade) - and x.start, + and x.start and not x.deprecated, variables, ) diff --git a/configuration/package_scripts/packages.json b/configuration/package_scripts/packages.json index 549bcc445e9..15df42400e6 100644 --- a/configuration/package_scripts/packages.json +++ b/configuration/package_scripts/packages.json @@ -36,7 +36,7 @@ {"name": "tedge-mapper-az", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-c8y", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-local", "enable": true, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, - {"name": "tedge-mapper-collectd", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true}, + {"name": "tedge-mapper-collectd", "enable": false, "start": false, "restart_after_upgrade": true, "stop_on_upgrade": true, "deprecated": true}, {"name": "tedge-mapper-aws.target", "enable": true, "start": true, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-az.target", "enable": true, "start": true, "restart_after_upgrade": true, "stop_on_upgrade": true}, {"name": "tedge-mapper-c8y.target", "enable": true, "start": true, "restart_after_upgrade": true, "stop_on_upgrade": true} diff --git a/configuration/package_scripts/tedge-mapper/postinst b/configuration/package_scripts/tedge-mapper/postinst index d846b12e946..1877f591932 100644 --- a/configuration/package_scripts/tedge-mapper/postinst +++ b/configuration/package_scripts/tedge-mapper/postinst @@ -51,12 +51,6 @@ if command -v systemctl >/dev/null; then if [ -f "/etc/tedge/mosquitto-conf/aws-bridge.conf" ]; then enable_start_service tedge-mapper-aws.service fi - if [ -d /run/systemd/system ]; then - ### Enable the service if the collectd is running on the device - if systemctl is-active --quiet collectd.service; then - enable_start_service tedge-mapper-collectd.service - fi - fi fi if [ -f /var/lib/dpkg/info/tedge_mapper.postrm ]; then diff --git a/configuration/package_scripts/tedge-mapper/postrm b/configuration/package_scripts/tedge-mapper/postrm index 20a11ec9142..0784763fc02 100644 --- a/configuration/package_scripts/tedge-mapper/postrm +++ b/configuration/package_scripts/tedge-mapper/postrm @@ -6,8 +6,7 @@ purge_mapper_lock() { /run/lock/tedge-mapper-c8y.lock \ /run/lock/tedge-mapper-az.lock \ /run/lock/tedge-mapper-aws.lock \ - /run/lock/tedge-mapper-local.lock \ - /run/lock/tedge-mapper-collectd.lock + /run/lock/tedge-mapper-local.lock } case "$1" in diff --git a/configuration/package_scripts/tedge/preinst b/configuration/package_scripts/tedge/preinst index aa437772049..573e00fa434 100644 --- a/configuration/package_scripts/tedge/preinst +++ b/configuration/package_scripts/tedge/preinst @@ -146,8 +146,4 @@ if user_exists "$TEDGE_USER" && group_exists "$TEDGE_GROUP"; then chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-aws.lock fi - if [ -f "/run/lock/tedge-mapper-collectd.lock" ]; then - chown "$TEDGE_USER:$TEDGE_GROUP" /run/lock/tedge-mapper-collectd.lock - fi - fi # end: user_exists && group_exists diff --git a/crates/common/batcher/Cargo.toml b/crates/common/batcher/Cargo.toml deleted file mode 100644 index 1a20711f61a..00000000000 --- a/crates/common/batcher/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "batcher" -version = { workspace = true } -authors = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } -license = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } - -[dependencies] -async-trait = { workspace = true } -tedge_actors = { workspace = true } -time = { workspace = true } -tokio = { workspace = true, features = ["time"] } - -[dev-dependencies] -tedge_actors = { workspace = true, features = ["test-helpers"] } -tokio = { workspace = true, features = ["rt", "macros"] } - -[lints] -workspace = true diff --git a/crates/common/batcher/src/batch.rs b/crates/common/batcher/src/batch.rs deleted file mode 100644 index 8e7bdb9361d..00000000000 --- a/crates/common/batcher/src/batch.rs +++ /dev/null @@ -1,177 +0,0 @@ -use crate::batchable::Batchable; -use std::collections::HashMap; -use std::iter::once; -use time::OffsetDateTime; - -#[must_use] -#[derive(Debug)] -pub enum BatchAdd { - Added, - Duplicate, - Split(Batch), -} - -#[derive(Debug)] -pub struct Batch { - batch_start: OffsetDateTime, - batch_end: OffsetDateTime, - events: HashMap, -} - -impl Batch { - pub fn new(batch_start: OffsetDateTime, batch_end: OffsetDateTime, event: B) -> Batch { - let mut events = HashMap::new(); - events.insert(event.key(), event); - - Batch { - batch_start, - batch_end, - events, - } - } - - pub fn batch_start(&self) -> OffsetDateTime { - self.batch_start - } - - pub fn batch_end(&self) -> OffsetDateTime { - self.batch_end - } - - pub fn add(&mut self, event: B) -> BatchAdd { - let key = event.key(); - if let Some(existing_event) = self.events.get(&key) { - let existing_event_time = existing_event.event_time(); - - if event.event_time() == existing_event_time { - return BatchAdd::Duplicate; - } - - return BatchAdd::Split(self.split(existing_event_time, event)); - } - - self.events.insert(key, event); - - BatchAdd::Added - } - - fn split(&mut self, existing_event_time: OffsetDateTime, event: B) -> Batch { - let split_point = midpoint(existing_event_time, event.event_time()); - - let mut new_batch_events = HashMap::new(); - let new_batch_end = self.batch_end; - - let all_events = std::mem::take(&mut self.events); - self.batch_end = split_point; - - // Go over all the events in this batch plus the new event and allocate them, - // either the existing batch or the new batch. - for event in all_events.into_values().chain(once(event)) { - let event_time = event.event_time(); - - if event_time < split_point { - self.events.insert(event.key(), event); - } else { - new_batch_events.insert(event.key(), event); - } - } - - Batch { - batch_start: split_point, - batch_end: new_batch_end, - events: new_batch_events, - } - } - - pub fn into_vec(self) -> Vec { - self.events.into_values().collect() - } -} - -fn midpoint(event_time1: OffsetDateTime, event_time2: OffsetDateTime) -> OffsetDateTime { - let gap = event_time1 - event_time2; - event_time2 + gap / 2 -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn add() { - let batch_start = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batch_end = OffsetDateTime::from_unix_timestamp(100).unwrap(); - let event1 = TestBatchEvent::new(1, 40); - let event2 = TestBatchEvent::new(2, 60); - - let mut batch = Batch::new(batch_start, batch_end, event1.clone()); - assert!(matches!(batch.add(event2.clone()), BatchAdd::Added)); - - let result = batch.into_vec(); - assert_eq!(result.len(), 2); - assert!(result.contains(&event1)); - assert!(result.contains(&event2)); - } - - #[test] - fn split() { - let batch_start = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batch_end = OffsetDateTime::from_unix_timestamp(100).unwrap(); - let event1 = TestBatchEvent::new(1, 40); - let event2 = TestBatchEvent::new(1, 60); - - let mut batch1 = Batch::new(batch_start, batch_end, event1.clone()); - match batch1.add(event2.clone()) { - BatchAdd::Split(batch2) => { - let result1 = batch1.into_vec(); - assert_eq!(result1.len(), 1); - assert!(result1.contains(&event1)); - - let result2 = batch2.into_vec(); - assert_eq!(result2.len(), 1); - assert!(result2.contains(&event2)); - } - _ => panic!("Expected split"), - } - } - - #[test] - fn duplicate() { - let batch_start = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batch_end = OffsetDateTime::from_unix_timestamp(100).unwrap(); - let event1 = TestBatchEvent::new(1, 40); - let event2 = TestBatchEvent::new(1, 40); - - let mut batch = Batch::new(batch_start, batch_end, event1.clone()); - assert!(matches!(batch.add(event2), BatchAdd::Duplicate)); - - let result = batch.into_vec(); - assert_eq!(result.len(), 1); - assert!(result.contains(&event1)); - } - - #[derive(Debug, Clone, Eq, PartialEq)] - struct TestBatchEvent { - key: u64, - event_time: OffsetDateTime, - } - - impl TestBatchEvent { - fn new(key: u64, event_time: i64) -> TestBatchEvent { - let event_time = OffsetDateTime::from_unix_timestamp(event_time).unwrap(); - TestBatchEvent { key, event_time } - } - } - - impl Batchable for TestBatchEvent { - type Key = u64; - - fn key(&self) -> Self::Key { - self.key - } - - fn event_time(&self) -> OffsetDateTime { - self.event_time - } - } -} diff --git a/crates/common/batcher/src/batchable.rs b/crates/common/batcher/src/batchable.rs deleted file mode 100644 index 02ff0266a2a..00000000000 --- a/crates/common/batcher/src/batchable.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::fmt::Debug; -use std::hash::Hash; -use time::OffsetDateTime; - -/// Implement this interface for the items that you want batched. -/// -/// No items with the same key will go in the same batch. -/// The event_time of the item will determine how items are grouped, -/// dependent on how the batcher is configured. -pub trait Batchable: 'static + Debug + Send + Sync { - type Key: Eq + Hash + Debug + Send + Sync; - - /// Define the uniqueness within a batch. - fn key(&self) -> Self::Key; - - /// The time at which this item was created. This time is used to group items into a batch. - fn event_time(&self) -> OffsetDateTime; -} diff --git a/crates/common/batcher/src/batcher.rs b/crates/common/batcher/src/batcher.rs deleted file mode 100644 index 9fa7f24d834..00000000000 --- a/crates/common/batcher/src/batcher.rs +++ /dev/null @@ -1,631 +0,0 @@ -use crate::batch::Batch; -use crate::batch::BatchAdd; -use crate::batchable::Batchable; -use crate::config::BatchConfig; -use time::OffsetDateTime; - -#[derive(Debug, Eq, PartialEq)] -pub(crate) enum BatcherOutput { - Batch(Vec), - Timer(OffsetDateTime), -} - -/// Provides the core implementation of the batching algorithm. -#[derive(Debug)] -pub struct Batcher { - config: BatchConfig, - batches: Vec>, -} - -impl Batcher { - /// Create a Batcher with the specified config. - pub fn new(config: BatchConfig) -> Batcher { - Batcher { - config, - batches: vec![], - } - } - - pub(crate) fn event( - &mut self, - processing_time: OffsetDateTime, - event: B, - ) -> Vec> { - let event_time = event.event_time(); - - if event_time < processing_time - self.config.delivery_jitter() { - // Discard event because it is too old - return vec![]; - } - - if event_time > processing_time + self.config.message_leap_limit() { - // Discard event because it is too futuristic - return vec![]; - } - - match self.find_target_batch(event_time) { - None => { - let new_batch = self.make_new_batch(event); - let new_batch_end = new_batch.batch_end(); - self.batches.push(new_batch); - self.output_for_batch_end(processing_time, new_batch_end) - } - Some(target_batch) => match target_batch.add(event) { - BatchAdd::Added => vec![], - BatchAdd::Duplicate => vec![], - BatchAdd::Split(new_batch) => { - let split_batch_end = target_batch.batch_end(); - self.batches.push(new_batch); - self.output_for_batch_end(processing_time, split_batch_end) - } - }, - } - } - - fn output_for_batch_end( - &mut self, - processing_time: OffsetDateTime, - batch_end: OffsetDateTime, - ) -> Vec> { - let batch_timeout = batch_end + self.config.delivery_jitter(); - if processing_time < batch_timeout { - vec![BatcherOutput::Timer(batch_timeout)] - } else { - self.time(processing_time) - .into_iter() - .map(BatcherOutput::Batch) - .collect() - } - } - - pub(crate) fn time(&mut self, time: OffsetDateTime) -> Vec> { - let batches = std::mem::take(&mut self.batches); - - let (open_batches, closed_batches) = batches - .into_iter() - .partition(|batch| self.is_open(batch, time)); - - self.batches = open_batches; - - closed_batches - .into_iter() - .map(|batch| batch.into_vec()) - .collect() - } - - fn is_open(&self, batch: &Batch, time: OffsetDateTime) -> bool { - batch.batch_end() + self.config.delivery_jitter() > time - } - - pub(crate) fn flush(&mut self) -> Vec> { - let mut batches = Vec::with_capacity(self.batches.len()); - - while let Some(batch) = self.batches.pop() { - batches.push(batch.into_vec()) - } - - batches - } - - fn find_target_batch(&mut self, event_time: OffsetDateTime) -> Option<&mut Batch> { - self.batches - .iter_mut() - .find(|batch| batch.batch_start() <= event_time && event_time <= batch.batch_end()) - } - - fn make_new_batch(&self, event: B) -> Batch { - let event_time = event.event_time(); - let mut batch_start = event_time; - let mut batch_end = batch_start + self.config.event_jitter(); - - if let Some(previous_batch) = self.previous_batch(event_time) { - batch_start = batch_start.max(previous_batch.batch_end()) - } - if let Some(next_batch) = self.next_batch(event_time) { - batch_end = batch_end.min(next_batch.batch_start()) - } - - Batch::new(batch_start, batch_end, event) - } - - fn previous_batch(&self, event_time: OffsetDateTime) -> Option<&Batch> { - self.batches - .iter() - .filter(|batch| batch.batch_end() < event_time) - .max_by(|batch1, batch2| batch1.batch_end().cmp(&batch2.batch_end())) - } - - fn next_batch(&self, event_time: OffsetDateTime) -> Option<&Batch> { - self.batches - .iter() - .filter(|batch| batch.batch_start() > event_time) - .min_by(|batch1, batch2| batch1.batch_start().cmp(&batch2.batch_start())) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::batchable::Batchable; - use crate::config::BatchConfigBuilder; - use std::collections::BTreeMap; - use time::Duration; - - #[test] - fn single_event_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - - test.event(1, &event1); - test.expect_batch(70, vec![event1]); - - test.run(); - } - - #[test] - fn multi_event_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "b", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.expect_batch(70, vec![event1, event2]); - - test.run(); - } - - #[test] - // The same behavior as for `multi_event_batch` is expected - // Since we just change how long we wait for an event - fn multi_event_batch_with_long_delivery_jitter() { - let mut test = BatcherTest::new(50, 50, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "b", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.expect_batch(100, vec![event1, event2]); - - test.run(); - } - - #[test] - fn multi_event_batch_with_long_delivery_jitter_and_delayed_message() { - let mut test = BatcherTest::new(50, 50, 0); - - let event1 = test.create_event(5, "a", 2); - let event2 = test.create_event(10, "b", 1); - - test.event(11, &event2); - test.event(25, &event1); // late, but not too late - - test.expect_batch(60, vec![event1]); - test.expect_batch(110, vec![event2]); - - test.run(); - } - - #[test] - fn split_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "a", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.expect_batch(25, vec![event1]); // why 25? - test.expect_batch(70, vec![event2]); - - test.run(); - } - - #[test] - fn allocate_to_earlier_split_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "a", 2); - let event3 = test.create_event(2, "b", 3); - - test.event(1, &event1); - test.event(11, &event2); - test.event(12, &event3); - test.expect_batch(25, vec![event1, event3]); - test.expect_batch(70, vec![event2]); - - test.run(); - } - - #[test] - fn allocate_to_later_split_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "a", 2); - let event3 = test.create_event(9, "b", 3); - - test.event(1, &event1); - test.event(11, &event2); - test.event(12, &event3); - test.expect_batch(25, vec![event1]); - test.expect_batch(70, vec![event2, event3]); - - test.run(); - } - - #[test] - fn flush_no_batches() { - let mut test = BatcherTest::new(50, 20, 0); - test.flush(100); - test.run(); - } - - #[test] - fn flush_one_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(10, "b", 2); - - test.event(1, &event1); - test.event(11, &event2); - test.flush(20); - test.expect_batch(20, vec![event1, event2]); - - test.run(); - } - - #[test] - fn flush_two_batches() { - let mut test = BatcherTest::new(50, 20, 0); - - let event1 = test.create_event(0, "a", 1); - let event2 = test.create_event(3, "b", 2); - let event3 = test.create_event(10, "a", 3); - - test.event(1, &event1); - test.event(4, &event2); - test.event(11, &event3); - test.flush(20); - test.expect_batch(20, vec![event1, event2]); - test.expect_batch(20, vec![event3]); - - test.run(); - } - - // The following tests are taken from the diagrams on the specification: - // https://github.com/albinsuresh/thin-edge.io-specs/blob/main/src/telemetry-data/message-batching/message-batching.md - - #[test] - fn simple_batching_with_batching_window() { - let mut test = BatcherTest::new(50, 20, 0); - - let a = test.create_event(115, "a", 1); - let b = test.create_event(120, "b", 2); - let c = test.create_event(145, "c", 3); - let d = test.create_event(160, "d", 4); - let e = test.create_event(175, "e", 5); - let f = test.create_event(215, "f", 6); - let g = test.create_event(240, "g", 7); - - test.event(125, &b); - test.event(135, &a); // order inversion - test.event(150, &c); - test.event(165, &d); - test.event(189, &e); - test.event(250, &g); - test.event(260, &f); // too late - test.expect_batch(140, vec![a]); - test.expect_batch(190, vec![b, c, d]); - test.expect_batch(245, vec![e]); - test.expect_batch(310, vec![g]); - - test.run(); - } - - #[test] - fn simple_batching_with_batching_timeout() { - let mut test = BatcherTest::new(50, 20, 0); - - let a = test.create_event(120, "a", 1); - let b = test.create_event(130, "b", 2); - let c = test.create_event(145, "c", 3); - let d = test.create_event(180, "d", 4); - let e = test.create_event(190, "e", 5); - - test.event(130, &a); - test.event(140, &b); - test.event(150, &c); - test.event(189, &d); - test.event(210, &e); - test.expect_batch(190, vec![a, b, c]); - test.expect_batch(250, vec![d, e]); - - test.run(); - } - - #[test] - fn batch_split_due_to_conflicting_measurements() { - let mut test = BatcherTest::new(50, 20, 0); - - let a1 = test.create_event(120, "a", 1); - let b1 = test.create_event(125, "b", 2); - let a2 = test.create_event(140, "a", 3); - let c1 = test.create_event(150, "c", 4); - let a3 = test.create_event(170, "a", 5); - - test.event(125, &a1); - test.event(140, &b1); - test.event(150, &a2); - test.event(170, &c1); - test.event(180, &a3); - test.expect_batch(150, vec![a1, b1]); - test.expect_batch(180, vec![a2, c1]); - test.expect_batch(190, vec![a3]); - - test.run(); - } - - #[test] - fn receiving_older_already_batched_messages_after_starting_a_new_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let a = test.create_event(120, "a", 1); - let b = test.create_event(130, "b", 2); - let c = test.create_event(140, "c", 3); - let d = test.create_event(190, "d", 4); - let e = test.create_event(210, "e", 5); - - test.event(130, &a); - test.event(140, &b); - test.event(150, &c); - test.event(160, &c); - test.event(175, &c); - test.event(210, &d); - test.event(220, &c); - test.event(230, &e); - test.expect_batch(190, vec![a, b, c]); - test.expect_batch(260, vec![d, e]); - - test.run(); - } - - #[test] - fn receiving_older_unbatched_messages_after_starting_a_new_batch() { - let mut test = BatcherTest::new(50, 20, 0); - - let a1 = test.create_event(120, "a", 1); - let b1 = test.create_event(130, "b", 2); - let c1 = test.create_event(140, "c", 3); - let d1 = test.create_event(145, "d", 4); - let a2 = test.create_event(180, "a", 5); - let b2 = test.create_event(200, "b", 6); - - test.event(130, &a1); - test.event(140, &b1); - test.event(150, &c1); - test.event(189, &a2); - test.event(205, &b2); - test.event(215, &d1); - test.expect_batch(190, vec![a1, b1, c1]); - test.expect_batch(250, vec![a2, b2]); - - test.run(); - } - - #[derive(Debug, Clone, Eq, PartialEq)] - struct TestBatchEvent { - event_time: OffsetDateTime, - key: String, - value: u64, - } - - impl Batchable for TestBatchEvent { - type Key = String; - - fn key(&self) -> Self::Key { - self.key.clone() - } - - fn event_time(&self) -> OffsetDateTime { - self.event_time - } - } - - #[derive(Debug)] - enum EventOrTimer { - Event(TestBatchEvent), - Timer(), - } - - struct BatcherTest { - start_time: OffsetDateTime, - batcher: Batcher, - inputs: BTreeMap, - flush_time: Option, - expected_batches: BTreeMap>>, - } - - impl BatcherTest { - fn new(event_jitter: u32, delivery_jitter: u32, message_leap_limit: u32) -> BatcherTest { - let batcher_config = BatchConfigBuilder::new() - .event_jitter(event_jitter) - .delivery_jitter(delivery_jitter) - .message_leap_limit(message_leap_limit) - .build(); - - let start_time = OffsetDateTime::from_unix_timestamp(0).unwrap(); - let batcher = Batcher::new(batcher_config); - - BatcherTest { - start_time, - batcher, - inputs: BTreeMap::new(), - flush_time: None, - expected_batches: BTreeMap::new(), - } - } - - fn create_event(&mut self, event_time: i64, key: &str, value: u64) -> TestBatchEvent { - let event_time = self.create_instant(event_time); - let key = key.into(); - TestBatchEvent { - event_time, - key, - value, - } - } - - fn event(&mut self, processed_time: i64, event: &TestBatchEvent) { - let processed_time = self.create_instant(processed_time); - if let Some(_existing) = self - .inputs - .insert(processed_time, EventOrTimer::Event(event.clone())) - { - panic!("Two events with same processing time") - } - } - - fn flush(&mut self, flush_time: i64) { - self.flush_time = Some(self.create_instant(flush_time)); - } - - fn expect_batch(&mut self, batch_close_time: i64, batch: Vec) { - let batch_close_time = self.create_instant(batch_close_time); - let batches_at_time = self.expected_batches.entry(batch_close_time).or_default(); - batches_at_time.push(batch); - } - - fn run(mut self) { - let mut actual_batches = BTreeMap::new(); - - if let Some(flush_time) = self.flush_time { - if !self.inputs.split_off(&flush_time).is_empty() { - panic!("Flush must be the last test action"); - } - } - - while let Some((t, action)) = pop_first(&mut self.inputs) { - match action { - EventOrTimer::Event(event) => { - let outputs = self.batcher.event(t, event); - self.handle_outputs(t, outputs, &mut actual_batches, self.flush_time); - } - EventOrTimer::Timer() => { - actual_batches.insert(t, self.batcher.time(t)); - } - }; - } - - if let Some(t) = self.flush_time { - let batches = self.batcher.flush(); - if !batches.is_empty() { - actual_batches.insert(t, batches); - } - } - - verify(self.expected_batches, actual_batches); - } - - fn handle_outputs( - &mut self, - t: OffsetDateTime, - outputs: Vec>, - all_batches: &mut BTreeMap>>, - flush_time: Option, - ) { - let mut batches = vec![]; - - for output in outputs { - match output { - BatcherOutput::Batch(batch) => batches.push(batch), - BatcherOutput::Timer(timer) => { - if timer <= t { - panic!( - "Batcher requested non-future timer. Input: {}, timer: {}", - t, timer - ); - } - let add_timer = match flush_time { - None => true, - Some(flush_time) => timer < flush_time, - }; - if add_timer { - if let Some(existing) = self.inputs.insert(timer, EventOrTimer::Timer()) - { - panic!( - "Timer at the same time as existing event/timer: {}: {:?}", - timer, existing - ); - } - } - } - } - } - - if !batches.is_empty() { - all_batches.insert(t, batches); - } - } - - fn create_instant(&self, time: i64) -> OffsetDateTime { - self.start_time + Duration::milliseconds(time) - } - } - - fn verify( - expected_batches: BTreeMap>>, - mut actual_batches: BTreeMap>>, - ) { - assert_eq!( - actual_batches.keys().collect::>(), - expected_batches.keys().collect::>() - ); - - for (time, timed_expected_batches) in expected_batches { - let mut timed_actual_batches = actual_batches.remove(&time).unwrap(); - - for timed_expected_batch in &timed_expected_batches { - let found = - timed_actual_batches - .iter() - .enumerate() - .find(|(_index, timed_actual_batch)| { - match_batches(timed_actual_batch, timed_expected_batch) - }); - - match found { - None => panic!( - "Failed to match batch @ {}: {:?}", - time, timed_actual_batches - ), - Some((index, _batch)) => timed_actual_batches.remove(index), - }; - } - } - } - - fn match_batches(batch1: &[TestBatchEvent], batch2: &[TestBatchEvent]) -> bool { - if batch1.len() != batch2.len() { - return false; - } - - for event in batch1 { - if !batch2.contains(event) { - return false; - } - } - - true - } - - fn pop_first(map: &mut BTreeMap) -> Option<(K, V)> { - let (&key, _value) = map.iter().next()?; - map.remove_entry(&key) - } -} diff --git a/crates/common/batcher/src/config.rs b/crates/common/batcher/src/config.rs deleted file mode 100644 index 9c70279ab54..00000000000 --- a/crates/common/batcher/src/config.rs +++ /dev/null @@ -1,117 +0,0 @@ -use time::Duration; - -/// The parameters for the batching process. -#[derive(Debug, Clone)] -pub struct BatchConfig { - event_jitter: Duration, - delivery_jitter: Duration, - message_leap_limit: Duration, -} - -impl BatchConfig { - /// Get the largest expected variation in event times. - pub fn event_jitter(&self) -> Duration { - self.event_jitter - } - - /// Get the largest expected variation in delivery times. - pub fn delivery_jitter(&self) -> Duration { - self.delivery_jitter - } - - /// Get the largest expected time discontinuity. - pub fn message_leap_limit(&self) -> Duration { - self.message_leap_limit - } -} - -/// Used to configure the parameters for batching. Start here. -#[derive(Debug, Default)] -pub struct BatchConfigBuilder {} - -impl BatchConfigBuilder { - /// Start configuring the batching parameters. - pub fn new() -> BatchConfigBuilder { - BatchConfigBuilder {} - } - - /// Set the largest expected variation in event times, in milliseconds. - pub fn event_jitter(self, event_jitter: u32) -> EventBatchConfigBuilder { - EventBatchConfigBuilder { event_jitter } - } -} - -/// Used to configure the parameters for batching. -#[derive(Debug)] -pub struct EventBatchConfigBuilder { - event_jitter: u32, -} - -impl EventBatchConfigBuilder { - /// Set the largest expected variation in delivery times, in milliseconds. - pub fn delivery_jitter(self, delivery_jitter: u32) -> DeliveryBatchConfigBuilder { - DeliveryBatchConfigBuilder { - event_jitter: self.event_jitter, - delivery_jitter, - } - } -} - -/// Used to configure the parameters for batching. -#[derive(Debug)] -pub struct DeliveryBatchConfigBuilder { - event_jitter: u32, - delivery_jitter: u32, -} - -impl DeliveryBatchConfigBuilder { - /// Set the largest expected time discontinuity, in milliseconds. - pub fn message_leap_limit(self, message_leap_limit: u32) -> BuildableBatchConfigBuilder { - BuildableBatchConfigBuilder { - event_jitter: self.event_jitter, - delivery_jitter: self.delivery_jitter, - message_leap_limit, - } - } -} - -/// Used to configure the parameters for batching. -#[derive(Debug)] -pub struct BuildableBatchConfigBuilder { - event_jitter: u32, - delivery_jitter: u32, - message_leap_limit: u32, -} - -impl BuildableBatchConfigBuilder { - /// Finalise the batching parameters. - pub fn build(self) -> BatchConfig { - let event_jitter = Duration::milliseconds(self.event_jitter as i64); - let delivery_jitter = Duration::milliseconds(self.delivery_jitter as i64); - let message_leap_limit = Duration::milliseconds(self.message_leap_limit as i64); - - BatchConfig { - event_jitter, - delivery_jitter, - message_leap_limit, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn build_config() { - let config = BatchConfigBuilder::new() - .event_jitter(1) - .delivery_jitter(2) - .message_leap_limit(3) - .build(); - - assert_eq!(config.event_jitter(), Duration::milliseconds(1)); - assert_eq!(config.delivery_jitter(), Duration::milliseconds(2)); - assert_eq!(config.message_leap_limit(), Duration::milliseconds(3)); - } -} diff --git a/crates/common/batcher/src/driver.rs b/crates/common/batcher/src/driver.rs deleted file mode 100644 index 2dfd7003cb7..00000000000 --- a/crates/common/batcher/src/driver.rs +++ /dev/null @@ -1,294 +0,0 @@ -use crate::batchable::Batchable; -use crate::batcher::Batcher; -use crate::batcher::BatcherOutput; -use async_trait::async_trait; -use std::collections::BTreeSet; -use std::time::Duration; -use tedge_actors::Actor; -use tedge_actors::ChannelError; -use tedge_actors::MessageReceiver; -use tedge_actors::RuntimeError; -use tedge_actors::Sender; -use tedge_actors::SimpleMessageBox; -use time::OffsetDateTime; - -/// Input message to the BatchDriver's input channel. -#[derive(Debug)] -pub enum BatchDriverInput { - /// Message representing a new item to batch. - Event(B), - /// Message representing that the batching should finish and that - /// any remaining batches should be immediately closed and sent to the output. - Flush, -} - -impl From for BatchDriverInput { - fn from(event: B) -> Self { - BatchDriverInput::Event(event) - } -} - -/// Output message from the BatchDriver's output channel. -#[derive(Debug)] -pub enum BatchDriverOutput { - /// Message representing a batch of items. - Batch(Vec), - /// Message representing that batching has finished. - Flush, -} - -impl From> for Vec { - fn from(value: BatchDriverOutput) -> Self { - match value { - BatchDriverOutput::Batch(events) => events, - BatchDriverOutput::Flush => vec![], - } - } -} - -/// The central API for using the batching algorithm. -/// Send items in, get batches out. -pub struct BatchDriver { - batcher: Batcher, - message_box: SimpleMessageBox, BatchDriverOutput>, - timers: BTreeSet, -} - -enum TimeTo { - Unbounded, - Future(std::time::Duration), - Past(OffsetDateTime), -} - -#[async_trait] -impl Actor for BatchDriver { - fn name(&self) -> &str { - "Event batcher" - } - - /// Start the batching - runs until receiving a Flush message - async fn run(mut self) -> Result<(), RuntimeError> { - loop { - let message = match self.time_to_next_timer() { - TimeTo::Unbounded => self.recv(None), - TimeTo::Future(timeout) => self.recv(Some(timeout)), - TimeTo::Past(timer) => { - self.timers.remove(&timer); - self.time(OffsetDateTime::now_utc()).await?; - continue; - } - }; - - match message.await { - Err(_) => continue, // timer timeout expired - Ok(None) => break, // input channel closed - Ok(Some(BatchDriverInput::Flush)) => break, // we've been told to stop - Ok(Some(BatchDriverInput::Event(event))) => self.event(event).await?, - }; - } - - Ok(self.flush().await?) - } -} - -impl BatchDriver { - /// Define the batching process and channels to interact with it. - pub fn new( - batcher: Batcher, - message_box: SimpleMessageBox, BatchDriverOutput>, - ) -> BatchDriver { - BatchDriver { - batcher, - message_box, - timers: BTreeSet::new(), - } - } - - async fn recv( - &mut self, - timeout: Option, - ) -> Result>, tokio::time::error::Elapsed> { - match timeout { - None => Ok(self.message_box.recv().await), - Some(timeout) => tokio::time::timeout(timeout, self.message_box.recv()).await, - } - } - - fn time_to_next_timer(&self) -> TimeTo { - match self.timers.iter().next() { - None => TimeTo::Unbounded, - Some(timer) => { - let signed_duration = *timer - OffsetDateTime::now_utc(); - if signed_duration.is_negative() { - return TimeTo::Past(*timer); - } - TimeTo::Future(std::time::Duration::new( - signed_duration.abs().whole_seconds() as u64, - 0, - )) - } - } - } - - async fn event(&mut self, event: B) -> Result<(), ChannelError> { - for action in self.batcher.event(OffsetDateTime::now_utc(), event) { - match action { - BatcherOutput::Batch(batch) => { - self.message_box - .send(BatchDriverOutput::Batch(batch)) - .await?; - } - BatcherOutput::Timer(t) => { - self.timers.insert(t); - } - }; - } - - Ok(()) - } - - async fn time(&mut self, timer: OffsetDateTime) -> Result<(), ChannelError> { - for batch in self.batcher.time(timer) { - self.message_box - .send(BatchDriverOutput::Batch(batch)) - .await?; - } - - Ok(()) - } - - async fn flush(&mut self) -> Result<(), ChannelError> { - for batch in self.batcher.flush() { - self.message_box - .send(BatchDriverOutput::Batch(batch)) - .await?; - } - - self.message_box.send(BatchDriverOutput::Flush).await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::batchable::Batchable; - use crate::batcher::Batcher; - use crate::config::BatchConfigBuilder; - use crate::driver::BatchDriver; - use std::time::Duration; - use tedge_actors::Builder; - use tedge_actors::MessageSource; - use tedge_actors::NoConfig; - use tedge_actors::SimpleMessageBoxBuilder; - use tokio::time::timeout; - - type TestBox = - SimpleMessageBox, BatchDriverInput>; - - #[tokio::test] - async fn flush_empty() -> Result<(), ChannelError> { - let mut test_box = spawn_driver(); - test_box.send(BatchDriverInput::Flush).await?; - assert_recv_flush(&mut test_box).await; - Ok(()) - } - - #[tokio::test] - async fn flush_one_batch() -> Result<(), ChannelError> { - let mut test_box = spawn_driver(); - - let event1 = TestBatchEvent::new(1, OffsetDateTime::now_utc()); - test_box.send(BatchDriverInput::Event(event1)).await?; - test_box.send(BatchDriverInput::Flush).await?; - - assert_recv_batch(&mut test_box, vec![event1]).await; - assert_recv_flush(&mut test_box).await; - - Ok(()) - } - - #[tokio::test] - async fn two_batches_with_timer() -> Result<(), ChannelError> { - let mut test_box = spawn_driver(); - - let event1 = TestBatchEvent::new(1, OffsetDateTime::now_utc()); - test_box.send(BatchDriverInput::Event(event1)).await?; - - assert_recv_batch(&mut test_box, vec![event1]).await; - - let event2 = TestBatchEvent::new(2, OffsetDateTime::now_utc()); - test_box.send(BatchDriverInput::Event(event2)).await?; - - assert_recv_batch(&mut test_box, vec![event2]).await; - - Ok(()) - } - - async fn assert_recv_batch(test_box: &mut TestBox, expected: Vec) { - match timeout(Duration::from_secs(10), test_box.recv()).await { - Ok(Some(BatchDriverOutput::Batch(batch))) => assert_batch(batch, expected), - other => panic!("Failed to receive batch: {:?}", other), - } - } - - fn assert_batch(batch: Vec, expected: Vec) { - assert_eq!(batch.len(), expected.len()); - - for event in &batch { - if !expected.contains(event) { - panic!("Failed to find: {:?}", event); - } - } - } - - async fn assert_recv_flush(test_box: &mut TestBox) { - match timeout(Duration::from_secs(10), test_box.recv()).await { - Ok(Some(BatchDriverOutput::Flush)) => {} - other => panic!("Failed to receive flush: {:?}", other), - } - } - - fn spawn_driver() -> TestBox { - let config = BatchConfigBuilder::new() - .event_jitter(50) - .delivery_jitter(20) - .message_leap_limit(0) - .build(); - let batcher = Batcher::new(config); - let mut box_builder = SimpleMessageBoxBuilder::new("SUT", 1); - let mut test_box_builder = SimpleMessageBoxBuilder::new("Test", 1); - box_builder.connect_sink(NoConfig, &test_box_builder); - test_box_builder.connect_sink(NoConfig, &box_builder); - let test_box = test_box_builder.build(); - let driver_box = box_builder.build(); - - let driver = BatchDriver::new(batcher, driver_box); - tokio::spawn(async move { driver.run().await }); - - test_box - } - - #[derive(Debug, Copy, Clone, Eq, PartialEq)] - struct TestBatchEvent { - key: u64, - event_time: OffsetDateTime, - } - - impl TestBatchEvent { - fn new(key: u64, event_time: OffsetDateTime) -> TestBatchEvent { - TestBatchEvent { key, event_time } - } - } - - impl Batchable for TestBatchEvent { - type Key = u64; - - fn key(&self) -> Self::Key { - self.key - } - - fn event_time(&self) -> OffsetDateTime { - self.event_time - } - } -} diff --git a/crates/common/batcher/src/lib.rs b/crates/common/batcher/src/lib.rs deleted file mode 100644 index 39b91d551f9..00000000000 --- a/crates/common/batcher/src/lib.rs +++ /dev/null @@ -1,105 +0,0 @@ -//! Group together events that are close in time. - -mod batch; -mod batchable; -mod batcher; -mod config; -mod driver; - -pub use crate::batchable::Batchable; -pub use crate::batcher::Batcher; -pub use crate::config::BatchConfig; -pub use crate::config::BatchConfigBuilder; -pub use crate::config::BuildableBatchConfigBuilder; -pub use crate::config::DeliveryBatchConfigBuilder; -pub use crate::config::EventBatchConfigBuilder; -pub use crate::driver::BatchDriver; -pub use crate::driver::BatchDriverInput; -pub use crate::driver::BatchDriverOutput; -use std::convert::Infallible; -use tedge_actors::Builder; -use tedge_actors::DynSender; -use tedge_actors::MessageSink; -use tedge_actors::MessageSource; -use tedge_actors::NoConfig; -use tedge_actors::RuntimeRequest; -use tedge_actors::RuntimeRequestSink; -use tedge_actors::SimpleMessageBoxBuilder; - -pub struct BatchingActorBuilder { - batching_window: u32, - maximum_message_delay: u32, - message_leap_limit: u32, - message_box: SimpleMessageBoxBuilder, BatchDriverOutput>, -} - -impl Default for BatchingActorBuilder { - fn default() -> Self { - BatchingActorBuilder { - batching_window: 500, - maximum_message_delay: 400, // Heuristic delay that should work out well on an Rpi - message_leap_limit: 0, - message_box: SimpleMessageBoxBuilder::new("Event batcher", 16), - } - } -} - -impl BatchingActorBuilder { - pub fn with_batching_window(self, batching_window: u32) -> Self { - Self { - batching_window, - ..self - } - } - - pub fn with_maximum_message_delay(self, maximum_message_delay: u32) -> Self { - Self { - maximum_message_delay, - ..self - } - } - - pub fn with_message_leap_limit(self, message_leap_limit: u32) -> Self { - Self { - message_leap_limit, - ..self - } - } -} - -impl MessageSink> for BatchingActorBuilder { - fn get_sender(&self) -> DynSender> { - self.message_box.get_sender() - } -} - -impl MessageSource, NoConfig> for BatchingActorBuilder { - fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink>) { - self.message_box.connect_sink(config, peer) - } -} - -impl RuntimeRequestSink for BatchingActorBuilder { - fn get_signal_sender(&self) -> DynSender { - self.message_box.get_signal_sender() - } -} - -impl Builder> for BatchingActorBuilder { - type Error = Infallible; - - fn try_build(self) -> Result, Self::Error> { - Ok(self.build()) - } - - fn build(self) -> BatchDriver { - let batch_config = BatchConfigBuilder::new() - .event_jitter(self.batching_window) - .delivery_jitter(self.maximum_message_delay) - .message_leap_limit(self.message_leap_limit) - .build(); - let batcher = Batcher::new(batch_config); - let message_box = self.message_box.build(); - BatchDriver::new(batcher, message_box) - } -} diff --git a/crates/core/tedge/src/supervisor.rs b/crates/core/tedge/src/supervisor.rs index e007a1de185..577a2dc4795 100644 --- a/crates/core/tedge/src/supervisor.rs +++ b/crates/core/tedge/src/supervisor.rs @@ -561,12 +561,12 @@ mod tests { #[test] fn run_all_logging_considers_supervisor_agent_and_mapper_services() { assert_eq!( - log_service_names(Some(&MapperName::Collectd)), + log_service_names(Some(&MapperName::UserDefined(vec!["local".to_string()]))), vec![ "tedge".to_string(), tedge_agent::AGENT_NAME.to_string(), "tedge-mapper".to_string(), - "tedge-mapper-collectd".to_string(), + "tedge-mapper".to_string(), ] ); } diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index 48373c6dad3..a717b6a840c 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -14,15 +14,12 @@ anyhow = { workspace = true } async-trait = { workspace = true } aws_mapper_ext = { workspace = true, optional = true } az_mapper_ext = { workspace = true, optional = true } -batcher = { workspace = true } c8y_api = { workspace = true, optional = true } c8y_auth_proxy = { workspace = true, optional = true } c8y_mapper_ext = { workspace = true, optional = true } camino = { workspace = true } certificate = { workspace = true } clap = { workspace = true } -clock = { workspace = true } -collectd_ext = { workspace = true } flockfile = { workspace = true } mqtt_channel = { workspace = true } serde = { workspace = true } diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs deleted file mode 100644 index 0f5bfda74ec..00000000000 --- a/crates/core/tedge_mapper/src/collectd/mapper.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::core::component::TEdgeComponent; -use crate::core::mapper::start_basic_actors; -use async_trait::async_trait; -use batcher::BatchingActorBuilder; -use collectd_ext::actor::CollectdActorBuilder; -use mqtt_channel::QoS; -use mqtt_channel::Topic; -use mqtt_channel::TopicFilter; -use tedge_actors::MessageSink; -use tedge_actors::NoConfig; -use tedge_actors::Runtime; -use tedge_config::TEdgeConfig; -use tedge_utils::paths::TedgePaths; - -const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd"; -const COLLECTD_INPUT_TOPICS: &str = "collectd/#"; -const COLLECTD_OUTPUT_TOPIC: &str = "te/device/main///m/"; - -pub struct CollectdMapper; - -impl CollectdMapper { - fn input_topics() -> TopicFilter { - TopicFilter::new_unchecked(COLLECTD_INPUT_TOPICS).with_qos(QoS::AtMostOnce) - } - - fn output_topic() -> Topic { - Topic::new_unchecked(COLLECTD_OUTPUT_TOPIC) - } -} - -#[async_trait] -impl TEdgeComponent for CollectdMapper { - async fn build( - &self, - tedge_config: TEdgeConfig, - _config_dir: &TedgePaths, - ) -> Result { - let (mut runtime, mut mqtt_actor) = - start_basic_actors(COLLECTD_MAPPER_NAME, &tedge_config).await?; - - let input_topic = CollectdMapper::input_topics(); - let output_topic = CollectdMapper::output_topic(); - - let mut batching_actor = BatchingActorBuilder::default(); - let mut collectd_actor = CollectdActorBuilder::new(input_topic); - - collectd_actor.add_input(&mut mqtt_actor); - batching_actor.connect_source(NoConfig, &mut collectd_actor); - mqtt_actor.connect_mapped_source(NoConfig, &mut batching_actor, move |batch| { - collectd_ext::converter::batch_into_mqtt_messages(&output_topic, batch) - }); - - runtime.spawn(collectd_actor).await?; - runtime.spawn(batching_actor).await?; - runtime.spawn(mqtt_actor).await?; - - Ok(runtime) - } -} diff --git a/crates/core/tedge_mapper/src/collectd/mod.rs b/crates/core/tedge_mapper/src/collectd/mod.rs deleted file mode 100644 index 9b60821cb13..00000000000 --- a/crates/core/tedge_mapper/src/collectd/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod mapper; diff --git a/crates/core/tedge_mapper/src/lib.rs b/crates/core/tedge_mapper/src/lib.rs index 37c6cc845ef..7e12954e580 100644 --- a/crates/core/tedge_mapper/src/lib.rs +++ b/crates/core/tedge_mapper/src/lib.rs @@ -4,7 +4,6 @@ use crate::aws::mapper::AwsMapper; use crate::az::mapper::AzureMapper; #[cfg(feature = "c8y")] use crate::c8y::mapper::CumulocityMapper; -use crate::collectd::mapper::CollectdMapper; use crate::core::component::TEdgeComponent; use crate::custom::mapper::CustomMapper; use anyhow::bail; @@ -70,7 +69,6 @@ pub mod aws; pub mod az; #[cfg(feature = "c8y")] pub mod c8y; -mod collectd; mod core; mod custom; use crate::custom_mapper_resolve::EffectiveMapperConfig; @@ -110,7 +108,6 @@ fn lookup_component(component_name: MapperName) -> anyhow::Result Box::new(AwsMapper { profile: read_and_set_var!(profile, "TEDGE_CLOUD_PROFILE"), }), - MapperName::Collectd => Box::new(CollectdMapper), #[cfg(feature = "c8y")] MapperName::C8y { profile } => Box::new(CumulocityMapper { profile: read_and_set_var!(profile, "TEDGE_CLOUD_PROFILE"), @@ -173,7 +170,6 @@ pub enum MapperName { #[clap(long)] profile: Option, }, - Collectd, /// Run a user-defined mapper from `/etc/tedge/mappers/{name}/`. /// /// The mapper name must match `[a-z][a-z0-9-]*`. @@ -202,7 +198,6 @@ impl fmt::Display for MapperName { MapperName::C8y { profile: Some(profile), } => write!(f, "tedge-mapper-c8y@{profile}"), - MapperName::Collectd => write!(f, "tedge-mapper-collectd"), MapperName::UserDefined(args) => write!( f, "tedge-mapper-{}", @@ -221,7 +216,6 @@ impl MapperName { MapperName::Aws { .. } => "tedge-mapper-aws", #[cfg(feature = "c8y")] MapperName::C8y { .. } => "tedge-mapper-c8y", - MapperName::Collectd => "tedge-mapper-collectd", MapperName::UserDefined(_) => "tedge-mapper", } } diff --git a/crates/extensions/collectd_ext/Cargo.toml b/crates/extensions/collectd_ext/Cargo.toml deleted file mode 100644 index 01eddee6096..00000000000 --- a/crates/extensions/collectd_ext/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "collectd_ext" -description = "thin-edge extension adding support for collectd" -version = { workspace = true } -authors = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } -license = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = { workspace = true } -batcher = { workspace = true } -clock = { workspace = true } -log = { workspace = true } -tedge_actors = { workspace = true } -tedge_api = { workspace = true } -tedge_mqtt_ext = { workspace = true } -thiserror = { workspace = true } -time = { workspace = true } -tokio = { workspace = true, features = ["sync", "time"] } - -[dev-dependencies] -anyhow = { workspace = true } -assert_matches = { workspace = true } -time = { workspace = true, features = ["macros"] } - -[lints] -workspace = true diff --git a/crates/extensions/collectd_ext/src/actor.rs b/crates/extensions/collectd_ext/src/actor.rs deleted file mode 100644 index d410435b6f2..00000000000 --- a/crates/extensions/collectd_ext/src/actor.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::collectd::CollectdMessage; -use async_trait::async_trait; -use log::error; -use std::convert::Infallible; -use tedge_actors::Actor; -use tedge_actors::Builder; -use tedge_actors::DynSender; -use tedge_actors::MessageReceiver; -use tedge_actors::MessageSink; -use tedge_actors::MessageSource; -use tedge_actors::NoConfig; -use tedge_actors::RuntimeError; -use tedge_actors::RuntimeRequest; -use tedge_actors::RuntimeRequestSink; -use tedge_actors::Sender; -use tedge_actors::SimpleMessageBox; -use tedge_actors::SimpleMessageBoxBuilder; -use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::TopicFilter; - -/// An actor that collects measurements from collectd over MQTT -pub struct CollectdActor { - messages: SimpleMessageBox, -} - -#[async_trait] -impl Actor for CollectdActor { - fn name(&self) -> &str { - "collectd" - } - - async fn run(mut self) -> Result<(), RuntimeError> { - while let Some(message) = self.messages.recv().await { - match CollectdMessage::parse_from(&message) { - Ok(collectd_message) => { - for msg in collectd_message { - self.messages.send(msg).await? - } - } - Err(err) => { - error!("Error while decoding a collectd message: {}", err); - } - } - } - Ok(()) - } -} - -pub struct CollectdActorBuilder { - topics: TopicFilter, - message_box: SimpleMessageBoxBuilder, -} - -impl CollectdActorBuilder { - pub fn new(topics: TopicFilter) -> Self { - CollectdActorBuilder { - topics, - message_box: SimpleMessageBoxBuilder::new("Collectd", 16), - } - } - - pub fn add_input(&mut self, source: &mut impl MessageSource) { - source.connect_sink(self.topics.clone(), &self.message_box) - } -} - -impl RuntimeRequestSink for CollectdActorBuilder { - fn get_signal_sender(&self) -> DynSender { - self.message_box.get_signal_sender() - } -} - -impl MessageSource for CollectdActorBuilder { - fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink) { - self.message_box.connect_sink(config, peer) - } -} - -impl Builder for CollectdActorBuilder { - type Error = Infallible; - - fn try_build(self) -> Result { - Ok(self.build()) - } - - fn build(self) -> CollectdActor { - CollectdActor { - messages: self.message_box.build(), - } - } -} diff --git a/crates/extensions/collectd_ext/src/batcher.rs b/crates/extensions/collectd_ext/src/batcher.rs deleted file mode 100644 index 094af4bc487..00000000000 --- a/crates/extensions/collectd_ext/src/batcher.rs +++ /dev/null @@ -1,125 +0,0 @@ -use clock::Timestamp; -use tedge_api::measurement::MeasurementGroup; -use tedge_api::measurement::MeasurementGrouper; -use tedge_api::measurement::MeasurementGrouperError; -use tedge_api::measurement::MeasurementVisitor; -use tedge_api::measurement::ThinEdgeJsonSerializer; -use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::Topic; - -use super::collectd::CollectdMessage; -use super::error::DeviceMonitorError; - -#[derive(Debug)] -pub struct MessageBatch { - message_grouper: MeasurementGrouper, -} - -impl MessageBatch { - pub fn thin_edge_json( - output_topic: &Topic, - messages: Vec, - ) -> Result { - let mut messages = messages.into_iter(); - - if let Some(first_message) = messages.next() { - let timestamp = first_message.timestamp; - let mut batch = MessageBatch::start_batch(first_message, timestamp)?; - for message in messages { - batch.add_to_batch(message)?; - } - let measurements = batch.end_batch()?; - - let mut tedge_json_serializer = ThinEdgeJsonSerializer::new(); - measurements.accept(&mut tedge_json_serializer)?; - - let payload = tedge_json_serializer.bytes()?; - Ok(MqttMessage::new(output_topic, payload)) - } else { - Err(DeviceMonitorError::FromInvalidThinEdgeJson( - MeasurementGrouperError::UnexpectedEnd, - )) - } - } - - fn start_batch( - collectd_message: CollectdMessage, - timestamp: Timestamp, - ) -> Result { - let mut message_grouper = MeasurementGrouper::new(); - message_grouper.visit_timestamp(timestamp)?; - - let mut message_batch = Self { message_grouper }; - - message_batch.add_to_batch(collectd_message)?; - - Ok(message_batch) - } - - fn add_to_batch( - &mut self, - collectd_message: CollectdMessage, - ) -> Result<(), DeviceMonitorError> { - collectd_message.accept(&mut self.message_grouper)?; - Ok(()) - } - - fn end_batch(self) -> Result { - Ok(self.message_grouper.end()?) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use assert_matches::assert_matches; - use clock::Clock; - use clock::WallClock; - use time::macros::datetime; - - #[test] - fn test_message_batch_processor() -> anyhow::Result<()> { - let timestamp = datetime!(2015-05-15 0:00:01.444 UTC); - let collectd_message = CollectdMessage::new("temperature", "value", 32.5, timestamp); - let mut message_batch = MessageBatch::start_batch(collectd_message, WallClock.now())?; - - let collectd_message = CollectdMessage::new("coordinate", "x", 50.0, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let collectd_message = CollectdMessage::new("coordinate", "y", 70.0, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let collectd_message = CollectdMessage::new("pressure", "value", 98.2, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let collectd_message = CollectdMessage::new("coordinate", "z", 90.0, timestamp); - message_batch.add_to_batch(collectd_message)?; - - let message_group = message_batch.end_batch()?; - - assert_matches!(message_group.timestamp(), Some(_)); - - assert_eq!( - message_group.get_measurement_value(Some("temperature"), "value"), - Some(32.5) - ); - assert_eq!( - message_group.get_measurement_value(Some("pressure"), "value"), - Some(98.2) - ); - assert_eq!( - message_group.get_measurement_value(Some("coordinate"), "x"), - Some(50.0) - ); - assert_eq!( - message_group.get_measurement_value(Some("coordinate"), "y"), - Some(70.0) - ); - assert_eq!( - message_group.get_measurement_value(Some("coordinate"), "z"), - Some(90.0) - ); - - Ok(()) - } -} diff --git a/crates/extensions/collectd_ext/src/collectd.rs b/crates/extensions/collectd_ext/src/collectd.rs deleted file mode 100644 index 6176cf9452e..00000000000 --- a/crates/extensions/collectd_ext/src/collectd.rs +++ /dev/null @@ -1,373 +0,0 @@ -use batcher::Batchable; -use tedge_api::measurement::MeasurementVisitor; -use tedge_mqtt_ext::MqttMessage; -use time::Duration; -use time::OffsetDateTime; - -#[derive(Debug)] -pub struct CollectdMessage { - pub metric_group_key: String, - pub metric_key: String, - pub timestamp: OffsetDateTime, - pub metric_value: f64, -} - -#[derive(thiserror::Error, Debug)] -pub enum CollectdError { - #[error( - "Message received on invalid collectd topic: {0}. \ - Collectd message topics must be in the format collectd///" - )] - InvalidMeasurementTopic(String), - - #[error("Invalid payload received on topic: {0}. Error: {1}")] - InvalidMeasurementPayload(String, CollectdPayloadError), - - #[error("Non UTF-8 payload: {0:?}")] - NonUTF8MeasurementPayload(Vec), -} - -impl CollectdMessage { - pub fn accept(&self, visitor: &mut T) -> Result<(), T::Error> - where - T: MeasurementVisitor, - { - visitor.visit_grouped_measurement( - &self.metric_group_key, - &self.metric_key, - self.metric_value, - ) - } - - #[cfg(test)] - pub fn new( - metric_group_key: &str, - metric_key: &str, - metric_value: f64, - timestamp: OffsetDateTime, - ) -> Self { - Self { - metric_group_key: metric_group_key.to_string(), - metric_key: metric_key.to_string(), - timestamp, - metric_value, - } - } - - pub fn parse_from(mqtt_message: &MqttMessage) -> Result, CollectdError> { - let topic = mqtt_message.topic.name.as_str(); - let collectd_topic = match CollectdTopic::from_str(topic) { - Ok(collectd_topic) => collectd_topic, - Err(_) => { - return Err(CollectdError::InvalidMeasurementTopic(topic.into())); - } - }; - - let payload = mqtt_message.payload_str().map_err(|_err| { - CollectdError::NonUTF8MeasurementPayload(mqtt_message.payload_bytes().into()) - })?; - - let collectd_payload = CollectdPayload::parse_from(payload) - .map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?; - - let num_measurements = collectd_payload.metric_values.len(); - let mut collectd_messages: Vec = Vec::with_capacity(num_measurements); - - for (i, value) in collectd_payload.metric_values.iter().enumerate() { - let mut metric_key = collectd_topic.metric_key.to_string(); - // If there are multiple values, then create unique keys metric_key_val1, metric_key_val2 etc. - if num_measurements > 1 { - metric_key = format!("{}_val{}", metric_key, i + 1); - } - collectd_messages.push(CollectdMessage { - metric_group_key: collectd_topic.metric_group_key.to_string(), - metric_key, - timestamp: collectd_payload.timestamp(), - metric_value: *value, - }); - } - Ok(collectd_messages) - } -} - -#[derive(Debug, Eq, PartialEq, Hash)] -pub struct CollectdTopic<'a> { - metric_group_key: &'a str, - metric_key: &'a str, -} - -#[derive(Debug)] -struct InvalidCollectdTopicName; - -impl<'a> CollectdTopic<'a> { - fn from_str(topic_name: &'a str) -> Result { - let mut iter = topic_name.split('/'); - let _collectd_prefix = iter.next().ok_or(InvalidCollectdTopicName)?; - let _hostname = iter.next().ok_or(InvalidCollectdTopicName)?; - let metric_group_key = iter.next().ok_or(InvalidCollectdTopicName)?; - let metric_key = iter.next().ok_or(InvalidCollectdTopicName)?; - - match iter.next() { - None => Ok(CollectdTopic { - metric_group_key, - metric_key, - }), - Some(_) => Err(InvalidCollectdTopicName), - } - } -} - -#[derive(Debug)] -struct CollectdPayload { - timestamp: f64, - metric_values: Vec, -} - -#[derive(thiserror::Error, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum CollectdPayloadError { - #[error("Invalid payload: {0}. Expected payload format: :")] - InvalidMeasurementPayloadFormat(String), - - #[error("Invalid measurement timestamp: {0}. Epoch time value expected")] - InvalidMeasurementTimestamp(String), - - #[error("Invalid measurement value: {0}. Must be a number")] - InvalidMeasurementValue(String), -} - -impl CollectdPayload { - fn parse_from(payload: &str) -> Result { - let msg: Vec<&str> = payload.split(':').collect(); - let vec_len = msg.len(); - - if vec_len <= 1 { - return Err(CollectdPayloadError::InvalidMeasurementPayloadFormat( - payload.to_string(), - )); - } - - // First element is always the timestamp - let timestamp = msg[0].parse::().map_err(|_err| { - CollectdPayloadError::InvalidMeasurementTimestamp(msg[0].to_string()) - })?; - - let metric_values = msg - .into_iter() - .skip(1) - .map(|m| { - m.parse::() - .map_err(|_err| CollectdPayloadError::InvalidMeasurementValue(m.to_string())) - }) - .collect::, _>>()?; - - Ok(CollectdPayload { - timestamp, - metric_values, - }) - } - - pub fn timestamp(&self) -> OffsetDateTime { - let timestamp = self.timestamp.trunc() as i64; - let nanoseconds = (self.timestamp.fract() * 1.0e9) as u32; - OffsetDateTime::from_unix_timestamp(timestamp).unwrap() - + Duration::nanoseconds(nanoseconds as i64) - } -} - -impl Batchable for CollectdMessage { - type Key = String; - - fn key(&self) -> Self::Key { - format!("{}/{}", &self.metric_group_key, &self.metric_key) - } - - fn event_time(&self) -> OffsetDateTime { - self.timestamp - } -} - -#[cfg(test)] -mod tests { - use assert_matches::assert_matches; - use std::ops::Index; - use tedge_mqtt_ext::MqttMessage; - use tedge_mqtt_ext::Topic; - use time::macros::datetime; - - use super::*; - - #[test] - fn collectd_message_parsing() { - let topic = Topic::new_unchecked("collectd/localhost/temperature/value"); - let mqtt_message = MqttMessage::new(&topic, "123456789:32.5"); - - let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value, - } = collectd_message.index(0); - assert_eq!(metric_group_key, "temperature"); - - assert_eq!(metric_key, "value"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.0 UTC)); - assert_eq!(*metric_value, 32.5); - } - - #[test] - fn collectd_message_parsing_multi_valued_measurement() { - let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); - let mqtt_message = MqttMessage::new(&topic, "123456789:32.5:45.2"); - - let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value: _, - } = collectd_message.index(0); - assert_eq!(metric_group_key, "temperature"); - - assert_eq!(metric_key, "value_val1"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.0 UTC)); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value, - } = collectd_message.index(1); - - assert_eq!(metric_group_key, "temperature"); - assert_eq!(metric_key, "value_val2"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.0 UTC)); - assert_eq!(*metric_value, 45.2); - } - - #[test] - fn collectd_null_terminated_message_parsing() { - let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); - let mqtt_message = MqttMessage::new(&topic, "123456789.125:32.5\u{0}"); - - let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); - - let CollectdMessage { - metric_group_key, - metric_key, - timestamp, - metric_value, - } = collectd_message.index(0); - - assert_eq!(metric_group_key, "temperature"); - assert_eq!(metric_key, "value"); - assert_eq!(*timestamp, datetime!(1973-11-29 21:33:09.125 UTC)); - assert_eq!(*metric_value, 32.5); - } - - #[test] - fn invalid_collectd_message_topic() { - let topic = Topic::new("collectd/less/level").unwrap(); - let mqtt_message = MqttMessage::new(&topic, "123456789:32.5"); - - let result = CollectdMessage::parse_from(&mqtt_message); - - assert_matches!(result, Err(CollectdError::InvalidMeasurementTopic(_))); - } - - #[test] - fn invalid_collectd_message_payload() { - let topic = Topic::new("collectd/host/group/key").unwrap(); - let invalid_collectd_message = MqttMessage::new(&topic, "123456789"); - - let result = CollectdMessage::parse_from(&invalid_collectd_message); - - assert_matches!(result, Err(CollectdError::InvalidMeasurementPayload(_, _))); - } - - #[test] - fn invalid_collectd_topic_less_levels() { - let result = CollectdTopic::from_str("collectd/less/levels"); - - assert_matches!(result, Err(InvalidCollectdTopicName)); - } - - #[test] - fn invalid_collectd_topic_more_levels() { - let result = CollectdTopic::from_str("collectd/more/levels/than/needed"); - - assert_matches!(result, Err(InvalidCollectdTopicName)); - } - - #[test] - fn invalid_collectd_payload_no_separator() { - let payload = "123456789"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_)) - ); - } - - #[test] - fn invalid_collectd_metric_value() { - let payload = "123456789:abc"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementValue(_)) - ); - } - - #[test] - fn invalid_collectd_metric_multi_value() { - let payload = "123456789:96.6:abc"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementValue(_)) - ); - } - - #[test] - fn valid_collectd_multivalue_metric() { - let payload = "123456789:1234:5678"; - let result = CollectdPayload::parse_from(payload).unwrap(); - - assert_eq!(result.timestamp, 123456789.0); - assert_eq!(result.metric_values, vec![1234.0, 5678.0]); - } - - #[test] - fn invalid_collectd_metric_timestamp() { - let payload = "abc:98.6"; - let result = CollectdPayload::parse_from(payload); - - assert_matches!( - result, - Err(CollectdPayloadError::InvalidMeasurementTimestamp(_)) - ); - } - - #[test] - fn very_large_metric_value() { - let payload: String = format!("123456789:{}", u128::MAX); - let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap(); - - assert_eq!(*collectd_payload.metric_values.index(0), u128::MAX as f64); - } - - #[test] - fn very_small_metric_value() { - let payload: String = format!("123456789:{}", i128::MIN); - let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap(); - - assert_eq!(*collectd_payload.metric_values.index(0), i128::MIN as f64); - } -} diff --git a/crates/extensions/collectd_ext/src/converter.rs b/crates/extensions/collectd_ext/src/converter.rs deleted file mode 100644 index aebd384dd0b..00000000000 --- a/crates/extensions/collectd_ext/src/converter.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::batcher::MessageBatch; -use crate::collectd::CollectdMessage; -use batcher::BatchDriverOutput; -use log::error; -use tedge_mqtt_ext::MqttMessage; -use tedge_mqtt_ext::Topic; - -pub fn batch_into_mqtt_messages( - output_topic: &Topic, - in_message: BatchDriverOutput, -) -> Vec { - match in_message { - BatchDriverOutput::Batch(measurements) => { - match MessageBatch::thin_edge_json(output_topic, measurements) { - Ok(message) => { - vec![message] - } - Err(err) => { - error!("Error while encoding a thin-edge json message: {}", err); - vec![] - } - } - } - BatchDriverOutput::Flush => vec![], - } -} diff --git a/crates/extensions/collectd_ext/src/error.rs b/crates/extensions/collectd_ext/src/error.rs deleted file mode 100644 index df4133b50ab..00000000000 --- a/crates/extensions/collectd_ext/src/error.rs +++ /dev/null @@ -1,29 +0,0 @@ -use tedge_actors::RuntimeError; -use tokio::sync::mpsc::error::SendError; - -#[derive(thiserror::Error, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum DeviceMonitorError { - #[error(transparent)] - FromMqttClient(#[from] tedge_mqtt_ext::MqttError), - - #[error(transparent)] - FromInvalidCollectdMeasurement(#[from] crate::collectd::CollectdError), - - #[error(transparent)] - FromInvalidThinEdgeJson(#[from] tedge_api::measurement::MeasurementGrouperError), - - #[error(transparent)] - FromThinEdgeJsonSerializationError( - #[from] tedge_api::measurement::ThinEdgeJsonSerializationError, - ), - - #[error(transparent)] - FromBatchingError(#[from] SendError), -} - -impl From for RuntimeError { - fn from(error: DeviceMonitorError) -> Self { - Box::new(error).into() - } -} diff --git a/crates/extensions/collectd_ext/src/lib.rs b/crates/extensions/collectd_ext/src/lib.rs deleted file mode 100644 index f6c3fcf54ad..00000000000 --- a/crates/extensions/collectd_ext/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod actor; -pub mod batcher; -pub mod collectd; -pub mod converter; -pub mod error; diff --git a/docs/src/operate/configuration/mosquitto-configuration.md b/docs/src/operate/configuration/mosquitto-configuration.md index 16c76520aa7..c7559e0ff8f 100644 --- a/docs/src/operate/configuration/mosquitto-configuration.md +++ b/docs/src/operate/configuration/mosquitto-configuration.md @@ -80,36 +80,14 @@ tedge connect aws This will configure all the services (mosquitto, tedge-mapper-c8y.service, tedge-mapper-az.service, tedge-mapper-aws.service, tedge-agent.service) to use the newly set port and the bind address. -## Common Errors +### Step 4: Restart the local services using MQTT -The below example shows that we cannot set a string value for the port number. +After changing the mqtt port and host, all the services using MQTT have to be restarted. -```sh -tedge config set mqtt.bind.port '"1234"' -``` - -```text title="Output" -Error: failed to set the configuration key: mqtt.bind.port with value: "1234". - -Caused by: - Conversion from String failed -``` - -## Updating the mqtt port and bind address (host) in collectd and for collectd-mapper - -Update the `collectd.conf` with the new port and host in ``. - -Then, restart the collectd service. +- For the Cumulocity, Azure and AWS mappers, this is done running the appropriate `tedge connect` command. +- For user-configured mapper, this has to be done manually, for instance by restarting `tedge-mapper-local`. + ```sh + sudo systemctl restart tedge-mapper-local + ``` +- For other services, say `collectd`, please refer to their documentation to update their configuration and restart the service. -```sh -sudo systemctl restart collectd -``` - -After changing the mqtt port and host, then connect to the cloud using `tedge connect c8y/az`. -Then (Steps 1-3) the collectd-mapper has to be restarted to use the newly set port and bind address (host). - -Restart the tedge-mapper-collectd service. - -```sh -sudo systemctl restart tedge-mapper-collectd -``` diff --git a/docs/src/start/images/collectd-metrics.png b/docs/src/operate/images/collectd-metrics.png similarity index 100% rename from docs/src/start/images/collectd-metrics.png rename to docs/src/operate/images/collectd-metrics.png diff --git a/docs/src/start/device-monitoring.md b/docs/src/operate/monitoring/device-monitoring.md similarity index 96% rename from docs/src/start/device-monitoring.md rename to docs/src/operate/monitoring/device-monitoring.md index 026c925c8de..b246824a6cb 100644 --- a/docs/src/start/device-monitoring.md +++ b/docs/src/operate/monitoring/device-monitoring.md @@ -1,7 +1,7 @@ --- title: Monitoring -tags: [Getting Started, Monitoring, Collectd] -sidebar_position: 8 +tags: [Monitoring, Collectd] +sidebar_position: 1 description: Monitoring your device with collectd --- @@ -20,7 +20,7 @@ and then into the [cloud-vendor specific format](../understand/tedge-mapper.md). -![device monitoring with collectd](images/collectd-metrics.png) +![device monitoring with collectd](../images/collectd-metrics.png) @@ -156,7 +156,3 @@ tedge mqtt sub 'c8y/#' [c8y/measurement/measurements/create] {"type": "ThinEdgeMeasurement","time":"2021-06-07T15:40:30.155037451+01:00","cpu":{"percent-active": {"value": 0.753768844221106}},"memory":{"percent-used": {"value": 1.16587699972141}},"df-root":{"percent_bytes-used": {"value": 71.3117904663086}}} [c8y/measurement/measurements/create] {"type": "ThinEdgeMeasurement","time":"2021-06-07T15:40:31.154898577+01:00","cpu":{"percent-active": {"value": 0.5}},"memory":{"percent-used": {"value": 1.16608109197519}}} ``` - -## Troubleshooting - -For troubleshooting tips, check out the [device monitoring](../operate/troubleshooting/device-monitoring.md) section. diff --git a/docs/src/operate/monitoring/systemd-watchdog.md b/docs/src/operate/monitoring/systemd-watchdog.md index 00d1587ac31..4eceb8a728a 100644 --- a/docs/src/operate/monitoring/systemd-watchdog.md +++ b/docs/src/operate/monitoring/systemd-watchdog.md @@ -18,7 +18,7 @@ This document describes how the systemd watchdog mechanism can be enabled for %% ## Enabling the systemd watchdog feature for a tedge service -Enabling systemd watchdog for a %%te%% service (tedge-agent, tedge-mapper-c8y/az/collectd) is a two-step process. +Enabling systemd watchdog for a %%te%% service (tedge-agent, tedge-mapper-c8y/az) is a two-step process. ### Step 1: Enable the watchdog feature in the systemd service file diff --git a/docs/src/operate/troubleshooting/device-monitoring.md b/docs/src/operate/troubleshooting/device-monitoring.md deleted file mode 100644 index 9d6f3adfbc9..00000000000 --- a/docs/src/operate/troubleshooting/device-monitoring.md +++ /dev/null @@ -1,67 +0,0 @@ ---- -title: Device Monitoring -tags: [Operate, Monitoring] -sidebar_position: 1 -description: How to troubleshoot device monitoring ---- - -To install and configure monitoring on your device, -see the tutorial [Monitor your device with collectd](../../start/device-monitoring.md). - -## Is collectd running? - -```sh -sudo systemctl status collectd -``` - -If not, launch collected - -```sh -sudo systemctl start collectd -``` - -## Is collectd publishing MQTT messages? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'collectd/#' -``` - -If no metrics are collected, please check the [MQTT configuration](../../start/device-monitoring.md#collectd-configuration) - -:::note -The `collectd.conf` file included with %%te%% is configured for conservative interval times, e.g. 10 mins to 1 hour depending on the metric. This is done so that the metrics don't consume unnecessary IoT resources both on the device and in the cloud. If you want to push the metrics more frequently then you will have to adjust the `Interval` settings either globally or on the individual plugins. Make sure you restart the collectd service after making any changes to the configuration. -::: - -## Is the tedge-mapper-collectd running? - -```sh -sudo systemctl status tedge-mapper-collectd -``` - -If not, launch tedge-mapper-collectd.service as below - -```sh -sudo systemctl start tedge-mapper-collectd -``` - -## Are the collectd metrics published in Thin Edge JSON format? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'te/device/main///m/+' -``` - -## Are the collectd metrics published to Cumulocity? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'c8y/#' -``` - -If not see how to [connect a device to Cumulocity](../../start/connect-c8y.md). - -## Are the collectd metrics published to Azure IoT? - -```sh te2mqtt formats=v1 -tedge mqtt sub 'az/#' -``` - -If not see how to [connect a device to Azure IoT](../../start/connect-azure.md). diff --git a/docs/src/operate/troubleshooting/log-files.md b/docs/src/operate/troubleshooting/log-files.md index 5a3c63912e5..0759755b530 100644 --- a/docs/src/operate/troubleshooting/log-files.md +++ b/docs/src/operate/troubleshooting/log-files.md @@ -47,21 +47,7 @@ journalctl -u tedge-mapper-aws ``` :::note -Run `tedge_mapper --debug aws` to log more debug messages -::: - -### Device monitoring logs {#device-logs} -The %%te%% device monitoring component logs can be found as below - -#### Collectd mapper logs {#collectd-mapper} -The log messages of the collectd mapper that sends the monitoring data to the cloud can be accessed as below - -```sh -journalctl -u tedge-mapper-collectd -``` - -:::note -Run `tedge-mapper --debug collectd` to log more debug messages +Run `tedge-mapper --debug aws` to log more debug messages ::: ### Software Management logs {#software-management} @@ -91,7 +77,7 @@ Run `tedge-agent --debug` to log more debug messages ::: ## Third-party component logs {#thirdparty} -%%te%% uses the third-party components `Mosquitto` as the mqtt broker and `Collectd` for monitoring purpose. +%%te%% uses the third-party components `Mosquitto` as the mqtt broker. The logs that are created by these components can be accessed on a %%te%% device as below. ### Mosquitto logs {#mosquitto} @@ -103,15 +89,6 @@ The `Mosquitto` logs can be found in `/var/log/mosquitto/mosquitto.log`. Set `log_type debug` or `log_type all` on `/etc/mosquitto/mosquitto.conf`, to capture more debug information. ::: -### Collectd logs {#collectd} -`Collectd` is used for monitoring the resource status of a %%te%% device. -Collectd logs all the messages at `/var/log/syslog`. -So, the collectd specific logs can be accessed using the `journalctl` as below - -```sh -journalctl -u collectd -``` - ## Configuring log levels in %%te%% {#configure-log-levels} The log levels can be configured for %%te%% services using either by command line or setting the required log diff --git a/docs/src/operate/troubleshooting/monitoring-service-health.md b/docs/src/operate/troubleshooting/monitoring-service-health.md index cee97d4170f..5dd06734e9e 100644 --- a/docs/src/operate/troubleshooting/monitoring-service-health.md +++ b/docs/src/operate/troubleshooting/monitoring-service-health.md @@ -62,7 +62,6 @@ The following endpoints are currently supported: * `te/device/main/service/tedge-mapper-c8y/status/health` * `te/device/main/service/tedge-mapper-az/status/health` * `te/device/main/service/tedge-mapper-aws/status/health` -* `te/device/main/service/tedge-mapper-collectd/status/health` All future tedge services will also follow the same topic naming scheme convention. diff --git a/docs/src/references/mappers/flows.md b/docs/src/references/mappers/flows.md index eda0879be0e..de55513c023 100644 --- a/docs/src/references/mappers/flows.md +++ b/docs/src/references/mappers/flows.md @@ -568,7 +568,8 @@ __Note__ that when the input of a test is received from its stdin, the topic is given using a bracket syntax `[] ` similar to the output of `tedge mqtt sub` and `tedge flows test` itself. -This can be used to chain tests: +This can be used to chain tests. +For example, assuming a flow has been installed to transform `collectd` messages into %%te%% measurements: ```shell $ tedge flows test collectd/mandarine/cpu/percent-active '1754571280.572:2.07156308851224' | tedge flows test diff --git a/docs/src/references/mappers/mqtt-topics.md b/docs/src/references/mappers/mqtt-topics.md index a06f6f6249a..55d708d9a21 100644 --- a/docs/src/references/mappers/mqtt-topics.md +++ b/docs/src/references/mappers/mqtt-topics.md @@ -82,19 +82,3 @@ The AWS topics are prefixed by `aws/`. * `aws/shadow/#` Use this topic to interact with unnamed and named shadows of the device. It's mapped to `$aws/things/{device_id}/shadow`. -## Collectd topics - -When the [device monitoring feature is enabled](../../start/device-monitoring.md), -monitoring metrics are emitted by `collectd` on a hierarchy of MQTT topics. - -* `collectd/$HOSTNAME/#` - All the metrics collected on the device (which hostname is `$HOSTNAME`). -* `collectd/$HOSTNAME/$PLUGIN/#` - All the metrics collected by a given collectd plugin, named `$PLUGIN`. -* `collectd/$HOSTNAME/$PLUGIN/$METRIC` - The topic for a given metric, named `$METRIC`. - All the measurements are published as a pair of a Unix timestamp in milliseconds and a numeric value - in the format `$TIMESTAMP:$VALUE`. For example, `1623155717:98.6`. - -The `collectd-mapper` daemon process ingests these measurements and emits translated messages -to the measurement topic. - -* This process groups the atomic measurements that have been received during the same time-window (currently 200 ms) -* and produces a single %%te%% JSON for the whole group of measurements. diff --git a/docs/src/references/supported-platforms.md b/docs/src/references/supported-platforms.md index 701d9361ff4..fe6b496df6a 100644 --- a/docs/src/references/supported-platforms.md +++ b/docs/src/references/supported-platforms.md @@ -90,7 +90,6 @@ In this scenario, all of the %%te%% components are running on the gateway device |Name|Typical Memory Usage (MiB)| |--|--| |tedge-mapper c8y (Cumulocity)|8| -|tedge-mapper collectd |8| |tedge-agent|8| |mosquitto|10| |**Total**|34| diff --git a/docs/src/start/getting-started.md b/docs/src/start/getting-started.md index dba095c6d20..36d4f2fd3fb 100644 --- a/docs/src/start/getting-started.md +++ b/docs/src/start/getting-started.md @@ -49,10 +49,9 @@ This tutorial is divided into small steps. The first three steps are needed to i - [Step 1 Install %%te%%](#step-1-install-thin-edgeio) - [Step 2 Configure and Connect to Cumulocity](#step-2-configure-and-connect-to-cumulocity) - [Step 3 Sending Device Data](#step-3-sending-device-data) -- [Step 4 Monitor the device](#step-4-monitor-the-device) -- [Step 5 Add software management](#step-5-add-software-management) -- [Step 6 Manage configuration files](#step-6-manage-configuration-files) -- [Step 7 Manage Log Files](#step-7-manage-log-files) +- [Step 4 Add software management](#step-5-add-software-management) +- [Step 5 Manage configuration files](#step-6-manage-configuration-files) +- [Step 6 Manage Log Files](#step-7-manage-log-files) ## Step 1 Install %%te%% {#step-1-install-thin-edgeio} @@ -264,77 +263,7 @@ When you go to events (`Device management` → `your device` → `events`) -## Step 4 Monitor the device - -With %%te%% device monitoring, you can collect metrics from the device and forward these device metrics to Cumulocity. - -Device monitoring can be enabled by installing a community package, [tedge-collectd-setup](https://cloudsmith.io/~thinedge/repos/community/packages/?q=name%3A%27%5Etedge-collectd-setup%24%27), which will install [collectd](https://www.collectd.org/) and configure some sensible defaults including monitoring of cpu, memory and disk metrics. - -```sh tab={"label":"Debian/Ubuntu"} -sudo apt-get install tedge-collectd-setup -``` - -```sh tab={"label":"RHEL/Fedora/RockyLinux"} -sudo dnf install tedge-collectd-setup -``` - -```sh tab={"label":"Alpine"} -sudo apk add tedge-collectd-setup -``` - -What you should see by now is that data arrives on the `collectd/#` topics. You can check that via: - -```sh te2mqtt formats=v1 -tedge mqtt sub 'collectd/#' -``` - -The output will be similar like: - -```log title="Output" -INFO: Connected -[collectd/raspberrypi/df-root/percent_bytes-used] 1667205183.407:11.7998857498169 -[collectd/raspberrypi/memory/percent-used] 1667205183.408:4.87045198079293 -[collectd/raspberrypi/cpu/percent-active] 1667205184.398:1.52284263959391 -``` - -:::note -The default collectd settings, `/etc/collectd/collectd.conf`, use conservative interval times, e.g. 10 mins to 1 hour depending on the metric. This is done so that the metrics don't consume unnecessary IoT resources both on the device and in the cloud. If you want to push the metrics more frequently then you will have to adjust the `Interval` settings either globally or on the individual plugins. Make sure you restart the collectd service after making any changes to the configuration. -::: - -The `tedge-mapper-collectd` service subscribes to the `collectd/#` topics and translates them to the tedge payloads, then the respective cloud mappers will translate the %%te%% messages to the format dictated by each cloud. - -As an example, you can inspect the Cumulocity translated metrics using the following command: - -```sh te2mqtt formats=v1 -tedge mqtt sub 'c8y/#' -``` - -The output will be similar like: - -```log title="Output" -INFO: Connected -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:44.398000001Z","cpu":{"percent-active":{"value":1.26262626262626}},"memory":{"percent-used":{"value":4.87024847292786}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:45.398000001Z","memory":{"percent-used":{"value":4.87024847292786}},"cpu":{"percent-active":{"value":1.01522842639594}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:46.398000001Z","memory":{"percent-used":{"value":4.87024847292786}},"cpu":{"percent-active":{"value":0.759493670886076}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:47.398000001Z","memory":{"percent-used":{"value":4.87024847292786}},"cpu":{"percent-active":{"value":2.01005025125628}}} -[c8y/measurement/measurements/create] {"type":"ThinEdgeMeasurement","time":"2022-10-31T08:35:48.398000001Z","memory":{"percent-used":{"value":4.87004496506279}},"cpu":{"percent-active":{"value":0.254452926208651}}} -``` - -The monitoring data will appear in Cumulocity on the device in the measurement section. - - - -![CollectdMeasurements](./images/collectd-metrics.png) - - - - -### Edit Collectd - -To change the monitored data, it is needed to change the collectd.conf. This can be done via Cumulocity, and [step 6](#change-collectd-configuration) explains how to do it. - - -## Step 5 Add software management +## Step 4 Add software management Software management takes care of allowing installation and management of any type of software from Cumulocity. Since the type is generic, any type of software can be managed. In %%te%% this can be extended with plugins. For every software type, a particular plugin is needed. @@ -399,7 +328,7 @@ Find more information about [how to manage the software](https://cumulocity.com/ How to [develop your own plugins](../extend/software-management.md) is described here. -## Step 6 Manage configuration files +## Step 5 Manage configuration files With %%te%% it is possible to manage config files on a device by using the Cumulocity configuration management feature as a part of Device Management. @@ -482,7 +411,7 @@ To change the collectd metrics of the device, which are displayed in Cumulocity, 10. If you then click on get snapshot from device (select the right configuration file in device supported configurations), you will see the change of the configuration file. -## Step 7 Manage Log Files +## Step 6 Manage Log Files With %%te%% it is possible to request log files from a device by using the Cumulocity log request feature as a part of Device Management. diff --git a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot deleted file mode 100644 index 437de9fe672..00000000000 --- a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot +++ /dev/null @@ -1,60 +0,0 @@ -*** Settings *** -Resource ../../resources/common.resource -Library ThinEdgeIO - -Suite Setup Setup -Suite Teardown Get Suite Logs - -Test Tags theme:monitoring - - -*** Test Cases *** -Stop tedge-mapper-collectd - Execute Command sudo systemctl stop tedge-mapper-collectd.service - -Update the service file - Execute Command cmd=sudo sed -i '10iWatchdogSec=30' /lib/systemd/system/tedge-mapper-collectd.service - -Reload systemd files - Execute Command sudo systemctl daemon-reload - -Start tedge-mapper-collectd - Execute Command sudo systemctl start tedge-mapper-collectd.service - -Start watchdog service - Execute Command sudo systemctl start tedge-watchdog.service - Sleep 10s - -Check PID of tedge-mapper-collectd - ${pid}= Service Should Be Running tedge-mapper-collectd - Set Suite Variable ${pid} - -Kill the PID - Kill Process ${pid} - -Recheck PID of tedge-mapper-collectd - ${pid1}= Service Should Be Running tedge-mapper-collectd - Set Suite Variable ${pid1} - -Compare PID change - Should Not Be Equal ${pid} ${pid1} - -Stop watchdog service - Execute Command sudo systemctl stop tedge-watchdog.service - -Remove entry from service file - Execute Command sudo sed -i '10d' /lib/systemd/system/tedge-mapper-collectd.service - Execute Command sudo systemctl daemon-reload - -tedge-collectd-mapper health status - Execute Command sudo systemctl start tedge-mapper-collectd.service - - Sleep 5s reason=It fails without this! It needs a better way of queuing requests - ${pid}= Service Should Be Running tedge-mapper-collectd - Execute Command sudo tedge mqtt pub 'te/device/main/service/tedge-mapper-collectd/cmd/health/check' '' - ${messages}= Should Have MQTT Messages - ... te/device/main/service/tedge-mapper-collectd/status/health - ... minimum=1 - ... maximum=2 - Should Contain ${messages[0]} "pid":${pid} - Should Contain ${messages[0]} "status":"up" diff --git a/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot b/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot index f6faf0688f0..e9d8ed2eba4 100644 --- a/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot +++ b/tests/RobotFramework/tests/tedge/diag/predefined_plugins.robot @@ -14,7 +14,6 @@ Test Tags theme:troubleshooting theme:cli theme:plugins ... output.log ... tedge-agent.log ... tedge-mapper-c8y.log - ... tedge-mapper-collectd.log ... tedge-config-list.log ... tedge.toml ... mappers/c8y/mapper.toml @@ -25,7 +24,6 @@ Test Tags theme:troubleshooting theme:cli theme:plugins Log Should Contain tedge-agent.log Starting tedge-agent.service Log Should Contain tedge-mapper-c8y.log Starting tedge-mapper-c8y.service - Log Should Contain tedge-mapper-collectd.log Starting tedge-mapper-collectd.service Log Should Contain tedge-config-list.log c8y.url Log Should Contain mappers/c8y/mapper.toml url Log Should Contain mappers/tb/mapper.toml mqtt.azure.com @@ -106,7 +104,6 @@ Log Should Contain Custom Suite Setup Setup Execute Command mkdir -p /results - Start Service tedge-mapper-collectd Execute Command mkdir -p /etc/tedge/mappers/tb Execute Command printf 'url \= "mqtt.azure.com:1883"\n' > /etc/tedge/mappers/tb/mapper.toml Execute Command tedge diag collect --keep-dir --output-dir /results --name test