diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java
new file mode 100644
index 000000000000..62a323d25387
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/MonotonicClock.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.util.Time;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Objects;
+
+/**
+ * A {@link Clock} implementation modelled on the JDK's Clock.SystemClock,
+ * except that it reads the time from {@link Time#monotonicNow()} (nanotime)
+ * rather than {@link System#currentTimeMillis()}.
+ */
+public final class MonotonicClock extends Clock {
+
+  private final ZoneId zoneId;
+
+  /**
+   * Creates a clock for the given zone.
+   *
+   * @param zone the time-zone returned by {@link #getZone()}, not null
+   * @throws NullPointerException if {@code zone} is null
+   */
+  public MonotonicClock(ZoneId zone) {
+    // Fail fast: equals() and hashCode() dereference the zone.
+    this.zoneId = Objects.requireNonNull(zone, "zone");
+  }
+
+  @Override
+  public ZoneId getZone() {
+    return zoneId;
+  }
+
+  /**
+   * Returns a clock for the given zone, reusing this instance when the zone
+   * is unchanged.
+   *
+   * @param zone the time-zone of the returned clock, not null
+   * @return this clock if {@code zone} equals its zone, otherwise a new one
+   * @throws NullPointerException if {@code zone} is null
+   */
+  @Override
+  public Clock withZone(ZoneId zone) {
+    if (zone.equals(this.zoneId)) { // intentional NPE
+      return this;
+    }
+    return new MonotonicClock(zone);
+  }
+
+  /**
+   * Returns the current value of {@link Time#monotonicNow()}.
+   */
+  @Override
+  public long millis() {
+    return Time.monotonicNow();
+  }
+
+  /**
+   * Returns {@link #millis()} wrapped by {@link Instant#ofEpochMilli(long)}.
+   * NOTE(review): the source is monotonic rather than wall-clock time, so the
+   * result is presumably only useful for measuring elapsed time - confirm.
+   */
+  @Override
+  public Instant instant() {
+    return Instant.ofEpochMilli(millis());
+  }
+
+  /**
+   * Two monotonic clocks are equal when their zones are equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof MonotonicClock) {
+      return zoneId.equals(((MonotonicClock) obj).zoneId);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return zoneId.hashCode() + 1;
+  }
+
+  @Override
+  public String toString() {
+    return "MonotonicClock[" + zoneId + "]";
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 2310848fd10e..c979fd4db0e8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; +import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -72,7 +73,6 @@ import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.GeneratedMessage; @@ -166,6 +166,7 @@ public class ReplicationManager implements MetricsSource, SCMService { private ServiceStatus 
serviceStatus = ServiceStatus.PAUSING; private final long waitTimeInMillis; private long lastTimeToBeReadyInMillis = 0; + private final Clock clock; /** * Constructs ReplicationManager instance with the given configuration. @@ -182,7 +183,8 @@ public ReplicationManager(final ConfigurationSource conf, final EventPublisher eventPublisher, final SCMContext scmContext, final SCMServiceManager serviceManager, - final NodeManager nodeManager) { + final NodeManager nodeManager, + final java.time.Clock clock) { this.containerManager = containerManager; this.containerPlacement = containerPlacement; this.eventPublisher = eventPublisher; @@ -193,6 +195,7 @@ public ReplicationManager(final ConfigurationSource conf, this.inflightReplication = new ConcurrentHashMap<>(); this.inflightDeletion = new ConcurrentHashMap<>(); this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum(); + this.clock = clock; this.waitTimeInMillis = conf.getTimeDuration( HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, @@ -276,13 +279,13 @@ public synchronized void stop() { private synchronized void run() { try { while (running) { - final long start = Time.monotonicNow(); + final long start = clock.millis(); final List containers = containerManager.getContainers(); containers.forEach(this::processContainer); LOG.info("Replication Monitor Thread took {} milliseconds for" + - " processing {} containers.", Time.monotonicNow() - start, + " processing {} containers.", clock.millis() - start, containers.size()); wait(rmConf.getInterval()); @@ -446,7 +449,7 @@ private void updateInflightAction(final ContainerInfo container, final Map> inflightActions, final Predicate filter) { final ContainerID id = container.containerID(); - final long deadline = Time.monotonicNow() - rmConf.getEventTimeout(); + final long deadline = clock.millis() - rmConf.getEventTimeout(); if (inflightActions.containsKey(id)) { final List actions = inflightActions.get(id); @@ -1087,7 +1090,7 @@ private void 
sendAndTrackDatanodeCommand( final CommandForDatanode datanodeCommand = new CommandForDatanode<>(datanode.getUuid(), command); eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand); - tracker.accept(new InflightAction(datanode, Time.monotonicNow())); + tracker.accept(new InflightAction(datanode, clock.millis())); } /** @@ -1279,7 +1282,7 @@ public void notifyStatusChanged() { // transition from PAUSING to RUNNING if (serviceStatus != ServiceStatus.RUNNING) { LOG.info("Service {} transitions to RUNNING.", getServiceName()); - lastTimeToBeReadyInMillis = Time.monotonicNow(); + lastTimeToBeReadyInMillis = clock.millis(); serviceStatus = ServiceStatus.RUNNING; } } else { @@ -1296,7 +1299,7 @@ public boolean shouldRun() { try { // If safe mode is off, then this SCMService starts to run with a delay. return serviceStatus == ServiceStatus.RUNNING && - Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis; + clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis; } finally { serviceLock.unlock(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index ad011e7b62be..e8380c6d0fa9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -126,6 +126,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneSecurityUtil; +import org.apache.hadoop.ozone.common.MonotonicClock; import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer; @@ -147,6 +148,7 @@ import java.net.InetSocketAddress; import java.security.cert.CertificateException; 
import java.security.cert.X509Certificate; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -600,7 +602,8 @@ private void initializeSystemManagers(OzoneConfiguration conf, eventQueue, scmContext, serviceManager, - scmNodeManager); + scmNodeManager, + new MonotonicClock(ZoneOffset.UTC)); } if(configurator.getScmSafeModeManager() != null) { scmSafeModeManager = configurator.getScmSafeModeManager(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index 3bdc6ab1e98f..b09688577c9f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.TestClock; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,6 +51,8 @@ import org.mockito.Mockito; import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -89,6 +92,7 @@ public class TestReplicationManager { private ContainerManagerV2 containerManager; private OzoneConfiguration conf; private SCMNodeManager scmNodeManager; + private TestClock clock; @Before public void setup() @@ -150,19 +154,8 @@ public void setup() Mockito.any(DatanodeDetails.class))) .thenReturn(NodeStatus.inServiceHealthy()); - SCMServiceManager serviceManager = new SCMServiceManager(); - - replicationManager = new ReplicationManager( - conf, - containerManager, - containerPlacementPolicy, - eventQueue, - 
SCMContext.emptyContext(), - serviceManager, - nodeManager); - - serviceManager.notifyStatusChanged(); - Thread.sleep(100L); + clock = new TestClock(Instant.now(), ZoneId.of("UTC")); + createReplicationManager(new ReplicationManagerConfiguration()); } private void createReplicationManager(ReplicationManagerConfiguration rmConf) @@ -174,7 +167,6 @@ private void createReplicationManager(ReplicationManagerConfiguration rmConf) config.setFromObject(rmConf); SCMServiceManager serviceManager = new SCMServiceManager(); - replicationManager = new ReplicationManager( config, containerManager, @@ -182,7 +174,8 @@ private void createReplicationManager(ReplicationManagerConfiguration rmConf) eventQueue, SCMContext.emptyContext(), serviceManager, - nodeManager); + nodeManager, + clock); serviceManager.notifyStatusChanged(); Thread.sleep(100L); @@ -1093,6 +1086,25 @@ public void testUnderReplicatedNotHealthySource() assertReplicaScheduled(0); } + @Test + public void testReplicateCommandTimeout() throws + SCMException, InterruptedException { + long timeout = new ReplicationManagerConfiguration().getEventTimeout(); + + final ContainerInfo container = createContainer(LifeCycleState.CLOSED); + addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); + assertReplicaScheduled(1); + + // Already a pending replica, so nothing scheduled + assertReplicaScheduled(0); + + // Advance the clock past the timeout, and there should be a replica + // scheduled + clock.fastForward(timeout + 1000); + assertReplicaScheduled(1); + } + private ContainerInfo createContainer(LifeCycleState containerState) throws SCMException { final ContainerInfo container = getContainer(containerState); diff --git a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java new file mode 100644 index 000000000000..8cf0c7e548ac --- /dev/null +++ 
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/TestClock.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.temporal.TemporalAmount;
+
+/**
+ * A {@link Clock} for tests whose time stands still until it is explicitly
+ * changed: the instant can be set directly or moved forward and back.
+ * Intended for use only in tests.
+ */
+public class TestClock extends Clock {
+
+  private Instant instant;
+  private final ZoneId zoneId;
+
+  /**
+   * Creates a clock that reports the given instant until it is changed.
+   *
+   * @param instant the initial time returned by {@link #instant()}
+   * @param zone the time-zone returned by {@link #getZone()}
+   */
+  public TestClock(Instant instant, ZoneId zone) {
+    this.instant = instant;
+    this.zoneId = zone;
+  }
+
+  @Override
+  public ZoneId getZone() {
+    return zoneId;
+  }
+
+  /**
+   * Returns a new, independent clock in the given zone which starts at the
+   * instant this clock currently reports. Later changes to either clock are
+   * not reflected in the other.
+   *
+   * @param zone the time-zone of the returned clock
+   * @return a copy of this clock using {@code zone}
+   */
+  @Override
+  public Clock withZone(ZoneId zone) {
+    // Carry over the controlled instant. Starting the copy at the wall-clock
+    // time would silently discard whatever time the test has set.
+    return new TestClock(instant, zone);
+  }
+
+  @Override
+  public Instant instant() {
+    return instant;
+  }
+
+  /**
+   * Moves the clock forward.
+   *
+   * @param millis the number of milliseconds to add to the current instant
+   */
+  public void fastForward(long millis) {
+    set(instant().plusMillis(millis));
+  }
+
+  /**
+   * Moves the clock forward.
+   *
+   * @param temporalAmount the amount to add to the current instant
+   */
+  public void fastForward(TemporalAmount temporalAmount) {
+    set(instant().plus(temporalAmount));
+  }
+
+  /**
+   * Moves the clock back.
+   *
+   * @param millis the number of milliseconds to subtract from the instant
+   */
+  public void rewind(long millis) {
+    set(instant().minusMillis(millis));
+  }
+
+  /**
+   * Moves the clock back.
+   *
+   * @param temporalAmount the amount to subtract from the current instant
+   */
+  public void rewind(TemporalAmount temporalAmount) {
+    set(instant().minus(temporalAmount));
+  }
+
+  /**
+   * Replaces the time reported by this clock.
+   *
+   * @param newInstant the instant returned by {@link #instant()} from now on
+   */
+  public void set(Instant newInstant) {
+    this.instant = newInstant;
+  }
+
+}