diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc
index a0b6000ea4a..4524e05af34 100644
--- a/cpp/src/arrow/dataset/dataset_test.cc
+++ b/cpp/src/arrow/dataset/dataset_test.cc
@@ -319,33 +319,36 @@ TEST_F(TestEndToEnd, EndToEndSingleSource) {
   FileSystemDiscoveryOptions options;
   options.ignore_prefixes = {"."};
 
-  ASSERT_OK_AND_ASSIGN(discovery,
-                       FileSystemDataSourceDiscovery::Make(fs_, s, format, options));
-
   // Partition expressions can be discovered for DataSource and DataFragments.
   // This metadata is then used in conjunction with the query filter to apply
   // the pushdown predicate optimization.
-  auto partition_schema = SchemaFromColumnNames(schema_, {"year", "month", "country"});
+  //
   // The SchemaPartitionScheme is a simple scheme where the path is split with
-  // the directory separator character and the components are typed and named
-  // with the equivalent index in the schema, e.g.
-  // (with the previous defined schema):
+  // the directory separator character and the components are parsed as values
+  // of the corresponding fields in its schema.
+  //
+  // Since a PartitionSchemeDiscovery is specified instead of an explicit
+  // PartitionScheme, the types of partition fields will be inferred.
   //
   // - "/2019" -> {"year": 2019}
   // - "/2019/01 -> {"year": 2019, "month": 1}
   // - "/2019/01/CA -> {"year": 2019, "month": 1, "country": "CA"}
   // - "/2019/01/CA/a_file.json -> {"year": 2019, "month": 1, "country": "CA"}
-  auto partition_scheme = std::make_shared<SchemaPartitionScheme>(partition_schema);
-  ASSERT_OK(discovery->SetPartitionScheme(partition_scheme));
+  options.partition_scheme =
+      SchemaPartitionScheme::MakeDiscovery({"year", "month", "country"});
+
+  ASSERT_OK_AND_ASSIGN(discovery,
+                       FileSystemDataSourceDiscovery::Make(fs_, s, format, options));
 
   // DataFragments might have compatible but slightly different schemas, e.g.
   // schema evolved by adding/renaming columns. In this case, the schema is
   // passed to the dataset constructor.
+  // The inspected_schema may optionally be modified before being finalized.
   ASSERT_OK_AND_ASSIGN(auto inspected_schema, discovery->Inspect());
   EXPECT_EQ(*schema_, *inspected_schema);
 
   // Build the DataSource where partitions are attached to fragments (files).
-  ASSERT_OK_AND_ASSIGN(auto datasource, discovery->Finish());
+  ASSERT_OK_AND_ASSIGN(auto datasource, discovery->Finish(inspected_schema));
 
   // Create the Dataset from our single DataSource.
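For reference, the flow this test now exercises can be condensed into a small sketch. It assumes the API introduced in this diff (FileSystemDiscoveryOptions::partition_scheme, SchemaPartitionScheme::MakeDiscovery, the schema-taking Finish overload), plus standard includes, the headers added here ("arrow/dataset/discovery.h", "arrow/dataset/partition.h") and `using namespace arrow::dataset`. The filesystem, selector and format parameters are placeholders rather than names from the test fixture, and the exact Make signature follows the selector overload used by the test.

// Minimal sketch, not the test itself: discover a DataSource with inferred
// partition fields, then finalize it against the inspected schema.
arrow::Result<std::shared_ptr<DataSource>> DiscoverSource(
    std::shared_ptr<arrow::fs::FileSystem> filesystem,
    arrow::fs::FileSelector selector, std::shared_ptr<FileFormat> format) {
  FileSystemDiscoveryOptions options;
  options.ignore_prefixes = {"."};
  // Types of year/month/country will be inferred from the discovered paths.
  options.partition_scheme =
      SchemaPartitionScheme::MakeDiscovery({"year", "month", "country"});

  ARROW_ASSIGN_OR_RAISE(
      auto discovery,
      FileSystemDataSourceDiscovery::Make(filesystem, selector, format, options));

  // Inspect() unifies the physical file schemas with the inferred partition
  // schema; the result may be amended before being handed back to Finish().
  ARROW_ASSIGN_OR_RAISE(auto schema, discovery->Inspect());
  return discovery->Finish(schema);
}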
ASSERT_OK_AND_ASSIGN(auto dataset, Dataset::Make({datasource}, inspected_schema)); @@ -458,16 +461,18 @@ class TestSchemaUnification : public TestDataset { }; auto format = std::make_shared(resolver); + FileSystemDiscoveryOptions options; options.partition_base_dir = base; + options.partition_scheme = + std::make_shared(SchemaFromNames({"part_ds", "part_df"})); + ARROW_ASSIGN_OR_RAISE(auto discovery, FileSystemDataSourceDiscovery::Make( - fs_, std::move(paths), format, options)); + fs_, paths, format, options)); - auto scheme_schema = SchemaFromNames({"part_ds", "part_df"}); - auto partition_scheme = std::make_shared(scheme_schema); - RETURN_NOT_OK(discovery->SetPartitionScheme(partition_scheme)); + ARROW_ASSIGN_OR_RAISE(auto schema, discovery->Inspect()); - return discovery->Finish(); + return discovery->Finish(schema); }; schema_ = SchemaFromNames({"phy_1", "phy_2", "phy_3", "phy_4", "part_ds", "part_df"}); diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 2c92832b4e1..791824fae64 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -35,18 +35,11 @@ namespace arrow { namespace dataset { -DataSourceDiscovery::DataSourceDiscovery() - : schema_(arrow::schema({})), - partition_scheme_(PartitionScheme::Default()), - root_partition_(scalar(true)) {} +DataSourceDiscovery::DataSourceDiscovery() : root_partition_(scalar(true)) {} Result> DataSourceDiscovery::Inspect() { ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas()); - if (partition_scheme()) { - schemas.push_back(partition_scheme()->schema()); - } - if (schemas.empty()) { schemas.push_back(arrow::schema({})); } @@ -135,7 +128,7 @@ Result> FileSystemDataSourceDiscovery::Make ARROW_ASSIGN_OR_RAISE(forest, Filter(filesystem, format, options, std::move(forest))); return std::shared_ptr(new FileSystemDataSourceDiscovery( - filesystem, std::move(forest), std::move(format), std::move(options))); + std::move(filesystem), std::move(forest), std::move(format), std::move(options))); } Result> FileSystemDataSourceDiscovery::Make( @@ -158,23 +151,54 @@ Result> FileSystemDataSourceDiscovery::Make filesystem, std::move(forest), std::move(format), std::move(options))); } +Result> FileSystemDataSourceDiscovery::PartitionSchema() { + if (auto partition_scheme = options_.partition_scheme.scheme()) { + return partition_scheme->schema(); + } + + std::vector paths; + for (const auto& stats : forest_.stats()) { + if (auto relative = + fs::internal::RemoveAncestor(options_.partition_base_dir, stats.path())) { + paths.push_back(*relative); + } + } + + return options_.partition_scheme.discovery()->Inspect(paths); +} + Result>> FileSystemDataSourceDiscovery::InspectSchemas() { std::vector> schemas; for (const auto& f : forest_.stats()) { if (!f.IsFile()) continue; - auto source = FileSource(f.path(), fs_.get()); - ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(source)); + FileSource src(f.path(), fs_.get()); + ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src)); schemas.push_back(schema); } + ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema()); + schemas.push_back(partition_schema); + return schemas; } -Result> FileSystemDataSourceDiscovery::Finish() { +Result> DataSourceDiscovery::Finish() { + ARROW_ASSIGN_OR_RAISE(auto schema, Inspect()); + return Finish(schema); +} + +Result> FileSystemDataSourceDiscovery::Finish( + const std::shared_ptr& schema) { ExpressionVector partitions(forest_.size(), scalar(true)); + std::shared_ptr partition_scheme = 
options_.partition_scheme.scheme(); + if (partition_scheme == nullptr) { + auto discovery = options_.partition_scheme.discovery(); + ARROW_ASSIGN_OR_RAISE(partition_scheme, discovery->Finish(schema)); + } + // apply partition_scheme to forest to derive partitions auto apply_partition_scheme = [&](fs::PathForest::Ref ref) { if (auto relative = fs::internal::RemoveAncestor(options_.partition_base_dir, @@ -183,7 +207,7 @@ Result> FileSystemDataSourceDiscovery::Finish() { if (segments.size() > 0) { auto segment_index = static_cast(segments.size()) - 1; - auto maybe_partition = partition_scheme_->Parse(segments.back(), segment_index); + auto maybe_partition = partition_scheme->Parse(segments.back(), segment_index); partitions[ref.i] = std::move(maybe_partition).ValueOr(scalar(true)); } @@ -192,6 +216,7 @@ Result> FileSystemDataSourceDiscovery::Finish() { }; RETURN_NOT_OK(forest_.Visit(apply_partition_scheme)); + return FileSystemDataSource::Make(fs_, forest_, std::move(partitions), root_partition_, format_); } diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index 883c5acba0e..95e448d54aa 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -25,11 +25,13 @@ #include #include +#include "arrow/dataset/partition.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/filesystem/filesystem.h" #include "arrow/filesystem/path_forest.h" #include "arrow/util/macros.h" +#include "arrow/util/variant.h" namespace arrow { namespace dataset { @@ -59,23 +61,14 @@ class ARROW_DS_EXPORT DataSourceDiscovery { /// \brief Get unified schema for the resulting DataSource. virtual Result> Inspect(); - /// \brief Create a DataSource with a given partition. - virtual Result> Finish() = 0; + /// \brief Create a DataSource with the given schema. + virtual Result> Finish( + const std::shared_ptr& schema) = 0; - std::shared_ptr schema() const { return schema_; } - Status SetSchema(std::shared_ptr schema) { - schema_ = schema; - return Status::OK(); - } - - const std::shared_ptr& partition_scheme() const { - return partition_scheme_; - } - Status SetPartitionScheme(std::shared_ptr partition_scheme) { - partition_scheme_ = partition_scheme; - return Status::OK(); - } + /// \brief Create a DataSource using an inspected schema. + virtual Result> Finish(); + /// \brief Optional root partition for the resulting DataSource. const std::shared_ptr& root_partition() const { return root_partition_; } Status SetRootPartition(std::shared_ptr partition) { root_partition_ = partition; @@ -87,12 +80,20 @@ class ARROW_DS_EXPORT DataSourceDiscovery { protected: DataSourceDiscovery(); - std::shared_ptr schema_; - std::shared_ptr partition_scheme_; std::shared_ptr root_partition_; }; struct FileSystemDiscoveryOptions { + // Either an explicit PartitionScheme or a PartitionSchemeDiscovery to discover one. + // + // If a discovery is provided, it will be used to infer a schema for partition fields + // based on file and directory paths then construct a PartitionScheme. The default + // is a PartitionScheme which will yield no partition information. + // + // The (explicit or discovered) partition scheme will be applied to discovered files + // and the resulting partition information embedded in the DataSource. + PartitionSchemeOrDiscovery partition_scheme{PartitionScheme::Default()}; + // For the purposes of applying the partition scheme, paths will be stripped // of the partition_base_dir. 
Files not matching the partition_base_dir // prefix will be skipped for partition discovery. The ignored files will still @@ -113,7 +114,7 @@ struct FileSystemDiscoveryOptions { // Invalid files (via selector or explicitly) will be excluded by checking // with the FileFormat::IsSupported method. This will incur IO for each files // in a serial and single threaded fashion. Disabling this feature will skip the - // IO, but unsupported files may will be present in the DataSource + // IO, but unsupported files may be present in the DataSource // (resulting in an error at scan time). bool exclude_invalid_files = true; @@ -166,7 +167,8 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery Result>> InspectSchemas() override; - Result> Finish() override; + Result> Finish( + const std::shared_ptr& schema) override; protected: FileSystemDataSourceDiscovery(std::shared_ptr filesystem, @@ -178,6 +180,8 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery const FileSystemDiscoveryOptions& options, fs::PathForest forest); + Result> PartitionSchema(); + std::shared_ptr fs_; fs::PathForest forest_; std::shared_ptr format_; diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index 6e022d9e416..563a84d8725 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -31,13 +31,9 @@ namespace dataset { class DataSourceDiscoveryTest : public TestFileSystemDataSource { public: - void AssertInspect(std::shared_ptr schema) { - ASSERT_OK_AND_ASSIGN(auto actual, discovery_->Inspect()); - EXPECT_EQ(*actual, *schema); - } - void AssertInspect(const std::vector>& expected_fields) { - AssertInspect(schema(expected_fields)); + ASSERT_OK_AND_ASSIGN(auto actual, discovery_->Inspect()); + EXPECT_EQ(*actual, Schema(expected_fields)); } void AssertInspectSchemas(std::vector> expected) { @@ -62,7 +58,7 @@ class MockDataSourceDiscovery : public DataSourceDiscovery { return schemas_; } - Result> Finish() override { + Result> Finish(const std::shared_ptr&) override { return std::make_shared( std::vector>{}); } @@ -103,39 +99,28 @@ class MockDataSourceDiscoveryTest : public DataSourceDiscoveryTest { TEST_F(MockDataSourceDiscoveryTest, UnifySchemas) { MakeDiscovery({}); - AssertInspect(schema({})); + AssertInspect({}); MakeDiscovery({schema({i32}), schema({i32})}); - AssertInspect(schema({i32})); + AssertInspect({i32}); MakeDiscovery({schema({i32}), schema({i64})}); - AssertInspect(schema({i32, i64})); + AssertInspect({i32, i64}); MakeDiscovery({schema({i32}), schema({i64})}); - AssertInspect(schema({i32, i64})); + AssertInspect({i32, i64}); MakeDiscovery({schema({i32}), schema({i32_req})}); - AssertInspect(schema({i32})); + AssertInspect({i32}); MakeDiscovery({schema({i32, f64}), schema({i32_req, i64})}); - AssertInspect(schema({i32, f64, i64})); + AssertInspect({i32, f64, i64}); MakeDiscovery({schema({i32, f64}), schema({f64, i32_fake})}); // Unification fails when fields with the same name have clashing types. ASSERT_RAISES(Invalid, discovery_->Inspect()); // Return the individual schema for closer inspection should not fail. 
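The FileSystemDiscoveryOptions::partition_scheme member documented in discovery.h above accepts either form through PartitionSchemeOrDiscovery's assignment operators, which is what the ExplicitPartition and DiscoveredPartition tests below rely on. A minimal sketch with illustrative field names and types, assuming the same namespaces as the earlier sketch:

// Two ways to populate the new option: a fully specified scheme, or a
// discovery whose field types are inferred later from the paths.
void ConfigureOptions(FileSystemDiscoveryOptions* explicit_opts,
                      FileSystemDiscoveryOptions* discovered_opts) {
  // Explicit: partition field names and types are given up front.
  explicit_opts->partition_scheme = std::make_shared<HivePartitionScheme>(
      arrow::schema({arrow::field("a", arrow::float64())}));

  // Discovery: only the mechanism is chosen here; types are inferred when
  // the DataSourceDiscovery inspects the discovered paths.
  discovered_opts->partition_scheme = HivePartitionScheme::MakeDiscovery();
}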
AssertInspectSchemas({schema({i32, f64}), schema({f64, i32_fake})}); - - // Partition Scheme's schema should be taken into account - MakeDiscovery({schema({i64, f64})}); - auto partition_scheme = std::make_shared(schema({i32})); - ASSERT_OK(discovery_->SetPartitionScheme(partition_scheme)); - AssertInspect(schema({i64, f64, i32})); - - // Partition scheme with an existing column should be fine. - partition_scheme = std::make_shared(schema({i64})); - ASSERT_OK(discovery_->SetPartitionScheme(partition_scheme)); - AssertInspect(schema({i64, f64})); } class FileSystemDataSourceDiscoveryTest : public DataSourceDiscoveryTest { @@ -146,9 +131,13 @@ class FileSystemDataSourceDiscoveryTest : public DataSourceDiscoveryTest { fs_, selector_, format_, discovery_options_)); } - void AssertFinishWithPaths(std::vector paths) { - options_ = ScanOptions::Make(discovery_->schema()); - ASSERT_OK_AND_ASSIGN(source_, discovery_->Finish()); + void AssertFinishWithPaths(std::vector paths, + std::shared_ptr schema = nullptr) { + if (schema == nullptr) { + ASSERT_OK_AND_ASSIGN(schema, discovery_->Inspect()); + } + options_ = ScanOptions::Make(schema); + ASSERT_OK_AND_ASSIGN(source_, discovery_->Finish(schema)); AssertFragmentsAreFromPath(source_->GetFragments(options_), paths); } @@ -176,17 +165,28 @@ TEST_F(FileSystemDataSourceDiscoveryTest, Selector) { MakeDiscovery({fs::File("0"), fs::File("A/a"), fs::File("A/A/a")}); // partition_base_dir should not affect filtered files, ony the applied // partition scheme. + AssertInspect({}); AssertFinishWithPaths({"A/a", "A/A/a"}); } -TEST_F(FileSystemDataSourceDiscoveryTest, Partition) { +TEST_F(FileSystemDataSourceDiscoveryTest, ExplicitPartition) { selector_.base_dir = "a=ignored/base"; + discovery_options_.partition_scheme = + std::make_shared(schema({field("a", float64())})); + + MakeDiscovery( + {fs::File(selector_.base_dir + "/a=1"), fs::File(selector_.base_dir + "/a=2")}); + + AssertInspect({field("a", float64())}); + AssertFinishWithPaths({selector_.base_dir + "/a=1", selector_.base_dir + "/a=2"}); +} + +TEST_F(FileSystemDataSourceDiscoveryTest, DiscoveredPartition) { + selector_.base_dir = "a=ignored/base"; + discovery_options_.partition_scheme = HivePartitionScheme::MakeDiscovery(); MakeDiscovery( {fs::File(selector_.base_dir + "/a=1"), fs::File(selector_.base_dir + "/a=2")}); - auto partition_scheme = - std::make_shared(schema({field("a", int32())})); - ASSERT_OK(discovery_->SetPartitionScheme(partition_scheme)); AssertInspect({field("a", int32())}); AssertFinishWithPaths({selector_.base_dir + "/a=1", selector_.base_dir + "/a=2"}); } @@ -195,13 +195,13 @@ TEST_F(FileSystemDataSourceDiscoveryTest, MissingDirectories) { MakeFileSystem({fs::File("base_dir/a=3/b=3/dat"), fs::File("unpartitioned/ignored=3")}); discovery_options_.partition_base_dir = "base_dir"; + discovery_options_.partition_scheme = std::make_shared( + schema({field("a", int32()), field("b", int32())})); + ASSERT_OK_AND_ASSIGN( discovery_, FileSystemDataSourceDiscovery::Make( fs_, {"base_dir/a=3/b=3/dat", "unpartitioned/ignored=3"}, format_, discovery_options_)); - auto partition_scheme = std::make_shared( - schema({field("a", int32()), field("b", int32())})); - ASSERT_OK(discovery_->SetPartitionScheme(partition_scheme)); AssertInspect({field("a", int32()), field("b", int32())}); AssertFinishWithPaths({"base_dir/a=3/b=3/dat", "unpartitioned/ignored=3"}); @@ -236,15 +236,12 @@ TEST_F(FileSystemDataSourceDiscoveryTest, Inspect) { auto s = schema({field("f64", float64())}); format_ = 
std::make_shared(s); - MakeDiscovery({}); - // No files - ASSERT_OK_AND_ASSIGN(auto actual, discovery_->Inspect()); - EXPECT_EQ(*actual, Schema({})); + MakeDiscovery({}); + AssertInspect({}); MakeDiscovery({fs::File("test")}); - ASSERT_OK_AND_ASSIGN(actual, discovery_->Inspect()); - EXPECT_EQ(*actual, *s); + AssertInspect(s->fields()); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 3bb02275fe8..07e5cc238bb 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -59,8 +59,6 @@ std::shared_ptr PartitionScheme::Default() { return std::make_shared(); } -PartitionSchemeDiscovery::PartitionSchemeDiscovery() : schema_(::arrow::schema({})) {} - Result> SegmentDictionaryPartitionScheme::Parse( const std::string& segment, int i) const { if (static_cast(i) < dictionaries_.size()) { @@ -143,8 +141,18 @@ class SchemaPartitionSchemeDiscovery : public PartitionSchemeDiscovery { return SchemaFromColumnNames(InferSchema(name_to_values), field_names_); } - Result> Finish() const override { - return std::shared_ptr(new SchemaPartitionScheme(schema_)); + Result> Finish( + const std::shared_ptr& schema) const override { + for (const auto& field_name : field_names_) { + if (schema->GetFieldIndex(field_name) == -1) { + return Status::TypeError("no field named '", field_name, "' in schema", *schema); + } + } + + // drop fields which aren't in field_names_ + auto out_schema = SchemaFromColumnNames(schema, field_names_); + + return std::make_shared(std::move(out_schema)); } private: @@ -186,8 +194,9 @@ class HivePartitionSchemeDiscovery : public PartitionSchemeDiscovery { return InferSchema(name_to_values); } - Result> Finish() const override { - return std::shared_ptr(new SchemaPartitionScheme(schema_)); + Result> Finish( + const std::shared_ptr& schema) const override { + return std::shared_ptr(new SchemaPartitionScheme(schema)); } private: diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index b485e2b5f32..8f18975a713 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -94,19 +94,10 @@ class ARROW_DS_EXPORT PartitionSchemeDiscovery { virtual Result> Inspect( const std::vector& paths) const = 0; - /// Create a partition scheme with schema inferred from stats. - virtual Result> Finish() const = 0; - - std::shared_ptr schema() const { return schema_; } - Status SetSchema(std::shared_ptr schema) { - schema_ = schema; - return Status::OK(); - } - - protected: - PartitionSchemeDiscovery(); - - std::shared_ptr schema_; + /// Create a partition scheme using the provided schema + /// (fields may be dropped). + virtual Result> Finish( + const std::shared_ptr& schema) const = 0; }; /// \brief Subclass for representing the default, always true scheme. 
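The partition.cc and partition.h changes above give PartitionSchemeDiscovery a two-step contract: Inspect(paths) infers a schema from path segments, and Finish(schema) then builds a concrete scheme from a (possibly amended) schema. A minimal sketch using the HivePartitionScheme::MakeDiscovery factory referenced in the tests; the paths and the adjustment step are illustrative:

// Infer a partition schema from paths, optionally adjust it, then build the
// concrete PartitionScheme from it.
arrow::Result<std::shared_ptr<PartitionScheme>> DiscoverHiveScheme(
    const std::vector<std::string>& paths) {
  auto discovery = HivePartitionScheme::MakeDiscovery();
  ARROW_ASSIGN_OR_RAISE(auto inferred, discovery->Inspect(paths));
  // `inferred` could be modified here (e.g. widening an integer field)
  // before the scheme is finalized against it.
  return discovery->Finish(inferred);
}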
@@ -236,5 +227,45 @@ class ARROW_DS_EXPORT FunctionPartitionScheme : public PartitionScheme { // TODO(bkietz) use RE2 and named groups to provide RegexpPartitionScheme +/// \brief Either a PartitionScheme or a PartitionSchemeDiscovery +class ARROW_DS_EXPORT PartitionSchemeOrDiscovery { + public: + explicit PartitionSchemeOrDiscovery(std::shared_ptr scheme) + : variant_(std::move(scheme)) {} + + explicit PartitionSchemeOrDiscovery(std::shared_ptr discovery) + : variant_(std::move(discovery)) {} + + PartitionSchemeOrDiscovery& operator=(std::shared_ptr scheme) { + variant_ = std::move(scheme); + return *this; + } + + PartitionSchemeOrDiscovery& operator=( + std::shared_ptr discovery) { + variant_ = std::move(discovery); + return *this; + } + + std::shared_ptr scheme() const { + if (util::holds_alternative>(variant_)) { + return util::get>(variant_); + } + return NULLPTR; + } + + std::shared_ptr discovery() const { + if (util::holds_alternative>(variant_)) { + return util::get>(variant_); + } + return NULLPTR; + } + + private: + util::variant, + std::shared_ptr> + variant_; +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index e9a75b421af..39ac39504a2 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -57,6 +57,7 @@ class TestPartitionScheme : public ::testing::Test { const std::vector>& expected) { ASSERT_OK_AND_ASSIGN(auto actual, discovery_->Inspect(paths)); ASSERT_EQ(*actual, Schema(expected)); + ASSERT_OK(discovery_->Finish(actual).status()); } protected: diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 0db03cd9e4f..42cbd7930b3 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -74,6 +74,8 @@ class PartitionScheme; class PartitionSchemeDiscovery; +class PartitionSchemeOrDiscovery; + struct ScanContext; class ScanOptions; diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 782c9c46e8e..58d8537fbba 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -118,6 +118,27 @@ cdef class PartitionScheme: return pyarrow_wrap_schema(self.scheme.schema()) +cdef class PartitionSchemeDiscovery: + + cdef: + shared_ptr[CPartitionSchemeDiscovery] wrapped + CPartitionSchemeDiscovery* discovery + + def __init__(self): + _forbid_instantiation(self.__class__) + + @staticmethod + cdef wrap(const shared_ptr[CPartitionSchemeDiscovery]& sp): + cdef PartitionSchemeDiscovery self + self = PartitionSchemeDiscovery() + self.wrapped = sp + self.discovery = sp.get() + return self + + cdef inline shared_ptr[CPartitionSchemeDiscovery] unwrap(self): + return self.wrapped + + cdef class DefaultPartitionScheme(PartitionScheme): cdef: @@ -186,6 +207,37 @@ cdef class FileSystemDiscoveryOptions: cdef inline CFileSystemDiscoveryOptions unwrap(self): return self.options + @property + def partition_scheme(self): + """PartitionScheme to apply to discovered files. + + NOTE: setting this property will overwrite partition_scheme_discovery. 
+ """ + cdef shared_ptr[CPartitionScheme] s = self.options.partition_scheme.scheme() + if s.get() == nullptr: + return None + return PartitionScheme.wrap(s) + + @partition_scheme.setter + def partition_scheme(self, PartitionScheme value): + self.options.partition_scheme = ( value).unwrap() + + @property + def partition_scheme_discovery(self): + """PartitionSchemeDiscovery to apply to discovered files and + discover a PartitionScheme. + + NOTE: setting this property will overwrite partition_scheme. + """ + cdef shared_ptr[CPartitionSchemeDiscovery] d = self.options.partition_scheme.discovery() + if d.get() == nullptr: + return None + return PartitionSchemeDiscovery.wrap(d) + + @partition_scheme_discovery.setter + def partition_scheme_discovery(self, PartitionSchemeDiscovery value): + self.options.partition_scheme = ( value).unwrap() + @property def partition_base_dir(self): return frombytes(self.options.partition_base_dir) @@ -234,19 +286,6 @@ cdef class DataSourceDiscovery: cdef inline shared_ptr[CDataSourceDiscovery] unwrap(self) nogil: return self.wrapped - @property - def partition_scheme(self): - cdef shared_ptr[CPartitionScheme] scheme - scheme = self.discovery.partition_scheme() - if scheme.get() == nullptr: - return None - else: - return PartitionScheme.wrap(scheme) - - @partition_scheme.setter - def partition_scheme(self, PartitionScheme scheme not None): - check_status(self.discovery.SetPartitionScheme(scheme.unwrap())) - @property def root_partition(self): cdef shared_ptr[CExpression] expr = self.discovery.root_partition() @@ -275,10 +314,17 @@ cdef class DataSourceDiscovery: result = self.discovery.Inspect() return pyarrow_wrap_schema(GetResultValue(result)) - def finish(self): - cdef CResult[shared_ptr[CDataSource]] result - with nogil: - result = self.discovery.Finish() + def finish(self, Schema schema = None): + cdef: + shared_ptr[CSchema] sp_schema + CResult[shared_ptr[CDataSource]] result + if schema is not None: + sp_schema = pyarrow_unwrap_schema(schema) + with nogil: + result = self.discovery.Finish(sp_schema) + else: + with nogil: + result = self.discovery.Finish() return DataSource.wrap(GetResultValue(result)) @@ -291,20 +337,41 @@ cdef class FileSystemDataSourceDiscovery(DataSourceDiscovery): FileFormat format not None, FileSystemDiscoveryOptions options=None): cdef: - FileStats file_stats - vector[CFileStats] stats + vector[c_string] paths + CFileSelector selector CResult[shared_ptr[CDataSourceDiscovery]] result + shared_ptr[CFileSystem] c_filesystem + shared_ptr[CFileFormat] c_format + CFileSystemDiscoveryOptions c_options + + c_filesystem = filesystem.unwrap() + + c_format = format.unwrap() options = options or FileSystemDiscoveryOptions() - for file_stats in filesystem.get_target_stats(paths_or_selector): - stats.push_back(file_stats.unwrap()) + c_options = options.unwrap() + + if isinstance(paths_or_selector, FileSelector): + with nogil: + selector = (paths_or_selector).selector + result = CFileSystemDataSourceDiscovery.MakeFromSelector( + c_filesystem, + selector, + c_format, + c_options + ) + elif isinstance(paths_or_selector, (list, tuple)): + paths = [tobytes(s) for s in paths_or_selector] + with nogil: + result = CFileSystemDataSourceDiscovery.MakeFromPaths( + c_filesystem, + paths, + c_format, + c_options + ) + else: + raise TypeError('Must pass either paths or a FileSelector') - result = CFileSystemDataSourceDiscovery.MakeFromFileStats( - filesystem.unwrap(), - stats, - format.unwrap(), - options.unwrap() - ) self.init(GetResultValue(result)) cdef 
init(self, shared_ptr[CDataSourceDiscovery]& sp): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index c67bb740b8a..9d7f379d047 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -169,9 +169,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CScanner]] Finish() shared_ptr[CSchema] schema() const - ctypedef shared_ptr[CScannerBuilder] shared_ptr[CScannerBuilder] \ - "arrow::dataset::std::shared_ptr" - cdef cppclass CDataFragment "arrow::dataset::DataFragment": CResult[CScanTaskIterator] Scan(shared_ptr[CScanContext] context) c_bool splittable() @@ -179,6 +176,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: ctypedef vector[shared_ptr[CDataFragment]] CDataFragmentVector \ "arrow::dataset::DataFragmentVector" + ctypedef CIterator[shared_ptr[CDataFragment]] CDataFragmentIterator \ "arrow::dataset::DataFragmentIterator" @@ -256,9 +254,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CParquetDataFragment(const CFileSource& source, shared_ptr[CScanOptions] options) - ctypedef unordered_map[c_string, shared_ptr[CExpression]] CPathPartitions \ - "arrow::dataset::PathPartitions" - cdef cppclass CFileSystemDataSource \ "arrow::dataset::FileSystemDataSource"(CDataSource): @staticmethod @@ -295,6 +290,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CExpression]] Parse(const c_string& path) const const shared_ptr[CSchema]& schema() + cdef cppclass CPartitionSchemeDiscovery "arrow::dataset::PartitionSchemeDiscovery": + pass + cdef cppclass CDefaultPartitionScheme \ "arrow::dataset::DefaultPartitionScheme"(CPartitionScheme): CDefaultPartitionScheme() @@ -307,8 +305,18 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::HivePartitionScheme"(CPartitionScheme): CHivePartitionScheme(shared_ptr[CSchema] schema) + cdef cppclass CPartitionSchemeOrDiscovery \ + "arrow::dataset::PartitionSchemeOrDiscovery": + CPartitionSchemeOrDiscovery(shared_ptr[CPartitionScheme]) + CPartitionSchemeOrDiscovery(shared_ptr[CPartitionSchemeDiscovery]) + CPartitionSchemeOrDiscovery& operator=(shared_ptr[CPartitionScheme]) + CPartitionSchemeOrDiscovery& operator=(shared_ptr[CPartitionSchemeDiscovery]) + shared_ptr[CPartitionScheme] scheme() const + shared_ptr[CPartitionSchemeDiscovery] discovery() const + cdef cppclass CFileSystemDiscoveryOptions \ "arrow::dataset::FileSystemDiscoveryOptions": + CPartitionSchemeOrDiscovery partition_scheme c_string partition_base_dir c_bool exclude_invalid_files vector[c_string] ignore_prefixes @@ -316,12 +324,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CDataSourceDiscovery "arrow::dataset::DataSourceDiscovery": CResult[vector[shared_ptr[CSchema]]] InspectSchemas() CResult[shared_ptr[CSchema]] Inspect() + CResult[shared_ptr[CDataSource]] Finish(shared_ptr[CSchema]) CResult[shared_ptr[CDataSource]] Finish() - shared_ptr[CSchema] schema() - CStatus SetSchema(shared_ptr[CSchema]) - shared_ptr[CPartitionScheme] partition_scheme() - CStatus SetPartitionScheme( - shared_ptr[CPartitionScheme] partition_scheme) shared_ptr[CExpression] root_partition() CStatus SetRootPartition(shared_ptr[CExpression] partition) @@ -329,16 +333,16 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::FileSystemDataSourceDiscovery"( 
CDataSourceDiscovery): @staticmethod - CResult[shared_ptr[CDataSourceDiscovery]] MakeFromFileStats "Make"( + CResult[shared_ptr[CDataSourceDiscovery]] MakeFromPaths "Make"( shared_ptr[CFileSystem] filesytem, - CFileStatsVector paths, + vector[c_string] paths, shared_ptr[CFileFormat] format, CFileSystemDiscoveryOptions options ) @staticmethod CResult[shared_ptr[CDataSourceDiscovery]] MakeFromSelector "Make"( shared_ptr[CFileSystem] filesytem, - CSelector, + CFileSelector, shared_ptr[CFileFormat] format, CFileSystemDiscoveryOptions options ) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index a18d2404f6c..75948cc24b6 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -67,16 +67,16 @@ def dataset(mockfs): format = ds.ParquetFileFormat() selector = fs.FileSelector('subdir', recursive=True) options = ds.FileSystemDiscoveryOptions('subdir') - discovery = ds.FileSystemDataSourceDiscovery(mockfs, selector, format, - options) - discovery.partition_scheme = ds.SchemaPartitionScheme( + options.partition_scheme = ds.SchemaPartitionScheme( pa.schema([ pa.field('group', pa.int32()), pa.field('key', pa.string()) ]) ) - source = discovery.finish() + discovery = ds.FileSystemDataSourceDiscovery(mockfs, selector, format, + options) schema = discovery.inspect() + source = discovery.finish() return ds.Dataset([source], schema) @@ -307,6 +307,12 @@ def test_file_system_discovery(mockfs, paths_or_selector): format = ds.ParquetFileFormat() options = ds.FileSystemDiscoveryOptions('subdir') + options.partition_scheme = ds.SchemaPartitionScheme( + pa.schema([ + pa.field('group', pa.int32()), + pa.field('key', pa.string()) + ]) + ) assert options.partition_base_dir == 'subdir' assert options.ignore_prefixes == ['.', '_'] assert options.exclude_invalid_files is True @@ -314,22 +320,17 @@ def test_file_system_discovery(mockfs, paths_or_selector): discovery = ds.FileSystemDataSourceDiscovery( mockfs, paths_or_selector, format, options ) + inspected_schema = discovery.inspect() + assert isinstance(discovery.inspect(), pa.Schema) assert isinstance(discovery.inspect_schemas(), list) - assert isinstance(discovery.finish(), ds.FileSystemDataSource) - assert isinstance(discovery.partition_scheme, ds.DefaultPartitionScheme) + assert isinstance(discovery.finish(inspected_schema), + ds.FileSystemDataSource) assert discovery.root_partition.equals(ds.ScalarExpression(True)) - discovery.partition_scheme = ds.SchemaPartitionScheme( - pa.schema([ - pa.field('group', pa.int32()), - pa.field('key', pa.string()) - ]) - ) data_source = discovery.finish() assert isinstance(data_source, ds.DataSource) - inspected_schema = discovery.inspect() dataset = ds.Dataset([data_source], inspected_schema) scanner = dataset.new_scan().finish() diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 10d2eb5cd4c..3fdc78f9a2d 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -340,20 +340,24 @@ csv___TableReader__Read <- function(table_reader){ .Call(`_arrow_csv___TableReader__Read` , table_reader) } -dataset___FSDSDiscovery__Make <- function(fs, selector){ - .Call(`_arrow_dataset___FSDSDiscovery__Make` , fs, selector) +dataset___FSDSDiscovery__Make2 <- function(fs, selector, partition_scheme){ + .Call(`_arrow_dataset___FSDSDiscovery__Make2` , fs, selector, partition_scheme) } -dataset___DSDiscovery__Finish <- function(discovery){ - .Call(`_arrow_dataset___DSDiscovery__Finish` , discovery) +dataset___FSDSDiscovery__Make1 <- function(fs, selector){ + 
.Call(`_arrow_dataset___FSDSDiscovery__Make1` , fs, selector) } -dataset___DSDiscovery__Inspect <- function(discovery){ - .Call(`_arrow_dataset___DSDiscovery__Inspect` , discovery) +dataset___DSDiscovery__Finish1 <- function(discovery){ + .Call(`_arrow_dataset___DSDiscovery__Finish1` , discovery) +} + +dataset___DSDiscovery__Finish2 <- function(discovery, schema){ + .Call(`_arrow_dataset___DSDiscovery__Finish2` , discovery, schema) } -dataset___DSDiscovery__SetPartitionScheme <- function(discovery, part){ - invisible(.Call(`_arrow_dataset___DSDiscovery__SetPartitionScheme` , discovery, part)) +dataset___DSDiscovery__Inspect <- function(discovery){ + .Call(`_arrow_dataset___DSDiscovery__Inspect` , discovery) } dataset___SchemaPartitionScheme <- function(schm){ diff --git a/r/R/dataset.R b/r/R/dataset.R index 609205c9fc3..4c7223c8d61 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -34,18 +34,18 @@ #' @seealso [PartitionScheme] for defining partitioning #' @include arrow-package.R open_dataset <- function (path, schema = NULL, partition = NULL, ...) { - dsd <- DataSourceDiscovery$create(path, ...) if (!is.null(partition)) { if (inherits(partition, "Schema")) { + # TODO(bkietz): allow specifying a PartitionSchemeDiscovery partition <- SchemaPartitionScheme$create(partition) } assert_is(partition, "PartitionScheme") - dsd$SetPartitionScheme(partition) } + dsd <- DataSourceDiscovery$create(path, partition_scheme = partition, ...) if (is.null(schema)) { schema <- dsd$Inspect() } - Dataset$create(list(dsd$Finish()), schema) + Dataset$create(list(dsd$Finish(schema)), schema) } #' Multi-file datasets @@ -122,8 +122,7 @@ names.Dataset <- function(x) names(x$schema) #' `DataSourceDiscovery` and its subclasses have the following methods: #' #' - `$Inspect()`: Walks the files in the directory and returns a common [Schema] -#' - `$SetPartitionScheme(part)`: Takes a [PartitionScheme] -#' - `$Finish()`: Returns a `DataSource` +#' - `$Finish(schema)`: Returns a `DataSource` #' @rdname DataSource #' @name DataSource #' @seealso [Dataset] for what do do with a `DataSource` @@ -136,11 +135,12 @@ DataSource <- R6Class("DataSource", inherit = Object) #' @export DataSourceDiscovery <- R6Class("DataSourceDiscovery", inherit = Object, public = list( - Finish = function() shared_ptr(DataSource, dataset___DSDiscovery__Finish(self)), - SetPartitionScheme = function(part) { - assert_is(part, "PartitionScheme") - dataset___DSDiscovery__SetPartitionScheme(self, part) - self + Finish = function(schema = NULL) { + if (is.null(schema)) { + shared_ptr(DataSource, dataset___DSDiscovery__Finish1(self)) + } else { + shared_ptr(DataSource, dataset___DSDiscovery__Finish2(self, schema)) + } }, Inspect = function() shared_ptr(Schema, dataset___DSDiscovery__Inspect(self)) ) @@ -150,6 +150,7 @@ DataSourceDiscovery$create <- function(path, format = c("parquet"), allow_non_existent = FALSE, recursive = TRUE, + partition_scheme = NULL, ...) 
{ if (!inherits(filesystem, "FileSystem")) { filesystem <- match.arg(filesystem) @@ -168,7 +169,7 @@ DataSourceDiscovery$create <- function(path, recursive = recursive ) # This may also require different initializers - FileSystemDataSourceDiscovery$create(filesystem, selector, format) + FileSystemDataSourceDiscovery$create(filesystem, selector, format, partition_scheme) } #' @usage NULL @@ -180,14 +181,23 @@ FileSystemDataSourceDiscovery <- R6Class("FileSystemDataSourceDiscovery", ) FileSystemDataSourceDiscovery$create <- function(filesystem, selector, - format = "parquet") { + format = "parquet", + partition_scheme = NULL) { assert_is(filesystem, "FileSystem") assert_is(selector, "FileSelector") format <- match.arg(format) # Only parquet for now - shared_ptr( - FileSystemDataSourceDiscovery, - dataset___FSDSDiscovery__Make(filesystem, selector) - ) + if (is.null(partition_scheme)) { + shared_ptr( + FileSystemDataSourceDiscovery, + dataset___FSDSDiscovery__Make1(filesystem, selector) + ) + } else { + assert_is(partition_scheme, "PartitionScheme") + shared_ptr( + FileSystemDataSourceDiscovery, + dataset___FSDSDiscovery__Make2(filesystem, selector, partition_scheme) + ) + } } #' Scan the contents of a dataset @@ -253,7 +263,7 @@ names.ScannerBuilder <- function(x) names(x$schema) #' Define a partition scheme for a DataSource #' #' @description -#' Pass a `PartitionScheme` to a [DataSourceDiscovery]'s `$SetPartitionScheme()` +#' Pass a `PartitionScheme` to a [FileSystemDataSourceDiscovery]'s `$create()` #' method to indicate how the file's paths should be interpreted to define #' partitioning. #' diff --git a/r/man/DataSource.Rd b/r/man/DataSource.Rd index 66ada015b80..25120c251c1 100644 --- a/r/man/DataSource.Rd +++ b/r/man/DataSource.Rd @@ -45,8 +45,8 @@ takes the following arguments: \code{DataSourceDiscovery} and its subclasses have the following methods: \itemize{ \item \verb{$Inspect()}: Walks the files in the directory and returns a common \link{Schema} -\item \verb{$SetPartitionScheme(part)}: Takes a \link{PartitionScheme} -\item \verb{$Finish()}: Returns a \code{DataSource} +\item \verb{$Finish(schema)}: Returns a \code{DataSource}. If no schema is provided one +will be derived using \verb{$Inspect()}. } } diff --git a/r/man/PartitionScheme.Rd b/r/man/PartitionScheme.Rd index 3eb65b40d04..c8ab448c289 100644 --- a/r/man/PartitionScheme.Rd +++ b/r/man/PartitionScheme.Rd @@ -6,7 +6,7 @@ \alias{HivePartitionScheme} \title{Define a partition scheme for a DataSource} \description{ -Pass a \code{PartitionScheme} to a \link{DataSourceDiscovery}'s \verb{$SetPartitionScheme()} +Pass a \code{PartitionScheme} to \link{DataSourceDiscovery}'s \verb{$create()} method to indicate how the file's paths should be interpreted to define partitioning. 
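The R `$Finish(schema = NULL)` method above maps onto the two C++ overloads added in discovery.h and discovery.cc: the zero-argument Finish() derives a schema via Inspect(), while Finish(schema) lets the caller amend the inspected schema first. A hedged sketch of both call styles, assuming a DataSourceDiscovery constructed elsewhere:

// Finish() with no argument is equivalent to Inspect() followed by
// Finish(schema); passing a schema allows amending it in between.
arrow::Result<std::shared_ptr<DataSource>> FinishDefault(
    const std::shared_ptr<DataSourceDiscovery>& discovery) {
  return discovery->Finish();  // internally calls Inspect()
}

arrow::Result<std::shared_ptr<DataSource>> FinishWithAmendedSchema(
    const std::shared_ptr<DataSourceDiscovery>& discovery) {
  ARROW_ASSIGN_OR_RAISE(auto schema, discovery->Inspect());
  // e.g. attach metadata or reorder fields here before finalizing.
  return discovery->Finish(schema);
}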
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 4fd9b6e372c..fd003806eaa 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1340,64 +1340,80 @@ RcppExport SEXP _arrow_csv___TableReader__Read(SEXP table_reader_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___FSDSDiscovery__Make(const std::shared_ptr& fs, const std::shared_ptr& selector); -RcppExport SEXP _arrow_dataset___FSDSDiscovery__Make(SEXP fs_sexp, SEXP selector_sexp){ +std::shared_ptr dataset___FSDSDiscovery__Make2(const std::shared_ptr& fs, const std::shared_ptr& selector, const std::shared_ptr& partition_scheme); +RcppExport SEXP _arrow_dataset___FSDSDiscovery__Make2(SEXP fs_sexp, SEXP selector_sexp, SEXP partition_scheme_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type fs(fs_sexp); Rcpp::traits::input_parameter&>::type selector(selector_sexp); - return Rcpp::wrap(dataset___FSDSDiscovery__Make(fs, selector)); + Rcpp::traits::input_parameter&>::type partition_scheme(partition_scheme_sexp); + return Rcpp::wrap(dataset___FSDSDiscovery__Make2(fs, selector, partition_scheme)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___FSDSDiscovery__Make(SEXP fs_sexp, SEXP selector_sexp){ - Rf_error("Cannot call dataset___FSDSDiscovery__Make(). Please use arrow::install_arrow() to install required runtime libraries. "); +RcppExport SEXP _arrow_dataset___FSDSDiscovery__Make2(SEXP fs_sexp, SEXP selector_sexp, SEXP partition_scheme_sexp){ + Rf_error("Cannot call dataset___FSDSDiscovery__Make2(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___DSDiscovery__Finish(const std::shared_ptr& discovery); -RcppExport SEXP _arrow_dataset___DSDiscovery__Finish(SEXP discovery_sexp){ +std::shared_ptr dataset___FSDSDiscovery__Make1(const std::shared_ptr& fs, const std::shared_ptr& selector); +RcppExport SEXP _arrow_dataset___FSDSDiscovery__Make1(SEXP fs_sexp, SEXP selector_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type fs(fs_sexp); + Rcpp::traits::input_parameter&>::type selector(selector_sexp); + return Rcpp::wrap(dataset___FSDSDiscovery__Make1(fs, selector)); +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___FSDSDiscovery__Make1(SEXP fs_sexp, SEXP selector_sexp){ + Rf_error("Cannot call dataset___FSDSDiscovery__Make1(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr dataset___DSDiscovery__Finish1(const std::shared_ptr& discovery); +RcppExport SEXP _arrow_dataset___DSDiscovery__Finish1(SEXP discovery_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type discovery(discovery_sexp); - return Rcpp::wrap(dataset___DSDiscovery__Finish(discovery)); + return Rcpp::wrap(dataset___DSDiscovery__Finish1(discovery)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___DSDiscovery__Finish(SEXP discovery_sexp){ - Rf_error("Cannot call dataset___DSDiscovery__Finish(). Please use arrow::install_arrow() to install required runtime libraries. "); +RcppExport SEXP _arrow_dataset___DSDiscovery__Finish1(SEXP discovery_sexp){ + Rf_error("Cannot call dataset___DSDiscovery__Finish1(). Please use arrow::install_arrow() to install required runtime libraries. 
"); } #endif // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___DSDiscovery__Inspect(const std::shared_ptr& discovery); -RcppExport SEXP _arrow_dataset___DSDiscovery__Inspect(SEXP discovery_sexp){ +std::shared_ptr dataset___DSDiscovery__Finish2(const std::shared_ptr& discovery, const std::shared_ptr& schema); +RcppExport SEXP _arrow_dataset___DSDiscovery__Finish2(SEXP discovery_sexp, SEXP schema_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type discovery(discovery_sexp); - return Rcpp::wrap(dataset___DSDiscovery__Inspect(discovery)); + Rcpp::traits::input_parameter&>::type schema(schema_sexp); + return Rcpp::wrap(dataset___DSDiscovery__Finish2(discovery, schema)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___DSDiscovery__Inspect(SEXP discovery_sexp){ - Rf_error("Cannot call dataset___DSDiscovery__Inspect(). Please use arrow::install_arrow() to install required runtime libraries. "); +RcppExport SEXP _arrow_dataset___DSDiscovery__Finish2(SEXP discovery_sexp, SEXP schema_sexp){ + Rf_error("Cannot call dataset___DSDiscovery__Finish2(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -void dataset___DSDiscovery__SetPartitionScheme(const std::shared_ptr& discovery, const std::shared_ptr& part); -RcppExport SEXP _arrow_dataset___DSDiscovery__SetPartitionScheme(SEXP discovery_sexp, SEXP part_sexp){ +std::shared_ptr dataset___DSDiscovery__Inspect(const std::shared_ptr& discovery); +RcppExport SEXP _arrow_dataset___DSDiscovery__Inspect(SEXP discovery_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type discovery(discovery_sexp); - Rcpp::traits::input_parameter&>::type part(part_sexp); - dataset___DSDiscovery__SetPartitionScheme(discovery, part); - return R_NilValue; + return Rcpp::wrap(dataset___DSDiscovery__Inspect(discovery)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___DSDiscovery__SetPartitionScheme(SEXP discovery_sexp, SEXP part_sexp){ - Rf_error("Cannot call dataset___DSDiscovery__SetPartitionScheme(). Please use arrow::install_arrow() to install required runtime libraries. "); +RcppExport SEXP _arrow_dataset___DSDiscovery__Inspect(SEXP discovery_sexp){ + Rf_error("Cannot call dataset___DSDiscovery__Inspect(). Please use arrow::install_arrow() to install required runtime libraries. 
"); } #endif @@ -5552,10 +5568,11 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_csv___ConvertOptions__initialize", (DL_FUNC) &_arrow_csv___ConvertOptions__initialize, 1}, { "_arrow_csv___TableReader__Make", (DL_FUNC) &_arrow_csv___TableReader__Make, 4}, { "_arrow_csv___TableReader__Read", (DL_FUNC) &_arrow_csv___TableReader__Read, 1}, - { "_arrow_dataset___FSDSDiscovery__Make", (DL_FUNC) &_arrow_dataset___FSDSDiscovery__Make, 2}, - { "_arrow_dataset___DSDiscovery__Finish", (DL_FUNC) &_arrow_dataset___DSDiscovery__Finish, 1}, + { "_arrow_dataset___FSDSDiscovery__Make2", (DL_FUNC) &_arrow_dataset___FSDSDiscovery__Make2, 3}, + { "_arrow_dataset___FSDSDiscovery__Make1", (DL_FUNC) &_arrow_dataset___FSDSDiscovery__Make1, 2}, + { "_arrow_dataset___DSDiscovery__Finish1", (DL_FUNC) &_arrow_dataset___DSDiscovery__Finish1, 1}, + { "_arrow_dataset___DSDiscovery__Finish2", (DL_FUNC) &_arrow_dataset___DSDiscovery__Finish2, 2}, { "_arrow_dataset___DSDiscovery__Inspect", (DL_FUNC) &_arrow_dataset___DSDiscovery__Inspect, 1}, - { "_arrow_dataset___DSDiscovery__SetPartitionScheme", (DL_FUNC) &_arrow_dataset___DSDiscovery__SetPartitionScheme, 2}, { "_arrow_dataset___SchemaPartitionScheme", (DL_FUNC) &_arrow_dataset___SchemaPartitionScheme, 1}, { "_arrow_dataset___HivePartitionScheme", (DL_FUNC) &_arrow_dataset___HivePartitionScheme, 1}, { "_arrow_dataset___Dataset__create", (DL_FUNC) &_arrow_dataset___Dataset__create, 2}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 25b85905b18..160b18f5857 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -20,35 +20,47 @@ #if defined(ARROW_R_WITH_ARROW) // [[arrow::export]] -std::shared_ptr dataset___FSDSDiscovery__Make( +std::shared_ptr dataset___FSDSDiscovery__Make2( const std::shared_ptr& fs, - const std::shared_ptr& selector) { + const std::shared_ptr& selector, + const std::shared_ptr& partition_scheme) { // TODO(npr): add format as an argument, don't hard-code Parquet auto format = std::make_shared(); + // TODO(fsaintjacques): Make options configurable auto options = ds::FileSystemDiscoveryOptions{}; + if (partition_scheme != nullptr) { + options.partition_scheme = partition_scheme; + } return VALUE_OR_STOP( ds::FileSystemDataSourceDiscovery::Make(fs, *selector, format, options)); } // [[arrow::export]] -std::shared_ptr dataset___DSDiscovery__Finish( - const std::shared_ptr& discovery) { - return VALUE_OR_STOP(discovery->Finish()); +std::shared_ptr dataset___FSDSDiscovery__Make1( + const std::shared_ptr& fs, + const std::shared_ptr& selector) { + return dataset___FSDSDiscovery__Make2(fs, selector, nullptr); } // [[arrow::export]] -std::shared_ptr dataset___DSDiscovery__Inspect( +std::shared_ptr dataset___DSDiscovery__Finish1( const std::shared_ptr& discovery) { - return VALUE_OR_STOP(discovery->Inspect()); + return VALUE_OR_STOP(discovery->Finish()); } // [[arrow::export]] -void dataset___DSDiscovery__SetPartitionScheme( +std::shared_ptr dataset___DSDiscovery__Finish2( const std::shared_ptr& discovery, - const std::shared_ptr& part) { - STOP_IF_NOT_OK(discovery->SetPartitionScheme(part)); + const std::shared_ptr& schema) { + return VALUE_OR_STOP(discovery->Finish(schema)); +} + +// [[arrow::export]] +std::shared_ptr dataset___DSDiscovery__Inspect( + const std::shared_ptr& discovery) { + return VALUE_OR_STOP(discovery->Inspect()); } // [[arrow::export]] diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index ad7a1dbe4af..7c2c6522239 100644 --- a/r/tests/testthat/test-dataset.R +++ 
b/r/tests/testthat/test-dataset.R @@ -152,33 +152,37 @@ test_that("filter() on timestamp columns", { test_that("Assembling a Dataset manually and getting a Table", { fs <- LocalFileSystem$create() selector <- FileSelector$create(dataset_dir, recursive = TRUE) - dsd <- FileSystemDataSourceDiscovery$create(fs, selector) + partition <- SchemaPartitionScheme$create(schema(part = double())) + + dsd <- FileSystemDataSourceDiscovery$create(fs, selector, partition_scheme = partition) expect_is(dsd, "FileSystemDataSourceDiscovery") + schm <- dsd$Inspect() expect_is(schm, "Schema") - expect_equal( - schm, - ParquetFileReader$create(file.path(dataset_dir, 1, "file1.parquet"))$GetSchema() - ) - dsd$SetPartitionScheme(SchemaPartitionScheme$create(schema(part = double()))) - datasource <- dsd$Finish() + + phys_schm <- ParquetFileReader$create(file.path(dataset_dir, 1, "file1.parquet"))$GetSchema() + expect_equal(names(phys_schm), names(df1)) + expect_equal(names(schm), c(names(phys_schm), "part")) + + datasource <- dsd$Finish(schm) expect_is(datasource, "DataSource") ds <- Dataset$create(list(datasource), schm) expect_is(ds, "Dataset") - # TODO: this should fail when "part" is in the schema - expect_equal(names(ds), names(df1)) + expect_equal(names(ds), names(schm)) sb <- ds$NewScan() expect_is(sb, "ScannerBuilder") expect_equal(sb$schema, schm) - expect_equal(names(sb), names(df1)) + sb$Project(c("chr", "lgl")) sb$Filter(FieldExpression$create("dbl") == 8) scn <- sb$Finish() expect_is(scn, "Scanner") + tab <- scn$ToTable() expect_is(tab, "Table") + expect_equivalent( as.data.frame(tab), df1[8, c("chr", "lgl")]
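For comparison with the R test above, a rough C++ counterpart of the same manual assembly, assuming Dataset::Make, Dataset::NewScan, ScannerBuilder::Project/Finish and Scanner::ToTable as referenced elsewhere in this diff; exact signatures and error propagation may differ in the actual tree:

// Assemble a Dataset from one discovered source, project two columns and
// materialize the scan as a Table.
arrow::Result<std::shared_ptr<arrow::Table>> ScanProjectedColumns(
    const std::shared_ptr<DataSource>& source,
    const std::shared_ptr<arrow::Schema>& schema) {
  ARROW_ASSIGN_OR_RAISE(auto dataset, Dataset::Make({source}, schema));
  ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
  RETURN_NOT_OK(builder->Project({"chr", "lgl"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
  return scanner->ToTable();
}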