Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/examples/arrow/filesystem_definition_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class ExampleFileSystem : public fs::FileSystem {
}
};

fs::FileSystemRegistrar kExampleFileSystemModule{
auto kExampleFileSystemModule = ARROW_REGISTER_FILESYSTEM(
"example",
[](const arrow::util::Uri& uri, const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<fs::FileSystem>> {
Expand All @@ -148,4 +148,4 @@ fs::FileSystemRegistrar kExampleFileSystemModule{
}
return fs;
},
};
{});
4 changes: 3 additions & 1 deletion cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ add_arrow_test(filesystem-test
EXTRA_LABELS
filesystem
DEFINITIONS
ARROW_FILESYSTEM_EXAMPLE_LIBPATH="$<TARGET_FILE:arrow_filesystem_example>")
ARROW_FILESYSTEM_EXAMPLE_LIBPATH="$<TARGET_FILE:arrow_filesystem_example>"
EXTRA_DEPENDENCIES
arrow_filesystem_example)

if(ARROW_BUILD_BENCHMARKS)
add_arrow_benchmark(localfs_benchmark
Expand Down
35 changes: 16 additions & 19 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,13 @@
#include "arrow/util/visibility.h"
#include "arrow/util/windows_fixup.h"

namespace arrow {

using internal::checked_pointer_cast;
using internal::TaskHints;
using io::internal::SubmitIO;
using util::Uri;

namespace fs {
namespace arrow::fs {

using arrow::internal::checked_pointer_cast;
using arrow::internal::GetEnvVar;
using arrow::internal::TaskHints;
using arrow::io::internal::SubmitIO;
using arrow::util::Uri;
using internal::ConcatAbstractPath;
using internal::EnsureTrailingSlash;
using internal::GetAbstractPathParent;
Expand Down Expand Up @@ -273,6 +270,11 @@ Result<std::string> FileSystem::PathFromUri(const std::string& uri_string) const
return Status::NotImplemented("PathFromUri is not yet supported on this filesystem");
}

// Fallback MakeUri used by filesystems that do not override it.
//
// The `path` argument is ignored: the call always fails with a NotImplemented
// status whose message names the concrete filesystem via type_name().
Result<std::string> FileSystem::MakeUri(std::string path) const {
  Status not_supported = Status::NotImplemented("MakeUri is not yet supported for ",
                                                type_name(), " filesystems");
  return not_supported;
}

//////////////////////////////////////////////////////////////////////////
// SubTreeFileSystem implementation

Expand Down Expand Up @@ -726,6 +728,10 @@ class FileSystemFactoryRegistry {
main_registry->scheme_to_factory_.emplace(std::move(scheme), registered);
if (success) continue;

if (it->second.ok()) {
if (registered->factory == it->second->factory) continue;
}

duplicated_schemes.emplace_back(it->first);
}
scheme_to_factory_.clear();
Expand Down Expand Up @@ -852,18 +858,10 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
auto* factory,
FileSystemFactoryRegistry::GetInstance()->FactoryForScheme(scheme));
if (factory != nullptr) {
return (*factory)(uri, io_context, out_path);
return factory->function(uri, io_context, out_path);
}
}

if (scheme == "file") {
std::string path;
ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path));
if (out_path != nullptr) {
*out_path = path;
}
return std::make_shared<LocalFileSystem>(options, io_context);
}
if (scheme == "abfs" || scheme == "abfss") {
#ifdef ARROW_AZURE
ARROW_ASSIGN_OR_RAISE(auto options, AzureOptions::FromUri(uri, out_path));
Expand Down Expand Up @@ -969,5 +967,4 @@ Status Initialize(const FileSystemGlobalOptions& options) {
return Status::OK();
}

} // namespace fs
} // namespace arrow
} // namespace arrow::fs
30 changes: 28 additions & 2 deletions cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ class ARROW_EXPORT FileSystem
/// \return The path inside the filesystem that is indicated by the URI.
virtual Result<std::string> PathFromUri(const std::string& uri_string) const;

