
Conversation

@XuQianJin-Stars
Contributor

@XuQianJin-Stars XuQianJin-Stars commented Jan 30, 2022

Tips

What is the purpose of the pull request

Support querying a table as of a savepoint
link: HUDI-3221
Support Spark Version:

version support
2.4.x No
3.1.2 No
3.2.0 Yes
3.0.x No

Brief change log

  1. HoodieSpark3_2ExtendedSqlAstBuilder is forked from org.apache.spark.sql.catalyst.parser.AstBuilder, with comments marking the copied code, plus an additional withTimeTravel method.
  2. SqlBase.g4 is forked from the Spark parser, with comments marking the copied grammar, adding the Spark SQL syntax TIMESTAMP AS OF and VERSION AS OF.
  3. UT tests: TestTimeTravelParser, TestTimeTravelTable.
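For illustration, the added syntax can be exercised from a Spark 3.2 shell roughly as follows (a minimal sketch; the table name and the timestamp/version values are placeholders, not real commit instants):

```scala
// Sketch only: assumes a Spark 3.2 session with the Hudi Spark bundle on the
// classpath and an existing Hudi table named `hudi_tbl` (placeholder name).

// Query the table as of a point in time (TIMESTAMP AS OF).
spark.sql("select * from hudi_tbl timestamp as of '2022-01-30 12:00:00'").show()

// Query the table as of a specific commit instant (VERSION AS OF).
spark.sql("select * from hudi_tbl version as of '20220130120000000'").show()
```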

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@nsivabalan nsivabalan added the priority:critical Production degraded; pipelines stalled label Jan 31, 2022
@nsivabalan nsivabalan requested a review from xushiyan February 3, 2022 22:58
@nsivabalan nsivabalan added priority:blocker Production down; release blocker and removed priority:critical Production degraded; pipelines stalled labels Feb 8, 2022
@fedsp

fedsp commented Feb 9, 2022

@xushiyan, is there anything I can do from my side to help you with this PR?
I can help test using real datasets

@XuQianJin-Stars XuQianJin-Stars force-pushed the HUDI-3221 branch 2 times, most recently from 15931c0 to c9b323f on February 15, 2022 11:17
@nsivabalan nsivabalan removed their assignment Feb 16, 2022
@xushiyan
Member

> @xushiyan, is there anything I can do from my side to help you with this PR? I can help test using real datasets

hey @fedsp, thank you for offering to help. Feel free to build this branch against Spark 3.2 (using the maven profile spark3), test with your datasets, and post any results or feedback. That'd be of great help!

@fedsp

fedsp commented Feb 23, 2022

@xushiyan great! I will do this by today. I'm planning to use it on AWS Glue, which unfortunately only offers Spark 3.1 today. I know the Hudi documentation explicitly says the supported Spark version is 3.2, but is there any chance it will work on 3.1?

@XuQianJin-Stars
Contributor Author

> @xushiyan great! I will do this by today. I'm planning to use it on AWS Glue, which unfortunately only offers Spark 3.1 today. I know the Hudi documentation explicitly says the supported Spark version is 3.2, but is there any chance it will work on 3.1?

hi @fedsp, here is a multi-version PR you can test: #4885

@xushiyan
Member

@YannByron can you help review this too? thanks

@YannByron
Contributor

> @YannByron can you help review this too? thanks

As discussed with @XuQianJin-Stars, I'd prefer to make a separate PR based on a separate Spark environment using https://issues.apache.org/jira/browse/SPARK-37219, and we can merge that into hudi master once Spark 3.3 is released.

Member

@xushiyan xushiyan left a comment

it'll be helpful to keep notes in the hudi-spark-datasource README.md and make it clear which files are copied over from Spark with modifications, and are ready to be removed after upgrading to Spark 3.3.

Comment on lines 160 to 178
<version>${spark3.version}</version>
<version>${spark3.2.version}</version>
Member

can you clarify why this change? what if a user needs to build the project with the spark 3.1.x profile?

Contributor Author

> can you clarify why this change? what if a user needs to build the project with the spark 3.1.x profile?

The spark3.1.x profile uses <hudi.spark.module>hudi-spark3.1.x</hudi.spark.module> with the Spark 3.1.x version.
The spark3 profile uses <hudi.spark.module>hudi-spark3</hudi.spark.module> with Spark 3.2.0 (and above) versions.
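For illustration, the profile wiring being described might look roughly like this in the pom (a sketch under the assumptions stated above, not the actual hudi pom.xml):

```xml
<!-- Sketch only: illustrative profile wiring, not the real hudi pom.xml. -->
<profiles>
  <profile>
    <id>spark3</id>
    <properties>
      <spark3.version>3.2.0</spark3.version>
      <hudi.spark.module>hudi-spark3</hudi.spark.module>
    </properties>
  </profile>
  <profile>
    <id>spark3.1.x</id>
    <properties>
      <spark3.version>3.1.2</spark3.version>
      <hudi.spark.module>hudi-spark3.1.x</hudi.spark.module>
    </properties>
  </profile>
</profiles>
```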

Member

we should prefer a simpler set of properties to maintain: both spark3.version and hudi.spark.module change based on the spark profile. When spark3.version switches to 3.1.2, the module becomes hudi-spark3.1.x and effectively ignores hudi-spark3 here. So I don't think we need to introduce more properties here.


import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

case class TimeTravelRelation(
Member

this looks identical to the one in hudi-spark. can we deduplicate?

pom.xml Outdated
<spark3.1.version>3.1.2</spark3.1.version>
<spark3.2.version>3.2.1</spark3.2.version>
Member

not sure why we need these new properties. spark3.version is always the default and points to the latest supported Spark 3, and we should build the project with the spark3.1.x profile if we want spark3.version to point to 3.1. can you clarify?

@xushiyan
Member

xushiyan commented Mar 6, 2022

> @xushiyan great! I will do this by today. I'm planning to use it on AWS Glue, which unfortunately only offers Spark 3.1 today. I know the Hudi documentation explicitly says the supported Spark version is 3.2, but is there any chance it will work on 3.1?

This PR is ready for testing with the TIMESTAMP AS OF syntax to fetch older commits' values. @fedsp any chance you have tested this branch out? we'd be happy to land this with some real-world verification. :)

@fedsp

fedsp commented Mar 6, 2022

Hi @xushiyan! Sorry for the slow response, I had some trouble building the .jar (I'm not used to doing it)

Anyway, I'm receiving an error from a plain SQL statement run from AWS Athena (no problem creating the table on AWS Glue using Spark 3, though)

This is the error:
[screenshot of the Athena error message]

And this is the query that I used on athena:
select * from hudi_0_11_tst TIMESTAMP AS OF '2022-03-06 15:30:58'

and this is the DDL of the table creation (I created the table by hand, not via the Spark write operation):

CREATE EXTERNAL TABLE hudi_0_11_tst(
    `tpep_pickup_datetime` timestamp,
    `tpep_dropoff_datetime` timestamp,
    `passenger_count` int,
    `trip_distance` double,
    `ratecodeid` int,
    `store_and_fwd_flag` string,
    `pulocationid` int,
    `dolocationid` int,
    `payment_type` int,
    `fare_amount` double,
    `extra` double,
    `mta_tax` double,
    `tip_amount` double,
    `tolls_amount` double,
    `improvement_surcharge` double,
    `total_amount` double,
    `congestion_surcharge` double,
    `pk_col` bigint
)
PARTITIONED BY (
    `vendorid` string)
ROW FORMAT SERDE
    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES ( 
    'path'='s3://mybuckethudi_0_11_tst')
STORED AS INPUTFORMAT
    'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
    's3://mybucket/hudi_0_11_tst'

Did I do something wrong?

@XuQianJin-Stars
Contributor Author

hi @fedsp, is your Spark version 3.2.x?

@fedsp

fedsp commented Mar 7, 2022

No, @XuQianJin-Stars, unfortunately it is 3.1, since I am limited to the AWS Glue environment. But I used your branch #4885

@fedsp

fedsp commented Mar 7, 2022

Also please note that this error is not from a Spark environment, but from AWS Athena, which uses prestodb as an engine.

Is there any additional setup?

@XuQianJin-Stars
Contributor Author

> Also please note that this error is not from a Spark environment, but from AWS Athena, which uses prestodb as an engine.
>
> Is there any additional setup?

prestodb also needs to support this syntax.

@xushiyan
Member

xushiyan commented Mar 7, 2022

@fedsp let me clarify: the time travel query here is supported in Spark SQL with Spark 3.2+, but not yet in other query engines like Presto. So you won't be able to run this in Athena. Are you able to verify the branch by running Spark SQL against your datasets?

@fedsp

fedsp commented Mar 7, 2022

Thank you for the clarification @xushiyan!
If you are available, I can test right now over screen sharing and we can see the results live


LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
} else {
plan
Contributor

if it is not a Hoodie table, it should return the original object: l.

* @since 1.3.0
*/
@Stable
class AnalysisException protected[sql](
Contributor

why do we need to define this instead of using Spark's own AnalysisException?

/**
* The adapter for spark3.2.
*/
class Spark3_2Adapter extends SparkAdapter {
Contributor

can you make Spark3_2Adapter extend Spark3Adapter, and only override isRelationTimeTravel and getRelationTimeTravel?
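A minimal sketch of the suggested inheritance (the trait and method signatures here are assumptions for illustration; the real SparkAdapter API may differ):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch only: hypothetical signatures, not Hudi's actual SparkAdapter trait.
trait SparkAdapter {
  def isRelationTimeTravel(plan: LogicalPlan): Boolean
  def getRelationTimeTravel(plan: LogicalPlan): Option[LogicalPlan]
}

// Shared Spark 3.x behavior: no time travel by default.
class Spark3Adapter extends SparkAdapter {
  def isRelationTimeTravel(plan: LogicalPlan): Boolean = false
  def getRelationTimeTravel(plan: LogicalPlan): Option[LogicalPlan] = None
}

// Spark 3.2 overrides only the time-travel hooks (bodies elided).
class Spark3_2Adapter extends Spark3Adapter {
  override def isRelationTimeTravel(plan: LogicalPlan): Boolean = ???
  override def getRelationTimeTravel(plan: LogicalPlan): Option[LogicalPlan] = ???
}
```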

@fedsp

fedsp commented Mar 7, 2022

Now, from a Spark context (Glue context), I tried the following PySpark command:

df = spark_context.sql("SELECT * FROM SANDBOX.hudi_0_11_tst TIMESTAMP AS OF '2022-03-06 15:30:58'")

and it gave me the following error:

File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 433, in sql
    return self.sparkSession.sql(sqlQuery)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.ParseException: 
mismatched input 'AS' expecting {<EOF>, ';'}(line 1, pos 46)

== SQL ==
SELECT * FROM SANDBOX.hudi_0_11_tst TIMESTAMP AS OF '2022-03-06 15:30:58'
----------------------------------------------^^^

@XuQianJin-Stars
Contributor Author

@hudi-bot run azure

@apache apache deleted a comment from hudi-bot Mar 8, 2022
@hudi-bot
Collaborator

hudi-bot commented Mar 8, 2022

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@xushiyan
Member

xushiyan commented Mar 8, 2022

@fedsp it's a Spark bundle or version mismatch problem where the syntax is not recognized. Maybe a previous version of the branch had some misconfiguration, but now it's resolved. I verified the feature in Spark 3.2.1:

➜ mvn -T 2.5C clean install -DskipTests -Djacoco.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true -Dscala-2.12 -Dspark3

// test using packaging/hudi-spark-bundle/target/hudi-spark3.2.1-bundle_2.12-0.11.0-SNAPSHOT.jar

// Spark 3.2.1
// COW
spark.sql("""
create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
""")
spark.sql("insert into hudi_cow_pt_tbl select 1, 'a0', 1000, '2021-12-09', '10'")
spark.sql("select * from hudi_cow_pt_tbl").show()
spark.sql("insert into hudi_cow_pt_tbl select 1, 'a1', 1001, '2021-12-09', '10'")
spark.sql("select * from hudi_cow_pt_tbl").show()


// time travel based on first commit time
spark.sql("select * from hudi_cow_pt_tbl timestamp as of '20220308175415995' where id = 1").show()
// time travel based on different timestamp formats
spark.sql("select * from hudi_cow_pt_tbl timestamp as of '2022-03-08 17:54:15.995' where id = 1").show()
spark.sql("select * from hudi_cow_pt_tbl timestamp as of '2022-03-09' where id = 1").show()
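For reference, the readable forms above correspond to Hudi's compact commit-instant format (yyyyMMddHHmmssSSS); the conversion can be sketched as follows (the helper name is hypothetical):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Sketch only: maps '2022-03-08 17:54:15.995' to '20220308175415995',
// matching the two equivalent time-travel queries above.
def toCommitInstant(ts: String): String = {
  val in  = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
  val out = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
  LocalDateTime.parse(ts, in).format(out)
}
```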

@xushiyan xushiyan merged commit 08fd80c into apache:master Mar 8, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
stayrascal pushed a commit to stayrascal/hudi that referenced this pull request Apr 12, 2022

Labels

priority:blocker Production down; release blocker


6 participants