diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index cfa390221ecc..38e0a929f405 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -20,11 +20,13 @@
 package org.apache.iceberg.spark.source;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
@@ -42,17 +44,25 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
 import org.apache.spark.sql.connector.catalog.MetadataColumn;
 import org.apache.spark.sql.connector.catalog.SupportsDelete;
 import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -72,7 +82,8 @@
 import org.slf4j.LoggerFactory;
 
 public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
-    SupportsRead, SupportsWrite, SupportsDelete, SupportsRowLevelOperations, SupportsMetadataColumns {
+    SupportsRead, SupportsWrite, SupportsDelete, SupportsRowLevelOperations, SupportsMetadataColumns,
+    SupportsPartitionManagement {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);
 
@@ -283,6 +294,65 @@ public void deleteWhere(Filter[] filters) {
     }
   }
 
+  @Override
+  public StructType partitionSchema() {
+    return (StructType) SparkSchemaUtil.convert(Partitioning.partitionType(table()));
+  }
+
+  @Override
+  public void createPartition(InternalRow ident, Map<String, String> properties) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Cannot explicitly create partitions in Iceberg tables");
+  }
+
+  @Override
+  public boolean dropPartition(InternalRow ident) {
+    throw new UnsupportedOperationException("Cannot explicitly drop partitions in Iceberg tables");
+  }
+
+  @Override
+  public void replacePartitionMetadata(InternalRow ident, Map<String, String> properties)
+      throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Iceberg partitions do not support metadata");
+  }
+
+  @Override
+  public Map<String, String> loadPartitionMetadata(InternalRow ident) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Iceberg partitions do not support metadata");
+  }
+
+  @Override
+  public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident) {
+    // support SHOW PARTITIONS: collect the PARTITIONS metadata table and filter on the driver
+    List<InternalRow> rows = Lists.newArrayList();
+    Dataset<Row> df = SparkTableUtil.loadMetadataTable(sparkSession(), icebergTable, MetadataTableType.PARTITIONS);
+    if (names.length > 0) {
+      StructType schema = partitionSchema();
+      df.collectAsList().forEach(row -> {
+        GenericRowWithSchema genericRow = (GenericRowWithSchema) row.apply(0);
+        boolean exists = true;
+        int index = 0;
+        while (index < names.length) {
+          DataType dataType = schema.apply(names[index]).dataType();
+          int fieldIndex = schema.fieldIndex(names[index]);
+          if (!genericRow.values()[fieldIndex].equals(ident.get(index, dataType))) {
+            exists = false;
+            break;
+          }
+          index += 1;
+        }
+        if (exists) {
+          rows.add(new GenericInternalRow(genericRow.values()));
+        }
+      });
+    } else {
+      df.collectAsList().forEach(row -> {
+        GenericRowWithSchema genericRow = (GenericRowWithSchema) row.apply(0);
+        rows.add(new GenericInternalRow(genericRow.values()));
+      });
+    }
+    return rows.toArray(new InternalRow[0]);
+  }
+
   @Override
   public String toString() {
     return icebergTable.toString();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
index 9223797ada32..7d015a936894 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
@@ -21,6 +21,7 @@
 
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.source.SimpleRecord;
@@ -176,4 +177,27 @@ public void testViewsReturnRecentResults() {
         ImmutableList.of(row(1L, "a"), row(1L, "a")),
         sql("SELECT * FROM tmp"));
   }
+
+  @Test
+  public void testAddPartition() {
+    // only check the V2 command [IF NOT EXISTS] syntax; creating partitions is unsupported
+    AssertHelpers.assertThrows("Cannot explicitly create partitions in Iceberg tables",
+        UnsupportedOperationException.class,
+        () -> sql("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (id_trunc=2)", tableName));
+  }
+
+  @Test
+  public void testDropPartition() {
+    // only check the V2 command [IF EXISTS] syntax; dropping partitions is unsupported
+    AssertHelpers.assertThrows("Cannot explicitly drop partitions in Iceberg tables",
+        UnsupportedOperationException.class,
+        () -> sql("ALTER TABLE %s DROP IF EXISTS PARTITION (id_trunc=0)", tableName));
+  }
+
+  @Test
+  public void testShowPartitions() {
+    Assert.assertEquals("Should have 2 rows", 2L, sql("SHOW PARTITIONS %s", tableName).size());
+    Assert.assertEquals("Should have 1 row", 1L,
+        sql("SHOW PARTITIONS %s PARTITION (id_trunc=0)", tableName).size());
+  }
 }