Skip to content

Commit

Permalink
chore: support temporary table with memory engine (#16332)
Browse files Browse the repository at this point in the history
* chore: support temporary table with memory engine

* fix fmt

* fix

* aovid holding two locks simultaneously
  • Loading branch information
SkyFan2002 authored Aug 27, 2024
1 parent 5222abb commit 41cf22e
Show file tree
Hide file tree
Showing 19 changed files with 424 additions and 40 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dist/
test/
tests/longrun/_*.csv
tests/longrun/**/_*.csv
tests/sqllogictests/data/*

# fuzz
hfuzz_target/
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: table_name.to_string(),
tb_id: table_id,
engine: "FUSE".to_string(),
})
.await?;

Expand Down Expand Up @@ -1844,6 +1845,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
};
mt.drop_table_by_id(plan.clone()).await?;

Expand Down Expand Up @@ -1874,6 +1876,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
};
let res = mt.drop_table_by_id(plan).await;
let err = res.unwrap_err();
Expand All @@ -1893,6 +1896,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
};
mt.drop_table_by_id(plan.clone()).await?;
}
Expand Down Expand Up @@ -4147,6 +4151,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
})
.await?;
}
Expand All @@ -4172,6 +4177,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
})
.await?;
let table_id = resp.table_id;
Expand Down Expand Up @@ -4246,6 +4252,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
})
.await?;
}
Expand All @@ -4272,6 +4279,7 @@ impl SchemaApiTestSuite {
db_id: *db_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
})
.await?;
let table_id = resp.table_id;
Expand Down Expand Up @@ -4446,6 +4454,7 @@ impl SchemaApiTestSuite {
db_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
})
.await?;
}
Expand Down Expand Up @@ -4686,6 +4695,7 @@ impl SchemaApiTestSuite {
db_id: old_db.database_id.db_id,
table_name: tbl_name_ident.table_name.clone(),
tb_id,
engine: "FUSE".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4737,6 +4747,7 @@ impl SchemaApiTestSuite {
db_id: old_db.database_id.db_id,
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4794,6 +4805,7 @@ impl SchemaApiTestSuite {
db_id: old_db.database_id.db_id,
table_name: tbl_name.to_string(),
tb_id: tb_info.ident.table_id,
engine: "FUSE".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4895,6 +4907,7 @@ impl SchemaApiTestSuite {
db_id: cur_db.database_id.db_id,
table_name: tbl_name.to_string(),
tb_id: new_tb_info.ident.table_id,
engine: "FUSE".to_string(),
};

let old_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -7658,6 +7671,7 @@ where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError>
if_exists: false,
db_id: self.db_id,
tb_id: self.table_id,
engine: "FUSE".to_string(),
};
self.mt.drop_table_by_id(req.clone()).await?;

Expand Down
2 changes: 2 additions & 0 deletions src/meta/api/src/share_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ impl ShareApiTestSuite {
table_name: table_name.to_string(),
tb_id: table_id,
db_id,
engine: "FUSE".to_string(),
};
let res = mt.drop_table_by_id(plan).await?;
let (share_db_id, share_specs) = res.spec_vec.unwrap();
Expand Down Expand Up @@ -2466,6 +2467,7 @@ impl ShareApiTestSuite {
table_name: tbl_name.to_string(),
tb_id: table_id,
db_id,
engine: "FUSE".to_string(),
};
let _res = mt.drop_table_by_id(plan).await;

Expand Down
2 changes: 2 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ pub struct DropTableByIdReq {
pub table_name: String,

pub db_id: MetaId,

pub engine: String,
}

impl DropTableByIdReq {
Expand Down
1 change: 1 addition & 0 deletions src/meta/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ async fn benchmark_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u6
db_id,
table_name: table_name(),
tb_id: t.ident.table_id,
engine: "FUSE".to_string(),
})
.await;

Expand Down
1 change: 1 addition & 0 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl StreamHandler for RealStreamHandler {
table_name: stream_name.clone(),
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
engine: engine.to_string(),
})
.await
} else if plan.if_exists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl Interpreter for DropTableInterpreter {
table_name: tbl_name.to_string(),
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
engine: tbl.engine().to_string(),
})
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl Interpreter for DropViewInterpreter {
table_name: self.plan.view_name.clone(),
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
engine: table.engine().to_string(),
})
.await?;
};
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/catalogs/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ async fn test_catalogs_table() -> Result<()> {
table_name: "test_table".to_string(),
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
engine: tbl.engine().to_string(),
})
.await;
assert!(res.is_ok());
Expand Down
13 changes: 11 additions & 2 deletions src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,9 @@ impl Binder {
let _ = options.insert("TRANSIENT".to_owned(), "T".to_owned());
}
TableType::Temporary => {
if engine != Engine::Fuse {
if engine != Engine::Fuse && engine != Engine::Memory {
return Err(ErrorCode::BadArguments(
"Temporary table is only supported for FUSE engine",
"Temporary table is only supported for FUSE and MEMORY engine",
));
}
let _ = options.insert(OPT_KEY_TEMP_PREFIX.to_string(), self.ctx.get_session_id());
Expand Down Expand Up @@ -592,6 +592,15 @@ impl Binder {
}
};

if engine == Engine::Memory {
let catalog = self.ctx.get_catalog(&catalog).await?;
let db = catalog
.get_database(&self.ctx.get_tenant(), &database)
.await?;
let db_id = db.get_db_info().database_id.db_id;
options.insert(OPT_KEY_DATABASE_ID.to_owned(), db_id.to_string());
}

if engine == Engine::Fuse {
// Currently, [Table] can not accesses its database id yet, thus
// here we keep the db id AS an entry of `table_meta.options`.
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/common/blocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ test = true
databend-common-exception = { workspace = true }
databend-common-expression = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }

[build-dependencies]
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/common/blocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@

mod parquet_rs;
pub use parquet_rs::blocks_to_parquet;
pub mod memory;
27 changes: 27 additions & 0 deletions src/query/storages/common/blocks/src/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::LazyLock;

use databend_common_expression::DataBlock;
use parking_lot::RwLock;
/// Shared store to support memory tables.
///
/// Indexed by table id etc.
pub type InMemoryData<K> = HashMap<K, Arc<RwLock<Vec<DataBlock>>>>;

pub static IN_MEMORY_DATA: LazyLock<Arc<RwLock<InMemoryData<u64>>>> =
LazyLock::new(|| Arc::new(Default::default()));
1 change: 1 addition & 0 deletions src/query/storages/common/session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ databend-common-exception = { workspace = true }
databend-common-meta-app = { workspace = true }
databend-common-meta-types = { workspace = true }
databend-common-storage = { workspace = true }
databend-storages-common-blocks = { workspace = true }
databend-storages-common-table-meta = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
Expand Down
Loading

0 comments on commit 41cf22e

Please sign in to comment.