Contributor

@aokolnychyi aokolnychyi commented Aug 9, 2023

This PR optimizes lookup performance in DeleteFileIndex when no useful bounds are present.

After this change:

Benchmark                                                                           Mode  Cnt           Score            Error   Units
DeleteFileIndexBenchmark.buildIndexAndLookup                                          ss   10           0.777 ±          0.761    s/op
DeleteFileIndexBenchmark.buildIndexAndLookup:·gc.alloc.rate                           ss   10        1147.705 ±        343.684  MB/sec
DeleteFileIndexBenchmark.buildIndexAndLookup:·gc.alloc.rate.norm                      ss   10  1453142312.800 ±     996440.801    B/op
DeleteFileIndexBenchmark.buildIndexAndLookup:·gc.count                                ss   10           6.000                   counts

Prior to this change:

Benchmark                                                                      Mode  Cnt           Score            Error   Units
DeleteFileIndexBenchmark.buildIndexAndLookup                                     ss   10           4.833 ±          0.020    s/op
DeleteFileIndexBenchmark.buildIndexAndLookup:·gc.alloc.rate                      ss   10         715.000 ±          9.019  MB/sec
DeleteFileIndexBenchmark.buildIndexAndLookup:·gc.alloc.rate.norm                 ss   10  4014130976.000 ±     997237.313    B/op
DeleteFileIndexBenchmark.buildIndexAndLookup:·gc.count                           ss   10          12.000                   counts

@aokolnychyi aokolnychyi force-pushed the disable-column-stats-filtering-in-deletes branch from ff35899 to 1a8d720 Compare August 9, 2023 23:09
this.files = files;
}

public DeleteFile[] filter(long seq) {
Contributor Author

@aokolnychyi aokolnychyi Aug 9, 2023

There is no need to use the Stream API (which has overhead) if column stats filtering is disabled. Take a look at the flamegraph below that uses streams without column stats filtering.

[Image: flamegraph of lookups using streams without column stats filtering]

Contributor

Could the stream also be costly with useColumnStatsFiltering? Should we prefer a for-loop there too? It seems we use streams heavily.

Contributor Author

@aokolnychyi aokolnychyi Aug 24, 2023

I think the Stream API makes the implementation easier to read in case we need to iterate through the elements and check if each of them matches. We would need to try this separately and see how much more complicated it would be and what kind of benefits we get.
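To illustrate the point about skipping stream overhead when only the sequence number matters, here is a minimal sketch. It assumes a group of delete files kept sorted by sequence number; the class `SeqFilterSketch`, its fields, and the "at or above the given sequence number" rule are hypothetical simplifications, not the actual DeleteFileIndex code.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a group of delete files sorted by sequence number. */
final class SeqFilterSketch {
  private final long[] seqs;     // ascending sequence numbers
  private final String[] files;  // delete file names, in the same order as seqs

  SeqFilterSketch(long[] seqs, String[] files) {
    this.seqs = seqs;
    this.files = files;
  }

  /** Returns the files whose sequence number is >= seq, with no stream pipeline. */
  String[] filter(long seq) {
    int start = findStartIndex(seq);
    return Arrays.copyOfRange(files, start, files.length);
  }

  // Explicit lower-bound binary search: Arrays.binarySearch does not specify
  // which index is returned when the array contains duplicates.
  private int findStartIndex(long seq) {
    int lo = 0;
    int hi = seqs.length;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (seqs[mid] < seq) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  }

  public static void main(String[] args) {
    SeqFilterSketch group =
        new SeqFilterSketch(new long[] {1, 3, 3, 7}, new String[] {"d1", "d3a", "d3b", "d7"});
    System.out.println(Arrays.toString(group.filter(3))); // prints [d3a, d3b, d7]
  }
}
```

The lookup is a binary search plus one array copy, so the per-call cost does not include building a stream pipeline or evaluating a predicate per element.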

.toArray(DeleteFile[]::new);
}

private DeleteFile[] limitWithoutColumnStatsFiltering(
Contributor Author

See below why a separate method is added.

Member

Nice find, maybe worth a comment

Contributor Author

Added.

public static final int ORC_BATCH_SIZE_DEFAULT = 5000;

public static final String DELETE_COLUMN_STATS_FILTERING_ENABLED =
"read.delete.column-stats-filtering.enabled";
Contributor Author

I am still thinking about the best name. Ideas are welcome.

Contributor

I don't think that we should have an option to do this.

What we talked about was only applying stats filtering if the min and max file are the same, and otherwise disabling it. Why have an option when we know what the setting should be?

Contributor Author

@aokolnychyi aokolnychyi Aug 11, 2023

I reconsidered that a bit after spending more time profiling.

  • We want to get rid of any extra overhead (like Stream API) and transforms if stats filtering is disabled. The deletes are being looked up in tight loops and MoR use cases are frequently needed for larger tables where CoW is just too expensive. That means we need to look up deletes for a large number of data files. Any extra work matters. All of this overhead is present in flamegraphs. This also becomes even more critical when data files are loaded in memory (distributed planning).
  • We want to skip even loading delete stats if they will not be used.
  • We may reconsider how our paths are generated and sometimes include a temporal part to facilitate filtering. That won't always be possible, but we can't assume the current way of generating file names is the only way to do that.
  • We want a way to disable column stats filtering for some equality delete use cases where filtering is not beneficial. We don't know how useful stats for equality deletes are. Even if they are different, they may be selective. So we can't skip indexing equality delete stats if they are different since they may be still useless.

Contributor Author

What are your thoughts, @rdblue?

Contributor Author

We can keep track of whether we found any position delete files with the same min/max boundaries while building the index, but the other points would still apply.

Contributor Author

Another use case where it is beneficial to know whether delete stats filtering is on upfront is distributed planning. If stats are on, I need to use ParallelIterable. If not, there would be substantial overhead with that.

Contributor

I think I wasn't very clear about what I was trying to say. I'll reply to some of the specific points:

> We want a way to disable column stats filtering for some equality delete use cases where filtering is not beneficial.

I completely agree that we should disable the filtering. What I'm saying is that we should completely disable it and not provide the option to turn it on or off. We don't have an expectation that it is helpful, except in the case where the min and max are identical.

I also think it would be good to detect the case where the path's min == max and put those positional delete files in a map. I think that is worth it because we have two cases for a positional delete file:

  1. There are deletes for multiple data files and stats aren't going to help
  2. There are deletes for just one file and stats tell us that

Those are very different cases because of the number of delete files. If you have a strategy to rewrite position deletes into one file per data file, then you'd have a lot of very narrowly targeted delete files. Using a map keyed by the delete path for this case makes a lot of sense because of the number of data files that are removed from the current delete index entirely.

> We want to skip even loading delete stats if they will not be used

I agree in principle, but I don't think this is a huge cost. The data has to be read into memory anyway, we avoid converting to String, and it all runs in parallel.

The biggest cost by far is comparison --- in part because of the expense of comparing large strings, but more related to the number of comparisons. We have to compare each data file against each delete file with a newer sequence number. That amounts to O(m*n) comparisons if we have m delete files and n data files. Reducing m by separating out min == max delete files and then not running the stats comparison for the remaining position deletes seems like a really good start.

I could be convinced that we need to avoid even loading the maps, but I'd like to be convinced by benchmark numbers. Plus, we still need to read the upper and lower bounds for equality deletes. Not loading stats complicates the projection.

Another idea is to add the ability to load data for a subset of columns by ID.

> We may reconsider how our paths are being generated and sometimes include a temporal part to facilitate filtering

Wouldn't we just be back to loading stats then? If I understand correctly, this would make the stats check more valuable, but it's more expensive than we want to even run it right now.

> We want to get rid of any extra overhead (like Stream API) and transforms if stats filtering is disabled

I agree. Don't we often do this by not adding filters or transforms if they aren't needed?
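The idea from this thread of separating position delete files whose path bounds satisfy min == max can be sketched roughly as follows. This is an illustrative sketch only: `PathIndexSketch` and `PosDelete` are hypothetical names, and it assumes the path bounds are stored untruncated, so that equal bounds really do identify a single data file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical index that separates narrowly targeted position deletes from the rest. */
final class PathIndexSketch {

  /** Hypothetical position delete file with lower/upper bounds on the referenced data file path. */
  static final class PosDelete {
    final String name;
    final String lowerPath;
    final String upperPath;

    PosDelete(String name, String lowerPath, String upperPath) {
      this.name = name;
      this.lowerPath = lowerPath;
      this.upperPath = upperPath;
    }
  }

  // Deletes whose bounds are equal reference exactly one data file (assumption: no truncation).
  private final Map<String, List<PosDelete>> byDataFile = new HashMap<>();
  // Deletes that cover several data files, or carry no bounds, stay in a plain list.
  private final List<PosDelete> untargeted = new ArrayList<>();

  void add(PosDelete delete) {
    if (delete.lowerPath != null && delete.lowerPath.equals(delete.upperPath)) {
      byDataFile.computeIfAbsent(delete.lowerPath, key -> new ArrayList<>()).add(delete);
    } else {
      untargeted.add(delete);
    }
  }

  /** One hash lookup for targeted deletes; no per-file bound comparison for the rest. */
  List<PosDelete> forDataFile(String dataFilePath) {
    List<PosDelete> result = new ArrayList<>(untargeted);
    result.addAll(byDataFile.getOrDefault(dataFilePath, List.of()));
    return result;
  }

  public static void main(String[] args) {
    PathIndexSketch index = new PathIndexSketch();
    index.add(new PosDelete("del-a", "data/f1.parquet", "data/f1.parquet"));
    index.add(new PosDelete("del-b", "data/f2.parquet", "data/f2.parquet"));
    index.add(new PosDelete("del-c", "data/f1.parquet", "data/f9.parquet"));
    System.out.println(index.forDataFile("data/f1.parquet").size()); // prints 2
  }
}
```

With m delete files and n data files, the targeted deletes no longer take part in the O(m*n) comparisons; only the untargeted list is still attached to every data file in the partition.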

@aokolnychyi aokolnychyi force-pushed the disable-column-stats-filtering-in-deletes branch from 1a8d720 to a2a5ee5 Compare August 9, 2023 23:28
Contributor Author

aokolnychyi commented Aug 9, 2023

private DeleteFile[] limitWithoutColumnStatsFiltering(
long sequenceNumber, DeleteFileGroup partitionDeletes) {

if (partitionDeletes == null) {
Contributor Author

@aokolnychyi aokolnychyi Aug 9, 2023

This is only invoked if global or partition deletes are not null. If both are null, we return earlier.

@aokolnychyi aokolnychyi force-pushed the disable-column-stats-filtering-in-deletes branch from a2a5ee5 to 69516a5 Compare August 9, 2023 23:41
@ConeyLiu
Contributor

@aokolnychyi Did you find out why performance degrades when filtering with column stats?

@ConeyLiu
Contributor

Previously, I found by profiling that Avro decoding is CPU-bound. I guess the cost may come from that.

@aokolnychyi
Contributor Author

@ConeyLiu, the cost comes from decoding bounds, conversion and the actual comparison. For instance, we compare file paths that share a common prefix for position deletes.

@ConeyLiu
Contributor

> For instance, we compare file paths that share a common prefix for position deletes.

We noticed this as well. We have increased the default truncate length of column metrics.

@ConeyLiu
Contributor

We also noticed another performance issue that may be worth mentioning: manifest file decompression. Right now, we cannot change the compression codec for manifest files or manifest lists. The default is GZIP. We have implemented support for changing it via write.avro.compression-codec. I could submit a PR for it if that is acceptable.

@aokolnychyi
Contributor Author

@ConeyLiu, I wonder whether you can give this PR a try on some of your datasets to see whether the planning performance will improve and whether there would be more matches after the planning.

Contributor

ConeyLiu commented Aug 10, 2023

No problem, let me test the PR.

Contributor

ConeyLiu commented Aug 10, 2023

@aokolnychyi here are the results.

// enabled
totalPlanningDuration = TimerResult {
  timeUnit = NANOSECONDS,
  totalDuration = PT2M28.834846604S,
  count = 1
},
resultDataFiles = CounterResult {
  unit = COUNT,
  value = 676831
},
resultDeleteFiles = CounterResult {
  unit = COUNT,
  value = 3581513
},
indexedDeleteFiles = CounterResult {
  unit = COUNT,
  value = 684457
},
equalityDeleteFiles = CounterResult {
  unit = COUNT,
  value = 684457
},
positionalDeleteFiles = CounterResult {
  unit = COUNT,
  value = 0
}
// disabled
totalPlanningDuration = TimerResult {
  timeUnit = NANOSECONDS,
  totalDuration = PT1M18.308298092S,
  count = 1
},
resultDataFiles = CounterResult {
  unit = COUNT,
  value = 676831
},
resultDeleteFiles = CounterResult {
  unit = COUNT,
  value = 361118360
},
indexedDeleteFiles = CounterResult {
  unit = COUNT,
  value = 684457
},
equalityDeleteFiles = CounterResult {
  unit = COUNT,
  value = 684457
},
positionalDeleteFiles = CounterResult {
  unit = COUNT,
  value = 0
}
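To put the two runs side by side, the reported counters can be reduced to a few ratios. The sketch below only does arithmetic on the numbers posted above; the class and method names are hypothetical. It works out to roughly 5 delete files per data file with filtering enabled versus roughly 530 with it disabled (about a 100x increase), while planning time drops by a bit less than 2x.

```java
import java.time.Duration;
import java.util.Locale;

/** Arithmetic on the planning metrics reported in this comment. */
final class PlanningResultsSketch {

  /** Average number of delete files attached to each data file. */
  static double perDataFile(long resultDeleteFiles, long resultDataFiles) {
    return (double) resultDeleteFiles / resultDataFiles;
  }

  /** Converts an ISO-8601 duration such as PT2M28.834846604S to seconds. */
  static double seconds(String isoDuration) {
    return Duration.parse(isoDuration).toNanos() / 1e9;
  }

  public static void main(String[] args) {
    long dataFiles = 676831L;
    double enabled = perDataFile(3581513L, dataFiles);
    double disabled = perDataFile(361118360L, dataFiles);
    double speedup = seconds("PT2M28.834846604S") / seconds("PT1M18.308298092S");

    System.out.println(String.format(Locale.ROOT, "deletes per data file (enabled):  %.1f", enabled));
    System.out.println(String.format(Locale.ROOT, "deletes per data file (disabled): %.1f", disabled));
    System.out.println(String.format(Locale.ROOT, "increase in matched deletes:      %.0fx", disabled / enabled));
    System.out.println(String.format(Locale.ROOT, "planning speedup when disabled:   %.2fx", speedup));
  }
}
```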

Contributor Author

aokolnychyi commented Aug 10, 2023

Thanks a lot for testing this out, @ConeyLiu! Disabling column stats filtering would clearly be a bad idea in that use case, even though the planning speed improved. I assume that table was partitioned, right?

> We also noticed another performance issue that may be worth mentioning: manifest file decompression. Right now, we cannot change the compression codec for manifest files or manifest lists. The default is GZIP. We have implemented support for changing it via write.avro.compression-codec. I could submit a PR for it if that is acceptable.

Yeah, we use DeflateCodec by default. It makes sense to have that configurable. I'll need to think a bit about whether manifest writers should pick up the Avro config for writing data or have their own. Do you want to create a PR, @ConeyLiu?

Contributor

ConeyLiu commented Aug 11, 2023

> I assume that table was partitioned, right?

~~Yeah, it is partitioned by (year(ts), other_column)~~
Yeah, it is partitioned by (day(ts), other_column)

> Yeah, we use DeflateCodec by default. It makes sense to have that configurable. I'll need to think a bit about whether manifest writers should pick up the Avro config for writing data or have their own. Do you want to create a PR, @ConeyLiu?

I have submitted #8284, and @nastra just pointed out that there is a similar PR, #6799.

Contributor

rdblue commented Aug 11, 2023

@ConeyLiu, is that table's delete key aligned with that partitioning? Meaning are you deleting by values in other_column?

Contributor

ConeyLiu commented Aug 12, 2023

@rdblue Actually, the table is from a customer. I checked with them and corrected the table partitioning: it is partitioned by (day(ts), a_string_column). The data is upserted into Iceberg with a Flink job. The primary key is (a_string_column, the_partition_string_column, the_ts_column).

Contributor

rdblue commented Aug 13, 2023

@ConeyLiu, are you using a keyBy to distribute the data by partition or could any task produce output for any partition?

@ConeyLiu
Contributor

@rdblue, it uses keyBy on the partition. The distribution mode is set to hash.

return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));

@aokolnychyi
Contributor Author

As #8360 is in, I will rebase this PR to use the optimized lookup only if there are no usable bounds.

@aokolnychyi aokolnychyi force-pushed the disable-column-stats-filtering-in-deletes branch from 69516a5 to 0d3b06e Compare August 22, 2023 16:45
@aokolnychyi
Contributor Author

I've updated this PR to use a faster lookup when no useful stats are present, which should be common in Spark after #8360. I kept equality delete logic as before. We may consider a table property in the future if we want to disable this for equality deletes.

@aokolnychyi aokolnychyi changed the title Core: Ability to disable column stats filtering in DeleteFileIndex Core: Optimize lookup in DeleteFileIndex without useful bounds Aug 22, 2023
@aokolnychyi
Contributor Author

I've also updated the PR description with new benchmark results.

Contributor

@ConeyLiu ConeyLiu left a comment

+1 Just one question

if (globalDeletes == null && partitionDeletes == null) {
return NO_DELETES;
} else if (useColumnStatsFiltering) {
return limitWithColumnStatsFiltering(sequenceNumber, file, partitionDeletes);
Member

How about filterWithColumnStats, filterWithoutColumnStats ?

Slightly shorter. Also, I'm not sure whether the word 'limit' has a significance distinct from 'filter' that is worth calling out?

Contributor Author

I kind of wanted to indicate that we both limit by sequence number as well as filter using column stats.

IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file);
deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile);

if (!useColumnStatsFiltering) {
Member

useColumnStatsFiltering |= indexedFile.hasLowerAndUpperBounds() also works, but this is optional as it's just a style preference.

Contributor Author

I am avoiding an extra call to hasLowerAndUpperBounds if we have already seen a file with stats. This allows us to avoid loading and converting the underlying boundaries if present. It is not guaranteed we would have to check those boundaries as the delete file could be filtered using sequence numbers.

Member

@szehon-ho szehon-ho Aug 23, 2023

Yeah, my thought was that it translates to:

useColumnStatsFiltering = useColumnStatsFiltering || indexedFile.hasLowerAndUpperBounds(), which shouldn't evaluate the second operand in theory, if I understand correctly.

So I was thinking the JVM would do the optimization for us, but it's not a big deal either way.

Contributor Author

The order in which the operands are evaluated is not guaranteed. In a lot of cases, the JVM would start evaluating both at the same time. I've seen this while profiling.
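As a side note on the language-level semantics touched on in this thread (JIT behavior aside): in Java, `flag |= expr` is the compound form of the non-short-circuit `|` operator, so `expr` is always evaluated, whereas `||` and an explicit `if (!flag)` guard skip it once the flag is set. The sketch below counts calls to a hypothetical `expensiveCheck()`, which stands in for something like `hasLowerAndUpperBounds()`; none of these names come from the PR.

```java
/** Counts how often the right-hand side runs for |=, ||, and an explicit guard. */
final class OrAssignSketch {
  static int calls = 0;

  // Hypothetical stand-in for a costly check such as loading and converting bounds.
  static boolean expensiveCheck() {
    calls++;
    return true;
  }

  static int callsWithOrAssign() {
    calls = 0;
    boolean flag = true;
    flag |= expensiveCheck(); // same as flag = flag | expensiveCheck(); the call always runs
    return calls;
  }

  static int callsWithShortCircuit() {
    calls = 0;
    boolean flag = true;
    flag = flag || expensiveCheck(); // right operand skipped because flag is already true
    return calls;
  }

  static int callsWithGuard() {
    calls = 0;
    boolean flag = true;
    if (!flag) {
      flag = expensiveCheck(); // never reached while flag is true
    }
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(callsWithOrAssign());     // prints 1
    System.out.println(callsWithShortCircuit()); // prints 0
    System.out.println(callsWithGuard());        // prints 0
  }
}
```

The explicit guard used in the PR therefore avoids the extra call at the language level, independent of what the JIT compiler later does with the code.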

@aokolnychyi aokolnychyi merged commit d61159e into apache:master Aug 24, 2023
@aokolnychyi
Contributor Author

@rdblue, I've merged this to consume it in distributed planning, but I'd appreciate another look. It follows the idea we discussed on the thread above: there is no option to turn the stats filtering on or off, we check if bounds are present. Position deletes that cover multiple data files would not have bounds anymore. I think not storing them is even better than checking at planning time if they are equal.

@aokolnychyi
Contributor Author

Thanks for reviewing, @ConeyLiu @szehon-ho @jerqi @rdblue!
