@@ -57,9 +57,9 @@ public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath());
List<HoodieKey> hoodieKeyList = new ArrayList<>();
if (keyGeneratorOpt.isPresent()) {
hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt);
hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt);
} else {
hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()));
hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()));
}
return hoodieKeyList.stream()
.map(entry -> Pair.of(entry,
@@ -167,18 +167,35 @@ public abstract Map<String, String> readFooter(Configuration configuration, bool
* Fetch {@link HoodieKey}s from the given data file.
* @param configuration configuration to build fs object
* @param filePath The data file path
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file
* @return {@link List} of {@link HoodieKey}s fetched from the data file
*/
public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath);

/**
* Provides a closable iterator for reading the given data file.
* @param configuration configuration to build fs object
* @param filePath The data file path
* @param keyGeneratorOpt instance of KeyGenerator.
* @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file
*/
public abstract ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);

/**
* Provides a closable iterator for reading the given data file.
* @param configuration configuration to build fs object
* @param filePath The data file path
* @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file
*/
public abstract ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath);

/**
* Fetch {@link HoodieKey}s from the given data file.
* @param configuration configuration to build fs object
* @param filePath The data file path
* @param keyGeneratorOpt instance of KeyGenerator.
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file
* @return {@link List} of {@link HoodieKey}s fetched from the data file
*/
public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);

/**
* Read the Avro schema of the data file.
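A minimal usage sketch of the new key-reading API, assuming BaseFileUtils.getInstance(...) dispatches on the file extension (as the caller hunk at the top of this diff does) and that ClosableIterator is auto-closeable, as the OrcUtils hunk below relies on; the package locations in the imports and the class/method names here are illustrative, not part of this PR:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;

public class HoodieKeyScanSketch {
  // Streams keys instead of materializing them, unlike fetchHoodieKeys(...) which returns a full List.
  public static long countKeys(Configuration conf, String baseFilePath) {
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(baseFilePath);
    long count = 0;
    try (ClosableIterator<HoodieKey> keys = fileUtils.getHoodieKeyIterator(conf, new Path(baseFilePath))) {
      while (keys.hasNext()) {
        keys.next();
        count++;
      }
    }
    return count;
  }
}

The iterator overloads are what make streaming consumers possible; the list-returning fetchHoodieKeys(...) overloads remain for callers such as the first hunk in this diff.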
@@ -29,19 +29,18 @@
import org.apache.orc.TypeDescription;

import java.io.IOException;
import java.util.Iterator;

/**
* This class wraps an ORC reader and provides an iterator-based API to read from an ORC file.
*/
public class OrcReaderIterator<T> implements Iterator<T> {
public class OrcReaderIterator<T> implements ClosableIterator<T> {

private final RecordReader recordReader;
private final Schema avroSchema;
List<String> fieldNames;
List<TypeDescription> orcFieldTypes;
Schema[] avroFieldSchemas;
private VectorizedRowBatch batch;
private final List<String> fieldNames;
private final List<TypeDescription> orcFieldTypes;
private final Schema[] avroFieldSchemas;
private final VectorizedRowBatch batch;
private int rowInBatch;
private T next;

@@ -52,7 +51,7 @@ public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescripti
this.orcFieldTypes = orcSchema.getChildren();
this.avroFieldSchemas = fieldNames.stream()
.map(fieldName -> avroSchema.getField(fieldName).schema())
.toArray(size -> new Schema[size]);
.toArray(Schema[]::new);
this.batch = orcSchema.createRowBatch();
this.rowInBatch = 0;
}
@@ -115,4 +114,9 @@ private GenericData.Record readRecordFromBatch() throws IOException {
rowInBatch++;
return record;
}

@Override
public void close() {
FileIOUtils.closeQuietly(this.recordReader);
}
}
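Because OrcReaderIterator now implements ClosableIterator, closing the iterator releases the underlying ORC RecordReader, so callers can stop early inside try-with-resources without leaking the reader. A hedged sketch mirroring the reader construction used in OrcUtils below; imports follow the surrounding file, and the method name and Predicate parameter are illustrative:

static Option<GenericRecord> findFirst(Configuration conf, Path filePath, Predicate<GenericRecord> predicate) {
  try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf))) {
    TypeDescription orcSchema = reader.getSchema();
    Schema avroSchema = AvroOrcUtils.createAvroSchema(orcSchema);
    // Closing the iterator closes the RecordReader via FileIOUtils.closeQuietly(...).
    try (OrcReaderIterator<GenericRecord> iterator =
        new OrcReaderIterator<>(reader.rows(new Options(conf).schema(orcSchema)), avroSchema, orcSchema)) {
      while (iterator.hasNext()) {
        GenericRecord record = iterator.next();
        if (predicate.test(record)) {
          return Option.of(record);   // early exit; both readers are still closed
        }
      }
    }
  } catch (IOException io) {
    throw new HoodieIOException("Unable to read ORC file:" + filePath, io);
  }
  return Option.empty();
}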
128 changes: 69 additions & 59 deletions hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -55,29 +56,23 @@
public class OrcUtils extends BaseFileUtils {

/**
* Fetch {@link HoodieKey}s from the given ORC file.
* Provides a closable iterator for reading the given ORC file.
*
* @param filePath The ORC file path.
* @param configuration configuration to build fs object
* @return {@link List} of {@link HoodieKey}s fetched from the ORC file
* @param filePath The ORC file path
* @return {@link ClosableIterator} of {@link HoodieKey}s for reading the ORC file
*/
@Override
public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) {
List<HoodieKey> hoodieKeys = new ArrayList<>();
public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath) {
try {
if (!filePath.getFileSystem(configuration).exists(filePath)) {
return new ArrayList<>();
}

Configuration conf = new Configuration(configuration);
conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));

Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema);
List<String> fieldNames = orcSchema.getFieldNames();
VectorizedRowBatch batch = orcSchema.createRowBatch();
RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
List<String> fieldNames = orcSchema.getFieldNames();

// column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields
int keyCol = -1;
@@ -93,24 +88,43 @@ public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration,
if (keyCol == -1 || partitionCol == -1) {
throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath));
}
while (recordReader.nextBatch(batch)) {
BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[keyCol];
BytesColumnVector partitionPaths = (BytesColumnVector) batch.cols[partitionCol];
for (int i = 0; i < batch.size; i++) {
String rowKey = rowKeys.toString(i);
String partitionPath = partitionPaths.toString(i);
hoodieKeys.add(new HoodieKey(rowKey, partitionPath));
}
return new OrcReaderIterator<>(recordReader, readSchema, orcSchema);
} catch (IOException e) {
throw new HoodieIOException("Failed to open reader from ORC file:" + filePath, e);
}
}

/**
* Fetch {@link HoodieKey}s from the given ORC file.
*
* @param filePath The ORC file path.
* @param configuration configuration to build fs object
* @return {@link List} of {@link HoodieKey}s fetched from the ORC file
*/
@Override
public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath) {
try {
if (!filePath.getFileSystem(configuration).exists(filePath)) {
return Collections.emptyList();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to read from ORC file:" + filePath, e);
}
List<HoodieKey> hoodieKeys = new ArrayList<>();
try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(configuration, filePath)) {
iterator.forEachRemaining(hoodieKeys::add);
}
return hoodieKeys;
}

@Override
public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new HoodieIOException("UnsupportedOperation : Disabling meta fields not yet supported for Orc");
public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new UnsupportedOperationException("Custom key generator is not supported yet");
}

@Override
public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new UnsupportedOperationException("Custom key generator is not supported yet");
}

/**
@@ -119,8 +133,7 @@ public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration,
@Override
public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
Schema avroSchema;
try {
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) {
avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema());
} catch (IOException io) {
throw new HoodieIOException("Unable to read Avro records from an ORC file:" + filePath, io);
@@ -134,14 +147,14 @@ public List<GenericRecord> readAvroRecords(Configuration configuration, Path fil
@Override
public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) {
List<GenericRecord> records = new ArrayList<>();
try {
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) {
TypeDescription orcSchema = reader.getSchema();
RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema));
OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
while (iterator.hasNext()) {
GenericRecord record = iterator.next();
records.add(record);
try (RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema))) {
OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
while (iterator.hasNext()) {
GenericRecord record = iterator.next();
records.add(record);
}
}
} catch (IOException io) {
throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io);
@@ -161,35 +174,35 @@ public List<GenericRecord> readAvroRecords(Configuration configuration, Path fil
@Override
public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String> filter)
throws HoodieIOException {
try {
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
Set<String> filteredRowKeys = new HashSet<>();
try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf))) {
TypeDescription schema = reader.getSchema();
List<String> fieldNames = schema.getFieldNames();
VectorizedRowBatch batch = schema.createRowBatch();
RecordReader recordReader = reader.rows(new Options(conf).schema(schema));
try (RecordReader recordReader = reader.rows(new Options(conf).schema(schema))) {
Set<String> filteredRowKeys = new HashSet<>();
List<String> fieldNames = schema.getFieldNames();
VectorizedRowBatch batch = schema.createRowBatch();

// column index for the RECORD_KEY_METADATA_FIELD field
int colIndex = -1;
for (int i = 0; i < fieldNames.size(); i++) {
if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
colIndex = i;
break;
// column index for the RECORD_KEY_METADATA_FIELD field
int colIndex = -1;
for (int i = 0; i < fieldNames.size(); i++) {
if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
colIndex = i;
break;
}
}
}
if (colIndex == -1) {
throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
}
while (recordReader.nextBatch(batch)) {
BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
for (int i = 0; i < batch.size; i++) {
String rowKey = rowKeys.toString(i);
if (filter.isEmpty() || filter.contains(rowKey)) {
filteredRowKeys.add(rowKey);
if (colIndex == -1) {
throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
}
while (recordReader.nextBatch(batch)) {
BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
for (int i = 0; i < batch.size; i++) {
String rowKey = rowKeys.toString(i);
if (filter.isEmpty() || filter.contains(rowKey)) {
filteredRowKeys.add(rowKey);
}
}
}
return filteredRowKeys;
}
return filteredRowKeys;
} catch (IOException io) {
throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io);
}
@@ -198,8 +211,7 @@ public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String>
@Override
public Map<String, String> readFooter(Configuration conf, boolean required,
Path orcFilePath, String... footerNames) {
try {
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
Map<String, String> footerVals = new HashMap<>();
List<UserMetadataItem> metadataItemList = reader.getFileTail().getFooter().getMetadataList();
Map<String, String> metadata = metadataItemList.stream().collect(Collectors.toMap(
@@ -221,8 +233,7 @@ public Map<String, String> readFooter(Configuration conf, boolean required,

@Override
public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
try {
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
if (reader.hasMetadataValue("orc.avro.schema")) {
ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema");
byte[] bytes = new byte[metadataValue.remaining()];
@@ -239,8 +250,7 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) {

@Override
public long getRowCount(Configuration conf, Path orcFilePath) {
try {
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
return reader.getNumberOfRows();
} catch (IOException io) {
throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);
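For context on the filterRowKeys contract preserved by the refactor above: an empty candidate set returns every record key in the file, while a non-empty set returns only the keys it contains. A hedged usage sketch; it assumes BaseFileUtils.getInstance(...) resolves to OrcUtils for a .orc path, that conf and orcFilePath are in scope, and that the key values are illustrative:

BaseFileUtils orcUtils = BaseFileUtils.getInstance(orcFilePath.toString());
// Which of these candidate record keys already exist in the ORC base file?
Set<String> candidates = new HashSet<>(Arrays.asList("uuid-1", "uuid-2"));
Set<String> present = orcUtils.filterRowKeys(conf, orcFilePath, candidates);
// An empty filter returns every record key in the file.
Set<String> allKeys = orcUtils.filterRowKeys(conf, orcFilePath, Collections.emptySet());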
@@ -24,13 +24,12 @@
import org.apache.parquet.hadoop.ParquetReader;

import java.io.IOException;
import java.util.Iterator;

/**
* This class wraps a Parquet reader and provides an iterator-based API to read from a Parquet file. This is used in
* {@link BoundedInMemoryQueue}
*/
public class ParquetReaderIterator<T> implements Iterator<T> {
public class ParquetReaderIterator<T> implements ClosableIterator<T> {

// Parquet reader for an existing parquet file
private final ParquetReader<T> parquetReader;
@@ -73,7 +72,11 @@ public T next() {
}
}

public void close() throws IOException {
parquetReader.close();
public void close() {
try {
parquetReader.close();
} catch (IOException e) {
throw new HoodieException("Exception while closing the parquet reader", e);
}
}
}
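Note the deliberate asymmetry with the ORC side: OrcReaderIterator.close() swallows failures via FileIOUtils.closeQuietly, whereas this close() rethrows the IOException as an unchecked HoodieException, keeping close() free of checked exceptions for try-with-resources callers. A hedged usage sketch; the AvroParquetReader construction and method name are illustrative, and imports follow the surrounding file:

static long countRecords(Configuration conf, Path parquetFilePath) throws IOException {
  ParquetReader<GenericRecord> parquetReader =
      AvroParquetReader.<GenericRecord>builder(parquetFilePath).withConf(conf).build();
  long count = 0;
  // close() runs via try-with-resources; a failure now surfaces as an unchecked HoodieException.
  try (ParquetReaderIterator<GenericRecord> iterator = new ParquetReaderIterator<>(parquetReader)) {
    while (iterator.hasNext()) {
      iterator.next();
      count++;
    }
  }
  return count;
}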