diff --git a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
index 086004f121e97..b8217fc0d0401 100644
--- a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
+++ b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
@@ -26,6 +26,8 @@ services:
     ports:
       - "50070:50070"
       - "8020:8020"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     env_file:
       - ./hadoop.env
     healthcheck:
@@ -45,6 +47,8 @@ services:
     ports:
       - "50075:50075"
       - "50010:50010"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     links:
       - "namenode"
       - "historyserver"
@@ -99,6 +103,8 @@ services:
       SERVICE_PRECONDITION: "namenode:50070 hive-metastore-postgresql:5432"
     ports:
       - "9083:9083"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     healthcheck:
       test: ["CMD", "nc", "-z", "hivemetastore", "9083"]
       interval: 30s
@@ -118,6 +124,8 @@ services:
       SERVICE_PRECONDITION: "hivemetastore:9083"
     ports:
       - "10000:10000"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     depends_on:
       - "hivemetastore"
     links:
@@ -136,6 +144,8 @@ services:
     ports:
       - "8080:8080"
      - "7077:7077"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     environment:
       - INIT_DAEMON_STEP=setup_spark
     links:
@@ -154,6 +164,8 @@ services:
       - sparkmaster
     ports:
       - "8081:8081"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     environment:
       - "SPARK_MASTER=spark://sparkmaster:7077"
     links:
@@ -167,7 +179,7 @@ services:
     hostname: zookeeper
     container_name: zookeeper
     ports:
-      - '2181:2181'
+      - "2181:2181"
     environment:
       - ALLOW_ANONYMOUS_LOGIN=yes
@@ -176,7 +188,7 @@ services:
     hostname: kafkabroker
     container_name: kafkabroker
     ports:
-      - '9092:9092'
+      - "9092:9092"
     environment:
       - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
       - ALLOW_PLAINTEXT_LISTENER=yes
@@ -186,7 +198,9 @@ services:
     hostname: presto-coordinator-1
     image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest
     ports:
-      - '8090:8090'
+      - "8090:8090"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     environment:
       - PRESTO_JVM_MAX_HEAP=512M
       - PRESTO_QUERY_MAX_MEMORY=1GB
@@ -226,7 +240,9 @@ services:
     hostname: trino-coordinator-1
     image: apachehudi/hudi-hadoop_2.8.4-trinocoordinator_368:latest
     ports:
-      - '8091:8091'
+      - "8091:8091"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     links:
       - "hivemetastore"
     volumes:
@@ -239,7 +255,9 @@ services:
     image: apachehudi/hudi-hadoop_2.8.4-trinoworker_368:latest
     depends_on: [ "trino-coordinator-1" ]
     ports:
-      - '8092:8092'
+      - "8092:8092"
+      # JVM debugging port (will be mapped to a random port on host)
+      - "5005"
     links:
       - "hivemetastore"
       - "hiveserver"
@@ -268,6 +286,8 @@ services:
       - sparkmaster
     ports:
       - '4040:4040'
+      # JVM debugging port (mapped to 5006 on the host)
+      - "5006:5005"
     environment:
       - "SPARK_MASTER=spark://sparkmaster:7077"
     links:
@@ -286,6 +306,9 @@ services:
     container_name: adhoc-2
     env_file:
       - ./hadoop.env
+    ports:
+      # JVM debugging port (mapped to 5005 on the host)
+      - "5005:5005"
     depends_on:
      - sparkmaster
     environment:
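A note on the new debug ports: publishing container port 5005 only helps if the JVM inside each service is actually started with a JDWP agent, and that wiring is not part of this diff. A minimal, hypothetical way to enable it, assuming the images honor JAVA_TOOL_OPTIONS and run JDK 8 (on JDK 9+ the address takes the form *:5005):

    environment:
      - JAVA_TOOL_OPTIONS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

The randomly mapped host port for a given service can then be looked up with, e.g., `docker port namenode 5005` before attaching the IDE debugger.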
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index c33c0f08ca830..022f600b5e078 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -93,7 +94,8 @@ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTa
   public void write(GenericRecord oldRecord) {
     String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
     try {
-      fileWriter.writeAvro(key, oldRecord);
+      // NOTE: We're enforcing preservation of the record metadata to keep the existing semantics
+      writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
     } catch (IOException | RuntimeException e) {
       String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
           key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 91a7622bf8065..41d583668a933 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -142,7 +142,7 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
         fileWriter.writeAvro(record.getRecordKey(),
             rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
       } else {
-        fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
+        fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
       }
       // update the new location of record, so we know where to find it next
       record.unseal();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 06e752f59daea..3363571ddf0cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -22,6 +22,7 @@
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -292,13 +293,7 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
     Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
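HoodieConcatHandle.write now routes through a writeToFile helper on the merge handle, which this excerpt references but whose definition is not visible above. A sketch of the dispatch it implies, inferred from the writer API changes below (the body here is an assumption, not the PR's verbatim code):

    // Hypothetical sketch of the writeToFile helper referenced above.
    // The concat path passes `true`, i.e. it preserves the record's existing meta fields.
    protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveMetadata) throws IOException {
      if (shouldPreserveMetadata) {
        fileWriter.writeAvro(key.getRecordKey(), avroRecord);   // copy the record through untouched
      } else {
        fileWriter.writeAvroWithMetadata(key, avroRecord);      // stamp fresh commit metadata
      }
    }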
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 931b08c2fe0c2..d6c1d1be40f36 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -47,7 +47,7 @@ */
 public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
 
-  private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
+  private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();
 
   public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
       Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index 9f749566b255b..1d1dd5c9bae6d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.avro.generic.IndexedRecord;
 
@@ -29,7 +30,7 @@
 public interface HoodieFileWriter<R extends IndexedRecord> {
 
-  void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
+  void writeAvroWithMetadata(HoodieKey key, R newRecord) throws IOException;
 
   boolean canWrite();
 
@@ -37,9 +38,9 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
 
   void writeAvro(String key, R oldRecord) throws IOException;
 
-  default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
+  default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
     String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
-    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
+    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
     HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
     return;
   }
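This interface change is the heart of the PR: writers now receive just the HoodieKey instead of a full HoodieRecord that they only ever probed for its key and partition path. At a call site the before/after looks roughly like this (recordWithMetadataInSchema and hoodieRecord stand in for whatever is in scope):

    // Before: the writer dug the key out of the HoodieRecord itself.
    fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);

    // After: the caller passes the key explicitly; the payload-bearing record stays behind.
    fileWriter.writeAvroWithMetadata(hoodieRecord.getKey(), recordWithMetadataInSchema);

Narrowing the parameter makes it explicit that writers never touch the payload side of HoodieRecord, which is also what lets the test further down pass a bare key.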
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index 1642eb2c42fc6..91f79cefa23d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -18,16 +18,6 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -40,6 +30,15 @@
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.io.Writable;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -111,13 +110,13 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
   }
 
   @Override
-  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
+  public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
     if (populateMetaFields) {
-      prepRecordWithMetadata(avroRecord, record, instantTime,
+      prepRecordWithMetadata(key, avroRecord, instantTime,
           taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
-      writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+      writeAvro(key.getRecordKey(), avroRecord);
     } else {
-      writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
+      writeAvro(key.getRecordKey(), avroRecord);
     }
   }
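In the HFile writer above (and the Parquet writer below) the two branches now differ only in the metadata-prep step, so they could be collapsed; a behavior-equivalent condensed form, offered as an editorial sketch rather than part of the PR:

    @Override
    public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
      if (populateMetaFields) {
        // stamps _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name,
        // _hoodie_commit_time and _hoodie_commit_seqno onto the Avro record
        prepRecordWithMetadata(key, avroRecord, instantTime,
            taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
      }
      writeAvro(key.getRecordKey(), avroRecord); // both branches end in a plain writeAvro
    }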
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
index 3fe8be05c09f0..17d5ead3efb79 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -18,34 +18,35 @@
 
 package org.apache.hudi.io.storage;
 
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
-import org.apache.orc.OrcFile;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
 
 public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord> implements HoodieFileWriter<R>, Closeable {
@@ -94,10 +95,10 @@ public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Sc
   }
 
   @Override
-  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
-    prepRecordWithMetadata(avroRecord, record, instantTime,
+  public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
+    prepRecordWithMetadata(key, avroRecord, instantTime,
         taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
-    writeAvro(record.getRecordKey(), avroRecord);
+    writeAvro(key.getRecordKey(), avroRecord);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 957a0ff52e91d..5b3c69ddf943e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -18,16 +18,15 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 
@@ -84,12 +83,12 @@ public HoodieParquetWriter(String instantTime,
   }
 
   @Override
-  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
+  public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
     if (populateMetaFields) {
-      prepRecordWithMetadata(avroRecord, record, instantTime,
+      prepRecordWithMetadata(key, avroRecord, instantTime,
          taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
       super.write(avroRecord);
-      writeSupport.add(record.getRecordKey());
+      writeSupport.add(key.getRecordKey());
     } else {
       super.write(avroRecord);
     }
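One asymmetry worth flagging: unlike the HFile and Parquet writers, HoodieOrcWriter.writeAvroWithMetadata has no populateMetaFields guard and stamps meta fields unconditionally. Call sites are identical across all three formats; a rough sketch of driving them through the common interface (the factory call is quoted from memory and should be checked against the actual HoodieFileWriterFactory signature):

    // Illustrative only: the factory picks Parquet/HFile/ORC from the file extension.
    HoodieFileWriter<IndexedRecord> writer = HoodieFileWriterFactory.getFileWriter(
        instantTime, path, hoodieTable, config, writeSchemaWithMetaFields, taskContextSupplier);
    writer.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));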
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index 2db8eb0204b34..da6f717258877 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -154,8 +154,9 @@ public void testWriteReadHFileWithMetaFields(boolean populateMetaFields, boolean
       record.put("time", Integer.toString(RANDOM.nextInt()));
       record.put("number", i);
       if (testAvroWithMeta) {
-        writer.writeAvroWithMetadata(record, new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"),
-            Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
+        // payload does not matter. GenericRecord passed in is what matters
+        writer.writeAvroWithMetadata(new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"),
+            Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload()).getKey(), record); // the HoodieAvroRecord is built only so its HoodieKey can be extracted
       } else {
         writer.writeAvro(key, record);
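Since the HoodieAvroRecord in the test above exists only to produce a HoodieKey, the call could be written more directly; a suggested equivalent:

    // Equivalent to the test call above, without the throwaway HoodieAvroRecord:
    writer.writeAvroWithMetadata(
        new HoodieKey((String) record.get("_row_key"), Integer.toString((Integer) record.get("number"))),
        record);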
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 17ebccb153d2d..72749160e6bd0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -21,13 +21,15 @@
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -36,8 +38,6 @@
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
-
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -51,6 +51,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -94,7 +96,6 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
         .withProperties(properties)
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
 
       /**
        * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times
@@ -202,6 +203,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
       // Check the entire dataset has 47 records still
       dataSet = getRecords();
       assertEquals(47, dataSet.count(), "Must contain 47 records");
+
       Row[] rows = (Row[]) dataSet.collect();
       int record1Count = 0;
       int record2Count = 0;
@@ -228,6 +230,22 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
       // Assert that id2 record count which has been updated to rider-004 and driver-004 is 21, which is the total
       // number of records with row_key id2
       assertEquals(21, record2Count);
+
+      // Validate that all the records reference only the _latest_ base files in their
+      // FILENAME_METADATA_FIELD payload (i.e. that the record metadata is in sync with
+      // the state of the table)
+      HoodieTableFileSystemView tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), HoodieTestTable.of(metaClient).listAllBaseFiles());
+
+      Set<String> latestBaseFileNames = tableView.getLatestBaseFiles()
+          .map(BaseFile::getFileName)
+          .collect(Collectors.toSet());
+
+      Set<String> metadataFilenameFieldRefs = dataSet.collectAsList().stream()
+          .map(row -> row.getAs(HoodieRecord.FILENAME_METADATA_FIELD))
+          .collect(Collectors.toSet());
+
+      assertEquals(latestBaseFileNames, metadataFilenameFieldRefs);
     }
   }
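The new assertion closes the loop on the writer changes: because writeAvroWithMetadata stamps the file name at write time via prepRecordWithMetadata, every surviving row's _hoodie_file_name must point at a currently live base file. The same invariant can be eyeballed on a real table with a hypothetical snippet along these lines (spark and basePath assumed in scope):

    // Spot-check outside the test harness: distinct file names recorded in the meta column.
    Dataset<Row> df = spark.read().format("hudi").load(basePath);
    df.select(HoodieRecord.FILENAME_METADATA_FIELD).distinct().show(false);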