diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index c1521ebfe7..842ef9b395 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -50,7 +50,8 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl
   override def loadTable(ident: Identifier): Table = {
     try {
       val tablePath = toTablePath(ident)
-      SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig)
+      val table = new SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig, admin)
+      table
     } catch {
       case e: ExecutionException if e.getCause.isInstanceOf[TableNotExistException] =>
         throw new NoSuchTableException(ident)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 6d31f6fbcd..93c51d1e0b 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.spark
 
+import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.config.{Configuration => FlussConfiguration}
 import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
@@ -25,8 +26,12 @@ import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBu
 
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 
-case class SparkTable(tablePath: TablePath, tableInfo: TableInfo, flussConfig: FlussConfiguration)
-  extends AbstractSparkTable(tableInfo)
+class SparkTable(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    flussConfig: FlussConfiguration,
+    admin: Admin)
+  extends AbstractSparkTable(admin, tableInfo)
   with SupportsFlussPartitionManagement
   with SupportsWrite {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index dc03e94b44..33f73f600e 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -17,7 +17,9 @@
 
 package org.apache.fluss.spark.catalog
 
-import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.SparkConversions
 
 import org.apache.spark.sql.CatalogV2UtilShim
@@ -29,8 +31,7 @@
 import java.util
 
 import scala.collection.JavaConverters._
 
-abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {
-
+abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) extends Table {
   protected lazy val _schema: StructType =
     SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
index ed888e5fe9..8d9578b98b 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala
@@ -17,22 +17,33 @@
 
 package org.apache.fluss.spark.catalog
 
+import org.apache.fluss.metadata.PartitionSpec
+
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsPartitionManagement {
+  import SupportsFlussPartitionManagement._
 
   override def partitionSchema(): StructType = _partitionSchema
 
   override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
-    throw new UnsupportedOperationException("Creating partition is not supported")
+    val partitionSpec = toPartitionSpec(ident, partitionSchema())
+    admin.createPartition(tableInfo.getTablePath, partitionSpec, false).get()
   }
 
   override def dropPartition(ident: InternalRow): Boolean = {
-    throw new UnsupportedOperationException("Dropping partition is not supported")
+    val partitionSpec = toPartitionSpec(ident, partitionSchema())
+    admin.dropPartition(tableInfo.getTablePath, partitionSpec, false).get()
+    true
   }
 
   override def replacePartitionMetadata(
@@ -48,6 +59,67 @@ trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsP
   override def listPartitionIdentifiers(
       names: Array[String],
       ident: InternalRow): Array[InternalRow] = {
-    throw new UnsupportedOperationException("Listing partition is not supported")
+    assert(
+      names.length == ident.numFields,
+      s"Number of partition names (${names.length}) must be equal to " +
+        s"the number of partition values (${ident.numFields})."
+    )
+    val schema = partitionSchema()
+    assert(
+      names.forall(fieldName => schema.fieldNames.contains(fieldName)),
+      s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " +
+        s"the partition schema '${schema.sql}'."
+    )
+
+    val flussPartitionRows = admin
+      .listPartitionInfos(tableInfo.getTablePath)
+      .get()
+      .asScala
+      .map(p => toInternalRow(p.getPartitionSpec, schema))
+
+    val indexes = names.map(schema.fieldIndex)
+    val dataTypes = names.map(schema(_).dataType)
+    val currentRow = new GenericInternalRow(new Array[Any](names.length))
+    flussPartitionRows.filter {
+      partRow =>
+        for (i <- names.indices) {
+          currentRow.values(i) = partRow.get(indexes(i), dataTypes(i))
+        }
+        currentRow == ident
+    }.toArray
+  }
+}
+
+object SupportsFlussPartitionManagement {
+  private def toInternalRow(
+      partitionSpec: PartitionSpec,
+      partitionSchema: StructType): InternalRow = {
+    val row = new SpecificInternalRow(partitionSchema)
+    for ((field, i) <- partitionSchema.fields.zipWithIndex) {
+      val partValue = partitionSpec.getSpecMap.get(field.name)
+      val value = field.dataType match {
+        case dt =>
+          // TODO Support more types when needed.
+          PhysicalDataType(field.dataType) match {
+            case PhysicalBooleanType => partValue.toBoolean
+            case PhysicalIntegerType => partValue.toInt
+            case PhysicalDoubleType => partValue.toDouble
+            case PhysicalFloatType => partValue.toFloat
+            case PhysicalLongType => partValue.toLong
+            case PhysicalShortType => partValue.toShort
+            case PhysicalStringType => UTF8String.fromString(partValue)
+          }
+      }
+      row.update(i, value)
+    }
+    row
+  }
+
+  private def toPartitionSpec(row: InternalRow, partitionSchema: StructType): PartitionSpec = {
+    val map = partitionSchema.fields.zipWithIndex.map {
+      case (field, idx) =>
+        (field.name, row.get(idx, field.dataType).toString)
+    }.toMap
+    new PartitionSpec(map.asJava)
   }
 }
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
similarity index 72%
rename from fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
index 4f2fc877fb..4c4bec355a 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkCatalogTest.scala
@@ -17,16 +17,17 @@
 
 package org.apache.fluss.spark
 
-import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, TablePath}
+import org.apache.fluss.metadata._
 import org.apache.fluss.types.{DataTypes, RowType}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.assertj.core.api.Assertions.{assertThat, assertThatList}
 
 import scala.collection.JavaConverters._
 
-class FlussCatalogTest extends FlussSparkTestBase {
+class SparkCatalogTest extends FlussSparkTestBase {
 
   test("Catalog: namespaces") {
     // Always a default database 'fluss'.
@@ -190,4 +191,63 @@ class FlussCatalogTest extends FlussSparkTestBase {
     admin.dropDatabase(dbName, true, true).get()
     checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
   }
+
+  test("Partition: show partitions") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")
+      sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c', 'c', 3)")
+
+      var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"), Row("pt1=c/pt2=3"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+      expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"))
+      checkAnswer(sql(s"SHOW PARTITIONS t PARTITION (pt1 = 'a')"), expect)
+    }
+  }
+
+  test("Partition: add partition") {
+    withTable("t") {
+      sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")
+
+      // add from sparksql
+      sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)")
+      var expect = Seq(Row("pt1=b/pt2=1"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+      sql(s"ALTER TABLE t ADD IF NOT EXISTS PARTITION (pt1 = 'b', pt2 = 1)")
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      // add from fluss
+      val map = Map("pt1" -> "b", "pt2" -> "2")
+      admin.createPartition(createTablePath("t"), new PartitionSpec(map.asJava), false).get()
+      expect = Seq(Row("pt1=b/pt2=1"), Row("pt1=b/pt2=2"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)"))
+      intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt3 = 1)"))
+      intercept[PartitionsAlreadyExistException](
+        sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)"))
+    }
+  }
+
+  test("Partition: drop partition") {
+    withTable("t") {
+      sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")
+      sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c', 'c', 3)")
+
+      // drop from sparksql
+      sql(s"ALTER TABLE t DROP PARTITION (pt1 = 'a', pt2 = 2)")
+      var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=c/pt2=3"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+      sql(s"ALTER TABLE t DROP IF EXISTS PARTITION (pt1 = 'a', pt2 = 2)")
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      // drop from fluss
+      val map = Map("pt1" -> "c", "pt2" -> "3")
+      admin.dropPartition(createTablePath("t"), new PartitionSpec(map.asJava), false).get()
+      expect = Seq(Row("pt1=a/pt2=1"))
+      checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
+
+      // spark does not support drop partial partition
+      intercept[AnalysisException](sql(s"ALTER TABLE t DROP PARTITION (pt1 = 'a')"))
+    }
+  }
 }
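
Usage sketch (illustrative, not part of the patch): with this change, Spark partition DDL on a partitioned Fluss table is routed to the Fluss Admin client, as the new tests exercise. A minimal example, assuming a SparkSession already wired to the Fluss catalog; the table and partition names below are placeholders:

    // Assumes `spark` is a SparkSession configured with the Fluss catalog.
    spark.sql("CREATE TABLE t (id INT, name STRING, pt STRING) PARTITIONED BY (pt)")
    spark.sql("ALTER TABLE t ADD PARTITION (pt = 'a')")   // backed by Admin.createPartition
    spark.sql("SHOW PARTITIONS t").show()                 // backed by Admin.listPartitionInfos
    spark.sql("ALTER TABLE t DROP PARTITION (pt = 'a')")  // backed by Admin.dropPartition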