diff --git a/host/tests/integration_tests/evil/fs.rs b/host/tests/integration_tests/evil/fs.rs index 032aeb5..73e0777 100644 --- a/host/tests/integration_tests/evil/fs.rs +++ b/host/tests/integration_tests/evil/fs.rs @@ -1,8 +1,9 @@ use arrow::{ - array::{Array, RecordBatch, StringArray}, + array::{RecordBatch, StringArray}, + compute::concat, datatypes::{DataType, Field}, }; -use datafusion_common::{config::ConfigOptions, test_util::batches_to_string}; +use datafusion_common::{ScalarValue, config::ConfigOptions, test_util::batches_to_string}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, async_udf::AsyncScalarUDFImpl, }; @@ -44,10 +45,8 @@ const PATHS: &[&str] = &[ #[tokio::test] async fn test_canonicalize() { - let udf = udf("canonicalize").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("canonicalize").await, @r" +-------------+-----------------------------------------------+ | path | result | @@ -86,10 +85,8 @@ async fn test_canonicalize() { #[tokio::test] async fn test_copy() { - let udf = udf("copy").await; - insta::assert_snapshot!( - run_2(&udf).await, + run_2("copy").await, @r" +-------------+-------------+-------------------------------------------------+ | from | to | output | @@ -830,10 +827,8 @@ async fn test_copy() { #[tokio::test] async fn test_create_dir() { - let udf = udf("create_dir").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("create_dir").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -846,15 +841,15 @@ async fn test_create_dir() { | /boot | OK: created | | /dev | OK: created | | /etc | OK: created | - | /etc/group | OK: created | - | /etc/passwd | OK: created | - | /etc/shadow | OK: created | + | /etc/group | ERR: No such file or directory (os error 44) | + | /etc/passwd | ERR: No such file or directory (os error 44) | + | /etc/shadow | ERR: No such file or directory (os error 44) | | /home | OK: created | | /lib | OK: created | | 
/lib64 | OK: created | | /opt | OK: created | | /proc | OK: created | - | /proc/self | OK: created | + | /proc/self | ERR: No such file or directory (os error 44) | | /root | OK: created | | /run | OK: created | | /sbin | OK: created | @@ -872,10 +867,8 @@ async fn test_create_dir() { #[tokio::test] async fn test_exists() { - let udf = udf("exists").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("exists").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -914,10 +907,8 @@ async fn test_exists() { #[tokio::test] async fn test_hard_link() { - let udf = udf("hard_link").await; - insta::assert_snapshot!( - run_2(&udf).await, + run_2("hard_link").await, @r" +-------------+-------------+-------------------------------------------------+ | from | to | output | @@ -1658,10 +1649,8 @@ async fn test_hard_link() { #[tokio::test] async fn test_metadata() { - let udf = udf("metadata").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("metadata").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -1700,10 +1689,8 @@ async fn test_metadata() { #[tokio::test] async fn test_open_append() { - let udf = udf("open_append").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("open_append").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -1742,10 +1729,8 @@ async fn test_open_append() { #[tokio::test] async fn test_open_create() { - let udf = udf("open_create").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("open_create").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -1784,10 +1769,8 @@ async fn test_open_create() { #[tokio::test] async fn test_open_create_new() { - let udf = udf("open_create_new").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("open_create_new").await, @r" 
+-------------+-------------------------------------------------+ | path | result | @@ -1826,10 +1809,8 @@ async fn test_open_create_new() { #[tokio::test] async fn test_open_read() { - let udf = udf("open_read").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("open_read").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -1868,10 +1849,8 @@ async fn test_open_read() { #[tokio::test] async fn test_open_truncate() { - let udf = udf("open_truncate").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("open_truncate").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -1910,10 +1889,8 @@ async fn test_open_truncate() { #[tokio::test] async fn test_open_write() { - let udf = udf("open_write").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("open_write").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -1952,10 +1929,8 @@ async fn test_open_write() { #[tokio::test] async fn test_read_dir() { - let udf = udf("read_dir").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("read_dir").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -1994,10 +1969,8 @@ async fn test_read_dir() { #[tokio::test] async fn test_read_link() { - let udf = udf("read_link").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("read_link").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -2036,10 +2009,8 @@ async fn test_read_link() { #[tokio::test] async fn test_remove_dir() { - let udf = udf("remove_dir").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("remove_dir").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -2078,10 +2049,8 @@ async fn test_remove_dir() { #[tokio::test] async fn test_remove_file() { - let udf = 
udf("remove_file").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("remove_file").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -2120,10 +2089,8 @@ async fn test_remove_file() { #[tokio::test] async fn test_rename() { - let udf = udf("rename").await; - insta::assert_snapshot!( - run_2(&udf).await, + run_2("rename").await, @r" +-------------+-------------+-------------------------------------------------+ | from | to | output | @@ -2864,10 +2831,8 @@ async fn test_rename() { #[tokio::test] async fn test_set_permissions() { - let udf = udf("set_permissions").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("set_permissions").await, @r" +-------------+-----------------------------------------------+ | path | result | @@ -2906,10 +2871,8 @@ async fn test_set_permissions() { #[tokio::test] async fn test_symlink_metadata() { - let udf = udf("symlink_metadata").await; - insta::assert_snapshot!( - run_1(&udf).await, + run_1("symlink_metadata").await, @r" +-------------+-------------------------------------------------+ | path | result | @@ -2980,81 +2943,88 @@ fn nice_path(path: &str) -> String { } /// Run UDF that expects one string input. 
-async fn run_1(udf: &WasmScalarUdf) -> String { - let input = Arc::new( - PATHS - .iter() - .map(|p| Some(p.to_owned())) - .collect::<StringArray>(), - ); - let input_nice = Arc::new( - input - .iter() - .map(|s| s.map(nice_path)) - .collect::<StringArray>(), - ) as _; +async fn run_1(name: &'static str) -> String { + let mut input_nice = Vec::with_capacity(PATHS.len()); + let mut results = Vec::with_capacity(PATHS.len()); + for input in PATHS { + // tests are stateful, hence get a fresh UDF for every input + let udf = udf(name).await; - let result = udf - .invoke_async_with_args(ScalarFunctionArgs { - args: vec![ColumnarValue::Array(input as _)], - arg_fields: vec![Arc::new(Field::new("a", DataType::Utf8, true))], - number_rows: PATHS.len(), - return_field: Arc::new(Field::new("r", DataType::Utf8, true)), - config_options: Arc::new(ConfigOptions::default()), - }) - .await - .unwrap() - .unwrap_array(); + let result = udf + .invoke_async_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(Some( + (*input).to_owned(), + )))], + arg_fields: vec![Arc::new(Field::new("a", DataType::Utf8, true))], + number_rows: 1, + return_field: Arc::new(Field::new("r", DataType::Utf8, true)), + config_options: Arc::new(ConfigOptions::default()), + }) + .await + .unwrap() + .unwrap_array(); - batches_to_string(&[ - RecordBatch::try_from_iter([("path", input_nice), ("result", result)]).unwrap(), + results.push(result); + input_nice.push(nice_path(input)); + } + + let results_ref = results.iter().map(|a| a.as_ref()).collect::<Vec<_>>(); + + batches_to_string(&[RecordBatch::try_from_iter([ + ( + "path", + Arc::new(StringArray::from_iter_values(input_nice)) as _, + ), + ("result", concat(&results_ref).unwrap()), ]) + .unwrap()]) } /// Run UDF that expects two string inputs. 
-async fn run_2(udf: &WasmScalarUdf) -> String { +async fn run_2(name: &'static str) -> String { let (paths_from, paths_to) = cross(PATHS); + let mut input_from_nice = Vec::with_capacity(paths_from.len()); + let mut input_to_nice = Vec::with_capacity(paths_to.len()); + let mut results = Vec::with_capacity(paths_from.len()); + for (input_from, input_to) in paths_from.into_iter().zip(paths_to) { + // tests are stateful, hence get a fresh UDF for every input + let udf = udf(name).await; - let input_from = Arc::new(paths_from.into_iter().map(Some).collect::<StringArray>()); - let input_to = Arc::new(paths_to.into_iter().map(Some).collect::<StringArray>()); + let result = udf + .invoke_async_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some(input_from.clone()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some(input_to.clone()))), + ], + arg_fields: vec![ + Arc::new(Field::new("from", DataType::Utf8, true)), + Arc::new(Field::new("to", DataType::Utf8, true)), + ], + number_rows: 1, + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + config_options: Arc::new(ConfigOptions::default()), + }) + .await + .unwrap() + .unwrap_array(); - let input_from_nice = Arc::new( - input_from - .iter() - .map(|s| s.map(nice_path)) - .collect::<StringArray>(), - ) as _; - let input_to_nice = Arc::new( - input_to - .iter() - .map(|s| s.map(nice_path)) - .collect::<StringArray>(), - ) as _; - - let number_rows = input_from.len(); + results.push(result); + input_from_nice.push(nice_path(&input_from)); + input_to_nice.push(nice_path(&input_to)); + } - let result = udf - .invoke_async_with_args(ScalarFunctionArgs { - args: vec![ - ColumnarValue::Array(input_from as _), - ColumnarValue::Array(input_to as _), - ], - arg_fields: vec![ - Arc::new(Field::new("from", DataType::Utf8, true)), - Arc::new(Field::new("to", DataType::Utf8, true)), - ], - number_rows, - return_field: Arc::new(Field::new("result", DataType::Utf8, true)), - config_options: Arc::new(ConfigOptions::default()), - }) - 
.await - .unwrap() - .unwrap_array(); + let results_ref = results.iter().map(|a| a.as_ref()).collect::<Vec<_>>(); batches_to_string(&[RecordBatch::try_from_iter([ - ("from", input_from_nice), - ("to", input_to_nice), - ("output", result), + ( + "from", + Arc::new(StringArray::from_iter_values(input_from_nice)) as _, + ), + ( + "to", + Arc::new(StringArray::from_iter_values(input_to_nice)) as _, + ), + ("output", concat(&results_ref).unwrap()), ]) .unwrap()]) }