[HUDI-5633] Fixing performance regression in HoodieSparkRecord #7769
Changes from all commits
File 1

@@ -136,24 +136,22 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
       if (record.shouldIgnore(schema, config.getProps())) {
         return;
       }
-      // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-      HoodieRecord rewriteRecord;
-      if (schemaOnReadEnabled) {
-        rewriteRecord = record.rewriteRecordWithNewSchema(schema, config.getProps(), writeSchemaWithMetaFields);
-      } else {
-        rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
-      }
       MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
-      rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
+      HoodieRecord populatedRecord =
+          record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps());

       if (preserveMetadata) {
-        fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
+        fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
       } else {
-        fileWriter.writeWithMetadata(record.getKey(), rewriteRecord, writeSchemaWithMetaFields);
+        fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields);
       }

-      // update the new location of record, so we know where to find it next
+      // Update the new location of record, so we know where to find it next
       record.unseal();
       record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
       record.seal();

       recordsWritten++;
       insertRecordsWritten++;
     } else {

Contributor (Author), on the prependMetaFields call: Change similar to #7769 (comment).
File 2

@@ -374,20 +374,16 @@ public void write(HoodieRecord<T> oldRecord) {
   }

   protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
-    HoodieRecord rewriteRecord;
-    if (schemaOnReadEnabled) {
-      rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, writeSchemaWithMetaFields);
-    } else {
-      rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
-    }
     // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
     //       file holding this record even in cases when overall metadata is preserved
     MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
-    rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
+    HoodieRecord populatedRecord =
+        record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);

     if (shouldPreserveRecordMetadata) {
-      fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
+      fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
     } else {
-      fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields);
+      fileWriter.writeWithMetadata(key, populatedRecord, writeSchemaWithMetaFields);
     }
   }

Contributor (Author), on the prependMetaFields call: Change similar to #7769 (comment).
File 3

@@ -27,8 +27,6 @@
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.MappingIterator;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;

@@ -94,8 +92,8 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
     // In case Advanced Schema Evolution is enabled we might need to rewrite currently
     // persisted records to adhere to an evolved schema
-    Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> schemaEvolutionTransformerOpt =
-        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
+    Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());

     // Check whether the writer schema is simply a projection of the file's one, ie
     // - Its field-set is a proper subset (of the reader schema)

Contributor (Author), on composeSchemaEvolutionTransformer: Simplifying the implementation by supplying the reader schema into the method.
@@ -130,29 +128,27 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
             (left, right) ->
                 left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
         recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
-      } else if (schemaEvolutionTransformerOpt.isPresent()) {
-        recordIterator = new MappingIterator<>(baseFileRecordIterator,
-            schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? writerSchema : readerSchema));
-        recordSchema = schemaEvolutionTransformerOpt.get().getRight();
       } else {
         recordIterator = baseFileRecordIterator;
         recordSchema = isPureProjection ? writerSchema : readerSchema;
       }

+      boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
+
       wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
+        HoodieRecord newRecord;
+        if (schemaEvolutionTransformerOpt.isPresent()) {
+          newRecord = schemaEvolutionTransformerOpt.get().apply(record);
+        } else if (shouldRewriteInWriterSchema) {
+          newRecord = record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema);
+        } else {
+          newRecord = record;
+        }
+
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
         //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
         //       it since these records will be put into queue of QueueBasedExecutorFactory.
-        if (shouldRewriteInWriterSchema) {
-          try {
-            return record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema).copy();
-          } catch (IOException e) {
-            LOG.error("Error rewrite record with new schema", e);
-            throw new HoodieException(e);
-          }
-        } else {
-          return record.copy();
-        }
+        return isBufferingRecords ? newRecord.copy() : newRecord;
       }, table.getPreExecuteRunnable());

       wrapper.execute();

Contributor (Author), on the new transform lambda: The schema evolution transformer is now applied inside the transform function, as opposed to via a MappingIterator as before.

Contributor, on the copy-on-buffering line: 👍
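For intuition, here is a minimal, runnable sketch of the new transform shape, using hypothetical stand-in types (plain String "records" and the invented class name TransformPipelineSketch) rather than Hudi's API:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Toy model of the fused per-record transform in runMerge: schema evolution,
// writer-schema rewrite, and the defensive copy are all decided in one function,
// and the copy is skipped entirely for non-buffering (SIMPLE) executors.
public class TransformPipelineSketch {

  public static void main(String[] args) {
    Optional<Function<String, String>> schemaEvolutionTransformerOpt = Optional.of(r -> r + ":evolved");
    boolean shouldRewriteInWriterSchema = false;
    boolean isBufferingRecords = false; // e.g. SIMPLE executor: no queue, no copy needed

    Function<String, String> transform = record -> {
      String newRecord;
      if (schemaEvolutionTransformerOpt.isPresent()) {
        newRecord = schemaEvolutionTransformerOpt.get().apply(record); // schema-evolution path
      } else if (shouldRewriteInWriterSchema) {
        newRecord = record + ":rewritten";                             // writer-schema rewrite path
      } else {
        newRecord = record;                                            // pass-through
      }
      // Clone only when the executor buffers records in a queue (stand-in for record.copy()).
      return isBufferingRecords ? new String(newRecord) : newRecord;
    };

    List.of("r1", "r2").forEach(r -> System.out.println(transform.apply(r))); // r1:evolved, r2:evolved
  }
}
```

The design point: the previous MappingIterator applied the evolution rewrite eagerly to every record, and the old transform unconditionally called copy(); fusing both into one function lets the copy be gated on whether the executor actually buffers.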
@@ -173,10 +169,11 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
     }
   }

-  private Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> composeSchemaEvolutionTransformer(Schema writerSchema,
-                                                                                                                         HoodieBaseFile baseFile,
-                                                                                                                         HoodieWriteConfig writeConfig,
-                                                                                                                         HoodieTableMetaClient metaClient) {
+  private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTransformer(Schema recordSchema,
+                                                                                         Schema writerSchema,
+                                                                                         HoodieBaseFile baseFile,
+                                                                                         HoodieWriteConfig writeConfig,
+                                                                                         HoodieTableMetaClient metaClient) {
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
@@ -214,18 +211,12 @@ private Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Sche
           || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
       if (needToReWriteRecord) {
         Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-        return Option.of(Pair.of(
-            (schema) -> (record) -> {
-              try {
-                return record.rewriteRecordWithNewSchema(
-                    schema,
-                    writeConfig.getProps(),
-                    newWriterSchema, renameCols);
-              } catch (IOException e) {
-                LOG.error("Error rewrite record with new schema", e);
-                throw new HoodieException(e);
-              }
-            }, newWriterSchema));
+        return Option.of(record -> {
+          return record.rewriteRecordWithNewSchema(
+              recordSchema,
+              writeConfig.getProps(),
+              newWriterSchema, renameCols);
+        });
       } else {
         return Option.empty();
       }
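A sketch of the signature simplification, with hypothetical stand-in types (MySchema/Row invented for illustration): once the record schema is supplied to composeSchemaEvolutionTransformer up front, the curried Function-of-Schema-returning-a-Function plus its paired target schema collapse into a plain record-to-record function.

```java
import java.util.Optional;
import java.util.function.Function;

// Illustration only, not Hudi's types: the old shape forced callers to apply a
// schema before they could transform records; the new shape binds the schema
// at composition time.
public class TransformerSignatureSketch {
  record MySchema(String name) {}
  record Row(String payload) {}

  // Old shape: the caller must still apply a schema to obtain the record transformer.
  static Optional<Function<MySchema, Function<Row, Row>>> oldStyle() {
    return Optional.of(schema -> row -> new Row(row.payload() + "@" + schema.name()));
  }

  // New shape: the schema is bound at composition time; callers get the transformer directly.
  static Optional<Function<Row, Row>> newStyle(MySchema recordSchema) {
    return Optional.of(row -> new Row(row.payload() + "@" + recordSchema.name()));
  }

  public static void main(String[] args) {
    MySchema reader = new MySchema("reader");
    Row row = new Row("row1");
    System.out.println(oldStyle().get().apply(reader).apply(row)); // Row[payload=row1@reader]
    System.out.println(newStyle(reader).get().apply(row));         // same result, simpler call site
  }
}
```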
File 4

@@ -33,31 +33,47 @@

 public class ExecutorFactory {

-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction) {
-    return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
+    return create(config, inputItr, consumer, transformFunction, Functions.noop());
   }

-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction,
                                                    Runnable preExecuteRunnable) {
-    ExecutorType executorType = hoodieConfig.getExecutorType();
+    ExecutorType executorType = config.getExecutorType();
     switch (executorType) {
       case BOUNDED_IN_MEMORY:
-        return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
+        return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
-            transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
+        return new DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
+            transformFunction, config.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
       default:
         throw new HoodieException("Unsupported Executor Type " + executorType);
     }
   }

+  /**
+   * Checks whether the configured {@link HoodieExecutor} buffers records (for example,
+   * by holding them in a queue).
+   */
+  public static boolean isBufferingRecords(HoodieWriteConfig config) {
+    ExecutorType executorType = config.getExecutorType();
+    switch (executorType) {
+      case BOUNDED_IN_MEMORY:
+      case DISRUPTOR:
+        return true;
+      case SIMPLE:
+        return false;
+      default:
+        throw new HoodieException("Unsupported Executor Type " + executorType);
+    }
+  }
 }

Member, on isBufferingRecords: Why not make this a property of ExecutorType, so we don't need this extra helper?

Contributor (Author): Good call. Will address in a follow-up (to avoid re-running CI).

Contributor (Author): I actually realized that this is unfortunately not possible: the copying happens in the transform functions that we pass as arguments to the constructor of the respective Executor, so we can't just call a method on it.
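For reference, a hypothetical sketch of the reviewer's suggestion above (not what this PR ships), carrying the buffering flag on the ExecutorType enum itself:

```java
// Hypothetical alternative discussed in the thread above, not adopted in this PR:
// encode buffering behavior as a property of the enum, so the standalone helper and
// its default branch disappear (new constants must declare their behavior).
public enum ExecutorType {
  BOUNDED_IN_MEMORY(true),
  DISRUPTOR(true),
  SIMPLE(false);

  private final boolean buffersRecords;

  ExecutorType(boolean buffersRecords) {
    this.buffersRecords = buffersRecords;
  }

  /** True if executors of this type hold records in an intermediate queue. */
  public boolean buffersRecords() {
    return buffersRecords;
  }
}
```

As the thread notes, the transform function is built before the executor instance exists, so the flag would still have to be read from the config-derived enum rather than from the executor itself.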
Review comment: This is the primary change in this file: instead of the sequence rewriteRecord/rewriteRecordWithNewSchema (rewriting the record into the schema bearing the meta-fields) followed by updateMetadataValues, we now call the prependMetaFields API directly, which expands the record's schema with the meta-fields and sets their values at the same time.
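To see why this removes a regression, here is a toy, self-contained sketch (hypothetical array-backed "records"; Hudi's actual records are engine-specific): the old sequence materializes the record twice, while a prepend-style call expands the schema and sets the meta-field values in a single pass.

```java
import java.util.Arrays;

// Toy model of the change. Hudi prepends five meta-fields (_hoodie_commit_time, ...,
// _hoodie_file_name) to each record's payload; the constants below are stand-ins.
public class PrependMetaFieldsSketch {
  static final int NUM_META_FIELDS = 5;
  static final int FILE_NAME_POS = 4; // stand-in for the _hoodie_file_name slot

  // Old sequence: rewrite into the meta-field-bearing schema (allocation + copy #1),
  // then update meta-field values on the rewritten record (allocation + copy #2).
  static Object[] rewriteThenUpdateMetadataValues(Object[] payload, String fileName) {
    Object[] rewritten = new Object[NUM_META_FIELDS + payload.length];
    System.arraycopy(payload, 0, rewritten, NUM_META_FIELDS, payload.length); // copy #1
    Object[] updated = Arrays.copyOf(rewritten, rewritten.length);            // copy #2
    updated[FILE_NAME_POS] = fileName;
    return updated;
  }

  // New sequence: a single allocation; meta-fields are set while the payload is copied.
  static Object[] prependMetaFields(Object[] payload, String fileName) {
    Object[] out = new Object[NUM_META_FIELDS + payload.length];
    out[FILE_NAME_POS] = fileName;
    System.arraycopy(payload, 0, out, NUM_META_FIELDS, payload.length);       // single copy
    return out;
  }

  public static void main(String[] args) {
    Object[] row = {"key-1", 42L};
    System.out.println(Arrays.toString(rewriteThenUpdateMetadataValues(row, "f1.parquet")));
    System.out.println(Arrays.toString(prependMetaFields(row, "f1.parquet"))); // same output, one pass
  }
}
```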