Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions sumlens/fuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Signal fusion, calibration, and labelling.

Fusion combines the three sentence-level signals into one grounding probability.
If a trained model is present at `model_path` it is used; otherwise we fall back to
an identity fusion (mean of grounding-oriented signals) so the pipeline still runs
without weights (CI, fresh checkout). Same idea for Platt calibration.

Convention: class 1 = **grounded**. The fusion model is trained with `grounded`
labels (1 if the summary sentence is grounded, 0 if hallucinated), so its
`predict_proba[:, 1]` is the grounding score that `label` thresholds. Feature order
is `FEATURE_ORDER`; missing signals are imputed to neutral 0.5.

Scikit-learn is imported lazily so the pipeline does not depend on it unless a
trained model is actually loaded or fitted.
"""

import pickle
from pathlib import Path
from typing import Any, Literal

from sumlens.types import AnalysisConfig, SignalScores

_NEUTRAL = 0.5
FEATURE_ORDER = ("classifier", "nli", "attribution")


def fuse(signals: dict[str, SignalScores], model_path: Path) -> dict[str, float]:
if not model_path.exists():
return {sentence_id: _grounding(score) for sentence_id, score in signals.items()}
model = _load(model_path)
ids = list(signals)
features = [_feature_vector(signals[i]) for i in ids]
grounded_proba = model.predict_proba(features)[:, 1]
return {i: float(p) for i, p in zip(ids, grounded_proba, strict=True)}


def calibrate(scores: dict[str, float], platt_path: Path) -> dict[str, float]:
if not platt_path.exists():
return dict(scores)
platt = _load(platt_path)
ids = list(scores)
calibrated = platt.predict_proba([[scores[i]] for i in ids])[:, 1]
return {i: float(c) for i, c in zip(ids, calibrated, strict=True)}


def label(score: float, cfg: AnalysisConfig) -> Literal["grounded", "weak", "hallucinated"]:
if score < cfg.tau_hallucinated:
return "hallucinated"
if score >= cfg.tau_grounded:
return "grounded"
return "weak"


def fit_fusion(features: list[list[float]], grounded: list[int]) -> Any:
"""Fit the fusion LogisticRegression. `grounded` = 1 if grounded, 0 if hallucinated."""
from sklearn.linear_model import LogisticRegression

model = LogisticRegression(max_iter=1000)
model.fit(features, grounded)
return model


def fit_platt(scores: list[float], grounded: list[int]) -> Any:
"""Fit a 1-D Platt calibrator mapping a fused score to a calibrated grounding prob."""
from sklearn.linear_model import LogisticRegression

model = LogisticRegression(max_iter=1000)
model.fit([[s] for s in scores], grounded)
return model


def _feature_vector(signals: SignalScores) -> list[float]:
"""Signals in FEATURE_ORDER; missing values imputed to neutral 0.5."""
values = signals.model_dump()
return [_NEUTRAL if values[name] is None else float(values[name]) for name in FEATURE_ORDER]


def _grounding(signals: SignalScores) -> float:
"""Identity fusion: mean of available grounding-oriented signals; 0.5 if none."""
contributions = []
if signals.classifier is not None:
contributions.append(1.0 - signals.classifier)
if signals.nli is not None:
contributions.append(signals.nli)
if signals.attribution is not None:
contributions.append(signals.attribution)
if not contributions:
return _NEUTRAL
return sum(contributions) / len(contributions)


def _load(path: Path) -> Any:
with path.open("rb") as fh:
return pickle.load(fh)
74 changes: 74 additions & 0 deletions sumlens/ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Ingestion — PDF or raw text into a `Document`.

PDF text is extracted with pdfplumber. Text is cleaned and paragraph-segmented on
blank lines, then split into sentences with NLTK Punkt. Sentence ids are stable
`src-0000`, `src-0001`, ... and carry char offsets into `Document.raw_text`.
"""

import re
from pathlib import Path
from typing import Any

import nltk
import pdfplumber

from sumlens.types import Document, Sentence

_BLANK_LINE = re.compile(r"\n\s*\n")
_WHITESPACE = re.compile(r"\s+")


def load_pdf(path: Path) -> Document:
with pdfplumber.open(path) as pdf:
pages = [page.extract_text() or "" for page in pdf.pages]
raw_text = _clean("\n\n".join(pages))
meta: dict[str, Any] = {"filename": path.name, "word_count": _word_count(raw_text)}
return Document(
id=path.stem,
raw_text=raw_text,
sentences=split_sentences(raw_text, "src"),
source="pdf",
meta=meta,
)


