diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index c8921ca702f1..7ee711ed243d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -269,6 +269,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/logstore:logstore_test", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test", "//pkg/kv/kvserver/loqrecovery:loqrecovery_test", + "//pkg/kv/kvserver/mmaintegration:mmaintegration_test", "//pkg/kv/kvserver/multiqueue:multiqueue_test", "//pkg/kv/kvserver/print:print_test", "//pkg/kv/kvserver/protectedts/ptcache:ptcache_test", @@ -1462,7 +1463,6 @@ GO_TARGETS = [ "//pkg/kv/kvserver/allocator/load:load_test", "//pkg/kv/kvserver/allocator/mmaprototype:mmaprototype", "//pkg/kv/kvserver/allocator/mmaprototype:mmaprototype_test", - "//pkg/kv/kvserver/allocator/mmaprototypehelpers:mmaprototypehelpers", "//pkg/kv/kvserver/allocator/plan:plan", "//pkg/kv/kvserver/allocator/plan:plan_test", "//pkg/kv/kvserver/allocator/storepool:storepool", @@ -1557,6 +1557,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_test", "//pkg/kv/kvserver/loqrecovery:loqrecovery", "//pkg/kv/kvserver/loqrecovery:loqrecovery_test", + "//pkg/kv/kvserver/mmaintegration:mmaintegration", + "//pkg/kv/kvserver/mmaintegration:mmaintegration_test", "//pkg/kv/kvserver/multiqueue:multiqueue", "//pkg/kv/kvserver/multiqueue:multiqueue_test", "//pkg/kv/kvserver/print:print", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 677e3ee5612d..f5fcad005793 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -168,6 +168,7 @@ go_library( "//pkg/kv/kvserver/load", "//pkg/kv/kvserver/lockspanset", "//pkg/kv/kvserver/logstore", + "//pkg/kv/kvserver/mmaintegration", "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/print", "//pkg/kv/kvserver/raftentry", diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go index cef6652b3442..f67af269f822 100644 --- 
a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go @@ -57,9 +57,9 @@ func (rt ReplicaIDAndType) SafeFormat(w redact.SafePrinter, _ rune) { default: w.Print(rt.ReplicaID) } - w.Printf(" type=%v", rt.ReplicaType.ReplicaType) + w.Printf(",type=%v", rt.ReplicaType.ReplicaType) if rt.IsLeaseholder { - w.Print(" leaseholder=true") + w.Print(",leaseholder=true") } } @@ -116,6 +116,14 @@ type ReplicaState struct { // beneficial. } +func (rs ReplicaState) String() string { + return redact.StringWithoutMarkers(rs) +} + +func (rs ReplicaState) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("%s,lagging=%v", rs.ReplicaIDAndType, rs.VoterIsLagging) +} + // ChangeID is a unique ID, in the context of this data-structure and when // receiving updates about enactment having happened or having been rejected // (by the component responsible for change enactment). @@ -133,13 +141,13 @@ const ( func (s ReplicaChangeType) String() string { switch s { case AddLease: - return "AddLease" + return "add-lease" case RemoveLease: - return "RemoveLease" + return "remove-lease" case AddReplica: - return "AddReplica" + return "add-replica" case RemoveReplica: - return "RemoveReplica" + return "remove-replica" default: panic("unknown ReplicaChangeType") } @@ -183,7 +191,7 @@ func (rc ReplicaChange) String() string { // SafeFormat implements the redact.SafeFormatter interface. func (rc ReplicaChange) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("r%v type: %v target store %v (%v)->(%v)", rc.rangeID, rc.replicaChangeType, rc.target, rc.prev, rc.next) + w.Printf("r%v:%v,target=(%v), prev=(%v)->next=(%v)", rc.rangeID, rc.replicaChangeType, rc.target, rc.prev, rc.next) } // isRemoval returns true if the change is a removal of a replica. 
@@ -375,6 +383,22 @@ type PendingRangeChange struct { pendingReplicaChanges []*pendingReplicaChange } +func MakePendingRangeChangeForTesting( + rangeID roachpb.RangeID, changes []ReplicaChange, changeIDs []ChangeID, +) PendingRangeChange { + prc := PendingRangeChange{ + RangeID: rangeID, + pendingReplicaChanges: make([]*pendingReplicaChange, len(changes)), + } + for i, c := range changes { + prc.pendingReplicaChanges[i] = &pendingReplicaChange{ + ChangeID: changeIDs[i], + ReplicaChange: c, + } + } + return prc +} + func (prc PendingRangeChange) String() string { return redact.StringWithoutMarkers(prc) } diff --git a/pkg/kv/kvserver/allocator/mmaprototype/load.go b/pkg/kv/kvserver/allocator/mmaprototype/load.go index 9654723ce1bd..4cfe8930d320 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/load.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/load.go @@ -141,6 +141,15 @@ type RangeLoad struct { RaftCPU LoadValue } +func (rl RangeLoad) String() string { + return redact.StringWithoutMarkers(rl) +} + +func (rl RangeLoad) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("(cpu:%d,raft-cpu:%d,write-band:%d,byte-size:%d)", + rl.Load[CPURate], rl.RaftCPU, rl.Load[WriteBandwidth], rl.Load[ByteSize]) +} + // storeLoad is the load information for a store. Roughly, this is the // information we need each store to provide us periodically, i.e., // StoreLoadMsg is the input used to compute this. 
diff --git a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/rebalance_replica b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/rebalance_replica index b691506e6ed0..707cde498e1a 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/rebalance_replica +++ b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/rebalance_replica @@ -25,7 +25,7 @@ store-id=1 ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true + store-id=1 replica-id=1,type=VOTER_FULL,leaseholder=true get-load-info @@ -39,16 +39,16 @@ make-pending-changes range-id=1 ---- pending(2) change-id=1 store-id=2 node-id=2 range-id=1 load-delta=[cpu:88, write-bandwidth:88, byte-size:88] start=0s - prev=(replica-id=none type=VOTER_FULL) - next=(replica-id=unknown type=VOTER_FULL leaseholder=true) + prev=(replica-id=none,type=VOTER_FULL,lagging=false) + next=(replica-id=unknown,type=VOTER_FULL,leaseholder=true) change-id=2 store-id=1 node-id=1 range-id=1 load-delta=[cpu:-80, write-bandwidth:-80, byte-size:-80] start=0s - prev=(replica-id=1 type=VOTER_FULL leaseholder=true) - next=(replica-id=none type=VOTER_FULL) + prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false) + next=(replica-id=none,type=VOTER_FULL) ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=2 replica-id=unknown type=VOTER_FULL leaseholder=true + store-id=2 replica-id=unknown,type=VOTER_FULL,leaseholder=true get-load-info ---- @@ -75,11 +75,11 @@ get-pending-changes ---- pending(2) change-id=1 store-id=2 node-id=2 range-id=1 load-delta=[cpu:88, write-bandwidth:88, byte-size:88] start=0s enacted=0s - prev=(replica-id=none type=VOTER_FULL) - next=(replica-id=unknown type=VOTER_FULL leaseholder=true) + prev=(replica-id=none,type=VOTER_FULL,lagging=false) + next=(replica-id=unknown,type=VOTER_FULL,leaseholder=true) change-id=2 store-id=1 node-id=1 
range-id=1 load-delta=[cpu:-80, write-bandwidth:-80, byte-size:-80] start=0s enacted=0s - prev=(replica-id=1 type=VOTER_FULL leaseholder=true) - next=(replica-id=none type=VOTER_FULL) + prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false) + next=(replica-id=none,type=VOTER_FULL) store-load-msg store-id=2 node-id=2 load=[80,80,80] capacity=[100,100,100] secondary-load=1 load-time=15s @@ -89,8 +89,8 @@ get-pending-changes ---- pending(1) change-id=2 store-id=1 node-id=1 range-id=1 load-delta=[cpu:-80, write-bandwidth:-80, byte-size:-80] start=0s enacted=0s - prev=(replica-id=1 type=VOTER_FULL leaseholder=true) - next=(replica-id=none type=VOTER_FULL) + prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false) + next=(replica-id=none,type=VOTER_FULL) get-load-info ---- diff --git a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/remove_replica b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/remove_replica index c677b308c7ad..7b499f2a014b 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/remove_replica +++ b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/remove_replica @@ -33,23 +33,23 @@ store-id=2 node-id=2 reported=[cpu:20, write-bandwidth:80, byte-size:80] adjuste ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true - store-id=2 replica-id=2 type=VOTER_FULL + store-id=1 replica-id=1,type=VOTER_FULL,leaseholder=true + store-id=2 replica-id=2,type=VOTER_FULL make-pending-changes range-id=1 remove-replica: remove-store-id=2 ---- pending(1) change-id=1 store-id=2 node-id=2 range-id=1 load-delta=[cpu:-20, write-bandwidth:-80, byte-size:-80] start=0s - prev=(replica-id=2 type=VOTER_FULL) - next=(replica-id=none type=VOTER_FULL) + prev=(replica-id=2,type=VOTER_FULL,lagging=false) + next=(replica-id=none,type=VOTER_FULL) # We should see the change applied to the range state, with only the 
replica on # store 1 remaining. ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true + store-id=1 replica-id=1,type=VOTER_FULL,leaseholder=true # The load info for s2 should also reflect the load delta [-20,-80,-80] being # applied. @@ -76,8 +76,8 @@ get-pending-changes ---- pending(1) change-id=1 store-id=2 node-id=2 range-id=1 load-delta=[cpu:-20, write-bandwidth:-80, byte-size:-80] start=0s enacted=0s - prev=(replica-id=2 type=VOTER_FULL) - next=(replica-id=none type=VOTER_FULL) + prev=(replica-id=2,type=VOTER_FULL,lagging=false) + next=(replica-id=none,type=VOTER_FULL) store-load-msg store-id=2 node-id=2 load=[0,0,0] capacity=[100,100,100] secondary-load=0 load-time=15s diff --git a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/removed_ranges b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/removed_ranges index 2cd0f0a57bcf..0527fb4e1f2f 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/removed_ranges +++ b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/removed_ranges @@ -19,18 +19,18 @@ store-id=1 ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true + store-id=1 replica-id=1,type=VOTER_FULL,leaseholder=true make-pending-changes range-id=1 rebalance-replica: remove-store-id=1 add-store-id=2 ---- pending(2) change-id=1 store-id=2 node-id=2 range-id=1 load-delta=[cpu:88, write-bandwidth:88, byte-size:88] start=0s - prev=(replica-id=none type=VOTER_FULL) - next=(replica-id=unknown type=VOTER_FULL leaseholder=true) + prev=(replica-id=none,type=VOTER_FULL,lagging=false) + next=(replica-id=unknown,type=VOTER_FULL,leaseholder=true) change-id=2 store-id=1 node-id=1 range-id=1 load-delta=[cpu:-80, write-bandwidth:-80, byte-size:-80] start=0s - prev=(replica-id=1 type=VOTER_FULL leaseholder=true) - next=(replica-id=none 
type=VOTER_FULL) + prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false) + next=(replica-id=none,type=VOTER_FULL) # Advance time to 1s so that we can see pending changes for removed ranges are # properly enacted. @@ -47,11 +47,11 @@ get-pending-changes ---- pending(2) change-id=1 store-id=2 node-id=2 range-id=1 load-delta=[cpu:88, write-bandwidth:88, byte-size:88] start=0s enacted=1s - prev=(replica-id=none type=VOTER_FULL) - next=(replica-id=unknown type=VOTER_FULL leaseholder=true) + prev=(replica-id=none,type=VOTER_FULL,lagging=false) + next=(replica-id=unknown,type=VOTER_FULL,leaseholder=true) change-id=2 store-id=1 node-id=1 range-id=1 load-delta=[cpu:-80, write-bandwidth:-80, byte-size:-80] start=0s enacted=1s - prev=(replica-id=1 type=VOTER_FULL leaseholder=true) - next=(replica-id=none type=VOTER_FULL) + prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false) + next=(replica-id=none,type=VOTER_FULL) ranges ---- diff --git a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/transfer_lease b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/transfer_lease index bc2167e8363f..39f05e09c4b1 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/transfer_lease +++ b/pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/transfer_lease @@ -37,19 +37,19 @@ store-id=2 node-id=2 reported=[cpu:20, write-bandwidth:80, byte-size:80] adjuste ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true - store-id=2 replica-id=2 type=VOTER_FULL + store-id=1 replica-id=1,type=VOTER_FULL,leaseholder=true + store-id=2 replica-id=2,type=VOTER_FULL make-pending-changes range-id=1 transfer-lease: remove-store-id=1 add-store-id=2 ---- pending(2) change-id=1 store-id=1 node-id=1 range-id=1 load-delta=[cpu:-60, write-bandwidth:0, byte-size:0] start=0s - prev=(replica-id=1 type=VOTER_FULL leaseholder=true) - next=(replica-id=1 
type=VOTER_FULL) + prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false) + next=(replica-id=1,type=VOTER_FULL) change-id=2 store-id=2 node-id=2 range-id=1 load-delta=[cpu:66, write-bandwidth:0, byte-size:0] start=0s - prev=(replica-id=2 type=VOTER_FULL) - next=(replica-id=2 type=VOTER_FULL leaseholder=true) + prev=(replica-id=2,type=VOTER_FULL,lagging=false) + next=(replica-id=2,type=VOTER_FULL,leaseholder=true) get-load-info ---- @@ -61,8 +61,8 @@ store-id=2 node-id=2 reported=[cpu:20, write-bandwidth:80, byte-size:80] adjuste ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=1 replica-id=1 type=VOTER_FULL - store-id=2 replica-id=2 type=VOTER_FULL leaseholder=true + store-id=1 replica-id=1,type=VOTER_FULL + store-id=2 replica-id=2,type=VOTER_FULL,leaseholder=true reject-pending-changes change-ids=(1,2) ---- @@ -78,8 +78,8 @@ store-id=2 node-id=2 reported=[cpu:20, write-bandwidth:80, byte-size:80] adjuste ranges ---- range-id=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cpu=20 - store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true - store-id=2 replica-id=2 type=VOTER_FULL + store-id=1 replica-id=1,type=VOTER_FULL,leaseholder=true + store-id=2 replica-id=2,type=VOTER_FULL # Set both existing stores again, this should have no effect. 
set-store diff --git a/pkg/kv/kvserver/allocator/mmaprototypehelpers/BUILD.bazel b/pkg/kv/kvserver/allocator/mmaprototypehelpers/BUILD.bazel deleted file mode 100644 index 667c2a76c116..000000000000 --- a/pkg/kv/kvserver/allocator/mmaprototypehelpers/BUILD.bazel +++ /dev/null @@ -1,23 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") - -go_library( - name = "mmaprototypehelpers", - srcs = [ - "allocator_mma_integration.go", - "kvserver_mma_integration.go", - ], - importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers", - visibility = ["//visibility:public"], - deps = [ - "//pkg/kv/kvpb", - "//pkg/kv/kvserver/allocator", - "//pkg/kv/kvserver/allocator/mmaprototype", - "//pkg/kv/kvserver/allocator/storepool", - "//pkg/kv/kvserver/kvserverbase", - "//pkg/roachpb", - "//pkg/settings/cluster", - "//pkg/util/log", - "//pkg/util/syncutil", - "//pkg/util/timeutil", - ], -) diff --git a/pkg/kv/kvserver/allocator/mmaprototypehelpers/kvserver_mma_integration.go b/pkg/kv/kvserver/allocator/mmaprototypehelpers/kvserver_mma_integration.go deleted file mode 100644 index b4294e5522d8..000000000000 --- a/pkg/kv/kvserver/allocator/mmaprototypehelpers/kvserver_mma_integration.go +++ /dev/null @@ -1,382 +0,0 @@ -// Copyright 2025 The Cockroach Authors. -// -// Use of this software is governed by the CockroachDB Software License -// included in the /LICENSE file. 
- -package mmaprototypehelpers - -import ( - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" -) - -type SyncChangeID uint64 - -const InvalidSyncChangeID SyncChangeID = 0 - -func (id SyncChangeID) IsValid() bool { - return id != 0 -} - -type Author uint64 - -const ( - LeaseQueue Author = iota - ReplicateQueue - MMA -) - -func (s Author) External() bool { - switch s { - case LeaseQueue: - return true - case ReplicateQueue: - return true - case MMA: - return false - default: - panic("unknown Author") - } -} - -// The expected usage for the non-mma components is: -// changeIDs := allocatorSync.NonMMA(PreTransferLease|PreChangeReplicas)() -// // Actually apply the change, or when simulating, queue up the change to be -// // applied. -// allocatorSync.PostApply(changeIDs, success) - -// AllocatorSync updates the storepool.Storepool and the mmaprototype.Allocator to be in -// sync with eachother, notifying each other of changes generated by the other. -// -// AllocatorSync doesn't apply the changes itself because simulation (asim) -// applies changes without blocking the goroutine, while in production code we -// apply the changes synchronously on the same goroutine. 
-type AllocatorSync struct { - sp *storepool.StorePool - mmAllocator mmaprototype.Allocator - st *cluster.Settings - mu struct { - syncutil.Mutex - changeSeqGen SyncChangeID - trackedChanges map[SyncChangeID]trackedAllocatorChange - } -} - -func NewAllocatorSync( - sp *storepool.StorePool, mmAllocator mmaprototype.Allocator, st *cluster.Settings, -) *AllocatorSync { - as := &AllocatorSync{ - sp: sp, - mmAllocator: mmAllocator, - st: st, - } - as.mu.trackedChanges = make(map[SyncChangeID]trackedAllocatorChange) - return as -} - -// AllocatorChangeType is used to identify the type of change that is being -// made. -type AllocatorChangeType int - -// TODO(kvoli): These overlap with the operations in the plan pkg. We could do -// with just one set. -const ( - // AllocatorChangeTypeLeaseTransfer corresponds to TransferLease. - AllocatorChangeTypeLeaseTransfer AllocatorChangeType = iota - // AllocatorChangeTypeChangeReplicas corresponds to ChangeReplicas. - AllocatorChangeTypeChangeReplicas - // AllocatorChangeTypeRelocateRange corresponds to RelocateRange. 
- AllocatorChangeTypeRelocateRange -) - -type trackedAllocatorChange struct { - typ AllocatorChangeType - author Author - usage allocator.RangeUsageInfo - changeIDs []mmaprototype.ChangeID - - chgs kvpb.ReplicationChanges - transferFrom, transferTo roachpb.StoreID -} - -func (as *AllocatorSync) NonMMAPreTransferLease( - ctx context.Context, - desc *roachpb.RangeDescriptor, - usage allocator.RangeUsageInfo, - transferFrom, transferTo roachpb.ReplicationTarget, - author Author, -) SyncChangeID { - existingReplicas := make([]mmaprototype.StoreIDAndReplicaState, len(desc.InternalReplicas)) - for i, replica := range desc.Replicas().Descriptors() { - existingReplicas[i] = ReplicaDescriptorToReplicaIDAndType(replica, transferFrom.StoreID) - } - replicaChanges := mmaprototype.MakeLeaseTransferChanges(desc.RangeID, - existingReplicas, - UsageInfoToMMALoad(usage), - transferTo, - transferFrom, - ) - log.Infof(ctx, "registering external lease transfer change: usage=%v changes=%v", - usage, replicaChanges) - var changeIDs []mmaprototype.ChangeID - if kvserverbase.LoadBasedRebalancingMode.Get(&as.st.SV) == kvserverbase.LBRebalancingMultiMetric { - changeIDs = as.mmAllocator.RegisterExternalChanges(replicaChanges[:]) - if changeIDs == nil { - log.Info(ctx, "mma did not track lease transfer, skipping") - } - } - trackedChange := trackedAllocatorChange{ - typ: AllocatorChangeTypeLeaseTransfer, - usage: usage, - changeIDs: changeIDs, - transferFrom: transferFrom.StoreID, - transferTo: transferTo.StoreID, - author: author, - } - // We only track one of the changeIDs, since they are the same for both - // lease transfer. 
- as.mu.Lock() - defer as.mu.Unlock() - syncChangeID := as.newSyncChangeIDLocked() - as.mu.trackedChanges[syncChangeID] = trackedChange - return syncChangeID -} - -func (as *AllocatorSync) NonMMAPreChangeReplicas( - ctx context.Context, - desc *roachpb.RangeDescriptor, - usage allocator.RangeUsageInfo, - changes kvpb.ReplicationChanges, - leaseholder roachpb.StoreID, -) SyncChangeID { - rLoad := UsageInfoToMMALoad(usage) - replicaChanges := make([]mmaprototype.ReplicaChange, 0, len(changes)) - replicaSet := desc.Replicas() - - var lhBeingRemoved bool - for _, chg := range changes { - if chg.ChangeType == roachpb.REMOVE_VOTER || chg.ChangeType == roachpb.REMOVE_NON_VOTER { - filteredSet := replicaSet.Filter(func(r roachpb.ReplicaDescriptor) bool { - return r.StoreID == chg.Target.StoreID - }) - replDescriptors := filteredSet.Descriptors() - if len(replDescriptors) != 1 { - panic(fmt.Sprintf( - "no replica found for removal target=%v post-filter=%v pre-filter=%v", - chg.Target.StoreID, replDescriptors, desc)) - } - replDesc := replDescriptors[0] - lhBeingRemoved = replDesc.StoreID == leaseholder - replicaChanges = append(replicaChanges, mmaprototype.MakeRemoveReplicaChange( - desc.RangeID, rLoad, mmaprototype.ReplicaState{ - ReplicaIDAndType: mmaprototype.ReplicaIDAndType{ - ReplicaID: replDesc.ReplicaID, - ReplicaType: mmaprototype.ReplicaType{ - ReplicaType: replDesc.Type, - IsLeaseholder: lhBeingRemoved, - }, - }, - }, - chg.Target)) - } - } - - for _, chg := range changes { - if chg.ChangeType == roachpb.ADD_VOTER || - chg.ChangeType == roachpb.ADD_NON_VOTER { - rType := roachpb.VOTER_FULL - if chg.ChangeType == roachpb.ADD_NON_VOTER { - rType = roachpb.NON_VOTER - } - replicaChanges = append(replicaChanges, mmaprototype.MakeAddReplicaChange( - desc.RangeID, rLoad, mmaprototype.ReplicaState{ - ReplicaIDAndType: mmaprototype.ReplicaIDAndType{ - ReplicaType: mmaprototype.ReplicaType{ - ReplicaType: rType, - // TODO(sumeer): can there be multiple ADD_VOTERs? 
- IsLeaseholder: lhBeingRemoved && chg.ChangeType == roachpb.ADD_VOTER, - }, - }, - }, chg.Target)) - } else if chg.ChangeType == roachpb.REMOVE_VOTER || - chg.ChangeType == roachpb.REMOVE_NON_VOTER { - // Handled above. - continue - } else { - panic("unimplemented change type") - } - } - - log.Infof(ctx, "registering external replica change: chgs=%v usage=%v changes=%v", - changes, usage, replicaChanges) - var changeIDs []mmaprototype.ChangeID - if kvserverbase.LoadBasedRebalancingMode.Get(&as.st.SV) == kvserverbase.LBRebalancingMultiMetric { - changeIDs = as.mmAllocator.RegisterExternalChanges(replicaChanges) - if changeIDs == nil { - log.Info(ctx, "cluster does not have a range for the external replica change, skipping") - } - } - trackedChange := trackedAllocatorChange{ - typ: AllocatorChangeTypeChangeReplicas, - usage: usage, - changeIDs: changeIDs, - chgs: changes, - author: ReplicateQueue, - } - log.Infof(ctx, "registered external replica change: chgs=%v change_ids=%v", - changes, changeIDs) - - as.mu.Lock() - defer as.mu.Unlock() - syncChangeID := as.newSyncChangeIDLocked() - as.mu.trackedChanges[syncChangeID] = trackedChange - return syncChangeID -} - -func (as *AllocatorSync) newSyncChangeIDLocked() SyncChangeID { - as.mu.changeSeqGen += 1 - return as.mu.changeSeqGen -} - -func (as *AllocatorSync) NonMMAPreRelocateRange( - desc *roachpb.RangeDescriptor, - usage allocator.RangeUsageInfo, - voterTargets, nonVoterTargets []roachpb.ReplicationTarget, -) []mmaprototype.ChangeID { - // TODO(sumeer): implement this, since it is needed for the old store - // rebalancer to work. - panic("unimplemented") -} - -// MMAPreApply is called before mma generated changes are applied to the -// cluster. 
-func (as *AllocatorSync) MMAPreApply( - ctx context.Context, - usage allocator.RangeUsageInfo, - pendingChange mmaprototype.PendingRangeChange, -) SyncChangeID { - var trackedChange trackedAllocatorChange - trackedChange.author = MMA - trackedChange.usage = usage - trackedChange.changeIDs = pendingChange.ChangeIDs() - if pendingChange.IsTransferLease() { - trackedChange.typ = AllocatorChangeTypeLeaseTransfer - trackedChange.transferTo = pendingChange.LeaseTransferTarget() - trackedChange.transferFrom = pendingChange.LeaseTransferFrom() - as.mmAllocator.Metrics().MMARegisterLeaseSuccess.Inc(1) - } else if pendingChange.IsChangeReplicas() { - trackedChange.typ = AllocatorChangeTypeChangeReplicas - trackedChange.chgs = pendingChange.ReplicationChanges() - as.mmAllocator.Metrics().MMARegisterRebalanceSuccess.Inc(1) - } else { - panic("unexpected change type") - } - as.mu.Lock() - defer as.mu.Unlock() - syncChangeID := as.newSyncChangeIDLocked() - as.mu.trackedChanges[syncChangeID] = trackedChange - return syncChangeID -} - -func (as *AllocatorSync) updateMetrics( - success bool, actionType AllocatorChangeType, executorOfChange Author, -) { - switch executorOfChange { - case LeaseQueue: - if success { - as.mmAllocator.Metrics().ExternalLeaseTransferSuccess.Inc(1) - } else { - as.mmAllocator.Metrics().ExternalLeaseTransferFailure.Inc(1) - } - case ReplicateQueue: - if success { - as.mmAllocator.Metrics().ExternalReplicaRebalanceSuccess.Inc(1) - } else { - as.mmAllocator.Metrics().ExternalReplicaRebalanceFailure.Inc(1) - } - case MMA: - if success { - switch actionType { - case AllocatorChangeTypeChangeReplicas: - as.mmAllocator.Metrics().MMAReplicaRebalanceSuccess.Inc(1) - case AllocatorChangeTypeLeaseTransfer: - as.mmAllocator.Metrics().MMALeaseTransferSuccess.Inc(1) - case AllocatorChangeTypeRelocateRange: - panic("unimplemented") - } - } else { - switch actionType { - case AllocatorChangeTypeChangeReplicas: - as.mmAllocator.Metrics().MMAReplicaRebalanceFailure.Inc(1) 
- case AllocatorChangeTypeLeaseTransfer: - as.mmAllocator.Metrics().MMALeaseTransferFailure.Inc(1) - case AllocatorChangeTypeRelocateRange: - panic("unimplemented") - } - } - default: - panic("unknown author for PostApply") - } -} - -// PostApply is called after changes have been applied to the cluster, by both -// the old allocator components (lease queue, replicate queue and store -// rebalancer), as well as the new mmaprototype.Allocator. -func (as *AllocatorSync) PostApply(ctx context.Context, syncChangeID SyncChangeID, success bool) { - if as == nil { - return - } - var tracked trackedAllocatorChange - func() { - as.mu.Lock() - defer as.mu.Unlock() - var ok bool - tracked, ok = as.mu.trackedChanges[syncChangeID] - if !ok { - panic("PostApply called with unknown SyncChangeID") - } - delete(as.mu.trackedChanges, syncChangeID) - }() - if kvserverbase.LoadBasedRebalancingMode.Get(&as.st.SV) == kvserverbase.LBRebalancingMultiMetric { - if changeIDs := tracked.changeIDs; changeIDs != nil { - log.Infof(ctx, "PostApply: tracked=%v change_ids=%v success: %v", tracked, changeIDs, success) - as.updateMetrics(success, tracked.typ, tracked.author) - as.mmAllocator.AdjustPendingChangesDisposition(changeIDs, success) - } else { - log.Infof(ctx, "PostApply: tracked=%v no change_ids success: %v", tracked, success) - } - } - as.updateMetrics(success, tracked.typ, tracked.author) - if !success { - return - } - switch tracked.typ { - case AllocatorChangeTypeLeaseTransfer: - as.sp.UpdateLocalStoresAfterLeaseTransfer(tracked.transferFrom, - tracked.transferTo, tracked.usage) - case AllocatorChangeTypeChangeReplicas: - for _, chg := range tracked.chgs { - as.sp.UpdateLocalStoreAfterRebalance( - chg.Target.StoreID, tracked.usage, chg.ChangeType) - } - case AllocatorChangeTypeRelocateRange: - // TODO(kvoli): We don't need to implement this until later, as only one - // store rebalancer will run at a time and only the old store rebalancer - // issues relocate range commands. 
- // - // TODO(sumeer): We should implement it, and make all changes flow through - // AllocatorSync, even when the mma.Allocator is not used, since that - // simplifies the code. - panic("unimplemented") - } -} diff --git a/pkg/kv/kvserver/asim/mmaintegration/BUILD.bazel b/pkg/kv/kvserver/asim/mmaintegration/BUILD.bazel index 3f5345efaa3f..5a15f42f897b 100644 --- a/pkg/kv/kvserver/asim/mmaintegration/BUILD.bazel +++ b/pkg/kv/kvserver/asim/mmaintegration/BUILD.bazel @@ -11,11 +11,11 @@ go_library( deps = [ "//pkg/kv/kvserver/allocator", "//pkg/kv/kvserver/allocator/mmaprototype", - "//pkg/kv/kvserver/allocator/mmaprototypehelpers", "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/op", "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/kvserverbase", + "//pkg/kv/kvserver/mmaintegration", "//pkg/roachpb", "//pkg/util/log", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go b/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go index b1f668fb453f..f58f2ceb487e 100644 --- a/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go +++ b/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go @@ -12,11 +12,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/logtags" @@ -29,7 +29,7 @@ type MMAStoreRebalancer struct { localStoreID state.StoreID controller op.Controller allocator 
mmaprototype.Allocator - as *mmaprototypehelpers.AllocatorSync + as *mmaintegration.AllocatorSync settings *config.SimulationSettings // lastRebalanceTime is the last time allocator.ComputeChanges was called. @@ -58,7 +58,7 @@ type MMAStoreRebalancer struct { type pendingChangeAndRangeUsageInfo struct { change mmaprototype.PendingRangeChange usage allocator.RangeUsageInfo - syncChangeID mmaprototypehelpers.SyncChangeID + syncChangeID mmaintegration.SyncChangeID } // NewMMAStoreRebalancer creates a new MMAStoreRebalancer. @@ -66,7 +66,7 @@ func NewMMAStoreRebalancer( localStoreID state.StoreID, localNodeID state.NodeID, allocator mmaprototype.Allocator, - as *mmaprototypehelpers.AllocatorSync, + as *mmaintegration.AllocatorSync, controller op.Controller, settings *config.SimulationSettings, ) *MMAStoreRebalancer { @@ -137,7 +137,7 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state } else { log.VInfof(ctx, 1, "operation for pendingChange=%v completed successfully", curChange) } - msr.as.PostApply(ctx, curChange.syncChangeID, success) + msr.as.PostApply(curChange.syncChangeID, success) msr.pendingChangeIdx++ } else { log.VInfof(ctx, 1, "operation for pendingChange=%v is still in progress", curChange) @@ -204,7 +204,7 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state } log.VInfof(ctx, 1, "dispatching operation for pendingChange=%v", curChange) msr.pendingChanges[msr.pendingChangeIdx].syncChangeID = - msr.as.MMAPreApply(ctx, curChange.usage, curChange.change) + msr.as.MMAPreApply(curChange.usage, curChange.change) msr.pendingTicket = msr.controller.Dispatch(ctx, tick, s, curOp) } } diff --git a/pkg/kv/kvserver/asim/queue/BUILD.bazel b/pkg/kv/kvserver/asim/queue/BUILD.bazel index 106ff69da87b..b94bf2b516c3 100644 --- a/pkg/kv/kvserver/asim/queue/BUILD.bazel +++ b/pkg/kv/kvserver/asim/queue/BUILD.bazel @@ -16,7 +16,6 @@ go_library( "//pkg/kv/kvpb", "//pkg/kv/kvserver/allocator", 
"//pkg/kv/kvserver/allocator/allocatorimpl", - "//pkg/kv/kvserver/allocator/mmaprototypehelpers", "//pkg/kv/kvserver/allocator/plan", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/asim/config", @@ -24,6 +23,7 @@ go_library( "//pkg/kv/kvserver/constraint", "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/mmaintegration", "//pkg/raft", "//pkg/roachpb", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/asim/queue/lease_queue.go b/pkg/kv/kvserver/asim/queue/lease_queue.go index c8995d17f1fd..57c10024b4b7 100644 --- a/pkg/kv/kvserver/asim/queue/lease_queue.go +++ b/pkg/kv/kvserver/asim/queue/lease_queue.go @@ -12,11 +12,11 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -29,8 +29,8 @@ type leaseQueue struct { planner plan.ReplicationPlanner clock *hlc.Clock settings *config.SimulationSettings - as *mmaprototypehelpers.AllocatorSync - lastSyncChangeID mmaprototypehelpers.SyncChangeID + as *mmaintegration.AllocatorSync + lastSyncChangeID mmaintegration.SyncChangeID } // NewLeaseQueue returns a new lease queue. 
@@ -40,7 +40,7 @@ func NewLeaseQueue( stateChanger state.Changer, settings *config.SimulationSettings, allocator allocatorimpl.Allocator, - allocatorSync *mmaprototypehelpers.AllocatorSync, + allocatorSync *mmaintegration.AllocatorSync, storePool storepool.AllocatorStorePool, start time.Time, ) RangeQueue { @@ -121,8 +121,8 @@ func (lq *leaseQueue) Tick(ctx context.Context, tick time.Time, s state.State) { } if !tick.Before(lq.next) && lq.lastSyncChangeID.IsValid() { - lq.as.PostApply(ctx, lq.lastSyncChangeID, true /* success */) - lq.lastSyncChangeID = mmaprototypehelpers.InvalidSyncChangeID + lq.as.PostApply(lq.lastSyncChangeID, true /* success */) + lq.lastSyncChangeID = mmaintegration.InvalidSyncChangeID } for !tick.Before(lq.next) && lq.priorityQueue.Len() != 0 { diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue.go b/pkg/kv/kvserver/asim/queue/replicate_queue.go index 024498f0ee8a..c654bfa3d8b8 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue.go @@ -12,11 +12,11 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -27,8 +27,8 @@ type replicateQueue struct { planner plan.ReplicationPlanner clock *hlc.Clock settings *config.SimulationSettings - as *mmaprototypehelpers.AllocatorSync - lastSyncChangeID mmaprototypehelpers.SyncChangeID + as *mmaintegration.AllocatorSync + lastSyncChangeID 
mmaintegration.SyncChangeID } // NewReplicateQueue returns a new replicate queue. @@ -38,7 +38,7 @@ func NewReplicateQueue( stateChanger state.Changer, settings *config.SimulationSettings, allocator allocatorimpl.Allocator, - allocatorSync *mmaprototypehelpers.AllocatorSync, + allocatorSync *mmaintegration.AllocatorSync, storePool storepool.AllocatorStorePool, start time.Time, ) RangeQueue { @@ -120,8 +120,8 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.Stat } if !tick.Before(rq.next) && rq.lastSyncChangeID.IsValid() { - rq.as.PostApply(ctx, rq.lastSyncChangeID, true /* success */) - rq.lastSyncChangeID = mmaprototypehelpers.InvalidSyncChangeID + rq.as.PostApply(rq.lastSyncChangeID, true /* success */) + rq.lastSyncChangeID = mmaintegration.InvalidSyncChangeID } for !tick.Before(rq.next) && rq.priorityQueue.Len() != 0 { @@ -172,28 +172,26 @@ func pushReplicateChange( tick time.Time, delayFn func(int64, bool) time.Duration, stateChanger state.Changer, - as *mmaprototypehelpers.AllocatorSync, + as *mmaintegration.AllocatorSync, queueName string, -) (time.Time, mmaprototypehelpers.SyncChangeID) { +) (time.Time, mmaintegration.SyncChangeID) { var stateChange state.Change - var changeID mmaprototypehelpers.SyncChangeID + var changeID mmaintegration.SyncChangeID next := tick switch op := change.Op.(type) { case plan.AllocationNoop: // Nothing to do. - return next, mmaprototypehelpers.InvalidSyncChangeID + return next, mmaintegration.InvalidSyncChangeID case plan.AllocationFinalizeAtomicReplicationOp: panic("unimplemented finalize atomic replication op") case plan.AllocationTransferLeaseOp: if as != nil { // as may be nil in some tests. changeID = as.NonMMAPreTransferLease( - ctx, repl.Desc(), repl.RangeUsageInfo(), op.Source, op.Target, - mmaprototypehelpers.ReplicateQueue, ) } stateChange = &state.LeaseTransferChange{ @@ -206,7 +204,6 @@ func pushReplicateChange( if as != nil { // as may be nil in some tests. 
changeID = as.NonMMAPreChangeReplicas( - ctx, repl.Desc(), repl.RangeUsageInfo(), op.Chgs, @@ -229,8 +226,8 @@ func pushReplicateChange( next = completeAt } else { log.VEventf(ctx, 1, "pushing state change failed") - as.PostApply(ctx, changeID, false /* success */) - changeID = mmaprototypehelpers.InvalidSyncChangeID + as.PostApply(changeID, false /* success */) + changeID = mmaintegration.InvalidSyncChangeID } return next, changeID } diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index c212999df69c..33306b1373fd 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -26,13 +26,13 @@ go_library( "//pkg/kv/kvserver/allocator", "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/allocator/mmaprototype", - "//pkg/kv/kvserver/allocator/mmaprototypehelpers", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/workload", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/load", + "//pkg/kv/kvserver/mmaintegration", "//pkg/kv/kvserver/split", "//pkg/raft", "//pkg/raft/raftpb", diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index 9735364cae39..6d0ab61bdf87 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -23,12 +23,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/raft/tracker" @@ -433,7 +433,7 @@ func (s *state) AddNode() Node { stores: []StoreID{}, mmAllocator: mmAllocator, storepool: sp, - as: mmaprototypehelpers.NewAllocatorSync(sp, mmAllocator, s.settings.ST), + as: mmaintegration.NewAllocatorSync(sp, mmAllocator, s.settings.ST), } s.nodes[nodeID] = node s.SetNodeLiveness(nodeID, livenesspb.NodeLivenessStatus_LIVE) @@ -1147,7 +1147,7 @@ func (s *state) UpdateStorePool( copiedDesc := *copiedDetail.Desc // TODO(mma): Support origin timestamps. ts := s.clock.Now() - storeLoadMsg := mmaprototypehelpers.MakeStoreLoadMsg(copiedDesc, ts.UnixNano()) + storeLoadMsg := mmaintegration.MakeStoreLoadMsg(copiedDesc, ts.UnixNano()) node.mmAllocator.SetStore(StoreAttrAndLocFromDesc(copiedDesc)) ctx := logtags.AddTag(context.Background(), fmt.Sprintf("n%d", nodeID), "") ctx = logtags.AddTag(ctx, "t", ts.Sub(s.settings.StartTime)) @@ -1434,7 +1434,7 @@ type node struct { stores []StoreID storepool *storepool.StorePool mmAllocator mmaprototype.Allocator - as *mmaprototypehelpers.AllocatorSync + as *mmaintegration.AllocatorSync } // NodeID returns the ID of this node. 
@@ -1456,7 +1456,7 @@ func (n *node) MMAllocator() mmaprototype.Allocator { return n.mmAllocator } -func (n *node) AllocatorSync() *mmaprototypehelpers.AllocatorSync { +func (n *node) AllocatorSync() *mmaintegration.AllocatorSync { return n.as } diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index 984a1d42c898..ed3e805a754a 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -13,10 +13,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -222,7 +222,7 @@ type Node interface { // TODO(wenyihu6): use this in mma store rebalancer MMAllocator() mmaprototype.Allocator // AllocatorSync returns the AllocatorSync for this node. - AllocatorSync() *mmaprototypehelpers.AllocatorSync + AllocatorSync() *mmaintegration.AllocatorSync } // Store is a container for replicas. 
diff --git a/pkg/kv/kvserver/lease_queue.go b/pkg/kv/kvserver/lease_queue.go index 20de28dc7e74..0b0b8b97b516 100644 --- a/pkg/kv/kvserver/lease_queue.go +++ b/pkg/kv/kvserver/lease_queue.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -61,6 +62,7 @@ var MinIOOverloadLeaseShedInterval = settings.RegisterDurationSetting( type leaseQueue struct { planner plan.ReplicationPlanner allocator allocatorimpl.Allocator + as *mmaintegration.AllocatorSync storePool storepool.AllocatorStorePool purgCh <-chan time.Time lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines @@ -77,6 +79,7 @@ func newLeaseQueue(store *Store, allocator allocatorimpl.Allocator) *leaseQueue lq := &leaseQueue{ planner: plan.NewLeasePlanner(allocator, storePool), allocator: allocator, + as: store.cfg.AllocatorSync, storePool: storePool, purgCh: time.NewTicker(leaseQueuePurgatoryCheckInterval).C, } @@ -136,14 +139,20 @@ func (lq *leaseQueue) process( lease, _ := repl.GetLease() log.KvDistribution.Infof(ctx, "transferring lease to s%d usage=%v, lease=[%v type=%v]", transferOp.Target, transferOp.Usage, lease, lease.Type()) lq.lastLeaseTransfer.Store(timeutil.Now()) - if err := repl.AdminTransferLease(ctx, transferOp.Target.StoreID, false /* bypassSafetyChecks */); err != nil { + changeID := lq.as.NonMMAPreTransferLease( + desc, + transferOp.Usage, + transferOp.Source, + transferOp.Target, + ) + err = repl.AdminTransferLease(ctx, transferOp.Target.StoreID, false /* bypassSafetyChecks */) + // Inform allocator sync that the change has been applied which applies + // changes to store pool 
and inform mma. + lq.as.PostApply(changeID, err == nil /*success*/) + if err != nil { return false, errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, transferOp.Target) } - - lq.storePool.UpdateLocalStoresAfterLeaseTransfer( - transferOp.Source.StoreID, transferOp.Target.StoreID, transferOp.Usage) } - return true, nil } diff --git a/pkg/kv/kvserver/mma_store_rebalancer.go b/pkg/kv/kvserver/mma_store_rebalancer.go index e4a41ef07f6d..37cd90bf7626 100644 --- a/pkg/kv/kvserver/mma_store_rebalancer.go +++ b/pkg/kv/kvserver/mma_store_rebalancer.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -23,6 +24,7 @@ import ( ) type replicaToApplyChanges interface { + RangeUsageInfo() allocator.RangeUsageInfo AdminTransferLease(ctx context.Context, target roachpb.StoreID, bypassSafetyChecks bool) error changeReplicasImpl( ctx context.Context, @@ -47,7 +49,7 @@ type mmaStoreRebalancer struct { mma mmaprototype.Allocator st *cluster.Settings sp *storepool.StorePool - // TODO(wenyihu6): add allocator sync + as *mmaintegration.AllocatorSync } func newMMAStoreRebalancer( @@ -58,6 +60,7 @@ func newMMAStoreRebalancer( mma: mma, st: st, sp: sp, + as: s.cfg.AllocatorSync, } } @@ -151,15 +154,23 @@ func (m *mmaStoreRebalancer) applyChange( ) error { repl := m.store.GetReplicaIfExists(change.RangeID) if repl == nil { + m.as.MarkChangesAsFailed(change.ChangeIDs()) return errors.Errorf("replica not found for range %d", change.RangeID) } - if change.IsTransferLease() { - return m.applyLeaseTransfer(ctx, repl, change) - } else if change.IsChangeReplicas() { - return 
m.applyReplicaChanges(ctx, repl, change) + changeID := m.as.MMAPreApply(repl.RangeUsageInfo(), change) + var err error + switch { + case change.IsTransferLease(): + err = m.applyLeaseTransfer(ctx, repl, change) + case change.IsChangeReplicas(): + err = m.applyReplicaChanges(ctx, repl, change) + default: + return errors.Errorf("unknown change type for range %d", change.RangeID) } - - return errors.Errorf("unknown change type for range %d", change.RangeID) + // Inform allocator sync that the change has been applied which applies + // changes to store pool and inform mma. + m.as.PostApply(changeID, err == nil /*success*/) + return err } // applyLeaseTransfer applies a lease transfer change. diff --git a/pkg/kv/kvserver/mmaintegration/BUILD.bazel b/pkg/kv/kvserver/mmaintegration/BUILD.bazel new file mode 100644 index 000000000000..6b14abb007f8 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/BUILD.bazel @@ -0,0 +1,45 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mmaintegration", + srcs = [ + "allocator_op.go", + "allocator_sync.go", + "mma_conversion.go", + "store_load_msg.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvpb", + "//pkg/kv/kvserver/allocator", + "//pkg/kv/kvserver/allocator/mmaprototype", + "//pkg/kv/kvserver/kvserverbase", + "//pkg/roachpb", + "//pkg/settings/cluster", + "//pkg/util/buildutil", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + ], +) + +go_test( + name = "mmaintegration_test", + srcs = [ + "datadriven_mmaintegration_test.go", + "datadriven_storeload_test.go", + ], + data = glob(["testdata/**"]), + embed = [":mmaintegration"], + deps = [ + "//pkg/kv/kvpb", + "//pkg/kv/kvserver/allocator", + "//pkg/kv/kvserver/allocator/mmaprototype", + "//pkg/kv/kvserver/kvserverbase", + "//pkg/roachpb", + "//pkg/settings/cluster", + "//pkg/testutils/datapathutils", + "//pkg/util/leaktest", + 
"@com_github_cockroachdb_datadriven//:datadriven", + ], +) diff --git a/pkg/kv/kvserver/mmaintegration/allocator_op.go b/pkg/kv/kvserver/mmaintegration/allocator_op.go new file mode 100644 index 000000000000..3e2189358ff0 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/allocator_op.go @@ -0,0 +1,40 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package mmaintegration + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// trackedAllocatorChange represents a change registered with AllocatorSync +// (e.g. lease transfer or change replicas). +type trackedAllocatorChange struct { + // changeIDs are the change IDs that are registered with mma. Nil if mma is + // disabled or the change cannot be registered with mma. If changeIDs is + // nil, PostApply does not need to inform mma. Otherwise, PostApply should + // inform mma by passing changeIDs to AdjustPendingChangesDisposition. + changeIDs []mmaprototype.ChangeID + // Usage is range load usage. + usage allocator.RangeUsageInfo + // Exactly one of the following two fields will be set. + leaseTransferOp *leaseTransferOp + changeReplicasOp *changeReplicasOp +} + +// leaseTransferOp represents a lease transfer operation. +type leaseTransferOp struct { + transferFrom, transferTo roachpb.StoreID +} + +// changeReplicasOp represents a change replicas operation. +type changeReplicasOp struct { + // chgs is the replication changes that are applied to the range. len(chgs) + // may be = [1,4]. 
+ chgs kvpb.ReplicationChanges +} diff --git a/pkg/kv/kvserver/mmaintegration/allocator_sync.go b/pkg/kv/kvserver/mmaintegration/allocator_sync.go new file mode 100644 index 000000000000..c28f077e9344 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/allocator_sync.go @@ -0,0 +1,232 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package mmaintegration + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// InvalidSyncChangeID is a sentinel value for an invalid sync change ID. It is +// used only for asim to indicate that a change is not in progress. +const InvalidSyncChangeID SyncChangeID = 0 + +func (id SyncChangeID) IsValid() bool { + return id != 0 +} + +type SyncChangeID uint64 + +// storePool is an interface that defines the methods that the allocator sync +// needs to call on the store pool. Using an interface to simplify testing. +type storePool interface { + // UpdateLocalStoresAfterLeaseTransfer is called by the allocator sync to + // update the store pool after a lease transfer operation. + UpdateLocalStoresAfterLeaseTransfer(transferFrom, transferTo roachpb.StoreID, usage allocator.RangeUsageInfo) + // UpdateLocalStoreAfterRebalance is called by the allocator sync to update + // the store pool after a rebalance operation. 
+ UpdateLocalStoreAfterRebalance(storeID roachpb.StoreID, rangeUsageInfo allocator.RangeUsageInfo, changeType roachpb.ReplicaChangeType) +} + +// mmaAllocator is an interface that defines the methods that the allocator sync +// needs to call on the mma. Using an interface to simplify testing. +type mmaAllocator interface { + // RegisterExternalChanges is called by the allocator sync to register + // external changes with the mma. + RegisterExternalChanges(changes []mmaprototype.ReplicaChange) []mmaprototype.ChangeID + // AdjustPendingChangesDisposition is called by the allocator sync to adjust + // the disposition of pending changes. + AdjustPendingChangesDisposition(changeIDs []mmaprototype.ChangeID, success bool) +} + +// TODO(wenyihu6): make sure allocator sync can tolerate cluster setting +// changes not happening consistently or atomically across components. (For +// example, replicate queue may call into allocator sync when mma is enabled but +// has been disabled postapply or other components call into allocator sync when +// mma is disabled.) + +// AllocatorSync is a component that coordinates changes from all components +// (including mma/replicate/lease queue) with mma and store pool. When mma is +// disabled, its sole purpose is to track and apply changes to the store pool +// upon success. +type AllocatorSync struct { + sp storePool + st *cluster.Settings + mmaAllocator mmaAllocator + mu struct { + syncutil.Mutex + // changeSeqGen is a monotonically increasing sequence number for + // tracked changes. + changeSeqGen SyncChangeID + // trackedChanges is a map of tracked changes. Added right before + // trackedAllocatorChange is being applied, deleted when it has been + // applied. SyncChangeID is used as an identifier for the replica + // changes. 
+ trackedChanges map[SyncChangeID]trackedAllocatorChange + } +} + +func NewAllocatorSync( + sp storePool, mmaAllocator mmaAllocator, st *cluster.Settings, +) *AllocatorSync { + as := &AllocatorSync{ + sp: sp, + st: st, + mmaAllocator: mmaAllocator, + } + as.mu.trackedChanges = make(map[SyncChangeID]trackedAllocatorChange) + return as +} + +// mmaRangeLoad converts range load usage to mma range load. +// +// TODO(wenyihu6): This is bit redundant to mmaRangeLoad in kvserver. See if we +// can refactor to use the same helper function. +func mmaRangeLoad(rangeUsageInfo allocator.RangeUsageInfo) mmaprototype.RangeLoad { + var rl mmaprototype.RangeLoad + rl.Load[mmaprototype.CPURate] = mmaprototype.LoadValue( + rangeUsageInfo.RequestCPUNanosPerSecond + rangeUsageInfo.RaftCPUNanosPerSecond) + rl.RaftCPU = mmaprototype.LoadValue(rangeUsageInfo.RaftCPUNanosPerSecond) + rl.Load[mmaprototype.WriteBandwidth] = mmaprototype.LoadValue(rangeUsageInfo.WriteBytesPerSecond) + // Note that LogicalBytes is already populated as enginepb.MVCCStats.Total() + // in repl.RangeUsageInfo(). + rl.Load[mmaprototype.ByteSize] = mmaprototype.LoadValue(rangeUsageInfo.LogicalBytes) + return rl +} + +// addTrackedChange adds a tracked change to the allocator sync. +func (as *AllocatorSync) addTrackedChange(change trackedAllocatorChange) SyncChangeID { + as.mu.Lock() + defer as.mu.Unlock() + as.mu.changeSeqGen++ + syncChangeID := as.mu.changeSeqGen + as.mu.trackedChanges[syncChangeID] = change + return syncChangeID +} + +// getTrackedChange gets a tracked change from the allocator sync. It deletes +// the change from the map. 
+func (as *AllocatorSync) getTrackedChange(syncChangeID SyncChangeID) trackedAllocatorChange { + as.mu.Lock() + defer as.mu.Unlock() + change, ok := as.mu.trackedChanges[syncChangeID] + if !ok { + panic("AllocatorSync: change not found") + } + delete(as.mu.trackedChanges, syncChangeID) + return change +} + +// NonMMAPreTransferLease is called by the lease/replicate queue to register a +// transfer operation. SyncChangeID is returned to the caller. It is an +// identifier that can be used to call PostApply to apply the change to the +// store pool upon success. +func (as *AllocatorSync) NonMMAPreTransferLease( + desc *roachpb.RangeDescriptor, + usage allocator.RangeUsageInfo, + transferFrom, transferTo roachpb.ReplicationTarget, +) SyncChangeID { + var changeIDs []mmaprototype.ChangeID + if kvserverbase.LoadBasedRebalancingMode.Get(&as.st.SV) == kvserverbase.LBRebalancingMultiMetric { + changeIDs = as.mmaAllocator.RegisterExternalChanges(convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo)) + } + trackedChange := trackedAllocatorChange{ + changeIDs: changeIDs, + usage: usage, + leaseTransferOp: &leaseTransferOp{ + transferFrom: transferFrom.StoreID, + transferTo: transferTo.StoreID, + }, + } + return as.addTrackedChange(trackedChange) +} + +// NonMMAPreChangeReplicas is called by the replicate queue to register a +// change replicas operation. SyncChangeID is returned to the caller. It is an +// identifier that can be used to call PostApply to apply the change to the +// store pool upon success. 
+func (as *AllocatorSync) NonMMAPreChangeReplicas( + desc *roachpb.RangeDescriptor, + usage allocator.RangeUsageInfo, + changes kvpb.ReplicationChanges, + leaseholderStoreID roachpb.StoreID, +) SyncChangeID { + var changeIDs []mmaprototype.ChangeID + if kvserverbase.LoadBasedRebalancingMode.Get(&as.st.SV) == kvserverbase.LBRebalancingMultiMetric { + changeIDs = as.mmaAllocator.RegisterExternalChanges(convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID)) + } + trackedChange := trackedAllocatorChange{ + changeIDs: changeIDs, + usage: usage, + changeReplicasOp: &changeReplicasOp{ + chgs: changes, + }, + } + return as.addTrackedChange(trackedChange) +} + +// MMAPreApply is called by the mma to register a change with AllocatorSync. +// It is called before the change is applied. SyncChangeID is returned to the +// caller. It is an identifier that can be used to call PostApply to apply the +// change to the store pool upon success. +func (as *AllocatorSync) MMAPreApply( + usage allocator.RangeUsageInfo, pendingChange mmaprototype.PendingRangeChange, +) SyncChangeID { + trackedChange := trackedAllocatorChange{ + changeIDs: pendingChange.ChangeIDs(), + usage: usage, + } + switch { + case pendingChange.IsTransferLease(): + trackedChange.leaseTransferOp = &leaseTransferOp{ + transferFrom: pendingChange.LeaseTransferFrom(), + transferTo: pendingChange.LeaseTransferTarget(), + } + case pendingChange.IsChangeReplicas(): + trackedChange.changeReplicasOp = &changeReplicasOp{ + chgs: pendingChange.ReplicationChanges(), + } + default: + panic("unexpected change type") + } + return as.addTrackedChange(trackedChange) +} + +// MarkChangesAsFailed marks the given change IDs as failed without going +// through allocator sync. This is used when mma changes fail before even +// registering with mma via MMAPreApply. 
+func (as *AllocatorSync) MarkChangesAsFailed(changeIDs []mmaprototype.ChangeID) { + as.mmaAllocator.AdjustPendingChangesDisposition(changeIDs, false /* success */) +} + +// PostApply is called by the lease/replicate queue to apply a change to the +// store pool upon success. It is called with the SyncChangeID returned by +// NonMMAPreTransferLease or NonMMAPreChangeReplicas. +func (as *AllocatorSync) PostApply(syncChangeID SyncChangeID, success bool) { + trackedChange := as.getTrackedChange(syncChangeID) + if changeIDs := trackedChange.changeIDs; changeIDs != nil { + // Call into without checking cluster setting. + as.mmaAllocator.AdjustPendingChangesDisposition(changeIDs, success) + } + if !success { + return + } + switch { + case trackedChange.leaseTransferOp != nil: + as.sp.UpdateLocalStoresAfterLeaseTransfer(trackedChange.leaseTransferOp.transferFrom, + trackedChange.leaseTransferOp.transferTo, trackedChange.usage) + case trackedChange.changeReplicasOp != nil: + for _, chg := range trackedChange.changeReplicasOp.chgs { + as.sp.UpdateLocalStoreAfterRebalance( + chg.Target.StoreID, trackedChange.usage, chg.ChangeType) + } + } +} diff --git a/pkg/kv/kvserver/mmaintegration/datadriven_mmaintegration_test.go b/pkg/kv/kvserver/mmaintegration/datadriven_mmaintegration_test.go new file mode 100644 index 000000000000..9bfad67b70dc --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/datadriven_mmaintegration_test.go @@ -0,0 +1,625 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package mmaintegration + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +// changeState represents the state of a registered change. +type changeState int + +const rangeID = 1 + +var rangeState = map[roachpb.RangeID]struct { + leaseholder roachpb.StoreID + replicas []roachpb.ReplicaDescriptor + usage allocator.RangeUsageInfo +}{ + rangeID: { + leaseholder: 1, + replicas: []roachpb.ReplicaDescriptor{ + {StoreID: 1, NodeID: 1, ReplicaID: 1, Type: roachpb.VOTER_FULL}, + {StoreID: 2, NodeID: 2, ReplicaID: 2, Type: roachpb.NON_VOTER}, + }, + usage: allocator.RangeUsageInfo{ + RequestCPUNanosPerSecond: 100, + RaftCPUNanosPerSecond: 50, + WriteBytesPerSecond: 1024, + LogicalBytes: 2048, + }, + }, +} + +const ( + // changeRegistered indicates the change has been registered but not enacted. + changeRegistered changeState = iota + // changeSucceeded indicates the change was successfully enacted. + changeSucceeded + // changeFailed indicates the change failed to enact. + changeFailed +) + +// mockMMAAllocator implements the mmaAllocator interface for testing. 
+type mockMMAAllocator struct { + changeSeqGen mmaprototype.ChangeID + // tracks registered changes and their state + changes map[mmaprototype.ChangeID]changeState +} + +func printRangeState(rangeID roachpb.RangeID) string { + if _, ok := rangeState[rangeID]; !ok { + return fmt.Sprintf("range %d not found", rangeID) + } + var sb strings.Builder + _, _ = fmt.Fprintf(&sb, "range_id=%d", rangeID) + _, _ = fmt.Fprintf(&sb, " [replicas:{") + for i, replica := range rangeState[rangeID].replicas { + if i > 0 { + _, _ = fmt.Fprintf(&sb, ",") + } + var isLeaseholder string + if replica.StoreID == rangeState[rangeID].leaseholder { + isLeaseholder = "*" + } + var replicaType string + if replica.Type == roachpb.VOTER_FULL { + replicaType = "voter" + } else if replica.Type == roachpb.NON_VOTER { + replicaType = "non-voter" + } else { + panic(fmt.Sprintf("unknown replica type: %s", replica.Type)) + } + _, _ = fmt.Fprintf(&sb, "(r%d%s:n%d,s%d,%s)", + replica.ReplicaID, isLeaseholder, replica.NodeID, replica.StoreID, replicaType) + } + _, _ = fmt.Fprintf(&sb, "} usage=%v]", mmaRangeLoad(rangeState[rangeID].usage)) + return sb.String() +} + +func (m *mockMMAAllocator) String() string { + var sb strings.Builder + + var ids []mmaprototype.ChangeID + for id := range m.changes { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + if len(ids) == 0 { + _, _ = fmt.Fprintf(&sb, "\tempty") + } else { + _, _ = fmt.Fprintf(&sb, "\t") + } + for _, id := range ids { + state := m.changes[id] + if id > 0 { + sb.WriteString(", ") + } + _, _ = fmt.Fprintf(&sb, "cid(%d)=", id) + switch state { + case changeRegistered: + _, _ = fmt.Fprintf(&sb, "pending") + case changeSucceeded: + _, _ = fmt.Fprintf(&sb, "success") + case changeFailed: + _, _ = fmt.Fprintf(&sb, "failed") + } + } + _, _ = fmt.Fprintf(&sb, "\n") + return sb.String() +} + +type storeLoad struct { + cpu float64 + writeBandwidth float64 + byteSize int64 + leaseCount int + replicaCount int +} + 
+// mockStorePool implements the storePool interface for testing. +type mockStorePool struct { + // Track store loads. + load map[roachpb.StoreID]storeLoad +} + +// UpdateLocalStoresAfterLeaseTransfer updates the mock store pool after a lease +// transfer. +func (m *mockStorePool) UpdateLocalStoresAfterLeaseTransfer( + removeFrom, addTo roachpb.StoreID, usage allocator.RangeUsageInfo, +) { + // Update CPU loads after lease transfer. + deltaLoad := storeLoad{ + cpu: usage.RequestCPUNanosPerSecond, //non-raft cpu + leaseCount: 1, + } + // Lease transfer. + addToLoad := m.load[addTo] + addToLoad.cpu += deltaLoad.cpu + addToLoad.leaseCount += deltaLoad.leaseCount + m.load[addTo] = addToLoad + + removeFromLoad := m.load[removeFrom] + removeFromLoad.cpu -= deltaLoad.cpu + removeFromLoad.leaseCount -= deltaLoad.leaseCount + m.load[removeFrom] = removeFromLoad +} + +// UpdateLocalStoreAfterRebalance updates the mock store pool after a rebalance like storepool.UpdateLocalStoreAfterRebalance. +// TODO(wenyihu6): should this function update RequestCPUNanosPerSecond if there is a lease transfer? 
+func (m *mockStorePool) UpdateLocalStoreAfterRebalance( + storeID roachpb.StoreID, usage allocator.RangeUsageInfo, changeType roachpb.ReplicaChangeType, +) { + deltaLoad := storeLoad{ + cpu: usage.RaftCPUNanosPerSecond, + writeBandwidth: usage.WriteBytesPerSecond, + byteSize: usage.LogicalBytes, + replicaCount: 1, + } + switch changeType { + case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER: + addToLoad := m.load[storeID] + addToLoad.cpu += deltaLoad.cpu + addToLoad.writeBandwidth += deltaLoad.writeBandwidth + addToLoad.byteSize += deltaLoad.byteSize + addToLoad.replicaCount += deltaLoad.replicaCount + m.load[storeID] = addToLoad + + case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER: + removeFromLoad := m.load[storeID] + removeFromLoad.cpu -= deltaLoad.cpu + removeFromLoad.writeBandwidth -= deltaLoad.writeBandwidth + removeFromLoad.byteSize -= deltaLoad.byteSize + removeFromLoad.replicaCount -= deltaLoad.replicaCount + m.load[storeID] = removeFromLoad + } +} + +// String implements the Stringer interface for mockStorePool. +func (m *mockStorePool) String() string { + var sb strings.Builder + storeIDs := make([]roachpb.StoreID, 0, len(m.load)) + for storeID := range m.load { + storeIDs = append(storeIDs, storeID) + } + sort.Slice(storeIDs, func(i, j int) bool { + return storeIDs[i] < storeIDs[j] + }) + for _, storeID := range storeIDs { + load := m.load[storeID] + _, _ = fmt.Fprintf(&sb, "\ts%d: (cpu=%.2f, write_band=%.2f, byte_size=%d, lease_count=%d, replica_count=%d)\n", + storeID, load.cpu, load.writeBandwidth, load.byteSize, load.leaseCount, load.replicaCount) + } + return sb.String() +} + +// AdjustPendingChangesDisposition informs mockMMAAllocator that the pending +// changes have been applied. 
+func (m *mockMMAAllocator) AdjustPendingChangesDisposition( + changes []mmaprototype.ChangeID, success bool, +) { + for _, id := range changes { + if _, ok := m.changes[id]; !ok { + panic(fmt.Sprintf("change %d not found", id)) + } + if success { + m.changes[id] = changeSucceeded + } else { + m.changes[id] = changeFailed + } + } +} + +// RegisterExternalChanges informs mockMMAAllocator that the external changes +// have been registered. +func (m *mockMMAAllocator) RegisterExternalChanges( + changes []mmaprototype.ReplicaChange, +) []mmaprototype.ChangeID { + changeIDs := make([]mmaprototype.ChangeID, len(changes)) + for i := range changeIDs { + id := m.nextChangeID() + changeIDs[i] = id + m.changes[id] = changeRegistered // Register change as pending + } + return changeIDs +} + +func (m *mockMMAAllocator) nextChangeID() mmaprototype.ChangeID { + id := m.changeSeqGen + m.changeSeqGen++ + return id +} + +func (m *mockMMAAllocator) makeMMAPendingRangeChange( + rangeID roachpb.RangeID, replicaChanges []mmaprototype.ReplicaChange, +) mmaprototype.PendingRangeChange { + changeIDs := make([]mmaprototype.ChangeID, len(replicaChanges)) + for i := range changeIDs { + changeIDs[i] = m.nextChangeID() + m.changes[changeIDs[i]] = changeRegistered + } + return mmaprototype.MakePendingRangeChangeForTesting(rangeID, replicaChanges, changeIDs) +} + +// createTestAllocatorSync creates a test allocator sync with mock dependencies +// and some predefined ranges. 
+func createTestAllocatorSync(mmaEnabled bool) (*AllocatorSync, *mockMMAAllocator, *mockStorePool) { + st := cluster.MakeTestingClusterSettings() + mma := &mockMMAAllocator{ + changes: make(map[mmaprototype.ChangeID]changeState), + } + sp := &mockStorePool{ + load: make(map[roachpb.StoreID]storeLoad), + } + for i := 1; i <= storeCount; i++ { + sp.load[roachpb.StoreID(i)] = storeLoad{} + } + for _, replica := range rangeState[rangeID].replicas { + load := sp.load[replica.StoreID] + load.cpu += rangeState[rangeID].usage.RaftCPUNanosPerSecond + load.writeBandwidth += rangeState[rangeID].usage.WriteBytesPerSecond + load.byteSize += rangeState[rangeID].usage.LogicalBytes + load.replicaCount++ + sp.load[replica.StoreID] = load + } + leaseholderLoad := sp.load[rangeState[rangeID].leaseholder] + leaseholderLoad.cpu += rangeState[rangeID].usage.RequestCPUNanosPerSecond + leaseholderLoad.leaseCount++ + sp.load[rangeState[rangeID].leaseholder] = leaseholderLoad + + if mmaEnabled { + kvserverbase.LoadBasedRebalancingMode.Override(context.Background(), &st.SV, kvserverbase.LBRebalancingMultiMetric) + } + as := NewAllocatorSync(sp, mma, st) + return as, mma, sp +} + +// getTracked is a helper function to get a tracked change from the allocator sync +// without deleting the change. 
+func getTracked(as *AllocatorSync, id SyncChangeID) (trackedAllocatorChange, bool) { + as.mu.Lock() + defer as.mu.Unlock() + change, ok := as.mu.trackedChanges[id] + return change, ok +} + +func makeAddNonVoterOp(target int) changeReplicasOp { + return changeReplicasOp{ + chgs: kvpb.ReplicationChanges{ + kvpb.ReplicationChange{ + ChangeType: roachpb.ADD_NON_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(target), NodeID: roachpb.NodeID(target)}, + }, + }, + } +} + +func makeReplaceVoterWithPromotionOp(from, to int) changeReplicasOp { + return changeReplicasOp{ + chgs: kvpb.ReplicationChanges{ + kvpb.ReplicationChange{ + ChangeType: roachpb.ADD_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(to), NodeID: roachpb.NodeID(to)}, + }, + kvpb.ReplicationChange{ + ChangeType: roachpb.REMOVE_NON_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(to), NodeID: roachpb.NodeID(to)}, + }, + kvpb.ReplicationChange{ + ChangeType: roachpb.REMOVE_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(from), NodeID: roachpb.NodeID(from)}, + }, + }, + } +} + +func makeRebalanceVoterOpWithPromotionAndDemotion(from, to int) changeReplicasOp { + return changeReplicasOp{ + chgs: kvpb.ReplicationChanges{ + // Promotion. + kvpb.ReplicationChange{ + ChangeType: roachpb.REMOVE_NON_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(to), NodeID: roachpb.NodeID(to)}, + }, + kvpb.ReplicationChange{ + ChangeType: roachpb.ADD_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(to), NodeID: roachpb.NodeID(to)}, + }, + // Demotion. 
+ kvpb.ReplicationChange{ + ChangeType: roachpb.ADD_NON_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(from), NodeID: roachpb.NodeID(from)}, + }, + kvpb.ReplicationChange{ + ChangeType: roachpb.REMOVE_VOTER, + Target: roachpb.ReplicationTarget{StoreID: roachpb.StoreID(from), NodeID: roachpb.NodeID(from)}, + }, + }, + } +} + +func makeChangeReplicasOperation(opType string, from, to int) changeReplicasOp { + switch opType { + case "add-nonvoter": + return makeAddNonVoterOp(to) + case "replace-voter-with-promotion": + return makeReplaceVoterWithPromotionOp(from, to) + case "rebalance-voter-with-promotion-and-demotion": + return makeRebalanceVoterOpWithPromotionAndDemotion(from, to) + default: + panic(fmt.Sprintf("unknown operation: %s", opType)) + } +} + +func printReplicaChange(change []mmaprototype.ReplicaChange) string { + var sb strings.Builder + _, _ = fmt.Fprintf(&sb, "\t[") + for i, c := range change { + if i > 0 { + _, _ = fmt.Fprintf(&sb, "\t") + } + _, _ = fmt.Fprintf(&sb, "%s", c.String()) + if i < len(change)-1 { + _, _ = fmt.Fprintf(&sb, "\n") + } + } + _, _ = fmt.Fprintf(&sb, "]") + return sb.String() +} + +func printTrackedChange(change trackedAllocatorChange) string { + var sb strings.Builder + if len(change.changeIDs) > 0 { + _, _ = fmt.Fprintf(&sb, "cid:%v", change.changeIDs) + } else { + _, _ = fmt.Fprintf(&sb, "cid:empty") + } + switch { + case change.leaseTransferOp != nil: + _, _ = fmt.Fprintf(&sb, ", lease_transfer_from:s%d->s%d", + change.leaseTransferOp.transferFrom, change.leaseTransferOp.transferTo) + case change.changeReplicasOp != nil: + _, _ = fmt.Fprintf(&sb, ", change_replicas:%v", change.changeReplicasOp.chgs) + } + _, _ = fmt.Fprintf(&sb, ", {request_cpu:%.1f, raft_cpu:%.1f, write_bytes:%.1f, logical_bytes:%d}", + change.usage.RequestCPUNanosPerSecond, change.usage.RaftCPUNanosPerSecond, + change.usage.WriteBytesPerSecond, change.usage.LogicalBytes) + return sb.String() +} + +func printAllocatorSync(as 
*AllocatorSync) string { + var sb strings.Builder + _, _ = fmt.Fprintf(&sb, "\ttracked:[") + var ids []int + for id := range as.mu.trackedChanges { + ids = append(ids, int(id)) + } + sort.Ints(ids) + + for i, id := range ids { + change := as.mu.trackedChanges[SyncChangeID(id)] + if i > 0 { + _, _ = fmt.Fprintf(&sb, ",") + } + _, _ = fmt.Fprintf(&sb, "sync_id=%d->(%s)", id, printTrackedChange(change)) + } + _, _ = fmt.Fprintf(&sb, "]\n") + return sb.String() +} + +var fromSupported = map[string]bool{ + "add-nonvoter": false, + "replace-voter-with-promotion": true, + "rebalance-voter-with-promotion-and-demotion": true, +} + +const storeCount = 4 + +// TestDataDrivenMMAIntegration is a data-driven test for the allocator sync functionality. +// It provides the following commands: +// +// - "init" +// Initialize a new allocator sync with mock dependencies. +// Args: mma_enabled=<bool> +// +// - "pre-apply-lease-transfer" +// Register a lease transfer operation with the allocator sync. +// Args: range_id=<int> from=<int> to=<int> from_mma=<bool> +// +// - "pre-apply-change-replicas" +// Register a replica change operation with the allocator sync. +// Args: range_id=<int> type=<string> from=<int> to=<int> from_mma=<bool> +// +// - "mark-changes-failed" +// Mark specific change IDs as failed. +// Args: change_ids=<int>[,<int>...] +// +// - "get-tracked-change" +// Get details about a tracked change. +// Args: sync_id=<int> +// +// - "post-apply" +// Apply a tracked change with success or failure. +// Args: id=<int> success=<bool> +// +// - "print" +// Print the current state of the allocator sync, MMA state and store pool. +// +// - "make-operation" +// Create a replication operation based on the given type and store IDs. +// Args: type= to= from= +// +// - "convert-lease-transfer" +// Convert a lease transfer operation to MMA format. +// Args: transfer_from= transfer_to= +// +// - "convert-replica-change" +// Convert a replica change operation to MMA format.
+// Args: change=, (multiple) +func TestDataDrivenMMAIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + + datadriven.Walk(t, datapathutils.TestDataPath(t, "mmaintegration"), func(t *testing.T, path string) { + var as *AllocatorSync + var mma *mockMMAAllocator + var sp *mockStorePool + + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + var mmaEnabled bool + d.ScanArgs(t, "mma_enabled", &mmaEnabled) + as, mma, sp = createTestAllocatorSync(mmaEnabled) + return printRangeState(rangeID) + case "mark-changes-failed": + var changeIDsStr string + d.ScanArgs(t, "change_ids", &changeIDsStr) + + changeIDStrs := strings.Split(changeIDsStr, ",") + var changeIDs []mmaprototype.ChangeID + for _, idStr := range changeIDStrs { + id, err := strconv.Atoi(strings.TrimSpace(idStr)) + if err != nil { + d.Fatalf(t, "invalid change ID: %s", idStr) + } + changeIDs = append(changeIDs, mmaprototype.ChangeID(id)) + } + as.MarkChangesAsFailed(changeIDs) + return fmt.Sprintf("marked %v as failed", changeIDs) + case "get-tracked-change": + var id int + d.ScanArgs(t, "sync_id", &id) + change, ok := getTracked(as, SyncChangeID(id)) + if !ok { + return fmt.Sprintf("change not found: %d", id) + } + return printTrackedChange(change) + + case "post-apply": + var id int + var success bool + d.ScanArgs(t, "id", &id) + d.ScanArgs(t, "success", &success) + as.PostApply(SyncChangeID(id), success) + return fmt.Sprintf("applied change %d with success=%v", id, success) + + case "print": + var stringBuilder strings.Builder + _, _ = fmt.Fprintf(&stringBuilder, "allocator_sync:\n%s", printAllocatorSync(as)) + _, _ = fmt.Fprintf(&stringBuilder, "mma_state:\n%s", mma.String()) + _, _ = fmt.Fprintf(&stringBuilder, "store_pool:\n%s", sp.String()) + return stringBuilder.String() + + case "pre-apply-lease-transfer": + var to, from, rangeID int + var fromMMA bool + d.ScanArgs(t, "to", &to) + d.ScanArgs(t, "from", &from) + d.ScanArgs(t, 
"range_id", &rangeID) + d.ScanArgs(t, "from_mma", &fromMMA) + rangeState := rangeState[roachpb.RangeID(rangeID)] + var stringBuilder strings.Builder + desc := &roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(rangeID), + InternalReplicas: rangeState.replicas, + } + if to > storeCount || from > storeCount { + return fmt.Sprintf("invalid target or from store id: %d, %d", to, from) + } + fromID := roachpb.ReplicationTarget{StoreID: roachpb.StoreID(from), NodeID: roachpb.NodeID(from)} + toID := roachpb.ReplicationTarget{StoreID: roachpb.StoreID(to), NodeID: roachpb.NodeID(to)} + mmaChange := convertLeaseTransferToMMA( + desc, + rangeState.usage, + fromID, + toID, + ) + var syncID SyncChangeID + if fromMMA { + prc := mma.makeMMAPendingRangeChange( + roachpb.RangeID(rangeID), + mmaChange, + ) + syncID = as.MMAPreApply(rangeState.usage, prc) + } else { + syncID = as.NonMMAPreTransferLease( + desc, + rangeState.usage, + fromID, + toID, + ) + } + _, _ = fmt.Fprintf(&stringBuilder, "sync_change_id:%d\n", syncID) + _, _ = fmt.Fprintf(&stringBuilder, "mma_change:\n%v\n", printReplicaChange(mmaChange)) + return stringBuilder.String() + case "pre-apply-change-replicas": + var opType string + var to, from, rangeID int + var fromMMA bool + d.ScanArgs(t, "type", &opType) + d.ScanArgs(t, "to", &to) + hasFrom := d.MaybeScanArgs(t, "from", &from) + if b, ok := fromSupported[opType]; ok && b != hasFrom { + panic(fmt.Sprintf("%v from(%t)!=from_supported(%t)", opType, hasFrom, b)) + } + d.ScanArgs(t, "range_id", &rangeID) + d.ScanArgs(t, "from_mma", &fromMMA) + op := makeChangeReplicasOperation(opType, from, to) + if to > storeCount || from > storeCount { + return fmt.Sprintf("invalid target or from store id: %d, %d", to, from) + } + rangeState := rangeState[roachpb.RangeID(rangeID)] + var stringBuilder strings.Builder + desc := &roachpb.RangeDescriptor{ + RangeID: roachpb.RangeID(rangeID), + InternalReplicas: rangeState.replicas, + } + mmaChange := convertReplicaChangeToMMA( + desc, 
+ rangeState.usage, + op.chgs, + rangeState.leaseholder, + ) + var syncID SyncChangeID + if fromMMA { + prc := mma.makeMMAPendingRangeChange( + roachpb.RangeID(rangeID), + mmaChange, + ) + syncID = as.MMAPreApply(rangeState.usage, prc) + } else { + syncID = as.NonMMAPreChangeReplicas( + desc, + rangeState.usage, + op.chgs, + rangeState.leaseholder, + ) + } + _, _ = fmt.Fprintf(&stringBuilder, "sync_change_id:%d\n", syncID) + _, _ = fmt.Fprintf(&stringBuilder, "mma_change:\n%v\n", printReplicaChange(mmaChange)) + return stringBuilder.String() + default: + d.Fatalf(t, "unknown command: %s", d.Cmd) + return "" + } + }) + }) +} diff --git a/pkg/kv/kvserver/mmaintegration/datadriven_storeload_test.go b/pkg/kv/kvserver/mmaintegration/datadriven_storeload_test.go new file mode 100644 index 000000000000..418062a9764e --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/datadriven_storeload_test.go @@ -0,0 +1,101 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package mmaintegration + +import ( + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/datadriven" +) + +// TestDataDrivenStoreLoadMsg is a data-driven test for the store load functionality. +// It provides the following commands: +// +// - "make-store-load-msg" +// Creates a StoreLoadMsg from a StoreDescriptor. 
+// Args: store_id=<int> cpu_per_second=<float> write_bytes_per_second=<int> +// logical_bytes=<int> capacity=<int> available=<int> lease_count=<int> range_count=<int> +// node_cpu_rate_capacity=<int> node_cpu_rate_usage=<int> stores_cpu_rate=<int> num_stores=<int> +// timestamp=<int> +func TestDataDrivenStoreLoadMsg(t *testing.T) { + defer leaktest.AfterTest(t)() + + const nodeID = 1 + + datadriven.Walk(t, datapathutils.TestDataPath(t, "storeload"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "make-store-load-msg": + var storeID, writeBytesPerSecond, logicalBytes, capacity, available, leaseCount, rangeCount int + var nodeCPURateCapacity, nodeCPURateUsage, storesCPURate, numStores, timestamp int + var cpuPerSecond float64 + + d.ScanArgs(t, "store_id", &storeID) + d.ScanArgs(t, "cpu_per_second", &cpuPerSecond) + d.ScanArgs(t, "write_bytes_per_second", &writeBytesPerSecond) + d.ScanArgs(t, "logical_bytes", &logicalBytes) + d.ScanArgs(t, "capacity", &capacity) + d.ScanArgs(t, "available", &available) + d.ScanArgs(t, "lease_count", &leaseCount) + d.ScanArgs(t, "range_count", &rangeCount) + d.ScanArgs(t, "node_cpu_rate_capacity", &nodeCPURateCapacity) + d.ScanArgs(t, "node_cpu_rate_usage", &nodeCPURateUsage) + d.ScanArgs(t, "stores_cpu_rate", &storesCPURate) + d.ScanArgs(t, "num_stores", &numStores) + d.ScanArgs(t, "timestamp", &timestamp) + + desc := roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(storeID), + Node: roachpb.NodeDescriptor{ + NodeID: roachpb.NodeID(nodeID), + }, + Capacity: roachpb.StoreCapacity{ + CPUPerSecond: cpuPerSecond, + WriteBytesPerSecond: float64(writeBytesPerSecond), + LogicalBytes: int64(logicalBytes), + Capacity: int64(capacity), + Available: int64(available), + LeaseCount: int32(leaseCount), + RangeCount: int32(rangeCount), + }, + NodeCapacity: roachpb.NodeCapacity{ + NodeCPURateCapacity: int64(nodeCPURateCapacity), + NodeCPURateUsage: int64(nodeCPURateUsage), + StoresCPURate: int64(storesCPURate), + NumStores:
int32(numStores), + }, + } + + msg := MakeStoreLoadMsg(desc, int64(timestamp)) + return formatStoreLoadMsg(msg) + default: + d.Fatalf(t, "unknown command: %s", d.Cmd) + return "" + } + }) + }) +} + +// formatStoreLoadMsg formats a StoreLoadMsg for output. +func formatStoreLoadMsg(msg mmaprototype.StoreLoadMsg) string { + var sb strings.Builder + _, _ = fmt.Fprintf(&sb, "s%d,n%d at %d:\n", msg.StoreID, msg.NodeID, msg.LoadTime.UnixNano()) + _, _ = fmt.Fprintf(&sb, "load[cpu]=%v\n", msg.Load[mmaprototype.CPURate]) + _, _ = fmt.Fprintf(&sb, "load[write]=%v\n", msg.Load[mmaprototype.WriteBandwidth]) + _, _ = fmt.Fprintf(&sb, "load[byte]=%v\n", msg.Load[mmaprototype.ByteSize]) + _, _ = fmt.Fprintf(&sb, "capacity[cpu]=%v\n", msg.Capacity[mmaprototype.CPURate]) + _, _ = fmt.Fprintf(&sb, "capacity[write]=%v\n", msg.Capacity[mmaprototype.WriteBandwidth]) + _, _ = fmt.Fprintf(&sb, "capacity[byte]=%v\n", msg.Capacity[mmaprototype.ByteSize]) + _, _ = fmt.Fprintf(&sb, "secondary_load[lease]=%v\n", msg.SecondaryLoad[mmaprototype.LeaseCount]) + _, _ = fmt.Fprintf(&sb, "secondary_load[replica]=%v\n", msg.SecondaryLoad[mmaprototype.ReplicaCount]) + return sb.String() +} diff --git a/pkg/kv/kvserver/mmaintegration/mma_conversion.go b/pkg/kv/kvserver/mmaintegration/mma_conversion.go new file mode 100644 index 000000000000..1b16025915c3 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/mma_conversion.go @@ -0,0 +1,126 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package mmaintegration + +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" +) + +// convertLeaseTransferToMMA converts a lease transfer operation to mma replica +// changes. It will be passed to mma.RegisterExternalChanges. +func convertLeaseTransferToMMA( + desc *roachpb.RangeDescriptor, + usage allocator.RangeUsageInfo, + transferFrom, transferTo roachpb.ReplicationTarget, +) []mmaprototype.ReplicaChange { + // TODO(wenyihu6): we are passing existing replicas to + // mmaprototype.MakeLeaseTransferChanges just to get the add and remove + // replica state. See if things could be cleaned up. + existingReplicas := make([]mmaprototype.StoreIDAndReplicaState, len(desc.InternalReplicas)) + for i, replica := range desc.Replicas().Descriptors() { + existingReplicas[i] = mmaprototype.StoreIDAndReplicaState{ + StoreID: replica.StoreID, + ReplicaState: mmaprototype.ReplicaState{ + ReplicaIDAndType: mmaprototype.ReplicaIDAndType{ + ReplicaID: replica.ReplicaID, + ReplicaType: mmaprototype.ReplicaType{ + ReplicaType: replica.Type, + // transferFrom is the leaseholder replica. + IsLeaseholder: replica.StoreID == transferFrom.StoreID, + }, + }, + }, + } + } + replicaChanges := mmaprototype.MakeLeaseTransferChanges(desc.RangeID, + existingReplicas, + mmaRangeLoad(usage), + transferTo, + transferFrom, + ) + return replicaChanges[:] +} + +// convertReplicaChangeToMMA converts a replica change to mma replica changes. +// It will be passed to mma.RegisterExternalChanges. 
+func convertReplicaChangeToMMA( + desc *roachpb.RangeDescriptor, + usage allocator.RangeUsageInfo, + changes kvpb.ReplicationChanges, + leaseholderStoreID roachpb.StoreID, +) []mmaprototype.ReplicaChange { + rLoad := mmaRangeLoad(usage) + replicaChanges := make([]mmaprototype.ReplicaChange, 0, len(changes)) + replicaSet := desc.Replicas() + + var lhBeingRemoved bool + // TODO(wenyihu6): check what type of replication changes will there be + // here (can there be two voter removals or additions at the same time?) + // If yes, lhBeingRemoved may be wrong. + if (len(changes.VoterRemovals()) > 1 || len(changes.VoterAdditions()) > 1) && buildutil.CrdbTestBuild { + panic("voter removals should not be more than one at a time") + } + + // Put remove replica changes first so that we can see if the leaseholder + // is being removed. + for _, chg := range changes { + if chg.ChangeType == roachpb.REMOVE_VOTER || chg.ChangeType == roachpb.REMOVE_NON_VOTER { + filteredSet := replicaSet.Filter(func(r roachpb.ReplicaDescriptor) bool { + return r.StoreID == chg.Target.StoreID + }) + replDescriptors := filteredSet.Descriptors() + if len(replDescriptors) != 1 { + panic(fmt.Sprintf( + "no replica found for removal target=%v post-filter=%v pre-filter=%v", + chg.Target.StoreID, replDescriptors, desc)) + } + replDesc := replDescriptors[0] + lhBeingRemoved = replDesc.StoreID == leaseholderStoreID + replicaChanges = append(replicaChanges, mmaprototype.MakeRemoveReplicaChange( + desc.RangeID, rLoad, mmaprototype.ReplicaState{ + ReplicaIDAndType: mmaprototype.ReplicaIDAndType{ + ReplicaID: replDesc.ReplicaID, + ReplicaType: mmaprototype.ReplicaType{ + ReplicaType: replDesc.Type, + IsLeaseholder: lhBeingRemoved, + }, + }, + }, + chg.Target)) + } + } + for _, chg := range changes { + switch { + case chg.ChangeType == roachpb.ADD_VOTER || chg.ChangeType == roachpb.ADD_NON_VOTER: + rType := roachpb.VOTER_FULL + if chg.ChangeType == roachpb.ADD_NON_VOTER { + rType = roachpb.NON_VOTER + } + 
replicaChanges = append(replicaChanges, mmaprototype.MakeAddReplicaChange( + desc.RangeID, rLoad, mmaprototype.ReplicaState{ + ReplicaIDAndType: mmaprototype.ReplicaIDAndType{ + ReplicaType: mmaprototype.ReplicaType{ + ReplicaType: rType, + // TODO(sumeer): can there be multiple ADD_VOTERs? + IsLeaseholder: lhBeingRemoved && chg.ChangeType == roachpb.ADD_VOTER, + }, + }, + }, chg.Target)) + case chg.ChangeType == roachpb.REMOVE_VOTER || chg.ChangeType == roachpb.REMOVE_NON_VOTER: + // Handled above. + continue + default: + panic("unimplemented change type") + } + } + return replicaChanges +} diff --git a/pkg/kv/kvserver/allocator/mmaprototypehelpers/allocator_mma_integration.go b/pkg/kv/kvserver/mmaintegration/store_load_msg.go similarity index 83% rename from pkg/kv/kvserver/allocator/mmaprototypehelpers/allocator_mma_integration.go rename to pkg/kv/kvserver/mmaintegration/store_load_msg.go index 2755f6ea73d6..27ed29fce21a 100644 --- a/pkg/kv/kvserver/allocator/mmaprototypehelpers/allocator_mma_integration.go +++ b/pkg/kv/kvserver/mmaintegration/store_load_msg.go @@ -3,15 +3,17 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package mmaprototypehelpers +package mmaintegration import ( - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) +// MakeStoreLoadMsg makes a store load message. +// +// TODO(wenyihu6): Add more tests for this function. func MakeStoreLoadMsg( desc roachpb.StoreDescriptor, origTimestampNanos int64, ) mmaprototype.StoreLoadMsg { @@ -133,34 +135,3 @@ func MakeStoreLoadMsg( LoadTime: timeutil.FromUnixNanos(origTimestampNanos), } } - -// UsageInfoToMMALoad converts a RangeUsageInfo to a mmaprototype.RangeLoad. 
-func UsageInfoToMMALoad(usage allocator.RangeUsageInfo) mmaprototype.RangeLoad { - lv := mmaprototype.LoadVector{} - lv[mmaprototype.CPURate] = mmaprototype.LoadValue(usage.RequestCPUNanosPerSecond) + mmaprototype.LoadValue(usage.RaftCPUNanosPerSecond) - lv[mmaprototype.WriteBandwidth] = mmaprototype.LoadValue(usage.WriteBytesPerSecond) - lv[mmaprototype.ByteSize] = mmaprototype.LoadValue(usage.LogicalBytes) - return mmaprototype.RangeLoad{ - Load: lv, - RaftCPU: mmaprototype.LoadValue(usage.RaftCPUNanosPerSecond), - } -} - -// ReplicaDescriptorToReplicaIDAndType converts a ReplicaDescriptor to a -// StoreIDAndReplicaState. The leaseholder store is passed in as lh. -func ReplicaDescriptorToReplicaIDAndType( - desc roachpb.ReplicaDescriptor, lh roachpb.StoreID, -) mmaprototype.StoreIDAndReplicaState { - return mmaprototype.StoreIDAndReplicaState{ - StoreID: desc.StoreID, - ReplicaState: mmaprototype.ReplicaState{ - ReplicaIDAndType: mmaprototype.ReplicaIDAndType{ - ReplicaID: desc.ReplicaID, - ReplicaType: mmaprototype.ReplicaType{ - ReplicaType: desc.Type, - IsLeaseholder: desc.StoreID == lh, - }, - }, - }, - } -} diff --git a/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/add_nonvoter.txt b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/add_nonvoter.txt new file mode 100644 index 000000000000..f951a32637db --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/add_nonvoter.txt @@ -0,0 +1,41 @@ +# This test registers a change replica operation which adds a non-voter +# replica to node s3. 
+init mma_enabled=true +---- +range_id=1 [replicas:{(r1*:n1,s1,voter),(r2:n2,s2,non-voter)} usage=(cpu:150,raft-cpu:50,write-band:1024,byte-size:2048)] + +pre-apply-change-replicas range_id=1 type=add-nonvoter to=3 from_mma=false +---- +sync_change_id:1 +mma_change: + [r1:add-replica,target=(n3,s3), prev=(replica-id=none,type=VOTER_FULL,lagging=false)->next=(replica-id=unknown,type=NON_VOTER)] + +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0], change_replicas:[{ADD_NON_VOTER n3,s3}], {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048})] +mma_state: + cid(0)=pending +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +post-apply id=1 success=true +---- +applied change 1 with success=true + +# s3 has a new non-voter (cpu:0+50(raft)=50, write_band:0+1024=1024, byte_size:0+2048=2048) +# and one more replica count. Rest remain the same. 
+print +---- +allocator_sync: + tracked:[] +mma_state: + cid(0)=success +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) diff --git a/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/add_nonvoter_mma.txt b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/add_nonvoter_mma.txt new file mode 100644 index 000000000000..afb05ea25805 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/add_nonvoter_mma.txt @@ -0,0 +1,41 @@ +# This test registers a change replica operation which adds a non-voter +# replica to node s3. It is the same as add_nonvoter.txt but represents +# a change generated by MMA directly (and call into MMAPreApply). It has +# from_mma=true. Note that all output is the same as add_nonvoter.txt. 
+init mma_enabled=true +---- +range_id=1 [replicas:{(r1*:n1,s1,voter),(r2:n2,s2,non-voter)} usage=(cpu:150,raft-cpu:50,write-band:1024,byte-size:2048)] + +pre-apply-change-replicas range_id=1 type=add-nonvoter to=3 from_mma=true +---- +sync_change_id:1 +mma_change: + [r1:add-replica,target=(n3,s3), prev=(replica-id=none,type=VOTER_FULL,lagging=false)->next=(replica-id=unknown,type=NON_VOTER)] + +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0], change_replicas:[{ADD_NON_VOTER n3,s3}], {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048})] +mma_state: + cid(0)=pending +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +post-apply id=1 success=true +---- +applied change 1 with success=true + +print +---- +allocator_sync: + tracked:[] +mma_state: + cid(0)=success +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) diff --git a/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/lease_transfer.txt b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/lease_transfer.txt new file mode 100644 index 000000000000..1312d45ba790 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/lease_transfer.txt @@ -0,0 +1,61 @@ +# This test registers a lease transfer operation which transfers the lease # +# from s1 to s2. 
+init mma_enabled=true +---- +range_id=1 [replicas:{(r1*:n1,s1,voter),(r2:n2,s2,non-voter)} usage=(cpu:150,raft-cpu:50,write-band:1024,byte-size:2048)] + +print +---- +allocator_sync: + tracked:[] +mma_state: + empty +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +pre-apply-lease-transfer range_id=1 from=1 to=2 from_mma=false +---- +sync_change_id:1 +mma_change: + [r1:remove-lease,target=(n1,s1), prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false)->next=(replica-id=1,type=VOTER_FULL) + r1:add-lease,target=(n2,s2), prev=(replica-id=2,type=NON_VOTER,lagging=false)->next=(replica-id=2,type=NON_VOTER,leaseholder=true)] + +get-tracked-change sync_id=1 +---- +cid:[0 1], lease_transfer_from:s1->s2, {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048} + +# cid(0) is the remove-lease change, cid(1) is the add-lease change. They are +# both pending. +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0 1], lease_transfer_from:s1->s2, {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048})] +mma_state: + cid(0)=pending, cid(1)=pending +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +# After applying the change, s1 has one less lease and s2 has one more lease. +# Non-raft cpu of the range is 150-50=100. s1 has 150-100=50 cpu, s2 has +# 50+100=150 cpu. Rest remain the same. 
+post-apply id=1 success=true +---- +applied change 1 with success=true + +print +---- +allocator_sync: + tracked:[] +mma_state: + cid(0)=success, cid(1)=success +store_pool: + s1: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s2: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) diff --git a/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/lease_transfer_mma.txt b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/lease_transfer_mma.txt new file mode 100644 index 000000000000..963502cc1851 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/lease_transfer_mma.txt @@ -0,0 +1,58 @@ +# This test registers a lease transfer operation which transfers the lease +# from s1 to s2. It is the same as lease_transfer.txt but represents +# a change generated by MMA directly (and call into MMAPreApply). It has +# from_mma=true. Note that all output is the same as lease_transfer.txt. 
+init mma_enabled=true +---- +range_id=1 [replicas:{(r1*:n1,s1,voter),(r2:n2,s2,non-voter)} usage=(cpu:150,raft-cpu:50,write-band:1024,byte-size:2048)] + +print +---- +allocator_sync: + tracked:[] +mma_state: + empty +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +pre-apply-lease-transfer range_id=1 from=1 to=2 from_mma=true +---- +sync_change_id:1 +mma_change: + [r1:remove-lease,target=(n1,s1), prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false)->next=(replica-id=1,type=VOTER_FULL) + r1:add-lease,target=(n2,s2), prev=(replica-id=2,type=NON_VOTER,lagging=false)->next=(replica-id=2,type=NON_VOTER,leaseholder=true)] + +get-tracked-change sync_id=1 +---- +cid:[0 1], lease_transfer_from:s1->s2, {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048} + +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0 1], lease_transfer_from:s1->s2, {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048})] +mma_state: + cid(0)=pending, cid(1)=pending +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +post-apply id=1 success=true +---- +applied change 1 with success=true + +print +---- +allocator_sync: + tracked:[] +mma_state: + cid(0)=success, cid(1)=success +store_pool: + s1: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s2: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) 
+ s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) diff --git a/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/marked_as_failed.txt b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/marked_as_failed.txt new file mode 100644 index 000000000000..bcc3e03d128d --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/marked_as_failed.txt @@ -0,0 +1,58 @@ +# This test marks a change as failed. It makes sure that when one change id is +# marked as failed, other change ids are not affected. Note that in reality, +# marked as failed will only be called before any pre-apply. But it is still +# useful to test this case. +init mma_enabled=true +---- +range_id=1 [replicas:{(r1*:n1,s1,voter),(r2:n2,s2,non-voter)} usage=(cpu:150,raft-cpu:50,write-band:1024,byte-size:2048)] + +print +---- +allocator_sync: + tracked:[] +mma_state: + empty +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +pre-apply-lease-transfer range_id=1 from=1 to=2 from_mma=false +---- +sync_change_id:1 +mma_change: + [r1:remove-lease,target=(n1,s1), prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false)->next=(replica-id=1,type=VOTER_FULL) + r1:add-lease,target=(n2,s2), prev=(replica-id=2,type=NON_VOTER,lagging=false)->next=(replica-id=2,type=NON_VOTER,leaseholder=true)] + +get-tracked-change sync_id=1 +---- +cid:[0 1], lease_transfer_from:s1->s2, {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048} + +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0 1], lease_transfer_from:s1->s2, {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, 
logical_bytes:2048})] +mma_state: + cid(0)=pending, cid(1)=pending +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +mark-changes-failed change_ids=1 +---- +marked [1] as failed + +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0 1], lease_transfer_from:s1->s2, {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048})] +mma_state: + cid(0)=pending, cid(1)=failed +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) diff --git a/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/rebalance_voter_with_promo_demo.txt b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/rebalance_voter_with_promo_demo.txt new file mode 100644 index 000000000000..dab279a4e87b --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/rebalance_voter_with_promo_demo.txt @@ -0,0 +1,52 @@ +# This test registers a range rebalance operation which promotes a non-voter +# to a voter and demotes a voter to a non-voter. It has from_mma=false. 
+init mma_enabled=true +---- +range_id=1 [replicas:{(r1*:n1,s1,voter),(r2:n2,s2,non-voter)} usage=(cpu:150,raft-cpu:50,write-band:1024,byte-size:2048)] + +# Note the leaseholder=true field in the mma_change output: +# - When removing the voter from s1, leaseholder=true is set since s1 was the original leaseholder +# - When adding the voter to s2, leaseholder=true is set since s2 will become the new leaseholder +# - The other remove/add operations don't set leaseholder since they don't involve lease transfers directly. +pre-apply-change-replicas range_id=1 type=rebalance-voter-with-promotion-and-demotion from=1 to=2 from_mma=false +---- +sync_change_id:1 +mma_change: + [r1:remove-replica,target=(n2,s2), prev=(replica-id=2,type=NON_VOTER,lagging=false)->next=(replica-id=none,type=VOTER_FULL) + r1:remove-replica,target=(n1,s1), prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false)->next=(replica-id=none,type=VOTER_FULL) + r1:add-replica,target=(n2,s2), prev=(replica-id=none,type=VOTER_FULL,lagging=false)->next=(replica-id=unknown,type=VOTER_FULL,leaseholder=true) + r1:add-replica,target=(n1,s1), prev=(replica-id=none,type=VOTER_FULL,lagging=false)->next=(replica-id=unknown,type=NON_VOTER)] + +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0 1 2 3], change_replicas:[{REMOVE_NON_VOTER n2,s2} {ADD_VOTER n2,s2} {ADD_NON_VOTER n1,s1} {REMOVE_VOTER n1,s1}], {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048})] +mma_state: + cid(0)=pending, cid(1)=pending, cid(2)=pending, cid(3)=pending +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +post-apply id=1 success=true +---- +applied change 1 with success=true + +# Since s1 is the original 
leaseholder and s1 is demoted and s2 is promoted, +# s1 should have one less lease and s2 should have one more lease. However, +# it is unfortunate that UpdateLocalStoreAfterRebalance doesn't update the lease count. +# So we still have s1 with one lease and s2 with no lease. Same with cpu. +# TODO(weniyhu6): is this a bug? + +print +---- +allocator_sync: + tracked:[] +mma_state: + cid(0)=success, cid(1)=success, cid(2)=success, cid(3)=success +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) diff --git a/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/replace_voter_with_promotion.txt b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/replace_voter_with_promotion.txt new file mode 100644 index 000000000000..f55bc6a62b50 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/mmaintegration/replace_voter_with_promotion.txt @@ -0,0 +1,55 @@ +# This test registers a range rebalance operation which replaces a voter by promoting a non-voter +# to a voter. It has from_mma=false. +init mma_enabled=true +---- +range_id=1 [replicas:{(r1*:n1,s1,voter),(r2:n2,s2,non-voter)} usage=(cpu:150,raft-cpu:50,write-band:1024,byte-size:2048)] + +# Note the leaseholder=true field in the mma_change output: +# - When removing the voter from s1, leaseholder=true is set since s1 was the original leaseholder +# - When adding the voter to s2, leaseholder=true is set since s2 will become the new leaseholder +# - When removing the non-voter from s2, we don't set leaseholder since they don't involve lease transfers directly. 
+pre-apply-change-replicas range_id=1 type=replace-voter-with-promotion from=1 to=2 from_mma=false +---- +sync_change_id:1 +mma_change: + [r1:remove-replica,target=(n2,s2), prev=(replica-id=2,type=NON_VOTER,lagging=false)->next=(replica-id=none,type=VOTER_FULL) + r1:remove-replica,target=(n1,s1), prev=(replica-id=1,type=VOTER_FULL,leaseholder=true,lagging=false)->next=(replica-id=none,type=VOTER_FULL) + r1:add-replica,target=(n2,s2), prev=(replica-id=none,type=VOTER_FULL,lagging=false)->next=(replica-id=unknown,type=VOTER_FULL,leaseholder=true)] + +print +---- +allocator_sync: + tracked:[sync_id=1->(cid:[0 1 2], change_replicas:[{ADD_VOTER n2,s2} {REMOVE_NON_VOTER n2,s2} {REMOVE_VOTER n1,s1}], {request_cpu:100.0, raft_cpu:50.0, write_bytes:1024.0, logical_bytes:2048})] +mma_state: + cid(0)=pending, cid(1)=pending, cid(2)=pending +store_pool: + s1: (cpu=150.00, write_band=1024.00, byte_size=2048, lease_count=1, replica_count=1) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + +# Since s1 lost the voter, it should have one less lease and one less replica count. However, +# it is unfortunate that UpdateLocalStoreAfterRebalance doesn't update the lease count. +# TODO(weniyhu6): is this a bug in storepool? Should we also differentiate load +# change delta based on whether we are adding a voter vs. a non-voter? For +# example, we are promoting s2 here, but that promotion is not reflected in the mock or +# real store pool. + +# After applying the change, s1 has one less replica count. s2's replica count is unchanged. +# s1's write_band and byte_size drop to 0 and its cpu drops from 150 to 100; s2 has no load change.
+ +post-apply id=1 success=true +---- +applied change 1 with success=true + +print +---- +allocator_sync: + tracked:[] +mma_state: + cid(0)=success, cid(1)=success, cid(2)=success +store_pool: + s1: (cpu=100.00, write_band=0.00, byte_size=0, lease_count=1, replica_count=0) + s2: (cpu=50.00, write_band=1024.00, byte_size=2048, lease_count=0, replica_count=1) + s3: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) + s4: (cpu=0.00, write_band=0.00, byte_size=0, lease_count=0, replica_count=0) diff --git a/pkg/kv/kvserver/mmaintegration/testdata/storeload/almost_zero_util.txt b/pkg/kv/kvserver/mmaintegration/testdata/storeload/almost_zero_util.txt new file mode 100644 index 000000000000..1278d600fc57 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/storeload/almost_zero_util.txt @@ -0,0 +1,23 @@ +# This test has the same setup as basic_store_load.txt but focuses on +# the case when cpu utilization and byte size utilization are almost zero. +# +# Basic: make-store-load-msg store_id=1 cpu_per_second=1000 +# write_bytes_per_second=1024 logical_bytes=2048 capacity=1000 available=200 +# lease_count=500 range_count=1000 node_cpu_rate_capacity=2000 +# node_cpu_rate_usage=1000 stores_cpu_rate=1500 num_stores=2 timestamp=1234567890 +# If StoresCPURate is 0, store_capacity = ((NodeCPURateCapacity(200000000000)/2)/NumStores(2)) = 50000000000. +# - desc.NodeCapacity.NodeCPURateUsage = 1 +# - desc.NodeCapacity.NodeCPURateCapacity = 200000000000 +# If capacity == available, byte size capacity is Available(200).
+# - desc.Capacity.Capacity = desc.Capacity.Available +make-store-load-msg store_id=1 cpu_per_second=1000 write_bytes_per_second=1024 logical_bytes=2048 capacity=200 available=200 lease_count=500 range_count=1000 node_cpu_rate_capacity=200000000000 node_cpu_rate_usage=1 stores_cpu_rate=1500 num_stores=2 timestamp=1234567890 +---- +s1,n1 at 1234567890: +load[cpu]=1000 +load[write]=1024 +load[byte]=2048 +capacity[cpu]=50000000000 +capacity[write]=9223372036854775807 +capacity[byte]=200 +secondary_load[lease]=500 +secondary_load[replica]=1000 diff --git a/pkg/kv/kvserver/mmaintegration/testdata/storeload/basic_store_load.txt b/pkg/kv/kvserver/mmaintegration/testdata/storeload/basic_store_load.txt new file mode 100644 index 000000000000..875e0b5e8cf6 --- /dev/null +++ b/pkg/kv/kvserver/mmaintegration/testdata/storeload/basic_store_load.txt @@ -0,0 +1,29 @@ +# This test is to test the basic case for MakeStoreLoadMsg. +# Load: +# load[cpu] = CPUPerSecond(1000) +# load[write] = WriteBytesPerSecond(1024) +# load[byte] = LogicalBytes(2048) +# +# Capacity: +# cpuUtil = NodeCPURateUsage(1000)/NodeCPURateCapacity(2000) (50%) +# nodeCapacity = StoresCPURate(1500)/cpuUtil(0.5) (3000) +# capacity[cpu] = nodeCapacity(3000)/NumStores(2) (1500) +# +# capacity[write] = UnknownCapacity (9223372036854775807) +# +# byteSizeUtil = (Capacity(1000)-Available(200))/Capacity(1000) = 0.8 +# capacity[byteSize] = LogicalBytes(2048)/byteSizeUtil(0.8) (2560) +# +# Secondary load: +# lease count = 500, range count = 1000 +make-store-load-msg store_id=1 cpu_per_second=1000 write_bytes_per_second=1024 logical_bytes=2048 capacity=1000 available=200 lease_count=500 range_count=1000 node_cpu_rate_capacity=2000 node_cpu_rate_usage=1000 stores_cpu_rate=1500 num_stores=2 timestamp=1234567890 +---- +s1,n1 at 1234567890: +load[cpu]=1000 +load[write]=1024 +load[byte]=2048 +capacity[cpu]=1500 +capacity[write]=9223372036854775807 +capacity[byte]=2560 +secondary_load[lease]=500 +secondary_load[replica]=1000 
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 4ad15a401f72..9bfac52f27f7 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -525,6 +526,7 @@ type replicateQueue struct { *baseQueue metrics ReplicateQueueMetrics allocator allocatorimpl.Allocator + as *mmaintegration.AllocatorSync storePool storepool.AllocatorStorePool planner plan.ReplicationPlanner @@ -558,6 +560,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica logTracesThresholdFunc: makeRateLimitedTimeoutFuncByPermittedSlowdown( permittedRangeScanSlowdown/2, rebalanceSnapshotRate, ), + as: store.cfg.AllocatorSync, } store.metrics.registry.AddMetricStruct(&rq.metrics) rq.baseQueue = newBaseQueue( @@ -976,9 +979,10 @@ func (rq *replicateQueue) shedLease( // ReplicaLeaseMover handles lease transfers for a single range. type ReplicaLeaseMover interface { + // Desc returns the range descriptor of the range. + Desc() *roachpb.RangeDescriptor // AdminTransferLease moves the lease to the requested store. AdminTransferLease(ctx context.Context, target roachpb.StoreID, bypassSafetyChecks bool) error - // String returns info about the replica. 
String() string } @@ -1028,11 +1032,21 @@ func (rq *replicateQueue) TransferLease( ) error { rq.metrics.TransferLeaseCount.Inc(1) log.KvDistribution.Infof(ctx, "transferring lease to s%d", target) - if err := rlm.AdminTransferLease(ctx, target.StoreID, false /* bypassSafetyChecks */); err != nil { + // Inform allocator sync that the change is about to be applied so that it + // can be registered with mma; PostApply updates the store pool on success. + changeID := rq.as.NonMMAPreTransferLease( + rlm.Desc(), + rangeUsageInfo, + source, + target, + ) + + err := rlm.AdminTransferLease(ctx, target.StoreID, false /* bypassSafetyChecks */) + rq.as.PostApply(changeID, err == nil /*success*/) + + if err != nil { return errors.Wrapf(err, "%s: unable to transfer lease to s%d", rlm, target) } - - rq.storePool.UpdateLocalStoresAfterLeaseTransfer(source.StoreID, target.StoreID, rangeUsageInfo) return nil } @@ -1062,21 +1076,24 @@ func (rq *replicateQueue) changeReplicas( reason kvserverpb.RangeLogEventReason, details string, ) error { + // Inform allocator sync that the change is about to be applied so that it + // can be registered with mma; PostApply updates the store pool on success. + changeID := rq.as.NonMMAPreChangeReplicas( + desc, + rangeUsageInfo, + chgs, + repl.StoreID(), + ) // NB: this calls the impl rather than ChangeReplicas because // the latter traps tests that try to call it while the replication // queue is active. - if _, err := repl.changeReplicasImpl( + _, err := repl.changeReplicasImpl( ctx, desc, kvserverpb.SnapshotRequest_REPLICATE_QUEUE, allocatorPriority, reason, details, chgs, - ); err != nil { - return err - } - // On success, update local store pool to reflect the result of applying the - // operations.
- for _, chg := range chgs { - rq.storePool.UpdateLocalStoreAfterRebalance(chg.Target.StoreID, rangeUsageInfo, chg.ChangeType) - } - return nil + ) + + rq.as.PostApply(changeID, err == nil /*success*/) + return err } func (*replicateQueue) postProcessScheduled( diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 83cbd69f529b..c8d1eca5862f 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -50,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" loadmonitor "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" @@ -1184,6 +1185,7 @@ type StoreConfig struct { // One MMAllocator per node which guides mma store rebalancer to make // allocation changes when LBRebalancingMultiMetric is enabled. 
MMAllocator mmaprototype.Allocator + AllocatorSync *mmaintegration.AllocatorSync Transport *RaftTransport NodeDialer *nodedialer.Dialer RPCContext *rpc.Context diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 3a721732751a..b48587c49c18 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -129,7 +129,6 @@ go_library( "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/allocator/mmaprototype", - "//pkg/kv/kvserver/allocator/mmaprototypehelpers", "//pkg/kv/kvserver/allocator/plan", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/asim/state", @@ -149,6 +148,7 @@ go_library( "//pkg/kv/kvserver/logstore", "//pkg/kv/kvserver/loqrecovery", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", + "//pkg/kv/kvserver/mmaintegration", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptprovider", "//pkg/kv/kvserver/protectedts/ptreconcile", diff --git a/pkg/server/server.go b/pkg/server/server.go index ace1a7c2d17a..f52ac2482bd6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -38,7 +38,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvprober" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototypehelpers" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" @@ -55,6 +54,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/mmaintegration" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider" 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile" @@ -904,6 +904,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf mmaAllocator := mmaprototype.NewAllocatorState(timeutil.DefaultTimeSource{}, rand.New(rand.NewSource(timeutil.Now().UnixNano()))) + allocatorSync := mmaintegration.NewAllocatorSync(storePool, mmaAllocator, st) g.RegisterCallback( gossip.MakePrefixPattern(gossip.KeyStoreDescPrefix), func(_ string, content roachpb.Value, origTimestampNanos int64) { @@ -912,7 +913,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf log.Errorf(ctx, "%v", err) return } - storeLoadMsg := mmaprototypehelpers.MakeStoreLoadMsg(storeDesc, origTimestampNanos) + storeLoadMsg := mmaintegration.MakeStoreLoadMsg(storeDesc, origTimestampNanos) mmaAllocator.SetStore(state.StoreAttrAndLocFromDesc(storeDesc)) mmaAllocator.ProcessStoreLoadMsg(context.TODO(), &storeLoadMsg) }, @@ -930,6 +931,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf StoreLiveness: storeLiveness, StorePool: storePool, MMAllocator: mmaAllocator, + AllocatorSync: allocatorSync, Transport: raftTransport, NodeDialer: kvNodeDialer, RPCContext: rpcContext,