From b60d74e433a06c9e39ca918e9a0b249782ac7be1 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 8 Apr 2026 00:09:52 +0800 Subject: [PATCH 1/7] refactor: rename directory to location, move max_checkpoints to providers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename CheckpointConfig.directory to location — the field means a directory for JsonProvider and a database file path for SqliteProvider, so the generic name fits both - Add max_checkpoints to BaseProvider protocol - Add max_checkpoints and pruning to JsonProvider, matching SqliteProvider - Remove prune logic from checkpoint_listener since providers own cleanup --- .../src/crewai/state/checkpoint_config.py | 9 ++-- .../src/crewai/state/checkpoint_listener.py | 24 ++--------- lib/crewai/src/crewai/state/provider/core.py | 21 +++++++--- .../crewai/state/provider/json_provider.py | 32 +++++++++++---- .../crewai/state/provider/sqlite_provider.py | 41 ++++++++----------- lib/crewai/src/crewai/state/runtime.py | 16 ++++---- 6 files changed, 74 insertions(+), 69 deletions(-) diff --git a/lib/crewai/src/crewai/state/checkpoint_config.py b/lib/crewai/src/crewai/state/checkpoint_config.py index 4c60fd35c0..4c5499ff45 100644 --- a/lib/crewai/src/crewai/state/checkpoint_config.py +++ b/lib/crewai/src/crewai/state/checkpoint_config.py @@ -165,9 +165,10 @@ class CheckpointConfig(BaseModel): automatically whenever the specified event(s) fire. """ - directory: str = Field( + location: str = Field( default="./.checkpoints", - description="Filesystem path where checkpoint JSON files are written.", + description="Storage destination. For JsonProvider this is a directory " + "path; for SqliteProvider it is a database file path.", ) on_events: list[CheckpointEventType | Literal["*"]] = Field( default=["task_completed"], @@ -180,8 +181,8 @@ class CheckpointConfig(BaseModel): ) max_checkpoints: int | None = Field( default=None, - description="Maximum checkpoint files to keep. Oldest are pruned first. " - "None means keep all.", + description="Maximum checkpoints to keep. Oldest are pruned after " + "each write. None means keep all.", ) @property diff --git a/lib/crewai/src/crewai/state/checkpoint_listener.py b/lib/crewai/src/crewai/state/checkpoint_listener.py index cf5b39b2bf..6471b9bdeb 100644 --- a/lib/crewai/src/crewai/state/checkpoint_listener.py +++ b/lib/crewai/src/crewai/state/checkpoint_listener.py @@ -7,9 +7,7 @@ from __future__ import annotations -import glob import logging -import os import threading from typing import Any @@ -105,29 +103,13 @@ def _find_checkpoint(source: Any) -> CheckpointConfig | None: def _do_checkpoint(state: RuntimeState, cfg: CheckpointConfig) -> None: - """Write a checkpoint synchronously and optionally prune old files.""" + """Write a checkpoint and prune old ones if configured.""" _prepare_entities(state.root) data = state.model_dump_json() - cfg.provider.checkpoint(data, cfg.directory) + cfg.provider.checkpoint(data, cfg.location) if cfg.max_checkpoints is not None: - _prune(cfg.directory, cfg.max_checkpoints) - - -def _safe_remove(path: str) -> None: - try: - os.remove(path) - except OSError: - logger.debug("Failed to remove checkpoint file %s", path, exc_info=True) - - -def _prune(directory: str, max_keep: int) -> None: - """Remove oldest checkpoint files beyond *max_keep*.""" - pattern = os.path.join(directory, "*.json") - files = sorted(glob.glob(pattern), key=os.path.getmtime) - to_remove = files if max_keep == 0 else files[:-max_keep] - for path in to_remove: - _safe_remove(path) + cfg.provider.prune(cfg.location, cfg.max_checkpoints) def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None: diff --git a/lib/crewai/src/crewai/state/provider/core.py b/lib/crewai/src/crewai/state/provider/core.py index ee420eea0b..46f079444c 100644 --- a/lib/crewai/src/crewai/state/provider/core.py +++ b/lib/crewai/src/crewai/state/provider/core.py @@ -34,27 +34,36 @@ def _validate(v: Any) -> BaseProvider: ), ) - def checkpoint(self, data: str, directory: str) -> str: + def checkpoint(self, data: str, location: str) -> str: """Persist a snapshot synchronously. Args: data: The serialized string to persist. - directory: Logical destination: path, bucket prefix, etc. + location: Storage destination (directory, file path, URI, etc.). Returns: - A location identifier for the saved checkpoint, such as a file path or URI. + A location identifier for the saved checkpoint. """ ... - async def acheckpoint(self, data: str, directory: str) -> str: + async def acheckpoint(self, data: str, location: str) -> str: """Persist a snapshot asynchronously. Args: data: The serialized string to persist. - directory: Logical destination: path, bucket prefix, etc. + location: Storage destination (directory, file path, URI, etc.). Returns: - A location identifier for the saved checkpoint, such as a file path or URI. + A location identifier for the saved checkpoint. + """ + ... + + def prune(self, location: str, max_keep: int) -> None: + """Remove old checkpoints, keeping at most *max_keep*. + + Args: + location: The storage destination passed to ``checkpoint``. + max_keep: Maximum number of checkpoints to retain. """ ... diff --git a/lib/crewai/src/crewai/state/provider/json_provider.py b/lib/crewai/src/crewai/state/provider/json_provider.py index 656e19fe0f..d2ac75d9c2 100644 --- a/lib/crewai/src/crewai/state/provider/json_provider.py +++ b/lib/crewai/src/crewai/state/provider/json_provider.py @@ -3,6 +3,9 @@ from __future__ import annotations from datetime import datetime, timezone +import glob +import logging +import os from pathlib import Path import uuid @@ -12,43 +15,56 @@ from crewai.state.provider.core import BaseProvider +logger = logging.getLogger(__name__) + + class JsonProvider(BaseProvider): """Persists runtime state checkpoints as JSON files on the local filesystem.""" - def checkpoint(self, data: str, directory: str) -> str: - """Write a JSON checkpoint file to the directory. + def checkpoint(self, data: str, location: str) -> str: + """Write a JSON checkpoint file. Args: data: The serialized JSON string to persist. - directory: Filesystem path where the checkpoint will be saved. + location: Directory where the checkpoint will be saved. Returns: The path to the written checkpoint file. """ - file_path = _build_path(directory) + file_path = _build_path(location) file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, "w") as f: f.write(data) return str(file_path) - async def acheckpoint(self, data: str, directory: str) -> str: - """Write a JSON checkpoint file to the directory asynchronously. + async def acheckpoint(self, data: str, location: str) -> str: + """Write a JSON checkpoint file asynchronously. Args: data: The serialized JSON string to persist. - directory: Filesystem path where the checkpoint will be saved. + location: Directory where the checkpoint will be saved. Returns: The path to the written checkpoint file. """ - file_path = _build_path(directory) + file_path = _build_path(location) await aiofiles.os.makedirs(str(file_path.parent), exist_ok=True) async with aiofiles.open(file_path, "w") as f: await f.write(data) return str(file_path) + def prune(self, location: str, max_keep: int) -> None: + """Remove oldest checkpoint files beyond *max_keep*.""" + pattern = os.path.join(location, "*.json") + files = sorted(glob.glob(pattern), key=os.path.getmtime) + for path in files if max_keep == 0 else files[:-max_keep]: + try: + os.remove(path) + except OSError: # noqa: PERF203 + logger.debug("Failed to remove %s", path, exc_info=True) + def from_checkpoint(self, location: str) -> str: """Read a JSON checkpoint file. diff --git a/lib/crewai/src/crewai/state/provider/sqlite_provider.py b/lib/crewai/src/crewai/state/provider/sqlite_provider.py index 7a1d893997..ae014dda35 100644 --- a/lib/crewai/src/crewai/state/provider/sqlite_provider.py +++ b/lib/crewai/src/crewai/state/provider/sqlite_provider.py @@ -43,58 +43,53 @@ def _make_id() -> tuple[str, str]: class SqliteProvider(BaseProvider): """Persists runtime state checkpoints in a SQLite database. - The ``directory`` argument to ``checkpoint`` / ``acheckpoint`` is - used as the database path (e.g. ``"./.checkpoints.db"``). - - Args: - max_checkpoints: Maximum number of checkpoints to retain. - Oldest rows are pruned after each write. None keeps all. + The ``location`` argument to ``checkpoint`` / ``acheckpoint`` is + used as the database file path. """ - def __init__(self, max_checkpoints: int | None = None) -> None: - self.max_checkpoints = max_checkpoints - - def checkpoint(self, data: str, directory: str) -> str: + def checkpoint(self, data: str, location: str) -> str: """Write a checkpoint to the SQLite database. Args: data: The serialized JSON string to persist. - directory: Path to the SQLite database file. + location: Path to the SQLite database file. Returns: A location string in the format ``"db_path#checkpoint_id"``. """ checkpoint_id, ts = _make_id() - Path(directory).parent.mkdir(parents=True, exist_ok=True) - with sqlite3.connect(directory) as conn: + Path(location).parent.mkdir(parents=True, exist_ok=True) + with sqlite3.connect(location) as conn: conn.execute("PRAGMA journal_mode=WAL") conn.execute(_CREATE_TABLE) conn.execute(_INSERT, (checkpoint_id, ts, data)) - if self.max_checkpoints is not None: - conn.execute(_PRUNE, (self.max_checkpoints,)) conn.commit() - return f"{directory}#{checkpoint_id}" + return f"{location}#{checkpoint_id}" - async def acheckpoint(self, data: str, directory: str) -> str: + async def acheckpoint(self, data: str, location: str) -> str: """Write a checkpoint to the SQLite database asynchronously. Args: data: The serialized JSON string to persist. - directory: Path to the SQLite database file. + location: Path to the SQLite database file. Returns: A location string in the format ``"db_path#checkpoint_id"``. """ checkpoint_id, ts = _make_id() - Path(directory).parent.mkdir(parents=True, exist_ok=True) - async with aiosqlite.connect(directory) as db: + Path(location).parent.mkdir(parents=True, exist_ok=True) + async with aiosqlite.connect(location) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute(_CREATE_TABLE) await db.execute(_INSERT, (checkpoint_id, ts, data)) - if self.max_checkpoints is not None: - await db.execute(_PRUNE, (self.max_checkpoints,)) await db.commit() - return f"{directory}#{checkpoint_id}" + return f"{location}#{checkpoint_id}" + + def prune(self, location: str, max_keep: int) -> None: + """Remove oldest checkpoint rows beyond *max_keep*.""" + with sqlite3.connect(location) as conn: + conn.execute(_PRUNE, (max_keep,)) + conn.commit() def from_checkpoint(self, location: str) -> str: """Read a checkpoint from the SQLite database. diff --git a/lib/crewai/src/crewai/state/runtime.py b/lib/crewai/src/crewai/state/runtime.py index a5bb6bd8d4..6f1c5de805 100644 --- a/lib/crewai/src/crewai/state/runtime.py +++ b/lib/crewai/src/crewai/state/runtime.py @@ -90,29 +90,31 @@ def _deserialize( return state return handler(data) - def checkpoint(self, directory: str) -> str: - """Write a checkpoint file to the directory. + def checkpoint(self, location: str) -> str: + """Write a checkpoint. Args: - directory: Filesystem path where the checkpoint JSON will be saved. + location: Storage destination. For JsonProvider this is a directory + path; for SqliteProvider it is a database file path. Returns: A location identifier for the saved checkpoint. """ _prepare_entities(self.root) - return self._provider.checkpoint(self.model_dump_json(), directory) + return self._provider.checkpoint(self.model_dump_json(), location) - async def acheckpoint(self, directory: str) -> str: + async def acheckpoint(self, location: str) -> str: """Async version of :meth:`checkpoint`. Args: - directory: Filesystem path where the checkpoint JSON will be saved. + location: Storage destination. For JsonProvider this is a directory + path; for SqliteProvider it is a database file path. Returns: A location identifier for the saved checkpoint. """ _prepare_entities(self.root) - return await self._provider.acheckpoint(self.model_dump_json(), directory) + return await self._provider.acheckpoint(self.model_dump_json(), location) @classmethod def from_checkpoint( From 013f5b3d385996261c6ffc46b31113434dffcb0d Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 8 Apr 2026 00:53:17 +0800 Subject: [PATCH 2/7] docs: update checkpointing docs for location rename and prune changes --- docs/en/concepts/checkpointing.mdx | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/docs/en/concepts/checkpointing.mdx b/docs/en/concepts/checkpointing.mdx index dccdf1b1a2..21ed139052 100644 --- a/docs/en/concepts/checkpointing.mdx +++ b/docs/en/concepts/checkpointing.mdx @@ -39,7 +39,7 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", on_events=["task_completed", "crew_kickoff_completed"], max_checkpoints=5, ), @@ -50,10 +50,10 @@ crew = Crew( | Field | Type | Default | Description | |:------|:-----|:--------|:------------| -| `directory` | `str` | `"./.checkpoints"` | Filesystem path for checkpoint files | +| `location` | `str` | `"./.checkpoints"` | Storage destination — a directory for `JsonProvider`, a database file path for `SqliteProvider` | | `on_events` | `list[str]` | `["task_completed"]` | Event types that trigger a checkpoint | | `provider` | `BaseProvider` | `JsonProvider()` | Storage backend | -| `max_checkpoints` | `int \| None` | `None` | Max files to keep; oldest pruned first | +| `max_checkpoints` | `int \| None` | `None` | Max checkpoints to keep. Oldest are pruned after each write. Pruning is handled by the provider. | ### Inheritance and Opt-Out @@ -95,7 +95,7 @@ The restored crew skips already-completed tasks and resumes from the first incom crew = Crew( agents=[researcher, writer], tasks=[research_task, write_task, review_task], - checkpoint=CheckpointConfig(directory="./crew_cp"), + checkpoint=CheckpointConfig(location="./crew_cp"), ) ``` @@ -118,7 +118,7 @@ class MyFlow(Flow): flow = MyFlow( checkpoint=CheckpointConfig( - directory="./flow_cp", + location="./flow_cp", on_events=["method_execution_finished"], ), ) @@ -137,7 +137,7 @@ agent = Agent( goal="Research topics", backstory="Expert researcher", checkpoint=CheckpointConfig( - directory="./agent_cp", + location="./agent_cp", on_events=["lite_agent_execution_completed"], ), ) @@ -160,14 +160,14 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", provider=JsonProvider(), # this is the default max_checkpoints=5, # prunes oldest files ), ) ``` -Files are named `_.json` inside the directory. +Files are named `_.json` inside the location directory. ### SqliteProvider @@ -181,17 +181,14 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./.checkpoints.db", - provider=SqliteProvider(max_checkpoints=50), + location="./.checkpoints.db", + provider=SqliteProvider(), + max_checkpoints=50, ), ) ``` -`SqliteProvider` accepts its own `max_checkpoints` parameter that prunes old rows via SQL. WAL journal mode is enabled for concurrent read access. - - -When using `SqliteProvider`, the `directory` field is the database file path, not a directory. The `max_checkpoints` on `CheckpointConfig` controls filesystem pruning (for `JsonProvider`), while `SqliteProvider.max_checkpoints` controls row pruning in the database. - +WAL journal mode is enabled for concurrent read access. ## Event Types From a5116b0ce58ecc13522cf8523992c7ce19d9facb Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 8 Apr 2026 00:56:01 +0800 Subject: [PATCH 3/7] docs: update checkpointing translations for location rename --- docs/ar/concepts/checkpointing.mdx | 19 ++++++++----------- docs/ko/concepts/checkpointing.mdx | 19 ++++++++----------- docs/pt-BR/concepts/checkpointing.mdx | 19 ++++++++----------- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/docs/ar/concepts/checkpointing.mdx b/docs/ar/concepts/checkpointing.mdx index 4fa3665dd2..578f04be96 100644 --- a/docs/ar/concepts/checkpointing.mdx +++ b/docs/ar/concepts/checkpointing.mdx @@ -39,7 +39,7 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", on_events=["task_completed", "crew_kickoff_completed"], max_checkpoints=5, ), @@ -50,7 +50,7 @@ crew = Crew( | الحقل | النوع | الافتراضي | الوصف | |:------|:------|:----------|:------| -| `directory` | `str` | `"./.checkpoints"` | مسار ملفات نقاط الحفظ | +| `location` | `str` | `"./.checkpoints"` | مسار ملفات نقاط الحفظ | | `on_events` | `list[str]` | `["task_completed"]` | انواع الاحداث التي تطلق نقطة حفظ | | `provider` | `BaseProvider` | `JsonProvider()` | واجهة التخزين | | `max_checkpoints` | `int \| None` | `None` | الحد الاقصى للملفات؛ يتم حذف الاقدم اولا | @@ -95,7 +95,7 @@ result = crew.kickoff() # يستأنف من اخر مهمة مكتملة crew = Crew( agents=[researcher, writer], tasks=[research_task, write_task, review_task], - checkpoint=CheckpointConfig(directory="./crew_cp"), + checkpoint=CheckpointConfig(location="./crew_cp"), ) ``` @@ -118,7 +118,7 @@ class MyFlow(Flow): flow = MyFlow( checkpoint=CheckpointConfig( - directory="./flow_cp", + location="./flow_cp", on_events=["method_execution_finished"], ), ) @@ -137,7 +137,7 @@ agent = Agent( goal="Research topics", backstory="Expert researcher", checkpoint=CheckpointConfig( - directory="./agent_cp", + location="./agent_cp", on_events=["lite_agent_execution_completed"], ), ) @@ -160,7 +160,7 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", provider=JsonProvider(), max_checkpoints=5, ), @@ -179,15 +179,12 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./.checkpoints.db", - provider=SqliteProvider(max_checkpoints=50), + location="./.checkpoints.db", + provider=SqliteProvider(), ), ) ``` - -عند استخدام `SqliteProvider`، حقل `directory` هو مسار ملف قاعدة البيانات، وليس مجلدا. - ## انواع الاحداث diff --git a/docs/ko/concepts/checkpointing.mdx b/docs/ko/concepts/checkpointing.mdx index a08933faa4..643c6d9c12 100644 --- a/docs/ko/concepts/checkpointing.mdx +++ b/docs/ko/concepts/checkpointing.mdx @@ -39,7 +39,7 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", on_events=["task_completed", "crew_kickoff_completed"], max_checkpoints=5, ), @@ -50,7 +50,7 @@ crew = Crew( | 필드 | 타입 | 기본값 | 설명 | |:-----|:-----|:-------|:-----| -| `directory` | `str` | `"./.checkpoints"` | 체크포인트 파일 경로 | +| `location` | `str` | `"./.checkpoints"` | 체크포인트 파일 경로 | | `on_events` | `list[str]` | `["task_completed"]` | 체크포인트를 트리거하는 이벤트 타입 | | `provider` | `BaseProvider` | `JsonProvider()` | 스토리지 백엔드 | | `max_checkpoints` | `int \| None` | `None` | 보관할 최대 파일 수; 오래된 것부터 삭제 | @@ -95,7 +95,7 @@ result = crew.kickoff() # 마지막으로 완료된 태스크부터 재개 crew = Crew( agents=[researcher, writer], tasks=[research_task, write_task, review_task], - checkpoint=CheckpointConfig(directory="./crew_cp"), + checkpoint=CheckpointConfig(location="./crew_cp"), ) ``` @@ -118,7 +118,7 @@ class MyFlow(Flow): flow = MyFlow( checkpoint=CheckpointConfig( - directory="./flow_cp", + location="./flow_cp", on_events=["method_execution_finished"], ), ) @@ -137,7 +137,7 @@ agent = Agent( goal="Research topics", backstory="Expert researcher", checkpoint=CheckpointConfig( - directory="./agent_cp", + location="./agent_cp", on_events=["lite_agent_execution_completed"], ), ) @@ -160,7 +160,7 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", provider=JsonProvider(), max_checkpoints=5, ), @@ -179,15 +179,12 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./.checkpoints.db", - provider=SqliteProvider(max_checkpoints=50), + location="./.checkpoints.db", + provider=SqliteProvider(), ), ) ``` - -`SqliteProvider`를 사용할 때 `directory` 필드는 디렉토리가 아닌 데이터베이스 파일 경로입니다. - ## 이벤트 타입 diff --git a/docs/pt-BR/concepts/checkpointing.mdx b/docs/pt-BR/concepts/checkpointing.mdx index 1ef7aedf3e..25db59713e 100644 --- a/docs/pt-BR/concepts/checkpointing.mdx +++ b/docs/pt-BR/concepts/checkpointing.mdx @@ -39,7 +39,7 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", on_events=["task_completed", "crew_kickoff_completed"], max_checkpoints=5, ), @@ -50,7 +50,7 @@ crew = Crew( | Campo | Tipo | Padrao | Descricao | |:------|:-----|:-------|:----------| -| `directory` | `str` | `"./.checkpoints"` | Caminho para os arquivos de checkpoint | +| `location` | `str` | `"./.checkpoints"` | Caminho para os arquivos de checkpoint | | `on_events` | `list[str]` | `["task_completed"]` | Tipos de evento que acionam um checkpoint | | `provider` | `BaseProvider` | `JsonProvider()` | Backend de armazenamento | | `max_checkpoints` | `int \| None` | `None` | Maximo de arquivos a manter; os mais antigos sao removidos primeiro | @@ -95,7 +95,7 @@ A crew restaurada pula tarefas ja concluidas e retoma a partir da primeira incom crew = Crew( agents=[researcher, writer], tasks=[research_task, write_task, review_task], - checkpoint=CheckpointConfig(directory="./crew_cp"), + checkpoint=CheckpointConfig(location="./crew_cp"), ) ``` @@ -118,7 +118,7 @@ class MyFlow(Flow): flow = MyFlow( checkpoint=CheckpointConfig( - directory="./flow_cp", + location="./flow_cp", on_events=["method_execution_finished"], ), ) @@ -137,7 +137,7 @@ agent = Agent( goal="Research topics", backstory="Expert researcher", checkpoint=CheckpointConfig( - directory="./agent_cp", + location="./agent_cp", on_events=["lite_agent_execution_completed"], ), ) @@ -160,7 +160,7 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./my_checkpoints", + location="./my_checkpoints", provider=JsonProvider(), max_checkpoints=5, ), @@ -179,15 +179,12 @@ crew = Crew( agents=[...], tasks=[...], checkpoint=CheckpointConfig( - directory="./.checkpoints.db", - provider=SqliteProvider(max_checkpoints=50), + location="./.checkpoints.db", + provider=SqliteProvider(), ), ) ``` - -Ao usar `SqliteProvider`, o campo `directory` e o caminho do arquivo de banco de dados, nao um diretorio. - ## Tipos de Evento From e43ddcab5ff9791200dab45a273aaee020622078 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 8 Apr 2026 01:04:18 +0800 Subject: [PATCH 4/7] feat: add crewai checkpoint list/info CLI commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit crewai checkpoint list [location] — lists all checkpoints with timestamp, size, and task completion summary. crewai checkpoint info [path] — shows details of a single checkpoint or the latest in a directory, including per-task status. --- lib/crewai/src/crewai/cli/checkpoint_cli.py | 149 ++++++++++++++++++++ lib/crewai/src/crewai/cli/cli.py | 23 +++ 2 files changed, 172 insertions(+) create mode 100644 lib/crewai/src/crewai/cli/checkpoint_cli.py diff --git a/lib/crewai/src/crewai/cli/checkpoint_cli.py b/lib/crewai/src/crewai/cli/checkpoint_cli.py new file mode 100644 index 0000000000..a37e46357d --- /dev/null +++ b/lib/crewai/src/crewai/cli/checkpoint_cli.py @@ -0,0 +1,149 @@ +"""CLI commands for inspecting checkpoint files.""" + +from __future__ import annotations + +from datetime import datetime +import glob +import json +import os +from typing import Any + +import click + + +def _find_checkpoints(location: str) -> list[str]: + """Find checkpoint files in a directory, sorted newest first.""" + pattern = os.path.join(location, "*.json") + return sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + + +def _load_metadata(path: str) -> dict[str, Any]: + """Load checkpoint metadata without full deserialization.""" + with open(path) as f: + data = json.load(f) + + entities = data.get("entities", []) + event_count = len(data.get("event_record", {}).get("nodes", {})) + + parsed_entities: list[dict[str, Any]] = [] + for entity in entities: + tasks = entity.get("tasks", []) + completed = sum(1 for t in tasks if t.get("output") is not None) + info: dict[str, Any] = { + "type": entity.get("entity_type", "unknown"), + "name": entity.get("name"), + "id": entity.get("id"), + } + if tasks: + info["tasks_completed"] = completed + info["tasks_total"] = len(tasks) + info["tasks"] = [ + { + "description": t.get("description", ""), + "completed": t.get("output") is not None, + } + for t in tasks + ] + parsed_entities.append(info) + + return { + "path": path, + "size": os.path.getsize(path), + "event_count": event_count, + "entities": parsed_entities, + } + + +def _format_size(size: int) -> str: + if size < 1024: + return f"{size}B" + if size < 1024 * 1024: + return f"{size / 1024:.1f}KB" + return f"{size / 1024 / 1024:.1f}MB" + + +def _ts_from_filename(path: str) -> str | None: + """Extract timestamp from checkpoint filename.""" + name = os.path.basename(path).split("_")[0] + try: + dt = datetime.strptime(name, "%Y%m%dT%H%M%S") + except ValueError: + return None + return dt.strftime("%Y-%m-%d %H:%M:%S") + + +def list_checkpoints(location: str) -> None: + """List all checkpoints in a directory.""" + files = _find_checkpoints(location) + + if not files: + click.echo(f"No checkpoints found in {location}") + return + + click.echo(f"Found {len(files)} checkpoint(s) in {location}\n") + + for path in files: + ts = _ts_from_filename(path) or "unknown" + size = _format_size(os.path.getsize(path)) + name = os.path.basename(path) + + try: + meta = _load_metadata(path) + parts = [] + for ent in meta["entities"]: + etype = ent.get("type", "unknown") + ename = ent.get("name", "") + completed = ent.get("tasks_completed") + total = ent.get("tasks_total") + if completed is not None and total is not None: + parts.append(f"{etype}:{ename} [{completed}/{total} tasks]") + else: + parts.append(f"{etype}:{ename}") + summary = ", ".join(parts) if parts else "empty" + except Exception: + summary = "unreadable" + + click.echo(f" {name} {ts} {size} {summary}") + + +def info_checkpoint(path: str) -> None: + """Show details of a single checkpoint.""" + if os.path.isdir(path): + files = _find_checkpoints(path) + if not files: + click.echo(f"No checkpoints found in {path}") + return + path = files[0] + click.echo(f"Latest checkpoint: {os.path.basename(path)}\n") + + if not os.path.isfile(path): + click.echo(f"File not found: {path}") + return + + try: + meta = _load_metadata(path) + except Exception as exc: + click.echo(f"Failed to read checkpoint: {exc}") + return + + ts = _ts_from_filename(path) or "unknown" + click.echo(f"File: {meta['path']}") + click.echo(f"Time: {ts}") + click.echo(f"Size: {_format_size(meta['size'])}") + click.echo(f"Events: {meta['event_count']}") + + for ent in meta["entities"]: + eid = str(ent.get("id", ""))[:8] + click.echo(f"\n {ent['type']}: {ent.get('name', 'unnamed')} ({eid}...)") + + tasks = ent.get("tasks") + if isinstance(tasks, list): + click.echo( + f" Tasks: {ent['tasks_completed']}/{ent['tasks_total']} completed" + ) + for i, task in enumerate(tasks): + status = "done" if task.get("completed") else "pending" + desc = str(task.get("description", "")) + if len(desc) > 70: + desc = desc[:67] + "..." + click.echo(f" {i + 1}. [{status}] {desc}") diff --git a/lib/crewai/src/crewai/cli/cli.py b/lib/crewai/src/crewai/cli/cli.py index c40fe656f5..57ff4551a3 100644 --- a/lib/crewai/src/crewai/cli/cli.py +++ b/lib/crewai/src/crewai/cli/cli.py @@ -786,5 +786,28 @@ def traces_status() -> None: console.print(panel) +@crewai.group() +def checkpoint() -> None: + """Inspect checkpoint files.""" + + +@checkpoint.command("list") +@click.argument("location", default="./.checkpoints") +def checkpoint_list(location: str) -> None: + """List checkpoints in a directory.""" + from crewai.cli.checkpoint_cli import list_checkpoints + + list_checkpoints(location) + + +@checkpoint.command("info") +@click.argument("path", default="./.checkpoints") +def checkpoint_info(path: str) -> None: + """Show details of a checkpoint. Pass a file or directory for latest.""" + from crewai.cli.checkpoint_cli import info_checkpoint + + info_checkpoint(path) + + if __name__ == "__main__": crewai() From c61a40ed37b8dd23965204b7b6dc629484a69501 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 8 Apr 2026 01:08:05 +0800 Subject: [PATCH 5/7] fix: update checkpoint tests for directory to location rename --- lib/crewai/tests/test_checkpoint.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/crewai/tests/test_checkpoint.py b/lib/crewai/tests/test_checkpoint.py index 3533dac854..29dc289b47 100644 --- a/lib/crewai/tests/test_checkpoint.py +++ b/lib/crewai/tests/test_checkpoint.py @@ -17,10 +17,10 @@ from crewai.state.checkpoint_config import CheckpointConfig from crewai.state.checkpoint_listener import ( _find_checkpoint, - _prune, _resolve, _SENTINEL, ) +from crewai.state.provider.json_provider import JsonProvider from crewai.task import Task @@ -37,10 +37,10 @@ def test_false_returns_sentinel(self) -> None: def test_true_returns_config(self) -> None: result = _resolve(True) assert isinstance(result, CheckpointConfig) - assert result.directory == "./.checkpoints" + assert result.location == "./.checkpoints" def test_config_returns_config(self) -> None: - cfg = CheckpointConfig(directory="/tmp/cp") + cfg = CheckpointConfig(location="/tmp/cp") assert _resolve(cfg) is cfg @@ -77,12 +77,12 @@ def test_crew_none_agent_none(self) -> None: def test_agent_config_overrides_crew(self) -> None: a = self._make_agent( - checkpoint=CheckpointConfig(directory="/agent_cp") + checkpoint=CheckpointConfig(location="/agent_cp") ) self._make_crew([a], checkpoint=True) cfg = _find_checkpoint(a) assert isinstance(cfg, CheckpointConfig) - assert cfg.directory == "/agent_cp" + assert cfg.location == "/agent_cp" def test_task_inherits_from_crew(self) -> None: a = self._make_agent() @@ -123,7 +123,7 @@ def test_prune_keeps_newest(self) -> None: # Ensure distinct mtime time.sleep(0.01) - _prune(d, max_keep=2) + JsonProvider().prune(d, max_keep=2) remaining = os.listdir(d) assert len(remaining) == 2 assert "cp_3.json" in remaining @@ -135,7 +135,7 @@ def test_prune_zero_removes_all(self) -> None: with open(os.path.join(d, f"cp_{i}.json"), "w") as f: f.write("{}") - _prune(d, max_keep=0) + JsonProvider().prune(d, max_keep=0) assert os.listdir(d) == [] def test_prune_more_than_existing(self) -> None: @@ -143,7 +143,7 @@ def test_prune_more_than_existing(self) -> None: with open(os.path.join(d, "cp.json"), "w") as f: f.write("{}") - _prune(d, max_keep=10) + JsonProvider().prune(d, max_keep=10) assert len(os.listdir(d)) == 1 @@ -153,7 +153,7 @@ def test_prune_more_than_existing(self) -> None: class TestCheckpointConfig: def test_defaults(self) -> None: cfg = CheckpointConfig() - assert cfg.directory == "./.checkpoints" + assert cfg.location == "./.checkpoints" assert cfg.on_events == ["task_completed"] assert cfg.max_checkpoints is None assert not cfg.trigger_all From c0755f64b465b7688d9ea53ede1de9371eb06e6f Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 8 Apr 2026 01:14:11 +0800 Subject: [PATCH 6/7] feat: checkpoint CLI reads from both JSON directories and SQLite --- lib/crewai/src/crewai/cli/checkpoint_cli.py | 275 ++++++++++++++++---- 1 file changed, 219 insertions(+), 56 deletions(-) diff --git a/lib/crewai/src/crewai/cli/checkpoint_cli.py b/lib/crewai/src/crewai/cli/checkpoint_cli.py index a37e46357d..5486b3d6b5 100644 --- a/lib/crewai/src/crewai/cli/checkpoint_cli.py +++ b/lib/crewai/src/crewai/cli/checkpoint_cli.py @@ -6,22 +6,48 @@ import glob import json import os +import sqlite3 from typing import Any import click -def _find_checkpoints(location: str) -> list[str]: - """Find checkpoint files in a directory, sorted newest first.""" - pattern = os.path.join(location, "*.json") - return sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) +_SQLITE_MAGIC = b"SQLite format 3\x00" +_SELECT_ALL = """ +SELECT id, created_at, json(data) +FROM checkpoints +ORDER BY rowid DESC +""" -def _load_metadata(path: str) -> dict[str, Any]: - """Load checkpoint metadata without full deserialization.""" - with open(path) as f: - data = json.load(f) +_SELECT_ONE = """ +SELECT id, created_at, json(data) +FROM checkpoints +WHERE id = ? +""" + +_SELECT_LATEST = """ +SELECT id, created_at, json(data) +FROM checkpoints +ORDER BY rowid DESC +LIMIT 1 +""" + + +def _is_sqlite(path: str) -> bool: + """Check if a file is a SQLite database by reading its magic bytes.""" + if not os.path.isfile(path): + return False + try: + with open(path, "rb") as f: + return f.read(16) == _SQLITE_MAGIC + except OSError: + return False + +def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]: + """Parse checkpoint JSON into metadata dict.""" + data = json.loads(raw) entities = data.get("entities", []) event_count = len(data.get("event_record", {}).get("nodes", {})) @@ -47,8 +73,7 @@ def _load_metadata(path: str) -> dict[str, Any]: parsed_entities.append(info) return { - "path": path, - "size": os.path.getsize(path), + "source": source, "event_count": event_count, "entities": parsed_entities, } @@ -62,77 +87,215 @@ def _format_size(size: int) -> str: return f"{size / 1024 / 1024:.1f}MB" -def _ts_from_filename(path: str) -> str | None: - """Extract timestamp from checkpoint filename.""" - name = os.path.basename(path).split("_")[0] +def _ts_from_name(name: str) -> str | None: + """Extract timestamp from checkpoint ID or filename.""" + stem = os.path.basename(name).split("_")[0].removesuffix(".json") try: - dt = datetime.strptime(name, "%Y%m%dT%H%M%S") + dt = datetime.strptime(stem, "%Y%m%dT%H%M%S") except ValueError: return None return dt.strftime("%Y-%m-%d %H:%M:%S") -def list_checkpoints(location: str) -> None: - """List all checkpoints in a directory.""" - files = _find_checkpoints(location) +def _entity_summary(entities: list[dict[str, Any]]) -> str: + parts = [] + for ent in entities: + etype = ent.get("type", "unknown") + ename = ent.get("name", "") + completed = ent.get("tasks_completed") + total = ent.get("tasks_total") + if completed is not None and total is not None: + parts.append(f"{etype}:{ename} [{completed}/{total} tasks]") + else: + parts.append(f"{etype}:{ename}") + return ", ".join(parts) if parts else "empty" - if not files: - click.echo(f"No checkpoints found in {location}") - return - click.echo(f"Found {len(files)} checkpoint(s) in {location}\n") +# --- JSON directory --- - for path in files: - ts = _ts_from_filename(path) or "unknown" - size = _format_size(os.path.getsize(path)) - name = os.path.basename(path) +def _list_json(location: str) -> list[dict[str, Any]]: + pattern = os.path.join(location, "*.json") + results = [] + for path in sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True): + name = os.path.basename(path) try: - meta = _load_metadata(path) - parts = [] - for ent in meta["entities"]: - etype = ent.get("type", "unknown") - ename = ent.get("name", "") - completed = ent.get("tasks_completed") - total = ent.get("tasks_total") - if completed is not None and total is not None: - parts.append(f"{etype}:{ename} [{completed}/{total} tasks]") - else: - parts.append(f"{etype}:{ename}") - summary = ", ".join(parts) if parts else "empty" + with open(path) as f: + raw = f.read() + meta = _parse_checkpoint_json(raw, source=name) + meta["name"] = name + meta["ts"] = _ts_from_name(name) + meta["size"] = os.path.getsize(path) + meta["path"] = path except Exception: - summary = "unreadable" + meta = {"name": name, "ts": None, "size": 0, "entities": [], "source": name} + results.append(meta) + return results + + +def _info_json_latest(location: str) -> dict[str, Any] | None: + pattern = os.path.join(location, "*.json") + files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True) + if not files: + return None + path = files[0] + with open(path) as f: + raw = f.read() + meta = _parse_checkpoint_json(raw, source=os.path.basename(path)) + meta["name"] = os.path.basename(path) + meta["ts"] = _ts_from_name(path) + meta["size"] = os.path.getsize(path) + meta["path"] = path + return meta + + +def _info_json_file(path: str) -> dict[str, Any]: + with open(path) as f: + raw = f.read() + meta = _parse_checkpoint_json(raw, source=os.path.basename(path)) + meta["name"] = os.path.basename(path) + meta["ts"] = _ts_from_name(path) + meta["size"] = os.path.getsize(path) + meta["path"] = path + return meta + + +# --- SQLite --- + + +def _list_sqlite(db_path: str) -> list[dict[str, Any]]: + results = [] + with sqlite3.connect(db_path) as conn: + for row in conn.execute(_SELECT_ALL): + checkpoint_id, created_at, raw = row + try: + meta = _parse_checkpoint_json(raw, source=checkpoint_id) + meta["name"] = checkpoint_id + meta["ts"] = _ts_from_name(checkpoint_id) or created_at + except Exception: + meta = { + "name": checkpoint_id, + "ts": created_at, + "entities": [], + "source": checkpoint_id, + } + results.append(meta) + return results + + +def _info_sqlite_latest(db_path: str) -> dict[str, Any] | None: + with sqlite3.connect(db_path) as conn: + row = conn.execute(_SELECT_LATEST).fetchone() + if not row: + return None + checkpoint_id, created_at, raw = row + meta = _parse_checkpoint_json(raw, source=checkpoint_id) + meta["name"] = checkpoint_id + meta["ts"] = _ts_from_name(checkpoint_id) or created_at + meta["db"] = db_path + return meta + + +def _info_sqlite_id(db_path: str, checkpoint_id: str) -> dict[str, Any] | None: + with sqlite3.connect(db_path) as conn: + row = conn.execute(_SELECT_ONE, (checkpoint_id,)).fetchone() + if not row: + return None + cid, created_at, raw = row + meta = _parse_checkpoint_json(raw, source=cid) + meta["name"] = cid + meta["ts"] = _ts_from_name(cid) or created_at + meta["db"] = db_path + return meta - click.echo(f" {name} {ts} {size} {summary}") + +# --- Public API --- + + +def list_checkpoints(location: str) -> None: + """List all checkpoints at a location.""" + if _is_sqlite(location): + entries = _list_sqlite(location) + label = f"SQLite: {location}" + elif os.path.isdir(location): + entries = _list_json(location) + label = location + else: + click.echo(f"Not a directory or SQLite database: {location}") + return + + if not entries: + click.echo(f"No checkpoints found in {label}") + return + + click.echo(f"Found {len(entries)} checkpoint(s) in {label}\n") + + for entry in entries: + ts = entry.get("ts") or "unknown" + name = entry.get("name", "") + size = _format_size(entry["size"]) if "size" in entry else "" + summary = _entity_summary(entry.get("entities", [])) + if size: + click.echo(f" {name} {ts} {size} {summary}") + else: + click.echo(f" {name} {ts} {summary}") def info_checkpoint(path: str) -> None: """Show details of a single checkpoint.""" - if os.path.isdir(path): - files = _find_checkpoints(path) - if not files: + meta: dict[str, Any] | None = None + + # db_path#checkpoint_id format + if "#" in path: + db_path, checkpoint_id = path.rsplit("#", 1) + if _is_sqlite(db_path): + meta = _info_sqlite_id(db_path, checkpoint_id) + if not meta: + click.echo(f"Checkpoint not found: {checkpoint_id}") + return + + # SQLite file — show latest + if meta is None and _is_sqlite(path): + meta = _info_sqlite_latest(path) + if not meta: + click.echo(f"No checkpoints in database: {path}") + return + click.echo(f"Latest checkpoint: {meta['name']}\n") + + # Directory — show latest JSON + if meta is None and os.path.isdir(path): + meta = _info_json_latest(path) + if not meta: click.echo(f"No checkpoints found in {path}") return - path = files[0] - click.echo(f"Latest checkpoint: {os.path.basename(path)}\n") + click.echo(f"Latest checkpoint: {meta['name']}\n") - if not os.path.isfile(path): - click.echo(f"File not found: {path}") - return + # Specific JSON file + if meta is None and os.path.isfile(path): + try: + meta = _info_json_file(path) + except Exception as exc: + click.echo(f"Failed to read checkpoint: {exc}") + return - try: - meta = _load_metadata(path) - except Exception as exc: - click.echo(f"Failed to read checkpoint: {exc}") + if meta is None: + click.echo(f"Not found: {path}") return - ts = _ts_from_filename(path) or "unknown" - click.echo(f"File: {meta['path']}") + _print_info(meta) + + +def _print_info(meta: dict[str, Any]) -> None: + ts = meta.get("ts") or "unknown" + source = meta.get("path") or meta.get("db") or meta.get("source", "") + click.echo(f"Source: {source}") + click.echo(f"Name: {meta.get('name', '')}") click.echo(f"Time: {ts}") - click.echo(f"Size: {_format_size(meta['size'])}") - click.echo(f"Events: {meta['event_count']}") + if "size" in meta: + click.echo(f"Size: {_format_size(meta['size'])}") + click.echo(f"Events: {meta.get('event_count', 0)}") - for ent in meta["entities"]: + for ent in meta.get("entities", []): eid = str(ent.get("id", ""))[:8] click.echo(f"\n {ent['type']}: {ent.get('name', 'unnamed')} ({eid}...)") From 7ccf811676518f80b72bfc245f084d4f867665d3 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 8 Apr 2026 01:21:02 +0800 Subject: [PATCH 7/7] feat: show trigger event type in checkpoint list and info --- lib/crewai/src/crewai/cli/checkpoint_cli.py | 25 +++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/crewai/src/crewai/cli/checkpoint_cli.py b/lib/crewai/src/crewai/cli/checkpoint_cli.py index 5486b3d6b5..c61500b208 100644 --- a/lib/crewai/src/crewai/cli/checkpoint_cli.py +++ b/lib/crewai/src/crewai/cli/checkpoint_cli.py @@ -49,7 +49,16 @@ def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]: """Parse checkpoint JSON into metadata dict.""" data = json.loads(raw) entities = data.get("entities", []) - event_count = len(data.get("event_record", {}).get("nodes", {})) + nodes = data.get("event_record", {}).get("nodes", {}) + event_count = len(nodes) + + trigger_event = None + if nodes: + last_node = max( + nodes.values(), + key=lambda n: n.get("event", {}).get("emission_sequence") or 0, + ) + trigger_event = last_node.get("event", {}).get("type") parsed_entities: list[dict[str, Any]] = [] for entity in entities: @@ -75,6 +84,7 @@ def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]: return { "source": source, "event_count": event_count, + "trigger": trigger_event, "entities": parsed_entities, } @@ -234,11 +244,15 @@ def list_checkpoints(location: str) -> None: ts = entry.get("ts") or "unknown" name = entry.get("name", "") size = _format_size(entry["size"]) if "size" in entry else "" + trigger = entry.get("trigger") or "" summary = _entity_summary(entry.get("entities", [])) + parts = [name, ts] if size: - click.echo(f" {name} {ts} {size} {summary}") - else: - click.echo(f" {name} {ts} {summary}") + parts.append(size) + if trigger: + parts.append(trigger) + parts.append(summary) + click.echo(f" {' '.join(parts)}") def info_checkpoint(path: str) -> None: @@ -294,6 +308,9 @@ def _print_info(meta: dict[str, Any]) -> None: if "size" in meta: click.echo(f"Size: {_format_size(meta['size'])}") click.echo(f"Events: {meta.get('event_count', 0)}") + trigger = meta.get("trigger") + if trigger: + click.echo(f"Trigger: {trigger}") for ent in meta.get("entities", []): eid = str(ent.get("id", ""))[:8]