Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions src/vercel/_internal/workflow/worlds/local.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import contextlib
import hashlib
import json
import math
import os
import pathlib
import tempfile
import traceback
from datetime import datetime
from typing import Any, TypeVar
Expand Down Expand Up @@ -46,21 +48,50 @@ def read_json(path: pathlib.Path, schema: type[T] | pydantic.TypeAdapter[T]) ->
return None


def atomic_write(path: str | os.PathLike[str], data: bytes, *, overwrite: bool = True) -> None:
"""Atomically write ``data`` to ``path``.

Writes to a temp file in the same directory, then puts it in place
with a single atomic syscall. If ``overwrite`` is True, an existing
file is replaced; otherwise the write fails with ``FileExistsError``
if ``path`` already exists.
"""
directory = os.path.dirname(path) or "."
fd, tmp = tempfile.mkstemp(dir=directory)
try:
with os.fdopen(fd, "wb") as f:
f.write(data)
if overwrite:
os.replace(tmp, path)
else:
os.link(tmp, path)
os.unlink(tmp)
except BaseException:
with contextlib.suppress(OSError):
os.unlink(tmp)
raise


def write_json(path: pathlib.Path, data: w.BaseModel | dict, *, overwrite: bool = False) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
# Do an early check to avoid serializing stuff when we don't need to.
# The exists check is not needed for correctness, though -- the real
# check is in atomic_write, and so there is not a TOCTOU race.
if path.exists() and not overwrite:
raise w.EntityConflictError(f"File already exists: {path}")

if isinstance(data, w.BaseModel):
data = data.model_dump()
with path.open("wb") as f:
cbor2.dump(data, f)
try:
atomic_write(path, cbor2.dumps(data), overwrite=overwrite)
except FileExistsError:
raise w.EntityConflictError(f"File already exists: {path}") from None


def write_exclusive(path: pathlib.Path, data: str) -> bool:
path.parent.mkdir(parents=True, exist_ok=True)
try:
with path.open("x") as f:
f.write(data)
atomic_write(path, data.encode(), overwrite=False)
except FileExistsError:
return False
else:
Expand Down