fix(telegram): keep polling while turns stream#3195
Conversation
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
There was a problem hiding this comment.
Code Review
This pull request improves the Telegram bridge's concurrency model by running prompt turns in the background rather than blocking the update dispatch loop. It introduces active turn tracking using a Map of AbortControllers, allowing active streams to be aborted gracefully on shutdown or interruption. Additionally, callback query acknowledgements are now handled asynchronously to prevent blocking modal actions. The review feedback suggests making the stream completion handling more robust by only clearing the active turn from the database when a terminal event is successfully received, rather than unconditionally in the finally block. This ensures the bridge can reattach to a turn if a premature disconnection occurs. The reviewer also recommends notifying users of non-AbortError stream failures and updating the newly added tests to match these improvements.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if (record.event === "turn.lifecycle") { | ||
| const status = record.payload?.turn?.status || record.payload?.status; | ||
| if (["failed", "canceled", "interrupted"].includes(status)) { | ||
| await sendText(chatId, `Turn ${status}.`, { replyMarkup: controlKeyboard() }); | ||
| await sendText(chatId, `Turn ${status}.`, { replyMarkup: controlKeyboard() }); | ||
| return; | ||
| } | ||
| } |
There was a problem hiding this comment.
Robust Stream Completion Handling
Currently, the active turn is cleared from the database in the finally block of runPrompt and startTrackedTurnStream regardless of whether the stream completed successfully or disconnected prematurely (e.g., due to a network glitch or proxy timeout). If the stream disconnects prematurely, the active turn is cleared, preventing the bridge from reattaching to it later, even though the turn might still be running in the runtime.
To make this more robust, we can have streamTurnEvents return a boolean indicating whether a terminal event (turn.completed or terminal turn.lifecycle) was successfully received, and only clear the active turn if it did.
Implementation Steps:
- Update
streamTurnEventsto returntrueon terminal events, andfalseotherwise:
async function streamTurnEvents(chatId, threadId, turnId, sinceSeq, options = {}) {
// ...
try {
// ...
for await (const event of readSse(response)) {
// ...
if (record.event === "turn.completed") {
// ...
return true; // Return true on completion
}
if (record.event === "turn.lifecycle") {
const status = record.payload?.turn?.status || record.payload?.status;
if (["failed", "canceled", "interrupted"].includes(status)) {
await sendText(chatId, `Turn ${status}.`, { replyMarkup: controlKeyboard() });
return true; // Return true on terminal lifecycle
}
}
}
return false; // Return false if stream ended without terminal event
} catch (error) {
if (error.name === "AbortError") {
// ...
return false;
}
throw error;
}
}- Update
runPromptandstartTrackedTurnStreamto only clear the active turn ifcompletedis true.
| function startTrackedTurnStream(chatId, threadId, turnId, sinceSeq) { | ||
| if (activeTurnTasks.has(chatId)) return false; | ||
|
|
||
| const controller = new AbortController(); | ||
| const task = { controller }; | ||
| activeTurnTasks.set(chatId, task); | ||
| void streamTurnEvents(chatId, threadId, turnId, sinceSeq, { signal: controller.signal }) | ||
| .catch((error) => { | ||
| console.error("failed to stream Telegram bridge turn", error); | ||
| }) | ||
| .finally(async () => { | ||
| if (activeTurnTasks.get(chatId) === task) { | ||
| activeTurnTasks.delete(chatId); | ||
| } | ||
| if (!stopping) { | ||
| await clearActiveTurn(chatId); | ||
| } | ||
| }); | ||
| return true; | ||
| } |
There was a problem hiding this comment.
Only clear the active turn from the database if the stream completed cleanly (i.e., received a terminal event). If the stream disconnected prematurely or encountered a network error, keeping the active turn in the database allows the bridge to reattach to it later.
function startTrackedTurnStream(chatId, threadId, turnId, sinceSeq) {
if (activeTurnTasks.has(chatId)) return false;
const controller = new AbortController();
const task = { controller };
activeTurnTasks.set(chatId, task);
let completed = false;
void streamTurnEvents(chatId, threadId, turnId, sinceSeq, { signal: controller.signal })
.then((res) => {
completed = res;
})
.catch((error) => {
console.error("failed to stream Telegram bridge turn", error);
})
.finally(async () => {
if (activeTurnTasks.get(chatId) === task) {
activeTurnTasks.delete(chatId);
}
if (!stopping && completed) {
await clearActiveTurn(chatId);
}
});
return true;
}| try { | ||
| await streamTurnEvents(chatId, state.threadId, turnId, sinceSeq); | ||
| await streamTurnEvents(chatId, state.threadId, turnId, sinceSeq, options); | ||
| } finally { | ||
| await threadStore.patchChat(chatId, { | ||
| activeTurnId: null, | ||
| updatedAt: new Date().toISOString() | ||
| }); | ||
| if (!stopping) { | ||
| await clearActiveTurn(chatId); | ||
| } | ||
| } |
There was a problem hiding this comment.
Only clear the active turn if the stream completed cleanly, matching the behavior of startTrackedTurnStream.
let completed = false;
try {
completed = await streamTurnEvents(chatId, state.threadId, turnId, sinceSeq, options);
} finally {
if (!stopping && completed) {
await clearActiveTurn(chatId);
}
}| } catch (error) { | ||
| if (error.name === "AbortError") { | ||
| await sendText(chatId, `Turn timed out after ${Math.round(config.turnTimeoutMs / 1000)}s.`); | ||
| if (timedOut) { | ||
| await sendText(chatId, `Turn timed out after ${Math.round(config.turnTimeoutMs / 1000)}s.`); | ||
| } else if (!stopping) { | ||
| await sendText(chatId, "Turn stream aborted."); | ||
| } | ||
| return; | ||
| } | ||
| throw error; | ||
| } finally { |
There was a problem hiding this comment.
When a non-AbortError occurs (e.g., network disconnection, runtime crash), the stream currently fails silently from the user's perspective. Notifying the user of the error improves the user experience.
} catch (error) {
if (error.name === "AbortError") {
if (timedOut) {
await sendText(chatId, `Turn timed out after ${Math.round(config.turnTimeoutMs / 1000)}s.`);
} else if (!stopping) {
await sendText(chatId, "Turn stream aborted.");
}
return;
}
if (!stopping) {
await sendText(chatId, `Error streaming turn: ${error.message}`).catch(() => {});
}
throw error;
}| assert.match(runPrompt, /if \(!stopping\) {\s*await clearActiveTurn\(chatId\);\s*}/); | ||
|
|
||
| const trackedStream = extractFunction(source, "startTrackedTurnStream"); | ||
| assert.match(trackedStream, /if \(!stopping\) {\s*await clearActiveTurn\(chatId\);\s*}/); |
There was a problem hiding this comment.
If you implement the robust stream completion handling (only clearing the active turn when completed is true), make sure to update these regex assertions to match the new pattern.
| assert.match(runPrompt, /if \(!stopping\) {\s*await clearActiveTurn\(chatId\);\s*}/); | |
| const trackedStream = extractFunction(source, "startTrackedTurnStream"); | |
| assert.match(trackedStream, /if \(!stopping\) {\s*await clearActiveTurn\(chatId\);\s*}/); | |
| assert.match(runPrompt, /if \(!stopping && completed\) {\s*await clearActiveTurn\(chatId\);\s*}/); | |
| const trackedStream = extractFunction(source, "startTrackedTurnStream"); | |
| assert.match(trackedStream, /if \(!stopping && completed\) {\s*await clearActiveTurn\(chatId\);\s*}/); |
Summary
Fixes #2966.
getUpdatesTesting
npm test(fromintegrations/telegram-bridge)npm run check(fromintegrations/telegram-bridge)git diff --check upstream/main...HEADcargo fmt --all -- --check(not run; this change is limited to the Node-only Telegram bridge package)cargo clippy --workspace --all-targets --all-features(not run; this change is limited to the Node-only Telegram bridge package)cargo test --workspace --all-features(not run; this change is limited to the Node-only Telegram bridge package)Checklist