[Proposal] Decouple logical from physical types #11513
Comments
Thank you @notfilippo -- I think this proposal is well thought out and makes a lot of sense to me. If we were to implement it I think the benefits for DataFusion would be enormous. From my perspective, the use of Arrow types in logical planning in DataFusion (e.g. type coercion) has always been a bit of an impedance mismatch. When there were just a few variants this was manageable, but as Arrow evolves (e.g. to include new encodings) the mismatch grows.
I think breaking changes to the API are inevitable, but I think we can manage the pain through careful API thought and deprecation. More thoughts to follow.
Thoughts on the technical details
Since … another possibility would be to migrate … I think the biggest challenge of this project will be managing the transition from Arrow DataType to LogicalTypes. You have done a great job articulating the places that would need to be changed. I think we could manage the transition over time by, for example, deprecating APIs gradually (but leaving UDF APIs in terms of DataType).
Another possibility would be to make a function like into_one_of that gets the input as one of a set of named types, casting if necessary:

let input: &ColumnarValue = &args[0];
// get input as one of the named types, casting if necessary
let input = input.into_one_of(&[DataType::Utf8View, DataType::Utf8])?;

match input.data_type() {
    DataType::Utf8View => { /* specialized impl for StringViewArray */ },
    DataType::Utf8 => { /* specialized impl for StringArray */ },
    _ => unreachable!(),
}
Another thing we could do is relax the requirement that the …
Initially, I planned to propose repurposing the DFSchema for this change, but I decided against it (at least for the first draft) because of this open question that I've noted above:
This issue originates from the fact that TableSource and TableProvider (the "native" sources of schemas) would have to return a DFSchema to include the …
This proposal makes sense to me. Thanks for driving this @notfilippo.
I was thinking that a … So, for example, if a TableProvider said it returned a …. I haven't looked at the code so there may be some reason this wouldn't work.
The main challenge I see is that this will be a very large project. The high-level premise of separating logical and physical types makes sense from a first-principles POV, but the cost/benefit analysis at this point is not very clear to me. The latter will probably depend on the implementation strategy and the exact architecture we adopt, so I think we should try out different ideas and approaches before committing to a strategy/architecture and accepting/rejecting/deferring the idea based on a possibly premature choice.
AFAICT …
I was thinking that there is no fundamental difference between using … and …. Thus I was thinking we could simplify the physical implementations by not having different codepaths, and this would also give us some first-hand experience in how mapping Logical --> Physical types might look.
This generally looks good. I agree that starting with making … One small nit: I don't think I would lump together …
Is it possible to have the mapping of the relation between arrow's DataType and LogicalType? As long as there is only one …
We can then easily get the …. Is there any type mapping that can't be done without …?
I think …
This could be interesting to try -- both in terms of whether we can somehow simplify …. It would also be a relatively contained draft, and if the result is satisfactory, it can possibly get merged even if the full proposal is for some reason not yet doable in our current state.
I agree with this approach. I'll dive deeper next week and report back my findings.
Why don't we have … or something like:

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum LogicalType {
Primitive(LogicalPrimitiveType),
Date,
Time32(TimeUnit),
Time64(TimeUnit),
Timestamp(TimeUnit, Option<Arc<str>>),
Duration(TimeUnit),
Interval(IntervalUnit),
List(LogicalPhysicalFieldRef),
Struct(LogicalPhysicalFields),
Dict(LogicalPrimitiveType),
Map(LogicalPhysicalFieldRef, bool),
Decimal128(u8, i8),
Decimal256(u8, i8),
Union(LogicalUnionFields, UnionMode), // TODO: extension signatures?
// UserDefinedType
}
pub enum LogicalPrimitiveType {
Null,
Int8,
Int16,
Int32,
Int64,
UInt8,
UInt16,
UInt32,
UInt64,
Boolean,
Float16,
Float32,
Float64,
Utf8,
Binary,
}
That is an interesting idea. I don't fully understand the implications, but if we can figure out how to avoid having to switch from …
I like the idea of separating logical types from arrow types, but it would be great to understand the exact consequences. The SQL frontend should have "very logical types". For example, we don't need …. Then, from a DF-as-a-library perspective, the physical representation becomes important.
I am concerned that deriving support for a logical type from the support for a physical type is actually a slippery slope. Let's consider an example. Let's assume I have a my_duration(unit) logical type that is physically represented as a 64-bit integer. Now, I have an add_one() function that can take 64-bit integer values and add +1 to them. The +1 operation is a perfectly valid operation for i64 -- it's valid for the SQL long/bigint type. It's also valid for my_duration(nanos), but it's not valid for my_duration(micros), since it produces an unaligned value (not divisible by 1000).
I found a previous discussion and will reference it here: #7421
I have a question: how do users specify the underlying physical types? FYI, ClickHouse exposes physical types to users like this.
The physical type in DataFusion is the Arrow type.
Apologies for the unclear description. I meant to ask: if we opt for LogicalType, how do users then specify the physical types? This is important because in certain scenarios only the users can determine the most suitable data types.
@findepi -- If I understand correctly, your example is about the logical type …. Functions that have specialised implementations for … Hypothetically, if …
@doki23 -- Through the use of a custom implementation of the TypeRelation trait. Example:

#[derive(Debug)]
struct MyString {
logical: LogicalType,
physical: DataType,
}
impl Default for MyString {
fn default() -> Self {
Self {
logical: LogicalType::Utf8,
physical: DataType::new_list(DataType::UInt8, false),
}
}
}
impl TypeRelation for MyString {
fn logical(&self) -> &LogicalType {
&self.logical
}
fn physical(&self) -> &DataType {
&self.physical
}
// ...
}
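A hypothetical usage sketch (LogicalPhysicalField and its constructor are assumed names from this proposal, not an existing API):

use std::sync::Arc;

// A field whose logical type is Utf8 but whose physical layout is List<UInt8>.
let my_string = Arc::new(MyString::default());
let field = LogicalPhysicalField::new("s", my_string, /* nullable */ true);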
@wjones127 -- Noted! I was also hesitant about including the Fixed* variants in the base ones, and your explanation makes sense to me. While I agree that having a fixed-length constraint for a list of logical types makes sense, I am not convinced about FixedSizeBinary. What would be the use case? Do you have some example in mind?
Example uses of fixed size binary are representing arrays of data types not supported in Arrow, such as bf16 or i128 (UUID). A value with a different number of bytes would be considered invalid.
Makes sense! Thanks for explaining – I'll edit the proposal.
@notfilippo Are you suggesting that to utilize the …

CREATE TABLE lc_t
(
    `id` UInt16,
    `strings` LowCardinality(String)
)

Using LogicalType in the SQL layer does not necessarily mean that users cannot declare the underlying datatype. It is possible to provide a special SQL syntax for this purpose.
@notfilippo

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference.
    inner: SchemaRef,
    /// Optional qualifiers for each column in this schema. In the same order as
    /// the `self.inner.fields()`
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}

We have the Arrow Schema in DFSchema; if we could derive all the LogicalTypes from the Arrow types it carries, the breaking change could be really small.

pub enum LogicalType { // DataFusion's primitive types
Null,
Int8,
Int16,
Int32,
Int64,
UInt8,
UInt16,
UInt32,
UInt64,
Boolean,
Float16,
Float32,
Float64,
Utf8,
Binary,
Date,
Time,
List(Box<LogicalType>),
FixedSizeList(Box<LogicalType>, usize),
// ... and more
// No Arrow DataType here. Ideally only basic types that cannot be constructed from any other type.
// Any other user defined type could be built on top of DataFusion's primitive types.
}
impl From<&DataType> for LogicalType {
    fn from(data_type: &DataType) -> Self {
        match data_type {
            DataType::Utf8View => LogicalType::Utf8,
            DataType::Utf8 => LogicalType::Utf8,
            // ... map the remaining variants similarly
            _ => todo!(),
        }
    }
}

This is the rough idea. Given you have already been playing around with the code, maybe you know whether this makes sense.
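A quick usage sketch of the conversion above (assuming the enum and impl as sketched here, not an existing API):

let logical: LogicalType = (&DataType::Utf8View).into();
assert_eq!(logical, LogicalType::Utf8);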
@jayzhan211 & @doki23 -- I'll try to answer both with a single comment: the proposal includes (you can see it where LogicalPhysicalType is defined) an implementation of the conversion:

let schema = Schema::new(/* ... */);
let logical_physical_schema: LogicalPhysicalSchema = schema.into();
// Suppose that DFSchema is edited to support LogicalPhysicalSchemaRef instead of SchemaRef
let df_schema = DFSchema::try_from_qualified_schema("t1", &logical_physical_schema).unwrap();

The conversion also works for a single DataType:

let string_view = DataType::Utf8View;
let logical_string_view: LogicalPhysicalType = string_view.into();
// logical_string_view will be:
// LogicalPhysicalType(Arc(NativeType { logical: Utf8, physical: Utf8View }))

For user defined types, instead of converting an arrow::Schema into a LogicalPhysicalSchema, you would directly define a LogicalPhysicalSchema where you can use your own arrow-compatible LogicalPhysicalTypes.
Instead of … For example, …
Oh, now I understand. It makes sense to do it this way, but it would lose the ability of potentially adding new arrow-compatible physical encodings (e.g. logical …).
Great, a step closer to it. I think we could consider this kind of case as …. The most important thing is that we could keep …
Makes sense and I like this approach. The only thing I'm still not understanding is how a type source (like a TableProvider) would communicate to the logical layer that the …
This would be tremendously beneficial to this proposal :)
I think we could register the type mapping in the session state, like how we register functions. We define the relation from Arrow DataType to LogicalType only. We could then look up the type relation we have and figure out the mapping:

pub enum LogicalType {
Utf8,
Float,
FixedSizeList(Box<LogicalType>, usize),
}
pub trait TypeRelation {
    fn as_any(&self) -> &dyn Any;
    fn get_logical_type(&self, _data_type: &DataType) -> Result<LogicalType> {
        _not_impl_err!("nyi")
    }
}
pub struct ListOfU8AsStringType {}
impl TypeRelation for ListOfU8AsStringType {
fn get_logical_type(&self, data_type: &DataType) -> Result<LogicalType> {
match data_type {
            DataType::List(field) if field.data_type() == &DataType::UInt8 => {
Ok(LogicalType::Utf8)
}
_ => _not_impl_err!("nyi")
}
}
}
pub struct GeoType {}
impl TypeRelation for GeoType {
fn get_logical_type(&self, data_type: &DataType) -> Result<LogicalType> {
match data_type {
DataType::FixedSizeList(field, 2) if field.data_type() == &DataType::Float32 => {
Ok(LogicalType::FixedSizeList(Box::new(LogicalType::Float), 2))
}
_ => _not_impl_err!("nyi")
}
}
}
pub struct DatafusionBuiltinType {}
impl TypeRelation for DatafusionBuiltinType {
fn get_logical_type(&self, data_type: &DataType) -> Result<LogicalType> {
match data_type {
DataType::Utf8View => {
Ok(LogicalType::Utf8)
}
_ => _not_impl_err!("nyi")
}
}
}
// Ideally, functions in the logical layer care about LogicalType only, and
// functions in the physical layer care about the Arrow DataType only.
fn any_function(udt: Arc<dyn TypeRelation>, data_type: DataType) -> Result<()> {
    let logical_type = udt.get_logical_type(&data_type)?;
    match logical_type {
        LogicalType::FixedSizeList(_inner_type, _size) => Ok(()),
        _ => todo!(),
    }
}
impl SessionState {
    fn register_type_relation(&self) -> Result<()> {
        // Similar to the idea of registering functions, so we can have
        // user-defined type relations (mappings).
        self.register(Arc::new(DatafusionBuiltinType {}))?;
        self.register(Arc::new(ListOfU8AsStringType {}))?;
        self.register(Arc::new(GeoType {}))?;
        Ok(())
    }
}
impl TableProvider for MyTable {
fn some_func(&self, state: CatalogSessions) {
let udt: Arc<dyn TypeRelation> = self.get_type_relation();
let dfschema = DFSchema::empty();
        let expr: Expr = /* ... */;
        let data_type: DataType = expr.get_type(&dfschema).unwrap();
        let _ = any_function(udt, data_type);
    }
}
This past week I've started some work on transitioning Scalar Types to being strictly logical. Most of the refactoring work is done and now I'm reworking some of the casting logic to make execution work again. Will report back soon!
Sorry, late to the party. Very nice and well thought out proposal. But I'm a little confused about the role of …
I'm 👍 👍 👍 for this. I'd like to use a logical type (backed by variable length binary) to encode JSON-like data in a more efficient format, but make sure it's still displayed as JSON or some other human-readable format.
👋 I opened a draft PR ^ to make ScalarValue logical. I have a bunch of open questions that I would be very happy to get feedback on.
Abstract
Logical types are an abstract representation of data that emphasises structure without regard for physical implementation, while physical types are tangible representations that describe how data will be stored, accessed, and retrieved.
Currently the type model in DataFusion, both in logical and physical plans, relies purely on arrow's DataType enum. But whilst this choice makes sense for its physical execution engine (DataTypes map 1:1 with the physical array representation, defining implicit rules for storage, access, and retrieval), it has flaws when dealing with its logical plans. This is due to the fact that some logically equivalent array types in the Arrow format have very different DataTypes -- for example, a logical string array can be represented in an Arrow array of DataType Utf8, Dictionary(Utf8), RunEndEncoded(Utf8), and StringView (without mentioning the different index types that can be specified for dictionaries and REE arrays).

This proposal evaluates possible solutions for decoupling the notion of types in DataFusion's logical plans from DataTypes, evaluating their impact on DataFusion itself and on downstream crates.
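A minimal sketch of the mismatch using the arrow crate (array contents chosen arbitrarily): the two arrays below hold the same logical strings but report different DataTypes.

use arrow::array::{Array, DictionaryArray, StringArray};
use arrow::datatypes::{DataType, Int32Type};

let plain = StringArray::from(vec!["a", "b", "a"]);
let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

// Logically equivalent data, physically different types:
assert_eq!(plain.data_type(), &DataType::Utf8);
assert!(matches!(dict.data_type(), DataType::Dictionary(_, _)));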
Goals
Proposal
Defining a logical type
To define the list of logical types we must first take a look at the physical representation of the engine: the Arrow columnar format. DataTypes are the physical types of the DataFusion engine, and they define the storage and access patterns for buffers in the Arrow format.
Looking at a list of the possible DataTypes it's clear that while some map 1:1 with their logical representation, others also specify information about the encoding (e.g. Large*, FixedSize*, Dictionary, RunEndEncoded, ...). The latter must be consolidated into what they represent, discarding the encoding information; in general, types that can store different ranges of values should be different logical types. (ref)

What follows is a list of DataTypes and how they would map to their respective logical types following the rules above:
DataType → Logical type

Null → Null
Boolean → Boolean
Int8 → Int8
Int16 → Int16
Int32 → Int32
Int64 → Int64
UInt8 → UInt8
UInt16 → UInt16
UInt32 → UInt32
UInt64 → UInt64
Float16 → Float16
Float32 → Float32
Float64 → Float64
Timestamp(unit, tz) → Timestamp(unit, tz)
Date32 → Date
Date64 → Date (Date64 doesn't actually provide more precision. (docs))
Time32(unit) → Time32(unit)
Time64(unit) → Time64(unit)
Duration(unit) → Duration(unit)
Interval(unit) → Interval(unit)
Binary → Binary
FixedSizeBinary(size) → Binary
LargeBinary → Binary
BinaryView → Binary
Utf8 → Utf8
LargeUtf8 → Utf8
Utf8View → Utf8
List(field) → List(field)
ListView(field) → List(field)
FixedSizeList(field, size) → List(field)
LargeList(field) → List(field)
LargeListView(field) → List(field)
Struct(fields) → Struct(fields)
Union(fields, mode) → Union(fields)
Dictionary(index_type, data_type) → data_type, converted to logical type
Decimal128(precision, scale) → Decimal128(precision, scale)
Decimal256(precision, scale) → Decimal256(precision, scale)
Map(fields, sorted) → Map(fields, sorted)
RunEndEncoded(run_ends_type, data_type) → data_type, converted to logical type

User defined types
User defined physical types
The Arrow columnar format provides guidelines to define extension types through the composition of native DataTypes and custom metadata in fields. Since this proposal already includes a mapping from DataType to logical type, we could extend it to support user defined types (through extension types) which would map to a known logical type.
For example an extension type with the DataType of List(UInt8) and a custom metadata field {'ARROW:extension:name': 'myorg.mystring'} could have a logical type of Utf8.
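A minimal sketch of such a field using the arrow crate (the extension name comes from the example above; the mapping to the Utf8 logical type would live in this proposal's type-relation layer):

use std::collections::HashMap;
use arrow::datatypes::{DataType, Field};

// Physically a List<UInt8>, tagged as the extension type myorg.mystring.
let field = Field::new("s", DataType::new_list(DataType::UInt8, false), false)
    .with_metadata(HashMap::from([(
        "ARROW:extension:name".to_string(),
        "myorg.mystring".to_string(),
    )]));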
User defined logical types
Arrow extension types can also be used to extend the list of supported logical types. An additional logical type called Extension could be introduced. This extension type would contain a structure detailing its logical type and the extension type metadata.

Boundaries of logical and physical types
In plans and expressions
As the prefix suggests, logical types should be used exclusively in logical plans (LogicalPlan and Expr) while physical types should be used exclusively in physical plans (ExecutionPlan and PhysicalExpr). This would enable logical plans to be purely logical, without worrying about underlying encodings.
Exprs in logical plans would need to represent their resulting value as logical types through the trait method ExprSchemable::get_type, which would need to return a logical type instead.
In functions
ScalarUDF, WindowUDF, and AggregateUDF all define their Signatures through the use of DataTypes. Function arguments are currently validated against signatures through type coercion during logical planning. With logical types, Signatures would be expressed without the need to specify the underlying encoding. This would simplify the type coercion rules, removing the need to traverse dictionaries and handle different containers, focusing instead on explicit logical rules (e.g. all logical types can be coerced to Utf8).

During execution the function receives a slice of ColumnarValues that is guaranteed to match the signature. Being strictly a physical operation, the function will have to deal with physical types. The ColumnarValue enum could be extended so that functions could choose to provide their own optimised implementation for a subset of physical types and then fall back to a generic implementation that materialises the argument to a known physical type. This would potentially allow native functions to support user defined physical types that map to known logical types.
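As a hedged sketch of what this could look like (Signature::exact and Volatility exist in DataFusion today, but accepting LogicalType here is this proposal's hypothetical change, not the current API):

// Accepts any physical string encoding (Utf8, LargeUtf8, Utf8View,
// Dictionary(Utf8), ...) because the signature is purely logical.
let signature = Signature::exact(vec![LogicalType::Utf8], Volatility::Immutable);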
In substrait
The datafusion_substrait crate provides helper functions to enable interoperability between substrait plans and datafusion's plans. While some effort has been made to support converting from / to DataTypes via type_variation_reference (example here), dictionaries are not supported as both literal types and cast types, leading to potential errors when trying to encode a valid LogicalPlan into a substrait plan. The usage of logical types would enable a more seamless transition between DataFusion's native logical plan and substrait.

Keeping track of the physical type
While logical types simplify the list of possible types that can be handled during logical planning, the relation to their underlying physical representation needs to be accounted for when transforming the logical plan into nested ExecutionPlan and PhysicalExpr, which will dictate how the query executes.
This proposal introduces a new trait that represents the link between a logical type and its underlying physical representation:
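A minimal sketch of what the trait could look like, reconstructed from the examples earlier in this thread (the exact definition is an assumption):

use arrow::datatypes::DataType;

pub trait TypeRelation: std::fmt::Debug + Send + Sync {
    /// The logical type exposed to logical plans.
    fn logical(&self) -> &LogicalType;
    /// The physical Arrow DataType used at execution time.
    fn physical(&self) -> &DataType;
}

/// Default relation pairing a standard Arrow DataType with its logical type.
#[derive(Debug)]
pub struct NativeType {
    logical: LogicalType,
    physical: DataType,
}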
While NativeType would be primarily used for standard DataTypes and their logical relation, TypeRelation is defined to provide support for user defined physical types.

What follows is an exploration of the areas in which LogicalPhysicalType would need to be introduced:

A new type of Schema
To support the creation of LogicalPhysicalTypes a new schema must be introduced, which can be consumed as either a logical schema or used to access the underlying physical representation. Currently DFSchema is used throughout DataFusion as a thin wrapper for Arrow's native Schema in order to qualify fields originating from potentially different tables. This proposal suggests decoupling the DFSchema from its underlying Schema and instead adopting a new Schema-compatible structure (LogicalPhysicalSchema) with DataTypes replaced by LogicalPhysicalTypes. This would also mean the introduction of a new Field-compatible structure (LogicalPhysicalField) which also supports LogicalPhysicalType instead of Arrow's native Field DataType.

DFSchema would be used by DataFusion's planning and execution engine to derive either logical or physical type information of each field. It should retain the current interoperability with Schema (and additionally the new LogicalPhysicalSchema), allowing easy Into & From conversion.

Type sources
Types in plans are sourced through Arrow's native Schema returned by implementations of TableSource / TableProvider, variable DataTypes returned by VarProvider, and ScalarValue. To allow the definition of custom LogicalPhysicalTypes, these type sources should be edited to return LogicalPhysicalSchema / LogicalPhysicalType.

Tables
For tables, a non-breaking way of editing the trait to support LogicalPhysicalSchema could be:

- Adding logical_physical_schema() -> LogicalPhysicalSchema: this method's default implementation calls schema() and converts it to LogicalPhysicalSchema without introducing any custom LogicalPhysicalType. Implementers are free to override this method and add custom LogicalPhysicalTypes.
- Editing the schema() method to return impl Into<LogicalPhysicalSchema>. (A sketch of the first option follows.)
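A sketch of the first option (names follow the proposal; the schema conversion via From/Into is assumed, and the trait is shown standalone rather than as the real TableProvider):

use arrow::datatypes::SchemaRef;

pub trait TableProviderExt {
    fn schema(&self) -> SchemaRef;

    /// Default implementation: convert the Arrow schema without introducing
    /// any custom LogicalPhysicalType. Implementers may override this to
    /// attach their own type relations.
    fn logical_physical_schema(&self) -> LogicalPhysicalSchema {
        self.schema().as_ref().clone().into()
    }
}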
VarProvider

VarProvider needs to be edited in order to return a LogicalPhysicalType when getting the type of the variable, while the actual variable can very well remain a ScalarValue.

ScalarValue
ScalarValue should be wrapped in order to have a way of retrieving both its logical and its underlying physical type. When reasoning about logical plans it should be treated as its logical type, while its physical properties should be accessed exclusively by the physical plan.
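A minimal sketch of the wrapper idea (the name LogicalScalarValue and its API are assumptions; ScalarValue::data_type() is today's DataFusion API):

use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

pub struct LogicalScalarValue {
    value: ScalarValue,
    logical: LogicalType,
}

impl LogicalScalarValue {
    /// Logical plans reason about this view only.
    pub fn logical_type(&self) -> &LogicalType {
        &self.logical
    }

    /// Physical properties are accessed exclusively by the physical plan.
    pub fn physical_type(&self) -> DataType {
        self.value.data_type()
    }
}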
Physical execution
For physical planning and execution, much like the invocation of UDFs, ExecutionPlan and PhysicalExpr must also be granted access to the LogicalPhysicalType in order to be capable of performing optimised execution for a subset of supported physical types and then falling back to a generic implementation that materialises other types to a known physical type. This can be achieved by substituting the use of DataType and Schema with, respectively, LogicalPhysicalType and LogicalPhysicalSchema.

Impact on downstream dependencies
Care must be taken not to introduce breaking changes for downstream crates and dependencies that build on top of DataFusion.
The most impactful changes introduced by this proposal are the LogicalPhysicalType, LogicalPhysicalSchema and LogicalPhysicalField types. These structures would replace most of the mentions of DataType, Schema and Field in the DataFusion codebase. Type sources (TableProvider / TableSource, VarProvider, and ScalarValue) and Logical / ExecutionPlan nodes would be greatly affected by this change. This effect can be mitigated by providing good Into & From implementations for the new types and by editing existing function arguments and return types as impl Into<LogicalPhysical*>, but it will still break a lot of things.

Case study: datafusion-comet
datafusion-comet is a high-performance accelerator for Apache Spark, built on top of DataFusion. A fork of the project containing changes from this proposal currently compiles without modifications. As more features in this proposal are implemented, namely UDFs' Logical Signature, some refactoring might be required (e.g. for CometScalarFunction and other functions defined in the codebase). Refer to the draft's TODOs to see what's missing.

Non-goals: topics that might be interesting to dive into
While not the primary goals of this proposal, here are some interesting topics that could be explored in the future:
RecordBatches with same logical type but different physical types
Integrating LogicalPhysicalSchema into DataFusion's RecordBatches, streamed from one ExecutionPlan to the other, could be an interesting approach to support the possibility of two record batches having logically equivalent schemas with different underlying physical types. This could be useful in situations where data, stored in multiple pages mapping 1:1 with RecordBatches, is encoded with different strategies based on the density of the rows and the cardinality of the values.

Draft implementation
The draft work can be tracked via #11160.
To Do