"""Script to predict deepsalience output from audio"""
from __future__ import print_function
import argparse
import librosa
import numpy as np
import os
import scipy
import csv
from keras.models import Model
from keras.layers import Dense, Input, Reshape, Lambda
from keras.layers.convolutional import Conv2D
from keras.layers.normalization import BatchNormalization
from keras import backend as K
from keras.models import load_model
TASKS = ['bass', 'melody1', 'melody2', 'melody3', 'multif0', 'pitch', 'vocal']
BINS_PER_OCTAVE = 60
N_OCTAVES = 6
HARMONICS = [0.5, 1, 2, 3, 4, 5]
#HARMONICS = [1, 2]
#SR = 22050
SR = 16000
FMIN = 24.5
#FMIN = 196.0
#HOP_LENGTH = 256
HOP_LENGTH = 160


def compute_hcqt(y, fs):
    """Compute the harmonic CQT of the input audio

    Parameters
    ----------
    y : np.ndarray
        Audio signal
    fs : float
        Sample rate

    Returns
    -------
    hcqt : np.ndarray
        Harmonic CQT, shape (n_harmonics, n_freqs, n_times)
    freq_grid : np.ndarray
        Frequency values in Hz
    time_grid : np.ndarray
        Time stamps in seconds
    """
    cqt_list = []
    shapes = []
    for h in HARMONICS:
        cqt = librosa.cqt(
            y, sr=fs, hop_length=HOP_LENGTH, fmin=FMIN * float(h),
            n_bins=BINS_PER_OCTAVE * N_OCTAVES,
            bins_per_octave=BINS_PER_OCTAVE
        )
        cqt_list.append(cqt)
        shapes.append(cqt.shape)

    # The per-harmonic CQTs can differ in length by a frame or two;
    # trim them all to the shortest so they stack into one array.
    shapes_equal = [s == shapes[0] for s in shapes]
    if not all(shapes_equal):
        min_time = np.min([s[1] for s in shapes])
        cqt_list = [c[:, :min_time] for c in cqt_list]

    # Log-compress the magnitudes into the range [0, 1]
    log_hcqt = ((1.0 / 80.0) * librosa.core.amplitude_to_db(
        np.abs(np.array(cqt_list)), ref=np.max)) + 1.0

    freq_grid = librosa.cqt_frequencies(
        BINS_PER_OCTAVE * N_OCTAVES, FMIN, bins_per_octave=BINS_PER_OCTAVE
    )
    time_grid = librosa.core.frames_to_time(
        range(log_hcqt.shape[2]), sr=fs, hop_length=HOP_LENGTH
    )
    return log_hcqt, freq_grid, time_grid
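
# Illustrative usage sketch (not part of the original script; "example.wav"
# is a hypothetical path): compute the HCQT for audio loaded at the script's
# sample rate. The result has shape
# (len(HARMONICS), BINS_PER_OCTAVE * N_OCTAVES, n_frames).
#
#   y, fs = librosa.load("example.wav", sr=SR)
#   hcqt, freq_grid, time_grid = compute_hcqt(y, fs)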


def bkld(y_true, y_pred):
    """KL divergence where both y_true and y_pred are probabilities
    """
    y_true = K.clip(y_true, K.epsilon(), 1.0 - K.epsilon())
    y_pred = K.clip(y_pred, K.epsilon(), 1.0 - K.epsilon())
    return K.mean(K.mean(
        -1.0 * y_true * K.log(y_pred) - (1.0 - y_true) * K.log(1.0 - y_pred),
        axis=-1), axis=-1)


def model_def():
    """Create a compiled Keras model

    Returns
    -------
    model : Model
        Compiled Keras model
    """
    input_shape = (None, None, 6)
    inputs = Input(shape=input_shape)

    y0 = BatchNormalization()(inputs)
    y1 = Conv2D(128, (5, 5), padding='same', activation='relu', name='bendy1')(y0)
    y1a = BatchNormalization()(y1)
    y2 = Conv2D(64, (5, 5), padding='same', activation='relu', name='bendy2')(y1a)
    y2a = BatchNormalization()(y2)
    y3 = Conv2D(64, (3, 3), padding='same', activation='relu', name='smoothy1')(y2a)
    y3a = BatchNormalization()(y3)
    y4 = Conv2D(64, (3, 3), padding='same', activation='relu', name='smoothy2')(y3a)
    y4a = BatchNormalization()(y4)
    y5 = Conv2D(8, (70, 3), padding='same', activation='relu', name='distribute')(y4a)
    y5a = BatchNormalization()(y5)
    y6 = Conv2D(1, (1, 1), padding='same', activation='sigmoid', name='squishy')(y5a)

    # Drop the trailing channel axis: (batch, freq, time, 1) -> (batch, freq, time)
    predictions = Lambda(lambda x: K.squeeze(x, axis=3))(y6)

    model = Model(inputs=inputs, outputs=predictions)
    model.compile(loss=bkld, metrics=['mse'], optimizer='adam')
    return model
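
# Illustrative sketch: the architecture can be inspected without pretrained
# weights. With inputs shaped (batch, freq, time, harmonics), the final
# Lambda squeezes the channel axis, so predictions come out (batch, freq, time).
#
#   model = model_def()
#   model.summary()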


def load_model(task):
    """Load a precompiled, pretrained model

    Parameters
    ----------
    task : str
        One of 'bass', 'melody1', 'melody2', 'melody3', 'multif0',
        'pitch', 'vocal'

    Returns
    -------
    model : Model
        Pretrained, precompiled Keras model
    """
    if task not in TASKS:
        raise ValueError("task must be one of {}".format(TASKS))

    weights_path = os.path.join('weights', '{}.h5'.format(task))
    if not os.path.exists(weights_path):
        raise IOError(
            "Cannot find weights path {} for this task.".format(weights_path))

    model = model_def()
    model.load_weights(weights_path)
    return model


def get_single_test_prediction(model, input_hcqt):
    """Generate output from a model given an input HCQT array

    Parameters
    ----------
    model : Model
        Pretrained model
    input_hcqt : np.ndarray
        HCQT, shape (n_harmonics, n_freqs, n_times)

    Returns
    -------
    predicted_output : np.ndarray
        Matrix of predictions
    """
    # Reorder to (batch, freq, time, harmonics) as the model expects
    input_hcqt = input_hcqt.transpose(1, 2, 0)[np.newaxis, :, :, :]

    # Predict in chunks of 2000 frames to keep memory use bounded
    n_t = input_hcqt.shape[2]
    n_slices = 2000
    t_slices = list(np.arange(0, n_t, n_slices))
    output_list = []
    for i, t in enumerate(t_slices):
        print("   > {} / {}".format(i + 1, len(t_slices)))
        prediction = model.predict(input_hcqt[:, :, t:t + n_slices, :])
        output_list.append(prediction[0, :, :])

    predicted_output = np.hstack(output_list)
    return predicted_output


def get_multif0(pitch_activation_mat, freq_grid, time_grid, thresh=0.3):
    """Compute multif0 output containing all peaks in the output that
    fall above thresh

    Parameters
    ----------
    pitch_activation_mat : np.ndarray
        Deep salience prediction
    freq_grid : np.ndarray
        Frequency values
    time_grid : np.ndarray
        Time values
    thresh : float, default=0.3
        Likelihood threshold

    Returns
    -------
    times : np.ndarray
        Time values
    freqs : list
        List of lists of frequency values
    """
    # Keep only local maxima along the frequency axis
    peak_thresh_mat = np.zeros(pitch_activation_mat.shape)
    peaks = scipy.signal.argrelmax(pitch_activation_mat, axis=0)
    peak_thresh_mat[peaks] = pitch_activation_mat[peaks]

    # Collect the peaks above the likelihood threshold, frame by frame
    idx = np.where(peak_thresh_mat >= thresh)
    est_freqs = [[] for _ in range(len(time_grid))]
    for f, t in zip(idx[0], idx[1]):
        est_freqs[t].append(freq_grid[f])

    est_freqs = [np.array(lst) for lst in est_freqs]
    return time_grid, est_freqs
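
# Worked toy example (illustrative values): argrelmax marks row 1 as a local
# maximum along the frequency axis in both columns below, but only the first
# column's peak (0.5) clears thresh=0.3, so only frame 0 reports a frequency.
#
#   toy = np.array([[0.1, 0.1], [0.5, 0.25], [0.2, 0.2]])  # (freq, time)
#   times, freqs = get_multif0(toy, np.array([100., 200., 300.]),
#                              np.array([0.0, 0.01]), thresh=0.3)
#   # freqs == [array([200.]), array([])]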


def get_singlef0(pitch_activation_mat, freq_grid, time_grid, thresh=0.3,
                 use_neg=True):
    """Compute single-f0 output containing the maximum likelihood per time frame.
    Frames with no likelihood above the threshold are given negative values.

    Parameters
    ----------
    pitch_activation_mat : np.ndarray
        Deep salience prediction
    freq_grid : np.ndarray
        Frequency values
    time_grid : np.ndarray
        Time values
    thresh : float, default=0.3
        Likelihood threshold
    use_neg : bool
        If True, frames with no likelihood above the threshold are reported
        as the negative of the frequency with the largest likelihood.
        If False, those frames are given the value 0.0.

    Returns
    -------
    times : np.ndarray
        Time values
    freqs : np.ndarray
        Frequency values
    """
    max_idx = np.argmax(pitch_activation_mat, axis=0)
    est_freqs = []
    for i, f in enumerate(max_idx):
        if pitch_activation_mat[f, i] < thresh:
            # Unvoiced frame: report the negated best guess, or 0.0
            if use_neg:
                est_freqs.append(-1.0 * freq_grid[f])
            else:
                est_freqs.append(0.0)
        else:
            est_freqs.append(freq_grid[f])
    est_freqs = np.array(est_freqs)
    return time_grid, est_freqs


def save_multif0_output(times, freqs, output_path):
    """Save multif0 output to a csv file

    Parameters
    ----------
    times : np.ndarray
        array of time values
    freqs : list of lists
        list of lists of frequency values
    output_path : str
        path to save output
    """
    with open(output_path, 'w') as fhandle:
        csv_writer = csv.writer(fhandle, delimiter='\t')
        for t, f in zip(times, freqs):
            row = [t]
            row.extend(f)
            csv_writer.writerow(row)


def save_singlef0_output(times, freqs, output_path):
    """Save singlef0 output to a csv file

    Parameters
    ----------
    times : np.ndarray
        array of time values
    freqs : np.ndarray
        array of frequency values
    output_path : str
        path to save output
    """
    with open(output_path, 'w') as fhandle:
        csv_writer = csv.writer(fhandle, delimiter='\t')
        for t, f in zip(times, freqs):
            csv_writer.writerow([t, f])


def compute_output(hcqt, time_grid, freq_grid, task, output_format, threshold,
                   use_neg, save_dir, save_name):
    """Compute output for a given task

    Parameters
    ----------
    hcqt : np.ndarray
        Harmonic CQT
    time_grid : np.ndarray
        array of times
    freq_grid : np.ndarray
        array of frequencies
    task : str
        which task to compute
    output_format : str
        specify whether to save output as singlef0, multif0 or salience
    threshold : float
        amplitude threshold for multif0 and singlef0 output
    use_neg : bool
        whether to report negative frequency values in singlef0 output
    save_dir : str
        Path to folder to save output
    save_name : str
        Output file basename
    """
    model = load_model(task)

    print("Computing salience...")
    pitch_activation_mat = get_single_test_prediction(model, hcqt)

    print("Saving output...")
    if output_format == 'singlef0':
        times, freqs = get_singlef0(
            pitch_activation_mat, freq_grid, time_grid, thresh=threshold,
            use_neg=use_neg
        )
        save_path = os.path.join(
            save_dir, "{}_{}_singlef0.csv".format(save_name, task))
        save_singlef0_output(times, freqs, save_path)
    elif output_format == 'multif0':
        times, freqs = get_multif0(
            pitch_activation_mat, freq_grid, time_grid, thresh=threshold)
        save_path = os.path.join(
            save_dir, "{}_{}_multif0.csv".format(save_name, task))
        save_multif0_output(times, freqs, save_path)
    else:
        save_path = os.path.join(
            save_dir, "{}_{}_salience.npz".format(save_name, task))
        np.savez(save_path, salience=pitch_activation_mat, times=time_grid,
                 freqs=freq_grid)
    print("Done!")


def main(args):
    if args.task not in ['all'] + TASKS:
        raise ValueError("task must be 'all' or one of {}".format(TASKS))

    save_name = os.path.splitext(os.path.basename(args.audio_fpath))[0]

    # This is slow for long audio files.
    # compute_hcqt takes an audio array and sample rate, so load the file first.
    print("Computing HCQT...")
    y, fs = librosa.load(args.audio_fpath, sr=SR)
    hcqt, freq_grid, time_grid = compute_hcqt(y, fs)

    if args.task == 'all':
        for task in TASKS:
            print("[Computing {} output]".format(task))
            compute_output(
                hcqt, time_grid, freq_grid, task, args.output_format,
                args.threshold, args.use_neg, args.save_dir, save_name)
    else:
        compute_output(
            hcqt, time_grid, freq_grid, args.task, args.output_format,
            args.threshold, args.use_neg, args.save_dir, save_name)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Predict deep salience output for a given task")
    parser.add_argument("audio_fpath",
                        type=str,
                        help="Path to input audio file.")
    parser.add_argument("task",
                        type=str,
                        help="Task to compute, one of "
                             "all, bass, melody1, melody2, melody3, "
                             "multif0, pitch, vocal.")
    parser.add_argument("save_dir",
                        type=str,
                        help="Path to folder for saving output")
    parser.add_argument("-f", "--output_format",
                        type=str,
                        choices=['singlef0', 'multif0', 'salience'],
                        default='salience',
                        help="Which format to save output. "
                             "singlef0 saves a csv of single f0 values. "
                             "multif0 saves a csv of multif0 values. "
                             "salience (default) saves a npz file of the "
                             "salience matrix.")
    parser.add_argument("-t", "--threshold",
                        type=float,
                        default=0.3,
                        help="Amplitude threshold. Only used when "
                             "output_format is singlef0 or multif0")
    # Note: argparse's type=bool treats any non-empty string (including
    # "False") as True, so parse the flag's string value explicitly.
    parser.add_argument("-n", "--use_neg",
                        type=lambda v: str(v).lower() in ('true', '1', 'yes'),
                        default=True,
                        help="If True, report unvoiced frames with negative values. "
                             "This is only used when output_format is singlef0.")
    main(parser.parse_args())
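
# Example invocation (illustrative paths):
#   python predict_on_audio.py some_song.wav melody2 ./outputs -f singlef0 -t 0.3
# This would write ./outputs/some_song_melody2_singlef0.csv with one
# tab-separated (time, frequency) row per frame; negative frequencies mark
# unvoiced frames under the default --use_neg behavior.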