Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "583daa3fbbbe39c90b7b92d13646bc3291d9c941" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6d9cffd43c4e6358805a798f17e03e232994b82" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
44 changes: 38 additions & 6 deletions src/common/meta/src/ddl/alter_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ mod region_request;
mod table_cache_keys;
mod update_metadata;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use futures_util::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
Expand All @@ -30,7 +32,7 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::TableId;

use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::key::table_info::TableInfoValue;
Expand All @@ -39,7 +41,7 @@ use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::find_leaders;
use crate::rpc::router::{find_leaders, RegionRoute};

pub struct AlterLogicalTablesProcedure {
pub context: DdlContext,
Expand Down Expand Up @@ -125,14 +127,20 @@ impl AlterLogicalTablesProcedure {
});
}

// Collects responses from datanodes.
let phy_raw_schemas = future::join_all(alter_region_tasks)
let mut results = future::join_all(alter_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

// Collects responses from datanodes.
let phy_raw_schemas = results
.iter_mut()
.map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
.collect::<Vec<_>>();

if phy_raw_schemas.is_empty() {
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTablesState::UpdateMetadata;
return Ok(Status::executing(true));
}
Expand All @@ -155,10 +163,34 @@ impl AlterLogicalTablesProcedure {
warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}

self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTablesState::UpdateMetadata;
Ok(Status::executing(true))
}

/// Asks follower regions of the physical table to catch up after the alter
/// has been applied on the leaders. This is best-effort: a failure is logged
/// and swallowed so the procedure can still advance to metadata update.
async fn submit_sync_region_requests(
    &self,
    results: Vec<RegionResponse>,
    region_routes: &[RegionRoute],
) {
    // NOTE(review): `physical_table_info` is assumed to be populated by an
    // earlier procedure step — TODO confirm against the state machine.
    let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
    let sync_result = sync_follower_regions(
        &self.context,
        self.data.physical_table_id,
        results,
        region_routes,
        table_info.meta.engine.as_str(),
    )
    .await;
    if let Err(err) = sync_result {
        error!(err; "Failed to sync regions for table {}, table_id: {}",
            format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
            self.data.physical_table_id
        );
    }
}

pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
self.update_physical_table_metadata().await?;
self.update_logical_tables_metadata().await?;
Expand Down
33 changes: 29 additions & 4 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod update_metadata;

use std::vec;

use api::region::RegionResponse;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
Expand All @@ -29,7 +30,7 @@ use common_procedure::{
PoisonKeys, Procedure, ProcedureId, Status, StringKey,
};
use common_telemetry::{debug, error, info};
use futures::future;
use futures::future::{self};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
Expand All @@ -38,7 +39,9 @@ use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;

use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_multiple_results, MultipleResults};
use crate::ddl::utils::{
add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults,
};
use crate::ddl::DdlContext;
use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
use crate::instruction::CacheIdent;
Expand All @@ -48,7 +51,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution};
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};

/// The alter table procedure
pub struct AlterTableProcedure {
Expand Down Expand Up @@ -194,7 +197,9 @@ impl AlterTableProcedure {
// Just returns the error, and wait for the next try.
Err(error)
}
MultipleResults::Ok => {
MultipleResults::Ok(results) => {
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTableState::UpdateMetadata;
Ok(Status::executing_with_clean_poisons(true))
}
Expand All @@ -211,6 +216,26 @@ impl AlterTableProcedure {
}
}

/// Best-effort synchronization of follower regions after the leader regions
/// have accepted the alter request. Errors are logged but never propagated:
/// followers that miss the sync are expected to converge later.
async fn submit_sync_region_requests(
    &mut self,
    results: Vec<RegionResponse>,
    region_routes: &[RegionRoute],
) {
    // Safety: filled in `prepare` step.
    let table_info = self.data.table_info().unwrap();
    let engine = table_info.meta.engine.as_str();
    match sync_follower_regions(
        &self.context,
        self.data.table_id(),
        results,
        region_routes,
        engine,
    )
    .await
    {
        Ok(_) => {}
        Err(err) => {
            error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
        }
    }
}

/// Update table metadata.
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
Expand Down
40 changes: 34 additions & 6 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ mod metadata;
mod region_request;
mod update_metadata;

use api::region::RegionResponse;
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::{debug, warn};
use futures_util::future::join_all;
use common_telemetry::{debug, error, warn};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
Expand All @@ -31,7 +33,7 @@ use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};

