Changes from all commits (31 commits)
818a956
initial draft of refactoring, needs unit test
Apr 22, 2025
3353eff
dedupe ordering field code
Apr 23, 2025
f84f4f9
start adding tests
Apr 23, 2025
42c4f64
fix rebase issues
Apr 28, 2025
9462a2a
reduce size of changes, fix schema evolution, handle deletes in a con…
Apr 28, 2025
2da7a16
add more testing, standardize delete records
Apr 29, 2025
f09b1c2
add more merge tests, fix bugs
Apr 29, 2025
d91d031
style
Apr 30, 2025
597677c
fix update optimization
Apr 30, 2025
e5d4dcd
add delete testing for custom payload
May 1, 2025
bd93749
add some java docs, ensure consistent ordering value type
May 1, 2025
f5d056b
fix use of final
May 1, 2025
6b1abd4
move conversion to record buffer
May 1, 2025
a350afe
update javadocs
May 4, 2025
1fdba01
update comment
May 6, 2025
336cbbd
fix conflicts
May 19, 2025
5ac01b9
add back delete record vs data record processing
May 19, 2025
cf7520d
move delete handling before partial merge, update signatures for cons…
May 19, 2025
a5dd982
add delete handling logic to merger and unit tests
May 20, 2025
f7743d3
simplify null handling, update unit test
May 20, 2025
5dc6d98
fix style, use constant
May 20, 2025
9886515
rename method
May 20, 2025
5a3d2ad
cleanup after rebase
May 23, 2025
a2d6140
undo empty record handling
May 23, 2025
3bb0c71
fix empty
May 23, 2025
2616258
fix conflicts
May 23, 2025
33fa072
restore doProcess methods for capturing logic for handling outputs of…
May 24, 2025
fae6454
add back inner method to keep code more similar to master
May 24, 2025
62720ba
further minimize diff with master
May 24, 2025
99006dc
update javadoc
May 24, 2025
8afa37c
add proper message for illegal state exception
May 24, 2025
@@ -24,6 +24,7 @@
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
@@ -99,9 +100,7 @@ public String getMetaFieldValue(InternalRow record, int pos) {
public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(
hoodieKey,
HoodieRecord.HoodieRecordType.SPARK);
return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, bufferedRecord.getOrderingValue(), HoodieRecord.HoodieRecordType.SPARK);
}

Schema schema = getSchemaFromBufferRecord(bufferedRecord);
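This hunk changes the Spark delete path: instead of a bare HoodieEmptyRecord, deletes are now tagged with HoodieOperation.DELETE and keep bufferedRecord.getOrderingValue(). A minimal, hypothetical sketch of why the ordering value matters for event-time merging (plain Java with stand-in types, not Hudi's actual HoodieRecordMerger):

public class EventTimeDeleteSketch {

  // Simplified stand-in for BufferedRecord: key, event-time ordering value, delete flag.
  record Buffered(String key, long orderingValue, boolean isDelete) {}

  // Event-time merge: the higher ordering value wins, even when the delete
  // arrives later in the log. Preserving the ordering value on delete records
  // is what makes this comparison possible.
  static Buffered merge(Buffered existing, Buffered incoming) {
    return incoming.orderingValue() >= existing.orderingValue() ? incoming : existing;
  }

  public static void main(String[] args) {
    Buffered data = new Buffered("k1", 5L, false);
    Buffered lateDelete = new Buffered("k1", 3L, true);
    // The out-of-order delete loses: its ordering value (3) is below the data record's (5).
    System.out.println(merge(data, lateDelete).isDelete()); // prints false
  }
}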
@@ -25,16 +25,17 @@
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -64,13 +65,11 @@
* This implementation does not rely on a specific engine and can be used in any JVM environment as a result.
*/
public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord> {
private final String payloadClass;

public HoodieAvroReaderContext(
StorageConfiguration<?> storageConfiguration,
HoodieTableConfig tableConfig) {
super(storageConfiguration, tableConfig);
this.payloadClass = tableConfig.getPayloadClass();
}

@Override
@@ -133,14 +132,10 @@ public String getMetaFieldValue(IndexedRecord record, int pos) {

@Override
public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
if (bufferedRecord.isDelete()) {
return SpillableMapUtils.generateEmptyPayload(
bufferedRecord.getRecordKey(),
partitionPath,
bufferedRecord.getOrderingValue(),
payloadClass);
return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, bufferedRecord.getOrderingValue(), HoodieRecord.HoodieRecordType.AVRO);
}
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
}

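With the two hunks above, the Avro path no longer routes deletes through the table's payload class (SpillableMapUtils.generateEmptyPayload), which is why the payloadClass field could be removed; both reader contexts now build the same engine-tagged empty record. A short sketch of the shared pattern, assuming a Hudi dependency on the classpath; the constructor call shape is taken directly from this diff, and the key, partition, and ordering values are illustrative:

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;

public class DeleteRecordConstructionSketch {
  // Mirrors the constructor call in this diff: an engine-tagged empty record
  // that keeps the ordering value, with no payload class involved.
  static HoodieRecord<IndexedRecord> avroDelete(String recordKey, String partitionPath, Comparable orderingValue) {
    HoodieKey hoodieKey = new HoodieKey(recordKey, partitionPath);
    return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, orderingValue, HoodieRecord.HoodieRecordType.AVRO);
  }

  public static void main(String[] args) {
    HoodieRecord<IndexedRecord> delete = avroDelete("key-1", "2024/01/01", 42L); // illustrative values
    System.out.println(delete.getRecordType()); // AVRO
  }
}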
@@ -22,7 +22,6 @@
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -34,11 +33,8 @@
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -50,8 +46,7 @@
*
* @param <T> type of engine-specific record representation.
*/
public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
implements Iterable<BufferedRecord<T>>, Closeable {
public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
// A timer for calculating elapsed time in millis
public final HoodieTimer timer = HoodieTimer.create();
@@ -165,15 +160,6 @@ private void performScan() {
LOG.info("Number of entries in Map => {}", recordBuffer.size());
}

@Override
public Iterator<BufferedRecord<T>> iterator() {
return recordBuffer.getLogRecordIterator();
}

public Map<Serializable, BufferedRecord<T>> getRecords() {
return recordBuffer.getLogRecords();
}

public long getNumMergedRecordsInLog() {
return numMergedRecordsInLog;
}
@@ -29,6 +29,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;

import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
@@ -39,31 +40,46 @@
* @param <T> The type of the engine specific row.
*/
public class BufferedRecord<T> implements Serializable {
// the key of the record
private final String recordKey;
// the ordering value of the record to be used for event time based ordering
// this is in the engine specific type to ensure compatibility between log and base file data
private final Comparable orderingValue;
// the record itself in the engine specific type
private T record;
// the schema id from the reader context schema cache
private final Integer schemaId;
// whether this record represents a deletion for the key
private final boolean isDelete;

private BufferedRecord(String recordKey, Comparable orderingValue, T record, Integer schemaId, boolean isDelete) {
BufferedRecord(String recordKey, Comparable orderingValue, T record, Integer schemaId, boolean isDelete) {
this.recordKey = recordKey;
this.orderingValue = orderingValue;
this.record = record;
this.schemaId = schemaId;
this.isDelete = isDelete;
}

public static <T> BufferedRecord<T> forConvertedRecord(T convertedData, HoodieRecord<?> originalRecord, Schema schema, HoodieReaderContext<T> readerContext, Properties props) {
return forRecordWithContext(originalRecord, convertedData, schema, readerContext, props);
}

public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> record, Schema schema, HoodieReaderContext<T> readerContext, Properties props) {
return forRecordWithContext(record, record.getData(), schema, readerContext, props);
}

private static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<?> record, T data, Schema schema, HoodieReaderContext<T> readerContext, Properties props) {
HoodieKey hoodieKey = record.getKey();
String recordKey = hoodieKey == null ? readerContext.getRecordKey(record.getData(), schema) : hoodieKey.getRecordKey();
String recordKey = hoodieKey == null ? readerContext.getRecordKey(data, schema) : hoodieKey.getRecordKey();
Integer schemaId = readerContext.encodeAvroSchema(schema);
boolean isDelete;
try {
isDelete = record.isDelete(schema, props);
} catch (IOException e) {
throw new HoodieException("Failed to get isDelete from record.", e);
}
return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, props), record.getData(), schemaId, isDelete);
Comparable<?> orderingValue = record.getOrderingValue(schema, props);
return new BufferedRecord<>(recordKey, orderingValue, data, schemaId, isDelete);
}

public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema schema, HoodieReaderContext<T> readerContext, Option<String> orderingFieldName, boolean isDelete) {
@@ -73,7 +89,9 @@ public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema schema
return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, isDelete);
}

public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue) {
public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, HoodieReaderContext<T> readerContext) {
Comparable orderingValue = deleteRecord.getOrderingValue() == null || deleteRecord.getOrderingValue().equals(DEFAULT_ORDERING_VALUE) ? DEFAULT_ORDERING_VALUE :
readerContext.convertValueToEngineType(deleteRecord.getOrderingValue());
return new BufferedRecord<>(deleteRecord.getRecordKey(), orderingValue, null, null, true);
}
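forDeleteRecord now normalizes the ordering value itself: the DEFAULT_ORDERING_VALUE sentinel (and null) pass through unchanged, while any real value is converted into the engine-specific type via readerContext.convertValueToEngineType, keeping delete records comparable with buffered data records. A plain-Java restatement of that rule, with illustrative names and a function standing in for the reader context:

import java.util.function.UnaryOperator;

public class OrderingValueNormalizationSketch {
  // Same decision as the ternary in forDeleteRecord: keep the sentinel as-is,
  // convert everything else to the engine-specific representation.
  static Comparable<?> normalize(Comparable<?> value,
                                 Comparable<?> defaultValue,
                                 UnaryOperator<Comparable<?>> convertToEngineType) {
    return value == null || value.equals(defaultValue)
        ? defaultValue
        : convertToEngineType.apply(value);
  }

  public static void main(String[] args) {
    Comparable<?> sentinel = 0; // stand-in for HoodieRecord.DEFAULT_ORDERING_VALUE
    UnaryOperator<Comparable<?>> toEngine = v -> v; // identity stand-in for convertValueToEngineType
    System.out.println(normalize(null, sentinel, toEngine)); // 0 (sentinel preserved)
    System.out.println(normalize(7L, sentinel, toEngine));   // 7 (converted value)
  }
}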

@@ -98,7 +116,7 @@ public boolean isDelete() {
}

public boolean isCommitTimeOrderingDelete() {
return isDelete && getOrderingValue().equals(DEFAULT_ORDERING_VALUE);
return isDelete && Objects.equals(getOrderingValue(), DEFAULT_ORDERING_VALUE);
}

public BufferedRecord<T> toBinary(HoodieReaderContext<T> readerContext) {
@@ -107,4 +125,19 @@
}
return this;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
BufferedRecord<?> that = (BufferedRecord<?>) o;
return isDelete == that.isDelete && Objects.equals(recordKey, that.recordKey) && Objects.equals(orderingValue, that.orderingValue)
&& Objects.equals(record, that.record) && Objects.equals(schemaId, that.schemaId);
}

@Override
public int hashCode() {
return Objects.hash(recordKey, orderingValue, record, schemaId, isDelete);
}
}
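The added equals and hashCode make BufferedRecord usable in assertions and hash-based collections, which the unit tests added in this PR presumably rely on. A hypothetical JUnit 5 sketch; it would have to live in BufferedRecord's package, since the constructor is now package-private:

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class BufferedRecordEqualityTest {
  @Test
  void identicalFieldsCompareEqual() {
    // Field-by-field equality over recordKey, orderingValue, record, schemaId and isDelete.
    BufferedRecord<String> a = new BufferedRecord<>("key-1", 5L, "payload", 1, false);
    BufferedRecord<String> b = new BufferedRecord<>("key-1", 5L, "payload", 1, false);
    assertEquals(a, b);
    assertEquals(a.hashCode(), b.hashCode()); // hashCode consistent with equals
  }
}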