Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/common/function/src/admin/migrate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use session::context::QueryContextRef;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;

const DEFAULT_TIMEOUT_SECS: u64 = 30;
/// The default timeout for migrate region procedure.
const DEFAULT_TIMEOUT_SECS: u64 = 300;

/// A function to migrate a region from source peer to target peer.
/// Returns the submitted procedure id if success. Only available in cluster mode.
///
/// - `migrate_region(region_id, from_peer, to_peer)`, with timeout(30 seconds).
/// - `migrate_region(region_id, from_peer, to_peer)`, with timeout(300 seconds).
/// - `migrate_region(region_id, from_peer, to_peer, timeout(secs))`.
///
/// The parameters:
Expand Down
10 changes: 6 additions & 4 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ impl Display for RegionIdent {
pub struct DowngradeRegionReply {
/// Returns the `last_entry_id` if available.
pub last_entry_id: Option<u64>,
/// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
pub metadata_last_entry_id: Option<u64>,
/// Indicates whether the region exists.
pub exists: bool,
/// Return error if any during the operation.
Expand Down Expand Up @@ -136,16 +138,14 @@ pub struct DowngradeRegion {
/// `None` stands for don't flush before downgrading the region.
#[serde(default)]
pub flush_timeout: Option<Duration>,
/// Rejects all write requests after flushing.
pub reject_write: bool,
}

impl Display for DowngradeRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DowngradeRegion(region_id={}, flush_timeout={:?}, rejct_write={})",
self.region_id, self.flush_timeout, self.reject_write
"DowngradeRegion(region_id={}, flush_timeout={:?})",
self.region_id, self.flush_timeout,
)
}
}
Expand All @@ -157,6 +157,8 @@ pub struct UpgradeRegion {
pub region_id: RegionId,
/// The `last_entry_id` of old leader region.
pub last_entry_id: Option<u64>,
/// The `last_entry_id` of old leader metadata region (Only used for metric engine).
pub metadata_last_entry_id: Option<u64>,
/// The timeout of waiting for a wal replay.
///
/// `None` stands for no wait,
Expand Down
4 changes: 1 addition & 3 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
Expand All @@ -229,6 +228,7 @@ mod tests {
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: None,
location_id: None,
});
Expand Down Expand Up @@ -419,7 +419,6 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
Expand All @@ -442,7 +441,6 @@ mod tests {
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
flush_timeout: Some(Duration::from_secs(1)),
reject_write: false,
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
Expand Down
119 changes: 67 additions & 52 deletions src/datanode/src/heartbeat/handler/downgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
use common_telemetry::tracing::info;
use common_telemetry::warn;
use common_telemetry::{error, warn};
use futures_util::future::BoxFuture;
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
Expand All @@ -33,25 +33,32 @@ impl HandlerContext {
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
{
Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => {
Ok(SetRegionRoleStateResponse::Success(success)) => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
last_entry_id: success.last_entry_id(),
metadata_last_entry_id: success.metadata_last_entry_id(),
exists: true,
error: None,
}))
}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: false,
error: None,
}))
}
Err(err) => Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
})),
Err(err) => {
error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}))
}
}
}

Expand All @@ -60,14 +67,14 @@ impl HandlerContext {
DowngradeRegion {
region_id,
flush_timeout,
reject_write,
}: DowngradeRegion,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: false,
error: None,
}));
Expand All @@ -89,33 +96,35 @@ impl HandlerContext {
return self.downgrade_to_follower_gracefully(region_id).await;
};

if reject_write {
// Sets region to downgrading, the downgrading region will reject all write requests.
match self
.region_server
.set_region_role_state_gracefully(
region_id,
SettableRegionRoleState::DowngradingLeader,
)
.await
{
Ok(SetRegionRoleStateResponse::Success { .. }) => {}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
}));
}
Err(err) => {
warn!(err; "Failed to convert region to downgrading leader");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}));
}
// Sets region to downgrading,
// the downgrading region will reject all write requests.
// However, the downgrading region will still accept read, flush requests.
match self
.region_server
.set_region_role_state_gracefully(
region_id,
SettableRegionRoleState::DowngradingLeader,
)
.await
{
Ok(SetRegionRoleStateResponse::Success { .. }) => {}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: false,
error: None,
}));
}
Err(err) => {
error!(err; "Failed to convert region to downgrading leader");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}));
}
}

Expand Down Expand Up @@ -144,20 +153,25 @@ impl HandlerContext {
}

let mut watcher = register_result.into_watcher();
let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await;
let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await;

match result {
WaitResult::Timeout => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("Flush region: {region_id} is timeout")),
error: Some(format!(
"Flush region timeout, region: {region_id}, timeout: {:?}",
flush_timeout
)),
}))
}
WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}))
Expand All @@ -174,7 +188,9 @@ mod tests {

use common_meta::instruction::{DowngradeRegion, InstructionReply};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::{RegionRole, SetRegionRoleStateResponse};
use store_api::region_engine::{
RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
};
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::time::Instant;
Expand All @@ -198,7 +214,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand Down Expand Up @@ -227,7 +242,9 @@ mod tests {
Ok(0)
}));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
Expand All @@ -240,7 +257,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand All @@ -262,7 +278,9 @@ mod tests {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_delay = Some(Duration::from_secs(100));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
Expand All @@ -274,7 +292,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand All @@ -295,7 +312,9 @@ mod tests {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_delay = Some(Duration::from_millis(300));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
Expand All @@ -312,7 +331,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand All @@ -327,7 +345,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand Down Expand Up @@ -356,7 +373,9 @@ mod tests {
.fail()
}));
region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
Ok(SetRegionRoleStateResponse::success(Some(1024)))
Ok(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(1024),
))
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
Expand All @@ -373,7 +392,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand All @@ -388,7 +406,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand Down Expand Up @@ -419,7 +436,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand Down Expand Up @@ -451,7 +467,6 @@ mod tests {
.handle_downgrade_region_instruction(DowngradeRegion {
region_id,
flush_timeout: None,
reject_write: false,
})
.await;
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
Expand Down
Loading
Loading