Skip to content

Commit

Permalink
feat: rename more files
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Jan 8, 2024
1 parent 214b30a commit 0929278
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 33 deletions.
4 changes: 4 additions & 0 deletions wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ airtable_fdw = [
]
cognito_fdw = [
"aws-sdk-cognitoidentityprovider",
"aws-config",
"aws-types",
"reqwest",
"reqwest-middleware",
"reqwest-retry",
"http",
"serde_json",
"tokio",
"serde",
"url",
"thiserror",
Expand Down Expand Up @@ -142,6 +144,7 @@ clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", branch = "a
chrono = { version = "0.4", optional = true }
chrono-tz = { version = "0.6", optional = true }


# for bigquery_fdw, firebase_fdw, airtable_fdw and etc.
gcp-bigquery-client = { version = "0.17.0", optional = true }
serde = { version = "1", optional = true }
Expand All @@ -166,6 +169,7 @@ aws-config = { version = "1.1.1", optional = true }
aws-sdk-s3 = { version = "1.11.0", optional = true }
aws-smithy-http = { version = "0.60.1", optional = true }
aws-smithy-runtime-api = { version = "1.1.1", optional = true }

csv = { version = "1.2", optional = true }
tokio = { version = "1", features = ["full"], optional = true }
tokio-util = { version = "0.7", features = ["full"], optional = true }
Expand Down
34 changes: 17 additions & 17 deletions wrappers/src/fdw/cognito_fdw/cognito_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::fdw::auth0_fdw::auth0_client::row::Auth0User;
use crate::fdw::cognito_fdw::cognito_client::row::CognitoUser;
use http::{HeaderMap, HeaderName, HeaderValue};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::PgSqlErrorCode;
Expand All @@ -12,26 +12,26 @@ use url::ParseError;

pub(crate) mod row;

pub(crate) struct Auth0Client {
pub(crate) struct CognitoClient {
url: Url,
client: ClientWithMiddleware,
}

pub(crate) mod rows_iterator;

impl Auth0Client {
pub(crate) fn new(url: &str, api_key: &str) -> Result<Self, Auth0ClientError> {
impl CognitoClient {
pub(crate) fn new(url: &str, api_key: &str) -> Result<Self, CognitoClientError> {
Ok(Self {
url: Url::parse(url)?,
client: Self::create_client(api_key)?,
})
}

fn create_client(api_key: &str) -> Result<ClientWithMiddleware, Auth0ClientError> {
fn create_client(api_key: &str) -> Result<ClientWithMiddleware, CognitoClientError> {
let mut headers = HeaderMap::new();
let header_name = HeaderName::from_static("api-key");
let mut api_key_value =
HeaderValue::from_str(api_key).map_err(|_| Auth0ClientError::InvalidApiKeyHeader)?;
HeaderValue::from_str(api_key).map_err(|_| CognitoClientError::InvalidApiKeyHeader)?;
api_key_value.set_sensitive(true);
headers.insert(header_name, api_key_value);
let client = reqwest::Client::builder()
Expand All @@ -51,21 +51,21 @@ impl Auth0Client {
&self,
_limit: Option<u64>,
_offset: Option<u64>,
) -> Result<Vec<Auth0User>, Auth0ClientError> {
) -> Result<Vec<CognitoUser>, CognitoClientError> {
let rt = create_async_runtime()?;

rt.block_on(async {
let response = self.get_client().get(self.url.clone()).send().await?;
let response = response.error_for_status()?;
let users = response.json::<Vec<Auth0User>>().await?;
let users = response.json::<Vec<CognitoUser>>().await?;
// let users = user_response.get_user_result()?;

Ok(users)
})
}
}
#[derive(Error, Debug)]
pub(crate) enum Auth0ClientError {
pub(crate) enum CognitoClientError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),

Expand All @@ -85,15 +85,15 @@ pub(crate) enum Auth0ClientError {
UrlParseError(#[from] ParseError),
}

impl From<Auth0ClientError> for ErrorReport {
fn from(value: Auth0ClientError) -> Self {
impl From<CognitoClientError> for ErrorReport {
fn from(value: CognitoClientError) -> Self {
match value {
Auth0ClientError::CreateRuntimeError(e) => e.into(),
Auth0ClientError::UrlParseError(_)
| Auth0ClientError::InvalidApiKeyHeader
| Auth0ClientError::ReqwestError(_)
| Auth0ClientError::ReqwestMiddlewareError(_)
| Auth0ClientError::SerdeError(_) => {
CognitoClientError::CreateRuntimeError(e) => e.into(),
CognitoClientError::UrlParseError(_)
| CognitoClientError::InvalidApiKeyHeader
| CognitoClientError::ReqwestError(_)
| CognitoClientError::ReqwestMiddlewareError(_)
| CognitoClientError::SerdeError(_) => {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "")
}
}
Expand Down
8 changes: 4 additions & 4 deletions wrappers/src/fdw/cognito_fdw/cognito_client/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) struct UserRequest {

#[derive(Debug, Deserialize, PartialEq)]
pub struct ResultPayload {
pub(crate) users: Vec<Auth0User>,
pub(crate) users: Vec<CognitoUser>,
pub(crate) next_page_offset: Option<u64>,
}

Expand All @@ -27,17 +27,17 @@ pub(crate) struct Success {
}

#[derive(Debug)]
pub struct Auth0Fields(HashMap<String, Value>);
pub struct CognitoFields(HashMap<String, Value>);

#[derive(Debug, Deserialize, PartialEq)]
pub struct Auth0User {
pub struct CognitoUser {
pub created_at: String,
pub email: String,
pub email_verified: bool,
pub identities: Option<serde_json::Value>,
}

impl Auth0User {
impl CognitoUser {
pub(crate) fn into_row(mut self, columns: &[Column]) -> Row {
let mut row = Row::new();
for tgt_col in columns {
Expand Down
14 changes: 7 additions & 7 deletions wrappers/src/fdw/cognito_fdw/cognito_client/rows_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::fdw::auth0_fdw::auth0_client::{Auth0Client, Auth0ClientError};
use crate::fdw::cognito_fdw::cognito_client::{CognitoClient, CognitoClientError};
use std::collections::VecDeque;
use supabase_wrappers::prelude::{Column, Row};

pub(crate) struct RowsIterator {
auth0_client: Auth0Client,
cognito_client: CognitoClient,
batch_size: u64,
columns: Vec<Column>,
rows: VecDeque<Row>,
Expand All @@ -12,10 +12,10 @@ pub(crate) struct RowsIterator {
}

impl RowsIterator {
pub(crate) fn new(columns: Vec<Column>, batch_size: u64, auth0_client: Auth0Client) -> Self {
pub(crate) fn new(columns: Vec<Column>, batch_size: u64, cognito_client: CognitoClient) -> Self {
Self {
columns,
auth0_client,
cognito_client,
batch_size,
rows: VecDeque::new(),
have_more_rows: true,
Expand All @@ -31,9 +31,9 @@ impl RowsIterator {
self.next_page_offset
}

fn fetch_rows_batch(&mut self) -> Result<Option<Row>, Auth0ClientError> {
fn fetch_rows_batch(&mut self) -> Result<Option<Row>, CognitoClientError> {
let users = self
.auth0_client
.cognito_client
.fetch_users(self.get_limit(), self.get_offset())?;
self.rows = users
.into_iter()
Expand All @@ -53,7 +53,7 @@ impl RowsIterator {
}

impl Iterator for RowsIterator {
type Item = Result<Row, Auth0ClientError>;
type Item = Result<Row, CognitoClientError>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(row) = self.get_next_row() {
Expand Down
46 changes: 41 additions & 5 deletions wrappers/src/fdw/cognito_fdw/cognito_fdw.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use crate::fdw::auth0_fdw::auth0_client::rows_iterator::RowsIterator;
use crate::fdw::auth0_fdw::auth0_client::CognitoClient;
use crate::fdw::cognito_fdw::cognito_client::rows_iterator::RowsIterator;
use crate::fdw::cognito_fdw::cognito_client::CognitoClient;
use crate::fdw::cognito_fdw::cognito_client::CognitoClientError;

use aws_sdk_cognitoidentityprovider::Client;
use std::sync::Arc;
use tokio::runtime::Runtime;

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_cognitoidentityprovider::Credentials;
use aws_sdk_cognitoidentityprovider::Region;

use crate::stats;
use pgrx::pg_sys;
use std::collections::HashMap;
use supabase_wrappers::prelude::*;

use crate::fdw::cognito_fdw::cognito_client::CognitoClientError;
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::PgSqlErrorCode;
use thiserror::Error;
Expand Down Expand Up @@ -96,14 +104,39 @@ impl ForeignDataWrapper<CognitoFdwError> for CognitoFdw {

fn new(options: &HashMap<String, String>) -> Result<Self, CognitoFdwError> {
let url = require_option("url", options)?.to_string();
let cognito_key_id = require_option("access_key_id", options)?.to_string();
let cognito_access_key = require_option("access_key", options)?.to_string();
let user_pool_id = require_option("user_pool_id", options)?.to_string();
let aws_region = require_option("region", options)?.to_string();
let region = Region::new(aws_region);

let api_key = if let Some(api_key) = options.get("api_key") {
api_key.clone()
} else {
let api_key_id = options
.get("api_key_id")
.expect("`api_key_id` must be set if `api_key` is not");
get_vault_secret(api_key_id).ok_or(CognitoFdwError::SecretNotFound(api_key_id.clone()))?
get_vault_secret(api_key_id)
.ok_or(CognitoFdwError::SecretNotFound(api_key_id.clone()))?
};
let credentials = Credentials::new(
cognito_key_id,
cognito_access_key,
None, // Token if available
None, // Expiry if available
"manual",
);

let rt = Runtime::new().unwrap();
let config = rt.block_on(async {
aws_config::from_env()
.credentials_provider(credentials)
.region(region)
.load()
.await
});

let client = Client::new(&config);

stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1);
Ok(Self {
Expand Down Expand Up @@ -145,7 +178,10 @@ impl ForeignDataWrapper<CognitoFdwError> for CognitoFdw {
Ok(())
}

fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) -> CognitoFdwResult<()> {
fn validator(
options: Vec<Option<String>>,
catalog: Option<pg_sys::Oid>,
) -> CognitoFdwResult<()> {
if let Some(oid) = catalog {
if oid == FOREIGN_SERVER_RELATION_ID {
let api_key_exists = check_options_contain(&options, "api_key").is_ok();
Expand Down

0 comments on commit 0929278

Please sign in to comment.