/// \brief Make a URI from which FileSystemFromUri produces an equivalent filesystem
///
/// The base-class implementation does not produce a URI: it returns
/// Status::NotImplemented mentioning type_name(). Filesystems that can be
/// reconstructed from a URI override this method.
///
/// \param path The path component to use in the resulting URI
/// \return A URI string, or an error if an equivalent URI cannot be produced
virtual Result<std::string> MakeUri(std::string path) const;

virtual bool Equals(const FileSystem& other) const = 0;

virtual bool Equals(const std::shared_ptr<FileSystem>& other) const {
Expand Down Expand Up @@ -352,8 +357,23 @@ class ARROW_EXPORT FileSystem
bool default_async_is_sync_ = true;
};

using FileSystemFactory = std::function<Result<std::shared_ptr<FileSystem>>(
const Uri& uri, const io::IOContext& io_context, std::string* out_path)>;
/// \brief A filesystem factory callable together with the source location at
/// which it was defined.
///
/// Equality is decided by the recorded source location only; the callable
/// itself is never compared.
struct FileSystemFactory {
  /// The callable invoked with a URI, an IOContext and an output-path pointer
  /// to construct a filesystem
  std::function<Result<std::shared_ptr<FileSystem>>(
      const Uri& uri, const io::IOContext& io_context, std::string* out_path)>
      function;
  /// Source file of the factory's definition
  std::string_view file;
  /// Line in `file` of the factory's definition
  int line;

  bool operator==(const FileSystemFactory& other) const {
    // libarrow may be statically linked into both the executable and a
    // dynamically loaded filesystem implementation library. That library then
    // carries its own copy of the registry and of every FileSystemRegistrar
    // that is statically linked to libarrow. Factories retrieved from such a
    // library are recognized as duplicates by the file and line of the
    // registrar's definition, which is all that is compared here.
    if (line != other.line) return false;
    return file == other.file;
  }
};

/// \brief A FileSystem implementation that delegates to another
/// implementation after prepending a fixed base path.
Expand Down Expand Up @@ -645,6 +665,12 @@ struct ARROW_EXPORT FileSystemRegistrar {
std::function<void()> finalizer = {});
};

/// \brief Build a FileSystemRegistrar whose factory records its definition site
///
/// Expands to a brace-initialized ::arrow::fs::FileSystemRegistrar holding, in
/// order: `scheme`, a ::arrow::fs::FileSystemFactory made from
/// `factory_function` plus the __FILE__ and __LINE__ of the macro invocation,
/// and `finalizer`. The recorded file and line are what
/// FileSystemFactory::operator== compares.
///
/// All three arguments are required; pass `{}` when there is no finalizer.
/// Arguments are substituted textually by the preprocessor, so an argument
/// containing a comma that is not enclosed in parentheses is split into
/// several macro arguments.
#define ARROW_REGISTER_FILESYSTEM(scheme, factory_function, finalizer) \
::arrow::fs::FileSystemRegistrar { \
scheme, ::arrow::fs::FileSystemFactory{factory_function, __FILE__, __LINE__}, \
finalizer \
}

/// @}

namespace internal {
Expand Down
37 changes: 35 additions & 2 deletions cpp/src/arrow/filesystem/localfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "arrow/io/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/io_util.h"
#include "arrow/util/string.h"
#include "arrow/util/uri.h"
#include "arrow/util/windows_fixup.h"

Expand Down Expand Up @@ -246,8 +247,20 @@ Result<LocalFileSystemOptions> LocalFileSystemOptions::FromUri(
std::string(internal::RemoveTrailingSlash(uri.path(), /*preserve_root=*/true));
}

// TODO handle use_mmap option
return LocalFileSystemOptions();
LocalFileSystemOptions options;
ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items());
for (const auto& [key, value] : params) {
if (key == "use_mmap") {
if (value.empty()) {
options.use_mmap = true;
continue;
} else {
ARROW_ASSIGN_OR_RAISE(options.use_mmap, ::arrow::internal::ParseBoolean(value));
}
break;
}
}
return options;
}

LocalFileSystem::LocalFileSystem(const io::IOContext& io_context)
Expand All @@ -273,6 +286,11 @@ Result<std::string> LocalFileSystem::PathFromUri(const std::string& uri_string)
authority_handling);
}

