Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
4a6f903
Reuse completeNextStageWithFetchFailure
beliefer Jun 19, 2020
96456e2
Merge remote-tracking branch 'upstream/master'
beliefer Jul 1, 2020
4314005
Merge remote-tracking branch 'upstream/master'
beliefer Jul 3, 2020
d6af4a7
Merge remote-tracking branch 'upstream/master'
beliefer Jul 9, 2020
f69094f
Merge remote-tracking branch 'upstream/master'
beliefer Jul 16, 2020
b86a42d
Merge remote-tracking branch 'upstream/master'
beliefer Jul 25, 2020
2ac5159
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 25, 2020
9021d6c
Merge remote-tracking branch 'upstream/master'
beliefer Jul 28, 2020
74a2ef4
Merge branch 'master' of github.com:beliefer/spark
beliefer Jul 28, 2020
9828158
Merge remote-tracking branch 'upstream/master'
beliefer Jul 31, 2020
9cd1aaf
Merge remote-tracking branch 'upstream/master'
beliefer Aug 5, 2020
abfcbb9
Merge remote-tracking branch 'upstream/master'
beliefer Aug 26, 2020
07c6c81
Merge remote-tracking branch 'upstream/master'
beliefer Sep 1, 2020
580130b
Merge remote-tracking branch 'upstream/master'
beliefer Sep 2, 2020
3712808
Merge branch 'master' of github.com:beliefer/spark
beliefer Sep 11, 2020
6107413
Merge remote-tracking branch 'upstream/master'
beliefer Sep 11, 2020
4b799b4
Merge remote-tracking branch 'upstream/master'
beliefer Sep 14, 2020
ee0ecbf
Merge remote-tracking branch 'upstream/master'
beliefer Sep 18, 2020
596bc61
Merge remote-tracking branch 'upstream/master'
beliefer Sep 24, 2020
0164e2f
Merge remote-tracking branch 'upstream/master'
beliefer Sep 27, 2020
90b79fc
Merge remote-tracking branch 'upstream/master'
beliefer Sep 29, 2020
2cef3a9
Merge remote-tracking branch 'upstream/master'
beliefer Oct 13, 2020
c26b64f
Merge remote-tracking branch 'upstream/master'
beliefer Oct 19, 2020
2e02cd2
Merge remote-tracking branch 'upstream/master'
beliefer Oct 22, 2020
a6d0741
Merge remote-tracking branch 'upstream/master'
beliefer Oct 28, 2020
82e5b2c
Merge remote-tracking branch 'upstream/master'
beliefer Nov 4, 2020
70bbf5d
Merge remote-tracking branch 'upstream/master'
beliefer Nov 6, 2020
126a51e
Merge remote-tracking branch 'upstream/master'
beliefer Nov 13, 2020
f2ceacd
Merge remote-tracking branch 'upstream/master'
beliefer Nov 19, 2020
5ad208f
Merge remote-tracking branch 'upstream/master'
beliefer Nov 23, 2020
cd81faf
Extended decode
beliefer Nov 24, 2020
338d28b
Update golden file
beliefer Nov 24, 2020
2c7a822
Improve code
beliefer Nov 25, 2020
1cf326b
Improve code
beliefer Nov 25, 2020
80c56a1
Optimize code
beliefer Dec 8, 2020
6a1b919
Update comments
beliefer Dec 8, 2020
1614e62
Optimize code
beliefer Dec 9, 2020
eea967a
Update since
beliefer Dec 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.commons.codec.binary.{Base64 => CommonsBase64}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TypeCheckResult}
import org.apache.spark.sql.catalyst.expressions.codegen._
Expand Down Expand Up @@ -2082,6 +2083,65 @@ case class UnBase64(child: Expression)
}
}

