@@ -44,7 +44,7 @@
public class CustomAvroKeyGenerator extends BaseKeyGenerator {

private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
private static final String SPLIT_REGEX = ":";
public static final String SPLIT_REGEX = ":";

/**
* Used as a part of config in CustomKeyGenerator.java.
@@ -117,8 +117,4 @@ private void validateRecordKeyFields() {
public String getDefaultPartitionPathSeparator() {
return DEFAULT_PARTITION_PATH_SEPARATOR;
}

public String getSplitRegex() {
return SPLIT_REGEX;
}
}
@@ -90,7 +90,7 @@ private String getPartitionPath(Option<GenericRecord> record, Option<Row> row) {
return "";
}
for (String field : getPartitionPathFields()) {
String[] fieldWithType = field.split(customAvroKeyGenerator.getSplitRegex());
String[] fieldWithType = field.split(customAvroKeyGenerator.SPLIT_REGEX);
if (fieldWithType.length != 2) {
throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format");
}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.util.TimeZone

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues
import org.apache.spark.sql.types.DataType

trait SparkParsePartitionUtil extends Serializable {

def parsePartition(
path: Path,
typeInference: Boolean,
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
timeZone: TimeZone): Option[PartitionValues]
}
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java (10 additions & 0 deletions)
@@ -276,6 +276,16 @@ public static List<String> getAllPartitionPaths(HoodieEngineContext engineContex
}
}

public static FileStatus[] getFilesInPartition(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String basePathStr, Path partitionPath) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, basePathStr, FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) {
return tableMetadata.getAllFilesInPartition(partitionPath);
} catch (Exception e) {
throw new HoodieException("Error get files in partition: " + partitionPath, e);
}
}

public static String getFileExtension(String fullName) {
Objects.requireNonNull(fullName);
String fileName = new File(fullName).getName();
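The snippet below is a hedged Scala usage sketch for the new helper; the engine-context and metadata-config construction are assumptions about the surrounding Hudi APIs, `spark` is assumed to be an existing SparkSession, and all paths are illustrative.

import org.apache.hadoop.fs.Path
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.api.java.JavaSparkContext

// List a partition's files through the (optionally metadata-table backed) helper added above.
val engineContext = new HoodieSparkEngineContext(JavaSparkContext.fromSparkContext(spark.sparkContext))
val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
val basePath = "/tmp/hudi_table"                    // illustrative table base path
val partition = new Path(basePath + "/2021/01/01")  // illustrative partition path
val files = FSUtils.getFilesInPartition(engineContext, metadataConfig, basePath, partition)
files.foreach(f => println(f.getPath))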
@@ -18,6 +18,7 @@

package org.apache.hudi.common.table;

import java.util.Arrays;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
@@ -57,6 +58,7 @@ public class HoodieTableConfig implements Serializable {
public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version";
public static final String HOODIE_TABLE_PRECOMBINE_FIELD = "hoodie.table.precombine.field";
public static final String HOODIE_TABLE_PARTITION_COLUMNS = "hoodie.table.partition.columns";
Member: I think we should persist the key generator class, and not the partition columns themselves? Let me think over this more.

Author: Hi @vinothchandar, we need the partition schema for partition pruning in Spark SQL, so the partition columns must be stored; otherwise we cannot derive the partition schema.
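For illustration, a table partitioned by hypothetical year/month/day columns would carry the new entry in hoodie.properties as a comma-separated list, which is what the getPartitionColumns parsing shown further down expects:

hoodie.table.partition.columns=year,month,day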

Member: How does this work for existing tables? Do we need an upgrade/downgrade step for writing this to hoodie.properties?

Author: For existing tables that do not store the partition columns, we query them as non-partitioned tables.


@Deprecated
public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = "hoodie.table.ro.file.format";
@@ -193,6 +195,14 @@ public String getPreCombineField() {
return props.getProperty(HOODIE_TABLE_PRECOMBINE_FIELD);
}

public Option<String[]> getPartitionColumns() {
Member: Why not use an empty array to signify no partition columns?

Author: For tables created with previous versions of Hudi, no partition columns are stored in hoodie.properties, so we return Option#empty in that case. That lets us distinguish a table whose partition-column property was never written from one with an empty partition list. Other properties such as getBootstrapBasePath use Option in the same way.

if (props.containsKey(HOODIE_TABLE_PARTITION_COLUMNS)) {
return Option.of(Arrays.stream(props.getProperty(HOODIE_TABLE_PARTITION_COLUMNS).split(","))
.filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[]{}));
}
return Option.empty();
}

/**
* Read the payload class for HoodieRecords from the table properties.
*/
@@ -596,6 +596,7 @@ public static class PropertyBuilder {
private Integer timelineLayoutVersion;
private String baseFileFormat;
private String preCombineField;
private String partitionColumns;
private String bootstrapIndexClass;
private String bootstrapBasePath;

@@ -646,6 +647,11 @@ public PropertyBuilder setPreCombineField(String preCombineField) {
return this;
}

public PropertyBuilder setPartitionColumns(String partitionColumns) {
this.partitionColumns = partitionColumns;
return this;
}

public PropertyBuilder setBootstrapIndexClass(String bootstrapIndexClass) {
this.bootstrapIndexClass = bootstrapIndexClass;
return this;
@@ -696,6 +702,9 @@ public PropertyBuilder fromProperties(Properties properties) {
if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)) {
setPreCombineField(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD));
}
if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS)) {
setPartitionColumns(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS));
}
return this;
}

@@ -738,6 +747,10 @@ public Properties build() {
if (null != preCombineField) {
properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, preCombineField);
}

if (null != partitionColumns) {
properties.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS, partitionColumns);
}
return properties;
}

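Below is a hedged sketch of how a writer could persist the new property through this builder; setPartitionColumns and build() come from the diff above, while withPropertyBuilder, setTableType and setTableName are assumed to be existing companion methods on HoodieTableMetaClient, and the table name and column list are illustrative.

import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient

val props = HoodieTableMetaClient.withPropertyBuilder()
  .setTableType(HoodieTableType.COPY_ON_WRITE)
  .setTableName("hudi_trips")              // illustrative table name
  .setPartitionColumns("year,month,day")   // stored as hoodie.table.partition.columns
  .build()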
@@ -68,6 +68,9 @@ object DataSourceReadOptions {

val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP

val ENABLE_HOODIE_FILE_INDEX = "hoodie.file.index.enable"
val DEFAULT_ENABLE_HOODIE_FILE_INDEX = true

@Deprecated
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
@Deprecated
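A hedged usage sketch for the new reader option; `spark` is assumed to be an active SparkSession and the table path is illustrative.

import org.apache.hudi.DataSourceReadOptions

// Snapshot-read a table; setting hoodie.file.index.enable to false falls back to
// the glob-based listing instead of the new HoodieFileIndex path.
val df = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .option(DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX, "false")
  .load("/tmp/hudi_table")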
@@ -19,14 +19,16 @@ package org.apache.hudi

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
import org.apache.spark.sql.sources._
@@ -79,39 +81,53 @@ class DefaultSource extends RelationProvider
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths

val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)

val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
// Use the HoodieFileIndex only if the 'path' is not globbed.
// Otherwise we fall back to the original way of reading the hoodie table.
val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX)
.map(_.toBoolean).getOrElse(DEFAULT_ENABLE_HOODIE_FILE_INDEX)
val useHoodieFileIndex = enableFileIndex && path.isDefined && !path.get.contains("*") &&
!parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY)
val globPaths = if (useHoodieFileIndex) {
None
} else {
Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs))
}
// Get the table base path
val tablePath = if (globPaths.isDefined) {
DataSourceUtils.getTablePath(fs, globPaths.get.toArray)
} else {
DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
}
log.info("Obtained hudi table path: " + tablePath)

val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
log.info("Is bootstrapped table => " + isBootstrappedTable)

if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
if (isBootstrappedTable) {
// Snapshot query is not supported for Bootstrapped MOR tables
log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." +
" Falling back to Read Optimized query.")
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
} else {
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
}
} else {
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
}
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
} else {
new IncrementalRelation(sqlContext, optParams, schema, metaClient)
}
} else {
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
val tableType = metaClient.getTableType
val queryType = parameters(QUERY_TYPE_OPT_KEY)
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType")

(tableType, queryType, isBootstrappedTable) match {
Member: This is very neat. Thanks @pengzhiwei2018!

case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
Contributor: Can we do (_, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) here?

Author: Good idea! Will refactor this in the next PR. (See the sketch after this hunk.)

(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath,
readPaths, metaClient)

case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
Contributor: Can we move this to line 118? It would be nice to have all the incremental cases next to each other.

Author: Yes, it would look nicer to have all the incremental cases next to each other.

new IncrementalRelation(sqlContext, parameters, schema, metaClient)

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient)

case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient)

case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters)

case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
s"isBootstrappedTable: $isBootstrappedTable ")
}
}
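A sketch of the refactor suggested in the thread above — not part of this diff — collapsing the two read-optimized arms into a single wildcard case within the same match:

// Suggested follow-up (hypothetical): handle read-optimized queries for either
// table type with one arm.
case (_, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
  getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath,
    readPaths, metaClient)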

@@ -162,18 +178,28 @@ class DefaultSource extends RelationProvider

override def shortName(): String = "hudi"

private def getBaseFileOnlyView(sqlContext: SQLContext,
private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType,
tablePath: String,
extraReadPaths: Seq[String],
isBootstrappedTable: Boolean,
globPaths: Seq[Path],
metaClient: HoodieTableMetaClient): BaseRelation = {
log.warn("Loading Base File Only View.")
log.info("Loading Base File Only View with options :" + optParams)

if (useHoodieFileIndex) {

val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
if (schema == null) Option.empty[StructType] else Some(schema),
optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))

if (isBootstrappedTable) {
// For bootstrapped tables, use our custom Spark relation for querying
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
HadoopFsRelation(
fileIndex,
fileIndex.partitionSchema,
fileIndex.dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat,
optParams)(sqlContext.sparkSession)
} else {
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
@@ -182,7 +208,6 @@
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])

log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
@@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -46,13 +46,14 @@ import scala.collection.JavaConverters._
*
* @param _sqlContext Spark SQL Context
* @param userSchema User specified schema in the datasource query
* @param globPaths Globbed paths obtained from the user provided path for querying
* @param globPaths The glob paths to query. If defined, read from the globPaths;
* otherwise read data from the tablePath using HoodieFileIndex.
* @param metaClient Hoodie table meta client
* @param optParams DataSource options passed by the user
*/
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
val userSchema: StructType,
val globPaths: Seq[Path],
val globPaths: Option[Seq[Path]],
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String]) extends BaseRelation
with PrunedFilteredScan with Logging {
@@ -156,9 +157,14 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,

def buildFileIndex(): HoodieBootstrapFileIndex = {
logInfo("Building file index..")
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
val fileStatuses = inMemoryFileIndex.allFiles()

val fileStatuses = if (globPaths.isDefined) {
// Load files from the glob paths if defined, to stay compatible with the original mode
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths.get)
inMemoryFileIndex.allFiles()
} else { // Load files by the HoodieFileIndex.
HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams,
FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles
}
if (fileStatuses.isEmpty) {
throw new HoodieException("No files found for reading in user provided path.")
}