@@ -213,6 +213,7 @@
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.htrace.core.Tracer;
 import org.eclipse.jetty.util.ajax.JSON;

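The new import brings in Hadoop's HadoopExecutors helper, which this patch uses to create a cached thread pool backed by daemon threads. As a point of reference, here is a minimal JDK-only sketch of such a pool; it mirrors the intent of HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()) but is not the Hadoop implementation itself, and the class name is made up:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Sketch only: approximates a cached pool of daemon threads using plain JDK.
public class DaemonCachedPoolSketch {
  public static ExecutorService create() {
    ThreadFactory daemonFactory = r -> {
      Thread t = new Thread(r);
      t.setDaemon(true); // like Daemon.DaemonFactory: threads never block JVM exit
      return t;
    };
    // A cached pool starts a thread per task when none is idle, reuses
    // idle threads, and reaps threads that stay idle for 60 seconds.
    return Executors.newCachedThreadPool(daemonFactory);
  }
}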
@@ -395,6 +396,8 @@ public static InetSocketAddress createSocketAddr(String target) {
   private static final double CONGESTION_RATIO = 1.5;
   private DiskBalancer diskBalancer;
 
+  private final ExecutorService xferService;
+
   @Nullable
   private final StorageLocationChecker storageLocationChecker;

@@ -435,6 +438,8 @@ private static Tracer createTracer(Configuration conf) {
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
   }
 
   /**
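Both DataNode constructors initialize the pool the same way (the second one follows below). A usage sketch, building on the hypothetical DaemonCachedPoolSketch above: tasks can be handed to such a pool fire-and-forget via execute(), or tracked via submit(), which is exactly the split between transferBlock() and transferReplicaForPipelineRecovery() later in this diff.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Usage sketch for the shared pool; DaemonCachedPoolSketch is defined above.
public class PoolUsageSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = DaemonCachedPoolSketch.create();
    pool.execute(() -> System.out.println("fire and forget")); // transferBlock style
    Future<?> f = pool.submit(() -> System.out.println("tracked"));
    f.get(); // pipeline-recovery style: wait for completion
    pool.shutdown();
  }
}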
@@ -475,6 +480,8 @@ private static Tracer createTracer(Configuration conf) {
         conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
 
     this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
 
     // Determine whether we should try to pass file descriptors to clients.
     if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,

@@ -2056,6 +2063,9 @@ public void shutdown() {
     // wait reconfiguration thread, if any, to exit
     shutdownReconfigurationTask();
 
+    LOG.info("Waiting up to 30 seconds for transfer threads to complete");
+    HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS);
+
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
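The log message says 30 seconds while the call passes 15: HadoopExecutors.shutdown waits up to the timeout once after an orderly shutdown() and, if the pool has not terminated, once more after shutdownNow(), so the two figures agree. A hedged sketch of that two-phase pattern (illustrative names, not the Hadoop source):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a two-phase executor shutdown; with timeout = 15s the total
// wait is bounded by roughly 2 x 15 = 30s, matching the log message above.
final class TwoPhaseShutdownSketch {
  static void shutdownGracefully(ExecutorService pool, long timeout,
      TimeUnit unit) throws InterruptedException {
    pool.shutdown(); // stop accepting new tasks, let queued ones drain
    if (!pool.awaitTermination(timeout, unit)) {
      pool.shutdownNow(); // interrupt tasks that are still running
      pool.awaitTermination(timeout, unit); // second wait, same bound
    }
  }
}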
@@ -2329,16 +2339,16 @@ void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
 
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
-      StringBuilder xfersBuilder = new StringBuilder();
-      for (int i = 0; i < numTargets; i++) {
-        xfersBuilder.append(xferTargets[i]).append(" ");
-      }
-      LOG.info(bpReg + " Starting thread to transfer " +
-          block + " to " + xfersBuilder);
+      final String xferTargetsString =
+          StringUtils.join(" ", Arrays.asList(xferTargets));
+      LOG.info("{} Starting thread to transfer {} to {}", bpReg, block,
+          xferTargetsString);
+
+      final DataTransfer dataTransferTask = new DataTransfer(xferTargets,
+          xferTargetStorageTypes, xferTargetStorageIDs, block,
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, "");
 
-      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
-          xferTargetStorageIDs, block,
-          BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
+      this.xferService.execute(dataTransferTask);
     }
   }

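Two things change here: the per-transfer new Daemon(...).start() becomes a fire-and-forget xferService.execute(...), letting the shared pool reuse idle threads, and the hand-rolled StringBuilder loop (which left a trailing space) becomes a separator join with SLF4J-style {} placeholders. A runnable comparison of the logging change, with made-up target addresses and java.lang.String.join standing in for Hadoop's StringUtils.join:

import java.util.Arrays;

// Demonstrates the join change in transferBlock(): a separator join
// instead of a manual StringBuilder loop.
public class JoinTargetsDemo {
  public static void main(String[] args) {
    String[] targets = {"dn1:9866", "dn2:9866", "dn3:9866"};

    // Old pattern: manual loop, note the trailing space it leaves behind.
    StringBuilder sb = new StringBuilder();
    for (String t : targets) {
      sb.append(t).append(" ");
    }
    System.out.println("[" + sb + "]"); // [dn1:9866 dn2:9866 dn3:9866 ]

    // New pattern: separator-joined, no trailing space.
    String joined = String.join(" ", Arrays.asList(targets));
    System.out.println("[" + joined + "]"); // [dn1:9866 dn2:9866 dn3:9866]
  }
}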
@@ -3016,15 +3026,22 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     b.setNumBytes(visible);
 
     if (targets.length > 0) {
-      Daemon daemon = new Daemon(threadGroup,
-          new DataTransfer(targets, targetStorageTypes, targetStorageIds, b,
-              stage, client));
-      daemon.start();
+      if (LOG.isDebugEnabled()) {
+        final String xferTargetsString =
+            StringUtils.join(" ", Arrays.asList(targets));
+        LOG.debug("Transferring a replica to {}", xferTargetsString);
+      }
+
+      final DataTransfer dataTransferTask = new DataTransfer(targets,
+          targetStorageTypes, targetStorageIds, b, stage, client);
+
+      @SuppressWarnings("unchecked")
+      Future<Void> f = (Future<Void>) this.xferService.submit(dataTransferTask);
       try {
-        daemon.join();
-      } catch (InterruptedException e) {
-        throw new IOException(
-            "Pipeline recovery for " + b + " is interrupted.", e);
+        f.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException("Pipeline recovery for " + b + " is interrupted.",
+            e);
       }
     }
   }
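Pipeline recovery must block until the transfer finishes, so here the patch swaps daemon.start()/join() for submit() plus Future.get(). The unchecked cast is needed because submit(Runnable) returns Future<?>. Unlike join(), get() also rethrows a task failure as ExecutionException, which the widened catch now folds into the IOException. A self-contained sketch of the pattern, with a hypothetical transfer task and standard JDK API:

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Submit-and-wait, as in transferReplicaForPipelineRecovery(): block on
// Future.get() instead of Thread.join(), so task failures are observed.
public class SubmitAndWaitDemo {
  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newCachedThreadPool();
    Runnable transfer = () -> System.out.println("replica transferred");
    Future<?> f = pool.submit(transfer);
    try {
      f.get(); // blocks until the task completes; rethrows its failure
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException("transfer interrupted or failed", e);
    } finally {
      pool.shutdown();
    }
  }
}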