Skip to content

Commit 74466a3

Browse files
refactor as feedbacks
1 parent 396c0e1 commit 74466a3

File tree

4 files changed

+145
-107
lines changed

4 files changed

+145
-107
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,10 @@ private[hive] object HiveQl {
11281128
Explode(attributes, nodeToExpr(child))
11291129

11301130
case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) =>
1131-
HiveGenericUdtf(new HiveFunctionCache(functionName), attributes, children.map(nodeToExpr))
1131+
HiveGenericUdtf(
1132+
new HiveFunctionWrapper(functionName),
1133+
attributes,
1134+
children.map(nodeToExpr))
11321135

11331136
case a: ASTNode =>
11341137
throw new NotImplementedError(

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala

Lines changed: 25 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -54,94 +54,31 @@ private[hive] abstract class HiveFunctionRegistry
5454
val functionClassName = functionInfo.getFunctionClass.getName
5555

5656
if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
57-
HiveSimpleUdf(new HiveFunctionCache(functionClassName), children)
57+
HiveSimpleUdf(new HiveFunctionWrapper(functionClassName), children)
5858
} else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
59-
HiveGenericUdf(new HiveFunctionCache(functionClassName), children)
59+
HiveGenericUdf(new HiveFunctionWrapper(functionClassName), children)
6060
} else if (
6161
classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
62-
HiveGenericUdaf(new HiveFunctionCache(functionClassName), children)
62+
HiveGenericUdaf(new HiveFunctionWrapper(functionClassName), children)
6363
} else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
64-
HiveUdaf(new HiveFunctionCache(functionClassName), children)
64+
HiveUdaf(new HiveFunctionWrapper(functionClassName), children)
6565
} else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
66-
HiveGenericUdtf(new HiveFunctionCache(functionClassName), Nil, children)
66+
HiveGenericUdtf(new HiveFunctionWrapper(functionClassName), Nil, children)
6767
} else {
6868
sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
6969
}
7070
}
7171
}
7272

73-
/**
74-
* This class provides the UDF creation and also the UDF instance serialization and
75-
* de-serialization cross process boundary.
76-
*
77-
* We use class instead of trait, seems property variables of trait cannot be serialized when
78-
* bundled with Case Class; in the other hand, we need to intercept the UDF instance ser/de.
79-
* the "Has-a" probably better than "Is-a".
80-
* @param functionClassName UDF class name
81-
*/
82-
class HiveFunctionCache(var functionClassName: String) extends java.io.Externalizable {
83-
// for Serialization
84-
def this() = this(null)
85-
86-
private var instance: Any = null
87-
88-
def writeExternal(out: java.io.ObjectOutput) {
89-
// output the function name
90-
out.writeUTF(functionClassName)
91-
92-
// Write a flag if instance is null or not
93-
out.writeBoolean(instance != null)
94-
if (instance != null) {
95-
// Some of the UDF are serializable, but some others are not
96-
// Hive Utilities can handle both cases
97-
val baos = new java.io.ByteArrayOutputStream()
98-
HiveShim.serializePlan(instance, baos)
99-
val functionInBytes = baos.toByteArray
100-
101-
// output the function bytes
102-
out.writeInt(functionInBytes.length)
103-
out.write(functionInBytes, 0, functionInBytes.length)
104-
}
105-
}
106-
107-
def readExternal(in: java.io.ObjectInput) {
108-
// read the function name
109-
functionClassName = in.readUTF()
110-
111-
if (in.readBoolean()) {
112-
// if the instance is not null
113-
// read the function in bytes
114-
val functionInBytesLength = in.readInt()
115-
val functionInBytes = new Array[Byte](functionInBytesLength)
116-
in.read(functionInBytes, 0, functionInBytesLength)
117-
118-
// deserialize the function object via Hive Utilities
119-
instance = HiveShim.deserializePlan(new java.io.ByteArrayInputStream(functionInBytes),
120-
getContextOrSparkClassLoader.loadClass(functionClassName))
121-
}
122-
}
123-
124-
def createFunction[UDFType](alwaysCreateNewInstance: Boolean = false) = {
125-
if (alwaysCreateNewInstance) {
126-
getContextOrSparkClassLoader.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
127-
} else {
128-
if (instance == null) {
129-
instance = getContextOrSparkClassLoader.loadClass(functionClassName).newInstance
130-
}
131-
instance.asInstanceOf[UDFType]
132-
}
133-
}
134-
}
135-
136-
private[hive] case class HiveSimpleUdf(cache: HiveFunctionCache, children: Seq[Expression])
73+
private[hive] case class HiveSimpleUdf(funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
13774
extends Expression with HiveInspectors with Logging {
13875
type EvaluatedType = Any
13976
type UDFType = UDF
14077

14178
def nullable = true
14279

14380
@transient
144-
lazy val function = cache.createFunction[UDFType](true) // Simple UDF should be not serialized.
81+
lazy val function = funcWrapper.createFunction[UDFType]()
14582

14683
@transient
14784
protected lazy val method =
@@ -180,7 +117,7 @@ private[hive] case class HiveSimpleUdf(cache: HiveFunctionCache, children: Seq[E
180117
returnInspector)
181118
}
182119

183-
override def toString = s"$nodeName#${cache.functionClassName}(${children.mkString(",")})"
120+
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
184121
}
185122

186123
// Adapter from Catalyst ExpressionResult to Hive DeferredObject
@@ -194,15 +131,15 @@ private[hive] class DeferredObjectAdapter(oi: ObjectInspector)
194131
override def get(): AnyRef = wrap(func(), oi)
195132
}
196133

