@@ -122,6 +122,10 @@ object DataSourceReadOptions {
.withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
"skipping over files")

val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
.withDocumentation("When doing an incremental query whether we should fall back to full table scans if file does not exist.")
/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
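For orientation, here is a minimal usage sketch (not part of this diff) showing how a reader could opt into the new fallback; the base path and instant times are placeholders:

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val incrementalDF = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "20220101000000") // placeholder begin instant
  .option(DataSourceReadOptions.END_INSTANTTIME.key, "20220102000000")   // placeholder end instant
  // fall back to a full table scan if files in the requested range were already cleaned or archived
  .option("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "true")
  .load("/tmp/hudi_table")                                               // placeholder base path
incrementalDF.count()

With the flag left at its default of false, the same read fails with an AnalysisException ("Path does not exist") once the cleaner or archiver has removed the referenced files, as exercised by the tests further down.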
@@ -18,16 +18,17 @@
package org.apache.hudi

import org.apache.avro.Schema
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

import java.util.stream.Collectors
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.HoodieTimer
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hadoop.fs.GlobPattern
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.table.HoodieSparkTable
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
@@ -40,11 +41,11 @@ import scala.collection.JavaConversions._
import scala.collection.mutable

/**
* Relation, that implements the Hoodie incremental view.
*
* Implemented for Copy_on_write storage.
*
*/
* Relation, that implements the Hoodie incremental view.
*
* Implemented for Copy_on_write storage.
*
*/
class IncrementalRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
@@ -85,7 +86,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val tableSchema = if (useEndInstantSchema) {
if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
} else {
schemaResolver.getTableAvroSchemaWithoutMetadataFields()
@@ -165,26 +166,63 @@ class IncrementalRelation(val sqlContext: SQLContext,
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters)
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))

var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)

if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean

var doFullTableScan = false

if (fallbackToFullTableScan) {
val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val timer = new HoodieTimer().startTimer()

val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")

val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first

if (isInstantArchived || firstNotFoundPath.isDefined) {
doFullTableScan = true
log.info("Falling back to full table scan")
}
}

if (regularFileIdToFullPath.nonEmpty) {
df = df.union(sqlContext.read.options(sOpts)
if (doFullTableScan) {
val hudiDF = sqlContext.read
.format("hudi")
.schema(usedSchema)
.parquet(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.load(basePath)
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
commitsToReturn.last.getTimestamp))
// Schema enforcement does not happen in the spark.read with hudi above, hence select explicitly with the right column order
val fieldNames: Array[String] = df.schema.fields.map(field => field.name)
df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
}

if (regularFileIdToFullPath.nonEmpty) {
df = df.union(sqlContext.read.options(sOpts)
.schema(usedSchema)
.parquet(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
}
}

filters.foldLeft(df)((e, f) => e.filter(f)).rdd
@@ -34,7 +34,8 @@ import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
@@ -708,6 +709,90 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
}

@Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
// Create 10 commits
for (i <- 1 to 10) {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.cleaner.commits.retained", "3")
.option("hoodie.keep.min.commits", "4")
.option("hoodie.keep.max.commits", "5")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(HoodieMetadataConfig.ENABLE.key(), value = false)
.mode(SaveMode.Append)
.save(basePath)
}

val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
/**
* State of timeline after 10 commits
* +------------------+--------------------------------------+
* |     Archived     |            Active Timeline           |
* +------------------+--------------+-----------------------+
* |  C0  C1  C2  C3  |    C4  C5    |    C6  C7  C8  C9     |
* +------------------+--------------+-----------------------+
* |          Data cleaned           |  Data exists in table |
* +---------------------------------+-----------------------+
*/

val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
//C4 and C5 are still in the active timeline (their commit files have not been archived), but their data files have already been cleaned
var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4
var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5

//Calling without the fallback should fail with 'Path does not exist'
var hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)

val msg = "Should fail with Path does not exist"
assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)

//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
assertEquals(100, hoodieIncViewDF.count())

//Test with archived commits
val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray
startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
endTs = completedCommits.nthInstant(1).get().getTimestamp //C5

//Calling without the fallback should fail with 'Path does not exist'
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)

assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)

//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
assertEquals(500, hoodieIncViewDF.count())
}

def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -135,7 +135,10 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
DataFrameReader reader = sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
.option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
.option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight())
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()));

Dataset<Row> source = reader.load(srcPath);

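For DeltaStreamer's HoodieIncrSource, the flag is read from the source properties; a minimal sketch (assumed, not from this PR) of setting it on TypedProperties:

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.config.TypedProperties

val props = new TypedProperties()
// enable the full-table-scan fallback for the downstream incremental source
props.setProperty(
  DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
  "true")

Equivalently, the DeltaStreamer test below passes it as a "key=true" entry via cfg.configs.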
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.functional;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -1739,6 +1740,54 @@ public void testJdbcSourceIncrementalFetchInContinuousMode() {
}
}

@Test
public void testHoodieIncrFallback() throws Exception {
String tableBasePath = dfsBasePath + "/incr_test_table";
String downstreamTableBasePath = dfsBasePath + "/incr_test_downstream_table";

insertInTable(tableBasePath, 1, WriteOperationType.BULK_INSERT);
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
WriteOperationType.BULK_INSERT, true, null);
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();

insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
//No change, as this fails with a 'Path does not exist' error
assertThrows(org.apache.spark.sql.AnalysisException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*", sqlContext);

if (downstreamCfg.configs == null) {
downstreamCfg.configs = new ArrayList<>();
}

downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true");
//Adding this conf to make testing easier :)
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
downstreamCfg.operation = WriteOperationType.UPSERT;
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();

long baseTableRecords = sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").count();
long downStreamTableRecords = sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath + "/*/*.parquet").count();
assertEquals(baseTableRecords, downStreamTableRecords);
}

private void insertInTable(String tableBasePath, int count, WriteOperationType operationType) throws Exception {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, operationType,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false);
if (cfg.configs == null) {
cfg.configs = new ArrayList<>();
}
cfg.configs.add("hoodie.cleaner.commits.retained=3");
cfg.configs.add("hoodie.keep.min.commits=4");
cfg.configs.add("hoodie.keep.max.commits=5");
cfg.configs.add("hoodie.test.source.generate.inserts=true");

for (int i = 0; i < count; i++) {
new HoodieDeltaStreamer(cfg, jsc).sync();
}
}

@Test
public void testInsertOverwrite() throws Exception {
testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite", WriteOperationType.INSERT_OVERWRITE);