Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
3933e57
HDDS-6280. Support Container Balancer HA
siddhantsangwan May 16, 2022
c7633ba
Set nextIterationIndex to 0 in ContainerBalancer#setBalancerConfigOnS…
siddhantsangwan May 17, 2022
b1a80d7
improve exception handling
siddhantsangwan May 17, 2022
4b7cd67
minor change
siddhantsangwan May 17, 2022
bc99004
register ContainerBalancer with SCMServiceManager
siddhantsangwan May 20, 2022
0501ef6
test that configuration is persisted in all SCMs
siddhantsangwan May 20, 2022
276d748
mock SCMServiceManager in TestContainerBalancer
siddhantsangwan May 20, 2022
93dbdff
Merge branch 'TestContainerBalancerHA' into HDDS-6280
siddhantsangwan May 20, 2022
6cbdf21
rat and checkstyle
siddhantsangwan May 20, 2022
c65f297
trigger new CI check
siddhantsangwan May 23, 2022
d0c56b3
address review comments and wait for followers to catch up with leade…
siddhantsangwan May 23, 2022
f2c279e
save reference to balancer configuration in ContainerBalancer#setBala…
siddhantsangwan May 23, 2022
d1d090f
Merge branch 'master' into HDDS-6280
siddhantsangwan May 23, 2022
f592ebd
update testContainerBalancerPersistsConfigurationInAllSCMs
siddhantsangwan May 24, 2022
61620cc
add timed checks for isBalancerRunning and shouldRun in testContainer…
siddhantsangwan May 24, 2022
4b40d99
introduced ContainerBalancer#tryStopBalancer for better code reuse
siddhantsangwan May 25, 2022
066f284
use stop instead of stopBalancer in StorageContainerManager#stop
siddhantsangwan May 26, 2022
20fe445
Merge branch 'master' into HDDS-6280
siddhantsangwan May 26, 2022
58f4e81
add checks for LOG.isDebugEnabled()
siddhantsangwan Jun 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public ContainerBalancer(StorageContainerManager scm) {

this.lock = new ReentrantLock();
findSourceStrategy = new FindSourceGreedy(nodeManager);
scm.getSCMServiceManager().register(this);
}

