diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index fcbae332d23..179be069b2a 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -17,11 +17,17 @@ #include "arrow/filesystem/azurefs.h" -#include #include +#include "arrow/buffer.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/util_internal.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" +#include "arrow/util/string.h" namespace arrow { namespace fs { @@ -37,34 +43,329 @@ bool AzureOptions::Equals(const AzureOptions& other) const { credentials_kind == other.credentials_kind); } +Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name, + const std::string& account_key) { + if (this->backend == AzureBackend::Azurite) { + account_blob_url = "http://127.0.0.1:10000/" + account_name + "/"; + account_dfs_url = "http://127.0.0.1:10000/" + account_name + "/"; + } else { + account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/"; + account_blob_url = "https://" + account_name + ".blob.core.windows.net/"; + } + storage_credentials_provider = + std::make_shared(account_name, + account_key); + credentials_kind = AzureCredentialsKind::StorageCredentials; + return Status::OK(); +} +namespace { + +// An AzureFileSystem represents a single Azure storage account. AzurePath describes a +// container and path within that storage account. +struct AzurePath { + std::string full_path; + std::string container; + std::string path_to_file; + std::vector path_to_file_parts; + + static Result FromString(const std::string& s) { + // Example expected string format: testcontainer/testdir/testfile.txt + // container = testcontainer + // path_to_file = testdir/testfile.txt + // path_to_file_parts = [testdir, testfile.txt] + if (internal::IsLikelyUri(s)) { + return Status::Invalid( + "Expected an Azure object path of the form 'container/path...', got a URI: '", + s, "'"); + } + const auto src = internal::RemoveTrailingSlash(s); + auto first_sep = src.find_first_of(internal::kSep); + if (first_sep == 0) { + return Status::Invalid("Path cannot start with a separator ('", s, "')"); + } + if (first_sep == std::string::npos) { + return AzurePath{std::string(src), std::string(src), "", {}}; + } + AzurePath path; + path.full_path = std::string(src); + path.container = std::string(src.substr(0, first_sep)); + path.path_to_file = std::string(src.substr(first_sep + 1)); + path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file); + RETURN_NOT_OK(Validate(path)); + return path; + } + + static Status Validate(const AzurePath& path) { + auto status = internal::ValidateAbstractPathParts(path.path_to_file_parts); + if (!status.ok()) { + return Status::Invalid(status.message(), " in path ", path.full_path); + } else { + return status; + } + } + + AzurePath parent() const { + DCHECK(has_parent()); + auto parent = AzurePath{"", container, "", path_to_file_parts}; + parent.path_to_file_parts.pop_back(); + parent.path_to_file = internal::JoinAbstractPath(parent.path_to_file_parts); + if (parent.path_to_file.empty()) { + parent.full_path = parent.container; + } else { + parent.full_path = parent.container + internal::kSep + parent.path_to_file; + } + return parent; + } + + bool has_parent() const { return !path_to_file.empty(); } + + bool empty() const { return container.empty() && path_to_file.empty(); } + + bool operator==(const AzurePath& other) const { + return container == other.container && path_to_file == other.path_to_file; + } +}; + +Status PathNotFound(const AzurePath& path) { + return ::arrow::fs::internal::PathNotFound(path.full_path); +} + +Status NotAFile(const AzurePath& path) { + return ::arrow::fs::internal::NotAFile(path.full_path); +} + +Status ValidateFilePath(const AzurePath& path) { + if (path.container.empty()) { + return PathNotFound(path); + } + + if (path.path_to_file.empty()) { + return NotAFile(path); + } + return Status::OK(); +} + +Status ErrorToStatus(const std::string& prefix, + const Azure::Storage::StorageException& exception) { + return Status::IOError(prefix, " Azure Error: ", exception.what()); +} + +template +std::shared_ptr GetObjectMetadata(const ObjectResult& result) { + auto md = std::make_shared(); + for (auto prop : result) { + md->Append(prop.first, prop.second); + } + return md; +} + +class ObjectInputFile final : public io::RandomAccessFile { + public: + ObjectInputFile(std::shared_ptr blob_client, + const io::IOContext& io_context, AzurePath path, int64_t size = kNoSize) + : blob_client_(std::move(blob_client)), + io_context_(io_context), + path_(std::move(path)), + content_length_(size) {} + + Status Init() { + if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + try { + auto properties = blob_client_->GetProperties(); + content_length_ = properties.Value.BlobSize; + metadata_ = GetObjectMetadata(properties.Value.Metadata); + return Status::OK(); + } catch (const Azure::Storage::StorageException& exception) { + if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + // Could be either container or blob not found. + return PathNotFound(path_); + } + return ErrorToStatus( + "When fetching properties for '" + blob_client_->GetUrl() + "': ", exception); + } + } + + Status CheckClosed(const char* action) const { + if (closed_) { + return Status::Invalid("Cannot ", action, " on closed file."); + } + return Status::OK(); + } + + Status CheckPosition(int64_t position, const char* action) const { + DCHECK_GE(content_length_, 0); + if (position < 0) { + return Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return Status::IOError("Cannot ", action, " past end of file"); + } + return Status::OK(); + } + + // RandomAccessFile APIs + + Result> ReadMetadata() override { + return metadata_; + } + + Future> ReadMetadataAsync( + const io::IOContext& io_context) override { + return metadata_; + } + + Status Close() override { + blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + RETURN_NOT_OK(CheckClosed("tell")); + return pos_; + } + + Result GetSize() override { + RETURN_NOT_OK(CheckClosed("size")); + return content_length_; + } + + Status Seek(int64_t position) override { + RETURN_NOT_OK(CheckClosed("seek")); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return Status::OK(); + } + + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + RETURN_NOT_OK(CheckClosed("read")); + RETURN_NOT_OK(CheckPosition(position, "read")); + + nbytes = std::min(nbytes, content_length_ - position); + if (nbytes == 0) { + return 0; + } + + // Read the desired range of bytes + Azure::Core::Http::HttpRange range{position, nbytes}; + Azure::Storage::Blobs::DownloadBlobToOptions download_options; + download_options.Range = range; + try { + return blob_client_ + ->DownloadTo(reinterpret_cast(out), nbytes, download_options) + .Value.ContentRange.Length.Value(); + } catch (const Azure::Storage::StorageException& exception) { + return ErrorToStatus("When reading from '" + blob_client_->GetUrl() + + "' at position " + std::to_string(position) + " for " + + std::to_string(nbytes) + " bytes: ", + exception); + } + } + + Result> ReadAt(int64_t position, int64_t nbytes) override { + RETURN_NOT_OK(CheckClosed("read")); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + ARROW_ASSIGN_OR_RAISE(auto buffer, + AllocateResizableBuffer(nbytes, io_context_.pool())); + if (nbytes > 0) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + ReadAt(position, nbytes, buffer->mutable_data())); + DCHECK_LE(bytes_read, nbytes); + RETURN_NOT_OK(buffer->Resize(bytes_read)); + } + return std::move(buffer); + } + + Result Read(int64_t nbytes, void* out) override { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + private: + std::shared_ptr blob_client_; + const io::IOContext io_context_; + AzurePath path_; + + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; + std::shared_ptr metadata_; +}; + +} // namespace + // ----------------------------------------------------------------------- // AzureFilesystem Implementation class AzureFileSystem::Impl { public: io::IOContext io_context_; - bool is_hierarchical_namespace_enabled_; + std::shared_ptr service_client_; AzureOptions options_; explicit Impl(AzureOptions options, io::IOContext io_context) : io_context_(io_context), options_(std::move(options)) {} Status Init() { - // TODO: GH-18014 Delete this once we have a proper implementation. This just - // initializes a pointless Azure blob service client with a fake endpoint to ensure - // the build will fail if the Azure SDK build is broken. - auto default_credential = std::make_shared(); - auto service_client = Azure::Storage::Blobs::BlobServiceClient( - "http://fake-blob-storage-endpoint", default_credential); - if (options_.backend == AzureBackend::Azurite) { - // gen1Client_->GetAccountInfo().Value.IsHierarchicalNamespaceEnabled - // throws error in azurite - is_hierarchical_namespace_enabled_ = false; - } + service_client_ = std::make_shared( + options_.account_blob_url, options_.storage_credentials_provider); return Status::OK(); } const AzureOptions& options() const { return options_; } + + Result> OpenInputFile(const std::string& s, + AzureFileSystem* fs) { + ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s)); + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + RETURN_NOT_OK(ValidateFilePath(path)); + auto blob_client = std::make_shared( + service_client_->GetBlobContainerClient(path.container) + .GetBlobClient(path.path_to_file)); + + auto ptr = + std::make_shared(blob_client, fs->io_context(), std::move(path)); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> OpenInputFile(const FileInfo& info, + AzureFileSystem* fs) { + ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path())); + if (info.type() == FileType::NotFound) { + return ::arrow::fs::internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return ::arrow::fs::internal::NotAFile(info.path()); + } + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path())); + RETURN_NOT_OK(ValidateFilePath(path)); + auto blob_client = std::make_shared( + service_client_->GetBlobContainerClient(path.container) + .GetBlobClient(path.path_to_file)); + + auto ptr = std::make_shared(blob_client, fs->io_context(), + std::move(path), info.size()); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -118,22 +419,22 @@ Status AzureFileSystem::CopyFile(const std::string& src, const std::string& dest Result> AzureFileSystem::OpenInputStream( const std::string& path) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + return impl_->OpenInputFile(path, this); } Result> AzureFileSystem::OpenInputStream( const FileInfo& info) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + return impl_->OpenInputFile(info, this); } Result> AzureFileSystem::OpenInputFile( const std::string& path) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + return impl_->OpenInputFile(path, this); } Result> AzureFileSystem::OpenInputFile( const FileInfo& info) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + return impl_->OpenInputFile(info, this); } Result> AzureFileSystem::OpenOutputStream( diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index e5af4d23aab..1f7047ff94c 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -79,6 +79,9 @@ struct ARROW_EXPORT AzureOptions { AzureOptions(); + Status ConfigureAccountKeyCredentials(const std::string& account_name, + const std::string& account_key); + bool Equals(const AzureOptions& other) const; }; diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 9bf7cb8e753..5d454bdc33f 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -34,23 +34,24 @@ #include #include "arrow/filesystem/azurefs.h" -#include "arrow/util/io_util.h" + +#include +#include #include #include #include - -#include - -#include "arrow/testing/gtest_util.h" -#include "arrow/testing/util.h" - #include #include #include #include #include +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/util/io_util.h" +#include "arrow/util/key_value_metadata.h" + namespace arrow { using internal::TemporaryDir; namespace fs { @@ -61,6 +62,15 @@ using ::testing::IsEmpty; using ::testing::Not; using ::testing::NotNull; +auto const* kLoremIpsum = R"""( +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor +incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis +nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. +Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu +fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in +culpa qui officia deserunt mollit anim id est laborum. +)"""; + class AzuriteEnv : public ::testing::Environment { public: AzuriteEnv() { @@ -113,33 +123,6 @@ AzuriteEnv* GetAzuriteEnv() { // Placeholder tests // TODO: GH-18014 Remove once a proper test is added -TEST(AzureFileSystem, UploadThenDownload) { - const std::string container_name = "sample-container"; - const std::string blob_name = "sample-blob.txt"; - const std::string blob_content = "Hello Azure!"; - - const std::string& account_name = GetAzuriteEnv()->account_name(); - const std::string& account_key = GetAzuriteEnv()->account_key(); - - auto credential = std::make_shared( - account_name, account_key); - - auto service_client = Azure::Storage::Blobs::BlobServiceClient( - std::string("http://127.0.0.1:10000/") + account_name, credential); - auto container_client = service_client.GetBlobContainerClient(container_name); - container_client.CreateIfNotExists(); - auto blob_client = container_client.GetBlockBlobClient(blob_name); - - std::vector buffer(blob_content.begin(), blob_content.end()); - blob_client.UploadFrom(buffer.data(), buffer.size()); - - std::vector downloaded_content(blob_content.size()); - blob_client.DownloadTo(downloaded_content.data(), downloaded_content.size()); - - EXPECT_EQ(std::string(downloaded_content.begin(), downloaded_content.end()), - blob_content); -} - TEST(AzureFileSystem, InitializeCredentials) { auto default_credential = std::make_shared(); auto managed_identity_credential = @@ -154,6 +137,326 @@ TEST(AzureFileSystem, OptionsCompare) { EXPECT_TRUE(options.Equals(options)); } +class TestAzureFileSystem : public ::testing::Test { + public: + std::shared_ptr fs_; + std::shared_ptr service_client_; + std::mt19937_64 generator_; + std::string container_name_; + + TestAzureFileSystem() : generator_(std::random_device()()) {} + + AzureOptions MakeOptions() { + const std::string& account_name = GetAzuriteEnv()->account_name(); + const std::string& account_key = GetAzuriteEnv()->account_key(); + AzureOptions options; + options.backend = AzureBackend::Azurite; + ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name, account_key)); + return options; + } + + void SetUp() override { + ASSERT_THAT(GetAzuriteEnv(), NotNull()); + ASSERT_OK(GetAzuriteEnv()->status()); + + container_name_ = RandomChars(32); + auto options = MakeOptions(); + service_client_ = std::make_shared( + options.account_blob_url, options.storage_credentials_provider); + ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options)); + auto container_client = service_client_->GetBlobContainerClient(container_name_); + container_client.CreateIfNotExists(); + + auto blob_client = container_client.GetBlockBlobClient(PreexistingObjectName()); + blob_client.UploadFrom(reinterpret_cast(kLoremIpsum), + strlen(kLoremIpsum)); + } + + void TearDown() override { + auto containers = service_client_->ListBlobContainers(); + for (auto container : containers.BlobContainers) { + auto container_client = service_client_->GetBlobContainerClient(container.Name); + container_client.DeleteIfExists(); + } + } + + std::string PreexistingContainerName() const { return container_name_; } + + std::string PreexistingContainerPath() const { + return PreexistingContainerName() + '/'; + } + + static std::string PreexistingObjectName() { return "test-object-name"; } + + std::string PreexistingObjectPath() const { + return PreexistingContainerPath() + PreexistingObjectName(); + } + + std::string NotFoundObjectPath() { return PreexistingContainerPath() + "not-found"; } + + std::string RandomLine(int lineno, std::size_t width) { + auto line = std::to_string(lineno) + ": "; + line += RandomChars(width - line.size() - 1); + line += '\n'; + return line; + } + + std::size_t RandomIndex(std::size_t end) { + return std::uniform_int_distribution(0, end - 1)(generator_); + } + + std::string RandomChars(std::size_t count) { + auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789"); + std::uniform_int_distribution d(0, fillers.size() - 1); + std::string s; + std::generate_n(std::back_inserter(s), count, [&] { return fillers[d(generator_)]; }); + return s; + } + + void UploadLines(const std::vector& lines, const char* path_to_file, + int total_size) { + // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented. + auto blob_client = service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path_to_file); + std::string all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); + blob_client.UploadFrom(reinterpret_cast(all_lines.data()), + total_size); + } +}; + +TEST_F(TestAzureFileSystem, OpenInputStreamString) { + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath())); + + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024)); + EXPECT_EQ(buffer->ToString(), kLoremIpsum); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamStringBuffers) { + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath())); + + std::string contents; + std::shared_ptr buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16)); + contents.append(buffer->ToString()); + } while (buffer && buffer->size() != 0); + + EXPECT_EQ(contents, kLoremIpsum); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamInfo) { + // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingObjectPath())); + arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File); + + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(info)); + + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024)); + EXPECT_EQ(buffer->ToString(), kLoremIpsum); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamEmpty) { + const auto path_to_file = "empty-object.txt"; + const auto path = PreexistingContainerPath() + path_to_file; + service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path_to_file) + .UploadFrom(nullptr, 0); + + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(path)); + std::array buffer{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data())); + EXPECT_EQ(size, 0); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamNotFound) { + ASSERT_RAISES(IOError, fs_->OpenInputStream(NotFoundObjectPath())); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamInfoInvalid) { + // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingBucketPath())); + arrow::fs::FileInfo info(PreexistingContainerPath(), FileType::Directory); + ASSERT_RAISES(IOError, fs_->OpenInputStream(info)); + + // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(NotFoundObjectPath())); + arrow::fs::FileInfo info2(PreexistingContainerPath(), FileType::NotFound); + ASSERT_RAISES(IOError, fs_->OpenInputStream(info2)); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamUri) { + ASSERT_RAISES(Invalid, fs_->OpenInputStream("abfss://" + PreexistingObjectPath())); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamTrailingSlash) { + ASSERT_RAISES(IOError, fs_->OpenInputStream(PreexistingObjectPath() + '/')); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) { + const std::string object_name = "OpenInputStreamMetadataTest/simple.txt"; + + service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlobClient(PreexistingObjectName()) + .SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}}); + + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath())); + + std::shared_ptr actual; + ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata()); + // TODO(GH-38330): This is asserting that the user defined metadata is returned but this + // is probably not the correct behaviour. + ASSERT_OK_AND_EQ("value0", actual->Get("key0")); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamClosed) { + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(PreexistingObjectPath())); + ASSERT_OK(stream->Close()); + std::array buffer{}; + ASSERT_RAISES(Invalid, stream->Read(buffer.size(), buffer.data())); + ASSERT_RAISES(Invalid, stream->Read(buffer.size())); + ASSERT_RAISES(Invalid, stream->Tell()); +} + +TEST_F(TestAzureFileSystem, OpenInputFileMixedReadVsReadAt) { + // Create a file large enough to make the random access tests non-trivial. + auto constexpr kLineWidth = 100; + auto constexpr kLineCount = 4096; + std::vector lines(kLineCount); + int lineno = 0; + std::generate_n(lines.begin(), lines.size(), + [&] { return RandomLine(++lineno, kLineWidth); }); + + const auto path_to_file = "OpenInputFileMixedReadVsReadAt/object-name"; + const auto path = PreexistingContainerPath() + path_to_file; + + UploadLines(lines, path_to_file, kLineCount * kLineWidth); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path)); + for (int i = 0; i != 32; ++i) { + SCOPED_TRACE("Iteration " + std::to_string(i)); + // Verify sequential reads work as expected. + std::array buffer{}; + std::int64_t size; + { + ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); + EXPECT_EQ(lines[2 * i], actual->ToString()); + } + { + ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[2 * i + 1], actual); + } + + // Verify random reads interleave too. + auto const index = RandomIndex(kLineCount); + auto const position = index * kLineWidth; + ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[index], actual); + + // Verify random reads using buffers work. + ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth)); + EXPECT_EQ(lines[index], b->ToString()); + } +} + +TEST_F(TestAzureFileSystem, OpenInputFileRandomSeek) { + // Create a file large enough to make the random access tests non-trivial. + auto constexpr kLineWidth = 100; + auto constexpr kLineCount = 4096; + std::vector lines(kLineCount); + int lineno = 0; + std::generate_n(lines.begin(), lines.size(), + [&] { return RandomLine(++lineno, kLineWidth); }); + + const auto path_to_file = "OpenInputFileRandomSeek/object-name"; + const auto path = PreexistingContainerPath() + path_to_file; + std::shared_ptr output; + + UploadLines(lines, path_to_file, kLineCount * kLineWidth); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path)); + for (int i = 0; i != 32; ++i) { + SCOPED_TRACE("Iteration " + std::to_string(i)); + // Verify sequential reads work as expected. + auto const index = RandomIndex(kLineCount); + auto const position = index * kLineWidth; + ASSERT_OK(file->Seek(position)); + ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); + EXPECT_EQ(lines[index], actual->ToString()); + } +} + +TEST_F(TestAzureFileSystem, OpenInputFileIoContext) { + // Create a test file. + const auto path_to_file = "OpenInputFileIoContext/object-name"; + const auto path = PreexistingContainerPath() + path_to_file; + const std::string contents = "The quick brown fox jumps over the lazy dog"; + + auto blob_client = service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path_to_file); + blob_client.UploadFrom(reinterpret_cast(contents.data()), + contents.length()); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path)); + EXPECT_EQ(fs_->io_context().external_id(), file->io_context().external_id()); +} + +TEST_F(TestAzureFileSystem, OpenInputFileInfo) { + // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingObjectPath())); + arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File); + + std::shared_ptr file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(info)); + + std::array buffer{}; + std::int64_t size; + auto constexpr kStart = 16; + ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(), buffer.data())); + + auto const expected = std::string(kLoremIpsum).substr(kStart); + EXPECT_EQ(std::string(buffer.data(), size), expected); +} + +TEST_F(TestAzureFileSystem, OpenInputFileNotFound) { + ASSERT_RAISES(IOError, fs_->OpenInputFile(NotFoundObjectPath())); +} + +TEST_F(TestAzureFileSystem, OpenInputFileInfoInvalid) { + // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingContainerPath())); + arrow::fs::FileInfo info(PreexistingContainerPath(), FileType::File); + ASSERT_RAISES(IOError, fs_->OpenInputFile(info)); + + // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(NotFoundObjectPath())); + arrow::fs::FileInfo info2(NotFoundObjectPath(), FileType::NotFound); + ASSERT_RAISES(IOError, fs_->OpenInputFile(info2)); +} + +TEST_F(TestAzureFileSystem, OpenInputFileClosed) { + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputFile(PreexistingObjectPath())); + ASSERT_OK(stream->Close()); + std::array buffer{}; + ASSERT_RAISES(Invalid, stream->Tell()); + ASSERT_RAISES(Invalid, stream->Read(buffer.size(), buffer.data())); + ASSERT_RAISES(Invalid, stream->Read(buffer.size())); + ASSERT_RAISES(Invalid, stream->ReadAt(1, buffer.size(), buffer.data())); + ASSERT_RAISES(Invalid, stream->ReadAt(1, 1)); + ASSERT_RAISES(Invalid, stream->Seek(2)); +} + } // namespace } // namespace fs } // namespace arrow