@@ -53,7 +53,7 @@ public abstract class AsyncCompactService extends HoodieAsyncService {

private final int maxConcurrentCompaction;
private transient AbstractCompactor compactor;
- private transient HoodieEngineContext context;
+ protected transient HoodieEngineContext context;
private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
private transient ReentrantLock queueLock = new ReentrantLock();
private transient Condition consumed = queueLock.newCondition();
@@ -79,6 +79,7 @@ protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig cl
@Override
public void close() {
stopEmbeddedServerView(true);
+ this.context.setJobStatus("", "");
}

private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
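
The change that repeats throughout this PR is a `context.setJobStatus(module, description)` call placed just before each Spark action, plus the empty-string call above that clears the status when the client closes. In the Spark engine implementation this presumably maps onto Spark's job-group mechanism, so jobs triggered by Hudi show up in the Spark UI under a meaningful name. A minimal sketch of what such a Spark-backed implementation could look like; the class and field names here are hypothetical, and only `JavaSparkContext.setJobGroup` is standard Spark API:

    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical Spark-backed engine context: setJobStatus tags every job
    // subsequently submitted from this driver thread in the Spark UI.
    public class SparkEngineContextSketch {
      private final JavaSparkContext jsc;

      public SparkEngineContextSketch(JavaSparkContext jsc) {
        this.jsc = jsc;
      }

      public void setJobStatus(String activeModule, String activityDescription) {
        jsc.setJobGroup(activeModule, activityDescription);
      }
    }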
@@ -674,8 +674,7 @@ public abstract void commitCompaction(String compactionInstantTime, O writeStatu
*/
protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
HoodieTable<T, I, K, O> table, String compactionCommitTime);
-
-
+
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*
@@ -68,7 +68,7 @@ public static org.apache.hudi.avro.model.HoodieReplaceCommitMetadata convertRepl
public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient,
TableFileSystemView fileSystemView,
HoodieInstant instant, List<String> replacedPartitions) {
-
+ context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups");
List<Boolean> f = context.map(replacedPartitions, partition -> {
Stream<FileSlice> fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition)
.flatMap(HoodieFileGroup::getAllRawFileSlices);
@@ -403,6 +403,7 @@ public void finalizeWrite(HoodieEngineContext context, String instantTs, List<Ho

private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
// Now delete partially written files
+ context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation");
context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
final FileSystem fileSystem = metaClient.getFs();
LOG.info("Deleting invalid data files=" + partitionWithFileList);
@@ -135,6 +135,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
if (subDirectories.size() > 0) {
parallelism = Math.min(subDirectories.size(), parallelism);
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+ context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
dataFiles.addAll(context.flatMap(subDirectories, directory -> {
Path path = new Path(directory);
FileSystem fileSystem = path.getFileSystem(serializedConf.get());
@@ -31,6 +31,6 @@ public SparkAsyncCompactService(HoodieEngineContext context, AbstractHoodieWrite

@Override
protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
- return new HoodieSparkCompactor(client);
+ return new HoodieSparkCompactor(client, this.context);
}
}
@@ -18,6 +18,7 @@

package org.apache.hudi.client;

+ import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -33,16 +34,20 @@
public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends AbstractCompactor<T,
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(HoodieSparkCompactor.class);
+ private transient HoodieEngineContext context;

- public HoodieSparkCompactor(AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> compactionClient) {
+ public HoodieSparkCompactor(AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> compactionClient,
+                             HoodieEngineContext context) {
super(compactionClient);
+ this.context = context;
}

@Override
public void compact(HoodieInstant instant) throws IOException {
LOG.info("Compactor executing compaction " + instant);
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>)compactionClient;
JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
if (numWriteErrors != 0) {
// We treat even a single error in compaction as fatal
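
Spark reads the job group and description from thread-local properties at job-submission time, so the status has to be set on the driver thread before the action that launches the job; here that action is the `res.collect()` above. For reference, a hedged equivalent using Spark's low-level local properties (`jsc` is an assumed `JavaSparkContext` handle; the property keys are the ones Spark uses internally for job groups):

    // Thread-local job metadata; applies to jobs submitted from this thread afterwards.
    jsc.setLocalProperty("spark.jobGroup.id", "HoodieSparkCompactor");
    jsc.setLocalProperty("spark.job.description", "Collect compaction write status");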
@@ -277,6 +277,7 @@ public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus>
protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String compactionCommitTime) {
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
@@ -125,7 +125,7 @@ private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
Map<String, Long> comparisonsPerFileGroup =
- computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD);
+ computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context);
int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
@@ -139,11 +139,13 @@ private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
*/
private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
- final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD) {
+ final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
+ final HoodieEngineContext context) {
Map<String, Long> fileToComparisons;
if (config.getBloomIndexPruneByRanges()) {
// we will just try exploding the input and then count to determine comparisons
// FIX(vc): Only do sampling here and extrapolate?
+ context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files");
fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
} else {
fileToComparisons = new HashMap<>();
@@ -101,6 +101,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>

WorkloadProfile profile = null;
if (isWorkloadProfileNeeded()) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
profile = new WorkloadProfile(buildProfile(inputRecordsRDD));
LOG.info("Workload profile :" + profile);
saveWorkloadProfileMetadataToInflight(profile, instantTime);
@@ -206,6 +207,7 @@ protected String getCommitActionType() {

@Override
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
}

@@ -76,6 +76,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
JavaRDD<WriteStatus> statuses = compactor.compact(context, compactionPlan, table, config, instantTime);

statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+ context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collect();
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
@@ -52,6 +52,7 @@ public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp());
List<String> markerFilePaths = markerFiles.allMarkerFilePaths();
int parallelism = Math.max(Math.min(markerFilePaths.size(), config.getRollbackParallelism()), 1);
+ jsc.setJobGroup(this.getClass().getSimpleName(), "Rolling back using marker files");
return jsc.parallelize(markerFilePaths, parallelism)
.map(markerFilePath -> {
String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
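
This rollback executor lives in Spark-specific code and already holds a `JavaSparkContext`, so it calls `setJobGroup` directly instead of going through the engine context. A self-contained example of that Spark API, runnable against a local master:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JobGroupDemo {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("job-group-demo").setMaster("local[1]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          // Jobs submitted from this thread now carry this group and description in the Spark UI.
          jsc.setJobGroup("JobGroupDemo", "Counting a tiny RDD");
          long count = jsc.parallelize(Arrays.asList(1, 2, 3)).count();
          System.out.println("count = " + count);
        }
      }
    }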
@@ -37,6 +37,6 @@ public SparkStreamingAsyncCompactService(HoodieEngineContext context, AbstractHo

@Override
protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
- return new HoodieSparkCompactor(client);
+ return new HoodieSparkCompactor(client, this.context);
}
}