-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgladia_handler.py
More file actions
177 lines (141 loc) · 6.05 KB
/
Copy pathgladia_handler.py
File metadata and controls
177 lines (141 loc) · 6.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""
Gladia real-time transcription handler for Gradio integration
"""
import asyncio
import json
import os
from typing import Optional, Callable
import httpx
from websockets.asyncio.client import ClientConnection, connect
from dotenv import load_dotenv
class GladiaHandler:
    """Handle Gladia real-time transcription over a WebSocket.

    Typical lifecycle: construct the handler, register a callback with
    ``set_transcript_callback``, ``await initialize()``, stream audio chunks
    with ``send_audio()``, and finally ``await stop()``.
    """

    # Upper bound, in seconds, that stop() waits for the receiver task to
    # finish on its own after "stop_recording" has been sent.
    _STOP_GRACE_SECONDS = 0.5

    def __init__(self, language: str = "de", partial: bool = True):
        """Read the API key and store the session settings.

        Args:
            language: Language code sent to Gladia. The special value
                ``"auto"`` sends an empty language list and enables code
                switching instead.
            partial: Whether partial transcripts are requested in addition
                to final ones.

        Raises:
            ValueError: If ``GLADIA_API_KEY`` is not set in the environment
                (``load_dotenv()`` is called first).
        """
        load_dotenv()
        self.api_key = os.getenv("GLADIA_API_KEY")
        if not self.api_key:
            raise ValueError("GLADIA_API_KEY not found in environment")

        self.language = language
        self.partial = partial
        self.session_id: Optional[str] = None
        self.websocket: Optional[ClientConnection] = None
        self.receiver_task: Optional[asyncio.Task] = None
        self.is_running = False

        # Callback for transcript updates: called as callback(text, is_final)
        self.on_transcript: Optional[Callable[[str, bool], None]] = None

    async def initialize(self):
        """Create a live session, connect to its WebSocket and start receiving.

        Does nothing (apart from printing a warning) when a session is
        already running.

        Raises:
            RuntimeError: If the session request returns a status other than
                200 or 201.
            Exception: Any error raised by the HTTP request or the WebSocket
                connection is printed and re-raised.
        """
        if self.is_running:
            print("⚠️ Gladia session already running")
            return

        auto_detect = self.language == "auto"
        config = {
            "model": "solaria-1",
            "encoding": "wav/pcm",
            "sample_rate": 16000,
            "bit_depth": 16,
            "channels": 1,
            "language_config": {
                "languages": [] if auto_detect else [self.language],
                "code_switching": auto_detect,
            },
            "messages_config": {
                "receive_partial_transcripts": self.partial,
                "receive_final_transcripts": True,
            },
        }

        try:
            # Use the async client: a synchronous httpx.post() here would
            # block the event loop for the duration of the request.
            async with httpx.AsyncClient(timeout=30) as client:
                response = await client.post(
                    "https://api.gladia.io/v2/live",
                    headers={"x-gladia-key": self.api_key},
                    json=config,
                )

            if response.status_code not in (200, 201):
                raise RuntimeError(
                    f"Failed to initialize: {response.status_code} - {response.text}"
                )

            session_data = response.json()
            self.session_id = session_data["id"]
            websocket_url = session_data["url"]
            print(f"✅ Gladia session initialized: {self.session_id}")

            # Connect to WebSocket
            self.websocket = await connect(websocket_url)
            self.is_running = True

            # Start receiver task
            self.receiver_task = asyncio.create_task(self._receive_messages())
            print("✅ Gladia WebSocket connected")
        except Exception as e:
            print(f"❌ Gladia initialization failed: {e}")
            raise

    async def send_audio(self, audio_data: bytes):
        """Send one audio chunk to Gladia.

        Args:
            audio_data: Raw audio bytes forwarded unchanged to the WebSocket.

        Raises:
            RuntimeError: If the session has not been initialized.
            Exception: Any error raised while sending is printed and
                re-raised.
        """
        if not self.is_running or not self.websocket:
            raise RuntimeError("Gladia session not initialized")

        try:
            await self.websocket.send(audio_data)
        except Exception as e:
            print(f"❌ Failed to send audio: {e}")
            raise

    async def _receive_messages(self):
        """Receive messages from the WebSocket and dispatch them.

        Runs until the WebSocket iteration ends or the task is cancelled.
        Malformed JSON and errors raised while handling a single message
        (including errors from the transcript callback) are reported and
        skipped so that one bad message does not end the stream.
        """
        try:
            async for message in self.websocket:
                try:
                    msg = json.loads(message)
                except json.JSONDecodeError as e:
                    print(f"⚠️ JSON decode error: {e}")
                    continue

                try:
                    self._handle_message(msg)
                except Exception as e:
                    # Isolate handler/callback failures to this message.
                    print(f"⚠️ Error handling Gladia message: {e}")
        except Exception as e:
            # Errors after stop() has been requested are expected noise.
            if self.is_running:
                print(f"❌ Receiver error: {e}")

    def _handle_message(self, message: dict):
        """Handle one decoded message from Gladia.

        ``transcript`` messages invoke the registered callback with the
        stripped utterance text and the ``is_final`` flag; empty text is
        ignored. ``speech_start``, ``speech_end`` and
        ``post_final_transcript`` only print a status line. Any other
        message type is ignored.

        Args:
            message: The JSON-decoded message.
        """
        msg_type = message.get("type")

        if msg_type == "transcript":
            # "or {}" / "or ''" also cover keys that are present but null.
            data = message.get("data") or {}
            is_final = data.get("is_final", False)
            utterance = data.get("utterance") or {}
            text = (utterance.get("text") or "").strip()

            if text and self.on_transcript:
                # Call the callback with transcript and is_final flag
                self.on_transcript(text, is_final)
        elif msg_type == "speech_start":
            print("🎤 Speech detected")
        elif msg_type == "speech_end":
            print("🎤 Speech ended")
        elif msg_type == "post_final_transcript":
            print("✅ Final transcript received")

    async def stop(self):
        """Stop the Gladia session and release the connection.

        Sends ``stop_recording``, gives the receiver a short grace period,
        cancels the receiver task and closes the WebSocket. The receiver is
        cancelled and the socket is closed even when sending the stop message
        fails. Errors are printed, not raised. Does nothing when no session
        is running.
        """
        if not self.is_running:
            return

        self.is_running = False
        websocket = self.websocket
        receiver = self.receiver_task

        try:
            try:
                if websocket is not None:
                    # Send stop recording message
                    await websocket.send(json.dumps({"type": "stop_recording"}))
                    if receiver is not None:
                        # Wait for remaining messages, but return as soon as
                        # the receiver finishes instead of always sleeping.
                        # NOTE(review): presumably Gladia closes the socket
                        # once it has flushed its results - confirm.
                        await asyncio.wait(
                            {receiver}, timeout=self._STOP_GRACE_SECONDS
                        )
            except Exception as e:
                # A failed stop message must not skip the cleanup below.
                print(f"⚠️ Error stopping Gladia: {e}")

            # Cancel receiver task (a no-op if it already finished)
            if receiver is not None:
                receiver.cancel()
                try:
                    await receiver
                except asyncio.CancelledError:
                    pass

            # Close WebSocket
            if websocket is not None:
                await websocket.close()

            print(f"✅ Gladia session stopped: {self.session_id}")
        except Exception as e:
            print(f"⚠️ Error stopping Gladia: {e}")
        finally:
            self.websocket = None
            self.receiver_task = None
            self.session_id = None

    def set_transcript_callback(self, callback: Callable[[str, bool], None]):
        """Set the function called for every non-empty transcript.

        Args:
            callback: Called as ``callback(text, is_final)``.
        """
        self.on_transcript = callback
# Convenience function for Gradio integration
async def create_gladia_handler(language: str = "de", partial: bool = True) -> GladiaHandler:
    """Build a ``GladiaHandler`` and initialize it before returning it.

    Args:
        language: Passed through to ``GladiaHandler``.
        partial: Passed through to ``GladiaHandler``.

    Returns:
        The handler, after ``initialize()`` has completed.
    """
    gladia = GladiaHandler(partial=partial, language=language)
    await gladia.initialize()
    return gladia