diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 490670bf66b..bf32782f770 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -243,6 +243,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d || (managed_storage->engineType() != ::TiDB::StorageEngine::DT && managed_storage->engineType() != ::TiDB::StorageEngine::TMT)) { + LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name); storage = storage_tmp; table_lock = storage->lockForShare(context.getCurrentQueryId()); return; diff --git a/dbms/src/Server/BgStorageInit.cpp b/dbms/src/Server/BgStorageInit.cpp index 6fbb2060348..5c53db7083e 100644 --- a/dbms/src/Server/BgStorageInit.cpp +++ b/dbms/src/Server/BgStorageInit.cpp @@ -53,7 +53,7 @@ void doInitStores(Context & global_context, const LoggerPtr & log) const auto & [ks_id, table_id] = ks_table_id; try { - init_cnt += storage->initStoreIfNeed(restore_segments_thread_pool) ? 1 : 0; + init_cnt += storage->initStoreIfDataDirExist(restore_segments_thread_pool) ? 1 : 0; LOG_INFO(log, "Storage inited done, keyspace={} table_id={}", ks_id, table_id); } catch (...) diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp index 72543d32c16..3a4a47975a7 100644 --- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp @@ -175,7 +175,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() RUNTIME_CHECK(dt_stream == nullptr); // The parent_path and file_id are generated by the storage. - auto [parent_path, file_id] = storage->getAndMaybeInitStore()->preAllocateIngestFile(); + auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile(); if (parent_path.empty()) { // Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown) @@ -223,7 +223,7 @@ bool SSTFilesToDTFilesOutputStream::finalizeDTFileStream() // If remote data store is not enabled, add the DTFile to StoragePathPool so that we can restore it later // Else just add it's size to disk delegator - storage->getAndMaybeInitStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); + storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); dt_stream.reset(); @@ -360,7 +360,7 @@ void SSTFilesToDTFilesOutputStream::cancel() try { // If DMFile has pre-ingested, remove it. - storage->getAndMaybeInitStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); + storage->getStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); // Remove local DMFile. file->remove(context.getFileProvider()); } diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp index d01090639f1..afeaf90a02f 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp @@ -205,23 +205,4 @@ LocalIndexInfosPtr generateLocalIndexInfos( return new_index_infos; } -bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger) -{ - if (!isVectorIndexSupported(logger)) - return false; - - for (const auto & col : table_info.columns) - { - if (col.vector_index) - { - return true; - } - } - for (const auto & idx : table_info.index_infos) - { - if (idx.vector_index) - return true; - } - return false; -} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h index 113cfc4d41b..34a5db7d1b6 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h @@ -61,6 +61,4 @@ LocalIndexInfosPtr generateLocalIndexInfos( const TiDB::TableInfo & new_table_info, const LoggerPtr & logger); - -bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger); } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 8860110ee1c..01d55d503b9 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -162,7 +162,7 @@ try EXPECT_EQ(col_value->getDataAt(i), String(DB::toString(num_rows_write))); } } - auto delta_store = storage->getAndMaybeInitStore(); + auto delta_store = storage->getStore(); size_t total_segment_rows = 0; auto segment_stats = delta_store->getSegmentsStats(); for (auto & stat : segment_stats) @@ -308,7 +308,7 @@ try auto sort_desc = storage->getPrimarySortDescription(); ASSERT_FALSE(storage->storeInited()); - const auto & store = storage->getAndMaybeInitStore(); + const auto & store = storage->getStore(); ASSERT_TRUE(storage->storeInited()); auto pk_type2 = store->getPKDataType(); auto sort_desc2 = store->getPrimarySortDescription(); @@ -826,11 +826,11 @@ try { write_data(num_rows_write, 1000); num_rows_write += 1000; - if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) + if (storage->getStore()->getSegmentsStats().size() > 1) break; } { - ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -847,13 +847,13 @@ try // write more data make sure segments more than 1 for (size_t i = 0; i < 100000; i++) { - if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) + if (storage->getStore()->getSegmentsStats().size() > 1) break; write_data(num_rows_write, 1000); num_rows_write += 1000; } { - ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -873,7 +873,7 @@ try // restore the table and make sure there is just one segment left create_table(); { - ASSERT_EQ(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); + ASSERT_EQ(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_LT(read_data(), num_rows_write); } storage->drop(); diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 343ecdee2ea..27d761f064c 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -90,8 +90,8 @@ class IManageableStorage : public IStorage /// `limit` is the max number of segments to gc, return value is the number of segments gced virtual UInt64 onSyncGc(Int64 /*limit*/, const DM::GCOptions &) { throw Exception("Unsupported"); } - /// Return true if the DeltaMergeStore instance need to be inited - virtual bool initStoreIfNeed(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } + /// Return true is data dir exist + virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } virtual TiDB::StorageEngine engineType() const = 0; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp index cb1ef198828..be220967915 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp @@ -80,8 +80,7 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore( if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context - = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -198,8 +197,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region, if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context - = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & segment_to_drop : segments) { DM::WriteBatches wbs(*dm_context->storage_pool, dm_context->getWriteLimiter()); diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 4508c7af3c2..7a70e673fad 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -1085,7 +1085,7 @@ try { if (ingest_using_split) { - auto stats = storage->getAndMaybeInitStore()->getStoreStats(); + auto stats = storage->getStore()->getStoreStats(); // Including 0..20, 20..100, 100..inf. ASSERT_EQ(3, stats.segment_count); } @@ -1128,7 +1128,7 @@ try } } { - auto stats = storage->getAndMaybeInitStore()->getStoreStats(); + auto stats = storage->getStore()->getStoreStats(); ASSERT_NE(0, stats.total_stable_size_on_disk); ASSERT_NE(0, stats.total_rows); ASSERT_NE(0, stats.total_size); @@ -1150,7 +1150,7 @@ try auto gc_n = storage->onSyncGc(100, DM::GCOptions::newAllForTest()); ASSERT_EQ(0, gc_n); - auto stats = storage->getAndMaybeInitStore()->getStoreStats(); + auto stats = storage->getStore()->getStoreStats(); // The data of [20, 100), is not reclaimed during Apply Snapshot. if (ingest_using_split) { diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 87626e2e72d..15e2373a34f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -246,8 +246,7 @@ void assertNoSegment( auto storage = storages.get(keyspace_id, table_id); RUNTIME_CHECK(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context - = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -486,7 +485,7 @@ try auto storage = global_context.getTMTContext().getStorages().get(keyspace_id, table_id); ASSERT_TRUE(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto store = dm_storage->getAndMaybeInitStore(); + auto store = dm_storage->getStore(); ASSERT_EQ(store->getRowKeyColumnSize(), 1); verifyRows( global_context, diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index af7afa18a09..9188e84f4a6 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -93,10 +93,11 @@ StorageDeltaMerge::StorageDeltaMerge( , global_context(global_context_.getGlobalContext()) , log(Logger::get(fmt::format("{}.{}", db_name_, table_name_))) { - RUNTIME_CHECK_MSG(!primary_expr_ast_->children.empty(), "No primary key, ident={}", log->identifier()); + if (primary_expr_ast_->children.empty()) + throw Exception("No primary key"); // save schema from TiDB - if (likely(table_info_)) + if (table_info_) { tidb_table_info = table_info_->get(); is_common_handle = tidb_table_info.is_common_handle; @@ -112,11 +113,6 @@ StorageDeltaMerge::StorageDeltaMerge( table_column_info = std::make_unique(db_name_, table_name_, primary_expr_ast_); updateTableColumnInfo(); - - if (table_info_ && containsLocalIndexInfos(table_info_.value(), log)) - { - getAndMaybeInitStore(); - } } void StorageDeltaMerge::updateTableColumnInfo() @@ -1398,34 +1394,16 @@ void StorageDeltaMerge::alterSchemaChange( LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize()); { - // In order to avoid concurrent issue between init store and DDL, - // we must acquire the lock before schema changes is applied. - std::lock_guard lock(store_mutex); + std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL. if (storeInited()) { _store->applySchemaChanges(table_info); } - else + else // it seems we will never come into this branch ? { - if (containsLocalIndexInfos(table_info, log)) - { - // If there exist vector index, then we must init the store to create - // at least 1 segment. So that tidb can detect the index is added. - initStore(lock); - _store->applySchemaChanges(table_info); - } - else - { - // If there is no data need to be stored for this table, the _store instance - // is not inited to reduce fragmentation files that may exhaust the inode of - // disk. - // Under this case, we update some in-memory variables to ensure the correctness. - updateTableColumnInfo(); - } + updateTableColumnInfo(); } } - - // Should generate new decoding snapshot and cache block decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); @@ -1824,9 +1802,14 @@ SortDescription StorageDeltaMerge::getPrimarySortDescription() const return desc; } -DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard &, ThreadPool * thread_pool) +DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) { - if (likely(_store == nullptr)) + if (storeInited()) + { + return _store; + } + std::lock_guard lock(store_mutex); + if (_store == nullptr) { auto index_infos = initLocalIndexInfos(tidb_table_info, log); _store = DeltaMergeStore::create( @@ -1851,20 +1834,8 @@ DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard &, ThreadPool * thread_pool = nullptr); + DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); bool storeInited() const { return store_inited.load(std::memory_order_acquire); } void updateTableColumnInfo(); ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info); diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index a547cb8e4cf..90675aa00c0 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -795,7 +795,7 @@ try DM::tests::DeltaMergeStoreVectorBase dmsv; StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - dmsv.store = storage->getAndMaybeInitStore(); + dmsv.store = storage->getStore(); dmsv.db_context = std::make_shared(global_ctx.getGlobalContext()); dmsv.vec_column_name = cols.getAllPhysical().back().name; dmsv.vec_column_id = mustGetSyncedTable(t1_id)->getTableInfo().getColumnID(dmsv.vec_column_name); @@ -817,6 +817,7 @@ try // sync table schema, the VectorIndex in TableInfo should be updated refreshTableSchema(t1_id); auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); idx_infos = tbl_info.index_infos; ASSERT_EQ(idx_infos.size(), 1); for (const auto & idx : idx_infos) @@ -898,95 +899,4 @@ try } CATCH - -TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1) -try -{ - auto pd_client = global_ctx.getTMTContext().getPDClient(); - - const String db_name = "mock_db"; - MockTiDB::instance().newDataBase(db_name); - - auto cols = ColumnsDescription({ - {"col1", typeFromString("Int64")}, - {"vec", typeFromString("Array(Float32)")}, - }); - auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); - refreshSchema(); - - // The `StorageDeltaMerge` is created but `DeltaMergeStore` is not inited - StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - ASSERT_EQ(nullptr,storage->getStoreIfInited()); - - // add a vector index - IndexID idx_id = 11; - auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ - .kind = tipb::VectorIndexKind::HNSW, - .dimension = 3, - .distance_metric = tipb::VectorDistanceMetric::L2, - }); - MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); - - // sync table schema, the VectorIndex in TableInfo should be updated - refreshTableSchema(t1_id); - // The `DeltaMergeStore` instanced should be inited - ASSERT_NE(nullptr,storage->getStoreIfInited()); - - auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); - auto idx_infos = tbl_info.index_infos; - ASSERT_EQ(idx_infos.size(), 1); - for (const auto & idx : idx_infos) - { - ASSERT_EQ(idx.id, idx_id); - ASSERT_NE(idx.vector_index, nullptr); - ASSERT_EQ(idx.vector_index->kind, vector_index->kind); - ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); - ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); - } - -} -CATCH - -TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2) -try -{ - auto pd_client = global_ctx.getTMTContext().getPDClient(); - - const String db_name = "mock_db"; - MockTiDB::instance().newDataBase(db_name); - - // The table is created and vector index is added. After that, the table info is synced to TiFlash - auto cols = ColumnsDescription({ - {"col1", typeFromString("Int64")}, - {"vec", typeFromString("Array(Float32)")}, - }); - auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); - IndexID idx_id = 11; - auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ - .kind = tipb::VectorIndexKind::HNSW, - .dimension = 3, - .distance_metric = tipb::VectorDistanceMetric::L2, - }); - MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); - - // Synced with mock tidb, and create the StorageDeltaMerge instance - refreshTableSchema(t1_id); - // The `DeltaMergeStore` instanced should be inited - StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - ASSERT_NE(nullptr, storage->getStoreIfInited()); - - auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); - auto idx_infos = tbl_info.index_infos; - ASSERT_EQ(idx_infos.size(), 1); - for (const auto & idx : idx_infos) - { - ASSERT_EQ(idx.id, idx_id); - ASSERT_NE(idx.vector_index, nullptr); - ASSERT_EQ(idx.vector_index->kind, vector_index->kind); - ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); - ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); - } -} -CATCH - } // namespace DB::tests