diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java index b0831f0bc9ca0..20f75f63c5234 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java @@ -21,9 +21,9 @@ import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.WriteHandleFactory; @@ -90,18 +90,24 @@ public R getResult() { } } - /** - * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some - * expensive operations of transformation to the reader thread. - */ - static Function, HoodieInsertValueGenResult> getTransformFunction( - Schema schema, HoodieWriteConfig config) { - return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, config.getProps()); + static Function, HoodieInsertValueGenResult> getCloningTransformer(Schema schema, + HoodieWriteConfig config) { + return getCloningTransformerInternal(schema, config.getProps()); } - static Function, HoodieInsertValueGenResult> getTransformFunction( - Schema schema) { - return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.emptyProps()); + static Function, HoodieInsertValueGenResult> getCloningTransformer(Schema schema) { + return getCloningTransformerInternal(schema, new TypedProperties()); + } + + private static Function, HoodieInsertValueGenResult> getCloningTransformerInternal(Schema schema, + TypedProperties props) { + return record -> { + // NOTE: The record has to be cloned here to make sure that, if it holds a low-level engine-specific + // payload pointing into a shared, mutable (underlying) buffer, we get a clean copy of + // it, since these records will be subsequently buffered (w/in the in-memory queue) + HoodieRecord clonedRecord = record.copy(); + return new HoodieInsertValueGenResult(clonedRecord, schema, props); + }; } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index d3782e9f2008f..6f6d96394f87d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -143,8 +143,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props } else { rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields); } - MetadataValues metadataValues = new MetadataValues(); - metadataValues.setFileName(path.getName()); + MetadataValues metadataValues = new MetadataValues().setFileName(path.getName()); rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues); if (preserveMetadata) { fileWriter.write(record.getRecordKey(), rewriteRecord, 
writeSchemaWithMetaFields); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 28377918b4d38..276b318890138 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -388,12 +388,11 @@ protected void writeToFile(HoodieKey key, HoodieRecord record, Schema schema, } else { rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields); } - MetadataValues metadataValues = new MetadataValues(); - metadataValues.setFileName(newFilePath.getName()); + // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the + // file holding this record even in cases when overall metadata is preserved + MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName()); rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues); if (shouldPreserveRecordMetadata) { - // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the - // file holding this record even in cases when overall metadata is preserved fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields); } else { fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java index fda1435345b92..e39a60d390cec 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java @@ -18,6 +18,9 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; @@ -33,10 +36,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.avro.generic.GenericRecord; - import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -80,25 +79,16 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); } - protected boolean writeUpdateRecord(HoodieRecord newRecord, HoodieRecord oldRecord, Option combineRecordOpt, Schema writerSchema) + protected boolean writeUpdateRecord(HoodieRecord newRecord, HoodieRecord oldRecord, Option combinedRecordOpt, Schema writerSchema) throws IOException { // TODO [HUDI-5019] Remove these unnecessary newInstance invocations - Option savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance); - HoodieRecord savedOldRecord = oldRecord.newInstance(); - final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema); + Option savedCombineRecordOp = combinedRecordOpt.map(HoodieRecord::newInstance); + final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combinedRecordOpt, writerSchema); if (result) { boolean isDelete = 
HoodieOperation.isDelete(newRecord.getOperation()); - Option avroOpt = savedCombineRecordOp - .flatMap(r -> { - try { - return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps()) - .map(HoodieAvroIndexedRecord::getData); - } catch (IOException e) { - LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e); - return Option.empty(); - } - }); - cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt); + Option avroRecordOpt = savedCombineRecordOp.flatMap(r -> + toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps())); + cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt); } return result; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 2066e9c241c3d..3e0691cdf044d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -19,6 +19,7 @@ package org.apache.hudi.io; import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; @@ -27,6 +28,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; @@ -279,4 +281,13 @@ protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, String f + "file suffix: " + fileSuffix + " error"); } } + + protected static Option toAvroRecord(HoodieRecord record, Schema writerSchema, TypedProperties props) { + try { + return record.toIndexedRecord(writerSchema, props).map(HoodieAvroIndexedRecord::getData); + } catch (IOException e) { + LOG.error("Fail to get indexRecord from " + record, e); + return Option.empty(); + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 4c5db03d3593c..c1bf6de1fd681 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -33,8 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import javax.annotation.Nonnull; - import java.io.IOException; import java.util.Iterator; @@ -77,16 +74,7 @@ protected Iterator getMergingIterator(HoodieTable tabl return new MergingIterator<>( (Iterator) reader.getRecordIterator(readerSchema), (Iterator) 
bootstrapReader.getRecordIterator(bootstrapReadSchema), - (oneRecord, otherRecord) -> mergeRecords(oneRecord, otherRecord, mergeHandle.getWriterSchemaWithMetaFields())); - } - - @Nonnull - private static HoodieRecord mergeRecords(HoodieRecord left, HoodieRecord right, Schema targetSchema) { - try { - return left.joinWith(right, targetSchema); - } catch (IOException e) { - throw new HoodieIOException("Failed to merge records", e); - } + (oneRecord, otherRecord) -> oneRecord.joinWith(otherRecord, mergeHandle.getWriterSchemaWithMetaFields())); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 622ed4573ed7c..adef1c44591a8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecordMerger; @@ -82,17 +83,16 @@ public I combineOnCondition( * @param parallelism parallelism or partitions to be used while reducing/deduplicating * @return Collection of HoodieRecord already be deduplicated */ - public I deduplicateRecords( - I records, HoodieTable table, int parallelism) { + public I deduplicateRecords(I records, HoodieTable table, int parallelism) { HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger(); return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger); } - public I deduplicateRecords( - I records, HoodieIndex index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) { - return deduplicateRecordsInternal(records, index, parallelism, schema, HoodieAvroRecordMerger.withDeDuping(props), merger); + public I deduplicateRecords(I records, HoodieIndex index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) { + TypedProperties updatedProps = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(props); + return doDeduplicateRecords(records, index, parallelism, schema, updatedProps, merger); } - protected abstract I deduplicateRecordsInternal( - I records, HoodieIndex index, int parallelism, String schema, Properties props, HoodieRecordMerger merger); + protected abstract I doDeduplicateRecords( + I records, HoodieIndex index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index 1893132e7d7ff..6557f83b24181 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.SerializableSchema; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; import 
org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; @@ -31,7 +32,6 @@ import org.apache.hudi.table.HoodieTable; import java.io.IOException; -import java.util.Properties; public class HoodieWriteHelper extends BaseWriteHelper>, HoodieData, HoodieData, R> { @@ -53,8 +53,8 @@ protected HoodieData> tag(HoodieData> dedupedRec } @Override - public HoodieData> deduplicateRecordsInternal( - HoodieData> records, HoodieIndex index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) { + protected HoodieData> doDeduplicateRecords( + HoodieData> records, HoodieIndex index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) { boolean isIndexingGlobal = index.isGlobal(); final SerializableSchema schema = new SerializableSchema(schemaStr); // Auto-tunes the parallelism for reduce transformation based on the number of data partitions @@ -64,7 +64,10 @@ public HoodieData> deduplicateRecordsInternal( HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey; - return Pair.of(key, record); + // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING + // Here we have to make a copy of the incoming record, since it might be holding + // an instance of [[InternalRow]] pointing into shared, mutable buffer + return Pair.of(key, record.copy()); }).reduceByKey((rec1, rec2) -> { HoodieRecord reducedRecord; try { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java index 5fb7065a8a89c..815b04c9dbc87 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -228,14 +228,14 @@ public void testReaderGetRecordIterator() throws Exception { IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20)) .mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList()); Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc"); - Iterator iterator = hfileReader.getIndexedRecordsByKeysIterator(keys, avroSchema); + Iterator> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema); List expectedIds = IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20)) .boxed().collect(Collectors.toList()); int index = 0; while (iterator.hasNext()) { - GenericRecord record = (GenericRecord) iterator.next(); + GenericRecord record = (GenericRecord) iterator.next().getData(); String key = "key" + String.format("%02d", expectedIds.get(index)); assertEquals(key, record.get("_row_key").toString()); assertEquals(Integer.toString(expectedIds.get(index)), record.get("time").toString()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index 5ac00bd80589a..120fdb7deb2a6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -61,7 +61,7 @@ protected List 
computeNext() { try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), - Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig)); + Option.of(getExplicitInsertHandler()), getCloningTransformer(schema, hoodieConfig)); final List result = bufferedIteratorExecutor.execute(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java index 7dde24ec8c8f9..39f55f027f1cb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java @@ -70,21 +70,12 @@ protected boolean writeUpdateRecord(HoodieRecord newRecord, HoodieRecord o throws IOException { // TODO [HUDI-5019] Remove these unnecessary newInstance invocations Option savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance); - HoodieRecord savedOldRecord = oldRecord.newInstance(); final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema); if (result) { boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation()); - Option avroOpt = savedCombineRecordOp - .flatMap(r -> { - try { - return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps()) - .map(HoodieAvroIndexedRecord::getData); - } catch (IOException e) { - LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e); - return Option.empty(); - } - }); - cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt); + Option avroRecordOpt = savedCombineRecordOp.flatMap(r -> + toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps())); + cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? 
Option.empty() : avroRecordOpt); } return result; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java index 4c9a7484c192e..ce419f16e2a6f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java @@ -68,21 +68,12 @@ protected boolean writeUpdateRecord(HoodieRecord newRecord, HoodieRecord o throws IOException { // TODO [HUDI-5019] Remove these unnecessary newInstance invocations Option savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance); - HoodieRecord savedOldRecord = oldRecord.newInstance(); final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema); if (result) { boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation()); - Option avroOpt = savedCombineRecordOp - .flatMap(r -> { - try { - return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps()) - .map(HoodieAvroIndexedRecord::getData); - } catch (IOException e) { - LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e); - return Option.empty(); - } - }); - cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt); + Option avroRecordOpt = savedCombineRecordOp.flatMap(r -> + toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps())); + cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt); } return result; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index db9422336d3a9..8855457684993 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -18,7 +18,9 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; @@ -32,15 +34,12 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.avro.Schema; - import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Properties; import java.util.stream.Collectors; /** @@ -91,8 +90,8 @@ protected List> tag(List> dedupedRecords, Hoodie } @Override - public List> deduplicateRecordsInternal( - List> records, HoodieIndex index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) { + protected List> doDeduplicateRecords( + List> records, HoodieIndex index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) { // If index used is global, then records are expected to differ in their partitionPath Map>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java index ddf6345926e4a..1e430cd699fd7 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -64,7 +64,7 @@ protected List computeNext() { try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = - new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema)); + new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getCloningTransformer(schema)); final List result = bufferedIteratorExecutor.execute(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index 14016cb5c0ff2..dc109f8103012 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -18,7 +18,9 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; @@ -29,13 +31,10 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.Schema; - import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Properties; import java.util.stream.Collectors; public class JavaWriteHelper extends BaseWriteHelper>, @@ -58,8 +57,8 @@ protected List> tag(List> dedupedRecords, Hoodie } @Override - public List> deduplicateRecordsInternal( - List> records, HoodieIndex index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) { + protected List> doDeduplicateRecords( + List> records, HoodieIndex index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) { boolean isIndexingGlobal = index.isGlobal(); Map>>> keyedRecords = records.stream().map(record -> { HoodieKey hoodieKey = record.getKey(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java index e111679842a39..9cdccbe407337 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java @@ -18,7 +18,9 @@ package org.apache.hudi.commmon.model; +import org.apache.avro.Schema; import org.apache.hudi.HoodieInternalRowUtils; +import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.client.model.HoodieInternalRow; import 
org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieKey; @@ -28,18 +30,19 @@ import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.SparkKeyGeneratorInterface; import org.apache.hudi.util.HoodieSparkRecordUtils; - -import org.apache.avro.Schema; import org.apache.spark.sql.HoodieCatalystExpressionUtils$; import org.apache.spark.sql.HoodieUnsafeRowUtils; import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath; import org.apache.spark.sql.catalyst.CatalystTypeConverters; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.JoinedRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; @@ -55,7 +58,19 @@ import static org.apache.spark.sql.types.DataTypes.StringType; /** - * Spark Engine-specific Implementations of `HoodieRecord`. + * Spark Engine-specific Implementations of `HoodieRecord` + * + * NOTE: [[HoodieSparkRecord]] is expected to hold either [[UnsafeRow]] or [[HoodieInternalRow]]: + * + *
+ * <ul>
+ *   <li>[[UnsafeRow]] is held to make sure a) we don't deserialize raw bytes payload
+ *   into JVM types unnecessarily, b) we don't incur penalty of ser/de during shuffling,
+ *   c) we don't add strain on GC</li>
+ *
+ *   <li>[[HoodieInternalRow]] is held in cases when underlying [[UnsafeRow]]'s metadata fields
+ *   need to be updated (ie serving as an overlay layer on top of [[UnsafeRow]])</li>
+ * </ul>
+ * + */ public class HoodieSparkRecord extends HoodieRecord { @@ -68,48 +83,38 @@ public class HoodieSparkRecord extends HoodieRecord { * We should use this construction method when we read internalRow from file. * The record constructed by this method must be used in iter. */ - public HoodieSparkRecord(InternalRow data, StructType schema) { + public HoodieSparkRecord(InternalRow data) { super(null, data); - this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, false); + validateRow(data); this.copy = false; } - public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) { + public HoodieSparkRecord(HoodieKey key, InternalRow data, boolean copy) { super(key, data); - this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true); - this.copy = true; + validateRow(data); + this.copy = copy; } - public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation) { + private HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, boolean copy) { super(key, data, operation); - this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true); - this.copy = true; - } + validateRow(data); - public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) { - super(key, data, operation); - this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true); this.copy = copy; } - public HoodieSparkRecord(HoodieSparkRecord record) { - super(record); - this.copy = record.copy; - } - @Override public HoodieSparkRecord newInstance() { - return new HoodieSparkRecord(this); + return new HoodieSparkRecord(this.key, this.data, this.operation, this.copy); } @Override public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) { - return new HoodieSparkRecord(key, data, null, op); + return new HoodieSparkRecord(key, this.data, op, this.copy); } @Override public HoodieSparkRecord newInstance(HoodieKey key) { - return new HoodieSparkRecord(key, data, null); + return new HoodieSparkRecord(key, this.data, this.operation, this.copy); } @Override @@ -145,50 +150,56 @@ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean c } @Override - public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException { + public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema); InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData()); - return new HoodieSparkRecord(getKey(), mergeRow, targetStructType, getOperation(), copy); + UnsafeProjection projection = + HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType); + return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), getOperation(), copy); } @Override public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException { StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema); - UTF8String[] metaFields = extractMetaField(structType, targetStructType); - if (metaFields.length == 0) { - throw new UnsupportedOperationException(); - } - boolean containMetaFields = hasMetaField(structType); - InternalRow resultRow = new HoodieInternalRow(metaFields, data, containMetaFields); - return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation(), copy); + boolean 
containMetaFields = hasMetaFields(structType); + UTF8String[] metaFields = tryExtractMetaFields(data, structType); + + // TODO add actual rewriting + InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields); + + return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy); } @Override public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map renameCols) throws IOException { StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema); - InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols); - UTF8String[] metaFields = extractMetaField(structType, newStructType); - if (metaFields.length > 0) { - rewriteRow = new HoodieInternalRow(metaFields, data, true); - } - return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation(), copy); + boolean containMetaFields = hasMetaFields(structType); + UTF8String[] metaFields = tryExtractMetaFields(data, structType); + + InternalRow rewrittenRow = + HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols); + HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields); + + return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy); } @Override public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException { StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); + HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(data, structType); + metadataValues.getKv().forEach((key, value) -> { int pos = structType.fieldIndex(key); if (value != null) { - data.update(pos, CatalystTypeConverters.convertToCatalyst(value)); + updatableRow.update(pos, CatalystTypeConverters.convertToCatalyst(value)); } }); - return new HoodieSparkRecord(getKey(), data, structType, getOperation(), copy); + return new HoodieSparkRecord(getKey(), updatableRow, getOperation(), copy); } @Override @@ -242,7 +253,9 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, P StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); String key; String partition; - if (keyGen.isPresent() && !Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) { + boolean populateMetaFields = Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), + POPULATE_META_FIELDS.defaultValue().toString()).toString()); + if (!populateMetaFields && keyGen.isPresent()) { SparkKeyGeneratorInterface keyGenerator = (SparkKeyGeneratorInterface) keyGen.get(); key = keyGenerator.getRecordKey(data, structType).toString(); partition = keyGenerator.getPartitionPath(data, structType).toString(); @@ -251,7 +264,7 @@ public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, P partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString(); } HoodieKey hoodieKey = new HoodieKey(key, partition); - return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy); + return new HoodieSparkRecord(hoodieKey, data, getOperation(), copy); } @Override @@ -268,7 +281,7 @@ public Option toIndexedRecord(Schema recordSchema, Prop public HoodieSparkRecord copy() { if (!copy) { this.data = this.data.copy(); - copy 
= true; + this.copy = true; } return this; } @@ -286,20 +299,29 @@ public Comparable getOrderingValue(Schema recordSchema, Properties props) { } } - private UTF8String[] extractMetaField(StructType recordStructType, StructType structTypeWithMetaField) { - return HOODIE_META_COLUMNS_WITH_OPERATION.stream() - .filter(f -> HoodieCatalystExpressionUtils$.MODULE$.existField(structTypeWithMetaField, f)) - .map(field -> { - if (HoodieCatalystExpressionUtils$.MODULE$.existField(recordStructType, field)) { - return data.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(field)); - } else { - return UTF8String.EMPTY_UTF8; - } - }).toArray(UTF8String[]::new); + private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, StructType structType) { + if (data instanceof HoodieInternalRow) { + return (HoodieInternalRow) data; + } + + boolean containsMetaFields = hasMetaFields(structType); + UTF8String[] metaFields = tryExtractMetaFields(data, structType); + return new HoodieInternalRow(metaFields, data, containsMetaFields); } - private static boolean hasMetaField(StructType structType) { - return HoodieCatalystExpressionUtils$.MODULE$.existField(structType, COMMIT_TIME_METADATA_FIELD); + private static UTF8String[] tryExtractMetaFields(InternalRow row, StructType structType) { + boolean containsMetaFields = hasMetaFields(structType); + if (containsMetaFields) { + return HoodieRecord.HOODIE_META_COLUMNS.stream() + .map(col -> row.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(col))) + .toArray(UTF8String[]::new); + } else { + return new UTF8String[HoodieRecord.HOODIE_META_COLUMNS.size()]; + } + } + + private static boolean hasMetaFields(StructType structType) { + return structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined(); } /** @@ -329,6 +351,14 @@ private static HoodieRecord convertToHoodieSparkRecord(StructType s HoodieOperation operation = withOperationField ? 
HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null; - return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy); + return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, operation, record.copy); + } + + private static void validateRow(InternalRow data) { + // NOTE: [[HoodieSparkRecord]] is expected to hold either + // - Instance of [[UnsafeRow]] or + // - Instance of [[HoodieInternalRow]] or + // - Instance of [[ColumnarBatchRow]] + ValidationUtils.checkState(data instanceof UnsafeRow || data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index f1ae56d70bfc5..b6bb893e02bf4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -85,7 +85,7 @@ protected List computeNext() { } bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(), - getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); + getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); final List result = bufferedIteratorExecutor.execute(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java index 0a042d3362956..8673d2f5ba624 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java @@ -18,6 +18,8 @@ package org.apache.hudi.execution.bulkinsert; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieInternalRowUtils; import org.apache.hudi.HoodieSparkUtils; @@ -33,13 +35,9 @@ import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieClusteringConfig; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; import java.util.Properties; @@ -80,7 +78,6 @@ public JavaRDD> repartitionRecords(JavaRDD> reco schema.toString(), sparkEngineContext.getSqlContext().sparkSession() ); - Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty()) @@ -94,19 +91,17 @@ public JavaRDD> repartitionRecords(JavaRDD> reco }); } else if (recordType == HoodieRecordType.SPARK) { StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.get()); - Dataset sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), sparkEngineContext.getSqlContext().sparkSession(), structType); - + Dataset sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), + sparkEngineContext.getSqlContext().sparkSession(), structType); Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); return sortedDataset.queryExecution().toRdd() .toJavaRDD() - .map(row -> { - InternalRow internalRow = row.copy(); + .map(internalRow -> { String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal()); String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal()); HoodieKey hoodieKey = new HoodieKey(key, partition); - HoodieRecord hoodieRecord = new HoodieSparkRecord(hoodieKey, internalRow, structType); - return hoodieRecord; + return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, false); }); } else { throw new UnsupportedOperationException(recordType.name()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java index 86134eda37885..553b084e29c58 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java @@ -18,36 +18,10 @@ package org.apache.hudi.io.storage; -import org.apache.hudi.HoodieInternalRowUtils; -import org.apache.hudi.commmon.model.HoodieSparkRecord; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.MappingIterator; - -import org.apache.avro.Schema; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.StructType; - -import java.io.IOException; - -import static org.apache.hudi.common.util.TypeUtils.unsafeCast; - -public interface HoodieSparkFileReader extends HoodieFileReader { - ClosableIterator getInternalRowIterator(Schema readerSchema) throws IOException; - - ClosableIterator getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException; - - default ClosableIterator> getRecordIterator(Schema readerSchema) throws IOException { - ClosableIterator iterator = getInternalRowIterator(readerSchema); - StructType structType = HoodieInternalRowUtils.getCachedSchema(readerSchema); - return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data, structType))); - } - - @Override - default ClosableIterator> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { - ClosableIterator iterator = getInternalRowIterator(readerSchema, requestedSchema); - StructType structType = HoodieInternalRowUtils.getCachedSchema(requestedSchema); - return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data, structType))); - } -} +/** + * Marker interface for every {@link HoodieFileReader} reading in Catalyst (Spark native types, ie + * producing {@link InternalRow}s) + */ +public interface HoodieSparkFileReader extends HoodieFileReader {} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index a46174cbae5a6..b162cbbadc757 
100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -22,16 +22,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.HoodieInternalRowUtils; +import org.apache.hudi.commmon.model.HoodieSparkRecord; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.MappingIterator; import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.io.InputFile; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; @@ -41,6 +44,8 @@ import java.util.List; import java.util.Set; +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; + public class HoodieSparkParquetReader implements HoodieSparkFileReader { private final Path path; @@ -70,12 +75,20 @@ public Set filterRowKeys(Set candidateRowKeys) { } @Override - public ClosableIterator getInternalRowIterator(Schema schema) throws IOException { - return getInternalRowIterator(schema, null); + public ClosableIterator> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + ClosableIterator iterator = getInternalRowIterator(readerSchema, requestedSchema); + StructType structType = HoodieInternalRowUtils.getCachedSchema(requestedSchema); + UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType); + + return new MappingIterator<>(iterator, data -> { + // NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert + // it to [[UnsafeRow]] holding just raw bytes + UnsafeRow unsafeRow = projection.apply(data); + return unsafeCast(new HoodieSparkRecord(unsafeRow)); + }); } - @Override - public ClosableIterator getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + private ClosableIterator getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException { if (requestedSchema == null) { requestedSchema = readerSchema; } @@ -85,13 +98,9 @@ public ClosableIterator getInternalRowIterator(Schema readerSchema, conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json()); conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING())); conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP())); - InputFile inputFile = HadoopInputFile.fromPath(path, conf); - ParquetReader reader = new ParquetReader.Builder(inputFile) { - @Override - protected ReadSupport getReadSupport() { - return new ParquetReadSupport(); - } - }.withConf(conf).build(); + ParquetReader reader = ParquetReader.builder((ReadSupport) new ParquetReadSupport(), path) + 
.withConf(conf) + .build(); ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader); readerIterators.add(parquetReaderIterator); return parquetReaderIterator; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java index 3828f63564e31..86a984dd83517 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java @@ -74,8 +74,7 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, HoodieRecord recordCopy = record.copy(); String recKey = recordCopy.getRecordKey(reader.getSchema(), Option.of(keyGenerator)); HoodieRecord hoodieRecord = recordCopy.rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA); - MetadataValues metadataValues = new MetadataValues(); - metadataValues.setRecordKey(recKey); + MetadataValues metadataValues = new MetadataValues().setRecordKey(recKey); return hoodieRecord .updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues) .newInstance(new HoodieKey(recKey, partitionPath)); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala index 169ccd61c38c0..b6c331cbebc43 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala @@ -219,15 +219,6 @@ object HoodieInternalRowUtils { schemaMap.get(schema) } - def projectUnsafe(row: InternalRow, structType: StructType, copy: Boolean = true): InternalRow = { - if (row == null || row.isInstanceOf[UnsafeRow] || row.isInstanceOf[HoodieInternalRow]) { - row - } else { - val unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType).apply(row) - if (copy) unsafeRow.copy() else unsafeRow - } - } - private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = { if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) { oldSchema match { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 6f9616b669c47..4b692222ed744 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -46,6 +46,11 @@ import java.util.Locale */ trait SparkAdapter extends Serializable { + /** + * Checks whether provided instance of [[InternalRow]] is actually an instance of [[ColumnarBatchRow]] + */ + def isColumnarBatchRow(r: InternalRow): Boolean + /** * Returns an instance of [[HoodieCatalogUtils]] providing for common utils operating on Spark's * [[TableCatalog]]s diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index 
a714d60d0033a..2b19904407844 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -41,7 +41,7 @@ import scala.Tuple2; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -98,7 +98,7 @@ protected Integer getResult() { BoundedInMemoryExecutor>, Integer> executor = null; try { executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); int result = executor.execute(); // It should buffer and write 100 records assertEquals(100, result); @@ -145,7 +145,7 @@ protected Integer getResult() { BoundedInMemoryExecutor>, Integer> executor = null; try { executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); BoundedInMemoryExecutor>, Integer> finalExecutor = executor; Thread.currentThread().interrupt(); @@ -194,7 +194,7 @@ protected Integer getResult() { BoundedInMemoryExecutor>, Integer> executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter, - consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), + consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); executor.shutdownNow(); boolean terminatedGracefully = executor.awaitTermination(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index a4f723ff010d1..b4fe688256551 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -54,7 +54,7 @@ import scala.Tuple2; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -85,7 +85,7 @@ public void testRecordReading() throws Exception { final int numRecords = 128; final List hoodieRecords = dataGen.generateInserts(instantTime, numRecords); final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future resFuture = executorService.submit(() -> { new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue); @@ -125,7 
+125,7 @@ public void testCompositeProducerRecordReading() throws Exception { final List> recs = new ArrayList<>(); final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); // Record Key to Map> keyToProducerAndIndexMap = new HashMap<>(); @@ -220,11 +220,11 @@ public void testMemoryLimitForBuffering() throws Exception { final int recordLimit = 5; final SizeEstimator sizeEstimator = new DefaultSizeEstimator<>(); HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0)); + getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(genResult); final long memoryLimitInBytes = recordLimit * objSize; final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce executorService.submit(() -> { @@ -269,7 +269,7 @@ public void testException() throws Exception { final SizeEstimator>> sizeEstimator = new DefaultSizeEstimator<>(); // queue memory limit HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0)); + getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties()))); final long memoryLimitInBytes = 4 * objSize; @@ -277,7 +277,7 @@ public void testException() throws Exception { // stops and throws // correct exception back. 
BoundedInMemoryQueue>> queue1 = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future resFuture = executorService.submit(() -> { @@ -305,7 +305,7 @@ public void testException() throws Exception { when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); BoundedInMemoryQueue>> queue2 = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future res = executorService.submit(() -> { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java index 913bbcb97cfdc..479c8eb9d6734 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java @@ -98,8 +98,7 @@ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean c } @Override - public HoodieRecord joinWith(HoodieRecord other, - Schema targetSchema) throws IOException { + public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { GenericRecord record = HoodieAvroUtils.stitchRecords((GenericRecord) data, (GenericRecord) other.getData(), targetSchema); return new HoodieAvroIndexedRecord(record); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index dfd2b4ba33574..5cbadece6b14b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -101,8 +101,7 @@ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean c } @Override - public HoodieRecord joinWith(HoodieRecord other, - Schema targetSchema) throws IOException { + public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java index e57a18b59240c..b9e29787f8a98 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java @@ -21,6 +21,8 @@ import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -34,28 +36,33 @@ public class HoodieAvroRecordMerger implements HoodieRecordMerger { - public static String DE_DUPING = "de_duping"; - @Override public String getMergingStrategy() { return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID; } @Override - public Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException { + public Option> 
merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException { ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO); ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO); - boolean deDuping = Boolean.parseBoolean(props.getOrDefault(DE_DUPING, "false").toString()); - if (deDuping) { - HoodieRecord res = preCombine(older, newer); - if (res == older) { - return Option.of(Pair.of(res, oldSchema)); - } else { - return Option.of(Pair.of(res, newSchema)); - } - } else { - return combineAndGetUpdateValue(older, newer, newSchema, props) - .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema())); + Config.LegacyOperationMode legacyOperatingMode = Config.LegacyOperationMode.valueOf( + props.getString(Config.LEGACY_OPERATING_MODE.key(), Config.LEGACY_OPERATING_MODE.defaultValue())); + + switch (legacyOperatingMode) { + case PRE_COMBINING: + HoodieRecord res = preCombine(older, newer); + if (res == older) { + return Option.of(Pair.of(res, oldSchema)); + } else { + return Option.of(Pair.of(res, newSchema)); + } + + case COMBINING: + return combineAndGetUpdateValue(older, newer, newSchema, props) + .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema())); + + default: + throw new UnsupportedOperationException(String.format("Unsupported legacy operating mode (%s)", legacyOperatingMode)); } } @@ -83,10 +90,25 @@ private Option combineAndGetUpdateValue(HoodieRecord older, Hoodie .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload)); } - public static Properties withDeDuping(Properties props) { - Properties newProps = new Properties(); - newProps.putAll(props); - newProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true"); - return newProps; + public static class Config { + + public enum LegacyOperationMode { + PRE_COMBINING, + COMBINING + } + + public static ConfigProperty LEGACY_OPERATING_MODE = + ConfigProperty.key("hoodie.datasource.write.merger.legacy.operation") + .defaultValue(LegacyOperationMode.COMBINING.name()) + .withDocumentation("Controls the mode of the merging operation performed by `HoodieAvroRecordMerger`. 
" + + "This is required to maintain backward-compatibility w/ the existing semantic of `HoodieRecordPayload` " + + "implementations providing `preCombine` and `combineAndGetUpdateValue` methods."); + + public static TypedProperties withLegacyOperatingModePreCombining(Properties props) { + TypedProperties newProps = new TypedProperties(); + newProps.putAll(props); + newProps.setProperty(Config.LEGACY_OPERATING_MODE.key(), Config.LegacyOperationMode.PRE_COMBINING.name()); + return newProps; + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java index 74954a5e63137..eca1ad9a3b7c5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java @@ -92,8 +92,7 @@ public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean c } @Override - public HoodieRecord joinWith(HoodieRecord other, - Schema targetSchema) throws IOException { + public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 1acefe2204a11..363092409dc84 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -291,7 +291,7 @@ public void checkState() { /** * Support bootstrap. */ - public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException; + public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema); /** * Rewrite record into new schema(add meta columns) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java index 407775283159b..da413592abc6e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java @@ -19,16 +19,15 @@ package org.apache.hudi.common.model; import org.apache.avro.Schema; - import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.PublicAPIClass; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import java.io.IOException; import java.io.Serializable; -import java.util.Properties; /** * HoodieMerge defines how to merge two records. It is a stateless component. @@ -45,7 +44,7 @@ public interface HoodieRecordMerger extends Serializable { * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C * of the single record, both orders of operations applications have to yield the same result) */ - Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException; + Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException; /** * The record type handled by the current merger. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java index a4ff9a1896bd4..baee08163380b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java @@ -18,46 +18,44 @@ package org.apache.hudi.common.model; -import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION; - import java.util.HashMap; import java.util.Map; -import org.apache.hudi.common.util.ValidationUtils; public class MetadataValues { - private Map kv; - - public MetadataValues(Map kv) { - ValidationUtils.checkArgument(HOODIE_META_COLUMNS_WITH_OPERATION.containsAll(kv.values())); - this.kv = kv; - } + private final Map kv; public MetadataValues() { this.kv = new HashMap<>(); } - public void setCommitTime(String value) { + public MetadataValues setCommitTime(String value) { this.kv.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, value); + return this; } - public void setCommitSeqno(String value) { + public MetadataValues setCommitSeqno(String value) { this.kv.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, value); + return this; } - public void setRecordKey(String value) { + public MetadataValues setRecordKey(String value) { this.kv.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, value); + return this; } - public void setPartitionPath(String value) { + public MetadataValues setPartitionPath(String value) { this.kv.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, value); + return this; } - public void setFileName(String value) { + public MetadataValues setFileName(String value) { this.kv.put(HoodieRecord.FILENAME_METADATA_FIELD, value); + return this; } - public void setOperation(String value) { + public MetadataValues setOperation(String value) { this.kv.put(HoodieRecord.OPERATION_METADATA_FIELD, value); + return this; } public Map getKv() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 8a4901019c8f5..862cea8f79af0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieAvroRecordMerger; import org.apache.hudi.common.model.HoodieLogFile; @@ -96,7 +97,7 @@ public abstract class AbstractHoodieLogRecordReader { protected final String preCombineField; // Stateless component for merging records protected final HoodieRecordMerger recordMerger; - private final Properties payloadProps = new Properties(); + private final TypedProperties payloadProps; // simple key gen fields private Option> simpleKeyGenFields = Option.empty(); // Log File Paths @@ -164,10 +165,11 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logBlocks, int } private ClosableIterator getRecordsIterator(HoodieDataBlock dataBlock, Option keySpecOpt, HoodieRecordType type) throws IOException { - ClosableIterator iter; if (keySpecOpt.isPresent()) { KeySpec keySpec = keySpecOpt.get(); - iter = unsafeCast(dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey, type)); + return 
unsafeCast(dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey, type)); } else { - iter = unsafeCast(dataBlock.getRecordIterator(type)); + return unsafeCast(dataBlock.getRecordIterator(type)); } - return iter; } /** @@ -524,7 +524,7 @@ public boolean isWithOperationField() { return withOperationField; } - protected Properties getPayloadProps() { + protected TypedProperties getPayloadProps() { return payloadProps; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 42294c3d110a7..7d2e2a654a7d5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -155,7 +155,8 @@ protected void processNextRecord(HoodieRecord newRecord) throws IOExcepti HoodieRecord oldRecord = records.get(key); T oldValue = oldRecord.getData(); - HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(oldRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).get().getLeft(); + HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(oldRecord, readerSchema, + newRecord, readerSchema, this.getPayloadProps()).get().getLeft(); // If combinedValue is oldValue, no need rePut oldRecord if (combinedRecord.getData() != oldValue) { records.put(key, combinedRecord.copy()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 19ecdbf9ee54f..2e499f4ae9015 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -179,7 +179,7 @@ public final ClosableIterator> getRecordIterator(List keySet = new HashSet<>(keys); - return FilteringIterator.getInstance(allRecords, keySet, fullKey, record -> getRecordKey(record)); + return FilteringIterator.getInstance(allRecords, keySet, fullKey, this::getRecordKey); } protected ClosableIterator> readRecordsFromBlockPayload(HoodieRecordType type) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index faa7856e438da..8f23dce02afb3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.table.log.block; -import java.util.Properties; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; @@ -34,19 +33,15 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.inline.InLineFSUtils; import org.apache.hudi.common.fs.inline.InLineFileSystem; -import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; -import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; -import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation; -import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.MappingIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieHBaseKVComparator; import org.apache.hudi.io.storage.HoodieAvroHFileReader; +import org.apache.hudi.io.storage.HoodieHBaseKVComparator; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -58,10 +53,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.TreeMap; -import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.common.util.TypeUtils.unsafeCast; +import static org.apache.hudi.common.util.ValidationUtils.checkState; /** * HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile @@ -205,10 +201,10 @@ protected ClosableIterator> lookupRecords(List keys, new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf)); // Get writer's schema from the header - final ClosableIterator recordIterator = - fullKey ? reader.getIndexedRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getIndexedRecordsByKeyPrefixIterator(sortedKeys, readerSchema); + final ClosableIterator> recordIterator = + fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema); - return new MappingIterator<>(recordIterator, data -> (HoodieRecord) new HoodieAvroIndexedRecord((data))); + return new MappingIterator<>(recordIterator, data -> (HoodieRecord) data); } private byte[] serializeRecord(HoodieRecord record, Schema schema) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java index 42ea766eb90e9..f681bdfe844d3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java @@ -32,11 +32,11 @@ public class ParquetReaderIterator implements ClosableIterator { // Parquet reader for an existing parquet file - private final ParquetReader parquetReader; + private final ParquetReader parquetReader; // Holds the next entry returned by the parquet reader private T next; - public ParquetReaderIterator(ParquetReader parquetReader) { + public ParquetReaderIterator(ParquetReader parquetReader) { this.parquetReader = parquetReader; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java b/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java new file mode 100644 index 0000000000000..c6e4e488f0959 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +/** + * Annotation designating a field or a method as visible for the testing purposes + */ +public @interface VisibleForTesting { +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java index c2ddfa319fae9..a829880d5f948 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java @@ -18,71 +18,10 @@ package org.apache.hudi.io.storage; -import org.apache.hudi.common.model.HoodieAvroIndexedRecord; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.MappingIterator; -import org.apache.hudi.common.util.Option; - -import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.List; - -import static org.apache.hudi.common.util.TypeUtils.unsafeCast; - -public interface HoodieAvroFileReader extends HoodieFileReader, AutoCloseable { - - ClosableIterator getIndexedRecordIterator(Schema readerSchema) throws IOException; - - ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; - - default Option getIndexedRecordByKey(String key, Schema readerSchema) throws IOException { - throw new UnsupportedOperationException(); - } - - default ClosableIterator getIndexedRecordsByKeysIterator(List keys, Schema schema) throws IOException { - throw new UnsupportedOperationException(); - } - - default ClosableIterator getIndexedRecordsByKeysIterator(List keys) throws IOException { - return getIndexedRecordsByKeysIterator(keys, getSchema()); - } - - default ClosableIterator getIndexedRecordsByKeyPrefixIterator(List keyPrefixes, Schema schema) throws IOException { - throw new UnsupportedEncodingException(); - } - - default ClosableIterator getIndexedRecordsByKeyPrefixIterator(List keyPrefixes) throws IOException { - return getIndexedRecordsByKeyPrefixIterator(keyPrefixes, getSchema()); - } - - default ClosableIterator> getRecordsByKeysIterator(List keys, Schema schema) throws IOException { - ClosableIterator iterator = getIndexedRecordsByKeysIterator(keys, schema); - return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); - } - - default ClosableIterator> getRecordsByKeyPrefixIterator(List keyPrefixes, Schema schema) throws IOException { - ClosableIterator iterator = getIndexedRecordsByKeyPrefixIterator(keyPrefixes, schema); - return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); - } - - @Override - default ClosableIterator> getRecordIterator(Schema schema) throws IOException { - ClosableIterator iterator = getIndexedRecordIterator(schema); - return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); - } - - @Override - default 
ClosableIterator> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { - ClosableIterator iterator = getIndexedRecordIterator(readerSchema, requestedSchema); - return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); - } - - default Option> getRecordByKey(String key, Schema readerSchema) throws IOException { - return getIndexedRecordByKey(key, readerSchema) - .map(data -> unsafeCast(new HoodieAvroIndexedRecord(data))); - } -} +/** + * Marker interface for every {@link HoodieFileReader} reading in Avro (ie + * producing {@link IndexedRecord}s) + */ +public interface HoodieAvroFileReader extends HoodieFileReader {} diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java new file mode 100644 index 0000000000000..5cee50449a443 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.MappingIterator; + +import java.io.IOException; + +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; + +/** + * Base class for every {@link HoodieAvroFileReader} + */ +abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader { + + @Override + public ClosableIterator> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + ClosableIterator iterator = getIndexedRecordIterator(readerSchema, requestedSchema); + return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); + } + + protected ClosableIterator getIndexedRecordIterator(Schema readerSchema) throws IOException { + return getIndexedRecordIterator(readerSchema, readerSchema); + } + + protected abstract ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; +} diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java index 90323d88831ba..7a70a1f2a33c3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java @@ -37,8 +37,12 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.MappingIterator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -53,10 +57,11 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import java.util.TreeSet; +import java.util.stream.Collectors; import static org.apache.hudi.common.util.CollectionUtils.toStream; +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; import static org.apache.hudi.common.util.ValidationUtils.checkState; /** @@ -64,7 +69,7 @@ *

* {@link HoodieFileReader} implementation allowing to read from {@link HFile}. */ -public class HoodieAvroHFileReader implements HoodieAvroFileReader { +public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements HoodieSeekingFileReader { // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling public static final String SCHEMA_KEY = "schema"; @@ -114,6 +119,30 @@ public HoodieAvroHFileReader(Path path, HFile.Reader reader, Option sche .orElseGet(() -> Lazy.lazily(() -> fetchSchema(reader))); } + @Override + public Option> getRecordByKey(String key, Schema readerSchema) throws IOException { + synchronized (sharedScannerLock) { + return fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema) + .map(data -> unsafeCast(new HoodieAvroIndexedRecord(data))); + } + } + + @Override + public ClosableIterator> getRecordsByKeysIterator(List keys, Schema schema) throws IOException { + // We're caching blocks for this scanner to minimize amount of traffic + // to the underlying storage as we fetched (potentially) sparsely distributed + // keys + HFileScanner scanner = getHFileScanner(reader, true); + ClosableIterator iterator = new RecordByKeyIterator(scanner, keys, getSchema(), schema); + return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); + } + + @Override + public ClosableIterator> getRecordsByKeyPrefixIterator(List keyPrefixes, Schema schema) throws IOException { + ClosableIterator iterator = getIndexedRecordsByKeyPrefixIterator(keyPrefixes, schema); + return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); + } + @Override public String[] readMinMaxRecordKeys() { // NOTE: This access to reader is thread-safe @@ -169,28 +198,19 @@ public Set filterRowKeys(Set candidateRowKeys) { } } - @SuppressWarnings("unchecked") @Override - public Option getIndexedRecordByKey(String key, Schema readerSchema) throws IOException { - synchronized (sharedScannerLock) { - return fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema); + protected ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) { + if (!Objects.equals(readerSchema, requestedSchema)) { + throw new UnsupportedOperationException("Schema projections are not supported in HFile reader"); } - } - public ClosableIterator getIndexedRecordIterator(Schema readerSchema) throws IOException { // TODO eval whether seeking scanner would be faster than pread HFileScanner scanner = getHFileScanner(reader, false); return new RecordIterator(scanner, getSchema(), readerSchema); } - @Override - public ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { - throw new UnsupportedOperationException(); - } - - @SuppressWarnings("unchecked") - @Override - public ClosableIterator getIndexedRecordsByKeysIterator(List keys, Schema readerSchema) throws IOException { + @VisibleForTesting + protected ClosableIterator getIndexedRecordsByKeysIterator(List keys, Schema readerSchema) throws IOException { // We're caching blocks for this scanner to minimize amount of traffic // to the underlying storage as we fetched (potentially) sparsely distributed // keys @@ -198,9 +218,8 @@ public ClosableIterator getIndexedRecordsByKeysIterator(List getIndexedRecordsByKeyPrefixIterator(List keyPrefixes, Schema readerSchema) throws IOException { + @VisibleForTesting + protected ClosableIterator getIndexedRecordsByKeyPrefixIterator(List keyPrefixes, 
Schema readerSchema) throws IOException { // We're caching blocks for this scanner to minimize amount of traffic // to the underlying storage as we fetched (potentially) sparsely distributed // keys diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java index 051add110282e..77b86fe1f407d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java @@ -36,11 +36,13 @@ import org.apache.orc.TypeDescription; import java.io.IOException; +import java.util.Objects; import java.util.Set; -public class HoodieAvroOrcReader implements HoodieAvroFileReader { - private Path path; - private Configuration conf; +public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase { + + private final Path path; + private final Configuration conf; private final BaseFileUtils orcUtils; public HoodieAvroOrcReader(Configuration configuration, Path path) { @@ -65,30 +67,28 @@ public Set filterRowKeys(Set candidateRowKeys) { } @Override - public ClosableIterator getIndexedRecordIterator(Schema schema) throws IOException { + protected ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + if (!Objects.equals(readerSchema, requestedSchema)) { + throw new UnsupportedOperationException("Schema projections are not supported in HFile reader"); + } + try { Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema); + TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readerSchema); RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); - return new OrcReaderIterator<>(recordReader, schema, orcSchema); + return new OrcReaderIterator<>(recordReader, readerSchema, orcSchema); } catch (IOException io) { throw new HoodieIOException("Unable to create an ORC reader.", io); } } - @Override - public ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { - throw new UnsupportedOperationException(); - } - @Override public Schema getSchema() { return orcUtils.readAvroSchema(conf, path); } @Override - public void close() { - } + public void close() {} @Override public long getTotalRecords() { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java index 769ef391b4630..e249da0b8a835 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java @@ -23,9 +23,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.MappingIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.parquet.avro.AvroParquetReader; @@ -37,12 +40,14 @@ import java.util.List; import java.util.Set; -public class HoodieAvroParquetReader 
implements HoodieAvroFileReader { +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; + +public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase { private final Path path; private final Configuration conf; private final BaseFileUtils parquetUtils; - private List readerIterators = new ArrayList<>(); + private final List readerIterators = new ArrayList<>(); public HoodieAvroParquetReader(Configuration configuration, Path path) { this.conf = configuration; @@ -50,6 +55,15 @@ public HoodieAvroParquetReader(Configuration configuration, Path path) { this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); } + @Override + public ClosableIterator> getRecordIterator(Schema readerSchema) throws IOException { + // TODO(HUDI-4588) remove after HUDI-4588 is resolved + // NOTE: This is a workaround to avoid leveraging projection w/in [[AvroParquetReader]], + // until schema handling issues (nullability canonicalization, etc) are resolved + ClosableIterator iterator = getIndexedRecordIterator(readerSchema); + return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); + } + @Override public String[] readMinMaxRecordKeys() { return parquetUtils.readMinMaxRecordKeys(conf, path); @@ -66,12 +80,12 @@ public Set filterRowKeys(Set candidateRowKeys) { } @Override - public ClosableIterator getIndexedRecordIterator(Schema schema) throws IOException { + protected ClosableIterator getIndexedRecordIterator(Schema schema) throws IOException { return getIndexedRecordIteratorInternal(schema, Option.empty()); } @Override - public ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + protected ClosableIterator getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { return getIndexedRecordIteratorInternal(readerSchema, Option.of(requestedSchema)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java index 8152a176a074b..30dbc5b94ea57 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java @@ -22,13 +22,23 @@ import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.Option; import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.List; import java.util.Set; +/** + * Hudi's File Reader interface providing common set of APIs to fetch + * + *

<ul> + *   <li>{@link HoodieRecord}s</li> + *   <li>Metadata (statistics, bloom-filters, etc)</li> + * </ul>
+ * + * from a file persisted in storage. + * + * @param target engine-specific representation of the raw data ({@code IndexedRecord} for Avro, + * {@code InternalRow} for Spark, etc) + */ public interface HoodieFileReader extends AutoCloseable { String[] readMinMaxRecordKeys(); @@ -37,36 +47,14 @@ public interface HoodieFileReader extends AutoCloseable { Set filterRowKeys(Set candidateRowKeys); - ClosableIterator> getRecordIterator(Schema readerSchema) throws IOException; - ClosableIterator> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; - default ClosableIterator> getRecordIterator() throws IOException { - return getRecordIterator(getSchema()); + default ClosableIterator> getRecordIterator(Schema readerSchema) throws IOException { + return getRecordIterator(readerSchema, readerSchema); } - default Option> getRecordByKey(String key, Schema readerSchema) throws IOException { - throw new UnsupportedOperationException(); - } - - default Option> getRecordByKey(String key) throws IOException { - return getRecordByKey(key, getSchema()); - } - - default ClosableIterator> getRecordsByKeysIterator(List keys, Schema schema) throws IOException { - throw new UnsupportedOperationException(); - } - - default ClosableIterator> getRecordsByKeysIterator(List keys) throws IOException { - return getRecordsByKeysIterator(keys, getSchema()); - } - - default ClosableIterator> getRecordsByKeyPrefixIterator(List keyPrefixes, Schema schema) throws IOException { - throw new UnsupportedEncodingException(); - } - - default ClosableIterator> getRecordsByKeyPrefixIterator(List keyPrefixes) throws IOException { - return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema()); + default ClosableIterator> getRecordIterator() throws IOException { + return getRecordIterator(getSchema()); } Schema getSchema(); diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java new file mode 100644 index 0000000000000..ee4a5f5e7a0f1 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.avro.Schema; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.List; + +public interface HoodieSeekingFileReader extends HoodieFileReader { + + default Option> getRecordByKey(String key, Schema readerSchema) throws IOException { + throw new UnsupportedOperationException(); + } + + default Option> getRecordByKey(String key) throws IOException { + return getRecordByKey(key, getSchema()); + } + + default ClosableIterator> getRecordsByKeysIterator(List keys, Schema schema) throws IOException { + throw new UnsupportedOperationException(); + } + + default ClosableIterator> getRecordsByKeysIterator(List keys) throws IOException { + return getRecordsByKeysIterator(keys, getSchema()); + } + + default ClosableIterator> getRecordsByKeyPrefixIterator(List keyPrefixes, Schema schema) throws IOException { + throw new UnsupportedEncodingException(); + } + + default ClosableIterator> getRecordsByKeyPrefixIterator(List keyPrefixes) throws IOException { + return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema()); + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index f777c55e89e0f..b22ebc1ad710e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -18,6 +18,9 @@ package org.apache.hudi.metadata; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -50,12 +53,8 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; -import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.Path; +import org.apache.hudi.io.storage.HoodieSeekingFileReader; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -97,7 +96,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { private final boolean reuse; // Readers for the latest file slice corresponding to file groups in the metadata partition - private Map, Pair> partitionReaders = + private final Map, Pair, HoodieMetadataMergedLogRecordReader>> partitionReaders = new ConcurrentHashMap<>(); public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, @@ -164,12 +163,12 @@ public HoodieData> getRecordsByKeyPrefixes(L (SerializableFunction>>) fileSlice -> { // NOTE: Since this will be executed by executors, we can't access previously cached // readers, and therefore have to always open new ones - Pair readers = + Pair, HoodieMetadataMergedLogRecordReader> readers = openReaders(partitionName, fileSlice); try { List timings = new ArrayList<>(); - HoodieFileReader baseFileReader = readers.getKey(); + HoodieSeekingFileReader baseFileReader = readers.getKey(); 
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); if (baseFileReader == null && logRecordScanner == null) { @@ -210,11 +209,11 @@ public List>>> getRecord List>>> result = new ArrayList<>(); AtomicInteger fileSlicesKeysCount = new AtomicInteger(); partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> { - Pair readers = + Pair, HoodieMetadataMergedLogRecordReader> readers = getOrCreateReaders(partitionName, partitionFileSlicePair.getRight()); try { List timings = new ArrayList<>(); - HoodieFileReader baseFileReader = readers.getKey(); + HoodieSeekingFileReader baseFileReader = readers.getKey(); HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); if (baseFileReader == null && logRecordScanner == null) { return; @@ -281,7 +280,7 @@ private Map>> readLogRecords( return logRecords; } - private List>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader, + private List>>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader reader, List keys, boolean fullKeys, Map>> logRecords, @@ -290,7 +289,7 @@ private List>>> readFrom HoodieTimer timer = new HoodieTimer().startTimer(); timer.startTimer(); - if (baseFileReader == null) { + if (reader == null) { // No base file at all timings.add(timer.endTimer()); if (fullKeys) { @@ -310,7 +309,7 @@ private List>>> readFrom readTimer.startTimer(); Map> records = - fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName); + fetchBaseFileRecordsByKeys(reader, keys, fullKeys, partitionName); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); @@ -342,12 +341,14 @@ private List>>> readFrom } } - private Map> fetchBaseFileRecordsByKeys(HoodieFileReader baseFileReader, + @SuppressWarnings("unchecked") + private Map> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader, List keys, boolean fullKeys, String partitionName) throws IOException { - ClosableIterator records = fullKeys ? baseFileReader.getRecordsByKeysIterator(keys) - : baseFileReader.getRecordsByKeyPrefixIterator(keys); + ClosableIterator> records = fullKeys + ? 
reader.getRecordsByKeysIterator(keys) + : reader.getRecordsByKeyPrefixIterator(keys); return toStream(records) .map(record -> { @@ -402,21 +403,21 @@ private Map, List> getPartitionFileSliceToKeysMa * @param slice - The file slice to open readers for * @return File reader and the record scanner pair for the requested file slice */ - private Pair getOrCreateReaders(String partitionName, FileSlice slice) { + private Pair, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) { if (reuse) { - return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> { - return openReaders(partitionName, slice); }); + Pair key = Pair.of(partitionName, slice.getFileId()); + return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice)); } else { return openReaders(partitionName, slice); } } - private Pair openReaders(String partitionName, FileSlice slice) { + private Pair, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) { try { HoodieTimer timer = new HoodieTimer().startTimer(); // Open base file reader - Pair baseFileReaderOpenTimePair = getBaseFileReader(slice, timer); - HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey(); + Pair, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer); + HoodieSeekingFileReader baseFileReader = baseFileReaderOpenTimePair.getKey(); final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue(); // Open the log record scanner using the log files from the latest file slice @@ -434,18 +435,20 @@ private Pair openReaders( } } - private Pair getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException { - HoodieFileReader baseFileReader = null; + private Pair, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException { + HoodieSeekingFileReader baseFileReader; Long baseFileOpenMs; // If the base file is present then create a reader Option basefile = slice.getBaseFile(); if (basefile.isPresent()) { - String basefilePath = basefile.get().getPath(); - baseFileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), new Path(basefilePath)); + String baseFilePath = basefile.get().getPath(); + baseFileReader = (HoodieSeekingFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) + .getFileReader(hadoopConf.get(), new Path(baseFilePath)); baseFileOpenMs = timer.endTimer(); - LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath, + LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath, basefile.get().getCommitTime(), baseFileOpenMs)); } else { + baseFileReader = null; baseFileOpenMs = 0L; timer.endTimer(); } @@ -572,7 +575,7 @@ public void close() { * @param partitionFileSlicePair - Partition and FileSlice */ private synchronized void close(Pair partitionFileSlicePair) { - Pair readers = + Pair, HoodieMetadataMergedLogRecordReader> readers = partitionReaders.remove(partitionFileSlicePair); closeReader(readers); } @@ -587,7 +590,7 @@ private void closePartitionReaders() { partitionReaders.clear(); } - private void closeReader(Pair readers) { + private void closeReader(Pair, HoodieMetadataMergedLogRecordReader> readers) { if (readers != null) { try { if (readers.getKey() != null) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 1b3bc83547eec..459dff0b45d46 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -25,9 +25,11 @@ import org.apache.hudi.client.utils.ConcatenatingIterator; import org.apache.hudi.common.model.ClusteringGroupInfo; import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieFileSliceReader; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.MappingIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -301,8 +303,10 @@ private Iterator readRecordsForGroupBaseFiles(List List> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> { Iterable indexedRecords = () -> { try { - return ((HoodieAvroFileReader)HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType()) - .getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))).getIndexedRecordIterator(readerSchema); + HoodieFileReaderFactory fileReaderFactory = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType()); + HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + + return new MappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData); } catch (IOException e) { throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index e9accb0a86079..4daf8df52324c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.format.mor; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -715,7 +716,7 @@ protected static class MergeIterator implements RecordIterator { private final Set keyToSkip = new HashSet<>(); - private final Properties payloadProps; + private final TypedProperties payloadProps; private RowData currentRecord; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java index 5b7e7fbc6774b..a3b4a6c1660c6 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import 
org.apache.hadoop.mapred.RecordReader; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.hudi.io.storage.HoodieAvroHFileReader; @@ -41,7 +42,7 @@ public class HoodieHFileRecordReader implements RecordReader recordIterator; + private Iterator> recordIterator; private Schema schema; public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException { @@ -56,14 +57,14 @@ public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job @Override public boolean next(NullWritable key, ArrayWritable value) throws IOException { if (recordIterator == null) { - recordIterator = reader.getIndexedRecordIterator(schema); + recordIterator = reader.getRecordIterator(schema); } if (!recordIterator.hasNext()) { return false; } - IndexedRecord record = recordIterator.next(); + IndexedRecord record = recordIterator.next().getData(); ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema); value.set(aWritable.get()); count++; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index 0644656ce4e40..7d8aa9f50412f 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.HoodieRecordUtils; +import org.apache.hudi.common.util.MappingIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypeUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -269,7 +270,7 @@ private Iterator readColumnarOrLogFiles(FileSlice fileSlice) thro Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(metaClient.getHadoopConf(), new Path(fileSlice.getBaseFile().get().getPath()))); - return reader.getIndexedRecordIterator(schema); + return new MappingIterator<>(reader.getRecordIterator(schema), HoodieRecord::getData); } else { // If there is no data file, fall back to reading log files HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java index 04daafbab0ff6..ff51e02702050 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java @@ -18,17 +18,16 @@ package org.apache.hudi; +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordMerger; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.Option; import 
org.apache.hudi.common.util.ValidationUtils; - -import org.apache.avro.Schema; +import org.apache.hudi.common.util.collection.Pair; import java.io.IOException; -import java.util.Properties; public class HoodieSparkRecordMerger implements HoodieRecordMerger { @@ -38,7 +37,7 @@ public String getMergingStrategy() { } @Override - public Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException { + public Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException { ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK); ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 53380c9aa6c98..9bbf0a9414723 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -752,9 +752,9 @@ object HoodieBaseRelation extends SparkAdapterSupport { val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr) val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) - reader.getIndexedRecordIterator(requiredAvroSchema).asScala + reader.getRecordIterator(requiredAvroSchema).asScala .map(record => { - avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get + avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get }) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 1b8cb5793615b..99313884c33f9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -355,18 +355,7 @@ object HoodieSparkSqlWriter { processedRecord } - def getSparkProcessedRecord(partitionParam: String, record: InternalRow, - dropPartitionColumns: Boolean, schema: StructType): (InternalRow, StructType) = { - var processedRecord = record - var writeSchema = schema - if (dropPartitionColumns) { - writeSchema = generateSparkSchemaWithoutPartitionColumns(partitionParam, schema) - processedRecord = HoodieInternalRowUtils.rewriteRecord(record, schema, writeSchema) - } - (processedRecord, writeSchema) - } - - def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = { + private def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = { val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false" parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable) @@ -874,16 +863,23 @@ object HoodieSparkSqlWriter { // ut will use AvroKeyGenerator, so we need to cast it in spark record val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface] val structType = 
HoodieInternalRowUtils.getCachedSchema(schema) - df.queryExecution.toRdd.map(row => { - val internalRow = row.copy() - val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structType) - val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType) - val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType) - val key = new HoodieKey(recordKey.toString, partitionPath.toString) - - new HoodieSparkRecord(key, processedRow, writeSchema) - }).toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]] + df.queryExecution.toRdd.mapPartitions { iter => + val projection: Function[InternalRow, InternalRow] = if (dropPartitionColumns) { + val newSchema = generateSparkSchemaWithoutPartitionColumns(partitionCols, structType) + HoodieInternalRowUtils.getCachedUnsafeProjection(structType, newSchema) + } else { + identity + } + + iter.map { internalRow => + val processedRow = projection(internalRow) + val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType) + val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType) + val key = new HoodieKey(recordKey.toString, partitionPath.toString) + + new HoodieSparkRecord(key, processedRow, false) + } + }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]] } } - } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala index f5a38343042cd..256bc14e82bf8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala @@ -18,36 +18,36 @@ package org.apache.hudi +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection} import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport -import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord} -import org.apache.hudi.config.HoodiePayloadConfig -import org.apache.hudi.commmon.model.HoodieSparkRecord -import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.hudi.LogFileIterator._ -import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} +import org.apache.hudi.commmon.model.HoodieSparkRecord +import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext} -import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath +import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType +import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord} import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner import org.apache.hudi.common.util.HoodieRecordUtils +import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.hadoop.config.HoodieRealtimeConfig +import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.hudi.internal.schema.InternalSchema -import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapred.JobConf -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} import org.apache.spark.sql.HoodieCatalystExpressionUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.types.StructType + import java.io.Closeable -import java.util.Properties -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.util.Try @@ -65,13 +65,14 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config)) - protected val payloadProps = tableState.preCombineFieldOpt + protected val payloadProps: TypedProperties = tableState.preCombineFieldOpt .map { preCombineField => HoodiePayloadConfig.newBuilder .withPayloadOrderingField(preCombineField) .build .getProps - }.getOrElse(new Properties()) + } + .getOrElse(new TypedProperties()) protected override val avroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) protected override val structTypeSchema: StructType = requiredSchema.structTypeSchema @@ -82,8 +83,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, protected var recordToLoad: InternalRow = _ protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema) - - protected val requiredSchemaSafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema) + protected val requiredSchemaUnsafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema) // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe private var logScanner = { @@ -122,7 +122,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit, recordToLoad = deserialize(projectedAvroRecord) true case Some(r: HoodieSparkRecord) => - recordToLoad = requiredSchemaSafeRowProjection(r.getData) + recordToLoad = requiredSchemaUnsafeRowProjection(r.getData) true case None => this.hasNextInternal } @@ -238,7 +238,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, // on the record from the Delta Log recordMerger.getRecordType match { case HoodieRecordType.SPARK => - val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema) + val curRecord = new HoodieSparkRecord(curRow) val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps) toScalaOption(result) .map(r => { diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index 
371a2d7b41333..dc408ee919473 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -18,6 +18,7 @@ package org.apache.hudi.bootstrap; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.avro.HoodieAvroUtils; @@ -38,13 +39,10 @@ import org.apache.hudi.keygen.SparkKeyGeneratorInterface; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; - -import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; import java.io.IOException; @@ -94,12 +92,11 @@ public JavaRDD generateInputRecords(String tableName, String sourc } else if (recordType == HoodieRecordType.SPARK) { SparkKeyGeneratorInterface sparkKeyGenerator = (SparkKeyGeneratorInterface) keyGenerator; StructType structType = inputDataset.schema(); - return inputDataset.queryExecution().toRdd().toJavaRDD().map(row -> { - InternalRow internalRow = row.copy(); + return inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow -> { String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString(); String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString(); HoodieKey key = new HoodieKey(recordKey, partitionPath); - return new HoodieSparkRecord(key, internalRow, structType); + return new HoodieSparkRecord(key, internalRow, false); }); } else { throw new UnsupportedOperationException(recordType.name()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 03dcd2db4d2b8..a77a14047dfbb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -646,7 +646,8 @@ class TestCOWDataSource extends HoodieClientTestBase { var recordsReadDF = spark.read.format("org.apache.hudi") .options(readOpts) .load(basePath + "/*/*") - assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0) + + assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count()) // Specify fieldType as TIMESTAMP writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts) @@ -659,7 +660,8 @@ class TestCOWDataSource extends HoodieClientTestBase { .options(readOpts) .load(basePath + "/*/*") val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd"))) - assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0) + + assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count()) // Mixed fieldType writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts) diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index d4ca8bc402015..d537dcef4b424 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql._ +import org.apache.spark.sql.execution.vectorized.MutableColumnarRow import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ @@ -48,6 +49,12 @@ import scala.collection.mutable.ArrayBuffer */ class Spark2Adapter extends SparkAdapter { + override def isColumnarBatchRow(r: InternalRow): Boolean = { + // NOTE: In Spark 2.x there's no [[ColumnarBatchRow]], instead [[MutableColumnarRow]] is leveraged + // for vectorized reads + r.isInstanceOf[MutableColumnarRow] + } + override def getCatalogUtils: HoodieCatalogUtils = { throw new UnsupportedOperationException("Catalog utilities are not supported in Spark 2.x"); } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index bed6ff33508ba..0a4bc289b35e7 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.vectorized.ColumnarUtils import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalogUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils, HoodieSpark3CatalogUtils, SparkSession} /** @@ -37,6 +38,8 @@ import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansU */ class Spark3_1Adapter extends BaseSpark3Adapter { + override def isColumnarBatchRow(r: InternalRow): Boolean = ColumnarUtils.isColumnarBatchRow(r) + override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark31CatalogUtils override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala new file mode 100644 index 0000000000000..e6015a65cb0d6 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.vectorized + +import org.apache.spark.sql.catalyst.InternalRow + +object ColumnarUtils { + + /** + * Utility verifying whether provided instance of [[InternalRow]] is actually + * an instance of [[ColumnarBatchRow]] + * + * NOTE: This utility is required, since in Spark <= 3.3 [[ColumnarBatchRow]] is package-private + */ + def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow] + +} diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index 8e15745d57bb1..4c58f6b1190b3 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -29,12 +29,15 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Sp import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.vectorized.ColumnarUtils /** * Implementation of [[SparkAdapter]] for Spark 3.2.x branch */ class Spark3_2Adapter extends BaseSpark3Adapter { + override def isColumnarBatchRow(r: InternalRow): Boolean = ColumnarUtils.isColumnarBatchRow(r) + override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark32CatalogUtils override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala new file mode 100644 index 0000000000000..e6015a65cb0d6 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.vectorized + +import org.apache.spark.sql.catalyst.InternalRow + +object ColumnarUtils { + + /** + * Utility verifying whether provided instance of [[InternalRow]] is actually + * an instance of [[ColumnarBatchRow]] + * + * NOTE: This utility is required, since in Spark <= 3.3 [[ColumnarBatchRow]] is package-private + */ + def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow] + +} diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala index 72bc1d19f47fc..5f5414eb7ee3e 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32PlusHoodieParquetFileFormat} import org.apache.spark.sql.parser.HoodieSpark3_3ExtendedSqlParser import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatchRow import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark33CatalogUtils, HoodieSpark33CatalystExpressionUtils, HoodieSpark33CatalystPlanUtils, HoodieSpark3CatalogUtils, SparkSession} /** @@ -35,6 +36,8 @@ import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansU */ class Spark3_3Adapter extends BaseSpark3Adapter { + override def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow] + override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark33CatalogUtils override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark33CatalystExpressionUtils
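For reference, the reader-API migration that recurs throughout these hunks — replacing getIndexedRecordIterator(schema) with the HoodieRecord-typed getRecordIterator(schema) and unwrapping the Avro payload via HoodieRecord::getData — can be sketched in isolation as follows. This is a minimal, uncompiled sketch: the helper class, method name, and parameters are illustrative only; the Hudi calls themselves (HoodieFileReaderFactory.getReaderFactory, getFileReader, getRecordIterator, MappingIterator) are taken from the hunks above.

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;
import java.util.Iterator;

// Hypothetical helper, named for illustration only.
class AvroIteratorMigrationSketch {

  // Before this patch: reader.getIndexedRecordIterator(schema) handed back IndexedRecords directly.
  // After this patch: reader.getRecordIterator(schema) yields HoodieRecords, so callers that still
  // need raw Avro unwrap each record's payload with HoodieRecord::getData.
  static Iterator<IndexedRecord> readBaseFileAsAvro(Configuration hadoopConf,
                                                    Path baseFilePath,
                                                    Schema readerSchema) throws IOException {
    HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory
        .getReaderFactory(HoodieRecordType.AVRO)
        .getFileReader(hadoopConf, baseFilePath);
    // MappingIterator applies the unwrapping function lazily, one record at a time,
    // so the call site keeps its streaming behavior.
    return new MappingIterator<>(reader.getRecordIterator(readerSchema), HoodieRecord::getData);
  }
}

The same unwrap is done record-by-record in HoodieHFileRecordReader (recordIterator.next().getData()) and on the Scala side in HoodieBaseRelation (record.getData.asInstanceOf[GenericRecord]), while ClusteringOperator and DFSHoodieDatasetInputReader use the MappingIterator form shown here.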