
Conversation

@wypoon (Contributor) commented Aug 27, 2021

When estimating statistics, we should take the read schema into account.
As explained in the PR for SPARK-36568:
V2ScanRelationPushDown can column prune DataSourceV2ScanRelations and change read schema of Scan operations.
In SparkBatchScan.estimateStatistics(), before this change, for unpartitioned tables, or partitioned tables with filter expressions, we sum the sizes of the files to be scanned and use that sum as the estimate of the size in bytes. With this change, we adjust that sum by the ratio of the size of the columns to be read to the size of all the columns; we also adjust it by the compression factor if one is set (the default is 1.0). As a result, some joins that are broadcast joins on V1 tables but sort merge joins on Iceberg tables can become broadcast joins on Iceberg tables as well.
Basically this change does for Iceberg what the above-mentioned Spark PR does for the built-in V2 FileScans.
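A minimal sketch of the adjustment described above (method and parameter names are illustrative, not the actual SparkBatchScan code; the compression factor is assumed to come from a table property or read option):

import org.apache.spark.sql.types.StructType;

public class ScanSizeEstimateSketch {
  // Scale the summed data file size by the fraction of the row width covered by
  // the read schema (using Spark's per-type default sizes), and by an optional
  // compression factor (default 1.0).
  public static long adjustedSizeInBytes(long totalFileSize, StructType readSchema,
                                         StructType tableSchema, double compressionFactor) {
    double readFraction = (double) readSchema.defaultSize() / tableSchema.defaultSize();
    return (long) (totalFileSize * readFraction * compressionFactor);
  }
}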

When estimating statistics, we should take the read schema into account.
@github-actions github-actions bot added the spark label Aug 27, 2021
@wypoon (Contributor, Author) commented Aug 27, 2021

Here is a part of the optimized logical plan for a TPC-DS query (q18) on V1 tables:

+- Project [c_customer_sk#50, c_current_cdemo_sk#52, c_current_addr_sk#54, c_birth_year#63], Statistics(sizeInBytes=2.7 MiB)
   +- Filter (((c_birth_month#62 IN (1,6,8,9,12,2) AND isnotnull(c_customer_sk#50)) AND isnotnull(c_current_cdemo_sk#52)) AND isnotnull(c_current_addr_sk#54)), Statistics(sizeInBytes=25.5 MiB)
      +- Relation[c_customer_sk#50,c_customer_id#51,c_current_cdemo_sk#52,c_current_hdemo_sk#53,c_current_addr_sk#54,c_first_shipto_date_sk#55,c_first_sales_date_sk#56,c_salutation#57,c_first_name#58,c_last_name#59,c_preferred_cust_flag#60,c_birth_day#61,c_birth_month#62,c_birth_year#63,c_birth_country#64,c_login#65,c_email_address#66,c_last_review_date#67] parquet, Statistics(sizeInBytes=25.5 MiB)

The customer table is 25.5 MiB, but the Project plan (4 of the 18 columns of the customer table) is small enough to be broadcast.
Here is the equivalent snippet for Iceberg tables before this change:

+- Project [c_customer_sk#59, c_current_cdemo_sk#61, c_current_addr_sk#63, c_birth_year#72], Statistics(sizeInBytes=21.9 MiB)
   +- Filter (((c_birth_month#71 IN (1,6,8,9,12,2) AND isnotnull(c_customer_sk#59)) AND isnotnull(c_current_cdemo_sk#61)) AND isnotnull(c_current_addr_sk#63)), Statistics(sizeInBytes=25.5 MiB)
      +- RelationV2[c_customer_sk#59, c_current_cdemo_sk#61, c_current_addr_sk#63, c_birth_month#71, c_birth_year#72] spark_catalog.tpcds_10_iceberg.customer, Statistics(sizeInBytes=25.5 MiB, rowCount=5.00E+5)

A read schema of 5 columns has already been pushed down to the Iceberg customer table, but its statistics are still the full 25.5 MiB for the table. Consequently, the Project plan, whose size is estimated essentially as the 25.5 MiB times the ratio of the size of the 4 projected columns to the size of the 5 read columns, is too big to be broadcast.
With this change, the equivalent snippet becomes:

+- Project [c_customer_sk#59, c_current_cdemo_sk#61, c_current_addr_sk#63, c_birth_year#72], Statistics(sizeInBytes=2.0 MiB)
   +- Filter (((c_birth_month#71 IN (1,6,8,9,12,2) AND isnotnull(c_customer_sk#59)) AND isnotnull(c_current_cdemo_sk#61)) AND isnotnull(c_current_addr_sk#63)), Statistics(sizeInBytes=2.4 MiB)
      +- RelationV2[c_customer_sk#59, c_current_cdemo_sk#61, c_current_addr_sk#63, c_birth_month#71, c_birth_year#72] spark_catalog.tpcds_10_iceberg.customer, Statistics(sizeInBytes=2.4 MiB, rowCount=5.00E+5)

Here the statistics for the Iceberg customer table are adjusted according to the read schema and come out to 2.4 MiB. The estimate for the Project then scales that by the ratio of the size of the 4 projected columns to the size of the 5 read columns, giving 2.0 MiB, which is small enough to be broadcast.
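(Roughly, Spark's size-only estimation computes sizeInBytes(Project) ≈ sizeInBytes(child) × rowWidth(projected columns) / rowWidth(child columns), with the row widths derived from per-type default sizes; the 2.4 MiB to 2.0 MiB step above follows that pattern.)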

@wypoon (Contributor, Author) commented Aug 27, 2021

@rdblue, @aokolnychyi, can you please review this? Can you also please enlighten me why we use record count * row size as the estimate for partitioned tables without filter expressions, but file sizes for unpartitioned tables or partitioned tables with filter expressions? Should we use the record count * row size everywhere?
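For context, a paraphrase of the branching being asked about (a simplified sketch, not the exact SparkBatchScan code):

public class EstimateBranchSketch {
  // Partitioned table with no pushed filters: snapshot record count * estimated row width.
  // Otherwise (unpartitioned table, or filters present): sum of the data file sizes to scan.
  public static long estimateSizeInBytes(boolean partitioned, boolean hasFilters,
                                         long totalRecords, long estimatedRowWidth,
                                         long totalFileSize) {
    if (partitioned && !hasFilters) {
      return totalRecords * estimatedRowWidth;
    }
    return totalFileSize;
  }
}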

@rdblue (Contributor) commented Aug 29, 2021

I'm surprised to see that we report the total data file size back to Spark. At Netflix, we switched over to reporting row size estimate * number of rows a long time ago to fix broadcast join memory problems. Spark should ignore the size on disk and use the row size * num rows, but if Spark doesn't do that then I think Iceberg should report that as the size instead.

@rdblue (Contributor) commented Aug 29, 2021

Thanks @wypoon! I'm okay with this, but I'd prefer to return a better estimate based on number of rows and not compressed size at all. Interested to hear what @aokolnychyi thinks.

Comment on lines -294 to +298

-    long approximateSize = 0;
-    for (StructField sparkField : tableSchema.fields()) {
-      approximateSize += sparkField.dataType().defaultSize();
+    long result;
+    try {
+      result = LongMath.checkedMultiply(tableSchema.defaultSize(), totalRecords);
+    } catch (ArithmeticException e) {
+      result = Long.MAX_VALUE;
@wypoon (Contributor, Author) commented:

StructType has a defaultSize method; there is no need to re-implement it. The main utility of the estimateSize static method is checking for overflow; we should just use Guava's LongMath.checkedMultiply.
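For reference, the updated method then looks roughly like this (a sketch using a plain Guava import; Iceberg itself typically uses its relocated Guava classes):

import com.google.common.math.LongMath;
import org.apache.spark.sql.types.StructType;

public class SchemaSizeSketch {
  // Overflow-safe estimate: StructType.defaultSize() already sums the per-field
  // default sizes, and checkedMultiply throws instead of silently wrapping around.
  public static long estimateSize(StructType tableSchema, long totalRecords) {
    long result;
    try {
      result = LongMath.checkedMultiply(tableSchema.defaultSize(), totalRecords);
    } catch (ArithmeticException e) {
      result = Long.MAX_VALUE;
    }
    return result;
  }
}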

LOG.debug("using table metadata to estimate table statistics");
long totalRecords = PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
    SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
Schema projectedSchema = expectedSchema != null ? expectedSchema : table.schema();
@wypoon (Contributor, Author) commented:

I don't think expectedSchema is ever null. Thus, I think we can just call readSchema() to get the StructType.

@wypoon (Contributor, Author) commented Sep 2, 2021

@rdblue thanks for reviewing this. I changed the statistics estimation to use (row size * number of rows) for all cases. Of course, the row size estimation can be very far from accurate. Is this version preferable? @aokolnychyi or @RussellSpitzer do you prefer this change?
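As a concrete illustration of how rough the row-width estimate can be (an illustrative example, not from this PR): Spark uses a fixed default size per type, e.g. 20 bytes for any string column, regardless of the actual data.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RowWidthExample {
  public static void main(String[] args) {
    StructType schema = new StructType(new StructField[] {
        new StructField("c_customer_sk", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("c_email_address", DataTypes.StringType, true, Metadata.empty())
    });
    // Prints 24: 4 bytes for the int column + 20 bytes for the string column,
    // no matter how long the email addresses actually are.
    System.out.println(schema.defaultSize());
  }
}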

Fix SparkSchemaUtil.estimateSize to use LongMath.checkedMultiply.
@wypoon wypoon force-pushed the estimate_statistics branch from 682b648 to 5deeb56 on September 7, 2021
@rdblue rdblue merged commit b1d5a58 into apache:master Sep 12, 2021
@rdblue (Contributor) commented Sep 12, 2021

Thanks, @wypoon! Looks great.

@aokolnychyi (Contributor) commented:

Sorry for the delay. I think this change makes sense too. Thanks, @wypoon!

Shall we update Spark 2 as well?

@aokolnychyi (Contributor) commented:

I created #3108 so that we don't forget.

@wypoon (Contributor, Author) commented Sep 14, 2021

@aokolnychyi thanks for seconding the change.
Yes, I think it makes sense to do the same for Spark 2 as well. I'll put up a PR.

@aokolnychyi (Contributor) commented:

Much appreciated, @wypoon!

@wypoon wypoon deleted the estimate_statistics branch September 16, 2021 17:24
wypoon added a commit to wypoon/iceberg that referenced this pull request Sep 17, 2021
Follow-up to apache#3038.
Use (estimated) row size * number of rows to estimate the size instead of adding up file sizes.
The row size is estimated from the pruned schema if we prune columns.
rdblue pushed a commit that referenced this pull request Sep 17, 2021
Follow-up to #3038.
Use (estimated) row size * number of rows to estimate the size instead of adding up file sizes.
The row size is estimated from the pruned schema if we prune columns.
@zinking (Contributor) commented May 20, 2022

> Thanks @wypoon! I'm okay with this, but I'd prefer to return a better estimate based on number of rows and not compressed size at all. Interested to hear what @aokolnychyi thinks.

Well, rows * schema size overestimates table sizes in some circumstances, for example the TPC-DS sf1000 Q7 demographics dimension table, causing a broadcast join to degrade to a sort merge join.

I guess it still works in Spark 3.2 because of AQE.

totalSize * (read columns size / total columns size) is what Hive adopted, but that certainly underestimates in some circumstances.

cc @rdblue