// Build a "file://" URI that refers to `path` on this local filesystem.
//
// \param path the local path; it is normalized with DoNormalizePath first
// \return the URI string, with "?use_mmap" appended when this filesystem was
//         configured with use_mmap, or an error if normalization or URI
//         construction fails
Result<std::string> LocalFileSystem::MakeUri(std::string path) const {
  ARROW_ASSIGN_OR_RAISE(path, DoNormalizePath(std::move(path)));
  // Do not splice the raw path into the URI: URIs are percent-decoded when
  // parsed, so a path containing a space, '%', '?' or '#' would produce a URI
  // that is invalid or that parses back to a different path/query. Let the
  // URI utility percent-encode the path instead.
  // NOTE(review): relies on ::arrow::util::UriFromAbsolutePath (declared in
  // arrow/util/uri.h) returning Result<std::string> -- confirm for this tree.
  ARROW_ASSIGN_OR_RAISE(auto uri, ::arrow::util::UriFromAbsolutePath(path));
  return uri + (options_.use_mmap ? "?use_mmap" : "");
}

bool LocalFileSystem::Equals(const FileSystem& other) const {
if (other.type_name() != type_name()) {
return false;
Expand Down Expand Up @@ -686,4 +704,19 @@ Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenAppendStream(
return OpenOutputStreamGeneric(path, truncate, append);
}

// Factory that creates a LocalFileSystem from a URI.
//
// Options and the path are extracted by LocalFileSystemOptions::FromUri; any
// error from it is propagated. The path is written to `out_path` only when the
// caller supplied a non-null pointer and parsing succeeded.
static Result<std::shared_ptr<fs::FileSystem>> LocalFileSystemFactory(
    const arrow::util::Uri& uri, const io::IOContext& io_context, std::string* out_path) {
  std::string parsed_path;
  ARROW_ASSIGN_OR_RAISE(auto local_options,
                        LocalFileSystemOptions::FromUri(uri, &parsed_path));
  if (out_path) {
    *out_path = std::move(parsed_path);
  }
  return std::make_shared<LocalFileSystem>(local_options, io_context);
}

// Registrars for the local filesystem: the same LocalFileSystemFactory is
// associated with two URI schemes, "file" and "local". Neither entry supplies
// a finalizer (`{}`).
// NOTE(review): registration presumably happens when these namespace-scope
// objects are constructed at load time -- confirm against FileSystemRegistrar.
FileSystemRegistrar kLocalFileSystemModule[]{
ARROW_REGISTER_FILESYSTEM("file", LocalFileSystemFactory, {}),
ARROW_REGISTER_FILESYSTEM("local", LocalFileSystemFactory, {}),
};

} // namespace arrow::fs
1 change: 1 addition & 0 deletions cpp/src/arrow/filesystem/localfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem {

Result<std::string> NormalizePath(std::string path) override;
Result<std::string> PathFromUri(const std::string& uri_string) const override;
Result<std::string> MakeUri(std::string path) const override;

bool Equals(const FileSystem& other) const override;

Expand Down
29 changes: 18 additions & 11 deletions cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include <cerrno>
#include <chrono>
#include <memory>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -113,10 +111,8 @@ Result<std::shared_ptr<FileSystem>> SlowFileSystemFactory(const Uri& uri,
}
return std::make_shared<SlowFileSystemPublicProps>(base_fs, average_latency, seed);
}
FileSystemRegistrar kSlowFileSystemModule{
"slowfile",
SlowFileSystemFactory,
};
auto kSlowFileSystemModule =
ARROW_REGISTER_FILESYSTEM("slowfile", SlowFileSystemFactory, {});

TEST(FileSystemFromUri, LinkedRegisteredFactory) {
// Since the registrar's definition is in this translation unit (which is linked to the
Expand Down Expand Up @@ -155,23 +151,24 @@ TEST(FileSystemFromUri, RuntimeRegisteredFactory) {
EXPECT_THAT(FileSystemFromUri("slowfile2:///hey/yo", &path),
Raises(StatusCode::Invalid));

EXPECT_THAT(RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory), Ok());
EXPECT_THAT(RegisterFileSystemFactory("slowfile2", {SlowFileSystemFactory, "", 0}),
Ok());

ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile2:///hey/yo", &path));
EXPECT_EQ(path, "/hey/yo");
EXPECT_EQ(fs->type_name(), "slow");

EXPECT_THAT(
RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory),
RegisterFileSystemFactory("slowfile2", {SlowFileSystemFactory, "", 0}),
Raises(StatusCode::KeyError,
testing::HasSubstr("Attempted to register factory for scheme 'slowfile2' "
"but that scheme is already registered")));
}

