@@ -39,6 +39,8 @@ public class XceiverClientMetrics {

   private @Metric MutableCounterLong pendingOps;
   private @Metric MutableCounterLong totalOps;
+  private @Metric MutableCounterLong ecReconstructionTotal;
+  private @Metric MutableCounterLong ecReconstructionFailsTotal;
   private MutableCounterLong[] pendingOpsArray;
   private MutableCounterLong[] opsArray;
   private MutableRate[] containerOpsLatency;
@@ -100,6 +102,14 @@ public long getPendingContainerOpCountMetrics(ContainerProtos.Type type) {
     return pendingOpsArray[type.ordinal()].value();
   }

+  public void incECReconstructionTotal() {
+    ecReconstructionTotal.incr();
+  }
+
+  public void incECReconstructionFailsTotal() {
+    ecReconstructionFailsTotal.incr();
+  }
+
   @VisibleForTesting
   public long getTotalOpCount() {
     return totalOps.value();
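The two counters above only expose increment methods, so callers read them back through the metrics system rather than through getters. A minimal JUnit sketch of how that lookup could work with Hadoop's MetricsAsserts helpers; the test class and method names are hypothetical, and it assumes XceiverClientMetrics is registered under its class simple name:

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.jupiter.api.Test;

public class TestECReconstructionCounters {  // hypothetical test class
  @Test
  public void countersAreReadableThroughTheMetricsSystem() {
    XceiverClientMetrics metrics =
        XceiverClientManager.getXceiverClientMetrics();
    metrics.incECReconstructionTotal();
    metrics.incECReconstructionFailsTotal();
    // @Metric fields surface under the field name with the first letter
    // capitalized, e.g. ecReconstructionTotal -> EcReconstructionTotal.
    MetricsRecordBuilder rb =
        getMetrics(XceiverClientMetrics.class.getSimpleName());
    assertCounter("EcReconstructionTotal", 1L, rb);
    assertCounter("EcReconstructionFailsTotal", 1L, rb);
  }
}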
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -117,6 +118,10 @@ private synchronized void setReaderType() {
   }

   private void createBlockReader() {
+    if (reconstructionReader) {
+      XceiverClientManager.getXceiverClientMetrics()
+          .incECReconstructionTotal();
+    }
     blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
         failedLocations, repConfig, blockInfo, verifyChecksum,
         xceiverClientFactory, refreshFunction);
@@ -162,6 +167,8 @@ public synchronized int read(ByteBuffer buf) throws IOException {
         // If we get an error from the reconstruction reader, there
         // is nothing left to try. It will re-try until it has insufficient
         // locations internally, so if an error comes here, just re-throw it.
+        XceiverClientManager.getXceiverClientMetrics()
+            .incECReconstructionFailsTotal();
        throw e;
      }
      if (e instanceof BadDataLocationException) {
@@ -54,6 +54,7 @@
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -116,6 +117,7 @@ public class DatanodeStateMachine implements Closeable {
   private final ReadWriteLock constructionLock = new ReentrantReadWriteLock();
   private final MeasuredReplicator replicatorMetrics;
   private final ReplicationSupervisorMetrics replicationSupervisorMetrics;
+  private final ECReconstructionMetrics ecReconstructionMetrics;

   /**
    * Constructs a datanode state machine.
@@ -182,8 +184,11 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
     replicationSupervisorMetrics =
         ReplicationSupervisorMetrics.create(supervisor);

+    ecReconstructionMetrics = ECReconstructionMetrics.create();
+
     ECReconstructionCoordinator ecReconstructionCoordinator =
-        new ECReconstructionCoordinator(conf, certClient);
+        new ECReconstructionCoordinator(conf, certClient,
+            ecReconstructionMetrics);
     ecReconstructionSupervisor =
         new ECReconstructionSupervisor(container.getContainerSet(), context,
             replicationConfig.getReplicationMaxStreams(),
@@ -378,6 +383,7 @@ public void close() throws IOException {
     }
     context.setState(DatanodeStates.getLastState());
     replicationSupervisorMetrics.unRegister();
+    ecReconstructionMetrics.unRegister();
     executorService.shutdown();
     try {
       if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -102,10 +102,11 @@ public class ECReconstructionCoordinator implements Closeable {
   private final BlockInputStreamFactory blockInputStreamFactory;
   private final TokenHelper tokenHelper;
   private final ContainerClientMetrics clientMetrics;
+  private final ECReconstructionMetrics metrics;

   public ECReconstructionCoordinator(ConfigurationSource conf,
-      CertificateClient certificateClient)
-      throws IOException {
+      CertificateClient certificateClient,
+      ECReconstructionMetrics metrics) throws IOException {
     this.containerOperationClient = new ECContainerOperationClient(conf,
         certificateClient);
     this.byteBufferPool = new ElasticByteBufferPool();
@@ -121,6 +122,7 @@ public ECReconstructionCoordinator(ConfigurationSource conf,
         .getInstance(byteBufferPool, () -> ecReconstructExecutor);
     tokenHelper = new TokenHelper(conf, certificateClient);
     this.clientMetrics = ContainerClientMetrics.acquire();
+    this.metrics = metrics;
   }

   public void reconstructECContainerGroup(long containerID,
@@ -162,8 +164,13 @@ public void reconstructECContainerGroup(long containerID,
         containerOperationClient
             .closeContainer(containerID, dn, repConfig, containerToken);
       }
+      metrics.incReconstructionTotal();
+      metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size());
     } catch (Exception e) {
       // Any exception let's delete the recovering containers.
+      metrics.incReconstructionFailsTotal();
+      metrics.incBlockGroupReconstructionFailsTotal(
+          blockLocationInfoMap.size());
       LOG.warn(
           "Exception while reconstructing the container {}. Cleaning up"
               + " all the recovering containers in the reconstruction process.",
@@ -445,4 +452,8 @@ private long calcEffectiveBlockGroupLen(BlockData[] blockGroup,
     }
     return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen;
   }
+
+  public ECReconstructionMetrics getECReconstructionMetrics() {
+    return this.metrics;
+  }
 }
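With this wiring, the coordinator accounts for success and failure in one place: the totals are bumped only after every recovering container has been closed, and the fail counters are bumped on any exception before cleanup runs. A condensed usage sketch, assuming conf, certClient, containerID, repConfig, sourceNodeMap and targetNodeMap already exist in the calling context:

// Sketch only; all variables other than `metrics` and `coordinator`
// are assumed to be provided by the caller.
ECReconstructionMetrics metrics = ECReconstructionMetrics.create();
try (ECReconstructionCoordinator coordinator =
    new ECReconstructionCoordinator(conf, certClient, metrics)) {
  // Throws on failure; the catch block above has already counted the miss.
  coordinator.reconstructECContainerGroup(containerID, repConfig,
      sourceNodeMap, targetNodeMap);
}
// After one successful run: reconstructionTotal is 1, and the block-group
// total grew by the number of block groups in the recovered container.
long total = metrics.getReconstructionTotal();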
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec.reconstruction;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Metrics class for EC Reconstruction.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "EC Reconstruction Coordinator Metrics",
+    context = OzoneConsts.OZONE)
+public final class ECReconstructionMetrics {
+  private static final String SOURCE =
+      ECReconstructionMetrics.class.getSimpleName();
+
+  private @Metric MutableCounterLong blockGroupReconstructionTotal;
+  private @Metric MutableCounterLong blockGroupReconstructionFailsTotal;
+  private @Metric MutableCounterLong reconstructionTotal;
+  private @Metric MutableCounterLong reconstructionFailsTotal;
+
+  private ECReconstructionMetrics() {
+  }
+
+  public static ECReconstructionMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE, "EC Reconstruction Coordinator Metrics",
+        new ECReconstructionMetrics());
+  }
+
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE);
+  }
+
+  public void incBlockGroupReconstructionTotal(long count) {
+    blockGroupReconstructionTotal.incr(count);
+  }
+
+  public void incBlockGroupReconstructionFailsTotal(long count) {
+    blockGroupReconstructionFailsTotal.incr(count);
+  }
+
+  public void incReconstructionTotal() {
+    reconstructionTotal.incr();
+  }
+
+  public void incReconstructionFailsTotal() {
+    reconstructionFailsTotal.incr();
+  }
+
+  public long getReconstructionTotal() {
+    return reconstructionTotal.value();
+  }
+
+  public long getBlockGroupReconstructionTotal() {
+    return blockGroupReconstructionTotal.value();
+  }
+}
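SOURCE is a fixed name, so only one instance can be registered at a time; DefaultMetricsSystem typically rejects a duplicate source name, which is why DatanodeStateMachine pairs create() in its constructor with unRegister() in close(). A minimal standalone sketch:

// Sketch: register the source, count, read back, and release it.
ECReconstructionMetrics metrics = ECReconstructionMetrics.create();
metrics.incReconstructionTotal();
metrics.incBlockGroupReconstructionTotal(3);
assert metrics.getReconstructionTotal() == 1L;
assert metrics.getBlockGroupReconstructionTotal() == 3L;
metrics.unRegister();  // allows a later create() to register again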
@@ -42,7 +42,8 @@ public void testAddTaskShouldExecuteTheGivenTask()
     final CountDownLatch holdProcessing = new CountDownLatch(1);
     ECReconstructionSupervisor supervisor =
         new ECReconstructionSupervisor(null, null, 5,
-            new ECReconstructionCoordinator(new OzoneConfiguration(), null) {
+            new ECReconstructionCoordinator(new OzoneConfiguration(), null,
+                ECReconstructionMetrics.create()) {
               @Override
               public void reconstructECContainerGroup(long containerID,
                   ECReplicationConfig repConfig,
@@ -66,6 +66,7 @@
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -390,8 +391,10 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes)
         new XceiverClientManager(config);
     createKeyAndWriteData(keyString, bucket);
     ECReconstructionCoordinator coordinator =
-        new ECReconstructionCoordinator(config, certClient);
+        new ECReconstructionCoordinator(config, certClient,
+            ECReconstructionMetrics.create());

+    ECReconstructionMetrics metrics = coordinator.getECReconstructionMetrics();
     OzoneKeyDetails key = bucket.getKey(keyString);
     long conID = key.getOzoneKeyLocations().get(0).getContainerID();
     Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
@@ -493,7 +496,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes)
           readContainerResponseProto.getContainerData().getState());
       i++;
     }
-
+    Assertions.assertEquals(1L, metrics.getReconstructionTotal());
   }

   private void createKeyAndWriteData(String keyString, OzoneBucket bucket)
@@ -565,7 +568,8 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()

     Assert.assertThrows(IOException.class, () -> {
       ECReconstructionCoordinator coordinator =
-          new ECReconstructionCoordinator(config, certClient);
+          new ECReconstructionCoordinator(config, certClient,
+              ECReconstructionMetrics.create());
       coordinator.reconstructECContainerGroup(conID,
           (ECReplicationConfig) containerPipeline.getReplicationConfig(),
           sourceNodeMap, targetNodeMap);
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -196,7 +197,7 @@ public void testBucketOps() throws Exception {
         ozoneManager, "bucketManager");
     BucketManager mockBm = Mockito.spy(bucketManager);

-    OmBucketInfo bucketInfo = createBucketInfo();
+    OmBucketInfo bucketInfo = createBucketInfo(false);
     doBucketOps(bucketInfo);

     MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
@@ -208,11 +209,18 @@
     assertCounter("NumBucketLists", 1L, omMetrics);
     assertCounter("NumBuckets", 0L, omMetrics);

-    bucketInfo = createBucketInfo();
+    OmBucketInfo ecBucketInfo = createBucketInfo(true);
+    writeClient.createBucket(ecBucketInfo);
+    writeClient.deleteBucket(ecBucketInfo.getVolumeName(),
+        ecBucketInfo.getBucketName());
+    omMetrics = getMetrics("OMMetrics");
+    assertCounter("EcBucketCreateTotal", 1L, omMetrics);
+
+    bucketInfo = createBucketInfo(false);
     writeClient.createBucket(bucketInfo);
-    bucketInfo = createBucketInfo();
+    bucketInfo = createBucketInfo(false);
     writeClient.createBucket(bucketInfo);
-    bucketInfo = createBucketInfo();
+    bucketInfo = createBucketInfo(false);
     writeClient.createBucket(bucketInfo);
     writeClient.deleteBucket(bucketInfo.getVolumeName(),
         bucketInfo.getBucketName());
@@ -232,15 +240,24 @@
     mockWritePathExceptions(OmBucketInfo.class);
     doBucketOps(bucketInfo);

+    ecBucketInfo = createBucketInfo(true);
+    try {
+      writeClient.createBucket(ecBucketInfo);
+    } catch (Exception e) {
+      // Expected failure
+    }
+    omMetrics = getMetrics("OMMetrics");
+    assertCounter("EcBucketCreateFailsTotal", 1L, omMetrics);
+
     omMetrics = getMetrics("OMMetrics");
-    assertCounter("NumBucketOps", 14L, omMetrics);
-    assertCounter("NumBucketCreates", 5L, omMetrics);
+    assertCounter("NumBucketOps", 17L, omMetrics);
+    assertCounter("NumBucketCreates", 7L, omMetrics);
     assertCounter("NumBucketUpdates", 2L, omMetrics);
     assertCounter("NumBucketInfos", 2L, omMetrics);
-    assertCounter("NumBucketDeletes", 3L, omMetrics);
+    assertCounter("NumBucketDeletes", 4L, omMetrics);
     assertCounter("NumBucketLists", 2L, omMetrics);

-    assertCounter("NumBucketCreateFails", 1L, omMetrics);
+    assertCounter("NumBucketCreateFails", 2L, omMetrics);
     assertCounter("NumBucketUpdateFails", 1L, omMetrics);
     assertCounter("NumBucketInfoFails", 1L, omMetrics);
     assertCounter("NumBucketDeleteFails", 1L, omMetrics);
@@ -283,6 +300,7 @@ public void testKeyOps() throws Exception {
     doKeyOps(keyArgs);
     omMetrics = getMetrics("OMMetrics");
     assertCounter("NumKeyOps", 14L, omMetrics);
+    assertCounter("EcKeyCreateTotal", 1L, omMetrics);

     keyArgs = createKeyArgs(volumeName, bucketName,
         RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
@@ -331,10 +349,19 @@
     assertCounter("NumKeyListFails", 1L, omMetrics);
     assertCounter("NumTrashKeyListFails", 1L, omMetrics);
     assertCounter("NumInitiateMultipartUploadFails", 1L, omMetrics);
-

     assertCounter("NumKeys", 2L, omMetrics);

+    keyArgs = createKeyArgs(volumeName, bucketName,
+        new ECReplicationConfig("rs-3-2-1024K"));
+    try {
+      keySession = writeClient.openKey(keyArgs);
+      writeClient.commitKey(keyArgs, keySession.getId());
+    } catch (Exception e) {
+      // Expected failure
+    }
+    omMetrics = getMetrics("OMMetrics");
+    assertCounter("EcKeyCreateFailsTotal", 1L, omMetrics);
+
     cluster.restartOzoneManager();
     assertCounter("NumKeys", 2L, omMetrics);

@@ -601,13 +628,19 @@ private OmBucketArgs getBucketArgs(OmBucketInfo info) {
         .setBucketName(info.getBucketName())
         .build();
   }
-  private OmBucketInfo createBucketInfo() throws IOException {
+  private OmBucketInfo createBucketInfo(boolean isEcBucket) throws IOException {
     OmVolumeArgs volumeArgs = createVolumeArgs();
     writeClient.createVolume(volumeArgs);
+    DefaultReplicationConfig repConf = new DefaultReplicationConfig(
+        new ECReplicationConfig("rs-3-2-1024k"));
     String bucketName = UUID.randomUUID().toString();
-    return new OmBucketInfo.Builder()
+
+    OmBucketInfo.Builder builder = new OmBucketInfo.Builder()
         .setVolumeName(volumeArgs.getVolume())
-        .setBucketName(bucketName)
-        .build();
+        .setBucketName(bucketName);
+    if (isEcBucket) {
+      builder.setDefaultReplicationConfig(repConf);
+    }
+    return builder.build();
   }
 }