diff --git a/python/pyspark/pipelines/api.py b/python/pyspark/pipelines/api.py index b68cc30b43a7d..c01e8524eee20 100644 --- a/python/pyspark/pipelines/api.py +++ b/python/pyspark/pipelines/api.py @@ -76,6 +76,7 @@ def _validate_stored_dataset_args( name: Optional[str], table_properties: Optional[Dict[str, str]], partition_cols: Optional[List[str]], + cluster_by: Optional[List[str]], ) -> None: if name is not None and type(name) is not str: raise PySparkTypeError( @@ -91,6 +92,7 @@ def _validate_stored_dataset_args( }, ) validate_optional_list_of_str_arg(arg_name="partition_cols", arg_value=partition_cols) + validate_optional_list_of_str_arg(arg_name="cluster_by", arg_value=cluster_by) @overload @@ -107,6 +109,7 @@ def table( spark_conf: Optional[Dict[str, str]] = None, table_properties: Optional[Dict[str, str]] = None, partition_cols: Optional[List[str]] = None, + cluster_by: Optional[List[str]] = None, schema: Optional[Union[StructType, str]] = None, ) -> Callable[[QueryFunction], None]: ... @@ -120,6 +123,7 @@ def table( spark_conf: Optional[Dict[str, str]] = None, table_properties: Optional[Dict[str, str]] = None, partition_cols: Optional[List[str]] = None, + cluster_by: Optional[List[str]] = None, schema: Optional[Union[StructType, str]] = None, format: Optional[str] = None, ) -> Union[Callable[[QueryFunction], None], None]: @@ -142,11 +146,12 @@ def table( :param table_properties: A dict where the keys are the property names and the values are the \ property values. These properties will be set on the table. :param partition_cols: A list containing the column names of the partition columns. + :param cluster_by: A list containing the column names of the cluster columns. :param schema: Explicit Spark SQL schema to materialize this table with. Supports either a \ Pyspark StructType or a SQL DDL string, such as "a INT, b STRING". :param format: The format of the table, e.g. "parquet". """ - _validate_stored_dataset_args(name, table_properties, partition_cols) + _validate_stored_dataset_args(name, table_properties, partition_cols, cluster_by) source_code_location = get_caller_source_code_location(stacklevel=1) @@ -163,6 +168,7 @@ def outer( name=resolved_name, table_properties=table_properties or {}, partition_cols=partition_cols, + cluster_by=cluster_by, schema=schema, source_code_location=source_code_location, format=format, @@ -209,6 +215,7 @@ def materialized_view( spark_conf: Optional[Dict[str, str]] = None, table_properties: Optional[Dict[str, str]] = None, partition_cols: Optional[List[str]] = None, + cluster_by: Optional[List[str]] = None, schema: Optional[Union[StructType, str]] = None, ) -> Callable[[QueryFunction], None]: ... @@ -222,6 +229,7 @@ def materialized_view( spark_conf: Optional[Dict[str, str]] = None, table_properties: Optional[Dict[str, str]] = None, partition_cols: Optional[List[str]] = None, + cluster_by: Optional[List[str]] = None, schema: Optional[Union[StructType, str]] = None, format: Optional[str] = None, ) -> Union[Callable[[QueryFunction], None], None]: @@ -244,11 +252,12 @@ def materialized_view( :param table_properties: A dict where the keys are the property names and the values are the \ property values. These properties will be set on the table. :param partition_cols: A list containing the column names of the partition columns. + :param cluster_by: A list containing the column names of the cluster columns. :param schema: Explicit Spark SQL schema to materialize this table with. 
Supports either a \ Pyspark StructType or a SQL DDL string, such as "a INT, b STRING". :param format: The format of the table, e.g. "parquet". """ - _validate_stored_dataset_args(name, table_properties, partition_cols) + _validate_stored_dataset_args(name, table_properties, partition_cols, cluster_by) source_code_location = get_caller_source_code_location(stacklevel=1) @@ -265,6 +274,7 @@ def outer( name=resolved_name, table_properties=table_properties or {}, partition_cols=partition_cols, + cluster_by=cluster_by, schema=schema, source_code_location=source_code_location, format=format, @@ -403,6 +413,7 @@ def create_streaming_table( comment: Optional[str] = None, table_properties: Optional[Dict[str, str]] = None, partition_cols: Optional[List[str]] = None, + cluster_by: Optional[List[str]] = None, schema: Optional[Union[StructType, str]] = None, format: Optional[str] = None, ) -> None: @@ -417,6 +428,7 @@ def create_streaming_table( :param table_properties: A dict where the keys are the property names and the values are the \ property values. These properties will be set on the table. :param partition_cols: A list containing the column names of the partition columns. + :param cluster_by: A list containing the column names of the cluster columns. :param schema Explicit Spark SQL schema to materialize this table with. Supports either a \ Pyspark StructType or a SQL DDL string, such as "a INT, b STRING". :param format: The format of the table, e.g. "parquet". @@ -435,6 +447,7 @@ def create_streaming_table( }, ) validate_optional_list_of_str_arg(arg_name="partition_cols", arg_value=partition_cols) + validate_optional_list_of_str_arg(arg_name="cluster_by", arg_value=cluster_by) source_code_location = get_caller_source_code_location(stacklevel=1) @@ -444,6 +457,7 @@ def create_streaming_table( source_code_location=source_code_location, table_properties=table_properties or {}, partition_cols=partition_cols, + cluster_by=cluster_by, schema=schema, format=format, ) diff --git a/python/pyspark/pipelines/output.py b/python/pyspark/pipelines/output.py index 84e950f161742..92058e68721f4 100644 --- a/python/pyspark/pipelines/output.py +++ b/python/pyspark/pipelines/output.py @@ -45,6 +45,7 @@ class Table(Output): :param table_properties: A dict where the keys are the property names and the values are the property values. These properties will be set on the table. :param partition_cols: A list containing the column names of the partition columns. + :param cluster_by: A list containing the column names of the cluster columns. :param schema Explicit Spark SQL schema to materialize this table with. Supports either a Pyspark StructType or a SQL DDL string, such as "a INT, b STRING". :param format: The format of the table, e.g. "parquet". 
@@ -52,6 +53,7 @@ class Table(Output): table_properties: Mapping[str, str] partition_cols: Optional[Sequence[str]] + cluster_by: Optional[Sequence[str]] schema: Optional[Union[StructType, str]] format: Optional[str] diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py index 5c5ef9fc30401..e8a8561c3e749 100644 --- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py +++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py @@ -63,6 +63,7 @@ def register_output(self, output: Output) -> None: table_details = pb2.PipelineCommand.DefineOutput.TableDetails( table_properties=output.table_properties, partition_cols=output.partition_cols, + clustering_columns=output.cluster_by, format=output.format, # Even though schema_string is not required, the generated Python code seems to # erroneously think it is required. diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index d7321fa7cf0c1..139de83dc1aaf 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -42,7 +42,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x9c"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutputH\x00R\x0c\x64\x65\x66ineOutput\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 
\x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xe3\t\n\x0c\x44\x65\x66ineOutput\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12$\n\x0boutput_name\x18\x02 \x01(\tH\x02R\noutputName\x88\x01\x01\x12?\n\x0boutput_type\x18\x03 \x01(\x0e\x32\x19.spark.connect.OutputTypeH\x03R\noutputType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x04R\x07\x63omment\x88\x01\x01\x12X\n\x14source_code_location\x18\x05 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12_\n\rtable_details\x18\x06 \x01(\x0b\x32\x38.spark.connect.PipelineCommand.DefineOutput.TableDetailsH\x00R\x0ctableDetails\x12\\\n\x0csink_details\x18\x07 \x01(\x0b\x32\x37.spark.connect.PipelineCommand.DefineOutput.SinkDetailsH\x00R\x0bsinkDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\x91\x03\n\x0cTableDetails\x12x\n\x10table_properties\x18\x01 \x03(\x0b\x32M.spark.connect.PipelineCommand.DefineOutput.TableDetails.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x02 \x03(\tR\rpartitionCols\x12\x1b\n\x06\x66ormat\x18\x03 \x01(\tH\x01R\x06\x66ormat\x88\x01\x01\x12\x43\n\x10schema_data_type\x18\x04 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x0eschemaDataType\x12%\n\rschema_string\x18\x05 \x01(\tH\x00R\x0cschemaString\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x08\n\x06schemaB\t\n\x07_format\x1a\xd1\x01\n\x0bSinkDetails\x12^\n\x07options\x18\x01 \x03(\x0b\x32\x44.spark.connect.PipelineCommand.DefineOutput.SinkDetails.OptionsEntryR\x07options\x12\x1b\n\x06\x66ormat\x18\x02 \x01(\tH\x00R\x06\x66ormat\x88\x01\x01\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07_formatB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0e\n\x0c_output_nameB\x0e\n\x0c_output_typeB\n\n\x08_commentB\x17\n\x15_source_code_location\x1a\xdd\x06\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x02R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x03R\x11targetDatasetName\x88\x01\x01\x12Q\n\x08sql_conf\x18\x04 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x05 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x06 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12x\n\x15relation_flow_details\x18\x07 \x01(\x0b\x32\x42.spark.connect.PipelineCommand.DefineFlow.WriteRelationFlowDetailsH\x00R\x13relationFlowDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x61\n\x18WriteRelationFlowDetails\x12\x38\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x08relation\x88\x01\x01\x42\x0b\n\t_relation\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\xc2\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 
\x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x12\x1d\n\x07storage\x18\x06 \x01(\tH\x03R\x07storage\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dryB\n\n\x08_storage\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf0\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12k\n\x14\x64\x65\x66ine_output_result\x18\x02 \x01(\x0b\x32\x37.spark.connect.PipelineCommandResult.DefineOutputResultH\x00R\x12\x64\x65\x66ineOutputResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x85\x01\n\x12\x44\x65\x66ineOutputResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"\xf1\x01\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x03 \x01(\tH\x02R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x0c\n\n_file_nameB\x0e\n\x0c_line_numberB\x12\n\x10_definition_path"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames"\xd7\x01\n\x17PipelineAnalysisContext\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 
\x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x02 \x01(\tH\x01R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x14\n\x12_dataflow_graph_idB\x12\n\x10_definition_path*i\n\nOutputType\x12\x1b\n\x17OUTPUT_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x12\x08\n\x04SINK\x10\x04\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xcb"\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOutputH\x00R\x0c\x64\x65\x66ineOutput\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x92\n\n\x0c\x44\x65\x66ineOutput\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12$\n\x0boutput_name\x18\x02 \x01(\tH\x02R\noutputName\x88\x01\x01\x12?\n\x0boutput_type\x18\x03 \x01(\x0e\x32\x19.spark.connect.OutputTypeH\x03R\noutputType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x04R\x07\x63omment\x88\x01\x01\x12X\n\x14source_code_location\x18\x05 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12_\n\rtable_details\x18\x06 \x01(\x0b\x32\x38.spark.connect.PipelineCommand.DefineOutput.TableDetailsH\x00R\x0ctableDetails\x12\\\n\x0csink_details\x18\x07 \x01(\x0b\x32\x37.spark.connect.PipelineCommand.DefineOutput.SinkDetailsH\x00R\x0bsinkDetails\x12\x35\n\textension\x18\xe7\x07 
\x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xc0\x03\n\x0cTableDetails\x12x\n\x10table_properties\x18\x01 \x03(\x0b\x32M.spark.connect.PipelineCommand.DefineOutput.TableDetails.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x02 \x03(\tR\rpartitionCols\x12\x1b\n\x06\x66ormat\x18\x03 \x01(\tH\x01R\x06\x66ormat\x88\x01\x01\x12\x43\n\x10schema_data_type\x18\x04 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x0eschemaDataType\x12%\n\rschema_string\x18\x05 \x01(\tH\x00R\x0cschemaString\x12-\n\x12\x63lustering_columns\x18\x06 \x03(\tR\x11\x63lusteringColumns\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x08\n\x06schemaB\t\n\x07_format\x1a\xd1\x01\n\x0bSinkDetails\x12^\n\x07options\x18\x01 \x03(\x0b\x32\x44.spark.connect.PipelineCommand.DefineOutput.SinkDetails.OptionsEntryR\x07options\x12\x1b\n\x06\x66ormat\x18\x02 \x01(\tH\x00R\x06\x66ormat\x88\x01\x01\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07_formatB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0e\n\x0c_output_nameB\x0e\n\x0c_output_typeB\n\n\x08_commentB\x17\n\x15_source_code_location\x1a\xdd\x06\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x02R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x03R\x11targetDatasetName\x88\x01\x01\x12Q\n\x08sql_conf\x18\x04 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x05 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x06 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12x\n\x15relation_flow_details\x18\x07 \x01(\x0b\x32\x42.spark.connect.PipelineCommand.DefineFlow.WriteRelationFlowDetailsH\x00R\x13relationFlowDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x61\n\x18WriteRelationFlowDetails\x12\x38\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x08relation\x88\x01\x01\x42\x0b\n\t_relation\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\xc2\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x12\x1d\n\x07storage\x18\x06 \x01(\tH\x03R\x07storage\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dryB\n\n\x08_storage\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 
\x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf0\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12k\n\x14\x64\x65\x66ine_output_result\x18\x02 \x01(\x0b\x32\x37.spark.connect.PipelineCommandResult.DefineOutputResultH\x00R\x12\x64\x65\x66ineOutputResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x85\x01\n\x12\x44\x65\x66ineOutputResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"\xf1\x01\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x03 \x01(\tH\x02R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x0c\n\n_file_nameB\x0e\n\x0c_line_numberB\x12\n\x10_definition_path"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames"\xd7\x01\n\x17PipelineAnalysisContext\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12,\n\x0f\x64\x65\x66inition_path\x18\x02 \x01(\tH\x01R\x0e\x64\x65\x66initionPath\x88\x01\x01\x12\x33\n\textension\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\textensionB\x14\n\x12_dataflow_graph_idB\x12\n\x10_definition_path*i\n\nOutputType\x12\x1b\n\x17OUTPUT_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x12\x08\n\x04SINK\x10\x04\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -69,10 +69,10 @@ ]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None 
_globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_OUTPUTTYPE"]._serialized_start = 6058 - _globals["_OUTPUTTYPE"]._serialized_end = 6163 + _globals["_OUTPUTTYPE"]._serialized_start = 6105 + _globals["_OUTPUTTYPE"]._serialized_end = 6210 _globals["_PIPELINECOMMAND"]._serialized_start = 195 - _globals["_PIPELINECOMMAND"]._serialized_end = 4575 + _globals["_PIPELINECOMMAND"]._serialized_end = 4622 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1129 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1437 _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1338 @@ -80,51 +80,51 @@ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1439 _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1529 _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1532 - _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2783 + _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2830 _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start = 2068 - _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end = 2469 + _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end = 2516 _globals[ "_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY" - ]._serialized_start = 2382 + ]._serialized_start = 2429 _globals[ "_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY" - ]._serialized_end = 2448 - _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start = 2472 - _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end = 2681 - _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start = 2612 - _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end = 2670 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2786 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3647 + ]._serialized_end = 2495 + _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start = 2519 + _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end = 2728 + _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start = 2659 + _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end = 2717 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2833 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3694 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 1338 _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1396 - _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start = 3380 - _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end = 3477 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3479 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3537 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3650 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3972 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 3975 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4174 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 4177 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 4335 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 4338 - 
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 4559 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4578 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5330 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4947 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 5045 - _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 5048 - _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5181 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5184 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5315 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5332 - _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5405 - _globals["_PIPELINEEVENT"]._serialized_start = 5407 - _globals["_PIPELINEEVENT"]._serialized_end = 5523 - _globals["_SOURCECODELOCATION"]._serialized_start = 5526 - _globals["_SOURCECODELOCATION"]._serialized_end = 5767 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5769 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5838 - _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5841 - _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6056 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start = 3427 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end = 3524 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3526 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3584 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3697 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4019 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 4022 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4221 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 4224 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 4382 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 4385 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 4606 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4625 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5377 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4994 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 5092 + _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 5095 + _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 5228 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 5231 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5362 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5379 + _globals["_PIPELINEEVENTRESULT"]._serialized_end = 5452 + _globals["_PIPELINEEVENT"]._serialized_start = 5454 + _globals["_PIPELINEEVENT"]._serialized_end = 5570 + _globals["_SOURCECODELOCATION"]._serialized_start = 5573 + _globals["_SOURCECODELOCATION"]._serialized_end = 5814 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5816 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5885 + _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 5888 + _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6103 # @@protoc_insertion_point(module_scope) 
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index b9170e763ed92..60d131037c99d 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -240,6 +240,7 @@ class PipelineCommand(google.protobuf.message.Message): FORMAT_FIELD_NUMBER: builtins.int SCHEMA_DATA_TYPE_FIELD_NUMBER: builtins.int SCHEMA_STRING_FIELD_NUMBER: builtins.int + CLUSTERING_COLUMNS_FIELD_NUMBER: builtins.int @property def table_properties( self, @@ -255,6 +256,11 @@ class PipelineCommand(google.protobuf.message.Message): @property def schema_data_type(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ... schema_string: builtins.str + @property + def clustering_columns( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Optional cluster columns for the table.""" def __init__( self, *, @@ -263,6 +269,7 @@ class PipelineCommand(google.protobuf.message.Message): format: builtins.str | None = ..., schema_data_type: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., schema_string: builtins.str = ..., + clustering_columns: collections.abc.Iterable[builtins.str] | None = ..., ) -> None: ... def HasField( self, @@ -284,6 +291,8 @@ class PipelineCommand(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "_format", b"_format", + "clustering_columns", + b"clustering_columns", "format", b"format", "partition_cols", diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index c6a5e571f9792..0fa36f8a15143 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -104,6 +104,9 @@ message PipelineCommand { spark.connect.DataType schema_data_type = 4; string schema_string = 5; } + + // Optional cluster columns for the table. + repeated string clustering_columns = 6; } // Metadata that's only applicable to sinks. 
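For orientation before the server-side changes below, here is a minimal usage sketch of the new cluster_by argument on the Python API surface added above. The decorator usage mirrors the pipeline definition exercised in APITest later in this patch; the create_streaming_table call, the "events" name, and its schema string are illustrative, not taken from the patch.

    from pyspark import pipelines as dp
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.active()

    # Materialized view clustered by cluster_col1 (mirrors the APITest fixture).
    @dp.materialized_view(cluster_by=["cluster_col1"])
    def mv():
        df = spark.range(10)
        df = df.withColumn("cluster_col1", col("id") % 3)
        return df.withColumn("cluster_col2", col("id") % 2)

    # Streaming table clustered by the same column.
    @dp.table(cluster_by=["cluster_col1"])
    def st():
        return spark.readStream.table("mv")

    # create_streaming_table accepts the same keyword argument; a separate
    # flow (not shown) would populate this illustrative table.
    dp.create_streaming_table(
        "events",
        cluster_by=["cluster_col1"],
        schema="id BIGINT, cluster_col1 BIGINT",
    )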
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 7e69e546893e6..0929b07be5237 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -203,6 +203,8 @@ private[connect] object PipelinesHandler extends Logging { }, partitionCols = Option(tableDetails.getPartitionColsList.asScala.toSeq) .filter(_.nonEmpty), + clusterCols = Option(tableDetails.getClusteringColumnsList.asScala.toSeq) + .filter(_.nonEmpty), properties = tableDetails.getTablePropertiesMap.asScala.toMap, origin = QueryOrigin( filePath = Option.when(output.getSourceCodeLocation.hasFileName)( diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 79c34ac46b9fb..1a72d112aa2ef 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -865,4 +865,41 @@ class PythonPipelineSuite (exitCode, output.toSeq) } + + test("empty cluster_by list should work and create table with no clustering") { + withTable("mv", "st") { + val graph = buildGraph(""" + |from pyspark.sql.functions import col + | + |@dp.materialized_view(cluster_by = []) + |def mv(): + | return spark.range(5).withColumn("id_mod", col("id") % 2) + | + |@dp.table(cluster_by = []) + |def st(): + | return spark.readStream.table("mv") + |""".stripMargin) + val updateContext = + new PipelineUpdateContextImpl(graph, eventCallback = _ => (), storageRoot = storageRoot) + updateContext.pipelineExecution.runPipeline() + updateContext.pipelineExecution.awaitCompletion() + + // Check tables are created with no clustering transforms + val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] + + val mvIdentifier = Identifier.of(Array("default"), "mv") + val mvTable = catalog.loadTable(mvIdentifier) + val mvTransforms = mvTable.partitioning() + assert( + mvTransforms.isEmpty, + s"MaterializedView should have no transforms, but got: ${mvTransforms.mkString(", ")}") + + val stIdentifier = Identifier.of(Array("default"), "st") + val stTable = catalog.loadTable(stIdentifier) + val stTransforms = stTable.partitioning() + assert( + stTransforms.isEmpty, + s"Table should have no transforms, but got: ${stTransforms.mkString(", ")}") + } + } } diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala index dfb766b1df778..f3b63f7914218 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala @@ -41,10 +41,12 @@ class TestPipelineDefinition(graphId: String) { // TODO: Add support for specifiedSchema // specifiedSchema: Option[StructType] = None, partitionCols: Option[Seq[String]] = None, + clusterCols: Option[Seq[String]] = None, properties: Map[String, String] = Map.empty): Unit = { val tableDetails = 
sc.PipelineCommand.DefineOutput.TableDetails .newBuilder() .addAllPartitionCols(partitionCols.getOrElse(Seq()).asJava) + .addAllClusteringColumns(clusterCols.getOrElse(Seq()).asJava) .putAllTableProperties(properties.asJava) .build() diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index cb142988ce519..e5c87fa542ad1 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.catalog.{ TableInfo } import org.apache.spark.sql.connector.catalog.CatalogV2Util.v2ColumnsToStructType -import org.apache.spark.sql.connector.expressions.Expressions +import org.apache.spark.sql.connector.expressions.{ClusterByTransform, Expressions} import org.apache.spark.sql.execution.command.CreateViewCommand import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers import org.apache.spark.sql.pipelines.util.SchemaInferenceUtils.diffSchemas @@ -266,6 +266,19 @@ object DatasetManager extends Logging { ) val mergedProperties = resolveTableProperties(table, identifier) val partitioning = table.partitionCols.toSeq.flatten.map(Expressions.identity) + val clustering = table.clusterCols.map(cols => + ClusterByTransform(cols.map(col => Expressions.column(col))) + ).toSeq + + // Validate that partition and cluster columns don't coexist + if (partitioning.nonEmpty && clustering.nonEmpty) { + throw new AnalysisException( + errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED", + messageParameters = Map.empty + ) + } + + val allTransforms = partitioning ++ clustering val existingTableOpt = if (catalog.tableExists(identifier)) { Some(catalog.loadTable(identifier)) @@ -273,15 +286,15 @@ object DatasetManager extends Logging { None } - // Error if partitioning doesn't match + // Error if partitioning/clustering doesn't match if (existingTableOpt.isDefined) { - val existingPartitioning = existingTableOpt.get.partitioning().toSeq - if (existingPartitioning != partitioning) { + val existingTransforms = existingTableOpt.get.partitioning().toSeq + if (existingTransforms != allTransforms) { throw new AnalysisException( errorClass = "CANNOT_UPDATE_PARTITION_COLUMNS", messageParameters = Map( - "existingPartitionColumns" -> existingPartitioning.mkString(", "), - "requestedPartitionColumns" -> partitioning.mkString(", ") + "existingPartitionColumns" -> existingTransforms.mkString(", "), + "requestedPartitionColumns" -> allTransforms.mkString(", ") ) ) } @@ -314,7 +327,7 @@ object DatasetManager extends Logging { new TableInfo.Builder() .withProperties(mergedProperties.asJava) .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema)) - .withPartitions(partitioning.toArray) + .withPartitions(allTransforms.toArray) .build() ) } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala index 2c9029fdd34d6..647df79bb940c 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala @@ -264,12 +264,20 @@ class BatchTableWrite( if (destination.format.isDefined) { dataFrameWriter.format(destination.format.get) } + + // In "append" 
mode with saveAsTable, partition/cluster columns must be specified in query + // because the format and options of the existing table is used, and the table could + // have been created with partition columns. + if (destination.clusterCols.isDefined) { + val clusterCols = destination.clusterCols.get + dataFrameWriter.clusterBy(clusterCols.head, clusterCols.tail: _*) + } + if (destination.partitionCols.isDefined) { + dataFrameWriter.partitionBy(destination.partitionCols.get: _*) + } + dataFrameWriter .mode("append") - // In "append" mode with saveAsTable, partition columns must be specified in query - // because the format and options of the existing table is used, and the table could - // have been created with partition columns. - .partitionBy(destination.partitionCols.getOrElse(Seq.empty): _*) .saveAsTable(destination.identifier.unquotedString) } } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala index 55a03a2d19f90..5df12be7f4cfe 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala @@ -192,6 +192,7 @@ class SqlGraphRegistrationContext( specifiedSchema = Option.when(cst.columns.nonEmpty)(StructType(cst.columns.map(_.toV1Column))), partitionCols = Option(PartitionHelper.applyPartitioning(cst.partitioning, queryOrigin)), + clusterCols = None, properties = cst.tableSpec.properties, origin = queryOrigin.copy( objectName = Option(stIdentifier.unquotedString), @@ -223,6 +224,7 @@ class SqlGraphRegistrationContext( specifiedSchema = Option.when(cst.columns.nonEmpty)(StructType(cst.columns.map(_.toV1Column))), partitionCols = Option(PartitionHelper.applyPartitioning(cst.partitioning, queryOrigin)), + clusterCols = None, properties = cst.tableSpec.properties, origin = queryOrigin.copy( objectName = Option(stIdentifier.unquotedString), @@ -273,6 +275,7 @@ class SqlGraphRegistrationContext( specifiedSchema = Option.when(cmv.columns.nonEmpty)(StructType(cmv.columns.map(_.toV1Column))), partitionCols = Option(PartitionHelper.applyPartitioning(cmv.partitioning, queryOrigin)), + clusterCols = None, properties = cmv.tableSpec.properties, origin = queryOrigin.copy( objectName = Option(mvIdentifier.unquotedString), diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala index 87e01ed2021ef..c762174e67251 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala @@ -114,6 +114,7 @@ sealed trait TableInput extends Input { * @param identifier The identifier of this table within the graph. * @param specifiedSchema The user-specified schema for this table. * @param partitionCols What columns the table should be partitioned by when materialized. + * @param clusterCols What columns the table should be clustered by when materialized. * @param normalizedPath Normalized storage location for the table based on the user-specified table * path (if not defined, we will normalize a managed storage path for it). * @param properties Table Properties to set in table metadata. 
@@ -124,6 +125,7 @@ case class Table( identifier: TableIdentifier, specifiedSchema: Option[StructType], partitionCols: Option[Seq[String]], + clusterCols: Option[Seq[String]], normalizedPath: Option[String], properties: Map[String, String] = Map.empty, comment: Option[String], diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala index 31afc5a27a545..ba8419eb6e9c8 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.pipelines.graph import scala.jdk.CollectionConverters._ import org.apache.spark.SparkThrowable +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} -import org.apache.spark.sql.connector.expressions.Expressions +import org.apache.spark.sql.connector.expressions.{ClusterByTransform, Expressions, FieldReference} import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.pipelines.graph.DatasetManager.TableMaterializationException import org.apache.spark.sql.pipelines.utils.{BaseCoreExecutionTest, TestGraphRegistrationContext} @@ -885,4 +886,247 @@ abstract class MaterializeTablesSuite extends BaseCoreExecutionTest { storageRoot = storageRoot ) } + + test("cluster columns with user schema") { + val session = spark + import session.implicits._ + + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerTable( + "a", + query = Option(dfFlowFunc(Seq((1, 1, "x"), (2, 3, "y")).toDF("x1", "x2", "x3"))), + specifiedSchema = Option( + new StructType() + .add("x1", IntegerType) + .add("x2", IntegerType) + .add("x3", StringType) + ), + clusterCols = Option(Seq("x1", "x3")) + ) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] + val identifier = Identifier.of(Array(TestGraphRegistrationContext.DEFAULT_DATABASE), "a") + val table = catalog.loadTable(identifier) + assert( + table.columns() sameElements CatalogV2Util.structTypeToV2Columns( + new StructType() + .add("x1", IntegerType) + .add("x2", IntegerType) + .add("x3", StringType) + ) + ) + val expectedClusterTransform = ClusterByTransform( + Seq(FieldReference("x1"), FieldReference("x3")).toSeq + ) + assert(table.partitioning().contains(expectedClusterTransform)) + } + + test("specifying cluster column with existing clustered table") { + val session = spark + import session.implicits._ + + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerTable( + "t10", + query = Option(dfFlowFunc(Seq((1, true, "a"), (2, false, "b")).toDF("x", "y", "z"))), + clusterCols = Option(Seq("x", "z")) + ) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + + val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] + val identifier = Identifier.of(Array(TestGraphRegistrationContext.DEFAULT_DATABASE), "t10") + val table = catalog.loadTable(identifier) + val expectedClusterTransform = ClusterByTransform( + Seq(FieldReference("x"), FieldReference("z")).toSeq + ) + assert(table.partitioning().contains(expectedClusterTransform)) + + // Specify the same cluster columns - 
should work + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerFlow( + "t10", + "t10", + query = dfFlowFunc(Seq((3, true, "c"), (4, false, "d")).toDF("x", "y", "z")) + ) + registerTable("t10", clusterCols = Option(Seq("x", "z"))) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + + val table2 = catalog.loadTable(identifier) + assert(table2.partitioning().contains(expectedClusterTransform)) + + // Don't specify cluster columns when table already has them - should throw + val ex = intercept[TableMaterializationException] { + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerFlow( + "t10", + "t10", + query = dfFlowFunc(Seq((5, true, "e"), (6, false, "f")).toDF("x", "y", "z")) + ) + registerTable("t10") + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + } + assert(ex.cause.asInstanceOf[SparkThrowable].getCondition == "CANNOT_UPDATE_PARTITION_COLUMNS") + } + + test("specifying cluster column different from existing clustered table") { + val session = spark + import session.implicits._ + + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerTable( + "t11", + query = Option(dfFlowFunc(Seq((1, true, "a"), (2, false, "b")).toDF("x", "y", "z"))), + clusterCols = Option(Seq("x")) + ) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + + val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] + val identifier = Identifier.of(Array(TestGraphRegistrationContext.DEFAULT_DATABASE), "t11") + + // Specify different cluster columns - should throw + val ex = intercept[TableMaterializationException] { + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerFlow( + "t11", + "t11", + query = dfFlowFunc(Seq((3, true, "c"), (4, false, "d")).toDF("x", "y", "z")) + ) + registerTable("t11", clusterCols = Option(Seq("y"))) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + } + assert(ex.cause.asInstanceOf[SparkThrowable].getCondition == "CANNOT_UPDATE_PARTITION_COLUMNS") + + val table = catalog.loadTable(identifier) + val expectedClusterTransform = ClusterByTransform(Seq(FieldReference("x")).toSeq) + assert(table.partitioning().contains(expectedClusterTransform)) + } + + test("cluster columns only (no partitioning)") { + val session = spark + import session.implicits._ + + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerTable( + "t12", + query = Option(dfFlowFunc(Seq((1, 1, "x"), (2, 3, "y")).toDF("x1", "x2", "x3"))), + specifiedSchema = Option( + new StructType() + .add("x1", IntegerType) + .add("x2", IntegerType) + .add("x3", StringType) + ), + clusterCols = Option(Seq("x1", "x3")) + ) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] + val identifier = Identifier.of(Array(TestGraphRegistrationContext.DEFAULT_DATABASE), "t12") + val table = catalog.loadTable(identifier) + assert( + table.columns() sameElements CatalogV2Util.structTypeToV2Columns( + new StructType() + .add("x1", IntegerType) + .add("x2", IntegerType) + .add("x3", StringType) + ) + ) + + val transforms = table.partitioning() + val expectedClusterTransform = ClusterByTransform( + Seq(FieldReference("x1"), FieldReference("x3")).toSeq + ) + assert(transforms.contains(expectedClusterTransform)) + } + + test("materialized view with cluster columns") { + val session = spark + import session.implicits._ + + materializeGraph( + new TestGraphRegistrationContext(spark) { 
+ registerMaterializedView( + "mv1", + query = dfFlowFunc(Seq((1, 1, "x"), (2, 3, "y")).toDF("x1", "x2", "x3")), + clusterCols = Option(Seq("x1", "x2")) + ) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog] + val identifier = Identifier.of(Array(TestGraphRegistrationContext.DEFAULT_DATABASE), "mv1") + val table = catalog.loadTable(identifier) + assert( + table.columns() sameElements CatalogV2Util.structTypeToV2Columns( + new StructType() + .add("x1", IntegerType) + .add("x2", IntegerType) + .add("x3", StringType) + ) + ) + val expectedClusterTransform = ClusterByTransform( + Seq(FieldReference("x1"), FieldReference("x2")).toSeq + ) + assert(table.partitioning().contains(expectedClusterTransform)) + } + + test("partition and cluster columns together should fail") { + val session = spark + import session.implicits._ + + val ex = intercept[TableMaterializationException] { + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerTable( + "invalid_table", + query = Option(dfFlowFunc(Seq((1, 1, "x"), (2, 3, "y")).toDF("x1", "x2", "x3"))), + partitionCols = Option(Seq("x2")), + clusterCols = Option(Seq("x1", "x3")) + ) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + } + assert(ex.cause.isInstanceOf[AnalysisException]) + val analysisEx = ex.cause.asInstanceOf[AnalysisException] + assert(analysisEx.errorClass.get == "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED") + } + + test("cluster column that doesn't exist in table schema should fail") { + val session = spark + import session.implicits._ + + val ex = intercept[TableMaterializationException] { + materializeGraph( + new TestGraphRegistrationContext(spark) { + registerTable( + "invalid_cluster_table", + query = Option(dfFlowFunc(Seq((1, 1, "x"), (2, 3, "y")).toDF("x1", "x2", "x3"))), + clusterCols = Option(Seq("nonexistent_column")) + ) + }.resolveToDataflowGraph(), + storageRoot = storageRoot + ) + } + assert(ex.cause.isInstanceOf[AnalysisException]) + } } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/APITest.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/APITest.scala index bb7c8e833f84b..c6b457ee04eba 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/APITest.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/APITest.scala @@ -542,6 +542,66 @@ trait APITest } } + test("Python Pipeline with cluster columns") { + val pipelineSpec = + TestPipelineSpec(include = Seq("transformations/**")) + val pipelineConfig = TestPipelineConfiguration(pipelineSpec) + val sources = Seq( + PipelineSourceFile( + name = "transformations/definition.py", + contents = """ + |from pyspark import pipelines as dp + |from pyspark.sql import DataFrame, SparkSession + |from pyspark.sql.functions import col + | + |spark = SparkSession.active() + | + |@dp.materialized_view(cluster_by = ["cluster_col1"]) + |def mv(): + | df = spark.range(10) + | df = df.withColumn("cluster_col1", col("id") % 3) + | df = df.withColumn("cluster_col2", col("id") % 2) + | return df + | + |@dp.table(cluster_by = ["cluster_col1"]) + |def st(): + | return spark.readStream.table("mv") + |""".stripMargin)) + val pipeline = createAndRunPipeline(pipelineConfig, sources) + awaitPipelineTermination(pipeline) + + // Verify tables have correct data + Seq("mv", "st").foreach { tbl => + val fullName = s"$tbl" + checkAnswer( + spark.sql(s"SELECT * FROM 
$fullName ORDER BY id"), + Seq( + Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 0), Row(3, 0, 1), Row(4, 1, 0), + Row(5, 2, 1), Row(6, 0, 0), Row(7, 1, 1), Row(8, 2, 0), Row(9, 0, 1) + )) + } + + // Verify clustering information is stored in catalog + val catalog = spark.sessionState.catalogManager.currentCatalog + .asInstanceOf[org.apache.spark.sql.connector.catalog.TableCatalog] + // Check materialized view has clustering transform + val mvIdentifier = org.apache.spark.sql.connector.catalog.Identifier + .of(Array("default"), "mv") + val mvTable = catalog.loadTable(mvIdentifier) + val mvTransforms = mvTable.partitioning() + assert(mvTransforms.length == 1) + assert(mvTransforms.head.name() == "cluster_by") + assert(mvTransforms.head.toString.contains("cluster_col1")) + // Check streaming table has clustering transform + val stIdentifier = org.apache.spark.sql.connector.catalog.Identifier + .of(Array("default"), "st") + val stTable = catalog.loadTable(stIdentifier) + val stTransforms = stTable.partitioning() + assert(stTransforms.length == 1) + assert(stTransforms.head.name() == "cluster_by") + assert(stTransforms.head.toString.contains("cluster_col1")) + } + /* Below tests pipeline execution configurations */ test("Pipeline with dry run") { diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala index 599aab87d1f7d..d88432d68ca3c 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala @@ -46,6 +46,7 @@ class TestGraphRegistrationContext( comment: Option[String] = None, specifiedSchema: Option[StructType] = None, partitionCols: Option[Seq[String]] = None, + clusterCols: Option[Seq[String]] = None, properties: Map[String, String] = Map.empty, baseOrigin: QueryOrigin = QueryOrigin.empty, format: Option[String] = None, @@ -58,6 +59,7 @@ class TestGraphRegistrationContext( comment, specifiedSchema, partitionCols, + clusterCols, properties, baseOrigin, format, @@ -99,6 +101,7 @@ class TestGraphRegistrationContext( comment: Option[String] = None, specifiedSchema: Option[StructType] = None, partitionCols: Option[Seq[String]] = None, + clusterCols: Option[Seq[String]] = None, properties: Map[String, String] = Map.empty, baseOrigin: QueryOrigin = QueryOrigin.empty, format: Option[String] = None, @@ -111,6 +114,7 @@ class TestGraphRegistrationContext( comment, specifiedSchema, partitionCols, + clusterCols, properties, baseOrigin, format, @@ -129,6 +133,7 @@ class TestGraphRegistrationContext( comment: Option[String], specifiedSchema: Option[StructType], partitionCols: Option[Seq[String]], + clusterCols: Option[Seq[String]], properties: Map[String, String], baseOrigin: QueryOrigin, format: Option[String], @@ -150,6 +155,7 @@ class TestGraphRegistrationContext( comment = comment, specifiedSchema = specifiedSchema, partitionCols = partitionCols, + clusterCols = clusterCols, properties = properties, origin = baseOrigin.merge( QueryOrigin(