From b963a437866d2ba03f3f75768364e0db915b1c3d Mon Sep 17 00:00:00 2001 From: wulingqi Date: Mon, 18 Oct 2021 23:48:13 +0800 Subject: [PATCH 1/9] Spark: Expose AncestorsOf Snapshot as Spark Procedure (#3297) --- .../extensions/TestAncestorsOfProcedure.java | 147 ++++++++++++++++++ .../procedures/AncestorsOfProcedure.java | 104 +++++++++++++ .../spark/procedures/SparkProcedures.java | 1 + 3 files changed, 252 insertions(+) create mode 100644 spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java create mode 100644 spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java new file mode 100644 index 000000000000..e8cc69fdc7d8 --- /dev/null +++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.AnalysisException; +import org.junit.After; +import org.junit.Test; + +public class TestAncestorsOfProcedure extends SparkExtensionsTestBase { + + public TestAncestorsOfProcedure( + String catalogName, + String implementation, + Map<String, String> config) { + super(catalogName, implementation, config); + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testAncestorOfUsingEmptyArgs() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Long currentSnapshotId = table.currentSnapshot().snapshotId(); + Long preSnapshotId = table.currentSnapshot().parentId(); + + List<Object[]> output = sql("CALL %s.system.ancestors_of('%s')", + catalogName, tableIdent); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(currentSnapshotId), row(preSnapshotId)), + output); + } + + @Test + public void testAncestorOfUsingSnapshotId() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Long currentSnapshotId = table.currentSnapshot().snapshotId(); + Long preSnapshotId = table.currentSnapshot().parentId(); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(currentSnapshotId), row(preSnapshotId)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, currentSnapshotId)); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(preSnapshotId)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, preSnapshotId)); + } + + @Test + public void testAncestorOfWithRollBack() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + Table table = validationCatalog.loadTable(tableIdent); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + table.refresh(); + Long firstSnapshotId = table.currentSnapshot().snapshotId(); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + table.refresh(); + Long secondSnapshotId = table.currentSnapshot().snapshotId(); + sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableName); + table.refresh(); + Long thirdSnapshotId = table.currentSnapshot().snapshotId(); + + // roll back + sql("CALL %s.system.rollback_to_snapshot('%s', %dL)", + catalogName, tableIdent, secondSnapshotId); + + sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName); + table.refresh(); + Long fourthSnapshotId = table.currentSnapshot().snapshotId(); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(fourthSnapshotId), row(secondSnapshotId), row(firstSnapshotId)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, fourthSnapshotId)); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(thirdSnapshotId), row(secondSnapshotId), row(firstSnapshotId)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, thirdSnapshotId)); + } + + @Test + public void
testAncestorOfUsingNamedArgs() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Long firstSnapshotId = table.currentSnapshot().snapshotId(); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(firstSnapshotId)), + sql("CALL %s.system.ancestors_of(snapshot_id => %dL, table => '%s')", + catalogName, firstSnapshotId, tableIdent)); + } + + @Test + public void testInvalidAncestorOfCases() { + AssertHelpers.assertThrows("Should reject calls without all required args", + AnalysisException.class, "Missing required parameters", + () -> sql("CALL %s.system.ancestors_of()", catalogName)); + + AssertHelpers.assertThrows("Should reject calls with empty table identifier", + IllegalArgumentException.class, "Cannot handle an empty identifier for argument table", + () -> sql("CALL %s.system.ancestors_of('')", catalogName)); + + AssertHelpers.assertThrows("Should reject calls with invalid arg types", + AnalysisException.class, "Wrong arg type for snapshot_id: cannot cast", + () -> sql("CALL %s.system.ancestors_of('%s', 1.1)", catalogName, tableIdent)); + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java new file mode 100644 index 000000000000..ddde86c677b7 --- /dev/null +++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +public class AncestorsOfProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.optional("snapshot_id", DataTypes.LongType), + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[] { + new StructField("snapshot_id_new_to_old ↓", DataTypes.LongType, true, Metadata.empty()), + }); + + private AncestorsOfProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + protected AncestorsOfProcedure doBuild() { + return new AncestorsOfProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + Long toSnapshotId = args.isNullAt(1) ? null : args.getLong(1); + + SparkTable sparkTable = loadSparkTable(tableIdent); + Table icebergTable = sparkTable.table(); + + if (toSnapshotId == null) { + toSnapshotId = icebergTable.currentSnapshot() != null ? 
icebergTable.currentSnapshot().snapshotId() : -1; + } + + List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(icebergTable, 0L, toSnapshotId); + + return toOutPutRow(snapshotIds); + } + + @Override + public String description() { + return "AncestorsOf"; + } + + private InternalRow[] toOutPutRow(List<Long> snapshotIds) { + if (snapshotIds.isEmpty()) { + return new InternalRow[0]; + } + + InternalRow[] internalRows = new InternalRow[snapshotIds.size()]; + internalRows[0] = newInternalRow(snapshotIds.get(0)); + for (int i = 1; i < snapshotIds.size(); i++) { + internalRows[i] = newInternalRow(snapshotIds.get(i)); + } + + return internalRows; + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java index fc9d4ae9a348..42545abe11d2 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java @@ -51,6 +51,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() { mapBuilder.put("migrate", MigrateTableProcedure::builder); mapBuilder.put("snapshot", SnapshotTableProcedure::builder); mapBuilder.put("add_files", AddFilesProcedure::builder); + mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder); return mapBuilder.build(); } From 1a6ee7026bf538d61c4892d9d213fdbfb589620d Mon Sep 17 00:00:00 2001 From: wulingqi Date: Thu, 21 Oct 2021 10:21:23 +0800 Subject: [PATCH 2/9] update ancestors result col name and add timestamp --- .../extensions/TestAncestorsOfProcedure.java | 25 ++++++++++++++----- .../procedures/AncestorsOfProcedure.java | 13 +++++----- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java index e8cc69fdc7d8..65e7efe8ec1f 100644 --- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java +++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java @@ -50,14 +50,16 @@ public void testAncestorOfUsingEmptyArgs() { Table table = validationCatalog.loadTable(tableIdent); Long currentSnapshotId = table.currentSnapshot().snapshotId(); + Long currentTimestamp = table.currentSnapshot().timestampMillis(); Long preSnapshotId = table.currentSnapshot().parentId(); + Long preTimeStamp = table.snapshot(table.currentSnapshot().parentId()).timestampMillis(); List<Object[]> output = sql("CALL %s.system.ancestors_of('%s')", catalogName, tableIdent); assertEquals( "Procedure output must match", - ImmutableList.of(row(currentSnapshotId), row(preSnapshotId)), + ImmutableList.of(row(currentSnapshotId, currentTimestamp), row(preSnapshotId, preTimeStamp)), output); } @@ -69,16 +71,18 @@ public void testAncestorOfUsingSnapshotId() { Table table = validationCatalog.loadTable(tableIdent); Long currentSnapshotId = table.currentSnapshot().snapshotId(); + Long currentTimestamp = table.currentSnapshot().timestampMillis(); Long preSnapshotId = table.currentSnapshot().parentId(); + Long preTimeStamp = table.snapshot(table.currentSnapshot().parentId()).timestampMillis(); assertEquals( "Procedure output must match", - ImmutableList.of(row(currentSnapshotId), row(preSnapshotId)), + ImmutableList.of(row(currentSnapshotId, currentTimestamp), row(preSnapshotId, preTimeStamp)),
sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, currentSnapshotId)); assertEquals( "Procedure output must match", - ImmutableList.of(row(preSnapshotId)), + ImmutableList.of(row(preSnapshotId, preTimeStamp)), sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, preSnapshotId)); } @@ -89,12 +93,15 @@ public void testAncestorOfWithRollBack() { sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); table.refresh(); Long firstSnapshotId = table.currentSnapshot().snapshotId(); + Long firstTimestamp = table.currentSnapshot().timestampMillis(); sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); table.refresh(); Long secondSnapshotId = table.currentSnapshot().snapshotId(); + Long secondTimestamp = table.currentSnapshot().timestampMillis(); sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableName); table.refresh(); Long thirdSnapshotId = table.currentSnapshot().snapshotId(); + Long thirdTimestamp = table.currentSnapshot().timestampMillis(); // roll back sql("CALL %s.system.rollback_to_snapshot('%s', %dL)", @@ -103,15 +110,20 @@ public void testAncestorOfWithRollBack() { sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName); table.refresh(); Long fourthSnapshotId = table.currentSnapshot().snapshotId(); + Long fourthTimestamp = table.currentSnapshot().timestampMillis(); assertEquals( "Procedure output must match", - ImmutableList.of(row(fourthSnapshotId), row(secondSnapshotId), row(firstSnapshotId)), + ImmutableList.of(row(fourthSnapshotId, fourthTimestamp), row(secondSnapshotId, secondTimestamp), row( + firstSnapshotId, + firstTimestamp)), sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, fourthSnapshotId)); assertEquals( "Procedure output must match", - ImmutableList.of(row(thirdSnapshotId), row(secondSnapshotId), row(firstSnapshotId)), + ImmutableList.of(row(thirdSnapshotId, thirdTimestamp), row(secondSnapshotId, secondTimestamp), row( + firstSnapshotId, + firstTimestamp)), sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, thirdSnapshotId)); } @@ -122,10 +134,11 @@ public void testAncestorOfUsingNamedArgs() { Table table = validationCatalog.loadTable(tableIdent); Long firstSnapshotId = table.currentSnapshot().snapshotId(); + Long firstTimestamp = table.currentSnapshot().timestampMillis(); assertEquals( "Procedure output must match", - ImmutableList.of(row(firstSnapshotId)), + ImmutableList.of(row(firstSnapshotId, firstTimestamp)), sql("CALL %s.system.ancestors_of(snapshot_id => %dL, table => '%s')", catalogName, firstSnapshotId, tableIdent)); } diff --git a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java index ddde86c677b7..bec5ff7c36e2 100644 --- a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java +++ b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java @@ -40,7 +40,8 @@ public class AncestorsOfProcedure extends BaseProcedure { }; private static final StructType OUTPUT_TYPE = new StructType(new StructField[] { - new StructField("snapshot_id_new_to_old ↓", DataTypes.LongType, true, Metadata.empty()), + new StructField("snapshot_id", DataTypes.LongType, true, Metadata.empty()), + new StructField("timestamp", DataTypes.LongType, true, Metadata.empty()) }); private AncestorsOfProcedure(TableCatalog tableCatalog) { @@ -80,7 +81,7 @@ public InternalRow[] call(InternalRow args) { List 
snapshotIds = SnapshotUtil.snapshotIdsBetween(icebergTable, 0L, toSnapshotId); - return toOutPutRow(snapshotIds); + return toOutPutRow(icebergTable, snapshotIds); } @Override @@ -88,15 +89,15 @@ public String description() { return "AncestorsOf"; } - private InternalRow[] toOutPutRow(List<Long> snapshotIds) { + private InternalRow[] toOutPutRow(Table table, List<Long> snapshotIds) { if (snapshotIds.isEmpty()) { return new InternalRow[0]; } InternalRow[] internalRows = new InternalRow[snapshotIds.size()]; - internalRows[0] = newInternalRow(snapshotIds.get(0)); - for (int i = 1; i < snapshotIds.size(); i++) { - internalRows[i] = newInternalRow(snapshotIds.get(i)); + for (int i = 0; i < snapshotIds.size(); i++) { + internalRows[i] = newInternalRow(snapshotIds.get(i), + table.snapshot(snapshotIds.get(i)).timestampMillis()); } return internalRows; From 538b0f1ddc88d922a2fafba35fbbe2b309caf4e7 Mon Sep 17 00:00:00 2001 From: wulingqi Date: Fri, 22 Oct 2021 09:34:37 +0800 Subject: [PATCH 3/9] spark: update code style and some performance nits --- .../extensions/TestAncestorsOfProcedure.java | 22 ++++++++++++------- .../procedures/AncestorsOfProcedure.java | 9 ++++---- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java index 65e7efe8ec1f..2a325b3dfce8 100644 --- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java +++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java @@ -59,7 +59,9 @@ public void testAncestorOfUsingEmptyArgs() { assertEquals( "Procedure output must match", - ImmutableList.of(row(currentSnapshotId, currentTimestamp), row(preSnapshotId, preTimeStamp)), + ImmutableList.of( + row(currentSnapshotId, currentTimestamp), + row(preSnapshotId, preTimeStamp)), output); } @@ -77,7 +79,9 @@ public void testAncestorOfUsingSnapshotId() { assertEquals( "Procedure output must match", - ImmutableList.of(row(currentSnapshotId, currentTimestamp), row(preSnapshotId, preTimeStamp)), + ImmutableList.of( + row(currentSnapshotId, currentTimestamp), + row(preSnapshotId, preTimeStamp)), sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, currentSnapshotId)); assertEquals( @@ -114,16 +118,18 @@ public void testAncestorOfWithRollBack() { assertEquals( "Procedure output must match", - ImmutableList.of(row(fourthSnapshotId, fourthTimestamp), row(secondSnapshotId, secondTimestamp), row( - firstSnapshotId, - firstTimestamp)), + ImmutableList.of( + row(fourthSnapshotId, fourthTimestamp), + row(secondSnapshotId, secondTimestamp), + row(firstSnapshotId, firstTimestamp)), sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, fourthSnapshotId)); assertEquals( "Procedure output must match", - ImmutableList.of(row(thirdSnapshotId, thirdTimestamp), row(secondSnapshotId, secondTimestamp), row( - firstSnapshotId, - firstTimestamp)), + ImmutableList.of( + row(thirdSnapshotId, thirdTimestamp), + row(secondSnapshotId, secondTimestamp), + row(firstSnapshotId, firstTimestamp)), sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, thirdSnapshotId)); } diff --git a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java
b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java index bec5ff7c36e2..389c032fde67 100644 --- a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java +++ b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java @@ -81,7 +81,7 @@ public InternalRow[] call(InternalRow args) { List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(icebergTable, 0L, toSnapshotId); - return toOutPutRow(icebergTable, snapshotIds); + return toOutputRow(icebergTable, snapshotIds); } @Override @@ -89,15 +89,16 @@ public String description() { return "AncestorsOf"; } - private InternalRow[] toOutPutRow(Table table, List<Long> snapshotIds) { + private InternalRow[] toOutputRow(Table table, List<Long> snapshotIds) { if (snapshotIds.isEmpty()) { return new InternalRow[0]; } InternalRow[] internalRows = new InternalRow[snapshotIds.size()]; for (int i = 0; i < snapshotIds.size(); i++) { - internalRows[i] = newInternalRow(snapshotIds.get(i), - table.snapshot(snapshotIds.get(i)).timestampMillis()); + Long snapshotId = snapshotIds.get(i); + internalRows[i] = newInternalRow(snapshotId, + table.snapshot(snapshotId).timestampMillis()); } return internalRows; From 68536a734c365cf5b07be4719c8ff8b10f3a4d48 Mon Sep 17 00:00:00 2001 From: wulingqi Date: Sat, 23 Oct 2021 11:09:33 +0800 Subject: [PATCH 4/9] spark: update code style --- .../iceberg/spark/procedures/AncestorsOfProcedure.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java index 389c032fde67..d91276957d17 100644 --- a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java +++ b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java @@ -37,7 +37,7 @@ public class AncestorsOfProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.optional("snapshot_id", DataTypes.LongType), - }; + }; private static final StructType OUTPUT_TYPE = new StructType(new StructField[] { new StructField("snapshot_id", DataTypes.LongType, true, Metadata.empty()), @@ -97,8 +97,7 @@ private InternalRow[] toOutputRow(Table table, List<Long> snapshotIds) { InternalRow[] internalRows = new InternalRow[snapshotIds.size()]; for (int i = 0; i < snapshotIds.size(); i++) { Long snapshotId = snapshotIds.get(i); - internalRows[i] = newInternalRow(snapshotId, - table.snapshot(snapshotId).timestampMillis()); + internalRows[i] = newInternalRow(snapshotId, table.snapshot(snapshotId).timestampMillis()); } return internalRows; From 226896921e123c0e22cea3a12604646d633244c2 Mon Sep 17 00:00:00 2001 From: wulingqi Date: Mon, 25 Oct 2021 23:06:45 +0800 Subject: [PATCH 5/9] doc: add the usage of ancestors_of to spark-procedures.md --- site/docs/spark-procedures.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index 1aea34420dda..04473b1ce442 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -407,3 +407,36 @@ CALL spark_catalog.system.add_files( source_table => '`parquet`.`path/to/table`' ) ``` + +## `Metadata information` + +### `ancestors_of` + +Get the snapshot
ancestors info by a particular snapshot + +#### Usage + +| Argument Name | Required? | Type | Description | +|---------------|-----------|------|-------------| +| `table` | ✔️ | string | Table which will get ancestors | +| `snapshot_id` | ✔️ | long | Snapshot ID to get ancestors | + +#### Output + +| Output Name | Type | Description | +| ------------|------|-------------| +| `snapshot_id` | long | the ancestor snapshot id | +| `timestamp` | long | timestamp | + +#### Examples + +Get all the snapshot ancestors of the current snapshot (default) +```sql +CALL spark_catalog.system.ancestors_of('db.tbl') +``` + +Get all the snapshot ancestors of a particular snapshot +```sql +CALL spark_catalog.system.ancestors_of('db.tbl', 1) +CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl') +``` From 125a27254580a7d40b8477423df8d47571532444 Mon Sep 17 00:00:00 2001 From: wulingqi Date: Tue, 26 Oct 2021 09:43:38 +0800 Subject: [PATCH 6/9] doc: update desc for ancestors_of procedure --- site/docs/spark-procedures.md | 4 ++-- .../iceberg/spark/extensions/TestAncestorsOfProcedure.java | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index 04473b1ce442..19375ff7545a 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -412,14 +412,14 @@ CALL spark_catalog.system.add_files( ### `ancestors_of` -Get the snapshot ancestors info by a particular snapshot +Report the live snapshot IDs of parents of a specified snapshot #### Usage | Argument Name | Required? | Type | Description | |---------------|-----------|------|-------------| | `table` | ✔️ | string | Table which will get ancestors | -| `snapshot_id` | ✔️ | long | Snapshot ID to get ancestors | +| `snapshot_id` | ️ | long | Snapshot ID to get ancestors | #### Output diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java index 2a325b3dfce8..baf464d94ad0 100644 --- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java +++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java @@ -30,10 +30,7 @@ public class TestAncestorsOfProcedure extends SparkExtensionsTestBase { - public TestAncestorsOfProcedure( - String catalogName, - String implementation, - Map<String, String> config) { + public TestAncestorsOfProcedure(String catalogName, String implementation, Map<String, String> config) { super(catalogName, implementation, config); } From 9eab05f0cf62a40f0f1c149b43727129008d0778 Mon Sep 17 00:00:00 2001 From: wulingqi Date: Thu, 28 Oct 2021 09:34:38 +0800 Subject: [PATCH 7/9] doc: update desc for ancestors_of procedure --- site/docs/spark-procedures.md | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index 19375ff7545a..ee67a6cd36d9 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -418,15 +418,28 @@ Report the live snapshot IDs of parents of a specified snapshot | Argument Name | Required?
| Type | Description | |---------------|-----------|------|-------------| -| `table` | ✔️ | string | Table which will get ancestors | -| `snapshot_id` | ️ | long | Snapshot ID to get ancestors | +| `table` | ✔️ | string | Name of the table to report live snapshot IDs | +| `snapshot_id` | ️ | long | Use a specified snapshot to get the live snapshot IDs of parents | + +> tip: Using snapshot_id +> +> I have snapshots +> ```shell +> A -> B -> C -> (D) +> ``` +> I then roll back to B and add C' and D', so I have the following live snapshots +> ```shell +> A -> B -> C -> D +> \ -> C' -> (D') +> ``` +> Now I can specify the ID of D to get the live snapshot IDs: A -> B -> C -> D #### Output | Output Name | Type | Description | | ------------|------|-------------| | `snapshot_id` | long | the ancestor snapshot id | -| `timestamp` | long | timestamp | +| `timestamp` | long | snapshot creation time | #### Examples From f08f5ef08dc2059230496f93163ff1138dd4c182 Mon Sep 17 00:00:00 2001 From: wulingqi Date: Fri, 29 Oct 2021 10:08:31 +0800 Subject: [PATCH 8/9] doc: update desc for ancestors_of procedure --- site/docs/spark-procedures.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index ee67a6cd36d9..ea78760f0063 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -423,16 +423,13 @@ Report the live snapshot IDs of parents of a specified snapshot > tip: Using snapshot_id > -> I have snapshots -> ```shell -> A -> B -> C -> (D) -> ``` -> I then roll back to B and add C' and D', so I have the following live snapshots +> Given a snapshot history with a rollback to B and the addition of C' -> D' > ```shell > A -> B -> C -> D > \ -> C' -> (D') > ``` -> Now I can specify the ID of D to get the live snapshot IDs: A -> B -> C -> D +> Not specifying the snapshot ID would return A -> B -> C' -> D', while providing the snapshot ID of +> D as an argument would return A -> B -> C -> D #### Output From 9dccb9527dab8c43b2ac715923acf563a896d810 Mon Sep 17 00:00:00 2001 From: wulingqi Date: Tue, 2 Nov 2021 09:38:27 +0800 Subject: [PATCH 9/9] spark: add ancestors_of to spark3.2 module --- .../extensions/TestAncestorsOfProcedure.java | 163 ++++++++++++++++++ .../procedures/AncestorsOfProcedure.java | 105 +++++++++++ .../spark/procedures/SparkProcedures.java | 1 + 3 files changed, 269 insertions(+) create mode 100644 spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java new file mode 100644 index 000000000000..baf464d94ad0 --- /dev/null +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.AnalysisException; +import org.junit.After; +import org.junit.Test; + +public class TestAncestorsOfProcedure extends SparkExtensionsTestBase { + + public TestAncestorsOfProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testAncestorOfUsingEmptyArgs() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Long currentSnapshotId = table.currentSnapshot().snapshotId(); + Long currentTimestamp = table.currentSnapshot().timestampMillis(); + Long preSnapshotId = table.currentSnapshot().parentId(); + Long preTimeStamp = table.snapshot(table.currentSnapshot().parentId()).timestampMillis(); + + List<Object[]> output = sql("CALL %s.system.ancestors_of('%s')", + catalogName, tableIdent); + + assertEquals( + "Procedure output must match", + ImmutableList.of( + row(currentSnapshotId, currentTimestamp), + row(preSnapshotId, preTimeStamp)), + output); + } + + @Test + public void testAncestorOfUsingSnapshotId() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Long currentSnapshotId = table.currentSnapshot().snapshotId(); + Long currentTimestamp = table.currentSnapshot().timestampMillis(); + Long preSnapshotId = table.currentSnapshot().parentId(); + Long preTimeStamp = table.snapshot(table.currentSnapshot().parentId()).timestampMillis(); + + assertEquals( + "Procedure output must match", + ImmutableList.of( + row(currentSnapshotId, currentTimestamp), + row(preSnapshotId, preTimeStamp)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, currentSnapshotId)); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(preSnapshotId, preTimeStamp)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, preSnapshotId)); + } + + @Test + public void testAncestorOfWithRollBack() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + Table table = validationCatalog.loadTable(tableIdent); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + table.refresh(); + Long firstSnapshotId = table.currentSnapshot().snapshotId(); + Long firstTimestamp = table.currentSnapshot().timestampMillis(); + sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); + table.refresh(); + Long secondSnapshotId = table.currentSnapshot().snapshotId(); + Long
secondTimestamp = table.currentSnapshot().timestampMillis(); + sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableName); + table.refresh(); + Long thirdSnapshotId = table.currentSnapshot().snapshotId(); + Long thirdTimestamp = table.currentSnapshot().timestampMillis(); + + // roll back + sql("CALL %s.system.rollback_to_snapshot('%s', %dL)", + catalogName, tableIdent, secondSnapshotId); + + sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName); + table.refresh(); + Long fourthSnapshotId = table.currentSnapshot().snapshotId(); + Long fourthTimestamp = table.currentSnapshot().timestampMillis(); + + assertEquals( + "Procedure output must match", + ImmutableList.of( + row(fourthSnapshotId, fourthTimestamp), + row(secondSnapshotId, secondTimestamp), + row(firstSnapshotId, firstTimestamp)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, fourthSnapshotId)); + + assertEquals( + "Procedure output must match", + ImmutableList.of( + row(thirdSnapshotId, thirdTimestamp), + row(secondSnapshotId, secondTimestamp), + row(firstSnapshotId, firstTimestamp)), + sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, thirdSnapshotId)); + } + + @Test + public void testAncestorOfUsingNamedArgs() { + sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); + sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Long firstSnapshotId = table.currentSnapshot().snapshotId(); + Long firstTimestamp = table.currentSnapshot().timestampMillis(); + + assertEquals( + "Procedure output must match", + ImmutableList.of(row(firstSnapshotId, firstTimestamp)), + sql("CALL %s.system.ancestors_of(snapshot_id => %dL, table => '%s')", + catalogName, firstSnapshotId, tableIdent)); + } + + @Test + public void testInvalidAncestorOfCases() { + AssertHelpers.assertThrows("Should reject calls without all required args", + AnalysisException.class, "Missing required parameters", + () -> sql("CALL %s.system.ancestors_of()", catalogName)); + + AssertHelpers.assertThrows("Should reject calls with empty table identifier", + IllegalArgumentException.class, "Cannot handle an empty identifier for argument table", + () -> sql("CALL %s.system.ancestors_of('')", catalogName)); + + AssertHelpers.assertThrows("Should reject calls with invalid arg types", + AnalysisException.class, "Wrong arg type for snapshot_id: cannot cast", + () -> sql("CALL %s.system.ancestors_of('%s', 1.1)", catalogName, tableIdent)); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java new file mode 100644 index 000000000000..ff0e11baf65d --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +public class AncestorsOfProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.optional("snapshot_id", DataTypes.LongType), + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[] { + new StructField("snapshot_id", DataTypes.LongType, true, Metadata.empty()), + new StructField("timestamp", DataTypes.LongType, true, Metadata.empty()) + }); + + private AncestorsOfProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new Builder() { + @Override + protected AncestorsOfProcedure doBuild() { + return new AncestorsOfProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + Long toSnapshotId = args.isNullAt(1) ? null : args.getLong(1); + + SparkTable sparkTable = loadSparkTable(tableIdent); + Table icebergTable = sparkTable.table(); + + if (toSnapshotId == null) { + toSnapshotId = icebergTable.currentSnapshot() != null ? 
icebergTable.currentSnapshot().snapshotId() : -1; + } + + List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(icebergTable, 0L, toSnapshotId); + + return toOutputRow(icebergTable, snapshotIds); + } + + @Override + public String description() { + return "AncestorsOf"; + } + + private InternalRow[] toOutputRow(Table table, List<Long> snapshotIds) { + if (snapshotIds.isEmpty()) { + return new InternalRow[0]; + } + + InternalRow[] internalRows = new InternalRow[snapshotIds.size()]; + for (int i = 0; i < snapshotIds.size(); i++) { + Long snapshotId = snapshotIds.get(i); + internalRows[i] = newInternalRow(snapshotId, table.snapshot(snapshotId).timestampMillis()); + } + + return internalRows; + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java index fc9d4ae9a348..42545abe11d2 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java @@ -51,6 +51,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() { mapBuilder.put("migrate", MigrateTableProcedure::builder); mapBuilder.put("snapshot", SnapshotTableProcedure::builder); mapBuilder.put("add_files", AddFilesProcedure::builder); + mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder); return mapBuilder.build(); }
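For reference: the procedure's newest-to-oldest output order comes from walking snapshot parent pointers, which is the lineage that `SnapshotUtil.snapshotIdsBetween` resolves in the patches above. A minimal sketch of that traversal, assuming an already-loaded Iceberg `Table` whose ancestor snapshots have not been expired (the `AncestorsSketch` helper below is illustrative only and is not part of this patch series):

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class AncestorsSketch {
  // Illustrative only: prints each ancestor's id and commit timestamp,
  // newest to oldest, the same pairs the ancestors_of procedure returns.
  static void printAncestors(Table table, long fromSnapshotId) {
    Long current = fromSnapshotId;
    while (current != null) {
      Snapshot snapshot = table.snapshot(current);
      System.out.printf("%d\t%d%n", snapshot.snapshotId(), snapshot.timestampMillis());
      current = snapshot.parentId(); // null at the root snapshot, which ends the walk
    }
  }
}
```

In SQL terms, `CALL spark_catalog.system.ancestors_of('db.tbl')` starts this walk at the current snapshot, while passing a `snapshot_id` starts it at that snapshot instead, which is why the rolled-back lineage in `testAncestorOfWithRollBack` stays reachable through its snapshot ID.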