diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 223094d48593..45b4f013620c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -87,7 +87,14 @@ abstract class ExternalCatalog
    * Note: If the underlying implementation does not support altering a certain field,
    * this becomes a no-op.
    */
-  def alterDatabase(dbDefinition: CatalogDatabase): Unit
+  final def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
+    val db = dbDefinition.name
+    postToAll(AlterDatabasePreEvent(db))
+    doAlterDatabase(dbDefinition)
+    postToAll(AlterDatabaseEvent(db))
+  }
+
+  protected def doAlterDatabase(dbDefinition: CatalogDatabase): Unit
 
   def getDatabase(db: String): CatalogDatabase
 
@@ -147,7 +154,15 @@ abstract class ExternalCatalog
    * Note: If the underlying implementation does not support altering a certain field,
    * this becomes a no-op.
    */
-  def alterTable(tableDefinition: CatalogTable): Unit
+  final def alterTable(tableDefinition: CatalogTable): Unit = {
+    val db = tableDefinition.database
+    val name = tableDefinition.identifier.table
+    postToAll(AlterTablePreEvent(db, name, AlterTableKind.TABLE))
+    doAlterTable(tableDefinition)
+    postToAll(AlterTableEvent(db, name, AlterTableKind.TABLE))
+  }
+
+  protected def doAlterTable(tableDefinition: CatalogTable): Unit
 
   /**
    * Alter the data schema of a table identified by the provided database and table name. The new
@@ -158,10 +173,22 @@ abstract class ExternalCatalog
    * @param table Name of table to alter schema for
    * @param newDataSchema Updated data schema to be used for the table.
    */
-  def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
+  final def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.DATASCHEMA))
+    doAlterTableDataSchema(db, table, newDataSchema)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
+  }
+
+  protected def doAlterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
 
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
-  def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
+  final def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.STATS))
+    doAlterTableStats(db, table, stats)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.STATS))
+  }
+
+  protected def doAlterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
 
   def getTable(db: String, table: String): CatalogTable
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9504140d51e9..8eacfa058bd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -152,7 +152,7 @@ class InMemoryCatalog(
     }
   }
 
-  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+  override def doAlterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
     requireDbExists(dbDefinition.name)
     catalog(dbDefinition.name).db = dbDefinition
   }
@@ -294,7 +294,7 @@ class InMemoryCatalog(
     catalog(db).tables.remove(oldName)
   }
 
-  override def alterTable(tableDefinition: CatalogTable): Unit = synchronized {
+  override def doAlterTable(tableDefinition: CatalogTable): Unit = synchronized {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireTableExists(db, tableDefinition.identifier.table)
@@ -303,7 +303,7 @@
     catalog(db).tables(tableDefinition.identifier.table).table = newTableDefinition
   }
 
-  override def alterTableDataSchema(
+  override def doAlterTableDataSchema(
       db: String,
       table: String,
       newDataSchema: StructType): Unit = synchronized {
@@ -313,7 +313,7 @@
     catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
-  override def alterTableStats(
+  override def doAlterTableStats(
       db: String,
       table: String,
       stats: Option[CatalogStatistics]): Unit = synchronized {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
index 742a51e64038..e7d41644392d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -61,6 +61,16 @@ case class DropDatabasePreEvent(database: String) extends DatabaseEvent
  */
 case class DropDatabaseEvent(database: String) extends DatabaseEvent
 
+/**
+ * Event fired before a database is altered.
+ */
+case class AlterDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database is altered.
+ */
+case class AlterDatabaseEvent(database: String) extends DatabaseEvent
+
 /**
  * Event fired when a table is created, dropped or renamed.
  */
@@ -110,7 +120,33 @@ case class RenameTableEvent(
   extends TableEvent
 
 /**
- * Event fired when a function is created, dropped or renamed.
+ * String to indicate which part of a table is altered. If a plain alterTable API is called, then
+ * the kind will generally be TABLE.
+ */
+object AlterTableKind extends Enumeration {
+  val TABLE = "table"
+  val DATASCHEMA = "dataSchema"
+  val STATS = "stats"
+}
+
+/**
+ * Event fired before a table is altered.
+ */
+case class AlterTablePreEvent(
+    database: String,
+    name: String,
+    kind: String) extends TableEvent
+
+/**
+ * Event fired after a table is altered.
+ */
+case class AlterTableEvent(
+    database: String,
+    name: String,
+    kind: String) extends TableEvent
+
+/**
+ * Event fired when a function is created, dropped, altered or renamed.
  */
 trait FunctionEvent extends DatabaseEvent {
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 087c26f23f38..1acbe34d9a07 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -75,6 +75,11 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
     }
     checkEvents(CreateDatabasePreEvent("db5") :: Nil)
 
+    // ALTER
+    val newDbDefinition = dbDefinition.copy(description = "test")
+    catalog.alterDatabase(newDbDefinition)
+    checkEvents(AlterDatabasePreEvent("db5") :: AlterDatabaseEvent("db5") :: Nil)
+
     // DROP
     intercept[AnalysisException] {
       catalog.dropDatabase("db4", ignoreIfNotExists = false, cascade = false)
@@ -119,6 +124,23 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
     }
     checkEvents(CreateTablePreEvent("db5", "tbl1") :: Nil)
 
+    // ALTER
+    val newTableDefinition = tableDefinition.copy(tableType = CatalogTableType.EXTERNAL)
+    catalog.alterTable(newTableDefinition)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.TABLE) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.TABLE) :: Nil)
+
+    // ALTER schema
+    val newSchema = new StructType().add("id", "long", nullable = false)
+    catalog.alterTableDataSchema("db5", "tbl1", newSchema)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
+
+    // ALTER stats
+    catalog.alterTableStats("db5", "tbl1", None)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.STATS) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.STATS) :: Nil)
+
     // RENAME
     catalog.renameTable("db5", "tbl1", "tbl2")
     checkEvents(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f8a947bf527e..7cd772544a96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -177,7 +177,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    *
    * Note: As of now, this only supports altering database properties!
    */
-  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+  override def doAlterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
     val existingDb = getDatabase(dbDefinition.name)
     if (existingDb.properties == dbDefinition.properties) {
       logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
@@ -540,7 +540,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * Note: As of now, this doesn't support altering table schema, partition column names and bucket
    * specification. We will ignore them even if users do specify different values for these fields.
    */
-  override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
+  override def doAlterTable(tableDefinition: CatalogTable): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireTableExists(db, tableDefinition.identifier.table)
@@ -619,8 +619,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  override def alterTableDataSchema(
-      db: String, table: String, newDataSchema: StructType): Unit = withClient {
+  /**
+   * Alter the data schema of a table identified by the provided database and table name. The new
+   * data schema should not have conflicting column names with the existing partition columns, and
+   * should still contain all the existing data columns.
+   */
+  override def doAlterTableDataSchema(
+      db: String,
+      table: String,
+      newDataSchema: StructType): Unit = withClient {
     requireTableExists(db, table)
     val oldTable = getTable(db, table)
     verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema)
@@ -648,7 +655,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  override def alterTableStats(
+  /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
+  override def doAlterTableStats(
       db: String,
       table: String,
       stats: Option[CatalogStatistics]): Unit = withClient {
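
Taken together, the patch is the template method pattern applied to every alter path: each public entry point (`alterDatabase`, `alterTable`, `alterTableDataSchema`, `alterTableStats`) becomes `final`, posts a pre-event, delegates the actual mutation to a new `protected do*` hook, and posts a post-event, so `InMemoryCatalog` and `HiveExternalCatalog` override only the hooks and can no longer bypass the event bookkeeping. Below is a minimal, self-contained Scala sketch of that pattern; `EventPostingCatalog` and the function-based listener list are illustrative stand-ins, not Spark APIs:

```scala
// Self-contained sketch of the template-method pattern this patch applies.
// EventPostingCatalog is a hypothetical name; the real ExternalCatalog uses
// Spark's ListenerBus and richer event types instead of this listener list.
trait CatalogEvent
case class AlterDatabasePreEvent(database: String) extends CatalogEvent
case class AlterDatabaseEvent(database: String) extends CatalogEvent

abstract class EventPostingCatalog {
  private var listeners: List[CatalogEvent => Unit] = Nil

  def addListener(listener: CatalogEvent => Unit): Unit = listeners ::= listener

  protected def postToAll(event: CatalogEvent): Unit = listeners.foreach(_(event))

  // `final` guarantees that no implementation can skip the event bookkeeping.
  final def alterDatabase(database: String): Unit = {
    postToAll(AlterDatabasePreEvent(database))
    doAlterDatabase(database)
    postToAll(AlterDatabaseEvent(database))
  }

  // Implementations override only this hook with the actual catalog mutation.
  protected def doAlterDatabase(database: String): Unit
}

object EventPostingCatalogDemo extends App {
  val catalog = new EventPostingCatalog {
    override protected def doAlterDatabase(database: String): Unit =
      println(s"altering $database")
  }
  catalog.addListener(event => println(s"observed: $event"))
  catalog.alterDatabase("db5")
  // Output:
  //   observed: AlterDatabasePreEvent(db5)
  //   altering db5
  //   observed: AlterDatabaseEvent(db5)
}
```

The design choice worth noting is `final` on the wrappers: event delivery is enforced by the base class rather than left as a convention for each implementation to remember.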
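On the consumer side, the updated ExternalCatalogEventSuite drives these events through the public alter* API. Outside the suite, a client would register a listener on the catalog. The sketch below rests on two assumptions that hold in the surrounding codebase but are not visible in this diff: `ExternalCatalog` mixes in Spark's `ListenerBus` (which provides `addListener`), and `ExternalCatalogEventListener` is the single-method listener trait defined alongside these events:

```scala
import org.apache.spark.sql.catalyst.catalog._

// Hedged sketch: assumes ExternalCatalog mixes in Spark's ListenerBus and
// that ExternalCatalogEventListener is defined next to these event classes.
object AlterEventLoggingDemo extends App {
  val catalog: ExternalCatalog = new InMemoryCatalog()

  catalog.addListener(new ExternalCatalogEventListener {
    override def onEvent(event: ExternalCatalogEvent): Unit = event match {
      case AlterTableEvent(db, name, AlterTableKind.DATASCHEMA) =>
        println(s"data schema of $db.$name changed")
      case AlterTableEvent(db, name, kind) =>
        println(s"$db.$name altered (kind = $kind)")
      case _ => // other catalog events are ignored in this sketch
    }
  })
}
```

Because the `AlterTableKind` members are plain strings rather than `Enumeration#Value`, they work directly as stable-identifier patterns, as the match above shows.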