diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 44aca898b696..78fe78742b45 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -245,6 +245,49 @@ public void testRewriteDataFilesWithFilter() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @Test + public void testRewriteDataFilesWithDeterministicTrueFilter() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + // select all 10 files for compaction + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')", + catalogName, tableIdent); + assertEquals( + "Action should rewrite 10 data files and add 1 data files", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(3); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDataFilesWithDeterministicFalseFilter() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + // select no files for compaction + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')", + catalogName, tableIdent); + assertEquals( + "Action should rewrite 0 data files and add 0 data files", + row(0, 0), + Arrays.copyOf(output.get(0), 2)); + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + } + @Test public void testRewriteDataFilesWithFilterOnPartitionTable() { createPartitionTable(); diff --git a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala index 554fa7f66dc6..9f53eae60aba 100644 --- a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala +++ b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala @@ -24,8 +24,10 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation object SparkExpressionConverter { @@ -44,6 +46,8 @@ object SparkExpressionConverter { val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan optimizedLogicalPlan.collectFirst { case filter: Filter => filter.condition + case dummyRelation: DummyRelation => Literal.TrueLiteral + case localRelation: LocalRelation => Literal.FalseLiteral }.getOrElse(throw new AnalysisException("Failed to find filter expression")) } diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 3c07e676a53b..3ed47d54d374 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -253,6 +253,49 @@ public void testRewriteDataFilesWithFilter() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @Test + public void testRewriteDataFilesWithDeterministicTrueFilter() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + // select all 10 files for compaction + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')", + catalogName, tableIdent); + assertEquals( + "Action should rewrite 10 data files and add 1 data files", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + // verify rewritten bytes separately + assertThat(output.get(0)).hasSize(4); + assertThat(output.get(0)[2]) + .isInstanceOf(Long.class) + .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + } + + @Test + public void testRewriteDataFilesWithDeterministicFalseFilter() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + // select no files for compaction + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')", + catalogName, tableIdent); + assertEquals( + "Action should rewrite 0 data files and add 0 data files", + row(0, 0), + Arrays.copyOf(output.get(0), 2)); + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + } + @Test public void testRewriteDataFilesWithFilterOnPartitionTable() { createPartitionTable(); diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala index 554fa7f66dc6..9f53eae60aba 100644 --- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala +++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala @@ -24,8 +24,10 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation object SparkExpressionConverter { @@ -44,6 +46,8 @@ object SparkExpressionConverter { val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan optimizedLogicalPlan.collectFirst { case filter: Filter => filter.condition + case dummyRelation: DummyRelation => Literal.TrueLiteral + case localRelation: LocalRelation => Literal.FalseLiteral }.getOrElse(throw new AnalysisException("Failed to find filter expression")) }