Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public DesiredBalance compute(
Predicate<DesiredBalanceInput> isFresh
) {

logger.trace("starting to recompute desired balance for [{}]", desiredBalanceInput.index());
logger.debug("Recomputing desired balance for [{}]", desiredBalanceInput.index());

final var routingAllocation = desiredBalanceInput.routingAllocation().mutableCloneForSimulation();
final var routingNodes = routingAllocation.routingNodes();
Expand Down Expand Up @@ -229,17 +229,18 @@ public DesiredBalance compute(
// TODO maybe expose interim desired balances computed here

if (hasChanges == false) {
logger.trace("desired balance computation converged after {} iterations", i);
logger.debug("Desired balance computation converged after {} iterations", i);
break;
}
if (isFresh.test(desiredBalanceInput) == false) {
// we run at least one iteration, but if another reroute happened meanwhile
// then publish the interim state and restart the calculation
logger.trace("newer cluster state received, publishing incomplete desired balance and restarting computation");
logger.debug("Newer cluster state received, publishing incomplete desired balance and restarting computation");
break;
}
if (i > 0 && i % 100 == 0) {
logger.warn("desired balance computation is still not converged after {} iterations", i);
// TODO this warning should be time based, iteration count should be proportional to the number of shards
logger.debug("Desired balance computation is still not converged after {} iterations", i);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class DesiredBalanceReconciler {

void run() {

logger.trace("starting to reconcile current allocation with desired balance {}", desiredBalance);
logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());

if (routingNodes.size() == 0) {
// no data nodes, so fail allocation to report red health
Expand Down Expand Up @@ -85,7 +85,7 @@ void run() {
logger.trace("Reconciler#balance");
balance();

logger.trace("done");
logger.debug("Reconciliation is complete");
}

private boolean allocateUnassignedInvariant() {
Expand Down Expand Up @@ -190,11 +190,6 @@ private void allocateUnassigned() {
int primaryLength = primary.length;
ArrayUtil.timSort(primary, comparator);

// TODO this should be removed before merging the feature branch
if (logger.isTraceEnabled()) {
allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
}

do {
nextShard: for (int i = 0; i < primaryLength; i++) {
final var shard = primary[i];
Expand Down Expand Up @@ -260,12 +255,7 @@ private void allocateUnassigned() {
case THROTTLE -> isThrottled = true;
case NO -> {
if (logger.isTraceEnabled()) {
logger.trace(
"Unexpected NO decision [{}] for shard [{}] on assigned node [{}]",
decision,
shard.shardId(),
desiredNodeId
);
logger.trace("Couldn't assign shard [{}] to [{}]", shard.shardId(), desiredNodeId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,17 @@ public DesiredBalanceShardsAllocator(
@Override
protected void processInput(DesiredBalanceInput desiredBalanceInput) {

logger.trace("Computing balance for [{}]", desiredBalanceInput.index());
long index = desiredBalanceInput.index();
logger.debug("Starting desired balance computation for [{}]", index);

setCurrentDesiredBalance(
desiredBalanceComputer.compute(currentDesiredBalance, desiredBalanceInput, pendingDesiredBalanceMoves, this::isFresh)
);
if (isFresh(desiredBalanceInput)) {
logger.trace("Scheduling a reconciliation");
logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index);
submitReconcileTask(currentDesiredBalance);
} else {
logger.trace("outdated");
logger.debug("Desired balance computation for [{}] is discarded as newer one is submitted", index);
}
}

Expand Down Expand Up @@ -131,7 +132,7 @@ public void allocate(RoutingAllocation allocation, ActionListener<Void> listener
// TODO must also capture any shards that the existing-shards allocators have allocated this pass, not just the ignored ones

var index = indexGenerator.incrementAndGet();
logger.trace("Executing allocate for [{}]", index);
logger.debug("Executing allocate for [{}]", index);
queue.add(index, listener);
desiredBalanceComputation.onNewInput(
new DesiredBalanceInput(index, allocation.immutableClone(), new ArrayList<>(allocation.routingNodes().unassigned().ignored()))
Expand Down Expand Up @@ -165,11 +166,12 @@ private static List<MoveAllocationCommand> getMoveCommands(AllocationCommands co

private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
if (logger.isTraceEnabled()) {
if (DesiredBalance.hasChanges(currentDesiredBalance, newDesiredBalance)) {
logger.trace("desired balance changed: {}\n{}", newDesiredBalance, diff(currentDesiredBalance, newDesiredBalance));
} else {
logger.trace("desired balance unchanged: {}", newDesiredBalance);
}
var diff = DesiredBalance.hasChanges(currentDesiredBalance, newDesiredBalance)
? "Diff: " + diff(currentDesiredBalance, newDesiredBalance)
: "No changes";
logger.trace("Desired balance updated: {}. {}", newDesiredBalance, diff);
} else {
logger.debug("Desired balance updated for [{}]", newDesiredBalance.lastConvergedIndex());
}
currentDesiredBalance = newDesiredBalance;
}
Expand All @@ -184,6 +186,11 @@ protected void submitReconcileTask(DesiredBalance desiredBalance) {
}

protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
if (logger.isTraceEnabled()) {
logger.trace("Reconciling desired balance: {}", desiredBalance);
} else {
logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
}
allocationOrdering.retainNodes(getNodeIds(allocation.routingNodes()));
new DesiredBalanceReconciler(desiredBalance, allocation, allocationOrdering).run();
}
Expand Down