From 38e390e15df681761491177da56e5f032a940d49 Mon Sep 17 00:00:00 2001 From: Bo Date: Thu, 17 Feb 2022 18:29:20 +0800 Subject: [PATCH 1/4] [HUDI-3446][HUDI-FLINK] support batch Reader in BootstrapOperator#loadRecords --- .../hudi/common/util/BaseFileUtils.java | 32 +++++ .../org/apache/hudi/common/util/OrcUtils.java | 113 ++++++++++++++---- .../apache/hudi/common/util/ParquetUtils.java | 60 +++++++--- .../hudi/configuration/FlinkOptions.java | 7 ++ .../sink/bootstrap/BootstrapOperator.java | 35 ++++-- .../hudi/sink/TestWriteCopyOnWrite.java | 9 +- 6 files changed, 205 insertions(+), 51 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index 25a2bec5baff2..08c4db05dca12 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util; +import java.io.Closeable; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -171,6 +172,33 @@ public abstract Map readFooter(Configuration configuration, bool */ public abstract List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath); + /** + * Fetch {@link HoodieKey}s from the given data file. + * @param reader The file reader + * @param filePath The data file path + * @return {@link List} of {@link HoodieKey}s fetched from the parquet file + */ + public abstract List fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize); + + /** + * Open File Reader. + * @param configuration configuration to build file reader + * @param filePath The data file path + * @param keyGeneratorOpt instance of KeyGenerator + * @return file reader + */ + public abstract BaseFileReader getReader(Configuration configuration, Path filePath, Option keyGeneratorOpt); + + /** + * Open File Reader. + * @param configuration configuration to build file reader + * @param filePath The data file path + * @return file reader + */ + public BaseFileReader getReader(Configuration configuration, Path filePath) { + return getReader(configuration, filePath, Option.empty()); + } + /** * Fetch {@link HoodieKey}s from the given data file. 
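// Illustrative sketch (not part of the patch): how the batched reader API introduced above is meant to
// be consumed. The reader is opened once, batches of keys are fetched until a short batch signals
// end-of-file, and the reader is released by try-with-resources. `BatchedKeyLoadSketch`, `loadKeys` and
// `processKey` are made-up names; `BaseFileUtils.getInstance`, `getReader` and the three-argument
// `fetchRecordKeyPartitionPath` are the methods added by this patch.
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.BaseFileUtils;

public class BatchedKeyLoadSketch {
  public static void loadKeys(Configuration hadoopConf, Path filePath, int batchSize,
                              Consumer<HoodieKey> processKey) throws IOException {
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(filePath.toString());
    try (BaseFileUtils.BaseFileReader reader = fileUtils.getReader(hadoopConf, filePath)) {
      List<HoodieKey> batch;
      do {
        // each call resumes from the reader's current position and returns at most batchSize keys
        batch = fileUtils.fetchRecordKeyPartitionPath(reader, filePath, batchSize);
        batch.forEach(processKey);
        // a batch smaller than batchSize means the file is exhausted
      } while (batch.size() >= batchSize);
    }
  }
}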
* @param configuration configuration to build fs object @@ -187,4 +215,8 @@ public abstract Map readFooter(Configuration configuration, bool * @return The Avro schema of the data file */ public abstract Schema readAvroSchema(Configuration configuration, Path filePath); + + public abstract class BaseFileReader implements Closeable { + + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 26fa6928a8b9d..04bb363cf0e05 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -54,6 +54,22 @@ */ public class OrcUtils extends BaseFileUtils { + @Override + public BaseFileReader getReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + try { + Configuration conf = new Configuration(configuration); + conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); + Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + + Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); + TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema); + RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); + return new OrcFileInnerReader(reader, readSchema, orcSchema, recordReader); + } catch (IOException e) { + throw new HoodieIOException("Failed to open reader from ORC file:" + filePath, e); + } + } + /** * Fetch {@link HoodieKey}s from the given ORC file. * @@ -63,21 +79,31 @@ public class OrcUtils extends BaseFileUtils { */ @Override public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { - List hoodieKeys = new ArrayList<>(); try { if (!filePath.getFileSystem(configuration).exists(filePath)) { return new ArrayList<>(); } + } catch (IOException e) { + throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); + } + try (BaseFileReader reader = getReader(configuration, filePath)) { + return fetchRecordKeyPartitionPathInternal(reader, filePath, -1); + } catch (IOException e) { + throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); + } + } - Configuration conf = new Configuration(configuration); - conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); - Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + @Override + public List fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize) { + return fetchRecordKeyPartitionPathInternal(reader, filePath, batchSize); + } - Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); - TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema); - List fieldNames = orcSchema.getFieldNames(); - VectorizedRowBatch batch = orcSchema.createRowBatch(); - RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); + public List fetchRecordKeyPartitionPathInternal(BaseFileReader reader, Path filePath, int batchSize) { + List hoodieKeys = new ArrayList<>(); + try { + List fieldNames = (((OrcFileInnerReader)reader).orcSchema).getFieldNames(); + VectorizedRowBatch batch = (((OrcFileInnerReader)reader).orcSchema).createRowBatch(); + RecordReader recordReader = ((OrcFileInnerReader)reader).recordReader; // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields int keyCol = -1; @@ -100,6 +126,9 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, String 
rowKey = rowKeys.toString(i); String partitionPath = partitionPaths.toString(i); hoodieKeys.add(new HoodieKey(rowKey, partitionPath)); + if (batchSize > 0 && hoodieKeys.size() >= batchSize) { + break; + } } } } catch (IOException e) { @@ -119,8 +148,7 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, @Override public List readAvroRecords(Configuration configuration, Path filePath) { Schema avroSchema; - try { - Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration)); + try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) { avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema()); } catch (IOException io) { throw new HoodieIOException("Unable to read Avro records from an ORC file:" + filePath, io); @@ -134,10 +162,12 @@ public List readAvroRecords(Configuration configuration, Path fil @Override public List readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) { List records = new ArrayList<>(); + Reader reader = null; + RecordReader recordReader = null; try { - Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration)); + reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration)); TypeDescription orcSchema = reader.getSchema(); - RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema)); + recordReader = reader.rows(new Options(configuration).schema(orcSchema)); OrcReaderIterator iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema); while (iterator.hasNext()) { GenericRecord record = iterator.next(); @@ -145,6 +175,17 @@ public List readAvroRecords(Configuration configuration, Path fil } } catch (IOException io) { throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io); + } finally { + try { + if (reader != null) { + reader.close(); + } + if (recordReader != null) { + recordReader.close(); + } + } catch (IOException e) { + // ignore + } } return records; } @@ -161,13 +202,15 @@ public List readAvroRecords(Configuration configuration, Path fil @Override public Set filterRowKeys(Configuration conf, Path filePath, Set filter) throws HoodieIOException { + Reader reader = null; + RecordReader recordReader = null; try { - Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); Set filteredRowKeys = new HashSet<>(); TypeDescription schema = reader.getSchema(); List fieldNames = schema.getFieldNames(); VectorizedRowBatch batch = schema.createRowBatch(); - RecordReader recordReader = reader.rows(new Options(conf).schema(schema)); + recordReader = reader.rows(new Options(conf).schema(schema)); // column index for the RECORD_KEY_METADATA_FIELD field int colIndex = -1; @@ -192,14 +235,24 @@ public Set filterRowKeys(Configuration conf, Path filePath, Set return filteredRowKeys; } catch (IOException io) { throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io); + } finally { + try { + if (reader != null) { + reader.close(); + } + if (recordReader != null) { + recordReader.close(); + } + } catch (IOException e) { + //ignore + } } } @Override public Map readFooter(Configuration conf, boolean required, Path orcFilePath, String... 
footerNames) { - try { - Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); + try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) { Map footerVals = new HashMap<>(); List metadataItemList = reader.getFileTail().getFooter().getMetadataList(); Map metadata = metadataItemList.stream().collect(Collectors.toMap( @@ -221,8 +274,7 @@ public Map readFooter(Configuration conf, boolean required, @Override public Schema readAvroSchema(Configuration conf, Path orcFilePath) { - try { - Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); + try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) { if (reader.hasMetadataValue("orc.avro.schema")) { ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema"); byte[] bytes = new byte[metadataValue.remaining()]; @@ -239,11 +291,30 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) { @Override public long getRowCount(Configuration conf, Path orcFilePath) { - try { - Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); + try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) { return reader.getNumberOfRows(); } catch (IOException io) { throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io); } } + + private class OrcFileInnerReader extends BaseFileReader { + Reader reader; + Schema readSchema; + TypeDescription orcSchema; + RecordReader recordReader; + + public OrcFileInnerReader(Reader reader, Schema readSchema, TypeDescription orcSchema, RecordReader recordReader) { + this.reader = reader; + this.readSchema = readSchema; + this.orcSchema = orcSchema; + this.recordReader = recordReader; + } + + @Override + public void close() throws IOException { + this.recordReader.close(); + this.reader.close(); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index ff85a6fd9d276..0f7a75cb1c8e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -129,11 +129,16 @@ private static Set filterParquetRowKeys(Configuration configuration, Pat */ @Override public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { - return fetchRecordKeyPartitionPathInternal(configuration, filePath, Option.empty()); + return fetchRecordKeyPartitionPath(configuration, filePath, Option.empty()); } - private List fetchRecordKeyPartitionPathInternal(Configuration configuration, Path filePath, Option keyGeneratorOpt) { - List hoodieKeys = new ArrayList<>(); + @Override + public List fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize) { + return fetchRecordKeyPartitionPathInternal(reader, filePath, Option.empty(), batchSize); + } + + @Override + public BaseFileReader getReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { try { Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); @@ -143,11 +148,20 @@ private List fetchRecordKeyPartitionPathInternal(Configuration config fields.addAll(keyGenerator.getPartitionPathFields()); return HoodieAvroUtils.getSchemaForFields(readAvroSchema(conf, filePath), fields); }) - .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema()); + 
.orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema()); AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema); ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build(); - Object obj = reader.read(); + return new ParquetFileInnerReader(reader); + } catch (IOException e) { + throw new HoodieIOException("Failed to open reader from Parquet file " + filePath, e); + } + } + + private List fetchRecordKeyPartitionPathInternal(BaseFileReader reader, Path filePath, Option keyGeneratorOpt, int batchSize) { + List hoodieKeys = new ArrayList<>(); + try { + Object obj = (((ParquetFileInnerReader)reader).reader).read(); while (obj != null) { if (obj instanceof GenericRecord) { String recordKey = null; @@ -160,7 +174,10 @@ private List fetchRecordKeyPartitionPathInternal(Configuration config partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); } hoodieKeys.add(new HoodieKey(recordKey, partitionPath)); - obj = reader.read(); + if (batchSize > 0 && hoodieKeys.size() >= batchSize) { + break; + } + obj = (((ParquetFileInnerReader)reader).reader).read(); } } } catch (IOException e) { @@ -179,7 +196,11 @@ private List fetchRecordKeyPartitionPathInternal(Configuration config */ @Override public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option keyGeneratorOpt) { - return fetchRecordKeyPartitionPathInternal(configuration, filePath, keyGeneratorOpt); + try (BaseFileReader reader = getReader(configuration, filePath, keyGeneratorOpt)) { + return fetchRecordKeyPartitionPathInternal(reader, filePath, keyGeneratorOpt, -1); + } catch (IOException e) { + throw new HoodieIOException("Failed to read from Parquet file " + filePath, e); + } } public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { @@ -228,10 +249,8 @@ public Schema readAvroSchema(Configuration configuration, Path parquetFilePath) */ @Override public List readAvroRecords(Configuration configuration, Path filePath) { - ParquetReader reader = null; List records = new ArrayList<>(); - try { - reader = AvroParquetReader.builder(filePath).withConf(configuration).build(); + try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(configuration).build()) { Object obj = reader.read(); while (obj != null) { if (obj instanceof GenericRecord) { @@ -242,14 +261,6 @@ public List readAvroRecords(Configuration configuration, Path fil } catch (IOException e) { throw new HoodieIOException("Failed to read avro records from Parquet " + filePath, e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - // ignore - } - } } return records; } @@ -424,4 +435,17 @@ private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMeta throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName())); } } + + private class ParquetFileInnerReader extends BaseFileReader { + ParquetReader reader; + + private ParquetFileInnerReader(ParquetReader reader) { + this.reader = reader; + } + + @Override + public void close() throws IOException { + this.reader.close(); + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 77c3f15e54c45..439169338acaf 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -131,6 +131,13 @@ private FlinkOptions() {
       .defaultValue(".*")
       .withDescription("Whether to load partitions in state if partition path matching, default *");
 
+  public static final ConfigOption<Integer> READ_RECORD_BATCH_SIZE = ConfigOptions
+      .key("read.record.batch.size")
+      .intType()
+      .defaultValue(-1)
+      .withDescription("Batch size of record keys to read per fetch, batch reading is disabled if the value is less than or equal to 0,\n"
+          + "default -1");
+
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index eee9347ba1a0e..6e8dbb0ab5ce4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -92,6 +92,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
   private transient ListState<String> instantState;
   private final Pattern pattern;
   private String lastInstantTime;
+  private int batchSize = -1;
 
   public BootstrapOperator(Configuration conf) {
     this.conf = conf;
@@ -124,6 +125,7 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
     this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
     this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
+    this.batchSize = conf.getInteger(FlinkOptions.READ_RECORD_BATCH_SIZE);
 
     preLoadIndexRecords();
   }
@@ -211,16 +213,29 @@ protected void loadRecords(String partitionPath) throws Exception {
           return;
         }
 
-        final List<HoodieKey> hoodieKeys;
-        try {
-          hoodieKeys =
-              fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
-        } catch (Exception e) {
-          throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
-        }
-
-        for (HoodieKey hoodieKey : hoodieKeys) {
-          output.collect(new StreamRecord<>(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
+        if (this.batchSize > 0) {
+          List<HoodieKey> hoodieKeys;
+          Path filePath = new Path(baseFile.getPath());
+          try (BaseFileUtils.BaseFileReader reader = fileUtils.getReader(this.hadoopConf, filePath)) {
+            do {
+              hoodieKeys = fileUtils.fetchRecordKeyPartitionPath(reader, filePath, batchSize);
+              hoodieKeys.forEach(hoodieKey ->
+                  output.collect(new StreamRecord<>(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)))));
+            } while (hoodieKeys.size() >= this.batchSize);
+          } catch (Exception e) {
+            throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
+          }
+        } else {
+          final List<HoodieKey> hoodieKeys;
+          try {
+            hoodieKeys =
+                fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
+          } catch (Exception e) {
+            throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
+          }
+
+          hoodieKeys.forEach(hoodieKey ->
+              output.collect(new StreamRecord<>(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)))));
         }
       });
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index a91f45263ff25..5b72088c9a96f 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -33,12 +33,15 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static org.apache.hudi.configuration.FlinkOptions.READ_RECORD_BATCH_SIZE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; @@ -338,8 +341,10 @@ protected Map getExpectedBeforeCheckpointComplete() { return EXPECTED2; } - @Test - public void testIndexStateBootstrap() throws Exception { + @ParameterizedTest(name = "read.record.batch.size={0}") + @ValueSource(ints = {-1, 1, 2, 10}) + public void testIndexStateBootstrap(int argument) throws Exception { + conf.setInteger(READ_RECORD_BATCH_SIZE, argument); // open the function and ingest data preparePipeline() .consume(TestData.DATA_SET_INSERT) From 821993994f981600a89e7f2239387d63f6a14916 Mon Sep 17 00:00:00 2001 From: Bo Date: Fri, 18 Feb 2022 17:13:56 +0800 Subject: [PATCH 2/4] [HUDI-3446][HUDI-FLINK] support batch Reader in BootstrapOperator#loadRecord --- .../hudi/common/util/BaseFileUtils.java | 18 ++++++------- .../org/apache/hudi/common/util/OrcUtils.java | 26 +++++++++---------- .../apache/hudi/common/util/ParquetUtils.java | 20 +++++++------- .../sink/bootstrap/BootstrapOperator.java | 2 +- 4 files changed, 33 insertions(+), 33 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index 08c4db05dca12..329ae42204e8e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -178,25 +178,25 @@ public abstract Map readFooter(Configuration configuration, bool * @param filePath The data file path * @return {@link List} of {@link HoodieKey}s fetched from the parquet file */ - public abstract List fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize); + public abstract List fetchRecordKeyPartitionPath(ReaderWrapper reader, Path filePath, int batchSize); /** * Open File Reader. - * @param configuration configuration to build file reader - * @param filePath The data file path + * @param configuration configuration to build file reader + * @param filePath The data file path * @param keyGeneratorOpt instance of KeyGenerator * @return file reader */ - public abstract BaseFileReader getReader(Configuration configuration, Path filePath, Option keyGeneratorOpt); + public abstract ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath, Option keyGeneratorOpt); /** * Open File Reader. 
- * @param configuration configuration to build file reader - * @param filePath The data file path + * @param configuration configuration to build file reader + * @param filePath The data file path * @return file reader */ - public BaseFileReader getReader(Configuration configuration, Path filePath) { - return getReader(configuration, filePath, Option.empty()); + public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath) { + return getRecordKeyPartitionPathReader(configuration, filePath, Option.empty()); } /** @@ -216,7 +216,7 @@ public BaseFileReader getReader(Configuration configuration, Path filePath) { */ public abstract Schema readAvroSchema(Configuration configuration, Path filePath); - public abstract class BaseFileReader implements Closeable { + public abstract class ReaderWrapper implements Closeable { } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 04bb363cf0e05..2bb9d67841e64 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -55,7 +55,7 @@ public class OrcUtils extends BaseFileUtils { @Override - public BaseFileReader getReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { try { Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); @@ -64,7 +64,7 @@ public BaseFileReader getReader(Configuration configuration, Path filePath, Opti Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema); RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); - return new OrcFileInnerReader(reader, readSchema, orcSchema, recordReader); + return new OrcReaderWrapper(reader, readSchema, orcSchema, recordReader); } catch (IOException e) { throw new HoodieIOException("Failed to open reader from ORC file:" + filePath, e); } @@ -86,7 +86,7 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, } catch (IOException e) { throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); } - try (BaseFileReader reader = getReader(configuration, filePath)) { + try (ReaderWrapper reader = getRecordKeyPartitionPathReader(configuration, filePath)) { return fetchRecordKeyPartitionPathInternal(reader, filePath, -1); } catch (IOException e) { throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); @@ -94,16 +94,16 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, } @Override - public List fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize) { + public List fetchRecordKeyPartitionPath(ReaderWrapper reader, Path filePath, int batchSize) { return fetchRecordKeyPartitionPathInternal(reader, filePath, batchSize); } - public List fetchRecordKeyPartitionPathInternal(BaseFileReader reader, Path filePath, int batchSize) { + public List fetchRecordKeyPartitionPathInternal(ReaderWrapper reader, Path filePath, int batchSize) { List hoodieKeys = new ArrayList<>(); try { - List fieldNames = (((OrcFileInnerReader)reader).orcSchema).getFieldNames(); - VectorizedRowBatch batch = (((OrcFileInnerReader)reader).orcSchema).createRowBatch(); - RecordReader 
recordReader = ((OrcFileInnerReader)reader).recordReader; + List fieldNames = (((OrcReaderWrapper)reader).orcSchema).getFieldNames(); + VectorizedRowBatch batch = (((OrcReaderWrapper)reader).orcSchema).createRowBatch(); + RecordReader recordReader = ((OrcReaderWrapper)reader).recordReader; // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields int keyCol = -1; @@ -126,9 +126,9 @@ public List fetchRecordKeyPartitionPathInternal(BaseFileReader reader String rowKey = rowKeys.toString(i); String partitionPath = partitionPaths.toString(i); hoodieKeys.add(new HoodieKey(rowKey, partitionPath)); - if (batchSize > 0 && hoodieKeys.size() >= batchSize) { - break; - } + } + if (batchSize > 0 && hoodieKeys.size() >= batchSize) { + break; } } } catch (IOException e) { @@ -298,13 +298,13 @@ public long getRowCount(Configuration conf, Path orcFilePath) { } } - private class OrcFileInnerReader extends BaseFileReader { + private class OrcReaderWrapper extends ReaderWrapper { Reader reader; Schema readSchema; TypeDescription orcSchema; RecordReader recordReader; - public OrcFileInnerReader(Reader reader, Schema readSchema, TypeDescription orcSchema, RecordReader recordReader) { + public OrcReaderWrapper(Reader reader, Schema readSchema, TypeDescription orcSchema, RecordReader recordReader) { this.reader = reader; this.readSchema = readSchema; this.orcSchema = orcSchema; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 0f7a75cb1c8e7..4fbf129555013 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -133,12 +133,12 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, } @Override - public List fetchRecordKeyPartitionPath(BaseFileReader reader, Path filePath, int batchSize) { + public List fetchRecordKeyPartitionPath(ReaderWrapper reader, Path filePath, int batchSize) { return fetchRecordKeyPartitionPathInternal(reader, filePath, Option.empty(), batchSize); } @Override - public BaseFileReader getReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { try { Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); @@ -148,20 +148,20 @@ public BaseFileReader getReader(Configuration configuration, Path filePath, Opti fields.addAll(keyGenerator.getPartitionPathFields()); return HoodieAvroUtils.getSchemaForFields(readAvroSchema(conf, filePath), fields); }) - .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema()); + .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema()); AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema); ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build(); - return new ParquetFileInnerReader(reader); + return new ParquetReaderWrapper(reader); } catch (IOException e) { throw new HoodieIOException("Failed to open reader from Parquet file " + filePath, e); } } - private List fetchRecordKeyPartitionPathInternal(BaseFileReader reader, Path filePath, Option keyGeneratorOpt, int batchSize) { + private List fetchRecordKeyPartitionPathInternal(ReaderWrapper reader, Path filePath, Option keyGeneratorOpt, int batchSize) { List 
hoodieKeys = new ArrayList<>(); try { - Object obj = (((ParquetFileInnerReader)reader).reader).read(); + Object obj = (((ParquetReaderWrapper)reader).reader).read(); while (obj != null) { if (obj instanceof GenericRecord) { String recordKey = null; @@ -177,7 +177,7 @@ private List fetchRecordKeyPartitionPathInternal(BaseFileReader reade if (batchSize > 0 && hoodieKeys.size() >= batchSize) { break; } - obj = (((ParquetFileInnerReader)reader).reader).read(); + obj = (((ParquetReaderWrapper)reader).reader).read(); } } } catch (IOException e) { @@ -196,7 +196,7 @@ private List fetchRecordKeyPartitionPathInternal(BaseFileReader reade */ @Override public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option keyGeneratorOpt) { - try (BaseFileReader reader = getReader(configuration, filePath, keyGeneratorOpt)) { + try (ReaderWrapper reader = getRecordKeyPartitionPathReader(configuration, filePath, keyGeneratorOpt)) { return fetchRecordKeyPartitionPathInternal(reader, filePath, keyGeneratorOpt, -1); } catch (IOException e) { throw new HoodieIOException("Failed to read from Parquet file " + filePath, e); @@ -436,10 +436,10 @@ private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMeta } } - private class ParquetFileInnerReader extends BaseFileReader { + private class ParquetReaderWrapper extends ReaderWrapper { ParquetReader reader; - private ParquetFileInnerReader(ParquetReader reader) { + private ParquetReaderWrapper(ParquetReader reader) { this.reader = reader; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 6e8dbb0ab5ce4..13721163c63f7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -216,7 +216,7 @@ protected void loadRecords(String partitionPath) throws Exception { if (this.batchSize > 0) { List hoodieKeys; Path filePath = new Path(baseFile.getPath()); - try (BaseFileUtils.BaseFileReader reader = fileUtils.getReader(this.hadoopConf, filePath)) { + try (BaseFileUtils.ReaderWrapper reader = fileUtils.getRecordKeyPartitionPathReader(this.hadoopConf, filePath)) { do { hoodieKeys = fileUtils.fetchRecordKeyPartitionPath(reader, filePath, batchSize); hoodieKeys.forEach(hoodieKey -> From 3ca0191014d2714fd63319aab36431236bd156ad Mon Sep 17 00:00:00 2001 From: Bo Date: Sat, 19 Feb 2022 10:25:48 +0800 Subject: [PATCH 3/4] [HUDI-3446][HUDI-FLINK] support batch Reader in BootstrapOperator#loadRecord --- .../hudi/common/util/BaseFileUtils.java | 39 ++---- .../hudi/common/util/OrcReaderIterator.java | 27 ++-- .../org/apache/hudi/common/util/OrcUtils.java | 120 ++++++++---------- .../common/util/ParquetReaderIterator.java | 17 +-- .../apache/hudi/common/util/ParquetUtils.java | 79 +++++------- .../hudi/configuration/FlinkOptions.java | 7 - .../sink/bootstrap/BootstrapOperator.java | 31 +---- .../hudi/sink/TestWriteCopyOnWrite.java | 9 +- 8 files changed, 124 insertions(+), 205 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index 329ae42204e8e..99eec1cbe9e38 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.util; 
-import java.io.Closeable; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -168,43 +167,33 @@ public abstract Map readFooter(Configuration configuration, bool * Fetch {@link HoodieKey}s from the given data file. * @param configuration configuration to build fs object * @param filePath The data file path - * @return {@link List} of {@link HoodieKey}s fetched from the parquet file + * @return {@link List} of {@link HoodieKey}s fetched from the data file */ public abstract List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath); /** - * Fetch {@link HoodieKey}s from the given data file. - * @param reader The file reader + * Provides a closable iterator for reading the given data file. + * @param configuration configuration to build fs object * @param filePath The data file path - * @return {@link List} of {@link HoodieKey}s fetched from the parquet file - */ - public abstract List fetchRecordKeyPartitionPath(ReaderWrapper reader, Path filePath, int batchSize); - - /** - * Open File Reader. - * @param configuration configuration to build file reader - * @param filePath The data file path - * @param keyGeneratorOpt instance of KeyGenerator - * @return file reader + * @param keyGeneratorOpt instance of KeyGenerator. + * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file */ - public abstract ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath, Option keyGeneratorOpt); + public abstract ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt); /** - * Open File Reader. - * @param configuration configuration to build file reader - * @param filePath The data file path - * @return file reader + * Provides a closable iterator for reading the given data file. + * @param configuration configuration to build fs object + * @param filePath The data file path + * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file */ - public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath) { - return getRecordKeyPartitionPathReader(configuration, filePath, Option.empty()); - } + public abstract ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath); /** * Fetch {@link HoodieKey}s from the given data file. * @param configuration configuration to build fs object * @param filePath The data file path * @param keyGeneratorOpt instance of KeyGenerator. 
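// Illustrative sketch (not part of the patch): the iterator contract declared above lets callers stream
// HoodieKeys instead of materializing every key of a large base file in a List, which is the point of
// this change. `KeyIteratorSketch` and `countKeys` are made-up names; `fetchRecordKeyPartitionPathIterator`
// and `ClosableIterator` are the names used in this commit (a later commit of the series renames the
// method to `getHoodieKeyIterator`).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;

public class KeyIteratorSketch {
  public static long countKeys(Configuration hadoopConf, Path filePath) {
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(filePath.toString());
    long count = 0;
    // ClosableIterator is auto-closeable, so the underlying file reader is released when the block exits
    try (ClosableIterator<HoodieKey> iterator = fileUtils.fetchRecordKeyPartitionPathIterator(hadoopConf, filePath)) {
      while (iterator.hasNext()) {
        iterator.next();
        count++;
      }
    }
    return count;
  }
}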
- * @return {@link List} of {@link HoodieKey}s fetched from the parquet file + * @return {@link List} of {@link HoodieKey}s fetched from the data file */ public abstract List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option keyGeneratorOpt); @@ -215,8 +204,4 @@ public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration * @return The Avro schema of the data file */ public abstract Schema readAvroSchema(Configuration configuration, Path filePath); - - public abstract class ReaderWrapper implements Closeable { - - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java index 4b3caa756a65f..f81865ce81e17 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java @@ -20,7 +20,6 @@ import java.util.List; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import org.apache.hudi.exception.HoodieIOException; @@ -29,20 +28,19 @@ import org.apache.orc.TypeDescription; import java.io.IOException; -import java.util.Iterator; /** * This class wraps a ORC reader and provides an iterator based api to read from an ORC file. */ -public class OrcReaderIterator implements Iterator { +public class OrcReaderIterator implements ClosableIterator { private final RecordReader recordReader; private final Schema avroSchema; - List fieldNames; - List orcFieldTypes; - Schema[] avroFieldSchemas; + private List fieldNames; + protected List orcFieldTypes; + protected Schema[] avroFieldSchemas; private VectorizedRowBatch batch; - private int rowInBatch; + protected int rowInBatch; private T next; public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescription orcSchema) { @@ -62,7 +60,7 @@ public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescripti * @return true if we have rows available. 
* @throws IOException */ - private boolean ensureBatch() throws IOException { + protected boolean ensureBatch() throws IOException { if (rowInBatch >= batch.size) { rowInBatch = 0; return recordReader.nextBatch(batch); @@ -93,26 +91,31 @@ public T next() { } } T retVal = this.next; - this.next = (T) readRecordFromBatch(); + this.next = readRecordFromBatch(); return retVal; } catch (IOException io) { throw new HoodieIOException("unable to read next record from ORC file ", io); } } - private GenericData.Record readRecordFromBatch() throws IOException { + protected T readRecordFromBatch() throws IOException { // No more records left to read from ORC file if (!ensureBatch()) { return null; } - GenericData.Record record = new Record(avroSchema); + Record record = new Record(avroSchema); int numFields = orcFieldTypes.size(); for (int i = 0; i < numFields; i++) { Object data = AvroOrcUtils.readFromVector(orcFieldTypes.get(i), batch.cols[i], avroFieldSchemas[i], rowInBatch); record.put(fieldNames.get(i), data); } rowInBatch++; - return record; + return (T) record; + } + + @Override + public void close() { + FileIOUtils.closeQuietly(this.recordReader); } } \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 2bb9d67841e64..be7d2b5bfbc4d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -54,8 +54,15 @@ */ public class OrcUtils extends BaseFileUtils { + /** + * Provides a closable iterator for reading the given ORC file. + * + * @param configuration configuration to build fs object + * @param filePath The ORC file path + * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the ORC file + */ @Override - public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath) { try { Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); @@ -64,7 +71,41 @@ public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema); RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); - return new OrcReaderWrapper(reader, readSchema, orcSchema, recordReader); + List fieldNames = orcSchema.getFieldNames(); + VectorizedRowBatch batch = orcSchema.createRowBatch(); + + // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields + int keyCol = -1; + int partitionCol = -1; + for (int i = 0; i < fieldNames.size(); i++) { + if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { + keyCol = i; + } + if (fieldNames.get(i).equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) { + partitionCol = i; + } + } + if (keyCol == -1 || partitionCol == -1) { + throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath)); + } + final int finalKeyCol = keyCol; + final int finalPartitionCol = partitionCol; + OrcReaderIterator iterator = new OrcReaderIterator(recordReader, readSchema, orcSchema) { + @Override + protected Object readRecordFromBatch() throws IOException { + // No more records left to read 
from ORC file + if (!ensureBatch()) { + return null; + } + String rowKey = (String) AvroOrcUtils.readFromVector((TypeDescription) orcFieldTypes.get(finalKeyCol), + batch.cols[finalKeyCol], avroFieldSchemas[finalKeyCol], rowInBatch); + String partitionPath = (String) AvroOrcUtils.readFromVector((TypeDescription) orcFieldTypes.get(finalPartitionCol), + batch.cols[finalPartitionCol], avroFieldSchemas[finalPartitionCol], rowInBatch); + rowInBatch++; + return new HoodieKey(rowKey, partitionPath); + } + }; + return iterator; } catch (IOException e) { throw new HoodieIOException("Failed to open reader from ORC file:" + filePath, e); } @@ -86,53 +127,9 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, } catch (IOException e) { throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); } - try (ReaderWrapper reader = getRecordKeyPartitionPathReader(configuration, filePath)) { - return fetchRecordKeyPartitionPathInternal(reader, filePath, -1); - } catch (IOException e) { - throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); - } - } - - @Override - public List fetchRecordKeyPartitionPath(ReaderWrapper reader, Path filePath, int batchSize) { - return fetchRecordKeyPartitionPathInternal(reader, filePath, batchSize); - } - - public List fetchRecordKeyPartitionPathInternal(ReaderWrapper reader, Path filePath, int batchSize) { List hoodieKeys = new ArrayList<>(); - try { - List fieldNames = (((OrcReaderWrapper)reader).orcSchema).getFieldNames(); - VectorizedRowBatch batch = (((OrcReaderWrapper)reader).orcSchema).createRowBatch(); - RecordReader recordReader = ((OrcReaderWrapper)reader).recordReader; - - // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields - int keyCol = -1; - int partitionCol = -1; - for (int i = 0; i < fieldNames.size(); i++) { - if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { - keyCol = i; - } - if (fieldNames.get(i).equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) { - partitionCol = i; - } - } - if (keyCol == -1 || partitionCol == -1) { - throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath)); - } - while (recordReader.nextBatch(batch)) { - BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[keyCol]; - BytesColumnVector partitionPaths = (BytesColumnVector) batch.cols[partitionCol]; - for (int i = 0; i < batch.size; i++) { - String rowKey = rowKeys.toString(i); - String partitionPath = partitionPaths.toString(i); - hoodieKeys.add(new HoodieKey(rowKey, partitionPath)); - } - if (batchSize > 0 && hoodieKeys.size() >= batchSize) { - break; - } - } - } catch (IOException e) { - throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); + try (ClosableIterator iterator = fetchRecordKeyPartitionPathIterator(configuration, filePath, Option.empty())) { + iterator.forEachRemaining(hoodieKeys::add); } return hoodieKeys; } @@ -142,6 +139,11 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, throw new HoodieIOException("UnsupportedOperation : Disabling meta fields not yet supported for Orc"); } + @Override + public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + throw new HoodieIOException("UnsupportedOperation : Disabling meta fields not yet supported for Orc"); + } + /** * NOTE: This literally reads the entire file contents, thus should be used with caution. 
*/ @@ -244,7 +246,7 @@ public Set filterRowKeys(Configuration conf, Path filePath, Set recordReader.close(); } } catch (IOException e) { - //ignore + // ignore } } } @@ -297,24 +299,4 @@ public long getRowCount(Configuration conf, Path orcFilePath) { throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io); } } - - private class OrcReaderWrapper extends ReaderWrapper { - Reader reader; - Schema readSchema; - TypeDescription orcSchema; - RecordReader recordReader; - - public OrcReaderWrapper(Reader reader, Schema readSchema, TypeDescription orcSchema, RecordReader recordReader) { - this.reader = reader; - this.readSchema = readSchema; - this.orcSchema = orcSchema; - this.recordReader = recordReader; - } - - @Override - public void close() throws IOException { - this.recordReader.close(); - this.reader.close(); - } - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java index 5970e02d6799a..28d90e0b801b2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java @@ -23,14 +23,11 @@ import org.apache.parquet.hadoop.ParquetReader; -import java.io.IOException; -import java.util.Iterator; - /** * This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in * {@link BoundedInMemoryQueue} */ -public class ParquetReaderIterator implements Iterator { +public class ParquetReaderIterator implements ClosableIterator { // Parquet reader for an existing parquet file private final ParquetReader parquetReader; @@ -66,14 +63,18 @@ public T next() { } T retVal = this.next; this.next = parquetReader.read(); - return retVal; + return map(retVal); } catch (Exception e) { FileIOUtils.closeQuietly(parquetReader); throw new HoodieException("unable to read next record from parquet file ", e); } } - public void close() throws IOException { - parquetReader.close(); + protected T map(T retVal) { + return retVal; + } + + public void close() { + FileIOUtils.closeQuietly(parquetReader); } -} +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 4fbf129555013..b2a2455ab3143 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -133,12 +133,20 @@ public List fetchRecordKeyPartitionPath(Configuration configuration, } @Override - public List fetchRecordKeyPartitionPath(ReaderWrapper reader, Path filePath, int batchSize) { - return fetchRecordKeyPartitionPathInternal(reader, filePath, Option.empty(), batchSize); + public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath) { + return fetchRecordKeyPartitionPathIterator(configuration, filePath, Option.empty()); } + /** + * Provides a closable iterator for reading the given parquet file. 
+ * + * @param configuration configuration to build fs object + * @param filePath The parquet file path + * @param keyGeneratorOpt instance of KeyGenerator + * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the parquet file + */ @Override - public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt) { try { Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); @@ -152,38 +160,24 @@ public ReaderWrapper getRecordKeyPartitionPathReader(Configuration configuration AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema); ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build(); - return new ParquetReaderWrapper(reader); - } catch (IOException e) { - throw new HoodieIOException("Failed to open reader from Parquet file " + filePath, e); - } - } - - private List fetchRecordKeyPartitionPathInternal(ReaderWrapper reader, Path filePath, Option keyGeneratorOpt, int batchSize) { - List hoodieKeys = new ArrayList<>(); - try { - Object obj = (((ParquetReaderWrapper)reader).reader).read(); - while (obj != null) { - if (obj instanceof GenericRecord) { - String recordKey = null; - String partitionPath = null; - if (keyGeneratorOpt.isPresent()) { - recordKey = keyGeneratorOpt.get().getRecordKey((GenericRecord) obj); - partitionPath = keyGeneratorOpt.get().getPartitionPath((GenericRecord) obj); - } else { - recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); - partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); - } - hoodieKeys.add(new HoodieKey(recordKey, partitionPath)); - if (batchSize > 0 && hoodieKeys.size() >= batchSize) { - break; - } - obj = (((ParquetReaderWrapper)reader).reader).read(); + return new ParquetReaderIterator(reader) { + @Override + protected Object map(Object retVal) { + String recordKey = null; + String partitionPath = null; + if (keyGeneratorOpt.isPresent()) { + recordKey = keyGeneratorOpt.get().getRecordKey((GenericRecord) retVal); + partitionPath = keyGeneratorOpt.get().getPartitionPath((GenericRecord) retVal); + } else { + recordKey = ((GenericRecord) retVal).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + partitionPath = ((GenericRecord) retVal).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + } + return new HoodieKey(recordKey, partitionPath); } - } + }; } catch (IOException e) { throw new HoodieIOException("Failed to read from Parquet file " + filePath, e); } - return hoodieKeys; } /** @@ -191,15 +185,15 @@ private List fetchRecordKeyPartitionPathInternal(ReaderWrapper reader * * @param configuration configuration to build fs object * @param filePath The parquet file path. - * @param keyGeneratorOpt + * @param keyGeneratorOpt instance of KeyGenerator. 
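// Side note (not part of the patch): the anonymous ParquetReaderIterator above appears to rely on raw
// types, because map(T) has to return the reader's own element type while the caller wants HoodieKeys.
// A generically typed alternative is a small adapter over ClosableIterator, e.g. wrapping a
// ParquetReaderIterator of GenericRecord with a GenericRecord-to-HoodieKey function. `MappingIterator`
// is a made-up name for this sketch, not an existing Hudi class.
import java.util.function.Function;
import org.apache.hudi.common.util.ClosableIterator;

public class MappingIterator<I, O> implements ClosableIterator<O> {
  private final ClosableIterator<I> source;
  private final Function<I, O> mapper;

  public MappingIterator(ClosableIterator<I> source, Function<I, O> mapper) {
    this.source = source;
    this.mapper = mapper;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public O next() {
    // convert each element lazily, e.g. GenericRecord -> HoodieKey
    return mapper.apply(source.next());
  }

  @Override
  public void close() {
    source.close();
  }
}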
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file */ @Override public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option keyGeneratorOpt) { - try (ReaderWrapper reader = getRecordKeyPartitionPathReader(configuration, filePath, keyGeneratorOpt)) { - return fetchRecordKeyPartitionPathInternal(reader, filePath, keyGeneratorOpt, -1); - } catch (IOException e) { - throw new HoodieIOException("Failed to read from Parquet file " + filePath, e); + List hoodieKeys = new ArrayList<>(); + try (ClosableIterator iterator = fetchRecordKeyPartitionPathIterator(configuration, filePath, keyGeneratorOpt)) { + iterator.forEachRemaining(hoodieKeys::add); + return hoodieKeys; } } @@ -435,17 +429,4 @@ private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMeta throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName())); } } - - private class ParquetReaderWrapper extends ReaderWrapper { - ParquetReader reader; - - private ParquetReaderWrapper(ParquetReader reader) { - this.reader = reader; - } - - @Override - public void close() throws IOException { - this.reader.close(); - } - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 439169338acaf..77c3f15e54c45 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -131,13 +131,6 @@ private FlinkOptions() { .defaultValue(".*") .withDescription("Whether to load partitions in state if partition path matching, default *"); - public static final ConfigOption READ_RECORD_BATCH_SIZE = ConfigOptions - .key("read.record.batch.size") - .intType() - .defaultValue(-1) - .withDescription("The Batch Size of Read Record, if the Config is less than or equal to 0, Batch Read is disabled,\n" - + "default -1"); - // ------------------------------------------------------------------------ // Read Options // ------------------------------------------------------------------------ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 13721163c63f7..27efef2228f40 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -92,7 +93,6 @@ public class BootstrapOperator> private transient ListState instantState; private final Pattern pattern; private String lastInstantTime; - private int batchSize = -1; public BootstrapOperator(Configuration conf) { this.conf = conf; @@ -125,7 +125,6 @@ public void initializeState(StateInitializationContext context) throws Exception this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); this.aggregateManager = getRuntimeContext().getGlobalAggregateManager(); - this.batchSize = 
conf.getInteger(FlinkOptions.READ_RECORD_BATCH_SIZE); preLoadIndexRecords(); } @@ -212,30 +211,10 @@ protected void loadRecords(String partitionPath) throws Exception { if (!isValidFile(baseFile.getFileStatus())) { return; } - - if (this.batchSize > 0) { - List hoodieKeys; - Path filePath = new Path(baseFile.getPath()); - try (BaseFileUtils.ReaderWrapper reader = fileUtils.getRecordKeyPartitionPathReader(this.hadoopConf, filePath)) { - do { - hoodieKeys = fileUtils.fetchRecordKeyPartitionPath(reader, filePath, batchSize); - hoodieKeys.forEach(hoodieKey -> - output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))))); - } while (hoodieKeys.size() >= this.batchSize); - } catch (Exception e) { - throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e); - } - } else { - final List hoodieKeys; - try { - hoodieKeys = - fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath())); - } catch (Exception e) { - throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e); - } - - hoodieKeys.forEach(hoodieKey -> - output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))))); + try (ClosableIterator iterator = fileUtils.fetchRecordKeyPartitionPathIterator(this.hadoopConf, new Path(baseFile.getPath()))) { + iterator.forEachRemaining(hoodieKey -> { + output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(iterator.next(), fileSlice)))); + }); } }); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 5b72088c9a96f..a91f45263ff25 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -33,15 +33,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import static org.apache.hudi.configuration.FlinkOptions.READ_RECORD_BATCH_SIZE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; @@ -341,10 +338,8 @@ protected Map getExpectedBeforeCheckpointComplete() { return EXPECTED2; } - @ParameterizedTest(name = "read.record.batch.size={0}") - @ValueSource(ints = {-1, 1, 2, 10}) - public void testIndexStateBootstrap(int argument) throws Exception { - conf.setInteger(READ_RECORD_BATCH_SIZE, argument); + @Test + public void testIndexStateBootstrap() throws Exception { // open the function and ingest data preparePipeline() .consume(TestData.DATA_SET_INSERT) From 9c8cd84551964d1000a0ca6c2a6ad9f59aa52afa Mon Sep 17 00:00:00 2001 From: Bo Date: Sat, 19 Feb 2022 19:13:51 +0800 Subject: [PATCH 4/4] [HUDI-3446][HUDI-FLINK] support batch Reader in BootstrapOperator#loadRecord --- .../hudi/io/HoodieKeyLocationFetchHandle.java | 4 +- .../hudi/common/util/BaseFileUtils.java | 8 +- .../hudi/common/util/OrcReaderIterator.java | 23 ++-- .../org/apache/hudi/common/util/OrcUtils.java | 123 ++++++------------ .../common/util/ParquetReaderIterator.java | 16 ++- .../apache/hudi/common/util/ParquetUtils.java | 86 ++++++++---- .../hudi/common/util/TestParquetUtils.java | 4 +- 
.../sink/bootstrap/BootstrapOperator.java | 4 +- 8 files changed, 133 insertions(+), 135 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java index 6796557714221..ab8b83c14aeec 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java @@ -57,9 +57,9 @@ public Stream> locations() { BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath()); List hoodieKeyList = new ArrayList<>(); if (keyGeneratorOpt.isPresent()) { - hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt); + hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt); } else { - hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())); + hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())); } return hoodieKeyList.stream() .map(entry -> Pair.of(entry, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index 99eec1cbe9e38..7ec6110d723ab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -169,7 +169,7 @@ public abstract Map readFooter(Configuration configuration, bool * @param filePath The data file path * @return {@link List} of {@link HoodieKey}s fetched from the data file */ - public abstract List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath); + public abstract List fetchHoodieKeys(Configuration configuration, Path filePath); /** * Provides a closable iterator for reading the given data file. @@ -178,7 +178,7 @@ public abstract Map readFooter(Configuration configuration, bool * @param keyGeneratorOpt instance of KeyGenerator. * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file */ - public abstract ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt); + public abstract ClosableIterator getHoodieKeyIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt); /** * Provides a closable iterator for reading the given data file. @@ -186,7 +186,7 @@ public abstract Map readFooter(Configuration configuration, bool * @param filePath The data file path * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file */ - public abstract ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath); + public abstract ClosableIterator getHoodieKeyIterator(Configuration configuration, Path filePath); /** * Fetch {@link HoodieKey}s from the given data file. @@ -195,7 +195,7 @@ public abstract Map readFooter(Configuration configuration, bool * @param keyGeneratorOpt instance of KeyGenerator. 
* @return {@link List} of {@link HoodieKey}s fetched from the data file */ - public abstract List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option keyGeneratorOpt); + public abstract List fetchHoodieKeys(Configuration configuration, Path filePath, Option keyGeneratorOpt); /** * Read the Avro schema of the data file. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java index f81865ce81e17..d9ceeeee40f63 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import org.apache.hudi.exception.HoodieIOException; @@ -36,11 +37,11 @@ public class OrcReaderIterator implements ClosableIterator { private final RecordReader recordReader; private final Schema avroSchema; - private List fieldNames; - protected List orcFieldTypes; - protected Schema[] avroFieldSchemas; - private VectorizedRowBatch batch; - protected int rowInBatch; + private final List fieldNames; + private final List orcFieldTypes; + private final Schema[] avroFieldSchemas; + private final VectorizedRowBatch batch; + private int rowInBatch; private T next; public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescription orcSchema) { @@ -50,7 +51,7 @@ public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescripti this.orcFieldTypes = orcSchema.getChildren(); this.avroFieldSchemas = fieldNames.stream() .map(fieldName -> avroSchema.getField(fieldName).schema()) - .toArray(size -> new Schema[size]); + .toArray(Schema[]::new); this.batch = orcSchema.createRowBatch(); this.rowInBatch = 0; } @@ -60,7 +61,7 @@ public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescripti * @return true if we have rows available. 
* @throws IOException */ - protected boolean ensureBatch() throws IOException { + private boolean ensureBatch() throws IOException { if (rowInBatch >= batch.size) { rowInBatch = 0; return recordReader.nextBatch(batch); @@ -91,27 +92,27 @@ public T next() { } } T retVal = this.next; - this.next = readRecordFromBatch(); + this.next = (T) readRecordFromBatch(); return retVal; } catch (IOException io) { throw new HoodieIOException("unable to read next record from ORC file ", io); } } - protected T readRecordFromBatch() throws IOException { + private GenericData.Record readRecordFromBatch() throws IOException { // No more records left to read from ORC file if (!ensureBatch()) { return null; } - Record record = new Record(avroSchema); + GenericData.Record record = new Record(avroSchema); int numFields = orcFieldTypes.size(); for (int i = 0; i < numFields; i++) { Object data = AvroOrcUtils.readFromVector(orcFieldTypes.get(i), batch.cols[i], avroFieldSchemas[i], rowInBatch); record.put(fieldNames.get(i), data); } rowInBatch++; - return (T) record; + return record; } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index be7d2b5bfbc4d..88c28d75204a7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -62,7 +63,7 @@ public class OrcUtils extends BaseFileUtils { * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the ORC file */ @Override - public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath) { + public ClosableIterator getHoodieKeyIterator(Configuration configuration, Path filePath) { try { Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); @@ -72,7 +73,6 @@ public ClosableIterator fetchRecordKeyPartitionPathIterator(Configura TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema); RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); List fieldNames = orcSchema.getFieldNames(); - VectorizedRowBatch batch = orcSchema.createRowBatch(); // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields int keyCol = -1; @@ -88,24 +88,7 @@ public ClosableIterator fetchRecordKeyPartitionPathIterator(Configura if (keyCol == -1 || partitionCol == -1) { throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath)); } - final int finalKeyCol = keyCol; - final int finalPartitionCol = partitionCol; - OrcReaderIterator iterator = new OrcReaderIterator(recordReader, readSchema, orcSchema) { - @Override - protected Object readRecordFromBatch() throws IOException { - // No more records left to read from ORC file - if (!ensureBatch()) { - return null; - } - String rowKey = (String) AvroOrcUtils.readFromVector((TypeDescription) orcFieldTypes.get(finalKeyCol), - batch.cols[finalKeyCol], avroFieldSchemas[finalKeyCol], rowInBatch); - String partitionPath = (String) AvroOrcUtils.readFromVector((TypeDescription) orcFieldTypes.get(finalPartitionCol), - batch.cols[finalPartitionCol], avroFieldSchemas[finalPartitionCol], rowInBatch); - rowInBatch++; - return new 
HoodieKey(rowKey, partitionPath); - } - }; - return iterator; + return new OrcReaderIterator<>(recordReader, readSchema, orcSchema); } catch (IOException e) { throw new HoodieIOException("Failed to open reader from ORC file:" + filePath, e); } @@ -119,29 +102,29 @@ protected Object readRecordFromBatch() throws IOException { * @return {@link List} of {@link HoodieKey}s fetched from the ORC file */ @Override - public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { + public List fetchHoodieKeys(Configuration configuration, Path filePath) { try { if (!filePath.getFileSystem(configuration).exists(filePath)) { - return new ArrayList<>(); + return Collections.emptyList(); } } catch (IOException e) { throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); } List hoodieKeys = new ArrayList<>(); - try (ClosableIterator iterator = fetchRecordKeyPartitionPathIterator(configuration, filePath, Option.empty())) { + try (ClosableIterator iterator = getHoodieKeyIterator(configuration, filePath, Option.empty())) { iterator.forEachRemaining(hoodieKeys::add); } return hoodieKeys; } @Override - public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option keyGeneratorOpt) { - throw new HoodieIOException("UnsupportedOperation : Disabling meta fields not yet supported for Orc"); + public List fetchHoodieKeys(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + throw new UnsupportedOperationException("Custom key generator is not supported yet"); } @Override - public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt) { - throw new HoodieIOException("UnsupportedOperation : Disabling meta fields not yet supported for Orc"); + public ClosableIterator getHoodieKeyIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + throw new UnsupportedOperationException("Custom key generator is not supported yet"); } /** @@ -164,30 +147,17 @@ public List readAvroRecords(Configuration configuration, Path fil @Override public List readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) { List records = new ArrayList<>(); - Reader reader = null; - RecordReader recordReader = null; - try { - reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration)); + try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) { TypeDescription orcSchema = reader.getSchema(); - recordReader = reader.rows(new Options(configuration).schema(orcSchema)); - OrcReaderIterator iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema); - while (iterator.hasNext()) { - GenericRecord record = iterator.next(); - records.add(record); + try (RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema))) { + OrcReaderIterator iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema); + while (iterator.hasNext()) { + GenericRecord record = iterator.next(); + records.add(record); + } } } catch (IOException io) { throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io); - } finally { - try { - if (reader != null) { - reader.close(); - } - if (recordReader != null) { - recordReader.close(); - } - } catch (IOException e) { - // ignore - } } return records; } @@ -204,50 +174,37 @@ public List readAvroRecords(Configuration configuration, Path fil @Override public Set filterRowKeys(Configuration conf, Path filePath, Set 
filter) throws HoodieIOException { - Reader reader = null; - RecordReader recordReader = null; - try { - reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); - Set filteredRowKeys = new HashSet<>(); + try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));) { TypeDescription schema = reader.getSchema(); - List fieldNames = schema.getFieldNames(); - VectorizedRowBatch batch = schema.createRowBatch(); - recordReader = reader.rows(new Options(conf).schema(schema)); + try (RecordReader recordReader = reader.rows(new Options(conf).schema(schema))) { + Set filteredRowKeys = new HashSet<>(); + List fieldNames = schema.getFieldNames(); + VectorizedRowBatch batch = schema.createRowBatch(); - // column index for the RECORD_KEY_METADATA_FIELD field - int colIndex = -1; - for (int i = 0; i < fieldNames.size(); i++) { - if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { - colIndex = i; - break; + // column index for the RECORD_KEY_METADATA_FIELD field + int colIndex = -1; + for (int i = 0; i < fieldNames.size(); i++) { + if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { + colIndex = i; + break; + } } - } - if (colIndex == -1) { - throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath)); - } - while (recordReader.nextBatch(batch)) { - BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex]; - for (int i = 0; i < batch.size; i++) { - String rowKey = rowKeys.toString(i); - if (filter.isEmpty() || filter.contains(rowKey)) { - filteredRowKeys.add(rowKey); + if (colIndex == -1) { + throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath)); + } + while (recordReader.nextBatch(batch)) { + BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex]; + for (int i = 0; i < batch.size; i++) { + String rowKey = rowKeys.toString(i); + if (filter.isEmpty() || filter.contains(rowKey)) { + filteredRowKeys.add(rowKey); + } } } + return filteredRowKeys; } - return filteredRowKeys; } catch (IOException io) { throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io); - } finally { - try { - if (reader != null) { - reader.close(); - } - if (recordReader != null) { - recordReader.close(); - } - } catch (IOException e) { - // ignore - } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java index 28d90e0b801b2..03bd471b606f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java @@ -23,6 +23,8 @@ import org.apache.parquet.hadoop.ParquetReader; +import java.io.IOException; + /** * This class wraps a parquet reader and provides an iterator based api to read from a parquet file. 
This is used in * {@link BoundedInMemoryQueue} @@ -63,18 +65,18 @@ public T next() { } T retVal = this.next; this.next = parquetReader.read(); - return map(retVal); + return retVal; } catch (Exception e) { FileIOUtils.closeQuietly(parquetReader); throw new HoodieException("unable to read next record from parquet file ", e); } } - protected T map(T retVal) { - return retVal; - } - public void close() { - FileIOUtils.closeQuietly(parquetReader); + try { + parquetReader.close(); + } catch (IOException e) { + throw new HoodieException("Exception while closing the parquet reader", e); + } } -} \ No newline at end of file +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index b2a2455ab3143..f67cf65258371 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -128,25 +128,26 @@ private static Set filterParquetRowKeys(Configuration configuration, Pat * @return {@link List} of {@link HoodieKey}s fetched from the parquet file */ @Override - public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { - return fetchRecordKeyPartitionPath(configuration, filePath, Option.empty()); + public List fetchHoodieKeys(Configuration configuration, Path filePath) { + return fetchHoodieKeys(configuration, filePath, Option.empty()); } @Override - public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath) { - return fetchRecordKeyPartitionPathIterator(configuration, filePath, Option.empty()); + public ClosableIterator getHoodieKeyIterator(Configuration configuration, Path filePath) { + return getHoodieKeyIterator(configuration, filePath, Option.empty()); } /** - * Provides a closable iterator for reading the given parquet file. + * Returns a closable iterator for reading the given parquet file. 
* * @param configuration configuration to build fs object * @param filePath The parquet file path * @param keyGeneratorOpt instance of KeyGenerator + * * @return {@link ClosableIterator} of {@link HoodieKey}s for reading the parquet file */ @Override - public ClosableIterator fetchRecordKeyPartitionPathIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + public ClosableIterator getHoodieKeyIterator(Configuration configuration, Path filePath, Option keyGeneratorOpt) { try { Configuration conf = new Configuration(configuration); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); @@ -159,22 +160,8 @@ public ClosableIterator fetchRecordKeyPartitionPathIterator(Configura .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema()); AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema); - ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build(); - return new ParquetReaderIterator(reader) { - @Override - protected Object map(Object retVal) { - String recordKey = null; - String partitionPath = null; - if (keyGeneratorOpt.isPresent()) { - recordKey = keyGeneratorOpt.get().getRecordKey((GenericRecord) retVal); - partitionPath = keyGeneratorOpt.get().getPartitionPath((GenericRecord) retVal); - } else { - recordKey = ((GenericRecord) retVal).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); - partitionPath = ((GenericRecord) retVal).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); - } - return new HoodieKey(recordKey, partitionPath); - } - }; + ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build(); + return HoodieKeyIterator.getInstance(new ParquetReaderIterator<>(reader), keyGeneratorOpt); } catch (IOException e) { throw new HoodieIOException("Failed to read from Parquet file " + filePath, e); } @@ -189,9 +176,9 @@ protected Object map(Object retVal) { * @return {@link List} of {@link HoodieKey}s fetched from the parquet file */ @Override - public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option keyGeneratorOpt) { + public List fetchHoodieKeys(Configuration configuration, Path filePath, Option keyGeneratorOpt) { List hoodieKeys = new ArrayList<>(); - try (ClosableIterator iterator = fetchRecordKeyPartitionPathIterator(configuration, filePath, keyGeneratorOpt)) { + try (ClosableIterator iterator = getHoodieKeyIterator(configuration, filePath, keyGeneratorOpt)) { iterator.forEachRemaining(hoodieKeys::add); return hoodieKeys; } @@ -429,4 +416,55 @@ private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMeta throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName())); } } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * An iterator that can apply the given function {@code func} to transform records + * from the underneath record iterator to hoodie keys. 
+ */ + private static class HoodieKeyIterator implements ClosableIterator { + private final ClosableIterator nestedItr; + private final Function func; + + public static HoodieKeyIterator getInstance(ClosableIterator nestedItr, Option keyGenerator) { + return new HoodieKeyIterator(nestedItr, keyGenerator); + } + + private HoodieKeyIterator(ClosableIterator nestedItr, Option keyGenerator) { + this.nestedItr = nestedItr; + if (keyGenerator.isPresent()) { + this.func = retVal -> { + String recordKey = keyGenerator.get().getRecordKey(retVal); + String partitionPath = keyGenerator.get().getPartitionPath(retVal); + return new HoodieKey(recordKey, partitionPath); + }; + } else { + this.func = retVal -> { + String recordKey = retVal.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partitionPath = retVal.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + return new HoodieKey(recordKey, partitionPath); + }; + } + } + + @Override + public void close() { + if (this.nestedItr != null) { + this.nestedItr.close(); + } + } + + @Override + public boolean hasNext() { + return this.nestedItr.hasNext(); + } + + @Override + public HoodieKey next() { + return this.func.apply(this.nestedItr.next()); + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java index e07c0fad3d24e..7bef8477125c2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java @@ -147,7 +147,7 @@ public void testFetchRecordKeyPartitionPathFromParquet(String typeCode) throws E // Read and verify List fetchedRows = - parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); + parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match"); for (HoodieKey entry : fetchedRows) { @@ -173,7 +173,7 @@ public void testFetchRecordKeyPartitionPathVirtualKeysFromParquet() throws Excep // Read and verify List fetchedRows = - parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath), + parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath), Option.of(new TestBaseKeyGen("abc","def"))); assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 27efef2228f40..0f944c56577e2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -211,9 +211,9 @@ protected void loadRecords(String partitionPath) throws Exception { if (!isValidFile(baseFile.getFileStatus())) { return; } - try (ClosableIterator iterator = fileUtils.fetchRecordKeyPartitionPathIterator(this.hadoopConf, new Path(baseFile.getPath()))) { + try (ClosableIterator iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) { iterator.forEachRemaining(hoodieKey -> { - output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(iterator.next(), fileSlice)))); + output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)))); }); } });
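For context on the new HoodieKeyIterator added to ParquetUtils above, here is a minimal, self-contained sketch (not part of the patch) of the mapping it performs when keyGeneratorOpt is empty: each projected GenericRecord is turned into a HoodieKey from the two Hudi metadata columns. The projection schema and the single in-memory record below are hypothetical test data standing in for what ParquetReaderIterator would produce.

import java.util.Collections;
import java.util.Iterator;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;

public class HoodieKeyMappingSketch {
  public static void main(String[] args) {
    // Projection schema carrying only the two Hudi metadata columns that
    // getHoodieKeyIterator requests from the parquet reader.
    Schema schema = SchemaBuilder.record("HoodieKeyProjection").fields()
        .requiredString(HoodieRecord.RECORD_KEY_METADATA_FIELD)
        .requiredString(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
        .endRecord();

    GenericRecord record = new GenericData.Record(schema);
    record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, "id1");
    record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "par1");

    // The same transform HoodieKeyIterator applies per record when no KeyGenerator is supplied.
    Iterator<GenericRecord> records = Collections.singletonList(record).iterator();
    while (records.hasNext()) {
      GenericRecord rec = records.next();
      HoodieKey key = new HoodieKey(
          rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
          rec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString());
      System.out.println(key);
    }
  }
}

When a KeyGenerator is present, the same wrapper simply delegates to getRecordKey/getPartitionPath on the full record instead of reading the metadata columns, which is why the mapping lives in one place rather than being duplicated per file format.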
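A hedged usage sketch of the reworked bootstrap read path: instead of a batch-size knob plus a fully materialized List of keys, callers such as BootstrapOperator#loadRecords now stream HoodieKeys from getHoodieKeyIterator and let try-with-resources close the underlying reader. The file path below is a placeholder, and the base file is assumed to already carry the Hudi metadata columns.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;

public class BootstrapKeyScanSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder path: point this at an existing Hudi base file (parquet or orc).
    Path baseFilePath = new Path("/tmp/hudi_table/par1/some-base-file.parquet");
    Configuration hadoopConf = new Configuration();

    // BaseFileUtils resolves the parquet or orc implementation from the file extension.
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(baseFilePath.toString());

    // Keys are consumed one at a time; there is no batch size to tune and no
    // full in-memory list, which is the point of the iterator-based API.
    try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(hadoopConf, baseFilePath)) {
      iterator.forEachRemaining(hoodieKey ->
          System.out.println(hoodieKey.getRecordKey() + " / " + hoodieKey.getPartitionPath()));
    }
  }
}

This mirrors the final loadRecords hunk above, where each streamed key is wrapped into an IndexRecord and emitted downstream instead of printed.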