FileSystemRegistrar kSegfaultFileSystemModule[]{
{"segfault", nullptr},
{"segfault", nullptr},
{"segfault", nullptr},
ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
ARROW_REGISTER_FILESYSTEM("segfault", nullptr, {}),
};
TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) {
// Since multiple registrars are defined in this translation unit which all
Expand Down Expand Up @@ -309,6 +306,7 @@ class TestLocalFS : public LocalFSTestMixin {
std::string path;
ASSERT_OK_AND_ASSIGN(fs_, fs_from_uri(uri, &path));
ASSERT_EQ(fs_->type_name(), "local");
local_fs_ = ::arrow::internal::checked_pointer_cast<LocalFileSystem>(fs_);
ASSERT_EQ(path, expected_path);
ASSERT_OK_AND_ASSIGN(path, fs_->PathFromUri(uri));
ASSERT_EQ(path, expected_path);
Expand Down Expand Up @@ -420,8 +418,17 @@ TYPED_TEST(TestLocalFS, FileSystemFromUriFile) {

// Variations
this->TestLocalUri("file:/foo/bar", "/foo/bar");
ASSERT_FALSE(this->local_fs_->options().use_mmap);
this->TestLocalUri("file:///foo/bar", "/foo/bar");
this->TestLocalUri("file:///some%20path/%25percent", "/some path/%percent");

this->TestLocalUri("file:///_?use_mmap", "/_");
if (this->path_formatter_.supports_uri()) {
ASSERT_TRUE(this->local_fs_->options().use_mmap);
ASSERT_OK_AND_ASSIGN(auto uri, this->fs_->MakeUri("/_"));
EXPECT_EQ(uri, "file:///_?use_mmap");
}

#ifdef _WIN32
this->TestLocalUri("file:/C:/foo/bar", "C:/foo/bar");
this->TestLocalUri("file:///C:/foo/bar", "C:/foo/bar");
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/testing/examplefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace arrow::fs {

FileSystemRegistrar kExampleFileSystemModule{
auto kExampleFileSystemModule = ARROW_REGISTER_FILESYSTEM(
"example",
[](const Uri& uri, const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<FileSystem>> {
Expand All @@ -33,6 +33,6 @@ FileSystemRegistrar kExampleFileSystemModule{
auto local_uri = "file" + uri.ToString().substr(kScheme.size());
return FileSystemFromUri(local_uri, io_context, out_path);
},
};
{});

} // namespace arrow::fs
7 changes: 3 additions & 4 deletions docs/source/cpp/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ scope, which will register a factory whenever the instance is loaded:

.. code-block:: cpp

arrow::fs::FileSystemRegistrar kExampleFileSystemModule{
auto kExampleFileSystemModule = ARROW_REGISTER_FILESYSTEM(
"example",
[](const Uri& uri, const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<arrow::fs::FileSystem>> {
EnsureExampleFileSystemInitialized();
return std::make_shared<ExampleFileSystem>();
},
&EnsureExampleFileSystemFinalized,
};
&EnsureExampleFileSystemFinalized
);

If a filesystem implementation requires initialization before any instances
may be constructed, this should be included in the corresponding factory or
Expand All @@ -144,4 +144,3 @@ should have exactly one of its sources
``#include "arrow/filesystem/filesystem_library.h"``
in order to ensure the presence of the symbol on which
:func:`~arrow::fs::LoadFileSystemFactories` depends.

3 changes: 0 additions & 3 deletions python/pyarrow/_fs.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ cdef class FileSystem(_Weakrefable):


cdef class LocalFileSystem(FileSystem):
cdef:
CLocalFileSystem* localfs

cdef init(self, const shared_ptr[CFileSystem]& wrapped)


Expand Down
Loading