Comparing changes

This is a direct comparison between two commits made in this repository.

base repository: devflowinc/trieve
base: d6ca5d811d584d466a44594ee207415d8b3459eb
head repository: devflowinc/trieve
compare: 5da1466ffb8e372a7f9d943080dc019763b9e4b5
Showing with 70 additions and 61 deletions.
  1. +68 −22 server/src/bin/file-worker.rs
  2. +2 −39 server/src/operators/file_operator.rs
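Taken together, the two commits hoist chunk-group creation out of create_file_chunks (file_operator.rs) and into the worker's upload_file (file-worker.rs): the group is now created once, up front, unless the upload's pdf2md_options request split_headings, and the resulting group_id: Option<uuid::Uuid> is threaded down to create_file_chunks. The create_file_query call is moved ahead of the per-format branches for the same reason, so the created file's id is available when linking the group. A minimal, self-contained sketch of the new gating condition follows; Pdf2MdOptions here is a hypothetical stand-in for the real options type.

// Sketch only: stand-in for the real pdf2md options payload.
struct Pdf2MdOptions {
    split_headings: Option<bool>,
}

// Mirrors the worker's `!...as_ref().is_some_and(|o| o.split_headings.unwrap_or(false))`:
// the single up-front group is created unless split_headings is explicitly true.
fn should_create_single_group(pdf2md_options: Option<&Pdf2MdOptions>) -> bool {
    !pdf2md_options.is_some_and(|options| options.split_headings.unwrap_or(false))
}

fn main() {
    assert!(should_create_single_group(None));
    assert!(should_create_single_group(Some(&Pdf2MdOptions { split_headings: None })));
    assert!(should_create_single_group(Some(&Pdf2MdOptions { split_headings: Some(false) })));
    assert!(!should_create_single_group(Some(&Pdf2MdOptions { split_headings: Some(true) })));
}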
90 changes: 68 additions & 22 deletions server/src/bin/file-worker.rs
@@ -7,7 +7,7 @@ use std::sync::{
     Arc,
 };
 use trieve_server::{
-    data::models::{self, FileWorkerMessage},
+    data::models::{self, ChunkGroup, FileWorkerMessage},
     errors::ServiceError,
     establish_connection, get_env,
     handlers::chunk_handler::ChunkReqPayload,
@@ -17,6 +17,7 @@ use trieve_server::{
         file_operator::{
             create_file_chunks, create_file_query, get_aws_bucket, preprocess_file_to_chunks,
         },
+        group_operator::{create_group_from_file_query, create_groups_query},
     },
 };

@@ -312,6 +313,70 @@ async fn upload_file(
     )
     .await?;

+    let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
+
+    let created_file = create_file_query(
+        file_id,
+        file_size_mb,
+        file_worker_message.upload_file_data.clone(),
+        file_worker_message.dataset_id,
+        web_pool.clone(),
+    )
+    .await?;
+
+    let group_id = if !file_worker_message
+        .upload_file_data
+        .pdf2md_options
+        .as_ref()
+        .is_some_and(|options| options.split_headings.unwrap_or(false))
+    {
+        let chunk_group = ChunkGroup::from_details(
+            Some(file_worker_message.upload_file_data.file_name.clone()),
+            file_worker_message.upload_file_data.description.clone(),
+            dataset_org_plan_sub.dataset.id,
+            file_worker_message
+                .upload_file_data
+                .group_tracking_id
+                .clone(),
+            None,
+            file_worker_message
+                .upload_file_data
+                .tag_set
+                .clone()
+                .map(|tag_set| tag_set.into_iter().map(Some).collect()),
+        );
+
+        let chunk_group_option = create_groups_query(vec![chunk_group], true, web_pool.clone())
+            .await
+            .map_err(|e| {
+                log::error!("Could not create group {:?}", e);
+                ServiceError::BadRequest("Could not create group".to_string())
+            })?
+            .pop();
+
+        let chunk_group = match chunk_group_option {
+            Some(group) => group,
+            None => {
+                return Err(ServiceError::BadRequest(
+                    "Could not create group from file".to_string(),
+                ));
+            }
+        };
+
+        let group_id = chunk_group.id;
+
+        create_group_from_file_query(group_id, created_file.id, web_pool.clone())
+            .await
+            .map_err(|e| {
+                log::error!("Could not create group from file {:?}", e);
+                e
+            })?;
+
+        Some(group_id)
+    } else {
+        None
+    };
+
     if file_name.ends_with(".pdf")
         && file_worker_message
             .upload_file_data
@@ -382,16 +447,6 @@ async fn upload_file(
         }
     };

-    let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
-    let created_file = create_file_query(
-        file_id,
-        file_size_mb,
-        file_worker_message.upload_file_data.clone(),
-        file_worker_message.dataset_id,
-        web_pool.clone(),
-    )
-    .await?;
-
     log::info!("Waiting on Task {}", task_id);
     let mut processed_pages = std::collections::HashSet::new();
     let mut pagination_token: Option<u32> = None;
@@ -507,6 +562,7 @@ async fn upload_file(
         file_worker_message.upload_file_data.clone(),
         new_chunks.clone(),
         dataset_org_plan_sub.clone(),
+        group_id,
         web_pool.clone(),
         event_queue.clone(),
         redis_conn.clone(),
@@ -571,17 +627,6 @@ async fn upload_file(
         ));
     }

-    let file_size_mb = (file_data.len() as f64 / 1024.0 / 1024.0).round() as i64;
-
-    let created_file = create_file_query(
-        file_id,
-        file_size_mb,
-        file_worker_message.upload_file_data.clone(),
-        file_worker_message.dataset_id,
-        web_pool.clone(),
-    )
-    .await?;
-
     if file_worker_message
         .upload_file_data
         .create_chunks
@@ -637,6 +682,7 @@ async fn upload_file(
         file_worker_message.upload_file_data,
         chunks,
         dataset_org_plan_sub,
+        group_id,
         web_pool.clone(),
         event_queue.clone(),
         redis_conn,
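The block the worker gains keeps the create-then-take pattern that previously lived in file_operator.rs: create_groups_query accepts a batch, so the single created group is recovered with Vec::pop and an empty result is turned into an error. A reduced sketch of that pattern, with a hypothetical ServiceError stand-in for the real error type:

#[derive(Debug)]
struct ServiceError(String); // stand-in for trieve_server's error enum

// Pop the (only) element out of a one-element batch result, converting an
// empty vector into an error, as the worker does after create_groups_query.
fn take_created_group<T>(mut created: Vec<T>) -> Result<T, ServiceError> {
    match created.pop() {
        Some(group) => Ok(group),
        None => Err(ServiceError("Could not create group from file".to_string())),
    }
}

fn main() {
    assert!(take_created_group(vec!["group"]).is_ok());
    assert!(take_created_group(Vec::<&str>::new()).is_err());
}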
41 changes: 2 additions & 39 deletions server/src/operators/file_operator.rs
@@ -183,6 +183,7 @@ pub async fn create_file_chunks(
     upload_file_data: UploadFileReqPayload,
     mut chunks: Vec<ChunkReqPayload>,
     dataset_org_plan_sub: DatasetAndOrgWithSubAndPlan,
+    group_id: Option<uuid::Uuid>,
     pool: web::Data<Pool>,
     event_queue: web::Data<EventQueue>,
     mut redis_conn: MultiplexedConnection,
@@ -257,47 +258,9 @@ pub async fn create_file_chunks(

         chunks = new_chunks;
     } else {
-        let chunk_group = ChunkGroup::from_details(
-            Some(name.clone()),
-            upload_file_data.description.clone(),
-            dataset_org_plan_sub.dataset.id,
-            upload_file_data.group_tracking_id.clone(),
-            None,
-            upload_file_data
-                .tag_set
-                .clone()
-                .map(|tag_set| tag_set.into_iter().map(Some).collect()),
-        );
-
-        let chunk_group_option = create_groups_query(vec![chunk_group], true, pool.clone())
-            .await
-            .map_err(|e| {
-                log::error!("Could not create group {:?}", e);
-                ServiceError::BadRequest("Could not create group".to_string())
-            })?
-            .pop();
-
-        let chunk_group = match chunk_group_option {
-            Some(group) => group,
-            None => {
-                return Err(ServiceError::BadRequest(
-                    "Could not create group from file".to_string(),
-                ));
-            }
-        };
-
-        let group_id = chunk_group.id;
-
         chunks.iter_mut().for_each(|chunk| {
-            chunk.group_ids = Some(vec![group_id]);
+            chunk.group_ids = group_id.map(|id| vec![id]);
         });
-
-        create_group_from_file_query(group_id, created_file_id, pool.clone())
-            .await
-            .map_err(|e| {
-                log::error!("Could not create group from file {:?}", e);
-                e
-            })?;
     }

     let chunk_count = get_row_count_for_organization_id_query(
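After the change, create_file_chunks no longer creates a group itself; it only maps the caller-supplied optional id onto each chunk, so chunks produced under split_headings carry no group. A minimal sketch of that mapping, using u64 as a stand-in for uuid::Uuid and a reduced Chunk in place of ChunkReqPayload:

#[derive(Debug, Default)]
struct Chunk {
    group_ids: Option<Vec<u64>>, // Option<Vec<uuid::Uuid>> in the real payload
}

// Mirrors the one-line change: Some(id) becomes Some(vec![id]), None stays None.
fn attach_group(chunks: &mut [Chunk], group_id: Option<u64>) {
    chunks.iter_mut().for_each(|chunk| {
        chunk.group_ids = group_id.map(|id| vec![id]);
    });
}

fn main() {
    let mut chunks = vec![Chunk::default(), Chunk::default()];
    attach_group(&mut chunks, Some(42));
    assert!(chunks.iter().all(|c| c.group_ids == Some(vec![42])));
    attach_group(&mut chunks, None);
    assert!(chunks.iter().all(|c| c.group_ids.is_none()));
}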