DatanodeConfiguration.java
@@ -37,8 +37,6 @@ public class DatanodeConfiguration {
   private static final Logger LOG =
       LoggerFactory.getLogger(DatanodeConfiguration.class);
 
-  static final String REPLICATION_STREAMS_LIMIT_KEY =
-      "hdds.datanode.replication.streams.limit";
   static final String CONTAINER_DELETE_THREADS_MAX_KEY =
       "hdds.datanode.container.delete.threads.max";
   static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
@@ -57,8 +55,6 @@ public class DatanodeConfiguration {
 
   static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
 
-  static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
-
   static final long PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT = 60;
 
   static final int FAILED_VOLUMES_TOLERATED_DEFAULT = -1;
@@ -71,19 +67,6 @@ public class DatanodeConfiguration {
   static final long DISK_CHECK_TIMEOUT_DEFAULT =
       Duration.ofMinutes(10).toMillis();
 
-  /**
-   * The maximum number of replication commands a single datanode can execute
-   * simultaneously.
-   */
-  @Config(key = "replication.streams.limit",
-      type = ConfigType.INT,
-      defaultValue = "10",
-      tags = {DATANODE},
-      description = "The maximum number of replication commands a single " +
-          "datanode can execute simultaneously"
-  )
-  private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
-
   /**
    * Number of threads per volume that Datanode will use for chunk read.
    */
@@ -264,13 +247,6 @@ public void setWaitOnAllFollowers(boolean val) {
 
   @PostConstruct
   public void validate() {
-    if (replicationMaxStreams < 1) {
-      LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " +
-              "and was set to {}. Defaulting to {}",
-          replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
-      replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
-    }
-
     if (containerDeleteThreads < 1) {
       LOG.warn(CONTAINER_DELETE_THREADS_MAX_KEY + " must be greater than zero" +
           " and was set to {}. Defaulting to {}",
@@ -316,18 +292,10 @@ public void validate() {
     }
   }
 
-  public void setReplicationMaxStreams(int replicationMaxStreams) {
-    this.replicationMaxStreams = replicationMaxStreams;
-  }
-
   public void setContainerDeleteThreads(int containerDeleteThreads) {
     this.containerDeleteThreads = containerDeleteThreads;
   }
 
-  public int getReplicationMaxStreams() {
-    return replicationMaxStreams;
-  }
-
   public int getContainerDeleteThreads() {
     return containerDeleteThreads;
   }
DatanodeStateMachine.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
 import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
 import org.apache.hadoop.ozone.container.replication.MeasuredReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
 import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
@@ -166,9 +167,11 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
 
     replicatorMetrics = new MeasuredReplicator(replicator);
 
+    ReplicationConfig replicationConfig =
+        conf.getObject(ReplicationConfig.class);
     supervisor =
         new ReplicationSupervisor(container.getContainerSet(), context,
-            replicatorMetrics, dnConf.getReplicationMaxStreams());
+            replicatorMetrics, replicationConfig);
 
     replicationSupervisorMetrics =
         ReplicationSupervisorMetrics.create(supervisor);
ReplicationServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,8 @@
 
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
-import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.PostConstruct;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
@@ -39,6 +40,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.MANAGEMENT;
+
 /**
  * Separated network server for server2server container replication.
  */
@@ -128,12 +132,33 @@ public int getPort() {
   /**
    * Replication-related configuration.
    */
-  @ConfigGroup(prefix = "hdds.datanode.replication")
+  @ConfigGroup(prefix = ReplicationConfig.PREFIX)
   public static final class ReplicationConfig {
 
-    @Config(key = "port", defaultValue = "9886", description = "Port used for"
-        + " the server2server replication server", tags = {
-        ConfigTag.MANAGEMENT})
+    public static final String PREFIX = "hdds.datanode.replication";
+    public static final String STREAMS_LIMIT_KEY = "streams.limit";
+
+    public static final String REPLICATION_STREAMS_LIMIT_KEY =
+        PREFIX + "." + STREAMS_LIMIT_KEY;
+
+    public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
+
+    /**
+     * The maximum number of replication commands a single datanode can execute
+     * simultaneously.
+     */
+    @Config(key = STREAMS_LIMIT_KEY,
+        type = ConfigType.INT,
+        defaultValue = "10",
+        tags = {DATANODE},
+        description = "The maximum number of replication commands a single " +
+            "datanode can execute simultaneously"
+    )
+    private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
+
+    @Config(key = "port", defaultValue = "9886",
+        description = "Port used for the server2server replication server",
+        tags = {DATANODE, MANAGEMENT})
     private int port;
 
     public int getPort() {
@@ -144,6 +169,25 @@ public ReplicationConfig setPort(int portParam) {
       this.port = portParam;
       return this;
     }
+
+    public int getReplicationMaxStreams() {
+      return replicationMaxStreams;
+    }
+
+    public void setReplicationMaxStreams(int replicationMaxStreams) {
+      this.replicationMaxStreams = replicationMaxStreams;
+    }
+
+    @PostConstruct
+    public void validate() {
+      if (replicationMaxStreams < 1) {
+        LOG.warn(REPLICATION_STREAMS_LIMIT_KEY + " must be greater than zero " +
+                "and was set to {}. Defaulting to {}",
+            replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
+        replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
+      }
+    }
+
   }
 
 }
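For context, a minimal sketch of how the relocated ReplicationConfig is consumed (the demo class name is hypothetical; the calls mirror the conf.getObject(...) wiring in DatanodeStateMachine above and the overridesInvalidValues test added below):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;

public class ReplicationConfigDemo {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Set an out-of-range value, as in the overridesInvalidValues test.
    conf.setInt(ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY, -5);

    // getObject() binds the "hdds.datanode.replication.*" keys onto the
    // @Config-annotated fields, then runs the @PostConstruct validate()
    // hook, which clamps the invalid value back to the default.
    ReplicationConfig replicationConfig =
        conf.getObject(ReplicationConfig.class);

    // Prints 10, i.e. REPLICATION_MAX_STREAMS_DEFAULT.
    System.out.println(replicationConfig.getReplicationMaxStreams());
  }
}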
ReplicationSupervisor.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -71,6 +72,13 @@ public class ReplicationSupervisor {
     this.context = context;
   }
 
+  public ReplicationSupervisor(
+      ContainerSet containerSet, StateContext context,
+      ContainerReplicator replicator, ReplicationConfig replicationConfig) {
+    this(containerSet, context, replicator,
+        replicationConfig.getReplicationMaxStreams());
+  }
+
   public ReplicationSupervisor(
       ContainerSet containerSet, StateContext context,
       ContainerReplicator replicator, int poolSize) {
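A sketch of how a caller might use the new overload (SupervisorWiring and buildSupervisor are hypothetical; the delegation itself is taken from the constructor above, and the class is assumed to sit in org.apache.hadoop.ozone.container.replication so that ReplicationSupervisor and ContainerReplicator resolve without imports):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;

public final class SupervisorWiring {
  static ReplicationSupervisor buildSupervisor(
      OzoneConfiguration conf, ContainerSet containerSet,
      StateContext context, ContainerReplicator replicator) {
    ReplicationConfig replicationConfig =
        conf.getObject(ReplicationConfig.class);
    // Equivalent to calling the int-based constructor with
    // replicationConfig.getReplicationMaxStreams().
    return new ReplicationSupervisor(
        containerSet, context, replicator, replicationConfig);
  }
}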
TestDatanodeConfiguration.java
@@ -28,8 +28,6 @@
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_MIN_GAP_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.DISK_CHECK_TIMEOUT_KEY;
-import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_MAX_STREAMS_DEFAULT;
-import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.REPLICATION_STREAMS_LIMIT_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DATA_VOLUMES_TOLERATED_KEY;
@@ -46,14 +44,12 @@ public class TestDatanodeConfiguration {
   @Test
   public void acceptsValidValues() {
     // GIVEN
-    int validReplicationLimit = 123;
     int validDeleteThreads = 42;
     long validDiskCheckIntervalMinutes = 60;
     int validFailedVolumesTolerated = 10;
     long validDiskCheckMinGap = 2;
     long validDiskCheckTimeout = 1;
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, validDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         validDiskCheckIntervalMinutes);
@@ -70,7 +66,6 @@ public void acceptsValidValues() {
     DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
 
     // THEN
-    assertEquals(validReplicationLimit, subject.getReplicationMaxStreams());
     assertEquals(validDeleteThreads, subject.getContainerDeleteThreads());
     assertEquals(validDiskCheckIntervalMinutes,
         subject.getPeriodicDiskCheckIntervalMinutes());
@@ -87,14 +82,12 @@ public void acceptsValidValues() {
   @Test
   public void overridesInvalidValues() {
     // GIVEN
-    int invalidReplicationLimit = -5;
     int invalidDeleteThreads = 0;
     long invalidDiskCheckIntervalMinutes = -1;
     int invalidFailedVolumesTolerated = -2;
     long invalidDiskCheckMinGap = -1;
     long invalidDiskCheckTimeout = -1;
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
     conf.setInt(CONTAINER_DELETE_THREADS_MAX_KEY, invalidDeleteThreads);
     conf.setLong(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY,
         invalidDiskCheckIntervalMinutes);
@@ -111,8 +104,6 @@ public void overridesInvalidValues() {
     DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
 
     // THEN
-    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
-        subject.getReplicationMaxStreams());
     assertEquals(CONTAINER_DELETE_THREADS_DEFAULT,
         subject.getContainerDeleteThreads());
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
@@ -136,8 +127,6 @@ public void isCreatedWitDefaultValues() {
     DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class);
 
     // THEN
-    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
-        subject.getReplicationMaxStreams());
     assertEquals(CONTAINER_DELETE_THREADS_DEFAULT,
         subject.getContainerDeleteThreads());
     assertEquals(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT,
TestReplicationConfig.java (new file)
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
+import org.junit.Test;
+
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_MAX_STREAMS_DEFAULT;
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ReplicationConfig}.
+ */
+public class TestReplicationConfig {
+
+  @Test
+  public void acceptsValidValues() {
+    // GIVEN
+    int validReplicationLimit = 123;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
+
+    // WHEN
+    ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+    // THEN
+    assertEquals(validReplicationLimit, subject.getReplicationMaxStreams());
+  }
+
+  @Test
+  public void overridesInvalidValues() {
+    // GIVEN
+    int invalidReplicationLimit = -5;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
+
+    // WHEN
+    ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+    // THEN
+    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
+        subject.getReplicationMaxStreams());
+  }
+
+  @Test
+  public void isCreatedWitDefaultValues() {
+    // GIVEN
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    // WHEN
+    ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
+
+    // THEN
+    assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
+        subject.getReplicationMaxStreams());
+  }
+
+}