Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 126 additions & 28 deletions sdk/src/sdk/postcall_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Any, Literal

import google.genai as genai
from google.genai import types as genai_types
Expand Down Expand Up @@ -177,22 +177,32 @@ def _validate_memory(raw: Any) -> ExtractedMemory | None:
"""Coerce a raw dict from Gemini into a clean :class:`ExtractedMemory`,
or return None if the shape is bad enough to drop.

We're forgiving: missing fields fill with sensible defaults. The
only hard reject is missing ``content`` (the row would be empty).
Forgiving: missing fields fill with sensible defaults. Non-string
values for the string fields (``content``, ``summary``, ``category``)
are rejected explicitly so a payload like ``{"content": 123}``
drops the row cleanly instead of raising ``AttributeError`` on
``.strip()`` — that crash used to propagate up through the
validation loop and abort the whole extraction, an unclassified
failure that bypassed the typed-status surface.
"""
if not isinstance(raw, dict):
return None
content = (raw.get("content") or "").strip()
raw_content = raw.get("content")
if not isinstance(raw_content, str):
return None
content = raw_content.strip()
if not content:
return None
summary = (raw.get("summary") or content[:80]).strip()
raw_summary = raw.get("summary")
summary = (raw_summary if isinstance(raw_summary, str) else content[:80]).strip()
raw_topics = raw.get("topics") or []
topics: list[str] = []
if isinstance(raw_topics, list):
for t in raw_topics[:5]:
if isinstance(t, str) and t.strip():
topics.append(t.strip().lower())
category = (raw.get("category") or "general").strip().lower()
raw_category = raw.get("category")
category = (raw_category if isinstance(raw_category, str) else "general").strip().lower()
if category not in CATEGORIES:
category = "general"
return ExtractedMemory(
Expand All @@ -203,20 +213,87 @@ def _validate_memory(raw: Any) -> ExtractedMemory | None:
)


async def _extract_memories(transcript: str) -> list[ExtractedMemory]:
# Status hints emitted by `_extract_memories` so the caller's
# completion log distinguishes "Gemini auth broke" from "transcript was
# genuinely empty" — the conflation that hid the 2026-05-15 voice-path
# silent-loss for three days (see openclaw-livekit#29).
#
# `extracted` = memories list non-empty, capture step runs next.
# `empty_extraction` = valid transcript reached Gemini, Gemini returned
# no memory objects. Genuinely uneventful call.
# `no_transcript_text` = transcript was whitespace-only (handled here;
# `run_extraction` separately handles the "no transcript file" case
# as `no_transcript`).
# `no_api_key` = GEMINI_API_KEY / GOOGLE_API_KEY unset.
# `auth_failed` = Gemini returned 401 / 403 / UNAUTHENTICATED. The
# typical credential-rotation-not-propagated failure (see
# wiki/gotchas/openclaw-livekit-deploy-traps §1).
# `transport_failed` = catch-all non-auth Gemini call failure (network
# error, timeout, connection refused, unexpected SDK-side exception).
# The provider may or may not have been reached — distinguishing them
# requires more SDK-specific exception introspection than is worth
# the complexity for the operator-side signal. Treat as "something
# went wrong outside the auth check; look at the ERROR log line above
# the completion line for specifics."
# `parse_failed` = Gemini returned non-JSON or non-conformant JSON
# (no `memories` array, wrong shape).
ExtractionStatus = Literal[
"extracted",
"empty_extraction",
"no_transcript_text",
"no_api_key",
"auth_failed",
"transport_failed",
"parse_failed",
]


@dataclass(frozen=True)
class ExtractionResult:
"""Outcome of one Gemini extraction call.

``memories`` is empty for every status except ``extracted``. The
``status`` field carries the cause when ``memories`` is empty so
the completion-log line can distinguish auth failure from
genuine no-extraction (the silent-loss class fixed in
openclaw-livekit#29).
"""

memories: list[ExtractedMemory]
status: ExtractionStatus


def _classify_gemini_exception(exc: BaseException) -> ExtractionStatus:
"""Map a Gemini SDK exception to an `ExtractionStatus`.

The google-genai SDK doesn't expose typed auth/transport exceptions
we can isinstance-match cleanly. Falls back to substring matching
on the formatted message — the same shape the production 2026-05-15
incident surfaced (``"401 UNAUTHENTICATED"`` substring). Order
matters: auth check before transport so a 401 reaching us via a
transport-shaped exception still classifies correctly.
"""
msg = str(exc)
if "401" in msg or "403" in msg or "UNAUTHENTICATED" in msg or "PERMISSION_DENIED" in msg:
return "auth_failed"
return "transport_failed"


