Skip to content
36 changes: 36 additions & 0 deletions examples/task_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os

from pytfe import TFEClient


def main():
token = os.getenv("TFE_TOKEN")
task_result_id = os.getenv("TFE_TASK_RESULT_ID")

if not token:
print("Set TFE_TOKEN")
return

if not task_result_id:
print("Set TFE_TASK_RESULT_ID")
return

client = TFEClient()

try:
result = client.task_results.read(task_result_id)

print("=== Task Result ===")
print(f"ID: {result.id}")
print(f"Status: {result.status}")
print(f"Message: {result.message}")
print(f"Task Name: {result.task_name}")
print(f"URL: {result.url}")
print(f"Task Stage: {result.task_stage.id if result.task_stage else None}")

except Exception as e:
print(f"Error: {e}")


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions src/pytfe/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from .resources.stack_configuration import StackConfigurations
from .resources.state_version_outputs import StateVersionOutputs
from .resources.state_versions import StateVersions
from .resources.task_result import TaskResults
from .resources.team import Teams
from .resources.team_project_access import TeamProjectAccesses
from .resources.team_token import TeamTokens
Expand Down Expand Up @@ -93,6 +94,7 @@ def __init__(self, config: TFEConfig | None = None):
) # org Explorer queries and saved views

self.users = Users(self._transport)
self.task_results = TaskResults(self._transport)
self.organization_tags = OrganizationTags(self._transport)
self.organization_tokens = OrganizationTokens(self._transport)
self.projects = Projects(self._transport)
Expand Down
28 changes: 28 additions & 0 deletions src/pytfe/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,17 @@
StateVersionOutput,
StateVersionOutputsListOptions,
)

