Harden parquet reader
desmondcheongzx committed Aug 15, 2024
1 parent c74dd3f commit 69ed8e7
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions src/arrow2/src/io/parquet/read/deserialize/nested_utils.rs
@@ -389,9 +389,23 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
decoder,
additional,
)?;
assert!(
*rows_remaining >= nested.len() - existing,
"Rows remaining ({}) is less than the number of new rows seen ({}). Please file an issue.",
*rows_remaining,
nested.len() - existing,
);
assert!(
nested.len() <= chunk_size,
"Number of rows in the chunk ({}) exceeds the chunk size ({}). Please file an issue.",
nested.len(),
chunk_size,
);
*rows_remaining -= nested.len() - existing;
items.push_back((nested, decoded));

// If we've filled the current chunk, but there are rows remaining in the current page, start
// filling up new chunks.
while page.len() > 0 && (*rows_remaining > 0 || *values_remaining > 0) {
let additional = chunk_size.min(*rows_remaining);

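The assertions added above pin down two invariants of the chunk-filling loop: a chunk never holds more rows than `chunk_size`, and a chunk never consumes more rows than remain in the row group. The following is a minimal, self-contained sketch of that bookkeeping under assumed names (`fill_chunks` and its parameters are hypothetical, not the arrow2 API):

// Minimal sketch (hypothetical names, not the arrow2 API) of the chunking
// bookkeeping that the new assertions in `extend` guard.
fn fill_chunks(mut rows_remaining: usize, rows_in_page: usize, chunk_size: usize) -> Vec<usize> {
    let mut page_rows_left = rows_in_page;
    let mut chunks = Vec::new();
    while page_rows_left > 0 && rows_remaining > 0 {
        // Never request more rows than fit in one chunk or remain in the row group.
        let additional = chunk_size.min(rows_remaining);
        let taken = additional.min(page_rows_left);
        // Invariants mirrored by the new asserts:
        assert!(taken <= chunk_size, "chunk holds more rows than chunk_size");
        assert!(rows_remaining >= taken, "consumed more rows than remain");
        rows_remaining -= taken;
        page_rows_left -= taken;
        chunks.push(taken);
    }
    chunks
}

fn main() {
    // A page with 10 rows, 7 rows remaining in the row group, and a chunk size
    // of 3 fills chunks of 3, 3, and 1 rows, then stops.
    assert_eq!(fill_chunks(7, 10, 3), vec![3, 3, 1]);
}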
@@ -407,12 +421,40 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
decoder,
additional,
)?;
assert!(
*rows_remaining >= nested.len(),
"Rows remaining ({}) is less than the number of new rows seen ({}). Please file an issue.",
*rows_remaining,
nested.len(),
);
assert!(
nested.len() <= chunk_size,
"Number of rows in the chunk ({}) exceeds the chunk size ({}). Please file an issue.",
nested.len(),
chunk_size,
);
*rows_remaining -= nested.len();
items.push_back((nested, decoded));
}
Ok(())
}

/// Helper function that fills a chunk with nested values decoded from the current page. At most
/// `additional` top-level rows will be added to the current chunk.
///
/// # Arguments
///
/// * `page` - The repetition and definition levels for the current Parquet page.
/// * `values_state` - The state of our nested values.
/// * `nested` - The state of our nested data types.
/// * `rows_remaining` - The global number of top-level rows that remain in the current row group.
/// * `values_remaining` - The global number of leaf values that remain in the current row group.
/// * `decoded` - The state of our decoded values.
/// * `decoder` - The decoder for the leaf-level type.
/// * `additional` - The number of top-level rows to read for the current chunk. This is the
/// min of `chunk size - number of rows existing in the current chunk` and
/// `rows_remaining`.
#[allow(clippy::too_many_arguments)]
fn extend_offsets2<'a, D: NestedDecoder<'a>>(
page: &mut NestedPage<'a>,
@@ -424,6 +466,13 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
decoder: &D,
additional: usize,
) -> Result<()> {
// Check that we have at least one value (which can be null) per row.
assert!(
*values_remaining >= rows_remaining,
"Values remaining({}) is lower than the number of rows remaining ({}). Please file an issue.",
*values_remaining,
rows_remaining,
);
let max_depth = nested.len();

let mut cum_sum = vec![0u32; max_depth + 1];
@@ -455,11 +504,11 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
// or when `rows_remaining <= rows_seen` and we see a new record. In the latter
// case, the remaining values lie outside of the rows we're retrieving, so we
// zero out `values_remaining`.
-if rows_remaining <= rows_seen {
+if rows_seen >= rows_remaining {
*values_remaining = 0;
break;
}
-if rows_seen == additional {
+if rows_seen >= additional {
break;
}
rows_seen += 1;
@@ -500,6 +549,18 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
}
}
}
assert!(
rows_seen <= rows_remaining,
"Rows seen ({}) is greater than the number of rows remaining ({}). Please file an issue.",
rows_seen,
rows_remaining,
);
assert!(
rows_seen <= additional,
"Rows seen ({}) is greater than the additional number of rows to read in the current data page ({}). Please file an issue.",
rows_seen,
additional,
);
Ok(())
}
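Besides the new assertions, the change also rewrites two loop-exit checks: the first (`rows_remaining <= rows_seen`) is only flipped for readability, while the second changes `rows_seen == additional` to `rows_seen >= additional`. The two forms are equivalent while `rows_seen` advances one row at a time, but the `>=` form still terminates if the counter ever overshoots its target. A small standalone sketch of that difference, using a hypothetical `take_rows` rather than the arrow2 code:

// Standalone sketch (hypothetical `take_rows`, not the arrow2 code) showing why
// a `>=` loop exit is more defensive than `==`: if the counter can jump past the
// target, an equality check never fires and the loop overruns.
fn take_rows(steps: &[usize], additional: usize) -> usize {
    let mut rows_seen = 0;
    for &step in steps {
        // `rows_seen == additional` would miss the stop point whenever a step
        // skips over it; `>=` always stops.
        if rows_seen >= additional {
            break;
        }
        rows_seen += step;
    }
    rows_seen
}

fn main() {
    // Steps of 2 take the counter through 0, 2, 4; it never equals 3,
    // so an `==` check would keep iterating, while `>=` stops at 4.
    assert_eq!(take_rows(&[2, 2, 2, 2], 3), 4);
}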
