Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", branch = "zhongzc/alter-fulltext-backend" }
# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "103948cbce833e1a17ee7083f5ba79564d08d6ec" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
67 changes: 56 additions & 11 deletions src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,24 @@ pub struct RegionAliveKeeper {
/// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
/// duration acts like an "invariant point" for region's keep alive lease.
epoch: Instant,

countdown_task_ext_handler: Option<CountdownTaskHandlerExtRef>,
}

impl RegionAliveKeeper {
/// Returns an empty [RegionAliveKeeper].
pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self {
pub fn new(
region_server: RegionServer,
countdown_task_ext_handler: Option<CountdownTaskHandlerExtRef>,
heartbeat_interval_millis: u64,
) -> Self {
Self {
region_server,
tasks: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
started: Arc::new(AtomicBool::new(false)),
epoch: Instant::now(),
countdown_task_ext_handler,
}
}

Expand All @@ -85,6 +92,7 @@ impl RegionAliveKeeper {

let handle = Arc::new(CountdownTaskHandle::new(
self.region_server.clone(),
self.countdown_task_ext_handler.clone(),
region_id,
));

Expand Down Expand Up @@ -114,7 +122,9 @@ impl RegionAliveKeeper {
for region in regions {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
handle.reset_deadline(role, deadline).await;
handle
.reset_deadline(role, deadline, region.extensions.clone())
.await;
} else {
warn!(
"Trying to renew the lease for region {region_id}, the keeper handler is not found!"
Expand Down Expand Up @@ -265,13 +275,27 @@ enum CountdownCommand {
/// 4 * `heartbeat_interval_millis`
Start(u64),
/// Reset countdown deadline to the given instance.
/// (NextRole, Deadline)
Reset((RegionRole, Instant)),
/// (NextRole, Deadline, ExtensionInfo)
Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
/// Returns the current deadline of the countdown task.
#[cfg(test)]
Deadline(oneshot::Sender<Instant>),
}

pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskExtHandler>;

/// Extension hook for [CountdownTaskHandle]: invoked when a region's lease
/// deadline is reset, letting plugins react to lease renewals.
#[async_trait]
pub trait CountdownTaskExtHandler: Send + Sync {
    /// Handles a deadline reset for a region.
    ///
    /// `extension_info` carries opaque key/value bytes forwarded from the
    /// lease-renewal message; interpretation is handler-defined.
    async fn reset_deadline(
        &self,
        region_server: &RegionServer,
        role: RegionRole,
        deadline: Instant,
        extension_info: HashMap<String, Vec<u8>>,
    );
}

struct CountdownTaskHandle {
tx: mpsc::Sender<CountdownCommand>,
handler: JoinHandle<()>,
Expand All @@ -280,11 +304,16 @@ struct CountdownTaskHandle {

impl CountdownTaskHandle {
/// Creates a new [CountdownTaskHandle] and starts the countdown task.
fn new(region_server: RegionServer, region_id: RegionId) -> Self {
fn new(
region_server: RegionServer,
handler_ext: Option<CountdownTaskHandlerExtRef>,
region_id: RegionId,
) -> Self {
let (tx, rx) = mpsc::channel(1024);

let mut countdown_task = CountdownTask {
region_server,
handler_ext,
region_id,
rx,
};
Expand Down Expand Up @@ -323,10 +352,15 @@ impl CountdownTaskHandle {
None
}

async fn reset_deadline(&self, role: RegionRole, deadline: Instant) {
async fn reset_deadline(
&self,
role: RegionRole,
deadline: Instant,
extension_info: HashMap<String, Vec<u8>>,
) {
if let Err(e) = self
.tx
.send(CountdownCommand::Reset((role, deadline)))
.send(CountdownCommand::Reset((role, deadline, extension_info)))
.await
{
warn!(
Expand All @@ -350,6 +384,7 @@ impl Drop for CountdownTaskHandle {
/// State owned by a spawned per-region countdown task.
struct CountdownTask {
    // Used to update the region's role when a reset command arrives.
    region_server: RegionServer,
    // The region this countdown keeps alive.
    region_id: RegionId,
    // Optional plugin hook invoked on each deadline reset; `None` disables it.
    handler_ext: Option<CountdownTaskHandlerExtRef>,
    // Command channel fed by the owning `CountdownTaskHandle`.
    rx: mpsc::Receiver<CountdownCommand>,
}

Expand Down Expand Up @@ -379,10 +414,18 @@ impl CountdownTask {
started = true;
}
},
Some(CountdownCommand::Reset((role, deadline))) => {
Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
error!(err; "Failed to set region role to {role} for region {region_id}");
}
if let Some(ext_handler) = self.handler_ext.as_ref() {
ext_handler.reset_deadline(
&self.region_server,
role,
deadline,
extension_info,
).await;
}
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
Expand Down Expand Up @@ -435,7 +478,7 @@ mod test {
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());

let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));

let region_id = RegionId::new(1024, 1);
let builder = CreateRequestBuilder::new();
Expand Down Expand Up @@ -472,7 +515,7 @@ mod test {
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
manifest_version: 0,
extensions: HashMap::new(),
}],
Instant::now() + Duration::from_millis(200),
)
Expand All @@ -497,7 +540,8 @@ mod test {
async fn countdown_task() {
let region_server = mock_region_server();

let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2));
let countdown_handle =
CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));

// If countdown task is not started, its deadline is set to far future.
assert!(
Expand Down Expand Up @@ -527,6 +571,7 @@ mod test {
.reset_deadline(
RegionRole::Leader,
Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
HashMap::new(),
)
.await;
assert!(
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ impl DatanodeBuilder {
region_server.clone(),
meta_client,
cache_registry,
self.plugins.clone(),
)
.await?,
)
Expand Down
6 changes: 5 additions & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_base::Plugins;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
Expand All @@ -37,7 +38,7 @@ use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;

use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
Expand Down Expand Up @@ -73,9 +74,12 @@ impl HeartbeatTask {
region_server: RegionServer,
meta_client: MetaClientRef,
cache_invalidator: CacheInvalidatorRef,
plugins: Plugins,
) -> Result<Self> {
let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
countdown_task_handler_ext,
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
Expand Down
18 changes: 17 additions & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{
RegionEngineRef, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
};
use store_api::region_request::{
Expand Down Expand Up @@ -308,6 +308,22 @@ impl RegionServer {
.with_context(|_| HandleRegionRequestSnafu { region_id })
}

/// Syncs the manifest of the region identified by `region_id` to `manifest_info`,
/// by delegating to the engine that hosts the region.
///
/// # Errors
/// - `RegionNotFound` if the region is not registered on this server.
/// - `HandleRegionRequest` if the engine fails to sync the region.
pub async fn sync_region_manifest(
    &self,
    region_id: RegionId,
    manifest_info: RegionManifestInfo,
) -> Result<()> {
    // Look up the engine responsible for this region.
    let engine = self
        .inner
        .region_map
        .get(&region_id)
        .with_context(|| RegionNotFoundSnafu { region_id })?;
    engine
        .sync_region(region_id, manifest_info)
        .await
        .with_context(|_| HandleRegionRequestSnafu { region_id })
}

/// Set region role state gracefully.
///
/// For [SettableRegionRoleState::Follower]:
Expand Down
10 changes: 7 additions & 3 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use query::{QueryEngine, QueryEngineContext};
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
Expand Down Expand Up @@ -246,7 +246,11 @@ impl RegionEngine for MockRegionEngine {
Some(RegionRole::Leader)
}

async fn sync_region(&self, _region_id: RegionId, _version: u64) -> Result<(), BoxedError> {
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
unimplemented!()
}

Expand Down
7 changes: 3 additions & 4 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState, SinglePartitionScanner,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState, SinglePartitionScanner,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
Expand Down Expand Up @@ -142,7 +141,7 @@ impl RegionEngine for FileRegionEngine {
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_version: ManifestVersion,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
// File engine doesn't need to sync region manifest.
Ok(())
Expand Down
50 changes: 40 additions & 10 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use common_error::status_code::StatusCode;
use mito2::engine::MitoEngine;
pub(crate) use options::IndexOptions;
use snafu::ResultExt;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
Expand All @@ -49,7 +48,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use self::state::MetricEngineState;
use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, Result, UnsupportedRegionRequestSnafu, UnsupportedSyncRegionSnafu};
use crate::error::{self, MetricManifestInfoSnafu, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;
Expand Down Expand Up @@ -274,10 +273,10 @@ impl RegionEngine for MetricEngine {
sst_size: metadata_stat.sst_size + data_stat.sst_size,
index_size: metadata_stat.index_size + data_stat.index_size,
manifest: RegionManifestInfo::Metric {
data_flushed_entry_id: data_stat.manifest.flushed_entry_id(),
data_manifest_version: data_stat.manifest.manifest_version(),
metadata_flushed_entry_id: metadata_stat.manifest.flushed_entry_id(),
metadata_manifest_version: metadata_stat.manifest.manifest_version(),
data_flushed_entry_id: data_stat.manifest.data_flushed_entry_id(),
data_manifest_version: data_stat.manifest.data_manifest_version(),
metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(),
metadata_manifest_version: metadata_stat.manifest.data_manifest_version(),
},
}),
_ => None,
Expand Down Expand Up @@ -310,11 +309,42 @@ impl RegionEngine for MetricEngine {

/// Syncs both inner regions of a metric region to the given manifest info.
///
/// A metric region maps onto two mito regions (a metadata region and a data
/// region), so `manifest_info` must be the `Metric` variant; otherwise a
/// `MetricManifestInfo` error is returned. The metadata region is synced
/// first, then the data region.
async fn sync_region(
    &self,
    region_id: RegionId,
    manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
    if !manifest_info.is_metric() {
        return Err(BoxedError::new(
            MetricManifestInfoSnafu { region_id }.build(),
        ));
    }

    // Sync the metadata region first.
    let metadata_region_id = utils::to_metadata_region_id(region_id);
    // The `is_metric` check above guarantees the metadata accessors return
    // `Some`; `unwrap_or_default` keeps this panic-free regardless.
    let metadata_manifest_version = manifest_info
        .metadata_manifest_version()
        .unwrap_or_default();
    let metadata_flushed_entry_id = manifest_info
        .metadata_flushed_entry_id()
        .unwrap_or_default();
    let metadata_region_manifest =
        RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
    self.inner
        .mito
        .sync_region(metadata_region_id, metadata_region_manifest)
        .await?;

    // Then sync the data region.
    let data_region_id = utils::to_data_region_id(region_id);
    let data_manifest_version = manifest_info.data_manifest_version();
    let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
    let data_region_manifest =
        RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
    self.inner
        .mito
        .sync_region(data_region_id, data_region_manifest)
        .await?;

    Ok(())
}

async fn set_region_role_state_gracefully(
Expand Down
Loading
Loading