Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from services.tts import close_all_tts_services
from session import cleanup_inactive_sessions
from utils.http_client import close_http_client
from websocket.handler import handle_websocket_connection

# Module-level cache for HTML content
Expand Down Expand Up @@ -54,6 +55,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:

# Shutdown: Cleanup resources and cancel tasks
await close_all_tts_services()
await close_http_client() # Close shared HTTP client
cleanup_task.cancel()
try:
await cleanup_task
Expand Down
8 changes: 8 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from dotenv import load_dotenv
from loguru import logger

from utils.security import mask_sensitive

# Load environment variables
load_dotenv()

Expand Down Expand Up @@ -144,6 +146,12 @@ def get_service_config(cls, service_type: str) -> Dict[str, Any]:

return config

@classmethod
def get_service_config_masked(cls, service_type: str) -> Dict[str, str]:
"""Get provider-specific configuration with sensitive data masked (for logging)"""
config = cls.get_service_config(service_type)
return mask_sensitive(config)


# Validate configuration
Config.validate()
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ fastapi>=0.109.0
uvicorn>=0.25.0
python-dotenv>=1.0.0
azure-cognitiveservices-speech>=1.31.0
httpx>=0.25.2
httpx[http2]>=0.25.2
openai>=1.11.0
async-timeout>=4.0.3
websockets>=11.0.3
Expand Down
14 changes: 4 additions & 10 deletions services/tts/azure_tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

from config import Config
from services.tts.base import BaseTTSService
from utils.http_client import HTTPClientManager


class AzureTTSService(BaseTTSService):
"""Azure TTS服务实现"""

# 全局资源
_http_client: Optional[httpx.AsyncClient] = None # 共享HTTP客户端
active_tasks: Set[asyncio.Task] = set() # 活动任务集合,用于中断

