Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions pkg/operator/encryption/kms/health/checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package health

import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/openshift/library-go/pkg/operator/encryption/kms"
"github.com/openshift/library-go/pkg/operator/encryption/state"
kmsservice "k8s.io/kms/pkg/service"
)

// Values stored in PluginHealthCondition.Status by Checker.CheckStatus.
const (
// StatusHealthy: the Status RPC succeeded and its healthz value equals kms.HealthzOK.
StatusHealthy = "healthy"
// StatusUnhealthy: the Status RPC succeeded but reported any other healthz value.
StatusUnhealthy = "unhealthy"
// StatusError: the Status RPC itself returned an error.
StatusError = "error"
)

// PluginHealthCondition is the JSON-serializable outcome of probing a single
// KMS plugin. Checker.CheckStatus produces one value per configured plugin.
type PluginHealthCondition struct {
// KeyID is the key ID associated with the probed plugin (derived from its endpoint).
KeyID string `json:"keyID"`
// KEKID is the KeyID from the plugin's status response; CheckStatus fills it
// only when the plugin is healthy.
KEKID string `json:"kekID,omitempty"`
// Status is one of StatusHealthy, StatusUnhealthy or StatusError.
Status string `json:"status"`
// LastChecked is the timestamp taken just before the Status RPC was issued.
LastChecked time.Time `json:"lastChecked"`
// Detail holds the RPC error text for StatusError, or the reported healthz
// value for StatusUnhealthy; it is empty for healthy plugins.
Detail string `json:"detail,omitempty"`
}

// plugin pairs a KMS service client with the key ID extracted from the
// endpoint it was dialed on.
type plugin struct {
// keyID is the decimal key ID string reported back in PluginHealthCondition.KeyID.
keyID string
// service is the client whose Status method is called on every check.
service kmsservice.Service
}

// Checker probes a fixed list of KMS plugins and reports one
// PluginHealthCondition per plugin. Build it with NewChecker.
type Checker struct {
// plugins are probed sequentially, in slice order, by CheckStatus.
plugins []plugin
// now supplies the LastChecked timestamp. NewChecker sets it to time.Now;
// tests substitute a fixed clock.
now func() time.Time
}

// NewChecker builds a Checker for the KMS plugins listening on the given UDS
// endpoints. Every endpoint has to be a unix:// URI that follows the
// kmsEndpointFormat convention (e.g. "unix:///var/run/kmsplugin/kms-1.sock").
//
// For each endpoint the key ID is parsed from the socket name and a gRPC
// service client is created with the supplied timeout. The first endpoint that
// fails either step aborts construction and its error is returned wrapped.
func NewChecker(ctx context.Context, endpoints []string, timeout time.Duration) (*Checker, error) {
	probes := make([]plugin, 0, len(endpoints))

	for _, ep := range endpoints {
		id, err := keyIDFromEndpoint(ep)
		if err != nil {
			return nil, fmt.Errorf("invalid endpoint %q: %w", ep, err)
		}

		svc, err := kms.NewGRPCService(ctx, ep, "kms-health-monitor", timeout)
		if err != nil {
			return nil, fmt.Errorf("dial KMS plugin at %q: %w", ep, err)
		}

		probes = append(probes, plugin{keyID: id, service: svc})
	}

	return &Checker{plugins: probes, now: time.Now}, nil
}

// udsScheme is the URI prefix every KMS plugin endpoint must carry.
const udsScheme = "unix://"

// keyIDFromEndpoint returns the key ID encoded in a KMS endpoint URI as a
// decimal string. The endpoint is expected to look like
// "unix:///var/run/kmsplugin/kms-{keyID}.sock".
//
// An error is returned when the endpoint lacks the unix:// prefix or when
// state.NameToKeyID rejects the socket's base name (extension removed).
func keyIDFromEndpoint(endpoint string) (string, error) {
	if !strings.HasPrefix(endpoint, udsScheme) {
		return "", fmt.Errorf("expected %s scheme", udsScheme)
	}

	socketPath := endpoint[len(udsScheme):]
	// The extension is computed from the full path, not from the base name.
	ext := filepath.Ext(socketPath)
	name := strings.TrimSuffix(filepath.Base(socketPath), ext)

	if id, ok := state.NameToKeyID(name); ok {
		return fmt.Sprintf("%d", id), nil
	}
	return "", fmt.Errorf("cannot extract numeric keyID from %q", name)
}

