Skip to content

Commit f627e55

Browse files
committed
http handler support set variable.
1 parent 9a45829 commit f627e55

File tree

7 files changed

+90
-9
lines changed

7 files changed

+90
-9
lines changed

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617
use std::time::SystemTime;
1718

@@ -22,6 +23,7 @@ use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
2324
use databend_common_expression::DataBlock;
2425
use databend_common_expression::DataSchemaRef;
26+
use databend_common_expression::Scalar;
2527
use databend_common_io::prelude::FormatSettings;
2628
use databend_common_settings::Settings;
2729
use databend_storages_common_txn::TxnManagerRef;
@@ -147,6 +149,7 @@ pub struct ExecutorSessionState {
147149
pub secondary_roles: Option<Vec<String>>,
148150
pub settings: Arc<Settings>,
149151
pub txn_manager: TxnManagerRef,
152+
pub variables: HashMap<String, Scalar>,
150153
}
151154

152155
impl ExecutorSessionState {
@@ -157,6 +160,7 @@ impl ExecutorSessionState {
157160
secondary_roles: session.get_secondary_roles(),
158161
settings: session.get_settings(),
159162
txn_manager: session.txn_mgr(),
163+
variables: session.get_all_variables(),
160164
}
161165
}
162166
}

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16+
use std::collections::HashMap;
1617
use std::fmt::Debug;
1718
use std::sync::atomic::AtomicBool;
1819
use std::sync::atomic::Ordering;
@@ -30,6 +31,7 @@ use databend_common_base::runtime::TrySpawn;
3031
use databend_common_catalog::table_context::StageAttachment;
3132
use databend_common_exception::ErrorCode;
3233
use databend_common_exception::Result;
34+
use databend_common_expression::Scalar;
3335
use databend_common_io::prelude::FormatSettings;
3436
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
3537
use databend_common_settings::ScopeLevel;
@@ -40,7 +42,9 @@ use log::warn;
4042
use poem::web::Json;
4143
use poem::IntoResponse;
4244
use serde::Deserialize;
45+
use serde::Deserializer;
4346
use serde::Serialize;
47+
use serde::Serializer;
4448

4549
use super::HttpQueryContext;
4650
use super::RemoveReason;
@@ -184,6 +188,42 @@ pub struct ServerInfo {
184188
pub start_time: String,
185189
}
186190

191+
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
192+
pub struct HttpSessionStateInternal {
193+
variables: HashMap<String, Scalar>,
194+
}
195+
196+
fn serialize_as_json_string<S>(
197+
value: &Option<HttpSessionStateInternal>,
198+
serializer: S,
199+
) -> Result<S::Ok, S::Error>
200+
where
201+
S: Serializer,
202+
{
203+
match value {
204+
Some(complex_value) => {
205+
let json_string =
206+
serde_json::to_string(complex_value).map_err(serde::ser::Error::custom)?;
207+
serializer.serialize_some(&json_string)
208+
}
209+
None => serializer.serialize_none(),
210+
}
211+
}
212+
213+
fn deserialize_from_json_string<'de, D>(
214+
deserializer: D,
215+
) -> Result<Option<HttpSessionStateInternal>, D::Error>
216+
where D: Deserializer<'de> {
217+
let json_string: Option<String> = Option::deserialize(deserializer)?;
218+
match json_string {
219+
Some(s) => {
220+
let complex_value = serde_json::from_str(&s).map_err(serde::de::Error::custom)?;
221+
Ok(Some(complex_value))
222+
}
223+
None => Ok(None),
224+
}
225+
}
226+
187227
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
188228
pub struct HttpSessionConf {
189229
#[serde(skip_serializing_if = "Option::is_none")]
@@ -192,6 +232,7 @@ pub struct HttpSessionConf {
192232
pub role: Option<String>,
193233
#[serde(skip_serializing_if = "Option::is_none")]
194234
pub secondary_roles: Option<Vec<String>>,
235+
// todo: remove this later
195236
#[serde(skip_serializing_if = "Option::is_none")]
196237
pub keep_server_session_secs: Option<u64>,
197238
#[serde(skip_serializing_if = "Option::is_none")]
@@ -201,9 +242,16 @@ pub struct HttpSessionConf {
201242
// used to check if the session is still on the same server
202243
#[serde(skip_serializing_if = "Option::is_none")]
203244
pub last_server_info: Option<ServerInfo>,
204-
// last_query_ids[0] is the last query id, last_query_ids[1] is the second last query id, etc.
245+
/// last_query_ids[0] is the last query id, last_query_ids[1] is the second last query id, etc.
205246
#[serde(default)]
206247
pub last_query_ids: Vec<String>,
248+
#[serde(default)]
249+
#[serde(skip_serializing_if = "Option::is_none")]
250+
#[serde(
251+
serialize_with = "serialize_as_json_string",
252+
deserialize_with = "deserialize_from_json_string"
253+
)]
254+
pub internal: Option<HttpSessionStateInternal>,
207255
}
208256

209257
impl HttpSessionConf {}
@@ -394,6 +442,11 @@ impl HttpQuery {
394442
})?;
395443
}
396444
}
445+
if let Some(state) = &session_conf.internal {
446+
if !state.variables.is_empty() {
447+
session.set_all_variables(state.variables.clone())
448+
}
449+
}
397450
try_set_txn(&ctx.query_id, &session, session_conf, &http_query_manager)?;
398451

