diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index 102fcc2ae7a63..c37b111c67dc9 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -74,7 +74,8 @@ public String showArchivedCommits( for (FileStatus fs : fsStatuses) { // read the archived file Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf), - new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema()); + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(), + HoodieCLI.getTableMetaClient().getTableConfig().getRecordKeyFieldProp()); List readRecords = new ArrayList<>(); // read the avro blocks @@ -149,7 +150,8 @@ public String showCommits( for (FileStatus fs : fsStatuses) { // read the archived file HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf), - new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema()); + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(), + metaClient.getTableConfig().getRecordKeyFieldProp()); List readRecords = new ArrayList<>(); // read the avro blocks diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java index 119ccb0dcf039..d9c0104203ef7 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java @@ -117,7 +117,8 @@ private int copyArchivedInstants(List statuses, Set actionSe for (FileStatus fs : statuses) { // read the archived file Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf), - new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema()); + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(), + HoodieCLI.getTableMetaClient().getTableConfig().getRecordKeyFieldProp()); // read the avro blocks while (reader.hasNext() && copyCount < limit) { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 27bcd81faefec..2ebe38f10124f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -92,9 +92,10 @@ public String showLogFileCommits( for (String logFilePath : logFilePaths) { FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath)); + final String keyField = HoodieCLI.getTableMetaClient().getTableConfig().getRecordKeyFieldProp(); Schema writerSchema = new AvroSchemaConverter() - .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath)))); - Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); + .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath), keyField))); + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema, keyField); // read the avro blocks while (reader.hasNext()) { @@ -186,8 +187,10 @@ public String showLogFileRecords( // TODO : readerSchema can change across 
blocks/log files, fix this inside Scanner AvroSchemaConverter converter = new AvroSchemaConverter(); // get schema from last log file + final String keyField = client.getTableConfig().getRecordKeyFieldProp(); Schema readerSchema = - converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))))); + converter.convert(Objects.requireNonNull( + TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1)), keyField))); List allRecords = new ArrayList<>(); @@ -224,9 +227,10 @@ public String showLogFileRecords( } else { for (String logFile : logFilePaths) { Schema writerSchema = new AvroSchemaConverter() - .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile)))); + .convert(Objects.requireNonNull( + TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile), keyField))); HoodieLogFormat.Reader reader = - HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema); + HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema, keyField); // read the avro blocks while (reader.hasNext()) { HoodieLogBlock n = reader.next(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7cc0c5dfa4656..87dc84732f69f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -360,12 +360,8 @@ protected void appendDataAndDeleteBlocks(Map header) header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString()); List blocks = new ArrayList<>(2); if (recordList.size() > 0) { - if (config.populateMetaFields()) { - blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header)); - } else { - final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); - blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header, keyField)); - } + blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header, + hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp())); } if (keysToDelete.size() > 0) { blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index d1d67efff4b96..753ff1d0be80f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -420,7 +420,8 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) { long oldNumWrites = 0; try { - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath); + HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath, + Option.of(hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp())); oldNumWrites = reader.getTotalRecords(); } catch (IOException e) { throw new HoodieUpsertException("Failed to check for merge data validation", e); diff 
--git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java index a771c33c40661..f39d17f5ac0b4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieFileReader; @@ -64,6 +65,6 @@ protected HoodieBaseFile getLatestDataFile() { protected HoodieFileReader createNewFileReader() throws IOException { return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), - new Path(getLatestDataFile().getPath())); + new Path(getLatestDataFile().getPath()), Option.of(hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp())); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 0b6afd4d28b92..9ae52f6c1e9b2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -89,7 +89,10 @@ private static HoodieFi config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); - return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields()); + final String recordKey = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); + final Schema.Field schemaRecordKeyField = schema.getField(recordKey); + return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, schemaRecordKeyField, + taskContextSupplier, config.populateMetaFields()); } private static HoodieFileWriter newOrcFileWriter( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index a719bcb8f334f..78aa8f888cede 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.io.Writable; +import org.apache.hudi.common.util.ValidationUtils; import java.io.DataInput; import java.io.DataOutput; @@ -63,6 +64,8 @@ public class HoodieHFileWriter getMergingIterator(HoodieTable tab Schema readSchema, boolean externalSchemaTransformation) throws IOException { Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath()); Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf()); - HoodieFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath); + HoodieFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader( + 
bootstrapFileConfig, externalFilePath, Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp())); Schema bootstrapReadSchema; if (externalSchemaTransformation) { bootstrapReadSchema = bootstrapReader.getSchema(); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java index 86a0886de664d..a5502761ef7b3 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -93,7 +93,7 @@ private static Stream populateMetaFieldsAndTestAvroWithMeta() { }).map(Arguments::of); } - private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateMetaFields) throws Exception { + private HoodieHFileWriter createHFileWriter(Schema avroSchema, Schema.Field keyField, boolean populateMetaFields) throws Exception { BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); Configuration conf = new Configuration(); TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class); @@ -104,27 +104,30 @@ private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateM HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); - return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields); + return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, keyField, mockTaskContextSupplier, populateMetaFields); } @ParameterizedTest @MethodSource("populateMetaFieldsAndTestAvroWithMeta") public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithMeta) throws Exception { - Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc"); - HoodieHFileWriter writer = createHFileWriter(avroSchema, populateMetaFields); + final Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc"); + final String keyField = "_row_key"; + + HoodieHFileWriter writer = createHFileWriter(avroSchema, avroSchema.getField(keyField), populateMetaFields); List keys = new ArrayList<>(); Map recordMap = new HashMap<>(); for (int i = 0; i < 100; i++) { GenericRecord record = new GenericData.Record(avroSchema); String key = String.format("%s%04d", "key", i); - record.put("_row_key", key); + record.put(keyField, key); keys.add(key); record.put("time", Integer.toString(RANDOM.nextInt())); record.put("number", i); if (testAvroWithMeta) { - writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get("_row_key"), - Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters + // payload does not matter. GenericRecord passed in is what matters // only HoodieKey will be looked up from the 2nd arg(HoodieRecord). 
+ writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get(keyField), + Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); } else { writer.writeAvro(key, record); } @@ -134,7 +137,7 @@ public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithM Configuration conf = new Configuration(); CacheConfig cacheConfig = new CacheConfig(conf); - HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf), keyField); List> records = hoodieHFileReader.readAllRecords(); records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()))); hoodieHFileReader.close(); @@ -144,7 +147,7 @@ public void testWriteReadHFile(boolean populateMetaFields, boolean testAvroWithM Set rowsToFetch = getRandomKeys(randomRowstoFetch, keys); List rowsList = new ArrayList<>(rowsToFetch); Collections.sort(rowsList); - hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf)); + hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf), keyField); List> result = hoodieHFileReader.readRecords(rowsList); assertEquals(result.size(), randomRowstoFetch); result.forEach(entry -> { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java index 68143a215c51c..14a974e0bf555 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; @@ -79,10 +80,11 @@ private HoodieOrcWriter createOrcWriter(Schema avroSchema) throws Exception { @Test public void testWriteReadMetadata() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + final String keyField = "_row_key"; HoodieOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); - record.put("_row_key", "key" + i); + record.put(keyField, "key" + i); record.put("time", Integer.toString(i)); record.put("number", i); writer.writeAvro("key" + i, record); @@ -98,7 +100,7 @@ public void testWriteReadMetadata() throws Exception { assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY)); assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString()); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath, Option.of(keyField)); BloomFilter filter = hoodieReader.readBloomFilter(); for (int i = 0; i < 3; i++) { assertTrue(filter.mightContain("key" + i)); @@ -114,10 +116,11 @@ public void testWriteReadMetadata() throws Exception { @Test public void 
testWriteReadPrimitiveRecord() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + final String keyField = "_row_key"; HoodieOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); - record.put("_row_key", "key" + i); + record.put(keyField, "key" + i); record.put("time", Integer.toString(i)); record.put("number", i); writer.writeAvro("key" + i, record); @@ -129,7 +132,7 @@ public void testWriteReadPrimitiveRecord() throws Exception { assertEquals("struct<_row_key:string,time:string,number:int>", orcReader.getSchema().toString()); assertEquals(3, orcReader.getNumberOfRows()); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath, Option.of(keyField)); Iterator iter = hoodieReader.getRecordIterator(); int index = 0; while (iter.hasNext()) { @@ -145,10 +148,11 @@ public void testWriteReadPrimitiveRecord() throws Exception { public void testWriteReadComplexRecord() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithUDT.avsc"); Schema udtSchema = avroSchema.getField("driver").schema().getTypes().get(1); + final String keyField = "_row_key"; HoodieOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); - record.put("_row_key", "key" + i); + record.put(keyField, "key" + i); record.put("time", Integer.toString(i)); record.put("number", i); GenericRecord innerRecord = new GenericData.Record(udtSchema); @@ -166,7 +170,7 @@ public void testWriteReadComplexRecord() throws Exception { reader.getSchema().toString()); assertEquals(3, reader.getNumberOfRows()); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath, Option.of(keyField)); Iterator iter = hoodieReader.getRecordIterator(); int index = 0; while (iter.hasNext()) { @@ -186,10 +190,11 @@ public void testWriteReadComplexRecord() throws Exception { @Test public void testWriteReadWithEvolvedSchema() throws Exception { Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + final String keyField = "_row_key"; HoodieOrcWriter writer = createOrcWriter(avroSchema); for (int i = 0; i < 3; i++) { GenericRecord record = new GenericData.Record(avroSchema); - record.put("_row_key", "key" + i); + record.put(keyField, "key" + i); record.put("time", Integer.toString(i)); record.put("number", i); writer.writeAvro("key" + i, record); @@ -197,7 +202,7 @@ public void testWriteReadWithEvolvedSchema() throws Exception { writer.close(); Configuration conf = new Configuration(); - HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + HoodieFileReader hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath, Option.of(keyField)); Schema evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchema.avsc"); Iterator iter = hoodieReader.getRecordIterator(evolvedSchema); int index = 0; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index 
5ed6d5d529ba3..f9a986936c07e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -69,7 +69,8 @@ public void runMerge(HoodieTable>, List, List final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { - readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); + readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), + mergeHandle.getOldFilePath(), Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp())).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields()); } else { @@ -80,7 +81,8 @@ public void runMerge(HoodieTable>, List, List BoundedInMemoryExecutor wrapper = null; Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, + mergeHandle.getOldFilePath(), Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp())); try { final Iterator readerIterator; if (baseFile.getBootstrapBaseFile().isPresent()) { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index c830925419b57..38919b6769442 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -190,7 +190,8 @@ private List> readRecordsForGroupWithLogs(List baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) ? 
Option.empty() - : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))); + : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), + new Path(clusteringOp.getDataFilePath()), Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp()))); HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); Iterator> fileSliceReader = getFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPayloadClass(), @@ -214,7 +215,9 @@ private List> readRecordsForGroupBaseFiles(List { try { Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader( + getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()), + Option.of(getHoodieTable().getMetaClient().getTableConfig().getRecordKeyFieldProp())); Iterator recordIterator = baseFileReader.getRecordIterator(readerSchema); recordIterator.forEachRemaining(record -> records.add(transform(record))); } catch (IOException e) { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java index a55121472310d..3c1e44e471a0a 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java @@ -70,7 +70,8 @@ public void runMerge(HoodieTable>, List, List final GenericDatumReader gReader; Schema readSchema; if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { - readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); + readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), + mergeHandle.getOldFilePath(), Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp())).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields()); } else { @@ -80,7 +81,8 @@ public void runMerge(HoodieTable>, List, List } BoundedInMemoryExecutor wrapper = null; - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, + mergeHandle.getOldFilePath(), Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp())); try { final Iterator readerIterator; if (baseFile.getBootstrapBaseFile().isPresent()) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 4824c757cd9df..2e39d5f37c08b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -209,7 +209,8 @@ private JavaRDD> 
readRecordsForGroupWithLogs(JavaSparkContext js Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) ? Option.empty() - : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))); + : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), + new Path(clusteringOp.getDataFilePath()), Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp()))); HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPayloadClass(), @@ -236,7 +237,9 @@ private JavaRDD> readRecordsForGroupBaseFiles(JavaSparkContext j clusteringOpsPartition.forEachRemaining(clusteringOp -> { try { Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader( + getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()), + Option.of(getHoodieTable().getMetaClient().getTableConfig().getRecordKeyFieldProp())); iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema)); } catch (IOException e) { throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java index 98bf9151fc9ef..7136f5fec101f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java @@ -150,7 +150,8 @@ private Iterator> readRecordsForGroupBaseFiles(List indexedRecords = () -> { try { - return HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema); + return HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()), + Option.of(getHoodieTable().getMetaClient().getTableConfig().getRecordKeyFieldProp())).getRecordIterator(readerSchema); } catch (IOException e) { throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java index 5e82dbd8c566d..6c3970734bfd5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import 
org.apache.hudi.exception.HoodieException; import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; @@ -69,7 +70,8 @@ public void runMerge(HoodieTable>, JavaRDD final GenericDatumReader gReader; Schema readSchema; if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { - readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); + readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath(), + Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp())).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields()); } else { @@ -79,7 +81,8 @@ public void runMerge(HoodieTable>, JavaRDD } BoundedInMemoryExecutor wrapper = null; - HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + HoodieFileReader reader = HoodieFileReaderFactory.getFileReader( + cfgForHoodieFile, mergeHandle.getOldFilePath(), Option.of(table.getMetaClient().getTableConfig().getRecordKeyFieldProp())); try { final Iterator readerIterator; if (baseFile.getBootstrapBaseFile().isPresent()) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 392d687349e76..648e35c667abf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,6 +18,8 @@ package org.apache.hudi.client.functional; +import org.apache.avro.Schema; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -38,12 +40,18 @@ import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -59,6 +67,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -73,6 +82,7 @@ import 
org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader; import org.apache.hudi.metadata.HoodieMetadataMetrics; import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -96,6 +106,8 @@ import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; @@ -137,6 +149,7 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -374,7 +387,6 @@ public void testMetadataTableServices() throws Exception { assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); } - /** * Tests that virtual key configs are honored in base files after compaction in metadata table. * @@ -413,7 +425,7 @@ public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Except List fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList()); HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()), - new CacheConfig(context.getHadoopConf().get())); + new CacheConfig(context.getHadoopConf().get()), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); List> records = hoodieHFileReader.readAllRecords(); records.forEach(entry -> { if (populateMetaFields) { @@ -507,6 +519,258 @@ public void testMetadataTableWithPendingCompaction(boolean simulateFailedCompact } } + /** + * Test arguments - Table type, populate meta fields, exclude key from payload. + */ + public static List testMetadataRecordKeyExcludeFromPayloadArgs() { + return asList( + Arguments.of(COPY_ON_WRITE, true), + Arguments.of(COPY_ON_WRITE, false), + Arguments.of(MERGE_ON_READ, true), + Arguments.of(MERGE_ON_READ, false) + ); + } + + /** + * 1. Verify metadata table records key deduplication feature. When record key + * deduplication is enabled, verify the metadata record payload on disk has empty key. + * Otherwise, verify the valid key. + * 2. Verify populate meta fields work irrespective of record key deduplication config. + * 3. Verify table services like compaction benefit from record key deduplication feature. 
+ */ + @ParameterizedTest + @MethodSource("testMetadataRecordKeyExcludeFromPayloadArgs") + public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType, final boolean enableMetaFields) throws Exception { + initPath(); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withPopulateMetaFields(enableMetaFields) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()) + .build(); + init(tableType, writeConfig); + + // 2nd commit + doWriteOperation(testTable, "0000001", INSERT); + + final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder() + .setConf(hadoopConf) + .setBasePath(metadataTableBasePath) + .build(); + HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); + metadataMetaClient.reloadActiveTimeline(); + final HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient); + + // Compaction has not yet kicked in. Verify all the log files + // for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000001", + enableMetaFields); + }, "Metadata table should have valid log files!"); + + // Verify no base file created yet. + assertThrows(IllegalStateException.class, () -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should not have a base file yet!"); + + // 3rd commit + doWriteOperation(testTable, "0000002", UPSERT); + + // Compaction should be triggered by now. Let's verify the log files + // if any for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000002", + enableMetaFields); + }, "Metadata table should have valid log files!"); + + // Verify the base file created by the just completed compaction. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should have a valid base file!"); + + // 3 more commits to trigger one more compaction, along with a clean + doWriteOperation(testTable, "0000004", UPSERT); + doWriteOperation(testTable, "0000005", UPSERT); + doClean(testTable, "0000006", Arrays.asList("0000004")); + doWriteOperation(testTable, "0000007", UPSERT); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "7", enableMetaFields); + }, "Metadata table should have valid log files!"); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should have a valid base file!"); + + validateMetadata(testTable); + } + + /** + * Verify the metadata table log files for the record field correctness. On disk format + * should be based on meta fields and key deduplication config. And the in-memory merged + * records should all be materialized fully irrespective of the config. 
+ * + * @param table - Hoodie metadata test table + * @param metadataMetaClient - Metadata meta client + * @param latestCommitTimestamp - Latest commit timestamp + * @param enableMetaFields - Enable meta fields for the table records + * @throws IOException + */ + private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table, HoodieTableMetaClient metadataMetaClient, + String latestCommitTimestamp, + boolean enableMetaFields) throws IOException { + table.getHoodieView().sync(); + + // Compaction should not be triggered yet. Let's verify no base file + // and few log files available. + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (fileSlices.isEmpty()) { + throw new IllegalStateException("LogFile slices are not available!"); + } + + // Verify the log files honor the key deduplication and virtual keys config + List logFiles = fileSlices.get(0).getLogFiles().map(logFile -> { + return logFile; + }).collect(Collectors.toList()); + + List logFilePaths = logFiles.stream().map(logFile -> { + return logFile.getPath().toString(); + }).collect(Collectors.toList()); + + // Verify the on-disk raw records before they get materialized + verifyMetadataRawRecords(table, logFiles, enableMetaFields); + + // Verify the in-memory materialized and merged records + verifyMetadataMergedRecords(metadataMetaClient, logFilePaths, latestCommitTimestamp, enableMetaFields); + } + + /** + * Verify the metadata table on-disk raw records. When populate meta fields is enabled, + * these records should have additional meta fields in the payload. When key deduplication + * is enabled, these records on the disk should have key in the payload as empty string. + * + * @param table - Hoodie test table + * @param logFiles - Metadata table log files to be verified + * @param enableMetaFields - Enable meta fields for records + * @throws IOException + */ + private void verifyMetadataRawRecords(HoodieTable table, List logFiles, boolean enableMetaFields) throws IOException { + for (HoodieLogFile logFile : logFiles) { + FileStatus[] fsStatus = fs.listStatus(logFile.getPath()); + MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath(), + table.getMetaClient().getTableConfig().getRecordKeyFieldProp()); + if (writerSchemaMsg == null) { + // not a data block + continue; + } + + Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg); + HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader( + fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema, + table.getMetaClient().getTableConfig().getRecordKeyFieldProp()); + + while (logFileReader.hasNext()) { + HoodieLogBlock logBlock = logFileReader.next(); + if (logBlock instanceof HoodieDataBlock) { + for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { + final GenericRecord record = (GenericRecord) indexRecord; + if (enableMetaFields) { + // Metadata table records should have meta fields! + assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } else { + // Metadata table records should not have meta fields! 
+ assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } + + final String key = String.valueOf(record.get(HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY)); + assertFalse(key.isEmpty()); + if (enableMetaFields) { + assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); + } + } + } + } + } + } + + /** + * Verify the metadata table in-memory merged records. Irrespective of key deduplication + * config, the in-memory merged records should always have the key field in the record + * payload fully materialized. + * + * @param metadataMetaClient - Metadata table meta client + * @param logFilePaths - Metadata table log file paths + * @param latestCommitTimestamp + * @param enableMetaFields - Enable meta fields + */ + private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List logFilePaths, + String latestCommitTimestamp, boolean enableMetaFields) { + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + if (enableMetaFields) { + schema = HoodieAvroUtils.addMetadataFields(schema); + } + HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder() + .withFileSystem(metadataMetaClient.getFs()) + .withBasePath(metadataMetaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withLatestInstantTime(latestCommitTimestamp) + .withPartition(MetadataPartitionType.FILES.partitionPath()) + .withReaderSchema(schema) + .withMaxMemorySizeInBytes(100000L) + .withBufferSize(4096) + .withSpillableMapBasePath(tempDir.toString()) + .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK) + .build(); + + assertDoesNotThrow(() -> { + logRecordReader.scan(); + }, "Metadata log records materialization failed"); + + for (Map.Entry> entry : logRecordReader.getRecords().entrySet()) { + assertFalse(entry.getKey().isEmpty()); + assertFalse(entry.getValue().getRecordKey().isEmpty()); + assertEquals(entry.getKey(), entry.getValue().getRecordKey()); + } + } + + /** + * Verify metadata table base files for the records persisted based on the config. When + * the key deduplication is enabled, the records persisted on the disk in the base file + * should have key field in the payload as empty string. 
+ * + * @param table - Metadata table + * @param enableMetaFields - Enable meta fields + */ + private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table, boolean enableMetaFields) throws IOException { + table.getHoodieView().sync(); + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (!fileSlices.get(0).getBaseFile().isPresent()) { + throw new IllegalStateException("Base file not available!"); + } + final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); + + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), + new Path(baseFile.getPath()), + new CacheConfig(context.getHadoopConf().get()), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); + List> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> { + if (enableMetaFields) { + assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } else { + assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } + + final String keyInPayload = (String) ((GenericRecord) entry.getSecond()) + .get(HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); + assertFalse(keyInPayload.isEmpty()); + }); + } + /** * Test rollback of various table operations sync to Metadata Table correctly. */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 56c9f016bcc6e..3d03937f8d5c2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -18,6 +18,7 @@ package org.apache.hudi.client.functional; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; @@ -42,8 +43,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.testutils.HoodieClientTestHarness; - -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.jupiter.api.AfterEach; @@ -94,6 +93,20 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable); } + public void init(HoodieTableType tableType, HoodieWriteConfig writeConfig) throws IOException { + this.tableType = tableType; + initPath(); + initSparkContexts("TestHoodieMetadata"); + initFileSystem(); + fs.mkdirs(new Path(basePath)); + initTimelineService(); + initMetaClient(tableType); + initTestDataGenerator(); + metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); + this.writeConfig = writeConfig; + initWriteConfigAndMetatableWriter(writeConfig, writeConfig.isMetadataTableEnabled()); + } + protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) { this.writeConfig = writeConfig; if (enableMetadataTable) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 
51791c945d589..21ba5f4dd2083 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -126,7 +126,7 @@ public final class HoodieMetadataConfig extends HoodieConfig { public static final ConfigProperty POPULATE_META_FIELDS = ConfigProperty .key(METADATA_PREFIX + ".populate.meta.fields") - .defaultValue(true) + .defaultValue(false) .sinceVersion("0.10.0") .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated."); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index d1a4d96637543..49591c24aabfc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -474,7 +474,7 @@ public MessageType readSchemaFromLastCompaction(Option lastCompac * @return */ public MessageType readSchemaFromLogFile(Path path) throws IOException { - return readSchemaFromLogFile(metaClient.getRawFs(), path); + return readSchemaFromLogFile(metaClient.getRawFs(), path, metaClient.getTableConfig().getRecordKeyFieldProp()); } /** @@ -497,8 +497,8 @@ public MessageType readSchemaFromLogFile(Option lastCompactionCom * * @return */ - public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { - Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); + public static MessageType readSchemaFromLogFile(FileSystem fs, Path path, String keyField) throws IOException { + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, keyField); HoodieDataBlock lastBlock = null; while (reader.hasNext()) { HoodieLogBlock block = reader.next(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index e6ead54a48d77..b402147d3eeb0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream; import org.apache.hudi.common.fs.TimedFSDataInputStream; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieCorruptBlock; @@ -77,14 +76,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private transient Thread shutdownThread = null; public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false); + boolean readBlockLazily, String keyField) throws IOException { + this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false, keyField); } public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false, - 
HoodieRecord.RECORD_KEY_METADATA_FIELD); + boolean readBlockLazily, boolean reverseReader, String keyField) throws IOException { + this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false, keyField); } public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, @@ -104,10 +102,6 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc addShutDownHook(); } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { - this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false); - } - /** * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams. * @param fsDataInputStream original instance of {@link FSDataInputStream}. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 569b4a23b683b..4e79b78912957 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -272,15 +272,15 @@ static WriterBuilder newWriterBuilder() { return new WriterBuilder(); } - static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) + static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, String keyField) throws IOException { - return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); + return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, keyField); } static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, - boolean readBlockLazily, boolean reverseReader) throws IOException { + boolean readBlockLazily, boolean reverseReader, String keyField) throws IOException { return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, readBlockLazily, - reverseReader); + reverseReader, keyField); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java index c1b20cbb4c55c..4a7158fc24590 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java @@ -43,10 +43,10 @@ */ public class LogReaderUtils { - private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, HoodieLogFile hoodieLogFile) - throws IOException { + private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, + HoodieLogFile hoodieLogFile, String keyField) throws IOException { // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled - Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true, true); + Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true, true, keyField); Schema writerSchema = null; HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); while (reader.hasPrev()) { @@ -74,7 +74,8 @@ public static Schema readLatestSchemaFromLogFiles(String basePath, List records, @Nonnull Map(), keyField); } - public 
HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map header) { - this(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); - } - @Override public HoodieLogBlockType getBlockType() { return HoodieLogBlockType.HFILE_DATA_BLOCK; @@ -110,8 +105,8 @@ protected byte[] serializeRecords() throws IOException { boolean useIntegerKey = false; int key = 0; int keySize = 0; - Field keyField = records.get(0).getSchema().getField(this.keyField); - if (keyField == null) { + final Field schemaKeyField = records.get(0).getSchema().getField(this.keyField); + if (schemaKeyField == null) { // Missing key metadata field so we should use an integer sequence key useIntegerKey = true; keySize = (int) Math.ceil(Math.log(records.size())) + 1; @@ -122,9 +117,9 @@ protected byte[] serializeRecords() throws IOException { if (useIntegerKey) { recordKey = String.format("%" + keySize + "s", key++); } else { - recordKey = record.get(keyField.pos()).toString(); + recordKey = record.get(schemaKeyField.pos()).toString(); } - byte[] recordBytes = HoodieAvroUtils.indexedRecordToBytes(record); + final byte[] recordBytes = serializeRecord(record, Option.ofNullable(schemaKeyField)); ValidationUtils.checkState(!sortedRecordsMap.containsKey(recordKey), "Writing multiple records with same key not supported for " + this.getClass().getName()); sortedRecordsMap.put(recordKey, recordBytes); @@ -162,6 +157,20 @@ public List getRecords(List keys) throws IOException { return records; } + /** + * Serialize the record to byte buffer. + * + * @param record - Record to serialize + * @param schemaKeyField - Key field in the schema + * @return Serialized byte buffer for the record + */ + private byte[] serializeRecord(final IndexedRecord record, final Option schemaKeyField) { + if (schemaKeyField.isPresent()) { + record.put(schemaKeyField.get().pos(), ""); + } + return HoodieAvroUtils.indexedRecordToBytes(record); + } + private void readWithInlineFS(List keys) throws IOException { boolean enableFullScan = keys.isEmpty(); // Get schema from the header @@ -184,7 +193,7 @@ private void readWithInlineFS(List keys) throws IOException { // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks. Collections.sort(keys); } - HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf)); + HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf), this.keyField); List> logRecords = enableFullScan ? 
reader.readAllRecords(writerSchema, schema) : reader.readRecords(keys, schema); reader.close(); @@ -202,7 +211,7 @@ protected void deserializeRecords() throws IOException { } // Read the content - HoodieHFileReader reader = new HoodieHFileReader<>(getContent().get()); + HoodieHFileReader reader = new HoodieHFileReader<>(getContent().get(), this.keyField); List> records = reader.readAllRecords(writerSchema, schema); this.records = records.stream().map(t -> t.getSecond()).collect(Collectors.toList()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index e82819e73e1d9..c2da669c10c72 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -222,7 +222,8 @@ private List loadInstants(TimeRangeFilter filter, boolean loadIns for (FileStatus fs : fsStatuses) { // Read the archived file try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), - new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) { + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(), + metaClient.getTableConfig().getRecordKeyFieldProp())) { int instantsInPreviousFile = instantsInRange.size(); // Read the avro blocks while (reader.hasNext()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java index f913df7e152a9..2088fa7846fb7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieMetadataPayload; import java.io.IOException; @@ -34,12 +36,17 @@ public class HoodieFileReaderFactory { public static HoodieFileReader getFileReader(Configuration conf, Path path) throws IOException { + return getFileReader(conf, path, Option.empty()); + } + + public static HoodieFileReader getFileReader(Configuration conf, Path path, + Option keyField) throws IOException { final String extension = FSUtils.getFileExtension(path.toString()); if (PARQUET.getFileExtension().equals(extension)) { return newParquetFileReader(conf, path); } if (HFILE.getFileExtension().equals(extension)) { - return newHFileFileReader(conf, path); + return newHFileFileReader(conf, path, keyField.orElse(HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY)); } if (ORC.getFileExtension().equals(extension)) { return newOrcFileReader(conf, path); @@ -52,9 +59,10 @@ private static HoodieFileReader newParquetFileReade return new HoodieParquetReader<>(conf, path); } - private static HoodieFileReader newHFileFileReader(Configuration conf, Path path) throws IOException { + private static HoodieFileReader newHFileFileReader(Configuration conf, Path path, + String keyField) throws IOException { CacheConfig cacheConfig = new CacheConfig(conf); - return new HoodieHFileReader<>(conf, path, cacheConfig); + return new HoodieHFileReader<>(conf, path, cacheConfig, keyField); } private static HoodieFileReader newOrcFileReader(Configuration conf, Path path) { diff 
--git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index e3e38eca86ca9..b859c51996eb7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -50,6 +50,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -62,6 +63,7 @@ public class HoodieHFileReader implements HoodieFileRea // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each // key retrieval. private HFileScanner keyScanner; + private final String keyField; public static final String KEY_SCHEMA = "schema"; public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter"; @@ -69,24 +71,27 @@ public class HoodieHFileReader implements HoodieFileRea public static final String KEY_MIN_RECORD = "minRecordKey"; public static final String KEY_MAX_RECORD = "maxRecordKey"; - public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException { + public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, String keyField) throws IOException { this.conf = configuration; this.path = path; this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf); + this.keyField = keyField; } - public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs) throws IOException { + public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs, String keyField) throws IOException { this.conf = configuration; this.path = path; this.fsDataInputStream = inlineFs.open(path); this.reader = HFile.createReader(inlineFs, path, cacheConfig, configuration); + this.keyField = keyField; } - public HoodieHFileReader(byte[] content) throws IOException { + public HoodieHFileReader(byte[] content, String keyField) throws IOException { Configuration conf = new Configuration(); Path path = new Path("hoodie"); SeekableByteArrayInputStream bis = new SeekableByteArrayInputStream(content); FSDataInputStream fsdis = new FSDataInputStream(bis); + this.keyField = keyField; this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new FSDataInputStreamWrapper(fsdis), content.length, new CacheConfig(conf), conf); } @@ -151,15 +156,16 @@ public Set filterRowKeys(Set candidateRowKeys) { } public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { + final Schema.Field keySchemaField = readerSchema.getField(keyField); + ValidationUtils.checkState(keySchemaField != null); List> recordList = new LinkedList<>(); try { final HFileScanner scanner = reader.getScanner(false, false); if (scanner.seekTo()) { do { Cell c = scanner.getKeyValue(); - byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength()); - R record = getRecordFromCell(c, writerSchema, readerSchema); - recordList.add(new Pair<>(new String(keyBytes), record)); + final Pair keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keySchemaField); + recordList.add(new 
Pair<>(keyAndRecordPair.getFirst(), keyAndRecordPair.getSecond())); } while (scanner.next()); } @@ -196,6 +202,8 @@ public List> readRecords(List keys, Schema schema) throw @Override public Iterator getRecordIterator(Schema readerSchema) throws IOException { final HFileScanner scanner = reader.getScanner(false, false); + final Schema.Field keySchemaField = readerSchema.getField(keyField); + ValidationUtils.checkState(keySchemaField != null); return new Iterator() { private R next = null; private boolean eof = false; @@ -206,7 +214,8 @@ public boolean hasNext() { // To handle when hasNext() is called multiple times for idempotency and/or the first time if (this.next == null && !this.eof) { if (!scanner.isSeeked() && scanner.seekTo()) { - this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keySchemaField); + this.next = keyAndRecordPair.getSecond(); } } return this.next != null; @@ -226,7 +235,8 @@ public R next() { } R retVal = this.next; if (scanner.next()) { - this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keySchemaField); + this.next = keyAndRecordPair.getSecond(); } else { this.next = null; this.eof = true; @@ -242,6 +252,8 @@ public R next() { @Override public Option getRecordByKey(String key, Schema readerSchema) throws IOException { byte[] value = null; + final Schema.Field keySchemaField = readerSchema.getField(keyField); + ValidationUtils.checkState(keySchemaField != null); KeyValue kv = new KeyValue(key.getBytes(), null, null, null); synchronized (this) { @@ -257,16 +269,57 @@ public Option getRecordByKey(String key, Schema readerSchema) throws IOException } if (value != null) { - R record = (R)HoodieAvroUtils.bytesToAvro(value, getSchema(), readerSchema); + R record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keySchemaField); return Option.of(record); } return Option.empty(); } - private R getRecordFromCell(Cell c, Schema writerSchema, Schema readerSchema) throws IOException { - byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength()); - return (R)HoodieAvroUtils.bytesToAvro(value, writerSchema, readerSchema); + /** + * Get the record from HBase cell. + * + * @param cell - HBase Cell + * @param writerSchema - Writer schema + * @param readerSchema - Reader schema + * @return Record key and record object pair + * @throws IOException + */ + private Pair getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Schema.Field keySchemaField) throws IOException { + final byte[] keyBytes = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength()); + final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength()); + R record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keySchemaField); + return new Pair<>(new String(keyBytes), record); + } + + /** + * Deserialize the record byte array contents to record object. 
+ * + * @param keyBytes - Record key as byte array + * @param valueBytes - Record content as byte array + * @param writerSchema - Writer schema + * @param readerSchema - Reader schema + * @param keySchemaField - Key field id in the schema + * @return Deserialized record object + */ + private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema, Schema.Field keySchemaField) throws IOException { + R record = (R) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema); + materializeRecordIfNeeded(keyBytes, record, keySchemaField); + return record; + } + + /** + * Materialize the record for any missing fields, if needed. + * + * @param keyBytes - Key byte array + * @param record - Record object to materialize + * @param keySchemaField - Key field id in the schema + */ + private void materializeRecordIfNeeded(final byte[] keyBytes, R record, Schema.Field keySchemaField) { + final Object keyObject = record.get(keySchemaField.pos()); + if (keyObject != null && keyObject.toString().isEmpty()) { + record.put(keySchemaField.pos(), new String(keyBytes)); + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index c9e538f72eaa0..525e43b33ab79 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -279,7 +279,8 @@ private Pair getBaseFileReader(FileSlice slice, HoodieTi Option basefile = slice.getBaseFile(); if (basefile.isPresent()) { String basefilePath = basefile.get().getPath(); - baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath)); + baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath), + Option.of(HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY)); baseFileOpenMs = timer.endTimer(); LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath, basefile.get().getCommitTime(), baseFileOpenMs)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index bbfd8cf4ad39b..0667ddf367bf6 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -56,6 +56,7 @@ import org.apache.hudi.exception.CorruptedLogFileException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -375,7 +376,7 @@ public void testBasicWriteAndScan() throws IOException, URISyntaxException, Inte writer.appendBlock(dataBlock); writer.close(); - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it"); HoodieLogBlock nextBlock = reader.next(); assertEquals(dataBlockType, nextBlock.getBlockType(), "The next block should be a data block"); @@ -415,7 +416,7 @@ public void testHugeLogFileWrite() throws 
IOException, URISyntaxException, Inter writer.close(); Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), - true, true); + true, true, HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it"); HoodieLogBlock nextBlock = reader.next(); assertEquals(dataBlockType, nextBlock.getBlockType(), "The next block should be a data block"); @@ -486,7 +487,7 @@ public void testBasicAppendAndRead() throws IOException, URISyntaxException, Int writer.appendBlock(dataBlock); writer.close(); - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasNext(), "First block should be available"); HoodieLogBlock nextBlock = reader.next(); HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock; @@ -611,7 +612,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep writer.close(); // First round of reads - we should be able to read the first block and then EOF - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasNext(), "First block should be available"); reader.next(); assertTrue(reader.hasNext(), "We should have corrupted block next"); @@ -649,7 +650,7 @@ public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxExcep writer.close(); // Second round of reads - we should be able to read the first and last block - reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasNext(), "First block should be available"); reader.next(); assertTrue(reader.hasNext(), "We should get the 1st corrupted block next"); @@ -707,7 +708,7 @@ public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxE writer.close(); // Read data and corrupt block - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasNext(), "First block should be available"); reader.next(); assertTrue(reader.hasNext(), "We should have corrupted block next"); @@ -1603,7 +1604,7 @@ public void testBasicAppendAndReadInReverse(boolean readBlocksLazily) HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(), - bufferSize, readBlocksLazily, true); + bufferSize, readBlocksLazily, true, HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasPrev(), "Last block should be available"); HoodieLogBlock prevBlock = reader.prev(); @@ -1681,7 +1682,8 @@ public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily) // First round of reads - we should be able to read the first block and then EOF HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(), - 
fs.getFileStatus(writer.getLogFile().getPath()).getLen()), schema, bufferSize, readBlocksLazily, true); + fs.getFileStatus(writer.getLogFile().getPath()).getLen()), schema, bufferSize, readBlocksLazily, + true, HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasPrev(), "Last block should be available"); HoodieLogBlock block = reader.prev(); @@ -1733,7 +1735,7 @@ public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily) HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(), - bufferSize, readBlocksLazily, true); + bufferSize, readBlocksLazily, true, HoodieRecord.RECORD_KEY_METADATA_FIELD); assertTrue(reader.hasPrev(), "Third block should be available"); reader.moveToPrev(); @@ -1794,9 +1796,9 @@ private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List header) { switch (dataBlockType) { case AVRO_DATA_BLOCK: - return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); + return new HoodieAvroDataBlock(records, header); case HFILE_DATA_BLOCK: - return new HoodieHFileDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); + return new HoodieHFileDataBlock(records, header, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); default: throw new RuntimeException("Unknown data block type " + dataBlockType); } diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java index ec334bde1e437..d834e89fecf35 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java @@ -21,6 +21,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.util.Option; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -41,19 +42,19 @@ public void testGetFileReader() throws IOException { // parquet file format. final Configuration hadoopConf = new Configuration(); final Path parquetPath = new Path("/partition/path/f1_1-0-1_000.parquet"); - HoodieFileReader parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath); + HoodieFileReader parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath, Option.empty()); assertTrue(parquetReader instanceof HoodieParquetReader); // log file format. final Path logPath = new Path("/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> { - HoodieFileReader logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath); + HoodieFileReader logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath, Option.empty()); }, "should fail since log storage reader is not supported yet."); assertTrue(thrown.getMessage().contains("format not supported yet.")); // Orc file format. 
final Path orcPath = new Path("/partition/path/f1_1-0-1_000.orc"); - HoodieFileReader orcReader = HoodieFileReaderFactory.getFileReader(hadoopConf, orcPath); + HoodieFileReader orcReader = HoodieFileReaderFactory.getFileReader(hadoopConf, orcPath, Option.empty()); assertTrue(orcReader instanceof HoodieOrcReader); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java index e3bac0b4e8f9a..2c00cb822a131 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Job; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -131,7 +132,7 @@ private List listStatusForIncrementalMode( @Override public RecordReader getRecordReader(final InputSplit split, final JobConf job, final Reporter reporter) throws IOException { - return new HoodieHFileRecordReader(conf, split, job); + return new HoodieHFileRecordReader(conf, split, job, HoodieRecord.RECORD_KEY_METADATA_FIELD); } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java index 53ccb7413f9b6..bd3bf163bc250 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java @@ -44,10 +44,10 @@ public class HoodieHFileRecordReader implements RecordReader recordIterator; private Schema schema; - public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException { + public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job, String keyField) throws IOException { FileSplit fileSplit = (FileSplit) split; Path path = fileSplit.getPath(); - reader = new HoodieHFileReader(conf, path, new CacheConfig(conf)); + reader = new HoodieHFileReader(conf, path, new CacheConfig(conf), keyField); schema = reader.getSchema(); valueObj = new ArrayWritable(Writable.class, new Writable[schema.getFields().size()]); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 13d921979c70a..9d41e39527032 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -323,8 +323,9 @@ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, Map header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); - HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new HoodieHFileDataBlock(records, header) : - new HoodieAvroDataBlock(records, header); + HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) + ? 
new HoodieHFileDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD) + : new HoodieAvroDataBlock(records, header); writer.appendBlock(dataBlock); return writer; } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index 2648740f54e0f..c5d604d6215dd 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -265,7 +265,8 @@ private Iterator readColumnarOrLogFiles(FileSlice fileSlice) thro // Read the base files using the latest writer schema. Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), - new Path(fileSlice.getBaseFile().get().getPath())); + new Path(fileSlice.getBaseFile().get().getPath()), + Option.of(metaClient.getTableConfig().getRecordKeyFieldProp())); return reader.getRecordIterator(schema); } else { // If there is no data file, fall back to reading log files diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 98b11f2f37cc4..1fa8159a33c06 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -232,7 +232,7 @@ public String getColumnType(ResultSet resultSet) throws SQLException { */ @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private MessageType readSchemaFromLogFile(Option lastCompactionCommitOpt, Path path) throws Exception { - MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path); + MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path, metaClient.getTableConfig().getRecordKeyFieldProp()); // Fall back to read the schema from last compaction if (messageType == null) { LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt); diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip index 299b070bee34a..9611d27690577 100644 Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip index d80439d20d3df..1e498310ff71a 100644 Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ
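
The sketches below sit outside the diff and are illustrative only; they show how a caller might exercise the key-field-aware signatures introduced above. Class names such as `ArchivedLogReadSketch` and the method parameters are hypothetical. This first sketch threads the table's configured record key field into the new `HoodieLogFormat.newReader(fs, logFile, readerSchema, keyField)` overload to scan one archived log file, assuming `metaClient` and `archivedFileStatus` are supplied by the caller.

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ArchivedLogReadSketch {

  // Reads all avro records from one archived log file, passing the table's record key
  // field to the reader instead of assuming the populated HoodieRecord.RECORD_KEY_METADATA_FIELD column.
  public static List<IndexedRecord> readArchivedEntries(HoodieTableMetaClient metaClient,
                                                        FileStatus archivedFileStatus) throws IOException {
    final String keyField = metaClient.getTableConfig().getRecordKeyFieldProp();
    List<IndexedRecord> readRecords = new ArrayList<>();
    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
        new HoodieLogFile(archivedFileStatus.getPath()),
        HoodieArchivedMetaEntry.getClassSchema(), keyField)) {
      while (reader.hasNext()) {
        HoodieLogBlock block = reader.next();
        if (block instanceof HoodieAvroDataBlock) {
          readRecords.addAll(((HoodieAvroDataBlock) block).getRecords());
        }
      }
    }
    return readRecords;
  }
}
```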
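
A similarly hedged sketch of building log data blocks for a table that relies on virtual keys: per the constructors above, `HoodieHFileDataBlock` now always takes the record key field explicitly, while `HoodieAvroDataBlock` keeps its two-argument form. `newBlock` and its parameters are hypothetical.

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;

import java.util.List;
import java.util.Map;

public class DataBlockSketch {

  // Builds the appropriate data block; the HFile block needs the key field so it can
  // sort and index records by key even when meta fields are not written.
  public static HoodieDataBlock newBlock(boolean useHFile,
                                         List<IndexedRecord> records,
                                         Map<HoodieLogBlock.HeaderMetadataType, String> header,
                                         String recordKeyField) {
    return useHFile
        ? new HoodieHFileDataBlock(records, header, recordKeyField)
        : new HoodieAvroDataBlock(records, header);
  }
}
```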
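
The HFile block changes above avoid storing the record key twice: before serializing a record, the key column is blanked, and on read the HFile row key is copied back into that column when it comes back empty. The snippet below is a conceptual restatement of that round trip on a plain `GenericRecord`; it is not the block's actual private helpers.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class KeyMaterializationSketch {

  // Before writing: blank the key column; the key is still stored as the HFile row key.
  static void blankKeyForWrite(GenericRecord record, Schema.Field keyField) {
    record.put(keyField.pos(), "");
  }

  // After reading: if the key column came back empty, restore it from the HFile row key bytes.
  static void materializeKeyOnRead(GenericRecord record, Schema.Field keyField, byte[] rowKeyBytes) {
    Object current = record.get(keyField.pos());
    if (current != null && current.toString().isEmpty()) {
      record.put(keyField.pos(), new String(rowKeyBytes));
    }
  }
}
```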
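
A sketch of point lookups against an HFile base file through the key-aware reader; `hfilePath`, `tableSchema` and `recordKey` are assumed inputs. The reader validates that the key field exists in the reader schema and materializes it from the row key when the stored value is blank.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieHFileReader;

import java.io.IOException;

public class HFileLookupSketch {

  // Opens the HFile with an explicit key field and fetches a single record by key.
  public static Option<GenericRecord> lookup(Configuration conf, Path hfilePath, Schema tableSchema,
                                             String keyField, String recordKey) throws IOException {
    HoodieHFileReader<GenericRecord> reader =
        new HoodieHFileReader<>(conf, hfilePath, new CacheConfig(conf), keyField);
    try {
      return reader.getRecordByKey(recordKey, tableSchema);
    } finally {
      reader.close();
    }
  }
}
```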
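
A sketch of the new factory overload: a caller that knows the table's record key field passes it as an `Option`, while `Option.empty()` keeps the previous behavior and defaults HFile readers to the metadata table's key field (`HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY`). The `openBaseFile` wrapper is hypothetical.

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;

public class FileReaderFactorySketch {

  // For a data table, derive the key field from table config so HFile base files of
  // virtual-key tables are read with their real record key column.
  public static HoodieFileReader<IndexedRecord> openBaseFile(Configuration conf,
                                                             HoodieTableMetaClient metaClient,
                                                             Path baseFilePath) throws IOException {
    return HoodieFileReaderFactory.getFileReader(conf, baseFilePath,
        Option.of(metaClient.getTableConfig().getRecordKeyFieldProp()));
  }
}
```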
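
Finally, a sketch of the key-aware log schema lookup used by the sync path: the record key field is passed through so the resolver can open log files of tables that do not populate meta fields. The wrapper method and its parameters are hypothetical.

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.parquet.schema.MessageType;

public class LogSchemaSketch {

  // Reads the writer schema (as a Parquet MessageType) from a log file, passing the
  // table's record key field so the underlying log reader can be constructed.
  public static MessageType latestLogSchema(FileSystem fs, HoodieTableMetaClient metaClient,
                                            Path logFilePath) throws Exception {
    return TableSchemaResolver.readSchemaFromLogFile(fs, logFilePath,
        metaClient.getTableConfig().getRecordKeyFieldProp());
  }
}
```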