async def _extract_memories(transcript: str) -> ExtractionResult:
"""Send the transcript to Gemini Flash and parse the JSON response.

Returns ``[]`` on any failure: missing API key, network error,
malformed JSON, schema-violating response. The caller treats an
empty list as "no extraction happened" — explicit saves from the
call are still in Musubi.
Returns an :class:`ExtractionResult` whose ``status`` field
distinguishes the failure modes that used to collapse to ``[]``:
auth failure, transport failure, parse failure, missing API key,
or genuinely empty extraction. The caller uses the status hint
on the completion log line.
"""
if not transcript.strip():
return []
return ExtractionResult(memories=[], status="no_transcript_text")
api_key = _gemini_api_key()
if not api_key:
logger.warning("postcall_memory: no Gemini API key, skipping extraction")
return []
return ExtractionResult(memories=[], status="no_api_key")

client = genai.Client(api_key=api_key)

Expand All @@ -238,24 +315,29 @@ def _call() -> str:
raw_text = await asyncio.to_thread(_call)
except Exception as exc:
logger.error("postcall_memory: Gemini call failed: %s", exc)
return []
return ExtractionResult(memories=[], status=_classify_gemini_exception(exc))

try:
data = json.loads(raw_text)
except json.JSONDecodeError as exc:
logger.error("postcall_memory: malformed JSON from Gemini: %s", exc)
return []
return ExtractionResult(memories=[], status="parse_failed")

raw_memories = data.get("memories") if isinstance(data, dict) else None
if not isinstance(raw_memories, list):
return []
return ExtractionResult(memories=[], status="parse_failed")

out: list[ExtractedMemory] = []
for r in raw_memories:
m = _validate_memory(r)
if m is not None:
out.append(m)
return out
if not out:
# Gemini reached + parsed but produced 0 valid memories. This is
# the genuine "uneventful call" path, distinct from auth/parse
# failures above.
return ExtractionResult(memories=[], status="empty_extraction")
return ExtractionResult(memories=out, status="extracted")
Comment thread
ericmey marked this conversation as resolved.


# --- capture ----------------------------------------------------------------
Expand Down Expand Up @@ -328,8 +410,26 @@ async def run_extraction(

def _complete(status: str, *, extracted: int = 0, captured: int = 0) -> int:
"""Single completion log line so audit/Rin can grep one shape.
Status is one of: ``no_transcript``, ``empty_extraction``,
``captured``, ``no_captures``."""

Status is one of:
- ``no_transcript`` (transcript file missing / unreadable)
- ``no_transcript_text`` (file present, body whitespace-only)
- ``no_api_key`` (GEMINI_API_KEY/GOOGLE_API_KEY unset)
- ``auth_failed`` (Gemini 401/403 — most often a stale key
per wiki/gotchas/openclaw-livekit-deploy-traps §1)
- ``transport_failed`` (Gemini network/timeout error)
- ``parse_failed`` (Gemini returned non-JSON or wrong shape)
- ``empty_extraction`` (Gemini reached + parsed but produced
zero memories — genuinely uneventful call)
- ``captured`` (memories extracted AND at least one capture
succeeded)
- ``no_captures`` (memories extracted but every capture failed)

Pre-openclaw-livekit#29, ``auth_failed`` / ``transport_failed`` /
``parse_failed`` / ``empty_extraction`` all logged as
``empty_extraction`` — silently hid the 2026-05-15 voice path
breakage for three days.
"""
total_ms = int((time.monotonic() - started) * 1000)
logger.info(
"postcall_memory: completed call_sid=%s status=%s extracted=%d captured=%d total_ms=%d",
Expand All @@ -349,20 +449,18 @@ def _complete(status: str, *, extracted: int = 0, captured: int = 0) -> int:
if transcript is None:
return _complete("no_transcript")

memories = await _extract_memories(transcript)
if not memories:
# Could be: Gemini errored, malformed JSON, empty transcript, or
# genuinely nothing extractable. The error-level log lines from
# _extract_memories distinguish them in the upstream log; here we
# just record the outcome.
return _complete("empty_extraction")
result = await _extract_memories(transcript)
if result.status != "extracted":
# Propagate the typed status from _extract_memories — distinguishes
# auth/transport/parse failures from genuine empty extraction.
return _complete(result.status)

if client is None:
cfg = MusubiV2ClientConfig.from_env()
client = MusubiV2Client(cfg)

captured = 0
for memory in memories:
for memory in result.memories:
ok = await _capture_one(
client=client,
namespace=namespace,
Expand All @@ -374,7 +472,7 @@ def _complete(status: str, *, extracted: int = 0, captured: int = 0) -> int:
captured += 1

status = "captured" if captured > 0 else "no_captures"
return _complete(status, extracted=len(memories), captured=captured)
return _complete(status, extracted=len(result.memories), captured=captured)


# --- wiring -----------------------------------------------------------------
Expand Down
Loading