diff --git a/pkg/controller/batchrelease/batchrelease_executor.go b/pkg/controller/batchrelease/batchrelease_executor.go index ec74e27b..10d59dd0 100644 --- a/pkg/controller/batchrelease/batchrelease_executor.go +++ b/pkg/controller/batchrelease/batchrelease_executor.go @@ -39,6 +39,7 @@ import ( bgdeplopyment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/bluegreenstyle/deployment" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle" canarydeployment "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/deployment" + canarylws "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle/lws" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/cloneset" "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/partitionstyle/daemonset" @@ -228,6 +229,10 @@ func (r *Executor) getReleaseController(release *v1beta1.BatchRelease, newStatus klog.InfoS("Using Deployment canary-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) return canarystyle.NewControlPlane(canarydeployment.NewController, r.client, r.recorder, release, newStatus, targetKey), nil } + if targetRef.APIVersion == util.ControllerLWSKind.GroupVersion().String() && targetRef.Kind == util.ControllerLWSKind.Kind { + klog.InfoS("Using LeaderWorkerSet canary-style release controller for this batch release", "workload name", targetKey.Name, "namespace", targetKey.Namespace) + return canarystyle.NewControlPlane(canarylws.NewController, r.client, r.recorder, release, newStatus, targetKey), nil + } fallthrough case v1beta1.PartitionRollingStyle, "": diff --git a/pkg/controller/batchrelease/control/canarystyle/lws/canary.go b/pkg/controller/batchrelease/control/canarystyle/lws/canary.go new file mode 100644 index 00000000..785f1ce7 --- /dev/null +++ b/pkg/controller/batchrelease/control/canarystyle/lws/canary.go @@ -0,0 +1,266 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lws + +import ( + "context" + "encoding/json" + "fmt" + "sort" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/openkruise/rollouts/api/v1beta1" + batchcontext "github.com/openkruise/rollouts/pkg/controller/batchrelease/context" + "github.com/openkruise/rollouts/pkg/util" + utilclient "github.com/openkruise/rollouts/pkg/util/client" + expectations "github.com/openkruise/rollouts/pkg/util/expectation" +) + +type realCanaryController struct { + canaryInfo *util.WorkloadInfo + canaryObject *unstructured.Unstructured + canaryClient client.Client + objectKey types.NamespacedName + canaryPods []*corev1.Pod +} + +func newCanary(cli client.Client, key types.NamespacedName) realCanaryController { + return realCanaryController{canaryClient: cli, objectKey: key} +} + +func (r *realCanaryController) GetCanaryInfo() *util.WorkloadInfo { + return r.canaryInfo +} + +// Delete removes finalizers from canary LeaderWorkerSets +func (r *realCanaryController) Delete(release *v1beta1.BatchRelease) error { + lwsList, err := r.listLeaderWorkerSet(release, client.InNamespace(r.objectKey.Namespace), utilclient.DisableDeepCopy) + if err != nil { + return err + } + + for _, lws := range lwsList { + if !controllerutil.ContainsFinalizer(lws, util.CanaryDeploymentFinalizer) { + continue + } + err = util.UpdateFinalizer(r.canaryClient, lws, util.RemoveFinalizerOpType, util.CanaryDeploymentFinalizer) + if err != nil && !errors.IsNotFound(err) { + return err + } + klog.Infof("Successfully remove finalizers for LeaderWorkerSet %v", klog.KObj(lws)) + } + return nil +} + +func (r *realCanaryController) UpgradeBatch(ctx *batchcontext.BatchContext) error { + desired := ctx.DesiredUpdatedReplicas + if r.canaryInfo.Replicas >= desired { + return nil + } + + body := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desired) + lws := &unstructured.Unstructured{} + lws.SetGroupVersionKind(util.ControllerLWSKind) + lws.SetName(r.canaryObject.GetName()) + lws.SetNamespace(r.canaryObject.GetNamespace()) + + return r.canaryClient.Patch(context.TODO(), lws, client.RawPatch(types.StrategicMergePatchType, []byte(body))) +} + +func (r *realCanaryController) Create(release *v1beta1.BatchRelease) error { + if r.canaryObject != nil { + return nil // Don't re-create if exists + } + + // check expectation before creating canary LWS + controllerKey := client.ObjectKeyFromObject(release).String() + satisfied, timeoutDuration, rest := expectations.ResourceExpectations.SatisfiedExpectations(controllerKey) + if !satisfied { + if timeoutDuration >= expectations.ExpectationTimeout { + klog.Warningf("Unsatisfied time of expectation exceeds %v, delete key and continue, key: %v, rest: %v", + expectations.ExpectationTimeout, klog.KObj(release), rest) + expectations.ResourceExpectations.DeleteExpectations(controllerKey) + } else { + return fmt.Errorf("expectation is not satisfied, key: %v, rest: %v", klog.KObj(release), rest) + } + } + + // fetch the stable LWS as template to create canary LWS + stable := &unstructured.Unstructured{} + stable.SetGroupVersionKind(util.ControllerLWSKind) + if err := r.canaryClient.Get(context.TODO(), r.objectKey, stable); err != nil { + return err + } + return r.create(release, stable) +} + +func (r *realCanaryController) create(release *v1beta1.BatchRelease, template *unstructured.Unstructured) error { + canary := template.DeepCopy() + + // Set metadata + canary.SetGenerateName(fmt.Sprintf("%v-", r.objectKey.Name)) + canary.SetName("") // Clear name to use GenerateName + canary.SetNamespace(r.objectKey.Namespace) + canary.SetLabels(map[string]string{}) + canary.SetAnnotations(map[string]string{}) + + // Add finalizer + finalizers := []string{util.CanaryDeploymentFinalizer} + canary.SetFinalizers(finalizers) + + // Set owner reference + ownerRef := metav1.NewControllerRef(release, release.GroupVersionKind()) + canary.SetOwnerReferences([]metav1.OwnerReference{*ownerRef}) + + // Add labels + labels := canary.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + labels[util.CanaryDeploymentLabel] = template.GetName() + canary.SetLabels(labels) + + // Add annotations + ownerInfo, _ := json.Marshal(ownerRef) + annotations := canary.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[util.BatchReleaseControlAnnotation] = string(ownerInfo) + canary.SetAnnotations(annotations) + + // Copy spec and patch pod template metadata if needed + if release.Spec.ReleasePlan.PatchPodTemplateMetadata != nil { + patch := release.Spec.ReleasePlan.PatchPodTemplateMetadata + templateObj, found, _ := unstructured.NestedMap(canary.Object, "spec", "template") + if found { + podTemplate := &corev1.PodTemplateSpec{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(templateObj, podTemplate); err == nil { + // Apply label patches + if podTemplate.Labels == nil { + podTemplate.Labels = make(map[string]string) + } + for k, v := range patch.Labels { + podTemplate.Labels[k] = v + } + // Apply annotation patches + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + for k, v := range patch.Annotations { + podTemplate.Annotations[k] = v + } + // Convert back to unstructured + updatedTemplate, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplate) + unstructured.SetNestedMap(canary.Object, updatedTemplate, "spec", "template") + } + } + } + + // Set initial replicas to 0 + unstructured.SetNestedField(canary.Object, int64(0), "spec", "replicas") + + // Remove paused if it exists + unstructured.RemoveNestedField(canary.Object, "spec", "paused") + + if err := r.canaryClient.Create(context.TODO(), canary); err != nil { + klog.Errorf("Failed to create canary LeaderWorkerSet(%v), error: %v", klog.KObj(canary), err) + return err + } + + // add expect to avoid to create repeatedly + controllerKey := client.ObjectKeyFromObject(release).String() + expectations.ResourceExpectations.Expect(controllerKey, expectations.Create, string(canary.GetUID())) + + canaryInfo, _ := json.Marshal(canary) + klog.Infof("Create canary LeaderWorkerSet(%v) successfully, details: %s", klog.KObj(canary), string(canaryInfo)) + return fmt.Errorf("created canary LeaderWorkerSet %v succeeded, but waiting informer synced", klog.KObj(canary)) +} + +func (r *realCanaryController) listLeaderWorkerSet(release *v1beta1.BatchRelease, options ...client.ListOption) ([]*unstructured.Unstructured, error) { + lwsList := &unstructured.UnstructuredList{} + lwsList.SetGroupVersionKind(util.ControllerLWSKind.GroupVersion().WithKind("LeaderWorkerSetList")) + + if err := r.canaryClient.List(context.TODO(), lwsList, options...); err != nil { + return nil, err + } + + var lwsObjects []*unstructured.Unstructured + for i := range lwsList.Items { + lws := &lwsList.Items[i] + // Only include LWS objects that match our GroupVersionKind + if lws.GroupVersionKind() != util.ControllerLWSKind { + continue + } + o := metav1.GetControllerOf(lws) + if o == nil || o.UID != release.UID { + continue + } + lwsObjects = append(lwsObjects, lws) + } + return lwsObjects, nil +} + +// filterCanaryLWS returns the latest LeaderWorkerSet with matching template +func filterCanaryLWS(release *v1beta1.BatchRelease, lwsList []*unstructured.Unstructured, template *corev1.PodTemplateSpec) *unstructured.Unstructured { + if len(lwsList) == 0 { + return nil + } + + sort.Slice(lwsList, func(i, j int) bool { + return lwsList[i].GetCreationTimestamp().After(lwsList[j].GetCreationTimestamp().Time) + }) + + if template == nil { + return lwsList[0] + } + + var ignoreLabels, ignoreAnno []string + if release.Spec.ReleasePlan.PatchPodTemplateMetadata != nil { + patch := release.Spec.ReleasePlan.PatchPodTemplateMetadata + for k := range patch.Labels { + ignoreLabels = append(ignoreLabels, k) + } + for k := range patch.Annotations { + ignoreAnno = append(ignoreAnno, k) + } + } + + for _, lws := range lwsList { + templateObj, found, _ := unstructured.NestedMap(lws.Object, "spec", "template") + if !found { + continue + } + lwsTemplate := &corev1.PodTemplateSpec{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(templateObj, lwsTemplate); err != nil { + continue + } + if util.EqualIgnoreSpecifyMetadata(template, lwsTemplate, ignoreLabels, ignoreAnno) { + return lws + } + } + return nil +} diff --git a/pkg/controller/batchrelease/control/canarystyle/lws/control.go b/pkg/controller/batchrelease/control/canarystyle/lws/control.go new file mode 100644 index 00000000..4a4ce609 --- /dev/null +++ b/pkg/controller/batchrelease/control/canarystyle/lws/control.go @@ -0,0 +1,195 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lws + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/openkruise/rollouts/api/v1beta1" + batchcontext "github.com/openkruise/rollouts/pkg/controller/batchrelease/context" + "github.com/openkruise/rollouts/pkg/controller/batchrelease/control" + "github.com/openkruise/rollouts/pkg/controller/batchrelease/control/canarystyle" + "github.com/openkruise/rollouts/pkg/util" + utilclient "github.com/openkruise/rollouts/pkg/util/client" +) + +type realController struct { + realStableController + realCanaryController +} + +func NewController(cli client.Client, key types.NamespacedName) canarystyle.Interface { + return &realController{ + realStableController: newStable(cli, key), + realCanaryController: newCanary(cli, key), + } +} + +func (rc *realController) BuildStableController() (canarystyle.StableInterface, error) { + if rc.stableObject != nil { + return rc, nil + } + + object := &unstructured.Unstructured{} + object.SetGroupVersionKind(util.ControllerLWSKind) + err := rc.stableClient.Get(context.TODO(), rc.stableKey, object) + if err != nil { + return rc, err + } + rc.stableObject = object + rc.stableInfo = parseWorkload(object) + return rc, nil +} + +func (rc *realController) BuildCanaryController(release *v1beta1.BatchRelease) (canarystyle.CanaryInterface, error) { + if rc.canaryObject != nil { + return rc, nil + } + + lwsList, err := rc.listLeaderWorkerSet(release, client.InNamespace(rc.stableKey.Namespace), utilclient.DisableDeepCopy) + if err != nil { + return rc, err + } + + template, err := rc.getLatestTemplate() + if client.IgnoreNotFound(err) != nil { + return rc, err + } + rc.canaryObject = filterCanaryLWS(release, lwsList, template) + if rc.canaryObject == nil { + return rc, control.GenerateNotFoundError(fmt.Sprintf("%v-canary", rc.stableKey), "LeaderWorkerSet") + } + + rc.canaryInfo = parseWorkload(rc.canaryObject) + return rc, nil +} + +func (rc *realController) CalculateBatchContext(release *v1beta1.BatchRelease) (*batchcontext.BatchContext, error) { + rolloutID := release.Spec.ReleasePlan.RolloutID + if rolloutID != "" { + if _, err := rc.ListOwnedPods(); err != nil { + return nil, err + } + } + + replicas, _, _ := unstructured.NestedInt64(rc.stableObject.Object, "spec", "replicas") + currentBatch := release.Status.CanaryStatus.CurrentBatch + desiredUpdate := int32(control.CalculateBatchReplicas(release, int(replicas), int(currentBatch))) + + updatedReplicas, _, _ := unstructured.NestedInt64(rc.canaryObject.Object, "status", "replicas") + updatedReadyReplicas, _, _ := unstructured.NestedInt64(rc.canaryObject.Object, "status", "readyReplicas") + + return &batchcontext.BatchContext{ + Pods: rc.canaryPods, + RolloutID: rolloutID, + Replicas: int32(replicas), + UpdateRevision: release.Status.UpdateRevision, + CurrentBatch: currentBatch, + DesiredUpdatedReplicas: desiredUpdate, + FailureThreshold: release.Spec.ReleasePlan.FailureThreshold, + UpdatedReplicas: int32(updatedReplicas), + UpdatedReadyReplicas: int32(updatedReadyReplicas), + }, nil +} + +func (rc *realController) getLatestTemplate() (*corev1.PodTemplateSpec, error) { + _, err := rc.BuildStableController() + if err != nil { + return nil, err + } + + templateObj, found, _ := unstructured.NestedMap(rc.stableObject.Object, "spec", "template") + if !found { + return nil, fmt.Errorf("spec.template not found in LeaderWorkerSet") + } + + podTemplate := &corev1.PodTemplateSpec{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(templateObj, podTemplate); err != nil { + return nil, fmt.Errorf("failed to convert template: %w", err) + } + + return podTemplate, nil +} + +func (rc *realController) ListOwnedPods() ([]*corev1.Pod, error) { + if rc.canaryPods != nil { + return rc.canaryPods, nil + } + var err error + rc.canaryPods, err = util.ListOwnedPods(rc.canaryClient, rc.canaryObject) + return rc.canaryPods, err +} + +// parseWorkload extracts WorkloadInfo from an unstructured LeaderWorkerSet +func parseWorkload(lws *unstructured.Unstructured) *util.WorkloadInfo { + // Use the existing ParseWorkload utility if it supports unstructured objects + // Otherwise construct manually + replicas, _, _ := unstructured.NestedInt64(lws.Object, "spec", "replicas") + statusReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "replicas") + readyReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "readyReplicas") + updatedReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "updatedReplicas") + updatedReadyReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "updatedReadyReplicas") + availableReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "availableReplicas") + observedGen, _, _ := unstructured.NestedInt64(lws.Object, "status", "observedGeneration") + + // Extract pod template for hash calculation + templateObj, found, _ := unstructured.NestedMap(lws.Object, "spec", "template") + var updateRevision string + if found { + podTemplate := &corev1.PodTemplateSpec{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(templateObj, podTemplate); err == nil { + updateRevision = util.ComputeHash(podTemplate, nil) + } + } + + stableRevision, _, _ := unstructured.NestedString(lws.Object, "status", "stableRevision") + + return &util.WorkloadInfo{ + TypeMeta: metav1.TypeMeta{ + APIVersion: lws.GetAPIVersion(), + Kind: lws.GetKind(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: lws.GetName(), + Namespace: lws.GetNamespace(), + UID: lws.GetUID(), + Generation: lws.GetGeneration(), + Labels: lws.GetLabels(), + Annotations: lws.GetAnnotations(), + }, + LogKey: fmt.Sprintf("%s/%s (%s)", lws.GetNamespace(), lws.GetName(), lws.GroupVersionKind()), + Replicas: int32(replicas), + Status: util.WorkloadStatus{ + Replicas: int32(statusReplicas), + ReadyReplicas: int32(readyReplicas), + UpdatedReplicas: int32(updatedReplicas), + UpdatedReadyReplicas: int32(updatedReadyReplicas), + AvailableReplicas: int32(availableReplicas), + ObservedGeneration: observedGen, + UpdateRevision: updateRevision, + StableRevision: stableRevision, + }, + } +} diff --git a/pkg/controller/batchrelease/control/canarystyle/lws/stable.go b/pkg/controller/batchrelease/control/canarystyle/lws/stable.go new file mode 100644 index 00000000..22a5dc18 --- /dev/null +++ b/pkg/controller/batchrelease/control/canarystyle/lws/stable.go @@ -0,0 +1,111 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lws + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/openkruise/rollouts/api/v1beta1" + "github.com/openkruise/rollouts/pkg/controller/batchrelease/control" + "github.com/openkruise/rollouts/pkg/util" +) + +type realStableController struct { + stableInfo *util.WorkloadInfo + stableObject *unstructured.Unstructured + stableClient client.Client + stableKey types.NamespacedName +} + +func newStable(cli client.Client, key types.NamespacedName) realStableController { + return realStableController{stableClient: cli, stableKey: key} +} + +func (rc *realStableController) GetStableInfo() *util.WorkloadInfo { + return rc.stableInfo +} + +func (rc *realStableController) Initialize(release *v1beta1.BatchRelease) error { + if rc.stableObject == nil { + return nil + } + + if control.IsControlledByBatchRelease(release, rc.stableObject) { + return nil + } + + owner := control.BuildReleaseControlInfo(release) + body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, util.BatchReleaseControlAnnotation, owner) + + lws := &unstructured.Unstructured{} + lws.SetGroupVersionKind(util.ControllerLWSKind) + lws.SetName(rc.stableKey.Name) + lws.SetNamespace(rc.stableKey.Namespace) + + return rc.stableClient.Patch(context.TODO(), lws, client.RawPatch(types.StrategicMergePatchType, []byte(body))) +} + +func (rc *realStableController) Finalize(release *v1beta1.BatchRelease) error { + if rc.stableObject == nil { + return nil // no need to process deleted object + } + + // if batchPartition == nil, workload should be promoted + pause := release.Spec.ReleasePlan.BatchPartition != nil + body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}},"spec":{"paused":%v}}`, + util.BatchReleaseControlAnnotation, pause) + + lws := &unstructured.Unstructured{} + lws.SetGroupVersionKind(util.ControllerLWSKind) + lws.SetName(rc.stableKey.Name) + lws.SetNamespace(rc.stableKey.Namespace) + + if err := rc.stableClient.Patch(context.TODO(), lws, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { + return err + } + + if control.ShouldWaitResume(release) { + return waitAllUpdatedAndReady(lws) + } + return nil +} + +func waitAllUpdatedAndReady(lws *unstructured.Unstructured) error { + paused, found, _ := unstructured.NestedBool(lws.Object, "spec", "paused") + if found && paused { + return fmt.Errorf("promote error: LeaderWorkerSet should not be paused") + } + + createdReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "replicas") + updatedReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "updatedReplicas") + if createdReplicas != updatedReplicas { + return fmt.Errorf("promote error: all replicas should be upgraded") + } + + availableReplicas, _, _ := unstructured.NestedInt64(lws.Object, "status", "readyReplicas") + // For LWS, we use a simple check - all replicas should be ready + // This can be enhanced based on LWS-specific requirements + if availableReplicas < createdReplicas { + return fmt.Errorf("promote error: all replicas should be ready") + } + return nil +} diff --git a/pkg/controller/rollout/rollout_controller.go b/pkg/controller/rollout/rollout_controller.go index 926ec142..26fd4328 100755 --- a/pkg/controller/rollout/rollout_controller.go +++ b/pkg/controller/rollout/rollout_controller.go @@ -59,6 +59,7 @@ func init() { watchedWorkload.LoadOrStore(util.ControllerKruiseKindSts.String(), struct{}{}) watchedWorkload.LoadOrStore(util.ControllerKruiseOldKindSts.String(), struct{}{}) watchedWorkload.LoadOrStore(util.ControllerKruiseKindDS.String(), struct{}{}) + watchedWorkload.LoadOrStore(util.ControllerLWSKind.String(), struct{}{}) // LeaderWorkerSet } // RolloutReconciler reconciles a Rollout object diff --git a/pkg/util/controller_finder.go b/pkg/util/controller_finder.go index a7e78daf..112fcbb6 100644 --- a/pkg/util/controller_finder.go +++ b/pkg/util/controller_finder.go @@ -18,14 +18,18 @@ package util import ( "context" + "fmt" "sort" "strings" appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" @@ -109,7 +113,7 @@ func (r *ControllerFinder) GetWorkloadForRef(rollout *rolloutv1beta1.Rollout) (* } func (r *ControllerFinder) canaryStyleFinders() []ControllerFinderFunc { - return []ControllerFinderFunc{r.getDeployment} + return []ControllerFinderFunc{r.getDeployment, r.getLeaderWorkerSet} } // Note: getStatefulSetLikeWorkload is placed last because it has broader matching criteria. @@ -132,6 +136,9 @@ var ( ControllerKruiseKindDS = appsv1alpha1.SchemeGroupVersion.WithKind("DaemonSet") ControllerKruiseKindSts = appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet") ControllerKruiseOldKindSts = appsv1alpha1.SchemeGroupVersion.WithKind("StatefulSet") + // LeaderWorkerSet from kubernetes-sigs/lws + // API Group: workload.kubernetes.io, Version: v1alpha1, Kind: LeaderWorkerSet + ControllerLWSKind = schema.GroupVersionKind{Group: "workload.kubernetes.io", Version: "v1alpha1", Kind: "LeaderWorkerSet"} ) // getKruiseCloneSet returns the kruise cloneSet referenced by the provided controllerRef. @@ -409,6 +416,90 @@ func (r *ControllerFinder) getDeployment(namespace string, ref *rolloutv1beta1.O return workload, err } +// getLeaderWorkerSet returns the LeaderWorkerSet referenced by the provided controllerRef. +// LeaderWorkerSet is used for AI inference workloads with leader-worker pattern. +func (r *ControllerFinder) getLeaderWorkerSet(namespace string, ref *rolloutv1beta1.ObjectRef) (*Workload, error) { + // Verify this is a LeaderWorkerSet + ok, _ := verifyGroupKind(ref, ControllerLWSKind.Kind, []string{ControllerLWSKind.Group}) + if !ok { + return nil, nil + } + + // Use unstructured to handle LWS since we may not have the typed client + lws := &unstructured.Unstructured{} + lws.SetGroupVersionKind(ControllerLWSKind) + err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, lws) + if err != nil { + if errors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + + // Check generation consistency + observedGen, found, _ := unstructured.NestedInt64(lws.Object, "status", "observedGeneration") + generation := lws.GetGeneration() + if !found || generation != observedGen { + return &Workload{IsStatusConsistent: false}, nil + } + + // Extract replicas - LWS typically has replicas in spec + replicas, found, _ := unstructured.NestedInt64(lws.Object, "spec", "replicas") + if !found { + replicas = 0 + } + + // Extract pod template for hash calculation + templateObj, found, _ := unstructured.NestedMap(lws.Object, "spec", "template") + if !found { + return &Workload{IsStatusConsistent: false}, fmt.Errorf("spec.template not found in LeaderWorkerSet") + } + + // Create pod template from unstructured + podTemplate := &corev1.PodTemplateSpec{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(templateObj, podTemplate); err != nil { + return &Workload{IsStatusConsistent: false}, fmt.Errorf("failed to convert template: %w", err) + } + + canaryRevision := ComputeHash(podTemplate, nil) + + workload := &Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: lws.GetName(), + Namespace: lws.GetNamespace(), + UID: lws.GetUID(), + Labels: lws.GetLabels(), + Annotations: lws.GetAnnotations(), + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: lws.GetAPIVersion(), + Kind: lws.GetKind(), + }, + Replicas: int32(replicas), + IsStatusConsistent: true, + CanaryRevision: canaryRevision, + RevisionLabelKey: "workload.kubernetes.io/template-hash", // Custom revision label key for LWS + } + + // Try to extract stable revision from status if available + if stableRev, found, _ := unstructured.NestedString(lws.Object, "status", "stableRevision"); found { + workload.StableRevision = stableRev + } + + // Check if in rollout progressing + if _, ok := workload.Annotations[InRolloutProgressingAnnotation]; !ok { + return workload, nil + } + + workload.InRolloutProgressing = true + // Check for rollback condition + if workload.StableRevision != "" && workload.StableRevision == canaryRevision { + workload.IsInRollback = true + } + + return workload, nil +} + func (r *ControllerFinder) getStatefulSetLikeWorkload(namespace string, ref *rolloutv1beta1.ObjectRef) (*Workload, error) { if ref == nil { return nil, nil diff --git a/pkg/util/workloads_utils.go b/pkg/util/workloads_utils.go index b768bc90..538136e7 100644 --- a/pkg/util/workloads_utils.go +++ b/pkg/util/workloads_utils.go @@ -58,6 +58,7 @@ var ( &ControllerKruiseKindSts, &ControllerKruiseOldKindSts, &ControllerKruiseKindDS, + &ControllerLWSKind, // LeaderWorkerSet for AI inference workloads } ) @@ -245,6 +246,10 @@ func GetEmptyWorkloadObject(gvk schema.GroupVersionKind) client.Object { return &apps.DaemonSet{} case ControllerKruiseKindSts, ControllerKruiseOldKindSts: return &appsv1beta1.StatefulSet{} + case ControllerLWSKind: + unstructuredObject := &unstructured.Unstructured{} + unstructuredObject.SetGroupVersionKind(gvk) + return unstructuredObject default: unstructuredObject := &unstructured.Unstructured{} unstructuredObject.SetGroupVersionKind(gvk)