Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 43 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,56 @@

![Crates.io Version](https://img.shields.io/crates/v/datafusion-postgres?label=datafusion-postgres)

Serving any [datafusion](https://datafusion.apache.org) `SessionContext` with full PostgreSQL compatibility, including authentication, role-based access control, and SSL/TLS encryption. Available as a library and a CLI tool.

This project adds a comprehensive [PostgreSQL compatible access layer](https://github.com/sunng87/pgwire) to the [Apache DataFusion](https://github.com/apache/arrow-datafusion) query engine, making it a drop-in replacement for PostgreSQL in analytics workloads.
A PostgreSQL-compatible server for [Apache DataFusion](https://datafusion.apache.org), supporting authentication, role-based access control, and SSL/TLS encryption. Available as both a library and CLI tool.

Built on [pgwire](https://github.com/sunng87/pgwire) to provide PostgreSQL wire protocol compatibility for analytical workloads.
It was originally an example of the [pgwire](https://github.com/sunng87/pgwire)
project.

## ✨ Key Features

- 🔌 **Full PostgreSQL Wire Protocol** - Compatible with all PostgreSQL clients and drivers
- 🛡️ **Enterprise Security** - Authentication, RBAC, and SSL/TLS encryption
- 🛡️ **Security Features** - Authentication, RBAC, and SSL/TLS encryption
- 🏗️ **Complete System Catalogs** - Real `pg_catalog` tables with accurate metadata
- 📊 **Advanced Data Types** - Comprehensive Arrow ↔ PostgreSQL type mapping
- 🔄 **Transaction Support** - Full ACID transaction lifecycle (BEGIN/COMMIT/ROLLBACK)
- 🔄 **Transaction Support** - ACID transaction lifecycle (BEGIN/COMMIT/ROLLBACK)
- ⚡ **High Performance** - Apache DataFusion's columnar query execution

## 🎯 Roadmap & Status

- [x] **Core Features**
- [x] datafusion-postgres as a CLI tool
- [x] datafusion-postgres as a library
- [x] datafusion information schema
- [x] Complete `pg_catalog` system tables (pg_type, pg_attribute, pg_proc, pg_class, etc.)
- [x] Comprehensive Arrow ↔ PostgreSQL data type mapping
- [x] Essential PostgreSQL functions (version(), current_schema(), has_table_privilege(), etc.)

- [x] **Security & Authentication** 🆕
- [x] User authentication and management
- [x] Role-based access control (RBAC)
- [x] Granular permissions (SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, etc.)
- [x] Role inheritance and grant management
- [x] SSL/TLS connection encryption
- [x] Query-level permission checking

- [x] **Transaction Support** 🆕
- [x] Full ACID transaction lifecycle
- [x] BEGIN/COMMIT/ROLLBACK with all variants
- [x] Failed transaction handling and recovery
- [x] Transaction state management

- [ ] **Future Enhancements**
- [ ] Connection pooling and performance optimizations
- [ ] Advanced authentication methods (SCRAM, LDAP)
- [ ] More PostgreSQL functions and operators
- [ ] COPY protocol for bulk data loading
## 🎯 Features

### Core Functionality
- ✅ Library and CLI tool
- ✅ PostgreSQL wire protocol compatibility
- ✅ Complete `pg_catalog` system tables
- ✅ Arrow ↔ PostgreSQL data type mapping
- ✅ PostgreSQL functions (version, current_schema, has_table_privilege, etc.)

### Security & Authentication
- ✅ User authentication and RBAC
- ✅ Granular permissions (SELECT, INSERT, UPDATE, DELETE, CREATE, DROP)
- ✅ Role inheritance and grant management
- ✅ SSL/TLS encryption
- ✅ Query-level permission checking

### Transaction Support
- ✅ ACID transaction lifecycle
- ✅ BEGIN/COMMIT/ROLLBACK with all variants
- ✅ Failed transaction handling and recovery

### Future Enhancements
- ⏳ Connection pooling optimizations
- ⏳ Advanced authentication (LDAP, certificates)
- ⏳ COPY protocol for bulk data loading

## 🔐 Authentication

Supports standard pgwire authentication methods:

- **Cleartext**: `CleartextStartupHandler` for simple password authentication
- **MD5**: `MD5StartupHandler` for MD5-hashed passwords
- **SCRAM**: `SASLScramAuthStartupHandler` for secure authentication

See `auth.rs` for complete implementation examples using `DfAuthSource`.

## 🚀 Quick Start

Expand Down Expand Up @@ -91,12 +96,11 @@ serve(session_context, &server_options).await

### The CLI `datafusion-postgres-cli`

As a command-line application, this tool serves any JSON/CSV/Arrow/Parquet/Avro
files as tables, and exposes them via PostgreSQL compatible protocol with full security features.
Command-line tool to serve JSON/CSV/Arrow/Parquet/Avro files as PostgreSQL-compatible tables.

```
datafusion-postgres-cli 0.6.1
A secure postgres interface for datafusion. Serve any CSV/JSON/Arrow/Parquet files as tables.
A PostgreSQL interface for DataFusion. Serve CSV/JSON/Arrow/Parquet files as tables.

USAGE:
datafusion-postgres-cli [OPTIONS]
Expand Down Expand Up @@ -149,6 +153,8 @@ Listening on 127.0.0.1:5432 (unencrypted)

### Connect with psql

> **🔐 Authentication**: The default setup allows connections without authentication for development. For secure deployments, use `DfAuthSource` with standard pgwire authentication handlers (cleartext, MD5, or SCRAM). See `auth.rs` for implementation examples.

```bash
psql -h 127.0.0.1 -p 5432 -U postgres
```
Expand Down
6 changes: 3 additions & 3 deletions arrow-pg/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ fn get_numeric_128_value(
let value = array.value(idx);
Decimal::try_from_i128_with_scale(value, scale)
.map_err(|e| {
let message = match e {
let error_code = match e {
rust_decimal::Error::ExceedsMaximumPossibleValue => {
"22003" // numeric_value_out_of_range
}
Expand All @@ -280,8 +280,8 @@ fn get_numeric_128_value(
};
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_string(),
message.to_string(),
format!("Numeric value conversion failed: {e:?}"),
error_code.to_string(),
format!("Numeric value conversion failed: {e}"),
)))
})
.map(Some)
Expand Down
2 changes: 1 addition & 1 deletion datafusion-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ pgwire = { workspace = true, features = ["server-api-ring", "scram"] }
postgres-types.workspace = true
rust_decimal.workspace = true
tokio = { version = "1.45", features = ["sync", "net"] }
tokio-rustls = "0.26"
tokio-rustls = { version = "0.26", features = ["ring"] }
rustls-pemfile = "2.0"
rustls-pki-types = "1.0"
125 changes: 74 additions & 51 deletions datafusion-postgres/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,8 @@ use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use futures::sink::Sink;
use pgwire::api::auth::{
finish_authentication, save_startup_parameters_to_metadata, AuthSource,
DefaultServerParameterProvider, LoginInfo, Password, StartupHandler,
};
use pgwire::api::ClientInfo;
use pgwire::api::auth::{AuthSource, LoginInfo, Password};
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::messages::{PgWireBackendMessage, PgWireFrontendMessage};
use std::fmt::Debug;
use tokio::sync::RwLock;

/// User information stored in the authentication system
Expand Down Expand Up @@ -575,67 +568,97 @@ impl AuthManager {
}
}

/// Custom startup handler that performs authentication
pub struct AuthStartupHandler {
auth_manager: Arc<AuthManager>,
/// AuthSource implementation for integration with pgwire authentication
/// Provides proper password-based authentication instead of custom startup handler
#[derive(Clone)]
pub struct DfAuthSource {
pub auth_manager: Arc<AuthManager>,
}

impl AuthStartupHandler {
impl DfAuthSource {
pub fn new(auth_manager: Arc<AuthManager>) -> Self {
AuthStartupHandler { auth_manager }
DfAuthSource { auth_manager }
}
}

#[async_trait]
impl StartupHandler for AuthStartupHandler {
async fn on_startup<C>(
&self,
client: &mut C,
message: PgWireFrontendMessage,
) -> PgWireResult<()>
where
C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send,
C::Error: Debug,
PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
{
if let PgWireFrontendMessage::Startup(ref startup) = message {
save_startup_parameters_to_metadata(client, startup);

// Extract username from startup message
let username = startup
.parameters
.get("user")
.unwrap_or(&"anonymous".to_string())
.clone();

// For now, we'll do basic authentication
// In a full implementation, this would involve password authentication
let is_authenticated = if username == "postgres" {
// Always allow postgres user for compatibility
true
impl AuthSource for DfAuthSource {
async fn get_password(&self, login: &LoginInfo) -> PgWireResult<Password> {
if let Some(username) = login.user() {
// Check if user exists in our RBAC system
if let Some(user) = self.auth_manager.get_user(username).await {
if user.can_login {
// Return the stored password hash for authentication
// The pgwire authentication handlers (cleartext/md5/scram) will
// handle the actual password verification process
Ok(Password::new(None, user.password_hash.into_bytes()))
} else {
Err(PgWireError::UserError(Box::new(
pgwire::error::ErrorInfo::new(
"FATAL".to_string(),
"28000".to_string(), // invalid_authorization_specification
format!("User \"{username}\" is not allowed to login"),
),
)))
}
} else {
// Check if user exists in our system
self.auth_manager.get_user(&username).await.is_some()
};

if !is_authenticated {
return Err(PgWireError::UserError(Box::new(
Err(PgWireError::UserError(Box::new(
pgwire::error::ErrorInfo::new(
"FATAL".to_string(),
"28P01".to_string(), // invalid_password
format!("password authentication failed for user \"{username}\""),
),
)));
)))
}

// Complete authentication process
finish_authentication(client, &DefaultServerParameterProvider::default()).await?;
} else {
Err(PgWireError::UserError(Box::new(
pgwire::error::ErrorInfo::new(
"FATAL".to_string(),
"28P01".to_string(), // invalid_password
"No username provided in login request".to_string(),
),
)))
}

Ok(())
}
}

// REMOVED: Custom startup handler approach
//
// Instead of implementing a custom StartupHandler, use the proper pgwire authentication:
//
// For cleartext authentication:
// ```rust
// use pgwire::api::auth::cleartext::CleartextStartupHandler;
//
// let auth_source = Arc::new(DfAuthSource::new(auth_manager));
// let authenticator = CleartextStartupHandler::new(
// auth_source,
// Arc::new(DefaultServerParameterProvider::default())
// );
// ```
//
// For MD5 authentication:
// ```rust
// use pgwire::api::auth::md5::MD5StartupHandler;
//
// let auth_source = Arc::new(DfAuthSource::new(auth_manager));
// let authenticator = MD5StartupHandler::new(
// auth_source,
// Arc::new(DefaultServerParameterProvider::default())
// );
// ```
//
// For SCRAM authentication (requires "server-api-scram" feature):
// ```rust
// use pgwire::api::auth::scram::SASLScramAuthStartupHandler;
//
// let auth_source = Arc::new(DfAuthSource::new(auth_manager));
// let authenticator = SASLScramAuthStartupHandler::new(
// auth_source,
// Arc::new(DefaultServerParameterProvider::default())
// );
// ```

/// Simple AuthSource implementation that accepts any user with empty password
pub struct SimpleAuthSource {
auth_manager: Arc<AuthManager>,
Expand Down
Loading
Loading