diff --git a/deploy/standard/manifests/controller/helm/retina/templates/configmap.yaml b/deploy/standard/manifests/controller/helm/retina/templates/configmap.yaml index da0de325f2..14f197b8eb 100644 --- a/deploy/standard/manifests/controller/helm/retina/templates/configmap.yaml +++ b/deploy/standard/manifests/controller/helm/retina/templates/configmap.yaml @@ -25,6 +25,7 @@ data: bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }} dataAggregationLevel: {{ .Values.dataAggregationLevel }} telemetryInterval: {{ .Values.daemonset.telemetryInterval }} + dataSamplingRate: {{ .Values.dataSamplingRate }} {{- end}} --- {{- if .Values.os.windows}} diff --git a/deploy/standard/manifests/controller/helm/retina/values.yaml b/deploy/standard/manifests/controller/helm/retina/values.yaml index 6bf0c9cf60..49902e33e4 100644 --- a/deploy/standard/manifests/controller/helm/retina/values.yaml +++ b/deploy/standard/manifests/controller/helm/retina/values.yaml @@ -56,6 +56,7 @@ remoteContext: false enableAnnotations: false bypassLookupIPOfInterest: false dataAggregationLevel: "low" +dataSamplingRate: 1 imagePullSecrets: [] nameOverride: "retina" diff --git a/docs/02-Installation/03-Config.md b/docs/02-Installation/03-Config.md index 6b6df2895d..97dbac74d5 100644 --- a/docs/02-Installation/03-Config.md +++ b/docs/02-Installation/03-Config.md @@ -53,6 +53,7 @@ Apply to both Agent and Operator. * `enableAnnotations`: Enables gathering of metrics for annotated resources. Resources can be annotated with `retina.sh=observe`. Requires the operator and `operator.enableRetinaEndpoint` to be enabled. By enabling annotations, the agent will not use MetricsConfiguration CRD. * `bypassLookupIPOfInterest`: If true, plugins like `packetparser` and `dropreason` will bypass IP lookup, generating an event for each packet regardless. `enableAnnotations` will not work if this is true. * `dataAggregationLevel`: Defines the level of data aggregation for Retina. 
See [Data Aggregation](../05-Concepts/data-aggregation.md) for more details. +* `dataSamplingRate`: Defines the data sampling rate for `packetparser`. See [Sampling](../03-Metrics/plugins/Linux/packetparser.md#sampling) for more details. ## Operator Configuration diff --git a/docs/03-Metrics/plugins/Linux/packetparser.md b/docs/03-Metrics/plugins/Linux/packetparser.md index b772fe362a..d2a6d92618 100644 --- a/docs/03-Metrics/plugins/Linux/packetparser.md +++ b/docs/03-Metrics/plugins/Linux/packetparser.md @@ -15,6 +15,14 @@ The `packetparser` plugin requires the `CAP_NET_ADMIN` and `CAP_SYS_ADMIN` capab `packetparser` does not produce Basic metrics. In Advanced mode (refer to [Metric Modes](../../modes/modes.md)), the plugin transforms an eBPF result into an enriched `Flow` by adding Pod information based on IP. It then sends the `Flow` to an external channel, enabling *several modules* to generate Pod-Level metrics. +## Sampling + +Since `packetparser` produces many enriched `Flow` objects, processing them in user space can be quite expensive. Thus, when operating at the `high` [data aggregation](../../../05-Concepts/data-aggregation.md) level, optional sampling of reported packets is available via the `dataSamplingRate` configuration option. + +`dataSamplingRate` is expressed in "1 out of N" terms, where N is the `dataSamplingRate` value. For example, if `dataSamplingRate` is 3, on average one out of every three packets (1/3) will be sampled for reporting. + +Keep in mind that in some cases packets are reported regardless of sampling, in order to ensure metric accuracy. 
+ ### Code locations - Plugin and eBPF code: *pkg/plugin/packetparser/* diff --git a/pkg/config/config.go b/pkg/config/config.go index 0ac6b1cb06..f9086ff902 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -24,8 +24,9 @@ const ( ) var ( - ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval) - DefaultTelemetryInterval = 15 * time.Minute + ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval) + DefaultTelemetryInterval = 15 * time.Minute + DefaultSamplingRate uint32 = 1 ) func (l *Level) UnmarshalText(text []byte) error { @@ -75,6 +76,7 @@ type Config struct { DataAggregationLevel Level `yaml:"dataAggregationLevel"` MonitorSockPath string `yaml:"monitorSockPath"` TelemetryInterval time.Duration `yaml:"telemetryInterval"` + DataSamplingRate uint32 `yaml:"dataSamplingRate"` } func GetConfig(cfgFilename string) (*Config, error) { @@ -122,6 +124,12 @@ func GetConfig(cfgFilename string) (*Config, error) { return nil, ErrorTelemetryIntervalTooSmall } + // If unset, default sampling rate to 1 + if config.DataSamplingRate == 0 { + log.Printf("dataSamplingRate is not set, defaulting to %v", DefaultSamplingRate) + config.DataSamplingRate = DefaultSamplingRate + } + return &config, nil } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2d6ca3b92f..d26d6e5835 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -28,7 +28,8 @@ func TestGetConfig(t *testing.T) { c.RemoteContext || c.EnableAnnotations || c.TelemetryInterval != 15*time.Minute || - c.DataAggregationLevel != Low { + c.DataAggregationLevel != Low || + c.DataSamplingRate != 1 { t.Errorf("Expeted config should be same as ./testwith/config.yaml; instead got %+v", c) } } @@ -65,6 +66,5 @@ func TestDecodeLevelHook(t *testing.T) { result, err := decodeLevelHook(reflect.TypeOf(test.input), reflect.TypeOf(Level(0)), test.input) 
require.NoError(t, err) assert.Equal(t, test.expected, result) - } } diff --git a/pkg/plugin/conntrack/_cprog/conntrack.c b/pkg/plugin/conntrack/_cprog/conntrack.c index be648d8198..f5fb2b3729 100644 --- a/pkg/plugin/conntrack/_cprog/conntrack.c +++ b/pkg/plugin/conntrack/_cprog/conntrack.c @@ -233,8 +233,9 @@ static __always_inline __u8 _ct_get_traffic_direction(__u8 observation_point) { * @arg key The key to be used to create the new connection. * @arg observation_point The point in the network stack where the packet is observed. * @arg is_reply true if the packet is a SYN-ACK packet. False if it is a SYN packet. + * @arg sampled Whether or not the packet was sampled for reporting. */ -static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool is_reply) { +static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool is_reply, bool sampled) { struct ct_entry new_value; __builtin_memset(&new_value, 0, sizeof(struct ct_entry)); __u64 now = bpf_mono_now(); @@ -245,14 +246,20 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru new_value.eviction_time = now + CT_SYN_TIMEOUT; if(is_reply) { new_value.flags_seen_rx_dir = p->flags; - new_value.last_report_rx_dir = now; - new_value.bytes_seen_since_last_report_rx_dir = 0; - new_value.packets_seen_since_last_report_rx_dir = 0; + new_value.last_report_rx_dir = sampled ? now : 0; + new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0; + new_value.packets_seen_since_last_report_rx_dir = !sampled; + if (!sampled) { + _ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir); + } } else { new_value.flags_seen_tx_dir = p->flags; - new_value.last_report_tx_dir = now; - new_value.bytes_seen_since_last_report_tx_dir = 0; - new_value.packets_seen_since_last_report_tx_dir = 0; + new_value.last_report_tx_dir = sampled ? 
now : 0; + new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0; + new_value.packets_seen_since_last_report_tx_dir = !sampled; + if (!sampled) { + _ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir); + } } new_value.is_direction_unknown = false; new_value.traffic_direction = _ct_get_traffic_direction(observation_point); @@ -273,7 +280,7 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru p->is_reply = is_reply; p->traffic_direction = new_value.traffic_direction; bpf_map_update_elem(&retina_conntrack, key, &new_value, BPF_ANY); - return true; + return sampled; } /** @@ -281,8 +288,9 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru * @arg *p pointer to the packet to be processed. * @arg key The key to be used to create the new connection. * @arg observation_point The point in the network stack where the packet is observed. + * @arg sampled Whether or not the packet was sampled for reporting. */ -static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point) { +static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool sampled) { if (!p || !key) { return false; } @@ -295,9 +303,9 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c } new_value.eviction_time = now + CT_CONNECTION_LIFETIME_NONTCP; new_value.flags_seen_tx_dir = p->flags; - new_value.last_report_tx_dir = now; - new_value.bytes_seen_since_last_report_tx_dir = 0; - new_value.packets_seen_since_last_report_tx_dir = 0; + new_value.last_report_tx_dir = sampled ? now : 0; + new_value.bytes_seen_since_last_report_tx_dir = !sampled ? 
p->bytes : 0; + new_value.packets_seen_since_last_report_tx_dir = !sampled; new_value.traffic_direction = _ct_get_traffic_direction(observation_point); #ifdef ENABLE_CONNTRACK_METRICS new_value.conntrack_metadata.packets_tx_count = 1; @@ -310,7 +318,7 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c p->is_reply = false; p->traffic_direction = new_value.traffic_direction; bpf_map_update_elem(&retina_conntrack, key, &new_value, BPF_ANY); - return true; + return sampled; } /** @@ -319,18 +327,19 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c * @arg key The key to be used to handle the connection. * @arg reverse_key The reverse key to be used to handle the connection. * @arg observation_point The point in the network stack where the packet is observed. + * @arg sampled Whether or not the packet was sampled for reporting. */ -static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point) { +static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point, bool sampled) { if (!p || !key || !reverse_key) { return false; } u8 tcp_handshake = p->flags & (TCP_SYN|TCP_ACK); if (tcp_handshake == TCP_SYN) { // We have a SYN, we set `is_reply` to false and we provide `key` - return _ct_create_new_tcp_connection(p, key, observation_point, false); + return _ct_create_new_tcp_connection(p, key, observation_point, false, sampled); } else if(tcp_handshake == (TCP_SYN|TCP_ACK)) { // We have a SYN-ACK, we set `is_reply` to true and we provide `reverse_key` - return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true); + return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true, sampled); } // The packet is not a SYN packet and the connection corresponding to this packet is not found. 
@@ -353,9 +362,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c if (p->flags & TCP_ACK) { p->is_reply = true; new_value.flags_seen_rx_dir = p->flags; - new_value.last_report_rx_dir = now; - new_value.bytes_seen_since_last_report_rx_dir = 0; - new_value.packets_seen_since_last_report_rx_dir = 0; + new_value.last_report_rx_dir = sampled ? now : 0; + new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0; + new_value.packets_seen_since_last_report_rx_dir = !sampled; + if (!sampled) { + _ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir); + } #ifdef ENABLE_CONNTRACK_METRICS new_value.conntrack_metadata.bytes_rx_count = p->bytes; new_value.conntrack_metadata.packets_rx_count = 1; @@ -364,9 +376,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c } else { // Otherwise, the packet is considered as a packet in the send direction. p->is_reply = false; new_value.flags_seen_tx_dir = p->flags; - new_value.last_report_tx_dir = now; - new_value.bytes_seen_since_last_report_tx_dir = 0; - new_value.packets_seen_since_last_report_tx_dir = 0; + new_value.last_report_tx_dir = sampled ? now : 0; + new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0; + new_value.packets_seen_since_last_report_tx_dir = !sampled; + if (!sampled) { + _ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir); + } #ifdef ENABLE_CONNTRACK_METRICS new_value.conntrack_metadata.bytes_tx_count = p->bytes; new_value.conntrack_metadata.packets_tx_count = 1; @@ -377,7 +392,7 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c // Update packet's conntrack metadata. 
__builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata)); #endif // ENABLE_CONNTRACK_METRICS - return true; + return sampled; } /** @@ -386,17 +401,18 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c * @arg key The key to be used to handle the connection. * @arg reverse_key The reverse key to be used to handle the connection. * @arg observation_point The point in the network stack where the packet is observed. + * @arg sampled Whether or not the packet was sampled for reporting. */ -static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point) { +static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point, bool sampled) { struct packetreport report; __builtin_memset(&report, 0, sizeof(struct packetreport)); if (!p || !key || !reverse_key) { return report; } if (key->proto & IPPROTO_TCP) { - report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point); + report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point, sampled); } else if (key->proto & IPPROTO_UDP) { - report.report = _ct_handle_udp_connection(p, key, observation_point); + report.report = _ct_handle_udp_connection(p, key, observation_point, sampled); } else { report.report = false; // We are not interested in other protocols. } @@ -410,9 +426,10 @@ static __always_inline struct packetreport _ct_handle_new_connection(struct pack * @arg flags The flags of the packet. * @arg direction The direction of the packet in relation to the connection. * @arg bytes The size of the packet in bytes. + * @arg sampled Whether or not the packet was sampled for reporting. * Returns a packetreport struct representing if the packet should be reported to userspace. 
*/ -static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes) { +static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes, bool sampled) { struct packetreport report; __builtin_memset(&report, 0, sizeof(struct packetreport)); report.report = false; @@ -522,21 +539,27 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4 WRITE_ONCE(entry->eviction_time, now + CT_CONNECTION_LIFETIME_NONTCP); } + if (flags != seen_flags) { + if (direction == CT_PACKET_DIR_TX) { + WRITE_ONCE(entry->flags_seen_tx_dir, flags); + } else { + WRITE_ONCE(entry->flags_seen_rx_dir, flags); + } + } + // Report if: // 1. We already decided to report based on protocol-specific rules, or - // 2. New flags have appeared, or + // 2. New flags have appeared and the packet has been sampled, or // 3. Reporting interval has elapsed - if (should_report || flags != seen_flags || now - last_report >= CT_REPORT_INTERVAL) { + if (should_report || (sampled && flags != seen_flags) || now - last_report >= CT_REPORT_INTERVAL) { report.report = true; // Update the connection's state if (direction == CT_PACKET_DIR_TX) { - WRITE_ONCE(entry->flags_seen_tx_dir, flags); WRITE_ONCE(entry->last_report_tx_dir, now); WRITE_ONCE(entry->bytes_seen_since_last_report_tx_dir, 0); WRITE_ONCE(entry->packets_seen_since_last_report_tx_dir, 0); __builtin_memset(&entry->flags_seen_since_last_report_tx_dir, 0, sizeof(struct tcpflagscount)); } else { - WRITE_ONCE(entry->flags_seen_rx_dir, flags); WRITE_ONCE(entry->last_report_rx_dir, now); WRITE_ONCE(entry->bytes_seen_since_last_report_rx_dir, 0); WRITE_ONCE(entry->packets_seen_since_last_report_rx_dir, 0); @@ -565,9 +588,10 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4 * Process a packet and update the connection tracking 
map. * @arg *p pointer to the packet to be processed. * @arg observation_point The point in the network stack where the packet is observed. + * @arg sampled Whether or not the packet has been sampled for reporting. * Returns a packetreport struct representing if the packet should be reported to userspace. */ -static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point) { +static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point, bool sampled) { if (!p) { struct packetreport report; __builtin_memset(&report, 0, sizeof(struct packetreport)); @@ -601,7 +625,7 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa // Update packet's conntract metadata. __builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata)); #endif // ENABLE_CONNTRACK_METRICS - return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes); + return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes, sampled); } // The connection is not found in the send direction. Check the reply direction by reversing the key. @@ -623,9 +647,9 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa // Update packet's conntract metadata. __builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata)); #endif // ENABLE_CONNTRACK_METRICS - return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes); + return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes, sampled); } // If the connection is still not found, the connection is new. 
- return _ct_handle_new_connection(p, &key, &reverse_key, observation_point); + return _ct_handle_new_connection(p, &key, &reverse_key, observation_point, sampled); } diff --git a/pkg/plugin/packetparser/_cprog/dynamic.h b/pkg/plugin/packetparser/_cprog/dynamic.h index ecadc42211..43f0976f19 100644 --- a/pkg/plugin/packetparser/_cprog/dynamic.h +++ b/pkg/plugin/packetparser/_cprog/dynamic.h @@ -1,2 +1,3 @@ #define BYPASS_LOOKUP_IP_OF_INTEREST 0 #define DATA_AGGREGATION_LEVEL 0 +#define DATA_SAMPLING_RATE 1 \ No newline at end of file diff --git a/pkg/plugin/packetparser/_cprog/packetparser.c b/pkg/plugin/packetparser/_cprog/packetparser.c index 07256867fb..d961c81f5c 100644 --- a/pkg/plugin/packetparser/_cprog/packetparser.c +++ b/pkg/plugin/packetparser/_cprog/packetparser.c @@ -208,11 +208,24 @@ static void parse(struct __sk_buff *skb, __u8 obs) p.conntrack_metadata = conntrack_metadata; #endif // ENABLE_CONNTRACK_METRICS + #ifdef DATA_AGGREGATION_LEVEL + + // Calculate sampling + bool sampled __attribute__((unused)); + sampled = true; + + #ifdef DATA_SAMPLING_RATE + u32 rand __attribute__((unused)); + rand = bpf_get_prandom_u32(); + if (rand >= UINT32_MAX / DATA_SAMPLING_RATE) { + sampled = false; + } + #endif + // Process the packet in ct struct packetreport report __attribute__((unused)); - report = ct_process_packet(&p, obs); + report = ct_process_packet(&p, obs, sampled); - #ifdef DATA_AGGREGATION_LEVEL // If the data aggregation level is low, always send the packet to the perf buffer. 
#if DATA_AGGREGATION_LEVEL == DATA_AGGREGATION_LEVEL_LOW p.previously_observed_packets = 0; diff --git a/pkg/plugin/packetparser/packetparser_linux.go b/pkg/plugin/packetparser/packetparser_linux.go index 9baade8e67..f6f5389962 100644 --- a/pkg/plugin/packetparser/packetparser_linux.go +++ b/pkg/plugin/packetparser/packetparser_linux.go @@ -117,6 +117,10 @@ func (p *packetParser) Generate(ctx context.Context) error { p.l.Info("data aggregation level", zap.String("level", p.cfg.DataAggregationLevel.String())) st += fmt.Sprintf("#define DATA_AGGREGATION_LEVEL %d\n", p.cfg.DataAggregationLevel) + // Process packetparser sampling rate. + p.l.Info("sampling rate", zap.Uint32("rate", p.cfg.DataSamplingRate)) + st += fmt.Sprintf("#define DATA_SAMPLING_RATE %d\n", p.cfg.DataSamplingRate) + // Generate dynamic header for packetparser. err = loader.WriteFile(ctx, dynamicHeaderPath, st) if err != nil {