diff --git a/README.md b/README.md index 68a65e8..70e8ec7 100644 --- a/README.md +++ b/README.md @@ -53,11 +53,11 @@ can connect using psql or language drivers to execute `SELECT` queries against them. ``` -datafusion-postgres 0.1.0 -A postgres interface for datatfusion. Serve any CSV/JSON/Arrow files as tables. +datafusion-postgres 0.4.0 +A postgres interface for datafusion. Serve any CSV/JSON/Arrow files as tables. USAGE: - datafusion-postgres [OPTIONS] + datafusion-postgres-cli [OPTIONS] FLAGS: -h, --help Prints help information @@ -67,8 +67,11 @@ OPTIONS: --arrow ... Arrow files to register as table, using syntax `table_name:file_path` --avro ... Avro files to register as table, using syntax `table_name:file_path` --csv ... CSV files to register as table, using syntax `table_name:file_path` + -d, --dir Directory to serve, all supported files will be registered as tables + --host Host address the server listens to, default to 127.0.0.1 [default: 127.0.0.1] --json ... JSON files to register as table, using syntax `table_name:file_path` --parquet ... Parquet files to register as table, using syntax `table_name:file_path` + -p Port the server listens to, default to 5432 [default: 5432] ``` For example, we use this command to host `ETTm1.csv` dataset as table `ettm1`. diff --git a/datafusion-postgres-cli/src/main.rs b/datafusion-postgres-cli/src/main.rs index a9bfe4a..ae69aac 100644 --- a/datafusion-postgres-cli/src/main.rs +++ b/datafusion-postgres-cli/src/main.rs @@ -1,3 +1,6 @@ +use std::ffi::OsStr; +use std::fs; + use datafusion::execution::options::{ ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, }; @@ -26,6 +29,9 @@ struct Opt { /// Avro files to register as table, using syntax `table_name:file_path` #[structopt(long("avro"))] avro_tables: Vec, + /// Directory to serve, all supported files will be registered as tables + #[structopt(long("dir"), short("d"))] + directory: Option, /// Port the server listens to, default to 5432 #[structopt(short, default_value = "5432")] port: u16, @@ -40,12 +46,75 @@ fn parse_table_def(table_def: &str) -> (&str, &str) { .expect("Use this pattern to register table: table_name:file_path") } -#[tokio::main] -async fn main() -> Result<(), Box> { - let opts = Opt::from_args(); +impl Opt { + fn include_directory_files(&mut self) -> Result<(), Box> { + if let Some(directory) = &self.directory { + match fs::read_dir(directory) { + Ok(entries) => { + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } - let session_context = SessionContext::new(); + if let Some(ext) = path.extension().and_then(OsStr::to_str) { + let ext_lower = ext.to_lowercase(); + if let Some(base_name) = path.file_stem().and_then(|s| s.to_str()) { + match ext_lower.as_ref() { + "json" => { + self.json_tables.push(format!( + "{}:{}", + base_name, + path.to_string_lossy() + )); + } + "avro" => { + self.avro_tables.push(format!( + "{}:{}", + base_name, + path.to_string_lossy() + )); + } + "parquet" => { + self.parquet_tables.push(format!( + "{}:{}", + base_name, + path.to_string_lossy() + )); + } + "csv" => { + self.csv_tables.push(format!( + "{}:{}", + base_name, + path.to_string_lossy() + )); + } + "arrow" => { + self.arrow_tables.push(format!( + "{}:{}", + base_name, + path.to_string_lossy() + )); + } + _ => {} + } + } + } + } + } + Err(e) => { + return Err(format!("Failed to load directory {}: {}", directory, e).into()); + } + } + } + Ok(()) + } +} +async fn setup_session_context( + session_context: &SessionContext, + opts: &Opt, +) -> Result<(), Box> { // Register CSV tables for (table_name, table_path) in opts.csv_tables.iter().map(|s| parse_table_def(s.as_ref())) { session_context @@ -99,6 +168,18 @@ async fn main() -> Result<(), Box> { println!("Loaded {} as table {}", table_path, table_name); } + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut opts = Opt::from_args(); + opts.include_directory_files()?; + + let session_context = SessionContext::new(); + + setup_session_context(&session_context, &opts).await?; + let server_options = ServerOptions::new() .with_host(opts.host) .with_port(opts.port); diff --git a/datafusion-postgres/src/lib.rs b/datafusion-postgres/src/lib.rs index af1bbc4..454761e 100644 --- a/datafusion-postgres/src/lib.rs +++ b/datafusion-postgres/src/lib.rs @@ -3,8 +3,6 @@ mod encoder; mod handlers; mod information_schema; -pub use handlers::{DfSessionService, HandlerFactory, Parser}; - use std::sync::Arc; use datafusion::prelude::SessionContext; @@ -12,6 +10,9 @@ use getset::{Getters, Setters, WithSetters}; use pgwire::tokio::process_socket; use tokio::net::TcpListener; +use handlers::HandlerFactory; +pub use handlers::{DfSessionService, Parser}; + #[derive(Getters, Setters, WithSetters)] #[getset(get = "pub", set = "pub", set_with = "pub")] pub struct ServerOptions {