-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathrecorder.py
110 lines (90 loc) · 3.75 KB
/
recorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This file is part of Scott W Harden's "Realtime FFT Audio Visualization with Python"
http://www.swharden.com/blog/2013-05-09-realtime-fft-audio-visualization-with-python/
Visualizations using matplot/pylab have been commented as we use gifs instead.
"""
#import matplotlib
#matplotlib.use('TkAgg') # <-- THIS MAKES IT FAST!
import numpy
import scipy
import struct
import pyaudio
import threading
#import pylab
import struct
class SwhRecorder:
"""Simple, cross-platform class to record from the microphone."""
def __init__(self):
"""minimal garb is executed when class is loaded."""
self.RATE=48100
self.BUFFERSIZE=2**12 #1024 is a good buffer size
self.secToRecord=.1
self.threadsDieNow=False
self.newAudio=False
def setup(self):
"""initialize sound card."""
#TODO - windows detection vs. alsa or something for linux
#TODO - try/except for sound card selection/initiation
self.buffersToRecord=int(self.RATE*self.secToRecord/self.BUFFERSIZE)
if self.buffersToRecord==0: self.buffersToRecord=1
self.samplesToRecord=int(self.BUFFERSIZE*self.buffersToRecord)
self.chunksToRecord=int(self.samplesToRecord/self.BUFFERSIZE)
self.secPerPoint=1.0/self.RATE
self.p = pyaudio.PyAudio()
self.inStream = self.p.open(format=pyaudio.paInt16,channels=1,rate=self.RATE,input=True,frames_per_buffer=self.BUFFERSIZE)
self.xsBuffer=numpy.arange(self.BUFFERSIZE)*self.secPerPoint
self.xs=numpy.arange(self.chunksToRecord*self.BUFFERSIZE)*self.secPerPoint
self.audio=numpy.empty((self.chunksToRecord*self.BUFFERSIZE),dtype=numpy.int16)
def close(self):
"""cleanly back out and release sound card."""
self.p.close(self.inStream)
### RECORDING AUDIO ###
def getAudio(self):
"""get a single buffer size worth of audio."""
audioString=self.inStream.read(self.BUFFERSIZE)
return numpy.fromstring(audioString,dtype=numpy.int16)
def record(self,forever=True):
"""record secToRecord seconds of audio."""
while True:
if self.threadsDieNow: break
for i in range(self.chunksToRecord):
self.audio[i*self.BUFFERSIZE:(i+1)*self.BUFFERSIZE]=self.getAudio()
self.newAudio=True
if forever==False: break
def continuousStart(self):
"""CALL THIS to start running forever."""
self.t = threading.Thread(target=self.record)
self.t.start()
def continuousEnd(self):
"""shut down continuous recording."""
self.threadsDieNow=True
### MATH ###
def downsample(self,data,mult):
"""Given 1D data, return the binned average."""
overhang=len(data)%mult
if overhang: data=data[:-overhang]
data=numpy.reshape(data,(len(data)/mult,mult))
data=numpy.average(data,1)
return data
def fft(self,data=None,trimBy=10,logScale=False,divBy=100):
if data==None:
data=self.audio.flatten()
left,right=numpy.split(numpy.abs(numpy.fft.fft(data)),2)
ys=numpy.add(left,right[::-1])
if logScale:
ys=numpy.multiply(20,numpy.log10(ys))
xs=numpy.arange(self.BUFFERSIZE/2,dtype=float)
if trimBy:
i=int((self.BUFFERSIZE/2)/trimBy)
ys=ys[:i]
xs=xs[:i]*self.RATE/self.BUFFERSIZE
if divBy:
ys=ys/float(divBy)
return xs,ys
### VISUALIZATION ###
#def plotAudio(self):
# """open a matplotlib popup window showing audio data."""
# pylab.plot(self.audio.flatten())
# pylab.show()