def load_text(text: str) -> Document:
raw_text = _clean(text)
meta: dict[str, Any] = {"word_count": _word_count(raw_text)}
return Document(
id="text",
raw_text=raw_text,
sentences=split_sentences(raw_text, "src"),
source="text",
meta=meta,
)


def _clean(text: str) -> str:
"""Collapse each blank-line-delimited paragraph onto one line; join with \\n\\n."""
paragraphs = []
for para in _BLANK_LINE.split(text):
collapsed = _WHITESPACE.sub(" ", para).strip()
if collapsed:
paragraphs.append(collapsed)
return "\n\n".join(paragraphs)


def split_sentences(text: str, id_prefix: str) -> list[Sentence]:
"""NLTK Punkt sentence split with char offsets; ids `{id_prefix}-0000`, ..."""
if not text:
return []
tokenizer = nltk.data.load("tokenizers/punkt/english.pickle")
return [
Sentence(
id=f"{id_prefix}-{i:04d}",
text=text[start:end],
char_start=start,
char_end=end,
)
for i, (start, end) in enumerate(tokenizer.span_tokenize(text))
]


def _word_count(raw_text: str) -> int:
return len(raw_text.split())
111 changes: 111 additions & 0 deletions sumlens/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Pipeline orchestration — Document into a full AnalysisResult.

Stages: summarise -> signal A (classifier) and signal B (NLI) -> gate signal C
(attribution) on the sentences A or B flag as suspicious -> fuse -> calibrate ->
label -> assemble evidence. Each stage is timed into `timings_ms`.

A and B run sequentially here; the data-model calls for them "in parallel", which
is a latency optimisation, not a correctness requirement, so it is deferred.
"""

import time
from collections.abc import Callable
from pathlib import Path
from typing import TypeVar

from sumlens.fuse import calibrate, fuse, label
from sumlens.signals.attribution import attribute
from sumlens.signals.classifier import classify
from sumlens.signals.nli import entail, extract_claims
from sumlens.summarise import summarise
from sumlens.types import (
AnalysisConfig,
AnalysisResult,
Claim,
Document,
Evidence,
SentenceVerdict,
SignalScores,
Summary,
)

_MODELS_DIR = Path(__file__).resolve().parent.parent / "models"
_FUSION_MODEL_PATH = _MODELS_DIR / "fusion.pkl"
_PLATT_MODEL_PATH = _MODELS_DIR / "platt.pkl"
_C_GATE = 0.5 # run attribution where classifier >= gate (A high) or nli < gate (B low)

T = TypeVar("T")


def analyse(document: Document, cfg: AnalysisConfig) -> AnalysisResult:
timings: dict[str, int] = {}

summary = _timed(timings, "summarise", lambda: summarise(document, cfg))
classifier_out = _timed(timings, "classify", lambda: classify(document, summary, cfg))
nli_out = _timed(timings, "nli", lambda: entail(extract_claims(summary), document, cfg))

gated = _gated_summary(summary, classifier_out, nli_out)
attribution_out = _timed(timings, "attribute", lambda: attribute(document, gated, cfg))

signals: dict[str, SignalScores] = {}
evidence_parts: dict[str, tuple[list[tuple[int, int]], list[Claim], list[str]]] = {}
for sentence in summary.sentences:
a_score, a_spans = classifier_out[sentence.id]
b_score, b_failed = nli_out.get(sentence.id, (None, []))
c_peak, c_top = attribution_out.get(sentence.id, (None, []))
signals[sentence.id] = SignalScores(classifier=a_score, nli=b_score, attribution=c_peak)
evidence_parts[sentence.id] = (a_spans, b_failed, c_top)

fused = _timed(
timings,
"fuse",
lambda: calibrate(fuse(signals, _FUSION_MODEL_PATH), _PLATT_MODEL_PATH),
)

verdicts = []
for sentence in summary.sentences:
a_spans, b_failed, c_top = evidence_parts[sentence.id]
score = fused[sentence.id]
verdicts.append(
SentenceVerdict(
sentence_id=sentence.id,
fused_score=score,
label=label(score, cfg),
signals=signals[sentence.id],
evidence=Evidence(
failed_claims=b_failed,
top_source_sentence_ids=c_top,
classifier_token_spans=a_spans,
),
)
)

return AnalysisResult(
document=document,
summary=summary,
verdicts=verdicts,
config=cfg,
timings_ms=timings,
)


def _gated_summary(
summary: Summary,
classifier_out: dict[str, tuple[float, list[tuple[int, int]]]],
nli_out: dict[str, tuple[float, list[Claim]]],
) -> Summary:
"""Keep only sentences A or B flag as suspicious — attribution runs on these."""
suspicious = []
for sentence in summary.sentences:
a_score = classifier_out[sentence.id][0]
b_score = nli_out.get(sentence.id, (None, []))[0]
if a_score >= _C_GATE or (b_score is not None and b_score < _C_GATE):
suspicious.append(sentence)
return summary.model_copy(update={"sentences": suspicious})


def _timed(timings: dict[str, int], name: str, fn: Callable[[], T]) -> T:
start = time.perf_counter()
result = fn()
timings[name] = int((time.perf_counter() - start) * 1000)
return result
Empty file added sumlens/signals/__init__.py
Empty file.
94 changes: 94 additions & 0 deletions sumlens/signals/attribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Signal C — Inseq integrated-gradients source attribution.

Integrated gradients on the *same* model that produced the summary. For each
summary sentence we get token-level attribution back onto the source text, map
each source token to the source sentence containing it, sum the (absolute)
attribution mass per source sentence and normalise. The peak normalised mass is
the sentence's attribution score; the top-k source sentences are its supporting
spans.

The pipeline gates this signal (runs it only on sentences flagged by A or B) for
speed — see `data-model.md` §3. The Inseq computation is isolated in
`_source_token_attributions`, which tests mock at the module boundary.
"""

