Skip to content

Commit

Permalink
feat: impl create dynamic table sql parser and planner (#15250)
Browse files Browse the repository at this point in the history
* create dynamic table

* remove async_backtrace::framed in optimizer

---------

Co-authored-by: Bohu <[email protected]>
  • Loading branch information
zhyass and BohuTANG authored Apr 20, 2024
1 parent 1544347 commit b1b622d
Show file tree
Hide file tree
Showing 30 changed files with 1,396 additions and 33 deletions.
3 changes: 3 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ build_exceptions! {
IllegalStream(2733),
StreamVersionMismatched(2734),

// dynamic error codes.
IllegalDynamicTable(2740),

// Variable error codes.
UnknownVariable(2801),
OnlySupportAsciiChars(2802),
Expand Down
162 changes: 162 additions & 0 deletions src/query/ast/src/ast/statements/dynamic_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

use databend_common_meta_app::schema::CreateOption;
use derive_visitor::Drive;
use derive_visitor::DriveMut;

use crate::ast::write_comma_separated_list;
use crate::ast::write_dot_separated_list;
use crate::ast::write_space_separated_string_map;
use crate::ast::CreateTableSource;
use crate::ast::Expr;
use crate::ast::Identifier;
use crate::ast::Query;
use crate::ast::WarehouseOptions;

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum TargetLag {
IntervalSecs(#[drive(skip)] u64),
Downstream,
}

impl Display for TargetLag {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
TargetLag::IntervalSecs(secs) => {
write!(f, "{} SECOND", secs)
}
TargetLag::Downstream => {
write!(f, "DOWNSTREAM")
}
}
}
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum RefreshMode {
Auto,
Full,
Incremental,
}

impl Display for RefreshMode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
RefreshMode::Auto => {
write!(f, "AUTO")
}
RefreshMode::Full => {
write!(f, "FULL")
}
RefreshMode::Incremental => {
write!(f, "INCREMENTAL")
}
}
}
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum InitializeMode {
OnCreate,
OnSchedule,
}

impl Display for InitializeMode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
InitializeMode::OnCreate => {
write!(f, "ON_CREATE")
}
InitializeMode::OnSchedule => {
write!(f, "ON_SCHEDULE")
}
}
}
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CreateDynamicTableStmt {
#[drive(skip)]
pub create_option: CreateOption,
#[drive(skip)]
pub transient: bool,
pub catalog: Option<Identifier>,
pub database: Option<Identifier>,
pub table: Identifier,
pub source: Option<CreateTableSource>,
pub cluster_by: Vec<Expr>,

pub target_lag: TargetLag,
pub warehouse_opts: WarehouseOptions,
pub refresh_mode: RefreshMode,
pub initialize: InitializeMode,

#[drive(skip)]
pub table_options: BTreeMap<String, String>,
pub as_query: Box<Query>,
}

impl Display for CreateDynamicTableStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "CREATE ")?;
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, "OR REPLACE ")?;
}
if self.transient {
write!(f, "TRANSIENT ")?;
}
write!(f, "DYNAMIC TABLE ")?;
if let CreateOption::CreateIfNotExists = self.create_option {
write!(f, "IF NOT EXISTS ")?;
}
write_dot_separated_list(
f,
self.catalog
.iter()
.chain(&self.database)
.chain(Some(&self.table)),
)?;

if let Some(source) = &self.source {
write!(f, " {source}")?;
}

if !self.cluster_by.is_empty() {
write!(f, " CLUSTER BY (")?;
write_comma_separated_list(f, &self.cluster_by)?;
write!(f, ")")?
}

write!(f, " TARGET_LAG = {}", self.target_lag)?;
if self.warehouse_opts.warehouse.is_some() {
write!(f, " {}", self.warehouse_opts)?;
}
write!(f, " REFRESH_MODE = {}", self.refresh_mode)?;
write!(f, " INITIALIZE = {}", self.initialize)?;

// Format table options
if !self.table_options.is_empty() {
write!(f, " ")?;
write_space_separated_string_map(f, &self.table_options)?;
}

