Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions src/common/meta/src/kv_backend/rds/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,18 @@ impl<'a> MySqlTemplateFactory<'a> {
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = ?"),
range: format!("SELECT k, v FROM {table_name} WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM {table_name} ? ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE ? ORDER BY k"),
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM `{table_name}` ? ORDER BY k"),
left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM {table_name} WHERE k = ?;"),
range: format!("DELETE FROM {table_name} WHERE k >= ? AND k < ?;"),
full: format!("DELETE FROM {table_name}"),
left_bounded: format!("DELETE FROM {table_name} WHERE k >= ?;"),
prefix: format!("DELETE FROM {table_name} WHERE k LIKE ?;"),
point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
full: format!("DELETE FROM `{table_name}`"),
left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
},
}
}
Expand All @@ -189,14 +189,17 @@ impl MySqlTemplateSet {
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
format!(
"SELECT k, v FROM `{table_name}` WHERE k in ({});",
in_clause
)
}

/// Generates the sql for batch delete.
fn generate_batch_delete_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("DELETE FROM {table_name} WHERE k in ({});", in_clause)
format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
}

/// Generates the sql for batch upsert.
Expand All @@ -212,9 +215,9 @@ impl MySqlTemplateSet {
let values_clause = values_placeholders.join(", ");

(
format!(r#"SELECT k, v FROM {table_name} WHERE k IN ({in_clause})"#,),
format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
format!(
r#"INSERT INTO {table_name} (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
),
)
}
Expand Down
37 changes: 22 additions & 15 deletions src/common/meta/src/kv_backend/rds/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,25 @@ impl<'a> PgSqlTemplateFactory<'a> {
PgSqlTemplateSet {
table_name: table_name.to_string(),
create_table_statement: format!(
"CREATE TABLE IF NOT EXISTS {table_name}(k bytea PRIMARY KEY, v bytea)",
"CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = $1"),
range: format!("SELECT k, v FROM {table_name} WHERE k >= $1 AND k < $2 ORDER BY k"),
full: format!("SELECT k, v FROM {table_name} $1 ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE $1 ORDER BY k"),
point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"),
range: format!(
"SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
),
full: format!("SELECT k, v FROM \"{table_name}\" $1 ORDER BY k"),
left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM {table_name} WHERE k = $1 RETURNING k,v;"),
range: format!("DELETE FROM {table_name} WHERE k >= $1 AND k < $2 RETURNING k,v;"),
full: format!("DELETE FROM {table_name} RETURNING k,v"),
left_bounded: format!("DELETE FROM {table_name} WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM {table_name} WHERE k LIKE $1 RETURNING k,v;"),
point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"),
range: format!(
"DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;"
),
full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"),
left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"),
},
}
}
Expand All @@ -191,15 +195,18 @@ impl PgSqlTemplateSet {
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
format!(
"SELECT k, v FROM \"{table_name}\" WHERE k in ({});",
in_clause
)
}

/// Generates the sql for batch delete.
fn generate_batch_delete_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!(
"DELETE FROM {table_name} WHERE k in ({}) RETURNING k,v;",
"DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;",
in_clause
)
}
Expand All @@ -220,9 +227,9 @@ impl PgSqlTemplateSet {
format!(
r#"
WITH prev AS (
SELECT k,v FROM {table_name} WHERE k IN ({in_clause})
SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause})
), update AS (
INSERT INTO {table_name} (k, v) VALUES
INSERT INTO "{table_name}" (k, v) VALUES
{values_clause}
ON CONFLICT (
k
Expand Down
17 changes: 8 additions & 9 deletions src/meta-srv/src/election/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ impl<'a> ElectionSqlFactory<'a> {
fn put_value_with_lease_sql(&self) -> String {
format!(
r#"WITH prev AS (
SELECT k, v FROM {} WHERE k = $1
SELECT k, v FROM "{}" WHERE k = $1
), insert AS (
INSERT INTO {}
INSERT INTO "{}"
VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
ON CONFLICT (k) DO NOTHING
)
Expand All @@ -140,7 +140,7 @@ impl<'a> ElectionSqlFactory<'a> {

fn update_value_with_lease_sql(&self) -> String {
format!(
r#"UPDATE {}
r#"UPDATE "{}"
SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
WHERE k = $1 AND v = $2"#,
self.table_name, LEASE_SEP
Expand All @@ -149,21 +149,21 @@ impl<'a> ElectionSqlFactory<'a> {

fn get_value_with_lease_sql(&self) -> String {
format!(
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k = $1"#,
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k = $1"#,
self.table_name
)
}

fn get_value_with_lease_by_prefix_sql(&self) -> String {
format!(
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k LIKE $1"#,
r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k LIKE $1"#,
self.table_name
)
}

fn delete_value_sql(&self) -> String {
format!(
"DELETE FROM {} WHERE k = $1 RETURNING k,v;",
"DELETE FROM \"{}\" WHERE k = $1 RETURNING k,v;",
self.table_name
)
}
Expand Down Expand Up @@ -285,7 +285,6 @@ impl Election for PgElection {
.is_ok()
}

/// TODO(CookiePie): Split the candidate registration and keep alive logic into separate methods, so that upper layers can call them separately.
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let key = self.candidate_key();
let node_info =
Expand Down Expand Up @@ -745,7 +744,7 @@ mod tests {
});
if let Some(table_name) = table_name {
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS {}(k bytea PRIMARY KEY, v bytea);",
"CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
table_name
);
client.execute(&create_table_sql, &[]).await.unwrap();
Expand All @@ -754,7 +753,7 @@ mod tests {
}

async fn drop_table(client: &Client, table_name: &str) {
let sql = format!("DROP TABLE IF EXISTS {};", table_name);
let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
client.execute(&sql, &[]).await.unwrap();
}

Expand Down