diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java index f215aa033c5a..2e5e383baf42 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java @@ -18,18 +18,27 @@ */ package org.apache.iceberg.spark; +import org.apache.iceberg.spark.functions.SparkFunctions; import org.apache.iceberg.spark.procedures.SparkProcedures; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; import org.apache.iceberg.spark.source.HasIcebergCatalog; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.StagingTableCatalog; import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.connector.iceberg.catalog.Procedure; import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog; abstract class BaseCatalog - implements StagingTableCatalog, ProcedureCatalog, SupportsNamespaces, HasIcebergCatalog { + implements StagingTableCatalog, + ProcedureCatalog, + SupportsNamespaces, + HasIcebergCatalog, + FunctionCatalog { @Override public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException { @@ -38,7 +47,7 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException // namespace resolution is case insensitive until we have a way to configure case sensitivity in // catalogs - if (namespace.length == 1 && namespace[0].equalsIgnoreCase("system")) { + if (isSystemNamespace(namespace)) { ProcedureBuilder builder = SparkProcedures.newBuilder(name); if (builder != null) { return builder.withTableCatalog(this).build(); @@ -47,4 +56,40 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException throw new NoSuchProcedureException(ident); } + + @Override + public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + if (namespace.length == 0 || isSystemNamespace(namespace)) { + return SparkFunctions.list().stream() + .map(name -> Identifier.of(namespace, name)) + .toArray(Identifier[]::new); + } else if (namespaceExists(namespace)) { + return new Identifier[0]; + } + + throw new NoSuchNamespaceException(namespace); + } + + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + String[] namespace = ident.namespace(); + String name = ident.name(); + + // Allow for empty namespace, as Spark's storage partitioned joins look up + // the corresponding functions to generate transforms for partitioning + // with an empty namespace, such as `bucket`. + // Otherwise, use `system` namespace. + if (namespace.length == 0 || isSystemNamespace(namespace)) { + UnboundFunction func = SparkFunctions.load(name); + if (func != null) { + return func; + } + } + + throw new NoSuchFunctionException(ident); + } + + private static boolean isSystemNamespace(String[] namespace) { + return namespace.length == 1 && namespace[0].equalsIgnoreCase("system"); + } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index ebf12cb2c22e..444244e38b42 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -27,7 +27,6 @@ import org.apache.iceberg.spark.source.HasIcebergCatalog; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; -import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; @@ -42,7 +41,6 @@ import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; -import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -379,14 +377,4 @@ public Catalog icebergCatalog() { "Cannot return underlying Iceberg Catalog, wrapped catalog does not contain an Iceberg Catalog"); return ((HasIcebergCatalog) icebergCatalog).icebergCatalog(); } - - @Override - public Identifier[] listFunctions(String[] namespace) { - return new Identifier[0]; - } - - @Override - public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { - throw new NoSuchFunctionException(ident); - } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java new file mode 100644 index 000000000000..9cd059377ce3 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.functions; + +import org.apache.iceberg.IcebergBuild; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.functions.BoundFunction; +import org.apache.spark.sql.connector.catalog.functions.ScalarFunction; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A function for use in SQL that returns the current Iceberg version, e.g. {@code SELECT + * system.iceberg_version()} will return a String such as "0.14.0" or "0.15.0-SNAPSHOT" + */ +public class IcebergVersionFunction implements UnboundFunction { + @Override + public BoundFunction bind(StructType inputType) { + if (inputType.fields().length > 0) { + throw new UnsupportedOperationException( + String.format("Cannot bind: %s does not accept arguments", name())); + } + + return new IcebergVersionFunctionImpl(); + } + + @Override + public String description() { + return name() + " - Returns the runtime Iceberg version"; + } + + @Override + public String name() { + return "iceberg_version"; + } + + // Implementing class cannot be private, otherwise Spark is unable to access the static invoke + // function during code-gen and calling the function fails + static class IcebergVersionFunctionImpl implements ScalarFunction { + private static final UTF8String VERSION = UTF8String.fromString(IcebergBuild.version()); + + // magic function used in code-gen. must be named `invoke`. + public static UTF8String invoke() { + return VERSION; + } + + @Override + public DataType[] inputTypes() { + return new DataType[0]; + } + + @Override + public DataType resultType() { + return DataTypes.StringType; + } + + @Override + public boolean isResultNullable() { + return false; + } + + @Override + public String canonicalName() { + return "iceberg." + name(); + } + + @Override + public String name() { + return "iceberg_version"; + } + + @Override + public UTF8String produceResult(InternalRow input) { + return invoke(); + } + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java new file mode 100644 index 000000000000..44e5db51c7bb --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.functions; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; + +public class SparkFunctions { + + private SparkFunctions() {} + + private static final Map FUNCTIONS = + ImmutableMap.of("iceberg_version", new IcebergVersionFunction()); + + private static final List FUNCTION_NAMES = ImmutableList.copyOf(FUNCTIONS.keySet()); + + // Functions that are added to all Iceberg catalogs should be accessed with the `system` + // namespace. They can also be accessed with no namespace at all if qualified with the + // catalog name, e.g. my_hadoop_catalog.iceberg_version(). + // As namespace resolution is handled by those rules in BaseCatalog, a list of names + // alone is returned. + public static List list() { + return FUNCTION_NAMES; + } + + public static UnboundFunction load(String name) { + // function resolution is case-insensitive to match the existing Spark behavior for functions + return FUNCTIONS.get(name.toLowerCase(Locale.ROOT)); + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java new file mode 100644 index 000000000000..9dcd59cb4f11 --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.functions.IcebergVersionFunction; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.connector.catalog.FunctionCatalog; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestFunctionCatalog extends SparkTestBaseWithCatalog { + private static final String[] EMPTY_NAMESPACE = new String[] {}; + private static final String[] SYSTEM_NAMESPACE = new String[] {"system"}; + private static final String[] DEFAULT_NAMESPACE = new String[] {"default"}; + private static final String[] DB_NAMESPACE = new String[] {"db"}; + private final FunctionCatalog asFunctionCatalog; + + public TestFunctionCatalog() { + this.asFunctionCatalog = castToFunctionCatalog(catalogName); + } + + @Before + public void createDefaultNamespace() { + sql("CREATE NAMESPACE IF NOT EXISTS %s", catalogName + ".default"); + } + + @After + public void dropDefaultNamespace() { + sql("DROP NAMESPACE IF EXISTS %s", catalogName + ".default"); + } + + @Test + public void testListFunctionsViaCatalog() throws NoSuchNamespaceException { + Assertions.assertThat(asFunctionCatalog.listFunctions(EMPTY_NAMESPACE)) + .anyMatch(func -> "iceberg_version".equals(func.name())); + + Assertions.assertThat(asFunctionCatalog.listFunctions(SYSTEM_NAMESPACE)) + .anyMatch(func -> "iceberg_version".equals(func.name())); + + Assert.assertArrayEquals( + "Listing functions in an existing namespace that's not system should not throw", + new Identifier[0], + asFunctionCatalog.listFunctions(DEFAULT_NAMESPACE)); + + AssertHelpers.assertThrows( + "Listing functions in a namespace that does not exist should throw", + NoSuchNamespaceException.class, + "Namespace 'db' not found", + () -> asFunctionCatalog.listFunctions(DB_NAMESPACE)); + } + + @Test + public void testLoadFunctions() throws NoSuchFunctionException { + for (String[] namespace : ImmutableList.of(EMPTY_NAMESPACE, SYSTEM_NAMESPACE)) { + Identifier identifier = Identifier.of(namespace, "iceberg_version"); + UnboundFunction func = asFunctionCatalog.loadFunction(identifier); + + Assertions.assertThat(func) + .isNotNull() + .isInstanceOf(UnboundFunction.class) + .isExactlyInstanceOf(IcebergVersionFunction.class); + } + + AssertHelpers.assertThrows( + "Cannot load a function if it's not used with the system namespace or the empty namespace", + NoSuchFunctionException.class, + "Undefined function: default.iceberg_version", + () -> asFunctionCatalog.loadFunction(Identifier.of(DEFAULT_NAMESPACE, "iceberg_version"))); + + Identifier undefinedFunction = Identifier.of(SYSTEM_NAMESPACE, "undefined_function"); + AssertHelpers.assertThrows( + "Cannot load a function that does not exist", + NoSuchFunctionException.class, + "Undefined function: system.undefined_function", + () -> asFunctionCatalog.loadFunction(undefinedFunction)); + + AssertHelpers.assertThrows( + "Using an undefined function from SQL should fail analysis", + AnalysisException.class, + "Undefined function", + () -> sql("SELECT undefined_function(1, 2)")); + } + + @Test + public void testCallingFunctionInSQLEndToEnd() { + String buildVersion = IcebergBuild.version(); + + Assert.assertEquals( + "Should be able to use the Iceberg version function from the fully qualified system namespace", + buildVersion, + scalarSql("SELECT %s.system.iceberg_version()", catalogName)); + + Assert.assertEquals( + "Should be able to use the Iceberg version function when fully qualified without specifying a namespace", + buildVersion, + scalarSql("SELECT %s.iceberg_version()", catalogName)); + + sql("USE %s", catalogName); + + Assert.assertEquals( + "Should be able to call iceberg_version from system namespace without fully qualified name when using Iceberg catalog", + buildVersion, + scalarSql("SELECT system.iceberg_version()")); + + Assert.assertEquals( + "Should be able to call iceberg_version from empty namespace without fully qualified name when using Iceberg catalog", + buildVersion, + scalarSql("SELECT iceberg_version()")); + } + + private FunctionCatalog castToFunctionCatalog(String name) { + return (FunctionCatalog) spark.sessionState().catalogManager().catalog(name); + } +}