Skip to content

feat: implement Redis-backed trajectory storage and concurrency tests#31

Open
Ad1th wants to merge 2 commits into
Vishisht16:mainfrom
Ad1th:update_redis
Open

feat: implement Redis-backed trajectory storage and concurrency tests#31
Ad1th wants to merge 2 commits into
Vishisht16:mainfrom
Ad1th:update_redis

Conversation

@Ad1th

@Ad1th Ad1th commented May 27, 2026

Copy link
Copy Markdown

Description

This PR upgrades the Redis backend for safe multi-process and distributed execution.
Fixes issue #5

It introduces atomic Redis operations for both escalation logging and rate limiting, and adds a dedicated Redis trajectory store so session risk windows are shared consistently across workers. The trajectory flow now switches to Redis-backed storage when the Redis backend is enabled, while preserving the existing in-memory behavior for non-Redis backends.

It also adds concurrency-focused tests using fakeredis and threaded contention to validate:

  • atomic rate-limit checks under parallel requests
  • atomic escalation log writes under parallel requests
  • atomic trajectory window updates and trimming under parallel requests

Type of Change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update

Checklist

  • I have read the CONTRIBUTING guide
  • My code follows the project's style
  • I have added tests for new or changed behaviour
  • All tests pass (pytest tests/ -v)
  • I have updated documentation if needed
  • Self-harm / safety-related changes have been reviewed for sensitivity

Checklist point 5: marked even though no doc update since it wsa not needed

Some test results:
m pytest tests/test_redis_concurrency.py -q
... [100%]
3 passed in 0.35s
image

-m pytest tests/test_redis_concurrency.py tests/test_storage_backends.py tests/test_trajectory.py -q
.......................... [100%]
26 passed in 0.79s
image

@Ad1th Ad1th requested a review from Vishisht16 as a code owner May 27, 2026 15:15
@coderabbitai

coderabbitai Bot commented May 27, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Added Redis-backed storage for trajectory data, enabling risk analysis across distributed deployments with atomic consistency.
  • Refactor

    • Optimized rate limiting and logging operations for improved reliability under concurrent access patterns.
  • Tests

    • Added concurrency validation tests covering atomic behavior of rate limiting, logging, and trajectory analysis.

Walkthrough

This PR introduces distributed Redis deployment capabilities by adding an atomic Redis-backed trajectory store, refactoring trajectory analysis to support pluggable backends, and upgrading existing RedisStore operations to use atomic Lua scripts. A concurrency test suite validates correctness under multi-threaded contention.

Changes

Distributed Redis Backend with Atomic Operations

Layer / File(s) Summary
RedisTrajectoryStore implementation
humane_proxy/risk/redis_trajectory.py
New RedisTrajectoryStore class registers and uses an atomic Lua script to append trajectory entries (incrementing sequence ids, maintaining sorted sets and payload hashes) and reads snapshots without mutation. Includes payload parsing and window decoding helpers.
Trajectory backend infrastructure and spike/trend helpers
humane_proxy/risk/trajectory.py (config, cache, helpers)
Module now supports backend dispatch via cached RedisTrajectoryStore selection. Introduces shared internal helpers for extracting window scores/categories, computing trend from score halves, computing exponential decay spike detection, and assembling consistent TrajectoryResult output.
Trajectory public API refactoring
humane_proxy/risk/trajectory.py (detect_spike(), analyze())
detect_spike() and analyze() now dispatch to Redis or local storage based on configuration, append via the selected backend, and compute spike/trend/scores/categories using shared builders instead of inline logic. Public signatures unchanged.
RedisStore atomic Lua operations
humane_proxy/storage/redis.py
Registers two Lua scripts: one for atomic log() that increments escalation ids and writes multiple sorted-set indexes in one operation, and one for atomic check_rate_limit() that increments per-session counters with conditional expiry and threshold checking.
Concurrency tests for atomic Redis operations
tests/test_redis_concurrency.py
Validates atomicity under threading: rate limiting allows exactly rate_limit_max of 10 attempts, logging produces 12 unique non-overlapping ids, and trajectory windowing maintains exactly window_size (5) entries with unique scores under 10 concurrent analyze() calls.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

Poem

🐰 Hops gleefully through Redis keyspace

Scripts and windows dance in sync,
Lua brings the atomic link,
No more races, clean and bright—
Distributed scores forever right! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.13% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main changes: implementing Redis-backed trajectory storage and adding concurrency tests.
Description check ✅ Passed The description is directly related to the changeset, explaining the Redis backend upgrades, atomic operations, trajectory storage, and concurrency tests.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
⚔️ Resolve merge conflicts
  • Resolve merge conflict in branch update_redis

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
humane_proxy/risk/trajectory.py (1)

145-145: 💤 Low value

Add strict=True to zip() for defensive validation.

While scores[:-1] and timestamps[:-1] are derived from the same window and should always have matching lengths, adding strict=True provides a safety net if future changes inadvertently cause length mismatches.

