Skip to content

Commit

Permalink
feat: support convert nested types from string to values
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed Apr 11, 2024
1 parent 79d1703 commit b04a426
Show file tree
Hide file tree
Showing 10 changed files with 744 additions and 8 deletions.
3 changes: 3 additions & 0 deletions cli/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ impl Session {
'F: loop {
match rl.readline(&self.prompt().await) {
Ok(line) => {
if line == "exit" {
break;
}
let queries = self.append_query(&line);
for query in queries {
let _ = rl.add_history_entry(&query);
Expand Down
3 changes: 3 additions & 0 deletions sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ flight-sql = ["dep:arrow-array", "dep:arrow-schema", "dep:tonic"]
[dependencies]
databend-client = { workspace = true }

memchr = "2.7.2"
chrono = { version = "0.4.35", default-features = false }
geozero = { version = "0.12.0", features = ["default", "with-wkb"] }
glob = "0.3"
itertools = "0.12"
jsonb = "0.3"
lexical-core = "0.8.5"

roaring = { version = "0.10", features = ["serde"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
Expand Down
33 changes: 33 additions & 0 deletions sql/src/cursor_ext/cursor_checkpoint_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Cursor;

/// Captures a reader's position so that it can be restored later.
///
/// The `Cursor` implementation in this file uses the cursor's position as the
/// checkpoint token.
pub trait ReadCheckPointExt {
/// Returns a token identifying the current read position.
fn checkpoint(&self) -> u64;
/// Restores the read position to `checkpoint`, a token previously obtained
/// from `checkpoint()`.
fn rollback(&mut self, checkpoint: u64);
}

/// Checkpointing for in-memory cursors: the checkpoint token is simply the
/// cursor's position.
impl<T> ReadCheckPointExt for Cursor<T>
where
    T: AsRef<[u8]>,
{
    /// Returns the cursor's current position.
    fn checkpoint(&self) -> u64 {
        Cursor::position(self)
    }

    /// Moves the cursor to `checkpoint`.
    ///
    /// The value is passed to `Cursor::set_position` as-is; it is not checked
    /// against the length of the underlying data.
    fn rollback(&mut self, checkpoint: u64) {
        Cursor::set_position(self, checkpoint);
    }
}
171 changes: 171 additions & 0 deletions sql/src/cursor_ext/cursor_read_bytes_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use memchr::memchr;
use std::io::BufRead;
use std::io::Cursor;
use std::io::ErrorKind;
use std::io::Result;

/// Byte-oriented peek/skip helpers for readers used by hand-written text parsers.
///
/// The behaviour described on each method is the one provided by the
/// `Cursor` implementation in this file.
pub trait ReadBytesExt {
    /// Returns the next byte converted to a `char` without consuming it, or
    /// `None` when no byte is available.
    fn peek(&mut self) -> Option<char>;

    /// Returns the next byte without consuming it, or `None` when no byte is
    /// available.
    fn peek_byte(&mut self) -> Option<u8>;

    /// Consumes the next byte if `f` returns `true` for it.
    ///
    /// Returns whether a byte was consumed.
    fn ignore(&mut self, f: impl Fn(u8) -> bool) -> bool;

    /// Consumes the leading run of bytes for which `f` returns `true`.
    ///
    /// Returns the number of bytes consumed.
    fn ignores(&mut self, f: impl Fn(u8) -> bool) -> usize;

    /// Consumes the next byte if it equals `b`; returns whether it did.
    fn ignore_byte(&mut self, b: u8) -> bool;

    /// Consumes `bs` if the upcoming bytes start with it; returns whether it did.
    fn ignore_bytes(&mut self, bs: &[u8]) -> bool;

    /// Consumes leading ASCII whitespace; returns `true` if at least one byte
    /// was consumed.
    fn ignore_white_spaces(&mut self) -> bool;

    /// Appends the bytes up to and including the first `delim` to `buf` and
    /// consumes them; returns the number of bytes consumed.
    fn until(&mut self, delim: u8, buf: &mut Vec<u8>) -> usize;

    /// Appends the leading run of bytes for which `f` returns `true` to `buf`
    /// and consumes them; returns the number of bytes consumed.
    fn keep_read(&mut self, buf: &mut Vec<u8>, f: impl Fn(u8) -> bool) -> usize;

    /// Like [`ignore`](Self::ignore), but failing to consume a byte is an error.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidData` error when `ignore(f)` returns `false`.
    fn must_ignore(&mut self, f: impl Fn(u8) -> bool) -> Result<()> {
        if self.ignore(f) {
            Ok(())
        } else {
            Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "Expected to ignore a byte",
            ))
        }
    }

    /// Like [`ignore_byte`](Self::ignore_byte), but a mismatch is reported as
    /// an error instead of `false`.
    fn must_ignore_byte(&mut self, b: u8) -> Result<()>;
}

/// `ReadBytesExt` for in-memory cursors.
///
/// Every method works on the slice returned by `fill_buf()` (the bytes between
/// the cursor position and the end of the data) and advances the cursor with
/// `consume()`. A `fill_buf()` error is treated like "nothing available".
impl<T> ReadBytesExt for Cursor<T>
where
    T: AsRef<[u8]>,
{
    /// Returns the next byte as a `char` without consuming it.
    ///
    /// The byte is converted with `u8 as char`, so bytes >= 0x80 are NOT
    /// decoded as UTF-8 sequences.
    fn peek(&mut self) -> Option<char> {
        self.peek_byte().map(|b| b as char)
    }

    /// Returns the next byte without consuming it, or `None` at end of input.
    fn peek_byte(&mut self) -> Option<u8> {
        self.fill_buf().ok()?.first().copied()
    }

    /// Consumes the next byte if `f` accepts it; returns whether it did.
    fn ignore(&mut self, f: impl Fn(u8) -> bool) -> bool {
        match self.peek_byte() {
            Some(b) if f(b) => {
                self.consume(1);
                true
            }
            _ => false,
        }
    }

    /// Consumes the leading run of bytes accepted by `f` and returns its length.
    fn ignores(&mut self, f: impl Fn(u8) -> bool) -> usize {
        let skipped = match self.fill_buf() {
            // Stop at the first rejected byte; if none is rejected, skip everything.
            Ok(available) => available
                .iter()
                .position(|&b| !f(b))
                .unwrap_or(available.len()),
            Err(_) => 0,
        };
        self.consume(skipped);
        skipped
    }

    /// Consumes the next byte if it equals `b`; returns whether it did.
    fn ignore_byte(&mut self, b: u8) -> bool {
        self.ignore(|c| c == b)
    }

    /// Consumes `bs` if the remaining input starts with it; returns whether it did.
    ///
    /// Nothing is consumed on a mismatch or when fewer than `bs.len()` bytes remain.
    fn ignore_bytes(&mut self, bs: &[u8]) -> bool {
        let matched = match self.fill_buf() {
            Ok(available) => available.starts_with(bs),
            Err(_) => false,
        };
        if matched {
            self.consume(bs.len());
        }
        matched
    }

    /// Consumes the next byte if it equals `b`, otherwise reports what was found.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidData` error naming the expected character,
    /// the peeked character and the cursor position. The cursor is not advanced
    /// in that case.
    fn must_ignore_byte(&mut self, b: u8) -> Result<()> {
        if self.ignore_byte(b) {
            return Ok(());
        }
        let found = self.peek();
        let pos = self.position();
        Err(std::io::Error::new(
            ErrorKind::InvalidData,
            format!(
                "Expected to have char '{}', got '{:?}' at pos {}",
                b as char, found, pos
            ),
        ))
    }

    /// Consumes leading ASCII whitespace; returns `true` if any was consumed.
    fn ignore_white_spaces(&mut self) -> bool {
        self.ignores(|c| c.is_ascii_whitespace()) > 0
    }

    /// Appends bytes up to and including the first `delim` to `buf` and consumes
    /// them. When `delim` does not occur, all remaining bytes are appended and
    /// consumed.
    ///
    /// Returns the number of bytes consumed.
    fn until(&mut self, delim: u8, buf: &mut Vec<u8>) -> usize {
        match self.fill_buf() {
            Ok(remaining_slice) => {
                // The fallback must be the length of the *input*: using the
                // length of the output vector read nothing for an empty `buf`
                // and indexed out of bounds when `buf` was longer than the
                // remaining input.
                let to_read =
                    memchr(delim, remaining_slice).map_or(remaining_slice.len(), |n| n + 1);
                buf.extend_from_slice(&remaining_slice[..to_read]);
                self.consume(to_read);
                to_read
            }
            Err(_) => 0,
        }
    }

    /// Appends the leading run of bytes accepted by `f` to `buf` and consumes
    /// them; returns the number of bytes consumed.
    fn keep_read(&mut self, buf: &mut Vec<u8>, f: impl Fn(u8) -> bool) -> usize {
        match self.fill_buf() {
            Ok(remaining_slice) => {
                let to_read = remaining_slice
                    .iter()
                    .position(|&b| !f(b))
                    .unwrap_or(remaining_slice.len());
                buf.extend_from_slice(&remaining_slice[..to_read]);
                self.consume(to_read);
                to_read
            }
            Err(_) => 0,
        }
    }
}
133 changes: 133 additions & 0 deletions sql/src/cursor_ext/cursor_read_number_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::BufRead;
use std::io::Cursor;

use std::io::ErrorKind;
use std::io::Result;

use lexical_core::FromLexical;

/// Parsing of numbers written as text at the reader's current position.
///
/// `T` is any type implementing `lexical_core::FromLexical`.
pub trait ReadNumberExt {
/// Reads the numeric text at the current position and parses it as `T`.
///
/// In the `Cursor` implementation in this file, an error is returned when no
/// numeric text is found or when the text cannot be parsed as `T`.
fn read_int_text<T: FromLexical>(&mut self) -> Result<T>;
/// Reads the numeric text at the current position and parses it as `T`.
///
/// In the `Cursor` implementation in this file, an error is returned when no
/// numeric text is found or when the text cannot be parsed as `T`.
fn read_float_text<T: FromLexical>(&mut self) -> Result<T>;
}

/// Scans the numeric literal at the start of `buffer` and reports how much of
/// it to consume and how much of it to parse.
///
/// Returns `(consumed, effective)`:
///
/// * `consumed` is the number of leading bytes that belong to the literal:
///   signs (accepted only before the first digit), integer digits, an optional
///   `.` followed by fraction digits, and — only if at least one integer digit
///   was seen — an `e`/`E` exponent with an optional sign and digits.
/// * `effective` is the length of the prefix worth handing to a parser. It is
///   equal to `consumed`, except when there is no exponent and the fraction is
///   empty or all zeros; then it is the index of the `.`, so that `"1.00"` can
///   be parsed as the integer `1`.
///
/// The function only delimits the text; it does not validate it (a lone `-`
/// yields `(1, 1)`).
pub fn collect_number(buffer: &[u8]) -> (usize, usize) {
    let len = buffer.len();
    // Length of the run of ASCII digits starting at `start` (`start <= len`).
    let digits_from = |start: usize| {
        buffer[start..]
            .iter()
            .take_while(|b| b.is_ascii_digit())
            .count()
    };

    let mut seen_digit = false;
    let mut dot_at: Option<usize> = None;
    let mut pos = 0;

    // Sign(s) and integer digits, up to and including an optional '.'.
    while pos < len {
        match buffer[pos] {
            b'0'..=b'9' => seen_digit = true,
            // A sign is only part of the literal before any digit.
            b'-' | b'+' if !seen_digit => {}
            b'.' => {
                dot_at = Some(pos);
                pos += 1;
                break;
            }
            _ => break,
        }
        pos += 1;
    }

    // Fraction digits.
    if dot_at.is_some() {
        pos += digits_from(pos);
    }

    // Exponent, recognised only when an integer digit precedes it.
    let has_exponent =
        seen_digit && matches!(buffer.get(pos).copied(), Some(b'e') | Some(b'E'));
    if has_exponent {
        pos += 1;
        if matches!(buffer.get(pos).copied(), Some(b'-') | Some(b'+')) {
            pos += 1;
        }
        pos += digits_from(pos);
    }

    let consumed = pos;
    let effective = match dot_at {
        // A zero-only (or empty) fraction carries no information: cut at the dot.
        Some(dot) if !has_exponent && buffer[dot + 1..consumed].iter().all(|&b| b == b'0') => {
            dot
        }
        _ => consumed,
    };
    (consumed, effective)
}

#[inline]
fn read_num_text_exact<T: FromLexical>(buf: &[u8]) -> Result<T> {
match FromLexical::from_lexical(buf) {
Ok(value) => Ok(value),
Err(cause) => Err(std::io::Error::new(
ErrorKind::InvalidData,
format!(
"Cannot parse value:{:?} to number type, cause: {:?}",
String::from_utf8_lossy(buf),
cause
),
)),
}
}

/// `ReadNumberExt` for in-memory cursors.
///
/// Both methods delimit the literal with `collect_number`, parse its
/// "effective" prefix with `read_num_text_exact`, and only then advance the
/// cursor past the whole literal. On any error the cursor is left where it was.
impl<B> ReadNumberExt for Cursor<B>
where
    B: AsRef<[u8]>,
{
    /// Parses the numeric text at the cursor as `T` and skips past it.
    ///
    /// A fraction consisting only of zeros is consumed but not parsed, so
    /// `"1.00"` yields the integer `1`.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidData` error when no numeric text starts at
    /// the cursor or when the text cannot be parsed as `T`.
    fn read_int_text<T: FromLexical>(&mut self) -> Result<T> {
        let available = self.fill_buf()?;
        let (consumed, effective) = collect_number(available);
        if consumed == 0 {
            return Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "Failed to parse number from text: input does not contain a valid numeric format."
                    .to_string(),
            ));
        }
        // Parse before consuming so a parse failure leaves the cursor untouched.
        let value = read_num_text_exact(&available[..effective])?;
        self.consume(consumed);
        Ok(value)
    }

    /// Parses the numeric text at the cursor as `T` and skips past it.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidData` error when no numeric text starts at
    /// the cursor or when the text cannot be parsed as `T`.
    fn read_float_text<T: FromLexical>(&mut self) -> Result<T> {
        let available = self.fill_buf()?;
        let (consumed, effective) = collect_number(available);
        if consumed == 0 {
            return Err(std::io::Error::new(
                ErrorKind::InvalidData,
                "Unable to parse float: provided text is not in a recognizable floating-point format.".to_string()
            ));
        }
        // `available` is still valid here: nothing has been consumed yet, so
        // there is no need to call `fill_buf()` a second time.
        let value = read_num_text_exact(&available[..effective])?;
        self.consume(consumed);
        Ok(value)
    }
}
Loading

0 comments on commit b04a426

Please sign in to comment.