diff --git a/env.example b/env.example index 84d774970..4af1ead90 100644 --- a/env.example +++ b/env.example @@ -50,9 +50,19 @@ OLLAMA_EMULATING_MODEL_TAG=latest # ENABLE_TABLE_PROCESSING=true # ENABLE_EQUATION_PROCESSING=true +### Audio Processing Configuration (requires: pip install raganything[audio]) +# WHISPER_MODEL=base # tiny/base/small/medium/large-v3 +# WHISPER_LANGUAGE= # Auto-detect if empty. Set to "zh", "en", etc. + +### Video Processing Configuration (requires: pip install raganything[video]) +### Uses SceneDetect for scene boundaries + VLM for visual description + Whisper for audio +# VIDEO_SCENE_THRESHOLD=27.0 # SceneDetect sensitivity (lower = more scenes) +# VIDEO_MIN_SCENE_DURATION=5.0 # Minimum scene duration in seconds +# VIDEO_MAX_SCENES=50 # Maximum scenes to process per video + ### Batch Processing Configuration # MAX_CONCURRENT_FILES=1 -# SUPPORTED_FILE_EXTENSIONS=.pdf,.jpg,.jpeg,.png,.bmp,.tiff,.tif,.gif,.webp,.doc,.docx,.ppt,.pptx,.xls,.xlsx,.txt,.md +# SUPPORTED_FILE_EXTENSIONS=.pdf,.jpg,.jpeg,.png,.bmp,.tiff,.tif,.gif,.webp,.doc,.docx,.ppt,.pptx,.xls,.xlsx,.txt,.md,.mp3,.wav,.flac,.m4a,.ogg,.wma,.aac,.opus,.mp4,.mov,.webm,.avi,.mkv,.flv,.wmv,.m4v # RECURSIVE_FOLDER_PROCESSING=true ### Context Extraction Configuration diff --git a/pyproject.toml b/pyproject.toml index e612fa4e4..f26e0a875 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,8 @@ markdown = [ "weasyprint>=60.0", "pygments>=2.10.0", ] +audio = ["faster-whisper>=1.0.0"] +video = ["scenedetect[opencv]>=0.6.0", "moviepy>=2.0.0", "faster-whisper>=1.0.0", "opencv-python>=4.8.0"] all = [ "Pillow>=10.0.0", "reportlab>=4.0.0", @@ -48,6 +50,10 @@ all = [ "markdown>=3.4.0", "weasyprint>=60.0", "pygments>=2.10.0", + "faster-whisper>=1.0.0", + "scenedetect[opencv]>=0.6.0", + "moviepy>=2.0.0", + "opencv-python>=4.8.0", ] [project.urls] diff --git a/raganything/__init__.py b/raganything/__init__.py index fa8efb10c..86d957b16 100644 --- a/raganything/__init__.py +++ b/raganything/__init__.py @@ -43,6 +43,26 @@ except ImportError: pass
+# Optional: audio modal processor (requires faster-whisper). +try: + from .modalprocessors_audio import ( + AudioModalProcessor as AudioModalProcessor, + is_audio_file as is_audio_file, + ) +except ImportError: + # faster-whisper not installed; audio processing unavailable. + pass + +# Optional: video modal processor (requires scenedetect + moviepy + faster-whisper + opencv). +try: + from .modalprocessors_video import ( + VideoModalProcessor as VideoModalProcessor, + is_video_file as is_video_file, + ) +except ImportError: + # Video dependencies not installed; video processing unavailable. + pass + # Optional: multilingual prompt manager. try: from .prompt_manager import ( @@ -97,6 +117,22 @@ ] ) +if "AudioModalProcessor" in globals(): + __all__.extend( + [ + "AudioModalProcessor", + "is_audio_file", + ] + ) + +if "VideoModalProcessor" in globals(): + __all__.extend( + [ + "VideoModalProcessor", + "is_video_file", + ] + ) + if "set_prompt_language" in globals(): __all__.extend( [ diff --git a/raganything/config.py b/raganything/config.py index c1969b396..3151c9f5f 100644 --- a/raganything/config.py +++ b/raganything/config.py @@ -63,7 +63,7 @@ class RAGAnythingConfig: x.strip() for x in get_env_value( "SUPPORTED_FILE_EXTENSIONS", - ".pdf,.jpg,.jpeg,.png,.bmp,.tiff,.tif,.gif,.webp,.doc,.docx,.ppt,.pptx,.xls,.xlsx,.txt,.md", + ".pdf,.jpg,.jpeg,.png,.bmp,.tiff,.tif,.gif,.webp,.doc,.docx,.ppt,.pptx,.xls,.xlsx,.txt,.md,.mp3,.wav,.flac,.m4a,.ogg,.wma,.aac,.opus,.mp4,.mov,.webm,.avi,.mkv,.flv,.wmv,.m4v", str, ).split(",") ] diff --git a/raganything/modalprocessors_audio.py b/raganything/modalprocessors_audio.py new file mode 100644 index 000000000..22e1b4bf1 --- /dev/null +++ b/raganything/modalprocessors_audio.py @@ -0,0 +1,323 @@ +""" +Audio Modal Processor for RAG-Anything + +Processes audio files (MP3, WAV, FLAC, M4A, OGG) by transcribing speech to text +using faster-whisper, then feeding the transcribed text into LightRAG's knowledge graph. 
+ +Supports: +- Speech-to-text transcription with timestamps +- Meeting recordings, phone calls, podcasts, lectures +- Multiple languages (auto-detect or specify) + +Dependencies: + pip install raganything[audio] + # or: pip install faster-whisper +""" + +import json +import logging +import os +import tempfile +from pathlib import Path +from typing import Any, Dict, List, Tuple + +from lightrag.utils import compute_mdhash_id + +from .modalprocessors import BaseModalProcessor +from .prompt import PROMPTS + +logger = logging.getLogger(__name__) + +# Supported audio file extensions +AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".m4a", ".ogg", ".wma", ".aac", ".opus"} + + +def is_audio_file(file_path: str) -> bool: + """Check if a file is a supported audio format.""" + return Path(file_path).suffix.lower() in AUDIO_EXTENSIONS + + +class AudioModalProcessor(BaseModalProcessor): + """Processor for audio content using faster-whisper for transcription. + + Transcribes audio files into timestamped text segments, then processes + them through LightRAG for knowledge graph construction and retrieval. + + Suitable for: + - Meeting recordings + - Phone call recordings + - Podcasts and interviews + - Lectures and presentations + - Voice memos + + Example: + >>> processor = AudioModalProcessor( + ... lightrag=rag_instance, + ... modal_caption_func=caption_func, + ... whisper_model="large-v3", + ... ) + >>> result = await processor.process_multimodal_content( + ... modal_content={"audio_path": "/path/to/meeting.mp3"}, + ... content_type="audio", + ... ) + """ + + def __init__( + self, + lightrag, + modal_caption_func, + context_extractor=None, + whisper_model: str = None, + whisper_device: str = "auto", + whisper_compute_type: str = "auto", + language: str = None, + segment_min_length: int = 30, + ): + """Initialize audio processor. 
+ + Args: + lightrag: LightRAG instance + modal_caption_func: Function for generating descriptions + context_extractor: Context extractor instance + whisper_model: Whisper model size (tiny/base/small/medium/large-v3) + Defaults to env WHISPER_MODEL or "base" + whisper_device: Device for inference ("auto", "cpu", "cuda") + whisper_compute_type: Compute type ("auto", "float16", "int8") + language: Language code (e.g., "zh", "en"). None for auto-detect. + segment_min_length: Minimum segment length in characters to keep + """ + super().__init__(lightrag, modal_caption_func, context_extractor) + + self.whisper_model_name = whisper_model or os.environ.get( + "WHISPER_MODEL", "base" + ) + self.whisper_device = whisper_device + self.whisper_compute_type = whisper_compute_type + self.language = language or os.environ.get("WHISPER_LANGUAGE", None) + self.segment_min_length = segment_min_length + self._whisper_model = None + + @property + def whisper(self): + """Lazy-load whisper model on first use.""" + if self._whisper_model is None: + try: + from faster_whisper import WhisperModel + except ImportError: + raise ImportError( + "faster-whisper is required for audio processing. " + "Install it with: pip install raganything[audio] " + "or: pip install faster-whisper" + ) + + logger.info( + f"Loading whisper model: {self.whisper_model_name} " + f"(device={self.whisper_device})" + ) + self._whisper_model = WhisperModel( + self.whisper_model_name, + device=self.whisper_device, + compute_type=self.whisper_compute_type, + ) + return self._whisper_model + + def transcribe(self, audio_path: str) -> List[Dict[str, Any]]: + """Transcribe audio file to timestamped segments. 
+ + Args: + audio_path: Path to the audio file + + Returns: + List of segments with start, end (seconds) and text + """ + if not Path(audio_path).exists(): + raise FileNotFoundError(f"Audio file not found: {audio_path}") + + logger.info(f"Transcribing audio: {audio_path}") + + segments_iter, info = self.whisper.transcribe( + audio_path, + language=self.language, + vad_filter=True, # Filter out silence + vad_parameters=dict(min_silence_duration_ms=500), + ) + + segments = [] + for segment in segments_iter: + text = segment.text.strip() + if len(text) >= self.segment_min_length: + segments.append( + { + "start": segment.start, + "end": segment.end, + "text": text, + } + ) + + logger.info( + f"Transcription complete: {len(segments)} segments, " + f"language={info.language}, duration={info.duration:.1f}s" + ) + return segments + + def _format_timestamp(self, seconds: float) -> str: + """Format seconds to HH:MM:SS or MM:SS.""" + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + s = int(seconds % 60) + if h > 0: + return f"{h}:{m:02d}:{s:02d}" + return f"{m}:{s:02d}" + + def _segments_to_text(self, segments: List[Dict[str, Any]]) -> str: + """Convert transcription segments to formatted text with timestamps.""" + lines = [] + for seg in segments: + start_str = self._format_timestamp(seg["start"]) + end_str = self._format_timestamp(seg["end"]) + lines.append(f"[{start_str}-{end_str}] {seg['text']}") + return "\n".join(lines) + + async def generate_description_only( + self, + modal_content, + content_type: str, + item_info: Dict[str, Any] = None, + entity_name: str = None, + ) -> Tuple[str, Dict[str, Any]]: + """Generate audio transcription and entity info. 
+ + Args: + modal_content: Audio content dict with 'audio_path' key + content_type: Type of modal content ("audio") + item_info: Item information for context extraction + entity_name: Optional predefined entity name + + Returns: + Tuple of (transcription_text, entity_info) + """ + try: + # Parse audio content + if isinstance(modal_content, str): + try: + content_data = json.loads(modal_content) + except json.JSONDecodeError: + content_data = {"audio_path": modal_content} + else: + content_data = modal_content + + audio_path = content_data.get("audio_path") or content_data.get("img_path") + if not audio_path: + raise ValueError( + f"No audio path provided in modal_content: {modal_content}" + ) + + # Transcribe + segments = self.transcribe(audio_path) + if not segments: + raise RuntimeError(f"No speech detected in audio: {audio_path}") + + # Format transcription + transcription = self._segments_to_text(segments) + + # Generate entity info + filename = Path(audio_path).stem + duration = segments[-1]["end"] if segments else 0 + entity_info = { + "entity_name": entity_name + if entity_name + else f"audio_{filename}", + "entity_type": "audio", + "summary": ( + f"Audio recording ({self._format_timestamp(duration)} duration). " + f"Transcription: {segments[0]['text'][:100]}..." 
+ if segments + else "Empty audio" + ), + } + + return transcription, entity_info + + except Exception as e: + logger.error(f"Error generating audio transcription: {e}") + fallback_entity = { + "entity_name": entity_name + if entity_name + else f"audio_{compute_mdhash_id(str(modal_content))}", + "entity_type": "audio", + "summary": f"Audio content: {str(modal_content)[:100]}", + } + return str(modal_content), fallback_entity + + async def process_multimodal_content( + self, + modal_content, + content_type: str, + file_path: str = "manual_creation", + entity_name: str = None, + item_info: Dict[str, Any] = None, + batch_mode: bool = False, + doc_id: str = None, + chunk_order_index: int = 0, + ) -> Tuple[str, Dict[str, Any]]: + """Process audio content: transcribe and insert into knowledge graph. + + Args: + modal_content: Audio content dict with 'audio_path' key + content_type: Type of modal content ("audio") + file_path: Source file path for attribution + entity_name: Optional entity name + item_info: Item info for context + batch_mode: Whether in batch processing mode + doc_id: Document ID + chunk_order_index: Chunk ordering index + + Returns: + Tuple of (chunk_text, entity_info) + """ + try: + # Generate transcription and entity info + transcription, entity_info = await self.generate_description_only( + modal_content, content_type, item_info, entity_name + ) + + # Parse audio path for chunk formatting + if isinstance(modal_content, str): + try: + content_data = json.loads(modal_content) + except json.JSONDecodeError: + content_data = {"audio_path": modal_content} + else: + content_data = modal_content + + audio_path = content_data.get("audio_path") or content_data.get( + "img_path", "" + ) + + # Build audio chunk text + modal_chunk = ( + f"[Audio Content]\n" + f"Source: {audio_path}\n" + f"Entity: {entity_info['entity_name']}\n" + f"Transcription:\n{transcription}" + ) + + return await self._create_entity_and_chunk( + modal_chunk, + entity_info, + file_path, + 
batch_mode, + doc_id, + chunk_order_index, + ) + + except Exception as e: + logger.error(f"Error processing audio content: {e}") + fallback_entity = { + "entity_name": entity_name + if entity_name + else f"audio_{compute_mdhash_id(str(modal_content))}", + "entity_type": "audio", + "summary": f"Audio content: {str(modal_content)[:100]}", + } + return str(modal_content), fallback_entity diff --git a/raganything/modalprocessors_video.py b/raganything/modalprocessors_video.py new file mode 100644 index 000000000..c9d298bf7 --- /dev/null +++ b/raganything/modalprocessors_video.py @@ -0,0 +1,511 @@ +""" +Video Modal Processor for RAG-Anything + +Processes video files (MP4, MOV, WebM, AVI, MKV) with dual-channel analysis: +- Visual channel: scene detection + keyframe extraction + VLM description +- Audio channel: audio track extraction + faster-whisper transcription + +Results are merged by timestamp, producing rich text descriptions like: + [0:00-0:30] 画面:展示Q3营收图表 | 语音:本季度同比增长23%... + +Supports: +- Meeting recordings (screen share + voice) +- Lectures/tutorials (slides + narration) +- Product demos (UI operations + voiceover) +- Surveillance/inspection (visual scenes, often no audio) +- Podcasts with video (talking heads + speech) + +Dependencies: + pip install raganything[video] + # or: pip install scenedetect[opencv] moviepy faster-whisper opencv-python +""" + +import json +import logging +import os +import tempfile +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import cv2 +from lightrag.utils import compute_mdhash_id + +from .modalprocessors import BaseModalProcessor +from .modalprocessors_audio import AudioModalProcessor + +logger = logging.getLogger(__name__) + +# Supported video file extensions +VIDEO_EXTENSIONS = {".mp4", ".mov", ".webm", ".avi", ".mkv", ".flv", ".wmv", ".m4v"} + + +def is_video_file(file_path: str) -> bool: + """Check if a file is a supported video format.""" + return Path(file_path).suffix.lower() in 
VIDEO_EXTENSIONS + + +class VideoModalProcessor(BaseModalProcessor): + """Processor for video content with visual + audio dual-channel analysis. + + Combines: + - SceneDetect for intelligent scene boundary detection + - OpenCV for keyframe extraction + - VLM (via modal_caption_func) for visual description + - faster-whisper for audio transcription + - Timestamp-aligned merging of both channels + + Suitable for: + - Meeting recordings (screen share + discussion) + - Lectures and tutorials (slides + narration) + - Product demos (UI + voiceover) + - Surveillance / inspection videos (visual only) + - Podcasts with video component + + Example: + >>> processor = VideoModalProcessor( + ... lightrag=rag_instance, + ... modal_caption_func=caption_func, + ... whisper_model="large-v3", + ... ) + >>> result = await processor.process_multimodal_content( + ... modal_content={"video_path": "/path/to/meeting.mp4"}, + ... content_type="video", + ... ) + """ + + def __init__( + self, + lightrag, + modal_caption_func, + context_extractor=None, + whisper_model: str = None, + whisper_device: str = "auto", + whisper_compute_type: str = "auto", + language: str = None, + min_scene_duration: float = 5.0, + max_scenes: int = 50, + scene_threshold: float = 27.0, + ): + """Initialize video processor. 
+ + Args: + lightrag: LightRAG instance + modal_caption_func: Function for generating visual descriptions + context_extractor: Context extractor instance + whisper_model: Whisper model size (tiny/base/small/medium/large-v3) + whisper_device: Device for inference ("auto", "cpu", "cuda") + whisper_compute_type: Compute type ("auto", "float16", "int8") + language: Language code for ASR (None for auto-detect) + min_scene_duration: Minimum scene duration in seconds to keep + max_scenes: Maximum number of scenes to process + scene_threshold: ContentDetector threshold (lower = more sensitive) + """ + super().__init__(lightrag, modal_caption_func, context_extractor) + + self.whisper_model_name = whisper_model or os.environ.get( + "WHISPER_MODEL", "base" + ) + self.whisper_device = whisper_device + self.whisper_compute_type = whisper_compute_type + self.language = language or os.environ.get("WHISPER_LANGUAGE", None) + self.min_scene_duration = min_scene_duration + self.max_scenes = max_scenes + self.scene_threshold = scene_threshold + + # Lazy-loaded audio processor (shares whisper model config) + self._audio_processor = None + + @property + def audio_processor(self) -> AudioModalProcessor: + """Get or create audio processor for transcription.""" + if self._audio_processor is None: + self._audio_processor = AudioModalProcessor( + lightrag=self.lightrag, + modal_caption_func=self.modal_caption_func, + whisper_model=self.whisper_model_name, + whisper_device=self.whisper_device, + whisper_compute_type=self.whisper_compute_type, + language=self.language, + segment_min_length=10, + ) + return self._audio_processor + + def _detect_scenes(self, video_path: str) -> List[Tuple[float, float]]: + """Detect scene boundaries using SceneDetect. 
+ + Args: + video_path: Path to video file + + Returns: + List of (start_seconds, end_seconds) tuples + """ + try: + from scenedetect import detect, ContentDetector + except ImportError: + raise ImportError( + "scenedetect is required for video processing. " + "Install it with: pip install raganything[video] " + "or: pip install scenedetect[opencv]" + ) + + scene_list = detect( + video_path, + ContentDetector(threshold=self.scene_threshold), + ) + + scenes = [] + for scene in scene_list: + start = scene[0].get_seconds() + end = scene[1].get_seconds() + duration = end - start + if duration >= self.min_scene_duration: + scenes.append((start, end)) + + # If no scenes detected (e.g. static video), treat as single scene + if not scenes: + cap = cv2.VideoCapture(video_path) + total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) + fps = cap.get(cv2.CAP_PROP_FPS) + cap.release() + if fps > 0: + total_duration = total_frames / fps + scenes = [(0, total_duration)] + + # Limit number of scenes + return scenes[: self.max_scenes] + + def _extract_frame_at(self, video_path: str, timestamp: float) -> Optional[str]: + """Extract a single frame at the given timestamp. + + Args: + video_path: Path to video file + timestamp: Time in seconds + + Returns: + Path to saved frame image, or None on failure + """ + cap = cv2.VideoCapture(video_path) + fps = cap.get(cv2.CAP_PROP_FPS) + if fps <= 0: + cap.release() + return None + + frame_num = int(timestamp * fps) + cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) + ret, frame = cap.read() + cap.release() + + if not ret: + return None + + # Save to temp file + frame_path = os.path.join( + tempfile.gettempdir(), + f"raganything_vframe_{os.getpid()}_{timestamp:.1f}.jpg", + ) + cv2.imwrite(frame_path, frame) + return frame_path + + def _extract_audio_track(self, video_path: str) -> Optional[str]: + """Extract audio track from video file. 
+ + Args: + video_path: Path to video file + + Returns: + Path to extracted audio WAV file, or None if no audio + """ + try: + from moviepy import VideoFileClip + except ImportError: + raise ImportError( + "moviepy is required for video audio extraction. " + "Install it with: pip install raganything[video] " + "or: pip install moviepy" + ) + + audio_path = os.path.join( + tempfile.gettempdir(), + f"raganything_vaudio_{os.getpid()}.wav", + ) + + try: + clip = VideoFileClip(video_path) + if clip.audio is None: + clip.close() + return None + clip.audio.write_audiofile(audio_path, logger=None) + clip.close() + return audio_path + except Exception as e: + logger.warning(f"Failed to extract audio from {video_path}: {e}") + return None + + def _format_timestamp(self, seconds: float) -> str: + """Format seconds to HH:MM:SS or MM:SS.""" + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + s = int(seconds % 60) + if h > 0: + return f"{h}:{m:02d}:{s:02d}" + return f"{m}:{s:02d}" + + def _get_transcript_in_range( + self, + transcript: List[Dict[str, Any]], + start: float, + end: float, + ) -> str: + """Get transcript text that falls within a time range.""" + texts = [] + for seg in transcript: + # Include segment if it overlaps with the range + if seg["end"] > start and seg["start"] < end: + texts.append(seg["text"]) + return " ".join(texts).strip() + + async def _describe_scenes( + self, video_path: str, scenes: List[Tuple[float, float]] + ) -> List[Dict[str, Any]]: + """Generate VLM descriptions for each scene. 
+ + Args: + video_path: Path to video file + scenes: List of (start, end) tuples + + Returns: + List of scene dicts with start, end, visual description + """ + results = [] + for start, end in scenes: + # Extract frame from middle of scene + mid_time = (start + end) / 2 + frame_path = self._extract_frame_at(video_path, mid_time) + + visual_desc = "" + if frame_path: + try: + # Encode frame to base64 for VLM + import base64 + + with open(frame_path, "rb") as f: + image_base64 = base64.b64encode(f.read()).decode("utf-8") + + prompt = ( + f"Describe this video frame in detail. " + f"This is from a video at approximately " + f"{self._format_timestamp(mid_time)}. " + f"Include: what is shown, any text/UI visible, " + f"people/objects present, and the overall context." + ) + visual_desc = await self.modal_caption_func( + prompt, image_data=image_base64 + ) + except Exception as e: + logger.warning(f"Failed to describe frame at {mid_time:.1f}s: {e}") + finally: + if os.path.exists(frame_path): + os.remove(frame_path) + + results.append({"start": start, "end": end, "visual": visual_desc}) + + return results + + def _merge_channels( + self, + visual_segments: List[Dict[str, Any]], + audio_segments: List[Dict[str, Any]], + ) -> str: + """Merge visual and audio channels by timestamp alignment. 
+ + Args: + visual_segments: Scene descriptions with start/end/visual + audio_segments: Transcription segments with start/end/text + + Returns: + Formatted text with aligned visual + audio per scene + """ + lines = [] + for vs in visual_segments: + start_str = self._format_timestamp(vs["start"]) + end_str = self._format_timestamp(vs["end"]) + + # Find audio transcript in this time range + audio_text = self._get_transcript_in_range( + audio_segments, vs["start"], vs["end"] + ) + + line = f"[{start_str}-{end_str}]" + if vs.get("visual"): + line += f" 画面:{vs['visual']}" + if audio_text: + line += f" | 语音:{audio_text}" + + # Only add if we have at least one channel + if vs.get("visual") or audio_text: + lines.append(line) + + return "\n\n".join(lines) + + async def generate_description_only( + self, + modal_content, + content_type: str, + item_info: Dict[str, Any] = None, + entity_name: str = None, + ) -> Tuple[str, Dict[str, Any]]: + """Generate video description using dual-channel analysis. 
+ + Args: + modal_content: Video content dict with 'video_path' key + content_type: Type of modal content ("video") + item_info: Item information for context + entity_name: Optional predefined entity name + + Returns: + Tuple of (merged_description, entity_info) + """ + try: + # Parse video content + if isinstance(modal_content, str): + try: + content_data = json.loads(modal_content) + except json.JSONDecodeError: + content_data = {"video_path": modal_content} + else: + content_data = modal_content + + video_path = content_data.get("video_path") or content_data.get("img_path") + if not video_path: + raise ValueError( + f"No video path provided in modal_content: {modal_content}" + ) + + if not Path(video_path).exists(): + raise FileNotFoundError(f"Video file not found: {video_path}") + + logger.info(f"Processing video: {video_path}") + + # Step 1: Detect scenes + scenes = self._detect_scenes(video_path) + logger.info(f"Detected {len(scenes)} scenes") + + # Step 2: Visual channel - describe each scene + visual_segments = await self._describe_scenes(video_path, scenes) + + # Step 3: Audio channel - extract and transcribe + audio_segments = [] + audio_path = self._extract_audio_track(video_path) + if audio_path: + try: + audio_segments = self.audio_processor.transcribe(audio_path) + except Exception as e: + logger.warning(f"Audio transcription failed: {e}") + finally: + if os.path.exists(audio_path): + os.remove(audio_path) + + # Step 4: Merge by timestamp + merged_description = self._merge_channels(visual_segments, audio_segments) + + if not merged_description: + merged_description = f"Video file: {Path(video_path).name}" + + # Generate entity info + filename = Path(video_path).stem + total_duration = scenes[-1][1] if scenes else 0 + entity_info = { + "entity_name": entity_name if entity_name else f"video_{filename}", + "entity_type": "video", + "summary": ( + f"Video ({self._format_timestamp(total_duration)} duration, " + f"{len(scenes)} scenes). 
" + f"{merged_description[:150]}..." + ), + } + + return merged_description, entity_info + + except Exception as e: + logger.error(f"Error generating video description: {e}") + fallback_entity = { + "entity_name": entity_name + if entity_name + else f"video_{compute_mdhash_id(str(modal_content))}", + "entity_type": "video", + "summary": f"Video content: {str(modal_content)[:100]}", + } + return str(modal_content), fallback_entity + + async def process_multimodal_content( + self, + modal_content, + content_type: str, + file_path: str = "manual_creation", + entity_name: str = None, + item_info: Dict[str, Any] = None, + batch_mode: bool = False, + doc_id: str = None, + chunk_order_index: int = 0, + ) -> Tuple[str, Dict[str, Any]]: + """Process video content: analyze and insert into knowledge graph. + + Args: + modal_content: Video content dict with 'video_path' key + content_type: Type of modal content ("video") + file_path: Source file path for attribution + entity_name: Optional entity name + item_info: Item info for context + batch_mode: Whether in batch processing mode + doc_id: Document ID + chunk_order_index: Chunk ordering index + + Returns: + Tuple of (chunk_text, entity_info) + """ + try: + # Generate description using dual-channel analysis + description, entity_info = await self.generate_description_only( + modal_content, content_type, item_info, entity_name + ) + + # Parse video path + if isinstance(modal_content, str): + try: + content_data = json.loads(modal_content) + except json.JSONDecodeError: + content_data = {"video_path": modal_content} + else: + content_data = modal_content + + video_path = content_data.get("video_path") or content_data.get( + "img_path", "" + ) + + # Build video chunk text + modal_chunk = ( + f"[Video Content]\n" + f"Source: {video_path}\n" + f"Entity: {entity_info['entity_name']}\n" + f"Analysis:\n{description}" + ) + + return await self._create_entity_and_chunk( + modal_chunk, + entity_info, + file_path, + batch_mode, + doc_id, 
+ chunk_order_index, + ) + + except Exception as e: + logger.error(f"Error processing video content: {e}") + fallback_entity = { + "entity_name": entity_name + if entity_name + else f"video_{compute_mdhash_id(str(modal_content))}", + "entity_type": "video", + "summary": f"Video content: {str(modal_content)[:100]}", + } + return str(modal_content), fallback_entity diff --git a/raganything/processor.py b/raganything/processor.py index add0de017..a1ac409cd 100644 --- a/raganything/processor.py +++ b/raganything/processor.py @@ -1172,6 +1172,24 @@ def _apply_chunk_template( enhanced_caption=description, ) + elif content_type == "audio": + audio_path = original_item.get("audio_path", original_item.get("img_path", "")) + + return ( + f"[Audio Content]\n" + f"Source: {audio_path}\n" + f"Transcription:\n{description}" + ) + + elif content_type == "video": + video_path = original_item.get("video_path", original_item.get("img_path", "")) + + return ( + f"[Video Content]\n" + f"Source: {video_path}\n" + f"Analysis:\n{description}" + ) + else: # generic or unknown types content = str(original_item.get("content", original_item)) diff --git a/tests/test_audio_processor.py b/tests/test_audio_processor.py new file mode 100644 index 000000000..35572cd5a --- /dev/null +++ b/tests/test_audio_processor.py @@ -0,0 +1,202 @@ +"""Tests for the AudioModalProcessor.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from raganything.modalprocessors_audio import is_audio_file + +# AudioModalProcessor requires faster-whisper for full functionality, +# but we can test it with mocked whisper model +try: + import faster_whisper # noqa: F401 + + HAS_FASTER_WHISPER = True +except ImportError: + HAS_FASTER_WHISPER = False + +from raganything.modalprocessors_audio import AudioModalProcessor + + +class TestIsAudioFile: + """Test audio file detection.""" + + def test_supported_extensions(self): + assert is_audio_file("meeting.mp3") + assert is_audio_file("recording.wav") + 
assert is_audio_file("podcast.flac") + assert is_audio_file("voice.m4a") + assert is_audio_file("music.ogg") + assert is_audio_file("/path/to/file.WAV") # case insensitive + assert is_audio_file("call.aac") + assert is_audio_file("audio.opus") + + def test_unsupported_extensions(self): + assert not is_audio_file("document.pdf") + assert not is_audio_file("image.png") + assert not is_audio_file("video.mp4") + assert not is_audio_file("text.txt") + assert not is_audio_file("no_extension") + + +class TestAudioModalProcessorInit: + """Test AudioModalProcessor initialization.""" + + def test_default_init(self): + lightrag = MagicMock() + lightrag.tokenizer = None + caption_func = AsyncMock() + + processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=caption_func, + ) + assert processor.whisper_model_name == "base" + assert processor.whisper_device == "auto" + assert processor.language is None + + def test_custom_model(self): + lightrag = MagicMock() + lightrag.tokenizer = None + caption_func = AsyncMock() + + processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=caption_func, + whisper_model="large-v3", + language="zh", + ) + assert processor.whisper_model_name == "large-v3" + assert processor.language == "zh" + + def test_env_override(self, monkeypatch): + monkeypatch.setenv("WHISPER_MODEL", "medium") + monkeypatch.setenv("WHISPER_LANGUAGE", "en") + + lightrag = MagicMock() + lightrag.tokenizer = None + caption_func = AsyncMock() + + processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=caption_func, + ) + assert processor.whisper_model_name == "medium" + assert processor.language == "en" + + +class TestFormatTimestamp: + """Test timestamp formatting.""" + + def setup_method(self): + lightrag = MagicMock() + lightrag.tokenizer = None + self.processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + def test_seconds_only(self): + assert self.processor._format_timestamp(45) 
== "0:45" + + def test_minutes_and_seconds(self): + assert self.processor._format_timestamp(125) == "2:05" + + def test_hours(self): + assert self.processor._format_timestamp(3661) == "1:01:01" + + def test_zero(self): + assert self.processor._format_timestamp(0) == "0:00" + + +class TestSegmentsToText: + """Test segment formatting.""" + + def setup_method(self): + lightrag = MagicMock() + lightrag.tokenizer = None + self.processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + def test_single_segment(self): + segments = [{"start": 0, "end": 30, "text": "Hello world"}] + result = self.processor._segments_to_text(segments) + assert result == "[0:00-0:30] Hello world" + + def test_multiple_segments(self): + segments = [ + {"start": 0, "end": 30, "text": "First segment"}, + {"start": 30, "end": 65, "text": "Second segment"}, + ] + result = self.processor._segments_to_text(segments) + assert "[0:00-0:30] First segment" in result + assert "[0:30-1:05] Second segment" in result + + def test_empty_segments(self): + result = self.processor._segments_to_text([]) + assert result == "" + + +@pytest.mark.asyncio +class TestGenerateDescriptionOnly: + """Test the generate_description_only method.""" + + async def test_file_not_found(self): + lightrag = MagicMock() + lightrag.tokenizer = None + processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + # Should return fallback on missing file + result, entity_info = await processor.generate_description_only( + {"audio_path": "/nonexistent/file.mp3"}, + "audio", + ) + assert entity_info["entity_type"] == "audio" + + async def test_with_dict_content(self): + lightrag = MagicMock() + lightrag.tokenizer = None + processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + # Mock transcribe to avoid needing actual audio + mock_segments = [ + {"start": 0, "end": 10, "text": "This is a test transcription"}, + {"start": 10, 
"end": 20, "text": "Second part of the audio"}, + ] + processor.transcribe = MagicMock(return_value=mock_segments) + + result, entity_info = await processor.generate_description_only( + {"audio_path": "/tmp/test.mp3"}, + "audio", + ) + + assert "[0:00-0:10] This is a test transcription" in result + assert "[0:10-0:20] Second part of the audio" in result + assert entity_info["entity_type"] == "audio" + assert "audio_test" in entity_info["entity_name"] + + async def test_with_string_content(self): + lightrag = MagicMock() + lightrag.tokenizer = None + processor = AudioModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + mock_segments = [{"start": 0, "end": 5, "text": "Hello"}] + processor.transcribe = MagicMock(return_value=mock_segments) + + # Pass path as string directly + result, entity_info = await processor.generate_description_only( + "/tmp/test.mp3", + "audio", + ) + assert "[0:00-0:05] Hello" in result diff --git a/tests/test_video_processor.py b/tests/test_video_processor.py new file mode 100644 index 000000000..3a713c1bf --- /dev/null +++ b/tests/test_video_processor.py @@ -0,0 +1,240 @@ +"""Tests for the VideoModalProcessor.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from raganything.modalprocessors_video import is_video_file + +try: + import cv2 + import faster_whisper + + HAS_VIDEO_DEPS = True +except ImportError: + HAS_VIDEO_DEPS = False + +from raganything.modalprocessors_video import VideoModalProcessor + + +class TestIsVideoFile: + """Test video file detection.""" + + def test_supported_extensions(self): + assert is_video_file("meeting.mp4") + assert is_video_file("recording.mov") + assert is_video_file("demo.webm") + assert is_video_file("clip.avi") + assert is_video_file("video.mkv") + assert is_video_file("/path/to/file.MP4") # case insensitive + assert is_video_file("stream.flv") + assert is_video_file("screen.m4v") + + def test_unsupported_extensions(self): + assert not 
is_video_file("document.pdf") + assert not is_video_file("image.png") + assert not is_video_file("audio.mp3") + assert not is_video_file("text.txt") + assert not is_video_file("no_extension") + + +class TestVideoModalProcessorInit: + """Test VideoModalProcessor initialization.""" + + def test_default_init(self): + lightrag = MagicMock() + lightrag.tokenizer = None + caption_func = AsyncMock() + + processor = VideoModalProcessor( + lightrag=lightrag, + modal_caption_func=caption_func, + ) + assert processor.whisper_model_name == "base" + assert processor.min_scene_duration == 5.0 + assert processor.max_scenes == 50 + assert processor.scene_threshold == 27.0 + + def test_custom_config(self): + lightrag = MagicMock() + lightrag.tokenizer = None + caption_func = AsyncMock() + + processor = VideoModalProcessor( + lightrag=lightrag, + modal_caption_func=caption_func, + whisper_model="large-v3", + min_scene_duration=10.0, + max_scenes=20, + scene_threshold=30.0, + ) + assert processor.whisper_model_name == "large-v3" + assert processor.min_scene_duration == 10.0 + assert processor.max_scenes == 20 + assert processor.scene_threshold == 30.0 + + +class TestTimestampFormatting: + """Test timestamp formatting.""" + + def setup_method(self): + lightrag = MagicMock() + lightrag.tokenizer = None + self.processor = VideoModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + def test_seconds_only(self): + assert self.processor._format_timestamp(45) == "0:45" + + def test_minutes_and_seconds(self): + assert self.processor._format_timestamp(125) == "2:05" + + def test_hours(self): + assert self.processor._format_timestamp(3661) == "1:01:01" + + def test_zero(self): + assert self.processor._format_timestamp(0) == "0:00" + + +class TestGetTranscriptInRange: + """Test transcript time-range filtering.""" + + def setup_method(self): + lightrag = MagicMock() + lightrag.tokenizer = None + self.processor = VideoModalProcessor( + lightrag=lightrag, + 
modal_caption_func=AsyncMock(), + ) + + def test_overlapping_segments(self): + transcript = [ + {"start": 0, "end": 10, "text": "Hello"}, + {"start": 10, "end": 20, "text": "World"}, + {"start": 20, "end": 30, "text": "Goodbye"}, + ] + result = self.processor._get_transcript_in_range(transcript, 5, 25) + assert "Hello" in result + assert "World" in result + assert "Goodbye" in result + + def test_no_overlap(self): + transcript = [ + {"start": 0, "end": 10, "text": "Hello"}, + {"start": 50, "end": 60, "text": "Later"}, + ] + result = self.processor._get_transcript_in_range(transcript, 20, 40) + assert result == "" + + def test_exact_boundaries(self): + transcript = [ + {"start": 10, "end": 20, "text": "Exact"}, + ] + result = self.processor._get_transcript_in_range(transcript, 10, 20) + assert "Exact" in result + + +class TestMergeChannels: + """Test visual + audio channel merging.""" + + def setup_method(self): + lightrag = MagicMock() + lightrag.tokenizer = None + self.processor = VideoModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + def test_both_channels(self): + visual = [{"start": 0, "end": 30, "visual": "PPT showing revenue chart"}] + audio = [{"start": 5, "end": 25, "text": "Revenue grew 23%"}] + + result = self.processor._merge_channels(visual, audio) + assert "画面:PPT showing revenue chart" in result + assert "语音:Revenue grew 23%" in result + assert "[0:00-0:30]" in result + + def test_visual_only(self): + visual = [{"start": 0, "end": 30, "visual": "Surveillance footage"}] + audio = [] + + result = self.processor._merge_channels(visual, audio) + assert "画面:Surveillance footage" in result + assert "语音" not in result + + def test_audio_only(self): + visual = [{"start": 0, "end": 30, "visual": ""}] + audio = [{"start": 0, "end": 30, "text": "Just audio content"}] + + result = self.processor._merge_channels(visual, audio) + assert "语音:Just audio content" in result + + def test_multiple_scenes(self): + visual = [ + {"start": 0, 
"end": 30, "visual": "Scene 1"}, + {"start": 30, "end": 60, "visual": "Scene 2"}, + ] + audio = [ + {"start": 5, "end": 25, "text": "First part"}, + {"start": 35, "end": 55, "text": "Second part"}, + ] + + result = self.processor._merge_channels(visual, audio) + assert "Scene 1" in result + assert "Scene 2" in result + assert "First part" in result + assert "Second part" in result + + def test_empty_both(self): + result = self.processor._merge_channels([], []) + assert result == "" + + +@pytest.mark.asyncio +class TestGenerateDescriptionOnly: + """Test the generate_description_only method.""" + + async def test_missing_file(self): + lightrag = MagicMock() + lightrag.tokenizer = None + processor = VideoModalProcessor( + lightrag=lightrag, + modal_caption_func=AsyncMock(), + ) + + result, entity_info = await processor.generate_description_only( + {"video_path": "/nonexistent/video.mp4"}, + "video", + ) + assert entity_info["entity_type"] == "video" + + async def test_with_mocked_pipeline(self): + lightrag = MagicMock() + lightrag.tokenizer = None + caption_func = AsyncMock(return_value="A person presenting slides") + processor = VideoModalProcessor( + lightrag=lightrag, + modal_caption_func=caption_func, + ) + + # Mock internal methods + processor._detect_scenes = MagicMock(return_value=[(0, 30), (30, 60)]) + processor._describe_scenes = AsyncMock( + return_value=[ + {"start": 0, "end": 30, "visual": "Presenter with slides"}, + {"start": 30, "end": 60, "visual": "Demo of product"}, + ] + ) + processor._extract_audio_track = MagicMock(return_value=None) + + result, entity_info = await processor.generate_description_only( + {"video_path": "/tmp/test.mp4"}, + "video", + ) + + assert "Presenter with slides" in result + assert "Demo of product" in result + assert entity_info["entity_type"] == "video" + assert "video_test" in entity_info["entity_name"]