Skip to content

Conversation

@xiarixiaoyao
Copy link
Contributor

…iting

Tips

What is the purpose of the pull request

(For example: This pull request adds quick-start document.)

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@xiarixiaoyao
Copy link
Contributor Author

@xushiyan @YannByron @leesf @alexeykudinkin
could you pls help me review this pr, thanks

its a serious bug
before pacth: 295553 ms
after patch: 5279 ms

    val dfx = spark.range(0, 50000000).toDF("id")
      .withColumn("c1", lit("dsfsdfsafsasdfa"))
      .withColumn("c2", lit(12.99d))
      .withColumn("c3", lit(1))

    val avroSchemax = AvroConversionUtils.convertStructTypeToAvroSchema(dfx.schema, "record", "my")
    val sparkSchema = dfx.schema
    spark.sparkContext.getConf.registerAvroSchemas(avroSchemax)

    val testRDD = HoodieSparkUtils.createRdd(dfx,"record", "my", Some(avroSchemax))

// warm up
    dfx.count()
    spark.time(testRDD.foreach(f => f))

Copy link
Member

@xushiyan xushiyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. thanks for making the patch!

@xushiyan xushiyan added the priority:critical Production degraded; pipelines stalled label Mar 26, 2022
Copy link
Contributor

@alexeykudinkin alexeykudinkin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this @xiarixiaoyao !

* @return converter accepting Avro payload and transforming it into a Catalyst one (in the form of [[InternalRow]])
*/
def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType: StructType): GenericRecord => Option[InternalRow] =
record => sparkAdapter.createAvroDeserializer(rootAvroType, rootCatalystType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiarixiaoyao that's a very sneaky issue

Please leave a note in a comment explaining what was the issue and how you've addressed it.

However, i'd suggest to just pull out SparkAdapter from the closure keeping the API of this method intact: we don't want to push casting (to InternalRow/GenericRecord) onto the users

def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType: StructType): GenericRecord => Option[InternalRow] = {
    val deserilizer = sparkAdapter.createAvroDeserializer(rootAvroType, rootCatalystType)
    record => deserializer.deserialize(record).map(_.asInstanceOf[InternalRow])
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@alexeykudinkin
Copy link
Contributor

@xiarixiaoyao also, please update the PR description so that whoever will be reviewing this PR will not need to go to JIRA to understand what it is about

@alexeykudinkin
Copy link
Contributor

@xiarixiaoyao @xushiyan let's also think about how we can prevent such regressions in the future. Ideally, we should check in the test that you used to validate it as a smoke test (checking in that it completes, say, w/in 10s).

Alternatively, we can also reshape and commit it as a benchmark (also committing its output as a reference) so that we can at least verify it manually by running it periodically and comparing against the baseline.

@xiarixiaoyao xiarixiaoyao force-pushed the regression branch 2 times, most recently from d85c848 to dcec725 Compare March 27, 2022 03:16
@alexeykudinkin
Copy link
Contributor

@xiarixiaoyao thanks for fixing this!

Can you please also add the test that you've used as a benchmark (based on JMH)?

@xiarixiaoyao
Copy link
Contributor Author

@xiarixiaoyao thanks for fixing this!

Can you please also add the test that you've used as a benchmark (based on JMH)?

i will add benchmark to cover it. not jmh.

@YannByron
Copy link
Contributor

LGTM

@xiarixiaoyao
Copy link
Contributor Author

@alexeykudinkin
add hoodie benchmark framework which modified from spark(diff spark has diff benchmark framework, we cannot refrerence directly)
add benchmark for avroDerSer

i think we should add more benchmarks for hot codes to prevent regressions

@hudi-bot
Copy link
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@xushiyan xushiyan merged commit 9da2dd4 into apache:master Mar 27, 2022
@nsivabalan
Copy link
Contributor

thanks for finding the regression and fixing it. good job!

vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
apache#5137)

* [HUDI-3719] High performance costs of AvroSerizlizer in DataSource writing

* add benchmark framework which modify from spark
add avroSerDerBenchmark
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

priority:critical Production degraded; pipelines stalled

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[SUPPORT] High performance costs of AvroSerializer in Datasource writing

6 participants