Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions controllers/secret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"errors"
"fmt"

"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -355,8 +356,24 @@ func (r *SecretReconciler) updateUserData(ctx context.Context, keySigner ssh.Sig
nodeconfig.PubKeyHashAnnotation: expectedPubKeyAnno,
}
} else {
// For Nodes associated with Machines, clear the public key annotation, as the clearing of the
// annotation is used solely to kick off the deletion and recreation of Machines, causing them to be
// For Nodes associated with Machines, check cluster-wide upgrade limit before clearing annotation

// Check if we can mark this node as upgrading
if err := markNodeAsUpgrading(ctx, r.client, &node); err != nil {
var upgradeErr *UpgradeLimitExceededError
if errors.As(err, &upgradeErr) {
// Upgrade limit reached - defer this node
r.log.Info("deferring userData update due to upgrade limit",
"node", node.GetName(), "reason", upgradeErr.Error())
// Skip clearing annotation for this node - will retry on next reconcile
continue
}
return err
}

// Safe to clear annotation now (cluster-wide lock acquired)
// The clearing of the annotation is used solely to kick off the deletion and recreation
// of Machines, causing them to be provisioned with the new userdata
annotationsToApply = map[string]string{nodeconfig.PubKeyHashAnnotation: ""}
}
Expand Down
16 changes: 15 additions & 1 deletion controllers/windowsmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,21 @@ func (r *WindowsMachineReconciler) Reconcile(ctx context.Context,
// If the private key used to configure the machine is out of date, the machine should be deleted
if node.Annotations[nodeconfig.PubKeyHashAnnotation] !=
nodeconfig.CreatePubKeyHashAnnotation(r.signer.PublicKey()) {
log.Info("deleting machine")
log.Info("deleting machine due to public key mismatch", "machine", machine.Name)

// Check cluster-wide upgrade limit before deletion (defense in depth)
if err := markNodeAsUpgrading(ctx, r.client, node); err != nil {
var upgradeErr *UpgradeLimitExceededError
if !errors.As(err, &upgradeErr) {
return ctrl.Result{}, err
}
// Upgrade limit reached - log and requeue
r.log.Info(upgradeErr.Error())
r.recorder.Eventf(machine, core.EventTypeWarning, "MachineDeletionDeferred",
"Machine %v deletion deferred: %v", machine.Name, upgradeErr.Error())
return ctrl.Result{Requeue: true}, nil
}

deletionAllowed, err := r.isAllowedDeletion(ctx, machine)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to determine if Machine can be deleted: %w", err)
Expand Down
171 changes: 171 additions & 0 deletions test/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@ import (
"context"
"flag"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"testing"
"time"

config "github.com/openshift/api/config/v1"
imageClient "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1"
"github.com/openshift/library-go/pkg/image/imageutil"
core "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/openshift/windows-machine-config-operator/pkg/retry"
"github.com/openshift/windows-machine-config-operator/test/e2e/clusterinfo"
Expand Down Expand Up @@ -184,3 +191,167 @@ func TestMain(m *testing.M) {

os.Exit(m.Run())
}

// getRepoRoot returns the nearest ancestor directory of the calling source file that contains a go.mod file,
// which is treated as the repository root. An error is returned if the caller's file cannot be determined or if
// no go.mod is found in any ancestor directory.
func getRepoRoot() (string, error) {
	// Skip one stack frame so the search starts from the file of whoever called this helper
	_, callerFile, _, found := runtime.Caller(1)
	if !found {
		return "", fmt.Errorf("failed to get current file location")
	}

	// filepath.Dir stops changing its result at the top of the tree, so the walk ends once moving up is a no-op
	for candidate, previous := filepath.Dir(callerFile), ""; candidate != previous; {
		if _, statErr := os.Stat(filepath.Join(candidate, "go.mod")); statErr == nil {
			return candidate, nil
		}
		previous, candidate = candidate, filepath.Dir(candidate)
	}

	// Reached the top without finding go.mod
	return "", fmt.Errorf("failed to find repository root: could not locate go.mod")
}

// deployParallelUpgradesChecker applies the parallel upgrades checker Job manifest to the cluster through kubectl.
// The manifest is read from hack/e2e/resources under the repository root. Its tools image placeholder and its
// first "namespace: wmco-test" entry are substituted before the result is piped to `kubectl apply -f -`.
// An error is returned if the manifest cannot be located or read, or if kubectl exits unsuccessfully.
func (tc *testContext) deployParallelUpgradesChecker() error {
	// The test working directory is not guaranteed, so anchor the manifest path to the repository root
	root, err := getRepoRoot()
	if err != nil {
		return fmt.Errorf("failed to resolve repository root: %w", err)
	}

	manifest, err := os.ReadFile(filepath.Join(root, "hack/e2e/resources/parallel-upgrade-checker-job.yaml"))
	if err != nil {
		return fmt.Errorf("failed to read parallel upgrades checker job YAML: %w", err)
	}

	// A failed imagestream lookup is not fatal, a hard-coded tools image is used in its place
	image, err := getOpenShiftToolsImage(tc.client.Images)
	if err != nil {
		log.Printf("Warning: could not get tools image from imagestream: %v. Using default.", err)
		image = "registry.ci.openshift.org/ocp/4.17:tools"
	}

	// Only the first occurrence of each placeholder is substituted
	rendered := strings.Replace(string(manifest), "REPLACE_WITH_OPENSHIFT_TOOLS_IMAGE", image, 1)
	rendered = strings.Replace(rendered, "namespace: wmco-test", "namespace: "+tc.workloadNamespace, 1)

	// Feed the rendered manifest to kubectl over stdin rather than writing a temporary file
	apply := exec.Command("kubectl", "apply", "-f", "-")
	apply.Stdin = strings.NewReader(rendered)
	if out, err := apply.CombinedOutput(); err != nil {
		return fmt.Errorf("failed to create parallel upgrades checker job: %w\nOutput: %s", err, string(out))
	}

	log.Printf("Deployed parallel upgrades checker job in namespace %s", tc.workloadNamespace)
	return nil
}

// verifyParallelUpgradesChecker waits for the parallel upgrades checker Job to finish and reports whether it
// detected a violation. It returns an error when:
//   - the Job cannot be fetched while polling,
//   - the Job reports at least one failed pod,
//   - the poll times out and no pod of the Job has reached the Succeeded or Failed phase,
//   - any pod of the Job is found in the Failed phase after polling.
//
// Pod logs are gathered on a best-effort basis; a failure to gather them is only logged.
func (tc *testContext) verifyParallelUpgradesChecker() error {
	const (
		jobName     = "parallel-upgrades-checker"
		podSelector = "job-name=" + jobName
	)

	// Poll the job status until it reaches a terminal state or timeout. Any error returned by the condition,
	// including the job failure below, stops the poll and is handed back by wait.Poll unchanged.
	err := wait.Poll(tc.retryInterval, tc.timeout, func() (bool, error) {
		job, err := tc.client.K8s.BatchV1().Jobs(tc.workloadNamespace).Get(context.TODO(), jobName,
			meta.GetOptions{})
		if err != nil {
			return false, fmt.Errorf("failed to get parallel upgrades checker job: %w", err)
		}

		switch {
		case job.Status.Succeeded > 0:
			log.Printf("Parallel upgrades checker job succeeded with %d completions", job.Status.Succeeded)
			return true, nil
		case job.Status.Failed > 0:
			// Gather logs for debugging before reporting the failure
			if _, logErr := tc.gatherPodLogs(podSelector, false); logErr != nil {
				log.Printf("warning: unable to gather parallel upgrades checker logs: %v", logErr)
			}
			return true, fmt.Errorf("parallel upgrades checker job failed with %d failed pods",
				job.Status.Failed)
		}

		// Job still running or pending
		return false, nil
	})
	if err != nil {
		// Anything other than a timeout (a failed Get or a failed job) is final
		if err != wait.ErrWaitTimeout {
			return err
		}

		// On timeout, the result can still be evaluated as long as at least one pod reached a terminal phase
		log.Printf("Timeout waiting for parallel upgrades checker job to complete")
		pods, podErr := tc.client.K8s.CoreV1().Pods(tc.workloadNamespace).List(context.TODO(), meta.ListOptions{
			LabelSelector: podSelector,
		})
		if podErr != nil {
			return fmt.Errorf("error listing parallel upgrades checker pods: %w", podErr)
		}

		hasTerminalPod := false
		for _, pod := range pods.Items {
			if pod.Status.Phase == core.PodSucceeded || pod.Status.Phase == core.PodFailed {
				hasTerminalPod = true
				break
			}
		}
		if !hasTerminalPod {
			return fmt.Errorf("parallel upgrades checker job did not reach terminal state within timeout")
		}
	}

	// Check for failed pods to see if any violations were detected
	failedPods, err := tc.client.K8s.CoreV1().Pods(tc.workloadNamespace).List(context.TODO(), meta.ListOptions{
		LabelSelector: podSelector,
		FieldSelector: "status.phase=Failed",
	})
	if err != nil {
		return fmt.Errorf("error checking parallel upgrades checker status: %w", err)
	}

	// Gather logs for debugging; the failed-job path returns before reaching this point
	if _, logErr := tc.gatherPodLogs(podSelector, false); logErr != nil {
		log.Printf("warning: unable to gather parallel upgrades checker logs: %v", logErr)
	}

	// If any pod failed, the check detected a violation
	if len(failedPods.Items) > 0 {
		return fmt.Errorf("parallel upgrades check failed: MaxParallelUpgrades was violated (%d pods failed)",
			len(failedPods.Items))
	}

	log.Printf("Parallel upgrades checker passed: MaxParallelUpgrades=1 was enforced")
	return nil
}

// cleanupParallelUpgradesChecker deletes the parallel upgrades checker Job from the workload namespace using
// background propagation. Cleanup is best-effort: a missing Job is ignored and any other failure is only logged.
func (tc *testContext) cleanupParallelUpgradesChecker() {
	background := meta.DeletePropagationBackground
	deleteOpts := meta.DeleteOptions{PropagationPolicy: &background}

	jobs := tc.client.K8s.BatchV1().Jobs(tc.workloadNamespace)
	err := jobs.Delete(context.TODO(), "parallel-upgrades-checker", deleteOpts)
	if err == nil || apierrors.IsNotFound(err) {
		// Nothing left to report: the Job was deleted or was already gone
		return
	}
	log.Printf("warning: failed to cleanup parallel upgrades checker job: %v", err)
}
16 changes: 16 additions & 0 deletions test/e2e/secrets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,23 @@ func (tc *testContext) testUserData(t *testing.T) {
}

// testUserDataTamper tests if userdata reverts to previous value if updated
// AND verifies MaxParallelUpgrades=1 during userData-triggered deletions
func (tc *testContext) testUserDataTamper(t *testing.T) {
validUserDataSecret, err := tc.client.K8s.CoreV1().Secrets(clusterinfo.MachineAPINamespace).Get(context.TODO(),
secrets.UserDataSecret, meta.GetOptions{})
require.NoError(t, err, "could not find Windows userData secret in required namespace")

// Requires at least 2 Machine nodes to test parallel behavior
require.NoError(t, tc.loadExistingNodes(), "error loading existing nodes")
if len(gc.machineNodes) < 2 {
t.Skip("Test requires at least 2 Machine nodes to verify MaxParallelUpgrades enforcement")
}

// Deploy parallel upgrades checker before triggering userData change
err = tc.deployParallelUpgradesChecker()
require.NoError(t, err, "could not deploy parallel upgrades checker")
defer tc.cleanupParallelUpgradesChecker()

updatedSecret := validUserDataSecret.DeepCopy()
updatedSecret.Data["userData"] = []byte("invalid data")
_, err = tc.client.K8s.CoreV1().Secrets(clusterinfo.MachineAPINamespace).Update(context.TODO(), updatedSecret,
Expand All @@ -93,6 +105,10 @@ func (tc *testContext) testUserDataTamper(t *testing.T) {
// until the Machine is back up.
assert.NoError(t, tc.waitForNewMachineNodes(), "error waiting for Machine nodes to be reconfigured")
assert.NoError(t, tc.waitForValidUserData(validUserDataSecret), "error waiting for valid userdata")

// Verify that MaxParallelUpgrades=1 was enforced throughout the process
err = tc.verifyParallelUpgradesChecker()
assert.NoError(t, err, "MaxParallelUpgrades was violated during userData change")
}

// waitForNewMachineNodes returns an error if waitForConfiguredWindowsNodes returns the same Machine backed nodes
Expand Down