Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/storage/azure-storage-datamovement/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ set(
inc/azure/storage/datamovement/scheduler.hpp
inc/azure/storage/datamovement/storage_transfer_manager.hpp
inc/azure/storage/datamovement/task.hpp
inc/azure/storage/datamovement/tasks/download_blob_directory_task.hpp
inc/azure/storage/datamovement/tasks/download_blob_to_file_task.hpp
inc/azure/storage/datamovement/tasks/upload_blob_from_file_task.hpp
inc/azure/storage/datamovement/tasks/upload_blobs_from_directory_task.hpp
src/private/package_version.hpp
Expand All @@ -62,6 +64,8 @@ set(
src/directory_iterator.cpp
src/scheduler.cpp
src/storage_transfer_manager.cpp
src/tasks/download_blob_directory_task.cpp
src/tasks/download_blob_to_file_task.cpp
src/tasks/upload_blob_from_file_task.cpp
src/tasks/upload_blobs_from_directory_task.cpp
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include "azure/storage/datamovement/blob_folder.hpp"
#include "azure/storage/datamovement/task.hpp"

namespace Azure { namespace Storage { namespace Blobs { namespace _detail {

struct DownloadBlobDirectoryTask final : public _internal::TaskBase
{
DownloadBlobDirectoryTask(
_internal::TaskType type,
_internal::Scheduler* scheduler,
const Blobs::BlobFolder& source,
const std::string& destination)
: TaskBase(type, scheduler), Context(std::make_shared<TaskContext>(source, destination))
{
}

struct TaskContext final
{
explicit TaskContext(Blobs::BlobFolder source, std::string destination)
: Source(std::move(source)), Destination(std::move(destination))
{
}
Blobs::BlobFolder Source;
std::string Destination;
bool ListCompleted;
size_t FileCounts{0};
std::atomic<int> NumDownloadedFileCounts{0};

std::mutex m_subTasksMutex;
};
std::shared_ptr<TaskContext> Context;

void Execute() override;
};

}}}} // namespace Azure::Storage::Blobs::_detail
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once

#include "azure/storage/datamovement/datamovement_options.hpp"
#include "azure/storage/datamovement/task.hpp"

namespace Azure { namespace Storage { namespace Blobs { namespace _detail {

struct DownloadBlobToFileTask final : public Storage::_internal::TaskBase
{
DownloadBlobToFileTask(
Storage::_internal::TaskType type,
Storage::_internal::Scheduler* scheduler,
const Blobs::BlobClient& source,
const std::string& destination)
: TaskBase(type, scheduler), Context(std::make_shared<TaskContext>(source, destination))
{
}

struct TaskContext final
{
explicit TaskContext(Blobs::BlobClient source, std::string destination)
: Source(std::move(source)), Destination(std::move(destination))
{
}
Blobs::BlobClient Source;
std::string Destination;
std::unique_ptr<Storage::_internal::FileWriter> FileWriter;
uint64_t FileSize{0};
int NumChunks{0};
std::atomic<int> NumDownloadedChunks{0};
};
std::shared_ptr<TaskContext> Context;

void Execute() override;
};

struct DownloadRangeToMemoryTask final : public Storage::_internal::TaskBase
{
using TaskBase::TaskBase;

std::shared_ptr<DownloadBlobToFileTask::TaskContext> Context;
int64_t Offset;
size_t Length;

void Execute() override;
};

struct WriteToFileTask final : public Storage::_internal::TaskBase
{
using TaskBase::TaskBase;

std::shared_ptr<DownloadBlobToFileTask::TaskContext> Context;
int64_t Offset;
size_t Length;
std::unique_ptr<uint8_t[]> Buffer;

void Execute() override;
};
}}}} // namespace Azure::Storage::Blobs::_detail
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "azure/storage/datamovement/tasks/download_blob_directory_task.hpp"
#include "azure/storage/datamovement/scheduler.hpp"
#include "azure/storage/datamovement/tasks/download_blob_to_file_task.hpp"
namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
void DownloadBlobDirectoryTask::Execute()
{
// TODO complete handling
}

}}}} // namespace Azure::Storage::Blobs::_detail
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include "azure/storage/datamovement/tasks/download_blob_to_file_task.hpp"

#include "azure/storage/datamovement/scheduler.hpp"

namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
namespace {
constexpr uint64_t ChunkSize = 4 * 1024 * 1024;
static_assert(ChunkSize < static_cast<uint64_t>(std::numeric_limits<size_t>::max()), "");

} // namespace

// TODO: Should not allow expection thrown out from the method, should add some error handling.
void DownloadBlobToFileTask::Execute()
{
// TODO Error handling here
auto properties = Context->Source.GetProperties().Value;
const uint64_t fileSize = properties.BlobSize;
Context->FileSize = fileSize;

if (!Context->FileWriter)
{
// TODO before creation, check file existence.
// TODO: error handling here, when failed on opening the file.
Context->FileWriter = std::make_unique<Storage::_internal::FileWriter>(Context->Destination);
}

if (fileSize == 0)
{
// TODO: completed
return;
}

// TODO: if file is small enough

Context->NumChunks = static_cast<int>((fileSize + ChunkSize - 1) / ChunkSize);
std::vector<Storage::_internal::Task> subtasks;
for (int index = 0; index < Context->NumChunks; ++index)
{
auto downloadRangeTask = std::make_unique<DownloadRangeToMemoryTask>(
Storage::_internal::TaskType::NetworkDownload, m_scheduler);
downloadRangeTask->Context = Context;
downloadRangeTask->Offset = index * ChunkSize;
downloadRangeTask->Length
= static_cast<size_t>(std::min(ChunkSize, fileSize - index * ChunkSize));
downloadRangeTask->MemoryCost = downloadRangeTask->Length;
subtasks.push_back(std::move(downloadRangeTask));
}
m_scheduler->AddTasks(std::move(subtasks));
}

void DownloadRangeToMemoryTask::Execute()
{
std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(Length);
Azure::Storage::Blobs::DownloadBlobOptions options;
options.Range = Core::Http::HttpRange();
options.Range->Offset = this->Offset;
options.Range->Length = this->Length;
// TODO: error handling & retry
// TODO: when error happens, memory should be given back, handling MemoryGiveBack.
// TODO: should check for source blob changing to avoid data corruption.
auto downloadResult = Context->Source.Download(options).Value;
size_t bytesRead = downloadResult.BodyStream->ReadToCount(buffer.get(), this->Length);
if (bytesRead != this->Length)
{
// TODO: error handling
}

auto writeToFileTask
= std::make_unique<WriteToFileTask>(_internal::TaskType::DiskIO, m_scheduler);
writeToFileTask->Context = Context;
writeToFileTask->Buffer = std::move(buffer);
writeToFileTask->Offset = this->Offset;
writeToFileTask->Length = Length;
writeToFileTask->MemoryGiveBack = MemoryCost;

m_scheduler->AddTask(std::move(writeToFileTask));
}

void WriteToFileTask::Execute()
{
Context->FileWriter->Write(this->Buffer.get(), this->Length, this->Offset);
Buffer.reset();
int numDownloadedBlocks
= Context->NumDownloadedChunks.fetch_add(1, std::memory_order_relaxed) + 1;
if (numDownloadedBlocks == Context->NumChunks)
{
// TODO handling complete
}
}
}}}} // namespace Azure::Storage::Blobs::_detail