Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion lib/crewai/src/crewai/memory/encoding_flow.py

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Building the full pairwise cosine-similarity matrix introduces an O(m^2) memory footprint that can exhaust memory and crash the process for large batches. This is a denial-of-service risk if an attacker can supply or influence very large batches.

Vulnerable code:

matrix = np.asarray([emb for _, emb in active], dtype=np.float64)
norms = np.linalg.norm(matrix, axis=1)
nonzero = norms > 0.0
normalized = np.zeros_like(matrix)
normalized[nonzero] = matrix[nonzero] / norms[nonzero, None]
# Cosine-similarity matrix; zero-norm rows contribute 0.0, matching
# _cosine_similarity's zero-norm guard.
sims = normalized @ normalized.T

Impact: For batch size m, sims requires mm8 bytes (float64), quickly leading to OOM (e.g., m=50,000 -> ~20 GB). The previous scalar algorithm used O(1) extra memory.

Remediation:

  • Enforce a maximum batch size (configurable) and short-circuit or fall back to the scalar/streaming approach when exceeded.
  • Pre-check and cap based on memory budget (e.g., if m*m*8 > MAX_BYTES: fallback).
  • Compute similarities in blocks or examine only prior kept items without materializing the full matrix.

For more details, see the finding in Corridor.

Provide feedback: Reply with whether this is a valid vulnerability or false positive to help improve Corridor's accuracy.

Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,83 @@ def batch_embed(self) -> None:

@listen(batch_embed)
def intra_batch_dedup(self) -> None:
"""Drop near-exact duplicates within the batch."""
"""Drop near-exact duplicates within the batch.

Computes the pairwise cosine-similarity matrix in one vectorized pass
(normalize rows once, then a single ``X @ Xᵀ`` BLAS call) instead of the
previous O(n²) loop of pure-Python cosine calls, each of which also
recomputed both vector norms from scratch (O(n²·d)). The greedy
"first occurrence wins" selection is preserved exactly: item ``j`` is
dropped iff some earlier *kept* item is at least ``threshold`` similar.
"""
items = list(self.state.items)
if len(items) <= 1:
return

threshold = self._config.batch_dedup_threshold

try:
import numpy as np
except ImportError:
# numpy is a transitive dependency (chromadb, lancedb); if it is
# somehow unavailable, fall back to the scalar reference.
self._dedup_scalar(items, threshold)
return

# Only items carrying an embedding participate; pre-dropped items are
# excluded so they neither get re-dropped nor suppress others — exactly
# as the scalar reference skips them.
active: list[tuple[int, list[float]]] = [
(idx, item.embedding)
for idx, item in enumerate(items)
if item.embedding and not item.dropped
]
if len(active) <= 1:
return

dim = len(active[0][1])
if any(len(emb) != dim for _, emb in active):
# Ragged embeddings cannot form a matrix; this should not happen for
# a single embedder, but fall back to the scalar reference so the
# len-mismatch-as-zero-similarity behavior is preserved exactly.
# Warn (rather than silently degrade) since it signals an embedder
# returning variable-length vectors — an encoding bug worth surfacing.
logger.warning(
"intra_batch_dedup: ragged embeddings in batch (sample lengths: "
"%s); falling back to scalar dedup. This usually means the "
"embedder returned variable-length vectors.",
[len(emb) for _, emb in active[:5]],
)
self._dedup_scalar(items, threshold)
return

matrix = np.asarray([emb for _, emb in active], dtype=np.float64)
norms = np.linalg.norm(matrix, axis=1)
nonzero = norms > 0.0
normalized = np.zeros_like(matrix)
normalized[nonzero] = matrix[nonzero] / norms[nonzero, None]
# Cosine-similarity matrix; zero-norm rows contribute 0.0, matching
# _cosine_similarity's zero-norm guard.
sims = normalized @ normalized.T

m = len(active)
dropped = np.zeros(m, dtype=bool)
for j in range(1, m):
# Drop j iff an earlier, still-kept item is near-identical.
if bool((~dropped[:j] & (sims[:j, j] >= threshold)).any()):
dropped[j] = True

for local_idx in range(m):
if dropped[local_idx]:
items[active[local_idx][0]].dropped = True
self.state.items_dropped_dedup += 1

def _dedup_scalar(self, items: list[ItemState], threshold: float) -> None:
"""Reference O(n²) dedup using scalar cosine similarity.

Retained as the exact behavioral reference and as a fallback for the
(unexpected) ragged-embedding case.
"""
n = len(items)
for j in range(1, n):
if items[j].dropped or not items[j].embedding:
Expand Down
147 changes: 147 additions & 0 deletions lib/crewai/tests/memory/test_encoding_flow_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Tests for EncodingFlow.intra_batch_dedup (vectorized cosine dedup).

