Skip to content

Commit

Permalink
Support shell expansion in file path.
Browse files Browse the repository at this point in the history
  • Loading branch information
2010YOUY01 committed Jul 23, 2023
1 parent 77fafb9 commit d71fd9e
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
rpath = false
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ ctor = "0.2.0"
doc-comment = "0.3"
env_logger = "0.10"
half = "2.2.1"
home = "0.5"
postgres-protocol = "0.6.4"
postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
regex = "1.5.4"
Expand Down
93 changes: 92 additions & 1 deletion datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use itertools::Itertools;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use percent_encoding;
use std::process::Command;
use url::Url;

/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
Expand Down Expand Up @@ -87,6 +88,34 @@ impl ListingTableUrl {
}
}

/// Perform shell-like path expansions
/// * Home directory expansion: "~/test.csv" expands to "/Users/user1/test.csv"
/// * Environment variable expansion: "$HOME/$DATA/test.csv" expands to
/// "/Users/user1/data/test.csv"
fn expand_path_prefix(prefix: &str) -> Result<String, DataFusionError> {
let error_msg = format!("Failed to perform shell expansion in path: {prefix}");

let expanded_dir_output = Command::new("sh")
.arg("-c")
.arg(&format!("echo {prefix}"))
.output()
.map_err(|_| DataFusionError::Internal(error_msg.clone()))?;

let expand_dir_result = if expanded_dir_output.status.success() {
String::from_utf8(expanded_dir_output.stdout)
.map(|mut s| {
assert!(s.ends_with('\n')); // echo will append trailing \n
s.truncate(s.len() - 1);
s
})
.map_err(|_| DataFusionError::Internal(String::new()))
} else {
Err(DataFusionError::Internal(String::new()))
};

expand_dir_result.map_err(|_| DataFusionError::Internal(error_msg))
}

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
Expand All @@ -98,7 +127,16 @@ impl ListingTableUrl {
None => (s, None),
};

let path = std::path::Path::new(prefix).canonicalize()?;
// Expand path like "~/$DATA/test.csv" (not supported by std::path)
let prefix = if !cfg!(target_os = "windows")
&& (prefix.starts_with('~') || prefix.contains('$'))
{
Self::expand_path_prefix(prefix)?
} else {
prefix.to_string()
};

let path = std::path::Path::new(&prefix).canonicalize()?;
let url = if path.is_dir() {
Url::from_directory_path(path)
} else {
Expand Down Expand Up @@ -241,6 +279,7 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::*;

#[test]
fn test_prefix_path() {
Expand Down Expand Up @@ -323,4 +362,56 @@ mod tests {
Some(("/a/b/c//", "alltypes_plain*.parquet")),
);
}

mod tests_path_expansion {
use super::*;
use crate::assert_batches_eq;
use crate::test_util;
use home;
use std::env;

#[test]
fn test_path_homedir_expansion() -> Result<()> {
let expanded_home_dir = ListingTableUrl::expand_path_prefix("~")?;
let home_dir = home::home_dir()
.unwrap()
.into_os_string()
.into_string()
.unwrap();
assert_eq!(home_dir, expanded_home_dir);

let expanded_data_dir = ListingTableUrl::expand_path_prefix("~/data/")?;
assert_eq!(home_dir + "/data/", expanded_data_dir);

Ok(())
}

#[tokio::test]
async fn test_path_envvar_expansion() -> Result<()> {
let ctx = SessionContext::new();
let testdata = test_util::arrow_test_data();
env::set_var("TESTDIR", testdata);
env::set_var("AGGR_FILE_NAME", "aggregate_test_100.csv");
ctx.register_csv(
"aggr",
"$TESTDIR/csv/$AGGR_FILE_NAME",
CsvReadOptions::new(),
)
.await?;

let query = "select sum(c2) from aggr;";
let query_result = ctx.sql(query).await?.collect().await?;

#[rustfmt::skip]
let expected = vec![
"+--------------+",
"| SUM(aggr.c2) |",
"+--------------+",
"| 285 |",
"+--------------+",
];
assert_batches_eq!(expected, &query_result);
Ok(())
}
}
}

0 comments on commit d71fd9e

Please sign in to comment.