From 9bc412417f4b5db51b5fafbc5a4d38201364b0c3 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 9 May 2023 13:48:50 -0700
Subject: [PATCH 1/3] Spark 3.4: Add RewritePositionDeleteFilesProcedure

---
Note: the test file in this patch was amended after the patch was generated: testInvalidOption
now asserts the exception message, and comments were added. The diffstat and hunk size below
reflect the amended content; the blob id on the "index" line does not.

 ...stRewritePositionDeleteFilesProcedure.java | 164 ++++++++++++++++++
 .../RewritePositionDeleteFilesProcedure.java  | 114 +++++++++++++
 .../spark/procedures/SparkProcedures.java     |   1 +
 3 files changed, 279 insertions(+)
 create mode 100644 spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
 create mode 100644 spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
new file mode 100644
index 000000000000..88715fd89c74
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests calling {@code system.rewrite_position_delete_files} on a format-version 2 table that uses
+ * merge-on-read deletes.
+ */
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  // Creates the format-version 2, merge-on-read test table and appends six records.
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 2 delete files and add 1",
+        ImmutableList.of(
+            row(
+                2,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());
+  }
+
+  @Test
+  public void testExpireDeleteFilesNoOption() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+    sql("DELETE FROM %s WHERE id=3", tableName);
+    sql("DELETE FROM %s WHERE id=4", tableName);
+    sql("DELETE FROM %s WHERE id=5", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(5, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files(" + "table => '%s')",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should replace 5 delete files with 1",
+        ImmutableList.of(
+            row(
+                5,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+  }
+
+  @Test
+  public void testInvalidOption() throws Exception {
+    createTable();
+
+    // JUnit's assertThrows uses its String argument as the assertion failure message, not as the
+    // expected exception message, so the exception text is verified separately below.
+    IllegalArgumentException exception =
+        Assert.assertThrows(
+            "Should reject unsupported options",
+            IllegalArgumentException.class,
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_position_delete_files("
+                        + "table => '%s',"
+                        + "options => map("
+                        + "'foo', 'bar'))",
+                    catalogName, tableIdent));
+    Assert.assertEquals(
+        "Cannot use options [foo], they are not supported by the action or the rewriter BIN-PACK",
+        exception.getMessage());
+  }
+
+  // Returns the summary of the current snapshot, reloading the table from the validation catalog.
+  private Map<String, String> snapshotSummary() {
+    return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
new file mode 100644
index 000000000000..4a5cad2e9874
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+          Result result = action.execute();
+
+          return toOutputRows(result);
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {
+    int rewrittenDeleteFilesCount = result.rewrittenDeleteFilesCount();
+    long rewrittenBytesCount = result.rewrittenBytesCount();
+    int addedDeleteFilesCount = result.addedDeleteFilesCount();
+    long addedBytesCount = result.addedBytesCount();
+
+    InternalRow row =
+        newInternalRow(
+            rewrittenDeleteFilesCount, addedDeleteFilesCount, rewrittenBytesCount, addedBytesCount);
+    return new InternalRow[] {row};
+  }
+
+  @Override
+  public String description() {
+    return "RewritePositionDeleteFilesProcedure";
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 8ee3a9550194..7ebbb46c3d2d 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
     return mapBuilder.build();
   }
 

From d829806d70c1dae80b5fa60f5265f4d8f5260e41 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Mon, 15 May 2023 14:23:32 -0700
Subject: [PATCH 2/3] Review comments

---
 .../procedures/RewritePositionDeleteFilesProcedure.java  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
index 4a5cad2e9874..fca3ec5efa00 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.procedures;
 
 import java.util.Map;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewritePositionDeleteFiles;
 import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -31,6 +32,11 @@
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+/**
+ * A procedure that rewrites position delete files in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewritePositionDeletes(Table)
+ */
 public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
 
   private static final ProcedureParameter TABLE_PARAM =
@@ -41,7 +47,6 @@ public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
 
-  // counts are not nullable since the action result is never null
   private static final StructType OUTPUT_TYPE =
       new StructType(
           new StructField[] {
@@ -79,9 +84,7 @@ public StructType outputType() {
   @Override
   public InternalRow[] call(InternalRow args) {
     ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
-
     Identifier tableIdent = input.ident(TABLE_PARAM);
-
     Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
 
     return modifyIcebergTable(

From 662e534cfc7917a2ce522248b2693c589a436d42 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 16 May 2023 16:36:42 -0700
Subject: [PATCH 3/3] Rebase and review comments

---
 .../RewritePositionDeleteFilesProcedure.java  | 24 +++++++------------
 1 file changed, 8 insertions(+), 16 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
index fca3ec5efa00..a4a3f63ba766 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -20,7 +20,6 @@
 
 import java.util.Map;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.RewritePositionDeleteFiles;
 import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -90,24 +89,17 @@ public InternalRow[] call(InternalRow args) {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          RewritePositionDeleteFiles action =
-              actions().rewritePositionDeletes(table).options(options);
-          Result result = action.execute();
-
-          return toOutputRows(result);
+          Result result = actions().rewritePositionDeletes(table).options(options).execute();
+          return new InternalRow[] {toOutputRow(result)};
         });
   }
 
-  private InternalRow[] toOutputRows(Result result) {
-    int rewrittenDeleteFilesCount = result.rewrittenDeleteFilesCount();
-    long rewrittenBytesCount = result.rewrittenBytesCount();
-    int addedDeleteFilesCount = result.addedDeleteFilesCount();
-    long addedBytesCount = result.addedBytesCount();
-
-    InternalRow row =
-        newInternalRow(
-            rewrittenDeleteFilesCount, addedDeleteFilesCount, rewrittenBytesCount, addedBytesCount);
-    return new InternalRow[] {row};
+  private InternalRow toOutputRow(Result result) {
+    return newInternalRow(
+        result.rewrittenDeleteFilesCount(),
+        result.addedDeleteFilesCount(),
+        result.rewrittenBytesCount(),
+        result.addedBytesCount());
   }
 
   @Override