♻️ Suggested fix
-    history = deque(zip(scores[:-1], timestamps[:-1]))
+    history = deque(zip(scores[:-1], timestamps[:-1], strict=True))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@humane_proxy/risk/trajectory.py` at line 145, The history deque is built with
zip(scores[:-1], timestamps[:-1]) which can silently drop items if lengths
diverge; change the construction in trajectory.py (the line assigning history)
to use zip(scores[:-1], timestamps[:-1], strict=True) so mismatched lengths
raise immediately (TypeError) and surface bugs; update the invocation assigning
to history (variable name history and the deque call) to include strict=True.
humane_proxy/risk/redis_trajectory.py (1)

34-60: ⚖️ Poor tradeoff

Consider adding TTL to trajectory keys for session cleanup.

The Lua script creates three keys per session (seq, window, payload) but none have an expiration set. For long-running deployments with many unique sessions, this could lead to unbounded memory growth in Redis.

Consider either:

  1. Setting TTL on keys in the Lua script (e.g., after each append, refresh TTL to 24-48 hours)
  2. Documenting that operators should configure Redis's maxmemory-policy appropriately
♻️ Example: Add TTL refresh to Lua script
             local ids = redis.call('ZRANGE', KEYS[2], 0, -1)
             local response = {}
             for _, id in ipairs(ids) do
                 local payload_value = redis.call('HGET', KEYS[3], id)
                 table.insert(response, id)
                 table.insert(response, payload_value)
             end
+            -- Refresh TTL on all keys (e.g., 48 hours = 172800 seconds)
+            local ttl = tonumber(ARGV[5]) or 172800
+            redis.call('EXPIRE', KEYS[1], ttl)
+            redis.call('EXPIRE', KEYS[2], ttl)
+            redis.call('EXPIRE', KEYS[3], ttl)
             return response

Then pass the TTL as an additional argument in append().

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@humane_proxy/risk/redis_trajectory.py` around lines 34 - 60, The Lua script
registered as self._append_script currently creates three per-session keys (the
seq key, the ZSET/window key, and the HSET payload key) but never sets
expirations; update the script to refresh TTL on all three keys by calling
redis.call('EXPIRE', KEYS[1], ARGV[5]), redis.call('EXPIRE', KEYS[2], ARGV[5]),
and redis.call('EXPIRE', KEYS[3], ARGV[5]) after the writes (or where
appropriate), and change the Python append wrapper (the append method that
invokes self._append_script) to accept/pass a TTL seconds argument (e.g.,
default 24*3600) as ARGV[5]; this ensures every append refreshes the TTL for
seq/window/payload and prevents unbounded Redis growth.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@humane_proxy/risk/redis_trajectory.py`:
- Around line 34-60: The Lua script registered as self._append_script currently
creates three per-session keys (the seq key, the ZSET/window key, and the HSET
payload key) but never sets expirations; update the script to refresh TTL on all
three keys by calling redis.call('EXPIRE', KEYS[1], ARGV[5]),
redis.call('EXPIRE', KEYS[2], ARGV[5]), and redis.call('EXPIRE', KEYS[3],
ARGV[5]) after the writes (or where appropriate), and change the Python append
wrapper (the append method that invokes self._append_script) to accept/pass a
TTL seconds argument (e.g., default 24*3600) as ARGV[5]; this ensures every
append refreshes the TTL for seq/window/payload and prevents unbounded Redis
growth.

In `@humane_proxy/risk/trajectory.py`:
- Line 145: The history deque is built with zip(scores[:-1], timestamps[:-1])
which can silently drop items if lengths diverge; change the construction in
trajectory.py (the line assigning history) to use zip(scores[:-1],
timestamps[:-1], strict=True) so mismatched lengths raise immediately
(TypeError) and surface bugs; update the invocation assigning to history
(variable name history and the deque call) to include strict=True.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c563a293-96fc-4462-b635-9862889508e8

📥 Commits

Reviewing files that changed from the base of the PR and between 1924099 and d2f59d1.

📒 Files selected for processing (4)
  • humane_proxy/risk/redis_trajectory.py
  • humane_proxy/risk/trajectory.py
  • humane_proxy/storage/redis.py
  • tests/test_redis_concurrency.py

@CLAassistant

Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@Vishisht16

Copy link
Copy Markdown
Owner

@Ad1th please resolve merge conflicts and sign the CLA. Also review CodeRabbit's comments on some files and overall docstring coverage. Didn't do a thorough review yet but seems alright at a high level.

You may need to refer to #17 to resolve merge conflicts with full understanding. Feel free to ask me if you have any questions.

@Ad1th

Ad1th commented May 27, 2026

Copy link
Copy Markdown
Author

Okay, will do so

@Ad1th

Ad1th commented May 27, 2026

Copy link
Copy Markdown
Author

I guess the failed test is because the pyproject.toml file was not updated with fakeredis, can I add it in that?

@Ad1th

Ad1th commented May 27, 2026

Copy link
Copy Markdown
Author

Also @Vishisht16 , the it is not letting me type anythign in the version box for the CLA

@Vishisht16

Copy link
Copy Markdown
Owner

@Ad1th I had already told you in #5 that you have permission to add fakeredis to pyproject.toml, so add it to the dev extras. Also, there's an import error in test_trajectory.py so rebase on latest main to include changes from #17.

For the CLA, you're not supposed to write in the version box. It takes some time to load and you'll be able to sign it once you give it some time.

@Vishisht16

Copy link
Copy Markdown
Owner

Also respect CodeRabbit's review from the previous commit.

@Ad1th

Ad1th commented May 28, 2026

Copy link
Copy Markdown
Author

okay, will work on it

@Vishisht16 Vishisht16 linked an issue May 31, 2026 that may be closed by this pull request
4 tasks
@Vishisht16

Copy link
Copy Markdown
Owner

@Ad1th when can I expect the changes?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Backend] Upgrade Redis Backend for Multi-Process / Distributed Environments

3 participants