Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,44 @@ description is the shared trunk, and three sibling *bypasses* project it into:
The three bypasses are siblings — none is upstream of another; each is a
different aggregation of the same descriptions.

### Synthesis mode (optional)

By default `MEMORY.md` and the `skill/` tree are rendered deterministically from
already-extracted records. When `memory_files_config.synthesize=True`, they are
instead synthesized directly from the per-source descriptions by an LLM
(`memu.memory_fs.MemorySynthesizer`, prompts in `memu.prompts.memory_fs`):

- `MEMORY.md`: one LLM pass turns all descriptions into a consolidated memory doc.
- `skill/<name>/SKILL.md`: one LLM pass extracts skills as a JSON array of
`{name, body}` objects, each written as its own skill doc.

`INDEX.md` stays deterministic in both modes. Synthesis uses the
`synthesis_llm_profile` profile and leaves the existing memorize/extract pipeline
untouched.

### Initialize vs. incremental update

Synthesis is stateful and mirrors the "submit the changed part of the file system"
model. `MemoryService._build_memory_files(where, changed=...)` decides between two
paths:

- **Initialization** (no prior tree on disk, or `changed is None`): scan all
in-scope sources, turn each into its multimodal description, and synthesize
`MEMORY.md` + the `skill/` tree from scratch (`MemorySynthesizer.synthesize`).
- **Incremental update** (a tree already exists and a changed set is supplied):
read the existing `MEMORY.md` body and existing skill bodies back off disk and
merge only the changed sources' descriptions into them
(`MemorySynthesizer.update`, prompts `MEMORY_UPDATE_PROMPT` / `SKILL_UPDATE_PROMPT`).
Skills are upserted by slug, so untouched skills survive.

`INDEX.md` is always recomputed from the current source set, so it needs no LLM
merge. `export_memory_files(user=...)` always takes the initialization path (full
rebuild). When `memory_files_config.update_on_memorize=True`, each `memorize()`
call drives this builder with its just-created resources as the changed set, so the
tree initializes on first run and incrementally updates afterwards. The hook is
best-effort: an export failure is logged and never fails memorize, since the
structured memory is already persisted.

The exporter is read-only against the database and disabled by default
(`memory_files_config.enabled`). Diff detection is handled by a sidecar manifest
(`.memufs_manifest.json`) that stores per-file content hashes, so each export
Expand Down
27 changes: 27 additions & 0 deletions src/memu/app/memorize.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class MemorizeMixin:
_extract_json_blob: Callable[[str], str]
_escape_prompt_value: Callable[[str], str]
user_model: type[BaseModel]
memory_files_config: Any
_build_memory_files: Callable[..., Awaitable[dict[str, Any]]]

