diff --git a/c_glib/arrow-dataset-glib/scanner.cpp b/c_glib/arrow-dataset-glib/scanner.cpp
index 68c3e98ea08..5a8bdf21714 100644
--- a/c_glib/arrow-dataset-glib/scanner.cpp
+++ b/c_glib/arrow-dataset-glib/scanner.cpp
@@ -265,7 +265,7 @@ gad_scan_options_class_init(GADScanOptionsClass *klass)
   gobject_class->set_property = gad_scan_options_set_property;
   gobject_class->get_property = gad_scan_options_get_property;

-  auto scan_options = arrow::dataset::ScanOptions::Make(arrow::schema({}));
+  auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
   spec = g_param_spec_pointer("scan-options",
                               "ScanOptions",
@@ -307,7 +307,8 @@ GADScanOptions *
 gad_scan_options_new(GArrowSchema *schema)
 {
   auto arrow_schema = garrow_schema_get_raw(schema);
-  auto arrow_scan_options = arrow::dataset::ScanOptions::Make(arrow_schema);
+  auto arrow_scan_options = std::make_shared<arrow::dataset::ScanOptions>();
+  arrow_scan_options->dataset_schema = arrow_schema;
   return gad_scan_options_new_raw(&arrow_scan_options);
 }

@@ -323,30 +324,10 @@ GArrowSchema *
 gad_scan_options_get_schema(GADScanOptions *scan_options)
 {
   auto priv = GAD_SCAN_OPTIONS_GET_PRIVATE(scan_options);
-  auto arrow_schema = priv->scan_options->schema();
+  auto arrow_schema = priv->scan_options->dataset_schema;
   return garrow_schema_new_raw(&arrow_schema);
 }

-/**
- * gad_scan_options_replace_schema:
- * @scan_options: A #GADScanOptions.
- * @schema: A #GArrowSchema.
- *
- * Returns: (transfer full):
- *   A copy of the #GADScanOptions with the given #GArrowSchema.
- *
- * Since: 1.0.0
- */
-GADScanOptions *
-gad_scan_options_replace_schema(GADScanOptions *scan_options,
-                                GArrowSchema *schema)
-{
-  auto priv = GAD_SCAN_OPTIONS_GET_PRIVATE(scan_options);
-  auto arrow_schema = garrow_schema_get_raw(schema);
-  auto arrow_scan_options_copy = priv->scan_options->ReplaceSchema(arrow_schema);
-  return gad_scan_options_new_raw(&arrow_scan_options_copy);
-}
-
 /* arrow::dataset::ScanTask */

 typedef struct GADScanTaskPrivate_ {
diff --git a/c_glib/arrow-dataset-glib/scanner.h b/c_glib/arrow-dataset-glib/scanner.h
index 75d212b1808..1c533433693 100644
--- a/c_glib/arrow-dataset-glib/scanner.h
+++ b/c_glib/arrow-dataset-glib/scanner.h
@@ -57,9 +57,6 @@ GARROW_AVAILABLE_IN_1_0
 GADScanOptions *gad_scan_options_new(GArrowSchema *schema);
 GARROW_AVAILABLE_IN_1_0
 GArrowSchema *gad_scan_options_get_schema(GADScanOptions *scan_options);
-GARROW_AVAILABLE_IN_1_0
-GADScanOptions *gad_scan_options_replace_schema(GADScanOptions *scan_options,
-                                                GArrowSchema *schema);

 /* arrow::dataset::ScanTask */

diff --git a/c_glib/test/dataset/test-scan-options.rb b/c_glib/test/dataset/test-scan-options.rb
index 1f5b77f2e9f..a8bcd12afde 100644
--- a/c_glib/test/dataset/test-scan-options.rb
+++ b/c_glib/test/dataset/test-scan-options.rb
@@ -34,11 +34,4 @@
   def test_batch_size
     assert_equal(42, @scan_options.batch_size)
   end
-
-  def test_replace_schema
-    other_schema = Arrow::Schema.new([Arrow::Field.new("visible", Arrow::BooleanDataType.new)])
-    other_scan_options = @scan_options.replace_schema(other_schema)
-    assert_not_equal(@schema, other_scan_options.schema)
-    assert_equal(other_schema, other_scan_options.schema)
-  end
 end
diff --git a/cpp/src/arrow/compute/api_scalar.h b/cpp/src/arrow/compute/api_scalar.h
index 37f3077e4bd..0d95092c95b 100644
--- a/cpp/src/arrow/compute/api_scalar.h
+++ b/cpp/src/arrow/compute/api_scalar.h
@@ -115,10 +115,25 @@ struct CompareOptions : public FunctionOptions {
 };

 struct ARROW_EXPORT ProjectOptions : public FunctionOptions {
-  explicit ProjectOptions(std::vector<std::string> n) : field_names(std::move(n)) {}
+  ProjectOptions(std::vector<std::string> n, std::vector<bool> r,
+                 std::vector<std::shared_ptr<const KeyValueMetadata>> m)
+      : field_names(std::move(n)),
+        field_nullability(std::move(r)),
+        field_metadata(std::move(m)) {}
+
+  explicit ProjectOptions(std::vector<std::string> n)
+      : field_names(std::move(n)),
+        field_nullability(field_names.size(), true),
+        field_metadata(field_names.size(), NULLPTR) {}

   /// Names for wrapped columns
   std::vector<std::string> field_names;
+
+  /// Nullability bits for wrapped columns
+  std::vector<bool> field_nullability;
+
+  /// Metadata attached to wrapped columns
+  std::vector<std::shared_ptr<const KeyValueMetadata>> field_metadata;
 };

 /// @}
diff --git a/cpp/src/arrow/compute/kernels/scalar_nested.cc b/cpp/src/arrow/compute/kernels/scalar_nested.cc
index 6e9803caf9f..8a6a69932c0 100644
--- a/cpp/src/arrow/compute/kernels/scalar_nested.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_nested.cc
@@ -63,9 +63,15 @@ const FunctionDoc list_value_length_doc{
 Result<ValueDescr> ProjectResolve(KernelContext* ctx,
                                   const std::vector<ValueDescr>& descrs) {
   const auto& names = OptionsWrapper<ProjectOptions>::Get(ctx).field_names;
-  if (names.size() != descrs.size()) {
-    return Status::Invalid("project() was passed ", names.size(), " field ", "names but ",
-                           descrs.size(), " arguments");
+  const auto& nullable = OptionsWrapper<ProjectOptions>::Get(ctx).field_nullability;
+  const auto& metadata = OptionsWrapper<ProjectOptions>::Get(ctx).field_metadata;
+
+  if (names.size() != descrs.size() || nullable.size() != descrs.size() ||
+      metadata.size() != descrs.size()) {
+    return Status::Invalid("project() was passed ", descrs.size(), " arguments but ",
+                           names.size(), " field names, ", nullable.size(),
+                           " nullability bits, and ", metadata.size(),
+                           " metadata dictionaries.");
   }

   size_t i = 0;
@@ -86,7 +92,7 @@ Result<ValueDescr> ProjectResolve(KernelContext* ctx,
       }
     }

-    fields[i] = field(names[i], descr.type);
+    fields[i] = field(names[i], descr.type, nullable[i], metadata[i]);
     ++i;
   }

@@ -96,6 +102,16 @@ Result<ValueDescr> ProjectResolve(KernelContext* ctx,
 void ProjectExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
   KERNEL_ASSIGN_OR_RAISE(auto descr, ctx, ProjectResolve(ctx, batch.GetDescriptors()));

+  for (int i = 0; i < batch.num_values(); ++i) {
+    const auto& field = checked_cast<const StructType&>(*descr.type).field(i);
+    if (batch[i].null_count() > 0 && !field->nullable()) {
+      ctx->SetStatus(Status::Invalid("Output field ", field, " (#", i,
+                                     ") does not allow nulls but the corresponding "
+                                     "argument was not entirely valid."));
+      return;
+    }
+  }
+
   if (descr.shape == ValueDescr::SCALAR) {
     ScalarVector scalars(batch.num_values());
     for (int i = 0; i < batch.num_values(); ++i) {
diff --git a/cpp/src/arrow/compute/kernels/scalar_nested_test.cc b/cpp/src/arrow/compute/kernels/scalar_nested_test.cc
index 14363f5d0d1..42de9bcdb50 100644
--- a/cpp/src/arrow/compute/kernels/scalar_nested_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_nested_test.cc
@@ -22,6 +22,7 @@
 #include "arrow/compute/kernels/test_util.h"
 #include "arrow/result.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/util/key_value_metadata.h"

 namespace arrow {
 namespace compute {
@@ -38,51 +39,71 @@ TEST(TestScalarNested, ListValueLength) {
 }

 struct {
- public:
-  Result<Datum> operator()(std::vector<Datum> args) {
-    ProjectOptions opts{field_names};
+  template <typename... Options>
+  Result<Datum> operator()(std::vector<Datum> args, std::vector<std::string> field_names,
+                           Options... options) {
+    ProjectOptions opts{field_names, options...};
     return CallFunction("project", args, &opts);
   }
-
-  std::vector<std::string> field_names;
 } Project;

 TEST(Project, Scalar) {
-  std::shared_ptr<StructScalar> expected(new StructScalar{{}, struct_({})});
-  ASSERT_OK_AND_EQ(Datum(expected), Project({}));
-
   auto i32 = MakeScalar(1);
   auto f64 = MakeScalar(2.5);
   auto str = MakeScalar("yo");

-  expected.reset(new StructScalar{
-      {i32, f64, str},
-      struct_({field("i", i32->type), field("f", f64->type), field("s", str->type)})});
-  Project.field_names = {"i", "f", "s"};
-  ASSERT_OK_AND_EQ(Datum(expected), Project({i32, f64, str}));
+  ASSERT_OK_AND_ASSIGN(auto expected,
+                       StructScalar::Make({i32, f64, str}, {"i", "f", "s"}));
+  ASSERT_OK_AND_EQ(Datum(expected), Project({i32, f64, str}, {"i", "f", "s"}));

   // Three field names but one input value
-  ASSERT_RAISES(Invalid, Project({str}));
+  ASSERT_RAISES(Invalid, Project({str}, {"i", "f", "s"}));
+
+  // No field names or input values is fine
+  expected.reset(new StructScalar{{}, struct_({})});
+  ASSERT_OK_AND_EQ(Datum(expected), Project(/*args=*/{}, /*field_names=*/{}));
 }

 TEST(Project, Array) {
-  Project.field_names = {"i", "s"};
+  std::vector<std::string> field_names{"i", "s"};
+
   auto i32 = ArrayFromJSON(int32(), "[42, 13, 7]");
   auto str = ArrayFromJSON(utf8(), R"(["aa", "aa", "aa"])");

-  ASSERT_OK_AND_ASSIGN(Datum expected,
-                       StructArray::Make({i32, str}, Project.field_names));
+  ASSERT_OK_AND_ASSIGN(Datum expected, StructArray::Make({i32, str}, field_names));

-  ASSERT_OK_AND_EQ(expected, Project({i32, str}));
+  ASSERT_OK_AND_EQ(expected, Project({i32, str}, field_names));

   // Scalars are broadcast to the length of the arrays
-  ASSERT_OK_AND_EQ(expected, Project({i32, MakeScalar("aa")}));
+  ASSERT_OK_AND_EQ(expected, Project({i32, MakeScalar("aa")}, field_names));

   // Array length mismatch
-  ASSERT_RAISES(Invalid, Project({i32->Slice(1), str}));
+  ASSERT_RAISES(Invalid, Project({i32->Slice(1), str}, field_names));
+}
+
+TEST(Project, NullableMetadataPassedThru) {
+  auto i32 = ArrayFromJSON(int32(), "[42, 13, 7]");
+  auto str = ArrayFromJSON(utf8(), R"(["aa", "aa", "aa"])");
+
+  std::vector<std::string> field_names{"i", "s"};
+  std::vector<bool> nullability{true, false};
+  std::vector<std::shared_ptr<const KeyValueMetadata>> metadata = {
+      key_value_metadata({"a", "b"}, {"ALPHA", "BRAVO"}), nullptr};
+
+  ASSERT_OK_AND_ASSIGN(auto proj,
+                       Project({i32, str}, field_names, nullability, metadata));
+
+  AssertTypeEqual(*proj.type(), StructType({
+                                    field("i", int32(), /*nullable=*/true, metadata[0]),
+                                    field("s", utf8(), /*nullable=*/false, nullptr),
+                                }));
+
+  // error: projecting an array containing nulls with nullable=false
+  str = ArrayFromJSON(utf8(), R"(["aa", null, "aa"])");
+  ASSERT_RAISES(Invalid, Project({i32, str}, field_names, nullability, metadata));
 }

 TEST(Project, ChunkedArray) {
-  Project.field_names = {"i", "s"};
+  std::vector<std::string> field_names{"i", "s"};

   auto i32_0 = ArrayFromJSON(int32(), "[42, 13, 7]");
   auto i32_1 = ArrayFromJSON(int32(), "[]");
@@ -95,26 +116,23 @@ TEST(Project, ChunkedArray) {
   ASSERT_OK_AND_ASSIGN(auto i32, ChunkedArray::Make({i32_0, i32_1, i32_2}));
   ASSERT_OK_AND_ASSIGN(auto str, ChunkedArray::Make({str_0, str_1, str_2}));

-  ASSERT_OK_AND_ASSIGN(auto expected_0,
-                       StructArray::Make({i32_0, str_0}, Project.field_names));
-  ASSERT_OK_AND_ASSIGN(auto expected_1,
-                       StructArray::Make({i32_1, str_1}, Project.field_names));
-  ASSERT_OK_AND_ASSIGN(auto expected_2,
-                       StructArray::Make({i32_2, str_2}, Project.field_names));
+  ASSERT_OK_AND_ASSIGN(auto expected_0, StructArray::Make({i32_0, str_0}, field_names));
+
ASSERT_OK_AND_ASSIGN(auto expected_1, StructArray::Make({i32_1, str_1}, field_names)); + ASSERT_OK_AND_ASSIGN(auto expected_2, StructArray::Make({i32_2, str_2}, field_names)); ASSERT_OK_AND_ASSIGN(Datum expected, ChunkedArray::Make({expected_0, expected_1, expected_2})); - ASSERT_OK_AND_EQ(expected, Project({i32, str})); + ASSERT_OK_AND_EQ(expected, Project({i32, str}, field_names)); // Scalars are broadcast to the length of the arrays - ASSERT_OK_AND_EQ(expected, Project({i32, MakeScalar("aa")})); + ASSERT_OK_AND_EQ(expected, Project({i32, MakeScalar("aa")}, field_names)); // Array length mismatch - ASSERT_RAISES(Invalid, Project({i32->Slice(1), str})); + ASSERT_RAISES(Invalid, Project({i32->Slice(1), str}, field_names)); } TEST(Project, ChunkedArrayDifferentChunking) { - Project.field_names = {"i", "s"}; + std::vector field_names{"i", "s"}; auto i32_0 = ArrayFromJSON(int32(), "[42, 13, 7]"); auto i32_1 = ArrayFromJSON(int32(), "[]"); @@ -136,18 +154,18 @@ TEST(Project, ChunkedArrayDifferentChunking) { for (size_t i = 0; i < expected_chunks.size(); ++i) { ASSERT_OK_AND_ASSIGN(expected_chunks[i], StructArray::Make({expected_rechunked[0][i], expected_rechunked[1][i]}, - Project.field_names)); + field_names)); } ASSERT_OK_AND_ASSIGN(Datum expected, ChunkedArray::Make(expected_chunks)); - ASSERT_OK_AND_EQ(expected, Project({i32, str})); + ASSERT_OK_AND_EQ(expected, Project({i32, str}, field_names)); // Scalars are broadcast to the length of the arrays - ASSERT_OK_AND_EQ(expected, Project({i32, MakeScalar("aa")})); + ASSERT_OK_AND_EQ(expected, Project({i32, MakeScalar("aa")}, field_names)); // Array length mismatch - ASSERT_RAISES(Invalid, Project({i32->Slice(1), str})); + ASSERT_RAISES(Invalid, Project({i32->Slice(1), str}, field_names)); } } // namespace compute diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index 82a5a63c2c2..c57b90e7753 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -244,109 +244,6 @@ TEST(TestProjector, CheckProjectable) { "fields had matching names but differing types"); } -TEST(TestProjector, MismatchedType) { - constexpr int64_t kBatchSize = 1024; - - auto from_schema = schema({field("f64", float64())}); - auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, from_schema); - - auto to_schema = schema({field("f64", int32())}); - RecordBatchProjector projector(to_schema); - - auto result = projector.Project(*batch); - ASSERT_RAISES(TypeError, result.status()); -} - -TEST(TestProjector, AugmentWithNull) { - constexpr int64_t kBatchSize = 1024; - - auto from_schema = - schema({field("f64", float64()), field("b", boolean()), field("str", null())}); - auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, from_schema); - auto to_schema = - schema({field("i32", int32()), field("f64", float64()), field("str", utf8())}); - - RecordBatchProjector projector(to_schema); - - ASSERT_OK_AND_ASSIGN(auto null_i32, MakeArrayOfNull(int32(), batch->num_rows())); - ASSERT_OK_AND_ASSIGN(auto null_str, MakeArrayOfNull(utf8(), batch->num_rows())); - auto expected_batch = RecordBatch::Make(to_schema, batch->num_rows(), - {null_i32, batch->column(0), null_str}); - - ASSERT_OK_AND_ASSIGN(auto reconciled_batch, projector.Project(*batch)); - AssertBatchesEqual(*expected_batch, *reconciled_batch); -} - -TEST(TestProjector, AugmentWithScalar) { - static constexpr int64_t kBatchSize = 1024; - static constexpr int32_t kScalarValue = 3; - - auto from_schema = schema({field("f64", float64()), field("b", 
boolean())}); - auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, from_schema); - auto to_schema = schema({field("i32", int32()), field("f64", float64())}); - - auto scalar_i32 = std::make_shared(kScalarValue); - - RecordBatchProjector projector(to_schema); - ASSERT_OK(projector.SetDefaultValue(to_schema->GetFieldIndex("i32"), scalar_i32)); - - ASSERT_OK_AND_ASSIGN(auto array_i32, - ArrayFromBuilderVisitor(int32(), kBatchSize, [](Int32Builder* b) { - b->UnsafeAppend(kScalarValue); - })); - - auto expected_batch = - RecordBatch::Make(to_schema, batch->num_rows(), {array_i32, batch->column(0)}); - - ASSERT_OK_AND_ASSIGN(auto reconciled_batch, projector.Project(*batch)); - AssertBatchesEqual(*expected_batch, *reconciled_batch); -} - -TEST(TestProjector, NonTrivial) { - static constexpr int64_t kBatchSize = 1024; - - static constexpr float kScalarValue = 3.14f; - - auto from_schema = - schema({field("i8", int8()), field("u8", uint8()), field("i16", int16()), - field("u16", uint16()), field("i32", int32()), field("u32", uint32())}); - - auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, from_schema); - - auto to_schema = - schema({field("i32", int32()), field("f64", float64()), field("u16", uint16()), - field("u8", uint8()), field("b", boolean()), field("u32", uint32()), - field("f32", float32())}); - - auto scalar_f32 = std::make_shared(kScalarValue); - auto scalar_f64 = std::make_shared(kScalarValue); - - RecordBatchProjector projector(to_schema); - ASSERT_OK(projector.SetDefaultValue(to_schema->GetFieldIndex("f64"), scalar_f64)); - ASSERT_OK(projector.SetDefaultValue(to_schema->GetFieldIndex("f32"), scalar_f32)); - - ASSERT_OK_AND_ASSIGN( - auto array_f32, ArrayFromBuilderVisitor(float32(), kBatchSize, [](FloatBuilder* b) { - b->UnsafeAppend(kScalarValue); - })); - ASSERT_OK_AND_ASSIGN(auto array_f64, ArrayFromBuilderVisitor( - float64(), kBatchSize, [](DoubleBuilder* b) { - b->UnsafeAppend(kScalarValue); - })); - ASSERT_OK_AND_ASSIGN( - auto null_b, ArrayFromBuilderVisitor(boolean(), kBatchSize, [](BooleanBuilder* b) { - b->UnsafeAppendNull(); - })); - - auto expected_batch = RecordBatch::Make( - to_schema, batch->num_rows(), - {batch->GetColumnByName("i32"), array_f64, batch->GetColumnByName("u16"), - batch->GetColumnByName("u8"), null_b, batch->GetColumnByName("u32"), array_f32}); - - ASSERT_OK_AND_ASSIGN(auto reconciled_batch, projector.Project(*batch)); - AssertBatchesEqual(*expected_batch, *reconciled_batch); -} - class TestEndToEnd : public TestUnionDataset { void SetUp() override { bool nullable = false; @@ -716,6 +613,27 @@ TEST_F(TestSchemaUnification, SelectPhysicalColumnsFilterPartitionColumn) { AssertBuilderEquals(scan_builder, rows); } +TEST_F(TestSchemaUnification, SelectSyntheticColumn) { + // Select only a synthetic column + ASSERT_OK_AND_ASSIGN(auto scan_builder, dataset_->NewScan()); + ASSERT_OK(scan_builder->Project( + {call("add", {field_ref("phy_1"), field_ref("part_df")})}, {"phy_1 + part_df"})); + + ASSERT_OK_AND_ASSIGN(auto scanner, scan_builder->Finish()); + AssertSchemaEqual(Schema({field("phy_1 + part_df", int32())}), + *scanner->options()->projected_schema); + + using TupleType = std::tuple; + std::vector rows = { + TupleType(111 + 1), + TupleType(nullopt), + TupleType(nullopt), + TupleType(nullopt), + }; + + AssertBuilderEquals(scan_builder, rows); +} + TEST_F(TestSchemaUnification, SelectPartitionColumns) { // Selects partition (virtual) columns, it ensures: // @@ -794,7 +712,7 @@ TEST(TestDictPartitionColumn, 
SelectPartitionColumnFilterPhysicalColumn) {
   ASSERT_OK_AND_ASSIGN(auto scanner, scan_builder->Finish());
   ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());
   AssertArraysEqual(*table->column(0)->chunk(0),
-                    *DictArrayFromJSON(partition_field->type(), "[0]", "[\"one\"]"));
+                    *ArrayFromJSON(partition_field->type(), R"(["one"])"));
 }

 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc
index a951e827fa4..a51b3c09971 100644
--- a/cpp/src/arrow/dataset/discovery_test.cc
+++ b/cpp/src/arrow/dataset/discovery_test.cc
@@ -136,7 +136,9 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
     if (schema == nullptr) {
       ASSERT_OK_AND_ASSIGN(schema, factory_->Inspect(options));
     }
-    options_ = ScanOptions::Make(schema);
+    options_ = std::make_shared<ScanOptions>();
+    options_->dataset_schema = schema;
+    ASSERT_OK(SetProjection(options_.get(), schema->field_names()));
     ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
     ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset_->GetFragments());
     AssertFragmentsAreFromPath(std::move(fragment_it), paths);
diff --git a/cpp/src/arrow/dataset/expression.cc b/cpp/src/arrow/dataset/expression.cc
index 5ddb270451a..6e71aa17e74 100644
--- a/cpp/src/arrow/dataset/expression.cc
+++ b/cpp/src/arrow/dataset/expression.cc
@@ -752,22 +752,27 @@ Result<Expression> ReplaceFieldsWithKnownValues(
       auto it = known_values.find(*ref);
       if (it != known_values.end()) {
         Datum lit = it->second;
-        if (expr.type()->id() == Type::DICTIONARY) {
-          if (lit.is_scalar()) {
-            // FIXME the "right" way to support this is adding support for scalars to
-            // dictionary_encode and support for casting between index types to cast
-            ARROW_ASSIGN_OR_RAISE(
-                auto index,
-                Int32Scalar(0).CastTo(
-                    checked_cast<const DictionaryType&>(*expr.type()).index_type()));
+        if (lit.descr() == expr.descr()) return literal(std::move(lit));
+        // type mismatch, try casting the known value to the correct type
+
+        if (expr.type()->id() == Type::DICTIONARY &&
+            lit.type()->id() != Type::DICTIONARY) {
+          // the known value must be dictionary encoded
+
+          const auto& dict_type = checked_cast<const DictionaryType&>(*expr.type());
+          if (!lit.type()->Equals(dict_type.value_type())) {
+            ARROW_ASSIGN_OR_RAISE(lit, compute::Cast(lit, dict_type.value_type()));
+          }
+
          if (lit.is_scalar()) {
             ARROW_ASSIGN_OR_RAISE(auto dictionary,
                                   MakeArrayFromScalar(*lit.scalar(), 1));
-            return literal(
-                DictionaryScalar::Make(std::move(index), std::move(dictionary)));
+            lit = Datum{DictionaryScalar::Make(MakeScalar<int32_t>(0),
+                                               std::move(dictionary))};
           }
         }

+        ARROW_ASSIGN_OR_RAISE(lit, compute::Cast(lit, expr.type()));
         return literal(std::move(lit));
       }
diff --git a/cpp/src/arrow/dataset/expression_test.cc b/cpp/src/arrow/dataset/expression_test.cc
index c837c5be893..2ab796b052f 100644
--- a/cpp/src/arrow/dataset/expression_test.cc
+++ b/cpp/src/arrow/dataset/expression_test.cc
@@ -455,7 +455,7 @@ TEST(Expression, BindNestedCall) {
 }

 TEST(Expression, ExecuteFieldRef) {
-  auto AssertRefIs = [](FieldRef ref, Datum in, Datum expected) {
+  auto ExpectRefIs = [](FieldRef ref, Datum in, Datum expected) {
     auto expr = field_ref(ref);
     ASSERT_OK_AND_ASSIGN(expr, expr.Bind(in.descr()));

@@ -464,7 +464,7 @@ TEST(Expression, ExecuteFieldRef) {
     AssertDatumsEqual(actual, expected, /*verbose=*/true);
   };

-  AssertRefIs("a", ArrayFromJSON(struct_({field("a", float64())}), R"([
+  ExpectRefIs("a", ArrayFromJSON(struct_({field("a", float64())}), R"([
     {"a": 6.125},
     {"a": 0.0},
     {"a": -1}
  ])"),
              ArrayFromJSON(float64(), R"([6.125,
0.0, -1])")); // more nested: - AssertRefIs(FieldRef{"a", "a"}, + ExpectRefIs(FieldRef{"a", "a"}, ArrayFromJSON(struct_({field("a", struct_({field("a", float64())}))}), R"([ {"a": {"a": 6.125}}, {"a": {"a": 0.0}}, @@ -481,7 +481,7 @@ TEST(Expression, ExecuteFieldRef) { ArrayFromJSON(float64(), R"([6.125, 0.0, -1])")); // absent fields are resolved as a null scalar: - AssertRefIs(FieldRef{"b"}, ArrayFromJSON(struct_({field("a", float64())}), R"([ + ExpectRefIs(FieldRef{"b"}, ArrayFromJSON(struct_({field("a", float64())}), R"([ {"a": 6.125}, {"a": 0.0}, {"a": -1} @@ -520,7 +520,7 @@ Result NaiveExecuteScalarExpression(const Expression& expr, const Datum& return function->Execute(arguments, call->options.get(), &exec_context); } -void AssertExecute(Expression expr, Datum in, Datum* actual_out = NULLPTR) { +void ExpectExecute(Expression expr, Datum in, Datum* actual_out = NULLPTR) { if (in.is_value()) { ASSERT_OK_AND_ASSIGN(expr, expr.Bind(in.descr())); } else { @@ -539,14 +539,14 @@ void AssertExecute(Expression expr, Datum in, Datum* actual_out = NULLPTR) { } TEST(Expression, ExecuteCall) { - AssertExecute(call("add", {field_ref("a"), literal(3.5)}), + ExpectExecute(call("add", {field_ref("a"), literal(3.5)}), ArrayFromJSON(struct_({field("a", float64())}), R"([ {"a": 6.125}, {"a": 0.0}, {"a": -1} ])")); - AssertExecute( + ExpectExecute( call("add", {field_ref("a"), call("subtract", {literal(3.5), field_ref("b")})}), ArrayFromJSON(struct_({field("a", float64()), field("b", float64())}), R"([ {"a": 6.125, "b": 3.375}, @@ -554,7 +554,7 @@ TEST(Expression, ExecuteCall) { {"a": -1, "b": 4.75} ])")); - AssertExecute(call("strptime", {field_ref("a")}, + ExpectExecute(call("strptime", {field_ref("a")}, compute::StrptimeOptions("%m/%d/%Y", TimeUnit::MICRO)), ArrayFromJSON(struct_({field("a", utf8())}), R"([ {"a": "5/1/2020"}, @@ -562,7 +562,7 @@ TEST(Expression, ExecuteCall) { {"a": "12/11/1900"} ])")); - AssertExecute(project({call("add", {field_ref("a"), literal(3.5)})}, {"a + 3.5"}), + ExpectExecute(project({call("add", {field_ref("a"), literal(3.5)})}, {"a + 3.5"}), ArrayFromJSON(struct_({field("a", float64())}), R"([ {"a": 6.125}, {"a": 0.0}, @@ -571,7 +571,7 @@ TEST(Expression, ExecuteCall) { } TEST(Expression, ExecuteDictionaryTransparent) { - AssertExecute( + ExpectExecute( equal(field_ref("a"), field_ref("b")), ArrayFromJSON( struct_({field("a", dictionary(int32(), utf8())), field("b", utf8())}), R"([ @@ -579,6 +579,30 @@ TEST(Expression, ExecuteDictionaryTransparent) { {"a": "", "b": ""}, {"a": "hi", "b": "hello"} ])")); + + ASSERT_OK_AND_ASSIGN( + auto expr, project({field_ref("i32"), field_ref("dict_str")}, {"i32", "dict_str"}) + .Bind(*kBoringSchema)); + + ASSERT_OK_AND_ASSIGN( + expr, SimplifyWithGuarantee(expr, equal(field_ref("dict_str"), literal("eh")))); + + ASSERT_OK_AND_ASSIGN( + auto res, + ExecuteScalarExpression(expr, ArrayFromJSON(struct_({field("i32", int32())}), R"([ + {"i32": 0}, + {"i32": 1}, + {"i32": 2} + ])"))); + + AssertDatumsEqual( + res, ArrayFromJSON(struct_({field("i32", int32()), + field("dict_str", dictionary(int32(), utf8()))}), + R"([ + {"i32": 0, "dict_str": "eh"}, + {"i32": 1, "dict_str": "eh"}, + {"i32": 2, "dict_str": "eh"} + ])")); } void ExpectIdenticalIfUnchanged(Expression modified, Expression original) { @@ -756,6 +780,10 @@ TEST(Expression, ReplaceFieldsWithKnownValues) { ExpectReplacesTo(equal(field_ref("i32"), literal(1)), i32_is_3, equal(literal(3), literal(1))); + Datum dict_str{ + DictionaryScalar::Make(MakeScalar(0), ArrayFromJSON(utf8(), 
R"(["3"])"))}; + ExpectReplacesTo(field_ref("dict_str"), {{"dict_str", dict_str}}, literal(dict_str)); + ExpectReplacesTo(call("add", { call("subtract", @@ -787,6 +815,18 @@ TEST(Expression, ReplaceFieldsWithKnownValues) { ExpectReplacesTo(is_valid(field_ref("str")), i32_valid_str_null, is_valid(null_literal(utf8()))); + + ASSERT_OK_AND_ASSIGN(auto expr, field_ref("dict_str").Bind(*kBoringSchema)); + Datum dict_i32{ + DictionaryScalar::Make(MakeScalar(0), ArrayFromJSON(int32(), R"([3])"))}; + // Unsupported cast dictionary(int32(), int32()) -> dictionary(int32(), utf8()) + ASSERT_RAISES(NotImplemented, + ReplaceFieldsWithKnownValues({{"dict_str", dict_i32}}, expr)); + // Unsupported cast dictionary(int8(), utf8()) -> dictionary(int32(), utf8()) + dict_str = Datum{ + DictionaryScalar::Make(MakeScalar(0), ArrayFromJSON(utf8(), R"(["a"])"))}; + ASSERT_RAISES(NotImplemented, + ReplaceFieldsWithKnownValues({{"dict_str", dict_str}}, expr)); } struct { @@ -1075,8 +1115,8 @@ TEST(Expression, SimplifyThenExecute) { ])"); Datum evaluated, simplified_evaluated; - AssertExecute(filter, input, &evaluated); - AssertExecute(simplified, input, &simplified_evaluated); + ExpectExecute(filter, input, &evaluated); + ExpectExecute(simplified, input, &simplified_evaluated); AssertDatumsEqual(evaluated, simplified_evaluated, /*verbose=*/true); } @@ -1173,5 +1213,70 @@ TEST(Expression, SerializationRoundTrips) { equal(field_ref("beta"), literal(3.25f))})); } +TEST(Projection, AugmentWithNull) { + // NB: input contains *no columns* except i32 + auto input = ArrayFromJSON(struct_({kBoringSchema->GetFieldByName("i32")}), + R"([{"i32": 0}, {"i32": 1}, {"i32": 2}])"); + + auto ExpectProject = [&](Expression proj, Datum expected) { + ASSERT_OK_AND_ASSIGN(proj, proj.Bind(*kBoringSchema)); + ASSERT_OK_AND_ASSIGN(auto actual, ExecuteScalarExpression(proj, input)); + AssertDatumsEqual(Datum(expected), actual); + }; + + ExpectProject(project({field_ref("f64"), field_ref("i32")}, + {"projected double", "projected int"}), + // "projected double" is materialized as a column of nulls + ArrayFromJSON(struct_({field("projected double", float64()), + field("projected int", int32())}), + R"([ + [null, 0], + [null, 1], + [null, 2] + ])")); + + ExpectProject( + project({field_ref("f64")}, {"projected double"}), + // NB: only a scalar was projected, this is *not* automatically broadcast + // to an array. 
"projected double" is materialized as a null scalar + Datum(*StructScalar::Make({MakeNullScalar(float64())}, {"projected double"}))); +} + +TEST(Projection, AugmentWithKnownValues) { + auto input = ArrayFromJSON(struct_({kBoringSchema->GetFieldByName("i32")}), + R"([{"i32": 0}, {"i32": 1}, {"i32": 2}])"); + + auto ExpectSimplifyAndProject = [&](Expression proj, Datum expected, + Expression guarantee) { + ASSERT_OK_AND_ASSIGN(proj, proj.Bind(*kBoringSchema)); + ASSERT_OK_AND_ASSIGN(proj, SimplifyWithGuarantee(proj, guarantee)); + ASSERT_OK_AND_ASSIGN(auto actual, ExecuteScalarExpression(proj, input)); + AssertDatumsEqual(Datum(expected), actual); + }; + + ExpectSimplifyAndProject( + project({field_ref("str"), field_ref("f64"), field_ref("i64"), field_ref("i32")}, + {"str", "f64", "i64", "i32"}), + ArrayFromJSON(struct_({ + field("str", utf8()), + field("f64", float64()), + field("i64", int64()), + field("i32", int32()), + }), + // str is explicitly null + // f64 is explicitly 3.5 + // i64 is not specified in the guarantee and implicitly null + // i32 is present in the input and passed through + R"([ + {"str": null, "f64": 3.5, "i64": null, "i32": 0}, + {"str": null, "f64": 3.5, "i64": null, "i32": 1}, + {"str": null, "f64": 3.5, "i64": null, "i32": 2} + ])"), + and_({ + equal(field_ref("f64"), literal(3.5)), + is_null(field_ref("str")), + })); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 534c4704cb9..65fc2927fd5 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -83,25 +83,12 @@ static inline Result GetConvertOptions( auto convert_options = csv::ConvertOptions::Defaults(); - for (const auto& field : scan_options->schema()->fields()) { + for (FieldRef ref : scan_options->MaterializedFields()) { + ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOne(*scan_options->dataset_schema)); + if (column_names.find(field->name()) == column_names.end()) continue; convert_options.column_types[field->name()] = field->type(); - convert_options.include_columns.push_back(field->name()); - } - - // FIXME(bkietz) also acquire types of fields materialized but not projected. - // (This will require that scan_options include the full dataset schema, not just - // the projected schema). - for (const FieldRef& ref : FieldsInExpression(scan_options->filter)) { - DCHECK(ref.name()); - ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*scan_options->schema())); - - if (match.empty()) { - // a field was filtered but not in the projected schema; be sure it is included - convert_options.include_columns.push_back(*ref.name()); - } } - return convert_options; } @@ -126,11 +113,11 @@ static inline Result> OpenReader( const auto& parse_options = format.parse_options; - ARROW_ASSIGN_OR_RAISE( - auto convert_options, - scan_options == nullptr - ? 
ToResult(csv::ConvertOptions::Defaults()) - : GetConvertOptions(format, scan_options, *first_block, pool)); + auto convert_options = csv::ConvertOptions::Defaults(); + if (scan_options != nullptr) { + ARROW_ASSIGN_OR_RAISE(convert_options, + GetConvertOptions(format, scan_options, *first_block, pool)); + } auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options, parse_options, convert_options); diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index eb0e1bf9395..5c27f81b094 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -36,14 +36,6 @@ namespace dataset { class TestCsvFileFormat : public testing::Test { public: - std::unique_ptr GetFileSource() { - return GetFileSource(R"(f64 -1.0 - -N/A -2)"); - } - std::unique_ptr GetFileSource(std::string csv) { return internal::make_unique(Buffer::FromString(std::move(csv))); } @@ -59,17 +51,24 @@ N/A return Batches(std::move(scan_task_it)); } - protected: + void SetSchema(std::vector> fields) { + opts_ = std::make_shared(); + opts_->dataset_schema = schema(std::move(fields)); + ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names())); + } + std::shared_ptr format_ = std::make_shared(); std::shared_ptr opts_; std::shared_ptr ctx_ = std::make_shared(); - std::shared_ptr schema_ = schema({field("f64", float64())}); }; TEST_F(TestCsvFileFormat, ScanRecordBatchReader) { - auto source = GetFileSource(); + auto source = GetFileSource(R"(f64 +1.0 - opts_ = ScanOptions::Make(schema_); +N/A +2)"); + SetSchema({field("f64", float64())}); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -83,16 +82,23 @@ TEST_F(TestCsvFileFormat, ScanRecordBatchReader) { } TEST_F(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) { - auto source = GetFileSource(); + auto source = GetFileSource(R"(f64 +1.0 - opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())})); +N/A +2)"); + // NB: dataset_schema includes a column not present in the file + SetSchema({field("f64", float64()), field("virtual", int32())}); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema()); + AssertSchemaEqual(Schema({field("f64", float64())}), *physical_schema); + int64_t row_count = 0; for (auto maybe_batch : Batches(fragment.get())) { ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); - AssertSchemaEqual(*batch->schema(), *schema_); + AssertSchemaEqual(*batch->schema(), *physical_schema); row_count += batch->num_rows(); } @@ -112,10 +118,13 @@ TEST_F(TestCsvFileFormat, OpenFailureWithRelevantError) { } TEST_F(TestCsvFileFormat, Inspect) { - auto source = GetFileSource(); + auto source = GetFileSource(R"(f64 +1.0 +N/A +2)"); ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get())); - EXPECT_EQ(*actual, *schema_); + EXPECT_EQ(*actual, Schema({field("f64", float64())})); } TEST_F(TestCsvFileFormat, IsSupported) { @@ -130,24 +139,40 @@ TEST_F(TestCsvFileFormat, IsSupported) { ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); ASSERT_EQ(supported, false); - source = GetFileSource(); + source = GetFileSource(R"(f64 +1.0 + +N/A +2)"); ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); EXPECT_EQ(supported, true); } -TEST_F(TestCsvFileFormat, DISABLED_NonMaterializedFieldWithDifferingTypeFromInferred) { - auto source = GetFileSource(R"(f64,str +TEST_F(TestCsvFileFormat, 
NonProjectedFieldWithDifferingTypeFromInferred) { + auto source = GetFileSource(R"(betrayal_not_really_f64,str 1.0,foo , N/A,bar 2,baz)"); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema()); + AssertSchemaEqual( + Schema({field("betrayal_not_really_f64", float64()), field("str", utf8())}), + *physical_schema); + + // CSV is a text format, so it is valid to read column betrayal_not_really_f64 as string + // rather than double + auto not_float64 = utf8(); + auto dataset_schema = + schema({field("betrayal_not_really_f64", not_float64), field("str", utf8())}); + + ScannerBuilder builder(dataset_schema, fragment, ctx_); + + // This filter is valid with declared schema, but would *not* be valid + // if betrayal_not_really_f64 were read as double rather than string. + ASSERT_OK( + builder.Filter(equal(field_ref("betrayal_not_really_f64"), field_ref("str")))); - // a valid schema for source: - schema_ = schema({field("f64", utf8()), field("str", utf8())}); - ScannerBuilder builder(schema_, fragment, ctx_); - // filter expression validated against declared schema - ASSERT_OK(builder.Filter(equal(field_ref("f64"), field_ref("str")))); // project only "str" ASSERT_OK(builder.Project({"str"})); ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); @@ -157,9 +182,10 @@ N/A,bar ASSERT_OK_AND_ASSIGN(auto scan_task, maybe_scan_task); ASSERT_OK_AND_ASSIGN(auto batch_it, scan_task->Execute()); for (auto maybe_batch : batch_it) { - // ERROR: "f64" is not projected and reverts to inferred type, - // breaking the comparison expression ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + // Run through the scan checking for errors to ensure that "f64" is read with the + // specified type and does not revert to the inferred type (if it reverts to + // inferring float64 then evaluation of the comparison expression should break) } } } diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index b8428e0a98e..e5347fa0cda 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -25,6 +25,7 @@ #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/partition.h" +#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" #include "arrow/ipc/reader.h" @@ -84,9 +85,8 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin { } std::unique_ptr GetRecordBatchReader( - std::shared_ptr schema = nullptr) { - return MakeGeneratedRecordBatch(schema ? 
schema : schema_, kBatchSize, - kBatchRepetitions); + std::shared_ptr schema) { + return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions); } Result> GetFileSink() { @@ -106,18 +106,23 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin { return Batches(std::move(scan_task_it)); } + void SetSchema(std::vector> fields) { + opts_ = std::make_shared(); + opts_->dataset_schema = schema(std::move(fields)); + ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names())); + } + protected: std::shared_ptr format_ = std::make_shared(); std::shared_ptr opts_; std::shared_ptr ctx_ = std::make_shared(); - std::shared_ptr schema_ = schema({field("f64", float64())}); }; TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -131,17 +136,21 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { } TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(schema({schema_->field(0), field("virtual", int32())})); + // NB: dataset_schema includes a column not present in the file + SetSchema({reader->schema()->field(0), field("virtual", int32())}); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema()); + AssertSchemaEqual(Schema({field("f64", float64())}), *physical_schema); + int64_t row_count = 0; for (auto maybe_batch : Batches(fragment.get())) { ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); - AssertSchemaEqual(*batch->schema(), *schema_); + AssertSchemaEqual(*batch->schema(), *physical_schema); row_count += batch->num_rows(); } @@ -149,17 +158,17 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) { } TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { - std::shared_ptr reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); - reader = GetRecordBatchReader(); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); auto options = format_->DefaultWriteOptions(); EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options)); - ASSERT_OK(writer->Write(reader.get())); + + ASSERT_OK(writer->Write(GetRecordBatchReader(schema({field("f64", float64())})).get())); ASSERT_OK(writer->Finish()); EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); @@ -168,11 +177,10 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { } TEST_F(TestIpcFileFormat, WriteRecordBatchReaderCustomOptions) { - std::shared_ptr reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); - reader = GetRecordBatchReader(); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); @@ -185,7 +193,7 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReaderCustomOptions) { ipc_options->metadata = key_value_metadata({{"hello", "world"}}); 
EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), ipc_options)); - ASSERT_OK(writer->Write(reader.get())); + ASSERT_OK(writer->Write(GetRecordBatchReader(schema({field("f64", float64())})).get())); ASSERT_OK(writer->Finish()); EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); @@ -249,19 +257,22 @@ TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) { result.status()); } -TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) { - schema_ = schema({field("f64", float64()), field("i64", int64()), - field("f32", float32()), field("i32", int32())}); +static auto f32 = field("f32", float32()); +static auto f64 = field("f64", float64()); +static auto i32 = field("i32", int32()); +static auto i64 = field("i64", int64()); - opts_ = ScanOptions::Make(schema_); - opts_->projector = RecordBatchProjector(SchemaFromColumnNames(schema_, {"f64"})); +TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) { + SetSchema({f64, i64, f32, i32}); + ASSERT_OK(SetProjection(opts_.get(), {"f64"})); opts_->filter = equal(field_ref("i32"), literal(0)); - // NB: projector is applied by the scanner; FileFragment does not evaluate it so - // we will not drop "i32" even though it is not in the projector's schema - auto expected_schema = schema({field("f64", float64()), field("i32", int32())}); + // NB: projection is applied by the scanner; FileFragment does not evaluate it so + // we will not drop "i32" even though it is not projected since we need it for + // filtering + auto expected_schema = schema({f64, i32}); - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(opts_->dataset_schema); auto source = GetFileSource(reader.get()); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); @@ -278,38 +289,32 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) { } TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) { - auto reader_without_i32 = GetRecordBatchReader( - schema({field("f64", float64()), field("i64", int64()), field("f32", float32())})); - - auto reader_without_f64 = GetRecordBatchReader( - schema({field("i64", int64()), field("f32", float32()), field("i32", int32())})); - - auto reader = - GetRecordBatchReader(schema({field("f64", float64()), field("i64", int64()), - field("f32", float32()), field("i32", int32())})); - - schema_ = reader->schema(); - opts_ = ScanOptions::Make(schema_); - opts_->projector = RecordBatchProjector(SchemaFromColumnNames(schema_, {"f64"})); + SetSchema({f64, i64, f32, i32}); + ASSERT_OK(SetProjection(opts_.get(), {"f64"})); opts_->filter = equal(field_ref("i32"), literal(0)); + auto reader_without_i32 = GetRecordBatchReader(schema({f64, i64, f32})); + auto reader_without_f64 = GetRecordBatchReader(schema({i64, f32, i32})); + auto reader = GetRecordBatchReader(schema({f64, i64, f32, i32})); + auto readers = {reader.get(), reader_without_i32.get(), reader_without_f64.get()}; for (auto reader : readers) { auto source = GetFileSource(reader); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - // NB: projector is applied by the scanner; Fragment does not evaluate it. - // We will not drop "i32" even though it is not in the projector's schema. 
+ // NB: projection is applied by the scanner; FileFragment does not evaluate it so + // we will not drop "i32" even though it is not projected since we need it for + // filtering // // in the case where a file doesn't contain a referenced field, we won't - // materialize it (the filter/projector will populate it with nulls later) + // materialize it as nulls later std::shared_ptr expected_schema; if (reader == reader_without_i32.get()) { - expected_schema = schema({field("f64", float64())}); + expected_schema = schema({f64}); } else if (reader == reader_without_f64.get()) { - expected_schema = schema({field("i32", int32())}); + expected_schema = schema({i32}); } else { - expected_schema = schema({field("f64", float64()), field("i32", int32())}); + expected_schema = schema({f64, i32}); } int64_t row_count = 0; @@ -326,15 +331,15 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) { } TEST_F(TestIpcFileFormat, Inspect) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get())); - EXPECT_EQ(*actual, *schema_); + EXPECT_EQ(*actual, *reader->schema()); } TEST_F(TestIpcFileFormat, IsSupported) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); bool supported = false; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index e198d18a8a7..977af6be81a 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -22,6 +22,7 @@ #include #include "arrow/dataset/dataset_internal.h" +#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" #include "arrow/record_batch.h" @@ -139,9 +140,8 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { } std::unique_ptr GetRecordBatchReader( - std::shared_ptr schema = nullptr) { - return MakeGeneratedRecordBatch(schema ? 
schema : schema_, kBatchSize, - kBatchRepetitions); + std::shared_ptr schema) { + return MakeGeneratedRecordBatch(schema, kBatchSize, kBatchRepetitions); } Result> GetFileSink() { @@ -162,7 +162,7 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { } void SetFilter(Expression filter) { - ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*schema_)); + ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema)); } std::shared_ptr SingleBatch(Fragment* fragment) { @@ -193,10 +193,10 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { void CountRowGroupsInFragment(const std::shared_ptr& fragment, std::vector expected_row_groups, Expression filter) { - schema_ = opts_->schema(); - ASSERT_OK_AND_ASSIGN(auto bound, filter.Bind(*schema_)); + SetFilter(filter); + auto parquet_fragment = checked_pointer_cast(fragment); - ASSERT_OK_AND_ASSIGN(auto fragments, parquet_fragment->SplitByRowGroup(bound)) + ASSERT_OK_AND_ASSIGN(auto fragments, parquet_fragment->SplitByRowGroup(opts_->filter)) EXPECT_EQ(fragments.size(), expected_row_groups.size()); for (size_t i = 0; i < fragments.size(); i++) { @@ -208,18 +208,23 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { } } + void SetSchema(std::vector> fields) { + opts_ = std::make_shared(); + opts_->dataset_schema = schema(std::move(fields)); + ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names())); + } + protected: - std::shared_ptr schema_ = schema({field("f64", float64())}); std::shared_ptr format_ = std::make_shared(); std::shared_ptr opts_; std::shared_ptr ctx_ = std::make_shared(); }; TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); SetFilter(literal(true)); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); @@ -234,12 +239,10 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { } TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) { - schema_ = schema({field("utf8", utf8())}); - - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("utf8", utf8())})); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); SetFilter(literal(true)); format_->reader_options.dict_columns = {"utf8"}; @@ -264,10 +267,10 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) { } TEST_F(TestParquetFileFormat, ScanRecordBatchReaderPreBuffer) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); SetFilter(literal(true)); format_->reader_options.pre_buffer = true; @@ -305,19 +308,22 @@ TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) { result.status()); } -TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) { - schema_ = schema({field("f64", float64()), field("i64", int64()), - field("f32", float32()), field("i32", int32())}); +static auto f32 = field("f32", float32()); +static auto f64 = field("f64", float64()); +static auto i32 = field("i32", int32()); +static auto i64 = field("i64", int64()); - opts_ = ScanOptions::Make(schema_); - opts_->projector = 
RecordBatchProjector(SchemaFromColumnNames(schema_, {"f64"})); +TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) { + SetSchema({f64, i64, f32, i32}); + ASSERT_OK(SetProjection(opts_.get(), {"f64"})); SetFilter(equal(field_ref("i32"), literal(0))); - // NB: projector is applied by the scanner; FileFragment does not evaluate it so - // we will not drop "i32" even though it is not in the projector's schema - auto expected_schema = schema({field("f64", float64()), field("i32", int32())}); + // NB: projection is applied by the scanner; FileFragment does not evaluate it so + // we will not drop "i32" even though it is not projected since we need it for + // filtering + auto expected_schema = schema({f64, i32}); - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(opts_->dataset_schema); auto source = GetFileSource(reader.get()); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); @@ -334,38 +340,32 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) { } TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) { - auto reader_without_i32 = GetRecordBatchReader( - schema({field("f64", float64()), field("i64", int64()), field("f32", float32())})); - - auto reader_without_f64 = GetRecordBatchReader( - schema({field("i64", int64()), field("f32", float32()), field("i32", int32())})); - - auto reader = - GetRecordBatchReader(schema({field("f64", float64()), field("i64", int64()), - field("f32", float32()), field("i32", int32())})); - - schema_ = reader->schema(); - opts_ = ScanOptions::Make(schema_); - opts_->projector = RecordBatchProjector(SchemaFromColumnNames(schema_, {"f64"})); + SetSchema({f64, i64, f32, i32}); + ASSERT_OK(SetProjection(opts_.get(), {"f64"})); SetFilter(equal(field_ref("i32"), literal(0))); + auto reader_without_i32 = GetRecordBatchReader(schema({f64, i64, f32})); + auto reader_without_f64 = GetRecordBatchReader(schema({i64, f32, i32})); + auto reader = GetRecordBatchReader(schema({f64, i64, f32, i32})); + auto readers = {reader.get(), reader_without_i32.get(), reader_without_f64.get()}; for (auto reader : readers) { auto source = GetFileSource(reader); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - // NB: projector is applied by the scanner; ParquetFragment does not evaluate it. - // We will not drop "i32" even though it is not in the projector's schema. 
+ // NB: projection is applied by the scanner; FileFragment does not evaluate it so + // we will not drop "i32" even though it is not projected since we need it for + // filtering // // in the case where a file doesn't contain a referenced field, we won't - // materialize it (the filter/projector will populate it with nulls later) + // materialize it as nulls later std::shared_ptr expected_schema; if (reader == reader_without_i32.get()) { - expected_schema = schema({field("f64", float64())}); + expected_schema = schema({f64}); } else if (reader == reader_without_f64.get()) { - expected_schema = schema({field("i32", int32())}); + expected_schema = schema({i32}); } else { - expected_schema = schema({field("f64", float64()), field("i32", int32())}); + expected_schema = schema({f64, i32}); } int64_t row_count = 0; @@ -382,17 +382,15 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) { } TEST_F(TestParquetFileFormat, Inspect) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get())); - AssertSchemaEqual(*actual, *schema_, /*check_metadata=*/false); + AssertSchemaEqual(*actual, *reader->schema(), /*check_metadata=*/false); } TEST_F(TestParquetFileFormat, InspectDictEncoded) { - schema_ = schema({field("utf8", utf8())}); - - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("utf8", utf8())})); auto source = GetFileSource(reader.get()); format_->reader_options.dict_columns = {"utf8"}; @@ -403,7 +401,7 @@ TEST_F(TestParquetFileFormat, InspectDictEncoded) { } TEST_F(TestParquetFileFormat, IsSupported) { - auto reader = GetRecordBatchReader(); + auto reader = GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); bool supported = false; @@ -439,8 +437,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) { auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(reader->schema()); - schema_ = reader->schema(); + SetSchema(reader->schema()->fields()); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); SetFilter(literal(true)); @@ -481,7 +478,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); auto all_row_groups = internal::Iota(static_cast(kNumRowGroups)); @@ -537,7 +534,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragmentsUsingStringColum TableBatchReader reader(*table); auto source = GetFileSource(&reader); - opts_ = ScanOptions::Make(reader.schema()); + SetSchema(reader.schema()->fields()); ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a"))); @@ -550,8 +547,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups); auto source = GetFileSource(reader.get()); - opts_ = ScanOptions::Make(reader->schema()); - schema_ = reader->schema(); + SetSchema(reader->schema()->fields()); SetFilter(literal(true)); auto row_groups_fragment = 
[&](std::vector row_groups) { @@ -602,11 +598,12 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { } TEST_F(TestParquetFileFormat, WriteRecordBatchReader) { - std::shared_ptr reader = GetRecordBatchReader(); + std::shared_ptr reader = + GetRecordBatchReader(schema({field("f64", float64())})); auto source = GetFileSource(reader.get()); - reader = GetRecordBatchReader(); + reader = GetRecordBatchReader(schema({field("f64", float64())})); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); @@ -627,7 +624,7 @@ TEST_F(TestParquetFileFormat, WriteRecordBatchReaderCustomOptions) { std::shared_ptr reader = GetRecordBatchReader(schema({field("ts", timestamp(coerce_timestamps_from))})); - opts_ = ScanOptions::Make(reader->schema()); + SetSchema(reader->schema()->fields()); EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 522dbbeb5d2..ec974787cae 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -70,23 +70,6 @@ std::shared_ptr Partitioning::Default() { return std::make_shared(); } -Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr, - RecordBatchProjector* projector) { - ARROW_ASSIGN_OR_RAISE(auto known_values, ExtractKnownFieldValues(expr)); - for (const auto& ref_value : known_values) { - if (!ref_value.second.is_scalar()) { - return Status::Invalid("non-scalar partition key ", ref_value.second.ToString()); - } - - ARROW_ASSIGN_OR_RAISE(auto match, - ref_value.first.FindOneOrNone(*projector->schema())); - - if (match.empty()) continue; - RETURN_NOT_OK(projector->SetDefaultValue(match, ref_value.second.scalar())); - } - return Status::OK(); -} - inline Expression ConjunctionFromGroupingRow(Scalar* row) { ScalarVector* values = &checked_cast(row)->value; std::vector equality_expressions(values->size()); diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 42e1b4c4097..eff1f2609e8 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -128,9 +128,6 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning { util::optional value; }; - static Status SetDefaultValuesFromKeys(const Expression& expr, - RecordBatchProjector* projector); - Result Partition( const std::shared_ptr& batch) const override; diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 75e60f994f0..cf97507deac 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -464,55 +464,6 @@ TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) { AssertParseError("/alpha=yosemite"); // not in inspected dictionary } -TEST_F(TestPartitioning, SetDefaultValuesConcrete) { - auto small_schm = schema({field("c", int32())}); - auto schm = schema({field("a", int32()), field("b", utf8())}); - auto full_schm = schema({field("a", int32()), field("b", utf8()), field("c", int32())}); - RecordBatchProjector record_batch_projector(full_schm); - HivePartitioning part(schm); - ARROW_EXPECT_OK(part.SetDefaultValuesFromKeys( - and_(equal(field_ref("a"), literal(10)), is_valid(field_ref("b"))), - &record_batch_projector)); - - auto in_rb = RecordBatchFromJSON(small_schm, R"([{"c": 0}, - {"c": 1}, - {"c": 2}, - {"c": 3} - ])"); - - EXPECT_OK_AND_ASSIGN(auto out_rb, record_batch_projector.Project(*in_rb)); - auto expected_rb = 
RecordBatchFromJSON(full_schm, R"([{"a": 10, "b": null, "c": 0}, - {"a": 10, "b": null, "c": 1}, - {"a": 10, "b": null, "c": 2}, - {"a": 10, "b": null, "c": 3} - ])"); - AssertBatchesEqual(*expected_rb, *out_rb); -} - -TEST_F(TestPartitioning, SetDefaultValuesNull) { - auto small_schm = schema({field("c", int32())}); - auto schm = schema({field("a", int32()), field("b", utf8())}); - auto full_schm = schema({field("a", int32()), field("b", utf8()), field("c", int32())}); - RecordBatchProjector record_batch_projector(full_schm); - HivePartitioning part(schm); - ARROW_EXPECT_OK(part.SetDefaultValuesFromKeys( - and_(is_null(field_ref("a")), is_null(field_ref("b"))), &record_batch_projector)); - - auto in_rb = RecordBatchFromJSON(small_schm, R"([{"c": 0}, - {"c": 1}, - {"c": 2}, - {"c": 3} - ])"); - - EXPECT_OK_AND_ASSIGN(auto out_rb, record_batch_projector.Project(*in_rb)); - auto expected_rb = RecordBatchFromJSON(full_schm, R"([{"a": null, "b": null, "c": 0}, - {"a": null, "b": null, "c": 1}, - {"a": null, "b": null, "c": 2}, - {"a": null, "b": null, "c": 3} - ])"); - AssertBatchesEqual(*expected_rb, *out_rb); -} - TEST_F(TestPartitioning, EtlThenHive) { FieldVector etl_fields{field("year", int16()), field("month", int8()), field("day", int8()), field("hour", int8())}; diff --git a/cpp/src/arrow/dataset/projector.cc b/cpp/src/arrow/dataset/projector.cc index ba0eb2ddff5..b2196a8744c 100644 --- a/cpp/src/arrow/dataset/projector.cc +++ b/cpp/src/arrow/dataset/projector.cc @@ -17,20 +17,8 @@ #include "arrow/dataset/projector.h" -#include -#include -#include -#include - -#include "arrow/array.h" -#include "arrow/compute/cast.h" -#include "arrow/dataset/type_fwd.h" -#include "arrow/record_batch.h" -#include "arrow/result.h" -#include "arrow/scalar.h" #include "arrow/status.h" #include "arrow/type.h" -#include "arrow/util/logging.h" namespace arrow { namespace dataset { @@ -71,111 +59,5 @@ Status CheckProjectable(const Schema& from, const Schema& to) { return Status::OK(); } -RecordBatchProjector::RecordBatchProjector(std::shared_ptr to) - : to_(std::move(to)), - missing_columns_(to_->num_fields(), nullptr), - column_indices_(to_->num_fields(), kNoMatch), - scalars_(to_->num_fields(), nullptr) {} - -Status RecordBatchProjector::SetDefaultValue(FieldRef ref, - std::shared_ptr scalar) { - DCHECK_NE(scalar, nullptr); - if (ref.IsNested()) { - return Status::NotImplemented("setting default values for nested columns"); - } - - ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(*to_)); - auto index = match.indices()[0]; - - auto field_type = to_->field(index)->type(); - if (!field_type->Equals(scalar->type)) { - if (scalar->is_valid) { - auto maybe_converted = compute::Cast(scalar, field_type); - if (!maybe_converted.ok()) { - return Status::TypeError("Field ", to_->field(index)->ToString(), - " cannot be materialized from scalar of type ", - *scalar->type, - ". 
Cast error: ", maybe_converted.status().message()); - } - scalar = maybe_converted->scalar(); - } else { - scalar = MakeNullScalar(field_type); - } - } - - scalars_[index] = std::move(scalar); - return Status::OK(); -} - -Result> RecordBatchProjector::Project( - const RecordBatch& batch, MemoryPool* pool) { - if (from_ == nullptr || !batch.schema()->Equals(*from_, /*check_metadata=*/false)) { - RETURN_NOT_OK(SetInputSchema(batch.schema(), pool)); - } - - if (missing_columns_length_ < batch.num_rows()) { - RETURN_NOT_OK(ResizeMissingColumns(batch.num_rows(), pool)); - } - - ArrayVector columns(to_->num_fields()); - - for (int i = 0; i < to_->num_fields(); ++i) { - if (column_indices_[i] != kNoMatch) { - columns[i] = batch.column(column_indices_[i]); - } else { - columns[i] = missing_columns_[i]->Slice(0, batch.num_rows()); - } - } - - return RecordBatch::Make(to_, batch.num_rows(), std::move(columns)); -} - -Status RecordBatchProjector::SetInputSchema(std::shared_ptr from, - MemoryPool* pool) { - RETURN_NOT_OK(CheckProjectable(*from, *to_)); - from_ = std::move(from); - - for (int i = 0; i < to_->num_fields(); ++i) { - ARROW_ASSIGN_OR_RAISE(auto match, - FieldRef(to_->field(i)->name()).FindOneOrNone(*from_)); - - if (match.indices().empty() || - from_->field(match.indices()[0])->type()->id() == Type::NA) { - // Mark column i as missing by setting missing_columns_[i] - // to a non-null placeholder. - ARROW_ASSIGN_OR_RAISE(missing_columns_[i], - MakeArrayOfNull(to_->field(i)->type(), 0, pool)); - column_indices_[i] = kNoMatch; - } else { - // Mark column i as not missing by setting missing_columns_[i] to nullptr - missing_columns_[i] = nullptr; - column_indices_[i] = match.indices()[0]; - } - } - return Status::OK(); -} - -Status RecordBatchProjector::ResizeMissingColumns(int64_t new_length, MemoryPool* pool) { - // TODO(bkietz) MakeArrayOfNull could use fewer buffers by reusing a single zeroed - // buffer for every buffer in every column which is null - for (int i = 0; i < to_->num_fields(); ++i) { - if (missing_columns_[i] == nullptr) { - continue; - } - if (scalars_[i] == nullptr) { - ARROW_ASSIGN_OR_RAISE( - missing_columns_[i], - MakeArrayOfNull(missing_columns_[i]->type(), new_length, pool)); - continue; - } - ARROW_ASSIGN_OR_RAISE(missing_columns_[i], - MakeArrayFromScalar(*scalars_[i], new_length, pool)); - } - missing_columns_length_ = new_length; - return Status::OK(); -} - -constexpr int RecordBatchProjector::kNoMatch; - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/projector.h b/cpp/src/arrow/dataset/projector.h index 31e5f49b68d..d3171fbfb3d 100644 --- a/cpp/src/arrow/dataset/projector.h +++ b/cpp/src/arrow/dataset/projector.h @@ -19,57 +19,14 @@ #pragma once -#include -#include - -#include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/type_fwd.h" namespace arrow { namespace dataset { +// FIXME this is superceded by Expression::Bind ARROW_DS_EXPORT Status CheckProjectable(const Schema& from, const Schema& to); -/// \brief Project a RecordBatch to a given schema. -/// -/// Projected record batches will reorder columns from input record batches when possible, -/// otherwise the given schema will be satisfied by augmenting with null or constant -/// columns. -/// -/// RecordBatchProjector is most efficient when projecting record batches with a -/// consistent schema (for example batches from a table), but it can project record -/// batches having any schema. 
-class ARROW_DS_EXPORT RecordBatchProjector { - public: - static constexpr int kNoMatch = -1; - - /// A column required by the given schema but absent from a record batch will be added - /// to the projected record batch with all its slots null. - explicit RecordBatchProjector(std::shared_ptr to); - - /// If the indexed field is absent from a record batch it will be added to the projected - /// record batch with all its slots equal to the provided scalar (instead of null). - Status SetDefaultValue(FieldRef ref, std::shared_ptr scalar); - - Result> Project(const RecordBatch& batch, - MemoryPool* pool = default_memory_pool()); - - const std::shared_ptr& schema() const { return to_; } - - Status SetInputSchema(std::shared_ptr from, - MemoryPool* pool = default_memory_pool()); - - private: - Status ResizeMissingColumns(int64_t new_length, MemoryPool* pool); - - std::shared_ptr from_, to_; - int64_t missing_columns_length_ = 0; - // these vectors are indexed parallel to to_->fields() - std::vector> missing_columns_; - std::vector column_indices_; - std::vector> scalars_; -}; - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 0537d370125..3d81e2d12ee 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -21,6 +21,7 @@ #include #include +#include "arrow/compute/api_scalar.h" #include "arrow/dataset/dataset.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/scanner_internal.h" @@ -33,27 +34,14 @@ namespace arrow { namespace dataset { -ScanOptions::ScanOptions(std::shared_ptr schema) - : projector(RecordBatchProjector(std::move(schema))) {} - -std::shared_ptr ScanOptions::ReplaceSchema( - std::shared_ptr schema) const { - auto copy = ScanOptions::Make(std::move(schema)); - copy->filter = filter; - copy->batch_size = batch_size; - return copy; -} - std::vector ScanOptions::MaterializedFields() const { std::vector fields; - for (const auto& f : schema()->fields()) { - fields.push_back(f->name()); - } - - for (const FieldRef& ref : FieldsInExpression(filter)) { - DCHECK(ref.name()); - fields.push_back(*ref.name()); + for (const Expression* expr : {&filter, &projection}) { + for (const FieldRef& ref : FieldsInExpression(*expr)) { + DCHECK(ref.name()); + fields.push_back(*ref.name()); + } } return fields; @@ -94,8 +82,9 @@ ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, std::shared_ptr scan_context) : dataset_(std::move(dataset)), fragment_(nullptr), - scan_options_(ScanOptions::Make(dataset_->schema())), + scan_options_(std::make_shared()), scan_context_(std::move(scan_context)) { + scan_options_->dataset_schema = dataset_->schema(); DCHECK_OK(Filter(literal(true))); } @@ -104,29 +93,27 @@ ScannerBuilder::ScannerBuilder(std::shared_ptr schema, std::shared_ptr scan_context) : dataset_(nullptr), fragment_(std::move(fragment)), - fragment_schema_(schema), - scan_options_(ScanOptions::Make(std::move(schema))), + scan_options_(std::make_shared()), scan_context_(std::move(scan_context)) { + scan_options_->dataset_schema = std::move(schema); DCHECK_OK(Filter(literal(true))); } const std::shared_ptr& ScannerBuilder::schema() const { - return fragment_ ? 
fragment_schema_ : dataset_->schema(); + return scan_options_->dataset_schema; } Status ScannerBuilder::Project(std::vector columns) { - RETURN_NOT_OK(schema()->CanReferenceFieldsByNames(columns)); - has_projection_ = true; - project_columns_ = std::move(columns); - return Status::OK(); + return SetProjection(scan_options_.get(), std::move(columns)); +} + +Status ScannerBuilder::Project(std::vector exprs, + std::vector names) { + return SetProjection(scan_options_.get(), std::move(exprs), std::move(names)); } Status ScannerBuilder::Filter(const Expression& filter) { - for (const auto& ref : FieldsInExpression(filter)) { - RETURN_NOT_OK(ref.FindOne(*schema())); - } - ARROW_ASSIGN_OR_RAISE(scan_options_->filter, filter.Bind(*schema())); - return Status::OK(); + return SetFilter(scan_options_.get(), filter); } Status ScannerBuilder::UseThreads(bool use_threads) { @@ -142,20 +129,15 @@ Status ScannerBuilder::BatchSize(int64_t batch_size) { return Status::OK(); } -Result> ScannerBuilder::Finish() const { - std::shared_ptr scan_options; - if (has_projection_ && !project_columns_.empty()) { - scan_options = - scan_options_->ReplaceSchema(SchemaFromColumnNames(schema(), project_columns_)); - } else { - scan_options = std::make_shared(*scan_options_); +Result> ScannerBuilder::Finish() { + if (!scan_options_->projection.IsBound()) { + RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names())); } if (dataset_ == nullptr) { - return std::make_shared(fragment_, std::move(scan_options), scan_context_); + return std::make_shared(fragment_, scan_options_, scan_context_); } - - return std::make_shared(dataset_, std::move(scan_options), scan_context_); + return std::make_shared(dataset_, scan_options_, scan_context_); } using arrow::internal::TaskGroup; @@ -220,7 +202,7 @@ Result> Scanner::ToTable() { // Wait for all tasks to complete, or the first error. RETURN_NOT_OK(task_group->Finish()); - return Table::FromRecordBatches(scan_options_->schema(), + return Table::FromRecordBatches(scan_options_->projected_schema, FlattenRecordBatchVector(std::move(state->batches))); } diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index e65ac7fa524..742f340e67a 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -50,26 +50,27 @@ struct ARROW_DS_EXPORT ScanContext { std::shared_ptr TaskGroup() const; }; -class ARROW_DS_EXPORT ScanOptions { - public: - virtual ~ScanOptions() = default; - - static std::shared_ptr Make(std::shared_ptr schema) { - return std::shared_ptr(new ScanOptions(std::move(schema))); - } - - // Construct a copy of these options with a different schema. - // The projector will be reconstructed. - std::shared_ptr ReplaceSchema(std::shared_ptr schema) const; - - // Filter +struct ARROW_DS_EXPORT ScanOptions { + // Filter and projection Expression filter = literal(true); + Expression projection; - // Schema to which record batches will be reconciled - const std::shared_ptr& schema() const { return projector.schema(); } + // Schema with which batches will be read from fragments. This is also known as the + // "reader schema" it will be used (for example) in constructing CSV file readers to + // identify column types for parsing. Usually only a subset of its fields (see + // MaterializedFields) will be materialized during a scan. + std::shared_ptr dataset_schema; - // Projector for reconciling the final RecordBatch to the requested schema. - RecordBatchProjector projector; + // Schema of projected record batches. 
This is independent of dataset_schema as its + // fields are derived from the projection. For example, let + // + // dataset_schema = {"a": int32, "b": int32, "id": utf8} + // projection = project({equal(field_ref("a"), field_ref("b"))}, {"a_plus_b"}) + // + // (no filter specified). In this case, the projected_schema would be + // + // {"a_plus_b": int32} + std::shared_ptr projected_schema; // Maximum row count for scanned batches. int64_t batch_size = kDefaultBatchSize; @@ -86,12 +87,9 @@ class ARROW_DS_EXPORT ScanOptions { // used in the final projection but is still required to evaluate the // expression. // - // This is used by Fragments implementation to apply the column + // This is used by Fragment implementations to apply the column // sub-selection optimization. std::vector MaterializedFields() const; - - private: - explicit ScanOptions(std::shared_ptr schema); }; /// \brief Read record batches from a range of a single data fragment. A @@ -172,7 +170,9 @@ class ARROW_DS_EXPORT Scanner { /// \brief GetFragments returns an iterator over all Fragments in this scan. Result GetFragments(); - const std::shared_ptr& schema() const { return scan_options_->schema(); } + const std::shared_ptr& schema() const { + return scan_options_->projected_schema; + } const std::shared_ptr& options() const { return scan_options_; } @@ -199,9 +199,7 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Set the subset of columns to materialize. /// - /// This subset will be passed down to Sources and corresponding Fragments. - /// The goal is to avoid loading/copying/deserializing columns that will - /// not be required further down the compute chain. + /// Columns which are not referenced may not be read from fragments. /// /// \param[in] columns list of columns to project. Order and duplicates will /// be preserved. @@ -210,11 +208,23 @@ class ARROW_DS_EXPORT ScannerBuilder { /// Schema. Status Project(std::vector columns); + /// \brief Set expressions which will be evaluated to produce the materialized columns. + /// + /// Columns which are not referenced may not be read from fragments. + /// + /// \param[in] exprs expressions to evaluate to produce columns. + /// \param[in] names list of names for the resulting columns. + /// + /// \return Failure if any referenced column does not exists in the dataset's + /// Schema. + Status Project(std::vector exprs, std::vector names); + /// \brief Set the filter expression to return only rows matching the filter. /// /// The predicate will be passed down to Sources and corresponding /// Fragments to exploit predicate pushdown if possible using /// partition information or Fragment internal metadata, e.g. Parquet statistics. + /// Columns which are not referenced may not be read from fragments. /// /// \param[in] filter expression to filter rows with. 
/// @@ -235,18 +245,15 @@ class ARROW_DS_EXPORT ScannerBuilder { Status BatchSize(int64_t batch_size); /// \brief Return the constructed now-immutable Scanner object - Result> Finish() const; + Result> Finish(); const std::shared_ptr& schema() const; private: std::shared_ptr dataset_; std::shared_ptr fragment_; - std::shared_ptr fragment_schema_; std::shared_ptr scan_options_; std::shared_ptr scan_context_; - bool has_projection_ = false; - std::vector project_columns_; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 729bd1f3a48..37c42c98e69 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -21,13 +21,19 @@ #include #include "arrow/array/array_nested.h" +#include "arrow/array/util.h" +#include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/partition.h" #include "arrow/dataset/scanner.h" +#include "arrow/util/logging.h" namespace arrow { + +using internal::checked_cast; + namespace dataset { inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression filter, @@ -55,15 +61,24 @@ inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression } inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, - RecordBatchProjector* projector, - MemoryPool* pool) { + Expression projection, MemoryPool* pool) { return MakeMaybeMapIterator( - [=](std::shared_ptr in) { - // The RecordBatchProjector is shared across ScanTasks of the same - // Fragment. The resize operation of missing columns is not thread safe. - // Ensure that each ScanTask gets his own projector. - RecordBatchProjector local_projector{*projector}; - return local_projector.Project(*in, pool); + [=](std::shared_ptr in) -> Result> { + compute::ExecContext exec_context{pool}; + ARROW_ASSIGN_OR_RAISE(Datum projected, ExecuteScalarExpression( + projection, Datum(in), &exec_context)); + + DCHECK_EQ(projected.type()->id(), Type::STRUCT); + if (projected.shape() == ValueDescr::SCALAR) { + // Only virtual columns are projected. 
Broadcast to an array + ARROW_ASSIGN_OR_RAISE( + projected, MakeArrayFromScalar(*projected.scalar(), in->num_rows(), pool)); + } + + ARROW_ASSIGN_OR_RAISE( + auto out, RecordBatch::FromStructArray(projected.array_as())); + + return out->ReplaceSchemaMetadata(in->schema()->metadata()); }, std::move(it)); } @@ -73,30 +88,27 @@ class FilterAndProjectScanTask : public ScanTask { explicit FilterAndProjectScanTask(std::shared_ptr task, Expression partition) : ScanTask(task->options(), task->context()), task_(std::move(task)), - partition_(std::move(partition)), - filter_(options()->filter), - projector_(options()->projector) {} + partition_(std::move(partition)) {} Result Execute() override { ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, - SimplifyWithGuarantee(filter_, partition_)); + SimplifyWithGuarantee(options()->filter, partition_)); + + ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, + SimplifyWithGuarantee(options()->projection, partition_)); RecordBatchIterator filter_it = FilterRecordBatch(std::move(it), simplified_filter, context_->pool); - RETURN_NOT_OK( - KeyValuePartitioning::SetDefaultValuesFromKeys(partition_, &projector_)); - - return ProjectRecordBatch(std::move(filter_it), &projector_, context_->pool); + return ProjectRecordBatch(std::move(filter_it), simplified_projection, + context_->pool); } private: std::shared_ptr task_; Expression partition_; - Expression filter_; - RecordBatchProjector projector_; }; /// \brief GetScanTaskIterator transforms an Iterator in a @@ -127,5 +139,62 @@ inline Result GetScanTaskIterator( return MakeFlattenIterator(std::move(maybe_scantask_it)); } +inline Status NestedFieldRefsNotImplemented() { + // TODO(ARROW-11259) Several functions (for example, IpcScanTask::Make) assume that + // only top level fields will be materialized. 
+ return Status::NotImplemented("Nested field references in scans."); +} + +inline Status SetProjection(ScanOptions* options, const Expression& projection) { + ARROW_ASSIGN_OR_RAISE(options->projection, projection.Bind(*options->dataset_schema)); + + if (options->projection.type()->id() != Type::STRUCT) { + return Status::Invalid("Projection ", projection.ToString(), + " cannot yield record batches"); + } + options->projected_schema = ::arrow::schema( + checked_cast(*options->projection.type()).fields(), + options->dataset_schema->metadata()); + + return Status::OK(); +} + +inline Status SetProjection(ScanOptions* options, std::vector exprs, + std::vector names) { + compute::ProjectOptions project_options{std::move(names)}; + + for (size_t i = 0; i < exprs.size(); ++i) { + if (auto ref = exprs[i].field_ref()) { + if (!ref->name()) return NestedFieldRefsNotImplemented(); + + // set metadata and nullability for plain field references + ARROW_ASSIGN_OR_RAISE(auto field, ref->GetOne(*options->dataset_schema)); + project_options.field_nullability[i] = field->nullable(); + project_options.field_metadata[i] = field->metadata(); + } + } + + return SetProjection(options, + call("project", std::move(exprs), std::move(project_options))); +} + +inline Status SetProjection(ScanOptions* options, std::vector names) { + std::vector exprs(names.size()); + for (size_t i = 0; i < exprs.size(); ++i) { + exprs[i] = field_ref(names[i]); + } + return SetProjection(options, std::move(exprs), std::move(names)); +} + +inline Status SetFilter(ScanOptions* options, const Expression& filter) { + for (const auto& ref : FieldsInExpression(filter)) { + if (!ref.name()) return NestedFieldRefsNotImplemented(); + + RETURN_NOT_OK(ref.FindOne(*options->dataset_schema)); + } + ARROW_ASSIGN_OR_RAISE(options->filter, filter.Bind(*options->dataset_schema)); + return Status::OK(); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index c0e37bef6cc..0ceb5bc4434 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -19,21 +19,25 @@ #include +#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/testing/generator.h" #include "arrow/testing/util.h" +using testing::ElementsAre; +using testing::IsEmpty; + namespace arrow { namespace dataset { +constexpr int64_t kNumberChildDatasets = 2; +constexpr int64_t kNumberBatches = 16; +constexpr int64_t kBatchSize = 1024; + class TestScanner : public DatasetFixtureMixin { protected: - static constexpr int64_t kNumberChildDatasets = 2; - static constexpr int64_t kNumberBatches = 16; - static constexpr int64_t kBatchSize = 1024; - Scanner MakeScanner(std::shared_ptr batch) { std::vector> batches{static_cast(kNumberBatches), batch}; @@ -57,10 +61,6 @@ class TestScanner : public DatasetFixtureMixin { } }; -constexpr int64_t TestScanner::kNumberChildDatasets; -constexpr int64_t TestScanner::kNumberBatches; -constexpr int64_t TestScanner::kBatchSize; - TEST_F(TestScanner, Scan) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); @@ -111,8 +111,10 @@ TEST_F(TestScanner, MaterializeMissingColumn) { auto batch_missing_f64 = ConstantArrayGenerator::Zeroes(kBatchSize, schema({field("i32", int32())})); - ASSERT_OK(options_->projector.SetDefaultValue(schema_->GetFieldIndex("f64"), - MakeScalar(2.5))); + 
auto fragment_missing_f64 = std::make_shared( + RecordBatchVector{static_cast(kNumberChildDatasets * kNumberBatches), + batch_missing_f64}, + equal(field_ref("f64"), literal(2.5))); ASSERT_OK_AND_ASSIGN(auto f64, ArrayFromBuilderVisitor(float64(), kBatchSize, [&](DoubleBuilder* builder) { @@ -121,7 +123,10 @@ TEST_F(TestScanner, MaterializeMissingColumn) { auto batch_with_f64 = RecordBatch::Make(schema_, f64->length(), {batch_missing_f64->column(0), f64}); - AssertScannerEqualsRepetitionsOf(MakeScanner(batch_missing_f64), batch_with_f64); + ScannerBuilder builder{schema_, fragment_missing_f64, ctx_}; + ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); + + AssertScannerEqualsRepetitionsOf(*scanner, batch_with_f64); } TEST_F(TestScanner, ToTable) { @@ -175,9 +180,23 @@ TEST_F(TestScannerBuilder, TestProject) { ASSERT_OK(builder.Project({})); ASSERT_OK(builder.Project({"i64", "b", "i8"})); ASSERT_OK(builder.Project({"i16", "i16"})); + ASSERT_OK(builder.Project( + {field_ref("i16"), call("multiply", {field_ref("i16"), literal(2)})}, + {"i16 renamed", "i16 * 2"})); ASSERT_RAISES(Invalid, builder.Project({"not_found_column"})); ASSERT_RAISES(Invalid, builder.Project({"i8", "not_found_column"})); + ASSERT_RAISES(Invalid, + builder.Project({field_ref("not_found_column"), + call("multiply", {field_ref("i16"), literal(2)})}, + {"i16 renamed", "i16 * 2"})); + + ASSERT_RAISES(NotImplemented, builder.Project({field_ref(FieldRef("nested", "column"))}, + {"nested column"})); + + // provided more field names than column exprs or vice versa + ASSERT_RAISES(Invalid, builder.Project({}, {"i16 renamed", "i16 * 2"})); + ASSERT_RAISES(Invalid, builder.Project({literal(2), field_ref("a")}, {"a"})); } TEST_F(TestScannerBuilder, TestFilter) { @@ -192,35 +211,59 @@ TEST_F(TestScannerBuilder, TestFilter) { ASSERT_RAISES(Invalid, builder.Filter(equal(field_ref("not_a_column"), literal(true)))); + ASSERT_RAISES( + NotImplemented, + builder.Filter(equal(field_ref(FieldRef("nested", "column")), literal(true)))); + ASSERT_RAISES(Invalid, builder.Filter(or_(equal(field_ref("i64"), literal(10)), equal(field_ref("not_a_column"), literal(true))))); } -using testing::ElementsAre; -using testing::IsEmpty; - TEST(ScanOptions, TestMaterializedFields) { auto i32 = field("i32", int32()); auto i64 = field("i64", int64()); + auto opts = std::make_shared(); + + // empty dataset, project nothing = nothing materialized + opts->dataset_schema = schema({}); + ASSERT_OK(SetProjection(opts.get(), {}, {})); + EXPECT_THAT(opts->MaterializedFields(), IsEmpty()); - auto opts = ScanOptions::Make(schema({})); + // non-empty dataset, project nothing = nothing materialized + opts->dataset_schema = schema({i32, i64}); EXPECT_THAT(opts->MaterializedFields(), IsEmpty()); + // project nothing, filter on i32 = materialize i32 opts->filter = equal(field_ref("i32"), literal(10)); EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32")); - opts = ScanOptions::Make(schema({i32, i64})); + // project i32 & i64, filter nothing = materialize i32 & i64 + opts->filter = literal(true); + ASSERT_OK(SetProjection(opts.get(), {"i32", "i64"})); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64")); + + // project i32 + i64, filter nothing = materialize i32 & i64 + opts->filter = literal(true); + ASSERT_OK(SetProjection(opts.get(), {call("add", {field_ref("i32"), field_ref("i64")})}, + {"i32 + i64"})); EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64")); - opts = opts->ReplaceSchema(schema({i32})); + // project i32, filter nothing 
= materialize i32 + ASSERT_OK(SetProjection(opts.get(), {"i32"})); EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32")); + // project i32, filter on i32 = materialize i32 (reported twice) opts->filter = equal(field_ref("i32"), literal(10)); EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i32")); + // project i32, filter on i32 & i64 = materialize i64, i32 (reported twice) + opts->filter = less(field_ref("i32"), field_ref("i64")); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64", "i32")); + + // project i32, filter on i64 = materialize i32 & i64 opts->filter = equal(field_ref("i64"), literal(10)); - EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64")); + EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i64", "i32")); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index c72283312cb..fb1530ec372 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -33,6 +33,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" +#include "arrow/dataset/scanner_internal.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" @@ -200,7 +201,9 @@ class DatasetFixtureMixin : public ::testing::Test { protected: void SetSchema(std::vector> fields) { schema_ = schema(std::move(fields)); - options_ = ScanOptions::Make(schema_); + options_ = std::make_shared(); + options_->dataset_schema = schema_; + ASSERT_OK(SetProjection(options_.get(), schema_->field_names())); SetFilter(literal(true)); } @@ -372,7 +375,7 @@ struct MakeFileSystemDatasetMixin { std::shared_ptr fs_; std::shared_ptr dataset_; - std::shared_ptr options_ = ScanOptions::Make(schema({})); + std::shared_ptr options_; }; static const std::string& PathOf(const std::shared_ptr& fragment) { @@ -568,7 +571,9 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { FileSystemDatasetFactory::Make(fs_, s, source_format, options)); ASSERT_OK_AND_ASSIGN(dataset_, factory->Finish()); - scan_options_ = ScanOptions::Make(source_schema_); + scan_options_ = std::make_shared(); + scan_options_->dataset_schema = source_schema_; + ASSERT_OK(SetProjection(scan_options_.get(), source_schema_->field_names())); } void SetWriteOptions(std::shared_ptr file_write_options) { diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 66fed352d0f..9534de576e7 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -77,8 +77,7 @@ class DirectoryPartitioning; class HivePartitioning; struct ScanContext; - -class ScanOptions; +struct ScanOptions; class Scanner; @@ -88,7 +87,5 @@ class ScanTask; using ScanTaskVector = std::vector>; using ScanTaskIterator = Iterator>; -class RecordBatchProjector; - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index fafe3338525..80da3eb9854 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -1674,10 +1674,10 @@ class ARROW_EXPORT Schema : public detail::Fingerprintable, public util::EqualityComparable, public util::ToStringOstreamable { public: - explicit Schema(std::vector> fields, Endianness endianness, + explicit Schema(FieldVector fields, Endianness endianness, std::shared_ptr metadata = NULLPTR); - explicit Schema(std::vector> fields, + explicit Schema(FieldVector fields, std::shared_ptr metadata = NULLPTR); Schema(const 
Schema&); @@ -1705,7 +1705,7 @@ class ARROW_EXPORT Schema : public detail::Fingerprintable, /// Return the ith schema element. Does not boundscheck const std::shared_ptr& field(int i) const; - const std::vector>& fields() const; + const FieldVector& fields() const; std::vector field_names() const; @@ -1713,7 +1713,7 @@ class ARROW_EXPORT Schema : public detail::Fingerprintable, std::shared_ptr GetFieldByName(const std::string& name) const; /// \brief Return the indices of all fields having this name in sorted order - std::vector> GetAllFieldsByName(const std::string& name) const; + FieldVector GetAllFieldsByName(const std::string& name) const; /// Returns -1 if name not found int GetFieldIndex(const std::string& name) const; diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index a1e0f503340..3003888fc1c 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -440,7 +440,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann JniGetOrThrow(dataset->NewScan(context)); std::vector column_vector = ToStringVector(env, columns); - JniAssertOkOrThrow(scanner_builder->Project(column_vector)); + if (!column_vector.empty()) { + JniAssertOkOrThrow(scanner_builder->Project(column_vector)); + } JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size)); auto scanner = JniGetOrThrow(scanner_builder->Finish()); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java index a9d950590a2..b86be100f6b 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestReservationListener.java @@ -39,11 +39,6 @@ public class TestReservationListener extends TestDataset { public static final String AVRO_SCHEMA_USER = "user.avsc"; - /** - * The default block size of C++ ReservationListenableMemoryPool. 
- */ - public static final long DEFAULT_NATIVE_MEMORY_POOL_BLOCK_SIZE = 512 * 1024; - @Test public void testDirectReservationListener() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); @@ -58,9 +53,8 @@ public void testDirectReservationListener() throws Exception { AutoCloseables.close(datum); AutoCloseables.close(pool); long finalReservation = DirectReservationListener.instance().getCurrentDirectMemReservation(); - final long expected_diff = DEFAULT_NATIVE_MEMORY_POOL_BLOCK_SIZE; - Assert.assertEquals(expected_diff, reservation - initReservation); - Assert.assertEquals(-expected_diff, finalReservation - reservation); + Assert.assertTrue(reservation >= initReservation); + Assert.assertTrue(finalReservation == initReservation); } @Test @@ -88,8 +82,7 @@ public void unreserve(long size) { AutoCloseables.close(datum); AutoCloseables.close(pool); long finalReservation = reserved.get(); - final long expected_diff = DEFAULT_NATIVE_MEMORY_POOL_BLOCK_SIZE; - Assert.assertEquals(expected_diff, reservation - initReservation); - Assert.assertEquals(-expected_diff, finalReservation - reservation); + Assert.assertTrue(reservation >= initReservation); + Assert.assertTrue(finalReservation == initReservation); } } diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 93bc0edddc1..bbe545cf794 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -55,12 +55,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef CResult[CExpression] CDeserializeExpression \ "arrow::dataset::Deserialize"(shared_ptr[CBuffer]) - cdef cppclass CRecordBatchProjector "arrow::dataset::RecordBatchProjector": - pass - cdef cppclass CScanOptions "arrow::dataset::ScanOptions": - CRecordBatchProjector projector - @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) @@ -310,11 +305,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CPartitioning] partitioning() const shared_ptr[CPartitioningFactory] factory() const - cdef CStatus CSetPartitionKeysInProjector \ - "arrow::dataset::KeyValuePartitioning::SetDefaultValuesFromKeys"( - const CExpression& partition_expression, - CRecordBatchProjector* projector) - cdef CResult[unordered_map[CFieldRef, CDatum, CFieldRefHash]] \ CExtractKnownFieldValues "arrow::dataset::ExtractKnownFieldValues"( const CExpression& partition_expression) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 57179f391de..09cad5d917b 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1520,13 +1520,14 @@ def test_construct_empty_dataset(): assert table.num_rows == 0 assert table.num_columns == 0 + +def test_construct_dataset_with_invalid_schema(): empty = ds.dataset([], schema=pa.schema([ ('a', pa.int64()), ('a', pa.string()) ])) - table = empty.to_table() - assert table.num_rows == 0 - assert table.num_columns == 2 + with pytest.raises(ValueError, match='Multiple matches for .*a.* in '): + empty.to_table() def test_construct_from_invalid_sources_raise(multisourcefs): @@ -2110,11 +2111,20 @@ def _check_dataset(schema, expected, expected_schema=None): names=['a', 'c']) _check_dataset(schema, expected) - # Specifying with incompatible schema + # Specifying with differing field types schema = pa.schema([('a', 'int32'), ('b', 'float64')]) dataset = 
ds.dataset(str(tempdir / "data.parquet"), schema=schema) + expected = pa.table([table['a'].cast('int32'), + table['b']], + names=['a', 'b']) + _check_dataset(schema, expected) + + # Specifying with incompatible schema + schema = pa.schema([('a', pa.list_(pa.int32())), ('b', 'float64')]) + dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema) assert dataset.schema.equals(schema) - with pytest.raises(TypeError): + with pytest.raises(NotImplementedError, + match='Unsupported cast from int64 to list'): dataset.to_table() @@ -2371,13 +2381,14 @@ def test_filter_mismatching_schema(tempdir): dataset = ds.dataset( tempdir / "data.parquet", format="parquet", schema=schema) - # filtering on a column with such type mismatch should give a proper error - with pytest.raises(TypeError): - dataset.to_table(filter=ds.field("col") > 2) + # filtering on a column with such type mismatch should implicitly + # cast the column + filtered = dataset.to_table(filter=ds.field("col") > 2) + assert filtered["col"].equals(table["col"].cast('int64').slice(2)) fragment = list(dataset.get_fragments())[0] - with pytest.raises(TypeError): - fragment.to_table(filter=ds.field("col") > 2, schema=schema) + filtered = fragment.to_table(filter=ds.field("col") > 2, schema=schema) + assert filtered["col"].equals(table["col"].cast('int64').slice(2)) @pytest.mark.parquet
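Editorial note (not part of the patch): the diff above replaces the name-list-only ScannerBuilder::Project with an expression-based overload and binds filters eagerly against the dataset schema. The sketch below shows the intended call pattern, mirroring the cases exercised in scanner_test.cc; the umbrella include path, the ds:: namespace alias, and the caller-supplied dataset/scan_context arguments are assumptions rather than anything taken from this patch.

// Hedged usage sketch for the new expression-based projection API.
#include <memory>

#include "arrow/dataset/api.h"  // assumed umbrella header for the dataset API
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"

namespace ds = arrow::dataset;

arrow::Result<std::shared_ptr<arrow::Table>> ScanRenamedAndDoubled(
    std::shared_ptr<ds::Dataset> dataset,
    std::shared_ptr<ds::ScanContext> scan_context) {
  ds::ScannerBuilder builder(std::move(dataset), std::move(scan_context));

  // Materialize "i16" under a new name plus a derived column; columns that are
  // not referenced by the projection or the filter need not be read at all.
  RETURN_NOT_OK(builder.Project(
      {ds::field_ref("i16"),
       ds::call("multiply", {ds::field_ref("i16"), ds::literal(2)})},
      {"i16 renamed", "i16 * 2"}));

  // Filters are bound against the dataset schema up front, so an unknown or
  // nested column name fails here instead of at scan time.
  RETURN_NOT_OK(builder.Filter(ds::equal(ds::field_ref("i16"), ds::literal(2))));

  ARROW_ASSIGN_OR_RAISE(auto scanner, builder.Finish());
  return scanner->ToTable();  // batches carry the projected_schema
}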
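A second sketch, also not part of the patch: with ScanOptions::Make and ReplaceSchema removed, ScanOptions is a plain struct and the tests populate it through the SetProjection/SetFilter helpers (see the DatasetFixtureMixin::SetSchema change above). The function name and the arrow::Result wrapper are assumptions; SetProjection and SetFilter live in the internal scanner_internal.h header, so this pattern only applies to code built inside the Arrow source tree.

// Constructing ScanOptions directly, the way the test fixtures now do.
#include <memory>
#include <utility>

#include "arrow/dataset/scanner.h"
#include "arrow/dataset/scanner_internal.h"  // SetProjection, SetFilter (internal)
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"

namespace ds = arrow::dataset;

arrow::Result<std::shared_ptr<ds::ScanOptions>> MakeDefaultScanOptions(
    std::shared_ptr<arrow::Schema> dataset_schema) {
  auto options = std::make_shared<ds::ScanOptions>();
  options->dataset_schema = std::move(dataset_schema);

  // Project every top-level field by name; SetProjection also fills in
  // options->projected_schema from the bound projection expression.
  RETURN_NOT_OK(ds::SetProjection(options.get(),
                                  options->dataset_schema->field_names()));

  // An always-true filter, bound against the dataset schema.
  RETURN_NOT_OK(ds::SetFilter(options.get(), ds::literal(true)));
  return options;
}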