diff --git a/lib/crewai/src/crewai/memory/encoding_flow.py b/lib/crewai/src/crewai/memory/encoding_flow.py index 968b439bff..a75faeacc5 100644 --- a/lib/crewai/src/crewai/memory/encoding_flow.py +++ b/lib/crewai/src/crewai/memory/encoding_flow.py @@ -118,12 +118,83 @@ def batch_embed(self) -> None: @listen(batch_embed) def intra_batch_dedup(self) -> None: - """Drop near-exact duplicates within the batch.""" + """Drop near-exact duplicates within the batch. + + Computes the pairwise cosine-similarity matrix in one vectorized pass + (normalize rows once, then a single ``X @ Xᵀ`` BLAS call) instead of the + previous O(n²) loop of pure-Python cosine calls, each of which also + recomputed both vector norms from scratch (O(n²·d)). The greedy + "first occurrence wins" selection is preserved exactly: item ``j`` is + dropped iff some earlier *kept* item is at least ``threshold`` similar. + """ items = list(self.state.items) if len(items) <= 1: return threshold = self._config.batch_dedup_threshold + + try: + import numpy as np + except ImportError: + # numpy is a transitive dependency (chromadb, lancedb); if it is + # somehow unavailable, fall back to the scalar reference. + self._dedup_scalar(items, threshold) + return + + # Only items carrying an embedding participate; pre-dropped items are + # excluded so they neither get re-dropped nor suppress others — exactly + # as the scalar reference skips them. + active: list[tuple[int, list[float]]] = [ + (idx, item.embedding) + for idx, item in enumerate(items) + if item.embedding and not item.dropped + ] + if len(active) <= 1: + return + + dim = len(active[0][1]) + if any(len(emb) != dim for _, emb in active): + # Ragged embeddings cannot form a matrix; this should not happen for + # a single embedder, but fall back to the scalar reference so the + # len-mismatch-as-zero-similarity behavior is preserved exactly. + # Warn (rather than silently degrade) since it signals an embedder + # returning variable-length vectors — an encoding bug worth surfacing. + logger.warning( + "intra_batch_dedup: ragged embeddings in batch (sample lengths: " + "%s); falling back to scalar dedup. This usually means the " + "embedder returned variable-length vectors.", + [len(emb) for _, emb in active[:5]], + ) + self._dedup_scalar(items, threshold) + return + + matrix = np.asarray([emb for _, emb in active], dtype=np.float64) + norms = np.linalg.norm(matrix, axis=1) + nonzero = norms > 0.0 + normalized = np.zeros_like(matrix) + normalized[nonzero] = matrix[nonzero] / norms[nonzero, None] + # Cosine-similarity matrix; zero-norm rows contribute 0.0, matching + # _cosine_similarity's zero-norm guard. + sims = normalized @ normalized.T + + m = len(active) + dropped = np.zeros(m, dtype=bool) + for j in range(1, m): + # Drop j iff an earlier, still-kept item is near-identical. + if bool((~dropped[:j] & (sims[:j, j] >= threshold)).any()): + dropped[j] = True + + for local_idx in range(m): + if dropped[local_idx]: + items[active[local_idx][0]].dropped = True + self.state.items_dropped_dedup += 1 + + def _dedup_scalar(self, items: list[ItemState], threshold: float) -> None: + """Reference O(n²) dedup using scalar cosine similarity. + + Retained as the exact behavioral reference and as a fallback for the + (unexpected) ragged-embedding case. + """ n = len(items) for j in range(1, n): if items[j].dropped or not items[j].embedding: diff --git a/lib/crewai/tests/memory/test_encoding_flow_dedup.py b/lib/crewai/tests/memory/test_encoding_flow_dedup.py new file mode 100644 index 0000000000..f6198df60c --- /dev/null +++ b/lib/crewai/tests/memory/test_encoding_flow_dedup.py @@ -0,0 +1,147 @@ +"""Tests for EncodingFlow.intra_batch_dedup (vectorized cosine dedup). + +The vectorized implementation must reproduce the exact drop decisions of the +original scalar O(n^2) algorithm, which is retained as ``_dedup_scalar``. +""" + +from __future__ import annotations + +import logging +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from crewai.memory.encoding_flow import EncodingFlow, ItemState +from crewai.memory.types import MemoryConfig + + +def _make_flow(threshold: float = 0.98) -> EncodingFlow: + config = MemoryConfig() + config.batch_dedup_threshold = threshold + return EncodingFlow( + storage=MagicMock(), llm=MagicMock(), embedder=MagicMock(), config=config + ) + + +def _run_vectorized(items: list[ItemState], threshold: float = 0.98) -> EncodingFlow: + flow = _make_flow(threshold) + flow.state.items = items + flow.intra_batch_dedup() + return flow + + +def test_drops_identical_keeps_distinct() -> None: + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="b", embedding=[0.5] * 8), # identical to a -> dropped + ItemState(content="c", embedding=[1.0] + [0.0] * 7), # orthogonal -> kept + ] + flow = _run_vectorized(items) + assert [it.dropped for it in items] == [False, True, False] + assert flow.state.items_dropped_dedup == 1 + + +def test_first_occurrence_wins() -> None: + items = [ItemState(content=str(i), embedding=[0.5] * 8) for i in range(4)] + flow = _run_vectorized(items) + # Only the first survives; the rest are dropped against it. + assert [it.dropped for it in items] == [False, True, True, True] + assert flow.state.items_dropped_dedup == 3 + + +def test_items_without_embeddings_never_dropped() -> None: + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="no-emb", embedding=[]), # never participates + ItemState(content="b", embedding=[0.5] * 8), # dup of a + ] + flow = _run_vectorized(items) + assert [it.dropped for it in items] == [False, False, True] + assert flow.state.items_dropped_dedup == 1 + + +def test_pre_dropped_item_is_skipped() -> None: + """A pre-dropped item must neither be re-counted nor suppress others.""" + a = ItemState(content="a", embedding=[0.5] * 8) + a.dropped = True # already dropped upstream + items = [ + a, + ItemState(content="b", embedding=[0.5] * 8), # same vector as the dropped a + ItemState(content="c", embedding=[0.5] * 8), # dup of b + ] + flow = _run_vectorized(items) + # 'a' stays dropped (not re-counted); 'b' becomes the surviving original; + # 'c' is dropped against 'b'. + assert items[0].dropped is True + assert items[1].dropped is False + assert items[2].dropped is True + assert flow.state.items_dropped_dedup == 1 # only 'c' is a new drop + + +def test_falls_back_to_scalar_when_numpy_unavailable() -> None: + """If numpy can't be imported, dedup still works via the scalar path.""" + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="b", embedding=[0.5] * 8), # dup -> dropped + ItemState(content="c", embedding=[1.0] + [0.0] * 7), # distinct -> kept + ] + flow = _make_flow() + flow.state.items = items + # Make `import numpy` raise ImportError inside intra_batch_dedup. + with patch.dict("sys.modules", {"numpy": None}): + flow.intra_batch_dedup() + assert [it.dropped for it in items] == [False, True, False] + assert flow.state.items_dropped_dedup == 1 + + +def test_ragged_embeddings_warn_and_fall_back( + caplog: pytest.LogCaptureFixture, +) -> None: + """Variable-length embeddings (an encoding bug) must warn and still dedup + correctly via the scalar fallback (len mismatch -> 0 similarity).""" + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="b", embedding=[0.5] * 8), # dup of a -> dropped + ItemState(content="c", embedding=[0.5] * 4), # ragged -> never matches + ] + with caplog.at_level(logging.WARNING, logger="crewai.memory.encoding_flow"): + flow = _run_vectorized(items) + + assert any("ragged embeddings" in r.message for r in caplog.records) + assert [it.dropped for it in items] == [False, True, False] + assert flow.state.items_dropped_dedup == 1 + + +def test_matches_scalar_reference_on_clustered_data() -> None: + """Vectorized dedup must match the scalar reference exactly on clustered + embeddings (intra-cluster ~1.0, inter-cluster low), across many trials. + """ + rng = np.random.default_rng(0) + dim = 32 + threshold = 0.98 + + for _ in range(25): + n_clusters = int(rng.integers(1, 5)) + centers = rng.normal(size=(n_clusters, dim)) + embeddings: list[list[float]] = [] + for _ in range(int(rng.integers(2, 12))): + c = centers[int(rng.integers(0, n_clusters))] + vec = c + rng.normal(scale=1e-3, size=dim) # tiny noise -> sim ~1.0 + embeddings.append(vec.tolist()) + + vec_items = [ + ItemState(content=str(i), embedding=e) for i, e in enumerate(embeddings) + ] + scalar_items = [ + ItemState(content=str(i), embedding=e) for i, e in enumerate(embeddings) + ] + + _run_vectorized(vec_items, threshold) + + scalar_flow = _make_flow(threshold) + scalar_flow._dedup_scalar(scalar_items, threshold) + + assert [it.dropped for it in vec_items] == [ + it.dropped for it in scalar_items + ]