refactor: artifact download
PhotonQuantum committed Aug 31, 2022
1 parent 0e13f8f commit c1e3bde
Showing 1 changed file with 158 additions and 80 deletions.
238 changes: 158 additions & 80 deletions src/artifacts.rs
@@ -1,5 +1,6 @@
 //! Artifact download implementation.
 
+use std::borrow::Cow;
 use std::collections::HashSet;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
@@ -15,11 +16,10 @@ use reqwest::{Client, Response};
 use tap::Pipe;
 use tokio::fs::{self, File, OpenOptions};
 use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
-use tokio::sync::mpsc::{unbounded_channel, Receiver};
-use tokio::sync::Mutex;
-use tokio::sync::Semaphore;
+use tokio::sync::mpsc::{unbounded_channel, Receiver, UnboundedSender};
+use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
 use tokio_util::codec;
-use tracing::{debug, info, warn};
+use tracing::{debug, info, instrument, warn};
 use url::Url;
 
 use crate::common::{Config, Metrics, Task};
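
For readers who haven't used it, `Semaphore::acquire_owned` is the piece that makes the new two-stage design work: the permit is tied to the `Arc<Semaphore>` itself rather than to a borrow of the semaphore, so it can move into a spawned task and release itself on drop. A minimal sketch of the pattern (assuming only the tokio crate with the `rt-multi-thread` and `macros` features; the worker loop is illustrative, not from this commit):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most two workers in flight at once.
    let sem = Arc::new(Semaphore::new(2));

    let mut handles = Vec::new();
    for i in 0..5 {
        // acquire_owned() ties the permit to the Arc itself rather than to a
        // borrow, so the permit can move into the spawned task.
        let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when the task finishes
            println!("worker {i} running");
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}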
@@ -141,11 +141,152 @@ async fn into_memory_stream(
     Ok(futures::stream::iter(vec![Ok(Bytes::from(result))]))
 }
 
+/// Download context for a single artifact.
+struct DownloadCtx<'a> {
+    task: Task,
+    config: Cow<'a, Arc<Config>>,
+    metrics: Cow<'a, Arc<Metrics>>,
+    client: Cow<'a, Client>,
+    processing_task: Cow<'a, Arc<Mutex<HashSet<Url>>>>,
+    fail_tx: Cow<'a, UnboundedSender<Task>>,
+    extra: DownloadStageExtra<'a>,
+}
+
+/// Extra context for different stages of the download process.
+enum DownloadStageExtra<'a> {
+    Pre {
+        sem: &'a Arc<Semaphore>,
+    },
+    On {
+        _permit: OwnedSemaphorePermit,
+        task_hash: Url,
+    },
+}
+
+impl<'a> DownloadCtx<'a> {
+    /// Do the preparation work before the actual download.
+    ///
+    /// This method checks whether there's an ongoing download for the same artifact,
+    /// limits the number of concurrent downloads, and skips the download if the retry
+    /// limit is reached.
+    ///
+    /// After the preparation work is done, you must call `spawn()` immediately to start
+    /// the download, or further download tasks will be blocked.
+    ///
+    /// Returns `None` if the download should be skipped.
+    #[instrument(skip(self), fields(storage = self.task.storage, origin = self.task.origin, path = self.task.path))]
+    pub async fn prepare(self) -> Option<DownloadCtx<'static>> {
+        self.metrics.task_in_queue.dec();
+
+        // We need to ensure that the total count of pending tasks doesn't exceed the set
+        // limit. The income `rx` is already bounded by `max_pending_task`, so it's the
+        // retried tasks that are the problem. If a task is retried while the pending
+        // queue is full, this will randomly drop either a retried task or an incoming
+        // task.
+
+        // TODO I don't think the current double-queue design is good. We need to
+        // prioritize income over retries, i.e. let incoming tasks overtake retries
+        // (see the sketch after the diff).
+        if self.metrics.task_in_queue.get() > self.config.max_pending_task as i64 {
+            return None;
+        }
+
+        if self.task.retry_limit == 0 {
+            // The task has been retried too many times. Skip it.
+            return None;
+        }
+
+        let task_hash = self.task.upstream_url();
+
+        {
+            // Deduplicate tasks.
+            let mut processing_task = self.processing_task.lock().await;
+            if processing_task.contains(&task_hash) {
+                info!("already processing, continue to next task");
+                return None;
+            }
+            processing_task.insert(task_hash.clone());
+        }
+
+        match self.extra {
+            DownloadStageExtra::Pre { sem } => {
+                // Wait for a concurrency permit.
+                let permit = Arc::clone(sem).acquire_owned().await.unwrap();
+
+                // Promote all borrowed shared state to owned so the context
+                // becomes 'static and can move into a spawned task.
+                let client = Cow::Owned(self.client.into_owned());
+                let processing_task = Cow::Owned(self.processing_task.into_owned());
+                let metrics = Cow::Owned(self.metrics.into_owned());
+                let fail_tx = Cow::Owned(self.fail_tx.into_owned());
+                let config = Cow::Owned(self.config.into_owned());
+
+                Some(DownloadCtx {
+                    task: self.task,
+                    config,
+                    metrics,
+                    client,
+                    processing_task,
+                    fail_tx,
+                    extra: DownloadStageExtra::On {
+                        _permit: permit,
+                        task_hash,
+                    },
+                })
+            }
+            DownloadStageExtra::On { .. } => unreachable!(),
+        }
+    }
+}
+
+impl DownloadCtx<'static> {
+    /// Spawn the download task.
+    pub fn spawn(self) {
+        tokio::spawn(self.download());
+    }
+
+    /// The actual download future.
+    #[instrument(skip(self), fields(storage = self.task.storage, origin = self.task.origin, path = self.task.path))]
+    async fn download(self) {
+        info!("start download");
+        self.metrics.download_counter.inc();
+
+        self.metrics.task_download.inc();
+
+        let mut task_new = self.task.clone();
+
+        info!("begin stream");
+        let config = self.config.into_owned();
+        let task_fut = cache_task(self.task, self.client.into_owned(), &config);
+        let task_fut = tokio::time::timeout(
+            std::time::Duration::from_secs(config.download_timeout),
+            task_fut,
+        );
+        if let Err(err) = task_fut.await.unwrap_or(Err(Error::Timeout(()))) {
+            warn!("{:?}, ttl={}", err, task_new.retry_limit);
+            task_new.retry_limit -= 1;
+            self.metrics.failed_download_counter.inc();
+
+            // HTTP errors and oversized artifacts are permanent failures: they
+            // are not sent back to the retry queue.
+            if !matches!(err, Error::HTTPError(_)) && !matches!(err, Error::TooLarge(_)) {
+                self.fail_tx.send(task_new).unwrap();
+                self.metrics.task_in_queue.inc();
+            }
+        }
+
+        {
+            let mut processing_task = self.processing_task.lock().await;
+            match self.extra {
+                DownloadStageExtra::On { task_hash, .. } => {
+                    processing_task.remove(&task_hash);
+                }
+                DownloadStageExtra::Pre { .. } => unreachable!(),
+            }
+        }
+
+        self.metrics.task_download.dec();
+    }
+}
+
 /// Cache a task.
 ///
 /// This function does the actual caching part.
 /// It's called by the download driver, which handles concurrency control and retries.
-async fn process_task(task: Task, client: Client, config: &Config) -> Result<()> {
+async fn cache_task(task: Task, client: Client, config: &Config) -> Result<()> {
     if client
         .head(task.cached_url(config))
         .send()
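
One detail worth calling out in `download()` above: `tokio::time::timeout` wraps the future's own `Result` in a second `Result`, and the code flattens the two layers with `unwrap_or(Err(Error::Timeout(())))`. A standalone sketch of the same pattern (the `Error` enum here is a hypothetical stand-in for the crate's error type, which carries a unit payload in its `Timeout` variant):

use std::time::Duration;

// Hypothetical stand-in for the crate's error enum.
#[derive(Debug)]
enum Error {
    Timeout,
}

async fn work() -> Result<&'static str, Error> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok("done")
}

#[tokio::main]
async fn main() {
    // timeout() yields Result<Result<_, Error>, Elapsed>; unwrap_or collapses
    // the elapsed case into our own Timeout variant.
    let res = tokio::time::timeout(Duration::from_secs(1), work())
        .await
        .unwrap_or(Err(Error::Timeout));
    println!("{res:?}");
}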
@@ -265,82 +406,19 @@ pub async fn download_artifacts(
         // Apply override rules on the task.
         task.apply_override(&config.endpoints.overrides);
 
-        metrics.task_in_queue.dec();
-
-        // We need to ensure that the total count of pending tasks doesn't exceed the set
-        // limit. The income `rx` is already bounded by `max_pending_task`, so it's the
-        // retried tasks that are the problem. If a task is retried while the pending
-        // queue is full, this will randomly drop either a retried task or an incoming
-        // task.
-
-        // TODO I don't think the current double-queue design is good. We need to
-        // prioritize income over retries, i.e. let incoming tasks overtake retries.
-        if metrics.task_in_queue.get() > config.max_pending_task as i64 {
-            continue;
+        let ctx = DownloadCtx {
+            task,
+            config: Cow::Borrowed(&config),
+            metrics: Cow::Borrowed(&metrics),
+            client: Cow::Borrowed(&client),
+            processing_task: Cow::Borrowed(&processing_task),
+            fail_tx: Cow::Borrowed(&fail_tx),
+            extra: DownloadStageExtra::Pre { sem: &sem },
+        };
+
+        if let Some(ctx) = ctx.prepare().await {
+            ctx.spawn();
         }
-
-        if task.retry_limit == 0 {
-            // The task has been retried too many times. Skip it.
-            continue;
-        }
-
-        let task_hash = task.upstream_url();
-
-        {
-            // Deduplicate tasks.
-            let mut processing_task = processing_task.lock().await;
-            if processing_task.contains(&task_hash) {
-                info!("already processing, continue to next task");
-                continue;
-            }
-            processing_task.insert(task_hash.clone());
-        }
-
-        info!("start download");
-        metrics.download_counter.inc();
-
-        // Wait for a concurrency permit.
-        let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
-
-        let client = client.clone();
-        let processing_task = processing_task.clone();
-        let metrics = metrics.clone();
-        let fail_tx = fail_tx.clone();
-        let config = config.clone();
-
-        metrics.task_download.inc();
-        // Spawn the actual download task.
-        tokio::spawn(async move {
-            let _permit = permit;
-            let mut task_new = task.clone();
-
-            info!("begin stream");
-            let task_fut = process_task(task, client, &config);
-            let task_fut = tokio::time::timeout(
-                std::time::Duration::from_secs(config.download_timeout),
-                task_fut,
-            );
-            if let Err(err) = task_fut.await.unwrap_or(Err(Error::Timeout(()))) {
-                warn!("{:?}, ttl={}", err, task_new.retry_limit);
-                task_new.retry_limit -= 1;
-                metrics.failed_download_counter.inc();
-
-                {
-                    let mut processing_task = processing_task.lock().await;
-                    processing_task.remove(&task_hash);
-                }
-
-                if !matches!(err, Error::HTTPError(_)) && !matches!(err, Error::TooLarge(_)) {
-                    fail_tx.send(task_new).unwrap();
-                    metrics.task_in_queue.inc();
-                }
-            } else {
-                let mut processing_task = processing_task.lock().await;
-                processing_task.remove(&task_hash);
-            }
-
-            metrics.task_download.dec();
-        });
     }
 
     info!("artifact download stop");
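The heart of the refactor is the `Cow<'a, Arc<…>>` trick in `DownloadCtx`: while a task may still be rejected (queue full, retry limit reached, duplicate), the context only borrows the shared handles, and the `Arc`s are cloned exactly once, at the moment `prepare()` promotes the context to `'static` for `tokio::spawn`. A minimal standalone sketch of that promotion (the `Shared` type and field names here are hypothetical, not from this crate):

use std::borrow::Cow;
use std::sync::Arc;

// Hypothetical stand-in for shared state such as Config or Metrics.
#[derive(Debug)]
struct Shared {
    value: u32,
}

struct Ctx<'a> {
    shared: Cow<'a, Arc<Shared>>,
}

impl<'a> Ctx<'a> {
    // Clone the Arc only when the context is actually promoted; contexts
    // rejected before this point never touch the refcount.
    fn promote(self) -> Ctx<'static> {
        Ctx {
            shared: Cow::Owned(self.shared.into_owned()),
        }
    }
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Shared { value: 42 });
    // Cheap to build and to drop: just a borrow, no Arc clone.
    let ctx = Ctx { shared: Cow::Borrowed(&shared) };

    let owned = ctx.promote();
    tokio::spawn(async move {
        // The 'static context moves freely into the task.
        println!("shared value = {}", owned.shared.value);
    })
    .await
    .unwrap();
}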

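Finally, on the TODO in `prepare()` about prioritizing incoming tasks over retries: one possible shape for that is a biased `tokio::select!` over the two queues, sketched below. The channel layout and the `Task = u32` stand-in are hypothetical; this is not code from the commit.

use tokio::sync::mpsc;

// Hypothetical task type standing in for the crate's Task.
type Task = u32;

// Poll the income queue first and fall back to retries only when no fresh
// task is ready, so incoming tasks overtake retries.
async fn next_task(
    income: &mut mpsc::Receiver<Task>,
    retries: &mut mpsc::UnboundedReceiver<Task>,
) -> Option<Task> {
    tokio::select! {
        biased; // check branches top to bottom instead of randomly

        Some(task) = income.recv() => Some(task),
        Some(task) = retries.recv() => Some(task),
        else => None,
    }
}

#[tokio::main]
async fn main() {
    let (in_tx, mut income) = mpsc::channel::<Task>(16);
    let (re_tx, mut retries) = mpsc::unbounded_channel::<Task>();

    re_tx.send(1).unwrap();
    in_tx.send(2).await.unwrap();

    // The incoming task (2) wins even though the retry (1) arrived first.
    println!("{:?}", next_task(&mut income, &mut retries).await);
}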