-
Notifications
You must be signed in to change notification settings - Fork 0
/
riffpatch.py
executable file
·190 lines (148 loc) · 4.34 KB
/
riffpatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/usr/bin/env python2.7
from __future__ import division
import os
import sys
import json
import re
import wave
from riffcheck import *
dopatch = True
rounded_digits = 3
sampling_rate = 48000
def iround(x):
return int(round(x))
def parse_point(definition, cues):
m = re.match(r'#?([+-]?\d+)$', definition)
if m:
index = int(m.group(1))
if index < 0:
return cues[index]
elif index >= 1:
return cues[index-1]
else:
assert False
m = re.match(r'(\d+)[:h](\d+)[:m](\d+(?:\.\d+)?)s?$', definition)
if m:
h,m,s = m.groups()
h = int(h)
m = int(m)
s = float(s)
if cues:
start = time.localtime(min(cues))
cue = time.mktime((
start.tm_year,
start.tm_mon,
start.tm_mday,
h, m, 0,
0, 0, start.tm_isdst
)) + s
else:
cue = int(h) * 3600 + int(m) * 60 + float(s)
return cue
raise ValueError("can't parse this", definition)
def packchunk(type, *datalist):
datastr = ''.join(
data if isinstance(data, str) else ''.join(data)
for data in datalist
)
datalen = len(datastr)
npadding = (-datalen) % 2
return struct.pack("4sI", type, datalen) + datastr + ("\x00" * npadding)
def packlabl(id, text):
return packchunk('labl',
struct.pack("I", id) + text + "\x00"
)
audiofile = sys.argv[1]
markerfile = sys.argv[2]
audiopoint = sys.argv[3]
markerpoint = sys.argv[4]
# get audio cues
sampling_rate = wave.open(audiofile).getframerate()
print "sampling rate:", sampling_rate
assert os.path.isfile(audiofile)
fb = FileBuffer(audiofile, 'r+b' if dopatch else 'rb')
fp = fb.fp # needed for patching
tree = parse_riff_file(fb)
labelchunks = tree.getchunks(('LIST', 'adtl'), ('list', 'adtl'))
# TODO: explain
assert all( not tree.query(lambda ch: ch.start > labelchunk.start) for labelchunk in labelchunks )
cuechunks = tree.getchunks("cue ")
if not cuechunks:
audiocues = []
audiolabels = []
else:
assert len(cuechunks) == 1
audiocues = cuechunks[0].content
audiocues = [c[4].value for i,c in audiocues.items()]
audiolabels = sorted([(l[1]['name'], l[1]['data']) for l in labelchunks[0].content if l[0] == 'labl'])
audiolabels = zip(*audiolabels)[1]
# get marker cues
assert os.path.isfile(markerfile)
markers = map(json.loads, open(markerfile))
markercues = [m['timestamp'] for m in markers]
markerlabels = [m['type'].encode('utf8') for m in markers]
# parse sync points
audiopoint = parse_point(audiopoint, audiocues)
markerpoint = parse_point(markerpoint, markercues)
offset = audiopoint - markerpoint
# shift marker cues
markercues = [round(c + offset, rounded_digits) for c in markercues]
# audiocues (+labels), markercues + offset (+markerlabels)
mergedcues = dict((c,"[%s] %s:" % (i+1, repr(t))) for i,(c,t) in enumerate(zip(markercues, markerlabels)))
for c,l in zip(audiocues, audiolabels):
if c not in mergedcues:
mergedcues[c] = l
else:
mergedcues[c] += " / " + l
if audiopoint in mergedcues:
mergedcues[audiopoint] += " / sync"
else:
mergedcues[audiopoint] = "sync"
print
for k,v in sorted(mergedcues.items()):
print "%8.3f s -> %s" % (k, v)
print
# patch into audio file
icues = [iround(c * sampling_rate) for c,l in sorted(mergedcues.items())]
ilabels = [l for c,l in sorted(mergedcues.items())]
cuechunk = packchunk("cue ",
struct.pack("I", len(icues)),
(
struct.pack("II4sIII", i+1, v, 'data', 0, 0, v)
for i,v in enumerate(icues)
)
)
adtlchunk = packchunk("LIST", "adtl",
[
packlabl(i+1, l)
for i,l in enumerate(ilabels)
]
)
# JUNK-out old chunks, append new, update RIFF chunk size
if dopatch:
raw_input("hit enter to continue")
# just overwrite outdated chunks
tokill = tree.getchunks('cue ', ('LIST', 'adtl'), ('list', 'adtl'))
for chunk in tokill:
fp.seek(chunk.start)
fp.write(packchunk("JUNK", (chunk.length - 8) * '\x00'))
fp.seek(4)
(oldlen,) = struct.unpack("I", fp.read(4))
fp.seek(0, os.SEEK_END)
assert oldlen == fp.tell() - 8
fp.write(cuechunk)
fp.write(adtlchunk)
newlen = fp.tell()
fp.seek(4)
fp.write(struct.pack("I", newlen - 8))
# TODO
# * open as r+ for patching
# * chunk += "modified" flag, set/reset
# * riff chunk un/serializable (write back if modified)
# * UI
# * show markers in file
# * imported markers
# * synchronize (give equal times -> offset)
# * merge markers, |ta-tb| < dt = 0.01s ?
# * re-layouting for chunks that grew or shrank: move chunk if needed (read+write), insert JUNK
# * fixed and moving chunks -> if it's rewritten anyway, move freely