Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LargeUtf8|Binary, Utf8|BinaryView, and Dictionary from ScalarValue #11978

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

notfilippo
Copy link

@notfilippo notfilippo commented Aug 14, 2024

This PR removes LargeUtf8|Binary, Utf8|BinaryView, and Dictionary from ScalarValue, following the discussion on #11513


Open questions

Top level ScalarValue cast

This change initially failed the select_arrow_cast test (see review comments for updates). The test fails because of an interaction of expression_simplifier + optimize_projections.

The query is: SELECT arrow_cast(1234, 'Float64') as f64, arrow_cast('foo', 'LargeUtf8') as large

  1. expression_simplifier correctly tries to evaluate_to_scalar the arrow_cast('foo', 'LargeUtf8') cast (here)
  2. this leads to the cast physical expression (here), but this is a no-op since it transforms Utf8('foo') (scalar) → LargeUtf8('foo') (array) → Utf8('foo') (scalar)
    • TODO: if this is what we want this should probably not execute at all
  3. This edit doesn't change the underlying schema of the LogicalPlan
  4. optimize_projections rewrites the Projection and updates the schema, seeing the Utf8('foo') it correctly assumes that the LogicalPlan's schema field for this value should have DataType == Utf8

This check is the one raising this error but I guess it should instead check if schema fields are logically equivalent to eachother. I'm not totally convinced this is the correct solution because it removes some guarantees that might be expected by users downstream. Happy to hear everyone's opinion on this.

---- optimizer::select_arrow_cast stdout ----
thread 'optimizer::select_arrow_cast' panicked at datafusion/core/tests/optimizer/mod.rs:118:30:
called `Result::unwrap()` on an `Err` value: Context("Optimizer rule 'optimize_projections' failed", Context("optimize_projections", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"f64\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"large\", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None, None], functional_dependencies: FunctionalDependencies { deps: [] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"f64\", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"large\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None, None], functional_dependencies: FunctionalDependencies { deps: [] } }")))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

arrow_typeof

https://github.com/apache/datafusion/blob/main/datafusion/functions/src/core/arrowtypeof.rs#L59-L72 uses column_value.data_type() to determine the type of the argument but this information is not really accurate. If the ColumnValue is a ScalarValue the data_type() will be "logical". e.g. arrow_typeof(arrow_cast('hello', 'Utf8View')) would yield Utf8.

Type info

Let's take this expr as an example a_function_that_takes_utf8_view(arrow_cast('test', 'Utf8View'))
the cast expression currently evaluates to a ColumnValue::Scalar(Utf8View("test")) and the function is happy to receive that. With this change the cast expression instead evaluates to ColumnValue::Scalar(Utf8("test")) (as ScalarValue::Utf8View doesn't exist it produces a logically equal value) and the cast expression data_type() returns Utf8View.

  • What would the function do?
  • How can we communicate to the parent expression in the expression tree that the column value needs to be of that particular type? (it should know because of the schema)

@github-actions github-actions bot added sql SQL Planner physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate substrait common Related to common crate proto Related to proto crate functions labels Aug 14, 2024
notfilippo

This comment was marked as resolved.

}

fn create_output_array(val: &ScalarValue, len: usize) -> Result<ArrayRef> {
// TODO(@notfilippo): should we reintroduce a way to encode as dictionaries?
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm missing some context for partitions so I'm not really sure if it's the right call to remove the Dictionary output array. IIUC it seems like it's very useful to save space and optimise columns added this way.

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Aug 14, 2024
02)--Filter: test.column1_utf8view = Utf8View("Andrew")
02)--Filter: test.column1_utf8view = CAST(Utf8("Andrew") AS Utf8View)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Casts are not simplified to retain the type information to fix the optimize_projection issue above.

datafusion/common/src/scalar/mod.rs Outdated Show resolved Hide resolved
let col_val = match phys_expr.evaluate(&self.input_batch) {
Ok(v) => v,
Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, expr),
};

// TODO(@notfilippo): a fix for the select_arrow_cast error
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check partially fixes the issue with select_arrow_cast.

datafusion/physical-plan/src/projection.rs Outdated Show resolved Hide resolved
datafusion/common/src/scalar/mod.rs Outdated Show resolved Hide resolved
datafusion/common/src/scalar/mod.rs Outdated Show resolved Hide resolved
@jayzhan211
Copy link
Contributor

jayzhan211 commented Aug 15, 2024

statement ok
create table test_source as values
    ('Andrew', 'X'),
    ('Xiangpeng', 'Xiangpeng'),
    ('Raphael', 'R'),
    (NULL, 'R')
;

