
Conversation

@RussellSpitzer
Member

WIP: based on other PRs we need merged first:
#2820
#2828

@RussellSpitzer RussellSpitzer deleted the SparkSortCompaction branch July 15, 2021 20:01
@RussellSpitzer RussellSpitzer restored the SparkSortCompaction branch July 20, 2021 15:04
kamalbhavyam added a commit to kamalbhavyam/iceberg that referenced this pull request Jul 22, 2021
.format("iceberg")
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
.mode("append")
Member Author

@aokolnychyi :)

Anton worried about this naming when we designed this special write task. It doesn't actually append to the table in this action; instead it stages these files to be committed in the rewrite operation performed by the base action. So the "append" here really just means "write these files but don't add them to the table". This is triggered by the REWRITTEN_FILE_SCAN_TASK_SET_ID parameter, which causes us not to use the normal write path. Sorry it's pretty weird, but we are basically hijacking the existing Spark write mechanism while adding a side effect to track which files were written.
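For context, a minimal sketch of what the staged write looks like from the strategy side. The `df`, `groupID`, and `table` variables and the final `save()` target are illustrative assumptions, not taken verbatim from this PR; the option names are the ones quoted in the diff above.

```java
// Hypothetical names: df holds the rows read back from this file group,
// groupID identifies the rewrite group, table is the Iceberg table being compacted.
df.write()
    .format("iceberg")
    // Triggers the special write path: the new files are staged under this group ID
    // for the rewrite commit instead of being appended to the table.
    .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
    .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
    // "append" only means "write these files"; the base rewrite action commits them later.
    .mode("append")
    .save(table.name());
```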

build.gradle Outdated
apply plugin: 'scala'

sourceSets {
main {
Member Author
@RussellSpitzer, Sep 9, 2021

@rdblue never mind, this IS required if you want to do cross-compilation (have Java code depend on Scala code in the same module).

Otherwise Gradle compiles the Java sources before the Scala sources, so the Java code can't see the Scala classes.

Contributor

Do we need the Java plugin if we are using Scala here? I'm wondering why we were using both in the past. Maybe we only had Scala depending on Java code.

Member Author

Yeah, the other direction is fine. We don't have the Java plugin set for this project, but the Scala plugin does extend the Java one.

if (task.file().specId() == table.spec().specId()) {
return task.file().partition();
} else {
return EmptyStruct.instance();
Member Author

Tasks whose data is not partitioned according to the current spec must be treated as if they were not partitioned. We can probably ease this restriction for partitioning that satisfies the current partitioning (i.e., the table is set to partition by day but this file is partitioned by hour), but this is the simplest approach for now.

@RussellSpitzer
Member Author

@aokolnychyi + @rdblue Now that we've moved distribution and sort utils, we can get this implementation in as well :) Please review when you have a chance


@Override
public <T> void set(int pos, T value) {

Contributor

Should this throw an exception because you're trying to write to a slot that doesn't exist?


@Override
public <T> T get(int pos, Class<T> javaClass) {
return null;
Contributor

Should this throw an exception?
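If so, a minimal sketch of what the two accessors could look like (assuming EmptyStruct is kept at all; a later comment suggests it may be removed):

```java
@Override
public <T> void set(int pos, T value) {
  // An empty struct has no slots, so any positional write is a bug in the caller.
  throw new UnsupportedOperationException("Cannot set a value on an empty struct");
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
  // Likewise, there is nothing to read at any position.
  throw new UnsupportedOperationException("Cannot get a value from an empty struct");
}
```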


private static EmptyStruct instance;

static EmptyStruct instance() {
Contributor

For most singletons, we use public static EmptyStruct get() and do the initialization at class init time to avoid a race condition:

  private static final EmptyStruct INSTANCE = new EmptyStruct();
  static EmptyStruct get() {
    return INSTANCE;
  }

}

protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, long numOutputFiles,
LogicalPlan plan, SQLConf conf) {
Contributor

We'd normally wrap the line to start at the same position as Distribution.
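For illustration, the wrapped signature would look something like this (continuation line aligned with the first parameter):

```java
protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, long numOutputFiles,
                               LogicalPlan plan, SQLConf conf) {
```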

if (task.file().specId() == table.spec().specId()) {
return task.file().partition();
} else {
return EmptyStruct.get();
Contributor

What was the failure? Is our hash or equals function not correct for empty structs? Should this use a StructLikeMap instead?

Member Author

The issue is for files with the old partitioning; if we have another way of making an empty struct here, that's fine. I can't remember why I chose to make a new class here since it was a while ago now.

The core issue is this: say we originally have a table partitioned by Bucket(x, 5), meaning the original data files are written with values of x more or less randomly distributed across them. Then the table's partitioning is changed to something like Bucket(x, 10). In the worst case, when we write we end up having to make 10 files for every partition in the original bucketing. This code says: let's just assume all the files created with the old partitioning are best dealt with at the same time, rather than splitting them up using the old partitioning.

Contributor

Okay, so all data where we're changing the partitioning goes into a separate task. That makes sense to me. Probably worth a comment here to explain it.

You may also want to change how this works slightly and use a StructLikeMap. Otherwise, EmptyStruct looks a lot like structs for the unpartitioned spec. It would or wouldn't be equal depending on the order of the comparison, which probably isn't what you want.

Instead, I would keep a StructLikeMap for the current partitioning scheme and then create a separate group using a List for the rest.

Member Author

I'm not quite sure I understand what you are describing here as a fix.

Member Author

Actually, I think I see now. Should have this in a bit.

Contributor

StructLike makes no guarantees about equals/hashCode behavior, so using it as a map key is like using a CharSequence as a map key. Probably not a good idea because it will break if the underlying implementation changes or differs. I'd recommend using StructLikeMap that handles consistent behavior. But that requires using a specific struct type. Since you really only need the table spec's struct type, you can use that. Then keep any tasks that are not in the current table spec in a list, like this:

StructLikeMap<FileScanTask> filesByPartition = StructLikeMap.create(table.spec().partitionType());
List<FileScanTask> tasksFromOtherSpecs = Lists.newArrayList();
Streams.stream(fileScanTasks).forEach(task -> {
  if (task.file().specId() != table.spec().specId()) {
    tasksFromOtherSpecs.add(task);
  } else {
    filesByPartition.put(task.file().partition(), task);
  }
});

Member Author

The only issue I can think of here is that it doesn't fit well with the rest of the code, which assumes all FileGroups have an associated partition value. I think empty is the right thing to pass since we are basically treating those files as unpartitioned.

Member Author

Trying something like

StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(table.spec().partitionType());
StructLike emptyStruct = GenericRecord.create(table.schema());

fileScanTasks.forEach(task -> {
  /*
  If a task uses an incompatible partition spec the data inside could contain values which
  belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
  grouping them together helps to minimize new files made.
  */
  StructLike taskPartition = task.file().specId() == table.spec().specId() ?
      task.file().partition() : emptyStruct;

  List<FileScanTask> files = filesByPartition.get(taskPartition);
  if (files == null) {
    files = Lists.newArrayList();
  }

  files.add(task);
  filesByPartition.put(taskPartition, files);
});

Contributor

Yeah, you may need to pass an empty struct through the rest of the code. But it is probably a good idea not to use StructLike as a map key.

.sort()
.option(SortStrategy.REWRITE_ALL, "true")
.option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
.option(BinPackStrategy.MIN_INPUT_FILES, "1")
Contributor

Why use BinPackStrategy.MIN_INPUT_FILES with a sort strategy? Should this be moved or renamed?

Contributor

Looks like the next test uses SortStrategy.MIN_INPUT_FILES.

Member Author

I guess this isn't needed now that I added REWRITE_ALL. This can be removed now.

Member Author

I forgot I set REWRITE_ALL to only apply to filtering and not to the group limits. I should make it apply to both.

}

@Override
protected SortStrategy sortStrategy() {
Contributor

This is a little strange to me. It feels more like a createSortStrategy(), but I think it's fine since it isn't exposed.

Member Author

Yeah, this is just copying what we did for binPack ^, but we can change them both if needed.


public class Spark3SortStrategy extends SortStrategy {

public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
Contributor

It isn't clear to me from the name what this does.

Member Author

Yeah, let me work on that naming/documentation. This is from a discussion with @aokolnychyi where we were trying to figure out how to deal with the shuffle size estimation and give users a way to bail out of the automatic sizing. Let me go back to our notes and fix the name here. This is meant to be an advanced-user escape valve.


For our in-house implementation of this compaction strategy, the advanced-user bailout approach used "rows per file", which mostly worked but wasn't stable in the face of extensive schema changes (file size grew as columns were added), though it was largely stable otherwise.

Member Author

I added in a description; we can discuss alternatives with that context. The current difficulty is that at this point in time we know the task file sizes, and our knob for adjusting output size is the number of shuffle partitions we create.
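For reference, a rough sketch of how that knob feeds into the shuffle, following the snippet quoted later in this thread (sizeEstimateMultiple is the option being discussed; inputFileSize() and numOutputFiles() are the existing strategy helpers):

```java
// Scale the known input size by the user-supplied multiple to estimate the output size,
// then derive the number of output files, and use that as the shuffle partition count
// so each shuffle task writes roughly one target-sized file.
long estimatedOutputSize = (long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple);
long numOutputFiles = numOutputFiles(estimatedOutputSize);
long numShufflePartitions = numOutputFiles;
```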


public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";

public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
Contributor

Per input file? Basically the default is to balance the data across the same number of files? Seems like we would prefer to try to output totalInputSize / tableTargetFileSize.
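That is, roughly the following, where totalInputSize and tableTargetFileSize are the names used in this comment rather than actual fields:

```java
// Round up so that input slightly over the target size still gets an extra output file.
long numOutputFiles = (totalInputSize + tableTargetFileSize - 1) / tableTargetFileSize;
```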

Member Author

Removed this since we don't have the output task combiner code in this PR.

Contributor

Should this constant also be removed then?

public RewriteStrategy options(Map<String, String> options) {
sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
SIZE_ESTIMATE_MULTIPLE,
1.0);
Contributor

What about "compression factor" or something similar? That seems like what you mean by this.

1.0);

Preconditions.checkArgument(sizeEstimateMultiple > 0,
"Cannot use Spark3Sort Strategy without %s being positive, found %s",
Contributor

I think this should be simpler. How about "Invalid size estimation factor: %s (not positive)".
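That is:

```java
Preconditions.checkArgument(sizeEstimateMultiple > 0,
    "Invalid size estimation factor: %s (not positive)", sizeEstimateMultiple);
```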

boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
SortOrder[] ordering;
Distribution distribution;
ordering = Spark3Util.convert(sortOrder());
Contributor

Why not set the default in the variable declaration?

Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
Arrays.stream(ordering)).toArray(SortOrder[]::new);
} else {
distribution = Distributions.ordered(ordering);
Contributor

I don't quite understand the logic here. Won't an ordered distribution result in a repartition as well?

Member Author

It's the difference between running a repartition with the partition columns included in the ordering or just using the requested sort columns. We can probably combine the logic here.

Contributor

Why would we not use the partition columns in the sort? It seems to me that we want to always go through the utility methods to build the distribution and sort.

Member Author

I think that sounds good. The only reason I had this here is that I have another PR allowing compaction between different specs for BinPack as well, and it used basically this same branching logic. For sort I think you are correct and we are fine just always including the partition columns ... I don't think this would affect the range partitioner even if all the values are the same.

Contributor

Did you want to make this change, or leave it as is?


// Reset Shuffle Partitions for our sort
long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
Contributor

Why increase the number of tasks? Won't that cause more output files than numOutputFiles to be created?

Member Author

Sorry, this is part of our internal implementation with an additional feature that @aokolnychyi cooked up. We add in a special combiner that allows multiple shuffle tasks to end up writing to the same file; this helped us out when users have an especially large target size but can't handle a shuffle with outputs that large.

Contributor

Ah, something like keeping files open across tasks in your internal version? Do you plan to remove this from open source, or is this something we need to keep so you don't have too many deltas from master?

Member Author

I think we want to bring the combiner into OSS as well, but I believe it requires some additional Catalyst extensions. I'll remove it for now, and @aokolnychyi and I can discuss whether to bring that feature into OSS. I think the only issue is that we will have to add more Spark-version-dependent Scala code.

Contributor

Let's omit it for now then. No sense in adding it if it won't work, right?


protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) {
Seq<SortOrder> scalaOrder = JavaConverters.asScalaBuffer(Lists.newArrayList(ordering));
return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, scalaOrder, plan, conf);
Contributor

Is it possible to use a List or Array for order rather than creating a Seq and passing that? Using Seq in our APIs can cause compatibility problems across Scala versions. It would be better to pass everything as an Array.

Member Author

We are using this as a Seq in our Scala code. I can change the API to always use an array, but I'm not sure that helps us if we have other library code using Seq?

Contributor

We can pass Array into a method that requires a Seq in Scala, right?

Member Author

I meant that our SparkExtensions will still have the Scala version incompatibility. I switched this to array here though just in case.

@RussellSpitzer
Member Author

@rdblue + @adamkennedy Changes applied; if you have time, please take another look.


@Override
public RewriteDataFiles sort(SortOrder sortOrder) {
this.strategy = sortStrategy().sortOrder(sortOrder);
Contributor

Should we add a check that this is only replacing the default binPack strategy? Right now, we would allow calling binPack().sort(SortOrder...build()).execute(), which would be strange.
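A hedged sketch of such a check, assuming the strategy field stays null until one is explicitly chosen and the binPack default is only applied at execute() time (neither of which this PR necessarily does):

```java
@Override
public RewriteDataFiles sort(SortOrder sortOrder) {
  // Fail fast if another strategy (e.g. binPack) was already selected for this action.
  Preconditions.checkArgument(this.strategy == null,
      "Must use only one rewrite strategy, cannot combine sort with %s", this.strategy);
  this.strategy = sortStrategy().sortOrder(sortOrder);
  return this;
}
```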

If a task uses an incompatible partition spec the data inside could contain values which
belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
grouping them together helps to minimize new files made.
*/
Contributor

Style: we typically use // even for multi-line comments.
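For example, the quoted block comment would become:

```java
// If a task uses an incompatible partition spec the data inside could contain values which
// belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
// grouping them together helps to minimize new files made.
```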

}
}

static class EmptyStruct implements StructLike {
Contributor

Looks like this is no longer needed?

if (requiresRepartition) {
distribution = Spark3Util.buildRequiredDistribution(table);
ordering = Stream.concat(
Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
Contributor
@rdblue, Oct 18, 2021

This is going to use the table order if it is RANGE partitioned and has a table ordering. That will override the given ordering from the action config.

I think you should do something similar to Spark3Util.buildRequiredDistribution but pass around the action's sortOrder() rather than using the table's sort order. Also, the mode would always be range, so you can simplify it to just this:

ordering = Spark3Util.convert(SortOrderUtil.buildSortOrder(table.schema(), table.spec(), sortOrder()));
distribution = Distributions.ordered(ordering);

Member Author

Good call, refactored this

Implements Spark3 sort-based compaction. Uses similar logic to the
Spark3BinPack strategy but, instead of doing a direct read then write,
issues a read, sort, and then write.
* Fix sort strategy when a custom sort order is provided but the table requires repartitioning
java.srcDirs = []
}
}

Contributor

Why is this needed now? I thought that everything built just fine.

Member Author

It only builds fine if you aren't accessing Scala code from Java code. It works fine in the other direction, see
#2829 (comment)

Contributor

Okay, so this was working fine because we hadn't accessed Scala from Java yet. Thanks!

*/
public static final String COMPRESSION_FACTOR = "compression-factor";

public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
Contributor

Looks like this isn't used. Can we remove it?

Member Author

Yep, will remove it.

Contributor
@rdblue left a comment

Looks good overall, but I'm curious about the build change and the unused config property.

@RussellSpitzer RussellSpitzer merged commit 61fe9b1 into apache:master Oct 18, 2021
@RussellSpitzer RussellSpitzer deleted the SparkSortCompaction branch October 18, 2021 22:03
@RussellSpitzer
Member Author

Thanks @rdblue for the review!

chenjunjiedada pushed a commit to chenjunjiedada/incubator-iceberg that referenced this pull request Oct 20, 2021
Merge remote-tracking branch 'upstream/merge-master-20210816' into master
## What does this MR address?

Merges upstream/master to bring in recent bug fixes and optimizations.

## What does this MR change?

Key PRs of interest:
> Predicate pushdown support, https://github.com/apache/iceberg/pull/2358, https://github.com/apache/iceberg/pull/2926, https://github.com/apache/iceberg/pull/2777/files
> Spark: writing an empty dataset no longer errors; it is simply skipped, apache#2960
> Flink UI: add uidPrefix to operators to make it easier to track multiple Iceberg sink jobs, apache#288
> Spark: fix nested struct pruning, apache#2877
> Allow creating v2-format tables via table properties, apache#2887
> Add the SortRewriteStrategy framework to incrementally support different rewrite strategies, apache#2609 (WIP: apache#2829)
> Spark: support configuring Hadoop properties for a catalog, apache#2792
> Spark: read/write support for timestamps without timezone, apache#2757
> Spark MicroBatch: support the skip delete snapshots property, apache#2752
> Spark V2 RewriteDatafilesAction support
> Core: Add validation for row-level deletes with rewrites, apache#2865
> Schema time travel support: add schema-id; Core: add schema id to snapshot
> Spark Extensions: support identifier fields operations, apache#2560
> Parquet: Update to 1.12.0, apache#2441
> Hive: Vectorized ORC reads for Hive, apache#2613
> Spark: Add an action to remove all referenced files, apache#2415

## How was this MR tested?

UT