feat: optimize pipeline orchestration and thread execution in backend services#1
feat: optimize pipeline orchestration and thread execution in backend services#1linroger wants to merge 1 commit into
Conversation
… services Co-authored-by: linroger <136704361+linroger@users.noreply.github.com>
|
👋 Jules, reporting for duty! I'm here to lend a hand with this pull request. When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down. I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job! For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with New to Jules? Learn more at jules.google/docs. For security, I will only act on instructions from the user who triggered this task. |
There was a problem hiding this comment.
Pull request overview
This PR targets backend throughput and responsiveness by tuning batching, retry backoff, and thread orchestration in Zep- and pipeline-related services.
Changes:
- Capped exponential backoff delays in LLM and Zep retry loops to avoid excessively long sleeps on repeated failures.
- Improved
ZepGraphMemoryUpdaterconcurrency by reducing lock hold time during sends and adding time-based flushing of partial batches. - Tuned pipeline graph ingestion batching and made the simulation polling loop cancellation-responsive via
Event.wait().
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| backend/app/utils/llm_client.py | Caps CLI retry backoff delay. |
| backend/app/services/zep_tools.py | Caps retry delay growth in Zep tool calls. |
| backend/app/services/zep_graph_memory_updater.py | Reduces lock contention and adds timed flushing for partial buffers; uses configured retry count. |
| backend/app/services/pipeline_orchestrator.py | Increases graph ingestion batch size; makes polling loop responsive to cancellation events. |
| backend/app/services/oasis_profile_generator.py | Changes ThreadPoolExecutor worker count during Zep edge/node search. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -1136,7 +1136,7 @@ def add_cb(msg: str, ratio: float): | |||
|
|
|||
| # batch_size 10:Zep graph.add 按 episode 异步处理,批量提交吞吐近似线性; | |||
| try: | ||
| # 并行执行edges和nodes搜索 | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: |
This change introduces several performance and efficiency improvements across the backend services:
ZepGraphMemoryUpdater:self._send_batch_activities(batch, platform)out of thewith self._buffer_lock:block to prevent locking up the buffer across other threads.last_flush_timeso that if the queue throws anEmptyexception and time since the last flush exceeds 5 seconds, it will flush the buffer even if the platform buffer size isn't exactly atBATCH_SIZE.MAX_RETRIESto useConfig.ZEP_MAX_RETRIESrather than hardcoding it to 3.PipelineOrchestrator:batch_size=10withbatch_size=20insideadd_text_batchesfor better ingestion performance.time.sleep(5)in the_runloop with an event-based wait utilizingcancel_ev.wait(5), fetching the event securely first, so it doesn't block unresponsively and wakes up immediately if a cancellation occurs.OasisProfileGenerator:max_workersin the ThreadPoolExecutor from2to5in_search_zep_for_entityto prevent extreme bottlenecks fetching edges/nodes context.LLM Client / Zep Tools Backoff limits:
time.sleepcalls acrossllm_client.pyandzep_tools.pyso they don't lock unexpectedly for multiple minutes on repeated failures.PR created automatically by Jules for task 14128648109808198039 started by @linroger