Conversation

@Ngone51 (Member) commented May 22, 2024

What changes were proposed in this pull request?

This PR cleans up mapIdToMapIndex when the corresponding mapstatus is unregistered in three places (see the sketch after the list):

  • removeMapOutput
  • removeOutputsByFilter
  • addMapOutput (old mapstatus overwritten)
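
A minimal sketch of the cleanup idea (illustrative names only, not the actual Spark code): whenever the mapstatus at a given mapIndex is unregistered or overwritten, the stale mapId entry is dropped from mapIdToMapIndex at the same time.

```scala
import scala.collection.mutable

// Illustrative stand-ins for the driver-side tracker state (not the actual Spark classes).
case class MapStatusLike(mapId: Long)

val mapStatuses     = new Array[MapStatusLike](4)        // one slot per map partition index
val mapIdToMapIndex = new mutable.HashMap[Long, Int]()

def addMapOutput(mapIndex: Int, status: MapStatusLike): Unit = {
  // If an old mapstatus is being overwritten, drop its stale mapId entry first.
  Option(mapStatuses(mapIndex)).foreach(old => mapIdToMapIndex.remove(old.mapId))
  mapStatuses(mapIndex) = status
  mapIdToMapIndex(status.mapId) = mapIndex
}

def removeMapOutput(mapIndex: Int): Unit = {
  // Unregistering a mapstatus also unregisters its mapId -> mapIndex entry.
  Option(mapStatuses(mapIndex)).foreach { old =>
    mapIdToMapIndex.remove(old.mapId)
    mapStatuses(mapIndex) = null
  }
}
```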

Why are the changes needed?

There is only one valid mapstatus for the same mapIndex at any given time in Spark. mapIdToMapIndex should also follow the same rule to avoid chaos.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.

 * Exposed for testing.
 */
-private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]()
+private[spark] val mapIdToMapIndex = new HashMap[Long, Int]()
Contributor:

QQ: Why change to HashMap from OpenHashMap? (It is specialized for Long and Int.)

Member:

+1 for the above question.

@Ngone51 (Member Author):

OpenHashMap doesn't support the remove operation.

Contributor:

Ah, yes!
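
For context on the exchange above, a minimal sketch of the operation that motivated the switch: the standard-library mutable.HashMap supports remove, which the cleanup in this PR relies on (the key/value below is purely illustrative).

```scala
import scala.collection.mutable

val mapIdToMapIndex = new mutable.HashMap[Long, Int]()
mapIdToMapIndex(1001L) = 0     // register mapId -> mapIndex
mapIdToMapIndex.remove(1001L)  // unregister; Spark's OpenHashMap has no remove
```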

 _numAvailableMapOutputs -= 1
 mapStatusesDeleted(mapIndex) = mapStatuses(mapIndex)
+val currentMapStatus = mapStatuses(mapIndex)
+mapIdToMapIndex.remove(currentMapStatus.mapId)
Contributor:

Removing it here will mean we can't query for it via mapStatusesDeleted, where we rely on the mapId -> mapIndex entry still being in mapIdToMapIndex even when the mapIndex is in mapStatusesDeleted.

We should move this cleanup to where mapStatusesDeleted itself is cleaned up.

Same applies to the cases below as well.
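
To restate the concern (an illustrative sketch with hypothetical names, not the actual MapOutputTracker code): if the mapId entry is dropped as soon as the status moves into mapStatusesDeleted, a later mapId-based lookup can no longer recover the deleted status.

```scala
import scala.collection.mutable

// Hypothetical names, only to restate the reviewer's concern.
case class Status(mapId: Long, execId: String)

val mapStatuses        = Array[Status](null)              // live mapstatus slot for P0 (now empty)
val mapStatusesDeleted = Array[Status](Status(1L, "e1"))  // deleted mapstatus retained here
val mapIdToMapIndex    = mutable.HashMap[Long, Int]()     // 1L -> 0 was removed on delete

// A mapId-based lookup that also consults mapStatusesDeleted now finds nothing:
val found = mapIdToMapIndex.get(1L)
  .flatMap(i => Option(mapStatuses(i)).orElse(Option(mapStatusesDeleted(i))))
println(found) // None, even though the deleted mapstatus is still retained
```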

@Ngone51 (Member Author):

This is a good point. But IIUC, mapStatusesDeleted is only cleaned up when recovery happens on K8s, so it's not guaranteed to always be cleaned up in the end. I removed mapStatusesDeleted's dependency on mapIdToMapIndex instead, as that's not a common use case.

@dongjoon-hyun (Member) left a comment:

BTW, @Ngone51, this should have a new JIRA ID because the original one was already released in Apache Spark 3.5.0. This PR cannot be a follow-up of a released JIRA issue.

@Ngone51 Ngone51 changed the title [SPARK-43043][FOLLOW-UP] Cleanup mapIdToMapIndex on mapoutput unregister [SPARK-48394][CORE] Cleanup mapIdToMapIndex on mapoutput unregister May 23, 2024
@Ngone51 (Member Author) commented May 23, 2024

@dongjoon-hyun Thanks for the reminder. Have created a separate ticket: SPARK-48394.

@mridulm (Contributor) left a comment:

Looks good to me, thanks for fixing this @Ngone51 !

@Ngone51 Ngone51 requested a review from dongjoon-hyun May 24, 2024 00:25
@dongjoon-hyun (Member) left a comment:

+1, LGTM (with one minor test case prefix comment).

}
}

test("mapIdToMapIndex should cleanup unused mapIndexes after removeOutputsByFilter") {
Member:

Please use the JIRA ID as the test name prefix for this bug fix.

@dongjoon-hyun (Member) commented:

Merged to master only, because this was defined as an Improvement.

@Ngone51 (Member Author) commented May 25, 2024

@dongjoon-hyun @mridulm Sorry, can we make it a bug and backport it to the maintenance release branches? This actually caused an issue for us internally; I was pushing a quick fix before realizing this is the root cause.

The issue leads to a shuffle fetch failure and, ultimately, job failure. It happens this way:

  1. Stage A computes partition P0 by task t1 (TID, a.k.a. mapId) on executor e1.
  2. Executor e1 starts decommissioning.
  3. Executor e1 reports a false-positive loss to the driver during its decommission.
  4. Stage B reuses the shuffle dependency of Stage A and computes partition P0 again by task t2 on executor e2.
  5. When task t2 finishes, we see two entries ((t1 -> P0), (t2 -> P0)) for the same partition in mapIdToMapIndex, but only one entry (mapStatuses(P0) = MapStatus(t2, e2)) in mapStatuses.
  6. Executor e1 starts to migrate task t1's mapstatus (to executor e3, for example) and calls updateMapOutput on the driver. Given 5), we'd use the mapId (i.e., t1) to get the mapIndex (i.e., P0) and then use P0 to fetch task t2's mapstatus:
// updateMapOutput (driver side)
val mapIndex = mapIdToMapIndex.get(mapId)                          // stale entry: t1 still maps to P0
val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_)) // but P0 now holds t2's mapstatus
  7. Task t2's mapstatus location would then be updated to executor e3, but the output is actually still located on executor e2. This finally leads to the fetch failure.
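
The scenario can be reproduced with a small self-contained sketch (plain collections standing in for the driver-side state; all names are illustrative):

```scala
import scala.collection.mutable

// Plain collections standing in for the driver-side state; FakeMapStatus is illustrative.
case class FakeMapStatus(mapId: Long, execId: String)

val mapStatuses     = new Array[FakeMapStatus](1)       // a single partition P0 at index 0
val mapIdToMapIndex = new mutable.HashMap[Long, Int]()

// Step 1: task t1 (mapId = 1) computes P0 on executor e1.
mapStatuses(0) = FakeMapStatus(1L, "e1"); mapIdToMapIndex(1L) = 0
// Steps 4-5: task t2 (mapId = 2) recomputes P0 on e2; without the cleanup, t1's entry lingers.
mapStatuses(0) = FakeMapStatus(2L, "e2"); mapIdToMapIndex(2L) = 0

// Step 6: migrating t1's output looks up by mapId and lands on t2's mapstatus.
val mapStatusOpt = mapIdToMapIndex.get(1L).map(mapStatuses(_)).flatMap(Option(_))
println(mapStatusOpt) // Some(FakeMapStatus(2,e2)) -> its location would be wrongly updated to e3
```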

@mridulm (Contributor) commented May 25, 2024

Looks like a valid bug to me - can you raise a backport, please?
@dongjoon-hyun, thoughts on changing the type from Improvement to Bug (to commit to 3.5)?

@dongjoon-hyun (Member) commented:
Of course, @Ngone51 can.

Feel free to update the JIRA issue and backport this.

BTW, please create the JIRA issue properly next time, @Ngone51, because it's used for our communication.

@Ngone51 (Member Author) commented May 26, 2024

Thanks. Created the backport PR (#46747) for branch-3.5.

Ngone51 added a commit to Ngone51/spark that referenced this pull request May 28, 2024
This PR cleans up `mapIdToMapIndex` when the corresponding mapstatus is unregistered in three places:
* `removeMapOutput`
* `removeOutputsByFilter`
* `addMapOutput` (old mapstatus overwritten)

There is only one valid mapstatus for the same `mapIndex` at the same time in Spark. `mapIdToMapIndex` should also follow the same rule to avoid chaos.

No.

Unit tests.

No.

Closes apache#46706 from Ngone51/SPARK-43043-followup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: wuyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Ngone51 added a commit to Ngone51/spark that referenced this pull request May 31, 2024
This PR cleans up `mapIdToMapIndex` when the corresponding mapstatus is unregistered in three places:
* `removeMapOutput`
* `removeOutputsByFilter`
* `addMapOutput` (old mapstatus overwritten)

There is only one valid mapstatus for the same `mapIndex` at the same time in Spark. `mapIdToMapIndex` should also follow the same rule to avoid chaos.

No.

Unit tests.

No.

Closes apache#46706 from Ngone51/SPARK-43043-followup.

Lead-authored-by: Yi Wu <[email protected]>
Co-authored-by: wuyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
yaooqinn pushed a commit that referenced this pull request Jun 3, 2024
This PR backports #46706 to branch 3.5.

### What changes were proposed in this pull request?

This PR cleans up `mapIdToMapIndex` when the corresponding mapstatus is unregistered in three places:
* `removeMapOutput`
* `removeOutputsByFilter`
* `addMapOutput` (old mapstatus overwritten)

### Why are the changes needed?

There is only one valid mapstatus for the same `mapIndex` at the same time in Spark. `mapIdToMapIndex` should also follow the same rule to avoid chaos.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46768 from Ngone51/SPARK-48394-3.5.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
This PR backports apache#46706 to branch 3.5.

### What changes were proposed in this pull request?

This PR cleans up `mapIdToMapIndex` when the corresponding mapstatus is unregistered in three places:
* `removeMapOutput`
* `removeOutputsByFilter`
* `addMapOutput` (old mapstatus overwritten)

### Why are the changes needed?

There is only one valid mapstatus for the same `mapIndex` at the same time in Spark. `mapIdToMapIndex` should also follow the same rule to avoid chaos.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46768 from Ngone51/SPARK-48394-3.5.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Kent Yao <[email protected]>