From 5e4061e16947d0c7d81b86cbccfa0e222b33d48b Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Mon, 9 Apr 2018 12:26:06 -0700
Subject: [PATCH 1/8] made required code changes for upgrade

---
 pom.xml                                            |  2 +-
 .../spark/sql/vectorized/ArrowColumnVector.java    | 12 ++++++------
 .../spark/sql/execution/arrow/ArrowWriter.scala    |  4 ++--
 .../vectorized/ArrowColumnVectorSuite.scala        |  4 ++--
 4 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/pom.xml b/pom.xml
index be84661a50dc..603e70b4fbd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,7 @@
     <!--
     If you are changing Arrow version specification, please check
     ./python/pyspark/sql/utils.py, ./python/run-tests.py and ./python/setup.py too.
     -->
-    <arrow.version>0.8.0</arrow.version>
+    <arrow.version>0.9.0</arrow.version>
 
     <test.java.home>${java.home}</test.java.home>
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 227a16f7e69e..440a3a0cd23f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -162,13 +162,13 @@ public ArrowColumnVector(ValueVector vector) {
     } else if (vector instanceof ListVector) {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
-    } else if (vector instanceof NullableMapVector) {
-      NullableMapVector mapVector = (NullableMapVector) vector;
-      accessor = new StructAccessor(mapVector);
+    } else if (vector instanceof StructVector) {
+      StructVector structVector =  (StructVector) vector;
+      accessor = new StructAccessor(structVector);
 
-      childColumns = new ArrowColumnVector[mapVector.size()];
+      childColumns = new ArrowColumnVector[structVector.size()];
       for (int i = 0; i < childColumns.length; ++i) {
-        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+        childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
       }
     } else {
       throw new UnsupportedOperationException();
@@ -472,7 +472,7 @@ final ColumnarArray getArray(int rowId) {
    */
   private static class StructAccessor extends ArrowVectorAccessor {
 
-    StructAccessor(NullableMapVector vector) {
+    StructAccessor(StructVector vector) {
       super(vector);
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 3de6ea8bb257..71d4a72732ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -62,7 +62,7 @@ object ArrowWriter {
     case (ArrayType(_, _), vector: ListVector) =>
       val elementVector = createFieldWriter(vector.getDataVector())
       new ArrayWriter(vector, elementVector)
-    case (StructType(_), vector: NullableMapVector) =>
+    case (StructType(_), vector: StructVector) =>
       val children = (0 until vector.size()).map { ordinal =>
         createFieldWriter(vector.getChildByOrdinal(ordinal))
       }
@@ -323,7 +323,7 @@ private[arrow] class ArrayWriter(
 }
 
 private[arrow] class StructWriter(
-    val valueVector: NullableMapVector,
+    val valueVector: StructVector,
     children: Array[ArrowFieldWriter]) extends ArrowFieldWriter {
 
   override def setNull(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
index b55489cb2678..4592a1663fae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala
@@ -336,7 +336,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = false, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
     vector.allocateNew()
 
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
@@ -373,7 +373,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
     vector.allocateNew()
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
     val longVector = vector.getChildByOrdinal(1).asInstanceOf[BigIntVector]
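[Reviewer note, not part of the patch] Arrow 0.9.0 renames `NullableMapVector`
to `StructVector`, so the struct branches in `ArrowColumnVector` and
`ArrowWriter` only change the class they match on; behavior is unchanged. Below
is a minimal sketch of the read path, modeled on the ArrowColumnVectorSuite
hunks above. The field name, the value 42, and the imports are illustrative
assumptions, not code from this patch:

```scala
import org.apache.arrow.vector.IntVector
import org.apache.arrow.vector.complex.StructVector

import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.vectorized.ArrowColumnVector

val allocator = ArrowUtils.rootAllocator.newChildAllocator("sketch", 0, Long.MaxValue)
val schema = new StructType().add("i", IntegerType)

// Where Arrow 0.8.0 handed back a NullableMapVector, 0.9.0 yields a StructVector.
val structVector = ArrowUtils.toArrowField("struct", schema, nullable = true, null)
  .createVector(allocator).asInstanceOf[StructVector]
structVector.allocateNew()

// Populate one row: set the child value, then mark the struct slot non-null.
val intVector = structVector.getChildByOrdinal(0).asInstanceOf[IntVector]
intVector.setSafe(0, 42)
structVector.setIndexDefined(0)
structVector.setValueCount(1)

// The constructor now matches on StructVector and wraps each child field.
val column = new ArrowColumnVector(structVector)
assert(column.getStruct(0).getInt(0) == 42)

column.close()
allocator.close()
```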
From 8b63e6c39e2b88ba2f4d1e9c666e7dd456d265bf Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Mon, 9 Apr 2018 12:26:54 -0700
Subject: [PATCH 2/8] remove unused import

---
 .../scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 71d4a72732ac..ab726116212e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.complex._
-import org.apache.arrow.vector.types.pojo.ArrowType
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters

From 95a1bf9de43bcee8684adf47990dc64923a63c0c Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Fri, 20 Apr 2018 09:47:05 -0700
Subject: [PATCH 3/8] added some TODOs

---
 python/pyspark/serializers.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 82abf1947c81..47c4c3e663b9 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -229,12 +229,14 @@ def _create_batch(series, timezone):
     def create_array(s, t):
         mask = s.isnull()
         # Ensure timestamp series are in expected form for Spark internal representation
+        # TODO: maybe don't need None check anymore as of Arrow 0.9.1
         if t is not None and pa.types.is_timestamp(t):
             s = _check_series_convert_timestamps_internal(s.fillna(0), timezone)
             # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
             return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
         elif t is not None and pa.types.is_string(t) and sys.version < '3':
             # TODO: need decode before converting to Arrow in Python 2
+            # TODO: don't need as of Arrow 0.9.1
             return pa.Array.from_pandas(s.apply(
                 lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
         elif t is not None and pa.types.is_decimal(t) and \
From 739a12eca9dbd83256be388703098e4d1f9cab9e Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Fri, 18 May 2018 13:36:21 -0700
Subject: [PATCH 4/8] change to using 0.10 snapshot

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 603e70b4fbd7..35a31e6114fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,7 @@
     <!--
     If you are changing Arrow version specification, please check
     ./python/pyspark/sql/utils.py, ./python/run-tests.py and ./python/setup.py too.
     -->
-    <arrow.version>0.9.0</arrow.version>
+    <arrow.version>0.10.0-SNAPSHOT</arrow.version>
 
     <test.java.home>${java.home}</test.java.home>

From d1f7355e78367a1b8a4347887fa1780106c6bffd Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Tue, 31 Jul 2018 17:40:39 -0700
Subject: [PATCH 5/8] replaced manifests

---
 dev/deps/spark-deps-hadoop-2.6 | 6 +++---
 dev/deps/spark-deps-hadoop-2.7 | 6 +++---
 dev/deps/spark-deps-hadoop-3.1 | 6 +++---
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 4ef61b2ab8cb..0ffe2d5b151d 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0-SNAPSHOT.jar
+arrow-memory-0.10.0-SNAPSHOT.jar
+arrow-vector-0.10.0-SNAPSHOT.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index a74ce1f26b14..ad7c43290eed 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0-SNAPSHOT.jar
+arrow-memory-0.10.0-SNAPSHOT.jar
+arrow-vector-0.10.0-SNAPSHOT.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index e0fcca0eeb31..715f985e58ec 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -12,9 +12,9 @@ aopalliance-1.0.jar
 aopalliance-repackaged-2.4.0-b34.jar
 apache-log4j-extras-1.2.17.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0-SNAPSHOT.jar
+arrow-memory-0.10.0-SNAPSHOT.jar
+arrow-vector-0.10.0-SNAPSHOT.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
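[Reviewer note, not part of the patch] The three manifest entries move in
lockstep with the `<arrow.version>` property because Spark declares the Arrow
dependency once in the parent pom, roughly as sketched below (from memory; the
real declaration also carries exclusions), with arrow-format and arrow-memory
arriving transitively through arrow-vector:

```xml
<dependency>
  <groupId>org.apache.arrow</groupId>
  <artifactId>arrow-vector</artifactId>
  <version>${arrow.version}</version>
</dependency>
```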
From abedec3243cce5fa92ea3acf711f504bab6814f3 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Tue, 31 Jul 2018 17:42:56 -0700
Subject: [PATCH 6/8] removed version snapshot

---
 dev/deps/spark-deps-hadoop-2.6 | 6 +++---
 dev/deps/spark-deps-hadoop-2.7 | 6 +++---
 dev/deps/spark-deps-hadoop-3.1 | 6 +++---
 pom.xml                        | 2 +-
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 0ffe2d5b151d..c193ffa52804 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.10.0-SNAPSHOT.jar
-arrow-memory-0.10.0-SNAPSHOT.jar
-arrow-vector-0.10.0-SNAPSHOT.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index ad7c43290eed..20e91e539546 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.10.0-SNAPSHOT.jar
-arrow-memory-0.10.0-SNAPSHOT.jar
-arrow-vector-0.10.0-SNAPSHOT.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 715f985e58ec..74d43c80fe34 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -12,9 +12,9 @@ aopalliance-1.0.jar
 aopalliance-repackaged-2.4.0-b34.jar
 apache-log4j-extras-1.2.17.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.10.0-SNAPSHOT.jar
-arrow-memory-0.10.0-SNAPSHOT.jar
-arrow-vector-0.10.0-SNAPSHOT.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.8.2.jar
 avro-ipc-1.8.2.jar
diff --git a/pom.xml b/pom.xml
index 35a31e6114fe..0b5607fd17d3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,7 @@
     <!--
     If you are changing Arrow version specification, please check
     ./python/pyspark/sql/utils.py, ./python/run-tests.py and ./python/setup.py too.
     -->
-    <arrow.version>0.10.0-SNAPSHOT</arrow.version>
+    <arrow.version>0.10.0</arrow.version>
 
     <test.java.home>${java.home}</test.java.home>

From 0652617d2960bedbf11a643d67bb6d65cb467ebc Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Tue, 7 Aug 2018 21:59:05 -0700
Subject: [PATCH 7/8] remove space typo

---
 .../java/org/apache/spark/sql/vectorized/ArrowColumnVector.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 440a3a0cd23f..8adcb9d09069 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -163,7 +163,7 @@ public ArrowColumnVector(ValueVector vector) {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
     } else if (vector instanceof StructVector) {
-      StructVector structVector =  (StructVector) vector;
+      StructVector structVector = (StructVector) vector;
       accessor = new StructAccessor(structVector);
 
       childColumns = new ArrowColumnVector[structVector.size()];
From ae8a6aaf7f309e4f0c3962968b6b81f429d9ae39 Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Mon, 13 Aug 2018 22:46:08 -0700
Subject: [PATCH 8/8] cleanup ArrowFieldWriter.reset()

---
 .../spark/sql/execution/arrow/ArrowWriter.scala | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index ab726116212e..8dd484af6e90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -128,20 +128,7 @@ private[arrow] abstract class ArrowFieldWriter {
   }
 
   def reset(): Unit = {
-    // TODO: reset() should be in a common interface
-    valueVector match {
-      case fixedWidthVector: BaseFixedWidthVector => fixedWidthVector.reset()
-      case variableWidthVector: BaseVariableWidthVector => variableWidthVector.reset()
-      case listVector: ListVector =>
-        // Manual "reset" the underlying buffer.
-        // TODO: When we upgrade to Arrow 0.10.0, we can simply remove this and call
-        // `listVector.reset()`.
-        val buffers = listVector.getBuffers(false)
-        buffers.foreach(buf => buf.setZero(0, buf.capacity()))
-        listVector.setValueCount(0)
-        listVector.setLastSet(0)
-      case _ =>
-    }
+    valueVector.reset()
     count = 0
   }
 }
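[Reviewer note, not part of the patch] Patch 8 works because Arrow 0.10.0 puts
reset() on the common ValueVector interface; previously it existed only on
BaseFixedWidthVector and BaseVariableWidthVector, hence the deleted match and
the manual ListVector buffer clearing. Below is a minimal sketch of the
batch-reuse lifecycle that reset() serves. ArrowWriter.create and the
InternalRow values are assumptions drawn from the surrounding Spark code, not
from this diff:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("i", IntegerType)
val writer = ArrowWriter.create(schema, timeZoneId = null)

// First batch: write rows, then finish() to set the final value counts.
writer.write(InternalRow(1))
writer.write(InternalRow(2))
writer.finish()

// reset() now delegates straight to ValueVector.reset(), clearing the
// buffers so the same vectors (and allocation) back the next batch.
writer.reset()

// Second batch reuses the same vectors.
writer.write(InternalRow(3))
writer.finish()

writer.root.close()
```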