Skip to content

Commit

Permalink
Give the full server struct to the FDW when instanciating it.
Browse files Browse the repository at this point in the history
  • Loading branch information
remilapeyre committed Aug 5, 2024
1 parent 43d17dc commit 97c57bd
Show file tree
Hide file tree
Showing 16 changed files with 88 additions and 63 deletions.
27 changes: 25 additions & 2 deletions supabase-wrappers/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,40 @@
use std::collections::HashMap;
use std::ffi::CStr;

use crate::prelude::*;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::*;

pub struct ForeignServer {
pub server_name: String,
pub server_type: String,
pub server_version: String,
pub options: HashMap<String, String>
}

// create a fdw instance from its id
pub(super) unsafe fn create_fdw_instance_from_server_id<
E: Into<ErrorReport>,
W: ForeignDataWrapper<E>,
>(
fserver_id: pg_sys::Oid,
) -> W {
let to_string = |raw: *mut std::ffi::c_char| -> String {
let c_str = CStr::from_ptr(raw);
c_str.to_str().map_err(|_| {
OptionsError::OptionValueIsInvalidUtf8(
String::from_utf8_lossy(c_str.to_bytes()).to_string(),
)
}).report_unwrap().to_string()
};
let fserver = pg_sys::GetForeignServer(fserver_id);
let fserver_opts = options_to_hashmap((*fserver).options).report_unwrap();
let wrapper = W::new(&fserver_opts);
let server = ForeignServer {
server_name: to_string((*fserver).servername),
server_type: to_string((*fserver).servertype),
server_version: to_string((*fserver).serverversion),
options: options_to_hashmap((*fserver).options).report_unwrap(),
};
let wrapper = W::new(server);
wrapper.report_unwrap()
}

