diff --git a/controllers/secret_controller.go b/controllers/secret_controller.go index 6e229e039c..89eba8ce95 100644 --- a/controllers/secret_controller.go +++ b/controllers/secret_controller.go @@ -2,6 +2,7 @@ package controllers import ( "context" + "errors" "fmt" "golang.org/x/crypto/ssh" @@ -355,8 +356,24 @@ func (r *SecretReconciler) updateUserData(ctx context.Context, keySigner ssh.Sig nodeconfig.PubKeyHashAnnotation: expectedPubKeyAnno, } } else { - // For Nodes associated with Machines, clear the public key annotation, as the clearing of the - // annotation is used solely to kick off the deletion and recreation of Machines, causing them to be + // For Nodes associated with Machines, check cluster-wide upgrade limit before clearing annotation + + // Check if we can mark this node as upgrading + if err := markNodeAsUpgrading(ctx, r.client, &node); err != nil { + var upgradeErr *UpgradeLimitExceededError + if errors.As(err, &upgradeErr) { + // Upgrade limit reached - defer this node + r.log.Info("deferring userData update due to upgrade limit", + "node", node.GetName(), "reason", upgradeErr.Error()) + // Skip clearing annotation for this node - will retry on next reconcile + continue + } + return err + } + + // Safe to clear annotation now (cluster-wide lock acquired) + // The clearing of the annotation is used solely to kick off the deletion and recreation + // of Machines, causing them to be // provisioned with the new userdata annotationsToApply = map[string]string{nodeconfig.PubKeyHashAnnotation: ""} } diff --git a/controllers/windowsmachine_controller.go b/controllers/windowsmachine_controller.go index 92e1bcaa47..3f1f0e42e9 100644 --- a/controllers/windowsmachine_controller.go +++ b/controllers/windowsmachine_controller.go @@ -280,7 +280,21 @@ func (r *WindowsMachineReconciler) Reconcile(ctx context.Context, // If the private key used to configure the machine is out of date, the machine should be deleted if node.Annotations[nodeconfig.PubKeyHashAnnotation] != nodeconfig.CreatePubKeyHashAnnotation(r.signer.PublicKey()) { - log.Info("deleting machine") + log.Info("deleting machine due to public key mismatch", "machine", machine.Name) + + // Check cluster-wide upgrade limit before deletion (defense in depth) + if err := markNodeAsUpgrading(ctx, r.client, node); err != nil { + var upgradeErr *UpgradeLimitExceededError + if !errors.As(err, &upgradeErr) { + return ctrl.Result{}, err + } + // Upgrade limit reached - log and requeue + r.log.Info(upgradeErr.Error()) + r.recorder.Eventf(machine, core.EventTypeWarning, "MachineDeletionDeferred", + "Machine %v deletion deferred: %v", machine.Name, upgradeErr.Error()) + return ctrl.Result{Requeue: true}, nil + } + deletionAllowed, err := r.isAllowedDeletion(ctx, machine) if err != nil { return ctrl.Result{}, fmt.Errorf("unable to determine if Machine can be deleted: %w", err) diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index 3aa6616c43..ca548789bb 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -4,7 +4,12 @@ import ( "context" "flag" "fmt" + "log" "os" + "os/exec" + "path/filepath" + "runtime" + "strings" "testing" "time" @@ -12,7 +17,9 @@ import ( imageClient "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1" "github.com/openshift/library-go/pkg/image/imageutil" core "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "github.com/openshift/windows-machine-config-operator/pkg/retry" "github.com/openshift/windows-machine-config-operator/test/e2e/clusterinfo" @@ -184,3 +191,167 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } + +// getRepoRoot resolves the repository root directory by walking up from the test file location +func getRepoRoot() (string, error) { + _, currentFile, _, ok := runtime.Caller(1) + if !ok { + return "", fmt.Errorf("failed to get current file location") + } + dir := filepath.Dir(currentFile) + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir, nil + } + parent := filepath.Dir(dir) + if parent == dir { + // Reached filesystem root without finding go.mod + return "", fmt.Errorf("failed to find repository root: could not locate go.mod") + } + dir = parent + } +} + +// deployParallelUpgradesChecker deploys the monitoring job that checks for MaxParallelUpgrades violations +func (tc *testContext) deployParallelUpgradesChecker() error { + // Resolve repo root to handle relative path correctly regardless of test working directory + repoRoot, err := getRepoRoot() + if err != nil { + return fmt.Errorf("failed to resolve repository root: %w", err) + } + + // Read the job YAML template + jobPath := filepath.Join(repoRoot, "hack/e2e/resources/parallel-upgrade-checker-job.yaml") + jobYAML, err := os.ReadFile(jobPath) + if err != nil { + return fmt.Errorf("failed to read parallel upgrades checker job YAML: %w", err) + } + + // Get the tools image from the OpenShift imagestream + toolsImage, err := getOpenShiftToolsImage(tc.client.Images) + if err != nil { + // Fall back to a default tools image if imagestream is not available + log.Printf("Warning: could not get tools image from imagestream: %v. Using default.", err) + toolsImage = "registry.ci.openshift.org/ocp/4.17:tools" + } + + // Replace placeholders + jobYAMLStr := string(jobYAML) + jobYAMLStr = strings.Replace(jobYAMLStr, "REPLACE_WITH_OPENSHIFT_TOOLS_IMAGE", toolsImage, 1) + jobYAMLStr = strings.Replace(jobYAMLStr, "namespace: wmco-test", + fmt.Sprintf("namespace: %s", tc.workloadNamespace), 1) + + // Create the job using kubectl apply + cmd := exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(jobYAMLStr) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to create parallel upgrades checker job: %w\nOutput: %s", err, string(output)) + } + + log.Printf("Deployed parallel upgrades checker job in namespace %s", tc.workloadNamespace) + return nil +} + +// verifyParallelUpgradesChecker checks if the parallel upgrades checker job detected any violations +func (tc *testContext) verifyParallelUpgradesChecker() error { + var jobCompleteErr error + + // Poll the job status until it reaches a terminal state or timeout + err := wait.Poll(tc.retryInterval, tc.timeout, func() (bool, error) { + job, err := tc.client.K8s.BatchV1().Jobs(tc.workloadNamespace).Get(context.TODO(), + "parallel-upgrades-checker", meta.GetOptions{}) + if err != nil { + return false, fmt.Errorf("failed to get parallel upgrades checker job: %w", err) + } + + // Job succeeded + if job.Status.Succeeded > 0 { + log.Printf("Parallel upgrades checker job succeeded with %d completions", job.Status.Succeeded) + return true, nil + } + + // Job failed + if job.Status.Failed > 0 { + // Gather logs for debugging + _, logErr := tc.gatherPodLogs("job-name=parallel-upgrades-checker", false) + if logErr != nil { + log.Printf("warning: unable to gather parallel upgrades checker logs: %v", logErr) + } + jobCompleteErr = fmt.Errorf("parallel upgrades checker job failed with %d failed pods", job.Status.Failed) + return true, jobCompleteErr + } + + // Job still running or pending + return false, nil + }) + + // If wait.Poll returned an error (timeout), check if we have at least one terminal pod + if err != nil { + if err == wait.ErrWaitTimeout { + log.Printf("Timeout waiting for parallel upgrades checker job to complete") + // Fall back to checking if there are any terminal pods + pods, podErr := tc.client.K8s.CoreV1().Pods(tc.workloadNamespace).List(context.TODO(), meta.ListOptions{ + LabelSelector: "job-name=parallel-upgrades-checker", + }) + if podErr != nil { + return fmt.Errorf("error listing parallel upgrades checker pods: %w", podErr) + } + + hasTerminalPod := false + for _, pod := range pods.Items { + if pod.Status.Phase == core.PodSucceeded || pod.Status.Phase == core.PodFailed { + hasTerminalPod = true + break + } + } + + if !hasTerminalPod { + return fmt.Errorf("parallel upgrades checker job did not reach terminal state within timeout") + } + } else { + return err + } + } + + // If the job failed, return the error + if jobCompleteErr != nil { + return jobCompleteErr + } + + // Check for failed pods to see if any violations were detected + failedPods, err := tc.client.K8s.CoreV1().Pods(tc.workloadNamespace).List(context.TODO(), meta.ListOptions{ + LabelSelector: "job-name=parallel-upgrades-checker", + FieldSelector: "status.phase=Failed", + }) + if err != nil { + return fmt.Errorf("error checking parallel upgrades checker status: %w", err) + } + + // Gather logs for debugging if not already done + _, err = tc.gatherPodLogs("job-name=parallel-upgrades-checker", false) + if err != nil { + log.Printf("warning: unable to gather parallel upgrades checker logs: %v", err) + } + + // If any pod failed, the check detected a violation + if len(failedPods.Items) > 0 { + return fmt.Errorf("parallel upgrades check failed: MaxParallelUpgrades was violated (%d pods failed)", + len(failedPods.Items)) + } + + log.Printf("Parallel upgrades checker passed: MaxParallelUpgrades=1 was enforced") + return nil +} + +// cleanupParallelUpgradesChecker removes the parallel upgrades checker job +func (tc *testContext) cleanupParallelUpgradesChecker() { + propagationPolicy := meta.DeletePropagationBackground + err := tc.client.K8s.BatchV1().Jobs(tc.workloadNamespace).Delete(context.TODO(), + "parallel-upgrades-checker", meta.DeleteOptions{ + PropagationPolicy: &propagationPolicy, + }) + if err != nil && !apierrors.IsNotFound(err) { + log.Printf("warning: failed to cleanup parallel upgrades checker job: %v", err) + } +} diff --git a/test/e2e/secrets_test.go b/test/e2e/secrets_test.go index 790e7b8e9f..4ee0dc00f9 100644 --- a/test/e2e/secrets_test.go +++ b/test/e2e/secrets_test.go @@ -78,11 +78,23 @@ func (tc *testContext) testUserData(t *testing.T) { } // testUserDataTamper tests if userdata reverts to previous value if updated +// AND verifies MaxParallelUpgrades=1 during userData-triggered deletions func (tc *testContext) testUserDataTamper(t *testing.T) { validUserDataSecret, err := tc.client.K8s.CoreV1().Secrets(clusterinfo.MachineAPINamespace).Get(context.TODO(), secrets.UserDataSecret, meta.GetOptions{}) require.NoError(t, err, "could not find Windows userData secret in required namespace") + // Requires at least 2 Machine nodes to test parallel behavior + require.NoError(t, tc.loadExistingNodes(), "error loading existing nodes") + if len(gc.machineNodes) < 2 { + t.Skip("Test requires at least 2 Machine nodes to verify MaxParallelUpgrades enforcement") + } + + // Deploy parallel upgrades checker before triggering userData change + err = tc.deployParallelUpgradesChecker() + require.NoError(t, err, "could not deploy parallel upgrades checker") + defer tc.cleanupParallelUpgradesChecker() + updatedSecret := validUserDataSecret.DeepCopy() updatedSecret.Data["userData"] = []byte("invalid data") _, err = tc.client.K8s.CoreV1().Secrets(clusterinfo.MachineAPINamespace).Update(context.TODO(), updatedSecret, @@ -93,6 +105,10 @@ func (tc *testContext) testUserDataTamper(t *testing.T) { // until the Machine is back up. assert.NoError(t, tc.waitForNewMachineNodes(), "error waiting for Machine nodes to be reconfigured") assert.NoError(t, tc.waitForValidUserData(validUserDataSecret), "error waiting for valid userdata") + + // Verify that MaxParallelUpgrades=1 was enforced throughout the process + err = tc.verifyParallelUpgradesChecker() + assert.NoError(t, err, "MaxParallelUpgrades was violated during userData change") } // waitForNewMachineNodes returns an error if waitForConfiguredWindowsNodes returns the same Machine backed nodes