@@ -74,7 +74,8 @@ public String showArchivedCommits(
for (FileStatus fs : fsStatuses) {
// read the archived file
Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(),
HoodieCLI.getTableMetaClient().getTableConfig().getRecordKeyFieldProp());

List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
@@ -149,7 +150,8 @@ public String showCommits(
for (FileStatus fs : fsStatuses) {
// read the archived file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(),
metaClient.getTableConfig().getRecordKeyFieldProp());

List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks
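Both hunks above follow the same pattern: the CLI's archived-timeline reader now passes the table's record key field alongside the archived-entry schema. A minimal sketch of the resulting call, assuming the four-argument HoodieLogFormat.newReader overload introduced here and the package locations used elsewhere in Hudi:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;

class ArchivedReaderSketch {
  // Hypothetical helper: open one archived file, threading the record key field
  // from the table config into the reader as the new fourth argument.
  static Reader open(String basePath, FileStatus fs) throws IOException {
    String keyField = HoodieCLI.getTableMetaClient().getTableConfig().getRecordKeyFieldProp();
    return HoodieLogFormat.newReader(
        FSUtils.getFs(basePath, HoodieCLI.conf),
        new HoodieLogFile(fs.getPath()),
        HoodieArchivedMetaEntry.getClassSchema(),
        keyField);
  }
}
```

The second hunk does the same thing but resolves the key field from its local metaClient rather than going through HoodieCLI.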
@@ -117,7 +117,8 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
for (FileStatus fs : statuses) {
// read the archived file
Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(),
HoodieCLI.getTableMetaClient().getTableConfig().getRecordKeyFieldProp());

// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
@@ -92,9 +92,10 @@ public String showLogFileCommits(

for (String logFilePath : logFilePaths) {
FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
final String keyField = HoodieCLI.getTableMetaClient().getTableConfig().getRecordKeyFieldProp();
Schema writerSchema = new AvroSchemaConverter()
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath))));
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath), keyField)));
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema, keyField);

// read the avro blocks
while (reader.hasNext()) {
@@ -186,8 +187,10 @@ public String showLogFileRecords(
// TODO : readerSchema can change across blocks/log files, fix this inside Scanner
AvroSchemaConverter converter = new AvroSchemaConverter();
// get schema from last log file
final String keyField = client.getTableConfig().getRecordKeyFieldProp();
Schema readerSchema =
converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1)))));
converter.convert(Objects.requireNonNull(
TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1)), keyField)));

List<IndexedRecord> allRecords = new ArrayList<>();

@@ -224,9 +227,10 @@ public String showLogFileRecords(
} else {
for (String logFile : logFilePaths) {
Schema writerSchema = new AvroSchemaConverter()
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
.convert(Objects.requireNonNull(
TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile), keyField)));
HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema, keyField);
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
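In this command the record key field is resolved once per invocation and threaded into both TableSchemaResolver.readSchemaFromLogFile and HoodieLogFormat.newReader. A sketch of that combined pattern, with assumed package locations and a hypothetical helper name:

```java
import java.io.IOException;
import java.util.Objects;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.parquet.avro.AvroSchemaConverter;

class LogReaderSketch {
  // Hypothetical helper: derive the writer schema from the log file itself,
  // then open a reader; the same record key field flows into both calls.
  static Reader open(FileSystem fs, String logFilePath, String keyField) throws IOException {
    Schema writerSchema = new AvroSchemaConverter().convert(Objects.requireNonNull(
        TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath), keyField)));
    return HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePath)), writerSchema, keyField);
  }
}
```

Resolving keyField once up front also keeps the table-config lookup out of the per-log-file loop.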
@@ -360,12 +360,8 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header)
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
if (config.populateMetaFields()) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
} else {
final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header, keyField));
}
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header,
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp()));
}
if (keysToDelete.size() > 0) {
blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header));
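The populateMetaFields branch above collapses into a single call: the record key field is now handed to HoodieDataBlock.getBlock unconditionally. A standalone sketch of the simplified logic, assuming getLogDataBlockFormat() returns HoodieLogBlock.HoodieLogBlockType as it does elsewhere in Hudi:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;

class DataBlockSketch {
  // Hypothetical standalone version of the simplified logic: the key field is
  // passed whether or not meta fields are populated, so the old branch goes away.
  static List<HoodieLogBlock> buildDataBlocks(HoodieLogBlock.HoodieLogBlockType format,
                                              List<IndexedRecord> recordList,
                                              Map<HeaderMetadataType, String> header,
                                              String recordKeyField) {
    List<HoodieLogBlock> blocks = new ArrayList<>(1);
    if (recordList.size() > 0) {
      blocks.add(HoodieDataBlock.getBlock(format, recordList, header, recordKeyField));
    }
    return blocks;
  }
}
```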
@@ -420,7 +420,8 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {

long oldNumWrites = 0;
try {
HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath,
Option.of(hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp()));
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data validation", e);
@@ -20,6 +20,7 @@

import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -64,6 +65,6 @@ protected HoodieBaseFile getLatestDataFile() {

protected HoodieFileReader createNewFileReader() throws IOException {
return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
new Path(getLatestDataFile().getPath()));
new Path(getLatestDataFile().getPath()), Option.of(hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp()));
}
}
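This reader and the merge-validation reader above both switch to the HoodieFileReaderFactory.getFileReader overload that takes an Option-wrapped record key field. A minimal usage sketch under that assumption, with hypothetical conf and path arguments:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

class FileReaderSketch {
  // Hypothetical usage: open a base file with the table's record key field so
  // formats that no longer store the key column inside the value payload
  // (as the HFile writer below does) can still work with full records.
  static long countRecords(Configuration conf, Path baseFilePath, String recordKeyField) throws IOException {
    HoodieFileReader<?> reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath,
        Option.of(recordKeyField));
    return reader.getTotalRecords();
  }
}
```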
@@ -89,7 +89,10 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);

return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
final String recordKey = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
final Schema.Field schemaRecordKeyField = schema.getField(recordKey);
return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, schemaRecordKeyField,
taskContextSupplier, config.populateMetaFields());
}

private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
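The HFile writer factory now looks up the Schema.Field for the configured record key and passes it to HoodieHFileWriter, whose constructor (next file) rejects a null field. A hypothetical standalone version of that lookup and guard:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.util.ValidationUtils;

class RecordKeyFieldLookup {
  // Hypothetical standalone version of the lookup done in the factory method:
  // resolve the record key column in the writer schema, failing fast if the
  // configured key field is not part of the schema.
  static Schema.Field resolveRecordKeyField(Schema writeSchema, String recordKeyFieldName) {
    Schema.Field field = writeSchema.getField(recordKeyFieldName);
    ValidationUtils.checkArgument(field != null, "Unknown record key field in the schema!");
    return field;
  }
}
```

Checking at writer-construction time surfaces a misconfigured key field before any record is written.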
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.common.util.ValidationUtils;

import java.io.DataInput;
import java.io.DataOutput;
@@ -63,6 +64,8 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
private final String instantTime;
private final TaskContextSupplier taskContextSupplier;
private final boolean populateMetaFields;
private final Schema schema;
private final Schema.Field schemaRecordKeyField;
private HFile.Writer writer;
private String minRecordKey;
private String maxRecordKey;
@@ -71,12 +74,15 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
private static String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction";

public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileConfig, Schema schema,
TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {

Schema.Field schemaRecordKeyField, TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
ValidationUtils.checkArgument(schemaRecordKeyField != null, "Unknown record key field in the schema!");
Configuration conf = FSUtils.registerFileSystem(file, hfileConfig.getHadoopConf());
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
this.hfileConfig = hfileConfig;
this.schemaRecordKeyField = schemaRecordKeyField;
this.schema = schema;

// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
@@ -122,7 +128,10 @@ public boolean canWrite() {

@Override
public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object);
byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord) object);
GenericRecord recordKeyExcludedRecord = HoodieAvroUtils.bytesToAvro(value, this.schema);
recordKeyExcludedRecord.put(this.schemaRecordKeyField.pos(), "");
value = HoodieAvroUtils.avroToBytes(recordKeyExcludedRecord);
KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, value);
writer.append(kv);

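The notable part of this hunk is that the record key column is blanked before the value is serialized: the key already lives in the HFile KeyValue, so repeating it in the Avro payload is redundant. A sketch of the round trip under that reading; the reader side is not part of this diff and is illustrative only:

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

class HFileKeyFieldRoundTrip {
  // Writer side, mirroring writeAvro above: blank the key column before
  // serializing, since the HFile KeyValue already carries the record key.
  static byte[] toValueBytes(GenericRecord record, Schema schema, Schema.Field keyField) throws IOException {
    byte[] bytes = HoodieAvroUtils.avroToBytes(record);
    GenericRecord keyExcluded = HoodieAvroUtils.bytesToAvro(bytes, schema);
    keyExcluded.put(keyField.pos(), "");
    return HoodieAvroUtils.avroToBytes(keyExcluded);
  }

  // Reader side (illustrative only): a reader that knows the key field name
  // can put the cell key back into the deserialized record.
  static GenericRecord fromValueBytes(byte[] valueBytes, String cellKey, Schema schema,
                                      Schema.Field keyField) throws IOException {
    GenericRecord record = HoodieAvroUtils.bytesToAvro(valueBytes, schema);
    record.put(keyField.pos(), cellKey);
    return record;
  }
}
```

Note that the write path serializes and immediately re-deserializes the record, which yields a private copy to mutate without touching the caller's GenericRecord.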
@@ -22,6 +22,7 @@
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
@@ -94,7 +95,8 @@ protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> tab
Schema readSchema, boolean externalSchemaTransformation) throws IOException {
Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
HoodieFileReader<GenericRecord> bootstrapReader = HoodieFileReaderFactory.<GenericRecord>getFileReader(bootstrapFileConfig, externalFilePath);
HoodieFileReader<GenericRecord> bootstrapReader = HoodieFileReaderFactory.<GenericRecord>getFileReader(
bootstrapFileConfig, externalFilePath, Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp()));
Schema bootstrapReadSchema;
if (externalSchemaTransformation) {
bootstrapReadSchema = bootstrapReader.getSchema();
@@ -93,7 +93,7 @@ private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
}).map(Arguments::of);
}

private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception {
private HoodieHFileWriter createHFileWriter(Schema avroSchema, Schema.Field keyField, boolean populateMetaFields) throws Exception {
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
Configuration conf = new Configuration();
TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
@@ -104,27 +104,30 @@ private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateM

HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields);
return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, keyField, mockTaskContextSupplier, populateMetaFields);
}

@ParameterizedTest
@MethodSource("populateMetaFieldsAndTestAvroWithMeta")
public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithMeta) throws Exception {
Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
HoodieHFileWriter writer = createHFileWriter(avroSchema, populateMetaFields);
final Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
final String keyField = "_row_key";

HoodieHFileWriter writer = createHFileWriter(avroSchema, avroSchema.getField(keyField), populateMetaFields);
List<String> keys = new ArrayList<>();
Map<String, GenericRecord> recordMap = new HashMap<>();
for (int i = 0; i < 100; i++) {
GenericRecord record = new GenericData.Record(avroSchema);
String key = String.format("%s%04d", "key", i);
record.put("_row_key", key);
record.put(keyField, key);
keys.add(key);
record.put("time", Integer.toString(RANDOM.nextInt()));
record.put("number", i);
if (testAvroWithMeta) {
writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get("_row_key"),
Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
// payload does not matter. GenericRecord passed in is what matters
// only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get(keyField),
Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload()));
} else {
writer.writeAvro(key, record);
}
@@ -134,7 +137,7 @@ public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithM

Configuration conf = new Configuration();
CacheConfig cacheConfig = new CacheConfig(conf);
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf), keyField);
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
hoodieHFileReader.close();
@@ -144,7 +147,7 @@ public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithM
Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
List<String> rowsList = new ArrayList<>(rowsToFetch);
Collections.sort(rowsList);
hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf), keyField);
List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
assertEquals(result.size(), randomRowstoFetch);
result.forEach(entry -> {
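On the read side, the test now constructs HoodieHFileReader with the key field name, presumably so the reader can restore the blanked key column from the HFile key before the records are compared against what was written. A hedged usage sketch assuming the five-argument constructor shown above and a hypothetical file path:

```java
import java.util.List;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieHFileReader;

class HFileReadBackSketch {
  // Hypothetical read-back: open the HFile with the record key field name and
  // print key -> record pairs, mirroring what the test asserts against.
  static void dump(Path filePath, String keyField) throws Exception {
    Configuration conf = new Configuration();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HoodieHFileReader<IndexedRecord> reader =
        new HoodieHFileReader<>(conf, filePath, cacheConfig, filePath.getFileSystem(conf), keyField);
    List<Pair<String, IndexedRecord>> records = reader.readAllRecords();
    records.forEach(entry -> System.out.println(entry.getFirst() + " -> " + entry.getSecond()));
    reader.close();
  }
}
```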