Skip to content

Commit 518fc51

Browse files
authored
apacheGH-28074: [C++][Dataset] Handle NaNs correctly in Parquet predicate push-down (apache#15125)
This PR fixes the issue of handling NaNs in the Parquet predicate push-down. While computing the valid bounds for a column, if the max or min of the column is null, the range should ignore that. * Closes: apache#28074 Authored-by: Sanjiban Sengupta <[email protected]> Signed-off-by: Weston Pace <[email protected]>
1 parent 0a7e7fb commit 518fc51

File tree

5 files changed

+95
-41
lines changed

5 files changed

+95
-41
lines changed

cpp/src/arrow/dataset/file_parquet.cc

+74-39
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,19 @@ Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
9898
return manifest;
9999
}
100100

101+
bool IsNan(const Scalar& value) {
102+
if (value.is_valid) {
103+
if (value.type->id() == Type::FLOAT) {
104+
const FloatScalar& float_scalar = checked_cast<const FloatScalar&>(value);
105+
return std::isnan(float_scalar.value);
106+
} else if (value.type->id() == Type::DOUBLE) {
107+
const DoubleScalar& double_scalar = checked_cast<const DoubleScalar&>(value);
108+
return std::isnan(double_scalar.value);
109+
}
110+
}
111+
return false;
112+
}
113+
101114
std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
102115
const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
103116
// For the remaining of this function, failure to extract/parse statistics
@@ -112,50 +125,13 @@ std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
112125

113126
auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
114127
auto statistics = column_metadata->statistics();
115-
if (statistics == nullptr) {
116-
return std::nullopt;
117-
}
118-
119128
const auto& field = schema_field.field;
120-
auto field_expr = compute::field_ref(field->name());
121129

122-
// Optimize for corner case where all values are nulls
123-
if (statistics->num_values() == 0 && statistics->null_count() > 0) {
124-
return is_null(std::move(field_expr));
125-
}
126-
127-
std::shared_ptr<Scalar> min, max;
128-
if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
130+
if (statistics == nullptr) {
129131
return std::nullopt;
130132
}
131133

132-
auto maybe_min = min->CastTo(field->type());
133-
auto maybe_max = max->CastTo(field->type());
134-
if (maybe_min.ok() && maybe_max.ok()) {
135-
min = maybe_min.MoveValueUnsafe();
136-
max = maybe_max.MoveValueUnsafe();
137-
138-
if (min->Equals(max)) {
139-
auto single_value = compute::equal(field_expr, compute::literal(std::move(min)));
140-
141-
if (statistics->null_count() == 0) {
142-
return single_value;
143-
}
144-
return compute::or_(std::move(single_value), is_null(std::move(field_expr)));
145-
}
146-
147-
auto lower_bound =
148-
compute::greater_equal(field_expr, compute::literal(std::move(min)));
149-
auto upper_bound = compute::less_equal(field_expr, compute::literal(std::move(max)));
150-
151-
auto in_range = compute::and_(std::move(lower_bound), std::move(upper_bound));
152-
if (statistics->null_count() != 0) {
153-
return compute::or_(std::move(in_range), compute::is_null(field_expr));
154-
}
155-
return in_range;
156-
}
157-
158-
return std::nullopt;
134+
return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
159135
}
160136

161137
void AddColumnIndices(const SchemaField& schema_field,
@@ -306,6 +282,65 @@ Result<bool> IsSupportedParquetFile(const ParquetFileFormat& format,
306282

307283
} // namespace
308284

285+
std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
286+
const Field& field, const parquet::Statistics& statistics) {
287+
auto field_expr = compute::field_ref(field.name());
288+
289+
// Optimize for corner case where all values are nulls
290+
if (statistics.num_values() == 0 && statistics.null_count() > 0) {
291+
return is_null(std::move(field_expr));
292+
}
293+
294+
std::shared_ptr<Scalar> min, max;
295+
if (!StatisticsAsScalars(statistics, &min, &max).ok()) {
296+
return std::nullopt;
297+
}
298+
299+
auto maybe_min = min->CastTo(field.type());
300+
auto maybe_max = max->CastTo(field.type());
301+
302+
if (maybe_min.ok() && maybe_max.ok()) {
303+
min = maybe_min.MoveValueUnsafe();
304+
max = maybe_max.MoveValueUnsafe();
305+
306+
if (min->Equals(max)) {
307+
auto single_value = compute::equal(field_expr, compute::literal(std::move(min)));
308+
309+
if (statistics.null_count() == 0) {
310+
return single_value;
311+
}
312+
return compute::or_(std::move(single_value), is_null(std::move(field_expr)));
313+
}
314+
315+
auto lower_bound = compute::greater_equal(field_expr, compute::literal(min));
316+
auto upper_bound = compute::less_equal(field_expr, compute::literal(max));
317+
compute::Expression in_range;
318+
319+
// Since the minimum & maximum values are NaN, useful statistics
320+
// cannot be extracted for checking the presence of a value within
321+
// range
322+
if (IsNan(*min) && IsNan(*max)) {
323+
return std::nullopt;
324+
}
325+
326+
// If either minimum or maximum is NaN, it should be ignored for the
327+
// range computation
328+
if (IsNan(*min)) {
329+
in_range = std::move(upper_bound);
330+
} else if (IsNan(*max)) {
331+
in_range = std::move(lower_bound);
332+
} else {
333+
in_range = compute::and_(std::move(lower_bound), std::move(upper_bound));
334+
}
335+
336+
if (statistics.null_count() != 0) {
337+
return compute::or_(std::move(in_range), compute::is_null(field_expr));
338+
}
339+
return in_range;
340+
}
341+
return std::nullopt;
342+
}
343+
309344
ParquetFileFormat::ParquetFileFormat()
310345
: FileFormat(std::make_shared<ParquetFragmentScanOptions>()) {}
311346

cpp/src/arrow/dataset/file_parquet.h

+3
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
171171
Result<std::shared_ptr<Fragment>> Subset(compute::Expression predicate);
172172
Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);
173173

174+
static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
175+
const Field& field, const parquet::Statistics& statistics);
176+
174177
private:
175178
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
176179
compute::Expression partition_expression,

cpp/src/arrow/dataset/file_parquet_test.cc

+16
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,14 @@
3333
#include "arrow/testing/util.h"
3434
#include "arrow/type.h"
3535
#include "arrow/type_fwd.h"
36+
#include "arrow/util/io_util.h"
3637
#include "arrow/util/range.h"
3738

3839
#include "parquet/arrow/writer.h"
40+
#include "parquet/file_reader.h"
3941
#include "parquet/metadata.h"
42+
#include "parquet/statistics.h"
43+
#include "parquet/types.h"
4044

4145
namespace arrow {
4246

@@ -678,5 +682,17 @@ INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
678682
::testing::ValuesIn(TestFormatParams::Values()),
679683
TestFormatParams::ToTestNameString);
680684

685+
TEST(TestParquetStatistics, NullMax) {
686+
auto field = ::arrow::field("x", float32());
687+
ASSERT_OK_AND_ASSIGN(std::string dir_string,
688+
arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));
689+
auto reader =
690+
parquet::ParquetFileReader::OpenFile(dir_string + "/nan_in_stats.parquet");
691+
auto statistics = reader->RowGroup(0)->metadata()->ColumnChunk(0)->statistics();
692+
auto stat_expression =
693+
ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
694+
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
695+
}
696+
681697
} // namespace dataset
682698
} // namespace arrow

testing

0 commit comments

Comments
 (0)