def __init__(self, subscription_key: str, region: str, voice_name: str = Config.AZURE_TTS_VOICE) -> None:
Expand All @@ -38,14 +38,12 @@ def __init__(self, subscription_key: str, region: str, voice_name: str = Config.

@classmethod
async def get_http_client(cls) -> httpx.AsyncClient:
"""获取或创建HTTP客户端
"""获取共享HTTP客户端(使用连接池)

Returns:
HTTP客户端实例
"""
if cls._http_client is None or (cls._http_client is not None and cls._http_client.is_closed):
cls._http_client = httpx.AsyncClient()
return cls._http_client
return await HTTPClientManager.get_client()

async def synthesize_text(self, text: str, websocket: WebSocket, is_first: bool = False) -> None:
"""将文本合成为语音并发送到客户端
Expand Down Expand Up @@ -249,11 +247,7 @@ async def close_all(cls) -> None:
"""关闭所有TTS资源"""
# 取消所有活动任务
await cls.interrupt_all()

# 关闭HTTP客户端
if cls._http_client is not None and not cls._http_client.is_closed:
await cls._http_client.aclose()
cls._http_client = None
# HTTP client is managed by HTTPClientManager, no need to close here

async def close(self) -> None:
"""关闭TTS服务,释放资源"""
Expand Down
16 changes: 4 additions & 12 deletions services/tts/minimax_tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
from loguru import logger

from services.tts.base import BaseTTSService
from utils.http_client import HTTPClientManager


class MiniMaxTTSService(BaseTTSService):
"""MiniMax TTS服务实现"""

# 全局资源
_http_client: Optional[httpx.AsyncClient] = None # 共享HTTP客户端
active_tasks: Set[asyncio.Task] = set() # 活动任务集合,用于中断

def __init__(self, api_key: str, voice_id: str = "male-qn-qingse") -> None:
Expand Down Expand Up @@ -47,16 +47,12 @@ def __init__(self, api_key: str, voice_id: str = "male-qn-qingse") -> None:

@classmethod
async def get_http_client(cls) -> httpx.AsyncClient:
"""获取或创建HTTP客户端
"""获取共享HTTP客户端(使用连接池)

Returns:
HTTP客户端实例
"""
if cls._http_client is None or (cls._http_client is not None and cls._http_client.is_closed):
# 设置超时参数
timeout = httpx.Timeout(30.0, connect=10.0)
cls._http_client = httpx.AsyncClient(timeout=timeout)
return cls._http_client
return await HTTPClientManager.get_client()

async def synthesize_text(self, text: str, websocket: WebSocket, is_first: bool = False) -> None:
"""将文本合成为语音并发送到客户端
Expand Down Expand Up @@ -369,11 +365,7 @@ async def close_all(cls) -> None:
"""关闭所有MiniMax TTS资源"""
# 中断所有活动任务
await cls.interrupt_all()

# 关闭HTTP客户端
if cls._http_client is not None and not cls._http_client.is_closed:
await cls._http_client.aclose()
cls._http_client = None
# HTTP client is managed by HTTPClientManager, no need to close here

async def close(self) -> None:
"""关闭当前TTS服务实例"""
Expand Down
49 changes: 29 additions & 20 deletions session.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
import time
import uuid
from threading import RLock
from typing import Any, Dict, List, Optional

from loguru import logger

from config import Config

# Thread-safe lock for session dictionary
_sessions_lock = RLock()


class SessionState:
"""Manages user session state and pipeline resources"""
Expand Down Expand Up @@ -98,25 +102,27 @@ def _clear_queues(self) -> None:


def get_session(session_id: str) -> SessionState:
"""Get or create session state"""
if session_id not in _sessions:
_sessions[session_id] = SessionState(session_id)

# Update activity timestamp
_sessions[session_id].update_activity()
return _sessions[session_id]
"""Get or create session state (thread-safe)"""
with _sessions_lock:
if session_id not in _sessions:
_sessions[session_id] = SessionState(session_id)
# Update activity timestamp
_sessions[session_id].update_activity()
return _sessions[session_id]


def remove_session(session_id: str) -> None:
"""Remove a session"""
if session_id in _sessions:
del _sessions[session_id]
logger.info(f"Session removed: {session_id}")
"""Remove a session (thread-safe)"""
with _sessions_lock:
if session_id in _sessions:
del _sessions[session_id]
logger.info(f"Session removed: {session_id}")


def get_all_sessions() -> Dict[str, SessionState]:
"""Get all active sessions"""
return _sessions
"""Get a copy of all active sessions (thread-safe)"""
with _sessions_lock:
return _sessions.copy()


async def cleanup_inactive_sessions() -> None:
Expand All @@ -125,17 +131,20 @@ async def cleanup_inactive_sessions() -> None:
try:
await asyncio.sleep(60) # Check every minute

inactive_session_ids = [
session_id
for session_id, state in _sessions.items()
if state.is_inactive()
]
# Get inactive sessions with lock
with _sessions_lock:
inactive_session_ids = [
session_id
for session_id, state in _sessions.items()
if state.is_inactive()
]

for session_id in inactive_session_ids:
logger.info(f"Cleaning up inactive session: {session_id}")
try:
if _sessions[session_id].tts_processor:
await _sessions[session_id].tts_processor.interrupt()
session = get_session(session_id)
if session.tts_processor:
await session.tts_processor.interrupt()
except Exception as e:
logger.error(f"Error interrupting TTS processor: {e}")

Expand Down
34 changes: 33 additions & 1 deletion static/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,28 @@ class EventManager {
ui.EventBinder.bindButtonClick('stop-btn', SessionManager.endConversation);
ui.EventBinder.bindButtonClick('reset-btn', SessionManager.resetConversation);

// 绑定文本输入事件
ui.EventBinder.bindTextInput(
// 输入变化处理 - 根据输入内容和连接状态更新发送按钮
() => {
const hasText = ui.elements.textInput?.value.trim().length > 0;
const isConnected = websocketHandler.getSocket()?.readyState === WebSocket.OPEN;
ui.StateManager.updateSendButtonState(hasText && isConnected);
},
// 提交处理 - 发送文本消息
() => {
const text = ui.elements.textInput?.value.trim();
if (text && websocketHandler.getSocket()?.readyState === WebSocket.OPEN) {
// 发送文本输入命令
websocketHandler.sendCommand('text_input', { text });
// 清空输入框
ui.elements.textInput.value = '';
// 禁用发送按钮
ui.StateManager.updateSendButtonState(false);
}
}
);

// 音频上下文恢复
ui.EventBinder.bindAudioContextResume(() => {
const audioContext = audioProcessor.getAudioContext();
Expand Down Expand Up @@ -255,7 +277,17 @@ class EventManager {
*/
function init() {
try {
websocketHandler.initializeWebSocket(ui.StateManager.updateStatus, ui.elements.startButton);
// 连接成功回调 - 根据输入框内容更新发送按钮状态
const onConnected = () => {
const hasText = ui.elements.textInput?.value.trim().length > 0;
ui.StateManager.updateSendButtonState(hasText);
};

websocketHandler.initializeWebSocket(
ui.StateManager.updateStatus,
ui.elements.startButton,
onConnected
);
EventManager.initialize();
} catch (error) {
console.error('应用初始化错误:', error);
Expand Down
14 changes: 10 additions & 4 deletions static/js/websocket-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ const websocketHandler = {
* 建立与服务器的WebSocket连接,并设置各种事件处理器
* @param {Function} updateStatus - 状态更新函数,用于更新UI状态
* @param {HTMLButtonElement} startButton - 开始按钮元素,用于控制按钮状态
* @param {Function} onConnected - 连接成功回调函数(可选)
*/
initializeWebSocket(updateStatus, startButton) {
initializeWebSocket(updateStatus, startButton, onConnected = null) {
this.statusCallback = updateStatus;
this._onConnectedCallback = onConnected;
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws`;

Expand All @@ -102,6 +104,10 @@ const websocketHandler = {
this._updateStatus('idle', '已连接,准备就绪');
startButton.disabled = false;
audioProcessor.initAudioContext();
// 触发连接成功回调
if (this._onConnectedCallback) {
this._onConnectedCallback();
}
};

// 消息处理
Expand All @@ -116,7 +122,7 @@ const websocketHandler = {
// 延迟重连
setTimeout(() => {
console.log('尝试重新连接...');
this.initializeWebSocket(updateStatus, startButton);
this.initializeWebSocket(updateStatus, startButton, onConnected);
}, WS_CONFIG.RECONNECT_DELAY);
};

Expand Down Expand Up @@ -370,13 +376,13 @@ const websocketHandler = {

/**
* 发送命令到服务器
* @param {string} command - 命令名称
* @param {string} command - 命令名称 (type字段)
* @param {Object} commandData - 命令附加数据
*/
sendCommand(command, commandData = {}) {
if (this.socket?.readyState === WebSocket.OPEN) {
const message = {
command,
type: command, // 后端期望 'type' 字段
...commandData
};
this.socket.send(JSON.stringify(message));
Expand Down
Loading
Loading