@@ -161,7 +161,9 @@ public String stats() throws IOException {
@CliCommand(value = "metadata list-partitions", help = "Print a list of all partitions from the metadata")
public String listPartitions() throws IOException {
HoodieCLI.getTableMetaClient();
- HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, false, false);
+ initJavaSparkContext();
+ HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc),
+ HoodieCLI.basePath, "/tmp", true, false, false, false);

StringBuffer out = new StringBuffer("\n");
if (!metadata.enabled()) {
@@ -119,6 +119,10 @@ public HoodieWriteConfig getConfig() {
return config;
}

+ public HoodieEngineContext getEngineContext() {
+ return context;
+ }
+
protected void initWrapperFSMetrics() {
// no-op.
}
@@ -221,8 +221,9 @@ public HoodieBackedTableMetadata metadata() {
protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);

private void initTableMetadata() {
- this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
- datasetWriteConfig.useFileListingMetadata(), datasetWriteConfig.getFileListingMetadataVerify(), false,
+ this.metadata = new HoodieBackedTableMetadata(engineContext, datasetWriteConfig.getBasePath(),
+ datasetWriteConfig.getSpillableMapBasePath(), datasetWriteConfig.useFileListingMetadata(),
+ datasetWriteConfig.getFileListingMetadataVerify(), false,
datasetWriteConfig.shouldAssumeDatePartitioning());
this.metaClient = metadata.getMetaClient();
}
@@ -31,6 +31,7 @@
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
+ import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
@@ -93,6 +94,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem

protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
+ protected final transient HoodieEngineContext context;
protected final HoodieIndex<T, I, K, O> index;

private SerializableConfiguration hadoopConfiguration;
@@ -108,6 +110,7 @@ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, Hoo
config.getViewStorageConfig());
this.metaClient = metaClient;
this.index = getIndex(config, context);
+ this.context = context;
this.taskContextSupplier = context.getTaskContextSupplier();
}

@@ -660,8 +663,16 @@ public boolean requireSortedRecords() {

public HoodieTableMetadata metadata() {
if (metadata == null) {
- metadata = HoodieTableMetadata.create(hadoopConfiguration.get(), config.getBasePath(), config.getSpillableMapBasePath(),
- config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(), config.shouldAssumeDatePartitioning());
+ HoodieEngineContext engineContext = context;
Review comment (Member): a more elegant way to do this would be through a getEngineContext() method that reinits lazily
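For illustration, a minimal sketch of the accessor the reviewer describes, assuming the transient context field drops its final modifier so it can be reassigned (this snippet is not part of the PR):

    protected HoodieEngineContext getEngineContext() {
      if (context == null) {
        // The transient field deserializes to null on executors; rebuild a
        // local engine context from the serialized Hadoop configuration.
        context = new HoodieLocalEngineContext(hadoopConfiguration.get());
      }
      return context;
    }

Call sites such as metadata() below could then invoke getEngineContext() instead of null-checking the field inline.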

+ if (engineContext == null) {
+ // This is to handle scenarios where this is called at the executor tasks, which do not have access
+ // to the engine context, and it ends up being null (as it's not serializable and marked transient here).
+ engineContext = new HoodieLocalEngineContext(hadoopConfiguration.get());
+ }
+
+ metadata = HoodieTableMetadata.create(engineContext, config.getBasePath(), config.getSpillableMapBasePath(),
+ config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.isMetricsOn(),
+ config.shouldAssumeDatePartitioning());
}
return metadata;
}
@@ -67,7 +67,7 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
HoodieWriteConfig config = getWriteConfig();
- List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
config.shouldAssumeDatePartitioning());

@@ -91,9 +91,10 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll
* @param config instance of {@link HoodieWriteConfig} to use.
* @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
*/
- public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, HoodieWriteConfig config) {
+ public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext,
+ FileSystem fs, String basePath, HoodieWriteConfig config) {
try {
- return FSUtils.getAllPartitionPaths(fs, basePath, config.useFileListingMetadata(),
+ return FSUtils.getAllPartitionPaths(engineContext, fs, basePath, config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()).stream()
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
.collect(Collectors.toList());
@@ -113,7 +114,7 @@ public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW
public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
String commit = instantToRollback.getTimestamp();
HoodieWriteConfig config = table.getConfig();
- List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
+ List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests");
@@ -90,7 +90,7 @@ public HoodieSavepointMetadata execute() {
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);

context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
- List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+ List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
config.shouldAssumeDatePartitioning()
);
@@ -65,7 +65,7 @@ protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {
@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
- table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
+ context, table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
}
}
@@ -93,8 +93,8 @@ private static void recreateMarkerFiles(final String commitInstantTime,
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
- table.getConfig());
+ rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(),
+ table.getMetaClient().getBasePath(), table.getConfig());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}
@@ -63,7 +63,7 @@ List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitio
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
- List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+ List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
} catch (IOException e) {
@@ -104,7 +104,7 @@ protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final Hoodie
final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
- List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+ List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
// Obtain the latest data files from all the partitions.
return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
@@ -50,7 +50,7 @@ public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
try {
- List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), config.useFileListingMetadata(), config.getFileListingMetadataVerify(),
false);
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
@@ -195,7 +195,7 @@ public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
- List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getFs(), metaClient.getBasePath(),
config.useFileListingMetadata(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());

// filter the partition paths if needed to reduce list status
@@ -66,8 +66,8 @@ protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() {

@Override
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
- List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(),
- table.getMetaClient().getBasePath(), config);
+ List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
+ table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), config);
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
}
}
@@ -92,8 +92,8 @@ private static void recreateMarkerFiles(final String commitInstantTime,
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
- table.getConfig());
+ rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getFs(),
+ table.getMetaClient().getBasePath(), table.getConfig());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}
@@ -101,7 +101,7 @@ public void testSavepointAndRollback() throws Exception {
assertNoWriteErrors(statuses);
HoodieWriteConfig config = getConfig();
List<String> partitionPaths =
- FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), config.useFileListingMetadata(),
+ FSUtils.getAllPartitionPaths(context, fs, cfg.getBasePath(), config.useFileListingMetadata(),
config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning());
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieSparkTable table = HoodieSparkTable.create(getConfig(), context, metaClient);