Feat/gemini live translate#23
Code Review
This pull request introduces real-time translation features powered by the Gemini Live API: a Python command-line tool for translating YouTube audio and a Next.js web application integrated with LiveKit for multilingual broadcasting. The review feedback highlights four improvements: guarding the stream cleanup in the Python audio playback loops so that it does not touch an unassigned stream when the PyAudio output stream fails to open, fixing a concurrency race condition in the LiveKit session manager that could spin up duplicate translation bridges, removing duplicate unmount cleanup logic in the frontend that prematurely tears down translation sessions, and closing the Gemini WebSocket on setup timeout to prevent resource leaks.

Code under review:

    pya = pyaudio.PyAudio()
    try:
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=RECEIVE_SAMPLE_RATE,
            output=True,
        )
        while True:
            bytestream = await audio_queue_output.get()
            await asyncio.to_thread(stream.write, bytestream)
            audio_queue_output.task_done()
    except asyncio.CancelledError:
        pass
    finally:
        try:
            stream.stop_stream()
            stream.close()
        except Exception:
            pass
        pya.terminate()

If pya.open fails (e.g., in a headless environment or when no audio output device is available), stream is never assigned, so stream.stop_stream() in the finally block raises UnboundLocalError. The inner except Exception swallows that error (UnboundLocalError is a subclass of Exception), so the original PyAudio exception still propagates, but the cleanup then depends on silently suppressing a name error. Initializing stream = None and checking that it is not None before closing makes the cleanup explicit.

Suggested change:

    pya = pyaudio.PyAudio()
    stream = None
    try:
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=RECEIVE_SAMPLE_RATE,
            output=True,
        )
        while True:
            bytestream = await audio_queue_output.get()
            await asyncio.to_thread(stream.write, bytestream)
            audio_queue_output.task_done()
    except asyncio.CancelledError:
        pass
    finally:
        if stream is not None:
            try:
                stream.stop_stream()
                stream.close()
            except Exception:
                pass
        pya.terminate()

Code under review:

    pya = pyaudio.PyAudio()
    try:
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,  # 16000Hz (original audio rate)
            output=True,
        )
        while True:
            bytestream = await original_playback_queue.get()
            await asyncio.to_thread(stream.write, bytestream)
            original_playback_queue.task_done()
    except asyncio.CancelledError:
        pass
    finally:
        try:
            stream.stop_stream()
            stream.close()
        except Exception:
            pass
        pya.terminate()

Similar to play_audio, if pya.open fails, stream is never assigned, and stream.stop_stream() in the finally block raises an UnboundLocalError that the inner except Exception then swallows. Initializing stream = None and checking it before closing avoids relying on that suppression.

Suggested change:

    pya = pyaudio.PyAudio()
    stream = None
    try:
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,  # 16000Hz (original audio rate)
            output=True,
        )
        while True:
            bytestream = await original_playback_queue.get()
            await asyncio.to_thread(stream.write, bytestream)
            original_playback_queue.task_done()
    except asyncio.CancelledError:
        pass
    finally:
        if stream is not None:
            try:
                stream.stop_stream()
                stream.close()
            except Exception:
                pass
        pya.terminate()

Code under review:

    const existingBridge = languageMap.get(targetLanguage);
    if (existingBridge && existingBridge.status === "active") {
      console.log(
        `[SessionManager] Reusing existing bridge for ${targetLanguage} in session ${sessionId}`
      );
      existingBridge.subscriberCount++;
      return existingBridge;
    }

There is a concurrency race condition when multiple attendees request the same language at the same time. While the first bridge is in the starting state (awaiting bridge.start()), subsequent concurrent requests will not find an active bridge and will spin up duplicate translation bridges for the same language, violating the single-session constraint. Checking for both active and starting states prevents duplicate bridge creation.

Suggested change:

    - const existingBridge = languageMap.get(targetLanguage);
    - if (existingBridge && existingBridge.status === "active") {
    -   console.log(
    -     `[SessionManager] Reusing existing bridge for ${targetLanguage} in session ${sessionId}`
    -   );
    -   existingBridge.subscriberCount++;
    -   return existingBridge;
    - }
    + const existingBridge = languageMap.get(targetLanguage);
    + if (existingBridge && (existingBridge.status === "active" || existingBridge.status === "starting")) {
    +   console.log(
    +     `[SessionManager] Reusing existing bridge for ${targetLanguage} in session ${sessionId}`
    +   );
    +   existingBridge.subscriberCount++;
    +   return existingBridge;
    + }

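The status check above only helps if the bridge is already registered in languageMap before bridge.start() is awaited. A complementary pattern, sketched here as an illustration rather than as the PR's implementation, is to cache the in-flight startup promise so that concurrent callers await the same bridge. BridgeRegistry, FakeBridge, and acquire are hypothetical names; only status and subscriberCount come from the code under review.

```typescript
// Hypothetical stand-in for the PR's translation bridge. Only the fields
// mentioned in the review (status, subscriberCount) are modeled.
type BridgeStatus = "starting" | "active";

class FakeBridge {
  status: BridgeStatus = "starting";
  subscriberCount = 0;

  async start(): Promise<void> {
    // Simulates asynchronous setup work such as opening a connection.
    await new Promise<void>((resolve) => setTimeout(resolve, 5));
    this.status = "active";
  }
}

class BridgeRegistry {
  // One entry per language. The promise is stored before any await,
  // so a caller arriving during startup finds it immediately.
  private readonly pending = new Map<string, Promise<FakeBridge>>();
  startCount = 0;

  acquire(targetLanguage: string): Promise<FakeBridge> {
    let entry = this.pending.get(targetLanguage);
    if (!entry) {
      entry = this.startBridge();
      this.pending.set(targetLanguage, entry);
      // Drop the entry if startup fails so a later call can retry.
      entry.catch(() => this.pending.delete(targetLanguage));
    }
    return entry.then((bridge) => {
      bridge.subscriberCount++;
      return bridge;
    });
  }

  private async startBridge(): Promise<FakeBridge> {
    this.startCount++;
    const bridge = new FakeBridge();
    await bridge.start();
    return bridge;
  }
}
```

With this shape, two simultaneous acquire("es") calls trigger a single start() and both resolve to the same bridge, whose subscriberCount ends at 2.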

Code under review:

    useEffect(() => {
      return () => {
        const lang = activeLanguageRef.current;
        if (lang && lang !== "original") {
          const payload = JSON.stringify({ sessionId, targetLanguage: lang });
          const blob = new Blob([payload], { type: "application/json" });
          // sendBeacon is reliable during page unload
          const sent = navigator.sendBeacon?.("/api/translate/unsubscribe", blob);
          if (!sent) {
            fetch("/api/translate/unsubscribe", {
              method: "POST",
              headers: { "Content-Type": "application/json" },
              body: payload,
              keepalive: true,
            }).catch(() => { });
          }
        }
      };
    }, [sessionId]);

Both LanguageSelector and its parent AttendeeView (in watch/page.tsx) implement unmount/unload cleanup logic that sends an unsubscribe request to /api/translate/unsubscribe. When a user leaves, both cleanups run, sending duplicate unsubscribe requests. This decrements the subscriber count twice, prematurely tearing down the translation bridge for other active listeners. Removing the duplicate cleanup from LanguageSelector resolves this, as AttendeeView already handles it comprehensively.

Suggested change:

    // Unsubscribe on unmount is handled by the parent AttendeeView component to prevent duplicate decrements

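Removing the duplicate cleanup fixes the client, but a raw counter stays sensitive to any repeated or retried unsubscribe request. One defensive option on the server, offered here as an assumption and not as something present in the PR, is to track subscribers by identifier so that a repeated unsubscribe becomes a no-op. SubscriberTracker and the subscriberId parameter are hypothetical.

```typescript
// Hypothetical helper: tracks who is listening to one translation bridge.
class SubscriberTracker {
  private readonly subscribers = new Set<string>();

  // Returns the number of distinct subscribers after the call.
  subscribe(subscriberId: string): number {
    this.subscribers.add(subscriberId);
    return this.subscribers.size;
  }

  // Returns true only when the last subscriber has just left,
  // i.e. when it is safe to tear the bridge down.
  unsubscribe(subscriberId: string): boolean {
    const removed = this.subscribers.delete(subscriberId);
    return removed && this.subscribers.size === 0;
  }

  get count(): number {
    return this.subscribers.size;
  }
}

const tracker = new SubscriberTracker();
tracker.subscribe("attendee-a");
tracker.subscribe("attendee-b");

// attendee-a leaves and the unsubscribe request arrives twice.
const first = tracker.unsubscribe("attendee-a");
const second = tracker.unsubscribe("attendee-a");
console.log(first, second, tracker.count); // false false 1
```

The second request changes nothing, so attendee-b keeps the bridge alive until their own unsubscribe arrives.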

Code under review:

    setTimeout(() => {
      if (!this.geminiSetupComplete) {
        clearInterval(checkSetup);
        reject(new Error("Gemini setup timeout"));
      }
    }, 15000);

If the Gemini setup times out, the WebSocket connection is left open, which can lead to resource leaks. Closing the WebSocket connection on timeout ensures proper cleanup.

Suggested change:

    // Timeout after 15 seconds
    setTimeout(() => {
      if (!this.geminiSetupComplete) {
        clearInterval(checkSetup);
        if (this.geminiWs) {
          this.geminiWs.close();
          this.geminiWs = null;
        }
        reject(new Error("Gemini setup timeout"));
      }
    }, 15000);

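The same cleanup can be packaged so that the polling interval, the timeout timer, and the socket are always released together. The sketch below is one possible shape, assuming only a socket-like object with a close() method. waitForSetup, isReady, and Closable are hypothetical names, and the 15000 ms default mirrors the timeout in the code under review.

```typescript
// Hypothetical minimal shape of the connection; a WebSocket also satisfies it.
interface Closable {
  close(): void;
}

function waitForSetup(
  socket: Closable,
  isReady: () => boolean,
  timeoutMs = 15000,
  pollMs = 100
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const timer = setTimeout(() => {
      // Timeout: stop polling and close the socket before rejecting,
      // so the connection does not outlive the failed setup.
      clearInterval(poll);
      socket.close();
      reject(new Error("Gemini setup timeout"));
    }, timeoutMs);

    const poll = setInterval(() => {
      if (isReady()) {
        // Success: stop polling and cancel the pending timeout.
        clearInterval(poll);
        clearTimeout(timer);
        resolve();
      }
    }, pollMs);
  });
}
```

A caller would pass the Gemini socket and a readiness check, for example waitForSetup(ws, () => setupComplete), and treat a rejection as a failed start with the socket already closed.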
https://ai.google.dev/gemini-api/docs/live-api/live-translate