Skip to content

Commit

Permalink
Merge pull request #144 from supabase/feat/fallible-fdw-trait
Browse files Browse the repository at this point in the history
Fallible ForeignDataWrapper trait methods
  • Loading branch information
burmecia authored Sep 13, 2023
2 parents 59b7ec5 + ce92c76 commit 5306290
Show file tree
Hide file tree
Showing 14 changed files with 545 additions and 278 deletions.
23 changes: 20 additions & 3 deletions supabase-wrappers-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,30 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream {
let mut metas = TokenStream2::new();
let meta_attrs: Punctuated<MetaNameValue, Token![,]> =
parse_macro_input!(attr with Punctuated::parse_terminated);
let mut error_type: Option<String> = None;
for attr in meta_attrs {
let name = format!("{}", attr.path.segments.first().unwrap().ident);
if let Lit::Str(val) = attr.lit {
let value = val.value();
metas.append_all(quote! {
meta.insert(#name.to_owned(), #value.to_owned());
});
if name == "version" || name == "author" || name == "website" {
metas.append_all(quote! {
meta.insert(#name.to_owned(), #value.to_owned());
});
} else if name == "error_type" {
error_type = Some(value);
}
}
}

let error_type_ident = if let Some(error_type) = error_type {
format_ident!("{}", error_type)
} else {
let quoted = quote! {
compile_error!("Missing `error_type` in the `wrappers_fdw` attribute");
};
return quoted.into();
};

let item: ItemStruct = parse_macro_input!(item as ItemStruct);
let item_tokens = item.to_token_stream();
let ident = item.ident;
Expand All @@ -68,6 +82,7 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream {
mod #module_ident {
use super::#ident;
use std::collections::HashMap;
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::prelude::*;
use supabase_wrappers::prelude::*;

Expand All @@ -79,6 +94,8 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream {
#[pg_extern(create_or_replace)]
fn #fn_validator_ident(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) {
#ident::validator(options, catalog)
.map_err(|e| <super::#error_type_ident as Into<ErrorReport>>::into(e))
.report();
}

#[pg_extern(create_or_replace)]
Expand Down
8 changes: 6 additions & 2 deletions supabase-wrappers/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::prelude::*;
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::prelude::*;

use super::utils;

// create a fdw instance
pub(super) unsafe fn create_fdw_instance<W: ForeignDataWrapper>(ftable_id: pg_sys::Oid) -> W {
pub(super) unsafe fn create_fdw_instance<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
ftable_id: pg_sys::Oid,
) -> W {
let ftable = pg_sys::GetForeignTable(ftable_id);
let fserver = pg_sys::GetForeignServer((*ftable).serverid);
let fserver_opts = utils::options_to_hashmap((*fserver).options);
W::new(&fserver_opts)
let wrapper = W::new(&fserver_opts);
wrapper.map_err(|e| e.into()).report()
}
97 changes: 68 additions & 29 deletions supabase-wrappers/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!

use crate::FdwRoutine;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::{Date, Timestamp};
use pgrx::{
fcinfo,
Expand Down Expand Up @@ -454,7 +455,7 @@ impl Limit {
///
/// See the module-level document for more details.
///
pub trait ForeignDataWrapper {
pub trait ForeignDataWrapper<E: Into<ErrorReport>> {
/// Create a FDW instance
///
/// `options` is the key-value pairs defined in `CREATE SERVER` SQL. For example,
Expand All @@ -472,7 +473,9 @@ pub trait ForeignDataWrapper {
/// You can do any initalization in this function, like saving connection
/// info or API url in an variable, but don't do heavy works like database
/// connection or API call.
fn new(options: &HashMap<String, String>) -> Self;
fn new(options: &HashMap<String, String>) -> Result<Self, E>
where
Self: Sized;

/// Obtain relation size estimates for a foreign table
///
Expand All @@ -487,8 +490,8 @@ pub trait ForeignDataWrapper {
_sorts: &[Sort],
_limit: &Option<Limit>,
_options: &HashMap<String, String>,
) -> (i64, i32) {
(0, 0)
) -> Result<(i64, i32), E> {
Ok((0, 0))
}

/// Called when begin executing a foreign scan
Expand All @@ -507,24 +510,26 @@ pub trait ForeignDataWrapper {
sorts: &[Sort],
limit: &Option<Limit>,
options: &HashMap<String, String>,
);
) -> Result<(), E>;

/// Called when fetch one row from the foreign source
///
/// FDW must save fetched foreign data into the [`Row`], or return `None` if no more rows to read.
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN).
fn iter_scan(&mut self, row: &mut Row) -> Option<()>;
fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, E>;

/// Called when restart the scan from the beginning.
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN).
fn re_scan(&mut self) {}
fn re_scan(&mut self) -> Result<(), E> {
Ok(())
}

/// Called when end the scan
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN).
fn end_scan(&mut self);
fn end_scan(&mut self) -> Result<(), E>;

/// Called when begin executing a foreign table modification operation.
///
Expand All @@ -548,34 +553,44 @@ pub trait ForeignDataWrapper {
/// ```
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn begin_modify(&mut self, _options: &HashMap<String, String>) {}
fn begin_modify(&mut self, _options: &HashMap<String, String>) -> Result<(), E> {
Ok(())
}

/// Called when insert one row into the foreign table
///
/// - row - the new row to be inserted
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn insert(&mut self, _row: &Row) {}
fn insert(&mut self, _row: &Row) -> Result<(), E> {
Ok(())
}

/// Called when update one row into the foreign table
///
/// - rowid - the `rowid_column` cell
/// - new_row - the new row with updated cells
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn update(&mut self, _rowid: &Cell, _new_row: &Row) {}
fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), E> {
Ok(())
}

/// Called when delete one row into the foreign table
///
/// - rowid - the `rowid_column` cell
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn delete(&mut self, _rowid: &Cell) {}
fn delete(&mut self, _rowid: &Cell) -> Result<(), E> {
Ok(())
}

/// Called when end the table update
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn end_modify(&mut self) {}
fn end_modify(&mut self) -> Result<(), E> {
Ok(())
}

/// Returns a FdwRoutine for the FDW
///
Expand All @@ -590,25 +605,25 @@ pub trait ForeignDataWrapper {
FdwRoutine::<AllocatedByRust>::alloc_node(pg_sys::NodeTag_T_FdwRoutine);

// plan phase
fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::<Self>);
fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::<Self>);
fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::<Self>);
fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::<Self>);
fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::<E, Self>);
fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::<E, Self>);
fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::<E, Self>);
fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::<E, Self>);

