diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 91347cfe80366..5d8cd5958129c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1988,6 +1988,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL = + buildConf("spark.sql.truncateTable.ignorePermissionAcl") + .internal() + .doc("When set to true, TRUNCATE TABLE command will not try to set back original " + + "permission and ACLs when re-creating the table/partition paths.") + .booleanConf + .createWithDefault(false) + val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE = buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue") .internal() @@ -2594,6 +2602,9 @@ class SQLConf extends Serializable with Logging { def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG) + def truncateTableIgnorePermissionAcl: Boolean = + getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL) + def nameNonStructGroupingKeyAsValue: Boolean = getConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 71500c304bd48..0cbd16d5d7308 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -24,6 +24,7 @@ import scala.util.Try import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileContext, FsConstants, Path} +import org.apache.hadoop.fs.permission.{AclEntry, FsPermission} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -494,13 +495,59 @@ case class TruncateTableCommand( partLocations } val hadoopConf = spark.sessionState.newHadoopConf() + val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl locations.foreach { location => if (location.isDefined) { val path = new Path(location.get) try { val fs = path.getFileSystem(hadoopConf) + + // Not all fs impl. support these APIs. + var optPermission: Option[FsPermission] = None + var optAcls: Option[java.util.List[AclEntry]] = None + if (!ignorePermissionAcl) { + val fileStatus = fs.getFileStatus(path) + try { + optPermission = Some(fileStatus.getPermission()) + } catch { + case NonFatal(_) => // do nothing + } + + try { + optAcls = Some(fs.getAclStatus(path).getEntries) + } catch { + case NonFatal(_) => // do nothing + } + } + fs.delete(path, true) + + // We should keep original permission/acl of the path. + // For owner/group, only super-user can set it, for example on HDFS. Because + // current user can delete the path, we assume the user/group is correct or not an issue. fs.mkdirs(path) + if (!ignorePermissionAcl) { + optPermission.foreach { permission => + try { + fs.setPermission(path, permission) + } catch { + case NonFatal(e) => + throw new SecurityException( + s"Failed to set original permission $permission back to " + + s"the created path: $path. Exception: ${e.getMessage}") + } + } + optAcls.foreach { acls => + try { + fs.setAcl(path, acls) + } catch { + case NonFatal(e) => + throw new SecurityException( + s"Failed to set original ACL $acls back to " + + s"the created path: $path. Exception: ${e.getMessage}") + } + } + } } catch { case NonFatal(e) => throw new AnalysisException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 2bb121b27e7d6..2335ff8997653 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -21,7 +21,8 @@ import java.io.{File, PrintWriter} import java.net.URI import java.util.Locale -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{Path, RawLocalFileSystem} +import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission} import org.apache.spark.internal.config import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD @@ -1981,6 +1982,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } + test("SPARK-30312: truncate table - keep acl/permission") { + import testImplicits._ + val ignorePermissionAcl = Seq(true, false) + + ignorePermissionAcl.foreach { ignore => + withSQLConf( + "fs.file.impl" -> classOf[FakeLocalFsFileSystem].getName, + "fs.file.impl.disable.cache" -> "true", + SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL.key -> ignore.toString) { + withTable("tab1") { + sql("CREATE TABLE tab1 (col INT) USING parquet") + sql("INSERT INTO tab1 SELECT 1") + checkAnswer(spark.table("tab1"), Row(1)) + + val tablePath = new Path(spark.sessionState.catalog + .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get) + + val hadoopConf = spark.sessionState.newHadoopConf() + val fs = tablePath.getFileSystem(hadoopConf) + val fileStatus = fs.getFileStatus(tablePath); + + fs.setPermission(tablePath, new FsPermission("777")) + assert(fileStatus.getPermission().toString() == "rwxrwxrwx") + + // Set ACL to table path. + val customAcl = new java.util.ArrayList[AclEntry]() + customAcl.add(new AclEntry.Builder() + .setType(AclEntryType.USER) + .setScope(AclEntryScope.ACCESS) + .setPermission(FsAction.READ).build()) + fs.setAcl(tablePath, customAcl) + assert(fs.getAclStatus(tablePath).getEntries().get(0) == customAcl.get(0)) + + sql("TRUNCATE TABLE tab1") + assert(spark.table("tab1").collect().isEmpty) + + val fileStatus2 = fs.getFileStatus(tablePath) + if (ignore) { + assert(fileStatus2.getPermission().toString() == "rwxr-xr-x") + } else { + assert(fileStatus2.getPermission().toString() == "rwxrwxrwx") + } + val aclEntries = fs.getAclStatus(tablePath).getEntries() + if (ignore) { + assert(aclEntries.size() == 0) + } else { + assert(aclEntries.size() == 1) + assert(aclEntries.get(0) == customAcl.get(0)) + } + } + } + } + } + test("create temporary view with mismatched schema") { withTable("tab1") { spark.range(10).write.saveAsTable("tab1") @@ -2879,3 +2934,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + +object FakeLocalFsFileSystem { + var aclStatus = new AclStatus.Builder().build() +} + +// A fake test local filesystem used to test ACL. It keeps a ACL status. If deletes +// a path of this filesystem, it will clean up the ACL status. Note that for test purpose, +// it has only one ACL status for all paths. +class FakeLocalFsFileSystem extends RawLocalFileSystem { + import FakeLocalFsFileSystem._ + + override def delete(f: Path, recursive: Boolean): Boolean = { + aclStatus = new AclStatus.Builder().build() + super.delete(f, recursive) + } + + override def getAclStatus(path: Path): AclStatus = aclStatus + + override def setAcl(path: Path, aclSpec: java.util.List[AclEntry]): Unit = { + aclStatus = new AclStatus.Builder().addEntries(aclSpec).build() + } +}