Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a37a311
resolve merge conflicts
paleolimbot Mar 25, 2022
2308545
start on using safecallintor
paleolimbot Apr 8, 2022
c47564f
actually use safecallintor
paleolimbot Apr 8, 2022
94d40a9
uncomment tests that should pass
paleolimbot Apr 11, 2022
4e4dd72
use RunWithCapturedR for some specific functions
paleolimbot Apr 11, 2022
37f9376
clarify comment
paleolimbot Apr 12, 2022
395ee50
use the io thread pool for RunWithCapturedR in the feather reader
paleolimbot Apr 12, 2022
f26a449
use TableReader->ReadAsync() for csv reader
paleolimbot Apr 12, 2022
4c1dee1
more realistic test for connection objects with readers
paleolimbot Apr 12, 2022
7e1593f
clang-format
paleolimbot Apr 12, 2022
7e745cc
Update r/src/io.cpp
paleolimbot Apr 12, 2022
db9cecd
implement and use SafeCallIntoRVoid()
paleolimbot Apr 12, 2022
1751eaa
improve SafeCallIntoR usage in io.cpp
paleolimbot Apr 12, 2022
d4e24b1
also use bigger table for ipc stream test
paleolimbot Apr 12, 2022
29828d1
complete thought on comment
paleolimbot Apr 12, 2022
4f136b2
try the other async trick for the csv reader to see if it works on wi…
paleolimbot Apr 14, 2022
220b79c
don't use RunWithCapturedR for feather reading on old windows
paleolimbot Apr 14, 2022
4acdbf4
clang-format
paleolimbot Apr 14, 2022
76ae1ab
try to avoid segfault on R 3.4
paleolimbot Apr 21, 2022
eac17c5
skip one more SafeCallIntoR test on R 3.4
paleolimbot Apr 22, 2022
13ddd20
maybe pass CMD check on R 3.4
paleolimbot Apr 22, 2022
82e6b6d
fix csv reading on R 3.4
paleolimbot Apr 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions r/R/feather.R
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ FeatherReader <- R6Class("FeatherReader",
inherit = ArrowObject,
public = list(
Read = function(columns) {
ipc___feather___Reader__Read(self, columns)
ipc___feather___Reader__Read(self, columns, on_old_windows())
},
print = function(...) {
cat("FeatherReader:\n")
Expand All @@ -215,5 +215,5 @@ names.FeatherReader <- function(x) x$column_names

FeatherReader$create <- function(file) {
assert_is(file, "RandomAccessFile")
ipc___feather___Reader__Open(file)
ipc___feather___Reader__Open(file, on_old_windows())
}
43 changes: 37 additions & 6 deletions r/R/io.R
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ BufferReader$create <- function(x) {
io___BufferReader__initialize(x)
}


#' Create a new read/write memory mapped file of a given size
#'
#' @param path file path
Expand Down Expand Up @@ -244,32 +245,59 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
}
if (is.string(file)) {
if (is_url(file)) {
fs_and_path <- FileSystem$from_uri(file)
filesystem <- fs_and_path$fs
file <- fs_and_path$path
file <- tryCatch({
fs_and_path <- FileSystem$from_uri(file)
filesystem <- fs_and_path$fs
fs_and_path$path
}, error = function(e) {
MakeRConnectionInputStream(url(file, open = "rb"))
})
}

if (is.null(compression)) {
# Infer compression from the file path
compression <- detect_compression(file)
}

if (!is.null(filesystem)) {
file <- filesystem$OpenInputFile(file)
} else if (isTRUE(mmap)) {
} else if (is.string(file) && isTRUE(mmap)) {
file <- mmap_open(file)
} else {
} else if (is.string(file)) {
file <- ReadableFile$create(file)
}

if (!identical(compression, "uncompressed")) {
file <- CompressedInputStream$create(file, compression)
}
} else if (inherits(file, c("raw", "Buffer"))) {
file <- BufferReader$create(file)
} else if (inherits(file, "connection")) {
if (!isOpen(file)) {
open(file, "rb")
}

# Try to create a RandomAccessFile first because some readers need this
# (e.g., feather, parquet) but fall back on an InputStream for the readers
# that don't (e.g., IPC, CSV)
file <- tryCatch(
MakeRConnectionRandomAccessFile(file),
error = function(e) MakeRConnectionInputStream(file)
)
}
assert_is(file, "InputStream")
file
}

make_output_stream <- function(x, filesystem = NULL) {
if (inherits(x, "connection")) {
if (!isOpen(x)) {
open(x, "wb")
}

return(MakeRConnectionOutputStream(x))
}

if (inherits(x, "SubTreeFileSystem")) {
filesystem <- x$base_fs
x <- x$base_path
Expand All @@ -287,7 +315,10 @@ make_output_stream <- function(x, filesystem = NULL) {
}

detect_compression <- function(path) {
assert_that(is.string(path))
if (!is.string(path)) {
return("uncompressed")
}

switch(tools::file_ext(path),
bz2 = "bz2",
gz = "gzip",
Expand Down
3 changes: 2 additions & 1 deletion r/R/parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read_parquet <- function(file,
as_data_frame = TRUE,
props = ParquetArrowReaderProperties$create(),
...) {
if (is.string(file)) {
if (!inherits(file, "RandomAccessFile")) {
file <- make_readable_file(file)
on.exit(file$close())
}
Expand Down Expand Up @@ -541,6 +541,7 @@ ParquetFileReader$create <- function(file,
...) {
file <- make_readable_file(file, mmap)
assert_is(props, "ParquetArrowReaderProperties")
assert_is(file, "RandomAccessFile")

parquet___arrow___FileReader__OpenFile(file, props)
}
Expand Down
47 changes: 38 additions & 9 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions r/src/csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#if defined(ARROW_R_WITH_ARROW)

#include "./safe-call-into-r.h"

#include <arrow/csv/reader.h>
#include <arrow/csv/writer.h>
#include <arrow/memory_pool.h>
Expand Down Expand Up @@ -162,7 +164,16 @@ std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
// [[arrow::export]]
/// Read the whole CSV input held by `table_reader` into an arrow::Table.
///
/// \param table_reader an already-constructed CSV table reader
/// \return the table produced by `table_reader->Read()`; a failed read is
///         passed to ValueOrStop() rather than returned to the caller
std::shared_ptr<arrow::Table> csv___TableReader__Read(
    const std::shared_ptr<arrow::csv::TableReader>& table_reader) {
#if defined(HAS_SAFE_CALL_INTO_R)
  // Run the blocking Read() on the executor of the default IO context and hand
  // the resulting future to RunWithCapturedR.
  // NOTE(review): presumably this keeps the calling (R) thread free to service
  // calls back into R (e.g. connection-backed input streams) while the read is
  // in flight -- confirm against safe-call-into-r.h.
  auto* io_executor = arrow::io::default_io_context().executor();

  // Capturing by reference assumes RunWithCapturedR does not return until the
  // submitted task has completed -- TODO confirm.
  auto read_task = [&]() { return table_reader->Read(); };
  auto submit_read = [&]() { return DeferNotOk(io_executor->Submit(read_task)); };

  return ValueOrStop(RunWithCapturedR<std::shared_ptr<arrow::Table>>(submit_read));
#else
  // Without SafeCallIntoR support, read synchronously on the calling thread.
  return ValueOrStop(table_reader->Read());
#endif
}

// [[arrow::export]]
Expand Down
74 changes: 54 additions & 20 deletions r/src/feather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "./arrow_types.h"

#if defined(ARROW_R_WITH_ARROW)

#include "./safe-call-into-r.h"

#include <arrow/ipc/feather.h>
#include <arrow/type.h>

Expand Down Expand Up @@ -48,34 +51,65 @@ int ipc___feather___Reader__version(

// [[arrow::export]]
std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
std::shared_ptr<arrow::Table> table;

switch (TYPEOF(columns)) {
case STRSXP: {
R_xlen_t n = XLENGTH(columns);
std::vector<std::string> names(n);
for (R_xlen_t i = 0; i < n; i++) {
names[i] = CHAR(STRING_ELT(columns, i));
}
StopIfNotOk(reader->Read(names, &table));
break;
const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp columns,
bool on_old_windows) {
bool use_names = columns != R_NilValue;
std::vector<std::string> names;
if (use_names) {
cpp11::strings columns_chr(columns);
names.reserve(columns_chr.size());
for (const auto& name : columns_chr) {
names.push_back(name);
}
case NILSXP:
StopIfNotOk(reader->Read(&table));
break;
default:
cpp11::stop("incompatible column specification");
break;
}

return table;
auto read_table = [&]() {
std::shared_ptr<arrow::Table> table;
arrow::Status read_result;
if (use_names) {
read_result = reader->Read(names, &table);
} else {
read_result = reader->Read(&table);
}

if (read_result.ok()) {
return arrow::Result<std::shared_ptr<arrow::Table>>(table);
} else {
return arrow::Result<std::shared_ptr<arrow::Table>>(read_result);
}
};

#if !defined(HAS_SAFE_CALL_INTO_R)
return ValueOrStop(read_table());
#else
if (!on_old_windows) {
const auto& io_context = arrow::io::default_io_context();
auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>(
[&]() { return DeferNotOk(io_context.executor()->Submit(read_table)); });
return ValueOrStop(result);
} else {
return ValueOrStop(read_table());
}
#endif
}

// [[arrow::export]]
std::shared_ptr<arrow::ipc::feather::Reader> ipc___feather___Reader__Open(
const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
const std::shared_ptr<arrow::io::RandomAccessFile>& stream, bool on_old_windows) {
#if !defined(HAS_SAFE_CALL_INTO_R)
return ValueOrStop(arrow::ipc::feather::Reader::Open(stream));
#else
if (!on_old_windows) {
const auto& io_context = arrow::io::default_io_context();
auto result = RunWithCapturedR<std::shared_ptr<arrow::ipc::feather::Reader>>([&]() {
return DeferNotOk(io_context.executor()->Submit(
[&]() { return arrow::ipc::feather::Reader::Open(stream); }));
});
return ValueOrStop(result);
} else {
return ValueOrStop(arrow::ipc::feather::Reader::Open(stream));
}
#endif
}

// [[arrow::export]]
Expand Down
Loading