Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add PG13 support for wrappers framework #313

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions supabase-wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["database", "postgres", "postgresql", "extension"]
[features]
default = ["cshim", "pg15"]
cshim = ["pgrx/cshim"]
pg13 = ["pgrx/pg13", "pgrx-tests/pg13"]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14"]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16"]
Expand Down
137 changes: 108 additions & 29 deletions supabase-wrappers/src/modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ struct FdwModifyState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
// memory context
tmp_ctx: PgMemoryContexts,
_phantom: PhantomData<E>,

#[cfg(feature = "pg13")]
update_cols: Vec<String>,
}

impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {
Expand All @@ -44,6 +47,8 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {
opts: HashMap::new(),
tmp_ctx,
_phantom: PhantomData,
#[cfg(feature = "pg13")]
update_cols: Vec::new(),
}
}

Expand All @@ -70,6 +75,66 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {

impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> utils::SerdeList for FdwModifyState<E, W> {}

// find rowid column in relation description
unsafe fn find_rowid_column(
target_relation: pg_sys::Relation,
) -> Option<pg_sys::FormData_pg_attribute> {
// get rowid column name from table options
let ftable = pg_sys::GetForeignTable((*target_relation).rd_id);
let opts = options_to_hashmap((*ftable).options).report_unwrap();
let rowid_name = require_option("rowid_column", &opts).report_unwrap();

// find rowid attribute
let tup_desc = PgTupleDesc::from_pg_copy((*target_relation).rd_att);
for attr in tup_desc.iter().filter(|a| !a.is_dropped()) {
if pgrx::name_data_to_str(&attr.attname) == rowid_name {
return Some(*attr);
}
}

report_error(
PgSqlErrorCode::ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION,
"cannot find rowid_column attribute in the foreign table",
);

None
}

#[cfg(feature = "pg13")]
#[pg_guard]
pub(super) extern "C" fn add_foreign_update_targets(
parsetree: *mut pg_sys::Query,
_target_rte: *mut pg_sys::RangeTblEntry,
target_relation: pg_sys::Relation,
) {
debug2!("---> add_foreign_update_targets");
unsafe {
if let Some(attr) = find_rowid_column(target_relation) {
// make a Var representing the desired value
let var = pg_sys::makeVar(
(*parsetree).resultRelation as _,
attr.attnum,
attr.atttypid,
attr.atttypmod,
attr.attcollation,
0,
);

// wrap the var in a resjunk TLE
let tle = pg_sys::makeTargetEntry(
var as _,
((*(*parsetree).targetList).length + 1) as _,
pg_sys::pstrdup(attr.attname.data.as_ptr()),
true,
);

// add it to the query's target list
(*parsetree).targetList = pg_sys::lappend((*parsetree).targetList, tle as _);
}
}
}

#[cfg(not(feature = "pg13"))]
#[pg_guard]
pub(super) extern "C" fn add_foreign_update_targets(
root: *mut pg_sys::PlannerInfo,
Expand All @@ -79,35 +144,20 @@ pub(super) extern "C" fn add_foreign_update_targets(
) {
debug2!("---> add_foreign_update_targets");
unsafe {
// get rowid column name from table options
let ftable = pg_sys::GetForeignTable((*target_relation).rd_id);
let opts = options_to_hashmap((*ftable).options).report_unwrap();
let rowid_name = require_option("rowid_column", &opts).report_unwrap();
if let Some(attr) = find_rowid_column(target_relation) {
// make a Var representing the desired value
let var = pg_sys::makeVar(
rtindex as _,
attr.attnum,
attr.atttypid,
attr.atttypmod,
attr.attcollation,
0,
);

// find rowid attribute
let tup_desc = PgTupleDesc::from_pg_copy((*target_relation).rd_att);
for attr in tup_desc.iter().filter(|a| !a.attisdropped) {
if pgrx::name_data_to_str(&attr.attname) == rowid_name {
// make a Var representing the desired value
let var = pg_sys::makeVar(
rtindex.try_into().unwrap(),
attr.attnum,
attr.atttypid,
attr.atttypmod,
attr.attcollation,
0,
);

// register it as a row-identity column needed by this target rel
pg_sys::add_row_identity_var(root, var, rtindex, &attr.attname.data as _);
return;
}
// register it as a row-identity column needed by this target rel
pg_sys::add_row_identity_var(root, var, rtindex, &attr.attname.data as _);
}

report_error(
PgSqlErrorCode::ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION,
"cannot find rowid_column attribute in the foreign table",
)
}
}

Expand Down Expand Up @@ -164,6 +214,22 @@ pub(super) extern "C" fn plan_foreign_modify<E: Into<ErrorReport>, W: ForeignDat
state.rowid_typid = attr.atttypid;
state.opts = opts;

#[cfg(feature = "pg13")]
{
// get update column list
let tgts: pgrx::PgList<pg_sys::TargetEntry> =
pgrx::PgList::from_pg((*(*root).parse).targetList);
for tgt in tgts.iter_ptr() {
let col_name = std::ffi::CStr::from_ptr((*tgt).resname)
.to_str()
.unwrap()
.to_owned();
if !(*tgt).resjunk {
state.update_cols.push(col_name);
}
}
}

// install callback to drop the state when memory context is reset
let mut ctx = PgMemoryContexts::For(state.tmp_ctx.value());
let p = PgBox::from_pg(ctx.leak_and_drop_on_delete(state));
Expand Down Expand Up @@ -199,6 +265,9 @@ pub(super) extern "C" fn begin_foreign_modify<E: Into<ErrorReport>, W: ForeignDa
assert!(!state.is_null());

// search for rowid attribute number
#[cfg(feature = "pg13")]
let subplan = (*(*(*mtstate).mt_plans.offset(_subplan_index as _))).plan;
#[cfg(not(feature = "pg13"))]
let subplan = (*polyfill::outer_plan_state(&mut (*mtstate).ps)).plan;
let rowid_name_c = PgMemoryContexts::CurrentMemoryContext.pstrdup(&state.rowid_name);
state.rowid_attno =
Expand Down Expand Up @@ -282,10 +351,20 @@ pub(super) extern "C" fn exec_foreign_update<E: Into<ErrorReport>, W: ForeignDat
// so we only keep the updated new attributes
let tup_desc = PgTupleDesc::from_pg_copy((*slot).tts_tupleDescriptor);
new_row.retain(|(col, _)| {
tup_desc.iter().filter(|a| !a.attisdropped).any(|a| {
let is_ft_col = tup_desc.iter().filter(|a| !a.attisdropped).any(|a| {
let attr_name = pgrx::name_data_to_str(&a.attname);
attr_name == col.as_str()
}) && state.rowid_name != col.as_str()
});

#[cfg(not(feature = "pg13"))]
{
is_ft_col && state.rowid_name != col.as_str()
}

#[cfg(feature = "pg13")]
{
is_ft_col && state.update_cols.iter().any(|c| c == col.as_str())
}
});

state.update(&rowid, &new_row).report_unwrap();
Expand Down
1 change: 1 addition & 0 deletions supabase-wrappers/src/polyfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub(super) unsafe fn slot_getattr(
values[attnum - 1]
}

#[cfg(not(feature = "pg13"))]
#[inline]
pub(super) unsafe fn outer_plan_state(node: *mut pg_sys::PlanState) -> *mut pg_sys::PlanState {
(*node).lefttree
Expand Down
2 changes: 1 addition & 1 deletion supabase-wrappers/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub(super) unsafe fn extract_target_columns(
// get column names from var list
let col_vars: PgList<pg_sys::Var> = PgList::from_pg(col_vars);
for var in col_vars.iter_ptr() {
let rte = pg_sys::planner_rt_fetch((*var).varno as u32, root);
let rte = pg_sys::planner_rt_fetch((*var).varno as _, root);
let attno = (*var).varattno;
let attname = pg_sys::get_attname((*rte).relid, attno, true);
if !attname.is_null() {
Expand Down
1 change: 1 addition & 0 deletions wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ crate-type = ["cdylib"]
[features]
default = ["cshim", "pg15"]
cshim = ["pgrx/cshim"]
pg13 = ["pgrx/pg13", "pgrx-tests/pg13", "supabase-wrappers/pg13"]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14", "supabase-wrappers/pg14"]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16", "supabase-wrappers/pg16"]
Expand Down
Loading