Skip to content

Commit

Permalink
feat: add internal stream columns (#13960)
Browse files Browse the repository at this point in the history
* add internal stream columns

* fix
  • Loading branch information
zhyass authored Dec 10, 2023
1 parent f8c5419 commit 993f03c
Show file tree
Hide file tree
Showing 22 changed files with 292 additions and 192 deletions.
64 changes: 64 additions & 0 deletions src/query/catalog/src/plan/internal_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::path::Path;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::string::StringColumnBuilder;
use common_expression::types::DataType;
use common_expression::types::DecimalDataType;
use common_expression::types::DecimalSize;
use common_expression::types::NumberDataType;
use common_expression::types::StringType;
use common_expression::types::UInt64Type;
use common_expression::BlockEntry;
use common_expression::BlockMetaInfo;
Expand All @@ -30,7 +33,11 @@ use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::Value;
use common_expression::BASE_BLOCK_IDS_COLUMN_ID;
use common_expression::BASE_ROW_ID_COLUMN_ID;
use common_expression::BLOCK_NAME_COLUMN_ID;
use common_expression::CHANGE_ACTION_COLUMN_ID;
use common_expression::CHANGE_IS_UPDATE_COLUMN_ID;
use common_expression::CHANGE_ROW_ID_COLUMN_ID;
use common_expression::ROW_ID_COLUMN_ID;
use common_expression::SEGMENT_NAME_COLUMN_ID;
use common_expression::SNAPSHOT_NAME_COLUMN_ID;
Expand Down Expand Up @@ -121,7 +128,13 @@ pub enum InternalColumnType {
BlockName,
SegmentName,
SnapshotName,

// stream columns
BaseRowId,
BaseBlockIds,
ChangeAction,
ChangeIsUpdate,
ChangeRowId,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
Expand All @@ -148,12 +161,16 @@ impl InternalColumn {
InternalColumnType::BlockName => TableDataType::String,
InternalColumnType::SegmentName => TableDataType::String,
InternalColumnType::SnapshotName => TableDataType::String,
InternalColumnType::BaseRowId => TableDataType::String,
InternalColumnType::BaseBlockIds => TableDataType::Array(Box::new(
TableDataType::Decimal(DecimalDataType::Decimal128(DecimalSize {
precision: 38,
scale: 0,
})),
)),
InternalColumnType::ChangeAction => TableDataType::String,
InternalColumnType::ChangeIsUpdate => TableDataType::Boolean,
InternalColumnType::ChangeRowId => TableDataType::String,
}
}

Expand All @@ -172,7 +189,21 @@ impl InternalColumn {
InternalColumnType::BlockName => BLOCK_NAME_COLUMN_ID,
InternalColumnType::SegmentName => SEGMENT_NAME_COLUMN_ID,
InternalColumnType::SnapshotName => SNAPSHOT_NAME_COLUMN_ID,
InternalColumnType::BaseRowId => BASE_ROW_ID_COLUMN_ID,
InternalColumnType::BaseBlockIds => BASE_BLOCK_IDS_COLUMN_ID,
InternalColumnType::ChangeAction => CHANGE_ACTION_COLUMN_ID,
InternalColumnType::ChangeIsUpdate => CHANGE_IS_UPDATE_COLUMN_ID,
InternalColumnType::ChangeRowId => CHANGE_ROW_ID_COLUMN_ID,
}
}

/// Returns the expression text used to compute this internal column, if any.
///
/// Only `InternalColumnType::ChangeRowId` has such an expression; it is built
/// from `_origin_block_id`, `_origin_block_row_num` and `_base_row_id`.
/// Every other column type yields `None`.
pub fn virtual_computed_expr(&self) -> Option<String> {
    // Expression text for the `ChangeRowId` column.
    const CHANGE_ROW_ID_EXPR: &str = "if(is_not_null(_origin_block_id), concat(to_uuid(_origin_block_id), lpad(hex(_origin_block_row_num), 6, '0')), _base_row_id)";

    if matches!(self.column_type, InternalColumnType::ChangeRowId) {
        Some(CHANGE_ROW_ID_EXPR.to_string())
    } else {
        None
    }
}