197-
private[hive] case class HiveGenericUdf(cache: HiveFunctionCache, children: Seq[Expression])
134+
private[hive] case class HiveGenericUdf(funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
198135
extends Expression with HiveInspectors with Logging {
199136
type UDFType = GenericUDF
200137
type EvaluatedType = Any
201138

202139
def nullable = true
203140

204141
@transient
205-
lazy val function = cache.createFunction[UDFType]()
142+
lazy val function = funcWrapper.createFunction[UDFType]()
206143

207144
@transient
208145
protected lazy val argumentInspectors = children.map(toInspector)
@@ -241,18 +178,18 @@ private[hive] case class HiveGenericUdf(cache: HiveFunctionCache, children: Seq[
241178
unwrap(function.evaluate(deferedObjects), returnInspector)
242179
}
243180

244-
override def toString = s"$nodeName#${cache.functionClassName}(${children.mkString(",")})"
181+
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
245182
}
246183

247184
private[hive] case class HiveGenericUdaf(
248-
cache: HiveFunctionCache,
185+
funcWrapper: HiveFunctionWrapper,
249186
children: Seq[Expression]) extends AggregateExpression
250187
with HiveInspectors {
251188

252189
type UDFType = AbstractGenericUDAFResolver
253190

254191
@transient
255-
protected lazy val resolver: AbstractGenericUDAFResolver = cache.createFunction()
192+
protected lazy val resolver: AbstractGenericUDAFResolver = funcWrapper.createFunction()
256193

257194
@transient
258195
protected lazy val objectInspector = {
@@ -267,22 +204,22 @@ private[hive] case class HiveGenericUdaf(
267204

268205
def nullable: Boolean = true
269206

270-
override def toString = s"$nodeName#${cache.functionClassName}(${children.mkString(",")})"
207+
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
271208

272-
def newInstance() = new HiveUdafFunction(cache, children, this)
209+
def newInstance() = new HiveUdafFunction(funcWrapper, children, this)
273210
}
274211

275212
/** It is used as a wrapper for the hive functions which uses UDAF interface */
276213
private[hive] case class HiveUdaf(
277-
cache: HiveFunctionCache,
214+
funcWrapper: HiveFunctionWrapper,
278215
children: Seq[Expression]) extends AggregateExpression
279216
with HiveInspectors {
280217

281218
type UDFType = UDAF
282219

283220
@transient
284221
protected lazy val resolver: AbstractGenericUDAFResolver =
285-
new GenericUDAFBridge(cache.createFunction())
222+
new GenericUDAFBridge(funcWrapper.createFunction())
286223

287224
@transient
288225
protected lazy val objectInspector = {
@@ -297,10 +234,10 @@ private[hive] case class HiveUdaf(
297234

298235
def nullable: Boolean = true
299236

300-
override def toString = s"$nodeName#${cache.functionClassName}(${children.mkString(",")})"
237+
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
301238

302239
def newInstance() =
303-
new HiveUdafFunction(cache, children, this, true)
240+
new HiveUdafFunction(funcWrapper, children, this, true)
304241
}
305242

306243
/**
@@ -315,13 +252,13 @@ private[hive] case class HiveUdaf(
315252
* user defined aggregations, which have clean semantics even in a partitioned execution.
316253
*/
317254
private[hive] case class HiveGenericUdtf(
318-
cache: HiveFunctionCache,
255+
funcWrapper: HiveFunctionWrapper,
319256
aliasNames: Seq[String],
320257
children: Seq[Expression])
321258
extends Generator with HiveInspectors {
322259

323260
@transient
324-
protected lazy val function: GenericUDTF = cache.createFunction()
261+
protected lazy val function: GenericUDTF = funcWrapper.createFunction()
325262

326263
@transient
327264
protected lazy val inputInspectors = children.map(_.dataType).map(toInspector)
@@ -378,11 +315,11 @@ private[hive] case class HiveGenericUdtf(
378315
}
379316
}
380317

381-
override def toString = s"$nodeName#${cache.functionClassName}(${children.mkString(",")})"
318+
override def toString = s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
382319
}
383320

384321
private[hive] case class HiveUdafFunction(
385-
cache: HiveFunctionCache,
322+
funcWrapper: HiveFunctionWrapper,
386323
exprs: Seq[Expression],
387324
base: AggregateExpression,
388325
isUDAFBridgeRequired: Boolean = false)
@@ -393,9 +330,9 @@ private[hive] case class HiveUdafFunction(
393330

394331
private val resolver =
395332
if (isUDAFBridgeRequired) {
396-
new GenericUDAFBridge(cache.createFunction[UDAF]())
333+
new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
397334
} else {
398-
cache.createFunction[AbstractGenericUDAFResolver]()
335+
funcWrapper.createFunction[AbstractGenericUDAFResolver]()
399336
}
400337

401338
private val inspectors = exprs.map(_.dataType).map(toInspector).toArray

sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,22 +43,23 @@ import scala.language.implicitConversions
4343

4444
import org.apache.spark.sql.catalyst.types.DecimalType
4545

46+
class HiveFunctionWrapper(var functionClassName: String) extends java.io.Serializable {
47+
// for Serialization
48+
def this() = this(null)
49+
50+
import org.apache.spark.util.Utils._
51+
def createFunction[UDFType <: AnyRef](): UDFType = {
52+
getContextOrSparkClassLoader
53+
.loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
54+
}
55+
}
56+
4657
/**
4758
* A compatibility layer for interacting with Hive version 0.12.0.
4859
*/
4960
private[hive] object HiveShim {
5061
val version = "0.12.0"
5162

52-
import org.apache.hadoop.hive.ql.exec.Utilities
53-
54-
def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[UDFType]): UDFType = {
55-
Utilities.deserializePlan(is).asInstanceOf[UDFType]
56-
}
57-
58-
def serializePlan(function: Any, out: java.io.OutputStream): Unit = {
59-
Utilities.serializePlan(function, out)
60-
}
61-
6263
def getTableDesc(
6364
serdeClass: Class[_ <: Deserializer],
6465
inputFormatClass: Class[_ <: InputFormat[_, _]],

0 commit comments

Comments
 (0)