Skip to content

Conversation

@izchen
Copy link
Contributor

@izchen izchen commented Oct 25, 2021

Currently, directly register iceberg api org.apache.iceberg.transforms.Bucket#apply as spark UDF in org.apache.iceberg.spark#IcebergSpark.

For byte, short, date, timestamp, and binary, Spark value of these types is different from iceberg‘s internal representation, which will cause a ClassCastException error.

SPARK TYPE SPARK VALUE ICEBERG TYPE ICEBERG VALUE
ByteType java.lang.Byte IntegerType java.lang.Integer
ShortType java.lang.Short IntegerType java.lang.Integer
DateType java.sql.Date DateType java.lang.Integer
TimestampType java.sql.Timestamp TimestampType.withZone java.lang.Long
BinaryType byte array BinaryType java.nio.ByteBuffer

We should first convert the spark value to iceberg's internal representation, and then use the converted value as the input of iceberg api org.apache.iceberg.spark#IcebergSpark.

In addition, add more ut in this PR to cover all spark atom types.

@github-actions github-actions bot added the spark label Oct 25, 2021
@izchen
Copy link
Contributor Author

izchen commented Oct 25, 2021

Related issue: #2838

@izchen
Copy link
Contributor Author

izchen commented Oct 25, 2021

@rdblue @RussellSpitzer , could you help to review this PR? :)

}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
public static Object convertAtomicValue(DataType atomic, Object object) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

I think you could handle short and byte types by updating the convert(Type, Object) method above:

      case INTEGER:
        return ((Number) object).intValue();

Then you wouldn't need a new method at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review. The method is not needed.

@rdblue
Copy link
Contributor

rdblue commented Oct 25, 2021

@izchen, could you fix just the latest version of Spark and then we'll port the changes to the other versions when after it is merged? That way we don't have a commit that affects all Spark versions.

zhangchen added 3 commits October 26, 2021 00:41
This reverts commit 6919b32.
This reverts commit 5b7201f.
@izchen
Copy link
Contributor Author

izchen commented Oct 25, 2021

@rdblue done, could you help to review this PR again?

Copy link
Contributor

@jackye1995 jackye1995 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me

@rdblue rdblue added this to the Java 0.12.1 Release milestone Oct 26, 2021
@rdblue
Copy link
Contributor

rdblue commented Oct 26, 2021

Looks good. Thanks, @izchen!

@rdblue rdblue merged commit 425641a into apache:master Oct 26, 2021
@izchen izchen deleted the fix_bucket_udf branch October 27, 2021 01:45
@izchen
Copy link
Contributor Author

izchen commented Oct 27, 2021

Thanks, @rdblue @jackye1995 !

rdblue pushed a commit that referenced this pull request Nov 18, 2021
rdblue pushed a commit that referenced this pull request Nov 18, 2021
rdblue pushed a commit that referenced this pull request Nov 18, 2021
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Nov 23, 2021
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Nov 23, 2021
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Nov 23, 2021
izchen added a commit to izchen/iceberg that referenced this pull request Dec 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants