diff --git a/examples/workspace_run_task.py b/examples/workspace_run_task.py new file mode 100644 index 0000000..2348951 --- /dev/null +++ b/examples/workspace_run_task.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +""" +Workspace Run Task Operations Example + +Demonstrates workspace run task operations: +1. create() - Attach a run task to a workspace +2. list() - List all workspace run tasks for a workspace +3. read() - Read a workspace run task by ID +4. update() - Update enforcement/stage settings +5. delete() - Delete a workspace run task + +Prerequisites: +- Set TFE_TOKEN and TFE_ADDRESS environment variables +- Use valid workspace and run task IDs +""" + +import argparse + +from pytfe import TFEClient, TFEConfig +from pytfe.models import ( + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskUpdateOptions, +) + + +def _find_matching_workspace_run_task(items, run_task_id: str): + for item in items: + if item.run_task and item.run_task.id == run_task_id: + return item + + if len(items) == 1: + return items[0] + + return None + + +def main(): + parser = argparse.ArgumentParser( + description="Workspace run task operations demo for python-tfe SDK" + ) + parser.add_argument( + "--workspace-id", + required=True, + help="Workspace ID (example: ws-abc123)", + ) + parser.add_argument( + "--run-task-id", + required=True, + help="Run task ID (example: task-abc123)", + ) + parser.add_argument( + "--delete-existing", + action="store_true", + help="Allow delete() to remove a workspace run task that existed before this example ran", + ) + args = parser.parse_args() + + client = TFEClient(TFEConfig.from_env()) + + workspace_id = args.workspace_id + run_task_id = args.run_task_id + + if workspace_id == "ws-xxxxxxxx" or run_task_id == "task-xxxxxxxx": + print("Please provide real IDs for --workspace-id and --run-task-id") + return + + print("=" * 80) + print("WORKSPACE RUN TASK OPERATIONS") + print("=" * 80) + + workspace_task = None + created_in_run = False + + print("\n1. create()") + try: + create_options = WorkspaceRunTaskCreateOptions( + enforcement_level="advisory", + run_task={"id": run_task_id}, + stages=["post_plan"], + ) + workspace_task = client.workspace_run_tasks.create(workspace_id, create_options) + created_in_run = True + print(f"Created workspace run task: {workspace_task.id}") + except Exception as exc: + print(f"Create result: {exc}") + + print("\n2. list()") + items = [] + try: + items = list(client.workspace_run_tasks.list(workspace_id)) + print(f"Found {len(items)} workspace run task(s)") + for item in items: + run_task = item.run_task.id if item.run_task else None + print( + f"- {item.id} run_task={run_task} enforcement={item.enforcement_level} stages={item.stages}" + ) + + if workspace_task is None: + workspace_task = _find_matching_workspace_run_task(items, run_task_id) + if workspace_task is not None: + print(f"Using existing workspace run task: {workspace_task.id}") + except Exception as exc: + print(f"List failed: {exc}") + + print("\n3. read()") + if workspace_task is None: + print("Read skipped: no workspace run task ID available") + else: + try: + workspace_task = client.workspace_run_tasks.read( + workspace_id, workspace_task.id + ) + print( + f"Read workspace run task: {workspace_task.id} enforcement={workspace_task.enforcement_level} stages={workspace_task.stages}" + ) + except Exception as exc: + print(f"Read failed: {exc}") + + print("\n4. update()") + if workspace_task is None: + print("Update skipped: no workspace run task ID available") + else: + try: + update_options = WorkspaceRunTaskUpdateOptions( + # enforcement_level="mandatory", + stages=["post_plan", "pre_apply"], + ) + workspace_task = client.workspace_run_tasks.update( + workspace_id, workspace_task.id, update_options + ) + print( + f"Updated workspace run task: {workspace_task.id} enforcement={workspace_task.enforcement_level} stages={workspace_task.stages}" + ) + except Exception as exc: + print(f"Update failed: {exc}") + + print("\n5. delete()") + if workspace_task is None: + print("Delete skipped: no workspace run task ID available") + elif not created_in_run and not args.delete_existing: + print( + "Delete skipped: workspace run task existed before this example. " + "Re-run with --delete-existing to delete it." + ) + else: + try: + client.workspace_run_tasks.delete(workspace_id, workspace_task.id) + print(f"Deleted workspace run task: {workspace_task.id}") + except Exception as exc: + print(f"Delete failed: {exc}") + + print("=" * 80) + print("WORKSPACE RUN TASK OPERATIONS COMPLETED") + print("=" * 80) + + +if __name__ == "__main__": + main() diff --git a/src/pytfe/client.py b/src/pytfe/client.py index 409a36c..63f9286 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -52,6 +52,7 @@ from .resources.variable import Variables from .resources.variable_sets import VariableSets, VariableSetVariables from .resources.workspace_resources import WorkspaceResourcesService +from .resources.workspace_run_task import WorkspaceRunTasks from .resources.workspaces import Workspaces @@ -103,6 +104,7 @@ def __init__(self, config: TFEConfig | None = None): self.variable_set_variables = VariableSetVariables(self._transport) self.workspaces = Workspaces(self._transport) self.workspace_resources = WorkspaceResourcesService(self._transport) + self.workspace_run_tasks = WorkspaceRunTasks(self._transport) self.registry_modules = RegistryModules(self._transport) self.registry_providers = RegistryProviders(self._transport) self.registry_provider_versions = RegistryProviderVersions(self._transport) diff --git a/src/pytfe/errors.py b/src/pytfe/errors.py index 4cfa601..832bb09 100644 --- a/src/pytfe/errors.py +++ b/src/pytfe/errors.py @@ -338,6 +338,13 @@ def __init__(self, message: str = 'category must be "task"'): super().__init__(message) +class InvalidWorkspaceRunTaskIDError(InvalidValues): + """Raised when an invalid workspace run task ID is provided.""" + + def __init__(self, message: str = "invalid value for workspace run task ID"): + super().__init__(message) + + # Run Trigger errors class RequiredRunTriggerListOpsError(RequiredFieldMissing): """Raised when required run trigger list options are missing.""" diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index f7cda57..a756180 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -328,6 +328,10 @@ from .run_task_integration import ( TaskResultStatus as TaskResultCallbackStatus, ) +from .run_task_request import ( + RunTaskRequest, + RunTaskRequestCapabilitites, +) from .run_trigger import ( RunTrigger, RunTriggerCreateOptions, @@ -456,6 +460,15 @@ WorkspaceResource, WorkspaceResourceListOptions, ) +from .workspace_run_task import ( + RunTaskReference, + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskEnforcementLevel, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskStage, + WorkspaceRunTaskUpdateOptions, +) # ── Public surface ──────────────────────────────────────────────────────────── __all__ = [ @@ -684,6 +697,14 @@ # Workspace Resources "WorkspaceResource", "WorkspaceResourceListOptions", + # Workspace Run Tasks + "RunTaskReference", + "WorkspaceRunTask", + "WorkspaceRunTaskEnforcementLevel", + "WorkspaceRunTaskStage", + "WorkspaceRunTaskListOptions", + "WorkspaceRunTaskCreateOptions", + "WorkspaceRunTaskUpdateOptions", "RunQueue", "ReadRunQueueOptions", # Runs @@ -728,6 +749,9 @@ "RunTaskCreateOptions", "RunTaskUpdateOptions", "RunTaskReadOptions", + # Run Task Request + "RunTaskRequest", + "RunTaskRequestCapabilitites", # Task Result "TaskResult", "TaskResultEnforcementLevel", diff --git a/src/pytfe/models/run_task_request.py b/src/pytfe/models/run_task_request.py new file mode 100644 index 0000000..b251046 --- /dev/null +++ b/src/pytfe/models/run_task_request.py @@ -0,0 +1,119 @@ +# Copyright IBM Corp. 2025, 2026 +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + + +class RunTaskRequestCapabilitites(BaseModel): + """Defines the capabilities that the caller supports.""" + + model_config = ConfigDict(populate_by_name=True) + + outcomes: bool = Field(..., description="Whether the caller supports outcomes") + + +class RunTaskRequest(BaseModel): + """Payload object that TFC/E sends to the Run Task's URL. + + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#common-properties + """ + + model_config = ConfigDict(populate_by_name=True) + + access_token: str = Field( + ..., alias="access_token", description="The access token for the run task" + ) + capabilitites: RunTaskRequestCapabilitites = Field( + default_factory=lambda: RunTaskRequestCapabilitites(outcomes=False), + alias="capabilitites", + description="The capabilities that the caller supports", + ) + configuration_version_download_url: str | None = Field( + None, + alias="configuration_version_download_url", + description="The URL to download the configuration version", + ) + configuration_version_id: str | None = Field( + None, + alias="configuration_version_id", + description="The ID of the configuration version", + ) + is_speculative: bool = Field( + ..., alias="is_speculative", description="Whether the run is speculative" + ) + organization_name: str = Field( + ..., alias="organization_name", description="The name of the organization" + ) + payload_version: int = Field( + ..., alias="payload_version", description="The version of the payload format" + ) + plan_json_api_url: str | None = Field( + None, + alias="plan_json_api_url", + description="URL to the plan JSON API (specific to post_plan, pre_apply or post_apply stage)", + ) + run_app_url: str = Field( + ..., alias="run_app_url", description="The URL to the run in the TFC/E UI" + ) + run_created_at: datetime = Field( + ..., alias="run_created_at", description="The time the run was created" + ) + run_created_by: str = Field( + ..., alias="run_created_by", description="The user who created the run" + ) + run_id: str = Field(..., alias="run_id", description="The ID of the run") + run_message: str = Field( + ..., alias="run_message", description="The message associated with the run" + ) + stage: str = Field(..., alias="stage", description="The stage of the run task") + task_result_callback_url: str = Field( + ..., + alias="task_result_callback_url", + description="The URL to call with the task result", + ) + task_result_enforcement_level: str = Field( + ..., + alias="task_result_enforcement_level", + description="The enforcement level of the task result", + ) + task_result_id: str = Field( + ..., alias="task_result_id", description="The ID of the task result" + ) + vcs_branch: str | None = Field( + None, alias="vcs_branch", description="The VCS branch associated with the run" + ) + vcs_commit_url: str | None = Field( + None, + alias="vcs_commit_url", + description="The URL of the VCS commit associated with the run", + ) + vcs_pull_request_url: str | None = Field( + None, + alias="vcs_pull_request_url", + description="The URL of the VCS pull request associated with the run", + ) + vcs_repo_url: str | None = Field( + None, + alias="vcs_repo_url", + description="The URL of the VCS repository associated with the run", + ) + workspace_app_url: str = Field( + ..., + alias="workspace_app_url", + description="The URL to the workspace in the TFC/E UI", + ) + workspace_id: str = Field( + ..., alias="workspace_id", description="The ID of the workspace" + ) + workspace_name: str = Field( + ..., alias="workspace_name", description="The name of the workspace" + ) + workspace_working_directory: str | None = Field( + None, + alias="workspace_working_directory", + description="The working directory configured for the workspace", + ) diff --git a/src/pytfe/models/workspace_run_task.py b/src/pytfe/models/workspace_run_task.py index f29695e..61577e6 100644 --- a/src/pytfe/models/workspace_run_task.py +++ b/src/pytfe/models/workspace_run_task.py @@ -3,8 +3,143 @@ from __future__ import annotations -from pydantic import BaseModel +from enum import Enum + +from pydantic import BaseModel, ConfigDict, Field, field_validator + +from ..errors import InvalidRunTaskIDError +from .workspace import Workspace + + +def _normalize_stage_value(value: str) -> str: + """Normalize stage names to the API's underscore format.""" + return value.replace("-", "_") + + +class WorkspaceRunTaskStage(str, Enum): + PRE_PLAN = "pre_plan" + POST_PLAN = "post_plan" + PRE_APPLY = "pre_apply" + POST_APPLY = "post_apply" + + +class WorkspaceRunTaskEnforcementLevel(str, Enum): + ADVISORY = "advisory" + MANDATORY = "mandatory" + + +class RunTaskReference(BaseModel): + """Reference model for run task relationships.""" + + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str = Field(..., description="Run task ID") class WorkspaceRunTask(BaseModel): + """Workspace run task model.""" + + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + id: str + enforcement_level: WorkspaceRunTaskEnforcementLevel | None = Field( + default=None, validation_alias="enforcement-level" + ) + stages: list[WorkspaceRunTaskStage] = Field( + default_factory=list, validation_alias="stages" + ) + run_task: RunTaskReference | None = Field(default=None, validation_alias="run-task") + workspace: Workspace | None = Field(default=None, validation_alias="workspace") + + @field_validator("stages", mode="before") + @classmethod + def normalize_stages( + cls, value: list[str | WorkspaceRunTaskStage] | None + ) -> list[str]: + if value is None: + return [] + return [ + _normalize_stage_value( + item.value if isinstance(item, WorkspaceRunTaskStage) else item + ) + for item in value + ] + + +class WorkspaceRunTaskListOptions(BaseModel): + """Options for listing workspace run tasks.""" + + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + page_size: int | None = Field(default=None, alias="page[size]") + + +class WorkspaceRunTaskCreateOptions(BaseModel): + """Options for creating a workspace run task.""" + + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + type: str = Field(default="workspace-tasks") + enforcement_level: WorkspaceRunTaskEnforcementLevel = Field( + ..., alias="enforcement-level" + ) + run_task: RunTaskReference = Field(..., alias="run-task") + stages: list[WorkspaceRunTaskStage] | None = Field(default=None, alias="stages") + + @field_validator("run_task", mode="before") + @classmethod + def normalize_run_task( + cls, value: RunTaskReference | dict | str + ) -> RunTaskReference | dict | str: + if isinstance(value, RunTaskReference): + return value + if isinstance(value, str): + return RunTaskReference(id=value) + if isinstance(value, dict) and isinstance(value.get("id"), str): + return RunTaskReference(id=value["id"]) + return value + + def validate_for_create(self) -> None: + """Validate create options.""" + if not self.run_task.id: + raise InvalidRunTaskIDError() + + @field_validator("stages", mode="before") + @classmethod + def normalize_stages( + cls, value: list[str | WorkspaceRunTaskStage] | None + ) -> list[str] | None: + if value is None: + return None + return [ + _normalize_stage_value( + item.value if isinstance(item, WorkspaceRunTaskStage) else item + ) + for item in value + ] + + +class WorkspaceRunTaskUpdateOptions(BaseModel): + """Options for updating a workspace run task.""" + + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + type: str = Field(default="workspace-tasks") + enforcement_level: WorkspaceRunTaskEnforcementLevel | None = Field( + default=None, alias="enforcement-level" + ) + stages: list[WorkspaceRunTaskStage] | None = Field(default=None, alias="stages") + + @field_validator("stages", mode="before") + @classmethod + def normalize_stages( + cls, value: list[str | WorkspaceRunTaskStage] | None + ) -> list[str] | None: + if value is None: + return None + return [ + _normalize_stage_value( + item.value if isinstance(item, WorkspaceRunTaskStage) else item + ) + for item in value + ] diff --git a/src/pytfe/resources/workspace_run_task.py b/src/pytfe/resources/workspace_run_task.py new file mode 100644 index 0000000..5c285bc --- /dev/null +++ b/src/pytfe/resources/workspace_run_task.py @@ -0,0 +1,173 @@ +# Copyright IBM Corp. 2025, 2026 +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any +from urllib.parse import quote + +from ..errors import ( + InvalidRunTaskIDError, + InvalidWorkspaceIDError, + InvalidWorkspaceRunTaskIDError, +) +from ..models.workspace import Workspace +from ..models.workspace_run_task import ( + RunTaskReference, + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskEnforcementLevel, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskStage, + WorkspaceRunTaskUpdateOptions, +) +from ..utils import _safe_str, valid_string_id +from ._base import _Service + + +def _workspace_run_task_from(data: dict[str, Any]) -> WorkspaceRunTask: + """Convert API response data to WorkspaceRunTask model.""" + attributes = data.get("attributes", {}) or {} + relationships = data.get("relationships", {}) or {} + + run_task = None + run_task_data = relationships.get("task", {}).get("data") + if isinstance(run_task_data, dict) and run_task_data.get("id"): + run_task = RunTaskReference(id=_safe_str(run_task_data.get("id"))) + + workspace = None + workspace_data = relationships.get("workspace", {}).get("data") + if isinstance(workspace_data, dict) and workspace_data.get("id"): + workspace = Workspace.model_construct(id=_safe_str(workspace_data.get("id"))) + + enforcement_level = None + raw_enforcement = attributes.get("enforcement-level") + if isinstance(raw_enforcement, str): + try: + enforcement_level = WorkspaceRunTaskEnforcementLevel(raw_enforcement) + except ValueError: + enforcement_level = None + + stages: list[WorkspaceRunTaskStage] = [] + for raw_stage in attributes.get("stages", []): + if not isinstance(raw_stage, str): + continue + try: + stages.append(WorkspaceRunTaskStage(raw_stage)) + except ValueError: + continue + + return WorkspaceRunTask( + id=_safe_str(data.get("id")), + enforcement_level=enforcement_level, + stages=stages, + run_task=run_task, + workspace=workspace, + ) + + +class WorkspaceRunTasks(_Service): + """Workspace run tasks service.""" + + def create( + self, workspace_id: str, options: WorkspaceRunTaskCreateOptions + ) -> WorkspaceRunTask: + """Attach a run task to a workspace.""" + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError() + + options.validate_for_create() + + body: dict[str, Any] = { + "data": { + "type": "workspace-tasks", + "attributes": { + "enforcement-level": options.enforcement_level, + }, + "relationships": { + "task": {"data": {"type": "tasks", "id": options.run_task.id}} + }, + } + } + + if options.stages is not None: + body["data"]["attributes"]["stages"] = options.stages + + path = f"/api/v2/workspaces/{quote(workspace_id)}/tasks" + response = self.t.request("POST", path, json_body=body) + return _workspace_run_task_from(response.json()["data"]) + + def list( + self, + workspace_id: str, + options: WorkspaceRunTaskListOptions | None = None, + ) -> Iterator[WorkspaceRunTask]: + """List all workspace run tasks for a workspace.""" + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError() + + params = options.model_dump(by_alias=True, exclude_none=True) if options else {} + path = f"/api/v2/workspaces/{quote(workspace_id)}/tasks" + for item in self._list(path, params=params): + yield _workspace_run_task_from(item) + + def read(self, workspace_id: str, workspace_task_id: str) -> WorkspaceRunTask: + """Read a workspace run task by ID.""" + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError() + if not valid_string_id(workspace_task_id): + raise InvalidWorkspaceRunTaskIDError() + + path = ( + f"/api/v2/workspaces/{quote(workspace_id)}/tasks/{quote(workspace_task_id)}" + ) + response = self.t.request("GET", path) + return _workspace_run_task_from(response.json()["data"]) + + def update( + self, + workspace_id: str, + workspace_task_id: str, + options: WorkspaceRunTaskUpdateOptions, + ) -> WorkspaceRunTask: + """Update a workspace run task by ID.""" + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError() + if not valid_string_id(workspace_task_id): + raise InvalidWorkspaceRunTaskIDError() + + attributes = options.model_dump( + by_alias=True, exclude_none=True, exclude={"type"} + ) + body: dict[str, Any] = { + "data": { + "type": "workspace-tasks", + "id": workspace_task_id, + "attributes": attributes, + } + } + + path = ( + f"/api/v2/workspaces/{quote(workspace_id)}/tasks/{quote(workspace_task_id)}" + ) + response = self.t.request("PATCH", path, json_body=body) + return _workspace_run_task_from(response.json()["data"]) + + def delete(self, workspace_id: str, workspace_task_id: str) -> None: + """Delete a workspace run task by ID.""" + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError() + if not valid_string_id(workspace_task_id): + raise InvalidWorkspaceRunTaskIDError() + + path = ( + f"/api/v2/workspaces/{quote(workspace_id)}/tasks/{quote(workspace_task_id)}" + ) + self.t.request("DELETE", path) + + +def _validate_run_task_id(run_task_id: str) -> None: + """Backward-compatible helper for direct ID validation usage in tests/callers.""" + if not valid_string_id(run_task_id): + raise InvalidRunTaskIDError() diff --git a/tests/units/test_workspace_run_task.py b/tests/units/test_workspace_run_task.py new file mode 100644 index 0000000..11f47b0 --- /dev/null +++ b/tests/units/test_workspace_run_task.py @@ -0,0 +1,207 @@ +# Copyright IBM Corp. 2025, 2026 +# SPDX-License-Identifier: MPL-2.0 + +"""Unit tests for workspace run tasks.""" + +from unittest.mock import Mock + +import pytest + +from pytfe._http import HTTPTransport +from pytfe.errors import InvalidWorkspaceIDError, InvalidWorkspaceRunTaskIDError +from pytfe.models.workspace_run_task import ( + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskUpdateOptions, +) +from pytfe.resources.workspace_run_task import ( + WorkspaceRunTasks, + _workspace_run_task_from, +) + + +class TestWorkspaceRunTaskFrom: + def test_workspace_run_task_from_full(self): + data = { + "id": "wst-123", + "attributes": { + "enforcement-level": "mandatory", + "stage": "post_plan", + "stages": ["post_plan", "pre_apply"], + }, + "relationships": { + "task": {"data": {"id": "task-123", "type": "tasks"}}, + "workspace": {"data": {"id": "ws-123", "type": "workspaces"}}, + }, + } + + result = _workspace_run_task_from(data) + + assert isinstance(result, WorkspaceRunTask) + assert result.id == "wst-123" + assert result.enforcement_level == "mandatory" + assert result.stages == ["post_plan", "pre_apply"] + assert result.run_task is not None + assert result.run_task.id == "task-123" + assert result.workspace is not None + assert result.workspace.id == "ws-123" + + +class TestWorkspaceRunTasks: + @pytest.fixture + def mock_transport(self): + return Mock(spec=HTTPTransport) + + @pytest.fixture + def workspace_run_tasks_service(self, mock_transport): + return WorkspaceRunTasks(mock_transport) + + def test_create_success(self, workspace_run_tasks_service, mock_transport): + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wst-1", + "attributes": { + "enforcement-level": "advisory", + "stages": ["post_plan"], + }, + "relationships": { + "task": {"data": {"id": "task-1", "type": "tasks"}}, + "workspace": {"data": {"id": "ws-1", "type": "workspaces"}}, + }, + } + } + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskCreateOptions( + enforcement_level="advisory", + run_task={"id": "task-1"}, + stages=["post_plan"], + ) + + result = workspace_run_tasks_service.create("ws-1", options) + + assert isinstance(result, WorkspaceRunTask) + assert result.id == "wst-1" + call_args = mock_transport.request.call_args + assert call_args[0][0] == "POST" + assert call_args[0][1] == "/api/v2/workspaces/ws-1/tasks" + + def test_create_validation_errors(self, workspace_run_tasks_service): + options = WorkspaceRunTaskCreateOptions( + enforcement_level="advisory", + run_task={"id": "task-1"}, + ) + + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.create("", options) + + def test_list_success(self, workspace_run_tasks_service): + workspace_run_tasks_service._list = Mock( + return_value=[ + { + "id": "wst-1", + "attributes": {"enforcement-level": "advisory", "stages": []}, + "relationships": {}, + }, + { + "id": "wst-2", + "attributes": { + "enforcement-level": "mandatory", + "stages": ["pre_apply"], + }, + "relationships": {}, + }, + ] + ) + + options = WorkspaceRunTaskListOptions(page_size=10) + items = list(workspace_run_tasks_service.list("ws-1", options)) + + workspace_run_tasks_service._list.assert_called_once_with( + "/api/v2/workspaces/ws-1/tasks", + params={"page[size]": 10}, + ) + assert len(items) == 2 + assert items[0].id == "wst-1" + assert items[1].id == "wst-2" + + def test_list_validation_error(self, workspace_run_tasks_service): + with pytest.raises(InvalidWorkspaceIDError): + list(workspace_run_tasks_service.list("")) + + def test_read_success(self, workspace_run_tasks_service, mock_transport): + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wst-1", + "attributes": { + "enforcement-level": "advisory", + "stages": ["post_plan"], + }, + "relationships": {}, + } + } + mock_transport.request.return_value = mock_response + + result = workspace_run_tasks_service.read("ws-1", "wst-1") + + assert isinstance(result, WorkspaceRunTask) + assert result.id == "wst-1" + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/workspaces/ws-1/tasks/wst-1" + ) + + def test_read_validation_error(self, workspace_run_tasks_service): + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.read("", "wst-1") + with pytest.raises(InvalidWorkspaceRunTaskIDError): + workspace_run_tasks_service.read("ws-1", "") + + def test_update_success(self, workspace_run_tasks_service, mock_transport): + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "wst-1", + "attributes": { + "enforcement-level": "mandatory", + "stages": ["post_plan", "pre_apply"], + }, + "relationships": {}, + } + } + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskUpdateOptions( + enforcement_level="mandatory", + stages=["post_plan", "pre_apply"], + ) + result = workspace_run_tasks_service.update("ws-1", "wst-1", options) + + assert isinstance(result, WorkspaceRunTask) + assert result.enforcement_level == "mandatory" + call_args = mock_transport.request.call_args + assert call_args[0][0] == "PATCH" + assert call_args[0][1] == "/api/v2/workspaces/ws-1/tasks/wst-1" + + def test_update_validation_error(self, workspace_run_tasks_service): + options = WorkspaceRunTaskUpdateOptions(enforcement_level="mandatory") + + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.update("", "wst-1", options) + with pytest.raises(InvalidWorkspaceRunTaskIDError): + workspace_run_tasks_service.update("ws-1", "", options) + + def test_delete_success(self, workspace_run_tasks_service, mock_transport): + workspace_run_tasks_service.delete("ws-1", "wst-1") + + mock_transport.request.assert_called_once_with( + "DELETE", "/api/v2/workspaces/ws-1/tasks/wst-1" + ) + + def test_delete_validation_error(self, workspace_run_tasks_service): + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.delete("", "wst-1") + with pytest.raises(InvalidWorkspaceRunTaskIDError): + workspace_run_tasks_service.delete("ws-1", "")