6 changes: 6 additions & 0 deletions hadoop-client-modules/hadoop-client-api/pom.xml
@@ -126,6 +126,12 @@
        <exclude>org/apache/hadoop/yarn/client/api/package-info.class</exclude>
      </excludes>
    </filter>
    <filter>
      <artifact>org.apache.hadoop:*</artifact>
      <excludes>
        <exclude>org/apache/hadoop/hdfs/server/protocol/package-info.class</exclude>
      </excludes>
    </filter>
  </filters>
  <relocations>
    <relocation>
@@ -63,6 +63,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -791,6 +792,9 @@ private void offerService() throws Exception {
      shouldServiceRun = false;
      return;
    }
    if (InvalidBlockReportLeaseException.class.getName().equals(reClass)) {
      fullBlockReportLeaseId = 0;
Contributor

@sodonnel Thanks for your work here. Do we also need to set forceFullBlockReport to true here? Otherwise, the datanode will not send another block report until the next scheduled one, which is 6 hours later by default, right?

Contributor Author

At line 717, we can see where it attempts to get a lease from the heartbeat if the lease in the DN == 0:

   boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);

So it's isBlockReportDue() that controls this. Then later, if we have a non-zero lease, it will try to create the block report:

        boolean forceFullBr =
            scheduler.forceFullBlockReport.getAndSet(false);
        if (forceFullBr) {
          LOG.info("Forcing a full block report to " + nnAddr);
        }
        if ((fullBlockReportLeaseId != 0) || forceFullBr) {
          cmds = blockReport(fullBlockReportLeaseId);
          fullBlockReportLeaseId = 0;
        }

It's really the isBlockReportDue() method that controls whether a new one should be sent or not, and that is based on the time since the last one. In blockReport(), the time is updated after a successful block report. If it gets an exception, as this change causes, the time is not updated, so the DN will try again on the next heartbeat if it gets a new lease.

I think forceFullBlockReport is only for tests, or for the command that forces a DN block report from the CLI.
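
To make the retry behaviour described above concrete, here is a minimal sketch. It is a heavily simplified model and not the actual BPServiceActor code: the class name, the `events` list, the `heartbeat` signature, and the 6-hour constant are assumptions chosen for illustration. It shows a rejected report leaving the due time untouched, so the next heartbeat requests a fresh lease.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the DataNode heartbeat loop.
 * Field names mirror the discussion, but this is not the real BPServiceActor.
 */
final class BlockReportRetrySketch {
  // Assumed interval between full block reports (6 hours), for illustration.
  static final long BLOCK_REPORT_INTERVAL_MS = 6L * 60 * 60 * 1000;

  long nextBlockReportTime = 0;     // time at which the next report is due
  long fullBlockReportLeaseId = 0;  // 0 means "no lease held"
  final List<String> events = new ArrayList<>();

  boolean isBlockReportDue(long now) {
    return now >= nextBlockReportTime;
  }

  /** Stand-in for the NameNode RPC: rejects the report when asked to. */
  void blockReport(long leaseId, boolean nameNodeAccepts, long now) throws IOException {
    if (!nameNodeAccepts) {
      throw new IOException("lease 0x" + Long.toHexString(leaseId) + " is invalid");
    }
    // Only a successful report pushes the next due time into the future.
    nextBlockReportTime = now + BLOCK_REPORT_INTERVAL_MS;
  }

  /** One heartbeat iteration. */
  void heartbeat(long now, long leaseOfferedByNameNode, boolean nameNodeAccepts) {
    boolean requestLease = (fullBlockReportLeaseId == 0) && isBlockReportDue(now);
    if (requestLease) {
      fullBlockReportLeaseId = leaseOfferedByNameNode;
      events.add("lease " + fullBlockReportLeaseId);
    }
    if (fullBlockReportLeaseId != 0) {
      try {
        blockReport(fullBlockReportLeaseId, nameNodeAccepts, now);
        fullBlockReportLeaseId = 0;
        events.add("report accepted");
      } catch (IOException e) {
        // Mirrors the change under review: drop the rejected lease so the
        // next heartbeat asks for a new one. The due time is left untouched.
        fullBlockReportLeaseId = 0;
        events.add("report rejected");
      }
    }
  }

  public static void main(String[] args) {
    BlockReportRetrySketch dn = new BlockReportRetrySketch();
    dn.heartbeat(0, 1, false);     // lease 1 granted, report rejected
    dn.heartbeat(3_000, 2, true);  // still due: lease 2 granted, report accepted
    dn.heartbeat(6_000, 3, true);  // no longer due: nothing happens
    System.out.println(dn.events);
  }
}
```

In this model no force flag is needed: the retry happens on the second heartbeat only because the due time was never advanced.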

Contributor Author

Also, do you have any idea about fixing the checkstyle issue? As I mentioned above, trying to add a package-info.java file broke my compile locally.

Contributor
@virajjasani, Mar 8, 2023

As we don't expect to reach here frequently (hopefully the datanode is able to acquire a lease successfully most of the time), perhaps we can add a one-line log to indicate that the particular FBR went through this trouble (i.e. log the report ID and lease ID)? Just in case it helps with further debugging.

Contributor

Or maybe reportId and leaseId can be added as constructor arguments to InvalidBlockReportLeaseException. That way, the "RemoteException in offerService" log will likely print them anyway?

Contributor

"It's really the isBlockReportDue() method that controls whether a new one should be sent or not, and that is based on the time since the last one. In blockReport(), the time is updated after a successful block report. If it gets an exception, as this change causes, the time is not updated, so the DN will try again on the next heartbeat if it gets a new lease."

Thanks for the detailed explanation. Makes sense to me.

"perhaps we can add a one-line log to indicate that the particular FBR went through this trouble (i.e. log the report ID and lease ID)"

+1 from my side.

Contributor Author

I added the report and lease ID into the exception message, so it will get logged as part of the exception.
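
For reference, a small sketch of what the resulting message looks like. The helper below copies the string concatenation from the new exception's constructor into a stand-alone class; `LeaseMessageSketch` and `message` are hypothetical names, and the IDs are made-up sample values.

```java
import java.io.IOException;

/** Stand-in that reproduces the message format; not the Hadoop class itself. */
final class LeaseMessageSketch {
  static String message(long blockReportId, long leaseId) {
    // Long.toHexString renders the value as unsigned hex, so randomly
    // generated (possibly negative) report IDs stay readable in logs.
    return "Block report 0x" + Long.toHexString(blockReportId)
        + " was rejected as lease 0x" + Long.toHexString(leaseId) + " is invalid";
  }

  public static void main(String[] args) {
    IOException e = new IOException(message(31L, 42L));
    // prints: Block report 0x1f was rejected as lease 0x2a is invalid
    System.out.println(e.getMessage());
  }
}
```

Because both IDs are part of the exception message, any log line that prints the exception should include them without a separate log statement.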

Still stuck on that checkstyle violation :-(

    }
    LOG.warn("RemoteException in offerService", re);
    sleepAfterException();
  } catch (IOException e) {
@@ -172,6 +172,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -1651,6 +1652,8 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
            bm.processReport(nodeReg, reports[index].getStorage(),
                blocks, context));
      }
    } else {
      throw new InvalidBlockReportLeaseException(context.getReportId(), context.getLeaseId());
    }
  } catch (UnregisteredNodeException une) {
    LOG.warn("Datanode {} is attempting to report but not register yet.",
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.protocol;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * This exception is thrown when a datanode sends a full block report but it is
 * rejected by the Namenode due to an invalid lease (expired or otherwise).
 *
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class InvalidBlockReportLeaseException extends IOException {
  /** for java.io.Serializable. */
  private static final long serialVersionUID = 1L;

  public InvalidBlockReportLeaseException(long blockReportID, long leaseID) {
    super("Block report 0x" + Long.toHexString(blockReportID) + " was rejected as lease 0x"
        + Long.toHexString(leaseID) + " is invalid");
  }
}
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This package provides classes for the namenode server protocol.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.server.protocol;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -41,12 +42,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@@ -137,6 +140,72 @@ public void testCheckBlockReportLease() throws Exception {
}
}

  @Test
  public void testExceptionThrownWhenFBRLeaseExpired() throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    Random rand = new Random();

    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build()) {
      cluster.waitActive();

      FSNamesystem fsn = cluster.getNamesystem();
      BlockManager blockManager = fsn.getBlockManager();
      BlockManager spyBlockManager = spy(blockManager);
      fsn.setBlockManagerForTesting(spyBlockManager);
      String poolId = cluster.getNamesystem().getBlockPoolId();

      NamenodeProtocols rpcServer = cluster.getNameNodeRpc();

      // The test uses a single DataNode reporting to the NameNode
      DataNode dn = cluster.getDataNodes().get(0);
      DatanodeDescriptor datanodeDescriptor = spyBlockManager
          .getDatanodeManager().getDatanode(dn.getDatanodeId());

      DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
      StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);

      // Send a heartbeat and request a full block report lease
      HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
          dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);

      // Remove the DataNode's full block report lease
      spyBlockManager.getBlockReportLeaseManager()
          .removeLease(datanodeDescriptor);

      ExecutorService pool = Executors.newFixedThreadPool(1);

      // Trigger sendBlockReport
      BlockReportContext brContext = new BlockReportContext(1, 0,
          rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
      Future<DatanodeCommand> sendBRFuture = pool.submit(() -> {
        // Build a report with 100 blocks for every storage
        DatanodeStorage[] datanodeStorages
            = new DatanodeStorage[storages.length];
        for (int i = 0; i < storages.length; i++) {
          datanodeStorages[i] = storages[i].getStorage();
        }
        StorageBlockReport[] reports = createReports(datanodeStorages, 100);

        // Send blockReport
        return rpcServer.blockReport(dnRegistration, poolId, reports,
            brContext);
      });

      // Get the result; the call should fail because the lease was removed
      ExecutionException exception = null;
      try {
        sendBRFuture.get();
      } catch (ExecutionException e) {
        exception = e;
      }
      assertNotNull(exception);
      assertEquals(InvalidBlockReportLeaseException.class,
          exception.getCause().getClass());
    }
  }

@Test
public void testCheckBlockReportLeaseWhenDnUnregister() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
@@ -39,7 +40,6 @@

import java.io.File;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -1187,8 +1187,9 @@ public Object answer(InvocationOnMock invocation)
       // just reject and wait until the DN requests a new leaseId
       if(leaseId == 1) {
         firstLeaseId = leaseId;
-        throw new ConnectException(
-            "network is not reachable for test. ");
+        InvalidBlockReportLeaseException e =
+            new InvalidBlockReportLeaseException(context.getReportId(), 1);
+        throw new RemoteException(e.getClass().getName(), e.getMessage());
       } else {
         secondLeaseId = leaseId;
         return null;
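
The mock above builds a remote-style exception from a class name and a message, and the DataNode side compares that class name as a string. A minimal, self-contained sketch of that pattern follows. `RemoteError` and `InvalidLease` are hypothetical stand-ins defined here for illustration; they are not the Hadoop `RemoteException` or `InvalidBlockReportLeaseException` classes.

```java
import java.io.IOException;

final class RemoteMatchSketch {
  /** Hypothetical wrapper that carries only the remote class name and message. */
  static final class RemoteError extends IOException {
    private final String className;

    RemoteError(String className, String message) {
      super(message);
      this.className = className;
    }

    String getClassName() {
      return className;
    }
  }

  /** Hypothetical stand-in for the new lease exception type. */
  static final class InvalidLease extends IOException {
    InvalidLease(String message) {
      super(message);
    }
  }

  /** Returns the lease ID the caller should hold after a failed report. */
  static long leaseAfterFailure(RemoteError re, long currentLeaseId) {
    // The original exception object does not cross the wire in this model,
    // so the match is done on the class name string.
    if (InvalidLease.class.getName().equals(re.getClassName())) {
      return 0; // drop the stale lease so a new one is requested
    }
    return currentLeaseId;
  }

  public static void main(String[] args) {
    InvalidLease cause = new InvalidLease("lease 0x1 is invalid");
    RemoteError rejected = new RemoteError(cause.getClass().getName(), cause.getMessage());
    RemoteError other = new RemoteError(IOException.class.getName(), "unrelated failure");
    System.out.println(leaseAfterFailure(rejected, 1)); // prints 0
    System.out.println(leaseAfterFailure(other, 1));    // prints 1
  }
}
```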