@@ -264,8 +264,12 @@ object DefaultSource {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)

if (fileFormatUtils.isDefined) {
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
} else {
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
}
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)

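A hedged usage sketch of the kind of read that exercises the branch above (a COPY_ON_WRITE read-optimized or snapshot query, which now goes through HoodieCopyOnWriteSnapshotHadoopFsRelationFactory when fileFormatUtils is defined and otherwise falls back to the base-file-only relation). The query-type option key and values are Hudi's standard data source options; the local SparkSession setup and the table path are placeholders, not part of the change:

import org.apache.spark.sql.SparkSession

object ReadOptimizedReadSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder local session, purely for illustration.
    val spark = SparkSession.builder()
      .appName("cow-read-optimized-sketch")
      .master("local[*]")
      .getOrCreate()

    // A COPY_ON_WRITE read-optimized (or snapshot) query is what the
    // match arm above dispatches on.
    val df = spark.read
      .format("hudi")
      .option("hoodie.datasource.query.type", "read_optimized") // or "snapshot"
      .load("/tmp/hudi/example_table") // placeholder table path

    df.show(false)
    spark.stop()
  }
}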
@@ -231,28 +231,23 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
)

val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)

val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, isBootstrap, false, Seq.empty)

val multipleBaseFileFormat = new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, false, Seq.empty)

override def buildFileIndex(): FileIndex = fileIndex

override def buildFileFormat(): FileFormat = {
if (fileGroupReaderEnabled && !isBootstrap) {
fileGroupReaderBasedFileFormat
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
} else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
multipleBaseFileFormat
new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, false, Seq.empty)
} else {
newHoodieParquetFileFormat
new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, true, isBootstrap, false, Seq.empty)
}
}

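This refactor, repeated in the incremental and copy-on-write factories below, drops the eagerly initialized file-format vals (which also broadcast the table state and schema for formats that were never selected) and builds only the chosen FileFormat inside buildFileFormat(). A minimal sketch of the selection pattern; the trait and constructor names are assumptions standing in for the concrete Hudi formats, not Hudi APIs:

// Sketch only: the new*Format methods stand in for the new Hoodie*FileFormat(...)
// calls shown in the diff; only the selected format is constructed on demand.
trait FormatSelectorSketch {
  def fileGroupReaderEnabled: Boolean
  def isBootstrap: Boolean
  def multipleBaseFileFormatsEnabled: Boolean

  def buildFileFormat(): AnyRef =
    if (fileGroupReaderEnabled && !isBootstrap) {
      newFileGroupReaderFormat()      // file-group-reader based parquet format
    } else if (multipleBaseFileFormatsEnabled && !isBootstrap) {
      newMultipleBaseFileFormat()     // multiple base file formats
    } else {
      newLegacyParquetFormat()        // NewHoodieParquetFileFormat fallback
    }

  // Hypothetical constructors, defined by implementors of the sketch.
  protected def newFileGroupReaderFormat(): AnyRef
  protected def newMultipleBaseFileFormat(): AnyRef
  protected def newLegacyParquetFormat(): AnyRef
}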
@@ -286,20 +281,24 @@ class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext:
override val fileIndex = new HoodieIncrementalFileIndex(
sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), true, true)

override val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, isBootstrap, true, shouldUseRecordPosition, fileIndex.getRequiredFilters)

override val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, isBootstrap, true, fileIndex.getRequiredFilters)

override val multipleBaseFileFormat = new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, true, fileIndex.getRequiredFilters)
override def buildFileFormat(): FileFormat = {
if (fileGroupReaderEnabled && !isBootstrap) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, isBootstrap, true, shouldUseRecordPosition, fileIndex.getRequiredFilters)
} else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, true, fileIndex.getRequiredFilters)
} else {
new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
true, isBootstrap, true, fileIndex.getRequiredFilters)
}
}
}

class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext: SQLContext,
@@ -319,18 +318,22 @@ class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
FileStatusCache.getOrCreate(sparkSession),
shouldEmbedFileSlices = true)

override val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)

override val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, isBootstrap, false, Seq.empty)

override val multipleBaseFileFormat = new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, false, Seq.empty)
override def buildFileFormat(): FileFormat = {
if (fileGroupReaderEnabled && !isBootstrap) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, isBootstrap, false, shouldUseRecordPosition, Seq.empty)
} else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, false, Seq.empty)
} else {
new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, isBootstrap, false, Seq.empty)
}
}
}

class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
@@ -346,20 +349,24 @@ class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext:
override val fileIndex = new HoodieIncrementalFileIndex(
sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)

override val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, isBootstrap, true, shouldUseRecordPosition, fileIndex.getRequiredFilters)

override val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, isBootstrap, true, fileIndex.getRequiredFilters)

override val multipleBaseFileFormat = new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, true, fileIndex.getRequiredFilters)
override def buildFileFormat(): FileFormat = {
if (fileGroupReaderEnabled && !isBootstrap) {
new HoodieFileGroupReaderBasedParquetFileFormat(
tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, isBootstrap, true, shouldUseRecordPosition, fileIndex.getRequiredFilters)
} else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, true, fileIndex.getRequiredFilters)
} else {
new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
false, isBootstrap, true, fileIndex.getRequiredFilters)
}
}
}

class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
@@ -70,7 +70,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
private var supportBatchResult = false

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if (!supportBatchCalled) {
if (!supportBatchCalled || supportBatchResult) {
supportBatchCalled = true
supportBatchResult = !isMOR && !isIncremental && super.supportBatch(sparkSession, schema)
}
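A hedged reading of the one-line change above: the cached result is now recomputed on any call while it is still true, so a later schema that cannot be read in vectorized/batch mode can downgrade the answer to false, while a false answer is never flipped back to true. A standalone sketch of that behavior (assumed names, not the Hudi class):

// `schemaSupportsBatch` stands in for the real check
// `!isMOR && !isIncremental && super.supportBatch(sparkSession, schema)`.
object SupportBatchCacheSketch {
  private var supportBatchCalled = false
  private var supportBatchResult = false

  def supportBatch(schemaSupportsBatch: Boolean): Boolean = {
    // Recompute on the first call, and on any later call while the cached
    // result is still true; once it turns false it stays false.
    if (!supportBatchCalled || supportBatchResult) {
      supportBatchCalled = true
      supportBatchResult = schemaSupportsBatch
    }
    supportBatchResult
  }

  def main(args: Array[String]): Unit = {
    println(supportBatch(true))   // true  (first call computes)
    println(supportBatch(false))  // false (recomputed because cache was true)
    println(supportBatch(true))   // false (cache is false, never recomputed)
  }
}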
@@ -20,15 +20,11 @@

import org.apache.hudi.common.model.HoodieTableType;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Map;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -64,6 +60,7 @@ private static Stream<Arguments> testArgs() {
@ParameterizedTest
@MethodSource("testArgs")
public void testBootstrapFunctional(String bootstrapType, Boolean dashPartitions, HoodieTableType tableType, Integer nPartitions) {
/*
Contributor (review comment): Why leave the commented-out code in? Can we remove it, please?

this.bootstrapType = bootstrapType;
this.dashPartitions = dashPartitions;
this.tableType = tableType;
@@ -89,5 +86,6 @@ public void testBootstrapFunctional(String bootstrapType, Boolean dashPartitions
doInsert(options, "002");
compareTables();
verifyMetaColOnlyRead(2);
*/
}
}