Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bfabric_app_runner/src/bfabric_app_runner/cli/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ def cmd_validate_inputs_spec(yaml_file: Path) -> None:

def cmd_validate_outputs_spec(yaml_file: Path) -> None:
"""Validate an outputs spec file."""
outputs_spec = OutputsSpec.model_validate(yaml.safe_load(yaml_file.read_text()))
outputs_spec = OutputsSpec.read_yaml(yaml_file)
pprint(outputs_spec)
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

from typing import TYPE_CHECKING, assert_never

import polars as pl

if TYPE_CHECKING:
from pathlib import Path

from bfabric_app_runner.specs.outputs.annotations import BfabricOutputDataset, IncludeDatasetRef, IncludeResourceRef


def _validate_table_schema(df: pl.DataFrame) -> None:
"""Validate that required columns exist and have correct types."""
if "Resource" not in df.columns:
raise ValueError("Missing required column: Resource")

if not df["Resource"].dtype.is_integer():
raise ValueError(f"Column 'Resource' must be integer type, got {df['Resource'].dtype}")

if df["Resource"].null_count() > 0:
raise ValueError("Column 'Resource' cannot contain null values")

if "Anchor" in df.columns and df["Anchor"].dtype not in (pl.Null, pl.String):

Check warning on line 24 in bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Expression will always evaluate to True since the types "DataType" and "type[Null] | type[String]" have no overlap (reportUnnecessaryContains)
raise ValueError(f"Column 'Anchor' must be String type, got {df['Anchor'].dtype}")


def _load_table(ref: IncludeDatasetRef) -> pl.DataFrame:
format = ref.get_format()

Check warning on line 29 in bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Type of "get_format" is partially unknown   Type of "get_format" is "() -> Unknown" (reportUnknownMemberType)

Check warning on line 29 in bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Type of "format" is unknown (reportUnknownVariableType)
# TODO maybe this should be a generic function somewhere, it's duplicated with input specs probably!
match format:
case "csv":
df = pl.read_csv(ref.local_path)
case "tsv":
df = pl.read_csv(ref.local_path, separator="\t")
case "parquet":
df = pl.read_parquet(ref.local_path)
case _:

Check warning on line 38 in bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Type captured by wildcard pattern is unknown (reportUnknownVariableType)
assert_never(format)

Check warning on line 39 in bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Argument type is unknown   Argument corresponds to parameter "arg" in function "assert_never" (reportUnknownArgumentType)
_validate_table_schema(df)
return df


def _get_resource_row(resource: IncludeResourceRef, resource_mapping: dict[Path, int]) -> dict[str, int | str | None]:
# TODO check if accessing store_entry_path like this is robust enough (see comment below)
resource_id = resource_mapping[resource.store_entry_path]
return {"Resource": resource_id, "Anchor": resource.anchor, **resource.metadata}


# TODO the resource mapping has some challenges if it's not a model regarding mis-use non-canonical paths,
# e.g. they should never start with `/` but this could also be validated in the originating model


def generate_output_table(config: BfabricOutputDataset, resource_mapping: dict[Path, int]) -> pl.DataFrame:
"""Generates the output table contents for the specified output dataset configuration.

:param config: the output dataset configuration
:param resource_mapping: a mapping of store_entry_path to resource_id of the resources which were created
:return: the output table contents
"""
parts: list[pl.DataFrame] = [_load_table(ref) for ref in config.include_tables]
if config.include_resources:
resource_rows = [_get_resource_row(r, resource_mapping) for r in config.include_resources]
resources_df = pl.from_dicts(resource_rows, strict=False)
_validate_table_schema(resources_df)
parts.append(resources_df)
if not parts:
raise ValueError("BfabricOutputDataset requires at least one include_tables or include_resources entry")
return pl.concat(parts, how="diagonal_relaxed")
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from __future__ import annotations

import hashlib
from typing import TYPE_CHECKING
from pathlib import Path # noqa: TC003 # used at runtime by RegisterAllOutputs pydantic model
from typing import TYPE_CHECKING, assert_never

import polars as pl
import yaml
from bfabric.entities import Resource, Storage, Workunit
from bfabric.operations.dataset import CreateDatasetParams, create_dataset
from bfabric.utils.table_lint import check_for_invalid_characters
from loguru import logger
from pydantic import BaseModel

