From 07601acc77d81dc87fcc84ea6a923c3cea573b11 Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Wed, 17 Dec 2025 22:10:34 +0530 Subject: [PATCH 1/4] refactor(workspace): pydantic to validate data --- examples/workspace.py | 13 +- src/pytfe/models/__init__.py | 2 - src/pytfe/models/workspace.py | 595 +++++++++++++++++++---------- src/pytfe/resources/run_trigger.py | 8 +- src/pytfe/resources/workspaces.py | 504 ++++++++---------------- src/pytfe/utils.py | 103 +---- 6 files changed, 545 insertions(+), 680 deletions(-) diff --git a/examples/workspace.py b/examples/workspace.py index 4dfb6435..21c23f71 100644 --- a/examples/workspace.py +++ b/examples/workspace.py @@ -1,15 +1,6 @@ """ Terraform Cloud/Enterprise Workspace Management Example -This comprehensive example demonstrates 38 workspace operations using the python-tfe SDK, -providing a complete command-line interface for managing TFE workspaces with advanced -operations including create, read, update, delete, lock/unlock, tag management, VCS -integration, SSH keys, remote state, data retention, and filtering capabilities. - -API Coverage: 38/38 workspace methods (100% coverage) -Testing Status: All operations tested and validated -Organization: Logically grouped into 16 sections for easy navigation - Prerequisites: - Set TFE_TOKEN environment variable with your Terraform Cloud API token - Ensure you have access to the target organization @@ -133,7 +124,6 @@ def main(): parser.add_argument("--all-tests", action="store_true", help="Run all method tests") # Listing and Filtering - parser.add_argument("--page", type=int, default=1, help="Page number for listing") parser.add_argument( "--page-size", type=int, default=10, help="Page size for listing" ) @@ -154,7 +144,6 @@ def main(): try: # Create options for listing workspaces with pagination and filters options = WorkspaceListOptions( - page_number=args.page, page_size=args.page_size, search=args.search, tags=args.tags, @@ -163,7 +152,7 @@ def main(): project_id=args.project_id, ) print( - f"Fetching workspaces from organization '{args.org}' (page {args.page}, size {args.page_size})..." + f"Fetching workspaces from organization '{args.org}', size {args.page_size})..." ) # Get workspaces and convert to list safely diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index 71faee66..fa03b2db 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -352,7 +352,6 @@ WorkspaceAssignSSHKeyOptions, WorkspaceCreateOptions, WorkspaceIncludeOpt, - WorkspaceList, WorkspaceListOptions, WorkspaceListRemoteStateConsumersOptions, WorkspaceLockOptions, @@ -534,7 +533,6 @@ "WorkspaceAssignSSHKeyOptions", "WorkspaceCreateOptions", "WorkspaceIncludeOpt", - "WorkspaceList", "WorkspaceListOptions", "WorkspaceListRemoteStateConsumersOptions", "WorkspaceLockOptions", diff --git a/src/pytfe/models/workspace.py b/src/pytfe/models/workspace.py index dca54b0f..64391634 100644 --- a/src/pytfe/models/workspace.py +++ b/src/pytfe/models/workspace.py @@ -2,73 +2,175 @@ from datetime import datetime from enum import Enum -from typing import Any +from typing import TYPE_CHECKING, Any + +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from ..errors import ( + InvalidNameError, + RequiredAgentModeError, + RequiredAgentPoolIDError, + RequiredNameError, + UnsupportedBothTagsRegexAndFileTriggersEnabledError, + UnsupportedBothTagsRegexAndTriggerPatternsError, + UnsupportedBothTagsRegexAndTriggerPrefixesError, + UnsupportedBothTriggerPatternsAndPrefixesError, + UnsupportedOperationsError, +) +from ..utils import has_tags_regex_defined, is_valid_workspace_name, valid_string +from .agent import AgentPool +from .common import EffectiveTagBinding, Tag, TagBinding +from .data_retention_policy import DataRetentionPolicyChoice +from .organization import ExecutionMode, Organization +from .project import Project -from pydantic import BaseModel, Field +if TYPE_CHECKING: + from .run import Run -from .common import EffectiveTagBinding, Pagination, Tag, TagBinding -from .data_retention_policy import DataRetentionPolicy, DataRetentionPolicyChoice -from .organization import ExecutionMode -from .project import Project + +# Helper classes that need to be defined before Workspace +class WorkspaceSource(str, Enum): + API = "tfe-api" + MODULE = "tfe-module" + UI = "tfe-ui" + TERRAFORM = "terraform" + + +class WorkspaceActions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + is_destroyable: bool = Field(default=False, alias="is-destroyable") + + +class WorkspacePermissions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + can_destroy: bool = Field(default=False, alias="can-destroy") + can_force_unlock: bool = Field(default=False, alias="can-force-unlock") + can_lock: bool = Field(default=False, alias="can-lock") + can_manage_run_tasks: bool = Field(default=False, alias="can-manage-run-tasks") + can_queue_apply: bool = Field(default=False, alias="can-queue-apply") + can_queue_destroy: bool = Field(default=False, alias="can-queue-destroy") + can_queue_run: bool = Field(default=False, alias="can-queue-run") + can_read_settings: bool = Field(default=False, alias="can-read-settings") + can_unlock: bool = Field(default=False, alias="can-unlock") + can_update: bool = Field(default=False, alias="can-update") + can_update_variable: bool = Field(default=False, alias="can-update-variable") + can_force_delete: bool | None = Field(default=None, alias="can-force-delete") + + +class WorkspaceSettingOverwrites(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + execution_mode: bool | None = Field(None, alias="execution-mode") + agent_pool: bool | None = Field(None, alias="agent-pool") + + +class WorkspaceOutputs(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str + name: str | None = Field(default=None, alias="name") + sensitive: bool = Field(default=False, alias="sensitive") + output_type: str | None = Field(default=None, alias="output-type") + value: Any | None = Field(default=None, alias="value") + + +class LockedByChoice(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + run: Any | None = None + user: Any | None = None + team: Any | None = None + + +class VCSRepo(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + branch: str | None = Field(default=None, alias="branch") + display_identifier: str | None = Field(default=None, alias="display-identifier") + identifier: str | None = Field(default=None, alias="identifier") + ingress_submodules: bool | None = Field(default=None, alias="ingress-submodules") + oauth_token_id: str | None = Field(default=None, alias="oauth-token-id") + tags_regex: str | None = Field(default=None, alias="tags-regex") + gha_installation_id: str | None = Field( + default=None, alias="github-app-installation-id" + ) + repository_http_url: str | None = Field(default=None, alias="repository-http-url") + service_provider: str | None = Field(default=None, alias="service-provider") + tags: bool | None = Field(default=None, alias="tags") + webhook_url: str | None = Field(default=None, alias="webhook-url") + tag_prefix: str | None = Field(default=None, alias="tag-prefix") + source_directory: str | None = Field(default=None, alias="source-directory") class Workspace(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + id: str - name: str | None = None - organization: str | None = None - execution_mode: ExecutionMode | None = None - project_id: str | None = None + name: str | None = Field(None, alias="name") # Core attributes - actions: WorkspaceActions | None = None - allow_destroy_plan: bool = False - assessments_enabled: bool = False - auto_apply: bool = False - auto_apply_run_trigger: bool = False - auto_destroy_at: datetime | None = None - auto_destroy_activity_duration: str | None = None - can_queue_destroy_plan: bool = False - created_at: datetime | None = None - description: str = "" - environment: str = "" - file_triggers_enabled: bool = False - global_remote_state: bool = False - inherits_project_auto_destroy: bool = False - locked: bool = False - migration_environment: str = "" - no_code_upgrade_available: bool = False - operations: bool = False - permissions: WorkspacePermissions | None = None - queue_all_runs: bool = False - speculative_enabled: bool = False - source: WorkspaceSource | None = None - source_name: str = "" - source_url: str = "" - structured_run_output_enabled: bool = False - terraform_version: str = "" - trigger_prefixes: list[str] = Field(default_factory=list) - trigger_patterns: list[str] = Field(default_factory=list) - vcs_repo: VCSRepo | None = None - working_directory: str = "" - updated_at: datetime | None = None - resource_count: int = 0 - apply_duration_average: float | None = None # in seconds - plan_duration_average: float | None = None # in seconds - policy_check_failures: int = 0 - run_failures: int = 0 - runs_count: int = 0 - tag_names: list[str] = Field(default_factory=list) - setting_overwrites: WorkspaceSettingOverwrites | None = None + actions: WorkspaceActions | None = Field(None, alias="actions") + allow_destroy_plan: bool | None = Field(None, alias="allow-destroy-plan") + assessments_enabled: bool | None = Field(None, alias="assessments-enabled") + auto_apply: bool | None = Field(None, alias="auto-apply") + auto_apply_run_trigger: bool | None = Field(None, alias="auto-apply-run-trigger") + auto_destroy_at: datetime | None = Field(None, alias="auto-destroy-at") + auto_destroy_activity_duration: str | None = Field( + None, alias="auto-destroy-activity-duration" + ) + can_queue_destroy_plan: bool | None = Field(None, alias="can-queue-destroy-plan") + created_at: datetime | None = Field(None, alias="created-at") + description: str | None = Field(None, alias="description") + environment: str | None = Field(None, alias="environment") + execution_mode: ExecutionMode | None = Field(None, alias="execution-mode") + file_triggers_enabled: bool | None = Field(None, alias="file-triggers-enabled") + global_remote_state: bool | None = Field(None, alias="global-remote-state") + inherits_project_auto_destroy: bool | None = Field( + None, alias="inherits-project-auto-destroy" + ) + locked: bool | None = Field(None, alias="locked") + migration_environment: str | None = Field(None, alias="migration-environment") + no_code_upgrade_available: bool | None = Field( + None, alias="no-code-upgrade-available" + ) + operations: bool | None = Field(None, alias="operations") + permissions: WorkspacePermissions | None = Field(None, alias="permissions") + queue_all_runs: bool | None = Field(None, alias="queue-all-runs") + speculative_enabled: bool | None = Field(None, alias="speculative-enabled") + source: WorkspaceSource | None = Field(None, alias="source") + source_name: str | None = Field(None, alias="source-name") + source_url: str | None = Field(None, alias="source-url") + structured_run_output_enabled: bool | None = Field( + None, alias="structured-run-output-enabled" + ) + terraform_version: str | None = Field(None, alias="terraform-version") + trigger_prefixes: list[str] = Field(default_factory=list, alias="trigger-prefixes") + trigger_patterns: list[str] = Field(default_factory=list, alias="trigger-patterns") + vcs_repo: VCSRepo | None = Field(None, alias="vcs-repo") + working_directory: str | None = Field(None, alias="working-directory") + updated_at: datetime | None = Field(None, alias="updated-at") + resource_count: int | None = Field(None, alias="resource-count") + apply_duration_average: float | None = Field(None, alias="apply-duration-average") + plan_duration_average: float | None = Field(None, alias="plan-duration-average") + policy_check_failures: int | None = Field(None, alias="policy-check-failures") + run_failures: int | None = Field(None, alias="run-failures") + runs_count: int | None = Field(None, alias="workspace-kpis-runs-count") + tag_names: list[str] = Field(default_factory=list, alias="tag-names") + setting_overwrites: WorkspaceSettingOverwrites | None = Field( + None, alias="setting-overwrites" + ) # Relations - agent_pool: Any | None = None # AgentPool object - current_run: Any | None = None # Run object + agent_pool: AgentPool | None = None # AgentPool object + current_run: Run | None = None # Run object current_state_version: Any | None = None # StateVersion object + organization: Organization | None = None project: Project | None = None ssh_key: Any | None = None # SSHKey object outputs: list[WorkspaceOutputs] = Field(default_factory=list) tags: list[Tag] = Field(default_factory=list) - # tags: list[Tag] = Field(default_factory=list) current_configuration_version: Any | None = None # ConfigurationVersion object locked_by: LockedByChoice | None = None variables: list[Any] = Field(default_factory=list) # Variable objects @@ -76,8 +178,9 @@ class Workspace(BaseModel): effective_tag_bindings: list[EffectiveTagBinding] = Field(default_factory=list) # Links - links: dict[str, Any] = Field(default_factory=dict) - data_retention_policy: DataRetentionPolicy | None = None + links: dict[str, Any] | None = Field(None, alias="links") + + data_retention_policy: Any | None = None # Legacy field, deprecated data_retention_policy_choice: DataRetentionPolicyChoice | None = None @@ -99,190 +202,258 @@ class WorkspaceIncludeOpt(str, Enum): PROJECT = "project" -class WorkspaceSource(str, Enum): - API = "tfe-api" - MODULE = "tfe-module" - UI = "tfe-ui" - TERRAFORM = "terraform" - - -class WorkspaceActions(BaseModel): - is_destroyable: bool = False - - -class WorkspacePermissions(BaseModel): - can_destroy: bool = False - can_force_unlock: bool = False - can_lock: bool = False - can_manage_run_tasks: bool = False - can_queue_apply: bool = False - can_queue_destroy: bool = False - can_queue_run: bool = False - can_read_settings: bool = False - can_unlock: bool = False - can_update: bool = False - can_update_variable: bool = False - can_force_delete: bool | None = None - - -class WorkspaceSettingOverwrites(BaseModel): - execution_mode: bool | None = None - agent_pool: bool | None = None - - -class WorkspaceOutputs(BaseModel): - id: str - name: str - sensitive: bool = False - output_type: str - value: Any | None = None - - -class LockedByChoice(BaseModel): - run: Any | None = None - user: Any | None = None - team: Any | None = None - - class WorkspaceListOptions(BaseModel): """Options for listing workspaces.""" - # Pagination options (from ListOptions) - page_number: int | None = None - page_size: int | None = None + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) - # Search and filter options - search: str | None = None # search[name] - partial workspace name - tags: str | None = None # search[tags] - comma-separated tag names - exclude_tags: str | None = ( - None # search[exclude-tags] - comma-separated tag names to exclude - ) - wildcard_name: str | None = None # search[wildcard-name] - substring matching - project_id: str | None = None # filter[project][id] - project ID filter - current_run_status: str | None = ( - None # filter[current-run][status] - run status filter - ) + page_size: int | None = Field(None, alias="page[size]") + search: str | None = Field(None, alias="search[name]") + tags: str | None = Field(None, alias="search[tags]") + exclude_tags: str | None = Field(None, alias="search[exclude-tags]") + wildcard_name: str | None = Field(None, alias="search[wildcard-name]") + project_id: str | None = Field(None, alias="filter[project][id]") + current_run_status: str | None = Field(None, alias="filter[current-run][status]") - # Tag binding filters (not URL encoded, handled specially) tag_bindings: list[TagBinding] = Field(default_factory=list) # Include related resources - include: list[WorkspaceIncludeOpt] = Field(default_factory=list) + include: list[WorkspaceIncludeOpt] | None = Field(None, alias="include") # Sorting options - sort: str | None = ( - None # "name" (default) or "current-run.created-at", prepend "-" to reverse - ) + sort: str | None = Field(None, alias="sort") class WorkspaceReadOptions(BaseModel): - include: list[WorkspaceIncludeOpt] = Field(default_factory=list) + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + include: list[WorkspaceIncludeOpt] | None = Field(None, alias="include") class WorkspaceCreateOptions(BaseModel): - name: str - type: str = "workspaces" - agent_pool_id: str | None = None - allow_destroy_plan: bool | None = None - assessments_enabled: bool | None = None - auto_apply: bool | None = None - auto_apply_run_trigger: bool | None = None - auto_destroy_at: datetime | None = None - auto_destroy_activity_duration: str | None = None - inherits_project_auto_destroy: bool | None = None - description: str | None = None - execution_mode: ExecutionMode | None = None - file_triggers_enabled: bool | None = None - global_remote_state: bool | None = None - migration_environment: str | None = None - operations: bool | None = None - queue_all_runs: bool | None = None - speculative_enabled: bool | None = None - source_name: str | None = None - source_url: str | None = None - structured_run_output_enabled: bool | None = None - terraform_version: str | None = None - trigger_prefixes: list[str] = Field(default_factory=list) - trigger_patterns: list[str] = Field(default_factory=list) - vcs_repo: VCSRepo | None = None - working_directory: str | None = None - hyok_enabled: bool | None = None - tags: list[Tag] = Field(default_factory=list) - setting_overwrites: WorkspaceSettingOverwrites | None = None - project: Project | None = None - tag_bindings: list[TagBinding] = Field(default_factory=list) + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + name: str = Field(alias="name") + type: str = Field(default="workspaces") + agent_pool_id: str | None = Field(None, alias="agent-pool-id") + allow_destroy_plan: bool | None = Field(None, alias="allow-destroy-plan") + assessments_enabled: bool | None = Field(None, alias="assessments-enabled") + auto_apply: bool | None = Field(None, alias="auto-apply") + auto_apply_run_trigger: bool | None = Field(None, alias="auto-apply-run-trigger") + auto_destroy_at: datetime | None = Field(None, alias="auto-destroy-at") + auto_destroy_activity_duration: str | None = Field( + None, alias="auto-destroy-activity-duration" + ) + inherits_project_auto_destroy: bool | None = Field( + None, alias="inherits-project-auto-destroy" + ) + description: str | None = Field(None, alias="description") + execution_mode: ExecutionMode | None = Field(None, alias="execution-mode") + file_triggers_enabled: bool | None = Field(None, alias="file-triggers-enabled") + global_remote_state: bool | None = Field(None, alias="global-remote-state") + migration_environment: str | None = Field(None, alias="migration-environment") + operations: bool | None = Field(None, alias="operations") + queue_all_runs: bool | None = Field(None, alias="queue-all-runs") + speculative_enabled: bool | None = Field(None, alias="speculative-enabled") + source_name: str | None = Field(None, alias="source-name") + source_url: str | None = Field(None, alias="source-url") + structured_run_output_enabled: bool | None = Field( + None, alias="structured-run-output-enabled" + ) + terraform_version: str | None = Field(None, alias="terraform-version") + trigger_prefixes: list[str] | None = Field(None, alias="trigger-prefixes") + trigger_patterns: list[str] | None = Field(None, alias="trigger-patterns") + vcs_repo: VCSRepoOptions | None = Field(None, alias="vcs-repo") + working_directory: str | None = Field(None, alias="working-directory") + hyok_enabled: bool | None = Field(None, alias="hyok-enabled") + setting_overwrites: WorkspaceSettingOverwrites | None = Field( + None, alias="setting-overwrites" + ) + project: Project | None = Field(None, alias="project") + tag_bindings: list[TagBinding] | None = Field(None, alias="tag-bindings") + + @model_validator(mode="after") + def valid(self) -> WorkspaceCreateOptions: + """ + Validate workspace create options for proper API usage. + Raises specific validation errors if validation fails. + """ + # Check required name + if not valid_string(self.name): + raise RequiredNameError() + + # Check name format + if not is_valid_workspace_name(self.name): + raise InvalidNameError() + + # Check operations and execution mode conflict + if self.operations is not None and self.execution_mode is not None: + raise UnsupportedOperationsError() + + # Check agent mode requirements + if self.agent_pool_id is not None and ( + self.execution_mode is None or self.execution_mode != "agent" + ): + raise RequiredAgentModeError() + + if ( + self.agent_pool_id is None + and self.execution_mode is not None + and self.execution_mode == "agent" + ): + raise RequiredAgentPoolIDError() + + # Check trigger patterns and prefixes conflict + if ( + self.trigger_prefixes + and len(self.trigger_prefixes) > 0 + and self.trigger_patterns + and len(self.trigger_patterns) > 0 + ): + raise UnsupportedBothTriggerPatternsAndPrefixesError() + + # Check tags regex conflicts + if has_tags_regex_defined(self.vcs_repo): + if self.trigger_patterns and len(self.trigger_patterns) > 0: + raise UnsupportedBothTagsRegexAndTriggerPatternsError() + + if self.trigger_prefixes and len(self.trigger_prefixes) > 0: + raise UnsupportedBothTagsRegexAndTriggerPrefixesError() + + if self.file_triggers_enabled is not None and self.file_triggers_enabled: + raise UnsupportedBothTagsRegexAndFileTriggersEnabledError() + + return self -class WorkspaceUpdateOptions(BaseModel): - name: str - type: str = "workspaces" - agent_pool_id: str | None = None - allow_destroy_plan: bool | None = None - assessments_enabled: bool | None = None - auto_apply: bool | None = None - auto_apply_run_trigger: bool | None = None - auto_destroy_at: datetime | None = None - auto_destroy_activity_duration: str | None = None - inherits_project_auto_destroy: bool | None = None - description: str | None = None - execution_mode: ExecutionMode | None = None - file_triggers_enabled: bool | None = None - global_remote_state: bool | None = None - operations: bool | None = None - queue_all_runs: bool | None = None - speculative_enabled: bool | None = None - structured_run_output_enabled: bool | None = None - terraform_version: str | None = None - trigger_prefixes: list[str] = Field(default_factory=list) - trigger_patterns: list[str] = Field(default_factory=list) - vcs_repo: VCSRepo | None = None - working_directory: str | None = None - hyok_enabled: bool | None = None - setting_overwrites: WorkspaceSettingOverwrites | None = None - project: Project | None = None - tag_bindings: list[TagBinding] = Field(default_factory=list) +class WorkspaceUpdateOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) -class WorkspaceList(BaseModel): - items: list[Workspace] = Field(default_factory=list) - pagination: Pagination | None = None + name: str | None = Field(None, alias="name") + type: str = "workspaces" + agent_pool_id: str | None = Field(None, alias="agent-pool-id") + allow_destroy_plan: bool | None = Field(None, alias="allow-destroy-plan") + assessments_enabled: bool | None = Field(None, alias="assessments-enabled") + auto_apply: bool | None = Field(None, alias="auto-apply") + auto_apply_run_trigger: bool | None = Field(None, alias="auto-apply-run-trigger") + auto_destroy_at: datetime | None = Field(None, alias="auto-destroy-at") + auto_destroy_activity_duration: str | None = Field( + None, alias="auto-destroy-activity-duration" + ) + inherits_project_auto_destroy: bool | None = Field( + None, alias="inherits-project-auto-destroy" + ) + description: str | None = Field(None, alias="description") + execution_mode: ExecutionMode | None = Field(None, alias="execution-mode") + file_triggers_enabled: bool | None = Field(None, alias="file-triggers-enabled") + global_remote_state: bool | None = Field(None, alias="global-remote-state") + operations: bool | None = Field(None, alias="operations") + queue_all_runs: bool | None = Field(None, alias="queue-all-runs") + speculative_enabled: bool | None = Field(None, alias="speculative-enabled") + structured_run_output_enabled: bool | None = Field( + None, alias="structured-run-output-enabled" + ) + terraform_version: str | None = Field(None, alias="terraform-version") + trigger_prefixes: list[str] | None = Field(None, alias="trigger-prefixes") + trigger_patterns: list[str] | None = Field(None, alias="trigger-patterns") + vcs_repo: VCSRepoOptions | None = Field(None, alias="vcs-repo") + working_directory: str | None = Field(None, alias="working-directory") + hyok_enabled: bool | None = Field(None, alias="hyok-enabled") + setting_overwrites: WorkspaceSettingOverwrites | None = Field( + None, alias="setting-overwrites" + ) + project: Project | None = Field(None, alias="project") + tag_bindings: list[TagBinding] | None = Field(None, alias="tag-bindings") + + @model_validator(mode="after") + def valid(self) -> WorkspaceUpdateOptions: + """ + Validate workspace update options for proper API usage. + Raises specific validation errors if validation fails. + """ + # Check name format if provided + if self.name is not None and not is_valid_workspace_name(self.name): + raise InvalidNameError() + + # Check operations and execution mode conflict + if self.operations is not None and self.execution_mode is not None: + raise UnsupportedOperationsError() + + # Check agent mode requirements + if ( + self.agent_pool_id is None + and self.execution_mode is not None + and self.execution_mode == "agent" + ): + raise RequiredAgentPoolIDError() + + # Check trigger patterns and prefixes conflict + if ( + self.trigger_prefixes + and len(self.trigger_prefixes) > 0 + and self.trigger_patterns + and len(self.trigger_patterns) > 0 + ): + raise UnsupportedBothTriggerPatternsAndPrefixesError() + + # Check tags regex conflicts + if has_tags_regex_defined(self.vcs_repo): + if self.trigger_patterns and len(self.trigger_patterns) > 0: + raise UnsupportedBothTagsRegexAndTriggerPatternsError() + + if self.trigger_prefixes and len(self.trigger_prefixes) > 0: + raise UnsupportedBothTagsRegexAndTriggerPrefixesError() + + if self.file_triggers_enabled is not None and self.file_triggers_enabled: + raise UnsupportedBothTagsRegexAndFileTriggersEnabledError() + + return self class WorkspaceRemoveVCSConnectionOptions(BaseModel): """Options for removing VCS connection from a workspace.""" + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + id: str - vcs_repo: VCSRepoOptions | None = None + vcs_repo: VCSRepoOptions = Field(alias="vcs-repo") class WorkspaceLockOptions(BaseModel): """Options for locking a workspace.""" + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + # Specifies the reason for locking the workspace. - reason: str + reason: str | None = Field(None, alias="reason") class WorkspaceAssignSSHKeyOptions(BaseModel): """Options for assigning an SSH key to a workspace.""" - ssh_key_id: str - type: str = "workspaces" + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + ssh_key_id: str = Field(alias="id") + type: str = Field(default="workspaces") class workspaceUnassignSSHKeyOptions(BaseModel): """Options for unassigning an SSH key from a workspace.""" + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + # Must be nil to unset the currently assigned SSH key. - ssh_key_id: str - type: str = "workspaces" + ssh_key_id: str = Field(alias="id") + type: str = Field(default="workspaces") class WorkspaceListRemoteStateConsumersOptions(BaseModel): """Options for listing remote state consumers of a workspace.""" - # Pagination options (from ListOptions) - page_number: int | None = None - page_size: int | None = None + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + page_size: int | None = Field(None, alias="page[size]") class WorkspaceAddRemoteStateConsumersOptions(BaseModel): @@ -306,10 +477,10 @@ class WorkspaceUpdateRemoteStateConsumersOptions(BaseModel): class WorkspaceTagListOptions(BaseModel): """Options for listing tags of a workspace.""" - # Pagination options (from ListOptions) - page_number: int | None = None - page_size: int | None = None - query: str | None = None + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + page_size: int | None = Field(None, alias="page[size]") + query: str | None = Field(None, alias="name") class WorkspaceAddTagsOptions(BaseModel): @@ -330,19 +501,27 @@ class WorkspaceAddTagBindingsOptions(BaseModel): tag_bindings: list[TagBinding] = Field(default_factory=list) -class VCSRepo(BaseModel): - branch: str | None = None - identifier: str | None = None - ingress_submodules: bool | None = None - oauth_token_id: str | None = None - tags_regex: str | None = None - gha_installation_id: str | None = None +class VCSRepoOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + branch: str | None = Field(None, alias="branch") + identifier: str | None = Field(None, alias="identifier") + ingress_submodules: bool | None = Field(None, alias="ingress-submodules") + oauth_token_id: str | None = Field(None, alias="oauth-token-id") + tags_regex: str | None = Field(None, alias="tags-regex") + gha_installation_id: str | None = Field(None, alias="github-app-installation-id") -class VCSRepoOptions(BaseModel): - branch: str | None = None - identifier: str | None = None - ingress_submodules: bool | None = None - oauth_token_id: str | None = None - tags_regex: str | None = None - gha_installation_id: str | None = None + +# Rebuild Workspace model after all dependencies are defined +def _rebuild_workspace_model() -> None: + """Rebuild Workspace model to resolve forward references.""" + try: + from .run import Run # noqa: F401 + + Workspace.model_rebuild() + except ImportError: + # Run model not yet available, will be rebuilt later + pass + + +_rebuild_workspace_model() diff --git a/src/pytfe/resources/run_trigger.py b/src/pytfe/resources/run_trigger.py index 3a16d765..0059dd45 100644 --- a/src/pytfe/resources/run_trigger.py +++ b/src/pytfe/resources/run_trigger.py @@ -48,11 +48,11 @@ def _run_trigger_from(d: dict[str, Any], org: str | None = None) -> RunTrigger: sourceable_id = sourceable_rel["data"].get("id", "") # Create workspace objects with proper IDs - workspace = Workspace( - id=workspace_id, name=workspace_name_str, organization=org or "" + workspace = Workspace.model_validate( + {"id": workspace_id, "name": workspace_name_str, "organization": org} ) - sourceable = Workspace( - id=sourceable_id, name=sourceable_name_str, organization=org or "" + sourceable = Workspace.model_validate( + {"id": sourceable_id, "name": sourceable_name_str, "organization": org} ) sourceable_choice = SourceableChoice( workspace=sourceable diff --git a/src/pytfe/resources/workspaces.py b/src/pytfe/resources/workspaces.py index 81b49f5f..1b6f2478 100644 --- a/src/pytfe/resources/workspaces.py +++ b/src/pytfe/resources/workspaces.py @@ -29,6 +29,8 @@ DataRetentionPolicyDontDelete, DataRetentionPolicySetOptions, ) +from ..models.organization import Organization +from ..models.project import Project from ..models.workspace import ( ExecutionMode, LockedByChoice, @@ -48,19 +50,14 @@ WorkspaceReadOptions, WorkspaceRemoveRemoteStateConsumersOptions, WorkspaceRemoveTagsOptions, - WorkspaceRemoveVCSConnectionOptions, WorkspaceSettingOverwrites, - WorkspaceSource, WorkspaceTagListOptions, WorkspaceUpdateOptions, WorkspaceUpdateRemoteStateConsumersOptions, ) from ..utils import ( - _safe_str, valid_string, valid_string_id, - validate_workspace_create_options, - validate_workspace_update_options, ) from ._base import _Service @@ -73,190 +70,116 @@ def _em_safe(v: Any) -> ExecutionMode | None: return result if isinstance(result, ExecutionMode) else None -def _ws_from(d: dict[str, Any], org: str | None = None) -> Workspace: +def _ws_from(d: dict[str, Any]) -> Workspace: attr: dict[str, Any] = d.get("attributes", {}) or {} - - # Coerce to required string fields (empty string fallback keeps mypy happy) - id_str: str = _safe_str(d.get("id")) - name_str: str = _safe_str(attr.get("name")) - org_str: str = _safe_str(org if org is not None else attr.get("organization")) + relationships: dict[str, Any] = d.get("relationships", {}) or {} # Optional fields em: ExecutionMode | None = _em_safe(attr.get("execution-mode")) - proj_id: str | None = None - proj = attr.get("project") - if isinstance(proj, dict): - proj_id = proj.get("id") if isinstance(proj.get("id"), str) else None - - # Enhanced field mapping - tags_val = attr.get("tags", []) or [] - tags_list: builtins.list[Tag] = [] - if isinstance(tags_val, builtins.list): - for tag_item in tags_val: - if isinstance(tag_item, dict): - tags_list.append( - Tag(id=tag_item.get("id"), name=tag_item.get("name", "")) - ) - elif isinstance(tag_item, str): - tags_list.append(Tag(name=tag_item)) - - # Map additional attributes actions = None if attr.get("actions"): - actions = WorkspaceActions( - is_destroyable=attr["actions"].get("is-destroyable", False) - ) + actions = WorkspaceActions.model_validate(attr["actions"]) permissions = None if attr.get("permissions"): - perm_attr = attr["permissions"] - permissions = WorkspacePermissions( - can_destroy=perm_attr.get("can-destroy", False), - can_force_unlock=perm_attr.get("can-force-unlock", False), - can_lock=perm_attr.get("can-lock", False), - can_manage_run_tasks=perm_attr.get("can-manage-run-tasks", False), - can_queue_apply=perm_attr.get("can-queue-apply", False), - can_queue_destroy=perm_attr.get("can-queue-destroy", False), - can_queue_run=perm_attr.get("can-queue-run", False), - can_read_settings=perm_attr.get("can-read-settings", False), - can_unlock=perm_attr.get("can-unlock", False), - can_update=perm_attr.get("can-update", False), - can_update_variable=perm_attr.get("can-update-variable", False), - can_force_delete=perm_attr.get("can-force-delete"), - ) + permissions = WorkspacePermissions.model_validate(attr["permissions"]) setting_overwrites = None if attr.get("setting-overwrites"): - so_attr = attr["setting-overwrites"] - setting_overwrites = WorkspaceSettingOverwrites( - execution_mode=so_attr.get("execution-mode"), - agent_pool=so_attr.get("agent-pool"), + setting_overwrites = WorkspaceSettingOverwrites.model_validate( + attr["setting-overwrites"] ) # Map VCS repo vcs_repo = None if attr.get("vcs-repo"): - vcs_attr = attr["vcs-repo"] - vcs_repo = VCSRepo( - branch=vcs_attr.get("branch"), - identifier=vcs_attr.get("identifier"), - ingress_submodules=vcs_attr.get("ingress-submodules"), - oauth_token_id=vcs_attr.get("oauth-token-id"), - gha_installation_id=vcs_attr.get("github-app-installation-id"), - ) + vcs_repo = VCSRepo.model_validate(attr["vcs-repo"]) # Map locked_by choice locked_by = None - if d.get("relationships", {}).get("locked-by"): - lb_data = d["relationships"]["locked-by"]["data"] + if relationships.get("locked-by", {}).get("data"): + lb_data = relationships["locked-by"]["data"] if lb_data: - locked_by = LockedByChoice( - run=lb_data.get("run"), - user=lb_data.get("user"), - team=lb_data.get("team"), - ) + if lb_data.get("type") == "runs": + locked_by = LockedByChoice.model_validate({"run": lb_data.get("id")}) + elif lb_data.get("type") == "users": + locked_by = LockedByChoice.model_validate({"user": lb_data.get("id")}) + elif lb_data.get("type") == "teams": + locked_by = LockedByChoice.model_validate({"team": lb_data.get("id")}) # Map outputs outputs = [] - if d.get("relationships", {}).get("outputs"): - for output_data in d["relationships"]["outputs"].get("data", []): - outputs.append( - WorkspaceOutputs( - id=output_data.get("id", ""), - name=output_data.get("attributes", {}).get("name", ""), - sensitive=output_data.get("attributes", {}).get("sensitive", False), - output_type=output_data.get("attributes", {}).get( - "output-type", "" - ), - value=output_data.get("attributes", {}).get("value"), - ) - ) + if relationships.get("outputs", {}).get("data"): + for output_data in relationships["outputs"].get("data", []): + output_attrs = output_data.get("attributes", {}) + output_attrs["id"] = output_data.get("id", "") + outputs.append(WorkspaceOutputs.model_validate(output_attrs)) data_retention_policy_choice: DataRetentionPolicyChoice | None = None - if d.get("relationships", {}).get("data-retention-policy-choice"): - drp_data = d["relationships"]["data-retention-policy-choice"]["data"] + if relationships.get("data-retention-policy-choice", {}).get("data"): + drp_data = relationships["data-retention-policy-choice"]["data"] if drp_data: if drp_data.get("type") == "data-retention-policy-delete-olders": - data_retention_policy_choice = DataRetentionPolicyChoice( - data_retention_policy_delete_older=DataRetentionPolicyDeleteOlder( - id=drp_data.get("id"), - delete_older_than_n_days=drp_data.get("attributes", {}).get( - "delete-older-than-n-days", 0 - ), + data_retention_policy_delete_older = ( + DataRetentionPolicyDeleteOlder.model_validate( + { + "id": drp_data.get("id"), + "delete-older-than-n-days": drp_data.get( + "attributes", {} + ).get("delete-older-than-n-days", 0), + } ) ) + data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( + data_retention_policy_delete_older + ) elif drp_data.get("type") == "data-retention-policy-dont-deletes": - data_retention_policy_choice = DataRetentionPolicyChoice( - data_retention_policy_dont_delete=DataRetentionPolicyDontDelete( - id=drp_data.get("id") + data_retention_policy_dont_delete = ( + DataRetentionPolicyDontDelete.model_validate( + {"id": drp_data.get("id")} ) ) + data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( + data_retention_policy_dont_delete + ) elif drp_data.get("type") == "data-retention-policies": # Legacy data retention policy - data_retention_policy_choice = DataRetentionPolicyChoice( - data_retention_policy=DataRetentionPolicy( - id=drp_data.get("id"), - delete_older_than_n_days=drp_data.get("attributes", {}).get( + data_retention_policy = DataRetentionPolicy.model_validate( + { + "id": drp_data.get("id"), + "delete-older-than-n-days": drp_data.get("attributes", {}).get( "delete-older-than-n-days", 0 ), - ) + } + ) + data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( + data_retention_policy ) - return Workspace( - id=id_str, - name=name_str, - organization=org_str, - execution_mode=em, - project_id=proj_id, - tags=tags_list, - # Core attributes - actions=actions, - allow_destroy_plan=attr.get("allow-destroy-plan", False), - assessments_enabled=attr.get("assessments-enabled", False), - auto_apply=attr.get("auto-apply", False), - auto_apply_run_trigger=attr.get("auto-apply-run-trigger", False), - auto_destroy_at=attr.get("auto-destroy-at"), - auto_destroy_activity_duration=attr.get("auto-destroy-activity-duration"), - can_queue_destroy_plan=attr.get("can-queue-destroy-plan", False), - created_at=attr.get("created-at"), - description=attr.get("description") or "", - environment=attr.get("environment", ""), - file_triggers_enabled=attr.get("file-triggers-enabled", False), - global_remote_state=attr.get("global-remote-state", False), - inherits_project_auto_destroy=attr.get("inherits-project-auto-destroy", False), - locked=attr.get("locked", False), - migration_environment=attr.get("migration-environment", ""), - no_code_upgrade_available=attr.get("no-code-upgrade-available", False), - operations=attr.get("operations", False), - permissions=permissions, - queue_all_runs=attr.get("queue-all-runs", False), - speculative_enabled=attr.get("speculative-enabled", False), - source=WorkspaceSource(attr.get("source")) if attr.get("source") else None, - source_name=attr.get("source-name") or "", - source_url=attr.get("source-url") or "", - structured_run_output_enabled=attr.get("structured-run-output-enabled", False), - terraform_version=attr.get("terraform-version") or "", - trigger_prefixes=attr.get("trigger-prefixes", []), - trigger_patterns=attr.get("trigger-patterns", []), - vcs_repo=vcs_repo, - working_directory=attr.get("working-directory") or "", - updated_at=attr.get("updated-at"), - resource_count=attr.get("resource-count", 0), - apply_duration_average=attr.get("apply-duration-average"), - plan_duration_average=attr.get("plan-duration-average"), - policy_check_failures=attr.get("policy-check-failures") or 0, - run_failures=attr.get("run-failures") or 0, - runs_count=attr.get("workspace-kpis-runs-count") or 0, - tag_names=attr.get("tag-names", []), - setting_overwrites=setting_overwrites, - # Relations - outputs=outputs, - locked_by=locked_by, - data_retention_policy_choice=data_retention_policy_choice - if data_retention_policy_choice - else None, - ) + attr["id"] = d.get("id") + attr["execution_mode"] = em + attr["actions"] = actions + attr["permissions"] = permissions + attr["setting_overwrites"] = setting_overwrites + attr["vcs-repo"] = vcs_repo + + # Add parsed relations + if relationships.get("organization", {}).get("data"): + attr["organization"] = Organization.model_validate( + {"id": relationships["organization"]["data"].get("id")} + ) + if relationships.get("project", {}).get("data"): + attr["project"] = Project.model_validate( + {"id": relationships["project"]["data"].get("id")} + ) + if relationships.get("ssh-key", {}).get("data"): + attr["ssh_key"] = relationships["ssh-key"]["data"].get("id") + attr["outputs"] = outputs + attr["locked_by"] = locked_by + attr["data_retention_policy_choice"] = data_retention_policy_choice + + return Workspace.model_validate(attr) class Workspaces(_Service): @@ -265,47 +188,32 @@ def list( organization: str, options: WorkspaceListOptions | None = None, ) -> Iterator[Workspace]: - # Validate parameters if not valid_string_id(organization): raise InvalidOrgError() - params: dict[str, Any] = {} + params = ( + options.model_dump( + by_alias=True, exclude_none=True, exclude={"tag_bindings"} + ) + if options + else {} + ) if options is not None: - # Use structured options - if options.search: - params["search[name]"] = options.search - if options.tags: - params["search[tags]"] = options.tags - if options.exclude_tags: - params["search[exclude-tags]"] = options.exclude_tags - if options.wildcard_name: - params["search[wildcard-name]"] = options.wildcard_name - if options.project_id: - params["filter[project][id]"] = options.project_id - if options.current_run_status: - params["filter[current-run][status]"] = options.current_run_status if options.include: params["include"] = ",".join([i.value for i in options.include]) - if options.sort: - params["sort"] = options.sort - if options.page_number: - params["page[number]"] = options.page_number - if options.page_size: - params["page[size]"] = options.page_size - - # Handle tag binding filters + if options.tag_bindings: for i, binding in enumerate(options.tag_bindings): if binding.key and binding.value: - params[f"search[tag-bindings][{i}][key]"] = binding.key - params[f"search[tag-bindings][{i}][value]"] = binding.value + params[f"filter[tagged][{i}][key]"] = binding.key + params[f"filter[tagged][{i}][value]"] = binding.value elif binding.key: - params[f"search[tag-bindings][{i}][key]"] = binding.key + params[f"filter[tagged][{i}][key]"] = binding.key path = f"/api/v2/organizations/{organization}/workspaces" for item in self._list(path, params=params): - yield _ws_from(item, organization) + yield _ws_from(item) def read(self, workspace: str, *, organization: str) -> Workspace: """Read workspace by organization and name.""" @@ -318,7 +226,6 @@ def read_with_options( *, organization: str, ) -> Workspace: - # Validate parameters if not valid_string_id(organization): raise InvalidOrgError() if not valid_string_id(workspace): @@ -333,7 +240,7 @@ def read_with_options( f"/api/v2/organizations/{organization}/workspaces/{workspace}", params=params, ) - ws = _ws_from(r.json()["data"], organization) + ws = _ws_from(r.json()["data"]) ws.data_retention_policy = ( ws.data_retention_policy_choice.convert_to_legacy_struct() if ws.data_retention_policy_choice @@ -348,7 +255,6 @@ def read_by_id(self, workspace_id: str) -> Workspace: def read_by_id_with_options( self, workspace_id: str, options: WorkspaceReadOptions | None = None ) -> Workspace: - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() @@ -357,7 +263,7 @@ def read_by_id_with_options( if options.include: params["include"] = ",".join([i.value for i in options.include]) r = self.t.request("GET", f"/api/v2/workspaces/{workspace_id}", params=params) - ws = _ws_from(r.json()["data"], None) + ws = _ws_from(r.json()["data"]) if ws.data_retention_policy_choice is not None: ws.data_retention_policy = ( ws.data_retention_policy_choice.convert_to_legacy_struct() @@ -370,184 +276,94 @@ def create( options: WorkspaceCreateOptions, ) -> Workspace: """Create a new workspace in the given organization.""" - # Validate parameters if not valid_string_id(organization): raise InvalidOrgError() - # Validate options before creating workspace - validate_workspace_create_options(options) - - body = self._build_workspace_payload(options, is_create=True) + body = self._build_workspace_payload(options) r = self.t.request( "POST", f"/api/v2/organizations/{organization}/workspaces", json_body=body ) - return _ws_from(r.json()["data"], organization) + return _ws_from(r.json()["data"]) - # Convenience methods for org+name operations def update( self, workspace: str, options: WorkspaceUpdateOptions, *, organization: str ) -> Workspace: """Update workspace by organization and name.""" - # Validate parameters if not valid_string_id(organization): raise InvalidOrgError() if not valid_string_id(workspace): raise InvalidWorkspaceValueError() - # Validate options before updating workspace - validate_workspace_update_options(options) - - body = self._build_workspace_payload(options, is_create=False) + body = self._build_workspace_payload(options) r = self.t.request( "PATCH", f"/api/v2/organizations/{organization}/workspaces/{workspace}", json_body=body, ) - return _ws_from(r.json()["data"], organization) + return _ws_from(r.json()["data"]) def update_by_id( self, workspace_id: str, options: WorkspaceUpdateOptions ) -> Workspace: """Update workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() - # Validate options before updating workspace - validate_workspace_update_options(options) - - body = self._build_workspace_payload(options, is_create=False) + body = self._build_workspace_payload(options) r = self.t.request( "PATCH", f"/api/v2/workspaces/{workspace_id}", json_body=body ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def _build_workspace_payload( - self, - options: WorkspaceCreateOptions | WorkspaceUpdateOptions, - is_create: bool = False, + self, options: WorkspaceCreateOptions | WorkspaceUpdateOptions ) -> dict[str, Any]: """Build the workspace payload from options following API specification. Args: options: Either WorkspaceCreateOptions or WorkspaceUpdateOptions - is_create: True for create operations, False for update operations """ - body: dict[str, Any] = {"data": {"type": "workspaces", "attributes": {}}} - - # Add attributes from options - attrs = body["data"]["attributes"] - - # Required field for both create and update: name - attrs["name"] = options.name - - # Common optional attributes - if options.agent_pool_id is not None: - attrs["agent-pool-id"] = options.agent_pool_id - if options.allow_destroy_plan is not None: - attrs["allow-destroy-plan"] = options.allow_destroy_plan - if options.assessments_enabled is not None: - attrs["assessments-enabled"] = options.assessments_enabled - if options.auto_apply is not None: - attrs["auto-apply"] = options.auto_apply - if options.auto_apply_run_trigger is not None: - attrs["auto-apply-run-trigger"] = options.auto_apply_run_trigger - if options.auto_destroy_at is not None: - # Format datetime as ISO8601 string as expected by the API - attrs["auto-destroy-at"] = options.auto_destroy_at.isoformat() - if options.auto_destroy_activity_duration is not None: - attrs["auto-destroy-activity-duration"] = ( - options.auto_destroy_activity_duration - ) - if options.description is not None: - attrs["description"] = options.description - if options.execution_mode is not None: - # Accepts either an enum (with .value) or a string; fallback to the value itself if neither - attrs["execution-mode"] = getattr( - options.execution_mode, "value", options.execution_mode - ) - if options.file_triggers_enabled is not None: - attrs["file-triggers-enabled"] = options.file_triggers_enabled - if options.global_remote_state is not None: - attrs["global-remote-state"] = options.global_remote_state - if options.queue_all_runs is not None: - attrs["queue-all-runs"] = options.queue_all_runs - if options.speculative_enabled is not None: - attrs["speculative-enabled"] = options.speculative_enabled - if options.terraform_version is not None: - attrs["terraform-version"] = options.terraform_version - if options.trigger_patterns: - attrs["trigger-patterns"] = options.trigger_patterns - if options.trigger_prefixes: - attrs["trigger-prefixes"] = options.trigger_prefixes - if options.working_directory is not None: - attrs["working-directory"] = options.working_directory - if options.allow_destroy_plan is not None: - attrs["allow-destroy-plan"] = options.allow_destroy_plan - if options.assessments_enabled is not None: - attrs["assessments-enabled"] = options.assessments_enabled - - # Create-specific attributes - if ( - is_create - and hasattr(options, "source_name") - and options.source_name is not None - ): - attrs["source-name"] = options.source_name - if ( - is_create - and hasattr(options, "source_url") - and options.source_url is not None - ): - attrs["source-url"] = options.source_url - if ( - is_create - and hasattr(options, "structured_run_output_enabled") - and options.structured_run_output_enabled is not None - ): - attrs["structured-run-output-enabled"] = ( - options.structured_run_output_enabled + attrs = ( + ( + options.model_dump( + by_alias=True, + exclude_none=True, + exclude={ + "vcs_repo", + "setting_overwrites", + "project", + "tag_bindings", + }, + ) ) - if ( - is_create - and hasattr(options, "hyok_enabled") - and options.hyok_enabled is not None - ): - attrs["hyok-enabled"] = options.hyok_enabled + if options + else {} + ) # VCS repository configuration - if hasattr(options, "vcs_repo") and options.vcs_repo is not None: - vcs_data: dict[str, Any] = {} - if options.vcs_repo.oauth_token_id is not None: - vcs_data["oauth-token-id"] = options.vcs_repo.oauth_token_id - if options.vcs_repo.identifier is not None: - vcs_data["identifier"] = options.vcs_repo.identifier - if options.vcs_repo.branch is not None: - vcs_data["branch"] = options.vcs_repo.branch - if options.vcs_repo.ingress_submodules is not None: - vcs_data["ingress-submodules"] = options.vcs_repo.ingress_submodules - if options.vcs_repo.tags_regex is not None: - vcs_data["tags-regex"] = options.vcs_repo.tags_regex - if options.vcs_repo.gha_installation_id is not None: - vcs_data["github-app-installation-id"] = ( - options.vcs_repo.gha_installation_id - ) + if hasattr(options, "vcs_repo"): + vcs_data = ( + (options.vcs_repo.model_dump(by_alias=True, exclude_none=True)) + if options.vcs_repo + else {} + ) attrs["vcs-repo"] = vcs_data # Setting overwrites - if ( - hasattr(options, "setting_overwrites") - and options.setting_overwrites is not None - ): - setting_overwrites: dict[str, Any] = {} - if options.setting_overwrites.execution_mode is not None: - setting_overwrites["execution-mode"] = ( - options.setting_overwrites.execution_mode + if hasattr(options, "setting_overwrites"): + setting_overwrites = ( + ( + options.setting_overwrites.model_dump( + by_alias=True, exclude_none=True + ) ) - if options.setting_overwrites.agent_pool is not None: - setting_overwrites["agent-pool"] = options.setting_overwrites.agent_pool + if options.setting_overwrites + else {} + ) attrs["setting-overwrites"] = setting_overwrites + body = {"data": {"type": "workspaces", "attributes": attrs}} + # Add relationships relationships: dict[str, Any] = {} @@ -576,7 +392,6 @@ def _build_workspace_payload( def delete(self, workspace: str, *, organization: str) -> None: """Delete workspace by organization and workspace name.""" - # Validate parameters for proper API usage if not valid_string_id(organization): raise InvalidOrgError() if not valid_string_id(workspace): @@ -585,18 +400,18 @@ def delete(self, workspace: str, *, organization: str) -> None: self.t.request( "DELETE", f"/api/v2/organizations/{organization}/workspaces/{workspace}" ) + return None def delete_by_id(self, workspace_id: str) -> None: """Delete workspace by workspace ID.""" - # Validate parameters for proper API usage if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() self.t.request("DELETE", f"/api/v2/workspaces/{workspace_id}") + return None def safe_delete(self, workspace: str, *, organization: str) -> None: """Safely delete workspace by organization and name.""" - # Validate parameters for proper API usage if not valid_string_id(organization): raise InvalidOrgError() if not valid_string_id(workspace): @@ -606,14 +421,15 @@ def safe_delete(self, workspace: str, *, organization: str) -> None: "POST", f"/api/v2/organizations/{organization}/workspaces/{workspace}/actions/safe-delete", ) + return None def safe_delete_by_id(self, workspace_id: str) -> None: """Safely delete workspace by workspace ID.""" - # Validate parameters for proper API usage if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() self.t.request("POST", f"/api/v2/workspaces/{workspace_id}/actions/safe-delete") + return None def remove_vcs_connection( self, @@ -622,19 +438,15 @@ def remove_vcs_connection( organization: str | None = None, ) -> Workspace: """Remove VCS connection from workspace by organization and name.""" - # Validate parameters if not valid_string_id(organization): raise InvalidOrgError() if not valid_string_id(workspace): raise InvalidWorkspaceValueError() - # Create empty options with vcs_repo=None to remove VCS connection - options = WorkspaceRemoveVCSConnectionOptions(id="", vcs_repo=None) - body = { "data": { "type": "workspaces", - "attributes": {"vcs-repo": options.vcs_repo}, + "attributes": {"vcs-repo": None}, } } @@ -643,21 +455,17 @@ def remove_vcs_connection( f"/api/v2/organizations/{organization}/workspaces/{workspace}", json_body=body, ) - return _ws_from(r.json()["data"], organization) + return _ws_from(r.json()["data"]) def remove_vcs_connection_by_id(self, workspace_id: str) -> Workspace: """Remove VCS connection from workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() - # Create empty options with vcs_repo=None to remove VCS connection - options = WorkspaceRemoveVCSConnectionOptions(id="", vcs_repo=None) - body = { "data": { "type": "workspaces", - "attributes": {"vcs-repo": options.vcs_repo}, + "attributes": {"vcs-repo": None}, } } @@ -666,11 +474,10 @@ def remove_vcs_connection_by_id(self, workspace_id: str) -> Workspace: f"/api/v2/workspaces/{workspace_id}", json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def lock(self, workspace_id: str, options: WorkspaceLockOptions) -> Workspace: """Lock a workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() @@ -681,11 +488,10 @@ def lock(self, workspace_id: str, options: WorkspaceLockOptions) -> Workspace: f"/api/v2/workspaces/{workspace_id}/actions/lock", json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def unlock(self, workspace_id: str) -> Workspace: """Unlock a workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() try: @@ -693,7 +499,7 @@ def unlock(self, workspace_id: str) -> Workspace: "POST", f"/api/v2/workspaces/{workspace_id}/actions/unlock", ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) except Exception as e: if "latest state version is still pending" in str(e): raise WorkspaceLockedStateVersionStillPending(str(e)) from e @@ -701,7 +507,6 @@ def unlock(self, workspace_id: str) -> Workspace: def force_unlock(self, workspace_id: str) -> Workspace: """Force unlock a workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() @@ -709,13 +514,12 @@ def force_unlock(self, workspace_id: str) -> Workspace: "POST", f"/api/v2/workspaces/{workspace_id}/actions/force-unlock", ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def assign_ssh_key( self, workspace_id: str, options: WorkspaceAssignSSHKeyOptions ) -> Workspace: """Assign an SSH key to a workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() @@ -737,11 +541,10 @@ def assign_ssh_key( f"/api/v2/workspaces/{workspace_id}/relationships/ssh-key", json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def unassign_ssh_key(self, workspace_id: str) -> Workspace: """Unassign the SSH key from a workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() @@ -758,27 +561,22 @@ def unassign_ssh_key(self, workspace_id: str) -> Workspace: json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def list_remote_state_consumers( - self, workspace_id: str, options: WorkspaceListRemoteStateConsumersOptions + self, + workspace_id: str, + options: WorkspaceListRemoteStateConsumersOptions | None = None, ) -> Iterator[Workspace]: """List remote state consumers of a workspace by workspace ID.""" - # Validate parameters if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() - params: dict[str, Any] = {} - if options is not None: - # Use structured options - if options.page_number: - params["page[number]"] = options.page_number - if options.page_size: - params["page[size]"] = options.page_size + params = options.model_dump(by_alias=True, exclude_none=True) if options else {} path = f"/api/v2/workspaces/{workspace_id}/relationships/remote-state-consumers" for item in self._list(path, params=params): - yield _ws_from(item, None) + yield _ws_from(item) def add_remote_state_consumers( self, workspace_id: str, options: WorkspaceAddRemoteStateConsumersOptions @@ -799,6 +597,7 @@ def add_remote_state_consumers( f"/api/v2/workspaces/{workspace_id}/relationships/remote-state-consumers", json_body=body, ) + return None def remove_remote_state_consumers( self, workspace_id: str, options: WorkspaceRemoveRemoteStateConsumersOptions @@ -818,6 +617,7 @@ def remove_remote_state_consumers( f"/api/v2/workspaces/{workspace_id}/relationships/remote-state-consumers", json_body=body, ) + return None def update_remote_state_consumers( self, workspace_id: str, options: WorkspaceUpdateRemoteStateConsumersOptions @@ -837,6 +637,7 @@ def update_remote_state_consumers( f"/api/v2/workspaces/{workspace_id}/relationships/remote-state-consumers", json_body=body, ) + return None def list_tags( self, workspace_id: str, options: WorkspaceTagListOptions | None = None @@ -844,14 +645,7 @@ def list_tags( if not valid_string_id(workspace_id): raise InvalidWorkspaceIDError() - params: dict[str, Any] = {} - if options is not None: - if options.query is not None: - params["name"] = options.query - if options.page_number is not None: - params["page[number]"] = options.page_number - if options.page_size is not None: - params["page[size]"] = options.page_size + params = options.model_dump(by_alias=True, exclude_none=True) if options else {} path = f"/api/v2/workspaces/{workspace_id}/relationships/tags" for item in self._list(path, params=params): @@ -879,6 +673,7 @@ def add_tags(self, workspace_id: str, options: WorkspaceAddTagsOptions) -> None: f"/api/v2/workspaces/{workspace_id}/relationships/tags", json_body=body, ) + return None def remove_tags( self, workspace_id: str, options: WorkspaceRemoveTagsOptions @@ -903,6 +698,7 @@ def remove_tags( f"/api/v2/workspaces/{workspace_id}/relationships/tags", json_body=body, ) + return None def list_tag_bindings(self, workspace_id: str) -> Iterator[TagBinding]: if not valid_string_id(workspace_id): @@ -980,6 +776,7 @@ def delete_all_tag_bindings(self, workspace_id: str) -> None: } } self.t.request("PATCH", f"/api/v2/workspaces/{workspace_id}", json_body=body) + return None def read_data_retention_policy( self, workspace_id: str @@ -1149,6 +946,7 @@ def delete_data_retention_policy(self, workspace_id: str) -> None: raise InvalidWorkspaceIDError() self.t.request("DELETE", self._data_retention_policy_link(workspace_id)) + return None def readme(self, workspace_id: str) -> str | None: """Get the README content of a workspace by its ID.""" diff --git a/src/pytfe/utils.py b/src/pytfe/utils.py index 646b0994..9c9e4c5f 100644 --- a/src/pytfe/utils.py +++ b/src/pytfe/utils.py @@ -14,26 +14,10 @@ OAuthClientCreateOptions, OAuthClientRemoveProjectsOptions, ) + from .models.workspace import VCSRepoOptions from urllib.parse import urlparse -from .errors import ( - InvalidNameError, - RequiredAgentModeError, - RequiredAgentPoolIDError, - RequiredNameError, - UnsupportedBothTagsRegexAndFileTriggersEnabledError, - UnsupportedBothTagsRegexAndTriggerPatternsError, - UnsupportedBothTagsRegexAndTriggerPrefixesError, - UnsupportedBothTriggerPatternsAndPrefixesError, - UnsupportedOperationsError, -) -from .models.workspace import ( - VCSRepo, - WorkspaceCreateOptions, - WorkspaceUpdateOptions, -) - _STRING_ID_PATTERN = re.compile(r"^[^/\s]+$") _WS_ID_RE = re.compile(r"^ws-[A-Za-z0-9]+$") _VERSION_PATTERN = re.compile( @@ -123,94 +107,11 @@ def is_valid_workspace_name(name: str | None) -> bool: return True -def has_tags_regex_defined(vcs_repo: VCSRepo | None) -> bool: +def has_tags_regex_defined(vcs_repo: VCSRepoOptions | None) -> bool: """Check if VCS repo has tags regex defined.""" return vcs_repo is not None and valid_string(vcs_repo.tags_regex) -def validate_workspace_create_options(options: WorkspaceCreateOptions) -> None: - """ - Validate workspace create options for proper API usage. - Raises specific validation errors if validation fails. - """ - # Check required name - if not valid_string(options.name): - raise RequiredNameError() - - # Check name format - if not is_valid_workspace_name(options.name): - raise InvalidNameError() - - # Check operations and execution mode conflict - if options.operations is not None and options.execution_mode is not None: - raise UnsupportedOperationsError() - - # Check agent mode requirements - if options.agent_pool_id is not None and ( - options.execution_mode is None or options.execution_mode != "agent" - ): - raise RequiredAgentModeError() - - if ( - options.agent_pool_id is None - and options.execution_mode is not None - and options.execution_mode == "agent" - ): - raise RequiredAgentPoolIDError() - - # Check trigger patterns and prefixes conflict - if len(options.trigger_prefixes) > 0 and len(options.trigger_patterns) > 0: - raise UnsupportedBothTriggerPatternsAndPrefixesError() - - # Check tags regex conflicts - if has_tags_regex_defined(options.vcs_repo): - if len(options.trigger_patterns) > 0: - raise UnsupportedBothTagsRegexAndTriggerPatternsError() - - if len(options.trigger_prefixes) > 0: - raise UnsupportedBothTagsRegexAndTriggerPrefixesError() - - if options.file_triggers_enabled is not None and options.file_triggers_enabled: - raise UnsupportedBothTagsRegexAndFileTriggersEnabledError() - - -def validate_workspace_update_options(options: WorkspaceUpdateOptions) -> None: - """ - Validate workspace update options for proper API usage. - Raises specific validation errors if validation fails. - """ - # Check name format if provided - if options.name is not None and not is_valid_workspace_name(options.name): - raise InvalidNameError() - - # Check operations and execution mode conflict - if options.operations is not None and options.execution_mode is not None: - raise UnsupportedOperationsError() - - # Check agent mode requirements - if ( - options.agent_pool_id is None - and options.execution_mode is not None - and options.execution_mode == "agent" - ): - raise RequiredAgentPoolIDError() - - # Check trigger patterns and prefixes conflict - if len(options.trigger_prefixes) > 0 and len(options.trigger_patterns) > 0: - raise UnsupportedBothTriggerPatternsAndPrefixesError() - - # Check tags regex conflicts - if has_tags_regex_defined(options.vcs_repo): - if len(options.trigger_patterns) > 0: - raise UnsupportedBothTagsRegexAndTriggerPatternsError() - - if len(options.trigger_prefixes) > 0: - raise UnsupportedBothTagsRegexAndTriggerPrefixesError() - - if options.file_triggers_enabled is not None and options.file_triggers_enabled: - raise UnsupportedBothTagsRegexAndFileTriggersEnabledError() - - def validate_oauth_client_create_options(options: OAuthClientCreateOptions) -> None: """ Validate OAuth client create options for proper API usage. From a349ca7e9153901fca3db7f79c654be280a0ab2b Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Thu, 18 Dec 2025 17:19:53 +0530 Subject: [PATCH 2/4] test(workspace): modified testcases for refactor --- src/pytfe/resources/variable_sets.py | 1 - src/pytfe/resources/workspaces.py | 14 +++++++---- tests/units/test_run.py | 4 ++-- tests/units/test_run_trigger.py | 8 +++---- tests/units/test_workspaces.py | 35 +++++++++++++--------------- 5 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/pytfe/resources/variable_sets.py b/src/pytfe/resources/variable_sets.py index 4bf4353c..8614baab 100644 --- a/src/pytfe/resources/variable_sets.py +++ b/src/pytfe/resources/variable_sets.py @@ -628,7 +628,6 @@ def _parse_variable_set(self, data: dict[str, Any]) -> VariableSet: { "id": ws["id"], "name": f"workspace-{ws['id']}", # Placeholder name - "organization": "placeholder-org", # Placeholder organization } ) parsed_data["workspaces"] = workspaces diff --git a/src/pytfe/resources/workspaces.py b/src/pytfe/resources/workspaces.py index 1b6f2478..99e56993 100644 --- a/src/pytfe/resources/workspaces.py +++ b/src/pytfe/resources/workspaces.py @@ -125,14 +125,16 @@ def _ws_from(d: dict[str, Any]) -> Workspace: DataRetentionPolicyDeleteOlder.model_validate( { "id": drp_data.get("id"), - "delete-older-than-n-days": drp_data.get( + "delete_older_than_n_days": drp_data.get( "attributes", {} ).get("delete-older-than-n-days", 0), } ) ) data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( - data_retention_policy_delete_older + { + "data_retention_policy_delete_older": data_retention_policy_delete_older + } ) elif drp_data.get("type") == "data-retention-policy-dont-deletes": data_retention_policy_dont_delete = ( @@ -141,20 +143,22 @@ def _ws_from(d: dict[str, Any]) -> Workspace: ) ) data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( - data_retention_policy_dont_delete + { + "data_retention_policy_dont_delete": data_retention_policy_dont_delete + } ) elif drp_data.get("type") == "data-retention-policies": # Legacy data retention policy data_retention_policy = DataRetentionPolicy.model_validate( { "id": drp_data.get("id"), - "delete-older-than-n-days": drp_data.get("attributes", {}).get( + "delete_older_than_n_days": drp_data.get("attributes", {}).get( "delete-older-than-n-days", 0 ), } ) data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( - data_retention_policy + {"data_retention_policy": data_retention_policy} ) attr["id"] = d.get("id") diff --git a/tests/units/test_run.py b/tests/units/test_run.py index c1a9bf67..fa7ad87e 100644 --- a/tests/units/test_run.py +++ b/tests/units/test_run.py @@ -162,7 +162,7 @@ def test_create_run_validation_errors(self, runs_service): runs_service.create(options) # Test terraform_version with non-plan-only run - workspace = Workspace(id="ws-123", name="test", organization="test-org") + workspace = Workspace(id="ws-123", name="test", organization=None) options = RunCreateOptions( workspace=workspace, terraform_version="1.5.0", plan_only=False ) @@ -199,7 +199,7 @@ def test_create_run_success(self, runs_service): with patch.object(runs_service, "t") as mock_transport: mock_transport.request.return_value = mock_response - workspace = Workspace(id="ws-123", name="test", organization="test-org") + workspace = Workspace(id="ws-123", name="test", organization=None) variables = [ RunVariable(key="env", value="test"), RunVariable(key="region", value="us-east-1"), diff --git a/tests/units/test_run_trigger.py b/tests/units/test_run_trigger.py index 901dafb8..de3e3c2e 100644 --- a/tests/units/test_run_trigger.py +++ b/tests/units/test_run_trigger.py @@ -190,7 +190,7 @@ def test_create_run_trigger_validations(self, run_triggers_service): """Test create method with invalid workspace ID.""" options = RunTriggerCreateOptions( - sourceable=Workspace(id="ws-source", name="source", organization="org") + sourceable=Workspace(id="ws-source", name="source", organization=None) ) with pytest.raises(InvalidWorkspaceIDError): @@ -201,7 +201,7 @@ def test_create_run_trigger_validations(self, run_triggers_service): # is raised when the service method checks for None sourceable # Create valid options but then manually set sourceable to None to bypass model validation options = RunTriggerCreateOptions( - sourceable=Workspace(id="ws-source", name="source", organization="org") + sourceable=Workspace(id="ws-source", name="source", organization=None) ) options.sourceable = None @@ -227,7 +227,7 @@ def test_create_run_trigger_success(self, run_triggers_service): mock_transport.request.return_value = mock_response options = RunTriggerCreateOptions( - sourceable=Workspace(id="ws-source", name="source", organization="org") + sourceable=Workspace(id="ws-source", name="source", organization=None) ) result = run_triggers_service.create("ws-123", options) @@ -340,7 +340,7 @@ def test_validate_run_trigger_filter_param_success(self, run_triggers_service): def test_backfill_deprecated_sourceable_already_exists(self, run_triggers_service): """Test backfill when sourceable already exists.""" - workspace = Workspace(id="ws-1", name="workspace", organization="org") + workspace = Workspace(id="ws-1", name="workspace", organization=None) rt = RunTrigger( id="rt-1", created_at=datetime.now(), diff --git a/tests/units/test_workspaces.py b/tests/units/test_workspaces.py index f62f7849..77313a20 100644 --- a/tests/units/test_workspaces.py +++ b/tests/units/test_workspaces.py @@ -33,7 +33,7 @@ ) from src.pytfe.models.project import Project from src.pytfe.models.workspace import ( - VCSRepo, + VCSRepoOptions, Workspace, WorkspaceAddRemoteStateConsumersOptions, WorkspaceAddTagBindingsOptions, @@ -304,7 +304,7 @@ def test_create_workspace_with_vcs( sample_workspace_response ) - vcs_repo = VCSRepo( + vcs_repo = VCSRepoOptions( identifier="myorg/myrepo", branch="main", oauth_token_id="ot-123456", @@ -316,7 +316,6 @@ def test_create_workspace_with_vcs( name="vcs-workspace", vcs_repo=vcs_repo, working_directory="terraform/", - # Remove trigger_prefixes to avoid conflict with tags_regex ) workspace = workspaces_service.create("test-org", options=options) @@ -611,11 +610,11 @@ def test_unassign_ssh_key( def test_ws_from_conversion(self, sample_workspace_response): """Test _ws_from helper function conversion.""" workspace_data = sample_workspace_response["data"] - workspace = _ws_from(workspace_data, "test-org") + workspace = _ws_from(workspace_data) assert workspace.id == "ws-abc123def456" assert workspace.name == "test-workspace" - assert workspace.organization == "test-org" + assert workspace.organization is None assert workspace.auto_apply assert workspace.execution_mode == ExecutionMode.REMOTE assert workspace.resource_count == 25 @@ -633,11 +632,11 @@ def test_ws_from_minimal_data(self): """Test _ws_from with minimal data.""" minimal_data = {"id": "ws-minimal", "attributes": {"name": "minimal-workspace"}} - workspace = _ws_from(minimal_data, "test-org") + workspace = _ws_from(minimal_data) assert workspace.id == "ws-minimal" assert workspace.name == "minimal-workspace" - assert workspace.organization == "test-org" + assert workspace.organization is None assert not workspace.auto_apply # Default value assert not workspace.locked # Default value @@ -676,11 +675,11 @@ def test_none_values_handling(self): }, } - workspace = _ws_from(data_with_nones, "test-org") + workspace = _ws_from(data_with_nones) - assert workspace.description == "" # Should convert None to empty string - assert workspace.terraform_version == "" - assert workspace.working_directory == "" + assert workspace.description is None # None values are preserved + assert workspace.terraform_version is None + assert workspace.working_directory is None assert workspace.vcs_repo is None # ========================================== @@ -758,14 +757,13 @@ def test_list_remote_state_consumers_with_pagination( # Verify pagination parameters were passed call_args = mock_transport.request.call_args params = call_args[1]["params"] - assert params["page[number]"] == 1 assert params["page[size]"] == 5 def test_add_remote_state_consumers_basic(self, workspaces_service, mock_transport): """Test adding remote state consumers.""" consumer_workspaces = [ - Workspace(id="ws-consumer-1", name="consumer-1", organization="test-org"), - Workspace(id="ws-consumer-2", name="consumer-2", organization="test-org"), + Workspace(id="ws-consumer-1", name="consumer-1", organization=None), + Workspace(id="ws-consumer-2", name="consumer-2", organization=None), ] options = WorkspaceAddRemoteStateConsumersOptions( @@ -806,7 +804,7 @@ def test_add_remote_state_consumers_validation_errors(self, workspaces_service): # Test invalid workspace ID format (with slash) options = WorkspaceAddRemoteStateConsumersOptions( - workspaces=[Workspace(id="ws-valid", name="valid", organization="test-org")] + workspaces=[Workspace(id="ws-valid", name="valid", organization=None)] ) with pytest.raises(InvalidWorkspaceIDError): @@ -817,7 +815,7 @@ def test_remove_remote_state_consumers_basic( ): """Test removing remote state consumers.""" consumer_workspaces = [ - Workspace(id="ws-consumer-1", name="consumer-1", organization="test-org"), + Workspace(id="ws-consumer-1", name="consumer-1", organization=None), ] options = WorkspaceRemoveRemoteStateConsumersOptions( @@ -844,8 +842,8 @@ def test_update_remote_state_consumers_basic( ): """Test updating (replacing) remote state consumers.""" consumer_workspaces = [ - Workspace(id="ws-consumer-3", name="consumer-3", organization="test-org"), - Workspace(id="ws-consumer-4", name="consumer-4", organization="test-org"), + Workspace(id="ws-consumer-3", name="consumer-3", organization=None), + Workspace(id="ws-consumer-4", name="consumer-4", organization=None), ] options = WorkspaceUpdateRemoteStateConsumersOptions( @@ -929,7 +927,6 @@ def test_list_tags_with_query_and_pagination( call_args = mock_transport.request.call_args params = call_args[1]["params"] assert params["name"] == "env" - assert params["page[number]"] == 1 assert params["page[size]"] == 5 def test_add_tags_basic(self, workspaces_service, mock_transport): From 7600232466ad98be259707c211c26f0ce5c19de0 Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Wed, 4 Mar 2026 10:49:27 +0530 Subject: [PATCH 3/4] resolved conflict at init for stateversion models --- src/pytfe/models/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index fa03b2db..36fac933 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -667,12 +667,10 @@ "StateVersion", "StateVersionCreateOptions", "StateVersionCurrentOptions", - "StateVersionList", "StateVersionListOptions", "StateVersionReadOptions", # State Version Outputs - "StateVersionOutput", - "StateVersionOutputsList", + "StateVersionOutput" "StateVersionOutputsListOptions", ] From c1d0ec5f7bb7608fa69058cefd1fad46a3158df6 Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Thu, 12 Mar 2026 16:42:43 +0530 Subject: [PATCH 4/4] resolved merge conflict at init --- src/pytfe/models/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index 36fac933..855d7126 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -295,13 +295,11 @@ StateVersion, StateVersionCreateOptions, StateVersionCurrentOptions, - StateVersionList, StateVersionListOptions, StateVersionReadOptions, ) from .state_version_output import ( StateVersionOutput, - StateVersionOutputsList, StateVersionOutputsListOptions, ) from .team import ( @@ -670,7 +668,7 @@ "StateVersionListOptions", "StateVersionReadOptions", # State Version Outputs - "StateVersionOutput" + "StateVersionOutput", "StateVersionOutputsListOptions", ]