object Decode {
def createExpr(params: Seq[Expression]): Expression = {
params.length match {
case 0 | 1 =>
throw new AnalysisException("Invalid number of arguments for function decode. " +
s"Expected: 2; Found: ${params.length}")
case 2 => StringDecode(params.head, params.last)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, IMHO it would be good to rename the existing decode func or assign a new name into the new decode func because the generated doc for decode will be complicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If rename the existing decode func will break the behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, so I think we need add a legacy config, update the migration guide, ... if we rename it. Anyway, the latter approach (assigning a new name into the new decode func) looks better if possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR could avoid migration and support the new decode as well. cc @cloud-fan

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably rename the function in ansi mode.

case _ =>
val input = params.head
val other = params.tail
val itr = other.iterator
var default: Expression = Literal.create(null, StringType)
val branches = ArrayBuffer.empty[(Expression, Expression)]
while (itr.hasNext) {
val search = itr.next
if (itr.hasNext) {
val condition = EqualTo(input, search)
branches += ((condition, itr.next))
} else {
default = search
}
}
CaseWhen(branches.seq, default)
}
}
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
|_FUNC_(bin, charset) - Decodes the first argument using the second argument character set.
|
|_FUNC_(expr, search, result [, search, result ] ... [, default]) - Decode compares expr
| to each search value one by one. If expr is equal to a search, returns the corresponding result.
| If no match is found, then Oracle returns default. If default is omitted, returns null.
""",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you follow the other format? e.g.,

usage = "_FUNC_(str, search[, replace]) - Replaces all occurrences of `search` with `replace`.",
arguments = """
Arguments:
* str - a string expression
* search - a string expression. If `search` is not found in `str`, `str` is returned unchanged.
* replace - a string expression. If `replace` is not specified or is an empty string, nothing replaces
the string that is removed from `str`.
""",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new decode function is the same as other function uses RuntimeReplaceable. The parameter list is not fixed.

examples = """
Examples:
> SELECT _FUNC_(encode('abc', 'utf-8'), 'utf-8');
abc
> SELECT _FUNC_(2, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic');
San Francisco
> SELECT _FUNC_(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic');
Non domestic
> SELECT _FUNC_(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle');
NULL
""",
since = "3.2.0")
// scalastyle:on line.size.limit
case class Decode(params: Seq[Expression], child: Expression) extends RuntimeReplaceable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this replaces the existing decode in SQL side only, right? shall we update migration guide?

Copy link
Contributor Author

@beliefer beliefer Dec 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new decode could keep compatible with old one.


def this(params: Seq[Expression]) = {
this(params, Decode.createExpr(params))
}

override def flatArguments: Iterator[Any] = Iterator(params)
override def exprsReplaced: Seq[Expression] = params
}

/**
* Decodes the first argument into a String using the provided character set
* (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
Expand All @@ -2097,7 +2157,7 @@ case class UnBase64(child: Expression)
""",
since = "1.5.0")
// scalastyle:on line.size.limit
case class Decode(bin: Expression, charset: Expression)
case class StringDecode(bin: Expression, charset: Expression)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have an alternative for the previous decode after this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only renames the expression, not the function. This PR just adds an overload of the decode function when there are more than 3 parameters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest the following followup:

  1. add a new function name for StringDecode. Probably "string_decode".
  2. require decode function to take more than 2 parameters, under ansi mode(or a legacy config)

extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant {

override def left: Expression = bin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
test("SPARK-22543: split large if expressions into blocks due to JVM code size limit") {
var strExpr: Expression = Literal("abc")
for (_ <- 1 to 150) {
strExpr = Decode(Encode(strExpr, "utf-8"), "utf-8")
strExpr = StringDecode(Encode(strExpr, "utf-8"), "utf-8")
}

val expressions = Seq(If(EqualTo(strExpr, strExpr), strExpr, strExpr))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,23 +349,23 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// scalastyle:off
// non ascii characters are not allowed in the code, so we disable the scalastyle here.
checkEvaluation(
Decode(Encode(Literal("大千世界"), Literal("UTF-16LE")), Literal("UTF-16LE")), "大千世界")
StringDecode(Encode(Literal("大千世界"), Literal("UTF-16LE")), Literal("UTF-16LE")), "大千世界")
checkEvaluation(
Decode(Encode(a, Literal("utf-8")), Literal("utf-8")), "大千世界", create_row("大千世界"))
StringDecode(Encode(a, Literal("utf-8")), Literal("utf-8")), "大千世界", create_row("大千世界"))
checkEvaluation(
Decode(Encode(a, Literal("utf-8")), Literal("utf-8")), "", create_row(""))
StringDecode(Encode(a, Literal("utf-8")), Literal("utf-8")), "", create_row(""))
// scalastyle:on
checkEvaluation(Encode(a, Literal("utf-8")), null, create_row(null))
checkEvaluation(Encode(Literal.create(null, StringType), Literal("utf-8")), null)
checkEvaluation(Encode(a, Literal.create(null, StringType)), null, create_row(""))

checkEvaluation(Decode(b, Literal("utf-8")), null, create_row(null))
checkEvaluation(Decode(Literal.create(null, BinaryType), Literal("utf-8")), null)
checkEvaluation(Decode(b, Literal.create(null, StringType)), null, create_row(null))
checkEvaluation(StringDecode(b, Literal("utf-8")), null, create_row(null))
checkEvaluation(StringDecode(Literal.create(null, BinaryType), Literal("utf-8")), null)
checkEvaluation(StringDecode(b, Literal.create(null, StringType)), null, create_row(null))

// Test escaping of charset
GenerateUnsafeProjection.generate(Encode(a, Literal("\"quote")) :: Nil)
GenerateUnsafeProjection.generate(Decode(b, Literal("\"quote")) :: Nil)
GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")) :: Nil)
}

test("initcap unit test") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2438,7 +2438,7 @@ object functions {
* @since 1.5.0
*/
def decode(value: Column, charset: String): Column = withExpr {
Decode(value.expr, lit(charset).expr)
StringDecode(value.expr, lit(charset).expr)
}

/**
Expand Down
10 changes: 10 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/string-functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ SELECT trim(TRAILING 'xy' FROM 'TURNERyxXxy');
-- Check lpad/rpad with invalid length parameter
SELECT lpad('hi', 'invalid_length');
SELECT rpad('hi', 'invalid_length');

-- decode
select decode();
select decode(encode('abc', 'utf-8'));
select decode(encode('abc', 'utf-8'), 'utf-8');
select decode(1, 1, 'Southlake');
select decode(2, 1, 'Southlake');
select decode(2, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic');
select decode(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic');
select decode(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle');
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 36
-- Number of queries: 44


-- !query
Expand Down Expand Up @@ -294,3 +294,69 @@ struct<>
-- !query output
java.lang.NumberFormatException
invalid input syntax for type numeric: invalid_length


-- !query
select decode()
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function decode. Expected: 2; Found: 0;; line 1 pos 7


-- !query
select decode(encode('abc', 'utf-8'))
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function decode. Expected: 2; Found: 1;; line 1 pos 7


-- !query
select decode(encode('abc', 'utf-8'), 'utf-8')
-- !query schema
struct<decode(encode(abc, utf-8), utf-8):string>
-- !query output
abc


-- !query
select decode(1, 1, 'Southlake')
-- !query schema
struct<decode(1, 1, Southlake):string>
-- !query output
Southlake


-- !query
select decode(2, 1, 'Southlake')
-- !query schema
struct<decode(2, 1, Southlake):string>
-- !query output
NULL


-- !query
select decode(2, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic')
-- !query schema
struct<decode(2, 1, Southlake, 2, San Francisco, 3, New Jersey, 4, Seattle, Non domestic):string>
-- !query output
San Francisco


-- !query
select decode(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic')
-- !query schema
struct<decode(6, 1, Southlake, 2, San Francisco, 3, New Jersey, 4, Seattle, Non domestic):string>
-- !query output
Non domestic


-- !query
select decode(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle')
-- !query schema
struct<decode(6, 1, Southlake, 2, San Francisco, 3, New Jersey, 4, Seattle):string>
-- !query output
NULL
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 36
-- Number of queries: 44


-- !query
Expand Down Expand Up @@ -290,3 +290,69 @@ SELECT rpad('hi', 'invalid_length')
struct<rpad(hi, CAST(invalid_length AS INT), ):string>
-- !query output
NULL


-- !query
select decode()
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function decode. Expected: 2; Found: 0;; line 1 pos 7


-- !query
select decode(encode('abc', 'utf-8'))
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function decode. Expected: 2; Found: 1;; line 1 pos 7


-- !query
select decode(encode('abc', 'utf-8'), 'utf-8')
-- !query schema
struct<decode(encode(abc, utf-8), utf-8):string>
-- !query output
abc


-- !query
select decode(1, 1, 'Southlake')
-- !query schema
struct<decode(1, 1, Southlake):string>
-- !query output
Southlake


-- !query
select decode(2, 1, 'Southlake')
-- !query schema
struct<decode(2, 1, Southlake):string>
-- !query output
NULL


-- !query
select decode(2, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic')
-- !query schema
struct<decode(2, 1, Southlake, 2, San Francisco, 3, New Jersey, 4, Seattle, Non domestic):string>
-- !query output
San Francisco


-- !query
select decode(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle', 'Non domestic')
-- !query schema
struct<decode(6, 1, Southlake, 2, San Francisco, 3, New Jersey, 4, Seattle, Non domestic):string>
-- !query output
Non domestic


-- !query
select decode(6, 1, 'Southlake', 2, 'San Francisco', 3, 'New Jersey', 4, 'Seattle')
-- !query schema
struct<decode(6, 1, Southlake, 2, San Francisco, 3, New Jersey, 4, Seattle):string>
-- !query output
NULL