From a34d7e001c38f6f971ec17cf8c70d07367a27440 Mon Sep 17 00:00:00 2001
From: cdxker
Date: Mon, 16 Dec 2024 12:28:07 -0800
Subject: [PATCH] feature: make uploading to s3 operate on multiple threads

---
 server/src/data/models.rs                 | 11 ++--
 server/src/operators/pagefind_operator.rs | 61 ++++++++++++-----------
 2 files changed, 38 insertions(+), 34 deletions(-)

diff --git a/server/src/data/models.rs b/server/src/data/models.rs
index 955991b2b9..bd650d7ea4 100644
--- a/server/src/data/models.rs
+++ b/server/src/data/models.rs
@@ -2077,9 +2077,8 @@ pub enum EventType {
     },
     #[display(fmt = "pagefind_indexing_started")]
     PagefindIndexingStarted,
-    #[display(fmt = "pagefind_indexing_progress")]
-    PagefindIndexingProgress {
-        files_indexed: usize,
+    #[display(fmt = "pagefind_indexing_finished")]
+    PagefindIndexingFinished {
         total_files: usize,
     },
 }
@@ -2104,7 +2103,7 @@ impl EventType {
             EventTypeRequest::CsvJsonlProcessingCheckpoint,
             EventTypeRequest::CsvJsonlProcessingCompleted,
             EventTypeRequest::PagefindIndexingStarted,
-            EventTypeRequest::PagefindIndexingProgress,
+            EventTypeRequest::PagefindIndexingFinished,
         ]
     }
 }
@@ -6614,8 +6613,8 @@ pub enum EventTypeRequest {
     CsvJsonlProcessingCompleted,
     #[display(fmt = "pagefind_indexing_started")]
     PagefindIndexingStarted,
-    #[display(fmt = "pagefind_indexing_progress")]
-    PagefindIndexingProgress
+    #[display(fmt = "pagefind_indexing_finished")]
+    PagefindIndexingFinished
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
diff --git a/server/src/operators/pagefind_operator.rs b/server/src/operators/pagefind_operator.rs
index 4cdf94103d..2dcbae3e33 100644
--- a/server/src/operators/pagefind_operator.rs
+++ b/server/src/operators/pagefind_operator.rs
@@ -1,11 +1,10 @@
 use std::path::PathBuf;
 
 use actix_web::web;
-use base64::{engine::general_purpose, Engine};
 use pagefind::{api::PagefindIndex, options::PagefindServiceConfig};
 
 use crate::{
-    data::models::{DatasetConfiguration, EventType, Pool, QdrantChunkMetadata, WorkerEvent},
+    data::models::{self, DatasetConfiguration, Pool, QdrantChunkMetadata, WorkerEvent},
     errors::ServiceError,
     operators::{clickhouse_operator::ClickHouseEvent, file_operator::get_pagefind_aws_bucket},
 };
@@ -76,36 +75,42 @@ pub async fn build_index_for_dataset_id(
     let total_files = files.len();
     log::info!("Uploading {:?} pagefind indexed files to S3", total_files);
 
-    for (i, file) in files.iter().enumerate() {
-        let bucket = get_pagefind_aws_bucket()?;
-
-        // WARNING This s3 bucket cannot be default public. put ACL's on this somehow in case
-        // the user does not want their data to be public.
+    let futures = files.into_iter().enumerate().map(|(i, file)| -> tokio::task::JoinHandle<Result<(), ServiceError>> {
         let mut filename = PathBuf::from("/pagefind");
         filename.push(dataset_id.to_string());
         filename.push(file.filename.clone());
-        bucket
-            .put_object(filename.to_string_lossy().to_string(), &file.contents.clone())
-            .await
-            .map_err(|e| {
-                ServiceError::BadRequest(format!("Could not upload file to S3 {:?}", e))
-            })?;
-
-        event_queue
-            .send(ClickHouseEvent::WorkerEvent(
-                WorkerEvent::from_details(
-                    dataset_id,
-                    EventType::PagefindIndexingProgress {
-                        files_indexed: i + 1,
-                        total_files,
-                    },
-                )
-                .into(),
-            ))
-            .await;
-        log::info!("Uploaded file to s3 {:?}/{:?}", i, total_files);
-    }
+
+        // WARNING This s3 bucket cannot be default public. put ACL's on this somehow in case
+        // the user does not want their data to be public.
+        tokio::task::spawn(async move {
+            let bucket = get_pagefind_aws_bucket()?;
+            bucket
+                .put_object(filename.to_string_lossy().to_string(), &file.contents.clone())
+                .await
+                .map_err(|e| {
+                    ServiceError::BadRequest(format!("Could not upload file to S3 {:?}", e))
+                })?;
+
+            log::info!("Uploaded file {:?} to S3", i);
+            Ok(())
+        })
+    });
+
+    futures::future::join_all(futures).await;
+
+    event_queue
+        .send(ClickHouseEvent::WorkerEvent(
+            WorkerEvent::from_details(
+                dataset_id,
+                models::EventType::PagefindIndexingFinished {
+                    total_files,
+                },
+            )
+            .into(),
+        ))
+        .await;
+
     Ok(())
 }
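
A note on the new concurrency: each spawned task returns a `Result<(), ServiceError>`, but `futures::future::join_all` here only awaits the `JoinHandle`s and drops their output, so a failed (or panicked) upload no longer fails `build_index_for_dataset_id`, and `PagefindIndexingFinished` is emitted regardless. Below is a minimal sketch of a variant that surfaces those errors, reusing the names from the patch; the sketch is a suggestion, not part of the change:

    // Await every upload task, then propagate the first failure.
    let results = futures::future::join_all(futures).await;
    for result in results {
        result
            // Outer Err: the task panicked or was cancelled (tokio::task::JoinError).
            .map_err(|e| ServiceError::BadRequest(format!("Upload task failed {:?}", e)))
            // Inner Err: the S3 put_object call itself failed.
            .and_then(|upload_result| upload_result)?;
    }

Spawning one task per file also starts every upload at once; if an index can contain many files, bounding the concurrency (for example with `futures::stream::iter` plus `StreamExt::buffer_unordered` from the same `futures` crate) may be worth considering.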