6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.6
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-3.1
@@ -12,9 +12,9 @@ aopalliance-1.0.jar
 aopalliance-repackaged-2.4.0-b34.jar
 apache-log4j-extras-1.2.17.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -190,7 +190,7 @@
       If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py,
       ./python/run-tests.py and ./python/setup.py too.

Member: I think we should do this check?

Member: Ah, actually I did the check in his previous attempt :-). It seems we don't need to change the minimum pyarrow version for this upgrade.

BTW, we can remove ./python/run-tests.py here in the comment and in the setup.py comment. That cleanup in ./python/run-tests.py was done by https://github.com/apache/spark/pull/21107/files#diff-871d87c62d4e9228a47145a8894b6694L172

     -->
-    <arrow.version>0.8.0</arrow.version>
+    <arrow.version>0.10.0</arrow.version>

<test.java.home>${java.home}</test.java.home>
<test.exclude.tags></test.exclude.tags>
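For reference, the check discussed above is the minimum-version guard kept in ./python/pyspark/sql/utils.py. A minimal sketch of what such a guard looks like (the function name, message, and structure below are illustrative, not Spark's verbatim code):

from distutils.version import LooseVersion

def require_minimum_pyarrow_version():
    # Illustrative sketch: fail fast if pyarrow is missing or older
    # than the minimum version supported.
    minimum_pyarrow_version = "0.8.0"
    try:
        import pyarrow
    except ImportError:
        raise ImportError("pyarrow >= %s must be installed" % minimum_pyarrow_version)
    if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
        raise ImportError("pyarrow >= %s must be installed, but %s was found"
                          % (minimum_pyarrow_version, pyarrow.__version__))

As the reviewer notes above, the Arrow 0.10.0 upgrade leaves the supported minimum pyarrow version unchanged, so only the comment cleanup is needed here.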
2 changes: 2 additions & 0 deletions python/pyspark/serializers.py
@@ -229,12 +229,14 @@ def _create_batch(series, timezone):
     def create_array(s, t):
         mask = s.isnull()
         # Ensure timestamp series are in expected form for Spark internal representation
+        # TODO: maybe don't need None check anymore as of Arrow 0.9.1
         if t is not None and pa.types.is_timestamp(t):
             s = _check_series_convert_timestamps_internal(s.fillna(0), timezone)
             # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
             return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
         elif t is not None and pa.types.is_string(t) and sys.version < '3':
             # TODO: need decode before converting to Arrow in Python 2
+            # TODO: don't need as of Arrow 0.9.1
             return pa.Array.from_pandas(s.apply(
                 lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
         elif t is not None and pa.types.is_decimal(t) and \
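The safe=False cast in the hunk above exists because pandas datetimes carry nanosecond precision while Spark's internal timestamps are microseconds, so the downcast must be allowed to truncate. A standalone sketch of that behavior (assuming pyarrow and pandas are installed; the example value is made up):

import pandas as pd
import pyarrow as pa

# pandas datetime64 values are nanosecond precision; casting to a
# microsecond Arrow timestamp with safe=True raises once any
# sub-microsecond component would be lost, so safe=False truncates.
s = pd.Series(pd.to_datetime(["2018-01-01 00:00:00.123456789"]))
arr = pa.Array.from_pandas(s).cast(pa.timestamp("us"), safe=False)
print(arr)  # value truncated to 2018-01-01 00:00:00.123456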
ArrowColumnVector.java
@@ -162,13 +162,13 @@ public ArrowColumnVector(ValueVector vector) {
     } else if (vector instanceof ListVector) {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
-    } else if (vector instanceof NullableMapVector) {
-      NullableMapVector mapVector = (NullableMapVector) vector;
-      accessor = new StructAccessor(mapVector);
+    } else if (vector instanceof StructVector) {
+      StructVector structVector = (StructVector) vector;
+      accessor = new StructAccessor(structVector);
 
-      childColumns = new ArrowColumnVector[mapVector.size()];
+      childColumns = new ArrowColumnVector[structVector.size()];
       for (int i = 0; i < childColumns.length; ++i) {
-        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+        childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
       }
     } else {
       throw new UnsupportedOperationException();
@@ -472,7 +472,7 @@ final ColumnarArray getArray(int rowId) {
    */
   private static class StructAccessor extends ArrowVectorAccessor {
 
-    StructAccessor(NullableMapVector vector) {
+    StructAccessor(StructVector vector) {
       super(vector);
     }
   }
ArrowWriter.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.complex._
-import org.apache.arrow.vector.types.pojo.ArrowType
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -62,7 +61,7 @@ object ArrowWriter {
     case (ArrayType(_, _), vector: ListVector) =>
       val elementVector = createFieldWriter(vector.getDataVector())
       new ArrayWriter(vector, elementVector)
-    case (StructType(_), vector: NullableMapVector) =>
+    case (StructType(_), vector: StructVector) =>
       val children = (0 until vector.size()).map { ordinal =>
         createFieldWriter(vector.getChildByOrdinal(ordinal))
       }
@@ -129,20 +128,7 @@ private[arrow] abstract class ArrowFieldWriter {
   }
 
   def reset(): Unit = {
-    // TODO: reset() should be in a common interface
-    valueVector match {
-      case fixedWidthVector: BaseFixedWidthVector => fixedWidthVector.reset()
-      case variableWidthVector: BaseVariableWidthVector => variableWidthVector.reset()
-      case listVector: ListVector =>
-        // Manual "reset" the underlying buffer.
-        // TODO: When we upgrade to Arrow 0.10.0, we can simply remove this and call
-        // `listVector.reset()`.
-        val buffers = listVector.getBuffers(false)
-        buffers.foreach(buf => buf.setZero(0, buf.capacity()))
-        listVector.setValueCount(0)
-        listVector.setLastSet(0)
-      case _ =>
-    }
+    valueVector.reset()
     count = 0
   }
 }
@@ -323,7 +309,7 @@ private[arrow] class ArrayWriter(
 }
 
 private[arrow] class StructWriter(
-    val valueVector: NullableMapVector,
+    val valueVector: StructVector,
     children: Array[ArrowFieldWriter]) extends ArrowFieldWriter {
 
   override def setNull(): Unit = {
ArrowColumnVectorSuite.scala
@@ -336,7 +336,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = false, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
 
     vector.allocateNew()
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
@@ -373,7 +373,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
     vector.allocateNew()
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
     val longVector = vector.getChildByOrdinal(1).asInstanceOf[BigIntVector]