diff --git a/supabase-wrappers-macros/src/lib.rs b/supabase-wrappers-macros/src/lib.rs index 48a3a02a..f1a3f600 100644 --- a/supabase-wrappers-macros/src/lib.rs +++ b/supabase-wrappers-macros/src/lib.rs @@ -41,16 +41,30 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream { let mut metas = TokenStream2::new(); let meta_attrs: Punctuated = parse_macro_input!(attr with Punctuated::parse_terminated); + let mut error_type: Option = None; for attr in meta_attrs { let name = format!("{}", attr.path.segments.first().unwrap().ident); if let Lit::Str(val) = attr.lit { let value = val.value(); - metas.append_all(quote! { - meta.insert(#name.to_owned(), #value.to_owned()); - }); + if name == "version" || name == "author" || name == "website" { + metas.append_all(quote! { + meta.insert(#name.to_owned(), #value.to_owned()); + }); + } else if name == "error_type" { + error_type = Some(value); + } } } + let error_type_ident = if let Some(error_type) = error_type { + format_ident!("{}", error_type) + } else { + let quoted = quote! { + compile_error!("Missing `error_type` in the `wrappers_fdw` attribute"); + }; + return quoted.into(); + }; + let item: ItemStruct = parse_macro_input!(item as ItemStruct); let item_tokens = item.to_token_stream(); let ident = item.ident; @@ -68,6 +82,7 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream { mod #module_ident { use super::#ident; use std::collections::HashMap; + use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; use pgrx::prelude::*; use supabase_wrappers::prelude::*; @@ -79,6 +94,8 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream { #[pg_extern(create_or_replace)] fn #fn_validator_ident(options: Vec>, catalog: Option) { #ident::validator(options, catalog) + .map_err(|e| >::into(e)) + .report(); } #[pg_extern(create_or_replace)] diff --git a/supabase-wrappers/src/instance.rs b/supabase-wrappers/src/instance.rs index ef89da08..6ee800f2 100644 --- a/supabase-wrappers/src/instance.rs +++ b/supabase-wrappers/src/instance.rs @@ -1,12 +1,16 @@ use crate::prelude::*; +use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; use pgrx::prelude::*; use super::utils; // create a fdw instance -pub(super) unsafe fn create_fdw_instance(ftable_id: pg_sys::Oid) -> W { +pub(super) unsafe fn create_fdw_instance, W: ForeignDataWrapper>( + ftable_id: pg_sys::Oid, +) -> W { let ftable = pg_sys::GetForeignTable(ftable_id); let fserver = pg_sys::GetForeignServer((*ftable).serverid); let fserver_opts = utils::options_to_hashmap((*fserver).options); - W::new(&fserver_opts) + let wrapper = W::new(&fserver_opts); + wrapper.map_err(|e| e.into()).report() } diff --git a/supabase-wrappers/src/interface.rs b/supabase-wrappers/src/interface.rs index 96633d85..99e742c5 100644 --- a/supabase-wrappers/src/interface.rs +++ b/supabase-wrappers/src/interface.rs @@ -2,6 +2,7 @@ //! use crate::FdwRoutine; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::{Date, Timestamp}; use pgrx::{ fcinfo, @@ -454,7 +455,7 @@ impl Limit { /// /// See the module-level document for more details. /// -pub trait ForeignDataWrapper { +pub trait ForeignDataWrapper> { /// Create a FDW instance /// /// `options` is the key-value pairs defined in `CREATE SERVER` SQL. For example, @@ -472,7 +473,9 @@ pub trait ForeignDataWrapper { /// You can do any initalization in this function, like saving connection /// info or API url in an variable, but don't do heavy works like database /// connection or API call. - fn new(options: &HashMap) -> Self; + fn new(options: &HashMap) -> Result + where + Self: Sized; /// Obtain relation size estimates for a foreign table /// @@ -487,8 +490,8 @@ pub trait ForeignDataWrapper { _sorts: &[Sort], _limit: &Option, _options: &HashMap, - ) -> (i64, i32) { - (0, 0) + ) -> Result<(i64, i32), E> { + Ok((0, 0)) } /// Called when begin executing a foreign scan @@ -507,24 +510,26 @@ pub trait ForeignDataWrapper { sorts: &[Sort], limit: &Option, options: &HashMap, - ); + ) -> Result<(), E>; /// Called when fetch one row from the foreign source /// /// FDW must save fetched foreign data into the [`Row`], or return `None` if no more rows to read. /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN). - fn iter_scan(&mut self, row: &mut Row) -> Option<()>; + fn iter_scan(&mut self, row: &mut Row) -> Result, E>; /// Called when restart the scan from the beginning. /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN). - fn re_scan(&mut self) {} + fn re_scan(&mut self) -> Result<(), E> { + Ok(()) + } /// Called when end the scan /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN). - fn end_scan(&mut self); + fn end_scan(&mut self) -> Result<(), E>; /// Called when begin executing a foreign table modification operation. /// @@ -548,14 +553,18 @@ pub trait ForeignDataWrapper { /// ``` /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE). - fn begin_modify(&mut self, _options: &HashMap) {} + fn begin_modify(&mut self, _options: &HashMap) -> Result<(), E> { + Ok(()) + } /// Called when insert one row into the foreign table /// /// - row - the new row to be inserted /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE). - fn insert(&mut self, _row: &Row) {} + fn insert(&mut self, _row: &Row) -> Result<(), E> { + Ok(()) + } /// Called when update one row into the foreign table /// @@ -563,19 +572,25 @@ pub trait ForeignDataWrapper { /// - new_row - the new row with updated cells /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE). - fn update(&mut self, _rowid: &Cell, _new_row: &Row) {} + fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), E> { + Ok(()) + } /// Called when delete one row into the foreign table /// /// - rowid - the `rowid_column` cell /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE). - fn delete(&mut self, _rowid: &Cell) {} + fn delete(&mut self, _rowid: &Cell) -> Result<(), E> { + Ok(()) + } /// Called when end the table update /// /// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE). - fn end_modify(&mut self) {} + fn end_modify(&mut self) -> Result<(), E> { + Ok(()) + } /// Returns a FdwRoutine for the FDW /// @@ -590,25 +605,25 @@ pub trait ForeignDataWrapper { FdwRoutine::::alloc_node(pg_sys::NodeTag_T_FdwRoutine); // plan phase - fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::); - fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::); - fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::); - fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::); + fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::); + fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::); + fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::); + fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::); // scan phase - fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::); - fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::); - fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::); - fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::); + fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::); + fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::); + fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::); + fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::); // modify phase fdw_routine.AddForeignUpdateTargets = Some(modify::add_foreign_update_targets); - fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::); - fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::); - fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::); - fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::); - fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::); - fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::); + fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::); + fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::); + fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::); + fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::); + fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::); + fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::); Self::fdw_routine_hook(&mut fdw_routine); fdw_routine.into_pg_boxed() @@ -631,7 +646,27 @@ pub trait ForeignDataWrapper { /// use pgrx::pg_sys::Oid; /// use supabase_wrappers::prelude::check_options_contain; /// - /// fn validator(opt_list: Vec>, catalog: Option) { + /// use pgrx::pg_sys::panic::ErrorReport; + /// use pgrx::PgSqlErrorCode; + /// + /// enum FdwError { + /// InvalidFdwOption, + /// InvalidServerOption, + /// InvalidTableOption, + /// } + /// + /// impl From for ErrorReport { + /// fn from(value: FdwError) -> Self { + /// let error_message = match value { + /// FdwError::InvalidFdwOption => "invalid foreign data wrapper option", + /// FdwError::InvalidServerOption => "invalid foreign server option", + /// FdwError::InvalidTableOption => "invalid foreign table option", + /// }; + /// ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "") + /// } + /// } + /// + /// fn validator(opt_list: Vec>, catalog: Option) -> Result<(), FdwError> { /// if let Some(oid) = catalog { /// match oid { /// FOREIGN_DATA_WRAPPER_RELATION_ID => { @@ -649,7 +684,11 @@ pub trait ForeignDataWrapper { /// _ => {} /// } /// } + /// + /// Ok(()) /// } /// ``` - fn validator(_options: Vec>, _catalog: Option) {} + fn validator(_options: Vec>, _catalog: Option) -> Result<(), E> { + Ok(()) + } } diff --git a/supabase-wrappers/src/lib.rs b/supabase-wrappers/src/lib.rs index dbeefebd..aef3d24c 100644 --- a/supabase-wrappers/src/lib.rs +++ b/supabase-wrappers/src/lib.rs @@ -62,11 +62,14 @@ //! ```rust,no_run //! # mod wrapper { //! use std::collections::HashMap; +//! use pgrx::pg_sys::panic::ErrorReport; +//! use pgrx::PgSqlErrorCode; //! use supabase_wrappers::prelude::*; //! #[wrappers_fdw( //! version = "0.1.0", //! author = "Supabase", -//! website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/helloworld_fdw" +//! website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/helloworld_fdw", +//! error_type = "HelloWorldFdwError" //! )] //! pub struct HelloWorldFdw { //! //row counter @@ -76,8 +79,16 @@ //! tgt_cols: Vec, //! } //! -//! impl ForeignDataWrapper for HelloWorldFdw { -//! fn new(options: &HashMap) -> Self { +//! enum HelloWorldFdwError {} +//! +//! impl From for ErrorReport { +//! fn from(_value: HelloWorldFdwError) -> Self { +//! ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") +//! } +//! } +//! +//! impl ForeignDataWrapper for HelloWorldFdw { +//! fn new(options: &HashMap) -> Result { //! // 'options' is the key-value pairs defined in `CREATE SERVER` SQL, for example, //! // //! // create server my_helloworld_server @@ -91,23 +102,25 @@ //! // You can do any initalization in this new() function, like saving connection //! // info or API url in an variable, but don't do heavy works like database //! // connection or API call. -//! Self { +//! Ok(Self { //! row_cnt: 0, //! tgt_cols: Vec::new(), -//! } +//! }) //! } //! -//! fn begin_scan(&mut self, quals: &[Qual], columns: &[Column], sorts: &[Sort], limit: &Option, options: &HashMap) { +//! fn begin_scan(&mut self, quals: &[Qual], columns: &[Column], sorts: &[Sort], limit: &Option, options: &HashMap) -> Result<(), HelloWorldFdwError> { //! // Do any initilization +//! Ok(()) //! } //! -//! fn iter_scan(&mut self, row: &mut Row) -> Option<()> { +//! fn iter_scan(&mut self, row: &mut Row) -> Result, HelloWorldFdwError> { //! // Return None when done -//! None +//! Ok(None) //! } //! -//! fn end_scan(&mut self) { +//! fn end_scan(&mut self) -> Result<(), HelloWorldFdwError> { //! // Cleanup any resources +//! Ok(()) //! } //! } //! # } @@ -136,6 +149,8 @@ //! //! ```rust,no_run //! use std::collections::HashMap; +//! use pgrx::pg_sys::panic::ErrorReport; +//! use pgrx::PgSqlErrorCode; //! use supabase_wrappers::prelude::*; //! //! pub(crate) struct HelloWorldFdw { @@ -145,12 +160,21 @@ //! // target column name list //! tgt_cols: Vec, //! } -//! impl ForeignDataWrapper for HelloWorldFdw { -//! fn new(options: &HashMap) -> Self { -//! Self { +//! +//! enum HelloWorldFdwError {} +//! +//! impl From for ErrorReport { +//! fn from(_value: HelloWorldFdwError) -> Self { +//! ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") +//! } +//! } +//! +//! impl ForeignDataWrapper for HelloWorldFdw { +//! fn new(options: &HashMap) -> Result { +//! Ok(Self { //! row_cnt: 0, //! tgt_cols: Vec::new(), -//! } +//! }) //! } //! //! fn begin_scan( @@ -160,15 +184,16 @@ //! _sorts: &[Sort], //! _limit: &Option, //! _options: &HashMap, -//! ) { +//! ) -> Result<(), HelloWorldFdwError> { //! // reset row count //! self.row_cnt = 0; //! //! // save a copy of target columns //! self.tgt_cols = columns.to_vec(); +//! Ok(()) //! } //! -//! fn iter_scan(&mut self, row: &mut Row) -> Option<()> { +//! fn iter_scan(&mut self, row: &mut Row) -> Result, HelloWorldFdwError> { //! // this is called on each row and we only return one row here //! if self.row_cnt < 1 { //! // add values to row if they are in target column list @@ -183,15 +208,16 @@ //! self.row_cnt += 1; //! //! // return the 'Some(())' to Postgres and continue data scan -//! return Some(()); +//! return Ok(Some(())); //! } //! //! // return 'None' to stop data scan -//! None +//! Ok(None) //! } //! -//! fn end_scan(&mut self) { +//! fn end_scan(&mut self) -> Result<(), HelloWorldFdwError> { //! // we do nothing here, but you can do things like resource cleanup and etc. +//! Ok(()) //! } //! } //! ``` diff --git a/supabase-wrappers/src/modify.rs b/supabase-wrappers/src/modify.rs index 39e5e12c..fd90406b 100644 --- a/supabase-wrappers/src/modify.rs +++ b/supabase-wrappers/src/modify.rs @@ -1,8 +1,10 @@ +use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; use pgrx::{ debug2, memcxt::PgMemoryContexts, pg_sys::Oid, prelude::*, rel::PgRelation, tupdesc::PgTupleDesc, FromDatum, PgSqlErrorCode, }; use std::collections::HashMap; +use std::marker::PhantomData; use std::os::raw::c_int; use std::ptr; @@ -14,7 +16,7 @@ use super::polyfill; use super::utils; // Fdw private state for modify -struct FdwModifyState { +struct FdwModifyState, W: ForeignDataWrapper> { // foreign data wrapper instance instance: W, @@ -29,9 +31,10 @@ struct FdwModifyState { // temporary memory context per foreign table, created under Wrappers root // memory context tmp_ctx: PgMemoryContexts, + _phantom: PhantomData, } -impl FdwModifyState { +impl, W: ForeignDataWrapper> FdwModifyState { unsafe fn new(foreigntableid: Oid, tmp_ctx: PgMemoryContexts) -> Self { Self { instance: instance::create_fdw_instance(foreigntableid), @@ -40,31 +43,32 @@ impl FdwModifyState { rowid_typid: Oid::INVALID, opts: HashMap::new(), tmp_ctx, + _phantom: PhantomData, } } - fn begin_modify(&mut self) { - self.instance.begin_modify(&self.opts); + fn begin_modify(&mut self) -> Result<(), E> { + self.instance.begin_modify(&self.opts) } - fn insert(&mut self, row: &Row) { - self.instance.insert(row); + fn insert(&mut self, row: &Row) -> Result<(), E> { + self.instance.insert(row) } - fn update(&mut self, rowid: &Cell, new_row: &Row) { - self.instance.update(rowid, new_row); + fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), E> { + self.instance.update(rowid, new_row) } - fn delete(&mut self, rowid: &Cell) { - self.instance.delete(rowid); + fn delete(&mut self, rowid: &Cell) -> Result<(), E> { + self.instance.delete(rowid) } - fn end_modify(&mut self) { - self.instance.end_modify(); + fn end_modify(&mut self) -> Result<(), E> { + self.instance.end_modify() } } -impl utils::SerdeList for FdwModifyState {} +impl, W: ForeignDataWrapper> utils::SerdeList for FdwModifyState {} #[pg_guard] pub(super) extern "C" fn add_foreign_update_targets( @@ -112,7 +116,7 @@ pub(super) extern "C" fn add_foreign_update_targets( } #[pg_guard] -pub(super) extern "C" fn plan_foreign_modify( +pub(super) extern "C" fn plan_foreign_modify, W: ForeignDataWrapper>( root: *mut pg_sys::PlannerInfo, plan: *mut pg_sys::ModifyTable, result_relation: pg_sys::Index, @@ -158,7 +162,7 @@ pub(super) extern "C" fn plan_foreign_modify( let ctx = memctx::refresh_wrappers_memctx(&ctx_name); // create modify state - let mut state = FdwModifyState::::new(ftable_id, ctx); + let mut state = FdwModifyState::::new(ftable_id, ctx); state.rowid_name = rowid_name.to_string(); state.rowid_typid = attr.atttypid; @@ -181,7 +185,7 @@ pub(super) extern "C" fn plan_foreign_modify( } #[pg_guard] -pub(super) extern "C" fn begin_foreign_modify( +pub(super) extern "C" fn begin_foreign_modify, W: ForeignDataWrapper>( mtstate: *mut pg_sys::ModifyTableState, rinfo: *mut pg_sys::ResultRelInfo, fdw_private: *mut pg_sys::List, @@ -195,7 +199,7 @@ pub(super) extern "C" fn begin_foreign_modify( } unsafe { - let mut state = FdwModifyState::::deserialize_from_list(fdw_private as _); + let mut state = FdwModifyState::::deserialize_from_list(fdw_private as _); assert!(!state.is_null()); // search for rowid attribute number @@ -204,14 +208,14 @@ pub(super) extern "C" fn begin_foreign_modify( state.rowid_attno = pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c); - state.begin_modify(); + state.begin_modify().map_err(|e| e.into()).report(); (*rinfo).ri_FdwState = state.into_pg() as _; } } #[pg_guard] -pub(super) extern "C" fn exec_foreign_insert( +pub(super) extern "C" fn exec_foreign_insert, W: ForeignDataWrapper>( _estate: *mut pg_sys::EState, rinfo: *mut pg_sys::ResultRelInfo, slot: *mut pg_sys::TupleTableSlot, @@ -219,18 +223,19 @@ pub(super) extern "C" fn exec_foreign_insert( ) -> *mut pg_sys::TupleTableSlot { debug2!("---> exec_foreign_insert"); unsafe { - let mut state = - PgBox::>::from_pg((*rinfo).ri_FdwState as *mut FdwModifyState); + let mut state = PgBox::>::from_pg( + (*rinfo).ri_FdwState as *mut FdwModifyState, + ); let row = utils::tuple_table_slot_to_row(slot); - state.insert(&row); + state.insert(&row).map_err(|e| e.into()).report(); } slot } -unsafe fn get_rowid_cell( - state: &FdwModifyState, +unsafe fn get_rowid_cell, W: ForeignDataWrapper>( + state: &FdwModifyState, plan_slot: *mut pg_sys::TupleTableSlot, ) -> Option { let mut is_null: bool = true; @@ -239,7 +244,7 @@ unsafe fn get_rowid_cell( } #[pg_guard] -pub(super) extern "C" fn exec_foreign_delete( +pub(super) extern "C" fn exec_foreign_delete, W: ForeignDataWrapper>( _estate: *mut pg_sys::EState, rinfo: *mut pg_sys::ResultRelInfo, slot: *mut pg_sys::TupleTableSlot, @@ -247,12 +252,13 @@ pub(super) extern "C" fn exec_foreign_delete( ) -> *mut pg_sys::TupleTableSlot { debug2!("---> exec_foreign_delete"); unsafe { - let mut state = - PgBox::>::from_pg((*rinfo).ri_FdwState as *mut FdwModifyState); + let mut state = PgBox::>::from_pg( + (*rinfo).ri_FdwState as *mut FdwModifyState, + ); let cell = get_rowid_cell(&state, plan_slot); if let Some(rowid) = cell { - state.delete(&rowid); + state.delete(&rowid).map_err(|e| e.into()).report(); } } @@ -260,7 +266,7 @@ pub(super) extern "C" fn exec_foreign_delete( } #[pg_guard] -pub(super) extern "C" fn exec_foreign_update( +pub(super) extern "C" fn exec_foreign_update, W: ForeignDataWrapper>( _estate: *mut pg_sys::EState, rinfo: *mut pg_sys::ResultRelInfo, slot: *mut pg_sys::TupleTableSlot, @@ -268,8 +274,9 @@ pub(super) extern "C" fn exec_foreign_update( ) -> *mut pg_sys::TupleTableSlot { debug2!("---> exec_foreign_update"); unsafe { - let mut state = - PgBox::>::from_pg((*rinfo).ri_FdwState as *mut FdwModifyState); + let mut state = PgBox::>::from_pg( + (*rinfo).ri_FdwState as *mut FdwModifyState, + ); let rowid_cell = get_rowid_cell(&state, plan_slot); if let Some(rowid) = rowid_cell { @@ -285,7 +292,10 @@ pub(super) extern "C" fn exec_foreign_update( }) && state.rowid_name != col.as_str() }); - state.update(&rowid, &new_row); + state + .update(&rowid, &new_row) + .map_err(|e| e.into()) + .report(); } } @@ -293,16 +303,16 @@ pub(super) extern "C" fn exec_foreign_update( } #[pg_guard] -pub(super) extern "C" fn end_foreign_modify( +pub(super) extern "C" fn end_foreign_modify, W: ForeignDataWrapper>( _estate: *mut pg_sys::EState, rinfo: *mut pg_sys::ResultRelInfo, ) { debug2!("---> end_foreign_modify"); unsafe { - let fdw_state = (*rinfo).ri_FdwState as *mut FdwModifyState; + let fdw_state = (*rinfo).ri_FdwState as *mut FdwModifyState; if !fdw_state.is_null() { - let mut state = PgBox::>::from_pg(fdw_state); - state.end_modify(); + let mut state = PgBox::>::from_pg(fdw_state); + state.end_modify().map_err(|e| e.into()).report(); } } } diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index 08b50310..431f6859 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -4,7 +4,9 @@ use pgrx::{ PgSqlErrorCode, }; use std::collections::HashMap; +use std::marker::PhantomData; +use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable}; use std::os::raw::c_int; use std::ptr; @@ -19,7 +21,7 @@ use crate::sort::*; use crate::utils::{self, report_error, SerdeList}; // Fdw private state for scan -struct FdwState { +struct FdwState, W: ForeignDataWrapper> { // foreign data wrapper instance instance: W, @@ -46,9 +48,10 @@ struct FdwState { values: Vec, nulls: Vec, row: Row, + _phantom: PhantomData, } -impl FdwState { +impl, W: ForeignDataWrapper> FdwState { unsafe fn new(foreigntableid: Oid, tmp_ctx: PgMemoryContexts) -> Self { Self { instance: instance::create_fdw_instance(foreigntableid), @@ -61,11 +64,12 @@ impl FdwState { values: Vec::new(), nulls: Vec::new(), row: Row::new(), + _phantom: PhantomData, } } #[inline] - fn get_rel_size(&mut self) -> (i64, i32) { + fn get_rel_size(&mut self) -> Result<(i64, i32), E> { self.instance.get_rel_size( &self.quals, &self.tgts, @@ -76,7 +80,7 @@ impl FdwState { } #[inline] - fn begin_scan(&mut self) { + fn begin_scan(&mut self) -> Result<(), E> { self.instance.begin_scan( &self.quals, &self.tgts, @@ -87,25 +91,25 @@ impl FdwState { } #[inline] - fn iter_scan(&mut self) -> Option<()> { + fn iter_scan(&mut self) -> Result, E> { self.instance.iter_scan(&mut self.row) } #[inline] - fn re_scan(&mut self) { + fn re_scan(&mut self) -> Result<(), E> { self.instance.re_scan() } #[inline] - fn end_scan(&mut self) { - self.instance.end_scan(); + fn end_scan(&mut self) -> Result<(), E> { + self.instance.end_scan() } } -impl utils::SerdeList for FdwState {} +impl, W: ForeignDataWrapper> utils::SerdeList for FdwState {} #[pg_guard] -pub(super) extern "C" fn get_foreign_rel_size( +pub(super) extern "C" fn get_foreign_rel_size, W: ForeignDataWrapper>( root: *mut pg_sys::PlannerInfo, baserel: *mut pg_sys::RelOptInfo, foreigntableid: pg_sys::Oid, @@ -117,7 +121,7 @@ pub(super) extern "C" fn get_foreign_rel_size( let ctx = memctx::refresh_wrappers_memctx(&ctx_name); // create scan state - let mut state = FdwState::::new(foreigntableid, ctx); + let mut state = FdwState::::new(foreigntableid, ctx); // extract qual list state.quals = extract_quals(root, baserel, foreigntableid); @@ -136,7 +140,7 @@ pub(super) extern "C" fn get_foreign_rel_size( state.opts = utils::options_to_hashmap((*ftable).options); // get estimate row count and mean row width - let (rows, width) = state.get_rel_size(); + let (rows, width) = state.get_rel_size().map_err(|e| e.into()).report(); (*baserel).rows = rows as f64; (*(*baserel).reltarget).width = width; @@ -147,14 +151,14 @@ pub(super) extern "C" fn get_foreign_rel_size( } #[pg_guard] -pub(super) extern "C" fn get_foreign_paths( +pub(super) extern "C" fn get_foreign_paths, W: ForeignDataWrapper>( root: *mut pg_sys::PlannerInfo, baserel: *mut pg_sys::RelOptInfo, _foreigntableid: pg_sys::Oid, ) { debug2!("---> get_foreign_paths"); unsafe { - let state = PgBox::>::from_pg((*baserel).fdw_private as _); + let state = PgBox::>::from_pg((*baserel).fdw_private as _); // get startup cost from foreign table options let startup_cost = state @@ -187,7 +191,7 @@ pub(super) extern "C" fn get_foreign_paths( } #[pg_guard] -pub(super) extern "C" fn get_foreign_plan( +pub(super) extern "C" fn get_foreign_plan, W: ForeignDataWrapper>( _root: *mut pg_sys::PlannerInfo, baserel: *mut pg_sys::RelOptInfo, _foreigntableid: pg_sys::Oid, @@ -198,7 +202,7 @@ pub(super) extern "C" fn get_foreign_plan( ) -> *mut pg_sys::ForeignScan { debug2!("---> get_foreign_plan"); unsafe { - let state = PgBox::>::from_pg((*baserel).fdw_private as _); + let state = PgBox::>::from_pg((*baserel).fdw_private as _); // make foreign scan plan let scan_clauses = pg_sys::extract_actual_clauses(scan_clauses, false); @@ -227,18 +231,18 @@ pub(super) extern "C" fn get_foreign_plan( } #[pg_guard] -pub(super) extern "C" fn explain_foreign_scan( +pub(super) extern "C" fn explain_foreign_scan, W: ForeignDataWrapper>( node: *mut pg_sys::ForeignScanState, es: *mut pg_sys::ExplainState, ) { debug2!("---> explain_foreign_scan"); unsafe { - let fdw_state = (*node).fdw_state as *mut FdwState; + let fdw_state = (*node).fdw_state as *mut FdwState; if fdw_state.is_null() { return; } - let state = PgBox::>::from_pg(fdw_state); + let state = PgBox::>::from_pg(fdw_state); let ctx = PgMemoryContexts::CurrentMemoryContext; @@ -259,9 +263,9 @@ pub(super) extern "C" fn explain_foreign_scan( } // extract paramter value and assign it to qual in scan state -unsafe fn assign_paramenter_value( +unsafe fn assign_paramenter_value, W: ForeignDataWrapper>( node: *mut pg_sys::ForeignScanState, - state: &mut FdwState, + state: &mut FdwState, ) { // get parameter list in execution state let estate = (*node).ss.ps.state; @@ -285,7 +289,7 @@ unsafe fn assign_paramenter_value( } #[pg_guard] -pub(super) extern "C" fn begin_foreign_scan( +pub(super) extern "C" fn begin_foreign_scan, W: ForeignDataWrapper>( node: *mut pg_sys::ForeignScanState, eflags: c_int, ) { @@ -293,7 +297,7 @@ pub(super) extern "C" fn begin_foreign_scan( unsafe { let scan_state = (*node).ss; let plan = scan_state.ps.plan as *mut pg_sys::ForeignScan; - let mut state = FdwState::::deserialize_from_list((*plan).fdw_private as _); + let mut state = FdwState::::deserialize_from_list((*plan).fdw_private as _); assert!(!state.is_null()); // assign parameter values to qual @@ -301,7 +305,7 @@ pub(super) extern "C" fn begin_foreign_scan( // begin scan if it is not EXPLAIN statement if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 { - state.begin_scan(); + state.begin_scan().map_err(|e| e.into()).report(); let rel = scan_state.ss_currentRelation; let tup_desc = (*rel).rd_att; @@ -319,20 +323,20 @@ pub(super) extern "C" fn begin_foreign_scan( } #[pg_guard] -pub(super) extern "C" fn iterate_foreign_scan( +pub(super) extern "C" fn iterate_foreign_scan, W: ForeignDataWrapper>( node: *mut pg_sys::ForeignScanState, ) -> *mut pg_sys::TupleTableSlot { // `debug!` macros are quite expensive at the moment, so avoid logging in the inner loop // debug2!("---> iterate_foreign_scan"); unsafe { - let mut state = PgBox::>::from_pg((*node).fdw_state as _); + let mut state = PgBox::>::from_pg((*node).fdw_state as _); // clear slot let slot = (*node).ss.ss_ScanTupleSlot; polyfill::exec_clear_tuple(slot); state.row.clear(); - if state.iter_scan().is_some() { + if state.iter_scan().map_err(|e| e.into()).report().is_some() { if state.row.cols.len() != state.tgts.len() { report_error( PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER, @@ -363,31 +367,31 @@ pub(super) extern "C" fn iterate_foreign_scan( } #[pg_guard] -pub(super) extern "C" fn re_scan_foreign_scan( +pub(super) extern "C" fn re_scan_foreign_scan, W: ForeignDataWrapper>( node: *mut pg_sys::ForeignScanState, ) { debug2!("---> re_scan_foreign_scan"); unsafe { - let fdw_state = (*node).fdw_state as *mut FdwState; + let fdw_state = (*node).fdw_state as *mut FdwState; if !fdw_state.is_null() { - let mut state = PgBox::>::from_pg(fdw_state); - state.re_scan(); + let mut state = PgBox::>::from_pg(fdw_state); + state.re_scan().map_err(|e| e.into()).report(); } } } #[pg_guard] -pub(super) extern "C" fn end_foreign_scan( +pub(super) extern "C" fn end_foreign_scan, W: ForeignDataWrapper>( node: *mut pg_sys::ForeignScanState, ) { debug2!("---> end_foreign_scan"); unsafe { - let fdw_state = (*node).fdw_state as *mut FdwState; + let fdw_state = (*node).fdw_state as *mut FdwState; if fdw_state.is_null() { return; } - let mut state = PgBox::>::from_pg(fdw_state); - state.end_scan(); + let mut state = PgBox::>::from_pg(fdw_state); + state.end_scan().map_err(|e| e.into()).report(); } } diff --git a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index 56dc1f76..ec196182 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -1,5 +1,6 @@ use crate::stats; use pgrx::pg_sys; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgSqlErrorCode; use reqwest::{self, header}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; @@ -30,7 +31,8 @@ fn create_client(api_key: &str) -> ClientWithMiddleware { #[wrappers_fdw( version = "0.1.2", author = "Ankur Goyal", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/airtable_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/airtable_fdw", + error_type = "AirtableFdwError" )] pub(crate) struct AirtableFdw { rt: Runtime, @@ -93,9 +95,17 @@ macro_rules! report_fetch_error { }; } +enum AirtableFdwError {} + +impl From for ErrorReport { + fn from(_value: AirtableFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + // TODO Add support for INSERT, UPDATE, DELETE -impl ForeignDataWrapper for AirtableFdw { - fn new(options: &HashMap) -> Self { +impl ForeignDataWrapper for AirtableFdw { + fn new(options: &HashMap) -> Result { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -110,12 +120,12 @@ impl ForeignDataWrapper for AirtableFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); - Self { + Ok(Self { rt: create_async_runtime(), client, base_url, scan_result: None, - } + }) } fn begin_scan( @@ -125,14 +135,14 @@ impl ForeignDataWrapper for AirtableFdw { _sorts: &[Sort], // TODO: Propagate sort _limit: &Option, // TODO: maxRecords options: &HashMap, - ) { + ) -> Result<(), AirtableFdwError> { let url = if let Some(url) = require_option("base_id", options).and_then(|base_id| { require_option("table_id", options) .map(|table_id| self.build_url(&base_id, &table_id, options.get("view_id"))) }) { url } else { - return; + return Ok(()); }; let mut rows = Vec::new(); @@ -150,7 +160,7 @@ impl ForeignDataWrapper for AirtableFdw { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("internal error: {}", err), ); - return; + return Ok(()); } }; @@ -183,30 +193,37 @@ impl ForeignDataWrapper for AirtableFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::RowsOut, rows.len() as i64); self.scan_result = Some(rows); + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, AirtableFdwError> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { - return result + return Ok(result .drain(0..1) .last() - .map(|src_row| row.replace_with(src_row)); + .map(|src_row| row.replace_with(src_row))); } } - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), AirtableFdwError> { self.scan_result.take(); + Ok(()) } - fn validator(options: Vec>, catalog: Option) { + fn validator( + options: Vec>, + catalog: Option, + ) -> Result<(), AirtableFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "base_id"); check_options_contain(&options, "table_id"); } } + + Ok(()) } } diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index e8833282..685dba5b 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -10,6 +10,7 @@ use gcp_bigquery_client::{ }, Client, }; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgSqlErrorCode; use pgrx::prelude::{AnyNumeric, Date, Timestamp}; use serde_json::json; @@ -85,7 +86,8 @@ fn field_to_cell(rs: &ResultSet, field: &TableFieldSchema) -> Option { #[wrappers_fdw( version = "0.1.4", author = "Supabase", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/bigquery_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/bigquery_fdw", + error_type = "BigQueryFdwError" )] pub(crate) struct BigQueryFdw { rt: Runtime, @@ -158,8 +160,16 @@ impl BigQueryFdw { } } -impl ForeignDataWrapper for BigQueryFdw { - fn new(options: &HashMap) -> Self { +enum BigQueryFdwError {} + +impl From for ErrorReport { + fn from(_value: BigQueryFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +impl ForeignDataWrapper for BigQueryFdw { + fn new(options: &HashMap) -> Result { let mut ret = BigQueryFdw { rt: create_async_runtime(), client: None, @@ -176,7 +186,7 @@ impl ForeignDataWrapper for BigQueryFdw { let dataset_id = require_option("dataset_id", options); if project_id.is_none() || dataset_id.is_none() { - return ret; + return Ok(ret); } ret.project_id = project_id.unwrap(); ret.dataset_id = dataset_id.unwrap(); @@ -208,11 +218,11 @@ impl ForeignDataWrapper for BigQueryFdw { None => { let sa_key_id = match require_option("sa_key_id", options) { Some(sa_key_id) => sa_key_id, - None => return ret, + None => return Ok(ret), }; match get_vault_secret(&sa_key_id) { Some(sa_key) => sa_key, - None => return ret, + None => return Ok(ret), } } }, @@ -225,7 +235,7 @@ impl ForeignDataWrapper for BigQueryFdw { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("parse service account key JSON failed: {}", err), ); - return ret; + return Ok(ret); } }; @@ -246,7 +256,7 @@ impl ForeignDataWrapper for BigQueryFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); - ret + Ok(ret) } fn get_rel_size( @@ -256,8 +266,8 @@ impl ForeignDataWrapper for BigQueryFdw { _sorts: &[Sort], _limit: &Option, _options: &HashMap, - ) -> (i64, i32) { - (0, 0) + ) -> Result<(i64, i32), BigQueryFdwError> { + Ok((0, 0)) } fn begin_scan( @@ -267,10 +277,10 @@ impl ForeignDataWrapper for BigQueryFdw { sorts: &[Sort], limit: &Option, options: &HashMap, - ) { + ) -> Result<(), BigQueryFdwError> { let table = require_option("table", options); if table.is_none() { - return; + return Ok(()); } self.table = table.unwrap(); self.tgt_cols = columns.to_vec(); @@ -343,9 +353,11 @@ impl ForeignDataWrapper for BigQueryFdw { } } } + + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, BigQueryFdwError> { if let Some(client) = &self.client { if let Some(ref mut rs) = self.scan_result { let mut extract_row = |rs: &mut ResultSet| { @@ -368,7 +380,7 @@ impl ForeignDataWrapper for BigQueryFdw { }; if extract_row(rs) { - return Some(()); + return Ok(Some(())); } // deal with pagination @@ -388,7 +400,7 @@ impl ForeignDataWrapper for BigQueryFdw { // replace result set with data from the new page *rs = ResultSet::new(QueryResponse::from(resp)); if extract_row(rs) { - return Some(()); + return Ok(Some(())); } } Err(err) => { @@ -404,24 +416,27 @@ impl ForeignDataWrapper for BigQueryFdw { } } } - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), BigQueryFdwError> { self.scan_result.take(); + Ok(()) } - fn begin_modify(&mut self, options: &HashMap) { + fn begin_modify(&mut self, options: &HashMap) -> Result<(), BigQueryFdwError> { let table = require_option("table", options); let rowid_col = require_option("rowid_column", options); if table.is_none() || rowid_col.is_none() { - return; + return Ok(()); } self.table = table.unwrap(); self.rowid_col = rowid_col.unwrap(); + + Ok(()) } - fn insert(&mut self, src: &Row) { + fn insert(&mut self, src: &Row) -> Result<(), BigQueryFdwError> { if let Some(ref mut client) = self.client { let mut insert_request = TableDataInsertAllRequest::new(); let mut row_json = json!({}); @@ -460,9 +475,11 @@ impl ForeignDataWrapper for BigQueryFdw { ); } } + + Ok(()) } - fn update(&mut self, rowid: &Cell, new_row: &Row) { + fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), BigQueryFdwError> { if let Some(ref mut client) = self.client { let mut sets = Vec::new(); for (col, cell) in new_row.iter() { @@ -495,9 +512,10 @@ impl ForeignDataWrapper for BigQueryFdw { ); } } + Ok(()) } - fn delete(&mut self, rowid: &Cell) { + fn delete(&mut self, rowid: &Cell) -> Result<(), BigQueryFdwError> { if let Some(ref mut client) = self.client { let sql = format!( "delete from `{}.{}.{}` where {} = {}", @@ -514,9 +532,8 @@ impl ForeignDataWrapper for BigQueryFdw { ); } } + Ok(()) } - - fn end_modify(&mut self) {} } use auth_mock::GoogleAuthMock; diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index 679d94e2..1d33ef0e 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -2,6 +2,7 @@ use crate::stats; use chrono::{Date, DateTime, NaiveDate, NaiveDateTime, Utc}; use chrono_tz::Tz; use clickhouse_rs::{types, types::Block, types::SqlType, ClientHandle, Pool}; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::{prelude::PgSqlErrorCode, to_timestamp}; use regex::{Captures, Regex}; use std::collections::HashMap; @@ -77,7 +78,8 @@ fn field_to_cell(row: &types::Row, i: usize) -> Option { #[wrappers_fdw( version = "0.1.3", author = "Supabase", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/clickhouse_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/clickhouse_fdw", + error_type = "ClickHouseFdwError" )] pub(crate) struct ClickHouseFdw { rt: Runtime, @@ -196,8 +198,16 @@ impl ClickHouseFdw { } } -impl ForeignDataWrapper for ClickHouseFdw { - fn new(options: &HashMap) -> Self { +enum ClickHouseFdwError {} + +impl From for ErrorReport { + fn from(_value: ClickHouseFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +impl ForeignDataWrapper for ClickHouseFdw { + fn new(options: &HashMap) -> Result { let rt = create_async_runtime(); let conn_str = match options.get("conn_string") { Some(conn_str) => conn_str.to_owned(), @@ -208,7 +218,7 @@ impl ForeignDataWrapper for ClickHouseFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); - Self { + Ok(Self { rt, conn_str, client: None, @@ -218,7 +228,7 @@ impl ForeignDataWrapper for ClickHouseFdw { scan_blk: None, row_idx: 0, params: Vec::new(), - } + }) } fn begin_scan( @@ -228,12 +238,12 @@ impl ForeignDataWrapper for ClickHouseFdw { sorts: &[Sort], limit: &Option, options: &HashMap, - ) { + ) -> Result<(), ClickHouseFdwError> { self.create_client(); let table = require_option("table", options); if table.is_none() { - return; + return Ok(()); } self.table = table.unwrap(); self.tgt_cols = columns.to_vec(); @@ -264,9 +274,11 @@ impl ForeignDataWrapper for ClickHouseFdw { ), } } + + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, ClickHouseFdwError> { if let Some(block) = &self.scan_blk { let mut rows = block.rows(); @@ -287,33 +299,40 @@ impl ForeignDataWrapper for ClickHouseFdw { .unwrap(); let cell = field_to_cell(&src_row, i); let col_name = src_row.name(i).unwrap(); - cell.as_ref()?; + if cell.as_ref().is_none() { + return Ok(None); + } row.push(col_name, cell); } self.row_idx += 1; - return Some(()); + return Ok(Some(())); } } - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), ClickHouseFdwError> { self.scan_blk.take(); + Ok(()) } - fn begin_modify(&mut self, options: &HashMap) { + fn begin_modify( + &mut self, + options: &HashMap, + ) -> Result<(), ClickHouseFdwError> { self.create_client(); let table = require_option("table", options); let rowid_col = require_option("rowid_column", options); if table.is_none() || rowid_col.is_none() { - return; + return Ok(()); } self.table = table.unwrap(); self.rowid_col = rowid_col.unwrap(); + Ok(()) } - fn insert(&mut self, src: &Row) { + fn insert(&mut self, src: &Row) -> Result<(), ClickHouseFdwError> { if let Some(ref mut client) = self.client { let mut row = Vec::new(); for (col_name, cell) in src.iter() { @@ -368,9 +387,10 @@ impl ForeignDataWrapper for ClickHouseFdw { ); } } + Ok(()) } - fn update(&mut self, rowid: &Cell, new_row: &Row) { + fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), ClickHouseFdwError> { if let Some(ref mut client) = self.client { let mut sets = Vec::new(); for (col, cell) in new_row.iter() { @@ -399,9 +419,10 @@ impl ForeignDataWrapper for ClickHouseFdw { ); } } + Ok(()) } - fn delete(&mut self, rowid: &Cell) { + fn delete(&mut self, rowid: &Cell) -> Result<(), ClickHouseFdwError> { if let Some(ref mut client) = self.client { let sql = format!( "alter table {} delete where {} = {}", @@ -416,7 +437,6 @@ impl ForeignDataWrapper for ClickHouseFdw { ); } } + Ok(()) } - - fn end_modify(&mut self) {} } diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index 77d7f62f..4d879803 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -1,4 +1,5 @@ use crate::stats; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::{pg_sys, prelude::*, JsonB}; use regex::Regex; use reqwest::{self, header}; @@ -167,7 +168,8 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { #[wrappers_fdw( version = "0.1.2", author = "Supabase", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/firebase_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/firebase_fdw", + error_type = "FirebaseFdwError" )] pub(crate) struct FirebaseFdw { rt: Runtime, @@ -245,8 +247,16 @@ impl FirebaseFdw { } } -impl ForeignDataWrapper for FirebaseFdw { - fn new(options: &HashMap) -> Self { +enum FirebaseFdwError {} + +impl From for ErrorReport { + fn from(_value: FirebaseFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +impl ForeignDataWrapper for FirebaseFdw { + fn new(options: &HashMap) -> Result { let mut ret = Self { rt: create_async_runtime(), project_id: "".to_string(), @@ -256,7 +266,7 @@ impl ForeignDataWrapper for FirebaseFdw { ret.project_id = match require_option("project_id", options) { Some(project_id) => project_id, - None => return ret, + None => return Ok(ret), }; // get oauth2 access token if it is directly defined in options @@ -269,18 +279,18 @@ impl ForeignDataWrapper for FirebaseFdw { None => { let sa_key_id = match require_option("sa_key_id", options) { Some(sa_key_id) => sa_key_id, - None => return ret, + None => return Ok(ret), }; match get_vault_secret(&sa_key_id) { Some(sa_key) => sa_key, - None => return ret, + None => return Ok(ret), } } }; if let Some(access_token) = get_oauth2_token(&sa_key, &ret.rt) { access_token.token().map(|t| t.to_owned()).unwrap() } else { - return ret; + return Ok(ret); } }; @@ -302,7 +312,7 @@ impl ForeignDataWrapper for FirebaseFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); - ret + Ok(ret) } fn begin_scan( @@ -312,10 +322,10 @@ impl ForeignDataWrapper for FirebaseFdw { _sorts: &[Sort], _limit: &Option, options: &HashMap, - ) { + ) -> Result<(), FirebaseFdwError> { let obj = match require_option("object", options) { Some(obj) => obj, - None => return, + None => return Ok(()), }; let row_cnt_limit = options .get("limit") @@ -374,29 +384,37 @@ impl ForeignDataWrapper for FirebaseFdw { self.scan_result = Some(result); } + + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, FirebaseFdwError> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { - return result + return Ok(result .drain(0..1) .last() - .map(|src_row| row.replace_with(src_row)); + .map(|src_row| row.replace_with(src_row))); } } - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), FirebaseFdwError> { self.scan_result.take(); + Ok(()) } - fn validator(options: Vec>, catalog: Option) { + fn validator( + options: Vec>, + catalog: Option, + ) -> Result<(), FirebaseFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "object"); } } + + Ok(()) } } diff --git a/wrappers/src/fdw/helloworld_fdw/helloworld_fdw.rs b/wrappers/src/fdw/helloworld_fdw/helloworld_fdw.rs index ed56b852..7d755d73 100644 --- a/wrappers/src/fdw/helloworld_fdw/helloworld_fdw.rs +++ b/wrappers/src/fdw/helloworld_fdw/helloworld_fdw.rs @@ -1,3 +1,5 @@ +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::PgSqlErrorCode; use std::collections::HashMap; use supabase_wrappers::prelude::*; @@ -5,7 +7,8 @@ use supabase_wrappers::prelude::*; #[wrappers_fdw( version = "0.1.0", author = "Supabase", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/helloworld_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/helloworld_fdw", + error_type = "HelloWorldFdwError" )] pub(crate) struct HelloWorldFdw { // row counter @@ -15,7 +18,15 @@ pub(crate) struct HelloWorldFdw { tgt_cols: Vec, } -impl ForeignDataWrapper for HelloWorldFdw { +enum HelloWorldFdwError {} + +impl From for ErrorReport { + fn from(_value: HelloWorldFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +impl ForeignDataWrapper for HelloWorldFdw { // 'options' is the key-value pairs defined in `CREATE SERVER` SQL, for example, // // create server my_helloworld_server @@ -29,11 +40,11 @@ impl ForeignDataWrapper for HelloWorldFdw { // You can do any initalization in this new() function, like saving connection // info or API url in an variable, but don't do any heavy works like making a // database connection or API call. - fn new(_options: &HashMap) -> Self { - Self { + fn new(_options: &HashMap) -> Result { + Ok(Self { row_cnt: 0, tgt_cols: Vec::new(), - } + }) } fn begin_scan( @@ -43,15 +54,17 @@ impl ForeignDataWrapper for HelloWorldFdw { _sorts: &[Sort], _limit: &Option, _options: &HashMap, - ) { + ) -> Result<(), HelloWorldFdwError> { // reset row counter self.row_cnt = 0; // save a copy of target columns self.tgt_cols = columns.to_vec(); + + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, HelloWorldFdwError> { // this is called on each row and we only return one row here if self.row_cnt < 1 { // add values to row if they are in target column list @@ -66,14 +79,15 @@ impl ForeignDataWrapper for HelloWorldFdw { self.row_cnt += 1; // return Some(()) to Postgres and continue data scan - return Some(()); + return Ok(Some(())); } // return 'None' to stop data scan - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), HelloWorldFdwError> { // we do nothing here, but you can do things like resource cleanup and etc. + Ok(()) } } diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 1c8ed7db..8868b0f5 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -1,4 +1,5 @@ use crate::stats; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::{ pg_sys, prelude::{AnyNumeric, Date, PgSqlErrorCode, Timestamp}, @@ -38,7 +39,7 @@ macro_rules! report_request_error { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("request failed: {}", $err), ); - return; + return Ok(()); }}; } @@ -124,7 +125,8 @@ fn json_value_to_cell(tgt_col: &Column, v: &JsonValue) -> Cell { #[wrappers_fdw( version = "0.1.0", author = "Supabase", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/logflare_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/logflare_fdw", + error_type = "LogflareFdwError" )] pub(crate) struct LogflareFdw { rt: Runtime, @@ -213,8 +215,16 @@ impl LogflareFdw { } } -impl ForeignDataWrapper for LogflareFdw { - fn new(options: &HashMap) -> Self { +enum LogflareFdwError {} + +impl From for ErrorReport { + fn from(_value: LogflareFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +impl ForeignDataWrapper for LogflareFdw { + fn new(options: &HashMap) -> Result { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -235,13 +245,13 @@ impl ForeignDataWrapper for LogflareFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); - LogflareFdw { + Ok(LogflareFdw { rt: create_async_runtime(), base_url: Url::parse(&base_url).unwrap(), client, scan_result: None, params: Vec::default(), - } + }) } fn begin_scan( @@ -251,25 +261,25 @@ impl ForeignDataWrapper for LogflareFdw { _sorts: &[Sort], _limit: &Option, options: &HashMap, - ) { + ) -> Result<(), LogflareFdwError> { let endpoint = if let Some(name) = require_option("endpoint", options) { name } else { - return; + return Ok(()); }; // extract params self.params = if let Some(params) = extract_params(quals) { params } else { - return; + return Ok(()); }; if let Some(client) = &self.client { // build url let url = self.build_url(&endpoint); if url.is_none() { - return; + return Ok(()); } let url = url.unwrap(); @@ -285,7 +295,7 @@ impl ForeignDataWrapper for LogflareFdw { if resp.status() == StatusCode::NOT_FOUND { // if it is 404 error, we should treat it as an empty // result rather than a request error - return; + return Ok(()); } match resp.error_for_status() { @@ -312,29 +322,37 @@ impl ForeignDataWrapper for LogflareFdw { Err(err) => report_request_error!(err), } } + + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, LogflareFdwError> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { - return result + return Ok(result .drain(0..1) .last() - .map(|src_row| row.replace_with(src_row)); + .map(|src_row| row.replace_with(src_row))); } } - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), LogflareFdwError> { self.scan_result.take(); + Ok(()) } - fn validator(options: Vec>, catalog: Option) { + fn validator( + options: Vec>, + catalog: Option, + ) -> Result<(), LogflareFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "endpoint"); } } + + Ok(()) } } diff --git a/wrappers/src/fdw/s3_fdw/s3_fdw.rs b/wrappers/src/fdw/s3_fdw/s3_fdw.rs index b14cf96a..dc88313d 100644 --- a/wrappers/src/fdw/s3_fdw/s3_fdw.rs +++ b/wrappers/src/fdw/s3_fdw/s3_fdw.rs @@ -3,6 +3,7 @@ use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZlibD use aws_sdk_s3 as s3; use http::Uri; use pgrx::pg_sys; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgSqlErrorCode; use serde_json::{self, Value as JsonValue}; use std::collections::{HashMap, VecDeque}; @@ -25,7 +26,8 @@ enum Parser { #[wrappers_fdw( version = "0.1.2", author = "Supabase", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/s3_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/s3_fdw", + error_type = "S3FdwError" )] pub(crate) struct S3Fdw { rt: Runtime, @@ -125,8 +127,16 @@ impl S3Fdw { } } -impl ForeignDataWrapper for S3Fdw { - fn new(options: &HashMap) -> Self { +enum S3FdwError {} + +impl From for ErrorReport { + fn from(_value: S3FdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +impl ForeignDataWrapper for S3Fdw { + fn new(options: &HashMap) -> Result { let rt = tokio::runtime::Runtime::new().unwrap(); let mut ret = S3Fdw { rt, @@ -164,7 +174,7 @@ impl ForeignDataWrapper for S3Fdw { } }; if creds.is_none() { - return ret; + return Ok(ret); } let creds = creds.unwrap(); @@ -199,7 +209,7 @@ impl ForeignDataWrapper for S3Fdw { }; ret.client = Some(client); - ret + Ok(ret) } fn begin_scan( @@ -209,7 +219,7 @@ impl ForeignDataWrapper for S3Fdw { _sorts: &[Sort], _limit: &Option, options: &HashMap, - ) { + ) -> Result<(), S3FdwError> { // extract s3 bucket and object path from uri option let (bucket, object) = if let Some(uri) = require_option("uri", options) { match uri.parse::() { @@ -222,7 +232,7 @@ impl ForeignDataWrapper for S3Fdw { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("invalid s3 uri: {}", uri), ); - return; + return Ok(()); } // exclude 1st "/" char in the path as s3 object path doesn't like it (uri.host().unwrap().to_owned(), uri.path()[1..].to_string()) @@ -232,11 +242,11 @@ impl ForeignDataWrapper for S3Fdw { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("parse s3 uri failed: {}", err), ); - return; + return Ok(()); } } } else { - return; + return Ok(()); }; let has_header: bool = options.get("has_header") == Some(&"true".to_string()); @@ -261,11 +271,11 @@ impl ForeignDataWrapper for S3Fdw { format ), ); - return; + return Ok(()); } } } else { - return; + return Ok(()); }; let stream = match self @@ -278,7 +288,7 @@ impl ForeignDataWrapper for S3Fdw { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("request s3 failed: {}", err), ); - return; + return Ok(()); } }; @@ -295,7 +305,7 @@ impl ForeignDataWrapper for S3Fdw { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("invalid compression option: {}", compress), ); - return; + return Ok(()); } } } else { @@ -321,7 +331,7 @@ impl ForeignDataWrapper for S3Fdw { &self.tgt_cols, )); } - return; + return Ok(()); } let mut rdr: BufReader>> = BufReader::new(boxed_stream); @@ -335,26 +345,30 @@ impl ForeignDataWrapper for S3Fdw { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("fetch csv file failed: {}", err), ); - return; + return Ok(()); } } } self.rdr = Some(rdr); } + + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, S3FdwError> { // read parquet record if let Parser::Parquet(ref mut s3parquet) = &mut self.parser { - self.rt.block_on(s3parquet.refill())?; + if self.rt.block_on(s3parquet.refill()).is_none() { + return Ok(None); + } let ret = s3parquet.read_into_row(row, &self.tgt_cols); if ret.is_some() { self.rows_out += 1; } else { stats::inc_stats(Self::FDW_NAME, stats::Metric::RowsOut, self.rows_out); } - return ret; + return Ok(ret); } // read csv or jsonl record @@ -376,7 +390,7 @@ impl ForeignDataWrapper for S3Fdw { row.push(&col.name, cell); } self.rows_out += 1; - return Some(()); + return Ok(Some(())); } else { // no more records left in the local buffer, refill from remote self.buf.clear(); @@ -419,7 +433,7 @@ impl ForeignDataWrapper for S3Fdw { } } self.rows_out += 1; - return Some(()); + return Ok(Some(())); } None => { // no more records left in the local buffer, refill from remote @@ -433,21 +447,27 @@ impl ForeignDataWrapper for S3Fdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::RowsOut, self.rows_out); - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), S3FdwError> { // release local resources self.rdr.take(); self.parser = Parser::JsonLine(VecDeque::new()); + Ok(()) } - fn validator(options: Vec>, catalog: Option) { + fn validator( + options: Vec>, + catalog: Option, + ) -> Result<(), S3FdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "uri"); check_options_contain(&options, "format"); } } + + Ok(()) } } diff --git a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs index 21ebc066..d496889f 100644 --- a/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs +++ b/wrappers/src/fdw/stripe_fdw/stripe_fdw.rs @@ -1,4 +1,5 @@ use crate::stats; +use pgrx::pg_sys::panic::ErrorReport; use pgrx::{datum::datetime_support::to_timestamp, pg_sys, prelude::PgSqlErrorCode, JsonB}; use reqwest::{self, header, StatusCode, Url}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; @@ -248,14 +249,14 @@ macro_rules! report_request_error { PgSqlErrorCode::ERRCODE_FDW_ERROR, &format!("request failed: {}", $err), ); - return; }}; } #[wrappers_fdw( version = "0.1.7", author = "Supabase", - website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/stripe_fdw" + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/stripe_fdw", + error_type = "StripeFdwError" )] pub(crate) struct StripeFdw { rt: Runtime, @@ -625,8 +626,16 @@ impl StripeFdw { } } -impl ForeignDataWrapper for StripeFdw { - fn new(options: &HashMap) -> Self { +enum StripeFdwError {} + +impl From for ErrorReport { + fn from(_value: StripeFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +impl ForeignDataWrapper for StripeFdw { + fn new(options: &HashMap) -> Result { let base_url = options .get("api_url") .map(|t| t.to_owned()) @@ -649,14 +658,14 @@ impl ForeignDataWrapper for StripeFdw { stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); - StripeFdw { + Ok(StripeFdw { rt: create_async_runtime(), base_url: Url::parse(&base_url).unwrap(), client, scan_result: None, obj: String::default(), rowid_col: String::default(), - } + }) } fn begin_scan( @@ -666,18 +675,18 @@ impl ForeignDataWrapper for StripeFdw { _sorts: &[Sort], limit: &Option, options: &HashMap, - ) { + ) -> Result<(), StripeFdwError> { let obj = if let Some(name) = require_option("object", options) { name } else { - return; + return Ok(()); }; if let Some(client) = &self.client { let page_size = 100; // maximum page size limit for Stripe API let page_cnt = if let Some(limit) = limit { if limit.count == 0 { - return; + return Ok(()); } (limit.offset + limit.count) / page_size + 1 } else { @@ -693,7 +702,7 @@ impl ForeignDataWrapper for StripeFdw { // build url let url = self.build_url(&obj, quals, page_size, &cursor); if url.is_none() { - return; + return Ok(()); } let url = url.unwrap(); @@ -732,10 +741,16 @@ impl ForeignDataWrapper for StripeFdw { } cursor = starting_after; } - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } } } - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } } page += 1; @@ -748,35 +763,39 @@ impl ForeignDataWrapper for StripeFdw { self.scan_result = Some(result); } + + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> Result, StripeFdwError> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { - return result + return Ok(result .drain(0..1) .last() - .map(|src_row| row.replace_with(src_row)); + .map(|src_row| row.replace_with(src_row))); } } - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> Result<(), StripeFdwError> { self.scan_result.take(); + Ok(()) } - fn begin_modify(&mut self, options: &HashMap) { + fn begin_modify(&mut self, options: &HashMap) -> Result<(), StripeFdwError> { self.obj = require_option("object", options).unwrap_or_default(); self.rowid_col = require_option("rowid_column", options).unwrap_or_default(); + Ok(()) } - fn insert(&mut self, src: &Row) { + fn insert(&mut self, src: &Row) -> Result<(), StripeFdwError> { if let Some(ref mut client) = self.client { let url = self.base_url.join(&self.obj).unwrap(); let body = row_to_body(src); if body.is_null() { - return; + return Ok(()); } let mut stats_metadata = get_stats_metadata(); @@ -797,16 +816,23 @@ impl ForeignDataWrapper for StripeFdw { report_info(&format!("inserted {} {}", self.obj, id)); } } - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } }, - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } } set_stats_metadata(stats_metadata); } + Ok(()) } - fn update(&mut self, rowid: &Cell, new_row: &Row) { + fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), StripeFdwError> { if let Some(ref mut client) = self.client { let mut stats_metadata = get_stats_metadata(); @@ -820,7 +846,7 @@ impl ForeignDataWrapper for StripeFdw { .unwrap(); let body = row_to_body(new_row); if body.is_null() { - return; + return Ok(()); } // call Stripe API @@ -839,9 +865,15 @@ impl ForeignDataWrapper for StripeFdw { report_info(&format!("updated {} {}", self.obj, id)); } } - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } }, - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } } } _ => unreachable!(), @@ -849,9 +881,10 @@ impl ForeignDataWrapper for StripeFdw { set_stats_metadata(stats_metadata); } + Ok(()) } - fn delete(&mut self, rowid: &Cell) { + fn delete(&mut self, rowid: &Cell) -> Result<(), StripeFdwError> { if let Some(ref mut client) = self.client { let mut stats_metadata = get_stats_metadata(); @@ -880,9 +913,15 @@ impl ForeignDataWrapper for StripeFdw { report_info(&format!("deleted {} {}", self.obj, id)); } } - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } }, - Err(err) => report_request_error!(err), + Err(err) => { + report_request_error!(err); + return Ok(()); + } } } _ => unreachable!(), @@ -890,15 +929,19 @@ impl ForeignDataWrapper for StripeFdw { set_stats_metadata(stats_metadata); } + Ok(()) } - fn end_modify(&mut self) {} - - fn validator(options: Vec>, catalog: Option) { + fn validator( + options: Vec>, + catalog: Option, + ) -> Result<(), StripeFdwError> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { check_options_contain(&options, "object"); } } + + Ok(()) } }