diff --git a/.github/workflows/docs_pr.yaml b/.github/workflows/docs_pr.yaml index 435006ed93bb3..29908abba0e4e 100644 --- a/.github/workflows/docs_pr.yaml +++ b/.github/workflows/docs_pr.yaml @@ -49,6 +49,6 @@ jobs: rust-version: stable # Note: this does not include dictionary_expressions to reduce codegen - name: Run doctests - run: cargo test --doc --features avro,jit,scheduler,json + run: cargo test --doc --features avro,scheduler,json - name: Verify Working Directory Clean run: git diff --exit-code diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 36a620aaa06c0..e53d66945d3e0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -67,7 +67,7 @@ jobs: # Note: this does not include dictionary_expressions to reduce codegen - name: Check workspace with all features - run: cargo check --workspace --benches --features avro,jit,scheduler,json + run: cargo check --workspace --benches --features avro,scheduler,json - name: Check Cargo.lock for datafusion-cli run: | @@ -97,7 +97,7 @@ jobs: with: rust-version: stable - name: Run tests (excluding doctests) - run: cargo test --lib --tests --bins --features avro,jit,scheduler,json,dictionary_expressions + run: cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions - name: Verify Working Directory Clean run: git diff --exit-code @@ -153,7 +153,7 @@ jobs: rust-version: stable # Note: this does not include dictionary_expressions to reduce codegen - name: Run doctests - run: cargo test --doc --features avro,jit,scheduler,json + run: cargo test --doc --features avro,scheduler,json - name: Verify Working Directory Clean run: git diff --exit-code @@ -272,7 +272,7 @@ jobs: shell: bash run: | export PATH=$PATH:$HOME/d/protoc/bin - cargo test --lib --tests --bins --features avro,jit,scheduler,json,dictionary_expressions + cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions env: # do not produce debug symbols to keep memory usage down RUSTFLAGS: "-C debuginfo=0" @@ -305,7 +305,7 @@ jobs: - name: Run tests (excluding doctests) shell: bash run: | - cargo test --lib --tests --bins --features avro,jit,scheduler,json,dictionary_expressions + cargo test --lib --tests --bins --features avro,scheduler,json,dictionary_expressions env: # do not produce debug symbols to keep memory usage down RUSTFLAGS: "-C debuginfo=0" diff --git a/Cargo.toml b/Cargo.toml index c5100041b29b7..0888306cad04d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ members = [ "datafusion/core", "datafusion/expr", "datafusion/execution", - "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", diff --git a/ci/scripts/rust_clippy.sh b/ci/scripts/rust_clippy.sh index c48881b7a7bde..d22a09890553d 100755 --- a/ci/scripts/rust_clippy.sh +++ b/ci/scripts/rust_clippy.sh @@ -18,4 +18,4 @@ # under the License. set -ex -cargo clippy --all-targets --workspace --features avro,jit,pyarrow,scheduler -- -D warnings +cargo clippy --all-targets --workspace --features avro,pyarrow,scheduler -- -D warnings diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index c0eff9797579f..701cbdb51510d 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -35,7 +35,6 @@ path = "src/lib.rs" [features] avro = ["apache-avro"] default = [] -jit = ["cranelift-module"] pyarrow = ["pyo3", "arrow/pyarrow"] [dependencies] diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index c0201c7ec16dc..97c448f41acb5 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -28,8 +28,6 @@ use crate::{Column, DFSchema, OwnedTableReference}; #[cfg(feature = "avro")] use apache_avro::Error as AvroError; use arrow::error::ArrowError; -#[cfg(feature = "jit")] -use cranelift_module::ModuleError; #[cfg(feature = "parquet")] use parquet::errors::ParquetError; use sqlparser::parser::ParserError; @@ -85,9 +83,6 @@ pub enum DataFusionError { /// Errors originating from outside DataFusion's core codebase. /// For example, a custom S3Error from the crate datafusion-objectstore-s3 External(GenericError), - #[cfg(feature = "jit")] - /// Error occurs during code generation - JITError(ModuleError), /// Error with additional context Context(String, Box), /// Errors originating from either mapping LogicalPlans to/from Substrait plans @@ -281,13 +276,6 @@ impl From for DataFusionError { } } -#[cfg(feature = "jit")] -impl From for DataFusionError { - fn from(e: ModuleError) -> Self { - DataFusionError::JITError(e) - } -} - impl From for DataFusionError { fn from(err: GenericError) -> Self { DataFusionError::External(err) @@ -332,10 +320,6 @@ impl Display for DataFusionError { DataFusionError::External(ref desc) => { write!(f, "External error: {desc}") } - #[cfg(feature = "jit")] - DataFusionError::JITError(ref desc) => { - write!(f, "JIT error: {desc}") - } #[cfg(feature = "object_store")] DataFusionError::ObjectStore(ref desc) => { write!(f, "Object Store error: {desc}") @@ -369,8 +353,6 @@ impl Error for DataFusionError { DataFusionError::Execution(_) => None, DataFusionError::ResourcesExhausted(_) => None, DataFusionError::External(e) => Some(e.as_ref()), - #[cfg(feature = "jit")] - DataFusionError::JITError(e) => Some(e), DataFusionError::Context(_, e) => Some(e.as_ref()), DataFusionError::Substrait(_) => None, } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 5196d278b8fcc..1fe9947a208ba 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -44,8 +44,6 @@ default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "co dictionary_expressions = ["datafusion-physical-expr/dictionary_expressions", "datafusion-optimizer/dictionary_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] -# Used to enable JIT code generation -jit = ["datafusion-jit", "datafusion-row/jit"] pyarrow = ["datafusion-common/pyarrow"] regex_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion-optimizer/regex_expressions"] # Used to enable scheduler @@ -68,7 +66,6 @@ dashmap = "5.4.0" datafusion-common = { path = "../common", version = "23.0.0", features = ["parquet", "object_store"] } datafusion-execution = { path = "../execution", version = "23.0.0" } datafusion-expr = { path = "../expr", version = "23.0.0" } -datafusion-jit = { path = "../jit", version = "23.0.0", optional = true } datafusion-optimizer = { path = "../optimizer", version = "23.0.0", default-features = false } datafusion-physical-expr = { path = "../physical-expr", version = "23.0.0", default-features = false } datafusion-row = { path = "../row", version = "23.0.0" } @@ -159,11 +156,6 @@ required-features = ["scheduler"] harness = false name = "sql_planner" -[[bench]] -harness = false -name = "jit" -required-features = ["jit"] - [[bench]] harness = false name = "sort" diff --git a/datafusion/core/benches/jit.rs b/datafusion/core/benches/jit.rs deleted file mode 100644 index d42df8e033aba..0000000000000 --- a/datafusion/core/benches/jit.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#[macro_use] -extern crate criterion; -extern crate arrow; -extern crate datafusion; - -mod data_utils; -use crate::criterion::Criterion; -use crate::data_utils::{create_record_batches, create_schema}; -use datafusion::row::jit::writer::bench_write_batch_jit; -use datafusion::row::writer::bench_write_batch; -use std::sync::Arc; - -fn criterion_benchmark(c: &mut Criterion) { - let partitions_len = 8; - let array_len = 32768 * 1024; // 2^25 - let batch_size = 2048; // 2^11 - - let schema = Arc::new(create_schema()); - let batches = - create_record_batches(schema.clone(), array_len, partitions_len, batch_size); - - c.bench_function("word aligned row serializer", |b| { - b.iter(|| { - criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap()) - }) - }); - - c.bench_function("word aligned row serializer jit", |b| { - b.iter(|| { - criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap()) - }) - }); -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 4f1e7929e18e9..ddc63c157ff2c 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -439,9 +439,6 @@ pub use datafusion_physical_expr as physical_expr; pub use datafusion_row as row; pub use datafusion_sql as sql; -#[cfg(feature = "jit")] -pub use datafusion_jit as jit; - pub use common::from_slice; #[cfg(test)] diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml deleted file mode 100644 index baf1697f07118..0000000000000 --- a/datafusion/jit/Cargo.toml +++ /dev/null @@ -1,47 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "datafusion-jit" -description = "Just In Time (JIT) compilation support for DataFusion query engine" -keywords = ["arrow", "query", "sql"] -version = { workspace = true } -edition = { workspace = true } -readme = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -license = { workspace = true } -authors = { workspace = true } -rust-version = { workspace = true } - -[lib] -name = "datafusion_jit" -path = "src/lib.rs" - -[features] -jit = [] - -[dependencies] -arrow = { workspace = true } -cranelift = "0.89.0" -cranelift-jit = "0.89.0" -cranelift-module = "0.89.0" -cranelift-native = "0.89.0" -datafusion-common = { path = "../common", version = "23.0.0", features = ["jit"] } -datafusion-expr = { path = "../expr", version = "23.0.0" } - -parking_lot = "0.12" diff --git a/datafusion/jit/README.md b/datafusion/jit/README.md deleted file mode 100644 index de931ed6758df..0000000000000 --- a/datafusion/jit/README.md +++ /dev/null @@ -1,26 +0,0 @@ - - -# DataFusion JIT - -[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. - -This crate is a submodule of DataFusion that provides JIT code generation. - -[df]: https://crates.io/crates/datafusion diff --git a/datafusion/jit/src/api.rs b/datafusion/jit/src/api.rs deleted file mode 100644 index 4ac26438bc2b6..0000000000000 --- a/datafusion/jit/src/api.rs +++ /dev/null @@ -1,643 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Constructing a function AST at runtime. - -use crate::ast::*; -use crate::jit::JIT; -use datafusion_common::internal_err; -use datafusion_common::{DataFusionError, Result}; -use parking_lot::Mutex; -use std::collections::HashMap; -use std::collections::VecDeque; -use std::fmt::{Debug, Display, Formatter}; -use std::sync::Arc; - -/// External Function signature -struct ExternFuncSignature { - name: String, - /// pointer to the function - code: *const u8, - params: Vec, - returns: Option, -} - -#[derive(Clone, Debug)] -/// A function consisting of AST nodes that JIT can compile. -pub struct GeneratedFunction { - pub(crate) name: String, - pub(crate) params: Vec<(String, JITType)>, - pub(crate) body: Vec, - pub(crate) ret: Option<(String, JITType)>, -} - -#[derive(Default)] -/// State of Assembler, keep tracking of generated function names -/// and registered external functions. -pub struct AssemblerState { - name_next_id: HashMap, - extern_funcs: HashMap, -} - -impl AssemblerState { - /// Create a fresh function name with prefix `name`. - pub fn fresh_name(&mut self, name: impl Into) -> String { - let name = name.into(); - if !self.name_next_id.contains_key(&name) { - self.name_next_id.insert(name.clone(), 0); - } - - let id = self.name_next_id.get_mut(&name).unwrap(); - let name = format!("{}_{}", &name, id); - *id += 1; - name - } -} - -/// The very first step for constructing a function at runtime. -pub struct Assembler { - state: Arc>, -} - -impl Default for Assembler { - fn default() -> Self { - Self { - state: Arc::new(Default::default()), - } - } -} - -impl Assembler { - /// Register an external Rust function to make it accessible by runtime generated functions. - /// Parameters and return types are used to impose type safety while constructing an AST. - pub fn register_extern_fn( - &self, - name: impl Into, - ptr: *const u8, - params: Vec, - returns: Option, - ) -> Result<()> { - let extern_funcs = &mut self.state.lock().extern_funcs; - let fn_name = name.into(); - let old = extern_funcs.insert( - fn_name.clone(), - ExternFuncSignature { - name: fn_name, - code: ptr, - params, - returns, - }, - ); - - match old { - None => Ok(()), - Some(old) => internal_err!("Extern function {} already exists", old.name), - } - } - - /// Create a new FunctionBuilder with `name` prefix - pub fn new_func_builder(&self, name: impl Into) -> FunctionBuilder { - let name = self.state.lock().fresh_name(name); - FunctionBuilder::new(name, self.state.clone()) - } - - /// Create JIT env which we could compile the AST of constructed function - /// into runnable code. - pub fn create_jit(&self) -> JIT { - let symbols = self - .state - .lock() - .extern_funcs - .values() - .map(|s| (s.name.clone(), s.code)) - .collect::>(); - JIT::new(symbols) - } -} - -/// Function builder API. Stores the state while -/// we are constructing an AST for a function. -pub struct FunctionBuilder { - name: String, - params: Vec<(String, JITType)>, - ret: Option<(String, JITType)>, - fields: VecDeque>, - assembler_state: Arc>, -} - -impl FunctionBuilder { - fn new(name: impl Into, assembler_state: Arc>) -> Self { - let mut fields = VecDeque::new(); - fields.push_back(HashMap::new()); - Self { - name: name.into(), - params: Vec::new(), - ret: None, - fields, - assembler_state, - } - } - - /// Add one more parameter to the function. - #[must_use] - pub fn param(mut self, name: impl Into, ty: JITType) -> Self { - let name = name.into(); - assert!(!self.fields.back().unwrap().contains_key(&name)); - self.params.push((name.clone(), ty)); - self.fields.back_mut().unwrap().insert(name, ty); - self - } - - /// Set return type for the function. Functions are of `void` type by default if - /// you do not set the return type. - #[must_use] - pub fn ret(mut self, name: impl Into, ty: JITType) -> Self { - let name = name.into(); - assert!(!self.fields.back().unwrap().contains_key(&name)); - self.ret = Some((name.clone(), ty)); - self.fields.back_mut().unwrap().insert(name, ty); - self - } - - /// Enter the function body at start the building. - pub fn enter_block(&mut self) -> CodeBlock { - self.fields.push_back(HashMap::new()); - CodeBlock { - fields: &mut self.fields, - state: &self.assembler_state, - stmts: vec![], - while_state: None, - if_state: None, - fn_state: Some(GeneratedFunction { - name: self.name.clone(), - params: self.params.clone(), - body: vec![], - ret: self.ret.clone(), - }), - } - } -} - -/// Keep `while` condition expr as we are constructing while loop body. -struct WhileState { - condition: Expr, -} - -/// Keep `if-then-else` state, including condition expr, the already built -/// then statements (if we are during building the else block). -struct IfElseState { - condition: Expr, - then_stmts: Vec, - in_then: bool, -} - -impl IfElseState { - /// Move the all current statements in the `then` block and move to `else` block. - fn enter_else(&mut self, then_stmts: Vec) { - self.then_stmts = then_stmts; - self.in_then = false; - } -} - -/// Code block consists of statements and acts as anonymous namespace scope for items and variable declarations. -pub struct CodeBlock<'a> { - /// A stack that containing all defined variables so far. The variables defined - /// in the current block are at the top stack frame. - /// Fields provides a shadow semantics of the same name in outsider block, and are - /// used to guarantee type safety while constructing AST. - fields: &'a mut VecDeque>, - /// The state of Assembler, used for type checking function calls. - state: &'a Arc>, - /// Holding all statements for the current code block. - stmts: Vec, - while_state: Option, - if_state: Option, - /// Keep track of function params and return types, only valid for function main block. - fn_state: Option, -} - -impl<'a> CodeBlock<'a> { - pub fn build(&mut self) -> GeneratedFunction { - assert!( - self.fn_state.is_some(), - "Can only call build on outermost function block" - ); - let mut gen = self.fn_state.take().unwrap(); - gen.body = self.stmts.drain(..).collect::>(); - gen - } - - /// Leave the current block and returns the statements constructed. - fn leave(&mut self) -> Result { - self.fields.pop_back(); - if let Some(ref mut while_state) = self.while_state { - let WhileState { condition } = while_state; - let stmts = self.stmts.drain(..).collect::>(); - return Ok(Stmt::WhileLoop(Box::new(condition.clone()), stmts)); - } - - if let Some(ref mut if_state) = self.if_state { - let IfElseState { - condition, - then_stmts, - in_then, - } = if_state; - return if *in_then { - assert!(then_stmts.is_empty()); - let stmts = self.stmts.drain(..).collect::>(); - Ok(Stmt::IfElse(Box::new(condition.clone()), stmts, Vec::new())) - } else { - assert!(!then_stmts.is_empty()); - let then_stmts = then_stmts.drain(..).collect::>(); - let else_stmts = self.stmts.drain(..).collect::>(); - Ok(Stmt::IfElse( - Box::new(condition.clone()), - then_stmts, - else_stmts, - )) - }; - } - unreachable!() - } - - /// Enter else block. Try [`Self::if_block`] first which is much easier to use. - fn enter_else(&mut self) { - self.fields.pop_back(); - self.fields.push_back(HashMap::new()); - assert!(self.if_state.is_some() && self.if_state.as_ref().unwrap().in_then); - let new_then = self.stmts.drain(..).collect::>(); - if let Some(s) = self.if_state.iter_mut().next() { - s.enter_else(new_then) - } - } - - /// Declare variable `name` of a type. - pub fn declare(&mut self, name: impl Into, ty: JITType) -> Result<()> { - let name = name.into(); - let typ = self.fields.back().unwrap().get(&name); - match typ { - Some(typ) => internal_err!( - "Variable {} of {} already exists in the current scope", - name, - typ - ), - None => { - self.fields.back_mut().unwrap().insert(name.clone(), ty); - self.stmts.push(Stmt::Declare(name, ty)); - Ok(()) - } - } - } - - fn find_type(&self, name: impl Into) -> Option { - let name = name.into(); - for scope in self.fields.iter().rev() { - let typ = scope.get(&name); - if let Some(typ) = typ { - return Some(*typ); - } - } - None - } - - /// Assignment statement. Assign a expression value to a variable. - pub fn assign(&mut self, name: impl Into, expr: Expr) -> Result<()> { - let name = name.into(); - let typ = self.find_type(&name); - match typ { - Some(typ) => { - if typ != expr.get_type() { - internal_err!( - "Variable {} of {} cannot be assigned to {}", - name, - typ, - expr.get_type() - ) - } else { - self.stmts.push(Stmt::Assign(name, Box::new(expr))); - Ok(()) - } - } - None => internal_err!("unknown identifier: {}", name), - } - } - - /// Declare variable with initialization. - pub fn declare_as(&mut self, name: impl Into, expr: Expr) -> Result<()> { - let name = name.into(); - let typ = self.fields.back().unwrap().get(&name); - match typ { - Some(typ) => { - internal_err!( - "Variable {} of {} already exists in the current scope", - name, - typ - ) - } - None => { - self.fields - .back_mut() - .unwrap() - .insert(name.clone(), expr.get_type()); - self.stmts - .push(Stmt::Declare(name.clone(), expr.get_type())); - self.stmts.push(Stmt::Assign(name, Box::new(expr))); - Ok(()) - } - } - } - - /// Call external function for side effect only. - pub fn call_stmt(&mut self, name: impl Into, args: Vec) -> Result<()> { - self.stmts.push(Stmt::Call(name.into(), args)); - Ok(()) - } - - /// Enter `while` loop block. Try [`Self::while_block`] first which is much easier to use. - fn while_loop(&mut self, cond: Expr) -> Result { - if cond.get_type() != BOOL { - internal_err!("while condition must be bool") - } else { - self.fields.push_back(HashMap::new()); - Ok(CodeBlock { - fields: self.fields, - state: self.state, - stmts: vec![], - while_state: Some(WhileState { condition: cond }), - if_state: None, - fn_state: None, - }) - } - } - - /// Enter `if-then-else`'s then block. Try [`Self::if_block`] first which is much easier to use. - fn if_else(&mut self, cond: Expr) -> Result { - if cond.get_type() != BOOL { - internal_err!("if condition must be bool") - } else { - self.fields.push_back(HashMap::new()); - Ok(CodeBlock { - fields: self.fields, - state: self.state, - stmts: vec![], - while_state: None, - if_state: Some(IfElseState { - condition: cond, - then_stmts: vec![], - in_then: true, - }), - fn_state: None, - }) - } - } - - /// Construct a `if-then-else` block with each part provided. - /// - /// E.g. if n == 0 { r = 0 } else { r = 1} could be write as: - /// x.if_block( - /// |cond| cond.eq(cond.id("n")?, cond.lit_i(0)), - /// |t| { - /// t.assign("r", t.lit_i(0))?; - /// Ok(()) - /// }, - /// |e| t.assign("r", t.lit_i(1))?; - /// Ok(()) - /// }, - /// )?; - pub fn if_block( - &mut self, - mut cond: C, - mut then_blk: T, - mut else_blk: E, - ) -> Result<()> - where - C: FnMut(&mut CodeBlock) -> Result, - T: FnMut(&mut CodeBlock) -> Result<()>, - E: FnMut(&mut CodeBlock) -> Result<()>, - { - let cond = cond(self)?; - let mut body = self.if_else(cond)?; - then_blk(&mut body)?; - body.enter_else(); - else_blk(&mut body)?; - let if_else = body.leave()?; - self.stmts.push(if_else); - Ok(()) - } - - /// Construct a `while` block with each part provided. - /// - /// E.g. while n != 0 { n = n - 1;} could be write as: - /// x.while_block( - /// |cond| cond.ne(cond.id("n")?, cond.lit_i(0)), - /// |w| { - /// w.assign("n", w.sub(w.id("n")?, w.lit_i(1))?)?; - /// Ok(()) - /// }, - /// )?; - pub fn while_block(&mut self, mut cond: C, mut body_blk: B) -> Result<()> - where - C: FnMut(&mut CodeBlock) -> Result, - B: FnMut(&mut CodeBlock) -> Result<()>, - { - let cond = cond(self)?; - let mut body = self.while_loop(cond)?; - body_blk(&mut body)?; - let while_stmt = body.leave()?; - self.stmts.push(while_stmt); - Ok(()) - } - - /// Create a literal `val` of `ty` type. - pub fn lit(&self, val: impl Into, ty: JITType) -> Expr { - Expr::Literal(Literal::Parsing(val.into(), ty)) - } - - /// Shorthand to create i64 literal - pub fn lit_i(&self, val: impl Into) -> Expr { - Expr::Literal(Literal::Typed(TypedLit::Int(val.into()))) - } - - /// Shorthand to create f32 literal - pub fn lit_f(&self, val: f32) -> Expr { - Expr::Literal(Literal::Typed(TypedLit::Float(val))) - } - - /// Shorthand to create f64 literal - pub fn lit_d(&self, val: f64) -> Expr { - Expr::Literal(Literal::Typed(TypedLit::Double(val))) - } - - /// Shorthand to create boolean literal - pub fn lit_b(&self, val: bool) -> Expr { - Expr::Literal(Literal::Typed(TypedLit::Bool(val))) - } - - /// Create a reference to an already defined variable. - pub fn id(&self, name: impl Into) -> Result { - let name = name.into(); - match self.find_type(&name) { - None => internal_err!("unknown identifier: {}", name), - Some(typ) => Ok(Expr::Identifier(name, typ)), - } - } - - /// Binary comparison expression: lhs == rhs - pub fn eq(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot compare {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Eq(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary comparison expression: lhs != rhs - pub fn ne(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot compare {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Ne(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary comparison expression: lhs < rhs - pub fn lt(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot compare {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Lt(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary comparison expression: lhs <= rhs - pub fn le(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot compare {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Le(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary comparison expression: lhs > rhs - pub fn gt(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot compare {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Gt(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary comparison expression: lhs >= rhs - pub fn ge(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot compare {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Ge(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary arithmetic expression: lhs + rhs - pub fn add(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot add {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Add(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary arithmetic expression: lhs - rhs - pub fn sub(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot subtract {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Sub(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary arithmetic expression: lhs * rhs - pub fn mul(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot multiply {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Mul(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Binary arithmetic expression: lhs / rhs - pub fn div(&self, lhs: Expr, rhs: Expr) -> Result { - if lhs.get_type() != rhs.get_type() { - internal_err!("cannot divide {} and {}", lhs.get_type(), rhs.get_type()) - } else { - Ok(Expr::Binary(BinaryExpr::Div(Box::new(lhs), Box::new(rhs)))) - } - } - - /// Call external function `name` with parameters - pub fn call(&self, name: impl Into, params: Vec) -> Result { - let fn_name = name.into(); - if let Some(func) = self.state.lock().extern_funcs.get(&fn_name) { - for ((i, t1), t2) in params.iter().enumerate().zip(func.params.iter()) { - if t1.get_type() != *t2 { - return internal_err!( - "Func {} need {} as arg{}, get {}", - &fn_name, - t2, - i, - t1.get_type() - ); - } - } - Ok(Expr::Call(fn_name, params, func.returns.unwrap_or(NIL))) - } else { - internal_err!("No func with the name {} exist", fn_name) - } - } - - /// Return the value pointed to by the ptr stored in `ptr` - pub fn load(&self, ptr: Expr, ty: JITType) -> Result { - Ok(Expr::Load(Box::new(ptr), ty)) - } - - /// Store the value in `value` to the address in `ptr` - pub fn store(&mut self, value: Expr, ptr: Expr) -> Result<()> { - self.stmts.push(Stmt::Store(Box::new(value), Box::new(ptr))); - Ok(()) - } -} - -impl Display for GeneratedFunction { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "fn {}(", self.name)?; - for (i, (name, ty)) in self.params.iter().enumerate() { - if i != 0 { - write!(f, ", ")?; - } - write!(f, "{name}: {ty}")?; - } - write!(f, ") -> ")?; - if let Some((name, ty)) = &self.ret { - write!(f, "{name}: {ty}")?; - } else { - write!(f, "()")?; - } - writeln!(f, " {{")?; - for stmt in &self.body { - stmt.fmt_ident(4, f)?; - } - write!(f, "}}") - } -} diff --git a/datafusion/jit/src/ast.rs b/datafusion/jit/src/ast.rs deleted file mode 100644 index 36741432ec257..0000000000000 --- a/datafusion/jit/src/ast.rs +++ /dev/null @@ -1,449 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::datatypes::DataType; -use cranelift::codegen::ir; -use datafusion_common::{DFSchemaRef, DataFusionError, ScalarValue}; -use std::fmt::{Display, Formatter}; - -#[derive(Clone, Debug)] -/// Statement -pub enum Stmt { - /// if-then-else - IfElse(Box, Vec, Vec), - /// while - WhileLoop(Box, Vec), - /// assignment - Assign(String, Box), - /// call function for side effect - Call(String, Vec), - /// declare a new variable of type - Declare(String, JITType), - /// store value (the first expr) to an address (the second expr) - Store(Box, Box), -} - -#[derive(Copy, Clone, Debug, PartialEq)] -/// Shorthand typed literals -pub enum TypedLit { - Bool(bool), - Int(i64), - Float(f32), - Double(f64), -} - -#[derive(Clone, Debug, PartialEq)] -/// Expression -pub enum Expr { - /// literal - Literal(Literal), - /// variable - Identifier(String, JITType), - /// binary expression - Binary(BinaryExpr), - /// call function expression - Call(String, Vec, JITType), - /// Load a value from pointer - Load(Box, JITType), -} - -impl Expr { - pub fn get_type(&self) -> JITType { - match self { - Expr::Literal(lit) => lit.get_type(), - Expr::Identifier(_, ty) => *ty, - Expr::Binary(bin) => bin.get_type(), - Expr::Call(_, _, ty) => *ty, - Expr::Load(_, ty) => *ty, - } - } -} - -impl Literal { - fn get_type(&self) -> JITType { - match self { - Literal::Parsing(_, ty) => *ty, - Literal::Typed(tl) => tl.get_type(), - } - } -} - -impl TypedLit { - fn get_type(&self) -> JITType { - match self { - TypedLit::Bool(_) => BOOL, - TypedLit::Int(_) => I64, - TypedLit::Float(_) => F32, - TypedLit::Double(_) => F64, - } - } -} - -impl BinaryExpr { - fn get_type(&self) -> JITType { - match self { - BinaryExpr::Eq(_, _) => BOOL, - BinaryExpr::Ne(_, _) => BOOL, - BinaryExpr::Lt(_, _) => BOOL, - BinaryExpr::Le(_, _) => BOOL, - BinaryExpr::Gt(_, _) => BOOL, - BinaryExpr::Ge(_, _) => BOOL, - BinaryExpr::Add(lhs, _) => lhs.get_type(), - BinaryExpr::Sub(lhs, _) => lhs.get_type(), - BinaryExpr::Mul(lhs, _) => lhs.get_type(), - BinaryExpr::Div(lhs, _) => lhs.get_type(), - } - } -} - -#[derive(Clone, Debug, PartialEq)] -/// Binary expression -pub enum BinaryExpr { - /// == - Eq(Box, Box), - /// != - Ne(Box, Box), - /// < - Lt(Box, Box), - /// <= - Le(Box, Box), - /// > - Gt(Box, Box), - /// >= - Ge(Box, Box), - /// add - Add(Box, Box), - /// subtract - Sub(Box, Box), - /// multiply - Mul(Box, Box), - /// divide - Div(Box, Box), -} - -#[derive(Clone, Debug, PartialEq)] -/// Literal -pub enum Literal { - /// Parsable literal with type - Parsing(String, JITType), - /// Shorthand literals of common types - Typed(TypedLit), -} - -impl TryFrom<(datafusion_expr::Expr, DFSchemaRef)> for Expr { - type Error = DataFusionError; - - // Try to JIT compile the Expr for faster evaluation - fn try_from( - (value, schema): (datafusion_expr::Expr, DFSchemaRef), - ) -> Result { - match &value { - datafusion_expr::Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr { - left, - op, - right, - }) => { - let op = match op { - datafusion_expr::Operator::Eq => BinaryExpr::Eq, - datafusion_expr::Operator::NotEq => BinaryExpr::Ne, - datafusion_expr::Operator::Lt => BinaryExpr::Lt, - datafusion_expr::Operator::LtEq => BinaryExpr::Le, - datafusion_expr::Operator::Gt => BinaryExpr::Gt, - datafusion_expr::Operator::GtEq => BinaryExpr::Ge, - datafusion_expr::Operator::Plus => BinaryExpr::Add, - datafusion_expr::Operator::Minus => BinaryExpr::Sub, - datafusion_expr::Operator::Multiply => BinaryExpr::Mul, - datafusion_expr::Operator::Divide => BinaryExpr::Div, - _ => { - return Err(DataFusionError::NotImplemented(format!( - "Compiling binary expression {value} not yet supported" - ))) - } - }; - Ok(Expr::Binary(op( - Box::new((*left.clone(), schema.clone()).try_into()?), - Box::new((*right.clone(), schema).try_into()?), - ))) - } - datafusion_expr::Expr::Column(col) => { - let field = schema.field_from_column(col)?; - let ty = field.data_type(); - - let jit_type = JITType::try_from(ty)?; - - Ok(Expr::Identifier(field.qualified_name(), jit_type)) - } - datafusion_expr::Expr::Literal(s) => { - let lit = match s { - ScalarValue::Boolean(Some(b)) => TypedLit::Bool(*b), - ScalarValue::Float32(Some(f)) => TypedLit::Float(*f), - ScalarValue::Float64(Some(f)) => TypedLit::Double(*f), - ScalarValue::Int64(Some(i)) => TypedLit::Int(*i), - _ => { - return Err(DataFusionError::NotImplemented(format!( - "Compiling Scalar {s} not yet supported in JIT mode" - ))) - } - }; - Ok(Expr::Literal(Literal::Typed(lit))) - } - _ => Err(DataFusionError::NotImplemented(format!( - "Compiling {value} not yet supported" - ))), - } - } -} - -#[derive(Copy, Clone, PartialEq, Eq, Hash)] -/// Type to be used in JIT -pub struct JITType { - /// The cranelift type - pub(crate) native: ir::Type, - /// re-expose inner field of `ir::Type` out for easier pattern matching - pub(crate) code: u8, -} - -/// null type as placeholder -pub const NIL: JITType = JITType { - native: ir::types::INVALID, - code: 0, -}; -/// bool -pub const BOOL: JITType = JITType { - native: ir::types::B1, - code: 0x70, -}; -/// integer of 1 byte -pub const I8: JITType = JITType { - native: ir::types::I8, - code: 0x76, -}; -/// integer of 2 bytes -pub const I16: JITType = JITType { - native: ir::types::I16, - code: 0x77, -}; -/// integer of 4 bytes -pub const I32: JITType = JITType { - native: ir::types::I32, - code: 0x78, -}; -/// integer of 8 bytes -pub const I64: JITType = JITType { - native: ir::types::I64, - code: 0x79, -}; -/// Ieee float of 32 bits -pub const F32: JITType = JITType { - native: ir::types::F32, - code: 0x7b, -}; -/// Ieee float of 64 bits -pub const F64: JITType = JITType { - native: ir::types::F64, - code: 0x7c, -}; -/// Pointer type of 32 bits -pub const R32: JITType = JITType { - native: ir::types::R32, - code: 0x7e, -}; -/// Pointer type of 64 bits -pub const R64: JITType = JITType { - native: ir::types::R64, - code: 0x7f, -}; -pub const PTR_SIZE: usize = std::mem::size_of::(); -/// The pointer type to use based on our currently target. -pub const PTR: JITType = if PTR_SIZE == 8 { R64 } else { R32 }; - -impl TryFrom<&DataType> for JITType { - type Error = DataFusionError; - - /// Try to convert DataFusion's [DataType] to [JITType] - fn try_from(df_type: &DataType) -> Result { - match df_type { - DataType::Int64 => Ok(I64), - DataType::Float32 => Ok(F32), - DataType::Float64 => Ok(F64), - DataType::Boolean => Ok(BOOL), - - _ => Err(DataFusionError::NotImplemented(format!( - "Compiling Expression with type {df_type} not yet supported in JIT mode" - ))), - } - } -} - -impl Stmt { - /// print the statement with indentation - pub fn fmt_ident(&self, ident: usize, f: &mut Formatter) -> std::fmt::Result { - let mut ident_str = String::new(); - for _ in 0..ident { - ident_str.push(' '); - } - match self { - Stmt::IfElse(cond, then_stmts, else_stmts) => { - writeln!(f, "{ident_str}if {cond} {{")?; - for stmt in then_stmts { - stmt.fmt_ident(ident + 4, f)?; - } - writeln!(f, "{ident_str}}} else {{")?; - for stmt in else_stmts { - stmt.fmt_ident(ident + 4, f)?; - } - writeln!(f, "{ident_str}}}") - } - Stmt::WhileLoop(cond, stmts) => { - writeln!(f, "{ident_str}while {cond} {{")?; - for stmt in stmts { - stmt.fmt_ident(ident + 4, f)?; - } - writeln!(f, "{ident_str}}}") - } - Stmt::Assign(name, expr) => { - writeln!(f, "{ident_str}{name} = {expr};") - } - Stmt::Call(name, args) => { - writeln!( - f, - "{}{}({});", - ident_str, - name, - args.iter() - .map(|e| e.to_string()) - .collect::>() - .join(", ") - ) - } - Stmt::Declare(name, ty) => { - writeln!(f, "{ident_str}let {name}: {ty};") - } - Stmt::Store(value, ptr) => { - writeln!(f, "{ident_str}*({ptr}) = {value}") - } - } - } -} - -impl Display for Stmt { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - self.fmt_ident(0, f)?; - Ok(()) - } -} - -impl Display for Expr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Expr::Literal(l) => write!(f, "{l}"), - Expr::Identifier(name, _) => write!(f, "{name}"), - Expr::Binary(be) => write!(f, "{be}"), - Expr::Call(name, exprs, _) => { - write!( - f, - "{}({})", - name, - exprs - .iter() - .map(|e| e.to_string()) - .collect::>() - .join(", ") - ) - } - Expr::Load(ptr, _) => write!(f, "*({ptr})",), - } - } -} - -impl Display for Literal { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Literal::Parsing(str, _) => write!(f, "{str}"), - Literal::Typed(tl) => write!(f, "{tl}"), - } - } -} - -impl Display for TypedLit { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - TypedLit::Bool(b) => write!(f, "{b}"), - TypedLit::Int(i) => write!(f, "{i}"), - TypedLit::Float(fl) => write!(f, "{fl}"), - TypedLit::Double(d) => write!(f, "{d}"), - } - } -} - -impl Display for BinaryExpr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - BinaryExpr::Eq(lhs, rhs) => write!(f, "{lhs} == {rhs}"), - BinaryExpr::Ne(lhs, rhs) => write!(f, "{lhs} != {rhs}"), - BinaryExpr::Lt(lhs, rhs) => write!(f, "{lhs} < {rhs}"), - BinaryExpr::Le(lhs, rhs) => write!(f, "{lhs} <= {rhs}"), - BinaryExpr::Gt(lhs, rhs) => write!(f, "{lhs} > {rhs}"), - BinaryExpr::Ge(lhs, rhs) => write!(f, "{lhs} >= {rhs}"), - BinaryExpr::Add(lhs, rhs) => write!(f, "{lhs} + {rhs}"), - BinaryExpr::Sub(lhs, rhs) => write!(f, "{lhs} - {rhs}"), - BinaryExpr::Mul(lhs, rhs) => write!(f, "{lhs} * {rhs}"), - BinaryExpr::Div(lhs, rhs) => write!(f, "{lhs} / {rhs}"), - } - } -} - -impl std::fmt::Display for JITType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:?}") - } -} - -impl std::fmt::Debug for JITType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.code { - 0 => write!(f, "nil"), - 0x70 => write!(f, "bool"), - 0x76 => write!(f, "i8"), - 0x77 => write!(f, "i16"), - 0x78 => write!(f, "i32"), - 0x79 => write!(f, "i64"), - 0x7b => write!(f, "f32"), - 0x7c => write!(f, "f64"), - 0x7e => write!(f, "small_ptr"), - 0x7f => write!(f, "ptr"), - _ => write!(f, "unknown"), - } - } -} - -impl From<&str> for JITType { - fn from(x: &str) -> Self { - match x { - "bool" => BOOL, - "i8" => I8, - "i16" => I16, - "i32" => I32, - "i64" => I64, - "f32" => F32, - "f64" => F64, - "small_ptr" => R32, - "ptr" => R64, - _ => panic!("unknown type: {x}"), - } - } -} diff --git a/datafusion/jit/src/compile.rs b/datafusion/jit/src/compile.rs deleted file mode 100644 index 93b463d5cb1cd..0000000000000 --- a/datafusion/jit/src/compile.rs +++ /dev/null @@ -1,207 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Compile DataFusion Expr to JIT'd function. - -use datafusion_common::Result; - -use crate::api::Assembler; -use crate::ast::{JITType, I32}; -use crate::{ - api::GeneratedFunction, - ast::{Expr as JITExpr, I64, PTR_SIZE}, -}; - -/// Wrap JIT Expr to array compute function. -pub fn build_calc_fn( - assembler: &Assembler, - jit_expr: JITExpr, - inputs: Vec<(String, JITType)>, - ret_type: JITType, -) -> Result { - // Alias pointer type. - // The raw pointer `R64` or `R32` is not compatible with integers. - const PTR_TYPE: JITType = if PTR_SIZE == 8 { I64 } else { I32 }; - - let mut builder = assembler.new_func_builder("calc_fn"); - // Declare in-param. - // Each input takes one position, following by a pointer to place result, - // and the last is the length of inputs/output arrays. - for (name, _) in &inputs { - builder = builder.param(format!("{name}_array"), PTR_TYPE); - } - let mut builder = builder.param("result", ret_type).param("len", I64); - - // Start build function body. - // It's loop that calculates the result one by one. - let mut fn_body = builder.enter_block(); - fn_body.declare_as("index", fn_body.lit_i(0))?; - fn_body.while_block( - |cond| cond.lt(cond.id("index")?, cond.id("len")?), - |w| { - w.declare_as("offset", w.mul(w.id("index")?, w.lit_i(PTR_SIZE as i64))?)?; - for (name, ty) in &inputs { - w.declare_as( - format!("{name}_ptr"), - w.add(w.id(format!("{name}_array"))?, w.id("offset")?)?, - )?; - w.declare_as(name, w.load(w.id(format!("{name}_ptr"))?, *ty)?)?; - } - w.declare_as("res_ptr", w.add(w.id("result")?, w.id("offset")?)?)?; - w.declare_as("res", jit_expr.clone())?; - w.store(w.id("res")?, w.id("res_ptr")?)?; - - w.assign("index", w.add(w.id("index")?, w.lit_i(1))?)?; - Ok(()) - }, - )?; - - let gen_func = fn_body.build(); - Ok(gen_func) -} - -#[cfg(test)] -mod test { - use std::{collections::HashMap, sync::Arc}; - - use arrow::{ - array::{Array, PrimitiveArray}, - datatypes::{DataType, Int64Type}, - }; - use datafusion_common::{DFField, DFSchema, DataFusionError}; - use datafusion_expr::Expr as DFExpr; - - use crate::ast::BinaryExpr; - - use super::*; - - fn run_df_expr( - df_expr: DFExpr, - schema: Arc, - lhs: PrimitiveArray, - rhs: PrimitiveArray, - ) -> Result> { - if lhs.null_count() != 0 || rhs.null_count() != 0 { - return Err(DataFusionError::NotImplemented( - "Computing on nullable array not yet supported".to_string(), - )); - } - if lhs.len() != rhs.len() { - return Err(DataFusionError::NotImplemented( - "Computing on different length arrays not yet supported".to_string(), - )); - } - - // translate DF Expr to JIT Expr - let input_fields = schema - .fields() - .iter() - .map(|field| { - Ok(( - field.qualified_name(), - JITType::try_from(field.data_type())?, - )) - }) - .collect::>>()?; - let jit_expr: JITExpr = (df_expr, schema).try_into()?; - - // allocate memory for calc result - let len = lhs.len(); - let result = vec![0i64; len]; - - // compile and run JIT code - let assembler = Assembler::default(); - let gen_func = build_calc_fn(&assembler, jit_expr, input_fields, I64)?; - let mut jit = assembler.create_jit(); - let code_ptr = jit.compile(gen_func)?; - let code_fn = unsafe { - core::mem::transmute::<_, fn(*const i64, *const i64, *const i64, i64) -> ()>( - code_ptr, - ) - }; - code_fn( - lhs.values().as_ptr(), - rhs.values().as_ptr(), - result.as_ptr(), - len as i64, - ); - - let result_array = PrimitiveArray::::from_iter(result); - Ok(result_array) - } - - #[test] - fn array_add() { - let array_a: PrimitiveArray = - PrimitiveArray::from_iter_values((0..10).map(|x| x + 1)); - let array_b: PrimitiveArray = - PrimitiveArray::from_iter_values((10..20).map(|x| x + 1)); - let expected = - arrow::compute::kernels::arithmetic::add(&array_a, &array_b).unwrap(); - - let df_expr = datafusion_expr::col("a") + datafusion_expr::col("b"); - let schema = Arc::new( - DFSchema::new_with_metadata( - vec![ - DFField::new(Some("table1"), "a", DataType::Int64, false), - DFField::new(Some("table1"), "b", DataType::Int64, false), - ], - HashMap::new(), - ) - .unwrap(), - ); - - let result = run_df_expr(df_expr, schema, array_a, array_b).unwrap(); - assert_eq!(result, expected); - } - - #[test] - fn calc_fn_builder() { - let expr = JITExpr::Binary(BinaryExpr::Add( - Box::new(JITExpr::Identifier("table1.a".to_string(), I64)), - Box::new(JITExpr::Identifier("table1.b".to_string(), I64)), - )); - let fields = vec![("table1.a".to_string(), I64), ("table1.b".to_string(), I64)]; - - let expected = r#"fn calc_fn_0(table1.a_array: i64, table1.b_array: i64, result: i64, len: i64) -> () { - let index: i64; - index = 0; - while index < len { - let offset: i64; - offset = index * 8; - let table1.a_ptr: i64; - table1.a_ptr = table1.a_array + offset; - let table1.a: i64; - table1.a = *(table1.a_ptr); - let table1.b_ptr: i64; - table1.b_ptr = table1.b_array + offset; - let table1.b: i64; - table1.b = *(table1.b_ptr); - let res_ptr: i64; - res_ptr = result + offset; - let res: i64; - res = table1.a + table1.b; - *(res_ptr) = res - index = index + 1; - } -}"#; - - let assembler = Assembler::default(); - let gen_func = build_calc_fn(&assembler, expr, fields, I64).unwrap(); - assert_eq!(format!("{}", &gen_func), expected); - } -} diff --git a/datafusion/jit/src/jit.rs b/datafusion/jit/src/jit.rs deleted file mode 100644 index 674d38b42a18e..0000000000000 --- a/datafusion/jit/src/jit.rs +++ /dev/null @@ -1,739 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::api::GeneratedFunction; -use crate::ast::{BinaryExpr, Expr, JITType, Literal, Stmt, TypedLit, BOOL, I64, NIL}; -use cranelift::prelude::*; -use cranelift_jit::{JITBuilder, JITModule}; -use cranelift_module::{Linkage, Module}; -use datafusion_common::internal_err; -use datafusion_common::{DataFusionError, Result}; -use std::collections::HashMap; - -/// The basic JIT class. -#[allow(clippy::upper_case_acronyms)] -pub struct JIT { - /// The function builder context, which is reused across multiple - /// FunctionBuilder instances. - builder_context: FunctionBuilderContext, - - /// The main Cranelift context, which holds the state for codegen. Cranelift - /// separates this from `Module` to allow for parallel compilation, with a - /// context per thread, though this is not the case now. - ctx: codegen::Context, - - /// The module, with the jit backend, which manages the JIT'd - /// functions. - module: JITModule, -} - -impl Default for JIT { - #[cfg(target_arch = "x86_64")] - fn default() -> Self { - let builder = JITBuilder::new(cranelift_module::default_libcall_names()).unwrap(); - let module = JITModule::new(builder); - Self { - builder_context: FunctionBuilderContext::new(), - ctx: module.make_context(), - module, - } - } - - #[cfg(target_arch = "aarch64")] - fn default() -> Self { - let mut flag_builder = settings::builder(); - // On at least AArch64, "colocated" calls use shorter-range relocations, - // which might not reach all definitions; we can't handle that here, so - // we require long-range relocation types. - flag_builder.set("use_colocated_libcalls", "false").unwrap(); - flag_builder.set("is_pic", "false").unwrap(); - let isa_builder = cranelift_native::builder().unwrap_or_else(|msg| { - panic!("host machine is not supported: {msg}"); - }); - let isa = isa_builder - .finish(settings::Flags::new(flag_builder)) - .unwrap_or_else(|msg| { - panic!("host machine is not supported: {msg}"); - }); - let builder = - JITBuilder::with_isa(isa, cranelift_module::default_libcall_names()); - let module = JITModule::new(builder); - Self { - builder_context: FunctionBuilderContext::new(), - ctx: module.make_context(), - module, - } - } -} - -impl JIT { - /// New while registering external functions - pub fn new(symbols: It) -> Self - where - It: IntoIterator, - K: Into, - { - let mut flag_builder = settings::builder(); - flag_builder.set("use_colocated_libcalls", "false").unwrap(); - - #[cfg(target_arch = "x86_64")] - flag_builder.set("is_pic", "true").unwrap(); - - #[cfg(target_arch = "aarch64")] - flag_builder.set("is_pic", "false").unwrap(); - - flag_builder.set("opt_level", "speed").unwrap(); - flag_builder.set("enable_simd", "true").unwrap(); - let isa_builder = cranelift_native::builder().unwrap_or_else(|msg| { - panic!("host machine is not supported: {msg}"); - }); - let isa = isa_builder - .finish(settings::Flags::new(flag_builder)) - .unwrap(); - let mut builder = - JITBuilder::with_isa(isa, cranelift_module::default_libcall_names()); - builder.symbols(symbols); - let module = JITModule::new(builder); - Self { - builder_context: FunctionBuilderContext::new(), - ctx: module.make_context(), - module, - } - } - - /// Compile the generated function into machine code. - pub fn compile(&mut self, func: GeneratedFunction) -> Result<*const u8> { - let GeneratedFunction { - name, - params, - body, - ret, - } = func; - - // Translate the AST nodes into Cranelift IR. - self.translate(params, ret, body)?; - - // Next, declare the function to jit. Functions must be declared - // before they can be called, or defined. - let id = self - .module - .declare_function(&name, Linkage::Export, &self.ctx.func.signature) - .map_err(|e| { - DataFusionError::Internal(format!( - "failed in declare the function to jit: {e:?}" - )) - })?; - - // Define the function to jit. This finishes compilation, although - // there may be outstanding relocations to perform. Currently, jit - // cannot finish relocations until all functions to be called are - // defined. For now, we'll just finalize the function below. - self.module - .define_function(id, &mut self.ctx) - .map_err(|e| { - DataFusionError::Internal(format!( - "failed in define the function to jit: {e:?}" - )) - })?; - - // Now that compilation is finished, we can clear out the context state. - self.module.clear_context(&mut self.ctx); - - // Finalize the functions which we just defined, which resolves any - // outstanding relocations (patching in addresses, now that they're - // available). - self.module.finalize_definitions(); - - // We can now retrieve a pointer to the machine code. - let code = self.module.get_finalized_function(id); - - Ok(code) - } - - // Translate into Cranelift IR. - fn translate( - &mut self, - params: Vec<(String, JITType)>, - the_return: Option<(String, JITType)>, - stmts: Vec, - ) -> Result<()> { - for param in ¶ms { - self.ctx - .func - .signature - .params - .push(AbiParam::new(param.1.native)); - } - - let mut void_return: bool = false; - - // We currently only supports one return value, though - // Cranelift is designed to support more. - match the_return { - None => void_return = true, - Some(ref ret) => { - self.ctx - .func - .signature - .returns - .push(AbiParam::new(ret.1.native)); - } - } - - // Create the builder to build a function. - let mut builder = - FunctionBuilder::new(&mut self.ctx.func, &mut self.builder_context); - - // Create the entry block, to start emitting code in. - let entry_block = builder.create_block(); - - // Since this is the entry block, add block parameters corresponding to - // the function's parameters. - builder.append_block_params_for_function_params(entry_block); - - // Tell the builder to emit code in this block. - builder.switch_to_block(entry_block); - - // And, tell the builder that this block will have no further - // predecessors. Since it's the entry block, it won't have any - // predecessors. - builder.seal_block(entry_block); - - // Walk the AST and declare all variables. - let variables = declare_variables( - &mut builder, - ¶ms, - the_return.as_ref(), - &stmts, - entry_block, - ); - - // Now translate the statements of the function body. - let mut trans = FunctionTranslator { - builder, - variables, - module: &mut self.module, - }; - for stmt in stmts { - trans.translate_stmt(stmt)?; - } - - if !void_return { - // Set up the return variable of the function. Above, we declared a - // variable to hold the return value. Here, we just do a use of that - // variable. - let return_variable = trans - .variables - .get(&the_return.as_ref().unwrap().0) - .unwrap(); - let return_value = trans.builder.use_var(*return_variable); - - // Emit the return instruction. - trans.builder.ins().return_(&[return_value]); - } else { - trans.builder.ins().return_(&[]); - } - - // Tell the builder we're done with this function. - trans.builder.finalize(); - Ok(()) - } -} - -/// A collection of state used for translating from AST nodes -/// into Cranelift IR. -struct FunctionTranslator<'a> { - builder: FunctionBuilder<'a>, - variables: HashMap, - module: &'a mut JITModule, -} - -impl<'a> FunctionTranslator<'a> { - fn translate_stmt(&mut self, stmt: Stmt) -> Result<()> { - match stmt { - Stmt::IfElse(condition, then_body, else_body) => { - self.translate_if_else(*condition, then_body, else_body) - } - Stmt::WhileLoop(condition, loop_body) => { - self.translate_while_loop(*condition, loop_body) - } - Stmt::Assign(name, expr) => self.translate_assign(name, *expr), - Stmt::Call(name, args) => { - self.translate_call_stmt(name, args, NIL)?; - Ok(()) - } - Stmt::Declare(_, _) => Ok(()), - Stmt::Store(value, ptr) => self.translate_store(*ptr, *value), - } - } - - fn translate_typed_lit(&mut self, tl: TypedLit) -> Value { - match tl { - TypedLit::Bool(b) => self.builder.ins().bconst(BOOL.native, b), - TypedLit::Int(i) => self.builder.ins().iconst(I64.native, i), - TypedLit::Float(f) => self.builder.ins().f32const(f), - TypedLit::Double(d) => self.builder.ins().f64const(d), - } - } - - /// When you write out instructions in Cranelift, you get back `Value`s. You - /// can then use these references in other instructions. - fn translate_expr(&mut self, expr: Expr) -> Result { - match expr { - Expr::Literal(nl) => self.translate_literal(nl), - Expr::Identifier(name, _) => { - // `use_var` is used to read the value of a variable. - let variable = self.variables.get(&name).ok_or_else(|| { - DataFusionError::Internal("variable not defined".to_owned()) - })?; - Ok(self.builder.use_var(*variable)) - } - Expr::Binary(b) => self.translate_binary_expr(b), - Expr::Call(name, args, ret) => self.translate_call_expr(name, args, ret), - Expr::Load(ptr, ty) => self.translate_deref(*ptr, ty), - } - } - - fn translate_literal(&mut self, expr: Literal) -> Result { - match expr { - Literal::Parsing(literal, ty) => self.translate_string_lit(literal, ty), - Literal::Typed(lt) => Ok(self.translate_typed_lit(lt)), - } - } - - fn translate_binary_expr(&mut self, expr: BinaryExpr) -> Result { - match expr { - BinaryExpr::Eq(lhs, rhs) => { - let ty = lhs.get_type(); - if ty.code >= 0x76 && ty.code <= 0x79 { - self.translate_icmp(IntCC::Equal, *lhs, *rhs) - } else if ty.code == 0x7b || ty.code == 0x7c { - self.translate_fcmp(FloatCC::Equal, *lhs, *rhs) - } else { - internal_err!("Unsupported type {} for equal comparison", ty) - } - } - BinaryExpr::Ne(lhs, rhs) => { - let ty = lhs.get_type(); - if ty.code >= 0x76 && ty.code <= 0x79 { - self.translate_icmp(IntCC::NotEqual, *lhs, *rhs) - } else if ty.code == 0x7b || ty.code == 0x7c { - self.translate_fcmp(FloatCC::NotEqual, *lhs, *rhs) - } else { - internal_err!("Unsupported type {} for not equal comparison", ty) - } - } - BinaryExpr::Lt(lhs, rhs) => { - let ty = lhs.get_type(); - if ty.code >= 0x76 && ty.code <= 0x79 { - self.translate_icmp(IntCC::SignedLessThan, *lhs, *rhs) - } else if ty.code == 0x7b || ty.code == 0x7c { - self.translate_fcmp(FloatCC::LessThan, *lhs, *rhs) - } else { - internal_err!("Unsupported type {} for less than comparison", ty) - } - } - BinaryExpr::Le(lhs, rhs) => { - let ty = lhs.get_type(); - if ty.code >= 0x76 && ty.code <= 0x79 { - self.translate_icmp(IntCC::SignedLessThanOrEqual, *lhs, *rhs) - } else if ty.code == 0x7b || ty.code == 0x7c { - self.translate_fcmp(FloatCC::LessThanOrEqual, *lhs, *rhs) - } else { - internal_err!( - "Unsupported type {} for less than or equal comparison", - ty - ) - } - } - BinaryExpr::Gt(lhs, rhs) => { - let ty = lhs.get_type(); - if ty.code >= 0x76 && ty.code <= 0x79 { - self.translate_icmp(IntCC::SignedGreaterThan, *lhs, *rhs) - } else if ty.code == 0x7b || ty.code == 0x7c { - self.translate_fcmp(FloatCC::GreaterThan, *lhs, *rhs) - } else { - internal_err!("Unsupported type {} for greater than comparison", ty) - } - } - BinaryExpr::Ge(lhs, rhs) => { - let ty = lhs.get_type(); - if ty.code >= 0x76 && ty.code <= 0x79 { - self.translate_icmp(IntCC::SignedGreaterThanOrEqual, *lhs, *rhs) - } else if ty.code == 0x7b || ty.code == 0x7c { - self.translate_fcmp(FloatCC::GreaterThanOrEqual, *lhs, *rhs) - } else { - internal_err!( - "Unsupported type {} for greater than or equal comparison", - ty - ) - } - } - BinaryExpr::Add(lhs, rhs) => { - let ty = lhs.get_type(); - let lhs = self.translate_expr(*lhs)?; - let rhs = self.translate_expr(*rhs)?; - if ty.code >= 0x76 && ty.code <= 0x79 { - Ok(self.builder.ins().iadd(lhs, rhs)) - } else if ty.code == 0x7b || ty.code == 0x7c { - Ok(self.builder.ins().fadd(lhs, rhs)) - } else { - internal_err!("Unsupported type {} for add", ty) - } - } - BinaryExpr::Sub(lhs, rhs) => { - let ty = lhs.get_type(); - let lhs = self.translate_expr(*lhs)?; - let rhs = self.translate_expr(*rhs)?; - if ty.code >= 0x76 && ty.code <= 0x79 { - Ok(self.builder.ins().isub(lhs, rhs)) - } else if ty.code == 0x7b || ty.code == 0x7c { - Ok(self.builder.ins().fsub(lhs, rhs)) - } else { - internal_err!("Unsupported type {} for sub", ty) - } - } - BinaryExpr::Mul(lhs, rhs) => { - let ty = lhs.get_type(); - let lhs = self.translate_expr(*lhs)?; - let rhs = self.translate_expr(*rhs)?; - if ty.code >= 0x76 && ty.code <= 0x79 { - Ok(self.builder.ins().imul(lhs, rhs)) - } else if ty.code == 0x7b || ty.code == 0x7c { - Ok(self.builder.ins().fmul(lhs, rhs)) - } else { - internal_err!("Unsupported type {} for mul", ty) - } - } - BinaryExpr::Div(lhs, rhs) => { - let ty = lhs.get_type(); - let lhs = self.translate_expr(*lhs)?; - let rhs = self.translate_expr(*rhs)?; - if ty.code >= 0x76 && ty.code <= 0x79 { - Ok(self.builder.ins().udiv(lhs, rhs)) - } else if ty.code == 0x7b || ty.code == 0x7c { - Ok(self.builder.ins().fdiv(lhs, rhs)) - } else { - internal_err!("Unsupported type {} for div", ty) - } - } - } - } - - fn translate_string_lit(&mut self, lit: String, ty: JITType) -> Result { - match ty.code { - 0x70 => { - let b = lit.parse::().unwrap(); - Ok(self.builder.ins().bconst(ty.native, b)) - } - 0x76 => { - let i = lit.parse::().unwrap(); - Ok(self.builder.ins().iconst(ty.native, i as i64)) - } - 0x77 => { - let i = lit.parse::().unwrap(); - Ok(self.builder.ins().iconst(ty.native, i as i64)) - } - 0x78 => { - let i = lit.parse::().unwrap(); - Ok(self.builder.ins().iconst(ty.native, i as i64)) - } - 0x79 => { - let i = lit.parse::().unwrap(); - Ok(self.builder.ins().iconst(ty.native, i)) - } - 0x7b => { - let f = lit.parse::().unwrap(); - Ok(self.builder.ins().f32const(f)) - } - 0x7c => { - let f = lit.parse::().unwrap(); - Ok(self.builder.ins().f64const(f)) - } - _ => internal_err!("Unsupported type {} for string literal", ty), - } - } - - fn translate_assign(&mut self, name: String, expr: Expr) -> Result<()> { - // `def_var` is used to write the value of a variable. Note that - // variables can have multiple definitions. Cranelift will - // convert them into SSA form for itself automatically. - let new_value = self.translate_expr(expr)?; - let variable = self.variables.get(&*name).unwrap(); - self.builder.def_var(*variable, new_value); - Ok(()) - } - - fn translate_deref(&mut self, ptr: Expr, ty: JITType) -> Result { - let ptr = self.translate_expr(ptr)?; - Ok(self.builder.ins().load(ty.native, MemFlags::new(), ptr, 0)) - } - - fn translate_store(&mut self, ptr: Expr, value: Expr) -> Result<()> { - let ptr = self.translate_expr(ptr)?; - let value = self.translate_expr(value)?; - self.builder.ins().store(MemFlags::new(), value, ptr, 0); - Ok(()) - } - - fn translate_icmp(&mut self, cmp: IntCC, lhs: Expr, rhs: Expr) -> Result { - let lhs = self.translate_expr(lhs)?; - let rhs = self.translate_expr(rhs)?; - let c = self.builder.ins().icmp(cmp, lhs, rhs); - Ok(self.builder.ins().bint(I64.native, c)) - } - - fn translate_fcmp(&mut self, cmp: FloatCC, lhs: Expr, rhs: Expr) -> Result { - let lhs = self.translate_expr(lhs)?; - let rhs = self.translate_expr(rhs)?; - let c = self.builder.ins().fcmp(cmp, lhs, rhs); - Ok(self.builder.ins().bint(I64.native, c)) - } - - fn translate_if_else( - &mut self, - condition: Expr, - then_body: Vec, - else_body: Vec, - ) -> Result<()> { - let condition_value = self.translate_expr(condition)?; - - let then_block = self.builder.create_block(); - let else_block = self.builder.create_block(); - let merge_block = self.builder.create_block(); - - // Test the if condition and conditionally branch. - self.builder.ins().brz(condition_value, else_block, &[]); - // Fall through to then block. - self.builder.ins().jump(then_block, &[]); - - self.builder.switch_to_block(then_block); - self.builder.seal_block(then_block); - for stmt in then_body { - self.translate_stmt(stmt)?; - } - - // Jump to the merge block, passing it the block return value. - self.builder.ins().jump(merge_block, &[]); - - self.builder.switch_to_block(else_block); - self.builder.seal_block(else_block); - for stmt in else_body { - self.translate_stmt(stmt)?; - } - - // Jump to the merge block, passing it the block return value. - self.builder.ins().jump(merge_block, &[]); - - // Switch to the merge block for subsequent statements. - self.builder.switch_to_block(merge_block); - - // We've now seen all the predecessors of the merge block. - self.builder.seal_block(merge_block); - Ok(()) - } - - fn translate_while_loop( - &mut self, - condition: Expr, - loop_body: Vec, - ) -> Result<()> { - let header_block = self.builder.create_block(); - let body_block = self.builder.create_block(); - let exit_block = self.builder.create_block(); - - self.builder.ins().jump(header_block, &[]); - self.builder.switch_to_block(header_block); - - let condition_value = self.translate_expr(condition)?; - self.builder.ins().brz(condition_value, exit_block, &[]); - self.builder.ins().jump(body_block, &[]); - - self.builder.switch_to_block(body_block); - self.builder.seal_block(body_block); - - for stmt in loop_body { - self.translate_stmt(stmt)?; - } - self.builder.ins().jump(header_block, &[]); - - self.builder.switch_to_block(exit_block); - - // We've reached the bottom of the loop, so there will be no - // more backedges to the header to exits to the bottom. - self.builder.seal_block(header_block); - self.builder.seal_block(exit_block); - Ok(()) - } - - fn translate_call_expr( - &mut self, - name: String, - args: Vec, - ret: JITType, - ) -> Result { - let mut sig = self.module.make_signature(); - - // Add a parameter for each argument. - for arg in &args { - sig.params.push(AbiParam::new(arg.get_type().native)); - } - - if ret.code == 0 { - return internal_err!( - "Call function {}(..) has void type, it can not be an expression", - &name - ); - } else { - sig.returns.push(AbiParam::new(ret.native)); - } - - let callee = self - .module - .declare_function(&name, Linkage::Import, &sig) - .expect("problem declaring function"); - let local_callee = self.module.declare_func_in_func(callee, self.builder.func); - - let mut arg_values = Vec::new(); - for arg in args { - arg_values.push(self.translate_expr(arg)?) - } - let call = self.builder.ins().call(local_callee, &arg_values); - Ok(self.builder.inst_results(call)[0]) - } - - fn translate_call_stmt( - &mut self, - name: String, - args: Vec, - ret: JITType, - ) -> Result<()> { - let mut sig = self.module.make_signature(); - - // Add a parameter for each argument. - for arg in &args { - sig.params.push(AbiParam::new(arg.get_type().native)); - } - - if ret.code != 0 { - sig.returns.push(AbiParam::new(ret.native)); - } - - let callee = self - .module - .declare_function(&name, Linkage::Import, &sig) - .expect("problem declaring function"); - let local_callee = self.module.declare_func_in_func(callee, self.builder.func); - - let mut arg_values = Vec::new(); - for arg in args { - arg_values.push(self.translate_expr(arg)?) - } - let _ = self.builder.ins().call(local_callee, &arg_values); - Ok(()) - } -} - -fn typed_zero(typ: JITType, builder: &mut FunctionBuilder) -> Value { - match typ.code { - 0x70 => builder.ins().bconst(typ.native, false), - 0x76 => builder.ins().iconst(typ.native, 0), - 0x77 => builder.ins().iconst(typ.native, 0), - 0x78 => builder.ins().iconst(typ.native, 0), - 0x79 => builder.ins().iconst(typ.native, 0), - 0x7b => builder.ins().f32const(0.0), - 0x7c => builder.ins().f64const(0.0), - 0x7e => builder.ins().null(typ.native), - 0x7f => builder.ins().null(typ.native), - _ => panic!("unsupported type"), - } -} - -fn declare_variables( - builder: &mut FunctionBuilder, - params: &[(String, JITType)], - the_return: Option<&(String, JITType)>, - stmts: &[Stmt], - entry_block: Block, -) -> HashMap { - let mut variables = HashMap::new(); - let mut index = 0; - - for (i, name) in params.iter().enumerate() { - let val = builder.block_params(entry_block)[i]; - let var = declare_variable(builder, &mut variables, &mut index, &name.0, name.1); - builder.def_var(var, val); - } - - if let Some(ret) = the_return { - let zero = typed_zero(ret.1, builder); - let return_variable = - declare_variable(builder, &mut variables, &mut index, &ret.0, ret.1); - builder.def_var(return_variable, zero); - } - - for stmt in stmts { - declare_variables_in_stmt(builder, &mut variables, &mut index, stmt); - } - - variables -} - -/// Recursively descend through the AST, translating all declarations. -fn declare_variables_in_stmt( - builder: &mut FunctionBuilder, - variables: &mut HashMap, - index: &mut usize, - stmt: &Stmt, -) { - match *stmt { - Stmt::IfElse(_, ref then_body, ref else_body) => { - for stmt in then_body { - declare_variables_in_stmt(builder, variables, index, stmt); - } - for stmt in else_body { - declare_variables_in_stmt(builder, variables, index, stmt); - } - } - Stmt::WhileLoop(_, ref loop_body) => { - for stmt in loop_body { - declare_variables_in_stmt(builder, variables, index, stmt); - } - } - Stmt::Declare(ref name, typ) => { - declare_variable(builder, variables, index, name, typ); - } - _ => {} - } -} - -/// Declare a single variable declaration. -fn declare_variable( - builder: &mut FunctionBuilder, - variables: &mut HashMap, - index: &mut usize, - name: &str, - typ: JITType, -) -> Variable { - let var = Variable::new(*index); - if !variables.contains_key(name) { - variables.insert(name.into(), var); - builder.declare_var(var, typ.native); - *index += 1; - } - var -} diff --git a/datafusion/jit/src/lib.rs b/datafusion/jit/src/lib.rs deleted file mode 100644 index 377d32d8a37de..0000000000000 --- a/datafusion/jit/src/lib.rs +++ /dev/null @@ -1,153 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Just-In-Time compilation to accelerate DataFusion physical plan execution. - -pub mod api; -pub mod ast; -pub mod compile; -pub mod jit; - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use crate::api::{Assembler, GeneratedFunction}; - use crate::ast::{BinaryExpr, Expr, Literal, TypedLit, I64}; - use crate::jit::JIT; - use arrow::datatypes::DataType; - use datafusion_common::{DFField, DFSchema, Result}; - use datafusion_expr::{col, lit}; - - #[test] - fn iterative_fib() -> Result<()> { - let expected = r#"fn iterative_fib_0(n: i64) -> r: i64 { - if n == 0 { - r = 0; - } else { - n = n - 1; - let a: i64; - a = 0; - r = 1; - while n != 0 { - let t: i64; - t = r; - r = r + a; - a = t; - n = n - 1; - } - } -}"#; - let assembler = Assembler::default(); - let mut builder = assembler - .new_func_builder("iterative_fib") - .param("n", I64) - .ret("r", I64); - let mut fn_body = builder.enter_block(); - - fn_body.if_block( - |cond| cond.eq(cond.id("n")?, cond.lit_i(0)), - |t| { - t.assign("r", t.lit_i(0))?; - Ok(()) - }, - |e| { - e.assign("n", e.sub(e.id("n")?, e.lit_i(1))?)?; - e.declare_as("a", e.lit_i(0))?; - e.assign("r", e.lit_i(1))?; - e.while_block( - |cond| cond.ne(cond.id("n")?, cond.lit_i(0)), - |w| { - w.declare_as("t", w.id("r")?)?; - w.assign("r", w.add(w.id("r")?, w.id("a")?)?)?; - w.assign("a", w.id("t")?)?; - w.assign("n", w.sub(w.id("n")?, w.lit_i(1))?)?; - Ok(()) - }, - )?; - Ok(()) - }, - )?; - - let gen_func = fn_body.build(); - assert_eq!(format!("{}", &gen_func), expected); - let mut jit = assembler.create_jit(); - assert_eq!(55, run_iterative_fib_code(&mut jit, gen_func, 10)?); - Ok(()) - } - - #[test] - fn from_datafusion_expression() -> Result<()> { - let df_expr = lit(1.0f32) + lit(2.0f32); - let schema = Arc::new(DFSchema::empty()); - let jit_expr: crate::ast::Expr = (df_expr, schema).try_into()?; - - assert_eq!( - jit_expr, - Expr::Binary(BinaryExpr::Add( - Box::new(Expr::Literal(Literal::Typed(TypedLit::Float(1.0)))), - Box::new(Expr::Literal(Literal::Typed(TypedLit::Float(2.0)))) - )), - ); - - Ok(()) - } - - #[test] - fn from_datafusion_expression_schema() -> Result<()> { - let df_expr = col("a") + lit(1i64); - let schema = Arc::new(DFSchema::new_with_metadata( - vec![DFField::new(Some("table1"), "a", DataType::Int64, false)], - HashMap::new(), - )?); - let jit_expr: crate::ast::Expr = (df_expr, schema).try_into()?; - - assert_eq!( - jit_expr, - Expr::Binary(BinaryExpr::Add( - Box::new(Expr::Identifier("table1.a".to_string(), I64)), - Box::new(Expr::Literal(Literal::Typed(TypedLit::Int(1)))) - )), - ); - - Ok(()) - } - - unsafe fn run_code( - jit: &mut JIT, - code: GeneratedFunction, - input: I, - ) -> Result { - // Pass the string to the JIT, and it returns a raw pointer to machine code. - let code_ptr = jit.compile(code)?; - // Cast the raw pointer to a typed function pointer. This is unsafe, because - // this is the critical point where you have to trust that the generated code - // is safe to be called. - let code_fn = core::mem::transmute::<_, fn(I) -> O>(code_ptr); - // And now we can call it! - Ok(code_fn(input)) - } - - fn run_iterative_fib_code( - jit: &mut JIT, - code: GeneratedFunction, - input: isize, - ) -> Result { - unsafe { run_code(jit, code, input) } - } -} diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml index b9886962f61a5..abdf8d9f453b6 100644 --- a/datafusion/row/Cargo.toml +++ b/datafusion/row/Cargo.toml @@ -32,13 +32,8 @@ rust-version = { workspace = true } name = "datafusion_row" path = "src/lib.rs" -[features] -# Used to enable JIT code generation -jit = ["datafusion-jit"] - [dependencies] arrow = { workspace = true } datafusion-common = { path = "../common", version = "23.0.0" } -datafusion-jit = { path = "../jit", version = "23.0.0", optional = true } paste = "^1.0" rand = "0.8" diff --git a/datafusion/row/src/jit/mod.rs b/datafusion/row/src/jit/mod.rs deleted file mode 100644 index 803c96b176b75..0000000000000 --- a/datafusion/row/src/jit/mod.rs +++ /dev/null @@ -1,169 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Just-In-Time(JIT) version for row reader and writers - -pub mod reader; -pub mod writer; - -#[macro_export] -/// register external functions to the assembler -macro_rules! reg_fn { - ($ASS:ident, $FN: path, $PARAM: expr, $RET: expr) => { - $ASS.register_extern_fn(fn_name($FN), $FN as *const u8, $PARAM, $RET)?; - }; -} - -fn fn_name(f: T) -> &'static str { - fn type_name_of(_: T) -> &'static str { - std::any::type_name::() - } - let name = type_name_of(f); - - // Find and cut the rest of the path - match &name.rfind(':') { - Some(pos) => &name[pos + 1..name.len()], - None => name, - } -} - -#[cfg(test)] -mod tests { - use crate::jit::reader::read_as_batch_jit; - use crate::jit::writer::write_batch_unchecked_jit; - use arrow::record_batch::RecordBatch; - use arrow::{array::*, datatypes::*}; - use datafusion_common::Result; - use datafusion_jit::api::Assembler; - use std::sync::Arc; - use DataType::*; - - macro_rules! fn_test_single_type { - ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { - paste::item! { - #[test] - #[allow(non_snake_case)] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); - let a = $ARRAY::from($VEC); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 1024]; - let assembler = Assembler::default(); - let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; - let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[allow(non_snake_case)] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); - let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); - let a = $ARRAY::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 1024]; - let assembler = Assembler::default(); - let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; - let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - } - }; - } - - fn_test_single_type!( - BooleanArray, - Boolean, - vec![Some(true), Some(false), None, Some(true), None] - ); - - fn_test_single_type!( - Int8Array, - Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int16Array, - Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int32Array, - Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int64Array, - Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt8Array, - UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt16Array, - UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt32Array, - UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt64Array, - UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Float32Array, - Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] - ); - - fn_test_single_type!( - Float64Array, - Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] - ); - - fn_test_single_type!( - Date32Array, - Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Date64Array, - Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); -} diff --git a/datafusion/row/src/jit/reader.rs b/datafusion/row/src/jit/reader.rs deleted file mode 100644 index 5d91de7be3807..0000000000000 --- a/datafusion/row/src/jit/reader.rs +++ /dev/null @@ -1,153 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Accessing row from raw bytes with JIT - -use crate::jit::fn_name; -use crate::reader::RowReader; -use crate::reader::*; -use crate::reg_fn; -use crate::MutableRecordBatch; -use arrow::array::ArrayBuilder; -use arrow::datatypes::{DataType, Schema}; -use arrow::record_batch::RecordBatch; -use datafusion_common::{DataFusionError, Result}; -use datafusion_jit::api::Assembler; -use datafusion_jit::api::GeneratedFunction; -use datafusion_jit::ast::{I64, PTR}; -use std::sync::Arc; - -/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch - -pub fn read_as_batch_jit( - data: &[u8], - schema: Arc, - offsets: &[usize], - assembler: &Assembler, -) -> Result { - let row_num = offsets.len(); - let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema); - register_read_functions(assembler)?; - let gen_func = gen_read_row(&schema, assembler)?; - let mut jit = assembler.create_jit(); - let code_ptr = jit.compile(gen_func)?; - let code_fn = unsafe { - std::mem::transmute::<_, fn(&RowReader, &mut MutableRecordBatch)>(code_ptr) - }; - - for offset in offsets.iter().take(row_num) { - row.point_to(*offset, data); - code_fn(&row, &mut output); - } - - output.output().map_err(DataFusionError::ArrowError) -} - -fn get_array_mut( - batch: &mut MutableRecordBatch, - col_idx: usize, -) -> &mut Box { - let arrays: &mut [Box] = batch.arrays.as_mut(); - &mut arrays[col_idx] -} - -fn register_read_functions(asm: &Assembler) -> Result<()> { - let reader_param = vec![PTR, I64, PTR]; - reg_fn!(asm, get_array_mut, vec![PTR, I64], Some(PTR)); - reg_fn!(asm, read_field_bool, reader_param.clone(), None); - reg_fn!(asm, read_field_u8, reader_param.clone(), None); - reg_fn!(asm, read_field_u16, reader_param.clone(), None); - reg_fn!(asm, read_field_u32, reader_param.clone(), None); - reg_fn!(asm, read_field_u64, reader_param.clone(), None); - reg_fn!(asm, read_field_i8, reader_param.clone(), None); - reg_fn!(asm, read_field_i16, reader_param.clone(), None); - reg_fn!(asm, read_field_i32, reader_param.clone(), None); - reg_fn!(asm, read_field_i64, reader_param.clone(), None); - reg_fn!(asm, read_field_f32, reader_param.clone(), None); - reg_fn!(asm, read_field_f64, reader_param.clone(), None); - reg_fn!(asm, read_field_date32, reader_param.clone(), None); - reg_fn!(asm, read_field_date64, reader_param.clone(), None); - reg_fn!(asm, read_field_bool_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u8_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u16_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_u64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i8_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i16_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_i64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_f32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_date64_null_free, reader_param, None); - Ok(()) -} - -fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result { - use DataType::*; - let mut builder = assembler - .new_func_builder("read_row") - .param("row", PTR) - .param("batch", PTR); - let mut b = builder.enter_block(); - for (i, f) in schema.fields().iter().enumerate() { - let dt = f.data_type(); - let arr = format!("a{i}"); - b.declare_as( - &arr, - b.call("get_array_mut", vec![b.id("batch")?, b.lit_i(i as i64)])?, - )?; - let params = vec![b.id(&arr)?, b.lit_i(i as i64), b.id("row")?]; - if f.is_nullable() { - match dt { - Boolean => b.call_stmt("read_field_bool", params)?, - UInt8 => b.call_stmt("read_field_u8", params)?, - UInt16 => b.call_stmt("read_field_u16", params)?, - UInt32 => b.call_stmt("read_field_u32", params)?, - UInt64 => b.call_stmt("read_field_u64", params)?, - Int8 => b.call_stmt("read_field_i8", params)?, - Int16 => b.call_stmt("read_field_i16", params)?, - Int32 => b.call_stmt("read_field_i32", params)?, - Int64 => b.call_stmt("read_field_i64", params)?, - Float32 => b.call_stmt("read_field_f32", params)?, - Float64 => b.call_stmt("read_field_f64", params)?, - Date32 => b.call_stmt("read_field_date32", params)?, - Date64 => b.call_stmt("read_field_date64", params)?, - _ => unimplemented!(), - } - } else { - match dt { - Boolean => b.call_stmt("read_field_bool_null_free", params)?, - UInt8 => b.call_stmt("read_field_u8_null_free", params)?, - UInt16 => b.call_stmt("read_field_u16_null_free", params)?, - UInt32 => b.call_stmt("read_field_u32_null_free", params)?, - UInt64 => b.call_stmt("read_field_u64_null_free", params)?, - Int8 => b.call_stmt("read_field_i8_null_free", params)?, - Int16 => b.call_stmt("read_field_i16_null_free", params)?, - Int32 => b.call_stmt("read_field_i32_null_free", params)?, - Int64 => b.call_stmt("read_field_i64_null_free", params)?, - Float32 => b.call_stmt("read_field_f32_null_free", params)?, - Float64 => b.call_stmt("read_field_f64_null_free", params)?, - Date32 => b.call_stmt("read_field_date32_null_free", params)?, - Date64 => b.call_stmt("read_field_date64_null_free", params)?, - _ => unimplemented!(), - } - } - } - Ok(b.build()) -} diff --git a/datafusion/row/src/jit/writer.rs b/datafusion/row/src/jit/writer.rs deleted file mode 100644 index f4d4e83be1adc..0000000000000 --- a/datafusion/row/src/jit/writer.rs +++ /dev/null @@ -1,201 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Reusable JIT version of row writer backed by `Vec` to stitch attributes together - -use crate::jit::fn_name; -use crate::reg_fn; -use crate::schema_null_free; -use crate::writer::RowWriter; -use crate::writer::*; -use arrow::array::Array; -use arrow::datatypes::{DataType, Schema}; -use arrow::record_batch::RecordBatch; -use datafusion_common::Result; -use datafusion_jit::api::CodeBlock; -use datafusion_jit::api::{Assembler, GeneratedFunction}; -use datafusion_jit::ast::Expr; -use datafusion_jit::ast::{BOOL, I64, PTR}; -use std::sync::Arc; - -/// Append batch from `row_idx` to `output` buffer start from `offset` -/// # Panics -/// -/// This function will panic if the output buffer doesn't have enough space to hold all the rows -pub fn write_batch_unchecked_jit( - output: &mut [u8], - offset: usize, - batch: &RecordBatch, - row_idx: usize, - schema: Arc, - assembler: &Assembler, -) -> Result> { - let mut writer = RowWriter::new(&schema); - let mut current_offset = offset; - let mut offsets = vec![]; - register_write_functions(assembler)?; - let gen_func = gen_write_row(&schema, assembler)?; - let mut jit = assembler.create_jit(); - let code_ptr = jit.compile(gen_func)?; - - let code_fn = unsafe { - std::mem::transmute::<_, fn(&mut RowWriter, usize, &RecordBatch)>(code_ptr) - }; - - for cur_row in row_idx..batch.num_rows() { - offsets.push(current_offset); - code_fn(&mut writer, cur_row, batch); - let row_width = writer.row_width; - output[current_offset..current_offset + row_width] - .copy_from_slice(writer.get_row()); - current_offset += row_width; - writer.reset() - } - Ok(offsets) -} - -/// bench jit version write -#[inline(never)] -pub fn bench_write_batch_jit( - batches: &[Vec], - schema: Arc, -) -> Result> { - let assembler = Assembler::default(); - let mut writer = RowWriter::new(&schema); - let mut lengths = vec![]; - register_write_functions(&assembler)?; - let gen_func = gen_write_row(&schema, &assembler)?; - let mut jit = assembler.create_jit(); - let code_ptr = jit.compile(gen_func)?; - let code_fn = unsafe { - std::mem::transmute::<_, fn(&mut RowWriter, usize, &RecordBatch)>(code_ptr) - }; - - for batch in batches.iter().flatten() { - for cur_row in 0..batch.num_rows() { - code_fn(&mut writer, cur_row, batch); - lengths.push(writer.row_width); - writer.reset() - } - } - Ok(lengths) -} - -// we could remove this function wrapper once we find a way to call the trait method directly. -fn is_null(col: &Arc, row_idx: usize) -> bool { - col.is_null(row_idx) -} - -fn register_write_functions(asm: &Assembler) -> Result<()> { - let reader_param = vec![PTR, I64, PTR]; - reg_fn!(asm, RecordBatch::column, vec![PTR, I64], Some(PTR)); - reg_fn!(asm, RowWriter::set_null_at, vec![PTR, I64], None); - reg_fn!(asm, RowWriter::set_non_null_at, vec![PTR, I64], None); - reg_fn!(asm, is_null, vec![PTR, I64], Some(BOOL)); - reg_fn!(asm, write_field_bool, reader_param.clone(), None); - reg_fn!(asm, write_field_u8, reader_param.clone(), None); - reg_fn!(asm, write_field_u16, reader_param.clone(), None); - reg_fn!(asm, write_field_u32, reader_param.clone(), None); - reg_fn!(asm, write_field_u64, reader_param.clone(), None); - reg_fn!(asm, write_field_i8, reader_param.clone(), None); - reg_fn!(asm, write_field_i16, reader_param.clone(), None); - reg_fn!(asm, write_field_i32, reader_param.clone(), None); - reg_fn!(asm, write_field_i64, reader_param.clone(), None); - reg_fn!(asm, write_field_f32, reader_param.clone(), None); - reg_fn!(asm, write_field_f64, reader_param.clone(), None); - reg_fn!(asm, write_field_date32, reader_param.clone(), None); - reg_fn!(asm, write_field_date64, reader_param, None); - Ok(()) -} - -fn gen_write_row(schema: &Schema, assembler: &Assembler) -> Result { - let mut builder = assembler - .new_func_builder("write_row") - .param("row", PTR) - .param("row_idx", I64) - .param("batch", PTR); - let null_free = schema_null_free(schema); - let mut b = builder.enter_block(); - for (i, f) in schema.fields().iter().enumerate() { - let dt = f.data_type(); - let arr = format!("a{i}"); - b.declare_as( - &arr, - b.call("column", vec![b.id("batch")?, b.lit_i(i as i64)])?, - )?; - if f.is_nullable() { - b.if_block( - |c| c.call("is_null", vec![c.id(&arr)?, c.id("row_idx")?]), - |t| { - t.call_stmt("set_null_at", vec![t.id("row")?, t.lit_i(i as i64)])?; - Ok(()) - }, - |e| { - e.call_stmt( - "set_non_null_at", - vec![e.id("row")?, e.lit_i(i as i64)], - )?; - let params = vec![ - e.id("row")?, - e.id(&arr)?, - e.lit_i(i as i64), - e.id("row_idx")?, - ]; - write_typed_field_stmt(dt, e, params)?; - Ok(()) - }, - )?; - } else { - if !null_free { - b.call_stmt("set_non_null_at", vec![b.id("row")?, b.lit_i(i as i64)])?; - } - let params = vec![ - b.id("row")?, - b.id(&arr)?, - b.lit_i(i as i64), - b.id("row_idx")?, - ]; - write_typed_field_stmt(dt, &mut b, params)?; - } - } - Ok(b.build()) -} - -fn write_typed_field_stmt( - dt: &DataType, - b: &mut CodeBlock, - params: Vec, -) -> Result<()> { - use DataType::*; - match dt { - Boolean => b.call_stmt("write_field_bool", params)?, - UInt8 => b.call_stmt("write_field_u8", params)?, - UInt16 => b.call_stmt("write_field_u16", params)?, - UInt32 => b.call_stmt("write_field_u32", params)?, - UInt64 => b.call_stmt("write_field_u64", params)?, - Int8 => b.call_stmt("write_field_i8", params)?, - Int16 => b.call_stmt("write_field_i16", params)?, - Int32 => b.call_stmt("write_field_i32", params)?, - Int64 => b.call_stmt("write_field_i64", params)?, - Float32 => b.call_stmt("write_field_f32", params)?, - Float64 => b.call_stmt("write_field_f64", params)?, - Date32 => b.call_stmt("write_field_date32", params)?, - Date64 => b.call_stmt("write_field_date64", params)?, - _ => unimplemented!(), - } - Ok(()) -} diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs index 6d00bb44c811c..a15806830fc09 100644 --- a/datafusion/row/src/lib.rs +++ b/datafusion/row/src/lib.rs @@ -39,8 +39,6 @@ pub use layout::row_supported; use std::sync::Arc; pub mod accessor; -#[cfg(feature = "jit")] -pub mod jit; pub mod layout; pub mod reader; mod validity; diff --git a/dev/release/README.md b/dev/release/README.md index fbbde87b3f3af..71ebb27493a95 100644 --- a/dev/release/README.md +++ b/dev/release/README.md @@ -279,7 +279,6 @@ of the following crates: - [datafusion-cli](https://crates.io/crates/datafusion-cli) - [datafusion-common](https://crates.io/crates/datafusion-common) - [datafusion-expr](https://crates.io/crates/datafusion-expr) -- [datafusion-jit](https://crates.io/crates/datafusion-jit) - [datafusion-physical-expr](https://crates.io/crates/datafusion-physical-expr) - [datafusion-proto](https://crates.io/crates/datafusion-proto) - [datafusion-row](https://crates.io/crates/datafusion-row) @@ -306,7 +305,6 @@ dot -Tsvg dev/release/crate-deps.dot > dev/release/crate-deps.svg (cd datafusion/common && cargo publish) (cd datafusion/expr && cargo publish) (cd datafusion/sql && cargo publish) -(cd datafusion/jit && cargo publish) (cd datafusion/row && cargo publish) (cd datafusion/physical-expr && cargo publish) (cd datafusion/optimizer && cargo publish) @@ -388,7 +386,6 @@ https://crates.io/crates/datafusion/8.0.0 https://crates.io/crates/datafusion-cli/8.0.0 https://crates.io/crates/datafusion-common/8.0.0 https://crates.io/crates/datafusion-expr/8.0.0 -https://crates.io/crates/datafusion-jit/8.0.0 https://crates.io/crates/datafusion-optimizer/8.0.0 https://crates.io/crates/datafusion-physical-expr/8.0.0 https://crates.io/crates/datafusion-proto/8.0.0 diff --git a/dev/release/crate-deps.dot b/dev/release/crate-deps.dot index a5a1db1a79e67..a2199befaf8ed 100644 --- a/dev/release/crate-deps.dot +++ b/dev/release/crate-deps.dot @@ -30,15 +30,10 @@ digraph G { datafusion_physical_expr -> datafusion_common datafusion_physical_expr -> datafusion_expr - datafusion_jit -> datafusion_common - datafusion_jit -> datafusion_expr - datafusion_row -> datafusion_common - datafusion_row -> datafusion_jit datafusion -> datafusion_common datafusion -> datafusion_expr - datafusion -> datafusion_jit datafusion -> datafusion_optimizer datafusion -> datafusion_physical_expr datafusion -> datafusion_row diff --git a/dev/release/crate-deps.svg b/dev/release/crate-deps.svg index c3fa7217dc804..f55a5fcd7b246 100644 --- a/dev/release/crate-deps.svg +++ b/dev/release/crate-deps.svg @@ -1,205 +1,175 @@ - - - + + G - + datafusion_common - -datafusion_common + +datafusion_common datafusion_expr - -datafusion_expr + +datafusion_expr datafusion_expr->datafusion_common - - + + datafusion_sql - -datafusion_sql + +datafusion_sql datafusion_sql->datafusion_common - - + + datafusion_sql->datafusion_expr - - + + datafusion_optimizer - -datafusion_optimizer + +datafusion_optimizer datafusion_optimizer->datafusion_common - - + + datafusion_optimizer->datafusion_expr - - + + datafusion_physical_expr - -datafusion_physical_expr + +datafusion_physical_expr datafusion_physical_expr->datafusion_common - - + + datafusion_physical_expr->datafusion_expr - - - - - -datafusion_jit - -datafusion_jit - - - -datafusion_jit->datafusion_common - - - - - -datafusion_jit->datafusion_expr - - + + - + datafusion_row - -datafusion_row + +datafusion_row - + datafusion_row->datafusion_common - - - - - -datafusion_row->datafusion_jit - - + + - + datafusion - -datafusion + +datafusion - + datafusion->datafusion_common - - + + - + datafusion->datafusion_expr - - + + - + datafusion->datafusion_sql - - + + - + datafusion->datafusion_optimizer - - + + - + datafusion->datafusion_physical_expr - - - - - -datafusion->datafusion_jit - - + + - + datafusion->datafusion_row - - + + - + datafusion_proto - -datafusion_proto + +datafusion_proto - + datafusion_proto->datafusion - - + + - + datafusion_substrait - -datafusion_substrait + +datafusion_substrait - + datafusion_substrait->datafusion - - + + - + datafusion_cli - -datafusion_cli + +datafusion_cli - + datafusion_cli->datafusion - - + + diff --git a/dev/release/release-crates.sh b/dev/release/release-crates.sh index 7acd9eb950ad7..658ec88b899da 100644 --- a/dev/release/release-crates.sh +++ b/dev/release/release-crates.sh @@ -32,7 +32,6 @@ if ! [ git rev-parse --is-inside-work-tree ]; then cd datafusion/common && cargo publish cd datafusion/expr && cargo publish cd datafusion/sql && cargo publish - cd datafusion/jit && cargo publish cd datafusion/row && cargo publish cd datafusion/physical-expr && cargo publish cd datafusion/optimizer && cargo publish diff --git a/dev/update_datafusion_versions.py b/dev/update_datafusion_versions.py index f61395a007c87..fd4bfadb9ed0f 100755 --- a/dev/update_datafusion_versions.py +++ b/dev/update_datafusion_versions.py @@ -33,7 +33,6 @@ 'datafusion-common': 'datafusion/common/Cargo.toml', 'datafusion-expr': 'datafusion/expr/Cargo.toml', 'datafusion-execution': 'datafusion/execution/Cargo.toml', - 'datafusion-jit': 'datafusion/jit/Cargo.toml', 'datafusion-optimizer': 'datafusion/optimizer/Cargo.toml', 'datafusion-physical-expr': 'datafusion/physical-expr/Cargo.toml', 'datafusion-proto': 'datafusion/proto/Cargo.toml',