@@ -38,7 +38,10 @@
 import org.apache.log4j.Logger;

 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;

@@ -48,11 +51,15 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;

+  private final Set<String> deltaRecordKeys;
+  private Iterator<String> deltaItr;
+
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
     super(split, job);
     this.parquetReader = realReader;
     this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
+    this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
   }

   /**
@@ -89,72 +96,71 @@ private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord r
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
     // with a new block of values
-    boolean result = this.parquetReader.next(aVoid, arrayWritable);
-    if (!result) {
-      // if the result is false, then there are no more records
-      return false;
-    }
-    if (!deltaRecordMap.isEmpty()) {
-      // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
-      // would be true until we have a way to index logs too)
-      // return from delta records map if we have some match.
-      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-      if (deltaRecordMap.containsKey(key)) {
-        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-        // deltaRecord may not be a full record and needs values of columns from the parquet
-        Option<GenericRecord> rec;
-        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
-        // If the record is not present, this is a delete record using an empty payload so skip this base record
-        // and move to the next record
-        while (!rec.isPresent()) {
-          // if current parquet reader has no record, return false
-          if (!this.parquetReader.next(aVoid, arrayWritable)) {
-            return false;
-          }
-          String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-          if (deltaRecordMap.containsKey(tempKey)) {
-            rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
-          } else {
-            // need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
-            return true;
-          }
-        }
-        if (!rec.isPresent()) {
-          return false;
-        }
-        GenericRecord recordToReturn = rec.get();
-        if (usesCustomPayload) {
-          // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
-          // the writerSchema with only the projection fields
-          recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
-        }
-        // we assume, a later safe record in the log, is newer than what we have in the map &
-        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
-        // schema, we use writerSchema to create the arrayWritable from the latest generic record
-        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
-        Writable[] replaceValue = aWritable.get();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
-              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
-        }
-        Writable[] originalValue = arrayWritable.get();
-        try {
-          // Sometime originalValue.length > replaceValue.length.
-          // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
-          System.arraycopy(replaceValue, 0, originalValue, 0,
-              Math.min(originalValue.length, replaceValue.length));
-          arrayWritable.set(originalValue);
-        } catch (RuntimeException re) {
-          LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
-          String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
-              + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
-          throw new RuntimeException(errMsg, re);
-        }
-      }
-    }
-    return true;
-  }
+    while (this.parquetReader.next(aVoid, arrayWritable)) {
+      if (!deltaRecordMap.isEmpty()) {
+        String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+        if (deltaRecordMap.containsKey(key)) {
+          // mark the key as handled
+          this.deltaRecordKeys.remove(key);
+          // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
+          // deltaRecord may not be a full record and needs values of columns from the parquet
+          Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+          // If the record is not present, this is a delete record using an empty payload so skip this base record
+          // and move to the next record
+          if (!rec.isPresent()) {
+            continue;
+          }
+          setUpWritable(rec, arrayWritable, key);
+          return true;
+        }
+      }
+      return true;
+    }
+    if (this.deltaItr == null) {
+      this.deltaItr = this.deltaRecordKeys.iterator();
+    }
+    while (this.deltaItr.hasNext()) {
+      final String key = this.deltaItr.next();
+      Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+      if (rec.isPresent()) {
+        setUpWritable(rec, arrayWritable, key);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void setUpWritable(Option<GenericRecord> rec, ArrayWritable arrayWritable, String key) {
+    GenericRecord recordToReturn = rec.get();
+    if (usesCustomPayload) {
+      // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
+      // the writerSchema with only the projection fields
+      recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
+    }
+    // we assume, a later safe record in the log, is newer than what we have in the map &
+    // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
+    // schema, we use writerSchema to create the arrayWritable from the latest generic record
+    ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
+    Writable[] replaceValue = aWritable.get();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
+          HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
+    }
+    Writable[] originalValue = arrayWritable.get();
+    try {
+      // Sometime originalValue.length > replaceValue.length.
+      // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
+      System.arraycopy(replaceValue, 0, originalValue, 0,
+          Math.min(originalValue.length, replaceValue.length));
+      arrayWritable.set(originalValue);
+    } catch (RuntimeException re) {
+      LOG.error("Got exception when doing array copy", re);
+      LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
+      LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
+      String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
+          + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+      throw new RuntimeException(errMsg, re);
+    }
+  }

@Override
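In short, the behavioural change in this file: the old next() assumed every log record had a matching row in the base parquet file, so records that existed only in the log files were never returned. The new version keeps a copy of the log-record keys in deltaRecordKeys, crosses each key off as it is merged with a base row, and once the parquet reader is exhausted walks the leftover keys so that log-only records are also emitted. A minimal, self-contained sketch of that pattern using plain Java collections (illustrative names only, not the Hudi/Hadoop types above):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: models the "merge base rows, then emit leftover log records" flow
// with plain strings standing in for ArrayWritable rows and HoodieRecordPayloads.
public class BaseAndLogMergeSketch {

  static List<String> merge(List<String> baseKeys, Map<String, String> logRecords) {
    // copy of the log keys, so we can cross off every key that matched a base row
    Set<String> pendingLogKeys = new HashSet<>(logRecords.keySet());
    List<String> output = new ArrayList<>();

    // pass 1: walk the base file; a matching log record overrides the base row
    for (String key : baseKeys) {
      if (logRecords.containsKey(key)) {
        pendingLogKeys.remove(key);            // mark the key as handled
        String merged = logRecords.get(key);   // null models a delete payload
        if (merged == null) {
          continue;                            // delete: skip this base row entirely
        }
        output.add(merged);
      } else {
        output.add("base:" + key);
      }
    }

    // pass 2: emit log records whose keys never appeared in the base file (log-only inserts)
    Iterator<String> leftover = pendingLogKeys.iterator();
    while (leftover.hasNext()) {
      String rec = logRecords.get(leftover.next());
      if (rec != null) {                       // skip log-only deletes
        output.add(rec);
      }
    }
    return output;
  }

  public static void main(String[] args) {
    Map<String, String> log = new HashMap<>();
    log.put("k1", "log:k1");   // update of a base row
    log.put("k3", null);       // delete of a base row
    log.put("k9", "log:k9");   // new key, present only in the log
    // prints [log:k1, base:k2, log:k9]
    System.out.println(merge(Arrays.asList("k1", "k2", "k3"), log));
  }
}

The hunks that follow are the corresponding reader test, which now writes extra log-only records and asserts that they come back out of the merged read.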
@@ -155,7 +155,7 @@ public void testReader(boolean partitioned) throws Exception {
       } else {
         writer =
             InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant,
-                instantTime, 100, 0, logVersion);
+                instantTime, 120, 0, logVersion);
Review comment (Member):
Can you explain why this was needed? This test was already adding inserts into the logs; does this change mean we can now actually read them?

Reply (Contributor Author):
I added 20 more records because the first 100 log records are expected to merge with the initial 100 parquet records, so after the merge we should actually read out 120 records.
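(Worked through: the base parquet file holds 100 rows; the first 100 log records update those same keys and the remaining 20 log records carry new keys, so the merged scan should return 100 updated rows + 20 log-only inserts = 120 rows, all stamped with the latest commit time.)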

       }
       long size = writer.getCurrentSize();
       writer.close();
@@ -182,17 +182,21 @@

       // use reader to read base Parquet File and log file, merge in flight and return latest commit
       // here all 100 records should be updated, see above
+      // another 20 new insert records should also output with new commit time.
       NullWritable key = recordReader.createKey();
       ArrayWritable value = recordReader.createValue();
+      int recordCnt = 0;
       while (recordReader.next(key, value)) {
         Writable[] values = value.get();
         // check if the record written is with latest commit, here "101"
         assertEquals(latestInstant, values[0].toString());
         key = recordReader.createKey();
         value = recordReader.createValue();
+        recordCnt++;
       }
       recordReader.getPos();
       assertEquals(1.0, recordReader.getProgress(), 0.05);
+      assertEquals(120, recordCnt);
       recordReader.close();
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
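As a side note on the assertion pattern above, here is a small sketch of draining an old-style mapred RecordReader and counting what it emits (the helper class and method names are hypothetical, not part of this PR); with this change, running it against the realtime reader for this file group should yield 120:

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.RecordReader;

import java.io.IOException;

public final class RecordReaderCountSketch {
  private RecordReaderCountSketch() {
  }

  // Drains the given reader and returns how many records it produced.
  // Mirrors the counting loop in the test above, one record per successful next() call.
  public static int countRecords(RecordReader<NullWritable, ArrayWritable> reader) throws IOException {
    NullWritable key = reader.createKey();
    ArrayWritable value = reader.createValue();
    int count = 0;
    while (reader.next(key, value)) {
      count++;
      key = reader.createKey();
      value = reader.createValue();
    }
    return count;
  }
}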