diff --git a/supabase-wrappers/src/instance.rs b/supabase-wrappers/src/instance.rs index 6ee800f2..61542fbf 100644 --- a/supabase-wrappers/src/instance.rs +++ b/supabase-wrappers/src/instance.rs @@ -1,16 +1,14 @@ use crate::prelude::*; -use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::*; -use super::utils; - // create a fdw instance pub(super) unsafe fn create_fdw_instance, W: ForeignDataWrapper>( ftable_id: pg_sys::Oid, ) -> W { let ftable = pg_sys::GetForeignTable(ftable_id); let fserver = pg_sys::GetForeignServer((*ftable).serverid); - let fserver_opts = utils::options_to_hashmap((*fserver).options); + let fserver_opts = options_to_hashmap((*fserver).options).report_unwrap(); let wrapper = W::new(&fserver_opts); - wrapper.map_err(|e| e.into()).report() + wrapper.report_unwrap() } diff --git a/supabase-wrappers/src/lib.rs b/supabase-wrappers/src/lib.rs index aef3d24c..c85a03fe 100644 --- a/supabase-wrappers/src/lib.rs +++ b/supabase-wrappers/src/lib.rs @@ -288,11 +288,13 @@ //! - [Logflare](https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/logflare_fdw): A FDW for [Logflare](https://logflare.app/) which supports data read only. pub mod interface; +pub mod options; pub mod utils; /// The prelude includes all necessary imports to make Wrappers work pub mod prelude { pub use crate::interface::*; + pub use crate::options::*; pub use crate::utils::*; pub use crate::wrappers_fdw; pub use ::tokio::runtime::Runtime; diff --git a/supabase-wrappers/src/modify.rs b/supabase-wrappers/src/modify.rs index fd90406b..130b5f4a 100644 --- a/supabase-wrappers/src/modify.rs +++ b/supabase-wrappers/src/modify.rs @@ -1,4 +1,4 @@ -use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::{ debug2, memcxt::PgMemoryContexts, pg_sys::Oid, prelude::*, rel::PgRelation, tupdesc::PgTupleDesc, FromDatum, PgSqlErrorCode, @@ -81,12 +81,8 @@ pub(super) extern "C" fn add_foreign_update_targets( unsafe { // get rowid column name from table options let ftable = pg_sys::GetForeignTable((*target_relation).rd_id); - let opts = utils::options_to_hashmap((*ftable).options); - let rowid_name = if let Some(name) = require_option("rowid_column", &opts) { - name - } else { - return; - }; + let opts = options_to_hashmap((*ftable).options).report_unwrap(); + let rowid_name = require_option("rowid_column", &opts).report_unwrap(); // find rowid attribute let tup_desc = PgTupleDesc::from_pg_copy((*target_relation).rd_att); @@ -139,7 +135,7 @@ pub(super) extern "C" fn plan_foreign_modify, W: ForeignDat // get rowid column name from table options let ftable = pg_sys::GetForeignTable(rel.oid()); - let opts = utils::options_to_hashmap((*ftable).options); + let opts = options_to_hashmap((*ftable).options).report_unwrap(); let rowid_name = opts.get("rowid_column"); if rowid_name.is_none() { report_error( @@ -208,7 +204,7 @@ pub(super) extern "C" fn begin_foreign_modify, W: ForeignDa state.rowid_attno = pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c); - state.begin_modify().map_err(|e| e.into()).report(); + state.begin_modify().report_unwrap(); (*rinfo).ri_FdwState = state.into_pg() as _; } @@ -228,7 +224,7 @@ pub(super) extern "C" fn exec_foreign_insert, W: ForeignDat ); let row = utils::tuple_table_slot_to_row(slot); - state.insert(&row).map_err(|e| e.into()).report(); + state.insert(&row).report_unwrap(); } slot @@ -258,7 +254,7 @@ pub(super) extern "C" fn exec_foreign_delete, W: ForeignDat let cell = get_rowid_cell(&state, plan_slot); if let Some(rowid) = cell { - state.delete(&rowid).map_err(|e| e.into()).report(); + state.delete(&rowid).report_unwrap(); } } @@ -292,10 +288,7 @@ pub(super) extern "C" fn exec_foreign_update, W: ForeignDat }) && state.rowid_name != col.as_str() }); - state - .update(&rowid, &new_row) - .map_err(|e| e.into()) - .report(); + state.update(&rowid, &new_row).report_unwrap(); } } @@ -312,7 +305,7 @@ pub(super) extern "C" fn end_foreign_modify, W: ForeignData let fdw_state = (*rinfo).ri_FdwState as *mut FdwModifyState; if !fdw_state.is_null() { let mut state = PgBox::>::from_pg(fdw_state); - state.end_modify().map_err(|e| e.into()).report(); + state.end_modify().report_unwrap(); } } } diff --git a/supabase-wrappers/src/options.rs b/supabase-wrappers/src/options.rs new file mode 100644 index 00000000..5ef417cf --- /dev/null +++ b/supabase-wrappers/src/options.rs @@ -0,0 +1,121 @@ +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::{pg_sys, PgList, PgSqlErrorCode}; +use std::collections::HashMap; +use std::ffi::CStr; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum OptionsError { + #[error("required option `{0}` is not specified")] + OptionNameNotFound(String), + #[error("option name `{0}` is not a valid UTF-8 string")] + OptionNameIsInvalidUtf8(String), + #[error("option value `{0}` is not a valid UTF-8 string")] + OptionValueIsInvalidUtf8(String), +} + +impl From for ErrorReport { + fn from(value: OptionsError) -> Self { + let error_message = format!("{value}"); + match value { + OptionsError::OptionNameNotFound(_) => ErrorReport::new( + PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, + error_message, + "", + ), + OptionsError::OptionNameIsInvalidUtf8(_) + | OptionsError::OptionValueIsInvalidUtf8(_) => ErrorReport::new( + PgSqlErrorCode::ERRCODE_FDW_INVALID_STRING_FORMAT, + error_message, + "", + ), + } + } +} + +/// Get required option value from the `options` map +/// +/// Get the required option's value from `options` map, return None and report +/// error and stop current transaction if it does not exist. +/// +/// For example, +/// +/// ```rust,no_run +/// # use supabase_wrappers::prelude::require_option; +/// # use std::collections::HashMap; +/// # use supabase_wrappers::options::OptionsError; +/// # fn main() -> Result<(), OptionsError> { +/// # let options = &HashMap::new(); +/// require_option("my_option", options)?; +/// # Ok(()) +/// # } +/// ``` +pub fn require_option<'map>( + opt_name: &str, + options: &'map HashMap, +) -> Result<&'map str, OptionsError> { + options + .get(opt_name) + .map(|t| t.as_ref()) + .ok_or(OptionsError::OptionNameNotFound(opt_name.to_string())) +} + +/// Get required option value from the `options` map or a provided default +/// +/// Get the required option's value from `options` map, return default if it does not exist. +/// +/// For example, +/// +/// ```rust,no_run +/// # use supabase_wrappers::prelude::require_option_or; +/// # use std::collections::HashMap; +/// # let options = &HashMap::new(); +/// require_option_or("my_option", options, "default value"); +/// ``` +pub fn require_option_or<'a>( + opt_name: &str, + options: &'a HashMap, + default: &'a str, +) -> &'a str { + options.get(opt_name).map(|t| t.as_ref()).unwrap_or(default) +} + +/// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator) +pub fn check_options_contain(opt_list: &[Option], tgt: &str) -> Result<(), OptionsError> { + let search_key = tgt.to_owned() + "="; + if !opt_list.iter().any(|opt| { + if let Some(s) = opt { + s.starts_with(&search_key) + } else { + false + } + }) { + Err(OptionsError::OptionNameNotFound(tgt.to_string())) + } else { + Ok(()) + } +} + +// convert options definition to hashmap +pub(super) unsafe fn options_to_hashmap( + options: *mut pg_sys::List, +) -> Result, OptionsError> { + let mut ret = HashMap::new(); + let options: PgList = PgList::from_pg(options); + for option in options.iter_ptr() { + let name = CStr::from_ptr((*option).defname); + let value = CStr::from_ptr(pg_sys::defGetString(option)); + let name = name.to_str().map_err(|_| { + OptionsError::OptionNameIsInvalidUtf8( + String::from_utf8_lossy(name.to_bytes()).to_string(), + ) + })?; + let value = value.to_str().map_err(|_| { + OptionsError::OptionValueIsInvalidUtf8( + String::from_utf8_lossy(value.to_bytes()).to_string(), + ) + })?; + ret.insert(name.to_string(), value.to_string()); + } + Ok(ret) +} diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index 431f6859..82a7fd8f 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -6,7 +6,7 @@ use pgrx::{ use std::collections::HashMap; use std::marker::PhantomData; -use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; +use pgrx::pg_sys::panic::ErrorReport; use std::os::raw::c_int; use std::ptr; @@ -14,11 +14,12 @@ use crate::instance; use crate::interface::{Cell, Column, Limit, Qual, Row, Sort, Value}; use crate::limit::*; use crate::memctx; +use crate::options::options_to_hashmap; use crate::polyfill; use crate::prelude::ForeignDataWrapper; use crate::qual::*; use crate::sort::*; -use crate::utils::{self, report_error, SerdeList}; +use crate::utils::{self, report_error, ReportableError, SerdeList}; // Fdw private state for scan struct FdwState, W: ForeignDataWrapper> { @@ -137,10 +138,10 @@ pub(super) extern "C" fn get_foreign_rel_size, W: ForeignDa // get foreign table options let ftable = pg_sys::GetForeignTable(foreigntableid); - state.opts = utils::options_to_hashmap((*ftable).options); + state.opts = options_to_hashmap((*ftable).options).report_unwrap(); // get estimate row count and mean row width - let (rows, width) = state.get_rel_size().map_err(|e| e.into()).report(); + let (rows, width) = state.get_rel_size().report_unwrap(); (*baserel).rows = rows as f64; (*(*baserel).reltarget).width = width; @@ -305,7 +306,7 @@ pub(super) extern "C" fn begin_foreign_scan, W: ForeignData // begin scan if it is not EXPLAIN statement if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 { - state.begin_scan().map_err(|e| e.into()).report(); + state.begin_scan().report_unwrap(); let rel = scan_state.ss_currentRelation; let tup_desc = (*rel).rd_att; @@ -336,7 +337,7 @@ pub(super) extern "C" fn iterate_foreign_scan, W: ForeignDa polyfill::exec_clear_tuple(slot); state.row.clear(); - if state.iter_scan().map_err(|e| e.into()).report().is_some() { + if state.iter_scan().report_unwrap().is_some() { if state.row.cols.len() != state.tgts.len() { report_error( PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER, @@ -375,7 +376,7 @@ pub(super) extern "C" fn re_scan_foreign_scan, W: ForeignDa let fdw_state = (*node).fdw_state as *mut FdwState; if !fdw_state.is_null() { let mut state = PgBox::>::from_pg(fdw_state); - state.re_scan().map_err(|e| e.into()).report(); + state.re_scan().report_unwrap(); } } } @@ -392,6 +393,6 @@ pub(super) extern "C" fn end_foreign_scan, W: ForeignDataWr } let mut state = PgBox::>::from_pg(fdw_state); - state.end_scan().map_err(|e| e.into()).report(); + state.end_scan().report_unwrap(); } } diff --git a/supabase-wrappers/src/utils.rs b/supabase-wrappers/src/utils.rs index 5abc6c77..d5a6abf0 100644 --- a/supabase-wrappers/src/utils.rs +++ b/supabase-wrappers/src/utils.rs @@ -2,12 +2,11 @@ //! use crate::interface::{Cell, Column, Row}; -use pgrx::pg_sys::panic::ErrorReport; +use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; use pgrx::prelude::PgBuiltInOids; use pgrx::spi::Spi; use pgrx::IntoDatum; use pgrx::*; -use std::collections::HashMap; use std::ffi::CStr; use std::num::NonZeroUsize; use std::ptr; @@ -155,52 +154,6 @@ pub fn create_async_runtime() -> Result { Ok(Builder::new_current_thread().enable_all().build()?) } -/// Get required option value from the `options` map -/// -/// Get the required option's value from `options` map, return None and report -/// error and stop current transaction if it does not exist. -/// -/// For example, -/// -/// ```rust,no_run -/// # use supabase_wrappers::prelude::require_option; -/// # use std::collections::HashMap; -/// # let options = &HashMap::new(); -/// require_option("my_option", options); -/// ``` -pub fn require_option(opt_name: &str, options: &HashMap) -> Option { - options.get(opt_name).map(|t| t.to_owned()).or_else(|| { - report_error( - PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, - &format!("required option \"{}\" is not specified", opt_name), - ); - None - }) -} - -/// Get required option value from the `options` map or a provided default -/// -/// Get the required option's value from `options` map, return default if it does not exist. -/// -/// For example, -/// -/// ```rust,no_run -/// # use supabase_wrappers::prelude::require_option_or; -/// # use std::collections::HashMap; -/// # let options = &HashMap::new(); -/// require_option_or("my_option", options, "default value".to_string()); -/// ``` -pub fn require_option_or( - opt_name: &str, - options: &HashMap, - default: String, -) -> String { - options - .get(opt_name) - .map(|t| t.to_owned()) - .unwrap_or(default) -} - /// Get decrypted secret from Vault /// /// Get decrypted secret as string from Vault. Vault is an extension for storing @@ -236,21 +189,6 @@ pub fn get_vault_secret(secret_id: &str) -> Option { } } -// convert options definition to hashmap -pub(super) unsafe fn options_to_hashmap(options: *mut pg_sys::List) -> HashMap { - let mut ret = HashMap::new(); - let options: PgList = PgList::from_pg(options); - for option in options.iter_ptr() { - let name = CStr::from_ptr((*option).defname); - let value = CStr::from_ptr(pg_sys::defGetString(option)); - ret.insert( - name.to_str().unwrap().to_owned(), - value.to_str().unwrap().to_owned(), - ); - } - ret -} - pub(super) unsafe fn tuple_table_slot_to_row(slot: *mut pg_sys::TupleTableSlot) -> Row { let tup_desc = PgTupleDesc::from_pg_copy((*slot).tts_tupleDescriptor); @@ -327,23 +265,6 @@ pub(super) unsafe fn extract_target_columns( ret } -/// Check if the option list contains a specific option, used in [validator](crate::interface::ForeignDataWrapper::validator) -pub fn check_options_contain(opt_list: &[Option], tgt: &str) { - let search_key = tgt.to_owned() + "="; - if !opt_list.iter().any(|opt| { - if let Some(s) = opt { - s.starts_with(&search_key) - } else { - false - } - }) { - report_error( - PgSqlErrorCode::ERRCODE_FDW_OPTION_NAME_NOT_FOUND, - &format!("required option \"{}\" is not specified", tgt), - ); - } -} - // trait for "serialize" and "deserialize" state from specified memory context, // so that it is safe to be carried between the planning and the execution pub(super) trait SerdeList { @@ -386,3 +307,17 @@ pub(super) trait SerdeList { PgBox::::from_pg(ptr as _) } } + +pub(crate) trait ReportableError { + type Output; + + fn report_unwrap(self) -> Self::Output; +} + +impl> ReportableError for Result { + type Output = T; + + fn report_unwrap(self) -> Self::Output { + self.map_err(|e| e.into()).report() + } +} diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index 8e7cf8ba..90de9c8d 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -24,7 +24,7 @@ firebase_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", s3_fdw = [ "reqwest", "reqwest-middleware", "reqwest-retry", "aws-config", "aws-sdk-s3", "tokio", "tokio-util", "csv", "async-compression", "serde_json", - "http", "parquet", "futures", "arrow-array", "chrono" + "http", "parquet", "futures", "arrow-array", "chrono", "thiserror" ] airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"] logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror"] diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index 613dda63..c1d97ca1 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use url::Url; use supabase_wrappers::prelude::*; +use thiserror::Error; use super::result::AirtableResponse; use super::{AirtableFdwError, AirtableFdwResult}; @@ -98,9 +99,10 @@ impl ForeignDataWrapper for AirtableFdw { let client = match options.get("api_key") { Some(api_key) => Some(create_client(api_key)), - None => require_option("api_key_id", options) - .and_then(|key_id| get_vault_secret(&key_id)) - .map(|api_key| create_client(&api_key)), + None => { + let key_id = require_option("api_key_id", options)?; + get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); @@ -121,14 +123,10 @@ impl ForeignDataWrapper for AirtableFdw { _limit: &Option, // TODO: maxRecords options: &HashMap, ) -> AirtableFdwResult<()> { - let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| { - require_option("table_id", options) - .map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id"))) - }) { - url - } else { - return Ok(()); - }; + let base_id = require_option("base_id", options)?; + let table_id = require_option("table_id", options)?; + let view_id = options.get("view_id"); + let url = self.build_url(&base_id, &table_id, view_id); let mut rows = Vec::new(); if let Some(client) = &self.client { @@ -189,8 +187,8 @@ impl ForeignDataWrapper for AirtableFdw { ) -> AirtableFdwResult<()> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "base_id"); - check_options_contain(&options, "table_id"); + check_options_contain(&options, "base_id")?; + check_options_contain(&options, "table_id")?; } } diff --git a/wrappers/src/fdw/airtable_fdw/mod.rs b/wrappers/src/fdw/airtable_fdw/mod.rs index 5a571e28..366a8ad6 100644 --- a/wrappers/src/fdw/airtable_fdw/mod.rs +++ b/wrappers/src/fdw/airtable_fdw/mod.rs @@ -6,7 +6,7 @@ use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgSqlErrorCode; use thiserror::Error; -use supabase_wrappers::prelude::CreateRuntimeError; +use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError}; #[derive(Error, Debug)] enum AirtableFdwError { @@ -24,11 +24,18 @@ enum AirtableFdwError { #[error("request failed: {0}")] RequestError(#[from] reqwest_middleware::Error), + + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: AirtableFdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "") + match value { + AirtableFdwError::CreateRuntimeError(e) => e.into(), + AirtableFdwError::OptionsError(e) => e.into(), + _ => ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), ""), + } } } diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index 6987fa70..56dea395 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -165,12 +165,15 @@ impl BigQueryFdw { enum BigQueryFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: BigQueryFdwError) -> Self { match value { BigQueryFdwError::CreateRuntimeError(e) => e.into(), + BigQueryFdwError::OptionsError(e) => e.into(), } } } @@ -180,8 +183,8 @@ impl ForeignDataWrapper for BigQueryFdw { let mut ret = BigQueryFdw { rt: create_async_runtime()?, client: None, - project_id: "".to_string(), - dataset_id: "".to_string(), + project_id: require_option("project_id", options)?.to_string(), + dataset_id: require_option("dataset_id", options)?.to_string(), table: "".to_string(), rowid_col: "".to_string(), tgt_cols: Vec::new(), @@ -189,15 +192,6 @@ impl ForeignDataWrapper for BigQueryFdw { auth_mock: None, }; - let project_id = require_option("project_id", options); - let dataset_id = require_option("dataset_id", options); - - if project_id.is_none() || dataset_id.is_none() { - return Ok(ret); - } - ret.project_id = project_id.unwrap(); - ret.dataset_id = dataset_id.unwrap(); - // Is authentication mocked let mock_auth: bool = options .get("mock_auth") @@ -223,10 +217,7 @@ impl ForeignDataWrapper for BigQueryFdw { false => match options.get("sa_key") { Some(sa_key) => sa_key.to_owned(), None => { - let sa_key_id = match require_option("sa_key_id", options) { - Some(sa_key_id) => sa_key_id, - None => return Ok(ret), - }; + let sa_key_id = require_option("sa_key_id", options)?; match get_vault_secret(&sa_key_id) { Some(sa_key) => sa_key, None => return Ok(ret), @@ -285,11 +276,7 @@ impl ForeignDataWrapper for BigQueryFdw { limit: &Option, options: &HashMap, ) -> Result<(), BigQueryFdwError> { - let table = require_option("table", options); - if table.is_none() { - return Ok(()); - } - self.table = table.unwrap(); + self.table = require_option("table", options)?.to_string(); self.tgt_cols = columns.to_vec(); let location = options @@ -432,13 +419,8 @@ impl ForeignDataWrapper for BigQueryFdw { } fn begin_modify(&mut self, options: &HashMap) -> Result<(), BigQueryFdwError> { - let table = require_option("table", options); - let rowid_col = require_option("rowid_column", options); - if table.is_none() || rowid_col.is_none() { - return Ok(()); - } - self.table = table.unwrap(); - self.rowid_col = rowid_col.unwrap(); + self.table = require_option("table", options)?.to_string(); + self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index 9d23f0ac..3417c478 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -203,12 +203,15 @@ impl ClickHouseFdw { enum ClickHouseFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: ClickHouseFdwError) -> Self { match value { ClickHouseFdwError::CreateRuntimeError(e) => e.into(), + ClickHouseFdwError::OptionsError(e) => e.into(), } } } @@ -218,9 +221,10 @@ impl ForeignDataWrapper for ClickHouseFdw { let rt = create_async_runtime()?; let conn_str = match options.get("conn_string") { Some(conn_str) => conn_str.to_owned(), - None => require_option("conn_string_id", options) - .and_then(|conn_str_id| get_vault_secret(&conn_str_id)) - .unwrap_or_default(), + None => { + let conn_str_id = require_option("conn_string_id", options)?; + get_vault_secret(&conn_str_id).unwrap_or_default() + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); @@ -248,11 +252,7 @@ impl ForeignDataWrapper for ClickHouseFdw { ) -> Result<(), ClickHouseFdwError> { self.create_client(); - let table = require_option("table", options); - if table.is_none() { - return Ok(()); - } - self.table = table.unwrap(); + self.table = require_option("table", options)?.to_string(); self.tgt_cols = columns.to_vec(); self.row_idx = 0; @@ -329,13 +329,8 @@ impl ForeignDataWrapper for ClickHouseFdw { ) -> Result<(), ClickHouseFdwError> { self.create_client(); - let table = require_option("table", options); - let rowid_col = require_option("rowid_column", options); - if table.is_none() || rowid_col.is_none() { - return Ok(()); - } - self.table = table.unwrap(); - self.rowid_col = rowid_col.unwrap(); + self.table = require_option("table", options)?.to_string(); + self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index bbe007d1..b563a108 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -223,11 +223,8 @@ impl FirebaseFdw { // ref: https://firebase.google.com/docs/firestore/reference/rest/v1beta1/projects.databases.documents/listDocuments let re = Regex::new(r"^firestore/(?P[^/]+)").unwrap(); if let Some(caps) = re.captures(obj) { - let base_url = require_option_or( - "base_url", - options, - Self::DEFAULT_FIRESTORE_BASE_URL.to_owned(), - ); + let base_url = + require_option_or("base_url", options, Self::DEFAULT_FIRESTORE_BASE_URL); let collection = caps.name("collection").unwrap().as_str(); let mut ret = format!( "{}/{}/databases/(default)/documents/{}?pageSize={}", @@ -252,12 +249,15 @@ impl FirebaseFdw { enum FirebaseFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: FirebaseFdwError) -> Self { match value { FirebaseFdwError::CreateRuntimeError(e) => e.into(), + FirebaseFdwError::OptionsError(e) => e.into(), } } } @@ -266,16 +266,11 @@ impl ForeignDataWrapper for FirebaseFdw { fn new(options: &HashMap) -> Result { let mut ret = Self { rt: create_async_runtime()?, - project_id: "".to_string(), + project_id: require_option("project_id", options)?.to_string(), client: None, scan_result: None, }; - ret.project_id = match require_option("project_id", options) { - Some(project_id) => project_id, - None => return Ok(ret), - }; - // get oauth2 access token if it is directly defined in options let token = if let Some(access_token) = options.get("access_token") { access_token.to_owned() @@ -284,10 +279,7 @@ impl ForeignDataWrapper for FirebaseFdw { let sa_key = match options.get("sa_key") { Some(sa_key) => sa_key.to_owned(), None => { - let sa_key_id = match require_option("sa_key_id", options) { - Some(sa_key_id) => sa_key_id, - None => return Ok(ret), - }; + let sa_key_id = require_option("sa_key_id", options)?; match get_vault_secret(&sa_key_id) { Some(sa_key) => sa_key, None => return Ok(ret), @@ -330,10 +322,7 @@ impl ForeignDataWrapper for FirebaseFdw { _limit: &Option, options: &HashMap, ) -> Result<(), FirebaseFdwError> { - let obj = match require_option("object", options) { - Some(obj) => obj, - None => return Ok(()), - }; + let obj = require_option("object", options)?; let row_cnt_limit = options .get("limit") .map(|n| n.parse::().unwrap()) @@ -418,7 +407,7 @@ impl ForeignDataWrapper for FirebaseFdw { ) -> Result<(), FirebaseFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "object"); + check_options_contain(&options, "object")?; } } diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 38b8e4e8..5e6647e3 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -220,12 +220,15 @@ impl LogflareFdw { enum LogflareFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: LogflareFdwError) -> Self { match value { LogflareFdwError::CreateRuntimeError(e) => e.into(), + LogflareFdwError::OptionsError(e) => e.into(), } } } @@ -245,9 +248,10 @@ impl ForeignDataWrapper for LogflareFdw { .unwrap_or_else(|| LogflareFdw::BASE_URL.to_string()); let client = match options.get("api_key") { Some(api_key) => Some(create_client(api_key)), - None => require_option("api_key_id", options) - .and_then(|key_id| get_vault_secret(&key_id)) - .map(|api_key| create_client(&api_key)), + None => { + let key_id = require_option("api_key_id", options)?; + get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); @@ -269,11 +273,7 @@ impl ForeignDataWrapper for LogflareFdw { _limit: &Option, options: &HashMap, ) -> Result<(), LogflareFdwError> { - let endpoint = if let Some(name) = require_option("endpoint", options) { - name - } else { - return Ok(()); - }; + let endpoint = require_option("endpoint", options)?; // extract params self.params = if let Some(params) = extract_params(quals) { @@ -356,7 +356,7 @@ impl ForeignDataWrapper for LogflareFdw { ) -> Result<(), LogflareFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "endpoint"); + check_options_contain(&options, "endpoint")?; } } diff --git a/wrappers/src/fdw/s3_fdw/s3_fdw.rs b/wrappers/src/fdw/s3_fdw/s3_fdw.rs index dc88313d..5cdcf4c4 100644 --- a/wrappers/src/fdw/s3_fdw/s3_fdw.rs +++ b/wrappers/src/fdw/s3_fdw/s3_fdw.rs @@ -10,6 +10,7 @@ use std::collections::{HashMap, VecDeque}; use std::env; use std::io::Cursor; use std::pin::Pin; +use thiserror::Error; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; use super::parquet::*; @@ -127,11 +128,17 @@ impl S3Fdw { } } -enum S3FdwError {} +#[derive(Error, Debug)] +enum S3FdwError { + #[error("{0}")] + OptionsError(#[from] OptionsError), +} impl From for ErrorReport { - fn from(_value: S3FdwError) -> Self { - ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + fn from(value: S3FdwError) -> Self { + match value { + S3FdwError::OptionsError(e) => e.into(), + } } } @@ -159,17 +166,18 @@ impl ForeignDataWrapper for S3Fdw { match options.get("vault_access_key_id") { Some(vault_access_key_id) => { // if using credentials stored in Vault - require_option("vault_secret_access_key", options).and_then( - |vault_secret_access_key| { - get_vault_secret(vault_access_key_id) - .zip(get_vault_secret(&vault_secret_access_key)) - }, - ) + let vault_secret_access_key = + require_option("vault_secret_access_key", options)?; + get_vault_secret(vault_access_key_id) + .zip(get_vault_secret(&vault_secret_access_key)) } None => { // if using credentials directly specified - require_option("aws_access_key_id", options) - .zip(require_option("aws_secret_access_key", options)) + let aws_access_key_id = + require_option("aws_access_key_id", options)?.to_string(); + let aws_secret_access_key = + require_option("aws_secret_access_key", options)?.to_string(); + Some((aws_access_key_id, aws_secret_access_key)) } } }; @@ -221,7 +229,8 @@ impl ForeignDataWrapper for S3Fdw { options: &HashMap, ) -> Result<(), S3FdwError> { // extract s3 bucket and object path from uri option - let (bucket, object) = if let Some(uri) = require_option("uri", options) { + let (bucket, object) = { + let uri = require_option("uri", options)?; match uri.parse::() { Ok(uri) => { if uri.scheme_str() != Option::Some("s3") @@ -245,8 +254,6 @@ impl ForeignDataWrapper for S3Fdw { return Ok(()); } } - } else { - return Ok(()); }; let has_header: bool = options.get("has_header") == Some(&"true".to_string()); @@ -255,28 +262,23 @@ impl ForeignDataWrapper for S3Fdw { if let Some(client) = &self.client { // initialise parser according to format option - if let Some(format) = require_option("format", options) { - // create dummy parser - match format.as_str() { - "csv" => { - self.parser = Parser::Csv(csv::Reader::from_reader(Cursor::new(vec![0]))) - } - "jsonl" => self.parser = Parser::JsonLine(VecDeque::new()), - "parquet" => self.parser = Parser::Parquet(S3Parquet::default()), - _ => { - report_error( - PgSqlErrorCode::ERRCODE_FDW_ERROR, - &format!( - "invalid format option: {}, it can only be 'csv', 'jsonl' or 'parquet'", - format - ), - ); - return Ok(()); - } + let format = require_option("format", options)?; + // create dummy parser + match format { + "csv" => self.parser = Parser::Csv(csv::Reader::from_reader(Cursor::new(vec![0]))), + "jsonl" => self.parser = Parser::JsonLine(VecDeque::new()), + "parquet" => self.parser = Parser::Parquet(S3Parquet::default()), + _ => { + report_error( + PgSqlErrorCode::ERRCODE_FDW_ERROR, + &format!( + "invalid format option: {}, it can only be 'csv', 'jsonl' or 'parquet'", + format + ), + ); + return Ok(()); } - } else { - return Ok(()); - }; + } let stream = match self .rt @@ -463,8 +465,8 @@ impl ForeignDataWrapper for S3Fdw { ) -> Result<(), S3FdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "uri"); - check_options_contain(&options, "format"); + check_options_contain(&options, "uri")?; + check_options_contain(&options, "format")?; } } diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index 59972834..0785622f 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -631,12 +631,15 @@ impl StripeFdw { enum StripeFdwError { #[error("{0}")] CreateRuntimeError(#[from] CreateRuntimeError), + #[error("{0}")] + OptionsError(#[from] OptionsError), } impl From for ErrorReport { fn from(value: StripeFdwError) -> Self { match value { StripeFdwError::CreateRuntimeError(e) => e.into(), + StripeFdwError::OptionsError(e) => e.into(), } } } @@ -658,9 +661,10 @@ impl ForeignDataWrapper for StripeFdw { .unwrap_or_else(|| "https://api.stripe.com/v1/".to_string()); let client = match options.get("api_key") { Some(api_key) => Some(create_client(api_key)), - None => require_option("api_key_id", options) - .and_then(|key_id| get_vault_secret(&key_id)) - .map(|api_key| create_client(&api_key)), + None => { + let key_id = require_option("api_key_id", options)?; + get_vault_secret(&key_id).map(|api_key| create_client(&api_key)) + } }; stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); @@ -683,11 +687,7 @@ impl ForeignDataWrapper for StripeFdw { limit: &Option, options: &HashMap, ) -> Result<(), StripeFdwError> { - let obj = if let Some(name) = require_option("object", options) { - name - } else { - return Ok(()); - }; + let obj = require_option("object", options)?; if let Some(client) = &self.client { let page_size = 100; // maximum page size limit for Stripe API @@ -792,8 +792,8 @@ impl ForeignDataWrapper for StripeFdw { } fn begin_modify(&mut self, options: &HashMap) -> Result<(), StripeFdwError> { - self.obj = require_option("object", options).unwrap_or_default(); - self.rowid_col = require_option("rowid_column", options).unwrap_or_default(); + self.obj = require_option("object", options)?.to_string(); + self.rowid_col = require_option("rowid_column", options)?.to_string(); Ok(()) } @@ -945,7 +945,7 @@ impl ForeignDataWrapper for StripeFdw { ) -> Result<(), StripeFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "object"); + check_options_contain(&options, "object")?; } }