Merged
33 commits
3b731f0
Tidying up
Oct 13, 2022
1b6bd4f
Tidying up more
Oct 13, 2022
b5dfef6
Cleaning up duplication
Oct 13, 2022
068f514
Tidying up
Oct 13, 2022
3b4cadb
Revisited legacy operating mode configuration
Oct 13, 2022
3c061b1
Tidying up
Oct 13, 2022
d9f177e
Cleaned up `projectUnsafe` API
Oct 13, 2022
52a1cba
Fixing compilation
Oct 13, 2022
7289ea7
Cleaning up `HoodieSparkRecord` ctors;
Oct 13, 2022
5b1170e
Fixing compilation
Oct 13, 2022
d0ea187
Cleaned up `ParquetReader` initialization
Oct 14, 2022
94472f7
Revisited `HoodieSparkRecord` to accept either `UnsafeRow` or `Hoodie…
Oct 14, 2022
a1b3955
Cleaning up redundant exception spec
Oct 14, 2022
b26e304
Make sure `updateMetadataFields` properly wraps `InternalRow` into `H…
Oct 14, 2022
6cccc60
Fixed meta-fields extraction and `HoodieInternalRow` composition w/in…
Oct 14, 2022
c100fc4
De-duplicate `HoodieSparkRecord` ctors;
Oct 14, 2022
dd12671
Removed unnecessary copying
Oct 14, 2022
b375589
Cleaned up projection for `HoodieSparkRecord` (dropping partition col…
Oct 14, 2022
64c9cef
Fixing compilation
Oct 14, 2022
04ccf7f
Fixing compilation (for Flink)
Oct 14, 2022
9835631
Cleaned up File Readers' interfaces:
Oct 14, 2022
533e7a4
Cleaned up File Readers impls (in line with the new interfaces)
Oct 14, 2022
39f5c2c
Rebased `HoodieBackedTableMetadata` onto new `HoodieSeekingFileReader`
Oct 14, 2022
b6776d1
Tidying up
Oct 14, 2022
9db7567
Missing licenses
Oct 14, 2022
ff7ce6d
Re-instate custom override for `HoodieAvroParquetReader`;
Oct 14, 2022
13ed917
Fixed missing cloning w/in `HoodieLazyInsertIterable`
Oct 14, 2022
24fb1ff
Fixed missing cloning in deduplication flow
Oct 14, 2022
d1e037a
Allow `HoodieSparkRecord` to hold `ColumnarBatchRow`
Oct 15, 2022
7f73b29
Missing licenses
Oct 15, 2022
9e2aabd
Fixing compilation
Oct 15, 2022
14f36e0
Missing changes
Oct 15, 2022
e3941b7
Fixed Spark 2.x validation of whether the row was read as a batch
Oct 17, 2022
Diff view
@@ -21,9 +21,9 @@
import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
@@ -90,18 +90,24 @@ public R getResult() {
}
}

/**
* Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
* expensive operations of transformation to the reader thread.
*/
static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
Schema schema, HoodieWriteConfig config) {
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, config.getProps());
static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema,
HoodieWriteConfig config) {
return getCloningTransformerInternal(schema, config.getProps());
}

static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
Schema schema) {
return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.emptyProps());
static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) {
return getCloningTransformerInternal(schema, new TypedProperties());
}

private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema schema,
TypedProperties props) {
return record -> {
// NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific
//       payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of
//       it, since these records will subsequently be buffered (w/in the in-memory queue)
HoodieRecord<T> clonedRecord = record.copy();
return new HoodieInsertValueGenResult(clonedRecord, schema, props);
};
}

@Override
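The note in the new cloning transformer above is the heart of this file's change: a record handed to the bounded in-memory queue may wrap an engine-specific payload (for example a Spark row) that points into a buffer the reader keeps reusing, so each record is copied before it is buffered. The stand-alone Java sketch below illustrates that hazard with made-up types; it is not Hudi code.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Illustration of why records must be copied before buffering when the producer
// reuses one mutable container (as Spark's UnsafeRow-backed rows can).
// All names here are hypothetical; this is not Hudi code.
class ReusedRow {
  private final Object[] values = new Object[1];   // backing buffer, overwritten in place per record

  void set(Object value) {
    values[0] = value;
  }

  ReusedRow copy() {                               // detaches from the shared buffer
    ReusedRow r = new ReusedRow();
    r.values[0] = this.values[0];
    return r;
  }

  @Override
  public String toString() {
    return java.util.Arrays.toString(values);
  }
}

public class CopyBeforeBufferSketch {
  public static void main(String[] args) {
    ReusedRow shared = new ReusedRow();            // one container, reused for every "record"
    Queue<ReusedRow> unsafeQueue = new ArrayDeque<>();
    Queue<ReusedRow> safeQueue = new ArrayDeque<>();

    for (String key : List.of("a", "b", "c")) {
      shared.set(key);                             // the reader overwrites the shared buffer
      unsafeQueue.add(shared);                     // BUG: every element aliases the same buffer
      safeQueue.add(shared.copy());                // OK: each element owns its data
    }
    System.out.println(unsafeQueue);               // [[c], [c], [c]]
    System.out.println(safeQueue);                 // [[a], [b], [c]]
  }
}
```

Without the `copy()` call every buffered element aliases the same backing storage, so the queue ends up holding N references to whatever the reader wrote last.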
@@ -143,8 +143,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
} else {
rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
}
MetadataValues metadataValues = new MetadataValues();
metadataValues.setFileName(path.getName());
MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
if (preserveMetadata) {
fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
@@ -388,12 +388,11 @@ protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema,
} else {
rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
}
MetadataValues metadataValues = new MetadataValues();
metadataValues.setFileName(newFilePath.getName());
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
if (shouldPreserveRecordMetadata) {
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
} else {
fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields);
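Both write handles above now build the metadata container in a single expression, and the relocated note explains why the file name is always set: when a record is rewritten into a new base file, its file-name meta field must point at that new file even when the rest of its metadata is preserved. The sketch below shows the returns-`this` setter convention that the one-liner assumes; `MetaValuesSketch` is a made-up stand-in, not Hudi's `MetadataValues`.

```java
// Minimal sketch of the returns-this ("fluent") setter convention the diff relies on.
// `MetaValuesSketch` is a hypothetical stand-in; only the file-name field is modeled.
public class MetaValuesSketch {
  private String fileName;

  public MetaValuesSketch setFileName(String fileName) {
    this.fileName = fileName;
    return this;                 // returning `this` lets construction and assignment chain
  }

  public String getFileName() {
    return fileName;
  }

  public static void main(String[] args) {
    // One expression instead of a declare-then-mutate pair, as in the updated handle code.
    MetaValuesSketch values = new MetaValuesSketch().setFileName("part-0001.parquet");
    System.out.println(values.getFileName());  // part-0001.parquet
  }
}
```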
@@ -18,6 +18,9 @@

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -33,10 +36,6 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -80,25 +79,16 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}

protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combinedRecordOpt, Schema writerSchema)
throws IOException {
// TODO [HUDI-5019] Remove these unnecessary newInstance invocations
Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
Option<HoodieRecord> savedCombineRecordOp = combinedRecordOpt.map(HoodieRecord::newInstance);
final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combinedRecordOpt, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
Option<IndexedRecord> avroOpt = savedCombineRecordOp
.flatMap(r -> {
try {
return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
.map(HoodieAvroIndexedRecord::getData);
} catch (IOException e) {
LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
return Option.empty();
}
});
cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
}
return result;
}
@@ -19,6 +19,7 @@
package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -27,6 +28,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -279,4 +281,13 @@ protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, String f
+ "file suffix: " + fileSuffix + " error");
}
}

protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, Schema writerSchema, TypedProperties props) {
try {
return record.toIndexedRecord(writerSchema, props).map(HoodieAvroIndexedRecord::getData);
} catch (IOException e) {
LOG.error("Fail to get indexRecord from " + record, e);
return Option.empty();
}
}
}
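The `toAvroRecord` helper added above folds the checked `IOException` thrown by `toIndexedRecord` into an empty `Option`, which is what lets the change-log handles compose it with `flatMap` instead of repeating a try/catch at every call site. Below is a minimal sketch of that pattern using `java.util.Optional` as a stand-in for Hudi's `Option`; every name in it is hypothetical.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;

// Sketch of folding a checked-exception conversion into an Optional so it composes
// with flatMap, mirroring the shape of the new toAvroRecord(...) helper.
public class ExceptionToOptionalSketch {

  interface ThrowingConverter<A, B> {
    B convert(A input) throws IOException;
  }

  static <A, B> Function<A, Optional<B>> lifted(ThrowingConverter<A, B> converter) {
    return input -> {
      try {
        return Optional.ofNullable(converter.convert(input));
      } catch (IOException e) {
        // Log and degrade to empty, as the helper in the diff does, instead of propagating.
        System.err.println("Failed to convert " + input + ": " + e.getMessage());
        return Optional.empty();
      }
    };
  }

  public static void main(String[] args) {
    ThrowingConverter<String, Integer> parse = s -> {
      if (s.isEmpty()) {
        throw new IOException("empty input");
      }
      return s.length();
    };

    Optional<String> some = Optional.of("record");
    Optional<String> bad = Optional.of("");

    System.out.println(some.flatMap(lifted(parse)));  // Optional[6]
    System.out.println(bad.flatMap(lifted(parse)));   // Optional.empty
  }
}
```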
@@ -23,7 +23,6 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -33,8 +32,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Iterator;

@@ -77,16 +74,7 @@ protected Iterator<HoodieRecord> getMergingIterator(HoodieTable<T, I, K, O> tabl
return new MergingIterator<>(
(Iterator<HoodieRecord>) reader.getRecordIterator(readerSchema),
(Iterator<HoodieRecord>) bootstrapReader.getRecordIterator(bootstrapReadSchema),
(oneRecord, otherRecord) -> mergeRecords(oneRecord, otherRecord, mergeHandle.getWriterSchemaWithMetaFields()));
}

@Nonnull
private static HoodieRecord mergeRecords(HoodieRecord left, HoodieRecord right, Schema targetSchema) {
try {
return left.joinWith(right, targetSchema);
} catch (IOException e) {
throw new HoodieIOException("Failed to merge records", e);
}
(oneRecord, otherRecord) -> oneRecord.joinWith(otherRecord, mergeHandle.getWriterSchemaWithMetaFields()));
}

/**
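With the checked exception removed from `joinWith`, the merging iterator above can be fed a plain lambda that stitches each skeleton-file record to its bootstrap counterpart. The following is a self-contained sketch of that pairwise-merge shape, assuming (as the bootstrap path does) that both iterators yield the same number of records; it is illustrative only and not the actual `MergingIterator`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Sketch of a pairwise merging iterator: advances two equally sized iterators in
// lockstep and combines each pair with the supplied function, which is the shape
// of the MergingIterator usage in the bootstrap merge path.
public class PairwiseMergingIterator<L, R, T> implements Iterator<T> {
  private final Iterator<L> left;
  private final Iterator<R> right;
  private final BiFunction<L, R, T> combiner;

  public PairwiseMergingIterator(Iterator<L> left, Iterator<R> right, BiFunction<L, R, T> combiner) {
    this.left = left;
    this.right = right;
    this.combiner = combiner;
  }

  @Override
  public boolean hasNext() {
    return left.hasNext() && right.hasNext();
  }

  @Override
  public T next() {
    return combiner.apply(left.next(), right.next());
  }

  public static void main(String[] args) {
    Iterator<String> keys = List.of("k1", "k2").iterator();
    Iterator<Integer> payloads = List.of(10, 20).iterator();
    Iterator<String> merged = new PairwiseMergingIterator<>(keys, payloads, (k, v) -> k + "=" + v);
    merged.forEachRemaining(System.out::println);  // k1=10, k2=20
  }
}
```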
@@ -18,6 +18,7 @@

package org.apache.hudi.table.action.commit;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -82,17 +83,16 @@ public I combineOnCondition(
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
}

public I deduplicateRecords(
I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
return deduplicateRecordsInternal(records, index, parallelism, schema, HoodieAvroRecordMerger.withDeDuping(props), merger);
public I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
TypedProperties updatedProps = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(props);
return doDeduplicateRecords(records, index, parallelism, schema, updatedProps, merger);
}

protected abstract I deduplicateRecordsInternal(
I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger);
protected abstract I doDeduplicateRecords(
I records, HoodieIndex<?, ?> index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
}
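After the rename, the base helper reads as a small template method: the public `deduplicateRecords` overload normalizes the merge properties once (wrapping them for the legacy pre-combining mode) and hands a `TypedProperties` to the engine-specific `doDeduplicateRecords` hook. A rough sketch of that shape follows; the class names and the property key in it are invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

// Sketch of the template-method shape used by the write helper: the public entry point
// normalizes configuration once, then delegates to an engine-specific hook.
// All names here are illustrative, not Hudi APIs.
public class DedupHelperSketch {

  abstract static class BaseHelper<T> {
    public final List<T> deduplicate(List<T> records, Properties props) {
      Properties normalized = normalize(props);   // stands in for the pre-combining wrapper
      return doDeduplicate(records, normalized);
    }

    private Properties normalize(Properties props) {
      Properties copy = new Properties();
      copy.putAll(props);
      copy.setProperty("merger.operating.mode", "pre_combining");  // invented key
      return copy;
    }

    // Engine-specific helpers (list-based, RDD-based, ...) override only this hook.
    protected abstract List<T> doDeduplicate(List<T> records, Properties props);
  }

  static final class ListHelper<T> extends BaseHelper<T> {
    private final Function<T, Object> keyExtractor;

    ListHelper(Function<T, Object> keyExtractor) {
      this.keyExtractor = keyExtractor;
    }

    @Override
    protected List<T> doDeduplicate(List<T> records, Properties props) {
      Map<Object, T> byKey = new LinkedHashMap<>();
      records.forEach(r -> byKey.put(keyExtractor.apply(r), r));  // last write wins
      return new ArrayList<>(byKey.values());
    }
  }

  public static void main(String[] args) {
    ListHelper<String> helper = new ListHelper<>(s -> s.split("=")[0]);
    System.out.println(helper.deduplicate(List.of("k1=a", "k2=b", "k1=c"), new Properties()));
    // [k1=c, k2=b]
  }
}
```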
@@ -20,6 +20,7 @@

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -31,7 +32,6 @@
import org.apache.hudi.table.HoodieTable;

import java.io.IOException;
import java.util.Properties;

public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
@@ -53,8 +53,8 @@ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRec
}

@Override
public HoodieData<HoodieRecord<T>> deduplicateRecordsInternal(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
protected HoodieData<HoodieRecord<T>> doDeduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
// Auto-tunes the parallelism for reduce transformation based on the number of data partitions
@@ -64,7 +64,10 @@ public HoodieData<HoodieRecord<T>> deduplicateRecordsInternal(
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return Pair.of(key, record);
// NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
// Here we have to make a copy of the incoming record, since it might be holding
// an instance of [[InternalRow]] pointing into a shared, mutable buffer
return Pair.of(key, record.copy());
}).reduceByKey((rec1, rec2) -> {
HoodieRecord<T> reducedRecord;
try {
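Two details matter in the Spark-side dedup above: every incoming record is copied before the shuffle for the same shared-buffer reason called out in the insert path, and the grouping key depends on the index: the record key alone when the index is global, the full `HoodieKey` (record key plus partition path) otherwise. The sketch below isolates that keying rule with simplified types and a "larger ordering value wins" rule standing in for the configured record merger; none of it is Hudi code.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Sketch of the dedup keying rule: with a global index, records collapse on the record
// key alone (partition path ignored); otherwise the (recordKey, partitionPath) pair is
// the identity. Types and merge rule are simplified stand-ins.
public class DedupKeyingSketch {

  record Rec(String recordKey, String partitionPath, long orderingValue) { }

  static Collection<Rec> deduplicate(List<Rec> records, boolean globalIndex) {
    Map<Object, Optional<Rec>> reduced = records.stream().collect(Collectors.groupingBy(
        r -> globalIndex
            ? r.recordKey()                                                   // global: key only
            : new SimpleImmutableEntry<>(r.recordKey(), r.partitionPath()),   // non-global: key + partition
        // stand-in for the record merger: keep the record with the larger ordering value
        Collectors.reducing((a, b) -> a.orderingValue() >= b.orderingValue() ? a : b)));
    return reduced.values().stream().map(Optional::get).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Rec> input = List.of(
        new Rec("k1", "2022/10/13", 1),
        new Rec("k1", "2022/10/14", 2),   // same key, different partition
        new Rec("k2", "2022/10/14", 1));

    System.out.println(deduplicate(input, true).size());   // 2: k1 collapsed across partitions
    System.out.println(deduplicate(input, false).size());  // 3: all (key, partition) pairs distinct
  }
}
```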
@@ -228,14 +228,14 @@ public void testReaderGetRecordIterator() throws Exception {
IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20))
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
Iterator<IndexedRecord> iterator = hfileReader.getIndexedRecordsByKeysIterator(keys, avroSchema);
Iterator<HoodieRecord<IndexedRecord>> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema);

List<Integer> expectedIds =
IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20))
.boxed().collect(Collectors.toList());
int index = 0;
while (iterator.hasNext()) {
GenericRecord record = (GenericRecord) iterator.next();
GenericRecord record = (GenericRecord) iterator.next().getData();
String key = "key" + String.format("%02d", expectedIds.get(index));
assertEquals(key, record.get("_row_key").toString());
assertEquals(Integer.toString(expectedIds.get(index)), record.get("time").toString());
@@ -61,7 +61,7 @@ protected List<WriteStatus> computeNext() {
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr),
Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig));
Option.of(getExplicitInsertHandler()), getCloningTransformer(schema, hoodieConfig));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
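`computeNext` above wires the incoming record iterator, the renamed cloning transformer, and the insert handler into a bounded in-memory executor, so the transform runs on the producing side before records are queued and the consumer only dequeues ready-to-write values. A rough, Hudi-independent sketch of that bounded producer/consumer shape is below; the queue size, poison pill, and all names are illustrative.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch of the bounded producer/consumer shape behind the in-memory executor:
// the (possibly expensive) transform runs on the producer thread before records
// are queued, so the writing consumer only dequeues ready-to-write values.
public class BoundedPipelineSketch {

  private static final String POISON_PILL = "__done__";

  static void run(List<String> input, Function<String, String> transform) throws Exception {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);   // bounded buffer
    ExecutorService pool = Executors.newFixedThreadPool(2);

    pool.submit(() -> {                        // producer: read + transform + enqueue
      try {
        for (String record : input) {
          queue.put(transform.apply(record));  // blocks when the buffer is full
        }
        queue.put(POISON_PILL);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    pool.submit(() -> {                        // consumer: dequeue + "write"
      try {
        for (String record = queue.take(); !record.equals(POISON_PILL); record = queue.take()) {
          System.out.println("wrote " + record);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }).get();                                  // wait for the consumer to drain the queue

    pool.shutdown();
  }

  public static void main(String[] args) throws Exception {
    run(List.of("r1", "r2", "r3"), r -> r.toUpperCase() + "-cloned");
  }
}
```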
@@ -70,21 +70,12 @@ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> o
throws IOException {
// TODO [HUDI-5019] Remove these unnecessary newInstance invocations
Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
Option<IndexedRecord> avroOpt = savedCombineRecordOp
.flatMap(r -> {
try {
return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
.map(HoodieAvroIndexedRecord::getData);
} catch (IOException e) {
LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
return Option.empty();
}
});
cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
}
return result;
}
@@ -68,21 +68,12 @@ protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> o
throws IOException {
// TODO [HUDI-5019] Remove these unnecessary newInstance invocations
Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
if (result) {
boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
Option<IndexedRecord> avroOpt = savedCombineRecordOp
.flatMap(r -> {
try {
return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
.map(HoodieAvroIndexedRecord::getData);
} catch (IOException e) {
LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
return Option.empty();
}
});
cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
}
return result;
}
@@ -18,7 +18,9 @@

package org.apache.hudi.table.action.commit;

import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -32,15 +34,12 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;

/**
@@ -91,8 +90,8 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie
}

@Override
public List<HoodieRecord<T>> deduplicateRecordsInternal(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
protected List<HoodieRecord<T>> doDeduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
// If index used is global, then records are expected to differ in their partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));