@@ -741,7 +741,9 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
longArray.set(pos * 2 + 1, keyHashcode);
isDefined = true;

if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
Contributor:

Can you remind me of some more details of BytesToBytesMap? What happens if we don't grow? I don't see a loop in this method, and I'm not sure how the job hangs.

Member Author:

Ah, sure.

The client of BytesToBytesMap, like HashAggregate, will call lookup to find a Location to write a value to. The returned Location is then used to append the key/value (Location.append). Every time after we append a key/value, we check whether it is time to grow the internal array, and grow it if needed.

lookup delegates looking up keys to safeLookup. Its control flow looks like:

int pos = hash & mask;
int step = 1;
// loop forever until a matching key or an empty slot is found
while (true) {
  if (empty slot found) {
    ...
  } else {
    // check if matching key
    ...
  }
  // increase pos and step
  pos = (pos + step) & mask;
  step++;
}

So the job hangs in this loop: it cannot find any empty slot because the internal array is full.

We stop growing the internal array too early because this check wrongly compares the array size:

if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
  ...
}
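To make the off-by-a-factor-of-two concrete, here is a small standalone illustration (this is not Spark code; it assumes MAX_CAPACITY is (1 << 29), the value BytesToBytesMap declares, though the exact number does not change the argument):

public class GrowthCheckDemo {
  public static void main(String[] args) {
    final long MAX_CAPACITY = 1L << 29;  // assumed value of BytesToBytesMap.MAX_CAPACITY
    long arraySize = 1L << 29;           // longArray.size() after the last successful grow
    long capacity = arraySize / 2;       // each key uses two array entries, so capacity = 1 << 28

    // Old check: compares the raw array size, so growth stops once the map can hold
    // only MAX_CAPACITY / 2 keys, even though appends keep succeeding until every slot is taken.
    System.out.println("old check allows grow: " + (arraySize < MAX_CAPACITY));      // false
    // Fixed check: compares the capacity (size / 2), so the map can keep growing.
    System.out.println("new check allows grow: " + (arraySize / 2 < MAX_CAPACITY));  // true
    System.out.println("keys the map can hold without growing: " + capacity);        // 268435456
  }
}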

Another point, from #26828 (comment), is that we may want to set canGrowArray to false once we are close to max capacity, so we can avoid an infinite loop again.

Contributor:

Shouldn't lookup throw an OOM if no space can be found?

Member Author (@viirya), Dec 11, 2019:

lookup just looks for an empty slot in the internal array for a new key. It does not allocate memory. The array is allocated/grown during a previous append.

Once an empty slot (a Location object) is found, the client of BytesToBytesMap may call append on the Location; an OOM could be thrown while appending the new key/value.
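To illustrate the split, here is a minimal sketch of the client pattern (variable names are hypothetical; lookup, Location.isDefined, and Location.append are the real BytesToBytesMap API, as the diff above shows):

// lookup only probes the long array; it never allocates memory.
BytesToBytesMap.Location loc = map.lookup(keyBase, keyOffset, keyLength);
if (!loc.isDefined()) {
  // append writes the key/value into a data page and may grow the long array;
  // this is where allocation happens, so this is where an OOM can surface.
  boolean added = loc.append(keyBase, keyOffset, keyLength,
                             valueBase, valueOffset, valueLength);
  if (!added) {
    // the caller (e.g. HashAggregate) spills the map and falls back to sort-based aggregation
  }
}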

Contributor (@cloud-fan), Dec 11, 2019:

Let me confirm the problem: append mistakenly thinks there is enough space and doesn't grow the array. This makes the client of BytesToBytesMap keep calling lookup and hang. Is my understanding correct?

Member Author:

append mistakenly thinks it cannot grow the array anymore, so it does not grow it. It keeps appending values until the array is full. Then the client calling lookup cannot find an empty slot and gets stuck in an infinite loop.

Contributor:

I see: append doesn't grow the array when it should. This makes the BytesToBytesMap malformed (the array is not big enough to serve the data region) and causes problems.

// We use two array entries per key, so the array size is twice the capacity.
// We should compare the current capacity of the array, instead of its size.
if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
Member (@dongjoon-hyun), Dec 11, 2019:

@viirya, can we have explicit test cases for these boundary conditions?
(Sorry, I removed my previous comment.)

Member Author:

The max capacity is a big number. Is it OK for a unit test to allocate such a big array?

Member:

I guess we could use a mock with a simple growAndRehash (although I didn't try).

Member (@dongjoon-hyun), Dec 11, 2019:

Never mind. I don't want to block this PR (or you), since this looks urgent. I'll try that later myself.

Member Author:

OK, sounds good. I will also experiment to see if I can add such a test. Thanks for the suggestion!

Member Author:

Hmm, even if we mock growAndRehash to avoid allocating the array, append still needs to allocate memory pages for the values to insert. Mocking append makes less sense, because that is where this logic lives.

Member (@dongjoon-hyun), Dec 11, 2019:

Oh, you already tested that. Got it. Thank you for spending time on that.

try {
growAndRehash();
Member Author:

Actually, I also think that we should set canGrowArray to false, like:

if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) {
  canGrowArray = false;
}

So once we reach the max capacity of the map, canGrowArray is set to false. We can fail the next append and let the map spill and fall back to sort-based aggregation in HashAggregate. Thus we can prevent a similar forever-loop from happening when we reach max capacity.
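A minimal sketch of how the two checks could sit together inside append (simplified; the surrounding diff already shows the try/catch around growAndRehash, and only the else branch is the proposed addition):

if (numKeys >= growthThreshold) {
  if (longArray.size() / 2 < MAX_CAPACITY) {
    try {
      growAndRehash();
    } catch (SparkOutOfMemoryError oom) {
      canGrowArray = false;  // existing behavior: stop growing once growth itself OOMs
    }
  } else {
    // proposed addition: we are at max capacity, so refuse further appends;
    // the next append fails and the caller spills / falls back to sort-based aggregation
    canGrowArray = false;
  }
}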

cc @cloud-fan @felixcheung

Contributor:

If we do this, we won't call growAndRehash here. Is that expected?

Member Author (@viirya), Dec 12, 2019:

Oh, I didn't mean to replace the current condition, but to add another check.

Member Author:

Let me think about it. If it makes sense, I will submit another PR for it.

Contributor:

@viirya I'm encountering the same problem that you describe here. When the map is close to MAX_CAPACITY and needs to grow, numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY is true. This prevents the map from resizing, but currently canGrowArray remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. This ultimately causes the query to fail in the UnsafeKVExternalSorter constructor.

It looks like you didn't submit a PR for this - is there a reason why not? If there's no problem with your suggested fix, I can submit a PR now.

Contributor:

Thanks for the quick response! I saw that PR (#26914) but I don't think it solves the problem I'm encountering. That PR stops accepting new keys once we have reached MAX_CAPACITY - 1 keys, but this is too late. By that time, we will have far exceeded the growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting, causing the query to fail.

Member Author:

I think the problem I posted above is that when we reach MAX_CAPACITY, a forever-loop happens while calling lookup. The previous PR fixed it. It sounds like you are encountering another problem?

Member Author:

> If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting, causing the query to fail.

In UnsafeKVExternalSorter, it will check whether the long array can be reused or not, won't it? If it cannot be reused, a new pointer array will be created, no?

Contributor (@ankurdave), Sep 13, 2020:

> It sounds like you are encountering another problem?

You're right, it's not the same problem - I was mistaken in saying so earlier.

> In UnsafeKVExternalSorter, it will check whether the long array can be reused or not, won't it? If it cannot be reused, a new pointer array will be created, no?

Yes, but by this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.
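For readers following the thread, a simplified sketch of the decision being described (illustrative only; the exact size requirement and allocation call in the real UnsafeKVExternalSorter constructor differ, so the factor of 4 below is an assumption):

// When the map spills, the sorter tries to borrow the map's long array as its sort pointer array.
LongArray pointerArray = map.getArray();
long numRecords = map.numValues();
// In-place sorting needs several array entries per record (not per distinct key), so a map
// filled far past its growth threshold no longer leaves enough room in the borrowed array.
boolean canReuse = pointerArray.size() >= numRecords * 4;  // assumed space requirement
if (!canReuse) {
  // Allocating a replacement array is what fails in practice: by the time the map spills,
  // the task has typically consumed its memory budget filling the map.
  pointerArray = map.allocateArray(numRecords * 4);  // may throw SparkOutOfMemoryError
}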

Contributor:

I filed SPARK-32872 and submitted #29744 to fix this.

} catch (SparkOutOfMemoryError oom) {