diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index e716364f69bb..d75cccc348b4 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -344,6 +344,8 @@ message StreamingQueryManagerCommand {
     AwaitAnyTerminationCommand await_any_termination = 3;
     // resetTerminated() API.
     bool reset_terminated = 4;
+    // addListener() API.
+    AddStreamingQueryListenerCommand add_listener = 5;
   }
 
   message AwaitAnyTerminationCommand {
@@ -377,6 +379,14 @@
   }
 }
 
+// TODO: maybe serialize the whole class?
+message AddStreamingQueryListenerCommand {
+  bytes on_query_started = 1;
+  bytes on_query_progress = 2;
+  bytes on_query_terminated = 3;
+  // TODO: deserialize in python or in scala?
+}
+
 // Command to get the output of 'SparkContext.resources'
 message GetResourcesCommand { }
 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 631a9eee5f26..4c3764ae9c54 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2646,6 +2646,11 @@ class SparkConnectPlanner(val session: SparkSession) {
         session.streams.resetTerminated()
         respBuilder.setResetTerminated(true)
 
+      case StreamingQueryManagerCommand.CommandCase.ADD_LISTENER =>
+        val listener =
+          new PythonStreamingQueryListener(command.getAddListener, sessionId, pythonExec)
+        session.streams.addListener(listener)
+
       case StreamingQueryManagerCommand.CommandCase.COMMAND_NOT_SET =>
         throw new IllegalArgumentException("Missing command in StreamingQueryManagerCommand")
     }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerConverter.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerConverter.scala
new file mode 100644
index 000000000000..09a3e9aff8b3
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerConverter.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import java.util.Base64
+
+import scala.io.Source
+
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.streaming.StreamingQueryListener
+
+class PythonStreamingQueryListener(
+    listener: proto.AddStreamingQueryListenerCommand,
+    sessionId: String,
+    pythonExec: String)
+    extends StreamingQueryListener {
+  // Start a Python process that runs the deserialized listener callbacks.
+  // TODO: Reuse some functions from PythonRunner.scala
+  // TODO: Handle process better: reuse process; release process; monitor process
+  // TODO(wei) reuse process
+//  val envVars = udf.func.envVars.asScala.toMap
+
+  val pb = new ProcessBuilder()
+  val pbEnv = pb.environment()
+  val pythonPath = PythonUtils.mergePythonPaths(
+    PythonUtils.sparkPythonPath,
+//    envVars.getOrElse("PYTHONPATH", ""),
+    sys.env.getOrElse("PYTHONPATH", ""))
+  pbEnv.put("PYTHONPATH", pythonPath)
+//  pbEnv.putAll(envVars.asJava)
+
+  pb.command(pythonExec)
+
+  // Encode the serialized functions as strings so that they can be passed into the process
+  // through arguments.
+  val onQueryStartedBytes = listener.getOnQueryStarted.toByteArray
+  val onQueryStartedStr = Base64.getEncoder().encodeToString(onQueryStartedBytes)
+
+  // Output for debug for now.
+  // TODO: redirect the output stream
+  // TODO: handle error
+
+  // TODO(Wei): serialize and deserialize events
+
+  private def toJSON(event: StreamingQueryListener.QueryStartedEvent): String =
+    s"""
+       |{
+       |  "id": "${event.id}",
+       |  "runId": "${event.runId}",
+       |  "name": "${event.name}",
+       |  "timestamp": "${event.timestamp}"
+       |}
+     """.stripMargin
+
+  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+    val eventJson = toJSON(event)
+    val pythonScript = s"""
+      |print('###### Start running onQueryStarted ######')
+      |from pyspark.sql import SparkSession
+      |from pyspark.serializers import CloudPickleSerializer
+      |from pyspark.sql.connect.streaming.listener import (
+      |  StreamingQueryListener,
+      |  QueryStartedEvent
+      |)
+      |from pyspark.sql.streaming.listener import (
+      |  QueryProgressEvent,
+      |  QueryTerminatedEvent,
+      |  QueryIdleEvent
+      |)
+      |import sys
+      |import base64
+      |import json
+      |
+      |startEvent = QueryStartedEvent.fromJson(json.loads('''$eventJson'''))
+      |sessionId = '$sessionId'
+      |sparkConnectSession = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
+      |sparkConnectSession._client._session_id = sessionId
+      |
+      |bytes = base64.b64decode('$onQueryStartedStr')
+      |func = CloudPickleSerializer().loads(bytes)
+      |# forEachBatchFunc = unpickledCode[0]
+      |func(startEvent)
+      |exit()
+    """.stripMargin
+    pb.command(pythonExec, "-c", pythonScript)
+    val process = pb.start()
+    // Output for debug for now.
+    // TODO: redirect the output stream
+    // TODO: handle error
+    // TODO(WEI): python ver?
+ val is = process.getInputStream() + // scalastyle:off println + val out = Source.fromInputStream(is).mkString + println(s"##### Python out for query start event is: out=$out") + + val es = process.getErrorStream + val errorOut = Source.fromInputStream(es).mkString + println(s"##### Python error for query start event is: error=$errorOut") + + val exitCode = process.waitFor() + println(s"##### End processing query start event exitCode=$exitCode") + // scalastyle:on println + } + + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {} + + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {} +} diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index 9bc86d25f21b..33bbccdbaf06 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x86\x07\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xb3\x01\n\nSqlCommand\x12\x10\n\x03sql\x18\x01 \x01(\tR\x03sql\x12\x37\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryR\x04\x61rgs\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 \x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\x9b\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 
\x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t \x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 \x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xad\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 \x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xd0\x05\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 \x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 \x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 \x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12L\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32%.spark.connect.StreamingForeachWriterR\rforeachWriter\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"c\n\x16StreamingForeachWriter\x12?\n\rpython_writer\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0cpythonWriterB\x08\n\x06writer"y\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t \x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 \x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 \x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 \x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xde\x02\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 
\x01(\x08H\x00R\x0fresetTerminated\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xd3\x05\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 \x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 \x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x86\x07\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xb3\x01\n\nSqlCommand\x12\x10\n\x03sql\x18\x01 \x01(\tR\x03sql\x12\x37\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryR\x04\x61rgs\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 
\x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 \x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\x9b\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 \x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t \x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 \x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xad\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 \x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xd0\x05\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 \x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 \x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 
\x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12L\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32%.spark.connect.StreamingForeachWriterR\rforeachWriter\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"c\n\x16StreamingForeachWriter\x12?\n\rpython_writer\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0cpythonWriterB\x08\n\x06writer"y\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t \x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 \x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 \x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 
\x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xb4\x03\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12T\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x0b\x32/.spark.connect.AddStreamingQueryListenerCommandH\x00R\x0b\x61\x64\x64Listener\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xd3\x05\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 \x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xa8\x01\n AddStreamingQueryListenerCommand\x12(\n\x10on_query_started\x18\x01 \x01(\x0cR\x0eonQueryStarted\x12*\n\x11on_query_progress\x18\x02 \x01(\x0cR\x0fonQueryProgress\x12.\n\x13on_query_terminated\x18\x03 \x01(\x0cR\x11onQueryTerminated"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 \x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) @@ -101,6 +101,9 @@ _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT = ( _STREAMINGQUERYMANAGERCOMMANDRESULT.nested_types_by_name["AwaitAnyTerminationResult"] ) +_ADDSTREAMINGQUERYLISTENERCOMMAND = DESCRIPTOR.message_types_by_name[ + "AddStreamingQueryListenerCommand" +] _GETRESOURCESCOMMAND = DESCRIPTOR.message_types_by_name["GetResourcesCommand"] _GETRESOURCESCOMMANDRESULT = DESCRIPTOR.message_types_by_name["GetResourcesCommandResult"] _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY = _GETRESOURCESCOMMANDRESULT.nested_types_by_name[ @@ -434,6 +437,17 @@ _sym_db.RegisterMessage(StreamingQueryManagerCommandResult.StreamingQueryInstance) _sym_db.RegisterMessage(StreamingQueryManagerCommandResult.AwaitAnyTerminationResult) +AddStreamingQueryListenerCommand = _reflection.GeneratedProtocolMessageType( + "AddStreamingQueryListenerCommand", + (_message.Message,), + { + "DESCRIPTOR": _ADDSTREAMINGQUERYLISTENERCOMMAND, + 
"__module__": "spark.connect.commands_pb2" + # @@protoc_insertion_point(class_scope:spark.connect.AddStreamingQueryListenerCommand) + }, +) +_sym_db.RegisterMessage(AddStreamingQueryListenerCommand) + GetResourcesCommand = _reflection.GeneratedProtocolMessageType( "GetResourcesCommand", (_message.Message,), @@ -541,21 +555,23 @@ _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5996 _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 6052 _STREAMINGQUERYMANAGERCOMMAND._serialized_start = 6070 - _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 6420 - _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 6330 - _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 6409 - _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 6423 - _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 7146 - _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 6826 - _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 6953 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 6955 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 7070 - _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 7072 - _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 7131 - _GETRESOURCESCOMMAND._serialized_start = 7148 - _GETRESOURCESCOMMAND._serialized_end = 7169 - _GETRESOURCESCOMMANDRESULT._serialized_start = 7172 - _GETRESOURCESCOMMANDRESULT._serialized_end = 7384 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 7288 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 7384 + _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 6506 + _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 6416 + _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 6495 + _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 6509 + _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 7232 + _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 6912 + _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 7039 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 7041 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 7156 + _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 7158 + _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 7217 + _ADDSTREAMINGQUERYLISTENERCOMMAND._serialized_start = 7235 + _ADDSTREAMINGQUERYLISTENERCOMMAND._serialized_end = 7403 + _GETRESOURCESCOMMAND._serialized_start = 7405 + _GETRESOURCESCOMMAND._serialized_end = 7426 + _GETRESOURCESCOMMANDRESULT._serialized_start = 7429 + _GETRESOURCESCOMMANDRESULT._serialized_end = 7641 + _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 7545 + _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 7641 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi index 9255a081544e..3b90b8931664 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.pyi +++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi @@ -1323,6 +1323,7 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): GET_QUERY_FIELD_NUMBER: builtins.int AWAIT_ANY_TERMINATION_FIELD_NUMBER: builtins.int RESET_TERMINATED_FIELD_NUMBER: builtins.int + 
ADD_LISTENER_FIELD_NUMBER: builtins.int active: builtins.bool """active() API, returns a list of active queries.""" get_query: builtins.str @@ -1334,6 +1335,9 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): """awaitAnyTermination() API, wait until any query terminates or timeout.""" reset_terminated: builtins.bool """resetTerminated() API.""" + @property + def add_listener(self) -> global___AddStreamingQueryListenerCommand: + """addListener() API.""" def __init__( self, *, @@ -1342,12 +1346,15 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): await_any_termination: global___StreamingQueryManagerCommand.AwaitAnyTerminationCommand | None = ..., reset_terminated: builtins.bool = ..., + add_listener: global___AddStreamingQueryListenerCommand | None = ..., ) -> None: ... def HasField( self, field_name: typing_extensions.Literal[ "active", b"active", + "add_listener", + b"add_listener", "await_any_termination", b"await_any_termination", "command", @@ -1363,6 +1370,8 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "active", b"active", + "add_listener", + b"add_listener", "await_any_termination", b"await_any_termination", "command", @@ -1376,7 +1385,7 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): def WhichOneof( self, oneof_group: typing_extensions.Literal["command", b"command"] ) -> typing_extensions.Literal[ - "active", "get_query", "await_any_termination", "reset_terminated" + "active", "get_query", "await_any_termination", "reset_terminated", "add_listener" ] | None: ... global___StreamingQueryManagerCommand = StreamingQueryManagerCommand @@ -1510,6 +1519,39 @@ class StreamingQueryManagerCommandResult(google.protobuf.message.Message): global___StreamingQueryManagerCommandResult = StreamingQueryManagerCommandResult +class AddStreamingQueryListenerCommand(google.protobuf.message.Message): + """TODO: maybe serialize the whole class?""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ON_QUERY_STARTED_FIELD_NUMBER: builtins.int + ON_QUERY_PROGRESS_FIELD_NUMBER: builtins.int + ON_QUERY_TERMINATED_FIELD_NUMBER: builtins.int + on_query_started: builtins.bytes + on_query_progress: builtins.bytes + on_query_terminated: builtins.bytes + """TODO: deserialize in python or in scala?""" + def __init__( + self, + *, + on_query_started: builtins.bytes = ..., + on_query_progress: builtins.bytes = ..., + on_query_terminated: builtins.bytes = ..., + ) -> None: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "on_query_progress", + b"on_query_progress", + "on_query_started", + b"on_query_started", + "on_query_terminated", + b"on_query_terminated", + ], + ) -> None: ... + +global___AddStreamingQueryListenerCommand = AddStreamingQueryListenerCommand + class GetResourcesCommand(google.protobuf.message.Message): """Command to get the output of 'SparkContext.resources'""" diff --git a/python/pyspark/sql/connect/streaming/listener.py b/python/pyspark/sql/connect/streaming/listener.py new file mode 100644 index 000000000000..a7be49d01022 --- /dev/null +++ b/python/pyspark/sql/connect/streaming/listener.py @@ -0,0 +1,226 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import uuid
+from typing import Optional
+from abc import ABC, abstractmethod
+
+__all__ = ["StreamingQueryListener"]
+
+
+class StreamingQueryListener(ABC):
+    """
+    Interface for listening to events related to :class:`~pyspark.sql.streaming.StreamingQuery`.
+
+    .. versionadded:: 3.5.0
+
+    Notes
+    -----
+    The methods are not thread-safe as they may be called from different threads.
+    The events received are identical to those of the Scala API; refer to its documentation.
+
+    This API is evolving.
+
+    Examples
+    --------
+    >>> class MyListener(StreamingQueryListener):
+    ...     def onQueryStarted(self, event: QueryStartedEvent) -> None:
+    ...         # Do something with event.
+    ...         pass
+    ...
+    ...     def onQueryProgress(self, event: QueryProgressEvent) -> None:
+    ...         # Do something with event.
+    ...         pass
+    ...
+    ...     def onQueryIdle(self, event: QueryIdleEvent) -> None:
+    ...         # Do something with event.
+    ...         pass
+    ...
+    ...     def onQueryTerminated(self, event: QueryTerminatedEvent) -> None:
+    ...         # Do something with event.
+    ...         pass
+    ...
+    >>> spark.streams.addListener(MyListener())
+    """
+
+    @abstractmethod
+    def onQueryStarted(self, event: "QueryStartedEvent") -> None:
+        """
+        Called when a query is started.
+
+        Notes
+        -----
+        This is called synchronously with
+        :py:meth:`~pyspark.sql.streaming.DataStreamWriter.start`, that is, `onQueryStarted`
+        will be called on all listeners before `DataStreamWriter.start()` returns the
+        corresponding :class:`~pyspark.sql.streaming.StreamingQuery`.
+        Please don't block this method, as it will block your query.
+        """
+        pass
+
+    @abstractmethod
+    def onQueryProgress(self, event: "QueryProgressEvent") -> None:
+        """
+        Called when there is some status update (ingestion rate updated, etc.).
+
+        Notes
+        -----
+        This method is asynchronous. The status in
+        :class:`~pyspark.sql.streaming.StreamingQuery` will always be the latest, no matter
+        when this method is called. Therefore, the status of
+        :class:`~pyspark.sql.streaming.StreamingQuery` may change before/while you process
+        the event. E.g., you may find that the :class:`~pyspark.sql.streaming.StreamingQuery`
+        is terminated when you are processing `QueryProgressEvent`.
+        """
+        pass
+
+    @abstractmethod
+    def onQueryIdle(self, event: "QueryIdleEvent") -> None:
+        """
+        Called when the query is idle and waiting for new data to process.
+        """
+        pass
+
+    @abstractmethod
+    def onQueryTerminated(self, event: "QueryTerminatedEvent") -> None:
+        """
+        Called when a query is stopped, with or without error.
+        """
+        pass
+
+
+class QueryStartedEvent:
+    """
+    Event representing the start of a query.
+
+    .. versionadded:: 3.5.0
+
+    Notes
+    -----
+    This API is evolving.
+    """
+
+    def __init__(self, id: str, runId: str, name: Optional[str], timestamp: str) -> None:
+        self._id = id
+        self._runId = runId
+        self._name = name
+        self._timestamp = timestamp
+
+    # TODO (wei): change back to UUID? what about connect/StreamingQuery
+    @property
+    def id(self) -> str:
+        """
+        A unique query id that persists across restarts. See
+        :py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
+        """
+        return self._id
+
+    @property
+    def runId(self) -> str:
+        """
+        A query id that is unique for every start/restart. See
+        :py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`.
+        """
+        return self._runId
+
+    @property
+    def name(self) -> Optional[str]:
+        """
+        User-specified name of the query, `None` if not specified.
+        """
+        return self._name
+
+    @property
+    def timestamp(self) -> str:
+        """
+        The timestamp when the query was started.
+        """
+        return self._timestamp
+
+    @classmethod
+    def fromJson(cls, j) -> "QueryStartedEvent":
+        return cls(j["id"], j["runId"], j["name"], j["timestamp"])
+
+
+class QueryIdleEvent:
+    """
+    Event representing that the query is idle and waiting for new data to process.
+
+    .. versionadded:: 3.5.0
+
+    Notes
+    -----
+    This API is evolving.
+    """
+
+    def __init__(self, id: str, runId: str, timestamp: str) -> None:
+        self._id = id
+        self._runId = runId
+        self._timestamp = timestamp
+
+    @property
+    def id(self) -> str:
+        """
+        A unique query id that persists across restarts. See
+        :py:meth:`~pyspark.sql.streaming.StreamingQuery.id`.
+        """
+        return self._id
+
+    @property
+    def runId(self) -> str:
+        """
+        A query id that is unique for every start/restart. See
+        :py:meth:`~pyspark.sql.streaming.StreamingQuery.runId`.
+        """
+        return self._runId
+
+    @property
+    def timestamp(self) -> str:
+        """
+        The timestamp when the query was idle.
+        """
+        return self._timestamp
+
+    @classmethod
+    def fromJson(cls, j) -> "QueryIdleEvent":
+        return cls(j["id"], j["runId"], j["timestamp"])
+
+
+# class QueryProgressEvent:
+#     """
+#     Event representing any progress updates in a query.
+#
+#     .. versionadded:: 3.5.0
+#
+#     Notes
+#     -----
+#     This API is evolving.
+#     """
+#
+#     def __init__(self, progress: "StreamingQueryProgress") -> None:
+#         self._progress = progress
+#
+#     @property
+#     def progress(self) -> "StreamingQueryProgress":
+#         """
+#         The query progress updates.
+#         """
+#         return self._progress
+#
+#     @classmethod
+#     def fromJson(cls, j) -> "QueryProgressEvent":
+#         return cls(StreamingQueryProgress.fromJson(j["progress"]))
diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index e5aa881c9906..f8429c3887e1 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -21,6 +21,7 @@
 
 from pyspark.errors import StreamingQueryException, PySparkValueError
 import pyspark.sql.connect.proto as pb2
+from pyspark.sql.connect.streaming.listener import StreamingQueryListener
 from pyspark.sql.streaming.query import (
     StreamingQuery as PySparkStreamingQuery,
     StreamingQueryManager as PySparkStreamingQueryManager,
@@ -28,6 +29,7 @@
 from pyspark.errors.exceptions.connect import (
     StreamingQueryException as CapturedStreamingQueryException,
 )
+from pyspark.serializers import CloudPickleSerializer
 
 __all__ = ["StreamingQuery", "StreamingQueryManager"]
 
@@ -230,10 +232,14 @@ def resetTerminated(self) -> None:
 
     resetTerminated.__doc__ = PySparkStreamingQueryManager.resetTerminated.__doc__
 
-    def addListener(self, listener: Any) -> None:
-        # TODO(SPARK-42941): Change listener type to Connect StreamingQueryListener
-        # and implement below
-        raise NotImplementedError("addListener() is not implemented.")
+    def addListener(self, listener: StreamingQueryListener) -> None:
+        cmd = pb2.StreamingQueryManagerCommand()
+        ser = CloudPickleSerializer()
+        cmd.add_listener.on_query_started = ser.dumps(listener.onQueryStarted)
+        cmd.add_listener.on_query_progress = ser.dumps(listener.onQueryProgress)
+        cmd.add_listener.on_query_terminated = ser.dumps(listener.onQueryTerminated)
+        self._execute_streaming_query_manager_cmd(cmd)
+        return None
 
     # TODO(SPARK-42941): uncomment below
     # addListener.__doc__ = PySparkStreamingQueryManager.addListener.__doc__
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 61a0ef1b98e5..21e3ba4940b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -123,7 +123,19 @@ object StreamingQueryListener {
       val id: UUID,
       val runId: UUID,
       val name: String,
-      val timestamp: String) extends Event
+      val timestamp: String) extends Event {
+
+    def toJSON: String = {
+      s"""
+         |{
+         |  "id": "$id",
+         |  "runId": "$runId",
+         |  "name": "$name",
+         |  "timestamp": "$timestamp"
+         |}
+       """.stripMargin
+    }
+  }
 
   /**
    * Event representing any progress updates in a query.
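For reference, here is a minimal client-side sketch of how the API introduced in this change is meant to be used. It is not part of the diff: it assumes a local Spark Connect server at sc://localhost:15002 (the address currently hard-coded in the server-side prototype), and only the onQueryStarted path is exercised end to end, since that is the only callback the prototype PythonStreamingQueryListener forwards. The PrintingListener class and the rate-to-console query are hypothetical examples.

from pyspark.sql import SparkSession
from pyspark.sql.connect.streaming.listener import (
    StreamingQueryListener,
    QueryStartedEvent,
)


class PrintingListener(StreamingQueryListener):
    """Hypothetical listener that only logs query-start metadata."""

    def onQueryStarted(self, event: QueryStartedEvent) -> None:
        print(f"Query started: id={event.id}, runId={event.runId}, name={event.name}")

    def onQueryProgress(self, event) -> None:
        pass

    def onQueryIdle(self, event) -> None:
        pass

    def onQueryTerminated(self, event) -> None:
        pass


# Connect to a local Spark Connect server (address assumed here, matching the
# address hard-coded in the server-side prototype).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# addListener() cloudpickles the started/progress/terminated callbacks and ships them to
# the server in an AddStreamingQueryListenerCommand; the server wraps them in a
# PythonStreamingQueryListener that re-runs the pickled onQueryStarted in a Python
# subprocess when the event fires.
spark.streams.addListener(PrintingListener())

# Any streaming query started afterwards should trigger onQueryStarted on the server.
query = spark.readStream.format("rate").load().writeStream.format("console").start()
query.awaitTermination(10)
query.stop()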