Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBS-5637: Asynchronous disks allocation #2763

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TDiskRegistryActor final
// Pending requests
TDeque<TPendingRequest> PendingRequests;

THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskAllocationRequests;
THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskDeallocationRequests;

bool BrokenDisksDestructionInProgress = false;
Expand Down Expand Up @@ -227,6 +228,20 @@ class TDiskRegistryActor final
TDiskRegistryDatabase& db,
TDiskRegistryStateSnapshot& args);

void AddPendingAllocation(
const NActors::TActorContext& ctx,
const TString& diskId,
TRequestInfoPtr requestInfoPtr);

void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
const TString& diskId);

void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
TVector<TRequestInfoPtr>& requestInfos,
NProto::TError error);

void AddPendingDeallocation(
const NActors::TActorContext& ctx,
const TString& diskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,79 @@ void TDiskRegistryActor::CompleteAddDisk(
response->Record.SetMuteIOErrors(args.MuteIOErrors);
}

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
TVector<TDeviceId> dirtyDevices;
for (const auto& device: args.Devices) {
const bool dirty = State->IsDirtyDevice(device.GetDeviceUUID());
if (dirty) {
dirtyDevices.push_back(device.GetDeviceUUID());
}
}

if (HasError(args.Error) || args.Error.GetCode() == S_ALREADY || dirtyDevices.empty()) {
NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
} else {
AddPendingAllocation(ctx, args.DiskId, args.RequestInfo);
}

DestroyBrokenDisks(ctx);
SecureErase(ctx); // I think we need this only if allocating dirty devices
NotifyUsers(ctx);
}

void TDiskRegistryActor::AddPendingAllocation(
const NActors::TActorContext& ctx,
const TString& diskId,
TRequestInfoPtr requestInfo)
{
auto& requestInfos = PendingDiskAllocationRequests[diskId];

// TODO: GetMaxNonReplicatedDiskAllocationRequests
if (requestInfos.size() > Config->GetMaxNonReplicatedDiskDeallocationRequests()) {
LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY,
"Too many pending allocation requests (%lu) for disk %s. "
"Reject all requests.",
requestInfos.size(),
diskId.Quote().c_str());

ReplyToPendingAllocations(ctx, requestInfos, MakeError(E_REJECTED));
}

requestInfos.emplace_back(std::move(requestInfo));
}

void TDiskRegistryActor::ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
TVector<TRequestInfoPtr>& requestInfos,
NProto::TError error)
{
for (auto& requestInfo: requestInfos) {
NCloud::Reply(
ctx,
*requestInfo,
std::make_unique<TEvDiskRegistry::TEvAllocateDiskResponse>(error));
}
requestInfos.clear();
}

void TDiskRegistryActor::ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
const TString& diskId)
{
auto it = PendingDiskAllocationRequests.find(diskId);
if (it == PendingDiskAllocationRequests.end()) {
return;
}

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"Reply to pending allocation requests. DiskId=%s PendingRequests=%d",
diskId.Quote().c_str(),
static_cast<int>(it->second.size()));

ReplyToPendingAllocations(ctx, it->second, MakeError(S_OK));

PendingDiskAllocationRequests.erase(it);
}

////////////////////////////////////////////////////////////////////////////////