Expand Down Expand Up @@ -234,6 +265,31 @@ impl InternalColumn {
Value::Scalar(Scalar::String(builder.build_scalar())),
)
}
InternalColumnType::BaseRowId => {
let file_stem = Path::new(&meta.block_location).file_stem().unwrap();
let file_strs = file_stem
.to_str()
.unwrap_or("")
.split('_')
.collect::<Vec<&str>>();
let uuid = file_strs[0];
let mut row_ids = Vec::with_capacity(num_rows);
if let Some(offsets) = &meta.offsets {
for i in offsets {
let row_id = format!("{}{:06x}", uuid, *i).as_bytes().to_vec();
row_ids.push(row_id);
}
} else {
for i in 0..num_rows {
let row_id = format!("{}{:06x}", uuid, i).as_bytes().to_vec();
row_ids.push(row_id);
}
}
BlockEntry::new(
DataType::String,
Value::Column(StringType::from_data(row_ids)),
)
}
InternalColumnType::BaseBlockIds => {
assert!(meta.base_block_ids.is_some());
BlockEntry::new(
Expand All @@ -246,6 +302,14 @@ impl InternalColumn {
Value::Scalar(meta.base_block_ids.clone().unwrap()),
)
}
InternalColumnType::ChangeAction => BlockEntry::new(
DataType::String,
Value::Scalar(Scalar::String("INSERT".as_bytes().to_vec())),
),
InternalColumnType::ChangeIsUpdate => {
BlockEntry::new(DataType::Boolean, Value::Scalar(Scalar::Boolean(false)))
}
InternalColumnType::ChangeRowId => unreachable!(),
}
}
}
21 changes: 19 additions & 2 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,25 @@ pub const ROW_ID_COLUMN_ID: u32 = u32::MAX;
pub const BLOCK_NAME_COLUMN_ID: u32 = u32::MAX - 1;
pub const SEGMENT_NAME_COLUMN_ID: u32 = u32::MAX - 2;
pub const SNAPSHOT_NAME_COLUMN_ID: u32 = u32::MAX - 3;
pub const BASE_BLOCK_IDS_COLUMN_ID: u32 = u32::MAX - 4;
// internal stream column id.
pub const BASE_ROW_ID_COLUMN_ID: u32 = u32::MAX - 5;
pub const BASE_BLOCK_IDS_COLUMN_ID: u32 = u32::MAX - 6;
pub const CHANGE_ACTION_COLUMN_ID: u32 = u32::MAX - 7;
pub const CHANGE_IS_UPDATE_COLUMN_ID: u32 = u32::MAX - 8;
pub const CHANGE_ROW_ID_COLUMN_ID: u32 = u32::MAX - 9;

// internal column name.
pub const ROW_ID_COL_NAME: &str = "_row_id";
pub const SNAPSHOT_NAME_COL_NAME: &str = "_snapshot_name";
pub const SEGMENT_NAME_COL_NAME: &str = "_segment_name";
pub const BLOCK_NAME_COL_NAME: &str = "_block_name";
// internal stream column name.
pub const BASE_ROW_ID_COL_NAME: &str = "_base_row_id";
pub const BASE_BLOCK_IDS_COL_NAME: &str = "_base_block_ids";
pub const CHANGE_ACTION_COL_NAME: &str = "change$action";
pub const CHANGE_IS_UPDATE_COL_NAME: &str = "change$is_update";
pub const CHANGE_ROW_ID_COL_NAME: &str = "change$row_id";

pub const ROW_NUMBER_COL_NAME: &str = "_row_number";
pub const PREDICATE_COLUMN_NAME: &str = "_predicate";

Expand All @@ -71,7 +83,12 @@ pub const ORIGIN_BLOCK_ROW_NUM_COL_NAME: &str = "_origin_block_row_num";

/// Returns `true` when `column_id` falls in the reserved internal-column id range.
///
/// Internal column ids are allocated downwards from `u32::MAX`. The lower bound
/// used here is `CHANGE_ROW_ID_COLUMN_ID`, the smallest id among the internal
/// column id constants declared next to this function, so the stream columns
/// are covered as well.
#[inline]
pub fn is_internal_column_id(column_id: ColumnId) -> bool {
    column_id >= CHANGE_ROW_ID_COLUMN_ID
}

/// Returns `true` when `column_id` is one of the internal stream column ids,
/// i.e. lies in the inclusive range
/// `CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID`.
#[inline]
pub fn is_internal_stream_column_id(column_id: ColumnId) -> bool {
    matches!(
        column_id,
        CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID
    )
}

