Skip to content

Commit

Permalink
feat(playback): use duration returned by annil (#50)
Browse files Browse the repository at this point in the history
* feat(playback): add AnniSource to pass extra data

* fix(playback): send request in new thread

Avoids blocking the caller.
  • Loading branch information
snylonue authored Aug 10, 2024
1 parent 024625e commit 616ccbc
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 33 deletions.
2 changes: 1 addition & 1 deletion anni-playback/src/controls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Controls {

pub fn open(
&self,
source: Box<dyn MediaSource>,
source: Box<dyn AnniSource>,
buffer_signal: Arc<AtomicBool>,
is_preload: bool,
) {
Expand Down
13 changes: 6 additions & 7 deletions anni-playback/src/decoder/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ use symphonia_core::{

use super::opus::OpusDecoder;
use crate::{
controls::*,
cpal_output::{CpalOutput, CpalOutputStream},
types::*,
controls::*, cpal_output::{CpalOutput, CpalOutputStream}, sources::AnniSource, types::*
};

enum PlaybackState {
Expand Down Expand Up @@ -352,10 +350,11 @@ impl Decoder {
/// Opens the given source for playback. Returns a `Playback`
/// for the source.
fn open(
source: Box<dyn MediaSource>,
source: Box<dyn AnniSource>,
buffer_signal: Arc<AtomicBool>,
) -> anyhow::Result<Playback> {
let mss = MediaSourceStream::new(source, Default::default());
let duration_hint = source.duration_hint();
let mss = MediaSourceStream::new(Box::new(source), Default::default());
let format_options = FormatOptions {
enable_gapless: true,
..Default::default()
Expand Down Expand Up @@ -392,7 +391,7 @@ impl Decoder {
let time = timebase.calc_time(ts);
time.seconds * 1000 + (time.frac * 1000.0) as u64
}
_ => 0,
_ => duration_hint.map(|dur| dur * 1000).unwrap_or(0),
};

Ok(Playback {
Expand All @@ -411,7 +410,7 @@ impl Decoder {
/// Returns a preloaded `Playback` and `CpalOutput` when complete.
fn preload(
&self,
source: Box<dyn MediaSource>,
source: Box<dyn AnniSource>,
buffer_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<Playback>> {
thread::spawn(move || {
Expand Down
81 changes: 58 additions & 23 deletions anni-playback/src/sources/cached_http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use provider::{AudioQuality, ProviderProxy};

use cache::CacheStore;

use super::AnniSource;

const BUF_SIZE: usize = 1024 * 64; // 64k

pub struct CachedHttpSource {
Expand All @@ -31,13 +33,14 @@ pub struct CachedHttpSource {
is_buffering: Arc<AtomicBool>,
#[allow(unused)]
buffer_signal: Arc<AtomicBool>,
duration: Option<u64>,
}

impl CachedHttpSource {
/// `cache_path` is the path to cache file.
pub fn new(
identifier: TrackIdentifier,
url: impl FnOnce() -> Option<Url>,
url: impl FnOnce() -> Option<(Url, Option<u64>)>,
cache_store: &CacheStore,
client: Client,
buffer_signal: Arc<AtomicBool>,
Expand All @@ -53,6 +56,7 @@ impl CachedHttpSource {
pos: Arc::new(AtomicUsize::new(0)),
is_buffering: Arc::new(AtomicBool::new(false)),
buffer_signal,
duration: None,
});
}
Err(cache) => cache,
Expand All @@ -62,40 +66,52 @@ impl CachedHttpSource {
let is_buffering = Arc::new(AtomicBool::new(true));
let pos = Arc::new(AtomicUsize::new(0));

thread::spawn({
let mut response = client.get(url().ok_or(anyhow!("no audio"))?).send()?;
let (url, duration) = url().ok_or(anyhow!("no audio"))?;

log::debug!("got duration {duration:?}");

thread::spawn({
let mut cache = cache.try_clone()?;
let buf_len = Arc::clone(&buf_len);
let pos = Arc::clone(&pos);
let mut buf = [0; BUF_SIZE];
let is_buffering = Arc::clone(&is_buffering);
let identifier = identifier.clone();

move || loop {
match response.read(&mut buf) {
Ok(0) => {
is_buffering.store(false, Ordering::Release);
log::info!("{identifier} reached eof");
break;
move || {
let mut response = match client.get(url).send() {
Ok(r) => r,
Err(e) => {
log::error!("failed to send request: {e}");
return;
}
Ok(n) => {
let pos = pos.load(Ordering::Acquire);
if let Err(e) = cache.write_all(&buf[..n]) {
log::error!("{e}")
};

loop {
match response.read(&mut buf) {
Ok(0) => {
is_buffering.store(false, Ordering::Release);
log::info!("{identifier} reached eof");
break;
}
Ok(n) => {
let pos = pos.load(Ordering::Acquire);
if let Err(e) = cache.write_all(&buf[..n]) {
log::error!("{e}")
}

log::trace!("wrote {n} bytes to {identifier}");
log::trace!("wrote {n} bytes to {identifier}");

let _ = cache.seek(std::io::SeekFrom::Start(pos as u64));
let _ = cache.flush();
let _ = cache.seek(std::io::SeekFrom::Start(pos as u64));
let _ = cache.flush();

buf_len.fetch_add(n, Ordering::AcqRel);
}
Err(e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => {
log::error!("{e}");
is_buffering.store(false, Ordering::Release);
buf_len.fetch_add(n, Ordering::AcqRel);
}
Err(e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => {
log::error!("{e}");
is_buffering.store(false, Ordering::Release);
}
}
}
}
Expand All @@ -108,6 +124,7 @@ impl CachedHttpSource {
pos,
is_buffering,
buffer_signal,
duration,
})
}
}
Expand Down Expand Up @@ -158,6 +175,12 @@ impl MediaSource for CachedHttpSource {
}
}

impl AnniSource for CachedHttpSource {
fn duration_hint(&self) -> Option<u64> {
self.duration
}
}

pub struct CachedAnnilSource(CachedHttpSource);

impl CachedAnnilSource {
Expand All @@ -179,7 +202,13 @@ impl CachedAnnilSource {
.inspect_err(|e| log::warn!("{e}"))
.ok()
})
.map(|r| r.url().clone());
.map(|response| {
let duration = response
.headers()
.get("X-Duration-Seconds")
.and_then(|v| v.to_str().ok().and_then(|dur| dur.parse().ok()));
(response.url().clone(), duration)
});

CachedHttpSource::new(track, || source.next(), cache_store, client, buffer_signal).map(Self)
}
Expand All @@ -206,3 +235,9 @@ impl MediaSource for CachedAnnilSource {
self.0.byte_len()
}
}

impl AnniSource for CachedAnnilSource {
fn duration_hint(&self) -> Option<u64> {
self.0.duration
}
}
31 changes: 31 additions & 0 deletions anni-playback/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU Lesser General Public License along with this program.
// If not, see <https://www.gnu.org/licenses/>.

use symphonia_core::io::MediaSource;

pub mod cached_http;
pub mod http;
pub mod streamable;
Expand All @@ -24,3 +26,32 @@ struct Receiver {
id: u128,
receiver: std::sync::mpsc::Receiver<(usize, Vec<u8>)>,
}

pub trait AnniSource: MediaSource {
/// The duration of underlying source in seconds.
fn duration_hint(&self) -> Option<u64> {
None
}
}

impl MediaSource for Box<dyn AnniSource> {
fn is_seekable(&self) -> bool {
self.as_ref().is_seekable()
}

fn byte_len(&self) -> Option<u64> {
self.as_ref().byte_len()
}
}

impl AnniSource for std::fs::File {}

// Specialization is not well-supported so far (even the unstable feature is unstable ww).
// Therefore, we do not provide the default implementation below.
// Users can use a newtype pattern if needed.
//
// default impl<T: MediaSource> AnniSource for T {
// fn duration_hint(&self) -> Option<u64> {
// None
// }
// }
6 changes: 4 additions & 2 deletions anni-playback/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::sync::{atomic::AtomicBool, Arc};
pub use crossbeam::channel::{Receiver, Sender};
pub use symphonia_core::io::MediaSource;

pub use crate::sources::AnniSource;

/// Provides the current progress of the player.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProgressState {
Expand All @@ -30,14 +32,14 @@ pub struct ProgressState {
}

pub(crate) enum InternalPlayerEvent {
Open(Box<dyn MediaSource>, Arc<AtomicBool>),
Open(Box<dyn AnniSource>, Arc<AtomicBool>),
Play,
Pause,
Stop,
/// Called by `cpal_output` in the event the device outputting
/// audio was changed/disconnected.
DeviceChanged,
Preload(Box<dyn MediaSource>, Arc<AtomicBool>),
Preload(Box<dyn AnniSource>, Arc<AtomicBool>),
PlayPreloaded,
}

Expand Down

0 comments on commit 616ccbc

Please sign in to comment.