
Conversation

@wypoon (Contributor) commented Sep 10, 2022

We enable setting the threshold for using a streaming delete filter to a low number (2), in order to exercise the streaming filter code path when counting the number of positional deletes applied. This is a follow-up to #4588.

@wypoon (Contributor, Author) commented Sep 12, 2022

@flyrain @RussellSpitzer this is a follow-up to #4588. In that change, there is a code path that is not tested: counting positional deletes when using a streaming delete filter. I manually tested that code path by temporarily changing the threshold for using a streaming filter in DeleteFilter from 100,000 to 2 and running TestSparkReaderDeletes that way. With this change, we make the threshold configurable so we can set it for testing. I had actually introduced this change in the original PR at some point, but Russell asked me to separate it out because that PR was already quite complex.

The logic behind this change is as follows:
We add a streamDeleteFilterThreshold field to SparkScan.ReadTask. The planInputPartitions methods of both SparkBatch and SparkMicroBatchStream construct SparkScan.ReadTasks, and since both classes take a SparkReadConf, they can read the threshold from it and pass it in when constructing each SparkScan.ReadTask. SparkScan.RowReader and SparkScan.BatchReader both take a SparkScan.ReadTask in their constructors, so they can get the threshold from the SparkScan.ReadTask and pass it up their constructor chains to their respective superclasses, RowDataReader and BatchDataReader. In their open(FileScanTask) methods, those superclasses construct a BaseReader.SparkDeleteFilter, which is where the threshold value is finally passed in.
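For illustration, here is a minimal, self-contained sketch of that plumbing. The class names mirror the description above, but the bodies are stubs, not the actual Iceberg code:

    // Illustrative stubs only; the real classes live in Iceberg's Spark module.
    public class ThresholdPlumbingSketch {

      static class SparkReadConf {
        long streamDeleteFilterThreshold() {
          return 2L; // e.g. set low in tests to force the streaming path
        }
      }

      static class ReadTask {
        private final long streamDeleteFilterThreshold;

        ReadTask(SparkReadConf readConf) {
          // SparkBatch/SparkMicroBatchStream hold a SparkReadConf, so they can
          // read the threshold and pass it in when constructing each ReadTask
          this.streamDeleteFilterThreshold = readConf.streamDeleteFilterThreshold();
        }

        long streamDeleteFilterThreshold() {
          return streamDeleteFilterThreshold;
        }
      }

      static class RowDataReader {
        private final long streamFilterThreshold;

        RowDataReader(ReadTask task) {
          // RowReader/BatchReader pass the threshold up their constructor chain
          this.streamFilterThreshold = task.streamDeleteFilterThreshold();
        }

        void open() {
          // in open(FileScanTask), the threshold finally reaches the delete filter
          System.out.println("new SparkDeleteFilter(..., " + streamFilterThreshold + ")");
        }
      }

      public static void main(String[] args) {
        new RowDataReader(new ReadTask(new SparkReadConf())).open();
      }
    }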

Comment on lines 255 to 261
    -      super(filePath, deletes, table.schema(), expectedSchema, counter);
    +      super(
    +          filePath,
    +          deletes,
    +          table.schema(),
    +          expectedSchema,
    +          DeleteFilter.DEFAULT_STREAM_FILTER_THRESHOLD,
    +          counter);
@wypoon (Contributor, Author):

This is the only change I make to spark/v3.2 here; it is needed because the superclass, DeleteFilter, now takes the additional parameter.
Once this PR is merged, I can port the spark/v3.3 changes to v3.2.

@flyrain (Contributor) commented Sep 20, 2022

Hi @wypoon, thanks for the PR. I don't see a strong reason to expose the threshold to users; it's better to hide it from them. Here are my reasons:

  1. It is an internal threshold that users don't have to understand, and probably don't want to.
  2. We can potentially remove it in the future. We discussed that in Core: Replace Set with Bitmap to make delete filtering simpler and faster #3535 (comment); it was not feasible at the time, though.
  3. We can adjust the value according to the internal implementation. For example, we can increase the threshold when we use a more efficient data structure to store pos delete rows.

What do you think?

@wypoon (Contributor, Author) commented Sep 21, 2022

> Hi @wypoon, thanks for the PR. I don't see a strong reason to expose the threshold to users; it's better to hide it from them. Here are my reasons:
>
>   1. It is an internal threshold that users don't have to understand, and probably don't want to.
>   2. We can potentially remove it in the future. We discussed that in Core: Replace Set with Bitmap to make delete filtering simpler and faster #3535 (comment); it was not feasible at the time, though.
>   3. We can adjust the value according to the internal implementation. For example, we can increase the threshold when we use a more efficient data structure to store pos delete rows.
>
> What do you think?

I don't have a strong opinion on whether to expose this threshold to the user. We do expose various optimizations to users, with sensible defaults, so users who are not interested or have no need to tune them are not affected. So even though this particular setting may not be of interest to most users, I don't see much harm in exposing it. My main interest, though, is in allowing a way to set this threshold easily for testing the code path I mentioned. If you have a good suggestion for another way to set the threshold, I'm happy to consider it.

A hacky way would be to allow the threshold to be set in DeleteFilter by a system property, and to set and unset the property in the test.
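A minimal sketch of that idea, in two fragments, one for DeleteFilter and one for the test. The property name and the test method are hypothetical; only DEFAULT_STREAM_FILTER_THRESHOLD is an existing constant:

    // In DeleteFilter: fall back to the default when the property is not set.
    // The property name below is made up for illustration.
    private static long streamFilterThreshold() {
      return Long.getLong(
          "iceberg.stream-delete-filter-threshold", DEFAULT_STREAM_FILTER_THRESHOLD);
    }

    // In the test (JUnit's @Test): set the property before the scan and
    // always restore it afterwards so other tests are unaffected.
    @Test
    public void testStreamingDeleteCount() {
      System.setProperty("iceberg.stream-delete-filter-threshold", "2");
      try {
        // run the read that applies positional deletes and assert the metric
      } finally {
        System.clearProperty("iceberg.stream-delete-filter-threshold");
      }
    }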

@flyrain (Contributor) commented Sep 22, 2022

@wypoon, we can put the new test case in a class like TestPositionFilter; that way it is easier to modify the threshold, and we don't have to touch anything related to Spark.

@wypoon (Contributor, Author) commented Sep 24, 2022

> @wypoon, we can put the new test case in a class like TestPositionFilter; that way it is easier to modify the threshold, and we don't have to touch anything related to Spark.

@flyrain I don't think TestPositionFilter is the right place for testing the code path that is not exercised by existing tests. What I want to test is that the custom metric I added in #4588 produces the correct count of deletes applied when the streaming delete filter is used. E.g., I want to check that TestSparkReaderDeletes#testMixedPosAndEqDeletesWithDeletedColumn passes when the streaming delete filter is used.
I think the best way is still to run TestSparkReaderDeletes, but with the threshold for using the streaming delete filter set to a low number from the test.
I'll explore the system property approach, unless you have another suggestion.

This enables us to set the threshold to a low number (2), to exercise
the streaming filter code path when counting the number of positional
deletes applied.
@wypoon wypoon force-pushed the num_deletes_followup_33 branch from 21e1280 to 41490b3 on September 24, 2022 20:07
@wypoon wypoon changed the title from "Spark: Add read conf for setting threshold to use streaming delete filter" to "Spark: Test custom metric for number of deletes applied, in code path that uses streaming delete filter" on Sep 24, 2022
@wypoon (Contributor, Author) commented Sep 24, 2022

I enabled setting the threshold via a system property, and updated the PR title and description accordingly.

@wypoon (Contributor, Author) commented Sep 24, 2022

The change in this PR is now quite small. I can add the spark/v3.2 change here as well, or in a separate follow-up PR if preferred.

        new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);

    -  private final long setFilterThreshold;
    +  private final long streamFilterThreshold;
@wypoon (Contributor, Author) commented Sep 24, 2022

I think calling this the streamFilterThreshold is more appropriate, since it is the threshold at which we use the streaming delete filter.

@flyrain (Contributor) commented Sep 27, 2022

Hi @wypoon, sorry, I may not have been clear in my last comment. Let me explain a bit more.

  1. Whether or not the pos deletes are streamed, most of the logic has already been tested by the cases in TestSparkReaderDeletes, including the handling of a mix of pos deletes and eq deletes.
  2. The only thing we forgot to test is whether, in the case of streaming pos deletes, the count logic is correct, mainly what happens in the class PositionStreamDeleteFilter. TestPositionFilter would be the right place for such a unit test (see the sketch after this list).
  3. BTW, I'm OK with the system property approach if there is no other way, but it's a bit hacky, and in this case I don't think it is necessary.
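For illustration, such a unit test might look roughly like the following. The Deletes.streamingFilter and DeleteCounter signatures are inferred from the call site quoted in the next comment and should be treated as assumptions:

    import java.util.Arrays;
    import org.apache.iceberg.deletes.DeleteCounter;
    import org.apache.iceberg.deletes.Deletes;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.junit.Assert;
    import org.junit.Test;

    public class TestStreamingDeleteCount {
      @Test
      public void testStreamingFilterCountsDeletes() {
        // five rows at positions 0..4; positions 1 and 3 are deleted
        CloseableIterable<Long> rows =
            CloseableIterable.withNoopClose(Arrays.asList(0L, 1L, 2L, 3L, 4L));
        CloseableIterable<Long> deletedPositions =
            CloseableIterable.withNoopClose(Arrays.asList(1L, 3L));
        DeleteCounter counter = new DeleteCounter();

        CloseableIterable<Long> remaining =
            Deletes.streamingFilter(rows, pos -> pos, deletedPositions, counter);

        // the surviving rows and the count of applied deletes must agree
        Assert.assertEquals(Lists.newArrayList(0L, 2L, 4L), Lists.newArrayList(remaining));
        Assert.assertEquals(2L, counter.get());
      }
    }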

@wypoon (Contributor, Author) commented Sep 27, 2022

The code paths that go through the streaming filters -- Deletes.streamingMarker and Deletes.streamingFilter --

    return hasIsDeletedColumn
        ? Deletes.streamingMarker(
            records, this::pos, Deletes.deletePositions(filePath, deletes), this::markRowDeleted)
        : Deletes.streamingFilter(
            records, this::pos, Deletes.deletePositions(filePath, deletes), counter);

are called in DeleteFilter#applyPosDeletes, which is called by DeleteFilter#filter.
In the two streaming filters, there is a counter with state.
this::markRowDeleted ends up being BaseReader.SparkDeleteFilter#markRowDeleted, which uses the counter in the reader. In the Deletes.streamingFilter case, the counter is likewise the reader's counter, passed into the constructor of the DeleteFilter.
In addition, BaseReader.SparkDeleteFilter#markRowDeleted has to account for additional state:

      if (!row.getBoolean(columnIsDeletedPosition())) {
        row.setBoolean(columnIsDeletedPosition(), true);
        counter().increment();
      }

To avoid double-counting, it checks whether the row has already been marked deleted. This comes into play in Deletes.markDeleted, which is called by (among others) DeleteFilter#applyEqDeletes(CloseableIterable<T>) via DeleteFilter#createDeleteIterable. In that case, applyEqDeletes is applied after applyPosDeletes, but the point is that there is a lot of interaction between different pieces of stateful code. Unless we are very clear that we know exactly what to test for, it is not obvious that just testing a call to Deletes.streamingMarker or Deletes.streamingFilter has exercised the logic correctly.
That is why I think testing at the level of TestSparkReaderDeletes makes it obvious what we've exercised.

@wypoon (Contributor, Author) commented Sep 27, 2022

Even supposing we determined, through careful tracing of the code paths, that it is sufficient to test calling Deletes.streamingMarker and Deletes.streamingFilter, it is still critical to call Deletes.streamingMarker with BaseReader.SparkDeleteFilter#markRowDeleted, which is not accessible directly. We could use a function defined purely for testing that is implemented similarly, but that depends on direct knowledge of implementation details that could change in the future. Hence I do not really favor this approach.
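Concretely, such a test-only stand-in might look like the sketch below. Everything here is hypothetical: it duplicates the guard quoted above against a plain boolean array, and would silently drift if the real implementation changed, which is exactly the fragility described.

    import java.util.function.IntConsumer;

    // Hypothetical test-only analog of BaseReader.SparkDeleteFilter#markRowDeleted,
    // tracking "is deleted" state in a boolean array instead of Spark rows.
    class MarkRowDeletedStandIn {
      private final boolean[] deleted;
      private long count = 0;

      MarkRowDeletedStandIn(int rowCount) {
        this.deleted = new boolean[rowCount];
      }

      IntConsumer asConsumer() {
        return pos -> {
          if (!deleted[pos]) { // same double-count guard as the quoted code
            deleted[pos] = true;
            count++;           // each row is counted at most once
          }
        };
      }

      long count() {
        return count;
      }
    }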

@github-actions (bot):

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 19, 2024
@github-actions (bot):

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 27, 2024