"""Keyboard-controlled voice recorder.

Press 'r' to start recording from an audio input device, 's' to stop,
clean up the captured audio (high-pass filter, noise reduction,
normalization, compression, smoothing, gain) and save it as a WAV file,
and 'q' to quit.
"""
import sounddevice as sd
import soundfile as sf
import noisereduce as nr
import numpy as np
import os
from datetime import datetime
from pynput import keyboard
from queue import Queue
from scipy.signal import butter, filtfilt
# ---------- SETTINGS ----------
SAMPLE_RATE = 44100           # sample rate passed to the input stream and the WAV writer
CHANNELS = 1                  # number of input channels requested from the device
OUTPUT_FOLDER = "recordings"  # directory that receives the saved WAV files
GAIN = 1.3                    # slight gain after compression
NORMALIZE = True              # peak-normalize before compression
NOISE_PROFILE_SECONDS = 1     # first second as noise profile
MAX_SECONDS = 60 * 60         # safety cap (1 hour)
COMP_THRESHOLD = 0.6          # compression threshold
COMP_RATIO = 3.0              # compression ratio
SMOOTH_WINDOW = 500           # smoothing window in samples (optional)

# exist_ok=True creates the folder when missing and is a no-op when it is
# already there, without the race of a separate exists() check.
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
# ---------- GLOBAL STATE ----------
# Chunks captured by the audio callback wait here until recording stops.
audio_queue = Queue()
# The open input stream, or None while nothing is being recorded.
stream = None
# Set to True by start_stream() and back to False by stop_stream_and_process().
is_recording = False
# ---------- AUDIO CALLBACK ----------
def audio_callback(indata, frames, time, status):
    """Queue one captured chunk of audio for later processing.

    Registered as the ``callback`` of the ``sd.InputStream`` opened in
    ``start_stream``. ``frames`` and ``time`` are accepted to match the
    callback signature but are not used here.
    """
    if status:
        # Report whatever the stream flagged instead of dropping it silently.
        print(f"Stream status: {status}", flush=True)
    # Queue a copy so the stored chunk does not alias the caller's buffer.
    chunk = indata.copy()
    audio_queue.put(chunk)
# ---------- AUDIO PROCESSING ----------
def highpass_filter(y, sr, cutoff=80):
    """Attenuate content below ``cutoff`` (for example low-frequency hum).

    A first-order Butterworth high-pass is designed with the cutoff
    expressed as a fraction of the Nyquist frequency (``sr / 2``) and then
    applied with ``filtfilt``, i.e. once forward and once backward.

    Args:
        y: Signal to filter.
        sr: Sample rate of ``y``.
        cutoff: Cutoff frequency, in the same unit as ``sr``.

    Returns:
        The filtered signal, same length as ``y``.
    """
    nyquist = sr / 2
    numerator, denominator = butter(1, cutoff / nyquist, btype='high')
    return filtfilt(numerator, denominator, y)
def compress_audio(audio, threshold=COMP_THRESHOLD, ratio=COMP_RATIO):
    """Apply a simple hard-knee dynamic range compressor.

    Samples whose magnitude is at or below ``threshold`` pass through
    unchanged. For louder samples, the part of the magnitude above
    ``threshold`` is divided by ``ratio``; the sign is preserved.

    Args:
        audio: Samples to compress; the input is not modified.
        threshold: Magnitude above which compression starts.
        ratio: Factor by which the overshoot is reduced.

    Returns:
        A compressed copy of ``audio``.
    """
    result = np.copy(audio)
    loud = np.abs(result) > threshold
    peaks = result[loud]
    overshoot = np.abs(peaks) - threshold
    result[loud] = np.sign(peaks) * (threshold + overshoot / ratio)
    return result
