
Conversation

@rtpsw (Collaborator) commented Oct 26, 2022

Added support for making generator sources suitable for as-of-join. Also refactored a bit and added docstrings.

@github-actions

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW

Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

@rtpsw (Collaborator Author) commented Oct 27, 2022

@icexelloss, @westonpace: I've added an experimentation tool for backpressure with large sources. Here's an example invocation:

$ (export ARROW_BACKPRESSURE_DEMO_NUM_BATCHES=10000; export ARROW_BACKPRESSURE_DEMO_BATCH_SIZE=1000; ./release/arrow-compute-asof-join-node-test --gtest_filter=AsofJoinTest.BackpressureDemoWithBatchesGen)
Running main() from ./googletest/src/gtest_main.cc
Note: Google Test filter = AsofJoinTest.BackpressureDemoWithBatchesGen
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from AsofJoinTest
[ RUN      ] AsofJoinTest.BackpressureDemoWithBatchesGen
[       OK ] AsofJoinTest.BackpressureDemoWithBatchesGen (115623 ms)
[----------] 1 test from AsofJoinTest (115623 ms total)

[----------] Global test environment tear-down
[==========] 1 test from 1 test suite ran. (115623 ms total)
[  PASSED  ] 1 test.

Trying various parameterizations, I've observed runs with:

  • low batch size leading to reasonably capped memory (VIRT and RES) usage;
  • high batch size leading to exhausted memory, perhaps before reaching the maximum.

I intend to investigate further.

@rtpsw (Collaborator Author) commented Oct 27, 2022

This CI job failure suggests that the detach code path in ~AsofJoinNode, which occurs when the process thread invokes the destructor, leads to a race condition, i.e., the destructor completes too early. Given this, I don't yet have a solution for the case of the process thread invoking the destructor.

@icexelloss (Collaborator):

This CI job failure suggests that the detach code path in ~AsofJoinNode, which occurs when the process thread invokes the destructor, leads to a race condition, i.e., the destructor completes too early. Given this, I don't yet have a solution for the case of the process thread invoking the destructor

Is this an issue because of using the serial executor? I don't recall having this issue before.

@rtpsw (Collaborator Author) commented Oct 27, 2022

Is this an issue because of using the serial executor? I don't recall having this issue before.

No, the issue is due to a fix (the detach call) that I added in the destructor because of a deadlock problem (the process thread trying to join on itself) revealed by the slow-source experiment. So my fix is not good, and while the detach call can be reverted, I'm not sure how to properly fix the original issue.
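
For illustration, here is a minimal, self-contained sketch of that deadlock hazard (not Arrow code; all names are hypothetical). With std::thread, libstdc++ and libc++ detect a self-join and throw std::system_error with resource_deadlock_would_occur; a lower-level join primitive would simply hang:

#include <future>
#include <iostream>
#include <system_error>
#include <thread>

int main() {
  // Publish the worker's own handle to it via a future, so the worker
  // doesn't race with the main thread on the std::thread object.
  std::promise<std::thread*> handle_promise;
  std::future<std::thread*> handle = handle_promise.get_future();
  std::promise<void> attempt_done;
  std::future<void> attempt_done_future = attempt_done.get_future();
  std::thread worker([&] {
    std::thread* self = handle.get();  // wait for the handle
    try {
      self->join();  // a thread joining itself
    } catch (const std::system_error& e) {
      std::cerr << "self-join rejected: " << e.what() << std::endl;
    }
    attempt_done.set_value();
  });
  handle_promise.set_value(&worker);
  attempt_done_future.wait();  // don't join concurrently with the attempt
  worker.join();               // joining from another thread is fine
  return 0;
}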

@icexelloss (Collaborator):

Ok. As far as I recall, this is a new issue that got introduced on this branch. Perhaps it is because of the serial execution, but I am not sure. Do you know why the process thread would try to join itself? As far as I remember, the destructor should be called on the thread that owns the plan, which then joins the process thread.

@icexelloss (Collaborator):

Perhaps @westonpace has some insight into what might be going on.

@westonpace (Owner):

Ah, we probably need to move line 952 up to line 950 in asof_join_node.cc so that the node gets marked finished on a plan thread and not on the process thread. I'll reproduce / fix that this evening (I'm in meetings until about 2PM PST so I won't be able to look at it until then). I'll do a quick review and merge then too.

@icexelloss (Collaborator) commented Oct 27, 2022 via email

@rtpsw (Collaborator Author) commented Oct 27, 2022

This sounds like the right direction; however:

  • The code at line 950 should ensure that MarkFinished is called at the end of the scope no matter what; otherwise an exception could cause a hang. This can be done using an RAII idiom.
  • The destructor code should be reverted to just joining, without a detach code path.

@westonpace (Owner):

Just moving MarkFinished into the plan task caused a small problem: the process thread might run again before the plan task had a chance to launch, and it might then try to mark the node finished a second time. I changed the process thread's task to return a bool, returning false when it is finished (which exits the thread). I don't think RAII is needed anymore: as soon as the if (state_.at(0)->Finished()) check returns true, we know that Process() will return false and the thread will end, so there is no risk of deadlock if the MarkFinished call fails.
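
A minimal sketch of that shutdown pattern (hypothetical names, not the actual asof_join_node.cc code): the worker keeps calling Process(), and the loop, and with it the thread, ends as soon as Process() returns false:

#include <atomic>
#include <thread>

class ProcessLoop {
 public:
  void Start() {
    worker_ = std::thread([this] {
      // Returning false from Process() ends the loop and the thread.
      while (Process()) {
      }
    });
  }
  void SignalFinished() { finished_.store(true); }
  void Join() { worker_.join(); }

 private:
  // One unit of work; reports false once the finished state is observed,
  // so the thread winds down on its own instead of being torn down.
  bool Process() { return !finished_.load(); }

  std::atomic<bool> finished_{false};
  std::thread worker_;
};

int main() {
  ProcessLoop loop;
  loop.Start();
  loop.SignalFinished();  // e.g. from the plan thread
  loop.Join();
  return 0;
}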

@westonpace (Owner):

Also, I was able to reproduce the bug by repeatedly running the new backpressure test under stress, and confirmed that this change prevents it.

if (state_.at(0)->Finished()) {
ErrorIfNotOk(plan_->ScheduleTask([this] {
outputs_[0]->InputFinished(this, batches_produced_);
finished_.MarkFinished();
@rtpsw (Collaborator Author):

What if the previous line throws an exception? That line potentially calls into a node not provided by Arrow, so it could throw. The call to MarkFinished should happen in any case.

@icexelloss (Collaborator) commented Oct 28, 2022:

I could be mistaken, but I don't think InputFinished is allowed to throw an exception. I think there is a ticket/discussion about allowing InputReceived to return a Status rather than void, but I'm not sure about InputFinished.

@rtpsw (Collaborator Author):

I take a safety point of view here. Even if the method is not allowed to throw an exception, that is not something Arrow QA can validate, because the node being called into is potentially outside of Arrow code. I'm proposing a small fix that prevents a developer oversight (an exception raised where one shouldn't occur) from escalating into a deadlock. A deadlock is strictly worse, at least when a long-running execution is planned, because the user may notice it long after it occurred; an early failure is better.

@icexelloss (Collaborator):

I agree an early failure is better; ideally, an exception should not cause the system to deadlock. @rtpsw, I think I am missing something - do you have a proposed fix?

@rtpsw (Collaborator Author):

I'd propose fixing this by invoking finished_.MarkFinished() from a scope guard's destructor, something like:

ErrorIfNotOk(plan_->ScheduleTask([this] {
  // Ensure MarkFinished is called even if InputFinished throws.
  Defer cleanup([this]() { finished_.MarkFinished(); });
  outputs_[0]->InputFinished(this, batches_produced_);
  return Status::OK();
}));

where Defer is something like:

// A minimal scope guard: runs the callable when the enclosing scope exits,
// including when it exits via an exception.
template <typename Callable>
struct Defer {
  Callable callable;
  explicit Defer(Callable callable) : callable(std::move(callable)) {}
  ~Defer() noexcept { callable(); }
};

@rtpsw (Collaborator Author):

Sorry for posting code here; I'll only free up to prepare a commit a bit later.

@rtpsw (Collaborator Author) commented Oct 28, 2022

@westonpace, your recent commit appears to cause an as-of-join test to hang on my machine. The issue doesn't seem to be deterministic; I see a hang in a "random" AsofJoinBasicTest.TestBasic* test each time. Did you check these tests?

@westonpace (Owner):

@westonpace, your recent commit appears to cause an as-of-join test to hang on my machine. The issue doesn't seem to be deterministic; I see a hang in a "random" AsofJoinBasicTest.TestBasic* test each time. Did you check these tests?

I'm not sure if I did. I was mostly testing the backpressure test. I'll take a look.

@rtpsw (Collaborator Author) commented Oct 28, 2022

Below is a gdb session of a hanging test. I waited maybe a minute before manually interrupting.

$  gdb --args  ./debug/arrow-compute-asof-join-node-test --gtest_filter='AsofJoinNodeTest/AsofJoinBasicTest.TestBasic*'
GNU gdb (Ubuntu 12.0.90-0ubuntu1) 12.0.90
Copyright (C) 2022 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.
Type "show copying" and "show warranty" for details.
This GDB was configured as "x86_64-linux-gnu".
Type "show configuration" for configuration details.
For bug reporting instructions, please see:
<https://www.gnu.org/software/gdb/bugs/>.
Find the GDB manual and other documentation resources online at:
    <http://www.gnu.org/software/gdb/documentation/>.

For help, type "help".
Type "apropos word" to search for commands related to "word"...
Reading symbols from ./debug/arrow-compute-asof-join-node-test...
(gdb) run
Starting program: /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/arrow-compute-asof-join-node-test --gtest_filter=AsofJoinNodeTest/AsofJoinBasicTest.TestBasic\*
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff2dff640 (LWP 248925)]
Running main() from ./googletest/src/gtest_main.cc
Note: Google Test filter = AsofJoinNodeTest/AsofJoinBasicTest.TestBasic*
[==========] Running 36 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 36 tests from AsofJoinNodeTest/AsofJoinBasicTest
[ RUN      ] AsofJoinNodeTest/AsofJoinBasicTest.TestBasic1/0
[New Thread 0x7ffff25fe640 (LWP 248926)]
[New Thread 0x7ffff17ff640 (LWP 248927)]
[Thread 0x7ffff25fe640 (LWP 248926) exited]
[New Thread 0x7ffff25fe640 (LWP 248928)]
[Thread 0x7ffff25fe640 (LWP 248928) exited]
[New Thread 0x7ffff25fe640 (LWP 248929)]
[Thread 0x7ffff25fe640 (LWP 248929) exited]
[New Thread 0x7ffff25fe640 (LWP 248930)]
[Thread 0x7ffff25fe640 (LWP 248930) exited]
[New Thread 0x7ffff25fe640 (LWP 248931)]
[Thread 0x7ffff25fe640 (LWP 248931) exited]
[New Thread 0x7ffff25fe640 (LWP 248932)]
[Thread 0x7ffff25fe640 (LWP 248932) exited]
^C
Thread 1 "arrow-compute-a" received signal SIGINT, Interrupt.
__futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x5555557bd330) at ./nptl/futex-internal.c:57
57	./nptl/futex-internal.c: No such file or directory.
(gdb) info threads
  Id   Target Id                                            Frame 
* 1    Thread 0x7ffff7e281c0 (LWP 248922) "arrow-compute-a" __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x5555557bd330)
    at ./nptl/futex-internal.c:57
  2    Thread 0x7ffff2dff640 (LWP 248925) "jemalloc_bg_thd" __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x7ffff3416630)
    at ./nptl/futex-internal.c:57
  4    Thread 0x7ffff17ff640 (LWP 248927) "jemalloc_bg_thd" __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x7ffff3416704)
    at ./nptl/futex-internal.c:57
(gdb) thread apply all bt

Thread 4 (Thread 0x7ffff17ff640 (LWP 248927) "jemalloc_bg_thd"):
#0  __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x7ffff3416704) at ./nptl/futex-internal.c:57
#1  __futex_abstimed_wait_common (cancel=true, private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x7ffff3416704) at ./nptl/futex-internal.c:87
#2  __GI___futex_abstimed_wait_cancelable64 (futex_word=futex_word@entry=0x7ffff3416704, expected=expected@entry=0, clockid=clockid@entry=0, abstime=abstime@entry=0x0, private=private@entry=0) at ./nptl/futex-internal.c:139
#3  0x00007ffff3869ac1 in __pthread_cond_wait_common (abstime=0x0, clockid=0, mutex=0x7ffff3416748, cond=0x7ffff34166d8) at ./nptl/pthread_cond_wait.c:503
#4  ___pthread_cond_wait (cond=cond@entry=0x7ffff34166d8, mutex=mutex@entry=0x7ffff3416748) at ./nptl/pthread_cond_wait.c:627
#5  0x00007ffff69e50b3 in background_thread_sleep (tsdn=<optimized out>, interval=<optimized out>, info=<optimized out>) at src/background_thread.c:137
#6  background_work_sleep_once (ind=<optimized out>, info=<optimized out>, tsdn=<optimized out>) at src/background_thread.c:229
#7  background_work (ind=<optimized out>, tsd=<optimized out>) at src/background_thread.c:419
#8  background_thread_entry (ind_arg=<optimized out>) at src/background_thread.c:444
#9  0x00007ffff386ab43 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#10 0x00007ffff38fca00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

Thread 2 (Thread 0x7ffff2dff640 (LWP 248925) "jemalloc_bg_thd"):
#0  __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x7ffff3416630) at ./nptl/futex-internal.c:57
#1  __futex_abstimed_wait_common (cancel=true, private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x7ffff3416630) at ./nptl/futex-internal.c:87
#2  __GI___futex_abstimed_wait_cancelable64 (futex_word=futex_word@entry=0x7ffff3416630, expected=expected@entry=0, clockid=clockid@entry=0, abstime=abstime@entry=0x0, private=private@entry=0) at ./nptl/futex-internal.c:139
#3  0x00007ffff3869ac1 in __pthread_cond_wait_common (abstime=0x0, clockid=0, mutex=0x7ffff3416678, cond=0x7ffff3416608) at ./nptl/pthread_cond_wait.c:503
#4  ___pthread_cond_wait (cond=cond@entry=0x7ffff3416608, mutex=mutex@entry=0x7ffff3416678) at ./nptl/pthread_cond_wait.c:627
#5  0x00007ffff69e58b5 in background_thread_sleep (tsdn=<optimized out>, interval=<optimized out>, info=<optimized out>) at src/background_thread.c:137
#6  background_work_sleep_once (ind=0, info=<optimized out>, tsdn=<optimized out>) at src/background_thread.c:229
#7  background_thread0_work (tsd=<optimized out>) at src/background_thread.c:374
#8  background_work (ind=<optimized out>, tsd=<optimized out>) at src/background_thread.c:412
#9  background_thread_entry (ind_arg=<optimized out>) at src/background_thread.c:444
#10 0x00007ffff386ab43 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#11 0x00007ffff38fca00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

Thread 1 (Thread 0x7ffff7e281c0 (LWP 248922) "arrow-compute-a"):
#0  __futex_abstimed_wait_common64 (private=0, cancel=true, abstime=0x0, op=393, expected=0, futex_word=0x5555557bd330) at ./nptl/futex-internal.c:57
#1  __futex_abstimed_wait_common (cancel=true, private=0, abstime=0x0, clockid=0, expected=0, futex_word=0x5555557bd330) at ./nptl/futex-internal.c:87
#2  __GI___futex_abstimed_wait_cancelable64 (futex_word=futex_word@entry=0x5555557bd330, expected=expected@entry=0, clockid=clockid@entry=0, abstime=abstime@entry=0x0, private=private@entry=0) at ./nptl/futex-internal.c:139
#3  0x00007ffff3869ac1 in __pthread_cond_wait_common (abstime=0x0, clockid=0, mutex=0x5555557bd2e0, cond=0x5555557bd308) at ./nptl/pthread_cond_wait.c:503
#4  ___pthread_cond_wait (cond=0x5555557bd308, mutex=0x5555557bd2e0) at ./nptl/pthread_cond_wait.c:627
#5  0x00007ffff56b763d in std::condition_variable::wait<arrow::internal::SerialExecutor::RunLoop()::<lambda()> >(std::unique_lock<std::mutex> &, struct {...}) (this=0x5555557bd308, __lock=..., __p=...) at /usr/include/c++/11/condition_variable:103
#6  0x00007ffff56b56e0 in arrow::internal::SerialExecutor::RunLoop (this=0x7fffffffca40) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/util/thread_pool.cc:173
#7  0x00007ffff7a6ed38 in arrow::internal::SerialExecutor::Run<arrow::internal::Empty, arrow::Status>(arrow::internal::FnOnce<arrow::Future<arrow::internal::Empty> (arrow::internal::Executor*)>) (this=0x7fffffffca40, initial_task=...) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/util/thread_pool.h:400
#8  0x00007ffff7a68587 in arrow::internal::SerialExecutor::RunInSerialExecutor<arrow::internal::Empty, arrow::Future<arrow::internal::Empty>, arrow::Status>(arrow::internal::FnOnce<arrow::Future<arrow::internal::Empty> (arrow::internal::Executor*)>) (initial_task=...) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/util/thread_pool.h:297
#9  0x00007ffff7a610e2 in arrow::internal::RunSynchronously<arrow::Future<arrow::internal::Empty>, arrow::internal::Empty>(arrow::internal::FnOnce<arrow::Future<arrow::internal::Empty> (arrow::internal::Executor*)>, bool) (get_future=..., use_threads=false) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/util/thread_pool.h:510
#10 0x00007ffff7a5373c in arrow::compute::StartAndFinish (plan=0x5555558ae070, use_threads=false) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/test_util.cc:178
#11 0x00007ffff7a539f6 in arrow::compute::StartAndCollect(arrow::compute::ExecPlan*, std::function<arrow::Future<std::optional<arrow::compute::ExecBatch> > ()>, bool) (plan=0x5555558ae070, gen=..., use_threads=false) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/test_util.cc:189
#12 0x0000555555662dee in arrow::compute::CheckRunOutput (l_batches=..., r0_batches=..., r1_batches=..., exp_batches=..., join_options=...) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:238
#13 0x0000555555663f86 in arrow::compute::CheckRunOutput (l_batches=..., r0_batches=..., r1_batches=..., exp_batches=..., time=..., key=..., tolerance=1000) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:262
#14 0x000055555567e6ba in arrow::compute::BasicTest::RunSingleByKey()::{lambda(arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema)#1}::operator()(arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema) const (__closure=0x7fffffffd1b8, l_batches=..., r0_batches=..., r1_batches=..., exp_nokey_batches=--Type <RET> for more, q to quit, c to continue without paging--c
..., exp_emptykey_batches=..., exp_batches=...) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:459
#15 0x00005555556a68cc in arrow::compute::BasicTest::RunTypes<arrow::compute::BasicTest::RunSingleByKey()::{lambda(arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema)#1}>(arrow::compute::BasicTestTypes, arrow::compute::BasicTest::RunSingleByKey()::{lambda(arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema)#1}) (this=0x7fffffffdc10, basic_test_types=..., batches_runner=...) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:628
#16 0x000055555568e21c in arrow::compute::BasicTest::RunBatches<arrow::compute::BasicTest::RunSingleByKey()::{lambda(arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema)#1}>(arrow::compute::BasicTest::RunSingleByKey()::{lambda(arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema, arrow::compute::BatchesWithSchema)#1}) (this=0x7fffffffdc10, batches_runner=...) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:591
#17 0x000055555567e749 in arrow::compute::BasicTest::RunSingleByKey (this=0x7fffffffdc10) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:457
#18 0x000055555567e768 in arrow::compute::BasicTest::DoSingleByKey (basic_tests=...) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:463
#19 0x00005555556dbb25 in std::__invoke_impl<void, void (*&)(arrow::compute::BasicTest&), arrow::compute::BasicTest&> (__f=@0x7fffffffdcd0: 0x55555567e74c <arrow::compute::BasicTest::DoSingleByKey(arrow::compute::BasicTest&)>) at /usr/include/c++/11/bits/invoke.h:61
#20 0x00005555556d703b in std::__invoke_r<void, void (*&)(arrow::compute::BasicTest&), arrow::compute::BasicTest&> (__fn=@0x7fffffffdcd0: 0x55555567e74c <arrow::compute::BasicTest::DoSingleByKey(arrow::compute::BasicTest&)>) at /usr/include/c++/11/bits/invoke.h:111
#21 0x00005555556d126f in std::_Function_handler<void (arrow::compute::BasicTest&), void (*)(arrow::compute::BasicTest&)>::_M_invoke(std::_Any_data const&, arrow::compute::BasicTest&) (__functor=..., __args#0=...) at /usr/include/c++/11/bits/std_function.h:290
#22 0x0000555555694677 in std::function<void (arrow::compute::BasicTest&)>::operator()(arrow::compute::BasicTest&) const (this=0x7fffffffdcd0, __args#0=...) at /usr/include/c++/11/bits/std_function.h:590
#23 0x0000555555668689 in arrow::compute::AsofJoinBasicTest_TestBasic1_Test::TestBody (this=0x5555557ae560) at /mnt/user1/tscontract/github/westonpace/arrow/cpp/src/arrow/compute/exec/asof_join_node_test.cc:662
#24 0x00007ffff7b1422f in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) () from /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/libarrow_testing.so.1000
#25 0x00007ffff7b08dd6 in testing::Test::Run() () from /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/libarrow_testing.so.1000
#26 0x00007ffff7b08f55 in testing::TestInfo::Run() () from /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/libarrow_testing.so.1000
#27 0x00007ffff7b09509 in testing::TestSuite::Run() () from /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/libarrow_testing.so.1000
#28 0x00007ffff7b09c0f in testing::internal::UnitTestImpl::RunAllTests() () from /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/libarrow_testing.so.1000
#29 0x00007ffff7b147f7 in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) () from /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/libarrow_testing.so.1000
#30 0x00007ffff7b0901c in testing::UnitTest::Run() () from /mnt/user1/tscontract/github/westonpace/arrow/cpp/build/debug/debug/libarrow_testing.so.1000
#31 0x000055555565fb04 in main ()
(gdb) Quit
(gdb) 

@westonpace (Owner):

Another issue seems to be that TSAN is reporting a race condition because the backpressure concurrent queue's DoHandle is accessing UnsyncSize which doesn't use any lock. I'll work on the deadlock first and then look at the race condition later.

@rtpsw (Collaborator Author) commented Oct 28, 2022

I'll commit shortly with a possible fix. @westonpace, please review.

@westonpace (Owner):

Ah, I also have a possible fix 😆

@westonpace (Owner):

This was mine: apache@7ef1e24

@westonpace (Owner):

Your fix is ok, but that condition will never evaluate to false, so if your fix works then we should just get rid of that if check entirely.

@westonpace (Owner):

*never evaluate to true

@westonpace (Owner):

Btw, here is the race condition:

WARNING: ThreadSanitizer: data race (pid=20018)
  Read of size 8 at 0x7b4c00001f90 by main thread:
    #0 std::operator-(std::_Deque_iterator<std::shared_ptr<arrow::RecordBatch>, std::shared_ptr<arrow::RecordBatch>&, std::shared_ptr<arrow::RecordBatch>*> const&, std::_Deque_iterator<std::shared_ptr<arrow::RecordBatch>, std::shared_ptr<arrow::RecordBatch>&, std::shared_ptr<arrow::RecordBatch>*> const&) /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_deque.h:358:25 (libarrow.so.1000+0x1481995)
    #1 std::deque<std::shared_ptr<arrow::RecordBatch>, std::allocator<std::shared_ptr<arrow::RecordBatch> > >::size() const /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_deque.h:1233:40 (libarrow.so.1000+0x1481840)
    #2 std::queue<std::shared_ptr<arrow::RecordBatch>, std::deque<std::shared_ptr<arrow::RecordBatch>, std::allocator<std::shared_ptr<arrow::RecordBatch> > > >::size() const /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_queue.h:209:18 (libarrow.so.1000+0x14817f5)
    #3 arrow::compute::ConcurrentQueue<std::shared_ptr<arrow::RecordBatch> >::UnsyncSize() const /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:128:45 (libarrow.so.1000+0x14817b5)
    #4 arrow::compute::BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch> >::DoHandle::DoHandle(arrow::compute::BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch> >&) /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:297:45 (libarrow.so.1000+0x1481524)
    #5 arrow::compute::BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch> >::Push(std::shared_ptr<arrow::RecordBatch> const&) /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:318:14 (libarrow.so.1000+0x14a7346)
    #6 arrow::compute::InputState::Push(std::shared_ptr<arrow::RecordBatch> const&) /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:550:14 (libarrow.so.1000+0x14a68d8)
    #7 arrow::compute::AsofJoinNode::InputReceived(arrow::compute::ExecNode*, arrow::compute::ExecBatch) /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:1278:31 (libarrow.so.1000+0x14695c5)

  Previous write of size 8 at 0x7b4c00001f90 by thread T3 (mutexes: write M860041654558527520, write M859197195268661200):
    #0 std::deque<std::shared_ptr<arrow::RecordBatch>, std::allocator<std::shared_ptr<arrow::RecordBatch> > >::pop_front() /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_deque.h:1539:6 (libarrow.so.1000+0x1481f46)
    #1 std::queue<std::shared_ptr<arrow::RecordBatch>, std::deque<std::shared_ptr<arrow::RecordBatch>, std::allocator<std::shared_ptr<arrow::RecordBatch> > > >::pop() /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_queue.h:301:4 (libarrow.so.1000+0x1481aa5)
    #2 arrow::compute::ConcurrentQueue<std::shared_ptr<arrow::RecordBatch> >::TryPop() /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:112:14 (libarrow.so.1000+0x1481609)
    #3 arrow::compute::BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch> >::TryPop() /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:329:32 (libarrow.so.1000+0x14813bc)
    #4 arrow::compute::InputState::Advance() /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:496:38 (libarrow.so.1000+0x146f830)
    #5 arrow::compute::InputState::AdvanceAndMemoize(unsigned long) /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:534:7 (libarrow.so.1000+0x1475f5c)
    #6 arrow::compute::AsofJoinNode::UpdateRhs() /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:845:7 (libarrow.so.1000+0x146e9a8)
    #7 arrow::compute::AsofJoinNode::ProcessInner() /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:883:7 (libarrow.so.1000+0x146cd8e)
    #8 arrow::compute::AsofJoinNode::Process() /home/pace/dev/arrow/cpp/src/arrow/compute/exec/asof_join_node.cc:937:53 (libarrow.so.1000+0x146be04)

@rtpsw (Collaborator Author) commented Oct 28, 2022

@westonpace, if the as-of-join tests work with your fix, then let's go with it. I'd just suggest adding this patch to it.

@rtpsw (Collaborator Author) commented Oct 28, 2022

To be sure, is the race condition related to the hang and fixed by your hang-fixing code?

@rtpsw (Collaborator Author) commented Oct 28, 2022

Another issue seems to be that TSAN is reporting a race condition because the backpressure concurrent queue's DoHandle is accessing UnsyncSize which doesn't use any lock. I'll work on the deadlock first and then look at the race condition later.

Never mind, now I see this.

@rtpsw (Collaborator Author) commented Oct 28, 2022

I'll work on the deadlock first and then look at the race condition later.

Ah, the DoHandle logic should be run while holding the mutex lock of ConcurrentQueue. I'll work on this.
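
A minimal sketch of that fix (hypothetical names, not the actual asof_join_node.cc code), where any size check used for backpressure decisions happens while holding the queue's mutex, so it cannot race with a concurrent pop:

#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

template <typename T>
class ConcurrentQueue {
 public:
  void Push(const T& item) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(item);
  }

  bool TryPop(T* out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

  // Unlike an UnsyncSize-style accessor, the size is read under the lock,
  // so callers get a consistent snapshot.
  size_t SyncSize() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::queue<T> queue_;
};

int main() {
  ConcurrentQueue<int> queue;
  queue.Push(1);
  int value = 0;
  return queue.TryPop(&value) && queue.SyncSize() == 0 ? 0 : 1;
}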

: handler_(std::move(handler)) {}

T Pop() {
std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
@icexelloss (Collaborator):

Is this related to the deadlock issue?

@icexelloss (Collaborator) commented Oct 31, 2022

@rtpsw It looks like this PR is a mixture of:
(1) various refactors, and
(2) a fix for a deadlock / race condition.

Can you give the PR a better title and summarize the change so I can understand how urgent/important this issue is? I assume we want to pick this change for the MVP release as well?

@rtpsw (Collaborator Author) commented Oct 31, 2022

This PR started with refactoring and adding tests of as-of-join backpressure for large sources. During discussions, a couple of side-issues were handled:

  • A destructor issue: this led to a couple of attempted fixes. Originally, the destructor caused a deadlock when it was called from the process thread. Then a detach code path was attempted, but it led to a (first) race condition. Finally, Weston said he has a fix that remains to be integrated. @westonpace, are you happy with this fix and do you intend to add it, or should I?
  • A TSAN (second) race condition issue: my recent commit should fix this. @westonpace, could you run your ASAN test again on this commit, or let me know how to run it?

Given the above, I think this PR is important to finish soon. If we had ample time, we could have had a separate PR for each side-issue.

@westonpace (Owner):

A destructor issue: this led to a couple of attempted fixes. Originally, the destructor caused a deadlock when it was called from the process thread. Then a detach code path was attempted, but it led to a (first) race condition. Finally, Weston said (#21 (comment)) he has a fix that remains to be integrated. @westonpace, are you happy with this fix and do you intend to add it, or should I?

@rtpsw I put my fix in and added your defer suggestion. If custom code throws exceptions it will probably still break things. I don't think we are going to solve that today. But at least we can be a little bit safer :)

A TSAN (second) race condition issue: my recent commit should fix this. @westonpace, could you run your ASAN test again on this commit, or let me know how to run it?

I ran my test repeatedly and did not see the issue. I was actually running TSAN, just under stress (an Ubuntu program which stresses the CPU). To do this I run stress -c 32 in a separate window (I have 16 cores, so 32 stressors jams the system up pretty well, which is the goal). Sometimes I also find it useful to limit the number of cores a test runs on; I do this with taskset -c 0,1 TEST_NAME, which limits the test to cores 0 and 1. This kind of thing can help mimic the CI environment.

If you want to run ASAN, you can do so by setting ARROW_USE_ASAN in the CMake options (there is a CI job which runs it as well). For good measure I went ahead and ran the asof join tests through ASAN a few times and didn't see anything.

…o make it more obvious what the function is doing. Fix a substrait test that was relying on an order that isn't really deterministic.
@westonpace westonpace merged commit b9cda43 into feature/bamboo-demo Nov 1, 2022
@westonpace (Owner):

I went ahead and merged this since we seemed to be in a stable state. Let me know if there is anything else I need to look at.

@rtpsw (Collaborator Author) commented Nov 1, 2022

Thanks, @westonpace. I reviewed - LGTM.

westonpace pushed a commit that referenced this pull request Apr 1, 2025
…n timezone (apache#45051)

### Rationale for this change

If the timezone database is present on the system but does not contain a timezone referenced in an ORC file, the ORC reader will crash with an uncaught C++ exception.

This can happen for example on Ubuntu 24.04, where some timezone aliases have been moved from the main `tzdata` package to a `tzdata-legacy` package. If `tzdata-legacy` is not installed, trying to read an ORC file that references e.g. the "US/Pacific" timezone would crash.

Here is a backtrace excerpt:
```
#12 0x00007f1a3ce23a55 in std::terminate() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#13 0x00007f1a3ce39391 in __cxa_throw () from /lib/x86_64-linux-gnu/libstdc++.so.6
#14 0x00007f1a3f4accc4 in orc::loadTZDB(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#15 0x00007f1a3f4ad392 in std::call_once<orc::LazyTimezone::getImpl() const::{lambda()#1}>(std::once_flag&, orc::LazyTimezone::getImpl() const::{lambda()#1}&&)::{lambda()#2}::_FUN() () from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#16 0x00007f1a4298bec3 in __pthread_once_slow (once_control=0xa5ca7c8, init_routine=0x7f1a3ce69420 <__once_proxy>) at ./nptl/pthread_once.c:116
#17 0x00007f1a3f4a9ad0 in orc::LazyTimezone::getEpoch() const ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#18 0x00007f1a3f4e76b1 in orc::TimestampColumnReader::TimestampColumnReader(orc::Type const&, orc::StripeStreams&, bool) ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#19 0x00007f1a3f4e84ad in orc::buildReader(orc::Type const&, orc::StripeStreams&, bool, bool, bool) ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#20 0x00007f1a3f4e8dd7 in orc::StructColumnReader::StructColumnReader(orc::Type const&, orc::StripeStreams&, bool, bool) ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#21 0x00007f1a3f4e8532 in orc::buildReader(orc::Type const&, orc::StripeStreams&, bool, bool, bool) ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#22 0x00007f1a3f4925e9 in orc::RowReaderImpl::startNextStripe() ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#23 0x00007f1a3f492c9d in orc::RowReaderImpl::next(orc::ColumnVectorBatch&) ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
#24 0x00007f1a3e6b251f in arrow::adapters::orc::ORCFileReader::Impl::ReadBatch(orc::RowReaderOptions const&, std::shared_ptr<arrow::Schema> const&, long) ()
   from /tmp/arrow-HEAD.ArqTs/venv-wheel-3.12-manylinux_2_17_x86_64.manylinux2014_x86_64/lib/python3.12/site-packages/pyarrow/libarrow.so.1900
```

### What changes are included in this PR?

Catch C++ exceptions when iterating ORC batches instead of letting them slip through.
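
A minimal sketch of this approach (a hypothetical helper, not the actual reader code): wrap the ORC iteration call in a try/catch and convert any C++ exception into an `arrow::Status` instead of letting it escape:

```cpp
#include <exception>

#include <orc/OrcFile.hh>

#include "arrow/status.h"

// Reads the next ORC batch, translating ORC's C++ exceptions (e.g. a
// missing timezone) into a Status the caller can handle.
arrow::Status ReadNextBatch(orc::RowReader* row_reader,
                            orc::ColumnVectorBatch* batch, bool* has_more) {
  try {
    *has_more = row_reader->next(*batch);
  } catch (const std::exception& e) {
    return arrow::Status::IOError("ORC error: ", e.what());
  }
  return arrow::Status::OK();
}
```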

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* GitHub Issue: apache#40633

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>