Merged
Changes from all commits (54 commits)
190884f - builds (May 1, 2024)
4b12fc7 - remove duplicate license (May 1, 2024)
3a25d17 - uncomment and fix commented code (May 1, 2024)
d8f574e - get more tests working. Still can build (May 1, 2024)
e4b291f - builds, testhoodiefilegroupreaderonspark fails because of LZ4_RAW codec (May 1, 2024)
d1de8c5 - fix compression issue (May 1, 2024)
d6fd984 - all tests should pass hudi-common now (May 1, 2024)
8c4a9b3 - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 1, 2024)
9ad99ef - fix spark 2.4 and try to fix other spark versions (May 1, 2024)
6c2925b - fix hadoop mr test (May 1, 2024)
65e6b37 - fix integration test (May 1, 2024)
7e38f4e - fix failing test by commenting out org.apache.hudi.hadoop.fs.HadoopFS… (May 2, 2024)
565a8c4 - fix reflection wrong class path (May 2, 2024)
385b66d - remove double copyright (May 2, 2024)
c97daf2 - fix some more tests (May 2, 2024)
50b743d - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 2, 2024)
4e4a01a - fix failing java test (May 2, 2024)
9ad3d3e - use try so writer closes and flushes buffer (May 2, 2024)
b88fa88 - add hudi-io dependency to another module (May 2, 2024)
c0a81f2 - try to fix testLocalFileSystemLoading which only fails in azure ci (May 2, 2024)
834aad2 - remove parquet dep from hadoop common (May 2, 2024)
1a5e0d9 - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 6, 2024)
7376b45 - get building (May 6, 2024)
f75268f - unify org config (May 6, 2024)
d491f7e - get rid of duplicate copyright (May 6, 2024)
ea258fe - fix flink issue (May 6, 2024)
deaa09e - fix failing test (May 7, 2024)
e78b401 - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 7, 2024)
9c503b6 - Merge branch 'move_hadoop_readers_to_hadoop_module' into create_hudi_… (May 7, 2024)
7c72471 - fix ci (May 7, 2024)
b7f31f2 - Merge branch 'move_hadoop_readers_to_hadoop_module' into create_hudi_… (May 7, 2024)
158cc30 - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 7, 2024)
2bde7d5 - fix merge (May 7, 2024)
253d52a - Merge branch 'move_hadoop_readers_to_hadoop_module' into create_hudi_… (May 7, 2024)
0de96bd - address some of the review feedback (May 8, 2024)
94063d1 - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 8, 2024)
3d33de3 - fix integ test issue (May 8, 2024)
532c386 - address more review comments (May 8, 2024)
bf97332 - remove unnecessary changes (May 8, 2024)
789308f - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 8, 2024)
ea50694 - fix merge issue (May 8, 2024)
a24b9db - move TestInProcessLockProvider to hadoop common (May 8, 2024)
fbbe1d0 - Merge branch 'master' into move_hadoop_readers_to_hadoop_module (May 9, 2024)
1219237 - remove extra license (May 9, 2024)
c1f6b79 - Merge branch 'move_hadoop_readers_to_hadoop_module' into create_hudi_… (May 9, 2024)
8693b73 - address most review comments (May 9, 2024)
8e7d0a2 - addressed review comments (May 9, 2024)
e2abfbe - clean up (May 9, 2024)
c4dd659 - address reveiw comments (May 9, 2024)
e8f82c4 - address review comments (May 9, 2024)
b449d6e - Merge branch 'move_hadoop_readers_to_hadoop_module' into create_hudi_… (May 9, 2024)
5bf8143 - change some naming (May 9, 2024)
12faeb3 - Merge branch 'master' into create_hudi_io_factory (May 9, 2024)
0007f84 - address review comments (May 9, 2024)

@@ -38,7 +38,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieAvroParquetReader;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -25,7 +25,7 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -71,7 +71,7 @@ public void testProperWriting() throws IOException {

HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf.unwrap(), 0.1, true);
ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf, 0.1, true);

StoragePath filePath = new StoragePath(tmpDir.resolve("test.parquet").toAbsolutePath().toString());

@@ -39,18 +39,18 @@
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieAvroOrcWriter;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter;
import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.CompressionKind;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -124,7 +124,7 @@ public StoragePath withInserts(String partition, String fileId, List<HoodieRecor
new AvroSchemaConverter().convert(schema), schema, Option.of(filter), new Properties());
HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true);
storage.getConf(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true);
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter(
new StoragePath(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
contextSupplier, populateMetaFields)) {
@@ -142,7 +142,7 @@ public StoragePath withInserts(String partition, String fileId, List<HoodieRecor
}
}
} else if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.ORC)) {
Configuration conf = new Configuration();
StorageConfiguration conf = storage.getConf().newInstance();
int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
@@ -24,6 +24,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTable;

import org.apache.flink.table.types.logical.RowType;
@@ -76,7 +77,7 @@ private static HoodieRowDataFileWriter newParquetInternalRowFileWriter(
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled()));
}
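Several hunks in this PR replace a raw Hadoop Configuration argument with Hudi's engine-agnostic StorageConfiguration, wrapping the conf in HadoopStorageConfiguration at the call site and unwrapping it only where Hadoop APIs are still needed (see the HoodieRowParquetConfig hunk later in this diff). A minimal sketch of that wrap/unwrap round trip; the class name StorageConfWrapSketch and the property key are illustrative, not part of Hudi:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

public class StorageConfWrapSketch {
  public static void main(String[] args) {
    // Hadoop-specific configuration, e.g. what writeSupport.getHadoopConf() returns.
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("parquet.compression", "GZIP"); // illustrative key/value

    // Wrap it so engine-agnostic code such as HoodieParquetConfig only sees StorageConfiguration.
    StorageConfiguration<?> storageConf = new HadoopStorageConfiguration(hadoopConf);

    // Hadoop-only call sites can still recover the underlying Configuration.
    Configuration unwrapped = storageConf.unwrapAs(Configuration.class);
    System.out.println(unwrapped.get("parquet.compression"));
  }
}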
@@ -18,7 +18,7 @@

package org.apache.hudi.io.storage.row;

import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;

@@ -38,6 +38,7 @@
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.io.OutputStream;

public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {

@@ -67,7 +68,7 @@ protected HoodieFileWriter newParquetFileWriter(
}

protected HoodieFileWriter newParquetFileWriter(
FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
boolean enableBloomFilter = false;
HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter);
String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
@@ -83,7 +84,7 @@ protected HoodieFileWriter newParquetFileWriter(
writeSupport.getHadoopConf(), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
parquetConfig.getHadoopConf().addResource(writeSupport.getHadoopConf());
return new HoodieSparkParquetStreamWriter(outputStream, parquetConfig);
return new HoodieSparkParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig);
}

@Override
@@ -21,6 +21,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
import org.apache.hudi.io.storage.row.HoodieRowParquetConfig;
import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
import org.apache.hudi.storage.StoragePath;
@@ -25,6 +25,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.conf.Configuration;
@@ -79,7 +80,7 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(Stora
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled()
));
@@ -18,7 +18,7 @@

package org.apache.hudi.io.storage.row;

import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StoragePath;

@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage.row;

import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -31,6 +32,11 @@ public class HoodieRowParquetConfig extends HoodieParquetConfig<HoodieRowParquet
public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName,
int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
double compressionRatio, boolean enableDictionary) {
super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, enableDictionary);
super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize,
new HadoopStorageConfiguration(hadoopConf), compressionRatio, enableDictionary);
}

public Configuration getHadoopConf() {
return getStorageConf().unwrapAs(Configuration.class);
}
}
@@ -24,6 +24,9 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.hadoop.HoodieAvroHFileWriter;
import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter;
import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -43,7 +43,6 @@
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.storage.HoodieAvroOrcReader;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.HoodieStorage;
@@ -356,8 +355,9 @@ private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOExcepti

private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath) throws IOException {
LOG.info("Reading schema from {}", orcFilePath);

HoodieAvroOrcReader orcReader = new HoodieAvroOrcReader(metaClient.getRawHoodieStorage().getConf(), orcFilePath);
HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
.getFileReader(metaClient.getTableConfig(), metaClient.getRawHoodieStorage().getConf(), orcFilePath,
HoodieFileFormat.ORC, Option.empty());
return convertAvroSchemaToParquet(orcReader.getSchema());
}

@@ -36,7 +36,6 @@

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

@@ -107,38 +106,25 @@ protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException
}

Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {
HoodieFileWriter parquetWriter = null;
HoodieConfig config = new HoodieConfig();
config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
HoodieRecordType recordType = records.iterator().next().getRecordType();
try {
parquetWriter = HoodieFileWriterFactory.getFileWriter(
HoodieFileFormat.PARQUET,
outputStream,
HoodieStorageUtils.getStorageConf(new Configuration()),
config,
writerSchema,
recordType);
for (HoodieRecord<?> record : records) {
String recordKey = getRecordKey(record).orElse(null);
parquetWriter.write(recordKey, record, writerSchema);
}
outputStream.flush();
} finally {
if (parquetWriter != null) {
parquetWriter.close();
}
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
HoodieConfig config = new HoodieConfig();
config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
HoodieRecordType recordType = records.iterator().next().getRecordType();
try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()),
config, writerSchema, recordType)) {
for (HoodieRecord<?> record : records) {
String recordKey = getRecordKey(record).orElse(null);
parquetWriter.write(recordKey, record, writerSchema);
}
outputStream.flush();
}

return baos.toByteArray();
return outputStream.toByteArray();
}
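The serializeRecords rewrite above (commit 9ad3d3e, "use try so writer closes and flushes buffer") works because toByteArray() is only called after try-with-resources has closed the parquet writer, so all buffered pages have been flushed into the in-memory stream. A JDK-only sketch of the same ordering concern, with GZIPOutputStream standing in for the Hudi parquet writer:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class FlushBeforeReadSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // The writer is closed by try-with-resources before the buffer is read,
    // mirroring how the parquet writer is closed before outputStream.toByteArray().
    try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
      gzip.write("hoodie log block payload".getBytes(StandardCharsets.UTF_8));
    } // close() flushes the compressor's remaining bytes and trailer into baos
    byte[] serialized = baos.toByteArray(); // safe: the stream is now complete
    System.out.println(serialized.length + " bytes serialized");
  }
}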

/**
@@ -25,7 +25,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieAvroParquetReader;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.StoragePath;

@@ -266,7 +266,7 @@ public static void loadInstants(
.filter(fileName -> filter == null || LSMTimeline.isFileInRange(filter, fileName))
.parallel().forEach(fileName -> {
// Read the archived file
try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(), fileName))) {
try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema)) {
while (iterator.hasNext()) {
@@ -18,10 +18,32 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

import static org.apache.hudi.common.util.TypeUtils.unsafeCast;

/**
* Marker interface for every {@link HoodieFileReader} reading in Avro (ie
* producing {@link IndexedRecord}s)
* Base class for every Avro file reader
*/
public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord> {}
public abstract class HoodieAvroFileReader implements HoodieFileReader<IndexedRecord> {

@Override
public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
}

protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
return getIndexedRecordIterator(readerSchema, readerSchema);
}

public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
}
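The hunk above turns HoodieAvroFileReader from a marker interface into an abstract base class that implements getRecordIterator once by mapping the Avro-level iterator, so concrete readers only need to supply getIndexedRecordIterator. A simplified, JDK-only sketch of that shape; String and the local Wrapped/FileReader/AvroFileReader types stand in for IndexedRecord and the real Hudi classes and are not the actual API:

import java.util.Arrays;
import java.util.Iterator;

final class Wrapped<T> {          // stand-in for HoodieAvroIndexedRecord
  final T data;
  Wrapped(T data) { this.data = data; }
}

interface FileReader<T> {         // stand-in for HoodieFileReader<T>
  Iterator<Wrapped<T>> getRecordIterator();
}

abstract class AvroFileReader implements FileReader<String> {  // stand-in for HoodieAvroFileReader
  @Override
  public Iterator<Wrapped<String>> getRecordIterator() {
    // Implemented once in the base class by wrapping the format-specific iterator,
    // analogous to CloseableMappingIterator over getIndexedRecordIterator.
    Iterator<String> raw = getIndexedRecordIterator();
    return new Iterator<Wrapped<String>>() {
      public boolean hasNext() { return raw.hasNext(); }
      public Wrapped<String> next() { return new Wrapped<>(raw.next()); }
    };
  }

  protected abstract Iterator<String> getIndexedRecordIterator();
}

public class AvroFileReaderSketch extends AvroFileReader {
  @Override
  protected Iterator<String> getIndexedRecordIterator() {
    // A real reader would pull records from a parquet/ORC/HFile base file.
    return Arrays.asList("rec-1", "rec-2").iterator();
  }

  public static void main(String[] args) {
    new AvroFileReaderSketch().getRecordIterator().forEachRemaining(r -> System.out.println(r.data));
  }
}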

This file was deleted.

@@ -38,7 +38,7 @@
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;

public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReaderBase
public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader
implements HoodieSeekingFileReader<IndexedRecord> {
// TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
public static final String SCHEMA_KEY = "schema";
@@ -54,7 +54,7 @@ public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader
* <p>
* Reads all the records with given schema
*/
public static List<IndexedRecord> readAllRecords(HoodieAvroFileReaderBase reader)
public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader)
throws IOException {
Schema schema = reader.getSchema();
return toStream(reader.getIndexedRecordIterator(schema))