diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index 087a4dca2f70..9a9b4fdbbb96 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -31,7 +31,7 @@ option java_package = "org.apache.spark.connect.proto"; message Relation { RelationCommon common = 1; oneof rel_type { - Read read = 2; + UnresolvedRelation unresolved_relation = 2; Project project = 3; Filter filter = 4; Join join = 5; @@ -60,16 +60,9 @@ message SQL { string query = 1; } -// Relation that reads from a file / table or other data source. Does not have additional -// inputs. -message Read { - oneof read_type { - NamedTable named_table = 1; - } - - message NamedTable { - repeated string parts = 1; - } +// An unresolved relation. Does not have additional inputs. +message UnresolvedRelation { + repeated string name_parts = 1; } // Projection of a bag of expressions for a given input relation. diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index fa9dd18d3bfa..0a10e2f8933c 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -51,7 +51,8 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { } rel.getRelTypeCase match { - case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead, common) + case proto.Relation.RelTypeCase.UNRESOLVED_RELATION => + transformReadRel(rel.getUnresolvedRelation, common) case proto.Relation.RelTypeCase.PROJECT => transformProject(rel.getProject, common) case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter) case proto.Relation.RelTypeCase.FETCH => transformFetch(rel.getFetch) @@ -82,19 +83,17 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { } private def transformReadRel( - rel: proto.Read, + rel: proto.UnresolvedRelation, common: Option[proto.RelationCommon]): LogicalPlan = { - val baseRelation = rel.getReadTypeCase match { - case proto.Read.ReadTypeCase.NAMED_TABLE => - val child = UnresolvedRelation(rel.getNamedTable.getPartsList.asScala.toSeq) - if (common.nonEmpty && common.get.getAlias.nonEmpty) { - SubqueryAlias(identifier = common.get.getAlias, child = child) - } else { - child - } - case _ => throw InvalidPlanInput() + if (rel.getNamePartsCount == 0) { + throw InvalidPlanInput("Unresolved relation requires at least one name part") + } + val child = UnresolvedRelation(rel.getNamePartsList.asScala.toSeq) + if (common.nonEmpty && common.get.getAlias.nonEmpty) { + SubqueryAlias(identifier = common.get.getAlias, child = child) + } else { + child } - baseRelation } private def transformFilter(rel: proto.Filter): LogicalPlan = { diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index e1a658fb57b2..47ac01fb8de5 100644 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -37,10 +37,10 @@ trait SparkConnectPlanTest { def readRel: proto.Relation = proto.Relation .newBuilder() - .setRead( - proto.Read + .setUnresolvedRelation( + proto.UnresolvedRelation .newBuilder() - .setNamedTable(proto.Read.NamedTable.newBuilder().addParts("table")) + .addNameParts("table") .build()) .build() } @@ -81,24 +81,25 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { } test("Simple Read") { - val read = proto.Read.newBuilder().build() + val read = proto.UnresolvedRelation.newBuilder().build() // Invalid read without Table name. - intercept[InvalidPlanInput](transform(proto.Relation.newBuilder.setRead(read).build())) + intercept[InvalidPlanInput]( + transform(proto.Relation.newBuilder.setUnresolvedRelation(read).build())) val readWithTable = read.toBuilder - .setNamedTable(proto.Read.NamedTable.newBuilder.addParts("name").build()) + .addNameParts("name") .build() - val res = transform(proto.Relation.newBuilder.setRead(readWithTable).build()) + val res = transform(proto.Relation.newBuilder.setUnresolvedRelation(readWithTable).build()) assert(res !== null) assert(res.nodeName == "UnresolvedRelation") } test("Simple Project") { - val readWithTable = proto.Read.newBuilder() - .setNamedTable(proto.Read.NamedTable.newBuilder.addParts("name").build()) + val readWithTable = proto.UnresolvedRelation.newBuilder() + .addNameParts("name") .build() val project = proto.Project.newBuilder() - .setInput(proto.Relation.newBuilder().setRead(readWithTable).build()) + .setInput(proto.Relation.newBuilder().setUnresolvedRelation(readWithTable).build()) .addExpressions( proto.Expression.newBuilder() .setUnresolvedStar(UnresolvedStar.newBuilder().build()).build()