-
Notifications
You must be signed in to change notification settings - Fork 0
rtpsw-x1 #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rtpsw-x1 #18
Conversation
|
Thanks for opening a pull request! If this is not a minor PR. Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename pull request title in the following format? or See also: |
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not quite convinced this works (no updated start_size_). Do you have some manual tests you've been running for this?
| // 2) pop/try_pop cannot be called concurrently with this | ||
| const T& UnsyncFront() const { return queue_.front(); } | ||
|
|
||
| const size_t UnsyncSize() const { return queue_.size(); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| const size_t UnsyncSize() const { return queue_.size(); } | |
| size_t UnsyncSize() const { return queue_.size(); } |
| } | ||
|
|
||
| BackpressureConcurrentQueue& queue_; | ||
| size_t start_size_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does anything ever update start_size_?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start_size_ only gets initialized. DoHandle is designed to report the start size and end size of a queue, where some operation happened in between. When DoHandle is added at the start of a block of code, its constructor will save the start size, then the block will run, and finally the DoHandle destructor will report the start-and-end sizes to the handler. This is an RAII idiom similar to a try-finally.
| if (process_thread_.get_id() != std::this_thread::get_id()) { // avoid deadlock | ||
| process_thread_.join(); | ||
| } else { | ||
| process_thread_.detach(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it should be impossible for process_thread_ to trigger deletion of the AsosJoinNode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In some runs with a slow source, I observed the AsOfJoinNode destructor getting called from this process thread. The original code would then get a resource-deadlock exception when joining the process thread, because its the same thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the record, I got this deadlock
terminate called after throwing an instance of 'std::system_error'
what(): Resource deadlock avoided
in the following gdb session:
$ gdb --args ./debug/arrow-compute-asof-join-node-test --gtest_filter=AsofJoinTest.BackpressureDemo
GNU gdb (Ubuntu 9.2-0ubuntu1~20.04) 9.2
Copyright (C) 2020 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.
Type "show copying" and "show warranty" for details.
This GDB was configured as "x86_64-linux-gnu".
Type "show configuration" for configuration details.
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>.
Find the GDB manual and other documentation resources online at:
<http://www.gnu.org/software/gdb/documentation/>.
For help, type "help".
Type "apropos word" to search for commands related to "word"...
Reading symbols from ./debug/arrow-compute-asof-join-node-test...
(gdb) catch throw
Catchpoint 1 (throw)
(gdb) run
Starting program: /github/westonpace/arrow/cpp/build/debug/debug/arrow-compute-asof-join-node-test --gtest_filter=AsofJoinTest.BackpressureDemo
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff2fff700 (LWP 76047)]
Running main() from /build/googletest-j5yxiC/googletest-1.10.0/googletest/src/gtest_main.cc
Note: Google Test filter = AsofJoinTest.BackpressureDemo
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from AsofJoinTest
[ RUN ] AsofJoinTest.BackpressureDemo
[New Thread 0x7ffff27fe700 (LWP 76048)]
[New Thread 0x7ffff1ffd700 (LWP 76049)]
2:fast: asking for batch(0)
[New Thread 0x7ffff17fc700 (LWP 76050)]
1:slow: asking for batch(0)
[New Thread 0x7ffff0ffb700 (LWP 76051)]
0:fast: asking for batch(0)
[New Thread 0x7fffe3fff700 (LWP 76052)]
[New Thread 0x7fffe37fe700 (LWP 76053)]
0:fast: asking for batch(1)
0:fast: asking for batch(2)
0:fast: asking for batch(3)
0:fast: asking for batch(4)
0:fast: asking for batch(5)
0:fast: asking for batch(6)
0:fast: asking for batch(7)
0:fast: asking for batch(8)
1:slow: asking for batch(1)
0:fast: asking for batch(9)
2:fast: asking for batch(1)
1:slow: asking for batch(2)
2:fast: asking for batch(2)
[New Thread 0x7fffe2ffd700 (LWP 76054)]
1:slow: asking for batch(3)
2:fast: asking for batch(3)
1:slow: asking for batch(4)
2:fast: asking for batch(4)
[New Thread 0x7fffe27fc700 (LWP 76055)]
1:slow: asking for batch(5)
[New Thread 0x7fffe1ffb700 (LWP 76056)]
2:fast: asking for batch(5)
1:slow: asking for batch(6)
2:fast: asking for batch(6)
2:fast: asking for batch(7)
1:slow: asking for batch(7)
2:fast: asking for batch(8)
1:slow: asking for batch(8)
2:fast: asking for batch(9)
1:slow: asking for batch(9)
[Switching to Thread 0x7ffff27fe700 (LWP 76048)]
Thread 3 "arrow-compute-a" hit Catchpoint 1 (exception thrown), 0x00007ffff3ded672 in __cxa_throw () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
(gdb) bt
#0 0x00007ffff3ded672 in __cxa_throw () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#1 0x00007ffff3de473f in std::__throw_system_error(int) () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#2 0x00007ffff3e1a060 in std::thread::join() () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3 0x00007ffff5adabd9 in arrow::compute::AsofJoinNode::~AsofJoinNode (this=0x5555558b2c90, __in_chrg=<optimized out>)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:987
#4 0x00007ffff5adac96 in arrow::compute::AsofJoinNode::~AsofJoinNode (this=0x5555558b2c90, __in_chrg=<optimized out>)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:988
#5 0x00005555557138f4 in std::default_delete<arrow::compute::ExecNode>::operator() (this=0x5555558b2578, __ptr=0x5555558b2c90) at /usr/include/c++/9/bits/unique_ptr.h:81
#6 0x000055555571193e in std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> >::~unique_ptr (this=0x5555558b2578, __in_chrg=<optimized out>)
at /usr/include/c++/9/bits/unique_ptr.h:292
#7 0x00007ffff5b1fcf0 in std::_Destroy<std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> > > (__pointer=0x5555558b2578)
at /usr/include/c++/9/bits/stl_construct.h:98
#8 0x00007ffff5b1c676 in std::_Destroy_aux<false>::__destroy<std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> >*> (__first=0x5555558b2578, __last=0x5555558b2588)
at /usr/include/c++/9/bits/stl_construct.h:108
#9 0x00007ffff5b17efa in std::_Destroy<std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> >*> (__first=0x5555558b2560, __last=0x5555558b2588)
at /usr/include/c++/9/bits/stl_construct.h:137
#10 0x00007ffff5b1309f in std::_Destroy<std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> >*, std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> > > (__first=0x5555558b2560, __last=0x5555558b2588) at /usr/include/c++/9/bits/stl_construct.h:206
#11 0x00007ffff5b0f259 in std::vector<std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> >, std::allocator<std::unique_ptr<arrow::compute::ExecNode, std::default_delete<arrow::compute::ExecNode> > > >::~vector (this=0x5555558b10d8, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/stl_vector.h:677
#12 0x00007ffff5b0277b in arrow::compute::(anonymous namespace)::ExecPlanImpl::~ExecPlanImpl (this=0x5555558b1070, __in_chrg=<optimized out>)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:70
#13 0x00007ffff5b027da in arrow::compute::(anonymous namespace)::ExecPlanImpl::~ExecPlanImpl (this=0x5555558b1070, __in_chrg=<optimized out>)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:76
#14 0x00007ffff5b0c7da in std::_Sp_counted_ptr<arrow::compute::(anonymous namespace)::ExecPlanImpl*, (__gnu_cxx::_Lock_policy)2>::_M_dispose (this=0x5555558b1430)
at /usr/include/c++/9/bits/shared_ptr_base.h:377
#15 0x000055555569f51a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release (this=0x5555558b1430) at /usr/include/c++/9/bits/shared_ptr_base.h:155
#16 0x000055555568e407 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count (this=0x5555558b3600, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr_base.h:730
#17 0x0000555555688184 in std::__shared_ptr<arrow::compute::ExecPlan, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr (this=0x5555558b35f8, __in_chrg=<optimized out>)
at /usr/include/c++/9/bits/shared_ptr_base.h:1169
#18 0x00005555556881a4 in std::shared_ptr<arrow::compute::ExecPlan>::~shared_ptr (this=0x5555558b35f8, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/shared_ptr.h:103
#19 0x00007ffff5b06904 in arrow::compute::<lambda()>::~<lambda>(void) (this=0x5555558b35f8, __in_chrg=<optimized out>)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:586
#20 0x00007ffff5b08c70 in arrow::Future<arrow::internal::Empty>::ThenOnComplete<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()>, arrow::Future<>::PassthruOnFailure<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()> > >::~ThenOnComplete(void) (this=0x5555558b35f8,
__in_chrg=<optimized out>) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:518
#21 0x00007ffff5b09d8a in arrow::Future<arrow::internal::Empty>::WrapResultyOnComplete::Callback<arrow::Future<>::ThenOnComplete<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()>, arrow::Future<>::PassthruOnFailure<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()> > > >::~Callback(void) (this=0x5555558b35f8, __in_chrg=<optimized out>) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:440
#22 0x00007ffff5b0c2fc in arrow::internal::FnOnce<void(const arrow::FutureImpl&)>::FnImpl<arrow::Future<>::WrapResultyOnComplete::Callback<arrow::Future<>::ThenOnComplete<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()>, arrow::Future<>::PassthruOnFailure<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()> > > > >::~FnImpl(void) (this=0x5555558b35f0, __in_chrg=<optimized out>) at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:150
#23 0x00007ffff5b0c328 in arrow::internal::FnOnce<void(const arrow::FutureImpl&)>::FnImpl<arrow::Future<>::WrapResultyOnComplete::Callback<arrow::Future<>::ThenOnComplete<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()>, arrow::Future<>::PassthruOnFailure<arrow::compute::DeclarationToTableAsync(arrow::compute::Declaration, arrow::compute::ExecContext*)::<lambda()> > > > >::~FnImpl(void) (this=0x5555558b35f0, __in_chrg=<optimized out>) at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:150
#24 0x00005555556dc7d0 in std::default_delete<arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::Impl>::operator()(arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::Impl*) const (
this=0x7ffff27fbe00, __ptr=0x5555558b35f0) at /usr/include/c++/9/bits/unique_ptr.h:81
#25 0x00005555556d6b34 in std::unique_ptr<arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::Impl, std::default_delete<arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::Impl> >::~unique_ptr() (this=0x7ffff27fbe00, __in_chrg=<optimized out>) at /usr/include/c++/9/bits/unique_ptr.h:292
#26 0x00007ffff5926ca9 in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x5555558b6070, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:139
#27 0x00007ffff5926378 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=std::shared_ptr<class arrow::FutureImpl> (use count 2, weak count 1) = {...}, callback_record=..., in_add_callback=false)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:109
#28 0x00007ffff59266ae in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=0x5555558b11c0, state=arrow::FutureState::SUCCESS)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:147
#29 0x00007ffff5925b53 in arrow::ConcreteFutureImpl::DoMarkFinished (this=0x5555558b11c0) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:38
--Type <RET> for more, q to quit, c to continue without paging--
#30 0x00007ffff5923c00 in arrow::FutureImpl::MarkFinished (this=0x5555558b11c0) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:193
#31 0x00005555556f143e in arrow::Future<arrow::internal::Empty>::DoMarkFinished (this=0x5555558b10c0, res=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:658
#32 0x00005555556ee304 in arrow::Future<arrow::internal::Empty>::MarkFinished<arrow::internal::Empty, void> (this=0x5555558b10c0, s=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:409
#33 0x00007ffff5b03b4c in arrow::compute::(anonymous namespace)::ExecPlanImpl::<lambda(const arrow::Status&)>::operator()(const arrow::Status &) const (__closure=0x7fffd8005588, st=...)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:220
#34 0x00007ffff5b0cb93 in arrow::Future<arrow::internal::Empty>::WrapStatusyOnComplete::Callback<arrow::compute::(anonymous namespace)::ExecPlanImpl::EndTaskGroup()::<lambda(const arrow::Status&)> >::operator()(const arrow::FutureImpl &) (this=0x7fffd8005588, impl=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:455
#35 0x00007ffff5b0c933 in arrow::internal::FnOnce<void(const arrow::FutureImpl&)>::FnImpl<arrow::Future<>::WrapStatusyOnComplete::Callback<arrow::compute::(anonymous namespace)::ExecPlanImpl::EndTaskGroup()::<lambda(const arrow::Status&)> > >::invoke(const arrow::FutureImpl &) (this=0x7fffd8005580, a#0=...) at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:152
#36 0x00007ffff5926c9c in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x7ffff27fc290, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:140
#37 0x00007ffff5926378 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=std::shared_ptr<class arrow::FutureImpl> (use count 2, weak count 1) = {...}, callback_record=..., in_add_callback=true)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:109
#38 0x00007ffff5925db2 in arrow::ConcreteFutureImpl::AddCallback(arrow::internal::FnOnce<void (arrow::FutureImpl const&)>, arrow::CallbackOptions) (this=0x5555558b1340, callback=..., opts=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:63
#39 0x00007ffff5923ca0 in arrow::FutureImpl::AddCallback(arrow::internal::FnOnce<void (arrow::FutureImpl const&)>, arrow::CallbackOptions) (this=0x5555558b1340, callback=..., opts=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:198
#40 0x00007ffff5b0899a in arrow::Future<arrow::internal::Empty>::AddCallback<arrow::compute::(anonymous namespace)::ExecPlanImpl::EndTaskGroup()::<lambda(const arrow::Status&)> >(arrow::compute::(anonymous namespace)::ExecPlanImpl::<lambda(const arrow::Status&)>, arrow::CallbackOptions) const (this=0x7ffff27fc390, on_complete=..., opts=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:493
#41 0x00007ffff5b03c4e in arrow::compute::(anonymous namespace)::ExecPlanImpl::EndTaskGroup (this=0x5555558b1070)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:217
#42 0x00007ffff5b03203 in arrow::compute::(anonymous namespace)::ExecPlanImpl::<lambda(const arrow::Status&)>::operator()(const arrow::Status &) const (__closure=0x5555558b47e8, st=...)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/exec_plan.cc:171
#43 0x00007ffff5b0cbdb in arrow::Future<arrow::internal::Empty>::WrapStatusyOnComplete::Callback<arrow::compute::(anonymous namespace)::ExecPlanImpl::StartProducing(arrow::internal::Executor*)::<lambda(const arrow::Status&)> >::operator()(const arrow::FutureImpl &) (this=0x5555558b47e8, impl=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:455
#44 0x00007ffff5b0c97b in arrow::internal::FnOnce<void(const arrow::FutureImpl&)>::FnImpl<arrow::Future<>::WrapStatusyOnComplete::Callback<arrow::compute::(anonymous namespace)::ExecPlanImpl::StartProducing(arrow::internal::Executor*)::<lambda(const arrow::Status&)> > >::invoke(const arrow::FutureImpl &) (this=0x5555558b47e0, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:152
#45 0x00007ffff5926c9c in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x5555558b4800, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:140
#46 0x00007ffff5926378 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=std::shared_ptr<class arrow::FutureImpl> (use count 2, weak count 1) = {...}, callback_record=..., in_add_callback=false)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:109
#47 0x00007ffff59266ae in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=0x5555558b46c0, state=arrow::FutureState::SUCCESS)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:147
#48 0x00007ffff5925b53 in arrow::ConcreteFutureImpl::DoMarkFinished (this=0x5555558b46c0) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:38
#49 0x00007ffff5923c00 in arrow::FutureImpl::MarkFinished (this=0x5555558b46c0) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:193
#50 0x00005555556f143e in arrow::Future<arrow::internal::Empty>::DoMarkFinished (this=0x7ffff27fc770, res=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:658
#51 0x00005555556ee304 in arrow::Future<arrow::internal::Empty>::MarkFinished<arrow::internal::Empty, void> (this=0x7ffff27fc770, s=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:409
#52 0x00007ffff5925a54 in arrow::detail::ContinueFuture::operator()<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)>, const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&>(arrow::Future<arrow::internal::Empty>, arrow::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)> &&) const (this=0x7ffff27fc7cf, next=..., f=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:150
#53 0x00007ffff59258b2 in arrow::detail::ContinueFuture::IgnoringArgsIf<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)>, arrow::Future<>, const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&>(std::false_type, arrow::Future<arrow::internal::Empty> &&, arrow::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)> &&) const (
this=0x7ffff27fc7cf, next=..., f=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:193
#54 0x00007ffff5925766 in arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::ThenOnComplete<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)>, arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::PassthruOnFailure<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)> > >::operator()(const arrow::Result<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > > &) (this=0x5555558b4798, result=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:545
#55 0x00007ffff5925669 in arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::WrapResultyOnComplete::Callback<arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::ThenOnComplete<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<a--Type <RET> for more, q to quit, c to continue without paging--
rrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)>, arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::PassthruOnFailure<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)> > > >::operator()(const arrow::FutureImpl &) (this=0x5555558b4798, impl=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:442
#56 0x00007ffff59255a1 in arrow::internal::FnOnce<void(const arrow::FutureImpl&)>::FnImpl<arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::WrapResultyOnComplete::Callback<arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::ThenOnComplete<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)>, arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::PassthruOnFailure<arrow::AllFinished(const std::vector<arrow::Future<> >&)::<lambda(const std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > >&)> > > > >::invoke(const arrow::FutureImpl &) (this=0x5555558b4790, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:152
#57 0x00007ffff5926c9c in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x5555558b47c0, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:140
#58 0x00007ffff5926378 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=std::shared_ptr<class arrow::FutureImpl> (use count 2, weak count 1) = {...}, callback_record=..., in_add_callback=false)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:109
#59 0x00007ffff59266ae in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=0x5555558b44d0, state=arrow::FutureState::SUCCESS)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:147
#60 0x00007ffff5925b53 in arrow::ConcreteFutureImpl::DoMarkFinished (this=0x5555558b44d0) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:38
#61 0x00007ffff5923c00 in arrow::FutureImpl::MarkFinished (this=0x5555558b44d0) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:193
#62 0x00007ffff592a16c in arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::DoMarkFinished (this=0x5555558b4688, res=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:658
#63 0x00007ffff592923d in arrow::Future<std::vector<arrow::Result<arrow::internal::Empty>, std::allocator<arrow::Result<arrow::internal::Empty> > > >::MarkFinished (this=0x5555558b4688, res=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:403
#64 0x00007ffff5927c9c in arrow::All<arrow::internal::Empty>(std::vector<arrow::Future<arrow::internal::Empty>, std::allocator<arrow::Future<arrow::internal::Empty> > >)::{lambda(arrow::Result<arrow::internal::Empty> const&)#1}::operator()(arrow::Result<arrow::internal::Empty> const&) (this=0x5555558b4678) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:755
#65 0x00007ffff592c4c3 in arrow::Future<arrow::internal::Empty>::WrapResultyOnComplete::Callback<arrow::All<arrow::internal::Empty>(std::vector<arrow::Future<arrow::internal::Empty>, std::allocator<arrow::Future<arrow::internal::Empty> > >)::{lambda(arrow::Result<arrow::internal::Empty> const&)#1}>::operator()(arrow::FutureImpl const&) && (this=0x5555558b4678, impl=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:442
#66 0x00007ffff592c40b in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::FnImpl<arrow::Future<arrow::internal::Empty>::WrapResultyOnComplete::Callback<arrow::All<arrow::internal::Empty>(std::vector<arrow::Future<arrow::internal::Empty>, std::allocator<arrow::Future<arrow::internal::Empty> > >)::{lambda(arrow::Result<arrow::internal::Empty> const&)#1}> >::invoke(arrow::FutureImpl const&) (
this=0x5555558b4670, a#0=...) at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:152
#67 0x00007ffff5926c9c in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x5555558b46a0, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:140
#68 0x00007ffff5926378 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=std::shared_ptr<class arrow::FutureImpl> (use count 3, weak count 1) = {...}, callback_record=..., in_add_callback=false)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:109
#69 0x00007ffff59266ae in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=0x5555558b3540, state=arrow::FutureState::SUCCESS)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:147
#70 0x00007ffff5925b53 in arrow::ConcreteFutureImpl::DoMarkFinished (this=0x5555558b3540) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:38
#71 0x00007ffff5923c00 in arrow::FutureImpl::MarkFinished (this=0x5555558b3540) at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:193
#72 0x00005555556f143e in arrow::Future<arrow::internal::Empty>::DoMarkFinished (this=0x5555558b34a0, res=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:658
#73 0x00005555556ee304 in arrow::Future<arrow::internal::Empty>::MarkFinished<arrow::internal::Empty, void> (this=0x5555558b34a0, s=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:409
#74 0x00007ffff5bbdd4a in arrow::compute::(anonymous namespace)::ConsumingSinkNode::<lambda(const arrow::Status&)>::operator()(const arrow::Status &) const (__closure=0x7fffd8005118, st=...)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/sink_node.cc:382
#75 0x00007ffff5bc25bd in arrow::Future<arrow::internal::Empty>::WrapStatusyOnComplete::Callback<arrow::compute::(anonymous namespace)::ConsumingSinkNode::Finish(const arrow::Status&)::<lambda(const arrow::Status&)> >::operator()(const arrow::FutureImpl &) (this=0x7fffd8005118, impl=...) at /github/westonpace/arrow/cpp/src/arrow/util/future.h:455
#76 0x00007ffff5bc2535 in arrow::internal::FnOnce<void(const arrow::FutureImpl&)>::FnImpl<arrow::Future<>::WrapStatusyOnComplete::Callback<arrow::compute::(anonymous namespace)::ConsumingSinkNode::Finish(const arrow::Status&)::<lambda(const arrow::Status&)> > >::invoke(const arrow::FutureImpl &) (this=0x7fffd8005110, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:152
#77 0x00007ffff5926c9c in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x7ffff27fd0e0, a#0=...)
at /github/westonpace/arrow/cpp/src/arrow/util/functional.h:140
#78 0x00007ffff5926378 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=std::shared_ptr<class arrow::FutureImpl> (use count 2, weak count 1) = {...}, callback_record=..., in_add_callback=true)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:109
#79 0x00007ffff5925db2 in arrow::ConcreteFutureImpl::AddCallback(arrow::internal::FnOnce<void (arrow::FutureImpl const&)>, arrow::CallbackOptions) (this=0x7fffd8001e30, callback=..., opts=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:63
#80 0x00007ffff5923ca0 in arrow::FutureImpl::AddCallback(arrow::internal::FnOnce<void (arrow::FutureImpl const&)>, arrow::CallbackOptions) (this=0x7fffd8001e30, callback=..., opts=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.cc:198
#81 0x00007ffff5bc0739 in arrow::Future<arrow::internal::Empty>::AddCallback<arrow::compute::(anonymous namespace)::ConsumingSinkNode::Finish(const arrow::Status&)::<lambda(const arrow::Status&)> >(arrow:--Type <RET> for more, q to quit, c to continue without paging--
:compute::(anonymous namespace)::ConsumingSinkNode::<lambda(const arrow::Status&)>, arrow::CallbackOptions) const (this=0x7ffff27fd1f0, on_complete=..., opts=...)
at /github/westonpace/arrow/cpp/src/arrow/util/future.h:493
#82 0x00007ffff5bbde6b in arrow::compute::(anonymous namespace)::ConsumingSinkNode::Finish (this=0x5555558b3410, finish_st=...)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/sink_node.cc:379
#83 0x00007ffff5bbdc8c in arrow::compute::(anonymous namespace)::ConsumingSinkNode::InputFinished (this=0x5555558b3410, input=0x5555558b2c90, total_batches=10)
at /github/westonpace/arrow/cpp/src/arrow/compute/exec/sink_node.cc:373
#84 0x00007ffff5ada509 in arrow::compute::AsofJoinNode::Process (this=0x5555558b2c90) at /github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:944
#85 0x00007ffff5ada64f in arrow::compute::AsofJoinNode::ProcessThread (this=0x5555558b2c90) at /github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:954
#86 0x00007ffff5ada670 in arrow::compute::AsofJoinNode::ProcessThreadWrapper (node=0x5555558b2c90) at /github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:958
#87 0x00007ffff5afc376 in std::__invoke_impl<void, void (*)(arrow::compute::AsofJoinNode*), arrow::compute::AsofJoinNode*> (
__f=@0x5555558b31a0: 0x7ffff5ada654 <arrow::compute::AsofJoinNode::ProcessThreadWrapper(arrow::compute::AsofJoinNode*)>) at /usr/include/c++/9/bits/invoke.h:60
#88 0x00007ffff5afc2d6 in std::__invoke<void (*)(arrow::compute::AsofJoinNode*), arrow::compute::AsofJoinNode*> (
__fn=@0x5555558b31a0: 0x7ffff5ada654 <arrow::compute::AsofJoinNode::ProcessThreadWrapper(arrow::compute::AsofJoinNode*)>) at /usr/include/c++/9/bits/invoke.h:95
#89 0x00007ffff5afc235 in std::thread::_Invoker<std::tuple<void (*)(arrow::compute::AsofJoinNode*), arrow::compute::AsofJoinNode*> >::_M_invoke<0ul, 1ul> (this=0x5555558b3198)
at /usr/include/c++/9/thread:244
#90 0x00007ffff5afc1ec in std::thread::_Invoker<std::tuple<void (*)(arrow::compute::AsofJoinNode*), arrow::compute::AsofJoinNode*> >::operator() (this=0x5555558b3198) at /usr/include/c++/9/thread:251
#91 0x00007ffff5afc1cc in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(arrow::compute::AsofJoinNode*), arrow::compute::AsofJoinNode*> > >::_M_run (this=0x5555558b3190)
at /usr/include/c++/9/thread:195
#92 0x00007ffff3e19de4 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#93 0x00007ffff3b14609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#94 0x00007ffff3c58293 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
(gdb)
|
I've seen this CI job's test failure before: so it's unlikely to be related to the changes in this PR. When I looked into this test failure, I observed that it is due to round-trip batches coming out in a different order. @westonpace, you fixed a couple of cases like this recently, so you're probably the right person to look into this one. |
25d83a3 to
1ee83f9
Compare
242abb8 to
bb17b9d
Compare
e3f5147 to
04d0320
Compare
…n timezone (apache#45051) ### Rationale for this change If the timezone database is present on the system, but does not contain a timezone referenced in a ORC file, the ORC reader will crash with an uncaught C++ exception. This can happen for example on Ubuntu 24.04 where some timezone aliases have been removed from the main `tzdata` package to a `tzdata-legacy` package. If `tzdata-legacy` is not installed, trying to read a ORC file that references e.g. the "US/Pacific" timezone would crash. Here is a backtrace excerpt: ``` #12 0x00007f1a3ce23a55 in std::terminate() () from /lib/x86_64-linux-gnu/libstdc++.so.6 #13 0x00007f1a3ce39391 in __cxa_throw () from /lib/x86_64-linux-gnu/libstdc++.so.6 #14 0x00007f1a3f4accc4 in orc::loadTZDB(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #15 0x00007f1a3f4ad392 in std::call_once<orc::LazyTimezone::getImpl() const::{lambda()#1}>(std::once_flag&, orc::LazyTimezone::getImpl() const::{lambda()#1}&&)::{lambda()#2}::_FUN() () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #16 0x00007f1a4298bec3 in __pthread_once_slow (once_control=0xa5ca7c8, init_routine=0x7f1a3ce69420 <__once_proxy>) at ./nptl/pthread_once.c:116 #17 0x00007f1a3f4a9ad0 in orc::LazyTimezone::getEpoch() const () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #18 0x00007f1a3f4e76b1 in orc::TimestampColumnReader::TimestampColumnReader(orc::Type const&, orc::StripeStreams&, bool) () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #19 0x00007f1a3f4e84ad in orc::buildReader(orc::Type const&, orc::StripeStreams&, bool, bool, bool) () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #20 0x00007f1a3f4e8dd7 in orc::StructColumnReader::StructColumnReader(orc::Type const&, orc::StripeStreams&, bool, bool) () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #21 0x00007f1a3f4e8532 in orc::buildReader(orc::Type const&, orc::StripeStreams&, bool, bool, bool) () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #22 0x00007f1a3f4925e9 in orc::RowReaderImpl::startNextStripe() () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #23 0x00007f1a3f492c9d in orc::RowReaderImpl::next(orc::ColumnVectorBatch&) () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 #24 0x00007f1a3e6b251f in arrow::adapters::orc::ORCFileReader::Impl::ReadBatch(orc::RowReaderOptions const&, std::shared_ptr<arrow::Schema> const&, long) () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900 ``` ### What changes are included in this PR? Catch C++ exceptions when iterating ORC batches instead of letting them slip through. ### Are these changes tested? Yes. ### Are there any user-facing changes? No. * GitHub Issue: apache#40633 Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Sutou Kouhei <[email protected]>
No description provided.