Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
private long totalTime;

/**
* Constructs a close container command handler.
*/
public CloseContainerCommandHandler(
int threadPoolSize, int queueSize, String threadNamePrefix) {
Expand Down Expand Up @@ -220,4 +220,14 @@ public long getTotalRunTime() {
/**
 * Reports the current value of {@code queuedCount}.
 *
 * @return the value read from the {@code queuedCount} counter
 */
public int getQueuedCount() {
  final int pending = queuedCount.get();
  return pending;
}

/**
 * Reports the maximum pool size of this handler's executor.
 *
 * @return the executor's maximum pool size; the executor is cast to
 *     {@link ThreadPoolExecutor} to read it
 */
@Override
public int getThreadPoolMaxPoolSize() {
  final ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
  return pool.getMaximumPoolSize();
}

/**
 * Reports the active thread count of this handler's executor.
 *
 * @return the value of {@link ThreadPoolExecutor#getActiveCount()} for the
 *     executor, which is cast to {@link ThreadPoolExecutor} to read it
 */
@Override
public int getThreadPoolActivePoolSize() {
  final ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
  return pool.getActiveCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public final class CommandDispatcher {
private CommandDispatcher(OzoneContainer container, SCMConnectionManager
connectionManager, StateContext context,
CommandHandler... handlers) {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(handlers);
Preconditions.checkArgument(handlers.length > 0);
Preconditions.checkNotNull(container);
Preconditions.checkNotNull(connectionManager);
this.context = context;
this.container = container;
this.connectionManager = connectionManager;
Expand All @@ -77,6 +72,7 @@ private CommandDispatcher(OzoneContainer container, SCMConnectionManager
commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap);
}

/**
 * Looks up the handler stored in {@code handlerMap} under
 * {@code Type.closeContainerCommand}.
 *
 * @return whatever {@code handlerMap.get} yields for that command type
 */
@VisibleForTesting
public CommandHandler getCloseContainerHandler() {
  final CommandHandler closeHandler =
      handlerMap.get(Type.closeContainerCommand);
  return closeHandler;
}
Expand Down Expand Up @@ -201,11 +197,12 @@ public Builder setContext(StateContext stateContext) {
* @return Command Dispatcher.
*/
public CommandDispatcher build() {
Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
" manager.");
Preconditions.checkNotNull(this.container, "Missing container.");
Preconditions.checkNotNull(this.context, "Missing context.");
Preconditions.checkArgument(this.handlerList.size() > 0);
Preconditions.checkNotNull(this.connectionManager,
"Missing scm connection manager.");
Preconditions.checkNotNull(this.container, "Missing ozone container.");
Preconditions.checkNotNull(this.context, "Missing state context.");
Preconditions.checkArgument(this.handlerList.size() > 0,
"The number of command handlers must be greater than 0.");
return new CommandDispatcher(this.container, this.connectionManager,
this.context, handlerList.toArray(
new CommandHandler[handlerList.size()]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ public int getQueuedCount() {

/**
 * Reports the maximum pool size of this handler's executor.
 *
 * @return the executor's maximum pool size
 */
@Override
public int getThreadPoolMaxPoolSize() {
  // Exactly one return: a second return statement after this one was
  // unreachable code. The cast form is kept because it is valid whether the
  // field is declared as ThreadPoolExecutor or as a wider executor type.
  return ((ThreadPoolExecutor) executor).getMaximumPoolSize();
}

/**
 * Reports the active thread count of this handler's executor.
 *
 * @return the value of {@link ThreadPoolExecutor#getActiveCount()} for the
 *     executor
 */
@Override
public int getThreadPoolActivePoolSize() {
  // Exactly one return: a second return statement after this one was
  // unreachable code. The cast form is kept because it is valid whether the
  // field is declared as ThreadPoolExecutor or as a wider executor type.
  return ((ThreadPoolExecutor) executor).getActiveCount();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ public long getTotalRunTime() {
return totalTime.get();
}

/**
 * Reports the maximum pool size of this handler's executor.
 *
 * @return the executor's maximum pool size, read through a
 *     {@link ThreadPoolExecutor} view of the executor
 */
@Override
public int getThreadPoolMaxPoolSize() {
  final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executor;
  final int maxPoolSize = threadPool.getMaximumPoolSize();
  return maxPoolSize;
}

/**
 * Reports the active thread count of this handler's executor.
 *
 * @return the value of {@link ThreadPoolExecutor#getActiveCount()}, read
 *     through a {@link ThreadPoolExecutor} view of the executor
 */
@Override
public int getThreadPoolActivePoolSize() {
  final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executor;
  final int activeThreads = threadPool.getActiveCount();
  return activeThreads;
}

@Override
public void stop() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,22 @@ public Type getCommandType() {

/**
 * Reports how many {@code ECReconstructionCoordinatorTask} requests the
 * supervisor has counted.
 *
 * @return the supervisor's request count for that task class, narrowed from
 *     {@code long} to {@code int} by a cast
 */
@Override
public int getInvocationCount() {
  // A leading "return 0;" stub used to make this statement unreachable, so
  // the metric (and the average derived from it) could never be non-zero.
  return (int) supervisor.getReplicationRequestCount(
      ECReconstructionCoordinatorTask.class);
}

/**
 * Computes the mean run time per invocation.
 *
 * @return {@code getTotalRunTime()} divided by {@code getInvocationCount()}
 *     (integer division), or 0 when the invocation count is not positive
 */
@Override
public long getAverageRunTime() {
  final int invocations = getInvocationCount();
  // The total is only read when there is at least one invocation.
  return invocations > 0 ? getTotalRunTime() / invocations : 0;
}

/**
 * Reports the total time the supervisor has accumulated for
 * {@code ECReconstructionCoordinatorTask}.
 *
 * @return the supervisor's total time for that task class
 */
@Override
public long getTotalRunTime() {
  // A leading "return 0;" stub used to make this statement unreachable.
  return supervisor.getTotalTime(ECReconstructionCoordinatorTask.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
static final Logger LOG =
LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);

private int invocationCount;

private long totalTime;

private ConfigurationSource conf;

private ReplicationSupervisor supervisor;

private ContainerReplicator downloadReplicator;
Expand All @@ -60,7 +54,6 @@ public ReplicateContainerCommandHandler(
ReplicationSupervisor supervisor,
ContainerReplicator downloadReplicator,
ContainerReplicator pushReplicator) {
this.conf = conf;
this.supervisor = supervisor;
this.downloadReplicator = downloadReplicator;
this.pushReplicator = pushReplicator;
Expand Down Expand Up @@ -101,19 +94,20 @@ public SCMCommandProto.Type getCommandType() {

/**
 * Reports how many {@code ReplicationTask} requests the supervisor has
 * counted.
 *
 * @return the supervisor's request count for that task class, narrowed from
 *     {@code long} to {@code int} by a cast
 */
@Override
public int getInvocationCount() {
  // Single source of truth: the supervisor's per-class counter. An earlier
  // return of a local field made this statement unreachable.
  return (int) supervisor.getReplicationRequestCount(ReplicationTask.class);
}

/**
 * Computes the mean run time per invocation.
 *
 * @return {@code getTotalRunTime()} divided by {@code getInvocationCount()}
 *     (integer division), or 0 when the invocation count is not positive
 */
@Override
public long getAverageRunTime() {
  int invocationCount = getInvocationCount();
  if (invocationCount > 0) {
    // Use the accessor so the numerator comes from the same source as
    // getTotalRunTime(); a second return after this one was unreachable.
    return getTotalRunTime() / invocationCount;
  }
  return 0;
}

/**
 * Reports the total time the supervisor has accumulated for
 * {@code ReplicationTask}.
 *
 * @return the supervisor's total time for that task class
 */
@Override
public long getTotalRunTime() {
  // An earlier return of a local field made this statement unreachable.
  return supervisor.getTotalTime(ReplicationTask.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -71,11 +72,15 @@ public final class ReplicationSupervisor {
private final StateContext context;
private final Clock clock;

// Number of started task runs, keyed by the task's concrete class.
// (Declared once: a second declaration of the same name does not compile.)
private final Map<Class<?>, AtomicLong> requestCounter =
    new ConcurrentHashMap<>();
private final AtomicLong successCounter = new AtomicLong();
private final AtomicLong failureCounter = new AtomicLong();
private final AtomicLong timeoutCounter = new AtomicLong();
private final AtomicLong skippedCounter = new AtomicLong();
// Accumulated run time, keyed by the task's concrete class.
private final Map<Class<?>, AtomicLong> totalTimeCounter =
    new ConcurrentHashMap<>();
// Monitor used when lazily creating per-class counters; final so the
// object being synchronized on can never be swapped out.
private final Object lock = new Object();

/**
* A set of container IDs that are currently being downloaded
Expand Down Expand Up @@ -221,6 +226,22 @@ public void addTask(AbstractReplicationTask task) {
return;
}

if (totalTimeCounter.get(task.getClass()) == null) {
synchronized (lock) {
if (totalTimeCounter.get(task.getClass()) == null) {
totalTimeCounter.put(task.getClass(), new AtomicLong());
}
}
}

if (requestCounter.get(task.getClass()) == null) {
synchronized (lock) {
if (requestCounter.get(task.getClass()) == null) {
requestCounter.put(task.getClass(), new AtomicLong());
}
}
}

if (inFlight.add(task)) {
if (task.getPriority() != ReplicationCommandPriority.LOW) {
// Low priority tasks are not included in the replication queue sizes
Expand Down Expand Up @@ -329,8 +350,9 @@ public TaskRunner(AbstractReplicationTask task) {

@Override
public void run() {
long startTime = Time.monotonicNow();
try {
requestCounter.incrementAndGet();
requestCounter.get(task.getClass()).incrementAndGet();

final long now = clock.millis();
final long deadline = task.getDeadline();
Expand Down Expand Up @@ -377,8 +399,10 @@ public void run() {
LOG.warn("Failed {}", this, e);
failureCounter.incrementAndGet();
} finally {
long endTime = Time.monotonicNow();
inFlight.remove(task);
decrementTaskCounter(task);
totalTimeCounter.get(task.getClass()).addAndGet(endTime - startTime);
}
}

Expand Down Expand Up @@ -419,7 +443,17 @@ public boolean equals(Object o) {
}

/**
 * Reports the request count summed over every task class that has a
 * counter in {@code requestCounter}.
 *
 * @return the sum of all per-class request counters; 0 when there are none
 */
public long getReplicationRequestCount() {
  // Plain LongStream sum instead of an AtomicLong used as a mutable
  // accumulator inside a lambda; a stale early return used to make the
  // summation unreachable.
  return requestCounter.values().stream()
      .mapToLong(AtomicLong::get)
      .sum();
}

/**
 * Reports the request count recorded for one task class.
 *
 * @param taskClass the task class whose counter is looked up
 * @return the counter's value, or 0 when {@code requestCounter} has no entry
 *     for {@code taskClass}
 */
public long getReplicationRequestCount(
    Class<? extends AbstractReplicationTask> taskClass) {
  final AtomicLong perClassCount = requestCounter.get(taskClass);
  if (perClassCount == null) {
    return 0;
  }
  return perClassCount.get();
}

public long getQueueSize() {
Expand All @@ -442,6 +476,11 @@ public long getReplicationSuccessCount() {
return successCounter.get();
}

/**
 * Reports the accumulated time recorded for one task class.
 *
 * @param taskClass the task class whose counter is looked up
 * @return the counter's value, or 0 when {@code totalTimeCounter} has no
 *     entry for {@code taskClass}
 */
public long getTotalTime(Class<? extends AbstractReplicationTask> taskClass) {
  final AtomicLong elapsed = totalTimeCounter.get(taskClass);
  if (elapsed == null) {
    return 0;
  }
  return elapsed.get();
}

/**
 * Reports the current value of {@code failureCounter}.
 *
 * @return the value read from the failure counter
 */
public long getReplicationFailureCount() {
  final long failures = failureCounter.get();
  return failures;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.UUID;

import static java.util.Collections.singletonMap;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -292,4 +294,11 @@ private void waitTillFinishExecution(
GenericTestUtils.waitFor(()
-> closeHandler.getQueuedCount() <= 0, 10, 3000);
}

/**
 * Checks the thread-pool gauges reported by {@code subject}: a maximum pool
 * size of 1 and an active count of 0.
 */
@Test
public void testThreadPoolPoolSize() {
  final int maxPoolSize = subject.getThreadPoolMaxPoolSize();
  assertEquals(1, maxPoolSize);

  final int activeThreads = subject.getThreadPoolActivePoolSize();
  assertEquals(0, activeThreads);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -168,6 +169,33 @@ public void normal(ContainerLayoutVersion layout) {
}
}

/**
 * Adds one replication task and two EC reconstruction tasks, then checks
 * the supervisor's per-task-class request counts and total time.
 *
 * @param layout container layout version the test runs with
 */
@ContainerLayoutTestInfo.ContainerTest
public void normal2(ContainerLayoutVersion layout) {
  this.layoutVersion = layout;
  // GIVEN
  ReplicationSupervisor supervisor =
      supervisorWithReplicator(FakeReplicator::new);
  try {
    // WHEN
    supervisor.addTask(createTask(1L));
    supervisor.addTask(createECTask(2L));
    supervisor.addTask(createECTask(3L));

    // Wait at most 10s, but stop as soon as every asserted condition holds,
    // instead of always sleeping for the full 10s.
    final long deadlineNanos = System.nanoTime() + 10_000_000_000L;
    while (System.nanoTime() - deadlineNanos < 0
        && !(supervisor.getTotalTime(ReplicationTask.class) > 0
            && supervisor.getReplicationRequestCount(
                ReplicationTask.class) == 1
            && supervisor.getReplicationRequestCount(
                ECReconstructionCoordinatorTask.class) == 2)) {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller and stop waiting; the
        // assertions below then report the state reached so far.
        Thread.currentThread().interrupt();
        break;
      }
    }

    // THEN
    assertTrue(supervisor.getTotalTime(ReplicationTask.class) > 0);
    assertEquals(1,
        supervisor.getReplicationRequestCount(ReplicationTask.class));
    assertEquals(2,
        supervisor.getReplicationRequestCount(
            ECReconstructionCoordinatorTask.class));
  } finally {
    supervisor.stop();
  }
}

@ContainerLayoutTestInfo.ContainerTest
public void duplicateMessage(ContainerLayoutVersion layout) {
this.layoutVersion = layout;
Expand Down