feat(stream): add method for filtering reasoning chunks from AI streaming #724

Open
adarshba wants to merge 1 commit into juspay:release from adarshba:BZ-47583-data-is-streamed-without-any-toolcalls

Conversation

@adarshba

@adarshba adarshba commented Dec 26, 2025

Contributor

Pull Request

Description

Add createFilteredFullStream() to StreamHandler that filters out
reasoning/thinking chunks (reasoning, reasoning-signature, redacted-reasoning)
from AI SDK fullStream. This prevents chain-of-thought tokens from leaking
to users when using models with extended thinking capabilities.

Type of Change

  • 🐛 Bug fix (non-breaking change which fixes an issue)
  • ✨ New feature (non-breaking change which adds functionality)
  • 💥 Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • 📚 Documentation update
  • 🧹 Code refactoring (no functional changes)
  • ⚡ Performance improvement
  • 🧪 Test coverage improvement
  • 🔧 Build/CI configuration change

Related Issues

  • Fixes #
  • Related to #

Changes Made

  • Made streamHandler protected in BaseProvider for provider access
  • Google Vertex provider now uses createFilteredFullStream()
  • Skips: reasoning, reasoning-signature, redacted-reasoning, source, tool-call, tool-result
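
The filtering listed above can be sketched roughly as follows. This is a minimal illustration and not the code from this PR: the function name `filterFullStream`, the `textDelta` field on text-delta chunks, and the error-message handling are assumptions, and real AI SDK chunk shapes vary by version.

```typescript
// Hypothetical sketch of a filtered full-stream generator.
// Assumed chunk shapes: { type: "text-delta", textDelta: string } and
// { type: "error", error: unknown }. Not taken from the PR diff.
type FilteredChunk = { content: string };

async function* filterFullStream(result: {
  fullStream?: AsyncIterable<unknown>;
  textStream?: AsyncIterable<string>;
}): AsyncGenerator<FilteredChunk> {
  if (result.fullStream) {
    for await (const chunk of result.fullStream) {
      // Plain string chunks are user-visible text.
      if (typeof chunk === "string") {
        yield { content: chunk };
        continue;
      }
      if (typeof chunk !== "object" || chunk === null) {
        continue;
      }
      const record = chunk as Record<string, unknown>;

      // Error chunks are surfaced to the consumer by throwing.
      if (record["type"] === "error") {
        const error = record["error"];
        const message =
          error instanceof Error
            ? error.message
            : typeof error === "string"
              ? error
              : "Unknown stream error";
        throw new Error(message);
      }

      // Only text-delta chunks are yielded. Everything else (reasoning,
      // reasoning-signature, redacted-reasoning, source, tool-call,
      // tool-result, and unknown types) is skipped.
      const text = record["textDelta"];
      if (record["type"] === "text-delta" && typeof text === "string") {
        yield { content: text };
      }
    }
    return;
  }

  // Fallback: no fullStream available, so pass textStream through as-is.
  if (result.textStream) {
    for await (const text of result.textStream) {
      yield { content: text };
    }
  }
}
```

The sketch uses an allowlist (only text is yielded) rather than a blocklist of skipped types, so a chunk type introduced by a later SDK version would be dropped rather than leaked. Whether the PR takes the same approach is not confirmed by the description.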

AI Provider Impact

  • OpenAI
  • Anthropic
  • Google AI/Vertex
  • AWS Bedrock
  • Azure OpenAI
  • Hugging Face
  • Ollama
  • Mistral
  • All providers
  • No provider-specific changes

Component Impact

  • CLI
  • SDK
  • MCP Integration
  • Streaming
  • Tool Calling
  • Configuration
  • Documentation
  • Tests

Testing

  • Unit tests added/updated
  • Integration tests added/updated
  • E2E tests added/updated
  • Manual testing performed
  • All existing tests pass

Test Environment

  • OS: macOS 26.2
  • Node.js version: 20
  • Package manager: pnpm

Performance Impact

  • No performance impact
  • Performance improvement
  • Minor performance impact (acceptable)
  • Significant performance impact (needs discussion)

Breaking Changes

Screenshots/Demo

Simulator.Screen.Recording.-.iPhone.17.Pro.-.2025-12-26.at.18.37.38.mov

Checklist

  • My code follows the project's style guidelines
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published

Additional Notes

Use createFilteredFullStream() for reasoning models in the providers.

Summary by CodeRabbit

  • New Features

    • Added filtered full-stream processing to produce cleaner, structured streamed text output.
  • Improvements

    • Streaming now skips non-text reasoning/tooling chunks and falls back to plain text streams when needed, improving clarity of streamed results.
    • Stream handler access extended to allow subclass integration where needed.
  • Bug Fixes

    • Stream errors are surfaced consistently during streaming.
  • Tests

    • Added unit tests covering filtered streaming behavior, fallbacks, mixed chunks, and error handling.

@coderabbitai

coderabbitai Bot commented Dec 26, 2025


Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

BaseProvider now exposes its StreamHandler to subclasses; StreamHandler gains createFilteredFullStream to emit only text content from full/text streams; GoogleVertexProvider uses the new filtered full-stream API; tests added for filtering, errors, and fallback behavior.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| BaseProvider change: src/lib/core/baseProvider.ts | streamHandler access changed from private to protected to allow subclass access. |
| StreamHandler new API & logic: src/lib/core/modules/StreamHandler.ts | Added createFilteredFullStream(result: { fullStream?: AsyncIterable<unknown>; textStream?: AsyncIterable<string>; }): prefers fullStream, filters out non-text/reasoning/tool chunks, yields { content: string } from text-delta and string chunks, throws on error chunks, falls back to textStream. |
| Provider integration: src/lib/providers/googleVertex.ts | Replaced previous text-stream transform with this.streamHandler.createFilteredFullStream(result) in executeStream(). |
| Tests: test/unit/stream/reasoning-filter.test.ts | New unit tests validating filtering of reasoning/tool/source chunks, passage of text-delta/string chunks, error handling, fallback to textStream, and empty/mixed stream scenarios. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant GoogleVertexProvider as Provider
  participant StreamHandler
  participant FullStreamSource as FullStream
  participant TextStreamSource as TextStream

  Client->>Provider: executeStream(request)
  Provider->>StreamHandler: createFilteredFullStream({ fullStream?, textStream? })
  alt fullStream available
    StreamHandler->>FullStream: iterate chunks
  else fallback to textStream
    StreamHandler->>TextStream: iterate strings
  end
  loop per chunk
    FullStream-->>StreamHandler: chunk (text-delta | string | reasoning | error | tool | source)
    alt text-delta or string
      StreamHandler-->>Provider: yield { content: "..." }   %%#{"fill":"#E6F4EA"}
    else non-text reasoning/tool/source
      StreamHandler-->>StreamHandler: skip chunk
    else error chunk
      StreamHandler-->>Provider: throw StreamingError         %%#{"fill":"#FFEFE6"}
    end
  end
  Provider-->>Client: streamed content events

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested labels

released

Poem

🐰 I hop through streams both wide and thin,
I filter the chatter, let the text march in.
Reasoning whispers get politely shunned,
Only useful words make the final run.
Hooray — cleaner streams, a carrot for fun! 🥕

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately and concisely describes the main change: adding a method to filter reasoning chunks from AI streaming. It reflects the primary objective of preventing chain-of-thought token leakage. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |

@adarshba adarshba force-pushed the BZ-47583-data-is-streamed-without-any-toolcalls branch from c55b290 to dfb0f18 Compare December 26, 2025 13:12

Copilot AI left a comment

Pull request overview

This PR adds a new streaming method to filter out reasoning/thinking chunks from AI SDK's fullStream, preventing chain-of-thought (CoT) tokens from leaking to end users when using models with extended thinking capabilities like Gemini's experimental reasoning models.

Key Changes:

  • Added createFilteredFullStream() method to StreamHandler that filters reasoning-related chunk types
  • Modified Google Vertex provider to use the new filtered stream method instead of createTextStream()
  • Changed streamHandler visibility from private to protected in BaseProvider to allow provider access

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| src/lib/core/modules/StreamHandler.ts | Added new createFilteredFullStream() method to filter reasoning/thinking chunks from fullStream while preserving text-delta chunks |
| src/lib/providers/googleVertex.ts | Updated streaming implementation to use createFilteredFullStream() instead of createTextStream() for reasoning model support |
| src/lib/core/baseProvider.ts | Changed streamHandler from private to protected visibility to enable provider-level access |
| pre-commit.sh | Commented out npm test command in pre-commit hook |

Comment thread src/lib/core/modules/StreamHandler.ts
Comment thread src/lib/core/modules/StreamHandler.ts Outdated
Comment thread src/lib/core/modules/StreamHandler.ts Outdated
Comment thread src/lib/core/modules/StreamHandler.ts
@adarshba adarshba force-pushed the BZ-47583-data-is-streamed-without-any-toolcalls branch 2 times, most recently from 3faa759 to 1c9f27d Compare December 26, 2025 13:32
@adarshba

Contributor Author

@coderabbitai fullreview

@coderabbitai

coderabbitai Bot commented Dec 26, 2025

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Fetch the changed files to review implementation
cat src/lib/core/baseProvider.ts

Length of output: 37752


🏁 Script executed:

#!/bin/bash
# Fetch StreamHandler implementation
cat src/lib/core/modules/StreamHandler.ts

Length of output: 6045


🏁 Script executed:

#!/bin/bash
# Fetch Google Vertex provider implementation
cat src/lib/providers/googleVertex.ts

Length of output: 67367


🏁 Script executed:

#!/bin/bash
# Check if there are other providers that might need similar updates
fd -e ts -e js . src/lib/providers/

Length of output: 967


🏁 Script executed:

#!/bin/bash
# Check if there are tests for StreamHandler
fd -e ts -e js . --exec grep -l "StreamHandler" {} \; | grep -i test

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Look for usage of createTextStream in other providers
rg -n "createTextStream|executeStream" src/lib/providers/ -A 3 -B 3

Length of output: 15693


🏁 Script executed:

#!/bin/bash
# Check the reasoning filter test file
cat test/unit/stream/reasoning-filter.test.ts

Length of output: 10188


🏁 Script executed:

#!/bin/bash
# Check if Google AI Studio provider should also be updated (compare implementation)
rg -n "createTextStream|createFilteredFullStream" src/lib/providers/googleAiStudio.ts -C 5

Length of output: 435


🏁 Script executed:

#!/bin/bash
# Check for any documentation or README about reasoning models
fd -e md README CHANGELOG | head -20 | xargs cat 2>/dev/null | grep -i "reasoning\|thinking\|chain-of-thought" || echo "No reasoning-related docs found"

Length of output: 128


🏁 Script executed:

#!/bin/bash
# Check if there are any models known to have reasoning capabilities
rg -n "reasoning|thinking|o1|o3|gemini-2" src/lib/ --type ts | head -30

Length of output: 2263


@adarshba I've completed a comprehensive review of PR #724. The implementation is solid and well-tested, but I've identified some consistency and documentation concerns that should be addressed.

✅ Strengths

  1. Excellent Test Coverage: The test file test/unit/stream/reasoning-filter.test.ts covers the filtering scenarios, error handling, and edge cases.

  2. Clean Implementation: The createFilteredFullStream method follows existing patterns in StreamHandler and integrates well with the codebase.

  3. Clear Purpose: Filtering reasoning chunks prevents chain-of-thought token leakage - a critical security/UX concern for reasoning-capable models.

  4. Minimal Breaking Changes: Making streamHandler protected is a safe, non-breaking change.

⚠️ Issues & Recommendations

1. Inconsistent Provider Adoption (High Priority)

Issue: PR objectives state "Google AI/Vertex provider updated" but only GoogleVertex uses the new method:

// googleVertex.ts:959 - ✅ Uses filtered stream
const transformedStream = this.streamHandler.createFilteredFullStream(result);

// googleAiStudio.ts:212 - ❌ Still uses old method
const transformedStream = this.createTextStream(result);

Recommendation: Update GoogleAI Studio provider for consistency:

// In googleAiStudio.ts line 212
- const transformedStream = this.createTextStream(result);
+ const transformedStream = this.streamHandler.createFilteredFullStream(result);

2. Missing Documentation (Medium Priority)

Issue: No guidance on when to use createFilteredFullStream vs createTextStream.

Recommendation: Add JSDoc to StreamHandler method:

/**
 * Create filtered stream from AI SDK fullStream
 *
 * Filters out reasoning/thinking chunks from the stream to prevent chain-of-thought
 * tokens from leaking to users when using models with extended thinking capabilities
 * (e.g., Gemini 2.0, GPT-4 with reasoning, Claude Opus with chain-of-thought).
 *
 * **When to use:**
 * - For providers using AI SDK's streamText() with reasoning-capable models
 * - When you want to hide internal reasoning/thinking process from end users
 * - As the default for all providers to ensure consistent behavior
 *
 * **Filtered chunk types:**
 * - reasoning, reasoning-signature, redacted-reasoning
 * - source, tool-call, tool-result
 *
 * **Fallback:** Uses textStream if fullStream is unavailable
 *
 * @param result - AI SDK stream result with fullStream or textStream
 * @returns AsyncGenerator yielding only text content chunks
 * @example
 * ```typescript
 * const result = streamText({ model, messages });
 * const filtered = this.streamHandler.createFilteredFullStream(result);
 * ```
 */
createFilteredFullStream(result: { ... }): AsyncGenerator<{ content: string }> {

3. Other Providers Need Assessment (Medium Priority)

Issue: 9 other providers still use createTextStream:

  • Mistral, Azure OpenAI, Anthropic, Bedrock, etc.

Question: Should these also use filtered streams? Especially:

  • Anthropic: Claude models have reasoning capabilities
  • OpenAI: the o1 and o3 series are reasoning models
  • Mistral: Some models support chain-of-thought

Recommendation:

  • Short term: Document why only Google providers need filtering
  • Long term: Consider making createFilteredFullStream the default for all providers

4. Type Safety Enhancement (Low Priority)

Issue: Chunk type discrimination uses string checks without strict typing:

if (chunk && typeof chunk === "object" && "type" in chunk) {
  const chunkType = (chunk as { type: string }).type;

Recommendation: Consider defining a discriminated union for chunk types:

type StreamChunk = 
  | { type: "text-delta"; textDelta: string }
  | { type: "reasoning"; text: string }
  | { type: "reasoning-signature"; signature: string }
  | { type: "redacted-reasoning"; data: string }
  | { type: "source"; source: Record<string, unknown> }
  | { type: "tool-call"; toolCall: unknown }
  | { type: "tool-result"; toolResult: unknown }
  | { type: "error"; error: Record<string, unknown> }
  | string;
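
To illustrate what such a union would buy, here is a hedged sketch of an exhaustive `switch` over it. The `StreamChunk` fields are copied from the suggestion above and are not verified against the AI SDK; the `visibleText` helper is hypothetical.

```typescript
// Assumed chunk shapes, mirroring the suggested union above.
type StreamChunk =
  | { type: "text-delta"; textDelta: string }
  | { type: "reasoning"; text: string }
  | { type: "reasoning-signature"; signature: string }
  | { type: "redacted-reasoning"; data: string }
  | { type: "source"; source: Record<string, unknown> }
  | { type: "tool-call"; toolCall: unknown }
  | { type: "tool-result"; toolResult: unknown }
  | { type: "error"; error: Record<string, unknown> }
  | string;

// Hypothetical helper: returns user-visible text, or null for chunks
// that should be skipped. Throws on error chunks.
function visibleText(chunk: StreamChunk): string | null {
  if (typeof chunk === "string") {
    return chunk;
  }
  switch (chunk.type) {
    case "text-delta":
      return chunk.textDelta;
    case "error": {
      const message = chunk.error["message"];
      throw new Error(
        typeof message === "string" ? message : "Unknown stream error",
      );
    }
    case "reasoning":
    case "reasoning-signature":
    case "redacted-reasoning":
    case "source":
    case "tool-call":
    case "tool-result":
      return null;
    default: {
      // Compile-time exhaustiveness check: adding a new variant to
      // StreamChunk without handling it here becomes a type error.
      const unreachable: never = chunk;
      return unreachable;
    }
  }
}
```

The trade-off is that a closed union only helps if the upstream stream is actually typed this way. With `AsyncIterable<unknown>` input, runtime guards are still needed before the switch.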

5. Missing CHANGELOG Entry (Low Priority)

Add entry documenting this feature for users:

### Added
- `StreamHandler.createFilteredFullStream()`: Filters reasoning/thinking chunks from AI streams to prevent chain-of-thought token leakage with reasoning-capable models (Gemini 2.0, GPT-4 reasoning, etc.)

### Changed
- Made `BaseProvider.streamHandler` protected to allow provider access
- GoogleVertex provider now uses filtered streaming by default

🎯 Action Items

Must Fix:

  1. Update GoogleAI Studio provider to use createFilteredFullStream (consistency with PR objectives)

Should Consider:

  2. Add comprehensive JSDoc documentation to the method
  3. Document in code comments why only certain providers use filtering

Nice to Have:

  4. Add CHANGELOG entry
  5. Enhance type safety with discriminated unions
  6. Consider making filtered streaming the default for all providers

📝 Code Quality

  • ✅ Follows existing patterns
  • ✅ Test coverage is excellent
  • ✅ No performance concerns
  • ✅ Error handling is appropriate
  • ✅ Backwards compatible

Would you like me to help implement any of these recommendations?


🧠 Learnings used
Learnt from: vigneshJuspay
Repo: juspay/neurolink PR: 0
File: :0-0
Timestamp: 2025-12-15T18:35:37.783Z
Learning: In juspay/neurolink TTS implementation (PR #691), the new StreamChunk discriminated union type introduced in TTS-019 will be integrated with StreamResult.stream during the actual TTS streaming implementation PR (TTS-020/TTS-021), not in the type-definition PR. This phased approach keeps type updates and implementation changes atomic.
✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/lib/core/modules/StreamHandler.ts (1)

90-97: Enhance JSDoc with usage guidance and filtered chunk types.

The implementation is solid and past review concerns have been addressed (error handling, type safety, test coverage). However, the JSDoc could be more helpful by clarifying:

  1. When to use this method vs createTextStream
  2. Complete list of filtered chunk types
  3. Example use case
📝 Enhanced JSDoc suggestion
  /**
-  * Create filtered stream from AI SDK fullStream
+  * Create filtered stream from AI SDK fullStream or textStream
   *
-  * Filters out reasoning/thinking chunks from the stream.
-  * Used as safety net to prevent chain-of-thought tokens from leaking to users.
+  * Filters out internal/reasoning chunks from AI SDK's fullStream to prevent
+  * chain-of-thought tokens from leaking to end users. Falls back to textStream
+  * when fullStream is unavailable.
   *
-  * @param result - AI SDK stream result
+  * **When to use:**
+  * - Use this for reasoning-capable models (e.g., o1, Gemini 2.0 Flash Thinking)
+  * - Use `createTextStream` for simple text-only streaming without reasoning chunks
+  *
+  * **Filtered chunk types:**
+  * - reasoning, reasoning-signature, redacted-reasoning
+  * - source, tool-call, tool-result
+  *
+  * **Passed-through chunk types:**
+  * - text-delta (yielded as content)
+  * - string chunks (yielded as content)
+  * - error chunks (thrown as Error)
+  *
+  * @param result - AI SDK stream result with fullStream and/or textStream
+  * @returns AsyncGenerator yielding { content: string } chunks
+  * @throws Error when error chunk is encountered in stream
+  *
+  * @example
+  * ```ts
+  * const stream = this.streamHandler.createFilteredFullStream(result);
+  * for await (const chunk of stream) {
+  *   console.log(chunk.content); // Only user-visible text content
+  * }
+  * ```
   */
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3c5c9a2 and 1c9f27d.

📒 Files selected for processing (4)
  • src/lib/core/baseProvider.ts
  • src/lib/core/modules/StreamHandler.ts
  • src/lib/providers/googleVertex.ts
  • test/unit/stream/reasoning-filter.test.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.{ts,tsx}: Maintain strict TypeScript type safety across all modules with comprehensive type definitions organized by domain to avoid circular dependencies
Use ErrorFactory for creating typed errors throughout the application
Wrap async operations with withTimeout utility for timeout handling

Files:

  • src/lib/core/baseProvider.ts
  • src/lib/providers/googleVertex.ts
  • src/lib/core/modules/StreamHandler.ts
  • test/unit/stream/reasoning-filter.test.ts
**/providers/*.ts

📄 CodeRabbit inference engine (CLAUDE.md)

Providers must extend a base provider or implement the provider interface and register in ProviderRegistry.registerAllProviders() with provider name, factory function, default model, and aliases

Files:

  • src/lib/providers/googleVertex.ts
🧠 Learnings (5)
📓 Common learnings
Learnt from: vigneshJuspay
Repo: juspay/neurolink PR: 0
File: :0-0
Timestamp: 2025-12-15T18:35:37.783Z
Learning: In juspay/neurolink TTS implementation (PR #691), the new StreamChunk discriminated union type introduced in TTS-019 will be integrated with StreamResult.stream during the actual TTS streaming implementation PR (TTS-020/TTS-021), not in the type-definition PR. This phased approach keeps type updates and implementation changes atomic.
📚 Learning: 2025-09-17T17:55:15.261Z
Learnt from: RajuSudhar
Repo: juspay/neurolink PR: 173
File: src/lib/index.ts:16-16
Timestamp: 2025-09-17T17:55:15.261Z
Learning: In src/lib/types/providers.ts, ProviderConfig was renamed to AIModelProviderConfig to deduplicate type names, as there was an existing ProviderConfig type that better suited the "ProviderConfig" name. This was an intentional breaking change for better type organization.

Applied to files:

  • src/lib/core/baseProvider.ts
📚 Learning: 2025-12-10T12:24:51.147Z
Learnt from: CR
Repo: juspay/neurolink PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-10T12:24:51.147Z
Learning: Applies to **/types/index.ts : Add new provider names to the AIProviderName enum in src/lib/types/index.ts when adding a new provider

Applied to files:

  • src/lib/core/baseProvider.ts
📚 Learning: 2025-12-10T12:24:51.147Z
Learnt from: CR
Repo: juspay/neurolink PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-10T12:24:51.147Z
Learning: Applies to **/utils/messageBuilder.ts : Message construction must be handled through MessageBuilder in src/lib/utils/messageBuilder.ts, which handles text, images, PDFs, and CSV files with provider-specific adapters

Applied to files:

  • src/lib/core/baseProvider.ts
📚 Learning: 2025-09-01T22:58:39.149Z
Learnt from: sudharsan-juspay
Repo: juspay/neurolink PR: 140
File: src/lib/core/types.ts:198-203
Timestamp: 2025-09-01T22:58:39.149Z
Learning: In src/lib/core/types.ts, StreamOptions (imported from streamTypes.js) and StreamingOptions are intentionally different types with different use cases. StreamingOptions is for unified AI requests with multiple provider configurations, while StreamOptions is for individual streaming operations.

Applied to files:

  • src/lib/core/modules/StreamHandler.ts
🧬 Code graph analysis (2)
src/lib/core/baseProvider.ts (1)
src/lib/core/modules/StreamHandler.ts (1)
  • StreamHandler (32-200)
test/unit/stream/reasoning-filter.test.ts (1)
src/lib/core/modules/StreamHandler.ts (1)
  • StreamHandler (32-200)
🔇 Additional comments (4)
src/lib/core/baseProvider.ts (1)

64-64: LGTM: Safe visibility change to enable subclass access.

Changing streamHandler from private to protected is a sound design decision that allows provider subclasses to access StreamHandler methods (like the new createFilteredFullStream) while maintaining encapsulation. This is a non-breaking change that aligns with the composition pattern used throughout BaseProvider.

test/unit/stream/reasoning-filter.test.ts (1)

1-317: Excellent test coverage addressing prior review feedback.

This comprehensive test suite thoroughly validates the createFilteredFullStream method:

  • ✅ All filtered chunk types (reasoning, reasoning-signature, redacted-reasoning, source, tool-call, tool-result)
  • ✅ Pass-through chunks (text-delta, strings)
  • ✅ Error handling (error chunks with and without messages)
  • ✅ Fallback behavior (textStream when fullStream unavailable)
  • ✅ Edge cases (empty streams, mixed chunk types)

The test structure is clear, mocks are appropriate, and assertions are precise. This directly addresses the prior review comment about lacking test coverage.

src/lib/core/modules/StreamHandler.ts (1)

105-133: Implementation is correct and addresses all prior review feedback.

The filtering logic properly:

  • ✅ Handles error chunks by throwing (addresses prior comment)
  • ✅ Uses type definitions for safety (addresses prior comment)
  • ✅ Filters reasoning/internal chunks to prevent CoT token leakage
  • ✅ Passes through user-visible content (text-delta, strings)
  • ✅ Falls back to textStream when fullStream unavailable
  • ✅ Has comprehensive test coverage (in reasoning-filter.test.ts)

The implementation is production-ready.

src/lib/providers/googleVertex.ts (1)

986-987: LGTM: Correct implementation of filtered streaming for Gemini adaptive thinking models.

The switch to this.streamHandler.createFilteredFullStream(result) correctly implements reasoning-chunk filtering to prevent chain-of-thought token leakage. This is appropriate for googleVertex.ts since it supports Gemini models with adaptive thinking capabilities.

Note on other providers: While googleAiStudio can also use Gemini 3 Pro (which has adaptive thinking), it currently relies on createTextStream rather than createFilteredFullStream. Whether this requires updating depends on whether googleAiStudio's streamText() result receives a fullStream containing reasoning chunks. If googleAiStudio only receives a textStream, no filtering would be needed at the stream handler level. A brief audit of googleAiStudio's actual stream result structure would clarify whether similar filtering is necessary.

@adarshba adarshba changed the title feat(stream): add method for filtering reasoning chunks from the ai s… feat(stream): add method for filtering reasoning chunks from AI streaming Dec 27, 2025
@adarshba adarshba force-pushed the BZ-47583-data-is-streamed-without-any-toolcalls branch from 1c9f27d to cefe8cb Compare December 27, 2025 04:28
* }
* ```
*/
createFilteredFullStream(result: {

Contributor

@adarshba Will this stop events from being emitted?
The events are required, so how can you make sure that all of them are still sent?

Contributor Author

The method only filters the text stream. Tool events are captured via the onStepFinish callback, so they remain untouched.

@murdore

murdore commented Dec 27, 2025

Contributor

@coderabbitai fullreview Also check if the comments I have added are valid.

@coderabbitai

coderabbitai Bot commented Dec 27, 2025

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Fetch all review comments from murdore on PR #724
gh pr view 724 --json reviews --jq '.reviews[] | select(.author.login == "murdore") | {state: .state, body: .body, createdAt: .createdAt}'

# Also fetch review comments (inline comments) from murdore
gh api repos/juspay/neurolink/pulls/724/comments --jq '.[] | select(.user.login == "murdore") | {path: .path, line: .line, body: .body, created_at: .created_at, diff_hunk: .diff_hunk}'

Length of output: 2160


🏁 Script executed:

#!/bin/bash
# Search for event emission patterns in StreamHandler and related files
rg -n "emit|EventEmitter|on\(|dispatchEvent" src/lib/core/modules/StreamHandler.ts -C 5

# Check for event-related code in BaseProvider
rg -n "emit|EventEmitter|on\(|dispatchEvent|event" src/lib/core/baseProvider.ts -C 3

# Look for event handling in the Google Vertex provider implementation
rg -n "emit|event|EventEmitter" src/lib/providers/googleVertex.ts -C 3

# Check if there's any event system in the streaming implementation
ast-grep --pattern 'class StreamHandler {
  $$$
  emit($$$) {
    $$$
  }
  $$$
}'

# Search for usage of 'event' keyword in test files to understand event requirements
rg -n "event" test/unit/stream/reasoning-filter.test.ts -C 2

Length of output: 4967


🏁 Script executed:

#!/bin/bash
# Search for onChunk, onToken, or callback patterns that might be event-related
rg -n "onChunk|onToken|callback|onChange" src/lib/core/modules/StreamHandler.ts -C 5

# Check the createTextStream implementation to compare with createFilteredFullStream
cat src/lib/core/modules/StreamHandler.ts | sed -n '40,90p'

# Look for any event-related patterns in the streamText result
rg -n "experimental_telemetry|telemetry|onFinish|onError|onChunk" src/lib/providers/googleVertex.ts -A 2 -B 2

# Check if there's any documentation about events in the codebase
rg -n "event.*emit|emit.*event" src/lib -t ts | head -20

# Let's see the full implementation of createFilteredFullStream to understand what it filters
cat src/lib/core/modules/StreamHandler.ts | sed -n '87,186p'

Length of output: 8179


🏁 Script executed:

#!/bin/bash
# Let's trace the exact flow - check how streamText result is used in GoogleVertex
cat src/lib/providers/googleVertex.ts | sed -n '905,975p'

# Also check if there are any other comments on the PR that I should validate
gh api repos/juspay/neurolink/pulls/724/comments --jq '.[] | {user: .user.login, path: .path, line: .line, body: .body, created_at: .created_at}' | jq -s 'sort_by(.created_at)'

Length of output: 5794


@murdore, I've analyzed your comment and can confirm that your concern about events not being emitted is not valid. Here's why:

Analysis of Event Emission vs. Chunk Filtering

The key insight is that event handlers and chunk filtering operate at different layers:

1. Event Handlers Are Registered at AI SDK Level (Lines 905-959)

const result = await streamText({
  model: model,
  messages: messages,
  experimental_telemetry: this.getStreamTelemetryConfig(options),
  
  onError: (event) => { /* ... */ },      // Line 919 - AI SDK calls this
  onFinish: (event) => { /* ... */ },     // Line 931 - AI SDK calls this  
  onChunk: () => { chunkCount++; },       // Line 942 - AI SDK calls this
  onStepFinish: ({ toolCalls, toolResults }) => { /* ... */ }  // Line 946
});

These event handlers are passed as configuration to streamText() and are invoked by the AI SDK internally as it processes the stream.

2. Chunk Filtering Happens Downstream (Line 963)

const transformedStream = this.streamHandler.createFilteredFullStream(result);

The createFilteredFullStream method operates on the result returned by streamText, specifically on result.fullStream.

3. Why Events Are Not Affected

┌─────────────────────────────────────────────────────┐
│   AI SDK streamText()                               │
│   - Processes chunks internally                     │
│   - Emits events: onChunk, onError, onFinish, etc. │◄── Events fire here
│   - Returns result with fullStream                  │
└──────────────────┬──────────────────────────────────┘
                   │ result.fullStream
                   ▼
┌─────────────────────────────────────────────────────┐
│   createFilteredFullStream(result)                  │
│   - Iterates over result.fullStream                 │
│   - Filters chunks (reasoning, tool-call, etc.)     │◄── Filtering happens here
│   - Yields only text-delta to consumer              │
└─────────────────────────────────────────────────────┘
                   │ Filtered chunks
                   ▼
              Consumer code

The event handlers (onChunk, onFinish, onError, onStepFinish) are called by the AI SDK as it generates chunks, regardless of what downstream filtering does with those chunks.

4. Evidence from the Code

The onChunk callback increments chunkCount for every chunk the AI SDK processes. This happens before createFilteredFullStream filters anything. Similarly:

  • onError logs stream errors as they occur in the AI SDK
  • onFinish logs when the stream completes
  • onStepFinish handles tool execution storage

All of these will continue to fire normally because they're part of the AI SDK's internal processing, not the consumer-facing stream transformation.

Conclusion

Your concern about events being stopped is not applicable in this implementation. The events are emitted at the AI SDK level and will fire for all chunks (including reasoning chunks), while createFilteredFullStream only controls which chunks are yielded to the end consumer to prevent reasoning token leakage.

From an event perspective, the filtering has no effect: all internal telemetry, error handling, and callbacks remain fully functional.


Now proceeding with the full review you requested.


🧠 Learnings used
Learnt from: vigneshJuspay
Repo: juspay/neurolink PR: 0
File: :0-0
Timestamp: 2025-12-15T18:35:37.783Z
Learning: In juspay/neurolink TTS implementation (PR #691), the new StreamChunk discriminated union type introduced in TTS-019 will be integrated with StreamResult.stream during the actual TTS streaming implementation PR (TTS-020/TTS-021), not in the type-definition PR. This phased approach keeps type updates and implementation changes atomic.
✅ Actions performed

Full review triggered.

@coderabbitai (bot) left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/lib/core/modules/StreamHandler.ts (1)

134-157: Type guards ensure safe chunk processing.

The inline type guards (hasType, isTextDelta, isError) and getErrorMessage utility provide proper type narrowing and safe error handling, addressing the past review concern about unsafe type casts.

Optional: Consider reusing getErrorMessage and isError from src/lib/types/common.ts (lines 91-93, 110-121) for consistency across the codebase, though the current inline implementation is also acceptable.

🔎 Optional refactor to use existing utilities
+import { getErrorMessage as getCommonErrorMessage } from "../../types/common.js";
+
 /**
  * StreamHandler class - Handles streaming operations for AI providers
  */
 export class StreamHandler {
   ...
   
   createFilteredFullStream(result: {
     fullStream?: AsyncIterable<unknown>;
     textStream?: AsyncIterable<string>;
   }): AsyncGenerator<{ content: string }> {
     // Type guards
     const hasType = (val: unknown): val is { type: string } =>
       val !== null &&
       typeof val === "object" &&
       "type" in val &&
       typeof (val as Record<string, unknown>).type === "string";
     const isTextDelta = (val: {
       type: string;
     }): val is { type: "text-delta"; textDelta: string } =>
       val.type === "text-delta" &&
       "textDelta" in val &&
       typeof (val as Record<string, unknown>).textDelta === "string";
     const isError = (val: {
       type: string;
     }): val is { type: "error"; error: unknown } =>
       val.type === "error" && "error" in val;
-
-    const getErrorMessage = (error: unknown): string => {
-      if (error !== null && typeof error === "object" && "message" in error) {
-        const msg = (error as Record<string, unknown>).message;
-        return typeof msg === "string" ? msg : "Unknown error";
-      }
-      return "Unknown error";
-    };

     return (async function* () {
       if (result.fullStream) {
         for await (const chunk of result.fullStream) {
           if (typeof chunk === "string") {
             yield { content: chunk };
           } else if (hasType(chunk)) {
             if (isError(chunk)) {
               throw new Error(
-                `Streaming error: ${getErrorMessage(chunk.error)}`,
+                `Streaming error: ${getCommonErrorMessage(chunk.error)}`,
               );
             } else if (isTextDelta(chunk)) {
               yield { content: chunk.textDelta };
             }
           }
         }
       } else if (result.textStream) {
         for await (const chunk of result.textStream) {
           yield { content: chunk };
         }
       }
     })();
   }
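
To try the filtering behavior outside the class, the following is a standalone sketch adapted from the diff above. `filterFullStream`, `fromArray`, `collect`, and the `StreamResult` alias are hypothetical names introduced here, and the inlined `getErrorMessage` mirrors the inline helper shown in the diff, not the one in src/lib/types/common.ts.

```typescript
type StreamResult = {
  fullStream?: AsyncIterable<unknown>;
  textStream?: AsyncIterable<string>;
};

// Type guard: any non-null object carrying a string `type` field.
const hasType = (val: unknown): val is { type: string } =>
  typeof val === "object" &&
  val !== null &&
  "type" in val &&
  typeof (val as Record<string, unknown>).type === "string";

// Extracts a message from an unknown error value, defaulting to "Unknown error".
const getErrorMessage = (error: unknown): string => {
  if (typeof error === "object" && error !== null && "message" in error) {
    const msg = (error as Record<string, unknown>).message;
    return typeof msg === "string" ? msg : "Unknown error";
  }
  return "Unknown error";
};

async function* filterFullStream(
  result: StreamResult,
): AsyncGenerator<{ content: string }> {
  if (result.fullStream) {
    for await (const chunk of result.fullStream) {
      if (typeof chunk === "string") {
        yield { content: chunk }; // plain strings pass through
      } else if (hasType(chunk)) {
        const fields = chunk as Record<string, unknown>;
        if (chunk.type === "error") {
          throw new Error(`Streaming error: ${getErrorMessage(fields.error)}`);
        }
        if (chunk.type === "text-delta" && typeof fields.textDelta === "string") {
          yield { content: fields.textDelta };
        }
        // Every other typed chunk (reasoning, tool-call, ...) is dropped.
      }
    }
  } else if (result.textStream) {
    // Fallback when no fullStream is available.
    for await (const chunk of result.textStream) {
      yield { content: chunk };
    }
  }
}

// Helpers for trying it out.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

async function collect(
  stream: AsyncIterable<{ content: string }>,
): Promise<string> {
  let text = "";
  for await (const { content } of stream) text += content;
  return text;
}

const mixed = fromArray<unknown>([
  { type: "reasoning", textDelta: "secret chain of thought" },
  { type: "text-delta", textDelta: "Hello" },
  { type: "tool-call", toolName: "search" },
  " ",
  { type: "text-delta", textDelta: "world" },
]);

collect(filterFullStream({ fullStream: mixed })).then((text) => {
  console.log(text); // logs "Hello world"
});
```

Keeping the guards at module level in this sketch is only a convenience for experimentation; the PR defines them inline within the method.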
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 74ddcef and cefe8cb.

📒 Files selected for processing (4)
  • src/lib/core/baseProvider.ts
  • src/lib/core/modules/StreamHandler.ts
  • src/lib/providers/googleVertex.ts
  • test/unit/stream/reasoning-filter.test.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.{ts,tsx}: Maintain strict TypeScript type safety across all modules with comprehensive type definitions organized by domain to avoid circular dependencies
Use ErrorFactory for creating typed errors throughout the application
Wrap async operations with withTimeout utility for timeout handling

Files:

  • test/unit/stream/reasoning-filter.test.ts
  • src/lib/core/baseProvider.ts
  • src/lib/core/modules/StreamHandler.ts
  • src/lib/providers/googleVertex.ts
**/providers/*.ts

📄 CodeRabbit inference engine (CLAUDE.md)

Providers must extend a base provider or implement the provider interface and register in ProviderRegistry.registerAllProviders() with provider name, factory function, default model, and aliases

Files:

  • src/lib/providers/googleVertex.ts
🧠 Learnings (4)
📓 Common learnings
Learnt from: vigneshJuspay
Repo: juspay/neurolink PR: 0
File: :0-0
Timestamp: 2025-12-15T18:35:37.783Z
Learning: In juspay/neurolink TTS implementation (PR #691), the new StreamChunk discriminated union type introduced in TTS-019 will be integrated with StreamResult.stream during the actual TTS streaming implementation PR (TTS-020/TTS-021), not in the type-definition PR. This phased approach keeps type updates and implementation changes atomic.
📚 Learning: 2025-09-17T17:55:15.261Z
Learnt from: RajuSudhar
Repo: juspay/neurolink PR: 173
File: src/lib/index.ts:16-16
Timestamp: 2025-09-17T17:55:15.261Z
Learning: In src/lib/types/providers.ts, ProviderConfig was renamed to AIModelProviderConfig to deduplicate type names, as there was an existing ProviderConfig type that better suited the "ProviderConfig" name. This was an intentional breaking change for better type organization.

Applied to files:

  • src/lib/core/baseProvider.ts
  • src/lib/providers/googleVertex.ts
📚 Learning: 2025-12-10T12:24:51.147Z
Learnt from: CR
Repo: juspay/neurolink PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-10T12:24:51.147Z
Learning: Applies to **/utils/messageBuilder.ts : Message construction must be handled through MessageBuilder in src/lib/utils/messageBuilder.ts, which handles text, images, PDFs, and CSV files with provider-specific adapters

Applied to files:

  • src/lib/core/baseProvider.ts
📚 Learning: 2025-09-01T22:58:39.149Z
Learnt from: sudharsan-juspay
Repo: juspay/neurolink PR: 140
File: src/lib/core/types.ts:198-203
Timestamp: 2025-09-01T22:58:39.149Z
Learning: In src/lib/core/types.ts, StreamOptions (imported from streamTypes.js) and StreamingOptions are intentionally different types with different use cases. StreamingOptions is for unified AI requests with multiple provider configurations, while StreamOptions is for individual streaming operations.

Applied to files:

  • src/lib/core/modules/StreamHandler.ts
  • src/lib/providers/googleVertex.ts
🧬 Code graph analysis (3)
test/unit/stream/reasoning-filter.test.ts (1)
src/lib/core/modules/StreamHandler.ts (1)
  • StreamHandler (32-248)
src/lib/core/baseProvider.ts (1)
src/lib/core/modules/StreamHandler.ts (1)
  • StreamHandler (32-248)
src/lib/core/modules/StreamHandler.ts (1)
src/lib/types/common.ts (2)
  • isError (92-94)
  • getErrorMessage (111-122)
🔇 Additional comments (9)
src/lib/core/baseProvider.ts (1)

64-64: LGTM! Clean encapsulation improvement.

Changing streamHandler from private to protected appropriately allows subclasses like GoogleVertexProvider to access the new createFilteredFullStream method while maintaining encapsulation. The readonly modifier ensures subclasses cannot reassign the handler.
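
A minimal illustration of what `protected readonly` permits. The classes below are simplified, hypothetical stand-ins that reuse the names from this review; they are not the project's actual implementations, and `describe()` and `streamMode()` are invented for the example.

```typescript
// Simplified stand-in for the real StreamHandler.
class StreamHandler {
  describe(): string {
    return "filtered";
  }
}

class BaseProvider {
  // protected: visible to subclasses; readonly: cannot be reassigned after construction.
  protected readonly streamHandler: StreamHandler = new StreamHandler();
}

class GoogleVertexProvider extends BaseProvider {
  streamMode(): string {
    // this.streamHandler = new StreamHandler(); // compile error: readonly property
    return this.streamHandler.describe(); // allowed: access from a subclass
  }
}

const provider = new GoogleVertexProvider();
// provider.streamHandler; // compile error: protected member is not accessible here
console.log(provider.streamMode());
```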

src/lib/providers/googleVertex.ts (1)

986-987: Integration looks good, but consider googleAiStudio consistency.

The switch to createFilteredFullStream correctly filters reasoning chunks for GoogleVertex. Based on PR objectives, googleAiStudio.ts still uses createTextStream and should also be updated to use createFilteredFullStream for consistency, as both use Google's reasoning-capable models.

Per PR objectives summary: "Inconsistent provider adoption (High): GoogleVertex was updated, but googleAiStudio.ts still uses createTextStream. Recommendation: update googleAiStudio to use this.streamHandler.createFilteredFullStream(result) for consistency."

test/unit/stream/reasoning-filter.test.ts (5)

25-151: Excellent coverage of filtered chunk types.

All six filtered chunk types (reasoning, reasoning-signature, redacted-reasoning, source, tool-call, tool-result) are thoroughly tested with clear assertions verifying they're excluded while text-delta chunks pass through.


153-197: Pass-through behavior well tested.

Tests validate both typed text-delta chunks and plain string chunks pass through correctly, ensuring the filter doesn't drop valid content.


199-231: Error handling properly validated.

Tests cover both error chunks with specific messages and unknown errors, verifying the stream throws appropriately. This addresses the past review concern about error chunk handling.
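
A sketch of how such an error-path check might look without a test framework. `throwOnErrorChunk`, `fromArray`, `drain`, and `ErrorTestChunk` are hypothetical stand-ins (the real tests exercise `StreamHandler.createFilteredFullStream`), and the "Unknown error" fallback follows the inline `getErrorMessage` shown in the review diff.

```typescript
type ErrorTestChunk = { type: string; error?: unknown; textDelta?: string };

async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Minimal stand-in for the error branch of the filter.
async function* throwOnErrorChunk(
  stream: AsyncIterable<ErrorTestChunk>,
): AsyncGenerator<string> {
  for await (const chunk of stream) {
    if (chunk.type === "error") {
      const err = chunk.error;
      const message =
        typeof err === "object" &&
        err !== null &&
        "message" in err &&
        typeof (err as { message: unknown }).message === "string"
          ? (err as { message: string }).message
          : "Unknown error";
      throw new Error(`Streaming error: ${message}`);
    }
    if (chunk.type === "text-delta" && typeof chunk.textDelta === "string") {
      yield chunk.textDelta;
    }
  }
}

// Drains a stream and reports what arrived before any failure, plus the failure message.
async function drain(
  stream: AsyncIterable<string>,
): Promise<{ received: string[]; error?: string }> {
  const received: string[] = [];
  try {
    for await (const text of stream) received.push(text);
    return { received };
  } catch (e) {
    return { received, error: e instanceof Error ? e.message : String(e) };
  }
}
```

Returning the partial output alongside the error makes it easy to assert both that earlier text was delivered and that the stream surfaced the failure.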


233-285: Fallback and edge cases covered.

Tests validate the fallback to textStream when fullStream is unavailable and handle empty stream scenarios gracefully, ensuring robust behavior.


287-336: Comprehensive test suite with clear documentation.

The mixed chunk test validates realistic scenarios where filtered and valid chunks are interleaved. The test coverage summary at the end clearly documents all validated behaviors, making the test file easy to maintain.

src/lib/core/modules/StreamHandler.ts (2)

90-129: Excellent documentation addressing key concerns.

The JSDoc comprehensively explains:

  • Purpose: prevent chain-of-thought token leakage
  • When to use: Google Vertex with reasoning-capable models
  • Filtered vs. passed-through chunk types
  • Fallback behavior to textStream
  • Practical example

This addresses the PR objective about missing documentation and the past review comment about expanding "CoT" abbreviation.


159-181: Clean filtering logic with proper fallback behavior.

The async generator correctly:

  • Prioritizes fullStream over textStream
  • Filters reasoning/thinking chunks while passing through text-delta
  • Throws on error chunks to surface failures
  • Falls back gracefully to textStream when needed

Regarding event emission (addressing murdore's question): Event hooks (onChunk, onFinish, onError, onStepFinish) are registered in the provider's streamText() call (see googleVertex.ts lines 919-964) and are invoked by the AI SDK before chunks reach this filter. This filter only affects what the consumer receives from the stream, not internal SDK event emission. Per PR objectives, this was validated by the reviewer.

@murdore force-pushed the BZ-47583-data-is-streamed-without-any-toolcalls branch from cefe8cb to 55597b0 on December 28, 2025 19:25