diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index b15cde597a193..4c9e585c363e5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -110,29 +110,32 @@ class DefaultSource extends RelationProvider val queryType = parameters(QUERY_TYPE.key) log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") + if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { + new EmptyRelation(sqlContext, metaClient) + } else { + (tableType, queryType, isBootstrappedTable) match { + case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | + (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | + (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => + getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath, + readPaths, metaClient) - (tableType, queryType, isBootstrappedTable) match { - case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | - (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | - (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => - getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath, - readPaths, metaClient) - - case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new IncrementalRelation(sqlContext, parameters, schema, metaClient) + case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + new IncrementalRelation(sqlContext, parameters, schema, metaClient) - case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => - new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient) + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => + new 
MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient) - case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient) + case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient) - case (_, _, true) => - new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters) + case (_, _, true) => + new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters) - case (_, _, _) => - throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + - s"isBootstrappedTable: $isBootstrappedTable ") + case (_, _, _) => + throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + + s"isBootstrappedTable: $isBootstrappedTable ") + } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/EmptyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/EmptyRelation.scala new file mode 100644 index 0000000000000..3645eb8d9b9e8 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/EmptyRelation.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
+
+import scala.util.control.NonFatal
+
+/**
+ * [[BaseRelation]] whose scan always yields an empty RDD.
+ *
+ * The schema is looked up on a best-effort basis from the table's Avro schema; when that
+ * lookup fails with a non-fatal error the relation reports an empty [[StructType]] instead.
+ *
+ * @param sqlContext instance of SqlContext.
+ * @param metaClient meta client handed to [[TableSchemaResolver]] to look up the table schema.
+ */
+class EmptyRelation(val sqlContext: SQLContext, metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan {
+
+  // Resolved at most once, on first access, so repeated `schema` calls reuse the result
+  // instead of building a new resolver and repeating the lookup each time.
+  private lazy val resolvedSchema: StructType = {
+    // do the best to find the table schema.
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    try {
+      AvroConversionUtils.convertAvroSchemaToStructType(schemaResolver.getTableAvroSchema)
+    } catch {
+      // Deliberate best-effort: a non-fatal failure falls back to an empty schema rather than
+      // failing the read; fatal errors are not caught and still propagate.
+      case NonFatal(_) => StructType(Nil)
+    }
+  }
+
+  /**
+   * @return the table schema when it can be resolved, otherwise an empty [[StructType]].
+   */
+  override def schema: StructType = resolvedSchema
+
+  /**
+   * @return an RDD containing no rows.
+   */
+  override def buildScan(): RDD[Row] = sqlContext.sparkContext.emptyRDD[Row]
+}