diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index ae29e05d4339..33ee2f6aba63 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -39,11 +39,15 @@ import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.metrics.NumSplits;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -162,6 +166,11 @@ public String description() {
     return String.format("%s [filters=%s]", table, filters);
   }
 
+  // Driver-side declaration of the custom metrics this scan reports; Spark
+  // matches task metrics to these by name when aggregating for the UI.
+  @Override
+  public CustomMetric[] supportedCustomMetrics() {
+    return new CustomMetric[] { new NumSplits() };
+  }
+
   static class ReaderFactory implements PartitionReaderFactory {
     private final int batchSize;
 
@@ -194,14 +203,32 @@ public boolean supportColumnarReads(InputPartition partition) {
   }
 
   private static class RowReader extends RowDataReader implements PartitionReader<InternalRow> {
+    // Number of file splits in this reader's combined task; captured once at
+    // construction since the task's file set does not change afterwards.
+    private final long numSplits;
+
     RowReader(ReadTask task) {
       super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive());
+      numSplits = task.task.files().size();
+      LOG.debug("Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name());
+    }
+
+    @Override
+    public CustomTaskMetric[] currentMetricsValues() {
+      return new CustomTaskMetric[] { new TaskNumSplits(numSplits) };
     }
   }
 
   private static class BatchReader extends BatchDataReader implements PartitionReader<ColumnarBatch> {
+    // Number of file splits in this reader's combined task; captured once at
+    // construction since the task's file set does not change afterwards.
+    private final long numSplits;
+
     BatchReader(ReadTask task, int batchSize) {
       super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize);
+      numSplits = task.task.files().size();
+      LOG.debug("Reading {} file split(s) for table {} using BatchReader", numSplits, task.table().name());
+    }
+
+    @Override
+    public CustomTaskMetric[] currentMetricsValues() {
+      return new CustomTaskMetric[] { new TaskNumSplits(numSplits) };
     }
   }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java
new file mode 100644
index 000000000000..5cd3c8a75c92
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.metrics;
+
+import java.text.NumberFormat;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+
+public class NumSplits implements CustomMetric {
+
+  @Override
+  public String name() {
+    return "numSplits";
+  }
+
+  @Override
+  public String description() {
+    return "number of file splits read";
+  }
+
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    // Sum the per-task split counts reported by TaskNumSplits and render the
+    // total with locale-aware digit grouping for display in the Spark UI.
+    long sum = 0L;
+    for (int i = 0; i < taskMetrics.length; i++) {
+      sum += taskMetrics[i];
+    }
+    return NumberFormat.getIntegerInstance().format(sum);
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java
new file mode 100644
index 000000000000..5715955be220
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+public class TaskNumSplits implements CustomTaskMetric {
+  private final long value;
+
+  public TaskNumSplits(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return "numSplits";
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+}