diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala index 1f0e164d8467..98361b8dadb6 100644 --- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala +++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala @@ -24,10 +24,12 @@ import org.apache.spark.sql.catalyst.expressions.BinaryComparison import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.plans.logical.Filter +import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.BINARY_COMPARISON import org.apache.spark.sql.catalyst.trees.TreePattern.FILTER +import org.apache.spark.sql.catalyst.trees.TreePattern.JOIN import org.apache.spark.sql.connector.catalog.functions.ScalarFunction import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType @@ -39,22 +41,31 @@ import org.apache.spark.sql.types.StructType */ object ReplaceStaticInvoke extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = - plan.transformWithPruning (_.containsAllPatterns(BINARY_COMPARISON, FILTER)) { - case filter @ Filter(condition, _) => - val newCondition = condition.transformWithPruning(_.containsPattern(BINARY_COMPARISON)) { - case c @ BinaryComparison(left: StaticInvoke, right) if canReplace(left) && right.foldable => - c.withNewChildren(Seq(replaceStaticInvoke(left), right)) + private val rule:PartialFunction[Expression, Expression] = { + case c@BinaryComparison(left: StaticInvoke, right) if canReplace(left) && right.foldable => + c.withNewChildren(Seq(replaceStaticInvoke(left), right)) - case c @ BinaryComparison(left, right: StaticInvoke) if canReplace(right) && left.foldable => - c.withNewChildren(Seq(left, replaceStaticInvoke(right))) - } + case c@BinaryComparison(left, right: StaticInvoke) if canReplace(right) && left.foldable => + c.withNewChildren(Seq(left, replaceStaticInvoke(right))) + } + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transformWithPruning(_.containsAnyPattern(FILTER, JOIN)) { + case filter @ Filter(condition, _) => + val newCondition = condition.transformWithPruning(_.containsPattern(BINARY_COMPARISON))(rule) if (newCondition fastEquals condition) { filter } else { filter.copy(condition = newCondition) } + case j @ Join(_, _, _, Some(condition), _) => + val newCondition = condition.transformWithPruning(_.containsPattern(BINARY_COMPARISON))(rule) + if (newCondition fastEquals condition) { + j + } else { + j.copy(condition = Some(newCondition)) + } + } } private def replaceStaticInvoke(invoke: StaticInvoke): Expression = { diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSPJWithBucketing.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSPJWithBucketing.java new file mode 100644 index 000000000000..af332a63bccd --- /dev/null +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSPJWithBucketing.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.Map; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestSPJWithBucketing extends SparkExtensionsTestBase { + + @Test + public void testMergeSPJwithCondition() { + testWithCondition( + " AND (" + + "(t.year_month='202306' AND t.day='01' AND testhive.system.bucket(4, t.id) = 0) OR\n" + + "(t.year_month='202306' AND t.day='01' AND testhive.system.bucket(4, t.id) = 1) OR\n" + + "(t.year_month='202306' AND t.day='02' AND testhive.system.bucket(4, t.id) = 0) OR\n" + + "(t.year_month='202307' AND t.day='01' AND testhive.system.bucket(4, t.id) = 3)\n" + + ")"); + } + + @Test + public void testMergeSPJwithoutCondition() { + testWithCondition(""); + } + + private void testWithCondition(String condition) { + createPartitionedTable(spark, targetTableName); + insertRecords(spark, targetTableName); + createPartitionedTable(spark, sourceTableName); + insertRecordsToUpdate(spark, sourceTableName); + int tasks = + executeAndCountTasks( + spark, + (s) -> + withSQLConf( + ENABLED_SPJ_SQL_CONF, + () -> + sql( + s, + "MERGE INTO %s t USING (SELECT * FROM %s) s \n" + + "ON t.id = s.id AND t.year_month = s.year_month AND t.day = s.day\n" + + "%s\n" + + "WHEN MATCHED THEN UPDATE SET\n" + + " t.data = s.data\n" + + "WHEN NOT MATCHED THEN INSERT *", + targetTableName, + sourceTableName, + condition))); + long affectedPartitions = + sql( + spark, + "SELECT DISTINCT(t.year_month, t.day, testhive.system.bucket(4, t.id)) FROM %s t WHERE 1=1 %s", + targetTableName, + condition) + .count(); + int shufflePartitions = Integer.parseInt(spark.conf().get("spark.sql.shuffle.partitions")); + Assertions.assertThat(tasks).isEqualTo(affectedPartitions * 2 + shufflePartitions); + } + + private final String sourceTableName; + private final String targetTableName; + + public TestSPJWithBucketing( + String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + sourceTableName = tablePrefix() + ".source"; + targetTableName = tablePrefix() + ".target"; + } + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + }, + }; + } + + private static Dataset sql(SparkSession sparkSession, String sqlFormat, Object... args) { + return sparkSession.sql(String.format(sqlFormat, args)); + } + + private static void createTable(SparkSession spark, String tableName, String partitionCol) { + sql( + spark, + "CREATE TABLE %s (id STRING, year_month STRING, day STRING, data STRING) USING iceberg PARTITIONED BY (%s)", + tableName, + partitionCol); + sql( + spark, + "ALTER TABLE %s SET TBLPROPERTIES ('write.merge.distribution-mode'='none')", + tableName); + } + + private static void insertRecords(SparkSession sparkSession, String tableName) { + sql( + spark, + "INSERT INTO TABLE %s VALUES %s", + tableName, + String.join( + ", ", + "('3', '202306', '01', 'data-0')", // 202306/01/0 + "('9', '202306', '01', 'data-0')", // 202306/01/1 + "('11', '202306', '01', 'data-0')", // 202306/01/2 + "('0', '202306', '01', 'data-0')", // 202306/01/3 + "('3', '202306', '02', 'data-0')", // 202306/02/0 + "('9', '202306', '02', 'data-0')", // 202306/02/1 + "('0', '202307', '01', 'data-0')" // 202307/01/3 + )); + } + + private static void insertRecordsToUpdate(SparkSession sparkSession, String tableName) { + sql( + spark, + "INSERT INTO TABLE %s VALUES %s", + tableName, + String.join( + ", ", + "('3', '202306', '01', 'data-1')", // 202306/01/0 + "('9', '202306', '01', 'data-1')", // 202306/01/1 + "('3', '202306', '02', 'data-1')", // 202306/02/0 + "('0', '202307', '01', 'data-1')" // 202307/01/3 + )); + } + + private static void createPartitionedTable(SparkSession spark, String tableName) { + createTable(spark, tableName, "year_month, day, bucket(4, id)"); + } + + @Before + public void before() { + sql("USE %s", catalogName); + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tablePrefix() + ".source"); + sql("DROP TABLE IF EXISTS %s", tablePrefix() + ".target"); + } + + private String tablePrefix() { + return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default"; + } + + private static final Map ENABLED_SPJ_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + public static int executeAndCountTasks(SparkSession spark, Consumer f) { + + CountTaskListener listener = new CountTaskListener(); + spark.sparkContext().addSparkListener(listener); + + f.accept(spark); + + try { + spark.sparkContext().listenerBus().waitUntilEmpty(); + } catch (TimeoutException e) { + throw new RuntimeException("Timeout while waiting for processing events", e); + } + + return listener.getTaskCount(); + } + + private static class CountTaskListener extends SparkListener { + private final AtomicInteger tasks = new AtomicInteger(0); + + @Override + public void onTaskEnd(SparkListenerTaskEnd taskEnd) { + tasks.incrementAndGet(); + } + + public int getTaskCount() { + return tasks.get(); + } + } +} diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java index 7f2857cce0b9..9048320ff09e 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.iceberg.expressions.ExpressionUtil; import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.source.PlanUtils; @@ -54,6 +55,8 @@ import org.junit.Before; import org.junit.Test; import org.junit.runners.Parameterized; +import scala.PartialFunction; +import scala.collection.JavaConverters; public class TestSystemFunctionPushDownDQL extends SparkExtensionsTestBase { public TestSystemFunctionPushDownDQL( @@ -264,6 +267,52 @@ public void testTruncateFunctionOnPartitionedTable() { testTruncateFunction(true); } + @Test + public void testBucketStringFunctionMergeOnMoRPartitionedTable() { + testBucketStringFunctionJoinOnPartitionedTable("mor"); + } + + @Test + public void testBucketStringFunctionMergeOnCoWPartitionedTable() { + testBucketStringFunctionJoinOnPartitionedTable("cow"); + } + + private void testBucketStringFunctionJoinOnPartitionedTable(String mergeMode) { + int target = 1; + createPartitionedTable(spark, tableName, "bucket(5, data)"); + if (mergeMode.equals("mor")) { + spark.sql( + String.format( + "ALTER TABLE %s SET TBLPROPERTIES ('write.merge.mode'='merge-on-read')", tableName)); + } + String query = + String.format( + "SELECT * FROM %s s1 FULL OUTER JOIN %s s2 ON s1.data = s2.data and system.bucket(5, s1.data) = %d", + tableName, tableName, target); + Dataset df = spark.sql(query); + + LogicalPlan plan = df.queryExecution().optimizedPlan(); + Stream expressions = JavaConverters.asJavaCollection(plan.expressions()).stream(); + Stream numOfStaticInvokes = + expressions.flatMap( + e -> + JavaConverters.asJavaCollection( + e.collect( + new PartialFunction() { + @Override + public boolean isDefinedAt(Expression x) { + return x instanceof StaticInvoke; + } + + @Override + public StaticInvoke apply(Expression v1) { + return (StaticInvoke) v1; + } + })) + .stream()); + Assertions.assertThat(numOfStaticInvokes.count()).isZero(); + } + private void testTruncateFunction(boolean partitioned) { String target = "data"; String query = diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSPJWithBucketing.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSPJWithBucketing.java new file mode 100644 index 000000000000..adcf1ff60423 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSPJWithBucketing.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.Map; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestSPJWithBucketing extends ExtensionsTestBase { + + private String sourceTableName() { + return tablePrefix() + ".source"; + } + + private String targetTableName() { + return tablePrefix() + ".target"; + } + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties(), + }, + }; + } + + @BeforeEach + public void before() { + super.before(); + sql("USE %s", catalogName); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", targetTableName()); + sql("DROP TABLE IF EXISTS %s", sourceTableName()); + } + + @TestTemplate + public void testMergeSPJwithCondition() { + testWithCondition( + " AND (" + + "(t.year_month='202306' AND t.day='01' AND testhive.system.bucket(4, t.id) = 0) OR\n" + + "(t.year_month='202306' AND t.day='01' AND testhive.system.bucket(4, t.id) = 1) OR\n" + + "(t.year_month='202306' AND t.day='02' AND testhive.system.bucket(4, t.id) = 0) OR\n" + + "(t.year_month='202307' AND t.day='01' AND testhive.system.bucket(4, t.id) = 3)\n" + + ")"); + } + + @TestTemplate + public void testMergeSPJwithoutCondition() { + testWithCondition(""); + } + + private void testWithCondition(String condition) { + createPartitionedTable(targetTableName()); + insertRecords(targetTableName()); + createPartitionedTable(sourceTableName()); + insertRecordsToUpdate(sourceTableName()); + int tasks = + executeAndCountTasks( + spark, + (s) -> + withSQLConf( + ENABLED_SPJ_SQL_CONF, + () -> + // id STRING, year_month STRING, day STRING, data STRING + sql( + s, + "MERGE INTO %s t USING (SELECT * FROM %s) s \n" + + "ON t.id = s.id AND t.year_month = s.year_month AND t.day = s.day\n" + + "%s\n" + + "WHEN MATCHED THEN UPDATE SET\n" + + " t.data = s.data\n" + + "WHEN NOT MATCHED THEN INSERT *", + targetTableName(), + sourceTableName(), + condition))); + long affectedPartitions = + sql( + spark, + "SELECT DISTINCT(t.year_month, t.day, testhive.system.bucket(4, t.id)) FROM %s t WHERE 1=1 %s", + targetTableName(), + condition) + .count(); + int shufflePartitions = Integer.parseInt(spark.conf().get("spark.sql.shuffle.partitions")); + Assertions.assertThat(tasks).isEqualTo(affectedPartitions * 2 + shufflePartitions); + } + + private static Dataset sql(SparkSession sparkSession, String sqlFormat, Object... args) { + return sparkSession.sql(String.format(sqlFormat, args)); + } + + private static void createTable(String tName, String partitionCol) { + sql( + spark, + "CREATE TABLE %s (id STRING, year_month STRING, day STRING, data STRING) USING iceberg PARTITIONED BY (%s)", + tName, + partitionCol); + sql(spark, "ALTER TABLE %s SET TBLPROPERTIES ('write.merge.distribution-mode'='none')", tName); + } + + private static void insertRecords(String tName) { + sql( + spark, + "INSERT INTO TABLE %s VALUES %s", + tName, + String.join( + ", ", + "('3', '202306', '01', 'data-0')", // 202306/01/0 + "('9', '202306', '01', 'data-0')", // 202306/01/1 + "('11', '202306', '01', 'data-0')", // 202306/01/2 + "('0', '202306', '01', 'data-0')", // 202306/01/3 + "('3', '202306', '02', 'data-0')", // 202306/02/0 + "('9', '202306', '02', 'data-0')", // 202306/02/1 + "('0', '202307', '01', 'data-0')" // 202307/01/3 + )); + } + + private static void insertRecordsToUpdate(String tName) { + sql( + spark, + "INSERT INTO TABLE %s VALUES %s", + tName, + String.join( + ", ", + "('3', '202306', '01', 'data-1')", // 202306/01/0 + "('9', '202306', '01', 'data-1')", // 202306/01/1 + "('3', '202306', '02', 'data-1')", // 202306/02/0 + "('0', '202307', '01', 'data-1')" // 202307/01/3 + )); + } + + private static void createPartitionedTable(String tName) { + createTable(tName, "year_month, day, bucket(4, id)"); + } + + private String tablePrefix() { + return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default"; + } + + private static final Map ENABLED_SPJ_SQL_CONF = + ImmutableMap.of( + SQLConf.V2_BUCKETING_ENABLED().key(), + "true", + SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(), + "true", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), + "false", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), + "-1", + SparkSQLProperties.PRESERVE_DATA_GROUPING, + "true"); + + public static int executeAndCountTasks(SparkSession spark, Consumer f) { + + CountTaskListener listener = new CountTaskListener(); + spark.sparkContext().addSparkListener(listener); + + f.accept(spark); + + try { + spark.sparkContext().listenerBus().waitUntilEmpty(); + } catch (TimeoutException e) { + throw new RuntimeException("Timeout while waiting for processing events", e); + } + + return listener.getTaskCount(); + } + + private static class CountTaskListener extends SparkListener { + private final AtomicInteger tasks = new AtomicInteger(0); + + @Override + public void onTaskEnd(SparkListenerTaskEnd taskEnd) { + tasks.incrementAndGet(); + } + + public int getTaskCount() { + return tasks.get(); + } + } +} diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java index f6102bab69b0..9b94a974f479 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java @@ -40,6 +40,8 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.List; +import java.util.stream.Stream; + import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.expressions.ExpressionUtil; @@ -55,6 +57,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; +import scala.PartialFunction; +import scala.collection.JavaConverters; @ExtendWith(ParameterizedTestExtension.class) public class TestSystemFunctionPushDownDQL extends ExtensionsTestBase { @@ -263,6 +267,49 @@ public void testTruncateFunctionOnPartitionedTable() { testTruncateFunction(true); } + @TestTemplate + public void testBucketStringFunctionFullOuterJoinUnpartitionedTable() { + createUnpartitionedTable(spark, tableName); + testBucketStringFunctionFullOuterJoin(); + } + + @TestTemplate + public void testBucketStringFunctionFullOuterJoinPartitionedTable() { + createPartitionedTable(spark, tableName, "bucket(5, data)"); + testBucketStringFunctionFullOuterJoin(); + } + + private void testBucketStringFunctionFullOuterJoin() { + int target = 1; + String query = + String.format( + "SELECT * FROM %s s1 FULL OUTER JOIN %s s2 ON s1.data = s2.data and system.bucket(5, s1.data) = %d", + tableName, tableName, target); + Dataset df = spark.sql(query); + + LogicalPlan optimizedPlan = df.queryExecution().optimizedPlan(); + Stream expressions = + JavaConverters.asJavaCollection(optimizedPlan.expressions()).stream(); + Stream numOfStaticInvokes = + expressions.flatMap( + e -> + JavaConverters.asJavaCollection( + e.collect( + new PartialFunction() { + @Override + public boolean isDefinedAt(Expression x) { + return x instanceof StaticInvoke; + } + + @Override + public StaticInvoke apply(Expression v1) { + return (StaticInvoke) v1; + } + })) + .stream()); + assertThat(numOfStaticInvokes.count()).isZero(); + } + private void testTruncateFunction(boolean partitioned) { String target = "data"; String query =