13 changes: 13 additions & 0 deletions connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -112,6 +112,19 @@ message AnalyzePlanResponse {

// The extended explain string as produced by Spark.
string explain_string = 3;

// Get the tree string of the schema.
string tree_string = 4;

// Whether the 'collect' and 'take' methods can be run locally.
bool is_local = 5;

// Whether this plan contains one or more sources that continuously
// return data as it arrives.
bool is_streaming = 6;

// A best-effort snapshot of the files that compose this Dataset
repeated string input_files = 7;
}

// A request to be executed by the service.
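To make the new response fields concrete, here is a minimal sketch (not part of the patch) of what a populated `AnalyzePlanResponse` looks like from Python, assuming the regenerated `pyspark.sql.connect.proto.base_pb2` module is importable; all field values are placeholders.

```python
# Hedged sketch: builds an AnalyzePlanResponse carrying the fields added in
# this patch and reads them back. Values are illustrative only.
import pyspark.sql.connect.proto.base_pb2 as pb2

response = pb2.AnalyzePlanResponse(
    client_id="example-client",                          # placeholder id
    tree_string="root\n |-- id: long (nullable = true)\n",
    is_local=False,
    is_streaming=False,
    input_files=["file:/tmp/data/part-00000.parquet"],   # placeholder path
)

print(response.tree_string)        # schema rendered as a tree
print(response.is_local)           # False
print(response.is_streaming)       # False
print(list(response.input_files))  # repeated string field -> list of paths
```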
@@ -19,6 +19,8 @@ package org.apache.spark.sql.connect.service

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder
import io.grpc.{Server, Status}
@@ -127,10 +129,13 @@ class SparkConnectService(debug: Boolean)
val ds = Dataset.ofRows(session, logicalPlan)
val explainString = ds.queryExecution.explainString(explainMode)

val response = proto.AnalyzePlanResponse
.newBuilder()
.setExplainString(explainString)
val response = proto.AnalyzePlanResponse.newBuilder()
response.setSchema(DataTypeProtoConverter.toConnectProtoType(ds.schema))
response.setExplainString(explainString)
response.setTreeString(ds.schema.treeString)
response.setIsLocal(ds.isLocal)
response.setIsStreaming(ds.isStreaming)
response.addAllInputFiles(ds.inputFiles.toSeq.asJava)
}
}

@@ -65,6 +65,15 @@ class SparkConnectServiceSuite extends SharedSparkSession {
assert(
schema.getFields(1).getName == "col2"
&& schema.getFields(1).getDataType.getKindCase == proto.DataType.KindCase.STRING)

assert(!response.getIsLocal)
assert(!response.getIsStreaming)

assert(response.getTreeString.contains("root"))
assert(response.getTreeString.contains("|-- col1: integer (nullable = true)"))
assert(response.getTreeString.contains("|-- col2: string (nullable = true)"))

assert(response.getInputFilesCount === 0)
}
}

23 changes: 21 additions & 2 deletions python/pyspark/sql/connect/client.py
@@ -266,13 +266,32 @@ def metrics(self) -> List[MetricValue]:


class AnalyzeResult:
def __init__(self, schema: pb2.DataType, explain: str):
def __init__(
self,
schema: pb2.DataType,
explain: str,
tree_string: str,
is_local: bool,
is_streaming: bool,
input_files: List[str],
):
self.schema = schema
self.explain_string = explain
self.tree_string = tree_string
self.is_local = is_local
self.is_streaming = is_streaming
self.input_files = input_files

@classmethod
def fromProto(cls, pb: Any) -> "AnalyzeResult":
return AnalyzeResult(pb.schema, pb.explain_string)
return AnalyzeResult(
pb.schema,
pb.explain_string,
pb.tree_string,
pb.is_local,
pb.is_streaming,
pb.input_files,
)
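For illustration only, a hedged sketch of how a raw response is converted into an `AnalyzeResult`; the response here is built by hand with placeholder values rather than received over the AnalyzePlan RPC.

```python
import pyspark.sql.connect.proto.base_pb2 as pb2
from pyspark.sql.connect.client import AnalyzeResult

# Build a response by hand (values are placeholders) and convert it the same
# way the client does after an AnalyzePlan call.
pb_response = pb2.AnalyzePlanResponse(
    explain_string="== Physical Plan ==\n...",
    tree_string="root\n |-- id: long (nullable = true)\n",
    is_local=True,
    is_streaming=False,
    input_files=[],
)

result = AnalyzeResult.fromProto(pb_response)
assert result.is_local and not result.is_streaming
print(result.tree_string)
```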


class RemoteSparkSession(object):
78 changes: 77 additions & 1 deletion python/pyspark/sql/connect/dataframe.py
@@ -104,7 +104,6 @@ def __init__(
"""Creates a new data frame"""
self._schema = schema
self._plan: Optional[plan.LogicalPlan] = None
self._cache: Dict[str, Any] = {}
self._session: "RemoteSparkSession" = session

def __repr__(self) -> str:
@@ -822,6 +821,83 @@ def schema(self) -> StructType:
else:
return self._schema

@property
def isLocal(self) -> bool:
"""Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
(without any Spark executors).

.. versionadded:: 3.4.0

Returns
-------
bool
"""
if self._plan is None:
raise Exception("Cannot analyze on empty plan.")
query = self._plan.to_proto(self._session)
return self._session._analyze(query).is_local
Review comment thread on `isLocal`:

Contributor: Are we going to cache the analyze result later?

Contributor (author): #38742 (comment)
I think we will do the caching in the near future.

@amaliujia (Contributor, Nov 25, 2022): We literally can cache everything for each DataFrame since it is immutable, but I guess we need a design/discussion to clarify the details of how and when.

Contributor: Another interesting question is whether we want to do caching on the server side.

@property
def isStreaming(self) -> bool:
"""Returns ``True`` if this :class:`DataFrame` contains one or more sources that
continuously return data as it arrives. A :class:`DataFrame` that reads data from a
streaming source must be executed as a :class:`StreamingQuery` using the :func:`start`
method in :class:`DataStreamWriter`. Methods that return a single answer (e.g.,
:func:`count` or :func:`collect`) will throw an :class:`AnalysisException` when there
is a streaming source present.

.. versionadded:: 3.4.0

Notes
-----
This API is evolving.

Returns
-------
bool
Whether it's a streaming DataFrame or not.
"""
if self._plan is None:
raise Exception("Cannot analyze on empty plan.")
query = self._plan.to_proto(self._session)
return self._session._analyze(query).is_streaming

def _tree_string(self) -> str:
if self._plan is None:
raise Exception("Cannot analyze on empty plan.")
query = self._plan.to_proto(self._session)
return self._session._analyze(query).tree_string

def printSchema(self) -> None:
"""Prints out the schema in the tree format.

.. versionadded:: 3.4.0

Returns
-------
None
"""
print(self._tree_string())

def inputFiles(self) -> List[str]:
"""
Returns a best-effort snapshot of the files that compose this :class:`DataFrame`.
This method simply asks each constituent BaseRelation for its respective files and
takes the union of all results. Depending on the source relations, this may not find
all input files. Duplicates are removed.

.. versionadded:: 3.4.0

Returns
-------
list
List of file paths.
"""
if self._plan is None:
raise Exception("Cannot analyze on empty plan.")
query = self._plan.to_proto(self._session)
return self._session._analyze(query).input_files

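A hedged usage sketch of the new client-side metadata APIs, mirroring the queries used in the tests below; `spark` is assumed to be an already-connected `RemoteSparkSession` (its construction is environment-specific and omitted here).

```python
# `spark` is assumed to be a connected RemoteSparkSession against a running
# Spark Connect server; the calls below each trigger an AnalyzePlan request.
df = spark.sql("SHOW DATABASES")

df.printSchema()          # prints the tree_string returned by AnalyzePlan
print(df.isLocal)         # True: SHOW DATABASES can run without executors
print(df.isStreaming)     # False: no streaming source in the plan
print(df.inputFiles())    # expected []: no file-based relations behind this command
```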
def transform(self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any) -> "DataFrame":
"""Returns a new :class:`DataFrame`. Concise syntax for chaining custom transformations.

36 changes: 18 additions & 18 deletions python/pyspark/sql/connect/proto/base_pb2.py
@@ -36,7 +36,7 @@


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x01\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\x8f\x06\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12N\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchR\narrowBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricType2\xc7\x01\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3'
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x02\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString\x12\x1f\n\x0btree_string\x18\x04 \x01(\tR\ntreeString\x12\x19\n\x08is_local\x18\x05 \x01(\x08R\x07isLocal\x12!\n\x0cis_streaming\x18\x06 \x01(\x08R\x0bisStreaming\x12\x1f\n\x0binput_files\x18\x07 \x03(\tR\ninputFiles"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\x8f\x06\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12N\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchR\narrowBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 
\x01(\tR\nmetricType2\xc7\x01\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3'
)


@@ -204,21 +204,21 @@
_ANALYZEPLANREQUEST._serialized_start = 585
_ANALYZEPLANREQUEST._serialized_end = 842
_ANALYZEPLANRESPONSE._serialized_start = 845
_ANALYZEPLANRESPONSE._serialized_end = 983
_EXECUTEPLANREQUEST._serialized_start = 986
_EXECUTEPLANREQUEST._serialized_end = 1193
_EXECUTEPLANRESPONSE._serialized_start = 1196
_EXECUTEPLANRESPONSE._serialized_end = 1979
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 1398
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 1459
_EXECUTEPLANRESPONSE_METRICS._serialized_start = 1462
_EXECUTEPLANRESPONSE_METRICS._serialized_end = 1979
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 1557
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 1889
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1766
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 1889
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 1891
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 1979
_SPARKCONNECTSERVICE._serialized_start = 1982
_SPARKCONNECTSERVICE._serialized_end = 2181
_ANALYZEPLANRESPONSE._serialized_end = 1111
_EXECUTEPLANREQUEST._serialized_start = 1114
_EXECUTEPLANREQUEST._serialized_end = 1321
_EXECUTEPLANRESPONSE._serialized_start = 1324
_EXECUTEPLANRESPONSE._serialized_end = 2107
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 1526
_EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 1587
_EXECUTEPLANRESPONSE_METRICS._serialized_start = 1590
_EXECUTEPLANRESPONSE_METRICS._serialized_end = 2107
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 1685
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 2017
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 1894
_EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 2017
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 2019
_EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 2107
_SPARKCONNECTSERVICE._serialized_start = 2110
_SPARKCONNECTSERVICE._serialized_end = 2309
# @@protoc_insertion_point(module_scope)
36 changes: 35 additions & 1 deletion python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -283,25 +283,59 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
CLIENT_ID_FIELD_NUMBER: builtins.int
SCHEMA_FIELD_NUMBER: builtins.int
EXPLAIN_STRING_FIELD_NUMBER: builtins.int
TREE_STRING_FIELD_NUMBER: builtins.int
IS_LOCAL_FIELD_NUMBER: builtins.int
IS_STREAMING_FIELD_NUMBER: builtins.int
INPUT_FILES_FIELD_NUMBER: builtins.int
client_id: builtins.str
@property
def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType: ...
explain_string: builtins.str
"""The extended explain string as produced by Spark."""
tree_string: builtins.str
"""Get the tree string of the schema."""
is_local: builtins.bool
"""Whether the 'collect' and 'take' methods can be run locally."""
is_streaming: builtins.bool
"""Whether this plan contains one or more sources that continuously
return data as it arrives.
"""
@property
def input_files(
self,
) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
"""A best-effort snapshot of the files that compose this Dataset"""
def __init__(
self,
*,
client_id: builtins.str = ...,
schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
explain_string: builtins.str = ...,
tree_string: builtins.str = ...,
is_local: builtins.bool = ...,
is_streaming: builtins.bool = ...,
input_files: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def HasField(
self, field_name: typing_extensions.Literal["schema", b"schema"]
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"client_id", b"client_id", "explain_string", b"explain_string", "schema", b"schema"
"client_id",
b"client_id",
"explain_string",
b"explain_string",
"input_files",
b"input_files",
"is_local",
b"is_local",
"is_streaming",
b"is_streaming",
"schema",
b"schema",
"tree_string",
b"tree_string",
],
) -> None: ...

40 changes: 40 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -211,6 +211,46 @@ def test_schema(self):
self.connect.sql(query).schema.__repr__(),
)

def test_print_schema(self):
# SPARK-41216: Test print schema
tree_str = self.connect.sql("SELECT 1 AS X, 2 AS Y")._tree_string()
# root
# |-- X: integer (nullable = false)
# |-- Y: integer (nullable = false)
expected = "root\n |-- X: integer (nullable = false)\n |-- Y: integer (nullable = false)\n"
self.assertEqual(tree_str, expected)

def test_is_local(self):
# SPARK-41216: Test is local
self.assertTrue(self.connect.sql("SHOW DATABASES").isLocal)
self.assertFalse(self.connect.read.table(self.tbl_name).isLocal)

def test_is_streaming(self):
# SPARK-41216: Test is streaming
self.assertFalse(self.connect.read.table(self.tbl_name).isStreaming)
self.assertFalse(self.connect.sql("SELECT 1 AS X LIMIT 0").isStreaming)

def test_input_files(self):
# SPARK-41216: Test input files
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
try:
self.df_text.write.text(tmpPath)

input_files_list1 = (
self.spark.read.format("text").schema("id STRING").load(path=tmpPath).inputFiles()
)
input_files_list2 = (
self.connect.read.format("text").schema("id STRING").load(path=tmpPath).inputFiles()
)

self.assertTrue(len(input_files_list1) > 0)
self.assertEqual(len(input_files_list1), len(input_files_list2))
for file_path in input_files_list2:
self.assertTrue(file_path in input_files_list1)
finally:
shutil.rmtree(tmpPath)

def test_simple_binary_expressions(self):
"""Test complex expression"""
df = self.connect.read.table(self.tbl_name)