@@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.util

import java.util.regex.{Pattern, PatternSyntaxException}

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable

import org.apache.commons.lang.StringUtils.isNotBlank

import org.apache.spark.sql.AnalysisException
import org.apache.spark.unsafe.types.UTF8String


object StringUtils {

/**
@@ -90,12 +93,145 @@ object StringUtils {
    funcNames.toSeq
  }

  /**
   * Splits the text into one or more SQL statements, with bracketed comments preserved.
   *
   * Highlighted corner cases: semicolons inside double quotes, single quotes, or inline comments.
   * Expected behavior: blanks are trimmed and blank statements are omitted.
   *
   * @param text one or more SQL statements separated by semicolons
   * @return the trimmed SQL statements (an Array, for Java interop)
   */
  def split(text: String): Array[String] = {
    val D_QUOTE: Char = '"'
    val S_QUOTE: Char = '\''
    val Q_QUOTE: Char = '`'
    val SEMICOLON: Char = ';'
    val ESCAPE: Char = '\\'
    val DOT = '.'
    val STAR = '*'
    val DASH = '-'
    val SINGLE_COMMENT = "--"
    val BRACKETED_COMMENT_START = "/*"
    val BRACKETED_COMMENT_END = "*/"
    val FORWARD_SLASH = '/'

    sealed trait Flag
    case object Common extends Flag
    trait Quote extends Flag {
      def toChar: Char
      def sameAs(quoteChar: Char): Boolean = { toChar == quoteChar }
    }
    object Quote {
      def validate(quoteChar: Char): Boolean = {
        List(D_QUOTE, S_QUOTE, Q_QUOTE).contains(quoteChar)
      }

      def apply(quoteChar: Char): Quote = quoteChar match {
        case D_QUOTE => DoubleQuote
        case S_QUOTE => SingleQuote
        case Q_QUOTE => QuasiQuote
        case _ =>
          throw new IllegalArgumentException(s"$quoteChar is not a character for quoting")
      }
    }
    trait Comment extends Flag
    trait OpenAndClose {
      def openBy(text: String): Boolean
      def closeBy(text: String): Boolean
    }
    // the cursor stands on a double quoted string
    case object DoubleQuote extends Quote with OpenAndClose {
      override def openBy(text: String): Boolean = text.startsWith(D_QUOTE.toString)
      override def closeBy(text: String): Boolean = text.startsWith(D_QUOTE.toString)
      override def toChar: Char = D_QUOTE
    }
    // the cursor stands on a quasiquoted (backquoted) string
    case object QuasiQuote extends Quote with OpenAndClose {
      override def openBy(text: String): Boolean = text.startsWith(Q_QUOTE.toString)
      override def closeBy(text: String): Boolean = text.startsWith(Q_QUOTE.toString)
      override def toChar: Char = Q_QUOTE
    }
    // the cursor stands on a single quoted string
    case object SingleQuote extends Quote with OpenAndClose {
      override def openBy(text: String): Boolean = text.startsWith(S_QUOTE.toString)
      override def closeBy(text: String): Boolean = text.startsWith(S_QUOTE.toString)
      override def toChar: Char = S_QUOTE
    }
    // the cursor stands in a SINGLE_COMMENT
    case object SingleComment extends Comment {
      def openBy(text: String): Boolean = text.startsWith(SINGLE_COMMENT)
    }
    // the cursor stands in a BRACKETED_COMMENT
    case object BracketedComment extends Comment with OpenAndClose {
      override def openBy(text: String): Boolean = text.startsWith(BRACKETED_COMMENT_START)
      override def closeBy(text: String): Boolean = text.startsWith(BRACKETED_COMMENT_END)
    }

    var flag: Flag = Common
    var cursor: Int = 0
    val ret: mutable.ArrayBuffer[String] = mutable.ArrayBuffer()
    var currentSQL: mutable.StringBuilder = mutable.StringBuilder.newBuilder

    while (cursor < text.length) {
      val current: Char = text(cursor)
      val remaining = text.substring(cursor)

      (flag, current) match {
        // if it stands on the opening of a bracketed comment, consume 2 characters
        case (Common, FORWARD_SLASH) if BracketedComment.openBy(remaining) =>
          flag = BracketedComment
          currentSQL.append(BRACKETED_COMMENT_START)
          cursor += 2
        // if it stands on the ending of a bracketed comment, consume 2 characters
        case (BracketedComment, STAR) if BracketedComment.closeBy(remaining) =>
          flag = Common
          currentSQL.append(BRACKETED_COMMENT_END)
          cursor += 2

        // if it stands on the opening of an inline comment, move the cursor to the end of the line
        case (Common, DASH) if SingleComment.openBy(remaining) =>
          cursor += remaining.takeWhile(x => x != '\n').length

        // if it stands on a normal semicolon, stage the current SQL and move the cursor on
        case (Common, SEMICOLON) =>
          ret += currentSQL.toString.trim
          currentSQL.clear()
          cursor += 1

        // if it stands on the opening of quotes, mark the flag and move on
        case (Common, quoteChar) if Quote.validate(quoteChar) =>
          flag = Quote(quoteChar)
          currentSQL += current
          cursor += 1
        // if it stands on '\' inside quotes, consume 2 characters so an escaped " or '
        // does not close the quote
        case (quote: Quote, ESCAPE) if remaining.length >= 2 =>
          currentSQL.append(remaining.take(2))
          cursor += 2
        // if it stands on the ending of quotes, clear the flag and move on
        case (quote: Quote, quoteChar) if quote.sameAs(quoteChar) =>
          flag = Common
          currentSQL += current
          cursor += 1

        // otherwise, push the char to the currentSQL and move on
        case _ =>
          currentSQL += current
          cursor += 1
      }
    }

    ret += currentSQL.toString.trim
    ret.filter(isNotBlank).toArray
  }
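For reference, a minimal sketch of how the splitter above behaves on a compound input (the SQL text is illustrative, mirroring the test cases added below):

  // Semicolons inside quotes and bracketed comments do not split;
  // an inline comment is skipped up to the end of its line.
  val sqls = StringUtils.split(
    "select 'x;y' /* not a split ; point */;\nselect 2; -- trailing ; comment")
  // sqls === Array("select 'x;y' /* not a split ; point */", "select 2")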


  /**
   * Concatenates a sequence of strings into a final string, with a cheap append method
   * and a single memory allocation for the final string.
   */
  class StringConcat {
-   private val strings = new ArrayBuffer[String]
+   private val strings = new mutable.ArrayBuffer[String]
    private var length: Int = 0

  /** ... (hunk truncated in this diff view) */
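The remainder of StringConcat is collapsed above. As a hedged sketch of the pattern its scaladoc describes (cheap appends tracked by a running length, then one exactly-sized allocation); this is illustrative only, assuming nothing about Spark's actual implementation beyond the two fields shown:

  // Illustrative sketch; Spark's real StringConcat (collapsed above) may differ.
  class SketchStringConcat {
    private val strings = new scala.collection.mutable.ArrayBuffer[String]
    private var length: Int = 0

    // Cheap append: record the string and maintain the running total length.
    def append(s: String): Unit = {
      if (s != null) {
        strings += s
        length += s.length
      }
    }

    // One allocation: size the builder to the exact final length, then copy once.
    override def toString: String = {
      val builder = new java.lang.StringBuilder(length)
      strings.foreach(s => builder.append(s))
      builder.toString
    }
  }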
@@ -44,6 +44,74 @@ class StringUtilsSuite extends SparkFunSuite {
    assert(filterPattern(names, " d* ") === Nil)
  }

  test("split the SQL text") {
    val statement = "select * from tmp.dada;"
    assert(StringUtils.split(statement) === Array("select * from tmp.dada"))

    // blanks will be omitted
    val statements = " select * from tmp.data;;select * from tmp.ata;"
    assert(StringUtils.split(statements) ===
      Array("select * from tmp.data", "select * from tmp.ata"))

    val escapedSingleQuote =
      raw"""
        |select "\';"
      """.stripMargin.trim
    val escapedDoubleQuote =
      raw"""
        |select "\";"
      """.stripMargin.trim
    assert(StringUtils.split(escapedSingleQuote) === Array(escapedSingleQuote))
    assert(StringUtils.split(escapedDoubleQuote) === Array(escapedDoubleQuote))

    val semicolonInDoubleQuotes =
      """
        |select "^;^"
      """.stripMargin.trim
    val semicolonInSingleQuotes =
      """
        |select '^;^'
      """.stripMargin.trim
    assert(StringUtils.split(semicolonInDoubleQuotes) === Array(semicolonInDoubleQuotes))
    assert(StringUtils.split(semicolonInSingleQuotes) === Array(semicolonInSingleQuotes))

    val inlineComments =
      """
        |select 1; --;;;;;;;;
        |select "---";
      """.stripMargin
    val select1 = "select 1"
    val selectComments =
      """
        |select "---"
      """.stripMargin.trim
    assert(StringUtils.split(inlineComments) === Array(select1, selectComments))

    val bracketedComment1 = "select 1 /*;*/" // Good
    assert(StringUtils.split(bracketedComment1) === Array(bracketedComment1))
    val bracketedComment2 = "select 1 /* /* ; */" // Good
    assert(StringUtils.split(bracketedComment2) === Array(bracketedComment2))
    val bracketedComment3 = "select 1 /* */ ; */" // Bad
    assert(StringUtils.split(bracketedComment3).head === "select 1 /* */")
    val bracketedComment4 = "select 1 /**/ ; /* */" // Good
    assert(StringUtils.split(bracketedComment4).head === "select 1 /**/")
    val bracketedComment5 = "select 1 /**/ ; /**/" // Good
    assert(StringUtils.split(bracketedComment5).head === "select 1 /**/")
    val bracketedComment6 = "select /* bla bla */ 1" // Hints are preserved
    assert(StringUtils.split(bracketedComment6).head === bracketedComment6)

    val qQuote1 = "select 1 as `;`" // Good
    assert(StringUtils.split(qQuote1) === Array(qQuote1))
    val qQuote2 = "select 1 as ``;`" // Bad
    assert(StringUtils.split(qQuote2) === Array("select 1 as ``", "`"))

    // The splitter rule for the following two cases does not match the actual antlr4 rule.
    // We should not make the splitter too complicated.
    // val bracketedComment6 = "select 1 /**/ ; */" // Good
    // val bracketedComment7 = "select 1 /* */ ; /* */" // Bad
    // val qQuote3 = "select 1 as ```;`" // Good
  }

test("string concatenation") {
def concat(seq: String*): String = {
seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
sql/hive-thriftserver/pom.xml: 5 additions & 0 deletions
@@ -84,6 +84,11 @@
<artifactId>selenium-htmlunit-driver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
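mockito-core is added to the hive-thriftserver test scope, presumably for the new CLI driver tests. A generic sketch of Mockito usage from Scala (the Greeter trait here is made up for illustration, not part of the patch):

  import org.mockito.Mockito.{mock, verify, when}

  trait Greeter { def greet(name: String): String }

  // Stub a trait and assert on the interaction; illustrative only.
  val greeter = mock(classOf[Greeter])
  when(greeter.greet("spark")).thenReturn("hello spark")
  assert(greeter.greet("spark") == "hello spark")
  verify(greeter).greet("spark")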
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._

import jline.console.ConsoleReader
import jline.console.history.FileHistory
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang.{StringUtils => ApacheStringUtils}
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
@@ -37,14 +37,16 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.log4j.Level
import org.apache.thrift.transport.TSocket
import sun.misc.{Signal, SignalHandler}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
-import org.apache.spark.util.ShutdownHookManager
+import org.apache.spark.util.{ShutdownHookManager, Utils}

/**
* This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver
@@ -143,8 +145,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
    // See also: code in ExecDriver.java
    var loader = conf.getClassLoader
    val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)
-   if (StringUtils.isNotBlank(auxJars)) {
-     loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","))
+   if (ApacheStringUtils.isNotBlank(auxJars)) {
+     loader = Utilities.addToClassPath(loader, ApacheStringUtils.split(auxJars, ","))
    }
    conf.setClassLoader(loader)
    Thread.currentThread().setContextClassLoader(loader)
@@ -309,12 +311,16 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
  private val conf: Configuration =
    if (sessionState != null) sessionState.getConf else new Configuration()

  // Force initializing SparkSQLEnv. This is put here rather than in object SparkSQLCLIDriver
  // because the Hive unit tests do not go through the main() code path.
  if (!isRemoteMode) {
-   SparkSQLEnv.init()
-   if (sessionState.getIsSilent) {
-     SparkSQLEnv.sparkContext.setLogLevel(Level.WARN.toString)
+   // Utils.isTesting checks env[SPARK_TESTING] or props[spark.testing]:
+   // env is multi-process-level, props is single-process-level.
+   // CliSuite, which sets env[SPARK_TESTING], requires SparkSQLEnv, while
+   // props[spark.testing] acts as a switch for SparkSQLCLIDriverSuite.
+   if (!sys.props.contains("spark.testing")) {
+     SparkSQLEnv.init()
+     if (sessionState.getIsSilent) {
+       SparkSQLEnv.sparkContext.setLogLevel(Level.WARN.toString)
+     }
    }
  } else {
    // Hive 1.2 + not supported in CLI
@@ -331,6 +337,65 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
    console.printInfo(s"Spark master: $master, Application Id: $appId")
  }

  // method body imported from Hive and translated from Java to Scala
  override def processLine(line: String, allowInterrupting: Boolean): Int = {
[Review thread]
Contributor: So the default processLine implementation doesn't handle ";" well? Do you mean the Hive SQL shell has this bug as well?
Author: Yes, there is a buggy implementation in Hive.
Contributor: I checked the Hive code, and ";" seems to be handled well, at least on the master branch: https://github.com/apache/hive/blob/master/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java#L395
Do you mean only Hive 1.2 has this bug? Maybe we should upgrade Hive.
Author: Yes, and I have not checked the implementation on Hive master. We may judge which implementation is better.

    var oldSignal: SignalHandler = null
    var interruptSignal: Signal = null

    if (allowInterrupting) {
      // Remember all threads that were running at the time we started line processing.
      // Hook up the custom Ctrl+C handler while processing this line
      interruptSignal = new Signal("INT")
      oldSignal = Signal.handle(interruptSignal, new SignalHandler() {
        private val cliThread = Thread.currentThread()
[Review comment]
Member: What's the meaning of cliThread? I don't find any usage.

        private var interruptRequested: Boolean = false

        override def handle(signal: Signal) {
          val initialRequest = !interruptRequested
          interruptRequested = true

          // Kill the VM on second ctrl+c
          if (!initialRequest) {
            console.printInfo("Exiting the JVM")
            System.exit(127)
          }

          // Interrupt the CLI thread to stop the current statement and return
          // to prompt
          console.printInfo("Interrupting... Be patient, this might take some time.")
          console.printInfo("Press Ctrl+C again to kill JVM")

          // First, kill any running Spark jobs
          // TODO
[Review comment]
Member (@turboFei, Mar 1, 2019): I think HiveInterruptUtils.interrupt() should be added here, because SparkSQLCLIDriver has invoked installSignalHandler() to add a HiveInterruptCallback, which cancels all Spark jobs when HiveInterruptUtils.interrupt() is invoked. See:

  installSignalHandler()

  /**
   * Install an interrupt callback to cancel all Spark jobs. In Hive's CliDriver#processLine(),
   * a signal handler will invoke this registered callback if a Ctrl+C signal is detected while
   * a command is being processed by the current thread.
   */
  def installSignalHandler() {
    HiveInterruptUtils.add(new HiveInterruptCallback {
      override def interrupt() {
        // Handle remote execution mode
        if (SparkSQLEnv.sparkContext != null) {
          SparkSQLEnv.sparkContext.cancelAllJobs()
        } else {
          if (transport != null) {
            // Force closing of TCP connection upon session termination
            transport.getSocket.close()
          }
        }
      }
    })
  }

        }
      })
    }

    try {
      var lastRet: Int = 0
      var ret: Int = 0

      for (command <- StringUtils.split(line)) {
        ret = processCmd(command)
        // wipe cli query state
        sessionState.setCommandType(null)
        lastRet = ret
        val ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS)
        if (ret != 0 && !ignoreErrors) {
          CommandProcessorFactory.clean(conf.asInstanceOf[HiveConf])
          return ret
        }
      }
      CommandProcessorFactory.clean(conf.asInstanceOf[HiveConf])
      lastRet
    } finally {
      // Once we are done processing the line, restore the old handler
      if (oldSignal != null && interruptSignal != null) {
        Signal.handle(interruptSignal, oldSignal)
      }
    }
  }

  override def processCmd(cmd: String): Int = {
    val cmd_trimmed: String = cmd.trim()
    val cmd_lower = cmd_trimmed.toLowerCase(Locale.ROOT)