Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ data:
bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }}
dataAggregationLevel: {{ .Values.dataAggregationLevel }}
telemetryInterval: {{ .Values.daemonset.telemetryInterval }}
dataSamplingRate: {{ .Values.dataSamplingRate }}
{{- end}}
---
{{- if .Values.os.windows}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ remoteContext: false
enableAnnotations: false
bypassLookupIPOfInterest: false
dataAggregationLevel: "low"
dataSamplingRate: 1

imagePullSecrets: []
nameOverride: "retina"
Expand Down
1 change: 1 addition & 0 deletions docs/02-Installation/03-Config.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Apply to both Agent and Operator.
* `enableAnnotations`: Enables gathering of metrics for annotated resources. Resources can be annotated with `retina.sh=observe`. Requires the operator and `operator.enableRetinaEndpoint` to be enabled. By enabling annotations, the agent will not use MetricsConfiguration CRD.
* `bypassLookupIPOfInterest`: If true, plugins like `packetparser` and `dropreason` will bypass IP lookup, generating an event for each packet regardless. `enableAnnotations` will not work if this is true.
* `dataAggregationLevel`: Defines the level of data aggregation for Retina. See [Data Aggregation](../05-Concepts/data-aggregation.md) for more details.
* `dataSamplingRate`: Defines the data sampling rate for `packetparser`. See [Sampling](../03-Metrics/plugins/Linux/packetparser.md#sampling) for more details.

## Operator Configuration

Expand Down
8 changes: 8 additions & 0 deletions docs/03-Metrics/plugins/Linux/packetparser.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ The `packetparser` plugin requires the `CAP_NET_ADMIN` and `CAP_SYS_ADMIN` capab

`packetparser` does not produce Basic metrics. In Advanced mode (refer to [Metric Modes](../../modes/modes.md)), the plugin transforms an eBPF result into an enriched `Flow` by adding Pod information based on IP. It then sends the `Flow` to an external channel, enabling *several modules* to generate Pod-Level metrics.

## Sampling

Since `packetparser` produces many enriched `Flow` objects it can be quite expensive for user space to process. Thus, when operating in `high` [data aggregation](../../../05-Concepts/data-aggregation.md) level optional sampling for reported packets is available via the `dataSamplingRate` configuration option.

`dataSamplingRate` is expressed in 1 out of N terms, where N is the `dataSamplingRate` value. For example, if `dataSamplingRate` is 3 1/3rd of packets will be sampled for reporting.

Keep in mind that there are cases where reporting will happen anyways as to ensure metric accuracy.

### Code locations

- Plugin and eBPF code: *pkg/plugin/packetparser/*
Expand Down
12 changes: 10 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ const (
)

var (
ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval)
DefaultTelemetryInterval = 15 * time.Minute
ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval)
DefaultTelemetryInterval = 15 * time.Minute
DefaultSamplingRate uint32 = 1
Comment thread
mmckeen marked this conversation as resolved.
)

func (l *Level) UnmarshalText(text []byte) error {
Expand Down Expand Up @@ -75,6 +76,7 @@ type Config struct {
DataAggregationLevel Level `yaml:"dataAggregationLevel"`
MonitorSockPath string `yaml:"monitorSockPath"`
TelemetryInterval time.Duration `yaml:"telemetryInterval"`
DataSamplingRate uint32 `yaml:"dataSamplingRate"`
}

func GetConfig(cfgFilename string) (*Config, error) {
Expand Down Expand Up @@ -122,6 +124,12 @@ func GetConfig(cfgFilename string) (*Config, error) {
return nil, ErrorTelemetryIntervalTooSmall
}

// If unset, default sampling rate to 1
if config.DataSamplingRate == 0 {
log.Printf("dataSamplingRate is not set, defaulting to %v", DefaultSamplingRate)
config.DataSamplingRate = DefaultSamplingRate
}

return &config, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func TestGetConfig(t *testing.T) {
c.RemoteContext ||
c.EnableAnnotations ||
c.TelemetryInterval != 15*time.Minute ||
c.DataAggregationLevel != Low {
c.DataAggregationLevel != Low ||
c.DataSamplingRate != 1 {
t.Errorf("Expeted config should be same as ./testwith/config.yaml; instead got %+v", c)
}
}
Expand Down Expand Up @@ -65,6 +66,5 @@ func TestDecodeLevelHook(t *testing.T) {
result, err := decodeLevelHook(reflect.TypeOf(test.input), reflect.TypeOf(Level(0)), test.input)
require.NoError(t, err)
assert.Equal(t, test.expected, result)

}
}
94 changes: 59 additions & 35 deletions pkg/plugin/conntrack/_cprog/conntrack.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,9 @@ static __always_inline __u8 _ct_get_traffic_direction(__u8 observation_point) {
* @arg key The key to be used to create the new connection.
* @arg observation_point The point in the network stack where the packet is observed.
* @arg is_reply true if the packet is a SYN-ACK packet. False if it is a SYN packet.
* @arg sampled Whether or not the packet was sampled for reporting.
*/
static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool is_reply) {
static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool is_reply, bool sampled) {
struct ct_entry new_value;
__builtin_memset(&new_value, 0, sizeof(struct ct_entry));
__u64 now = bpf_mono_now();
Expand All @@ -245,14 +246,20 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru
new_value.eviction_time = now + CT_SYN_TIMEOUT;
if(is_reply) {
new_value.flags_seen_rx_dir = p->flags;
new_value.last_report_rx_dir = now;
new_value.bytes_seen_since_last_report_rx_dir = 0;
new_value.packets_seen_since_last_report_rx_dir = 0;
new_value.last_report_rx_dir = sampled ? now : 0;
new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0;
new_value.packets_seen_since_last_report_rx_dir = !sampled;
if (!sampled) {
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir);
}
} else {
new_value.flags_seen_tx_dir = p->flags;
new_value.last_report_tx_dir = now;
new_value.bytes_seen_since_last_report_tx_dir = 0;
new_value.packets_seen_since_last_report_tx_dir = 0;
new_value.last_report_tx_dir = sampled ? now : 0;
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
new_value.packets_seen_since_last_report_tx_dir = !sampled;
if (!sampled) {
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir);
}
}
new_value.is_direction_unknown = false;
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);
Expand All @@ -273,16 +280,17 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru
p->is_reply = is_reply;
p->traffic_direction = new_value.traffic_direction;
bpf_map_update_elem(&retina_conntrack, key, &new_value, BPF_ANY);
return true;
return sampled;
}

