Closed
Changes from 32 commits
Commits (41)
585ad11
add default instance
cookiedough77 Sep 12, 2025
1c89918
return dummy name for define dataset
cookiedough77 Sep 12, 2025
3c7d421
return dummy for both define flow and dataset
cookiedough77 Sep 12, 2025
692579c
get full name for mv and table works for both dataset and flow
cookiedough77 Sep 15, 2025
84b6d0f
refactor to register fully qualified name
cookiedough77 Sep 15, 2025
30311cb
helper function refactor
cookiedough77 Sep 15, 2025
36811b4
refactor using convertToQualifiedIdentifier
cookiedough77 Sep 15, 2025
3907448
refactor flows
cookiedough77 Sep 15, 2025
7472e0c
rename proto
cookiedough77 Sep 16, 2025
7c425f9
revert convertToQualifiedTableIdentifier
cookiedough77 Sep 16, 2025
11aca82
add val isImplicitFlowForTempView
cookiedough77 Sep 16, 2025
c032f7a
use gridTest
cookiedough77 Sep 16, 2025
540baca
add custom default catalog test
cookiedough77 Sep 16, 2025
51921b9
define dataset and define flow returns qualifiers, return quoted string
cookiedough77 Sep 16, 2025
5e4c7a4
refactor to avoid using var
cookiedough77 Sep 16, 2025
2bd1ffe
nit: add comments, rename
cookiedough77 Sep 16, 2025
983df42
refactor test case using class
cookiedough77 Sep 17, 2025
2981e4d
git: refactor test
cookiedough77 Sep 17, 2025
6bd24a0
nit
cookiedough77 Sep 17, 2025
215e317
fmt
cookiedough77 Sep 17, 2025
ffcc0df
fix proto
cookiedough77 Sep 17, 2025
d39132d
table and view works, need to resolve flow name
cookiedough77 Sep 18, 2025
533743e
fix views as well
cookiedough77 Sep 18, 2025
2b3ae69
refactor flows
cookiedough77 Sep 18, 2025
a920e06
update proto and fmt
cookiedough77 Sep 18, 2025
882ed02
Merge branch 'master' into jessie.luo_data/spark-add-response
cookiedough77 Sep 18, 2025
017e1ea
nit
cookiedough77 Sep 19, 2025
541584e
remove unnecessary parseAndQualifyFlowIdentifier
cookiedough77 Sep 19, 2025
f042af1
update current catalog, databases
cookiedough77 Sep 19, 2025
a4b309c
rename resolved_data_name as resolved_dataset_name
cookiedough77 Sep 20, 2025
f196df9
refactored DefineDatasetResult proto
cookiedough77 Sep 22, 2025
26cadf8
refactor DefineFlow proto
cookiedough77 Sep 22, 2025
7db13d6
Update sql/connect/common/src/main/protobuf/spark/connect/pipelines.p…
cookiedough77 Sep 23, 2025
e4a7f0f
Update sql/connect/common/src/main/protobuf/spark/connect/pipelines.p…
cookiedough77 Sep 23, 2025
39042fd
update namespace as repeated string
cookiedough77 Sep 24, 2025
256d7d9
Merge remote-tracking branch 'origin/jessie.luo_data/spark-add-respon…
cookiedough77 Sep 24, 2025
13beb23
updated comments in proto
cookiedough77 Sep 24, 2025
762df1f
Merge branch 'master' into jessie.luo_data/spark-add-response
cookiedough77 Sep 24, 2025
23ebeee
rename CatalogIdentifier proto fields and put it in common.proto
cookiedough77 Sep 24, 2025
57f96c4
Merge branch 'master' into jessie.luo_data/spark-add-response
cookiedough77 Sep 25, 2025
9f79fc9
rename to ResolvedIdentifier
cookiedough77 Sep 26, 2025
58 changes: 31 additions & 27 deletions python/pyspark/sql/connect/proto/pipelines_pb2.py

Large diffs are not rendered by default.

190 changes: 155 additions & 35 deletions python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -110,40 +110,6 @@ class PipelineCommand(google.protobuf.message.Message):
self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]
) -> None: ...

-class Response(google.protobuf.message.Message):
-    DESCRIPTOR: google.protobuf.descriptor.Descriptor
-
-    DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
-    dataflow_graph_id: builtins.str
-    """The ID of the created graph."""
-    def __init__(
-        self,
-        *,
-        dataflow_graph_id: builtins.str | None = ...,
-    ) -> None: ...
-    def HasField(
-        self,
-        field_name: typing_extensions.Literal[
-            "_dataflow_graph_id",
-            b"_dataflow_graph_id",
-            "dataflow_graph_id",
-            b"dataflow_graph_id",
-        ],
-    ) -> builtins.bool: ...
-    def ClearField(
-        self,
-        field_name: typing_extensions.Literal[
-            "_dataflow_graph_id",
-            b"_dataflow_graph_id",
-            "dataflow_graph_id",
-            b"dataflow_graph_id",
-        ],
-    ) -> None: ...
-    def WhichOneof(
-        self,
-        oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"],
-    ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ...
-
DEFAULT_CATALOG_FIELD_NUMBER: builtins.int
DEFAULT_DATABASE_FIELD_NUMBER: builtins.int
SQL_CONF_FIELD_NUMBER: builtins.int
@@ -753,6 +719,69 @@ class PipelineCommandResult(google.protobuf.message.Message):

DESCRIPTOR: google.protobuf.descriptor.Descriptor

class CatalogIdentifier(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

CATALOG_FIELD_NUMBER: builtins.int
DATABASE_FIELD_NUMBER: builtins.int
NAME_FIELD_NUMBER: builtins.int
catalog: builtins.str
database: builtins.str
name: builtins.str
def __init__(
self,
*,
catalog: builtins.str | None = ...,
database: builtins.str | None = ...,
name: builtins.str | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"_catalog",
b"_catalog",
"_database",
b"_database",
"_name",
b"_name",
"catalog",
b"catalog",
"database",
b"database",
"name",
b"name",
],
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"_catalog",
b"_catalog",
"_database",
b"_database",
"_name",
b"_name",
"catalog",
b"catalog",
"database",
b"database",
"name",
b"name",
],
) -> None: ...
@typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_catalog", b"_catalog"]
) -> typing_extensions.Literal["catalog"] | None: ...
@typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_database", b"_database"]
) -> typing_extensions.Literal["database"] | None: ...
@typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_name", b"_name"]
) -> typing_extensions.Literal["name"] | None: ...

class CreateDataflowGraphResult(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

@@ -787,22 +816,104 @@ class PipelineCommandResult(google.protobuf.message.Message):
oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"],
) -> typing_extensions.Literal["dataflow_graph_id"] | None: ...

class DefineDatasetResult(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

RESOLVED_IDENTIFIER_FIELD_NUMBER: builtins.int
@property
def resolved_identifier(self) -> global___PipelineCommandResult.CatalogIdentifier:
"""Resolved name of the dataset"""
def __init__(
self,
*,
resolved_identifier: global___PipelineCommandResult.CatalogIdentifier | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"_resolved_identifier",
b"_resolved_identifier",
"resolved_identifier",
b"resolved_identifier",
],
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"_resolved_identifier",
b"_resolved_identifier",
"resolved_identifier",
b"resolved_identifier",
],
) -> None: ...
def WhichOneof(
self,
oneof_group: typing_extensions.Literal["_resolved_identifier", b"_resolved_identifier"],
) -> typing_extensions.Literal["resolved_identifier"] | None: ...

class DefineFlowResult(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

RESOLVED_IDENTIFIER_FIELD_NUMBER: builtins.int
@property
def resolved_identifier(self) -> global___PipelineCommandResult.CatalogIdentifier:
"""Resolved name of the flow"""
def __init__(
self,
*,
resolved_identifier: global___PipelineCommandResult.CatalogIdentifier | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"_resolved_identifier",
b"_resolved_identifier",
"resolved_identifier",
b"resolved_identifier",
],
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"_resolved_identifier",
b"_resolved_identifier",
"resolved_identifier",
b"resolved_identifier",
],
) -> None: ...
def WhichOneof(
self,
oneof_group: typing_extensions.Literal["_resolved_identifier", b"_resolved_identifier"],
) -> typing_extensions.Literal["resolved_identifier"] | None: ...

CREATE_DATAFLOW_GRAPH_RESULT_FIELD_NUMBER: builtins.int
DEFINE_DATASET_RESULT_FIELD_NUMBER: builtins.int
DEFINE_FLOW_RESULT_FIELD_NUMBER: builtins.int
@property
def create_dataflow_graph_result(
self,
) -> global___PipelineCommandResult.CreateDataflowGraphResult: ...
@property
def define_dataset_result(self) -> global___PipelineCommandResult.DefineDatasetResult: ...
@property
def define_flow_result(self) -> global___PipelineCommandResult.DefineFlowResult: ...
def __init__(
self,
*,
create_dataflow_graph_result: global___PipelineCommandResult.CreateDataflowGraphResult
| None = ...,
define_dataset_result: global___PipelineCommandResult.DefineDatasetResult | None = ...,
define_flow_result: global___PipelineCommandResult.DefineFlowResult | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"create_dataflow_graph_result",
b"create_dataflow_graph_result",
"define_dataset_result",
b"define_dataset_result",
"define_flow_result",
b"define_flow_result",
"result_type",
b"result_type",
],
@@ -812,13 +923,22 @@ class PipelineCommandResult(google.protobuf.message.Message):
field_name: typing_extensions.Literal[
"create_dataflow_graph_result",
b"create_dataflow_graph_result",
"define_dataset_result",
b"define_dataset_result",
"define_flow_result",
b"define_flow_result",
"result_type",
b"result_type",
],
) -> None: ...
def WhichOneof(
self, oneof_group: typing_extensions.Literal["result_type", b"result_type"]
) -> typing_extensions.Literal["create_dataflow_graph_result"] | None: ...
) -> (
typing_extensions.Literal[
"create_dataflow_graph_result", "define_dataset_result", "define_flow_result"
]
| None
): ...

global___PipelineCommandResult = PipelineCommandResult

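For orientation, these stubs follow the standard protobuf-python presence API: each `optional` field is backed by a synthetic one-field oneof (`_catalog`, `_database`, `_name`), which is why the `HasField`/`WhichOneof` overloads above exist. A minimal usage sketch against this revision of the generated module (field values are illustrative, not taken from the PR):

from pyspark.sql.connect.proto import pipelines_pb2

# Keyword arguments mark the corresponding optional fields as present.
ident = pipelines_pb2.PipelineCommandResult.CatalogIdentifier(
    catalog="spark_catalog",  # illustrative values only
    database="default",
    name="my_table",
)

assert ident.HasField("catalog")
# Presence of an optional field is tracked via its synthetic oneof.
assert ident.WhichOneof("_catalog") == "catalog"

# Unset optional fields report absent, not empty string.
empty = pipelines_pb2.PipelineCommandResult.CatalogIdentifier()
assert not empty.HasField("name")
assert empty.WhichOneof("_name") is None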
@@ -36,17 +36,17 @@ sealed trait CatalystIdentifier {
*/
private def quoteIdentifier(name: String): String = name.replace("`", "``")

+  def resolvedId: String = quoteIdentifier(identifier)
+  def resolvedDb: Option[String] = database.map(quoteIdentifier)
+  def resolvedCatalog: Option[String] = catalog.map(quoteIdentifier)
+
   def quotedString: String = {
-    val replacedId = quoteIdentifier(identifier)
-    val replacedDb = database.map(quoteIdentifier)
-    val replacedCatalog = catalog.map(quoteIdentifier)
-
-    if (replacedCatalog.isDefined && replacedDb.isDefined) {
-      s"`${replacedCatalog.get}`.`${replacedDb.get}`.`$replacedId`"
-    } else if (replacedDb.isDefined) {
-      s"`${replacedDb.get}`.`$replacedId`"
+    if (resolvedCatalog.isDefined && resolvedDb.isDefined) {
+      s"`${resolvedCatalog.get}`.`${resolvedDb.get}`.`$resolvedId`"
+    } else if (resolvedDb.isDefined) {
+      s"`${resolvedDb.get}`.`$resolvedId`"
     } else {
-      s"`$replacedId`"
+      s"`$resolvedId`"
     }
   }

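The refactor hoists the quoting into reusable `resolvedId`/`resolvedDb`/`resolvedCatalog` members without changing the output format: backticks are escaped by doubling, and the catalog is emitted only when a database is also present. A Python re-expression of that rule, purely for illustration (the authoritative logic is the Scala `quotedString` above):

def quote_identifier(name: str) -> str:
    # Backticks inside an identifier are escaped by doubling them.
    return name.replace("`", "``")

def quoted_string(identifier: str, database: str | None = None,
                  catalog: str | None = None) -> str:
    if catalog is not None and database is not None:
        parts = [catalog, database, identifier]
    elif database is not None:
        parts = [database, identifier]
    else:
        # Mirrors the Scala branches: a catalog without a database is dropped.
        parts = [identifier]
    return ".".join("`" + quote_identifier(p) + "`" for p in parts)

print(quoted_string("my`table", database="db", catalog="cat"))
# `cat`.`db`.`my``table`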
20 changes: 15 additions & 5 deletions sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -48,11 +48,6 @@ message PipelineCommand {

// SQL configurations for all flows in this graph.
     map<string, string> sql_conf = 5;
-
-    message Response {
-      // The ID of the created graph.
-      optional string dataflow_graph_id = 1;
-    }
   }

// Drops the graph and stops any running attached flows.
@@ -146,11 +141,26 @@ message PipelineCommand {
message PipelineCommandResult {
oneof result_type {
CreateDataflowGraphResult create_dataflow_graph_result = 1;
DefineDatasetResult define_dataset_result = 2;
DefineFlowResult define_flow_result = 3;
}
message CatalogIdentifier {
[Review thread on message CatalogIdentifier]

Contributor: @gengliangwang @cloud-fan – thoughts on whether CatalogIdentifier is the right name and whether this is the right location for this message? Since this is a type that might end up useful elsewhere as well, I wonder if it would make more sense as a top-level message inside base.proto or catalog.proto.

Member: Should it just be Identifier?

Contributor: Identifier is pretty general. The Identifier class in Scala is scoped within the catalog package. If we had a similar package within the proto namespace, then an Identifier proto could make sense?
optional string catalog = 1;
optional string database = 2;
optional string name = 3;
}
message CreateDataflowGraphResult {
// The ID of the created graph.
optional string dataflow_graph_id = 1;
}
message DefineDatasetResult {
// Resolved name of the dataset
optional CatalogIdentifier resolved_identifier = 1;
}
message DefineFlowResult {
// Resolved name of the flow
optional CatalogIdentifier resolved_identifier = 1;
}
}

// The type of dataset.
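With the result_type oneof enlarged, a client dispatches on whichever result the server populated. A hedged sketch of what that handling could look like in Python against these generated classes (the helper below is illustrative, not part of the PR):

from pyspark.sql.connect.proto import pipelines_pb2

def describe(result: pipelines_pb2.PipelineCommandResult) -> str:
    # WhichOneof returns the name of the populated result_type case, or None.
    case = result.WhichOneof("result_type")
    if case == "create_dataflow_graph_result":
        return "graph " + result.create_dataflow_graph_result.dataflow_graph_id
    if case == "define_dataset_result":
        ident = result.define_dataset_result.resolved_identifier
        return f"dataset `{ident.catalog}`.`{ident.database}`.`{ident.name}`"
    if case == "define_flow_result":
        ident = result.define_flow_result.resolved_identifier
        return f"flow `{ident.catalog}`.`{ident.database}`.`{ident.name}`"
    return "empty result"

res = pipelines_pb2.PipelineCommandResult()
# Writing into a nested message selects that case of the oneof.
res.define_dataset_result.resolved_identifier.name = "my_table"
print(describe(res))  # dataset ``.``.`my_table` (unset strings read as "")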