From 682892adc09727fc48797aeacc3c40631622b397 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 25 Sep 2024 16:55:09 +0800 Subject: [PATCH] Remove useless func --- .../Interpreters/InterpreterSelectQuery.cpp | 1 - dbms/src/Server/BgStorageInit.cpp | 2 +- .../Decode/SSTFilesToDTFilesOutputStream.cpp | 6 +- .../Storages/DeltaMerge/Index/IndexInfo.cpp | 19 ++++ .../src/Storages/DeltaMerge/Index/IndexInfo.h | 2 + .../tests/gtest_dm_storage_delta_merge.cpp | 14 +-- dbms/src/Storages/IManageableStorage.h | 4 +- .../MultiRaft/Disagg/CheckpointIngestInfo.cpp | 6 +- .../Storages/KVStore/tests/gtest_kvstore.cpp | 6 +- .../tests/gtest_kvstore_fast_add_peer.cpp | 5 +- dbms/src/Storages/StorageDeltaMerge.cpp | 69 ++++++++++---- dbms/src/Storages/StorageDeltaMerge.h | 15 ++- .../TiDB/Schema/tests/gtest_schema_sync.cpp | 94 ++++++++++++++++++- 13 files changed, 197 insertions(+), 46 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index bf32782f770..490670bf66b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -243,7 +243,6 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d || (managed_storage->engineType() != ::TiDB::StorageEngine::DT && managed_storage->engineType() != ::TiDB::StorageEngine::TMT)) { - LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name); storage = storage_tmp; table_lock = storage->lockForShare(context.getCurrentQueryId()); return; diff --git a/dbms/src/Server/BgStorageInit.cpp b/dbms/src/Server/BgStorageInit.cpp index 5c53db7083e..6fbb2060348 100644 --- a/dbms/src/Server/BgStorageInit.cpp +++ b/dbms/src/Server/BgStorageInit.cpp @@ -53,7 +53,7 @@ void doInitStores(Context & global_context, const LoggerPtr & log) const auto & [ks_id, table_id] = ks_table_id; try { - init_cnt += storage->initStoreIfDataDirExist(restore_segments_thread_pool) ? 1 : 0; + init_cnt += storage->initStoreIfNeed(restore_segments_thread_pool) ? 1 : 0; LOG_INFO(log, "Storage inited done, keyspace={} table_id={}", ks_id, table_id); } catch (...) diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp index 3a4a47975a7..72543d32c16 100644 --- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp @@ -175,7 +175,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() RUNTIME_CHECK(dt_stream == nullptr); // The parent_path and file_id are generated by the storage. - auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile(); + auto [parent_path, file_id] = storage->getAndMaybeInitStore()->preAllocateIngestFile(); if (parent_path.empty()) { // Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown) @@ -223,7 +223,7 @@ bool SSTFilesToDTFilesOutputStream::finalizeDTFileStream() // If remote data store is not enabled, add the DTFile to StoragePathPool so that we can restore it later // Else just add it's size to disk delegator - storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); + storage->getAndMaybeInitStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); dt_stream.reset(); @@ -360,7 +360,7 @@ void SSTFilesToDTFilesOutputStream::cancel() try { // If DMFile has pre-ingested, remove it. - storage->getStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); + storage->getAndMaybeInitStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); // Remove local DMFile. file->remove(context.getFileProvider()); } diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp index afeaf90a02f..d01090639f1 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp @@ -205,4 +205,23 @@ LocalIndexInfosPtr generateLocalIndexInfos( return new_index_infos; } +bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger) +{ + if (!isVectorIndexSupported(logger)) + return false; + + for (const auto & col : table_info.columns) + { + if (col.vector_index) + { + return true; + } + } + for (const auto & idx : table_info.index_infos) + { + if (idx.vector_index) + return true; + } + return false; +} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h index 34a5db7d1b6..113cfc4d41b 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h @@ -61,4 +61,6 @@ LocalIndexInfosPtr generateLocalIndexInfos( const TiDB::TableInfo & new_table_info, const LoggerPtr & logger); + +bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger); } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 01d55d503b9..8860110ee1c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -162,7 +162,7 @@ try EXPECT_EQ(col_value->getDataAt(i), String(DB::toString(num_rows_write))); } } - auto delta_store = storage->getStore(); + auto delta_store = storage->getAndMaybeInitStore(); size_t total_segment_rows = 0; auto segment_stats = delta_store->getSegmentsStats(); for (auto & stat : segment_stats) @@ -308,7 +308,7 @@ try auto sort_desc = storage->getPrimarySortDescription(); ASSERT_FALSE(storage->storeInited()); - const auto & store = storage->getStore(); + const auto & store = storage->getAndMaybeInitStore(); ASSERT_TRUE(storage->storeInited()); auto pk_type2 = store->getPKDataType(); auto sort_desc2 = store->getPrimarySortDescription(); @@ -826,11 +826,11 @@ try { write_data(num_rows_write, 1000); num_rows_write += 1000; - if (storage->getStore()->getSegmentsStats().size() > 1) + if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) break; } { - ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -847,13 +847,13 @@ try // write more data make sure segments more than 1 for (size_t i = 0; i < 100000; i++) { - if (storage->getStore()->getSegmentsStats().size() > 1) + if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) break; write_data(num_rows_write, 1000); num_rows_write += 1000; } { - ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -873,7 +873,7 @@ try // restore the table and make sure there is just one segment left create_table(); { - ASSERT_EQ(storage->getStore()->getSegmentsStats().size(), 1); + ASSERT_EQ(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); ASSERT_LT(read_data(), num_rows_write); } storage->drop(); diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 27d761f064c..343ecdee2ea 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -90,8 +90,8 @@ class IManageableStorage : public IStorage /// `limit` is the max number of segments to gc, return value is the number of segments gced virtual UInt64 onSyncGc(Int64 /*limit*/, const DM::GCOptions &) { throw Exception("Unsupported"); } - /// Return true is data dir exist - virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } + /// Return true if the DeltaMergeStore instance need to be inited + virtual bool initStoreIfNeed(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } virtual TiDB::StorageEngine engineType() const = 0; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp index be220967915..cb1ef198828 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp @@ -80,7 +80,8 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore( if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context + = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -197,7 +198,8 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region, if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context + = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & segment_to_drop : segments) { DM::WriteBatches wbs(*dm_context->storage_pool, dm_context->getWriteLimiter()); diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 7a70e673fad..4508c7af3c2 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -1085,7 +1085,7 @@ try { if (ingest_using_split) { - auto stats = storage->getStore()->getStoreStats(); + auto stats = storage->getAndMaybeInitStore()->getStoreStats(); // Including 0..20, 20..100, 100..inf. ASSERT_EQ(3, stats.segment_count); } @@ -1128,7 +1128,7 @@ try } } { - auto stats = storage->getStore()->getStoreStats(); + auto stats = storage->getAndMaybeInitStore()->getStoreStats(); ASSERT_NE(0, stats.total_stable_size_on_disk); ASSERT_NE(0, stats.total_rows); ASSERT_NE(0, stats.total_size); @@ -1150,7 +1150,7 @@ try auto gc_n = storage->onSyncGc(100, DM::GCOptions::newAllForTest()); ASSERT_EQ(0, gc_n); - auto stats = storage->getStore()->getStoreStats(); + auto stats = storage->getAndMaybeInitStore()->getStoreStats(); // The data of [20, 100), is not reclaimed during Apply Snapshot. if (ingest_using_split) { diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 15e2373a34f..87626e2e72d 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -246,7 +246,8 @@ void assertNoSegment( auto storage = storages.get(keyspace_id, table_id); RUNTIME_CHECK(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context + = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -485,7 +486,7 @@ try auto storage = global_context.getTMTContext().getStorages().get(keyspace_id, table_id); ASSERT_TRUE(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto store = dm_storage->getStore(); + auto store = dm_storage->getAndMaybeInitStore(); ASSERT_EQ(store->getRowKeyColumnSize(), 1); verifyRows( global_context, diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 9188e84f4a6..af7afa18a09 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -93,11 +93,10 @@ StorageDeltaMerge::StorageDeltaMerge( , global_context(global_context_.getGlobalContext()) , log(Logger::get(fmt::format("{}.{}", db_name_, table_name_))) { - if (primary_expr_ast_->children.empty()) - throw Exception("No primary key"); + RUNTIME_CHECK_MSG(!primary_expr_ast_->children.empty(), "No primary key, ident={}", log->identifier()); // save schema from TiDB - if (table_info_) + if (likely(table_info_)) { tidb_table_info = table_info_->get(); is_common_handle = tidb_table_info.is_common_handle; @@ -113,6 +112,11 @@ StorageDeltaMerge::StorageDeltaMerge( table_column_info = std::make_unique(db_name_, table_name_, primary_expr_ast_); updateTableColumnInfo(); + + if (table_info_ && containsLocalIndexInfos(table_info_.value(), log)) + { + getAndMaybeInitStore(); + } } void StorageDeltaMerge::updateTableColumnInfo() @@ -1394,16 +1398,34 @@ void StorageDeltaMerge::alterSchemaChange( LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize()); { - std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL. + // In order to avoid concurrent issue between init store and DDL, + // we must acquire the lock before schema changes is applied. + std::lock_guard lock(store_mutex); if (storeInited()) { _store->applySchemaChanges(table_info); } - else // it seems we will never come into this branch ? + else { - updateTableColumnInfo(); + if (containsLocalIndexInfos(table_info, log)) + { + // If there exist vector index, then we must init the store to create + // at least 1 segment. So that tidb can detect the index is added. + initStore(lock); + _store->applySchemaChanges(table_info); + } + else + { + // If there is no data need to be stored for this table, the _store instance + // is not inited to reduce fragmentation files that may exhaust the inode of + // disk. + // Under this case, we update some in-memory variables to ensure the correctness. + updateTableColumnInfo(); + } } } + + // Should generate new decoding snapshot and cache block decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); @@ -1802,14 +1824,9 @@ SortDescription StorageDeltaMerge::getPrimarySortDescription() const return desc; } -DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) +DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard &, ThreadPool * thread_pool) { - if (storeInited()) - { - return _store; - } - std::lock_guard lock(store_mutex); - if (_store == nullptr) + if (likely(_store == nullptr)) { auto index_infos = initLocalIndexInfos(tidb_table_info, log); _store = DeltaMergeStore::create( @@ -1834,8 +1851,20 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread return _store; } -bool StorageDeltaMerge::initStoreIfDataDirExist(ThreadPool * thread_pool) +DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) +{ + if (storeInited()) + { + return _store; + } + std::lock_guard lock(store_mutex); + return initStore(lock, thread_pool); +} + +bool StorageDeltaMerge::initStoreIfNeed(ThreadPool * thread_pool) { + // If the table is tombstone, then wait for it exceeds the gc_safepoint and + // get physically drop without initing the instance. if (shutdown_called.load(std::memory_order_relaxed) || isTombstone()) { return false; @@ -1845,12 +1874,16 @@ bool StorageDeltaMerge::initStoreIfDataDirExist(ThreadPool * thread_pool) { return true; } - if (!dataDirExist()) + if (dataDirExist() || containsLocalIndexInfos(tidb_table_info, log)) { - return false; + // - there exist some data stored on disk + // - there exist tiflash local index + // We need to init the DeltaMergeStore instance for reporting the disk usage, local index + // status, etc. + getAndMaybeInitStore(thread_pool); + return true; } - getAndMaybeInitStore(thread_pool); - return true; + return false; } bool StorageDeltaMerge::dataDirExist() diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index a66b9de2572..a2a33e60943 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -191,10 +191,17 @@ class StorageDeltaMerge void checkStatus(const Context & context) override; void deleteRows(const Context &, size_t rows) override; - const DM::DeltaMergeStorePtr & getStore() { return getAndMaybeInitStore(); } - + // Return the DeltaMergeStore instance if it has been inited + // Else return nullptr DM::DeltaMergeStorePtr getStoreIfInited() const; + // Return the DeltaMergeStore instance + // If the instance is not inited, this method will initialize the instance + // and return it. + DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); + + bool initStoreIfNeed(ThreadPool * thread_pool) override; + bool isCommonHandle() const override { return is_common_handle; } size_t getRowKeyColumnSize() const override { return rowkey_column_size; } @@ -206,8 +213,6 @@ class StorageDeltaMerge void releaseDecodingBlock(Int64 block_decoding_schema_epoch, BlockUPtr block) override; - bool initStoreIfDataDirExist(ThreadPool * thread_pool) override; - DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); } #ifndef DBMS_PUBLIC_GTEST @@ -238,7 +243,7 @@ class StorageDeltaMerge DataTypePtr getPKTypeImpl() const override; - DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); + DM::DeltaMergeStorePtr & initStore(const std::lock_guard &, ThreadPool * thread_pool = nullptr); bool storeInited() const { return store_inited.load(std::memory_order_acquire); } void updateTableColumnInfo(); ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info); diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index 90675aa00c0..a547cb8e4cf 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -795,7 +795,7 @@ try DM::tests::DeltaMergeStoreVectorBase dmsv; StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - dmsv.store = storage->getStore(); + dmsv.store = storage->getAndMaybeInitStore(); dmsv.db_context = std::make_shared(global_ctx.getGlobalContext()); dmsv.vec_column_name = cols.getAllPhysical().back().name; dmsv.vec_column_id = mustGetSyncedTable(t1_id)->getTableInfo().getColumnID(dmsv.vec_column_name); @@ -817,7 +817,6 @@ try // sync table schema, the VectorIndex in TableInfo should be updated refreshTableSchema(t1_id); auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); - tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); idx_infos = tbl_info.index_infos; ASSERT_EQ(idx_infos.size(), 1); for (const auto & idx : idx_infos) @@ -899,4 +898,95 @@ try } CATCH + +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + refreshSchema(); + + // The `StorageDeltaMerge` is created but `DeltaMergeStore` is not inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + ASSERT_EQ(nullptr,storage->getStoreIfInited()); + + // add a vector index + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // sync table schema, the VectorIndex in TableInfo should be updated + refreshTableSchema(t1_id); + // The `DeltaMergeStore` instanced should be inited + ASSERT_NE(nullptr,storage->getStoreIfInited()); + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } + +} +CATCH + +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + // The table is created and vector index is added. After that, the table info is synced to TiFlash + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // Synced with mock tidb, and create the StorageDeltaMerge instance + refreshTableSchema(t1_id); + // The `DeltaMergeStore` instanced should be inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + ASSERT_NE(nullptr, storage->getStoreIfInited()); + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } +} +CATCH + } // namespace DB::tests