Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ can connect using psql or language drivers to execute `SELECT` queries against
them.

```
datafusion-postgres 0.1.0
A postgres interface for datatfusion. Serve any CSV/JSON/Arrow files as tables.
datafusion-postgres 0.4.0
A postgres interface for datafusion. Serve any CSV/JSON/Arrow files as tables.

USAGE:
datafusion-postgres [OPTIONS]
datafusion-postgres-cli [OPTIONS]

FLAGS:
-h, --help Prints help information
Expand All @@ -67,8 +67,11 @@ OPTIONS:
--arrow <arrow-tables>... Arrow files to register as table, using syntax `table_name:file_path`
--avro <avro-tables>... Avro files to register as table, using syntax `table_name:file_path`
--csv <csv-tables>... CSV files to register as table, using syntax `table_name:file_path`
-d, --dir <directory> Directory to serve, all supported files will be registered as tables
--host <host> Host address the server listens to, default to 127.0.0.1 [default: 127.0.0.1]
--json <json-tables>... JSON files to register as table, using syntax `table_name:file_path`
--parquet <parquet-tables>... Parquet files to register as table, using syntax `table_name:file_path`
-p <port> Port the server listens to, default to 5432 [default: 5432]
```

For example, we use this command to host `ETTm1.csv` dataset as table `ettm1`.
Expand Down
83 changes: 79 additions & 4 deletions datafusion-postgres-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::ffi::OsStr;
use std::fs;

use datafusion::execution::options::{
ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
Expand Down Expand Up @@ -26,6 +29,9 @@ struct Opt {
/// Avro files to register as table, using syntax `table_name:file_path`
#[structopt(long("avro"))]
avro_tables: Vec<String>,
/// Directory to serve, all supported files will be registered as tables
#[structopt(long("dir"), short("d"))]
directory: Option<String>,
/// Port the server listens to, default to 5432
#[structopt(short, default_value = "5432")]
port: u16,
Expand All @@ -40,12 +46,69 @@ fn parse_table_def(table_def: &str) -> (&str, &str) {
.expect("Use this pattern to register table: table_name:file_path")
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let opts = Opt::from_args();
impl Opt {
fn include_directory_files(&mut self) {
if let Some(directory) = &self.directory {
if let Ok(entries) = fs::read_dir(directory) {
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}

let session_context = SessionContext::new();
if let Some(ext) = path.extension().and_then(OsStr::to_str) {
let ext_lower = ext.to_lowercase();
if let Some(base_name) = path.file_stem().and_then(|s| s.to_str()) {
match ext_lower.as_ref() {
"json" => {
self.json_tables.push(format!(
"{}:{}",
base_name,
path.to_string_lossy()
));
}
"avro" => {
self.avro_tables.push(format!(
"{}:{}",
base_name,
path.to_string_lossy()
));
}
"parquet" => {
self.parquet_tables.push(format!(
"{}:{}",
base_name,
path.to_string_lossy()
));
}
"csv" => {
self.csv_tables.push(format!(
"{}:{}",
base_name,
path.to_string_lossy()
));
}
"arrow" => {
self.arrow_tables.push(format!(
"{}:{}",
base_name,
path.to_string_lossy()
));
}
_ => {}
}
}
}
}
}
}
}
}

async fn setup_session_context(
session_context: &SessionContext,
opts: &Opt,
) -> Result<(), Box<dyn std::error::Error>> {
// Register CSV tables
for (table_name, table_path) in opts.csv_tables.iter().map(|s| parse_table_def(s.as_ref())) {
session_context
Expand Down Expand Up @@ -99,6 +162,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Loaded {} as table {}", table_path, table_name);
}

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut opts = Opt::from_args();
opts.include_directory_files();

let session_context = SessionContext::new();

setup_session_context(&session_context, &opts).await?;

let server_options = ServerOptions::new()
.with_host(opts.host)
.with_port(opts.port);
Expand Down
5 changes: 3 additions & 2 deletions datafusion-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ mod encoder;
mod handlers;
mod information_schema;

pub use handlers::{DfSessionService, HandlerFactory, Parser};

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use getset::{Getters, Setters, WithSetters};
use pgwire::tokio::process_socket;
use tokio::net::TcpListener;

use handlers::HandlerFactory;
pub use handlers::{DfSessionService, Parser};

#[derive(Getters, Setters, WithSetters)]
#[getset(get = "pub", set = "pub", set_with = "pub")]
pub struct ServerOptions {
Expand Down
Loading