Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 195 additions & 40 deletions robusta_krr/core/integrations/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.object_like_dict import ObjectLikeDict


class LightweightJobInfo:
    """Minimal stand-in for a Job that keeps only its name and namespace.

    GroupedJob processing stores one of these per discovered job instead of
    the full API object, so only the two identifying fields are retained.

    Attributes are limited to ``name`` and ``namespace``.
    """

    # One instance is created per grouped job; slots avoid a per-instance __dict__.
    __slots__ = ("name", "namespace")

    def __init__(self, name: str, namespace: str):
        self.name = name
        self.namespace = namespace

    def __repr__(self) -> str:
        return f"{type(self).__name__}(name={self.name!r}, namespace={self.namespace!r})"


from . import config_patch as _

logger = logging.getLogger("krr")
Expand Down Expand Up @@ -261,6 +269,82 @@ def _should_list_resource(self, resource: str) -> bool:
return True
return resource in settings.resources

def _is_job_owned_by_cronjob(self, job: V1Job) -> bool:
"""Check if a job is owned by a CronJob."""
return any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])

def _is_job_grouped(self, job: V1Job) -> bool:
    """Return True when ``job`` carries at least one configured grouping label.

    False is returned when no grouping labels are configured in ``settings``
    or when the job has no labels at all.
    """
    # Labels are only inspected once grouping labels are known to be configured.
    if settings.job_grouping_labels and job.metadata.labels:
        job_labels = job.metadata.labels
        for candidate in settings.job_grouping_labels:
            if candidate in job_labels:
                return True
    return False

async def _list_namespaced_or_global_objects_batched(
    self,
    kind: KindLiteral,
    all_namespaces_request: Callable,
    namespaced_request: Callable,
    namespace: str,
    limit: Optional[int] = None,
    continue_ref: Optional[str] = None,
) -> tuple[list[Any], Optional[str]]:
    """Fetch a single page of objects, cluster-wide or from one namespace.

    Args:
        kind: Resource kind; used only in the debug log line.
        all_namespaces_request: Callable invoked when ``namespace`` is ``"*"``.
        namespaced_request: Callable invoked with ``namespace=...`` otherwise.
        namespace: Namespace to list, or ``"*"`` for the all-namespaces request.
        limit: Page size forwarded as ``limit``; ``None`` is passed through as-is.
        continue_ref: Token forwarded as ``_continue`` to resume an earlier listing.

    Returns:
        ``(items, next_continue_ref)``. ``next_continue_ref`` is the
        ``_continue`` attribute of the response metadata (``None`` when
        absent). When the request fails with HTTP 410 and the error body
        carries a fresh continue token, ``([], new_token)`` is returned so the
        caller can resume with that token.

    Raises:
        ApiException: For any API failure that cannot be recovered as above.
    """
    # `limit` may be None, so it is formatted with %s rather than %d.
    logger.debug("Listing %s in %s with batching (limit=%s)", kind, self.cluster, limit)
    loop = asyncio.get_running_loop()

    def fetch_page():
        # Runs in the executor thread; both request flavours share these arguments.
        request_kwargs = dict(
            watch=False,
            label_selector=settings.selector,
            limit=limit,
            _continue=continue_ref,
        )
        if namespace == "*":
            return all_namespaces_request(**request_kwargs)
        return namespaced_request(namespace=namespace, **request_kwargs)

    try:
        page = await loop.run_in_executor(self.executor, fetch_page)
        next_continue_ref = getattr(page.metadata, "_continue", None)
        return list(page.items), next_continue_ref

    except ApiException as e:
        if e.status == 410 and e.body:
            # Continue token expired; the error body may carry a replacement token.
            import json

            try:
                error_body = json.loads(e.body)
                new_continue_token = error_body.get("metadata", {}).get("continue")
            except (json.JSONDecodeError, KeyError, AttributeError):
                # Body is not the expected JSON object: re-raise the original 410 below.
                new_continue_token = None
            if new_continue_token:
                logger.info("Continue token expired for jobs listing. Continuing")
                return [], new_continue_token
        raise

