Conversation

@alexeykudinkin (Contributor) commented Jan 28, 2023

Change Logs

This change addresses a few performance regressions in HoodieSparkRecord identified during our recent benchmarking:

  1. HoodieSparkRecord rewrites records using rewriteRecord and rewriteRecordWithNewSchema, which perform schema traversals for every record. Instead, we should traverse the schema only once and produce a transformer that directly creates the new record from the old one (see the sketch after this list).

  2. HoodieRecords could currently be rewritten multiple times, even in cases where only meta-fields need to be mixed into the schema (in that case, HoodieSparkRecord simply wraps the source InternalRow into a HoodieInternalRow holding the meta-fields). This is problematic because a) UnsafeProjection re-uses a mutable row (as a buffer) to avoid allocating small objects, which leads to b) recursive overwriting of the same row.

  3. Records are currently copied for every Executor, even the Simple one, which does not buffer any records and therefore doesn't require them to be copied.
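
To illustrate point 1, here is a minimal, hypothetical sketch of the "traverse once, apply per record" idea (this is not the actual genUnsafeRowWriter implementation and handles flat schemas only): all schema lookups happen when the writer is generated, and the returned closure does a plain positional copy per row.

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
  import org.apache.spark.sql.types.StructType

  def genRowWriter(source: StructType, target: StructType): InternalRow => InternalRow = {
    // Schema traversal happens exactly once, at writer-generation time
    val ordinals  = target.fields.map(f => source.fieldIndex(f.name))
    val dataTypes = ordinals.map(i => source.fields(i).dataType)
    row => {
      // Per-record application: no schema lookups, just positional copying
      val values = new Array[Any](ordinals.length)
      var i = 0
      while (i < ordinals.length) {
        values(i) = row.get(ordinals(i), dataTypes(i))
        i += 1
      }
      new GenericInternalRow(values)
    }
  }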

To address the aforementioned gaps, the following changes have been implemented:

  1. The row-writing utilities have been revisited to decouple RowWriter generation from its actual application to the source row (that way, the per-row application is much more efficient). Additionally, a considerable number of row-writing utilities have been eliminated as purely duplicative.

  2. The HoodieRecord.rewriteRecord API is renamed to prependMetaFields to clearly disambiguate it from rewriteRecordWithNewSchema.

  3. The WriteHandle and HoodieMergeHelper implementations are substantially simplified and streamlined by being rebased onto prependMetaFields.

Impact

TBA

Risk level (write none, low, medium or high below)

Low

Documentation Update

N/A

  • The config description must be updated if new configs are added or the default value of the configs is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@alexeykudinkin alexeykudinkin changed the title [MINOR] Fixing performance regression in HoodieSparkRecord [HUDI-5633] Fixing performance regression in HoodieSparkRecord Jan 28, 2023
@alexeykudinkin alexeykudinkin added priority:blocker Production down; release blocker area:performance Performance optimizations labels Jan 28, 2023
@xushiyan xushiyan self-assigned this Jan 28, 2023
val recordKey = sparkKeyGenerator.getRecordKey(internalRow, sourceStructType)
val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
df.queryExecution.toRdd.mapPartitions { it =>
// TODO elaborate
Contributor:

please elaborate.

df.queryExecution.toRdd.mapPartitions { it =>
// TODO elaborate
val (unsafeProjection, transformer) = if (shouldDropPartitionColumns) {
(generateUnsafeProjection(dataFileStructType, dataFileStructType), genUnsafeRowWriter(sourceStructType, dataFileStructType))
Contributor:

nice optimization

val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1, schemaMerge)
val newRow = rowWriter(oldRow)

val serDe = sparkAdapter.createSparkRowSerDe(schemaMerge)
Contributor:

let's test all data types as much as possible (all primitives, arrays, maps, etc.).
also, let's test some null values for some of the fields.
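
A hedged sketch of the kind of coverage being asked for here (the schema and values are made up for illustration; only genUnsafeRowWriter is an identifier from this PR, and the target schema is left as an assumption):

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
  import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
  import org.apache.spark.sql.types._
  import org.apache.spark.unsafe.types.UTF8String

  val sourceSchema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType, nullable = true),      // exercises null values
    StructField("scores", ArrayType(DoubleType)),           // exercises arrays
    StructField("attrs", MapType(StringType, IntegerType))  // exercises maps
  ))

  val row: InternalRow = new GenericInternalRow(Array[Any](
    1L,
    null,                                                    // null field value
    new GenericArrayData(Array[Any](1.0, 2.5)),
    new ArrayBasedMapData(
      new GenericArrayData(Array[Any](UTF8String.fromString("k"))),
      new GenericArrayData(Array[Any](3)))
  ))

  // e.g.: val writer = HoodieInternalRowUtils.genUnsafeRowWriter(sourceSchema, targetSchema)
  //       writer(row) should then be compared against the expected UnsafeRow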

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into queue of QueueBasedExecutorFactory.
return isBufferingRecords ? newRecord.copy() : newRecord;
Contributor:

👍

val rowUpdater = new RowUpdater(newRow)

(fieldUpdater, ordinal, value) => {
// TODO elaborate
Contributor:

What does elaborate mean? Need more comments?

Contributor (Author):

Note to self to explain this particular piece.

HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava)
}

def generateSparkSchemaWithoutPartitionColumns(partitionParam: String, schema: StructType): StructType = {
Contributor (Author):

Dead code

extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,
Contributor (Author):

This was just search-and-replace removing invalid type references

InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, Collections.emptyMap());
Contributor:

renameCols is missed here

Contributor (Author):

Good catch!

InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, targetStructType, Collections.emptyMap());
Contributor:

rewriteRecord is used for compatibility with Avro schema evolution, while rewriteRecordWithNewSchema is used for Hudi schema evolution. They have different logic for type changes: for example, we can not change IntegerType -> DecimalType in rewriteRecord, but we can in rewriteRecordWithNewSchema.

Should we keep this?

Contributor (Author):

The key point here is that we don't actually need the rewriteRecord operation as such: historically it has been used to expand the (Avro) schema of the record to accommodate the meta-fields, which is now handled differently in HoodieSparkRecord.

Contributor:

We should support avro type promotion in this function in HoodieSparkRecord. We have discussed it before in #7003.

Contributor (Author):

Not sure I follow your train of thought -- there's no rewriteRecord method anymore; instead, it's being replaced w/ prependMetaFields.

HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(schema, recordProperties, writeSchemaWithMetaFields)
: finalRecord.get().rewriteRecord(schema, recordProperties, writeSchemaWithMetaFields);

// Prepend meta-fields into the record
Contributor (Author):

This is the primary change in this file: instead of the sequence:

  • rewriteRecord/rewriteRecordWithNewSchema (rewriting the record into a schema bearing the meta-fields)
  • updateMetadataValues

we now call the prependMetaFields API directly (expanding the record's schema w/ the meta-fields and setting them at the same time)


HoodieOperation operation = withOperationField
? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
? HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))
Contributor:

getNullableValAsString handles the case where the field does not exist, but structType.fieldIndex does not.

Contributor (Author):

Good point! Let me revisit

Contributor (Author):

Actually, I looked at it again: in that case withOperationField is true, so this field has to be present in the schema.


MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
HoodieRecord populatedRecord =
Contributor (Author):

Change similar to #7769 (comment)

// file holding this record even in cases when overall metadata is preserved
MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
HoodieRecord populatedRecord =
Contributor (Author):

Change similar to #7769 (comment)


wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
HoodieRecord newRecord;
if (schemaEvolutionTransformerOpt.isPresent()) {
Contributor (Author):

The schema evolution transformer is now applied inside the record transformer, as opposed to via a MappingIterator previously.
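
A tiny sketch (names hypothetical) of that composition: an optional schema-evolution step is folded into the same per-record function that is handed to the executor, rather than being applied via a separate MappingIterator.

  // Fold any number of optional per-record steps into a single transformer
  def composeTransformer[A](steps: Option[A => A]*): A => A =
    steps.flatten.foldLeft(identity[A] _)((acc, step) => acc.andThen(step))

  // e.g. composeTransformer(schemaEvolutionStepOpt, Some(rewriteStep)) yields one
  // record => record function (both step names here are placeholders)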

// persisted records to adhere to an evolved schema
Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> schemaEvolutionTransformerOpt =
composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
Contributor (Author):

Simplifying the implementation by supplying the reader schema into the method.

StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
.getRecordKey(data, structType).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
return keyGeneratorOpt.isPresent()
Contributor (Author):

This code is unchanged (there was a change but it got reverted)

Function1<InternalRow, UnsafeRow> unsafeRowWriter =
HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, renameCols);

boolean containMetaFields = hasMetaFields(newStructType);
Contributor (Author):

Wrapping into HoodieInternalRow has been removed and abstracted to only occur in prependMetaFields API (considerably simplifying the impl here)


private boolean useWriterSchema;

public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
Contributor (Author):

Dead code

prevFieldName
}

private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
Contributor (Author):

These utility classes are borrowed from Spark's AvroSerializer

Member:

are these the same across different spark versions?

Contributor (Author):

Yes, they're 1:1 w/ the Spark type system.
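
For reference, a sketch of what such a utility typically looks like on the Spark side (reconstructed from memory, so treat the exact set of cases as an assumption): primitive element types get pre-allocated UnsafeArrayData buffers, everything else falls back to GenericArrayData.

  import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
  import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
  import org.apache.spark.sql.types._

  def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
    case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
    case ByteType    => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
    case ShortType   => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
    case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
    case LongType    => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
    case FloatType   => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
    case DoubleType  => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
    case _           => new GenericArrayData(new Array[Any](length))
  }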

public void consume(HoodieRecord record) {
try {
Thread.currentThread().wait();
synchronized (this) {
Contributor (Author):

That's fixing the flaky test
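
For context, my reading of the diff above (not a statement from the thread): Object.wait() is only legal while the calling thread holds the object's monitor, otherwise it throws IllegalMonitorStateException, so the consumer has to wait inside a synchronized block. A minimal sketch of the pattern, with a hypothetical class name:

  // wait()/notifyAll() must both be called while holding the monitor of `this`
  class BlockingConsumer {
    def consume(record: AnyRef): Unit = this.synchronized {
      wait()        // blocks until another thread calls notifyAll() on this instance
    }
    def release(): Unit = this.synchronized {
      notifyAll()   // wakes the blocked consumer up
    }
  }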

df.queryExecution.toRdd.mapPartitions { it =>
val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType
// NOTE: To make sure we properly transform records
val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType)
Contributor (Author):

This replaces the old way of calling rewriteRecord and then applying an unsafeProjection w/ just applying the new UnsafeRowWriter.

import org.apache.spark.sql.{HoodieInternalRowUtils, Row, SparkSession}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {
Contributor (Author):

These are merged into another test suite

// do change type operation
val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema)
updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateColumnType("col31", Types.DecimalType.get(18, 9)).updateColumnType("col4", Types.DecimalType.get(18, 9)).updateColumnType("col41", Types.StringType.get).updateColumnType("col5", Types.DateType.get).updateColumnType("col51", Types.DecimalType.get(18, 9)).updateColumnType("col6", Types.StringType.get)
updateChange.updateColumnType("id", Types.LongType.get)
Contributor (Author):

No changes, just breaking down an unreadably long line.

@xushiyan xushiyan (Member) left a comment:

overall lgtm. did not review HoodieInternalRowUtils line by line, which should have more UT coverage

* Checks whether configured {@link HoodieExecutor} buffer records (for ex, by holding them
* in the queue)
*/
public static boolean isBufferingRecords(HoodieWriteConfig config) {
Member:

why not make this a property of ExecutorType, so we don't need this extra helper?

Contributor (Author):

Good call. Will address in a follow-up (to avoid re-running CI)

Contributor (Author):

I actually realized that this is unfortunately not possible: we're copying in the transformers which we pass as args to the ctor of the respective Executor, therefore we can't just call a method on it.
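
For readers outside this thread, a self-contained sketch of what the helper boils down to (the executor-type names are assumptions, not quoted from the PR): only queue-backed executors buffer records and therefore need the defensive copy shown earlier.

  // The real ExecutorType lives in Hudi; this stand-in enum just mirrors the idea
  object ExecutorType extends Enumeration {
    val SIMPLE, BOUNDED_IN_MEMORY, DISRUPTOR = Value
  }

  def isBufferingRecords(executorType: ExecutorType.Value): Boolean = executorType match {
    case ExecutorType.BOUNDED_IN_MEMORY | ExecutorType.DISRUPTOR => true   // queue-backed, buffers records
    case ExecutorType.SIMPLE                                     => false  // pass-through, no buffering
  }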

return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
}

static void updateMetadataValuesInternal(GenericRecord avroRecord, MetadataValues metadataValues) {
Member:

this looks like a helper method that fits in some avro utils

Contributor (Author):

It's very specific in its purpose though -- it overwrites meta-fields that shouldn't occur outside of HoodieRecord

@codope codope force-pushed the ak/rfc46-perf-fix branch from 1aa1f0c to ca3b5fa Compare January 30, 2023 12:55
@alexeykudinkin alexeykudinkin force-pushed the ak/rfc46-perf-fix branch 2 times, most recently from 9bfa20f to 62f1095 Compare January 31, 2023 00:36
.map(HoodieAvroUtils::getRootLevelFieldName)
.collect(Collectors.toList());
Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
Contributor (Author):

These are now properly set by the actual FileReaders.

try {
Function<HoodieRecord, HoodieRecord> transformer = record -> {
String recordKey = record.getRecordKey(schema, Option.of(keyGenerator));
return createNewMetadataBootstrapRecord(recordKey, partitionPath, recordMerger.getRecordType())
Contributor (Author):

Creating createNewMetadataBootstrapRecord is the crux of the change here:

  • The metadata bootstrap record is now properly initialized with a schema that includes all of the meta-fields, and not one truncated to just the record-key (HoodieSparkRecord is not able to handle such a truncated meta-fields schema)
  • The Avro path is restored to what it was before RFC-46

…t propagated as data columns;

Cleaning up dead-code
}

/*
* log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
Contributor (Author):

Cleaning up dead commented code (not updated since 2018)

final Dataset<Row> src = source.drop(colsToDrop);
// log.info("Final Schema from Source is :" + src.schema());
// Remove Hoodie meta columns
final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));
Contributor (Author):

The change here is to avoid keeping the partition-path, as this will make HoodieSparkSqlWriter treat it as a data column, which is not compatible w/ SparkRecordMerger.

Contributor (Author):

_hoodie_partition_path isn't used in either the source or the DS; according to the commented-out code it was used previously, but it is not used anymore.

#7132 recently added a config that forces all of the meta-fields to be cleaned up, but it's false by default.
