diff --git a/README.md b/README.md index d9a6357..6d767d4 100644 --- a/README.md +++ b/README.md @@ -2,51 +2,56 @@ ![Crates.io Version](https://img.shields.io/crates/v/datafusion-postgres?label=datafusion-postgres) -Serving any [datafusion](https://datafusion.apache.org) `SessionContext` with full PostgreSQL compatibility, including authentication, role-based access control, and SSL/TLS encryption. Available as a library and a CLI tool. - -This project adds a comprehensive [PostgreSQL compatible access layer](https://github.com/sunng87/pgwire) to the [Apache DataFusion](https://github.com/apache/arrow-datafusion) query engine, making it a drop-in replacement for PostgreSQL in analytics workloads. +A PostgreSQL-compatible server for [Apache DataFusion](https://datafusion.apache.org), supporting authentication, role-based access control, and SSL/TLS encryption. Available as both a library and CLI tool. +Built on [pgwire](https://github.com/sunng87/pgwire) to provide PostgreSQL wire protocol compatibility for analytical workloads. It was originally an example of the [pgwire](https://github.com/sunng87/pgwire) project. ## ✨ Key Features - 🔌 **Full PostgreSQL Wire Protocol** - Compatible with all PostgreSQL clients and drivers -- 🛡️ **Enterprise Security** - Authentication, RBAC, and SSL/TLS encryption +- 🛡️ **Security Features** - Authentication, RBAC, and SSL/TLS encryption - 🏗️ **Complete System Catalogs** - Real `pg_catalog` tables with accurate metadata - 📊 **Advanced Data Types** - Comprehensive Arrow ↔ PostgreSQL type mapping -- 🔄 **Transaction Support** - Full ACID transaction lifecycle (BEGIN/COMMIT/ROLLBACK) +- 🔄 **Transaction Support** - ACID transaction lifecycle (BEGIN/COMMIT/ROLLBACK) - ⚡ **High Performance** - Apache DataFusion's columnar query execution -## 🎯 Roadmap & Status - -- [x] **Core Features** - - [x] datafusion-postgres as a CLI tool - - [x] datafusion-postgres as a library - - [x] datafusion information schema - - [x] Complete `pg_catalog` system tables (pg_type, pg_attribute, pg_proc, pg_class, etc.) - - [x] Comprehensive Arrow ↔ PostgreSQL data type mapping - - [x] Essential PostgreSQL functions (version(), current_schema(), has_table_privilege(), etc.) - -- [x] **Security & Authentication** 🆕 - - [x] User authentication and management - - [x] Role-based access control (RBAC) - - [x] Granular permissions (SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, etc.) - - [x] Role inheritance and grant management - - [x] SSL/TLS connection encryption - - [x] Query-level permission checking - -- [x] **Transaction Support** 🆕 - - [x] Full ACID transaction lifecycle - - [x] BEGIN/COMMIT/ROLLBACK with all variants - - [x] Failed transaction handling and recovery - - [x] Transaction state management - -- [ ] **Future Enhancements** - - [ ] Connection pooling and performance optimizations - - [ ] Advanced authentication methods (SCRAM, LDAP) - - [ ] More PostgreSQL functions and operators - - [ ] COPY protocol for bulk data loading +## 🎯 Features + +### Core Functionality +- ✅ Library and CLI tool +- ✅ PostgreSQL wire protocol compatibility +- ✅ Complete `pg_catalog` system tables +- ✅ Arrow ↔ PostgreSQL data type mapping +- ✅ PostgreSQL functions (version, current_schema, has_table_privilege, etc.) + +### Security & Authentication +- ✅ User authentication and RBAC +- ✅ Granular permissions (SELECT, INSERT, UPDATE, DELETE, CREATE, DROP) +- ✅ Role inheritance and grant management +- ✅ SSL/TLS encryption +- ✅ Query-level permission checking + +### Transaction Support +- ✅ ACID transaction lifecycle +- ✅ BEGIN/COMMIT/ROLLBACK with all variants +- ✅ Failed transaction handling and recovery + +### Future Enhancements +- ⏳ Connection pooling optimizations +- ⏳ Advanced authentication (LDAP, certificates) +- ⏳ COPY protocol for bulk data loading + +## 🔐 Authentication + +Supports standard pgwire authentication methods: + +- **Cleartext**: `CleartextStartupHandler` for simple password authentication +- **MD5**: `MD5StartupHandler` for MD5-hashed passwords +- **SCRAM**: `SASLScramAuthStartupHandler` for secure authentication + +See `auth.rs` for complete implementation examples using `DfAuthSource`. ## 🚀 Quick Start @@ -91,12 +96,11 @@ serve(session_context, &server_options).await ### The CLI `datafusion-postgres-cli` -As a command-line application, this tool serves any JSON/CSV/Arrow/Parquet/Avro -files as tables, and exposes them via PostgreSQL compatible protocol with full security features. +Command-line tool to serve JSON/CSV/Arrow/Parquet/Avro files as PostgreSQL-compatible tables. ``` datafusion-postgres-cli 0.6.1 -A secure postgres interface for datafusion. Serve any CSV/JSON/Arrow/Parquet files as tables. +A PostgreSQL interface for DataFusion. Serve CSV/JSON/Arrow/Parquet files as tables. USAGE: datafusion-postgres-cli [OPTIONS] @@ -149,6 +153,8 @@ Listening on 127.0.0.1:5432 (unencrypted) ### Connect with psql +> **🔐 Authentication**: The default setup allows connections without authentication for development. For secure deployments, use `DfAuthSource` with standard pgwire authentication handlers (cleartext, MD5, or SCRAM). See `auth.rs` for implementation examples. + ```bash psql -h 127.0.0.1 -p 5432 -U postgres ``` diff --git a/arrow-pg/src/encoder.rs b/arrow-pg/src/encoder.rs index 519e772..2de82f4 100644 --- a/arrow-pg/src/encoder.rs +++ b/arrow-pg/src/encoder.rs @@ -262,7 +262,7 @@ fn get_numeric_128_value( let value = array.value(idx); Decimal::try_from_i128_with_scale(value, scale) .map_err(|e| { - let message = match e { + let error_code = match e { rust_decimal::Error::ExceedsMaximumPossibleValue => { "22003" // numeric_value_out_of_range } @@ -280,8 +280,8 @@ fn get_numeric_128_value( }; PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_string(), - message.to_string(), - format!("Numeric value conversion failed: {e:?}"), + error_code.to_string(), + format!("Numeric value conversion failed: {e}"), ))) }) .map(Some) diff --git a/datafusion-postgres/Cargo.toml b/datafusion-postgres/Cargo.toml index 52b7ef9..2b9f753 100644 --- a/datafusion-postgres/Cargo.toml +++ b/datafusion-postgres/Cargo.toml @@ -25,6 +25,6 @@ pgwire = { workspace = true, features = ["server-api-ring", "scram"] } postgres-types.workspace = true rust_decimal.workspace = true tokio = { version = "1.45", features = ["sync", "net"] } -tokio-rustls = "0.26" +tokio-rustls = { version = "0.26", features = ["ring"] } rustls-pemfile = "2.0" rustls-pki-types = "1.0" diff --git a/datafusion-postgres/src/auth.rs b/datafusion-postgres/src/auth.rs index e26912c..88cbd4f 100644 --- a/datafusion-postgres/src/auth.rs +++ b/datafusion-postgres/src/auth.rs @@ -2,15 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; -use futures::sink::Sink; -use pgwire::api::auth::{ - finish_authentication, save_startup_parameters_to_metadata, AuthSource, - DefaultServerParameterProvider, LoginInfo, Password, StartupHandler, -}; -use pgwire::api::ClientInfo; +use pgwire::api::auth::{AuthSource, LoginInfo, Password}; use pgwire::error::{PgWireError, PgWireResult}; -use pgwire::messages::{PgWireBackendMessage, PgWireFrontendMessage}; -use std::fmt::Debug; use tokio::sync::RwLock; /// User information stored in the authentication system @@ -575,67 +568,97 @@ impl AuthManager { } } -/// Custom startup handler that performs authentication -pub struct AuthStartupHandler { - auth_manager: Arc, +/// AuthSource implementation for integration with pgwire authentication +/// Provides proper password-based authentication instead of custom startup handler +#[derive(Clone)] +pub struct DfAuthSource { + pub auth_manager: Arc, } -impl AuthStartupHandler { +impl DfAuthSource { pub fn new(auth_manager: Arc) -> Self { - AuthStartupHandler { auth_manager } + DfAuthSource { auth_manager } } } #[async_trait] -impl StartupHandler for AuthStartupHandler { - async fn on_startup( - &self, - client: &mut C, - message: PgWireFrontendMessage, - ) -> PgWireResult<()> - where - C: ClientInfo + Sink + Unpin + Send, - C::Error: Debug, - PgWireError: From<>::Error>, - { - if let PgWireFrontendMessage::Startup(ref startup) = message { - save_startup_parameters_to_metadata(client, startup); - - // Extract username from startup message - let username = startup - .parameters - .get("user") - .unwrap_or(&"anonymous".to_string()) - .clone(); - - // For now, we'll do basic authentication - // In a full implementation, this would involve password authentication - let is_authenticated = if username == "postgres" { - // Always allow postgres user for compatibility - true +impl AuthSource for DfAuthSource { + async fn get_password(&self, login: &LoginInfo) -> PgWireResult { + if let Some(username) = login.user() { + // Check if user exists in our RBAC system + if let Some(user) = self.auth_manager.get_user(username).await { + if user.can_login { + // Return the stored password hash for authentication + // The pgwire authentication handlers (cleartext/md5/scram) will + // handle the actual password verification process + Ok(Password::new(None, user.password_hash.into_bytes())) + } else { + Err(PgWireError::UserError(Box::new( + pgwire::error::ErrorInfo::new( + "FATAL".to_string(), + "28000".to_string(), // invalid_authorization_specification + format!("User \"{username}\" is not allowed to login"), + ), + ))) + } } else { - // Check if user exists in our system - self.auth_manager.get_user(&username).await.is_some() - }; - - if !is_authenticated { - return Err(PgWireError::UserError(Box::new( + Err(PgWireError::UserError(Box::new( pgwire::error::ErrorInfo::new( "FATAL".to_string(), "28P01".to_string(), // invalid_password format!("password authentication failed for user \"{username}\""), ), - ))); + ))) } - - // Complete authentication process - finish_authentication(client, &DefaultServerParameterProvider::default()).await?; + } else { + Err(PgWireError::UserError(Box::new( + pgwire::error::ErrorInfo::new( + "FATAL".to_string(), + "28P01".to_string(), // invalid_password + "No username provided in login request".to_string(), + ), + ))) } - - Ok(()) } } +// REMOVED: Custom startup handler approach +// +// Instead of implementing a custom StartupHandler, use the proper pgwire authentication: +// +// For cleartext authentication: +// ```rust +// use pgwire::api::auth::cleartext::CleartextStartupHandler; +// +// let auth_source = Arc::new(DfAuthSource::new(auth_manager)); +// let authenticator = CleartextStartupHandler::new( +// auth_source, +// Arc::new(DefaultServerParameterProvider::default()) +// ); +// ``` +// +// For MD5 authentication: +// ```rust +// use pgwire::api::auth::md5::MD5StartupHandler; +// +// let auth_source = Arc::new(DfAuthSource::new(auth_manager)); +// let authenticator = MD5StartupHandler::new( +// auth_source, +// Arc::new(DefaultServerParameterProvider::default()) +// ); +// ``` +// +// For SCRAM authentication (requires "server-api-scram" feature): +// ```rust +// use pgwire::api::auth::scram::SASLScramAuthStartupHandler; +// +// let auth_source = Arc::new(DfAuthSource::new(auth_manager)); +// let authenticator = SASLScramAuthStartupHandler::new( +// auth_source, +// Arc::new(DefaultServerParameterProvider::default()) +// ); +// ``` + /// Simple AuthSource implementation that accepts any user with empty password pub struct SimpleAuthSource { auth_manager: Arc, diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 8fde81e..00d88fe 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -1,11 +1,12 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::auth::{AuthManager, AuthStartupHandler, Permission, ResourceType}; +use crate::auth::{AuthManager, Permission, ResourceType}; use async_trait::async_trait; use datafusion::arrow::datatypes::DataType; use datafusion::logical_expr::LogicalPlan; use datafusion::prelude::*; +use pgwire::api::auth::noop::NoopStartupHandler; use pgwire::api::copy::NoopCopyHandler; use pgwire::api::portal::{Format, Portal}; use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler}; @@ -29,24 +30,27 @@ pub enum TransactionState { Failed, } +/// Simple startup handler that does no authentication +/// For production, use DfAuthSource with proper pgwire authentication handlers +pub struct SimpleStartupHandler; + +#[async_trait::async_trait] +impl NoopStartupHandler for SimpleStartupHandler {} + pub struct HandlerFactory { pub session_service: Arc, - pub auth_handler: Arc, } impl HandlerFactory { pub fn new(session_context: Arc, auth_manager: Arc) -> Self { let session_service = Arc::new(DfSessionService::new(session_context, auth_manager.clone())); - HandlerFactory { - session_service, - auth_handler: Arc::new(AuthStartupHandler::new(auth_manager)), - } + HandlerFactory { session_service } } } impl PgWireServerHandlers for HandlerFactory { - type StartupHandler = AuthStartupHandler; + type StartupHandler = SimpleStartupHandler; type SimpleQueryHandler = DfSessionService; type ExtendedQueryHandler = DfSessionService; type CopyHandler = NoopCopyHandler; @@ -61,7 +65,7 @@ impl PgWireServerHandlers for HandlerFactory { } fn startup_handler(&self) -> Arc { - self.auth_handler.clone() + Arc::new(SimpleStartupHandler) } fn copy_handler(&self) -> Arc { @@ -226,18 +230,20 @@ impl DfSessionService { &self, query_lower: &str, ) -> PgWireResult>> { + // Transaction handling based on pgwire example: + // https://github.com/sunng87/pgwire/blob/master/examples/transaction.rs#L57 match query_lower.trim() { "begin" | "begin transaction" | "begin work" | "start transaction" => { let mut state = self.transaction_state.lock().await; match *state { TransactionState::None => { *state = TransactionState::Active; - Ok(Some(Response::Execution(Tag::new("BEGIN")))) + Ok(Some(Response::TransactionStart(Tag::new("BEGIN")))) } TransactionState::Active => { // Already in transaction, PostgreSQL allows this but issues a warning // For simplicity, we'll just return BEGIN again - Ok(Some(Response::Execution(Tag::new("BEGIN")))) + Ok(Some(Response::TransactionStart(Tag::new("BEGIN")))) } TransactionState::Failed => { // Can't start new transaction from failed state @@ -256,23 +262,23 @@ impl DfSessionService { match *state { TransactionState::Active => { *state = TransactionState::None; - Ok(Some(Response::Execution(Tag::new("COMMIT")))) + Ok(Some(Response::TransactionEnd(Tag::new("COMMIT")))) } TransactionState::None => { // PostgreSQL allows COMMIT outside transaction with warning - Ok(Some(Response::Execution(Tag::new("COMMIT")))) + Ok(Some(Response::TransactionEnd(Tag::new("COMMIT")))) } TransactionState::Failed => { // COMMIT in failed transaction is treated as ROLLBACK *state = TransactionState::None; - Ok(Some(Response::Execution(Tag::new("ROLLBACK")))) + Ok(Some(Response::TransactionEnd(Tag::new("ROLLBACK")))) } } } "rollback" | "rollback transaction" | "rollback work" | "abort" => { let mut state = self.transaction_state.lock().await; *state = TransactionState::None; - Ok(Some(Response::Execution(Tag::new("ROLLBACK")))) + Ok(Some(Response::TransactionEnd(Tag::new("ROLLBACK")))) } _ => Ok(None), } diff --git a/datafusion-postgres/src/lib.rs b/datafusion-postgres/src/lib.rs index e96fa8b..b933606 100644 --- a/datafusion-postgres/src/lib.rs +++ b/datafusion-postgres/src/lib.rs @@ -111,14 +111,14 @@ pub async fn serve( // Accept incoming connections loop { match listener.accept().await { - Ok((socket, addr)) => { + Ok((socket, _addr)) => { let factory_ref = factory.clone(); let tls_acceptor_ref = tls_acceptor.clone(); - println!("Accepted connection from {addr}"); + // Connection accepted from {addr} - log appropriately based on your logging strategy tokio::spawn(async move { - if let Err(e) = process_socket(socket, tls_acceptor_ref, factory_ref).await { - eprintln!("Error processing socket: {e}"); + if let Err(_e) = process_socket(socket, tls_acceptor_ref, factory_ref).await { + // Log error or handle appropriately based on your logging strategy } }); }