diff --git a/core/src/main/java/org/apache/iceberg/puffin/StandardBlobTypes.java b/core/src/main/java/org/apache/iceberg/puffin/StandardBlobTypes.java
index 2be5df5f88b9..cdc9453e71bd 100644
--- a/core/src/main/java/org/apache/iceberg/puffin/StandardBlobTypes.java
+++ b/core/src/main/java/org/apache/iceberg/puffin/StandardBlobTypes.java
@@ -26,4 +26,6 @@ private StandardBlobTypes() {}
    * href="https://datasketches.apache.org/">Apache DataSketches</a> library
    */
   public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";
+
+  public static final String NDV_BLOB = "ndv-blob";
 }
diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index 6a21e79c2803..c24728f2eb0e 100644
--- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -120,6 +120,7 @@ transformArgument
 expression
     : constant
     | stringMap
+    | stringArray
     ;
 
 constant
@@ -133,6 +134,10 @@ stringMap
     : MAP '(' constant (',' constant)* ')'
     ;
 
+stringArray
+    : ARRAY '(' constant (',' constant)* ')'
+    ;
+
 booleanValue
     : TRUE | FALSE
     ;
@@ -171,7 +176,7 @@ nonReserved
     : ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE
     | DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET
     | TRUE | FALSE
-    | MAP
+    | MAP | ARRAY
     ;
 
 ADD: 'ADD';
@@ -203,6 +208,7 @@ TRUE: 'TRUE';
 FALSE: 'FALSE';
 
 MAP: 'MAP';
+ARRAY: 'ARRAY';
 
 PLUS: '+';
 MINUS: '-';
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDistinctCountProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDistinctCountProcedure.java
new file mode 100644
index 000000000000..7b1166345302
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDistinctCountProcedure.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.BlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDistinctCountProcedure extends SparkExtensionsTestBase {
+
+  public TestDistinctCountProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void setupTable() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testAnalyze() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (4, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (5, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (6, 'b')", tableName);
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.distinct_count("
+                + "table => '%s',"
+                + "distinct_count_view => '%s',"
+                + "columns => array('%s','%s'))",
+            catalogName, tableName, "result", "id", "data");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<StatisticsFile> files = table.statisticsFiles();
+    List<BlobMetadata> metadataList = ((GenericStatisticsFile) files.get(0)).blobMetadata();
+
+    // blob metadata carries Iceberg field IDs, which are 1-based: id -> 1, data -> 2
+    BlobMetadata firstBlob = metadataList.get(0);
+    Assertions.assertThat(firstBlob.type()).as("type").isEqualTo(StandardBlobTypes.NDV_BLOB);
+    Assertions.assertThat(firstBlob.fields()).as("columns").isEqualTo(ImmutableList.of(1));
+    Assertions.assertThat(Long.parseLong(firstBlob.properties().get("ndv"))).isEqualTo(6);
+
+    BlobMetadata secondBlob = metadataList.get(1);
+    Assertions.assertThat(secondBlob.type()).as("type").isEqualTo(StandardBlobTypes.NDV_BLOB);
+    Assertions.assertThat(secondBlob.fields()).as("columns").isEqualTo(ImmutableList.of(2));
+    Assertions.assertThat(Long.parseLong(secondBlob.properties().get("ndv"))).isEqualTo(2);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match", ImmutableList.of(row(6L, 2L)), sql("SELECT * FROM %s", viewName));
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 86364dc262b2..fa9763f78d96 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -50,6 +50,9 @@ abstract class BaseProcedure implements Procedure {
   protected static final DataType STRING_MAP =
       DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
 
+  protected static final DataType STRING_ARRAY =
+      DataTypes.createArrayType(DataTypes.StringType, false);
+
   private final SparkSession spark;
   private final TableCatalog tableCatalog;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/DistinctCountProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/DistinctCountProcedure.java
new file mode 100644
index 000000000000..304904d07476
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/DistinctCountProcedure.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.puffin.StandardBlobTypes;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A procedure that computes the approximate NDV (number of distinct values) of the requested
+ * columns and commits the result to the table as a Puffin statistics file.
+ */
+public class DistinctCountProcedure extends BaseProcedure {
+  private static final Logger LOG = LoggerFactory.getLogger(DistinctCountProcedure.class);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("distinct_count_view", DataTypes.StringType),
+        ProcedureParameter.optional("columns", STRING_ARRAY),
+      };
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("view_name", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<DistinctCountProcedure>() {
+      @Override
+      protected DistinctCountProcedure doBuild() {
+        return new DistinctCountProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private DistinctCountProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String tableName = args.getString(0);
+    Identifier tableIdent = toIdentifier(tableName, PARAMETERS[0].name());
+    SparkTable sparkTable = loadSparkTable(tableIdent);
+    Table table = sparkTable.table();
+
+    Preconditions.checkArgument(!args.isNullAt(2), "columns must be specified");
+    ArrayData columns = args.getArray(2);
+    int columnCount = columns.numElements();
+
+    long[] ndvs = new long[columnCount];
+    int[] fieldIds = new int[columnCount];
+    StringBuilder query = new StringBuilder("SELECT ");
+    for (int i = 0; i < columnCount; i++) {
+      String colName = columns.getUTF8String(i).toString();
+      if (i > 0) {
+        query.append(", ");
+      }
+      // back-quote the column name so reserved words and special characters parse correctly
+      query.append("APPROX_COUNT_DISTINCT(`").append(colName).append("`)");
+      // record the Iceberg field ID (not the Spark schema index) for the blob metadata
+      fieldIds[i] = table.schema().findField(colName).fieldId();
+    }
+
+    query.append(" FROM ").append(tableName);
+    Dataset<Row> df = spark().sql(query.toString());
+
+    // a single head() call runs the aggregation once for all requested columns
+    Row ndvRow = df.head();
+    for (int i = 0; i < columnCount; i++) {
+      ndvs[i] = ndvRow.getLong(i);
+    }
+
+    Preconditions.checkArgument(
+        table.currentSnapshot() != null,
+        "Cannot write statistics: table %s has no current snapshot",
+        tableName);
+
+    TableOperations operations = ((HasTableOperations) table).operations();
+    FileIO fileIO = operations.io();
+    String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
+    OutputFile outputFile = fileIO.newOutputFile(path);
+
+    try (PuffinWriter writer =
+        Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) {
+      for (int i = 0; i < columnCount; i++) {
+        writer.add(
+            new Blob(
+                StandardBlobTypes.NDV_BLOB,
+                ImmutableList.of(fieldIds[i]),
+                table.currentSnapshot().snapshotId(),
+                table.currentSnapshot().sequenceNumber(),
+                // the NDV travels in the blob properties, so the payload is empty
+                ByteBuffer.allocate(0),
+                null,
+                ImmutableMap.of("ndv", Long.toString(ndvs[i]))));
+      }
+      writer.finish();
+
+      GenericStatisticsFile statisticsFile =
+          new GenericStatisticsFile(
+              table.currentSnapshot().snapshotId(),
+              path,
+              writer.fileSize(),
+              writer.footerSize(),
+              writer.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+
+      table
+          .updateStatistics()
+          .setStatistics(table.currentSnapshot().snapshotId(), statisticsFile)
+          .commit();
+    } catch (IOException exception) {
+      throw new UncheckedIOException(exception);
+    }
+
+    String viewName = viewName(args, tableName);
+    // create a temporary view so callers can query the computed NDVs directly
+    df.createOrReplaceTempView(viewName);
+    return toOutputRows(viewName);
+  }
+
+  private static String viewName(InternalRow args, String tableName) {
+    String viewName = args.isNullAt(1) ? null : args.getString(1);
+    if (viewName == null) {
+      String shortTableName =
+          tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName;
+      viewName = shortTableName + "_distinct_count";
+    }
+    return viewName;
+  }
+
+  private InternalRow[] toOutputRows(String viewName) {
+    InternalRow row = newInternalRow(UTF8String.fromString(viewName));
+    return new InternalRow[] {row};
+  }
+
+  @Override
+  public String description() {
+    return "DistinctCountProcedure";
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 6d59cb876b1e..8ecf0279c1e4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
+    mapBuilder.put("distinct_count", DistinctCountProcedure::builder);
     return mapBuilder.build();
   }
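
For reviewers, a minimal end-to-end sketch of how the new procedure is meant to be invoked. Only the `CALL ... system.distinct_count(...)` syntax and parameter names come from this diff; the local session config, catalog name (`demo`), warehouse path, and table name are placeholders for illustration, not part of the change.

    import org.apache.spark.sql.SparkSession;

    public class DistinctCountExample {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .master("local[*]") // local session just for the illustration
                .config(
                    "spark.sql.extensions",
                    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.demo.type", "hadoop")
                .config("spark.sql.catalog.demo.warehouse", "/tmp/iceberg-warehouse")
                .getOrCreate();

        spark.sql("CREATE TABLE demo.db.events (id INT, data STRING) USING iceberg");
        spark.sql("INSERT INTO demo.db.events VALUES (1, 'a'), (2, 'b'), (3, 'a')");

        // Compute approximate NDVs for both columns; the procedure commits a Puffin
        // statistics file to the table and returns the name of a temp view.
        spark
            .sql(
                "CALL demo.system.distinct_count("
                    + "table => 'db.events', "
                    + "distinct_count_view => 'events_ndv', "
                    + "columns => array('id', 'data'))")
            .show();

        // One APPROX_COUNT_DISTINCT column per requested column: expect 3 and 2 here.
        spark.sql("SELECT * FROM events_ndv").show();
      }
    }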
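And a sketch of how a consumer might read the committed statistics back through the Iceberg API. The `printNdvs` helper is hypothetical, but the accessors and the `ndv` property key match what `DistinctCountProcedure` writes above.

    import org.apache.iceberg.BlobMetadata;
    import org.apache.iceberg.StatisticsFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.puffin.StandardBlobTypes;

    public class ReadNdvStats {
      // Prints the field IDs and NDV for every ndv-blob committed to the table.
      static void printNdvs(Table table) {
        for (StatisticsFile statsFile : table.statisticsFiles()) {
          for (BlobMetadata blob : statsFile.blobMetadata()) {
            if (StandardBlobTypes.NDV_BLOB.equals(blob.type())) {
              // The NDV is carried in the blob properties; the blob payload is empty.
              System.out.printf("fields=%s ndv=%s%n", blob.fields(), blob.properties().get("ndv"));
            }
          }
        }
      }
    }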