Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 61 additions & 60 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,17 @@ Result<std::shared_ptr<io::OutputStream>> FileSystem::OpenAppendStream(
//////////////////////////////////////////////////////////////////////////
// SubTreeFileSystem implementation

namespace {

// Checks that `s` is a plain filesystem path rather than a URI.
// Returns Status::Invalid when internal::IsLikelyUri(s) reports a URI,
// and Status::OK() otherwise.
Status ValidateSubPath(util::string_view s) {
  const bool looks_like_uri = internal::IsLikelyUri(s);
  if (!looks_like_uri) {
    return Status::OK();
  }
  return Status::Invalid("Expected a filesystem path, got a URI: '", s, "'");
}

} // namespace

SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path,
std::shared_ptr<FileSystem> base_fs)
: FileSystem(base_fs->io_context()),
Expand All @@ -270,20 +281,21 @@ bool SubTreeFileSystem::Equals(const FileSystem& other) const {
return base_path_ == subfs.base_path_ && base_fs_->Equals(subfs.base_fs_);
}

std::string SubTreeFileSystem::PrependBase(const std::string& s) const {
Result<std::string> SubTreeFileSystem::PrependBase(const std::string& s) const {
RETURN_NOT_OK(ValidateSubPath(s));
if (s.empty()) {
return base_path_;
} else {
return ConcatAbstractPath(base_path_, s);
}
}

Status SubTreeFileSystem::PrependBaseNonEmpty(std::string* s) const {
if (s->empty()) {
Result<std::string> SubTreeFileSystem::PrependBaseNonEmpty(const std::string& s) const {
RETURN_NOT_OK(ValidateSubPath(s));
if (s.empty()) {
return Status::IOError("Empty path");
} else {
*s = ConcatAbstractPath(base_path_, *s);
return Status::OK();
return ConcatAbstractPath(base_path_, s);
}
}

Expand All @@ -305,19 +317,21 @@ Status SubTreeFileSystem::FixInfo(FileInfo* info) const {
}

Result<std::string> SubTreeFileSystem::NormalizePath(std::string path) {
ARROW_ASSIGN_OR_RAISE(auto normalized, base_fs_->NormalizePath(PrependBase(path)));
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBase(path));
ARROW_ASSIGN_OR_RAISE(auto normalized, base_fs_->NormalizePath(real_path));
return StripBase(std::move(normalized));
}

Result<FileInfo> SubTreeFileSystem::GetFileInfo(const std::string& path) {
ARROW_ASSIGN_OR_RAISE(FileInfo info, base_fs_->GetFileInfo(PrependBase(path)));
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBase(path));
ARROW_ASSIGN_OR_RAISE(FileInfo info, base_fs_->GetFileInfo(real_path));
RETURN_NOT_OK(FixInfo(&info));
return info;
}

Result<std::vector<FileInfo>> SubTreeFileSystem::GetFileInfo(const FileSelector& select) {
auto selector = select;
selector.base_dir = PrependBase(selector.base_dir);
ARROW_ASSIGN_OR_RAISE(selector.base_dir, PrependBase(selector.base_dir));
ARROW_ASSIGN_OR_RAISE(auto infos, base_fs_->GetFileInfo(selector));
for (auto& info : infos) {
RETURN_NOT_OK(FixInfo(&info));
Expand All @@ -327,7 +341,11 @@ Result<std::vector<FileInfo>> SubTreeFileSystem::GetFileInfo(const FileSelector&

FileInfoGenerator SubTreeFileSystem::GetFileInfoGenerator(const FileSelector& select) {
auto selector = select;
selector.base_dir = PrependBase(selector.base_dir);
auto maybe_base_dir = PrependBase(selector.base_dir);
if (!maybe_base_dir.ok()) {
return MakeFailingGenerator<std::vector<FileInfo>>(maybe_base_dir.status());
}
selector.base_dir = *std::move(maybe_base_dir);
auto gen = base_fs_->GetFileInfoGenerator(selector);

auto self = checked_pointer_cast<SubTreeFileSystem>(shared_from_this());
Expand All @@ -343,23 +361,21 @@ FileInfoGenerator SubTreeFileSystem::GetFileInfoGenerator(const FileSelector& se
}

Status SubTreeFileSystem::CreateDir(const std::string& path, bool recursive) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->CreateDir(s, recursive);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->CreateDir(real_path, recursive);
}

Status SubTreeFileSystem::DeleteDir(const std::string& path) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->DeleteDir(s);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->DeleteDir(real_path);
}

Status SubTreeFileSystem::DeleteDirContents(const std::string& path) {
if (internal::IsEmptyPath(path)) {
return internal::InvalidDeleteDirContents(path);
}
auto s = PrependBase(path);
return base_fs_->DeleteDirContents(s);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBase(path));
return base_fs_->DeleteDirContents(real_path);
}

Status SubTreeFileSystem::DeleteRootDirContents() {
Expand All @@ -371,103 +387,88 @@ Status SubTreeFileSystem::DeleteRootDirContents() {
}

Status SubTreeFileSystem::DeleteFile(const std::string& path) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->DeleteFile(s);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->DeleteFile(real_path);
}

Status SubTreeFileSystem::Move(const std::string& src, const std::string& dest) {
auto s = src;
auto d = dest;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
RETURN_NOT_OK(PrependBaseNonEmpty(&d));
return base_fs_->Move(s, d);
ARROW_ASSIGN_OR_RAISE(auto real_src, PrependBaseNonEmpty(src));
ARROW_ASSIGN_OR_RAISE(auto real_dest, PrependBaseNonEmpty(dest));
return base_fs_->Move(real_src, real_dest);
}

Status SubTreeFileSystem::CopyFile(const std::string& src, const std::string& dest) {
auto s = src;
auto d = dest;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
RETURN_NOT_OK(PrependBaseNonEmpty(&d));
return base_fs_->CopyFile(s, d);
ARROW_ASSIGN_OR_RAISE(auto real_src, PrependBaseNonEmpty(src));
ARROW_ASSIGN_OR_RAISE(auto real_dest, PrependBaseNonEmpty(dest));
return base_fs_->CopyFile(real_src, real_dest);
}

Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream(
const std::string& path) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->OpenInputStream(s);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->OpenInputStream(real_path);
}

Result<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStream(
const FileInfo& info) {
auto s = info.path();
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(info.path()));
FileInfo new_info(info);
new_info.set_path(std::move(s));
new_info.set_path(std::move(real_path));
return base_fs_->OpenInputStream(new_info);
}

Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
const std::string& path) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->OpenInputStreamAsync(s);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->OpenInputStreamAsync(real_path);
}

