Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,33 +319,36 @@ TEST_F(TestEndToEnd, EndToEndSingleSource) {
FileSystemDiscoveryOptions options;
options.ignore_prefixes = {"."};

ASSERT_OK_AND_ASSIGN(discovery,
FileSystemDataSourceDiscovery::Make(fs_, s, format, options));

// Partitions expressions can be discovered for DataSource and DataFragments.
// This metadata is then used in conjuction with the query filter to apply
// the pushdown predicate optimization.
auto partition_schema = SchemaFromColumnNames(schema_, {"year", "month", "country"});
//
// The SchemaPartitionScheme is a simple scheme where the path is split with
// the directory separator character and the components are typed and named
// with the equivalent index in the schema, e.g.
// (with the previous defined schema):
// the directory separator character and the components are parsed as values
// of the corresponding fields in its schema.
//
// Since a PartitionSchemeDiscovery is specified instead of an explicit
// PartitionScheme, the types of partition fields will be inferred.
//
// - "/2019" -> {"year": 2019}
// - "/2019/01 -> {"year": 2019, "month": 1}
// - "/2019/01/CA -> {"year": 2019, "month": 1, "country": "CA"}
// - "/2019/01/CA/a_file.json -> {"year": 2019, "month": 1, "country": "CA"}
auto partition_scheme = std::make_shared<SchemaPartitionScheme>(partition_schema);
ASSERT_OK(discovery->SetPartitionScheme(partition_scheme));
options.partition_scheme =
SchemaPartitionScheme::MakeDiscovery({"year", "month", "country"});

ASSERT_OK_AND_ASSIGN(discovery,
FileSystemDataSourceDiscovery::Make(fs_, s, format, options));

// DataFragments might have compatible but slightly different schemas, e.g.
// schema evolved by adding/renaming columns. In this case, the schema is
// passed to the dataset constructor.
// The inspected_schema may optionally be modified before being finalized.
ASSERT_OK_AND_ASSIGN(auto inspected_schema, discovery->Inspect());
EXPECT_EQ(*schema_, *inspected_schema);

// Build the DataSource where partitions are attached to fragments (files).
ASSERT_OK_AND_ASSIGN(auto datasource, discovery->Finish());
ASSERT_OK_AND_ASSIGN(auto datasource, discovery->Finish(inspected_schema));

// Create the Dataset from our single DataSource.
ASSERT_OK_AND_ASSIGN(auto dataset, Dataset::Make({datasource}, inspected_schema));
Expand Down Expand Up @@ -458,16 +461,18 @@ class TestSchemaUnification : public TestDataset {
};

auto format = std::make_shared<JSONRecordBatchFileFormat>(resolver);

FileSystemDiscoveryOptions options;
options.partition_base_dir = base;
options.partition_scheme =
std::make_shared<HivePartitionScheme>(SchemaFromNames({"part_ds", "part_df"}));

ARROW_ASSIGN_OR_RAISE(auto discovery, FileSystemDataSourceDiscovery::Make(
fs_, std::move(paths), format, options));
fs_, paths, format, options));

auto scheme_schema = SchemaFromNames({"part_ds", "part_df"});
auto partition_scheme = std::make_shared<HivePartitionScheme>(scheme_schema);
RETURN_NOT_OK(discovery->SetPartitionScheme(partition_scheme));
ARROW_ASSIGN_OR_RAISE(auto schema, discovery->Inspect());

return discovery->Finish();
return discovery->Finish(schema);
};

schema_ = SchemaFromNames({"phy_1", "phy_2", "phy_3", "phy_4", "part_ds", "part_df"});
Expand Down
51 changes: 38 additions & 13 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,11 @@
namespace arrow {
namespace dataset {

DataSourceDiscovery::DataSourceDiscovery()
: schema_(arrow::schema({})),
partition_scheme_(PartitionScheme::Default()),
root_partition_(scalar(true)) {}
DataSourceDiscovery::DataSourceDiscovery() : root_partition_(scalar(true)) {}

Result<std::shared_ptr<Schema>> DataSourceDiscovery::Inspect() {
ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas());

if (partition_scheme()) {
schemas.push_back(partition_scheme()->schema());
}

if (schemas.empty()) {
schemas.push_back(arrow::schema({}));
}
Expand Down Expand Up @@ -135,7 +128,7 @@ Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make
ARROW_ASSIGN_OR_RAISE(forest, Filter(filesystem, format, options, std::move(forest)));

return std::shared_ptr<DataSourceDiscovery>(new FileSystemDataSourceDiscovery(
filesystem, std::move(forest), std::move(format), std::move(options)));
std::move(filesystem), std::move(forest), std::move(format), std::move(options)));
}

Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make(
Expand All @@ -158,23 +151,54 @@ Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make
filesystem, std::move(forest), std::move(format), std::move(options)));
}

Result<std::shared_ptr<Schema>> FileSystemDataSourceDiscovery::PartitionSchema() {
if (auto partition_scheme = options_.partition_scheme.scheme()) {
return partition_scheme->schema();
}

std::vector<util::string_view> paths;
for (const auto& stats : forest_.stats()) {
if (auto relative =
fs::internal::RemoveAncestor(options_.partition_base_dir, stats.path())) {
paths.push_back(*relative);
}
}

return options_.partition_scheme.discovery()->Inspect(paths);
}

Result<std::vector<std::shared_ptr<Schema>>>
FileSystemDataSourceDiscovery::InspectSchemas() {
std::vector<std::shared_ptr<Schema>> schemas;

for (const auto& f : forest_.stats()) {
if (!f.IsFile()) continue;
auto source = FileSource(f.path(), fs_.get());
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(source));
FileSource src(f.path(), fs_.get());
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src));
schemas.push_back(schema);
}