import re
from functools import lru_cache
from typing import Any

from sumlens.types import AnalysisConfig, Document, Sentence, Summary

_N_STEPS = 40
_TOP_K = 5
_SUBWORD_PREFIX = re.compile(r"^[Ġ▁]") # GPT-2 'Ġ' and SentencePiece '▁'


def attribute(
document: Document, summary: Summary, cfg: AnalysisConfig
) -> dict[str, tuple[float, list[str]]]:
results: dict[str, tuple[float, list[str]]] = {}
for sentence in summary.sentences:
token_attrs = _source_token_attributions(document.raw_text, sentence.text, cfg)
per_source = _aggregate_to_source_sentences(token_attrs, document.sentences)
if per_source:
peak = max(per_source.values())
top_ids = sorted(per_source, key=lambda sid: per_source[sid], reverse=True)[:_TOP_K]
else:
peak, top_ids = 0.0, []
results[sentence.id] = (peak, top_ids)
return results


def _aggregate_to_source_sentences(
token_attrs: list[tuple[int, int, float]], source_sentences: list[Sentence]
) -> dict[str, float]:
"""Sum |score| of each source token into its sentence; normalise to sum 1."""
masses: dict[str, float] = {}
for start, end, score in token_attrs:
mid = (start + end) // 2
for sentence in source_sentences:
if sentence.char_start <= mid < sentence.char_end:
masses[sentence.id] = masses.get(sentence.id, 0.0) + abs(score)
break
total = sum(masses.values())
if total <= 0:
return {}
return {sid: mass / total for sid, mass in masses.items()}


def _source_token_attributions(
source_text: str, target_text: str, cfg: AnalysisConfig
) -> list[tuple[int, int, float]]:
"""Run Inseq and return source-token (char_start, char_end, score) records.

This is the Inseq boundary — mocked in tests, verified against real weights on
HPC. Token char offsets are reconstructed by walking `source_text` because
Inseq tokens carry subword strings, not source offsets.
"""
import numpy as np

model = _get_attributor(cfg.summariser, cfg.attribution_method)
out = model.attribute(source_text, target_text, n_steps=_N_STEPS, show_progress=False)
seq = out.sequence_attributions[0]
matrix = np.asarray(seq.source_attributions, dtype=float)
per_token = matrix.sum(axis=tuple(range(1, matrix.ndim))) if matrix.ndim > 1 else matrix

records: list[tuple[int, int, float]] = []
cursor = 0
for token, score in zip(seq.source, per_token, strict=False):
text = _SUBWORD_PREFIX.sub("", token.token)
if not text:
continue
idx = source_text.find(text, cursor)
if idx < 0:
continue
records.append((idx, idx + len(text), float(score)))
cursor = idx + len(text)
return records


@lru_cache(maxsize=1)
def _get_attributor(model_name: str, method: str) -> Any:
import inseq

return inseq.load_model(model_name, method)
Loading
Loading