Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d08b946
compiling sketch
paleolimbot Mar 3, 2022
5671a13
remove event loop indirection
paleolimbot Mar 4, 2022
e3dbdd3
license
paleolimbot Mar 4, 2022
7941d8d
redo safe call into r
paleolimbot Mar 25, 2022
a9a6c3c
cpp lint
paleolimbot Mar 25, 2022
d17dcad
don't evaluate C++ when arrow not available
paleolimbot Mar 25, 2022
07be4fb
with theoretical support for an executor
paleolimbot Mar 25, 2022
b1d360f
shuffle tests to make room for the async with executor bit
paleolimbot Mar 25, 2022
21518f0
even more test simplifying
paleolimbot Mar 25, 2022
ed3ea45
insert test with event loop
paleolimbot Mar 25, 2022
7c03527
clang-format
paleolimbot Mar 25, 2022
9750f72
maybe fix cpp linter, add tests
paleolimbot Mar 25, 2022
f361ad5
maybe fix arrow-without-arrow
paleolimbot Mar 25, 2022
431d7b0
change location of main_r_thread
paleolimbot Mar 29, 2022
b373fe6
fix comments and error messages to align with behaviour
paleolimbot Mar 29, 2022
6455c98
don't copy `fun`
paleolimbot Mar 29, 2022
65801f3
RunWithCaptureR(task) -> RunWithCapturedR(make_arrow_call)
paleolimbot Mar 29, 2022
be22a4d
simplify Future/Status usage
paleolimbot Mar 29, 2022
9665226
fix one more lying comment
paleolimbot Mar 29, 2022
c1f62b2
RunTask returns a Result<>
paleolimbot Mar 29, 2022
9cabbdc
MainRThread::RunTask() returns a Future
paleolimbot Mar 29, 2022
50fd564
Implement SafeCallIntoRAsync() and make SafeCallIntoR() use it
paleolimbot Mar 29, 2022
8202e22
handle errors that occur during event loop execution
paleolimbot Mar 29, 2022
20867dd
remove unused class, use ResetError()
paleolimbot Mar 29, 2022
48845fe
remove comment that is no longer relevant
paleolimbot Mar 29, 2022
6a9915a
reviews
paleolimbot Apr 6, 2022
c795d20
make a note of where TestSafeCallIntoR is defined
paleolimbot Apr 7, 2022
c28d2b3
skip custom C++ checks on CRAN
paleolimbot Apr 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@

#' @importFrom vctrs s3_register vec_size vec_cast vec_unique
.onLoad <- function(...) {
if (arrow_available()) {
# Make sure C++ knows on which thread it is safe to call the R API
InitializeMainRThread()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know for a fact that the R thread never changes? For example, in JS, there is always "one thread" but the actual thread id can change from iteration to iteration of the event loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked in the r-lib slack channel and nobody seems to feel that this will be a problem. They did advise to check parallel::mclapply() since this creates a fork of the process, but a check seems to indicate that the value of std::this_thread::get_id() seems to be stable if somebody does happen to do that:

cpp11::cpp_source(code = '
#include "cpp11.hpp"
#include <thread>
#include <sstream>

[[cpp11::register]]
std::string thread_id() {
  std::thread::id id = std::this_thread::get_id();
  std::stringstream ss;
  ss << id;
  return ss.str();
}
')

thread_id()
#> [1] "0x100e33d40"
unique(lapply(1:1e3, function(x) thread_id()))
#> [[1]]
#> [1] "0x100e33d40"
unique(parallel::mclapply(1:1e3, function(x) thread_id(), mc.cores = 8))
#> [[1]]
#> [1] "0x100e33d40"

}

dplyr_methods <- paste0(
"dplyr::",
c(
Expand Down
8 changes: 8 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 33 additions & 28 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions r/src/safe-call-into-r-impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "./arrow_types.h"
#if defined(ARROW_R_WITH_ARROW)

#include <functional>
#include <thread>
#include "./safe-call-into-r.h"

MainRThread& GetMainRThread() {
static MainRThread main_r_thread;
return main_r_thread;
}

// [[arrow::export]]
void InitializeMainRThread() { GetMainRThread().Initialize(); }

// [[arrow::export]]
std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
Comment on lines +33 to +34
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a precedent for this in the Arrow R package (a place to test C++ code from C++ that is hard to test from R). We probably don't want something like this running on CRAN, but I'm not sure what the best way is to fence this off / keep it from compiling anywhere except CI?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't dug in too much too the code yet, but is this resolved with new commits, or do we still need to find a way to gate this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neal took a quick look and said it it's fine as long as there's a note as to where TestSafeCallIntoR is defined (there's some Altrep tests that do this, too)

std::string opt) {
if (opt == "async_with_executor") {
std::thread* thread_ptr;

auto result =
RunWithCapturedR<std::string>([&thread_ptr, r_fun_that_returns_a_string]() {
auto fut = arrow::Future<std::string>::Make();
thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
auto result = SafeCallIntoR<std::string>([&] {
return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
});

fut.MarkFinished(result);
});

return fut;
});

thread_ptr->join();
delete thread_ptr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is probably fine but you could wrap thread_ptr in a unique_ptr. For example:

thread_ptr = std::unique_ptr<std::thread>(new std::thread(...));

It gets rid of the delete call and guards you against very unlikely things like ->join() throwing an exception and the memory never getting cleaned up (not that such a thing would really matter in test code).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't get this to work without a crash!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Odd. If you want to create a commit (doesn't have to be part of any PR) then I'd be happy to take a look and see what was going on. Otherwise, like I said, it isn't very important, so let's not worry too much about it.


return arrow::ValueOrStop(result);
} else if (opt == "async_without_executor") {
std::thread* thread_ptr;

auto fut = arrow::Future<std::string>::Make();
thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
auto result = SafeCallIntoR<std::string>(
[&] { return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string()); });

