@@ -1,2 +1,2 @@
-Project [base64(cast(g#0 as binary)) AS base64(g)#0]
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.Base64, StringType, encode, cast(g#0 as binary), false, BinaryType, BooleanType, true, true, true) AS base64(g)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
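
Because `Base64` is now `RuntimeReplaceable`, this golden explain output records a `staticinvoke` of `Base64.encode`, with the chunk flag already folded in as the boolean literal `false`. A minimal sketch of how to inspect such a plan yourself (assumes a running `SparkSession` named `spark`; the input is illustrative, not the golden-file fixture):

// Print the optimized plan; a staticinvoke(... Base64 ... encode ...) node
// should appear where the base64 expression used to be.
val df = spark.sql("SELECT base64(cast('g' as binary))")
println(df.queryExecution.optimizedPlan)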
@@ -2419,24 +2419,40 @@ case class Chr(child: Expression)
   """,
   since = "1.5.0",
   group = "string_funcs")
-case class Base64(child: Expression)
-  extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
+case class Base64(child: Expression, chunkBase64: Boolean)
+  extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes {
+
+  def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled)
 
   override def dataType: DataType = StringType
   override def inputTypes: Seq[DataType] = Seq(BinaryType)
 
-  protected override def nullSafeEval(bytes: Any): Any = {
-    UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]]))
-  }
+  override def replacement: Expression = StaticInvoke(
+    classOf[Base64],
+    dataType,
+    "encode",
+    Seq(child, Literal(chunkBase64, BooleanType)),
+    Seq(BinaryType, BooleanType))
 
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    nullSafeCodeGen(ctx, ev, (child) => {
-      s"""${ev.value} = UTF8String.fromBytes(
-        ${classOf[JBase64].getName}.getMimeEncoder().encode($child));
-       """})
-  }
   override def toString: String = s"$prettyName($child)"
 
-  override protected def withNewChildInternal(newChild: Expression): Base64 = copy(child = newChild)
+  override protected def withNewChildInternal(newChild: Expression): Expression =
+    copy(child = newChild)
 }
 
+object Base64 {
+  def apply(expr: Expression): Base64 = new Base64(expr)
+
+  private lazy val nonChunkEncoder = JBase64.getMimeEncoder(-1, Array())
+
+  def encode(input: Array[Byte], chunkBase64: Boolean): UTF8String = {
+    val encoder = if (chunkBase64) {
+      JBase64.getMimeEncoder
+    } else {
+      nonChunkEncoder
+    }
+    UTF8String.fromBytes(encoder.encode(input))
+  }
+}
 
 /**
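
The non-chunked path relies on the JDK overload `java.util.Base64.getMimeEncoder(lineLength, lineSeparator)`, which disables line wrapping whenever `lineLength <= 0`; that is exactly what `getMimeEncoder(-1, Array())` does above. A standalone sketch of the two behaviours (plain JDK, no Spark classes needed; the object name is illustrative):

import java.util.{Base64 => JBase64}

object MimeEncoderDemo {
  def main(args: Array[String]): Unit = {
    // 58 input bytes encode to 80 base64 chars, one 4-char group past the 76-char limit.
    val input = Array.fill[Byte](58)('a'.toByte)

    // Default MIME encoder: wraps output at 76 characters with "\r\n".
    val chunked = new String(JBase64.getMimeEncoder.encode(input))
    // lineLength <= 0 disables wrapping, mirroring nonChunkEncoder above.
    val unchunked = new String(JBase64.getMimeEncoder(-1, Array.emptyByteArray).encode(input))

    println(chunked.contains("\r\n"))   // true
    println(unchunked.contains("\r\n")) // false
  }
}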
@@ -3229,6 +3229,15 @@ object SQLConf
     .booleanConf
     .createWithDefault(false)
 
+  val CHUNK_BASE64_STRING_ENABLED = buildConf("spark.sql.legacy.chunkBase64String.enabled")
+    .internal()
+    .doc("Whether to chunk the string generated by the `base64` function. When true, base64" +
+      " strings are chunked into lines of at most 76 characters. When false, the base64" +
+      " strings are not chunked.")
+    .version("3.5.2")
+    .booleanConf
+    .createWithDefault(false)
+
Review thread on the `.version("3.5.2")` line:

dongjoon-hyun (Member): IIUC, if we start to backport SPARK-47307, it will go into Apache Spark 3.4.4 as well, right? In that case, I'm curious whether 3.5.2 is correct.

dongjoon-hyun (Member): Hmm, got it. I saw this comment from the Apache Spark 3.5.2 release manager, @yaooqinn. If so, are we going to update these values on master and branch-3.5 when we release Apache Spark 3.4.4? I'm fine with doing it that way.

yaooqinn (Member): Thank you @dongjoon-hyun. Not related to this PR, but maybe we should allow multiple fix versions in this field, such as 3.4.4, 3.5.2, 4.0.0.

dongjoon-hyun (Member): Theoretically it's possible, but it would force us to update all the existing configurations and documentation, so we had better not; it could be too much.
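Setting the version question aside, a hedged usage sketch for the new flag (assumes an existing `SparkSession` named `spark`; input values are illustrative). Because the auxiliary constructor reads `SQLConf.get` when the expression is created, each new query picks up the current setting:

// Legacy behaviour: MIME-style wrapping at 76 characters.
spark.conf.set("spark.sql.legacy.chunkBase64String.enabled", "true")
spark.sql("SELECT base64(cast(repeat('a', 58) as binary))").show(truncate = false)

// Default behaviour: one unbroken base64 string.
spark.conf.set("spark.sql.legacy.chunkBase64String.enabled", "false")
spark.sql("SELECT base64(cast(repeat('a', 58) as binary))").show(truncate = false)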
   val ENABLE_DEFAULT_COLUMNS =
     buildConf("spark.sql.defaultColumn.enabled")
       .internal()

@@ -5111,6 +5120,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def ansiRelationPrecedence: Boolean = ansiEnabled && getConf(ANSI_RELATION_PRECEDENCE)
 
+  def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE64_STRING_ENABLED)
+
   def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
     case "TIMESTAMP_LTZ" =>
       // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
@@ -71,10 +71,15 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestBase {
     new ArrayBasedMapData(keyArray, valueArray)
   }
 
+  protected def replace(expr: Expression): Expression = expr match {
+    case r: RuntimeReplaceable => replace(r.replacement)
+    case _ => expr.mapChildren(replace)
+  }
+
   private def prepareEvaluation(expression: Expression): Expression = {
     val serializer = new JavaSerializer(new SparkConf()).newInstance
     val resolver = ResolveTimeZone
-    val expr = resolver.resolveTimeZones(expression)
+    val expr = resolver.resolveTimeZones(replace(expression))
     assert(expr.resolved)
     serializer.deserialize(serializer.serialize(expr))
   }
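
A `RuntimeReplaceable` expression is a placeholder that is not meant to be evaluated or code-generated directly, so the helper above substitutes `replacement` (for `Base64`, the `StaticInvoke`) before the usual serialization round trip. A rough illustration of the effect, as a hypothetical snippet inside a suite mixing in `ExpressionEvalHelper` (not part of the PR):

val expr = new Base64(Literal("abc".getBytes)) // chunk flag read from SQLConf.get
assert(expr.isInstanceOf[RuntimeReplaceable])  // not directly evaluable
val evaluable = replace(expr)                  // recursively swaps in the StaticInvoke tree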
@@ -506,6 +506,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")) :: Nil)
   }
 
+  test("SPARK-47307: base64 encoding without chunking") {
+    val longString = "a" * 58
+    val encoded = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=="
+    withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "false") {
+      checkEvaluation(Base64(Literal(longString.getBytes)), encoded)
+    }
+    val chunkEncoded =
+      s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ=="
+    withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "true") {
+      checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded)
+    }
+  }
+
   test("initcap unit test") {
     checkEvaluation(InitCap(Literal.create(null, StringType)), null)
     checkEvaluation(InitCap(Literal("a b")), "A B")
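
The fixture values follow from base64 arithmetic: 58 input bytes form 19 full 3-byte groups plus one leftover byte, so the encoding is 19 * 4 + 4 = 80 characters, exactly one 4-character group past the MIME encoder's 76-character line limit. That is why the chunked variant is the same string with a single "\r\n" inserted before the trailing "YQ==". A quick check in plain Scala:

// 4 * ceil(n / 3) output characters for n input bytes.
val n = 58
val encodedLength = 4 * ((n + 2) / 3)
assert(encodedLength == 80) // 80 > 76, so the MIME encoder inserts exactly one line break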