From 2649135dac1472b90534abf0e51ac2760360c134 Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Fri, 9 Jun 2017 22:45:53 -0700 Subject: [PATCH 1/4] use alterStats for spark's stats --- .../catalyst/catalog/ExternalCatalog.scala | 2 + .../catalyst/catalog/InMemoryCatalog.scala | 9 ++ .../sql/catalyst/catalog/SessionCatalog.scala | 13 +++ .../command/AnalyzeColumnCommand.scala | 2 +- .../command/AnalyzeTableCommand.scala | 2 +- .../spark/sql/hive/HiveExternalCatalog.scala | 58 +++++++------ .../spark/sql/hive/StatisticsSuite.scala | 82 +++++++++++-------- 7 files changed, 105 insertions(+), 63 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 974ef900e2ee..12ba5aedde02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -160,6 +160,8 @@ abstract class ExternalCatalog */ def alterTableSchema(db: String, table: String, schema: StructType): Unit + def alterTableStats(db: String, table: String, stats: CatalogStatistics): Unit + def getTable(db: String, table: String): CatalogTable def getTableOption(db: String, table: String): Option[CatalogTable] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 8a5319bebe54..9820522a230e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -312,6 +312,15 @@ class InMemoryCatalog( catalog(db).tables(table).table = origTable.copy(schema = schema) } + override def alterTableStats( + db: String, + table: String, + stats: CatalogStatistics): Unit = synchronized { + requireTableExists(db, table) + val origTable = catalog(db).tables(table).table + catalog(db).tables(table).table = origTable.copy(stats = Some(stats)) + } + override def getTable(db: String, table: String): CatalogTable = synchronized { requireTableExists(db, table) catalog(db).tables(table).table diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 57006bfaf9b6..878f22b310d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -376,6 +376,19 @@ class SessionCatalog( schema.fields.map(_.name).exists(conf.resolver(_, colName)) } + /** + * Alter Spark's statistics of an existing metastore table identified by the provided table + * identifier. + */ + def alterTableStats(identifier: TableIdentifier, newStats: CatalogStatistics): Unit = { + val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase)) + val table = formatTableName(identifier.table) + val tableIdentifier = TableIdentifier(table, Some(db)) + requireDbExists(db) + requireTableExists(tableIdentifier) + externalCatalog.alterTableStats(db, table, newStats) + } + /** * Return whether a table/view with the specified name exists. If no database is specified, check * with current database. 
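The three hunks above give every catalog layer a narrow alterTableStats entry point. A minimal usage sketch follows; it is illustrative only and not part of the patch, and it assumes a SessionCatalog instance named `catalog` whose metastore already contains a table `db2`.`tbl1` (the fixture the test suites later in this series use):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

val tableId = TableIdentifier("tbl1", Some("db2"))

// Persist freshly computed statistics without rewriting the rest of the
// table metadata, which is how the ANALYZE commands below use the new API.
val newStats = CatalogStatistics(sizeInBytes = BigInt(1024), rowCount = Some(BigInt(10)))
catalog.alterTableStats(tableId, newStats)

// Readers pick the statistics back up through the normal metadata lookup.
assert(catalog.getTableMetadata(tableId).stats.contains(newStats))
```

The point of the narrow API is that callers such as ANALYZE no longer round-trip the whole CatalogTable through alterTable, so Hive-side alterations can no longer clobber, or be clobbered by, Spark's persisted statistics.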
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 2de14c90ec75..2f273b63e834 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -54,7 +54,7 @@ case class AnalyzeColumnCommand( // Newly computed column stats should override the existing ones. colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats) - sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics))) + sessionState.catalog.alterTableStats(tableIdentWithDB, statistics) // Refresh the cached data source table in the catalog. sessionState.catalog.refreshTable(tableIdentWithDB) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 3183c7911b1f..3c59b982c2dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -69,7 +69,7 @@ case class AnalyzeTableCommand( // Update the metastore if the above statistics of the table are different from those // recorded in the metastore. if (newStats.isDefined) { - sessionState.catalog.alterTable(tableMeta.copy(stats = newStats)) + sessionState.catalog.alterTableStats(tableIdentWithDB, newStats.get) // Refresh the cached data source table in the catalog. sessionState.catalog.refreshTable(tableIdentWithDB) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 918459fe7c24..03e29a819c49 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -538,30 +538,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat requireTableExists(db, tableDefinition.identifier.table) verifyTableProperties(tableDefinition) - // convert table statistics to properties so that we can persist them through hive api - val withStatsProps = if (tableDefinition.stats.isDefined) { - val stats = tableDefinition.stats.get - var statsProperties: Map[String, String] = - Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) - if (stats.rowCount.isDefined) { - statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() - } - val colNameTypeMap: Map[String, DataType] = - tableDefinition.schema.fields.map(f => (f.name, f.dataType)).toMap - stats.colStats.foreach { case (colName, colStat) => - colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => - statsProperties += (columnStatKeyPropName(colName, k) -> v) - } - } - tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties) - } else { - tableDefinition - } - if (tableDefinition.tableType == VIEW) { - client.alterTable(withStatsProps) + client.alterTable(tableDefinition) } else { - val oldTableDef = getRawTable(db, withStatsProps.identifier.table) + val oldTableDef = getRawTable(db, tableDefinition.identifier.table) val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) { tableDefinition.storage @@ -614,9 +594,11 @@ private[spark] class 
HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, // to retain the spark specific format if it is. Also add old data source properties to table // properties, to retain the data source table format. - val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX)) - val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp - val newDef = withStatsProps.copy( + val propsFromOldTable = oldTableDef.properties.filter { case (k, v) => + k.startsWith(DATASOURCE_PREFIX) || k.startsWith(STATISTICS_PREFIX) + } + val newTableProps = propsFromOldTable ++ tableDefinition.properties + partitionProviderProp + val newDef = tableDefinition.copy( storage = newStorage, schema = oldTableDef.schema, partitionColumnNames = oldTableDef.partitionColumnNames, @@ -647,6 +629,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } + override def alterTableStats( + db: String, + table: String, + stats: CatalogStatistics): Unit = withClient { + requireTableExists(db, table) + val rawTable = getRawTable(db, table) + + // convert table statistics to properties so that we can persist them through hive client + var statsProperties: Map[String, String] = + Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) + if (stats.rowCount.isDefined) { + statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() + } + val colNameTypeMap: Map[String, DataType] = + rawTable.schema.fields.map(f => (f.name, f.dataType)).toMap + stats.colStats.foreach { case (colName, colStat) => + colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => + statsProperties += (columnStatKeyPropName(colName, k) -> v) + } + } + + val oldTableNonStatsProps = rawTable.properties.filterNot(_._1.startsWith(STATISTICS_PREFIX)) + val updatedTable = rawTable.copy(properties = oldTableNonStatsProps ++ statsProperties) + client.alterTable(updatedTable) + } + override def getTable(db: String, table: String): CatalogTable = withClient { restoreTableMetadata(getRawTable(db, table)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 5d52f8baa3b9..5e54f6941746 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -25,7 +25,7 @@ import scala.util.matching.Regex import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -267,7 +267,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } - test("get statistics when not analyzed in both Hive and Spark") { + test("get statistics when not analyzed in Hive or Spark") { val tabName = "tab1" withTable(tabName) { createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false) @@ -313,60 +313,70 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } - test("alter table SET TBLPROPERTIES after analyze table") { - 
Seq(true, false).foreach { analyzedBySpark =>
- val tabName = "tab1"
- withTable(tabName) {
- createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark)
- val fetchedStats1 = checkTableStats(
- tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
- sql(s"ALTER TABLE $tabName SET TBLPROPERTIES ('foo' = 'a')")
- val fetchedStats2 = checkTableStats(
- tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
- assert(fetchedStats1 == fetchedStats2)
-
- val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
-
- val totalSize = extractStatsPropValues(describeResult, "totalSize")
- assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")
+ test("alter table should not have the side effect to store statistics in Spark side") {
+ def getCatalogTable(tableName: String): CatalogTable = {
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ }

- // ALTER TABLE SET TBLPROPERTIES invalidates some Hive specific statistics
- // This is triggered by the Hive alterTable API
- val numRows = extractStatsPropValues(describeResult, "numRows")
- assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
- val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
- assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost")
- }
+ val table = "alter_table_side_effect"
+ withTable(table) {
+ sql(s"CREATE TABLE $table (i string, j string)")
+ sql(s"INSERT INTO TABLE $table SELECT 'a', 'b'")
+ val catalogTable1 = getCatalogTable(table)
+ val hiveSize1 = BigInt(catalogTable1.ignoredProperties(StatsSetupConst.TOTAL_SIZE))
+
+ sql(s"ALTER TABLE $table SET TBLPROPERTIES ('prop1' = 'a')")
+
+ sql(s"INSERT INTO TABLE $table SELECT 'c', 'd'")
+ val catalogTable2 = getCatalogTable(table)
+ val hiveSize2 = BigInt(catalogTable2.ignoredProperties(StatsSetupConst.TOTAL_SIZE))
+ // After insertion, Hive's stats should be changed.
+ assert(hiveSize2 > hiveSize1)
+ // We haven't generated stats in Spark, so we should still use Hive's stats here.
+ assert(catalogTable2.stats.get.sizeInBytes == hiveSize2)
}
}

- test("alter table UNSET TBLPROPERTIES after analyze table") {
- Seq(true, false).foreach { analyzedBySpark =>
- val tabName = "tab1"
+ private def testAlterTableProperties(tabName: String, alterTablePropCmd: String): Unit = {
+ Seq(true).foreach { analyzedBySpark =>
withTable(tabName) {
createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark)
- val fetchedStats1 = checkTableStats(
- tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
- sql(s"ALTER TABLE $tabName UNSET TBLPROPERTIES ('prop1')")
- val fetchedStats2 = checkTableStats(
- tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
- assert(fetchedStats1 == fetchedStats2)
+ checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = Some(500))
+
+ // Run ALTER TABLE command
+ sql(alterTablePropCmd)

val describeResult = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")

val totalSize = extractStatsPropValues(describeResult, "totalSize")
assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost")

- // ALTER TABLE UNSET TBLPROPERTIES invalidates some Hive specific statistics
- // This is triggered by the Hive alterTable API
+ // ALTER TABLE SET TBLPROPERTIES invalidates some Hive specific statistics, but not Spark
+ // specific statistics. This is triggered by the Hive alterTable API.
val numRows = extractStatsPropValues(describeResult, "numRows") assert(numRows.isDefined && numRows.get == -1, "numRows is lost") val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize") assert(rawDataSize.isDefined && rawDataSize.get == -1, "rawDataSize is lost") + + if (analyzedBySpark) { + checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = Some(500)) + } else { + checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None) + } } } } + test("alter table SET TBLPROPERTIES after analyze table") { + testAlterTableProperties("set_prop_table", + "ALTER TABLE set_prop_table SET TBLPROPERTIES ('foo' = 'a')") + } + + test("alter table UNSET TBLPROPERTIES after analyze table") { + testAlterTableProperties("unset_prop_table", + "ALTER TABLE unset_prop_table UNSET TBLPROPERTIES ('prop1')") + } + test("add/drop partitions - managed table") { val catalog = spark.sessionState.catalog val managedTable = "partitionedTable" From 38d03d7d5c492c4d9bb2e0029ea9595f0eefda0e Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Fri, 9 Jun 2017 23:02:51 -0700 Subject: [PATCH 2/4] minor --- .../scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 5e54f6941746..001bbc230ff1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -338,7 +338,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } private def testAlterTableProperties(tabName: String, alterTablePropCmd: String): Unit = { - Seq(true).foreach { analyzedBySpark => + Seq(true, false).foreach { analyzedBySpark => withTable(tabName) { createNonPartitionedTable(tabName, analyzedByHive = true, analyzedBySpark = analyzedBySpark) checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = Some(500)) @@ -351,8 +351,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val totalSize = extractStatsPropValues(describeResult, "totalSize") assert(totalSize.isDefined && totalSize.get > 0, "totalSize is lost") - // ALTER TABLE SET TBLPROPERTIES invalidates some Hive specific statistics, but not Spark - // specific statistics. This is triggered by the Hive alterTable API. + // ALTER TABLE SET/UNSET TBLPROPERTIES invalidates some Hive specific statistics, but not + // Spark specific statistics. This is triggered by the Hive alterTable API. 
val numRows = extractStatsPropValues(describeResult, "numRows")
assert(numRows.isDefined && numRows.get == -1, "numRows is lost")
val rawDataSize = extractStatsPropValues(describeResult, "rawDataSize")
From 0ba01ac3dd850b782f1f51f34a33214dd08f65ba Mon Sep 17 00:00:00 2001
From: Zhenhua Wang
Date: Sat, 10 Jun 2017 00:27:23 -0700
Subject: [PATCH 3/4] update comments

---
.../org/apache/spark/sql/hive/HiveExternalCatalog.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 03e29a819c49..247199c3e6ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -591,9 +591,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
}

- // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
- // to retain the spark specific format if it is. Also add old data source properties to table
- // properties, to retain the data source table format.
+ // Add old data source properties to table properties, to retain the data source table format.
+ // Add old stats properties to table properties, to retain Spark's stats.
+ // Set the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
+ // to retain the Spark-specific format if it is one.
val propsFromOldTable = oldTableDef.properties.filter { case (k, v) =>
k.startsWith(DATASOURCE_PREFIX) || k.startsWith(STATISTICS_PREFIX)
}
From 221d0525c989f1d9a92d3d9e58c7667bec8db182 Mon Sep 17 00:00:00 2001
From: Zhenhua Wang
Date: Sun, 11 Jun 2017 12:18:34 -0700
Subject: [PATCH 4/4] add test cases and update comment

---
.../sql/catalyst/catalog/ExternalCatalogSuite.scala | 11 ++++++++++-
.../sql/catalyst/catalog/SessionCatalogSuite.scala | 12 ++++++++++++
.../apache/spark/sql/hive/HiveExternalCatalog.scala | 3 ++-
3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 1759ac04c003..557b0970b54e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -245,7 +245,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("alter table schema") {
val catalog = newBasicCatalog()
- val tbl1 = catalog.getTable("db2", "tbl1")
val newSchema = StructType(Seq(
StructField("col1", IntegerType),
StructField("new_field_2", StringType),
@@ -256,6 +255,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(newTbl1.schema == newSchema)
}

+ test("alter table stats") {
+ val catalog = newBasicCatalog()
+ val oldTableStats = catalog.getTable("db2", "tbl1").stats
+ assert(oldTableStats.isEmpty)
+ val newStats = CatalogStatistics(sizeInBytes = 1)
+ catalog.alterTableStats("db2", "tbl1", newStats)
+ val newTableStats = catalog.getTable("db2", "tbl1").stats
+ assert(newTableStats.get == newStats)
+ }
+
test("get table") {
assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index be8903000a0d..78e3acf45954 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -448,6 +448,18 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}

+ test("alter table stats") {
+ withBasicCatalog { catalog =>
+ val tableId = TableIdentifier("tbl1", Some("db2"))
+ val oldTableStats = catalog.getTableMetadata(tableId).stats
+ assert(oldTableStats.isEmpty)
+ val newStats = CatalogStatistics(sizeInBytes = 1)
+ catalog.alterTableStats(tableId, newStats)
+ val newTableStats = catalog.getTableMetadata(tableId).stats
+ assert(newTableStats.get == newStats)
+ }
+ }
+
test("alter table add columns") {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 247199c3e6ae..7fcf06d66b5e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -527,7 +527,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
/**
* Alter a table whose name matches the one specified in `tableDefinition`,
- * assuming the table exists.
+ * assuming the table exists. This method does not change the data source properties or the
+ * statistics properties.
*
* Note: As of now, this doesn't support altering table schema, partition column names and bucket
* specification. We will ignore them even if users do specify different values for these fields.
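To make the merge semantics in alterTableStats concrete (in particular, why it filters the raw table's properties with filterNot(_._1.startsWith(STATISTICS_PREFIX)) before writing), here is a self-contained sketch. The Stats case class and helper names are hypothetical stand-ins, not code from the patch; only the "spark.sql.statistics." key prefix mirrors the STATISTICS_* constants the patch uses:

```scala
// Hypothetical stand-ins for CatalogStatistics and the property constants;
// only the key prefix matches HiveExternalCatalog's STATISTICS_PREFIX.
case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

val statisticsPrefix = "spark.sql.statistics."

// Serialize stats one property per field; numRows is written only when defined.
def statsToProps(stats: Stats): Map[String, String] = {
  val base = Map(statisticsPrefix + "totalSize" -> stats.sizeInBytes.toString)
  stats.rowCount.fold(base)(n => base + (statisticsPrefix + "numRows" -> n.toString))
}

// Drop every stale stats property before merging in the new ones, so a
// previously persisted numRows cannot survive a re-analyze that computed none.
def mergeProps(oldProps: Map[String, String], stats: Stats): Map[String, String] =
  oldProps.filterNot { case (k, _) => k.startsWith(statisticsPrefix) } ++ statsToProps(stats)

val merged = mergeProps(
  Map(statisticsPrefix + "numRows" -> "500", "prop1" -> "a"),
  Stats(sizeInBytes = BigInt(2048)))
// The old numRows is gone; non-stats properties are untouched.
assert(merged == Map("prop1" -> "a", statisticsPrefix + "totalSize" -> "2048"))
```

The prefix filter in alterTable (propsFromOldTable, patch 1) is the mirror image of this: alterTable carries the old stats properties forward untouched, while alterTableStats is the only code path that rewrites them.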