@@ -50,7 +50,8 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl
override def loadTable(ident: Identifier): Table = {
try {
val tablePath = toTablePath(ident)
SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig)
val table = new SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig, admin)
table
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[TableNotExistException] =>
throw new NoSuchTableException(ident)
@@ -17,6 +17,7 @@

package org.apache.fluss.spark

import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
@@ -25,8 +26,12 @@ import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBu
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}

case class SparkTable(tablePath: TablePath, tableInfo: TableInfo, flussConfig: FlussConfiguration)
extends AbstractSparkTable(tableInfo)
class SparkTable(
tablePath: TablePath,
tableInfo: TableInfo,
flussConfig: FlussConfiguration,
admin: Admin)
extends AbstractSparkTable(admin, tableInfo)
with SupportsFlussPartitionManagement
with SupportsWrite {

@@ -17,7 +17,9 @@

package org.apache.fluss.spark.catalog

import org.apache.fluss.metadata.TableInfo
import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.SparkConversions

import org.apache.spark.sql.CatalogV2UtilShim
@@ -29,8 +31,7 @@ import java.util

import scala.collection.JavaConverters._

abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {

abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) extends Table {
protected lazy val _schema: StructType =
SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)

@@ -17,22 +17,33 @@

package org.apache.fluss.spark.catalog

import org.apache.fluss.metadata.PartitionSpec

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

import java.util

import scala.collection.JavaConverters._

trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsPartitionManagement {
import SupportsFlussPartitionManagement._

override def partitionSchema(): StructType = _partitionSchema

override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
throw new UnsupportedOperationException("Creating partition is not supported")
val partitionSpec = toPartitionSpec(ident, partitionSchema())
admin.createPartition(tableInfo.getTablePath, partitionSpec, false).get()
}

override def dropPartition(ident: InternalRow): Boolean = {
throw new UnsupportedOperationException("Dropping partition is not supported")
val partitionSpec = toPartitionSpec(ident, partitionSchema())
admin.dropPartition(tableInfo.getTablePath, partitionSpec, false).get()
true
}

override def replacePartitionMetadata(
@@ -48,6 +59,67 @@ trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsP
override def listPartitionIdentifiers(
names: Array[String],
ident: InternalRow): Array[InternalRow] = {
throw new UnsupportedOperationException("Listing partition is not supported")
assert(
names.length == ident.numFields,
s"Number of partition names (${names.length}) must be equal to " +
s"the number of partition values (${ident.numFields})."
)
val schema = partitionSchema()
assert(
names.forall(fieldName => schema.fieldNames.contains(fieldName)),
s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " +
s"the partition schema '${schema.sql}'."
)

val flussPartitionRows = admin
.listPartitionInfos(tableInfo.getTablePath)
.get()
.asScala
.map(p => toInternalRow(p.getPartitionSpec, schema))

val indexes = names.map(schema.fieldIndex)
val dataTypes = names.map(schema(_).dataType)
val currentRow = new GenericInternalRow(new Array[Any](names.length))
flussPartitionRows.filter {
partRow =>
for (i <- names.indices) {
currentRow.values(i) = partRow.get(indexes(i), dataTypes(i))
}
currentRow == ident
}.toArray
}
}

object SupportsFlussPartitionManagement {
private def toInternalRow(
partitionSpec: PartitionSpec,
partitionSchema: StructType): InternalRow = {
val row = new SpecificInternalRow(partitionSchema)
Reviewer comment (Contributor): Let's keep it here temporarily. I will move this to another place later, in the batch-read support PR.

for ((field, i) <- partitionSchema.fields.zipWithIndex) {
val partValue = partitionSpec.getSpecMap.get(field.name)
val value = field.dataType match {
case dt =>
// TODO Support more types when needed.
PhysicalDataType(field.dataType) match {
case PhysicalBooleanType => partValue.toBoolean
case PhysicalIntegerType => partValue.toInt
case PhysicalDoubleType => partValue.toDouble
case PhysicalFloatType => partValue.toFloat
case PhysicalLongType => partValue.toLong
case PhysicalShortType => partValue.toShort
case PhysicalStringType => UTF8String.fromString(partValue)
}
}
row.update(i, value)
}
row
}

private def toPartitionSpec(row: InternalRow, partitionSchema: StructType): PartitionSpec = {
val map = partitionSchema.fields.zipWithIndex.map {
case (field, idx) =>
(field.name, row.get(idx, field.dataType).toString)
}.toMap
new PartitionSpec(map.asJava)
}
}
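
For orientation, here is a small, self-contained sketch of the conversion the private helpers above perform, assuming a two-column partition schema of (pt1 STRING, pt2 INT); it is illustrative only and does not call the private methods themselves.

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Assumed partition schema for this sketch: pt1 STRING, pt2 INT.
val partitionSchema = new StructType()
  .add("pt1", StringType)
  .add("pt2", IntegerType)

// Spark hands partition values to createPartition/dropPartition as an InternalRow,
// with string values encoded as UTF8String.
val ident = new GenericInternalRow(Array[Any](UTF8String.fromString("a"), 1))

// toPartitionSpec turns that row into the string-to-string map Fluss expects:
val specMap = partitionSchema.fields.zipWithIndex.map {
  case (field, idx) => field.name -> ident.get(idx, field.dataType).toString
}.toMap
// specMap == Map("pt1" -> "a", "pt2" -> "1")

// toInternalRow goes the other way when listing partitions, parsing each string
// back into the Spark physical type (UTF8String for strings, Int for integers, ...).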
@@ -17,16 +17,17 @@

package org.apache.fluss.spark

import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, TablePath}
import org.apache.fluss.metadata._
import org.apache.fluss.types.{DataTypes, RowType}

import org.apache.spark.sql.Row
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.connector.catalog.Identifier
import org.assertj.core.api.Assertions.{assertThat, assertThatList}

import scala.collection.JavaConverters._

class FlussCatalogTest extends FlussSparkTestBase {
class SparkCatalogTest extends FlussSparkTestBase {

test("Catalog: namespaces") {
// Always a default database 'fluss'.
@@ -190,4 +191,63 @@ class FlussCatalogTest extends FlussSparkTestBase {
admin.dropDatabase(dbName, true, true).get()
checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil)
}

test("Partition: show partitions") {
withTable("t") {
sql(s"CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")
sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c', 'c', 3)")

var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"), Row("pt1=c/pt2=3"))
checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
Reviewer suggestion (Contributor):
checkAnswer(sql(s"SHOW PARTITIONS t"), Row("pt1=a/pt2=1") :: Row("pt1=a/pt2=2") :: Row("pt1=c/pt2=3") :: Nil)

expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=a/pt2=2"))
checkAnswer(sql(s"SHOW PARTITIONS t PARTITION (pt1 = 'a')"), expect)
}
}

test("Partition: add partition") {
withTable("t") {
sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")

// add from sparksql
sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)")
var expect = Seq(Row("pt1=b/pt2=1"))
checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
sql(s"ALTER TABLE t ADD IF NOT EXISTS PARTITION (pt1 = 'b', pt2 = 1)")
checkAnswer(sql(s"SHOW PARTITIONS t"), expect)

// add from fluss
val map = Map("pt1" -> "b", "pt2" -> "2")
admin.createPartition(createTablePath("t"), new PartitionSpec(map.asJava), false).get()
expect = Seq(Row("pt1=b/pt2=1"), Row("pt1=b/pt2=2"))
checkAnswer(sql(s"SHOW PARTITIONS t"), expect)

intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)"))
intercept[AnalysisException](sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt3 = 1)"))
intercept[PartitionsAlreadyExistException](
sql(s"ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1)"))
}
}

test("Partition: drop partition") {
withTable("t") {
sql("CREATE TABLE t (id int, name string, pt1 string, pt2 int) PARTITIONED BY (pt1, pt2)")
sql(s"INSERT INTO t values(1, 'a', 'a', 1), (2, 'b', 'a', 2), (3, 'c', 'c', 3)")

// drop from sparksql
sql(s"ALTER TABLE t DROP PARTITION (pt1 = 'a', pt2 = 2)")
var expect = Seq(Row("pt1=a/pt2=1"), Row("pt1=c/pt2=3"))
checkAnswer(sql(s"SHOW PARTITIONS t"), expect)
sql(s"ALTER TABLE t DROP IF EXISTS PARTITION (pt1 = 'a', pt2 = 2)")
checkAnswer(sql(s"SHOW PARTITIONS t"), expect)

// drop from fluss
val map = Map("pt1" -> "c", "pt2" -> "3")
admin.dropPartition(createTablePath("t"), new PartitionSpec(map.asJava), false).get()
expect = Seq(Row("pt1=a/pt2=1"))
checkAnswer(sql(s"SHOW PARTITIONS t"), expect)

// spark does not support drop partial partition
intercept[AnalysisException](sql(s"ALTER TABLE t DROP PARTITION (pt1 = 'a')"))
}
}
}
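
As a rough summary of what the partition DDL exercised in these tests delegates to, the sketch below lists the underlying Fluss Admin calls. It assumes an already-connected Admin (as provided by FlussSparkTestBase) and an existing partitioned table; TablePath.of and the database name "fluss" are assumptions for illustration.

import org.apache.fluss.metadata.{PartitionSpec, TablePath}
import scala.collection.JavaConverters._

val tablePath = TablePath.of("fluss", "t") // hypothetical: database "fluss", table "t"
val spec = new PartitionSpec(Map("pt1" -> "b", "pt2" -> "1").asJava)

// ALTER TABLE t ADD PARTITION (pt1 = 'b', pt2 = 1) resolves to:
admin.createPartition(tablePath, spec, false).get()

// SHOW PARTITIONS t is answered from:
val partitionSpecs = admin.listPartitionInfos(tablePath).get().asScala.map(_.getPartitionSpec)

// ALTER TABLE t DROP PARTITION (pt1 = 'b', pt2 = 1) resolves to:
admin.dropPartition(tablePath, spec, false).get()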