func (c *Checker) CheckStatus(ctx context.Context) []PluginHealthCondition {
conditions := make([]PluginHealthCondition, 0, len(c.plugins))

for _, p := range c.plugins {
cond := PluginHealthCondition{
KeyID: p.keyID,
LastChecked: c.now(),
}

resp, err := p.service.Status(ctx)
switch {
case err != nil:
cond.Status = StatusError
cond.Detail = err.Error()
case resp.Healthz == kms.HealthzOK:
cond.Status = StatusHealthy
cond.KEKID = resp.KeyID
default:
cond.Status = StatusUnhealthy
cond.Detail = resp.Healthz
Comment on lines +94 to +104

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Guard against nil Status() responses before dereferencing.

If Status(ctx) returns (nil, nil), Line 99 panics on resp.Healthz. Treat nil responses as an error condition instead.

Suggested fix
 		resp, err := p.service.Status(ctx)
 		switch {
 		case err != nil:
 			cond.Status = StatusError
 			cond.Detail = err.Error()
+		case resp == nil:
+			cond.Status = StatusError
+			cond.Detail = "empty status response"
 		case resp.Healthz == kms.HealthzOK:
 			cond.Status = StatusHealthy
 			cond.KEKID = resp.KeyID
 		default:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
resp, err := p.service.Status(ctx)
switch {
case err != nil:
cond.Status = StatusError
cond.Detail = err.Error()
case resp.Healthz == kms.HealthzOK:
cond.Status = StatusHealthy
cond.KEKID = resp.KeyID
default:
cond.Status = StatusUnhealthy
cond.Detail = resp.Healthz
resp, err := p.service.Status(ctx)
switch {
case err != nil:
cond.Status = StatusError
cond.Detail = err.Error()
case resp == nil:
cond.Status = StatusError
cond.Detail = "empty status response"
case resp.Healthz == kms.HealthzOK:
cond.Status = StatusHealthy
cond.KEKID = resp.KeyID
default:
cond.Status = StatusUnhealthy
cond.Detail = resp.Healthz
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/operator/encryption/kms/health/checker.go` around lines 94 - 104, The
health checker currently dereferences resp without guarding against a nil return
from p.service.Status(ctx); update the logic in the Status handling (the block
using p.service.Status(ctx), resp and cond) to first check if err != nil OR resp
== nil and treat either as an error case (set cond.Status = StatusError and
cond.Detail to the error message or a suitable "nil response" message), and only
if resp is non-nil proceed to inspect resp.Healthz (compare to kms.HealthzOK to
set StatusHealthy and cond.KEKID, otherwise set StatusUnhealthy and cond.Detail
= resp.Healthz). Ensure no dereference of resp occurs when it may be nil.

}

conditions = append(conditions, cond)
}
return conditions
}
75 changes: 75 additions & 0 deletions pkg/operator/encryption/kms/health/checker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package health

import (
"context"
"fmt"
"reflect"
"testing"
"time"

kmsservice "k8s.io/kms/pkg/service"
)

// fakeService is a test double used wherever a kmsservice.Service is expected.
// Only Status is meaningful; it replays the canned resp/err pair.
type fakeService struct {
// resp is returned verbatim by Status.
resp *kmsservice.StatusResponse
// err is returned verbatim by Status.
err error
}

// Status returns the preconfigured response and error without inspecting the context.
func (f *fakeService) Status(context.Context) (*kmsservice.StatusResponse, error) {
return f.resp, f.err
}
// Encrypt is a stub that always returns (nil, nil).
func (f *fakeService) Encrypt(context.Context, string, []byte) (*kmsservice.EncryptResponse, error) {
return nil, nil
}
// Decrypt is a stub that always returns (nil, nil).
func (f *fakeService) Decrypt(context.Context, string, *kmsservice.DecryptRequest) ([]byte, error) {
return nil, nil
}

func TestChecker_CheckStatus(t *testing.T) {
fixed := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
c := &Checker{
plugins: []plugin{
{keyID: "1", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "ok", KeyID: "kek-abc"}}},
{keyID: "2", service: &fakeService{err: fmt.Errorf("connection refused")}},
{keyID: "3", service: &fakeService{resp: &kmsservice.StatusResponse{Healthz: "degraded"}}},
},
now: func() time.Time { return fixed },
}

got := c.CheckStatus(context.Background())
want := []PluginHealthCondition{
{KeyID: "1", KEKID: "kek-abc", Status: StatusHealthy, LastChecked: fixed},
{KeyID: "2", Status: StatusError, Detail: "connection refused", LastChecked: fixed},
{KeyID: "3", Status: StatusUnhealthy, Detail: "degraded", LastChecked: fixed},
}
if !reflect.DeepEqual(got, want) {
t.Errorf("CheckStatus():\n got: %+v\n want: %+v", got, want)
}
}

// Test_keyIDFromEndpoint is a table-driven test covering well-formed
// endpoints, a socket name without a numeric suffix, and endpoints that do not
// use the unix:// scheme.
func Test_keyIDFromEndpoint(t *testing.T) {
	type testCase struct {
		endpoint string
		want     string
		wantErr  bool
	}

	cases := []testCase{
		{endpoint: "unix:///var/run/kmsplugin/kms-1.sock", want: "1"},
		{endpoint: "unix:///var/run/kmsplugin/kms-2.sock", want: "2"},
		{endpoint: "unix:///tmp/kms-42.sock", want: "42"},
		{endpoint: "unix:///var/run/kmsplugin/plugin.sock", wantErr: true},
		{endpoint: "/var/run/kmsplugin/kms-1.sock", wantErr: true},
		{endpoint: "tcp://localhost:8080", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.endpoint, func(t *testing.T) {
			got, err := keyIDFromEndpoint(tc.endpoint)

			// An unexpected error outcome ends the subtest before the value check.
			if failed := err != nil; failed != tc.wantErr {
				t.Fatalf("keyIDFromEndpoint(%q) error = %v, wantErr %v", tc.endpoint, err, tc.wantErr)
			}
			if got != tc.want {
				t.Errorf("keyIDFromEndpoint(%q) = %q, want %q", tc.endpoint, got, tc.want)
			}
		})
	}
}
144 changes: 144 additions & 0 deletions pkg/operator/encryption/kms/health/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package health

import (
"context"
"fmt"
"strings"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/openshift/library-go/pkg/controller/controllercmd"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

// options holds the command-line configuration of the kms-health-monitor
// command. Fields are bound to flags in addFlags and checked in validate.
type options struct {
// kmsSockets lists the KMS plugin endpoints in unix:// URI format (--kms-sockets).
kmsSockets []string
// readInterval is the cadence between checks (--read-interval).
readInterval time.Duration
// readTimeout is the deadline for Status RPCs (--read-timeout).
readTimeout time.Duration
// writeTimeout is the deadline for each condition update (--write-timeout).
writeTimeout time.Duration
// targetGroup is the API group of the operator CR (--target-group).
targetGroup string
// targetVersion is the API version of the operator CR (--target-version).
targetVersion string
// targetResource is the resource name of the operator CR (--target-resource).
targetResource string
// targetKind is the kind of the operator CR (--target-kind).
targetKind string
// nodeName is recorded in the condition to identify its origin (--node-name).
nodeName string
}

// NewCommand returns the "kms-health-monitor" cobra command.
//
// The command is built on the controllercmd scaffolding with leader election
// and serving disabled. When started it validates the parsed flags and then
// runs the monitor using the controller context's kube config. Interval and
// timeout flags default to 30s (read interval), 5s (read timeout) and 10s
// (write timeout).
func NewCommand(ctx context.Context) *cobra.Command {
	opts := &options{
		readInterval: 30 * time.Second,
		readTimeout:  5 * time.Second,
		writeTimeout: 10 * time.Second,
	}

	cmdConfig := controllercmd.NewControllerCommandConfig(
		"kms-health-monitor",
		version.Info{Major: "0", Minor: "0"},
		func(ctx context.Context, cc *controllercmd.ControllerContext) error {
			// Flags are only known once the command runs, so validate here.
			if err := opts.validate(); err != nil {
				return err
			}
			return opts.run(ctx, cc.KubeConfig)
		},
		clock.RealClock{},
	)
	cmdConfig.DisableLeaderElection = true
	cmdConfig.DisableServing = true

	command := cmdConfig.NewCommandWithContext(ctx)
	command.Use = "kms-health-monitor"
	command.Short = "Observes co-located KMSv2 plugins and publishes status as an OperatorCondition."
	opts.addFlags(command.Flags())

	return command
}

// addFlags binds every option to its command-line flag on fs. Duration flags
// default to whatever value the corresponding field already holds;
// --target-version defaults to "v1"; all other flags default to empty.
func (o *options) addFlags(fs *pflag.FlagSet) {
	fs.StringSliceVar(&o.kmsSockets, "kms-sockets", nil, "KMS plugin endpoints in unix:// URI format (e.g. unix:///var/run/kmsplugin/kms-1.sock)")

	durationFlags := []struct {
		target *time.Duration
		name   string
		usage  string
	}{
		{&o.readInterval, "read-interval", "cadence between checks"},
		{&o.readTimeout, "read-timeout", "deadline for each Status RPC"},
		{&o.writeTimeout, "write-timeout", "deadline for each condition update"},
	}
	for _, f := range durationFlags {
		// The field's current value doubles as the flag default.
		fs.DurationVar(f.target, f.name, *f.target, f.usage)
	}

	stringFlags := []struct {
		target   *string
		name     string
		fallback string
		usage    string
	}{
		{&o.targetGroup, "target-group", "", "API group of the operator CR (e.g. operator.openshift.io)"},
		{&o.targetVersion, "target-version", "v1", "API version of the operator CR"},
		{&o.targetResource, "target-resource", "", "resource name of the operator CR (e.g. kubeapiservers)"},
		{&o.targetKind, "target-kind", "", "kind of the operator CR (e.g. KubeAPIServer)"},
		{&o.nodeName, "node-name", "", "node name recorded in the condition to identify the origin"},
	}
	for _, f := range stringFlags {
		fs.StringVar(f.target, f.name, f.fallback, f.usage)
	}
}

// validate reports the first problem found in the parsed options, or nil when
// they are usable.
//
// It requires at least one non-blank --kms-sockets entry, strictly positive
// read interval, read timeout and write timeout, and non-empty --node-name,
// --target-group, --target-version, --target-resource and --target-kind.
func (o *options) validate() error {
	if len(o.kmsSockets) == 0 {
		return fmt.Errorf("--kms-sockets is required, at least one")
	}
	for _, s := range o.kmsSockets {
		if strings.TrimSpace(s) == "" {
			return fmt.Errorf("--kms-sockets cannot contain empty entries")
		}
	}
	if o.readInterval <= 0 {
		return fmt.Errorf("--read-interval must be positive")
	}
	if o.readTimeout <= 0 {
		return fmt.Errorf("--read-timeout must be positive")
	}
	if o.writeTimeout <= 0 {
		return fmt.Errorf("--write-timeout must be positive")
	}
	if o.nodeName == "" {
		return fmt.Errorf("--node-name is required")
	}
	if o.targetGroup == "" {
		return fmt.Errorf("--target-group is required")
	}
	// --target-version has a default but can still be overridden with an empty
	// string, which would yield a GroupVersionResource/Kind with no version.
	if o.targetVersion == "" {
		return fmt.Errorf("--target-version must not be empty")
	}
	if o.targetResource == "" {
		return fmt.Errorf("--target-resource is required")
	}
	if o.targetKind == "" {
		return fmt.Errorf("--target-kind is required")
	}
	return nil
}

func (o *options) run(ctx context.Context, restConfig *rest.Config) error {
gvr := schema.GroupVersionResource{Group: o.targetGroup, Version: o.targetVersion, Resource: o.targetResource}
gvk := schema.GroupVersionKind{Group: o.targetGroup, Version: o.targetVersion, Kind: o.targetKind}

klog.InfoS("kms-health-monitor starting",
"sockets", o.kmsSockets,
"target", gvr.String(),
"observerNode", o.nodeName,
"interval", o.readInterval,
"readTimeout", o.readTimeout,
"writeTimeout", o.writeTimeout,
)
Comment on lines +113 to +120

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid logging raw socket paths and node identity at startup.

Line 114 and Line 116 emit internal topology details (kmsSockets, observerNode). Prefer non-sensitive aggregates (for example, socket count).

Suggested fix
 	klog.InfoS("kms-health-monitor starting",
-		"sockets", o.kmsSockets,
+		"socketCount", len(o.kmsSockets),
 		"target", gvr.String(),
-		"observerNode", o.nodeName,
 		"interval", o.readInterval,
 		"readTimeout", o.readTimeout,
 		"writeTimeout", o.writeTimeout,
 	)

As per coding guidelines **/*.{go,py,java,js,ts,tsx}: flag logging that may expose internal hostnames or customer/internal data.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/operator/encryption/kms/health/cmd.go` around lines 113 - 120, The
startup log in klog.InfoS should stop emitting raw sensitive topology fields
o.kmsSockets and o.nodeName; instead, log non-sensitive aggregates (e.g.,
len(o.kmsSockets) as "socketCount") and remove or redact the observer node
identity. Update the call site that constructs the InfoS message (the klog.InfoS
invocation in cmd.go) to replace "sockets", o.kmsSockets with a socket count
aggregate and drop or replace "observerNode", o.nodeName with a non-identifying
indicator (or omit it), keeping other fields like "target", "interval",
"readTimeout", and "writeTimeout" unchanged.


writer, err := newWriter(restConfig, gvr, gvk, o.nodeName)
if err != nil {
return fmt.Errorf("create writer: %w", err)
}
checker, err := NewChecker(ctx, o.kmsSockets, o.readTimeout)
if err != nil {
return fmt.Errorf("create checker: %w", err)
}

wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
probeCtx, cancel := context.WithTimeout(ctx, o.readTimeout)
defer cancel()
conditions := checker.CheckStatus(probeCtx)

Comment on lines +131 to +135

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Apply read timeout per RPC, not per full sweep.

Line 67 documents --read-timeout as “deadline for each Status RPC”, but Line 132 sets one deadline for the entire CheckStatus loop. With multiple plugins, later RPCs inherit a shortened deadline.

Suggested fix
 	wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
-		probeCtx, cancel := context.WithTimeout(ctx, o.readTimeout)
-		defer cancel()
-		conditions := checker.CheckStatus(probeCtx)
+		conditions := checker.CheckStatus(ctx)
 
 		writeCtx, writeCancel := context.WithTimeout(ctx, o.writeTimeout)
 		defer writeCancel()
 		if err := writer.Apply(writeCtx, conditions); err != nil {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
probeCtx, cancel := context.WithTimeout(ctx, o.readTimeout)
defer cancel()
conditions := checker.CheckStatus(probeCtx)
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
conditions := checker.CheckStatus(ctx)
writeCtx, writeCancel := context.WithTimeout(ctx, o.writeTimeout)
defer writeCancel()
if err := writer.Apply(writeCtx, conditions); err != nil {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/operator/encryption/kms/health/cmd.go` around lines 131 - 135, The
current loop creates a single context.WithTimeout(ctx, o.readTimeout) around the
whole CheckStatus sweep, which shortens the deadline for later RPCs; instead remove
the per-sweep timeout in the wait.JitterUntilWithContext callback and ensure
timeout is applied per RPC inside checker.CheckStatus (or change
checker.CheckStatus signature to accept o.readTimeout so it can create a
separate context.WithTimeout for each Status RPC). Update calls to
checker.CheckStatus and its internals to create a new
context.WithTimeout(parentCtx, readTimeout) for each plugin/RPC invocation so
each RPC gets the full configured o.readTimeout rather than sharing one across
the whole sweep.

writeCtx, writeCancel := context.WithTimeout(ctx, o.writeTimeout)
defer writeCancel()
if err := writer.Apply(writeCtx, conditions); err != nil {
klog.ErrorS(err, "apply operator status")
}
}, o.readInterval, 0.1, false)

return nil
}
Loading