diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 30468501236e..20b067ddb468 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -20,6 +20,7 @@ syntax = 'proto3'; package spark.connect; import "spark/connect/expressions.proto"; +import "spark/connect/types.proto"; option java_multiple_files = true; option java_package = "org.apache.spark.connect.proto"; @@ -305,6 +306,17 @@ message LocalRelation { // Local collection data serialized into Arrow IPC streaming format which contains // the schema of the data. bytes data = 1; + + // (Optional) The user-provided schema. + // + // The server side will update the column names and data types according to this schema. + oneof schema { + + DataType datatype = 2; + + // The server will use the Catalyst parser to parse this string to a DataType. + string datatype_str = 3; + } } // Relation of type [[Sample]] that samples a fraction of the dataset. diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 0ea8cc6c6349..a40fe9903071 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -371,6 +371,21 @@ class SparkConnectPlanner(session: SparkSession) { } } + private def parseDatatypeString(sqlText: String): DataType = { + val parser = session.sessionState.sqlParser + try { + parser.parseTableSchema(sqlText) + } catch { + case _: ParseException => + try { + parser.parseDataType(sqlText) + } catch { + case _: ParseException => + parser.parseDataType(s"struct<${sqlText.trim}>") + } + } + } + private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = { val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator( Iterator(rel.getData.toByteArray), @@ -380,7 +395,28 @@ } val attributes = structType.toAttributes val proj = UnsafeProjection.create(attributes, attributes) - new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq) + val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq) + + if (!rel.hasDatatype && !rel.hasDatatypeStr) { + return relation + } + + val schemaType = if (rel.hasDatatype) { + DataTypeProtoConverter.toCatalystType(rel.getDatatype) + } else { + parseDatatypeString(rel.getDatatypeStr) + } + + val schemaStruct = schemaType match { + case s: StructType => s + case d => StructType(Seq(StructField("value", d))) + } + + Dataset + .ofRows(session, logicalPlan = relation) + .toDF(schemaStruct.names: _*) + .to(schemaStruct) + .logicalPlan } private def transformReadRel(rel: proto.Read): LogicalPlan = { diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 748a353e0c32..e8b6d79943e0 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -17,11 +17,13 @@ from typing import Any, List, Optional, Sequence, Union, cast, TYPE_CHECKING, Mapping, Dict import functools -import pandas import pyarrow as pa + +from pyspark.sql.types import DataType + import pyspark.sql.connect.proto as proto from 
pyspark.sql.connect.column import Column, SortOrder, ColumnReference - +from pyspark.sql.connect.types import pyspark_types_to_proto_types if TYPE_CHECKING: from pyspark.sql.connect._typing import ColumnOrName @@ -167,21 +169,34 @@ def _repr_html_(self) -> str: class LocalRelation(LogicalPlan): - """Creates a LocalRelation plan object based on a Pandas DataFrame.""" + """Creates a LocalRelation plan object based on a PyArrow Table.""" - def __init__(self, pdf: "pandas.DataFrame") -> None: + def __init__( + self, + table: "pa.Table", + schema: Optional[Union[DataType, str]] = None, + ) -> None: super().__init__(None) - self._pdf = pdf + assert table is not None and isinstance(table, pa.Table) + self._table = table + + if schema is not None: + assert isinstance(schema, (DataType, str)) + self._schema = schema def plan(self, session: "SparkConnectClient") -> proto.Relation: sink = pa.BufferOutputStream() - table = pa.Table.from_pandas(self._pdf) - with pa.ipc.new_stream(sink, table.schema) as writer: - for b in table.to_batches(): + with pa.ipc.new_stream(sink, self._table.schema) as writer: + for b in self._table.to_batches(): writer.write_batch(b) plan = proto.Relation() plan.local_relation.data = sink.getvalue().to_pybytes() + if self._schema is not None: + if isinstance(self._schema, DataType): + plan.local_relation.datatype.CopyFrom(pyspark_types_to_proto_types(self._schema)) + elif isinstance(self._schema, str): + plan.local_relation.datatype_str = self._schema return plan def print(self, indent: int = 0) -> str: diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py index 68e4c423cc42..d1651d0b723a 100644 --- a/python/pyspark/sql/connect/proto/relations_pb2.py +++ b/python/pyspark/sql/connect/proto/relations_pb2.py @@ -30,10 +30,11 @@ from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2 +from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto"\xbf\x0e\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0b\x32\x13.spark.connect.JoinH\x00R\x04join\x12\x34\n\x06set_op\x18\x06 \x01(\x0b\x32\x1b.spark.connect.SetOperationH\x00R\x05setOp\x12)\n\x04sort\x18\x07 \x01(\x0b\x32\x13.spark.connect.SortH\x00R\x04sort\x12,\n\x05limit\x18\x08 \x01(\x0b\x32\x14.spark.connect.LimitH\x00R\x05limit\x12\x38\n\taggregate\x18\t \x01(\x0b\x32\x18.spark.connect.AggregateH\x00R\taggregate\x12&\n\x03sql\x18\n \x01(\x0b\x32\x12.spark.connect.SQLH\x00R\x03sql\x12\x45\n\x0elocal_relation\x18\x0b \x01(\x0b\x32\x1c.spark.connect.LocalRelationH\x00R\rlocalRelation\x12/\n\x06sample\x18\x0c \x01(\x0b\x32\x15.spark.connect.SampleH\x00R\x06sample\x12/\n\x06offset\x18\r \x01(\x0b\x32\x15.spark.connect.OffsetH\x00R\x06offset\x12>\n\x0b\x64\x65\x64uplicate\x18\x0e \x01(\x0b\x32\x1a.spark.connect.DeduplicateH\x00R\x0b\x64\x65\x64uplicate\x12,\n\x05range\x18\x0f \x01(\x0b\x32\x14.spark.connect.RangeH\x00R\x05range\x12\x45\n\x0esubquery_alias\x18\x10 
\x01(\x0b\x32\x1c.spark.connect.SubqueryAliasH\x00R\rsubqueryAlias\x12>\n\x0brepartition\x18\x11 \x01(\x0b\x32\x1a.spark.connect.RepartitionH\x00R\x0brepartition\x12|\n#rename_columns_by_same_length_names\x18\x12 \x01(\x0b\x32-.spark.connect.RenameColumnsBySameLengthNamesH\x00R\x1erenameColumnsBySameLengthNames\x12w\n"rename_columns_by_name_to_name_map\x18\x13 \x01(\x0b\x32+.spark.connect.RenameColumnsByNameToNameMapH\x00R\x1crenameColumnsByNameToNameMap\x12<\n\x0bshow_string\x18\x14 \x01(\x0b\x32\x19.spark.connect.ShowStringH\x00R\nshowString\x12)\n\x04\x64rop\x18\x15 \x01(\x0b\x32\x13.spark.connect.DropH\x00R\x04\x64rop\x12)\n\x04tail\x18\x16 \x01(\x0b\x32\x13.spark.connect.TailH\x00R\x04tail\x12?\n\x0cwith_columns\x18\x17 \x01(\x0b\x32\x1a.spark.connect.WithColumnsH\x00R\x0bwithColumns\x12)\n\x04hint\x18\x18 \x01(\x0b\x32\x13.spark.connect.HintH\x00R\x04hint\x12\x32\n\x07unpivot\x18\x19 \x01(\x0b\x32\x16.spark.connect.UnpivotH\x00R\x07unpivot\x12\x30\n\x07\x66ill_na\x18Z \x01(\x0b\x32\x15.spark.connect.NAFillH\x00R\x06\x66illNa\x12\x30\n\x07\x64rop_na\x18[ \x01(\x0b\x32\x15.spark.connect.NADropH\x00R\x06\x64ropNa\x12\x34\n\x07replace\x18\\ \x01(\x0b\x32\x18.spark.connect.NAReplaceH\x00R\x07replace\x12\x36\n\x07summary\x18\x64 \x01(\x0b\x32\x1a.spark.connect.StatSummaryH\x00R\x07summary\x12\x39\n\x08\x63rosstab\x18\x65 \x01(\x0b\x32\x1b.spark.connect.StatCrosstabH\x00R\x08\x63rosstab\x12\x39\n\x08\x64\x65scribe\x18\x66 \x01(\x0b\x32\x1b.spark.connect.StatDescribeH\x00R\x08\x64\x65scribe\x12\x33\n\x07unknown\x18\xe7\x07 \x01(\x0b\x32\x16.spark.connect.UnknownH\x00R\x07unknownB\n\n\x08rel_type"\t\n\x07Unknown"1\n\x0eRelationCommon\x12\x1f\n\x0bsource_info\x18\x01 \x01(\tR\nsourceInfo"\x1b\n\x03SQL\x12\x14\n\x05query\x18\x01 \x01(\tR\x05query"\xaa\x03\n\x04Read\x12\x41\n\x0bnamed_table\x18\x01 \x01(\x0b\x32\x1e.spark.connect.Read.NamedTableH\x00R\nnamedTable\x12\x41\n\x0b\x64\x61ta_source\x18\x02 \x01(\x0b\x32\x1e.spark.connect.Read.DataSourceH\x00R\ndataSource\x1a=\n\nNamedTable\x12/\n\x13unparsed_identifier\x18\x01 \x01(\tR\x12unparsedIdentifier\x1a\xcf\x01\n\nDataSource\x12\x16\n\x06\x66ormat\x18\x01 \x01(\tR\x06\x66ormat\x12\x1b\n\x06schema\x18\x02 \x01(\tH\x00R\x06schema\x88\x01\x01\x12\x45\n\x07options\x18\x03 \x03(\x0b\x32+.spark.connect.Read.DataSource.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07_schemaB\x0b\n\tread_type"u\n\x07Project\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12;\n\x0b\x65xpressions\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0b\x65xpressions"p\n\x06\x46ilter\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x37\n\tcondition\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\tcondition"\xd7\x03\n\x04Join\x12+\n\x04left\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x04left\x12-\n\x05right\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x05right\x12@\n\x0ejoin_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\rjoinCondition\x12\x39\n\tjoin_type\x18\x04 \x01(\x0e\x32\x1c.spark.connect.Join.JoinTypeR\x08joinType\x12#\n\rusing_columns\x18\x05 
\x03(\tR\x0cusingColumns"\xd0\x01\n\x08JoinType\x12\x19\n\x15JOIN_TYPE_UNSPECIFIED\x10\x00\x12\x13\n\x0fJOIN_TYPE_INNER\x10\x01\x12\x18\n\x14JOIN_TYPE_FULL_OUTER\x10\x02\x12\x18\n\x14JOIN_TYPE_LEFT_OUTER\x10\x03\x12\x19\n\x15JOIN_TYPE_RIGHT_OUTER\x10\x04\x12\x17\n\x13JOIN_TYPE_LEFT_ANTI\x10\x05\x12\x17\n\x13JOIN_TYPE_LEFT_SEMI\x10\x06\x12\x13\n\x0fJOIN_TYPE_CROSS\x10\x07"\x8c\x03\n\x0cSetOperation\x12\x36\n\nleft_input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\tleftInput\x12\x38\n\x0bright_input\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\nrightInput\x12\x45\n\x0bset_op_type\x18\x03 \x01(\x0e\x32%.spark.connect.SetOperation.SetOpTypeR\tsetOpType\x12\x1a\n\x06is_all\x18\x04 \x01(\x08H\x00R\x05isAll\x88\x01\x01\x12\x1c\n\x07\x62y_name\x18\x05 \x01(\x08H\x01R\x06\x62yName\x88\x01\x01"r\n\tSetOpType\x12\x1b\n\x17SET_OP_TYPE_UNSPECIFIED\x10\x00\x12\x19\n\x15SET_OP_TYPE_INTERSECT\x10\x01\x12\x15\n\x11SET_OP_TYPE_UNION\x10\x02\x12\x16\n\x12SET_OP_TYPE_EXCEPT\x10\x03\x42\t\n\x07_is_allB\n\n\x08_by_name"L\n\x05Limit\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05limit\x18\x02 \x01(\x05R\x05limit"O\n\x06Offset\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06offset\x18\x02 \x01(\x05R\x06offset"K\n\x04Tail\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05limit\x18\x02 \x01(\x05R\x05limit"\xd2\x01\n\tAggregate\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12L\n\x14grouping_expressions\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13groupingExpressions\x12H\n\x12result_expressions\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x11resultExpressions"\xa6\x04\n\x04Sort\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12>\n\x0bsort_fields\x18\x02 \x03(\x0b\x32\x1d.spark.connect.Sort.SortFieldR\nsortFields\x12 \n\tis_global\x18\x03 \x01(\x08H\x00R\x08isGlobal\x88\x01\x01\x1a\xbc\x01\n\tSortField\x12\x39\n\nexpression\x18\x01 \x01(\x0b\x32\x19.spark.connect.ExpressionR\nexpression\x12?\n\tdirection\x18\x02 \x01(\x0e\x32!.spark.connect.Sort.SortDirectionR\tdirection\x12\x33\n\x05nulls\x18\x03 \x01(\x0e\x32\x1d.spark.connect.Sort.SortNullsR\x05nulls"l\n\rSortDirection\x12\x1e\n\x1aSORT_DIRECTION_UNSPECIFIED\x10\x00\x12\x1c\n\x18SORT_DIRECTION_ASCENDING\x10\x01\x12\x1d\n\x19SORT_DIRECTION_DESCENDING\x10\x02"R\n\tSortNulls\x12\x1a\n\x16SORT_NULLS_UNSPECIFIED\x10\x00\x12\x14\n\x10SORT_NULLS_FIRST\x10\x01\x12\x13\n\x0fSORT_NULLS_LAST\x10\x02\x42\x0c\n\n_is_global"d\n\x04\x44rop\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12-\n\x04\x63ols\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x04\x63ols"\xab\x01\n\x0b\x44\x65\x64uplicate\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12!\n\x0c\x63olumn_names\x18\x02 \x03(\tR\x0b\x63olumnNames\x12\x32\n\x13\x61ll_columns_as_keys\x18\x03 \x01(\x08H\x00R\x10\x61llColumnsAsKeys\x88\x01\x01\x42\x16\n\x14_all_columns_as_keys"#\n\rLocalRelation\x12\x12\n\x04\x64\x61ta\x18\x01 \x01(\x0cR\x04\x64\x61ta"\xe0\x01\n\x06Sample\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1f\n\x0blower_bound\x18\x02 \x01(\x01R\nlowerBound\x12\x1f\n\x0bupper_bound\x18\x03 \x01(\x01R\nupperBound\x12.\n\x10with_replacement\x18\x04 \x01(\x08H\x00R\x0fwithReplacement\x88\x01\x01\x12\x17\n\x04seed\x18\x05 
\x01(\x03H\x01R\x04seed\x88\x01\x01\x42\x13\n\x11_with_replacementB\x07\n\x05_seed"\x91\x01\n\x05Range\x12\x19\n\x05start\x18\x01 \x01(\x03H\x00R\x05start\x88\x01\x01\x12\x10\n\x03\x65nd\x18\x02 \x01(\x03R\x03\x65nd\x12\x12\n\x04step\x18\x03 \x01(\x03R\x04step\x12*\n\x0enum_partitions\x18\x04 \x01(\x05H\x01R\rnumPartitions\x88\x01\x01\x42\x08\n\x06_startB\x11\n\x0f_num_partitions"r\n\rSubqueryAlias\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05\x61lias\x18\x02 \x01(\tR\x05\x61lias\x12\x1c\n\tqualifier\x18\x03 \x03(\tR\tqualifier"\x8e\x01\n\x0bRepartition\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12%\n\x0enum_partitions\x18\x02 \x01(\x05R\rnumPartitions\x12\x1d\n\x07shuffle\x18\x03 \x01(\x08H\x00R\x07shuffle\x88\x01\x01\x42\n\n\x08_shuffle"\x8d\x01\n\nShowString\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x18\n\x07numRows\x18\x02 \x01(\x05R\x07numRows\x12\x1a\n\x08truncate\x18\x03 \x01(\x05R\x08truncate\x12\x1a\n\x08vertical\x18\x04 \x01(\x08R\x08vertical"\\\n\x0bStatSummary\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1e\n\nstatistics\x18\x02 \x03(\tR\nstatistics"Q\n\x0cStatDescribe\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols"e\n\x0cStatCrosstab\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ol1\x18\x02 \x01(\tR\x04\x63ol1\x12\x12\n\x04\x63ol2\x18\x03 \x01(\tR\x04\x63ol2"\x86\x01\n\x06NAFill\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols\x12\x39\n\x06values\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\x06values"\x86\x01\n\x06NADrop\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols\x12\'\n\rmin_non_nulls\x18\x03 \x01(\x05H\x00R\x0bminNonNulls\x88\x01\x01\x42\x10\n\x0e_min_non_nulls"\xa8\x02\n\tNAReplace\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols\x12H\n\x0creplacements\x18\x03 \x03(\x0b\x32$.spark.connect.NAReplace.ReplacementR\x0creplacements\x1a\x8d\x01\n\x0bReplacement\x12>\n\told_value\x18\x01 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x08oldValue\x12>\n\tnew_value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x08newValue"r\n\x1eRenameColumnsBySameLengthNames\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12!\n\x0c\x63olumn_names\x18\x02 \x03(\tR\x0b\x63olumnNames"\x83\x02\n\x1cRenameColumnsByNameToNameMap\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12o\n\x12rename_columns_map\x18\x02 \x03(\x0b\x32\x41.spark.connect.RenameColumnsByNameToNameMap.RenameColumnsMapEntryR\x10renameColumnsMap\x1a\x43\n\x15RenameColumnsMapEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x83\x01\n\x0bWithColumns\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x45\n\x0ename_expr_list\x18\x02 \x03(\x0b\x32\x1f.spark.connect.Expression.AliasR\x0cnameExprList"\x8c\x01\n\x04Hint\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x41\n\nparameters\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\nparameters"\xf6\x01\n\x07Unpivot\x12-\n\x05input\x18\x01 
\x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12+\n\x03ids\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x03ids\x12\x31\n\x06values\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x06values\x12\x30\n\x14variable_column_name\x18\x04 \x01(\tR\x12variableColumnName\x12*\n\x11value_column_name\x18\x05 \x01(\tR\x0fvalueColumnNameB"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3' + b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto"\xbf\x0e\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0b\x32\x13.spark.connect.JoinH\x00R\x04join\x12\x34\n\x06set_op\x18\x06 \x01(\x0b\x32\x1b.spark.connect.SetOperationH\x00R\x05setOp\x12)\n\x04sort\x18\x07 \x01(\x0b\x32\x13.spark.connect.SortH\x00R\x04sort\x12,\n\x05limit\x18\x08 \x01(\x0b\x32\x14.spark.connect.LimitH\x00R\x05limit\x12\x38\n\taggregate\x18\t \x01(\x0b\x32\x18.spark.connect.AggregateH\x00R\taggregate\x12&\n\x03sql\x18\n \x01(\x0b\x32\x12.spark.connect.SQLH\x00R\x03sql\x12\x45\n\x0elocal_relation\x18\x0b \x01(\x0b\x32\x1c.spark.connect.LocalRelationH\x00R\rlocalRelation\x12/\n\x06sample\x18\x0c \x01(\x0b\x32\x15.spark.connect.SampleH\x00R\x06sample\x12/\n\x06offset\x18\r \x01(\x0b\x32\x15.spark.connect.OffsetH\x00R\x06offset\x12>\n\x0b\x64\x65\x64uplicate\x18\x0e \x01(\x0b\x32\x1a.spark.connect.DeduplicateH\x00R\x0b\x64\x65\x64uplicate\x12,\n\x05range\x18\x0f \x01(\x0b\x32\x14.spark.connect.RangeH\x00R\x05range\x12\x45\n\x0esubquery_alias\x18\x10 \x01(\x0b\x32\x1c.spark.connect.SubqueryAliasH\x00R\rsubqueryAlias\x12>\n\x0brepartition\x18\x11 \x01(\x0b\x32\x1a.spark.connect.RepartitionH\x00R\x0brepartition\x12|\n#rename_columns_by_same_length_names\x18\x12 \x01(\x0b\x32-.spark.connect.RenameColumnsBySameLengthNamesH\x00R\x1erenameColumnsBySameLengthNames\x12w\n"rename_columns_by_name_to_name_map\x18\x13 \x01(\x0b\x32+.spark.connect.RenameColumnsByNameToNameMapH\x00R\x1crenameColumnsByNameToNameMap\x12<\n\x0bshow_string\x18\x14 \x01(\x0b\x32\x19.spark.connect.ShowStringH\x00R\nshowString\x12)\n\x04\x64rop\x18\x15 \x01(\x0b\x32\x13.spark.connect.DropH\x00R\x04\x64rop\x12)\n\x04tail\x18\x16 \x01(\x0b\x32\x13.spark.connect.TailH\x00R\x04tail\x12?\n\x0cwith_columns\x18\x17 \x01(\x0b\x32\x1a.spark.connect.WithColumnsH\x00R\x0bwithColumns\x12)\n\x04hint\x18\x18 \x01(\x0b\x32\x13.spark.connect.HintH\x00R\x04hint\x12\x32\n\x07unpivot\x18\x19 \x01(\x0b\x32\x16.spark.connect.UnpivotH\x00R\x07unpivot\x12\x30\n\x07\x66ill_na\x18Z \x01(\x0b\x32\x15.spark.connect.NAFillH\x00R\x06\x66illNa\x12\x30\n\x07\x64rop_na\x18[ \x01(\x0b\x32\x15.spark.connect.NADropH\x00R\x06\x64ropNa\x12\x34\n\x07replace\x18\\ \x01(\x0b\x32\x18.spark.connect.NAReplaceH\x00R\x07replace\x12\x36\n\x07summary\x18\x64 \x01(\x0b\x32\x1a.spark.connect.StatSummaryH\x00R\x07summary\x12\x39\n\x08\x63rosstab\x18\x65 \x01(\x0b\x32\x1b.spark.connect.StatCrosstabH\x00R\x08\x63rosstab\x12\x39\n\x08\x64\x65scribe\x18\x66 \x01(\x0b\x32\x1b.spark.connect.StatDescribeH\x00R\x08\x64\x65scribe\x12\x33\n\x07unknown\x18\xe7\x07 \x01(\x0b\x32\x16.spark.connect.UnknownH\x00R\x07unknownB\n\n\x08rel_type"\t\n\x07Unknown"1\n\x0eRelationCommon\x12\x1f\n\x0bsource_info\x18\x01 
\x01(\tR\nsourceInfo"\x1b\n\x03SQL\x12\x14\n\x05query\x18\x01 \x01(\tR\x05query"\xaa\x03\n\x04Read\x12\x41\n\x0bnamed_table\x18\x01 \x01(\x0b\x32\x1e.spark.connect.Read.NamedTableH\x00R\nnamedTable\x12\x41\n\x0b\x64\x61ta_source\x18\x02 \x01(\x0b\x32\x1e.spark.connect.Read.DataSourceH\x00R\ndataSource\x1a=\n\nNamedTable\x12/\n\x13unparsed_identifier\x18\x01 \x01(\tR\x12unparsedIdentifier\x1a\xcf\x01\n\nDataSource\x12\x16\n\x06\x66ormat\x18\x01 \x01(\tR\x06\x66ormat\x12\x1b\n\x06schema\x18\x02 \x01(\tH\x00R\x06schema\x88\x01\x01\x12\x45\n\x07options\x18\x03 \x03(\x0b\x32+.spark.connect.Read.DataSource.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07_schemaB\x0b\n\tread_type"u\n\x07Project\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12;\n\x0b\x65xpressions\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0b\x65xpressions"p\n\x06\x46ilter\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x37\n\tcondition\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\tcondition"\xd7\x03\n\x04Join\x12+\n\x04left\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x04left\x12-\n\x05right\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x05right\x12@\n\x0ejoin_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\rjoinCondition\x12\x39\n\tjoin_type\x18\x04 \x01(\x0e\x32\x1c.spark.connect.Join.JoinTypeR\x08joinType\x12#\n\rusing_columns\x18\x05 \x03(\tR\x0cusingColumns"\xd0\x01\n\x08JoinType\x12\x19\n\x15JOIN_TYPE_UNSPECIFIED\x10\x00\x12\x13\n\x0fJOIN_TYPE_INNER\x10\x01\x12\x18\n\x14JOIN_TYPE_FULL_OUTER\x10\x02\x12\x18\n\x14JOIN_TYPE_LEFT_OUTER\x10\x03\x12\x19\n\x15JOIN_TYPE_RIGHT_OUTER\x10\x04\x12\x17\n\x13JOIN_TYPE_LEFT_ANTI\x10\x05\x12\x17\n\x13JOIN_TYPE_LEFT_SEMI\x10\x06\x12\x13\n\x0fJOIN_TYPE_CROSS\x10\x07"\x8c\x03\n\x0cSetOperation\x12\x36\n\nleft_input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\tleftInput\x12\x38\n\x0bright_input\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\nrightInput\x12\x45\n\x0bset_op_type\x18\x03 \x01(\x0e\x32%.spark.connect.SetOperation.SetOpTypeR\tsetOpType\x12\x1a\n\x06is_all\x18\x04 \x01(\x08H\x00R\x05isAll\x88\x01\x01\x12\x1c\n\x07\x62y_name\x18\x05 \x01(\x08H\x01R\x06\x62yName\x88\x01\x01"r\n\tSetOpType\x12\x1b\n\x17SET_OP_TYPE_UNSPECIFIED\x10\x00\x12\x19\n\x15SET_OP_TYPE_INTERSECT\x10\x01\x12\x15\n\x11SET_OP_TYPE_UNION\x10\x02\x12\x16\n\x12SET_OP_TYPE_EXCEPT\x10\x03\x42\t\n\x07_is_allB\n\n\x08_by_name"L\n\x05Limit\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05limit\x18\x02 \x01(\x05R\x05limit"O\n\x06Offset\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06offset\x18\x02 \x01(\x05R\x06offset"K\n\x04Tail\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05limit\x18\x02 \x01(\x05R\x05limit"\xd2\x01\n\tAggregate\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12L\n\x14grouping_expressions\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13groupingExpressions\x12H\n\x12result_expressions\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x11resultExpressions"\xa6\x04\n\x04Sort\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12>\n\x0bsort_fields\x18\x02 \x03(\x0b\x32\x1d.spark.connect.Sort.SortFieldR\nsortFields\x12 \n\tis_global\x18\x03 
\x01(\x08H\x00R\x08isGlobal\x88\x01\x01\x1a\xbc\x01\n\tSortField\x12\x39\n\nexpression\x18\x01 \x01(\x0b\x32\x19.spark.connect.ExpressionR\nexpression\x12?\n\tdirection\x18\x02 \x01(\x0e\x32!.spark.connect.Sort.SortDirectionR\tdirection\x12\x33\n\x05nulls\x18\x03 \x01(\x0e\x32\x1d.spark.connect.Sort.SortNullsR\x05nulls"l\n\rSortDirection\x12\x1e\n\x1aSORT_DIRECTION_UNSPECIFIED\x10\x00\x12\x1c\n\x18SORT_DIRECTION_ASCENDING\x10\x01\x12\x1d\n\x19SORT_DIRECTION_DESCENDING\x10\x02"R\n\tSortNulls\x12\x1a\n\x16SORT_NULLS_UNSPECIFIED\x10\x00\x12\x14\n\x10SORT_NULLS_FIRST\x10\x01\x12\x13\n\x0fSORT_NULLS_LAST\x10\x02\x42\x0c\n\n_is_global"d\n\x04\x44rop\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12-\n\x04\x63ols\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x04\x63ols"\xab\x01\n\x0b\x44\x65\x64uplicate\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12!\n\x0c\x63olumn_names\x18\x02 \x03(\tR\x0b\x63olumnNames\x12\x32\n\x13\x61ll_columns_as_keys\x18\x03 \x01(\x08H\x00R\x10\x61llColumnsAsKeys\x88\x01\x01\x42\x16\n\x14_all_columns_as_keys"\x89\x01\n\rLocalRelation\x12\x12\n\x04\x64\x61ta\x18\x01 \x01(\x0cR\x04\x64\x61ta\x12\x35\n\x08\x64\x61tatype\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x08\x64\x61tatype\x12#\n\x0c\x64\x61tatype_str\x18\x03 \x01(\tH\x00R\x0b\x64\x61tatypeStrB\x08\n\x06schema"\xe0\x01\n\x06Sample\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1f\n\x0blower_bound\x18\x02 \x01(\x01R\nlowerBound\x12\x1f\n\x0bupper_bound\x18\x03 \x01(\x01R\nupperBound\x12.\n\x10with_replacement\x18\x04 \x01(\x08H\x00R\x0fwithReplacement\x88\x01\x01\x12\x17\n\x04seed\x18\x05 \x01(\x03H\x01R\x04seed\x88\x01\x01\x42\x13\n\x11_with_replacementB\x07\n\x05_seed"\x91\x01\n\x05Range\x12\x19\n\x05start\x18\x01 \x01(\x03H\x00R\x05start\x88\x01\x01\x12\x10\n\x03\x65nd\x18\x02 \x01(\x03R\x03\x65nd\x12\x12\n\x04step\x18\x03 \x01(\x03R\x04step\x12*\n\x0enum_partitions\x18\x04 \x01(\x05H\x01R\rnumPartitions\x88\x01\x01\x42\x08\n\x06_startB\x11\n\x0f_num_partitions"r\n\rSubqueryAlias\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x14\n\x05\x61lias\x18\x02 \x01(\tR\x05\x61lias\x12\x1c\n\tqualifier\x18\x03 \x03(\tR\tqualifier"\x8e\x01\n\x0bRepartition\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12%\n\x0enum_partitions\x18\x02 \x01(\x05R\rnumPartitions\x12\x1d\n\x07shuffle\x18\x03 \x01(\x08H\x00R\x07shuffle\x88\x01\x01\x42\n\n\x08_shuffle"\x8d\x01\n\nShowString\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x18\n\x07numRows\x18\x02 \x01(\x05R\x07numRows\x12\x1a\n\x08truncate\x18\x03 \x01(\x05R\x08truncate\x12\x1a\n\x08vertical\x18\x04 \x01(\x08R\x08vertical"\\\n\x0bStatSummary\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1e\n\nstatistics\x18\x02 \x03(\tR\nstatistics"Q\n\x0cStatDescribe\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols"e\n\x0cStatCrosstab\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ol1\x18\x02 \x01(\tR\x04\x63ol1\x12\x12\n\x04\x63ol2\x18\x03 \x01(\tR\x04\x63ol2"\x86\x01\n\x06NAFill\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols\x12\x39\n\x06values\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\x06values"\x86\x01\n\x06NADrop\x12-\n\x05input\x18\x01 
\x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols\x12\'\n\rmin_non_nulls\x18\x03 \x01(\x05H\x00R\x0bminNonNulls\x88\x01\x01\x42\x10\n\x0e_min_non_nulls"\xa8\x02\n\tNAReplace\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04\x63ols\x18\x02 \x03(\tR\x04\x63ols\x12H\n\x0creplacements\x18\x03 \x03(\x0b\x32$.spark.connect.NAReplace.ReplacementR\x0creplacements\x1a\x8d\x01\n\x0bReplacement\x12>\n\told_value\x18\x01 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x08oldValue\x12>\n\tnew_value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x08newValue"r\n\x1eRenameColumnsBySameLengthNames\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12!\n\x0c\x63olumn_names\x18\x02 \x03(\tR\x0b\x63olumnNames"\x83\x02\n\x1cRenameColumnsByNameToNameMap\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12o\n\x12rename_columns_map\x18\x02 \x03(\x0b\x32\x41.spark.connect.RenameColumnsByNameToNameMap.RenameColumnsMapEntryR\x10renameColumnsMap\x1a\x43\n\x15RenameColumnsMapEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x83\x01\n\x0bWithColumns\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x45\n\x0ename_expr_list\x18\x02 \x03(\x0b\x32\x1f.spark.connect.Expression.AliasR\x0cnameExprList"\x8c\x01\n\x04Hint\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x41\n\nparameters\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\nparameters"\xf6\x01\n\x07Unpivot\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12+\n\x03ids\x18\x02 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x03ids\x12\x31\n\x06values\x18\x03 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x06values\x12\x30\n\x14variable_column_name\x18\x04 \x01(\tR\x12variableColumnName\x12*\n\x11value_column_name\x18\x05 \x01(\tR\x0fvalueColumnNameB"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3' ) @@ -513,90 +514,90 @@ _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001" _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._options = None _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_options = b"8\001" - _RELATION._serialized_start = 82 - _RELATION._serialized_end = 1937 - _UNKNOWN._serialized_start = 1939 - _UNKNOWN._serialized_end = 1948 - _RELATIONCOMMON._serialized_start = 1950 - _RELATIONCOMMON._serialized_end = 1999 - _SQL._serialized_start = 2001 - _SQL._serialized_end = 2028 - _READ._serialized_start = 2031 - _READ._serialized_end = 2457 - _READ_NAMEDTABLE._serialized_start = 2173 - _READ_NAMEDTABLE._serialized_end = 2234 - _READ_DATASOURCE._serialized_start = 2237 - _READ_DATASOURCE._serialized_end = 2444 - _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 2375 - _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 2433 - _PROJECT._serialized_start = 2459 - _PROJECT._serialized_end = 2576 - _FILTER._serialized_start = 2578 - _FILTER._serialized_end = 2690 - _JOIN._serialized_start = 2693 - _JOIN._serialized_end = 3164 - _JOIN_JOINTYPE._serialized_start = 2956 - _JOIN_JOINTYPE._serialized_end = 3164 - _SETOPERATION._serialized_start = 3167 - _SETOPERATION._serialized_end = 3563 - _SETOPERATION_SETOPTYPE._serialized_start = 3426 - _SETOPERATION_SETOPTYPE._serialized_end = 3540 - _LIMIT._serialized_start = 3565 - _LIMIT._serialized_end = 3641 - _OFFSET._serialized_start = 3643 - 
_OFFSET._serialized_end = 3722 - _TAIL._serialized_start = 3724 - _TAIL._serialized_end = 3799 - _AGGREGATE._serialized_start = 3802 - _AGGREGATE._serialized_end = 4012 - _SORT._serialized_start = 4015 - _SORT._serialized_end = 4565 - _SORT_SORTFIELD._serialized_start = 4169 - _SORT_SORTFIELD._serialized_end = 4357 - _SORT_SORTDIRECTION._serialized_start = 4359 - _SORT_SORTDIRECTION._serialized_end = 4467 - _SORT_SORTNULLS._serialized_start = 4469 - _SORT_SORTNULLS._serialized_end = 4551 - _DROP._serialized_start = 4567 - _DROP._serialized_end = 4667 - _DEDUPLICATE._serialized_start = 4670 - _DEDUPLICATE._serialized_end = 4841 - _LOCALRELATION._serialized_start = 4843 - _LOCALRELATION._serialized_end = 4878 - _SAMPLE._serialized_start = 4881 - _SAMPLE._serialized_end = 5105 - _RANGE._serialized_start = 5108 - _RANGE._serialized_end = 5253 - _SUBQUERYALIAS._serialized_start = 5255 - _SUBQUERYALIAS._serialized_end = 5369 - _REPARTITION._serialized_start = 5372 - _REPARTITION._serialized_end = 5514 - _SHOWSTRING._serialized_start = 5517 - _SHOWSTRING._serialized_end = 5658 - _STATSUMMARY._serialized_start = 5660 - _STATSUMMARY._serialized_end = 5752 - _STATDESCRIBE._serialized_start = 5754 - _STATDESCRIBE._serialized_end = 5835 - _STATCROSSTAB._serialized_start = 5837 - _STATCROSSTAB._serialized_end = 5938 - _NAFILL._serialized_start = 5941 - _NAFILL._serialized_end = 6075 - _NADROP._serialized_start = 6078 - _NADROP._serialized_end = 6212 - _NAREPLACE._serialized_start = 6215 - _NAREPLACE._serialized_end = 6511 - _NAREPLACE_REPLACEMENT._serialized_start = 6370 - _NAREPLACE_REPLACEMENT._serialized_end = 6511 - _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 6513 - _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 6627 - _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 6630 - _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 6889 - _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 6822 - _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 6889 - _WITHCOLUMNS._serialized_start = 6892 - _WITHCOLUMNS._serialized_end = 7023 - _HINT._serialized_start = 7026 - _HINT._serialized_end = 7166 - _UNPIVOT._serialized_start = 7169 - _UNPIVOT._serialized_end = 7415 + _RELATION._serialized_start = 109 + _RELATION._serialized_end = 1964 + _UNKNOWN._serialized_start = 1966 + _UNKNOWN._serialized_end = 1975 + _RELATIONCOMMON._serialized_start = 1977 + _RELATIONCOMMON._serialized_end = 2026 + _SQL._serialized_start = 2028 + _SQL._serialized_end = 2055 + _READ._serialized_start = 2058 + _READ._serialized_end = 2484 + _READ_NAMEDTABLE._serialized_start = 2200 + _READ_NAMEDTABLE._serialized_end = 2261 + _READ_DATASOURCE._serialized_start = 2264 + _READ_DATASOURCE._serialized_end = 2471 + _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 2402 + _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 2460 + _PROJECT._serialized_start = 2486 + _PROJECT._serialized_end = 2603 + _FILTER._serialized_start = 2605 + _FILTER._serialized_end = 2717 + _JOIN._serialized_start = 2720 + _JOIN._serialized_end = 3191 + _JOIN_JOINTYPE._serialized_start = 2983 + _JOIN_JOINTYPE._serialized_end = 3191 + _SETOPERATION._serialized_start = 3194 + _SETOPERATION._serialized_end = 3590 + _SETOPERATION_SETOPTYPE._serialized_start = 3453 + _SETOPERATION_SETOPTYPE._serialized_end = 3567 + _LIMIT._serialized_start = 3592 + _LIMIT._serialized_end = 3668 + _OFFSET._serialized_start = 3670 + _OFFSET._serialized_end = 3749 + _TAIL._serialized_start = 3751 + _TAIL._serialized_end = 3826 + 
_AGGREGATE._serialized_start = 3829 + _AGGREGATE._serialized_end = 4039 + _SORT._serialized_start = 4042 + _SORT._serialized_end = 4592 + _SORT_SORTFIELD._serialized_start = 4196 + _SORT_SORTFIELD._serialized_end = 4384 + _SORT_SORTDIRECTION._serialized_start = 4386 + _SORT_SORTDIRECTION._serialized_end = 4494 + _SORT_SORTNULLS._serialized_start = 4496 + _SORT_SORTNULLS._serialized_end = 4578 + _DROP._serialized_start = 4594 + _DROP._serialized_end = 4694 + _DEDUPLICATE._serialized_start = 4697 + _DEDUPLICATE._serialized_end = 4868 + _LOCALRELATION._serialized_start = 4871 + _LOCALRELATION._serialized_end = 5008 + _SAMPLE._serialized_start = 5011 + _SAMPLE._serialized_end = 5235 + _RANGE._serialized_start = 5238 + _RANGE._serialized_end = 5383 + _SUBQUERYALIAS._serialized_start = 5385 + _SUBQUERYALIAS._serialized_end = 5499 + _REPARTITION._serialized_start = 5502 + _REPARTITION._serialized_end = 5644 + _SHOWSTRING._serialized_start = 5647 + _SHOWSTRING._serialized_end = 5788 + _STATSUMMARY._serialized_start = 5790 + _STATSUMMARY._serialized_end = 5882 + _STATDESCRIBE._serialized_start = 5884 + _STATDESCRIBE._serialized_end = 5965 + _STATCROSSTAB._serialized_start = 5967 + _STATCROSSTAB._serialized_end = 6068 + _NAFILL._serialized_start = 6071 + _NAFILL._serialized_end = 6205 + _NADROP._serialized_start = 6208 + _NADROP._serialized_end = 6342 + _NAREPLACE._serialized_start = 6345 + _NAREPLACE._serialized_end = 6641 + _NAREPLACE_REPLACEMENT._serialized_start = 6500 + _NAREPLACE_REPLACEMENT._serialized_end = 6641 + _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 6643 + _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 6757 + _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 6760 + _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 7019 + _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 6952 + _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 7019 + _WITHCOLUMNS._serialized_start = 7022 + _WITHCOLUMNS._serialized_end = 7153 + _HINT._serialized_start = 7156 + _HINT._serialized_end = 7296 + _UNPIVOT._serialized_start = 7299 + _UNPIVOT._serialized_end = 7545 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi index db8720920022..e942a63629d5 100644 --- a/python/pyspark/sql/connect/proto/relations_pb2.pyi +++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi @@ -40,6 +40,7 @@ import google.protobuf.internal.containers import google.protobuf.internal.enum_type_wrapper import google.protobuf.message import pyspark.sql.connect.proto.expressions_pb2 +import pyspark.sql.connect.proto.types_pb2 import sys import typing @@ -1168,16 +1169,45 @@ class LocalRelation(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor DATA_FIELD_NUMBER: builtins.int + DATATYPE_FIELD_NUMBER: builtins.int + DATATYPE_STR_FIELD_NUMBER: builtins.int data: builtins.bytes """Local collection data serialized into Arrow IPC streaming format which contains the schema of the data. """ + @property + def datatype(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ... + datatype_str: builtins.str + """Server will use Catalyst parser to parse this string to DataType.""" def __init__( self, *, data: builtins.bytes = ..., + datatype: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., + datatype_str: builtins.str = ..., + ) -> None: ... 
+ def HasField( + self, + field_name: typing_extensions.Literal[ + "datatype", b"datatype", "datatype_str", b"datatype_str", "schema", b"schema" + ], + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "data", + b"data", + "datatype", + b"datatype", + "datatype_str", + b"datatype_str", + "schema", + b"schema", + ], ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["data", b"data"]) -> None: ... + def WhichOneof( + self, oneof_group: typing_extensions.Literal["schema", b"schema"] + ) -> typing_extensions.Literal["datatype", "datatype_str"] | None: ... global___LocalRelation = LocalRelation diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 28aebbdecb88..0a3d03110fc7 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -16,17 +16,35 @@ # from threading import RLock -from typing import Optional, Any, Union, Dict, cast, overload +from collections.abc import Sized + +import numpy as np import pandas as pd +import pyarrow as pa + +from pyspark.sql.types import DataType, StructType -import pyspark.sql.types from pyspark.sql.connect.client import SparkConnectClient from pyspark.sql.connect.dataframe import DataFrame -from pyspark.sql.connect.plan import SQL, Range +from pyspark.sql.connect.plan import SQL, Range, LocalRelation from pyspark.sql.connect.readwriter import DataFrameReader from pyspark.sql.utils import to_str -from . import plan -from ._typing import OptionalPrimitiveType + +from typing import ( + Optional, + Any, + Union, + Dict, + List, + Tuple, + cast, + overload, + Iterable, + TYPE_CHECKING, +) + +if TYPE_CHECKING: + from pyspark.sql.connect._typing import OptionalPrimitiveType # TODO(SPARK-38912): This method can be dropped once support for Python 3.8 is dropped @@ -240,7 +258,11 @@ def read(self) -> "DataFrameReader": """ return DataFrameReader(self) - def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame": + def createDataFrame( + self, + data: Union["pd.DataFrame", "np.ndarray", Iterable[Any]], + schema: Optional[Union[StructType, str, List[str], Tuple[str, ...]]] = None, + ) -> "DataFrame": """ Creates a :class:`DataFrame` from a :class:`pandas.DataFrame`. @@ -249,7 +271,15 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame": Parameters ---------- - data : :class:`pandas.DataFrame` + data : :class:`pandas.DataFrame`, :class:`list` or :class:`numpy.ndarray` + schema : :class:`pyspark.sql.types.DataType`, str or list, optional + + When ``schema`` is :class:`pyspark.sql.types.DataType` or a datatype string, it must + match the real data, or an exception will be thrown at runtime. If the given schema is + not :class:`pyspark.sql.types.StructType`, it will be wrapped into a + :class:`pyspark.sql.types.StructType` as its only field, and the field name will be + "value". Each record will also be wrapped into a tuple, which can be converted to a row + later. 
Returns ------- @@ -264,9 +294,71 @@ def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame": """ assert data is not None - if len(data) == 0: + if isinstance(data, DataFrame): + raise TypeError("data is already a DataFrame") + if isinstance(data, Sized) and len(data) == 0: raise ValueError("Input data cannot be empty") - return DataFrame.withPlan(plan.LocalRelation(data), self) + + _schema: Optional[StructType] = None + _schema_str: Optional[str] = None + _cols: Optional[List[str]] = None + + if isinstance(schema, StructType): + _schema = schema + + elif isinstance(schema, str): + _schema_str = schema + + elif isinstance(schema, (list, tuple)): + # Must re-encode any unicode strings to be consistent with StructField names + _cols = [x.encode("utf-8") if not isinstance(x, str) else x for x in schema] + + # Create the Pandas DataFrame + if isinstance(data, pd.DataFrame): + pdf = data + + elif isinstance(data, np.ndarray): + # `data` of numpy.ndarray type will be converted to a pandas DataFrame. + if data.ndim not in [1, 2]: + raise ValueError("NumPy array input should be of 1 or 2 dimensions.") + + pdf = pd.DataFrame(data) + + if _cols is None: + if data.ndim == 1 or data.shape[1] == 1: + _cols = ["value"] + else: + _cols = ["_%s" % i for i in range(1, data.shape[1] + 1)] + + else: + pdf = pd.DataFrame(list(data)) + + if _cols is None: + _cols = ["_%s" % i for i in range(1, pdf.shape[1] + 1)] + + # Validate number of columns + num_cols = pdf.shape[1] + if _schema is not None and len(_schema.fields) != num_cols: + raise ValueError( + f"Length mismatch: Expected axis has {num_cols} elements, " + f"new values have {len(_schema.fields)} elements" + ) + elif _cols is not None and len(_cols) != num_cols: + raise ValueError( + f"Length mismatch: Expected axis has {num_cols} elements, " + f"new values have {len(_cols)} elements" + ) + + table = pa.Table.from_pandas(pdf) + + if _schema is not None: + return DataFrame.withPlan(LocalRelation(table, schema=_schema), self) + elif _schema_str is not None: + return DataFrame.withPlan(LocalRelation(table, schema=_schema_str), self) + elif _cols is not None and len(_cols) > 0: + return DataFrame.withPlan(LocalRelation(table), self).toDF(*_cols) + else: + return DataFrame.withPlan(LocalRelation(table), self) @property def client(self) -> "SparkConnectClient": @@ -279,9 +371,7 @@ def client(self) -> "SparkConnectClient": """ return self._client - def register_udf( - self, function: Any, return_type: Union[str, pyspark.sql.types.DataType] - ) -> str: + def register_udf(self, function: Any, return_type: Union[str, DataType]) -> str: return self._client.register_udf(function, return_type) def sql(self, sql_string: str) -> "DataFrame": diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index ae3813b43ae5..c2a17c7786ee 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -17,6 +17,7 @@ from typing import Any import unittest import shutil +import numpy as np import tempfile import grpc # type: ignore @@ -45,9 +46,6 @@ from pyspark.testing.utils import ReusedPySparkTestCase -import tempfile - - @unittest.skipIf(not should_test_connect, connect_requirement_message) class SparkConnectSQLTestCase(PandasOnSparkTestCase, ReusedPySparkTestCase, SQLTestUtils): """Parent test fixture class for all Spark Connect related @@ -224,6 +222,90 @@ def test_with_local_data(self): with self.assertRaises(ValueError): 
self.connect.createDataFrame(pdf) + def test_with_local_ndarray(self): + """SPARK-41446: Test creating a dataframe using local numpy.ndarray""" + data = np.array([[1, 2, 3, 4], [5, 6, 7, 8]]) + + sdf = self.spark.createDataFrame(data) + cdf = self.connect.createDataFrame(data) + self.assertEqual(sdf.schema, cdf.schema) + self.assert_eq(sdf.toPandas(), cdf.toPandas()) + + # TODO: add cases for StructType after 'pyspark_types_to_proto_types' supports StructType + for schema in [ + "struct<col1 int, col2 int, col3 int, col4 int>", + "col1 int, col2 int, col3 int, col4 int", + "col1 int, col2 long, col3 string, col4 long", + "col1 int, col2 string, col3 short, col4 long", + ["a", "b", "c", "d"], + ("x1", "x2", "x3", "x4"), + ]: + sdf = self.spark.createDataFrame(data, schema=schema) + cdf = self.connect.createDataFrame(data, schema=schema) + + self.assertEqual(sdf.schema, cdf.schema) + self.assert_eq(sdf.toPandas(), cdf.toPandas()) + + with self.assertRaisesRegex( + ValueError, + "Length mismatch: Expected axis has 4 elements, new values have 5 elements", + ): + self.connect.createDataFrame(data, ["a", "b", "c", "d", "e"]) + + with self.assertRaises(grpc.RpcError): + self.connect.createDataFrame( + data, "col1 magic_type, col2 int, col3 int, col4 int" + ).show() + + with self.assertRaises(grpc.RpcError): + self.connect.createDataFrame(data, "col1 int, col2 int, col3 int").show() + + def test_with_local_list(self): + """SPARK-41446: Test creating a dataframe using local list""" + data = [[1, 2, 3, 4]] + + sdf = self.spark.createDataFrame(data) + cdf = self.connect.createDataFrame(data) + self.assertEqual(sdf.schema, cdf.schema) + self.assert_eq(sdf.toPandas(), cdf.toPandas()) + + for schema in [ + "struct<col1 int, col2 int, col3 int, col4 int>", + "col1 int, col2 int, col3 int, col4 int", + "col1 int, col2 long, col3 string, col4 long", + "col1 int, col2 string, col3 short, col4 long", + ["a", "b", "c", "d"], + ("x1", "x2", "x3", "x4"), + ]: + sdf = self.spark.createDataFrame(data, schema=schema) + cdf = self.connect.createDataFrame(data, schema=schema) + + self.assertEqual(sdf.schema, cdf.schema) + self.assert_eq(sdf.toPandas(), cdf.toPandas()) + + with self.assertRaisesRegex( + ValueError, + "Length mismatch: Expected axis has 4 elements, new values have 5 elements", + ): + self.connect.createDataFrame(data, ["a", "b", "c", "d", "e"]) + + with self.assertRaises(grpc.RpcError): + self.connect.createDataFrame( + data, "col1 magic_type, col2 int, col3 int, col4 int" + ).show() + + with self.assertRaises(grpc.RpcError): + self.connect.createDataFrame(data, "col1 int, col2 int, col3 int").show() + + def test_with_atom_type(self): + for data in [[(1), (2), (3)], [1, 2, 3]]: + for schema in ["long", "int", "short"]: + sdf = self.spark.createDataFrame(data, schema=schema) + cdf = self.connect.createDataFrame(data, schema=schema) + + self.assertEqual(sdf.schema, cdf.schema) + self.assert_eq(sdf.toPandas(), cdf.toPandas()) + def test_simple_explain_string(self): df = self.connect.read.table(self.tbl_name).limit(10) result = df._explain_string()
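Taken together, the changes above define the end-to-end flow: the client serializes the input (pandas, NumPy, or a plain iterable) into an Arrow IPC batch plus an optional `datatype`/`datatype_str` field on `LocalRelation`, and the server re-applies the requested names and types via `toDF` and `Dataset.to()`. Below is a minimal usage sketch mirroring the tests above; it assumes `spark` is an already-connected Spark Connect session (the session setup itself is not part of this diff):

import numpy as np

data = np.array([[1, 2, 3, 4], [5, 6, 7, 8]])

# DDL string: sent as `datatype_str`; the server tries parseTableSchema,
# falls back to parseDataType, and finally wraps the text in "struct<...>".
df1 = spark.createDataFrame(data, schema="col1 int, col2 int, col3 int, col4 int")

# Column names only: types stay as inferred from the Arrow data;
# the names are applied client-side through toDF().
df2 = spark.createDataFrame(data, schema=["a", "b", "c", "d"])

# Atomic type: the server wraps it into a struct with a single "value" field.
df3 = spark.createDataFrame([1, 2, 3], schema="long")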