def smooth_audio(y, window_size=SMOOTH_WINDOW):
    """Smooth ``y`` with a centered moving average of ``window_size`` samples.

    The window is clamped to the length of ``y`` so the result always has
    as many samples as the input; ``np.convolve(..., mode='same')`` would
    otherwise return ``max(len(y), window_size)`` samples for short clips.

    NOTE(review): averaging the samples themselves is a low-pass filter
    whose first spectral null sits at ``sample_rate / window_size`` (about
    88 Hz for 500 samples at 44.1 kHz), so large windows remove most speech
    content rather than just evening out loudness. Confirm this is the
    intended effect.

    Args:
        y: One-dimensional signal.
        window_size: Length of the averaging window in samples (>= 1).

    Returns:
        The smoothed signal, same length as ``y``.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")
    y = np.asarray(y)
    if y.size == 0:
        # np.convolve rejects empty input; there is nothing to smooth anyway.
        return y.astype(float)
    span = min(int(window_size), y.size)
    return np.convolve(y, np.ones(span) / span, mode='same')
def process_audio(audio_np):
    """Clean up a raw recording and return it as a mono signal in [-1, 1].

    Steps, in order: mix down to mono, 80 Hz high-pass, noise reduction
    (using the first ``NOISE_PROFILE_SECONDS`` of the recording as the noise
    profile), optional peak normalization, compression, smoothing, gain and
    a final clip to [-1, 1].

    Args:
        audio_np: Recorded samples, either one-dimensional or shaped
            ``(frames, channels)``.

    Returns:
        One-dimensional array of processed samples.
    """
    audio_np = np.asarray(audio_np)
    if audio_np.ndim > 1:
        # Average the channels of each frame. flatten() would interleave
        # them into a single stream that is `channels` times too long.
        # With a single channel this yields the same samples as flatten().
        audio_np = audio_np.reshape(audio_np.shape[0], -1).mean(axis=1)

    audio_np = highpass_filter(audio_np, SAMPLE_RATE, cutoff=80)

    # Noise reduction using first second as noise profile
    try:
        # int() keeps the slice valid when NOISE_PROFILE_SECONDS is fractional.
        profile_len = int(SAMPLE_RATE * NOISE_PROFILE_SECONDS)
        noise_profile = audio_np[:profile_len]
        enhanced = nr.reduce_noise(y=audio_np, y_noise=noise_profile, sr=SAMPLE_RATE)
    except Exception as e:
        # Best effort: a failed denoise should not cost the user the take.
        print("Noise reduction failed:", e)
        enhanced = audio_np

    # Normalize
    if NORMALIZE:
        peak = np.max(np.abs(enhanced))
        if peak > 0:  # an all-zero signal cannot be scaled
            enhanced = enhanced / peak

    # Compression for stable voice
    enhanced = compress_audio(enhanced, threshold=COMP_THRESHOLD, ratio=COMP_RATIO)

    # Optional smoothing
    enhanced = smooth_audio(enhanced, window_size=SMOOTH_WINDOW)

    # Slight gain, then clip so the gain cannot push samples out of range
    enhanced *= GAIN
    enhanced = np.clip(enhanced, -1.0, 1.0)
    return enhanced
# ---------- STREAM CONTROL ----------
def start_stream(device=None):
    """Open and start the input stream, discarding any stale queued audio.

    Does nothing when a stream is already open. The module-level ``stream``
    and ``is_recording`` are only updated once the stream has actually
    started, so a failed start leaves the recorder idle and retryable.

    Args:
        device: Input device passed through to ``sd.InputStream``; ``None``
            lets sounddevice pick.

    Raises:
        Whatever ``sd.InputStream`` or its ``start()`` raises.
    """
    global stream, is_recording
    if stream is not None:
        return

    # Drop chunks left over from a previous take.
    audio_queue.queue.clear()

    new_stream = sd.InputStream(samplerate=SAMPLE_RATE, channels=CHANNELS,
                                callback=audio_callback, dtype='float32',
                                device=device)
    try:
        new_stream.start()
    except Exception:
        # Release the device and keep `stream` as None; otherwise the early
        # return above would block every later attempt to record.
        new_stream.close()
        raise

    stream = new_stream
    is_recording = True
    print("Recording started... press 's' to stop.", flush=True)
def stop_stream_and_process():
    """Stop recording, process the captured audio and save it as a WAV file.

    Does nothing when no stream is open. Otherwise the stream is stopped and
    closed, the queued chunks are joined into one array (cut off at
    ``MAX_SECONDS``), run through ``process_audio`` and written as 16-bit
    PCM to a timestamped file inside ``OUTPUT_FOLDER``.
    """
    global stream, is_recording
    if stream is None:
        return

    stream.stop()
    stream.close()
    stream, is_recording = None, False
    print("Recording stopped. Processing...", flush=True)

    # The stream is closed, so nothing is adding to the queue any more.
    chunks = [audio_queue.get() for _ in range(audio_queue.qsize())]
    if len(chunks) == 0:
        print("No audio captured.", flush=True)
        return

    # Slicing is a no-op for recordings within the cap.
    recording = np.concatenate(chunks, axis=0)[:MAX_SECONDS * SAMPLE_RATE]
    cleaned = process_audio(recording)

    # Save WAV file
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    outpath = os.path.join(OUTPUT_FOLDER, f"voice_{stamp}.wav")
    sf.write(outpath, cleaned, SAMPLE_RATE, subtype='PCM_16')
    print(f"Saved: {outpath}", flush=True)
# ---------- KEY HANDLER ----------
def on_press(key):
    """Dispatch a key press: 'r' records, 's' stops and saves, 'q' quits.

    Keys are matched case-insensitively. Keys without a usable ``char``
    attribute are ignored.

    Returns:
        ``False`` when 'q' was pressed, otherwise ``None``. This function is
        installed as the ``on_press`` callback of ``keyboard.Listener``.
    """
    try:
        pressed = key.char.lower()
    except AttributeError:
        # No .char attribute, or .char is None: not a key we handle.
        return

    if pressed == 'q':
        # Finish and save a take in progress before quitting.
        if is_recording:
            stop_stream_and_process()
        print("Quitting program.")
        return False

    if pressed == 'r':
        if not is_recording:
            start_stream()
    elif pressed == 's':
        if is_recording:
            stop_stream_and_process()
# ---------- MAIN ----------
def print_devices():
    """Print every device that has input channels, marking the default one.

    Each line shows the device's index in ``sd.query_devices()`` and its
    name; the entry whose index equals ``sd.default.device[0]`` gets a
    " (default)" suffix.
    """
    print("\nAvailable audio devices (input):")
    for index, info in enumerate(sd.query_devices()):
        if info['max_input_channels'] <= 0:
            continue  # no input channels
        suffix = " (default)" if index == sd.default.device[0] else ""
        print(f" [{index}] {info['name']}{suffix}")
    print()
if __name__ == "__main__":
    # Show the controls and the available inputs, then hand control to the
    # keyboard listener until it finishes.
    print("Press 'r' to start recording, 's' to stop & save, 'q' to quit.")
    print_devices()
    with keyboard.Listener(on_press=on_press) as key_listener:
        key_listener.join()