rotary-phone-audio-guestbook/audioInterface.py
Nick Pourazima 30bb0d91f7
feat: add installer script prompts (#15)
add time expired sound
update README
2023-10-09 23:27:53 -04:00

135 lines
4.1 KiB
Python

#! /usr/bin/env python3
import logging
import time
import wave
from typing import List
import pyaudio
from pydub import AudioSegment
from pydub.effects import compress_dynamic_range, normalize
from pydub.scipy_effects import band_pass_filter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AudioInterface:
    """Record from and play audio through PyAudio, gated by a hook switch.

    An instance owns at most one PyAudio handle (``self.audio``) and one open
    stream (``self.stream``) at a time.  Recorded chunks are collected in
    ``self.frames`` and written to disk by :meth:`close`.
    """

    def __init__(
        self,
        hook,
        buffer_size,
        channels,
        format,
        sample_rate,
        recording_limit,
        dev_index,
        hook_type,
    ) -> None:
        """Store the configuration; no audio device is opened here.

        Args:
            hook: Object exposing an ``is_pressed`` attribute
                (presumably a GPIO button -- confirm against the caller).
            buffer_size: Number of frames read or written per chunk.
            channels: Channel count used for recording and the output WAV.
            format: PyAudio sample-format constant used for recording.
            sample_rate: Recording sample rate in Hz.
            recording_limit: Maximum recording duration in seconds.
            dev_index: Input device index passed to PyAudio when recording.
            hook_type: ``"NC"`` inverts the hook reading; any other value is
                treated as ``"NO"``.
        """
        self.chunk = buffer_size
        self.chans = channels
        self.format = format
        self.frames: List[bytes] = []
        self.hook = hook
        self.samp_rate = sample_rate
        self.recording_limit = recording_limit
        self.dev_index = dev_index
        self.hook_type = hook_type
        self.audio = None
        self.stream = None

    def _close_stream(self):
        """Stop and close the current stream, if any, and forget it."""
        if self.stream is None:
            return
        # Drop the reference first so a failure below cannot leave a
        # half-closed stream behind for the next caller to trip over.
        stream, self.stream = self.stream, None
        try:
            stream.stop_stream()
        finally:
            stream.close()

    def init_audio(self):
        """Ensure a PyAudio handle exists and no stream is left open."""
        if self.audio is None:
            self.audio = pyaudio.PyAudio()
        self._close_stream()

    def record(self):
        """Record from the input device while the handset is off the hook.

        Each call starts a fresh recording: previously captured frames are
        discarded.  Recording ends when the hook condition becomes false, when
        ``recording_limit`` seconds have elapsed (after which
        ``time_exceeded.wav`` is played), on ``KeyboardInterrupt``, or when
        reading from the stream raises (the error is logged, not re-raised).
        """
        self.init_audio()
        # Start clean so a reused instance does not prepend an old recording.
        self.frames = []
        self.stream = self.audio.open(
            format=self.format,
            rate=self.samp_rate,
            channels=self.chans,
            input_device_index=self.dev_index,
            input=True,
            frames_per_buffer=self.chunk,
        )
        # loop through stream and append audio chunks to frame array
        try:
            # Monotonic clock: immune to wall-clock jumps (e.g. NTP sync).
            start = time.monotonic()
            while self.off_hook_condition():
                if time.monotonic() - start >= self.recording_limit:
                    # Notify the user that their recording time is up
                    self.play("time_exceeded.wav")
                    break
                data = self.stream.read(self.chunk, exception_on_overflow=True)
                self.frames.append(data)
        except KeyboardInterrupt:
            logger.info("Done recording")
        except Exception as e:
            logger.error(str(e))

    def off_hook_condition(self):
        """Return True while the handset counts as off the hook.

        For ``hook_type == "NC"`` that is when the hook is *not* pressed; for
        every other hook type it is when the hook *is* pressed.
        """
        if self.hook_type == "NC":
            return not self.hook.is_pressed
        # Assuming default is "NO" if not "NC"
        return self.hook.is_pressed

    def play(self, file):
        """Play the WAV file ``file`` to completion on an output stream.

        Any previously open stream is closed first.  The output stream is left
        open afterwards; it is closed by the next :meth:`init_audio` or
        :meth:`stop`.
        """
        self.init_audio()
        with wave.open(file, "rb") as wf:
            self.stream = self.audio.open(
                format=self.audio.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
            )
            data = wf.readframes(self.chunk)
            while data:
                self.stream.write(data)
                data = wf.readframes(self.chunk)

    def stop(self):
        """Close the open stream and terminate PyAudio; safe to call twice.

        Both handles are reset to ``None`` so that a later :meth:`play` or
        :meth:`record` creates fresh ones instead of touching dead objects.
        """
        try:
            self._close_stream()
        finally:
            if self.audio is not None:
                audio, self.audio = self.audio, None
                audio.terminate()

    def close(self, output_file):
        """Write the recorded frames to ``output_file`` as a WAV file.

        ``OSError`` raised while writing is logged rather than propagated.
        """
        # Module-level lookup: works whether or not a PyAudio handle exists
        # (e.g. after stop()).  Done before opening the file so an invalid
        # format does not leave an empty file behind.
        sample_width = pyaudio.get_sample_size(self.format)
        try:
            with wave.open(output_file, "wb") as wavefile:
                wavefile.setnchannels(self.chans)
                wavefile.setsampwidth(sample_width)
                wavefile.setframerate(self.samp_rate)
                wavefile.writeframes(b"".join(self.frames))
        except OSError as e:
            logger.error(f"Error writing to file {output_file}. Error: {e}")

    def post_process(self, output_file):
        """Filter, normalize and compress a recording.

        Args:
            output_file: Path of the recording *without* extension.  Reads
                ``<output_file>.wav`` and writes ``<output_file>normalized.wav``
                and ``<output_file>compressed.mp3``.
        """
        source = AudioSegment.from_wav(output_file + ".wav")
        filtered = self.filter_audio(source)
        normalized = self.normalize_audio(filtered)
        compressed = self.compress_audio(normalized)
        normalized.export(output_file + "normalized.wav", format="wav")
        compressed.export(output_file + "compressed.mp3", format="mp3")

    def filter_audio(self, audio):
        """Return ``audio`` band-pass filtered between 300 Hz and 10 kHz."""
        logger.info("Filtering...")
        return band_pass_filter(audio, 300, 10000)

    def normalize_audio(self, audio):
        """Return ``audio`` normalized with pydub's ``normalize``."""
        logger.info("Normalizing...")
        return normalize(audio)

    def compress_audio(self, audio):
        """Return ``audio`` after pydub's ``compress_dynamic_range``."""
        logger.info("Compress Dynamic Range")
        return compress_dynamic_range(audio)