diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java index 991e52982cf74..8cc8c07a02ec2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java @@ -23,11 +23,11 @@ import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.util.ExecutorFactory; import java.util.Iterator; import java.util.List; @@ -104,7 +104,7 @@ private static Function, HoodieInsertValueGenResult { HoodieRecord clonedRecord = shouldClone ? record.copy() : record; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 02053127d9c83..ef63e3ba727d4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -236,21 +236,25 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { // If the format can not record the operation field, nullify the DELETE payload manually. boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField(); recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord)); - Option finalRecord = nullifyPayload ? Option.empty() : Option.of(hoodieRecord); + + Option finalRecordOpt = nullifyPayload ? Option.empty() : Option.of(hoodieRecord); // Check for delete - if (finalRecord.isPresent() && !finalRecord.get().isDelete(schema, recordProperties)) { - // Check for ignore ExpressionPayload - if (finalRecord.get().shouldIgnore(schema, recordProperties)) { - return finalRecord; + if (finalRecordOpt.isPresent() && !finalRecordOpt.get().isDelete(schema, recordProperties)) { + HoodieRecord finalRecord = finalRecordOpt.get(); + // Check if the record should be ignored (special case for [[ExpressionPayload]]) + if (finalRecord.shouldIgnore(schema, recordProperties)) { + return finalRecordOpt; } - // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(schema, recordProperties, writeSchemaWithMetaFields) - : finalRecord.get().rewriteRecord(schema, recordProperties, writeSchemaWithMetaFields); + + // Prepend meta-fields into the record + MetadataValues metadataValues = populateMetadataFields(finalRecord); + HoodieRecord populatedRecord = + finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties); + // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be put into the recordList(List). 
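For context on the pattern above: across the handles in this patch, the previous two-step flow of rewriting the record into the meta-field schema and then patching the meta columns (rewriteRecord/rewriteRecordWithNewSchema followed by updateMetadataValues) is collapsed into a single prependMetaFields call driven by a MetadataValues object. The sketch below shows that flow in isolation; the helper class and method are hypothetical and the handle state (schemas, file name, properties) is simplified, but the MetadataValues and HoodieRecord#prependMetaFields calls mirror the ones introduced in this diff.

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;

import java.util.Properties;

// Hypothetical helper, for illustration only.
final class PrependMetaFieldsSketch {
  static HoodieRecord populate(HoodieRecord record,
                               Schema recordSchema,
                               Schema writeSchemaWithMetaFields,
                               Properties props,
                               String fileName) {
    // Set only the meta values known at this point; unset columns are left untouched
    // and can be filled in later (e.g. by the file writer).
    MetadataValues metadataValues = new MetadataValues().setFileName(fileName);
    // Single step replacing the previous rewriteRecord(...) + updateMetadataValues(...) pair.
    return record.prependMetaFields(recordSchema, writeSchemaWithMetaFields, metadataValues, props);
  }
}

As the hunks above and the HoodieMergeHelper changes below show, the result may additionally be copy()-ed when records are buffered (queued by the executor or collected into the append handle's record list).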
- HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord.copy(), writeSchemaWithMetaFields, recordProperties); - finalRecord = Option.of(populatedRecord); + finalRecordOpt = Option.of(populatedRecord.copy()); if (isUpdateRecord || isLogCompaction) { updatedRecordsWritten++; } else { @@ -258,7 +262,7 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { } recordsWritten++; } else { - finalRecord = Option.empty(); + finalRecordOpt = Option.empty(); recordsDeleted++; } @@ -267,7 +271,7 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { // part of marking // record successful. hoodieRecord.deflate(); - return finalRecord; + return finalRecordOpt; } catch (Exception e) { LOG.error("Error writing record " + hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e, recordMetadata); @@ -275,7 +279,7 @@ private Option prepareRecord(HoodieRecord hoodieRecord) { return Option.empty(); } - private HoodieRecord populateMetadataFields(HoodieRecord hoodieRecord, Schema schema, Properties prop) throws IOException { + private MetadataValues populateMetadataFields(HoodieRecord hoodieRecord) { MetadataValues metadataValues = new MetadataValues(); if (config.populateMetaFields()) { String seqId = @@ -292,7 +296,7 @@ private HoodieRecord populateMetadataFields(HoodieRecord hoodieRecord, Schema metadataValues.setOperation(hoodieRecord.getOperation().getName()); } - return hoodieRecord.updateMetadataValues(schema, prop, metadataValues); + return metadataValues; } private void initNewStatus() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java index f110bf585dbe3..e4985907e2e38 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java @@ -18,13 +18,19 @@ package org.apache.hudi.io; -import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.avro.JsonProperties; +import org.apache.avro.Schema; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; + /** * This class is essentially same as Create Handle but overrides two things * 1) Schema : Metadata bootstrap writes only metadata fields as part of write. So, setup the writer schema accordingly. 
@@ -34,14 +40,28 @@ */ public class HoodieBootstrapHandle extends HoodieCreateHandle { + // NOTE: We have to use schema containing all the meta-fields in here b/c unlike for [[HoodieAvroRecord]], + // [[HoodieSparkRecord]] requires records to always bear either all or no meta-fields in the + // record schema (ie partial inclusion of the meta-fields in the schema is not allowed) + public static final Schema METADATA_BOOTSTRAP_RECORD_SCHEMA = createMetadataBootstrapRecordSchema(); + public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { super(config, commitTime, hoodieTable, partitionPath, fileId, - Option.of(HoodieAvroUtils.RECORD_KEY_SCHEMA), taskContextSupplier); + Option.of(METADATA_BOOTSTRAP_RECORD_SCHEMA), taskContextSupplier); } @Override public boolean canWrite(HoodieRecord record) { return true; } + + private static Schema createMetadataBootstrapRecordSchema() { + List fields = + HoodieRecord.HOODIE_META_COLUMNS.stream() + .map(metaField -> + new Schema.Field(metaField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)) + .collect(Collectors.toList()); + return Schema.createRecord("HoodieRecordKey", "", "", false, fields); + } } \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 96611959f9a5a..4d3f52a5ba6aa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -136,24 +136,22 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props if (record.shouldIgnore(schema, config.getProps())) { return; } - // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema - HoodieRecord rewriteRecord; - if (schemaOnReadEnabled) { - rewriteRecord = record.rewriteRecordWithNewSchema(schema, config.getProps(), writeSchemaWithMetaFields); - } else { - rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields); - } + MetadataValues metadataValues = new MetadataValues().setFileName(path.getName()); - rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues); + HoodieRecord populatedRecord = + record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps()); + if (preserveMetadata) { - fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields); + fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields); } else { - fileWriter.writeWithMetadata(record.getKey(), rewriteRecord, writeSchemaWithMetaFields); + fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields); } - // update the new location of record, so we know where to find it next + + // Update the new location of record, so we know where to find it next record.unseal(); record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId())); record.seal(); + recordsWritten++; insertRecordsWritten++; } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 0460f88101c54..f92a1e73d5849 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -374,20 +374,16 @@ public void write(HoodieRecord oldRecord) { } protected void writeToFile(HoodieKey key, HoodieRecord record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException { - HoodieRecord rewriteRecord; - if (schemaOnReadEnabled) { - rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, writeSchemaWithMetaFields); - } else { - rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields); - } // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the // file holding this record even in cases when overall metadata is preserved MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName()); - rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues); + HoodieRecord populatedRecord = + record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop); + if (shouldPreserveRecordMetadata) { - fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields); + fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields); } else { - fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields); + fileWriter.writeWithMetadata(key, populatedRecord, writeSchemaWithMetaFields); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 8e470471db91a..889d7a64769a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -95,7 +95,7 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction()); this.taskContextSupplier = taskContextSupplier; this.writeToken = makeWriteToken(); - schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema()); + this.schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema()); this.recordMerger = config.getRecordMerger(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 7f46e211e7391..3c8255a21b9a3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -27,8 +27,6 @@ import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.MappingIterator; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -94,8 +92,8 @@ public void runMerge(HoodieTable table, // In case Advanced Schema Evolution is enabled we might need to rewrite currently // persisted records to adhere to an evolved schema - Option>, Schema>> 
schemaEvolutionTransformerOpt = - composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient()); + Option> schemaEvolutionTransformerOpt = + composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient()); // Check whether the writer schema is simply a projection of the file's one, ie // - Its field-set is a proper subset (of the reader schema) @@ -130,29 +128,27 @@ public void runMerge(HoodieTable table, (left, right) -> left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields())); recordSchema = mergeHandle.getWriterSchemaWithMetaFields(); - } else if (schemaEvolutionTransformerOpt.isPresent()) { - recordIterator = new MappingIterator<>(baseFileRecordIterator, - schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? writerSchema : readerSchema)); - recordSchema = schemaEvolutionTransformerOpt.get().getRight(); } else { recordIterator = baseFileRecordIterator; recordSchema = isPureProjection ? writerSchema : readerSchema; } + boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig); + wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> { + HoodieRecord newRecord; + if (schemaEvolutionTransformerOpt.isPresent()) { + newRecord = schemaEvolutionTransformerOpt.get().apply(record); + } else if (shouldRewriteInWriterSchema) { + newRecord = record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema); + } else { + newRecord = record; + } + // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be put into queue of QueueBasedExecutorFactory. - if (shouldRewriteInWriterSchema) { - try { - return record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema).copy(); - } catch (IOException e) { - LOG.error("Error rewrite record with new schema", e); - throw new HoodieException(e); - } - } else { - return record.copy(); - } + return isBufferingRecords ? 
newRecord.copy() : newRecord; }, table.getPreExecuteRunnable()); wrapper.execute(); @@ -173,10 +169,11 @@ public void runMerge(HoodieTable table, } } - private Option>, Schema>> composeSchemaEvolutionTransformer(Schema writerSchema, - HoodieBaseFile baseFile, - HoodieWriteConfig writeConfig, - HoodieTableMetaClient metaClient) { + private Option> composeSchemaEvolutionTransformer(Schema recordSchema, + Schema writerSchema, + HoodieBaseFile baseFile, + HoodieWriteConfig writeConfig, + HoodieTableMetaClient metaClient) { Option querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema()); // TODO support bootstrap if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { @@ -214,18 +211,12 @@ private Option>, Sche || SchemaCompatibility.checkReaderWriterCompatibility(newWriterSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; if (needToReWriteRecord) { Map renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema); - return Option.of(Pair.of( - (schema) -> (record) -> { - try { - return record.rewriteRecordWithNewSchema( - schema, - writeConfig.getProps(), - newWriterSchema, renameCols); - } catch (IOException e) { - LOG.error("Error rewrite record with new schema", e); - throw new HoodieException(e); - } - }, newWriterSchema)); + return Option.of(record -> { + return record.rewriteRecordWithNewSchema( + recordSchema, + writeConfig.getProps(), + newWriterSchema, renameCols); + }); } else { return Option.empty(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java index 7baada740899d..49e83733adf01 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java @@ -33,31 +33,47 @@ public class ExecutorFactory { - public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, + public static HoodieExecutor create(HoodieWriteConfig config, Iterator inputItr, HoodieConsumer consumer, Function transformFunction) { - return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop()); + return create(config, inputItr, consumer, transformFunction, Functions.noop()); } - public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, + public static HoodieExecutor create(HoodieWriteConfig config, Iterator inputItr, HoodieConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) { - ExecutorType executorType = hoodieConfig.getExecutorType(); - + ExecutorType executorType = config.getExecutorType(); switch (executorType) { case BOUNDED_IN_MEMORY: - return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer, + return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer, transformFunction, preExecuteRunnable); case DISRUPTOR: - return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer, - transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable); + return new DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer, + transformFunction, config.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable); case SIMPLE: return new SimpleExecutor<>(inputItr, consumer, transformFunction); 
default: throw new HoodieException("Unsupported Executor Type " + executorType); } } + + /** + * Checks whether configured {@link HoodieExecutor} buffer records (for ex, by holding them + * in the queue) + */ + public static boolean isBufferingRecords(HoodieWriteConfig config) { + ExecutorType executorType = config.getExecutorType(); + switch (executorType) { + case BOUNDED_IN_MEMORY: + case DISRUPTOR: + return true; + case SIMPLE: + return false; + default: + throw new HoodieException("Unsupported Executor Type " + executorType); + } + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java index b119e75e2177c..7756d2502e897 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java @@ -30,9 +30,9 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.SparkKeyGeneratorInterface; -import org.apache.hudi.util.HoodieSparkRecordUtils; import org.apache.spark.sql.HoodieInternalRowUtils; import org.apache.spark.sql.HoodieUnsafeRowUtils; import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath; @@ -44,14 +44,13 @@ import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; +import scala.Function1; import java.io.IOException; import java.util.Map; import java.util.Properties; import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS; -import static org.apache.hudi.util.HoodieSparkRecordUtils.getNullableValAsString; -import static org.apache.hudi.util.HoodieSparkRecordUtils.getValue; import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -150,8 +149,9 @@ public String getRecordKey(Schema recordSchema, Option keyGene return getRecordKey(); } StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); - return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get()) - .getRecordKey(data, structType).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal()); + return keyGeneratorOpt.isPresent() + ? 
((SparkKeyGeneratorInterface) keyGeneratorOpt.get()).getRecordKey(data, structType).toString() + : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal()); } @Override @@ -173,7 +173,11 @@ public HoodieRecordType getRecordType() { @Override public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) { StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); - return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled); + Object[] objects = new Object[columns.length]; + for (int i = 0; i < objects.length; i++) { + objects[i] = getValue(structType, columns[i], data); + } + return objects; } @Override @@ -186,50 +190,27 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { } @Override - public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException { + public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema); - // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter - InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType); - UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord); - - boolean containMetaFields = hasMetaFields(targetStructType); - UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, targetStructType); - HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields); + HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, structType); + updateMetadataValuesInternal(updatableRow, metadataValues); - return new HoodieSparkRecord(getKey(), internalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false); + return new HoodieSparkRecord(getKey(), updatableRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false); } @Override - public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException { + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) { StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema); - // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter - InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols); - UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord); + Function1 unsafeRowWriter = + HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, renameCols); - boolean containMetaFields = hasMetaFields(newStructType); - UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, newStructType); - HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields); + UnsafeRow unsafeRow = unsafeRowWriter.apply(this.data); - return new HoodieSparkRecord(getKey(), internalRow, newStructType, getOperation(), this.currentLocation, this.newLocation, false); - } - - 
@Override - public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException { - StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); - HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(data, structType); - - metadataValues.getKv().forEach((key, value) -> { - int pos = structType.fieldIndex(key); - if (value != null) { - updatableRow.update(pos, CatalystTypeConverters.convertToCatalyst(value)); - } - }); - - return new HoodieSparkRecord(getKey(), updatableRow, structType, getOperation(), this.currentLocation, this.newLocation, copy); + return new HoodieSparkRecord(getKey(), unsafeRow, newStructType, getOperation(), this.currentLocation, this.newLocation, false); } @Override @@ -317,12 +298,13 @@ public HoodieSparkRecord copy() { public Comparable getOrderingValue(Schema recordSchema, Properties props) { StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); String orderingField = ConfigUtils.getOrderingField(props); - if (!HoodieInternalRowUtils.existField(structType, orderingField)) { - return 0; + scala.Option cachedNestedFieldPath = + HoodieInternalRowUtils.getCachedPosList(structType, orderingField); + if (cachedNestedFieldPath.isDefined()) { + NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get(); + return (Comparable) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath); } else { - NestedFieldPath nestedFieldPath = HoodieInternalRowUtils.getCachedPosList(structType, orderingField); - Comparable value = (Comparable) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath); - return value; + return 0; } } @@ -368,21 +350,28 @@ private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, Stru } boolean containsMetaFields = hasMetaFields(structType); - UTF8String[] metaFields = tryExtractMetaFields(data, structType); + UTF8String[] metaFields = extractMetaFields(data, structType); return new HoodieInternalRow(metaFields, data, containsMetaFields); } - private static UTF8String[] tryExtractMetaFields(InternalRow row, StructType structType) { + private static UTF8String[] extractMetaFields(InternalRow row, StructType structType) { boolean containsMetaFields = hasMetaFields(structType); - if (containsMetaFields && structType.size() == 1) { - // Support bootstrap with RECORD_KEY_SCHEMA - return new UTF8String[] {row.getUTF8String(0)}; - } else if (containsMetaFields) { + if (containsMetaFields) { return HoodieRecord.HOODIE_META_COLUMNS.stream() .map(col -> row.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(col))) .toArray(UTF8String[]::new); - } else { - return new UTF8String[HoodieRecord.HOODIE_META_COLUMNS.size()]; + } + + return new UTF8String[HoodieRecord.HOODIE_META_COLUMNS.size()]; + } + + private static void updateMetadataValuesInternal(HoodieInternalRow updatableRow, MetadataValues metadataValues) { + String[] values = metadataValues.getValues(); + for (int pos = 0; pos < values.length; ++pos) { + String value = values[pos]; + if (value != null) { + updatableRow.update(pos, CatalystTypeConverters.convertToCatalyst(value)); + } } } @@ -416,7 +405,8 @@ private static HoodieRecord convertToHoodieSparkRecord(StructType s getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString()); HoodieOperation operation = withOperationField - ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null; + ? 
HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD))) + : null; return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy); } @@ -434,4 +424,14 @@ private static void validateRow(InternalRow data, StructType schema) { ValidationUtils.checkState(isValid); } + + private static Object getValue(StructType structType, String fieldName, InternalRow row) { + scala.Option cachedNestedFieldPath = + HoodieInternalRowUtils.getCachedPosList(structType, fieldName); + if (cachedNestedFieldPath.isDefined()) { + return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, cachedNestedFieldPath.get()); + } else { + throw new HoodieException(String.format("Field at %s is not present in %s", fieldName, structType)); + } + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index 147b5cf6b3333..3b42d40a1a22a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -38,19 +38,7 @@ public class SparkLazyInsertIterable extends HoodieLazyInsertIterable { - private boolean useWriterSchema; - - public SparkLazyInsertIterable(Iterator> recordItr, - boolean areRecordsSorted, - HoodieWriteConfig config, - String instantTime, - HoodieTable hoodieTable, - String idPrefix, - TaskContextSupplier taskContextSupplier, - boolean useWriterSchema) { - super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier); - this.useWriterSchema = useWriterSchema; - } + private final boolean useWriterSchema; public SparkLazyInsertIterable(Iterator> recordItr, boolean areRecordsSorted, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java index 58ee01182c373..3c69f0ab5c2ec 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriter.java @@ -21,18 +21,11 @@ import org.apache.avro.Schema; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.spark.sql.catalyst.CatalystTypeConverters; import org.apache.spark.sql.catalyst.InternalRow; import java.io.IOException; import java.util.Properties; -import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD; -import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD; -import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD; -import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD; -import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD; - public interface HoodieSparkFileWriter extends HoodieFileWriter { boolean canWrite(); @@ -51,14 +44,4 @@ default void write(String recordKey, HoodieRecord record, Schema schema, Propert default void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException { 
writeRowWithMetadata(key, (InternalRow) record.getData()); } - - default InternalRow prepRecordWithMetadata(HoodieKey key, InternalRow row, String instantTime, Integer partitionId, long recordIndex, String fileName) { - String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex); - row.update(COMMIT_TIME_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(instantTime)); - row.update(COMMIT_SEQNO_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(seqId)); - row.update(RECORD_KEY_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(key.getRecordKey())); - row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(key.getPartitionPath())); - row.update(FILENAME_METADATA_FIELD.ordinal(), CatalystTypeConverters.convertToCatalyst(fileName)); - return row; - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index 320217aff1d4a..e24f03a4cb453 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -122,7 +122,8 @@ public Schema getSchema() { // and therefore if we convert to Avro directly we'll lose logical type-info. MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(conf, path); StructType structType = new ParquetToSparkSchemaConverter(conf).convert(messageType); - return SparkAdapterSupport$.MODULE$.sparkAdapter().getAvroSchemaConverters() + return SparkAdapterSupport$.MODULE$.sparkAdapter() + .getAvroSchemaConverters() .toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java index 3b4a86502d280..d601e6ded3e12 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java @@ -21,22 +21,32 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.io.storage.row.HoodieRowParquetConfig; import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.unsafe.types.UTF8String; import java.io.IOException; +import java.util.function.Function; + +import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD; +import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD; +import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD; +import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD; +import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD; public class HoodieSparkParquetWriter extends HoodieBaseParquetWriter implements HoodieSparkFileWriter { - // TODO: better code reuse - private final String fileName; - 
private final String instantTime; - private final TaskContextSupplier taskContextSupplier; + private final UTF8String fileName; + private final UTF8String instantTime; + private final boolean populateMetaFields; + private final HoodieRowParquetWriteSupport writeSupport; + private final Function seqIdGenerator; + public HoodieSparkParquetWriter(Path file, HoodieRowParquetConfig parquetConfig, String instantTime, @@ -44,19 +54,23 @@ public HoodieSparkParquetWriter(Path file, boolean populateMetaFields) throws IOException { super(file, parquetConfig); this.writeSupport = parquetConfig.getWriteSupport(); - this.fileName = file.getName(); - this.instantTime = instantTime; - this.taskContextSupplier = taskContextSupplier; + this.fileName = UTF8String.fromString(file.getName()); + this.instantTime = UTF8String.fromString(instantTime); this.populateMetaFields = populateMetaFields; + this.seqIdGenerator = recordIndex -> { + Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get(); + return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex); + }; } @Override public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOException { if (populateMetaFields) { - prepRecordWithMetadata(key, row, instantTime, - taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName); + UTF8String recordKey = UTF8String.fromString(key.getRecordKey()); + updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount()); + super.write(row); - writeSupport.add(UTF8String.fromString(key.getRecordKey())); + writeSupport.add(recordKey); } else { super.write(row); } @@ -74,4 +88,16 @@ public void writeRow(String recordKey, InternalRow row) throws IOException { public void close() throws IOException { super.close(); } + + protected void updateRecordMetadata(InternalRow row, + UTF8String recordKey, + String partitionPath, + long recordCount) { + row.update(COMMIT_TIME_METADATA_FIELD.ordinal(), instantTime); + row.update(COMMIT_SEQNO_METADATA_FIELD.ordinal(), UTF8String.fromString(seqIdGenerator.apply(recordCount))); + row.update(RECORD_KEY_METADATA_FIELD.ordinal(), recordKey); + // TODO set partition path in ctor + row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), UTF8String.fromString(partitionPath)); + row.update(FILENAME_METADATA_FIELD.ordinal(), fileName); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java index 40154d8675cff..b95f1b1ef743b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java @@ -480,7 +480,7 @@ private Object[] getNestedFieldValues(InternalRow row, HoodieUnsafeRowUtils.Nest private HoodieUnsafeRowUtils.NestedFieldPath[] resolveNestedFieldPaths(List fieldPaths, StructType schema, boolean returnNull) { try { return fieldPaths.stream() - .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath)) + .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath).get()) .toArray(HoodieUnsafeRowUtils.NestedFieldPath[]::new); } catch (Exception e) { if (returnNull) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java index 0bc15fa2106a5..3e63d0bb22c09 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java @@ -34,7 +34,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.parquet.avro.AvroReadSupport; import java.io.IOException; import java.util.List; @@ -62,10 +61,10 @@ public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String .map(HoodieAvroUtils::getRootLevelFieldName) .collect(Collectors.toList()); Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns); - LOG.info("Schema to be used for reading record Keys :" + recordKeySchema); - AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema); - AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema); - executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema); + + LOG.info("Schema to be used for reading record keys: " + recordKeySchema); + + executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, recordKeySchema); } catch (Exception e) { throw new HoodieException(e.getMessage(), e); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java index d7a4a2b52abf7..14a442c93b1d8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java @@ -18,7 +18,6 @@ package org.apache.hudi.table.action.bootstrap; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.bootstrap.BootstrapRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -47,6 +46,8 @@ import java.io.IOException; +import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA; + class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler { private static final Logger LOG = LogManager.getLogger(OrcBootstrapMetadataHandler.class); @@ -75,7 +76,7 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path so wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(), new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> { String recKey = keyGenerator.getKey(inp).getRecordKey(); - GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA); + GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA); gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey); BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java index d008d7cf9fd96..e5944225750d0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java @@ -18,11 +18,15 @@ package org.apache.hudi.table.action.bootstrap; -import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.client.bootstrap.BootstrapRecordPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.MetadataValues; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.config.HoodieWriteConfig; @@ -35,19 +39,24 @@ import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.HoodieInternalRowUtils$; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; import java.io.IOException; -import java.util.Properties; +import java.util.function.Function; + +import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA; class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler { - private static final Logger LOG = LogManager.getLogger(ParquetBootstrapMetadataHandler.class); public ParquetBootstrapMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) { super(config, table, srcFileStatus); @@ -62,31 +71,29 @@ Schema getAvroSchema(Path sourceFilePath) throws IOException { } @Override - void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, - Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception { + protected void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, + Path sourceFilePath, + KeyGeneratorInterface keyGenerator, + String partitionPath, + Schema schema) throws Exception { BoundedInMemoryExecutor wrapper = null; - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType()) + HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger(); + + HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType()) .getFileReader(table.getHadoopConf(), sourceFilePath); try { + Function transformer = record -> { + String recordKey = record.getRecordKey(schema, Option.of(keyGenerator)); + return createNewMetadataBootstrapRecord(recordKey, 
partitionPath, recordMerger.getRecordType()) + // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific + // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of + // it since these records will be inserted into the queue later. + .copy(); + }; + wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(), - reader.getRecordIterator(), new BootstrapRecordConsumer(bootstrapHandle), record -> { - try { - String recKey = record.getRecordKey(reader.getSchema(), Option.of(keyGenerator)); - // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific - // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of - // it since these records will be inserted into the queue later. - HoodieRecord hoodieRecord = record - .rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA) - .copy(); - MetadataValues metadataValues = new MetadataValues().setRecordKey(recKey); - return hoodieRecord - .updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues) - .newInstance(new HoodieKey(recKey, partitionPath)); - } catch (IOException e) { - LOG.error("Unable to overrideMetadataFieldValue", e); - return null; - } - }, table.getPreExecuteRunnable()); + reader.getRecordIterator(schema), new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable()); + wrapper.execute(); } catch (Exception e) { throw new HoodieException(e); @@ -99,5 +106,31 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, bootstrapHandle.close(); } } + + private HoodieRecord createNewMetadataBootstrapRecord(String recordKey, String partitionPath, HoodieRecord.HoodieRecordType recordType) { + HoodieKey hoodieKey = new HoodieKey(recordKey, partitionPath); + switch (recordType) { + case AVRO: + GenericRecord avroRecord = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA); + avroRecord.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey); + BootstrapRecordPayload payload = new BootstrapRecordPayload(avroRecord); + return new HoodieAvroRecord<>(hoodieKey, payload); + + case SPARK: + StructType schema = HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA); + UnsafeProjection unsafeProjection = HoodieInternalRowUtils$.MODULE$.getCachedUnsafeProjection(schema, schema); + + GenericInternalRow row = new GenericInternalRow(METADATA_BOOTSTRAP_RECORD_SCHEMA.getFields().size()); + row.update(HoodieRecord.RECORD_KEY_META_FIELD_ORD, UTF8String.fromString(recordKey)); + + UnsafeRow unsafeRow = unsafeProjection.apply(row); + + return new HoodieSparkRecord(hoodieKey, unsafeRow,false); + + default: + throw new UnsupportedOperationException(String.format("Record type %s is not supported yet!", recordType)); + } + + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java deleted file mode 100644 index 9a4aaa1dbc585..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.util; - -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; - -import org.apache.spark.sql.HoodieInternalRowUtils; -import org.apache.spark.sql.HoodieUnsafeRowUtils; -import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.StructType; - -public class HoodieSparkRecordUtils { - - public static Object getValue(StructType structType, String fieldName, InternalRow row) { - NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName); - return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList); - } - - /** - * Returns the string value of the given record {@code rec} and field {@code fieldName}. The field and value both could be missing. - * - * @param row The record - * @param fieldName The field name - * @return the string form of the field or empty if the schema does not contain the field name or the value is null - */ - public static Option getNullableValAsString(StructType structType, InternalRow row, String fieldName) { - String fieldVal = !HoodieInternalRowUtils.existField(structType, fieldName) - ? null : StringUtils.objToString(getValue(structType, fieldName, row)); - return Option.ofNullable(fieldVal); - } - - /** - * Gets record column values into one object. - * - * @param row InternalRow. - * @param columns Names of the columns to get values. - * @param structType {@link StructType} instance. - * @return Column value if a single column, or concatenated String values by comma. 
- */ - public static Object[] getRecordColumnValues(InternalRow row, - String[] columns, - StructType structType, boolean consistentLogicalTimestampEnabled) { - Object[] objects = new Object[columns.length]; - for (int i = 0; i < objects.length; i++) { - NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[i]); - Object value = HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList); - objects[i] = value; - } - return objects; - } -} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index 238fed526a18e..7e235993e3365 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.util.ReflectionUtils import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.exception.HoodieException import org.apache.hudi.index.SparkHoodieIndexFactory import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface} import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ParallelismHelper} @@ -175,6 +176,7 @@ object HoodieDatasetBulkInsertHelper val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD) // NOTE: Pre-combine field could be a nested field val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef) + .getOrElse(throw new HoodieException(s"Pre-combine field $preCombineFieldRef is missing in $schema")) rdd.map { row => val rowKey = if (isGlobalIndex) { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala index dfe3295cf003c..4d2ee33d154e9 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala @@ -18,304 +18,413 @@ package org.apache.spark.sql -import java.nio.charset.StandardCharsets -import java.util.HashMap -import java.util.concurrent.ConcurrentHashMap import org.apache.avro.Schema import org.apache.hbase.thirdparty.com.google.common.base.Supplier -import org.apache.hudi.AvroConversionUtils +import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate} import org.apache.hudi.exception.HoodieException -import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath} +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN import org.apache.spark.sql.types._ import 
org.apache.spark.unsafe.types.UTF8String + +import java.util.concurrent.ConcurrentHashMap +import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap} +import java.util.function.{Function => JFunction} import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter object HoodieInternalRowUtils { - // Projection are all thread local. Projection is not thread-safe - val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] = - ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] { - override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection] + private type RenamedColumnMap = JMap[String, String] + private type UnsafeRowWriter = InternalRow => UnsafeRow + + // NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe + private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] = + ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] { + override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] = + new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] }) - val schemaMap = new ConcurrentHashMap[Schema, StructType] - val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath] - /** - * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema) - */ - def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = { - val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]]) - - for ((field, pos) <- newSchema.fields.zipWithIndex) { - var oldValue: AnyRef = null - var oldType: DataType = null - if (existField(oldSchema, field.name)) { - val oldField = oldSchema(field.name) - val oldPos = oldSchema.fieldIndex(field.name) - oldType = oldField.dataType - oldValue = oldRecord.get(oldPos, oldType) - } - if (oldValue != null) { - field.dataType match { - case structType: StructType => - val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType] - val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType) - newRow.update(pos, newValue) - case decimalType: DecimalType => - val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType] - if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) { - newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale)) - ) - } else { - newRow.update(pos, oldValue) - } - case t if t == oldType => newRow.update(pos, oldValue) - // Type promotion - case _: ShortType => - oldType match { - case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort) - case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible") - } - case _: IntegerType => - oldType match { - case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt) - case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt) - case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are 
incompatible") - } - case _: LongType => - oldType match { - case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong) - case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong) - case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong) - case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible") - } - case _: FloatType => - oldType match { - case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat) - case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat) - case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat) - case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat) - case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible") - } - case _: DoubleType => - oldType match { - case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble) - case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble) - case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble) - case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble) - case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble) - case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible") - } - case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes) - case _ => newRow.update(pos, oldValue) - } - } else { - // TODO default value in newSchema - } - } + // NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe + private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] = + ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] { + override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] = + new mutable.HashMap[(StructType, StructType), UnsafeProjection] + }) - newRow - } + private val schemaMap = new ConcurrentHashMap[Schema, StructType] + private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]] /** - * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map) + * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from + * one [[StructType]] and into another [[StructType]] + * + * For more details regarding its semantic, please check corresponding scala-doc for + * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]] */ - def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = { - rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow] + def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = { + unsafeProjectionThreadLocal.get() + .getOrElseUpdate((from, to), generateUnsafeProjection(from, to)) } /** - * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque) + * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from + * one [[StructType]] and into another [[StructType]] + * 
+ * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema, + * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including: + * + *
+ * <ul>
+ *   <li>Transforming nested structs/maps/arrays</li>
+ *   <li>Handling type promotions (int -> long, etc)</li>
+ *   <li>Handling (field) renames</li>
+ * </ul>
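+ *
+ * Minimal usage sketch (illustrative only; `sourceStructType`, `targetStructType` and `sourceRow`
+ * are assumed names used for illustration):
+ * {{{
+ *   val writer = HoodieInternalRowUtils.getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+ *   val rewritten: UnsafeRow = writer(sourceRow) // sourceRow is an InternalRow matching sourceStructType
+ * }}}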
*/ - private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = { - if (oldRecord == null) { - null - } else { - newSchema match { - case targetSchema: StructType => - if (!oldRecord.isInstanceOf[InternalRow]) { - throw new IllegalArgumentException("cannot rewrite record with different type") - } - val oldRow = oldRecord.asInstanceOf[InternalRow] - val helper = mutable.Map[Integer, Any]() - - val oldStrucType = oldSchema.asInstanceOf[StructType] - targetSchema.fields.zipWithIndex.foreach { case (field, i) => - fieldNames.push(field.name) - if (existField(oldStrucType, field.name)) { - val oldField = oldStrucType(field.name) - val oldPos = oldStrucType.fieldIndex(field.name) - helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames) - } else { - val fieldFullName = createFullName(fieldNames) - val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.") - val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1) - // deal with rename - if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) { - // find rename - val oldField = oldStrucType(lastColNameFromOldSchema) - val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema) - helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames) - } - } - fieldNames.pop() - } - val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]]) - targetSchema.fields.zipWithIndex.foreach { case (_, i) => - if (helper.contains(i)) { - newRow.update(i, helper(i)) - } else { - // TODO add default val - newRow.update(i, null) - } - } + def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = { + unsafeWriterThreadLocal.get() + .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap)) + } - newRow - case targetSchema: ArrayType => - if (!oldRecord.isInstanceOf[ArrayData]) { - throw new IllegalArgumentException("cannot rewrite record with different type") - } - val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType - val oldArray = oldRecord.asInstanceOf[ArrayData] - val newElementType = targetSchema.elementType - val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]]) - fieldNames.push("element") - oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) } - fieldNames.pop() - - newArray - case targetSchema: MapType => - if (!oldRecord.isInstanceOf[MapData]) { - throw new IllegalArgumentException("cannot rewrite record with different type") - } - val oldValueType = oldSchema.asInstanceOf[MapType].valueType - val oldKeyType = oldSchema.asInstanceOf[MapType].keyType - val oldMap = oldRecord.asInstanceOf[MapData] - val newValueType = targetSchema.valueType - val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]]) - val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]]) - val newMap = new 
ArrayBasedMapData(newKeyArray, newValueArray) - fieldNames.push("value") - oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) } - oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) } - fieldNames.pop() - - newMap - case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema) - } + def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = { + val nestedFieldPathOpt = orderPosListMap.get((structType, field)) + // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid + // allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path + if (nestedFieldPathOpt != null) { + nestedFieldPathOpt + } else { + orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] { + override def apply(t: (StructType, String)): Option[NestedFieldPath] = + composeNestedFieldPath(structType, field) + }) } } - def getCachedPosList(structType: StructType, field: String): NestedFieldPath = { - val schemaPair = (structType, field) - if (!orderPosListMap.containsKey(schemaPair)) { - val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field) - orderPosListMap.put(schemaPair, posList) + def getCachedSchema(schema: Schema): StructType = { + val structType = schemaMap.get(schema) + // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid + // allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path + if (structType != null) { + structType + } else { + schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] { + override def apply(t: Schema): StructType = + convertAvroSchemaToStructType(schema) + }) } - orderPosListMap.get(schemaPair) } - def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = { - val schemaPair = (from, to) - val map = unsafeProjectionThreadLocal.get() - if (!map.containsKey(schemaPair)) { - val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to) - map.put(schemaPair, projection) + private[sql] def genUnsafeRowWriter(prevSchema: StructType, + newSchema: StructType, + renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = { + val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]()) + val unsafeProjection = generateUnsafeProjection(newSchema, newSchema) + val phonyUpdater = new CatalystDataUpdater { + var value: InternalRow = _ + + override def set(ordinal: Int, value: Any): Unit = + this.value = value.asInstanceOf[InternalRow] } - map.get(schemaPair) - } - def getCachedSchema(schema: Schema): StructType = { - if (!schemaMap.containsKey(schema)) { - val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) - schemaMap.put(schema, structType) + oldRow => { + writer(phonyUpdater, 0, oldRow) + unsafeProjection(phonyUpdater.value) } - schemaMap.get(schema) } - def existField(structType: StructType, name: String): Boolean = { - try { - HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name) - true - } catch { - case _: IllegalArgumentException => false + private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit + + private def genUnsafeStructWriter(prevStructType: StructType, + newStructType: 
StructType, + renamedColumnsMap: JMap[String, String], + fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = { + // TODO need to canonicalize schemas (casing) + val fieldWriters = ArrayBuffer.empty[RowFieldUpdater] + val positionMap = ArrayBuffer.empty[Int] + + for (newField <- newStructType.fields) { + fieldNamesStack.push(newField.name) + + val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) = + prevStructType.getFieldIndex(newField.name) match { + case Some(prevFieldPos) => + val prevField = prevStructType(prevFieldPos) + (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos) + + case None => + val newFieldQualifiedName = createFullName(fieldNamesStack) + val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap) + + // Handle rename + prevStructType.getFieldIndex(prevFieldName) match { + case Some(prevFieldPos) => + val prevField = prevStructType.fields(prevFieldPos) + (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos) + + case None => + val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal) + (updater, -1) + } + } + + fieldWriters += fieldWriter + positionMap += prevFieldPos + + fieldNamesStack.pop() } - } - private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = { - if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) { - oldSchema match { - case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType => - oldValue - // Copy UTF8String before putting into GenericInternalRow - case StringType => UTF8String.fromString(oldValue.toString) - case DecimalType() => - Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale)) - case _ => - throw new HoodieException("Unknown schema type: " + newSchema) + (fieldUpdater, row) => { + var pos = 0 + while (pos < fieldWriters.length) { + val prevPos = positionMap(pos) + val prevValue = if (prevPos >= 0) { + row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType) + } else { + null + } + + fieldWriters(pos)(fieldUpdater, pos, prevValue) + pos += 1 } - } else { - rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema) } } - private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = { - val value = newSchema match { - case NullType | BooleanType => - case DateType if oldSchema.equals(StringType) => - CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString)) - case LongType => - oldSchema match { - case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue()) + private def newWriterRenaming(prevDataType: DataType, + newDataType: DataType, + renamedColumnsMap: JMap[String, String], + fieldNameStack: JDeque[String]): RowFieldUpdater = { + (newDataType, prevDataType) match { + case (newType, prevType) if prevType == newType => + (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value) + + case (newStructType: StructType, prevStructType: StructType) => + val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack) + + val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType)) + val rowUpdater = new RowUpdater(newRow) + + 
(fieldUpdater, ordinal, value) => { + // Here new row is built in 2 stages: + // - First, we pass mutable row (used as buffer/scratchpad) created above wrapped into [[RowUpdater]] + // into generated row-writer + // - Upon returning from row-writer, we call back into parent row's [[fieldUpdater]] to set returned + // row as a value in it + writer(rowUpdater, value) + fieldUpdater.set(ordinal, newRow) + } + + case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) => + fieldNameStack.push("element") + val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack) + fieldNameStack.pop() + + (fieldUpdater, ordinal, value) => { + val prevArrayData = value.asInstanceOf[ArrayData] + val prevArray = prevArrayData.toObjectArray(prevElementType) + + val newArrayData = createArrayData(newElementType, prevArrayData.numElements()) + val elementUpdater = new ArrayDataUpdater(newArrayData) + + var i = 0 + while (i < prevArray.length) { + val element = prevArray(i) + if (element == null) { + if (!containsNull) { + throw new HoodieException( + s"Array value at path '${fieldNameStack.asScala.mkString(".")}' is not allowed to be null") + } else { + elementUpdater.setNullAt(i) + } + } else { + elementWriter(elementUpdater, i, element) + } + i += 1 + } + + fieldUpdater.set(ordinal, newArrayData) + } + + case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) => + fieldNameStack.push("value") + val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack) + fieldNameStack.pop() + + (updater, ordinal, value) => + val mapData = value.asInstanceOf[MapData] + val prevKeyArrayData = mapData.keyArray + val prevValueArrayData = mapData.valueArray + val prevValueArray = prevValueArrayData.toObjectArray(prevValueType) + + val newValueArray = createArrayData(newValueType, mapData.numElements()) + val valueUpdater = new ArrayDataUpdater(newValueArray) + var i = 0 + while (i < prevValueArray.length) { + val value = prevValueArray(i) + if (value == null) { + if (!valueContainsNull) { + throw new HoodieException(s"Map value at path ${fieldNameStack.asScala.mkString(".")} is not allowed to be null") + } else { + valueUpdater.setNullAt(i) + } + } else { + valueWriter(valueUpdater, i, value) + } + i += 1 + } + + // NOTE: Key's couldn't be transformed and have to always be of [[StringType]] + updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray)) + + case (newDecimal: DecimalType, _) => + prevDataType match { + case IntegerType | LongType | FloatType | DoubleType | StringType => + (fieldUpdater, ordinal, value) => + val scale = newDecimal.scale + // TODO this has to be revisited to avoid loss of precision (for fps) + fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_EVEN))) + + case _: DecimalType => + (fieldUpdater, ordinal, value) => + fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale))) + case _ => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") } - case FloatType => - oldSchema match { - case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue()) - case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue()) + + case (_: ShortType, _) => + prevDataType match { + case _: ByteType => (fieldUpdater, ordinal, value) => 
fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort) case _ => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") } - case DoubleType => - oldSchema match { - case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue()) - case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue()) - case FloatType => CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + "")) + + case (_: IntegerType, _) => + prevDataType match { + case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt) + case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt) case _ => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") } - case BinaryType => - oldSchema match { - case StringType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)) + + case (_: LongType, _) => + prevDataType match { + case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong) + case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong) + case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong) case _ => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") } - case StringType => - oldSchema match { - case BinaryType => CatalystTypeConverters.convertToCatalyst(new String(oldValue.asInstanceOf[Array[Byte]])) - case DateType => CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString) - case IntegerType | LongType | FloatType | DoubleType | DecimalType() => CatalystTypeConverters.convertToCatalyst(oldValue.toString) + + case (_: FloatType, _) => + prevDataType match { + case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat) + case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat) + case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat) + case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat) case _ => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") } - case DecimalType() => - oldSchema match { - case IntegerType | LongType | FloatType | DoubleType | StringType => - val scale = newSchema.asInstanceOf[DecimalType].scale - Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale)) + case (_: DoubleType, _) => + prevDataType match { + case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble) + case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble) + case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble) + case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble) + case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, 
value.asInstanceOf[Byte].toDouble) case _ => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") } - case _ => - } - if (value == None) { - throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)) - } else { - CatalystTypeConverters.convertToCatalyst(value) + + case (_: BinaryType, _: StringType) => + (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes) + + // TODO revisit this (we need to align permitted casting w/ Spark) + // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]] + case (_: StringType, _) => + prevDataType match { + case BinaryType => (fieldUpdater, ordinal, value) => + fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]])) + case DateType => (fieldUpdater, ordinal, value) => + fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString)) + case IntegerType | LongType | FloatType | DoubleType | _: DecimalType => + (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString)) + + case _ => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") + } + + case (DateType, StringType) => + (fieldUpdater, ordinal, value) => + fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString))) + + case (_, _) => + throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible") } } - def removeFields(schema: StructType, fieldsToRemove: java.util.List[String]): StructType = { - StructType(schema.fields.filter(field => !fieldsToRemove.contains(field.name))) + private def lookupRenamedField(newFieldQualifiedName: String, renamedColumnsMap: JMap[String, String]) = { + val prevFieldQualifiedName = renamedColumnsMap.getOrDefault(newFieldQualifiedName, "") + val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.") + val prevFieldName = prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1) + + prevFieldName } + + private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match { + case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length)) + case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length)) + case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length)) + case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length)) + case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length)) + case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length)) + case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length)) + case _ => new GenericArrayData(new Array[Any](length)) + } + + sealed trait CatalystDataUpdater { + def set(ordinal: Int, value: Any): Unit + def setNullAt(ordinal: Int): Unit = set(ordinal, null) + def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value) + def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value) + def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value) + def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value) + def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value) + def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value) + def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value) + def 
setDecimal(ordinal: Int, value: Decimal): Unit = set(ordinal, value) + } + + final class RowUpdater(row: InternalRow) extends CatalystDataUpdater { + override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value) + override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal) + override def setBoolean(ordinal: Int, value: Boolean): Unit = row.setBoolean(ordinal, value) + override def setByte(ordinal: Int, value: Byte): Unit = row.setByte(ordinal, value) + override def setShort(ordinal: Int, value: Short): Unit = row.setShort(ordinal, value) + override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value) + override def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value) + override def setDouble(ordinal: Int, value: Double): Unit = row.setDouble(ordinal, value) + override def setFloat(ordinal: Int, value: Float): Unit = row.setFloat(ordinal, value) + override def setDecimal(ordinal: Int, value: Decimal): Unit = + row.setDecimal(ordinal, value, value.precision) + } + + final class ArrayDataUpdater(array: ArrayData) extends CatalystDataUpdater { + override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value) + override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal) + override def setBoolean(ordinal: Int, value: Boolean): Unit = array.setBoolean(ordinal, value) + override def setByte(ordinal: Int, value: Byte): Unit = array.setByte(ordinal, value) + override def setShort(ordinal: Int, value: Short): Unit = array.setShort(ordinal, value) + override def setInt(ordinal: Int, value: Int): Unit = array.setInt(ordinal, value) + override def setLong(ordinal: Int, value: Long): Unit = array.setLong(ordinal, value) + override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value) + override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value) + override def setDecimal(ordinal: Int, value: Decimal): Unit = array.update(ordinal, value) + } + } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala index c105142de0f45..5486c9f6551a5 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala @@ -93,29 +93,36 @@ object HoodieUnsafeRowUtils { * * This method produces nested-field path, that is subsequently used by [[getNestedInternalRowValue]], [[getNestedRowValue]] */ - def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): NestedFieldPath = { + def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): Option[NestedFieldPath]= { val fieldRefParts = nestedFieldRef.split('.') val ordSeq = ArrayBuffer[(Int, StructField)]() var curSchema = schema var idx = 0 while (idx < fieldRefParts.length) { val fieldRefPart = fieldRefParts(idx) - val ord = curSchema.fieldIndex(fieldRefPart) - val field = curSchema(ord) - // Append current field's (ordinal, data-type) - ordSeq.append((ord, field)) - // Update current schema, unless terminal field-ref part - if (idx < fieldRefParts.length - 1) { - curSchema = field.dataType match { - case st: StructType => st - case dt@_ => - throw new IllegalArgumentException(s"Invalid nested field reference ${fieldRefParts.drop(idx).mkString(".")} into $dt") - } + curSchema.getFieldIndex(fieldRefPart) match { + case Some(ord) => + 
val field = curSchema(ord) + // Append current field's (ordinal, data-type) + ordSeq.append((ord, field)) + // Update current schema, unless terminal field-ref part + if (idx < fieldRefParts.length - 1) { + curSchema = field.dataType match { + case st: StructType => st + case _ => + // In case we've stumbled upon something other than the [[StructType]] means that + // provided nested field reference is invalid. In that case we simply return null + return None + } + } + + // In case, field is not found we return null + case None => return None } idx += 1 } - NestedFieldPath(ordSeq.toArray) + Some(NestedFieldPath(ordSeq.toArray)) } case class NestedFieldPath(parts: Array[(Int, StructField)]) diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala index c23bbab99b4f0..104d1870eaead 100644 --- a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala +++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue, getNestedRowValue} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ -import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail} import org.junit.jupiter.api.Test class TestHoodieUnsafeRowUtils { @@ -41,11 +41,9 @@ class TestHoodieUnsafeRowUtils { assertEquals( Seq((1, schema(1)), (0, schema(1).dataType.asInstanceOf[StructType](0))), - composeNestedFieldPath(schema, "bar.baz").parts.toSeq) + composeNestedFieldPath(schema, "bar.baz").get.parts.toSeq) - assertThrows(classOf[IllegalArgumentException]) { () => - composeNestedFieldPath(schema, "foo.baz") - } + assertTrue(composeNestedFieldPath(schema, "foo.baz").isEmpty) } @Test @@ -65,36 +63,36 @@ class TestHoodieUnsafeRowUtils { assertEquals( 123, - getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.baz")) + getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.baz").get) ) assertEquals( 456L, - getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.bor")) + getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar.bor").get) ) assertEquals( "str", - getNestedInternalRowValue(row, composeNestedFieldPath(schema, "foo")) + getNestedInternalRowValue(row, composeNestedFieldPath(schema, "foo").get) ) assertEquals( row.getStruct(1, 2), - getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar")) + getNestedInternalRowValue(row, composeNestedFieldPath(schema, "bar").get) ) val rowProperNullable = InternalRow("str", null) assertEquals( null, - getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz")) + getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz").get) ) assertEquals( null, - getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar")) + getNestedInternalRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar").get) ) val rowInvalidNullable = InternalRow(null, InternalRow(123, 456L)) assertThrows(classOf[IllegalArgumentException]) { () => - getNestedInternalRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo")) + getNestedInternalRowValue(rowInvalidNullable, composeNestedFieldPath(schema, 
"foo").get) } } @@ -115,36 +113,36 @@ class TestHoodieUnsafeRowUtils { assertEquals( 123, - getNestedRowValue(row, composeNestedFieldPath(schema, "bar.baz")) + getNestedRowValue(row, composeNestedFieldPath(schema, "bar.baz").get) ) assertEquals( 456L, - getNestedRowValue(row, composeNestedFieldPath(schema, "bar.bor")) + getNestedRowValue(row, composeNestedFieldPath(schema, "bar.bor").get) ) assertEquals( "str", - getNestedRowValue(row, composeNestedFieldPath(schema, "foo")) + getNestedRowValue(row, composeNestedFieldPath(schema, "foo").get) ) assertEquals( row.getStruct(1), - getNestedRowValue(row, composeNestedFieldPath(schema, "bar")) + getNestedRowValue(row, composeNestedFieldPath(schema, "bar").get) ) val rowProperNullable = Row("str", null) assertEquals( null, - getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz")) + getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar.baz").get) ) assertEquals( null, - getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar")) + getNestedRowValue(rowProperNullable, composeNestedFieldPath(schema, "bar").get) ) val rowInvalidNullable = Row(null, Row(123, 456L)) assertThrows(classOf[IllegalArgumentException]) { () => - getNestedRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo")) + getNestedRowValue(rowInvalidNullable, composeNestedFieldPath(schema, "foo").get) } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java index a8c695240cddc..db77c21e8f465 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java @@ -112,28 +112,18 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { } @Override - public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException { - GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema); - return new HoodieAvroIndexedRecord(key, record, operation, metaData); + public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { + GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema); + updateMetadataValuesInternal(newAvroRecord, metadataValues); + return new HoodieAvroIndexedRecord(key, newAvroRecord, operation, metaData); } @Override - public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException { + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) { GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, newSchema, renameCols); return new HoodieAvroIndexedRecord(key, record, operation, metaData); } - @Override - public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException { - metadataValues.getKv().forEach((key, value) -> { - if (value != null) { - ((GenericRecord) data).put(key, value); - } - }); - - return new HoodieAvroIndexedRecord(key, data, operation, metaData); - } - @Override public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) { ((GenericRecord) data).put(keyFieldName, StringUtils.EMPTY_STRING); @@ -234,4 +224,18 
@@ protected final IndexedRecord readRecordPayload(Kryo kryo, Input input) { return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer); } + + static void updateMetadataValuesInternal(GenericRecord avroRecord, MetadataValues metadataValues) { + if (metadataValues.isEmpty()) { + return; // no-op + } + + String[] values = metadataValues.getValues(); + for (int pos = 0; pos < values.length; ++pos) { + String value = values[pos]; + if (value != null) { + avroRecord.put(HoodieMetadataField.values()[pos].getFieldName(), value); + } + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index 62001356b4fad..943c4e0953fe8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.avro.Schema; @@ -37,6 +38,8 @@ import java.util.Map; import java.util.Properties; +import static org.apache.hudi.common.model.HoodieAvroIndexedRecord.updateMetadataValuesInternal; + /** * Implementation of {@link HoodieRecord} using Avro payload. * @@ -123,31 +126,26 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { } @Override - public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException { - Option avroRecordPayloadOpt = getData().getInsertValue(recordSchema, props); - GenericRecord avroPayloadInNewSchema = - HoodieAvroUtils.rewriteRecord((GenericRecord) avroRecordPayloadOpt.get(), targetSchema); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroPayloadInNewSchema), getOperation(), this.currentLocation, this.newLocation); - } - - @Override - public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException { - GenericRecord oldRecord = (GenericRecord) getData().getInsertValue(recordSchema, props).get(); - GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols); - return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, this.newLocation); + public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { + try { + Option avroRecordOpt = getData().getInsertValue(recordSchema, props); + GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecordOpt.get(), targetSchema); + updateMetadataValuesInternal(newAvroRecord, metadataValues); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(newAvroRecord), getOperation(), this.currentLocation, this.newLocation); + } catch (IOException e) { + throw new HoodieIOException("Failed to deserialize record!", e); + } } @Override - public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException { - GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get(); - - metadataValues.getKv().forEach((key, value) -> { - if (value != null) { - avroRecordPayload.put(key, value); - } - }); - - 
return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation, this.newLocation); + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) { + try { + GenericRecord oldRecord = (GenericRecord) getData().getInsertValue(recordSchema, props).get(); + GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols); + return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation, this.newLocation); + } catch (IOException e) { + throw new HoodieIOException("Failed to deserialize record!", e); + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java index e2f9334f8a2ec..3c7927b6b65f4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java @@ -99,17 +99,12 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { } @Override - public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException { + public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { throw new UnsupportedOperationException(); } @Override - public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException { + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index a23316a2ff9fd..d78241aaeb455 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -354,25 +354,21 @@ public final void read(Kryo kryo, Input input) { public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema); /** - * Rewrite record into new schema(add meta columns) + * Rewrites record into new target schema containing Hudi-specific meta-fields + * + * NOTE: This operation is idempotent */ - public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException; + public abstract HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props); /** * Support schema evolution. 
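+ * NOTE: {@code renameCols} is expected to be keyed by the new (dot-qualified) column name, mapping
+ *       to the corresponding previous column name; rename resolution only relies on the last
+ *       name-part of the previous reference (mirroring the renamed-columns lookup in
+ *       HoodieInternalRowUtils).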
*/ - public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException; + public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols); - public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException { + public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) { return rewriteRecordWithNewSchema(recordSchema, props, newSchema, Collections.emptyMap()); } - /** - * This method could change in the future. - * @temporary - */ - public abstract HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException; - public abstract boolean isDelete(Schema recordSchema, Properties props) throws IOException; /** @@ -391,6 +387,10 @@ public static String generateSequenceId(String instantTime, int partitionId, lon return instantTime + "_" + partitionId + "_" + recordIndex; } + protected static boolean hasMetaFields(Schema schema) { + return schema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) != null; + } + /** * A special record returned by {@link HoodieRecordPayload}, which means we should just skip this record. * This record is only used for {@link HoodieRecordPayload} currently, so it should not diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java index baee08163380b..361da5639f48b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java @@ -18,47 +18,95 @@ package org.apache.hudi.common.model; -import java.util.HashMap; -import java.util.Map; - public class MetadataValues { - private final Map kv; - public MetadataValues() { - this.kv = new HashMap<>(); + // NOTE: These fields are laid out in the same order as they are encoded in + // each record and that should be preserved + private String commitTime; + private String commitSeqNo; + private String recordKey; + private String partitionPath; + private String fileName; + private String operation; + + private boolean set = false; + + public MetadataValues() {} + + public String getCommitTime() { + return commitTime; + } + + public String getCommitSeqNo() { + return commitSeqNo; + } + + public String getRecordKey() { + return recordKey; + } + + public String getPartitionPath() { + return partitionPath; + } + + public String getFileName() { + return fileName; + } + + public String getOperation() { + return operation; } public MetadataValues setCommitTime(String value) { - this.kv.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, value); + this.commitTime = value; + this.set = true; return this; } public MetadataValues setCommitSeqno(String value) { - this.kv.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, value); + this.commitSeqNo = value; + this.set = true; return this; } public MetadataValues setRecordKey(String value) { - this.kv.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, value); + this.recordKey = value; + this.set = true; return this; } public MetadataValues setPartitionPath(String value) { - this.kv.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, value); + this.partitionPath = value; + this.set = true; return this; } public MetadataValues setFileName(String value) { - this.kv.put(HoodieRecord.FILENAME_METADATA_FIELD, 
value); + this.fileName = value; + this.set = true; return this; } public MetadataValues setOperation(String value) { - this.kv.put(HoodieRecord.OPERATION_METADATA_FIELD, value); + this.operation = value; + this.set = true; return this; } - public Map getKv() { - return kv; + public boolean isEmpty() { + return !set; + } + + public String[] getValues() { + return new String[] { + // NOTE: These fields are laid out in the same order as they are encoded in + // each record and that should be preserved + commitTime, + commitSeqNo, + recordKey, + partitionPath, + fileName, + operation + }; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 42babc775b942..9855c9323dc95 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -842,16 +842,11 @@ private Option, Schema>> composeEvolve Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, readerSchema.getFullName()); return Option.of(Pair.of((record) -> { - try { - return record.rewriteRecordWithNewSchema( - dataBlock.getSchema(), - this.hoodieTableMetaClient.getTableConfig().getProps(), - mergedAvroSchema, - Collections.emptyMap()); - } catch (IOException e) { - LOG.error("Error rewrite record with new schema", e); - throw new HoodieException(e); - } + return record.rewriteRecordWithNewSchema( + dataBlock.getSchema(), + this.hoodieTableMetaClient.getTableConfig().getProps(), + mergedAvroSchema, + Collections.emptyMap()); }, mergedAvroSchema)); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 7e234775faa28..c912189a15369 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -37,7 +37,7 @@ import org.apache.hudi.common.model._ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.{CommitUtils, StringUtils} +import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException} @@ -57,6 +57,7 @@ import org.apache.hudi.util.SparkKeyGenUtils import org.apache.log4j.LogManager import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeRowWriter import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable @@ -95,12 +96,11 @@ object HoodieSparkSqlWriter { optParams: Map[String, String], df: DataFrame, hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, - hoodieWriteClient: 
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, - asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty, - asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty, - extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty) - : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String], - SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty, + asyncCompactionTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty, + asyncClusteringTriggerFn: Option[SparkRDDWriteClient[_] => Unit] = Option.empty, + extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty): + (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set") val path = optParams("path") @@ -255,7 +255,7 @@ object HoodieSparkSqlWriter { } // scalastyle:on - val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) = + val (writeResult, writeClient: SparkRDDWriteClient[_]) = operation match { case WriteOperationType.DELETE => val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace) @@ -271,7 +271,7 @@ object HoodieSparkSqlWriter { val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) - .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] + .asInstanceOf[SparkRDDWriteClient[_]] if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) { asyncCompactionTriggerFn.get.apply(client) @@ -307,7 +307,7 @@ object HoodieSparkSqlWriter { val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schemaStr, path, tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) - .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] + .asInstanceOf[SparkRDDWriteClient[_]] // Issue delete partitions client.startCommitWithTime(instantTime, commitActionType) val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime) @@ -330,31 +330,20 @@ object HoodieSparkSqlWriter { writerSchema } - // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework - // (due to containing cyclic refs), therefore we have to convert it to string before - // passing onto the Executor - val dataFileSchemaStr = dataFileSchema.toString - // Create a HoodieWriteClient & issue the write. 
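+ // NOTE: Illustrative sketch (names assumed) of the schema-shipping pattern referenced above: since
+ //       Avro's [[Schema]] can't be serialized by the JVM's native serialization framework, it is
+ //       passed to the executors as a string and re-parsed there, e.g.
+ //         val dataFileSchemaStr = dataFileSchema.toString                        // driver side
+ //         rdd.mapPartitions { it =>
+ //           val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)    // executor side
+ //           ...
+ //         }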
val client = hoodieWriteClient.getOrElse { val finalOpts = addSchemaEvolutionParameters(parameters, internalSchemaOpt, Some(writerSchema)) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key // TODO(HUDI-4772) proper writer-schema has to be specified here - DataSourceUtils.createHoodieClient(jsc, dataFileSchemaStr, path, tblName, mapAsJavaMap(finalOpts)) - }.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] + DataSourceUtils.createHoodieClient(jsc, dataFileSchema.toString, path, tblName, mapAsJavaMap(finalOpts)) + } val writeConfig = client.getConfig if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) { throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.") } // Convert to RDD[HoodieRecord] - val hoodieRecords = createHoodieRecordRdd( - df, - writeConfig, - parameters, - avroRecordName, - avroRecordNamespace, - writerSchema, - dataFileSchemaStr, - operation) + val hoodieRecords = + createHoodieRecordRdd(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, + dataFileSchema, operation) if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) { asyncCompactionTriggerFn.get.apply(client) @@ -518,31 +507,6 @@ object HoodieSparkSqlWriter { HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava) } - def generateSparkSchemaWithoutPartitionColumns(partitionParam: String, schema: StructType): StructType = { - val partitionColumns = getPartitionColumns(partitionParam) - HoodieInternalRowUtils.removeFields(schema, partitionColumns) - } - - def getAvroProcessedRecord(partitionParam: String, record: GenericRecord, - dropPartitionColumns: Boolean): GenericRecord = { - var processedRecord = record - if (dropPartitionColumns) { - val writeSchema = generateSchemaWithoutPartitionColumns(partitionParam, record.getSchema) - processedRecord = HoodieAvroUtils.rewriteRecord(record, writeSchema) - } - processedRecord - } - - def getProcessedRecord(partitionParam: String, record: GenericRecord, - dropPartitionColumns: Boolean): GenericRecord = { - var processedRecord = record - if (dropPartitionColumns) { - val writeSchema = generateSchemaWithoutPartitionColumns(partitionParam, record.getSchema) - processedRecord = HoodieAvroUtils.rewriteRecord(record, writeSchema) - } - processedRecord - } - def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema], writeSchemaOpt: Option[Schema] = None): Map[String, String] = { val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false" @@ -640,7 +604,7 @@ object HoodieSparkSqlWriter { optParams: Map[String, String], df: DataFrame, hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, - hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = { + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = { assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set") val path = optParams("path") @@ -911,7 +875,7 @@ object HoodieSparkSqlWriter { schema: StructType, writeResult: HoodieWriteResult, parameters: Map[String, String], - client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], + client: SparkRDDWriteClient[_], tableConfig: HoodieTableConfig, jsc: JavaSparkContext, 
tableInstantInfo: TableInstantInfo, @@ -981,7 +945,7 @@ object HoodieSparkSqlWriter { } } - private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], + private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[_], tableConfig: HoodieTableConfig, parameters: Map[String, String], configuration: Configuration): Boolean = { log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}") @@ -993,7 +957,7 @@ object HoodieSparkSqlWriter { } } - private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], + private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[_], parameters: Map[String, String]): Boolean = { log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}") asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled @@ -1049,7 +1013,7 @@ object HoodieSparkSqlWriter { recordName: String, recordNameSpace: String, writerSchema: Schema, - dataFileSchemaStr: String, + dataFileSchema: Schema, operation: WriteOperationType) = { val shouldDropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps)) @@ -1060,7 +1024,13 @@ object HoodieSparkSqlWriter { parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean - log.debug(s"Use $recordType") + // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework + // (due to containing cyclic refs), therefore we have to convert it to string before + // passing onto the Executor + val dataFileSchemaStr = dataFileSchema.toString + + log.debug(s"Creating HoodieRecords (as $recordType)") + recordType match { case HoodieRecord.HoodieRecordType.AVRO => val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, recordName, recordNameSpace, @@ -1090,29 +1060,27 @@ object HoodieSparkSqlWriter { hoodieRecord } }).toJavaRDD() + case HoodieRecord.HoodieRecordType.SPARK => - // ut will use AvroKeyGenerator, so we need to cast it in spark record val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface] val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr) val dataFileStructType = HoodieInternalRowUtils.getCachedSchema(dataFileSchema) val writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema) val sourceStructType = df.schema - df.queryExecution.toRdd.mapPartitions { iter => - iter.map { internalRow => - val recordKey = sparkKeyGenerator.getRecordKey(internalRow, sourceStructType) - val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType) + df.queryExecution.toRdd.mapPartitions { it => + val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType + // NOTE: To make sure we properly transform records + val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType) + + it.map { sourceRow => + val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, sourceStructType) + val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, sourceStructType) val key = new HoodieKey(recordKey.toString, partitionPath.toString) - if (shouldDropPartitionColumns) { - val row = HoodieInternalRowUtils.getCachedUnsafeProjection(dataFileStructType, dataFileStructType) - .apply(HoodieInternalRowUtils.rewriteRecord(internalRow, 
sourceStructType, dataFileStructType)) - new HoodieSparkRecord(key, row, dataFileStructType, false) - } else { - val row = HoodieInternalRowUtils.getCachedUnsafeProjection(writerStructType, writerStructType) - .apply(HoodieInternalRowUtils.rewriteRecord(internalRow, sourceStructType, writerStructType)) - new HoodieSparkRecord(key, row, writerStructType, false) - } + val targetRow = targetStructTypeRowWriter(sourceRow) + + new HoodieSparkRecord(key, targetRow, dataFileStructType, false) } }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]] } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 9830d3230817b..502d25f82870f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -89,7 +89,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, private var asyncCompactorService: AsyncCompactService = _ private var asyncClusteringService: AsyncClusteringService = _ - private var writeClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty + private var writeClient: Option[SparkRDDWriteClient[_]] = Option.empty private var hoodieTableConfig: Option[HoodieTableConfig] = Option.empty override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized { @@ -253,7 +253,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, } } - protected def triggerAsyncCompactor(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = { + protected def triggerAsyncCompactor(client: SparkRDDWriteClient[_]): Unit = { if (null == asyncCompactorService) { log.info("Triggering Async compaction !!") asyncCompactorService = new SparkStreamingAsyncCompactService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), @@ -282,7 +282,7 @@ class HoodieStreamingSink(sqlContext: SQLContext, } } - protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = { + protected def triggerAsyncClustering(client: SparkRDDWriteClient[_]): Unit = { if (null == asyncClusteringService) { log.info("Triggering async clustering!") asyncClusteringService = new SparkStreamingAsyncClusteringService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala deleted file mode 100644 index 26a343d2ff613..0000000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
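The Spark-record branch above replaces the per-record `rewriteRecord` plus `getCachedUnsafeProjection` combination with a single row writer built once per partition, outside `it.map`. A rough equivalent using plain Catalyst APIs is sketched below, assuming the target schema is a name-matched subset of the source schema with identical types; the PR's `getCachedUnsafeRowWriter`/`genUnsafeRowWriter` additionally handle renamed columns and type changes, judging by the tests further below.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
import org.apache.spark.sql.types.StructType

// Build the projection once and reuse it for every row in the partition.
def projectTo(source: StructType, target: StructType): InternalRow => InternalRow = {
  val boundRefs = target.fields.map { field =>
    BoundReference(source.fieldIndex(field.name), field.dataType, field.nullable)
  }
  val projection = UnsafeProjection.create(boundRefs.toSeq)
  // NOTE: the returned UnsafeRow shares the projection's output buffer,
  // so callers that buffer rows downstream should copy() them first.
  row => projection(row)
}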
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi - -import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.testutils.HoodieClientTestUtils - -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{HoodieInternalRowUtils, Row, SparkSession} -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} - -class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll { - - private var sparkSession: SparkSession = _ - - private val schema1 = StructType( - Array( - StructField("name", StringType), - StructField("age", IntegerType) - ) - ) - private val schema2 = StructType( - Array( - StructField("name1", StringType), - StructField("age1", IntegerType) - ) - ) - private val schemaMerge = StructType(schema1.fields ++ schema2.fields) - private val schema1WithMetaData = StructType(Array( - StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType), - StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType), - StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType), - StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType), - StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType), - StructField(HoodieRecord.OPERATION_METADATA_FIELD, StringType), - StructField(HoodieRecord.HOODIE_IS_DELETED_FIELD, BooleanType) - ) ++ schema1.fields) - - override protected def beforeAll(): Unit = { - // Initialize a local spark env - val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName)) - jsc.setLogLevel("ERROR") - sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate - } - - override protected def afterAll(): Unit = { - sparkSession.close() - } - - test("test rewrite") { - val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181))) - val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first() - val newRow1 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema1) - val newRow2 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema2) - assert(newRow1.get(0, StringType).toString.equals("like")) - assert(newRow1.get(1, IntegerType) == 18) - assert(newRow2.get(0, StringType).toString.equals("like1")) - assert(newRow2.get(1, IntegerType) == 181) - } - - test("test rewrite with nullable value") { - val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18))) - val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first() - val newRow = HoodieInternalRowUtils.rewriteRecord(oldRow, schema1, schemaMerge) - assert(newRow.get(0, StringType).toString.equals("like")) - assert(newRow.get(1, IntegerType) == 18) - assert(newRow.get(2, StringType) == null) - assert(newRow.get(3, IntegerType) == null) - } - - -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala similarity index 77% rename from 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala index 31d36adbee5a3..be1dae8cc279f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala @@ -16,28 +16,30 @@ * limitations under the License. */ -package org.apache.hudi - -import java.nio.ByteBuffer -import java.util.{ArrayList, HashMap, Objects} +package org.apache.spark.sql.hudi import org.apache.avro.generic.GenericData import org.apache.avro.{LogicalTypes, Schema} +import org.apache.hudi.AvroConversionUtils +import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.internal.schema.Types import org.apache.hudi.internal.schema.action.TableChanges import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.SchemaChangeUtils import org.apache.hudi.testutils.HoodieClientTestUtils - import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{HoodieInternalRowUtils, Row, SparkSession} +import org.junit.jupiter.api.Assertions.assertEquals import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with BeforeAndAfterAll { +import java.nio.ByteBuffer +import java.util.{ArrayList, HashMap, Objects, Collections => JCollections} + +class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll { private var sparkSession: SparkSession = _ override protected def beforeAll(): Unit = { @@ -51,6 +53,54 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef sparkSession.close() } + private val schema1 = StructType(Seq( + StructField("name", StringType), + StructField("age", IntegerType), + StructField("address", + StructType(Seq( + StructField("city", StringType), + StructField("street", StringType) + )) + ) + )) + + private val schema2 = StructType(Seq( + StructField("name1", StringType), + StructField("age1", IntegerType) + )) + + private val mergedSchema = StructType(schema1.fields ++ schema2.fields) + + test("Test simple row rewriting") { + val rows = Seq( + Row("Andrew", 18, Row("Mission st", "SF"), "John", 19) + ) + val data = sparkSession.sparkContext.parallelize(rows) + val oldRow = sparkSession.createDataFrame(data, mergedSchema).queryExecution.toRdd.first() + + val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, schema1, JCollections.emptyMap()) + val newRow1 = rowWriter1(oldRow) + + val serDe1 = sparkAdapter.createSparkRowSerDe(schema1) + assertEquals(serDe1.deserializeRow(newRow1), Row("Andrew", 18, Row("Mission st", "SF"))); + + val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, schema2, JCollections.emptyMap()) + val newRow2 = rowWriter2(oldRow) + + val serDe2 = sparkAdapter.createSparkRowSerDe(schema2) + assertEquals(serDe2.deserializeRow(newRow2), Row("John", 19)); + } + + test("Test simple rewriting (with nullable value)") { + val data = 
sparkSession.sparkContext.parallelize(Seq(Row("Rob", 18, null.asInstanceOf[StructType]))) + val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first() + val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1, mergedSchema, JCollections.emptyMap()) + val newRow = rowWriter(oldRow) + + val serDe = sparkAdapter.createSparkRowSerDe(mergedSchema) + assertEquals(serDe.deserializeRow(newRow), Row("Rob", 18, null.asInstanceOf[StructType], null.asInstanceOf[StringType], null.asInstanceOf[IntegerType])) + } + /** * test record data type changes. * int => long/float/double/string @@ -61,7 +111,7 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef * String => date/decimal * date => String */ - test("test rewrite record with type changed") { + test("Test rewrite record with type changed") { val avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\"" + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}," + "{\"name\":\"comb\",\"type\":[\"null\",\"int\"],\"default\":null}," @@ -114,7 +164,22 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef val internalSchema = AvroInternalSchemaConverter.convert(avroSchema) // do change type operation val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema) - updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateColumnType("col31", Types.DecimalType.get(18, 9)).updateColumnType("col4", Types.DecimalType.get(18, 9)).updateColumnType("col41", Types.StringType.get).updateColumnType("col5", Types.DateType.get).updateColumnType("col51", Types.DecimalType.get(18, 9)).updateColumnType("col6", Types.StringType.get) + updateChange.updateColumnType("id", Types.LongType.get) + .updateColumnType("comb", Types.FloatType.get) + .updateColumnType("com1", Types.DoubleType.get) + .updateColumnType("col0", Types.StringType.get) + .updateColumnType("col1", Types.FloatType.get) + .updateColumnType("col11", Types.DoubleType.get) + .updateColumnType("col12", Types.StringType.get) + .updateColumnType("col2", Types.DoubleType.get) + .updateColumnType("col21", Types.StringType.get) + .updateColumnType("col3", Types.StringType.get) + .updateColumnType("col31", Types.DecimalType.get(18, 9)) + .updateColumnType("col4", Types.DecimalType.get(18, 9)) + .updateColumnType("col41", Types.StringType.get) + .updateColumnType("col5", Types.DateType.get) + .updateColumnType("col51", Types.DecimalType.get(18, 9)) + .updateColumnType("col6", Types.StringType.get) val newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange) val newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName) val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String]) @@ -125,11 +190,14 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, structTypeSchema).apply(avroRecord).get val newRowExpected = 
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema) .apply(newRecord).get - val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String]) - internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema) + + val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, newStructTypeSchema, new HashMap[String, String]) + val newRow = rowWriter(row) + + internalRowCompare(newRowExpected, newRow, newStructTypeSchema) } - test("test rewrite nest record") { + test("Test rewrite nest record") { val record = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get()), Types.Field.get(1, true, "data", Types.StringType.get()), Types.Field.get(2, true, "preferences", @@ -177,8 +245,11 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema) val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, structTypeSchema).apply(avroRecord).get val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema).apply(newAvroRecord).get - val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String]) - internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema) + + val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, newStructTypeSchema, new HashMap[String, String]) + val newRow = rowWriter(row) + + internalRowCompare(newRowExpected, newRow, newStructTypeSchema) } private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 7a3aa3e3bf58b..7134e8ff7ccfb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -89,12 +89,6 @@ public static class Config { */ static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format"; static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet"; - - /** - * Drops all meta fields from the source hudi table while ingesting into sink hudi table. - */ - static final String HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = "hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source"; - public static final Boolean DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE = false; } public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, @@ -159,35 +153,8 @@ public Pair>, String> fetchNextBatch(Option lastCkpt queryTypeAndInstantEndpts.getRight().getRight())); } - /* - * log.info("Partition Fields are : (" + partitionFields + "). 
Initial Source Schema :" + source.schema()); - * - * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema - * = newSchema.add(field, DataTypes.StringType, true); } - * - * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if - * configured - * - * Dataset validated = source.map((MapFunction) (Row row) -> { // _hoodie_instant_time String - * instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(), - * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath - * = row.getString(3); List partitionVals = - * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object) - * o).collect(Collectors.toList()); ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(), - * "#partition-fields != #partition-values-extracted"); List rowObjs = new - * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return - * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema)); - * - * log.info("Validated Source Schema :" + validated.schema()); - */ - boolean dropAllMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE, - Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE); - - // Remove Hoodie meta columns except partition path from input source - String[] colsToDrop = dropAllMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) : - HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new); - final Dataset src = source.drop(colsToDrop); - // log.info("Final Schema from Source is :" + src.schema()); + // Remove Hoodie meta columns + final Dataset src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new)); return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight()); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java index fd684e95b2d6d..4c03e1c67f2f3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java @@ -73,6 +73,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona String propsFilePath; String tableBasePath; + @Disabled("HUDI-5653") @ParameterizedTest @EnumSource(HoodieTableType.class) void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {
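With the `hoodie.deltastreamer.source.hoodieincr.drop.all.meta.fields.from.source` toggle and the dead commented-out block removed, HoodieIncrSource now unconditionally strips every Hudi meta column from the incremental frame (previously `_hoodie_partition_path` was retained unless the flag was set). The equivalent operation in a couple of lines (a sketch only; the PR keeps it inline in Java):

import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.DataFrame

// Drop _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
// _hoodie_partition_path and _hoodie_file_name from the source frame.
def dropHoodieMetaColumns(source: DataFrame): DataFrame =
  source.drop(HoodieRecord.HOODIE_META_COLUMNS.toArray(Array.empty[String]): _*)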