async def _list_namespaced_or_global_objects(
self,
kind: KindLiteral,
Expand Down Expand Up @@ -458,26 +542,57 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
extract_containers=lambda item: item.spec.template.spec.containers,
)

async def _list_all_jobs(self) -> list[K8sObjectData]:
    """List standalone Jobs, fetching them from the API page by page.

    The page size is ``settings.discovery_job_batch_size`` and the total
    number of pages fetched, counted across all namespaces, is capped by
    ``settings.discovery_job_max_batches``. Jobs owned by a CronJob and jobs
    carrying a grouping label are skipped.

    Returns:
        One scannable object per container of every remaining job. If
        discovery fails part-way, the error is logged and the objects
        collected so far are returned.
    """
    if not self._should_list_resource("Job"):
        return []

    namespaces = self.namespaces if self.namespaces != "*" else ["*"]
    max_batches = settings.discovery_job_max_batches
    all_jobs = []
    truncated = False
    try:
        batch_count = 0
        for namespace in namespaces:
            continue_ref: Optional[str] = None
            while batch_count < max_batches:
                jobs_batch, continue_ref = await self._list_namespaced_or_global_objects_batched(
                    kind="Job",
                    all_namespaces_request=self.batch.list_job_for_all_namespaces,
                    namespaced_request=self.batch.list_namespaced_job,
                    namespace=namespace,
                    limit=settings.discovery_job_batch_size,
                    continue_ref=continue_ref,
                )

                if not jobs_batch and continue_ref:
                    # Refreshed continue token: count it so repeated refreshes cannot loop forever.
                    batch_count += 1
                    continue
                if not jobs_batch:
                    # Nothing left to fetch; empty final pages are not counted.
                    break

                batch_count += 1
                for job in jobs_batch:
                    if self._is_job_owned_by_cronjob(job) or self._is_job_grouped(job):
                        continue
                    for container in job.spec.template.spec.containers:
                        all_jobs.append(self.__build_scannable_object(job, container, "Job"))
                if not continue_ref:
                    break
            else:
                # The loop condition failed (no break): the batch cap was reached while
                # this namespace still had pages pending or had not been queried at all.
                truncated = True
                break

        if truncated:
            logger.warning(
                "Job discovery in %s stopped after %d batches (discovery_job_max_batches); "
                "some jobs may be missing",
                self.cluster,
                batch_count,
            )
        logger.debug("Found %d regular jobs", len(all_jobs))
        return all_jobs

    except Exception:
        # Best-effort: report the failure and keep whatever was collected so far.
        logger.exception("Failed to run jobs discovery")
        return all_jobs
Comment thread
Avi-Robusta marked this conversation as resolved.

def _list_all_cronjobs(self) -> list[K8sObjectData]:
return self._list_scannable_objects(
Expand All @@ -497,42 +612,82 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
logger.debug("Skipping GroupedJob in cluster")
return []

logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}")

# Get all jobs that have any of the grouping labels
all_jobs = await self._list_namespaced_or_global_objects(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
)
logger.debug("Listing GroupedJobs with grouping labels: %s", settings.job_grouping_labels)

grouped_jobs = defaultdict(list)
for job in all_jobs:
if (job.metadata.labels and
not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):

for label_name in settings.job_grouping_labels:
if label_name in job.metadata.labels:
label_value = job.metadata.labels[label_name]
group_key = f"{label_name}={label_value}"
grouped_jobs[group_key].append(job)
grouped_jobs_template = {} # Store only ONE full job as template per group - needed for class K8sObjectData
continue_ref: Optional[str] = None
batch_count = 0
namespaces = self.namespaces if self.namespaces != "*" else ["*"]
try:
batch_count = 0
for namespace in namespaces:
continue_ref = None
while batch_count < settings.discovery_job_max_batches:
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
kind="Job",
all_namespaces_request=self.batch.list_job_for_all_namespaces,
namespaced_request=self.batch.list_namespaced_job,
namespace=namespace,
limit=settings.discovery_job_batch_size,
continue_ref=continue_ref,
)

continue_ref = next_continue_ref

