@@ -124,9 +124,7 @@ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
// if a full bootstrap is to be performed, perform that too
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
// Delete the marker directory for the instant
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
@@ -148,17 +146,22 @@ protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataBootstrap
}

HoodieTableMetaClient metaClient = table.getMetaClient();
String bootstrapInstantTime = HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(), bootstrapInstantTime));

table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
metaClient.getCommitActionType(), bootstrapInstantTime), Option.empty());

HoodieData<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);

HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);

// Delete the marker directory for the instant
WriteMarkersFactory.get(config.getMarkersType(), table, bootstrapInstantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

return Option.of(result);
}
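
Both bootstrap paths now end with the same cleanup: the marker directory for the bootstrap instant is deleted only after that instant's write has been committed. A minimal sketch of the shared pattern, using only the calls visible in this diff (the helper class and method names are illustrative, not part of this change):

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;

final class BootstrapMarkerCleanup {
  // Markers for an instant live under the table's temp folder; deleting them
  // once the commit is done leaves the temp folder empty. The "quiet" variant
  // is best-effort cleanup, so a deletion failure does not fail the write.
  static void deleteMarkersQuietly(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> table,
                                   HoodieEngineContext context, String instantTime) {
    WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
  }
}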

@@ -265,12 +268,20 @@ protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(Lis
(JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
partitionFilesList);
// Start Full Bootstrap
final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
String bootstrapInstantTime = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
final HoodieInstant requested = new HoodieInstant(
State.REQUESTED, table.getMetaClient().getCommitActionType(), bootstrapInstantTime);
table.getActiveTimeline().createNewInstant(requested);

// Setup correct schema and run bulk insert.
return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> writeMetadataOption =
Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());

// Delete the marker directory for the instant
WriteMarkersFactory.get(config.getMarkersType(), table, bootstrapInstantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

return writeMetadataOption;
}

protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
@@ -62,6 +62,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -346,6 +347,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(instant, metaClient.getActiveTimeline()
.getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
verifyNoMarkerInTempFolder();

Dataset<Row> bootstrapped = sqlContext.read().format("parquet").load(basePath);
Dataset<Row> original = sqlContext.read().format("parquet").load(bootstrapBasePath);
Expand Down Expand Up @@ -463,7 +465,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
jsc.hadoopConfiguration(),
FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, false).stream()
.map(f -> basePath + "/" + f).collect(Collectors.toList()),
basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
Arrays.asList("_row_key"));
assertEquals(totalRecords, records.size());
for (GenericRecord r : records) {
@@ -473,6 +475,12 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
assertEquals(totalRecords, seenKeys.size());
}

private void verifyNoMarkerInTempFolder() throws IOException {
String tempFolderPath = metaClient.getTempFolderPath();
FileSystem fileSystem = FSUtils.getFs(tempFolderPath, jsc.hadoopConfiguration());
assertEquals(0, fileSystem.listStatus(new Path(tempFolderPath)).length);
}
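
This helper assumes the temp folder itself survives marker cleanup (quietDeleteMarkerDir removes the per-instant marker directory, not its parent temp folder). If the folder could be absent, FileSystem#listStatus would throw FileNotFoundException; a tolerant variant of the check, hypothetical and not part of this change, could read:

private void verifyNoMarkerInTempFolderTolerant() throws IOException {
  String tempFolderPath = metaClient.getTempFolderPath();
  FileSystem fileSystem = FSUtils.getFs(tempFolderPath, jsc.hadoopConfiguration());
  Path tempFolder = new Path(tempFolderPath);
  // Hypothetical relaxation: a missing temp folder also means no markers remain.
  if (fileSystem.exists(tempFolder)) {
    assertEquals(0, fileSystem.listStatus(tempFolder).length);
  }
}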

public static class TestFullBootstrapDataProvider extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {

public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineContext context) {
@@ -481,7 +489,7 @@ public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineCon

@Override
public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream())
.findAny().get().getPath()).toString();
ParquetFileReader reader = null;