statement ok
create table t as
SELECT
  arrow_cast(column1, 'Utf8') as column1_utf8,
  arrow_cast(column2, 'Utf8') as column2_utf8,
  arrow_cast(column1, 'LargeUtf8') as column1_large_utf8,
  arrow_cast(column2, 'LargeUtf8') as column2_large_utf8,
  arrow_cast(column1, 'Utf8View') as column1_utf8view,
  arrow_cast(column2, 'Utf8View') as column2_utf8view,
  arrow_cast(column1, 'Dictionary(Int32, Utf8)') as column1_dict,
  arrow_cast(column2, 'Dictionary(Int32, Utf8)') as column2_dict
FROM test_source;

query error DataFusion error: Arrow error: Invalid argument error: column types must match schema types, expected Utf8View but found Utf8 at column index 0
select min(column1_utf8view) from t;

I run this test and fail. It success on main branch.

I guess we need schema for converting ScalarValue to ArrayRef

Given that we have ScalarValue::Utf8, and we have StringView in schema. We can then get the corresponding StrginViewArray.

@notfilippo
Copy link
Author

Given that we have ScalarValue::Utf8, and we have StringView in schema. We can then get the corresponding StrginViewArray.

I feel like this is a little bit more complex because casting a scalar value to a logically equal type (arrow_cast('test', 'Dictionary(Int32, Utf8)')) is a no-op for all cases but one the cast is at the top level, which indicates that you actually want an array of that type. So we need a way to distinguish those two scenarios, which is why I've added this:

.and_then(|v| v.into_array_of_type(batch.num_rows(), field.data_type()))

But I'm not really sure about it...

@jayzhan211
Copy link
Contributor

Since you have no commit yet, so CI is not running.

You could run these command to pass CI
cargo test --test sqllogictests (slt files)
./dev/rust_lint.sh
cargo test --lib --tests --bins --features avro,json (rust test)

@notfilippo
Copy link
Author

Since you have no commit yet, so CI is not running.

You could run these command to pass CI cargo test --test sqllogictests (slt files) ./dev/rust_lint.sh cargo test --lib --tests --bins --features avro,json (rust test)

Thanks! Don't worry about turning it on as I can still get results from my fork.

@jayzhan211
Copy link
Contributor

jayzhan211 commented Aug 17, 2024

I wonder if we could do some sort of hybrid approach where ScalarValue still retains a DataType, type, but it isn't encoded in the variant

like

enum ScalarValue {
  ...
  /// A logical string value, with the specified data type
  ScalarValue::Utf8(String,  DataType) ,
}

That way you would still have the DataType for each ScalarValue 🤔

This should be the last resort if we don't have any other way to figure the actual target type to cast to. Otherwise, IMO this is just a code simplification (Still a nice step forward) but not the goal of decoupling types #11513 , since we still hide the physical type inside ScalarValue.

I think it is possible to get the DataType from Schema without it passed along the logical layer 🤔

@alamb
Copy link
Contributor

alamb commented Aug 17, 2024

Otherwise, IMO this is just a code simplification (Still a nice step forward) but not the goal of decoupling types #11513 ,

Yes I agree with this assesment

🤔 so that leaves us with the question of how to incrementally try and introduce logical types into logical plans 🤔

@notfilippo
Copy link
Author

notfilippo commented Aug 17, 2024

🤔 so that leaves us with the question of how to incrementally try and introduce logical types into logical plans 🤔

As @jayzhan211 says, the type information is still retained in the Schema, but it would be impractical to modify all expression orders to support the fact that the type is not stored in the ColumnarValue.

My discussion around Datum revolves around the fact that to retain physical type information maybe ColumnarValue enum should instead be:

enum ColumnarValue {
	Array(Arc<dyn Array>),
	Scalar(Arc<dyn Array>) // this is an array of len = 1
}

(edit: this possibility was already discussed #7353)

or that ScalarValue is kept as is and a LogicalScalarValue is introduced.

I think that the main problem is that scalar values in this change are purely logical. Instead, if in the future we introduce logical types for arrays, we would have a physical type that is distinct from its logical representation and that we can use during physical execution (e.g. that can be cast to another physical type without losing its type information).

@jayzhan211
Copy link
Contributor

jayzhan211 commented Aug 17, 2024

Yeah, I think Datum is actually what we need. After casting the array, we don't need to convert it back to ScalarValue, it has additional conversion cost and the physical type information is lost too. Ideally we could replace ColumnarValue with Datum, but it seems the change is quite large.

We could try replacing ScalarValue with Scalar.

enum ColumnarValue {
	Array(Arc<dyn Array>),
	Scalar(arrow_array::Scalar)
}

#7353 is talking about changing something like ScalarValue::Int64(i64) to ScalarValue::Int64(arrow_array::Scalar), I don't think we require the change for this PR

Upd: If there is performance concern, a conservative change is to keep the ScalarValue.It depends on how we rely on ColumnarValue::Scalar and whether they are performance critical. I'm not sure about it too 🤔

enum ColumnarValue {
	Array(Arc<dyn Array>),
        ScalarArray(arrow_array::Scalar)
	Scalar(ScalarValue)
}

@github-actions github-actions bot added the logical-expr Logical plan and expressions label Aug 19, 2024
@notfilippo
Copy link
Author

I've pushed a fairly big experiment. I've tried to change ColumnarValue to

#[derive(Clone, Debug)]
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(ScalarValue),
    Scalar(Scalar),
}

