Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,37 @@ namespace arrow {
namespace dataset {

// Constructor: stores the filesystem pointer as-is and moves the remaining by-value
// arguments (base directory, file list, file format) into the corresponding members.
// NOTE(review): this span is a rendered diff; the signature and initializer lines
// without `base_dir` appear to be the pre-change version -- confirm.
FileSystemDataSourceDiscovery::FileSystemDataSourceDiscovery(
fs::FileSystem* filesystem, std::vector<fs::FileStats> files,
fs::FileSystem* filesystem, std::string base_dir, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format)
: fs_(filesystem), files_(std::move(files)), format_(std::move(format)) {}
: fs_(filesystem),
base_dir_(std::move(base_dir)),
files_(std::move(files)),
format_(std::move(format)) {}

// Creates a FileSystemDataSourceDiscovery from an explicit file list and stores it in
// `out`. `base_dir` is forwarded to the constructor. Always returns Status::OK().
// NOTE(review): `files` and `format` are by-value parameters that are passed on as
// lvalues, so they are copied into the constructor's by-value parameters; wrapping
// them in std::move would avoid the extra vector / shared_ptr copies.
Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
std::string base_dir,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
out->reset(new FileSystemDataSourceDiscovery(filesystem, files, format));
out->reset(
new FileSystemDataSourceDiscovery(filesystem, std::move(base_dir), files, format));
return Status::OK();
}

// Convenience overload for callers that have no base directory: delegates to the
// overload taking `base_dir`, passing an empty string for it.
Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
                                           std::vector<fs::FileStats> files,
                                           std::shared_ptr<FileFormat> format,
                                           std::shared_ptr<DataSourceDiscovery>* out) {
  std::string no_base_dir;
  return Make(filesystem, std::move(no_base_dir), std::move(files), std::move(format),
              out);
}

// Creates a discovery from a selector: queries `filesystem` for the stats matching
// `selector` (RETURN_NOT_OK presumably returns early on failure), then delegates to a
// file-list overload of Make.
// NOTE(review): this span is a rendered diff; the call without `selector.base_dir`
// appears to be the pre-change line. In the call that forwards `selector.base_dir`,
// `format` is still passed as an lvalue (copied) -- consider std::move(format).
Status FileSystemDataSourceDiscovery::Make(fs::FileSystem* filesystem,
fs::Selector selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out) {
std::vector<fs::FileStats> files;
RETURN_NOT_OK(filesystem->GetTargetStats(selector, &files));
return Make(filesystem, files, format, out);
return Make(filesystem, std::move(selector.base_dir), std::move(files), format, out);
}