# ── Task Result ───────────────────────────────────────────────────────────────
from .task_result import (
TaskEnforcementLevel as TaskResultEnforcementLevel,
)
from .task_result import (
TaskResult,
TaskResultStatus,
TaskResultStatusTimestamps,
)
from .task_stage import TaskStage
from .team import (
OrganizationAccess,
Team,
Expand Down Expand Up @@ -717,6 +728,12 @@
"RunTaskCreateOptions",
"RunTaskUpdateOptions",
"RunTaskReadOptions",
# Task Result
"TaskResult",
"TaskResultEnforcementLevel",
"TaskResultStatus",
"TaskResultStatusTimestamps",
"TaskStage",
# Run task integration (callback)
"TaskResultCallbackRequestOptions",
"TaskResultCallbackStatus",
Expand Down Expand Up @@ -809,3 +826,14 @@
RegistryProvider.model_rebuild()
RegistryProviderVersion.model_rebuild()
RegistryProviderPlatform.model_rebuild()

# Rebuild TaskResult to resolve Run, Workspace, PolicyEvaluation, TaskStage refs
TaskResult.model_rebuild(
raise_errors=False,
_types_namespace={
"PolicyEvaluation": PolicyEvaluation,
"Run": Run,
"TaskStage": TaskStage,
"Workspace": Workspace,
},
)
117 changes: 117 additions & 0 deletions src/pytfe/models/task_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright IBM Corp. 2025, 2026
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations

from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, ConfigDict, Field

if TYPE_CHECKING:
# Imported only for type checking to avoid circular imports.
from pytfe.models.policy_evaluation import PolicyEvaluation
from pytfe.models.run import Run
from pytfe.models.task_stage import TaskStage
from pytfe.models.workspace import Workspace


class TaskResultStatus(str, Enum):
passed = "passed"
failed = "failed"
pending = "pending"
running = "running"
unreachable = "unreachable"
errored = "errored"


class TaskEnforcementLevel(str, Enum):
advisory = "advisory"
mandatory = "mandatory"


class TaskResultStatusTimestamps(BaseModel):
model_config = ConfigDict(populate_by_name=True)

errored_at: datetime | None = Field(None, alias="errored-at")
running_at: datetime | None = Field(None, alias="running-at")
canceled_at: datetime | None = Field(None, alias="canceled-at")
failed_at: datetime | None = Field(None, alias="failed-at")
passed_at: datetime | None = Field(None, alias="passed-at")


class TaskResult(BaseModel):
Comment thread
isivaselvan marked this conversation as resolved.
model_config = ConfigDict(populate_by_name=True)

id: str

status: TaskResultStatus | None = Field(None, alias="status")
message: str | None = Field(None, alias="message")

status_timestamps: TaskResultStatusTimestamps | None = Field(
None,
alias="status-timestamps",
)

url: str | None = Field(None, alias="url")

created_at: datetime | None = Field(None, alias="created-at")
updated_at: datetime | None = Field(None, alias="updated-at")

task_id: str | None = Field(None, alias="task-id")
task_name: str | None = Field(None, alias="task-name")
task_url: str | None = Field(None, alias="task-url")

workspace_task_id: str | None = Field(None, alias="workspace-task-id")

workspace_task_enforcement_level: TaskEnforcementLevel | None = Field(
None,
alias="workspace-task-enforcement-level",
)

agent_pool_id: str | None = Field(None, alias="agent-pool-id")

# Relationships
# Forward-referenced to avoid circular imports; resolved lazily below.
task_stage: TaskStage | None = Field(None, alias="task-stage")
run: Run | None = Field(None, alias="run")
workspace: Workspace | None = Field(None, alias="workspace")
policy_evaluations: list[PolicyEvaluation] | None = Field(
None,
alias="policy-evaluations",
)

@classmethod
def model_validate(cls, *args: Any, **kwargs: Any) -> TaskResult:
# Ensure the TaskStage forward reference is resolved before validating.
# The import-time rebuild may run while task_stage.py is still
# partially loaded (circular import), in which case we retry here.
if not getattr(cls, "__pydantic_complete__", True):
_rebuild_task_result_model()
return super().model_validate(*args, **kwargs)


def _rebuild_task_result_model() -> None:
# Resolve all forward references once all modules are loaded.
try:
from pytfe.models.policy_evaluation import PolicyEvaluation
from pytfe.models.run import Run
from pytfe.models.task_stage import TaskStage
from pytfe.models.workspace import Workspace

TaskResult.model_rebuild(
raise_errors=False,
_types_namespace={
"PolicyEvaluation": PolicyEvaluation,
"Run": Run,
"TaskStage": TaskStage,
"Workspace": Workspace,
},
)
except Exception:
# One or more models not yet importable during partial init; safe to skip.
pass


_rebuild_task_result_model()
2 changes: 1 addition & 1 deletion src/pytfe/models/task_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

# TaskStage represents a HCP Terraform or Terraform Enterprise run's stage where run tasks can occur
class TaskStage(BaseModel):
model_config = ConfigDict(populate_by_name=True, validate_by_name=True)
model_config = ConfigDict(populate_by_name=True)

id: str
# stage: Stage = Field(..., alias="stage")
Expand Down
77 changes: 77 additions & 0 deletions src/pytfe/resources/task_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright IBM Corp. 2025, 2026
# SPDX-License-Identifier: MPL-2.0

from typing import Any

from pytfe.models.policy_evaluation import PolicyEvaluation
from pytfe.models.run import Run
from pytfe.models.task_result import TaskResult
from pytfe.models.task_stage import TaskStage
from pytfe.models.workspace import Workspace
from pytfe.utils import valid_string_id

from ._base import _Service


class TaskResults(_Service):
def read(self, task_result_id: str) -> TaskResult:
if not valid_string_id(task_result_id):
raise ValueError("Invalid task_result_id")

path = f"/api/v2/task-results/{task_result_id}"

response = self.t.request("GET", path)
data = response.json()

if "data" not in data:
raise ValueError("Invalid response format")

return self._parse_task_result(data["data"])

def _parse_task_result(self, data: dict[str, Any]) -> TaskResult:
Comment thread
isivaselvan marked this conversation as resolved.
# Ensure forward references in TaskResult are resolved before use.
TaskResult.model_rebuild(
raise_errors=False,
_types_namespace={
"PolicyEvaluation": PolicyEvaluation,
"Run": Run,
"TaskStage": TaskStage,
"Workspace": Workspace,
},
)

attributes = data.get("attributes", {})
attributes["id"] = data.get("id")

relationships = data.get("relationships", {})

# Map task-stage relationship into the TaskStage SDK model.
task_stage_data = relationships.get("task-stage", {}).get("data")
if task_stage_data:
attributes["task-stage"] = TaskStage.model_validate(task_stage_data)
else:
attributes["task-stage"] = None

# Map run relationship into the Run SDK model.
run_data = relationships.get("run", {}).get("data")
if run_data:
attributes["run"] = Run.model_validate(run_data)
else:
attributes["run"] = None

# Map workspace relationship into the Workspace SDK model.
workspace_data = relationships.get("workspace", {}).get("data")
if workspace_data:
attributes["workspace"] = Workspace.model_validate(workspace_data)
else:
attributes["workspace"] = None

# Map policy-evaluations relationship into a list of PolicyEvaluation models.
policy_evaluations_data = relationships.get("policy-evaluations", {}).get(
"data", []
)
attributes["policy-evaluations"] = [
PolicyEvaluation.model_validate(pe) for pe in policy_evaluations_data
]

return TaskResult.model_validate(attributes)
Loading
Loading