Future<std::shared_ptr<io::InputStream>> SubTreeFileSystem::OpenInputStreamAsync(
const FileInfo& info) {
auto s = info.path();
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(info.path()));
FileInfo new_info(info);
new_info.set_path(std::move(s));
new_info.set_path(std::move(real_path));
return base_fs_->OpenInputStreamAsync(new_info);
}

Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
const std::string& path) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->OpenInputFile(s);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->OpenInputFile(real_path);
}

Result<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFile(
const FileInfo& info) {
auto s = info.path();
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(info.path()));
FileInfo new_info(info);
new_info.set_path(std::move(s));
new_info.set_path(std::move(real_path));
return base_fs_->OpenInputFile(new_info);
}

Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
const std::string& path) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->OpenInputFileAsync(s);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->OpenInputFileAsync(real_path);
}

Future<std::shared_ptr<io::RandomAccessFile>> SubTreeFileSystem::OpenInputFileAsync(
const FileInfo& info) {
auto s = info.path();
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(info.path()));
FileInfo new_info(info);
new_info.set_path(std::move(s));
new_info.set_path(std::move(real_path));
return base_fs_->OpenInputFileAsync(new_info);
}

Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenOutputStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->OpenOutputStream(s, metadata);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->OpenOutputStream(real_path, metadata);
}

Result<std::shared_ptr<io::OutputStream>> SubTreeFileSystem::OpenAppendStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
auto s = path;
RETURN_NOT_OK(PrependBaseNonEmpty(&s));
return base_fs_->OpenAppendStream(s, metadata);
ARROW_ASSIGN_OR_RAISE(auto real_path, PrependBaseNonEmpty(path));
return base_fs_->OpenAppendStream(real_path, metadata);
}

