Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions examples/state_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from __future__ import annotations

import argparse
import hashlib
import json
import os
from pathlib import Path

Expand All @@ -15,6 +17,7 @@
StateVersionListOptions,
StateVersionOutputsListOptions,
)
from pytfe.models.workspace import WorkspaceLockOptions


def _print_header(title: str):
Expand Down Expand Up @@ -110,18 +113,50 @@ def main():
# 5) (Optional) Upload a new state file
if args.upload:
_print_header(f"Uploading new state from: {args.upload}")
payload = Path(args.upload).read_bytes()
try:
payload = Path(args.upload).read_bytes()
state_obj = json.loads(payload.decode("utf-8"))
serial = int(state_obj["serial"])
lineage = state_obj.get("lineage")
md5 = hashlib.md5(payload).hexdigest() # nosec B324
locked_workspace = False

try:
client.workspaces.lock(
args.workspace_id,
WorkspaceLockOptions(
reason="python-tfe state_versions upload example"
),
)
locked_workspace = True
except Exception:
# Continue in case the workspace is already locked by the caller.
pass

# If your server supports signed uploads, this will:
# a) create SV (to get upload URL)
# b) PUT bytes to the signed URL
# c) read back the SV to return a hydrated object
new_sv = client.state_versions.upload(
args.workspace_id,
raw_state=payload,
options=StateVersionCreateOptions(),
)
try:
new_sv = client.state_versions.upload(
args.workspace_id,
raw_state=payload,
options=StateVersionCreateOptions(
serial=serial,
md5=md5,
lineage=lineage,
),
)
finally:
if locked_workspace:
client.workspaces.unlock(args.workspace_id)
print(f"Uploaded new SV: {new_sv.id} status={new_sv.status}")
except FileNotFoundError:
print(f"Upload file not found: {args.upload}")
except (KeyError, ValueError, json.JSONDecodeError):
print(
"Upload input must be a valid Terraform state JSON containing at least a serial value."
)
except ErrStateVersionUploadNotSupported as e:
# Some older/self-hosted versions don’t support direct upload
print(f"Upload not supported on this server: {e}")
Expand Down
10 changes: 10 additions & 0 deletions src/pytfe/models/state_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,18 @@ class StateVersion(BaseModel):
hosted_state_download_url: str | None = Field(
None, alias="hosted-state-download-url"
)
hosted_json_state_download_url: str | None = Field(
None, alias="hosted-json-state-download-url"
)
hosted_state_upload_url: str | None = Field(None, alias="hosted-state-upload-url")
hosted_json_state_upload_url: str | None = Field(
None, alias="hosted-json-state-upload-url"
)
status: StateVersionStatus | None = Field(None, alias="status")
serial: int | None = Field(None, alias="serial")
size: int | None = Field(None, alias="size")
terraform_version: str | None = Field(None, alias="terraform-version")
state_version: int | None = Field(None, alias="state-version")

# Optional/advanced fields (present on newer servers; keep loose)
resources_processed: bool | None = Field(None, alias="resources-processed")
Expand Down
64 changes: 55 additions & 9 deletions src/pytfe/resources/state_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
from typing import Any
from urllib.parse import urlencode

from ..errors import NotFound