/**
* Create a new UDP connection.
* @arg *p pointer to the packet to be processed.
* @arg key The key to be used to create the new connection.
* @arg observation_point The point in the network stack where the packet is observed.
* @arg sampled Whether or not the packet was sampled for reporting.
*/
static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point) {
static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool sampled) {
if (!p || !key) {
return false;
}
Expand All @@ -295,9 +303,9 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
}
new_value.eviction_time = now + CT_CONNECTION_LIFETIME_NONTCP;
new_value.flags_seen_tx_dir = p->flags;
new_value.last_report_tx_dir = now;
new_value.bytes_seen_since_last_report_tx_dir = 0;
new_value.packets_seen_since_last_report_tx_dir = 0;
new_value.last_report_tx_dir = sampled ? now : 0;
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
new_value.packets_seen_since_last_report_tx_dir = !sampled;
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);
#ifdef ENABLE_CONNTRACK_METRICS
new_value.conntrack_metadata.packets_tx_count = 1;
Expand All @@ -310,7 +318,7 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
p->is_reply = false;
p->traffic_direction = new_value.traffic_direction;
bpf_map_update_elem(&retina_conntrack, key, &new_value, BPF_ANY);
return true;
return sampled;
}

/**
Expand All @@ -319,18 +327,19 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
* @arg key The key to be used to handle the connection.
* @arg reverse_key The reverse key to be used to handle the connection.
* @arg observation_point The point in the network stack where the packet is observed.
* @arg sampled Whether or not the packet was sampled for reporting.
*/
static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point) {
static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point, bool sampled) {
if (!p || !key || !reverse_key) {
return false;
}
u8 tcp_handshake = p->flags & (TCP_SYN|TCP_ACK);
if (tcp_handshake == TCP_SYN) {
// We have a SYN, we set `is_reply` to false and we provide `key`
return _ct_create_new_tcp_connection(p, key, observation_point, false);
return _ct_create_new_tcp_connection(p, key, observation_point, false, sampled);
} else if(tcp_handshake == (TCP_SYN|TCP_ACK)) {
// We have a SYN-ACK, we set `is_reply` to true and we provide `reverse_key`
return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true);
return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true, sampled);
}

