diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 6b969336c93..6bf9a75d0fe 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1112,12 +1112,12 @@ ipc___feather___Reader__version <- function(reader) { .Call(`_arrow_ipc___feather___Reader__version`, reader) } -ipc___feather___Reader__Read <- function(reader, columns) { - .Call(`_arrow_ipc___feather___Reader__Read`, reader, columns) +ipc___feather___Reader__Read <- function(reader, columns, on_old_windows) { + .Call(`_arrow_ipc___feather___Reader__Read`, reader, columns, on_old_windows) } -ipc___feather___Reader__Open <- function(stream) { - .Call(`_arrow_ipc___feather___Reader__Open`, stream) +ipc___feather___Reader__Open <- function(stream, on_old_windows) { + .Call(`_arrow_ipc___feather___Reader__Open`, stream, on_old_windows) } ipc___feather___Reader__schema <- function(reader) { @@ -1384,6 +1384,18 @@ io___BufferOutputStream__Write <- function(stream, bytes) { invisible(.Call(`_arrow_io___BufferOutputStream__Write`, stream, bytes)) } +MakeRConnectionInputStream <- function(con) { + .Call(`_arrow_MakeRConnectionInputStream`, con) +} + +MakeRConnectionOutputStream <- function(con) { + .Call(`_arrow_MakeRConnectionOutputStream`, con) +} + +MakeRConnectionRandomAccessFile <- function(con) { + .Call(`_arrow_MakeRConnectionRandomAccessFile`, con) +} + MakeReencodeInputStream <- function(wrapped, from) { .Call(`_arrow_MakeReencodeInputStream`, wrapped, from) } diff --git a/r/R/feather.R b/r/R/feather.R index 70a270bbe02..6065c285e8d 100644 --- a/r/R/feather.R +++ b/r/R/feather.R @@ -194,7 +194,7 @@ FeatherReader <- R6Class("FeatherReader", inherit = ArrowObject, public = list( Read = function(columns) { - ipc___feather___Reader__Read(self, columns) + ipc___feather___Reader__Read(self, columns, on_old_windows()) }, print = function(...) 
{ cat("FeatherReader:\n") @@ -215,5 +215,5 @@ names.FeatherReader <- function(x) x$column_names FeatherReader$create <- function(file) { assert_is(file, "RandomAccessFile") - ipc___feather___Reader__Open(file) + ipc___feather___Reader__Open(file, on_old_windows()) } diff --git a/r/R/io.R b/r/R/io.R index eafa24fc655..5f332ad8dda 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -200,6 +200,7 @@ BufferReader$create <- function(x) { io___BufferReader__initialize(x) } + #' Create a new read/write memory mapped file of a given size #' #' @param path file path @@ -244,32 +245,59 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem } if (is.string(file)) { if (is_url(file)) { - fs_and_path <- FileSystem$from_uri(file) - filesystem <- fs_and_path$fs - file <- fs_and_path$path + file <- tryCatch({ + fs_and_path <- FileSystem$from_uri(file) + filesystem <- fs_and_path$fs + fs_and_path$path + }, error = function(e) { + MakeRConnectionInputStream(url(file, open = "rb")) + }) } + if (is.null(compression)) { # Infer compression from the file path compression <- detect_compression(file) } + if (!is.null(filesystem)) { file <- filesystem$OpenInputFile(file) - } else if (isTRUE(mmap)) { + } else if (is.string(file) && isTRUE(mmap)) { file <- mmap_open(file) - } else { + } else if (is.string(file)) { file <- ReadableFile$create(file) } + if (!identical(compression, "uncompressed")) { file <- CompressedInputStream$create(file, compression) } } else if (inherits(file, c("raw", "Buffer"))) { file <- BufferReader$create(file) + } else if (inherits(file, "connection")) { + if (!isOpen(file)) { + open(file, "rb") + } + + # Try to create a RandomAccessFile first because some readers need this + # (e.g., feather, parquet) but fall back on an InputStream for the readers + # that don't (e.g., IPC, CSV) + file <- tryCatch( + MakeRConnectionRandomAccessFile(file), + error = function(e) MakeRConnectionInputStream(file) + ) } assert_is(file, "InputStream") file } 
make_output_stream <- function(x, filesystem = NULL) { + if (inherits(x, "connection")) { + if (!isOpen(x)) { + open(x, "wb") + } + + return(MakeRConnectionOutputStream(x)) + } + if (inherits(x, "SubTreeFileSystem")) { filesystem <- x$base_fs x <- x$base_path @@ -287,7 +315,10 @@ make_output_stream <- function(x, filesystem = NULL) { } detect_compression <- function(path) { - assert_that(is.string(path)) + if (!is.string(path)) { + return("uncompressed") + } + switch(tools::file_ext(path), bz2 = "bz2", gz = "gzip", diff --git a/r/R/parquet.R b/r/R/parquet.R index c6c00ed3a48..4d63791a4f5 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -38,7 +38,7 @@ read_parquet <- function(file, as_data_frame = TRUE, props = ParquetArrowReaderProperties$create(), ...) { - if (is.string(file)) { + if (!inherits(file, "RandomAccessFile")) { file <- make_readable_file(file) on.exit(file$close()) } @@ -541,6 +541,7 @@ ParquetFileReader$create <- function(file, ...) { file <- make_readable_file(file, mmap) assert_is(props, "ParquetArrowReaderProperties") + assert_is(file, "RandomAccessFile") parquet___arrow___FileReader__OpenFile(file, props) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index fb9f3b94d18..760b71a5be3 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -2814,20 +2814,22 @@ BEGIN_CPP11 END_CPP11 } // feather.cpp -std::shared_ptr ipc___feather___Reader__Read(const std::shared_ptr& reader, SEXP columns); -extern "C" SEXP _arrow_ipc___feather___Reader__Read(SEXP reader_sexp, SEXP columns_sexp){ +std::shared_ptr ipc___feather___Reader__Read(const std::shared_ptr& reader, cpp11::sexp columns, bool on_old_windows); +extern "C" SEXP _arrow_ipc___feather___Reader__Read(SEXP reader_sexp, SEXP columns_sexp, SEXP on_old_windows_sexp){ BEGIN_CPP11 arrow::r::Input&>::type reader(reader_sexp); - arrow::r::Input::type columns(columns_sexp); - return cpp11::as_sexp(ipc___feather___Reader__Read(reader, columns)); + arrow::r::Input::type 
columns(columns_sexp); + arrow::r::Input::type on_old_windows(on_old_windows_sexp); + return cpp11::as_sexp(ipc___feather___Reader__Read(reader, columns, on_old_windows)); END_CPP11 } // feather.cpp -std::shared_ptr ipc___feather___Reader__Open(const std::shared_ptr& stream); -extern "C" SEXP _arrow_ipc___feather___Reader__Open(SEXP stream_sexp){ +std::shared_ptr ipc___feather___Reader__Open(const std::shared_ptr& stream, bool on_old_windows); +extern "C" SEXP _arrow_ipc___feather___Reader__Open(SEXP stream_sexp, SEXP on_old_windows_sexp){ BEGIN_CPP11 arrow::r::Input&>::type stream(stream_sexp); - return cpp11::as_sexp(ipc___feather___Reader__Open(stream)); + arrow::r::Input::type on_old_windows(on_old_windows_sexp); + return cpp11::as_sexp(ipc___feather___Reader__Open(stream, on_old_windows)); END_CPP11 } // feather.cpp @@ -3442,6 +3444,30 @@ BEGIN_CPP11 END_CPP11 } // io.cpp +std::shared_ptr MakeRConnectionInputStream(cpp11::sexp con); +extern "C" SEXP _arrow_MakeRConnectionInputStream(SEXP con_sexp){ +BEGIN_CPP11 + arrow::r::Input::type con(con_sexp); + return cpp11::as_sexp(MakeRConnectionInputStream(con)); +END_CPP11 +} +// io.cpp +std::shared_ptr MakeRConnectionOutputStream(cpp11::sexp con); +extern "C" SEXP _arrow_MakeRConnectionOutputStream(SEXP con_sexp){ +BEGIN_CPP11 + arrow::r::Input::type con(con_sexp); + return cpp11::as_sexp(MakeRConnectionOutputStream(con)); +END_CPP11 +} +// io.cpp +std::shared_ptr MakeRConnectionRandomAccessFile(cpp11::sexp con); +extern "C" SEXP _arrow_MakeRConnectionRandomAccessFile(SEXP con_sexp){ +BEGIN_CPP11 + arrow::r::Input::type con(con_sexp); + return cpp11::as_sexp(MakeRConnectionRandomAccessFile(con)); +END_CPP11 +} +// io.cpp std::shared_ptr MakeReencodeInputStream(const std::shared_ptr& wrapped, std::string from); extern "C" SEXP _arrow_MakeReencodeInputStream(SEXP wrapped_sexp, SEXP from_sexp){ BEGIN_CPP11 @@ -5372,8 +5398,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_arrow__UnregisterRExtensionType", 
(DL_FUNC) &_arrow_arrow__UnregisterRExtensionType, 1}, { "_arrow_ipc___WriteFeather__Table", (DL_FUNC) &_arrow_ipc___WriteFeather__Table, 6}, { "_arrow_ipc___feather___Reader__version", (DL_FUNC) &_arrow_ipc___feather___Reader__version, 1}, - { "_arrow_ipc___feather___Reader__Read", (DL_FUNC) &_arrow_ipc___feather___Reader__Read, 2}, - { "_arrow_ipc___feather___Reader__Open", (DL_FUNC) &_arrow_ipc___feather___Reader__Open, 1}, + { "_arrow_ipc___feather___Reader__Read", (DL_FUNC) &_arrow_ipc___feather___Reader__Read, 3}, + { "_arrow_ipc___feather___Reader__Open", (DL_FUNC) &_arrow_ipc___feather___Reader__Open, 2}, { "_arrow_ipc___feather___Reader__schema", (DL_FUNC) &_arrow_ipc___feather___Reader__schema, 1}, { "_arrow_Field__initialize", (DL_FUNC) &_arrow_Field__initialize, 3}, { "_arrow_Field__ToString", (DL_FUNC) &_arrow_Field__ToString, 1}, @@ -5440,6 +5466,9 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_io___BufferOutputStream__Finish", (DL_FUNC) &_arrow_io___BufferOutputStream__Finish, 1}, { "_arrow_io___BufferOutputStream__Tell", (DL_FUNC) &_arrow_io___BufferOutputStream__Tell, 1}, { "_arrow_io___BufferOutputStream__Write", (DL_FUNC) &_arrow_io___BufferOutputStream__Write, 2}, + { "_arrow_MakeRConnectionInputStream", (DL_FUNC) &_arrow_MakeRConnectionInputStream, 1}, + { "_arrow_MakeRConnectionOutputStream", (DL_FUNC) &_arrow_MakeRConnectionOutputStream, 1}, + { "_arrow_MakeRConnectionRandomAccessFile", (DL_FUNC) &_arrow_MakeRConnectionRandomAccessFile, 1}, { "_arrow_MakeReencodeInputStream", (DL_FUNC) &_arrow_MakeReencodeInputStream, 2}, { "_arrow_json___ReadOptions__initialize", (DL_FUNC) &_arrow_json___ReadOptions__initialize, 2}, { "_arrow_json___ParseOptions__initialize1", (DL_FUNC) &_arrow_json___ParseOptions__initialize1, 1}, diff --git a/r/src/csv.cpp b/r/src/csv.cpp index bb901e798d7..2902462b50e 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -19,6 +19,8 @@ #if defined(ARROW_R_WITH_ARROW) +#include "./safe-call-into-r.h" + #include 
#include #include @@ -162,7 +164,16 @@ std::shared_ptr csv___TableReader__Make( // [[arrow::export]] std::shared_ptr csv___TableReader__Read( const std::shared_ptr& table_reader) { +#if !defined(HAS_SAFE_CALL_INTO_R) return ValueOrStop(table_reader->Read()); +#else + const auto& io_context = arrow::io::default_io_context(); + auto result = RunWithCapturedR>([&]() { + return DeferNotOk( + io_context.executor()->Submit([&]() { return table_reader->Read(); })); + }); + return ValueOrStop(result); +#endif } // [[arrow::export]] diff --git a/r/src/feather.cpp b/r/src/feather.cpp index 1df992baaa7..ab66c4bb32c 100644 --- a/r/src/feather.cpp +++ b/r/src/feather.cpp @@ -18,6 +18,9 @@ #include "./arrow_types.h" #if defined(ARROW_R_WITH_ARROW) + +#include "./safe-call-into-r.h" + #include #include @@ -48,34 +51,65 @@ int ipc___feather___Reader__version( // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Read( - const std::shared_ptr& reader, SEXP columns) { - std::shared_ptr table; - - switch (TYPEOF(columns)) { - case STRSXP: { - R_xlen_t n = XLENGTH(columns); - std::vector names(n); - for (R_xlen_t i = 0; i < n; i++) { - names[i] = CHAR(STRING_ELT(columns, i)); - } - StopIfNotOk(reader->Read(names, &table)); - break; + const std::shared_ptr& reader, cpp11::sexp columns, + bool on_old_windows) { + bool use_names = columns != R_NilValue; + std::vector names; + if (use_names) { + cpp11::strings columns_chr(columns); + names.reserve(columns_chr.size()); + for (const auto& name : columns_chr) { + names.push_back(name); } - case NILSXP: - StopIfNotOk(reader->Read(&table)); - break; - default: - cpp11::stop("incompatible column specification"); - break; } - return table; + auto read_table = [&]() { + std::shared_ptr table; + arrow::Status read_result; + if (use_names) { + read_result = reader->Read(names, &table); + } else { + read_result = reader->Read(&table); + } + + if (read_result.ok()) { + return arrow::Result>(table); + } else { + return 
arrow::Result>(read_result); + } + }; + +#if !defined(HAS_SAFE_CALL_INTO_R) + return ValueOrStop(read_table()); +#else + if (!on_old_windows) { + const auto& io_context = arrow::io::default_io_context(); + auto result = RunWithCapturedR>( + [&]() { return DeferNotOk(io_context.executor()->Submit(read_table)); }); + return ValueOrStop(result); + } else { + return ValueOrStop(read_table()); + } +#endif } // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Open( - const std::shared_ptr& stream) { + const std::shared_ptr& stream, bool on_old_windows) { +#if !defined(HAS_SAFE_CALL_INTO_R) return ValueOrStop(arrow::ipc::feather::Reader::Open(stream)); +#else + if (!on_old_windows) { + const auto& io_context = arrow::io::default_io_context(); + auto result = RunWithCapturedR>([&]() { + return DeferNotOk(io_context.executor()->Submit( + [&]() { return arrow::ipc::feather::Reader::Open(stream); })); + }); + return ValueOrStop(result); + } else { + return ValueOrStop(arrow::ipc::feather::Reader::Open(stream)); + } +#endif } // [[arrow::export]] diff --git a/r/src/io.cpp b/r/src/io.cpp index 7127cbe9ee3..36d8dfa5375 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -19,6 +19,8 @@ #if defined(ARROW_R_WITH_ARROW) +#include "./safe-call-into-r.h" + #include #include @@ -207,7 +209,199 @@ void io___BufferOutputStream__Write( StopIfNotOk(stream->Write(RAW(bytes), bytes.size())); } -// TransformInputStream::TransformFunc wrapper +// ------ RConnectionInputStream / RConnectionOutputStream + +class RConnectionFileInterface : public virtual arrow::io::FileInterface { + public: + explicit RConnectionFileInterface(cpp11::sexp connection_sexp) + : connection_sexp_(connection_sexp), closed_(false) { + check_closed(); + } + + arrow::Status Close() { + if (closed_) { + return arrow::Status::OK(); + } + + closed_ = true; + + return SafeCallIntoRVoid( + [&]() { cpp11::package("base")["close"](connection_sexp_); }); + } + + arrow::Result Tell() const { + if (closed()) { + return 
arrow::Status::IOError("R connection is closed"); + } + + return SafeCallIntoR([&]() { + cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_); + return cpp11::as_cpp(result); + }); + } + + bool closed() const { return closed_; } + + protected: + cpp11::sexp connection_sexp_; + + // Define the logic here because multiple inheritance makes it difficult + // for this base class, the InputStream and the RandomAccessFile + // interfaces to co-exist. + arrow::Result ReadBase(int64_t nbytes, void* out) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + return SafeCallIntoR([&] { + cpp11::function read_bin = cpp11::package("base")["readBin"]; + cpp11::writable::raws ptype((R_xlen_t)0); + cpp11::integers n = cpp11::as_sexp(nbytes); + + cpp11::sexp result = read_bin(connection_sexp_, ptype, n); + + int64_t result_size = cpp11::safe[Rf_xlength](result); + memcpy(out, cpp11::safe[RAW](result), result_size); + return result_size; + }); + } + + arrow::Result> ReadBase(int64_t nbytes) { + arrow::BufferBuilder builder; + RETURN_NOT_OK(builder.Reserve(nbytes)); + + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadBase(nbytes, builder.mutable_data())); + builder.UnsafeAdvance(bytes_read); + return builder.Finish(); + } + + arrow::Status WriteBase(const void* data, int64_t nbytes) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + return SafeCallIntoRVoid([&]() { + cpp11::writable::raws data_raw(nbytes); + memcpy(cpp11::safe[RAW](data_raw), data, nbytes); + + cpp11::function write_bin = cpp11::package("base")["writeBin"]; + write_bin(data_raw, connection_sexp_); + }); + } + + arrow::Status SeekBase(int64_t pos) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + return SafeCallIntoRVoid([&]() { + cpp11::package("base")["seek"](connection_sexp_, cpp11::as_sexp(pos)); + }); + } + + private: + bool closed_; + + bool check_closed() { + if (closed_) { + return true; + } 
+ + auto is_open_result = SafeCallIntoR([&]() { + cpp11::sexp result = cpp11::package("base")["isOpen"](connection_sexp_); + return cpp11::as_cpp(result); + }); + + if (!is_open_result.ok()) { + closed_ = true; + } else { + closed_ = !is_open_result.ValueUnsafe(); + } + + return closed_; + } +}; + +class RConnectionInputStream : public virtual arrow::io::InputStream, + public RConnectionFileInterface { + public: + explicit RConnectionInputStream(cpp11::sexp connection_sexp) + : RConnectionFileInterface(connection_sexp) {} + + arrow::Result Read(int64_t nbytes, void* out) { return ReadBase(nbytes, out); } + + arrow::Result> Read(int64_t nbytes) { + return ReadBase(nbytes); + } +}; + +class RConnectionRandomAccessFile : public arrow::io::RandomAccessFile, + public RConnectionFileInterface { + public: + explicit RConnectionRandomAccessFile(cpp11::sexp connection_sexp) + : RConnectionFileInterface(connection_sexp) { + // save the current position to seek back to it + auto current_pos = Tell(); + if (!current_pos.ok()) { + cpp11::stop("Tell() returned an error"); + } + int64_t initial_pos = current_pos.ValueUnsafe(); + + cpp11::package("base")["seek"](connection_sexp_, 0, "end"); + current_pos = Tell(); + if (!current_pos.ok()) { + cpp11::stop("Tell() returned an error"); + } + size_ = current_pos.ValueUnsafe(); + + auto status = Seek(initial_pos); + if (!status.ok()) { + cpp11::stop("Seek() returned an error"); + } + } + + arrow::Result GetSize() { return size_; } + + arrow::Status Seek(int64_t pos) { return SeekBase(pos); } + + arrow::Result Read(int64_t nbytes, void* out) { return ReadBase(nbytes, out); } + + arrow::Result> Read(int64_t nbytes) { + return ReadBase(nbytes); + } + + private: + int64_t size_; +}; + +class RConnectionOutputStream : public arrow::io::OutputStream, + public RConnectionFileInterface { + public: + explicit RConnectionOutputStream(cpp11::sexp connection_sexp) + : RConnectionFileInterface(connection_sexp) {} + + arrow::Status Write(const void* 
data, int64_t nbytes) { + return WriteBase(data, nbytes); + } +}; + +// [[arrow::export]] +std::shared_ptr MakeRConnectionInputStream(cpp11::sexp con) { + return std::make_shared(con); +} + +// [[arrow::export]] +std::shared_ptr MakeRConnectionOutputStream(cpp11::sexp con) { + return std::make_shared(con); +} + +// [[arrow::export]] +std::shared_ptr MakeRConnectionRandomAccessFile( + cpp11::sexp con) { + return std::make_shared(con); +} + +// ------ MakeReencodeInputStream() class RIconvWrapper { public: diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 1a27507b788..0555628d7d5 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -26,6 +26,13 @@ #include #include +// Unwind protection was added in R 3.5 and some calls here use it +// and crash R in older versions (ARROW-16201). We use this define +// to make sure we don't crash on R 3.4 and lower. +#if defined(HAS_UNWIND_PROTECT) +#define HAS_SAFE_CALL_INTO_R +#endif + // The MainRThread class keeps track of the thread on which it is safe // to call the R API to facilitate its safe use (or erroring // if it is not safe). The MainRThread singleton can be accessed from @@ -86,7 +93,7 @@ MainRThread& GetMainRThread(); // a SEXP (use cpp11::as_cpp to convert it to a C++ type inside // `fun`). 
template -arrow::Future SafeCallIntoRAsync(std::function fun) { +arrow::Future SafeCallIntoRAsync(std::function(void)> fun) { MainRThread& main_r_thread = GetMainRThread(); if (main_r_thread.IsMainThread()) { // If we're on the main thread, run the task immediately and let @@ -104,7 +111,7 @@ arrow::Future SafeCallIntoRAsync(std::function fun) { } try { - return arrow::Result(fun()); + return fun(); } catch (cpp11::unwind_exception& e) { GetMainRThread().SetError(e.token); return arrow::Result(arrow::Status::UnknownError("R code execution error")); @@ -122,8 +129,19 @@ arrow::Result SafeCallIntoR(std::function fun) { return future.result(); } +static inline arrow::Status SafeCallIntoRVoid(std::function fun) { + arrow::Future future = SafeCallIntoRAsync([&fun]() { + fun(); + return true; + }); + return future.status(); +} + template arrow::Result RunWithCapturedR(std::function()> make_arrow_call) { +#if !defined(HAS_SAFE_CALL_INTO_R) + return arrow::Status::NotImplemented("RunWithCapturedR() without UnwindProtect"); +#else if (GetMainRThread().Executor() != nullptr) { return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()"); } @@ -140,6 +158,7 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c GetMainRThread().ClearError(); return result; +#endif } #endif diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 08075de8b27..631e75fd74a 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -292,6 +292,27 @@ test_that("more informative error when reading a CSV with headers and schema", { ) }) +test_that("read_csv_arrow() and write_csv_arrow() accept connection objects", { + # connections with csv need RunWithCapturedR, which is not available + # in R <= 3.4.4 + skip_if_r_version("3.4.4") + + tf <- tempfile() + on.exit(unlink(tf)) + + # make this big enough that we might expose concurrency problems, + # but not so big that it slows down the tests + test_tbl <- tibble::tibble( + x = 1:1e4, 
+ y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_csv_arrow(test_tbl, file(tf)) + expect_identical(read_csv_arrow(tf), test_tbl) + expect_identical(read_csv_arrow(file(tf)), read_csv_arrow(tf)) +}) + test_that("CSV reader works on files with non-UTF-8 encoding", { strings <- c("a", "\u00e9", "\U0001f4a9") file_string <- paste0( diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R index 80e9d09d8d3..65fb2f03759 100644 --- a/r/tests/testthat/test-feather.R +++ b/r/tests/testthat/test-feather.R @@ -181,6 +181,29 @@ test_that("read_feather requires RandomAccessFile and errors nicely otherwise (A ) }) +test_that("read_feather() and write_feather() accept connection objects", { + # connection objects don't work on Windows i386 before R 4.0 + skip_if(on_old_windows()) + # connections with feather need RunWithCapturedR, which is not available + # in R <= 3.4.4 + skip_if_r_version("3.4.4") + + tf <- tempfile() + on.exit(unlink(tf)) + + # make this big enough that we might expose concurrency problems, + # but not so big that it slows down the tests + test_tbl <- tibble::tibble( + x = 1:1e4, + y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_feather(test_tbl, file(tf)) + expect_identical(read_feather(tf), test_tbl) + expect_identical(read_feather(file(tf)), read_feather(tf)) +}) + test_that("read_feather closes connection to file", { tf <- tempfile() on.exit(unlink(tf)) diff --git a/r/tests/testthat/test-io.R b/r/tests/testthat/test-io.R index 8c1d0b7928f..39cfe6e5e68 100644 --- a/r/tests/testthat/test-io.R +++ b/r/tests/testthat/test-io.R @@ -27,6 +27,102 @@ test_that("RandomAccessFile$ReadMetadata() works for LocalFileSystem", { ) }) +test_that("RConnectionRandomAccessFile can read from R connections", { + con <- rawConnection(as.raw(1:100)) + seek(con, 12) + stream <- 
MakeRConnectionRandomAccessFile(con) + expect_identical(stream$GetSize(), 100L) + expect_identical(stream$tell(), 12L) + + expect_identical(as.raw(stream$ReadAt(50, 50)), as.raw(51:100)) + expect_identical(as.raw(stream$ReadAt(0, 50)), as.raw(1:50)) + stream$close() + expect_error(isOpen(con), "invalid connection") +}) + +test_that("RConnectionInputStream can read from R connections", { + con <- rawConnection(as.raw(1:100)) + stream <- MakeRConnectionInputStream(con) + + expect_identical(as.raw(stream$Read(50)), as.raw(1:50)) + expect_identical(as.raw(stream$Read(50)), as.raw(51:100)) + stream$close() + expect_error(isOpen(con), "invalid connection") +}) + +test_that("RConnectionOutputStream can write to R connections", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- file(tf, open = "wb") + stream <- MakeRConnectionOutputStream(con) + stream$write(as.raw(1:50)) + stream$write(as.raw(51:100)) + stream$close() + expect_error(isOpen(con), "invalid connection") + + con <- file(tf, open = "rb") + expect_identical(readBin(con, raw(), 100), as.raw(1:100)) + expect_identical(readBin(con, raw(), 100), raw()) + close(con) +}) + +test_that("make_readable_file() works for non-filesystem URLs", { + skip_if_offline() + + readable_file <- make_readable_file( + "https://github.com/apache/arrow/raw/master/r/inst/v0.7.1.parquet" + ) + expect_r6_class(readable_file, "InputStream") + expect_identical(rawToChar(as.raw(readable_file$Read(3))), "PAR") + readable_file$close() +}) + +test_that("make_readable_file() works for seekable connection objects", { + con <- rawConnection(as.raw(1:100)) + readable_file <- make_readable_file(con) + expect_r6_class(readable_file, "RandomAccessFile") + expect_identical(as.raw(readable_file$Read(100)), as.raw(1:100)) + readable_file$close() +}) + +test_that("make_readable_file() and make_output_stream() open connections", { + tf <- tempfile() + on.exit(unlink(tf)) + + # check a seekable connection + write("abcdefg", tf) + readable_file <- 
make_readable_file(file(tf)) + expect_r6_class(readable_file, "RandomAccessFile") + expect_identical( + rawToChar(as.raw(readable_file$Read(7))), + "abcdefg" + ) + readable_file$close() + + # check output stream/non-seekable connection + con <- gzfile(tf) + stream <- make_output_stream(con) + stream$write(as.raw(1:100)) + stream$close() + + readable_file <- make_readable_file(gzfile(tf)) + expect_identical( + as.raw(readable_file$Read(100)), + as.raw(1:100) + ) + readable_file$close() +}) + +test_that("make_output_stream() works for connection objects", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- file(tf) + expect_r6_class(make_output_stream(con), "OutputStream") + close(con) +}) + test_that("reencoding input stream works for windows-1252", { string <- "province_name\nQu\u00e9bec" bytes_windows1252 <- iconv( diff --git a/r/tests/testthat/test-ipc_stream.R b/r/tests/testthat/test-ipc_stream.R new file mode 100644 index 00000000000..905a22f679c --- /dev/null +++ b/r/tests/testthat/test-ipc_stream.R @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +test_that("read_ipc_stream() and write_ipc_stream() accept connection objects", { + tf <- tempfile() + on.exit(unlink(tf)) + + test_tbl <- tibble::tibble( + x = 1:1e4, + y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_ipc_stream(test_tbl, file(tf)) + expect_identical(read_ipc_stream(tf), test_tbl) + expect_identical(read_ipc_stream(file(tf)), read_ipc_stream(tf)) +}) diff --git a/r/tests/testthat/test-parquet.R b/r/tests/testthat/test-parquet.R index d4b088928ad..3f43c3d8941 100644 --- a/r/tests/testthat/test-parquet.R +++ b/r/tests/testthat/test-parquet.R @@ -199,6 +199,23 @@ test_that("Maps are preserved when writing/reading from Parquet", { expect_equal(df, df_read, ignore_attr = TRUE) }) +test_that("read_parquet() and write_parquet() accept connection objects", { + tf <- tempfile() + on.exit(unlink(tf)) + + # make this big enough that we might expose concurrency problems, + # but not so big that it slows down the tests + test_tbl <- tibble::tibble( + x = 1:1e4, + y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_parquet(test_tbl, file(tf)) + expect_identical(read_parquet(tf), test_tbl) + expect_identical(read_parquet(file(tf)), read_parquet(tf)) +}) + test_that("write_parquet() to stream", { df <- tibble::tibble(x = 1:5) tf <- tempfile() @@ -231,6 +248,14 @@ test_that("write_parquet() handles version argument", { }) }) +test_that("ParquetFileReader raises an error for non-RandomAccessFile source", { + skip_if_not_available("gzip") + expect_error( + ParquetFileReader$create(CompressedInputStream$create(pq_file)), + 'file must be a "RandomAccessFile"' + ) +}) + test_that("ParquetFileWriter raises an error for non-OutputStream sink", { sch <- schema(a = float32()) # ARROW-9946 diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index 
55cb68abdd3..ab69c339c5c 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -32,6 +32,7 @@ test_that("SafeCallIntoR works from the main R thread", { }) test_that("SafeCallIntoR works within RunWithCapturedR", { + skip_if_r_version("3.4.4") skip_on_cran() expect_identical(