// scan phase
fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::<Self>);
fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::<Self>);
fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::<Self>);
fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::<Self>);
fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::<E, Self>);
fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::<E, Self>);
fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::<E, Self>);
fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::<E, Self>);

// modify phase
fdw_routine.AddForeignUpdateTargets = Some(modify::add_foreign_update_targets);
fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::<Self>);
fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::<Self>);
fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::<Self>);
fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::<Self>);
fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::<Self>);
fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::<Self>);
fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::<E, Self>);
fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::<E, Self>);
fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::<E, Self>);
fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::<E, Self>);
fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::<E, Self>);
fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::<E, Self>);

Self::fdw_routine_hook(&mut fdw_routine);
fdw_routine.into_pg_boxed()
Expand All @@ -631,7 +646,27 @@ pub trait ForeignDataWrapper {
/// use pgrx::pg_sys::Oid;
/// use supabase_wrappers::prelude::check_options_contain;
///
/// fn validator(opt_list: Vec<Option<String>>, catalog: Option<Oid>) {
/// use pgrx::pg_sys::panic::ErrorReport;
/// use pgrx::PgSqlErrorCode;
///
/// enum FdwError {
/// InvalidFdwOption,
/// InvalidServerOption,
/// InvalidTableOption,
/// }
///
/// impl From<FdwError> for ErrorReport {
/// fn from(value: FdwError) -> Self {
/// let error_message = match value {
/// FdwError::InvalidFdwOption => "invalid foreign data wrapper option",
/// FdwError::InvalidServerOption => "invalid foreign server option",
/// FdwError::InvalidTableOption => "invalid foreign table option",
/// };
/// ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "")
/// }
/// }
///
/// fn validator(opt_list: Vec<Option<String>>, catalog: Option<Oid>) -> Result<(), FdwError> {
/// if let Some(oid) = catalog {
/// match oid {
/// FOREIGN_DATA_WRAPPER_RELATION_ID => {
Expand All @@ -649,7 +684,11 @@ pub trait ForeignDataWrapper {
/// _ => {}
/// }
/// }
///
/// Ok(())
/// }
/// ```
fn validator(_options: Vec<Option<String>>, _catalog: Option<Oid>) {}
fn validator(_options: Vec<Option<String>>, _catalog: Option<Oid>) -> Result<(), E> {
Ok(())
}
}
Loading

0 comments on commit 5306290

Please sign in to comment.