From c821aef7629e5669a200fa9ac5b2a62f101de24b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 24 Mar 2025 08:15:20 +0000 Subject: [PATCH 1/6] feat: add manifest_version to `GrantedRegion` --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/cmd/src/standalone.rs | 3 + src/common/meta/src/datanode.rs | 3 + src/common/meta/src/ddl.rs | 3 + .../meta/src/ddl/drop_table/executor.rs | 9 +- src/common/meta/src/ddl_manager.rs | 2 + src/common/meta/src/lib.rs | 1 + src/common/meta/src/region_registry.rs | 90 +++++++++++++++++++ src/common/meta/src/test_util.rs | 2 + src/datanode/src/alive_keeper.rs | 1 + src/meta-srv/src/handler.rs | 2 +- .../src/handler/collect_stats_handler.rs | 2 + src/meta-srv/src/handler/failure_handler.rs | 1 + .../src/handler/region_lease_handler.rs | 29 +++--- .../src/handler/response_header_handler.rs | 2 + src/meta-srv/src/metasrv.rs | 6 ++ src/meta-srv/src/metasrv/builder.rs | 4 + src/meta-srv/src/procedure/utils.rs | 2 + src/meta-srv/src/selector/weight_compute.rs | 3 + src/mito2/src/compaction/compactor.rs | 16 ++-- src/mito2/src/manifest/manager.rs | 69 ++++++++------ src/mito2/src/region.rs | 7 ++ src/mito2/src/region/opener.rs | 11 ++- src/mito2/src/test_util.rs | 13 ++- src/mito2/src/test_util/scheduler_util.rs | 1 + src/store-api/src/region_engine.rs | 9 +- tests-integration/src/standalone.rs | 2 + 28 files changed, 241 insertions(+), 56 deletions(-) create mode 100644 src/common/meta/src/region_registry.rs diff --git a/Cargo.lock b/Cargo.lock index 52b93a73f35f..4a35c8dbb36e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4705,7 +4705,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=97e298d119fdb9499bc6ba9e03f375cfa7cdf130#97e298d119fdb9499bc6ba9e03f375cfa7cdf130" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2d52b660fa13ed3fb6d64d64d8dd6bfaf4ca3ef6#2d52b660fa13ed3fb6d64d64d8dd6bfaf4ca3ef6" dependencies = [ "prost 0.13.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 61a0211ff876..2175a83a8f74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "97e298d119fdb9499bc6ba9e03f375cfa7cdf130" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2d52b660fa13ed3fb6d64d64d8dd6bfaf4ca3ef6" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 57c21f2ca998..cdbbbcc5a5e0 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -41,6 +41,7 @@ use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; +use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef}; use common_procedure::{ProcedureInfo, ProcedureManagerRef}; @@ -662,6 +663,7 @@ impl StartCommand { node_manager, cache_invalidator, memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + leader_region_registry: Arc::new(LeaderRegionRegistry::default()), table_metadata_manager, table_metadata_allocator, flow_metadata_manager, @@ -779,6 +781,7 @@ impl InformationExtension for StandaloneInformationExtension { manifest_size: region_stat.manifest_size, sst_size: region_stat.sst_size, index_size: region_stat.index_size, + manifest_version: region_stat.manifest_version, } }) .collect::>(); diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index d717d1523e32..a167d89599c6 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -92,6 +92,8 @@ pub struct RegionStat { pub sst_size: u64, /// The size of the SST index files in bytes. pub index_size: u64, + /// The version of manifest. + pub manifest_version: u64, } impl Stat { @@ -185,6 +187,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat { manifest_size: region_stat.manifest_size, sst_size: region_stat.sst_size, index_size: region_stat.index_size, + manifest_version: region_stat.manifest_version, } } } diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 55a9a64c84a4..51e0025272f1 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -28,6 +28,7 @@ use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; +use crate::region_registry::LeaderRegionRegistryRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; use crate::DatanodeId; @@ -137,6 +138,8 @@ pub struct DdlContext { pub cache_invalidator: CacheInvalidatorRef, /// Keep tracking operating regions. pub memory_region_keeper: MemoryRegionKeeperRef, + /// The leader region registry. + pub leader_region_registry: LeaderRegionRegistryRef, /// Table metadata manager. pub table_metadata_manager: TableMetadataManagerRef, /// Allocator for table metadata. diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 43ca7ce5ac85..1204629a1e0b 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -35,7 +35,9 @@ use crate::error::{self, Result}; use crate::instruction::CacheIdent; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; -use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::rpc::router::{ + find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, +}; /// [Control] indicated to the caller whether to go to the next step. #[derive(Debug)] @@ -250,6 +252,11 @@ impl DropTableExecutor { .into_iter() .collect::>>()?; + // Deletes the leader region from registry. + let region_ids = operating_leader_regions(region_routes); + ctx.leader_region_registry + .batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id)); + Ok(()) } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index fa01e9f700f0..f3f54a0987b0 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -850,6 +850,7 @@ mod tests { use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; + use crate::region_registry::LeaderRegionRegistry; use crate::sequence::SequenceBuilder; use crate::state_store::KvStateStore; use crate::wal_options_allocator::WalOptionsAllocator; @@ -893,6 +894,7 @@ mod tests { flow_metadata_manager, flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + leader_region_registry: Arc::new(LeaderRegionRegistry::default()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index ccd00ab8901c..96d5326d13ff 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -39,6 +39,7 @@ pub mod node_manager; pub mod peer; pub mod range_stream; pub mod region_keeper; +pub mod region_registry; pub mod rpc; pub mod sequence; pub mod state_store; diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs new file mode 100644 index 000000000000..a71f7cb771f6 --- /dev/null +++ b/src/common/meta/src/region_registry.rs @@ -0,0 +1,90 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use common_telemetry::warn; +use store_api::storage::RegionId; + +/// Represents information about a leader region in the cluster. +/// Contains the datanode id where the leader is located, +/// and the current manifest version. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct LeaderRegion { + pub datanode_id: u64, + pub manifest_version: u64, +} + +pub type LeaderRegionRegistryRef = Arc; + +/// Registry that maintains a mapping of all leader regions in the cluster. +/// Tracks which datanode is hosting the leader for each region and the corresponding +/// manifest version. +#[derive(Default)] +pub struct LeaderRegionRegistry { + inner: RwLock>, +} + +impl LeaderRegionRegistry { + /// Creates a new empty leader region registry. + pub fn new() -> Self { + Self { + inner: RwLock::new(HashMap::new()), + } + } + + /// Gets the leader region for the given region ids. + pub fn batch_get>( + &self, + region_ids: I, + ) -> HashMap { + let inner = self.inner.read().unwrap(); + let mut result = HashMap::new(); + for region_id in region_ids { + if let Some(leader_region) = inner.get(®ion_id) { + result.insert(region_id, *leader_region); + } + } + result + } + + /// Puts the leader regions into the registry. + pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) { + let mut inner = self.inner.write().unwrap(); + for (region_id, leader_region) in key_values { + if let Some(previous) = inner.insert(region_id, leader_region) { + if previous.manifest_version > leader_region.manifest_version { + warn!( + "The manifest version of region {} is decreased from {} to {}", + region_id, previous.manifest_version, leader_region.manifest_version + ); + } + } + } + } + + pub fn batch_delete>(&self, region_ids: I) { + let mut inner = self.inner.write().unwrap(); + for region_id in region_ids { + inner.remove(®ion_id); + } + } + + /// Resets the registry to an empty state. + pub fn reset(&self) { + let mut inner = self.inner.write().unwrap(); + inner.clear(); + } +} diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 2c4ba59c7b21..0f94aad814d6 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -35,6 +35,7 @@ use crate::node_manager::{ }; use crate::peer::{Peer, PeerLookupService}; use crate::region_keeper::MemoryRegionKeeper; +use crate::region_registry::LeaderRegionRegistry; use crate::sequence::SequenceBuilder; use crate::wal_options_allocator::WalOptionsAllocator; use crate::{DatanodeId, FlownodeId}; @@ -177,6 +178,7 @@ pub fn new_ddl_context_with_kv_backend( node_manager, cache_invalidator: Arc::new(DummyCacheInvalidator), memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), + leader_region_registry: Arc::new(LeaderRegionRegistry::default()), table_metadata_allocator, table_metadata_manager, flow_metadata_allocator, diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index bf9cb16f6b67..92738033427d 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -468,6 +468,7 @@ mod test { &[GrantedRegion { region_id: region_id.as_u64(), role: api::v1::meta::RegionRole::Leader.into(), + manifest_version: 0, }], Instant::now() + Duration::from_millis(200), ) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 6d06f328c1e5..bfecd26ee2d6 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -96,7 +96,7 @@ pub trait HeartbeatHandler: Send + Sync { /// HandleControl /// /// Controls process of handling heartbeat request. -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub enum HandleControl { Continue, Done, diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 20f803cb1e2b..c0cca2ca03ff 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -223,6 +223,7 @@ mod tests { use common_meta::datanode::DatanodeStatKey; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use super::*; @@ -257,6 +258,7 @@ mod tests { is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), cache_invalidator: Arc::new(DummyCacheInvalidator), + leader_region_registry: Arc::new(LeaderRegionRegistry::new()), }; let handler = CollectStatsHandler::default(); diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 203cf4af333c..066d149534ae 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -98,6 +98,7 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, + manifest_version: 0, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 64ec1f01e4d1..1f1907f0783a 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -56,7 +56,7 @@ impl HeartbeatHandler for RegionLeaseHandler { async fn handle( &self, req: &HeartbeatRequest, - _ctx: &mut Context, + ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result { let Some(stat) = acc.stat.as_ref() else { @@ -73,15 +73,16 @@ impl HeartbeatHandler for RegionLeaseHandler { .region_lease_keeper .renew_region_leases(datanode_id, ®ions) .await?; + let renewed_regions = renewed.keys().cloned(); + let leader_regions = ctx.leader_region_registry.batch_get(renewed_regions); let renewed = renewed .into_iter() .map(|(region_id, region_role)| { - GrantedRegion { - region_id, - region_role, - } - .into() + let manifest_version = leader_regions + .get(®ion_id) + .map_or(0, |leader_region| leader_region.manifest_version); + GrantedRegion::new(region_id, region_role, manifest_version).into() }) .collect::>(); @@ -139,6 +140,7 @@ mod test { manifest_size: 0, sst_size: 0, index_size: 0, + manifest_version: 0, } } @@ -202,7 +204,10 @@ mod test { handler.handle(&req, ctx, acc).await.unwrap(); - assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]); + assert_region_lease( + acc, + vec![GrantedRegion::new(region_id, RegionRole::Leader, 0)], + ); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); assert_eq!( acc.region_lease.as_ref().unwrap().closeable_region_ids, @@ -229,7 +234,7 @@ mod test { assert_region_lease( acc, - vec![GrantedRegion::new(region_id, RegionRole::Follower)], + vec![GrantedRegion::new(region_id, RegionRole::Follower, 0)], ); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); assert_eq!( @@ -264,8 +269,8 @@ mod test { assert_region_lease( acc, vec![ - GrantedRegion::new(region_id, RegionRole::Follower), - GrantedRegion::new(opening_region_id, RegionRole::Follower), + GrantedRegion::new(region_id, RegionRole::Follower, 0), + GrantedRegion::new(opening_region_id, RegionRole::Follower, 0), ], ); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); @@ -347,8 +352,8 @@ mod test { assert_region_lease( acc, vec![ - GrantedRegion::new(region_id, RegionRole::DowngradingLeader), - GrantedRegion::new(another_region_id, RegionRole::Leader), + GrantedRegion::new(region_id, RegionRole::DowngradingLeader, 0), + GrantedRegion::new(another_region_id, RegionRole::Leader, 0), ], ); assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id])); diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 1cd6201598ee..3f7bdd5f0287 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -49,6 +49,7 @@ mod tests { use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_telemetry::tracing_context::W3cTrace; @@ -84,6 +85,7 @@ mod tests { is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), cache_invalidator: Arc::new(DummyCacheInvalidator), + leader_region_registry: Arc::new(LeaderRegionRegistry::new()), }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index ca9f3ad87d92..8cc9e4a2c9f9 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -36,6 +36,7 @@ use common_meta::leadership_notifier::{ use common_meta::node_expiry_listener::NodeExpiryListener; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; +use common_meta::region_registry::LeaderRegionRegistryRef; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; use common_options::datanode::DatanodeClientOptions; use common_procedure::options::ProcedureConfig; @@ -257,11 +258,13 @@ pub struct Context { pub is_infancy: bool, pub table_metadata_manager: TableMetadataManagerRef, pub cache_invalidator: CacheInvalidatorRef, + pub leader_region_registry: LeaderRegionRegistryRef, } impl Context { pub fn reset_in_memory(&self) { self.in_memory.reset(); + self.leader_region_registry.reset(); } } @@ -402,6 +405,7 @@ pub struct Metasrv { region_migration_manager: RegionMigrationManagerRef, region_supervisor_ticker: Option, cache_invalidator: CacheInvalidatorRef, + leader_region_registry: LeaderRegionRegistryRef, plugins: Plugins, } @@ -667,6 +671,7 @@ impl Metasrv { let election = self.election.clone(); let table_metadata_manager = self.table_metadata_manager.clone(); let cache_invalidator = self.cache_invalidator.clone(); + let leader_region_registry = self.leader_region_registry.clone(); Context { server_addr, @@ -679,6 +684,7 @@ impl Metasrv { is_infancy: false, table_metadata_manager, cache_invalidator, + leader_region_registry, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index dc275e32ae63..f206129c95f6 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -34,6 +34,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::node_manager::NodeManagerRef; use common_meta::region_keeper::MemoryRegionKeeper; +use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_meta::wal_options_allocator::build_wal_options_allocator; @@ -327,12 +328,14 @@ impl MetasrvBuilder { None }; + let leader_region_registry = Arc::new(LeaderRegionRegistry::default()); let ddl_manager = Arc::new( DdlManager::try_new( DdlContext { node_manager, cache_invalidator: cache_invalidator.clone(), memory_region_keeper: memory_region_keeper.clone(), + leader_region_registry: leader_region_registry.clone(), table_metadata_manager: table_metadata_manager.clone(), table_metadata_allocator: table_metadata_allocator.clone(), flow_metadata_manager: flow_metadata_manager.clone(), @@ -412,6 +415,7 @@ impl MetasrvBuilder { region_migration_manager, region_supervisor_ticker, cache_invalidator, + leader_region_registry, }) } } diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index d81c9d9c97b3..f2420522ae4a 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -123,6 +123,7 @@ pub mod test_data { use common_meta::node_manager::NodeManagerRef; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; + use common_meta::region_registry::LeaderRegionRegistry; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::WalOptionsAllocator; @@ -227,6 +228,7 @@ pub mod test_data { flow_metadata_manager, flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), + leader_region_registry: Arc::new(LeaderRegionRegistry::default()), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index cd8b0409cd01..63a10d3b4f37 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -189,6 +189,7 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, + manifest_version: 0, }], ..Default::default() } @@ -210,6 +211,7 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, + manifest_version: 0, }], ..Default::default() } @@ -231,6 +233,7 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, + manifest_version: 0, }], ..Default::default() } diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index fb6e7bd03f0e..dfddb437262a 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -157,12 +157,16 @@ pub async fn open_compaction_region( checkpoint_distance: mito_config.manifest_checkpoint_distance, }; - RegionManifestManager::open(region_manifest_options, Default::default()) - .await? - .context(EmptyRegionDirSnafu { - region_id: req.region_id, - region_dir: req.region_dir.as_str(), - })? + RegionManifestManager::open( + region_manifest_options, + Default::default(), + Default::default(), + ) + .await? + .context(EmptyRegionDirSnafu { + region_id: req.region_id, + region_dir: req.region_dir.as_str(), + })? }; let manifest = manifest_manager.manifest(); diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index ead8ce7bfc0e..6afe0a387483 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use common_datasource::compression::CompressionType; @@ -23,6 +23,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION}; use store_api::metadata::RegionMetadataRef; +use super::storage::is_checkpoint_file; use crate::error::{ self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result, }; @@ -115,7 +116,7 @@ pub struct RegionManifestOptions { #[derive(Debug)] pub struct RegionManifestManager { store: ManifestObjectStore, - last_version: ManifestVersion, + last_version: Arc, checkpointer: Checkpointer, manifest: Arc, stopped: bool, @@ -127,6 +128,7 @@ impl RegionManifestManager { metadata: RegionMetadataRef, options: RegionManifestOptions, total_manifest_size: Arc, + manifest_version: Arc, ) -> Result { // construct storage let mut store = ManifestObjectStore::new( @@ -164,9 +166,10 @@ impl RegionManifestManager { store.save(version, &action_list.encode()?).await?; let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION); + manifest_version.store(version, Ordering::Relaxed); Ok(Self { store, - last_version: version, + last_version: manifest_version, checkpointer, manifest: Arc::new(manifest), stopped: false, @@ -179,6 +182,7 @@ impl RegionManifestManager { pub async fn open( options: RegionManifestOptions, total_manifest_size: Arc, + manifest_version: Arc, ) -> Result> { let _t = MANIFEST_OP_ELAPSED .with_label_values(&["open"]) @@ -263,9 +267,10 @@ impl RegionManifestManager { store.clone(), last_checkpoint_version, ); + manifest_version.store(version, Ordering::Relaxed); Ok(Some(Self { store, - last_version: version, + last_version: manifest_version, checkpointer, manifest: Arc::new(manifest), stopped: false, @@ -290,13 +295,14 @@ impl RegionManifestManager { .with_label_values(&["install_manifest_to"]) .start_timer(); + let last_version = self.last_version(); // Case 1: If the target version is less than the current version, return the current version. - if self.last_version >= target_version { + if last_version >= target_version { debug!( "Target version {} is less than or equal to the current version {}, region: {}, skip install", - target_version, self.last_version, self.manifest.metadata.region_id + target_version, last_version, self.manifest.metadata.region_id ); - return Ok(self.last_version); + return Ok(last_version); } ensure!( @@ -310,7 +316,7 @@ impl RegionManifestManager { let mut manifests = self .store // Invariant: last_version < target_version. - .fetch_manifests_strict_from(self.last_version + 1, target_version + 1) + .fetch_manifests_strict_from(last_version + 1, target_version + 1) .await?; // Case 2: No manifests in range: [current_version+1, target_version+1) @@ -322,7 +328,7 @@ impl RegionManifestManager { if manifests.is_empty() { debug!( "Manifests are not strict from {}, region: {}, tries to install the last checkpoint", - self.last_version, self.manifest.metadata.region_id + last_version, self.manifest.metadata.region_id ); let last_version = self.install_last_checkpoint().await?; // Case 2.1: If the installed checkpoint version is greater than or equal to the target version, return the last version. @@ -341,14 +347,14 @@ impl RegionManifestManager { if manifests.is_empty() { return NoManifestsSnafu { region_id: self.manifest.metadata.region_id, - start_version: self.last_version + 1, + start_version: last_version + 1, end_version: target_version + 1, - last_version: self.last_version, + last_version, } .fail(); } - debug_assert_eq!(manifests.first().unwrap().0, self.last_version + 1); + debug_assert_eq!(manifests.first().unwrap().0, last_version + 1); let mut manifest_builder = RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone())); @@ -384,28 +390,29 @@ impl RegionManifestManager { region_id: self.manifest.metadata.region_id, target_version, available_version: new_manifest.manifest_version, - last_version: self.last_version, + last_version, } ); - let version = self.last_version; + let version = self.last_version(); self.manifest = Arc::new(new_manifest); - self.last_version = self.manifest.manifest_version; + let last_version = self.set_version(self.manifest.manifest_version); info!( "Install manifest changes from {} to {}, region: {}", - version, self.last_version, self.manifest.metadata.region_id + version, last_version, self.manifest.metadata.region_id ); - Ok(self.last_version) + Ok(last_version) } /// Installs the last checkpoint. pub(crate) async fn install_last_checkpoint(&mut self) -> Result { + let last_version = self.last_version(); let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await? else { return NoCheckpointSnafu { region_id: self.manifest.metadata.region_id, - last_version: self.last_version, + last_version, } .fail(); }; @@ -414,14 +421,14 @@ impl RegionManifestManager { .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size); let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint); let manifest = builder.try_build()?; - self.last_version = manifest.manifest_version; + let last_version = self.set_version(manifest.manifest_version); self.manifest = Arc::new(manifest); info!( "Installed region manifest from checkpoint: {}, region: {}", checkpoint.last_version, self.manifest.metadata.region_id ); - Ok(self.last_version) + Ok(last_version) } /// Updates the manifest. Returns the current manifest version number. @@ -486,7 +493,7 @@ impl RegionManifestManager { /// It doesn't lock the manifest directory in the object store so the result /// may be inaccurate if there are concurrent writes. pub async fn has_update(&self) -> Result { - let last_version = self.last_version; + let last_version = self.last_version(); let streamer = self.store @@ -499,7 +506,7 @@ impl RegionManifestManager { let need_update = streamer .try_any(|entry| async move { let file_name = entry.name(); - if is_delta_file(file_name) { + if is_delta_file(file_name) || is_checkpoint_file(file_name) { let version = file_version(file_name); if version > last_version { return true; @@ -515,8 +522,18 @@ impl RegionManifestManager { /// Increases last version and returns the increased version. fn increase_version(&mut self) -> ManifestVersion { - self.last_version += 1; - self.last_version + let previous = self.last_version.fetch_add(1, Ordering::Relaxed); + previous + 1 + } + + /// Sets the last version. + fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion { + self.last_version.store(version, Ordering::Relaxed); + version + } + + fn last_version(&self) -> ManifestVersion { + self.last_version.load(Ordering::Relaxed) } /// Fetches the last [RegionCheckpoint] from storage. @@ -547,8 +564,8 @@ impl RegionManifestManager { fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) { let manifest = self.manifest(); assert_eq!(manifest.metadata, *expect); - assert_eq!(self.manifest.manifest_version, self.last_version); - assert_eq!(last_version, self.last_version); + assert_eq!(self.manifest.manifest_version, self.last_version()); + assert_eq!(last_version, self.last_version()); } pub fn store(&self) -> ManifestObjectStore { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index a2b55bc7b10d..671aaa4229a6 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -289,6 +289,7 @@ impl MitoRegion { let wal_usage = self.estimated_wal_usage(memtable_usage); let manifest_usage = self.stats.total_manifest_size(); let num_rows = version.ssts.num_rows() + version.memtables.num_rows(); + let manifest_version = self.stats.manifest_version(); RegionStatistic { num_rows, @@ -297,6 +298,7 @@ impl MitoRegion { manifest_size: manifest_usage, sst_size: sst_usage, index_size: index_usage, + manifest_version, } } @@ -747,12 +749,17 @@ pub(crate) type OpeningRegionsRef = Arc; #[derive(Default, Debug, Clone)] pub(crate) struct ManifestStats { total_manifest_size: Arc, + manifest_version: Arc, } impl ManifestStats { fn total_manifest_size(&self) -> u64 { self.total_manifest_size.load(Ordering::Relaxed) } + + fn manifest_version(&self) -> u64 { + self.manifest_version.load(Ordering::Relaxed) + } } #[cfg(test)] diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index e75ed2ee95b5..5d051f6669b0 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -224,6 +224,7 @@ impl RegionOpener { metadata.clone(), region_manifest_options, self.stats.total_manifest_size.clone(), + self.stats.manifest_version.clone(), ) .await?; @@ -351,6 +352,7 @@ impl RegionOpener { let Some(manifest_manager) = RegionManifestManager::open( region_manifest_options, self.stats.total_manifest_size.clone(), + self.stats.manifest_version.clone(), ) .await? else { @@ -528,9 +530,12 @@ impl RegionMetadataLoader { region_dir, &self.object_store_manager, )?; - let Some(manifest_manager) = - RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0))) - .await? + let Some(manifest_manager) = RegionManifestManager::open( + region_manifest_options, + Arc::new(AtomicU64::new(0)), + Arc::new(AtomicU64::new(0)), + ) + .await? else { return Ok(None); }; diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index b4b7be118406..1149f7c89aa3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -625,11 +625,16 @@ impl TestEnv { }; if let Some(metadata) = initial_metadata { - RegionManifestManager::new(metadata, manifest_opts, Default::default()) - .await - .map(Some) + RegionManifestManager::new( + metadata, + manifest_opts, + Default::default(), + Default::default(), + ) + .await + .map(Some) } else { - RegionManifestManager::open(manifest_opts, Default::default()).await + RegionManifestManager::open(manifest_opts, Default::default(), Default::default()).await } } diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index d55213369af9..6f864ef00e36 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -121,6 +121,7 @@ impl SchedulerEnv { checkpoint_distance: 10, }, Default::default(), + Default::default(), ) .await .unwrap(), diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index d6a5aac4cc48..3f898bebe716 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -83,13 +83,15 @@ impl SetRegionRoleStateResponse { pub struct GrantedRegion { pub region_id: RegionId, pub region_role: RegionRole, + pub manifest_version: u64, } impl GrantedRegion { - pub fn new(region_id: RegionId, region_role: RegionRole) -> Self { + pub fn new(region_id: RegionId, region_role: RegionRole, manifest_version: u64) -> Self { Self { region_id, region_role, + manifest_version, } } } @@ -99,6 +101,7 @@ impl From for PbGrantedRegion { PbGrantedRegion { region_id: value.region_id.as_u64(), role: PbRegionRole::from(value.region_role).into(), + manifest_version: value.manifest_version, } } } @@ -108,6 +111,7 @@ impl From for GrantedRegion { GrantedRegion { region_id: RegionId::from_u64(value.region_id), region_role: value.role().into(), + manifest_version: value.manifest_version, } } } @@ -372,6 +376,9 @@ pub struct RegionStatistic { /// The size of SST index files in bytes. #[serde(default)] pub index_size: u64, + /// The version of manifest. + #[serde(default)] + pub manifest_version: u64, } impl RegionStatistic { diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 427a4ba4694f..81e3fcca9b5a 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -34,6 +34,7 @@ use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; +use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::build_wal_options_allocator; use common_procedure::options::ProcedureConfig; @@ -217,6 +218,7 @@ impl GreptimeDbStandaloneBuilder { node_manager: node_manager.clone(), cache_invalidator: cache_registry.clone(), memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + leader_region_registry: Arc::new(LeaderRegionRegistry::default()), table_metadata_manager, table_metadata_allocator, flow_metadata_manager, From fd33d2824663b79d0d128d4debaf935219677be0 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 25 Mar 2025 13:06:50 +0000 Subject: [PATCH 2/6] chore: upgrade proto --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a35c8dbb36e..e8b1a536c1ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4705,7 +4705,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2d52b660fa13ed3fb6d64d64d8dd6bfaf4ca3ef6#2d52b660fa13ed3fb6d64d64d8dd6bfaf4ca3ef6" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5bf34f1ba22763bfd4ab2ed1dd82fc790746048a#5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" dependencies = [ "prost 0.13.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 2175a83a8f74..537c51e3cd0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2d52b660fa13ed3fb6d64d64d8dd6bfaf4ca3ef6" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" } hex = "0.4" http = "1" humantime = "2.1" From e66a4b3b0a8bb451f78d6dfd285feb0726ff4448 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 25 Mar 2025 13:07:07 +0000 Subject: [PATCH 3/6] chore: apply review suggestions --- src/common/meta/src/region_registry.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index a71f7cb771f6..81a20bf598cf 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -64,12 +65,22 @@ impl LeaderRegionRegistry { pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) { let mut inner = self.inner.write().unwrap(); for (region_id, leader_region) in key_values { - if let Some(previous) = inner.insert(region_id, leader_region) { - if previous.manifest_version > leader_region.manifest_version { - warn!( - "The manifest version of region {} is decreased from {} to {}", - region_id, previous.manifest_version, leader_region.manifest_version - ); + match inner.entry(region_id) { + Entry::Vacant(entry) => { + entry.insert(leader_region); + } + Entry::Occupied(mut entry) => { + let manifest_version = entry.get().manifest_version; + if manifest_version > leader_region.manifest_version { + warn!( + "Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}", + region_id, + manifest_version, + leader_region.manifest_version + ); + } else { + entry.insert(leader_region); + } } } } From e952417bdeac08d789a0a627dcc5506e6cfc33b7 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 26 Mar 2025 12:40:53 +0000 Subject: [PATCH 4/6] chore: apply suggestions from CR --- src/common/meta/src/region_registry.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index 81a20bf598cf..e478c556d3fe 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -52,13 +52,14 @@ impl LeaderRegionRegistry { region_ids: I, ) -> HashMap { let inner = self.inner.read().unwrap(); - let mut result = HashMap::new(); - for region_id in region_ids { - if let Some(leader_region) = inner.get(®ion_id) { - result.insert(region_id, *leader_region); - } - } - result + region_ids + .into_iter() + .flat_map(|region_id| { + inner + .get(®ion_id) + .map(|leader_region| (region_id, *leader_region)) + }) + .collect::>() } /// Puts the leader regions into the registry. From 96a1e0063eeaed2a15e96bfc8fb4813d91aad8b2 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Mar 2025 08:30:35 +0000 Subject: [PATCH 5/6] feat: introduce `CustomizedRegionLeaseRenewerRef` --- .../src/handler/region_lease_handler.rs | 57 ++++++++++++------- src/meta-srv/src/metasrv/builder.rs | 7 ++- src/store-api/src/region_engine.rs | 4 +- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 1f1907f0783a..7b1b1a7637f7 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; use common_meta::key::TableMetadataManagerRef; use common_meta::region_keeper::MemoryRegionKeeperRef; -use store_api::region_engine::GrantedRegion; +use store_api::region_engine::{GrantedRegion, RegionRole}; +use store_api::storage::RegionId; use crate::error::Result; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; @@ -29,6 +31,17 @@ use crate::region::RegionLeaseKeeper; pub struct RegionLeaseHandler { region_lease_seconds: u64, region_lease_keeper: RegionLeaseKeeperRef, + customized_region_lease_renewer: Option, +} + +pub type CustomizedRegionLeaseRenewerRef = Arc; + +pub trait CustomizedRegionLeaseRenewer: Send + Sync { + fn renew( + &self, + ctx: &mut Context, + regions: HashMap, + ) -> Vec; } impl RegionLeaseHandler { @@ -36,6 +49,7 @@ impl RegionLeaseHandler { region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef, memory_region_keeper: MemoryRegionKeeperRef, + customized_region_lease_renewer: Option, ) -> Self { let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper.clone()); @@ -43,6 +57,7 @@ impl RegionLeaseHandler { Self { region_lease_seconds, region_lease_keeper: Arc::new(region_lease_keeper), + customized_region_lease_renewer, } } } @@ -73,18 +88,19 @@ impl HeartbeatHandler for RegionLeaseHandler { .region_lease_keeper .renew_region_leases(datanode_id, ®ions) .await?; - let renewed_regions = renewed.keys().cloned(); - let leader_regions = ctx.leader_region_registry.batch_get(renewed_regions); - let renewed = renewed - .into_iter() - .map(|(region_id, region_role)| { - let manifest_version = leader_regions - .get(®ion_id) - .map_or(0, |leader_region| leader_region.manifest_version); - GrantedRegion::new(region_id, region_role, manifest_version).into() - }) - .collect::>(); + let renewed = if let Some(renewer) = &self.customized_region_lease_renewer { + renewer + .renew(ctx, renewed) + .into_iter() + .map(|region| region.into()) + .collect() + } else { + renewed + .into_iter() + .map(|(region_id, region_role)| GrantedRegion::new(region_id, region_role).into()) + .collect::>() + }; acc.region_lease = Some(RegionLease { regions: renewed, @@ -200,14 +216,12 @@ mod test { distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), opening_region_keeper.clone(), + None, ); handler.handle(&req, ctx, acc).await.unwrap(); - assert_region_lease( - acc, - vec![GrantedRegion::new(region_id, RegionRole::Leader, 0)], - ); + assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); assert_eq!( acc.region_lease.as_ref().unwrap().closeable_region_ids, @@ -234,7 +248,7 @@ mod test { assert_region_lease( acc, - vec![GrantedRegion::new(region_id, RegionRole::Follower, 0)], + vec![GrantedRegion::new(region_id, RegionRole::Follower)], ); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); assert_eq!( @@ -269,8 +283,8 @@ mod test { assert_region_lease( acc, vec![ - GrantedRegion::new(region_id, RegionRole::Follower, 0), - GrantedRegion::new(opening_region_id, RegionRole::Follower, 0), + GrantedRegion::new(region_id, RegionRole::Follower), + GrantedRegion::new(opening_region_id, RegionRole::Follower), ], ); assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id])); @@ -345,6 +359,7 @@ mod test { distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), Default::default(), + None, ); handler.handle(&req, ctx, acc).await.unwrap(); @@ -352,8 +367,8 @@ mod test { assert_region_lease( acc, vec![ - GrantedRegion::new(region_id, RegionRole::DowngradingLeader, 0), - GrantedRegion::new(another_region_id, RegionRole::Leader, 0), + GrantedRegion::new(region_id, RegionRole::DowngradingLeader), + GrantedRegion::new(another_region_id, RegionRole::Leader), ], ); assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id])); diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index f206129c95f6..373bbf959e79 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -50,7 +50,7 @@ use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::flow_state_handler::FlowStateHandler; -use crate::handler::region_lease_handler::RegionLeaseHandler; +use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler}; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers}; use crate::lease::MetaPeerLookupService; use crate::metasrv::{ @@ -361,6 +361,10 @@ impl MetasrvBuilder { )); region_follower_manager.try_start()?; + let customized_region_lease_renewer = plugins + .as_ref() + .and_then(|plugins| plugins.get::()); + let handler_group_builder = match handler_group_builder { Some(handler_group_builder) => handler_group_builder, None => { @@ -368,6 +372,7 @@ impl MetasrvBuilder { distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), memory_region_keeper.clone(), + customized_region_lease_renewer, ); HeartbeatHandlerGroupBuilder::new(pushers) diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 3f898bebe716..69c3532f6fd4 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -87,11 +87,11 @@ pub struct GrantedRegion { } impl GrantedRegion { - pub fn new(region_id: RegionId, region_role: RegionRole, manifest_version: u64) -> Self { + pub fn new(region_id: RegionId, region_role: RegionRole) -> Self { Self { region_id, region_role, - manifest_version, + manifest_version: 0, } } } From 43263b8616be4d5831465446e9823bf5e0f12f3b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 31 Mar 2025 06:24:10 +0000 Subject: [PATCH 6/6] chore: upgrade to `103948` --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e7fc8305c44..af28ecf6a216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4671,7 +4671,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5bf34f1ba22763bfd4ab2ed1dd82fc790746048a#5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=103948cbce833e1a17ee7083f5ba79564d08d6ec#103948cbce833e1a17ee7083f5ba79564d08d6ec" dependencies = [ "prost 0.13.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 00539c52120f..8f221b2be1a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "103948cbce833e1a17ee7083f5ba79564d08d6ec" } hex = "0.4" http = "1" humantime = "2.1"