async def memorize(
self,
Expand Down Expand Up @@ -92,8 +94,33 @@ async def memorize(
if response is None:
msg = "Memorize workflow failed to produce a response"
raise RuntimeError(msg)

await self._maybe_update_memory_files(result, user_scope)
return response

async def _maybe_update_memory_files(
self, result: WorkflowState, user_scope: dict[str, Any] | None
) -> None:
"""Drive the memory file tree from a memorize call (init or incremental update).

Gated behind ``memory_files_config.enabled`` and ``update_on_memorize`` so the
default memorize behavior is unchanged. The just-created resources are the
"changed part of the file system" that an existing tree is updated against;
if no tree exists yet, ``_build_memory_files`` initializes it from the full
scoped store instead. Failures are best-effort: the memory is already
persisted, so an export error must not fail memorize.
"""
cfg = self.memory_files_config
if not (getattr(cfg, "enabled", False) and getattr(cfg, "update_on_memorize", False)):
return
changed = cast("list[Resource]", result.get("resources") or [])
if not changed:
return
try:
await self._build_memory_files(user_scope, changed=changed)
except Exception:
logger.exception("Memory file export failed after memorize")

def _build_memorize_workflow(self) -> list[WorkflowStep]:
steps = [
WorkflowStep(
Expand Down
44 changes: 43 additions & 1 deletion src/memu/app/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
LLMInterceptorHandle,
LLMInterceptorRegistry,
)
from memu.memory_fs import ExportResult, MemoryFileExporter
from memu.memory_fs import ExportResult, MemoryFileExporter, MemorySynthesizer
from memu.workflow.interceptor import WorkflowInterceptorHandle, WorkflowInterceptorRegistry
from memu.workflow.pipeline import PipelineManager
from memu.workflow.runner import WorkflowRunner, resolve_workflow_runner
Expand Down Expand Up @@ -96,6 +96,7 @@ def __init__(
# Writes are serialized through a single lock so concurrent exports never
# interleave on the shared output directory.
self._memory_file_exporter = MemoryFileExporter(self.memory_files_config.output_dir)
self._memory_synthesizer = MemorySynthesizer()
self._memory_files_lock = asyncio.Lock()

self._pipelines = PipelineManager(
Expand Down Expand Up @@ -382,11 +383,52 @@ async def export_memory_files(self, *, user: dict[str, Any] | None = None) -> di
msg = "Memory files are disabled; set memory_files_config.enabled=True to use export_memory_files()."
raise RuntimeError(msg)
where = self.user_model(**user).model_dump() if user is not None else None
# No changed set => full (re)initialization of the tree.
return await self._build_memory_files(where, changed=None)

async def _build_memory_files(
self,
where: dict[str, Any] | None,
*,
changed: list[Any] | None,
) -> dict[str, Any]:
"""Initialize or incrementally update the memory file tree.

``changed`` is the list of just-memorized ``Resource`` objects driving an
incremental update. When it is ``None`` (or no prior tree exists), the tree
is (re)initialized from the full scoped store.
"""
memory_body: str | None = None
skills: dict[str, str] | None = None

if self.memory_files_config.synthesize:
client = self._get_llm_client(self.memory_files_config.synthesis_llm_profile)
if changed is not None and self._memory_file_exporter.artifacts_exist():
# UPDATE: merge the changed descriptions into existing artifacts.
existing_memory = await asyncio.to_thread(self._memory_file_exporter.read_memory_body)
existing_skills = await asyncio.to_thread(self._memory_file_exporter.read_skills)
synthesized = await self._memory_synthesizer.update(
MemoryFileExporter._build_descriptions(changed),
existing_memory=existing_memory,
existing_skills=existing_skills,
chat=client.chat,
)
else:
# INIT: build from scratch over all in-scope descriptions.
resources = list(self.database.resource_repo.list_resources(where=where or None).values())
synthesized = await self._memory_synthesizer.synthesize(
MemoryFileExporter._build_descriptions(resources),
chat=client.chat,
)
memory_body, skills = synthesized.memory_body, synthesized.skills

async with self._memory_files_lock:
result: ExportResult = await asyncio.to_thread(
self._memory_file_exporter.export,
self.database,
where=where,
memory_body=memory_body,
skills=skills,
)
return result.to_dict()

Expand Down
18 changes: 18 additions & 0 deletions src/memu/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,24 @@ class MemoryFilesConfig(BaseModel):
default="./data/memory",
description="Directory where the memory markdown tree (INDEX.md/MEMORY.md/skill/) is written.",
)
synthesize: bool = Field(
default=False,
description=(
"Synthesize MEMORY.md and skill docs from per-source descriptions via the LLM "
"instead of rendering already-extracted records. INDEX.md stays deterministic."
),
)
synthesis_llm_profile: str = Field(
default="default",
description="LLM profile used when synthesize=True.",
)
update_on_memorize: bool = Field(
default=False,
description=(
"Automatically initialize or incrementally update the memory file tree after each "
"memorize() call, using the just-created resources as the changed file set."
),
)


class RetrieveCategoryConfig(BaseModel):
Expand Down
10 changes: 9 additions & 1 deletion src/memu/memory_fs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,13 @@
"""

from memu.memory_fs.exporter import ExportResult, FileDescription, MemoryFileExporter, slugify
from memu.memory_fs.synthesizer import MemorySynthesizer, SynthesisResult

__all__ = ["ExportResult", "FileDescription", "MemoryFileExporter", "slugify"]
__all__ = [
"ExportResult",
"FileDescription",
"MemoryFileExporter",
"MemorySynthesizer",
"SynthesisResult",
"slugify",
]
96 changes: 80 additions & 16 deletions src/memu/memory_fs/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,21 @@ class MemoryFileExporter:
def __init__(self, output_dir: str) -> None:
self.output_dir = pathlib.Path(output_dir)

def export(self, database: Database, *, where: Mapping[str, Any] | None = None) -> ExportResult:
"""Render the (optionally scoped) store and write only changed artifacts."""
def export(
self,
database: Database,
*,
where: Mapping[str, Any] | None = None,
memory_body: str | None = None,
skills: dict[str, str] | None = None,
) -> ExportResult:
"""Render the (optionally scoped) store and write only changed artifacts.

``memory_body`` and ``skills`` are optional synthesized overrides (e.g.
produced by :class:`~memu.memory_fs.synthesizer.MemorySynthesizer`). When
provided they replace the deterministic, database-derived rendering of
``MEMORY.md`` / the ``skill/`` tree; ``INDEX.md`` is always deterministic.
"""
self.output_dir.mkdir(parents=True, exist_ok=True)
scope = dict(where) if where else None

Expand All @@ -119,12 +132,20 @@ def export(self, database: Database, *, where: Mapping[str, Any] | None = None)
# The shared trunk: one multimodal description per source file.
descriptions = self._build_descriptions(resources)

skill_artifacts = self._skill_bypass(items)
if skills is not None:
skill_artifacts = {
f"{SKILL_DIRNAME}/{slug}/{SKILL_FILENAME}": self._skill_document(body)
for slug, body in skills.items()
}
else:
skill_artifacts = self._skill_bypass(items)
skill_slugs = sorted(rel.split("/")[1] for rel in skill_artifacts)

body = memory_body if memory_body is not None else self._memory_body(categories)

artifacts: dict[str, str] = {}
artifacts.update(skill_artifacts)
artifacts[MEMORY_FILENAME] = self._memory_bypass(categories)
artifacts[MEMORY_FILENAME] = self._memory_document(body)
artifacts[INDEX_FILENAME] = self._index_bypass(categories, descriptions, skill_slugs, items, database=database)

return self._sync(artifacts)
Expand Down Expand Up @@ -162,9 +183,13 @@ def _skill_bypass(self, items: list[MemoryItem]) -> dict[str, str]:
slug = base if count == 0 else f"{base}-{count + 1}"
used[base] = count + 1
rel_path = f"{SKILL_DIRNAME}/{slug}/{SKILL_FILENAME}"
artifacts[rel_path] = f"{_GENERATED_NOTICE}\n\n{body}\n"
artifacts[rel_path] = self._skill_document(body)
return artifacts

@staticmethod
def _skill_document(body: str) -> str:
return f"{_GENERATED_NOTICE}\n\n{body.strip()}\n"

@staticmethod
def _skill_name(body: str, *, fallback: str) -> str:
"""Derive a skill folder slug from frontmatter ``name:`` or first heading."""
Expand All @@ -181,22 +206,26 @@ def _skill_name(body: str, *, fallback: str) -> str:

# -- bypass: MEMORY ----------------------------------------------------

def _memory_bypass(self, categories: list[MemoryCategory]) -> str:
"""The living memory: folder (category) summaries aggregated into one file."""
lines = ["# Memory", "", _GENERATED_NOTICE, ""]
@staticmethod
def _memory_document(body: str) -> str:
body = body.strip() or "_No memory yet._"
return f"# Memory\n\n{_GENERATED_NOTICE}\n\n{body}\n"

def _memory_body(self, categories: list[MemoryCategory]) -> str:
"""The living memory body: folder (category) summaries aggregated."""
if not categories:
lines += ["_No memory yet._", ""]
return "\n".join(lines)
return ""
sections: list[str] = []
for category in sorted(categories, key=lambda c: (c.name.lower(), c.id)):
description = self._inline((category.description or "").strip())
summary = (category.summary or "").strip()
lines.append(f"## {category.name}")
block = [f"## {category.name}"]
if description:
lines.append(f"_{description}_")
lines.append("")
lines.append(summary or "_No summary yet._")
lines.append("")
return "\n".join(lines)
block.append(f"_{description}_")
block.append("")
block.append(summary or "_No summary yet._")
sections.append("\n".join(block))
return "\n\n".join(sections)

# -- bypass: INDEX -----------------------------------------------------

Expand Down Expand Up @@ -305,6 +334,41 @@ def _prune_empty_dirs(self, directory: pathlib.Path) -> None:
else:
break

# -- reading existing artifacts (for incremental update) ---------------

def artifacts_exist(self) -> bool:
"""Whether a prior memory tree is already present (init vs update)."""
return (self.output_dir / MEMORY_FILENAME).exists()

def read_memory_body(self) -> str:
"""Read MEMORY.md and strip the heading/notice, returning just the body."""
path = self.output_dir / MEMORY_FILENAME
if not path.exists():
return ""
return self._strip_chrome(path.read_text(encoding="utf-8"), drop_heading="# Memory")

def read_skills(self) -> dict[str, str]:
"""Read existing ``skill/<slug>/SKILL.md`` bodies keyed by slug."""
skills: dict[str, str] = {}
skill_root = self.output_dir / SKILL_DIRNAME
if not skill_root.is_dir():
return skills
for child in sorted(skill_root.iterdir()):
doc = child / SKILL_FILENAME
if child.is_dir() and doc.exists():
skills[child.name] = self._strip_chrome(doc.read_text(encoding="utf-8"))
return skills

@staticmethod
def _strip_chrome(text: str, *, drop_heading: str | None = None) -> str:
lines = text.splitlines()
kept = [
line
for line in lines
if line.strip() != _GENERATED_NOTICE and (drop_heading is None or line.strip() != drop_heading)
]
return "\n".join(kept).strip()

def _load_manifest(self) -> dict[str, str]:
manifest_path = self.output_dir / MANIFEST_NAME
if not manifest_path.exists():
Expand Down
Loading
Loading