Skip to content

Commit

Permalink
bugfix: csv jsonl worker with incomplete byte reads
Browse files Browse the repository at this point in the history
  • Loading branch information
skeptrunedev committed Dec 19, 2024
1 parent 0a92ecc commit 21e5800
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
5 changes: 3 additions & 2 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ minijinja = { version = "2.2.0", features = ["loader", "json"] }
hallucination-detection = { version = "0.1.5", default-features = false, optional = true }
broccoli_queue = "0.1.1"
youtube-transcript = { git = "https://github.com/densumesh/summarizer.git" }
bytes = "1.9.0"


[build-dependencies]
Expand Down
22 changes: 15 additions & 7 deletions server/src/bin/csv-jsonl-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,19 +315,27 @@ async fn process_csv_jsonl_file(

let mut columns = vec![];
let mut line = String::new();
let mut bytes: bytes::BytesMut = bytes::BytesMut::new();
let mut byte_count = 0;
let mut chunk_req_payloads: Vec<ChunkReqPayload> = vec![];
while let Some(chunk) = response_data_stream.bytes().next().await {
let chunk = chunk.map_err(|err| {
let chunk_bytes = chunk.map_err(|err| {
log::error!("Failed to get chunk from stream: {:?}", err);
ServiceError::InternalServerError("Failed to get chunk from stream".to_string())
})?;
let chunk = String::from_utf8(chunk.to_vec()).map_err(|err| {
log::error!("Failed to convert chunk from stream to string: {:?}", err);
ServiceError::InternalServerError(
"Failed to convert chunk from stream to string".to_string(),
)
})?;
bytes.extend_from_slice(&chunk_bytes);
let chunk = match String::from_utf8(bytes.to_vec()) {
Ok(chunk) => {
bytes.clear();
chunk
}
Err(_) => {
log::info!(
"Failed to convert bytes chunk to utf8, continuing with bytes append..."
);
continue;
}
};

byte_count += chunk.len();

Expand Down

0 comments on commit 21e5800

Please sign in to comment.