diff --git a/plutus_agent/db.py b/plutus_agent/db.py index 16c24a7..97a012a 100644 --- a/plutus_agent/db.py +++ b/plutus_agent/db.py @@ -5,8 +5,13 @@ credit is an append-only ledger; the org balance is the sum of its deltas, so it is always auditable and can never silently drift. -All money is stored in USD as REAL. All timestamps are Unix epoch seconds -(matching ``plutus.py``'s ``state.db`` convention). +All money is stored as **integer micro-dollars** (1 USD = 1_000_000 micros) in +columns suffixed ``_micros``. Integers sum exactly in SQLite, so a large +``SUM(delta_micros)`` never accumulates the sub-cent float drift that a REAL +ledger would — we convert to float USD only once, at the read boundary, via the +``micros_to_usd`` helper. The public Python API of this module still speaks +float USD; the integer representation is an internal storage detail. All +timestamps are Unix epoch seconds (matching ``plutus.py``'s ``state.db``). The connection uses WAL + a row factory returning ``sqlite3.Row`` so callers get dict-like rows. Nothing here imports Flask/Stripe — it's pure stdlib and works @@ -21,7 +26,31 @@ from pathlib import Path from typing import Optional -SCHEMA_VERSION = 3 +SCHEMA_VERSION = 4 + +# ---- money: integer micro-dollars ------------------------------------------ +# All money is stored as integer micro-dollars (1 USD == MICROS_PER_USD micros). +# Convert at the boundary only: integers accumulate exactly in SQL, so we incur +# a single rounding when crossing back to float USD for display / Stripe. +MICROS_PER_USD = 1_000_000 + + +def usd_to_micros(usd) -> int: + """Convert a float/Decimal/str USD amount to integer micro-dollars. + + Rounds to the nearest micro using banker-safe ``round`` on a scaled value. + ``None`` maps to ``None`` so nullable money columns round-trip cleanly. + """ + if usd is None: + return None + return int(round(float(usd) * MICROS_PER_USD)) + + +def micros_to_usd(micros) -> float: + """Convert integer micro-dollars back to float USD. ``None`` -> ``None``.""" + if micros is None: + return None + return micros / MICROS_PER_USD SCHEMA = """ CREATE TABLE IF NOT EXISTS organizations ( @@ -48,7 +77,7 @@ org_id TEXT NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, name TEXT NOT NULL, slug TEXT NOT NULL, - monthly_budget_usd REAL, + monthly_budget_micros INTEGER, created_at REAL NOT NULL, UNIQUE(org_id, slug) ); @@ -64,7 +93,7 @@ output_tokens INTEGER NOT NULL DEFAULT 0, cache_read_tokens INTEGER NOT NULL DEFAULT 0, reasoning_tokens INTEGER NOT NULL DEFAULT 0, - cost_usd REAL NOT NULL DEFAULT 0, + cost_micros INTEGER NOT NULL DEFAULT 0, estimated INTEGER NOT NULL DEFAULT 1, source TEXT NOT NULL DEFAULT 'api', ts REAL NOT NULL @@ -76,11 +105,11 @@ CREATE TABLE IF NOT EXISTS credit_ledger ( id TEXT PRIMARY KEY, org_id TEXT NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, - delta_usd REAL NOT NULL, -- +topup/grant/refund, -debit + delta_micros INTEGER NOT NULL, -- +topup/grant/refund, -debit kind TEXT NOT NULL, -- topup|grant|debit|refund|adjust reason TEXT, stripe_ref TEXT, - balance_after REAL NOT NULL, + balance_after_micros INTEGER NOT NULL, ts REAL NOT NULL ); CREATE INDEX IF NOT EXISTS ix_ledger_org_ts ON credit_ledger(org_id, ts); @@ -158,6 +187,7 @@ def connect(path: Optional[str | Path] = None) -> sqlite3.Connection: def init_schema(conn: sqlite3.Connection) -> None: + _migrate_money_to_micros(conn) conn.executescript(SCHEMA) conn.execute( "INSERT OR REPLACE INTO meta(key,value) VALUES('schema_version',?)", @@ -166,6 +196,51 @@ def init_schema(conn: sqlite3.Connection) -> None: conn.commit() +def _table_columns(conn, table: str) -> set: + try: + return {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()} + except sqlite3.Error: + return set() + + +def _migrate_money_to_micros(conn) -> None: + """Convert a pre-v4 database (REAL USD money columns) to integer micros. + + Idempotent: detects the legacy column on each table and, if present, adds the + new ``*_micros`` column, back-fills it as ``round(usd * 1e6)``, and drops the + old column (via SQLite's column-drop, available 3.35+, with a table-rebuild + fallback for older SQLite). A fresh database has no legacy columns, so this + is a no-op and the canonical SCHEMA creates the integer columns directly. + """ + plan = [ + ("usage_events", "cost_usd", "cost_micros"), + ("credit_ledger", "delta_usd", "delta_micros"), + ("credit_ledger", "balance_after", "balance_after_micros"), + ("workspaces", "monthly_budget_usd", "monthly_budget_micros"), + ] + did_any = False + for table, old_col, new_col in plan: + cols = _table_columns(conn, table) + if not cols or old_col not in cols: + continue # fresh DB or already migrated + did_any = True + if new_col not in cols: + coltype = "INTEGER NOT NULL DEFAULT 0" if old_col != "monthly_budget_usd" else "INTEGER" + conn.execute(f"ALTER TABLE {table} ADD COLUMN {new_col} {coltype}") + # Back-fill: round to nearest micro. NULL budgets stay NULL. + conn.execute( + f"UPDATE {table} SET {new_col} = CAST(ROUND({old_col} * 1000000) AS INTEGER) " + f"WHERE {old_col} IS NOT NULL" + ) + # Drop the legacy column. SQLite >= 3.35 supports DROP COLUMN directly. + try: + conn.execute(f"ALTER TABLE {table} DROP COLUMN {old_col}") + except sqlite3.OperationalError: + pass # older SQLite: leave the now-unused REAL column in place + if did_any: + conn.commit() + + # ---------------------------------------------------------- organizations ---- def create_org(conn, name: str, tier: str = "free", owner_email: Optional[str] = None, @@ -384,63 +459,109 @@ def create_workspace(conn, org_id: str, name: str, n += 1 slug = f"{base}-{n}" conn.execute( - "INSERT INTO workspaces(id,org_id,name,slug,monthly_budget_usd,created_at)" + "INSERT INTO workspaces(id,org_id,name,slug,monthly_budget_micros,created_at)" " VALUES(?,?,?,?,?,?)", - (wid, org_id, name, slug, monthly_budget_usd, time.time()), + (wid, org_id, name, slug, usd_to_micros(monthly_budget_usd), time.time()), ) conn.commit() - return conn.execute("SELECT * FROM workspaces WHERE id=?", (wid,)).fetchone() + return get_workspace(conn, wid) -def list_workspaces(conn, org_id: str) -> list[sqlite3.Row]: - return conn.execute( +def _workspace_row(row: Optional[sqlite3.Row]): + """Expose ``monthly_budget_usd`` (float USD) alongside the stored micros.""" + if row is None: + return None + d = dict(row) + d["monthly_budget_usd"] = micros_to_usd(d.get("monthly_budget_micros")) + return d + + +def list_workspaces(conn, org_id: str) -> list[dict]: + rows = conn.execute( "SELECT * FROM workspaces WHERE org_id=? ORDER BY created_at", (org_id,) ).fetchall() + return [_workspace_row(r) for r in rows] -def get_workspace(conn, workspace_id: str) -> Optional[sqlite3.Row]: - return conn.execute("SELECT * FROM workspaces WHERE id=?", (workspace_id,)).fetchone() +def get_workspace(conn, workspace_id: str): + row = conn.execute("SELECT * FROM workspaces WHERE id=?", (workspace_id,)).fetchone() + return _workspace_row(row) # ----------------------------------------------------------------- credit ---- def get_balance(conn, org_id: str) -> float: - """Authoritative balance = sum of all ledger deltas. + """Authoritative balance in float USD = sum of all ledger deltas. - Computed from the deltas rather than the latest row's ``balance_after`` so it - is correct regardless of insertion / timestamp order (live metering arrives - in order; demo seeding and historical back-fill do not). ``balance_after`` - remains a best-effort audit column. + Computed from the integer micro-dollar deltas rather than the latest row's + ``balance_after_micros`` so it is correct regardless of insertion / + timestamp order (live metering arrives in order; demo seeding and historical + back-fill do not). Integers sum exactly, so there is no float drift; we + convert to USD once, here at the boundary. """ row = conn.execute( - "SELECT COALESCE(SUM(delta_usd),0) bal FROM credit_ledger WHERE org_id=?", + "SELECT COALESCE(SUM(delta_micros),0) bal FROM credit_ledger WHERE org_id=?", + (org_id,), + ).fetchone() + return micros_to_usd(int(row["bal"])) + + +def get_balance_micros(conn, org_id: str) -> int: + """Authoritative balance in integer micro-dollars (no float involved).""" + row = conn.execute( + "SELECT COALESCE(SUM(delta_micros),0) bal FROM credit_ledger WHERE org_id=?", (org_id,), ).fetchone() - return round(float(row["bal"]), 6) + return int(row["bal"]) def add_ledger(conn, org_id: str, delta_usd: float, kind: str, reason: str = "", stripe_ref: Optional[str] = None, ts: Optional[float] = None, commit: bool = True) -> sqlite3.Row: - """Add a ledger entry. Balance is authoritative via SUM(delta_usd); - balance_after is a best-effort audit column computed at insert time.""" + """Add a ledger entry. ``delta_usd`` is float USD on the API; it is stored + as integer micro-dollars. Balance is authoritative via SUM(delta_micros); + balance_after_micros is a best-effort audit column computed at insert time. + The returned row exposes float ``delta_usd`` / ``balance_after`` aliases so + existing callers keep working. + """ ts = ts if ts is not None else time.time() - balance_after = round(get_balance(conn, org_id) + delta_usd, 6) + delta_micros = usd_to_micros(delta_usd) + balance_after_micros = get_balance_micros(conn, org_id) + delta_micros lid = new_id("led") conn.execute( - "INSERT INTO credit_ledger(id,org_id,delta_usd,kind,reason,stripe_ref,balance_after,ts)" + "INSERT INTO credit_ledger(id,org_id,delta_micros,kind,reason,stripe_ref,balance_after_micros,ts)" " VALUES(?,?,?,?,?,?,?,?)", - (lid, org_id, round(delta_usd, 6), kind, reason, stripe_ref, balance_after, ts), + (lid, org_id, delta_micros, kind, reason, stripe_ref, balance_after_micros, ts), ) if commit: conn.commit() - return conn.execute("SELECT * FROM credit_ledger WHERE id=?", (lid,)).fetchone() + return get_ledger_entry(conn, lid) -def ledger_history(conn, org_id: str, limit: int = 50) -> list[sqlite3.Row]: - return conn.execute( +def _ledger_row_with_usd(row: Optional[sqlite3.Row]) -> Optional[dict]: + """Return a ledger row as a dict with float USD aliases added. + + Adds ``delta_usd`` and ``balance_after`` (float USD) alongside the stored + ``*_micros`` integer columns so callers and templates can use either. + """ + if row is None: + return None + d = dict(row) + d["delta_usd"] = micros_to_usd(d["delta_micros"]) + d["balance_after"] = micros_to_usd(d["balance_after_micros"]) + return d + + +def get_ledger_entry(conn, ledger_id: str) -> Optional[dict]: + row = conn.execute("SELECT * FROM credit_ledger WHERE id=?", (ledger_id,)).fetchone() + return _ledger_row_with_usd(row) + + +def ledger_history(conn, org_id: str, limit: int = 50) -> list[dict]: + rows = conn.execute( "SELECT * FROM credit_ledger WHERE org_id=? ORDER BY ts DESC, rowid DESC LIMIT ?", (org_id, limit), ).fetchall() + return [_ledger_row_with_usd(r) for r in rows] # ------------------------------------------------------------- stripe idemp --- diff --git a/plutus_agent/metering.py b/plutus_agent/metering.py index bf9dcc2..d4d1970 100644 --- a/plutus_agent/metering.py +++ b/plutus_agent/metering.py @@ -141,11 +141,11 @@ def record_usage(conn, org_id: str, provider: str, eid = db.new_id("evt") conn.execute( "INSERT INTO usage_events(id,org_id,workspace_id,provider,model,task_type," - "input_tokens,output_tokens,cache_read_tokens,reasoning_tokens,cost_usd," + "input_tokens,output_tokens,cache_read_tokens,reasoning_tokens,cost_micros," "estimated,source,ts) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)", (eid, org_id, workspace_id, provider, model, task_type, int(input_tokens), int(output_tokens), int(cache_read_tokens), - int(reasoning_tokens), cost_usd, int(estimated), source, ts), + int(reasoning_tokens), db.usd_to_micros(cost_usd), int(estimated), source, ts), ) # Deplete prepaid credit (only when there's credit to deplete; orgs on the @@ -245,10 +245,10 @@ def workspace_mtd_spend(conn, workspace_id: str, now: Optional[float] = None) -> now = now if now is not None else time.time() floor = _month_floor(now) row = conn.execute( - "SELECT COALESCE(SUM(cost_usd),0) s FROM usage_events WHERE workspace_id=? AND ts>=?", + "SELECT COALESCE(SUM(cost_micros),0) s FROM usage_events WHERE workspace_id=? AND ts>=?", (workspace_id, floor), ).fetchone() - return float(row["s"]) + return db.micros_to_usd(int(row["s"])) def org_spend_windows(conn, org_id: str, now: Optional[float] = None) -> dict: @@ -258,12 +258,12 @@ def org_spend_windows(conn, org_id: str, now: Optional[float] = None) -> dict: out = {} for name, floor in windows.items(): row = conn.execute( - "SELECT COALESCE(SUM(cost_usd),0) cost, COALESCE(SUM(input_tokens+output_tokens" + "SELECT COALESCE(SUM(cost_micros),0) cost, COALESCE(SUM(input_tokens+output_tokens" "+cache_read_tokens+reasoning_tokens),0) tok, COUNT(*) n " "FROM usage_events WHERE org_id=? AND ts>=?", (org_id, floor), ).fetchone() - out[name] = {"cost": float(row["cost"]), "tokens": int(row["tok"]), + out[name] = {"cost": db.micros_to_usd(int(row["cost"])), "tokens": int(row["tok"]), "events": int(row["n"])} return out @@ -280,13 +280,13 @@ def spend_by(conn, org_id: str, dimension: str, since: float = 0, }[dimension] join = "LEFT JOIN workspaces w ON w.id = ue.workspace_id" if dimension == "workspace" else "" rows = conn.execute( - f"SELECT {col} AS k, COALESCE(SUM(ue.cost_usd),0) cost, " + f"SELECT {col} AS k, COALESCE(SUM(ue.cost_micros),0) cost, " f"COALESCE(SUM(ue.input_tokens+ue.output_tokens+ue.cache_read_tokens+ue.reasoning_tokens),0) tok, " f"COUNT(*) n FROM usage_events ue {join} " f"WHERE ue.org_id=? AND ue.ts>=? GROUP BY k ORDER BY cost DESC", (org_id, since), ).fetchall() - return [{"key": r["k"], "cost": float(r["cost"]), "tokens": int(r["tok"]), + return [{"key": r["k"], "cost": db.micros_to_usd(int(r["cost"])), "tokens": int(r["tok"]), "events": int(r["n"])} for r in rows] @@ -299,8 +299,8 @@ def provider_health(conn, org_id: str, now: Optional[float] = None) -> list[dict now = now if now is not None else time.time() rows = conn.execute( "SELECT provider, MAX(ts) last_ts, " - "COALESCE(SUM(CASE WHEN ts>=? THEN cost_usd ELSE 0 END),0) c7, " - "COALESCE(SUM(cost_usd),0) all_cost, COUNT(*) n " + "COALESCE(SUM(CASE WHEN ts>=? THEN cost_micros ELSE 0 END),0) c7, " + "COALESCE(SUM(cost_micros),0) all_cost, COUNT(*) n " "FROM usage_events WHERE org_id=? GROUP BY provider ORDER BY all_cost DESC", (now - 7 * DAY, org_id), ).fetchall() @@ -311,8 +311,8 @@ def provider_health(conn, org_id: str, now: Optional[float] = None) -> list[dict out.append({ "provider": r["provider"], "last_ts": r["last_ts"], - "burn_per_day": round(float(r["c7"]) / 7.0, 4), - "all_cost": float(r["all_cost"]), + "burn_per_day": round(db.micros_to_usd(int(r["c7"])) / 7.0, 4), + "all_cost": db.micros_to_usd(int(r["all_cost"])), "events": int(r["n"]), "status": status, }) @@ -374,7 +374,12 @@ def recent_events(conn, org_id: str, limit: int = 25) -> list[dict]: "WHERE ue.org_id=? ORDER BY ue.ts DESC LIMIT ?", (org_id, limit), ).fetchall() - return [dict(r) for r in rows] + out = [] + for r in rows: + d = dict(r) + d["cost_usd"] = db.micros_to_usd(int(d.get("cost_micros", 0) or 0)) + out.append(d) + return out def org_summary(conn, org_id: str, now: Optional[float] = None) -> dict: diff --git a/plutus_agent/reports.py b/plutus_agent/reports.py index 24b7e9d..9d36ab5 100644 --- a/plutus_agent/reports.py +++ b/plutus_agent/reports.py @@ -38,18 +38,18 @@ def agg(dimension): rows = conn.execute( _SPEND_SQL[dimension], (org_id, start, end) ).fetchall() - return [{"key": r["k"], "cost": float(r["cost"]), + return [{"key": r["k"], "cost": db.micros_to_usd(int(r["cost"])), "tokens": int(r["tok"]), "events": int(r["n"])} for r in rows] total = conn.execute( - "SELECT COALESCE(SUM(cost_usd),0) cost, " + "SELECT COALESCE(SUM(cost_micros),0) cost, " "COALESCE(SUM(input_tokens+output_tokens+cache_read_tokens+reasoning_tokens),0) tok, " "COUNT(*) n FROM usage_events WHERE org_id=? AND ts>=? AND ts=? AND ts=? AND ts=? AND ts=? AND ts=? AND ue.tsmicros conversion round-trips and rounds correctly. + 2. Money is stored as INTEGER micros and sums exactly (no float drift) over a + large ledger -- the whole reason for the migration. + 3. A legacy v3 database (REAL USD columns) migrates idempotently to v4 micros + with values preserved. +""" +import os +import sqlite3 +import sys +import tempfile +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from plutus_agent import db, metering + + +def _fresh(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + conn = db.connect(path) + db.init_schema(conn) + return conn, path + + +class TestConversion(unittest.TestCase): + def test_round_trip(self): + for usd in (0.0, 0.000001, 0.01, 1.0, 3.14159, 12345.678901, 0.142): + self.assertEqual(db.micros_to_usd(db.usd_to_micros(usd)), usd) + + def test_rounding_to_nearest_micro(self): + # 0.0000005 -> rounds to nearest micro; sub-micro precision is dropped. + self.assertEqual(db.usd_to_micros(0.0000004), 0) + self.assertEqual(db.usd_to_micros(0.0000006), 1) + self.assertEqual(db.usd_to_micros(1.2345678), 1234568) # 7th decimal rounds + + def test_none_passthrough(self): + self.assertIsNone(db.usd_to_micros(None)) + self.assertIsNone(db.micros_to_usd(None)) + + def test_constant(self): + self.assertEqual(db.MICROS_PER_USD, 1_000_000) + + +class TestExactSummation(unittest.TestCase): + """A float REAL ledger drifts when summing many sub-cent rows; integer + micros must sum exactly.""" + + def test_no_drift_over_many_small_debits(self): + conn, path = _fresh() + try: + org = db.create_org(conn, "Acme")["id"] + db.add_ledger(conn, org, 100.0, "topup") + # 10,000 debits of $0.001 each = exactly $10.00. + for _ in range(10_000): + db.add_ledger(conn, org, -0.001, "debit", commit=False) + conn.commit() + bal = db.get_balance(conn, org) + self.assertEqual(bal, 90.0) # exact, no 89.9999... + self.assertEqual(db.get_balance_micros(conn, org), 90_000_000) + finally: + conn.close() + os.unlink(path) + + def test_money_columns_are_integer(self): + conn, path = _fresh() + try: + cols = {r["name"]: r["type"] + for r in conn.execute("PRAGMA table_info(credit_ledger)")} + self.assertEqual(cols["delta_micros"], "INTEGER") + self.assertEqual(cols["balance_after_micros"], "INTEGER") + self.assertNotIn("delta_usd", cols) + ucols = {r["name"]: r["type"] + for r in conn.execute("PRAGMA table_info(usage_events)")} + self.assertEqual(ucols["cost_micros"], "INTEGER") + self.assertNotIn("cost_usd", ucols) + finally: + conn.close() + os.unlink(path) + + def test_metering_stores_micros(self): + conn, path = _fresh() + try: + org = db.create_org(conn, "Acme")["id"] + metering.record_usage(conn, org, provider="anthropic", + input_tokens=1000, output_tokens=500, + cost_usd=0.123456) + row = conn.execute("SELECT cost_micros FROM usage_events").fetchone() + self.assertEqual(row["cost_micros"], 123456) + self.assertIsInstance(row["cost_micros"], int) + finally: + conn.close() + os.unlink(path) + + +# Legacy v3 schema (REAL money columns) for the migration test. +_V3_SCHEMA = """ +CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT, slug TEXT, + tier TEXT DEFAULT 'free', stripe_customer_id TEXT, created_at REAL); +CREATE TABLE workspaces (id TEXT PRIMARY KEY, org_id TEXT, name TEXT, slug TEXT, + monthly_budget_usd REAL, created_at REAL); +CREATE TABLE usage_events (id TEXT PRIMARY KEY, org_id TEXT, workspace_id TEXT, + provider TEXT, model TEXT, task_type TEXT, input_tokens INTEGER, + output_tokens INTEGER, cache_read_tokens INTEGER, reasoning_tokens INTEGER, + cost_usd REAL NOT NULL DEFAULT 0, estimated INTEGER, source TEXT, ts REAL); +CREATE TABLE credit_ledger (id TEXT PRIMARY KEY, org_id TEXT, delta_usd REAL, + kind TEXT, reason TEXT, stripe_ref TEXT, balance_after REAL, ts REAL); +CREATE TABLE meta (key TEXT PRIMARY KEY, value TEXT); +""" + + +class TestMigration(unittest.TestCase): + def _make_v3(self): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + conn = sqlite3.connect(path) + conn.row_factory = sqlite3.Row + conn.executescript(_V3_SCHEMA) + conn.execute("INSERT INTO meta(key,value) VALUES('schema_version','3')") + conn.execute("INSERT INTO organizations(id,name,slug,created_at) VALUES('org_1','Acme','acme',0)") + conn.execute("INSERT INTO workspaces(id,org_id,name,slug,monthly_budget_usd,created_at)" + " VALUES('ws_1','org_1','Main','main',50.0,0)") + conn.execute("INSERT INTO usage_events(id,org_id,cost_usd,ts) VALUES('e1','org_1',0.123456,1)") + conn.execute("INSERT INTO credit_ledger(id,org_id,delta_usd,kind,balance_after,ts)" + " VALUES('l1','org_1',100.0,'topup',100.0,1)") + conn.execute("INSERT INTO credit_ledger(id,org_id,delta_usd,kind,balance_after,ts)" + " VALUES('l2','org_1',-0.50,'debit',99.5,2)") + conn.commit() + conn.close() + return path + + def test_v3_migrates_to_micros(self): + path = self._make_v3() + try: + conn = db.connect(path) + db.init_schema(conn) # triggers _migrate_money_to_micros + # Values converted exactly. + self.assertEqual( + conn.execute("SELECT cost_micros FROM usage_events").fetchone()["cost_micros"], + 123456) + self.assertEqual(db.get_balance(conn, "org_1"), 99.5) + self.assertEqual(db.get_balance_micros(conn, "org_1"), 99_500_000) + ws = db.get_workspace(conn, "ws_1") + self.assertEqual(ws["monthly_budget_usd"], 50.0) + self.assertEqual(ws["monthly_budget_micros"], 50_000_000) + self.assertEqual( + conn.execute("SELECT value FROM meta WHERE key='schema_version'") + .fetchone()["value"], str(db.SCHEMA_VERSION)) + conn.close() + finally: + os.unlink(path) + + def test_migration_is_idempotent(self): + path = self._make_v3() + try: + conn = db.connect(path) + db.init_schema(conn) + db.init_schema(conn) # second run must not double-convert or error + db.init_schema(conn) + self.assertEqual(db.get_balance_micros(conn, "org_1"), 99_500_000) + conn.close() + finally: + os.unlink(path) + + +if __name__ == "__main__": + unittest.main()