write!(f, " AS {}", self.as_query)?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod copy;
mod data_mask;
mod database;
mod delete;
mod dynamic_table;
mod explain;
mod hint;
mod index;
Expand Down Expand Up @@ -59,6 +60,7 @@ pub use copy::*;
pub use data_mask::*;
pub use database::*;
pub use delete::*;
pub use dynamic_table::*;
pub use explain::*;
pub use hint::*;
pub use index::*;
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ pub enum Statement {
DropTask(DropTaskStmt),
ShowTasks(ShowTasksStmt),

CreateDynamicTable(CreateDynamicTableStmt),

// pipes
CreatePipe(CreatePipeStmt),
DescribePipe(DescribePipeStmt),
Expand Down Expand Up @@ -726,6 +728,7 @@ impl Display for Statement {
Statement::ExecuteImmediate(stmt) => write!(f, "{stmt}")?,
Statement::CreateSequence(stmt) => write!(f, "{stmt}")?,
Statement::DropSequence(stmt) => write!(f, "{stmt}")?,
Statement::CreateDynamicTable(stmt) => write!(f, "{stmt}")?,
}
Ok(())
}
Expand Down
9 changes: 7 additions & 2 deletions src/query/ast/src/ast/statements/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl Display for CreateTaskStmt {

write!(f, " {}", self.name)?;

write!(f, " {}", self.warehouse_opts)?;
if self.warehouse_opts.warehouse.is_some() {
write!(f, " {}", self.warehouse_opts)?;
}

if let Some(schedule_opt) = self.schedule_opts.as_ref() {
write!(f, " SCHEDULE = {}", schedule_opt)?;
Expand All @@ -106,7 +108,10 @@ impl Display for CreateTaskStmt {
write!(f, " COMMENTS = '{}'", comments)?;
}

write_comma_separated_string_map(f, &self.session_parameters)?;
if !self.session_parameters.is_empty() {
write!(f, " ")?;
write_comma_separated_string_map(f, &self.session_parameters)?;
}

write!(f, " AS {}", self.sql)?;

Expand Down
4 changes: 4 additions & 0 deletions src/query/ast/src/ast/visitors/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,10 @@ pub trait Visitor<'ast>: Sized {

fn visit_alter_task(&mut self, _stmt: &'ast AlterTaskStmt) {}

fn visit_create_dynamic_table(&mut self, stmt: &'ast CreateDynamicTableStmt) {
self.visit_query(stmt.as_query.as_ref())
}

fn visit_create_notification(&mut self, _stmt: &'ast CreateNotificationStmt) {}
fn visit_drop_notification(&mut self, _stmt: &'ast DropNotificationStmt) {}
fn visit_describe_notification(&mut self, _stmt: &'ast DescribeNotificationStmt) {}
Expand Down
4 changes: 4 additions & 0 deletions src/query/ast/src/ast/visitors/visitor_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ pub trait VisitorMut: Sized {

fn visit_alter_task(&mut self, _stmt: &mut AlterTaskStmt) {}

fn visit_create_dynamic_table(&mut self, stmt: &mut CreateDynamicTableStmt) {
self.visit_query(&mut stmt.as_query)
}

// notification
fn visit_create_notification(&mut self, _stmt: &mut CreateNotificationStmt) {}
fn visit_drop_notification(&mut self, _stmt: &mut DropNotificationStmt) {}
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/ast/visitors/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,5 +588,6 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
Statement::ExecuteImmediate(_) => {}
Statement::CreateSequence(stmt) => visitor.visit_create_sequence(stmt),
Statement::DropSequence(stmt) => visitor.visit_drop_sequence(stmt),
Statement::CreateDynamicTable(stmt) => visitor.visit_create_dynamic_table(stmt),
}
}
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/visitors/walk_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
Statement::ShowTasks(stmt) => visitor.visit_show_tasks(stmt),
Statement::DescribeTask(stmt) => visitor.visit_describe_task(stmt),

Statement::CreateDynamicTable(stmt) => visitor.visit_create_dynamic_table(stmt),

Statement::CreateConnection(stmt) => visitor.visit_create_connection(stmt),
Statement::DropConnection(stmt) => visitor.visit_drop_connection(stmt),
Statement::DescribeConnection(stmt) => visitor.visit_describe_connection(stmt),
Expand Down
Loading

0 comments on commit b1b622d

Please sign in to comment.