diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index eadedf495d3e..6adf0831ea21 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -43,6 +43,7 @@ message Relation {
     LocalRelation local_relation = 11;
     Sample sample = 12;
     Offset offset = 13;
+    Deduplicate deduplicate = 14;
 
     Unknown unknown = 999;
   }
@@ -181,6 +182,14 @@ message Sort {
   }
 }
 
+// Relation of type [[Deduplicate]] which has duplicate rows removed. The duplicate check can
+// consider either only a subset of the columns or all of them.
+message Deduplicate {
+  Relation input = 1;
+  repeated string column_names = 2;
+  bool all_columns_as_keys = 3;
+}
+
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;
   // TODO: support local data.
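The message carries an implicit contract that the server enforces later in this patch: `input` is required, and exactly one of the two modes must be selected, either `all_columns_as_keys = true` (the `Dataset.distinct()` case) or a non-empty `column_names` list (the `dropDuplicates` case). A minimal client-side sketch of that contract, assuming only the generated Java bindings under `org.apache.spark.connect.proto` (the `DeduplicateContract` object is illustrative, not part of this patch):

```scala
import org.apache.spark.connect.proto

object DeduplicateContract {
  // Mirrors the validation SparkConnectPlanner performs server-side below:
  // an input plan is required, and exactly one dedup mode may be selected.
  def isValid(d: proto.Deduplicate): Boolean = {
    val onSubset = d.getColumnNamesCount > 0
    d.hasInput && (d.getAllColumnsAsKeys ^ onSubset)
  }
}
```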
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 8a267dff7d78..68bbc0487f97 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -215,6 +215,26 @@ package object dsl {
         .build()
     }
 
+    def deduplicate(colNames: Seq[String]): proto.Relation =
+      proto.Relation
+        .newBuilder()
+        .setDeduplicate(
+          proto.Deduplicate
+            .newBuilder()
+            .setInput(logicalPlan)
+            .addAllColumnNames(colNames.asJava))
+        .build()
+
+    def distinct(): proto.Relation =
+      proto.Relation
+        .newBuilder()
+        .setDeduplicate(
+          proto.Deduplicate
+            .newBuilder()
+            .setInput(logicalPlan)
+            .setAllColumnsAsKeys(true))
+        .build()
+
     def join(
         otherPlan: proto.Relation,
         joinType: JoinType = JoinType.JOIN_TYPE_INNER,
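The two DSL entry points mirror `Dataset.distinct()` and `Dataset.dropDuplicates(colNames)`. A hedged usage sketch (it assumes these methods are exposed through the implicit wrapper over `proto.Relation` in `dsl.plans`, the same way the test suites below use them; `DslDeduplicateExample` is illustrative):

```scala
import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.dsl.plans._

object DslDeduplicateExample {
  def build(input: proto.Relation): Unit = {
    val distinctRel = input.distinct()                     // sets all_columns_as_keys = true
    val subsetRel = input.deduplicate(Seq("key", "value")) // sets column_names only
    assert(distinctRel.getDeduplicate.getAllColumnsAsKeys)
    assert(subsetRel.getDeduplicate.getColumnNamesCount == 2)
  }
}
```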
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 450283a9b81f..92c8bf01cba6 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sample, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan, Sample, SubqueryAlias}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.types._
 
 final case class InvalidPlanInput(
@@ -60,6 +61,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
       case proto.Relation.RelTypeCase.OFFSET => transformOffset(rel.getOffset)
       case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
       case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
+      case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
       case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
       case proto.Relation.RelTypeCase.AGGREGATE => transformAggregate(rel.getAggregate)
       case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
@@ -91,6 +93,37 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
       transformRelation(rel.getInput))
   }
 
+  private def transformDeduplicate(rel: proto.Deduplicate): LogicalPlan = {
+    if (!rel.hasInput) {
+      throw InvalidPlanInput("Deduplicate needs a plan input")
+    }
+    if (rel.getAllColumnsAsKeys && rel.getColumnNamesCount > 0) {
+      throw InvalidPlanInput("Cannot deduplicate on both all columns and a subset of columns")
+    }
+    if (!rel.getAllColumnsAsKeys && rel.getColumnNamesCount == 0) {
+      throw InvalidPlanInput(
+        "Deduplicate requires either deduplicating on all columns or on a subset of columns")
+    }
+    val queryExecution = new QueryExecution(session, transformRelation(rel.getInput))
+    val resolver = session.sessionState.analyzer.resolver
+    val allColumns = queryExecution.analyzed.output
+    if (rel.getAllColumnsAsKeys) {
+      Deduplicate(allColumns, queryExecution.analyzed)
+    } else {
+      val toGroupColumnNames = rel.getColumnNamesList.asScala.toSeq
+      val groupCols = toGroupColumnNames.flatMap { (colName: String) =>
+        // It is possible that there is more than one column with the same name,
+        // so we call filter instead of find.
+        val cols = allColumns.filter(col => resolver(col.name, colName))
+        if (cols.isEmpty) {
+          throw InvalidPlanInput(s"Invalid deduplicate column ${colName}")
+        }
+        cols
+      }
+      Deduplicate(groupCols, queryExecution.analyzed)
+    }
+  }
+
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
     val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
     new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
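Two details of `transformDeduplicate` are worth calling out: column names are matched with the session's resolver (case-insensitive unless `spark.sql.caseSensitive` is enabled), and `filter` rather than `find` keeps every attribute with a matching name, so duplicate-named columns all become deduplication keys. A standalone sketch of just that resolution step (`DedupKeyResolution.resolveKeys` is illustrative, not part of the patch; the resolver parameter has the shape of Catalyst's `Resolver`, i.e. `(String, String) => Boolean`):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

object DedupKeyResolution {
  def resolveKeys(
      output: Seq[Attribute],
      names: Seq[String],
      resolver: (String, String) => Boolean): Seq[Attribute] =
    names.flatMap { name =>
      // Keep all matches: a plan can expose several attributes with one name.
      val matched = output.filter(attr => resolver(attr.name, name))
      require(matched.nonEmpty, s"Invalid deduplicate column $name")
      matched
    }
}
```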
diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectDeduplicateSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectDeduplicateSuite.scala
new file mode 100644
index 000000000000..88af60581ba2
--- /dev/null
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectDeduplicateSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.planner
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+/**
+ * [[SparkConnectPlanTestWithSparkSession]] provides a SparkSession for the Connect planner.
+ *
+ * It is not recommended to use the Catalyst DSL along with this trait: `SharedSparkSession`
+ * also defines implicits over Catalyst's LogicalPlan, which would be ambiguous with the
+ * implicits defined by the Catalyst DSL.
+ */
+trait SparkConnectPlanTestWithSparkSession extends SharedSparkSession with SparkConnectPlanTest {
+  override def getSession(): SparkSession = spark
+}
+
+class SparkConnectDeduplicateSuite extends SparkConnectPlanTestWithSparkSession {
+  lazy val connectTestRelation = createLocalRelationProto(
+    Seq(
+      AttributeReference("id", IntegerType)(),
+      AttributeReference("key", StringType)(),
+      AttributeReference("value", StringType)()))
+
+  lazy val sparkTestRelation = {
+    spark.createDataFrame(
+      new java.util.ArrayList[Row](),
+      StructType(
+        Seq(
+          StructField("id", IntegerType),
+          StructField("key", StringType),
+          StructField("value", StringType))))
+  }
+
+  test("Test basic deduplicate") {
+    val connectPlan = {
+      import org.apache.spark.sql.connect.dsl.plans._
+      Dataset.ofRows(spark, transform(connectTestRelation.distinct()))
+    }
+
+    val sparkPlan = sparkTestRelation.distinct()
+    comparePlans(connectPlan.queryExecution.analyzed, sparkPlan.queryExecution.analyzed, false)
+
+    val connectPlan2 = {
+      import org.apache.spark.sql.connect.dsl.plans._
+      Dataset.ofRows(spark, transform(connectTestRelation.deduplicate(Seq("key", "value"))))
+    }
+    val sparkPlan2 = sparkTestRelation.dropDuplicates(Seq("key", "value"))
+    comparePlans(connectPlan2.queryExecution.analyzed, sparkPlan2.queryExecution.analyzed, false)
+  }
+}
diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 83bf76efce1d..980e899c26ed 100644
--- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -31,8 +31,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * test cases.
  */
 trait SparkConnectPlanTest {
+
+  def getSession(): SparkSession = None.orNull
+
   def transform(rel: proto.Relation): LogicalPlan = {
-    new SparkConnectPlanner(rel, None.orNull).transform()
+    new SparkConnectPlanner(rel, getSession()).transform()
   }
 
   def readRel: proto.Relation =
@@ -72,8 +75,6 @@ trait SparkConnectPlanTest {
  */
 class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
 
-  protected var spark: SparkSession = null
-
   test("Simple Limit") {
     assertThrows[IndexOutOfBoundsException] {
       new SparkConnectPlanner(
@@ -266,4 +267,26 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
         .build()))
     assert(e.getMessage.contains("DataSource requires a format"))
   }
+
+  test("Test invalid deduplicate") {
+    val deduplicate = proto.Deduplicate
+      .newBuilder()
+      .setInput(readRel)
+      .setAllColumnsAsKeys(true)
+      .addColumnNames("test")
+
+    val e = intercept[InvalidPlanInput] {
+      transform(proto.Relation.newBuilder.setDeduplicate(deduplicate).build())
+    }
+    assert(
+      e.getMessage.contains("Cannot deduplicate on both all columns and a subset of columns"))
+
+    val deduplicate2 = proto.Deduplicate
+      .newBuilder()
+      .setInput(readRel)
+    val e2 = intercept[InvalidPlanInput] {
+      transform(proto.Relation.newBuilder.setDeduplicate(deduplicate2).build())
+    }
+    assert(e2.getMessage.contains("either deduplicating on all columns or on a subset of columns"))
+  }
 }
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index b244cdf8dcb9..1c868bcf411b 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -32,7 +32,7 @@
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto"\xcf\x05\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0b\x32\x13.spark.connect.JoinH\x00R\x04join\x12,\n\x05union\x18\x06 \x01(\x0b\x32\x14.spark.connect.UnionH\x00R\x05union\x12)\n\x04sort\x18\x07 \x01(\x0b\x32\x13.spark.connect.SortH\x00R\x04sort\x12,\n\x05limit\x18\x08 \x01(\x0b\x32\x14.spark.connect.LimitH\x00R\x05limit\x12\x38\n\taggregate\x18\t \x01(\x0b\x32\x18.spark.connect.AggregateH\x00R\taggregate\x12&\n\x03sql\x18\n \x01(\x0b\x32\x12.spark.connect.SQLH\x00R\x03sql\x12\x45\n\x0elocal_relation\x18\x0b \x01(\x0b\x32\x1c.spark.connect.LocalRelationH\x00R\rlocalRelation\x12/\n\x06sample\x18\x0c \x01(\x0b\x32\x15.spark.connect.SampleH\x00R\x06sample\x12/\n\x06offset\x18\r \x01(\x0b\x32\x15.spark.connect.OffsetH\x00R\x06offset\x12\x33\n\x07unknown\x18\xe7\x07 \x01(\x0b\x32\x16.spark.connect.UnknownH\x00R\x07unknownB\n\n\x08rel_type"\t\n\x07Unknown"G\n\x0eRelationCommon\x12\x1f\n\x0bsource_info\x18\x01 \x01(\tR\nsourceInfo\x12\x14\n\x05\x61lias\x18\x02 \x01(\tR\x05\x61lias"\x1b\n\x03SQL\x12\x14\n\x05query\x18\x01 \x01(\tR\x05query"\x9a\x03\n\x04Read\x12\x41\n\x0bnamed_table\x18\x01 \x01(\x0b\x32\x1e.spark.connect.Read.NamedTableH\x00R\nnamedTable\x12\x41\n\x0b\x64\x61ta_source\x18\x02 \x01(\x0b\x32\x1e.spark.connect.Read.DataSourceH\x00R\ndataSource\x1a=\n\nNamedTable\x12/\n\x13unparsed_identifier\x18\x01
\x01(\tR\x12unparsedIdentifier\x1a\xbf\x01\n\nDataSource\x12\x16\n\x06\x66ormat\x18\x01 \x01(\tR\x06\x66ormat\x12\x16\n\x06schema\x18\x02 \x01(\tR\x06schema\x12\x45\n\x07options\x18\x03 \x03(\x0b\x32+.spark.connect.Read.DataSource.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x0b\n\tread_type"u\n\x07Project\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12;\n\x0b\x65xpressions\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0b\x65xpressions"p\n\x06\x46ilter\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x37\n\tcondition\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\tcondition"\x9d\x03\n\x04Join\x12+\n\x04left\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x04left\x12-\n\x05right\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x05right\x12@\n\x0ejoin_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\rjoinCondition\x12\x39\n\tjoin_type\x18\x04 \x01(\x0e\x32\x1c.spark.connect.Join.JoinTypeR\x08joinType"\xbb\x01\n\x08JoinType\x12\x19\n\x15JOIN_TYPE_UNSPECIFIED\x10\x00\x12\x13\n\x0fJOIN_TYPE_INNER\x10\x01\x12\x18\n\x14JOIN_TYPE_FULL_OUTER\x10\x02\x12\x18\n\x14JOIN_TYPE_LEFT_OUTER\x10\x03\x12\x19\n\x15JOIN_TYPE_RIGHT_OUTER\x10\x04\x12\x17\n\x13JOIN_TYPE_LEFT_ANTI\x10\x05\x12\x17\n\x13JOIN_TYPE_LEFT_SEMI\x10\x06"\xcd\x01\n\x05Union\x12/\n\x06inputs\x18\x01 \x03(\x0b\x32\x17.spark.connect.RelationR\x06inputs\x12=\n\nunion_type\x18\x02 \x01(\x0e\x32\x1e.spark.connect.Union.UnionTypeR\tunionType"T\n\tUnionType\x12\x1a\n\x16UNION_TYPE_UNSPECIFIED\x10\x00\x12\x17\n\x13UNION_TYPE_DISTINCT\x10\x01\x12\x12\n\x0eUNION_TYPE_ALL\x10\x02"L\n\x05Limit\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05limit\x18\x02 \x01(\x05R\x05limit"O\n\x06Offset\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06offset\x18\x02 \x01(\x05R\x06offset"\xc5\x02\n\tAggregate\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12L\n\x14grouping_expressions\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13groupingExpressions\x12Y\n\x12result_expressions\x18\x03 \x03(\x0b\x32*.spark.connect.Aggregate.AggregateFunctionR\x11resultExpressions\x1a`\n\x11\x41ggregateFunction\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x37\n\targuments\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\targuments"\xf6\x03\n\x04Sort\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12>\n\x0bsort_fields\x18\x02 \x03(\x0b\x32\x1d.spark.connect.Sort.SortFieldR\nsortFields\x1a\xbc\x01\n\tSortField\x12\x39\n\nexpression\x18\x01 \x01(\x0b\x32\x19.spark.connect.ExpressionR\nexpression\x12?\n\tdirection\x18\x02 \x01(\x0e\x32!.spark.connect.Sort.SortDirectionR\tdirection\x12\x33\n\x05nulls\x18\x03 \x01(\x0e\x32\x1d.spark.connect.Sort.SortNullsR\x05nulls"l\n\rSortDirection\x12\x1e\n\x1aSORT_DIRECTION_UNSPECIFIED\x10\x00\x12\x1c\n\x18SORT_DIRECTION_ASCENDING\x10\x01\x12\x1d\n\x19SORT_DIRECTION_DESCENDING\x10\x02"R\n\tSortNulls\x12\x1a\n\x16SORT_NULLS_UNSPECIFIED\x10\x00\x12\x14\n\x10SORT_NULLS_FIRST\x10\x01\x12\x13\n\x0fSORT_NULLS_LAST\x10\x02"]\n\rLocalRelation\x12L\n\nattributes\x18\x01 \x03(\x0b\x32,.spark.connect.Expression.QualifiedAttributeR\nattributes"\xb8\x01\n\x06Sample\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1f\n\x0blower_bound\x18\x02 \x01(\x01R\nlowerBound\x12\x1f\n\x0bupper_bound\x18\x03 
\x01(\x01R\nupperBound\x12)\n\x10with_replacement\x18\x04 \x01(\x08R\x0fwithReplacement\x12\x12\n\x04seed\x18\x05 \x01(\x03R\x04seedB"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3' + b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto"\x8f\x06\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0b\x32\x13.spark.connect.JoinH\x00R\x04join\x12,\n\x05union\x18\x06 \x01(\x0b\x32\x14.spark.connect.UnionH\x00R\x05union\x12)\n\x04sort\x18\x07 \x01(\x0b\x32\x13.spark.connect.SortH\x00R\x04sort\x12,\n\x05limit\x18\x08 \x01(\x0b\x32\x14.spark.connect.LimitH\x00R\x05limit\x12\x38\n\taggregate\x18\t \x01(\x0b\x32\x18.spark.connect.AggregateH\x00R\taggregate\x12&\n\x03sql\x18\n \x01(\x0b\x32\x12.spark.connect.SQLH\x00R\x03sql\x12\x45\n\x0elocal_relation\x18\x0b \x01(\x0b\x32\x1c.spark.connect.LocalRelationH\x00R\rlocalRelation\x12/\n\x06sample\x18\x0c \x01(\x0b\x32\x15.spark.connect.SampleH\x00R\x06sample\x12/\n\x06offset\x18\r \x01(\x0b\x32\x15.spark.connect.OffsetH\x00R\x06offset\x12>\n\x0b\x64\x65\x64uplicate\x18\x0e \x01(\x0b\x32\x1a.spark.connect.DeduplicateH\x00R\x0b\x64\x65\x64uplicate\x12\x33\n\x07unknown\x18\xe7\x07 \x01(\x0b\x32\x16.spark.connect.UnknownH\x00R\x07unknownB\n\n\x08rel_type"\t\n\x07Unknown"G\n\x0eRelationCommon\x12\x1f\n\x0bsource_info\x18\x01 \x01(\tR\nsourceInfo\x12\x14\n\x05\x61lias\x18\x02 \x01(\tR\x05\x61lias"\x1b\n\x03SQL\x12\x14\n\x05query\x18\x01 \x01(\tR\x05query"\x9a\x03\n\x04Read\x12\x41\n\x0bnamed_table\x18\x01 \x01(\x0b\x32\x1e.spark.connect.Read.NamedTableH\x00R\nnamedTable\x12\x41\n\x0b\x64\x61ta_source\x18\x02 \x01(\x0b\x32\x1e.spark.connect.Read.DataSourceH\x00R\ndataSource\x1a=\n\nNamedTable\x12/\n\x13unparsed_identifier\x18\x01 \x01(\tR\x12unparsedIdentifier\x1a\xbf\x01\n\nDataSource\x12\x16\n\x06\x66ormat\x18\x01 \x01(\tR\x06\x66ormat\x12\x16\n\x06schema\x18\x02 \x01(\tR\x06schema\x12\x45\n\x07options\x18\x03 \x03(\x0b\x32+.spark.connect.Read.DataSource.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x0b\n\tread_type"u\n\x07Project\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12;\n\x0b\x65xpressions\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0b\x65xpressions"p\n\x06\x46ilter\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x37\n\tcondition\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\tcondition"\x9d\x03\n\x04Join\x12+\n\x04left\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x04left\x12-\n\x05right\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x05right\x12@\n\x0ejoin_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\rjoinCondition\x12\x39\n\tjoin_type\x18\x04 \x01(\x0e\x32\x1c.spark.connect.Join.JoinTypeR\x08joinType"\xbb\x01\n\x08JoinType\x12\x19\n\x15JOIN_TYPE_UNSPECIFIED\x10\x00\x12\x13\n\x0fJOIN_TYPE_INNER\x10\x01\x12\x18\n\x14JOIN_TYPE_FULL_OUTER\x10\x02\x12\x18\n\x14JOIN_TYPE_LEFT_OUTER\x10\x03\x12\x19\n\x15JOIN_TYPE_RIGHT_OUTER\x10\x04\x12\x17\n\x13JOIN_TYPE_LEFT_ANTI\x10\x05\x12\x17\n\x13JOIN_TYPE_LEFT_SEMI\x10\x06"\xcd\x01\n\x05Union\x12/\n\x06inputs\x18\x01 
\x03(\x0b\x32\x17.spark.connect.RelationR\x06inputs\x12=\n\nunion_type\x18\x02 \x01(\x0e\x32\x1e.spark.connect.Union.UnionTypeR\tunionType"T\n\tUnionType\x12\x1a\n\x16UNION_TYPE_UNSPECIFIED\x10\x00\x12\x17\n\x13UNION_TYPE_DISTINCT\x10\x01\x12\x12\n\x0eUNION_TYPE_ALL\x10\x02"L\n\x05Limit\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05limit\x18\x02 \x01(\x05R\x05limit"O\n\x06Offset\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06offset\x18\x02 \x01(\x05R\x06offset"\xc5\x02\n\tAggregate\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12L\n\x14grouping_expressions\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13groupingExpressions\x12Y\n\x12result_expressions\x18\x03 \x03(\x0b\x32*.spark.connect.Aggregate.AggregateFunctionR\x11resultExpressions\x1a`\n\x11\x41ggregateFunction\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x37\n\targuments\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\targuments"\xf6\x03\n\x04Sort\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12>\n\x0bsort_fields\x18\x02 \x03(\x0b\x32\x1d.spark.connect.Sort.SortFieldR\nsortFields\x1a\xbc\x01\n\tSortField\x12\x39\n\nexpression\x18\x01 \x01(\x0b\x32\x19.spark.connect.ExpressionR\nexpression\x12?\n\tdirection\x18\x02 \x01(\x0e\x32!.spark.connect.Sort.SortDirectionR\tdirection\x12\x33\n\x05nulls\x18\x03 \x01(\x0e\x32\x1d.spark.connect.Sort.SortNullsR\x05nulls"l\n\rSortDirection\x12\x1e\n\x1aSORT_DIRECTION_UNSPECIFIED\x10\x00\x12\x1c\n\x18SORT_DIRECTION_ASCENDING\x10\x01\x12\x1d\n\x19SORT_DIRECTION_DESCENDING\x10\x02"R\n\tSortNulls\x12\x1a\n\x16SORT_NULLS_UNSPECIFIED\x10\x00\x12\x14\n\x10SORT_NULLS_FIRST\x10\x01\x12\x13\n\x0fSORT_NULLS_LAST\x10\x02"\x8e\x01\n\x0b\x44\x65\x64uplicate\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12!\n\x0c\x63olumn_names\x18\x02 \x03(\tR\x0b\x63olumnNames\x12-\n\x13\x61ll_columns_as_keys\x18\x03 \x01(\x08R\x10\x61llColumnsAsKeys"]\n\rLocalRelation\x12L\n\nattributes\x18\x01 \x03(\x0b\x32,.spark.connect.Expression.QualifiedAttributeR\nattributes"\xb8\x01\n\x06Sample\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1f\n\x0blower_bound\x18\x02 \x01(\x01R\nlowerBound\x12\x1f\n\x0bupper_bound\x18\x03 \x01(\x01R\nupperBound\x12)\n\x10with_replacement\x18\x04 \x01(\x08R\x0fwithReplacement\x12\x12\n\x04seed\x18\x05 \x01(\x03R\x04seedB"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3' ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -44,51 +44,53 @@ _READ_DATASOURCE_OPTIONSENTRY._options = None _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001" _RELATION._serialized_start = 82 - _RELATION._serialized_end = 801 - _UNKNOWN._serialized_start = 803 - _UNKNOWN._serialized_end = 812 - _RELATIONCOMMON._serialized_start = 814 - _RELATIONCOMMON._serialized_end = 885 - _SQL._serialized_start = 887 - _SQL._serialized_end = 914 - _READ._serialized_start = 917 - _READ._serialized_end = 1327 - _READ_NAMEDTABLE._serialized_start = 1059 - _READ_NAMEDTABLE._serialized_end = 1120 - _READ_DATASOURCE._serialized_start = 1123 - _READ_DATASOURCE._serialized_end = 1314 - _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 1256 - _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 1314 - _PROJECT._serialized_start = 1329 - _PROJECT._serialized_end = 1446 - _FILTER._serialized_start = 1448 - _FILTER._serialized_end = 1560 - _JOIN._serialized_start = 1563 - _JOIN._serialized_end = 1976 - 
_JOIN_JOINTYPE._serialized_start = 1789 - _JOIN_JOINTYPE._serialized_end = 1976 - _UNION._serialized_start = 1979 - _UNION._serialized_end = 2184 - _UNION_UNIONTYPE._serialized_start = 2100 - _UNION_UNIONTYPE._serialized_end = 2184 - _LIMIT._serialized_start = 2186 - _LIMIT._serialized_end = 2262 - _OFFSET._serialized_start = 2264 - _OFFSET._serialized_end = 2343 - _AGGREGATE._serialized_start = 2346 - _AGGREGATE._serialized_end = 2671 - _AGGREGATE_AGGREGATEFUNCTION._serialized_start = 2575 - _AGGREGATE_AGGREGATEFUNCTION._serialized_end = 2671 - _SORT._serialized_start = 2674 - _SORT._serialized_end = 3176 - _SORT_SORTFIELD._serialized_start = 2794 - _SORT_SORTFIELD._serialized_end = 2982 - _SORT_SORTDIRECTION._serialized_start = 2984 - _SORT_SORTDIRECTION._serialized_end = 3092 - _SORT_SORTNULLS._serialized_start = 3094 - _SORT_SORTNULLS._serialized_end = 3176 - _LOCALRELATION._serialized_start = 3178 - _LOCALRELATION._serialized_end = 3271 - _SAMPLE._serialized_start = 3274 - _SAMPLE._serialized_end = 3458 + _RELATION._serialized_end = 865 + _UNKNOWN._serialized_start = 867 + _UNKNOWN._serialized_end = 876 + _RELATIONCOMMON._serialized_start = 878 + _RELATIONCOMMON._serialized_end = 949 + _SQL._serialized_start = 951 + _SQL._serialized_end = 978 + _READ._serialized_start = 981 + _READ._serialized_end = 1391 + _READ_NAMEDTABLE._serialized_start = 1123 + _READ_NAMEDTABLE._serialized_end = 1184 + _READ_DATASOURCE._serialized_start = 1187 + _READ_DATASOURCE._serialized_end = 1378 + _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 1320 + _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 1378 + _PROJECT._serialized_start = 1393 + _PROJECT._serialized_end = 1510 + _FILTER._serialized_start = 1512 + _FILTER._serialized_end = 1624 + _JOIN._serialized_start = 1627 + _JOIN._serialized_end = 2040 + _JOIN_JOINTYPE._serialized_start = 1853 + _JOIN_JOINTYPE._serialized_end = 2040 + _UNION._serialized_start = 2043 + _UNION._serialized_end = 2248 + _UNION_UNIONTYPE._serialized_start = 2164 + _UNION_UNIONTYPE._serialized_end = 2248 + _LIMIT._serialized_start = 2250 + _LIMIT._serialized_end = 2326 + _OFFSET._serialized_start = 2328 + _OFFSET._serialized_end = 2407 + _AGGREGATE._serialized_start = 2410 + _AGGREGATE._serialized_end = 2735 + _AGGREGATE_AGGREGATEFUNCTION._serialized_start = 2639 + _AGGREGATE_AGGREGATEFUNCTION._serialized_end = 2735 + _SORT._serialized_start = 2738 + _SORT._serialized_end = 3240 + _SORT_SORTFIELD._serialized_start = 2858 + _SORT_SORTFIELD._serialized_end = 3046 + _SORT_SORTDIRECTION._serialized_start = 3048 + _SORT_SORTDIRECTION._serialized_end = 3156 + _SORT_SORTNULLS._serialized_start = 3158 + _SORT_SORTNULLS._serialized_end = 3240 + _DEDUPLICATE._serialized_start = 3243 + _DEDUPLICATE._serialized_end = 3385 + _LOCALRELATION._serialized_start = 3387 + _LOCALRELATION._serialized_end = 3480 + _SAMPLE._serialized_start = 3483 + _SAMPLE._serialized_end = 3667 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi index f0a8b6412b51..fc135c559a65 100644 --- a/python/pyspark/sql/connect/proto/relations_pb2.pyi +++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi @@ -72,6 +72,7 @@ class Relation(google.protobuf.message.Message): LOCAL_RELATION_FIELD_NUMBER: builtins.int SAMPLE_FIELD_NUMBER: builtins.int OFFSET_FIELD_NUMBER: builtins.int + DEDUPLICATE_FIELD_NUMBER: builtins.int UNKNOWN_FIELD_NUMBER: builtins.int @property def common(self) -> global___RelationCommon: ... 
@@ -100,6 +101,8 @@ class Relation(google.protobuf.message.Message):
     @property
     def offset(self) -> global___Offset: ...
     @property
+    def deduplicate(self) -> global___Deduplicate: ...
+    @property
     def unknown(self) -> global___Unknown: ...
     def __init__(
         self,
@@ -117,6 +120,7 @@
         local_relation: global___LocalRelation | None = ...,
         sample: global___Sample | None = ...,
         offset: global___Offset | None = ...,
+        deduplicate: global___Deduplicate | None = ...,
         unknown: global___Unknown | None = ...,
     ) -> None: ...
     def HasField(
@@ -126,6 +130,8 @@
             b"aggregate",
             "common",
             b"common",
+            "deduplicate",
+            b"deduplicate",
             "filter",
             b"filter",
             "join",
@@ -161,6 +167,8 @@
             b"aggregate",
             "common",
             b"common",
+            "deduplicate",
+            b"deduplicate",
             "filter",
             b"filter",
             "join",
@@ -204,6 +212,7 @@
         "local_relation",
         "sample",
         "offset",
+        "deduplicate",
         "unknown",
     ] | None: ...
 
@@ -759,6 +768,47 @@
 global___Sort = Sort
 
+class Deduplicate(google.protobuf.message.Message):
+    """Relation of type [[Deduplicate]] which has duplicate rows removed. The duplicate check can
+    consider either only a subset of the columns or all of them.
+    """
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    INPUT_FIELD_NUMBER: builtins.int
+    COLUMN_NAMES_FIELD_NUMBER: builtins.int
+    ALL_COLUMNS_AS_KEYS_FIELD_NUMBER: builtins.int
+    @property
+    def input(self) -> global___Relation: ...
+    @property
+    def column_names(
+        self,
+    ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
+    all_columns_as_keys: builtins.bool
+    def __init__(
+        self,
+        *,
+        input: global___Relation | None = ...,
+        column_names: collections.abc.Iterable[builtins.str] | None = ...,
+        all_columns_as_keys: builtins.bool = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["input", b"input"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "all_columns_as_keys",
+            b"all_columns_as_keys",
+            "column_names",
+            b"column_names",
+            "input",
+            b"input",
+        ],
+    ) -> None: ...
+
+global___Deduplicate = Deduplicate
+
 class LocalRelation(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
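Taken together, the round trip the suites above verify looks like this, sketched with the helpers introduced in this patch (`transform`, `connectTestRelation`, and `sparkTestRelation` come from the test traits; the snippet assumes it runs inside `SparkConnectPlanTestWithSparkSession`, where `spark` is the shared session):

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.connect.dsl.plans._

// Connect path: Deduplicate proto -> Catalyst Deduplicate -> analyzed plan.
val connectDf = Dataset.ofRows(spark, transform(connectTestRelation.deduplicate(Seq("key"))))
// Dataset path: the plan the Connect path should analyze to.
val classicDf = sparkTestRelation.dropDuplicates("key")
comparePlans(connectDf.queryExecution.analyzed, classicDf.queryExecution.analyzed, false)
```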