Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace Azure { namespace Storage { namespace _internal {

class FileWriter final {
public:
FileWriter(const std::string& filename);
FileWriter(const std::string& filename, bool truncate = true);
FileWriter(const FileWriter&) = delete;
FileWriter& operator=(const FileWriter&) = delete;
~FileWriter();
Expand Down
21 changes: 15 additions & 6 deletions sdk/storage/azure-storage-common/src/file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ namespace Azure { namespace Storage { namespace _internal {
return bytesRead;
}

FileWriter::FileWriter(const std::string& filename)
FileWriter::FileWriter(const std::string& filename, bool truncate)
{
DWORD creationDisposition = truncate ? CREATE_ALWAYS : OPEN_ALWAYS;
const std::wstring filenameW = Utf8ToWide(filename);

HANDLE fileHandle;
Expand All @@ -162,12 +163,16 @@ namespace Azure { namespace Storage { namespace _internal {
GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
nullptr,
CREATE_ALWAYS,
creationDisposition,
FILE_ATTRIBUTE_NORMAL,
NULL);
#else
fileHandle = CreateFile2(
filenameW.data(), GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, CREATE_ALWAYS, NULL);
filenameW.data(),
GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
creationDisposition,
NULL);
#endif
if (fileHandle == INVALID_HANDLE_VALUE)
{
Expand Down Expand Up @@ -235,10 +240,14 @@ namespace Azure { namespace Storage { namespace _internal {
return bytesRead;
}

FileWriter::FileWriter(const std::string& filename)
FileWriter::FileWriter(const std::string& filename, bool truncate)
{
m_handle = open(
filename.data(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
int flags = O_WRONLY | O_CREAT;
if (truncate)
{
flags |= O_TRUNC;
}
m_handle = open(filename.data(), flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (m_handle == -1)
{
throw std::runtime_error("Failed to open file.");
Expand Down
2 changes: 2 additions & 0 deletions sdk/storage/azure-storage-datamovement/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set(
inc/azure/storage/datamovement/storage_transfer_manager.hpp
inc/azure/storage/datamovement/task.hpp
inc/azure/storage/datamovement/task_shared_status.hpp
inc/azure/storage/datamovement/tasks/async_copy_blob_task.hpp
inc/azure/storage/datamovement/tasks/download_blob_to_file_task.hpp
inc/azure/storage/datamovement/tasks/upload_blob_from_file_task.hpp
inc/azure/storage/datamovement/transfer_engine.hpp
Expand All @@ -69,6 +70,7 @@ set(
src/private/package_version.hpp
src/storage_transfer_manager.cpp
src/task_shared_status.cpp
src/tasks/async_copy_blob_task.cpp
src/tasks/download_blob_to_file_task.cpp
src/tasks/upload_blob_from_file_task.cpp
src/transfer_engine.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace Azure { namespace Storage {
std::string m_folderPath;

friend class _internal::TransferEnd;
friend struct Azure::Storage::_detail::JobPlan;
friend struct Storage::_detail::JobPlan;
};

} // namespace Blobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ namespace Azure { namespace Storage { namespace Blobs {
const BlobFolder& sourceBlobFolder,
const std::string& destinationLocalPath,
const ScheduleDownloadBlobOptions& options = ScheduleDownloadBlobOptions());

JobProperties ScheduleCopy(
const BlobClient& sourceBlob,
const BlobClient& destinationBlob,
const ScheduleCopyBlobOptions& options = ScheduleCopyBlobOptions());

JobProperties ScheduleCopyDirectory(
const BlobFolder& sourceBlobFolder,
const BlobFolder& destinationBlobFolder,
const ScheduleCopyBlobOptions& options = ScheduleCopyBlobOptions());
};

}}} // namespace Azure::Storage::Blobs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,11 @@ namespace Azure { namespace Storage {
std::function<void(const TransferProgress&)> ProgressHandler;
std::function<void(TransferError&)> ErrorHandler;
};

struct ScheduleCopyBlobOptions final
{
std::function<void(const TransferProgress&)> ProgressHandler;
std::function<void(TransferError&)> ErrorHandler;
};
} // namespace Blobs
}} // namespace Azure::Storage
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ namespace Azure { namespace Storage {
Nullable<Blobs::BlobClient> m_blobClient;
Nullable<Blobs::BlobFolder> m_blobFolder;

friend struct JobModel;
friend struct _detail::JobPlan;
friend class _detail::JobEngine;
};
Expand All @@ -73,6 +74,8 @@ namespace Azure { namespace Storage {
{
TransferEnd Source;
TransferEnd Destination;

static TransferType DeduceTransferType(const JobModel& model);
};

struct HydrationParameters final
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ namespace Azure { namespace Storage {
enum class TransferType
{
SingleUpload = 0,
SingleDownload = 1,
DirectoryUpload = 2,
DirectoryUpload = 1,
SingleDownload = 2,
DirectoryDownload = 3,
SingleCopy = 4,
DirectoryCopy = 5,
};

enum class JobStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,10 @@ namespace Azure { namespace Storage {

using Task = std::unique_ptr<TaskBase>;

struct DummyTask final : public Storage::_internal::TaskBase
{
using TaskBase::TaskBase;
void Execute() noexcept override { AZURE_UNREACHABLE_CODE(); }
};
} // namespace _internal
}} // namespace Azure::Storage
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT

#pragma once

#include <azure/storage/blobs.hpp>

#include "azure/storage/datamovement/task.hpp"

namespace Azure { namespace Storage { namespace Blobs { namespace _detail {

struct AsyncCopyBlobTask final : public Storage::_internal::TaskBase
{
explicit AsyncCopyBlobTask(
_internal::TaskType type,
Blobs::BlobClient source,
Blobs::BlobClient destination)
: TaskBase(type), Source(std::move(source)), Destination(std::move(destination))
{
}

Blobs::BlobClient Source;
Blobs::BlobClient Destination;

void Execute() noexcept override;
};

struct WaitAsyncCopyToFinishTask final : public Storage::_internal::TaskBase
{
explicit WaitAsyncCopyToFinishTask(
_internal::TaskType type,
Blobs::BlobClient source,
Blobs::BlobClient destination)
: TaskBase(type), Source(std::move(source)), Destination(std::move(destination))
{
}

Blobs::BlobClient Source;
Blobs::BlobClient Destination;

void Execute() noexcept override;
};

}}}} // namespace Azure::Storage::Blobs::_detail
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,78 @@

#pragma once

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/common/internal/file_io.hpp>

#include "azure/storage/datamovement/datamovement_options.hpp"
#include "azure/storage/datamovement/task.hpp"

namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
struct WriteChunk
{
int64_t Offset;
size_t Length;
size_t MemoryGiveBack;
std::unique_ptr<uint8_t[]> Buffer;
Azure::Storage::_detail::JournalContext JournalContext;
};

struct DownloadRangeToMemoryTask final : public Storage::_internal::TaskBase
{
using TaskBase::TaskBase;

struct TaskContext final
namespace Azure { namespace Storage {

namespace _internal {
class TransferEngine;
}
namespace Blobs { namespace _detail {

struct WriteChunk
{
explicit TaskContext(Blobs::BlobClient source, std::string destination)
: Source(std::move(source)), Destination(std::move(destination))
{
}
Blobs::BlobClient Source;
std::string Destination;
std::mutex FileWriterMutex; // TODO: optimize this if it becomes a bottleneck
std::unique_ptr<Storage::_internal::FileWriter> FileWriter;
uint64_t FileSize{0};
int NumChunks{0};
std::atomic<int> NumDownloadedChunks{0};
std::atomic<bool> Failed{false};
std::mutex m_writeChunksMutex;
bool m_writeTaskRunning{false};
std::map<int64_t, std::unique_ptr<WriteChunk>> m_chunksToWrite;
int64_t m_offsetToWrite{0};
int64_t Offset;
size_t Length;
size_t MemoryGiveBack = 0;
std::unique_ptr<uint8_t[]> Buffer;
Azure::Storage::_detail::JournalContext JournalContext;
};

std::shared_ptr<TaskContext> Context;
int64_t Offset;
size_t Length;
struct DownloadRangeToMemoryTask final : public Storage::_internal::TaskBase
{
using TaskBase::TaskBase;

struct TaskContext final
{
explicit TaskContext(Blobs::BlobClient source, std::string destination)
: Source(std::move(source)), Destination(std::move(destination))
{
}
~TaskContext();

Blobs::BlobClient Source;
std::string Destination;

void Execute() noexcept override;
};
std::mutex FileWriterMutex; // TODO: optimize this if it becomes a bottleneck
std::unique_ptr<Storage::_internal::FileWriter> FileWriter;
uint64_t FileSize{0};
int NumChunks{0};
std::atomic<int> NumDownloadedChunks{0};
std::atomic<bool> Failed{false};

struct WriteToFileTask final : public Storage::_internal::TaskBase
{
using TaskBase::TaskBase;
std::mutex WriteChunksMutex;
bool WriteTaskRunning{false};
std::map<int64_t, std::unique_ptr<WriteChunk>> ChunksToWrite;
int64_t OffsetToWrite{-1};
_internal::TransferEngine* TransferEngine = nullptr;
};

std::shared_ptr<DownloadRangeToMemoryTask::TaskContext> Context;
std::vector<std::unique_ptr<WriteChunk>> m_chunksToWrite;
std::shared_ptr<TaskContext> Context;
int64_t Offset;
size_t Length;

void Execute() noexcept override;
};
}}}} // namespace Azure::Storage::Blobs::_detail
void Execute() noexcept override;
};

struct WriteToFileTask final : public Storage::_internal::TaskBase
{
using TaskBase::TaskBase;

std::shared_ptr<DownloadRangeToMemoryTask::TaskContext> Context;
std::vector<std::unique_ptr<WriteChunk>> ChunksToWrite;

void Execute() noexcept override;
};
}} // namespace Blobs::_detail
}} // namespace Azure::Storage
Loading