Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add internal stream columns #13960

Merged
merged 2 commits into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/query/catalog/src/plan/internal_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::string::StringColumnBuilder;
use common_expression::types::DataType;
use common_expression::types::DecimalDataType;
use common_expression::types::DecimalSize;
use common_expression::types::NumberDataType;
use common_expression::types::StringType;
use common_expression::types::UInt64Type;
use common_expression::BlockEntry;
use common_expression::BlockMetaInfo;
Expand All @@ -30,7 +33,11 @@ use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::Value;
use common_expression::BASE_BLOCK_IDS_COLUMN_ID;
use common_expression::BASE_ROW_ID_COLUMN_ID;
use common_expression::BLOCK_NAME_COLUMN_ID;
use common_expression::CHANGE_ACTION_COLUMN_ID;
use common_expression::CHANGE_IS_UPDATE_COLUMN_ID;
use common_expression::CHANGE_ROW_ID_COLUMN_ID;
use common_expression::ROW_ID_COLUMN_ID;
use common_expression::SEGMENT_NAME_COLUMN_ID;
use common_expression::SNAPSHOT_NAME_COLUMN_ID;
Expand Down Expand Up @@ -121,7 +128,13 @@ pub enum InternalColumnType {
BlockName,
SegmentName,
SnapshotName,

// stream columns
BaseRowId,
BaseBlockIds,
ChangeAction,
ChangeIsUpdate,
ChangeRowId,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
Expand All @@ -148,12 +161,16 @@ impl InternalColumn {
InternalColumnType::BlockName => TableDataType::String,
InternalColumnType::SegmentName => TableDataType::String,
InternalColumnType::SnapshotName => TableDataType::String,
InternalColumnType::BaseRowId => TableDataType::String,
InternalColumnType::BaseBlockIds => TableDataType::Array(Box::new(
TableDataType::Decimal(DecimalDataType::Decimal128(DecimalSize {
precision: 38,
scale: 0,
})),
)),
InternalColumnType::ChangeAction => TableDataType::String,
InternalColumnType::ChangeIsUpdate => TableDataType::Boolean,
InternalColumnType::ChangeRowId => TableDataType::String,
}
}

Expand All @@ -172,7 +189,21 @@ impl InternalColumn {
InternalColumnType::BlockName => BLOCK_NAME_COLUMN_ID,
InternalColumnType::SegmentName => SEGMENT_NAME_COLUMN_ID,
InternalColumnType::SnapshotName => SNAPSHOT_NAME_COLUMN_ID,
InternalColumnType::BaseRowId => BASE_ROW_ID_COLUMN_ID,
InternalColumnType::BaseBlockIds => BASE_BLOCK_IDS_COLUMN_ID,
InternalColumnType::ChangeAction => CHANGE_ACTION_COLUMN_ID,
InternalColumnType::ChangeIsUpdate => CHANGE_IS_UPDATE_COLUMN_ID,
InternalColumnType::ChangeRowId => CHANGE_ROW_ID_COLUMN_ID,
}
}

pub fn virtual_computed_expr(&self) -> Option<String> {
match &self.column_type {
InternalColumnType::ChangeRowId => Some(
"if(is_not_null(_origin_block_id), concat(to_uuid(_origin_block_id), lpad(hex(_origin_block_row_num), 6, '0')), _base_row_id)"
.to_string(),
),
_ => None,
}
}

Expand Down Expand Up @@ -234,6 +265,31 @@ impl InternalColumn {
Value::Scalar(Scalar::String(builder.build_scalar())),
)
}
InternalColumnType::BaseRowId => {
let file_stem = Path::new(&meta.block_location).file_stem().unwrap();
let file_strs = file_stem
.to_str()
.unwrap_or("")
.split('_')
.collect::<Vec<&str>>();
let uuid = file_strs[0];
let mut row_ids = Vec::with_capacity(num_rows);
if let Some(offsets) = &meta.offsets {
for i in offsets {
let row_id = format!("{}{:06x}", uuid, *i).as_bytes().to_vec();
row_ids.push(row_id);
}
} else {
for i in 0..num_rows {
let row_id = format!("{}{:06x}", uuid, i).as_bytes().to_vec();
row_ids.push(row_id);
}
}
BlockEntry::new(
DataType::String,
Value::Column(StringType::from_data(row_ids)),
)
}
InternalColumnType::BaseBlockIds => {
assert!(meta.base_block_ids.is_some());
BlockEntry::new(
Expand All @@ -246,6 +302,14 @@ impl InternalColumn {
Value::Scalar(meta.base_block_ids.clone().unwrap()),
)
}
InternalColumnType::ChangeAction => BlockEntry::new(
DataType::String,
Value::Scalar(Scalar::String("INSERT".as_bytes().to_vec())),
),
InternalColumnType::ChangeIsUpdate => {
BlockEntry::new(DataType::Boolean, Value::Scalar(Scalar::Boolean(false)))
}
InternalColumnType::ChangeRowId => unreachable!(),
}
}
}
21 changes: 19 additions & 2 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,25 @@ pub const ROW_ID_COLUMN_ID: u32 = u32::MAX;
pub const BLOCK_NAME_COLUMN_ID: u32 = u32::MAX - 1;
pub const SEGMENT_NAME_COLUMN_ID: u32 = u32::MAX - 2;
pub const SNAPSHOT_NAME_COLUMN_ID: u32 = u32::MAX - 3;
pub const BASE_BLOCK_IDS_COLUMN_ID: u32 = u32::MAX - 4;
// internal stream column id.
pub const BASE_ROW_ID_COLUMN_ID: u32 = u32::MAX - 5;
pub const BASE_BLOCK_IDS_COLUMN_ID: u32 = u32::MAX - 6;
pub const CHANGE_ACTION_COLUMN_ID: u32 = u32::MAX - 7;
pub const CHANGE_IS_UPDATE_COLUMN_ID: u32 = u32::MAX - 8;
pub const CHANGE_ROW_ID_COLUMN_ID: u32 = u32::MAX - 9;