//////////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem {
const std::string base_path_;
std::shared_ptr<FileSystem> base_fs_;

std::string PrependBase(const std::string& s) const;
Status PrependBaseNonEmpty(std::string* s) const;
Result<std::string> PrependBase(const std::string& s) const;
Result<std::string> PrependBaseNonEmpty(const std::string& s) const;
Result<std::string> StripBase(const std::string& s) const;
Status FixInfo(FileInfo* info) const;

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ struct GcsPath {
std::string object;

static Result<GcsPath> FromString(const std::string& s) {
if (internal::IsLikelyUri(s)) {
return Status::Invalid(
"Expected a GCS object path of the form 'bucket/key...', got a URI: '", s, "'");
}
auto const first_sep = s.find_first_of(internal::kSep);
if (first_sep == 0) {
return Status::Invalid("Path cannot start with a separator ('", s, "')");
Expand Down
54 changes: 54 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ TEST(GcsFileSystem, ObjectMetadataRoundtrip) {
// The preexisting bucket path is reported as a directory, while the same
// path prefixed with "gs://" is rejected as Invalid.
TEST_F(GcsIntegrationTest, GetFileInfoBucket) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
  const auto bucket_path = PreexistingBucketPath();
  arrow::fs::AssertFileInfo(fs.get(), bucket_path, FileType::Directory);

  // URI
  const auto bucket_uri = "gs://" + bucket_path;
  ASSERT_RAISES(Invalid, fs->GetFileInfo(bucket_uri));
}

TEST_F(GcsIntegrationTest, GetFileInfoObject) {
Expand All @@ -487,6 +490,9 @@ TEST_F(GcsIntegrationTest, GetFileInfoObject) {
ASSERT_TRUE(object.ok()) << "status=" << object.status();
arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File,
object->time_created(), static_cast<int64_t>(object->size()));

// URI
ASSERT_RAISES(Invalid, fs->GetFileInfo("gs://" + PreexistingObjectName()));
}

TEST_F(GcsIntegrationTest, GetFileInfoSelectorRecursive) {
Expand All @@ -508,6 +514,10 @@ TEST_F(GcsIntegrationTest, GetFileInfoSelectorRecursive) {
selector.max_recursion = 16;
ASSERT_OK_AND_ASSIGN(auto results, fs->GetFileInfo(selector));
EXPECT_THAT(results, UnorderedElementsAreArray(expected.begin(), expected.end()));

// URI
selector.base_dir = "gs://" + selector.base_dir;
ASSERT_RAISES(Invalid, fs->GetFileInfo(selector));
}

TEST_F(GcsIntegrationTest, GetFileInfoSelectorNonRecursive) {
Expand Down Expand Up @@ -626,6 +636,11 @@ TEST_F(GcsIntegrationTest, CreateDirRecursiveBucketAndFolder) {
arrow::fs::AssertFileInfo(fs.get(), bucket_name + "/", FileType::Directory);
}

// CreateDir must reject a "gs://" URI with Invalid, even in recursive mode.
TEST_F(GcsIntegrationTest, CreateDirUri) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
  const auto bucket_uri = "gs://" + RandomBucketName();
  constexpr bool kRecursive = true;
  ASSERT_RAISES(Invalid, fs->CreateDir(bucket_uri, kRecursive));
}

TEST_F(GcsIntegrationTest, DeleteDirSuccess) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
ASSERT_OK_AND_ASSIGN(auto hierarchy, CreateHierarchy(fs));
Expand All @@ -641,6 +656,11 @@ TEST_F(GcsIntegrationTest, DeleteDirSuccess) {
}
}

// DeleteDir must reject a "gs://" URI with Invalid.
TEST_F(GcsIntegrationTest, DeleteDirUri) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
  const auto bucket_uri = "gs://" + PreexistingBucketPath();
  ASSERT_RAISES(Invalid, fs->DeleteDir(bucket_uri));
}

TEST_F(GcsIntegrationTest, DeleteDirContentsSuccess) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
ASSERT_OK_AND_ASSIGN(auto hierarchy, CreateHierarchy(fs));
Expand Down Expand Up @@ -682,6 +702,11 @@ TEST_F(GcsIntegrationTest, DeleteFileDirectoryFails) {
ASSERT_RAISES(IOError, fs->DeleteFile(path));
}

// DeleteFile must reject a "gs://" URI with Invalid.
TEST_F(GcsIntegrationTest, DeleteFileUri) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
  const auto object_uri = "gs://" + PreexistingObjectPath();
  ASSERT_RAISES(Invalid, fs->DeleteFile(object_uri));
}

TEST_F(GcsIntegrationTest, MoveFileSuccess) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
const auto destination_path = PreexistingBucketPath() + "move-destination";
Expand All @@ -708,6 +733,13 @@ TEST_F(GcsIntegrationTest, MoveFileCannotRenameToDirectory) {
PreexistingBucketPath() + "destination/"));
}

