From e220d1adcac4ca06c2123727cda17b713ef81020 Mon Sep 17 00:00:00 2001
From: LeastAction-Labs
Date: Tue, 16 Jun 2026 21:48:59 -0700
Subject: [PATCH 1/2] fix: stop log streams from blocking worker event loops (#11)

Log endpoints did directory walks, recursive globs, full-file reads, and
JSON parsing/sorting directly on the worker event loop. With one event loop
per uvicorn worker, a few heavy log tabs could freeze an entire worker,
making the service unusable.

- Offload all blocking/CPU-bound log work to threads via asyncio.to_thread
- Add a per-process semaphore (LOG_WORK_SEMAPHORE) bounding concurrent heavy
  log ops so a burst can't exhaust the shared thread pool and starve other
  endpoints; apply it to query_logs (DuckDB) too
- Page file tails by seeking backward from EOF instead of reading the whole
  file into memory

SSE contract unchanged (frontend reads only content/has_more). Tail-paging
math validated to match the prior whole-file logic across skip/limit ranges,
including multi-block reads and empty files.

Co-Authored-By: Claude Opus 4.8 --- backend/src/core/api/routes/logs.py | 10 +- backend/src/core/logs_details/service.py | 293 +++++++++++++++-------- 2 files changed, 196 insertions(+), 107 deletions(-) diff --git a/backend/src/core/api/routes/logs.py b/backend/src/core/api/routes/logs.py index f3389ea..90f6996 100644 --- a/backend/src/core/api/routes/logs.py +++ b/backend/src/core/api/routes/logs.py @@ -17,7 +17,7 @@ from src.common.config import Config from src.common.context_vars.user_context import get_user_laui from src.common.logger.logger import log_error, log_info -from src.core.logs_details.service import LogsService, get_logs_service +from src.core.logs_details.service import LOG_WORK_SEMAPHORE, LogsService, get_logs_service logs_router = APIRouter() @@ -94,10 +94,10 @@ async def query_logs(request: LogQueryRequest) -> dict: log_info( "api", "logs_router", "query_logs", f"user={get_user_laui()} payload={request.model_dump()}" ) - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, _run_duckdb_query, request.sql, request.category, request.date - ) + async with LOG_WORK_SEMAPHORE: + return await asyncio.to_thread( + _run_duckdb_query, request.sql, request.category, request.date + ) def _sse(event: str, data: Any) -> str: diff --git a/backend/src/core/logs_details/service.py b/backend/src/core/logs_details/service.py index 2919026..efb0b81 100644 --- a/backend/src/core/logs_details/service.py +++ b/backend/src/core/logs_details/service.py @@ -3,18 +3,54 @@ # LeastAction Sustainable Use License (see LICENSE.md) or, for files # marked EE, the LeastAction Enterprise Edition License (see LICENSE_EE.md). # Use of this file outside those terms is not permitted. 
+import asyncio import json +import os from collections.abc import AsyncIterator from pathlib import Path from typing import Any import aiofiles -import aiofiles.os from src.common.config import Config from src.common.exceptions import InvalidArgumentError, NotFoundError, UnsupportedMediaTypeError CHUNK_SIZE = 64 * 1024 # 64 KB +TAIL_BLOCK_SIZE = 64 * 1024 # read window when paging a file tail from the end + +# Bounds how many heavy log operations (directory walks, full-file reads, JSON +# parsing) run concurrently *per worker process*. uvicorn runs one event loop per +# worker; without a cap, a handful of heavy log requests can saturate the shared +# thread pool and starve every other endpoint on that worker. See issue #11. +_MAX_CONCURRENT_LOG_OPS = 3 +LOG_WORK_SEMAPHORE = asyncio.Semaphore(_MAX_CONCURRENT_LOG_OPS) + + +def _read_tail_lines(path: Path, needed: int) -> tuple[list[str], bool]: + """Read non-empty lines from the END of a file, growing backward until at least + ``needed`` complete lines are available or the start of file is reached. + + Returns ``(lines_in_chronological_order, reached_start)``. Avoids loading the + whole file just to show a tail page. + """ + with open(path, "rb") as f: + f.seek(0, os.SEEK_END) + pos = f.tell() + buf = b"" + # +1 newline of slack so a partial first line can be dropped and still leave + # `needed` complete lines. + while pos > 0 and buf.count(b"\n") <= needed: + read_size = min(TAIL_BLOCK_SIZE, pos) + pos -= read_size + f.seek(pos) + buf = f.read(read_size) + buf + reached_start = pos == 0 + + lines = buf.decode("utf-8", errors="replace").split("\n") + if not reached_start and lines: + # The first line was cut mid-line by the block boundary; drop it. 
+ lines = lines[1:] + return [ln for ln in lines if ln.strip()], reached_start class LogsService: @@ -24,44 +60,53 @@ def __init__(self) -> None: def _get_logs_dir(self) -> Path: return Config().logs_dir - def _build_item_info(self, base_dir: Path, item: Path, stat) -> dict[str, Any]: - return { - "name": item.name, - "type": "directory" if item.is_dir() else "file", - "size": stat.st_size if item.is_file() else None, - "modified": stat.st_mtime, - "path": str(item.relative_to(base_dir)), - } + # ── folder listing ──────────────────────────────────────────────────────── - async def list_folder_items(self, folder_path: str) -> dict[str, Any]: + def _list_folder_items_sync(self, folder_path: str) -> dict[str, Any]: logs_dir = self._logs_dir target = logs_dir if not folder_path or folder_path == "." else logs_dir / folder_path - if not await aiofiles.os.path.exists(target): + if not target.exists(): raise NotFoundError(message="Folder not found") - if not await aiofiles.os.path.isdir(target): + if not target.is_dir(): raise InvalidArgumentError(message="Path is not a directory") items: list[dict[str, Any]] = [] - for item in target.iterdir(): - stat = await aiofiles.os.stat(item) - items.append(self._build_item_info(logs_dir, item, stat)) + with os.scandir(target) as it: + for entry in it: + stat = entry.stat() + is_file = entry.is_file() + items.append( + { + "name": entry.name, + "type": "directory" if entry.is_dir() else "file", + "size": stat.st_size if is_file else None, + "modified": stat.st_mtime, + "path": str(Path(entry.path).relative_to(logs_dir)), + } + ) if not folder_path or folder_path == ".": items.sort(key=lambda x: x["name"], reverse=True) return {"directory": str(target), "items": items, "total_count": len(items)} - async def get_file_metadata(self, file_path: str) -> dict[str, Any]: + async def list_folder_items(self, folder_path: str) -> dict[str, Any]: + async with LOG_WORK_SEMAPHORE: + return await asyncio.to_thread(self._list_folder_items_sync, 
folder_path) + + # ── file metadata (light) ───────────────────────────────────────────────── + + def _get_file_metadata_sync(self, file_path: str) -> dict[str, Any]: logs_dir = self._logs_dir target = logs_dir / file_path - if not await aiofiles.os.path.exists(target): + if not target.exists(): raise NotFoundError(message="File not found") - if not await aiofiles.os.path.isfile(target): + if not target.is_file(): raise InvalidArgumentError(message="Path is not a file") - s = await aiofiles.os.stat(target) + s = target.stat() return { "name": target.name, "path": str(target.relative_to(logs_dir)), @@ -73,47 +118,78 @@ async def get_file_metadata(self, file_path: str) -> dict[str, Any]: "is_readable": s.st_size > 0, } - async def iter_file_lines_paged( - self, - file_path: str, - skip: int = 0, - limit: int = 400, - ) -> AsyncIterator[dict[str, Any]]: - """Return a tail page of lines in chronological order, paging backward from end of file. + async def get_file_metadata(self, file_path: str) -> dict[str, Any]: + # A single stat — cheap enough to skip the concurrency gate. + return await asyncio.to_thread(self._get_file_metadata_sync, file_path) - skip: number of lines from the end already loaded (0 = start from last line) - limit: page size - Returns lines in normal chronological order so newest appears at bottom. 
- """ + # ── paged tail reader ───────────────────────────────────────────────────── + + def _read_tail_page_sync(self, file_path: str, skip: int, limit: int) -> dict[str, Any] | None: logs_dir = self._logs_dir target = logs_dir / file_path ext = target.suffix.lower() - if not await aiofiles.os.path.exists(target): + if not target.exists(): raise NotFoundError(message="File not found") - if not await aiofiles.os.path.isfile(target): + if not target.is_file(): raise InvalidArgumentError(message="Path is not a file") if ext not in {".log", ".txt"}: raise UnsupportedMediaTypeError(message=f"Streaming not supported for '{ext}'") - async with aiofiles.open(target, encoding="utf-8", errors="replace") as f: - content = await f.read() - - all_lines = [l for l in content.splitlines() if l.strip()] - total = len(all_lines) - # Page backward from the end: window is [start, end) in chronological order - end = max(0, total - skip) + tail, reached_start = _read_tail_lines(target, skip + limit + 1) + n = len(tail) + # Window [start, end) within `tail`, in chronological order. skip counts lines + # already loaded from the end; limit is the page size. + end = max(0, n - skip) start = max(0, end - limit) - page = all_lines[start:end] - has_more = start > 0 - - if page: - yield { - "content": "\n".join(page), - "content_type": "text/plain", - "chunk_index": 0, - "has_more": has_more, - "total_lines": total, + page = tail[start:end] + has_more = (start > 0) if reached_start else end > 0 + + if not page: + return None + return { + "content": "\n".join(page), + "content_type": "text/plain", + "chunk_index": 0, + "has_more": has_more, + "total_lines": n if reached_start else None, + } + + async def iter_file_lines_paged( + self, + file_path: str, + skip: int = 0, + limit: int = 400, + ) -> AsyncIterator[dict[str, Any]]: + """Yield a tail page of lines in chronological order, paging backward from the + end of the file without reading the whole file into memory. 
+ + skip: number of lines from the end already loaded (0 = start from last line) + limit: page size + """ + async with LOG_WORK_SEMAPHORE: + page = await asyncio.to_thread(self._read_tail_page_sync, file_path, skip, limit) + if page is not None: + yield page + + # ── whole-file chunk reader ─────────────────────────────────────────────── + + def _read_json_sync(self, target: Path) -> dict[str, Any]: + with open(target, encoding="utf-8") as f: + raw = f.read() + try: + parsed = json.loads(raw) + return { + "content": json.dumps(parsed, indent=2), + "content_type": "application/json", + "json_valid": True, + } + except json.JSONDecodeError as je: + return { + "content": raw, + "content_type": "application/json", + "json_valid": False, + "content_error": str(je), } async def iter_file_chunks(self, file_path: str) -> AsyncIterator[dict[str, Any]]: @@ -124,51 +200,50 @@ async def iter_file_chunks(self, file_path: str) -> AsyncIterator[dict[str, Any] if ext not in {".log", ".txt", ".json"}: raise UnsupportedMediaTypeError(message=f"Streaming not supported for '{ext}'") - if ext == ".json": - async with aiofiles.open(target, encoding="utf-8") as f: - raw = await f.read() - try: - parsed = json.loads(raw) - yield { - "content": json.dumps(parsed, indent=2), - "content_type": "application/json", - "json_valid": True, - } - except json.JSONDecodeError as je: - yield { - "content": raw, - "content_type": "application/json", - "json_valid": False, - "content_error": str(je), - } - return - - async with aiofiles.open(target, encoding="utf-8", errors="replace") as f: - index = 0 - while chunk := await f.read(CHUNK_SIZE): - yield {"content": chunk, "content_type": "text/plain", "chunk_index": index} - index += 1 + async with LOG_WORK_SEMAPHORE: + if ext == ".json": + # Parsing/re-serialising is CPU-bound — keep it off the event loop. 
+ yield await asyncio.to_thread(self._read_json_sync, target) + return - async def iter_session_logs(self, session_id: str) -> AsyncIterator[dict[str, Any]]: + async with aiofiles.open(target, encoding="utf-8", errors="replace") as f: + index = 0 + while chunk := await f.read(CHUNK_SIZE): + yield {"content": chunk, "content_type": "text/plain", "chunk_index": index} + index += 1 + + # ── session logs ────────────────────────────────────────────────────────── + + def _collect_session_logs_sync(self, session_id: str) -> list[dict[str, Any]]: logs_dir = self._logs_dir - if not await aiofiles.os.path.exists(logs_dir): - return + if not logs_dir.exists(): + return [] + out: list[dict[str, Any]] = [] for log_file in logs_dir.glob(f"**/session_id={session_id}/**/*.log"): if not log_file.is_file(): continue try: - s = await aiofiles.os.stat(log_file) - yield { - "name": log_file.name, - "path": str(log_file.relative_to(logs_dir)), - "full_path": str(log_file), - "size": s.st_size, - "modified": s.st_mtime, - "created": s.st_ctime, - } + s = log_file.stat() + out.append( + { + "name": log_file.name, + "path": str(log_file.relative_to(logs_dir)), + "full_path": str(log_file), + "size": s.st_size, + "modified": s.st_mtime, + "created": s.st_ctime, + } + ) except Exception: continue + return out + + async def iter_session_logs(self, session_id: str) -> AsyncIterator[dict[str, Any]]: + async with LOG_WORK_SEMAPHORE: + entries = await asyncio.to_thread(self._collect_session_logs_sync, session_id) + for entry in entries: + yield entry # ── legacy non-streaming (kept for compatibility) ───────────────────────── @@ -189,25 +264,25 @@ async def get_logs_by_session_id(self, session_id: str) -> dict[str, Any]: logs.sort(key=lambda x: x["modified"], reverse=True) return {"session_id": session_id, "logs": logs, "total_count": len(logs)} - async def get_session_log_content( + def _get_session_log_content_sync( self, session_id: str, - level: str | None = None, - category: str | None = 
None, - page: int = 1, - per_page: int = 50, + level: str | None, + category: str | None, + page: int, + per_page: int, ) -> dict[str, Any]: - """Read and parse all log files for a session, returning structured entries.""" logs_dir = self._logs_dir - if not await aiofiles.os.path.exists(logs_dir): - return { - "session_id": session_id, - "entries": [], - "total_count": 0, - "page": page, - "per_page": per_page, - "files_read": 0, - } + empty = { + "session_id": session_id, + "entries": [], + "total_count": 0, + "page": page, + "per_page": per_page, + "files_read": 0, + } + if not logs_dir.exists(): + return empty all_entries: list[dict[str, Any]] = [] files_read = 0 @@ -224,8 +299,8 @@ async def get_session_log_content( continue seen_files.add(str(log_file)) try: - async with aiofiles.open(log_file, encoding="utf-8", errors="replace") as f: - content = await f.read() + with open(log_file, encoding="utf-8", errors="replace") as f: + content = f.read() files_read += 1 for line in content.splitlines(): line = line.strip() @@ -267,6 +342,20 @@ async def get_session_log_content( "files_read": files_read, } + async def get_session_log_content( + self, + session_id: str, + level: str | None = None, + category: str | None = None, + page: int = 1, + per_page: int = 50, + ) -> dict[str, Any]: + """Read and parse all log files for a session, returning structured entries.""" + async with LOG_WORK_SEMAPHORE: + return await asyncio.to_thread( + self._get_session_log_content_sync, session_id, level, category, page, per_page + ) + def get_logs_service() -> LogsService: return LogsService() From 1c62d3a09868b47ef032b4c76dda012b0b55a61e Mon Sep 17 00:00:00 2001 From: LeastAction-Labs Date: Wed, 17 Jun 2026 17:11:49 -0700 Subject: [PATCH 2/2] fix: bound heavy log ops with a wall-clock timeout (#11) A stuck disk/mount or pathological glob could hold a LOG_WORK_SEMAPHORE slot forever; three such ops would permanently lock out every log endpoint. 
- Add run_log_op(): acquire semaphore + asyncio.to_thread + asyncio.wait_for, so the slot is released on timeout and capacity recovers. Raises TimeoutError with a clear message (bare asyncio.TimeoutError stringifies to ''). - Route the heavy methods and query_logs through it; query_logs uses a larger LOG_QUERY_TIMEOUT and returns a graceful {"error": ...} on timeout. - iter_file_chunks keeps its manual semaphore (held across the stream) and bounds each blocking read with wait_for instead. Note: wait_for cancels the awaiting coroutine but cannot kill the worker thread, so timeouts are generous (30s ops, 120s queries) and only fire on genuine pathology. Verified the slot is released and capacity recovers even when all slots time out simultaneously. Co-Authored-By: Claude Opus 4.8 --- backend/src/core/api/routes/logs.py | 20 ++++++++--- backend/src/core/logs_details/service.py | 46 +++++++++++++++++------- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/backend/src/core/api/routes/logs.py b/backend/src/core/api/routes/logs.py index 90f6996..5931bcd 100644 --- a/backend/src/core/api/routes/logs.py +++ b/backend/src/core/api/routes/logs.py @@ -3,7 +3,6 @@ # LeastAction Sustainable Use License (see LICENSE.md) or, for files # marked EE, the LeastAction Enterprise Edition License (see LICENSE_EE.md). # Use of this file outside those terms is not permitted. 
-import asyncio import json import traceback from collections.abc import AsyncGenerator @@ -17,7 +16,12 @@ from src.common.config import Config from src.common.context_vars.user_context import get_user_laui from src.common.logger.logger import log_error, log_info -from src.core.logs_details.service import LOG_WORK_SEMAPHORE, LogsService, get_logs_service +from src.core.logs_details.service import ( + LOG_QUERY_TIMEOUT, + LogsService, + get_logs_service, + run_log_op, +) logs_router = APIRouter() @@ -94,10 +98,16 @@ async def query_logs(request: LogQueryRequest) -> dict: log_info( "api", "logs_router", "query_logs", f"user={get_user_laui()} payload={request.model_dump()}" ) - async with LOG_WORK_SEMAPHORE: - return await asyncio.to_thread( - _run_duckdb_query, request.sql, request.category, request.date + try: + return await run_log_op( + _run_duckdb_query, + request.sql, + request.category, + request.date, + timeout=LOG_QUERY_TIMEOUT, ) + except TimeoutError as exc: + return {"error": str(exc)} def _sse(event: str, data: Any) -> str: diff --git a/backend/src/core/logs_details/service.py b/backend/src/core/logs_details/service.py index efb0b81..2a8174f 100644 --- a/backend/src/core/logs_details/service.py +++ b/backend/src/core/logs_details/service.py @@ -25,6 +25,27 @@ _MAX_CONCURRENT_LOG_OPS = 3 LOG_WORK_SEMAPHORE = asyncio.Semaphore(_MAX_CONCURRENT_LOG_OPS) +# Wall-clock ceiling for a single heavy log operation. On timeout the semaphore +# slot is released so a wedged disk/mount or pathological glob can't permanently +# consume capacity and lock out every log endpoint (issue #11, "thread locks"). +# Caveat: the worker thread keeps running — Python threads aren't cancellable — +# so keep this generous; it should only fire on genuine pathology, not slow I/O. +_LOG_OP_TIMEOUT = 30.0 # seconds +# DuckDB queries over wide date ranges are legitimately slower; give them headroom. 
+LOG_QUERY_TIMEOUT = 120.0 # seconds + + +async def run_log_op(fn, *args, timeout: float = _LOG_OP_TIMEOUT): + """Run a blocking log operation in a thread, gated by the concurrency semaphore + and bounded by a wall-clock timeout. Raises TimeoutError with a clear message + (plain asyncio.TimeoutError stringifies to ''), which the SSE handlers surface + as an error event.""" + async with LOG_WORK_SEMAPHORE: + try: + return await asyncio.wait_for(asyncio.to_thread(fn, *args), timeout) + except TimeoutError as exc: + raise TimeoutError(f"Log operation exceeded {int(timeout)}s and was aborted") from exc + def _read_tail_lines(path: Path, needed: int) -> tuple[list[str], bool]: """Read non-empty lines from the END of a file, growing backward until at least @@ -92,8 +113,7 @@ def _list_folder_items_sync(self, folder_path: str) -> dict[str, Any]: return {"directory": str(target), "items": items, "total_count": len(items)} async def list_folder_items(self, folder_path: str) -> dict[str, Any]: - async with LOG_WORK_SEMAPHORE: - return await asyncio.to_thread(self._list_folder_items_sync, folder_path) + return await run_log_op(self._list_folder_items_sync, folder_path) # ── file metadata (light) ───────────────────────────────────────────────── @@ -167,8 +187,7 @@ async def iter_file_lines_paged( skip: number of lines from the end already loaded (0 = start from last line) limit: page size """ - async with LOG_WORK_SEMAPHORE: - page = await asyncio.to_thread(self._read_tail_page_sync, file_path, skip, limit) + page = await run_log_op(self._read_tail_page_sync, file_path, skip, limit) if page is not None: yield page @@ -200,15 +219,20 @@ async def iter_file_chunks(self, file_path: str) -> AsyncIterator[dict[str, Any] if ext not in {".log", ".txt", ".json"}: raise UnsupportedMediaTypeError(message=f"Streaming not supported for '{ext}'") + # Held across the whole stream to throttle concurrent big-file reads, so + # run_log_op (which acquires the same semaphore) can't be used 
here. Each + # blocking step is bounded with wait_for so a stalled read can't pin the slot. async with LOG_WORK_SEMAPHORE: if ext == ".json": # Parsing/re-serialising is CPU-bound — keep it off the event loop. - yield await asyncio.to_thread(self._read_json_sync, target) + yield await asyncio.wait_for( + asyncio.to_thread(self._read_json_sync, target), _LOG_OP_TIMEOUT + ) return async with aiofiles.open(target, encoding="utf-8", errors="replace") as f: index = 0 - while chunk := await f.read(CHUNK_SIZE): + while chunk := await asyncio.wait_for(f.read(CHUNK_SIZE), _LOG_OP_TIMEOUT): yield {"content": chunk, "content_type": "text/plain", "chunk_index": index} index += 1 @@ -240,8 +264,7 @@ def _collect_session_logs_sync(self, session_id: str) -> list[dict[str, Any]]: return out async def iter_session_logs(self, session_id: str) -> AsyncIterator[dict[str, Any]]: - async with LOG_WORK_SEMAPHORE: - entries = await asyncio.to_thread(self._collect_session_logs_sync, session_id) + entries = await run_log_op(self._collect_session_logs_sync, session_id) for entry in entries: yield entry @@ -351,10 +374,9 @@ async def get_session_log_content( per_page: int = 50, ) -> dict[str, Any]: """Read and parse all log files for a session, returning structured entries.""" - async with LOG_WORK_SEMAPHORE: - return await asyncio.to_thread( - self._get_session_log_content_sync, session_id, level, category, page, per_page - ) + return await run_log_op( + self._get_session_log_content_sync, session_id, level, category, page, per_page + ) def get_logs_service() -> LogsService: