Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "103948cbce833e1a17ee7083f5ba79564d08d6ec" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
3 changes: 3 additions & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
Expand Down Expand Up @@ -670,6 +671,7 @@ impl StartCommand {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager,
table_metadata_allocator,
flow_metadata_manager,
Expand Down Expand Up @@ -787,6 +789,7 @@ impl InformationExtension for StandaloneInformationExtension {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
manifest_version: region_stat.manifest_version,
}
})
.collect::<Vec<_>>();
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub struct RegionStat {
pub sst_size: u64,
/// The size of the SST index files in bytes.
pub index_size: u64,
/// The version of manifest.
pub manifest_version: u64,
}

impl Stat {
Expand Down Expand Up @@ -185,6 +187,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
manifest_version: region_stat.manifest_version,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
Expand Down Expand Up @@ -164,6 +165,8 @@ pub struct DdlContext {
pub cache_invalidator: CacheInvalidatorRef,
/// Keep tracking operating regions.
pub memory_region_keeper: MemoryRegionKeeperRef,
/// The leader region registry.
pub leader_region_registry: LeaderRegionRegistryRef,
/// Table metadata manager.
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};

/// [Control] indicates to the caller whether to go to the next step.
#[derive(Debug)]
Expand Down Expand Up @@ -250,6 +252,11 @@ impl DropTableExecutor {
.into_iter()
.collect::<Result<Vec<_>>>()?;

// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));

Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ mod tests {
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;
Expand Down Expand Up @@ -893,6 +894,7 @@ mod tests {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod node_manager;
pub mod peer;
pub mod range_stream;
pub mod region_keeper;
pub mod region_registry;
pub mod rpc;
pub mod sequence;
pub mod state_store;
Expand Down
102 changes: 102 additions & 0 deletions src/common/meta/src/region_registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use common_telemetry::warn;
use store_api::storage::RegionId;

/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
/// and the current manifest version.
///
/// The type is `Copy`, so lookups in the registry hand out values rather than references.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaderRegion {
/// The id of the datanode where the leader region is located.
pub datanode_id: u64,
/// The manifest version recorded for the region. `LeaderRegionRegistry::batch_put`
/// ignores an update whose version is smaller than the one already stored.
pub manifest_version: u64,
}

/// Reference-counted ([`Arc`]) handle to a [`LeaderRegionRegistry`].
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;

/// Registry that maintains a mapping of all leader regions in the cluster.
/// Tracks which datanode is hosting the leader for each region and the corresponding
/// manifest version.
///
/// All methods take `&self`; mutation goes through the inner [`RwLock`].
#[derive(Default)]
pub struct LeaderRegionRegistry {
// Region id -> leader region info. Readers take the read lock, writers the write lock.
inner: RwLock<HashMap<RegionId, LeaderRegion>>,
}

impl LeaderRegionRegistry {
/// Creates a new empty leader region registry.
pub fn new() -> Self {
Self {
inner: RwLock::new(HashMap::new()),
}
}

/// Gets the leader region for the given region ids.
pub fn batch_get<I: Iterator<Item = RegionId>>(
&self,
region_ids: I,
) -> HashMap<RegionId, LeaderRegion> {
let inner = self.inner.read().unwrap();
region_ids
.into_iter()
.flat_map(|region_id| {
inner
.get(&region_id)
.map(|leader_region| (region_id, *leader_region))
})
.collect::<HashMap<_, _>>()
}

/// Puts the leader regions into the registry.
pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
let mut inner = self.inner.write().unwrap();
for (region_id, leader_region) in key_values {
match inner.entry(region_id) {
Entry::Vacant(entry) => {
entry.insert(leader_region);
}
Entry::Occupied(mut entry) => {
let manifest_version = entry.get().manifest_version;
if manifest_version > leader_region.manifest_version {
warn!(
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
region_id,
manifest_version,
leader_region.manifest_version
);
} else {
entry.insert(leader_region);
}
}
}
}
}

pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
let mut inner = self.inner.write().unwrap();
for region_id in region_ids {
inner.remove(&region_id);
}
}

/// Resets the registry to an empty state.
pub fn reset(&self) {
let mut inner = self.inner.write().unwrap();
inner.clear();
}
}
2 changes: 2 additions & 0 deletions src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::node_manager::{
};
use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::{DatanodeId, FlownodeId};
Expand Down Expand Up @@ -177,6 +178,7 @@ pub fn new_ddl_context_with_kv_backend(
node_manager,
cache_invalidator: Arc::new(DummyCacheInvalidator),
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_allocator,
table_metadata_manager,
flow_metadata_allocator,
Expand Down
1 change: 0 additions & 1 deletion src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ mod test {
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
// TODO(weny): use real manifest version
manifest_version: 0,
}],
Instant::now() + Duration::from_millis(200),
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub trait HeartbeatHandler: Send + Sync {
/// HandleControl
///
/// Controls process of handling heartbeat request.
#[derive(PartialEq)]
#[derive(PartialEq, Debug)]
pub enum HandleControl {
Continue,
Done,
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/handler/collect_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ mod tests {
use common_meta::datanode::DatanodeStatKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;

use super::*;
Expand Down Expand Up @@ -257,6 +258,7 @@ mod tests {
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};

let handler = CollectStatsHandler::default();
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ mod tests {
manifest_size: 0,
sst_size: 0,
index_size: 0,
manifest_version: 0,
}
}
acc.stat = Some(Stat {
Expand Down
46 changes: 32 additions & 14 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use store_api::region_engine::GrantedRegion;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;

use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
Expand All @@ -29,20 +31,33 @@ use crate::region::RegionLeaseKeeper;
pub struct RegionLeaseHandler {
// NOTE(review): presumably the lease duration in seconds; its use is outside this view — confirm.
region_lease_seconds: u64,
// Built in `new` from the table metadata manager and the memory region keeper.
region_lease_keeper: RegionLeaseKeeperRef,
// Optional hook; when set, `handle` lets it build the granted regions from the renewed ones.
customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
}

/// Reference-counted ([`Arc`]) handle to a [`CustomizedRegionLeaseRenewer`] trait object.
pub type CustomizedRegionLeaseRenewerRef = Arc<dyn CustomizedRegionLeaseRenewer>;

/// Hook for customizing how renewed regions are turned into [`GrantedRegion`]s.
///
/// When a renewer is configured, `RegionLeaseHandler::handle` passes it the renewed
/// regions and uses the returned list as the granted regions; otherwise each renewed
/// region is granted via `GrantedRegion::new(region_id, region_role)`.
pub trait CustomizedRegionLeaseRenewer: Send + Sync {
/// Builds the regions to grant.
///
/// `ctx` is the heartbeat handler context and `regions` maps each renewed
/// region id to its role.
fn renew(
&self,
ctx: &mut Context,
regions: HashMap<RegionId, RegionRole>,
) -> Vec<GrantedRegion>;
}

impl RegionLeaseHandler {
    /// Creates a [`RegionLeaseHandler`].
    ///
    /// A [`RegionLeaseKeeper`] is built from `table_metadata_manager` and
    /// `memory_region_keeper`. `customized_region_lease_renewer` is an optional
    /// hook; pass `None` to grant renewed regions with the default conversion.
    pub fn new(
        region_lease_seconds: u64,
        table_metadata_manager: TableMetadataManagerRef,
        memory_region_keeper: MemoryRegionKeeperRef,
        customized_region_lease_renewer: Option<CustomizedRegionLeaseRenewerRef>,
    ) -> Self {
        // `memory_region_keeper` is owned and not used again, so it is moved rather than cloned.
        let region_lease_keeper =
            RegionLeaseKeeper::new(table_metadata_manager, memory_region_keeper);

        Self {
            region_lease_seconds,
            region_lease_keeper: Arc::new(region_lease_keeper),
            customized_region_lease_renewer,
        }
    }
}
Expand All @@ -56,7 +71,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &mut Context,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some(stat) = acc.stat.as_ref() else {
Expand All @@ -74,18 +89,18 @@ impl HeartbeatHandler for RegionLeaseHandler {
.renew_region_leases(datanode_id, &regions)
.await?;

let renewed = renewed
.into_iter()
.map(|(region_id, region_role)| {
GrantedRegion {
region_id,
region_role,
// TODO(weny): use real manifest version
manifest_version: 0,
}
.into()
})
.collect::<Vec<_>>();
let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
renewer
.renew(ctx, renewed)
.into_iter()
.map(|region| region.into())
.collect()
} else {
renewed
.into_iter()
.map(|(region_id, region_role)| GrantedRegion::new(region_id, region_role).into())
.collect::<Vec<_>>()
};

acc.region_lease = Some(RegionLease {
regions: renewed,
Expand Down Expand Up @@ -141,6 +156,7 @@ mod test {
manifest_size: 0,
sst_size: 0,
index_size: 0,
manifest_version: 0,
}
}

Expand Down Expand Up @@ -200,6 +216,7 @@ mod test {
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
None,
);

handler.handle(&req, ctx, acc).await.unwrap();
Expand Down Expand Up @@ -342,6 +359,7 @@ mod test {
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
Default::default(),
None,
);

handler.handle(&req, ctx, acc).await.unwrap();
Expand Down
Loading
Loading