Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions backend/infrahub/computed_attribute/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def _chunk_ids(ids: list[str], chunk_size: int) -> list[list[str]]:
return [ids[i : i + chunk_size] for i in range(0, len(ids), chunk_size)]


def _get_submission_chunk_size() -> int:
return max(1, get_prefect_max_related_resources() // 2)


UPDATE_ATTRIBUTE = """
mutation UpdateAttribute(
$id: String!,
Expand Down Expand Up @@ -212,7 +216,7 @@ async def trigger_update_python_computed_attributes(
if not object_ids:
return

chunk_size = get_prefect_max_related_resources() // 2
chunk_size = _get_submission_chunk_size()
for chunk in _chunk_ids(object_ids, chunk_size):
await get_workflow().submit_workflow(
workflow=COMPUTED_ATTRIBUTE_PROCESS_TRANSFORM,
Expand Down Expand Up @@ -554,7 +558,7 @@ async def query_transform_targets(
key = (subscriber.kind, computed_attribute.name)
batches.setdefault(key, []).append(subscriber.object_id)

chunk_size = get_prefect_max_related_resources() // 2
chunk_size = _get_submission_chunk_size()
for (kind, attribute_name), batch_object_ids in batches.items():
for chunk in _chunk_ids(batch_object_ids, chunk_size):
await get_workflow().submit_workflow(
Expand Down
5 changes: 3 additions & 2 deletions backend/infrahub/core/ipam/utilization.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ async def get_children(
if ip_prefixes is None:
ip_prefixes = self.ip_prefixes
result: list[PrefixChildDetails] = []
for prefix in ip_prefixes:
for child in self._results_by_prefix_id.get(prefix.get_id(), []):
prefix_ids = {prefix.get_id() for prefix in ip_prefixes}
for prefix_id in prefix_ids:
for child in self._results_by_prefix_id.get(prefix_id, []):
if prefix_member_type and child.child_type != prefix_member_type:
continue
result.append(child)
Expand Down
38 changes: 16 additions & 22 deletions backend/infrahub/git/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,8 @@ def create_commit_worktree(self, commit: str) -> bool | Worktree:
"""Create a new worktree for a given commit.

Raises:
RepositoryError: When the worktree cannot be created and the commit cannot be fetched from a remote.
CommitNotFoundError: When the commit does not exist in the local clone.
RepositoryError: When the worktree cannot be created for any other reason.

"""
# Check of the worktree already exist
Expand All @@ -721,22 +722,9 @@ def create_commit_worktree(self, commit: str) -> bool | Worktree:
log.debug(f"Commit worktree created {commit}", repository=self.name)
return worktree
except GitCommandError as exc:
if "invalid reference" not in exc.stderr:
raise RepositoryError(identifier=self.name, message=exc.stderr) from exc

if not self.has_origin:
raise RepositoryError(
identifier=self.name,
message=f"Commit {commit} not found and no remote origin configured to fetch from.",
) from exc

# Commit may exist on the remote but hasn't been fetched to this worker yet
log.info(f"Commit {commit} not found locally, fetching from remote.", repository=self.name)
repo.remotes.origin.fetch()

repo.git.worktree("add", directory, commit)
log.debug(f"Commit worktree created {commit} after fetch", repository=self.name)
return worktree
if "invalid reference" in exc.stderr:
raise CommitNotFoundError(identifier=self.name, commit=commit) from exc
raise RepositoryError(identifier=self.name, message=exc.stderr) from exc

def create_branch_worktree(self, branch_name: str, branch_id: str) -> bool:
"""Create a new worktree for a given branch.
Expand All @@ -761,7 +749,13 @@ def create_branch_worktree(self, branch_name: str, branch_id: str) -> bool:
async def calculate_diff_between_commits(
self, first_commit: str, second_commit: str
) -> tuple[list[str], list[str], list[str]]:
"""TODO need to refactor this function to return more information.
"""Return the (changed, added, removed) files going from first_commit to second_commit.

Direction follows `git diff first_commit second_commit`: first_commit is the old/base side
and second_commit is the new/target side. A file present only in second_commit is "added";
a file present only in first_commit is "removed".

TODO need to refactor this function to return more information.

Like :
- What has changed inside the files
Expand All @@ -777,12 +771,12 @@ async def calculate_diff_between_commits(
added_files = []

for x in commit_in_branch.diff(commit_to_compare, create_patch=True):
if x.a_blob and not x.b_blob and x.a_blob.path not in added_files:
added_files.append(x.a_blob.path)
if x.a_blob and not x.b_blob and x.a_blob.path not in removed_files:
removed_files.append(x.a_blob.path)
elif x.a_blob and x.b_blob and x.a_blob.path not in changed_files:
changed_files.append(x.a_blob.path)
elif not x.a_blob and x.b_blob and x.b_blob.path not in removed_files:
removed_files.append(x.b_blob.path)
elif not x.a_blob and x.b_blob and x.b_blob.path not in added_files:
added_files.append(x.b_blob.path)

return changed_files, added_files, removed_files

Expand Down
14 changes: 12 additions & 2 deletions backend/infrahub/git/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@
from pydantic import ValidationError as PydanticValidationError
from typing_extensions import Self

from infrahub import config
from infrahub import config, lock
from infrahub.core.constants import ArtifactStatus, ContentType, InfrahubKind, RepositoryObjects, RepositorySyncStatus
from infrahub.core.registry import registry
from infrahub.events.artifact_action import ArtifactCreatedEvent, ArtifactUpdatedEvent
from infrahub.events.models import EventMeta
from infrahub.events.repository_action import CommitUpdatedEvent
from infrahub.exceptions import (
CheckError,
CommitNotFoundError,
RepositoryConfigurationError,
RepositoryInvalidFileSystemError,
TransformError,
Expand Down Expand Up @@ -165,7 +166,16 @@ async def init(cls, commit: str | None = None, **kwargs: Any) -> Self:
log.info(f"Initialized the local directory for {self.name} because it was missing.")

if commit:
self.get_commit_worktree(commit=commit)
try:
self.get_commit_worktree(commit=commit)
except CommitNotFoundError:
if not self.has_origin:
raise
# The commit may exist on the remote but not yet in this worker's clone.
# Fetch under the repository lock, which serializes shared-clone mutations, and retry.
async with lock.registry.get(name=self.name, namespace="repository"):
await self.fetch()
self.get_commit_worktree(commit=commit)

log.debug(
f"Initiated the object on an existing directory for {self.name}",
Expand Down
125 changes: 95 additions & 30 deletions backend/infrahub/git/repository.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

from cachetools import TTLCache
Expand All @@ -13,7 +14,7 @@

from infrahub import config
from infrahub.core.constants import InfrahubKind, RepositoryInternalStatus, RepositoryOperationalStatus
from infrahub.exceptions import RepositoryError
from infrahub.exceptions import CommitNotFoundError, RepositoryError
from infrahub.git.integrator import InfrahubRepositoryIntegrator
from infrahub.log import get_logger

Expand All @@ -23,6 +24,15 @@
log = get_logger()


@dataclass
class PendingObjectImport:
"""A repository object import waiting to run: which commit to import from and which Infrahub branch to import into."""

infrahub_branch_name: str
commit: str
git_branch_name: str | None = None


class InfrahubRepository(InfrahubRepositoryIntegrator):
"""Primary type of Git repository, with deep integration within Infrahub.

Expand Down Expand Up @@ -66,7 +76,26 @@ async def sync(self, staging_branch: str | None = None) -> None:
By default the sync will focus only on the branches pulled from origin that have some differences with the local one.

Raises:
GraphQLError: When creating a branch in the graph fails for a reason other than the branch already existing.
GraphQLError: When a branch or commit update against the database fails.

"""
for pending in await self.collect_pending_imports(staging_branch=staging_branch):
await self.import_objects_from_files(
infrahub_branch_name=pending.infrahub_branch_name,
git_branch_name=pending.git_branch_name,
commit=pending.commit,
)

async def collect_pending_imports(self, staging_branch: str | None = None) -> list[PendingObjectImport]:
"""Run the git and branch-setup side of a sync and return the imports it produced.

Brings the local clone in line with the remote and records the affected branches and their
commits in the database, pinning a per-commit worktree for each. Returns one entry per branch
whose objects still need importing into the graph. A per-branch git failure is logged and skipped so the
other branches' imports are still returned.

Raises:
GraphQLError: When a branch or commit update against the database fails.

"""
log.info("Starting the synchronization.", repository=self.name)
Expand All @@ -75,8 +104,9 @@ async def sync(self, staging_branch: str | None = None) -> None:

new_branches, updated_branches = await self.compare_local_remote()

pending_imports: list[PendingObjectImport] = []
if not new_branches and not updated_branches:
return
return pending_imports

log.debug(f"New Branches {new_branches}, Updated Branches {updated_branches}", repository=self.name)

Expand All @@ -89,19 +119,30 @@ async def sync(self, staging_branch: str | None = None) -> None:

infrahub_branch = self._get_mapped_target_branch(branch_name=branch_name)
try:
branch = await self.create_branch_in_graph(branch_name=infrahub_branch)
except GraphQLError as exc:
if "already exist" not in exc.errors[0]["message"]:
raise
branch = await self.sdk.branch.get(branch_name=infrahub_branch)

await self.create_branch_in_git(branch_name=branch.name, branch_id=branch.id, push_origin=True)

commit = self.get_commit_value(branch_name=branch_name, remote=False)
self.create_commit_worktree(commit=commit)
await self.update_commit_value(branch_name=infrahub_branch, commit=commit)
try:
branch = await self.create_branch_in_graph(branch_name=infrahub_branch)
except GraphQLError as exc:
if "already exist" not in exc.errors[0]["message"]:
raise
branch = await self.sdk.branch.get(branch_name=infrahub_branch)

await self.create_branch_in_git(branch_name=branch.name, branch_id=branch.id, push_origin=True)

commit = self.get_commit_value(branch_name=branch_name, remote=False)
self.create_commit_worktree(commit=commit)
await self.update_commit_value(branch_name=infrahub_branch, commit=commit)
except (RepositoryError, CommitNotFoundError, GitCommandError, ValueError) as exc:
# Isolate per-branch git failures so imports already collected for the other
# branches are still returned and applied.
log.warning(
"Failed to prepare branch for import, skipping it.",
repository=self.name,
branch=branch_name,
exc_info=exc,
)
continue

await self.import_objects_from_files(infrahub_branch_name=infrahub_branch, commit=commit)
pending_imports.append(PendingObjectImport(infrahub_branch_name=infrahub_branch, commit=commit))

for branch_name in updated_branches:
is_valid = self.validate_remote_branch(branch_name=branch_name)
Expand All @@ -110,9 +151,23 @@ async def sync(self, staging_branch: str | None = None) -> None:

infrahub_branch = self._get_mapped_target_branch(branch_name=branch_name)

commit_after = await self.pull(branch_name=branch_name)
try:
commit_after = await self.pull(branch_name=branch_name)
except (RepositoryError, CommitNotFoundError, GitCommandError, ValueError) as exc:
# Isolate per-branch git failures so imports already collected for the other
# branches are still returned and applied; graph errors are left to propagate.
log.warning(
"Failed to pull branch for import, skipping it.",
repository=self.name,
branch=branch_name,
exc_info=exc,
)
continue

if isinstance(commit_after, str):
await self.import_objects_from_files(infrahub_branch_name=infrahub_branch, commit=commit_after)
pending_imports.append(
PendingObjectImport(infrahub_branch_name=infrahub_branch, commit=commit_after)
)

elif commit_after is True:
log.warning(
Expand All @@ -121,26 +176,36 @@ async def sync(self, staging_branch: str | None = None) -> None:
branch=branch_name,
)

await self._sync_staging(staging_branch=staging_branch, updated_branches=updated_branches)
pending_imports.extend(
await self._collect_staging_imports(staging_branch=staging_branch, updated_branches=updated_branches)
)
return pending_imports

async def _sync_staging(self, staging_branch: str | None, updated_branches: list[str]) -> None:
if (
async def _collect_staging_imports(
self, staging_branch: str | None, updated_branches: list[str]
) -> list[PendingObjectImport]:
if not (
self.internal_status == RepositoryInternalStatus.STAGING.value
and staging_branch
and self.default_branch in updated_branches
):
commit_after = await self.pull(branch_name=self.default_branch)
if isinstance(commit_after, str):
await self.import_objects_from_files(
git_branch_name=self.default_branch, infrahub_branch_name=staging_branch, commit=commit_after
)
return []

elif commit_after is True:
log.warning(
f"An update was detected but the commit remained the same after pull() ({commit_after}).",
repository=self.name,
branch=self.default_branch,
commit_after = await self.pull(branch_name=self.default_branch)
if isinstance(commit_after, str):
return [
PendingObjectImport(
infrahub_branch_name=staging_branch, git_branch_name=self.default_branch, commit=commit_after
)
]

if commit_after is True:
log.warning(
f"An update was detected but the commit remained the same after pull() ({commit_after}).",
repository=self.name,
branch=self.default_branch,
)
return []

async def push(self, branch_name: str) -> bool:
"""Push a given branch to the remote Origin repository."""
Expand Down
Loading
Loading