Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,36 @@ impl RegionServer {
.with_context(|_| HandleRegionRequestSnafu { region_id })
}

/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region_manifest(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<()> {
let engine = self
let engine_with_status = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
engine

let Some(new_opened_regions) = engine_with_status
.sync_region(region_id, manifest_info)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })
.with_context(|_| HandleRegionRequestSnafu { region_id })?
.new_opened_logical_region_ids()
else {
return Ok(());
};

for region in new_opened_regions {
self.inner.region_map.insert(
region,
RegionEngineWithStatus::Ready(engine_with_status.engine().clone()),
);
info!("Logical region {} is registered!", region);
}

Ok(())
}

/// Set region role state gracefully.
Expand Down Expand Up @@ -526,6 +542,15 @@ impl RegionEngineWithStatus {
RegionEngineWithStatus::Ready(engine) => engine,
}
}

/// Returns [RegionEngineRef] reference.
pub fn engine(&self) -> &RegionEngineRef {
match self {
RegionEngineWithStatus::Registering(engine) => engine,
RegionEngineWithStatus::Deregistering(engine) => engine,
RegionEngineWithStatus::Ready(engine) => engine,
}
}
}

impl Deref for RegionEngineWithStatus {
Expand Down Expand Up @@ -1029,7 +1054,7 @@ impl RegionServerInner {
for region in logical_regions {
self.region_map
.insert(region, RegionEngineWithStatus::Ready(engine.clone()));
debug!("Logical region {} is registered!", region);
info!("Logical region {} is registered!", region);
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
Expand Down Expand Up @@ -250,7 +250,7 @@ impl RegionEngine for MockRegionEngine {
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
) -> Result<SyncManifestResponse, BoxedError> {
unimplemented!()
}

Expand Down
6 changes: 3 additions & 3 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SinglePartitionScanner,
SinglePartitionScanner, SyncManifestResponse,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
Expand Down Expand Up @@ -145,9 +145,9 @@ impl RegionEngine for FileRegionEngine {
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
) -> Result<SyncManifestResponse, BoxedError> {
// File engine doesn't need to sync region manifest.
Ok(())
Ok(SyncManifestResponse::NotSupported)
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
Expand Down
42 changes: 8 additions & 34 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod put;
mod read;
mod region_metadata;
mod state;
mod sync;

use std::any::Any;
use std::collections::HashMap;
Expand All @@ -41,14 +42,15 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SyncManifestResponse,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};

use self::state::MetricEngineState;
use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, MetricManifestInfoSnafu, Result, UnsupportedRegionRequestSnafu};
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;
Expand Down Expand Up @@ -311,40 +313,11 @@ impl RegionEngine for MetricEngine {
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
if !manifest_info.is_metric() {
return Err(BoxedError::new(
MetricManifestInfoSnafu { region_id }.build(),
));
}

let metadata_region_id = utils::to_metadata_region_id(region_id);
// checked by ensure above
let metadata_manifest_version = manifest_info
.metadata_manifest_version()
.unwrap_or_default();
let metadata_flushed_entry_id = manifest_info
.metadata_flushed_entry_id()
.unwrap_or_default();
let metadata_region_manifest =
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
self.inner
.mito
.sync_region(metadata_region_id, metadata_region_manifest)
.await?;

let data_region_id = utils::to_data_region_id(region_id);
let data_manifest_version = manifest_info.data_manifest_version();
let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
let data_region_manifest =
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);

) -> Result<SyncManifestResponse, BoxedError> {
self.inner
.mito
.sync_region(data_region_id, data_region_manifest)
.await?;

Ok(())
.sync_region(region_id, manifest_info)
.await
.map_err(BoxedError::new)
}

async fn set_region_role_state_gracefully(
Expand Down Expand Up @@ -423,6 +396,7 @@ impl MetricEngine {
self.inner.mito.clone()
}

/// Returns all logical regions associated with the physical region.
pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
self.inner
.metadata_region
Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl MetricEngineInner {
let _write_guard = self
.metadata_region
.write_lock_logical_region(*region_id)
.await;
.await?;
write_guards.insert(*region_id, _write_guard);
}

Expand Down
13 changes: 10 additions & 3 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,16 @@ impl MetricEngineInner {
.add_logical_regions(physical_region_id, true, logical_region_columns)
.await?;

let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions);
{
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions.clone());
}
for logical_region_id in logical_regions {
self.metadata_region
.open_logical_region(logical_region_id)
.await;
}

Ok(())
}
Expand Down
20 changes: 14 additions & 6 deletions src/metric-engine/src/engine/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,14 @@ impl MetricEngineInner {
/// Includes:
/// - Record physical region's column names
/// - Record the mapping between logical region id and physical region id
///
/// Returns new opened logical region ids.
pub(crate) async fn recover_states(
&self,
physical_region_id: RegionId,
primary_key_encoding: PrimaryKeyEncoding,
physical_region_options: PhysicalRegionOptions,
) -> Result<()> {
) -> Result<Vec<RegionId>> {
// load logical regions and physical column names
let logical_regions = self
.metadata_region
Expand All @@ -147,7 +149,6 @@ impl MetricEngineInner {
.data_region
.physical_columns(physical_region_id)
.await?;
let logical_region_num = logical_regions.len();

{
let mut state = self.state.write().unwrap();
Expand All @@ -168,15 +169,22 @@ impl MetricEngineInner {
}
}

let mut opened_logical_region_ids = Vec::new();
// The `recover_states` may be called multiple times, we only count the logical regions
// that are opened for the first time.
for logical_region_id in logical_regions {
self.metadata_region
if self
.metadata_region
.open_logical_region(logical_region_id)
.await;
.await
{
opened_logical_region_ids.push(logical_region_id);
}
}

LOGICAL_REGION_COUNT.add(logical_region_num as i64);
LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);

Ok(())
Ok(opened_logical_region_ids)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/region_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl MetricEngineInner {
let _read_guard = self
.metadata_region
.read_lock_logical_region(logical_region_id)
.await;
.await?;
// Load logical and physical columns, and intersect them to get logical column metadata.
let logical_column_metadata = self
.metadata_region
Expand Down
Loading
Loading