Skip to content

Commit e4b17cb

Browse files
timsaucer authored and alamb committed
Initial commit of UDWF via FFI
1 parent 5d3ed9c commit e4b17cb

File tree

6 files changed

+894
-1
lines changed

6 files changed

+894
-1
lines changed

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub mod table_source;
3737
pub mod udaf;
3838
pub mod udf;
3939
pub mod udtf;
40+
pub mod udwf;
4041
pub mod util;
4142
pub mod volatility;
4243

datafusion/ffi/src/udwf/mod.rs

Lines changed: 373 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,373 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{ffi::c_void, sync::Arc};
19+
20+
use abi_stable::{
21+
std_types::{ROption, RResult, RString, RVec},
22+
StableAbi,
23+
};
24+
use arrow::datatypes::Schema;
25+
use arrow::{
26+
compute::SortOptions,
27+
datatypes::{DataType, SchemaRef},
28+
};
29+
use datafusion::{
30+
error::DataFusionError,
31+
logical_expr::{
32+
function::WindowUDFFieldArgs,
33+
type_coercion::functions::data_types_with_window_udf, PartitionEvaluator,
34+
},
35+
};
36+
use datafusion::{
37+
error::Result,
38+
logical_expr::{Signature, WindowUDF, WindowUDFImpl},
39+
};
40+
use partition_evaluator::{FFI_PartitionEvaluator, ForeignPartitionEvaluator};
41+
use partition_evaluator_args::{
42+
FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs,
43+
};
44+
mod partition_evaluator;
45+
mod partition_evaluator_args;
46+
mod range;
47+
48+
use crate::{
49+
arrow_wrappers::WrappedSchema,
50+
df_result, rresult, rresult_return,
51+
util::{rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped},
52+
volatility::FFI_Volatility,
53+
};
54+
55+
/// A stable struct for sharing a [`WindowUDF`] across FFI boundaries.
56+
#[repr(C)]
57+
#[derive(Debug, StableAbi)]
58+
#[allow(non_camel_case_types)]
59+
pub struct FFI_WindowUDF {
60+
/// FFI equivalent to the `name` of a [`WindowUDF`]
61+
pub name: RString,
62+
63+
/// FFI equivalent to the `aliases` of a [`WindowUDF`]
64+
pub aliases: RVec<RString>,
65+
66+
/// FFI equivalent to the `volatility` of a [`WindowUDF`]
67+
pub volatility: FFI_Volatility,
68+
69+
/// Creates an FFI partition evaluator for the given FFI evaluator arguments,
/// or returns an error message on failure.
pub partition_evaluator:
70+
unsafe extern "C" fn(
71+
udwf: &Self,
72+
args: FFI_PartitionEvaluatorArgs,
73+
) -> RResult<FFI_PartitionEvaluator, RString>,
74+
75+
/// Computes the output field for the given input types and display name.
/// The provider-side wrapper returns the field wrapped in a schema that
/// contains exactly that one field.
pub field: unsafe extern "C" fn(
76+
udwf: &Self,
77+
input_types: RVec<WrappedSchema>,
78+
display_name: RString,
79+
) -> RResult<WrappedSchema, RString>,
80+
81+
/// Performs type coercion. To simplify this interface, all UDFs are treated as having
82+
/// user defined signatures, which will in turn cause coerce_types to be called. This
83+
/// call should be transparent to most users as the internal function performs the
84+
/// appropriate calls on the underlying [`WindowUDF`]
85+
pub coerce_types: unsafe extern "C" fn(
86+
udf: &Self,
87+
arg_types: RVec<WrappedSchema>,
88+
) -> RResult<RVec<WrappedSchema>, RString>,
89+
90+
/// Returns the schema that was supplied when this struct was created.
pub schema: unsafe extern "C" fn(udwf: &Self) -> WrappedSchema,
91+
92+
/// FFI equivalent to the `sort_options` of a [`WindowUDF`]
pub sort_options: ROption<FFI_SortOptions>,
93+
94+
/// Used to create a clone on the provider of the udf. This should
95+
/// only need to be called by the receiver of the udf.
96+
pub clone: unsafe extern "C" fn(udf: &Self) -> Self,
97+
98+
/// Release the memory of the private data when it is no longer being used.
99+
pub release: unsafe extern "C" fn(udf: &mut Self),
100+
101+
/// Internal data. This is only to be accessed by the provider of the udf.
102+
/// A [`ForeignWindowUDF`] should never attempt to access this data.
103+
pub private_data: *mut c_void,
104+
}
105+
106+
// The raw `private_data` pointer prevents `Send`/`Sync` from being derived
// automatically, so they are asserted by hand here.
// NOTE(review): soundness presumably rests on the pointed-to data being
// thread-safe — confirm.
unsafe impl Send for FFI_WindowUDF {}
107+
unsafe impl Sync for FFI_WindowUDF {}
108+
109+
/// Provider-side state stored behind [`FFI_WindowUDF::private_data`].
pub struct WindowUDFPrivateData {
110+
/// The wrapped window function that the FFI function pointers delegate to.
pub udf: Arc<WindowUDF>,
111+
/// Schema handed back by the `schema` function pointer.
pub schema: SchemaRef,
112+
}
113+
114+
impl FFI_WindowUDF {
115+
unsafe fn inner(&self) -> &Arc<WindowUDF> {
116+
let private_data = self.private_data as *const WindowUDFPrivateData;
117+
&(*private_data).udf
118+
}
119+
120+
unsafe fn inner_schema(&self) -> SchemaRef {
121+
let private_data = self.private_data as *const WindowUDFPrivateData;
122+
Arc::clone(&(*private_data).schema)
123+
}
124+
}
125+
126+
unsafe extern "C" fn partition_evaluator_fn_wrapper(
127+
udwf: &FFI_WindowUDF,
128+
args: FFI_PartitionEvaluatorArgs,
129+
) -> RResult<FFI_PartitionEvaluator, RString> {
130+
let inner = udwf.inner();
131+
132+
let args = rresult_return!(ForeignPartitionEvaluatorArgs::try_from(args));
133+
134+
let evaluator = rresult_return!(inner.partition_evaluator_factory((&args).into()));
135+
136+
RResult::ROk(evaluator.into())
137+
}
138+
139+
unsafe extern "C" fn field_fn_wrapper(
140+
udwf: &FFI_WindowUDF,
141+
input_types: RVec<WrappedSchema>,
142+
display_name: RString,
143+
) -> RResult<WrappedSchema, RString> {
144+
let inner = udwf.inner();
145+
146+
let input_types = rresult_return!(rvec_wrapped_to_vec_datatype(&input_types));
147+
148+
let field = rresult_return!(
149+
inner.field(WindowUDFFieldArgs::new(&input_types, display_name.as_str()))
150+
);
151+
152+
let schema = Arc::new(Schema::new(vec![field]));
153+
154+
RResult::ROk(WrappedSchema::from(schema))
155+
}
156+
157+
unsafe extern "C" fn coerce_types_fn_wrapper(
158+
udwf: &FFI_WindowUDF,
159+
arg_types: RVec<WrappedSchema>,
160+
) -> RResult<RVec<WrappedSchema>, RString> {
161+
let inner = udwf.inner();
162+
163+
let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types));
164+
165+
let return_types = rresult_return!(data_types_with_window_udf(&arg_types, inner));
166+
167+
rresult!(vec_datatype_to_rvec_wrapped(&return_types))
168+
}
169+
170+
unsafe extern "C" fn schema_fn_wrapper(udwf: &FFI_WindowUDF) -> WrappedSchema {
171+
let schema = udwf.inner_schema();
172+
173+
schema.into()
174+
}
175+
176+
unsafe extern "C" fn release_fn_wrapper(udwf: &mut FFI_WindowUDF) {
177+
let private_data = Box::from_raw(udwf.private_data as *mut WindowUDFPrivateData);
178+
drop(private_data);
179+
}
180+
181+
unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF {
182+
// let private_data = udf.private_data as *const WindowUDFPrivateData;
183+
// let udf_data = &(*private_data);
184+
185+
// let private_data = Box::new(WindowUDFPrivateData {
186+
// udf: Arc::clone(&udf_data.udf),
187+
// });
188+
let private_data = Box::new(WindowUDFPrivateData {
189+
udf: Arc::clone(udwf.inner()),
190+
schema: udwf.inner_schema(),
191+
});
192+
193+
FFI_WindowUDF {
194+
name: udwf.name.clone(),
195+
aliases: udwf.aliases.clone(),
196+
volatility: udwf.volatility.clone(),
197+
partition_evaluator: partition_evaluator_fn_wrapper,
198+
sort_options: udwf.sort_options.clone(),
199+
schema: schema_fn_wrapper,
200+
coerce_types: coerce_types_fn_wrapper,
201+
field: field_fn_wrapper,
202+
clone: clone_fn_wrapper,
203+
release: release_fn_wrapper,
204+
private_data: Box::into_raw(private_data) as *mut c_void,
205+
}
206+
}
207+
208+
impl Clone for FFI_WindowUDF {
209+
fn clone(&self) -> Self {
210+
unsafe { (self.clone)(self) }
211+
}
212+
}
213+
214+
impl FFI_WindowUDF {
215+
pub fn new(udf: Arc<WindowUDF>, schema: SchemaRef) -> Self {
216+
let name = udf.name().into();
217+
let aliases = udf.aliases().iter().map(|a| a.to_owned().into()).collect();
218+
let volatility = udf.signature().volatility.into();
219+
let sort_options = udf.sort_options().map(|v| (&v).into()).into();
220+
221+
let private_data = Box::new(WindowUDFPrivateData { udf, schema });
222+
223+
Self {
224+
name,
225+
aliases,
226+
volatility,
227+
partition_evaluator: partition_evaluator_fn_wrapper,
228+
sort_options,
229+
schema: schema_fn_wrapper,
230+
coerce_types: coerce_types_fn_wrapper,
231+
field: field_fn_wrapper,
232+
clone: clone_fn_wrapper,
233+
release: release_fn_wrapper,
234+
private_data: Box::into_raw(private_data) as *mut c_void,
235+
}
236+
}
237+
}
238+
239+
impl Drop for FFI_WindowUDF {
240+
fn drop(&mut self) {
241+
unsafe { (self.release)(self) }
242+
}
243+
}
244+
245+
/// This struct is used to access a UDF provided by a foreign
246+
/// library across a FFI boundary.
247+
///
248+
/// The ForeignWindowUDF is to be used by the caller of the UDF, so it has
249+
/// no knowledge or access to the private data. All interaction with the UDF
250+
/// must occur through the functions defined in FFI_WindowUDF.
251+
#[derive(Debug)]
252+
pub struct ForeignWindowUDF {
253+
/// Function name, copied out of the FFI struct.
name: String,
254+
/// Function aliases, copied out of the FFI struct.
aliases: Vec<String>,
255+
/// Clone of the FFI struct that every call is forwarded through.
udf: FFI_WindowUDF,
256+
/// User-defined signature built from the FFI struct's volatility.
signature: Signature,
257+
}
258+
259+
// Manual assertion that `ForeignWindowUDF` may be sent and shared across threads.
// NOTE(review): presumably justified by the wrapped `FFI_WindowUDF` being
// declared `Send + Sync` — confirm.
unsafe impl Send for ForeignWindowUDF {}
260+
unsafe impl Sync for ForeignWindowUDF {}
261+
262+
impl TryFrom<&FFI_WindowUDF> for ForeignWindowUDF {
263+
type Error = DataFusionError;
264+
265+
fn try_from(udf: &FFI_WindowUDF) -> Result<Self, Self::Error> {
266+
let name = udf.name.to_owned().into();
267+
let signature = Signature::user_defined((&udf.volatility).into());
268+
269+
let aliases = udf.aliases.iter().map(|s| s.to_string()).collect();
270+
271+
Ok(Self {
272+
name,
273+
udf: udf.clone(),
274+
aliases,
275+
signature,
276+
})
277+
}
278+
}
279+
280+
impl WindowUDFImpl for ForeignWindowUDF {
281+
fn as_any(&self) -> &dyn std::any::Any {
282+
self
283+
}
284+
285+
fn name(&self) -> &str {
286+
&self.name
287+
}
288+
289+
fn signature(&self) -> &Signature {
290+
&self.signature
291+
}
292+
293+
fn aliases(&self) -> &[String] {
294+
&self.aliases
295+
}
296+
297+
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
298+
unsafe {
299+
let arg_types = vec_datatype_to_rvec_wrapped(arg_types)?;
300+
let result_types = df_result!((self.udf.coerce_types)(&self.udf, arg_types))?;
301+
Ok(rvec_wrapped_to_vec_datatype(&result_types)?)
302+
}
303+
}
304+
305+
fn partition_evaluator(
306+
&self,
307+
args: datafusion::logical_expr::function::PartitionEvaluatorArgs,
308+
) -> Result<Box<dyn PartitionEvaluator>> {
309+
let evaluator = unsafe {
310+
let schema = (self.udf.schema)(&self.udf);
311+
let args = FFI_PartitionEvaluatorArgs::new(args, schema.into())?;
312+
(self.udf.partition_evaluator)(&self.udf, args)
313+
};
314+
315+
df_result!(evaluator).map(|evaluator| {
316+
Box::new(ForeignPartitionEvaluator::from(evaluator))
317+
as Box<dyn PartitionEvaluator>
318+
})
319+
}
320+
321+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<arrow::datatypes::Field> {
322+
unsafe {
323+
let input_types = vec_datatype_to_rvec_wrapped(field_args.input_types())?;
324+
let schema = df_result!((self.udf.field)(
325+
&self.udf,
326+
input_types,
327+
field_args.name().into()
328+
))?;
329+
let schema: SchemaRef = schema.into();
330+
331+
match schema.fields().is_empty() {
332+
true => Err(DataFusionError::Execution(
333+
"Unable to retrieve field in WindowUDF via FFI".to_string(),
334+
)),
335+
false => Ok(schema.field(0).to_owned()),
336+
}
337+
}
338+
}
339+
340+
fn sort_options(&self) -> Option<SortOptions> {
341+
let options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref().into();
342+
options.map(|s| s.into())
343+
}
344+
}
345+
346+
/// FFI-stable mirror of arrow's [`SortOptions`], carrying the same two flags.
#[repr(C)]
347+
#[derive(Debug, StableAbi, Clone)]
348+
#[allow(non_camel_case_types)]
349+
pub struct FFI_SortOptions {
350+
/// Mirrors `SortOptions::descending`.
pub descending: bool,
351+
/// Mirrors `SortOptions::nulls_first`.
pub nulls_first: bool,
352+
}
353+
354+
impl From<&SortOptions> for FFI_SortOptions {
    /// Copies both flags from the arrow options into the FFI mirror.
    fn from(value: &SortOptions) -> Self {
        let descending = value.descending;
        let nulls_first = value.nulls_first;

        Self {
            descending,
            nulls_first,
        }
    }
}
362+
363+
impl From<&FFI_SortOptions> for SortOptions {
364+
fn from(value: &FFI_SortOptions) -> Self {
365+
Self {
366+
descending: value.descending,
367+
nulls_first: value.nulls_first,
368+
}
369+
}
370+
}
371+
372+
// TODO: this test module is empty; nothing in this file is exercised by tests yet.
#[cfg(test)]
373+
mod tests {}

0 commit comments

Comments
 (0)