use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
use crate::key::table_route::TableRouteValue;
Expand Down Expand Up @@ -156,14 +158,20 @@ impl CreateLogicalTablesProcedure {
});
}

// Collects response from datanodes.
let phy_raw_schemas = join_all(create_region_tasks)
let mut results = future::join_all(create_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

// Collects response from datanodes.
let phy_raw_schemas = results
.iter_mut()
.map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
.collect::<Vec<_>>();

if phy_raw_schemas.is_empty() {
self.submit_sync_region_requests(results, region_routes)
.await;
self.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(false));
}
Expand All @@ -186,10 +194,30 @@ impl CreateLogicalTablesProcedure {
warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}

self.submit_sync_region_requests(results, region_routes)
.await;
self.data.state = CreateTablesState::CreateMetadata;

Ok(Status::executing(true))
}

/// Submits sync requests so follower regions of the physical table pick up
/// the schema produced by creating the logical tables.
///
/// Best-effort: a sync failure is only logged, never returned, so the
/// procedure still proceeds to the metadata-creation state.
async fn submit_sync_region_requests(
    &self,
    results: Vec<RegionResponse>,
    region_routes: &[RegionRoute],
) {
    // The engine is hard-coded to METRIC_ENGINE here (unlike the alter-table
    // variants, which read it from the table info) — presumably logical
    // tables always live on the metric engine; TODO confirm.
    if let Err(err) = sync_follower_regions(
        &self.context,
        self.data.physical_table_id,
        results,
        region_routes,
        METRIC_ENGINE,
    )
    .await
    {
        // Swallow the error: follower sync is not required for progress.
        error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
    }
}
}

#[async_trait]
Expand Down
57 changes: 53 additions & 4 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use std::collections::HashMap;

use api::v1::region::{
region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader,
region_request, CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest,
RegionRequest, RegionRequestHeader,
};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use futures::future::join_all;
use snafu::ensure;
Expand All @@ -36,7 +37,8 @@ use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
find_follower_regions, find_followers, find_leader_regions, find_leaders,
operating_leader_regions, RegionRoute,
};

/// [Control] indicated to the caller whether to go to the next step.
Expand Down Expand Up @@ -210,10 +212,10 @@ impl DropTableExecutor {
region_routes: &[RegionRoute],
fast_path: bool,
) -> Result<()> {
// Drops leader regions on datanodes.
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
let table_id = self.table_id;

for datanode in leaders {
let requester = ctx.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
Expand Down Expand Up @@ -252,6 +254,53 @@ impl DropTableExecutor {
.into_iter()
.collect::<Result<Vec<_>>>()?;

// Drops follower regions on datanodes.
let followers = find_followers(region_routes);
let mut close_region_tasks = Vec::with_capacity(followers.len());
for datanode in followers {
let requester = ctx.node_manager.datanode(&datanode).await;
let regions = find_follower_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();

for region_id in region_ids {
debug!("Closing region {region_id} on Datanode {datanode:?}");
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Close(PbCloseRegionRequest {
region_id: region_id.as_u64(),
})),
};

let datanode = datanode.clone();
let requester = requester.clone();
close_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())
});
}
}

// Failure to close follower regions is not critical.
// When a leader region is dropped, follower regions will be unable to renew their leases via metasrv.
// Eventually, these follower regions will be automatically closed by the region livekeeper.
if let Err(err) = join_all(close_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
{
error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
}

// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
Expand Down
5 changes: 4 additions & 1 deletion src/common/meta/src/ddl/test_util/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use api::v1::column_def::try_as_column_schema;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE, MITO_ENGINE,
};
use datatypes::schema::RawSchema;
use derive_builder::Builder;
use store_api::storage::TableId;
Expand Down Expand Up @@ -164,6 +166,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.engine(MITO_ENGINE)
.build()
.unwrap()
.into();
Expand Down
Loading
Loading