diff --git a/cmd/observability/utils.go b/cmd/observability/utils.go new file mode 100644 index 0000000000..33a04a5b0a --- /dev/null +++ b/cmd/observability/utils.go @@ -0,0 +1,83 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package observability + +import ( + "fmt" + "strings" + + "github.com/microsoft/retina/internal/buildinfo" + "github.com/microsoft/retina/pkg/config" + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/telemetry" + "go.uber.org/zap" + "k8s.io/client-go/rest" +) + +const ( + logFileName = "retina.log" +) + +func InitializeTelemetryClient(restCfg *rest.Config, enabledPlugin []string, enableTelemetry bool, l *zap.SugaredLogger) (telemetry.Telemetry, error) { + if enableTelemetry { + if buildinfo.ApplicationInsightsID == "" { + panic("telemetry enabled, but ApplicationInsightsID is empty") + } + l.Info("telemetry enabled", zap.String("applicationInsightsID", buildinfo.ApplicationInsightsID)) + + var tel telemetry.Telemetry + var err error + if restCfg != nil { + tel, err = telemetry.NewAppInsightsTelemetryClient("retina-agent", map[string]string{ + "version": buildinfo.Version, + "apiserver": restCfg.Host, + "plugins": strings.Join(enabledPlugin, `,`), + }) + } else { + tel, err = telemetry.NewAppInsightsTelemetryClient("standalone-retina-agent", map[string]string{ + "version": buildinfo.Version, + "plugins": strings.Join(enabledPlugin, `,`), + }) + } + if err != nil { + l.Error("failed to create telemetry client", zap.Error(err)) + return tel, fmt.Errorf("error when creating telemetry client: %w", err) + } + return tel, nil + } + + l.Info("telemetry disabled") + tel := telemetry.NewNoopTelemetry() + return tel, nil +} + +func InitializeLogger(logLevel string, enableTelemetry bool, enabledPlugin []string, dataAggregationLevel config.Level) *log.ZapLogger { + if buildinfo.ApplicationInsightsID != "" { + telemetry.InitAppInsights(buildinfo.ApplicationInsightsID, buildinfo.Version) + defer telemetry.ShutdownAppInsights() + defer telemetry.TrackPanic() + } + + fmt.Println("init logger") + zl, err := log.SetupZapLogger(&log.LogOpts{ + Level: logLevel, + File: false, + FileName: logFileName, + MaxFileSizeMB: 100, //nolint:gomnd // defaults + MaxBackups: 3, //nolint:gomnd // defaults + MaxAgeDays: 30, //nolint:gomnd // defaults + ApplicationInsightsID: buildinfo.ApplicationInsightsID, + EnableTelemetry: enableTelemetry, + }, + zap.String("version", buildinfo.Version), + zap.String("plugins", strings.Join(enabledPlugin, `,`)), + zap.String("data aggregation level", dataAggregationLevel.String()), + ) + if err != nil { + panic(err) + } + defer zl.Close() + + return zl +} diff --git a/cmd/root.go b/cmd/root.go index 3f2fef1cff..9adda6e1a5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -28,7 +28,7 @@ var ( RunE: func(cmd *cobra.Command, args []string) error { // Do Stuff Here fmt.Println("Starting Retina Agent") - d := standard.NewDaemon(metricsAddr, probeAddr, cfgFile, enableLeaderElection) + d := standard.NewDaemon(cfgFile, metricsAddr, probeAddr, enableLeaderElection) if err := d.Start(); err != nil { return fmt.Errorf("starting daemon: %w", err) } diff --git a/cmd/standalone.go b/cmd/standalone.go new file mode 100644 index 0000000000..be5f5e41b1 --- /dev/null +++ b/cmd/standalone.go @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package cmd + +import ( + "fmt" + + "github.com/microsoft/retina/cmd/standalone" + "github.com/microsoft/retina/internal/buildinfo" + "github.com/spf13/cobra" +) + +var standaloneCmd = &cobra.Command{ + Use: "standalone", + Short: "Start Retina without K8s control plane", + RunE: func(cobraCmd *cobra.Command, _ []string) error { + if v, _ := cobraCmd.Flags().GetBool("version"); v { + fmt.Printf("%s %s\n", cobraCmd.Name(), buildinfo.Version) + } + d := standalone.NewDaemon(cfgFile) + if err := d.Start(); err != nil { + return fmt.Errorf("starting standalone daemon: %w", err) + } + return nil + }, +} + +func init() { + standaloneCmd.Flags().AddFlagSet(rootCmd.Flags()) + rootCmd.AddCommand(standaloneCmd) +} diff --git a/cmd/standalone/daemon.go b/cmd/standalone/daemon.go new file mode 100644 index 0000000000..d270da0536 --- /dev/null +++ b/cmd/standalone/daemon.go @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package standalone + +import ( + "fmt" + + "github.com/microsoft/retina/cmd/observability" + "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/metrics" + "go.uber.org/zap" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/microsoft/retina/pkg/config" + cache "github.com/microsoft/retina/pkg/controllers/cache/standalone" + sc "github.com/microsoft/retina/pkg/controllers/daemon/standalone" + cm "github.com/microsoft/retina/pkg/managers/controllermanager" + sm "github.com/microsoft/retina/pkg/module/metrics/standalone" +) + +type Daemon struct { + configFile string +} + +func NewDaemon(configFile string) *Daemon { + return &Daemon{ + configFile: configFile, + } +} + +func (d *Daemon) Start() error { + fmt.Printf("Starting Retina daemon in standalone mode\n") + + daemonCfg, err := config.GetStandaloneConfig(d.configFile) + if err != nil { + panic(err) + } + zl := observability.InitializeLogger(daemonCfg.LogLevel, daemonCfg.EnableTelemetry, daemonCfg.EnabledPlugin, daemonCfg.DataAggregationLevel) + mainLogger := zl.Named("main").Sugar() + + // Initialize basic metrics and telemetry client + metrics.InitializeMetrics() + tel, err := observability.InitializeTelemetryClient(nil, daemonCfg.EnabledPlugin, daemonCfg.EnableTelemetry, mainLogger) + if err != nil { + return fmt.Errorf("failed to initialize telemetry client: %w", err) + } + + // Initialize cache and run enricher + ctx := ctrl.SetupSignalHandler() + controllerCache := cache.New() + enrich := enricher.NewStandalone(ctx, controllerCache) + enrich.Run() + + // Initialize metrics module + metricsModule := sm.InitModule(ctx, enrich) + + mainLogger.Info("Initializing RetinaEndpoint controller") + controller, err := sc.New(daemonCfg, controllerCache, metricsModule) + if err != nil { + mainLogger.Fatal("failed to create RetinaEndpoint controller", zap.Error(err)) + } + go controller.Run(ctx) + + // Initialize controller manager + controllerMgr, err := cm.NewStandaloneControllerManager(daemonCfg, tel) + if err != nil { + mainLogger.Fatal("failed to create standalone controller manager", zap.Error(err)) + } + if err := controllerMgr.Init(); err != nil { + mainLogger.Fatal("failed to initialize standalone controller manager", zap.Error(err)) + } + + // start heartbeat goroutine for application insights + go tel.Heartbeat(ctx, daemonCfg.TelemetryInterval) + + // Start controller manager, which will start the http server and plugin manager + go controllerMgr.Start(ctx) + mainLogger.Info("Started controller manager") + + <-ctx.Done() + controllerMgr.Stop() + + mainLogger.Info("Network observability exiting. Till next time!") + return nil +} diff --git a/cmd/standard/daemon.go b/cmd/standard/daemon.go index 121bf70645..de782e0b9d 100644 --- a/cmd/standard/daemon.go +++ b/cmd/standard/daemon.go @@ -5,7 +5,6 @@ package standard import ( "fmt" "os" - "strings" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -25,6 +24,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "github.com/go-logr/zapr" + "github.com/microsoft/retina/cmd/observability" retinav1alpha1 "github.com/microsoft/retina/crd/api/v1alpha1" "github.com/microsoft/retina/internal/buildinfo" "github.com/microsoft/retina/pkg/config" @@ -37,18 +37,14 @@ import ( sc "github.com/microsoft/retina/pkg/controllers/daemon/service" "github.com/microsoft/retina/pkg/enricher" - "github.com/microsoft/retina/pkg/log" cm "github.com/microsoft/retina/pkg/managers/controllermanager" "github.com/microsoft/retina/pkg/managers/filtermanager" "github.com/microsoft/retina/pkg/metrics" mm "github.com/microsoft/retina/pkg/module/metrics" "github.com/microsoft/retina/pkg/pubsub" - "github.com/microsoft/retina/pkg/telemetry" ) const ( - logFileName = "retina.log" - nodeNameEnvKey = "NODE_NAME" nodeIPEnvKey = "NODE_IP" ) @@ -62,73 +58,50 @@ func init() { } type Daemon struct { + configFile string metricsAddr string probeAddr string enableLeaderElection bool - configFile string } -func NewDaemon(metricsAddr, probeAddr, configFile string, enableLeaderElection bool) *Daemon { +func NewDaemon(configFile, metricsAddr, probeAddr string, enableLeaderElection bool) *Daemon { return &Daemon{ + configFile: configFile, metricsAddr: metricsAddr, probeAddr: probeAddr, enableLeaderElection: enableLeaderElection, - configFile: configFile, } } func (d *Daemon) Start() error { - fmt.Printf("starting Retina daemon with legacy control plane %v\n", buildinfo.Version) - - if buildinfo.ApplicationInsightsID != "" { - telemetry.InitAppInsights(buildinfo.ApplicationInsightsID, buildinfo.Version) - defer telemetry.ShutdownAppInsights() - defer telemetry.TrackPanic() - } + fmt.Printf("Starting Retina daemon with legacy control plane %v\n", buildinfo.Version) + fmt.Println("init client-go") - daemonConfig, err := config.GetConfig(d.configFile) + daemonCfg, err := config.GetConfig(d.configFile) if err != nil { panic(err) } + zl := observability.InitializeLogger(daemonCfg.LogLevel, daemonCfg.EnableTelemetry, daemonCfg.EnabledPlugin, daemonCfg.DataAggregationLevel) - fmt.Println("init client-go") - var cfg *rest.Config + var restCfg *rest.Config if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" { - fmt.Println("KUBECONFIG set, using kubeconfig: ", kubeconfig) - cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + fmt.Println("KUBECONFIG detected, using kubeconfig: ", kubeconfig) + restCfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return fmt.Errorf("creating controller-runtime manager: %w", err) } } else { - cfg, err = kcfg.GetConfig() + restCfg, err = kcfg.GetConfig() if err != nil { panic(err) } } - fmt.Println("api server: ", cfg.Host) - - fmt.Println("init logger") - zl, err := log.SetupZapLogger(&log.LogOpts{ - Level: daemonConfig.LogLevel, - File: false, - FileName: logFileName, - MaxFileSizeMB: 100, //nolint:gomnd // defaults - MaxBackups: 3, //nolint:gomnd // defaults - MaxAgeDays: 30, //nolint:gomnd // defaults - ApplicationInsightsID: buildinfo.ApplicationInsightsID, - EnableTelemetry: daemonConfig.EnableTelemetry, - }, - zap.String("version", buildinfo.Version), - zap.String("apiserver", cfg.Host), - zap.String("plugins", strings.Join(daemonConfig.EnabledPlugin, `,`)), - zap.String("data aggregation level", daemonConfig.DataAggregationLevel.String()), + fmt.Println("api server: ", restCfg.Host) + + mainLogger := zl.Named("main").Sugar().With( + "apiserver", restCfg.Host, ) - if err != nil { - panic(err) - } - defer zl.Close() - mainLogger := zl.Named("main").Sugar() // Allow the current process to lock memory for eBPF resources. // OS specific implementation. @@ -138,31 +111,14 @@ func (d *Daemon) Start() error { } metrics.InitializeMetrics() + mainLogger.Info(zap.String("data aggregation level", daemonCfg.DataAggregationLevel.String())) - mainLogger.Info(zap.String("data aggregation level", daemonConfig.DataAggregationLevel.String())) - - var tel telemetry.Telemetry - if daemonConfig.EnableTelemetry { - if buildinfo.ApplicationInsightsID == "" { - panic("telemetry enabled, but ApplicationInsightsID is empty") - } - mainLogger.Info("telemetry enabled", zap.String("applicationInsightsID", buildinfo.ApplicationInsightsID)) - tel, err = telemetry.NewAppInsightsTelemetryClient("retina-agent", map[string]string{ - "version": buildinfo.Version, - "apiserver": cfg.Host, - "plugins": strings.Join(daemonConfig.EnabledPlugin, `,`), - }) - if err != nil { - mainLogger.Error("failed to create telemetry client", zap.Error(err)) - return fmt.Errorf("error when creating telemetry client: %w", err) - } - } else { - mainLogger.Info("telemetry disabled") - tel = telemetry.NewNoopTelemetry() + tel, err := observability.InitializeTelemetryClient(restCfg, daemonCfg.EnabledPlugin, daemonCfg.EnableTelemetry, mainLogger) + if err != nil { + return fmt.Errorf("failed to initialize telemetry client: %w", err) } // Create a manager for controller-runtime - mgrOption := crmgr.Options{ Scheme: scheme, Metrics: metricsserver.Options{ @@ -174,7 +130,7 @@ func (d *Daemon) Start() error { } // Local context has its meaning only when pod level(advanced) metrics is enabled. - if daemonConfig.EnablePodLevel && !daemonConfig.RemoteContext { + if daemonCfg.EnablePodLevel && !daemonCfg.RemoteContext { mainLogger.Info("Remote context is disabled, only pods deployed on the same node as retina-agent will be monitored") // the new cache sets Selector options on the Manager cache which are used // to perform *server-side* filtering of the cached objects. This is very important @@ -206,7 +162,7 @@ func (d *Daemon) Start() error { } } - mgr, err := crmgr.New(cfg, mgrOption) + mgr, err := crmgr.New(restCfg, mgrOption) if err != nil { mainLogger.Error("Unable to start manager", zap.Error(err)) return fmt.Errorf("creating controller-runtime manager: %w", err) @@ -236,10 +192,10 @@ func (d *Daemon) Start() error { ctx := ctrl.SetupSignalHandler() ctrl.SetLogger(zapr.NewLogger(zl.Logger.Named("controller-runtime"))) - if daemonConfig.EnablePodLevel { + if daemonCfg.EnablePodLevel { pubSub := pubsub.New() controllerCache := controllercache.New(pubSub) - enrich := enricher.New(ctx, controllerCache) + enrich := enricher.NewStandard(ctx, controllerCache) //nolint:govet // shadowing this err is fine fm, err := filtermanager.Init(5) //nolint:gomnd // defaults if err != nil { @@ -247,16 +203,16 @@ func (d *Daemon) Start() error { } defer fm.Stop() //nolint:errcheck // best effort enrich.Run() - metricsModule := mm.InitModule(ctx, daemonConfig, pubSub, enrich, fm, controllerCache) + metricsModule := mm.InitModule(ctx, daemonCfg, pubSub, enrich, fm, controllerCache) - if !daemonConfig.RemoteContext { + if !daemonCfg.RemoteContext { mainLogger.Info("Initializing Pod controller") podController := pc.New(mgr.GetClient(), controllerCache) if err := podController.SetupWithManager(mgr); err != nil { mainLogger.Fatal("unable to create PodController", zap.Error(err)) } - } else if daemonConfig.EnableRetinaEndpoint { + } else if daemonCfg.EnableRetinaEndpoint { mainLogger.Info("RetinaEndpoint is enabled") mainLogger.Info("Initializing RetinaEndpoint controller") @@ -278,7 +234,7 @@ func (d *Daemon) Start() error { mainLogger.Fatal("unable to create svcController", zap.Error(err)) } - if daemonConfig.EnableAnnotations { + if daemonCfg.EnableAnnotations { mainLogger.Info("Initializing MetricsConfig namespaceController") namespaceController := namespacecontroller.New(mgr.GetClient(), controllerCache, metricsModule) if err := namespaceController.SetupWithManager(mgr); err != nil { @@ -294,7 +250,7 @@ func (d *Daemon) Start() error { } } - controllerMgr, err := cm.NewControllerManager(daemonConfig, cl, tel) + controllerMgr, err := cm.NewStandardControllerManager(daemonCfg, cl, tel) if err != nil { mainLogger.Fatal("Failed to create controller manager", zap.Error(err)) } @@ -304,10 +260,10 @@ func (d *Daemon) Start() error { // Stop is best effort. If it fails, we still want to stop the main process. // This is needed for graceful shutdown of Retina plugins. // Do it in the main thread as graceful shutdown is important. - defer controllerMgr.Stop(ctx) + defer controllerMgr.Stop() // start heartbeat goroutine for application insights - go tel.Heartbeat(ctx, daemonConfig.TelemetryInterval) + go tel.Heartbeat(ctx, daemonCfg.TelemetryInterval) // Start controller manager, which will start http server and plugin manager. go controllerMgr.Start(ctx) diff --git a/pkg/config/config.go b/pkg/config/config.go index 0ac6b1cb06..777aa5229b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -104,7 +104,7 @@ func GetConfig(cfgFilename string) (*Config, error) { err = viper.Unmarshal(&config, decoderConfigOption) if err != nil { - return nil, fmt.Errorf("fatal error config file: %s", err) + return nil, fmt.Errorf("fatal error unmarshalling config file: %w", err) } if config.MetricsIntervalDuration != 0 { diff --git a/pkg/config/standalone_config.go b/pkg/config/standalone_config.go new file mode 100644 index 0000000000..ba8e4e00c3 --- /dev/null +++ b/pkg/config/standalone_config.go @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package config + +import ( + "errors" + "fmt" + "log" + "strings" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/spf13/viper" +) + +type StandaloneConfig struct { + APIServer Server `yaml:"apiServer"` + LogLevel string `yaml:"logLevel"` + EnableTelemetry bool `yaml:"enableTelemetry"` + EnabledPlugin []string `yaml:"enabledPlugin"` + DataAggregationLevel Level `yaml:"dataAggregationLevel"` + MetricsInterval time.Duration `yaml:"metricsInterval"` + TelemetryInterval time.Duration `yaml:"telemetryInterval"` + EnrichmentMode string `yaml:"enrichmentMode"` + CrictlCommandTimeout time.Duration `yaml:"crictlCommandTimeout"` + StateFileLocation string `yaml:"stateFileLocation"` +} + +var ( + DefaultStandaloneConfig = StandaloneConfig{ + LogLevel: "info", + EnableTelemetry: false, + EnabledPlugin: []string{"hnsstats"}, + DataAggregationLevel: High, + MetricsInterval: time.Second, + EnrichmentMode: "crictl", + TelemetryInterval: DefaultTelemetryInterval, + CrictlCommandTimeout: 5 * time.Second, + } + + ErrMissingStateFileLocation = errors.New("stateFileLocation must be set when using statefile enrichment mode") +) + +func GetStandaloneConfig(cfgFilename string) (*StandaloneConfig, error) { + if cfgFilename != "" { + viper.SetConfigFile(cfgFilename) + } else { + viper.SetConfigName("config") + viper.AddConfigPath("/retina/config") + } + + viper.SetEnvPrefix("retina") + viper.AutomaticEnv() + + err := viper.ReadInConfig() + if err != nil { + return nil, fmt.Errorf("fatal error config file: %w", err) + } + + var config StandaloneConfig + decoderConfigOption := viper.DecodeHook(mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), // default hook. + mapstructure.StringToSliceHookFunc(","), // default hook. + decodeLevelHook, + )) + + err = viper.Unmarshal(&config, decoderConfigOption) + if err != nil { + return nil, fmt.Errorf("fatal error unmarshalling config file: %w", err) + } + + if config.MetricsInterval == 0 { + log.Printf("metricsInterval is not set, defaulting to %v", DefaultStandaloneConfig.MetricsInterval) + config.MetricsInterval = DefaultStandaloneConfig.MetricsInterval + } + + if config.TelemetryInterval == 0 { + log.Printf("telemetryInterval is not set, defaulting to %v", DefaultTelemetryInterval) + config.TelemetryInterval = DefaultTelemetryInterval + } + + if config.CrictlCommandTimeout == 0 { + log.Printf("crictlCommandTimeout is not set, defaulting to %v", DefaultStandaloneConfig.CrictlCommandTimeout) + config.CrictlCommandTimeout = DefaultStandaloneConfig.CrictlCommandTimeout + } + + switch { + case config.EnrichmentMode == "crictl": + case strings.HasSuffix(config.EnrichmentMode, "statefile"): + if config.StateFileLocation == "" { + return nil, ErrMissingStateFileLocation + } + default: + log.Printf("invalid enrichmentMode: %s, defaulting to '%s', supported modes: crictl and statefile", config.EnrichmentMode, DefaultStandaloneConfig.EnrichmentMode) + config.EnrichmentMode = DefaultStandaloneConfig.EnrichmentMode + } + + return &config, nil +} + +func StandaloneConfigAdapter(sc *StandaloneConfig) *Config { + if sc == nil { + return nil + } + + return &Config{ + APIServer: sc.APIServer, + LogLevel: sc.LogLevel, + EnabledPlugin: sc.EnabledPlugin, + MetricsInterval: sc.MetricsInterval, + MetricsIntervalDuration: sc.MetricsInterval, + EnableTelemetry: sc.EnableTelemetry, + DataAggregationLevel: sc.DataAggregationLevel, + TelemetryInterval: sc.TelemetryInterval, + + // Fields not applicable to StandaloneConfig, set to default values + EnableRetinaEndpoint: false, + EnablePodLevel: false, + EnableConntrackMetrics: false, + RemoteContext: false, + EnableAnnotations: false, + BypassLookupIPOfInterest: false, + MonitorSockPath: "", + } +} diff --git a/pkg/config/standalone_config_test.go b/pkg/config/standalone_config_test.go new file mode 100644 index 0000000000..3d3f32e421 --- /dev/null +++ b/pkg/config/standalone_config_test.go @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package config + +import ( + "reflect" + "testing" + "time" +) + +func TestGetStandaloneConfig(t *testing.T) { + c, err := GetStandaloneConfig("./testwith/config-standalone.yaml") + if err != nil { + t.Fatalf("Expected no error, instead got %+v", err) + } + if c.APIServer.Host != "0.0.0.0" || + c.APIServer.Port != 10093 || + c.LogLevel != "info" || + !c.EnableTelemetry || + !reflect.DeepEqual(c.EnabledPlugin, []string{"hnsstats"}) || + c.DataAggregationLevel != High || + c.MetricsInterval != 1*time.Second || + c.TelemetryInterval != 15*time.Minute || + c.EnrichmentMode != "azure-vnet-statefile" || + c.CrictlCommandTimeout != 5*time.Second || + c.StateFileLocation != "/generic/file/location/azure-vnet.json" { + t.Errorf("Expeted config should be same as ./testwith/config-standalone.yaml; instead got %+v", c) + } +} diff --git a/pkg/config/testwith/config-standalone.yaml b/pkg/config/testwith/config-standalone.yaml new file mode 100644 index 0000000000..4778ede4ec --- /dev/null +++ b/pkg/config/testwith/config-standalone.yaml @@ -0,0 +1,12 @@ +apiServer: + host: "0.0.0.0" + port: 10093 +logLevel: info +enableTelemetry: true +enabledPlugin: ["hnsstats"] +dataAggregationLevel: "high" +metricsInterval: "1s" +telemetryInterval: "15m" +enrichmentMode: "azure-vnet-statefile" +crictlCommandTimeout: "5s" +stateFileLocation: "/generic/file/location/azure-vnet.json" diff --git a/pkg/controllers/cache/standalone/cache.go b/pkg/controllers/cache/standalone/cache.go new file mode 100644 index 0000000000..1bffee1f85 --- /dev/null +++ b/pkg/controllers/cache/standalone/cache.go @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package standalone + +import ( + "fmt" + "net" + "sync" + + "github.com/microsoft/retina/pkg/common" + "github.com/microsoft/retina/pkg/log" + "go.uber.org/zap" +) + +type Cache struct { + mu sync.RWMutex + l *log.ZapLogger + // ipToEndpoint is a map of IP addresses to RetinaEndpoints (namespace/name) + ipToEndpoint map[string]*common.RetinaEndpoint +} + +// New returns a new instance of Cache +func New() *Cache { + c := &Cache{ + l: log.Logger().Named("Cache"), + ipToEndpoint: make(map[string]*common.RetinaEndpoint), + } + return c +} + +// GetAllIPs returns a list of all IPs in the cache +func (c *Cache) GetAllIPs() []string { + c.mu.RLock() + defer c.mu.RUnlock() + + ips := make([]string, 0, len(c.ipToEndpoint)) + for ip := range c.ipToEndpoint { + ips = append(ips, ip) + } + return ips +} + +// GetPodByIP returns the retina endpoint for the given IP +func (c *Cache) GetPodByIP(ip string) *common.RetinaEndpoint { + c.mu.RLock() + defer c.mu.RUnlock() + return c.ipToEndpoint[ip] +} + +// UpdateRetinaEndpoint updates the cache with the given retina endpoint +func (c *Cache) UpdateRetinaEndpoint(ep *common.RetinaEndpoint) error { + c.mu.Lock() + defer c.mu.Unlock() + return c.updateEndpoint(ep) +} + +// updateEndpoint updates the cache if there is a new retina endpoint +func (c *Cache) updateEndpoint(ep *common.RetinaEndpoint) error { + ip, err := ep.PrimaryIP() + if err != nil { + c.l.Error("error getting IP for retina endpoint", zap.Error(err)) + return fmt.Errorf("failed to get IP from retina endpoint %s: %w", ep.Key(), err) + } + + if pod, exists := c.ipToEndpoint[ip]; exists { + if pod.Name() == ep.Name() && pod.Namespace() == ep.Namespace() { + return nil + } + } + c.ipToEndpoint[ip] = ep + c.l.Info("Added retina endpoint to cache", zap.String("ip", ip), zap.String("namespace", ep.Namespace()), zap.String("name", ep.Name())) + return nil +} + +// DeleteRetinaEndpoint deletes the given retina endpoint from the cache +func (c *Cache) DeleteRetinaEndpoint(epKey string) error { + c.mu.Lock() + defer c.mu.Unlock() + c.deleteEndpoint(epKey) + return nil +} + +// deleteEndpoint deletes the given retina endpoint from the cache +func (c *Cache) deleteEndpoint(epKey string) { + if ep, exists := c.ipToEndpoint[epKey]; exists { + delete(c.ipToEndpoint, epKey) + c.l.Info("Deleted retina endpoint from cache", zap.String("ip", epKey), zap.String("namespace", ep.Namespace()), zap.String("name", ep.Name())) + } +} + +// Clear resets the ip to endpoint map +func (c *Cache) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + c.ipToEndpoint = make(map[string]*common.RetinaEndpoint) + c.l.Info("Cleared all retina endpoints from cache") +} + +// No op +func (c *Cache) GetSvcByIP(_ string) *common.RetinaSvc { return nil } +func (c *Cache) GetNodeByIP(_ string) *common.RetinaNode { return nil } +func (c *Cache) GetObjByIP(_ string) interface{} { return nil } +func (c *Cache) GetIPsByNamespace(_ string) []net.IP { return nil } +func (c *Cache) GetAnnotatedNamespaces() []string { return nil } + +func (c *Cache) UpdateRetinaSvc(_ *common.RetinaSvc) error { return nil } +func (c *Cache) DeleteRetinaSvc(_ string) error { return nil } +func (c *Cache) UpdateRetinaNode(_ *common.RetinaNode) error { return nil } +func (c *Cache) DeleteRetinaNode(_ string) error { return nil } +func (c *Cache) AddAnnotatedNamespace(_ string) {} +func (c *Cache) DeleteAnnotatedNamespace(_ string) {} diff --git a/pkg/controllers/cache/standalone/cache_test.go b/pkg/controllers/cache/standalone/cache_test.go new file mode 100644 index 0000000000..96b720178a --- /dev/null +++ b/pkg/controllers/cache/standalone/cache_test.go @@ -0,0 +1,164 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package standalone + +import ( + "net" + "testing" + + "github.com/microsoft/retina/pkg/common" + "github.com/microsoft/retina/pkg/log" + "github.com/stretchr/testify/require" +) + +var ( + ep1 = common.NewRetinaEndpoint("pod1", "ns1", &common.IPAddresses{IPv4: net.ParseIP("10.0.0.1")}) + ep2 = common.NewRetinaEndpoint("pod2", "ns2", &common.IPAddresses{IPv4: net.ParseIP("10.0.0.1")}) + ep3 = common.NewRetinaEndpoint("pod1", "ns1", &common.IPAddresses{IPv4: net.ParseIP("10.0.0.1")}) +) + +func TestCacheAddEndpoint(t *testing.T) { + if _, err := log.SetupZapLogger(log.GetDefaultLogOpts()); err != nil { + t.Errorf("Error setting up logger: %s", err) + } + + tests := []struct { + name string + endpoint *common.RetinaEndpoint + expectedPod string + expectedNs string + }{ + { + name: "Add new endpoint", + endpoint: ep1, + expectedPod: ep1.Name(), + expectedNs: ep1.Namespace(), + }, + { + name: "Add identical endpoint", + endpoint: ep3, + expectedPod: ep1.Name(), + expectedNs: ep1.Namespace(), + }, + { + name: "Update endpoint info for same IP", + endpoint: ep2, + expectedPod: ep2.Name(), + expectedNs: ep2.Namespace(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := New() + + err := c.UpdateRetinaEndpoint(tt.endpoint) + require.NoError(t, err) + + ip, err := tt.endpoint.PrimaryIP() + require.NoError(t, err) + + got := c.GetPodByIP(ip) + require.NotNil(t, got, "Expected retina endpoint, got nil") + require.Equal(t, tt.expectedPod, got.Name()) + require.Equal(t, tt.expectedNs, got.Namespace()) + }) + } +} + +func TestCacheDeleteEndpoint(t *testing.T) { + if _, err := log.SetupZapLogger(log.GetDefaultLogOpts()); err != nil { + t.Errorf("Error setting up logger: %s", err) + } + + ip1, err := ep1.PrimaryIP() + require.NoError(t, err) + + tests := []struct { + name string + add []*common.RetinaEndpoint + deleteIP string + expectedEndpoint *common.RetinaEndpoint + }{ + { + name: "Delete existing endpoint", + add: []*common.RetinaEndpoint{ep1}, + deleteIP: ip1, + expectedEndpoint: nil, + }, + { + name: "Delete non-existing pod (no-op)", + add: []*common.RetinaEndpoint{}, + deleteIP: "10.0.0.2", + expectedEndpoint: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := New() + + for _, ep := range tt.add { + require.NoError(t, c.UpdateRetinaEndpoint(ep)) + } + require.NoError(t, c.DeleteRetinaEndpoint(tt.deleteIP)) + + got := c.GetPodByIP(tt.deleteIP) + require.Equal(t, tt.expectedEndpoint, got) + }) + } +} + +func TestCacheGetAllIPs(t *testing.T) { + if _, err := log.SetupZapLogger(log.GetDefaultLogOpts()); err != nil { + t.Errorf("Error setting up logger: %s", err) + } + ep4 := common.NewRetinaEndpoint("pod4", "ns4", &common.IPAddresses{IPv4: net.ParseIP("10.0.0.4")}) + + tests := []struct { + name string + add []*common.RetinaEndpoint + delete []string + wantIPs []string + }{ + { + name: "Add two IPs", + add: []*common.RetinaEndpoint{ep1, ep2}, + wantIPs: []string{"10.0.0.1"}, + }, + { + name: "Add two unique IPs", + add: []*common.RetinaEndpoint{ep1, ep4}, + wantIPs: []string{"10.0.0.1", "10.0.0.4"}, + }, + { + name: "Add two unique IPs and delete one IP", + add: []*common.RetinaEndpoint{ep1, ep4}, + delete: []string{"10.0.0.1"}, + wantIPs: []string{"10.0.0.4"}, + }, + { + name: "Add two unique IPs and delete two IPs", + add: []*common.RetinaEndpoint{ep1, ep4}, + delete: []string{"10.0.0.1", "10.0.0.4"}, + wantIPs: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := New() + + for _, ep := range tt.add { + require.NoError(t, c.UpdateRetinaEndpoint(ep)) + } + for _, ip := range tt.delete { + require.NoError(t, c.DeleteRetinaEndpoint(ip)) + } + + gotIPs := c.GetAllIPs() + require.ElementsMatch(t, tt.wantIPs, gotIPs, "IPs mismatch for test: %s", tt.name) + }) + } +} diff --git a/pkg/controllers/daemon/standalone/controller.go b/pkg/controllers/daemon/standalone/controller.go new file mode 100644 index 0000000000..19a9755f20 --- /dev/null +++ b/pkg/controllers/daemon/standalone/controller.go @@ -0,0 +1,130 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package standalone + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/microsoft/retina/pkg/common" + kcfg "github.com/microsoft/retina/pkg/config" + "github.com/microsoft/retina/pkg/controllers/cache/standalone" + "github.com/microsoft/retina/pkg/controllers/daemon/standalone/source" + "github.com/microsoft/retina/pkg/controllers/daemon/standalone/source/ctrinfo" + "github.com/microsoft/retina/pkg/controllers/daemon/standalone/source/statefile" + "github.com/microsoft/retina/pkg/log" + sm "github.com/microsoft/retina/pkg/module/metrics/standalone" + + "go.uber.org/zap" +) + +type Controller struct { + // interface for fetching retina endpoint information + src source.Source + // cache to hold retina endpoints + cache *standalone.Cache + + metricsModule *sm.Module + config *kcfg.StandaloneConfig + l *log.ZapLogger +} + +// New creates a new instance of the standalone controller +func New(config *kcfg.StandaloneConfig, cache *standalone.Cache, metricsModule *sm.Module) (*Controller, error) { + var src source.Source + var err error + + switch { + case config.EnrichmentMode == "crictl": + src, err = ctrinfo.New(config.CrictlCommandTimeout) + if err != nil { + return nil, fmt.Errorf("failed to create crictl source: %w", err) + } + + case strings.HasSuffix(config.EnrichmentMode, "statefile"): + src, err = statefile.New(config.EnrichmentMode, config.StateFileLocation) + if err != nil { + return nil, fmt.Errorf("failed to create statefile source: %w", err) + } + } + + return &Controller{ + src: src, + cache: cache, + config: config, + metricsModule: metricsModule, + l: log.Logger().Named(string("RetinaEndpointController")), + }, nil +} + +// Reconcile syncs the state of the running endpoints with the existing endpoints in cache +func (c *Controller) Reconcile(ctx context.Context) error { + c.l.Info("Reconciling retina endpoints") + + // Retrieve running pod information from the corresponding source + runningEps, err := c.src.GetAllEndpoints() + if err != nil { + return fmt.Errorf("failed to get running endpoints: %w", err) + } + + runningIPs := make(map[string]*common.RetinaEndpoint) + for _, ep := range runningEps { + ip, err := ep.PrimaryIP() + if err != nil || ip == "" { + continue + } + runningIPs[ip] = ep + } + + cachedIPs := c.cache.GetAllIPs() + + // Remove IPs not in the running set + for _, ip := range cachedIPs { + if _, exists := runningIPs[ip]; !exists { + if err := c.cache.DeleteRetinaEndpoint(ip); err != nil { + return fmt.Errorf("failed to delete retina endpoint for ip=%s: %w", ip, err) + } + } + } + + // Update IPs that are not existing in cache + for ip, ep := range runningIPs { + if err := c.cache.UpdateRetinaEndpoint(ep); err != nil { + return fmt.Errorf("failed to update retina endpoint for ip=%s: %w", ip, err) + } + } + + c.metricsModule.Reconcile(ctx) + c.l.Info("Reconciliation completed") + return nil +} + +// Run starts the controller loop +func (c *Controller) Run(ctx context.Context) { + c.l.Info("Starting RetinaEndpoint controller") + + ticker := time.NewTicker(c.config.MetricsInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + c.Stop() + return + case <-ticker.C: + if err := c.Reconcile(ctx); err != nil { + c.l.Error("failed to reconcile", zap.Error(err)) + } + } + } +} + +// Stop stops the controller and cleans up resources +func (c *Controller) Stop() { + c.l.Info("Stopping RetinaEndpoint controller") + c.cache.Clear() + c.metricsModule.Clear() +} diff --git a/pkg/controllers/daemon/standalone/controller_test.go b/pkg/controllers/daemon/standalone/controller_test.go new file mode 100644 index 0000000000..8416f00fc2 --- /dev/null +++ b/pkg/controllers/daemon/standalone/controller_test.go @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package standalone + +import ( + "context" + "net" + "testing" + "time" + + "github.com/microsoft/retina/pkg/common" + kcfg "github.com/microsoft/retina/pkg/config" + "github.com/microsoft/retina/pkg/controllers/cache/standalone" + "github.com/microsoft/retina/pkg/controllers/daemon/standalone/source" + "github.com/microsoft/retina/pkg/log" + sm "github.com/microsoft/retina/pkg/module/metrics/standalone" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestControllerReconcile(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // Setup logger + _, err := log.SetupZapLogger(log.GetDefaultLogOpts()) + require.NoError(t, err) + + // Mock source + mockSource := source.NewMockSource(ctrl) + + // Cache + cache := standalone.New() + + // Metrics module + ctx := context.Background() + metricsModule := sm.InitModule(ctx, nil) + + // Prepopulate cache with an endpoint to simulate deletion + oldEp := common.NewRetinaEndpoint("old-pod", "default", &common.IPAddresses{IPv4: net.ParseIP("1.1.1.2")}) + require.NoError(t, cache.UpdateRetinaEndpoint(oldEp)) + + // New endpoint returned by the source + newEndpoint := common.NewRetinaEndpoint("new-pod", "default", &common.IPAddresses{IPv4: net.ParseIP("1.1.1.1")}) + mockSource.EXPECT().GetAllEndpoints().Return([]*common.RetinaEndpoint{newEndpoint}, nil) + + // Need valid file path to initialize statefile source + testMockStatefilePath := "./source/statefile/azure/azure-vnet-mock.json" + + // Setup test controller with invalid config to test error handling + invalidCfg := &kcfg.StandaloneConfig{MetricsInterval: time.Second, EnrichmentMode: "gcp-statefile", StateFileLocation: testMockStatefilePath} + controller, err := New(invalidCfg, cache, metricsModule) + require.Error(t, err) + require.Nil(t, controller) + + // Setup test controller with valid config + cfg := &kcfg.StandaloneConfig{MetricsInterval: time.Second, EnrichmentMode: "azure-vnet-statefile", StateFileLocation: testMockStatefilePath} + controller, err = New(cfg, cache, metricsModule) + require.NoError(t, err) + require.NotNil(t, controller) + + controller.src = mockSource // inject mock source + + // Run Reconcile + err = controller.Reconcile(ctx) + require.NoError(t, err) + + // Validate cache updates + cachedIPs := cache.GetAllIPs() + require.Len(t, cachedIPs, 1, "only new endpoint should remain in cache") + require.Contains(t, cachedIPs, "1.1.1.1") + + // Stop the controller and validate cleanup + controller.Stop() + require.Empty(t, controller.cache.GetAllIPs()) +} diff --git a/pkg/controllers/daemon/standalone/source/ctrinfo/ctrfinfo_linux.go b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrfinfo_linux.go new file mode 100644 index 0000000000..48ebbfd949 --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrfinfo_linux.go @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//go:build linux + +package ctrinfo + +func runGetPods(c *Ctrinfo) (string, error) { + return c.runCommand("crictl", "pods", "-q") +} + +func runPodInspect(c *Ctrinfo, id string) (string, error) { + return c.runCommand("crictl", "inspectp", id) +} diff --git a/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo.go b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo.go new file mode 100644 index 0000000000..d1b27b5616 --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo.go @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package ctrinfo + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net" + "os/exec" + "strings" + "time" + + "github.com/microsoft/retina/pkg/common" +) + +type Ctrinfo struct { + commandTimeout time.Duration +} + +type PodSpec struct { + Status Status `json:"status"` +} + +type Status struct { + Metadata Metadata `json:"metadata"` + Network PodNetwork `json:"network"` +} + +type Metadata struct { + Name string `json:"name"` + Namespace string `json:"namespace"` +} + +type PodNetwork struct { + IP string `json:"ip"` +} + +var ( + getPodsCmd = runGetPods + inspectPodCmd = runPodInspect + + errGetPods = errors.New("failed to get running pods") + errInspectPod = errors.New("failed to inspect pod information") + errJSONRead = errors.New("error unmarshalling JSON") +) + +func newCtrinfo(commandTimeout time.Duration) *Ctrinfo { + return &Ctrinfo{ + commandTimeout: commandTimeout, + } +} + +func New(commandTimeout time.Duration) (*Ctrinfo, error) { + _, err := exec.LookPath("crictl") + if err != nil { + return nil, fmt.Errorf("crictl not found in PATH: %w", err) + } + return newCtrinfo(commandTimeout), nil +} + +func (c *Ctrinfo) GetAllEndpoints() ([]*common.RetinaEndpoint, error) { + // Using crictl to get all running pods + runningPods, err := getPodsCmd(c) + if err != nil { + return nil, fmt.Errorf("%w: %w", errGetPods, err) + } + + podIDs := strings.Split(strings.TrimSpace(runningPods), "\n") + endpoints := []*common.RetinaEndpoint{} + for _, podID := range podIDs { + if podID == "" { + continue + } + + // Using crictl to get pod spec + podSpec, err := inspectPodCmd(c, podID) + if err != nil { + return nil, fmt.Errorf("%w: %w", errInspectPod, err) + } + + var spec PodSpec + if err := json.Unmarshal([]byte(podSpec), &spec); err != nil { + return nil, fmt.Errorf("%w: %w", errJSONRead, err) + } + + ip := net.ParseIP(spec.Status.Network.IP) + // Skip pods with invalid or empty IPs + if ip == nil { + continue + } + + endpoints = append(endpoints, common.NewRetinaEndpoint( + spec.Status.Metadata.Name, + spec.Status.Metadata.Namespace, + common.NewIPAddress(ip, nil), + )) + } + + return endpoints, nil +} + +func (c *Ctrinfo) runCommand(command string, args ...string) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.commandTimeout) + defer cancel() + + cmd := exec.CommandContext(ctx, command, args...) + var output bytes.Buffer + cmd.Stdout = &output + err := cmd.Run() + if err != nil { + return "", fmt.Errorf("failed to run command: %w", err) + } + return output.String(), nil +} diff --git a/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo_test.go b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo_test.go new file mode 100644 index 0000000000..03639f7478 --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo_test.go @@ -0,0 +1,110 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package ctrinfo + +import ( + "net" + "os" + "testing" + "time" + + "github.com/microsoft/retina/pkg/common" + "github.com/stretchr/testify/require" +) + +func TestCtrinfoGetAllEndpoints(t *testing.T) { + invalidJSONPath := "invalid_pod_spec.json" + invalidJSONContent := `{"status": {"metadata": {"name": "retina-pod", "namespace": "retina-namespace"}` + + err := os.WriteFile(invalidJSONPath, []byte(invalidJSONContent), 0o600) + require.NoError(t, err, "failed to create invalid JSON file") + defer os.Remove(invalidJSONPath) + + src := newCtrinfo(1 * time.Second) + + tests := []struct { + name string + podCmdOutput string + inspectCmdOutput string + getPodsErr error + inspectPodErr error + expectedErr error + expectedCount int + expectedRetinaEndpoint *common.RetinaEndpoint + }{ + { + name: "Successful get all endpoints", + podCmdOutput: "pod1\npod2\n", + inspectCmdOutput: "mock_podSpec.json", + expectedErr: nil, + expectedCount: 2, + expectedRetinaEndpoint: common.NewRetinaEndpoint( + "retina-pod", + "retina-namespace", + common.NewIPAddress(net.ParseIP("10.0.0.4"), nil), + ), + }, + { + name: "Get all running pods error", + getPodsErr: errGetPods, + expectedErr: errGetPods, + expectedCount: 0, + }, + { + name: "Inspect pod command error", + podCmdOutput: "pod1\npod2\n", + inspectPodErr: errInspectPod, + expectedErr: errInspectPod, + expectedCount: 0, + }, + { + name: "Invalid pod spec JSON", + podCmdOutput: "pod1\npod2\n", + inspectCmdOutput: invalidJSONPath, + expectedErr: errJSONRead, + expectedCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + originalGetPodsCmd := getPodsCmd + originalInspectPodCmd := inspectPodCmd + defer func() { + getPodsCmd = originalGetPodsCmd + inspectPodCmd = originalInspectPodCmd + }() + + getPodsCmd = func(_ *Ctrinfo) (string, error) { + if tt.getPodsErr != nil { + return "", tt.getPodsErr + } + return tt.podCmdOutput, nil + } + inspectPodCmd = func(_ *Ctrinfo, _ string) (string, error) { + if tt.inspectPodErr != nil { + return "", tt.inspectPodErr + } + content, err := os.ReadFile(tt.inspectCmdOutput) + if err != nil { + return "", errJSONRead + } + return string(content), nil + } + + endpoints, err := src.GetAllEndpoints() + if tt.expectedErr != nil { + require.Error(t, err) + require.ErrorContains(t, err, tt.expectedErr.Error()) + require.Nil(t, endpoints) + } else { + require.NoError(t, err) + require.Len(t, endpoints, tt.expectedCount) + if tt.expectedCount > 0 { + require.Equal(t, tt.expectedRetinaEndpoint, endpoints[0]) + } + } + }) + } +} diff --git a/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo_windows.go b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo_windows.go new file mode 100644 index 0000000000..09968dc57a --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/ctrinfo/ctrinfo_windows.go @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//go:build windows + +package ctrinfo + +func runGetPods(c *Ctrinfo) (string, error) { + return c.runCommand("cmd", "/c", "crictl", "pods", "-q") +} + +func runPodInspect(c *Ctrinfo, id string) (string, error) { + return c.runCommand("cmd", "/c", "crictl", "inspectp", id) +} diff --git a/pkg/controllers/daemon/standalone/source/ctrinfo/mock_podSpec.json b/pkg/controllers/daemon/standalone/source/ctrinfo/mock_podSpec.json new file mode 100644 index 0000000000..70ca17934d --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/ctrinfo/mock_podSpec.json @@ -0,0 +1,99 @@ +{ + "status": { + "id": "X", + "metadata": { + "attempt": 1, + "name": "retina-pod", + "namespace": "retina-namespace", + "uid": "" + }, + "state": "SANDBOX_READY", + "createdAt": "X", + "network": { + "additionalIps": [], + "ip": "10.0.0.4" + }, + "linux": { + "namespaces": { + "options": { + "ipc": "POD", + "network": "POD", + "pid": "POD", + "targetId": "" + } + } + }, + "labels": { + "sandbox-platform": "windows/amd64" + }, + "annotations": {}, + "runtimeHandler": "runhcs-wcow-process" + }, + "info": { + "pid": 9324, + "processStatus": "running", + "netNamespaceClosed": false, + "image": "mcr.microsoft.com/windows/nanoserver:ltsc2022", + "snapshotKey": "X", + "snapshotter": "windows", + "runtimeHandler": "runhcs-wcow-process", + "runtimeType": "io.containerd.runhcs.v1", + "runtimeOptions": { + "debug": true, + "debug_type": 2, + "sandbox_image": "mcr.microsoft.com/windows/nanoserver:ltsc2022", + "sandbox_platform": "windows/amd64" + }, + "config": { + "metadata": { + "name": "retina-pod", + "namespace": "retina-namespace", + "attempt": 1 + }, + "labels": { + "sandbox-platform": "windows/amd64" + } + }, + "runtimeSpec": { + "ociVersion": "1.0.2", + "process": { + "user": { + "uid": 0, + "gid": 0 + }, + "args": [ + "c:\\windows\\system32\\cmd.exe" + ], + "cwd": "C:\\" + }, + "annotations": { + "io.kubernetes.cri.container-type": "sandbox", + "io.kubernetes.cri.sandbox-id": "X" + }, + "windows": { + "layerFolders": null, + "network": { + "networkNamespace": "X" + } + } + }, + "cniResult": { + "Interfaces": { + "eth0": { + "IPConfigs": [ + { + "IP": "10.0.0.4", + "Gateway": "10.0.0.1" + } + ], + "Mac": "", + "Sandbox": "" + } + }, + "DNS": [ + {} + ], + "Routes": null + } + } +} \ No newline at end of file diff --git a/pkg/controllers/daemon/standalone/source/mock_source.go b/pkg/controllers/daemon/standalone/source/mock_source.go new file mode 100644 index 0000000000..3b9a782878 --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/mock_source.go @@ -0,0 +1,61 @@ +// autogenerated +// +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/microsoft/retina/pkg/controllers/daemon/standalone/source (interfaces: Source) +// +// Generated by this command: +// +// mockgen -destination=mock_source.go -copyright_file=../lib/ignore_headers.txt -package=source github.com/microsoft/retina/pkg/controllers/daemon/standalone/source Source +// + +// Package source is a generated GoMock package. +package source + +import ( + reflect "reflect" + + common "github.com/microsoft/retina/pkg/common" + gomock "go.uber.org/mock/gomock" +) + +// MockSource is a mock of Source interface. +type MockSource struct { + ctrl *gomock.Controller + recorder *MockSourceMockRecorder +} + +// MockSourceMockRecorder is the mock recorder for MockSource. +type MockSourceMockRecorder struct { + mock *MockSource +} + +// NewMockSource creates a new mock instance. +func NewMockSource(ctrl *gomock.Controller) *MockSource { + mock := &MockSource{ctrl: ctrl} + mock.recorder = &MockSourceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSource) EXPECT() *MockSourceMockRecorder { + return m.recorder +} + +// GetAllEndpoints mocks base method. +func (m *MockSource) GetAllEndpoints() ([]*common.RetinaEndpoint, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllEndpoints") + ret0, _ := ret[0].([]*common.RetinaEndpoint) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllEndpoints indicates an expected call of GetAllEndpoints. +func (mr *MockSourceMockRecorder) GetAllEndpoints() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllEndpoints", reflect.TypeOf((*MockSource)(nil).GetAllEndpoints)) +} diff --git a/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet-mock.json b/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet-mock.json new file mode 100644 index 0000000000..c8d9648960 --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet-mock.json @@ -0,0 +1,37 @@ +{ + "Network": { + "ExternalInterfaces": { + "Ethernet": { + "Name": "Ethernet", + "Networks": { + "mock_network": { + "Endpoints": { + "mock_endpoint_1": { + "Id": "X", + "IPAddresses": [ + { + "IP": "192.0.0.5", + "Mask": "X" + } + ], + "PODName": "retina-pod", + "PODNameSpace": "retina-namespace" + }, + "mock_endpoint_2": { + "Id": "Y", + "IPAddresses": [ + { + "IP": "192.0.0.6", + "Mask": "Y" + } + ], + "PODName": "retina-pod2", + "PODNameSpace": "retina-namespace2" + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet-test.go b/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet-test.go new file mode 100644 index 0000000000..0b39ce600f --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet-test.go @@ -0,0 +1,107 @@ +// // Copyright (c) Microsoft Corporation. +// // Licensed under the MIT license. + +package azure + +import ( + "net" + "os" + "testing" + + "github.com/microsoft/retina/pkg/common" + "github.com/stretchr/testify/require" +) + +func TestAzureVnetGetAllEndpoints(t *testing.T) { + emptyJSONPath := "empty-azure-vnet.json" + emptyJSONContent := `` + err := os.WriteFile(emptyJSONPath, []byte(emptyJSONContent), 0o600) + require.NoError(t, err, "failed to create empty JSON file") + + invalidJSONPath := "mock-invalid-azure-vnet.json" + invalidJSONContent := `{ + "Network": { + "ExternalInterfaces": { + "eth0": { + "Networks": { + "192.0.0.5": { + "IpAddresses": [ + { + "IP": "192.0.0.5" + } + ], + "PodName": "retina2-pod", + "PodNamespace": "retina2-namespace" + } + } + } + } + } + ` + err = os.WriteFile(invalidJSONPath, []byte(invalidJSONContent), 0o600) + require.NoError(t, err, "failed to create invalid JSON file") + + defer os.Remove(emptyJSONPath) + defer os.Remove(invalidJSONPath) + + src := &Statefile{} + + tests := []struct { + name string + filePath string + emptyFile bool + expectedEndpoint []*common.RetinaEndpoint + expectedErr bool + }{ + { + name: "Valid state file", + filePath: "azure-vnet-mock.json", + emptyFile: false, + expectedEndpoint: []*common.RetinaEndpoint{ + common.NewRetinaEndpoint("retina-pod", "retina-namespace", common.NewIPAddress(net.ParseIP("192.0.0.5"), nil)), + common.NewRetinaEndpoint("retina-pod2", "retina-namespace2", common.NewIPAddress(net.ParseIP("192.0.0.6"), nil)), + }, + expectedErr: false, + }, + { + name: "Empty state file", + filePath: emptyJSONPath, + emptyFile: true, + expectedEndpoint: nil, + expectedErr: false, + }, + { + name: "Missing state file", + filePath: "non-existent-file.json", + emptyFile: false, + expectedEndpoint: nil, + expectedErr: true, + }, + { + name: "Invalid state file JSON", + expectedEndpoint: nil, + emptyFile: false, + filePath: invalidJSONPath, + expectedErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + src = New(tt.filePath) + endpoints, err := src.GetAllEndpoints() + + if tt.expectedErr { + require.Error(t, err) + require.Nil(t, endpoints) + } else { + require.NoError(t, err) + if tt.emptyFile { + require.Empty(t, endpoints) + } else { + require.ElementsMatch(t, tt.expectedEndpoint, endpoints) + } + } + }) + } +} diff --git a/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet.go b/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet.go new file mode 100644 index 0000000000..83951e7b8b --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/statefile/azure/azure-vnet.go @@ -0,0 +1,90 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package azure + +import ( + "encoding/json" + "fmt" + "net" + "os" + + "github.com/microsoft/retina/pkg/common" +) + +type Statefile struct { + location string +} + +type CniState struct { + Network Network `json:"Network"` +} + +type Network struct { + ExternalInterfaces map[string]ExternalInterface `json:"ExternalInterfaces"` +} + +type ExternalInterface struct { + Networks map[string]NetworkInfo `json:"Networks"` +} + +type NetworkInfo struct { + Endpoints map[string]Endpoint `json:"Endpoints"` +} + +type Endpoint struct { + ID string `json:"Id"` + IPAddresses []IPInfo `json:"IPAddresses"` + PodName string `json:"PodName"` + PodNamespace string `json:"PodNamespace"` +} + +type IPInfo struct { + IP string `json:"IP"` +} + +func New(location string) *Statefile { + return &Statefile{ + location: location, + } +} + +func (a *Statefile) GetAllEndpoints() ([]*common.RetinaEndpoint, error) { + data, err := os.ReadFile(a.location) + if err != nil { + return nil, fmt.Errorf("failed to read azure-vnet state file: %w", err) + } + + if len(data) == 0 { + return nil, nil + } + + var cniState CniState + if err := json.Unmarshal(data, &cniState); err != nil { + return nil, fmt.Errorf("failed to decode azure-vnet state file: %w", err) + } + + endpoints := []*common.RetinaEndpoint{} + + // For every HNS endpoint, we check if the equivalent IP address exists in the azure-vnet state file + for _, iface := range cniState.Network.ExternalInterfaces { + for _, networkInfo := range iface.Networks { + for _, endpoint := range networkInfo.Endpoints { + for _, ipInfo := range endpoint.IPAddresses { + ip := ipInfo.IP + if ip == "" { + continue + } + + endpoints = append(endpoints, common.NewRetinaEndpoint( + endpoint.PodName, + endpoint.PodNamespace, + common.NewIPAddress(net.ParseIP(ip), nil), + )) + } + } + } + } + + return endpoints, nil +} diff --git a/pkg/controllers/daemon/standalone/source/statefile/statefile.go b/pkg/controllers/daemon/standalone/source/statefile/statefile.go new file mode 100644 index 0000000000..d6ba213d74 --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/statefile/statefile.go @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package statefile + +import ( + "os" + + "github.com/pkg/errors" + + "github.com/microsoft/retina/pkg/controllers/daemon/standalone/source" + "github.com/microsoft/retina/pkg/controllers/daemon/standalone/source/statefile/azure" +) + +var ErrUnsupportedStatefileType = errors.New("unsupported statefile enrichment type, valid types are: azure-vnet-statefile") + +func newStatefile(enrichmentMode, location string) (source.Source, error) { + switch enrichmentMode { + case "azure-vnet-statefile": + return azure.New(location), nil + default: + return nil, errors.Wrapf(ErrUnsupportedStatefileType, "enrichmentMode=%s", enrichmentMode) + } +} + +func New(enrichmentMode, location string) (source.Source, error) { + if _, err := os.Stat(location); os.IsNotExist(err) { + return nil, errors.Wrapf(err, "statefile does not exist at location: %s", location) + } + + return newStatefile(enrichmentMode, location) +} diff --git a/pkg/controllers/daemon/standalone/source/statefile/statefile_test.go b/pkg/controllers/daemon/standalone/source/statefile/statefile_test.go new file mode 100644 index 0000000000..7326dad33d --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/statefile/statefile_test.go @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package statefile + +import ( + "testing" + + "github.com/microsoft/retina/pkg/controllers/daemon/standalone/source/statefile/azure" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + tests := []struct { + name string + enrichmentMode string + location string + wantType interface{} + wantErr error + }{ + { + name: "valid statefile enrichment type", + enrichmentMode: "azure-vnet-statefile", + location: "azure-vnet.json", + wantType: &azure.Statefile{}, + wantErr: nil, + }, + { + name: "invalid statefile type", + enrichmentMode: "gcp-vnet-statefile", + location: "gcp-vnet.json", + wantType: nil, + wantErr: ErrUnsupportedStatefileType, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + src, err := newStatefile(tt.enrichmentMode, tt.location) + + if tt.wantErr != nil { + require.ErrorContains(t, err, tt.wantErr.Error()) + require.Nil(t, src, "expected nil source on error") + } else { + require.NoError(t, err, "expected no error") + require.IsType(t, tt.wantType, src, "statefile source type mismatch") + } + }) + } +} diff --git a/pkg/controllers/daemon/standalone/source/types.go b/pkg/controllers/daemon/standalone/source/types.go new file mode 100644 index 0000000000..eabf92ab8a --- /dev/null +++ b/pkg/controllers/daemon/standalone/source/types.go @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package source + +import "github.com/microsoft/retina/pkg/common" + +//go:generate go run go.uber.org/mock/mockgen@v0.4.0 -destination=mock_source.go -copyright_file=../lib/ignore_headers.txt -package=source github.com/microsoft/retina/pkg/controllers/daemon/standalone/source Source + +type Source interface { + // GetAllEndpoints retrieves all retina endpoints from its corresponding source + GetAllEndpoints() ([]*common.RetinaEndpoint, error) +} diff --git a/pkg/enricher/base/base.go b/pkg/enricher/base/base.go new file mode 100644 index 0000000000..352d135b61 --- /dev/null +++ b/pkg/enricher/base/base.go @@ -0,0 +1,152 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package base + +import ( + "context" + "reflect" + "sync" + + "github.com/cilium/cilium/api/v1/flow" + v1 "github.com/cilium/cilium/pkg/hubble/api/v1" + "github.com/cilium/cilium/pkg/hubble/container" + "github.com/microsoft/retina/pkg/common" + c "github.com/microsoft/retina/pkg/controllers/cache" + "go.uber.org/zap" + + "github.com/microsoft/retina/pkg/log" +) + +var ( + E EnricherInterface + Once sync.Once + Initialized bool +) + +type Enricher struct { + // ctx is the context of enricher + Ctx context.Context + + // l is the logger + L *log.ZapLogger + + // // cache is the cache of all the objects + Cache c.CacheInterface + + InputRing *container.Ring + + OutputRing *container.Ring + + Reader *container.RingReader +} + +func NewEnricher(ctx context.Context, logger *log.ZapLogger, cache c.CacheInterface) *Enricher { + ir := container.NewRing(container.Capacity1023) + return &Enricher{ + Ctx: ctx, + L: logger, + Cache: cache, + InputRing: ir, + OutputRing: container.NewRing(container.Capacity1023), + Reader: container.NewRingReader(ir, ir.OldestWrite()), + } +} + +func Instance() EnricherInterface { + return E +} + +func IsInitialized() bool { + return Initialized +} + +func (e *Enricher) Run(enrichFn func(ev *v1.Event)) { + go func() { + for { + select { + case <-e.Ctx.Done(): + e.L.Debug("context is done for enricher") + return + default: + ev := e.Reader.NextFollow(e.Ctx) + // nolint:gocritic + // if err != nil { + // se.L.Error("error while reading from input channel for enricher", zap.Error(err)) + // continue + // } + if ev == nil { + e.L.Debug("received nil from input channel for enricher") + continue + } + // todo + switch fl := ev.Event.(type) { + case *flow.Flow: + e.L.Debug("Enriching flow", zap.Any("flow", fl)) + enrichFn(ev) + default: + e.L.Debug("received unknown type from input channel for enricher", + zap.Any("obj", ev), + zap.Any("type", reflect.TypeOf(ev)), + ) + } + } + } + }() +} + +// export forwards the flow to other modules +func (e *Enricher) Export(ev *v1.Event) { + e.OutputRing.Write(ev) +} + +func (e *Enricher) GetEndpoint(obj interface{}) *flow.Endpoint { + if obj == nil { + return nil + } + + switch o := obj.(type) { + case *common.RetinaEndpoint: + // TODO add service type + return &flow.Endpoint{ + Namespace: o.Namespace(), + PodName: o.Name(), + Labels: o.FormattedLabels(), + Workloads: e.getWorkloads(o.OwnerRefs()), + } + + case *common.RetinaSvc: + // todo + return nil + + default: + e.L.Debug("received unknown type from cache", zap.Any("obj", obj), zap.Any("type", reflect.TypeOf(obj))) + return nil + } +} + +func (e *Enricher) getWorkloads(ownerRefs []*common.OwnerReference) []*flow.Workload { + if ownerRefs == nil { + return nil + } + workloads := make([]*flow.Workload, 0) + + for _, ownerRef := range ownerRefs { + w := &flow.Workload{ + Name: ownerRef.Name, + Kind: ownerRef.Kind, + } + + workloads = append(workloads, w) + } + + return workloads +} + +func (e *Enricher) Write(ev *v1.Event) { + e.InputRing.Write(ev) +} + +func (e *Enricher) ExportReader() *container.RingReader { + return container.NewRingReader(e.OutputRing, e.OutputRing.OldestWrite()) +} diff --git a/pkg/enricher/mock_enricherinterface.go b/pkg/enricher/base/mock_enricherinterface.go similarity index 78% rename from pkg/enricher/mock_enricherinterface.go rename to pkg/enricher/base/mock_enricherinterface.go index 11824f1f9c..569849083c 100644 --- a/pkg/enricher/mock_enricherinterface.go +++ b/pkg/enricher/base/mock_enricherinterface.go @@ -5,15 +5,15 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/microsoft/retina/pkg/enricher (interfaces: EnricherInterface) +// Source: github.com/microsoft/retina/pkg/enricher/base (interfaces: EnricherInterface) // // Generated by this command: // -// mockgen -destination=mock_enricherinterface.go -copyright_file=../lib/ignore_headers.txt -package=enricher github.com/microsoft/retina/pkg/enricher EnricherInterface +// mockgen -destination=mock_enricherinterface.go -copyright_file=../lib/ignore_headers.txt -package=base github.com/microsoft/retina/pkg/enricher/base EnricherInterface // -// Package enricher is a generated GoMock package. -package enricher +// Package base is a generated GoMock package. +package base import ( reflect "reflect" @@ -46,6 +46,18 @@ func (m *MockEnricherInterface) EXPECT() *MockEnricherInterfaceMockRecorder { return m.recorder } +// Enrich mocks base method. +func (m *MockEnricherInterface) Enrich(arg0 *v1.Event) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Enrich", arg0) +} + +// Enrich indicates an expected call of Enrich. +func (mr *MockEnricherInterfaceMockRecorder) Enrich(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enrich", reflect.TypeOf((*MockEnricherInterface)(nil).Enrich), arg0) +} + // ExportReader mocks base method. func (m *MockEnricherInterface) ExportReader() *container.RingReader { m.ctrl.T.Helper() diff --git a/pkg/enricher/types.go b/pkg/enricher/base/types.go similarity index 65% rename from pkg/enricher/types.go rename to pkg/enricher/base/types.go index ed76c7e5dd..cd88a152cb 100644 --- a/pkg/enricher/types.go +++ b/pkg/enricher/base/types.go @@ -1,16 +1,17 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -package enricher +package base import ( v1 "github.com/cilium/cilium/pkg/hubble/api/v1" "github.com/cilium/cilium/pkg/hubble/container" ) -//go:generate go run go.uber.org/mock/mockgen@v0.4.0 -destination=mock_enricherinterface.go -copyright_file=../lib/ignore_headers.txt -package=enricher github.com/microsoft/retina/pkg/enricher EnricherInterface +//go:generate go run go.uber.org/mock/mockgen@v0.4.0 -destination=mock_enricherinterface.go -copyright_file=../lib/ignore_headers.txt -package=base github.com/microsoft/retina/pkg/enricher/base EnricherInterface type EnricherInterface interface { Run() Write(ev *v1.Event) ExportReader() *container.RingReader + Enrich(ev *v1.Event) } diff --git a/pkg/enricher/enricher.go b/pkg/enricher/enricher.go deleted file mode 100644 index 98013cd2ca..0000000000 --- a/pkg/enricher/enricher.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -package enricher - -import ( - "context" - "reflect" - "sync" - - "github.com/cilium/cilium/api/v1/flow" - v1 "github.com/cilium/cilium/pkg/hubble/api/v1" - "github.com/cilium/cilium/pkg/hubble/container" - "github.com/microsoft/retina/pkg/common" - "github.com/microsoft/retina/pkg/controllers/cache" - "github.com/microsoft/retina/pkg/log" - "go.uber.org/zap" -) - -var ( - e *Enricher - once sync.Once - initialized bool -) - -type Enricher struct { - // ctx is the context of enricher - ctx context.Context - - // l is the logger - l *log.ZapLogger - - // cache is the cache of all the objects - cache cache.CacheInterface - - inputRing *container.Ring - - Reader *container.RingReader - - outputRing *container.Ring -} - -func New(ctx context.Context, cache cache.CacheInterface) *Enricher { - once.Do(func() { - ir := container.NewRing(container.Capacity1023) - e = &Enricher{ - ctx: ctx, - l: log.Logger().Named("enricher"), - cache: cache, - inputRing: ir, - Reader: container.NewRingReader(ir, ir.OldestWrite()), - outputRing: container.NewRing(container.Capacity1023), - } - initialized = true - }) - - return e -} - -func Instance() *Enricher { - return e -} - -func IsInitialized() bool { - return initialized -} - -func (e *Enricher) Run() { - go func() { - for { - select { - case <-e.ctx.Done(): - e.l.Debug("context is done for enricher") - return - default: - ev := e.Reader.NextFollow(e.ctx) - //if err != nil { - //e.l.Error("error while reading from input channel for enricher", zap.Error(err)) - // continue - //} - if ev == nil { - e.l.Debug("received nil from input channel for enricher") - continue - } - // todo - switch ev.Event.(type) { - case *flow.Flow: - e.l.Debug("Enriching flow", zap.Any("flow", ev.Event.(*flow.Flow))) - e.enrich(ev) - default: - e.l.Debug("received unknown type from input channel for enricher", - zap.Any("obj", ev), - zap.Any("type", reflect.TypeOf(ev)), - ) - } - } - } - }() -} - -// enrich takes the flow and enriches it with the information from the cache -func (e *Enricher) enrich(ev *v1.Event) { - flow := ev.Event.(*flow.Flow) - - // IPversion is a enum in the flow proto - // 0: IPVersion_IP_NOT_USED - // 1: IPVersion_IPv4 - // 2: IPVersion_IPv6 - if flow.IP.IpVersion > 1 { - e.l.Error("IP version is not supported", zap.Any("IPVersion", flow.IP.IpVersion)) - return - } - if flow.IP.Source == "" { - e.l.Debug("source IP is empty") - return - } - srcObj := e.cache.GetObjByIP(flow.IP.Source) - if srcObj != nil { - flow.Source = e.getEndpoint(srcObj) - } - - if flow.IP.Destination == "" { - e.l.Debug("destination IP is empty") - return - } - - dstObj := e.cache.GetObjByIP(flow.IP.Destination) - if dstObj != nil { - flow.Destination = e.getEndpoint(dstObj) - } - - ev.Event = flow - e.l.Debug("enriched flow", zap.Any("flow", flow)) - e.export(ev) -} - -// export forwards the flow to other modules -func (e *Enricher) export(ev *v1.Event) { - e.outputRing.Write(ev) -} - -func (e *Enricher) getEndpoint(obj interface{}) *flow.Endpoint { - if obj == nil { - return nil - } - - switch o := obj.(type) { - case *common.RetinaEndpoint: - // TODO add service type - return &flow.Endpoint{ - Namespace: o.Namespace(), - PodName: o.Name(), - Labels: o.FormattedLabels(), - Workloads: e.getWorkloads(o.OwnerRefs()), - } - - case *common.RetinaSvc: - // todo - return nil - - default: - e.l.Debug("received unknown type from cache", zap.Any("obj", obj), zap.Any("type", reflect.TypeOf(obj))) - return nil - } -} - -func (e *Enricher) getWorkloads(ownerRefs []*common.OwnerReference) []*flow.Workload { - if ownerRefs == nil { - return nil - } - workloads := make([]*flow.Workload, 0) - - for _, ownerRef := range ownerRefs { - w := &flow.Workload{ - Name: ownerRef.Name, - Kind: ownerRef.Kind, - } - - workloads = append(workloads, w) - } - - return workloads -} - -func (e *Enricher) Write(ev *v1.Event) { - e.inputRing.Write(ev) -} - -func (e *Enricher) ExportReader() *container.RingReader { - return container.NewRingReader(e.outputRing, e.outputRing.OldestWrite()) -} diff --git a/pkg/enricher/standalone.go b/pkg/enricher/standalone.go new file mode 100644 index 0000000000..f69b8eeacf --- /dev/null +++ b/pkg/enricher/standalone.go @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package enricher + +import ( + "context" + + "github.com/cilium/cilium/api/v1/flow" + v1 "github.com/cilium/cilium/pkg/hubble/api/v1" + c "github.com/microsoft/retina/pkg/controllers/cache" + "github.com/microsoft/retina/pkg/enricher/base" + "github.com/microsoft/retina/pkg/log" + "go.uber.org/zap" +) + +type StandaloneEnricher struct { + *base.Enricher +} + +func newStandalone(ctx context.Context, cache c.CacheInterface) *StandaloneEnricher { + logger := log.Logger().Named("standalone-enricher") + return &StandaloneEnricher{ + Enricher: base.NewEnricher(ctx, logger, cache), + } +} + +func NewStandalone(ctx context.Context, cache c.CacheInterface) base.EnricherInterface { + base.Once.Do(func() { + base.E = newStandalone(ctx, cache) + base.Initialized = true + }) + return base.E +} + +func (se *StandaloneEnricher) Enrich(ev *v1.Event) { + fl := ev.Event.(*flow.Flow) + + if fl.GetIP().GetSource() == "" { + se.L.Debug("source IP is empty") + return + } + + srcObj := se.Cache.GetPodByIP(fl.GetIP().GetSource()) + if srcObj != nil { + fl.Source = se.GetEndpoint(srcObj) + se.L.Debug("enriched flow", zap.Any("flow", fl)) + } else { + fl.Source = nil + } + + ev.Event = fl + se.Export(ev) +} + +func (se *StandaloneEnricher) Run() { + se.Enricher.Run(se.Enrich) +} diff --git a/pkg/enricher/standalone_test.go b/pkg/enricher/standalone_test.go new file mode 100644 index 0000000000..a03c604d9f --- /dev/null +++ b/pkg/enricher/standalone_test.go @@ -0,0 +1,151 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package enricher + +import ( + "context" + "net" + "sync" + "testing" + "time" + + "github.com/cilium/cilium/api/v1/flow" + v1 "github.com/cilium/cilium/pkg/hubble/api/v1" + "github.com/microsoft/retina/pkg/common" + "github.com/microsoft/retina/pkg/controllers/cache/standalone" + "github.com/microsoft/retina/pkg/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEnricherStandaloneWithEndpointPresent(t *testing.T) { + opts := log.GetDefaultLogOpts() + opts.Level = "debug" + if _, err := log.SetupZapLogger(opts); err != nil { + t.Errorf("Error setting up logger: %s", err) + } + + eventCount := 20 + expectedOutputCount := eventCount - 2 // last written event is not readable due to ring buffers + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testCache := standalone.New() + sourceIP := "1.1.1.1" + + // Add endpoint to cache + endpoint := common.NewRetinaEndpoint("pod1", "ns1", &common.IPAddresses{IPv4: net.ParseIP(sourceIP)}) + require.NoError(t, testCache.UpdateRetinaEndpoint(endpoint)) + + // Create the enricher with standalone enabled + enricher := newStandalone(ctx, testCache) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < eventCount; i++ { + ev := &v1.Event{ + Event: &flow.Flow{ + IP: &flow.IP{ + Source: sourceIP, + }, + }, + } + enricher.Write(ev) + time.Sleep(25 * time.Millisecond) + } + }() + + enricher.Run() + + wg.Add(1) + go func() { + defer wg.Done() + count := 0 + reader := enricher.ExportReader() + for { + ev := reader.NextFollow(ctx) + if ev == nil { + break + } + fl := ev.Event.(*flow.Flow) + receivedFlow := fl.GetSource() + + assert.NotNil(t, receivedFlow, "Expected flow") + assert.Equal(t, "pod1", receivedFlow.GetPodName()) + assert.Equal(t, "ns1", receivedFlow.GetNamespace()) + + count++ + } + assert.Equal(t, expectedOutputCount, count, "Received event count mismatch") + }() + + time.Sleep(3 * time.Second) + cancel() + wg.Wait() +} + +func TestEnricherStandaloneWithEndpointAbsent(t *testing.T) { + opts := log.GetDefaultLogOpts() + opts.Level = "debug" + if _, err := log.SetupZapLogger(opts); err != nil { + t.Errorf("Error setting up logger: %s", err) + } + + eventCount := 20 + expectedOutputCount := eventCount - 2 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testCache := standalone.New() + sourceIP := "9.9.9.9" // No endpoint present in cache + + // Create the enricher with standalone enabled + enricher := newStandalone(ctx, testCache) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < eventCount; i++ { + ev := &v1.Event{ + Event: &flow.Flow{ + IP: &flow.IP{ + Source: sourceIP, + }, + }, + } + enricher.Write(ev) + time.Sleep(25 * time.Millisecond) + } + }() + + enricher.Run() + + wg.Add(1) + go func() { + defer wg.Done() + count := 0 + reader := enricher.ExportReader() + for { + ev := reader.NextFollow(ctx) + if ev == nil { + break + } + fl := ev.Event.(*flow.Flow) + receivedFlow := fl.GetSource() + assert.Nil(t, receivedFlow) + + count++ + } + assert.Equal(t, expectedOutputCount, count, "Received event count mismatch") + }() + + time.Sleep(3 * time.Second) + cancel() + wg.Wait() +} diff --git a/pkg/enricher/standard.go b/pkg/enricher/standard.go new file mode 100644 index 0000000000..f0e768f364 --- /dev/null +++ b/pkg/enricher/standard.go @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package enricher + +import ( + "context" + + fl "github.com/cilium/cilium/api/v1/flow" + v1 "github.com/cilium/cilium/pkg/hubble/api/v1" + c "github.com/microsoft/retina/pkg/controllers/cache" + "github.com/microsoft/retina/pkg/enricher/base" + "github.com/microsoft/retina/pkg/log" + "go.uber.org/zap" +) + +type StandardEnricher struct { + *base.Enricher +} + +func newStandard(ctx context.Context, cache c.CacheInterface) *StandardEnricher { + logger := log.Logger().Named("standard-enricher") + return &StandardEnricher{ + Enricher: base.NewEnricher(ctx, logger, cache), + } +} + +func NewStandard(ctx context.Context, cache c.CacheInterface) base.EnricherInterface { + base.Once.Do(func() { + base.E = newStandard(ctx, cache) + base.Initialized = true + }) + return base.E +} + +// Enrich takes the flow and enriches it with the information from the cache +func (se *StandardEnricher) Enrich(ev *v1.Event) { + flow := ev.Event.(*fl.Flow) + + // IPversion is a enum in the flow proto + // 0: IPVersion_IP_NOT_USED + // 1: IPVersion_IPv4 + // 2: IPVersion_IPv6 + if flow.GetIP().GetIpVersion() > 1 { + se.L.Error("IP version is not supported", zap.Any("IPVersion", flow.GetIP().GetIpVersion())) + return + } + if flow.GetIP().GetSource() == "" { + se.L.Debug("source IP is empty") + return + } + srcObj := se.Cache.GetObjByIP(flow.GetIP().GetSource()) + if srcObj != nil { + flow.Source = se.GetEndpoint(srcObj) + } + + if flow.GetIP().GetDestination() == "" { + se.L.Debug("destination IP is empty") + return + } + + dstObj := se.Cache.GetObjByIP(flow.GetIP().GetDestination()) + if dstObj != nil { + flow.Destination = se.GetEndpoint(dstObj) + } + + ev.Event = flow + se.L.Debug("enriched flow", zap.Any("flow", flow)) + se.Export(ev) +} + +func (se *StandardEnricher) Run() { + se.Enricher.Run(se.Enrich) +} diff --git a/pkg/enricher/enricher_test.go b/pkg/enricher/standard_test.go similarity index 96% rename from pkg/enricher/enricher_test.go rename to pkg/enricher/standard_test.go index cd05556d10..a94dfacb09 100644 --- a/pkg/enricher/enricher_test.go +++ b/pkg/enricher/standard_test.go @@ -14,6 +14,8 @@ import ( v1 "github.com/cilium/cilium/pkg/hubble/api/v1" "github.com/microsoft/retina/pkg/common" "github.com/microsoft/retina/pkg/controllers/cache" + "github.com/microsoft/retina/pkg/enricher/base" + "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/pubsub" "github.com/stretchr/testify/assert" @@ -74,7 +76,7 @@ func TestEnricherSecondaryIPs(t *testing.T) { require.NoError(t, err) // get the enricher - e := New(ctx, c) + e := newStandard(ctx, c) var wg sync.WaitGroup wg.Add(1) @@ -140,7 +142,7 @@ func TestEnricherSecondaryIPs(t *testing.T) { wg.Wait() } -func addEvent(e *Enricher, sourceIP, destIP string) { +func addEvent(e base.EnricherInterface, sourceIP, destIP string) { l := log.Logger().Named("addev") ev := &v1.Event{ Timestamp: timestamppb.Now(), diff --git a/pkg/managers/controllermanager/base/base.go b/pkg/managers/controllermanager/base/base.go new file mode 100644 index 0000000000..cc396cd2e8 --- /dev/null +++ b/pkg/managers/controllermanager/base/base.go @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package base + +import ( + "context" + "time" + + "github.com/microsoft/retina/pkg/controllers/cache" + "github.com/microsoft/retina/pkg/enricher/base" + "github.com/microsoft/retina/pkg/log" + pm "github.com/microsoft/retina/pkg/managers/pluginmanager" + sm "github.com/microsoft/retina/pkg/managers/servermanager" + "github.com/microsoft/retina/pkg/telemetry" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + ResyncTime time.Duration = 5 * time.Minute +) + +type Controller struct { + L *log.ZapLogger + HTTPServer *sm.HTTPServer + PluginManager *pm.PluginManager + Tel telemetry.Telemetry + Cache cache.CacheInterface + Enricher base.EnricherInterface +} + +func (m *Controller) Start(ctx context.Context) { + // Only track panics if telemetry is enabled + defer telemetry.TrackPanic() + + var g *errgroup.Group + + g, ctx = errgroup.WithContext(ctx) + + //nolint:gocritic + // defer m.otelAgent.Start(ctx)() + g.Go(func() error { + return m.PluginManager.Start(ctx) + }) + g.Go(func() error { + return m.HTTPServer.Start(ctx) + }) + //nolint:gocritic + // g.Go(func() error { + // return m.clusterObsCl.Start() + // }) + + if err := g.Wait(); err != nil { + m.L.Panic("Error running controller manager", zap.Error(err)) + } +} + +func (m *Controller) Stop() { + // Stop the plugin manager. This will help clean up the plugin resources. + m.PluginManager.Stop() + m.L.Info("Stopped controller manager") +} diff --git a/pkg/managers/controllermanager/controllermanager.go b/pkg/managers/controllermanager/controllermanager.go deleted file mode 100644 index 0ee6b46479..0000000000 --- a/pkg/managers/controllermanager/controllermanager.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. -package controllermanager - -import ( - "context" - "time" - - kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/controllers/cache" - "github.com/microsoft/retina/pkg/enricher" - "github.com/microsoft/retina/pkg/log" - pm "github.com/microsoft/retina/pkg/managers/pluginmanager" - sm "github.com/microsoft/retina/pkg/managers/servermanager" - "github.com/microsoft/retina/pkg/pubsub" - "github.com/microsoft/retina/pkg/telemetry" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" -) - -const ( - ResyncTime time.Duration = 5 * time.Minute -) - -type Controller struct { - l *log.ZapLogger - httpServer *sm.HTTPServer - pluginManager *pm.PluginManager - tel telemetry.Telemetry - conf *kcfg.Config - pubsub *pubsub.PubSub - cache *cache.Cache - enricher *enricher.Enricher -} - -func NewControllerManager(conf *kcfg.Config, kubeclient kubernetes.Interface, tel telemetry.Telemetry) (*Controller, error) { - cmLogger := log.Logger().Named("controller-manager") - - if conf.EnablePodLevel { - // informer factory for pods/services - factory := informers.NewSharedInformerFactory(kubeclient, ResyncTime) - factory.WaitForCacheSync(wait.NeverStop) - } - - pMgr, err := pm.NewPluginManager( - conf, - tel, - ) - if err != nil { - return nil, err - } - - // create HTTP server for API server - httpServer := sm.NewHTTPServer( - conf.APIServer.Host, - conf.APIServer.Port, - ) - - return &Controller{ - l: cmLogger, - httpServer: httpServer, - pluginManager: pMgr, - tel: tel, - conf: conf, - }, nil -} - -func (m *Controller) Init(ctx context.Context) error { - m.l.Info("Initializing controller manager ...") - - if err := m.httpServer.Init(); err != nil { - return err - } - - if m.conf.EnablePodLevel { - // create pubsub instance - m.pubsub = pubsub.New() - - // create cache instance - m.cache = cache.New(m.pubsub) - - // create enricher instance - m.enricher = enricher.New(ctx, m.cache) - } - - return nil -} - -func (m *Controller) Start(ctx context.Context) { - // Only track panics if telemetry is enabled - defer telemetry.TrackPanic() - - var g *errgroup.Group - - g, ctx = errgroup.WithContext(ctx) - - // defer m.otelAgent.Start(ctx)() - g.Go(func() error { - return m.pluginManager.Start(ctx) - }) - g.Go(func() error { - return m.httpServer.Start(ctx) - }) - // g.Go(func() error { - // return m.clusterObsCl.Start() - // }) - - if err := g.Wait(); err != nil { - m.l.Panic("Error running controller manager", zap.Error(err)) - } -} - -func (m *Controller) Stop(ctx context.Context) { - // Stop the plugin manager. This will help clean up the plugin resources. - m.pluginManager.Stop() - m.l.Info("Stopped controller manager") -} diff --git a/pkg/managers/controllermanager/standalone.go b/pkg/managers/controllermanager/standalone.go new file mode 100644 index 0000000000..fd9a3b21cd --- /dev/null +++ b/pkg/managers/controllermanager/standalone.go @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package controllermanager + +import ( + "fmt" + + kcfg "github.com/microsoft/retina/pkg/config" + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/managers/controllermanager/base" + pm "github.com/microsoft/retina/pkg/managers/pluginmanager" + sm "github.com/microsoft/retina/pkg/managers/servermanager" + "github.com/microsoft/retina/pkg/telemetry" +) + +type StandaloneController struct { + base.Controller + conf *kcfg.StandaloneConfig +} + +func NewStandaloneControllerManager(conf *kcfg.StandaloneConfig, tel telemetry.Telemetry) (*StandaloneController, error) { + cmLogger := log.Logger().Named("standalone-controller-manager") + + pMgr, err := pm.NewPluginManager(kcfg.StandaloneConfigAdapter(conf), tel) + if err != nil { + return nil, fmt.Errorf("failed to create plugin manager: %w", err) + } + + // create HTTP server for API server + httpServer := sm.NewHTTPServer( + conf.APIServer.Host, + conf.APIServer.Port, + ) + + return &StandaloneController{ + Controller: base.Controller{ + L: cmLogger, + HTTPServer: httpServer, + PluginManager: pMgr, + Tel: tel, + }, + conf: conf, + }, nil +} + +func (m *StandaloneController) Init() error { + m.L.Info("Initializing standalone controller manager") + + if err := m.HTTPServer.Init(); err != nil { + return fmt.Errorf("failed to initialize HTTP server: %w", err) + } + + return nil +} diff --git a/pkg/managers/controllermanager/standalone_test.go b/pkg/managers/controllermanager/standalone_test.go new file mode 100644 index 0000000000..cd8308a94e --- /dev/null +++ b/pkg/managers/controllermanager/standalone_test.go @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package controllermanager + +import ( + "testing" + + kcfg "github.com/microsoft/retina/pkg/config" + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/telemetry" + + "github.com/stretchr/testify/require" +) + +const ( + testStandaloneCfgFile = "../../config/testwith/config-standalone.yaml" +) + +func TestNewStandaloneControllerManager(t *testing.T) { + c, err := kcfg.GetStandaloneConfig(testStandaloneCfgFile) + require.NoError(t, err, "Expected no error, instead got %+v", err) + require.NotNil(t, c) + + if _, err = log.SetupZapLogger(log.GetDefaultLogOpts()); err != nil { + t.Errorf("Error setting up logger: %s", err) + } + + cm, err := NewStandaloneControllerManager(c, telemetry.NewNoopTelemetry()) + require.Error(t, err, "Expected error of not recognising windows plugins in linux, instead got no error") + require.Nil(t, cm) +} diff --git a/pkg/managers/controllermanager/standard.go b/pkg/managers/controllermanager/standard.go new file mode 100644 index 0000000000..2b8282e4de --- /dev/null +++ b/pkg/managers/controllermanager/standard.go @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package controllermanager + +import ( + "context" + "fmt" + + kcfg "github.com/microsoft/retina/pkg/config" + "github.com/microsoft/retina/pkg/controllers/cache" + "github.com/microsoft/retina/pkg/enricher" + + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/managers/controllermanager/base" + pm "github.com/microsoft/retina/pkg/managers/pluginmanager" + sm "github.com/microsoft/retina/pkg/managers/servermanager" + "github.com/microsoft/retina/pkg/pubsub" + "github.com/microsoft/retina/pkg/telemetry" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" +) + +type StandardController struct { + base.Controller + conf *kcfg.Config + pubsub *pubsub.PubSub +} + +func NewStandardControllerManager(conf *kcfg.Config, kubeclient kubernetes.Interface, tel telemetry.Telemetry) (*StandardController, error) { + cmLogger := log.Logger().Named("standard-controller-manager") + + if conf.EnablePodLevel { + // informer factory for pods/services + factory := informers.NewSharedInformerFactory(kubeclient, base.ResyncTime) + factory.WaitForCacheSync(wait.NeverStop) + } + + pMgr, err := pm.NewPluginManager(conf, tel) + if err != nil { + return nil, fmt.Errorf("failed to create plugin manager: %w", err) + } + + // create HTTP server for API server + httpServer := sm.NewHTTPServer( + conf.APIServer.Host, + conf.APIServer.Port, + ) + + return &StandardController{ + Controller: base.Controller{ + L: cmLogger, + HTTPServer: httpServer, + PluginManager: pMgr, + Tel: tel, + }, + conf: conf, + }, nil +} + +func (m *StandardController) Init(ctx context.Context) error { + m.L.Info("Initializing standard controller manager ...") + + if err := m.HTTPServer.Init(); err != nil { + return fmt.Errorf("failed to initialize HTTP server: %w", err) + } + + if m.conf.EnablePodLevel { + // create pubsub instance + m.pubsub = pubsub.New() + + // create cache instance + m.Cache = cache.New(m.pubsub) + + // create enricher instance + m.Enricher = enricher.NewStandard(ctx, m.Cache) + } + + return nil +} diff --git a/pkg/managers/controllermanager/controllermanager_test.go b/pkg/managers/controllermanager/standard_test.go similarity index 90% rename from pkg/managers/controllermanager/controllermanager_test.go rename to pkg/managers/controllermanager/standard_test.go index 841dda9dd6..281a8c5133 100644 --- a/pkg/managers/controllermanager/controllermanager_test.go +++ b/pkg/managers/controllermanager/standard_test.go @@ -33,7 +33,7 @@ func TestNewControllerManager(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) kubeclient := k8sfake.NewSimpleClientset() - cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) + cm, err := NewStandardControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) assert.NoError(t, err, "Expected no error, instead got %+v", err) assert.NotNil(t, cm) } @@ -45,7 +45,7 @@ func TestNewControllerManagerWin(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) kubeclient := k8sfake.NewSimpleClientset() - cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) + cm, err := NewStandardControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) assert.Error(t, err, "Expected error of not recognising windows plugins in linux, instead got no error") assert.Nil(t, cm) } @@ -57,7 +57,7 @@ func TestNewControllerManagerInit(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) kubeclient := k8sfake.NewSimpleClientset() - cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) + cm, err := NewStandardControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) assert.NoError(t, err, "Expected no error, instead got %+v", err) assert.NotNil(t, cm) @@ -72,7 +72,7 @@ func TestControllerPluginManagerStartFail(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) kubeclient := k8sfake.NewSimpleClientset() - cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) + cm, err := NewStandardControllerManager(c, kubeclient, telemetry.NewNoopTelemetry()) assert.NoError(t, err, "Expected no error, instead got %+v", err) assert.NotNil(t, cm) @@ -98,7 +98,7 @@ func TestControllerPluginManagerStartFail(t *testing.T) { mockPlugin.EXPECT().Start(gomock.Any()).Return(errors.New("test error")).AnyTimes() mgr.SetPlugin(pluginName, mockPlugin) - cm.pluginManager = mgr + cm.PluginManager = mgr err = cm.Init(context.Background()) require.NoError(t, err, "Expected no error, instead got %+v", err) diff --git a/pkg/module/metrics/metrics_module.go b/pkg/module/metrics/metrics_module.go index 0d38c06522..d8c77a7155 100644 --- a/pkg/module/metrics/metrics_module.go +++ b/pkg/module/metrics/metrics_module.go @@ -15,7 +15,7 @@ import ( "github.com/microsoft/retina/pkg/common" kcfg "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/controllers/cache" - "github.com/microsoft/retina/pkg/enricher" + enricher "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/exporter" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/managers/filtermanager" diff --git a/pkg/module/metrics/metrics_module_linux_test.go b/pkg/module/metrics/metrics_module_linux_test.go index 1d53ac4b54..78ba6ef19d 100644 --- a/pkg/module/metrics/metrics_module_linux_test.go +++ b/pkg/module/metrics/metrics_module_linux_test.go @@ -13,7 +13,7 @@ import ( "github.com/microsoft/retina/pkg/common" kcfg "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/controllers/cache" - "github.com/microsoft/retina/pkg/enricher" + enricher "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/managers/filtermanager" "github.com/microsoft/retina/pkg/pubsub" diff --git a/pkg/module/metrics/standalone/metrics_module.go b/pkg/module/metrics/standalone/metrics_module.go new file mode 100644 index 0000000000..654cc3dbeb --- /dev/null +++ b/pkg/module/metrics/standalone/metrics_module.go @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package standalone + +import ( + "context" + + "github.com/microsoft/retina/pkg/enricher/base" +) + +type Module struct{} + +func InitModule(_ context.Context, _ base.EnricherInterface) *Module { + return &Module{} +} + +func (m *Module) Reconcile(_ context.Context) {} + +func (m *Module) Clear() {} diff --git a/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux.go b/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux.go index 5373e5c4fd..141c16f339 100644 --- a/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux.go +++ b/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux.go @@ -12,7 +12,7 @@ import ( v1 "github.com/cilium/cilium/pkg/hubble/api/v1" "github.com/cilium/cilium/pkg/monitor/payload" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/registry" @@ -77,8 +77,8 @@ func (c *ciliumeventobserver) Init() error { func (c *ciliumeventobserver) Start(ctx context.Context) error { if c.cfg.EnablePodLevel { - if enricher.IsInitialized() { - c.enricher = enricher.Instance() + if base.IsInitialized() { + c.enricher = base.Instance() } else { c.l.Warn("retina enricher is not initialized") } diff --git a/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux_test.go b/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux_test.go index 92eec5ce1f..92a235718c 100644 --- a/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux_test.go +++ b/pkg/plugin/ciliumeventobserver/ciliumeventobserver_linux_test.go @@ -28,9 +28,10 @@ func TestStartError(t *testing.T) { _, _ = log.SetupZapLogger(log.GetDefaultLogOpts()) c := cache.New(pubsub.New()) - e := enricher.New(ctxTimeout, c) + e := enricher.NewStandard(ctxTimeout, c) e.Run() - defer e.Reader.Close() + reader := e.ExportReader() + defer reader.Close() cfg := &config.Config{ EnablePodLevel: true, diff --git a/pkg/plugin/ciliumeventobserver/types_linux.go b/pkg/plugin/ciliumeventobserver/types_linux.go index 3f9fad500c..60502bff0d 100644 --- a/pkg/plugin/ciliumeventobserver/types_linux.go +++ b/pkg/plugin/ciliumeventobserver/types_linux.go @@ -10,7 +10,7 @@ import ( hp "github.com/cilium/cilium/pkg/hubble/parser" "github.com/cilium/cilium/pkg/monitor/payload" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" ) @@ -19,7 +19,7 @@ const name = "ciliumeventobserver" type ciliumeventobserver struct { cfg *kcfg.Config l *log.ZapLogger - enricher enricher.EnricherInterface + enricher base.EnricherInterface externalChannel chan *v1.Event payloadEvents chan *payload.Payload connection net.Conn diff --git a/pkg/plugin/dns/dns_linux.go b/pkg/plugin/dns/dns_linux.go index 0e41bb7ca9..d1194ace2b 100644 --- a/pkg/plugin/dns/dns_linux.go +++ b/pkg/plugin/dns/dns_linux.go @@ -14,7 +14,7 @@ import ( "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/dns/types" "github.com/inspektor-gadget/inspektor-gadget/pkg/utils/host" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/common" @@ -63,8 +63,8 @@ func (d *dns) Init() error { func (d *dns) Start(ctx context.Context) error { if d.cfg.EnablePodLevel { - if enricher.IsInitialized() { - d.enricher = enricher.Instance() + if base.IsInitialized() { + d.enricher = base.Instance() } else { d.l.Warn("retina enricher is not initialized") } diff --git a/pkg/plugin/dns/dns_linux_test.go b/pkg/plugin/dns/dns_linux_test.go index 397c10dc55..0b815fbccb 100644 --- a/pkg/plugin/dns/dns_linux_test.go +++ b/pkg/plugin/dns/dns_linux_test.go @@ -17,6 +17,7 @@ import ( "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/controllers/cache" "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/common/mocks" @@ -54,9 +55,10 @@ func TestStart(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) c := cache.New(pubsub.New()) - e := enricher.New(ctxTimeout, c) + e := enricher.NewStandard(ctxTimeout, c) e.Run() - defer e.Reader.Close() + reader := e.ExportReader() + defer reader.Close() d := &dns{ l: log.Logger().Named(name), @@ -146,7 +148,7 @@ func TestRequestEventHandler(t *testing.T) { metrics.DNSRequestCounter = mockCV // Advanced metrics. - mockEnricher := enricher.NewMockEnricherInterface(ctrl) + mockEnricher := base.NewMockEnricherInterface(ctrl) mockEnricher.EXPECT().Write(EventMatched( utils.DNSType_QUERY, 0, event.DNSName, []string{event.QType}, 0, []string{}, )).Times(1) @@ -196,7 +198,7 @@ func TestResponseEventHandler(t *testing.T) { metrics.DNSResponseCounter = mockCV // Advanced metrics. - mockEnricher := enricher.NewMockEnricherInterface(ctrl) + mockEnricher := base.NewMockEnricherInterface(ctrl) mockEnricher.EXPECT().Write(EventMatched( utils.DNSType_RESPONSE, 0, event.DNSName, []string{event.QType}, 2, []string{"1.1.1.1", "2.2.2.2"}, )).Times(1) diff --git a/pkg/plugin/dns/types_linux.go b/pkg/plugin/dns/types_linux.go index 9517cc9b2f..235b4a723a 100644 --- a/pkg/plugin/dns/types_linux.go +++ b/pkg/plugin/dns/types_linux.go @@ -5,7 +5,7 @@ package dns import ( v1 "github.com/cilium/cilium/pkg/hubble/api/v1" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + enricher "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/common" diff --git a/pkg/plugin/dropreason/dropreason_linux.go b/pkg/plugin/dropreason/dropreason_linux.go index c22dc11770..a3c0aef685 100644 --- a/pkg/plugin/dropreason/dropreason_linux.go +++ b/pkg/plugin/dropreason/dropreason_linux.go @@ -19,7 +19,7 @@ import ( "github.com/cilium/ebpf/perf" "github.com/microsoft/retina/internal/ktime" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + enricher "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/loader" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" diff --git a/pkg/plugin/dropreason/dropreason_linux_test.go b/pkg/plugin/dropreason/dropreason_linux_test.go index 675b81f20f..4bbf0115ca 100644 --- a/pkg/plugin/dropreason/dropreason_linux_test.go +++ b/pkg/plugin/dropreason/dropreason_linux_test.go @@ -18,7 +18,7 @@ import ( "github.com/blang/semver/v4" "github.com/cilium/ebpf/perf" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" mocks "github.com/microsoft/retina/pkg/plugin/dropreason/mocks" @@ -201,7 +201,7 @@ func TestDropReasonRun(t *testing.T) { mockedMap := mocks.NewMockIMap(ctrl) mockedMapIterator := mocks.NewMockIMapIterator(ctrl) mockedPerfReader := mocks.NewMockIPerfReader(ctrl) - menricher := enricher.NewMockEnricherInterface(ctrl) //nolint:typecheck + menricher := base.NewMockEnricherInterface(ctrl) //nolint:typecheck // mock enricher interface // reasign helper function so that it returns the mockedMapIterator iMapIterator = func(x IMap) IMapIterator { @@ -262,7 +262,7 @@ func TestDropReasonReadDataPodLevelEnabled(t *testing.T) { mockedMap := mocks.NewMockIMap(ctrl) mockedPerfReader := mocks.NewMockIPerfReader(ctrl) - menricher := enricher.NewMockEnricherInterface(ctrl) //nolint:typecheck + menricher := base.NewMockEnricherInterface(ctrl) //nolint:typecheck // mock enricher interface // create a rawSample slice and fill it with `unsafe.Sizeof(kprobePacket{})` rawSample := make([]byte, unsafe.Sizeof(kprobePacket{})) diff --git a/pkg/plugin/dropreason/types_linux.go b/pkg/plugin/dropreason/types_linux.go index 5db5c363e2..bc5c26166f 100644 --- a/pkg/plugin/dropreason/types_linux.go +++ b/pkg/plugin/dropreason/types_linux.go @@ -10,7 +10,7 @@ import ( "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/utils" ) @@ -36,7 +36,7 @@ type dropReason struct { metricsMapData IMap isRunning bool reader IPerfReader - enricher enricher.EnricherInterface + enricher base.EnricherInterface recordsChannel chan perf.Record wg sync.WaitGroup externalChannel chan *hubblev1.Event diff --git a/pkg/plugin/packetparser/packetparser_linux.go b/pkg/plugin/packetparser/packetparser_linux.go index 9baade8e67..211538c30f 100644 --- a/pkg/plugin/packetparser/packetparser_linux.go +++ b/pkg/plugin/packetparser/packetparser_linux.go @@ -27,7 +27,7 @@ import ( "github.com/microsoft/retina/internal/ktime" "github.com/microsoft/retina/pkg/common" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + enricher "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/loader" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" diff --git a/pkg/plugin/packetparser/packetparser_linux_test.go b/pkg/plugin/packetparser/packetparser_linux_test.go index 50903852a1..091d7ed11f 100644 --- a/pkg/plugin/packetparser/packetparser_linux_test.go +++ b/pkg/plugin/packetparser/packetparser_linux_test.go @@ -21,7 +21,7 @@ import ( tc "github.com/florianl/go-tc" nl "github.com/mdlayher/netlink" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/packetparser/mocks" @@ -289,7 +289,7 @@ func TestReadData_Error(t *testing.T) { mperf := mocks.NewMockperfReader(ctrl) mperf.EXPECT().Read().Return(perf.Record{}, errors.New("error")).AnyTimes() - menricher := enricher.NewMockEnricherInterface(ctrl) //nolint:typecheck + menricher := base.NewMockEnricherInterface(ctrl) //nolint:typecheck // mock enricher interface menricher.EXPECT().Write(gomock.Any()).Times(0) p := &packetParser{ @@ -328,7 +328,7 @@ func TestReadDataPodLevelEnabled(t *testing.T) { mperf := mocks.NewMockperfReader(ctrl) mperf.EXPECT().Read().Return(record, nil).MinTimes(1) - menricher := enricher.NewMockEnricherInterface(ctrl) //nolint:typecheck + menricher := base.NewMockEnricherInterface(ctrl) //nolint:typecheck // mock enricher interface menricher.EXPECT().Write(gomock.Any()).MinTimes(1) p := &packetParser{ diff --git a/pkg/plugin/packetparser/types_linux.go b/pkg/plugin/packetparser/types_linux.go index c220ba65af..5283091a6e 100644 --- a/pkg/plugin/packetparser/types_linux.go +++ b/pkg/plugin/packetparser/types_linux.go @@ -7,6 +7,7 @@ import ( "sync" kcfg "github.com/microsoft/retina/pkg/config" + "github.com/microsoft/retina/pkg/enricher/base" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" "github.com/cilium/ebpf" @@ -15,7 +16,6 @@ import ( nl "github.com/mdlayher/netlink" "github.com/vishvananda/netlink" - "github.com/microsoft/retina/pkg/enricher" "github.com/microsoft/retina/pkg/log" ) @@ -114,7 +114,7 @@ type packetParser struct { // tcMap is a map of key to *val. tcMap *sync.Map reader perfReader - enricher enricher.EnricherInterface + enricher base.EnricherInterface // interfaceLockMap is a map of key to *sync.Mutex. interfaceLockMap *sync.Map endpointIngressInfo *ebpf.ProgramInfo diff --git a/pkg/plugin/pktmon/pktmon_windows.go b/pkg/plugin/pktmon/pktmon_windows.go index 4b0759f292..75de01e3ce 100644 --- a/pkg/plugin/pktmon/pktmon_windows.go +++ b/pkg/plugin/pktmon/pktmon_windows.go @@ -12,7 +12,7 @@ import ( observerv1 "github.com/cilium/cilium/api/v1/observer" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + enricher "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/registry" diff --git a/pkg/plugin/tcpretrans/tcpretrans_linux.go b/pkg/plugin/tcpretrans/tcpretrans_linux.go index 1f4b994f4e..44a7bd36c8 100644 --- a/pkg/plugin/tcpretrans/tcpretrans_linux.go +++ b/pkg/plugin/tcpretrans/tcpretrans_linux.go @@ -17,7 +17,7 @@ import ( "github.com/inspektor-gadget/inspektor-gadget/pkg/socketenricher" "github.com/inspektor-gadget/inspektor-gadget/pkg/utils/host" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/plugin/registry" "github.com/microsoft/retina/pkg/utils" @@ -76,8 +76,8 @@ func (t *tcpretrans) Start(ctx context.Context) error { return nil } // Set up enricher - if enricher.IsInitialized() { - t.enricher = enricher.Instance() + if base.IsInitialized() { + t.enricher = base.Instance() } else { t.l.Error(errEnricherNotInitialized.Error()) return errEnricherNotInitialized diff --git a/pkg/plugin/tcpretrans/types_linux.go b/pkg/plugin/tcpretrans/types_linux.go index b3856bbe3e..9da51ac73b 100644 --- a/pkg/plugin/tcpretrans/types_linux.go +++ b/pkg/plugin/tcpretrans/types_linux.go @@ -8,7 +8,7 @@ import ( gadgetcontext "github.com/inspektor-gadget/inspektor-gadget/pkg/gadget-context" "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/tcpretrans/tracer" kcfg "github.com/microsoft/retina/pkg/config" - "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" ) @@ -19,7 +19,7 @@ type tcpretrans struct { l *log.ZapLogger tracer *tracer.Tracer gadgetCtx *gadgetcontext.GadgetContext - enricher enricher.EnricherInterface + enricher base.EnricherInterface } var errEnricherNotInitialized = errors.New("enricher not initialized") diff --git a/test/enricher/main_linux.go b/test/enricher/main_linux.go index acb700be85..8acee59c33 100644 --- a/test/enricher/main_linux.go +++ b/test/enricher/main_linux.go @@ -11,6 +11,7 @@ import ( v1 "github.com/cilium/cilium/pkg/hubble/api/v1" "github.com/microsoft/retina/pkg/controllers/cache" "github.com/microsoft/retina/pkg/enricher" + "github.com/microsoft/retina/pkg/enricher/base" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/pubsub" "go.uber.org/zap" @@ -26,7 +27,7 @@ func main() { ctx := context.Background() c := cache.New(pubsub.New()) - e := enricher.New(ctx, c) + e := enricher.NewStandard(ctx, c) e.Run() @@ -44,7 +45,7 @@ func main() { } } -func addEvent(e *enricher.Enricher) { +func addEvent(e base.EnricherInterface) { l := log.Logger().Named("addev") ev := &v1.Event{ Timestamp: timestamppb.Now(), diff --git a/test/plugin/dns/main_linux.go b/test/plugin/dns/main_linux.go index 59cb24f50f..1a1d2458f4 100644 --- a/test/plugin/dns/main_linux.go +++ b/test/plugin/dns/main_linux.go @@ -52,7 +52,7 @@ func main() { defer cancel() c := cache.New(pubsub.New()) - e := enricher.New(ctx, c) + e := enricher.NewStandard(ctx, c) e.Run() err = tt.Generate(ctxTimeout)