Conversation

@satishkotha
Member

What is the purpose of the pull request

Add support for reusing fileIds in the clustering execution strategy. This is strategy-specific; the default is still to create new files.

Brief change log

Some datasets rely on an external index. Clustering cannot change record locations for such datasets (because the external index doesn't support updates). We can still take advantage of clustering by doing 'local' sorting within each file. This PR adds support for such strategies.

Also made small changes to how metadata is generated after clustering completes (metadata was previously generated redundantly, twice; one of the two paths was removed to keep things simple).
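For illustration, here is a minimal, self-contained sketch of the fileId-reuse idea. It is not the PR's code: the `Row` record and plain `String` file ids stand in for Hudi's `HoodieRecord` and `HoodieFileGroupId`; the point is only the concept (same fileId, records re-sorted within the file).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-ins: Row ~ HoodieRecord, String map key ~ HoodieFileGroupId.
public class IdentityClusteringSketch {

  record Row(String recordKey, String sortColumn) { }

  // "Clusters" each file group in place: the fileId is preserved, so an external
  // index mapping record key -> fileId never needs an update; only the record
  // order inside each file changes.
  static Map<String, List<Row>> clusterInPlace(Map<String, List<Row>> fileGroups) {
    Map<String, List<Row>> result = new HashMap<>();
    for (Map.Entry<String, List<Row>> group : fileGroups.entrySet()) {
      List<Row> sorted = new ArrayList<>(group.getValue());
      sorted.sort(Comparator.comparing(Row::sortColumn)); // local, per-file sort
      result.put(group.getKey(), sorted);                 // same fileId, new contents
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<Row>> input = Map.of(
        "fileId-1", List.of(new Row("k3", "c"), new Row("k1", "a"), new Row("k2", "b")));
    System.out.println(clusterInPlace(input)); // fileId-1 keeps its id; rows now sorted
  }
}
```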

Verify this pull request

This change added tests.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-commenter

codecov-commenter commented May 7, 2021

Codecov Report

Merging #2918 (32dbe35) into master (0284cde) will increase coverage by 15.29%.
The diff coverage is n/a.

@@              Coverage Diff              @@
##             master    #2918       +/-   ##
=============================================
+ Coverage     54.23%   69.53%   +15.29%     
+ Complexity     3810      374     -3436     
=============================================
  Files           488       54      -434     
  Lines         23574     2002    -21572     
  Branches       2510      237     -2273     
=============================================
- Hits          12786     1392    -11394     
+ Misses         9636      478     -9158     
+ Partials       1152      132     -1020     
| Flag | Coverage Δ | Complexity Δ |
|---|---|---|
| hudicli | ? | ? |
| hudiclient | ? | ? |
| hudicommon | ? | ? |
| hudiflink | ? | ? |
| hudihadoopmr | ? | ? |
| hudisparkdatasource | ? | ? |
| hudisync | ? | ? |
| huditimelineservice | ? | ? |
| hudiutilities | 69.53% <ø> (-0.05%) | 374.00 <ø> (-1.00) |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...apache/hudi/utilities/deltastreamer/DeltaSync.java | 71.08% <0.00%> (-0.35%) | 55.00% <0.00%> (-1.00%) |
| ...n/java/org/apache/hudi/common/HoodieCleanStat.java | | |
| ...n/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala | | |
| ...hadoop/realtime/RealtimeCompactedRecordReader.java | | |
| ...che/hudi/exception/InvalidHoodiePathException.java | | |
| .../hudi/async/SparkStreamingAsyncCompactService.java | | |
| ...org/apache/hudi/common/model/TableServiceType.java | | |
| .../common/util/queue/FunctionBasedQueueProducer.java | | |
| ...ava/org/apache/hudi/cli/commands/UtilsCommand.java | | |
| .../hudi/table/format/cow/ParquetSplitReaderUtil.java | | |

... and 425 more

```java
                                    final Map<String, String> strategyParams,
                                    final Schema schema,
                                    final List<HoodieFileGroupId> inputFileIds) {
  if (inputRecords.getNumPartitions() != 1 || inputFileIds.size() != 1) {
```
Contributor

If there must be exactly one fileId, shouldn't each clustering group have just one file group? I don't see that limit enforced in clustering scheduling.

Member Author

Yes, this is enforced by setting the group size limit to a small number. See the unit test added: `.withClusteringMaxBytesInGroup(10) // set small number so each file is considered as separate clustering group`

Contributor

Can we support another config, such as `filegroupLocalSort`? Reusing `withClusteringMaxBytesInGroup` and setting it that small may confuse users.

@lw309637554
Contributor

@satishkotha hello, I have some doubts:

  1. I only see a test strategy added. Will a formal strategy be added later?
  2. Which index is this PR meant to support?
  3. If every file group just transforms into a file group with the same name, can small files no longer be merged?

@satishkotha
Member Author


@lw309637554

  1. Yes, the actual strategy can easily be added once we agree on the high-level change.
  2. This is to support HBaseIndex, which does not support updating a record's location.
  3. Yes, you are right: the merging strategy cannot be applied to tables that use HBaseIndex. We can still do local 'file-level' sorting, i.e., sort the records in each data file by a specified column so that only one block (row group) needs to be read for queries.

Let me know if you have any other questions or comments.
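For intuition on why the per-file sort pays off, here is a toy sketch of min/max row-group pruning (the stats model is hypothetical and uses no Hudi or Parquet APIs): when a file is sorted by the lookup column, its row groups cover disjoint key ranges, so a point lookup overlaps at most one group.

```java
import java.util.List;

// Toy model of Parquet-style row-group pruning over min/max column statistics.
public class RowGroupPruningSketch {

  record RowGroupStats(String min, String max) { }

  // Counts the row groups whose [min, max] range contains the lookup key.
  static long groupsToRead(List<RowGroupStats> stats, String key) {
    return stats.stream()
        .filter(s -> s.min().compareTo(key) <= 0 && key.compareTo(s.max()) <= 0)
        .count();
  }

  public static void main(String[] args) {
    // Per-row-group min/max stats, as a file sorted by the lookup column would produce
    List<RowGroupStats> sortedFile = List.of(
        new RowGroupStats("a", "f"), new RowGroupStats("g", "m"), new RowGroupStats("n", "z"));
    System.out.println(groupsToRead(sortedFile, "h")); // 1 -> only one row group is scanned
  }
}
```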

@nsivabalan nsivabalan added the priority:high Significant impact; potential bugs label May 11, 2021
```java
public void testClusteringWithOneFilePerGroup() throws Exception {
  HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
      .withClusteringMaxBytesInGroup(10) // set small number so each file is considered as separate clustering group
      .withClusteringExecutionStrategyClass("org.apache.hudi.ClusteringIdentityTestExecutionStrategy")
```
Contributor

Can we add a config for this?

Member Author

This is just a unit test. I will provide another clustering scheduling strategy, as part of another PR, to limit the number of files per group.

```java
import java.util.Map;

/**
 * A HoodieCreateHandle which writes all data into a single file.
```
Contributor

HoodieCreateFixedHandle

Member Author

Fixed

Member

This is a bit of a misnomer; even HoodieCreateHandle only writes to a single file.

Rename to HoodieUnboundedCreateHandle, or something that captures the intent that this handle does not respect the sizing aspects.

@lw309637554
Contributor

lw309637554 commented May 13, 2021

@satishkotha The high-level change is OK. I just have two other comments:

  1. For `.withClusteringMaxBytesInGroup(10) // set small number so each file is considered as separate clustering group`, can we add another config?
  2. Will sorting be supported in HoodieCreateFixedHandle?

@satishkotha
Member Author

satishkotha commented May 13, 2021

> Can we add another config?

Yes, that will actually be provided as a separate strategy.

> Will sorting be supported in HoodieCreateFixedHandle?

That part will be provided in the execution strategy. Right now I only added a test strategy, which doesn't support sorting. I'm going to work on adding a real strategy that sorts.

Both of the above strategies will be sent as another PR. Let me know if that works.

@lw309637554
Contributor

@satishkotha LGTM

@vinothchandar
Member

> We cannot change record locations for clustering (because the external index doesn't support updates). We can still take advantage of clustering by doing 'local' sorting within each file.

This can be achieved by sorting at original write time, correct?

Member

@vinothchandar left a comment

I would like to understand whether more changes are needed to support what we are shooting for here, or whether you plan to keep the strategy internal and these changes are enough for you to run it in production.

I would like to avoid a lot of specific changes to support not changing the record location, and to have it as a separate strategy. Is that how you are thinking about it as well?

If so, then we can address the naming and other reuse comments and land this. Please let me know.

```java
 *
 * Please use this with caution. This can end up creating very large files if not used correctly.
 */
public class CreateFixedFileHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
```
Member

Can we subclass this from CreateHandleFactory? Or call it SingleFileCreateHandleFactory?

```java
}

@Override
public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
```
Member

Wondering why we need this, actually. Wouldn't just passing Long.MAX_VALUE as the target file size get the create handle to do this?

```java
}

@Override
public boolean canWrite(HoodieRecord record) {
```
Member

Let's just reuse CreateHandle with a large target file size, if we are doing all this for just a specific clustering strategy?
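For reference, the trade-off being discussed comes down to the `canWrite` check. A hedged sketch of the two behaviors follows; the field and constant names here are illustrative, not Hudi's:

```java
// Toy contrast of the two options above; names are illustrative only.
public class CanWriteSketch {

  static final long TARGET_FILE_SIZE = 120L * 1024 * 1024; // hypothetical 120 MB target

  long bytesWritten;      // bytes written to the current file so far
  long avgRecordSize = 1; // estimated size of the next record

  // Size-bounded handle: stops accepting records once the target size is hit,
  // so the writer rolls over to a new file (and a new fileId).
  boolean canWriteBounded() {
    return bytesWritten + avgRecordSize <= TARGET_FILE_SIZE;
  }

  // "Single file" handle: always accepts, so every record of the clustering
  // group lands in one file. Passing Long.MAX_VALUE as the target size to the
  // bounded version degenerates to the same behavior, which is the reviewer's point.
  boolean canWriteUnbounded() {
    return true;
  }
}
```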

```diff
 */
 public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime,
-                                    final Map<String, String> strategyParams, final Schema schema);
+                                    final Map<String, String> strategyParams, final Schema schema, final List<HoodieFileGroupId> inputFileIds);
```
Member

Can you please add Javadoc for this method explaining what each param is?
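For example, something along these lines could work; the signature is from this PR, but the parameter descriptions are inferred from the surrounding discussion, so treat them as a draft:

```java
/**
 * Executes clustering over the given input records and returns the write statuses.
 *
 * @param inputRecords    records read from the file slices of this clustering group
 * @param numOutputGroups number of output file groups to create
 * @param instantTime     instant time of the clustering (replacecommit) operation
 * @param strategyParams  strategy-specific parameters from the clustering plan
 * @param schema          schema of the input records, including Hudi metadata fields
 * @param inputFileIds    file group ids of the input slices; strategies that must
 *                        preserve record locations can reuse these ids for the output files
 */
public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime,
                                    final Map<String, String> strategyParams, final Schema schema,
                                    final List<HoodieFileGroupId> inputFileIds);
```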

```diff
 assertEquals(0, fileIdIntersection.size());

-config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(completeClustering)
+config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
```
Member

So we don't honor completeClustering anymore? Not following why this change was needed.

```java
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> inputRecords = readRecordsForGroup(jsc, clusteringGroup);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
```
Member

So the input file ids are already in the serialized plan, and this PR just passes them around additionally?

```diff
 protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
-  return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan).collect(
-      Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
+  Set<HoodieFileGroupId> newFilesWritten = new HashSet(writeStatuses.map(s -> s.getFileId()).collect());
```
Member

Rename: newFileIds

```diff
-      Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
+  Set<HoodieFileGroupId> newFilesWritten = new HashSet(writeStatuses.map(s -> s.getFileId()).collect());
+  return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+      .filter(fg -> !newFilesWritten.contains(fg))
```
Member

Sorry, not following. Why do we need this filter?

```java
  return hoodieRecord;
}

private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<WriteStatus> writeStatusJavaRDD) {
```
Member

This was removed because the constructor does the same job?

@satishkotha
Member Author

I think all changes in this have already been merged as part of #3419. Closing this.
