2 changes: 1 addition & 1 deletion azure-pipelines.yml
@@ -179,7 +179,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_4
displayName: UT FT other modules
timeoutInMinutes: '150'
timeoutInMinutes: '180'
steps:
- task: [email protected]
displayName: maven install
@@ -226,7 +226,10 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(tableSchema, recordProperties, writeSchemaWithMetaFields)
: finalRecord.get().rewriteRecord(tableSchema, recordProperties, writeSchemaWithMetaFields);
HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord, writeSchemaWithMetaFields, recordProperties);
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be put into the recordList (List).
HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord.copy(), writeSchemaWithMetaFields, recordProperties);
finalRecord = Option.of(populatedRecord);
if (isUpdateRecord) {
updatedRecordsWritten++;
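
The NOTE above captures the rationale behind most of the copy() calls added in this PR: a file reader may hand out records whose engine-specific payload points into a buffer the reader keeps reusing, so any record that is buffered (in a list, in the BoundedInMemoryExecutor queue, or before a shuffle) must first be turned into a clean copy. A minimal sketch of the hazard, using hypothetical stand-in classes rather than Hudi's actual HoodieRecord or reader APIs:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an engine-specific record that merely points into a reusable buffer.
final class BufferBackedRecord {
  private final byte[] buffer;   // shared with the "reader"
  private final int length;

  BufferBackedRecord(byte[] buffer, int length) {
    this.buffer = buffer;
    this.length = length;
  }

  // A clean copy that owns its own bytes, analogous to HoodieRecord#copy().
  BufferBackedRecord copy() {
    return new BufferBackedRecord(Arrays.copyOf(buffer, length), length);
  }

  String value() {
    return new String(buffer, 0, length);
  }
}

public class SharedBufferDemo {
  public static void main(String[] args) {
    byte[] shared = new byte[8];                              // one buffer reused by the reader
    List<BufferBackedRecord> recordList = new ArrayList<>();

    for (String next : new String[] {"aaaa", "bbbb"}) {
      System.arraycopy(next.getBytes(), 0, shared, 0, 4);     // reader overwrites the buffer in place
      BufferBackedRecord record = new BufferBackedRecord(shared, 4);
      recordList.add(record.copy());                          // without copy(), both entries would read "bbbb"
    }

    recordList.forEach(r -> System.out.println(r.value()));   // prints aaaa, bbbb
  }
}

The same reasoning applies to the later hunks that copy records before handing them to the BoundedInMemoryExecutor queue or before shuffling during clustering.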
@@ -157,15 +157,6 @@ public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTab
return baseFileOp.get();
}

@Override
public Schema getWriterSchemaWithMetaFields() {
return writeSchemaWithMetaFields;
}

public Schema getWriterSchema() {
return writeSchema;
}

/**
* Extract old file path, initialize StorageWriter and WriteStatus.
*/
@@ -177,6 +177,10 @@ public Schema getWriterSchemaWithMetaFields() {
return writeSchemaWithMetaFields;
}

public Schema getWriterSchema() {
return writeSchema;
}

/**
* Determines whether we can accept the incoming records, into the current file. Depending on
* <p>
@@ -36,7 +36,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {

@Override
protected void consumeOneRecord(HoodieRecord record) {
bootstrapHandle.write(record, bootstrapHandle.getWriterSchemaWithMetaFields(), new TypedProperties());
bootstrapHandle.write(record, bootstrapHandle.getWriterSchema(), new TypedProperties());
}

@Override
@@ -132,20 +132,22 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
if (needToReWriteRecord) {
readerIterator = new RewriteIterator(reader.getRecordIterator(), readSchema, readSchema, table.getConfig().getProps(), renameCols);
readerIterator = new RewriteIterator(reader.getRecordIterator(), reader.getSchema(), readSchema, table.getConfig().getProps(), renameCols);
} else {
readerIterator = reader.getRecordIterator(readSchema);
}
}

wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
new UpdateHandler(mergeHandle), record -> {
HoodieRecord recordCopy = record.copy();
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be put into the queue of the BoundedInMemoryExecutor.
if (!externalSchemaTransformation) {
return recordCopy;
return record.copy();
[Review comment from a Contributor] Let's add a comment elaborating why we're making a copy here
}
try {
return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
return record.rewriteRecord(writerSchema, new Properties(), readerSchema).copy();
} catch (IOException e) {
throw new HoodieException(String.format("Failed to rewrite record. WriterSchema: %s; ReaderSchema: %s", writerSchema, readerSchema), e);
}
@@ -175,10 +177,10 @@ class RewriteIterator implements ClosableIterator<HoodieRecord> {
private final Properties prop;
private final Map<String, String> renameCols;

public RewriteIterator(ClosableIterator<HoodieRecord> iter, Schema newSchema, Schema recordSchema, Properties prop, Map<String, String> renameCols) {
public RewriteIterator(ClosableIterator<HoodieRecord> iter, Schema recordSchema, Schema newSchema, Properties prop, Map<String, String> renameCols) {
this.iter = iter;
this.newSchema = newSchema;
this.recordSchema = recordSchema;
this.newSchema = newSchema;
this.prop = prop;
this.renameCols = renameCols;
}
@@ -191,7 +193,8 @@ public boolean hasNext() {
@Override
public HoodieRecord next() {
try {
return iter.next().rewriteRecordWithNewSchema(recordSchema, prop, newSchema, renameCols);
HoodieRecord record = iter.next();
return record.rewriteRecordWithNewSchema(recordSchema, prop, newSchema, renameCols);
} catch (IOException e) {
LOG.error("Error rewrite record with new schema", e);
throw new HoodieException(e);
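
Two things change in the RewriteIterator wiring above: the call site now passes the file reader's own schema (reader.getSchema()) as the record's source schema instead of readSchema, and the constructor's parameter order is aligned to (recordSchema, newSchema) so the two same-typed schema arguments can no longer be swapped silently. The iterator itself is a lazy-mapping wrapper; a rough sketch of that pattern with plain java.util types (not Hudi's ClosableIterator or Avro schemas):

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Minimal analogue of RewriteIterator: wraps another iterator and rewrites each element on demand,
// so only one record is materialized at a time.
final class RewritingIterator<I, O> implements Iterator<O> {
  private final Iterator<I> inner;
  private final Function<I, O> rewriter;  // stands in for rewriteRecordWithNewSchema(recordSchema, props, newSchema, renameCols)

  RewritingIterator(Iterator<I> inner, Function<I, O> rewriter) {
    this.inner = inner;
    this.rewriter = rewriter;
  }

  @Override
  public boolean hasNext() {
    return inner.hasNext();
  }

  @Override
  public O next() {
    return rewriter.apply(inner.next());
  }

  public static void main(String[] args) {
    Iterator<String> fileRecords = List.of("a", "b").iterator();
    // The "rewrite" here is just an upper-casing stand-in for a schema conversion.
    new RewritingIterator<>(fileRecords, String::toUpperCase)
        .forEachRemaining(System.out::println);  // prints A, B
  }
}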
@@ -92,12 +92,14 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List

wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> {
HoodieRecord recordCopy = record.copy();
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be put into the queue of the BoundedInMemoryExecutor.
if (!externalSchemaTransformation) {
return recordCopy;
return record.copy();
}
try {
return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
return record.rewriteRecord(writerSchema, new Properties(), readerSchema).copy();
} catch (IOException e) {
throw new HoodieException(e);
}
@@ -216,6 +216,9 @@ private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperat
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be put into the records (List).
recordIterator.forEachRemaining(record -> records.add(record.copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, new Properties(), Option.empty())));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
@@ -90,12 +90,14 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List

wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> {
HoodieRecord recordCopy = record.copy();
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be put into the queue of the BoundedInMemoryExecutor.
if (!externalSchemaTransformation) {
return recordCopy;
return record.copy();
}
try {
return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
return record.rewriteRecord(writerSchema, new Properties(), readerSchema).copy();
} catch (IOException e) {
throw new HoodieException(e);
}
@@ -220,6 +220,9 @@ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(Ho
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be shuffled later.
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
.collect(Collectors.toList());
@@ -331,6 +334,9 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
Option<BaseKeyGenerator> keyGeneratorOp =
writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be shuffled later.
MappingIterator mappingIterator = new MappingIterator((ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
writeConfig.getProps(), keyGeneratorOp));
@@ -151,6 +151,9 @@ private Iterator<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOp
HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
Option<BaseKeyGenerator> keyGeneratorOp =
writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of it,
// since these records will be shuffled later.
MappingIterator mappingIterator = new MappingIterator((ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
getWriteConfig().getProps(), keyGeneratorOp));
@@ -29,8 +29,6 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.Arrays;

/**
* Hudi internal implementation of the {@link InternalRow} allowing to extend arbitrary
* {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
@@ -231,7 +229,11 @@ public MapData getMap(int ordinal) {

@Override
public InternalRow copy() {
return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), sourceRow.copy(), sourceContainsMetaFields);
UTF8String[] copyMetaFields = new UTF8String[metaFields.length];
for (int i = 0; i < metaFields.length; i++) {
copyMetaFields[i] = metaFields[i] != null ? metaFields[i].copy() : null;
}
return new HoodieInternalRow(copyMetaFields, sourceRow.copy(), sourceContainsMetaFields);
}

private int rebaseOrdinal(int ordinal) {
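
The new copy() above performs a deep copy: Arrays.copyOf only duplicates the array of references, so the "copied" row would still share each UTF8String's underlying bytes with the original, whereas copying element by element isolates the two rows. A small illustration of the difference, using StringBuilder as a stand-in for a mutable, buffer-backed value (this is an analogy, not Spark's UTF8String API):

import java.util.Arrays;

public class ShallowVsDeepCopy {
  public static void main(String[] args) {
    StringBuilder[] metaFields = {new StringBuilder("commit-001"), new StringBuilder("file-A")};

    // Shallow copy: a new array, but both arrays still point at the same mutable elements.
    StringBuilder[] shallow = Arrays.copyOf(metaFields, metaFields.length);

    // Deep copy: each element is duplicated, so later mutation of the originals is not visible.
    StringBuilder[] deep = new StringBuilder[metaFields.length];
    for (int i = 0; i < metaFields.length; i++) {
      deep[i] = metaFields[i] != null ? new StringBuilder(metaFields[i]) : null;
    }

    metaFields[0].replace(0, metaFields[0].length(), "commit-002");  // mutate the shared element

    System.out.println(shallow[0]); // commit-002 -> the "copy" changed underneath us
    System.out.println(deep[0]);    // commit-001 -> isolated, which is what the new HoodieInternalRow#copy guarantees
  }
}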
@@ -23,7 +23,6 @@
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.hudi.HoodieInternalRowUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -40,7 +39,7 @@
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
import org.apache.hudi.util.HoodieSparkRecordUtils;
import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
@@ -116,7 +115,7 @@ public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, boo
}

private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
super(key, data, operation);
super(key, data, operation, Option.empty());

validateRow(data, schema);
this.copy = copy;
@@ -197,28 +196,31 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);

boolean containMetaFields = hasMetaFields(structType);
UTF8String[] metaFields = tryExtractMetaFields(data, structType);
// TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);

// TODO add actual rewriting
InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);
boolean containMetaFields = hasMetaFields(targetStructType);
UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, targetStructType);
HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields);

return new HoodieSparkRecord(getKey(), finalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, copy);
return new HoodieSparkRecord(getKey(), internalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false);
}

@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);

boolean containMetaFields = hasMetaFields(structType);
UTF8String[] metaFields = tryExtractMetaFields(data, structType);
// TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);

InternalRow rewrittenRow =
HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields);
boolean containMetaFields = hasMetaFields(newStructType);
UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, newStructType);
HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields);

return new HoodieSparkRecord(getKey(), finalRow, newStructType, getOperation(), this.currentLocation, this.newLocation, copy);
return new HoodieSparkRecord(getKey(), internalRow, newStructType, getOperation(), this.currentLocation, this.newLocation, false);
}

@Override
@@ -299,6 +301,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, P

@Override
public Option<Map<String, String>> getMetadata() {
// TODO HUDI-5282 support metaData
return Option.empty();
}

@@ -320,7 +323,7 @@ public HoodieSparkRecord copy() {
public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
String orderingField = ConfigUtils.getOrderingField(props);
if (!HoodieCatalystExpressionUtils$.MODULE$.existField(structType, orderingField)) {
if (!HoodieInternalRowUtils.existField(structType, orderingField)) {
return 0;
} else {
NestedFieldPath nestedFieldPath = HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
@@ -377,7 +380,10 @@ private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, Stru

private static UTF8String[] tryExtractMetaFields(InternalRow row, StructType structType) {
boolean containsMetaFields = hasMetaFields(structType);
if (containsMetaFields) {
if (containsMetaFields && structType.size() == 1) {
// Support bootstrap with RECORD_KEY_SCHEMA
return new UTF8String[] {row.getUTF8String(0)};
} else if (containsMetaFields) {
return HoodieRecord.HOODIE_META_COLUMNS.stream()
.map(col -> row.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(col)))
.toArray(UTF8String[]::new);
@@ -21,7 +21,6 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieInternalRowUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkConversionUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
Expand All @@ -37,6 +36,7 @@
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

@@ -18,7 +18,6 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.HoodieInternalRowUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
Expand All @@ -34,6 +33,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.sql.HoodieInternalRowUtils;

import java.io.IOException;