if not jobs_batch and continue_ref:
# refreshed continue token, count error batches
batch_count += 1
continue
if not jobs_batch:
# no more jobs to batch do not count empty batches
break

batch_count += 1
for job in jobs_batch:
if not job.metadata.labels or self._is_job_owned_by_cronjob(job) or not self._is_job_grouped(job):
continue
for label_name in settings.job_grouping_labels:
if label_name not in job.metadata.labels:
continue
# label_name is the grouping label's key; label_value is this job's value for that label
label_value = job.metadata.labels[label_name]
group_key = f"{label_name}={label_value}"
lightweight_job = LightweightJobInfo(
name=job.metadata.name,
namespace=job.metadata.namespace
)
# Store lightweight job info only for grouped jobs
grouped_jobs[group_key].append(lightweight_job)
# Keep only ONE full job as template per group
if group_key not in grouped_jobs_template:
grouped_jobs_template[group_key] = job
if not continue_ref:
break

except Exception as e:
logger.error(
"Failed to run grouped jobs discovery",
exc_info=True,
)
raise
Comment thread
Avi-Robusta marked this conversation as resolved.

result = []
for group_name, jobs in grouped_jobs.items():
template_job = grouped_jobs_template[group_name]

jobs_by_namespace = defaultdict(list)
for job in jobs:
jobs_by_namespace[job.metadata.namespace].append(job)
jobs_by_namespace[job.namespace].append(job)

for namespace, namespace_jobs in jobs_by_namespace.items():
limited_jobs = namespace_jobs[:settings.job_grouping_limit]

container_names = set()
for job in limited_jobs:
for container in job.spec.template.spec.containers:
container_names.add(container.name)
for container in template_job.spec.template.spec.containers:
container_names.add(container.name)

for container_name in container_names:
template_job = limited_jobs[0]
template_container = None
for container in template_job.spec.template.spec.containers:
if container.name == container_name:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD
del jobs
elif object.kind == "GroupedJob":
if hasattr(object._api_resource, '_grouped_jobs'):
pod_owners = [job.metadata.name for job in object._api_resource._grouped_jobs]
pod_owners = [job.name for job in object._api_resource._grouped_jobs]
pod_owner_kind = "Job"
else:
pod_owners = [object.name]
Expand Down
4 changes: 4 additions & 0 deletions robusta_krr/core/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class Config(pd.BaseSettings):
# Threading settings
max_workers: int = pd.Field(6, ge=1)

# Discovery settings
discovery_job_batch_size: int = pd.Field(5000, ge=1, description="Batch size for Kubernetes job API calls")
discovery_job_max_batches: int = pd.Field(100, ge=1, description="Maximum number of job batches to process to prevent infinite loops")

# Job grouping settings
job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type")
job_grouping_limit: int = pd.Field(500, ge=1, description="Maximum number of jobs/pods to query per GroupedJob group")
Expand Down
14 changes: 14 additions & 0 deletions robusta_krr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,18 @@ def run_strategy(
help="Maximum number of jobs/pods to query per GroupedJob group (default: 500).",
rich_help_panel="Job Grouping Settings",
),
discovery_job_batch_size: int = typer.Option(
5000,
"--discovery-job-batch-size",
help="Batch size for Kubernetes job API calls (default: 5000).",
rich_help_panel="Job Discovery Settings",
),
discovery_job_max_batches: int = typer.Option(
100,
"--discovery-job-max-batches",
help="Maximum number of job batches to process to prevent infinite loops (default: 100).",
rich_help_panel="Job Discovery Settings",
),
Comment thread
Avi-Robusta marked this conversation as resolved.
format: str = typer.Option(
"table",
"--formatter",
Expand Down Expand Up @@ -371,6 +383,8 @@ def run_strategy(
max_workers=max_workers,
job_grouping_labels=job_grouping_labels,
job_grouping_limit=job_grouping_limit,
discovery_job_batch_size=discovery_job_batch_size,
discovery_job_max_batches=discovery_job_max_batches,
format=format,
show_cluster_name=show_cluster_name,
verbose=verbose,
Expand Down
Loading