From 1d447d2c82d30b74bfaecb15f6d0b0d7fc5229a2 Mon Sep 17 00:00:00 2001 From: Bacem Date: Mon, 8 Jun 2026 18:15:19 +0200 Subject: [PATCH] feat: ingestion, summariser, signals, fusion, and pipeline --- sumlens/fuse.py | 94 +++++++++++++++++++++ sumlens/ingest.py | 74 ++++++++++++++++ sumlens/pipeline.py | 111 ++++++++++++++++++++++++ sumlens/signals/__init__.py | 0 sumlens/signals/attribution.py | 94 +++++++++++++++++++++ sumlens/signals/classifier.py | 57 +++++++++++++ sumlens/signals/nli.py | 87 +++++++++++++++++++ sumlens/summarise.py | 46 ++++++++++ tests/test_attribution.py | 75 +++++++++++++++++ tests/test_classifier.py | 83 ++++++++++++++++++ tests/test_fuse.py | 87 +++++++++++++++++++ tests/test_ingest.py | 72 ++++++++++++++++ tests/test_nli.py | 95 +++++++++++++++++++++ tests/test_pipeline.py | 149 +++++++++++++++++++++++++++++++++ tests/test_summarise.py | 61 ++++++++++++++ 15 files changed, 1185 insertions(+) create mode 100644 sumlens/fuse.py create mode 100644 sumlens/ingest.py create mode 100644 sumlens/pipeline.py create mode 100644 sumlens/signals/__init__.py create mode 100644 sumlens/signals/attribution.py create mode 100644 sumlens/signals/classifier.py create mode 100644 sumlens/signals/nli.py create mode 100644 sumlens/summarise.py create mode 100644 tests/test_attribution.py create mode 100644 tests/test_classifier.py create mode 100644 tests/test_fuse.py create mode 100644 tests/test_ingest.py create mode 100644 tests/test_nli.py create mode 100644 tests/test_pipeline.py create mode 100644 tests/test_summarise.py diff --git a/sumlens/fuse.py b/sumlens/fuse.py new file mode 100644 index 0000000..5f5c002 --- /dev/null +++ b/sumlens/fuse.py @@ -0,0 +1,94 @@ +"""Signal fusion, calibration, and labelling. + +Fusion combines the three sentence-level signals into one grounding probability. +If a trained model is present at `model_path` it is used; otherwise we fall back to +an identity fusion (mean of grounding-oriented signals) so the pipeline still runs +without weights (CI, fresh checkout). Same idea for Platt calibration. + +Convention: class 1 = **grounded**. The fusion model is trained with `grounded` +labels (1 if the summary sentence is grounded, 0 if hallucinated), so its +`predict_proba[:, 1]` is the grounding score that `label` thresholds. Feature order +is `FEATURE_ORDER`; missing signals are imputed to neutral 0.5. + +Scikit-learn is imported lazily so the pipeline does not depend on it unless a +trained model is actually loaded or fitted. +""" + +import pickle +from pathlib import Path +from typing import Any, Literal + +from sumlens.types import AnalysisConfig, SignalScores + +_NEUTRAL = 0.5 +FEATURE_ORDER = ("classifier", "nli", "attribution") + + +def fuse(signals: dict[str, SignalScores], model_path: Path) -> dict[str, float]: + if not model_path.exists(): + return {sentence_id: _grounding(score) for sentence_id, score in signals.items()} + model = _load(model_path) + ids = list(signals) + features = [_feature_vector(signals[i]) for i in ids] + grounded_proba = model.predict_proba(features)[:, 1] + return {i: float(p) for i, p in zip(ids, grounded_proba, strict=True)} + + +def calibrate(scores: dict[str, float], platt_path: Path) -> dict[str, float]: + if not platt_path.exists(): + return dict(scores) + platt = _load(platt_path) + ids = list(scores) + calibrated = platt.predict_proba([[scores[i]] for i in ids])[:, 1] + return {i: float(c) for i, c in zip(ids, calibrated, strict=True)} + + +def label(score: float, cfg: AnalysisConfig) -> Literal["grounded", "weak", "hallucinated"]: + if score < cfg.tau_hallucinated: + return "hallucinated" + if score >= cfg.tau_grounded: + return "grounded" + return "weak" + + +def fit_fusion(features: list[list[float]], grounded: list[int]) -> Any: + """Fit the fusion LogisticRegression. `grounded` = 1 if grounded, 0 if hallucinated.""" + from sklearn.linear_model import LogisticRegression + + model = LogisticRegression(max_iter=1000) + model.fit(features, grounded) + return model + + +def fit_platt(scores: list[float], grounded: list[int]) -> Any: + """Fit a 1-D Platt calibrator mapping a fused score to a calibrated grounding prob.""" + from sklearn.linear_model import LogisticRegression + + model = LogisticRegression(max_iter=1000) + model.fit([[s] for s in scores], grounded) + return model + + +def _feature_vector(signals: SignalScores) -> list[float]: + """Signals in FEATURE_ORDER; missing values imputed to neutral 0.5.""" + values = signals.model_dump() + return [_NEUTRAL if values[name] is None else float(values[name]) for name in FEATURE_ORDER] + + +def _grounding(signals: SignalScores) -> float: + """Identity fusion: mean of available grounding-oriented signals; 0.5 if none.""" + contributions = [] + if signals.classifier is not None: + contributions.append(1.0 - signals.classifier) + if signals.nli is not None: + contributions.append(signals.nli) + if signals.attribution is not None: + contributions.append(signals.attribution) + if not contributions: + return _NEUTRAL + return sum(contributions) / len(contributions) + + +def _load(path: Path) -> Any: + with path.open("rb") as fh: + return pickle.load(fh) diff --git a/sumlens/ingest.py b/sumlens/ingest.py new file mode 100644 index 0000000..cb48efb --- /dev/null +++ b/sumlens/ingest.py @@ -0,0 +1,74 @@ +"""Ingestion — PDF or raw text into a `Document`. + +PDF text is extracted with pdfplumber. Text is cleaned and paragraph-segmented on +blank lines, then split into sentences with NLTK Punkt. Sentence ids are stable +`src-0000`, `src-0001`, ... and carry char offsets into `Document.raw_text`. +""" + +import re +from pathlib import Path +from typing import Any + +import nltk +import pdfplumber + +from sumlens.types import Document, Sentence + +_BLANK_LINE = re.compile(r"\n\s*\n") +_WHITESPACE = re.compile(r"\s+") + + +def load_pdf(path: Path) -> Document: + with pdfplumber.open(path) as pdf: + pages = [page.extract_text() or "" for page in pdf.pages] + raw_text = _clean("\n\n".join(pages)) + meta: dict[str, Any] = {"filename": path.name, "word_count": _word_count(raw_text)} + return Document( + id=path.stem, + raw_text=raw_text, + sentences=split_sentences(raw_text, "src"), + source="pdf", + meta=meta, + ) + + +def load_text(text: str) -> Document: + raw_text = _clean(text) + meta: dict[str, Any] = {"word_count": _word_count(raw_text)} + return Document( + id="text", + raw_text=raw_text, + sentences=split_sentences(raw_text, "src"), + source="text", + meta=meta, + ) + + +def _clean(text: str) -> str: + """Collapse each blank-line-delimited paragraph onto one line; join with \\n\\n.""" + paragraphs = [] + for para in _BLANK_LINE.split(text): + collapsed = _WHITESPACE.sub(" ", para).strip() + if collapsed: + paragraphs.append(collapsed) + return "\n\n".join(paragraphs) + + +def split_sentences(text: str, id_prefix: str) -> list[Sentence]: + """NLTK Punkt sentence split with char offsets; ids `{id_prefix}-0000`, ...""" + if not text: + return [] + tokenizer = nltk.data.load("tokenizers/punkt/english.pickle") + return [ + Sentence( + id=f"{id_prefix}-{i:04d}", + text=text[start:end], + char_start=start, + char_end=end, + ) + for i, (start, end) in enumerate(tokenizer.span_tokenize(text)) + ] + + +def _word_count(raw_text: str) -> int: + return len(raw_text.split()) diff --git a/sumlens/pipeline.py b/sumlens/pipeline.py new file mode 100644 index 0000000..3667abe --- /dev/null +++ b/sumlens/pipeline.py @@ -0,0 +1,111 @@ +"""Pipeline orchestration — Document into a full AnalysisResult. + +Stages: summarise -> signal A (classifier) and signal B (NLI) -> gate signal C +(attribution) on the sentences A or B flag as suspicious -> fuse -> calibrate -> +label -> assemble evidence. Each stage is timed into `timings_ms`. + +A and B run sequentially here; the data-model calls for them "in parallel", which +is a latency optimisation, not a correctness requirement, so it is deferred. +""" + +import time +from collections.abc import Callable +from pathlib import Path +from typing import TypeVar + +from sumlens.fuse import calibrate, fuse, label +from sumlens.signals.attribution import attribute +from sumlens.signals.classifier import classify +from sumlens.signals.nli import entail, extract_claims +from sumlens.summarise import summarise +from sumlens.types import ( + AnalysisConfig, + AnalysisResult, + Claim, + Document, + Evidence, + SentenceVerdict, + SignalScores, + Summary, +) + +_MODELS_DIR = Path(__file__).resolve().parent.parent / "models" +_FUSION_MODEL_PATH = _MODELS_DIR / "fusion.pkl" +_PLATT_MODEL_PATH = _MODELS_DIR / "platt.pkl" +_C_GATE = 0.5 # run attribution where classifier >= gate (A high) or nli < gate (B low) + +T = TypeVar("T") + + +def analyse(document: Document, cfg: AnalysisConfig) -> AnalysisResult: + timings: dict[str, int] = {} + + summary = _timed(timings, "summarise", lambda: summarise(document, cfg)) + classifier_out = _timed(timings, "classify", lambda: classify(document, summary, cfg)) + nli_out = _timed(timings, "nli", lambda: entail(extract_claims(summary), document, cfg)) + + gated = _gated_summary(summary, classifier_out, nli_out) + attribution_out = _timed(timings, "attribute", lambda: attribute(document, gated, cfg)) + + signals: dict[str, SignalScores] = {} + evidence_parts: dict[str, tuple[list[tuple[int, int]], list[Claim], list[str]]] = {} + for sentence in summary.sentences: + a_score, a_spans = classifier_out[sentence.id] + b_score, b_failed = nli_out.get(sentence.id, (None, [])) + c_peak, c_top = attribution_out.get(sentence.id, (None, [])) + signals[sentence.id] = SignalScores(classifier=a_score, nli=b_score, attribution=c_peak) + evidence_parts[sentence.id] = (a_spans, b_failed, c_top) + + fused = _timed( + timings, + "fuse", + lambda: calibrate(fuse(signals, _FUSION_MODEL_PATH), _PLATT_MODEL_PATH), + ) + + verdicts = [] + for sentence in summary.sentences: + a_spans, b_failed, c_top = evidence_parts[sentence.id] + score = fused[sentence.id] + verdicts.append( + SentenceVerdict( + sentence_id=sentence.id, + fused_score=score, + label=label(score, cfg), + signals=signals[sentence.id], + evidence=Evidence( + failed_claims=b_failed, + top_source_sentence_ids=c_top, + classifier_token_spans=a_spans, + ), + ) + ) + + return AnalysisResult( + document=document, + summary=summary, + verdicts=verdicts, + config=cfg, + timings_ms=timings, + ) + + +def _gated_summary( + summary: Summary, + classifier_out: dict[str, tuple[float, list[tuple[int, int]]]], + nli_out: dict[str, tuple[float, list[Claim]]], +) -> Summary: + """Keep only sentences A or B flag as suspicious — attribution runs on these.""" + suspicious = [] + for sentence in summary.sentences: + a_score = classifier_out[sentence.id][0] + b_score = nli_out.get(sentence.id, (None, []))[0] + if a_score >= _C_GATE or (b_score is not None and b_score < _C_GATE): + suspicious.append(sentence) + return summary.model_copy(update={"sentences": suspicious}) + + +def _timed(timings: dict[str, int], name: str, fn: Callable[[], T]) -> T: + start = time.perf_counter() + result = fn() + timings[name] = int((time.perf_counter() - start) * 1000) + return result diff --git a/sumlens/signals/__init__.py b/sumlens/signals/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sumlens/signals/attribution.py b/sumlens/signals/attribution.py new file mode 100644 index 0000000..f28034e --- /dev/null +++ b/sumlens/signals/attribution.py @@ -0,0 +1,94 @@ +"""Signal C — Inseq integrated-gradients source attribution. + +Integrated gradients on the *same* model that produced the summary. For each +summary sentence we get token-level attribution back onto the source text, map +each source token to the source sentence containing it, sum the (absolute) +attribution mass per source sentence and normalise. The peak normalised mass is +the sentence's attribution score; the top-k source sentences are its supporting +spans. + +The pipeline gates this signal (runs it only on sentences flagged by A or B) for +speed — see `data-model.md` §3. The Inseq computation is isolated in +`_source_token_attributions`, which tests mock at the module boundary. +""" + +import re +from functools import lru_cache +from typing import Any + +from sumlens.types import AnalysisConfig, Document, Sentence, Summary + +_N_STEPS = 40 +_TOP_K = 5 +_SUBWORD_PREFIX = re.compile(r"^[Ġ▁]") # GPT-2 'Ġ' and SentencePiece '▁' + + +def attribute( + document: Document, summary: Summary, cfg: AnalysisConfig +) -> dict[str, tuple[float, list[str]]]: + results: dict[str, tuple[float, list[str]]] = {} + for sentence in summary.sentences: + token_attrs = _source_token_attributions(document.raw_text, sentence.text, cfg) + per_source = _aggregate_to_source_sentences(token_attrs, document.sentences) + if per_source: + peak = max(per_source.values()) + top_ids = sorted(per_source, key=lambda sid: per_source[sid], reverse=True)[:_TOP_K] + else: + peak, top_ids = 0.0, [] + results[sentence.id] = (peak, top_ids) + return results + + +def _aggregate_to_source_sentences( + token_attrs: list[tuple[int, int, float]], source_sentences: list[Sentence] +) -> dict[str, float]: + """Sum |score| of each source token into its sentence; normalise to sum 1.""" + masses: dict[str, float] = {} + for start, end, score in token_attrs: + mid = (start + end) // 2 + for sentence in source_sentences: + if sentence.char_start <= mid < sentence.char_end: + masses[sentence.id] = masses.get(sentence.id, 0.0) + abs(score) + break + total = sum(masses.values()) + if total <= 0: + return {} + return {sid: mass / total for sid, mass in masses.items()} + + +def _source_token_attributions( + source_text: str, target_text: str, cfg: AnalysisConfig +) -> list[tuple[int, int, float]]: + """Run Inseq and return source-token (char_start, char_end, score) records. + + This is the Inseq boundary — mocked in tests, verified against real weights on + HPC. Token char offsets are reconstructed by walking `source_text` because + Inseq tokens carry subword strings, not source offsets. + """ + import numpy as np + + model = _get_attributor(cfg.summariser, cfg.attribution_method) + out = model.attribute(source_text, target_text, n_steps=_N_STEPS, show_progress=False) + seq = out.sequence_attributions[0] + matrix = np.asarray(seq.source_attributions, dtype=float) + per_token = matrix.sum(axis=tuple(range(1, matrix.ndim))) if matrix.ndim > 1 else matrix + + records: list[tuple[int, int, float]] = [] + cursor = 0 + for token, score in zip(seq.source, per_token, strict=False): + text = _SUBWORD_PREFIX.sub("", token.token) + if not text: + continue + idx = source_text.find(text, cursor) + if idx < 0: + continue + records.append((idx, idx + len(text), float(score))) + cursor = idx + len(text) + return records + + +@lru_cache(maxsize=1) +def _get_attributor(model_name: str, method: str) -> Any: + import inseq + + return inseq.load_model(model_name, method) diff --git a/sumlens/signals/classifier.py b/sumlens/signals/classifier.py new file mode 100644 index 0000000..7b874d8 --- /dev/null +++ b/sumlens/signals/classifier.py @@ -0,0 +1,57 @@ +"""Signal A — LettuceDetect hallucination classifier wrapper. + +Thin wrapper around `lettucedetect`. For each summary sentence we run the detector +with the source document as context and the sentence as the answer. The score is +the mean of the top-k per-token hallucination probabilities (`output_format= +"tokens"` gives `{token, pred, prob}`); the char spans come from `output_format= +"spans"` (`{start, end, confidence, text}`), which are offsets within the summary +sentence. Two calls because token output carries no char offsets (verified against +real weights). +""" + +from functools import lru_cache +from typing import Any + +from sumlens.types import AnalysisConfig, Document, Summary + +_TOP_K = 3 + + +def classify( + document: Document, summary: Summary, cfg: AnalysisConfig +) -> dict[str, tuple[float, list[tuple[int, int]]]]: + detector = _get_detector(cfg.classifier_model) + results: dict[str, tuple[float, list[tuple[int, int]]]] = {} + for sentence in summary.sentences: + tokens = detector.predict( + context=[document.raw_text], + question="", + answer=sentence.text, + output_format="tokens", + ) + spans = detector.predict( + context=[document.raw_text], + question="", + answer=sentence.text, + output_format="spans", + ) + score = _aggregate(tokens) + token_spans = [(s["start"], s["end"]) for s in spans] + results[sentence.id] = (score, token_spans) + return results + + +def _aggregate(tokens: list[dict[str, Any]]) -> float: + """Mean of the top-k token hallucination probabilities (0.0 if no tokens).""" + probs = sorted((float(t["prob"]) for t in tokens), reverse=True) + if not probs: + return 0.0 + top = probs[:_TOP_K] + return sum(top) / len(top) + + +@lru_cache(maxsize=1) +def _get_detector(model_path: str) -> Any: + from lettucedetect.models.inference import HallucinationDetector + + return HallucinationDetector(method="transformer", model_path=model_path) diff --git a/sumlens/signals/nli.py b/sumlens/signals/nli.py new file mode 100644 index 0000000..aa42c13 --- /dev/null +++ b/sumlens/signals/nli.py @@ -0,0 +1,87 @@ +"""Signal B — atomic-claim NLI against the source. + +Claims start simple: each summary sentence is split into clauses on coordinating +conjunctions, and each clause is treated as an atomic claim. For each claim we run +NLI against every source sentence and keep the max entailment probability +(SummaC-Conv style). Per summary sentence the score is the min over its claims +(the weakest claim wins); claims whose entailment falls below `_ENTAIL_FAIL` are +returned as the failing claims for the UI. +""" + +import re +from collections import defaultdict +from functools import lru_cache +from typing import Any + +from sumlens.types import AnalysisConfig, Claim, Document, Summary + +_ENTAIL_FAIL = 0.5 +_BATCH_SIZE = 64 +_CLAUSE = re.compile(r",?\s+(?:and|but|or|however|whereas|while)\s+|\s*;\s*", re.IGNORECASE) + + +def extract_claims(summary: Summary) -> list[Claim]: + claims: list[Claim] = [] + for sentence in summary.sentences: + for n, clause in enumerate(_split_clauses(sentence.text), start=1): + claims.append( + Claim(id=f"{sentence.id}-claim-{n}", sentence_id=sentence.id, text=clause) + ) + return claims + + +def entail( + claims: list[Claim], document: Document, cfg: AnalysisConfig +) -> dict[str, tuple[float, list[Claim]]]: + nli = _get_nli(cfg.nli_model) + sources = [s.text for s in document.sentences] + scored: dict[str, list[tuple[Claim, float]]] = defaultdict(list) + + if claims and sources: + # One batched NLI call over every (source, claim) pair, then reduce + # max-over-sources per claim. Batching is far faster than per-pair calls + # on GPU; the resulting scores are identical. + pairs = [ + {"text": src, "text_pair": claim.text} for claim in claims for src in sources + ] + batched = nli(pairs, top_k=None, batch_size=_BATCH_SIZE) + n = len(sources) + for i, claim in enumerate(claims): + prob = max( + (_entail_prob(scores) for scores in batched[i * n : (i + 1) * n]), + default=0.0, + ) + scored[claim.sentence_id].append((claim, prob)) + elif claims: + for claim in claims: + scored[claim.sentence_id].append((claim, 0.0)) + + results: dict[str, tuple[float, list[Claim]]] = {} + for sentence_id, claim_scores in scored.items(): + sentence_score = min(p for _, p in claim_scores) + failed = [c for c, p in claim_scores if p < _ENTAIL_FAIL] + results[sentence_id] = (sentence_score, failed) + return results + + +def _split_clauses(text: str) -> list[str]: + parts = [p.strip() for p in _CLAUSE.split(text)] + parts = [p for p in parts if p] + return parts or [text.strip()] + + +def _entail_prob(scores: list[dict[str, Any]]) -> float: + return next((s["score"] for s in scores if "entail" in s["label"].lower()), 0.0) + + +@lru_cache(maxsize=1) +def _get_nli(model_name: str) -> Any: + import torch + from transformers import pipeline + + on_gpu = torch.cuda.is_available() + kwargs: dict[str, Any] = {"device": 0 if on_gpu else -1} + if on_gpu: + # bf16 ~2x faster on L40S/Ada; entailment scores drift negligibly. + kwargs["torch_dtype"] = torch.bfloat16 + return pipeline("text-classification", model=model_name, **kwargs) diff --git a/sumlens/summarise.py b/sumlens/summarise.py new file mode 100644 index 0000000..e5afdf0 --- /dev/null +++ b/sumlens/summarise.py @@ -0,0 +1,46 @@ +"""Summarisation — Document into a Summary via a local transformers pipeline. + +The model runs locally (no external inference API). The pipeline is built lazily +and cached so tests can mock `_get_summariser` at the module boundary and never +load weights. The output summary is re-tokenised with NLTK Punkt into sentences +with stable ids `sum-0000`, `sum-0001`, ... +""" + +from functools import lru_cache +from typing import Any + +from sumlens.ingest import split_sentences +from sumlens.types import AnalysisConfig, Document, Summary + + +def summarise(document: Document, cfg: AnalysisConfig) -> Summary: + summariser = _get_summariser(cfg.summariser) + max_length, min_length = _length_bounds(cfg.summary_target_words) + output = summariser( + document.raw_text, + max_length=max_length, + min_length=min_length, + truncation=True, + ) + text = output[0]["summary_text"].strip() + return Summary( + id=f"{document.id}-summary", + document_id=document.id, + text=text, + sentences=split_sentences(text, "sum"), + model_name=cfg.summariser, + ) + + +def _length_bounds(target_words: int) -> tuple[int, int]: + """Words to a token max/min band (~1.3 tokens/word; min at ~60% of target).""" + return int(target_words * 1.3), int(target_words * 0.6) + + +@lru_cache(maxsize=1) +def _get_summariser(model_name: str) -> Any: + import torch + from transformers import pipeline + + device = 0 if torch.cuda.is_available() else -1 + return pipeline("summarization", model=model_name, device=device) diff --git a/tests/test_attribution.py b/tests/test_attribution.py new file mode 100644 index 0000000..02ad97d --- /dev/null +++ b/tests/test_attribution.py @@ -0,0 +1,75 @@ +"""Attribution (signal C) tests — Inseq mocked at the `_source_token_attributions` boundary.""" + +import pytest + +from sumlens.signals import attribution as attribution_mod +from sumlens.signals.attribution import _aggregate_to_source_sentences, attribute +from sumlens.types import AnalysisConfig, Document, Sentence, Summary + +# raw_text = "Alpha beta. Gamma delta. Epsilon zeta." +# 0 12 25 +_SOURCE = [ + Sentence(id="src-0000", text="Alpha beta.", char_start=0, char_end=11), + Sentence(id="src-0001", text="Gamma delta.", char_start=12, char_end=24), + Sentence(id="src-0002", text="Epsilon zeta.", char_start=25, char_end=38), +] + +# (char_start, char_end, score) source-token records per target sentence text. +# Negative score on src-0001 exercises the abs-mass aggregation. +_ATTRS: dict[str, list[tuple[int, int, float]]] = { + "Heavily grounded.": [(0, 5, 0.2), (12, 17, -0.6), (25, 32, 0.2)], + "No support.": [], +} + + +def _document() -> Document: + return Document( + id="doc-1", + raw_text="Alpha beta. Gamma delta. Epsilon zeta.", + sentences=_SOURCE, + source="text", + ) + + +def _summary() -> Summary: + return Summary( + id="doc-1-summary", + document_id="doc-1", + text="Heavily grounded. No support.", + sentences=[ + Sentence(id="sum-0000", text="Heavily grounded.", char_start=0, char_end=17), + Sentence(id="sum-0001", text="No support.", char_start=18, char_end=29), + ], + model_name="facebook/bart-large-cnn", + ) + + +def test_attribute(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + attribution_mod, + "_source_token_attributions", + lambda source_text, target_text, cfg: _ATTRS[target_text], + ) + + result = attribute(_document(), _summary(), AnalysisConfig()) + + peak, top_ids = result["sum-0000"] + # abs masses 0.2 / 0.6 / 0.2 -> normalised 0.2 / 0.6 / 0.2; peak = 0.6 + assert peak == pytest.approx(0.6) + assert top_ids == ["src-0001", "src-0000", "src-0002"] + + assert result["sum-0001"] == (0.0, []) + + +def test_aggregate_normalises_to_one() -> None: + masses = _aggregate_to_source_sentences([(0, 5, 0.2), (12, 17, -0.6), (25, 32, 0.2)], _SOURCE) + assert sum(masses.values()) == pytest.approx(1.0) + assert masses == { + "src-0000": pytest.approx(0.2), + "src-0001": pytest.approx(0.6), + "src-0002": pytest.approx(0.2), + } + + +def test_aggregate_no_tokens() -> None: + assert _aggregate_to_source_sentences([], _SOURCE) == {} diff --git a/tests/test_classifier.py b/tests/test_classifier.py new file mode 100644 index 0000000..8317962 --- /dev/null +++ b/tests/test_classifier.py @@ -0,0 +1,83 @@ +"""Classifier (signal A) tests — LettuceDetect mocked at the `_get_detector` boundary.""" + +import pytest + +from sumlens.signals import classifier as classifier_mod +from sumlens.signals.classifier import _aggregate, classify +from sumlens.types import AnalysisConfig, Document, Sentence, Summary + +# LettuceDetect "tokens" output: {token, pred, prob} — no char offsets. +_GROUNDED_TOKENS: list[dict[str, object]] = [ + {"token": "a", "pred": 0, "prob": 0.05}, + {"token": "b", "pred": 0, "prob": 0.02}, +] +_HALLUCINATED_TOKENS: list[dict[str, object]] = [ + {"token": "a", "pred": 1, "prob": 0.91}, + {"token": "b", "pred": 1, "prob": 0.84}, + {"token": "c", "pred": 0, "prob": 0.10}, +] +# "spans" output: {start, end, confidence, text} — the char offsets. +_HALLUCINATED_SPANS: list[dict[str, object]] = [ + {"start": 0, "end": 4, "confidence": 0.9, "text": "Inve"}, + {"start": 5, "end": 9, "confidence": 0.8, "text": "nted"}, +] + + +class _FakeDetector: + def predict( + self, *, context: list[str], question: str, answer: str, output_format: str + ) -> list[dict[str, object]]: + grounded = answer == "Grounded claim here." + if output_format == "spans": + return [] if grounded else _HALLUCINATED_SPANS + return _GROUNDED_TOKENS if grounded else _HALLUCINATED_TOKENS + + +def _summary() -> Summary: + return Summary( + id="doc-1-summary", + document_id="doc-1", + text="Grounded claim here. Invented figure cited.", + sentences=[ + Sentence(id="sum-0000", text="Grounded claim here.", char_start=0, char_end=20), + Sentence(id="sum-0001", text="Invented figure cited.", char_start=21, char_end=43), + ], + model_name="facebook/bart-large-cnn", + ) + + +def _document() -> Document: + return Document(id="doc-1", raw_text="Some source text.", sentences=[], source="text") + + +def test_classify(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(classifier_mod, "_get_detector", lambda model_path: _FakeDetector()) + + result = classify(_document(), _summary(), AnalysisConfig()) + + assert set(result) == {"sum-0000", "sum-0001"} + + grounded_score, grounded_spans = result["sum-0000"] + assert grounded_score == pytest.approx((0.05 + 0.02) / 2) + assert grounded_spans == [] + + halluc_score, halluc_spans = result["sum-0001"] + assert halluc_score == pytest.approx((0.91 + 0.84 + 0.10) / 3) + assert halluc_spans == [(0, 4), (5, 9)] + + +def test_classify_empty_summary(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(classifier_mod, "_get_detector", lambda model_path: _FakeDetector()) + summary = Summary( + id="s", document_id="doc-1", text="", sentences=[], model_name="m" + ) + assert classify(_document(), summary, AnalysisConfig()) == {} + + +def test_aggregate_uses_top_k() -> None: + tokens = [{"prob": p} for p in (0.9, 0.8, 0.7, 0.6, 0.1)] + assert _aggregate(tokens) == pytest.approx((0.9 + 0.8 + 0.7) / 3) + + +def test_aggregate_empty() -> None: + assert _aggregate([]) == 0.0 diff --git a/tests/test_fuse.py b/tests/test_fuse.py new file mode 100644 index 0000000..9a366e9 --- /dev/null +++ b/tests/test_fuse.py @@ -0,0 +1,87 @@ +"""Fusion tests — identity fallback, trained model path, calibration, labelling.""" + +import pickle +from pathlib import Path + +import pytest + +from sumlens.fuse import _feature_vector, calibrate, fit_fusion, fit_platt, fuse, label +from sumlens.types import AnalysisConfig, SignalScores + +_UNUSED = Path("does-not-exist.pkl") + + +def test_fuse_all_signals() -> None: + signals = {"sum-0000": SignalScores(classifier=0.2, nli=0.6, attribution=0.7)} + # grounding: (1 - 0.2) + 0.6 + 0.7 = 2.1 / 3 = 0.7 + assert fuse(signals, _UNUSED)["sum-0000"] == pytest.approx(0.7) + + +def test_fuse_partial_signals() -> None: + signals = {"sum-0000": SignalScores(classifier=0.1, nli=None, attribution=0.5)} + # (1 - 0.1) + 0.5 = 1.4 / 2 = 0.7 + assert fuse(signals, _UNUSED)["sum-0000"] == pytest.approx(0.7) + + +def test_fuse_no_signals_is_neutral() -> None: + signals = {"sum-0000": SignalScores(classifier=None, nli=None, attribution=None)} + assert fuse(signals, _UNUSED)["sum-0000"] == pytest.approx(0.5) + + +def test_calibrate_is_passthrough() -> None: + scores = {"sum-0000": 0.42, "sum-0001": 0.91} + assert calibrate(scores, _UNUSED) == scores + + +@pytest.mark.parametrize( + ("score", "expected"), + [ + (0.0, "hallucinated"), + (0.29, "hallucinated"), + (0.30, "weak"), # boundary: not < tau_hallucinated + (0.50, "weak"), + (0.69, "weak"), + (0.70, "grounded"), # boundary: >= tau_grounded + (1.0, "grounded"), + ], +) +def test_label_thresholds(score: float, expected: str) -> None: + assert label(score, AnalysisConfig()) == expected + + +def test_feature_vector_imputes_missing() -> None: + assert _feature_vector(SignalScores(classifier=0.2, nli=None, attribution=0.7)) == [ + 0.2, + 0.5, + 0.7, + ] + + +# Separable toy data: grounded (1) = low classifier, high nli/attribution; flipped for 0. +_X = [[0.1, 0.9, 0.9], [0.05, 0.95, 0.85], [0.9, 0.1, 0.1], [0.95, 0.2, 0.05]] * 5 +_Y = [1, 1, 0, 0] * 5 + + +def test_fuse_uses_trained_model(tmp_path: Path) -> None: + model = fit_fusion(_X, _Y) + path = tmp_path / "fusion.pkl" + with path.open("wb") as fh: + pickle.dump(model, fh) + + signals = { + "g": SignalScores(classifier=0.1, nli=0.9, attribution=0.9), + "h": SignalScores(classifier=0.9, nli=0.1, attribution=0.1), + } + out = fuse(signals, path) + assert out["g"] > 0.5 > out["h"] # grounded scores higher than hallucinated + + +def test_calibrate_uses_trained_platt(tmp_path: Path) -> None: + platt = fit_platt([0.1, 0.2, 0.8, 0.9] * 5, [0, 0, 1, 1] * 5) + path = tmp_path / "platt.pkl" + with path.open("wb") as fh: + pickle.dump(platt, fh) + + out = calibrate({"a": 0.85, "b": 0.15}, path) + assert 0.0 <= out["a"] <= 1.0 + assert out["a"] > out["b"] diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..487e8b8 --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,72 @@ +"""Ingestion tests — fixture string and fixture PDF. + +Checks paragraph segmentation, sentence count, stable ids, and that every +sentence's char offsets reconstruct its text from `Document.raw_text`. +""" + +from pathlib import Path + +from fpdf import FPDF + +from sumlens.ingest import load_pdf, load_text +from sumlens.types import Document + + +def _assert_offsets_reconstruct(doc: Document) -> None: + for sent in doc.sentences: + assert doc.raw_text[sent.char_start : sent.char_end] == sent.text + + +def test_load_text_segments_paragraphs_and_sentences() -> None: + text = "The bill passed. It allocates funds.\n\nA second paragraph here." + doc = load_text(text) + + assert doc.source == "text" + assert doc.raw_text.split("\n\n") == [ + "The bill passed. It allocates funds.", + "A second paragraph here.", + ] + assert [s.text for s in doc.sentences] == [ + "The bill passed.", + "It allocates funds.", + "A second paragraph here.", + ] + assert [s.id for s in doc.sentences] == ["src-0000", "src-0001", "src-0002"] + assert doc.meta["word_count"] == 10 + _assert_offsets_reconstruct(doc) + + +def test_load_text_collapses_internal_whitespace() -> None: + doc = load_text("A line\nwrapped awkwardly.\n\n\n Next para. ") + + assert doc.raw_text == "A line wrapped awkwardly.\n\nNext para." + _assert_offsets_reconstruct(doc) + + +def test_load_text_empty_has_no_sentences() -> None: + doc = load_text(" \n\n ") + + assert doc.raw_text == "" + assert doc.sentences == [] + assert doc.meta["word_count"] == 0 + + +def test_load_pdf(tmp_path: Path) -> None: + pdf = FPDF() + pdf.add_page() + pdf.set_font("Helvetica", size=12) + pdf.multi_cell(0, 10, text="The bill passed. It allocates funds.") + path = tmp_path / "report.pdf" + pdf.output(str(path)) + + doc = load_pdf(path) + + assert doc.source == "pdf" + assert doc.id == "report" + assert doc.meta["filename"] == "report.pdf" + assert [s.text for s in doc.sentences] == [ + "The bill passed.", + "It allocates funds.", + ] + assert [s.id for s in doc.sentences] == ["src-0000", "src-0001"] + _assert_offsets_reconstruct(doc) diff --git a/tests/test_nli.py b/tests/test_nli.py new file mode 100644 index 0000000..0c02ce5 --- /dev/null +++ b/tests/test_nli.py @@ -0,0 +1,95 @@ +"""NLI (signal B) tests — NLI model mocked at the `_get_nli` boundary.""" + +import pytest + +from sumlens.signals import nli as nli_mod +from sumlens.signals.nli import _split_clauses, entail, extract_claims +from sumlens.types import AnalysisConfig, Claim, Document, Sentence, Summary + + +def _summary(text: str) -> Summary: + return Summary( + id="doc-1-summary", + document_id="doc-1", + text=text, + sentences=[Sentence(id="sum-0000", text=text, char_start=0, char_end=len(text))], + model_name="m", + ) + + +def test_extract_claims_splits_on_conjunction() -> None: + claims = extract_claims(_summary("The bill passed and funds were allocated.")) + assert [(c.id, c.text) for c in claims] == [ + ("sum-0000-claim-1", "The bill passed"), + ("sum-0000-claim-2", "funds were allocated."), + ] + assert all(c.sentence_id == "sum-0000" for c in claims) + + +def test_extract_claims_single_clause() -> None: + claims = extract_claims(_summary("The bill passed today.")) + assert [(c.id, c.text) for c in claims] == [("sum-0000-claim-1", "The bill passed today.")] + + +def test_split_clauses_comma_and() -> None: + assert _split_clauses("X happened, and Y followed") == ["X happened", "Y followed"] + assert _split_clauses("only one") == ["only one"] + + +# Entailment lookup: (premise, hypothesis) -> entailment prob. +_TABLE = { + ("Src A.", "The bill passed"): 0.8, + ("Src B.", "The bill passed"): 0.4, + ("Src A.", "funds were allocated"): 0.2, + ("Src B.", "funds were allocated"): 0.3, +} + + +class _FakeNLI: + def __call__( + self, pairs: list[dict[str, str]], top_k: object = None, batch_size: object = None + ) -> list[list[dict[str, object]]]: + out: list[list[dict[str, object]]] = [] + for pair in pairs: + ent = _TABLE[(pair["text"], pair["text_pair"])] + out.append( + [ + {"label": "entailment", "score": ent}, + {"label": "neutral", "score": 1.0 - ent}, + {"label": "contradiction", "score": 0.0}, + ] + ) + return out + + +def _document() -> Document: + return Document( + id="doc-1", + raw_text="Src A. Src B.", + sentences=[ + Sentence(id="src-0000", text="Src A.", char_start=0, char_end=6), + Sentence(id="src-0001", text="Src B.", char_start=7, char_end=13), + ], + source="text", + ) + + +def test_entail_max_over_sources_min_over_claims(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(nli_mod, "_get_nli", lambda model_name: _FakeNLI()) + claims = [ + Claim(id="sum-0000-claim-1", sentence_id="sum-0000", text="The bill passed"), + Claim(id="sum-0000-claim-2", sentence_id="sum-0000", text="funds were allocated"), + ] + + result = entail(claims, _document(), AnalysisConfig()) + + score, failed = result["sum-0000"] + # claim-1 max-over-sources = 0.8, claim-2 = 0.3 -> sentence min = 0.3 + assert score == pytest.approx(0.3) + # only claim-2 (0.3) is below the 0.5 fail threshold + assert [c.id for c in failed] == ["sum-0000-claim-2"] + + +def test_entail_empty_claims(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(nli_mod, "_get_nli", lambda model_name: _FakeNLI()) + assert entail([], _document(), AnalysisConfig()) == {} diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..31093b4 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,149 @@ +"""Pipeline integration test — real orchestration + signal logic, all models mocked. + +A ~100-word fixture document runs end-to-end through analyse(). Mocks sit at each +model boundary (summariser, classifier detector, NLI, Inseq attribution) so no +weights load. Exercises the A/B-gated attribution path and the full assembly. +""" + +from collections.abc import Callable + +import pytest + +from sumlens import summarise as summarise_mod +from sumlens.ingest import split_sentences +from sumlens.pipeline import analyse +from sumlens.signals import attribution as attribution_mod +from sumlens.signals import classifier as classifier_mod +from sumlens.signals import nli as nli_mod +from sumlens.types import AnalysisConfig, AnalysisResult, Document + +_RAW = ( + "The parliament met on Monday to discuss the proposed national budget for the " + "coming fiscal year. Lawmakers from every party debated the spending priorities " + "for several hours without reaching a clear consensus on the final allocations. " + "The finance minister presented projections covering health, education, and " + "transport infrastructure across the regions. Several members raised concerns " + "about the long term sustainability of the proposed deficit levels. No final " + "figure for total expenditure was announced to the press by the end of the day." +) + +_SUMMARY_TEXT = "The bill passed today. The budget is one trillion euros." + +# Signal A: LettuceDetect "tokens" output ({token,pred,prob}) and "spans" output. +_GROUNDED_TOKENS: list[dict[str, object]] = [ + {"token": "a", "pred": 0, "prob": 0.10}, + {"token": "b", "pred": 0, "prob": 0.05}, +] +_HALLUCINATED_TOKENS: list[dict[str, object]] = [ + {"token": "a", "pred": 1, "prob": 0.95}, + {"token": "b", "pred": 1, "prob": 0.90}, + {"token": "c", "pred": 1, "prob": 0.85}, +] +_HALLUCINATED_SPANS: list[dict[str, object]] = [ + {"start": 0, "end": 3, "confidence": 0.9, "text": "x"}, + {"start": 4, "end": 7, "confidence": 0.9, "text": "y"}, + {"start": 8, "end": 12, "confidence": 0.9, "text": "z"}, +] + + +class _FakeDetector: + def predict( + self, *, context: list[str], question: str, answer: str, output_format: str + ) -> list[dict[str, object]]: + grounded = answer.startswith("The bill") + if output_format == "spans": + return [] if grounded else _HALLUCINATED_SPANS + return _GROUNDED_TOKENS if grounded else _HALLUCINATED_TOKENS + + +class _FakeNLI: + def __call__( + self, pairs: list[dict[str, str]], top_k: object = None, batch_size: object = None + ) -> list[list[dict[str, object]]]: + out: list[list[dict[str, object]]] = [] + for pair in pairs: + ent = 0.9 if "bill" in pair["text_pair"] else 0.2 + out.append( + [{"label": "entailment", "score": ent}, {"label": "neutral", "score": 1.0 - ent}] + ) + return out + + +def _fake_summariser(model_name: str) -> Callable[..., list[dict[str, str]]]: + def _pipeline(text: str, **kwargs: object) -> list[dict[str, str]]: + return [{"summary_text": _SUMMARY_TEXT}] + + return _pipeline + + +@pytest.fixture +def fixture_document() -> Document: + return Document( + id="doc-1", + raw_text=_RAW, + sentences=split_sentences(_RAW, "src"), + source="text", + meta={"word_count": len(_RAW.split())}, + ) + + +def _install_mocks(monkeypatch: pytest.MonkeyPatch, document: Document) -> None: + monkeypatch.setattr(summarise_mod, "_get_summariser", _fake_summariser) + monkeypatch.setattr(classifier_mod, "_get_detector", lambda model_path: _FakeDetector()) + monkeypatch.setattr(nli_mod, "_get_nli", lambda model_name: _FakeNLI()) + + src = document.sentences + + def _fake_attr(source_text: str, target_text: str, cfg: object) -> list[tuple[int, int, float]]: + # spread equal mass over the first three source sentences -> peak 1/3 + return [(s.char_start, s.char_start + 4, 0.1) for s in src[:3]] + + monkeypatch.setattr(attribution_mod, "_source_token_attributions", _fake_attr) + + +def test_analyse_end_to_end( + monkeypatch: pytest.MonkeyPatch, fixture_document: Document +) -> None: + _install_mocks(monkeypatch, fixture_document) + + result = analyse(fixture_document, AnalysisConfig()) + + assert isinstance(result, AnalysisResult) + assert len(result.summary.sentences) == 2 + assert len(result.verdicts) == 2 + + grounded, flagged = result.verdicts + + # Sentence 0: A low, B high -> not gated, attribution skipped, grounded. + assert grounded.sentence_id == "sum-0000" + assert grounded.label == "grounded" + assert grounded.signals.attribution is None + assert grounded.evidence.top_source_sentence_ids == [] + assert grounded.evidence.failed_claims == [] + + # Sentence 1: A high -> gated, attribution runs, hallucinated. + assert flagged.sentence_id == "sum-0001" + assert flagged.label == "hallucinated" + assert flagged.signals.classifier == pytest.approx(0.9) + assert flagged.signals.nli == pytest.approx(0.2) + assert flagged.signals.attribution == pytest.approx(1 / 3) + assert flagged.evidence.classifier_token_spans == [(0, 3), (4, 7), (8, 12)] + assert flagged.evidence.top_source_sentence_ids == ["src-0000", "src-0001", "src-0002"] + assert len(flagged.evidence.failed_claims) == 1 + + +def test_analyse_records_stage_timings( + monkeypatch: pytest.MonkeyPatch, fixture_document: Document +) -> None: + _install_mocks(monkeypatch, fixture_document) + result = analyse(fixture_document, AnalysisConfig()) + assert set(result.timings_ms) == {"summarise", "classify", "nli", "attribute", "fuse"} + assert all(v >= 0 for v in result.timings_ms.values()) + + +def test_analysis_result_round_trips( + monkeypatch: pytest.MonkeyPatch, fixture_document: Document +) -> None: + _install_mocks(monkeypatch, fixture_document) + result = analyse(fixture_document, AnalysisConfig()) + assert AnalysisResult.model_validate_json(result.model_dump_json()) == result diff --git a/tests/test_summarise.py b/tests/test_summarise.py new file mode 100644 index 0000000..ca75078 --- /dev/null +++ b/tests/test_summarise.py @@ -0,0 +1,61 @@ +"""Summarise tests — model mocked at the `_get_summariser` boundary (no weights).""" + +import pytest + +from sumlens import summarise as summarise_mod +from sumlens.summarise import _length_bounds, summarise +from sumlens.types import AnalysisConfig, Document, Sentence + + +def _doc() -> Document: + return Document( + id="doc-1", + raw_text="A long source document about a bill that passed today.", + sentences=[ + Sentence(id="src-0000", text="A long source document.", char_start=0, char_end=23) + ], + source="text", + ) + + +def test_summarise_builds_summary(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_pipeline(text: str, **kwargs: object) -> list[dict[str, str]]: + return [{"summary_text": "The bill passed. It allocates funds."}] + + monkeypatch.setattr(summarise_mod, "_get_summariser", lambda model_name: fake_pipeline) + + cfg = AnalysisConfig() + summary = summarise(_doc(), cfg) + + assert summary.document_id == "doc-1" + assert summary.id == "doc-1-summary" + assert summary.model_name == cfg.summariser + assert summary.text == "The bill passed. It allocates funds." + assert [s.text for s in summary.sentences] == ["The bill passed.", "It allocates funds."] + assert [s.id for s in summary.sentences] == ["sum-0000", "sum-0001"] + for sent in summary.sentences: + assert summary.text[sent.char_start : sent.char_end] == sent.text + + +def test_summarise_forwards_length_and_truncation(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, object] = {} + + def fake_pipeline(text: str, **kwargs: object) -> list[dict[str, str]]: + captured.update(kwargs) + captured["text"] = text + return [{"summary_text": "Short."}] + + monkeypatch.setattr(summarise_mod, "_get_summariser", lambda model_name: fake_pipeline) + + cfg = AnalysisConfig(summary_target_words=150) + summarise(_doc(), cfg) + + assert captured["text"] == _doc().raw_text + assert captured["truncation"] is True + assert captured["max_length"] == 195 + assert captured["min_length"] == 90 + + +def test_length_bounds() -> None: + assert _length_bounds(150) == (195, 90) + assert _length_bounds(100) == (130, 60)