// internal column name.
pub const ROW_ID_COL_NAME: &str = "_row_id";
pub const SNAPSHOT_NAME_COL_NAME: &str = "_snapshot_name";
pub const SEGMENT_NAME_COL_NAME: &str = "_segment_name";
pub const BLOCK_NAME_COL_NAME: &str = "_block_name";
// internal stream column name.
pub const BASE_ROW_ID_COL_NAME: &str = "_base_row_id";
pub const BASE_BLOCK_IDS_COL_NAME: &str = "_base_block_ids";
pub const CHANGE_ACTION_COL_NAME: &str = "change$action";
pub const CHANGE_IS_UPDATE_COL_NAME: &str = "change$is_update";
pub const CHANGE_ROW_ID_COL_NAME: &str = "change$row_id";

pub const ROW_NUMBER_COL_NAME: &str = "_row_number";
pub const PREDICATE_COLUMN_NAME: &str = "_predicate";

Expand All @@ -71,7 +83,12 @@ pub const ORIGIN_BLOCK_ROW_NUM_COL_NAME: &str = "_origin_block_row_num";

#[inline]
pub fn is_internal_column_id(column_id: ColumnId) -> bool {
column_id >= BASE_BLOCK_IDS_COLUMN_ID
column_id >= CHANGE_ROW_ID_COLUMN_ID
}

#[inline]
pub fn is_internal_stream_column_id(column_id: ColumnId) -> bool {
(CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID).contains(&column_id)
}

#[inline]
Expand Down
13 changes: 13 additions & 0 deletions src/query/functions/src/scalars/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::io::Write;
use base64::engine::general_purpose;
use base64::prelude::*;
use bstr::ByteSlice;
use common_base::base::uuid::Uuid;
use common_expression::types::decimal::Decimal128Type;
use common_expression::types::number::SimpleDomain;
use common_expression::types::number::UInt64Type;
use common_expression::types::string::StringColumn;
Expand Down Expand Up @@ -263,6 +265,17 @@ pub fn register(registry: &mut FunctionRegistry) {
}),
);

registry.register_passthrough_nullable_1_arg::<Decimal128Type, StringType, _, _>(
"to_uuid",
|_, _| FunctionDomain::Full,
vectorize_with_builder_1_arg::<Decimal128Type, StringType>(|arg, output, _| {
let uuid = Uuid::from_u128(arg as u128);
let str = uuid.as_simple().to_string();
output.put_slice(str.as_bytes());
output.commit_row();
}),
);

