Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ impl InformationExtension for StandaloneInformationExtension {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
manifest_version: region_stat.manifest_version,
region_manifest: region_stat.manifest.into(),
}
})
.collect::<Vec<_>>();
Expand Down
45 changes: 42 additions & 3 deletions src/common/meta/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,22 @@ pub struct RegionStat {
pub sst_size: u64,
/// The size of the SST index files in bytes.
pub index_size: u64,
/// The version of manifest.
pub manifest_version: u64,
/// The manifest infoof the region.
pub region_manifest: RegionManifestInfo,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
},
}

impl Stat {
Expand Down Expand Up @@ -167,6 +181,31 @@ impl TryFrom<&HeartbeatRequest> for Stat {
}
}

impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
match value {
store_api::region_engine::RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
},
store_api::region_engine::RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
} => RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
},
}
}
}

impl From<&api::v1::meta::RegionStat> for RegionStat {
fn from(value: &api::v1::meta::RegionStat) -> Self {
let region_stat = value
Expand All @@ -187,7 +226,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
manifest_version: region_stat.manifest_version,
region_manifest: region_stat.manifest.into(),
}
}
}
Expand Down
77 changes: 73 additions & 4 deletions src/common/meta/src/region_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,82 @@ use std::sync::{Arc, RwLock};
use common_telemetry::warn;
use store_api::storage::RegionId;

use crate::datanode::RegionManifestInfo;

/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
/// and the current manifest version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaderRegion {
pub datanode_id: u64,
pub manifest_version: u64,
pub manifest: LeaderRegionManifestInfo,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
},
}

impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
fn from(value: RegionManifestInfo) -> Self {
match value {
RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => LeaderRegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
},
RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
} => LeaderRegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
},
}
}
}

impl LeaderRegionManifestInfo {
/// Returns the manifest version of the leader region.
pub fn manifest_version(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
manifest_version, ..
} => *manifest_version,
LeaderRegionManifestInfo::Metric {
data_manifest_version,
..
} => *data_manifest_version,
}
}

/// Returns the flushed entry id of the leader region.
pub fn flushed_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
..
} => *data_flushed_entry_id,
}
}
}

pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
Expand Down Expand Up @@ -71,13 +140,13 @@ impl LeaderRegionRegistry {
entry.insert(leader_region);
}
Entry::Occupied(mut entry) => {
let manifest_version = entry.get().manifest_version;
if manifest_version > leader_region.manifest_version {
let manifest_version = entry.get().manifest.manifest_version();
if manifest_version > leader_region.manifest.manifest_version() {
warn!(
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
region_id,
manifest_version,
leader_region.manifest_version
leader_region.manifest.manifest_version()
);
} else {
entry.insert(leader_region);
Expand Down
27 changes: 19 additions & 8 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
CollectFrontendClusterInfoHandler,
};
use collect_leader_region_handler::CollectLeaderRegionHandler;
use collect_stats_handler::CollectStatsHandler;
use common_base::Plugins;
use common_meta::datanode::Stat;
Expand Down Expand Up @@ -62,6 +63,7 @@ use crate::service::mailbox::{

pub mod check_leader_handler;
pub mod collect_cluster_info_handler;
pub mod collect_leader_region_handler;
pub mod collect_stats_handler;
pub mod extract_stat_handler;
pub mod failure_handler;
Expand Down Expand Up @@ -570,6 +572,7 @@ impl HeartbeatHandlerGroupBuilder {
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
self.add_handler_last(publish_heartbeat_handler);
}
self.add_handler_last(CollectLeaderRegionHandler);
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
self.add_handler_last(RemapFlowPeerHandler::default());

Expand Down Expand Up @@ -848,7 +851,7 @@ mod tests {
.unwrap();

let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());

let names = [
"ResponseHeaderHandler",
Expand All @@ -862,6 +865,7 @@ mod tests {
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
Expand All @@ -884,7 +888,7 @@ mod tests {

let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
assert_eq!(15, handlers.len());

let names = [
"ResponseHeaderHandler",
Expand All @@ -899,6 +903,7 @@ mod tests {
"MailboxHandler",
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
Expand All @@ -918,7 +923,7 @@ mod tests {

let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
assert_eq!(15, handlers.len());

let names = [
"CollectStatsHandler",
Expand All @@ -933,6 +938,7 @@ mod tests {
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
Expand All @@ -952,7 +958,7 @@ mod tests {

let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
assert_eq!(15, handlers.len());

let names = [
"ResponseHeaderHandler",
Expand All @@ -967,6 +973,7 @@ mod tests {
"MailboxHandler",
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
Expand All @@ -986,7 +993,7 @@ mod tests {

let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
assert_eq!(15, handlers.len());

let names = [
"ResponseHeaderHandler",
Expand All @@ -1000,6 +1007,7 @@ mod tests {
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectStatsHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
Expand All @@ -1020,7 +1028,7 @@ mod tests {

let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());

let names = [
"ResponseHeaderHandler",
Expand All @@ -1034,6 +1042,7 @@ mod tests {
"CollectFlownodeClusterInfoHandler",
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
Expand All @@ -1053,7 +1062,7 @@ mod tests {

let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());

let names = [
"ResponseHeaderHandler",
Expand All @@ -1067,6 +1076,7 @@ mod tests {
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
];
Expand All @@ -1086,7 +1096,7 @@ mod tests {

let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());

let names = [
"CollectStatsHandler",
Expand All @@ -1100,6 +1110,7 @@ mod tests {
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
Expand Down
Loading