Merged
@@ -28,6 +28,7 @@
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -86,8 +87,7 @@ public void write() {
         .withLatestCommitTime(instantTime).withFileSlice(fileSlice).withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
         .withInternalSchema(internalSchemaOption).withProps(props).withShouldUseRecordPosition(false).build()) {
       // Reads the records from the file slice
-      try (HoodieFileGroupReader.HoodieFileGroupReaderIterator<RowData> recordIterator =
-               (HoodieFileGroupReader.HoodieFileGroupReaderIterator<RowData>) fileGroupReader.getClosableIterator()) {
+      try (ClosableIterator<RowData> recordIterator = (ClosableIterator<RowData>) fileGroupReader.getClosableIterator()) {
        while (recordIterator.hasNext()) {
          // Constructs Flink record for the Flink Parquet file writer
          RowData row = recordIterator.next();
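Note: after this change, callers only depend on the `ClosableIterator` interface rather than the concrete `HoodieFileGroupReaderIterator`. A minimal caller-side sketch (assuming a fully parameterized `HoodieFileGroupReader<RowData>` built as in the hunk above; with the new `ClosableIterator<T>` return type the cast in the diff is only needed where the reader is held as a raw type):

```java
import java.io.IOException;

import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.collection.ClosableIterator;

public class ClosableIteratorSketch {
  // Sketch: consume the reader through the interface alone; try-with-resources
  // closes the underlying file handles when iteration finishes.
  static void consume(HoodieFileGroupReader<RowData> fileGroupReader) throws IOException {
    try (ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator()) {
      while (recordIterator.hasNext()) {
        RowData row = recordIterator.next();
        // hand `row` to the Flink Parquet file writer, as in the hunk above
      }
    }
  }
}
```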
@@ -29,9 +29,9 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.table.read.HoodieFileGroupReader.HoodieFileGroupReaderIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -95,8 +95,7 @@ public void write() {
         .withLatestCommitTime(instantTime).withFileSlice(fileSlice).withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
         .withInternalSchema(internalSchemaOption).withProps(props).withShouldUseRecordPosition(usePosition).build()) {
       // Reads the records from the file slice
-      try (HoodieFileGroupReaderIterator<InternalRow> recordIterator
-               = (HoodieFileGroupReaderIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
+      try (ClosableIterator<InternalRow> recordIterator = (ClosableIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
        StructType sparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(writeSchemaWithMetaFields);
        while (recordIterator.hasNext()) {
          // Constructs Spark record for the Spark Parquet file writer
@@ -92,15 +92,16 @@ public Object getValue(InternalRow row, Schema schema, String fieldName) {

   @Override
   public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
     if (bufferedRecord.isDelete()) {
       return new HoodieEmptyRecord<>(
-          new HoodieKey(bufferedRecord.getRecordKey(), null),

> **Contributor:** is this a bug fix?
>
> **Contributor (Author):** Previously we did not have a need for the hoodie record outside of the merger fallback code, so it is not a bug fix; but now we have another use case with the new iterator, so this partition field should be set.

+          hoodieKey,
           HoodieRecord.HoodieRecordType.SPARK);
     }

     Schema schema = getSchemaFromBufferRecord(bufferedRecord);
     InternalRow row = bufferedRecord.getRecord();
-    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
+    return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
   }

   @Override
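To make the reviewer exchange above concrete: before this patch, delete records surfaced through the new iterator path carried a `HoodieKey` with a null partition path. A tiny sketch of the difference (record key and partition values are made up):

```java
import org.apache.hudi.common.model.HoodieKey;

public class HoodieKeySketch {
  public static void main(String[] args) {
    // Before: the partition path on emitted delete records was null.
    HoodieKey before = new HoodieKey("uuid-001", null);
    // After: the reader context's partition path is threaded into every
    // constructed record, deletes included.
    HoodieKey after = new HoodieKey("uuid-001", "2024/01/01");
    System.out.println(before.getPartitionPath()); // null
    System.out.println(after.getPartitionPath());  // 2024/01/01
  }
}
```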
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestMerger;
@@ -125,11 +126,12 @@ public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
     if (bufferedRecord.isDelete()) {
       return SpillableMapUtils.generateEmptyPayload(
           bufferedRecord.getRecordKey(),
-          null,
+          partitionPath,
           bufferedRecord.getOrderingValue(),
           payloadClass);
     }
-    return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
+    return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
   }

   @Override
@@ -73,6 +73,7 @@ public abstract class HoodieReaderContext<T> {
   private Boolean hasBootstrapBaseFile = null;
   private Boolean needsBootstrapMerge = null;
   private Boolean shouldMergeUseRecordPosition = null;
+  protected String partitionPath;

   // for encoding and decoding schemas to the spillable map
   private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
@@ -129,6 +130,10 @@ public void setHasLogFiles(boolean hasLogFiles) {
     this.hasLogFiles = hasLogFiles;
   }

+  public void setPartitionPath(String partitionPath) {
+    this.partitionPath = partitionPath;
+  }
+
   // Getter and Setter for hasBootstrapBaseFile
   public boolean getHasBootstrapBaseFile() {
     return hasBootstrapBaseFile;
@@ -35,15 +35,13 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -80,8 +78,6 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
   protected final HoodieReaderContext<T> readerContext;
   protected final Schema readerSchema;
   protected final Option<String> orderingFieldName;
-  protected final Option<String> partitionNameOverrideOpt;
-  protected final Option<String[]> partitionPathFieldOpt;
   protected final RecordMergeMode recordMergeMode;
   protected final Option<HoodieRecordMerger> recordMerger;
   protected final Option<String> payloadClass;
@@ -101,31 +97,19 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
   protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                   HoodieTableMetaClient hoodieTableMetaClient,
                                   RecordMergeMode recordMergeMode,
-                                  Option<String> partitionNameOverrideOpt,
-                                  Option<String[]> partitionPathFieldOpt,
                                   TypedProperties props,
-                                  HoodieReadStats readStats) {
+                                  HoodieReadStats readStats,
+                                  Option<String> orderingFieldName) {
     this.readerContext = readerContext;
     this.readerSchema = AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
-    this.partitionNameOverrideOpt = partitionNameOverrideOpt;
-    this.partitionPathFieldOpt = partitionPathFieldOpt;
     this.recordMergeMode = recordMergeMode;
     this.recordMerger = readerContext.getRecordMerger();
     if (recordMerger.isPresent() && recordMerger.get().getMergingStrategy().equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
       this.payloadClass = Option.of(hoodieTableMetaClient.getTableConfig().getPayloadClass());
     } else {
       this.payloadClass = Option.empty();
     }
-    this.orderingFieldName = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
-        ? Option.empty()
-        : Option.ofNullable(ConfigUtils.getOrderingField(props))
-        .or(() -> {
-          String preCombineField = hoodieTableMetaClient.getTableConfig().getPreCombineField();
-          if (StringUtils.isNullOrEmpty(preCombineField)) {
-            return Option.empty();
-          }
-          return Option.of(preCombineField);
-        });
+    this.orderingFieldName = orderingFieldName;
     this.props = props;
     this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
     this.hoodieTableMetaClient = hoodieTableMetaClient;
@@ -28,16 +28,19 @@
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.PartitionPathParser;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
@@ -77,6 +80,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
   private final List<HoodieLogFile> logFiles;
   private final String partitionPath;
   private final Option<String[]> partitionPathFields;
+  private final Option<String> orderingFieldName;
   private final HoodieStorage storage;
   private final TypedProperties props;
   // Byte offset to start reading from the base file
@@ -143,6 +147,7 @@ private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage,
     boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, HoodieReaderConfig.MERGE_TYPE, true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
     readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge);
     readerContext.setHasLogFiles(!this.logFiles.isEmpty());
+    readerContext.setPartitionPath(partitionPath);
     if (readerContext.getHasLogFiles() && start != 0) {
       throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
     }
@@ -151,6 +156,16 @@ private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage,
         ? new PositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
         : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
+    this.orderingFieldName = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
+        ? Option.empty()
+        : Option.ofNullable(ConfigUtils.getOrderingField(props))
+        .or(() -> {
+          String preCombineField = hoodieTableMetaClient.getTableConfig().getPreCombineField();
+          if (StringUtils.isNullOrEmpty(preCombineField)) {
+            return Option.empty();
+          }
+          return Option.of(preCombineField);
+        });
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
         recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
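The ordering-field resolution that previously lived in the `FileGroupRecordBuffer` constructor (removed above) is now computed once here and passed down to every buffer. A standalone sketch of the same fallback chain; the helper name is hypothetical, and the import path for `RecordMergeMode` is assumed since the diff does not show it:

```java
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

public class OrderingFieldSketch {
  // Hypothetical helper mirroring the constructor logic above: commit-time
  // ordering needs no ordering field; otherwise prefer the ordering field
  // from the reader props, falling back to the table's preCombine field.
  static Option<String> resolveOrderingField(RecordMergeMode recordMergeMode,
                                             TypedProperties props,
                                             HoodieTableMetaClient metaClient) {
    if (recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
      return Option.empty();
    }
    return Option.ofNullable(ConfigUtils.getOrderingField(props))
        .or(() -> {
          String preCombineField = metaClient.getTableConfig().getPreCombineField();
          return StringUtils.isNullOrEmpty(preCombineField)
              ? Option.empty() : Option.of(preCombineField);
        });
  }
}
```

This keeps the resolution in one place, so the buffers no longer re-derive it from props and table config on every construction.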
@@ -161,27 +176,26 @@ private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage,

   /**
    * Initialize correct record buffer
    */
-  private static FileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
-                                                       HoodieTableMetaClient hoodieTableMetaClient,
-                                                       RecordMergeMode recordMergeMode,
-                                                       TypedProperties props,
-                                                       Option<HoodieBaseFile> baseFileOption,
-                                                       boolean hasNoLogFiles,
-                                                       boolean isSkipMerge,
-                                                       boolean shouldUseRecordPosition,
-                                                       HoodieReadStats readStats) {
+  private FileGroupRecordBuffer<T> getRecordBuffer(HoodieReaderContext<T> readerContext,
+                                                   HoodieTableMetaClient hoodieTableMetaClient,
+                                                   RecordMergeMode recordMergeMode,
+                                                   TypedProperties props,
+                                                   Option<HoodieBaseFile> baseFileOption,
+                                                   boolean hasNoLogFiles,
+                                                   boolean isSkipMerge,
+                                                   boolean shouldUseRecordPosition,
+                                                   HoodieReadStats readStats) {
     if (hasNoLogFiles) {
       return null;
     } else if (isSkipMerge) {
       return new UnmergedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats);
     } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
       return new PositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
-          Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName);
     } else {
       return new KeyBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
     }
   }
@@ -352,11 +366,21 @@ public void close() throws IOException {
     }
   }

-  public HoodieFileGroupReaderIterator<T> getClosableIterator() throws IOException {
+  public ClosableIterator<T> getClosableIterator() throws IOException {
     initRecordIterators();
     return new HoodieFileGroupReaderIterator<>(this);
   }

+  /**
+   * @return An iterator over the records that wraps the engine-specific record in a HoodieRecord.
+   */
+  public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
+    return new CloseableMappingIterator<>(getClosableIterator(), nextRecord -> {
+      BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+      return readerContext.constructHoodieRecord(bufferedRecord);
+    });
+  }
+
   public static class HoodieFileGroupReaderIterator<T> implements ClosableIterator<T> {
     private HoodieFileGroupReader<T> reader;
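A possible caller-side sketch for the new `getClosableHoodieRecordIterator()`, useful where downstream code wants `HoodieRecord`s (with the now-populated partition path) rather than engine-native rows; the helper class and method names are made up for illustration:

```java
import java.io.IOException;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.collection.ClosableIterator;

public class HoodieRecordIteratorSketch {
  // <T> is the engine-native row type (RowData for Flink, InternalRow for Spark).
  static <T> long countRecords(HoodieFileGroupReader<T> reader) throws IOException {
    long count = 0;
    try (ClosableIterator<HoodieRecord<T>> it = reader.getClosableHoodieRecordIterator()) {
      while (it.hasNext()) {
        HoodieRecord<T> record = it.next();
        // record.getKey() carries the record key and the real partition path,
        // per the constructHoodieRecord changes earlier in this diff.
        count++;
      }
    }
    return count;
  }
}
```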
@@ -51,11 +51,10 @@ public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
   public KeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                        HoodieTableMetaClient hoodieTableMetaClient,
                                        RecordMergeMode recordMergeMode,
-                                       Option<String> partitionNameOverrideOpt,
-                                       Option<String[]> partitionPathFieldOpt,
                                        TypedProperties props,
-                                       HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+                                       HoodieReadStats readStats,
+                                       Option<String> orderingFieldName) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
   }

   @Override
@@ -68,12 +68,11 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupRecordBuffer<T> {
   public PositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                             HoodieTableMetaClient hoodieTableMetaClient,
                                             RecordMergeMode recordMergeMode,
-                                            Option<String> partitionNameOverrideOpt,
-                                            Option<String[]> partitionPathFieldOpt,
                                             String baseFileInstantTime,
                                             TypedProperties props,
-                                            HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+                                            HoodieReadStats readStats,
+                                            Option<String> orderingFieldName) {
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
     this.baseFileInstantTime = baseFileInstantTime;
   }
@@ -49,11 +49,9 @@ public UnmergedFileGroupRecordBuffer(
       HoodieReaderContext<T> readerContext,
       HoodieTableMetaClient hoodieTableMetaClient,
       RecordMergeMode recordMergeMode,
-      Option<String> partitionNameOverrideOpt,
-      Option<String[]> partitionPathFieldOpt,
       TypedProperties props,
       HoodieReadStats readStats) {
-    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, Option.empty());
     this.currentInstantLogBlocks = new ArrayDeque<>();
   }