// The packet is not a SYN packet and the connection corresponding to this packet is not found.
Expand All @@ -353,9 +362,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
if (p->flags & TCP_ACK) {
p->is_reply = true;
new_value.flags_seen_rx_dir = p->flags;
new_value.last_report_rx_dir = now;
new_value.bytes_seen_since_last_report_rx_dir = 0;
new_value.packets_seen_since_last_report_rx_dir = 0;
new_value.last_report_rx_dir = sampled ? now : 0;
new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0;
new_value.packets_seen_since_last_report_rx_dir = !sampled;
if (!sampled) {
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir);
}
#ifdef ENABLE_CONNTRACK_METRICS
new_value.conntrack_metadata.bytes_rx_count = p->bytes;
new_value.conntrack_metadata.packets_rx_count = 1;
Expand All @@ -364,9 +376,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
} else { // Otherwise, the packet is considered as a packet in the send direction.
p->is_reply = false;
new_value.flags_seen_tx_dir = p->flags;
new_value.last_report_tx_dir = now;
new_value.bytes_seen_since_last_report_tx_dir = 0;
new_value.packets_seen_since_last_report_tx_dir = 0;
new_value.last_report_tx_dir = sampled ? now : 0;
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
new_value.packets_seen_since_last_report_tx_dir = !sampled;
if (!sampled) {
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir);
}
#ifdef ENABLE_CONNTRACK_METRICS
new_value.conntrack_metadata.bytes_tx_count = p->bytes;
new_value.conntrack_metadata.packets_tx_count = 1;
Expand All @@ -377,7 +392,7 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
// Update packet's conntrack metadata.
__builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata));
#endif // ENABLE_CONNTRACK_METRICS
return true;
return sampled;
}

/**
Expand All @@ -386,17 +401,18 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
* @arg key The key to be used to handle the connection.
* @arg reverse_key The reverse key to be used to handle the connection.
* @arg observation_point The point in the network stack where the packet is observed.
* @arg sampled Whether or not the packet was sampled for reporting.
*/
static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point) {
static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point, bool sampled) {
struct packetreport report;
__builtin_memset(&report, 0, sizeof(struct packetreport));
if (!p || !key || !reverse_key) {
return report;
}
if (key->proto & IPPROTO_TCP) {
report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point);
report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point, sampled);
} else if (key->proto & IPPROTO_UDP) {
report.report = _ct_handle_udp_connection(p, key, observation_point);
report.report = _ct_handle_udp_connection(p, key, observation_point, sampled);
} else {
report.report = false; // We are not interested in other protocols.
}
Expand All @@ -410,9 +426,10 @@ static __always_inline struct packetreport _ct_handle_new_connection(struct pack
* @arg flags The flags of the packet.
* @arg direction The direction of the packet in relation to the connection.
* @arg bytes The size of the packet in bytes.
* @arg sampled Whether or not the packet was sampled for reporting.
* Returns a packetreport struct representing if the packet should be reported to userspace.
*/
static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes) {
static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes, bool sampled) {
struct packetreport report;
__builtin_memset(&report, 0, sizeof(struct packetreport));
report.report = false;
Expand Down Expand Up @@ -522,21 +539,27 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4
WRITE_ONCE(entry->eviction_time, now + CT_CONNECTION_LIFETIME_NONTCP);
}

if (flags != seen_flags) {
if (direction == CT_PACKET_DIR_TX) {
WRITE_ONCE(entry->flags_seen_tx_dir, flags);
} else {
WRITE_ONCE(entry->flags_seen_rx_dir, flags);
}
}

