diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index ea3df88d95a..9c9e8fd20ca 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -589,7 +589,9 @@ private PartitionReader createReader(
             clientFactory,
             startMapIndex,
             endMapIndex,
-            callback);
+            callback,
+            startChunkIndex,
+            endChunkIndex);
       default:
         throw new CelebornIOException(
             String.format("Unknown storage info %s to read location %s", storageInfo, location));
diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 313fe77d1c4..6cca0d47be0 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -66,12 +66,16 @@ public class DfsPartitionReader implements PartitionReader {
   private int numChunks = 0;
   private int returnedChunks = 0;
   private int currentChunkIndex = 0;
+  private int startChunkIndex;
+  private int endChunkIndex;
   private final List<Long> chunkOffsets = new ArrayList<>();
   private TransportClient client;
   private PbStreamHandler streamHandler;
   private MetricsCallback metricsCallback;
 
   private FileSystem hadoopFs;
+  private Path dataFilePath;
+
   public DfsPartitionReader(
       CelebornConf conf,
       String shuffleKey,
@@ -80,7 +84,9 @@ public DfsPartitionReader(
       TransportClientFactory clientFactory,
       int startMapIndex,
       int endMapIndex,
-      MetricsCallback metricsCallback)
+      MetricsCallback metricsCallback,
+      int startChunkIndex,
+      int endChunkIndex)
       throws IOException {
     this.conf = conf;
     shuffleChunkSize = conf.dfsReadChunkSize();
@@ -121,23 +127,33 @@ public DfsPartitionReader(
           "read shuffle file from DFS failed, filePath: " + location.getStorageInfo().getFilePath(),
           e);
     }
-
-    if (endMapIndex != Integer.MAX_VALUE) {
-      dfsInputStream =
-          hadoopFs.open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+    if (endMapIndex != Integer.MAX_VALUE && endMapIndex != -1) {
+      dataFilePath = new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()));
+      dfsInputStream = hadoopFs.open(dataFilePath);
       chunkOffsets.addAll(
           getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
     } else {
-      dfsInputStream = hadoopFs.open(new Path(location.getStorageInfo().getFilePath()));
+      dataFilePath = new Path(location.getStorageInfo().getFilePath());
+      dfsInputStream = hadoopFs.open(dataFilePath);
       chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
     }
+    this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
+    this.endChunkIndex =
+        endChunkIndex == -1
+            ? chunkOffsets.size() - 2
+            : Math.min(chunkOffsets.size() - 2, endChunkIndex);
+    this.currentChunkIndex = this.startChunkIndex;
+    this.numChunks = this.endChunkIndex - this.startChunkIndex + 1;
     logger.debug(
-        "DFS {} index count:{} offsets:{}",
+        "DFS {} total offset count:{} chunk count: {} "
+            + "start chunk index:{} end chunk index:{} offsets:{}",
         location.getStorageInfo().getFilePath(),
         chunkOffsets.size(),
+        this.numChunks,
+        this.startChunkIndex,
+        this.endChunkIndex,
         chunkOffsets);
-    if (chunkOffsets.size() > 1) {
-      numChunks = chunkOffsets.size() - 1;
+    if (this.numChunks > 0) {
       fetchThread =
           ThreadUtils.newDaemonSingleThreadExecutor(
               "celeborn-client-dfs-partition-fetcher" + location.getStorageInfo().getFilePath());
@@ -197,7 +213,7 @@ public ByteBuf next() throws IOException, InterruptedException {
       fetchThread.submit(
           () -> {
             try {
-              while (!closed && currentChunkIndex < numChunks) {
+              while (!closed && currentChunkIndex <= endChunkIndex) {
                 while (results.size() >= fetchMaxReqsInFlight) {
                   Thread.sleep(50);
                 }
@@ -208,16 +224,10 @@ public ByteBuf next() throws IOException, InterruptedException {
                 try {
                   dfsInputStream.readFully(offset, buffer);
                 } catch (IOException e) {
-                  logger.warn(
-                      "read DFS {} failed will retry, error detail {}",
-                      location.getStorageInfo().getFilePath(),
-                      e);
+                  logger.warn("read DFS {} failed will retry, error detail {}", dataFilePath, e);
                   try {
                     dfsInputStream.close();
-                    dfsInputStream =
-                        hadoopFs.open(
-                            new Path(
-                                Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+                    dfsInputStream = hadoopFs.open(dataFilePath);
                     dfsInputStream.readFully(offset, buffer);
                   } catch (IOException ex) {
                     logger.warn(
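
For reference, a minimal standalone sketch of the chunk-range handling the constructor now performs: a startChunkIndex/endChunkIndex of -1 means "read the full range", the end index is clamped to chunkOffsets.size() - 2 (the last valid chunk, since the offset list carries a trailing end offset), and numChunks is derived from the resolved range. The class and method names below (ChunkRangeSketch, resolveChunkRange) and the sample offsets are illustrative only and are not part of Celeborn.

import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors the defaulting/clamping logic added to the
// DfsPartitionReader constructor in the patch above.
public class ChunkRangeSketch {

  // Returns {startChunkIndex, endChunkIndex, numChunks} for the given offsets.
  static int[] resolveChunkRange(List<Long> chunkOffsets, int startChunkIndex, int endChunkIndex) {
    int start = startChunkIndex == -1 ? 0 : startChunkIndex;
    int end =
        endChunkIndex == -1
            ? chunkOffsets.size() - 2
            : Math.min(chunkOffsets.size() - 2, endChunkIndex);
    int numChunks = end - start + 1;
    return new int[] {start, end, numChunks};
  }

  public static void main(String[] args) {
    // Offsets delimiting 4 chunks: [0,100), [100,250), [250,400), [400,480).
    List<Long> offsets = Arrays.asList(0L, 100L, 250L, 400L, 480L);

    // No range requested (-1, -1): chunks 0..3, 4 chunks in total.
    System.out.println(Arrays.toString(resolveChunkRange(offsets, -1, -1)));

    // Explicit range [1, 2]: chunks 1 and 2 only.
    System.out.println(Arrays.toString(resolveChunkRange(offsets, 1, 2)));

    // endChunkIndex past the file is clamped to the last chunk (index 3).
    System.out.println(Arrays.toString(resolveChunkRange(offsets, 2, 10)));
  }
}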