
Conversation

@xuzifu666 xuzifu666 (Member) commented Aug 17, 2024

Purpose

Linked issue: close #xxx

Support write with composite partition key.

Currently, the Paimon Spark writer does not support tables whose partition key is a composite key; this PR adds that support.
Additionally, to support scan pushdown, PaimonUtils##fieldReference is changed to construct FieldReference(Seq(name)) directly. This avoids the parse error shown below (see the related Spark issue apache/spark#35108), and the resulting API is also compatible with Spark 3.2.0 (see PR #3968). A sketch of the change follows the stack trace.

[Screenshot: 1723868408209.png]

org.apache.spark.sql.catalyst.parser.ParseException: 
Syntax error at or near 't': extra input 't'(line 1, pos 3)

== SQL ==
pt t
---^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
	at org.apache.spark.sql.connector.expressions.LogicalExpressions$.parseReference(expressions.scala:40)
	at org.apache.spark.sql.connector.expressions.FieldReference$.apply(expressions.scala:368)
	at org.apache.spark.sql.PaimonUtils$.fieldReference(PaimonUtils.scala:69)
	at org.apache.paimon.spark.PaimonScan.$anonfun$filterAttributes$2(PaimonScan.scala:106)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at org.apache.paimon.spark.PaimonScan.filterAttributes(PaimonScan.scala:106)
	at org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.$anonfun$getFilterableTableScan$1(PartitionPruning.scala:82)
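A minimal sketch of the fieldReference change, assuming the old implementation delegated to Spark's string-parsing FieldReference.apply (the object name below is illustrative, not the actual Paimon source):

```scala
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

object PaimonUtilsSketch {
  // Before: FieldReference(name: String) delegates to
  // LogicalExpressions.parseReference, which runs the name through the
  // Catalyst SQL parser and fails on names it cannot tokenize, e.g. "pt t".
  // After: wrap the raw name as a single-part reference, bypassing the parser.
  def fieldReference(name: String): NamedReference =
    FieldReference(Seq(name))
}
```

Because FieldReference(Seq(...)) hits the case-class constructor rather than the parsing companion apply(String), it never touches the SQL parser, which is also what keeps it compatible with Spark 3.2.0.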

A second failure occurs if SparkTable##partitioning does not quote the field names; the analyzer then raises the following parse error:

org.apache.spark.sql.catalyst.parser.ParseException: 
Syntax error at or near 't': extra input 't'(line 1, pos 3)

== SQL ==
pt t
---^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
	at org.apache.spark.sql.connector.expressions.LogicalExpressions$.parseReference(expressions.scala:40)
	at org.apache.spark.sql.connector.expressions.LogicalExpressions.parseReference(expressions.scala)
	at org.apache.spark.sql.connector.expressions.Expressions.column(Expressions.java:58)
	at org.apache.spark.sql.connector.expressions.Expressions.identity(Expressions.java:105)
	at org.apache.paimon.spark.SparkTable.$anonfun$partitioning$1(SparkTable.scala:54)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.paimon.spark.SparkTable.partitioning(SparkTable.scala:54)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveInsertInto$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveInsertInto$$partitionColumnNames(Analyzer.scala:1238)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveInsertInto$$anonfun$apply$14.applyOrElse(Analyzer.scala:1218)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveInsertInto$$anonfun$apply$14.applyOrElse(Analyzer.scala:1210)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)

In summary, supporting composite partition keys requires both changing PaimonUtils##fieldReference and quoting the field names in SparkTable##partitioning.
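A hedged sketch of the quoting side, assuming partitioning maps the table's partition keys to identity transforms (the helper and signature are illustrative, not the exact Paimon source):

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

// Back-quote a field name (doubling any embedded back-quotes) so Spark's
// identifier parser reads e.g. "pt t" as one column instead of two tokens.
def quote(name: String): String = "`" + name.replace("`", "``") + "`"

// Quoted names survive the parseReference call inside Expressions.identity,
// so each partition key becomes a single identity transform.
def partitioning(partitionKeys: Seq[String]): Array[Transform] =
  partitionKeys.map(key => Expressions.identity(quote(key))).toArray
```

With quoting, a key like "pt t" parses as a single column reference instead of raising the ParseException shown above.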

Tests

API and Format

Documentation

@xuzifu666 xuzifu666 marked this pull request as ready for review August 17, 2024 04:14
@xuzifu666 xuzifu666 changed the title from "[spark] Support composite key in paimon paitition" to "[spark] Support write with composite partition key" Aug 17, 2024
@xuzifu666 xuzifu666 closed this Aug 17, 2024
@xuzifu666 xuzifu666 reopened this Aug 17, 2024
@xuzifu666 xuzifu666 closed this Aug 17, 2024
@xuzifu666 xuzifu666 reopened this Aug 17, 2024
@JingsongLi JingsongLi (Contributor) left a comment:
+1

@JingsongLi JingsongLi merged commit 0a11857 into apache:master Aug 18, 2024