ARROW_ASSIGN_OR_RAISE(auto partition_schema, PartitionSchema());
schemas.push_back(partition_schema);

return schemas;
}

Result<std::shared_ptr<DataSource>> FileSystemDataSourceDiscovery::Finish() {
Result<std::shared_ptr<DataSource>> DataSourceDiscovery::Finish() {
ARROW_ASSIGN_OR_RAISE(auto schema, Inspect());
return Finish(schema);
}

Result<std::shared_ptr<DataSource>> FileSystemDataSourceDiscovery::Finish(
const std::shared_ptr<Schema>& schema) {
ExpressionVector partitions(forest_.size(), scalar(true));

std::shared_ptr<PartitionScheme> partition_scheme = options_.partition_scheme.scheme();
if (partition_scheme == nullptr) {
auto discovery = options_.partition_scheme.discovery();
ARROW_ASSIGN_OR_RAISE(partition_scheme, discovery->Finish(schema));
}

// apply partition_scheme to forest to derive partitions
auto apply_partition_scheme = [&](fs::PathForest::Ref ref) {
if (auto relative = fs::internal::RemoveAncestor(options_.partition_base_dir,
Expand All @@ -183,7 +207,7 @@ Result<std::shared_ptr<DataSource>> FileSystemDataSourceDiscovery::Finish() {

if (segments.size() > 0) {
auto segment_index = static_cast<int>(segments.size()) - 1;
auto maybe_partition = partition_scheme_->Parse(segments.back(), segment_index);
auto maybe_partition = partition_scheme->Parse(segments.back(), segment_index);

partitions[ref.i] = std::move(maybe_partition).ValueOr(scalar(true));
}
Expand All @@ -192,6 +216,7 @@ Result<std::shared_ptr<DataSource>> FileSystemDataSourceDiscovery::Finish() {
};

RETURN_NOT_OK(forest_.Visit(apply_partition_scheme));

return FileSystemDataSource::Make(fs_, forest_, std::move(partitions), root_partition_,
format_);
}
Expand Down
42 changes: 23 additions & 19 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
#include <string>
#include <vector>

#include "arrow/dataset/partition.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_forest.h"
#include "arrow/util/macros.h"
#include "arrow/util/variant.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -59,23 +61,14 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
/// \brief Get unified schema for the resulting DataSource.
virtual Result<std::shared_ptr<Schema>> Inspect();

/// \brief Create a DataSource with a given partition.
virtual Result<std::shared_ptr<DataSource>> Finish() = 0;
/// \brief Create a DataSource with the given schema.
virtual Result<std::shared_ptr<DataSource>> Finish(
const std::shared_ptr<Schema>& schema) = 0;

std::shared_ptr<Schema> schema() const { return schema_; }
Status SetSchema(std::shared_ptr<Schema> schema) {
schema_ = schema;
return Status::OK();
}

const std::shared_ptr<PartitionScheme>& partition_scheme() const {
return partition_scheme_;
}
Status SetPartitionScheme(std::shared_ptr<PartitionScheme> partition_scheme) {
partition_scheme_ = partition_scheme;
return Status::OK();
}
/// \brief Create a DataSource using an inspected schema.
virtual Result<std::shared_ptr<DataSource>> Finish();

/// \brief Optional root partition for the resulting DataSource.
const std::shared_ptr<Expression>& root_partition() const { return root_partition_; }
Status SetRootPartition(std::shared_ptr<Expression> partition) {
root_partition_ = partition;
Expand All @@ -87,12 +80,20 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
protected:
DataSourceDiscovery();

std::shared_ptr<Schema> schema_;
std::shared_ptr<PartitionScheme> partition_scheme_;
std::shared_ptr<Expression> root_partition_;
};

struct FileSystemDiscoveryOptions {
// Either an explicit PartitionScheme or a PartitionSchemeDiscovery to discover one.
//
// If a discovery is provided, it will be used to infer a schema for partition fields
// based on file and directory paths then construct a PartitionScheme. The default
// is a PartitionScheme which will yield no partition information.
//
// The (explicit or discovered) partition scheme will be applied to discovered files
// and the resulting partition information embedded in the DataSource.
PartitionSchemeOrDiscovery partition_scheme{PartitionScheme::Default()};

// For the purposes of applying the partition scheme, paths will be stripped
// of the partition_base_dir. Files not matching the partition_base_dir
// prefix will be skipped for partition discovery. The ignored files will still
Expand All @@ -113,7 +114,7 @@ struct FileSystemDiscoveryOptions {
// Invalid files (via selector or explicitly) will be excluded by checking
// with the FileFormat::IsSupported method. This will incur IO for each files
// in a serial and single threaded fashion. Disabling this feature will skip the
// IO, but unsupported files may will be present in the DataSource
// IO, but unsupported files may be present in the DataSource
// (resulting in an error at scan time).
bool exclude_invalid_files = true;

Expand Down Expand Up @@ -166,7 +167,8 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery

Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas() override;

Result<std::shared_ptr<DataSource>> Finish() override;
Result<std::shared_ptr<DataSource>> Finish(
const std::shared_ptr<Schema>& schema) override;

protected:
FileSystemDataSourceDiscovery(std::shared_ptr<fs::FileSystem> filesystem,
Expand All @@ -178,6 +180,8 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery
const FileSystemDiscoveryOptions& options,
fs::PathForest forest);

Result<std::shared_ptr<Schema>> PartitionSchema();

std::shared_ptr<fs::FileSystem> fs_;
fs::PathForest forest_;
std::shared_ptr<FileFormat> format_;
Expand Down
Loading