Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 109 additions & 14 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike};
use derive_more::{Deref, DerefMut};
use itertools::Itertools;
use once_cell::sync::Lazy;
use parquet::{
arrow::ArrowWriter,
basic::Encoding,
Expand All @@ -41,6 +42,7 @@ use parquet::{
schema::types::ColumnPath,
};
use rand::distributions::DistString;
use regex::Regex;
use relative_path::RelativePathBuf;
use tokio::task::JoinSet;
use tracing::{error, info, trace, warn};
Expand Down Expand Up @@ -68,6 +70,41 @@ use super::{
LogStream, ARROW_FILE_EXTENSION,
};

/// Regex pattern for parsing arrow file names.
///
/// # Format
/// The expected format is: `<schema_key>.<front_part>.data.arrows`
/// where:
/// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value
/// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2")
/// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition
/// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76")
///
/// # Limitations
/// - Partition keys and values must only contain alphanumeric characters
/// - Special characters in partition values will cause the pattern to fail in capturing
///
/// # Examples
/// Valid: "key1=value1,key2=value2"
/// Invalid: "key1=special!value,key2=special#value"
static ARROWS_NAME_STRUCTURE: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^[a-zA-Z0-9&=]+\.(?P<front>\S+)\.data\.arrows$").expect("Validated regex")
});

/// Returns the filename for parquet if provided arrows file path is valid as per our expectation
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
let filename = path.file_name().unwrap().to_str().unwrap();
let filename = ARROWS_NAME_STRUCTURE
.captures(filename)
.and_then(|c| c.get(1))?
.as_str();
let filename_with_random_number = format!("{filename}.data.{random_string}.parquet");
let mut parquet_path = path.to_owned();
parquet_path.set_file_name(filename_with_random_number);

Some(parquet_path)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
de-sh marked this conversation as resolved.
Comment thread
de-sh marked this conversation as resolved.
Comment on lines +70 to +80
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Format string doesn't match expected parquet file format

The format string used for constructing the parquet filename on line 75 doesn't include the 'data' portion that's present in the arrow filename, but the tests expect it to be included.

Update the format string to maintain the same structure:

fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
    let filename = path.file_stem()?.to_str()?;
    let (_, front) = filename.split_once('.')?;
    assert!(front.contains('.'), "contains the delim `.`");
-    let filename_with_random_number = format!("{front}.{random_string}.parquet");
+    let filename_with_random_number = format!("{front}.data.{random_string}.parquet");
    let mut parquet_path = path.to_owned();
    parquet_path.set_file_name(filename_with_random_number);

    Some(parquet_path)
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Returns the filename for parquet if provided arrows file path is valid as per our expectation
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
let filename = path.file_stem()?.to_str()?;
let (_, front) = filename.split_once('.')?;
assert!(front.contains('.'), "contains the delim `.`");
let filename_with_random_number = format!("{front}.{random_string}.parquet");
let mut parquet_path = path.to_owned();
parquet_path.set_file_name(filename_with_random_number);
Some(parquet_path)
}
/// Returns the filename for parquet if provided arrows file path is valid as per our expectation
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
let filename = path.file_stem()?.to_str()?;
let (_, front) = filename.split_once('.')?;
assert!(front.contains('.'), "contains the delim `.`");
let filename_with_random_number = format!("{front}.data.{random_string}.parquet");
let mut parquet_path = path.to_owned();
parquet_path.set_file_name(filename_with_random_number);
Some(parquet_path)
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

front already carries ".data" at the end as asserted above

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@de-sh You're absolutely right. After tracing through the function with examples from the tests, I can see that "front" already contains ".data" at the end.

For example, when processing a file named "abc.date=2023-10-01.hour=12.minute=30.hostname.data.arrows":

  1. file_stem → "abc.date=2023-10-01.hour=12.minute=30.hostname.data"
  2. After split_once('.') → front = "date=2023-10-01.hour=12.minute=30.hostname.data"
  3. Result with format → "date=2023-10-01.hour=12.minute=30.hostname.data.{random_string}.parquet"

The original implementation is correct. Thanks for the clarification!


✏️ Learnings added
Learnt from: de-sh
PR: parseablehq/parseable#1239
File: src/parseable/streams.rs:70-80
Timestamp: 2025-03-24T06:08:47.139Z
Learning: In the Parseable codebase, arrow filenames follow a structure ending with ".data.arrows". When converting to parquet, the "front" part (after the first dot) already includes ".data" at the end, so there's no need to add it explicitly in the arrow_path_to_parquet function.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


