Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fallible ForeignDataWrapper trait methods #144

Merged
merged 15 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions supabase-wrappers-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,30 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream {
let mut metas = TokenStream2::new();
let meta_attrs: Punctuated<MetaNameValue, Token![,]> =
parse_macro_input!(attr with Punctuated::parse_terminated);
let mut error_type: Option<String> = None;
for attr in meta_attrs {
let name = format!("{}", attr.path.segments.first().unwrap().ident);
if let Lit::Str(val) = attr.lit {
let value = val.value();
metas.append_all(quote! {
meta.insert(#name.to_owned(), #value.to_owned());
});
if name == "version" || name == "author" || name == "website" {
metas.append_all(quote! {
meta.insert(#name.to_owned(), #value.to_owned());
});
} else if name == "error_type" {
error_type = Some(value);
}
}
}

let error_type_ident = if let Some(error_type) = error_type {
format_ident!("{}", error_type)
} else {
let quoted = quote! {
compile_error!("Missing `error_type` in the `wrappers_fdw` attribute");
};
return quoted.into();
};

let item: ItemStruct = parse_macro_input!(item as ItemStruct);
let item_tokens = item.to_token_stream();
let ident = item.ident;
Expand All @@ -68,6 +82,7 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream {
mod #module_ident {
use super::#ident;
use std::collections::HashMap;
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::prelude::*;
use supabase_wrappers::prelude::*;

Expand All @@ -79,6 +94,8 @@ pub fn wrappers_fdw(attr: TokenStream, item: TokenStream) -> TokenStream {
#[pg_extern(create_or_replace)]
fn #fn_validator_ident(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) {
#ident::validator(options, catalog)
.map_err(|e| <super::#error_type_ident as Into<ErrorReport>>::into(e))
.report();
}

#[pg_extern(create_or_replace)]
Expand Down
8 changes: 6 additions & 2 deletions supabase-wrappers/src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::prelude::*;
use pgrx::pg_sys::panic::{ErrorReport, ErrorReportable};
use pgrx::prelude::*;

use super::utils;

// create a fdw instance
pub(super) unsafe fn create_fdw_instance<W: ForeignDataWrapper>(ftable_id: pg_sys::Oid) -> W {
pub(super) unsafe fn create_fdw_instance<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
ftable_id: pg_sys::Oid,
) -> W {
let ftable = pg_sys::GetForeignTable(ftable_id);
let fserver = pg_sys::GetForeignServer((*ftable).serverid);
let fserver_opts = utils::options_to_hashmap((*fserver).options);
W::new(&fserver_opts)
let wrapper = W::new(&fserver_opts);
wrapper.map_err(|e| e.into()).report()
}
97 changes: 68 additions & 29 deletions supabase-wrappers/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!

use crate::FdwRoutine;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::prelude::{Date, Timestamp};
use pgrx::{
fcinfo,
Expand Down Expand Up @@ -454,7 +455,7 @@ impl Limit {
///
/// See the module-level document for more details.
///
pub trait ForeignDataWrapper {
pub trait ForeignDataWrapper<E: Into<ErrorReport>> {
/// Create a FDW instance
///
/// `options` is the key-value pairs defined in `CREATE SERVER` SQL. For example,
Expand All @@ -472,7 +473,9 @@ pub trait ForeignDataWrapper {
/// You can do any initalization in this function, like saving connection
/// info or API url in an variable, but don't do heavy works like database
/// connection or API call.
fn new(options: &HashMap<String, String>) -> Self;
fn new(options: &HashMap<String, String>) -> Result<Self, E>
where
Self: Sized;

/// Obtain relation size estimates for a foreign table
///
Expand All @@ -487,8 +490,8 @@ pub trait ForeignDataWrapper {
_sorts: &[Sort],
_limit: &Option<Limit>,
_options: &HashMap<String, String>,
) -> (i64, i32) {
(0, 0)
) -> Result<(i64, i32), E> {
Ok((0, 0))
}

/// Called when begin executing a foreign scan
Expand All @@ -507,24 +510,26 @@ pub trait ForeignDataWrapper {
sorts: &[Sort],
limit: &Option<Limit>,
options: &HashMap<String, String>,
);
) -> Result<(), E>;

/// Called when fetch one row from the foreign source
///
/// FDW must save fetched foreign data into the [`Row`], or return `None` if no more rows to read.
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN).
fn iter_scan(&mut self, row: &mut Row) -> Option<()>;
fn iter_scan(&mut self, row: &mut Row) -> Result<Option<()>, E>;

/// Called when restart the scan from the beginning.
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN).
fn re_scan(&mut self) {}
fn re_scan(&mut self) -> Result<(), E> {
Ok(())
}

/// Called when end the scan
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-SCAN).
fn end_scan(&mut self);
fn end_scan(&mut self) -> Result<(), E>;

/// Called when begin executing a foreign table modification operation.
///
Expand All @@ -548,34 +553,44 @@ pub trait ForeignDataWrapper {
/// ```
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn begin_modify(&mut self, _options: &HashMap<String, String>) {}
fn begin_modify(&mut self, _options: &HashMap<String, String>) -> Result<(), E> {
Ok(())
}

/// Called when insert one row into the foreign table
///
/// - row - the new row to be inserted
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn insert(&mut self, _row: &Row) {}
fn insert(&mut self, _row: &Row) -> Result<(), E> {
Ok(())
}

/// Called when update one row into the foreign table
///
/// - rowid - the `rowid_column` cell
/// - new_row - the new row with updated cells
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn update(&mut self, _rowid: &Cell, _new_row: &Row) {}
fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), E> {
Ok(())
}

/// Called when delete one row into the foreign table
///
/// - rowid - the `rowid_column` cell
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn delete(&mut self, _rowid: &Cell) {}
fn delete(&mut self, _rowid: &Cell) -> Result<(), E> {
Ok(())
}

/// Called when end the table update
///
/// [See more details](https://www.postgresql.org/docs/current/fdw-callbacks.html#FDW-CALLBACKS-UPDATE).
fn end_modify(&mut self) {}
fn end_modify(&mut self) -> Result<(), E> {
Ok(())
}

/// Returns a FdwRoutine for the FDW
///
Expand All @@ -590,25 +605,25 @@ pub trait ForeignDataWrapper {
FdwRoutine::<AllocatedByRust>::alloc_node(pg_sys::NodeTag_T_FdwRoutine);

// plan phase
fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::<Self>);
fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::<Self>);
fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::<Self>);
fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::<Self>);
fdw_routine.GetForeignRelSize = Some(scan::get_foreign_rel_size::<E, Self>);
fdw_routine.GetForeignPaths = Some(scan::get_foreign_paths::<E, Self>);
fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::<E, Self>);
fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::<E, Self>);

// scan phase
fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::<Self>);
fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::<Self>);
fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::<Self>);
fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::<Self>);
fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::<E, Self>);
fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::<E, Self>);
fdw_routine.ReScanForeignScan = Some(scan::re_scan_foreign_scan::<E, Self>);
fdw_routine.EndForeignScan = Some(scan::end_foreign_scan::<E, Self>);

