Skip to content

Commit e0b9a72

Browse files
committed
improve robustness of recorder implementation
1 parent 8c617cc commit e0b9a72

File tree

2 files changed

+109
-81
lines changed

2 files changed

+109
-81
lines changed

backend/src/discord/recorder.rs

+105-80
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashMap;
1717
use std::collections::VecDeque;
1818
use std::env::var;
1919
use std::fmt;
20+
use std::num::Wrapping;
2021
use std::ops::Deref;
2122
use std::path::PathBuf;
2223
use std::process::Stdio;
@@ -25,6 +26,7 @@ use std::time::SystemTime;
2526
use tokio::fs;
2627
use tokio::io::AsyncWriteExt;
2728
use tokio::process::Command;
29+
use tokio::sync::MutexGuard;
2830
use tokio::time::sleep;
2931
use tokio::time::Duration;
3032
use tracing::Instrument;
@@ -78,6 +80,7 @@ struct VoiceRecording {
7880
struct UserData {
7981
user_id: UserId,
8082
last_voice_activity: SystemTime,
83+
last_rtp_timestamp: Wrapping<u32>,
8184
recordings: VecDeque<VoiceRecording>,
8285
}
8386

@@ -110,61 +113,27 @@ impl VoiceEventHandler for GuildRecorderArc {
110113
if let Some(user_id) = user_id {
111114
let is_new_user;
112115
{
113-
let mut users = self.users.write().await;
116+
let users = self.users.read().await;
114117
is_new_user = !users.contains_key(ssrc);
118+
}
119+
debug!(?is_new_user);
115120

116-
if is_new_user {
121+
if is_new_user {
122+
{
123+
let mut users = self.users.write().await;
117124
users.insert(
118125
*ssrc,
119126
Arc::new(Mutex::new(UserData {
120127
user_id: *user_id,
121128
last_voice_activity: SystemTime::now(),
129+
last_rtp_timestamp: Wrapping(0),
122130
recordings: Default::default(),
123131
})),
124132
);
125133
}
126-
}
127-
debug!(?is_new_user);
128134

129-
// When this user was newly added and did not exist before, we spawn a garbage collector
130-
if is_new_user {
131-
let users_lock = self.users.clone();
132-
let ssrc = *ssrc;
133-
let span = span!(Level::INFO, "user_gc", ssrc, ?user_id);
134-
tokio::spawn(
135-
async move {
136-
// Delete the user if no voice activity in the last hour
137-
loop {
138-
trace!("Sleeping");
139-
sleep(Duration::from_secs(60 * 60 * 2)).await;
140-
141-
let delete;
142-
{
143-
let users = users_lock.read().await;
144-
if let Some(user_mutex) = users.get(&ssrc) {
145-
let user = user_mutex.lock().await;
146-
delete = SystemTime::now()
147-
.duration_since(user.last_voice_activity)
148-
.unwrap()
149-
> Duration::from_secs(60 * 60);
150-
} else {
151-
debug!("User already deleted");
152-
break;
153-
}
154-
}
155-
156-
debug!(delete, "Checking timeout of user data");
157-
158-
if delete {
159-
let mut users = users_lock.write().await;
160-
users.remove(&ssrc);
161-
debug!("{} remaining known users", users.len());
162-
break;
163-
}
164-
}
165-
}
166-
.instrument(span),
167-
);
135+
// When this user was newly added and did not exist before, we spawn a garbage collector
136+
self.spawn_user_gc(*ssrc);
168137
}
169138
}
170139
}
@@ -175,38 +144,16 @@ impl VoiceEventHandler for GuildRecorderArc {
175144
if *speaking { "started" } else { "stopped" },
176145
);
177146

178-
let user_lock;
179-
{
180-
let users = self.users.read().await;
181-
user_lock = users.get(ssrc).cloned()?;
182-
}
147+
// Once a user stops speaking, we cleanup their recordings
148+
if !*speaking {
149+
let user_lock;
150+
{
151+
let users = self.users.read().await;
152+
user_lock = users.get(ssrc).cloned()?;
153+
}
183154

184-
if *speaking {
185-
// Create a new recording if the user starts speaking. We assume that all audio packets we receive between
186-
// a user starting and stopping to speak are adjunct.
187155
let mut user = user_lock.lock().await;
188-
user.recordings.push_back(VoiceRecording {
189-
timestamp: SystemTime::now(),
190-
data: Vec::new(),
191-
});
192-
} else {
193-
// Start a timeout for the recording. This starts when the user stops speaking.
194-
let span = span!(Level::INFO, "recording_gc", ssrc);
195-
tokio::spawn(
196-
async move {
197-
sleep(Duration::from_secs(*RECORDING_LENGTH)).await;
198-
199-
let mut user = user_lock.lock().await;
200-
user.recordings
201-
.pop_front()
202-
.expect("Missing element in Deque");
203-
debug!(
204-
remaining_recordings = user.recordings.len(),
205-
"Removed timed out recording"
206-
);
207-
}
208-
.instrument(span),
209-
);
156+
self.cleanup_user_recordings(&mut user);
210157
}
211158
}
212159
Ctx::VoicePacket(VoiceData { audio, packet, .. }) => {
@@ -218,7 +165,7 @@ impl VoiceEventHandler for GuildRecorderArc {
218165
"Audio packet sequence {:05} has {:04} bytes (decompressed from {})",
219166
packet.sequence.0,
220167
audio.len() * std::mem::size_of::<i16>(),
221-
packet.payload.len()
168+
packet.payload.len(),
222169
);
223170

224171
let user_lock;
@@ -227,12 +174,25 @@ impl VoiceEventHandler for GuildRecorderArc {
227174
user_lock = users.get(&packet.ssrc).cloned()?;
228175
}
229176

230-
// Append the audio to the latest recording
231177
let mut user = user_lock.lock().await;
232-
user.last_voice_activity = SystemTime::now();
233-
if let Some(recording) = user.recordings.back_mut() {
234-
recording.data.extend(audio);
178+
179+
if usize::try_from((packet.timestamp.0 - user.last_rtp_timestamp).0).ok()?
180+
* std::mem::size_of::<i16>()
181+
== audio.len()
182+
{
183+
// If this recording is the continuation of the previous one, we simply append
184+
if let Some(recording) = user.recordings.back_mut() {
185+
recording.data.extend(audio);
186+
}
187+
} else {
188+
// If it is a new recording, we create a new entry
189+
user.recordings.push_back(VoiceRecording {
190+
timestamp: SystemTime::now(),
191+
data: audio.clone(),
192+
});
235193
}
194+
user.last_voice_activity = SystemTime::now();
195+
user.last_rtp_timestamp = packet.timestamp.0;
236196
} else {
237197
error!("RTP packet, but no audio. Driver may not be configured to decode.");
238198
}
@@ -255,6 +215,68 @@ impl GuildRecorderArc {
255215
}))
256216
}
257217

