feat: 🔖 Deploy v1.0

Commented out rotary dialer for now
This commit is contained in:
Nick Pourazima 2022-09-13 22:54:17 -04:00
parent ad784673f7
commit b6202bbaad
2 changed files with 112 additions and 54 deletions

9
config.yaml Normal file
View File

@ -0,0 +1,9 @@
hook_gpio: 22
rotary_gpio: 3
rotary_hold_time: 0.25
rotary_hold_repeat: true
playback_reduction: 16
beep_reduction: 24
sample_rate: 44100
buffer_size: 4096
channels: 2

View File

@ -1,32 +1,39 @@
#! /usr/bin/env python3
import os
import yaml
import pyaudio
import time
import wave
from datetime import datetime
from gpiozero import Button
from multiprocessing import Process
from signal import pause
from pydub.scipy_effects import band_pass_filter
from pydub.effects import normalize, compress_dynamic_range, low_pass_filter, high_pass_filter
from pydub.effects import normalize
from pydub import AudioSegment
from pydub.playback import play
hook = Button(17)
rotaryDial = Button(18, hold_time=0.25, hold_repeat=True)
count = 0
dialed = []
reset_flag = False
with open("config.yaml") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
hook = Button(config["hook_gpio"])
# rotaryDial = Button(pin=config['rotary_gpio'], hold_time=config['rotary_hold_time'], hold_repeat=config['rotary_hold_repeat'])
"""
TODO: These globals are a temp solution for the rotary dialer, would love to not
depend on globals for this logic.
"""
# count = 0
# dialed = []
# reset_flag = False
# TODO: rotary encoder: special key codes trigger certain voicemails
class AudioInterface:
def __init__(self) -> None:
self.audio = pyaudio.PyAudio() # create pyaudio instantiation
self.samp_rate = 44100 # 44.1kHz sampling rate
self.chunk = 4096 # 2^12 samples for buffer
self.chans = 1 # 1 channel
self.audio = pyaudio.PyAudio()
self.samp_rate = config["sample_rate"]
self.chunk = config["buffer_size"]
self.chans = config["channels"]
self.format = pyaudio.paInt16 # 16-bit resolution
self.frames = [] # raw data frames recorded from mic
@ -43,8 +50,9 @@ class AudioInterface:
)
# loop through stream and append audio chunks to frame array
try:
# TODO: either pass hook as object into class, or figure out another cleaner solution
while hook.is_pressed:
data = self.stream.read(self.chunk, exception_on_overflow = True) # Set to False when live
data = self.stream.read(self.chunk, exception_on_overflow=True)
self.frames.append(data)
except KeyboardInterrupt:
print("Done recording")
@ -65,11 +73,12 @@ class AudioInterface:
self.stream.write(data)
data = self.wf.readframes(self.chunk)
def stop(self):
# stop the stream, close it, and terminate the pyaudio instantiation
# stop the stream
self.stream.stop_stream()
# close it
self.stream.close()
# terminate the pyaudio instantiation
self.audio.terminate()
def close(self, outputFile):
@ -80,83 +89,123 @@ class AudioInterface:
wavefile.setframerate(self.samp_rate)
wavefile.writeframes(b"".join(self.frames))
def postProcess(self, outputFile):
source = AudioSegment.from_file(outputFile + ".wav", format="wav")
"""
TODO: Evaluate whether this is worthwhile...
"""
source = AudioSegment.from_wav(outputFile + ".wav")
print("Filtering...")
filtered = band_pass_filter(source, 200, 15000)
filtered = band_pass_filter(source, 300, 10000)
print("Normalizing...")
normalized = normalize(filtered)
# print("Compress Dynamic Range")
# compressed = compress_dynamic_range(normalized)
print("Exporting normalized")
normalized.export(outputFile + "normalized.mp3", format="mp3")
normalized.export(outputFile + "normalized.wav", format="wav")
# print("Exporting compressed")
# compressed.export(outputFile + "compressed.mp3", format="mp3")
print("Finished...")
# Effects.apply_gain_stereo(audio_segment, 3, 3)
def offHook():
audioInterface = AudioInterface()
print("Phone off hook, ready to begin!")
# wait a second for user to place phone to ear
time.sleep(1)
# if dialed and dialed[0] == 0:
audioInterface = AudioInterface()
# playback voice message through speaker
print("Playing voicemail message...")
# audioInterface.play(os.getcwd() + "/sounds/voicemail.wav")
play(AudioSegment.from_wav(os.getcwd() + "/sounds/voicemail.wav"))
play(
AudioSegment.from_wav(
os.path.dirname(os.path.abspath("rotaryGuestBook.py"))
+ "/sounds/voicemail.wav"
)
- config["playback_reduction"]
)
# start recording beep
print("Playing beep...")
# audioInterface.play(os.getcwd() + "/sounds/beep.wav")
play(AudioSegment.from_file(os.getcwd() + "/sounds/beep.wav", format="wav") - 16)
play(
AudioSegment.from_wav(
os.path.dirname(os.path.abspath("rotaryGuestBook.py")) + "/sounds/beep.wav"
)
- config["beep_reduction"]
)
# now, while phone is not off the hook, record audio from the microphone
print("recording")
audioInterface.record()
audioInterface.stop()
outputFile = os.getcwd() + "/recordings/" + f"{datetime.now().isoformat()}"
outputFile = (
os.path.dirname(os.path.abspath("rotaryGuestBook.py"))
+ "/recordings/"
+ f"{datetime.now().isoformat()}"
)
audioInterface.close(outputFile + ".wav")
print("finished recording")
print("spawn postProcessing thread")
Process(target=audioInterface.postProcess, args=(outputFile,)).start()
"""
post processing
"""
# print("spawn postProcessing thread")
# Process(target=audioInterface.postProcess, args=(outputFil e,)).start()
"""
rotary dialer special messages
"""
# if dialed[0:3] == [9,2,7]:
# # play special vm
# play(AudioSegment.from_wav(os.path.dirname(os.path.abspath("rotaryGuestBook.py")) + "/sounds/927.wav") - config['playback_reduction'])
# elif dialed[0:4] == [5,4,5,3]:
# # play special vm
# play(AudioSegment.from_wav(os.path.dirname(os.path.abspath("rotaryGuestBook.py")) + "/sounds/beep.wav") - config['beep_reduction'])
def onHook():
print("Phone on hook. Sleeping...")
# print("Resetting dial list")
# global dialed
# dialed = []
# reset_pulse_counter()
def dialing():
global count, reset_flag
count+=1
print(f"dialing, increment count: {count}")
reset_flag = False
# def dialing():
# if hook.is_pressed:
# global count, reset_flag
# count+=1
# print(f"dialing, increment count: {count}")
# reset_flag = False
def reset():
global count, reset_flag
count = 0
toggle = False
print(f"reset count: {count}")
reset_flag = True
# def reset_pulse_counter():
# global count, reset_flag
# count = 0
# print(f"reset count: {count}")
# reset_flag = True
def held():
if(not reset_flag):
print("holding")
print(count)
global dialed
if (count == 10):
dialed.append(0)
else:
dialed.append(count)
print(f"number dialed: {dialed}")
reset()
# def held():
# if not reset_flag:
# print("holding")
# print(count)
# global dialed
# if (count == 10):
# dialed.append(0)
# else:
# dialed.append(count)
# print(f"number dialed: {dialed}")
# offHook()
# reset_pulse_counter()
def main():
rotaryDial.when_pressed = dialing
rotaryDial.when_held = held
# rotaryDial.when_pressed = dialing
# rotaryDial.when_held = held
hook.when_pressed = offHook
hook.when_released = onHook
pause()