diff --git a/pkg/operator/encryption/kms/health/checker.go b/pkg/operator/encryption/kms/health/checker.go new file mode 100644 index 0000000000..f299ad0d93 --- /dev/null +++ b/pkg/operator/encryption/kms/health/checker.go @@ -0,0 +1,110 @@ +package health + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/openshift/library-go/pkg/operator/encryption/kms" + "github.com/openshift/library-go/pkg/operator/encryption/state" + kmsservice "k8s.io/kms/pkg/service" +) + +const ( + StatusHealthy = "healthy" + StatusUnhealthy = "unhealthy" + StatusError = "error" +) + +type PluginHealthCondition struct { + KeyID string `json:"keyID"` + KEKID string `json:"kekID,omitempty"` + Status string `json:"status"` + LastChecked time.Time `json:"lastChecked"` + Detail string `json:"detail,omitempty"` +} + +type plugin struct { + keyID string + service kmsservice.Service +} + +type Checker struct { + plugins []plugin + now func() time.Time +} + +// NewChecker creates a Checker that probes KMS plugins at the given UDS endpoints. +// Each endpoint must be a unix:// URI matching the kmsEndpointFormat convention +// (e.g. "unix:///var/run/kmsplugin/kms-1.sock"). +func NewChecker(ctx context.Context, endpoints []string, timeout time.Duration) (*Checker, error) { + c := Checker{ + plugins: make([]plugin, 0, len(endpoints)), + now: time.Now, + } + + for _, endpoint := range endpoints { + keyID, err := keyIDFromEndpoint(endpoint) + if err != nil { + return nil, fmt.Errorf("invalid endpoint %q: %w", endpoint, err) + } + + service, err := kms.NewGRPCService(ctx, endpoint, "kms-health-monitor", timeout) + if err != nil { + return nil, fmt.Errorf("dial KMS plugin at %q: %w", endpoint, err) + } + + c.plugins = append(c.plugins, plugin{ + keyID: keyID, + service: service, + }) + } + + return &c, nil +} + +const udsScheme = "unix://" + +// keyIDFromEndpoint extracts the numeric keyID from a KMS endpoint URI. +// The endpoint must follow the convention "unix:///var/run/kmsplugin/kms-{keyID}.sock". +func keyIDFromEndpoint(endpoint string) (string, error) { + if !strings.HasPrefix(endpoint, udsScheme) { + return "", fmt.Errorf("expected %s scheme", udsScheme) + } + socketPath := strings.TrimPrefix(endpoint, udsScheme) + name := strings.TrimSuffix(filepath.Base(socketPath), filepath.Ext(socketPath)) + id, valid := state.NameToKeyID(name) + if !valid { + return "", fmt.Errorf("cannot extract numeric keyID from %q", name) + } + return fmt.Sprintf("%d", id), nil +} + +func (c *Checker) CheckStatus(ctx context.Context) []PluginHealthCondition { + conditions := make([]PluginHealthCondition, 0, len(c.plugins)) + + for _, p := range c.plugins { + cond := PluginHealthCondition{ + KeyID: p.keyID, + LastChecked: c.now(), + } + + resp, err := p.service.Status(ctx) + switch { + case err != nil: + cond.Status = StatusError + cond.Detail = err.Error() + case resp.Healthz == kms.HealthzOK: + cond.Status = StatusHealthy + cond.KEKID = resp.KeyID + default: + cond.Status = StatusUnhealthy + cond.Detail = resp.Healthz + } + + conditions = append(conditions, cond) + } + return conditions +} diff --git a/pkg/operator/encryption/kms/health/checker_test.go b/pkg/operator/encryption/kms/health/checker_test.go new file mode 100644 index 0000000000..9c87e0a613 --- /dev/null +++ b/pkg/operator/encryption/kms/health/checker_test.go @@ -0,0 +1,75 @@ +package health + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + kmsservice "k8s.io/kms/pkg/service" +) + +type fakeService struct { + resp *kmsservice.StatusResponse + err error +} + +func (f *fakeService) Status(context.Context) (*kmsservice.StatusResponse, error) { + return f.resp, f.err +} +func (f *fakeService) Encrypt(context.Context, string, []byte) (*kmsservice.EncryptResponse, error) { + return nil, nil +} +func (f *fakeService) Decrypt(context.Context, string, *kmsservice.DecryptRequest) ([]byte, error) { + return nil, nil +} + +func TestChecker_CheckStatus(t *testing.T) { + fixed := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + c := &Checker{ + plugins: []plugin{ + {keyID: "1", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "ok", KeyID: "kek-abc"}}}, + {keyID: "2", service: &fakeService{err: fmt.Errorf("connection refused")}}, + {keyID: "3", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "degraded"}}}, + }, + now: func() time.Time { return fixed }, + } + + got := c.CheckStatus(context.Background()) + want := []PluginHealthCondition{ + {KeyID: "1", KEKID: "kek-abc", Status: StatusHealthy, LastChecked: fixed}, + {KeyID: "2", Status: StatusError, Detail: "connection refused", LastChecked: fixed}, + {KeyID: "3", Status: StatusUnhealthy, Detail: "degraded", LastChecked: fixed}, + } + if !reflect.DeepEqual(got, want) { + t.Errorf("CheckStatus():\n got: %+v\n want: %+v", got, want) + } +} + +func Test_keyIDFromEndpoint(t *testing.T) { + tests := []struct { + endpoint string + want string + wantErr bool + }{ + {"unix:///var/run/kmsplugin/kms-1.sock", "1", false}, + {"unix:///var/run/kmsplugin/kms-2.sock", "2", false}, + {"unix:///tmp/kms-42.sock", "42", false}, + {"unix:///var/run/kmsplugin/plugin.sock", "", true}, + {"/var/run/kmsplugin/kms-1.sock", "", true}, + {"tcp://localhost:8080", "", true}, + } + for _, tt := range tests { + t.Run(tt.endpoint, func(t *testing.T) { + got, err := keyIDFromEndpoint(tt.endpoint) + if (err != nil) != tt.wantErr { + t.Errorf("keyIDFromEndpoint(%q) error = %v, wantErr %v", tt.endpoint, err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("keyIDFromEndpoint(%q) = %q, want %q", tt.endpoint, got, tt.want) + } + }) + } +} diff --git a/pkg/operator/encryption/kms/health/cmd.go b/pkg/operator/encryption/kms/health/cmd.go new file mode 100644 index 0000000000..a31f514116 --- /dev/null +++ b/pkg/operator/encryption/kms/health/cmd.go @@ -0,0 +1,144 @@ +package health + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/openshift/library-go/pkg/controller/controllercmd" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +type options struct { + kmsSockets []string + readInterval time.Duration + readTimeout time.Duration + writeTimeout time.Duration + targetGroup string + targetVersion string + targetResource string + targetKind string + nodeName string +} + +func NewCommand(ctx context.Context) *cobra.Command { + o := &options{ + readInterval: 30 * time.Second, + readTimeout: 5 * time.Second, + writeTimeout: 10 * time.Second, + } + + startFunc := func(ctx context.Context, controllerContext *controllercmd.ControllerContext) error { + if err := o.validate(); err != nil { + return err + } + return o.run(ctx, controllerContext.KubeConfig) + } + + cfg := controllercmd.NewControllerCommandConfig( + "kms-health-monitor", + version.Info{Major: "0", Minor: "0"}, + startFunc, + clock.RealClock{}, + ) + cfg.DisableLeaderElection = true + cfg.DisableServing = true + + cmd := cfg.NewCommandWithContext(ctx) + cmd.Use = "kms-health-monitor" + cmd.Short = "Observes co-located KMSv2 plugins and publishes status as an OperatorCondition." + + o.addFlags(cmd.Flags()) + return cmd +} + +func (o *options) addFlags(fs *pflag.FlagSet) { + fs.StringSliceVar(&o.kmsSockets, "kms-sockets", nil, "KMS plugin endpoints in unix:// URI format (e.g. unix:///var/run/kmsplugin/kms-1.sock)") + fs.DurationVar(&o.readInterval, "read-interval", o.readInterval, "cadence between checks") + fs.DurationVar(&o.readTimeout, "read-timeout", o.readTimeout, "deadline for each Status RPC") + fs.DurationVar(&o.writeTimeout, "write-timeout", o.writeTimeout, "deadline for each condition update") + fs.StringVar(&o.targetGroup, "target-group", "", "API group of the operator CR (e.g. operator.openshift.io)") + fs.StringVar(&o.targetVersion, "target-version", "v1", "API version of the operator CR") + fs.StringVar(&o.targetResource, "target-resource", "", "resource name of the operator CR (e.g. kubeapiservers)") + fs.StringVar(&o.targetKind, "target-kind", "", "kind of the operator CR (e.g. KubeAPIServer)") + fs.StringVar(&o.nodeName, "node-name", "", "node name recorded in the condition to identify the origin") +} + +func (o *options) validate() error { + if len(o.kmsSockets) == 0 { + return fmt.Errorf("--kms-sockets is required, at least one") + } + for _, s := range o.kmsSockets { + if strings.TrimSpace(s) == "" { + return fmt.Errorf("--kms-sockets cannot contain empty entries") + } + } + if o.readInterval <= 0 { + return fmt.Errorf("--read-interval must be positive") + } + if o.readTimeout <= 0 { + return fmt.Errorf("--read-timeout must be positive") + } + if o.writeTimeout <= 0 { + return fmt.Errorf("--write-timeout must be positive") + } + if o.nodeName == "" { + return fmt.Errorf("--node-name is required") + } + if o.targetGroup == "" { + return fmt.Errorf("--target-group is required") + } + if o.targetResource == "" { + return fmt.Errorf("--target-resource is required") + } + if o.targetKind == "" { + return fmt.Errorf("--target-kind is required") + } + return nil +} + +func (o *options) run(ctx context.Context, restConfig *rest.Config) error { + gvr := schema.GroupVersionResource{Group: o.targetGroup, Version: o.targetVersion, Resource: o.targetResource} + gvk := schema.GroupVersionKind{Group: o.targetGroup, Version: o.targetVersion, Kind: o.targetKind} + + klog.InfoS("kms-health-monitor starting", + "sockets", o.kmsSockets, + "target", gvr.String(), + "observerNode", o.nodeName, + "interval", o.readInterval, + "readTimeout", o.readTimeout, + "writeTimeout", o.writeTimeout, + ) + + writer, err := newWriter(restConfig, gvr, gvk, o.nodeName) + if err != nil { + return fmt.Errorf("create writer: %w", err) + } + checker, err := NewChecker(ctx, o.kmsSockets, o.readTimeout) + if err != nil { + return fmt.Errorf("create checker: %w", err) + } + + wait.JitterUntilWithContext(ctx, func(ctx context.Context) { + probeCtx, cancel := context.WithTimeout(ctx, o.readTimeout) + defer cancel() + conditions := checker.CheckStatus(probeCtx) + + writeCtx, writeCancel := context.WithTimeout(ctx, o.writeTimeout) + defer writeCancel() + if err := writer.Apply(writeCtx, conditions); err != nil { + klog.ErrorS(err, "apply operator status") + } + }, o.readInterval, 0.1, false) + + return nil +} diff --git a/pkg/operator/encryption/kms/health/writer.go b/pkg/operator/encryption/kms/health/writer.go new file mode 100644 index 0000000000..1e79ce62ef --- /dev/null +++ b/pkg/operator/encryption/kms/health/writer.go @@ -0,0 +1,72 @@ +package health + +import ( + "context" + "encoding/json" + "fmt" + + operatorv1 "github.com/openshift/api/operator/v1" + applyoperatorv1 "github.com/openshift/client-go/operator/applyconfigurations/operator/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" +) + +const operatorCRName = "cluster" + +type Writer struct { + client dynamic.ResourceInterface + gvk schema.GroupVersionKind + nodeName string +} + +func newWriter(cfg *rest.Config, gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, nodeName string) (*Writer, error) { + dynamicClient, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("create dynamic client: %w", err) + } + + return &Writer{ + client: dynamicClient.Resource(gvr), + gvk: gvk, + nodeName: nodeName, + }, nil +} + +func (w *Writer) Apply(ctx context.Context, conditions []PluginHealthCondition) error { + msg, err := json.Marshal(conditions) + if err != nil { + return fmt.Errorf("marshal conditions: %w", err) + } + + cond := applyoperatorv1.OperatorCondition(). + WithType("KMSHealthReporter_" + w.nodeName). + WithStatus(operatorv1.ConditionTrue). + WithReason("AsExpected"). + WithMessage(string(msg)) + + status := applyoperatorv1.OperatorStatus().WithConditions(cond) + + statusUnstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(status) + if err != nil { + return fmt.Errorf("convert status to unstructured: %w", err) + } + + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "status": statusUnstructured, + }, + } + obj.SetGroupVersionKind(w.gvk) + obj.SetName(operatorCRName) + + fieldManager := "kms-health-monitor-" + w.nodeName + _, err = w.client.ApplyStatus(ctx, operatorCRName, obj, metav1.ApplyOptions{ + Force: true, + FieldManager: fieldManager, + }) + return err +} diff --git a/pkg/operator/encryption/kms/health/writer_test.go b/pkg/operator/encryption/kms/health/writer_test.go new file mode 100644 index 0000000000..53e54a663d --- /dev/null +++ b/pkg/operator/encryption/kms/health/writer_test.go @@ -0,0 +1,141 @@ +package health + +import ( + "context" + "encoding/json" + "reflect" + "testing" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + clienttesting "k8s.io/client-go/testing" + + dynamicfake "k8s.io/client-go/dynamic/fake" +) + +func TestWriter_Apply(t *testing.T) { + tests := []struct { + name string + nodeName string + conditions []PluginHealthCondition + wantType string + wantMsg string + }{ + { + name: "single healthy plugin", + nodeName: "master-0", + conditions: []PluginHealthCondition{ + {KeyID: "1", KEKID: "key-abc", Status: StatusHealthy, LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + }, + wantType: "KMSHealthReporter_master-0", + wantMsg: `[{"keyID":"1","kekID":"key-abc","status":"healthy","lastChecked":"2025-01-01T00:00:00Z"}]`, + }, + { + name: "mixed healthy and error", + nodeName: "master-1", + conditions: []PluginHealthCondition{ + {KeyID: "1", KEKID: "key-abc", Status: StatusHealthy, LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + {KeyID: "2", Status: StatusError, Detail: "connection refused", LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + }, + wantType: "KMSHealthReporter_master-1", + wantMsg: `[{"keyID":"1","kekID":"key-abc","status":"healthy","lastChecked":"2025-01-01T00:00:00Z"},{"keyID":"2","status":"error","lastChecked":"2025-01-01T00:00:00Z","detail":"connection refused"}]`, + }, + { + name: "unhealthy plugin", + nodeName: "master-2", + conditions: []PluginHealthCondition{ + {KeyID: "1", Status: StatusUnhealthy, Detail: "not ready", LastChecked: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)}, + }, + wantType: "KMSHealthReporter_master-2", + wantMsg: `[{"keyID":"1","status":"unhealthy","lastChecked":"2025-01-01T00:00:00Z","detail":"not ready"}]`, + }, + } + + gvr := schema.GroupVersionResource{Group: "operator.openshift.io", Version: "v1", Resource: "kubeapiservers"} + gvk := schema.GroupVersionKind{Group: "operator.openshift.io", Version: "v1", Kind: "KubeAPIServer"} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + fakeClient := dynamicfake.NewSimpleDynamicClient(scheme) + + var captured *unstructured.Unstructured + fakeClient.PrependReactor("patch", "kubeapiservers", func(action clienttesting.Action) (bool, runtime.Object, error) { + patchAction := action.(clienttesting.PatchAction) + obj := &unstructured.Unstructured{} + if err := json.Unmarshal(patchAction.GetPatch(), &obj.Object); err != nil { + t.Fatalf("unmarshal patch: %v", err) + } + captured = obj + return true, obj, nil + }) + + w := &Writer{ + client: fakeClient.Resource(gvr), + gvk: gvk, + nodeName: tt.nodeName, + } + + if err := w.Apply(context.Background(), tt.conditions); err != nil { + t.Fatalf("Apply() error: %v", err) + } + + if captured == nil { + t.Fatal("no patch was issued") + } + + statusObj, ok := captured.Object["status"] + if !ok { + t.Fatal("no status in patch") + } + statusMap, ok := statusObj.(map[string]interface{}) + if !ok { + t.Fatal("status is not a map") + } + conditionsRaw, ok := statusMap["conditions"] + if !ok { + t.Fatal("no conditions in status") + } + condList, ok := conditionsRaw.([]interface{}) + if !ok { + t.Fatalf("conditions is not a list, got %T", conditionsRaw) + } + if len(condList) != 1 { + t.Fatalf("expected 1 condition, got %d", len(condList)) + } + + condMap := condList[0].(map[string]interface{}) + if got := condMap["type"]; got != tt.wantType { + t.Errorf("condition type = %q, want %q", got, tt.wantType) + } + if got := condMap["status"]; got != "True" { + t.Errorf("condition status = %q, want %q", got, "True") + } + if got := condMap["reason"]; got != "AsExpected" { + t.Errorf("condition reason = %q, want %q", got, "AsExpected") + } + + var gotParsed, wantParsed interface{} + if err := json.Unmarshal([]byte(condMap["message"].(string)), &gotParsed); err != nil { + t.Fatalf("parse condition message: %v", err) + } + if err := json.Unmarshal([]byte(tt.wantMsg), &wantParsed); err != nil { + t.Fatalf("parse want message: %v", err) + } + if !reflect.DeepEqual(gotParsed, wantParsed) { + t.Errorf("condition message:\n got: %s\n want: %s", condMap["message"], tt.wantMsg) + } + + // Verify SSA patch type + for _, a := range fakeClient.Actions() { + if pa, ok := a.(clienttesting.PatchAction); ok { + if pa.GetPatchType() != "application/apply-patch+yaml" { + t.Errorf("patch type = %q, want application/apply-patch+yaml", pa.GetPatchType()) + } + } + } + }) + } +} diff --git a/pkg/operator/encryption/kms/preflight/checker.go b/pkg/operator/encryption/kms/preflight/checker.go index 4570218fb8..b9eb3ad0de 100644 --- a/pkg/operator/encryption/kms/preflight/checker.go +++ b/pkg/operator/encryption/kms/preflight/checker.go @@ -9,15 +9,12 @@ import ( "strings" "time" + "github.com/openshift/library-go/pkg/operator/encryption/kms" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" kmsservice "k8s.io/kms/pkg/service" ) -// healthzOK is the value the KMS plugin returns when healthy. -// See https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kms/apis/v2/api.proto#L39 -const healthzOK = "ok" - // checker runs the preflight check against a KMS plugin by calling // Status, Encrypt, and Decrypt on the kmsservice.Service interface. // this is the same interface the apiserver uses. @@ -60,7 +57,7 @@ func (c *checker) checkStatus(ctx context.Context) error { // we only check healthz here. // version and keyID validation is the apiserver's responsibility, // the preflight check just confirms the plugin is reachable and healthy. - if resp.Healthz != healthzOK { + if resp.Healthz != kms.HealthzOK { klog.Infof(" not ready: healthz=%q, latency=%v", resp.Healthz, elapsed) return false, nil } diff --git a/pkg/operator/encryption/kms/preflight/cmd.go b/pkg/operator/encryption/kms/preflight/cmd.go index f64390e6d6..d97195b4e3 100644 --- a/pkg/operator/encryption/kms/preflight/cmd.go +++ b/pkg/operator/encryption/kms/preflight/cmd.go @@ -8,7 +8,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" - k8senvelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" + "github.com/openshift/library-go/pkg/operator/encryption/kms" "k8s.io/klog/v2" ) @@ -51,9 +51,7 @@ func (o *options) validate() error { func (o *options) run(ctx context.Context) error { klog.Infof("Running KMS preflight check at %s", kmsSocketEndpoint) - // k8senvelopekmsv2.NewGRPCService is not a public API and may change. - // If it breaks, we can inline a minimal gRPC client using k8s.io/kms directly. - service, err := k8senvelopekmsv2.NewGRPCService(ctx, kmsSocketEndpoint, "preflight", o.kmsCallTimeout) + service, err := kms.NewGRPCService(ctx, kmsSocketEndpoint, "preflight", o.kmsCallTimeout) if err != nil { return fmt.Errorf("failed to create KMS gRPC client: %w", err) } diff --git a/pkg/operator/encryption/kms/probe.go b/pkg/operator/encryption/kms/probe.go new file mode 100644 index 0000000000..01c44ec71f --- /dev/null +++ b/pkg/operator/encryption/kms/probe.go @@ -0,0 +1,18 @@ +package kms + +import ( + "context" + "time" + + k8senvelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" + kmsservice "k8s.io/kms/pkg/service" +) + +// HealthzOK is the value the KMS plugin returns when healthy. +// See https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kms/apis/v2/api.proto#L39 +const HealthzOK = "ok" + +// NewGRPCService creates a KMS v2 gRPC service client connected to the given endpoint. +func NewGRPCService(ctx context.Context, endpoint, providerName string, timeout time.Duration) (kmsservice.Service, error) { + return k8senvelopekmsv2.NewGRPCService(ctx, endpoint, providerName, timeout) +}