@@ -57,6 +57,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLogCompactException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -1088,6 +1089,9 @@ public void rollbackFailedBootstrap() {
table.rollbackBootstrap(context, createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}

// if bootstrap failed, let's delete the metadata table and restart from scratch
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
}

/**
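For context, the deleteMetadataTable call added in this hunk covers the case where a failed bootstrap has already seeded the metadata table: after the rollback its file listings may no longer match the data table, so it is wiped and rebuilt from scratch. A condensed sketch of the resulting flow, reusing the names from the surrounding method (the real method only performs the rollback when a pending bootstrap instant exists; comments are illustrative, not PR code):

    // roll back whatever the failed bootstrap wrote to the data table
    table.rollbackBootstrap(context, createNewInstantTime());
    // then drop the (possibly stale) metadata table under basePath/.hoodie/metadata;
    // it is re-bootstrapped from the table state on the next successful commit
    HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);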
@@ -29,10 +29,12 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -82,7 +84,10 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {

protected final HoodieMetadataConfig metadataConfig;

private final HoodieTableQueryType queryType;
private final Option<String> specifiedQueryInstant;
private final Option<String> beginInstantTime;
private final Option<String> endInstantTime;
private final List<Path> queryPaths;

private final boolean shouldIncludePendingCommits;
@@ -123,6 +128,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
* @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations
* @param shouldValidateInstant flags to validate whether query instant is present in the timeline
* @param fileStatusCache transient cache of fetched [[FileStatus]]es
* @param beginInstantTime begin instant time for incremental query (optional)
* @param endInstantTime end instant time for incremental query (optional)
*/
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
HoodieTableMetaClient metaClient,
@@ -133,7 +140,9 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
boolean shouldIncludePendingCommits,
boolean shouldValidateInstant,
FileStatusCache fileStatusCache,
boolean shouldListLazily) {
boolean shouldListLazily,
Option<String> beginInstantTime,
Option<String> endInstantTime) {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
.orElse(new String[0]);

@@ -143,11 +152,14 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
&& HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
.build();

this.queryType = queryType;
this.queryPaths = queryPaths;
this.specifiedQueryInstant = specifiedQueryInstant;
this.shouldIncludePendingCommits = shouldIncludePendingCommits;
this.shouldValidateInstant = shouldValidateInstant;
this.shouldListLazily = shouldListLazily;
this.beginInstantTime = beginInstantTime;
this.endInstantTime = endInstantTime;

this.basePath = metaClient.getBasePathV2();

@@ -300,7 +312,17 @@ protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionP
protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths) {
List<String> matchedPartitionPaths;
try {
matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
if (isPartitionedTable()) {
if (queryType == HoodieTableQueryType.INCREMENTAL && beginInstantTime.isPresent()) {
HoodieTimeline timelineAfterBeginInstant = TimelineUtils.getCommitsTimelineAfter(metaClient, beginInstantTime.get(), Option.empty());
HoodieTimeline timelineToQuery = endInstantTime.map(timelineAfterBeginInstant::findInstantsBeforeOrEquals).orElse(timelineAfterBeginInstant);
matchedPartitionPaths = TimelineUtils.getWrittenPartitions(timelineToQuery);
} else {
matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
}
} else {
matchedPartitionPaths = Collections.singletonList(StringUtils.EMPTY_STRING);
}
} catch (IOException e) {
throw new HoodieIOException("Error fetching partition paths", e);
}
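For incremental queries the new branch above avoids a full partition listing: when the query type is INCREMENTAL and a begin instant is supplied, candidate partitions come from the commit timeline rather than from tableMetadata.getPartitionPathWithPathPrefixes. A rough sketch using the same timeline helpers as the PR (the instant-time literals are made up):

    // commits after the incremental begin instant ...
    HoodieTimeline afterBegin =
        TimelineUtils.getCommitsTimelineAfter(metaClient, "20230901000000000", Option.empty());
    // ... optionally capped at the end instant, when one is supplied
    HoodieTimeline window = afterBegin.findInstantsBeforeOrEquals("20230902000000000");
    // only partitions actually written by commits in that window are listed,
    // instead of every partition under the query paths
    List<String> touchedPartitions = TimelineUtils.getWrittenPartitions(window);

Non-partitioned tables take the new else branch and resolve to a single empty relative partition path via isPartitionedTable().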
@@ -319,6 +341,10 @@ protected void refresh() {
doRefresh();
}

private boolean isPartitionedTable() {
return partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString());
}

protected HoodieTimeline getActiveTimeline() {
// NOTE: We have to use commits and compactions timeline, to make sure that we're properly
// handling the following case: when records are inserted into the new log-file w/in the file-group
@@ -81,12 +81,20 @@ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableT
}

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable, String keyGenerator) throws IOException {
return init(basePath, tableType, bootstrapBasePath, bootstrapIndexEnable, keyGenerator, "datestr");
}

public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable, String keyGenerator,
String partitionFieldConfigValue) throws IOException {
Properties props = new Properties();
props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), bootstrapIndexEnable);
if (keyGenerator != null) {
props.put("hoodie.datasource.write.keygenerator.class", keyGenerator);
props.put("hoodie.datasource.write.partitionpath.field", "datestr");
}
if (keyGenerator != null && !keyGenerator.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
props.put("hoodie.datasource.write.partitionpath.field", partitionFieldConfigValue);
props.put(HoodieTableConfig.PARTITION_FIELDS.key(), partitionFieldConfigValue);
}
return init(getDefaultHadoopConf(), basePath, tableType, props);
}
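The new init overload lets tests choose the partition field that is written into both the write props and HoodieTableConfig.PARTITION_FIELDS, and partition-path properties are now skipped entirely for the NonpartitionedKeyGenerator. A small usage sketch mirroring the bootstrap test further down (arguments other than the partition field are unchanged from the existing overload):

    HoodieTableMetaClient metaClient = HoodieTestUtils.init(
        basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath,
        true, SimpleKeyGenerator.class.getCanonicalName(), "partition_path");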
@@ -58,7 +58,9 @@ public HiveHoodieTableFileIndex(HoodieEngineContext engineContext,
shouldIncludePendingCommits,
true,
new NoopCache(),
false);
false,
Option.empty(),
Option.empty());
}

/**
@@ -85,7 +85,9 @@ case class HoodieFileIndex(spark: SparkSession,
configProperties = getConfigProperties(spark, options),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
fileStatusCache = fileStatusCache,
beginInstantTime = options.get(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
endInstantTime = options.get(DataSourceReadOptions.END_INSTANTTIME.key)
)
with FileIndex {

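These two options carry the incremental query bounds from the Spark datasource into the file index, so partition listing in BaseHoodieTableFileIndex can be pruned by the timeline. A usage sketch in Java, assuming a SparkSession named spark, a table base path in basePath, and the standard Hudi read option keys (instant-time values are illustrative):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> incremental = spark.read().format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", "20230901000000000")
        .option("hoodie.datasource.read.end.instanttime", "20230902000000000")
        .load(basePath);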
@@ -66,7 +66,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
configProperties: TypedProperties,
queryPaths: Seq[Path],
specifiedQueryInstant: Option[String] = None,
@transient fileStatusCache: FileStatusCache = NoopCache)
@transient fileStatusCache: FileStatusCache = NoopCache,
beginInstantTime: Option[String] = None,
endInstantTime: Option[String] = None)
extends BaseHoodieTableFileIndex(
new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
metaClient,
@@ -77,7 +79,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
false,
false,
SparkHoodieTableFileIndex.adapt(fileStatusCache),
shouldListLazily(configProperties)
shouldListLazily(configProperties),
toJavaOption(beginInstantTime),
toJavaOption(endInstantTime)
)
with SparkAdapterSupport
with Logging {
@@ -40,6 +40,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -88,6 +89,8 @@ public void setUp() throws IOException {
properties.setProperty(
PAYLOAD_ORDERING_FIELD_PROP_KEY,
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),"partition_path");
properties.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(),"partition_path");
metaClient = getHoodieMetaClient(hadoopConf(), basePath(), HoodieTableType.MERGE_ON_READ, properties);
}

@@ -173,6 +176,7 @@ public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
extraProperties.setProperty(
WRITE_RECORD_POSITIONS.key(),
"true");
extraProperties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),"partition_path");

return getConfigBuilder(true)
.withPath(basePath())
@@ -249,7 +253,7 @@ public void checkDataEquality(int numRecords) {
.read()
.options(properties)
.format("org.apache.hudi")
.load(basePath() + "/" + getPartitionPath());
.load(basePath());
List<Row> result = rows.collectAsList();
assertEquals(numRecords, result.size());
}
@@ -92,6 +92,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -201,9 +202,9 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
: NonpartitionedKeyGenerator.class.getCanonicalName();
if (deltaCommit) {
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass);
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass, "partition_path");
} else {
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass);
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass, "partition_path");
}

int totalRecords = 100;
@@ -240,7 +241,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
break;
}
List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03");
List<String> partitions = partitioned ? Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03") : Collections.EMPTY_LIST;
long timestamp = Instant.now().toEpochMilli();
Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath);
HoodieWriteConfig config = getConfigBuilder(schema.toString())