Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
case proto.Relation.RelTypeCase.AGGREGATE => transformAggregate(rel.getAggregate)
case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
case proto.Relation.RelTypeCase.LOCAL_RELATION =>
transformLocalRelation(rel.getLocalRelation)
transformLocalRelation(rel.getLocalRelation, common)
Copy link
Contributor

@cloud-fan cloud-fan Oct 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is common? Every logical plan has an optional alias?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is legacy design that I believe it thinks only relations have the optional alias.

Every logical plan could have an optional alias, in that case I prefer to move that alias out of the common to have its own message. This is because by that we can differentiate

.xx()
.xx().as("") // probably invalid but user can write down such API
.xx().as("alias_1")

I can also change this in this PR if you think this is a right time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sent a PR for this topic (to avoid complicate current refactoring PR too much): #38415

case proto.Relation.RelTypeCase.SAMPLE => transformSample(rel.getSample)
case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
Expand Down Expand Up @@ -125,9 +125,16 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
}
}

private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
private def transformLocalRelation(
rel: proto.LocalRelation,
common: Option[proto.RelationCommon]): LogicalPlan = {
val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
val relation = new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
if (common.nonEmpty && common.get.getAlias.nonEmpty) {
logical.SubqueryAlias(identifier = common.get.getAlias, child = relation)
} else {
relation
}
}

private def transformAttribute(exp: proto.Expression.QualifiedAttribute): Attribute = {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.Expression.UnresolvedStar
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.test.SharedSparkSession

/**
* Testing trait for SparkConnect tests with some helper methods to make it easier to create new
* test cases.
*/
trait SparkConnectPlanTest {

def getSession(): SparkSession = None.orNull
trait SparkConnectPlanTest extends SharedSparkSession {

def transform(rel: proto.Relation): LogicalPlan = {
new SparkConnectPlanner(rel, getSession()).transform()
new SparkConnectPlanner(rel, spark).transform()
}

def readRel: proto.Relation =
Expand Down
Loading