noverde/serpens

serpens

Shared Python building blocks for DotzInc services: thin layers over SQLAlchemy 2.0, Redis, httpx, Pub/Sub, SQS and friends. The lib stays out of the way — apps import the underlying SDKs directly for the heavy lifting and use serpens for the configuration that should not be duplicated across 24 repos.

SQS

from serpens import sqs

@sqs.handler
def message_processor(record: sqs.Record):
    print(record.body)

sqs.Record exposes data, body, message_attributes, queue_name, sent_datetime.
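The sketch below makes the record fields concrete. It shows, using only the standard library, how a decorator like @sqs.handler might unpack a Lambda SQS event. It is not serpens's implementation:

- The names Record, handler and from_lambda are hypothetical.
- Treating data as the JSON-decoded body is an assumption.
- The event keys used (Records, body, messageAttributes, eventSourceARN, attributes.SentTimestamp) follow the standard Lambda SQS event shape.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable


@dataclass
class Record:
    """Hypothetical stand-in for sqs.Record."""

    body: str
    message_attributes: dict
    queue_name: str
    sent_datetime: datetime

    @property
    def data(self) -> Any:
        # Assumption: `data` is the JSON-decoded body.
        return json.loads(self.body)

    @classmethod
    def from_lambda(cls, raw: dict) -> "Record":
        return cls(
            body=raw["body"],
            message_attributes=raw.get("messageAttributes", {}),
            # eventSourceARN looks like arn:aws:sqs:<region>:<account>:<queue-name>
            queue_name=raw["eventSourceARN"].rsplit(":", 1)[-1],
            # SentTimestamp is epoch milliseconds, delivered as a string
            sent_datetime=datetime.fromtimestamp(
                int(raw["attributes"]["SentTimestamp"]) / 1000, tz=timezone.utc
            ),
        )


def handler(fn: Callable[[Record], None]) -> Callable[[dict, Any], None]:
    """Hypothetical decorator: call `fn` once per record in the batch."""

    def lambda_entry(event: dict, context: Any = None) -> None:
        for raw in event["Records"]:
            fn(Record.from_lambda(raw))

    return lambda_entry
```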

Lambda API

from serpens import api

@api.handler
def lambda_handler(request: api.Request):
    print(request.body)

api.Request exposes authorizer, body, path, query, headers, identity. All but body are AttrDict instances, so request.path.user_id is a shortcut for request.path["user_id"].
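The attribute shortcut is easy to reproduce with a dict subclass. The minimal sketch below assumes AttrDict behaves roughly like this; serpens's own class may differ.

```python
from typing import Any


class AttrDict(dict):
    """Hypothetical sketch: dict keys readable as attributes."""

    def __getattr__(self, name: str) -> Any:
        # __getattr__ runs only when normal lookup fails, so real dict
        # attributes (items, keys, get, ...) always take precedence.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None
```

One consequence of this design: a key that collides with a dict method (for example "items") is only reachable by subscript.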

Async handlers (api.async_handler)

Same contract as @api.handler, but for async def handlers — needed when the body awaits coroutines (FastAPI/Mangum integration, async DB sessions, async HTTP clients).

from serpens import api
from serpens.database import async_db_session

@api.async_handler
async def lambda_handler(request: api.Request):
    async with async_db_session() as sess:
        ...

Why use it

  • Single response/error contract. _build_response and _error_response are shared with the sync version: same response shape, same elastic-apm capture, same JSON encoding via SchemaEncoder. No drift between sync and async handler responses across services.
  • Required to use async SQLAlchemy / httpx / cache_async inside a Lambda. A regular @api.handler cannot await.
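The sketch below illustrates how one response contract can serve both decorator flavors. The names build_response, error_response, sync_handler and async_handler are hypothetical. The response dict assumes the API Gateway proxy-integration shape (statusCode, headers, string body). The elastic-apm capture and SchemaEncoder used by the real handlers are omitted.

```python
import asyncio
import functools
import json
from typing import Any, Awaitable, Callable


def build_response(payload: Any, status: int = 200) -> dict:
    # API Gateway proxy-integration shape; body must be a string.
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload),
    }


def error_response(exc: Exception) -> dict:
    return build_response({"message": str(exc)}, status=500)


def sync_handler(fn: Callable[[dict], Any]) -> Callable[..., dict]:
    @functools.wraps(fn)
    def entry(event: dict, context: Any = None) -> dict:
        try:
            return build_response(fn(event))
        except Exception as exc:
            return error_response(exc)

    return entry


def async_handler(fn: Callable[[dict], Awaitable[Any]]) -> Callable[..., dict]:
    @functools.wraps(fn)
    def entry(event: dict, context: Any = None) -> dict:
        async def run() -> dict:
            try:
                return build_response(await fn(event))
            except Exception as exc:
                return error_response(exc)

        # The Lambda runtime calls the handler synchronously: one loop per invocation.
        return asyncio.run(run())

    return entry
```

Because both wrappers funnel through the same two builders, a sync and an async handler returning the same payload produce identical responses.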

Migration scenarios

| Today's code | Move to | What you gain |
| --- | --- | --- |
| @api.handler def lambda_handler(...) that calls asyncio.run(...) internally | @api.async_handler async def lambda_handler(...) | One event loop per invocation, no asyncio.run boilerplate, can use async libs throughout the handler |
| FastAPI / Mangum bridge with hand-rolled response shaping | @api.async_handler | Same response shape across sync/async Lambdas, central elastic-apm capture |

The sync @api.handler remains the right choice when the handler has no async work.

Schema

Dataclass with static type checks and dict/JSON helpers.

from dataclasses import dataclass
from serpens.schema import Schema

@dataclass
class Person(Schema):
    name: str
    age: int

Person.load({"name": "Mike", "age": 18})
Person.loads('{"name": "Mike", "age": 18}')
Person("Mike", 18).dump()      # dict
Person("Mike", 18).dumps()     # JSON string
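The sketch below shows how a base class can give dataclasses these helpers. It is a hypothetical stand-in, not serpens.schema. One difference to note: it performs a runtime isinstance check on plain-class annotations, which is an assumption about what "type checks" covers.

```python
import json
from dataclasses import asdict, dataclass, fields
from typing import Any


class Schema:
    """Hypothetical base: load/loads/dump/dumps plus a simple runtime type check."""

    def __post_init__(self) -> None:
        # Dataclass-generated __init__ calls this after assigning fields.
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(f.type, type) and not isinstance(value, f.type):
                raise TypeError(f"{f.name}: expected {f.type.__name__}, got {type(value).__name__}")

    @classmethod
    def load(cls, data: dict) -> Any:
        return cls(**data)

    @classmethod
    def loads(cls, raw: str) -> Any:
        return cls.load(json.loads(raw))

    def dump(self) -> dict:
        return asdict(self)

    def dumps(self) -> str:
        return json.dumps(self.dump())


@dataclass
class Person(Schema):
    name: str
    age: int
```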

CSV

from serpens import csvutils as csv

for row in csv.open_csv_reader("fruits.csv"):
    print(row)

w = csv.open_csv_writer("out.csv")
w.writerow(["id", "name"])
w.writerow(["1", "Açaí"])

Database

Thin layer over SQLAlchemy 2.0. Owns:

  • Engine setup with production defaults (Postgres statement_timeout / lock_timeout / idle_in_transaction_session_timeout, Cloud SQL keepalives, scheme normalization, pool_pre_ping / pool_use_lifo, Lambda-aware tuning).
  • Session factories (SessionLocal, AsyncSessionLocal).
  • Declarative Base and TimestampMixin.
  • Alembic helper (see Migrations).
  • Generic Repository[T] / AsyncRepository[T] covering the CRUD every service used to re-implement.

Query construction stays in sqlalchemy proper — the lib does not re-export select, Integer, etc.

Why use it

  • Production hardening baked in. statement_timeout, lock_timeout, idle_in_transaction_session_timeout, Cloud SQL keepalives, pool tuning and pool_pre_ping apply automatically. Apps that hand-roll create_engine(...) skip this; the lib makes it the default.
  • One source of truth for engine config. Pool size, recycle, LIFO checkout, Postgres timeouts — all env-driven, consistent across services. No more per-app drift on max_overflow or pool_recycle.
  • Symmetric sync/async. bind / async_bind, SessionLocal / AsyncSessionLocal, db_session / async_db_session. Same mental model on either side; mix freely.
  • Repository[T] removes CRUD boilerplate. PK lookup, filtered query, paginate, add, bulk_add, upsert (Postgres ON CONFLICT RETURNING), with deliberate gaps where services should diverge (no hard-delete, no partial update — see recipes).
  • Alembic glue. serpens.database.alembic.run_migrations(metadata) drives migrations from a Lambda or CLI with one line in env.py.

Migration scenarios

| Today's code | Move to | What you gain |
| --- | --- | --- |
| Hand-rolled create_engine(...) + sessionmaker(...) | serpens.database.bind + SessionLocal / db_session | Postgres timeouts, Cloud SQL keepalives, env-driven pool tuning, scheme normalization, pool_pre_ping, Lambda-aware defaults |
| Pony ORM | serpens.database + SQLAlchemy 2.0 | Async support, typed Mapped[...] columns, Alembic instead of yoyo, the SA 2.0 ecosystem; see Migrating from Pony ORM |
| Per-service CRUD repositories (each app reimplements get_by_id, list, paginate) | serpens.database.Repository[T] / AsyncRepository[T] | Shared base, upsert primitive for idempotency, NotFound exception, free pagination |
| Direct redis.asyncio.Redis + manual aclose() in DB-adjacent code | serpens.database.async_bind + async_db_session | Lifecycle helpers, autoflush=False, expire_on_commit=False sensible defaults |

Hello world (sync)

from sqlalchemy import Integer, String, select
from sqlalchemy.orm import Mapped, mapped_column

from serpens.database import Base, SessionLocal, TimestampMixin, bind


class User(TimestampMixin, Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String, nullable=False)


bind()  # reads DATABASE_URL

with SessionLocal() as sess:
    sess.add(User(name="Ana"))
    sess.commit()

Hello world (async)

from sqlalchemy import select
from serpens.database import async_bind, async_db_session

async_bind()  # reads DATABASE_URL, normalizes scheme to asyncpg

async def fetch(user_id: int):
    async with async_db_session() as sess:
        return (await sess.scalars(select(User).filter_by(id=user_id))).first()

Async requires asyncpg (Postgres) or aiosqlite (SQLite).

Two ways to open a session

Explicit (preferred). SessionLocal() / AsyncSessionLocal() — caller owns commit/rollback. FastAPI handlers should use the per-request dependency:

from fastapi import Depends, FastAPI
from sqlalchemy import select
from sqlalchemy.orm import Session
from serpens.database import fastapi_session  # or fastapi_async_session

app = FastAPI()

@app.get("/users/{id}")
def get_user(id: int, db: Session = Depends(fastapi_session)):
    return db.scalars(select(User).filter_by(id=id)).first()

Auto-managed. db_session() / async_db_session() — context managers that commit on success, roll back on exception and close always. Convenient for Lambda handlers and short scripts. Pass the session explicitly to helpers:

from sqlalchemy import select
from serpens.database import async_db_session

async def fetch(sess, user_id: int):
    return (await sess.scalars(select(User).filter_by(id=user_id))).first()

async def handler():
    async with async_db_session() as sess:
        return await fetch(sess, 1)

There is no current_session() global — sessions are passed.
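The auto-managed contract (commit on success, roll back on exception, always close) fits in a few lines of contextlib. The sketch below is a hypothetical illustration, not serpens's code. In practice the factory would be something like SessionLocal, and the async variant would use asynccontextmanager with awaited calls.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, Protocol


class SessionLike(Protocol):
    def commit(self) -> None: ...
    def rollback(self) -> None: ...
    def close(self) -> None: ...


@contextmanager
def managed_session(factory: Callable[[], SessionLike]) -> Iterator[SessionLike]:
    """Hypothetical: commit on success, roll back on error, always close."""
    sess = factory()
    try:
        yield sess
        sess.commit()  # only reached if the with-block did not raise
    except Exception:
        sess.rollback()
        raise  # the caller still sees the original error
    finally:
        sess.close()
```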

Schema scoping

from serpens.database import declarative_base

Base = declarative_base(schema="public")

Configuration

| Variable | Default | Purpose |
| --- | --- | --- |
| DATABASE_URL | | Connection string. postgres:// and postgresql:// are normalized to postgresql+psycopg2:// (sync) / postgresql+asyncpg:// (async). |
| APP_NAME | serpens | Postgres application_name. Overridden by K_SERVICE (Cloud Run) or AWS_LAMBDA_FUNCTION_NAME (Lambda). |
| DB_POOL_SIZE | 10 | Pool size. Set to 1 on Lambda. |
| DB_MAX_OVERFLOW | 20 | Extra connections allowed beyond the pool size. Set to 0 on Lambda. |
| DB_POOL_TIMEOUT | 10 | Seconds to wait for a free connection. |
| DB_POOL_RECYCLE | 1800 | Seconds after which a pooled connection is recycled. |
| DB_STATEMENT_TIMEOUT_MS | 5000 | Postgres statement_timeout, in milliseconds. |
| DB_LOCK_TIMEOUT_MS | 2000 | Postgres lock_timeout, in milliseconds. |
| DB_IDLE_IN_TX_TIMEOUT_MS | 10000 | Postgres idle_in_transaction_session_timeout, in milliseconds. |
| DB_POOL_USE_LIFO | true | LIFO checkout (warm connections preferred). Set false for FIFO. |
| DB_ECHO | false | Log every SQL statement. |

bind() and async_bind() also accept pool_use_lifo=True/False as a direct override of the env var.

Cloud SQL keepalives (keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=3) are applied automatically on Postgres connections.
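The sketch below shows how these settings can map onto SQLAlchemy's create_engine arguments for psycopg2. The function names are hypothetical, and it is not how serpens builds its engine. The timeouts travel in libpq's options parameter as -c name=value pairs. The keepalive and application_name keys are standard libpq connection parameters. asyncpg takes server settings differently, so this applies to the sync driver only.

```python
import os
from typing import Any, Mapping


def postgres_connect_args(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Hypothetical: env vars -> psycopg2/libpq connect_args."""
    statement_ms = int(env.get("DB_STATEMENT_TIMEOUT_MS", "5000"))
    lock_ms = int(env.get("DB_LOCK_TIMEOUT_MS", "2000"))
    idle_tx_ms = int(env.get("DB_IDLE_IN_TX_TIMEOUT_MS", "10000"))
    app_name = (
        env.get("K_SERVICE")  # Cloud Run
        or env.get("AWS_LAMBDA_FUNCTION_NAME")  # Lambda
        or env.get("APP_NAME", "serpens")
    )
    return {
        "application_name": app_name,
        # libpq `options`: each -c sets a server setting for this session
        "options": (
            f"-c statement_timeout={statement_ms} "
            f"-c lock_timeout={lock_ms} "
            f"-c idle_in_transaction_session_timeout={idle_tx_ms}"
        ),
        # First probe after 30 s idle, re-probe every 10 s if unanswered, drop after 3 misses
        "keepalives": 1,
        "keepalives_idle": 30,
        "keepalives_interval": 10,
        "keepalives_count": 3,
    }


def _flag(env: Mapping[str, str], name: str, default: str) -> bool:
    return env.get(name, default).strip().lower() in {"1", "true", "yes"}


def pool_kwargs(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Hypothetical: env vars -> create_engine pool keyword arguments."""
    return {
        "pool_size": int(env.get("DB_POOL_SIZE", "10")),
        "max_overflow": int(env.get("DB_MAX_OVERFLOW", "20")),
        "pool_timeout": int(env.get("DB_POOL_TIMEOUT", "10")),
        "pool_recycle": int(env.get("DB_POOL_RECYCLE", "1800")),
        "pool_use_lifo": _flag(env, "DB_POOL_USE_LIFO", "true"),
        "pool_pre_ping": True,
        "echo": _flag(env, "DB_ECHO", "false"),
    }
```

With these helpers, a hand-rolled engine would look like create_engine(url, connect_args=postgres_connect_args(), **pool_kwargs()).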

Bind from FastAPI lifespan, never at import time

from contextlib import asynccontextmanager
from fastapi import FastAPI
from serpens.database import async_bind, async_dispose

@asynccontextmanager
async def lifespan(_app: FastAPI):
    async_bind()
    yield
    await async_dispose()

app = FastAPI(lifespan=lifespan)

Calling bind() at the top of models.py makes import order matter and breaks under cold-start.

Repository (optional)

Repository[T] (sync) and AsyncRepository[T] (async) cover the CRUD that every service rewrites: PK lookups, filtered queries, paginate, add, upsert. Subclass with model = X and add your own methods for anything custom — the query property exposes a Select(model) you compose on. The base intentionally does not ship hard-delete or partial update: services do soft-delete differently and updates often need optimistic locking.

from serpens.database import AsyncRepository, async_db_session

class ProductRepo(AsyncRepository[Product]):
    model = Product

    async def by_slug(self, slug: str):            # custom lookup
        return await self.get_by(slug=slug)

async def load_product():
    async with async_db_session() as sess:
        return await ProductRepo(sess).by_slug("some-product")

Built-in methods: get, get_or_raise (raises serpens.database.NotFound), get_by, exists, count, list(order_by=, limit=, offset=, **filters), paginate(stmt=, page=, size=), add(obj, flush=True), bulk_add(objs), upsert(values, conflict_on=, update_fields=).
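Page-based pagination usually reduces to a COUNT plus LIMIT/OFFSET over a stable ordering. The sketch below shows that arithmetic against an in-memory SQLite table. It assumes 1-based pages and a (rows, total, page_count) return shape; both are assumptions, and Repository.paginate's actual return value may differ.

```python
import math
import sqlite3


def paginate(conn: sqlite3.Connection, page: int, size: int) -> tuple[list, int, int]:
    """Hypothetical: 1-based page -> (rows, total, page_count) over `products`."""
    if page < 1 or size < 1:
        raise ValueError("page and size must be >= 1")
    total = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
    rows = conn.execute(
        # A stable ORDER BY is what makes LIMIT/OFFSET pages deterministic.
        "SELECT id, slug FROM products ORDER BY id LIMIT ? OFFSET ?",
        (size, (page - 1) * size),
    ).fetchall()
    return rows, total, math.ceil(total / size)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, slug TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO products (id, slug) VALUES (?, ?)",
    [(i, f"p{i}") for i in range(1, 26)],
)
```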

Recipe: soft-delete

Don't expose hard delete. Add a method on your repo:

class PaymentRepo(AsyncRepository[Payment]):
    model = Payment

    async def cancel(self, payment, *, reason: str):
        payment.status = "cancelled"
        payment.cancel_reason = reason
        await self.sess.flush()

Recipe: optimistic locking

Declare version_id_col on the model — Repository doesn't fight it:

from decimal import Decimal

from sqlalchemy import Integer, Numeric
from sqlalchemy.orm import Mapped, mapped_column

class Payment(TimestampMixin, Base):
    __tablename__ = "payments"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    amount: Mapped[Decimal] = mapped_column(Numeric)
    version: Mapped[int] = mapped_column(Integer, nullable=False)
    __mapper_args__ = {"version_id_col": version}

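If another transaction has already bumped the version, the versioned UPDATE matches zero rows and SQLAlchemy raises sqlalchemy.orm.exc.StaleDataError on flush. Catch it in the handler.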
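Under the hood, a version counter boils down to a compare-and-set UPDATE. The sketch below uses SQLite and a local StaleDataError class standing in for SQLAlchemy's. It shows the mechanism (add the expected version to the WHERE clause, then check the row count), not the SQL that SQLAlchemy actually emits.

```python
import sqlite3


class StaleDataError(Exception):
    """Stand-in for sqlalchemy.orm.exc.StaleDataError in this sketch."""


def update_amount(conn: sqlite3.Connection, payment_id: int, amount: str, expected_version: int) -> int:
    cur = conn.execute(
        # Succeeds only if nobody changed the row since we read `expected_version`.
        "UPDATE payments SET amount = ?, version = version + 1 WHERE id = ? AND version = ?",
        (amount, payment_id, expected_version),
    )
    if cur.rowcount == 0:
        raise StaleDataError(f"payment {payment_id} changed since version {expected_version}")
    return expected_version + 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (id INTEGER PRIMARY KEY, amount TEXT NOT NULL, version INTEGER NOT NULL)")
conn.execute("INSERT INTO payments (id, amount, version) VALUES (1, '10.00', 1)")
```

Two writers that both read version 1 race. The first UPDATE wins and moves the row to version 2. The second matches nothing and raises.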

Recipe: idempotent insert (upsert, not get_or_create)

get_or_create has a race between SELECT and INSERT. Use upsert instead — Postgres INSERT ... ON CONFLICT ... RETURNING:

class IdempotentPaymentRepo(AsyncRepository[Payment]):
    model = Payment

async def receive_payment(sess, req):
    await IdempotentPaymentRepo(sess).upsert(
        {"external_id": req.idempotency_key, "amount": req.amount, "status": "received"},
        conflict_on=["external_id"],
        update_fields=["amount"],   # omit to do nothing on conflict
    )
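The reason upsert avoids the get_or_create race is that the uniqueness check and the write happen in a single statement. The sketch below uses SQLite (3.24+) as a stand-in for Postgres, because both accept the same INSERT ... ON CONFLICT ... DO UPDATE clause with excluded. It leaves out RETURNING and is not the SQL serpens generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payments (id INTEGER PRIMARY KEY, external_id TEXT UNIQUE NOT NULL,"
    " amount TEXT NOT NULL, status TEXT NOT NULL)"
)

# One statement: insert, or update `amount` if external_id already exists.
UPSERT = """
INSERT INTO payments (external_id, amount, status)
VALUES (:external_id, :amount, :status)
ON CONFLICT (external_id) DO UPDATE SET amount = excluded.amount
"""


def receive(conn: sqlite3.Connection, values: dict) -> None:
    conn.execute(UPSERT, values)
```

Replacing the DO UPDATE clause with DO NOTHING gives the "omit update_fields" behavior: a retry with the same key becomes a no-op.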

Migrations

New repos use Alembic. serpens.migrations (yoyo) is kept only for legacy services that haven't migrated yet.

serpens.database.alembic.run_migrations wires Alembic against your app's Base.metadata with sensible defaults (offline/online, scheme normalization to psycopg2, NullPool to keep migration jobs from leaking pool slots).

alembic/env.py:

from myapp.models import Base
from serpens.database.alembic import run_migrations

run_migrations(target_metadata=Base.metadata)

alembic.ini (minimal):

[alembic]
script_location = alembic
prepend_sys_path = .
file_template = %%(year)d%%(month).2d%%(day).2d_%%(rev)s_%%(slug)s
sqlalchemy.url =

Run with DATABASE_URL set:

alembic revision -m "add user table" --autogenerate
alembic upgrade head

For a Lambda migration job (no alembic.ini in the package):

import os
from alembic import command
from alembic.config import Config

def migrate_handler(event, context):
    cfg = Config()
    cfg.set_main_option("script_location", os.path.dirname(os.path.abspath(__file__)))
    command.upgrade(cfg, "head")

Migrating from Pony ORM

Pony lacks async, lacks typed Mapped[T], and is in limited maintenance. The platform standardised on SQLAlchemy 2.0 via serpens.database. Use the async API by default.

Mapping table

| Pony | SQLAlchemy 2.0 + serpens |
| --- | --- |
| class X(db.Entity): | class X(TimestampMixin, Base): |
| name = Required(str) | name: Mapped[str] = mapped_column(String, nullable=False) |
| email = Optional(str) | email: Mapped[str \| None] = mapped_column(String, nullable=True) |
| created_at = Required(datetime) | inherit TimestampMixin |
| loans = Set(lambda: Loan) | loans: Mapped[list["Loan"]] = relationship(back_populates="user") |
| composite_key(a, b) | __table_args__ = (UniqueConstraint("a", "b"),) |
| _table_ = ("public", "users") | __tablename__ = "users"; __table_args__ = {"schema": "public"} |
| @db_session decorator | with db_session() block, or pass a Session |
| X(field=value) (auto-flush) | obj = X(...); sess.add(obj); sess.flush() |
| X.get(field=value) | sess.scalars(select(X).filter_by(field=value)).first() |
| X.select(...).order_by(X.id)[:10] | sess.scalars(select(X).order_by(X.id).limit(10)).all() |
| X.select_by_sql("SELECT ...", params) | sess.scalars(select(X).from_statement(text("SELECT ...")), params).all() |
| db.generate_mapping(create_tables=True) | Base.metadata.create_all(engine) |

Per-file recipe

Branch from staging into feat/migrate-pony-to-sqlalchemy.

  1. requirements.txt — bump noverde-serpens, ensure SQLAlchemy>=2.0, add asyncpg (for async) and/or psycopg2-binary (for sync + Alembic), drop pony and yoyo-migrations.
  2. Models — replace db = Database() and class X(db.Entity). Methods stuck to the entity (X.get_by_slug, X.create) become module-level functions taking Session / AsyncSession as first argument:
    async def get_product_by_slug(sess: AsyncSession, slug: str) -> Product | None:
        return (await sess.scalars(select(Product).filter_by(slug=slug))).first()
  3. Handlers — open async with async_db_session() as sess: and pass sess down. In FastAPI routes use Depends(fastapi_async_session).
  4. main.py: async_bind() runs in a FastAPI lifespan, never at import time.
  5. Tests: setUp uses async with async_db_session() as sess: sess.add(...). tearDown uses await sess.execute(delete(...)). testgres.setup(Base) still works.
  6. Migrations — yoyo → Alembic. Add alembic.ini, alembic/env.py delegating to serpens.database.alembic.run_migrations, and a baseline revision wrapping the existing schema with IF NOT EXISTS. Lambda Migrate switches to Handler: migrate_handler.migrate_handler. Run alembic stamp 0001_baseline once per environment.

Common gotchas

  • Optimistic checks change. By default, Pony runs optimistic concurrency checks: an UPDATE fails if values the transaction read were modified by someone else. SA 2.0 does not. If a job relied on it, opt back in with version_id_col on the model.
  • X(...) does not INSERT in SA 2.0. Use sess.add(obj) and, if you need obj.id populated, sess.flush().
  • autoflush=False. Serpens disables autoflush so a stray select doesn't flush pending changes. Call sess.flush() explicitly when needed.
  • Lambda pool: DB_POOL_SIZE=1, DB_MAX_OVERFLOW=0. A larger pool causes Cloud SQL churn under burst.
  • postgres:// vs postgresql://: SQLAlchemy 1.4 and later reject the short postgres:// prefix. serpens normalises both.
  • Schema declaration: pick one place — declarative_base(schema=...) centralized, or __table_args__={"schema":...} per model. Don't mix.
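The scheme normalization mentioned above is a plain prefix rewrite. The sketch below is hypothetical and may not match serpens's rules. In particular, it leaves already driver-qualified URLs (postgresql+...://) untouched, whereas the real helper might also swap drivers.

```python
def normalize_url(url: str, *, async_driver: bool = False) -> str:
    """Hypothetical: rewrite bare Postgres schemes to an explicit SQLAlchemy driver."""
    driver = "postgresql+asyncpg" if async_driver else "postgresql+psycopg2"
    for prefix in ("postgres://", "postgresql://"):
        if url.startswith(prefix):
            return f"{driver}://{url[len(prefix):]}"
    # Already driver-qualified (postgresql+...://) or another backend: leave as-is.
    return url
```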

When NOT to use serpens.database

  • The repo already has an idiomatic SA 2.0 SessionLocal. Don't migrate just for standardization.
  • You need an SA feature serpens does not expose — import from sqlalchemy directly. Serpens is a thin layer by design.

DynamoDB

from dataclasses import dataclass
from serpens.document import BaseDocument

@dataclass
class PersonDocument(BaseDocument):
    _table_name_ = "person"
    id: str
    name: str

PersonDocument(id="1", name="Ana").save()
PersonDocument.get_by_key({"id": "1"})
PersonDocument.get_table()

Async HTTP client

Singleton httpx.AsyncClient — connection pools survive across requests.

from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI
from httpx import AsyncClient
from serpens.http_client import close_client, get_client, init_client

@asynccontextmanager
async def lifespan(_app: FastAPI):
    await init_client()
    yield
    await close_client()

app = FastAPI(lifespan=lifespan)

@app.get("/proxy")
async def proxy(client: AsyncClient = Depends(get_client)):
    return (await client.get("https://example.com")).json()

Timeout defaults to HTTP_CLIENT_TIMEOUT (env, seconds) or 30s. Extra kwargs pass through to httpx.AsyncClient.

Why use it

  • Pool reuse across requests. A new AsyncClient(...) per request creates and tears down TCP+TLS connections every call. The singleton amortises the handshake across the lifetime of the process — material latency win on chatty integrations.
  • One lifecycle to wire. init_client / close_client plug into FastAPI lifespan (or startup/shutdown for older versions). No per-handler instantiation boilerplate.
  • Env-driven timeout. HTTP_CLIENT_TIMEOUT standardises the cap; per-service overrides through the same channel.

Migration scenarios

| Today's code | Move to | What you gain |
| --- | --- | --- |
| async with httpx.AsyncClient() as client: await client.get(...) per call | init_client() in lifespan + get_client() in handlers | Pool reuse, lower TCP/TLS overhead, central timeout |
| requests.get(...) (sync) inside a FastAPI handler | init_client() + await client.get(...) | Stops blocking the event loop for the duration of the call |
| Per-service aiohttp / httpx singleton with hand-rolled lifecycle | serpens.http_client | Shared implementation, one less file per repo |

Greenfield FastAPI services adopt this from day one; existing async services swap a couple of imports.

Rate limiter

Token-bucket limiter for outbound calls plus an auth_lock that serializes token refresh (avoids thundering-herd re-auths on expiry).

from contextlib import asynccontextmanager
from serpens.rate_limit import RateLimiter

limiter = RateLimiter(rate=20, per_seconds=1.0)

@asynccontextmanager
async def lifespan(_app):
    limiter.start()
    yield
    await limiter.stop()

async def call_external():
    await limiter.acquire()
    return await client.get(...)

async def fetch_token():
    async with limiter.auth_lock:
        return await cached_get_or_set("token", 1800, _refresh_token)
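The sketch below illustrates the token-bucket idea in plain asyncio. TokenBucket is a hypothetical class, not serpens.rate_limit. One deliberate swap: serpens refills from a background replenisher task (start/stop), while this sketch refills lazily inside acquire(), so it needs no task.

```python
import asyncio
import time


class TokenBucket:
    """Hypothetical token bucket: `rate` tokens per `per_seconds`, refilled lazily."""

    def __init__(self, rate: int, per_seconds: float = 1.0) -> None:
        self.capacity = float(rate)
        self.tokens = float(rate)  # start full: an initial burst of `rate` is allowed
        self.fill_rate = rate / per_seconds  # tokens added per second
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()  # waiters are served in FIFO order

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.fill_rate)
        self.updated = now

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                self._refill()
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Sleep just long enough for one token to accumulate.
                await asyncio.sleep((1 - self.tokens) / self.fill_rate)
```

With rate=5 per second, five calls pass immediately and the sixth waits about 0.2 s. Sustained throughput settles at the configured ceiling.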

Why use it

  • Respects upstream quotas without manual back-off. Most third-party APIs (banks, KYC providers, credit bureaus) cap requests-per-second; exceeding it triggers 429s or temporary blocks. Token bucket gives a smooth, predictable throughput at the configured ceiling.
  • auth_lock solves the thundering herd. When a JWT/OAuth token expires, every concurrent coroutine tries to refresh at the same time. The lock guarantees one refresh per expiry while the rest wait on it.
  • Asyncio-native. No third-party aiolimiter dependency; replenisher runs as an asyncio.Task you control via start/stop.
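The "one refresh per expiry" guarantee is a lock plus a re-check after acquiring it. The sketch below shows the pattern. SingleFlightToken is a hypothetical helper; in serpens the lock is limiter.auth_lock and the caching is done separately.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class SingleFlightToken:
    """Hypothetical: at most one refresh in flight; concurrent callers share its result."""

    def __init__(self, refresh: Callable[[], Awaitable[str]]) -> None:
        self._refresh = refresh
        self._token: Optional[str] = None
        self._lock = asyncio.Lock()

    async def get(self) -> str:
        if self._token is not None:  # fast path: no lock once a token exists
            return self._token
        async with self._lock:
            # Re-check: a coroutine that held the lock before us may have refreshed.
            if self._token is None:
                self._token = await self._refresh()
            return self._token

    def invalidate(self) -> None:
        """Call on expiry (or a 401) so the next get() refreshes."""
        self._token = None
```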

Migration scenarios

| Today's code | Move to | What you gain |
| --- | --- | --- |
| asyncio.Semaphore(N) hand-rolled to cap concurrency | RateLimiter(rate=N, per_seconds=...) | Time-based replenishment instead of a pure concurrency cap; predictable RPS |
| aiolimiter / external rate-limit lib | serpens.rate_limit | One less dependency; same primitive |
| No rate limiting at all (calls the third party until it returns 429) | serpens.rate_limit | Avoids straining provider relationships and triggering exponential back-off cascades |
| Hand-rolled asyncio.Lock around token refresh | limiter.auth_lock | Bundled with the rate limit; less wiring |

Typical fit: any FastAPI / async service calling a quoted upstream (banking, KYC, payment processor). Worth adopting alongside serpens.http_client since they cover the same call path.

Cache

serpens.cache ships three flavors in a single module. Pick by sync/async and by scope (process-local vs distributed).

Why use it

  • Fails open. A Redis outage degrades to "no cache" instead of crashing the caller. Reads return None (treated as miss), writes/deletes become no-ops, decorators fall through to the wrapped function. Each failure logs a warning. Most plain redis.asyncio.Redis wrappers don't trap connection errors and propagate them to handlers.
  • Single source of truth. Stops the per-service drift (in-process TTL caches reimplemented in each repo, Redis lifecycle wired by hand, etc.).
  • Symmetric APIs. Sync, async in-process and async Redis share the same mental model: decorator-based caching plus low-level get/set/delete.
  • Lifecycle helpers built in for the Redis flavor — redis_init / redis_close for module-level singleton usage, redis_pool for FastAPI Depends injection.
  • JSON serialization for free on redis_get / redis_set / redis_cached; raw bytes are still available through redis_pool when needed.

Migration scenarios

| Today's code | Move to | What you gain |
| --- | --- | --- |
| Third-party Redis Depends factory (callable yielding redis.asyncio.Redis) | serpens.cache.redis_pool | Fail-open client on Redis outage; same Depends contract, one-line swap |
| App-local acached / in-process async TTL cache | serpens.cache.acached | One implementation maintained centrally; monotonic-clock TTL; same self-aware key heuristic |
| Direct Redis.from_url(...) + manual aclose() in Lambda | serpens.cache.redis_init / redis_close / redis_get / redis_set / redis_cached | Lifecycle helpers, auto JSON serialization, fail-open, env-driven prefix/TTL |
| serpens.cache.cached (sync legacy) | unchanged | Already lives here; consumed by parameters and secrets_manager |

The migration is intentionally minimal — typically a single import line per app. The behavior gain (Redis outages no longer break callers) is automatic on the new APIs.

Sync, in-process (legacy)

Used by serpens.parameters, serpens.secrets_manager and downstream services. TTL bucketed by name.

from serpens.cache import cached, clear_cache

@cached("secrets_manager", 900)
def get(secret_id):
    ...

clear_cache("secrets_manager")

Async, in-process

acached / clear_acache — same idea, for async def callers. The decorator drops the first positional argument from the key, on the assumption it's self (a repository or service object pointing at the same store). Different instances therefore share entries — fine for read-mostly data. Uses time.monotonic so TTL is immune to clock adjustments.

from sqlalchemy import select
from serpens.cache import acached, clear_acache

class ProductRepo:
    @acached("products", ttl_seconds=600)
    async def get_by_slug(self, slug: str):
        return await self.session.scalar(select(Product).where(Product.slug == slug))

clear_acache("products")  # one bucket
clear_acache()            # everything
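The key heuristic described above (drop self, bucket by name, expire on a monotonic clock) can be sketched as follows. ttl_acached and clear_ttl_acache are hypothetical names, not serpens's implementation. Like the description above, concurrent misses are not coalesced: two callers can both run the wrapped function.

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Optional

# bucket name -> {key: (expires_at_monotonic, value)}
_buckets: dict[str, dict[tuple, tuple[float, Any]]] = {}


def ttl_acached(bucket: str, ttl_seconds: float):
    def deco(fn: Callable[..., Awaitable[Any]]):
        @functools.wraps(fn)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            store = _buckets.setdefault(bucket, {})
            # `self` is deliberately left out of the key, so instances share entries.
            key = (fn.__qualname__, args, tuple(sorted(kwargs.items())))
            now = time.monotonic()  # immune to wall-clock adjustments
            hit = store.get(key)
            if hit is not None and hit[0] > now:
                return hit[1]
            value = await fn(self, *args, **kwargs)
            store[key] = (now + ttl_seconds, value)
            return value

        return wrapper

    return deco


def clear_ttl_acache(bucket: Optional[str] = None) -> None:
    if bucket is None:
        _buckets.clear()
    else:
        _buckets.pop(bucket, None)


class SlugRepo:
    calls = 0

    @ttl_acached("products", ttl_seconds=600)
    async def get_by_slug(self, slug: str) -> str:
        SlugRepo.calls += 1
        return slug.upper()


if __name__ == "__main__":
    print(asyncio.run(SlugRepo().get_by_slug("acai")))
```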

Async, Redis-backed

For FastAPI / long-running services that need a cache shared across workers and instances. Lifecycle: redis_init once at startup, redis_close at shutdown.

Fails open. On RedisError (host unreachable, timeout, refused connection) reads return None (treated as miss), writes/deletes become no-ops, and redis_cached_get_or_set falls through to the wrapped function. Each failure logs a warning. Programming errors (using the client before redis_init) still raise RuntimeError.

from contextlib import asynccontextmanager

from serpens.cache import (
    redis_init, redis_close, redis_get, redis_set, redis_delete,
    redis_cached, redis_cached_get_or_set,
)

@asynccontextmanager
async def lifespan(_app):
    await redis_init()
    yield
    await redis_close()

async def load_user():
    await redis_set("user:42", {"name": "Ana"}, ttl=60)
    return await redis_get("user:42")

@redis_cached("products", ttl=600)
async def get_product(slug: str):
    return await fetch_product(slug)

| Variable | Default | Purpose |
| --- | --- | --- |
| REDIS_URL | | Redis connection string. |
| CACHE_PREFIX | serpens | Prefix prepended to every key. Set per-service. |
| CACHE_TTL | 300 | Default TTL for redis_set / redis_cached. |

FastAPI Depends style

redis_pool(url) returns a callable suitable for FastAPI Depends, yielding a fail-open Redis client per request. The same fail-open semantics apply: get returns None, set/delete no-op on RedisError.

from fastapi import Depends, FastAPI
from redis.asyncio import Redis
from serpens.cache import redis_pool

app = FastAPI()
cache = redis_pool(settings.REDIS_URL)

@app.get("/users/{user_id}")
async def get_user(user_id: str, client: Redis = Depends(cache)):
    return await client.get(f"user:{user_id}")

Use this when the rest of the app expects a redis.asyncio.Redis client (e.g. when wiring third-party libraries that take cache_gen). For Lambda / single-process apps, the redis_* module-level functions above are simpler.

In tests

testgres.setup(Base, redis_mode=True) spins a Redis container alongside Postgres and exports REDIS_URL, so redis_init() picks it up without further config. If REDIS_URL is already set, the existing instance is reused.

Async Pub/Sub publisher

serpens.pubsub.AsyncPublisher wraps the sync Google SDK with asyncio.wrap_future so await publish(...) does not block the event loop. Instantiate once per process in lifespan, close on shutdown.

topic is the full topic id (projects/PROJECT/topics/NAME) — the same value Terraform exposes as an env var, no need to rebuild it via client.topic_path(...). When elasticapm is installed, every publish emits a messaging span labeled with the topic.

from contextlib import asynccontextmanager

from serpens.pubsub import AsyncPublisher

publisher = AsyncPublisher()

@asynccontextmanager
async def lifespan(_app):
    yield
    publisher.close()

async def emit(payload: dict):
    await publisher.publish(settings.MY_TOPIC, payload)

Why use it

  • Doesn't block the event loop. The Google SDK is synchronous: publish() returns a concurrent.futures.Future, and calling future.result() inside a FastAPI handler stalls every other request on the same worker. AsyncPublisher bridges the gap with asyncio.wrap_future.
  • Terraform-friendly topic id. Accepts projects/.../topics/... directly — same value already exposed as MY_TOPIC env in your Terraform module. No need to keep project_id separately and call client.topic_path(project, topic) everywhere.
  • APM observability for free. When elasticapm is installed, every publish emits a messaging span with queue_name=<topic>. No-op if APM isn't present.
  • Single connection per process. The client is instantiated once on app boot; gRPC channel reuse cuts per-publish overhead.
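The bridge itself is one call: asyncio.wrap_future turns a concurrent.futures.Future into something a coroutine can await. The sketch below demonstrates this with a thread pool standing in for the SDK's background machinery. blocking_publish and publish are hypothetical stand-ins, not the Google client or AsyncPublisher. A ticker task keeps running while the publish is in flight, which shows the loop is not blocked.

```python
import asyncio
import concurrent.futures
import time

# Stand-in for the SDK's background threads that resolve publish futures.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def blocking_publish(topic: str, data: bytes) -> str:
    """Hypothetical stand-in for a synchronous SDK round-trip."""
    time.sleep(0.2)  # simulated network latency
    return f"{topic}#{len(data)}"


async def publish(topic: str, data: bytes) -> str:
    future = _executor.submit(blocking_publish, topic, data)  # concurrent.futures.Future
    # wrap_future makes it awaitable; the event loop keeps serving other tasks.
    return await asyncio.wrap_future(future)


async def demo() -> tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    message_id = await publish("projects/p/topics/t", b"hello")
    task.cancel()
    return message_id, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Replacing the await with future.result() inside publish would freeze the ticker for the full 0.2 s. That is the stall described above.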

Migration scenarios

| Today's code | Move to | What you gain |
| --- | --- | --- |
| pubsub_v1.PublisherClient() + client.topic_path(project, topic) + future.result() per publish | AsyncPublisher() + await publisher.publish(topic, payload) | Non-blocking publish, full topic id, central APM span emission, one client per process |
| serpens.pubsub.publish_message(...) (sync, creates a new PublisherClient per call) inside an async handler | AsyncPublisher | Avoids the per-call client construction; no event loop blocking |
| Per-service TracedMessagePublisher / APM-aware wrapper | AsyncPublisher | Removes the duplicated wrapper; spans emitted from the lib |

The sync serpens.pubsub.publish_message / publish_message_batch remain the right choice for Lambda one-shots or non-async code paths.

Test infrastructure (testgres)

serpens.testgres.setup wires a Postgres (and optionally Redis) container into a unittest/pytest suite, running create_all against your Base.metadata before the first test.

# conftest.py
from serpens import testgres
from myapp.models import Base

testgres.setup(Base, async_mode=True, redis_mode=True)

Modes

| Flag | Effect | When to enable |
| --- | --- | --- |
| (default) | Spins a Postgres container, binds database.SessionLocal (sync) | Lambda services / sync codebases |
| async_mode=True | Also binds database.AsyncSessionLocal with NullPool | FastAPI services / async tests; replaces the per-conftest async engine wiring |
| redis_mode=True | Spins a Redis container alongside Postgres, exports REDIS_URL | Services using serpens.cache.redis_* or any Redis client; replaces the manual docker run redis boilerplate teams currently keep in their own conftest.py |
| default_schema="x,y" | Pre-creates the schemas and sets search_path | Services using schema scoping via declarative_base(schema=...) |
| uri=... / DATABASE_URL env | Skips the container, uses the provided URI | CI runners that already have Postgres available |
| REDIS_URL env (when redis_mode=True) | Skips the Redis container, uses the provided URL | CI runners with existing Redis |

Why use it

  • One conftest line replaces a dozen. Tests that previously wired create_engine, metadata.create_all, sessionmaker, container lifecycle, schema setup and Redis container setup collapse to a single setup(...) call.
  • Real Postgres in tests. Catches the kinds of bugs SQLite can't model (ON CONFLICT, JSONB operators, schema-qualified names). Same engine semantics as production.
  • Async + sync engines wired together. Both SessionLocal and AsyncSessionLocal point at the same database — tests can mix.
  • Defers create_all to startTestRun. Models registered after setup() returns (common when tests do dynamic imports) still get their tables.
  • Graceful failure. Waits for the published TCP port and a real psycopg2.connect, raising a clear RuntimeError if the container doesn't come up — no more silent test hangs.
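The first half of that readiness check (wait for the TCP port, with a deadline and a clear error) can be sketched with the standard library. wait_for_port is a hypothetical helper, not testgres's code. The second stage, a real psycopg2.connect, is not shown.

```python
import socket
import time
from typing import Optional


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.2) -> None:
    """Hypothetical: block until host:port accepts TCP, else raise RuntimeError."""
    deadline = time.monotonic() + timeout
    last_error: Optional[OSError] = None
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return  # something is listening
        except OSError as exc:  # refused, unreachable, or connect timeout
            last_error = exc
            time.sleep(interval)
    raise RuntimeError(f"{host}:{port} not reachable after {timeout}s") from last_error
```

A listening port returns promptly, and a dead one fails after the deadline instead of hanging the suite.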

Migration scenarios

| Today's code | Move to | What you gain |
| --- | --- | --- |
| Hand-rolled docker run postgres:13 in test setup + manual create_engine + metadata.create_all | serpens.testgres.setup(Base) | Container lifecycle, schema bootstrap, sane defaults, error propagation |
| Test suite that wires async engine separately from sync (parallel AsyncSessionLocal setup in conftest.py) | setup(Base, async_mode=True) | One factory wired automatically with NullPool (correct for tests) |
| docker run postgres + docker run redis boilerplate in conftest.py | setup(Base, redis_mode=True) | One call, both containers, env vars exported |

About

A set of Python utilities, recipes and snippets
