Skip to content

Commit 3e4cef0

Browse files
committed
feature: make uploading to s3 operate on multiple threads
1 parent 1002c3b commit 3e4cef0

File tree

2 files changed

+38
-34
lines changed

2 files changed

+38
-34
lines changed

server/src/data/models.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -2077,9 +2077,8 @@ pub enum EventType {
20772077
},
20782078
#[display(fmt = "pagefind_indexing_started")]
20792079
PagefindIndexingStarted,
2080-
#[display(fmt = "pagefind_indexing_progress")]
2081-
PagefindIndexingProgress {
2082-
files_indexed: usize,
2080+
#[display(fmt = "pagefind_indexing_finished")]
2081+
PagefindIndexingFinished {
20832082
total_files: usize,
20842083
},
20852084
}
@@ -2104,7 +2103,7 @@ impl EventType {
21042103
EventTypeRequest::CsvJsonlProcessingCheckpoint,
21052104
EventTypeRequest::CsvJsonlProcessingCompleted,
21062105
EventTypeRequest::PagefindIndexingStarted,
2107-
EventTypeRequest::PagefindIndexingProgress,
2106+
EventTypeRequest::PagefindIndexingFinished,
21082107
]
21092108
}
21102109
}
@@ -6632,8 +6631,8 @@ pub enum EventTypeRequest {
66326631
CsvJsonlProcessingCompleted,
66336632
#[display(fmt = "pagefind_indexing_started")]
66346633
PagefindIndexingStarted,
6635-
#[display(fmt = "pagefind_indexing_progress")]
6636-
PagefindIndexingProgress
6634+
#[display(fmt = "pagefind_indexing_finished")]
6635+
PagefindIndexingFinished
66376636
}
66386637

66396638
#[derive(Debug, Clone, Deserialize, Serialize)]

server/src/operators/pagefind_operator.rs

+33-28
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use std::path::PathBuf;
22

33
use actix_web::web;
4-
use base64::{engine::general_purpose, Engine};
54
use pagefind::{api::PagefindIndex, options::PagefindServiceConfig};
65

76
use crate::{
8-
data::models::{DatasetConfiguration, EventType, Pool, QdrantChunkMetadata, WorkerEvent},
7+
data::models::{self, DatasetConfiguration, Pool, QdrantChunkMetadata, WorkerEvent},
98
errors::ServiceError,
109
operators::{clickhouse_operator::ClickHouseEvent, file_operator::get_pagefind_aws_bucket},
1110
};
@@ -76,36 +75,42 @@ pub async fn build_index_for_dataset_id(
7675
let total_files = files.len();
7776
log::info!("Uploading {:?} pagefind indexed files to S3", total_files);
7877

79-
for (i, file) in files.iter().enumerate() {
80-
let bucket = get_pagefind_aws_bucket()?;
81-
82-
// WARNING This s3 bucket cannot be default public. put ACL's on this somehow in case
83-
// the user does not want their data to be public.
78+
let futures = files.into_iter().enumerate().map(|(i, file)| -> tokio::task::JoinHandle<Result<(), ServiceError>> {
8479
let mut filename = PathBuf::from("/pagefind");
8580
filename.push(dataset_id.to_string());
8681
filename.push(file.filename.clone());
8782

88-
bucket
89-
.put_object(filename.to_string_lossy().to_string(), &file.contents.clone())
90-
.await
91-
.map_err(|e| {
92-
ServiceError::BadRequest(format!("Could not upload file to S3 {:?}", e))
93-
})?;
94-
95-
event_queue
96-
.send(ClickHouseEvent::WorkerEvent(
97-
WorkerEvent::from_details(
98-
dataset_id,
99-
EventType::PagefindIndexingProgress {
100-
files_indexed: i + 1,
101-
total_files,
102-
},
103-
)
104-
.into(),
105-
))
106-
.await;
107-
log::info!("Uploaded file to s3 {:?}/{:?}", i, total_files);
108-
}
83+
84+
// WARNING: This S3 bucket must not be public by default. Put ACLs on it somehow in case
85+
// the user does not want their data to be public.
86+
tokio::task::spawn(async move {
87+
let bucket = get_pagefind_aws_bucket()?;
88+
bucket
89+
.put_object(filename.to_string_lossy().to_string(), &file.contents.clone())
90+
.await
91+
.map_err(|e| {
92+
ServiceError::BadRequest(format!("Could not upload file to S3 {:?}", e))
93+
})?;
94+
95+
log::info!("Uploaded file {:?} to S3", i);
96+
Ok(())
97+
})
98+
});
99+
100+
futures::future::join_all(futures).await;
101+
102+
event_queue
103+
.send(ClickHouseEvent::WorkerEvent(
104+
WorkerEvent::from_details(
105+
dataset_id,
106+
models::EventType::PagefindIndexingFinished {
107+
total_files,
108+
},
109+
)
110+
.into(),
111+
))
112+
.await;
113+
109114

110115
Ok(())
111116
}

0 commit comments

Comments
 (0)