if (result.ok()) {
fut.MarkFinished(result.ValueUnsafe());
} else {
fut.MarkFinished(result.status());
}
});

thread_ptr->join();
delete thread_ptr;

// We should be able to get this far, but fut will contain an error
// because it tried to evaluate R code from another thread
return arrow::ValueOrStop(fut.result());

} else if (opt == "on_main_thread") {
auto result = SafeCallIntoR<std::string>(
[&]() { return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string()); });
arrow::StopIfNotOk(result.status());
return result.ValueUnsafe();
} else {
cpp11::stop("Unknown `opt`");
}
}

#endif
145 changes: 145 additions & 0 deletions r/src/safe-call-into-r.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef SAFE_CALL_INTO_R_INCLUDED
#define SAFE_CALL_INTO_R_INCLUDED

#include "./arrow_types.h"

#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>

#include <functional>
#include <thread>

// The MainRThread class keeps track of the thread on which it is safe
// to call the R API to facilitate its safe use (or erroring
// if it is not safe). The MainRThread singleton can be accessed from
// any thread using GetMainRThread(); the preferred way to call
// the R API where it may not be safe to do so is to use
// SafeCallIntoR<cpp_type>([&]() { ... }).
class MainRThread {
public:
MainRThread() : initialized_(false), executor_(nullptr) {}

// Call this method from the R thread (e.g., on package load)
// to save an internal copy of the thread id.
void Initialize() {
thread_id_ = std::this_thread::get_id();
initialized_ = true;
SetError(R_NilValue);
}

bool IsInitialized() { return initialized_; }

// Check if the current thread is the main R thread
bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }

// The Executor that is running on the main R thread, if it exists
arrow::internal::Executor*& Executor() { return executor_; }

// Save an error token generated from a cpp11::unwind_exception
// so that it can be properly handled after some cleanup code
// has run (e.g., cancelling some futures or waiting for them
// to finish).
void SetError(cpp11::sexp token) { error_token_ = token; }

void ResetError() { error_token_ = R_NilValue; }

// Check if there is a saved error
bool HasError() { return error_token_ != R_NilValue; }

// Throw a cpp11::unwind_exception() with the saved token if it exists
void ClearError() {
if (HasError()) {
cpp11::unwind_exception e(error_token_);
ResetError();
throw e;
}
}

private:
bool initialized_;
std::thread::id thread_id_;
cpp11::sexp error_token_;
arrow::internal::Executor* executor_;
};

// Retrieve the MainRThread singleton
MainRThread& GetMainRThread();

// Call into R and return a C++ object. Note that you can't return
// a SEXP (use cpp11::as_cpp<T> to convert it to a C++ type inside
// `fun`).
template <typename T>
arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
MainRThread& main_r_thread = GetMainRThread();
if (main_r_thread.IsMainThread()) {
// If we're on the main thread, run the task immediately and let
// the cpp11::unwind_exception be thrown since it will be caught
// at the top level.
return fun();
} else if (main_r_thread.Executor() != nullptr) {
// If we are not on the main thread and have an Executor,
// use it to run the task on the main R thread. We can't throw
// a cpp11::unwind_exception here, so we need to propagate it back
// to RunWithCapturedR through the MainRThread singleton.
return DeferNotOk(main_r_thread.Executor()->Submit([fun]() {
if (GetMainRThread().HasError()) {
return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
}

try {
return arrow::Result<T>(fun());
} catch (cpp11::unwind_exception& e) {
GetMainRThread().SetError(e.token);
return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
}
}));
} else {
return arrow::Status::NotImplemented(
"Call to R from a non-R thread without calling RunWithCapturedR");
}
}

template <typename T>
arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
arrow::Future<T> future = SafeCallIntoRAsync<T>(std::move(fun));
return future.result();
}

template <typename T>
arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_call) {
if (GetMainRThread().Executor() != nullptr) {
return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()");
}

GetMainRThread().ResetError();

arrow::Result<T> result = arrow::internal::SerialExecutor::RunInSerialExecutor<T>(
[make_arrow_call](arrow::internal::Executor* executor) {
GetMainRThread().Executor() = executor;
return make_arrow_call();
});

GetMainRThread().Executor() = nullptr;
GetMainRThread().ClearError();

return result;
}

#endif
Loading