Skip to content

Commit

Permalink
refactor: optimize out partition split insert requests (GreptimeTeam#5298)
Browse files Browse the repository at this point in the history

* test: optimize out partition split insert requests if there is only one region

* Now that the optimization for single-region inserts has been lifted up, the original "fast path" is obsolete and can be removed.

* resolve PR comments
  • Loading branch information
MichaelScofield authored and v0y4g3r committed Jan 19, 2025
1 parent 76326f4 commit cd51f5f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 57 deletions.
33 changes: 15 additions & 18 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use store_api::metric_engine_consts::{
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfoRef;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::table_reference::TableReference;
use table::TableRef;
Expand Down Expand Up @@ -201,11 +202,11 @@ impl Inserter {
});
validate_column_count_match(&requests)?;

let (table_name_to_ids, instant_table_ids) = self
let (tables_info, instant_table_ids) = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.await?;
let inserts = RowToRegion::new(
table_name_to_ids,
tables_info,
instant_table_ids,
self.partition_manager.as_ref(),
)
Expand Down Expand Up @@ -237,21 +238,17 @@ impl Inserter {
.await?;

// check and create logical tables
let (table_name_to_ids, instant_table_ids) = self
let (tables_info, instant_table_ids) = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts = RowToRegion::new(
table_name_to_ids,
instant_table_ids,
&self.partition_manager,
)
.convert(requests)
.await?;
let inserts = RowToRegion::new(tables_info, instant_table_ids, &self.partition_manager)
.convert(requests)
.await?;

self.do_request(inserts, &ctx).await
}
Expand Down Expand Up @@ -500,14 +497,14 @@ impl Inserter {
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
) -> Result<(HashMap<String, TableId>, HashSet<TableId>)> {
) -> Result<(HashMap<String, TableInfoRef>, HashSet<TableId>)> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
.start_timer();

let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
let mut tables_info = HashMap::with_capacity(requests.inserts.len());
// If `auto_create_table` hint is disabled, skip creating/altering tables.
let auto_create_table_hint = ctx
.extension(AUTO_CREATE_TABLE_KEY)
Expand Down Expand Up @@ -536,9 +533,9 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
tables_info.insert(table_info.name.clone(), table_info);
}
return Ok((table_name_to_ids, instant_table_ids));
return Ok((tables_info, instant_table_ids));
}

let mut create_tables = vec![];
Expand All @@ -552,7 +549,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
tables_info.insert(table_info.name.clone(), table_info);
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, &table, ctx)?
{
Expand Down Expand Up @@ -580,7 +577,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
tables_info.insert(table_info.name.clone(), table_info);
}
}
if !alter_tables.is_empty() {
Expand All @@ -603,7 +600,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
tables_info.insert(table_info.name.clone(), table_info);
}
for alter_expr in alter_tables.into_iter() {
statement_executor
Expand All @@ -613,7 +610,7 @@ impl Inserter {
}
}

Ok((table_name_to_ids, instant_table_ids))
Ok((tables_info, instant_table_ids))
}

async fn create_physical_table_on_demand(
Expand Down
42 changes: 32 additions & 10 deletions src/operator/src/req_convert/insert/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,31 @@
// limitations under the License.

use ahash::{HashMap, HashSet};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
use api::v1::RowInsertRequests;
use partition::manager::PartitionRuleManager;
use snafu::OptionExt;
use table::metadata::TableId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{TableId, TableInfoRef};

use crate::error::{Result, TableNotFoundSnafu};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;

pub struct RowToRegion<'a> {
table_name_to_ids: HashMap<String, TableId>,
tables_info: HashMap<String, TableInfoRef>,
instant_table_ids: HashSet<TableId>,
partition_manager: &'a PartitionRuleManager,
}

impl<'a> RowToRegion<'a> {
pub fn new(
table_name_to_ids: HashMap<String, TableId>,
tables_info: HashMap<String, TableInfoRef>,
instant_table_ids: HashSet<TableId>,
partition_manager: &'a PartitionRuleManager,
) -> Self {
Self {
table_name_to_ids,
tables_info,
instant_table_ids,
partition_manager,
}
Expand All @@ -49,10 +50,24 @@ impl<'a> RowToRegion<'a> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
let mut instant_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let Some(rows) = request.rows else { continue };

let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
let region_numbers = self.region_numbers(&request.table_name)?;
let requests = if let Some(region_id) = match region_numbers[..] {
[singular] => Some(RegionId::new(table_id, singular)),
_ => None,
} {
vec![InsertRequest {
region_id: region_id.as_u64(),
rows: Some(rows),
}]
} else {
Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, rows)
.await?
};

if self.instant_table_ids.contains(&table_id) {
instant_request.extend(requests);
} else {
Expand All @@ -71,9 +86,16 @@ impl<'a> RowToRegion<'a> {
}

fn get_table_id(&self, table_name: &str) -> Result<TableId> {
self.table_name_to_ids
self.tables_info
.get(table_name)
.map(|x| x.table_id())
.context(TableNotFoundSnafu { table_name })
}

fn region_numbers(&self, table_name: &str) -> Result<&Vec<RegionNumber>> {
self.tables_info
.get(table_name)
.cloned()
.map(|x| &x.meta.region_numbers)
.context(TableNotFoundSnafu { table_name })
}
}
43 changes: 14 additions & 29 deletions src/partition/src/splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,20 @@ impl<'a> SplitReadRowHelper<'a> {

fn split_rows(mut self) -> Result<HashMap<RegionNumber, Rows>> {
let regions = self.split_to_regions()?;
let request_splits = if regions.len() == 1 {
// fast path, zero copy
regions
.into_keys()
.map(|region_number| {
let rows = std::mem::take(&mut self.rows);
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>()
} else {
regions
.into_iter()
.map(|(region_number, row_indexes)| {
let rows = row_indexes
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>()
};
let request_splits = regions
.into_iter()
.map(|(region_number, row_indexes)| {
let rows = row_indexes
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>();

Ok(request_splits)
}
Expand Down

0 comments on commit cd51f5f

Please sign in to comment.