#[derive(Clone, Debug)]
pub struct Scalar {
    value: ScalarValue,
    data_type: DataType,
}

which follows the approache that was discussed with @jayzhan211 in the comments above. I've opted for this hybrid solution to retain most of the flexibility of the original ColumnarValue and I'm mostly satisfied with how it turned out. Curious to hear your thoughts @jayzhan211 and @alamb

@jayzhan211
Copy link
Contributor

jayzhan211 commented Aug 20, 2024

It seems the trick you did is to get the first index of ArrayRef (instead of keeping it as arrow::Scalar) as ScalarValue but we still ends up require DataType to keep the type information.

However, I think we could move on with this approach, we could figure out if there is better approach later on

@notfilippo
Copy link
Author

notfilippo commented Aug 20, 2024

I'm happy to report that I've got most sqllogictests to run successfully (albeit there is still the issue pointed out by @alamb, which i plan to address after I've got all tests passing). The only errors I'm seeing are the following:

Aggregates using ScalarValues as state

Aggregates use ScalarValue to represent state and evaluate to a result. Should I look into restricting their return type to the subset of types which can be represented by ScalarValues?

External error: query failed: DataFusion error: Arrow error: Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 3
[SQL] SELECT
  min(utf8),
  max(utf8),
  count(utf8),
  min(largeutf8),
  max(largeutf8),
  count(largeutf8)
FROM t
at test_files/aggregate.slt:4104

Weird error I don't quite understand

Removing this query doesn't yield any other error in the slt file. I don't have any other clue and I'm not sure where to start 🤷 .

External error: query result mismatch:
[SQL] SELECT
  count,
  LAG(timestamp, 1) OVER (ORDER BY timestamp),
  arrow_typeof(LAG(timestamp, 1) OVER (ORDER BY timestamp))
FROM timestamp_with_tz
LIMIT 10;
[Diff] (-expected|+actual)
    0 NULL Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   4 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   12 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   2 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
    14 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
-   0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   5 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
+   1 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
at test_files/parquet.slt:255

@jayzhan211
Copy link
Contributor

jayzhan211 commented Aug 20, 2024

Aggregates using ScalarValues as state

The issue is that we have only ScalarValue but no DataType to get the actual ArrayRef back.
One of the solution is to store DataType in MaxAccumulator and call to_array_of_type to return ArrayRef as the output of evaluate, maybe evaluate_as_array to avoid breaking existing function.

Weird error I don't quite understand

I fail to reproduce the error 😕

UIpd: It seems there is only one place that calls Accumulator::evaluate and transform it to ArrayRef, I think we can just change the return value to ArrayRef for Accumulator::evaluate

.map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array()))

@notfilippo
Copy link
Author

UIpd: It seems there is only one place that calls Accumulator::evaluate and transform it to ArrayRef, I think we can just change the return value to ArrayRef for Accumulator::evaluate

It seems like this solution is not that easy as the state needs to also be accounted for, but it's definitely a good start!

@notfilippo notfilippo marked this pull request as ready for review August 23, 2024 09:36
@notfilippo
Copy link
Author

notfilippo commented Aug 23, 2024

Marking as "ready for review" because all tests pass on my end. Some casting issues still remain that I'll look into soon but in the meantime I'm looking forward to some feedback on this huge change ❤️

@jayzhan211
Copy link
Contributor

Will evaluate_as_scalar replaces evaluate and state_as_scalars replaces state? If it is, then it looks good to me

@notfilippo
Copy link
Author

Will evaluate_as_scalar replaces evaluate and state_as_scalars replaces state? If it is, then it looks good to me

Yes that's the plan!

@notfilippo
Copy link
Author

I'm back from vacation and I've rebased my PR to the latest upstream.

pub struct Scalar {
value: ScalarValue,
data_type: DataType,
}
Copy link
Contributor

@jayzhan211 jayzhan211 Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is the main change of this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate functions logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions proto Related to proto crate sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants