Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
e947bcb
[SPARK-9853][Core] Optimize shuffle fetch of contiguous partition IDs
Nov 20, 2017
53affd4
Simplify length in convertMapStatuses
Nov 21, 2017
e437a26
Simplify totalSize in convertMapStatuses
Nov 24, 2017
12163f3
Modify for external shuffle service
Nov 24, 2017
7aa805b
Solve mima issue
Nov 24, 2017
9cb1f0f
Fix ExternalShuffleBlockResolverSuite
Nov 24, 2017
2799886
Merge remote-tracking branch 'origin/master' into 19788.1
Feb 1, 2018
80c8da9
Remove unnecessary logDebug
Nov 24, 2017
b410f9c
Improve indent
Nov 25, 2017
85a4f49
Using ContinuousShuffleBlockId
Nov 28, 2017
c7db28d
Support ContinuousShuffleBlockId in external shuffle service and fix …
Nov 28, 2017
ff4bece
Fix tests failure
Nov 28, 2017
80bb53c
Add comments for getBlockData in ExternalShuffleBlockResolver
Nov 28, 2017
e18eb59
Update as per internal review
Nov 29, 2017
69cde07
Minor fix
Nov 29, 2017
978e10d
Fix conflict for SPARK-22982
Feb 1, 2018
2a1a97c
Rename ContinuousShuffleBlockId.length to ContinuousShuffleBlockId.nu…
Feb 1, 2018
d8691ee
Rename length to numBlocks
Feb 2, 2018
f30f057
Fix mima issue
Feb 2, 2018
6c684bc
Disable continuous fetch for non-concatenatable compression format
Feb 11, 2018
a9ab62e
continuousBlockBulkFetch should be enabled in adaptive execution only
Feb 11, 2018
87c9af6
Enable ContinuousShuffleBlockId only:
Feb 22, 2018
7fa0fb9
Using isShuffleBlock to simplify
Feb 22, 2018
ab4aa5f
Remove getBlockData, this one is used in ExternalShuffleBlockResolver…
Feb 22, 2018
e5e7c38
Make ShuffleBlockIdBase extend BlockId
Feb 22, 2018
6ee294a
Merge remote-tracking branch 'upstream/master' into shuffle_fetch_opt
Feb 22, 2018
583666b
Fix failure in org.apache.spark.shuffle.BlockStoreShuffleReaderSuite.…
Feb 22, 2018
c133776
Use serializerRelocatable: Boolean
Mar 1, 2018
fc0fe77
Keep FetchFailed unchanged
yucai Mar 1, 2018
7009a5e
Merge remote-tracking branch 'upstream/master' into shuffle_fetch_opt
yucai Mar 3, 2018
4a31e9c
minor
yucai Mar 4, 2018
08d2ca1
Merge remote-tracking branch 'origin/master' into pr19788
yucai Oct 22, 2018
5efa1af
add spark.shuffle.continuousBlockBatchFetch
yucai Oct 24, 2018
3412bcb
make both server and client side back compatible
yucai Dec 26, 2018
63d9eb1
Use ContinuousShuffleBlockId for local shuffle read
yucai Jan 2, 2019
a0cd415
continuousBlockBatchFetch for remote shuffle
yucai Jan 2, 2019
3da48d8
Merge remote-tracking branch 'origin/master' into pr19788_server
yucai Jan 2, 2019
c650e65
external shuffle service
yucai Jan 3, 2019
752f90c
remove spark.shuffle.continuousBlockBatchFetch
yucai Jan 4, 2019
b91c3e9
improve encoders
yucai Jan 4, 2019
9ffe59d
rename shuffleBlocksBatchFetch
yucai Jan 4, 2019
9354ed0
address.executorId == blockManager.blockManagerId.executorId
yucai Jan 4, 2019
0f49142
enhance support for failure fetch
yucai Jan 4, 2019
535fe5f
DownloadCallback's onFailure
yucai Jan 5, 2019
087422f
Fix MapOutputTrackerSuite
yucai Jan 5, 2019
2191f8d
Fix tests
yucai Jan 5, 2019
2ca4092
minor
yucai Jan 6, 2019
6496abf
Fix ExternalShuffleIntegrationSuite
yucai Jan 6, 2019
e91dbd5
Fix ShuffleBlockFetcherIteratorSuite
yucai Jan 7, 2019
6cb7110
Fix ExternalShuffleServiceSuite
yucai Jan 7, 2019
05e4465
rename to ShuffleBlockBatchId
yucai Jan 8, 2019
eb752aa
Fix retry scenario
yucai Jan 8, 2019
694ec2d
add tests for ShuffleBlockFetcherIteratorSuite
yucai Jan 8, 2019
45d7096
enable optimization only when AE enables
yucai Jan 8, 2019
4850d33
add testBatchFetchThreeSort
yucai Jan 9, 2019
28355b1
add tests for OpenBlocks and StreamHandle backward compatibility
yucai Jan 9, 2019
7a65cc4
Fix MapOutputTrackerSuite
yucai Jan 9, 2019
a9ffdee
minor
yucai Jan 10, 2019
96b0082
Improve corrupt retry for external shuffle service
yucai Jan 10, 2019
ace48fd
improve getBlockData
yucai Jan 10, 2019
86aa1fb
minor
yucai Jan 10, 2019
fcf434b
minor
yucai Jan 11, 2019
a360bbf
fix all java style issue
yucai Jan 11, 2019
2f28a7e
fix ExternalShuffleBlockHandlerSuite
yucai Jan 11, 2019
5bd6d73
decrease 0-size array creation
yucai Jan 14, 2019
aa6134b
Dont change StreamHandle
yucai Jan 15, 2019
c6ebe0e
address bug
yucai Jan 15, 2019
be54f49
fix java style
yucai Jan 15, 2019
1c91d20
don't use batchId in refetch
yucai Jan 15, 2019
f810e28
java style
yucai Jan 15, 2019
8a1815f
return blockIds when failure
yucai Jan 15, 2019
076894f
Merge remote-tracking branch 'origin/master' into pr19788_server
yucai Jan 15, 2019
b33b4dd
using ArrayShuffleBlockId to improve retry
yucai Jan 16, 2019
91aeff9
fetchContinuousShuffleBlocksInBatch == true, OpenBlocks could contain…
yucai Jan 16, 2019
ea1795a
checkout MapOutputTracker.scala
yucai Jan 16, 2019
dd980dd
merge local in ShuffleBlockFetcherIterator
yucai Jan 16, 2019
e9d8620
revert MapOutputTrackerSuite.scala
yucai Jan 16, 2019
5933bf8
local merge
yucai Jan 17, 2019
57fab14
remove ArrayShuffleBlockString
yucai Jan 17, 2019
c75e016
use arrayBlockId in results always
yucai Jan 17, 2019
8398120
minor
yucai Jan 17, 2019
2424be0
remove ShuffleBlockBatchId
yucai Jan 17, 2019
bd9f70e
Merge remote-tracking branch 'origin/master' into pr19788_server
yucai Jan 17, 2019
5e4430a
simplify merge function
yucai Jan 19, 2019
401bddb
minor
yucai Jan 20, 2019
3d4fc7e
Merge remote-tracking branch 'origin/master' into pr19788_server
yucai Jan 20, 2019
039ae85
address comments
yucai Jan 22, 2019
92c0ab6
minor
yucai Jan 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public interface BlockFetchingListener extends EventListener {
* automatically. If the data will be passed to another thread, the receiver should retain()
* and release() the buffer on their own, or copy the data to a new buffer.
*/
void onBlockFetchSuccess(String blockId, ManagedBuffer data);
void onBlockFetchSuccess(String[] blockIds, ManagedBuffer data);

This comment was marked as resolved.

This comment was marked as resolved.


/**
* Called at least once per block upon failures.
*/
void onBlockFetchFailure(String blockId, Throwable exception);
void onBlockFetchFailure(String[] blockIds, Throwable exception);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.*;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
Expand Down Expand Up @@ -91,16 +89,18 @@ protected void handleMessage(
try {
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);
long streamId = streamManager.registerStream(client.getClientId(),
new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds), client.getChannel());
ManagedBufferIterator blocksIter = new ManagedBufferIterator(
msg.appId, msg.execId, msg.blockIds, msg.fetchContinuousShuffleBlocksInBatch);
long streamId = streamManager.registerStream(
client.getClientId(), blocksIter, client.getChannel());
if (logger.isTraceEnabled()) {
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
msg.blockIds.length,
blocksIter.getNumChunks(),
client.getClientId(),
getRemoteAddress(client.getChannel()));
}
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
callback.onSuccess(new StreamHandle(streamId, blocksIter.getNumChunks()).toByteBuffer());
} finally {
responseDelayContext.stop();
}
Expand Down Expand Up @@ -211,42 +211,58 @@ private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
private final String appId;
private final String execId;
private final int shuffleId;
// An array containing mapId and reduceId pairs.
private final int[] mapIdAndReduceIds;
// An array containing mapId, reduceId and numReducers tuple
private final int[] shuffleBlockBatches;

ManagedBufferIterator(String appId, String execId, String[] blockIds) {
ManagedBufferIterator(
String appId,
String execId,
String[] blockIds,
boolean fetchContinuousShuffleBlocksInBatch) {
this.appId = appId;
this.execId = execId;
String[] blockId0Parts = blockIds[0].split("_");
if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
if (!ExternalShuffleBlockResolver.isShuffleBlock(blockId0Parts)) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
}
this.shuffleId = Integer.parseInt(blockId0Parts[1]);
mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
if (fetchContinuousShuffleBlocksInBatch) {
ArrayList<ArrayList<int[]>> arrayShuffleBlockIds =
ExternalShuffleBlockResolver.mergeContinuousShuffleBlockIds(blockIds);
shuffleBlockBatches = new int[arrayShuffleBlockIds.size() * 3];
for (int i = 0; i < arrayShuffleBlockIds.size(); i++) {
ArrayList<int[]> arrayShuffleBlockId = arrayShuffleBlockIds.get(i);
int[] startBlockId = arrayShuffleBlockId.get(0);
int[] endBlockId = arrayShuffleBlockId.get(arrayShuffleBlockId.size() - 1);
shuffleBlockBatches[3 * i] = startBlockId[0];
shuffleBlockBatches[3 * i + 1] = startBlockId[1];
shuffleBlockBatches[3 * i + 2] = endBlockId[1] - startBlockId[1] + 1;
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
", got:" + blockIds[i]);
} else {
shuffleBlockBatches = new int[3 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
int[] blockIdParts = ExternalShuffleBlockResolver.getBlockIdParts(blockIds[i]);
shuffleBlockBatches[3 * i] = blockIdParts[0];
shuffleBlockBatches[3 * i + 1] = blockIdParts[1];
shuffleBlockBatches[3 * i + 2] = 1;
}
mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
}
}

public int getNumChunks() {
return shuffleBlockBatches.length / 3;
}

@Override
public boolean hasNext() {
return index < mapIdAndReduceIds.length;
return index < shuffleBlockBatches.length;
}

@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
index += 2;
shuffleBlockBatches[index], shuffleBlockBatches[index + 1], shuffleBlockBatches[index + 2]);
index += 3;
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,69 @@ public void registerExecutor(
executors.put(fullId, executorInfo);
}

// For testing
public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
int reduceId) {
return getBlockData(appId, execId, shuffleId, mapId, reduceId, 1);
}

/**
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
* about how the hash and sort based shuffles store their data.
* Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId, numReducers). We make
* assumptions about how the hash and sort based shuffles store their data.
*/
public ManagedBuffer getBlockData(
String appId,
String execId,
int shuffleId,
int mapId,
int reduceId) {
int reduceId,
int numReducers) {
ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId, numReducers);
}

public static boolean isShuffleBlock(String[] blockIdParts) {
return blockIdParts.length == 4 && blockIdParts[0].equals("shuffle");
}

public static int[] getBlockIdParts(String blockId) {
String[] blockIdParts = blockId.split("_");
if (!isShuffleBlock(blockIdParts)) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
}
return new int[] { Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) };
}

// Currently, for all input blockIds, we can make assumption that block ids of the same mapper id
// are consecutive in the map output file. Although, logically, they might not be consecutive
// because of zero-sized blocks, which have been filtered out in the client side actually.
public static ArrayList<ArrayList<int[]>> mergeContinuousShuffleBlockIds(String[] blockIds) {
ArrayList<int[]> shuffleBlockIds = new ArrayList<>();
ArrayList<ArrayList<int[]>> arrayShuffleBlockIds = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only need to return ArrayList<int[]>, and the int[] has 3 parts: mapId, reduceId and numBlocks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I want to keep it the same as BlockManager.mergeContinuousShuffleBlockIds, but agree with you, ArrayList<int[]> is much simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, seems like numBlocks is not enough, which includes possible zero size blocks.
And this function will be reused in OneForOneBlockFetcher, there needs real size infor.

  private void initShuffleBlockIdIndices(String[] blockIds) {
    ArrayList<ArrayList<int[]>> arrayShuffleBlockIds =
      ExternalShuffleBlockResolver.mergeContinuousShuffleBlockIds(blockIds);
    assert(arrayShuffleBlockIds.size() == streamHandle.numChunks);
    blockIdIndices = new int[arrayShuffleBlockIds.size() + 1];
    blockIdIndices[0] = 0;
    for (int i = 0; i < arrayShuffleBlockIds.size(); i++) {
      blockIdIndices[i + 1] = blockIdIndices[i] + arrayShuffleBlockIds.get(i).size();
    }
  }


for (String blockId: blockIds) {
int[] blockIdParts = getBlockIdParts(blockId);
if (shuffleBlockIds.size() == 0) {
shuffleBlockIds.add(blockIdParts);
} else {
if (blockIdParts[0] != shuffleBlockIds.get(0)[0]) {
arrayShuffleBlockIds.add(shuffleBlockIds);
shuffleBlockIds = new ArrayList<>();
}
shuffleBlockIds.add(blockIdParts);
}
}
arrayShuffleBlockIds.add(shuffleBlockIds);

return arrayShuffleBlockIds;
}

/**
Expand Down Expand Up @@ -280,13 +327,14 @@ public boolean accept(File dir, String name) {
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId, int numReducers) {
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");

try {
ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
ShuffleIndexRecord shuffleIndexRecord =
shuffleIndexInformation.getIndex(reduceId, numReducers);
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,16 @@ public void fetchBlocks(
String execId,
String[] blockIds,
BlockFetchingListener listener,
DownloadFileManager downloadFileManager) {
DownloadFileManager downloadFileManager,
boolean fetchContinuousShuffleBlocksInBatch) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
(blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId,
blockIds1, listener1, conf, downloadFileManager).start();
new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf,
downloadFileManager, fetchContinuousShuffleBlocksInBatch).start();
};

int maxRetries = conf.maxIORetries();
Expand All @@ -112,9 +113,7 @@ public void fetchBlocks(
}
} catch (Exception e) {
logger.error("Exception while beginning fetchBlocks", e);
for (String blockId : blockIds) {
listener.onBlockFetchFailure(blockId, e);
}
listener.onBlockFetchFailure(blockIds, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -50,6 +51,10 @@ public class OneForOneBlockFetcher {
private final TransportClient client;
private final OpenBlocks openMessage;
private final String[] blockIds;
// In adaptive execution, one returned chunk might contain data for several consecutive blockIds,
// blockIdIndices is used to record the mapping relationship between chunk and its blockIds.
// chunk i contains block Ids: blockIdIndices[i] until blockIdIndices[i + 1] in blockIds
private int[] blockIdIndices = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment to explain the relationship between blocks and chunks.

private final BlockFetchingListener listener;
private final ChunkReceivedCallback chunkCallback;
private final TransportConf transportConf;
Expand All @@ -64,7 +69,7 @@ public OneForOneBlockFetcher(
String[] blockIds,
BlockFetchingListener listener,
TransportConf transportConf) {
this(client, appId, execId, blockIds, listener, transportConf, null);
this(client, appId, execId, blockIds, listener, transportConf, null, false);
}

public OneForOneBlockFetcher(
Expand All @@ -74,9 +79,10 @@ public OneForOneBlockFetcher(
String[] blockIds,
BlockFetchingListener listener,
TransportConf transportConf,
DownloadFileManager downloadFileManager) {
DownloadFileManager downloadFileManager,
boolean fetchContinuousShuffleBlocksInBatch) {
this.client = client;
this.openMessage = new OpenBlocks(appId, execId, blockIds);
this.openMessage = new OpenBlocks(appId, execId, blockIds, fetchContinuousShuffleBlocksInBatch);
this.blockIds = blockIds;
this.listener = listener;
this.chunkCallback = new ChunkCallback();
Expand All @@ -89,13 +95,15 @@ private class ChunkCallback implements ChunkReceivedCallback {
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
// On receipt of a chunk, pass it upwards as a block.
listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
listener.onBlockFetchSuccess(Arrays.copyOfRange(blockIds, blockIdIndices[chunkIndex],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to avoid copy? e.g. if we change the callback interface to not take Array but Seq, then maybe we can create a special Seq which re-maps the index of blockIds, to avoid array copy.

blockIdIndices[chunkIndex + 1]), buffer);
}

@Override
public void onFailure(int chunkIndex, Throwable e) {
// On receipt of a failure, fail every block from chunkIndex onwards.
String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
String[] remainingBlockIds = Arrays.copyOfRange(blockIds, blockIdIndices[chunkIndex],
blockIds.length);
failRemainingBlocks(remainingBlockIds, e);
}
}
Expand All @@ -117,6 +125,25 @@ public void onSuccess(ByteBuffer response) {
streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

// initiate blockIdIndices
if (streamHandle.numChunks == blockIds.length) {
blockIdIndices = new int[streamHandle.numChunks + 1];
for (int i = 0; i < blockIdIndices.length; i++) {
blockIdIndices[i] = i;
}
} else {
// server fetches continuous shuffle blocks in batch
ArrayList<ArrayList<int[]>> arrayShuffleBlockIds =
ExternalShuffleBlockResolver.mergeContinuousShuffleBlockIds(blockIds);
assert(streamHandle.numChunks == arrayShuffleBlockIds.size());
blockIdIndices = new int[arrayShuffleBlockIds.size() + 1];
blockIdIndices[0] = 0;
for (int i = 1; i < blockIdIndices.length; i++) {
blockIdIndices[i] = blockIdIndices[i - 1] + arrayShuffleBlockIds.get(i - 1).size();;
}
}
assert blockIdIndices[blockIdIndices.length - 1] == blockIds.length;

// Immediately request all chunks -- we expect that the total size of the request is
// reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
for (int i = 0; i < streamHandle.numChunks; i++) {
Expand All @@ -143,12 +170,10 @@ public void onFailure(Throwable e) {

/** Invokes the "onBlockFetchFailure" callback for every listed block id. */
private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
for (String blockId : failedBlockIds) {
try {
listener.onBlockFetchFailure(blockId, e);
} catch (Exception e2) {
logger.error("Error in block fetch failure callback", e2);
}
try {
listener.onBlockFetchFailure(failedBlockIds, e);
} catch (Exception e2) {
logger.error("Error in block fetch failure callback", e2);
}
}

Expand All @@ -173,7 +198,8 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {

@Override
public void onComplete(String streamId) throws IOException {
listener.onBlockFetchSuccess(blockIds[chunkIndex], channel.closeAndRead());
listener.onBlockFetchSuccess(Arrays.copyOfRange(blockIds, blockIdIndices[chunkIndex],
blockIdIndices[chunkIndex + 1]), channel.closeAndRead());
if (!downloadFileManager.registerTempFileToClean(targetFile)) {
targetFile.delete();
}
Expand All @@ -183,7 +209,8 @@ public void onComplete(String streamId) throws IOException {
public void onFailure(String streamId, Throwable cause) throws IOException {
channel.close();
// On receipt of a failure, fail every block from chunkIndex onwards.
String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
String[] remainingBlockIds =
Arrays.copyOfRange(blockIds, blockIdIndices[chunkIndex], blockIds.length);
failRemainingBlocks(remainingBlockIds, cause);
targetFile.delete();
}
Expand Down
Loading