// modify phase
fdw_routine.AddForeignUpdateTargets = Some(modify::add_foreign_update_targets);
fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::<Self>);
fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::<Self>);
fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::<Self>);
fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::<Self>);
fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::<Self>);
fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::<Self>);
fdw_routine.PlanForeignModify = Some(modify::plan_foreign_modify::<E, Self>);
fdw_routine.BeginForeignModify = Some(modify::begin_foreign_modify::<E, Self>);
fdw_routine.ExecForeignInsert = Some(modify::exec_foreign_insert::<E, Self>);
fdw_routine.ExecForeignDelete = Some(modify::exec_foreign_delete::<E, Self>);
fdw_routine.ExecForeignUpdate = Some(modify::exec_foreign_update::<E, Self>);
fdw_routine.EndForeignModify = Some(modify::end_foreign_modify::<E, Self>);

Self::fdw_routine_hook(&mut fdw_routine);
fdw_routine.into_pg_boxed()
Expand All @@ -631,7 +646,27 @@ pub trait ForeignDataWrapper {
/// use pgrx::pg_sys::Oid;
/// use supabase_wrappers::prelude::check_options_contain;
///
/// fn validator(opt_list: Vec<Option<String>>, catalog: Option<Oid>) {
/// use pgrx::pg_sys::panic::ErrorReport;
/// use pgrx::PgSqlErrorCode;
///
/// enum FdwError {
/// InvalidFdwOption,
/// InvalidServerOption,
/// InvalidTableOption,
/// }
///
/// impl From<FdwError> for ErrorReport {
/// fn from(value: FdwError) -> Self {
/// let error_message = match value {
/// FdwError::InvalidFdwOption => "invalid foreign data wrapper option",
/// FdwError::InvalidServerOption => "invalid foreign server option",
/// FdwError::InvalidTableOption => "invalid foreign table option",
/// };
/// ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, error_message, "")
/// }
/// }
///
/// fn validator(opt_list: Vec<Option<String>>, catalog: Option<Oid>) -> Result<(), FdwError> {
/// if let Some(oid) = catalog {
/// match oid {
/// FOREIGN_DATA_WRAPPER_RELATION_ID => {
Expand All @@ -649,7 +684,11 @@ pub trait ForeignDataWrapper {
/// _ => {}
/// }
/// }
///
/// Ok(())
/// }
/// ```
fn validator(_options: Vec<Option<String>>, _catalog: Option<Oid>) {}
fn validator(_options: Vec<Option<String>>, _catalog: Option<Oid>) -> Result<(), E> {
Ok(())
}
}
Loading
Loading