399452
if let Some(secs) = session_conf.keep_server_session_secs {
@@ -577,7 +630,7 @@ impl HttpQuery {
577630
// - secondary_roles: updated by SET SECONDARY ROLES ALL|NONE;
578631
// - settings: updated by SET XXX = YYY;
579632
let executor = self.state.read().await;
580-
let session_state = executor.get_session_state();
633+
let mut session_state = executor.get_session_state();
581634

582635
let settings = session_state
583636
.settings
@@ -590,6 +643,13 @@ impl HttpQuery {
590643
let role = session_state.current_role.clone();
591644
let secondary_roles = session_state.secondary_roles.clone();
592645
let txn_state = session_state.txn_manager.lock().state();
646+
let internal = if !session_state.variables.is_empty() {
647+
Some(HttpSessionStateInternal {
648+
variables: std::mem::take(&mut session_state.variables),
649+
})
650+
} else {
651+
None
652+
};
593653
if txn_state != TxnState::AutoCommit
594654
&& !self.is_txn_mgr_saved.load(Ordering::Relaxed)
595655
&& matches!(executor.state, ExecuteState::Stopped(_))
@@ -615,6 +675,7 @@ impl HttpQuery {
615675
txn_state: Some(txn_state),
616676
last_server_info: Some(HttpQueryManager::instance().server_info.clone()),
617677
last_query_ids: vec![self.id.clone()],
678+
internal,
618679
}
619680
}
620681

src/query/service/src/sessions/session.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::net::SocketAddr;
1617
use std::sync::Arc;
1718

@@ -20,6 +21,7 @@ use databend_common_catalog::cluster_info::Cluster;
2021
use databend_common_config::GlobalConfig;
2122
use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
24+
use databend_common_expression::Scalar;
2325
use databend_common_io::prelude::FormatSettings;
2426
use databend_common_meta_app::principal::GrantObject;
2527
use databend_common_meta_app::principal::OwnershipObject;
@@ -356,6 +358,14 @@ impl Session {
356358
Some(x) => x.get_query_profiles(),
357359
}
358360
}
361+
362+
pub fn get_all_variables(&self) -> HashMap<String, Scalar> {
363+
self.session_ctx.get_all_variables()
364+
}
365+
366+
pub fn set_all_variables(&self, variables: HashMap<String, Scalar>) {
367+
self.session_ctx.set_all_variables(variables)
368+
}
359369
}
360370

361371
impl Drop for Session {

src/query/service/src/sessions/session_ctx.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,10 @@ impl SessionContext {
316316
pub fn get_variable(&self, key: &str) -> Option<Scalar> {
317317
self.variables.read().get(key).cloned()
318318
}
319+
pub fn get_all_variables(&self) -> HashMap<String, Scalar> {
320+
self.variables.read().clone()
321+
}
322+
pub fn set_all_variables(&self, variables: HashMap<String, Scalar>) {
323+
*self.variables.write() = variables
324+
}
319325
}

src/query/service/tests/it/servers/http/http_query_handlers.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1465,6 +1465,7 @@ async fn test_affect() -> Result<()> {
14651465
txn_state: Some(TxnState::AutoCommit),
14661466
last_server_info: None,
14671467
last_query_ids: vec![],
1468+
internal: None,
14681469
}),
14691470
),
14701471
(
@@ -1487,6 +1488,7 @@ async fn test_affect() -> Result<()> {
14871488
txn_state: Some(TxnState::AutoCommit),
14881489
last_server_info: None,
14891490
last_query_ids: vec![],
1491+
internal: None,
14901492
}),
14911493
),
14921494
(
@@ -1504,6 +1506,7 @@ async fn test_affect() -> Result<()> {
15041506
txn_state: Some(TxnState::AutoCommit),
15051507
last_server_info: None,
15061508
last_query_ids: vec![],
1509+
internal: None,
15071510
}),
15081511
),
15091512
(
@@ -1523,6 +1526,7 @@ async fn test_affect() -> Result<()> {
15231526
txn_state: Some(TxnState::AutoCommit),
15241527
last_server_info: None,
15251528
last_query_ids: vec![],
1529+
internal: None,
15261530
}),
15271531
),
15281532
(
@@ -1544,6 +1548,7 @@ async fn test_affect() -> Result<()> {
15441548
txn_state: Some(TxnState::AutoCommit),
15451549
last_server_info: None,
15461550
last_query_ids: vec![],
1551+
internal: None,
15471552
}),
15481553
),
15491554
];

tests/sqllogictests/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct HttpSessionConf {
4242
pub last_server_info: Option<ServerInfo>,
4343
#[serde(default)]
4444
pub last_query_ids: Vec<String>,
45+
pub internal: Option<String>,
4546
}
4647

4748
pub fn parser_rows(rows: &Value) -> Result<Vec<Vec<String>>> {

tests/sqllogictests/suites/query/set.test

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ select value, default = value from system.settings where name in ('max_threads'
77
4 0
88
56 0
99

10-
onlyif mysql
1110
statement ok
1211
set variable (a, b) = (select 3, 55)
1312

14-
onlyif mysql
13+
1514
statement ok
1615
SET GLOBAL (max_threads, storage_io_min_bytes_for_seek) = select $a + 1, $b + 1;
1716

@@ -30,25 +29,20 @@ select default = value from system.settings where name in ('max_threads', 'stor
3029
1
3130
1
3231

33-
onlyif mysql
3432
statement ok
3533
set variable a = 1;
3634

37-
onlyif mysql
3835
statement ok
3936
set variable (b, c) = ('yy', 'zz');
4037

41-
onlyif mysql
4238
query ITT
4339
select $a + getvariable('a') + $a, getvariable('b'), getvariable('c'), getvariable('d')
4440
----
4541
3 yy zz NULL
4642

47-
onlyif mysql
4843
statement ok
4944
unset variable (a, b)
5045

51-
onlyif mysql
5246
query ITT
5347
select getvariable('a'), getvariable('b'), 'xx' || 'yy' || getvariable('c') , getvariable('d')
5448
----

0 commit comments

Comments
 (0)