Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 150 additions & 29 deletions plutus_agent/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
credit is an append-only ledger; the org balance is the sum of its deltas, so it
is always auditable and can never silently drift.

All money is stored in USD as REAL. All timestamps are Unix epoch seconds
(matching ``plutus.py``'s ``state.db`` convention).
All money is stored as **integer micro-dollars** (1 USD = 1_000_000 micros) in
columns suffixed ``_micros``. Integers sum exactly in SQLite, so a large
``SUM(delta_micros)`` never accumulates the sub-cent float drift that a REAL
ledger would — we convert to float USD only once, at the read boundary, via the
``micros_to_usd`` helper. The public Python API of this module still speaks
float USD; the integer representation is an internal storage detail. All
timestamps are Unix epoch seconds (matching ``plutus.py``'s ``state.db``).

The connection uses WAL + a row factory returning ``sqlite3.Row`` so callers get
dict-like rows. Nothing here imports Flask/Stripe — it's pure stdlib and works
Expand All @@ -21,7 +26,31 @@
from pathlib import Path
from typing import Optional

SCHEMA_VERSION = 3
SCHEMA_VERSION = 4

# ---- money: integer micro-dollars ------------------------------------------
# All money is stored as integer micro-dollars (1 USD == MICROS_PER_USD micros).
# Convert at the boundary only: integers accumulate exactly in SQL, so we incur
# a single rounding when crossing back to float USD for display / Stripe.
MICROS_PER_USD = 1_000_000


def usd_to_micros(usd) -> int:
"""Convert a float/Decimal/str USD amount to integer micro-dollars.

Rounds to the nearest micro using banker-safe ``round`` on a scaled value.
``None`` maps to ``None`` so nullable money columns round-trip cleanly.
"""
if usd is None:
return None
return int(round(float(usd) * MICROS_PER_USD))


def micros_to_usd(micros) -> float:
"""Convert integer micro-dollars back to float USD. ``None`` -> ``None``."""
if micros is None:
return None
return micros / MICROS_PER_USD

SCHEMA = """
CREATE TABLE IF NOT EXISTS organizations (
Expand All @@ -48,7 +77,7 @@
org_id TEXT NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
name TEXT NOT NULL,
slug TEXT NOT NULL,
monthly_budget_usd REAL,
monthly_budget_micros INTEGER,
created_at REAL NOT NULL,
UNIQUE(org_id, slug)
);
Expand All @@ -64,7 +93,7 @@
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_tokens INTEGER NOT NULL DEFAULT 0,
reasoning_tokens INTEGER NOT NULL DEFAULT 0,
cost_usd REAL NOT NULL DEFAULT 0,
cost_micros INTEGER NOT NULL DEFAULT 0,
estimated INTEGER NOT NULL DEFAULT 1,
source TEXT NOT NULL DEFAULT 'api',
ts REAL NOT NULL
Expand All @@ -76,11 +105,11 @@
CREATE TABLE IF NOT EXISTS credit_ledger (
id TEXT PRIMARY KEY,
org_id TEXT NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
delta_usd REAL NOT NULL, -- +topup/grant/refund, -debit
delta_micros INTEGER NOT NULL, -- +topup/grant/refund, -debit
kind TEXT NOT NULL, -- topup|grant|debit|refund|adjust
reason TEXT,
stripe_ref TEXT,
balance_after REAL NOT NULL,
balance_after_micros INTEGER NOT NULL,
ts REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_ledger_org_ts ON credit_ledger(org_id, ts);
Expand Down Expand Up @@ -158,6 +187,7 @@ def connect(path: Optional[str | Path] = None) -> sqlite3.Connection:


def init_schema(conn: sqlite3.Connection) -> None:
_migrate_money_to_micros(conn)
conn.executescript(SCHEMA)
conn.execute(
"INSERT OR REPLACE INTO meta(key,value) VALUES('schema_version',?)",
Expand All @@ -166,6 +196,51 @@ def init_schema(conn: sqlite3.Connection) -> None:
conn.commit()


def _table_columns(conn, table: str) -> set:
try:
return {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
except sqlite3.Error:
return set()


def _migrate_money_to_micros(conn) -> None:
"""Convert a pre-v4 database (REAL USD money columns) to integer micros.

Idempotent: detects the legacy column on each table and, if present, adds the
new ``*_micros`` column, back-fills it as ``round(usd * 1e6)``, and drops the
old column (via SQLite's column-drop, available 3.35+, with a table-rebuild
fallback for older SQLite). A fresh database has no legacy columns, so this
is a no-op and the canonical SCHEMA creates the integer columns directly.
"""
plan = [
("usage_events", "cost_usd", "cost_micros"),
("credit_ledger", "delta_usd", "delta_micros"),
("credit_ledger", "balance_after", "balance_after_micros"),
("workspaces", "monthly_budget_usd", "monthly_budget_micros"),
]
did_any = False
for table, old_col, new_col in plan:
cols = _table_columns(conn, table)
if not cols or old_col not in cols:
continue # fresh DB or already migrated
did_any = True
if new_col not in cols:
coltype = "INTEGER NOT NULL DEFAULT 0" if old_col != "monthly_budget_usd" else "INTEGER"
conn.execute(f"ALTER TABLE {table} ADD COLUMN {new_col} {coltype}")
# Back-fill: round to nearest micro. NULL budgets stay NULL.
conn.execute(
f"UPDATE {table} SET {new_col} = CAST(ROUND({old_col} * 1000000) AS INTEGER) "
f"WHERE {old_col} IS NOT NULL"
)
# Drop the legacy column. SQLite >= 3.35 supports DROP COLUMN directly.
try:
conn.execute(f"ALTER TABLE {table} DROP COLUMN {old_col}")
except sqlite3.OperationalError:
pass # older SQLite: leave the now-unused REAL column in place
if did_any:
conn.commit()


# ---------------------------------------------------------- organizations ----
def create_org(conn, name: str, tier: str = "free",
owner_email: Optional[str] = None,
Expand Down Expand Up @@ -384,63 +459,109 @@ def create_workspace(conn, org_id: str, name: str,
n += 1
slug = f"{base}-{n}"
conn.execute(
"INSERT INTO workspaces(id,org_id,name,slug,monthly_budget_usd,created_at)"
"INSERT INTO workspaces(id,org_id,name,slug,monthly_budget_micros,created_at)"
" VALUES(?,?,?,?,?,?)",
(wid, org_id, name, slug, monthly_budget_usd, time.time()),
(wid, org_id, name, slug, usd_to_micros(monthly_budget_usd), time.time()),
)
conn.commit()
return conn.execute("SELECT * FROM workspaces WHERE id=?", (wid,)).fetchone()
return get_workspace(conn, wid)


def list_workspaces(conn, org_id: str) -> list[sqlite3.Row]:
return conn.execute(
def _workspace_row(row: Optional[sqlite3.Row]):
"""Expose ``monthly_budget_usd`` (float USD) alongside the stored micros."""
if row is None:
return None
d = dict(row)
d["monthly_budget_usd"] = micros_to_usd(d.get("monthly_budget_micros"))
return d


def list_workspaces(conn, org_id: str) -> list[dict]:
rows = conn.execute(
"SELECT * FROM workspaces WHERE org_id=? ORDER BY created_at", (org_id,)
).fetchall()
return [_workspace_row(r) for r in rows]


def get_workspace(conn, workspace_id: str) -> Optional[sqlite3.Row]:
return conn.execute("SELECT * FROM workspaces WHERE id=?", (workspace_id,)).fetchone()
def get_workspace(conn, workspace_id: str):
row = conn.execute("SELECT * FROM workspaces WHERE id=?", (workspace_id,)).fetchone()
return _workspace_row(row)


# ----------------------------------------------------------------- credit ----
def get_balance(conn, org_id: str) -> float:
"""Authoritative balance = sum of all ledger deltas.
"""Authoritative balance in float USD = sum of all ledger deltas.

Computed from the deltas rather than the latest row's ``balance_after`` so it
is correct regardless of insertion / timestamp order (live metering arrives
in order; demo seeding and historical back-fill do not). ``balance_after``
remains a best-effort audit column.
Computed from the integer micro-dollar deltas rather than the latest row's
``balance_after_micros`` so it is correct regardless of insertion /
timestamp order (live metering arrives in order; demo seeding and historical
back-fill do not). Integers sum exactly, so there is no float drift; we
convert to USD once, here at the boundary.
"""
row = conn.execute(
"SELECT COALESCE(SUM(delta_usd),0) bal FROM credit_ledger WHERE org_id=?",
"SELECT COALESCE(SUM(delta_micros),0) bal FROM credit_ledger WHERE org_id=?",
(org_id,),
).fetchone()
return micros_to_usd(int(row["bal"]))


def get_balance_micros(conn, org_id: str) -> int:
"""Authoritative balance in integer micro-dollars (no float involved)."""
row = conn.execute(
"SELECT COALESCE(SUM(delta_micros),0) bal FROM credit_ledger WHERE org_id=?",
(org_id,),
).fetchone()
return round(float(row["bal"]), 6)
return int(row["bal"])


def add_ledger(conn, org_id: str, delta_usd: float, kind: str,
reason: str = "", stripe_ref: Optional[str] = None,
ts: Optional[float] = None, commit: bool = True) -> sqlite3.Row:
"""Add a ledger entry. Balance is authoritative via SUM(delta_usd);
balance_after is a best-effort audit column computed at insert time."""
"""Add a ledger entry. ``delta_usd`` is float USD on the API; it is stored
as integer micro-dollars. Balance is authoritative via SUM(delta_micros);
balance_after_micros is a best-effort audit column computed at insert time.
The returned row exposes float ``delta_usd`` / ``balance_after`` aliases so
existing callers keep working.
"""
ts = ts if ts is not None else time.time()
balance_after = round(get_balance(conn, org_id) + delta_usd, 6)
delta_micros = usd_to_micros(delta_usd)
balance_after_micros = get_balance_micros(conn, org_id) + delta_micros
lid = new_id("led")
conn.execute(
"INSERT INTO credit_ledger(id,org_id,delta_usd,kind,reason,stripe_ref,balance_after,ts)"
"INSERT INTO credit_ledger(id,org_id,delta_micros,kind,reason,stripe_ref,balance_after_micros,ts)"
" VALUES(?,?,?,?,?,?,?,?)",
(lid, org_id, round(delta_usd, 6), kind, reason, stripe_ref, balance_after, ts),
(lid, org_id, delta_micros, kind, reason, stripe_ref, balance_after_micros, ts),
)
if commit:
conn.commit()
return conn.execute("SELECT * FROM credit_ledger WHERE id=?", (lid,)).fetchone()
return get_ledger_entry(conn, lid)


def ledger_history(conn, org_id: str, limit: int = 50) -> list[sqlite3.Row]:
return conn.execute(
def _ledger_row_with_usd(row: Optional[sqlite3.Row]) -> Optional[dict]:
"""Return a ledger row as a dict with float USD aliases added.

Adds ``delta_usd`` and ``balance_after`` (float USD) alongside the stored
``*_micros`` integer columns so callers and templates can use either.
"""
if row is None:
return None
d = dict(row)
d["delta_usd"] = micros_to_usd(d["delta_micros"])
d["balance_after"] = micros_to_usd(d["balance_after_micros"])
return d


def get_ledger_entry(conn, ledger_id: str) -> Optional[dict]:
row = conn.execute("SELECT * FROM credit_ledger WHERE id=?", (ledger_id,)).fetchone()
return _ledger_row_with_usd(row)


def ledger_history(conn, org_id: str, limit: int = 50) -> list[dict]:
rows = conn.execute(
"SELECT * FROM credit_ledger WHERE org_id=? ORDER BY ts DESC, rowid DESC LIMIT ?",
(org_id, limit),
).fetchall()
return [_ledger_row_with_usd(r) for r in rows]


# ------------------------------------------------------------- stripe idemp ---
Expand Down
31 changes: 18 additions & 13 deletions plutus_agent/metering.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ def record_usage(conn, org_id: str, provider: str,
eid = db.new_id("evt")
conn.execute(
"INSERT INTO usage_events(id,org_id,workspace_id,provider,model,task_type,"
"input_tokens,output_tokens,cache_read_tokens,reasoning_tokens,cost_usd,"
"input_tokens,output_tokens,cache_read_tokens,reasoning_tokens,cost_micros,"
"estimated,source,ts) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(eid, org_id, workspace_id, provider, model, task_type,
int(input_tokens), int(output_tokens), int(cache_read_tokens),
int(reasoning_tokens), cost_usd, int(estimated), source, ts),
int(reasoning_tokens), db.usd_to_micros(cost_usd), int(estimated), source, ts),
)

# Deplete prepaid credit (only when there's credit to deplete; orgs on the
Expand Down Expand Up @@ -245,10 +245,10 @@ def workspace_mtd_spend(conn, workspace_id: str, now: Optional[float] = None) ->
now = now if now is not None else time.time()
floor = _month_floor(now)
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd),0) s FROM usage_events WHERE workspace_id=? AND ts>=?",
"SELECT COALESCE(SUM(cost_micros),0) s FROM usage_events WHERE workspace_id=? AND ts>=?",
(workspace_id, floor),
).fetchone()
return float(row["s"])
return db.micros_to_usd(int(row["s"]))


