Skip to content

Conversation

@LuciferYang
Copy link
Contributor

What changes were proposed in this pull request?

This pr introduce two new toMap method to o.a.spark.util.collection.Utils, use while loop manually style to optimize the performance of keys.zip(values).toMap code pattern in Spark.

Why are the changes needed?

Performance improvement

Does this PR introduce any user-facing change?

No

How was this patch tested?

Pass GitHub Actions

@LuciferYang
Copy link
Contributor Author

@LuciferYang LuciferYang changed the title [SPARK-40175][CORE][SQL][MLLIB][STREAMING] Optimize the performance of keys.zip(values).toMap [SPARK-40175][CORE][SQL][MLLIB][STREAMING] Optimize the performance of keys.zip(values).toMap code pattern Sep 14, 2022
def ndcgAt(k: Int): Double = {
require(k > 0, "ranking position k should be positive")
rdd.map { case (pred, lab, rel) =>
import org.apache.spark.util.collection.Utils
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No big deal but why import here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a mistake, I will fix it.

while (keyIter.hasNext && valueIter.hasNext) {
map.put(keyIter.next(), valueIter.next())
}
map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this immutable ?
I believe the original code would have resulted in an immutable map ?

Copy link
Contributor Author

@LuciferYang LuciferYang Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method return a Java Map, how to make it immutable...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap to Collections.unmodifiableMap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

u are right, change to return Collections.unmodifiableMap(map)

import scala.collection.JavaConverters._
keys.zip(values).toMap.asJava
Utils.toJavaMap(keys, values)
}
Copy link
Contributor

@mridulm mridulm Sep 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this method anymore ? Why not replace with Utils.toJavaMap entirely (in JavaTypeInference) ? Any issues with that ?

Copy link
Contributor Author

@LuciferYang LuciferYang Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArrayBasedMapData#toJavaMap is already a never used method, I think we can delete it, but need to confirm whether MiMa check can pass first

EDIT: ArrayBasedMapData#toJavaMap not unused method, it used by JavaTypeInference, sorry for missing what @mridulm said

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me check this later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArrayBasedMapData is not a public API and shouldn't be tracked by mima.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Contributor Author

@LuciferYang LuciferYang Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mridulm @cloud-fan

If

StaticInvoke(
ArrayBasedMapData.getClass,
ObjectType(classOf[JMap[_, _]]),
"toJavaMap",
keyData :: valueData :: Nil,
returnNullable = false)

change to

        StaticInvoke(
          Utils.getClass,
          ObjectType(classOf[JMap[_, _]]),
          "toJavaMap",
          keyData :: valueData :: Nil,
          returnNullable = false)

The signature to toJavaMap method in collection.Utils need change from

def toJavaMap[K, V](keys: Iterable[K], values: Iterable[V]): java.util.Map[K, V]

to

def toJavaMap[K, V](keys: Array[K], values: Array[V]): java.util.Map[K, V]

Otherwise, relevant tests will fail as due to

16:20:35.587 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 375, Column 50: No applicable constructor/method found for actual parameters "java.lang.Object[], java.lang.Object[]"; candidates are: "public static java.util.Map org.apache.spark.util.collection.Utils.toJavaMap(scala.collection.Iterable, scala.collection.Iterable)"
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 375, Column 50: No applicable constructor/method found for actual parameters "java.lang.Object[], java.lang.Object[]"; candidates are: "public static java.util.Map org.apache.spark.util.collection.Utils.toJavaMap(scala.collection.Iterable, scala.collection.Iterable)"

If the method signature is def toJavaMap[K, V](keys: Array[K], values: Array[V]): java.util.Map[K, V], it will limit the use scope of this method, so I prefer to retain the ArrayBasedMapData#toJavaMap method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it acceptable to retain this method?

@LuciferYang LuciferYang changed the title [SPARK-40175][CORE][SQL][MLLIB][STREAMING] Optimize the performance of keys.zip(values).toMap code pattern [SPARK-40175][CORE][SQL][MLLIB][DSTREAM][R] Optimize the performance of keys.zip(values).toMap code pattern Sep 15, 2022
@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 8b6b3be Sep 16, 2022
LuciferYang added a commit to LuciferYang/spark that referenced this pull request Sep 20, 2022
…of `keys.zip(values).toMap` code pattern

This pr introduce two new `toMap` method to `o.a.spark.util.collection.Utils`,  use `while loop manually` style to optimize the performance of `keys.zip(values).toMap` code pattern in Spark.

Performance improvement

No

Pass GitHub Actions

Closes apache#37876 from LuciferYang/SPARK-40175.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Sep 21, 2022
….zipWithIndex.toMap` code pattern

### What changes were proposed in this pull request?
Similar as #37876,  this pr introduce a new `toMapWithIndex` method to `o.a.spark.util.collection.Utils`, use `while loop manually style` to optimize the performance of `keys.zipWithIndex.toMap` code pattern in Spark.

### Why are the changes needed?
Performance improvement

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #37940 from LuciferYang/SPARK-40494.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@caican00
Copy link
Contributor

I tested it using a real job and the bottleneck seems to be still in MapBuilder.$plus$eq
And i have used a for loop manually for testing but still no significant improvement.
Do you know the reason?
Gently ping @LuciferYang
image

@caican00
Copy link
Contributor

my testing code:

import spark.implicits._
val df = spark.sql("select triggerId,adMetadata,userData from iceberg_test_catalog.test_db.test_table" +
  " where date = 20220801 and adMetadata is not null and size(adMetadata) > 0  and userData is not null")
  .map(x =>
    x.getInt(0) + "_" + x.getStruct(1).toString() + "_" + x.getMap(2).keySet.toString()
  )
  .count()

@caican00
Copy link
Contributor

my testing code:

import spark.implicits._
val df = spark.sql("select triggerId,adMetadata,userData from iceberg_test_catalog.test_db.test_table" +
  " where date = 20220801 and adMetadata is not null and size(adMetadata) > 0  and userData is not null")
  .map(x =>
    x.getInt(0) + "_" + x.getStruct(1).toString() + "_" + x.getMap(2).keySet.toString()
  )
  .count()

iceberg_test_catalog.test_db.test_table is a iceberg table with complex types such as struct, Map, etc

@LuciferYang
Copy link
Contributor Author

@caican00 Yes, it was also clear before that when the collection size is greater than 500, there will be no significant performance improvement.

In fact, according to the test results from GA, when the collection size is between 100 and 500, the revenue is about 10%, and this is only a partial tuning. I don't think it will significantly improve the overall situation, but this performance is similar to keys.zip(values)(collection.breakOut[From, T, To])

@LuciferYang
Copy link
Contributor Author

my testing code:

import spark.implicits._
val df = spark.sql("select triggerId,adMetadata,userData from iceberg_test_catalog.test_db.test_table" +
  " where date = 20220801 and adMetadata is not null and size(adMetadata) > 0  and userData is not null")
  .map(x =>
    x.getInt(0) + "_" + x.getStruct(1).toString() + "_" + x.getMap(2).keySet.toString()
  )
  .count()

iceberg_test_catalog.test_db.test_table is a iceberg table with complex types such as struct, Map, etc

Cloud you provide a test case that is easier to test in Spark project?

@caican00
Copy link
Contributor

the collection size is greater than 500

the collection size is greater than 500, is it the number of elements in a collection?

@LuciferYang
Copy link
Contributor Author

LuciferYang commented Sep 27, 2022

the collection size is greater than 500

the collection size is greater than 500, is it the number of elements in a collection?

Yes.

EDIT: keys.size > 500

Previous test results

#37609 (comment)

@LuciferYang
Copy link
Contributor Author

@caican00 I'm not sure whether it would be better change o use toJavaMap or toJavaMap.asScala here? Can you help test it?

@LuciferYang
Copy link
Contributor Author

@caican00 Or if you can provide a micro-bench that can be run with GA, I am happy to continue to solve your issue together

@caican00
Copy link
Contributor

my testing code:

import spark.implicits._
val df = spark.sql("select triggerId,adMetadata,userData from iceberg_test_catalog.test_db.test_table" +
  " where date = 20220801 and adMetadata is not null and size(adMetadata) > 0  and userData is not null")
  .map(x =>
    x.getInt(0) + "_" + x.getStruct(1).toString() + "_" + x.getMap(2).keySet.toString()
  )
  .count()

iceberg_test_catalog.test_db.test_table is a iceberg table with complex types such as struct, Map, etc

Cloud you provide a test case that is easier to test in Spark project?

Data collection is hard to build, and i'm sorry that it's hard to provide cases that are easy to test in Spark project

@LuciferYang
Copy link
Contributor Author

@caican00 I'm not sure whether it would be better change o use toJavaMap or toJavaMap.asScala here? Can you help test it?

Hmm... Could you try this one?

@caican00
Copy link
Contributor

@caican00 I'm not sure whether it would be better change o use toJavaMap or toJavaMap.asScala here? Can you help test it?

yep. i would change to use toJavaMap.asScala and test it with my real case.

@caican00
Copy link
Contributor

caican00 commented Sep 27, 2022

@caican00 I'm not sure whether it would be better change o use toJavaMap or toJavaMap.asScala here? Can you help test it?

Hmm... Could you try this one?

Okay. I'll tell you the testing result in a soon.

@LuciferYang
Copy link
Contributor Author

Thanks ~ @caican00 waiting for your feedback :)

@caican00
Copy link
Contributor

caican00 commented Sep 27, 2022

Thanks ~ @caican00 waiting for your feedback :)

@LuciferYang ~~
The bottleneck was still in MapBuilder.$plus$eq, because the method convertToMapData return a immutable.Map value and i must to convert mutable.Map to immutable.Map invoking toMap.
Method toMap invokes MapBuilder.$plus$eq.

image
image
image

@caican00
Copy link
Contributor

caican00 commented Sep 27, 2022

using java Map had the same problem.
image

@caican00
Copy link
Contributor

Thanks ~ @caican00 waiting for your feedback :)

The bottleneck was still in MapBuilder.$plus$eq, because the method convertToMapData return a immutable.Map value and i must to convert mutable.Map to immutable.Map invoking toMap @LuciferYang image image

any way to convert mutable.Map to immutable.Map without invoking toMap?

@LuciferYang
Copy link
Contributor Author

I haven't thought of a better way yet

@caican00
Copy link
Contributor

I haven't thought of a better way yet

Thanks. I'll share it with you if I can think of a better way

@caican00
Copy link
Contributor

caican00 commented Oct 19, 2022

I haven't thought of a better way yet

Thanks. I'll share it with you if I can think of a better way

@LuciferYang
why ArrayBasedMapData#toScalaMap method return immutable.Map type rather than mutable.Map type?
I write a simple case to compare building mutable.Map and building immutable.Map and building mutable.Map is about twice as fast as building immutable.Map

test code:

    case class Test(id: Int, name: String)

    // build immutable.Map
    val builder = immutable.Map.newBuilder[Test, String]
    var index: Int = 0
    val start = System.currentTimeMillis()
    while (index < 10000000) {
      builder += (Test(index, s"$index _ $index"), index.toString).asInstanceOf[(Test, String)]
      index += 1
    }
    builder.result()
    val end = System.currentTimeMillis()
    // scalastyle:off println
    println(s"immutable map spent time: ${(end - start)/1000}s")
    // scalastyle:off println


    // build mutable.Map
    val map = mutable.Map[Test, String]()
    index = 0
    val start1 = System.currentTimeMillis()
    while (index < 10000000) {
      map.put(Test(index, s"$index _ $index"), index.toString)
      index += 1
    }
    val end1 = System.currentTimeMillis()
    // scalastyle:off println
    println(s"mutable map spent time: ${(end1 - start1)/1000}s")
    // scalastyle:off println

test result:

5000000 elements:
immutable map spent time:  8s
mutable map spent time:  4s

10000000 elements:
immutable map spent time:  21s
mutable map spent time:  9s

@caican00
Copy link
Contributor

caican00 commented Oct 19, 2022

I haven't thought of a better way yet

Thanks. I'll share it with you if I can think of a better way

@LuciferYang why ArrayBasedMapData#toScalaMap method return immutable.Map type rather than mutable.Map type? I write a simple case to compare building mutable.Map and building immutable.Map and building mutable.Map is about twice as fast as building immutable.Map

test code:

    case class Test(id: Int, name: String)

    // build immutable.Map
    val builder = immutable.Map.newBuilder[Test, String]
    var index: Int = 0
    val start = System.currentTimeMillis()
    while (index < 10000000) {
      builder += (Test(index, s"$index _ $index"), index.toString).asInstanceOf[(Test, String)]
      index += 1
    }
    builder.result()
    val end = System.currentTimeMillis()
    // scalastyle:off println
    println(s"immutable map spent time: ${(end - start)/1000}s")
    // scalastyle:off println


    // build mutable.Map
    val map = mutable.Map[Test, String]()
    index = 0
    val start1 = System.currentTimeMillis()
    while (index < 10000000) {
      map.put(Test(index, s"$index _ $index"), index.toString)
      index += 1
    }
    val end1 = System.currentTimeMillis()
    // scalastyle:off println
    println(s"mutable map spent time: ${(end1 - start1)/1000}s")
    // scalastyle:off println

test result:

5000000 elements:
immutable map spent time:  8s
mutable map spent time:  4s

10000000 elements:
immutable map spent time:  21s
mutable map spent time:  9s

cc @cloud-fan

@LuciferYang
Copy link
Contributor Author

@caican00 For the performance gap between mutable.Map and immutable.Map, I think it is better to turn to the Scala community

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants