@@ -73,21 +73,43 @@ public static String getPartitionPathFromGenericRecord(GenericRecord genericReco
*/
public static String[] extractRecordKeys(String recordKey) {
String[] fieldKV = recordKey.split(",");
if (fieldKV.length == 1) {
return fieldKV;
} else {
// a complex key
return Arrays.stream(fieldKV).map(kv -> {
final String[] kvArray = kv.split(":");
if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
return null;
} else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
return "";
} else {
return kvArray[1];
}
}).toArray(String[]::new);
}

return Arrays.stream(fieldKV).map(kv -> {
final String[] kvArray = kv.split(":");
if (kvArray.length == 1) {
return kvArray[0];
} else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
return null;
} else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
return "";
} else {
return kvArray[1];
}
}).toArray(String[]::new);
}
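A quick illustration of the parsing above (editorial sketch, not part of this change; it assumes the record-key placeholder constants resolve to "__null__" and "__empty__"):

String[] simple = KeyGenUtils.extractRecordKeys("uuid-001");                            // ["uuid-001"]
String[] complex = KeyGenUtils.extractRecordKeys("id:1,name:__null__,email:__empty__"); // ["1", null, ""]
String[] mixed = KeyGenUtils.extractRecordKeys("id:1,ts");                              // ["1", "ts"] -- the new single-token branch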

/**
* Extracts the partition field values as strings from the given partitionPath;
* this is the reverse operation of {@link #getPartitionPath(GenericRecord record, String partitionPathField,
* boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled)}.
*
* @see SimpleAvroKeyGenerator
* @see org.apache.hudi.keygen.ComplexAvroKeyGenerator
*/
public static String[] extractPartitionPath(String partitionPath, boolean hiveStylePartitioning, boolean encodePartitionPath) {
String[] fields = partitionPath.split(DEFAULT_PARTITION_PATH_SEPARATOR);

return Arrays.stream(fields).map(field -> {
String partitionVal = field;
if (hiveStylePartitioning) {
final String[] partitionArray = field.split("=");
partitionVal = partitionArray.length == 1 ? partitionArray[0] : partitionArray[1];
}
if (encodePartitionPath) {
partitionVal = PartitionPathEncodeUtils.unescapePathName(partitionVal);
}
return partitionVal;
}).toArray(String[]::new);
}
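For example (editorial sketch, not part of this change; it assumes the default partition path separator is "/" and that PartitionPathEncodeUtils.unescapePathName reverses Hive-style %XX escaping):

String[] hive = KeyGenUtils.extractPartitionPath("dt=2022-01-01/hr=12", true, false); // ["2022-01-01", "12"]
String[] encoded = KeyGenUtils.extractPartitionPath("2022-01-01%2F12", false, true);  // ["2022-01-01/12"]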

public static String getRecordKey(GenericRecord record, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) {
@@ -201,6 +201,13 @@ public Stream<HoodieBaseFile> getAllBaseFiles() {
return getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).map(slice -> slice.getBaseFile().get());
}

/**
* Stream of committed log files, sorted by reverse commit time.
*/
public Stream<HoodieLogFile> getAllLogFiles() {
return getAllFileSlices().flatMap(slice -> slice.getLogFiles());
}
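A minimal usage sketch of the new accessor (editorial, not part of this change; hypothetical fileGroup variable):

List<String> logFileNames = fileGroup.getAllLogFiles()
    .map(HoodieLogFile::getFileName)
    .collect(Collectors.toList());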

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieFileGroup {");
@@ -698,6 +698,19 @@ public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partit
}
}

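/**
* Streams, for each file group under the given partition that was not replaced before or on
* the given max instant time, a merged file slice built from the group's log files only.
*/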
public final Stream<FileSlice> getMergedFileSlicesUsingLogFiles(String partitionStr, String maxInstantTime) {
try {
readLock.lock();
String partition = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partition);
return fetchAllStoredFileGroups(partition)
.filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxInstantTime))
.map(fileGroup -> Option.of(mergeLogFilesInFileGroup(fileGroup))).filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
}

@Override
public final Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
try {
@@ -1052,6 +1065,19 @@ private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, F
return merged;
}

/**
* Helper that merges all the log files of a file group into a single file slice.
*
* @param fileGroup the file group whose log files are merged
*/
private static FileSlice mergeLogFilesInFileGroup(HoodieFileGroup fileGroup) {
Option<FileSlice> firstFileSlice = Option.fromJavaOptional(fileGroup.getAllFileSlices().min(Comparator.comparing(FileSlice::getBaseInstantTime)));
FileSlice merged = new FileSlice(firstFileSlice.get());
merged.setBaseFile(null);
fileGroup.getAllLogFiles().forEach(merged::addLogFile);
return merged;
}
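// Editor's illustration (hypothetical file names): for a file group with slices
// (t1: base_t1.parquet + .log_t1_1) and (t2: base_t2.parquet + .log_t2_1), the merged
// slice keeps base instant t1, drops the base file, and carries [.log_t1_1, .log_t2_1],
// so the reader replays only log records for the whole group.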

/**
* If the file-slice is because of pending compaction instant, this method merges the file-slice with the one before
* the compaction instant time.
@@ -244,6 +244,18 @@ private FlinkOptions() {
.withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by"
+ "skipping over files");

public static final ConfigOption<Boolean> READ_BATCH_INCREMENTAL_CHANGELOG_ENABLED = ConfigOptions
.key("read.batch.incremental.changelog.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enables reading the changelog for batch incremental queries");

public static final ConfigOption<Boolean> READ_DATA_DELETE = ConfigOptions
.key("read.data.delete.enabled")
.booleanType()
.defaultValue(false) // by default, delete records are not emitted
.withDescription("Whether to emit delete records when reading, default false");
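A hedged sketch of enabling the two new options from a Flink job (editorial, not part of this change):

Configuration conf = new Configuration();
conf.setBoolean(FlinkOptions.READ_BATCH_INCREMENTAL_CHANGELOG_ENABLED, true); // build splits from log files
conf.setBoolean(FlinkOptions.READ_DATA_DELETE, true);                         // emit DELETE rows to downstream operators
// or, as table options in Flink SQL:
//   'read.batch.incremental.changelog.enabled' = 'true', 'read.data.delete.enabled' = 'true'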

// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
@@ -22,6 +22,7 @@
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
@@ -214,8 +215,14 @@ public Result inputSplits(
? commitTimeline.lastInstant().get().getTimestamp()
: instants.get(instants.size() - 1).getTimestamp();

List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
List<MergeOnReadInputSplit> inputSplits;
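// Log-file based changelog splits only apply to MOR tables when
// read.batch.incremental.changelog.enabled is set and a full table scan is not required.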
if (fullTableScan || this.conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()) || !this.conf.getBoolean(FlinkOptions.READ_BATCH_INCREMENTAL_CHANGELOG_ENABLED)) {
inputSplits = getInputSplits(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, instantRange);
} else {
inputSplits = getInputSplitsUsingLogFiles(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, instantRange);
}

return Result.instance(inputSplits, endInstant);
}
@@ -341,6 +348,31 @@ private List<MergeOnReadInputSplit> getInputSplits(
.collect(Collectors.toList());
}

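/**
* Builds one input split per merged, log-only file slice, leaving the base file path empty
* so that a batch incremental read replays the changelog from the log files directly.
*/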
private List<MergeOnReadInputSplit> getInputSplitsUsingLogFiles(
HoodieTableMetaClient metaClient,
HoodieTimeline commitTimeline,
FileStatus[] fileStatuses,
Set<String> readPartitions,
String endInstant,
InstantRange instantRange) {
final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
return readPartitions.stream()
.map(relPartitionPath -> fsView.getMergedFileSlicesUsingLogFiles(relPartitionPath, endInstant)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
null, logPaths, endInstant,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

private FileIndex getFileIndex() {
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
if (this.requiredPartitions != null) {
@@ -338,6 +338,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();

final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
boolean emitDelete = this.conf.getBoolean(FlinkOptions.READ_DATA_DELETE);
switch (queryType) {
case FlinkOptions.QUERY_TYPE_SNAPSHOT:
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
@@ -350,7 +351,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, inputSplits, false);
rowDataType, inputSplits, emitDelete);
case COPY_ON_WRITE:
return baseFileOnlyInputFormat();
default:
@@ -372,7 +373,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, result.getInputSplits(), false);
rowDataType, result.getInputSplits(), emitDelete);
default:
String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL);
@@ -328,10 +328,15 @@ private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit split
final int[] pkOffset = tableState.getPkOffsetsInRequired();
// flag saying whether the pk semantics has been dropped by user specified
// projections. For e.g, if the pk fields are [a, b] but user only select a,
// then the pk semantics is lost.
final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getColumnTypes(pkOffset);
final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
final int[] partitionOffset = tableState.getColumnsOffsetsInRequired(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
final boolean partitionSemanticLost = Arrays.stream(partitionOffset).anyMatch(offset -> offset == -1);
final LogicalType[] partitionTypes = partitionSemanticLost ? null : tableState.getColumnTypes(partitionOffset);
final StringToRowDataConverter partitionConverter = partitionSemanticLost ? null : new StringToRowDataConverter(partitionTypes);
final int preCombineOffset = tableState.getRequiredPosition(conf.getString(FlinkOptions.PRECOMBINE_FIELD));

return new ClosableIterator<RowData>() {
private RowData currentRecord;
@@ -340,10 +345,12 @@ public boolean hasNext() {
public boolean hasNext() {
while (logRecordsKeyIterator.hasNext()) {
String curAvroKey = logRecordsKeyIterator.next();
Option<IndexedRecord> curAvroRecord = null;
Option<IndexedRecord> curAvroRecord;
Comparable preCombineValue;
final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) scanner.getRecords().get(curAvroKey);
try {
curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
preCombineValue = hoodieRecord.getData().getOrderingValue();
} catch (IOException e) {
throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
}
@@ -358,6 +365,20 @@ public boolean hasNext() {
for (int i = 0; i < pkOffset.length; i++) {
delete.setField(pkOffset[i], converted[i]);
}
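// The added blocks below restore the ordering (preCombine) value and the partition path
// fields on the emitted DELETE row, so the delete carries the same key, ordering and
// partition information as the original record.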
if (preCombineOffset != -1) {
final AvroToRowDataConverters.AvroToRowDataConverter preCombineConverter =
AvroToRowDataConverters.createConverter(tableState.getRequiredRowType().getTypeAt(preCombineOffset));
delete.setField(preCombineOffset, preCombineConverter.convert(preCombineValue));
}
if (!partitionSemanticLost) {
final String partitionPath = hoodieRecord.getPartitionPath();
final String[] partitionFields =
KeyGenUtils.extractPartitionPath(partitionPath, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING));
final Object[] partitionConverted = partitionConverter.convert(partitionFields);
for (int i = 0; i < partitionOffset.length; i++) {
delete.setField(partitionOffset[i], partitionConverted[i]);
}
}
delete.setRowKind(RowKind.DELETE);

this.currentRecord = delete;
@@ -90,6 +90,11 @@ public int[] getRequiredPositions() {
.toArray();
}

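/**
* Returns the position of the given field in the required row type, or -1 if the field is not selected.
*/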
public int getRequiredPosition(String fieldName) {
final List<String> fieldNames = requiredRowType.getFieldNames();
return fieldNames.indexOf(fieldName);
}

/**
* Get the primary key positions in required row type.
*/
@@ -102,16 +107,27 @@ public int[] getPkOffsetsInRequired() {
}

/**
* Returns the primary key fields logical type with given offsets.
* Get column positions in required row type.
*/
public int[] getColumnsOffsetsInRequired(String[] columns) {
final List<String> fieldNames = requiredRowType.getFieldNames();
return Arrays.stream(columns)
.map(fieldNames::indexOf)
.mapToInt(i -> i)
.toArray();
}

/**
* Returns the fields logical type with given offsets.
*
* @param pkOffsets the pk offsets in required row type
* @return pk field logical types
* @param offsets the offsets in the required row type
* @return field logical types
* @see #getPkOffsetsInRequired()
*/
public LogicalType[] getPkTypes(int[] pkOffsets) {
public LogicalType[] getColumnTypes(int[] offsets) {
final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
.map(RowType.RowField::getType).toArray(LogicalType[]::new);
return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
return Arrays.stream(offsets).mapToObj(offset -> requiredTypes[offset])
.toArray(LogicalType[]::new);
}
}
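A hedged usage sketch of the generalized helpers (editorial, not part of this change; assumes a tableState whose required fields are [id, dt, val]):

int[] partitionOffsets = tableState.getColumnsOffsetsInRequired(new String[] {"dt"});     // [1]
boolean partitionLost = Arrays.stream(partitionOffsets).anyMatch(offset -> offset == -1); // true only if "dt" was projected out
LogicalType[] partitionTypes = partitionLost ? null : tableState.getColumnTypes(partitionOffsets);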