// Report if:
// 1. We already decided to report based on protocol-specific rules, or
// 2. New flags have appeared, or
// 2. New flags have appeared and the packet has been sampled, or
// 3. Reporting interval has elapsed
if (should_report || flags != seen_flags || now - last_report >= CT_REPORT_INTERVAL) {
if (should_report || (sampled && flags != seen_flags) || now - last_report >= CT_REPORT_INTERVAL) {
report.report = true;
// Update the connection's state
if (direction == CT_PACKET_DIR_TX) {
WRITE_ONCE(entry->flags_seen_tx_dir, flags);
WRITE_ONCE(entry->last_report_tx_dir, now);
WRITE_ONCE(entry->bytes_seen_since_last_report_tx_dir, 0);
WRITE_ONCE(entry->packets_seen_since_last_report_tx_dir, 0);
__builtin_memset(&entry->flags_seen_since_last_report_tx_dir, 0, sizeof(struct tcpflagscount));
} else {
WRITE_ONCE(entry->flags_seen_rx_dir, flags);
WRITE_ONCE(entry->last_report_rx_dir, now);
WRITE_ONCE(entry->bytes_seen_since_last_report_rx_dir, 0);
WRITE_ONCE(entry->packets_seen_since_last_report_rx_dir, 0);
Expand Down Expand Up @@ -565,9 +588,10 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4
* Process a packet and update the connection tracking map.
* @arg *p pointer to the packet to be processed.
* @arg observation_point The point in the network stack where the packet is observed.
* @arg sampled Whether or not the packet has been sampled for reporting.
* Returns a packetreport struct representing if the packet should be reported to userspace.
*/
static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point) {
static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point, bool sampled) {
if (!p) {
struct packetreport report;
__builtin_memset(&report, 0, sizeof(struct packetreport));
Expand Down Expand Up @@ -601,7 +625,7 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa
// Update packet's conntract metadata.
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
#endif // ENABLE_CONNTRACK_METRICS
return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes);
return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes, sampled);
}

// The connection is not found in the send direction. Check the reply direction by reversing the key.
Expand All @@ -623,9 +647,9 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa
// Update packet's conntract metadata.
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
#endif // ENABLE_CONNTRACK_METRICS
return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes);
return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes, sampled);
}

// If the connection is still not found, the connection is new.
return _ct_handle_new_connection(p, &key, &reverse_key, observation_point);
return _ct_handle_new_connection(p, &key, &reverse_key, observation_point, sampled);
}
1 change: 1 addition & 0 deletions pkg/plugin/packetparser/_cprog/dynamic.h
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#define BYPASS_LOOKUP_IP_OF_INTEREST 0
#define DATA_AGGREGATION_LEVEL 0
#define DATA_SAMPLING_RATE 1
17 changes: 15 additions & 2 deletions pkg/plugin/packetparser/_cprog/packetparser.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,24 @@ static void parse(struct __sk_buff *skb, __u8 obs)
p.conntrack_metadata = conntrack_metadata;
#endif // ENABLE_CONNTRACK_METRICS

#ifdef DATA_AGGREGATION_LEVEL

// Calculate sampling
bool sampled __attribute__((unused));
sampled = true;

#ifdef DATA_SAMPLING_RATE
u32 rand __attribute__((unused));
rand = bpf_get_prandom_u32();
Comment thread
mmckeen marked this conversation as resolved.
if (rand >= UINT32_MAX / DATA_SAMPLING_RATE) {
Comment thread
mmckeen marked this conversation as resolved.
sampled = false;
}
#endif

// Process the packet in ct
struct packetreport report __attribute__((unused));
report = ct_process_packet(&p, obs);
report = ct_process_packet(&p, obs, sampled);

#ifdef DATA_AGGREGATION_LEVEL
// If the data aggregation level is low, always send the packet to the perf buffer.
#if DATA_AGGREGATION_LEVEL == DATA_AGGREGATION_LEVEL_LOW
p.previously_observed_packets = 0;
Expand Down
4 changes: 4 additions & 0 deletions pkg/plugin/packetparser/packetparser_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (p *packetParser) Generate(ctx context.Context) error {
p.l.Info("data aggregation level", zap.String("level", p.cfg.DataAggregationLevel.String()))
st += fmt.Sprintf("#define DATA_AGGREGATION_LEVEL %d\n", p.cfg.DataAggregationLevel)

// Process packetparser sampling rate.
p.l.Info("sampling rate", zap.Uint32("rate", p.cfg.DataSamplingRate))
st += fmt.Sprintf("#define DATA_SAMPLING_RATE %d\n", p.cfg.DataSamplingRate)

// Generate dynamic header for packetparser.
err = loader.WriteFile(ctx, dynamicHeaderPath, st)
if err != nil {
Expand Down
Loading