Skip to content

Commit

Permalink
clear temp table
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Dec 3, 2024
1 parent 0d34ed8 commit 59e593a
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 56 deletions.
4 changes: 4 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ pub trait TableContext: Send + Sync {
fn get_shared_settings(&self) -> Arc<Settings>;

fn get_runtime(&self) -> Result<Arc<Runtime>>;

fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);

async fn drop_m_cte_temp_table(&self) -> Result<()>;
}

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
Expand Down
9 changes: 7 additions & 2 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ pub trait Interpreter: Sync + Send {
#[fastrace::trace]
async fn execute(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
log_query_start(&ctx);
match self.execute_inner(ctx.clone()).await {
let res = self.execute_inner(ctx.clone()).await;
// Drop the temp tables that are generated by materialized cte.
ctx.drop_m_cte_temp_table().await?;
match res {
Ok(stream) => Ok(stream),
Err(err) => {
log_query_finished(&ctx, Some(err.clone()), false);
Expand Down Expand Up @@ -230,7 +233,9 @@ async fn plan_sql(
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let mut planner = Planner::new_with_query_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
Arc::new(ServiceQueryExecutor::new(QueryContext::create_from(
ctx.clone(),
))),
);

// Parse the SQL query, get extract additional information.
Expand Down
44 changes: 44 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::principal::COPY_MAX_FILES_COMMIT_MSG;
use databend_common_meta_app::principal::COPY_MAX_FILES_PER_COMMIT;
use databend_common_meta_app::schema::CatalogType;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_meta_app::schema::GetTableCopiedFileReq;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::storage::StorageParams;
Expand Down Expand Up @@ -107,9 +108,11 @@ use databend_common_storages_stage::StageTable;
use databend_common_storages_stream::stream_table::StreamTable;
use databend_common_users::GrantObjectVisibilityChecker;
use databend_common_users::UserApiProvider;
use databend_storages_common_session::drop_table_by_id;
use databend_storages_common_session::SessionState;
use databend_storages_common_session::TxnManagerRef;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
use jiff::tz::TimeZone;
use jiff::Zoned;
use log::debug;
Expand Down Expand Up @@ -149,6 +152,9 @@ pub struct QueryContext {
fragment_id: Arc<AtomicUsize>,
// Used by synchronized generate aggregating indexes when new data written.
inserted_segment_locs: Arc<RwLock<HashSet<Location>>>,
// Temp table for materialized CTE, first string is the database_name, second string is the table_name
// All temp tables' catalog is `CATALOG_DEFAULT`, so we don't need to store it.
m_cte_temp_table: Arc<RwLock<Vec<(String, String)>>>,
}

impl QueryContext {
Expand All @@ -174,6 +180,7 @@ impl QueryContext {
fragment_id: Arc::new(AtomicUsize::new(0)),
inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())),
block_threshold: Arc::new(RwLock::new(BlockThresholds::default())),
m_cte_temp_table: Arc::new(Default::default()),
})
}

Expand Down Expand Up @@ -1467,6 +1474,43 @@ impl TableContext for QueryContext {
fn get_runtime(&self) -> Result<Arc<Runtime>> {
self.shared.try_get_runtime()
}

fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str) {
self.m_cte_temp_table
.write()
.push((database_name.to_string(), table_name.to_string()));
}

async fn drop_m_cte_temp_table(&self) -> Result<()> {
let temp_tbl_mgr = self.shared.session.session_ctx.temp_tbl_mgr();
let m_cte_temp_table = self.m_cte_temp_table.read().clone();
let tenant = self.get_tenant();
for (db_name, table_name) in m_cte_temp_table.iter() {
let table = self.get_table(CATALOG_DEFAULT, db_name, table_name).await?;
let db = self
.get_catalog(CATALOG_DEFAULT)
.await?
.get_database(&tenant, db_name)
.await?;
let drop_table_req = DropTableByIdReq {
if_exists: true,
tenant: tenant.clone(),
tb_id: table.get_table_info().ident.table_id,
table_name: table_name.to_string(),
db_id: db.get_db_info().database_id.db_id,
engine: table.engine().to_string(),
session_id: table
.options()
.get(OPT_KEY_TEMP_PREFIX)
.cloned()
.unwrap_or_default(),
};
drop_table_by_id(temp_tbl_mgr.clone(), drop_table_req).await?;
}
let mut m_cte_temp_table = self.m_cte_temp_table.write();
m_cte_temp_table.clear();
Ok(())
}
}

impl TrySpawn for QueryContext {
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,14 @@ impl TableContext for CtxDelegation {
fn get_runtime(&self) -> Result<Arc<Runtime>> {
todo!()
}

fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) {
todo!()
}

async fn drop_m_cte_temp_table(&self) -> Result<()> {
todo!()
}
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,14 @@ impl TableContext for CtxDelegation {
fn get_runtime(&self) -> Result<Arc<Runtime>> {
todo!()
}

fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) {
todo!()
}

async fn drop_m_cte_temp_table(&self) -> Result<()> {
todo!()
}
}

#[derive(Clone, Debug)]
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ pub struct CteInfo {
pub cte_idx: IndexType,
// If cte is materialized, save its columns
pub columns: Vec<ColumnBinding>,
pub m_cte_name_to_temp_table: HashMap<String, String>,
}

impl BindContext {
Expand Down
41 changes: 20 additions & 21 deletions src/query/sql/src/planner/binder/bind_query/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use databend_common_ast::ast::CreateOption;
Expand All @@ -26,6 +25,7 @@ use databend_common_ast::ast::TableType;
use databend_common_ast::ast::With;
use databend_common_ast::ast::CTE;
use databend_common_ast::Span;
use databend_common_catalog::catalog::CATALOG_DEFAULT;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

Expand Down Expand Up @@ -89,21 +89,17 @@ impl Binder {
.iter()
.map(|ident| self.normalize_identifier(ident).name)
.collect();
let mut cte_info = CteInfo {
let cte_info = CteInfo {
columns_alias: column_name,
query: *cte.query.clone(),
recursive: with.recursive,
cte_idx: idx,
columns: vec![],
materialized: cte.materialized,
m_cte_name_to_temp_table: HashMap::new(),
};
// If the CTE is materialized, we'll construct a temp table for it.
if cte.materialized {
let temp_table_name = self.m_cte_to_temp_table(cte)?;
cte_info
.m_cte_name_to_temp_table
.insert(cte.alias.name.name.clone(), temp_table_name);
self.m_cte_to_temp_table(cte)?;
}
bind_context
.cte_context
Expand Down Expand Up @@ -183,27 +179,27 @@ impl Binder {
}

// The return value is temp_table name`
fn m_cte_to_temp_table(&self, cte: &CTE) -> Result<String> {
fn m_cte_to_temp_table(&self, cte: &CTE) -> Result<()> {
let engine = if self.ctx.get_settings().get_persist_materialized_cte()? {
Engine::Fuse
} else {
Engine::Memory
};
let database = self.ctx.get_current_database();
let catalog = self.ctx.get_current_catalog();
let query_id = self.ctx.get_id();
// Navigate the temp table for cte to `cte_name + query_id`
// Avoid the conflict of the temp table name with the same name of user's temp table.
let table_name = format!(
"{}_{}",
cte.alias.name.name,
query_id.split('-').next().unwrap_or(&query_id)
);
if self
.ctx
.is_temp_table(CATALOG_DEFAULT, &database, &cte.alias.name.name)
{
return Err(ErrorCode::Internal(format!(
"Temporary table {:?} already exists in current session, please change the materialized CTE name",
cte.alias.name.name
)));
}
let create_table_stmt = CreateTableStmt {
create_option: CreateOption::CreateOrReplace,
catalog: Some(Identifier::from_name(Span::None, catalog.clone())),
catalog: Some(Identifier::from_name(Span::None, CATALOG_DEFAULT)),
database: Some(Identifier::from_name(Span::None, database.clone())),
table: Identifier::from_name(cte.alias.name.span, table_name.clone()),
table: cte.alias.name.clone(),
source: None,
engine: Some(engine),
uri_location: None,
Expand All @@ -225,7 +221,10 @@ impl Binder {
};

self.ctx
.remove_table_from_cache(&catalog, &database, &cte.alias.name.name);
Ok(table_name)
.add_m_cte_temp_table(&database, &cte.alias.name.name);

self.ctx
.remove_table_from_cache(CATALOG_DEFAULT, &database, &cte.alias.name.name);
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,31 +97,14 @@ impl Binder {
};
}

let mut temp_table_name = None;
if let Some(cte_info) = cte_map.get(&table_name)
&& cte_info.materialized
{
temp_table_name = Some(
cte_info
.m_cte_name_to_temp_table
.get(&table_name)
.unwrap()
.to_string(),
);
}

let navigation = self.resolve_temporal_clause(bind_context, temporal)?;

// Resolve table with catalog
let table_meta = {
match self.resolve_data_source(
catalog.as_str(),
database.as_str(),
if let Some(temp_table_name) = temp_table_name.as_ref() {
temp_table_name.as_str()
} else {
table_name.as_str()
},
table_name.as_str(),
navigation.as_ref(),
max_batch_size,
self.ctx.clone().get_abort_checker(),
Expand Down Expand Up @@ -167,7 +150,6 @@ impl Binder {
database.clone(),
table_meta,
table_name_alias,
None,
bind_context.view_info.is_some(),
bind_context.planning_agg_index,
false,
Expand Down Expand Up @@ -246,7 +228,6 @@ impl Binder {
database.clone(),
table_meta,
table_name_alias,
None,
false,
false,
false,
Expand Down Expand Up @@ -278,7 +259,6 @@ impl Binder {
catalog,
database.clone(),
table_meta,
Some(table_name.clone()),
table_name_alias,
bind_context.view_info.is_some(),
bind_context.planning_agg_index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ impl Binder {
"system".to_string(),
table.clone(),
table_alias_name,
None,
false,
false,
false,
Expand Down Expand Up @@ -208,7 +207,6 @@ impl Binder {
"system".to_string(),
table.clone(),
table_alias_name,
None,
false,
false,
false,
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ impl Binder {
"system".to_string(),
table.clone(),
table_alias_name,
None,
false,
false,
true,
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ impl Dataframe {
database.to_string(),
table_meta,
None,
None,
false,
false,
false,
Expand Down
1 change: 0 additions & 1 deletion src/query/sql/src/planner/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ pub fn bind_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRe
"default".to_string(),
table_meta,
None,
None,
false,
false,
false,
Expand Down
7 changes: 1 addition & 6 deletions src/query/sql/src/planner/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,13 @@ impl Metadata {
catalog: String,
database: String,
table_meta: Arc<dyn Table>,
table_name: Option<String>,
table_alias_name: Option<String>,
source_of_view: bool,
source_of_index: bool,
source_of_stage: bool,
consume: bool,
) -> IndexType {
let table_name = if let Some(table_name) = table_name {
table_name
} else {
table_meta.name().to_string()
};
let table_name = table_meta.name().to_string();

let table_index = self.tables.len();
// If exists table alias name, use it instead of origin name
Expand Down

0 comments on commit 59e593a

Please sign in to comment.