
Conversation

@BryanCutler

Using Arrow 0.1.1-SNAPSHOT artifact, added more types for testing

@BryanCutler

@icexelloss, @wesm we have added more types to the conversion. If you could take a look and make sure it looks ok from an Arrow perspective, that would be great, the StringType handling in particular.

Good news is with the latest code from Arrow, the conversion of a small DataFrame with longs and doubles works! I'll keep working with more types/data to test.

btw, we are planning on moving the conversion code out of Dataset.scala and into a more suitable location, hopefully that won't affect your code. I'll leave this open for a day or so for discussion.
cc @yinxusen

@wesm

wesm commented Dec 12, 2016

cool! see #10 -- we should do plenty of unit testing in Scala as well, let me know how I can assist over the next couple weeks.

@BryanCutler

Thanks @wesm!

I'm having an issue now with testing this out on larger DataFrames, like 500k rows and 10 columns of doubles. It works fine with smaller sizes, but at about this size I start getting a seg fault in pyarrow.table.RecordBatch.to_pandas. It creates the Arrow Arrays fine, but seems to fail while trying to copy something in pd.DataFrame(dict(zip(names, data)), columns=names), which I thought was a zero-copy op.

Any ideas what could be going on or where to look? I tracked it down to this line in numpy, https://github.com/numpy/numpy/blob/v1.11.1/numpy/core/src/multiarray/array_assign_array.c#L96, but I can't figure out why it's going there. Here is the stack trace:

Thread 1 "python" received signal SIGSEGV, Segmentation fault.
__memmove_avx_unaligned () at ../sysdeps/x86_64/multiarch/memcpy-avx-unaligned.S:244
244	../sysdeps/x86_64/multiarch/memcpy-avx-unaligned.S: No such file or directory.

(gdb) bt
#0  __memmove_avx_unaligned () at ../sysdeps/x86_64/multiarch/memcpy-avx-unaligned.S:244
#1  0x00007ffff35ede48 in raw_array_assign_array (ndim=1, shape=0x198b830, dst_dtype=<optimized out>, 
    dst_data=0x7fffc5ac2010 "", dst_strides=<optimized out>, src_dtype=<optimized out>, 
    src_data=0x7fffb73b42a4 <error: Cannot access memory at address 0x7fffb73b42a4>, 
    src_strides=src_strides@entry=0x7fffffff97d0) at numpy/core/src/multiarray/array_assign_array.c:96
#2  0x00007ffff35ee75a in PyArray_AssignArray (dst=dst@entry=0x7ffff0656940, src=0x7ffff06a0c60, 
    wheremask=wheremask@entry=0x0, casting=casting@entry=NPY_UNSAFE_CASTING)
    at numpy/core/src/multiarray/array_assign_array.c:351
#3  0x00007ffff360131c in PyArray_MoveInto (dst=dst@entry=0x7ffff0656940, src=<optimized out>)
    at numpy/core/src/multiarray/ctors.c:2777
#4  0x00007ffff35a3660 in PyArray_CopyObject (dest=0x7ffff0656940, src_object=src_object@entry=0x7ffff06a0c60)
    at numpy/core/src/multiarray/arrayobject.c:329
#5  0x00007ffff368b016 in array_assign_subscript (self=0x7ffff0656850, ind=<optimized out>, op=0x7ffff06a0c60)
    at numpy/core/src/multiarray/mapping.c:2084
#6  0x00000000004c5642 in PyEval_EvalFrameEx ()
#7  0x00000000004c9d8f in PyEval_EvalFrameEx ()
#8  0x00000000004c2765 in PyEval_EvalCodeEx ()
#9  0x00000000004ca099 in PyEval_EvalFrameEx ()
#10 0x00000000004c9d8f in PyEval_EvalFrameEx ()
#11 0x00000000004c9d8f in PyEval_EvalFrameEx ()
#12 0x00000000004c2765 in PyEval_EvalCodeEx ()
#13 0x00000000004ca099 in PyEval_EvalFrameEx ()
#14 0x00000000004c2765 in PyEval_EvalCodeEx ()
#15 0x00000000004ca099 in PyEval_EvalFrameEx ()
#16 0x00000000004c2765 in PyEval_EvalCodeEx ()
#17 0x00000000004de8b8 in ?? ()
#18 0x00000000004b0cb3 in PyObject_Call ()
#19 0x00000000004f492e in ?? ()
#20 0x00000000004b0cb3 in PyObject_Call ()
#21 0x00000000004f46a7 in ?? ()
#22 0x00000000004b670c in ?? ()
#23 0x00007fffcadab17f in __Pyx_PyObject_Call (func=0x16dc570, arg=<optimized out>, kw=<optimized out>)
    at /home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/table.cxx:9987
#24 0x00007fffcadb97ab in __pyx_pf_7pyarrow_5table_11RecordBatch_8to_pandas (__pyx_v_self=0x7ffff065b578)
    at /home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/table.cxx:4989
#25 __pyx_pw_7pyarrow_5table_11RecordBatch_9to_pandas (__pyx_v_self=0x7ffff065b578, unused=<optimized out>)
    at /home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/table.cxx:4787

@wesm

wesm commented Dec 14, 2016

It's a memory lifetime issue -- see

https://github.com/BryanCutler/spark/blob/arrow-integration/python/pyspark/serializers.py#L189

the obj is getting garbage collected / freed before the conversion to DataFrame takes place.

Ideally, the underlying PyBytesReader (https://github.com/apache/arrow/blob/master/python/src/pyarrow/io.h#L84) should retain a reference to the underlying PyBytes object. I'll open an Arrow JIRA.

In the meantime, I suggest you either convert the Arrow batch to pandas immediately (not letting the obj get garbage collected), or create a wrapper object that extends the lifetime of the bytes until you're done converting (e.g. if you have multiple record batches).
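
To make that workaround concrete, here is a minimal sketch of the "wrapper object" idea. It uses the present-day pyarrow stream-reader API (pa.BufferReader, pa.ipc.open_stream, Table.from_batches) rather than the 0.1-era API used in this branch, and the ArrowBatchHolder name is made up for illustration; the only point is the lifetime pattern: keep the serialized bytes referenced until the pandas conversion has finished.

```python
import pyarrow as pa


class ArrowBatchHolder(object):
    """Keeps the serialized bytes alive while the record batches read from
    them are in use, since the batches may reference that memory zero-copy."""

    def __init__(self, serialized_bytes):
        # Hold a reference to the raw bytes so they cannot be garbage
        # collected while the batches below still point into this buffer.
        self._bytes = serialized_bytes
        self._reader = pa.ipc.open_stream(pa.BufferReader(serialized_bytes))
        self.batches = list(self._reader)

    def to_pandas(self):
        # Convert while self._bytes is still referenced by this object.
        return pa.Table.from_batches(self.batches).to_pandas()


# Usage: either convert immediately, or keep the holder around until all
# batches have been converted.
# holder = ArrowBatchHolder(obj)
# pdf = holder.to_pandas()
```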

@wesm

wesm commented Dec 14, 2016

@BryanCutler

BryanCutler commented Dec 14, 2016

Ahh, I see. Works great after holding a ref to obj, thanks!

BryanCutler merged this pull request into arrow-integration Dec 14, 2016
@wesm

wesm commented Dec 14, 2016

If it were me, I would try to carefully rebase and do FF-merges when working on an integration branch like this. It might be worth doing a git merge --squash against upstream master and getting back to a clean history.

@BryanCutler

Good point, I'll fix it up

BryanCutler pushed a commit that referenced this pull request Mar 7, 2017
## What changes were proposed in this pull request?

This PR aims to optimize GroupExpressions by removing repeating expressions. `RemoveRepetitionFromGroupExpressions` is added.

**Before**
```scala
scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain()
== Physical Plan ==
WholeStageCodegen
:  +- TungstenAggregate(key=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9], functions=[], output=[(a + 1)#5])
:     +- INPUT
+- Exchange hashpartitioning((a#0 + 1)#6, (1 + a#0)#7, (A#0 + 1)#8, (1 + A#0)#9, 200), None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6,(1 + a#0) AS (1 + a#0)#7,(A#0 + 1) AS (A#0 + 1)#8,(1 + A#0) AS (1 + A#0)#9], functions=[], output=[(a#0 + 1)#6,(1 + a#0)#7,(A#0 + 1)#8,(1 + A#0)#9])
      :     +- INPUT
      +- LocalTableScan [a#0], [[1],[2]]
```

**After**
```scala
scala> sql("select a+1 from values 1,2 T(a) group by a+1, 1+a, A+1, 1+A").explain()
== Physical Plan ==
WholeStageCodegen
:  +- TungstenAggregate(key=[(a#0 + 1)#6], functions=[], output=[(a + 1)#5])
:     +- INPUT
+- Exchange hashpartitioning((a#0 + 1)#6, 200), None
   +- WholeStageCodegen
      :  +- TungstenAggregate(key=[(a#0 + 1) AS (a#0 + 1)#6], functions=[], output=[(a#0 + 1)#6])
      :     +- INPUT
      +- LocalTableScan [a#0], [[1],[2]]
```
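
To illustrate the idea behind `RemoveRepetitionFromGroupExpressions`, here is a rough sketch of how such a rule can be written with Catalyst's `ExpressionSet`, which compares expressions by canonicalized (semantic) equality so keys like `a + 1`, `1 + a`, and `A + 1` collapse to a single entry. This is an illustrative sketch, not necessarily the exact code added by this patch.

```scala
import org.apache.spark.sql.catalyst.expressions.ExpressionSet
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Drop grouping expressions that are semantically equal to one already kept,
// so the aggregate and the exchange only carry a single copy of each key.
object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case agg @ Aggregate(grouping, _, _) =>
      agg.copy(groupingExpressions = ExpressionSet(grouping).toSeq)
  }
}
```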

## How was this patch tested?

Pass the Jenkins tests (with a new testcase)

Author: Dongjoon Hyun <[email protected]>

Closes apache#12590 from dongjoon-hyun/SPARK-14830.

(cherry picked from commit 6e63201)
Signed-off-by: Michael Armbrust <[email protected]>
BryanCutler pushed a commit that referenced this pull request Jan 6, 2020
### Why are the changes needed?
`EnsureRequirements` adds a `ShuffleExchangeExec` (RangePartitioning) after the Sort if a `RoundRobinPartitioning` exchange sits below it. This causes two shuffles, and the number of partitions in the final stage is not the number specified by `RoundRobinPartitioning`.

**Example SQL**
```
SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a
```

**BEFORE**
```
== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 200), true, [id=#11]
   +- Exchange RoundRobinPartitioning(5), false, [id=#9]
      +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]
```

**AFTER**
```
== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 5), true, [id=#11]
   +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]
```

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Ran the existing test suites and added a new test for this change.

Closes apache#26946 from stczwd/RoundRobinPartitioning.

Lead-authored-by: lijunqing <[email protected]>
Co-authored-by: stczwd <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>