Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ namespace Azure { namespace Storage {
namespace _internal {
class TransferEnd;
}

namespace _detail {
struct JobPlan;
}

namespace Blobs {

class BlobFolder final {
Expand All @@ -29,6 +34,7 @@ namespace Azure { namespace Storage {
std::string m_folderPath;

friend class _internal::TransferEnd;
friend struct Azure::Storage::_detail::JobPlan;
};

} // namespace Blobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
#include "azure/storage/datamovement/task.hpp"

namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
// One downloaded range waiting to be written to the destination file.
// Chunks are queued in TaskContext::m_chunksToWrite keyed by Offset and are handed to a
// WriteToFileTask once their offset equals the context's next offset to write.
struct WriteChunk
{
// Offset passed to FileWriter::Write for this chunk; also the key in m_chunksToWrite.
int64_t Offset;
// Number of bytes of Buffer to write.
size_t Length;
// Amount added to the WriteToFileTask's MemoryGiveBack when the chunk is picked up.
// NOTE(review): presumably the memory budget to hand back to the scheduler -- confirm.
size_t MemoryGiveBack;
// Data for this range, moved in from the download task's buffer.
std::unique_ptr<uint8_t[]> Buffer;
// Journal context moved from the download task; WriteToFileTask moves it into its own
// JournalContext before calling TransferSucceeded for the chunk.
Azure::Storage::_detail::JournalContext JournalContext;
};

struct DownloadRangeToMemoryTask final : public Storage::_internal::TaskBase
{
Expand All @@ -33,6 +41,10 @@ namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
int NumChunks{0};
std::atomic<int> NumDownloadedChunks{0};
std::atomic<bool> Failed{false};
std::mutex m_writeChunksMutex;
bool m_writeTaskRunning{false};
std::map<int64_t, std::unique_ptr<WriteChunk>> m_chunksToWrite;
int64_t m_offsetToWrite{0};
};

std::shared_ptr<TaskContext> Context;
Expand All @@ -47,9 +59,7 @@ namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
using TaskBase::TaskBase;

std::shared_ptr<DownloadRangeToMemoryTask::TaskContext> Context;
int64_t Offset;
size_t Length;
std::unique_ptr<uint8_t[]> Buffer;
std::vector<std::unique_ptr<WriteChunk>> m_chunksToWrite;

void Execute() noexcept override;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,9 @@ namespace Azure { namespace Storage {
_internal::JoinPath(jobPlanDir, "job_info"), std::fstream::in | std::fstream::binary);
fin.exceptions(std::fstream::failbit | std::fstream::badbit);
int32_t planFileVersion = ReadFixedInt<int32_t>(fin);
AZURE_ASSERT(planFileVersion == g_PlanFileVersion);
AZURE_ASSERT(
planFileVersion
== g_PlanFileVersion); // Assert may break the whole process, should report error here.
fin.seekg(g_JobInfoFileHeaderSize);
auto serializedJobInfo = ReadString(fin);
auto object = Core::Json::_internal::json::parse(serializedJobInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <azure/core/azure_assert.hpp>

#include "azure/storage/blobs/blob_options.hpp"
#include "azure/storage/datamovement/tasks/download_blob_to_file_task.hpp"
#include "azure/storage/datamovement/tasks/upload_blob_from_file_task.hpp"
#include "azure/storage/datamovement/utilities.hpp"
Expand All @@ -19,6 +20,32 @@ namespace Azure { namespace Storage { namespace _detail {
// Chunk size assigned to each generated download task (TaskModel::ChunkSize) and the divisor
// used to derive the task's NumSubtasks from the blob size.
constexpr size_t g_DownloadBlockSize = 8 * 1024 * 1024;
// NOTE(review): not referenced in the visible part of this file; presumably limits how many
// subtasks are grouped into one part -- confirm against the part-generation code.
constexpr size_t g_NumSubtasksPerPart = 50000;
// Bound compared against totalNumNewSubtasks to stop the blob-listing loop in GeneratePartImpl.
constexpr size_t g_MaxTasksGenerated = 1000000;
// Value assigned to ListBlobsOptions::PageSizeHint when enumerating a blob folder.
constexpr int32_t g_BlobListPageSize = 250;

// Returns the portion of blobPath that precedes its last '/' or '\\' separator.
// When the path contains no separator at all, an empty string is returned.
std::string GetParentDir(const std::string& blobPath)
{
  const std::string::size_type lastSeparator = blobPath.find_last_of("/\\");
  return (lastSeparator == std::string::npos) ? std::string()
                                              : blobPath.substr(0, lastSeparator);
}

// Ensures dirPath exists as a directory, creating missing ancestors first.
// Does nothing when _internal::IsDirectory already reports dirPath as a directory.
void CreateDirectoryIfNotExists(const std::string& dirPath)
{
  if (_internal::IsDirectory(dirPath))
  {
    return;
  }

  // Walk up one level at a time; GetParentDir yields an empty string once no
  // separator is left, which ends the recursion.
  const auto parentPath = GetParentDir(dirPath);
  if (!parentPath.empty())
  {
    CreateDirectoryIfNotExists(parentPath);
  }

  _internal::CreateDirectory(dirPath);
}
} // namespace

void JobPlan::GeneratePartImpl(const PartGeneratorModel& gen)
Expand Down Expand Up @@ -127,7 +154,59 @@ namespace Azure { namespace Storage { namespace _detail {
}
else if (m_model.Source.m_type == _internal::TransferEnd::EndType::AzureBlobFolder)
{
AZURE_NOT_IMPLEMENTED();
const std::string rootDirectory = _internal::PathFromUrl(m_model.Destination.m_url);
std::string currentDirectory = rootDirectory;

partGens.push_back(gen);
CreateDirectoryIfNotExists(currentDirectory);

do
{
PartGeneratorModel currGen = std::move(partGens.back());
partGens.pop_back();

Blobs::ListBlobsOptions options;
std::string prefix = m_model.Source.m_blobFolder.Value().m_folderPath;
if (!prefix.empty() && (prefix.back() != '/'))
{
prefix.append("/");
}

options.Prefix = std::move(prefix);
options.PageSizeHint = g_BlobListPageSize;
options.ContinuationToken = std::move(currGen.ContinuationToken);

std::vector<_internal::Task> subtasks;
auto result = m_model.Source.m_blobFolder.Value().m_blobContainerClient.ListBlobs(options);
for (auto blobItem : result.Blobs)
{
TaskModel task;
const std::string blobName = blobItem.Name.substr(options.Prefix->length());
std::string localFileName = blobName;

std::string parentDir = _internal::JoinPath(rootDirectory, GetParentDir(localFileName));
if (parentDir != currentDirectory)
{
CreateDirectoryIfNotExists(parentDir); // TODO: error handling
currentDirectory = std::move(parentDir);
}

task.Source = _internal::JoinPath(currGen.Source, blobName);
task.Destination = _internal::JoinPath(currGen.Destination, localFileName);
task.ObjectSize = blobItem.BlobSize;
task.ChunkSize = g_DownloadBlockSize;
task.NumSubtasks = static_cast<int32_t>(
(blobItem.BlobSize + g_DownloadBlockSize - 1) / g_DownloadBlockSize);
task.NumSubtasks = std::max(task.NumSubtasks, 1);
taskGenerated(std::move(task));
}

if (result.NextPageToken.HasValue())
{
currGen.ContinuationToken = std::move(result.NextPageToken.Value());
partGens.push_back(std::move(currGen));
}
} while (!partGens.empty() && totalNumNewSubtasks < g_MaxTasksGenerated);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

namespace Azure { namespace Storage { namespace Blobs { namespace _detail {

namespace {
// Upper bound on the byte count passed to a single FileWriter::Write call;
// WriteToFileTask::Execute writes each chunk in pieces of at most this length.
constexpr size_t WritePieceLength = 8 * 1024 * 1024;
} // namespace

void DownloadRangeToMemoryTask::Execute() noexcept
{
if (Context->Failed.load(std::memory_order_relaxed))
Expand Down Expand Up @@ -41,15 +45,42 @@ namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
return;
}

std::unique_ptr<WriteChunk> writeChunk = std::make_unique<WriteChunk>();
writeChunk->Buffer = std::move(buffer);
writeChunk->Offset = this->Offset;
writeChunk->Length = Length;
writeChunk->MemoryGiveBack = 0;
std::swap(writeChunk->MemoryGiveBack, this->MemoryGiveBack);
writeChunk->JournalContext = std::move(JournalContext);

auto writeToFileTask = CreateTask<WriteToFileTask>(_internal::TaskType::DiskIO);
writeToFileTask->Context = Context;
writeToFileTask->Buffer = std::move(buffer);
writeToFileTask->Offset = this->Offset;
writeToFileTask->Length = Length;
std::swap(writeToFileTask->MemoryGiveBack, this->MemoryGiveBack);
writeToFileTask->JournalContext = std::move(JournalContext);
{
std::lock_guard<std::mutex> guard(Context->m_writeChunksMutex);
Context->m_chunksToWrite.emplace(writeChunk->Offset, std::move(writeChunk));
if (!Context->m_writeTaskRunning)
{
auto iter = Context->m_chunksToWrite.begin();
while ((iter != Context->m_chunksToWrite.end())
&& (iter->first == Context->m_offsetToWrite))
{
Context->m_offsetToWrite += iter->second->Length;
writeToFileTask->MemoryGiveBack += iter->second->MemoryGiveBack;
writeToFileTask->m_chunksToWrite.push_back(std::move(iter->second));
Context->m_chunksToWrite.erase(iter);
iter = Context->m_chunksToWrite.begin();
}
if (!writeToFileTask->m_chunksToWrite.empty())
{
Context->m_writeTaskRunning = true;
}
}
}

SharedStatus->TransferEngine->AddTask(std::move(writeToFileTask));
if (!writeToFileTask->m_chunksToWrite.empty())
{
writeToFileTask->Context = Context;
SharedStatus->TransferEngine->AddTask(std::move(writeToFileTask));
}
}

void WriteToFileTask::Execute() noexcept
Expand All @@ -59,37 +90,77 @@ namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
return;
}

try
{
std::lock_guard<std::mutex> guard(Context->FileWriterMutex);
if (!Context->FileWriter)
{
std::lock_guard<std::mutex> guard(Context->FileWriterMutex);
if (!Context->FileWriter)
{
Context->FileWriter
= std::make_unique<Storage::_internal::FileWriter>(Context->Destination);
}
Context->FileWriter
= std::make_unique<Storage::_internal::FileWriter>(Context->Destination);
}
Context->FileWriter->Write(this->Buffer.get(), this->Length, this->Offset);
}
catch (std::exception&)

for (auto& chunkToWrite : m_chunksToWrite)
{
bool firstFailure = !Context->Failed.exchange(true, std::memory_order_relaxed);
if (firstFailure)
int64_t offset = chunkToWrite->Offset;
size_t length = chunkToWrite->Length;
uint8_t* bufferPointer = chunkToWrite->Buffer.get();
try
{
TransferFailed(Context->Source.GetUrl(), _internal::PathToUrl(Context->Destination));
while (length > 0)
{
size_t thisWriteLength = std::min(length, WritePieceLength);
Context->FileWriter->Write(bufferPointer, thisWriteLength, offset);
bufferPointer += thisWriteLength;
offset += thisWriteLength;
length -= thisWriteLength;
}
}
catch (std::exception&)
{
bool firstFailure = !Context->Failed.exchange(true, std::memory_order_relaxed);
if (firstFailure)
{
TransferFailed(Context->Source.GetUrl(), _internal::PathToUrl(Context->Destination));
}
return;
}

int numDownloadedBlocks
= Context->NumDownloadedChunks.fetch_add(1, std::memory_order_relaxed) + 1;
JournalContext = std::move(chunkToWrite->JournalContext);
if (numDownloadedBlocks == Context->NumChunks)
{
TransferSucceeded(chunkToWrite->Length, 1);
return;
}
else
{
TransferSucceeded(chunkToWrite->Length, 0);
}
return;
}

int numDownloadedBlocks
= Context->NumDownloadedChunks.fetch_add(1, std::memory_order_relaxed) + 1;
if (numDownloadedBlocks == Context->NumChunks)
auto writeToFileTask = CreateTask<WriteToFileTask>(_internal::TaskType::DiskIO);
{
TransferSucceeded(Length, 1);
std::lock_guard<std::mutex> guard(Context->m_writeChunksMutex);
auto iter = Context->m_chunksToWrite.begin();
while ((iter != Context->m_chunksToWrite.end()) && (iter->first == Context->m_offsetToWrite))
{
Context->m_offsetToWrite += iter->second->Length;
writeToFileTask->MemoryGiveBack += iter->second->MemoryGiveBack;
writeToFileTask->m_chunksToWrite.push_back(std::move(iter->second));
Context->m_chunksToWrite.erase(iter);
iter = Context->m_chunksToWrite.begin();
}
if (writeToFileTask->m_chunksToWrite.empty())
{
Context->m_writeTaskRunning = false;
}
}
else

if (!writeToFileTask->m_chunksToWrite.empty())
{
TransferSucceeded(Length, 0);
writeToFileTask->Context = Context;
SharedStatus->TransferEngine->AddTask(std::move(writeToFileTask));
}
}
}}}} // namespace Azure::Storage::Blobs::_detail
Loading