Spark: Add custom metric for number of deletes applied by a SparkScan #4588
Conversation
    package org.apache.iceberg.deletes;

    public class DeleteCounter {
This class is not strictly necessary. I could simply use an AtomicLong in its place. (I do not think, however, that the counter needs to be thread-safe, as each task will have its own counter.)
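For illustration, a minimal sketch of what such a counter could look like (hypothetical; it assumes a plain long field is sufficient because each task holds its own instance):

```java
package org.apache.iceberg.deletes;

// Minimal sketch of a per-task delete counter. A plain long is assumed to be
// enough because each Spark task would hold its own instance; an AtomicLong
// could be swapped in if cross-thread updates ever become possible.
public class DeleteCounter {

  private long count = 0L;

  public void increment() {
    count += 1;
  }

  public long get() {
    return count;
  }
}
```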
+1 for the new class.
    public static ColumnVector forHolder(VectorHolder holder, int[] rowIdMapping, int numRows) {
      return holder.isDummy() ?
          new ConstantColumnVector(Types.IntegerType.get(), numRows, ((ConstantVectorHolder) holder).getConstant()) :
I do not know why the ConstantColumnVector is constructed with a hardcoded integer type. I observe that this code can get called with ConstantVectorHolders containing a constant of other primitive types. If copy() is called on an InternalRow in the ColumnarBatch containing this ColumnVector, we get a ClassCastException. In the situations where I call copy(), the ConstantColumnVector contains either a boolean or an integer, so I only fix it for those possibilities.
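For illustration, the kind of fix described could look like the following (the helper name is hypothetical; the constructor shape comes from the excerpt above, and only the boolean and integer cases are handled):

```java
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Hypothetical helper: derive the Iceberg type from the constant's runtime class
// instead of hard-coding IntegerType, covering only the boolean and integer
// cases described above.
private static Type constantType(Object constant) {
  return constant instanceof Boolean ? Types.BooleanType.get() : Types.IntegerType.get();
}
```

The dummy-holder branch would then build `new ConstantColumnVector(constantType(constant), numRows, constant)` instead of always passing `Types.IntegerType.get()`.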
@flyrain this code was added by you; can you please explain?
I copied some code from class IcebergArrowColumnVector and didn't change its construction logic, but I agree with you that we shouldn't hard-code the int type.
@rdblue @RussellSpitzer would you please review?
Resolved review threads:
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java (outdated)
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java (outdated)
        TableProperties.SPLIT_LOOKBACK_DEFAULT,
        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

    long delCount = 0L;
This feels like we are subverting the intent of this test, which is to use the DataFrame reader to actually read the table. I think we'll need another approach for counting deletes, or possibly just explicitly state how many deletes there are in tests that need to check it, at least until we have a way of reading a table with a user-readable marker for deleted rows like _isDeleted.
I could replace

    CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
        table.newScan().planFiles(),
        TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
        TableProperties.SPLIT_LOOKBACK_DEFAULT,
        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

with

    SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
    scanBuilder.pruneColumns(sparkSchema);
    SparkScan scan = (SparkScan) scanBuilder.buildMergeOnReadScan();
    List<CombinedScanTask> tasks = scan.tasks();

That is the path that a Spark read takes. I am also doing most of the rest of what Spark does once we have the tasks and hand them to Spark in the form of InputPartitions, which is, for each InputPartition, to get a PartitionReader (a subclass of either BatchDataReader or RowDataReader) and iterate through the reader. I just don't assemble the InternalRows into a DataFrame.
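A hedged sketch of that manual read loop (the method and parameter names are illustrative; only the Spark DSv2 PartitionReaderFactory/PartitionReader contract is assumed):

```java
import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

// Illustrative only: drive the same row-level read path Spark would, iterating
// each PartitionReader directly instead of assembling a DataFrame.
static long readAllRows(InputPartition[] partitions, PartitionReaderFactory readerFactory) throws IOException {
  long rowCount = 0L;
  for (InputPartition partition : partitions) {
    try (PartitionReader<InternalRow> reader = readerFactory.createReader(partition)) {
      while (reader.next()) {
        InternalRow row = reader.get();  // the row would be checked or collected here
        rowCount += 1;
      }
    }
  }
  return rowCount;
}
```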
That is the path it takes at this moment, but that's the issue here: we want this test to check the output of

    Dataset<Row> df = spark.read()
        .format("iceberg")
        .load(TableIdentifier.of("default", name).toString())
        .selectExpr(columns);

Having this copy the implementation details decouples the test from what may actually be happening if we change the code in the future.
Here is another option:
I'll leave TestSparkReaderDeletes alone, and instead create a new class that also extends DeleteReadTests (or even TestSparkReaderDeletes) and overrides rowSet there. It is perhaps not necessary to run all the tests in DeleteReadTests or TestSparkReaderDeletes in this new class. I had also increased the parameter space of TestSparkReaderDeletes in order to cover all three file formats, but I can dial that back and distribute the increased coverage to the new class. Basically, there will be no loss of test coverage.
@RussellSpitzer thank you for reviewing!
    boolean posIsDeleted = roaring64Bitmap.contains(position);
    if (counter != null && posIsDeleted) {
      counter.increment();
    }
Given that we have defined the DeleteCounter interface ourselves, does it make sense to define a default NullDeleteCounter that is a no-op?
There are a number of places where we check if (counter != null).
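A minimal sketch of what that no-op counter could look like (hypothetical; it only assumes DeleteCounter exposes the increment() used above):

```java
// Hypothetical no-op counter: callers can always hold a non-null DeleteCounter
// and drop the `if (counter != null)` checks.
public class NullDeleteCounter extends DeleteCounter {

  @Override
  public void increment() {
    // intentionally does nothing
  }
}
```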
Also, is there any concern that this makes calls to isDeleted not idempotent?
There are existing tests that have no need for counting deletes and that call some static methods in Deletes. I kept those forms of the methods and had them delegate to new forms that take a DeleteCounter, passing a null DeleteCounter. That is why a DeleteCounter field might be null. The way the code is now, if we decide to do without DeleteCounter and simply use an AtomicLong, it can be updated very easily. I do understand your suggestion for a NullDeleteCounter.
You raise a good point about PositionDeleteIndex#isDeleted not being idempotent now. I have checked where it is called, and it is not a concern. We should document this point though, to prevent it from becoming a problem in future code. The current use case for PositionDeleteIndex is to iterate through row positions and check each against the PositionDeleteIndex to see whether it is deleted; in this use case, there is no reason to check a position more than once.
Before I ended up introducing a delete counter into the PositionDeleteIndex implementation and incrementing it when isDeleted is called, I considered other approaches, but there was one read path in particular where this seemed to be the best solution.
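For illustration, the delegation pattern described above could look roughly like this (the method name and signatures are hypothetical stand-ins, not the actual Deletes API):

```java
import java.util.function.Predicate;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical sketch: the existing entry point keeps its signature and delegates
// to a counter-aware overload, so existing callers simply pass no counter.
public static <T> CloseableIterable<T> filterDeleted(CloseableIterable<T> rows, Predicate<T> isDeleted) {
  return filterDeleted(rows, isDeleted, null);
}

public static <T> CloseableIterable<T> filterDeleted(CloseableIterable<T> rows, Predicate<T> isDeleted, DeleteCounter counter) {
  return CloseableIterable.filter(rows, row -> {
    boolean deleted = isDeleted.test(row);
    if (counter != null && deleted) {
      counter.increment();
    }
    return !deleted;  // keep only rows that are not deleted
  });
}
```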
Added a note to the javadoc for PositionDeleteIndex#isDeleted.
Shouldn't we be able to determine the number of deleted rows here by checking the number of entries in the bitmap? So shouldn't we be able to just count in the "delete" methods? Also, do we have to pass the counter through here? It seems like we could just get this value out of the index when we are done building it instead of passing the counter into the index.
You are right that one can get the number of deleted positions in the PositionDeleteIndex via roaring64Bitmap.getLongCardinality().
However, having one counter keeps the bookkeeping simple and easy to reason about.
There are multiple paths through the code where deletes may be applied, and the unifying thing is that the same counter is passed into all of those places.
It is more difficult to reason through the counting if I have to account for the PositionDeleteIndex cases separately, and I would have to keep references to the PositionDeleteIndex instances (I haven't looked into this in depth). I think it would be messy.
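For reference, a small sketch of the alternative being discussed, reading the count off the index after it is built (the cardinality call is the one named above; the rest is illustrative):

```java
import org.roaringbitmap.longlong.Roaring64Bitmap;

// Illustrative only: once the index is built, the number of deleted positions
// can be read straight off the bitmap, with no counter threaded through.
Roaring64Bitmap roaring64Bitmap = new Roaring64Bitmap();
roaring64Bitmap.addLong(3L);
roaring64Bitmap.addLong(17L);
long deletedPositions = roaring64Bitmap.getLongCardinality();  // 2
```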
I think having "isDeleted" have a side effect ruins the bookkeeping simplicity for me. If we do take this counter approach, I'd much rather we use the "delete" methods than the "isDeleted" method. The delete method already has a side effect, and we would expect it to be the place where the count happens, as opposed to "isDeleted", which we had to add a javadoc note to just to describe the side effect.
I am still not really sold on passing the counter through here; it just seems like we are complicating the internals of this class, and we don't really need to since, as we've already discussed, the value we are looking for is already calculated even without passing an object through. I'm also not a big fan of behavior depending on whether objects are null or not.
But if other reviewers feel differently, I could go along with this approach as well.
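A hedged sketch of that suggestion, with the count taken in the delete method so that isDeleted stays side-effect free (field names mirror the excerpt above; treating a repeat delete of the same position as not counting is an assumption for illustration):

```java
// Illustrative only: count when a position is marked deleted rather than when it
// is queried; re-deleting an already-marked position is assumed not to count.
@Override
public void delete(long position) {
  if (!roaring64Bitmap.contains(position)) {
    counter.increment();
  }
  roaring64Bitmap.addLong(position);
}
```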
Since it has been almost four weeks since I touched this (it seems that's how long the last update has gone unreviewed), I had forgotten the details of what I had investigated and done. I went through the code paths again, and I see now that there is just one place where PositionDeleteIndex#delete() is called and where I need to track the delete count in the PositionDeleteIndex. So your suggestion is very feasible and not as messy as I feared!
Thank you for the suggestion. I'll update the PR tomorrow.
@kbendick I considered using an Optional<DeleteCounter> instead of a DeleteCounter that could be null, but it's six of one and half a dozen of the other. In the end, I adopted your suggestion to have a no-op DeleteCounter.
@RussellSpitzer I have cleaned up the logging and reworked the unit tests, on top of the updates to #4395.
@RussellSpitzer please review again when you can. Thanks.
Resolved review threads:
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java
- spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
    while (lastExecution.metricValues() == null && attempts > 0) {
      try {
        Thread.sleep(100);
        attempts -= 1;
Nit: Why not attempts-- or --attempts?
A scala-ism. :-)
Resolved review threads:
- spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java
- spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java
@wypoon: LGTM. Left a few formatting asks, nothing big. For next time, AFAIK we do separate PRs for the different versions: we implement things first for the latest version and then backport to the older ones. This is generally easier for the reviewer.
…es applied (by a SparkScan).
Remove debug logging from ColumnarBatchReader. Explicitly disable vectorization for columnar file formats (orc, parquet) in the table properties when not testing the vectorized case.
…is present. Added tests for the metric for this case too.
Simplify setting of vectorization properties. Call checkDeleteCount directly.
Improve comments for a DeleteReadTests test case. Add blank line after if clause.
Rebased on master, and fixed the newly added
LGTM. Thanks @wypoon.
@flyrain have you sorted out your credentials? Can you or another committer merge this now? Thanks.
@wypoon I'd love to help here. Unfortunately the issue is still there :-(. I wish [email protected] could be a bit more responsive.
Thanks @wypoon for the PR and @flyrain, @kbendick and @RussellSpitzer for the reviews!
Thanks @pvary!
This is an extension of #4395.
Here we add a custom metric for the number of delete rows that have been applied in a scan of a format v2 table.
We introduce a counter in `BatchDataReader` and `RowDataReader` that is incremented when a delete is applied. This counter is passed into `DeleteFilter`, and in the cases where we construct a `PositionDeleteIndex`, it is passed into the implementation of the `PositionDeleteIndex`. In all the read paths, the counter is incremented whenever a delete is applied. When Spark calls `currentMetricsValues()` on a `PartitionReader`, which is a subclass of either `BatchDataReader` or `RowDataReader`, we get the current value of the counter and return that.

Tested manually by creating a format v2 table using each of Parquet, ORC, and Avro files, deleting and updating rows in the tables, and reading from the tables. The expected numbers of deletes show up in the Spark UI.

Also extended existing unit tests (`DeleteReadTests`) to count the number of deletes applied during the scan.
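As a rough sketch of how such a task-level metric is surfaced to Spark (the class body is illustrative; only Spark's `CustomTaskMetric` interface and the `currentMetricsValues()` hook mentioned above are assumed):

```java
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Illustrative sketch of a task-level delete metric. A reader would build one of
// these from its DeleteCounter and return it from currentMetricsValues().
public class TaskNumDeletes implements CustomTaskMetric {

  private final long value;

  public TaskNumDeletes(long value) {
    this.value = value;
  }

  @Override
  public String name() {
    return "numDeletes";  // metric name here is an assumption for illustration
  }

  @Override
  public long value() {
    return value;
  }
}
```

A matching driver-side metric with the same name (the `NumDeletes` class in this PR) would then aggregate the per-task values for display in the Spark UI.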