Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4f2da1f
Add hybrid scan reader stubs
mhaseeb123 Apr 10, 2025
343176f
Minor style and docs fix
mhaseeb123 Apr 10, 2025
46d7d11
docstring updates
mhaseeb123 Apr 10, 2025
67ac400
Add more stubs
mhaseeb123 Apr 10, 2025
173ed02
Remove unneeded definitions
mhaseeb123 Apr 10, 2025
52cca2f
Remove more declarations
mhaseeb123 Apr 10, 2025
6be4a4b
Add `setup_page_index` to `hybrid_scan_helpers`
mhaseeb123 Apr 10, 2025
952baec
Add hybrid scan reader's metadata API implementations
mhaseeb123 Apr 11, 2025
133ddd0
Suggestions from code reviews
mhaseeb123 Apr 11, 2025
32535d6
style fix
mhaseeb123 Apr 11, 2025
7e4474d
Suggestions from code review
mhaseeb123 Apr 11, 2025
2111da5
style fix
mhaseeb123 Apr 11, 2025
80c9952
Impl rg pruning with stats in expt PQ reader
mhaseeb123 Apr 22, 2025
8cbe3c6
Merge branch 'branch-25.06' into fea/filter-row-groups-with-stats
mhaseeb123 Apr 30, 2025
10d9cfa
Merge changes
mhaseeb123 Apr 30, 2025
736c76c
Improvements
mhaseeb123 Apr 30, 2025
9b0cdfc
Minor improvements
mhaseeb123 Apr 30, 2025
36973db
Minor improvement
mhaseeb123 Apr 30, 2025
c88e8ad
Minor improvements
mhaseeb123 Apr 30, 2025
9cf6598
Minor improvements
mhaseeb123 Apr 30, 2025
8b4ba9a
fix build error
mhaseeb123 Apr 30, 2025
e314dc6
Apply suggestions from reviews
mhaseeb123 May 1, 2025
eac6235
Use `EXPECT_EQ` instead of `ASSERT_EQ` in tests
mhaseeb123 May 1, 2025
d7f22da
Merge branch 'branch-25.06' into fea/filter-row-groups-with-stats
mhaseeb123 May 2, 2025
890c7a8
Merge branch 'branch-25.06' into fea/filter-row-groups-with-stats
mhaseeb123 May 3, 2025
ca43c1e
Suggestions from code review
mhaseeb123 May 5, 2025
e837f34
Remove the missed source_info from test
mhaseeb123 May 5, 2025
8109a61
Minor improvement
mhaseeb123 May 5, 2025
ff7744d
Minor docs update
mhaseeb123 May 5, 2025
4f49637
Merge branch 'branch-25.06' into fea/filter-row-groups-with-stats
mhaseeb123 May 5, 2025
0cb0264
Use hash set to remove filter columns
mhaseeb123 May 6, 2025
7538030
Revert the file buffer span in test
mhaseeb123 May 6, 2025
0b82253
Merge branch 'branch-25.06' into fea/filter-row-groups-with-stats
bdice May 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ class parquet_reader_options {
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on stack.
* The `hybrid_scan_reader` also uses this to create `parquet_reader_options` without a source.
*/
explicit parquet_reader_options() = default;

/**
* @brief Creates a parquet_reader_options_builder which will build parquet_reader_options.
* @brief Creates a `parquet_reader_options_builder` to build `parquet_reader_options`.
* By default, build with empty data source info.
*
* @param src Source information to read parquet file
* @return Builder to build reader options
*/
static parquet_reader_options_builder builder(source_info src);
static parquet_reader_options_builder builder(source_info src = source_info{});

/**
* @brief Returns source info.
Expand All @@ -137,8 +139,7 @@ class parquet_reader_options {
[[nodiscard]] source_info const& get_source() const { return _source; }

/**
* @brief Returns true/false depending on whether strings should be converted to categories or
* not.
* @brief Returns boolean depending on whether strings should be converted to categories.
*
* @return `true` if strings should be converted to categories
*/
Expand All @@ -148,21 +149,21 @@ class parquet_reader_options {
}

/**
* @brief Returns true/false depending whether to use pandas metadata or not while reading.
* @brief Returns boolean depending on whether to use pandas metadata while reading.
*
* @return `true` if pandas metadata is used while reading
*/
[[nodiscard]] bool is_enabled_use_pandas_metadata() const { return _use_pandas_metadata; }

/**
* @brief Returns true/false depending whether to use arrow schema while reading.
* @brief Returns boolean depending on whether to use arrow schema while reading.
*
* @return `true` if arrow schema is used while reading
*/
[[nodiscard]] bool is_enabled_use_arrow_schema() const { return _use_arrow_schema; }

/**
* @brief Returns true/false depending on whether to read matching projected and filter columns
* @brief Returns boolean depending on whether to read matching projected and filter columns
* from mismatched Parquet sources.
*
* @return `true` if mismatched projected and filter columns will be read from mismatched Parquet
Expand Down Expand Up @@ -308,23 +309,23 @@ class parquet_reader_options {
/**
* @brief Sets to enable/disable use of pandas metadata to read.
*
* @param val Boolean value whether to use pandas metadata
* @param val Boolean indicating whether to use pandas metadata
*/
void enable_use_pandas_metadata(bool val) { _use_pandas_metadata = val; }

/**
* @brief Sets to enable/disable use of arrow schema to read.
*
* @param val Boolean value whether to use arrow schema
* @param val Boolean indicating whether to use arrow schema
*/
void enable_use_arrow_schema(bool val) { _use_arrow_schema = val; }

/**
* @brief Sets to enable/disable reading of matching projected and filter columns from mismatched
* Parquet sources.
*
* @param val Boolean value whether to read matching projected and filter columns from mismatched
* Parquet sources.
* @param val Boolean indicating whether to read matching projected and filter columns from
* mismatched Parquet sources.
*/
void enable_allow_mismatched_pq_schemas(bool val) { _allow_mismatched_pq_schemas = val; }

Expand Down Expand Up @@ -372,6 +373,7 @@ class parquet_reader_options_builder {
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on stack.
* The `hybrid_scan_reader` also uses this to construct `parquet_reader_options` without a source.
*/
parquet_reader_options_builder() = default;

Expand Down
126 changes: 126 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,30 @@
#include <functional>
#include <numeric>
#include <optional>
#include <unordered_set>

namespace cudf::io::parquet::experimental::detail {

using aggregate_reader_metadata_base = parquet::detail::aggregate_reader_metadata;
using metadata_base = parquet::detail::metadata;

using io::detail::inline_column_buffer;
using parquet::detail::CompactProtocolReader;
using parquet::detail::equality_literals_collector;
using parquet::detail::input_column_info;
using parquet::detail::row_group_info;

namespace {

[[nodiscard]] auto all_row_group_indices(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need this in other filter_row_groups_with_** APIs as well so separated out here.

host_span<std::vector<cudf::size_type> const> row_group_indices)
{
return std::vector<std::vector<cudf::size_type>>(row_group_indices.begin(),
row_group_indices.end());
}

} // namespace

metadata::metadata(cudf::host_span<uint8_t const> footer_bytes)
{
CompactProtocolReader cp(footer_bytes.data(), footer_bytes.size());
Expand Down Expand Up @@ -137,4 +150,117 @@ void aggregate_reader_metadata::setup_page_index(cudf::host_span<uint8_t const>
}
}

std::tuple<std::vector<input_column_info>,
           std::vector<inline_column_buffer>,
           std::vector<cudf::size_type>>
aggregate_reader_metadata::select_payload_columns(
  std::optional<std::vector<std::string>> const& payload_column_names,
  std::optional<std::vector<std::string>> const& filter_column_names,
  bool include_index,
  bool strings_to_categorical,
  type_id timestamp_type_id)
{
  // Delegate the final selection to the base class. An empty column list instructs the
  // base `select_columns()` to select all columns.
  auto const select = [&](std::vector<std::string> const& column_names) {
    return select_columns(
      column_names, {}, include_index, strings_to_categorical, timestamp_type_id);
  };

  // Neither payload nor filter columns specified: select all columns
  if (not payload_column_names.has_value() and not filter_column_names.has_value()) {
    return select({});
  }

  // Hash set of filter column names for O(1) lookup (empty when none are specified)
  auto const filter_columns_set = [&] {
    if (filter_column_names.has_value()) {
      return std::unordered_set<std::string>(filter_column_names->begin(),
                                             filter_column_names->end());
    }
    return std::unordered_set<std::string>{};
  }();

  // Payload columns specified: keep only the payload columns that do not also appear in
  // the filter expression, preserving their original order
  if (payload_column_names.has_value()) {
    std::vector<std::string> selected_columns;
    selected_columns.reserve(payload_column_names->size());
    for (auto const& column_name : *payload_column_names) {
      if (filter_columns_set.count(column_name) == 0) {
        selected_columns.push_back(column_name);
      }
    }
    return select(selected_columns);
  }

  // Only filter columns specified: select every column that does not appear in the
  // filter expression
  std::vector<std::string> selected_columns;

  // Recursively appends a column path (and its children) unless it is a filter column.
  // `std::function` is required so the lambda can refer to itself.
  std::function<void(std::string const&, int)> add_column_path =
    [&](std::string const& path_prefix, int schema_idx) {
      auto const& schema_elem    = get_schema(schema_idx);
      auto const current_path    = path_prefix + schema_elem.name;
      if (filter_columns_set.count(current_path) == 0) {
        selected_columns.push_back(current_path);
        // Descend into all children of this (non-filter) column
        for (auto const child_idx : schema_elem.children_idx) {
          add_column_path(current_path + ".", child_idx);
        }
      }
    };

  // Walk the schema root's children and collect all but the filter columns
  if (not filter_column_names->empty()) {
    for (auto const child_idx : get_schema(0).children_idx) {
      add_column_path("", child_idx);
    }
  }

  return select(selected_columns);
}

std::vector<std::vector<cudf::size_type>> aggregate_reader_metadata::filter_row_groups_with_stats(
  host_span<std::vector<cudf::size_type> const> row_group_indices,
  host_span<data_type const> output_dtypes,
  host_span<int const> output_column_schemas,
  std::optional<std::reference_wrapper<ast::expression const>> filter,
  rmm::cuda_stream_view stream) const
{
  // Without a filter expression there is nothing to prune: keep all input row groups
  if (not filter.has_value()) { return all_row_group_indices(row_group_indices); }

  // Count the input row groups across all sources; an empty input span means all
  // row groups in the file are considered
  cudf::size_type num_input_row_groups{};
  if (row_group_indices.empty()) {
    num_input_row_groups = num_row_groups;
  } else {
    size_t count{0};
    for (auto const& source_row_groups : row_group_indices) {
      count += source_row_groups.size();
    }
    // Guard against overflowing cudf::size_type (i.e. more than ~2B row groups)
    CUDF_EXPECTS(count <= std::numeric_limits<cudf::size_type>::max(),
                 "Total number of row groups exceed the cudf::size_type's limit");
    num_input_row_groups = static_cast<cudf::size_type>(count);
  }

  // Evaluate the stats AST expression against column chunk statistics using the
  // base class utility, yielding the surviving row group indices (if any filtering
  // was possible)
  auto const surviving_row_groups = apply_stats_filters(row_group_indices,
                                                        num_input_row_groups,
                                                        output_dtypes,
                                                        output_column_schemas,
                                                        filter.value(),
                                                        stream);

  // std::nullopt means no pruning took place; fall back to all input row groups
  return surviving_row_groups.value_or(all_row_group_indices(row_group_indices));
}

} // namespace cudf::io::parquet::experimental::detail
39 changes: 39 additions & 0 deletions cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace cudf::io::parquet::experimental::detail {
using aggregate_reader_metadata_base = parquet::detail::aggregate_reader_metadata;
using metadata_base = parquet::detail::metadata;

using io::detail::inline_column_buffer;
using parquet::detail::equality_literals_collector;
using parquet::detail::input_column_info;
using parquet::detail::row_group_info;
Expand Down Expand Up @@ -124,6 +125,44 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base {
* @param page_index_bytes Host span of Parquet page index buffer bytes
*/
void setup_page_index(cudf::host_span<uint8_t const> page_index_bytes);

/**
* @brief Filters and reduces down to the selection of payload columns
*
* @param payload_column_names List of paths of select payload column names, if any
   * @param filter_column_names List of paths of column names present only in the filter, if any
* @param include_index Whether to always include the PANDAS index column(s)
* @param strings_to_categorical Type conversion parameter
* @param timestamp_type_id Type conversion parameter
*
* @return input column information, output column buffers, list of output column schema
* indices
*/
[[nodiscard]] std::
tuple<std::vector<input_column_info>, std::vector<inline_column_buffer>, std::vector<size_type>>
select_payload_columns(std::optional<std::vector<std::string>> const& payload_column_names,
std::optional<std::vector<std::string>> const& filter_column_names,
bool include_index,
bool strings_to_categorical,
type_id timestamp_type_id);

/**
* @brief Filter the row groups with statistics based on predicate filter
*
* @param row_group_indices Input row groups indices
* @param output_dtypes Datatypes of output columns
   * @param output_column_schemas Schema indices of output columns
* @param filter Optional AST expression to filter row groups based on Column chunk statistics
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Filtered row group indices, if any are filtered
*/
[[nodiscard]] std::vector<std::vector<size_type>> filter_row_groups_with_stats(
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;
};

} // namespace cudf::io::parquet::experimental::detail
Loading
Loading