Skip to content

Commit

Permalink
Merge branch 'main' into qianqian/fix-time
Browse files Browse the repository at this point in the history
  • Loading branch information
Sevenannn authored Aug 5, 2024
2 parents 798f3da + d7e7f22 commit a6bd3f4
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 13 deletions.
55 changes: 43 additions & 12 deletions src/sql/arrow_sql_gen/mysql.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::sql::arrow_sql_gen::arrow::map_data_type_to_array_builder_optional;
use arrow::{
array::{
ArrayBuilder, ArrayRef, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder,
Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeStringBuilder, NullBuilder,
RecordBatch, RecordBatchOptions, Time64NanosecondBuilder, TimestampMillisecondBuilder,
UInt64Builder,
ArrayBuilder, ArrayRef, BinaryBuilder, Date32Builder, Decimal128Builder, Float32Builder,
Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeStringBuilder,
NullBuilder, RecordBatch, RecordBatchOptions, Time64NanosecondBuilder,
TimestampMillisecondBuilder, UInt64Builder,
},
datatypes::{DataType, Date32Type, Field, Schema, TimeUnit},
};
use bigdecimal::BigDecimal;
use bigdecimal::ToPrimitive;
use chrono::{NaiveDate, NaiveTime, Timelike};
use mysql_async::{consts::ColumnType, FromValueError, Row, Value};
use mysql_async::{consts::ColumnFlags, consts::ColumnType, FromValueError, Row, Value};
use snafu::{ResultExt, Snafu};
use std::{convert, sync::Arc};

Expand Down Expand Up @@ -93,13 +93,15 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
let mut arrow_columns_builders: Vec<Option<Box<dyn ArrayBuilder>>> = Vec::new();
let mut mysql_types: Vec<ColumnType> = Vec::new();
let mut column_names: Vec<String> = Vec::new();
let mut column_is_binary_stats: Vec<bool> = Vec::new();

if !rows.is_empty() {
let row = &rows[0];
for column in row.columns().iter() {
let column_name = column.name_str();
let column_type = column.column_type();
let data_type = map_column_to_data_type(column_type);
let column_is_binary = column.flags().contains(ColumnFlags::BINARY_FLAG);
let data_type = map_column_to_data_type(column_type, column_is_binary);
arrow_fields.push(
data_type
.clone()
Expand All @@ -109,6 +111,7 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
.push(map_data_type_to_array_builder_optional(data_type.as_ref()));
mysql_types.push(column_type);
column_names.push(column_name.to_string());
column_is_binary_stats.push(column_is_binary);
}
}

Expand Down Expand Up @@ -258,8 +261,6 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
dec_builder.append_value(val);
}
column_type @ (ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING
| ColumnType::MYSQL_TYPE_JSON
| ColumnType::MYSQL_TYPE_TINY_BLOB
| ColumnType::MYSQL_TYPE_BLOB
Expand All @@ -275,6 +276,28 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
i
);
}
column_type @ (ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING) => {
if column_is_binary_stats[i] {
handle_primitive_type!(
builder,
column_type,
BinaryBuilder,
Vec<u8>,
row,
i
);
} else {
handle_primitive_type!(
builder,
column_type,
LargeStringBuilder,
String,
row,
i
);
}
}
ColumnType::MYSQL_TYPE_DATE => {
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
Expand Down Expand Up @@ -407,7 +430,10 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
}

#[allow(clippy::unnecessary_wraps)]
pub fn map_column_to_data_type(column_type: ColumnType) -> Option<DataType> {
pub fn map_column_to_data_type(
column_type: ColumnType,
column_is_binary: bool,
) -> Option<DataType> {
match column_type {
ColumnType::MYSQL_TYPE_NULL => Some(DataType::Null),
ColumnType::MYSQL_TYPE_BIT => Some(DataType::UInt64),
Expand All @@ -426,16 +452,21 @@ pub fn map_column_to_data_type(column_type: ColumnType) -> Option<DataType> {
Some(DataType::Time64(TimeUnit::Nanosecond))
}
ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING
| ColumnType::MYSQL_TYPE_JSON
| ColumnType::MYSQL_TYPE_ENUM
| ColumnType::MYSQL_TYPE_SET
| ColumnType::MYSQL_TYPE_TINY_BLOB
| ColumnType::MYSQL_TYPE_BLOB
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_LONG_BLOB => Some(DataType::LargeUtf8),

ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING => {
if column_is_binary {
Some(DataType::Binary)
} else {
Some(DataType::LargeUtf8)
}
},
// replication only
ColumnType::MYSQL_TYPE_TYPED_ARRAY
// internal
Expand Down
4 changes: 4 additions & 0 deletions src/sql/arrow_sql_gen/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result<RecordBatch> {
Type::VARCHAR => {
handle_primitive_type!(builder, Type::VARCHAR, StringBuilder, &str, row, i);
}
Type::BYTEA => {
handle_primitive_type!(builder, Type::BYTEA, BinaryBuilder, Vec<u8>, row, i);
}
Type::BPCHAR => {
let Some(builder) = builder else {
return NoBuilderForIndexSnafu { index: i }.fail();
Expand Down Expand Up @@ -555,6 +558,7 @@ fn map_column_type_to_data_type(column_type: &Type) -> Option<DataType> {
Type::FLOAT4 => Some(DataType::Float32),
Type::FLOAT8 => Some(DataType::Float64),
Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::UUID => Some(DataType::Utf8),
Type::BYTEA => Some(DataType::Binary),
Type::BOOL => Some(DataType::Boolean),
// Inspect the scale from the first row. Precision will always be 38 for Decimal128.
Type::NUMERIC => None,
Expand Down
10 changes: 9 additions & 1 deletion src/sql/db_connection_pool/dbconnection/mysqlconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
})?;

let column_type = map_str_type_to_column_type(&data_type)?;
let column_is_binary = map_str_type_to_is_binary(&data_type);

let arrow_data_type = match column_type {
// map_column_to_data_type does not support decimal mapping and uses special logic to handle conversion based on actual value
Expand All @@ -177,7 +178,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
// rows_to_arrow uses hardcoded precision 38 for decimal so we use it here as well
DataType::Decimal128(38, scale)
}
_ => map_column_to_data_type(column_type)
_ => map_column_to_data_type(column_type, column_is_binary)
.context(UnsupportedDataTypeSnafu { data_type })?,
};
fields.push(Field::new(&column_name, arrow_data_type, true));
Expand Down Expand Up @@ -234,6 +235,13 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
Ok(column_type)
}

fn map_str_type_to_is_binary(data_type: &str) -> bool {
if data_type.starts_with("binary") | data_type.starts_with("varbinary") {
return true;
}
false
}

fn extract_decimal_precision_and_scale(data_type: &str) -> Result<(u8, i8)> {
let (start, end) = match (data_type.find('('), data_type.find(')')) {
(Some(start), Some(end)) => (start, end),
Expand Down

0 comments on commit a6bd3f4

Please sign in to comment.