registry.register_2_arg::<StringType, StringType, NumberType<i8>, _, _>(
"strcmp",
|_, _, _| FunctionDomain::Full,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3607,6 +3607,8 @@ Functions overloads:
25 to_uint8(Boolean NULL) :: UInt8 NULL
0 to_unix_timestamp(Timestamp) :: Int64
1 to_unix_timestamp(Timestamp NULL) :: Int64 NULL
0 to_uuid(Decimal(38, 0)) :: String
1 to_uuid(Decimal(38, 0) NULL) :: String NULL
0 to_variant(T0) :: Variant
1 to_variant(T0 NULL) :: Variant NULL
0 to_week_of_year(Date) :: UInt32
Expand Down
10 changes: 2 additions & 8 deletions src/query/service/src/pipelines/builders/builder_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use common_sql::evaluator::CompoundBlockOperator;
use common_sql::executor::physical_plans::ConstantTableScan;
use common_sql::executor::physical_plans::CteScan;
use common_sql::executor::physical_plans::TableScan;
use common_storages_fuse::operations::FillInternalColumnProcessor;

use crate::pipelines::processors::transforms::MaterializedCteSource;
use crate::pipelines::processors::transforms::TransformAddInternalColumns;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
Expand Down Expand Up @@ -74,13 +74,7 @@ impl PipelineBuilder {
if let Some(internal_columns) = &scan.internal_column {
if table.support_row_id_column() {
self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(Box::new(
FillInternalColumnProcessor::create(
internal_columns.clone(),
input,
output,
),
)))
TransformAddInternalColumns::try_create(input, output, internal_columns.clone())
})?;
} else {
return Err(ErrorCode::TableEngineNotSupported(format!(
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod processor_extract_hash_table_by_row_number;
pub(crate) mod range_join;
mod transform_add_computed_columns;
mod transform_add_const_columns;
mod transform_add_internal_columns;
mod transform_add_stream_columns;
mod transform_cast_schema;
mod transform_create_sets;
Expand All @@ -42,6 +43,7 @@ pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber;
pub use range_join::RangeJoinState;
pub use transform_add_computed_columns::TransformAddComputedColumns;
pub use transform_add_const_columns::TransformAddConstColumns;
pub use transform_add_internal_columns::TransformAddInternalColumns;
pub use transform_add_stream_columns::TransformAddStreamColumns;
pub use transform_cast_schema::TransformCastSchema;
pub use transform_create_sets::SubqueryReceiver;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use common_catalog::plan::InternalColumn;
use common_catalog::plan::InternalColumnMeta;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::BlockMetaInfoDowncast;
use common_expression::DataBlock;
use common_expression::FieldIndex;
use common_expression::CHANGE_ROW_ID_COLUMN_ID;
use common_pipeline_core::processors::InputPort;
use common_pipeline_core::processors::OutputPort;
use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_transforms::processors::Transform;
use common_pipeline_transforms::processors::Transformer;

pub struct TransformAddInternalColumns {
internal_columns: BTreeMap<FieldIndex, InternalColumn>,
}

impl TransformAddInternalColumns
where Self: Transform
{
pub fn try_create(
input: Arc<InputPort>,
output: Arc<OutputPort>,
internal_columns: BTreeMap<FieldIndex, InternalColumn>,
) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(Transformer::create(
input,
output,
Self { internal_columns },
)))
}
}

impl Transform for TransformAddInternalColumns {
const NAME: &'static str = "AddInternalColumnsTransform";

fn transform(&mut self, mut block: DataBlock) -> Result<DataBlock> {
if let Some(meta) = block.take_meta() {
let internal_column_meta =
InternalColumnMeta::downcast_from(meta).ok_or(ErrorCode::Internal("It's a bug"))?;
let num_rows = block.num_rows();
for internal_column in self.internal_columns.values() {
if internal_column.column_id() == CHANGE_ROW_ID_COLUMN_ID {
continue;
}
let column =
internal_column.generate_column_values(&internal_column_meta, num_rows);
block.add_column(column);
}
}
Ok(block)
}
}
24 changes: 14 additions & 10 deletions src/query/sql/src/executor/physical_plans/physical_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,21 @@ impl PhysicalPlanBuilder {
continue;
}
let column = metadata.column(*index);
if let ColumnEntry::BaseTableColumn(BaseTableColumn { path_indices, .. }) = column {
if path_indices.is_some() {
has_inner_column = true;
match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { path_indices, .. }) => {
if path_indices.is_some() {
has_inner_column = true;
}
}
} else if let ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) = column
{
project_internal_columns.insert(*index, internal_column.to_owned());
} else if let ColumnEntry::VirtualColumn(_) = column {
has_virtual_column = true;
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => {
project_internal_columns.insert(*index, internal_column.to_owned());
}
ColumnEntry::VirtualColumn(_) => {
has_virtual_column = true;
}
_ => {}
}

if let Some(prewhere) = &scan.prewhere {
Expand Down
18 changes: 17 additions & 1 deletion src/query/sql/src/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_catalog::plan::InternalColumn;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::Span;
use common_expression::is_internal_stream_column_id;
use common_expression::ColumnId;
use common_expression::DataField;
use common_expression::DataSchemaRef;
Expand Down Expand Up @@ -527,6 +528,7 @@ impl BindContext {
&mut self,
column_binding: &InternalColumnBinding,
metadata: MetadataRef,
visible: bool,
) -> Result<ColumnBinding> {
if !self.allow_internal_columns {
return Err(ErrorCode::SemanticError(format!(
Expand Down Expand Up @@ -554,16 +556,30 @@ impl BindContext {

let metadata = metadata.read();
let table = metadata.table(table_index);
if table.table().engine() != "STREAM" && is_internal_stream_column_id(column_id) {
return Err(ErrorCode::SemanticError(format!(
"Internal column `{}` is not allowed in table `{}`",
column_binding.internal_column.column_name(),
table.table().name()
)));
}

let column = metadata.column(column_index);
let virtual_computed_expr = column_binding.internal_column.virtual_computed_expr();
let column_binding = ColumnBindingBuilder::new(
column.name(),
column_index,
Box::new(column.data_type()),
Visibility::Visible,
if visible {
Visibility::Visible
} else {
Visibility::InVisible
},
)
.database_name(Some(table.database().to_string()))
.table_name(Some(table.name().to_string()))
.table_index(Some(table_index))
.virtual_computed_expr(virtual_computed_expr)
.build();

if new {
Expand Down
Loading
Loading