# Pydantic models for this feature
from ..errors import ErrStateVersionUploadNotSupported, NotFound, TFEError
from ..models.state_version import (
StateVersion,
StateVersionCreateOptions,
Expand Down Expand Up @@ -193,18 +191,66 @@ def create(
**{k.replace("-", "_"): v for k, v in attr.items()},
)

"""
def upload(
self,
workspace: str,
*,
raw_state: bytes | None = None,
raw_state: bytes | None,
raw_json_state: bytes | None = None,
options: Optional[StateVersionCreateOptions] = None,
organization: Optional[str] = None,
options: StateVersionCreateOptions,
organization: str | None = None,
) -> StateVersion:
# TBD: Implements Upload State Functionality
"""
"""
Create a state version and upload state bytes to signed Archivist URLs.

This mirrors Terraform's recommended workflow:
1. POST /workspaces/:id/state-versions with serial+md5 and no inline state
2. PUT raw state bytes to hosted-state-upload-url
3. Optional PUT JSON state bytes to hosted-json-state-upload-url
4. Read the state version again and return the refreshed object
"""
if raw_state is None:
raise ValueError("raw_state is required")
if options.state is not None or options.json_state is not None:
raise ValueError(
"options.state and options.json_state must be omitted when using upload"
)

try:
sv = self.create(workspace, options, organization=organization)
except TFEError as exc:
# Older servers can reject the create-without-inline-state flow.
if "param is missing or the value is empty: state" in str(exc):
raise ErrStateVersionUploadNotSupported(
"state version upload is not supported by this server"
) from exc
raise

if not sv.hosted_state_upload_url:
raise ErrStateVersionUploadNotSupported(
"hosted-state-upload-url not returned by server"
)

self.t.request(
"PUT",
sv.hosted_state_upload_url,
data=raw_state,
headers={"Content-Type": "application/octet-stream"},
)

if raw_json_state is not None:
if not sv.hosted_json_state_upload_url:
raise ErrStateVersionUploadNotSupported(
"hosted-json-state-upload-url not returned by server"
)
self.t.request(
"PUT",
sv.hosted_json_state_upload_url,
data=raw_json_state,
headers={"Content-Type": "application/octet-stream"},
)

return self.read(sv.id)

def download(self, state_version_id: str) -> bytes:
"""
Expand Down
84 changes: 83 additions & 1 deletion tests/units/test_state_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

from pytfe._http import HTTPTransport
from pytfe.errors import NotFound
from pytfe.errors import ErrStateVersionUploadNotSupported, NotFound, TFEError
from pytfe.models.state_version import (
StateVersion,
StateVersionCreateOptions,
Expand Down Expand Up @@ -131,6 +131,7 @@ def test_read_state_version_success(self, state_versions_service, mock_transport
)
assert result.id == "sv-read-1"
assert result.status == StateVersionStatus.FINALIZED
assert result.serial == 9
assert result.hosted_state_download_url == "https://example.com/download"

def test_read_with_options_success(self, state_versions_service, mock_transport):
Expand Down Expand Up @@ -204,6 +205,7 @@ def test_read_current_with_options_success(
params={"include": "created_by"},
)
assert result.id == "sv-current-1"
assert result.serial == 9

def test_create_state_version_success(self, state_versions_service, mock_transport):
"""Test successful create() operation."""
Expand Down Expand Up @@ -247,6 +249,86 @@ def test_create_state_version_success(self, state_versions_service, mock_transpo
assert result.id == "sv-new-1"
assert result.status == StateVersionStatus.PENDING

def test_upload_state_version_success(self, state_versions_service, mock_transport):
"""Test upload() creates, uploads raw bytes, and re-reads state version."""
created_sv = StateVersion(
id="sv-upload-1",
created_at="2024-01-01T00:00:00Z",
status=StateVersionStatus.PENDING,
hosted_state_upload_url="https://example.com/upload-raw",
hosted_json_state_upload_url="https://example.com/upload-json",
)
final_sv = StateVersion(
id="sv-upload-1",
created_at="2024-01-01T00:00:00Z",
status=StateVersionStatus.FINALIZED,
hosted_state_download_url="https://example.com/download-raw",
)
options = StateVersionCreateOptions(serial=10, md5="abc123")

with patch.object(state_versions_service, "create", return_value=created_sv):
with patch.object(state_versions_service, "read", return_value=final_sv):
result = state_versions_service.upload(
"ws-123",
raw_state=b"raw-state",
raw_json_state=b"json-state",
options=options,
)

assert result.id == "sv-upload-1"
assert result.status == StateVersionStatus.FINALIZED
assert mock_transport.request.call_count == 2
mock_transport.request.assert_any_call(
"PUT",
"https://example.com/upload-raw",
data=b"raw-state",
headers={"Content-Type": "application/octet-stream"},
)
mock_transport.request.assert_any_call(
"PUT",
"https://example.com/upload-json",
data=b"json-state",
headers={"Content-Type": "application/octet-stream"},
)

def test_upload_state_version_unsupported_on_create_error(
self, state_versions_service
):
"""Test upload() maps legacy create error text to typed unsupported error."""
options = StateVersionCreateOptions(serial=10, md5="abc123")
legacy_err = TFEError("param is missing or the value is empty: state")

with patch.object(state_versions_service, "create", side_effect=legacy_err):
with pytest.raises(ErrStateVersionUploadNotSupported):
state_versions_service.upload(
"ws-123", raw_state=b"raw-state", options=options
)

def test_upload_state_version_requires_signed_url(self, state_versions_service):
"""Test upload() raises when server does not return hosted-state-upload-url."""
created_sv = StateVersion(
id="sv-upload-2",
created_at="2024-01-01T00:00:00Z",
status=StateVersionStatus.PENDING,
hosted_state_upload_url=None,
)
options = StateVersionCreateOptions(serial=10, md5="abc123")

with patch.object(state_versions_service, "create", return_value=created_sv):
with pytest.raises(ErrStateVersionUploadNotSupported):
state_versions_service.upload(
"ws-123", raw_state=b"raw-state", options=options
)

def test_upload_state_version_rejects_inline_state(self, state_versions_service):
"""Test upload() enforces omission of inline state/json-state in options."""
options = StateVersionCreateOptions(serial=10, md5="abc123", state="abc")

with pytest.raises(ValueError, match="must be omitted"):
state_versions_service.upload(
"ws-123", raw_state=b"raw-state", options=options
)

def test_download_state_version_not_found_when_url_missing(
self, state_versions_service
):
Expand Down
Loading