Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@ private PartitionReader createReader(
clientFactory,
startMapIndex,
endMapIndex,
callback);
callback,
startChunkIndex,
endChunkIndex);
default:
throw new CelebornIOException(
String.format("Unknown storage info %s to read location %s", storageInfo, location));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ public class DfsPartitionReader implements PartitionReader {
private int numChunks = 0;
private int returnedChunks = 0;
private int currentChunkIndex = 0;
private int startChunkIndex;
private int endChunkIndex;
private final List<Long> chunkOffsets = new ArrayList<>();
private TransportClient client;
private PbStreamHandler streamHandler;
private MetricsCallback metricsCallback;
private FileSystem hadoopFs;

private Path dataFilePath;

public DfsPartitionReader(
CelebornConf conf,
String shuffleKey,
Expand All @@ -80,7 +84,9 @@ public DfsPartitionReader(
TransportClientFactory clientFactory,
int startMapIndex,
int endMapIndex,
MetricsCallback metricsCallback)
MetricsCallback metricsCallback,
int startChunkIndex,
int endChunkIndex)
throws IOException {
this.conf = conf;
shuffleChunkSize = conf.dfsReadChunkSize();
Expand Down Expand Up @@ -121,23 +127,33 @@ public DfsPartitionReader(
"read shuffle file from DFS failed, filePath: " + location.getStorageInfo().getFilePath(),
e);
}

if (endMapIndex != Integer.MAX_VALUE) {
dfsInputStream =
hadoopFs.open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
if (endMapIndex != Integer.MAX_VALUE && endMapIndex != -1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the feature of optimizing skew partition read has been enabled, there will be no more sort. So here should not read the sorted DFS file.
The condition should be the condition startMapIndex > endMapIndex to be true and read chunk range from DFS shuffle file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my understanding, currently celeborn supports both sort-based skew-handling and the sort-free skew-handling introduced in #2373. If we change the condition to startMapIndex > endMapIndex, then it will no longer support sort-based skew-handling at all.
There are some other sort-based skew-handling code in current codebase, such as PartitionFilesSorter. If we want to completely remove the code related to sort-based skew-handling, would it be more appropriate to do it in a new PR after #3118 is merged into the main branch?
Please correct me if there's anything I haven't understood correctly.

dataFilePath = new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath()));
dfsInputStream = hadoopFs.open(dataFilePath);
chunkOffsets.addAll(
getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
} else {
dfsInputStream = hadoopFs.open(new Path(location.getStorageInfo().getFilePath()));
dataFilePath = new Path(location.getStorageInfo().getFilePath());
dfsInputStream = hadoopFs.open(dataFilePath);
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
}
this.startChunkIndex = startChunkIndex == -1 ? 0 : startChunkIndex;
this.endChunkIndex =
endChunkIndex == -1
? chunkOffsets.size() - 2
: Math.min(chunkOffsets.size() - 2, endChunkIndex);
this.currentChunkIndex = this.startChunkIndex;
this.numChunks = this.endChunkIndex - this.startChunkIndex + 1;
logger.debug(
"DFS {} index count:{} offsets:{}",
"DFS {} total offset count:{} chunk count: {} "
+ "start chunk index:{} end chunk index:{} offsets:{}",
location.getStorageInfo().getFilePath(),
chunkOffsets.size(),
this.numChunks,
this.startChunkIndex,
this.endChunkIndex,
chunkOffsets);
if (chunkOffsets.size() > 1) {
numChunks = chunkOffsets.size() - 1;
if (this.numChunks > 0) {
fetchThread =
ThreadUtils.newDaemonSingleThreadExecutor(
"celeborn-client-dfs-partition-fetcher" + location.getStorageInfo().getFilePath());
Expand Down Expand Up @@ -197,7 +213,7 @@ public ByteBuf next() throws IOException, InterruptedException {
fetchThread.submit(
() -> {
try {
Copy link
Contributor

@wangshengjie123 wangshengjie123 Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this catch case, we could avoid to fallback to read SortedPath when optimizing skew partition read is enabled , code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, PTAL @wangshengjie123

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Z1Wu , sorry for late reply.

LGTM, thanks

while (!closed && currentChunkIndex < numChunks) {
while (!closed && currentChunkIndex <= endChunkIndex) {
while (results.size() >= fetchMaxReqsInFlight) {
Thread.sleep(50);
}
Expand All @@ -208,16 +224,10 @@ public ByteBuf next() throws IOException, InterruptedException {
try {
dfsInputStream.readFully(offset, buffer);
} catch (IOException e) {
logger.warn(
"read DFS {} failed will retry, error detail {}",
location.getStorageInfo().getFilePath(),
e);
logger.warn("read DFS {} failed will retry, error detail {}", dataFilePath, e);
try {
dfsInputStream.close();
dfsInputStream =
hadoopFs.open(
new Path(
Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
dfsInputStream = hadoopFs.open(dataFilePath);
dfsInputStream.readFully(offset, buffer);
} catch (IOException ex) {
logger.warn(
Expand Down
Loading