Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
private final BlockID blockID;
private final long length;
private Pipeline pipeline;
private final Token<OzoneBlockTokenIdentifier> token;
private Token<OzoneBlockTokenIdentifier> token;
private final boolean verifyChecksum;
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
Expand Down Expand Up @@ -103,19 +103,19 @@ public class BlockInputStream extends BlockExtendedInputStream {
// can be reset if a new position is seeked.
private int chunkIndexOfPrevPosition;

private final Function<BlockID, Pipeline> refreshPipelineFunction;
private final Function<BlockID, BlockLocationInfo> refreshFunction;

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, Pipeline> refreshPipelineFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction) {
this.blockID = blockId;
this.length = blockLen;
this.pipeline = pipeline;
this.token = token;
this.verifyChecksum = verifyChecksum;
this.xceiverClientFactory = xceiverClientFactory;
this.refreshPipelineFunction = refreshPipelineFunction;
this.refreshFunction = refreshFunction;
}

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Expand Down Expand Up @@ -150,12 +150,12 @@ public synchronized void initialize() throws IOException {
} catch (SCMSecurityException ex) {
throw ex;
} catch (StorageContainerException ex) {
refreshPipeline(ex);
refreshBlockInfo(ex);
catchEx = ex;
} catch (IOException ex) {
LOG.debug("Retry to get chunk info fail", ex);
if (isConnectivityIssue(ex)) {
refreshPipeline(ex);
refreshBlockInfo(ex);
}
catchEx = ex;
}
Expand Down Expand Up @@ -199,17 +199,19 @@ private boolean isConnectivityIssue(IOException ex) {
return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
}

private void refreshPipeline(IOException cause) throws IOException {
private void refreshBlockInfo(IOException cause) throws IOException {
LOG.info("Unable to read information for block {} from pipeline {}: {}",
blockID, pipeline.getId(), cause.getMessage());
if (refreshPipelineFunction != null) {
LOG.debug("Re-fetching pipeline for block {}", blockID);
Pipeline newPipeline = refreshPipelineFunction.apply(blockID);
if (newPipeline == null) {
LOG.debug("No new pipeline for block {}", blockID);
if (refreshFunction != null) {
LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
if (blockLocationInfo == null) {
LOG.debug("No new block location info for block {}", blockID);
} else {
LOG.debug("New pipeline for block {}: {}", blockID, newPipeline);
this.pipeline = newPipeline;
LOG.debug("New block location info for block {}: {}",
blockID, blockLocationInfo);
Comment on lines +211 to +212
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockLocationInfo#toString includes the token. I think we should keep logging the pipeline only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omg i missed this one. Will add an addendum to it.

this.pipeline = blockLocationInfo.getPipeline();
this.token = blockLocationInfo.getToken();
}
} else {
throw cause;
Expand Down Expand Up @@ -526,7 +528,7 @@ private void handleReadError(IOException cause) throws IOException {
}
}

refreshPipeline(cause);
refreshBlockInfo(cause);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ public interface BlockInputStreamFactory {
* @param token The block Access Token
* @param verifyChecksum Whether to verify checksums or not.
* @param xceiverFactory Factory to create the xceiver in the client
* @param refreshFunction Function to refresh the pipeline if needed
* @param refreshFunction Function to refresh the block location if needed
* @return BlockExtendedInputStream of the correct type.
*/
BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction);
Function<BlockID, BlockLocationInfo> refreshFunction);

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction) {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
private final BlockInputStreamFactory streamFactory;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, Pipeline> refreshFunction;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
private final DatanodeDetails[] dataLocations;
private final BlockExtendedInputStream[] blockStreams;
Expand Down Expand Up @@ -120,8 +120,9 @@ protected int availableParityLocations() {

public ECBlockInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
BlockInputStreamFactory streamFactory) {
this.repConfig = repConfig;
this.ecChunkSize = repConfig.getEcChunkSize();
this.verifyChecksum = verifyChecksum;
Expand Down Expand Up @@ -215,27 +216,30 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
* @param refreshFunc
* @return
*/
protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(
int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
int replicaIndex, Function<BlockID, BlockLocationInfo> refreshFunc) {
return (blockID) -> {
Pipeline ecPipeline = refreshFunc.apply(blockID);
if (ecPipeline == null) {
BlockLocationInfo blockLocationInfo = refreshFunc.apply(blockID);
if (blockLocationInfo == null) {
return null;
}
Pipeline ecPipeline = blockLocationInfo.getPipeline();
DatanodeDetails curIndexNode = ecPipeline.getNodes()
.stream().filter(dn ->
ecPipeline.getReplicaIndex(dn) == replicaIndex)
.findAny().orElse(null);
if (curIndexNode == null) {
return null;
}
return Pipeline.newBuilder().setReplicationConfig(
Pipeline pipeline = Pipeline.newBuilder().setReplicationConfig(
StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Collections.singletonList(curIndexNode))
.setId(PipelineID.randomId())
.setState(Pipeline.PipelineState.CLOSED)
.build();
blockLocationInfo.setPipeline(pipeline);
return blockLocationInfo;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;

Expand All @@ -47,12 +46,12 @@ public interface ECBlockInputStreamFactory {
* @param blockInfo The blockInfo representing the block.
* @param verifyChecksum Whether to verify checksums or not.
* @param xceiverFactory Factory to create the xceiver in the client
* @param refreshFunction Function to refresh the pipeline if needed
* @param refreshFunction Function to refresh the block location if needed
* @return BlockExtendedInputStream of the correct type.
*/
BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction);
Function<BlockID, BlockLocationInfo> refreshFunction);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
Expand Down Expand Up @@ -77,7 +76,7 @@ public BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
Function<BlockID, BlockLocationInfo> refreshFunction) {
if (missingLocations) {
// We create the reconstruction reader
ECBlockReconstructedStripeInputStream sis =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
private final ECReplicationConfig repConfig;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, Pipeline> refreshFunction;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
private final ECBlockInputStreamFactory ecBlockInputStreamFactory;

Expand Down Expand Up @@ -99,7 +99,8 @@ public static int availableDataLocations(Pipeline pipeline,
public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
BlockLocationInfo> refreshFunction,
ECBlockInputStreamFactory streamFactory) {
this.repConfig = repConfig;
this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
Expand Down Expand Up @@ -152,8 +151,9 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
BlockInputStreamFactory streamFactory,
ByteBufferPool byteBufferPool,
ExecutorService ecReconstructExecutor) {
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DummyBlockInputStream extends BlockInputStream {
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientFactory xceiverClientManager,
Function<BlockID, Pipeline> refreshFunction,
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@
package org.apache.hadoop.hdds.scm.storage;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* A dummy BlockInputStream with pipeline refresh function to mock read
Expand All @@ -60,13 +59,15 @@ final class DummyBlockInputStreamWithRetry
super(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientManager, blockID -> {
isRerfreshed.set(true);
return Pipeline.newBuilder()
.setState(Pipeline.PipelineState.OPEN)
.setId(PipelineID.randomId())
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
ReplicationFactor.ONE))
.setNodes(Collections.emptyList())
.build();
try {
BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
Pipeline mockPipeline = MockPipeline.createPipeline(1);
when(blockLocationInfo.getPipeline()).thenReturn(mockPipeline);
return blockLocationInfo;
} catch (IOException e) {
throw new RuntimeException(e);
}

}, chunkList, chunkMap);
this.ioException = ioException;
}
Expand Down
Loading