
Commit 56dafc3

[SC-5492] Fix drop table command in ACL enabled Spark
## What changes were proposed in this pull request?

Drop table commands (and probably other create/drop commands) currently fail with a "table does not exist" exception when using Thrift with ACLs enabled. This bug is caused by the following factors:

1. Thrift always executes an action on a dataset by calling `collect()`.
2. A `Dataset` for a command is executed eagerly. As soon as we create a `Dataset` that contains a `DropTableCommand`, the given table is dropped.
3. When you execute an action on a dataset, the `Dataset` creates a new `Dataframe` to track the query execution. The created `Dataset` re-checks analysis, which triggers the ACL `CheckPermissions` rule; this fails because the rule cannot find the table to be dropped (it has already been dropped when the `Dataset` was eagerly executed).

This PR fixes the issue by modifying the `Dataset` actions: they no longer spin off a new `Dataframe`; the `queryExecution` is used directly when evaluating an action. This also removes some code duplication in `Dataset` action evaluation by merging the typed and untyped code paths. The changes to `Dataset` will be put into Apache Spark; I have created https://issues.apache.org/jira/browse/SPARK-19070 to track this.

## How was this patch tested?

I have added a regression test to `HiveThriftServer2Suites` and I have expanded the `base` scenario in the thrift ACL end-to-end tests.

Author: Herman van Hovell <[email protected]>

Closes apache#160 from hvanhovell/SC-5492.
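The failure sequence described above can be sketched as follows. This is an illustrative, simplified reproduction, not code from this patch: the session setup is an assumption, and the ACL `CheckPermissions` rule is only referenced in comments.

```scala
// Illustrative sketch (assumes a local SparkSession with the ACL extensions
// enabled); it walks through the failure mode described above.
import org.apache.spark.sql.SparkSession

object DropTableAclRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("drop-table-acl-repro")
      .getOrCreate()

    spark.sql("CREATE TABLE t (id INT) USING parquet")

    // (2) Creating the Dataset for a command runs it eagerly: the table is
    //     dropped as soon as this line executes.
    val dropped = spark.sql("DROP TABLE t")

    // (1) + (3) The Thrift server then calls collect() on the Dataset. Before
    //     this patch, collect() wrapped the query in a fresh DataFrame, which
    //     re-ran analysis; with ACLs enabled the CheckPermissions rule could no
    //     longer resolve table `t` and failed with "table does not exist".
    dropped.collect()

    spark.stop()
  }
}
```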
1 parent c836d98 commit 56dafc3

5 files changed: 214 additions, 142 deletions


sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 26 additions & 50 deletions
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
-import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2109,9 +2109,7 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def head(n: Int): Array[T] = withTypedCallback("head", limit(n)) { df =>
-    df.collect(needCallback = false)
-  }
+  def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
 
   /**
    * Returns the first row.
@@ -2338,7 +2336,7 @@ class Dataset[T] private[sql](
   def takeAsList(n: Int): java.util.List[T] = java.util.Arrays.asList(take(n) : _*)
 
   /**
-   * Returns an array that contains all of [[Row]]s in this Dataset.
+   * Returns an array that contains all rows in this Dataset.
    *
    * Running collect requires moving all the data into the application's driver process, and
    * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
@@ -2348,38 +2346,24 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def collect(): Array[T] = collect(needCallback = true)
+  def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)
 
   /**
-   * Returns a Java list that contains all of [[Row]]s in this Dataset.
+   * Returns a Java list that contains all rows in this Dataset.
    *
    * Running collect requires moving all the data into the application's driver process, and
    * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
    *
    * @group action
    * @since 1.6.0
    */
-  def collectAsList(): java.util.List[T] = withCallback("collectAsList", toDF()) { _ =>
-    withNewExecutionId {
-      val values = queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow)
-      java.util.Arrays.asList(values : _*)
-    }
-  }
-
-  private def collect(needCallback: Boolean): Array[T] = {
-    def execute(): Array[T] = withNewExecutionId {
-      queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow)
-    }
-
-    if (needCallback) {
-      withCallback("collect", toDF())(_ => execute())
-    } else {
-      execute()
-    }
+  def collectAsList(): java.util.List[T] = withAction("collectAsList", queryExecution) { plan =>
+    val values = collectFromPlan(plan)
+    java.util.Arrays.asList(values : _*)
   }
 
   /**
-   * Return an iterator that contains all of [[Row]]s in this Dataset.
+   * Return an iterator that contains all rows in this Dataset.
    *
    * The iterator will consume as much memory as the largest partition in this Dataset.
    *
@@ -2390,9 +2374,9 @@ class Dataset[T] private[sql](
    * @group action
    * @since 2.0.0
    */
-  def toLocalIterator(): java.util.Iterator[T] = withCallback("toLocalIterator", toDF()) { _ =>
-    withNewExecutionId {
-      queryExecution.executedPlan.executeToIterator().map(boundEnc.fromRow).asJava
+  def toLocalIterator(): java.util.Iterator[T] = {
+    withAction("toLocalIterator", queryExecution) { plan =>
+      plan.executeToIterator().map(boundEnc.fromRow).asJava
     }
   }
 
@@ -2401,8 +2385,8 @@ class Dataset[T] private[sql](
    * @group action
    * @since 1.6.0
    */
-  def count(): Long = withCallback("count", groupBy().count()) { df =>
-    df.collect(needCallback = false).head.getLong(0)
+  def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
+    plan.executeCollect().head.getLong(0)
   }
 
   /**
@@ -2769,38 +2753,30 @@ class Dataset[T] private[sql](
    * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
    * user-registered callback functions.
    */
-  private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = {
+  private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
     try {
-      df.queryExecution.executedPlan.foreach { plan =>
+      qe.executedPlan.foreach { plan =>
         plan.resetMetrics()
       }
       val start = System.nanoTime()
-      val result = action(df)
+      val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
+        action(qe.executedPlan)
+      }
       val end = System.nanoTime()
-      sparkSession.listenerManager.onSuccess(name, df.queryExecution, end - start)
+      sparkSession.listenerManager.onSuccess(name, qe, end - start)
       result
     } catch {
       case e: Exception =>
-        sparkSession.listenerManager.onFailure(name, df.queryExecution, e)
+        sparkSession.listenerManager.onFailure(name, qe, e)
         throw e
     }
   }
 
-  private def withTypedCallback[A, B](name: String, ds: Dataset[A])(action: Dataset[A] => B) = {
-    try {
-      ds.queryExecution.executedPlan.foreach { plan =>
-        plan.resetMetrics()
-      }
-      val start = System.nanoTime()
-      val result = action(ds)
-      val end = System.nanoTime()
-      sparkSession.listenerManager.onSuccess(name, ds.queryExecution, end - start)
-      result
-    } catch {
-      case e: Exception =>
-        sparkSession.listenerManager.onFailure(name, ds.queryExecution, e)
-        throw e
-    }
+  /**
+   * Collect all elements from a spark plan.
+   */
+  private def collectFromPlan(plan: SparkPlan): Array[T] = {
+    plan.executeCollect().map(boundEnc.fromRow)
   }
 
   private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
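The shape of the new `withAction` helper can be illustrated with a small, self-contained sketch. Everything below (the `QueryExec`/`Plan` stand-ins and the `println` reporting) is invented for illustration; the real helper uses `QueryExecution`, `SparkPlan`, `SQLExecution.withNewExecutionId`, and the session's `listenerManager` as shown in the diff above.

```scala
// A minimal stand-alone sketch of the withAction pattern: time an action,
// run it against the plan that was already produced (so nothing is re-analyzed),
// and report success or failure. Types and reporting are illustrative only.
object WithActionSketch {
  final case class Plan(rows: Seq[Long])          // stand-in for SparkPlan
  final case class QueryExec(executedPlan: Plan)  // stand-in for QueryExecution

  def withAction[U](name: String, qe: QueryExec)(action: Plan => U): U = {
    val start = System.nanoTime()
    try {
      val result = action(qe.executedPlan)        // reuse the existing plan
      println(s"$name succeeded in ${(System.nanoTime() - start) / 1e6} ms")
      result
    } catch {
      case e: Exception =>
        println(s"$name failed: ${e.getMessage}")
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val qe = QueryExec(Plan(Seq(1L, 2L, 3L)))
    val count = withAction("count", qe)(_.rows.length.toLong)
    println(s"count = $count")
  }
}
```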

sql/hive-thriftserver/src/test/resources/acl-tests/inputs/base.sql

Lines changed: 13 additions & 1 deletion
@@ -20,8 +20,20 @@ show grant usr1 on db1.vw1;
 show tables in db1;
 select * from db1.vw1;
 
+-- Revoke rights
+@super;
+revoke all privileges on db1.vw1 from usr1;
+revoke all privileges on database db1 from usr1;
+
+-- Fail selecting data for usr1.
+@usr1;
+show grant usr1 on db1.vw1;
+show tables in db1;
+select * from db1.vw1;
+
 -- Cleanup
 @super;
-drop database db1 cascade;
+drop view db1.vw1;
+drop database db1;
 show databases;
 msck repair database __all__ privileges;

sql/hive-thriftserver/src/test/resources/acl-tests/results/base.sql.out

Lines changed: 68 additions & 8 deletions
@@ -1,5 +1,5 @@
 -- Automatically generated by ThriftEndToEndAclTestSuite
--- Number of queries: 12
+-- Number of queries: 18
 
 
 -- !query 0
@@ -111,7 +111,7 @@ struct<id:bigint>
 
 
 -- !query 9
-drop database db1 cascade
+revoke all privileges on db1.vw1 from usr1
 -- !query 9 token
 super
 -- !query 9 schema
@@ -121,20 +121,80 @@ struct<Result:string>
 
 
 -- !query 10
-show databases
+revoke all privileges on database db1 from usr1
 -- !query 10 token
 super
 -- !query 10 schema
-struct<databaseName:string>
+struct<Result:string>
 -- !query 10 output
-[default]
+
 
 
 -- !query 11
-msck repair database __all__ privileges
+show grant usr1 on db1.vw1
 -- !query 11 token
-super
+usr1
 -- !query 11 schema
-struct<Result:string>
+struct<Principal:string,ActionType:string,ObjectType:string,ObjectKey:string>
 -- !query 11 output
 
+
+
+-- !query 12
+show tables in db1
+-- !query 12 token
+usr1
+-- !query 12 schema
+struct<>
+-- !query 12 output
+java.lang.SecurityException: Principal is not authorized to execute the given query
+
+
+-- !query 13
+select * from db1.vw1
+-- !query 13 token
+usr1
+-- !query 13 schema
+struct<>
+-- !query 13 output
+java.lang.SecurityException: Principal is not authorized to execute the given query
+
+
+-- !query 14
+drop view db1.vw1
+-- !query 14 token
+super
+-- !query 14 schema
+struct<Result:string>
+-- !query 14 output
+
+
+
+-- !query 15
+drop database db1
+-- !query 15 token
+super
+-- !query 15 schema
+struct<Result:string>
+-- !query 15 output
+
+
+
+-- !query 16
+show databases
+-- !query 16 token
+super
+-- !query 16 schema
+struct<databaseName:string>
+-- !query 16 output
+[default]
+
+
+-- !query 17
+msck repair database __all__ privileges
+-- !query 17 token
+super
+-- !query 17 schema
+struct<Result:string>
+-- !query 17 output
+

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 1 addition & 7 deletions
@@ -770,17 +770,11 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
     val tempLog4jConf = Utils.createTempDir().getCanonicalPath
 
     Files.write(
-      """log4j.rootCategory=DEBUG, console, file
+      """log4j.rootCategory=DEBUG, console
        |log4j.appender.console=org.apache.log4j.ConsoleAppender
        |log4j.appender.console.target=System.err
        |log4j.appender.console.layout=org.apache.log4j.PatternLayout
        |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-        |log4j.appender.file=org.apache.log4j.FileAppender
-        |log4j.appender.file.append=false
-        |log4j.appender.file.file=/Users/hvanhovell/thrift.log
-        |log4j.appender.file.threshold=DEBUG
-        |log4j.appender.file.layout=org.apache.log4j.PatternLayout
-        |log4j.appender.file.layout.ConversionPattern=%-5p %c: %m%n
      """.stripMargin,
      new File(s"$tempLog4jConf/log4j.properties"),
      StandardCharsets.UTF_8)
