@@ -623,6 +623,7 @@ public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
return iter;
} else {
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
logger.debug("number of spillWriters: {}", spillWriters.size());
int i = 0;
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
if (i + spillWriter.recordsSpilled() > startIndex) {
@@ -46,14 +46,19 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
// Variables that change with every record read:
private int recordLength;
private long keyPrefix;
-  private int numRecords;
+  private final int numRecords;
private int numRecordsRemaining;

private byte[] arr = new byte[1024 * 1024];
private Object baseObject = arr;
private final long baseOffset = Platform.BYTE_ARRAY_OFFSET;
private final TaskContext taskContext = TaskContext.get();

private final long buffSize;
private final File file;
private final BlockId blockId;
private final SerializerManager serializerManager;

public UnsafeSorterSpillReader(
SerializerManager serializerManager,
File file,
@@ -72,12 +77,27 @@ public UnsafeSorterSpillReader(
bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
}

try (InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
@viirya (Member) commented on Sep 11, 2017:

Please add a comment here saying that we don't need to hold the file open until we actually want to load the records, so we can partially prevent the too-many-open-files issue.

DataInputStream dataIn = new DataInputStream(serializerManager.wrapStream(blockId, bs))) {
this.numRecords = dataIn.readInt();
this.numRecordsRemaining = numRecords;
}

this.buffSize = bufferSizeBytes;
this.file = file;
this.blockId = blockId;
this.serializerManager = serializerManager;

logger.debug("bufSize: {}, file: {}, records: {}", buffSize, file, this.numRecords);
Member commented:

Is this log useful? If there are many spill readers, we probably don't want this much log output.

}

private void initStreams() throws IOException {
final InputStream bs =
-        new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
+        new NioBufferedFileInputStream(file, (int) buffSize);
try {
this.in = serializerManager.wrapStream(blockId, bs);
this.din = new DataInputStream(this.in);
-      numRecords = numRecordsRemaining = din.readInt();
+      this.numRecordsRemaining = din.readInt();
} catch (IOException e) {
Closeables.close(bs, /* swallowIOException = */ true);
throw e;
@@ -104,6 +124,10 @@ public void loadNext() throws IOException {
if (taskContext != null) {
taskContext.killTaskIfInterrupted();
}
if (this.din == null) {
// Good time to init (if all files are opened, we can get Too Many files exception)
Member commented:

This comment looks confusing. Maybe something like: "It is the time to initialize and hold the input stream of the spill file for loading records. Keeping the input stream open too early will very likely run into the too-many-open-files issue."

initStreams();
}
Member commented:

Can this solve the too-many-open-files issue? When we merge the readers, it is possible that all the readers in the priority queue still have records and are asked for records (so their files are open). You can still encounter the too-many-open-files issue.

Author commented:

Good point. The PR has been tried with queries involving window functions (e.g. Q67), for which it worked fine.

During spill merges (especially getSortedIterator), it is still possible to encounter the too-many-open-files issue.
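
For context, a minimal sketch of why a priority-queue merge ends up with every spill file open at once, even with lazy opening (the SpillReader interface here is hypothetical, not Spark's actual classes):

import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;

interface SpillReader {
  boolean hasNext();
  void loadNext() throws IOException; // opens the underlying spill file on first call
  long currentKeyPrefix();            // key prefix of the record loaded by loadNext()
}

final class KWayMerge {
  static void mergeAll(List<SpillReader> readers) throws IOException {
    PriorityQueue<SpillReader> queue = new PriorityQueue<>(
        (a, b) -> Long.compare(a.currentKeyPrefix(), b.currentKeyPrefix()));
    for (SpillReader r : readers) {
      if (r.hasNext()) {
        r.loadNext(); // first record loaded: the file behind r is now open
        queue.add(r);
      }
    }
    // From here on, every reader in the queue holds an open file, so the peak
    // open-file count equals the number of spills being merged; lazy opening
    // only postpones each open to the reader's first loadNext().
    while (!queue.isEmpty()) {
      SpillReader r = queue.poll();
      // ... emit r's current record ...
      if (r.hasNext()) {
        r.loadNext();
        queue.add(r);
      }
    }
  }
}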

Member commented:

I think you first need to describe in more detail, in the PR description, how this fixes the issue.

Contributor commented:

I agree with @viirya. We are using a priority queue to do the merge sort, so all the readers in the priority queue end up open; this still cannot solve the issue.

I think a valid fix is to control the number of concurrently merged files, like MapReduce's io.sort.factor.

We also still need to address the similar issue in ExternalSorter and other places in Shuffle.
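
A minimal sketch of what such a bounded fan-in could look like; SpillFile, mergeToNewSpill, and MERGE_FACTOR are hypothetical stand-ins, not existing Spark APIs:

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class SpillFile {} // stand-in for one on-disk spill

final class BoundedMerger {
  static final int MERGE_FACTOR = 64; // would come from a new config

  static SpillFile mergeAll(List<SpillFile> spills) throws IOException {
    Deque<SpillFile> pending = new ArrayDeque<>(spills);
    while (pending.size() > 1) {
      // Take at most MERGE_FACTOR spills; only these files are open during the pass.
      List<SpillFile> batch = new ArrayList<>();
      while (!pending.isEmpty() && batch.size() < MERGE_FACTOR) {
        batch.add(pending.removeFirst());
      }
      pending.addLast(mergeToNewSpill(batch));
    }
    return pending.removeFirst();
  }

  // Hypothetical helper: k-way merges the batch into one new spill file and
  // closes all of its inputs, so each pass starts and ends with no files open.
  static SpillFile mergeToNewSpill(List<SpillFile> batch) throws IOException {
    return new SpillFile();
  }
}

Each pass opens at most MERGE_FACTOR files, at the cost of rewriting intermediate data, which is the same trade-off io.sort.factor makes in MapReduce.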

Member commented:

IIUC, this PR does not reduce the total number of open files. Since it opens files only when they are required, it may reduce the possibility of hitting a too-many-open-files error.

As @viirya pointed out, it is necessary to provide a feature to control the number of files open at any one point (e.g. in the priority queue).

Contributor commented:

The proper fix would be to introduce a new config to control the number of concurrently opened spill files; that also means using some data structure to keep and track requests to open spill files.
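
A minimal sketch of such a tracking structure, assuming a semaphore-based cap (all names hypothetical):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Semaphore;

final class SpillFileOpenTracker {
  private final Semaphore permits;

  SpillFileOpenTracker(int maxOpenSpillFiles) { // value from the new config
    this.permits = new Semaphore(maxOpenSpillFiles);
  }

  InputStream open(File file) throws IOException {
    permits.acquireUninterruptibly(); // block until another reader closes its file
    boolean opened = false;
    try {
      InputStream in = new FileInputStream(file) {
        @Override
        public void close() throws IOException {
          try {
            super.close();
          } finally {
            permits.release(); // hand the permit to the next waiting reader
          }
        }
      };
      opened = true;
      return in;
    } finally {
      if (!opened) {
        permits.release(); // don't leak the permit if the open itself fails
      }
    }
  }
}

Note that a cap like this only works together with bounded merge fan-in: if a single merge needs more open readers than there are permits, the blocking acquire would deadlock.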

recordLength = din.readInt();
keyPrefix = din.readLong();
if (recordLength > arr.length) {