static inline Status InspectSchema(fs::FileSystem* fs,
Expand Down Expand Up @@ -85,7 +97,8 @@ Status FileSystemDataSourceDiscovery::Finish(std::shared_ptr<DataSource>* out) {
PathPartitions partitions;

if (partition_scheme_ != nullptr) {
RETURN_NOT_OK(ApplyPartitionScheme(*partition_scheme_, files_, &partitions));
RETURN_NOT_OK(
ApplyPartitionScheme(*partition_scheme_, base_dir_, files_, &partitions));
}

return FileSystemBasedDataSource::Make(fs_, files_, root_partition(),
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
/// of fs::FileStats or a fs::Selector.
class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery {
public:
static Status Make(fs::FileSystem* filesystem, std::string base_dir,
std::vector<fs::FileStats> files, std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);

static Status Make(fs::FileSystem* filesystem, std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
std::shared_ptr<DataSourceDiscovery>* out);
Expand All @@ -102,11 +106,12 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery
Status Finish(std::shared_ptr<DataSource>* out) override;

protected:
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem,
FileSystemDataSourceDiscovery(fs::FileSystem* filesystem, std::string base_dir,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format);

fs::FileSystem* fs_;
std::string base_dir_;
std::vector<fs::FileStats> files_;
std::shared_ptr<FileFormat> format_;
};
Expand Down
32 changes: 21 additions & 11 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/dataset/partition.h"
#include "arrow/dataset/test_util.h"
#include "arrow/filesystem/test_util.h"

Expand All @@ -31,16 +32,11 @@ class FileSystemDataSourceDiscoveryTest : public TestFileSystemBasedDataSource {
void MakeDiscovery(const std::vector<fs::FileStats>& files) {
MakeFileSystem(files);
ASSERT_OK(
FileSystemDataSourceDiscovery::Make(fs_.get(), files, format_, &discovery_));
}

void MakeDiscovery(const std::vector<fs::FileStats>& files, fs::Selector selector) {
MakeFileSystem(files);
ASSERT_OK(
FileSystemDataSourceDiscovery::Make(fs_.get(), selector, format_, &discovery_));
FileSystemDataSourceDiscovery::Make(fs_.get(), selector_, format_, &discovery_));
}

protected:
fs::Selector selector_;
std::shared_ptr<DataSourceDiscovery> discovery_;
std::shared_ptr<FileFormat> format_ = std::make_shared<DummyFileFormat>();
};
Expand All @@ -53,15 +49,29 @@ TEST_F(FileSystemDataSourceDiscoveryTest, Basic) {
}

TEST_F(FileSystemDataSourceDiscoveryTest, Selector) {
// This test ensures that the Selector is enforced.
fs::Selector selector;
selector.base_dir = "A";
MakeDiscovery({fs::File("0"), fs::File("A/a")}, selector);
selector_.base_dir = "A";
MakeDiscovery({fs::File("0"), fs::File("A/a")});

ASSERT_OK(discovery_->Finish(&source_));
// "0" doesn't match selector, so it has been dropped:
AssertFragmentsAreFromPath(source_->GetFragments(options_), {"A/a"});
}

// Runs discovery with a Hive partition scheme on files below a base directory whose
// own path contains an "a=ignored" segment, and checks that both files are still
// reported as fragments under their full paths.
TEST_F(FileSystemDataSourceDiscoveryTest, Partition) {
  selector_.base_dir = "a=ignored/base";
  const std::string first_path = selector_.base_dir + "/a=1";
  const std::string second_path = selector_.base_dir + "/a=2";
  MakeDiscovery({fs::File(first_path), fs::File(second_path)});

  // Hive-style scheme with a single int32 partition field named "a".
  const auto partition_schema = schema({field("a", int32())});
  auto partition_scheme = std::make_shared<HivePartitionScheme>(partition_schema);

  ASSERT_OK(discovery_->SetPartitionScheme(partition_scheme));
  ASSERT_OK(discovery_->Finish(&source_));

  AssertFragmentsAreFromPath(source_->GetFragments(options_),
                             {first_path, second_path});
}

TEST_F(FileSystemDataSourceDiscoveryTest, Inspect) {
auto s = schema({field("f64", float64())});
format_ = std::make_shared<DummyFileFormat>(s);
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,18 @@ Result<std::shared_ptr<Expression>> HivePartitionScheme::Parse(

// Overload without a base directory: forwards to the base_dir-taking overload with an
// empty base directory.
Status ApplyPartitionScheme(const PartitionScheme& scheme,
                            std::vector<fs::FileStats> files, PathPartitions* out) {
  const std::string empty_base_dir;
  return ApplyPartitionScheme(scheme, empty_base_dir, std::move(files), out);
}

Status ApplyPartitionScheme(const PartitionScheme& scheme, const std::string& base_dir,
std::vector<fs::FileStats> files, PathPartitions* out) {
for (const auto& file : files) {
const auto& path = file.path();
if (file.path().substr(0, base_dir.size()) != base_dir) continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to decide what the friendliest behavior should be in such a case. I'd be tempted to vote for parsing without trimming instead of skipping the file.

std::string path = file.path();
if (!base_dir.empty() && path.substr(0, base_dir.size()) == base_dir) {
  path = path.substr(base_dir.size());
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parsing some paths with a known prefix removed and other paths with an unknown prefix seems like a foot gun to me. I'd expect the files which lie outside base_dir to be picked up with a different instance of Discovery (with the correct base_dir for those files).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Ben. If I'm supplying my own file paths, and some are relative to the given "base_dir" and some aren't, (1) the simple partition scheme of applying a schema to path segments probably won't just work across all of them if they don't share a base_dir, and if I want to use it I should put them in different Discovery instances; and (2) if we tried to get partition info from files outside of base_dir, it would probably error in parsing the path segments, so to `continue` and not assign partition data to that file is probably the safest option.

auto path = file.path().substr(base_dir.size());

std::shared_ptr<Expression> partition;
RETURN_NOT_OK(scheme.Parse(path, &partition));
out->emplace(path, std::move(partition));
out->emplace(std::move(path), std::move(partition));
}

return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ namespace arrow {

namespace fs {
struct FileStats;
}
struct Selector;
} // namespace fs

namespace dataset {

Expand Down Expand Up @@ -171,6 +172,9 @@ using PathPartitions = std::unordered_map<std::string, std::shared_ptr<Expressio
Status ApplyPartitionScheme(const PartitionScheme& scheme,
std::vector<fs::FileStats> files, PathPartitions* out);

/// \brief Apply a partition scheme to a list of files, given a base directory.
///
/// NOTE(review): judging from the definition in partition.cc, files whose path does
/// not start with base_dir appear to be skipped, and the base_dir prefix is removed
/// from the path before it is parsed and used as the key in `out` -- confirm against
/// the final implementation.
Status ApplyPartitionScheme(const PartitionScheme& scheme, const std::string& base_dir,
std::vector<fs::FileStats> files, PathPartitions* out);

// TODO(bkietz) use RE2 and named groups to provide RegexpPartitionScheme

} // namespace dataset
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct ARROW_EXPORT FileStats {
void set_type(FileType type) { type_ = type; }

/// The full file path in the filesystem
std::string path() const { return path_; }
const std::string& path() const { return path_; }
void set_path(const std::string& path) { path_ = path; }

/// The file base name (component after the last directory separator)
Expand Down