Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/common/function/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod build;
mod database;
mod partitionrule;
mod pg_catalog;
mod procedure_state;
mod timezone;
Expand All @@ -26,6 +27,7 @@ use database::{
ConnectionIdFunction, CurrentSchemaFunction, DatabaseFunction, PgBackendPidFunction,
ReadPreferenceFunction, SessionUserFunction,
};
use partitionrule::GeneratePartitionRuleFunction;
use pg_catalog::PGCatalogFunction;
use procedure_state::ProcedureStateFunction;
use timezone::TimezoneFunction;
Expand All @@ -48,5 +50,6 @@ impl SystemFunction {
registry.register_scalar(TimezoneFunction);
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);
registry.register_scalar(GeneratePartitionRuleFunction);
}
}
121 changes: 121 additions & 0 deletions src/common/function/src/system/partitionrule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::error::{self, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::*;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use derive_more::Display;
use snafu::ResultExt;
use sql::partition::gen::partition_rule_for_range;

use crate::function::{Function, FunctionContext};
use crate::scalars::geo::helpers::{ensure_columns_len, ensure_columns_n};

/// Scalar helper function that generates partition rules from the given
/// criteria. It is exposed under the name `generate_partition_rule`.
///
/// It takes 4 arguments:
/// - the field (column) name the rules are generated for, as a string
/// - all possible characters, represented as a JSON array of `[start, end]`
///   pairs such as `[["0", "9"], ["a", "z"]]`; only the first character of
///   each string is used
/// - the number of expected partitions, as an unsigned 32-bit integer
/// - the given hardstops, as a JSON array of strings
///
/// The result is a single string holding every generated rule, joined by
/// `",\n"`.
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct GeneratePartitionRuleFunction;

impl Function for GeneratePartitionRuleFunction {
fn name(&self) -> &str {
"generate_partition_rule"
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}

fn signature(&self) -> Signature {
Signature::new(
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::string_datatype(),
]),
Volatility::Stable,
)
}

fn eval(&self, _ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure_columns_n!(columns, 4);

let field_name_vec = &columns[0];
let ranges_vec = &columns[1];
let partition_num_vec = &columns[2];
let hardstops_vec = &columns[3];

let size = ranges_vec.len();

let mut results = StringVectorBuilder::with_capacity(size);

for i in 0..size {
let field_name = field_name_vec.get(i).as_string().unwrap();
let ranges = ranges_vec.get(i).as_string().unwrap();
let partition_num = partition_num_vec.get(i).as_u64().unwrap();
let hardstops = hardstops_vec.get(i).as_string().unwrap();

let ranges: Vec<(char, char)> = serde_json::from_str::<Vec<Vec<String>>>(&ranges)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Json parse error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?
.iter()
.map(|v| (v[0].chars().nth(0).unwrap(), v[1].chars().nth(0).unwrap()))
.collect();
let hardstops = serde_json::from_str::<Vec<String>>(&hardstops)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Json parse error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;

let rules =
partition_rule_for_range(&field_name, &ranges, partition_num as u32, &hardstops)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Json parse error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
results.push(
Some(
rules
.iter()
.map(|r| r.to_string())
.collect::<Vec<String>>()
.join(",\n"),
)
.as_deref(),
);
}
Ok(results.to_vector())
}
}
4 changes: 2 additions & 2 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use snafu::ResultExt;
use sql::partition::partition_rule_for_hexstring;
use sql::partition::partition_rule_for_traceid;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
Expand Down Expand Up @@ -632,7 +632,7 @@ impl Inserter {
} else {
// prebuilt partition rules for uuid data: see the function
// for more information
let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
let partitions = partition_rule_for_traceid(TRACE_ID_COLUMN)
.context(CreatePartitionRulesSnafu)?;
// add skip index to
// - trace_id: when searching by trace id
Expand Down
2 changes: 2 additions & 0 deletions src/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ iso8601 = "0.6.1"
itertools.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
num-bigint = "0.4"
num-traits = "0.2"
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
11 changes: 10 additions & 1 deletion src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid partition range, start: {start}, end: {end}"))]
InvalidPartitionRange {
start: String,
end: String,
#[snafu(implicit)]
location: Location,
},

#[cfg(feature = "enterprise")]
#[snafu(display("Missing `{}` clause", name))]
MissingClause {
Expand Down Expand Up @@ -371,7 +379,8 @@ impl ErrorExt for Error {
| ConvertToLogicalExpression { .. }
| Simplification { .. }
| InvalidInterval { .. }
| InvalidPartitionNumber { .. } => StatusCode::InvalidArguments,
| InvalidPartitionNumber { .. }
| InvalidPartitionRange { .. } => StatusCode::InvalidArguments,

#[cfg(feature = "enterprise")]
InvalidTriggerName { .. } => StatusCode::InvalidArguments,
Expand Down
15 changes: 7 additions & 8 deletions src/sql/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,34 @@ use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
use crate::error::{InvalidPartitionNumberSnafu, Result};
use crate::statements::create::Partitions;

pub mod gen;

/// The default number of partitions for OpenTelemetry traces.
const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16;

/// The maximum number of partitions for OpenTelemetry traces.
const MAX_PARTITION_NUM_FOR_TRACES: u32 = 65536;

#[macro_export]
macro_rules! between_string {
($col: expr, $left_incl: expr, $right_excl: expr) => {
Expr::BinaryOp {
op: BinaryOperator::And,
left: Box::new(Expr::BinaryOp {
op: BinaryOperator::GtEq,
left: Box::new($col.clone()),
right: Box::new(Expr::Value(Value::SingleQuotedString(
$left_incl.to_string(),
))),
right: Box::new(Expr::Value(Value::SingleQuotedString($left_incl))),
}),
right: Box::new(Expr::BinaryOp {
op: BinaryOperator::Lt,
left: Box::new($col.clone()),
right: Box::new(Expr::Value(Value::SingleQuotedString(
$right_excl.to_string(),
))),
right: Box::new(Expr::Value(Value::SingleQuotedString($right_excl))),
}),
}
};
}

pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
pub fn partition_rule_for_traceid(ident: &str) -> Result<Partitions> {
Ok(Partitions {
column_list: vec![Ident::new(ident)],
exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
Expand Down Expand Up @@ -177,7 +176,7 @@ mod tests {

assert_eq!(
results,
partition_rule_for_hexstring("trace_id").unwrap().exprs
partition_rule_for_traceid("trace_id").unwrap().exprs
);
}

Expand Down
Loading
Loading