14 changes: 1 addition & 13 deletions connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -139,11 +139,7 @@ message ExecutePlanRequest {
message ExecutePlanResponse {
string client_id = 1;

// Result type
oneof result_type {
ArrowBatch arrow_batch = 2;
JSONBatch json_batch = 3;
}
ArrowBatch arrow_batch = 2;

// Metrics for the query execution. Typically, this field is only present in the last
// batch of results and then represents the overall state of the query execution.
@@ -155,14 +151,6 @@ message ExecutePlanResponse {
bytes data = 2;
}

// Message type when the result is returned as JSON. This is essentially a bulk wrapper
// for the JSON result of a Spark DataFrame. All rows are returned in the JSON record format
// of `{col -> row}`.
message JSONBatch {
int64 row_count = 1;
bytes data = 2;
}

message Metrics {

repeated MetricObject metrics = 1;
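With the `result_type` oneof and `JSONBatch` removed, `arrow_batch` becomes a plain nested message field on `ExecutePlanResponse`, so a client checks for its presence directly instead of dispatching on the oneof. A minimal sketch of that difference, assuming the regenerated `base_pb2` module from this PR is importable:

```python
import pyspark.sql.connect.proto.base_pb2 as pb2

response = pb2.ExecutePlanResponse()
response.arrow_batch.row_count = 3
response.arrow_batch.data = b"<arrow ipc stream bytes>"  # placeholder payload

# Before this change: response.WhichOneof("result_type") -> "arrow_batch" or "json_batch"
# After this change: presence of the single batch field is checked directly.
if response.HasField("arrow_batch"):
    print(response.arrow_batch.row_count)  # 3
```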
@@ -18,12 +18,10 @@
package org.apache.spark.sql.connect.service

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.protobuf.ByteString
import io.grpc.stub.StreamObserver

import org.apache.spark.SparkException
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
import org.apache.spark.internal.Logging
@@ -34,7 +32,6 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils

class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResponse])
extends Logging {
@@ -57,75 +54,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
// Extract the plan from the request and convert it to a logical plan
val planner = new SparkConnectPlanner(session)
val dataframe = Dataset.ofRows(session, planner.transformRelation(request.getPlan.getRoot))
try {
processAsArrowBatches(request.getClientId, dataframe)
} catch {
case e: Exception =>
logWarning(e.getMessage)
processAsJsonBatches(request.getClientId, dataframe)
}
}

def processAsJsonBatches(clientId: String, dataframe: DataFrame): Unit = {
// Only process up to 10MB of data.
val sb = new StringBuilder
var rowCount = 0
dataframe.toJSON
.collect()
.foreach(row => {

// There are a few cases to cover here.
// 1. The aggregated buffer size is larger than the MAX_BATCH_SIZE
// -> send the current batch and reset.
// 2. The aggregated buffer size is smaller than the MAX_BATCH_SIZE
// -> append the row to the buffer.
// 3. The row in question is larger than the MAX_BATCH_SIZE
// -> fail the query.

// Case 3. - Fail
if (row.size > MAX_BATCH_SIZE) {
throw SparkException.internalError(
s"Serialized row is larger than MAX_BATCH_SIZE: ${row.size} > ${MAX_BATCH_SIZE}")
}

// Case 1 - Flush and send.
if (sb.size + row.size > MAX_BATCH_SIZE) {
val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
val batch = proto.ExecutePlanResponse.JSONBatch
.newBuilder()
.setData(ByteString.copyFromUtf8(sb.toString()))
.setRowCount(rowCount)
.build()
response.setJsonBatch(batch)
responseObserver.onNext(response.build())
sb.clear()
sb.append(row)
rowCount = 1
} else {
// Case 2 - Append.
// Make sure to put the newline delimiters only between items and not at the end.
if (rowCount > 0) {
sb.append("\n")
}
sb.append(row)
rowCount += 1
}
})

// If the last batch is not empty, send out the data to the client.
if (sb.size > 0) {
val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
val batch = proto.ExecutePlanResponse.JSONBatch
.newBuilder()
.setData(ByteString.copyFromUtf8(sb.toString()))
.setRowCount(rowCount)
.build()
response.setJsonBatch(batch)
responseObserver.onNext(response.build())
}

responseObserver.onNext(sendMetricsToResponse(clientId, dataframe))
responseObserver.onCompleted()
processAsArrowBatches(request.getClientId, dataframe)
}

def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
@@ -142,83 +71,20 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
var numSent = 0

if (numPartitions > 0) {
type Batch = (Array[Byte], Long)

val batches = rows.mapPartitionsInternal(
SparkConnectStreamHandler
.rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId))

val signal = new Object
val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
var error: Throwable = null

val processPartition = (iter: Iterator[Batch]) => iter.toArray

// This callback is executed by the DAGScheduler thread.
// After fetching a partition, it inserts the partition into the Map, and then
// wakes up the main thread.
val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
signal.synchronized {
partitions(partitionId) = partition
signal.notify()
}
()
}

val future = spark.sparkContext.submitJob(
rdd = batches,
processPartition = processPartition,
partitions = Seq.range(0, numPartitions),
resultHandler = resultHandler,
resultFunc = () => ())

// Collect errors and propagate them to the main thread.
future.onComplete { result =>
result.failed.foreach { throwable =>
signal.synchronized {
error = throwable
signal.notify()
}
}
}(ThreadUtils.sameThread)

// The main thread will wait until the 0-th partition is available,
// then send it to the client and wait for the next partition.
// Different from the implementation of [[Dataset#collectAsArrowToPython]], it sends
// the arrow batches in the main thread to avoid the DAGScheduler thread being blocked by
// tasks not related to scheduling. This is particularly important if there are
// multiple users or clients running code at the same time.
var currentPartitionId = 0
while (currentPartitionId < numPartitions) {
val partition = signal.synchronized {
var result = partitions.remove(currentPartitionId)
while (result.isEmpty && error == null) {
signal.wait()
result = partitions.remove(currentPartitionId)
}
error match {
case NonFatal(e) =>
responseObserver.onError(error)
logError("Error while processing query.", e)
return
case fatal: Throwable => throw fatal
case null => result.get
}
}

partition.foreach { case (bytes, count) =>
val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
val batch = proto.ExecutePlanResponse.ArrowBatch
.newBuilder()
.setRowCount(count)
.setData(ByteString.copyFrom(bytes))
.build()
response.setArrowBatch(batch)
responseObserver.onNext(response.build())
numSent += 1
}

currentPartitionId += 1
batches.collect().foreach { case (bytes, count) =>
val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
val batch = proto.ExecutePlanResponse.ArrowBatch
.newBuilder()
.setRowCount(count)
.setData(ByteString.copyFrom(bytes))
.build()
response.setArrowBatch(batch)
responseObserver.onNext(response.build())
numSent += 1
}
}

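After this change the handler converts each partition to Arrow batches, collects them on the driver, and emits one `ArrowBatch` response per chunk, where each `data` payload is a self-contained Arrow IPC stream. A hedged client-side sketch of stitching those payloads back into a single pandas DataFrame; the helper name and the `responses` iterable are illustrative, not part of the PR, and only `arrow_batch.data` is assumed from the protocol:

```python
from typing import Iterable, Optional
import pandas as pd
import pyarrow as pa

def collect_arrow_batches(responses: Iterable) -> Optional[pd.DataFrame]:
    """Concatenate ArrowBatch payloads from a stream of ExecutePlanResponse
    messages into one pandas DataFrame."""
    frames = []
    for response in responses:
        data = response.arrow_batch.data
        if not data:
            continue  # e.g. a trailing metrics-only response carries no batch
        with pa.ipc.open_stream(data) as reader:
            frames.append(reader.read_pandas())
    if not frames:
        return None
    return pd.concat(frames, ignore_index=True)
```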
5 changes: 0 additions & 5 deletions python/pyspark/sql/connect/client.py
@@ -16,7 +16,6 @@
#


import io
import logging
import os
import typing
@@ -446,13 +445,9 @@ def _analyze(self, plan: pb2.Plan, explain_mode: str = "extended") -> AnalyzeRes
return AnalyzeResult.fromProto(resp)

def _process_batch(self, b: pb2.ExecutePlanResponse) -> Optional[pandas.DataFrame]:
import pandas as pd

if b.arrow_batch is not None and len(b.arrow_batch.data) > 0:
with pa.ipc.open_stream(b.arrow_batch.data) as rd:
return rd.read_pandas()
elif b.json_batch is not None and len(b.json_batch.data) > 0:
return pd.read_json(io.BytesIO(b.json_batch.data), lines=True)
return None

def _execute_and_fetch(self, req: pb2.ExecutePlanRequest) -> typing.Optional[pandas.DataFrame]:
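With the JSON branch gone, `_process_batch` only needs the Arrow IPC decode path. A small self-contained round trip, assuming just pyarrow and pandas, showing the `open_stream(...).read_pandas()` call it relies on:

```python
import pyarrow as pa

# Encode a tiny table as an Arrow IPC stream, the same wire format the server
# places into ExecutePlanResponse.arrow_batch.data.
table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
payload = sink.getvalue().to_pybytes()

# Decode it the way _process_batch does.
with pa.ipc.open_stream(payload) as reader:
    df = reader.read_pandas()
print(df)
```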
41 changes: 14 additions & 27 deletions python/pyspark/sql/connect/proto/base_pb2.py
@@ -36,7 +36,7 @@


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x01\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\xad\x07\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12P\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchH\x00R\narrowBatch\x12M\n\njson_batch\x18\x03 \x01(\x0b\x32,.spark.connect.ExecutePlanResponse.JSONBatchH\x00R\tjsonBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a<\n\tJSONBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 
\x01(\tR\nmetricTypeB\r\n\x0bresult_type2\xc7\x01\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3'
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x01\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\x8f\x06\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12N\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchR\narrowBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricType2\xc7\x01\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3'
)


@@ -48,7 +48,6 @@
_EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"]
_EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"]
_EXECUTEPLANRESPONSE_ARROWBATCH = _EXECUTEPLANRESPONSE.nested_types_by_name["ArrowBatch"]
_EXECUTEPLANRESPONSE_JSONBATCH = _EXECUTEPLANRESPONSE.nested_types_by_name["JSONBatch"]
_EXECUTEPLANRESPONSE_METRICS = _EXECUTEPLANRESPONSE.nested_types_by_name["Metrics"]
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT = _EXECUTEPLANRESPONSE_METRICS.nested_types_by_name[
"MetricObject"
@@ -139,15 +138,6 @@
# @@protoc_insertion_point(class_scope:spark.connect.ExecutePlanResponse.ArrowBatch)
},
),
"JSONBatch": _reflection.GeneratedProtocolMessageType(
"JSONBatch",
(_message.Message,),
{
"DESCRIPTOR": _EXECUTEPLANRESPONSE_JSONBATCH,
"__module__": "spark.connect.base_pb2"
# @@protoc_insertion_point(class_scope:spark.connect.ExecutePlanResponse.JSONBatch)
},
),
"Metrics": _reflection.GeneratedProtocolMessageType(
"Metrics",
(_message.Message,),
@@ -191,7 +181,6 @@
)
_sym_db.RegisterMessage(ExecutePlanResponse)
_sym_db.RegisterMessage(ExecutePlanResponse.ArrowBatch)
_sym_db.RegisterMessage(ExecutePlanResponse.JSONBatch)
_sym_db.RegisterMessage(ExecutePlanResponse.Metrics)
_sym_db.RegisterMessage(ExecutePlanResponse.Metrics.MetricObject)
_sym_db.RegisterMessage(ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntry)
@@ -219,19 +208,17 @@
_EXECUTEPLANREQUEST._serialized_start = 986
_EXECUTEPLANREQUEST._serialized_end = 1193
_EXECUTEPLANRESPONSE._serialized_start = 1196
_EXECUTEPLANRESPONSE._serialized_end = 2137
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 1479
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 1540
_EXECUTEPLANRESPONSE_JSONBATCH._serialized_start = 1542
_EXECUTEPLANRESPONSE_JSONBATCH._serialized_end = 1602
_EXECUTEPLANRESPONSE_METRICS._serialized_start = 1605
_EXECUTEPLANRESPONSE_METRICS._serialized_end = 2122
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 1700
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 2032
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1909
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 2032
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 2034
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 2122
_SPARKCONNECTSERVICE._serialized_start = 2140
_SPARKCONNECTSERVICE._serialized_end = 2339
_EXECUTEPLANRESPONSE._serialized_end = 1979
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 1398
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 1459
_EXECUTEPLANRESPONSE_METRICS._serialized_start = 1462
_EXECUTEPLANRESPONSE_METRICS._serialized_end = 1979
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 1557
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 1889
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1766
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 1889
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 1891
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 1979
_SPARKCONNECTSERVICE._serialized_start = 1982
_SPARKCONNECTSERVICE._serialized_end = 2181
# @@protoc_insertion_point(module_scope)
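The regenerated descriptor drops the `JSONBatch` nested type and shifts the serialized offsets accordingly. A minimal sanity check, assuming the regenerated module is on the Python path:

```python
import pyspark.sql.connect.proto.base_pb2 as pb2

nested = pb2.ExecutePlanResponse.DESCRIPTOR.nested_types_by_name
assert "JSONBatch" not in nested                      # removed by this change
assert "ArrowBatch" in nested and "Metrics" in nested  # still present
```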