/**
Expand Down Expand Up @@ -196,12 +197,8 @@ private void balance() {
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the balancer-running check (currentThread == null) inside stopBalancer, so that the two operations are protected by a single lock? If we first call isBalancerRunning() and then call stopBalancer(), there might be a case where, between the two operations in a single thread, stopBalancer is called from another thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopBalancer calls validateState(true), which checks if balancer is currently running. Is this what you're looking for?

// otherwise, try to stop balancer
try {
stopBalancer();
} catch (IOException | IllegalContainerBalancerStateException e) {
LOG.warn("Tried and failed to stop Container Balancer when it " +
"could not initialize an iteration", e);
}
tryStopBalancer("Could not initialize ContainerBalancer's " +
"iteration number " + i);
return;
}

Expand All @@ -211,30 +208,18 @@ private void balance() {

// persist next iteration index
if (iR == IterationResult.ITERATION_COMPLETED) {
lock.lock();
try {
saveConfiguration(
config.toProtobufBuilder()
.setShouldRun(true)
.setNextIterationIndex(i + 1)
.build());
saveConfiguration(config, true, i + 1);
} catch (IOException e) {
LOG.warn("Could not persist next iteration index value for " +
"ContainerBalancer after completing an iteration", e);
} finally {
lock.unlock();
}
}

// if no new move option is generated, it means the cluster cannot be
// balanced anymore; so just stop balancer
if (iR == IterationResult.CAN_NOT_BALANCE_ANY_MORE) {
try {
stopBalancer();
} catch (IOException | IllegalContainerBalancerStateException e) {
LOG.warn("Tried and failed to stop Container Balancer when result " +
"of the latest iteration was " + iR, e);
}
tryStopBalancer(iR.toString());
return;
}

Expand All @@ -260,13 +245,8 @@ private void balance() {
}

// finally, stop balancer if it hasn't been stopped already
try {
if (isBalancerRunning()) {
stopBalancer();
}
} catch (IOException | IllegalContainerBalancerStateException e) {
LOG.warn("Failed to stop Container Balancer after it completed all " +
"iterations", e);
if (isBalancerRunning()) {
tryStopBalancer("Completed all iterations.");
}
}

Expand All @@ -279,7 +259,7 @@ private void balance() {
*/
private boolean initializeIteration() {
if (scmContext.isInSafeMode()) {
LOG.warn("Container Balancer cannot operate while SCM is in Safe Mode.");
LOG.error("Container Balancer cannot operate while SCM is in Safe Mode.");
return false;
}
if (!scmContext.isLeader()) {
Expand Down Expand Up @@ -861,11 +841,13 @@ public void notifyStatusChanged() {
try {
if (!scmContext.isLeader() || scmContext.isInSafeMode()) {
if (isBalancerRunning()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check and set the ServiceStatus here? For example:

      if (scmContext.isLeaderReady() && !scmContext.isInSafeMode()) {
        if (serviceStatus != ServiceStatus.RUNNING) {
          LOG.info("Service {} transitions to RUNNING.", getServiceName());
          serviceStatus = ServiceStatus.RUNNING;
          lastTimeToBeReadyInMillis = clock.millis();
        }
      } else {
        if (serviceStatus != ServiceStatus.PAUSING) {
          LOG.info("Service {} transitions to PAUSING.", getServiceName());
          serviceStatus = ServiceStatus.PAUSING;
        }
      }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to my previous reply, I think holding state in ServiceStatus along with persisting it in RocksDB would make the logic a bit complex. We'd then have three ways of checking state - ServiceStatus, RocksDB, and checking the current thread for null. What do you think @JacksonYao287 ?

LOG.info("Stopping ContainerBalancer in this scm on status change");
stop();
}
} else {
if (shouldRun()) {
try {
LOG.info("Starting ContainerBalancer in this scm on status change");
start();
} catch (IllegalContainerBalancerStateException |
InvalidContainerBalancerConfigurationException e) {
Expand Down Expand Up @@ -1071,16 +1053,33 @@ public void stopBalancer()
try {
// should be leader, out of safe mode, and currently running
validateState(true);
saveConfiguration(config.toProtobufBuilder()
.setShouldRun(false)
.setNextIterationIndex(0)
.build());
saveConfiguration(config, false, 0);
stop();
} finally {
lock.unlock();
}
}

/**
 * Best-effort attempt to stop ContainerBalancer. The reason is logged and
 * then {@link ContainerBalancer#stopBalancer()} is invoked. If that call
 * fails, the failure is logged as a warning together with the reason and is
 * not propagated to the caller.
 *
 * @param stopReason a string specifying the reason for stopping
 *                   ContainerBalancer; it is only used in log messages
 */
private void tryStopBalancer(String stopReason) {
  lock.lock();
  try {
    LOG.info("Stopping ContainerBalancer. Reason for stopping: {}",
        stopReason);
    stopBalancer();
  } catch (IOException | IllegalContainerBalancerStateException cause) {
    // swallow on purpose: callers use this as a non-throwing stop
    LOG.warn("Tried to stop ContainerBalancer but failed. Reason for " +
        "stopping: {}", stopReason, cause);
  } finally {
    lock.unlock();
  }
}

private void stopBalancingThread() {
Thread balancingThread;
lock.lock();
Expand All @@ -1103,6 +1102,20 @@ private void stopBalancingThread() {
LOG.info("Container Balancer stopped successfully.");
}

/**
 * Persists the specified configuration together with the given run flag and
 * iteration index. While holding {@code lock}, this converts the
 * configuration to its protobuf builder, sets {@code shouldRun} and
 * {@code nextIterationIndex} on it, and hands the built message to the
 * single-argument {@code saveConfiguration} overload.
 *
 * @param configuration the ContainerBalancerConfiguration to persist
 * @param shouldRun value to store in the message's shouldRun field
 * @param index value to store in the message's nextIterationIndex field
 * @throws IOException presumably raised by the single-argument overload when
 * persisting fails (that overload is not visible here; confirm)
 */
private void saveConfiguration(ContainerBalancerConfiguration configuration,
boolean shouldRun, int index)
throws IOException {
lock.lock();
try {
saveConfiguration(configuration.toProtobufBuilder()
.setShouldRun(shouldRun)
.setNextIterationIndex(index)
.build());
} finally {
lock.unlock();
}
}

private void validateConfiguration(ContainerBalancerConfiguration conf)
throws InvalidContainerBalancerConfigurationException {
// maxSizeEnteringTarget and maxSizeLeavingSource should by default be
Expand Down Expand Up @@ -1158,27 +1171,24 @@ public void setOzoneConfiguration(
}

/**
* Persists the configuration that ContainerBalancer will use after validating
* state and the specified configuration.
* @param configuration ContainerBalancerConfiguration to persist
*
* Persists the configuration that ContainerBalancer will use after
* validating state and the specified configuration.
* @param configuration ContainerBalancerConfiguration to persist
* @throws InvalidContainerBalancerConfigurationException on failure to
* validate the specified configuration
* @throws IllegalContainerBalancerStateException if this SCM is not leader
* or not out of safe mode or if ContainerBalancer is currently running in
* this SCM
* @throws IOException on failure to persist configuration
*/
private void setBalancerConfigOnStartBalancer(
ContainerBalancerConfiguration configuration)
throws InvalidContainerBalancerConfigurationException,
IllegalContainerBalancerStateException, IOException {
validateState(false);
validateConfiguration(configuration);
lock.lock();
try {
saveConfiguration(configuration.toProtobufBuilder()
.setShouldRun(true)
.setNextIterationIndex(0)
.build());
this.config = configuration;
} finally {
lock.unlock();
}
saveConfiguration(configuration, true, 0);
this.config = configuration;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ public final class ContainerBalancerConfiguration {
"data node is very high")
private boolean triggerDuEnable = false;

private int nextIterationIndex = 0;

/**
* Gets the threshold value for Container Balancer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.utils.db.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Proxy;
Expand All @@ -34,6 +36,9 @@
public final class StatefulServiceStateManagerImpl
implements StatefulServiceStateManager {

public static final Logger LOG =
LoggerFactory.getLogger(StatefulServiceStateManagerImpl.class);

// this table maps the service name to the configuration (ByteString)
private Table<String, ByteString> statefulServiceConfig;
private final DBTransactionBuffer transactionBuffer;
Expand All @@ -52,10 +57,13 @@ private StatefulServiceStateManagerImpl(
public void saveConfiguration(String serviceName, ByteString bytes)
    throws IOException {
  // stage the write in the transaction buffer under the service's name
  transactionBuffer.addToBuffer(statefulServiceConfig, serviceName, bytes);
  LOG.debug("Added specified bytes to the transaction buffer for key {} to " +
      "table {}", serviceName, statefulServiceConfig.getName());

  // only the SCMHADBTransactionBuffer is flushed explicitly here
  if (!(transactionBuffer instanceof SCMHADBTransactionBuffer)) {
    return;
  }
  ((SCMHADBTransactionBuffer) transactionBuffer).flush();
  LOG.debug("Transaction buffer flushed");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.ha.io;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

/**
* A dummy codec that serializes a ByteString object to ByteString.
*/
public class ByteStringCodec implements Codec {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have a ByteStringCodec class. Is it possible to reuse it?


/**
 * Returns the given object as a {@link ByteString} without transforming it.
 *
 * @param object the value to serialize; the cast fails with a
 *               ClassCastException if it is not a ByteString
 * @return the same instance, typed as ByteString
 */
@Override
public ByteString serialize(Object object)
    throws InvalidProtocolBufferException {
  final ByteString bytes = (ByteString) object;
  return bytes;
}

/**
 * Returns the serialized value unchanged; no decoding is performed.
 *
 * @param type the requested target type; not consulted by this codec
 * @param value the ByteString to hand back
 * @return {@code value} itself
 */
@Override
public Object deserialize(Class<?> type, ByteString value)
throws InvalidProtocolBufferException {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.ha.io;

import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolMessageEnum;
Expand Down Expand Up @@ -45,6 +46,7 @@ public final class CodecFactory {
codecs.put(Boolean.class, new BooleanCodec());
codecs.put(BigInteger.class, new BigIntegerCodec());
codecs.put(X509Certificate.class, new X509CertificateCodec());
codecs.put(ByteString.class, new ByteStringCodec());
}

private CodecFactory() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,8 @@ public void stop() {
try {
if (containerBalancer.isBalancerRunning()) {
LOG.info("Stopping Container Balancer service.");
containerBalancer.stopBalancer();
// stop ContainerBalancer thread in this scm
containerBalancer.stop();
} else {
LOG.info("Container Balancer is not running.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManager;
import org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
Expand Down Expand Up @@ -111,6 +113,7 @@ public void setup() throws IOException, NodeNotFoundException {
containerManager = Mockito.mock(ContainerManager.class);
replicationManager = Mockito.mock(ReplicationManager.class);
serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
SCMServiceManager scmServiceManager = Mockito.mock(SCMServiceManager.class);

// these configs will usually be specified in each test
balancerConfiguration =
Expand Down Expand Up @@ -164,6 +167,7 @@ public void setup() throws IOException, NodeNotFoundException {
when(scm.getEventQueue()).thenReturn(mock(EventPublisher.class));
when(scm.getConfiguration()).thenReturn(conf);
when(scm.getStatefulServiceStateManager()).thenReturn(serviceStateManager);
when(scm.getSCMServiceManager()).thenReturn(scmServiceManager);

/*
When StatefulServiceStateManager#saveConfiguration is called, save to
Expand All @@ -184,6 +188,8 @@ public void setup() throws IOException, NodeNotFoundException {
when(serviceStateManager.readConfiguration(Mockito.anyString())).thenAnswer(
i -> serviceToConfigMap.get(i.getArgument(0, String.class)));

Mockito.doNothing().when(scmServiceManager)
.register(Mockito.any(SCMService.class));
containerBalancer = new ContainerBalancer(scm);
}

Expand Down
Loading