[SPARK-20254][SQL] Remove unnecessary data conversion for Dataset with primitive array #17568
Conversation
Test build #75610 has finished for PR 17568 at commit
@cloud-fan could you please review this?
Is it really safe to do so? MapObjects is not only used for null checking, but also to resolve structs in arrays.
To be safe, we should check:
- no custom collection class is specified
- the `function` will convert an expression `e` to `AssertNotNull(e)` (this guarantees we are expecting a primitive array)
- the `inputData` is of type array and its element is not nullable
I agree with you. Now I realize I missed 2. Am I correct?
1. Already done by `cls.isEmpty`
2. We have to check whether `et` is a primitive type or not at line 2233.
3. Already done by `ArrayType(et, false)`
yea
Test build #75624 has finished for PR 17568 at commit
@cloud-fan how about this check for 2.?

ping @cloud-fan
If the expected array element type is the same as the actual array element type, and the custom collection class is empty, then we can say that this MapObjects is a no-op and we can remove it.
The information about the expected array element type is actually hidden in the map function, so I think it's hard to do the check here. How about we add an optimizer rule: if `MapObjects.lambdaFunction` is a `LambdaVariable`, we eliminate this `MapObjects`?
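For illustration, here is a minimal sketch of such a rule, assuming the MapObjects pattern shape used elsewhere in this thread; the version ultimately adopted in this PR (shown further down) additionally requires the loop variable to be non-nullable.

```scala
import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: drop a MapObjects whose lambda function is just the loop variable
// (a per-element no-op) and which builds no custom collection.
object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case MapObjects(_, _, _, _: LambdaVariable, inputData, None) => inputData
  }
}
```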
BTW, the et here is the actual array element type, not the expected one.
I see. I will work on adding this optimizer rule.
Test build #75669 has finished for PR 17568 at commit
@cloud-fan I would appreciate it if you have time to look at this.

@cloud-fan we would appreciate it if this could be merged into Spark 2.2.
I think we can just match MapObjects; it's quite self-contained, so we don't need other information to decide whether we can remove MapObjects.
Sure, done.
If we have AssertNotNull, then it's not a no-op and we cannot eliminate this MapObjects. We may need to update NullPropagation to eliminate no-op AssertNotNull.
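As an illustration of that idea, here is a standalone sketch; the rule name is hypothetical, and in the PR the same case is simply added to the existing NullPropagation rule (see the diff further down).

```scala
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: an AssertNotNull over a child that is already known to be
// non-nullable can never fire, so it can be replaced by its child.
object EliminateNoOpAssertNotNull extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case AssertNotNull(child, _) if !child.nullable => child
  }
}
```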
Sure, done, too
The type doesn't matter; we just need to check whether the map function is a no-op. If the type is not primitive, e.g. String, Timestamp, etc., the map function will be an Invoke.
Test build #75804 has finished for PR 17568 at commit
a is not used
Good catch. Done.
case MapObjects(_, _, _, _: LambdaVariable, inputData, None, _) => inputData
can this work?
Yes, I can do that.
We usually don't test the optimizer in end-to-end tests; see TypedFilterOptimizationSuite as an example.
Thank you for pointing it out. I implemented non-e2e tests.
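For reference, here is a structural sketch of such a rule-level test, modeled loosely on TypedFilterOptimizationSuite; the suite name, batch contents, and the `???` plan placeholders are assumptions, not the exact test added by this PR.

```scala
import org.apache.spark.sql.catalyst.optimizer.{EliminateMapObjects, SimplifyCasts}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class EliminateMapObjectsSuite extends PlanTest {
  // Run the rule under test (plus SimplifyCasts) to a fixed point.
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = Batch("EliminateMapObjects", FixedPoint(50),
      SimplifyCasts, EliminateMapObjects) :: Nil
  }

  test("MapObjects over a primitive-array deserializer is removed") {
    val input: LogicalPlan = ???     // an analyzed plan whose deserializer contains MapObjects
    val expected: LogicalPlan = ???  // the same plan with MapObjects replaced by its input
    comparePlans(Optimize.execute(input), expected)
  }
}
```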
Test build #75807 has finished for PR 17568 at commit
Test build #75814 has finished for PR 17568 at commit
Test build #75822 has finished for PR 17568 at commit
Revert these changes?
Revert these changes?
Revert this change?
Add @param?
Nit: case DeserializeToObject(Invoke(
Nit: returnType: ObjectType
Thank you. I addressed all of your comments.
Test build #75825 has finished for PR 17568 at commit
nit: case AssertNotNull(c, _) if !c.nullable => c
I think Cast should be eliminated when running this rule?
Yes, I eliminate Cast in this rule, too. This Cast seems to have been added by a recent commit.
Oh, do you mean Cast should be eliminated by this rule, or Cast should be eliminated before applying this rule?
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.execution.DeserializeToObjectExec
Revert the above two lines?
    MapObjects(_, _, _, Cast(LambdaVariable(_, _, dataType, _), castDataType, _),
      inputData, None),
    funcName, returnType: ObjectType, arguments, propagateNull, returnNullable),
    outputObjAttr, child) if dataType == castDataType =>
I think we can remove this case. Cast has been removed by SimplifyCasts.
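For context, the behavior of SimplifyCasts that matters here is dropping a cast whose input already has the target type. A minimal sketch of just that case (the rule name is hypothetical; the real SimplifyCasts also handles more than this):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: a cast to the expression's own data type is a no-op.
object SimplifyNoOpCasts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Cast(e, dataType, _) if e.dataType == dataType => e
  }
}
```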
For now, as you pointed out, Cast has been removed by SimplifyCasts.
I am leaving this case for robustness, in case this optimization is ever executed before SimplifyCasts due to rule reordering.
What do you think? cc: @cloud-fan
The order does not matter. The batch will be run multiple times.
I see
    inputData, funcName, returnType, arguments, propagateNull, returnNullable),
    outputObjAttr, child)
  case _ @ DeserializeToObject(Invoke(
      MapObjects(_, _, _, LambdaVariable(_, _, dataType, _), inputData, None),
dataType is not being used. Basically, this rule is to get rid of MapObjects when no function is applied to LambdaVariable. Do we still need to check whether customCollectionCls is equal to None? Any other scenario besides type casting?
As @cloud-fan pointed out in this comment, it is necessary. customCollectionCls was introduced by #16541; it is not equal to None when Seq() is used.
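An illustrative pair of cases, assuming a local SparkSession: a Dataset of primitive arrays needs no custom collection class, while a Dataset of Seq values does, so the None check keeps MapObjects where it is still required.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("mapobjects-demo").getOrCreate()
import spark.implicits._

// Candidate for elimination: primitive array elements, customCollectionCls is None.
val arrayDs = Seq(Array(1.1, 2.2)).toDS()

// Not a candidate: deserializing back into a Scala Seq sets customCollectionCls,
// so the MapObjects (and its collection builder) must be kept.
val seqDs = Seq(Seq(1, 2, 3)).toDS()
```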
Ok, for safety, we can keep it.
  DeserializeToObject(Invoke(
    inputData, funcName, returnType, arguments, propagateNull, returnNullable),
    outputObjAttr, child)
  case _ @ DeserializeToObject(Invoke(
Nit: _ @ DeserializeToObject -> DeserializeToObject
  case EqualNullSafe(Literal(null, _), r) => IsNull(r)
  case EqualNullSafe(l, Literal(null, _)) => IsNull(l)

  case AssertNotNull(c, _) if !c.nullable => c
Is this safe to do? According to the description of AssertNotNull, even if c is non-nullable, we still need to add this assertion in some cases.
I think that this is what @cloud-fan suggested in his comment.
Is there a better alternative implementation to remove this?
I am not sure if @cloud-fan's no-op AssertNotNull is the same as the case in AssertNotNull's description.
Ah, good catch! Sorry, it was my mistake. But then it seems we cannot remove MapObjects, as the null check has to be done.
Actually, I checked all the usages of AssertNotNull; we never use AssertNotNull to check a non-nullable column/field, so it seems the documentation of AssertNotNull is wrong. Can you double check?
I agree with @cloud-fan. I have also checked the usages of AssertNotNull. IIUC, all of them are used for throwing a runtime exception.
I think the purpose of AssertNotNull is to give a proper exception at runtime when an expression (note: it can be a nullable or non-nullable expression) evaluates to a null value.
Maybe for MapObjects we can safely remove it, but I am not sure it is okay in other cases too.
As long as we trust the nullable property, I think it's safe to remove it. We don't use AssertNotNull to validate the input data.
OK. Sounds reasonable to me.
 * 1. Mapobject(e) where e is lambdavariable(), which means types for input output
 *    are primitive types
 * 2. no custom collection class specified
 * representation of data item. For example back to back map operations.
Is this comment broken?
Test build #75850 has finished for PR 17568 at commit
object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case DeserializeToObject(Invoke(
        MapObjects(_, _, _, _ : LambdaVariable, inputData, None),
can we just replace MapObjects with its child? Seems the only reason you match the whole DeserializeToObject is to make sure the returnType is object type, but that's guaranteed if the collectionClass is None.
Replacing MapObjects with its child has type LogicalPlan => Expression, while this method requires LogicalPlan => LogicalPlan.
Is it fine to replace Invoke(MapObjects(..., inputData, None), ...) with Invoke(inputData, ...)?
@cloud-fan, I misunderstood. Neither Expression nor Invoke is a LogicalPlan.
I think that we have to replace some of the arguments in DeserializeToObject.
What do you think?
@cloud-fan Unfortunately, this change caused two test failures:
"checkAnswer should compare map correctly" and "SPARK-18717: code generation works for both scala.collection.Map and scala.collection.imutable.Map" in DatasetSuite.
I will check what is happening very soon.
When we run the following code, the following exception occurs. This is because UnsafeArrayData.copy(), which is unsupported, is called.
val ds = Seq((1, Map(2 -> 3))).toDS.map(t => t)
ds.collect.toSeq
The solution is to use the following matching:
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case MapObjects(_, _, _, LambdaVariable(_, _, _, false), inputData, None) => inputData
}
Without the non-nullable check on the LambdaVariable, cases such as Object[] -> Integer[] were not excluded, so the generated code incorrectly called UnsafeArrayData.array().
 * representation of data item. For example back to back map operations.
 */
object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
shall we call plan.transformAllExpressions?
Thanks, it works. done
Test build #75892 has finished for PR 17568 at commit
@cloud-fan, would it be possible to review this again?
  DeserializeToObject(Invoke(
    inputData, funcName, returnType, arguments, propagateNull, returnNullable),
    outputObjAttr, child)
  */
remove these?
I am sorry for my mistake. Removed.
LGTM
/**
 * Removes MapObjects when the following conditions are satisfied
 * 1. Mapobject(e) where e is lambdavariable(), which means types for input output
This comment is obscure. Can we improve it a bit?
For example, MapObject(e) is confusing. Shall we clearly say the lambdaFunction of MapObject?
thanks, done
 * 1. Mapobject(e) where e is lambdavariable(), which means types for input output
 *    are primitive types
 * 2. no custom collection class specified
 * representation of data item. For example back to back map operations.
Can we rephrase this comment too? It looks weird.
thanks, deleted
LGTM except for minor comments on the code comments.
Test build #75895 has finished for PR 17568 at commit
Test build #75899 has finished for PR 17568 at commit
thanks, merging to master/2.2!
Thank you very much!!
[SPARK-20254][SQL] Remove unnecessary data conversion for Dataset with primitive array
## What changes were proposed in this pull request?
This PR eliminates unnecessary data conversion, which was introduced by SPARK-19716, for Datasets with primitive arrays in the generated Java code.
When we run the following example program, we currently get the Java code shown under "Without this PR". In that code, lines 56-82 are unnecessary because the primitive array in `ArrayData` can be converted into a Java primitive array by calling the ``toDoubleArray()`` method; ``GenericArrayData`` is not required.
```scala
val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
ds.count
ds.map(e => e).show
```
Without this PR
```
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
+- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
+- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
+- ExternalRDD [obj#1]
== Analyzed Logical Plan ==
value: array<double>
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
+- DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D
+- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
+- ExternalRDD [obj#1]
== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
+- DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D
+- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
+- Scan ExternalRDDScan[obj#1]
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
+- *MapElements <function1>, obj#24: [D
+- *DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D
+- InMemoryTableScan [value#2]
+- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
+- Scan ExternalRDDScan[obj#1]
```
```java
/* 050 */ protected void processNext() throws java.io.IOException {
/* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 055 */
/* 056 */ ArrayData deserializetoobject_value1 = null;
/* 057 */
/* 058 */ if (!inputadapter_isNull) {
/* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 060 */
/* 061 */ Double[] deserializetoobject_convertedArray = null;
/* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
/* 063 */
/* 064 */ int deserializetoobject_loopIndex = 0;
/* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
/* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 068 */
/* 069 */ if (MapObjects_loopIsNull2) {
/* 070 */ throw new RuntimeException(((java.lang.String) references[0]));
/* 071 */ }
/* 072 */ if (false) {
/* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
/* 074 */ } else {
/* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
/* 076 */ }
/* 077 */
/* 078 */ deserializetoobject_loopIndex += 1;
/* 079 */ }
/* 080 */
/* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
/* 082 */ }
/* 083 */ boolean deserializetoobject_isNull = true;
/* 084 */ double[] deserializetoobject_value = null;
/* 085 */ if (!inputadapter_isNull) {
/* 086 */ deserializetoobject_isNull = false;
/* 087 */ if (!deserializetoobject_isNull) {
/* 088 */ Object deserializetoobject_funcResult = null;
/* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray();
/* 090 */ if (deserializetoobject_funcResult == null) {
/* 091 */ deserializetoobject_isNull = true;
/* 092 */ } else {
/* 093 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult;
/* 094 */ }
/* 095 */
/* 096 */ }
/* 097 */ deserializetoobject_isNull = deserializetoobject_value == null;
/* 098 */ }
/* 099 */
/* 100 */ boolean mapelements_isNull = true;
/* 101 */ double[] mapelements_value = null;
/* 102 */ if (!false) {
/* 103 */ mapelements_resultIsNull = false;
/* 104 */
/* 105 */ if (!mapelements_resultIsNull) {
/* 106 */ mapelements_resultIsNull = deserializetoobject_isNull;
/* 107 */ mapelements_argValue = deserializetoobject_value;
/* 108 */ }
/* 109 */
/* 110 */ mapelements_isNull = mapelements_resultIsNull;
/* 111 */ if (!mapelements_isNull) {
/* 112 */ Object mapelements_funcResult = null;
/* 113 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 114 */ if (mapelements_funcResult == null) {
/* 115 */ mapelements_isNull = true;
/* 116 */ } else {
/* 117 */ mapelements_value = (double[]) mapelements_funcResult;
/* 118 */ }
/* 119 */
/* 120 */ }
/* 121 */ mapelements_isNull = mapelements_value == null;
/* 122 */ }
/* 123 */
/* 124 */ serializefromobject_resultIsNull = false;
/* 125 */
/* 126 */ if (!serializefromobject_resultIsNull) {
/* 127 */ serializefromobject_resultIsNull = mapelements_isNull;
/* 128 */ serializefromobject_argValue = mapelements_value;
/* 129 */ }
/* 130 */
/* 131 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull;
/* 132 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
/* 133 */ serializefromobject_isNull = serializefromobject_value == null;
/* 134 */ serializefromobject_holder.reset();
/* 135 */
/* 136 */ serializefromobject_rowWriter.zeroOutNullBytes();
/* 137 */
/* 138 */ if (serializefromobject_isNull) {
/* 139 */ serializefromobject_rowWriter.setNullAt(0);
/* 140 */ } else {
/* 141 */ // Remember the current cursor so that we can calculate how many bytes are
/* 142 */ // written later.
/* 143 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 144 */
/* 145 */ if (serializefromobject_value instanceof UnsafeArrayData) {
/* 146 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 147 */ // grow the global buffer before writing data.
/* 148 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 149 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 150 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 151 */
/* 152 */ } else {
/* 153 */ final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 154 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
/* 155 */
/* 156 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 157 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 158 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
/* 159 */ } else {
/* 160 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
/* 161 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 162 */ }
/* 163 */ }
/* 164 */ }
/* 165 */
/* 166 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 167 */ }
/* 168 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 169 */ append(serializefromobject_result);
/* 170 */ if (shouldStop()) return;
/* 171 */ }
/* 172 */ }
```
With this PR (eliminated lines 56-82 in the above code)
```java
/* 047 */ protected void processNext() throws java.io.IOException {
/* 048 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 049 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 050 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 051 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 052 */
/* 053 */ boolean deserializetoobject_isNull = true;
/* 054 */ double[] deserializetoobject_value = null;
/* 055 */ if (!inputadapter_isNull) {
/* 056 */ deserializetoobject_isNull = false;
/* 057 */ if (!deserializetoobject_isNull) {
/* 058 */ Object deserializetoobject_funcResult = null;
/* 059 */ deserializetoobject_funcResult = inputadapter_value.toDoubleArray();
/* 060 */ if (deserializetoobject_funcResult == null) {
/* 061 */ deserializetoobject_isNull = true;
/* 062 */ } else {
/* 063 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult;
/* 064 */ }
/* 065 */
/* 066 */ }
/* 067 */ deserializetoobject_isNull = deserializetoobject_value == null;
/* 068 */ }
/* 069 */
/* 070 */ boolean mapelements_isNull = true;
/* 071 */ double[] mapelements_value = null;
/* 072 */ if (!false) {
/* 073 */ mapelements_resultIsNull = false;
/* 074 */
/* 075 */ if (!mapelements_resultIsNull) {
/* 076 */ mapelements_resultIsNull = deserializetoobject_isNull;
/* 077 */ mapelements_argValue = deserializetoobject_value;
/* 078 */ }
/* 079 */
/* 080 */ mapelements_isNull = mapelements_resultIsNull;
/* 081 */ if (!mapelements_isNull) {
/* 082 */ Object mapelements_funcResult = null;
/* 083 */ mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 084 */ if (mapelements_funcResult == null) {
/* 085 */ mapelements_isNull = true;
/* 086 */ } else {
/* 087 */ mapelements_value = (double[]) mapelements_funcResult;
/* 088 */ }
/* 089 */
/* 090 */ }
/* 091 */ mapelements_isNull = mapelements_value == null;
/* 092 */ }
/* 093 */
/* 094 */ serializefromobject_resultIsNull = false;
/* 095 */
/* 096 */ if (!serializefromobject_resultIsNull) {
/* 097 */ serializefromobject_resultIsNull = mapelements_isNull;
/* 098 */ serializefromobject_argValue = mapelements_value;
/* 099 */ }
/* 100 */
/* 101 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull;
/* 102 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
/* 103 */ serializefromobject_isNull = serializefromobject_value == null;
/* 104 */ serializefromobject_holder.reset();
/* 105 */
/* 106 */ serializefromobject_rowWriter.zeroOutNullBytes();
/* 107 */
/* 108 */ if (serializefromobject_isNull) {
/* 109 */ serializefromobject_rowWriter.setNullAt(0);
/* 110 */ } else {
/* 111 */ // Remember the current cursor so that we can calculate how many bytes are
/* 112 */ // written later.
/* 113 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 114 */
/* 115 */ if (serializefromobject_value instanceof UnsafeArrayData) {
/* 116 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 117 */ // grow the global buffer before writing data.
/* 118 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 119 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 120 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 121 */
/* 122 */ } else {
/* 123 */ final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 124 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
/* 125 */
/* 126 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 127 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 128 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
/* 129 */ } else {
/* 130 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
/* 131 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 132 */ }
/* 133 */ }
/* 134 */ }
/* 135 */
/* 136 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 137 */ }
/* 138 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 139 */ append(serializefromobject_result);
/* 140 */ if (shouldStop()) return;
/* 141 */ }
/* 142 */ }
```
## How was this patch tested?
Added test suites to `DatasetPrimitiveSuite`.
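A hedged sketch of the kind of check such a test could add; the test name and exact assertion are assumptions, and it would live inside `DatasetPrimitiveSuite` with `spark.implicits._` in scope.

```scala
test("SPARK-20254: map over a Dataset of primitive arrays") {
  val ds = Seq(Array(1.1, 2.2)).toDS()
  // The identity map must keep the primitive array intact after the optimization.
  assert(ds.map(e => e).collect().head.sameElements(Array(1.1, 2.2)))
}
```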
Author: Kazuaki Ishizaki <[email protected]>
Closes apache#17568 from kiszk/SPARK-20254.