Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
Expand Down Expand Up @@ -364,7 +365,7 @@ private void setupReceiver(int timeoutMs) {
this.clientName = clientName;
this.src = src;
this.block = locatedBlock.getBlock();
this.locations = locatedBlock.getLocations();
this.locations = getLocatedBlockLocations(locatedBlock);
this.encryptor = encryptor;
this.datanodeInfoMap = datanodeInfoMap;
this.summer = summer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ private static FileCreator createFileCreator() throws NoSuchMethodException {
return createFileCreator3();
}

// hadoop 3.3.1 changed the return value of this method from DatanodeInfo[] to
// DatanodeInfoWithStorage[], which prevents the JVM from locating the method if we are compiled
// against hadoop 3.2 and then linked with hadoop 3.3, so here we need to use reflection to make
// it work for both hadoop versions; otherwise we would need to publish more artifacts for
// different hadoop versions...
private static final Method GET_LOCATED_BLOCK_LOCATIONS_METHOD;

/**
 * Returns the datanode locations of the given block, calling {@code LocatedBlock.getLocations()}
 * reflectively so the lookup succeeds against both hadoop 3.2 ({@code DatanodeInfo[]}) and
 * hadoop 3.3+ ({@code DatanodeInfoWithStorage[]}) at link time.
 */
static DatanodeInfo[] getLocatedBlockLocations(LocatedBlock block) {
  try {
    // DatanodeInfoWithStorage[] can be casted to DatanodeInfo[] directly
    return (DatanodeInfo[]) GET_LOCATED_BLOCK_LOCATIONS_METHOD.invoke(block);
  } catch (IllegalAccessException e) {
    // getLocations() is public, so this indicates a programming error
    throw new RuntimeException(e);
  } catch (InvocationTargetException e) {
    // unwrap so callers see the real failure instead of the reflection wrapper
    throw new RuntimeException(e.getCause());
  }
}

// cancel the processing if DFSClient is already closed.
static final class CancelOnClose implements CancelableProgressable {

Expand All @@ -250,6 +266,7 @@ public boolean progress() {
try {
LEASE_MANAGER = createLeaseManager();
FILE_CREATOR = createFileCreator();
GET_LOCATED_BLOCK_LOCATIONS_METHOD = LocatedBlock.class.getMethod("getLocations");
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See "
Expand Down Expand Up @@ -383,7 +400,7 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC
BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) {
StorageType[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
boolean connectToDnViaHostname =
conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
Expand Down