diff --git a/initdb/01-create-databases.sh b/initdb/01-create-databases.sh index 126bbd53d..60af21314 100755 --- a/initdb/01-create-databases.sh +++ b/initdb/01-create-databases.sh @@ -3,4 +3,5 @@ set -e psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "postgres" <<-EOSQL CREATE DATABASE tiled_catalog; CREATE DATABASE tiled_storage; + CREATE DATABASE tiled_graph; EOSQL diff --git a/pixi.toml b/pixi.toml index 793599df9..58ed6cd46 100644 --- a/pixi.toml +++ b/pixi.toml @@ -110,6 +110,9 @@ starlette = ">=0.48.0" uvicorn = "*" zarr = "*" +[feature.server.pypi-dependencies] +strawberry-graphql = {version = ">=0.315.3", extras = ["fastapi"]} + [feature.sparse.dependencies] ndindex = "*" pyarrow-all = ">=14.0.1" # includes fix to CVE 2023-47248 diff --git a/pyproject.toml b/pyproject.toml index 19bbf5377..59eb09c01 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,7 @@ all = [ "sqlalchemy[asyncio] >=2", "stamina", "starlette >=0.48.0", + "strawberry-graphql[fastapi]>=0.315.3", "tifffile", "uvicorn[standard]", "watchfiles", @@ -233,6 +234,7 @@ server = [ "python-multipart", "sparse >=0.15.5", "stamina", + "strawberry-graphql[fastapi]>=0.315.3", "redis", "sqlalchemy[asyncio] >=2", "starlette >=0.48.0", diff --git a/share/tiled/templates/index.html b/share/tiled/templates/index.html index a4b1862a0..b670b06b9 100644 --- a/share/tiled/templates/index.html +++ b/share/tiled/templates/index.html @@ -45,6 +45,17 @@

+
+

Explore the Graph API

+

+ Use the interactive GraphQL playground to query the links graph. +

+ + + +

Learn

