Skip to content

Commit

Permalink
test: optimize out partition split insert requests if there is only o…
Browse files Browse the repository at this point in the history
…ne region
  • Loading branch information
MichaelScofield committed Jan 6, 2025
1 parent b229c94 commit 3ce63f3
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions src/operator/src/req_convert/insert/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
// limitations under the License.

use ahash::{HashMap, HashSet};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
use api::v1::RowInsertRequests;
use partition::manager::PartitionRuleManager;
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;

use crate::error::{Result, TableNotFoundSnafu};
use crate::error::{FindTablePartitionRuleSnafu, Result, TableNotFoundSnafu};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;

Expand Down Expand Up @@ -49,10 +50,36 @@ impl<'a> RowToRegion<'a> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
let mut instant_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let Some(rows) = request.rows else { continue };

let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
let table_route = self
.partition_manager
.find_physical_table_route(table_id)
.await
.context(FindTablePartitionRuleSnafu {
table_name: &request.table_name,
})?;
let requests = if let Some(region_id) = match table_route.region_routes.split_first() {
Some((first, rest)) => {
if rest.is_empty() {
Some(first.region.id)
} else {
None
}
}
None => Some(RegionId::new(table_id, 0)),
} {
vec![InsertRequest {
region_id: region_id.as_u64(),
rows: Some(rows),
}]
} else {
Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, rows)
.await?
};

if self.instant_table_ids.contains(&table_id) {
instant_request.extend(requests);
} else {
Expand Down

0 comments on commit 3ce63f3

Please sign in to comment.