Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/src/system_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod region_peers;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;
pub mod schemata;
Expand Down
2 changes: 2 additions & 0 deletions src/catalog/src/system_schema/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const ORDINAL_POSITION: &str = "ordinal_position";
const CHARACTER_MAXIMUM_LENGTH: &str = "character_maximum_length";
const CHARACTER_OCTET_LENGTH: &str = "character_octet_length";
Expand Down
78 changes: 66 additions & 12 deletions src/catalog/src/system_schema/information_schema/region_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::common::HashMap;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
Expand All @@ -43,16 +44,22 @@ use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;

const REGION_ID: &str = "region_id";
const PEER_ID: &str = "peer_id";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const PEER_ADDR: &str = "peer_addr";
const IS_LEADER: &str = "is_leader";
pub const IS_LEADER: &str = "is_leader";
const STATUS: &str = "status";
const DOWN_SECONDS: &str = "down_seconds";
const INIT_CAPACITY: usize = 42;

/// The `REGION_PEERS` table provides information about the region distribution and routes. Including fields:
///
/// - `table_catalog`: the table catalog name
/// - `table_schema`: the table schema name
/// - `table_name`: the table name
/// - `region_id`: the region id
/// - `peer_id`: the region storage datanode peer id
/// - `peer_addr`: the region storage datanode gRPC peer address
Expand All @@ -77,6 +84,9 @@ impl InformationSchemaRegionPeers {

pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
Expand Down Expand Up @@ -134,6 +144,9 @@ struct InformationSchemaRegionPeersBuilder {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

table_catalogs: StringVectorBuilder,
table_schemas: StringVectorBuilder,
table_names: StringVectorBuilder,
region_ids: UInt64VectorBuilder,
peer_ids: UInt64VectorBuilder,
peer_addrs: StringVectorBuilder,
Expand All @@ -152,6 +165,9 @@ impl InformationSchemaRegionPeersBuilder {
schema,
catalog_name,
catalog_manager,
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
Expand All @@ -177,24 +193,28 @@ impl InformationSchemaRegionPeersBuilder {
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let table_id_stream = catalog_manager
let table_stream = catalog_manager
.tables(&catalog_name, &schema_name, None)
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info.ident.table_id))
Ok(Some((
table_info.ident.table_id,
table_info.name.to_string(),
)))
}
});

const BATCH_SIZE: usize = 128;

// Split table ids into chunks
let mut table_id_chunks = pin!(table_id_stream.ready_chunks(BATCH_SIZE));
// Split tables into chunks
let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));

while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids.into_iter().collect::<Result<Vec<_>>>()?;
while let Some(tables) = table_chunks.next().await {
let tables = tables.into_iter().collect::<Result<HashMap<_, _>>>()?;
let table_ids = tables.keys().cloned().collect::<Vec<_>>();

let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
Expand All @@ -206,7 +226,16 @@ impl InformationSchemaRegionPeersBuilder {
};

for (table_id, routes) in table_routes {
self.add_region_peers(&predicates, table_id, &routes);
// Safety: table_id is guaranteed to be in the map
let table_name = tables.get(&table_id).unwrap();
self.add_region_peers(
&catalog_name,
&schema_name,
table_name,
&predicates,
table_id,
&routes,
);
}
}
}
Expand All @@ -216,6 +245,9 @@ impl InformationSchemaRegionPeersBuilder {

fn add_region_peers(
&mut self,
table_catalog: &str,
table_schema: &str,
table_name: &str,
predicates: &Predicates,
table_id: TableId,
routes: &[RegionRoute],
Expand All @@ -231,25 +263,47 @@ impl InformationSchemaRegionPeersBuilder {
Some("ALIVE".to_string())
};

let row = [(REGION_ID, &Value::from(region_id))];
let row = [
(TABLE_CATALOG, &Value::from(table_catalog)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(REGION_ID, &Value::from(region_id)),
];

if !predicates.eval(&row) {
return;
}

// TODO(dennis): adds followers.
self.table_catalogs.push(Some(table_catalog));
self.table_schemas.push(Some(table_schema));
self.table_names.push(Some(table_name));
self.region_ids.push(Some(region_id));
self.peer_ids.push(peer_id);
self.peer_addrs.push(peer_addr.as_deref());
self.is_leaders.push(Some("Yes"));
self.statuses.push(state.as_deref());
self.down_seconds
.push(route.leader_down_millis().map(|m| m / 1000));

for follower in &route.follower_peers {
self.table_catalogs.push(Some(table_catalog));
self.table_schemas.push(Some(table_schema));
self.table_names.push(Some(table_name));
self.region_ids.push(Some(region_id));
self.peer_ids.push(Some(follower.id));
self.peer_addrs.push(Some(follower.addr.as_str()));
self.is_leaders.push(Some("No"));
self.statuses.push(None);
self.down_seconds.push(None);
}
}
}

fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.table_catalogs.finish()),
Arc::new(self.table_schemas.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.region_ids.finish()),
Arc::new(self.peer_ids.finish()),
Arc::new(self.peer_addrs.finish()),
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ pub fn check_permission(
Statement::ShowIndex(stmt) => {
validate_db_permission!(stmt, query_ctx);
}
Statement::ShowRegion(stmt) => {
validate_db_permission!(stmt, query_ctx);
}
Statement::ShowViews(stmt) => {
validate_db_permission!(stmt, query_ctx);
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/region_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ mod test_util;

use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::distributed_time_constants;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::{distributed_time_constants, DatanodeId};
use common_procedure::StringKey;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
Expand Down Expand Up @@ -129,9 +129,9 @@ impl AlterRegionFollowerData {
pub(crate) async fn load_datanode_table_value(
&self,
ctx: &Context,
datanode_id: DatanodeId,
) -> Result<Option<DatanodeTableValue>> {
let table_id = self.region_id.table_id();
let datanode_id = self.peer_id;
let datanode_table_key = DatanodeTableKey {
datanode_id,
table_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,26 @@ impl AddRegionFollowerProcedure {
// loads the datanode peer and check peer is alive
self.data.peer = self.data.load_datanode_peer(&self.context).await?;

// loads the datanode table value
self.data.datanode_table_value = self.data.load_datanode_table_value(&self.context).await?;

// loads the table route of the region
self.data.table_route = self.data.load_table_route(&self.context).await?;
let region_leader_datanode_id = {
let table_route = self.data.physical_table_route().unwrap();
table_route
.region_routes
.iter()
.find(|region_route| region_route.region.id == self.data.region_id)
.map(|region_route| region_route.leader_peer.as_ref().unwrap().id)
.unwrap_or_default()
};

// loads the datanode table value
self.data.datanode_table_value = self
.data
.load_datanode_table_value(&self.context, region_leader_datanode_id)
.await?;

let table_route = self.data.physical_table_route().unwrap();

let datanode_peer = self.data.datanode_peer().unwrap();

// check if the destination peer is already a leader/follower of the region
Expand Down Expand Up @@ -111,6 +125,7 @@ impl AddRegionFollowerProcedure {
"Add region({}) follower procedure is preparing, peer: {datanode_peer:?}",
self.data.region_id
);
self.data.state = AlterRegionFollowerState::SubmitRequest;

Ok(Status::executing(true))
}
Expand All @@ -126,6 +141,7 @@ impl AddRegionFollowerProcedure {
create_follower
.send_open_region_instruction(&self.context, instruction)
.await?;
self.data.state = AlterRegionFollowerState::UpdateMetadata;

Ok(Status::executing(true))
}
Expand Down Expand Up @@ -161,6 +177,7 @@ impl AddRegionFollowerProcedure {
)
.await
.context(error::TableMetadataManagerSnafu)?;
self.data.state = AlterRegionFollowerState::InvalidateTableCache;

Ok(Status::executing(true))
}
Expand All @@ -174,7 +191,7 @@ impl AddRegionFollowerProcedure {
.cache_invalidator
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
.await;
Ok(Status::executing(true))
Ok(Status::done())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,27 @@ impl RemoveRegionFollowerProcedure {
// loads the datanode peer and check peer is alive
self.data.peer = self.data.load_datanode_peer(&self.context).await?;

// loads the datanode table value
self.data.datanode_table_value = self.data.load_datanode_table_value(&self.context).await?;
// loads the table route of the region
self.data.table_route = self.data.load_table_route(&self.context).await?;

// loads the table route of the region
self.data.table_route = self.data.load_table_route(&self.context).await?;
let region_leader_datanode_id = {
let table_route = self.data.physical_table_route().unwrap();
table_route
.region_routes
.iter()
.find(|region_route| region_route.region.id == self.data.region_id)
.map(|region_route| region_route.leader_peer.as_ref().unwrap().id)
.unwrap_or_default()
};

// loads the datanode table value
self.data.datanode_table_value = self
.data
.load_datanode_table_value(&self.context, region_leader_datanode_id)
.await?;

let table_route = self.data.physical_table_route().unwrap();

// check if the destination peer has this region
Expand All @@ -96,6 +112,7 @@ impl RemoveRegionFollowerProcedure {
self.data.region_id,
self.data.datanode_peer()
);
self.data.state = AlterRegionFollowerState::SubmitRequest;

Ok(Status::executing(true))
}
Expand All @@ -111,6 +128,7 @@ impl RemoveRegionFollowerProcedure {
remove_follower
.send_close_region_instruction(&self.context, instruction)
.await?;
self.data.state = AlterRegionFollowerState::UpdateMetadata;

Ok(Status::executing(true))
}
Expand Down Expand Up @@ -147,6 +165,7 @@ impl RemoveRegionFollowerProcedure {
)
.await
.context(error::TableMetadataManagerSnafu)?;
self.data.state = AlterRegionFollowerState::InvalidateTableCache;

Ok(Status::executing(true))
}
Expand All @@ -160,7 +179,7 @@ impl RemoveRegionFollowerProcedure {
.cache_invalidator
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
.await;
Ok(Status::executing(true))
Ok(Status::done())
}
}

Expand Down
1 change: 1 addition & 0 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ impl StatementExecutor {
self.show_columns(show_columns, query_ctx).await
}
Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
Statement::ShowStatus(_) => self.show_status(query_ctx).await,
Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
Statement::Use(db) => self.use_database(db, query_ctx).await,
Expand Down
12 changes: 11 additions & 1 deletion src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use sql::ast::Ident;
use sql::statements::create::Partitions;
use sql::statements::show::{
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
ShowTableStatus, ShowTables, ShowVariables, ShowViews,
ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
};
use sql::statements::OptionMap;
use table::metadata::TableType;
Expand Down Expand Up @@ -94,6 +94,16 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}

pub(super) async fn show_region(
&self,
stmt: ShowRegion,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
.await
.context(ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn show_create_database(
&self,
Expand Down
Loading
Loading