Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8f3a844
HDDS-12547. Container creation and import use the same VolumeChoosing…
peterxcli Mar 15, 2025
8d1e90a
Don't handle exception in VolumeChoosingPolicyFactory#getSharedPolicy
peterxcli Mar 15, 2025
ecb4c38
Move to VolumeChoosingPolicy DatanodeStateMachine and pass it to KeyV…
peterxcli Mar 18, 2025
fa00382
Adjust others to fit new interface of the volumeChoosingPolicy place …
peterxcli Mar 18, 2025
59e06cb
Merge branch 'master' of https://github.com/apache/ozone into hdds125…
peterxcli Mar 18, 2025
71aacee
Convert reflection-related exceptions in getPolicy instead of its cal…
peterxcli Mar 18, 2025
e1e13e3
Let KeyValueHandler create the policy if they get null instance (for …
peterxcli Mar 18, 2025
924eb8a
Add comment for the change in CapacityVolumeChoosingPolicy
peterxcli Mar 18, 2025
31b935e
Make VolumeChoosingPolicy#chooseVolume method synchronized
peterxcli Mar 19, 2025
5ead5eb
Revert threadLocalRandom
peterxcli Mar 20, 2025
b1bc0d6
revert unnecessary change
peterxcli Mar 21, 2025
88028ca
Move commit volume space to VolumeChoosingPolicy to make it atomic
peterxcli Mar 21, 2025
90740b7
Set committedSpace for containerData manually after choosing volume
peterxcli Mar 21, 2025
f86e8fc
Add test coverage for VolumeChoosingPolicy(s)
peterxcli Mar 21, 2025
46842c1
Merge remote-tracking branch 'upstream/master' into hdds12547-contain…
peterxcli Mar 31, 2025
f2d22b8
Revert "Add test coverage for VolumeChoosingPolicy(s)"
peterxcli Apr 10, 2025
d05c13f
Revert "Set committedSpace for containerData manually after choosing …
peterxcli Apr 10, 2025
c04ba06
Revert "Move commit volume space to VolumeChoosingPolicy to make it a…
peterxcli Apr 10, 2025
afdd3ff
Remove synchronized from VolumeChoosingPolicy#chooseVolume at this time
peterxcli Apr 11, 2025
c6659a0
keep patch cleaner
peterxcli Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Clock;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Expand Down Expand Up @@ -67,16 +68,18 @@ protected Handler(ConfigurationSource config, String datanodeId,
this.icrSender = icrSender;
}

@SuppressWarnings("checkstyle:ParameterNumber")
public static Handler getHandlerForContainerType(
final ContainerType containerType, final ConfigurationSource config,
final String datanodeId, final ContainerSet contSet,
final VolumeSet volumeSet, final ContainerMetrics metrics,
final VolumeSet volumeSet, final VolumeChoosingPolicy volumeChoosingPolicy,
final ContainerMetrics metrics,
IncrementalReportSender<Container> icrSender) {
switch (containerType) {
case KeyValueContainer:
return new KeyValueHandler(config,
datanodeId, contSet, volumeSet, metrics,
icrSender);
datanodeId, contSet, volumeSet, volumeChoosingPolicy, metrics,
icrSender, Clock.systemUTC());
default:
throw new IllegalArgumentException("Handler for ContainerType: " +
containerType + "doesn't exist.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ClosePipelineCommandHandler;
Expand All @@ -61,6 +62,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
Expand Down Expand Up @@ -98,6 +100,7 @@ public class DatanodeStateMachine implements Closeable {
private final SCMConnectionManager connectionManager;
private final ECReconstructionCoordinator ecReconstructionCoordinator;
private StateContext context;
private VolumeChoosingPolicy volumeChoosingPolicy;
private final OzoneContainer container;
private final DatanodeDetails datanodeDetails;
private final CommandDispatcher commandDispatcher;
Expand Down Expand Up @@ -174,13 +177,14 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this,
threadNamePrefix);
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
// OzoneContainer instance is used in a non-thread safe way by the context
// passed to its constructor, so we must synchronize its access. See
// HDDS-3116 for more details.
constructionLock.writeLock().lock();
try {
container = new OzoneContainer(hddsDatanodeService, this.datanodeDetails,
conf, context, certClient, secretKeyClient);
conf, context, certClient, secretKeyClient, volumeChoosingPolicy);
} finally {
constructionLock.writeLock().unlock();
}
Expand All @@ -189,7 +193,8 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
ContainerImporter importer = new ContainerImporter(conf,
container.getContainerSet(),
container.getController(),
container.getVolumeSet());
container.getVolumeSet(),
volumeChoosingPolicy);
ContainerReplicator pullReplicator = new DownloadAndImportReplicator(
conf, container.getContainerSet(),
importer,
Expand Down Expand Up @@ -745,4 +750,8 @@ public DatanodeQueueMetrics getQueueMetrics() {
public ReconfigurationHandler getReconfigurationHandler() {
return reconfigurationHandler;
}

/**
 * Returns the {@link VolumeChoosingPolicy} instance created by this state
 * machine's constructor and shared with {@code OzoneContainer} and
 * {@code ContainerImporter}, so container creation and import use the same
 * policy object.
 */
public VolumeChoosingPolicy getVolumeChoosingPolicy() {
  return volumeChoosingPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.ratis.util.ReflectionUtils;

/**
* A factory to create volume choosing policy instance based on configuration
Expand All @@ -35,10 +36,10 @@ public final class VolumeChoosingPolicyFactory {
private VolumeChoosingPolicyFactory() {
}

public static VolumeChoosingPolicy getPolicy(ConfigurationSource conf)
throws InstantiationException, IllegalAccessException {
return conf.getClass(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
DEFAULT_VOLUME_CHOOSING_POLICY, VolumeChoosingPolicy.class)
.newInstance();
/**
 * Builds a new {@link VolumeChoosingPolicy} instance of the implementation
 * class configured under {@code HDDS_DATANODE_VOLUME_CHOOSING_POLICY},
 * falling back to {@code DEFAULT_VOLUME_CHOOSING_POLICY} when the key is
 * not set.
 *
 * @param conf configuration used to resolve the policy class
 * @return a freshly constructed policy instance
 */
public static VolumeChoosingPolicy getPolicy(ConfigurationSource conf) {
  return ReflectionUtils.newInstance(
      conf.getClass(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
          DEFAULT_VOLUME_CHOOSING_POLICY, VolumeChoosingPolicy.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,15 @@ public KeyValueHandler(ConfigurationSource config,
VolumeSet volSet,
ContainerMetrics metrics,
IncrementalReportSender<Container> icrSender) {
this(config, datanodeId, contSet, volSet, metrics, icrSender, Clock.systemUTC());
this(config, datanodeId, contSet, volSet, null, metrics, icrSender, Clock.systemUTC());
}

@SuppressWarnings("checkstyle:ParameterNumber")
public KeyValueHandler(ConfigurationSource config,
String datanodeId,
ContainerSet contSet,
VolumeSet volSet,
VolumeChoosingPolicy volumeChoosingPolicy,
ContainerMetrics metrics,
IncrementalReportSender<Container> icrSender,
Clock clock) {
Expand All @@ -174,11 +176,8 @@ public KeyValueHandler(ConfigurationSource config,
DatanodeConfiguration.class).isChunkDataValidationCheck();
chunkManager = ChunkManagerFactory.createChunkManager(config, blockManager,
volSet);
try {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
this.volumeChoosingPolicy = volumeChoosingPolicy != null ? volumeChoosingPolicy
: VolumeChoosingPolicyFactory.getPolicy(config);

maxContainerSize = (long) config.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
Expand Down Expand Up @@ -216,11 +215,6 @@ public KeyValueHandler(ConfigurationSource config,
}
}

@VisibleForTesting
public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() {
return volumeChoosingPolicy;
}

@Override
public StateMachine.DataChannel getStreamDataChannel(
Container container, ContainerCommandRequestProto msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
Expand Down Expand Up @@ -152,7 +153,8 @@ enum InitializingStatus {
public OzoneContainer(HddsDatanodeService hddsDatanodeService,
DatanodeDetails datanodeDetails, ConfigurationSource conf,
StateContext context, CertificateClient certClient,
SecretKeyVerifierClient secretKeyClient) throws IOException {
SecretKeyVerifierClient secretKeyClient,
VolumeChoosingPolicy volumeChoosingPolicy) throws IOException {
config = conf;
this.datanodeDetails = datanodeDetails;
this.context = context;
Expand Down Expand Up @@ -214,7 +216,7 @@ public OzoneContainer(HddsDatanodeService hddsDatanodeService,
Handler.getHandlerForContainerType(
containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics, icrSender));
containerSet, volumeSet, volumeChoosingPolicy, metrics, icrSender));
}

SecurityConfig secConf = new SecurityConfig(conf);
Expand All @@ -239,7 +241,7 @@ public OzoneContainer(HddsDatanodeService hddsDatanodeService,
secConf,
certClient,
new ContainerImporter(conf, containerSet, controller,
volumeSet),
volumeSet, volumeChoosingPolicy),
datanodeDetails.threadNamePrefix());

readChannel = new XceiverServerGrpc(
Expand Down Expand Up @@ -299,8 +301,9 @@ public OzoneContainer(HddsDatanodeService hddsDatanodeService,
@VisibleForTesting
public OzoneContainer(
DatanodeDetails datanodeDetails, ConfigurationSource conf,
StateContext context) throws IOException {
this(null, datanodeDetails, conf, context, null, null);
StateContext context, VolumeChoosingPolicy volumeChoosingPolicy)
throws IOException {
this(null, datanodeDetails, conf, context, null, null, volumeChoosingPolicy);
}

public GrpcTlsConfig getTlsClientConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
Expand Down Expand Up @@ -71,15 +70,12 @@ public class ContainerImporter {
public ContainerImporter(@Nonnull ConfigurationSource conf,
@Nonnull ContainerSet containerSet,
@Nonnull ContainerController controller,
@Nonnull MutableVolumeSet volumeSet) {
@Nonnull MutableVolumeSet volumeSet,
@Nonnull VolumeChoosingPolicy volumeChoosingPolicy) {
this.containerSet = containerSet;
this.controller = controller;
this.volumeSet = volumeSet;
try {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
this.volumeChoosingPolicy = volumeChoosingPolicy;
containerSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
Expand Down Expand Up @@ -132,7 +133,8 @@ public static OzoneContainer getOzoneContainer(
DatanodeDetails datanodeDetails, OzoneConfiguration conf)
throws IOException {
StateContext context = getMockContext(datanodeDetails, conf);
return new OzoneContainer(datanodeDetails, conf, context);
VolumeChoosingPolicy volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
return new OzoneContainer(datanodeDetails, conf, context, volumeChoosingPolicy);
}

public static StateContext getMockContext(DatanodeDetails datanodeDetails,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
import org.apache.hadoop.ozone.container.common.volume.CapacityVolumeChoosingPolicy;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -204,6 +206,8 @@ public void testDatanodeStateContext() throws IOException,
ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath, conf);
try (DatanodeStateMachine stateMachine =
new DatanodeStateMachine(datanodeDetails, conf)) {
VolumeChoosingPolicy volumeChoosingPolicy = stateMachine.getVolumeChoosingPolicy();
assertEquals(CapacityVolumeChoosingPolicy.class, volumeChoosingPolicy.getClass());
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
Expand All @@ -87,13 +88,15 @@
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
Expand All @@ -111,10 +114,17 @@ public class TestHddsDispatcher {
@TempDir
private File testDir;

private static VolumeChoosingPolicy volumeChoosingPolicy;

public static final IncrementalReportSender<Container> NO_OP_ICR_SENDER =
c -> {
};

@BeforeAll
public static void init() {
  // Create the policy once for the whole test class; each handler created in
  // the tests below receives this same shared instance.
  volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(new OzoneConfiguration());
}

@ContainerLayoutTestInfo.ContainerTest
public void testContainerCloseActionWhenFull(
ContainerLayoutVersion layout) throws IOException {
Expand Down Expand Up @@ -147,7 +157,7 @@ public void testContainerCloseActionWhenFull(
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics, null);
Expand Down Expand Up @@ -284,7 +294,7 @@ public void testContainerCloseActionWhenVolumeFull(
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics, null);
Expand Down Expand Up @@ -533,7 +543,7 @@ static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER));
}

final HddsDispatcher hddsDispatcher = new HddsDispatcher(conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.ozone.container.common.impl.TestHddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -56,6 +57,7 @@ public void setup() throws Exception {
this.conf = new OzoneConfiguration();
this.containerSet = mock(ContainerSet.class);
this.volumeSet = mock(MutableVolumeSet.class);
VolumeChoosingPolicy volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
DatanodeDetails datanodeDetails = mock(DatanodeDetails.class);
StateContext context = ContainerTestUtils.getMockContext(
datanodeDetails, conf);
Expand All @@ -67,7 +69,7 @@ public void setup() throws Exception {
Handler.getHandlerForContainerType(
containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics,
containerSet, volumeSet, volumeChoosingPolicy, metrics,
TestHddsDispatcher.NO_OP_ICR_SENDER));
}
this.dispatcher = new HddsDispatcher(
Expand Down
Loading