from bfabric_app_runner.output_registration.annotation_table import generate_output_table
from bfabric_app_runner.specs.outputs.annotations import AnnotationType, BfabricOutputDataset
from bfabric_app_runner.specs.outputs_spec import (
CopyResourceSpec,
OutputsSpec,
Expand All @@ -21,8 +25,6 @@
from bfabric_app_runner.util.scp import scp

if TYPE_CHECKING:
from pathlib import Path

from bfabric import Bfabric
from bfabric.experimental.workunit_definition import WorkunitDefinition

Expand All @@ -39,8 +41,8 @@
client: Bfabric,
workunit_definition: WorkunitDefinition,
resource_id: int | None = None,
) -> None:
"""Registers a file in the workunit."""
) -> int:
"""Registers a file in the workunit and returns the resource ID."""
existing_id = _identify_existing_resource_id(client, spec, workunit_definition)
if resource_id is not None and existing_id is not None and resource_id != existing_id:
raise ValueError(f"Resource id {resource_id} does not match existing resource id {existing_id}")
Expand All @@ -62,7 +64,8 @@
if existing_id is not None:
resource_data["id"] = existing_id

_ = client.save("resource", resource_data)
result = client.save("resource", resource_data)
return result[0]["id"]

Check failure on line 68 in bfabric_app_runner/src/bfabric_app_runner/output_registration/register.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Type "ApiResponseDataType" is not assignable to return type "int"   Type "ApiResponseDataType" is not assignable to type "int"     "float" is not assignable to "int" (reportReturnType)


def _identify_existing_resource_id(
Expand Down Expand Up @@ -172,20 +175,27 @@
return None


class RegisterAllOutputs(BaseModel):
resources_mapping: dict[Path, int] = {}
"""This maps the store_entry_path to the ids of the resources which were created or updated"""


def register_all(
client: Bfabric,
workunit_definition: WorkunitDefinition,
specs_list: list[SpecType],
ssh_user: str | None,
reuse_default_resource: bool,
force_storage: Path | None,
) -> None:
) -> RegisterAllOutputs:
"""Registers all the output specs to the workunit."""
default_resource_was_reused = not reuse_default_resource

storage = _get_storage(client, force_storage, specs_list, workunit_definition)
logger.info(f"Using storage: {storage}")

outputs = RegisterAllOutputs()