The vectorized implementation must reproduce the exact drop decisions of the
original scalar O(n^2) algorithm, which is retained as ``_dedup_scalar``.
"""

from __future__ import annotations

import logging
from unittest.mock import MagicMock, patch

import numpy as np
import pytest

from crewai.memory.encoding_flow import EncodingFlow, ItemState
from crewai.memory.types import MemoryConfig


def _make_flow(threshold: float = 0.98) -> EncodingFlow:
config = MemoryConfig()
config.batch_dedup_threshold = threshold
return EncodingFlow(
storage=MagicMock(), llm=MagicMock(), embedder=MagicMock(), config=config
)


def _run_vectorized(items: list[ItemState], threshold: float = 0.98) -> EncodingFlow:
flow = _make_flow(threshold)
flow.state.items = items
flow.intra_batch_dedup()
return flow


def test_drops_identical_keeps_distinct() -> None:
items = [
ItemState(content="a", embedding=[0.5] * 8),
ItemState(content="b", embedding=[0.5] * 8), # identical to a -> dropped
ItemState(content="c", embedding=[1.0] + [0.0] * 7), # orthogonal -> kept
]
flow = _run_vectorized(items)
assert [it.dropped for it in items] == [False, True, False]
assert flow.state.items_dropped_dedup == 1


def test_first_occurrence_wins() -> None:
items = [ItemState(content=str(i), embedding=[0.5] * 8) for i in range(4)]
flow = _run_vectorized(items)
# Only the first survives; the rest are dropped against it.
assert [it.dropped for it in items] == [False, True, True, True]
assert flow.state.items_dropped_dedup == 3


def test_items_without_embeddings_never_dropped() -> None:
items = [
ItemState(content="a", embedding=[0.5] * 8),
ItemState(content="no-emb", embedding=[]), # never participates
ItemState(content="b", embedding=[0.5] * 8), # dup of a
]
flow = _run_vectorized(items)
assert [it.dropped for it in items] == [False, False, True]
assert flow.state.items_dropped_dedup == 1


def test_pre_dropped_item_is_skipped() -> None:
"""A pre-dropped item must neither be re-counted nor suppress others."""
a = ItemState(content="a", embedding=[0.5] * 8)
a.dropped = True # already dropped upstream
items = [
a,
ItemState(content="b", embedding=[0.5] * 8), # same vector as the dropped a
ItemState(content="c", embedding=[0.5] * 8), # dup of b
]
flow = _run_vectorized(items)
# 'a' stays dropped (not re-counted); 'b' becomes the surviving original;
# 'c' is dropped against 'b'.
assert items[0].dropped is True
assert items[1].dropped is False
assert items[2].dropped is True
assert flow.state.items_dropped_dedup == 1 # only 'c' is a new drop


def test_falls_back_to_scalar_when_numpy_unavailable() -> None:
"""If numpy can't be imported, dedup still works via the scalar path."""
items = [
ItemState(content="a", embedding=[0.5] * 8),
ItemState(content="b", embedding=[0.5] * 8), # dup -> dropped
ItemState(content="c", embedding=[1.0] + [0.0] * 7), # distinct -> kept
]
flow = _make_flow()
flow.state.items = items
# Make `import numpy` raise ImportError inside intra_batch_dedup.
with patch.dict("sys.modules", {"numpy": None}):
flow.intra_batch_dedup()
assert [it.dropped for it in items] == [False, True, False]
assert flow.state.items_dropped_dedup == 1


def test_ragged_embeddings_warn_and_fall_back(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Variable-length embeddings (an encoding bug) must warn and still dedup
correctly via the scalar fallback (len mismatch -> 0 similarity)."""
items = [
ItemState(content="a", embedding=[0.5] * 8),
ItemState(content="b", embedding=[0.5] * 8), # dup of a -> dropped
ItemState(content="c", embedding=[0.5] * 4), # ragged -> never matches
]
with caplog.at_level(logging.WARNING, logger="crewai.memory.encoding_flow"):
flow = _run_vectorized(items)

assert any("ragged embeddings" in r.message for r in caplog.records)
assert [it.dropped for it in items] == [False, True, False]
assert flow.state.items_dropped_dedup == 1


def test_matches_scalar_reference_on_clustered_data() -> None:
"""Vectorized dedup must match the scalar reference exactly on clustered
embeddings (intra-cluster ~1.0, inter-cluster low), across many trials.
"""
rng = np.random.default_rng(0)
dim = 32
threshold = 0.98

for _ in range(25):
n_clusters = int(rng.integers(1, 5))
centers = rng.normal(size=(n_clusters, dim))
embeddings: list[list[float]] = []
for _ in range(int(rng.integers(2, 12))):
c = centers[int(rng.integers(0, n_clusters))]
vec = c + rng.normal(scale=1e-3, size=dim) # tiny noise -> sim ~1.0
embeddings.append(vec.tolist())

vec_items = [
ItemState(content=str(i), embedding=e) for i, e in enumerate(embeddings)
]
scalar_items = [
ItemState(content=str(i), embedding=e) for i, e in enumerate(embeddings)
]

_run_vectorized(vec_items, threshold)

scalar_flow = _make_flow(threshold)
scalar_flow._dedup_scalar(scalar_items, threshold)

assert [it.dropped for it in vec_items] == [
it.dropped for it in scalar_items
]