feature: make uploading to s3 operate on multiple threads
cdxker committed Dec 16, 2024
1 parent 6c34cdc commit a34d7e0
Showing 2 changed files with 38 additions and 34 deletions.
server/src/data/models.rs (11 changes: 5 additions, 6 deletions)
@@ -2077,9 +2077,8 @@ pub enum EventType {
     },
     #[display(fmt = "pagefind_indexing_started")]
     PagefindIndexingStarted,
-    #[display(fmt = "pagefind_indexing_progress")]
-    PagefindIndexingProgress {
-        files_indexed: usize,
+    #[display(fmt = "pagefind_indexing_finished")]
+    PagefindIndexingFinished {
         total_files: usize,
     },
 }
@@ -2104,7 +2103,7 @@ impl EventType {
             EventTypeRequest::CsvJsonlProcessingCheckpoint,
             EventTypeRequest::CsvJsonlProcessingCompleted,
             EventTypeRequest::PagefindIndexingStarted,
-            EventTypeRequest::PagefindIndexingProgress,
+            EventTypeRequest::PagefindIndexingFinished,
         ]
     }
 }
@@ -6614,8 +6613,8 @@ pub enum EventTypeRequest {
     CsvJsonlProcessingCompleted,
     #[display(fmt = "pagefind_indexing_started")]
     PagefindIndexingStarted,
-    #[display(fmt = "pagefind_indexing_progress")]
-    PagefindIndexingProgress
+    #[display(fmt = "pagefind_indexing_finished")]
+    PagefindIndexingFinished
 }

 #[derive(Debug, Clone, Deserialize, Serialize)]

server/src/operators/pagefind_operator.rs (61 changes: 33 additions, 28 deletions)
@@ -1,11 +1,10 @@
 use std::path::PathBuf;

 use actix_web::web;
-use base64::{engine::general_purpose, Engine};
 use pagefind::{api::PagefindIndex, options::PagefindServiceConfig};

 use crate::{
-    data::models::{DatasetConfiguration, EventType, Pool, QdrantChunkMetadata, WorkerEvent},
+    data::models::{self, DatasetConfiguration, Pool, QdrantChunkMetadata, WorkerEvent},
     errors::ServiceError,
     operators::{clickhouse_operator::ClickHouseEvent, file_operator::get_pagefind_aws_bucket},
 };

@@ -76,36 +75,42 @@ pub async fn build_index_for_dataset_id(
     let total_files = files.len();
     log::info!("Uploading {:?} pagefind indexed files to S3", total_files);

-    for (i, file) in files.iter().enumerate() {
-        let bucket = get_pagefind_aws_bucket()?;
-
-        // WARNING This s3 bucket cannot be default public. put ACL's on this somehow in case
-        // the user does not want their data to be public.
+    let futures = files.into_iter().enumerate().map(|(i, file)| -> tokio::task::JoinHandle<Result<(), ServiceError>> {
         let mut filename = PathBuf::from("/pagefind");
         filename.push(dataset_id.to_string());
         filename.push(file.filename.clone());

-        bucket
-            .put_object(filename.to_string_lossy().to_string(), &file.contents.clone())
-            .await
-            .map_err(|e| {
-                ServiceError::BadRequest(format!("Could not upload file to S3 {:?}", e))
-            })?;
-
-        event_queue
-            .send(ClickHouseEvent::WorkerEvent(
-                WorkerEvent::from_details(
-                    dataset_id,
-                    EventType::PagefindIndexingProgress {
-                        files_indexed: i + 1,
-                        total_files,
-                    },
-                )
-                .into(),
-            ))
-            .await;
-        log::info!("Uploaded file to s3 {:?}/{:?}", i, total_files);
-    }
+        // WARNING This s3 bucket cannot be default public. put ACL's on this somehow in case
+        // the user does not want their data to be public.
+        tokio::task::spawn(async move {
+            let bucket = get_pagefind_aws_bucket()?;
+            bucket
+                .put_object(filename.to_string_lossy().to_string(), &file.contents.clone())
+                .await
+                .map_err(|e| {
+                    ServiceError::BadRequest(format!("Could not upload file to S3 {:?}", e))
+                })?;
+
+            log::info!("Uploaded file {:?} to S3", i);
+            Ok(())
+        })
+    });
+
+    futures::future::join_all(futures).await;
+
+    event_queue
+        .send(ClickHouseEvent::WorkerEvent(
+            WorkerEvent::from_details(
+                dataset_id,
+                models::EventType::PagefindIndexingFinished {
+                    total_files,
+                },
+            )
+            .into(),
+        ))
+        .await;

     Ok(())
 }
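
Note on the pattern this commit lands on: spawning one tokio task per file and awaiting the handles with futures::future::join_all runs the uploads concurrently across the runtime's worker threads. Below is a minimal, self-contained sketch of the same shape, assuming the tokio and futures crates and a hypothetical upload_one helper standing in for the real put_object call; it also shows the Result layers that the committed code drops.

    use tokio::task::JoinHandle;

    // Hypothetical stand-in for the rust-s3 `bucket.put_object` call in the diff.
    async fn upload_one(name: String, bytes: Vec<u8>) -> Result<(), String> {
        // Simulate I/O latency; a real implementation would talk to S3 here.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        println!("uploaded {} ({} bytes)", name, bytes.len());
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        let files = vec![
            ("a.html".to_string(), vec![0u8; 10]),
            ("b.html".to_string(), vec![0u8; 20]),
        ];

        // One spawned task per file, so uploads proceed concurrently on the
        // runtime's worker threads, like the `map` + `tokio::task::spawn` above.
        let handles: Vec<JoinHandle<Result<(), String>>> = files
            .into_iter()
            .map(|(name, bytes)| tokio::task::spawn(upload_one(name, bytes)))
            .collect();

        // Each join_all element is a Result<Result<(), String>, JoinError>:
        // the outer layer reports task panics, the inner one upload errors.
        // The committed code ignores these values, so failures pass silently.
        for res in futures::future::join_all(handles).await {
            match res {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("upload failed: {e}"),
                Err(e) => eprintln!("task panicked: {e}"),
            }
        }
    }

Because the diff discards the join_all results, a failed upload only surfaces as a missing log line. For large indexes, a bounded alternative such as futures::StreamExt::buffer_unordered(n) would also cap how many uploads are in flight at once instead of spawning all of them immediately.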
