@@ -87,7 +87,14 @@ abstract class ExternalCatalog
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
-  def alterDatabase(dbDefinition: CatalogDatabase): Unit
+  final def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
+    val db = dbDefinition.name
+    postToAll(AlterDatabasePreEvent(db))
+    doAlterDatabase(dbDefinition)
+    postToAll(AlterDatabaseEvent(db))
+  }
+
+  protected def doAlterDatabase(dbDefinition: CatalogDatabase): Unit

  def getDatabase(db: String): CatalogDatabase

@@ -147,7 +154,15 @@ abstract class ExternalCatalog
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
-  def alterTable(tableDefinition: CatalogTable): Unit
+  final def alterTable(tableDefinition: CatalogTable): Unit = {
[Review comment — Contributor]
I'd suggest we leave it for now and watch other alterTableXXX events instead. I feel it's overkill to have one heavy alterTable method handle all table metadata updates; I think we will add more and more fine-grained alter-table methods in the future, like alterTableStats, alterTableDataSchema, etc., and eventually this alterTable method will go away.

[Review comment — @wzhfy, Contributor, Nov 4, 2017]
This depends on the level of detail we want to collect in the event. Are there any guidelines or documentation on which events Spark should monitor? Besides, partition changes are missing; I think it's necessary to monitor those changes.

[Review comment — Author]
@cloud-fan, since we now expose the alterTable interface for other components to leverage, if we don't track it, it looks like we're missing a piece of the ExternalCatalogEvents. I think for now we can add this AlterTableEvent; later on, if we remove this method, we can make the event a no-op (kept only for compatibility). What do you think?

@wzhfy, I was thinking of adding partition-related events, but I'm not sure why this whole piece is missing, or whether partition-related events are necessary. If we can agree on such events, I'm OK with adding them.

+    val db = tableDefinition.database
+    val name = tableDefinition.identifier.table
+    postToAll(AlterTablePreEvent(db, name, AlterTableKind.TABLE))
+    doAlterTable(tableDefinition)
+    postToAll(AlterTableEvent(db, name, AlterTableKind.TABLE))
+  }
+
+  protected def doAlterTable(tableDefinition: CatalogTable): Unit

  /**
   * Alter the data schema of a table identified by the provided database and table name. The new
@@ -158,10 +173,22 @@
   * @param table Name of table to alter schema for
   * @param newDataSchema Updated data schema to be used for the table.
   */
-  def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
+  final def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit = {
[Review comment — Contributor]
How about alterTableStats?

+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.DATASCHEMA))
+    doAlterTableDataSchema(db, table, newDataSchema)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
+  }
+
+  protected def doAlterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit

  /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
-  def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
+  final def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.STATS))
+    doAlterTableStats(db, table, stats)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.STATS))
+  }
+
+  protected def doAlterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit

  def getTable(db: String, table: String): CatalogTable

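The recurring change in this file is a template-method refactor: each public alter* entry point becomes final, posts a pre-event, delegates to a new protected do* hook, and posts a post-event, so no implementation can bypass event posting. Below is a minimal, self-contained sketch of the pattern under simplified, hypothetical names (the real ExternalCatalog inherits postToAll from Spark's ListenerBus mixin):

// Simplified model of the pattern; not Spark's actual classes.
sealed trait CatalogEvent
case class AlterDatabasePreEvent(database: String) extends CatalogEvent
case class AlterDatabaseEvent(database: String) extends CatalogEvent

abstract class EventEmittingCatalog {
  private val listeners = scala.collection.mutable.ArrayBuffer.empty[CatalogEvent => Unit]

  def addListener(listener: CatalogEvent => Unit): Unit = listeners += listener

  // Stand-in for the ListenerBus.postToAll the real ExternalCatalog inherits.
  protected def postToAll(event: CatalogEvent): Unit = listeners.foreach(_.apply(event))

  // The public entry point is final, so every implementation is bracketed by events.
  final def alterDatabase(name: String): Unit = {
    postToAll(AlterDatabasePreEvent(name))
    doAlterDatabase(name)
    postToAll(AlterDatabaseEvent(name))
  }

  // Subclasses override only this hook and never touch event posting.
  protected def doAlterDatabase(name: String): Unit
}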
@@ -152,7 +152,7 @@ class InMemoryCatalog(
    }
  }

-  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+  override def doAlterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
    requireDbExists(dbDefinition.name)
    catalog(dbDefinition.name).db = dbDefinition
  }
@@ -294,7 +294,7 @@
    catalog(db).tables.remove(oldName)
  }

-  override def alterTable(tableDefinition: CatalogTable): Unit = synchronized {
+  override def doAlterTable(tableDefinition: CatalogTable): Unit = synchronized {
    assert(tableDefinition.identifier.database.isDefined)
    val db = tableDefinition.identifier.database.get
    requireTableExists(db, tableDefinition.identifier.table)
@@ -303,7 +303,7 @@
    catalog(db).tables(tableDefinition.identifier.table).table = newTableDefinition
  }

-  override def alterTableDataSchema(
+  override def doAlterTableDataSchema(
      db: String,
      table: String,
      newDataSchema: StructType): Unit = synchronized {
@@ -313,7 +313,7 @@
    catalog(db).tables(table).table = origTable.copy(schema = newSchema)
  }

-  override def alterTableStats(
+  override def doAlterTableStats(
      db: String,
      table: String,
      stats: Option[CatalogStatistics]): Unit = synchronized {
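On the implementation side, the whole change is the rename from alterX to doAlterX; locking and the metadata mutation stay local to the subclass. Continuing the hypothetical sketch from above:

// Hypothetical subclass of the EventEmittingCatalog sketch; not Spark's InMemoryCatalog.
class TinyInMemoryCatalog extends EventEmittingCatalog {
  private val databases = scala.collection.mutable.Set("default")

  // Only the hook is overridden; the final alterDatabase wrapper posts the events.
  override protected def doAlterDatabase(name: String): Unit = synchronized {
    require(databases.contains(name), s"database $name does not exist")
    // ...apply the metadata change here...
  }
}

// Usage: listeners see the pre/post pair around the hook automatically.
object TinyDemo extends App {
  val catalog = new TinyInMemoryCatalog
  catalog.addListener(event => println(event))
  catalog.alterDatabase("default")
  // Prints AlterDatabasePreEvent(default), then AlterDatabaseEvent(default).
}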
@@ -61,6 +61,16 @@ case class DropDatabasePreEvent(database: String) extends DatabaseEvent
 */
case class DropDatabaseEvent(database: String) extends DatabaseEvent

+/**
+ * Event fired before a database is altered.
+ */
+case class AlterDatabasePreEvent(database: String) extends DatabaseEvent
[Review comment — Member]
Hi @jerryshao, we are adding AlterTableEvent and AlterDatabaseEvent. Can we have a more specific PR title instead of "Add more ExternalCatalogEvent"?

[Review comment — Author]
Sure, I will update the title.

+/**
+ * Event fired after a database is altered.
+ */
+case class AlterDatabaseEvent(database: String) extends DatabaseEvent

/**
* Event fired when a table is created, dropped or renamed.
*/
@@ -110,7 +120,33 @@ case class RenameTableEvent(
  extends TableEvent

/**
- * Event fired when a function is created, dropped or renamed.
+ * String to indicate which part of a table is altered. If the plain alterTable API is called,
+ * then the kind will generally be "table".
+ */
+object AlterTableKind extends Enumeration {
+  val TABLE = "table"
+  val DATASCHEMA = "dataSchema"
+  val STATS = "stats"
+}
+
+/**
+ * Event fired before a table is altered.
+ */
+case class AlterTablePreEvent(
+    database: String,
+    name: String,
+    kind: String) extends TableEvent
+
+/**
+ * Event fired after a table is altered.
+ */
+case class AlterTableEvent(
+    database: String,
+    name: String,
+    kind: String) extends TableEvent
+
+/**
+ * Event fired when a function is created, dropped, altered or renamed.
 */
trait FunctionEvent extends DatabaseEvent {
/**
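For consumers inside the catalog layer, these are ordinary event payloads. Below is an illustrative listener built on the ExternalCatalogEventListener hook defined alongside these events; the pattern-match arms use the case classes added above, and the registration line is an assumption about the wiring (addListener comes from the ListenerBus that ExternalCatalog mixes in):

import org.apache.spark.sql.catalyst.catalog._

// Illustrative audit listener for the new alter events.
class AlterAuditListener extends ExternalCatalogEventListener {
  override def onEvent(event: ExternalCatalogEvent): Unit = event match {
    case AlterDatabasePreEvent(db) => println(s"about to alter database $db")
    case AlterDatabaseEvent(db) => println(s"altered database $db")
    case AlterTablePreEvent(db, table, kind) => println(s"about to alter $db.$table (kind=$kind)")
    case AlterTableEvent(db, table, kind) => println(s"altered $db.$table (kind=$kind)")
    case _ => // ignore other catalog events
  }
}

// Assumed wiring: externalCatalog.addListener(new AlterAuditListener)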
@@ -75,6 +75,11 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
    }
    checkEvents(CreateDatabasePreEvent("db5") :: Nil)

+    // ALTER
+    val newDbDefinition = dbDefinition.copy(description = "test")
+    catalog.alterDatabase(newDbDefinition)
+    checkEvents(AlterDatabasePreEvent("db5") :: AlterDatabaseEvent("db5") :: Nil)

    // DROP
    intercept[AnalysisException] {
      catalog.dropDatabase("db4", ignoreIfNotExists = false, cascade = false)
@@ -119,6 +124,23 @@
    }
    checkEvents(CreateTablePreEvent("db5", "tbl1") :: Nil)

+    // ALTER
+    val newTableDefinition = tableDefinition.copy(tableType = CatalogTableType.EXTERNAL)
+    catalog.alterTable(newTableDefinition)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.TABLE) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.TABLE) :: Nil)
+
+    // ALTER schema
+    val newSchema = new StructType().add("id", "long", nullable = false)
+    catalog.alterTableDataSchema("db5", "tbl1", newSchema)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
+
+    // ALTER stats
+    catalog.alterTableStats("db5", "tbl1", None)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.STATS) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.STATS) :: Nil)

    // RENAME
    catalog.renameTable("db5", "tbl1", "tbl2")
    checkEvents(
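Outside the catalyst module, the same events can be observed from application code: ExternalCatalogEvent extends SparkListenerEvent, and the session's shared state forwards catalog events onto the Spark listener bus. A hedged end-to-end sketch, assuming that forwarding wiring is in place for the Spark version at hand:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{AlterDatabaseEvent, AlterTableEvent}

object AlterEventWatcher {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("alter-event-watcher")
      .getOrCreate()

    // Catalog events arrive through onOtherEvent on the shared listener bus.
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case AlterTableEvent(db, table, kind) => println(s"table $db.$table altered (kind=$kind)")
        case AlterDatabaseEvent(db) => println(s"database $db altered")
        case _ =>
      }
    })

    spark.sql("CREATE TABLE t(id INT) USING parquet")
    spark.sql("ALTER TABLE t SET TBLPROPERTIES ('owner' = 'me')") // fires the table alter events
    spark.stop()
  }
}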
@@ -177,7 +177,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
   *
   * Note: As of now, this only supports altering database properties!
   */
-  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+  override def doAlterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
    val existingDb = getDatabase(dbDefinition.name)
    if (existingDb.properties == dbDefinition.properties) {
      logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
@@ -540,7 +540,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
   * Note: As of now, this doesn't support altering table schema, partition column names and bucket
   * specification. We will ignore them even if users do specify different values for these fields.
   */
-  override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
+  override def doAlterTable(tableDefinition: CatalogTable): Unit = withClient {
    assert(tableDefinition.identifier.database.isDefined)
    val db = tableDefinition.identifier.database.get
    requireTableExists(db, tableDefinition.identifier.table)
@@ -619,8 +619,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
    }
  }

-  override def alterTableDataSchema(
-      db: String, table: String, newDataSchema: StructType): Unit = withClient {
+  /**
+   * Alter the data schema of a table identified by the provided database and table name. The new
+   * data schema should not have conflict column names with the existing partition columns, and
+   * should still contain all the existing data columns.
+   */
+  override def doAlterTableDataSchema(
+      db: String,
+      table: String,
+      newDataSchema: StructType): Unit = withClient {
    requireTableExists(db, table)
    val oldTable = getTable(db, table)
    verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema)
@@ -648,7 +655,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
    }
  }

-  override def alterTableStats(
+  /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
+  override def doAlterTableStats(
      db: String,
      table: String,
      stats: Option[CatalogStatistics]): Unit = withClient {