diff --git a/tiled/commandline/_links.py b/tiled/commandline/_links.py new file mode 100644 index 000000000..b5d5ca6e8 --- /dev/null +++ b/tiled/commandline/_links.py @@ -0,0 +1,117 @@ +from typing import Optional + +import typer + +graph_app = typer.Typer(no_args_is_help=True) + + +@graph_app.command("initialize-database") +def initialize_database(database_uri: str): + """ + Initialize the graph database for use by Tiled. + """ + import asyncio + + from sqlalchemy.ext.asyncio import create_async_engine + + from ..alembic_utils import UninitializedDatabase, check_database, stamp_head + from ..graph.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH + from ..graph.core import ALL_REVISIONS, REQUIRED_REVISION, initialize_database + from ..utils import ensure_specified_sql_driver + + database_uri = ensure_specified_sql_driver(database_uri) + + async def do_setup(): + engine = create_async_engine(database_uri) + redacted_url = engine.url._replace(password="[redacted]") + try: + await check_database(engine, REQUIRED_REVISION, ALL_REVISIONS) + except UninitializedDatabase: + typer.echo( + f"Database {redacted_url} is new. Creating tables and marking revision {REQUIRED_REVISION}.", + err=True, + ) + await initialize_database(engine) + typer.echo("Database initialized.", err=True) + else: + typer.echo(f"Database at {redacted_url} is already initialized.", err=True) + raise typer.Abort() + finally: + # Dispose on every exit path, including the Abort raised above. + await engine.dispose() + + asyncio.run(do_setup()) + stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri) + + +@graph_app.command("upgrade-database") +def upgrade_database( + database_uri: str, + revision: Optional[str] = typer.Argument( + None, + help="The ID of a revision to upgrade to. By default, upgrade to the latest one.", + ), +): + """ + Upgrade the graph database schema to the latest version. 
+ """ + import asyncio + + from sqlalchemy.ext.asyncio import create_async_engine + + from ..alembic_utils import get_current_revision, upgrade + from ..graph.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH + from ..graph.core import ALL_REVISIONS + from ..utils import ensure_specified_sql_driver + + database_uri = ensure_specified_sql_driver(database_uri) + + async def do_setup(): + engine = create_async_engine(database_uri) + redacted_url = engine.url._replace(password="[redacted]") + current_revision = await get_current_revision(engine, ALL_REVISIONS) + await engine.dispose() + if current_revision is None: + typer.echo( + f"Database {redacted_url} has not been initialized. Use `tiled graph initialize-database`.", + err=True, + ) + raise typer.Abort() + + asyncio.run(do_setup()) + upgrade(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri, revision or "head") + + +@graph_app.command("downgrade-database") +def downgrade_database( + database_uri: str, + revision: str = typer.Argument(..., help="The ID of a revision to downgrade to."), +): + """ + Downgrade the graph database schema to a previous version. + """ + import asyncio + + from sqlalchemy.ext.asyncio import create_async_engine + + from ..alembic_utils import downgrade, get_current_revision + from ..graph.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH + from ..graph.core import ALL_REVISIONS + from ..utils import ensure_specified_sql_driver + + database_uri = ensure_specified_sql_driver(database_uri) + + async def do_setup(): + engine = create_async_engine(database_uri) + redacted_url = engine.url._replace(password="[redacted]") + current_revision = await get_current_revision(engine, ALL_REVISIONS) + await engine.dispose() + if current_revision is None: + typer.echo( + f"Database {redacted_url} has not been initialized. 
Use `tiled graph initialize-database`.", + err=True, + ) + raise typer.Abort() + + asyncio.run(do_setup()) + downgrade(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri, revision) diff --git a/tiled/commandline/main.py b/tiled/commandline/main.py index de37b158a..bd8c94ad3 100644 --- a/tiled/commandline/main.py +++ b/tiled/commandline/main.py @@ -32,6 +32,7 @@ from ._admin import admin_app # noqa: E402 from ._api_key import api_key_app # noqa: E402 from ._catalog import catalog_app # noqa: E402 +from ._links import graph_app # noqa: E402 from ._profile import profile_app # noqa: E402 from ._register import register # noqa: E402 from ._serve import serve_app # noqa: E402 @@ -52,6 +53,11 @@ name="admin", help="Administrative utilities for managing large deployments.", ) +cli_app.add_typer( + graph_app, + name="graph", + help="Manage the graph (links) database.", +) @cli_app.command("login") diff --git a/tiled/config.py b/tiled/config.py index a94281e88..f6b77af17 100644 --- a/tiled/config.py +++ b/tiled/config.py @@ -238,6 +238,13 @@ class StreamingCacheConfig(BaseSettings): settings_customise_sources = classmethod(settings_customise_sources) +class LinksDatabase(BaseSettings): + uri: Optional[str] = None + + model_config = SettingsConfigDict(env_prefix="TILED_LINKS_DATABASE_") + settings_customise_sources = classmethod(settings_customise_sources) + + class WebhooksConfig(BaseSettings): """Configuration for the optional webhooks feature. 
@@ -274,6 +281,7 @@ class Config(BaseSettings): file_extensions: dict[str, str] = {} authentication: Authentication = Authentication() database: Optional[Database] = None + links_database: Optional[LinksDatabase] = None # TODO: Replace Any with AccessPolicy when #1044 is merged access_policy: Annotated[Optional[Any], Field(alias="access_control")] = None response_bytesize_limit: int = 300_000_000 @@ -502,6 +510,7 @@ def construct_build_app_kwargs(config: Config): response_bytesize_limit=config.response_bytesize_limit, exact_count_limit=config.exact_count_limit, database=config.database, + links_database=config.links_database, reject_undeclared_specs=config.reject_undeclared_specs, expose_raw_assets=config.expose_raw_assets, metrics=config.metrics, diff --git a/tiled/graph/__init__.py b/tiled/graph/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tiled/graph/__main__.py b/tiled/graph/__main__.py new file mode 100644 index 000000000..e7a13fb81 --- /dev/null +++ b/tiled/graph/__main__.py @@ -0,0 +1,32 @@ +import os + +from ..alembic_utils import temp_alembic_ini +from .alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH + + +def main(args=None): + """ + This runs the alembic CLI with a dynamically generated config file. + + A database can be specified via TILED_DATABASE_URI, but it is not necessary to set + it for operations that do not connect to any database, such as defining new database + revisions (i.e. migrations). + + To define a new revision: + + $ python -m tiled.graph revision -m "description..." 
+ + """ + import subprocess + import sys + + if args is None: + args = sys.argv[1:] + with temp_alembic_ini( + ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, os.getenv("TILED_DATABASE_URI", "") + ) as config_file: + return subprocess.check_output(["alembic", "-c", config_file, *args]) + + +if __name__ == "__main__": + main() diff --git a/tiled/graph/alembic.ini.template b/tiled/graph/alembic.ini.template new file mode 100644 index 000000000..3896475f1 --- /dev/null +++ b/tiled/graph/alembic.ini.template @@ -0,0 +1,99 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = {migration_script_directory} +sqlalchemy.url = {database_uri} + +# template used to generate migration files +# file_template = %%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. 
+# The path separator used here should be the separator specified by "version_path_separator" +# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. Valid values are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os # default: use os.pathsep + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/tiled/graph/alembic_constants.py b/tiled/graph/alembic_constants.py new file mode 100644 index 000000000..fce8b440f --- /dev/null +++ b/tiled/graph/alembic_constants.py @@ -0,0 +1,5 @@ +import os + +_here = os.path.abspath(os.path.dirname(__file__)) +ALEMBIC_INI_TEMPLATE_PATH = os.path.join(_here, "alembic.ini.template") +ALEMBIC_DIR = os.path.join(_here, "migrations") diff --git a/tiled/graph/core.py b/tiled/graph/core.py new file mode 100644 index 000000000..de5747488 --- /dev/null +++ 
b/tiled/graph/core.py @@ -0,0 +1,11 @@ +from sqlalchemy.ext.asyncio import AsyncEngine + +from .store import _metadata + +ALL_REVISIONS = ["7f3a9d1c0b25"] +REQUIRED_REVISION = ALL_REVISIONS[0] + + +async def initialize_database(engine: AsyncEngine) -> None: + async with engine.begin() as conn: + await conn.run_sync(_metadata.create_all) diff --git a/tiled/graph/migrations/README b/tiled/graph/migrations/README new file mode 100644 index 000000000..31c2c3bfa --- /dev/null +++ b/tiled/graph/migrations/README @@ -0,0 +1,7 @@ +Generic single-database configuration. + +To generate a new revision file, run: + +``` +python -m tiled.graph revision -m "description..." +``` diff --git a/tiled/graph/migrations/env.py b/tiled/graph/migrations/env.py new file mode 100644 index 000000000..fba527f65 --- /dev/null +++ b/tiled/graph/migrations/env.py @@ -0,0 +1,88 @@ +import asyncio + +from alembic import context +from sqlalchemy import pool +from sqlalchemy.ext.asyncio import async_engine_from_config + +# from logging.config import fileConfig + + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +# fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. 
+ + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def do_run_migrations(connection): + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations(): + """In this scenario we need to create an Engine + and associate a connection with the context. + + """ + + connectable = async_engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + + +def run_migrations_online(): + """Run migrations in 'online' mode.""" + + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/tiled/graph/migrations/script.py.mako b/tiled/graph/migrations/script.py.mako new file mode 100644 index 000000000..2c0156303 --- /dev/null +++ b/tiled/graph/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. 
+revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} diff --git a/tiled/graph/migrations/versions/7f3a9d1c0b25_initial_schema.py b/tiled/graph/migrations/versions/7f3a9d1c0b25_initial_schema.py new file mode 100644 index 000000000..0d383b2d2 --- /dev/null +++ b/tiled/graph/migrations/versions/7f3a9d1c0b25_initial_schema.py @@ -0,0 +1,61 @@ +"""Initial schema: entities and links tables + +Revision ID: 7f3a9d1c0b25 +Revises: +Create Date: 2026-05-12 + +""" + +import sqlalchemy as sa +from alembic import op + +revision = "7f3a9d1c0b25" +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "entities", + sa.Column("id", sa.String, primary_key=True), + sa.Column("entity_type", sa.String, nullable=False), + sa.Column("name", sa.String, nullable=False), + sa.Column("uri", sa.String, nullable=True), + sa.Column("properties", sa.JSON, nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + ) + op.create_index( + "entities_type_created_idx", "entities", ["entity_type", "created_at"] + ) + op.create_index("entities_uri_idx", "entities", ["uri"]) + + op.create_table( + "links", + sa.Column("id", sa.String, primary_key=True), + sa.Column( + "subject_id", + sa.String, + sa.ForeignKey("entities.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("predicate", sa.String, nullable=False), + sa.Column( + "object_id", + sa.String, + sa.ForeignKey("entities.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("properties", sa.JSON, nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + ) + op.create_index("links_subject_predicate_idx", "links", ["subject_id", "predicate"]) + op.create_index("links_predicate_object_idx", "links", 
["predicate", "object_id"]) + op.create_index( + "links_triple_idx", "links", ["subject_id", "predicate", "object_id"] + ) + + +def downgrade(): + op.drop_table("links") + op.drop_table("entities") diff --git a/tiled/graph/router.py b/tiled/graph/router.py new file mode 100644 index 000000000..896581b65 --- /dev/null +++ b/tiled/graph/router.py @@ -0,0 +1,57 @@ +""" +Links router for the Tiled service. + +Provides a GraphQL interface for the entity/link graph under /api/v1/links. +The store lifecycle is owned by the router: startup/shutdown handlers are +registered automatically when the router is included in a FastAPI app. + +Database migrations are NOT run here — they are the responsibility of the +caller (app startup) following the same pattern as the authn and catalog +databases. Use `tiled graph initialize-database` / `upgrade-database` from +the CLI, or let the server auto-initialize when database_init_if_not_exists +is set. +""" + +from __future__ import annotations + +import logging +import os +from typing import Optional + +from fastapi import APIRouter, Request +from strawberry.fastapi import GraphQLRouter + +from .schema import schema +from .store import SQLAlchemyStore as SQLiteStore +from .store import Store, _url_from_path + +logger = logging.getLogger(__name__) + + +def create_router(db_path: Optional[str] = None) -> APIRouter: + store: list[Store] = [] # mutable cell — populated on startup + + async def startup() -> None: + resolved = db_path or os.environ.get("SPLASH_LINKS_DB", ":memory:") + db_url = _url_from_path(resolved) + logger.info("Initializing links store: %s", db_url) + store.append(SQLiteStore(db_url)) + + async def shutdown() -> None: + if store: + store[0].close() + logger.info("Links store closed") + + async def get_context(request: Request) -> dict: + return {"store": store[0]} + + graphql_router = GraphQLRouter( + schema, + context_getter=get_context, + graphql_ide="graphiql", + ) + + router = APIRouter(on_startup=[startup], 
on_shutdown=[shutdown]) + router.include_router(graphql_router, prefix="/api/graphql") + + return router diff --git a/tiled/graph/schema.py b/tiled/graph/schema.py new file mode 100644 index 000000000..75bba2611 --- /dev/null +++ b/tiled/graph/schema.py @@ -0,0 +1,299 @@ +""" +Strawberry GraphQL schema for splash-links. + +Graph model: + - Entity — a named node with a type and arbitrary JSON properties + - Link — a directed, predicate-labeled edge between two entities + +Query highlights: + - entity / entities — fetch nodes + - link / links — fetch edges, filterable by subject, predicate, object + - Entity.outgoing_links / incoming_links — graph traversal from a node + +Mutations: + - createEntity / createLink + - deleteEntity (cascades to attached links) / deleteLink +""" + +from __future__ import annotations + +import logging +from typing import Optional + +import strawberry +from strawberry.scalars import JSON as StrawberryJSON +from strawberry.types import Info + +from .store import EntityRecord, LinkRecord, Store + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# JSON scalar — pass arbitrary dicts / lists / primitives through GraphQL +# --------------------------------------------------------------------------- + +JSON = StrawberryJSON + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _store(info: Info) -> Store: + return info.context["store"] + + +# --------------------------------------------------------------------------- +# Output types +# --------------------------------------------------------------------------- + + +@strawberry.type +class Entity: + id: strawberry.ID + entity_type: str + name: str + uri: Optional[str] + properties: Optional[JSON] # type: ignore[valid-type] + created_at: str + + @strawberry.field(description="Links where this entity is 
the subject.") + def outgoing_links( + self, + info: Info, + predicate: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list["Link"]: + records = _store(info).find_links( + subject_id=str(self.id), predicate=predicate, limit=limit, offset=offset + ) + return [_link_from_record(r) for r in records] + + @strawberry.field(description="Links where this entity is the object.") + def incoming_links( + self, + info: Info, + predicate: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list["Link"]: + records = _store(info).find_links( + object_id=str(self.id), predicate=predicate, limit=limit, offset=offset + ) + return [_link_from_record(r) for r in records] + + +@strawberry.type +class Link: + id: strawberry.ID + subject_id: strawberry.ID + predicate: str + object_id: strawberry.ID + properties: Optional[JSON] # type: ignore[valid-type] + created_at: str + + @strawberry.field + def subject(self, info: Info) -> Optional[Entity]: + record = _store(info).get_entity(str(self.subject_id)) + return _entity_from_record(record) if record else None + + @strawberry.field + def object(self, info: Info) -> Optional[Entity]: + record = _store(info).get_entity(str(self.object_id)) + return _entity_from_record(record) if record else None + + +# --------------------------------------------------------------------------- +# Record -> GQL type converters +# --------------------------------------------------------------------------- + + +def _entity_from_record(r: EntityRecord) -> Entity: + return Entity( + id=strawberry.ID(r.id), + entity_type=r.entity_type, + name=r.name, + uri=r.uri, + properties=r.properties if r.properties else None, + created_at=r.created_at.isoformat(), + ) + + +def _link_from_record(r: LinkRecord) -> Link: + return Link( + id=strawberry.ID(r.id), + subject_id=strawberry.ID(r.subject_id), + predicate=r.predicate, + object_id=strawberry.ID(r.object_id), + properties=r.properties if r.properties else None, + 
created_at=r.created_at.isoformat(), + ) + + +# --------------------------------------------------------------------------- +# Input types +# --------------------------------------------------------------------------- + + +@strawberry.input +class UpdateEntityInput: + name: Optional[str] = None + uri: Optional[str] = None + entity_type: Optional[str] = None + + +@strawberry.input +class UpdateLinkInput: + predicate: str + + +@strawberry.input +class CreateEntityInput: + entity_type: str + name: str + uri: Optional[str] = None + properties: Optional[JSON] = None # type: ignore[valid-type] + + +@strawberry.input +class CreateLinkInput: + subject_id: strawberry.ID + predicate: str + object_id: strawberry.ID + properties: Optional[JSON] = None # type: ignore[valid-type] + + +# --------------------------------------------------------------------------- +# Query / Mutation +# --------------------------------------------------------------------------- + + +@strawberry.type +class Query: + @strawberry.field + def entity(self, info: Info, id: strawberry.ID) -> Optional[Entity]: + record = _store(info).get_entity(str(id)) + return _entity_from_record(record) if record else None + + @strawberry.field + def entities( + self, + info: Info, + entity_type: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list[Entity]: + records = _store(info).list_entities( + entity_type=entity_type, limit=limit, offset=offset + ) + return [_entity_from_record(r) for r in records] + + @strawberry.field + def link(self, info: Info, id: strawberry.ID) -> Optional[Link]: + record = _store(info).get_link(str(id)) + return _link_from_record(record) if record else None + + @strawberry.field( + description="Find links, optionally filtered by subject, predicate, and/or object." 
+ ) + def links( + self, + info: Info, + subject_id: Optional[strawberry.ID] = None, + predicate: Optional[str] = None, + object_id: Optional[strawberry.ID] = None, + limit: int = 100, + offset: int = 0, + ) -> list[Link]: + records = _store(info).find_links( + subject_id=str(subject_id) if subject_id else None, + predicate=predicate, + object_id=str(object_id) if object_id else None, + limit=limit, + offset=offset, + ) + return [_link_from_record(r) for r in records] + + +@strawberry.type +class Mutation: + @strawberry.mutation + def create_entity(self, info: Info, input: CreateEntityInput) -> Entity: + record = _store(info).create_entity( + entity_type=input.entity_type, + name=input.name, + uri=input.uri, + properties=input.properties, + ) + logger.info( + "Created entity type=%r name=%r id=%s", + record.entity_type, + record.name, + record.id, + ) + return _entity_from_record(record) + + @strawberry.mutation + def create_link(self, info: Info, input: CreateLinkInput) -> Link: + record = _store(info).create_link( + subject_id=str(input.subject_id), + predicate=input.predicate, + object_id=str(input.object_id), + properties=input.properties, + ) + logger.info( + "Created link %s -[%s]-> %s id=%s", + record.subject_id[:8], + record.predicate, + record.object_id[:8], + record.id, + ) + return _link_from_record(record) + + @strawberry.mutation( + description="Delete an entity and all its attached links. Returns true if found." 
+ ) + def delete_entity(self, info: Info, id: strawberry.ID) -> bool: + deleted = _store(info).delete_entity(str(id)) + if deleted: + logger.info("Deleted entity id=%s", id) + return deleted + + @strawberry.mutation(description="Update an entity's name, uri, or entity_type.") + def update_entity( + self, info: Info, id: strawberry.ID, input: UpdateEntityInput + ) -> Optional[Entity]: + record = _store(info).update_entity( + str(id), + name=input.name, + uri=input.uri, + entity_type=input.entity_type, + ) + if record: + logger.info("Updated entity id=%s", id) + return _entity_from_record(record) if record else None + + @strawberry.mutation(description="Delete a single link. Returns true if found.") + def delete_link(self, info: Info, id: strawberry.ID) -> bool: + deleted = _store(info).delete_link(str(id)) + if deleted: + logger.info("Deleted link id=%s", id) + return deleted + + @strawberry.mutation(description="Update a link's predicate.") + def update_link( + self, info: Info, id: strawberry.ID, input: UpdateLinkInput + ) -> Optional[Link]: + record = _store(info).update_link(str(id), predicate=input.predicate) + if record: + logger.info("Updated link id=%s predicate=%r", id, input.predicate) + return _link_from_record(record) if record else None + + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + +schema = strawberry.Schema(query=Query, mutation=Mutation) diff --git a/tiled/graph/store.py b/tiled/graph/store.py new file mode 100644 index 000000000..75e863447 --- /dev/null +++ b/tiled/graph/store.py @@ -0,0 +1,430 @@ +""" +Storage layer for the splash-links entity graph service. + +The abstract ``Store`` interface decouples the application from the +underlying database. The concrete ``SQLAlchemyStore`` targets any database +supported by SQLAlchemy 2.x — SQLite (default), PostgreSQL, and DuckDB (via +``duckdb-engine``) are the primary targets. 
+ +Connection URL examples +----------------------- +SQLite (file): sqlite:///links.sqlite +SQLite (memory): sqlite:///:memory: +PostgreSQL: postgresql+psycopg2://user:pass@host/dbname +DuckDB (file): duckdb:///links.duckdb +DuckDB (memory): duckdb:///:memory: +""" + +from __future__ import annotations + +import abc +import uuid +from datetime import datetime, timezone +from typing import Optional + +from pydantic import BaseModel, ConfigDict +from sqlalchemy import ( + JSON, + Column, + DateTime, + ForeignKey, + Index, + MetaData, + String, + Table, + create_engine, + delete, + event, + insert, + select, + update, +) +from sqlalchemy.engine import Engine +from sqlalchemy.pool import StaticPool + +# --------------------------------------------------------------------------- +# Data records +# --------------------------------------------------------------------------- + + +class EntityRecord(BaseModel): + model_config = ConfigDict(extra="forbid") + + id: str + entity_type: str + name: str + uri: Optional[str] + properties: dict + created_at: datetime + + +class LinkRecord(BaseModel): + model_config = ConfigDict(extra="forbid") + + id: str + subject_id: str + predicate: str + object_id: str + properties: dict + created_at: datetime + + +# --------------------------------------------------------------------------- +# Abstract interface +# --------------------------------------------------------------------------- + + +class Store(abc.ABC): + """Minimal interface for entity/link persistence.""" + + @abc.abstractmethod + def create_entity( + self, + entity_type: str, + name: str, + uri: Optional[str] = None, + properties: Optional[dict] = None, + ) -> EntityRecord: + ... + + @abc.abstractmethod + def get_entity(self, id: str) -> Optional[EntityRecord]: + ... + + @abc.abstractmethod + def list_entities( + self, + entity_type: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list[EntityRecord]: + ... 
+ + @abc.abstractmethod + def delete_entity(self, id: str) -> bool: + ... + + @abc.abstractmethod + def update_entity( + self, + id: str, + name: Optional[str] = None, + uri: Optional[str] = None, + entity_type: Optional[str] = None, + ) -> Optional[EntityRecord]: + ... + + @abc.abstractmethod + def create_link( + self, + subject_id: str, + predicate: str, + object_id: str, + properties: Optional[dict] = None, + ) -> LinkRecord: + ... + + @abc.abstractmethod + def get_link(self, id: str) -> Optional[LinkRecord]: + ... + + @abc.abstractmethod + def find_links( + self, + subject_id: Optional[str] = None, + predicate: Optional[str] = None, + object_id: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list[LinkRecord]: + ... + + @abc.abstractmethod + def delete_link(self, id: str) -> bool: + ... + + @abc.abstractmethod + def update_link(self, id: str, predicate: str) -> Optional[LinkRecord]: + ... + + @abc.abstractmethod + def close(self) -> None: + ... + + +# --------------------------------------------------------------------------- +# SQLAlchemy schema +# --------------------------------------------------------------------------- + +_metadata = MetaData() + +_entities = Table( + "entities", + _metadata, + Column("id", String, primary_key=True), + Column("entity_type", String, nullable=False), + Column("name", String, nullable=False), + Column("uri", String, nullable=True), + Column("properties", JSON, nullable=False), + Column("created_at", DateTime(timezone=True), nullable=False), + Index("entities_type_created_idx", "entity_type", "created_at"), + Index("entities_uri_idx", "uri"), +) + +_links = Table( + "links", + _metadata, + Column("id", String, primary_key=True), + Column( + "subject_id", + String, + ForeignKey("entities.id", ondelete="CASCADE"), + nullable=False, + ), + Column("predicate", String, nullable=False), + Column( + "object_id", + String, + ForeignKey("entities.id", ondelete="CASCADE"), + nullable=False, + ), + 
Column("properties", JSON, nullable=False), + Column("created_at", DateTime(timezone=True), nullable=False), + Index("links_subject_predicate_idx", "subject_id", "predicate"), + Index("links_predicate_object_idx", "predicate", "object_id"), + Index("links_triple_idx", "subject_id", "predicate", "object_id"), +) + + +def _make_engine(db_url: str) -> Engine: + """Create a SQLAlchemy engine from a URL, applying dialect-specific tuning.""" + is_sqlite = db_url.startswith("sqlite") + is_memory = ":memory:" in db_url + + kwargs: dict = {} + if is_sqlite: + kwargs["connect_args"] = {"check_same_thread": False} + if is_memory: + kwargs["poolclass"] = StaticPool + + engine = create_engine(db_url, **kwargs) + + if is_sqlite: + # Enable foreign-key enforcement for every new SQLite connection. + @event.listens_for(engine, "connect") + def _set_sqlite_pragma(conn, _record): + conn.execute("PRAGMA foreign_keys=ON") + + return engine + + +def _url_from_path(db_path: str) -> str: + """Convert a plain file path / ':memory:' to a sqlite:// URL.""" + if "://" in db_path: + return db_path + if db_path == ":memory:": + return "sqlite:///:memory:" + return f"sqlite:///{db_path}" + + +# --------------------------------------------------------------------------- +# SQLAlchemy implementation +# --------------------------------------------------------------------------- + + +class SQLAlchemyStore(Store): + """ + Database-agnostic store backed by SQLAlchemy Core. + + ``db_url`` may be any SQLAlchemy connection URL. For convenience, + plain file paths and ``':memory:'`` are auto-converted to + ``sqlite:///…`` / ``sqlite:///:memory:``. 
+ """ + + def __init__(self, db_url: str = ":memory:") -> None: + self._engine: Engine = _make_engine(_url_from_path(db_url)) + _metadata.create_all(self._engine) + + # ------------------------------------------------------------------ + # Row conversion helpers + # ------------------------------------------------------------------ + + @staticmethod + def _to_entity(row) -> EntityRecord: + return EntityRecord( + id=row.id, + entity_type=row.entity_type, + name=row.name, + uri=row.uri, + properties=row.properties or {}, + created_at=row.created_at, + ) + + @staticmethod + def _to_link(row) -> LinkRecord: + return LinkRecord( + id=row.id, + subject_id=row.subject_id, + predicate=row.predicate, + object_id=row.object_id, + properties=row.properties or {}, + created_at=row.created_at, + ) + + # ------------------------------------------------------------------ + # Entity operations + # ------------------------------------------------------------------ + + def create_entity( + self, + entity_type: str, + name: str, + uri: Optional[str] = None, + properties: Optional[dict] = None, + ) -> EntityRecord: + id_ = str(uuid.uuid4()) + now = datetime.now(timezone.utc) + with self._engine.begin() as conn: + conn.execute( + insert(_entities).values( + id=id_, + entity_type=entity_type, + name=name, + uri=uri, + properties=properties or {}, + created_at=now, + ) + ) + row = conn.execute(select(_entities).where(_entities.c.id == id_)).one() + return self._to_entity(row) + + def get_entity(self, id: str) -> Optional[EntityRecord]: + with self._engine.connect() as conn: + row = conn.execute( + select(_entities).where(_entities.c.id == id) + ).one_or_none() + return self._to_entity(row) if row else None + + def list_entities( + self, + entity_type: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list[EntityRecord]: + stmt = ( + select(_entities) + .order_by(_entities.c.created_at) + .limit(limit) + .offset(offset) + ) + if entity_type is not None: + stmt = 
stmt.where(_entities.c.entity_type == entity_type) + with self._engine.connect() as conn: + rows = conn.execute(stmt).all() + return [self._to_entity(r) for r in rows] + + def delete_entity(self, id: str) -> bool: + with self._engine.begin() as conn: + result = conn.execute(delete(_entities).where(_entities.c.id == id)) + return result.rowcount > 0 + + def update_entity( + self, + id: str, + name: Optional[str] = None, + uri: Optional[str] = None, + entity_type: Optional[str] = None, + ) -> Optional[EntityRecord]: + values: dict = {} + if name is not None: + values["name"] = name + if uri is not None: + values["uri"] = uri + if entity_type is not None: + values["entity_type"] = entity_type + with self._engine.begin() as conn: + if values: + conn.execute( + update(_entities).where(_entities.c.id == id).values(**values) + ) + row = conn.execute( + select(_entities).where(_entities.c.id == id) + ).one_or_none() + return self._to_entity(row) if row else None + + # ------------------------------------------------------------------ + # Link operations + # ------------------------------------------------------------------ + + def create_link( + self, + subject_id: str, + predicate: str, + object_id: str, + properties: Optional[dict] = None, + ) -> LinkRecord: + if not self.get_entity(subject_id): + raise ValueError(f"Subject entity '{subject_id}' not found") + if not self.get_entity(object_id): + raise ValueError(f"Object entity '{object_id}' not found") + + id_ = str(uuid.uuid4()) + now = datetime.now(timezone.utc) + with self._engine.begin() as conn: + conn.execute( + insert(_links).values( + id=id_, + subject_id=subject_id, + predicate=predicate, + object_id=object_id, + properties=properties or {}, + created_at=now, + ) + ) + row = conn.execute(select(_links).where(_links.c.id == id_)).one() + return self._to_link(row) + + def get_link(self, id: str) -> Optional[LinkRecord]: + with self._engine.connect() as conn: + row = conn.execute(select(_links).where(_links.c.id 
== id)).one_or_none() + return self._to_link(row) if row else None + + def find_links( + self, + subject_id: Optional[str] = None, + predicate: Optional[str] = None, + object_id: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> list[LinkRecord]: + stmt = select(_links).order_by(_links.c.created_at).limit(limit).offset(offset) + if subject_id is not None: + stmt = stmt.where(_links.c.subject_id == subject_id) + if predicate is not None: + stmt = stmt.where(_links.c.predicate == predicate) + if object_id is not None: + stmt = stmt.where(_links.c.object_id == object_id) + with self._engine.connect() as conn: + rows = conn.execute(stmt).all() + return [self._to_link(r) for r in rows] + + def delete_link(self, id: str) -> bool: + with self._engine.begin() as conn: + result = conn.execute(delete(_links).where(_links.c.id == id)) + return result.rowcount > 0 + + def update_link(self, id: str, predicate: str) -> Optional[LinkRecord]: + with self._engine.begin() as conn: + conn.execute( + update(_links).where(_links.c.id == id).values(predicate=predicate) + ) + row = conn.execute(select(_links).where(_links.c.id == id)).one_or_none() + return self._to_link(row) if row else None + + def close(self) -> None: + self._engine.dispose() + + +# Backward-compatible aliases +SQLiteStore = SQLAlchemyStore +DuckDBStore = SQLAlchemyStore diff --git a/tiled/server/app.py b/tiled/server/app.py index 604597ea9..073c13d36 100644 --- a/tiled/server/app.py +++ b/tiled/server/app.py @@ -47,6 +47,7 @@ construct_build_app_kwargs, parse_configs, ) +from ..graph.router import create_router as get_links_router from ..media_type_registration import ( CompressionRegistry, SerializationRegistry, @@ -417,6 +418,9 @@ async def unhandled_exception_handler( app.include_router(get_zarr_router_v2(), prefix="/zarr/v2") app.include_router(get_zarr_router_v3(), prefix="/zarr/v3") + links_db_config = server_settings.get("links_database") + links_db_uri = links_db_config.uri if 
links_db_config is not None else None + app.include_router(get_links_router(links_db_uri)) # The Tree and Authenticator have the opportunity to add custom routes to # the server here. (Just for example, a Tree of BlueskyRuns uses this @@ -741,6 +745,82 @@ async def purge_expired_sessions_and_api_keys(): asyncio.create_task(purge_expired_sessions_and_api_keys()) ) + if links_db_uri is not None and ":memory:" not in links_db_uri: + import sys + + from sqlalchemy.ext.asyncio import create_async_engine + + from ..alembic_utils import ( + DatabaseUpgradeNeeded, + UninitializedDatabase, + check_database, + ) + from ..graph.core import ( + ALL_REVISIONS, + REQUIRED_REVISION, + initialize_database, + ) + from ..utils import ensure_specified_sql_driver + + graph_engine = create_async_engine( + ensure_specified_sql_driver(links_db_uri) + ) + redacted_graph_url = graph_engine.url._replace(password="[redacted]") + try: + await check_database(graph_engine, REQUIRED_REVISION, ALL_REVISIONS) + except UninitializedDatabase: + if settings.database_init_if_not_exists: + import subprocess + + subprocess.run( + [ + sys.executable, + "-m", + "tiled", + "graph", + "initialize-database", + str(graph_engine.url), + ], + capture_output=True, + check=True, + ) + else: + print( + dedent( + f""" + + No graph database found at {redacted_graph_url} + + To create one, run: + + tiled graph initialize-database {redacted_graph_url} + """ + ), + file=sys.stderr, + ) + raise + except DatabaseUpgradeNeeded as err: + print( + dedent( + f""" + + The graph database at {redacted_graph_url} was created by an older + version of Tiled. It needs to be upgraded. + + Back up the database, and then run: + + tiled graph upgrade-database {redacted_graph_url} + """ + ), + file=sys.stderr, + ) + raise err from None + else: + logger.info( + f"Connected to existing graph database at {redacted_graph_url}." 
+ ) + await graph_engine.dispose() + async def shutdown_event(): # Run shutdown tasks collected from trees (adapters). for task in tasks["shutdown"]: diff --git a/web-frontend/package-lock.json b/web-frontend/package-lock.json index bccb9b597..8a75f62a8 100644 --- a/web-frontend/package-lock.json +++ b/web-frontend/package-lock.json @@ -2193,9 +2193,6 @@ "cpu": [ "arm" ], - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -2209,9 +2206,6 @@ "cpu": [ "arm" ], - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -2225,9 +2219,6 @@ "cpu": [ "arm64" ], - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -2241,9 +2232,6 @@ "cpu": [ "arm64" ], - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -2257,9 +2245,6 @@ "cpu": [ "loong64" ], - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -2273,9 +2258,6 @@ "cpu": [ "loong64" ], - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -2289,9 +2271,6 @@ "cpu": [ "ppc64" ], - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -2305,9 +2284,6 @@ "cpu": [ "ppc64" ], - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -2321,9 +2297,6 @@ "cpu": [ "riscv64" ], - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -2337,9 +2310,6 @@ "cpu": [ "riscv64" ], - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -2353,9 +2323,6 @@ "cpu": [ "s390x" ], - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -2369,9 +2336,6 @@ "cpu": [ "x64" ], - "libc": [ - "glibc" - ], "license": "MIT", "optional": true, "os": [ @@ -2385,9 +2349,6 @@ "cpu": [ "x64" ], - "libc": [ - "musl" - ], "license": "MIT", "optional": true, "os": [ @@ -3430,6 +3391,15 @@ "node": ">=10" } }, + "node_modules/cosmiconfig/node_modules/yaml": { + "version": "1.10.3", + "resolved": "https://registry.npmjs.org/yaml/-/yaml-1.10.3.tgz", + "integrity": 
"sha512-vIYeF1u3CjlhAFekPPAk2h/Kv4T3mAkMox5OymRiJQB0spDP10LHvt+K7G9Ny6NuuMAb25/6n1qyUjAcGNf/AA==", + "license": "ISC", + "engines": { + "node": ">= 6" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -6533,12 +6503,20 @@ "license": "ISC" }, "node_modules/yaml": { - "version": "1.10.3", - "resolved": "https://registry.npmjs.org/yaml/-/yaml-1.10.3.tgz", - "integrity": "sha512-vIYeF1u3CjlhAFekPPAk2h/Kv4T3mAkMox5OymRiJQB0spDP10LHvt+K7G9Ny6NuuMAb25/6n1qyUjAcGNf/AA==", + "version": "2.9.0", + "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.9.0.tgz", + "integrity": "sha512-2AvhNX3mb8zd6Zy7INTtSpl1F15HW6Wnqj0srWlkKLcpYl/gMIMJiyuGq2KeI2YFxUPjdlB+3Lc10seMLtL4cA==", "license": "ISC", + "optional": true, + "peer": true, + "bin": { + "yaml": "bin.mjs" + }, "engines": { - "node": ">= 6" + "node": ">= 14.6" + }, + "funding": { + "url": "https://github.com/sponsors/eemeli" } } }