diff --git a/sdk/storage/azure-storage-datamovement/CMakeLists.txt b/sdk/storage/azure-storage-datamovement/CMakeLists.txt index a72fcdb349..ba1806cc46 100644 --- a/sdk/storage/azure-storage-datamovement/CMakeLists.txt +++ b/sdk/storage/azure-storage-datamovement/CMakeLists.txt @@ -50,6 +50,8 @@ set( inc/azure/storage/datamovement/scheduler.hpp inc/azure/storage/datamovement/storage_transfer_manager.hpp inc/azure/storage/datamovement/task.hpp + inc/azure/storage/datamovement/tasks/download_blob_directory_task.hpp + inc/azure/storage/datamovement/tasks/download_blob_to_file_task.hpp inc/azure/storage/datamovement/tasks/upload_blob_from_file_task.hpp inc/azure/storage/datamovement/tasks/upload_blobs_from_directory_task.hpp src/private/package_version.hpp @@ -62,6 +64,8 @@ set( src/directory_iterator.cpp src/scheduler.cpp src/storage_transfer_manager.cpp + src/tasks/download_blob_directory_task.cpp + src/tasks/download_blob_to_file_task.cpp src/tasks/upload_blob_from_file_task.cpp src/tasks/upload_blobs_from_directory_task.cpp ) diff --git a/sdk/storage/azure-storage-datamovement/inc/azure/storage/datamovement/tasks/download_blob_directory_task.hpp b/sdk/storage/azure-storage-datamovement/inc/azure/storage/datamovement/tasks/download_blob_directory_task.hpp new file mode 100644 index 0000000000..e109e528a4 --- /dev/null +++ b/sdk/storage/azure-storage-datamovement/inc/azure/storage/datamovement/tasks/download_blob_directory_task.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include "azure/storage/datamovement/blob_folder.hpp" +#include "azure/storage/datamovement/task.hpp" + +namespace Azure { namespace Storage { namespace Blobs { namespace _detail { + + struct DownloadBlobDirectoryTask final : public _internal::TaskBase + { + DownloadBlobDirectoryTask( + _internal::TaskType type, + _internal::Scheduler* scheduler, + const Blobs::BlobFolder& source, + const std::string& destination) + : TaskBase(type, scheduler), Context(std::make_shared(source, destination)) + { + } + + struct TaskContext final + { + explicit TaskContext(Blobs::BlobFolder source, std::string destination) + : Source(std::move(source)), Destination(std::move(destination)) + { + } + Blobs::BlobFolder Source; + std::string Destination; + bool ListCompleted; + size_t FileCounts{0}; + std::atomic NumDownloadedFileCounts{0}; + + std::mutex m_subTasksMutex; + }; + std::shared_ptr Context; + + void Execute() override; + }; + +}}}} // namespace Azure::Storage::Blobs::_detail diff --git a/sdk/storage/azure-storage-datamovement/inc/azure/storage/datamovement/tasks/download_blob_to_file_task.hpp b/sdk/storage/azure-storage-datamovement/inc/azure/storage/datamovement/tasks/download_blob_to_file_task.hpp new file mode 100644 index 0000000000..f8f7ed25d8 --- /dev/null +++ b/sdk/storage/azure-storage-datamovement/inc/azure/storage/datamovement/tasks/download_blob_to_file_task.hpp @@ -0,0 +1,59 @@ +#pragma once + +#include "azure/storage/datamovement/datamovement_options.hpp" +#include "azure/storage/datamovement/task.hpp" + +namespace Azure { namespace Storage { namespace Blobs { namespace _detail { + + struct DownloadBlobToFileTask final : public Storage::_internal::TaskBase + { + DownloadBlobToFileTask( + Storage::_internal::TaskType type, + Storage::_internal::Scheduler* scheduler, + const Blobs::BlobClient& source, + const std::string& destination) + : TaskBase(type, scheduler), Context(std::make_shared(source, destination)) + { + } + + struct TaskContext final + { + explicit TaskContext(Blobs::BlobClient source, std::string destination) + : Source(std::move(source)), Destination(std::move(destination)) + { + } + Blobs::BlobClient Source; + std::string Destination; + std::unique_ptr FileWriter; + uint64_t FileSize{0}; + int NumChunks{0}; + std::atomic NumDownloadedChunks{0}; + }; + std::shared_ptr Context; + + void Execute() override; + }; + + struct DownloadRangeToMemoryTask final : public Storage::_internal::TaskBase + { + using TaskBase::TaskBase; + + std::shared_ptr Context; + int64_t Offset; + size_t Length; + + void Execute() override; + }; + + struct WriteToFileTask final : public Storage::_internal::TaskBase + { + using TaskBase::TaskBase; + + std::shared_ptr Context; + int64_t Offset; + size_t Length; + std::unique_ptr Buffer; + + void Execute() override; + }; +}}}} // namespace Azure::Storage::Blobs::_detail diff --git a/sdk/storage/azure-storage-datamovement/src/tasks/download_blob_directory_task.cpp b/sdk/storage/azure-storage-datamovement/src/tasks/download_blob_directory_task.cpp new file mode 100644 index 0000000000..ea8a536577 --- /dev/null +++ b/sdk/storage/azure-storage-datamovement/src/tasks/download_blob_directory_task.cpp @@ -0,0 +1,10 @@ +#include "azure/storage/datamovement/tasks/download_blob_directory_task.hpp" +#include "azure/storage/datamovement/scheduler.hpp" +#include "azure/storage/datamovement/tasks/download_blob_to_file_task.hpp" +namespace Azure { namespace Storage { namespace Blobs { namespace _detail { + void DownloadBlobDirectoryTask::Execute() + { + // TODO complete handling + } + +}}}} // namespace Azure::Storage::Blobs::_detail diff --git a/sdk/storage/azure-storage-datamovement/src/tasks/download_blob_to_file_task.cpp b/sdk/storage/azure-storage-datamovement/src/tasks/download_blob_to_file_task.cpp new file mode 100644 index 0000000000..470eb6e973 --- /dev/null +++ b/sdk/storage/azure-storage-datamovement/src/tasks/download_blob_to_file_task.cpp @@ -0,0 +1,90 @@ +#include "azure/storage/datamovement/tasks/download_blob_to_file_task.hpp" + +#include "azure/storage/datamovement/scheduler.hpp" + +namespace Azure { namespace Storage { namespace Blobs { namespace _detail { + namespace { + constexpr uint64_t ChunkSize = 4 * 1024 * 1024; + static_assert(ChunkSize < static_cast(std::numeric_limits::max()), ""); + + } // namespace + + // TODO: Should not allow expection thrown out from the method, should add some error handling. + void DownloadBlobToFileTask::Execute() + { + // TODO Error handling here + auto properties = Context->Source.GetProperties().Value; + const uint64_t fileSize = properties.BlobSize; + Context->FileSize = fileSize; + + if (!Context->FileWriter) + { + // TODO before creation, check file existence. + // TODO: error handling here, when failed on opening the file. + Context->FileWriter = std::make_unique(Context->Destination); + } + + if (fileSize == 0) + { + // TODO: completed + return; + } + + // TODO: if file is small enough + + Context->NumChunks = static_cast((fileSize + ChunkSize - 1) / ChunkSize); + std::vector subtasks; + for (int index = 0; index < Context->NumChunks; ++index) + { + auto downloadRangeTask = std::make_unique( + Storage::_internal::TaskType::NetworkDownload, m_scheduler); + downloadRangeTask->Context = Context; + downloadRangeTask->Offset = index * ChunkSize; + downloadRangeTask->Length + = static_cast(std::min(ChunkSize, fileSize - index * ChunkSize)); + downloadRangeTask->MemoryCost = downloadRangeTask->Length; + subtasks.push_back(std::move(downloadRangeTask)); + } + m_scheduler->AddTasks(std::move(subtasks)); + } + + void DownloadRangeToMemoryTask::Execute() + { + std::unique_ptr buffer = std::make_unique(Length); + Azure::Storage::Blobs::DownloadBlobOptions options; + options.Range = Core::Http::HttpRange(); + options.Range->Offset = this->Offset; + options.Range->Length = this->Length; + // TODO: error handling & retry + // TODO: when error happens, memory should be given back, handling MemoryGiveBack. + // TODO: should check for source blob changing to avoid data corruption. + auto downloadResult = Context->Source.Download(options).Value; + size_t bytesRead = downloadResult.BodyStream->ReadToCount(buffer.get(), this->Length); + if (bytesRead != this->Length) + { + // TODO: error handling + } + + auto writeToFileTask + = std::make_unique(_internal::TaskType::DiskIO, m_scheduler); + writeToFileTask->Context = Context; + writeToFileTask->Buffer = std::move(buffer); + writeToFileTask->Offset = this->Offset; + writeToFileTask->Length = Length; + writeToFileTask->MemoryGiveBack = MemoryCost; + + m_scheduler->AddTask(std::move(writeToFileTask)); + } + + void WriteToFileTask::Execute() + { + Context->FileWriter->Write(this->Buffer.get(), this->Length, this->Offset); + Buffer.reset(); + int numDownloadedBlocks + = Context->NumDownloadedChunks.fetch_add(1, std::memory_order_relaxed) + 1; + if (numDownloadedBlocks == Context->NumChunks) + { + // TODO handling complete + } + } +}}}} // namespace Azure::Storage::Blobs::_detail