def org_spend_windows(conn, org_id: str, now: Optional[float] = None) -> dict:
Expand All @@ -258,12 +258,12 @@ def org_spend_windows(conn, org_id: str, now: Optional[float] = None) -> dict:
out = {}
for name, floor in windows.items():
row = conn.execute(
"SELECT COALESCE(SUM(cost_usd),0) cost, COALESCE(SUM(input_tokens+output_tokens"
"SELECT COALESCE(SUM(cost_micros),0) cost, COALESCE(SUM(input_tokens+output_tokens"
"+cache_read_tokens+reasoning_tokens),0) tok, COUNT(*) n "
"FROM usage_events WHERE org_id=? AND ts>=?",
(org_id, floor),
).fetchone()
out[name] = {"cost": float(row["cost"]), "tokens": int(row["tok"]),
out[name] = {"cost": db.micros_to_usd(int(row["cost"])), "tokens": int(row["tok"]),
"events": int(row["n"])}
return out

Expand All @@ -280,13 +280,13 @@ def spend_by(conn, org_id: str, dimension: str, since: float = 0,
}[dimension]
join = "LEFT JOIN workspaces w ON w.id = ue.workspace_id" if dimension == "workspace" else ""
rows = conn.execute(
f"SELECT {col} AS k, COALESCE(SUM(ue.cost_usd),0) cost, "
f"SELECT {col} AS k, COALESCE(SUM(ue.cost_micros),0) cost, "
f"COALESCE(SUM(ue.input_tokens+ue.output_tokens+ue.cache_read_tokens+ue.reasoning_tokens),0) tok, "
f"COUNT(*) n FROM usage_events ue {join} "
f"WHERE ue.org_id=? AND ue.ts>=? GROUP BY k ORDER BY cost DESC",
(org_id, since),
).fetchall()
return [{"key": r["k"], "cost": float(r["cost"]), "tokens": int(r["tok"]),
return [{"key": r["k"], "cost": db.micros_to_usd(int(r["cost"])), "tokens": int(r["tok"]),
"events": int(r["n"])} for r in rows]


Expand All @@ -299,8 +299,8 @@ def provider_health(conn, org_id: str, now: Optional[float] = None) -> list[dict
now = now if now is not None else time.time()
rows = conn.execute(
"SELECT provider, MAX(ts) last_ts, "
"COALESCE(SUM(CASE WHEN ts>=? THEN cost_usd ELSE 0 END),0) c7, "
"COALESCE(SUM(cost_usd),0) all_cost, COUNT(*) n "
"COALESCE(SUM(CASE WHEN ts>=? THEN cost_micros ELSE 0 END),0) c7, "
"COALESCE(SUM(cost_micros),0) all_cost, COUNT(*) n "
"FROM usage_events WHERE org_id=? GROUP BY provider ORDER BY all_cost DESC",
(now - 7 * DAY, org_id),
).fetchall()
Expand All @@ -311,8 +311,8 @@ def provider_health(conn, org_id: str, now: Optional[float] = None) -> list[dict
out.append({
"provider": r["provider"],
"last_ts": r["last_ts"],
"burn_per_day": round(float(r["c7"]) / 7.0, 4),
"all_cost": float(r["all_cost"]),
"burn_per_day": round(db.micros_to_usd(int(r["c7"])) / 7.0, 4),
"all_cost": db.micros_to_usd(int(r["all_cost"])),
"events": int(r["n"]),
"status": status,
})
Expand Down Expand Up @@ -374,7 +374,12 @@ def recent_events(conn, org_id: str, limit: int = 25) -> list[dict]:
"WHERE ue.org_id=? ORDER BY ue.ts DESC LIMIT ?",
(org_id, limit),
).fetchall()
return [dict(r) for r in rows]
out = []
for r in rows:
d = dict(r)
d["cost_usd"] = db.micros_to_usd(int(d.get("cost_micros", 0) or 0))
out.append(d)
return out


def org_summary(conn, org_id: str, now: Optional[float] = None) -> dict:
Expand Down
Loading
Loading