#[inline]
Expand Down
13 changes: 13 additions & 0 deletions src/query/functions/src/scalars/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::io::Write;
use base64::engine::general_purpose;
use base64::prelude::*;
use bstr::ByteSlice;
use common_base::base::uuid::Uuid;
use common_expression::types::decimal::Decimal128Type;
use common_expression::types::number::SimpleDomain;
use common_expression::types::number::UInt64Type;
use common_expression::types::string::StringColumn;
Expand Down Expand Up @@ -263,6 +265,17 @@ pub fn register(registry: &mut FunctionRegistry) {
}),
);

registry.register_passthrough_nullable_1_arg::<Decimal128Type, StringType, _, _>(
"to_uuid",
|_, _| FunctionDomain::Full,
vectorize_with_builder_1_arg::<Decimal128Type, StringType>(|arg, output, _| {
let uuid = Uuid::from_u128(arg as u128);
let str = uuid.as_simple().to_string();
output.put_slice(str.as_bytes());
output.commit_row();
}),
);

registry.register_2_arg::<StringType, StringType, NumberType<i8>, _, _>(
"strcmp",
|_, _, _| FunctionDomain::Full,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3607,6 +3607,8 @@ Functions overloads:
25 to_uint8(Boolean NULL) :: UInt8 NULL
0 to_unix_timestamp(Timestamp) :: Int64
1 to_unix_timestamp(Timestamp NULL) :: Int64 NULL
0 to_uuid(Decimal(38, 0)) :: String
1 to_uuid(Decimal(38, 0) NULL) :: String NULL
0 to_variant(T0) :: Variant
1 to_variant(T0 NULL) :: Variant NULL
0 to_week_of_year(Date) :: UInt32
Expand Down
10 changes: 2 additions & 8 deletions src/query/service/src/pipelines/builders/builder_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use common_sql::evaluator::CompoundBlockOperator;
use common_sql::executor::physical_plans::ConstantTableScan;
use common_sql::executor::physical_plans::CteScan;
use common_sql::executor::physical_plans::TableScan;
use common_storages_fuse::operations::FillInternalColumnProcessor;

use crate::pipelines::processors::transforms::MaterializedCteSource;
use crate::pipelines::processors::transforms::TransformAddInternalColumns;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
Expand Down Expand Up @@ -74,13 +74,7 @@ impl PipelineBuilder {
if let Some(internal_columns) = &scan.internal_column {
if table.support_row_id_column() {
self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(Box::new(
FillInternalColumnProcessor::create(
internal_columns.clone(),
input,
output,
),
)))
TransformAddInternalColumns::try_create(input, output, internal_columns.clone())
})?;
} else {
return Err(ErrorCode::TableEngineNotSupported(format!(
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod processor_extract_hash_table_by_row_number;
pub(crate) mod range_join;
mod transform_add_computed_columns;
mod transform_add_const_columns;
mod transform_add_internal_columns;
mod transform_add_stream_columns;
mod transform_cast_schema;
mod transform_create_sets;
Expand All @@ -42,6 +43,7 @@ pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber;
pub use range_join::RangeJoinState;
pub use transform_add_computed_columns::TransformAddComputedColumns;
pub use transform_add_const_columns::TransformAddConstColumns;
pub use transform_add_internal_columns::TransformAddInternalColumns;
pub use transform_add_stream_columns::TransformAddStreamColumns;
pub use transform_cast_schema::TransformCastSchema;
pub use transform_create_sets::SubqueryReceiver;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use common_catalog::plan::InternalColumn;
use common_catalog::plan::InternalColumnMeta;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::BlockMetaInfoDowncast;
use common_expression::DataBlock;
use common_expression::FieldIndex;
use common_expression::CHANGE_ROW_ID_COLUMN_ID;
use common_pipeline_core::processors::InputPort;
use common_pipeline_core::processors::OutputPort;
use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_transforms::processors::Transform;
use common_pipeline_transforms::processors::Transformer;

/// Pipeline transform that appends generated internal column values to each
/// incoming data block, using the `InternalColumnMeta` attached to the block.
pub struct TransformAddInternalColumns {
// Internal columns to generate, keyed by field index. Being a `BTreeMap`,
// it is iterated in ascending key order when columns are appended.
internal_columns: BTreeMap<FieldIndex, InternalColumn>,
}

impl TransformAddInternalColumns
where Self: Transform
{
    /// Builds the transform for `internal_columns` and wraps it into a
    /// processor connected to the given `input` and `output` ports.
    pub fn try_create(
        input: Arc<InputPort>,
        output: Arc<OutputPort>,
        internal_columns: BTreeMap<FieldIndex, InternalColumn>,
    ) -> Result<ProcessorPtr> {
        let transform = Self { internal_columns };
        let transformer = Transformer::create(input, output, transform);
        let processor = ProcessorPtr::create(transformer);
        Ok(processor)
    }
}

impl Transform for TransformAddInternalColumns {
    const NAME: &'static str = "AddInternalColumnsTransform";

    /// Appends one generated column per configured internal column to `block`.
    ///
    /// The block meta is taken from the block and must downcast to
    /// `InternalColumnMeta`. A block that carries no meta is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::Internal` when the block meta is present but is not
    /// an `InternalColumnMeta`.
    fn transform(&mut self, mut block: DataBlock) -> Result<DataBlock> {
        if let Some(meta) = block.take_meta() {
            // Build the error lazily so it is only constructed on the failure path.
            let internal_column_meta = InternalColumnMeta::downcast_from(meta).ok_or_else(|| {
                ErrorCode::Internal(
                    "TransformAddInternalColumns: block meta is not an InternalColumnMeta",
                )
            })?;
            let num_rows = block.num_rows();
            for internal_column in self.internal_columns.values() {
                // The change row id column is not generated from block meta
                // (`generate_column_values` has no value for it), so skip it here.
                if internal_column.column_id() == CHANGE_ROW_ID_COLUMN_ID {
                    continue;
                }
                let column =
                    internal_column.generate_column_values(&internal_column_meta, num_rows);
                block.add_column(column);
            }
        }
        Ok(block)
    }
}
24 changes: 14 additions & 10 deletions src/query/sql/src/executor/physical_plans/physical_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,21 @@ impl PhysicalPlanBuilder {
continue;
}
let column = metadata.column(*index);
if let ColumnEntry::BaseTableColumn(BaseTableColumn { path_indices, .. }) = column {
if path_indices.is_some() {
has_inner_column = true;
match column {
ColumnEntry::BaseTableColumn(BaseTableColumn { path_indices, .. }) => {
if path_indices.is_some() {
has_inner_column = true;
}
}
} else if let ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) = column
{
project_internal_columns.insert(*index, internal_column.to_owned());
} else if let ColumnEntry::VirtualColumn(_) = column {
has_virtual_column = true;
ColumnEntry::InternalColumn(TableInternalColumn {
internal_column, ..
}) => {
project_internal_columns.insert(*index, internal_column.to_owned());
}
ColumnEntry::VirtualColumn(_) => {
has_virtual_column = true;
}
_ => {}
}

if let Some(prewhere) = &scan.prewhere {
Expand Down
18 changes: 17 additions & 1 deletion src/query/sql/src/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_catalog::plan::InternalColumn;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::Span;
use common_expression::is_internal_stream_column_id;
use common_expression::ColumnId;
use common_expression::DataField;
use common_expression::DataSchemaRef;
Expand Down Expand Up @@ -527,6 +528,7 @@ impl BindContext {
&mut self,
column_binding: &InternalColumnBinding,
metadata: MetadataRef,
visible: bool,
) -> Result<ColumnBinding> {
if !self.allow_internal_columns {
return Err(ErrorCode::SemanticError(format!(
Expand Down Expand Up @@ -554,16 +556,30 @@ impl BindContext {

let metadata = metadata.read();
let table = metadata.table(table_index);
if table.table().engine() != "STREAM" && is_internal_stream_column_id(column_id) {
return Err(ErrorCode::SemanticError(format!(
"Internal column `{}` is not allowed in table `{}`",
column_binding.internal_column.column_name(),
table.table().name()
)));
}

let column = metadata.column(column_index);
let virtual_computed_expr = column_binding.internal_column.virtual_computed_expr();
let column_binding = ColumnBindingBuilder::new(
column.name(),
column_index,
Box::new(column.data_type()),
Visibility::Visible,
if visible {
Visibility::Visible
} else {
Visibility::InVisible
},
)
.database_name(Some(table.database().to_string()))
.table_name(Some(table.name().to_string()))
.table_index(Some(table_index))
.virtual_computed_expr(virtual_computed_expr)
.build();

if new {
Expand Down
Loading

0 comments on commit 993f03c

Please sign in to comment.