Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 53 additions & 6 deletions datafusion-postgres/src/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";
const PG_CATALOG_TABLE_PG_RANGE: &str = "pg_range";
const PG_CATALOG_TABLE_PG_ENUM: &str = "pg_enum";
const PG_CATALOG_TABLE_PG_DESCRIPTION: &str = "pg_description";

/// Determine PostgreSQL table type (relkind) from DataFusion TableProvider
fn get_table_type(table: &Arc<dyn TableProvider>) -> &'static str {
Expand Down Expand Up @@ -72,6 +73,7 @@ pub const PG_CATALOG_TABLES: &[&str] = &[
PG_CATALOG_TABLE_PG_AM,
PG_CATALOG_TABLE_PG_RANGE,
PG_CATALOG_TABLE_PG_ENUM,
PG_CATALOG_TABLE_PG_DESCRIPTION,
];

// Data structure to hold pg_type table data
Expand Down Expand Up @@ -281,6 +283,7 @@ impl SchemaProvider for PgCatalogSchemaProvider {
PG_CATALOG_TABLE_PG_PROC => Ok(Some(self.create_pg_proc_table())),
PG_CATALOG_TABLE_PG_RANGE => Ok(Some(self.create_pg_range_table())),
PG_CATALOG_TABLE_PG_ENUM => Ok(Some(self.create_pg_enum_table())),
PG_CATALOG_TABLE_PG_DESCRIPTION => Ok(Some(self.create_pg_description_table())),
_ => Ok(None),
}
}
Expand Down Expand Up @@ -782,6 +785,18 @@ impl PgCatalogSchemaProvider {
Arc::new(provider)
}

/// Create a mock empty table for pg_description
fn create_pg_description_table(&self) -> Arc<dyn TableProvider> {
let schema = Arc::new(Schema::new(vec![
Field::new("objoid", DataType::Int32, false), // Oid
Field::new("classoid", DataType::Int32, false), // Oid of the obj class
Field::new("objsubid", DataType::Int32, false), // subid
Field::new("description", DataType::Utf8, false),
]));
let provider = MemTable::try_new(schema, vec![vec![]]).unwrap();
Arc::new(provider)
}

/// Create a populated pg_proc table with standard PostgreSQL functions
fn create_pg_proc_table(&self) -> Arc<dyn TableProvider> {
// Define complete schema for pg_proc (matching PostgreSQL)
Expand Down Expand Up @@ -1927,13 +1942,16 @@ pub fn create_has_table_privilege_3param_udf() -> ScalarUDF {
// Define the function implementation for 3-parameter version
let func = move |args: &[ColumnarValue]| {
let args = ColumnarValue::values_to_arrays(args)?;
let _user = &args[0]; // User (can be name or OID)
let user = &args[0]; // User (can be name or OID)
let _table = &args[1]; // Table (can be name or OID)
let _privilege = &args[2]; // Privilege type (SELECT, INSERT, etc.)

// For now, always return true (full access)
let mut builder = BooleanArray::builder(1);
builder.append_value(true);
let mut builder = BooleanArray::builder(user.len());
for _ in 0..user.len() {
builder.append_value(true);
}

let array: ArrayRef = Arc::new(builder.finish());

Ok(ColumnarValue::Array(array))
Expand All @@ -1953,12 +1971,14 @@ pub fn create_has_table_privilege_2param_udf() -> ScalarUDF {
// Define the function implementation for 2-parameter version (current user, table, privilege)
let func = move |args: &[ColumnarValue]| {
let args = ColumnarValue::values_to_arrays(args)?;
let _table = &args[0]; // Table (can be name or OID)
let table = &args[0]; // Table (can be name or OID)
let _privilege = &args[1]; // Privilege type (SELECT, INSERT, etc.)

// For now, always return true (full access for current user)
let mut builder = BooleanArray::builder(1);
builder.append_value(true);
let mut builder = BooleanArray::builder(table.len());
for _ in 0..table.len() {
builder.append_value(true);
}
let array: ArrayRef = Arc::new(builder.finish());

Ok(ColumnarValue::Array(array))
Expand All @@ -1974,6 +1994,32 @@ pub fn create_has_table_privilege_2param_udf() -> ScalarUDF {
)
}

pub fn create_format_type_udf() -> ScalarUDF {
let func = move |args: &[ColumnarValue]| {
let args = ColumnarValue::values_to_arrays(args)?;
let type_oids = &args[0]; // Table (can be name or OID)
let _type_mods = &args[1]; // Privilege type (SELECT, INSERT, etc.)

// For now, always return true (full access for current user)
let mut builder = StringBuilder::new();
for _ in 0..type_oids.len() {
builder.append_value("???");
}

let array: ArrayRef = Arc::new(builder.finish());

Ok(ColumnarValue::Array(array))
};

create_udf(
"format_type",
vec![DataType::Int32, DataType::Int32],
DataType::Utf8,
Volatility::Stable,
Arc::new(func),
)
}

/// Install pg_catalog and postgres UDFs to current `SessionContext`
pub fn setup_pg_catalog(
session_context: &SessionContext,
Expand All @@ -1995,6 +2041,7 @@ pub fn setup_pg_catalog(
session_context.register_udf(create_pg_get_userbyid_udf());
session_context.register_udf(create_has_table_privilege_2param_udf());
session_context.register_udf(create_pg_table_is_visible());
session_context.register_udf(create_format_type_udf());

Ok(())
}
Loading