Skip to content
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2682,24 +2682,38 @@ case class Chr(child: Expression)
""",
since = "1.5.0",
group = "string_funcs")
case class Base64(child: Expression)
extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
case class Base64(child: Expression, chunkBase64: Boolean)
extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes with NullIntolerant {

def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled)

override def dataType: DataType = SQLConf.get.defaultStringType
override def inputTypes: Seq[DataType] = Seq(BinaryType)

protected override def nullSafeEval(bytes: Any): Any = {
UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]]))
}
override def replacement: Expression = StaticInvoke(
classOf[Base64],
dataType,
"encode",
Seq(child, Literal(chunkBase64, BooleanType)),
Seq(BinaryType, BooleanType))

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, (child) => {
s"""${ev.value} = UTF8String.fromBytes(
${classOf[JBase64].getName}.getMimeEncoder().encode($child));
"""})
}
override protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)
}

object Base64 {
def apply(expr: Expression): Base64 = new Base64(expr)

override protected def withNewChildInternal(newChild: Expression): Base64 = copy(child = newChild)
private lazy val nonChunkEncoder = JBase64.getMimeEncoder(-1, Array())

def encode(input: Array[Byte], chunkBase64: Boolean): UTF8String = {
val encoder = if (chunkBase64) {
JBase64.getMimeEncoder
} else {
nonChunkEncoder
}
UTF8String.fromBytes(encoder.encode(input))
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3525,6 +3525,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val CHUNK_BASE_64_STRING_ENABLED = buildConf("spark.sql.legacy.chunkBase64String.enabled")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: BASE_64 to BASE64

.internal()
.doc("Whether to truncate string generated by the `Base64` function. When true, base64" +
" strings generated by the base64 function are chunked into lines of at most 76" +
" characters. When false, the base64 strings are not chunked.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val ENABLE_DEFAULT_COLUMNS =
buildConf("spark.sql.defaultColumn.enabled")
.internal()
Expand Down Expand Up @@ -5856,6 +5865,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def ansiRelationPrecedence: Boolean = ansiEnabled && getConf(ANSI_RELATION_PRECEDENCE)

def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE_64_STRING_ENABLED)

def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
case "TIMESTAMP_LTZ" =>
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")).replacement :: Nil)
}

test("SPARK-47307: base64 encoding without chunking") {
val longString = "a" * 58
val encoded = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=="
withSQLConf(SQLConf.CHUNK_BASE_64_STRING_ENABLED.key -> "false") {
checkEvaluation(Base64(Literal(longString.getBytes)), encoded)
}
val chunkEncoded =
s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ=="
withSQLConf(SQLConf.CHUNK_BASE_64_STRING_ENABLED.key -> "true") {
checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded)
}
}

test("initcap unit test") {
checkEvaluation(InitCap(Literal.create(null, StringType)), null)
checkEvaluation(InitCap(Literal("a b")), "A B")
Expand Down