// Move must reject a "gs://" URI with Invalid, whether it is given as the
// source or as the destination.
TEST_F(GcsIntegrationTest, MoveFileUri) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
  const auto destination_path = PreexistingBucketPath() + "move-destination";

  // URI as source
  const auto source_uri = "gs://" + PreexistingObjectPath();
  ASSERT_RAISES(Invalid, fs->Move(source_uri, destination_path));

  // URI as destination
  const auto source_path = PreexistingObjectPath();
  const auto destination_uri = "gs://" + destination_path;
  ASSERT_RAISES(Invalid, fs->Move(source_path, destination_uri));
}

TEST_F(GcsIntegrationTest, CopyFileSuccess) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
const auto destination_path = PreexistingBucketPath() + "copy-destination";
Expand All @@ -721,6 +753,15 @@ TEST_F(GcsIntegrationTest, CopyFileNotFound) {
ASSERT_RAISES(IOError, fs->CopyFile(NotFoundObjectPath(), destination_path));
}

// CopyFile must reject a "gs://" URI with Invalid, whether it is given as
// the source or as the destination.
TEST_F(GcsIntegrationTest, CopyFileUri) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
  const auto destination_path = PreexistingBucketPath() + "copy-destination";

  // URI as source
  const auto source_uri = "gs://" + PreexistingObjectPath();
  ASSERT_RAISES(Invalid, fs->CopyFile(source_uri, destination_path));

  // URI as destination
  const auto source_path = PreexistingObjectPath();
  const auto destination_uri = "gs://" + destination_path;
  ASSERT_RAISES(Invalid, fs->CopyFile(source_path, destination_uri));
}

TEST_F(GcsIntegrationTest, OpenInputStreamString) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

Expand Down Expand Up @@ -797,6 +838,11 @@ TEST_F(GcsIntegrationTest, OpenInputStreamInfoInvalid) {
ASSERT_RAISES(IOError, fs->OpenInputStream(info));
}

// OpenInputStream must reject a "gs://" URI with Invalid.
TEST_F(GcsIntegrationTest, OpenInputStreamUri) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
  const auto object_uri = "gs://" + PreexistingObjectPath();
  ASSERT_RAISES(Invalid, fs->OpenInputStream(object_uri));
}

TEST_F(GcsIntegrationTest, OpenInputStreamReadMetadata) {
auto client = GcsClient();
const auto custom_time = std::chrono::system_clock::now() + std::chrono::hours(1);
Expand Down Expand Up @@ -940,6 +986,14 @@ TEST_F(GcsIntegrationTest, OpenOutputStreamClosed) {
ASSERT_RAISES(Invalid, output->Tell());
}

// OpenOutputStream must reject a "gs://" URI with Invalid.
//
// The original body called OpenInputStream, which only duplicated
// OpenInputStreamUri and left the output-stream path untested. The empty
// metadata argument is passed explicitly so the call resolves to the
// two-argument OpenOutputStream declared in SOURCE.
TEST_F(GcsIntegrationTest, OpenOutputStreamUri) {
  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

  const auto path =
      internal::ConcatAbstractPath(PreexistingBucketName(), "open-output-stream-uri.txt");
  ASSERT_RAISES(Invalid, fs->OpenOutputStream("gs://" + path, {}));
}

TEST_F(GcsIntegrationTest, OpenInputFileMixedReadVsReadAt) {
auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());

Expand Down
Loading