Expand Down
3 changes: 2 additions & 1 deletion supabase-wrappers/src/interface.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Provides interface types and trait to develop Postgres foreign data wrapper
//!
use crate::instance::ForeignServer;
use crate::FdwRoutine;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::{Date, Timestamp, TimestampWithTimeZone};
Expand Down Expand Up @@ -502,7 +503,7 @@ pub trait ForeignDataWrapper<E: Into<ErrorReport>> {
/// You can do any initalization in this function, like saving connection
/// info or API url in an variable, but don't do heavy works like database
/// connection or API call.
fn new(options: &HashMap<String, String>) -> Result<Self, E>
fn new(server: ForeignServer) -> Result<Self, E>
where
Self: Sized;

Expand Down
1 change: 1 addition & 0 deletions supabase-wrappers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ pub mod utils;
/// The prelude includes all necessary imports to make Wrappers work
pub mod prelude {
pub use crate::import_foreign_schema::*;
pub use crate::instance::ForeignServer;
pub use crate::interface::*;
pub use crate::options::*;
pub use crate::utils::*;
Expand Down
8 changes: 4 additions & 4 deletions wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ impl AirtableFdw {

// TODO Add support for INSERT, UPDATE, DELETE
impl ForeignDataWrapper<AirtableFdwError> for AirtableFdw {
fn new(options: &HashMap<String, String>) -> AirtableFdwResult<Self> {
let base_url = options
fn new(server: ForeignServer) -> AirtableFdwResult<Self> {
let base_url = server.options
.get("api_url")
.map(|t| t.to_owned())
.unwrap_or_else(|| "https://api.airtable.com/v0".to_string());

let client = match options.get("api_key") {
let client = match server.options.get("api_key") {
Some(api_key) => Some(create_client(api_key)?),
None => {
let key_id = require_option("api_key_id", options)?;
let key_id = require_option("api_key_id", &server.options)?;
if let Some(api_key) = get_vault_secret(key_id) {
Some(create_client(&api_key)?)
} else {
Expand Down
8 changes: 4 additions & 4 deletions wrappers/src/fdw/auth0_fdw/auth0_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ impl ForeignDataWrapper<Auth0FdwError> for Auth0Fdw {
// info or API url in an variable, but don't do any heavy works like making a
// database connection or API call.

fn new(options: &HashMap<String, String>) -> Result<Self, Auth0FdwError> {
let url = require_option("url", options)?.to_string();
let api_key = if let Some(api_key) = options.get("api_key") {
fn new(server: ForeignServer) -> Result<Self, Auth0FdwError> {
let url = require_option("url", &server.options)?.to_string();
let api_key = if let Some(api_key) = server.options.get("api_key") {
api_key.clone()
} else {
let api_key_id = options
let api_key_id = server.options
.get("api_key_id")
.expect("`api_key_id` must be set if `api_key` is not");
get_vault_secret(api_key_id).ok_or(Auth0FdwError::SecretNotFound(api_key_id.clone()))?
Expand Down
14 changes: 7 additions & 7 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ impl BigQueryFdw {
}

impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
fn new(options: &HashMap<String, String>) -> Result<Self, BigQueryFdwError> {
fn new(server: ForeignServer) -> Result<Self, BigQueryFdwError> {
let mut ret = BigQueryFdw {
rt: create_async_runtime()?,
client: None,
project_id: require_option("project_id", options)?.to_string(),
dataset_id: require_option("dataset_id", options)?.to_string(),
project_id: require_option("project_id", &server.options)?.to_string(),
dataset_id: require_option("dataset_id", &server.options)?.to_string(),
table: "".to_string(),
rowid_col: "".to_string(),
tgt_cols: Vec::new(),
Expand All @@ -160,13 +160,13 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
};

// Is authentication mocked
let mock_auth: bool = options
let mock_auth: bool = server.options
.get("mock_auth")
.map(|t| t.to_owned())
.unwrap_or_else(|| "false".to_string())
== *"true";

let api_endpoint = options
let api_endpoint = server.options
.get("api_endpoint")
.map(|t| t.to_owned())
.unwrap_or_else(|| "https://bigquery.googleapis.com/bigquery/v2".to_string());
Expand All @@ -182,10 +182,10 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
serde_json::to_string_pretty(&dummy_auth_config)
.expect("dummy auth config should not fail to serialize")
}
false => match options.get("sa_key") {
false => match server.options.get("sa_key") {
Some(sa_key) => sa_key.to_owned(),
None => {
let sa_key_id = require_option("sa_key_id", options)?;
let sa_key_id = require_option("sa_key_id", &server.options)?;
match get_vault_secret(sa_key_id) {
Some(sa_key) => sa_key,
None => return Ok(ret),
Expand Down
6 changes: 3 additions & 3 deletions wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ impl ClickHouseFdw {
}

impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {
fn new(options: &HashMap<String, String>) -> ClickHouseFdwResult<Self> {
fn new(server: ForeignServer) -> ClickHouseFdwResult<Self> {
let rt = create_async_runtime()?;
let conn_str = match options.get("conn_string") {
let conn_str = match server.options.get("conn_string") {
Some(conn_str) => conn_str.to_owned(),
None => {
let conn_str_id = require_option("conn_string_id", options)?;
let conn_str_id = require_option("conn_string_id", &server.options)?;
get_vault_secret(conn_str_id).unwrap_or_default()
}
};
Expand Down
14 changes: 7 additions & 7 deletions wrappers/src/fdw/cognito_fdw/cognito_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,16 @@ impl ForeignDataWrapper<CognitoFdwError> for CognitoFdw {
// info or API url in an variable, but don't do any heavy works like making a
// database connection or API call.

fn new(options: &HashMap<String, String>) -> Result<Self, CognitoFdwError> {
let user_pool_id = require_option("user_pool_id", options)?.to_string();
let aws_region = require_option("region", options)?.to_string();
fn new(server: ForeignServer) -> Result<Self, CognitoFdwError> {
let user_pool_id = require_option("user_pool_id", &server.options)?.to_string();
let aws_region = require_option("region", &server.options)?.to_string();

let aws_access_key_id = require_option("aws_access_key_id", options)?.to_string();
let aws_access_key_id = require_option("aws_access_key_id", &server.options)?.to_string();
let aws_secret_access_key =
if let Some(aws_secret_access_key) = options.get("aws_secret_access_key") {
if let Some(aws_secret_access_key) = server.options.get("aws_secret_access_key") {
aws_secret_access_key.clone()
} else {
let aws_secret_access_key = options
let aws_secret_access_key = server.options
.get("api_key_id")
.expect("`api_key_id` must be set if `aws_secret_access_key` is not");
get_vault_secret(aws_secret_access_key).ok_or(CognitoFdwError::SecretNotFound(
Expand All @@ -122,7 +122,7 @@ impl ForeignDataWrapper<CognitoFdwError> for CognitoFdw {
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;

let mut builder = config.to_builder();
if let Some(endpoint_url) = options.get("endpoint_url") {
if let Some(endpoint_url) = server.options.get("endpoint_url") {
if !endpoint_url.is_empty() {
builder.set_endpoint_url(Some(endpoint_url.clone()));
}
Expand Down
10 changes: 5 additions & 5 deletions wrappers/src/fdw/firebase_fdw/firebase_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,23 +211,23 @@ impl FirebaseFdw {
}

impl ForeignDataWrapper<FirebaseFdwError> for FirebaseFdw {
fn new(options: &HashMap<String, String>) -> FirebaseFdwResult<Self> {
fn new(server: ForeignServer) -> FirebaseFdwResult<Self> {
let mut ret = Self {
rt: create_async_runtime()?,
project_id: require_option("project_id", options)?.to_string(),
project_id: require_option("project_id", &server.options)?.to_string(),
client: None,
scan_result: Vec::default(),
};

// get oauth2 access token if it is directly defined in options
let token = if let Some(access_token) = options.get("access_token") {
let token = if let Some(access_token) = server.options.get("access_token") {
access_token.to_owned()
} else {
// otherwise, get it from the options or Vault
let sa_key = match options.get("sa_key") {
let sa_key = match server.options.get("sa_key") {
Some(sa_key) => sa_key.to_owned(),
None => {
let sa_key_id = require_option("sa_key_id", options)?;
let sa_key_id = require_option("sa_key_id", &server.options)?;
match get_vault_secret(sa_key_id) {
Some(sa_key) => sa_key,
None => return Ok(ret),
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/fdw/helloworld_fdw/helloworld_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl ForeignDataWrapper<HelloWorldFdwError> for HelloWorldFdw {
// You can do any initalization in this new() function, like saving connection
// info or API url in an variable, but don't do any heavy works like making a
// database connection or API call.
fn new(_options: &HashMap<String, String>) -> HelloWorldFdwResult<Self> {
fn new(_server: ForeignServer) -> HelloWorldFdwResult<Self> {
Ok(Self {
row_cnt: 0,
tgt_cols: Vec::new(),
Expand Down
8 changes: 4 additions & 4 deletions wrappers/src/fdw/logflare_fdw/logflare_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ impl LogflareFdw {
}

impl ForeignDataWrapper<LogflareFdwError> for LogflareFdw {
fn new(options: &HashMap<String, String>) -> LogflareFdwResult<Self> {
let base_url = options
fn new(server: ForeignServer) -> LogflareFdwResult<Self> {
let base_url = server.options
.get("api_url")
.map(|t| t.to_owned())
.map(|s| {
Expand All @@ -205,10 +205,10 @@ impl ForeignDataWrapper<LogflareFdwError> for LogflareFdw {
}
})
.unwrap_or_else(|| LogflareFdw::BASE_URL.to_string());
let client = match options.get("api_key") {
let client = match server.options.get("api_key") {
Some(api_key) => Some(create_client(api_key)),
None => {
let key_id = require_option("api_key_id", options)?;
let key_id = require_option("api_key_id", &server.options)?;
get_vault_secret(key_id).map(|api_key| create_client(&api_key))
}
}
Expand Down
6 changes: 3 additions & 3 deletions wrappers/src/fdw/mssql_fdw/mssql_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ impl MssqlFdw {
}

impl ForeignDataWrapper<MssqlFdwError> for MssqlFdw {
fn new(options: &HashMap<String, String>) -> MssqlFdwResult<Self> {
fn new(server: ForeignServer) -> MssqlFdwResult<Self> {
let rt = create_async_runtime()?;
let conn_str = match options.get("conn_string") {
let conn_str = match server.options.get("conn_string") {
Some(conn_str) => conn_str.to_owned(),
None => {
let conn_str_id = require_option("conn_string_id", options)?;
let conn_str_id = require_option("conn_string_id", &server.options)?;
get_vault_secret(conn_str_id).unwrap_or_default()
}
};
Expand Down
6 changes: 3 additions & 3 deletions wrappers/src/fdw/redis_fdw/redis_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ impl RedisFdw {
}

impl ForeignDataWrapper<RedisFdwError> for RedisFdw {
fn new(options: &HashMap<String, String>) -> RedisFdwResult<Self> {
let conn_url = match options.get("conn_url") {
fn new(server: ForeignServer) -> RedisFdwResult<Self> {
let conn_url = match server.options.get("conn_url") {
Some(url) => url.to_owned(),
None => {
let conn_url_id = require_option("conn_url_id", options)?;
let conn_url_id = require_option("conn_url_id", &server.options)?;
get_vault_secret(conn_url_id).unwrap_or_default()
}
};
Expand Down
16 changes: 8 additions & 8 deletions wrappers/src/fdw/s3_fdw/s3_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl S3Fdw {
}

impl ForeignDataWrapper<S3FdwError> for S3Fdw {
fn new(options: &HashMap<String, String>) -> S3FdwResult<Self> {
fn new(server: ForeignServer) -> S3FdwResult<Self> {
// cannot use create_async_runtime() as the runtime needs to be created
// for multiple threads
let rt = tokio::runtime::Runtime::new()
Expand All @@ -128,27 +128,27 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
};

// get is_mock flag
let is_mock: bool = options.get("is_mock") == Some(&"true".to_string());
let is_mock: bool = server.options.get("is_mock") == Some(&"true".to_string());

// get credentials
let creds = if is_mock {
// LocalStack uses hardcoded credentials
Some(("test".to_string(), "test".to_string()))
} else {
match options.get("vault_access_key_id") {
match server.options.get("vault_access_key_id") {
Some(vault_access_key_id) => {
// if using credentials stored in Vault
let vault_secret_access_key =
require_option("vault_secret_access_key", options)?;
require_option("vault_secret_access_key", &server.options)?;
get_vault_secret(vault_access_key_id)
.zip(get_vault_secret(vault_secret_access_key))
}
None => {
// if using credentials directly specified
let aws_access_key_id =
require_option("aws_access_key_id", options)?.to_string();
require_option("aws_access_key_id", &server.options)?.to_string();
let aws_secret_access_key =
require_option("aws_secret_access_key", options)?.to_string();
require_option("aws_secret_access_key", &server.options)?.to_string();
Some((aws_access_key_id, aws_secret_access_key))
}
}
Expand All @@ -163,7 +163,7 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
let region = if is_mock {
default_region
} else {
options
server.options
.get("aws_region")
.map(|t| t.to_owned())
.unwrap_or(default_region)
Expand All @@ -177,7 +177,7 @@ impl ForeignDataWrapper<S3FdwError> for S3Fdw {
let mut config_loader = aws_config::defaults(BehaviorVersion::latest());

// endpoint_url not supported as env var in rust https://github.com/awslabs/aws-sdk-rust/issues/932
if let Some(endpoint_url) = options.get("endpoint_url") {
if let Some(endpoint_url) = server.options.get("endpoint_url") {
config_loader = config_loader.endpoint_url(endpoint_url);
}

Expand Down
10 changes: 5 additions & 5 deletions wrappers/src/fdw/stripe_fdw/stripe_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,8 @@ impl StripeFdw {
}

impl ForeignDataWrapper<StripeFdwError> for StripeFdw {
fn new(options: &HashMap<String, String>) -> StripeFdwResult<Self> {
let base_url = options
fn new(server: ForeignServer) -> StripeFdwResult<Self> {
let base_url = server.options
.get("api_url")
.map(|t| t.to_owned())
// Ensure trailing slash is always present, otherwise /v1 will get obliterated when
Expand All @@ -634,11 +634,11 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {
}
})
.unwrap_or_else(|| "https://api.stripe.com/v1/".to_string());
let api_version = options.get("api_version").map(|t| t.as_str());
let client = match options.get("api_key") {
let api_version = server.options.get("api_version").map(|t| t.as_str());
let client = match server.options.get("api_key") {
Some(api_key) => Some(create_client(api_key, api_version)),
None => {
let key_id = require_option("api_key_id", options)?;
let key_id = require_option("api_key_id", &server.options)?;
get_vault_secret(key_id).map(|api_key| create_client(&api_key, api_version))
}
}
Expand Down
Loading

0 comments on commit 97c57bd

Please sign in to comment.