Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions embed/src/pixelrag_embed/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def chunk_article(article_dir: str, dry_run: bool = False, force: bool = False)
page_height = meta.get("page_height", 0)
viewport_width = meta.get("viewport_width", 875)
tile_height = meta.get("tile_height", 8192)
article_id = meta.get("article_id") # propagate from tiles.json into chunks.json

chunks_info = [] # list of {tile, chunk_index, file, y_offset, height}
files_written = 0
Expand Down Expand Up @@ -229,6 +230,8 @@ def chunk_article(article_dir: str, dry_run: bool = False, force: bool = False)
"tile_hashes": tile_hashes,
"chunks": chunks_info,
}
if article_id is not None:
manifest["article_id"] = article_id

if not dry_run:
with open(chunks_json, "w") as f:
Expand Down
38 changes: 25 additions & 13 deletions embed/src/pixelrag_embed/embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,14 @@ def scan_shard_tiles(
if not meta.get("complete", False):
continue

# Extract article_id from directory name: "3104240.png.tiles" -> 3104240
dir_name = tiles_dir.name # e.g. "3104240.png.tiles"
try:
article_id = int(dir_name.split(".")[0])
except (ValueError, IndexError):
logger.warning("Cannot parse article_id from %s", dir_name)
continue
article_id = meta.get("article_id")
if article_id is None:
dir_name = tiles_dir.name
try:
article_id = int(dir_name.split(".")[0])
except (ValueError, IndexError):
logger.warning("Cannot parse article_id from %s", dir_name)
continue

if article_id in skip:
continue
Expand Down Expand Up @@ -343,12 +344,23 @@ def scan_shard_chunks(
logger.warning("Skipping %s: %s", chunks_json, e)
continue

dir_name = tiles_dir.name
try:
article_id = int(dir_name.split(".")[0])
except (ValueError, IndexError):
logger.warning("Cannot parse article_id from %s", dir_name)
continue
article_id = meta.get("article_id")
if article_id is None:
# chunks.json predates the article_id contract — try the sibling
# tiles.json (CPU embedder does the same), then the directory name.
tiles_json = tiles_dir / "tiles.json"
if tiles_json.exists():
try:
article_id = json.loads(tiles_json.read_text()).get("article_id")
except (json.JSONDecodeError, OSError):
pass
if article_id is None:
dir_name = tiles_dir.name
try:
article_id = int(dir_name.split(".")[0])
except (ValueError, IndexError):
logger.warning("Cannot parse article_id from %s", dir_name)
continue

if article_id in skip:
continue
Expand Down
30 changes: 23 additions & 7 deletions embed/src/pixelrag_embed/embed_cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""

import argparse
import hashlib
import json
import logging
import os
Expand Down Expand Up @@ -79,16 +80,31 @@ def scan_chunks(shard_dir: str) -> list[dict]:
)

for td in tile_dirs:
dir_name = td.name
article_id_str = dir_name.replace(".png.tiles", "")
try:
article_id = int(article_id_str)
except ValueError:
article_id = hash(article_id_str) % (2**31)

chunks_json = td / "chunks.json"
tiles_json = td / "tiles.json"

# Read article_id from the manifest (written by the pipeline).
# Fall back to parsing the directory name for backward compat
# with indexes built before this change.
article_id = None
for mf in (chunks_json, tiles_json):
if mf.exists() and article_id is None:
try:
article_id = json.loads(mf.read_text()).get("article_id")
except (json.JSONDecodeError, OSError):
pass
if article_id is None:
article_id_str = td.name.replace(".png.tiles", "")
try:
article_id = int(article_id_str)
except ValueError:
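# Non-numeric dir name with no manifest article_id. Use a stable
# hash: the builtin hash() is salted by PYTHONHASHSEED, so it would
# give a different id on every build and make the index
# non-reproducible. sha1 yields the same id for the same dir name.
# NOTE(review): the previous fallback reduced the id modulo 2**31;
# the first 8 hex digits of sha1 can be as large as 2**32 - 1.
# Confirm that downstream id storage accepts that range.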
digest = hashlib.sha1(article_id_str.encode()).hexdigest()
article_id = int(digest[:8], 16)

if chunks_json.exists():
with open(chunks_json) as f:
manifest = json.load(f)
Expand Down
30 changes: 22 additions & 8 deletions index/src/pixelrag_index/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,42 @@ def build(config: dict, limit: int | None = None, force: bool = False) -> Path:
" Rendered %d URLs (%d skipped, already exist)", len(new_url_docs), skipped
)

# Render PDFs
# Render PDFs — use idx as tile directory name (like URLs) so directory
# names are always the numeric article_id.
for idx, doc in pdf_docs:
try:
render_pdf(doc.path, str(tiles_dir))
render_pdf(doc.path, str(tiles_dir), stem=str(idx))
except Exception as e:
logger.warning(" FAILED PDF %s: %s", doc.id, e)
if pdf_docs:
logger.info(" Rendered %d PDFs", len(pdf_docs))

# Save articles.json for serve API — title + URL per article.
# Use the pipeline's sequential *position index* (0, 1, 2, …) rather than
# int(a["id"]), because local sources use filename stems (e.g. "art_alice")
# as doc IDs, which are not numeric. int() on a filename stem raises ValueError
# and crashes the entire index build step.
# Write article_id into each tile directory's manifests so the embed
# pipeline reads it explicitly instead of guessing from the directory name.
# tiles.json always exists here. chunks.json exists only for PDFs (pdf.py
# writes it at render time, and chunk.py then skips those dirs); for URLs it
# is created by Stage 2's chunk.py, which propagates article_id from
# tiles.json. So write whichever manifests exist now.
for idx, _ in url_docs + pdf_docs + image_docs:
for manifest_name in ("tiles.json", "chunks.json"):
manifest_path = tiles_dir / f"{idx}.png.tiles" / manifest_name
if manifest_path.exists():
try:
manifest = json.loads(manifest_path.read_text())
manifest["article_id"] = idx
manifest_path.write_text(json.dumps(manifest))
except (json.JSONDecodeError, OSError):
pass

# Save articles.json for serve API — maps article_id (array index) to
# human-readable title + URL.
articles_path = output / "articles.json"
article_entries = []
for enum_idx, a in enumerate(articles):
title = a.get("metadata", {}).get("title", "")
if not title and a.get("url"):
title = a["url"].split("/")[-1].replace("_", " ").replace("%20", " ")
if not title:
# Fall back to original doc id (e.g. filename stem) as display title
title = a.get("id", str(enum_idx))
url = a.get("url", "") or a.get("path", "")
article_entries.append({"title": title, "url": url})
Expand Down
7 changes: 6 additions & 1 deletion render/src/pixelrag_render/backends/pdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def render_pdf(
dpi: int = 200,
pages: Optional[list[int]] = None,
quality: int = 85,
stem: str | None = None,
) -> list[Path]:
"""Render a PDF to JPEG tiles.

Expand All @@ -32,6 +33,9 @@ def render_pdf(
dpi: Resolution for rendering (default 200 gives ~1650×2200px for A4).
pages: 1-based list of page numbers to render. ``None`` renders all pages.
quality: JPEG quality 1-100 (default 85).
stem: Override for the tile directory name. Defaults to the PDF filename
stem. The pipeline passes the article_id here so directory names
are always numeric and consistent with articles.json.

Returns:
List containing the single tile directory Path on success.
Expand All @@ -55,7 +59,8 @@ def render_pdf(
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)

stem = path.stem
if stem is None:
stem = path.stem
tile_dir = output_dir / f"{stem}.png.tiles"
tile_dir.mkdir(parents=True, exist_ok=True)

Expand Down
4 changes: 3 additions & 1 deletion render/src/pixelrag_render/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def render_pdf(
dpi: int = 200,
pages: Optional[list[int]] = None,
quality: int = 85,
stem: str | None = None,
) -> list[Path]:
"""Render a PDF file to tiled JPEG images.

Expand All @@ -124,13 +125,14 @@ def render_pdf(
dpi: Rendering resolution (default 200 ≈ 1650×2200 for A4).
pages: 1-based list of page numbers to render. ``None`` renders all.
quality: JPEG quality 1-100 (default 85).
stem: Override for the tile directory name (default: PDF filename stem).

Returns:
List containing the tile directory Path on success.
"""
from .backends.pdf import render_pdf as _render_pdf

return _render_pdf(path, output_dir, dpi=dpi, pages=pages, quality=quality)
return _render_pdf(path, output_dir, dpi=dpi, pages=pages, quality=quality, stem=stem)


def render_file(
Expand Down
101 changes: 101 additions & 0 deletions tests/test_article_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Tests for the article_id manifest contract.

The pipeline writes article_id into tiles.json. chunk.py propagates it into
chunks.json. Both embedders (GPU scan_shard_chunks, CPU scan_chunks) read it
from the manifest, falling back to the directory name only for legacy indexes.
"""

import json
from pathlib import Path

from PIL import Image

from pixelrag_embed.chunk import chunk_article
from pixelrag_embed.embed import scan_shard_chunks
from pixelrag_embed.embed_cpu import scan_chunks


def _make_tile_dir(base: Path, dir_name: str, article_id: int | None = None) -> Path:
    """Create a tile dir with only tiles.json (as it exists right after rendering).

    Args:
        base: Parent directory under which the tile directory is created.
        dir_name: Name prefix; the directory is ``<dir_name>.png.tiles``.
        article_id: When not ``None``, stored under ``"article_id"`` in
            tiles.json. When ``None`` the key is omitted entirely.

    Returns:
        Path of the created tile directory.
    """
    td = base / f"{dir_name}.png.tiles"
    td.mkdir(parents=True)

    # One blank 875x500 RGB image stands in for a rendered tile.
    Image.new("RGB", (875, 500)).save(td / "tile_0000.png")

    meta = {"tiles": ["tile_0000.png"], "tile_height": 8192, "complete": True}
    if article_id is not None:
        # Explicit None check so that article_id=0 is still written.
        meta["article_id"] = article_id
    (td / "tiles.json").write_text(json.dumps(meta))
    return td


def _read_chunks_article_id(td: Path):
return json.loads((td / "chunks.json").read_text()).get("article_id")


# --- chunk.py propagates article_id from tiles.json into chunks.json ----------


def test_chunk_propagates_article_id_to_chunks_json(tmp_path):
    """chunk_article copies article_id from tiles.json into chunks.json."""
    # article_id=0 is deliberately falsy: a truthiness check instead of an
    # explicit ``is not None`` check would drop it and fail this test.
    td = _make_tile_dir(tmp_path, "report", article_id=0)
    chunk_article(str(td))
    # This is the real data flow the GPU embedder depends on.
    assert _read_chunks_article_id(td) == 0


def test_chunk_without_article_id_omits_it(tmp_path):
    """With no article_id in tiles.json, chunks.json must not contain the key."""
    td = _make_tile_dir(tmp_path, "5", article_id=None)
    chunk_article(str(td))
    manifest = json.loads((td / "chunks.json").read_text())
    # The key must be absent, not present with a null value.
    assert "article_id" not in manifest


# --- GPU embedder (scan_shard_chunks) reads it end-to-end --------------------


def test_gpu_scan_reads_propagated_article_id(tmp_path):
    """scan_shard_chunks reports the article_id stored in the manifest."""
    # Non-numeric dir name: only the manifest can supply the right id.
    td = _make_tile_dir(tmp_path, "report", article_id=3)
    chunk_article(str(td))
    chunks = scan_shard_chunks(str(tmp_path))
    # ``chunks and ...`` guards against all() passing vacuously on an empty scan.
    assert chunks and all(c.article_id == 3 for c in chunks)


def test_gpu_scan_falls_back_to_tiles_json(tmp_path):
    """scan_shard_chunks falls back to tiles.json when chunks.json lacks the id."""
    # chunks.json lacks article_id (legacy chunker) but tiles.json has it.
    td = _make_tile_dir(tmp_path, "report", article_id=4)
    chunk_article(str(td))

    # Simulate a legacy chunks.json by stripping the key that chunk_article
    # just wrote. pop() without a default raises KeyError if it was never
    # written, which surfaces a broken precondition.
    chunks_json = td / "chunks.json"
    meta = json.loads(chunks_json.read_text())
    meta.pop("article_id")
    chunks_json.write_text(json.dumps(meta))

    chunks = scan_shard_chunks(str(tmp_path))
    assert chunks and all(c.article_id == 4 for c in chunks)


def test_gpu_scan_falls_back_to_numeric_dir_name(tmp_path):
    """With no manifest article_id, the numeric directory name supplies the id."""
    # Neither tiles.json nor chunks.json carries an article_id here, so the
    # id can only come from the "42" prefix of "42.png.tiles".
    td = _make_tile_dir(tmp_path, "42", article_id=None)
    chunk_article(str(td))
    chunks = scan_shard_chunks(str(tmp_path))
    assert chunks and all(c.article_id == 42 for c in chunks)


# --- CPU embedder (scan_chunks) ---------------------------------------------


def test_cpu_scan_reads_article_id_from_manifest(tmp_path):
    """scan_chunks (CPU embedder) reports the article_id stored in the manifest."""
    # Non-numeric dir name, so the id cannot come from the directory name.
    td = _make_tile_dir(tmp_path, "report", article_id=7)
    chunk_article(str(td))
    items = scan_chunks(str(tmp_path))
    assert items and all(it["article_id"] == 7 for it in items)


def test_cpu_non_numeric_fallback_is_reproducible(tmp_path):
    """The non-numeric dir-name fallback id is a stable sha1-derived value."""
    # No manifest id, non-numeric dir → must be a *stable* hash, not the salted
    # builtin hash() (which changes per process via PYTHONHASHSEED and would make
    # the index non-reproducible). Assert the exact sha1-derived value so a
    # regression back to builtin hash() fails here.
    import hashlib

    td = _make_tile_dir(tmp_path, "my_report", article_id=None)
    chunk_article(str(td))
    items = scan_chunks(str(tmp_path))
    # Fail with a readable assertion rather than an IndexError on an empty scan.
    assert items, "scan_chunks returned no items"
    expected = int(hashlib.sha1(b"my_report").hexdigest()[:8], 16)
    # Every chunk of the article must carry the same fallback id.
    assert all(it["article_id"] == expected for it in items)
Loading