218+
/// Spawns a garbage collector thread for the given user. The thread periodically checks whether the user is
219+
/// inactive and removes them if so.
220+
#[instrument(skip(self))]
221+
fn spawn_user_gc(&self, ssrc: u32) {
222+
let users_lock = self.users.clone();
223+
let span = span!(Level::INFO, "user_gc", ssrc);
224+
tokio::spawn(
225+
async move {
226+
// Delete the user if no voice activity in the last hour
227+
loop {
228+
trace!("Sleeping");
229+
sleep(Duration::from_secs(60 * 60 * 2)).await;
230+
231+
let delete;
232+
{
233+
let users = users_lock.read().await;
234+
if let Some(user_mutex) = users.get(&ssrc) {
235+
let user = user_mutex.lock().await;
236+
delete = SystemTime::now()
237+
.duration_since(user.last_voice_activity)
238+
.unwrap()
239+
> Duration::from_secs(60 * 60);
240+
} else {
241+
debug!("User already deleted");
242+
break;
243+
}
244+
}
245+
246+
debug!(delete, "Checking timeout of user data");
247+
248+
if delete {
249+
let mut users = users_lock.write().await;
250+
users.remove(&ssrc);
251+
debug!("{} remaining known users", users.len());
252+
break;
253+
}
254+
}
255+
}
256+
.instrument(span),
257+
);
258+
}
259+
260+
/// Cleans up timed out recordings for a user
261+
#[instrument(skip(self, user))]
262+
fn cleanup_user_recordings(&self, user: &mut MutexGuard<UserData>) -> Option<()> {
263+
while let Some(true) = user.recordings.front().map(|front| {
264+
SystemTime::now().duration_since(front.timestamp).unwrap()
265+
> Duration::from_secs(60) + samples_to_duration(front.data.len())
266+
}) {
267+
user.recordings
268+
.pop_front()
269+
.expect("Missing element in Deque");
270+
}
271+
272+
debug!(
273+
remaining_recordings = user.recordings.len(),
274+
"Removed timed out recordings"
275+
);
276+
277+
Some(())
278+
}
279+
258280
/// Saves the recording to disk
259281
#[instrument(skip(self, cache_and_http), err)]
260282
pub async fn save_recording(&self, cache_and_http: &CacheHttp) -> Result<(), RecordingError> {
@@ -263,7 +285,10 @@ impl GuildRecorderArc {
263285
let users = self.users.read().await;
264286

265287
for user in users.values() {
266-
let user = user.lock().await;
288+
let mut user = user.lock().await;
289+
290+
// Make sure we only consider recordings that are within scope
291+
self.cleanup_user_recordings(&mut user);
267292

268293
if !user.recordings.is_empty() {
269294
recordings.insert(user.user_id, user.recordings.clone());
@@ -375,7 +400,7 @@ impl GuildRecorderArc {
375400
];
376401
let mut child = Command::new("ffmpeg")
377402
.kill_on_drop(true)
378-
.args(&args)
403+
.args(args)
379404
.arg(file.as_os_str())
380405
.stdin(Stdio::piped())
381406
.stdout(Stdio::null())

frontend/angular.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@
119119
}
120120
},
121121
"cli": {
122-
"schematicCollections": ["@angular-eslint/schematics"]
122+
"schematicCollections": [
123+
"@angular-eslint/schematics"
124+
],
125+
"analytics": false
123126
}
124127
}

0 commit comments

Comments
 (0)