#[derive(Debug, thiserror::Error)]
#[error("Stream not found: {0}")]
pub struct StreamNotFound(pub String);
Expand Down Expand Up @@ -187,7 +224,11 @@ impl Stream {
let paths = dir
.flatten()
.map(|file| file.path())
.filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
.filter(|path| {
path.file_name()
.and_then(|f| f.to_str())
.is_some_and(|file_name| ARROWS_NAME_STRUCTURE.is_match(file_name))
})
.sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
.collect();

Expand Down Expand Up @@ -230,12 +271,13 @@ impl Stream {
&arrow_file_path, self.stream_name
);
remove_file(&arrow_file_path).unwrap();
} else {
let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string);
} else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) {
grouped_arrow_file
.entry(key)
.or_default()
.push(arrow_file_path);
} else {
warn!("Unexpected arrows file: {}", arrow_file_path.display());
}
}
grouped_arrow_file
Expand Down Expand Up @@ -294,17 +336,6 @@ impl Stream {
}
}

fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
let filename = path.file_stem().unwrap().to_str().unwrap();
let (_, filename) = filename.split_once('.').unwrap();
assert!(filename.contains('.'), "contains the delim `.`");
let filename_with_random_number = format!("{filename}.{random_string}.arrows");
let mut parquet_path = path.to_owned();
parquet_path.set_file_name(filename_with_random_number);
parquet_path.set_extension("parquet");
parquet_path
}

/// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal`
pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
info!(
Expand Down Expand Up @@ -1199,4 +1230,68 @@ mod tests {
assert_eq!(staging.parquet_files().len(), 2);
assert_eq!(staging.arrow_files().len(), 1);
}

#[test]
fn test_valid_arrow_path_conversion() {
let path = Path::new("/tmp/12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.arrows");
let random_string = "random123";

let result = arrow_path_to_parquet(path, random_string);

assert!(result.is_some());
let parquet_path = result.unwrap();
assert_eq!(
parquet_path.to_str().unwrap(),
"/tmp/date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet"
);
}
Comment on lines +1224 to +1239
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Test doesn't check filename structure expectation properly

The expected output filename in test_valid_arrow_path_conversion is inconsistent with the current implementation of arrow_path_to_parquet. The test assumes a format that includes the "data" component which would be created by format!("{front}.data.{random_string}.parquet"), but the implementation uses format!("{front}.{random_string}.parquet").

Either update the test expectation or change the implementation to match. If you change the implementation, be sure to update line 75 as suggested above.


#[test]
fn test_invalid_arrow_path() {
// Missing the ".data.arrows" suffix
let path = Path::new("/tmp/12345abcde&key1=value1.date=2020-01-21.hour=10.minute=30");
let random_string = "random123";

let result = arrow_path_to_parquet(path, random_string);

assert!(result.is_none());
}
Comment on lines +1224 to +1264
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add negative test cases for arrow_path_to_parquet function

While you've added tests for valid paths, there are no tests for invalid paths where the function should return None.

Add tests for cases where the function should return None:

#[test]
fn test_invalid_arrow_path_without_extension() {
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let filename = "invalid_filename_without_extension";
    let file_path = create_test_file(&temp_dir, filename);
    let random_string = "random123";

    let result = arrow_path_to_parquet(&file_path, random_string);
    assert!(result.is_none());
}

#[test]
fn test_invalid_arrow_path_without_delimiters() {
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let filename = "invalid_filename.arrows";
    let file_path = create_test_file(&temp_dir, filename);
    let random_string = "random123";

    let result = arrow_path_to_parquet(&file_path, random_string);
    assert!(result.is_none());
}

#[test]
fn test_invalid_arrow_path_without_second_delimiter() {
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let filename = "schema.nodelimiter.arrows";
    let file_path = create_test_file(&temp_dir, filename);
    let random_string = "random123";

    // This would fail with the current implementation due to the assert!
    // but should return None if the assert is replaced with a check
    let result = arrow_path_to_parquet(&file_path, random_string);
    assert!(result.is_none());
}


#[test]
fn test_invalid_schema_key() {
// Invalid schema key with special characters
let path =
Path::new("/tmp/12345abcde&key1=value1!.date=2020-01-21.hour=10.minute=30.data.arrows");
let random_string = "random123";

let result = arrow_path_to_parquet(path, random_string);

assert!(result.is_none());
}

#[test]
fn test_complex_path() {
let path = Path::new("/nested/directory/structure/20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.arrows");
let random_string = "random456";

let result = arrow_path_to_parquet(path, random_string);

assert!(result.is_some());
let parquet_path = result.unwrap();
assert_eq!(
parquet_path.to_str().unwrap(),
"/nested/directory/structure/date=2020-01-21.hour=10.minute=30.region=us-west.ee529ffc8e76.data.random456.parquet"
);
}

#[test]
fn test_empty_front_part() {
// Valid but with empty front part
let path = Path::new("/tmp/schema_key..data.arrows");
let random_string = "random789";

let result = arrow_path_to_parquet(path, random_string);

assert!(result.is_none());
}
}