From 1bd6d55aadaa7520dd8d0bfa23c13bc028b09d4a Mon Sep 17 00:00:00 2001 From: HumphreySun98 Date: Wed, 24 Jun 2026 12:40:20 -0500 Subject: [PATCH 1/3] perf(memory): vectorize intra-batch dedup cosine similarity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit intra_batch_dedup compared every pair of batch embeddings with a pure-Python cosine helper that recomputed both norms from scratch on each call — O(n^2·d) with large constants (the default embedder is 3072-dim). For a 200-item batch this took ~4.3s. Normalize the embedding matrix once and compute the full similarity matrix in a single BLAS `X @ Xᵀ` call, then run the same greedy "first occurrence wins" selection over it. Drop decisions are identical to the scalar algorithm, which is retained as `_dedup_scalar` (reference + ragged-embedding fallback). ~95x faster on a 200-item batch; numpy is already a transitive core dependency (chromadb, lancedb). Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/crewai/src/crewai/memory/encoding_flow.py | 58 ++++++++- .../tests/memory/test_encoding_flow_dedup.py | 111 ++++++++++++++++++ 2 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 lib/crewai/tests/memory/test_encoding_flow_dedup.py diff --git a/lib/crewai/src/crewai/memory/encoding_flow.py b/lib/crewai/src/crewai/memory/encoding_flow.py index 968b439bff..80450a6900 100644 --- a/lib/crewai/src/crewai/memory/encoding_flow.py +++ b/lib/crewai/src/crewai/memory/encoding_flow.py @@ -18,6 +18,7 @@ from typing import Any from uuid import uuid4 +import numpy as np from pydantic import BaseModel, Field from crewai.flow.flow import Flow, listen, start @@ -118,12 +119,67 @@ def batch_embed(self) -> None: @listen(batch_embed) def intra_batch_dedup(self) -> None: - """Drop near-exact duplicates within the batch.""" + """Drop near-exact duplicates within the batch. + + Computes the pairwise cosine-similarity matrix in one vectorized pass + (normalize rows once, then a single ``X @ Xᵀ`` BLAS call) instead of the + previous O(n²) loop of pure-Python cosine calls, each of which also + recomputed both vector norms from scratch (O(n²·d)). The greedy + "first occurrence wins" selection is preserved exactly: item ``j`` is + dropped iff some earlier *kept* item is at least ``threshold`` similar. + """ items = list(self.state.items) if len(items) <= 1: return threshold = self._config.batch_dedup_threshold + + # Only items carrying an embedding participate; pre-dropped items are + # excluded so they neither get re-dropped nor suppress others — exactly + # as the scalar reference skips them. + active: list[tuple[int, list[float]]] = [ + (idx, item.embedding) + for idx, item in enumerate(items) + if item.embedding and not item.dropped + ] + if len(active) <= 1: + return + + dim = len(active[0][1]) + if any(len(emb) != dim for _, emb in active): + # Ragged embeddings cannot form a matrix; this should not happen for + # a single embedder, but fall back to the scalar reference so the + # len-mismatch-as-zero-similarity behavior is preserved exactly. + self._dedup_scalar(items, threshold) + return + + matrix = np.asarray([emb for _, emb in active], dtype=np.float64) + norms = np.linalg.norm(matrix, axis=1) + nonzero = norms > 0.0 + normalized = np.zeros_like(matrix) + normalized[nonzero] = matrix[nonzero] / norms[nonzero, None] + # Cosine-similarity matrix; zero-norm rows contribute 0.0, matching + # _cosine_similarity's zero-norm guard. + sims = normalized @ normalized.T + + m = len(active) + dropped = np.zeros(m, dtype=bool) + for j in range(1, m): + # Drop j iff an earlier, still-kept item is near-identical. + if bool((~dropped[:j] & (sims[:j, j] >= threshold)).any()): + dropped[j] = True + + for local_idx in range(m): + if dropped[local_idx]: + items[active[local_idx][0]].dropped = True + self.state.items_dropped_dedup += 1 + + def _dedup_scalar(self, items: list[ItemState], threshold: float) -> None: + """Reference O(n²) dedup using scalar cosine similarity. + + Retained as the exact behavioral reference and as a fallback for the + (unexpected) ragged-embedding case. + """ n = len(items) for j in range(1, n): if items[j].dropped or not items[j].embedding: diff --git a/lib/crewai/tests/memory/test_encoding_flow_dedup.py b/lib/crewai/tests/memory/test_encoding_flow_dedup.py new file mode 100644 index 0000000000..a5591c0f70 --- /dev/null +++ b/lib/crewai/tests/memory/test_encoding_flow_dedup.py @@ -0,0 +1,111 @@ +"""Tests for EncodingFlow.intra_batch_dedup (vectorized cosine dedup). + +The vectorized implementation must reproduce the exact drop decisions of the +original scalar O(n^2) algorithm, which is retained as ``_dedup_scalar``. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import numpy as np + +from crewai.memory.encoding_flow import EncodingFlow, ItemState +from crewai.memory.types import MemoryConfig + + +def _make_flow(threshold: float = 0.98) -> EncodingFlow: + config = MemoryConfig() + config.batch_dedup_threshold = threshold + return EncodingFlow( + storage=MagicMock(), llm=MagicMock(), embedder=MagicMock(), config=config + ) + + +def _run_vectorized(items: list[ItemState], threshold: float = 0.98) -> EncodingFlow: + flow = _make_flow(threshold) + flow.state.items = items + flow.intra_batch_dedup() + return flow + + +def test_drops_identical_keeps_distinct() -> None: + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="b", embedding=[0.5] * 8), # identical to a -> dropped + ItemState(content="c", embedding=[1.0] + [0.0] * 7), # orthogonal -> kept + ] + flow = _run_vectorized(items) + assert [it.dropped for it in items] == [False, True, False] + assert flow.state.items_dropped_dedup == 1 + + +def test_first_occurrence_wins() -> None: + items = [ItemState(content=str(i), embedding=[0.5] * 8) for i in range(4)] + flow = _run_vectorized(items) + # Only the first survives; the rest are dropped against it. + assert [it.dropped for it in items] == [False, True, True, True] + assert flow.state.items_dropped_dedup == 3 + + +def test_items_without_embeddings_never_dropped() -> None: + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="no-emb", embedding=[]), # never participates + ItemState(content="b", embedding=[0.5] * 8), # dup of a + ] + flow = _run_vectorized(items) + assert [it.dropped for it in items] == [False, False, True] + assert flow.state.items_dropped_dedup == 1 + + +def test_pre_dropped_item_is_skipped() -> None: + """A pre-dropped item must neither be re-counted nor suppress others.""" + a = ItemState(content="a", embedding=[0.5] * 8) + a.dropped = True # already dropped upstream + items = [ + a, + ItemState(content="b", embedding=[0.5] * 8), # same vector as the dropped a + ItemState(content="c", embedding=[0.5] * 8), # dup of b + ] + flow = _run_vectorized(items) + # 'a' stays dropped (not re-counted); 'b' becomes the surviving original; + # 'c' is dropped against 'b'. + assert items[0].dropped is True + assert items[1].dropped is False + assert items[2].dropped is True + assert flow.state.items_dropped_dedup == 1 # only 'c' is a new drop + + +def test_matches_scalar_reference_on_clustered_data() -> None: + """Vectorized dedup must match the scalar reference exactly on clustered + embeddings (intra-cluster ~1.0, inter-cluster low), across many trials. + """ + rng = np.random.default_rng(0) + dim = 32 + threshold = 0.98 + + for _ in range(25): + n_clusters = int(rng.integers(1, 5)) + centers = rng.normal(size=(n_clusters, dim)) + embeddings: list[list[float]] = [] + for _ in range(int(rng.integers(2, 12))): + c = centers[int(rng.integers(0, n_clusters))] + vec = c + rng.normal(scale=1e-3, size=dim) # tiny noise -> sim ~1.0 + embeddings.append(vec.tolist()) + + vec_items = [ + ItemState(content=str(i), embedding=e) for i, e in enumerate(embeddings) + ] + scalar_items = [ + ItemState(content=str(i), embedding=e) for i, e in enumerate(embeddings) + ] + + _run_vectorized(vec_items, threshold) + + scalar_flow = _make_flow(threshold) + scalar_flow._dedup_scalar(scalar_items, threshold) + + assert [it.dropped for it in vec_items] == [ + it.dropped for it in scalar_items + ] From 97b38dcf716c21f3a76eccdf258b4ad542f93962 Mon Sep 17 00:00:00 2001 From: HumphreySun98 Date: Wed, 24 Jun 2026 14:37:40 -0500 Subject: [PATCH 2/3] refactor(memory): import numpy lazily in intra_batch_dedup Avoid a hard module-level numpy import (numpy is a transitive dependency via chromadb/lancedb, not a declared one). Import it inside the dedup method and fall back to the scalar reference if it is unavailable, so the module always imports cleanly. Adds a numpy-absent fallback test. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/crewai/src/crewai/memory/encoding_flow.py | 9 ++++++++- .../tests/memory/test_encoding_flow_dedup.py | 18 +++++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/lib/crewai/src/crewai/memory/encoding_flow.py b/lib/crewai/src/crewai/memory/encoding_flow.py index 80450a6900..dcb753976e 100644 --- a/lib/crewai/src/crewai/memory/encoding_flow.py +++ b/lib/crewai/src/crewai/memory/encoding_flow.py @@ -18,7 +18,6 @@ from typing import Any from uuid import uuid4 -import numpy as np from pydantic import BaseModel, Field from crewai.flow.flow import Flow, listen, start @@ -134,6 +133,14 @@ def intra_batch_dedup(self) -> None: threshold = self._config.batch_dedup_threshold + try: + import numpy as np + except ImportError: + # numpy is a transitive dependency (chromadb, lancedb); if it is + # somehow unavailable, fall back to the scalar reference. + self._dedup_scalar(items, threshold) + return + # Only items carrying an embedding participate; pre-dropped items are # excluded so they neither get re-dropped nor suppress others — exactly # as the scalar reference skips them. diff --git a/lib/crewai/tests/memory/test_encoding_flow_dedup.py b/lib/crewai/tests/memory/test_encoding_flow_dedup.py index a5591c0f70..559d0d10ff 100644 --- a/lib/crewai/tests/memory/test_encoding_flow_dedup.py +++ b/lib/crewai/tests/memory/test_encoding_flow_dedup.py @@ -6,7 +6,7 @@ from __future__ import annotations -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import numpy as np @@ -77,6 +77,22 @@ def test_pre_dropped_item_is_skipped() -> None: assert flow.state.items_dropped_dedup == 1 # only 'c' is a new drop +def test_falls_back_to_scalar_when_numpy_unavailable() -> None: + """If numpy can't be imported, dedup still works via the scalar path.""" + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="b", embedding=[0.5] * 8), # dup -> dropped + ItemState(content="c", embedding=[1.0] + [0.0] * 7), # distinct -> kept + ] + flow = _make_flow() + flow.state.items = items + # Make `import numpy` raise ImportError inside intra_batch_dedup. + with patch.dict("sys.modules", {"numpy": None}): + flow.intra_batch_dedup() + assert [it.dropped for it in items] == [False, True, False] + assert flow.state.items_dropped_dedup == 1 + + def test_matches_scalar_reference_on_clustered_data() -> None: """Vectorized dedup must match the scalar reference exactly on clustered embeddings (intra-cluster ~1.0, inter-cluster low), across many trials. From 199a9ad427b74135c2449a66bbf4b46c7d452cde Mon Sep 17 00:00:00 2001 From: HumphreySun98 Date: Thu, 25 Jun 2026 00:15:56 -0500 Subject: [PATCH 3/3] chore(memory): warn when intra-batch dedup hits ragged embeddings The ragged-embedding fallback is a "should not happen" guard; if it fires, an embedder returned variable-length vectors. Log a warning (with sample lengths) instead of silently degrading to the scalar path, so the encoding bug surfaces. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/crewai/src/crewai/memory/encoding_flow.py | 8 ++++++++ .../tests/memory/test_encoding_flow_dedup.py | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/lib/crewai/src/crewai/memory/encoding_flow.py b/lib/crewai/src/crewai/memory/encoding_flow.py index dcb753976e..a75faeacc5 100644 --- a/lib/crewai/src/crewai/memory/encoding_flow.py +++ b/lib/crewai/src/crewai/memory/encoding_flow.py @@ -157,6 +157,14 @@ def intra_batch_dedup(self) -> None: # Ragged embeddings cannot form a matrix; this should not happen for # a single embedder, but fall back to the scalar reference so the # len-mismatch-as-zero-similarity behavior is preserved exactly. + # Warn (rather than silently degrade) since it signals an embedder + # returning variable-length vectors — an encoding bug worth surfacing. + logger.warning( + "intra_batch_dedup: ragged embeddings in batch (sample lengths: " + "%s); falling back to scalar dedup. This usually means the " + "embedder returned variable-length vectors.", + [len(emb) for _, emb in active[:5]], + ) self._dedup_scalar(items, threshold) return diff --git a/lib/crewai/tests/memory/test_encoding_flow_dedup.py b/lib/crewai/tests/memory/test_encoding_flow_dedup.py index 559d0d10ff..f6198df60c 100644 --- a/lib/crewai/tests/memory/test_encoding_flow_dedup.py +++ b/lib/crewai/tests/memory/test_encoding_flow_dedup.py @@ -6,9 +6,11 @@ from __future__ import annotations +import logging from unittest.mock import MagicMock, patch import numpy as np +import pytest from crewai.memory.encoding_flow import EncodingFlow, ItemState from crewai.memory.types import MemoryConfig @@ -93,6 +95,24 @@ def test_falls_back_to_scalar_when_numpy_unavailable() -> None: assert flow.state.items_dropped_dedup == 1 +def test_ragged_embeddings_warn_and_fall_back( + caplog: pytest.LogCaptureFixture, +) -> None: + """Variable-length embeddings (an encoding bug) must warn and still dedup + correctly via the scalar fallback (len mismatch -> 0 similarity).""" + items = [ + ItemState(content="a", embedding=[0.5] * 8), + ItemState(content="b", embedding=[0.5] * 8), # dup of a -> dropped + ItemState(content="c", embedding=[0.5] * 4), # ragged -> never matches + ] + with caplog.at_level(logging.WARNING, logger="crewai.memory.encoding_flow"): + flow = _run_vectorized(items) + + assert any("ragged embeddings" in r.message for r in caplog.records) + assert [it.dropped for it in items] == [False, True, False] + assert flow.state.items_dropped_dedup == 1 + + def test_matches_scalar_reference_on_clustered_data() -> None: """Vectorized dedup must match the scalar reference exactly on clustered embeddings (intra-cluster ~1.0, inter-cluster low), across many trials.