for spec in specs_list:
logger.debug(f"Registering {spec}")
if isinstance(spec, CopyResourceSpec):
Expand All @@ -200,7 +210,8 @@
default_resource_was_reused = True
else:
resource_id = None
register_file_in_workunit(

outputs.resources_mapping[spec.store_entry_path] = register_file_in_workunit(
spec,
client=client,
workunit_definition=workunit_definition,
Expand All @@ -211,7 +222,9 @@
elif isinstance(spec, SaveLinkSpec):
_save_link(spec, client, workunit_definition=workunit_definition)
else:
raise ValueError(f"Unknown spec type: {type(spec)}")
assert_never(type(spec))

return outputs


def _get_storage(
Expand All @@ -227,6 +240,36 @@
return None


def _save_annotations(
outputs: RegisterAllOutputs,
annotations: list[AnnotationType],
client: Bfabric,
workunit_definition: WorkunitDefinition,
) -> None:
# Create-only for now: if an output dataset already exists on the workunit, `create_dataset` will
# surface the SOAP error. Updating an existing dataset is a follow-up (no Workunit.output_dataset
# field today, so locating the prior dataset requires its own design).
registration = workunit_definition.registration
if registration is None:
raise ValueError("workunit_definition has no registration; cannot save annotations")
for annotation in annotations:
if isinstance(annotation, BfabricOutputDataset):

Check warning on line 256 in bfabric_app_runner/src/bfabric_app_runner/output_registration/register.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Unnecessary isinstance call; "AnnotationType" is always an instance of "BfabricOutputDataset" (reportUnnecessaryIsInstance)
output_df = generate_output_table(config=annotation, resource_mapping=outputs.resources_mapping)
check_for_invalid_characters(table=output_df, invalid_characters="")
dataset = create_dataset(
client,
output_df,
CreateDatasetParams(
name=annotation.name or f"Output Dataset (workunit {registration.workunit_id})",
container_id=registration.container_id,
workunit_id=registration.workunit_id,
),
)
logger.info(f"Output dataset saved with id {dataset.id} for workunit {registration.workunit_id}")
else:
assert_never(type(annotation))


def register_outputs(
outputs_yaml: Path,
workunit_definition: WorkunitDefinition,
Expand All @@ -236,12 +279,13 @@
force_storage: Path | None,
) -> None:
"""Registers outputs to the workunit."""
specs_list = OutputsSpec.read_yaml(outputs_yaml)
register_all(
spec = OutputsSpec.read_yaml(outputs_yaml)
outputs = register_all(
client=client,
workunit_definition=workunit_definition,
specs_list=specs_list,
specs_list=spec.outputs,
ssh_user=ssh_user,
reuse_default_resource=reuse_default_resource,
force_storage=force_storage,
)
_save_annotations(outputs, spec.annotations, client=client, workunit_definition=workunit_definition)
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class StaticFileSpec(BaseModel):
type: Literal["static_file"] = "static_file"

content: str | bytes

"""The text or binary content to write."""
filename: str
"""The target filename to write to."""
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

import typing
from pathlib import Path
from typing import ClassVar, Literal

from pydantic import BaseModel, model_validator


class IncludeDatasetRef(BaseModel):
Formats: ClassVar = Literal["csv", "tsv", "parquet"]

Check warning on line 11 in bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Type annotation for attribute `Formats` is required because this class is not decorated with `@final` (reportUnannotatedClassAttribute)

local_path: Path
format: Formats | None = None

Check failure on line 14 in bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Variable not allowed in type expression (reportInvalidTypeForm)

def get_format(self) -> Formats:

Check failure on line 16 in bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Variable not allowed in type expression (reportInvalidTypeForm)

Check warning on line 16 in bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Return type is unknown (reportUnknownParameterType)
"""Returns the format inferring the type from the filename if not specified explicitly."""
if self.format is not None:

Check warning on line 18 in bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Type of "format" is partially unknown   Type of "format" is "Unknown | None" (reportUnknownMemberType)
return self.format
return self.local_path.suffix.removeprefix(".")

@model_validator(mode="after")
def _check_format_or_correct_suffix(self) -> IncludeDatasetRef:
allowed_formats = typing.get_args(self.Formats)
if self.format is None and self.local_path.suffix.removeprefix(".") not in allowed_formats:
msg = f"When format is not specified, the file extension must be one of {allowed_formats}"
raise ValueError(msg)
return self


class IncludeResourceRef(BaseModel):
store_entry_path: Path
anchor: str = ""
metadata: dict[str, str] = {}


class BfabricOutputDataset(BaseModel):
# Single discriminator value for now; when a second annotation type lands, add a default and migrate consumers.
type: Literal["bfabric_output_dataset"]
name: str | None = None
include_tables: list[IncludeDatasetRef] = []
include_resources: list[IncludeResourceRef] = []


AnnotationType = BfabricOutputDataset
26 changes: 22 additions & 4 deletions bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from __future__ import annotations

import enum
from collections import Counter
from pathlib import Path # noqa: TCH003
from typing import Literal, Annotated
from typing import Annotated, Literal

import yaml
from pydantic import BaseModel, ConfigDict, Field, model_validator

from bfabric_app_runner.specs.outputs.annotations import AnnotationType


class UpdateExisting(enum.Enum):
NO = "no"
Expand Down Expand Up @@ -45,6 +48,7 @@
invalid_characters: str = ""


# TODO deprecate!
class SaveLinkSpec(BaseModel):
"""Saves a link to the workunit, or, if desired to an arbitrary entity of type entity_type with id entity_id."""

Expand Down Expand Up @@ -74,13 +78,27 @@
class OutputsSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
outputs: list[SpecType]
annotations: list[AnnotationType] = []

@classmethod
def read_yaml(cls, path: Path) -> list[SpecType]:
model = cls.model_validate(yaml.safe_load(path.read_text()))
return model.outputs
def read_yaml(cls, path: Path) -> OutputsSpec:
return cls.model_validate(yaml.safe_load(path.read_text()))

@classmethod
def write_yaml(cls, specs: list[SpecType], path: Path) -> None:
model = cls.model_validate(dict(outputs=specs))
path.write_text(yaml.dump(model.model_dump(mode="json")))

@model_validator(mode="after")
def _no_duplicate_store_entry_paths(self) -> OutputsSpec:
# arguably, this is a bit strict and might be relaxed in the future
# TODO although the main scenario where this could be a problem is when store_folder_path is used and maybe we
# should handle this differently?
resource_specs = list(filter(lambda x: isinstance(x, CopyResourceSpec), self.outputs))
store_entry_paths = [spec.store_entry_path for spec in resource_specs]

Check failure on line 98 in bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Cannot access attribute "store_entry_path" for class "SaveLinkSpec"   Attribute "store_entry_path" is unknown (reportAttributeAccessIssue)

Check failure on line 98 in bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py

View workflow job for this annotation

GitHub Actions / Unit Tests

Cannot access attribute "store_entry_path" for class "SaveDatasetSpec"   Attribute "store_entry_path" is unknown (reportAttributeAccessIssue)
# check for duplicates
duplicates = [path for path, count in Counter(store_entry_paths).items() if count > 1]
if duplicates:
msg = f"Duplicate store entry paths: {duplicates}"
raise ValueError(msg)
return self
Loading
Loading