Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class ECReconstructionCoordinator implements Closeable {
private final ContainerClientMetrics clientMetrics;
private final ECReconstructionMetrics metrics;
private final StateContext context;
private final OzoneClientConfig ozoneClientConfig;

public ECReconstructionCoordinator(
ConfigurationSource conf, CertificateClient certificateClient,
Expand All @@ -125,10 +126,10 @@ public ECReconstructionCoordinator(
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
.build();
ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
this.ecReconstructExecutor =
new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
conf.getObject(OzoneClientConfig.class)
.getEcReconstructStripeReadPoolLimit(),
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand Down Expand Up @@ -222,16 +223,15 @@ public void reconstructECContainerGroup(long containerID,

private ECBlockOutputStream getECBlockOutputStream(
BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
ECReplicationConfig repConfig, int replicaIndex,
OzoneClientConfig configuration) throws IOException {
ECReplicationConfig repConfig, int replicaIndex) throws IOException {
StreamBufferArgs streamBufferArgs =
StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, configuration);
StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, ozoneClientConfig);
return new ECBlockOutputStream(
blockLocationInfo.getBlockID(),
containerOperationClient.getXceiverClientManager(),
containerOperationClient.singleNodePipeline(datanodeDetails,
repConfig, replicaIndex),
BufferPool.empty(), configuration,
BufferPool.empty(), ozoneClientConfig,
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
}

Expand Down Expand Up @@ -277,15 +277,14 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
OzoneClientConfig configuration = new OzoneClientConfig();
try {
for (int i = 0; i < toReconstructIndexes.size(); i++) {
int replicaIndex = toReconstructIndexes.get(i);
DatanodeDetails datanodeDetails =
targetMap.get(replicaIndex);
targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
datanodeDetails, repConfig, replicaIndex,
configuration);
datanodeDetails, repConfig, replicaIndex
);
bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
// Make sure it's clean. Don't want to reuse the erroneously returned
// buffers from the pool.
Expand Down