Skip to content
Open
6 changes: 6 additions & 0 deletions pkg/orchestrator/gke/gke_job_orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,12 @@ func TestGeneratePathwaysManifest(t *testing.T) {
`cpu: "8"`,
`memory: "32Gi"`,
"restartStrategy: Recreate",
"privileged: true",
"alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool",
`cpu: "24"`,
`cpu: "2"`,
`memory: "8Gi"`,
"kill -SIGTERM $PID",
}

for _, substr := range expectedSubstrs {
Expand Down
19 changes: 18 additions & 1 deletion pkg/orchestrator/gke/manifest_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,24 @@ func (g *GKEOrchestrator) PrepareManifestOptions(job orchestrator.JobDefinition,

parts := strings.Split(originalAccelType, "-")
instanceType := parts[0]
pathwaysInstanceType := fmt.Sprintf("%s:%s", instanceType, schedOpts.Topology)

// Reuse GCluster's existing GKE accelerator label mapping and algorithmically
// derive the Pathways short platform key to avoid duplicating mapping tables.
gkeLabel := g.GenerateGKENodeSelectorLabel(instanceType)
// Normalize GKE node selector labels to match JAX/Pathways platform keys:
// 1. Map GKE "v5-lite" (TPU v5e) to JAX standard "v5e" (deriving tpuv5e)
// 2. Map GKE "v5p" (TPU v5p) to JAX standard "v5" (deriving tpuv5)
normalizedLabel := gkeLabel
if strings.Contains(gkeLabel, "v5-lite") {
normalizedLabel = strings.ReplaceAll(gkeLabel, "v5-lite", "v5e")
} else if strings.Contains(gkeLabel, "v5p") {
normalizedLabel = strings.ReplaceAll(gkeLabel, "v5p", "v5")
}
pathwaysPlatform := strings.ReplaceAll(normalizedLabel, "-podslice", "")
pathwaysPlatform = strings.ReplaceAll(pathwaysPlatform, "-slice", "")
pathwaysPlatform = strings.ReplaceAll(pathwaysPlatform, "-", "")
Comment thread
SwarnaBharathiMantena marked this conversation as resolved.
Comment thread
SwarnaBharathiMantena marked this conversation as resolved.
Comment thread
SwarnaBharathiMantena marked this conversation as resolved.

pathwaysInstanceType := fmt.Sprintf("%s:%s", pathwaysPlatform, schedOpts.Topology)

opts := ManifestOptions{
IsDynamicSlicing: isDynamicSlicing,
Expand Down
71 changes: 55 additions & 16 deletions pkg/orchestrator/gke/templates/pathways_jobset.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
restartPolicy: Never
{{- if .PriorityClassName }}
priorityClassName: {{.PriorityClassName}}
{{- end }}
{{- if .ServiceAccountName }}
serviceAccountName: {{.ServiceAccountName}}
{{- end }}
Expand All @@ -81,6 +84,13 @@ spec:
{{- range .ProxyArgsList }}
- {{.}}
{{- end }}
env:
- name: PATHWAYS_HEAD
valueFrom:
fieldRef:
fieldPath: metadata.labels['jobset.sigs.k8s.io/coordinator']
- name: ABSL_FLAGS
value: "--pathways_pipe_unreachable_timeout=60s"
{{- if not .Pathways.Headless}}
restartPolicy: Always
{{- end }}
Expand Down Expand Up @@ -117,27 +127,62 @@ spec:
fieldPath: metadata.labels['jobset.sigs.k8s.io/coordinator']
- name: TPU_SKIP_MDS_QUERY
value: "true"
- name: ABSL_FLAGS
value: "--pathways_pipe_unreachable_timeout=60s"
{{- if not .Pathways.Headless}}
restartPolicy: Always
{{- end }}
resources:
limits:
cpu: "8"
memory: "32Gi"
volumeMounts:
- mountPath: /tmp
name: shared-tmp
{{- if not .Pathways.Headless}}
containers:
- name: workload-container
image: {{.FullImageName}}
securityContext:
privileged: true
resources:
limits:
cpu: "24"
memory: "100Gi"
Comment thread
SwarnaBharathiMantena marked this conversation as resolved.
Comment thread
SwarnaBharathiMantena marked this conversation as resolved.
requests:
cpu: "2"
memory: "8Gi"
env:
- name: PATHWAYS_HEAD
valueFrom:
fieldRef:
fieldPath: metadata.labels['jobset.sigs.k8s.io/coordinator']
- name: JAX_PLATFORMS
value: proxy
- name: XCLOUD_ENVIRONMENT
value: GCP
- name: JAX_BACKEND_TARGET
value: grpc://$(PATHWAYS_HEAD):29000
- name: ABSL_FLAGS
value: "--pathways_pipe_unreachable_timeout=60s"
command:
- "/bin/bash"
- "-c"
- |
{{.CommandToRun}}
echo "GCluster Start: $(date)"
_sigterm() {
if [ -n "$PID" ]; then
kill -SIGTERM $PID 2>/dev/null
wait $PID
fi
exit 143
}
Comment thread
SwarnaBharathiMantena marked this conversation as resolved.
trap _sigterm SIGTERM
(
{{.CommandToRun}}
) & PID=$!
wait $PID
EXIT_CODE=$?
echo "GCluster End: $(date)"
exit $EXIT_CODE
Comment thread
SwarnaBharathiMantena marked this conversation as resolved.
{{- if .VolumeMountsYAML }}
volumeMounts:
{{.VolumeMountsYAML}}
{{- end }}
{{- end}}
Expand All @@ -146,14 +191,13 @@ spec:
image: {{.Pathways.ColocatedPythonSidecarImage}}
{{- end}}
volumes:
- name: shared-tmp
hostPath:
path: /tmp
type: DirectoryOrCreate
{{.VolumesYAML}}
- name: worker
replicas: {{.NumSlices}}
template:
metadata:
annotations:
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool
spec:
completionMode: Indexed
parallelism: {{.NodesPerSlice}}
Expand Down Expand Up @@ -241,15 +285,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.labels['jobset.sigs.k8s.io/coordinator']
- name: ABSL_FLAGS
value: "--pathways_pipe_unreachable_timeout=60s"
{{.ResourcesString}}
volumeMounts:
- name: shared-tmp
mountPath: /tmp
volumes:
- name: shared-tmp
hostPath:
path: /tmp
type: DirectoryOrCreate
{{.VolumesYAML}}
{{- if .NodeSelector }}
nodeSelector:
Expand Down
Loading