From d08b946351929c3c3660ccc62737be8b9364c458 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 3 Mar 2022 17:01:22 -0400 Subject: [PATCH 01/28] compiling sketch --- r/R/arrowExports.R | 4 ++ r/src/arrowExports.cpp | 44 +++++++---------- r/src/safe-call-into-r.cpp | 67 ++++++++++++++++++++++++++ r/src/safe-call-into-r.h | 98 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 28 deletions(-) create mode 100644 r/src/safe-call-into-r.cpp create mode 100644 r/src/safe-call-into-r.h diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index c9468f52ae3..f584423b40c 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1704,6 +1704,10 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema, use_legacy_forma .Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema, use_legacy_format, metadata_version) } +TestSafeCallIntoR <- function(funs_that_return_a_string) { + .Call(`_arrow_TestSafeCallIntoR`, funs_that_return_a_string) +} + Array__GetScalar <- function(x, i) { .Call(`_arrow_Array__GetScalar`, x, i) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 59762790fa3..17375966ef9 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -6711,6 +6711,21 @@ extern "C" SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEX } #endif +// safe-call-into-r.cpp +#if defined(ARROW_R_WITH_ARROW) +cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string); +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp){ +BEGIN_CPP11 + arrow::r::Input::type funs_that_return_a_string(funs_that_return_a_string_sexp); + return cpp11::as_sexp(TestSafeCallIntoR(funs_that_return_a_string)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp){ + Rf_error("Cannot call TestSafeCallIntoR(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // scalar.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr Array__GetScalar(const std::shared_ptr& x, int64_t i); @@ -7541,32 +7556,6 @@ extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ } #endif -#if defined(ARROW_R_WITH_ARROW) -extern "C" SEXP _arrow_Table__Reset(SEXP r6) { -BEGIN_CPP11 -arrow::r::r6_reset_pointer(r6); -END_CPP11 -return R_NilValue; -} -#else -extern "C" SEXP _arrow_Table__Reset(SEXP r6){ - Rf_error("Cannot call Table(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - -#if defined(ARROW_R_WITH_ARROW) -extern "C" SEXP _arrow_RecordBatch__Reset(SEXP r6) { -BEGIN_CPP11 -arrow::r::r6_reset_pointer(r6); -END_CPP11 -return R_NilValue; -} -#else -extern "C" SEXP _arrow_RecordBatch__Reset(SEXP r6){ - Rf_error("Cannot call RecordBatch(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - extern "C" SEXP _arrow_available() { return Rf_ScalarLogical( #if defined(ARROW_R_WITH_ARROW) @@ -8044,6 +8033,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 4}, { "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 4}, + { "_arrow_TestSafeCallIntoR", (DL_FUNC) &_arrow_TestSafeCallIntoR, 1}, { "_arrow_Array__GetScalar", (DL_FUNC) &_arrow_Array__GetScalar, 2}, { "_arrow_Scalar__ToString", (DL_FUNC) &_arrow_Scalar__ToString, 1}, { "_arrow_StructScalar__field", (DL_FUNC) &_arrow_StructScalar__field, 2}, @@ -8097,8 +8087,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_GetIOThreadPoolCapacity", (DL_FUNC) &_arrow_GetIOThreadPoolCapacity, 0}, { "_arrow_SetIOThreadPoolCapacity", (DL_FUNC) &_arrow_SetIOThreadPoolCapacity, 1}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, - { "_arrow_Table__Reset", (DL_FUNC) &_arrow_Table__Reset, 1}, - { "_arrow_RecordBatch__Reset", (DL_FUNC) &_arrow_RecordBatch__Reset, 1}, {NULL, NULL, 0} }; extern "C" void R_init_arrow(DllInfo* dll){ diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp new file mode 100644 index 00000000000..4b646a321f4 --- /dev/null +++ b/r/src/safe-call-into-r.cpp @@ -0,0 +1,67 @@ + +#include "safe-call-into-r.h" +#include +#include + +static MainRThreadTasks main_r_thread_tasks; + +void SafeCallIntoRBase(MainRThreadTasks::Task* task) { + main_r_thread_tasks.Add(task); + while (!main_r_thread_tasks.is_finished(task)) { + std::this_thread::yield(); + std::this_thread::sleep_for(std::chrono::nanoseconds(1000)); + } +} + +MainRThreadTasks::EventLoop::EventLoop(): keep_looping_(false) { + thread_ = std::this_thread::get_id(); + main_r_thread_tasks.Register(this); +} + +MainRThreadTasks::EventLoop::~EventLoop() { + main_r_thread_tasks.Unregister(); +} + +void MainRThreadTasks::EventLoop::start_looping() { + // must be called from another thread but needs to evaluate this next + // part on the main thread? + while (keep_looping_) { + try { + main_r_thread_tasks.EvaluatePending(); + } catch (cpp11::unwind_exception& e) { + // important that this this error makes its way back to the top + // level and is caught in the auto-generated glue code + throw e; + } + std::this_thread::sleep_for(std::chrono::nanoseconds(10000)); + } +} + +void MainRThreadTasks::EventLoop::stop_looping() { + keep_looping_ = false; +} + +// [[arrow::export]] +cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string) { + + // pretending that this could be called from another thread + std::vector results; + for (R_xlen_t i = 0; i < funs_that_return_a_string.size(); i++) { + std::function f = [&]() { + cpp11::function fun (funs_that_return_a_string[i]); + return fun(); + }; + + std::string result = SafeCallIntoR(f); + results.push_back(result); + } + + // and then this would be back on the main thread just to make + // sure the results are correct + cpp11::writable::strings results_sexp; + for (std::string& result: results) { + results_sexp.push_back(result); + } + + return results_sexp; +} diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h new file mode 100644 index 00000000000..21f0c7dc884 --- /dev/null +++ b/r/src/safe-call-into-r.h @@ -0,0 +1,98 @@ + +#ifndef SAFE_CALL_INTO_R_INCLUDED +#define SAFE_CALL_INTO_R_INCLUDED + +#include "./arrow_types.h" + +#include +#include +#include +#include +#include + +class MainRThreadTasks { +public: + class Task { + public: + virtual ~Task() {} + virtual void run() {} + }; + + class EventLoop { + public: + EventLoop(); + ~EventLoop(); + void start_looping(); + void stop_looping(); + private: + std::thread::id thread_; + bool keep_looping_; + }; + + void EvaluatePending() { + lock_.lock(); + while (!tasks_.empty()) { + Task* task = tasks_.back(); + tasks_.pop_back(); + task->run(); + } + lock_.unlock(); + } + + void Add(Task* task) { + lock_.lock(); + tasks_.push_front(task); + lock_.unlock(); + } + + bool is_finished(Task* task) { + lock_.lock(); + bool result = results_.find(task) != results_.end(); + lock_.unlock(); + return result; + } + + void Register(EventLoop* loop) { + if (loop_ == nullptr) { + throw std::runtime_error("Event loop already registered"); + } + loop_ = loop; + } + + void Unregister() { + loop_ = nullptr; + } + +private: + std::deque tasks_; + std::unordered_set results_; + std::mutex lock_; + EventLoop* loop_; +}; + + +void SafeCallIntoRBase(MainRThreadTasks::Task* task); + +template +T SafeCallIntoR(std::function fun) { + class TypedTask: public MainRThreadTasks::Task { + public: + TypedTask(std::function fun): fun_(fun) {}; + + void run() { + result = cpp11::as_cpp(fun_()); + } + + T result; + + private: + std::function fun_; + }; + + TypedTask task(fun); + // SafeCallIntoRBase(&task); + task.run(); + return task.result; +} + +#endif From 5671a1372a31b54634cf770e2692c3d1d23a4555 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 4 Mar 2022 08:09:12 -0400 Subject: [PATCH 02/28] remove event loop indirection --- r/src/safe-call-into-r.cpp | 86 +++++++++++----------------- r/src/safe-call-into-r.h | 112 +++++++++++++------------------------ 2 files changed, 72 insertions(+), 126 deletions(-) diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index 4b646a321f4..ffb39ae1d71 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -1,67 +1,49 @@ #include "safe-call-into-r.h" -#include #include +#include static MainRThreadTasks main_r_thread_tasks; void SafeCallIntoRBase(MainRThreadTasks::Task* task) { - main_r_thread_tasks.Add(task); - while (!main_r_thread_tasks.is_finished(task)) { - std::this_thread::yield(); - std::this_thread::sleep_for(std::chrono::nanoseconds(1000)); - } -} - -MainRThreadTasks::EventLoop::EventLoop(): keep_looping_(false) { - thread_ = std::this_thread::get_id(); - main_r_thread_tasks.Register(this); + if (main_r_thread_tasks.Loop() == nullptr) { + throw std::runtime_error("Global R executor not registered"); + } + + if (main_r_thread_tasks.Loop()->thread() == std::this_thread::get_id()) { + task->run(); + } else { + throw std::runtime_error("Attempt to evaluate task on the non-R thread"); + } } -MainRThreadTasks::EventLoop::~EventLoop() { - main_r_thread_tasks.Unregister(); +MainRThreadTasks::EventLoop::EventLoop() { + thread_ = std::this_thread::get_id(); + main_r_thread_tasks.Register(this); } -void MainRThreadTasks::EventLoop::start_looping() { - // must be called from another thread but needs to evaluate this next - // part on the main thread? - while (keep_looping_) { - try { - main_r_thread_tasks.EvaluatePending(); - } catch (cpp11::unwind_exception& e) { - // important that this this error makes its way back to the top - // level and is caught in the auto-generated glue code - throw e; - } - std::this_thread::sleep_for(std::chrono::nanoseconds(10000)); - } -} - -void MainRThreadTasks::EventLoop::stop_looping() { - keep_looping_ = false; -} +MainRThreadTasks::EventLoop::~EventLoop() { main_r_thread_tasks.Unregister(); } // [[arrow::export]] cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string) { - - // pretending that this could be called from another thread - std::vector results; - for (R_xlen_t i = 0; i < funs_that_return_a_string.size(); i++) { - std::function f = [&]() { - cpp11::function fun (funs_that_return_a_string[i]); - return fun(); - }; - - std::string result = SafeCallIntoR(f); - results.push_back(result); - } - - // and then this would be back on the main thread just to make - // sure the results are correct - cpp11::writable::strings results_sexp; - for (std::string& result: results) { - results_sexp.push_back(result); - } - - return results_sexp; + MainRThreadTasks::EventLoop loop; + + // pretending that this could be called from another thread + std::vector results; + for (R_xlen_t i = 0; i < funs_that_return_a_string.size(); i++) { + std::string result = SafeCallIntoR([&]() { + cpp11::function fun(funs_that_return_a_string[i]); + return fun(); + }); + results.push_back(result); + } + + // and then this would be back on the main thread just to make + // sure the results are correct + cpp11::writable::strings results_sexp; + for (std::string& result : results) { + results_sexp.push_back(result); + } + + return results_sexp; } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 21f0c7dc884..250141a6a35 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -5,94 +5,58 @@ #include "./arrow_types.h" #include -#include +#include #include #include -#include +#include class MainRThreadTasks { -public: - class Task { - public: - virtual ~Task() {} - virtual void run() {} - }; - - class EventLoop { - public: - EventLoop(); - ~EventLoop(); - void start_looping(); - void stop_looping(); - private: - std::thread::id thread_; - bool keep_looping_; - }; - - void EvaluatePending() { - lock_.lock(); - while (!tasks_.empty()) { - Task* task = tasks_.back(); - tasks_.pop_back(); - task->run(); - } - lock_.unlock(); - } - - void Add(Task* task) { - lock_.lock(); - tasks_.push_front(task); - lock_.unlock(); - } - - bool is_finished(Task* task) { - lock_.lock(); - bool result = results_.find(task) != results_.end(); - lock_.unlock(); - return result; - } - - void Register(EventLoop* loop) { - if (loop_ == nullptr) { - throw std::runtime_error("Event loop already registered"); - } - loop_ = loop; - } - - void Unregister() { - loop_ = nullptr; - } - -private: - std::deque tasks_; - std::unordered_set results_; - std::mutex lock_; - EventLoop* loop_; -}; + public: + class Task { + public: + virtual ~Task() {} + virtual void run() {} + }; + + class EventLoop { + public: + EventLoop(); + ~EventLoop(); + std::thread::id thread() { return thread_; } + private: + std::thread::id thread_; + }; + + void Register(EventLoop* loop) { loop_ = loop; } + + void Unregister() { loop_ = nullptr; } + + EventLoop* Loop() { return loop_; } + + private: + EventLoop* loop_; +}; void SafeCallIntoRBase(MainRThreadTasks::Task* task); template T SafeCallIntoR(std::function fun) { - class TypedTask: public MainRThreadTasks::Task { - public: - TypedTask(std::function fun): fun_(fun) {}; + class TypedTask : public MainRThreadTasks::Task { + public: + TypedTask(std::function fun) : fun_(fun){}; - void run() { - result = cpp11::as_cpp(fun_()); - } + void run() { result = cpp11::as_cpp(fun_()); } - T result; + T result; - private: - std::function fun_; - }; + private: + std::function fun_; + }; - TypedTask task(fun); - // SafeCallIntoRBase(&task); - task.run(); - return task.result; + TypedTask task(fun); + SafeCallIntoRBase(&task); + return task.result; } #endif From e3dbdd3a578837fe11f64bff0d063a0406334777 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 4 Mar 2022 08:11:27 -0400 Subject: [PATCH 03/28] license --- r/src/safe-call-into-r.cpp | 16 ++++++++++++++++ r/src/safe-call-into-r.h | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index ffb39ae1d71..a1d581fe088 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -1,3 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. #include "safe-call-into-r.h" #include diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 250141a6a35..fa899094844 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -1,3 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. #ifndef SAFE_CALL_INTO_R_INCLUDED #define SAFE_CALL_INTO_R_INCLUDED From 7941d8de72f3d6b2d050a3239f6cbe72444ca3b2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 13:51:13 -0300 Subject: [PATCH 04/28] redo safe call into r --- r/R/arrow-package.R | 3 ++ r/R/arrowExports.R | 8 +++- r/src/arrowExports.cpp | 27 +++++++++-- r/src/safe-call-into-r.cpp | 96 ++++++++++++++++++++++++++++---------- r/src/safe-call-into-r.h | 88 ++++++++++++++++++++++++---------- 5 files changed, 165 insertions(+), 57 deletions(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 47b10b1bb10..953d374152e 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -31,6 +31,9 @@ #' @importFrom vctrs s3_register vec_size vec_cast vec_unique .onLoad <- function(...) { + # Make sure C++ knows on which thread it is safe to call the R API + InitializeMainRThread() + dplyr_methods <- paste0( "dplyr::", c( diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index f584423b40c..448f3fb101f 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1704,8 +1704,12 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema, use_legacy_forma .Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema, use_legacy_format, metadata_version) } -TestSafeCallIntoR <- function(funs_that_return_a_string) { - .Call(`_arrow_TestSafeCallIntoR`, funs_that_return_a_string) +InitializeMainRThread <- function() { + invisible(.Call(`_arrow_InitializeMainRThread`)) +} + +TestSafeCallIntoR <- function(funs_that_return_a_string, async) { + .Call(`_arrow_TestSafeCallIntoR`, funs_that_return_a_string, async) } Array__GetScalar <- function(x, i) { diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 17375966ef9..8c49e7ea5cc 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -6713,15 +6713,31 @@ extern "C" SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEX // safe-call-into-r.cpp #if defined(ARROW_R_WITH_ARROW) -cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string); -extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp){ +void InitializeMainRThread(); +extern "C" SEXP _arrow_InitializeMainRThread(){ +BEGIN_CPP11 + InitializeMainRThread(); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_InitializeMainRThread(){ + Rf_error("Cannot call InitializeMainRThread(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// safe-call-into-r.cpp +#if defined(ARROW_R_WITH_ARROW) +cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string, bool async); +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp, SEXP async_sexp){ BEGIN_CPP11 arrow::r::Input::type funs_that_return_a_string(funs_that_return_a_string_sexp); - return cpp11::as_sexp(TestSafeCallIntoR(funs_that_return_a_string)); + arrow::r::Input::type async(async_sexp); + return cpp11::as_sexp(TestSafeCallIntoR(funs_that_return_a_string, async)); END_CPP11 } #else -extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp){ +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp, SEXP async_sexp){ Rf_error("Cannot call TestSafeCallIntoR(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -8033,7 +8049,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 4}, { "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 4}, - { "_arrow_TestSafeCallIntoR", (DL_FUNC) &_arrow_TestSafeCallIntoR, 1}, + { "_arrow_InitializeMainRThread", (DL_FUNC) &_arrow_InitializeMainRThread, 0}, + { "_arrow_TestSafeCallIntoR", (DL_FUNC) &_arrow_TestSafeCallIntoR, 2}, { "_arrow_Array__GetScalar", (DL_FUNC) &_arrow_Array__GetScalar, 2}, { "_arrow_Scalar__ToString", (DL_FUNC) &_arrow_Scalar__ToString, 1}, { "_arrow_StructScalar__field", (DL_FUNC) &_arrow_StructScalar__field, 2}, diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index a1d581fe088..22298bbac44 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -19,44 +19,90 @@ #include #include -static MainRThreadTasks main_r_thread_tasks; +static MainRThread main_r_thread; -void SafeCallIntoRBase(MainRThreadTasks::Task* task) { - if (main_r_thread_tasks.Loop() == nullptr) { - throw std::runtime_error("Global R executor not registered"); - } +MainRThread* GetMainRThread() { return &main_r_thread; } - if (main_r_thread_tasks.Loop()->thread() == std::this_thread::get_id()) { - task->run(); +arrow::Status MainRThread::RunTask(Task* task) { + if (IsMainThread()) { + try { + ARROW_RETURN_NOT_OK(task->run()); + return arrow::Status::OK(); + } catch (cpp11::unwind_exception& e) { + SetError(e.token); + return arrow::Status::UnknownError("R code execution error"); + } } else { - throw std::runtime_error("Attempt to evaluate task on the non-R thread"); + return arrow::Status::NotImplemented("Call to R from a non-R thread"); } } -MainRThreadTasks::EventLoop::EventLoop() { - thread_ = std::this_thread::get_id(); - main_r_thread_tasks.Register(this); -} - -MainRThreadTasks::EventLoop::~EventLoop() { main_r_thread_tasks.Unregister(); } - // [[arrow::export]] -cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string) { - MainRThreadTasks::EventLoop loop; +void InitializeMainRThread() { main_r_thread.Initialize(); } - // pretending that this could be called from another thread +// [[arrow::export]] +cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string, bool async) { std::vector results; - for (R_xlen_t i = 0; i < funs_that_return_a_string.size(); i++) { - std::string result = SafeCallIntoR([&]() { - cpp11::function fun(funs_that_return_a_string[i]); - return fun(); + + if (async) { + // Probably a better test would be to submit all the jobs at once and + // wait for them all to finish, but I'm not sure how to do that yet! + + // This simulates the Arrow thread pool. Just imagine it is static and lives forever. + std::thread* thread_ptr; + + SEXP funs_sexp = funs_that_return_a_string; + R_xlen_t n_funs = funs_that_return_a_string.size(); + + auto fut = arrow::Future>::Make(); + + thread_ptr = new std::thread([fut, funs_sexp, n_funs]() mutable { + std::vector results_local; + + for (R_xlen_t i = 0; i < n_funs; i++) { + auto result = SafeCallIntoR([&] { + cpp11::function fun(VECTOR_ELT(funs_sexp, i)); + return cpp11::as_cpp(fun()); + }); + + if (result.ok()) { + results_local.push_back(result.ValueUnsafe()); + } else { + fut.MarkFinished(result.status()); + return; + } + } + + fut.MarkFinished(results_local); }); - results.push_back(result); + + // Simulated thread pool + thread_ptr->join(); + delete thread_ptr; + + // We didn't evaluate any R code because it wasn't safe, but + // if we did and there was an error, we need to stop() here + GetMainRThread()->ClearError(); + + // We should be able to get this far, but fut will contain an error + // because it tried to evaluate R code from another thread + results = arrow::ValueOrStop(fut.result()); + + } else { + for (R_xlen_t i = 0; i < funs_that_return_a_string.size(); i++) { + auto result = SafeCallIntoR([&]() { + cpp11::function fun(funs_that_return_a_string[i]); + return cpp11::as_cpp(fun()); + }); + GetMainRThread()->ClearError(); + arrow::StopIfNotOk(result.status()); + results.push_back(result.ValueUnsafe()); + } } - // and then this would be back on the main thread just to make - // sure the results are correct + // To make sure the results are correct cpp11::writable::strings results_sexp; + for (std::string& result : results) { results_sexp.push_back(result); } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index fa899094844..2e0e51a14cb 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -20,58 +20,96 @@ #include "./arrow_types.h" -#include +#include + #include -#include #include -#include -class MainRThreadTasks { +// The MainRThread class keeps track of the thread on which it is safe +// to call the R API to facilitate its safe use (or erroring +// if it is not safe). The MainRThread singleton can be accessed from +// any thread using GetMainRThread(); the preferred way to call +// the R API where it may not be safe to do so is to use +// SafeCallIntoR([&]() { some_expression_returning_sexp; }). +class MainRThread { public: + MainRThread() : initialized_(false) {} + + // Call this method from the R thread (e.g., on package load) + // to save an internal copy of the thread id. + void Initialize() { + thread_id_ = std::this_thread::get_id(); + initialized_ = true; + SetError(R_NilValue); + } + + bool IsInitialized() { return initialized_; } + + // Check if the current thread is the main R thread + bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; } + + // Class whose run() method will be called from the main R thread + // but whose results may be accessed (as class fields) from + // potentially another thread. class Task { public: virtual ~Task() {} - virtual void run() {} + virtual arrow::Status run() = 0; }; - class EventLoop { - public: - EventLoop(); - ~EventLoop(); - std::thread::id thread() { return thread_; } - - private: - std::thread::id thread_; - }; + // Synchronously run `task` if it is safe to do so or return an + // error otherwise. + arrow::Status RunTask(Task* task); - void Register(EventLoop* loop) { loop_ = loop; } + // Save an error token generated from a cpp11::unwind_exception + // so that it can be properly handled after some cleanup code + // has run (e.g., cancelling some futures or waiting for them + // to finish). + void SetError(cpp11::sexp token) { error_token_ = token; } - void Unregister() { loop_ = nullptr; } + // Check if there is a saved error + bool HasError() { return error_token_ != R_NilValue; } - EventLoop* Loop() { return loop_; } + // Throw a cpp11::unwind_exception() with the saved token if it exists + void ClearError() { + if (HasError()) { + cpp11::unwind_exception e(error_token_); + SetError(R_NilValue); + throw e; + } + } private: - EventLoop* loop_; + bool initialized_; + std::thread::id thread_id_; + cpp11::sexp error_token_; }; -void SafeCallIntoRBase(MainRThreadTasks::Task* task); +// Retrieve the MainRThread singleton +MainRThread* GetMainRThread(); +// Call into R and return a C++ object. Note that you can't return +// a SEXP (use cpp11::as_sexp to convert it to a C++ type inside +// `fun`). template -T SafeCallIntoR(std::function fun) { - class TypedTask : public MainRThreadTasks::Task { +arrow::Result SafeCallIntoR(std::function fun) { + class TypedTask : public MainRThread::Task { public: - TypedTask(std::function fun) : fun_(fun){}; + TypedTask(std::function fun) : fun_(fun){}; - void run() { result = cpp11::as_cpp(fun_()); } + arrow::Status run() { + result = fun_(); + return arrow::Status::OK(); + } T result; private: - std::function fun_; + std::function fun_; }; TypedTask task(fun); - SafeCallIntoRBase(&task); + ARROW_RETURN_NOT_OK(GetMainRThread()->RunTask(&task)); return task.result; } From a9a6c3c457725dd460be482364a63fd9c5dcb7c7 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 14:06:06 -0300 Subject: [PATCH 05/28] cpp lint --- r/src/safe-call-into-r.cpp | 2 +- r/src/safe-call-into-r.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index 22298bbac44..04848d26f6b 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "safe-call-into-r.h" +#include "./safe-call-into-r.h" #include #include diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 2e0e51a14cb..99836ea3d27 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -95,7 +95,7 @@ template arrow::Result SafeCallIntoR(std::function fun) { class TypedTask : public MainRThread::Task { public: - TypedTask(std::function fun) : fun_(fun){}; + explicit TypedTask(std::function fun) : fun_(fun) {} arrow::Status run() { result = fun_(); From d17dcada675d08c5b03b2db01102d119b37adb5e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 14:07:39 -0300 Subject: [PATCH 06/28] don't evaluate C++ when arrow not available --- r/R/arrow-package.R | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 953d374152e..9b40495413e 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -31,8 +31,10 @@ #' @importFrom vctrs s3_register vec_size vec_cast vec_unique .onLoad <- function(...) { - # Make sure C++ knows on which thread it is safe to call the R API - InitializeMainRThread() + if (arrow_available()) { + # Make sure C++ knows on which thread it is safe to call the R API + InitializeMainRThread() + } dplyr_methods <- paste0( "dplyr::", From 07be4fb8fe974e8fb12909a59eaf4a1de4a9062f Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 14:47:17 -0300 Subject: [PATCH 07/28] with theoretical support for an executor --- r/src/safe-call-into-r.cpp | 12 ++++++++++-- r/src/safe-call-into-r.h | 31 ++++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index 04848d26f6b..b84251d82b9 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "./safe-call-into-r.h" +#include "safe-call-into-r.h" #include #include @@ -25,6 +25,7 @@ MainRThread* GetMainRThread() { return &main_r_thread; } arrow::Status MainRThread::RunTask(Task* task) { if (IsMainThread()) { + // If we're on the main thread, run the task immediately try { ARROW_RETURN_NOT_OK(task->run()); return arrow::Status::OK(); @@ -32,8 +33,15 @@ arrow::Status MainRThread::RunTask(Task* task) { SetError(e.token); return arrow::Status::UnknownError("R code execution error"); } + } else if (executor_ != nullptr) { + // If we are not on the main thread and have an Executor + // use it to run the task on the main R thread. + auto fut = executor_->Submit([task]() { return task->run(); }); + ARROW_RETURN_NOT_OK(fut); + ARROW_RETURN_NOT_OK(fut.ValueUnsafe().result()); + return arrow::Status::OK(); } else { - return arrow::Status::NotImplemented("Call to R from a non-R thread"); + return arrow::Status::NotImplemented("Call to R from a non-R thread without an event loop"); } } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 99836ea3d27..4a04c3d69eb 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -21,6 +21,7 @@ #include "./arrow_types.h" #include +#include #include #include @@ -33,7 +34,7 @@ // SafeCallIntoR([&]() { some_expression_returning_sexp; }). class MainRThread { public: - MainRThread() : initialized_(false) {} + MainRThread() : initialized_(false), executor_(nullptr) {} // Call this method from the R thread (e.g., on package load) // to save an internal copy of the thread id. @@ -48,6 +49,8 @@ class MainRThread { // Check if the current thread is the main R thread bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; } + + // Class whose run() method will be called from the main R thread // but whose results may be accessed (as class fields) from // potentially another thread. @@ -57,10 +60,13 @@ class MainRThread { virtual arrow::Status run() = 0; }; - // Synchronously run `task` if it is safe to do so or return an - // error otherwise. + // Run `task` if it is safe to do so or return an error otherwise. arrow::Status RunTask(Task* task); + arrow::internal::Executor*& Executor() { + return executor_; + } + // Save an error token generated from a cpp11::unwind_exception // so that it can be properly handled after some cleanup code // has run (e.g., cancelling some futures or waiting for them @@ -83,6 +89,7 @@ class MainRThread { bool initialized_; std::thread::id thread_id_; cpp11::sexp error_token_; + arrow::internal::Executor* executor_; }; // Retrieve the MainRThread singleton @@ -113,4 +120,22 @@ arrow::Result SafeCallIntoR(std::function fun) { return task.result; } +template +arrow::Result RunWithCapturedR(std::function()> task) { + if (GetMainRThread()->Executor() != nullptr) { + return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()"); + } + + arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( + [task](arrow::internal::Executor* executor) { + GetMainRThread()->Executor() = executor; + arrow::Future result = task(); + return result; + }); + + GetMainRThread()->Executor() = nullptr; + + return result; +} + #endif From b1d360fcf8a45a5fcc583a4835d207a1bea147fe Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 15:05:39 -0300 Subject: [PATCH 08/28] shuffle tests to make room for the async with executor bit --- r/R/arrowExports.R | 4 +-- r/src/arrowExports.cpp | 12 ++++---- r/src/safe-call-into-r.cpp | 60 +++++++++++++------------------------- 3 files changed, 29 insertions(+), 47 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 448f3fb101f..d056d5d66a6 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1708,8 +1708,8 @@ InitializeMainRThread <- function() { invisible(.Call(`_arrow_InitializeMainRThread`)) } -TestSafeCallIntoR <- function(funs_that_return_a_string, async) { - .Call(`_arrow_TestSafeCallIntoR`, funs_that_return_a_string, async) +TestSafeCallIntoR <- function(fun_that_returns_a_string, opt) { + .Call(`_arrow_TestSafeCallIntoR`, fun_that_returns_a_string, opt) } Array__GetScalar <- function(x, i) { diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 8c49e7ea5cc..62e6b611cd1 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -6728,16 +6728,16 @@ extern "C" SEXP _arrow_InitializeMainRThread(){ // safe-call-into-r.cpp #if defined(ARROW_R_WITH_ARROW) -cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string, bool async); -extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp, SEXP async_sexp){ +std::string TestSafeCallIntoR(cpp11::sexp fun_that_returns_a_string, std::string opt); +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP fun_that_returns_a_string_sexp, SEXP opt_sexp){ BEGIN_CPP11 - arrow::r::Input::type funs_that_return_a_string(funs_that_return_a_string_sexp); - arrow::r::Input::type async(async_sexp); - return cpp11::as_sexp(TestSafeCallIntoR(funs_that_return_a_string, async)); + arrow::r::Input::type fun_that_returns_a_string(fun_that_returns_a_string_sexp); + arrow::r::Input::type opt(opt_sexp); + return cpp11::as_sexp(TestSafeCallIntoR(fun_that_returns_a_string, opt)); END_CPP11 } #else -extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP funs_that_return_a_string_sexp, SEXP async_sexp){ +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP fun_that_returns_a_string_sexp, SEXP opt_sexp){ Rf_error("Cannot call TestSafeCallIntoR(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index b84251d82b9..544b65672d3 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -49,42 +49,33 @@ arrow::Status MainRThread::RunTask(Task* task) { void InitializeMainRThread() { main_r_thread.Initialize(); } // [[arrow::export]] -cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string, bool async) { - std::vector results; +std::string TestSafeCallIntoR(cpp11::sexp fun_that_returns_a_string, std::string opt) { + if (opt == "async_with_executor") { + // std::thread* thread_ptr; - if (async) { - // Probably a better test would be to submit all the jobs at once and - // wait for them all to finish, but I'm not sure how to do that yet! - // This simulates the Arrow thread pool. Just imagine it is static and lives forever. + // thread_ptr->join(); + // delete thread_ptr; + return ""; + } else if (opt == "async_without_executor") { std::thread* thread_ptr; - SEXP funs_sexp = funs_that_return_a_string; - R_xlen_t n_funs = funs_that_return_a_string.size(); + SEXP fun_sexp = fun_that_returns_a_string; + auto fut = arrow::Future::Make(); - auto fut = arrow::Future>::Make(); - - thread_ptr = new std::thread([fut, funs_sexp, n_funs]() mutable { - std::vector results_local; - - for (R_xlen_t i = 0; i < n_funs; i++) { + thread_ptr = new std::thread([fut, fun_sexp]() mutable { auto result = SafeCallIntoR([&] { - cpp11::function fun(VECTOR_ELT(funs_sexp, i)); + cpp11::function fun(fun_sexp); return cpp11::as_cpp(fun()); }); if (result.ok()) { - results_local.push_back(result.ValueUnsafe()); + fut.MarkFinished(result.ValueUnsafe()); } else { fut.MarkFinished(result.status()); - return; } - } - - fut.MarkFinished(results_local); }); - // Simulated thread pool thread_ptr->join(); delete thread_ptr; @@ -94,26 +85,17 @@ cpp11::strings TestSafeCallIntoR(cpp11::list funs_that_return_a_string, bool asy // We should be able to get this far, but fut will contain an error // because it tried to evaluate R code from another thread - results = arrow::ValueOrStop(fut.result()); + return arrow::ValueOrStop(fut.result()); - } else { - for (R_xlen_t i = 0; i < funs_that_return_a_string.size(); i++) { - auto result = SafeCallIntoR([&]() { - cpp11::function fun(funs_that_return_a_string[i]); + } else if (opt == "on_main_thread") { + auto result = SafeCallIntoR([&]() { + cpp11::function fun(fun_that_returns_a_string); return cpp11::as_cpp(fun()); }); - GetMainRThread()->ClearError(); - arrow::StopIfNotOk(result.status()); - results.push_back(result.ValueUnsafe()); - } - } - - // To make sure the results are correct - cpp11::writable::strings results_sexp; - - for (std::string& result : results) { - results_sexp.push_back(result); + GetMainRThread()->ClearError(); + arrow::StopIfNotOk(result.status()); + return result.ValueUnsafe(); + } else { + return ""; } - - return results_sexp; } From 21518f056581f1a1f58988050df52628a5f9eeb1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 15:21:07 -0300 Subject: [PATCH 09/28] even more test simplifying --- r/R/arrowExports.R | 4 ++-- r/src/arrowExports.cpp | 10 +++++----- r/src/safe-call-into-r.cpp | 12 ++++-------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index d056d5d66a6..332e797ee04 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1708,8 +1708,8 @@ InitializeMainRThread <- function() { invisible(.Call(`_arrow_InitializeMainRThread`)) } -TestSafeCallIntoR <- function(fun_that_returns_a_string, opt) { - .Call(`_arrow_TestSafeCallIntoR`, fun_that_returns_a_string, opt) +TestSafeCallIntoR <- function(r_fun_that_returns_a_string, opt) { + .Call(`_arrow_TestSafeCallIntoR`, r_fun_that_returns_a_string, opt) } Array__GetScalar <- function(x, i) { diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 62e6b611cd1..c1ba5f3125c 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -6728,16 +6728,16 @@ extern "C" SEXP _arrow_InitializeMainRThread(){ // safe-call-into-r.cpp #if defined(ARROW_R_WITH_ARROW) -std::string TestSafeCallIntoR(cpp11::sexp fun_that_returns_a_string, std::string opt); -extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP fun_that_returns_a_string_sexp, SEXP opt_sexp){ +std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt); +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP r_fun_that_returns_a_string_sexp, SEXP opt_sexp){ BEGIN_CPP11 - arrow::r::Input::type fun_that_returns_a_string(fun_that_returns_a_string_sexp); + arrow::r::Input::type r_fun_that_returns_a_string(r_fun_that_returns_a_string_sexp); arrow::r::Input::type opt(opt_sexp); - return cpp11::as_sexp(TestSafeCallIntoR(fun_that_returns_a_string, opt)); + return cpp11::as_sexp(TestSafeCallIntoR(r_fun_that_returns_a_string, opt)); END_CPP11 } #else -extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP fun_that_returns_a_string_sexp, SEXP opt_sexp){ +extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP r_fun_that_returns_a_string_sexp, SEXP opt_sexp){ Rf_error("Cannot call TestSafeCallIntoR(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index 544b65672d3..d895063000b 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -49,7 +49,7 @@ arrow::Status MainRThread::RunTask(Task* task) { void InitializeMainRThread() { main_r_thread.Initialize(); } // [[arrow::export]] -std::string TestSafeCallIntoR(cpp11::sexp fun_that_returns_a_string, std::string opt) { +std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt) { if (opt == "async_with_executor") { // std::thread* thread_ptr; @@ -60,13 +60,10 @@ std::string TestSafeCallIntoR(cpp11::sexp fun_that_returns_a_string, std::string } else if (opt == "async_without_executor") { std::thread* thread_ptr; - SEXP fun_sexp = fun_that_returns_a_string; auto fut = arrow::Future::Make(); - - thread_ptr = new std::thread([fut, fun_sexp]() mutable { + thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable { auto result = SafeCallIntoR([&] { - cpp11::function fun(fun_sexp); - return cpp11::as_cpp(fun()); + return cpp11::as_cpp(r_fun_that_returns_a_string()); }); if (result.ok()) { @@ -89,8 +86,7 @@ std::string TestSafeCallIntoR(cpp11::sexp fun_that_returns_a_string, std::string } else if (opt == "on_main_thread") { auto result = SafeCallIntoR([&]() { - cpp11::function fun(fun_that_returns_a_string); - return cpp11::as_cpp(fun()); + return cpp11::as_cpp(r_fun_that_returns_a_string()); }); GetMainRThread()->ClearError(); arrow::StopIfNotOk(result.status()); From ed3ea45b09eed17b4aa9db9e9611b5be393579c0 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 15:35:04 -0300 Subject: [PATCH 10/28] insert test with event loop --- r/src/safe-call-into-r.cpp | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index d895063000b..b61cc1eec0f 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -51,12 +51,32 @@ void InitializeMainRThread() { main_r_thread.Initialize(); } // [[arrow::export]] std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt) { if (opt == "async_with_executor") { - // std::thread* thread_ptr; + std::thread* thread_ptr; + auto result = RunWithCapturedR([&thread_ptr, r_fun_that_returns_a_string]() { + auto fut = arrow::Future::Make(); + thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable { + auto result = SafeCallIntoR([&] { + return cpp11::as_cpp(r_fun_that_returns_a_string()); + }); - // thread_ptr->join(); - // delete thread_ptr; - return ""; + if (result.ok()) { + fut.MarkFinished(result.ValueUnsafe()); + } else { + fut.MarkFinished(result.status()); + } + }); + + return fut; + }); + + thread_ptr->join(); + delete thread_ptr; + + // Stop for any R execution errors that may have occurred + GetMainRThread()->ClearError(); + + return arrow::ValueOrStop(result); } else if (opt == "async_without_executor") { std::thread* thread_ptr; From 7c03527c7a1219a1322b6adb0889723f1eee4d55 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 15:35:34 -0300 Subject: [PATCH 11/28] clang-format --- r/src/safe-call-into-r.cpp | 57 +++++++++++++++++++------------------- r/src/safe-call-into-r.h | 6 +--- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r.cpp index b61cc1eec0f..e3cd91ced7c 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r.cpp @@ -41,7 +41,8 @@ arrow::Status MainRThread::RunTask(Task* task) { ARROW_RETURN_NOT_OK(fut.ValueUnsafe().result()); return arrow::Status::OK(); } else { - return arrow::Status::NotImplemented("Call to R from a non-R thread without an event loop"); + return arrow::Status::NotImplemented( + "Call to R from a non-R thread without an event loop"); } } @@ -49,26 +50,28 @@ arrow::Status MainRThread::RunTask(Task* task) { void InitializeMainRThread() { main_r_thread.Initialize(); } // [[arrow::export]] -std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt) { +std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, + std::string opt) { if (opt == "async_with_executor") { std::thread* thread_ptr; - auto result = RunWithCapturedR([&thread_ptr, r_fun_that_returns_a_string]() { - auto fut = arrow::Future::Make(); - thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable { - auto result = SafeCallIntoR([&] { - return cpp11::as_cpp(r_fun_that_returns_a_string()); + auto result = + RunWithCapturedR([&thread_ptr, r_fun_that_returns_a_string]() { + auto fut = arrow::Future::Make(); + thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable { + auto result = SafeCallIntoR([&] { + return cpp11::as_cpp(r_fun_that_returns_a_string()); + }); + + if (result.ok()) { + fut.MarkFinished(result.ValueUnsafe()); + } else { + fut.MarkFinished(result.status()); + } }); - if (result.ok()) { - fut.MarkFinished(result.ValueUnsafe()); - } else { - fut.MarkFinished(result.status()); - } - }); - - return fut; - }); + return fut; + }); thread_ptr->join(); delete thread_ptr; @@ -82,15 +85,14 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std:: auto fut = arrow::Future::Make(); thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable { - auto result = SafeCallIntoR([&] { - return cpp11::as_cpp(r_fun_that_returns_a_string()); - }); - - if (result.ok()) { - fut.MarkFinished(result.ValueUnsafe()); - } else { - fut.MarkFinished(result.status()); - } + auto result = SafeCallIntoR( + [&] { return cpp11::as_cpp(r_fun_that_returns_a_string()); }); + + if (result.ok()) { + fut.MarkFinished(result.ValueUnsafe()); + } else { + fut.MarkFinished(result.status()); + } }); thread_ptr->join(); @@ -105,9 +107,8 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std:: return arrow::ValueOrStop(fut.result()); } else if (opt == "on_main_thread") { - auto result = SafeCallIntoR([&]() { - return cpp11::as_cpp(r_fun_that_returns_a_string()); - }); + auto result = SafeCallIntoR( + [&]() { return cpp11::as_cpp(r_fun_that_returns_a_string()); }); GetMainRThread()->ClearError(); arrow::StopIfNotOk(result.status()); return result.ValueUnsafe(); diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 4a04c3d69eb..e242852672a 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -49,8 +49,6 @@ class MainRThread { // Check if the current thread is the main R thread bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; } - - // Class whose run() method will be called from the main R thread // but whose results may be accessed (as class fields) from // potentially another thread. @@ -63,9 +61,7 @@ class MainRThread { // Run `task` if it is safe to do so or return an error otherwise. arrow::Status RunTask(Task* task); - arrow::internal::Executor*& Executor() { - return executor_; - } + arrow::internal::Executor*& Executor() { return executor_; } // Save an error token generated from a cpp11::unwind_exception // so that it can be properly handled after some cleanup code From 9750f727b43167ba8034d370a590df6a3212197f Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 16:24:15 -0300 Subject: [PATCH 12/28] maybe fix cpp linter, add tests --- r/src/arrowExports.cpp | 4 +- ...l-into-r.cpp => safe-call-into-r-impl.cpp} | 2 +- r/src/safe-call-into-r.h | 1 + r/tests/testthat/test-safe-call-into-r.R | 55 +++++++++++++++++++ 4 files changed, 59 insertions(+), 3 deletions(-) rename r/src/{safe-call-into-r.cpp => safe-call-into-r-impl.cpp} (99%) create mode 100644 r/tests/testthat/test-safe-call-into-r.R diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index c1ba5f3125c..23681a7927c 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -6711,7 +6711,7 @@ extern "C" SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEX } #endif -// safe-call-into-r.cpp +// safe-call-into-r-impl.cpp #if defined(ARROW_R_WITH_ARROW) void InitializeMainRThread(); extern "C" SEXP _arrow_InitializeMainRThread(){ @@ -6726,7 +6726,7 @@ extern "C" SEXP _arrow_InitializeMainRThread(){ } #endif -// safe-call-into-r.cpp +// safe-call-into-r-impl.cpp #if defined(ARROW_R_WITH_ARROW) std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt); extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP r_fun_that_returns_a_string_sexp, SEXP opt_sexp){ diff --git a/r/src/safe-call-into-r.cpp b/r/src/safe-call-into-r-impl.cpp similarity index 99% rename from r/src/safe-call-into-r.cpp rename to r/src/safe-call-into-r-impl.cpp index e3cd91ced7c..d758a9229d2 100644 --- a/r/src/safe-call-into-r.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -#include "safe-call-into-r.h" #include #include +#include "./safe-call-into-r.h" static MainRThread main_r_thread; diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index e242852672a..7a8e976d25b 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -61,6 +61,7 @@ class MainRThread { // Run `task` if it is safe to do so or return an error otherwise. arrow::Status RunTask(Task* task); + // The Executor that is running on the main R thread, if it exists arrow::internal::Executor*& Executor() { return executor_; } // Save an error token generated from a cpp11::unwind_exception diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R new file mode 100644 index 00000000000..52dade419cd --- /dev/null +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +test_that("SafeCallIntoR works from the main R thread", { + expect_identical( + TestSafeCallIntoR(function() "string one!", opt = "on_main_thread"), + "string one!" + ) + + expect_error( + TestSafeCallIntoR(function() stop("an error!"), opt = "on_main_thread"), + "an error!" + ) +}) + +test_that("SafeCallIntoR works within RunWithCapturedR", { + expect_identical( + TestSafeCallIntoR(function() "string one!", opt = "async_with_executor"), + "string one!" + ) + + # This runs with the expected error, but causes subsequent segfaults, probably related + # to the error_token_ (maybe having to do with the copy-constructor?) + # expect_error( + # TestSafeCallIntoR(function() stop("an error!"), opt = "async_with_executor"), + # "an error!" + # ) +}) + + +test_that("SafeCallIntoR errors from the non-R thread", { + expect_error( + TestSafeCallIntoR(function() "string one!", opt = "async_without_executor"), + "Call to R from a non-R thread without an event loop" + ) + + expect_error( + TestSafeCallIntoR(function() stop("an error!"), opt = "async_without_executor"), + "Call to R from a non-R thread without an event loop" + ) +}) From f361ad5821fbb526b33716409700b4685a556aee Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 16:34:01 -0300 Subject: [PATCH 13/28] maybe fix arrow-without-arrow --- r/src/safe-call-into-r-impl.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index d758a9229d2..aad297aad02 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +#include "./arrow_types.h" +#if defined(ARROW_R_WITH_ARROW) + #include #include #include "./safe-call-into-r.h" @@ -116,3 +119,5 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, return ""; } } + +#endif From 431d7b05058f59785a7ad9253878e26e9c724be1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 11:45:12 -0300 Subject: [PATCH 14/28] change location of main_r_thread --- r/src/safe-call-into-r-impl.cpp | 15 ++++++++------- r/src/safe-call-into-r.h | 10 +++++----- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index aad297aad02..ac44634b71e 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -22,9 +22,10 @@ #include #include "./safe-call-into-r.h" -static MainRThread main_r_thread; - -MainRThread* GetMainRThread() { return &main_r_thread; } +MainRThread& GetMainRThread() { + static MainRThread main_r_thread; + return main_r_thread; +} arrow::Status MainRThread::RunTask(Task* task) { if (IsMainThread()) { @@ -50,7 +51,7 @@ arrow::Status MainRThread::RunTask(Task* task) { } // [[arrow::export]] -void InitializeMainRThread() { main_r_thread.Initialize(); } +void InitializeMainRThread() { GetMainRThread().Initialize(); } // [[arrow::export]] std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, @@ -80,7 +81,7 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, delete thread_ptr; // Stop for any R execution errors that may have occurred - GetMainRThread()->ClearError(); + GetMainRThread().ClearError(); return arrow::ValueOrStop(result); } else if (opt == "async_without_executor") { @@ -103,7 +104,7 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, // We didn't evaluate any R code because it wasn't safe, but // if we did and there was an error, we need to stop() here - GetMainRThread()->ClearError(); + GetMainRThread().ClearError(); // We should be able to get this far, but fut will contain an error // because it tried to evaluate R code from another thread @@ -112,7 +113,7 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, } else if (opt == "on_main_thread") { auto result = SafeCallIntoR( [&]() { return cpp11::as_cpp(r_fun_that_returns_a_string()); }); - GetMainRThread()->ClearError(); + GetMainRThread().ClearError(); arrow::StopIfNotOk(result.status()); return result.ValueUnsafe(); } else { diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 7a8e976d25b..be8d3cca985 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -90,7 +90,7 @@ class MainRThread { }; // Retrieve the MainRThread singleton -MainRThread* GetMainRThread(); +MainRThread& GetMainRThread(); // Call into R and return a C++ object. Note that you can't return // a SEXP (use cpp11::as_sexp to convert it to a C++ type inside @@ -113,24 +113,24 @@ arrow::Result SafeCallIntoR(std::function fun) { }; TypedTask task(fun); - ARROW_RETURN_NOT_OK(GetMainRThread()->RunTask(&task)); + ARROW_RETURN_NOT_OK(GetMainRThread().RunTask(&task)); return task.result; } template arrow::Result RunWithCapturedR(std::function()> task) { - if (GetMainRThread()->Executor() != nullptr) { + if (GetMainRThread().Executor() != nullptr) { return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()"); } arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( [task](arrow::internal::Executor* executor) { - GetMainRThread()->Executor() = executor; + GetMainRThread().Executor() = executor; arrow::Future result = task(); return result; }); - GetMainRThread()->Executor() = nullptr; + GetMainRThread().Executor() = nullptr; return result; } From b373fe6a773818373ae721a1bfc504ad476bf9c1 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 11:50:38 -0300 Subject: [PATCH 15/28] fix comments and error messages to align with behaviour --- r/src/safe-call-into-r-impl.cpp | 2 +- r/src/safe-call-into-r.h | 2 +- r/tests/testthat/test-safe-call-into-r.R | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index ac44634b71e..5a417577483 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -46,7 +46,7 @@ arrow::Status MainRThread::RunTask(Task* task) { return arrow::Status::OK(); } else { return arrow::Status::NotImplemented( - "Call to R from a non-R thread without an event loop"); + "Call to R from a non-R thread without calling RunWithCapturedR"); } } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index be8d3cca985..782c230de87 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -31,7 +31,7 @@ // if it is not safe). The MainRThread singleton can be accessed from // any thread using GetMainRThread(); the preferred way to call // the R API where it may not be safe to do so is to use -// SafeCallIntoR([&]() { some_expression_returning_sexp; }). +// SafeCallIntoR([&]() { ... }). class MainRThread { public: MainRThread() : initialized_(false), executor_(nullptr) {} diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index 52dade419cd..6a1689f39bd 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -45,11 +45,11 @@ test_that("SafeCallIntoR works within RunWithCapturedR", { test_that("SafeCallIntoR errors from the non-R thread", { expect_error( TestSafeCallIntoR(function() "string one!", opt = "async_without_executor"), - "Call to R from a non-R thread without an event loop" + "Call to R from a non-R thread" ) expect_error( TestSafeCallIntoR(function() stop("an error!"), opt = "async_without_executor"), - "Call to R from a non-R thread without an event loop" + "Call to R from a non-R thread" ) }) From 6455c980b0bcc478a0312d6dff1b925b2a8566c9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 11:51:51 -0300 Subject: [PATCH 16/28] don't copy `fun` --- r/src/safe-call-into-r.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 782c230de87..e5963760da3 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -112,7 +112,7 @@ arrow::Result SafeCallIntoR(std::function fun) { std::function fun_; }; - TypedTask task(fun); + TypedTask task(std::move(fun)); ARROW_RETURN_NOT_OK(GetMainRThread().RunTask(&task)); return task.result; } From 65801f38dab52ba87c994f3e0fe9f118a1ececf0 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 11:54:29 -0300 Subject: [PATCH 17/28] RunWithCaptureR(task) -> RunWithCapturedR(make_arrow_call) --- r/src/safe-call-into-r.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index e5963760da3..2b4f7bb7b5f 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -118,15 +118,15 @@ arrow::Result SafeCallIntoR(std::function fun) { } template -arrow::Result RunWithCapturedR(std::function()> task) { +arrow::Result RunWithCapturedR(std::function()> make_arrow_call) { if (GetMainRThread().Executor() != nullptr) { return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()"); } arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( - [task](arrow::internal::Executor* executor) { + [make_arrow_call](arrow::internal::Executor* executor) { GetMainRThread().Executor() = executor; - arrow::Future result = task(); + arrow::Future result = make_arrow_call(); return result; }); From be22a4d0958c28d6614ca2fdd6e8e6021e9d495d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 11:59:40 -0300 Subject: [PATCH 18/28] simplify Future/Status usage --- r/src/safe-call-into-r-impl.cpp | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 5a417577483..f1f28d422a2 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -31,8 +31,7 @@ arrow::Status MainRThread::RunTask(Task* task) { if (IsMainThread()) { // If we're on the main thread, run the task immediately try { - ARROW_RETURN_NOT_OK(task->run()); - return arrow::Status::OK(); + return task->run(); } catch (cpp11::unwind_exception& e) { SetError(e.token); return arrow::Status::UnknownError("R code execution error"); @@ -40,10 +39,8 @@ arrow::Status MainRThread::RunTask(Task* task) { } else if (executor_ != nullptr) { // If we are not on the main thread and have an Executor // use it to run the task on the main R thread. - auto fut = executor_->Submit([task]() { return task->run(); }); - ARROW_RETURN_NOT_OK(fut); - ARROW_RETURN_NOT_OK(fut.ValueUnsafe().result()); - return arrow::Status::OK(); + auto fut = DeferNotOk(executor_->Submit([task]() { return task->run(); })); + return fut.status(); } else { return arrow::Status::NotImplemented( "Call to R from a non-R thread without calling RunWithCapturedR"); @@ -67,11 +64,7 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, return cpp11::as_cpp(r_fun_that_returns_a_string()); }); - if (result.ok()) { - fut.MarkFinished(result.ValueUnsafe()); - } else { - fut.MarkFinished(result.status()); - } + fut.MarkFinished(result); }); return fut; From 966522604368668e2e80385bc3dc9a025f7f96e9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 14:01:11 -0300 Subject: [PATCH 19/28] fix one more lying comment --- r/src/safe-call-into-r.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 2b4f7bb7b5f..1988fa673be 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -93,7 +93,7 @@ class MainRThread { MainRThread& GetMainRThread(); // Call into R and return a C++ object. Note that you can't return -// a SEXP (use cpp11::as_sexp to convert it to a C++ type inside +// a SEXP (use cpp11::as_cpp to convert it to a C++ type inside // `fun`). template arrow::Result SafeCallIntoR(std::function fun) { From c1f62b20e7b79368a621e059dc7be6d38ada2561 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 14:12:48 -0300 Subject: [PATCH 20/28] RunTask returns a Result<> --- r/src/safe-call-into-r-impl.cpp | 4 ++-- r/src/safe-call-into-r.h | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index f1f28d422a2..ff536aaa698 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -27,7 +27,7 @@ MainRThread& GetMainRThread() { return main_r_thread; } -arrow::Status MainRThread::RunTask(Task* task) { +arrow::Result MainRThread::RunTask(Task* task) { if (IsMainThread()) { // If we're on the main thread, run the task immediately try { @@ -40,7 +40,7 @@ arrow::Status MainRThread::RunTask(Task* task) { // If we are not on the main thread and have an Executor // use it to run the task on the main R thread. auto fut = DeferNotOk(executor_->Submit([task]() { return task->run(); })); - return fut.status(); + return fut.result(); } else { return arrow::Status::NotImplemented( "Call to R from a non-R thread without calling RunWithCapturedR"); diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 1988fa673be..6c87535cbe0 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -55,11 +55,11 @@ class MainRThread { class Task { public: virtual ~Task() {} - virtual arrow::Status run() = 0; + virtual arrow::Result run() = 0; }; // Run `task` if it is safe to do so or return an error otherwise. - arrow::Status RunTask(Task* task); + arrow::Result RunTask(Task* task); // The Executor that is running on the main R thread, if it exists arrow::internal::Executor*& Executor() { return executor_; } @@ -101,9 +101,9 @@ arrow::Result SafeCallIntoR(std::function fun) { public: explicit TypedTask(std::function fun) : fun_(fun) {} - arrow::Status run() { + arrow::Result run() { result = fun_(); - return arrow::Status::OK(); + return this; } T result; From 9cabbdc80f99db4d5f55153a236c2670e0ea9d5d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 14:37:45 -0300 Subject: [PATCH 21/28] MainRThread::RunTask() returns a Future --- r/src/safe-call-into-r-impl.cpp | 13 ++++++------- r/src/safe-call-into-r.h | 5 +++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index ff536aaa698..932db40a6df 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -27,23 +27,22 @@ MainRThread& GetMainRThread() { return main_r_thread; } -arrow::Result MainRThread::RunTask(Task* task) { +arrow::Future MainRThread::RunTask(Task* task) { if (IsMainThread()) { // If we're on the main thread, run the task immediately try { - return task->run(); + return arrow::Future::MakeFinished(task->run()); } catch (cpp11::unwind_exception& e) { SetError(e.token); - return arrow::Status::UnknownError("R code execution error"); + return arrow::Future::MakeFinished(arrow::Status::UnknownError("R code execution error")); } } else if (executor_ != nullptr) { // If we are not on the main thread and have an Executor // use it to run the task on the main R thread. - auto fut = DeferNotOk(executor_->Submit([task]() { return task->run(); })); - return fut.result(); + return DeferNotOk(executor_->Submit([task]() { return task->run(); })); } else { - return arrow::Status::NotImplemented( - "Call to R from a non-R thread without calling RunWithCapturedR"); + return arrow::Future::MakeFinished(arrow::Status::NotImplemented( + "Call to R from a non-R thread without calling RunWithCapturedR")); } } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 6c87535cbe0..f85690b3bb1 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -59,7 +59,7 @@ class MainRThread { }; // Run `task` if it is safe to do so or return an error otherwise. - arrow::Result RunTask(Task* task); + arrow::Future RunTask(Task* task); // The Executor that is running on the main R thread, if it exists arrow::internal::Executor*& Executor() { return executor_; } @@ -113,7 +113,8 @@ arrow::Result SafeCallIntoR(std::function fun) { }; TypedTask task(std::move(fun)); - ARROW_RETURN_NOT_OK(GetMainRThread().RunTask(&task)); + arrow::Future task_result = GetMainRThread().RunTask(&task); + ARROW_RETURN_NOT_OK(task_result.result()); return task.result; } From 50fd564cb255908d5adf18cdbb55133f8416baa4 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 15:06:21 -0300 Subject: [PATCH 22/28] Implement SafeCallIntoRAsync() and make SafeCallIntoR() use it --- r/src/safe-call-into-r-impl.cpp | 19 ------------ r/src/safe-call-into-r.h | 53 ++++++++++++++------------------- 2 files changed, 23 insertions(+), 49 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 932db40a6df..248c055a401 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -27,25 +27,6 @@ MainRThread& GetMainRThread() { return main_r_thread; } -arrow::Future MainRThread::RunTask(Task* task) { - if (IsMainThread()) { - // If we're on the main thread, run the task immediately - try { - return arrow::Future::MakeFinished(task->run()); - } catch (cpp11::unwind_exception& e) { - SetError(e.token); - return arrow::Future::MakeFinished(arrow::Status::UnknownError("R code execution error")); - } - } else if (executor_ != nullptr) { - // If we are not on the main thread and have an Executor - // use it to run the task on the main R thread. - return DeferNotOk(executor_->Submit([task]() { return task->run(); })); - } else { - return arrow::Future::MakeFinished(arrow::Status::NotImplemented( - "Call to R from a non-R thread without calling RunWithCapturedR")); - } -} - // [[arrow::export]] void InitializeMainRThread() { GetMainRThread().Initialize(); } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index f85690b3bb1..7869fb18595 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -49,18 +49,6 @@ class MainRThread { // Check if the current thread is the main R thread bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; } - // Class whose run() method will be called from the main R thread - // but whose results may be accessed (as class fields) from - // potentially another thread. - class Task { - public: - virtual ~Task() {} - virtual arrow::Result run() = 0; - }; - - // Run `task` if it is safe to do so or return an error otherwise. - arrow::Future RunTask(Task* task); - // The Executor that is running on the main R thread, if it exists arrow::internal::Executor*& Executor() { return executor_; } @@ -96,26 +84,31 @@ MainRThread& GetMainRThread(); // a SEXP (use cpp11::as_cpp to convert it to a C++ type inside // `fun`). template -arrow::Result SafeCallIntoR(std::function fun) { - class TypedTask : public MainRThread::Task { - public: - explicit TypedTask(std::function fun) : fun_(fun) {} - - arrow::Result run() { - result = fun_(); - return this; +arrow::Future SafeCallIntoRAsync(std::function fun) { + MainRThread& main_r_thread = GetMainRThread(); + if (main_r_thread.IsMainThread()) { + // If we're on the main thread, run the task immediately + try { + return arrow::Future::MakeFinished(fun()); + } catch (cpp11::unwind_exception& e) { + main_r_thread.SetError(e.token); + return arrow::Future::MakeFinished( + arrow::Status::UnknownError("R code execution error")); } + } else if (main_r_thread.Executor() != nullptr) { + // If we are not on the main thread and have an Executor + // use it to run the task on the main R thread. + return DeferNotOk(main_r_thread.Executor()->Submit(fun)); + } else { + return arrow::Future::MakeFinished(arrow::Status::NotImplemented( + "Call to R from a non-R thread without calling RunWithCapturedR")); + } +} - T result; - - private: - std::function fun_; - }; - - TypedTask task(std::move(fun)); - arrow::Future task_result = GetMainRThread().RunTask(&task); - ARROW_RETURN_NOT_OK(task_result.result()); - return task.result; +template +arrow::Result SafeCallIntoR(std::function fun) { + arrow::Future result = SafeCallIntoRAsync(std::move(fun)); + return result.result(); } template From 8202e22e873c6cb1fe83da6b536132891787d1c5 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 15:48:09 -0300 Subject: [PATCH 23/28] handle errors that occur during event loop execution --- r/src/safe-call-into-r-impl.cpp | 8 ---- r/src/safe-call-into-r.h | 54 +++++++++++++++++++----- r/tests/testthat/test-safe-call-into-r.R | 8 ++-- 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 248c055a401..783f9041ccb 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -53,9 +53,6 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, thread_ptr->join(); delete thread_ptr; - // Stop for any R execution errors that may have occurred - GetMainRThread().ClearError(); - return arrow::ValueOrStop(result); } else if (opt == "async_without_executor") { std::thread* thread_ptr; @@ -75,10 +72,6 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, thread_ptr->join(); delete thread_ptr; - // We didn't evaluate any R code because it wasn't safe, but - // if we did and there was an error, we need to stop() here - GetMainRThread().ClearError(); - // We should be able to get this far, but fut will contain an error // because it tried to evaluate R code from another thread return arrow::ValueOrStop(fut.result()); @@ -86,7 +79,6 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, } else if (opt == "on_main_thread") { auto result = SafeCallIntoR( [&]() { return cpp11::as_cpp(r_fun_that_returns_a_string()); }); - GetMainRThread().ClearError(); arrow::StopIfNotOk(result.status()); return result.ValueUnsafe(); } else { diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 7869fb18595..530239afa36 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -58,6 +58,8 @@ class MainRThread { // to finish). void SetError(cpp11::sexp token) { error_token_ = token; } + void ResetError() { error_token_ = R_NilValue; } + // Check if there is a saved error bool HasError() { return error_token_ != R_NilValue; } @@ -70,6 +72,17 @@ class MainRThread { } } + class UnwindStatusDetail : public arrow::StatusDetail { + public: + UnwindStatusDetail(SEXP token) : token_(token) {} + const char* type_id() const { return "MainRThread::UnwindStatusDetail"; }; + std::string ToString() const { return type_id(); } + SEXP token() { return token_; } + + private: + SEXP token_; + }; + private: bool initialized_; std::thread::id thread_id_; @@ -87,18 +100,34 @@ template arrow::Future SafeCallIntoRAsync(std::function fun) { MainRThread& main_r_thread = GetMainRThread(); if (main_r_thread.IsMainThread()) { - // If we're on the main thread, run the task immediately - try { - return arrow::Future::MakeFinished(fun()); - } catch (cpp11::unwind_exception& e) { - main_r_thread.SetError(e.token); - return arrow::Future::MakeFinished( - arrow::Status::UnknownError("R code execution error")); - } + // If we're on the main thread, run the task immediately and let + // the cpp11::unwind_exception be thrown since it will be caught + // at the top level. + return arrow::Future::MakeFinished(fun()); + // try { + + // } catch (cpp11::unwind_exception& e) { + // auto error_status = arrow::Status::ExecutionError("R code execution error"); + // auto error_detail = std::make_shared(e.token); + // return arrow::Future::MakeFinished(error_status.WithDetail(error_detail)); + // } } else if (main_r_thread.Executor() != nullptr) { - // If we are not on the main thread and have an Executor - // use it to run the task on the main R thread. - return DeferNotOk(main_r_thread.Executor()->Submit(fun)); + // If we are not on the main thread and have an Executor, + // use it to run the task on the main R thread. We can't throw + // a cpp11::unwind_exception here, so we need to propagate it back + // to RunWithCapturedR through the MainRThread singleton. + return DeferNotOk(main_r_thread.Executor()->Submit([fun]() { + if (GetMainRThread().HasError()) { + return arrow::Result(arrow::Status::UnknownError("R code execution error")); + } + + try { + return arrow::Result(fun()); + } catch (cpp11::unwind_exception& e) { + GetMainRThread().SetError(e.token); + return arrow::Result(arrow::Status::UnknownError("R code execution error")); + } + })); } else { return arrow::Future::MakeFinished(arrow::Status::NotImplemented( "Call to R from a non-R thread without calling RunWithCapturedR")); @@ -117,6 +146,8 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()"); } + GetMainRThread().ResetError(); + arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( [make_arrow_call](arrow::internal::Executor* executor) { GetMainRThread().Executor() = executor; @@ -125,6 +156,7 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c }); GetMainRThread().Executor() = nullptr; + GetMainRThread().ClearError(); return result; } diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index 6a1689f39bd..c47bf040968 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -35,10 +35,10 @@ test_that("SafeCallIntoR works within RunWithCapturedR", { # This runs with the expected error, but causes subsequent segfaults, probably related # to the error_token_ (maybe having to do with the copy-constructor?) - # expect_error( - # TestSafeCallIntoR(function() stop("an error!"), opt = "async_with_executor"), - # "an error!" - # ) + expect_error( + TestSafeCallIntoR(function() stop("an error!"), opt = "async_with_executor"), + "an error!" + ) }) From 20867ddd8e1a971a9278c8af853ffe53f352d9aa Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 15:50:36 -0300 Subject: [PATCH 24/28] remove unused class, use ResetError() --- r/src/safe-call-into-r.h | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 530239afa36..c2503799b7e 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -67,22 +67,11 @@ class MainRThread { void ClearError() { if (HasError()) { cpp11::unwind_exception e(error_token_); - SetError(R_NilValue); + ResetError(); throw e; } } - class UnwindStatusDetail : public arrow::StatusDetail { - public: - UnwindStatusDetail(SEXP token) : token_(token) {} - const char* type_id() const { return "MainRThread::UnwindStatusDetail"; }; - std::string ToString() const { return type_id(); } - SEXP token() { return token_; } - - private: - SEXP token_; - }; - private: bool initialized_; std::thread::id thread_id_; From 48845fe362553f390ec6d488b20380cc06acfc79 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 29 Mar 2022 16:31:39 -0300 Subject: [PATCH 25/28] remove comment that is no longer relevant --- r/tests/testthat/test-safe-call-into-r.R | 2 -- 1 file changed, 2 deletions(-) diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index c47bf040968..420aaa0b9e7 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -33,8 +33,6 @@ test_that("SafeCallIntoR works within RunWithCapturedR", { "string one!" ) - # This runs with the expected error, but causes subsequent segfaults, probably related - # to the error_token_ (maybe having to do with the copy-constructor?) expect_error( TestSafeCallIntoR(function() stop("an error!"), opt = "async_with_executor"), "an error!" From 6a9915aad9e3e104ae61ef8b62c862632b501878 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 6 Apr 2022 09:31:00 -0300 Subject: [PATCH 26/28] reviews --- r/src/safe-call-into-r-impl.cpp | 2 +- r/src/safe-call-into-r.h | 20 ++++++-------------- r/tests/testthat/test-safe-call-into-r.R | 1 - 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 783f9041ccb..aa0645aa7b4 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -82,7 +82,7 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, arrow::StopIfNotOk(result.status()); return result.ValueUnsafe(); } else { - return ""; + cpp11::stop("Unknown `opt`"); } } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index c2503799b7e..1a27507b788 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -92,14 +92,7 @@ arrow::Future SafeCallIntoRAsync(std::function fun) { // If we're on the main thread, run the task immediately and let // the cpp11::unwind_exception be thrown since it will be caught // at the top level. - return arrow::Future::MakeFinished(fun()); - // try { - - // } catch (cpp11::unwind_exception& e) { - // auto error_status = arrow::Status::ExecutionError("R code execution error"); - // auto error_detail = std::make_shared(e.token); - // return arrow::Future::MakeFinished(error_status.WithDetail(error_detail)); - // } + return fun(); } else if (main_r_thread.Executor() != nullptr) { // If we are not on the main thread and have an Executor, // use it to run the task on the main R thread. We can't throw @@ -118,15 +111,15 @@ arrow::Future SafeCallIntoRAsync(std::function fun) { } })); } else { - return arrow::Future::MakeFinished(arrow::Status::NotImplemented( - "Call to R from a non-R thread without calling RunWithCapturedR")); + return arrow::Status::NotImplemented( + "Call to R from a non-R thread without calling RunWithCapturedR"); } } template arrow::Result SafeCallIntoR(std::function fun) { - arrow::Future result = SafeCallIntoRAsync(std::move(fun)); - return result.result(); + arrow::Future future = SafeCallIntoRAsync(std::move(fun)); + return future.result(); } template @@ -140,8 +133,7 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( [make_arrow_call](arrow::internal::Executor* executor) { GetMainRThread().Executor() = executor; - arrow::Future result = make_arrow_call(); - return result; + return make_arrow_call(); }); GetMainRThread().Executor() = nullptr; diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index 420aaa0b9e7..19ce0e7ed9f 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -39,7 +39,6 @@ test_that("SafeCallIntoR works within RunWithCapturedR", { ) }) - test_that("SafeCallIntoR errors from the non-R thread", { expect_error( TestSafeCallIntoR(function() "string one!", opt = "async_without_executor"), From c795d20eda6e9e79bf09f4ddb0e47c331ebf9178 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 7 Apr 2022 09:10:39 -0300 Subject: [PATCH 27/28] make a note of where TestSafeCallIntoR is defined --- r/tests/testthat/test-safe-call-into-r.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index 19ce0e7ed9f..fe3c127c919 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +# Note that TestSafeCallIntoR is defined in safe-call-into-r-impl.cpp + test_that("SafeCallIntoR works from the main R thread", { expect_identical( TestSafeCallIntoR(function() "string one!", opt = "on_main_thread"), From c28d2b31f060c43f0b03563bcbb2696a34db9177 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 7 Apr 2022 10:58:45 -0300 Subject: [PATCH 28/28] skip custom C++ checks on CRAN --- r/tests/testthat/test-safe-call-into-r.R | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index fe3c127c919..e9438de58be 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -18,6 +18,8 @@ # Note that TestSafeCallIntoR is defined in safe-call-into-r-impl.cpp test_that("SafeCallIntoR works from the main R thread", { + skip_on_cran() + expect_identical( TestSafeCallIntoR(function() "string one!", opt = "on_main_thread"), "string one!" @@ -30,6 +32,8 @@ test_that("SafeCallIntoR works from the main R thread", { }) test_that("SafeCallIntoR works within RunWithCapturedR", { + skip_on_cran() + expect_identical( TestSafeCallIntoR(function() "string one!", opt = "async_with_executor"), "string one!" @@ -42,6 +46,8 @@ test_that("SafeCallIntoR works within RunWithCapturedR", { }) test_that("SafeCallIntoR errors from the non-R thread", { + skip_on_cran() + expect_error( TestSafeCallIntoR(function() "string one!", opt = "async_without_executor"), "Call to R from a non-R thread"