diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 6796557714221..ab8b83c14aeec 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -57,9 +57,9 @@ public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath());
     List<HoodieKey> hoodieKeyList = new ArrayList<>();
     if (keyGeneratorOpt.isPresent()) {
-      hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt);
+      hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt);
     } else {
-      hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()));
+      hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()));
     }
     return hoodieKeyList.stream()
         .map(entry -> Pair.of(entry,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index 25a2bec5baff2..7ec6110d723ab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -167,18 +167,35 @@ public abstract Map<String, String> readFooter(Configuration configuration, bool
    * Fetch {@link HoodieKey}s from the given data file.
    * @param configuration configuration to build fs object
    * @param filePath The data file path
-   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   * @return {@link List} of {@link HoodieKey}s fetched from the data file
    */
-  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
+  public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath);
+
+  /**
+   * Provides a closable iterator for reading the given data file.
+   * @param configuration configuration to build fs object
+   * @param filePath The data file path
+   * @param keyGeneratorOpt instance of KeyGenerator.
+   * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file
+   */
+  public abstract ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+
+  /**
+   * Provides a closable iterator for reading the given data file.
+   * @param configuration configuration to build fs object
+   * @param filePath The data file path
+   * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file
+   */
+  public abstract ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath);
 
   /**
    * Fetch {@link HoodieKey}s from the given data file.
    * @param configuration configuration to build fs object
    * @param filePath The data file path
    * @param keyGeneratorOpt instance of KeyGenerator.
-   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   * @return {@link List} of {@link HoodieKey}s fetched from the data file
    */
-  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+  public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
 
   /**
    * Read the Avro schema of the data file.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java
index 4b3caa756a65f..d9ceeeee40f63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java
@@ -29,19 +29,18 @@
 import org.apache.orc.TypeDescription;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * This class wraps a ORC reader and provides an iterator based api to read from an ORC file.
 */
-public class OrcReaderIterator<T> implements Iterator<T> {
+public class OrcReaderIterator<T> implements ClosableIterator<T> {
 
   private final RecordReader recordReader;
   private final Schema avroSchema;
-  List<String> fieldNames;
-  List<TypeDescription> orcFieldTypes;
-  Schema[] avroFieldSchemas;
-  private VectorizedRowBatch batch;
+  private final List<String> fieldNames;
+  private final List<TypeDescription> orcFieldTypes;
+  private final Schema[] avroFieldSchemas;
+  private final VectorizedRowBatch batch;
   private int rowInBatch;
   private T next;
 
@@ -52,7 +51,7 @@ public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescripti
     this.orcFieldTypes = orcSchema.getChildren();
     this.avroFieldSchemas = fieldNames.stream()
         .map(fieldName -> avroSchema.getField(fieldName).schema())
-        .toArray(size -> new Schema[size]);
+        .toArray(Schema[]::new);
     this.batch = orcSchema.createRowBatch();
     this.rowInBatch = 0;
   }
@@ -115,4 +114,9 @@ private GenericData.Record readRecordFromBatch() throws IOException {
     rowInBatch++;
     return record;
   }
+
+  @Override
+  public void close() {
+    FileIOUtils.closeQuietly(this.recordReader);
+  }
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 26fa6928a8b9d..88c28d75204a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -55,29 +56,23 @@ public class OrcUtils extends BaseFileUtils {
   /**
-   * Fetch {@link HoodieKey}s from the given ORC file.
+   * Provides a closable iterator for reading the given ORC file.
    *
-   * @param filePath The ORC file path.
    * @param configuration configuration to build fs object
-   * @return {@link List} of {@link HoodieKey}s fetched from the ORC file
+   * @param filePath The ORC file path
+   * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the ORC file
    */
   @Override
-  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) {
-    List<HoodieKey> hoodieKeys = new ArrayList<>();
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath) {
     try {
-      if (!filePath.getFileSystem(configuration).exists(filePath)) {
-        return new ArrayList<>();
-      }
-
       Configuration conf = new Configuration(configuration);
       conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
       Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
       Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
       TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema);
-      List<String> fieldNames = orcSchema.getFieldNames();
-      VectorizedRowBatch batch = orcSchema.createRowBatch();
       RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
+      List<String> fieldNames = orcSchema.getFieldNames();
 
       // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields
       int keyCol = -1;
@@ -93,24 +88,43 @@ public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration,
       if (keyCol == -1 || partitionCol == -1) {
         throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath));
       }
-      while (recordReader.nextBatch(batch)) {
-        BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[keyCol];
-        BytesColumnVector partitionPaths = (BytesColumnVector) batch.cols[partitionCol];
-        for (int i = 0; i < batch.size; i++) {
-          String rowKey = rowKeys.toString(i);
-          String partitionPath = partitionPaths.toString(i);
-          hoodieKeys.add(new HoodieKey(rowKey, partitionPath));
-        }
+      return new OrcReaderIterator<>(recordReader, readSchema, orcSchema);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to open reader from ORC file:" + filePath, e);
+    }
+  }
+
+  /**
+   * Fetch {@link HoodieKey}s from the given ORC file.
+   *
+   * @param filePath The ORC file path.
+   * @param configuration configuration to build fs object
+   * @return {@link List} of {@link HoodieKey}s fetched from the ORC file
+   */
+  @Override
+  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath) {
+    try {
+      if (!filePath.getFileSystem(configuration).exists(filePath)) {
+        return Collections.emptyList();
       }
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read from ORC file:" + filePath, e);
     }
+    List<HoodieKey> hoodieKeys = new ArrayList<>();
+    try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(configuration, filePath, Option.empty())) {
+      iterator.forEachRemaining(hoodieKeys::add);
+    }
     return hoodieKeys;
   }
 
   @Override
-  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    throw new HoodieIOException("UnsupportedOperation : Disabling meta fields not yet supported for Orc");
+  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("Custom key generator is not supported yet");
+  }
+
+  @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("Custom key generator is not supported yet");
   }
 
   /**
@@ -119,8 +133,7 @@ public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration,
   @Override
   public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
     Schema avroSchema;
-    try {
-      Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
+    try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) {
       avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema());
     } catch (IOException io) {
       throw new HoodieIOException("Unable to read Avro records from an ORC file:" + filePath, io);
     }
@@ -134,14 +147,14 @@ public List<GenericRecord> readAvroRecords(Configuration configuration, Path fil
   @Override
   public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) {
     List<GenericRecord> records = new ArrayList<>();
-    try {
-      Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
+    try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) {
       TypeDescription orcSchema = reader.getSchema();
-      RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema));
-      OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
-      while (iterator.hasNext()) {
-        GenericRecord record = iterator.next();
-        records.add(record);
+      try (RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema))) {
+        OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
+        while (iterator.hasNext()) {
+          GenericRecord record = iterator.next();
+          records.add(record);
+        }
       }
     } catch (IOException io) {
       throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io);
@@ -161,35 +174,35 @@ public List<GenericRecord> readAvroRecords(Configuration configuration, Path fil
   @Override
   public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String> filter) throws HoodieIOException {
-    try {
-      Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
-      Set<String> filteredRowKeys = new HashSet<>();
+    try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));) {
       TypeDescription schema = reader.getSchema();
-      List<String> fieldNames = schema.getFieldNames();
-      VectorizedRowBatch batch = schema.createRowBatch();
-      RecordReader recordReader = reader.rows(new Options(conf).schema(schema));
+      try (RecordReader recordReader = reader.rows(new Options(conf).schema(schema))) {
+        Set<String> filteredRowKeys = new HashSet<>();
+        List<String> fieldNames = schema.getFieldNames();
+        VectorizedRowBatch batch = schema.createRowBatch();
 
-      // column index for the RECORD_KEY_METADATA_FIELD field
-      int colIndex = -1;
-      for (int i = 0; i < fieldNames.size(); i++) {
-        if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
-          colIndex = i;
-          break;
+        // column index for the RECORD_KEY_METADATA_FIELD field
+        int colIndex = -1;
+        for (int i = 0; i < fieldNames.size(); i++) {
+          if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
+            colIndex = i;
+            break;
+          }
         }
-      }
-      if (colIndex == -1) {
-        throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
-      }
-      while (recordReader.nextBatch(batch)) {
-        BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
-        for (int i = 0; i < batch.size; i++) {
-          String rowKey = rowKeys.toString(i);
-          if (filter.isEmpty() || filter.contains(rowKey)) {
-            filteredRowKeys.add(rowKey);
+        if (colIndex == -1) {
+          throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
+        }
+        while (recordReader.nextBatch(batch)) {
+          BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
+          for (int i = 0; i < batch.size; i++) {
+            String rowKey = rowKeys.toString(i);
+            if (filter.isEmpty() || filter.contains(rowKey)) {
+              filteredRowKeys.add(rowKey);
+            }
          }
         }
+        return filteredRowKeys;
       }
-      return filteredRowKeys;
     } catch (IOException io) {
       throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io);
     }
@@ -198,8 +211,7 @@ public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String>
   @Override
   public Map<String, String> readFooter(Configuration conf, boolean required, Path orcFilePath, String... footerNames) {
-    try {
-      Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+    try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
       Map<String, String> footerVals = new HashMap<>();
       List<UserMetadataItem> metadataItemList = reader.getFileTail().getFooter().getMetadataList();
       Map<String, String> metadata = metadataItemList.stream().collect(Collectors.toMap(
@@ -221,8 +233,7 @@ public Map<String, String> readFooter(Configuration conf, boolean required,
   @Override
   public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
-    try {
-      Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+    try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
       if (reader.hasMetadataValue("orc.avro.schema")) {
         ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema");
         byte[] bytes = new byte[metadataValue.remaining()];
@@ -239,8 +250,7 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
   @Override
   public long getRowCount(Configuration conf, Path orcFilePath) {
-    try {
-      Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+    try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
       return reader.getNumberOfRows();
     } catch (IOException io) {
       throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 5970e02d6799a..03bd471b606f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -24,13 +24,12 @@
 import org.apache.parquet.hadoop.ParquetReader;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in
  * {@link BoundedInMemoryQueue}
 */
-public class ParquetReaderIterator<T> implements Iterator<T> {
+public class ParquetReaderIterator<T> implements ClosableIterator<T> {
 
   // Parquet reader for an existing parquet file
   private final ParquetReader<T> parquetReader;
@@ -73,7 +72,11 @@ public T next() {
     }
   }
 
-  public void close() throws IOException {
-    parquetReader.close();
+  public void close() {
+    try {
+      parquetReader.close();
+    } catch (IOException e) {
+      throw new HoodieException("Exception while closing the parquet reader", e);
+    }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index ff85a6fd9d276..f67cf65258371 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -128,12 +128,26 @@ private static Set<String> filterParquetRowKeys(Configuration configuration, Pat
    * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
    */
   @Override
-  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) {
-    return fetchRecordKeyPartitionPathInternal(configuration, filePath, Option.empty());
+  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath) {
+    return fetchHoodieKeys(configuration, filePath, Option.empty());
   }
 
-  private List<HoodieKey> fetchRecordKeyPartitionPathInternal(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    List<HoodieKey> hoodieKeys = new ArrayList<>();
+  @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath) {
+    return getHoodieKeyIterator(configuration, filePath, Option.empty());
+  }
+
+  /**
+   * Returns a closable iterator for reading the given parquet file.
+   *
+   * @param configuration configuration to build fs object
+   * @param filePath The parquet file path
+   * @param keyGeneratorOpt instance of KeyGenerator
+   *
+   * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the parquet file
+   */
+  @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
     try {
       Configuration conf = new Configuration(configuration);
       conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
@@ -146,27 +160,11 @@ private List<HoodieKey> fetchRecordKeyPartitionPathInternal(Configuration config
           .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema());
       AvroReadSupport.setAvroReadSchema(conf, readSchema);
       AvroReadSupport.setRequestedProjection(conf, readSchema);
-      ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build();
-      Object obj = reader.read();
-      while (obj != null) {
-        if (obj instanceof GenericRecord) {
-          String recordKey = null;
-          String partitionPath = null;
-          if (keyGeneratorOpt.isPresent()) {
-            recordKey = keyGeneratorOpt.get().getRecordKey((GenericRecord) obj);
-            partitionPath = keyGeneratorOpt.get().getPartitionPath((GenericRecord) obj);
-          } else {
-            recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-            partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
-          }
-          hoodieKeys.add(new HoodieKey(recordKey, partitionPath));
-          obj = reader.read();
-        }
-      }
+      ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(filePath).withConf(conf).build();
+      return HoodieKeyIterator.getInstance(new ParquetReaderIterator<>(reader), keyGeneratorOpt);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
     }
-    return hoodieKeys;
   }
 
   /**
@@ -174,12 +172,16 @@ private List<HoodieKey> fetchRecordKeyPartitionPathInternal(Configuration config
    *
    * @param configuration configuration to build fs object
    * @param filePath The parquet file path.
-   * @param keyGeneratorOpt
+   * @param keyGeneratorOpt instance of KeyGenerator.
    * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
    */
   @Override
-  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    return fetchRecordKeyPartitionPathInternal(configuration, filePath, keyGeneratorOpt);
+  public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    List<HoodieKey> hoodieKeys = new ArrayList<>();
+    try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(configuration, filePath, keyGeneratorOpt)) {
+      iterator.forEachRemaining(hoodieKeys::add);
+      return hoodieKeys;
+    }
   }
 
   public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
@@ -228,10 +230,8 @@ public Schema readAvroSchema(Configuration configuration, Path parquetFilePath)
    */
   @Override
   public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
-    ParquetReader reader = null;
     List<GenericRecord> records = new ArrayList<>();
-    try {
-      reader = AvroParquetReader.builder(filePath).withConf(configuration).build();
+    try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(configuration).build()) {
       Object obj = reader.read();
       while (obj != null) {
         if (obj instanceof GenericRecord) {
@@ -242,14 +242,6 @@ public List<GenericRecord> readAvroRecords(Configuration configuration, Path fil
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read avro records from Parquet " + filePath, e);
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException e) {
-          // ignore
-        }
-      }
     }
     return records;
   }
@@ -424,4 +416,55 @@ private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMeta
       throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName()));
     }
   }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * An iterator that can apply the given function {@code func} to transform records
+   * from the underneath record iterator to hoodie keys.
+   */
+  private static class HoodieKeyIterator implements ClosableIterator<HoodieKey> {
+    private final ClosableIterator<GenericRecord> nestedItr;
+    private final Function<GenericRecord, HoodieKey> func;
+
+    public static HoodieKeyIterator getInstance(ClosableIterator<GenericRecord> nestedItr, Option<BaseKeyGenerator> keyGenerator) {
+      return new HoodieKeyIterator(nestedItr, keyGenerator);
+    }
+
+    private HoodieKeyIterator(ClosableIterator<GenericRecord> nestedItr, Option<BaseKeyGenerator> keyGenerator) {
+      this.nestedItr = nestedItr;
+      if (keyGenerator.isPresent()) {
+        this.func = retVal -> {
+          String recordKey = keyGenerator.get().getRecordKey(retVal);
+          String partitionPath = keyGenerator.get().getPartitionPath(retVal);
+          return new HoodieKey(recordKey, partitionPath);
+        };
+      } else {
+        this.func = retVal -> {
+          String recordKey = retVal.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+          String partitionPath = retVal.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+          return new HoodieKey(recordKey, partitionPath);
+        };
+      }
+    }
+
+    @Override
+    public void close() {
+      if (this.nestedItr != null) {
+        this.nestedItr.close();
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return this.nestedItr.hasNext();
+    }
+
+    @Override
+    public HoodieKey next() {
+      return this.func.apply(this.nestedItr.next());
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index e07c0fad3d24e..7bef8477125c2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -147,7 +147,7 @@ public void testFetchRecordKeyPartitionPathFromParquet(String typeCode) throws E
 
     // Read and verify
     List<HoodieKey> fetchedRows =
-        parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
+        parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
     assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match");
 
     for (HoodieKey entry : fetchedRows) {
@@ -173,7 +173,7 @@ public void testFetchRecordKeyPartitionPathVirtualKeysFromParquet() throws Excep
 
     // Read and verify
    List<HoodieKey> fetchedRows =
-        parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath),
+        parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath),
             Option.of(new TestBaseKeyGen("abc","def")));
 
     assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index eee9347ba1a0e..0f944c56577e2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -210,17 +211,10 @@ protected void loadRecords(String partitionPath) throws Exception {
         if (!isValidFile(baseFile.getFileStatus())) {
           return;
         }
-
-        final List<HoodieKey> hoodieKeys;
-        try {
-          hoodieKeys =
-              fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
-        } catch (Exception e) {
-          throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
-        }
-
-        for (HoodieKey hoodieKey : hoodieKeys) {
-          output.collect(new StreamRecord<>(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
+        try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
+          iterator.forEachRemaining(hoodieKey -> {
+            output.collect(new StreamRecord<>(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
+          });
         }
       });
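
Usage note (not part of the patch): callers of the new API own the iterator's lifecycle, so it should be drained inside try-with-resources, the same way BootstrapOperator does above. Below is a minimal sketch; the class name, Hadoop configuration, and base file path are hypothetical placeholders, and it only relies on the methods introduced or referenced in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;

public class HoodieKeyIteratorExample {
  public static void main(String[] args) {
    // Hypothetical inputs; any Hudi base file (parquet or ORC) under a real table would do.
    Configuration hadoopConf = new Configuration();
    Path baseFilePath = new Path("/tmp/hudi_table/2021/09/01/some-base-file.parquet");

    // BaseFileUtils.getInstance picks the parquet or ORC implementation from the file extension,
    // as in HoodieKeyLocationFetchHandle above.
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(baseFilePath.toString());

    // try-with-resources closes the underlying file reader, which is the reason
    // the returned iterator implements ClosableIterator rather than plain Iterator.
    try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(hadoopConf, baseFilePath)) {
      iterator.forEachRemaining(key ->
          System.out.println(key.getRecordKey() + " -> " + key.getPartitionPath()));
    }
  }
}

Compared with fetchHoodieKeys, which still materializes every key into a List, the iterator form lets callers such as the Flink bootstrap operator stream keys out of large base files without holding them all in memory.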