void TDiskRegistryActor::HandleDeallocateDisk(
Expand Down Expand Up @@ -379,7 +446,7 @@ void TDiskRegistryActor::ReplyToPendingDeallocations(
}

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"Reply to pending deallocation requests. DiskId=%s PendingRquests=%d",
"Reply to pending deallocation requests. DiskId=%s PendingRequests=%d",
diskId.Quote().c_str(),
static_cast<int>(it->second.size()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ void TDiskRegistryActor::ExecuteCleanupDevices(
TTxDiskRegistry::TCleanupDevices& args)
{
TDiskRegistryDatabase db(tx.DB);
args.SyncDeallocatedDisks =
State->MarkDevicesAsClean(ctx.Now(), db, args.Devices);
std::tie(args.SyncAllocatedDisks, args.SyncDeallocatedDisks) =
std::move(State->MarkDevicesAsClean(ctx.Now(), db, args.Devices));
}

void TDiskRegistryActor::CompleteCleanupDevices(
Expand All @@ -293,6 +293,10 @@ void TDiskRegistryActor::CompleteCleanupDevices(
for (const auto& diskId: args.SyncDeallocatedDisks) {
ReplyToPendingDeallocations(ctx, diskId);
}

for (const auto& diskId: args.SyncAllocatedDisks) {
ReplyToPendingAllocations(ctx, diskId);
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
39 changes: 26 additions & 13 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,16 +1299,17 @@ NProto::TError TDiskRegistryState::ReplaceDeviceWithoutDiskStateUpdate(
}

if (!manual && !deviceReplacementId.empty()) {
auto cleaningDiskId =
auto [allocating, deallocating] =
PendingCleanup.FindDiskId(deviceReplacementId);
if (!cleaningDiskId.empty() && cleaningDiskId != diskId) {
Y_UNUSED(allocating); // TODO: probably should react to this
if (deallocating && *deallocating != diskId) {
return MakeError(
E_ARGUMENT,
TStringBuilder()
<< "can't allocate specific device "
<< deviceReplacementId.Quote() << " for disk " << diskId
<< " since it is in pending cleanup for disk "
<< cleaningDiskId);
<< *deallocating);
}
}

Expand Down Expand Up @@ -3855,16 +3856,20 @@ bool TDiskRegistryState::MarkDeviceAsDirty(
return true;
}

TDiskRegistryState::TDiskId TDiskRegistryState::MarkDeviceAsClean(
TDiskRegistryState::TOpt2Disk TDiskRegistryState::MarkDeviceAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TDeviceId& uuid)
{
auto ret = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return ret.empty() ? "" : ret[0];
auto [alloc, dealloc] = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return {
alloc.empty() ? std::nullopt : std::make_optional(std::move(alloc[0])),
dealloc.empty() ? std::nullopt
: std::make_optional(std::move(dealloc[0]))};
}

TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
std::pair<TDiskRegistryState::TAllocatedDisksList, TDiskRegistryState::TDellocatedDisksList>
TDiskRegistryState::MarkDevicesAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TVector<TDeviceId>& uuids)
Expand All @@ -3878,14 +3883,19 @@ TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
}
}

TVector<TDiskId> ret;
TAllocatedDisksList allocatedDisks;
TDellocatedDisksList dellocatedDisks;
for (const auto& uuid: TryUpdateDevices(now, db, uuids)) {
if (auto diskId = PendingCleanup.EraseDevice(uuid); !diskId.empty()) {
ret.push_back(std::move(diskId));
auto [allocatedDisk, deallocatedDisk] = PendingCleanup.EraseDevice(uuid);
if (allocatedDisk) {
allocatedDisks.push_back(std::move(*allocatedDisk));
}
if (deallocatedDisk) {
dellocatedDisks.push_back(std::move(*deallocatedDisk));
}
}

return ret;
return {std::move(allocatedDisks), std::move(dellocatedDisks)};
}

bool TDiskRegistryState::TryUpdateDevice(
Expand Down Expand Up @@ -5072,8 +5082,9 @@ bool TDiskRegistryState::HasDependentSsdDisks(
continue;
}

auto [allocating, deallocating] = PendingCleanup.FindDiskId(d.GetDeviceUUID());
if (d.GetPoolKind() == NProto::DEVICE_POOL_KIND_LOCAL &&
PendingCleanup.FindDiskId(d.GetDeviceUUID()))
(allocating || deallocating))
{
return true;
}
Expand Down Expand Up @@ -6354,7 +6365,9 @@ NProto::TDiskRegistryStateBackup TDiskRegistryState::BackupState() const
transform(GetDirtyDevices(), backup.MutableDirtyDevices(), [this] (auto& x) {
NProto::TDiskRegistryStateBackup::TDirtyDevice dd;
dd.SetId(x.GetDeviceUUID());
dd.SetDiskId(PendingCleanup.FindDiskId(x.GetDeviceUUID()));
auto [allocating, deallocating] = PendingCleanup.FindDiskId(x.GetDeviceUUID()); // TODO: need to backup
Y_UNUSED(allocating);
dd.SetDiskId(*deallocating);

return dd;
});
Expand Down
18 changes: 14 additions & 4 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,15 @@ class TDiskRegistryState
using TCheckpoints = THashMap<TCheckpointId, TCheckpointInfo>;
using TPlacementGroups = THashMap<TString, TPlacementGroupInfo>;

using TAllocatingDiskId = TDiskRegistryState::TDiskId;
using TDeallocatingDiskId = TDiskRegistryState::TDiskId;
using TAllocatedDisksList = TVector<TAllocatingDiskId>;
using TDellocatedDisksList = TVector<TDeallocatingDiskId>;

template<typename T, typename U>
using TOpt2 = std::pair<std::optional<T>, std::optional<U>>;
using TOpt2Disk = TOpt2<TAllocatingDiskId, TDeallocatingDiskId>;

private:
TLog Log;

Expand Down Expand Up @@ -503,16 +512,17 @@ class TDiskRegistryState

/// Mark selected device as clean and remove it
/// from lists of suspended/dirty/pending cleanup devices
/// @return disk id where selected device was allocated
TDiskId MarkDeviceAsClean(
/// @return allocated/deallocated disk id of where selected device was allocated/deallocated
TOpt2Disk MarkDeviceAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TDeviceId& uuid);

/// Mark selected devices as clean and remove them
/// from lists of suspended/dirty/pending cleanup devices
/// @return vector of disk ids where selected devices were allocated
TVector<TDiskId> MarkDevicesAsClean(
/// @return vector of allocated/deallocated disk ids where selected devices were allocated/deallocated
std::pair<TAllocatedDisksList, TDellocatedDisksList>
MarkDevicesAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TVector<TDeviceId>& uuids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ struct TTxDiskRegistry
const TRequestInfoPtr RequestInfo;
const TVector<TString> Devices;

TVector<TString> SyncAllocatedDisks;
TVector<TString> SyncDeallocatedDisks;

explicit TCleanupDevices(
Expand All @@ -316,6 +317,7 @@ struct TTxDiskRegistry

void Clear()
{
SyncAllocatedDisks.clear();
SyncDeallocatedDisks.clear();
}
};
Expand Down
Loading
Loading