From 120b00706b43fc273302c7b666d230a704472b29 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Tue, 16 Jun 2026 14:30:26 -0700 Subject: [PATCH] Add basic network policy for Ingress to Workers Restrict Ingress to Workers and hence Actors to be only from the Router component. This makes the atecontroller responsible for K8s level network policy Note: this is a belts-and-suspenders model as this applies on the Worker level. This will be refined further with Actor-specific policies. --- .../controllers/networkpolicy_controller.go | 140 ++++++++++++++++++ .../networkpolicy_controller_test.go | 93 ++++++++++++ .../controllers/workerpool_controller_test.go | 8 + cmd/atecontroller/main.go | 8 + manifests/ate-install/generated/role.yaml | 12 ++ 5 files changed, 261 insertions(+) create mode 100644 cmd/atecontroller/internal/controllers/networkpolicy_controller.go create mode 100644 cmd/atecontroller/internal/controllers/networkpolicy_controller_test.go diff --git a/cmd/atecontroller/internal/controllers/networkpolicy_controller.go b/cmd/atecontroller/internal/controllers/networkpolicy_controller.go new file mode 100644 index 000000000..0b980fd73 --- /dev/null +++ b/cmd/atecontroller/internal/controllers/networkpolicy_controller.go @@ -0,0 +1,140 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + + networkingv1 "k8s.io/api/networking/v1" + k8errors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" + networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" +) + +const networkPolicyFieldOwner = "ate-networkpolicy" + +type NetworkPolicyReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +//+kubebuilder:rbac:groups=ate.dev,resources=workerpools,verbs=get;list;watch +//+kubebuilder:rbac:groups=networking.k8s.io,resources=networkpolicies,verbs=get;list;watch;create;update;patch;delete + +func (r *NetworkPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := log.FromContext(ctx) + + wp := &atev1alpha1.WorkerPool{} + if err := r.Get(ctx, req.NamespacedName, wp); err != nil { + if k8errors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("failed to get worker pool %q: %w", req.NamespacedName, err) + } + + if !wp.GetDeletionTimestamp().IsZero() { + log.Info("WorkerPool is being deleted, NetworkPolicy will be GC'd via OwnerReference", + "namespace", wp.Namespace, + "name", wp.Name) + return ctrl.Result{}, nil + } + + if err := r.reconcileImpl(ctx, wp); err != nil { + log.Error(err, "Failed to reconcile NetworkPolicy") + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +func (r *NetworkPolicyReconciler) reconcileImpl(ctx context.Context, wp *atev1alpha1.WorkerPool) error { + log := log.FromContext(ctx) + + npAC := buildNetworkPolicyApplyConfig(wp) + + if err := r.Apply(ctx, npAC, client.FieldOwner(networkPolicyFieldOwner), client.ForceOwnership); err != nil { + return fmt.Errorf("failed to apply NetworkPolicy %s:%s: %w", *npAC.Namespace, *npAC.Name, err) + } + log.Info("reconcileImpl done", + "namespace", *npAC.Namespace, + "name", *npAC.Name) + + return nil +} + +func buildNetworkPolicyApplyConfig(wp *atev1alpha1.WorkerPool) *networkingv1ac.NetworkPolicyApplyConfiguration { + np := networkingv1ac.NetworkPolicy(npName(wp.Name), wp.Namespace). + WithOwnerReferences(metav1ac.OwnerReference(). + WithAPIVersion(atev1alpha1.GroupVersion.String()). + WithKind("WorkerPool"). + WithName(wp.Name). + WithUID(wp.UID). + WithController(true). + WithBlockOwnerDeletion(true)) + + // Ingress policy: only accept connections from the atenet-router, all ports. + np. + WithSpec(networkingv1ac.NetworkPolicySpec(). + WithPodSelector(metav1ac.LabelSelector(). + WithMatchLabels(map[string]string{"ate.dev/worker-pool": wp.Name})). + WithPolicyTypes(networkingv1.PolicyTypeIngress, networkingv1.PolicyTypeEgress). + WithIngress( + networkingv1ac.NetworkPolicyIngressRule(). + WithFrom( + networkingv1ac.NetworkPolicyPeer(). + WithNamespaceSelector(metav1ac.LabelSelector(). + WithMatchLabels(map[string]string{"kubernetes.io/metadata.name": "ate-system"})). + WithPodSelector(metav1ac.LabelSelector(). + WithMatchLabels(map[string]string{"app": "atenet-router"})), + ), + ). + WithEgress( + networkingv1ac.NetworkPolicyEgressRule(), + )) + + // TODO: don't implement any Egress policy yet. + + return np +} + +func npName(wpName string) string { + sum := sha256.Sum256([]byte(wpName)) + hash := hex.EncodeToString(sum[:]) + if len(hash) > 5 { + hash = hash[:5] + } + truncated := wpName + if len(truncated) > 30 { + truncated = truncated[:30] + } + return fmt.Sprintf("substrate-%s-%s", truncated, hash) +} + +func (r *NetworkPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + Named("networkpolicy"). + For(&atev1alpha1.WorkerPool{}). + Owns(&networkingv1.NetworkPolicy{}). + Complete(r) +} diff --git a/cmd/atecontroller/internal/controllers/networkpolicy_controller_test.go b/cmd/atecontroller/internal/controllers/networkpolicy_controller_test.go new file mode 100644 index 000000000..209ac75bb --- /dev/null +++ b/cmd/atecontroller/internal/controllers/networkpolicy_controller_test.go @@ -0,0 +1,93 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "testing" + + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/types" +) + +func TestWorkerPoolCreatesNetworkPolicy(t *testing.T) { + wp := makeWorkerPool("test-netpolicy-create", "default", 2, "ateom:v1") + if err := k8sClient.Create(testCtx, wp); err != nil { + t.Fatalf("create WorkerPool: %v", err) + } + t.Cleanup(func() { k8sClient.Delete(testCtx, wp) }) //nolint:errcheck + + eventually(t, func(ctx context.Context) (bool, error) { + npName := npName(wp.Name) + np := &networkingv1.NetworkPolicy{} + err := k8sClient.Get(ctx, types.NamespacedName{Name: npName, Namespace: wp.Namespace}, np) + if err != nil { + return false, nil + } + + // Verify OwnerReference + if len(np.OwnerReferences) == 0 || np.OwnerReferences[0].Name != wp.Name { + return false, nil + } + + // Verify PodSelector matches the worker pool + if np.Spec.PodSelector.MatchLabels == nil || np.Spec.PodSelector.MatchLabels["ate.dev/worker-pool"] != wp.Name { + return false, nil + } + + // Verify PolicyTypes contains Ingress and Egress + hasIngress := false + hasEgress := false + for _, pt := range np.Spec.PolicyTypes { + if pt == networkingv1.PolicyTypeIngress { + hasIngress = true + } + if pt == networkingv1.PolicyTypeEgress { + hasEgress = true + } + } + if !hasIngress || !hasEgress { + return false, nil + } + + // Verify Ingress Rules (Allow only ingress from ATE router) + if len(np.Spec.Ingress) != 1 { + return false, nil + } + ingressRule := np.Spec.Ingress[0] + if len(ingressRule.From) != 1 { + return false, nil + } + fromPeer := ingressRule.From[0] + if fromPeer.NamespaceSelector == nil || fromPeer.NamespaceSelector.MatchLabels["kubernetes.io/metadata.name"] != "ate-system" { + return false, nil + } + if fromPeer.PodSelector == nil || fromPeer.PodSelector.MatchLabels["app"] != "atenet-router" { + return false, nil + } + + // Verify Egress Rules: + // For now, we allow all egress. + if len(np.Spec.Egress) != 1 { + return false, nil + } + egressRule := np.Spec.Egress[0] + if len(egressRule.To) != 0 || len(egressRule.Ports) != 0 { + return false, nil + } + + return true, nil + }) +} diff --git a/cmd/atecontroller/internal/controllers/workerpool_controller_test.go b/cmd/atecontroller/internal/controllers/workerpool_controller_test.go index 6a95b91f2..9be481bb4 100644 --- a/cmd/atecontroller/internal/controllers/workerpool_controller_test.go +++ b/cmd/atecontroller/internal/controllers/workerpool_controller_test.go @@ -95,6 +95,14 @@ func TestMain(m *testing.M) { os.Exit(1) } + if err := (&NetworkPolicyReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + fmt.Fprintf(os.Stderr, "netpolicy controller setup failed: %v\n", err) + os.Exit(1) + } + testCtx, testCancel = context.WithCancel(context.Background()) go func() { _ = mgr.Start(testCtx) diff --git a/cmd/atecontroller/main.go b/cmd/atecontroller/main.go index ab9244cab..c24945f09 100644 --- a/cmd/atecontroller/main.go +++ b/cmd/atecontroller/main.go @@ -79,6 +79,14 @@ func main() { os.Exit(1) } + if err = (&controllers.NetworkPolicyReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "NetPolicy") + os.Exit(1) + } + if err = (&controllers.ActorTemplateReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), diff --git a/manifests/ate-install/generated/role.yaml b/manifests/ate-install/generated/role.yaml index 7341d28dd..48f80e958 100644 --- a/manifests/ate-install/generated/role.yaml +++ b/manifests/ate-install/generated/role.yaml @@ -80,3 +80,15 @@ rules: - get - patch - update +- apiGroups: + - networking.k8s.io + resources: + - networkpolicies + verbs: + - create + - delete + - get + - list + - patch + - update + - watch