diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 0b6afd4d28b92..38db1cde41226 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -87,7 +87,8 @@ private static HoodieFi BloomFilter filter = createBloomFilter(config); HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(), config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(), - PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); + HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, + filter, HFILE_COMPARATOR); return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java index 7e4c519a8fafc..1079566b782f1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java @@ -43,9 +43,10 @@ public class HoodieHFileConfig { private final Configuration hadoopConf; private final BloomFilter bloomFilter; private final KeyValue.KVComparator hfileComparator; + private final String keyFieldName; public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize, - long maxFileSize, boolean prefetchBlocksOnOpen, boolean cacheDataInL1, + long maxFileSize, String keyFieldName, boolean prefetchBlocksOnOpen, boolean cacheDataInL1, boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) { this.hadoopConf = hadoopConf; this.compressionAlgorithm = compressionAlgorithm; @@ -56,6 +57,7 @@ public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compres this.dropBehindCacheCompaction = dropBehindCacheCompaction; this.bloomFilter = bloomFilter; this.hfileComparator = hfileComparator; + this.keyFieldName = keyFieldName; } public Configuration getHadoopConf() { @@ -97,4 +99,8 @@ public BloomFilter getBloomFilter() { public KeyValue.KVComparator getHfileComparator() { return hfileComparator; } + + public String getKeyFieldName() { + return keyFieldName; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index a719bcb8f334f..2ad6d7f9220b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.io.Writable; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import java.io.DataInput; import java.io.DataOutput; @@ -63,6 +65,8 @@ public class HoodieHFileWriter 
keyFieldSchema; private HFile.Writer writer; private String minRecordKey; private String maxRecordKey; @@ -77,6 +81,8 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf); this.hfileConfig = hfileConfig; + this.schema = schema; + this.keyFieldSchema = Option.ofNullable(schema.getField(hfileConfig.getKeyFieldName())); // TODO - compute this compression ratio dynamically by looking at the bytes written to the // stream and the actual file size reported by HDFS @@ -121,8 +127,25 @@ public boolean canWrite() { } @Override - public void writeAvro(String recordKey, IndexedRecord object) throws IOException { - byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object); + public void writeAvro(String recordKey, IndexedRecord record) throws IOException { + byte[] value = null; + boolean isRecordSerialized = false; + if (keyFieldSchema.isPresent()) { + GenericRecord keyExcludedRecord = (GenericRecord) record; + int keyFieldPos = this.keyFieldSchema.get().pos(); + boolean isKeyAvailable = (record.get(keyFieldPos) != null && !(record.get(keyFieldPos).toString().isEmpty())); + if (isKeyAvailable) { + Object originalKey = keyExcludedRecord.get(keyFieldPos); + keyExcludedRecord.put(keyFieldPos, StringUtils.EMPTY_STRING); + value = HoodieAvroUtils.avroToBytes(keyExcludedRecord); + keyExcludedRecord.put(keyFieldPos, originalKey); + isRecordSerialized = true; + } + } + if (!isRecordSerialized) { + value = HoodieAvroUtils.avroToBytes((GenericRecord) record); + } + KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, value); writer.append(kv); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 32f05cbad870c..c849150fb8dbf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -96,7 +96,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta // Virtual keys support for metadata table. This Field is // from the metadata payload schema. 
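// --- Illustrative sketch, not part of the change above: the key-deduplication round trip that
// --- HoodieHFileWriter.writeAvro performs. The record key is stored once as the HFile row key,
// --- the payload's "key" field is blanked before Avro serialization, and the in-memory record is
// --- restored afterwards so callers are unaffected. The schema, class and method names below are
// --- hypothetical and exist only for this standalone example.
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class KeyBlankingSketch {
  // Hypothetical record schema: a "key" field plus one payload field.
  private static final Schema SCHEMA = SchemaBuilder.record("MetadataEntry").fields()
      .requiredString("key")
      .requiredString("payload")
      .endRecord();

  // Serialize the record with the key field blanked, restoring the original value afterwards.
  static byte[] serializeWithoutKey(GenericRecord record, String keyFieldName) throws IOException {
    int keyPos = record.getSchema().getField(keyFieldName).pos();
    Object originalKey = record.get(keyPos);
    record.put(keyPos, "");                        // drop the duplicated key from the payload bytes
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
      new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
      encoder.flush();
      return out.toByteArray();                    // key travels only as the HFile row key
    } finally {
      record.put(keyPos, originalKey);             // leave the in-memory record untouched for callers
    }
  }

  public static void main(String[] args) throws IOException {
    GenericRecord r = new GenericData.Record(SCHEMA);
    r.put("key", "2020/01/01");
    r.put("payload", "files-in-partition");
    byte[] value = serializeWithoutKey(r, "key");
    System.out.println(value.length + " bytes written, key restored to: " + r.get("key"));
  }
}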
- private static final String RECORD_KEY_FIELD = HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY; + private static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME; protected HoodieWriteConfig metadataWriteConfig; protected HoodieWriteConfig dataWriteConfig; @@ -217,8 +217,8 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi // RecordKey properties are needed for the metadata table records final Properties properties = new Properties(); - properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD); - properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD); + properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD_NAME); + properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD_NAME); builder.withProperties(properties); if (writeConfig.isMetricsOn()) { @@ -454,7 +454,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(HoodieMetadataPayload.class.getName()) .setBaseFileFormat(HoodieFileFormat.HFILE.toString()) - .setRecordKeyFields(RECORD_KEY_FIELD) + .setRecordKeyFields(RECORD_KEY_FIELD_NAME) .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields()) .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java index 4ec143bf06789..332be73b14f57 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java @@ -42,7 +42,7 @@ public HoodieTableMetadataKeyGenerator(TypedProperties config) { @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY, isConsistentLogicalTimestampEnabled()); + return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.KEY_FIELD_NAME, isConsistentLogicalTimestampEnabled()); } @Override diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java index 86a0886de664d..190ebcbdbce16 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -103,7 +103,7 @@ private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateM String instantTime = "000"; HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024, - PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); + HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR); return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields); } diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index e3db3914ada77..014cc63546434 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,6 +18,8 @@ package org.apache.hudi.client.functional; +import org.apache.avro.Schema; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -31,20 +33,24 @@ import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; -import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -59,24 +65,22 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.config.metrics.HoodieMetricsConfig; -import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; -import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader; import org.apache.hudi.metadata.HoodieMetadataMetrics; import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadata; -import 
org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; @@ -96,6 +100,8 @@ import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; @@ -135,8 +141,8 @@ import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; -import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -374,7 +380,6 @@ public void testMetadataTableServices() throws Exception { assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001"); } - /** * Tests that virtual key configs are honored in base files after compaction in metadata table. * @@ -508,6 +513,255 @@ public void testMetadataTableWithPendingCompaction(boolean simulateFailedCompact } } + /** + * Test arguments - Table type, populate meta fields, exclude key from payload. + */ + public static List testMetadataRecordKeyExcludeFromPayloadArgs() { + return asList( + Arguments.of(COPY_ON_WRITE, true), + Arguments.of(COPY_ON_WRITE, false), + Arguments.of(MERGE_ON_READ, true), + Arguments.of(MERGE_ON_READ, false) + ); + } + + /** + * 1. Verify metadata table records key deduplication feature. When record key + * deduplication is enabled, verify the metadata record payload on disk has empty key. + * Otherwise, verify the valid key. + * 2. Verify populate meta fields work irrespective of record key deduplication config. + * 3. Verify table services like compaction benefit from record key deduplication feature. + */ + @ParameterizedTest + @MethodSource("testMetadataRecordKeyExcludeFromPayloadArgs") + public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType, final boolean enableMetaFields) throws Exception { + initPath(); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withPopulateMetaFields(enableMetaFields) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()) + .build(); + init(tableType, writeConfig); + + // 2nd commit + doWriteOperation(testTable, "0000001", INSERT); + + final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder() + .setConf(hadoopConf) + .setBasePath(metadataTableBasePath) + .build(); + HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); + metadataMetaClient.reloadActiveTimeline(); + final HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient); + + // Compaction has not yet kicked in. Verify all the log files + // for the metadata records persisted on disk as per the config. 
+ assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000001", + enableMetaFields); + }, "Metadata table should have valid log files!"); + + // Verify no base file created yet. + assertThrows(IllegalStateException.class, () -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should not have a base file yet!"); + + // 2 more commits + doWriteOperation(testTable, "0000002", UPSERT); + doWriteOperation(testTable, "0000004", UPSERT); + + // Compaction should be triggered by now. Let's verify the log files + // if any for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000002", + enableMetaFields); + }, "Metadata table should have valid log files!"); + + // Verify the base file created by the just completed compaction. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should have a valid base file!"); + + // 2 more commits to trigger one more compaction, along with a clean + doWriteOperation(testTable, "0000005", UPSERT); + doClean(testTable, "0000006", Arrays.asList("0000004")); + doWriteOperation(testTable, "0000007", UPSERT); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "7", enableMetaFields); + }, "Metadata table should have valid log files!"); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table, enableMetaFields); + }, "Metadata table should have a valid base file!"); + + validateMetadata(testTable); + } + + /** + * Verify the metadata table log files for the record field correctness. On disk format + * should be based on meta fields and key deduplication config. And the in-memory merged + * records should all be materialized fully irrespective of the config. + * + * @param table - Hoodie metadata test table + * @param metadataMetaClient - Metadata meta client + * @param latestCommitTimestamp - Latest commit timestamp + * @param enableMetaFields - Enable meta fields for the table records + * @throws IOException + */ + private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table, HoodieTableMetaClient metadataMetaClient, + String latestCommitTimestamp, + boolean enableMetaFields) throws IOException { + table.getHoodieView().sync(); + + // Compaction should not be triggered yet. Let's verify no base file + // and few log files available. + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (fileSlices.isEmpty()) { + throw new IllegalStateException("LogFile slices are not available!"); + } + + // Verify the log files honor the key deduplication and virtual keys config + List logFiles = fileSlices.get(0).getLogFiles().map(logFile -> { + return logFile; + }).collect(Collectors.toList()); + + List logFilePaths = logFiles.stream().map(logFile -> { + return logFile.getPath().toString(); + }).collect(Collectors.toList()); + + // Verify the on-disk raw records before they get materialized + verifyMetadataRawRecords(table, logFiles, enableMetaFields); + + // Verify the in-memory materialized and merged records + verifyMetadataMergedRecords(metadataMetaClient, logFilePaths, latestCommitTimestamp, enableMetaFields); + } + + /** + * Verify the metadata table on-disk raw records. 
When populate meta fields is enabled, + * these records should have additional meta fields in the payload. When key deduplication + * is enabled, these records on the disk should have key in the payload as empty string. + * + * @param table + * @param logFiles - Metadata table log files to be verified + * @param enableMetaFields - Enable meta fields for records + * @throws IOException + */ + private void verifyMetadataRawRecords(HoodieTable table, List logFiles, boolean enableMetaFields) throws IOException { + for (HoodieLogFile logFile : logFiles) { + FileStatus[] fsStatus = fs.listStatus(logFile.getPath()); + MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath()); + if (writerSchemaMsg == null) { + // not a data block + continue; + } + + Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg); + HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); + + while (logFileReader.hasNext()) { + HoodieLogBlock logBlock = logFileReader.next(); + if (logBlock instanceof HoodieDataBlock) { + for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { + final GenericRecord record = (GenericRecord) indexRecord; + if (enableMetaFields) { + // Metadata table records should have meta fields! + assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } else { + // Metadata table records should not have meta fields! + assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } + + final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); + assertFalse(key.isEmpty()); + if (enableMetaFields) { + assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); + } + } + } + } + } + } + + /** + * Verify the metadata table in-memory merged records. Irrespective of key deduplication + * config, the in-memory merged records should always have the key field in the record + * payload fully materialized. 
+ * + * @param metadataMetaClient - Metadata table meta client + * @param logFilePaths - Metadata table log file paths + * @param latestCommitTimestamp + * @param enableMetaFields - Enable meta fields + */ + private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List logFilePaths, + String latestCommitTimestamp, boolean enableMetaFields) { + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + if (enableMetaFields) { + schema = HoodieAvroUtils.addMetadataFields(schema); + } + HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder() + .withFileSystem(metadataMetaClient.getFs()) + .withBasePath(metadataMetaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withLatestInstantTime(latestCommitTimestamp) + .withPartition(MetadataPartitionType.FILES.partitionPath()) + .withReaderSchema(schema) + .withMaxMemorySizeInBytes(100000L) + .withBufferSize(4096) + .withSpillableMapBasePath(tempDir.toString()) + .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK) + .build(); + + assertDoesNotThrow(() -> { + logRecordReader.scan(); + }, "Metadata log records materialization failed"); + + for (Map.Entry> entry : logRecordReader.getRecords().entrySet()) { + assertFalse(entry.getKey().isEmpty()); + assertFalse(entry.getValue().getRecordKey().isEmpty()); + assertEquals(entry.getKey(), entry.getValue().getRecordKey()); + } + } + + /** + * Verify metadata table base files for the records persisted based on the config. When + * the key deduplication is enabled, the records persisted on the disk in the base file + * should have key field in the payload as empty string. + * + * @param table - Metadata table + * @param enableMetaFields - Enable meta fields + */ + private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table, boolean enableMetaFields) throws IOException { + table.getHoodieView().sync(); + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (!fileSlices.get(0).getBaseFile().isPresent()) { + throw new IllegalStateException("Base file not available!"); + } + final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); + + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), + new Path(baseFile.getPath()), + new CacheConfig(context.getHadoopConf().get())); + List> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> { + if (enableMetaFields) { + assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } else { + assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + } + + final String keyInPayload = (String) ((GenericRecord) entry.getSecond()) + .get(HoodieMetadataPayload.KEY_FIELD_NAME); + assertFalse(keyInPayload.isEmpty()); + }); + } + /** * Test rollback of various table operations sync to Metadata Table correctly. */ @@ -1491,95 +1745,6 @@ public void testMetadataMetrics() throws Exception { } } - /** - * Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and the method is not public in source code. so, for now, - * using this method which mimics source code. 
- * @param writeConfig - * @return - */ - private HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) { - int parallelism = writeConfig.getMetadataInsertParallelism(); - - int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep()); - int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep()); - - // Create the write config for the metadata table by borrowing options from the main write config. - HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() - .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) - .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() - .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled()) - .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs()) - .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs()) - .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks()) - .build()) - .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build()) - .withAutoCommit(true) - .withAvroSchemaValidate(true) - .withEmbeddedTimelineServerEnabled(false) - .withMarkersType(MarkerType.DIRECT.name()) - .withRollbackUsingMarkers(false) - .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath())) - .withSchema(HoodieMetadataRecord.getClassSchema().toString()) - .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withAsyncClean(writeConfig.isMetadataAsyncClean()) - // we will trigger cleaning manually, to control the instant times - .withAutoClean(false) - .withCleanerParallelism(parallelism) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) - .retainCommits(writeConfig.getMetadataCleanerCommitsRetained()) - .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) - // we will trigger compaction manually, to control the instant times - .withInlineCompaction(false) - .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) - .withParallelism(parallelism, parallelism) - .withDeleteParallelism(parallelism) - .withRollbackParallelism(parallelism) - .withFinalizeWriteParallelism(parallelism) - .withAllowMultiWriteOnSameInstant(true) - .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) - .withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields()); - - // RecordKey properties are needed for the metadata table records - final Properties properties = new Properties(); - properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); - properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); - builder.withProperties(properties); - - if (writeConfig.isMetricsOn()) { - builder.withMetricsConfig(HoodieMetricsConfig.newBuilder() - .withReporterType(writeConfig.getMetricsReporterType().toString()) - .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled()) - .on(true).build()); - switch (writeConfig.getMetricsReporterType()) { - case GRAPHITE: - 
builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() - .onGraphitePort(writeConfig.getGraphiteServerPort()) - .toGraphiteHost(writeConfig.getGraphiteServerHost()) - .usePrefix(writeConfig.getGraphiteMetricPrefix()).build()); - break; - case JMX: - builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder() - .onJmxPort(writeConfig.getJmxPort()) - .toJmxHost(writeConfig.getJmxHost()) - .build()); - break; - case DATADOG: - case PROMETHEUS: - case PROMETHEUS_PUSHGATEWAY: - case CONSOLE: - case INMEMORY: - case CLOUDWATCH: - break; - default: - throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType()); - } - } - return builder.build(); - } - private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception { doPreBootstrapOperations(testTable, "0000001", "0000002"); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index d6f151e34255a..1abe15bd008d8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -18,30 +18,63 @@ package org.apache.hudi.client.functional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; +import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.fs.FileStatus; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import 
java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.common.model.WriteOperationType.INSERT; +import static org.apache.hudi.common.model.WriteOperationType.UPSERT; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -123,4 +156,216 @@ public void testNotExistPartition(final HoodieTableType tableType) throws Except tableMetadata.getAllFilesInPartition(new Path(writeConfig.getBasePath() + "dummy")); assertEquals(allFilesInPartition.length, 0); } + + /** + * 1. Verify metadata table records key deduplication feature. When record key + * deduplication is enabled, verify the metadata record payload on disk has empty key. + * Otherwise, verify the valid key. + * 2. Verify populate meta fields work irrespective of record key deduplication config. + * 3. Verify table services like compaction benefit from record key deduplication feature. + */ + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataRecordKeyExcludeFromPayload(final HoodieTableType tableType) throws Exception { + initPath(); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withPopulateMetaFields(false) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()) + .build(); + init(tableType, writeConfig); + + // 2nd commit + doWriteOperation(testTable, "0000001", INSERT); + + final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder() + .setConf(hadoopConf) + .setBasePath(metadataTableBasePath) + .build(); + HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); + metadataMetaClient.reloadActiveTimeline(); + final HoodieTable table = HoodieSparkTable.create(metadataTableWriteConfig, context, metadataMetaClient); + + // Compaction has not yet kicked in. Verify all the log files + // for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000001"); + }, "Metadata table should have valid log files!"); + + // Verify no base file created yet. + assertThrows(IllegalStateException.class, () -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table); + }, "Metadata table should not have a base file yet!"); + + // 2 more commits + doWriteOperation(testTable, "0000002", UPSERT); + doWriteOperation(testTable, "0000004", UPSERT); + + // Compaction should be triggered by now. Let's verify the log files + // if any for the metadata records persisted on disk as per the config. + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "0000002"); + }, "Metadata table should have valid log files!"); + + // Verify the base file created by the just completed compaction. 
+ assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table); + }, "Metadata table should have a valid base file!"); + + // 2 more commits to trigger one more compaction, along with a clean + doWriteOperation(testTable, "0000005", UPSERT); + doClean(testTable, "0000006", Arrays.asList("0000004")); + doWriteOperation(testTable, "0000007", UPSERT); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadLogFiles(table, metadataMetaClient, "7"); + }, "Metadata table should have valid log files!"); + + assertDoesNotThrow(() -> { + verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(table); + }, "Metadata table should have a valid base file!"); + + validateMetadata(testTable); + } + + /** + * Verify the metadata table log files for the record field correctness. On disk format + * should be based on meta fields and key deduplication config. And the in-memory merged + * records should all be materialized fully irrespective of the config. + * + * @param table - Hoodie metadata test table + * @param metadataMetaClient - Metadata meta client + * @param latestCommitTimestamp - Latest commit timestamp + * @throws IOException + */ + private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable table, HoodieTableMetaClient metadataMetaClient, + String latestCommitTimestamp) throws IOException { + table.getHoodieView().sync(); + + // Compaction should not be triggered yet. Let's verify no base file + // and few log files available. + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (fileSlices.isEmpty()) { + throw new IllegalStateException("LogFile slices are not available!"); + } + + // Verify the log files honor the key deduplication and virtual keys config + List logFiles = fileSlices.get(0).getLogFiles().map(logFile -> { + return logFile; + }).collect(Collectors.toList()); + + List logFilePaths = logFiles.stream().map(logFile -> { + return logFile.getPath().toString(); + }).collect(Collectors.toList()); + + // Verify the on-disk raw records before they get materialized + verifyMetadataRawRecords(table, logFiles); + + // Verify the in-memory materialized and merged records + verifyMetadataMergedRecords(metadataMetaClient, logFilePaths, latestCommitTimestamp); + } + + /** + * Verify the metadata table on-disk raw records. When populate meta fields is enabled, + * these records should have additional meta fields in the payload. When key deduplication + * is enabled, these records on the disk should have key in the payload as empty string. 
+ * + * @param table + * @param logFiles - Metadata table log files to be verified + * @throws IOException + */ + private void verifyMetadataRawRecords(HoodieTable table, List logFiles) throws IOException { + for (HoodieLogFile logFile : logFiles) { + FileStatus[] fsStatus = fs.listStatus(logFile.getPath()); + MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath()); + if (writerSchemaMsg == null) { + // not a data block + continue; + } + + Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg); + HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); + + while (logFileReader.hasNext()) { + HoodieLogBlock logBlock = logFileReader.next(); + if (logBlock instanceof HoodieDataBlock) { + for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { + final GenericRecord record = (GenericRecord) indexRecord; + // Metadata table records should not have meta fields! + assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + + final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); + assertFalse(key.isEmpty()); + } + } + } + } + } + + /** + * Verify the metadata table in-memory merged records. Irrespective of key deduplication + * config, the in-memory merged records should always have the key field in the record + * payload fully materialized. + * + * @param metadataMetaClient - Metadata table meta client + * @param logFilePaths - Metadata table log file paths + * @param latestCommitTimestamp - Latest commit timestamp + */ + private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List logFilePaths, String latestCommitTimestamp) { + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder() + .withFileSystem(metadataMetaClient.getFs()) + .withBasePath(metadataMetaClient.getBasePath()) + .withLogFilePaths(logFilePaths) + .withLatestInstantTime(latestCommitTimestamp) + .withPartition(MetadataPartitionType.FILES.partitionPath()) + .withReaderSchema(schema) + .withMaxMemorySizeInBytes(100000L) + .withBufferSize(4096) + .withSpillableMapBasePath(tempDir.toString()) + .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK) + .build(); + + assertDoesNotThrow(() -> { + logRecordReader.scan(); + }, "Metadata log records materialization failed"); + + for (Map.Entry> entry : logRecordReader.getRecords().entrySet()) { + assertFalse(entry.getKey().isEmpty()); + assertFalse(entry.getValue().getRecordKey().isEmpty()); + assertEquals(entry.getKey(), entry.getValue().getRecordKey()); + } + } + + /** + * Verify metadata table base files for the records persisted based on the config. When + * the key deduplication is enabled, the records persisted on the disk in the base file + * should have key field in the payload as empty string. 
+ * + * @param table - Metadata table + */ + private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable table) throws IOException { + table.getHoodieView().sync(); + List fileSlices = table.getSliceView() + .getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); + if (!fileSlices.get(0).getBaseFile().isPresent()) { + throw new IllegalStateException("Base file not available!"); + } + final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get(); + + HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), + new Path(baseFile.getPath()), + new CacheConfig(context.getHadoopConf().get())); + List> records = hoodieHFileReader.readAllRecords(); + records.forEach(entry -> { + assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + final String keyInPayload = (String) ((GenericRecord) entry.getSecond()) + .get(HoodieMetadataPayload.KEY_FIELD_NAME); + assertFalse(keyInPayload.isEmpty()); + }); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 56c9f016bcc6e..f419858558690 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -18,12 +18,19 @@ package org.apache.hudi.client.functional; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; @@ -33,17 +40,19 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; +import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; +import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.testutils.HoodieClientTestHarness; - -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import 
org.apache.log4j.Logger; import org.junit.jupiter.api.AfterEach; @@ -59,6 +68,7 @@ import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; public class TestHoodieMetadataBase extends HoodieClientTestHarness { @@ -94,6 +104,20 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable); } + public void init(HoodieTableType tableType, HoodieWriteConfig writeConfig) throws IOException { + this.tableType = tableType; + initPath(); + initSparkContexts("TestHoodieMetadata"); + initFileSystem(); + fs.mkdirs(new Path(basePath)); + initTimelineService(); + initMetaClient(tableType); + initTestDataGenerator(); + metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); + this.writeConfig = writeConfig; + initWriteConfigAndMetatableWriter(writeConfig, writeConfig.isMetadataTableEnabled()); + } + protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) { this.writeConfig = writeConfig; if (enableMetadataTable) { @@ -327,4 +351,91 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea .withProperties(properties); } + /** + * Fetching WriteConfig for metadata table from Data table's writeConfig is not trivial and + * the method is not public in source code. so, for now, using this method which mimics source code. + */ + protected HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig writeConfig) { + int parallelism = writeConfig.getMetadataInsertParallelism(); + + int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep()); + int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep()); + + // Create the write config for the metadata table by borrowing options from the main write config. 
+ HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() + .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled()) + .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs()) + .withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs()) + .withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks()) + .build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build()) + .withAutoCommit(true) + .withAvroSchemaValidate(true) + .withEmbeddedTimelineServerEnabled(false) + .withMarkersType(MarkerType.DIRECT.name()) + .withRollbackUsingMarkers(false) + .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath())) + .withSchema(HoodieMetadataRecord.getClassSchema().toString()) + .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withAsyncClean(writeConfig.isMetadataAsyncClean()) + // we will trigger cleaning manually, to control the instant times + .withAutoClean(false) + .withCleanerParallelism(parallelism) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .retainCommits(writeConfig.getMetadataCleanerCommitsRetained()) + .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) + // we will trigger compaction manually, to control the instant times + .withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) + .withParallelism(parallelism, parallelism) + .withDeleteParallelism(parallelism) + .withRollbackParallelism(parallelism) + .withFinalizeWriteParallelism(parallelism) + .withAllowMultiWriteOnSameInstant(true) + .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) + .withPopulateMetaFields(writeConfig.getMetadataConfig().populateMetaFields()); + + // RecordKey properties are needed for the metadata table records + final Properties properties = new Properties(); + properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), HoodieMetadataPayload.KEY_FIELD_NAME); + properties.put("hoodie.datasource.write.recordkey.field", HoodieMetadataPayload.KEY_FIELD_NAME); + builder.withProperties(properties); + + if (writeConfig.isMetricsOn()) { + builder.withMetricsConfig(HoodieMetricsConfig.newBuilder() + .withReporterType(writeConfig.getMetricsReporterType().toString()) + .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled()) + .on(true).build()); + switch (writeConfig.getMetricsReporterType()) { + case GRAPHITE: + builder.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() + .onGraphitePort(writeConfig.getGraphiteServerPort()) + .toGraphiteHost(writeConfig.getGraphiteServerHost()) + .usePrefix(writeConfig.getGraphiteMetricPrefix()).build()); + break; + case JMX: + builder.withMetricsJmxConfig(HoodieMetricsJmxConfig.newBuilder() + .onJmxPort(writeConfig.getJmxPort()) + .toJmxHost(writeConfig.getJmxHost()) + .build()); + break; + case DATADOG: + case PROMETHEUS: + case PROMETHEUS_PUSHGATEWAY: + case CONSOLE: + case INMEMORY: + case CLOUDWATCH: 
+ break; + default: + throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType()); + } + } + return builder.build(); + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 51791c945d589..21ba5f4dd2083 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -126,7 +126,7 @@ public final class HoodieMetadataConfig extends HoodieConfig { public static final ConfigProperty POPULATE_META_FIELDS = ConfigProperty .key(METADATA_PREFIX + ".populate.meta.fields") - .defaultValue(true) + .defaultValue(false) .sinceVersion("0.10.0") .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated."); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 7f1fa2aa1d64a..02b500458aeae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.inline.InLineFSUtils; import org.apache.hudi.common.fs.inline.InLineFileSystem; import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieHBaseKVComparator; @@ -83,10 +83,6 @@ public HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map(), keyField); } - public HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map header) { - this(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); - } - @Override public HoodieLogBlockType getBlockType() { return HoodieLogBlockType.HFILE_DATA_BLOCK; @@ -110,8 +106,8 @@ protected byte[] serializeRecords() throws IOException { boolean useIntegerKey = false; int key = 0; int keySize = 0; - Field keyField = records.get(0).getSchema().getField(this.keyField); - if (keyField == null) { + final Field keyFieldSchema = records.get(0).getSchema().getField(HoodieHFileReader.KEY_FIELD_NAME); + if (keyFieldSchema == null) { // Missing key metadata field so we should use an integer sequence key useIntegerKey = true; keySize = (int) Math.ceil(Math.log(records.size())) + 1; @@ -122,9 +118,9 @@ protected byte[] serializeRecords() throws IOException { if (useIntegerKey) { recordKey = String.format("%" + keySize + "s", key++); } else { - recordKey = record.get(keyField.pos()).toString(); + recordKey = record.get(keyFieldSchema.pos()).toString(); } - byte[] recordBytes = HoodieAvroUtils.indexedRecordToBytes(record); + final byte[] recordBytes = serializeRecord(record, Option.ofNullable(keyFieldSchema)); ValidationUtils.checkState(!sortedRecordsMap.containsKey(recordKey), "Writing multiple records with same key not supported for " + this.getClass().getName()); sortedRecordsMap.put(recordKey, recordBytes); @@ -162,6 +158,20 @@ public List getRecords(List keys) throws IOException { return records; } + /** + * Serialize the record to byte buffer. 
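// --- Illustrative sketch, not part of the change above: the sorting/serialization step that
// --- HoodieHFileDataBlock.serializeRecords follows. HFile entries must be written in sorted key
// --- order, so records are collected into a sorted map keyed either by the value of the "key"
// --- field (which is then blanked in the payload and not restored on this path) or, when the
// --- schema has no such field, by a fixed-width sequence number. The key-width computation and
// --- helper names are assumptions made for this standalone example.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.TreeMap;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

final class SortedDataBlockSketch {
  static TreeMap<String, byte[]> toSortedEntries(List<GenericRecord> records, String keyFieldName) throws IOException {
    TreeMap<String, byte[]> sorted = new TreeMap<>();
    if (records.isEmpty()) {
      return sorted;
    }
    Schema.Field keyField = records.get(0).getSchema().getField(keyFieldName);
    int width = String.valueOf(records.size()).length();       // enough digits for the largest sequence number
    int seq = 0;
    for (GenericRecord record : records) {
      String entryKey;
      if (keyField == null) {
        entryKey = String.format("%" + width + "s", seq++);     // fixed-width fallback key
      } else {
        entryKey = String.valueOf(record.get(keyField.pos()));
        record.put(keyField.pos(), "");                         // key is carried only by the entry key
      }
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
      new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
      encoder.flush();
      sorted.put(entryKey, out.toByteArray());
    }
    return sorted;
  }
}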
+ * + * @param record - Record to serialize + * @param keyField - Key field in the schema + * @return Serialized byte buffer for the record + */ + private byte[] serializeRecord(final IndexedRecord record, final Option keyField) { + if (keyField.isPresent()) { + record.put(keyField.get().pos(), StringUtils.EMPTY_STRING); + } + return HoodieAvroUtils.indexedRecordToBytes(record); + } + private void readWithInlineFS(List keys) throws IOException { boolean enableFullScan = keys.isEmpty(); // Get schema from the header diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index e3e38eca86ca9..f4058911e4aa6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -50,6 +50,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -63,6 +64,7 @@ public class HoodieHFileReader implements HoodieFileRea // key retrieval. private HFileScanner keyScanner; + public static final String KEY_FIELD_NAME = "key"; public static final String KEY_SCHEMA = "schema"; public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter"; public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode"; @@ -151,15 +153,15 @@ public Set filterRowKeys(Set candidateRowKeys) { } public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { + final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); List> recordList = new LinkedList<>(); try { final HFileScanner scanner = reader.getScanner(false, false); if (scanner.seekTo()) { do { Cell c = scanner.getKeyValue(); - byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength()); - R record = getRecordFromCell(c, writerSchema, readerSchema); - recordList.add(new Pair<>(new String(keyBytes), record)); + final Pair keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema); + recordList.add(keyAndRecordPair); } while (scanner.next()); } @@ -196,6 +198,9 @@ public List> readRecords(List keys, Schema schema) throw @Override public Iterator getRecordIterator(Schema readerSchema) throws IOException { final HFileScanner scanner = reader.getScanner(false, false); + final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); + ValidationUtils.checkState(keyFieldSchema != null, + "Missing key field '" + KEY_FIELD_NAME + "' in the schema!"); return new Iterator() { private R next = null; private boolean eof = false; @@ -206,7 +211,8 @@ public boolean hasNext() { // To handle when hasNext() is called multiple times for idempotency and/or the first time if (this.next == null && !this.eof) { if (!scanner.isSeeked() && scanner.seekTo()) { - this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + final Pair keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema); + this.next = keyAndRecordPair.getSecond(); } } return this.next != null; @@ -226,7 +232,8 @@ public R next() { } R retVal = this.next; if (scanner.next()) { - this.next = getRecordFromCell(scanner.getKeyValue(), 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
index e3e38eca86ca9..f4058911e4aa6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
@@ -50,6 +50,7 @@
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -63,6 +64,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileReader<R> {
   // key retrieval.
   private HFileScanner keyScanner;
 
+  public static final String KEY_FIELD_NAME = "key";
   public static final String KEY_SCHEMA = "schema";
   public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
   public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
@@ -151,15 +153,15 @@ public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
   }
 
   public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException {
+    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
     List<Pair<String, R>> recordList = new LinkedList<>();
     try {
       final HFileScanner scanner = reader.getScanner(false, false);
       if (scanner.seekTo()) {
         do {
           Cell c = scanner.getKeyValue();
-          byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength());
-          R record = getRecordFromCell(c, writerSchema, readerSchema);
-          recordList.add(new Pair<>(new String(keyBytes), record));
+          final Pair<String, R> keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema);
+          recordList.add(keyAndRecordPair);
         } while (scanner.next());
       }
 
@@ -196,6 +198,9 @@ public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
   @Override
   public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException {
     final HFileScanner scanner = reader.getScanner(false, false);
+    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
+    ValidationUtils.checkState(keyFieldSchema != null,
+        "Missing key field '" + KEY_FIELD_NAME + "' in the schema!");
     return new Iterator<R>() {
       private R next = null;
       private boolean eof = false;
@@ -206,7 +211,8 @@ public boolean hasNext() {
         // To handle when hasNext() is called multiple times for idempotency and/or the first time
         if (this.next == null && !this.eof) {
           if (!scanner.isSeeked() && scanner.seekTo()) {
-            this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
+            final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema);
+            this.next = keyAndRecordPair.getSecond();
           }
         }
         return this.next != null;
@@ -226,7 +232,8 @@ public R next() {
         }
         R retVal = this.next;
         if (scanner.next()) {
-          this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
+          final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema, keyFieldSchema);
+          this.next = keyAndRecordPair.getSecond();
         } else {
           this.next = null;
           this.eof = true;
@@ -242,6 +249,8 @@ public R next() {
   @Override
   public Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
     byte[] value = null;
+    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
+    ValidationUtils.checkState(keyFieldSchema != null);
     KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
 
     synchronized (this) {
@@ -257,16 +266,51 @@ public Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
     }
 
     if (value != null) {
-      R record = (R)HoodieAvroUtils.bytesToAvro(value, getSchema(), readerSchema);
+      R record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keyFieldSchema);
       return Option.of(record);
     }
 
     return Option.empty();
   }
 
-  private R getRecordFromCell(Cell c, Schema writerSchema, Schema readerSchema) throws IOException {
-    byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
-    return (R)HoodieAvroUtils.bytesToAvro(value, writerSchema, readerSchema);
+  private Pair<String, R> getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
+    final byte[] keyBytes = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
+    final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
+    R record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keyFieldSchema);
+    return new Pair<>(new String(keyBytes), record);
+  }
+
+  /**
+   * Deserialize the record byte array contents to record object.
+   *
+   * @param keyBytes       - Record key as byte array
+   * @param valueBytes     - Record content as byte array
+   * @param writerSchema   - Writer schema
+   * @param readerSchema   - Reader schema
+   * @param keyFieldSchema - Key field id in the schema
+   * @return Deserialized record object
+   */
+  private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema,
+                        Option<Schema.Field> keyFieldSchema) throws IOException {
+    R record = (R) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema);
+    materializeRecordIfNeeded(keyBytes, record, keyFieldSchema);
+    return record;
+  }
+
+  /**
+   * Materialize the record for any missing fields, if needed.
+   *
+   * @param keyBytes       - Key byte array
+   * @param record         - Record object to materialize
+   * @param keyFieldSchema - Key field id in the schema
+   */
+  private void materializeRecordIfNeeded(final byte[] keyBytes, R record, Option<Schema.Field> keyFieldSchema) {
+    if (keyFieldSchema.isPresent()) {
+      final Object keyObject = record.get(keyFieldSchema.get().pos());
+      if (keyObject != null && keyObject.toString().isEmpty()) {
+        record.put(keyFieldSchema.get().pos(), new String(keyBytes));
+      }
+    }
   }
 
   @Override
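On the read side, the reader changes above decode the Avro value bytes and then repopulate an empty key field from the HFile row key (deserialize(...) together with materializeRecordIfNeeded(...)). A minimal round-trip sketch of that behavior follows; the toy schema and class name are assumptions, and only the Avro calls are real APIs.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class KeyMaterializationSketch {

  // Decode the value bytes, then restore the key from the row key if the payload carries
  // only the empty placeholder written by the key-eliding writer.
  static GenericRecord readWithKey(byte[] rowKeyBytes, byte[] valueBytes, Schema schema) throws IOException {
    GenericRecord record = new GenericDatumReader<GenericRecord>(schema)
        .read(null, DecoderFactory.get().binaryDecoder(valueBytes, null));
    Schema.Field keyField = schema.getField("key");
    if (keyField != null && record.get(keyField.pos()) != null
        && record.get(keyField.pos()).toString().isEmpty()) {
      record.put(keyField.pos(), new String(rowKeyBytes, StandardCharsets.UTF_8));
    }
    return record;
  }

  public static void main(String[] args) throws IOException {
    Schema schema = SchemaBuilder.record("MetadataRecord").fields()
        .requiredString("key").requiredString("type").endRecord();

    // Simulate what the writer stores: the row key carries the key, the payload stores an empty key.
    GenericRecord stored = new GenericData.Record(schema);
    stored.put("key", "");
    stored.put("type", "1");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(stored, encoder);
    encoder.flush();

    GenericRecord materialized = readWithKey("2020/01/01".getBytes(StandardCharsets.UTF_8), out.toByteArray(), schema);
    System.out.println(materialized);   // the key field reads "2020/01/01" again
  }
}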
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index 01c8d05e9b220..c03bf40c443f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -139,7 +139,7 @@ public synchronized List
 
   @Override
   protected String getKeyField() {
-    return HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY;
+    return HoodieMetadataPayload.KEY_FIELD_NAME;
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 0b0d144a6e7e9..a80f33b2d756b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.io.storage.HoodieHFileReader;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -63,9 +64,9 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadataPayload> {
 
   // HoodieMetadata schema field ids
-  public static final String SCHEMA_FIELD_ID_KEY = "key";
-  public static final String SCHEMA_FIELD_ID_TYPE = "type";
-  public static final String SCHEMA_FIELD_ID_METADATA = "filesystemMetadata";
+  public static final String KEY_FIELD_NAME = HoodieHFileReader.KEY_FIELD_NAME;
+  public static final String SCHEMA_FIELD_NAME_TYPE = "type";
+  public static final String SCHEMA_FIELD_NAME_METADATA = "filesystemMetadata";
 
   // Type of the record
   // This can be an enum in the schema but Avro 1.8 has a bug - https://issues.apache.org/jira/browse/AVRO-1810
@@ -84,9 +85,9 @@ public HoodieMetadataPayload(Option<GenericRecord> record) {
     if (record.isPresent()) {
       // This can be simplified using SpecificData.deepcopy once this bug is fixed
       // https://issues.apache.org/jira/browse/AVRO-1811
-      key = record.get().get(SCHEMA_FIELD_ID_KEY).toString();
-      type = (int) record.get().get(SCHEMA_FIELD_ID_TYPE);
-      if (record.get().get(SCHEMA_FIELD_ID_METADATA) != null) {
+      key = record.get().get(KEY_FIELD_NAME).toString();
+      type = (int) record.get().get(SCHEMA_FIELD_NAME_TYPE);
+      if (record.get().get(SCHEMA_FIELD_NAME_METADATA) != null) {
         filesystemMetadata = (Map<String, HoodieMetadataFileInfo>) record.get().get("filesystemMetadata");
         filesystemMetadata.keySet().forEach(k -> {
           GenericRecord v = filesystemMetadata.get(k);
@@ -237,8 +238,8 @@ private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {");
-    sb.append(SCHEMA_FIELD_ID_KEY + "=").append(key).append(", ");
-    sb.append(SCHEMA_FIELD_ID_TYPE + "=").append(type).append(", ");
+    sb.append(KEY_FIELD_NAME + "=").append(key).append(", ");
+    sb.append(SCHEMA_FIELD_NAME_TYPE + "=").append(type).append(", ");
     sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(", ");
     sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(", ");
     sb.append('}');
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index b71652b67d518..cbe1b90ce4228 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -341,7 +341,8 @@ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir,
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString());
-    HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new HoodieHFileDataBlock(records, header) :
+    HoodieDataBlock dataBlock = (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK)
+        ? new HoodieHFileDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD) :
         new HoodieAvroDataBlock(records, header);
     writer.appendBlock(dataBlock);
     return writer;
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip
index 299b070bee34a..9611d27690577 100644
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip
index d80439d20d3df..1e498310ff71a 100644
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