diff --git a/cpp/src/arrow/util/cancel.cc b/cpp/src/arrow/util/cancel.cc index 5fe4ae3e304..2648059af81 100644 --- a/cpp/src/arrow/util/cancel.cc +++ b/cpp/src/arrow/util/cancel.cc @@ -20,11 +20,14 @@ #include #include #include +#include #include #include "arrow/result.h" +#include "arrow/util/atfork_internal.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/mutex.h" #include "arrow/util/visibility.h" namespace arrow { @@ -33,7 +36,9 @@ namespace arrow { #error Lock-free atomic int required for signal safety #endif +using internal::AtForkHandler; using internal::ReinstateSignalHandler; +using internal::SelfPipe; using internal::SetSignalHandler; using internal::SignalHandler; @@ -99,16 +104,58 @@ Status StopToken::Poll() const { namespace { -struct SignalStopState { +struct SignalStopState : public std::enable_shared_from_this { struct SavedSignalHandler { int signum; SignalHandler handler; }; + // NOTE: shared_from_this() doesn't work from constructor + void Init() { + // XXX this pattern appears in several places, factor it out? + atfork_handler_ = std::make_shared( + /*before=*/ + [weak_self = std::weak_ptr(shared_from_this())] { + auto self = weak_self.lock(); + if (self) { + self->BeforeFork(); + } + return self; + }, + /*parent_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ParentAfterFork(); + }, + /*child_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ChildAfterFork(); + }); + RegisterAtFork(atfork_handler_); + } + Status RegisterHandlers(const std::vector& signals) { + std::lock_guard lock(mutex_); if (!saved_handlers_.empty()) { return Status::Invalid("Signal handlers already registered"); } + if (!self_pipe_) { + // Make sure the self-pipe is initialized + // (NOTE: avoid std::atomic_is_lock_free() which may require libatomic) +#if ATOMIC_POINTER_LOCK_FREE != 2 + return Status::NotImplemented( + "Cannot setup signal StopSource because atomic pointers are not " + "lock-free on this platform"); +#else + ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true)); +#endif + } + if (!signal_receiving_thread_) { + // Spawn thread for receiving signals + SpawnSignalReceivingThread(); + } + self_pipe_ptr_.store(self_pipe_.get()); for (int signum : signals) { ARROW_ASSIGN_OR_RAISE(auto handler, SetSignalHandler(signum, SignalHandler{&HandleSignal})); @@ -118,6 +165,8 @@ struct SignalStopState { } void UnregisterHandlers() { + std::lock_guard lock(mutex_); + self_pipe_ptr_.store(nullptr); auto handlers = std::move(saved_handlers_); for (const auto& h : handlers) { ARROW_CHECK_OK(SetSignalHandler(h.signum, h.handler).status()); @@ -125,71 +174,126 @@ struct SignalStopState { } ~SignalStopState() { + atfork_handler_.reset(); UnregisterHandlers(); Disable(); + if (signal_receiving_thread_) { + // Tell the receiving thread to stop + auto st = self_pipe_->Shutdown(); + ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe"); + if (st.ok()) { + signal_receiving_thread_->join(); + } else { + signal_receiving_thread_->detach(); + } + } } - StopSource* stop_source() { return stop_source_.get(); } + StopSource* stop_source() { + std::lock_guard lock(mutex_); + return stop_source_.get(); + } - bool enabled() { return stop_source_ != nullptr; } + bool enabled() { + std::lock_guard lock(mutex_); + return stop_source_ != nullptr; + } void Enable() { - // Before creating a new StopSource, delete any lingering reference to - // the previous one in the trash can. See DoHandleSignal() for details. - EmptyTrashCan(); - std::atomic_store(&stop_source_, std::make_shared()); + std::lock_guard lock(mutex_); + stop_source_ = std::make_shared(); } - void Disable() { std::atomic_store(&stop_source_, NullSource()); } + void Disable() { + std::lock_guard lock(mutex_); + stop_source_.reset(); + } - static SignalStopState* instance() { return &instance_; } + static SignalStopState* instance() { + static std::shared_ptr instance = []() { + auto ptr = std::make_shared(); + ptr->Init(); + return ptr; + }(); + return instance.get(); + } private: - // For readability - std::shared_ptr NullSource() { return nullptr; } - - void EmptyTrashCan() { std::atomic_store(&trash_can_, NullSource()); } + void SpawnSignalReceivingThread() { + signal_receiving_thread_ = std::make_unique(ReceiveSignals, self_pipe_); + } - static void HandleSignal(int signum) { instance_.DoHandleSignal(signum); } + static void HandleSignal(int signum) { + auto self = instance(); + if (self) { + self->DoHandleSignal(signum); + } + } void DoHandleSignal(int signum) { // async-signal-safe code only - auto source = std::atomic_load(&stop_source_); - if (source) { - source->RequestStopFromSignal(signum); - // Disable() may have been called in the meantime, but we can't - // deallocate a shared_ptr here, so instead move it to a "trash can". - // This minimizes the possibility of running a deallocator here, - // however it doesn't entirely preclude it. - // - // Possible case: - // - a signal handler (A) starts running, fetches the current source - // - Disable() then Enable() are called, emptying the trash can and - // replacing the current source - // - a signal handler (B) starts running, fetches the current source - // - signal handler A resumes, moves its source (the old source) into - // the trash can (the only remaining reference) - // - signal handler B resumes, moves its source (the current source) - // into the trash can. This triggers deallocation of the old source, - // since the trash can had the only remaining reference to it. - // - // This case should be sufficiently unlikely, but we cannot entirely - // rule it out. The problem might be solved properly with a lock-free - // linked list of StopSources. - std::atomic_store(&trash_can_, std::move(source)); + SelfPipe* self_pipe = self_pipe_ptr_.load(); + if (self_pipe) { + self_pipe->Send(/*payload=*/signum); } ReinstateSignalHandler(signum, &HandleSignal); } - std::shared_ptr stop_source_; - std::shared_ptr trash_can_; + static void ReceiveSignals(std::shared_ptr self_pipe) { + // Wait for signals on the self-pipe and propagate them to the current StopSource + DCHECK(self_pipe); + while (true) { + auto maybe_payload = self_pipe->Wait(); + if (maybe_payload.status().IsInvalid()) { + // Pipe shut down + return; + } + if (!maybe_payload.ok()) { + maybe_payload.status().Warn(); + return; + } + const int signum = static_cast(maybe_payload.ValueUnsafe()); + instance()->ReceiveSignal(signum); + } + } - std::vector saved_handlers_; + void ReceiveSignal(int signum) { + std::lock_guard lock(mutex_); + if (stop_source_) { + stop_source_->RequestStopFromSignal(signum); + } + } - static SignalStopState instance_; -}; + // At-fork handlers + + void BeforeFork() { mutex_.lock(); } + + void ParentAfterFork() { mutex_.unlock(); } -SignalStopState SignalStopState::instance_{}; + void ChildAfterFork() { + new (&mutex_) std::mutex; + // Leak previous thread, as it has become invalid. + // We can't spawn a new one here as it would have unfortunate side effects; + // especially in the frequent context of a fork+exec. + // (for example the Python subprocess module closes all fds before calling exec) + ARROW_UNUSED(signal_receiving_thread_.release()); + // Make internal state consistent: with no listening thread, we shouldn't + // feed the self-pipe from the signal handler. + UnregisterHandlers(); + } + + std::mutex mutex_; + std::vector saved_handlers_; + std::shared_ptr stop_source_; + std::unique_ptr signal_receiving_thread_; + std::shared_ptr atfork_handler_; + + // For signal handler interaction + std::shared_ptr self_pipe_; + // Raw atomic pointer, as atomic load/store of a shared_ptr may not be lock-free + // (it is not on libstdc++). + std::atomic self_pipe_ptr_; +}; } // namespace diff --git a/cpp/src/arrow/util/cancel.h b/cpp/src/arrow/util/cancel.h index 7fc62271959..f0d704b2ce0 100644 --- a/cpp/src/arrow/util/cancel.h +++ b/cpp/src/arrow/util/cancel.h @@ -42,6 +42,7 @@ class ARROW_EXPORT StopSource { // Consumer API (the side that stops) void RequestStop(); void RequestStop(Status error); + // Async-signal-safe. TODO Deprecate this? void RequestStopFromSignal(int signum); StopToken token(); @@ -103,6 +104,10 @@ ARROW_EXPORT void ResetSignalStopSource(); /// EXPERIMENTAL: Register signal handler triggering the signal-receiving StopSource +/// +/// Note that those handlers are automatically un-registered in a fork()ed process, +/// therefore the child process will need to call RegisterCancellingSignalHandler() +/// if desired. ARROW_EXPORT Status RegisterCancellingSignalHandler(const std::vector& signals); diff --git a/cpp/src/arrow/util/cancel_test.cc b/cpp/src/arrow/util/cancel_test.cc index bca78034c04..45f6cde4f55 100644 --- a/cpp/src/arrow/util/cancel_test.cc +++ b/cpp/src/arrow/util/cancel_test.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -29,6 +30,8 @@ #include #ifndef _WIN32 #include // for setitimer() +#include +#include #endif #include "arrow/testing/gtest_util.h" @@ -201,6 +204,23 @@ class SignalCancelTest : public CancelTest { ASSERT_EQ(internal::SignalFromStatus(st), expected_signal_); } +#ifndef _WIN32 + void RunInChild(std::function func) { + auto child_pid = fork(); + if (child_pid == -1) { + ASSERT_OK(internal::IOErrorFromErrno(errno, "Error calling fork(): ")); + } + if (child_pid == 0) { + // Child + ASSERT_NO_FATAL_FAILURE(func()) << "Failure in child process"; + std::exit(0); + } else { + // Parent + AssertChildExit(child_pid); + } + } +#endif + protected: #ifdef _WIN32 const int expected_signal_ = SIGINT; @@ -238,6 +258,54 @@ TEST_F(SignalCancelTest, RegisterUnregister) { AssertStopRequested(); } +#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ + defined(THREAD_SANITIZER)) +TEST_F(SignalCancelTest, ForkSafetyUnregisteredHandlers) { + RunInChild([&]() { + // Child + TriggerSignal(); + AssertStopNotRequested(); + + RegisterHandler(); + TriggerSignal(); + AssertStopRequested(); + }); + + // Parent: shouldn't notice signals raised in child + AssertStopNotRequested(); + + // Stop source still usable in parent + TriggerSignal(); + AssertStopNotRequested(); + + RegisterHandler(); + TriggerSignal(); + AssertStopRequested(); +} + +TEST_F(SignalCancelTest, ForkSafetyRegisteredHandlers) { + RegisterHandler(); + + RunInChild([&]() { + // Child: signal handlers are unregistered and need to be re-registered + TriggerSignal(); + AssertStopNotRequested(); + + // Can re-register and receive signals + RegisterHandler(); + TriggerSignal(); + AssertStopRequested(); + }); + + // Parent: shouldn't notice signals raised in child + AssertStopNotRequested(); + + // Stop source still usable in parent + TriggerSignal(); + AssertStopRequested(); +} +#endif + TEST_F(CancelTest, ThreadedPollSuccess) { constexpr int kNumThreads = 10; diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 571b49c1d7f..1f117de7b2c 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -92,9 +92,11 @@ #include "arrow/buffer.h" #include "arrow/result.h" +#include "arrow/util/atfork_internal.h" #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/mutex.h" // For filename conversion #if defined(_WIN32) @@ -1148,19 +1150,44 @@ Result FileTell(int fd) { } Result CreatePipe() { - int ret; + bool ok; int fds[2]; + Pipe pipe; #if defined(_WIN32) - ret = _pipe(fds, 4096, _O_BINARY); + ok = _pipe(fds, 4096, _O_BINARY) >= 0; + if (ok) { + pipe = {FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + } +#elif defined(__linux__) && defined(__GLIBC__) + // On Unix, we don't want the file descriptors to survive after an exec() call + ok = pipe2(fds, O_CLOEXEC) >= 0; + if (ok) { + pipe = {FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + } #else - ret = ::pipe(fds); + auto set_cloexec = [](int fd) -> bool { + int flags = fcntl(fd, F_GETFD); + if (flags >= 0) { + flags = fcntl(fd, F_SETFD, flags | FD_CLOEXEC); + } + return flags >= 0; + }; + + ok = ::pipe(fds) >= 0; + if (ok) { + pipe = {FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + ok &= set_cloexec(fds[0]); + if (ok) { + ok &= set_cloexec(fds[1]); + } + } #endif - if (ret == -1) { + if (!ok) { return IOErrorFromErrno(errno, "Error creating pipe"); } - return Pipe{FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + return pipe; } Status SetPipeFileDescriptorNonBlocking(int fd) { @@ -1189,7 +1216,7 @@ namespace { #define PIPE_READ read #endif -class SelfPipeImpl : public SelfPipe { +class SelfPipeImpl : public SelfPipe, public std::enable_shared_from_this { static constexpr uint64_t kEofPayload = 5804561806345822987ULL; public: @@ -1204,6 +1231,28 @@ class SelfPipeImpl : public SelfPipe { // We cannot afford blocking writes in a signal handler RETURN_NOT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd())); } + + atfork_handler_ = std::make_shared( + /*before=*/ + [weak_self = std::weak_ptr(shared_from_this())] { + auto self = weak_self.lock(); + if (self) { + self->BeforeFork(); + } + return self; + }, + /*parent_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ParentAfterFork(); + }, + /*child_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ChildAfterFork(); + }); + RegisterAtFork(atfork_handler_); + return Status::OK(); } @@ -1263,6 +1312,19 @@ class SelfPipeImpl : public SelfPipe { ~SelfPipeImpl() { ARROW_WARN_NOT_OK(Shutdown(), "On self-pipe destruction"); } protected: + void BeforeFork() {} + + void ParentAfterFork() {} + + void ChildAfterFork() { + // Close and recreate pipe, to avoid interfering with parent. + const bool was_closed = pipe_.rfd.closed() || pipe_.wfd.closed(); + ARROW_CHECK_OK(pipe_.Close()); + if (!was_closed) { + ARROW_CHECK_OK(CreatePipe().Value(&pipe_)); + } + } + Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } bool DoSend(uint64_t payload) { @@ -1294,6 +1356,8 @@ class SelfPipeImpl : public SelfPipe { const bool signal_safe_; Pipe pipe_; std::atomic please_shutdown_{false}; + + std::shared_ptr atfork_handler_; }; #undef PIPE_WRITE diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index f4fcc26d072..bb5113440bb 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -30,6 +30,7 @@ #ifndef _WIN32 #include +#include #include #endif @@ -446,6 +447,36 @@ TEST_F(TestSelfPipe, SendFromSignalAndWait) { ASSERT_OK(ReadStatus()); } +#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ + defined(THREAD_SANITIZER)) +TEST_F(TestSelfPipe, ForkSafety) { + self_pipe_->Send(123456789123456789ULL); + + auto child_pid = fork(); + if (child_pid == 0) { + // Child: pipe is reinitialized and usable without interfering with parent + self_pipe_->Send(41ULL); + StartReading(); + SleepABit(); + self_pipe_->Send(42ULL); + AssertPayloadsEventually({41ULL, 42ULL}); + + self_pipe_.reset(); + std::exit(0); + } else { + // Parent: pipe is usable concurrently with child, data is read correctly + StartReading(); + SleepABit(); + self_pipe_->Send(987654321987654321ULL); + + AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL}); + ASSERT_OK(ReadStatus()); + + AssertChildExit(child_pid); + } +} +#endif + TEST(PlatformFilename, RoundtripAscii) { PlatformFilename fn; ASSERT_OK_AND_ASSIGN(fn, PlatformFilename::FromString("a/b"));