Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.common;

import org.apache.hadoop.util.Time;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;

/**
* This is a class which implements the Clock interface. It is a copy of the
* Java Clock.SystemClock only it uses MonotonicNow (nanotime) rather than
* System.currentTimeMills.
*/

public final class MonotonicClock extends Clock {

private final ZoneId zoneId;

public MonotonicClock(ZoneId zone) {
this.zoneId = zone;
}

@Override
public ZoneId getZone() {
return zoneId;
}

@Override
public Clock withZone(ZoneId zone) {
if (zone.equals(this.zoneId)) { // intentional NPE
return this;
}
return new MonotonicClock(zone);
}

@Override
public long millis() {
return Time.monotonicNow();
}

@Override
public Instant instant() {
return Instant.ofEpochMilli(millis());
}

@Override
public boolean equals(Object obj) {
if (obj instanceof MonotonicClock) {
return zoneId.equals(((MonotonicClock) obj).zoneId);
Comment on lines +65 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MonotonicClock should be final or equals should check class equality instead of instanceof. Otherwise equality of instances of a subclass and MonotonicClock would not be symmetric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. I will make it final to avoid this problem. I basically copied this code from the Java SystemClock class, and it was a final class, so that explains why this code is that way.

}
return false;
}

@Override
public int hashCode() {
return zoneId.hashCode() + 1;
}

@Override
public String toString() {
return "MonotonicClock[" + zoneId + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -72,7 +73,6 @@
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
Expand Down Expand Up @@ -166,6 +166,7 @@ public class ReplicationManager implements MetricsSource, SCMService {
private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
private final long waitTimeInMillis;
private long lastTimeToBeReadyInMillis = 0;
private final Clock clock;

/**
* Constructs ReplicationManager instance with the given configuration.
Expand All @@ -182,7 +183,8 @@ public ReplicationManager(final ConfigurationSource conf,
final EventPublisher eventPublisher,
final SCMContext scmContext,
final SCMServiceManager serviceManager,
final NodeManager nodeManager) {
final NodeManager nodeManager,
final java.time.Clock clock) {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
this.eventPublisher = eventPublisher;
Expand All @@ -193,6 +195,7 @@ public ReplicationManager(final ConfigurationSource conf,
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
this.clock = clock;

this.waitTimeInMillis = conf.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
Expand Down Expand Up @@ -276,13 +279,13 @@ public synchronized void stop() {
private synchronized void run() {
try {
while (running) {
final long start = Time.monotonicNow();
final long start = clock.millis();
final List<ContainerInfo> containers =
containerManager.getContainers();
containers.forEach(this::processContainer);

LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", Time.monotonicNow() - start,
" processing {} containers.", clock.millis() - start,
containers.size());

wait(rmConf.getInterval());
Expand Down Expand Up @@ -446,7 +449,7 @@ private void updateInflightAction(final ContainerInfo container,
final Map<ContainerID, List<InflightAction>> inflightActions,
final Predicate<InflightAction> filter) {
final ContainerID id = container.containerID();
final long deadline = Time.monotonicNow() - rmConf.getEventTimeout();
final long deadline = clock.millis() - rmConf.getEventTimeout();
if (inflightActions.containsKey(id)) {
final List<InflightAction> actions = inflightActions.get(id);

Expand Down Expand Up @@ -1087,7 +1090,7 @@ private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(
final CommandForDatanode<T> datanodeCommand =
new CommandForDatanode<>(datanode.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
tracker.accept(new InflightAction(datanode, Time.monotonicNow()));
tracker.accept(new InflightAction(datanode, clock.millis()));
}

/**
Expand Down Expand Up @@ -1279,7 +1282,7 @@ public void notifyStatusChanged() {
// transition from PAUSING to RUNNING
if (serviceStatus != ServiceStatus.RUNNING) {
LOG.info("Service {} transitions to RUNNING.", getServiceName());
lastTimeToBeReadyInMillis = Time.monotonicNow();
lastTimeToBeReadyInMillis = clock.millis();
serviceStatus = ServiceStatus.RUNNING;
}
} else {
Expand All @@ -1296,7 +1299,7 @@ public boolean shouldRun() {
try {
// If safe mode is off, then this SCMService starts to run with a delay.
return serviceStatus == ServiceStatus.RUNNING &&
Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
clock.millis() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
} finally {
serviceLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.ozone.common.MonotonicClock;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
Expand All @@ -147,6 +148,7 @@
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -600,7 +602,8 @@ private void initializeSystemManagers(OzoneConfiguration conf,
eventQueue,
scmContext,
serviceManager,
scmNodeManager);
scmNodeManager,
new MonotonicClock(ZoneOffset.UTC));
}
if(configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -89,6 +92,7 @@ public class TestReplicationManager {
private ContainerManagerV2 containerManager;
private OzoneConfiguration conf;
private SCMNodeManager scmNodeManager;
private TestClock clock;

@Before
public void setup()
Expand Down Expand Up @@ -150,19 +154,8 @@ public void setup()
Mockito.any(DatanodeDetails.class)))
.thenReturn(NodeStatus.inServiceHealthy());

SCMServiceManager serviceManager = new SCMServiceManager();

replicationManager = new ReplicationManager(
conf,
containerManager,
containerPlacementPolicy,
eventQueue,
SCMContext.emptyContext(),
serviceManager,
nodeManager);

serviceManager.notifyStatusChanged();
Thread.sleep(100L);
clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
createReplicationManager(new ReplicationManagerConfiguration());
}

private void createReplicationManager(ReplicationManagerConfiguration rmConf)
Expand All @@ -174,15 +167,15 @@ private void createReplicationManager(ReplicationManagerConfiguration rmConf)
config.setFromObject(rmConf);

SCMServiceManager serviceManager = new SCMServiceManager();

replicationManager = new ReplicationManager(
config,
containerManager,
containerPlacementPolicy,
eventQueue,
SCMContext.emptyContext(),
serviceManager,
nodeManager);
nodeManager,
clock);

serviceManager.notifyStatusChanged();
Thread.sleep(100L);
Expand Down Expand Up @@ -1093,6 +1086,25 @@ public void testUnderReplicatedNotHealthySource()
assertReplicaScheduled(0);
}

@Test
public void testReplicateCommandTimeout() throws
SCMException, InterruptedException {
long timeout = new ReplicationManagerConfiguration().getEventTimeout();

final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
assertReplicaScheduled(1);

// Already a pending replica, so nothing scheduled
assertReplicaScheduled(0);

// Advance the clock past the timeout, and there should be a replica
// scheduled
clock.fastForward(timeout + 1000);
assertReplicaScheduled(1);
}

private ContainerInfo createContainer(LifeCycleState containerState)
throws SCMException {
final ContainerInfo container = getContainer(containerState);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ozone.test;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;

/**
* An implementation of Clock which allows the time to be set to an instant and
* moved forward and back. Intended for use only in tests.
*/

public class TestClock extends Clock {

private Instant instant;
private final ZoneId zoneId;

public TestClock(Instant instant, ZoneId zone) {
this.instant = instant;
this.zoneId = zone;
}

@Override
public ZoneId getZone() {
return zoneId;
}

@Override
public Clock withZone(ZoneId zone) {
return new TestClock(Instant.now(), zone);
}

@Override
public Instant instant() {
return instant;
}

public void fastForward(long millis) {
set(instant().plusMillis(millis));
}

public void fastForward(TemporalAmount temporalAmount) {
set(instant().plus(temporalAmount));
}

public void rewind(long millis) {
set(instant().minusMillis(millis));
}

public void rewind(TemporalAmount temporalAmount) {
set(instant().minus(temporalAmount));
}

public void set(Instant newInstant) {
this.instant = newInstant;
}

}