Merged
48 commits
55addb3 [SPARK-41213][CONNECT][PYTHON] Implement `DataFrame.__repr__` and `Da… (zhengruifeng, Nov 22, 2022)
51b0440 [SPARK-41209][PYTHON] Improve PySpark type inference in _merge_type m… (sadikovi, Nov 22, 2022)
8496059 [SPARK-41193][SQL][TESTS] Ignore `collect data with single partition … (LuciferYang, Nov 22, 2022)
b38a115 [SPARK-41169][CONNECT][PYTHON] Implement `DataFrame.drop` (zhengruifeng, Nov 22, 2022)
d453598 [SPARK-40809][CONNECT][FOLLOW-UP] Do not use Buffer to make Scala 2.1… (HyukjinKwon, Nov 22, 2022)
40b7d29 [SPARK-41217][SQL] Add the error class `FAILED_FUNCTION_CALL` (MaxGekk, Nov 22, 2022)
a80899f [SPARK-41206][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1233` t… (MaxGekk, Nov 22, 2022)
e16dd7c [SPARK-41212][CONNECT][PYTHON] Implement `DataFrame.isEmpty` (zhengruifeng, Nov 22, 2022)
3bff4f6 [SPARK-41135][SQL] Rename `UNSUPPORTED_EMPTY_LOCATION` to `INVALID_EM… (itholic, Nov 22, 2022)
2513368 [SPARK-35531][SQL] Update hive table stats without unnecessary convert (wankunde, Nov 22, 2022)
04f026c [SPARK-41054][UI][CORE] Support RocksDB as KVStore in live UI (gengliangwang, Nov 22, 2022)
2c8da56 [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Py… (amaliujia, Nov 23, 2022)
d275a83 [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na (zhengruifeng, Nov 23, 2022)
1781617 [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND (itholic, Nov 23, 2022)
e42d383 [SPARK-41206][SQL][FOLLOWUP] Make result of `checkColumnNameDuplicati… (LuciferYang, Nov 23, 2022)
79f8c79 [SPARK-40834][SQL][FOLLOWUP] Take care of legacy query end events (cloud-fan, Nov 23, 2022)
49e102b [SPARK-40633][BUILD] Upgrade janino to 3.1.9 (LuciferYang, Nov 23, 2022)
2dfb81f [SPARK-41223][BUILD] Upgrade slf4j to 2.0.4 (LuciferYang, Nov 23, 2022)
2913158 [SPARK-41221][SQL] Add the error class `INVALID_FORMAT` (MaxGekk, Nov 23, 2022)
c3f8c97 [SPARK-41174][CORE][SQL] Propagate an error class to users for invali… (LuciferYang, Nov 23, 2022)
b77ced5 [SPARK-41131][SQL] Improve error message for `UNRESOLVED_MAP_KEY.WITH… (itholic, Nov 23, 2022)
bf687ad [SPARK-41239][BUILD] Upgrade jackson to 2.14.1 (LuciferYang, Nov 23, 2022)
c0746df Update pom.xml (xinrong-meng, Nov 23, 2022)
03cc935 Revert "Update pom.xml" (xinrong-meng, Nov 23, 2022)
3627b3e [SPARK-41243][CONNECT][PYTHON][DOCS] Update the protobuf version in R… (xinrong-meng, Nov 24, 2022)
aa564b9 [SPARK-41225][CONNECT][PYTHON] Disable unsupported functions (grundprinzip, Nov 24, 2022)
957b0bc [SPARK-41183][SQL][FOLLOWUP] Change the name from injectPlanNormaliza… (cloud-fan, Nov 24, 2022)
381dd79 [SPARK-41222][CONNECT][PYTHON] Unify the typing definitions (zhengruifeng, Nov 24, 2022)
68bdeb8 [SPARK-41215][BUILD][PROTOBUF] Support user configurable protoc execu… (Nov 24, 2022)
2b2ffcd [SPARK-41114][CONNECT] Support local data for LocalRelation (dengziming, Nov 24, 2022)
57f3f0f [MINOR][SQL] Fix error message for `UNEXPECTED_INPUT_TYPE` (itholic, Nov 24, 2022)
9f0aa27 [SPARK-41176][SQL] Assign a name to the error class _LEGACY_ERROR_TEM… (panbingkun, Nov 24, 2022)
074444b [SPARK-41179][SQL] Assign a name to the error class _LEGACY_ERROR_TEM… (panbingkun, Nov 24, 2022)
1f90e41 [SPARK-41182][SQL] Assign a name to the error class _LEGACY_ERROR_TEM… (panbingkun, Nov 24, 2022)
02e7fad [SPARK-41250][CONNECT][PYTHON] DataFrame.toPandas should not return … (amaliujia, Nov 24, 2022)
3cd7f11 [SPARK-41249][SS][TEST] Add acceptance test for self-union on streami… (HeartSaVioR, Nov 24, 2022)
ac029d6 [SPARK-41224][SPARK-41165][SPARK-41184] Optimized Arrow-based collect… (HyukjinKwon, Nov 24, 2022)
483e3c9 [SPARK-41097][CORE][SQL][SS][PROTOBUF] Remove redundant collection co… (LuciferYang, Nov 24, 2022)
033dbe6 [SPARK-41247][BUILD] Unify the Protobuf versions in Spark connect and… (gengliangwang, Nov 24, 2022)
71b5c5b [SPARK-41251][PS][INFRA] Upgrade pandas from 1.5.1 to 1.5.2 (panbingkun, Nov 25, 2022)
4006d19 [SPARK-41238][CONNECT][PYTHON] Support more built-in datatypes (zhengruifeng, Nov 25, 2022)
a205e97 [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression… (amaliujia, Nov 25, 2022)
575b8f0 [SPARK-41257][INFRA] Upgrade actions/labeler to v4 (Yikun, Nov 25, 2022)
b84ddd5 [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStrea… (zhengruifeng, Nov 25, 2022)
0ae82d9 [SPARK-41181][SQL] Migrate the map options errors onto error classes (panbingkun, Nov 25, 2022)
22268ae [SPARK-41252][BUILD] Upgrade arrow from 10.0.0 to 10.0.1 (panbingkun, Nov 25, 2022)
eb91d8b [SPARK-41245][BUILD] Upgrade `postgresql` to 42.5.1 (bjornjorgensen, Nov 25, 2022)
da71626 [SPARK-41240][CONNECT][BUILD][INFRA][DOCS] Upgrade `Protobuf` to 3.19.5 (bjornjorgensen, Nov 25, 2022)
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -243,7 +243,7 @@ jobs:
- name: Install Python packages (Python 3.8)
if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
run: |
python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio==1.48.1' 'protobuf==3.19.4'
python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio==1.48.1' 'protobuf==3.19.5'
python3.8 -m pip list
# Run the tests.
- name: Run tests
@@ -589,7 +589,7 @@ jobs:
# See also https://issues.apache.org/jira/browse/SPARK-38279.
python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme ipython nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0'
python3.9 -m pip install ipython_genutils # See SPARK-38517
python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' 'grpcio==1.48.1' 'protobuf==3.19.4' 'mypy-protobuf==3.3.0'
python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' 'grpcio==1.48.1' 'protobuf==3.19.5' 'mypy-protobuf==3.3.0'
python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
apt-get update -y
apt-get install -y ruby ruby-dev
2 changes: 1 addition & 1 deletion .github/workflows/labeler.yml
@@ -47,7 +47,7 @@ jobs:
#
# However, these are not in a published release and the current `main` branch
# has some issues upon testing.
- uses: actions/labeler@5f867a63be70efff62b767459b009290364495eb # [email protected]
- uses: actions/labeler@v4
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
sync-labels: true
14 changes: 9 additions & 5 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3990,12 +3990,16 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
expect_error(read.df(source = "json"),
paste("Error in load : analysis error - Unable to infer schema for JSON.",
"It must be specified manually"))
expect_error(read.df("arbitrary_path"), "Error in load : analysis error - Path does not exist")
expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist")
expect_error(read.df("arbitrary_path"),
"Error in load : analysis error - \\[PATH_NOT_FOUND\\].*")
expect_error(read.json("arbitrary_path"),
"Error in json : analysis error - \\[PATH_NOT_FOUND\\].*")
expect_error(read.text("arbitrary_path"),
"Error in text : analysis error - \\[PATH_NOT_FOUND\\].*")
expect_error(read.orc("arbitrary_path"),
"Error in orc : analysis error - \\[PATH_NOT_FOUND\\].*")
expect_error(read.parquet("arbitrary_path"),
"Error in parquet : analysis error - Path does not exist")
"Error in parquet : analysis error - \\[PATH_NOT_FOUND\\].*")

# Arguments checking in R side.
expect_error(read.df(path = c(3)),
2 changes: 1 addition & 1 deletion connector/connect/README.md
@@ -90,7 +90,7 @@ To use the release version of Spark Connect:

### Generate proto generated files for the Python client
1. Install `buf version 1.8.0`: https://docs.buf.build/installation
2. Run `pip install grpcio==1.48.1 protobuf==4.21.6 mypy-protobuf==3.3.0`
2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0`
3. Run `./connector/connect/dev/generate_protos.sh`
4. Optional Check `./dev/check-codegen-python.py`

1 change: 0 additions & 1 deletion connector/connect/pom.xml
@@ -32,7 +32,6 @@
<url>https://spark.apache.org/</url>
<properties>
<sbt.project.name>connect</sbt.project.name>
<protobuf.version>3.21.1</protobuf.version>
<guava.version>31.0.1-jre</guava.version>
<guava.failureaccess.version>1.0.1</guava.failureaccess.version>
<io.grpc.version>1.47.0</io.grpc.version>
15 changes: 15 additions & 0 deletions connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -112,6 +112,19 @@ message AnalyzePlanResponse {

// The extended explain string as produced by Spark.
string explain_string = 3;

// Get the tree string of the schema.
string tree_string = 4;

// Whether the 'collect' and 'take' methods can be run locally.
bool is_local = 5;

// Whether this plan contains one or more sources that continuously
// return data as it arrives.
bool is_streaming = 6;

// A best-effort snapshot of the files that compose this Dataset
repeated string input_files = 7;
}

// A request to be executed by the service.
@@ -174,6 +187,8 @@ message ExecutePlanResponse {
service SparkConnectService {

// Executes a request that contains the query and returns a stream of [[Response]].
//
// It is guaranteed that there is at least one ARROW batch returned even if the result set is empty.
rpc ExecutePlan(ExecutePlanRequest) returns (stream ExecutePlanResponse) {}

// Analyzes a query and returns a [[AnalyzeResponse]] containing metadata about the query.
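The new `tree_string`, `is_local`, `is_streaming`, and `input_files` fields back `DataFrame.{isLocal, isStreaming}` and related APIs on the Python client (SPARK-41216). A minimal sketch of how a client wrapper might surface them; the generated `base_pb2` module path and the `AnalyzePlanRequest` wiring are assumptions, and only the response field names come from the diff above:

```python
from spark.connect import base_pb2  # assumed generated module path


class DataFrame:
    def __init__(self, stub, plan):
        self._stub = stub    # SparkConnectService stub
        self._plan = plan    # Plan proto describing this DataFrame

    def _analyze(self):
        req = base_pb2.AnalyzePlanRequest(plan=self._plan)  # assumed shape
        return self._stub.AnalyzePlan(req)

    def isLocal(self) -> bool:
        return self._analyze().is_local            # field 5 above

    def isStreaming(self) -> bool:
        return self._analyze().is_streaming        # field 6 above

    def inputFiles(self) -> list:
        return list(self._analyze().input_files)   # field 7 above
```

The documented guarantee that `ExecutePlan` returns at least one Arrow batch even for an empty result means the client can always recover the result schema, which is what makes methods like `DataFrame.isEmpty` (SPARK-41212) cheap to implement. A sketch with `pyarrow` (a real API; the raw IPC bytes are assumed to come from the response stream):

```python
import pyarrow as pa

def schema_and_count(ipc_stream: bytes):
    # Even a zero-row result carries its schema in the IPC stream header.
    reader = pa.ipc.open_stream(ipc_stream)
    table = reader.read_all()
    return table.schema, table.num_rows  # isEmpty <=> num_rows == 0
```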
connector/connect/src/main/protobuf/spark/connect/expressions.proto
@@ -68,7 +68,7 @@ message Expression {
bytes uuid = 28;
DataType null = 29; // a typed null literal
List list = 30;
DataType.List empty_list = 31;
DataType.Array empty_array = 31;
DataType.Map empty_map = 32;
UserDefined user_defined = 33;
}
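With `empty_list` renamed to `empty_array`, a typed empty-array literal now carries a `DataType.Array` (defined in the `types.proto` diff below). A sketch of building one from Python; the generated module path and the nesting of the literal oneof are assumptions, while the field names come from the diffs:

```python
from spark.connect import expressions_pb2  # assumed generated module path

expr = expressions_pb2.Expression()
arr = expr.literal.empty_array            # assumed path to the literal oneof
arr.element_type.integer.SetInParent()    # element type, per DataType.Array
arr.contains_null = False
```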
20 changes: 17 additions & 3 deletions connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -50,6 +50,7 @@ message Relation {
RenameColumnsBySameLengthNames rename_columns_by_same_length_names = 18;
RenameColumnsByNameToNameMap rename_columns_by_name_to_name_map = 19;
ShowString show_string = 20;
Drop drop = 21;

// NA functions
NAFill fill_na = 90;
@@ -252,6 +253,19 @@ message Sort {
}
}


// Drop specified columns.
message Drop {
// (Required) The input relation.
Relation input = 1;

// (Required) columns to drop.
//
// Should contain at least 1 item.
repeated Expression cols = 2;
}


// Relation of type [[Deduplicate]] which have duplicate rows removed, could consider either only
// the subset of columns or all the columns.
message Deduplicate {
@@ -271,9 +285,9 @@ message Deduplicate {

// A relation that does not need to be qualified by name.
message LocalRelation {
// (Optional) A list qualified attributes.
repeated Expression.QualifiedAttribute attributes = 1;
// TODO: support local data.
// Local collection data serialized into Arrow IPC streaming format which contains
// the schema of the data.
bytes data = 1;
}

// Relation of type [[Sample]] that samples a fraction of the dataset.
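On the Python client, `DataFrame.drop` (SPARK-41169) can construct the new `Drop` message directly. A sketch assuming generated `relations_pb2` bindings; the unresolved-attribute shape mirrors the Scala DSL change further down:

```python
from spark.connect import relations_pb2  # assumed generated module path

def drop_relation(input_rel, *columns):
    assert columns, "Drop requires at least one column"
    rel = relations_pb2.Relation()
    rel.drop.input.CopyFrom(input_rel)
    for name in columns:
        col = rel.drop.cols.add()  # repeated Expression
        col.unresolved_attribute.unparsed_identifier = name
    return rel
```

The reworked `LocalRelation` (SPARK-41114) replaces the attribute list with the data itself, serialized as an Arrow IPC stream that embeds its schema. Producing such a payload with `pyarrow` and `pandas` (real APIs; only the final proto assignment is assumed):

```python
import pandas as pd
import pyarrow as pa

def to_ipc_bytes(pdf: pd.DataFrame) -> bytes:
    table = pa.Table.from_pandas(pdf)
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, table.schema)
    writer.write_table(table)  # the schema travels in the stream header
    writer.close()
    return sink.getvalue().to_pybytes()

# local_relation.data = to_ipc_bytes(pdf)  # assumed wiring into the proto
```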
123 changes: 73 additions & 50 deletions connector/connect/src/main/protobuf/spark/connect/types.proto
@@ -26,31 +26,46 @@ option java_package = "org.apache.spark.connect.proto";
// itself but only describes it.
message DataType {
oneof kind {
Boolean bool = 1;
I8 i8 = 2;
I16 i16 = 3;
I32 i32 = 5;
I64 i64 = 7;
FP32 fp32 = 10;
FP64 fp64 = 11;
String string = 12;
Binary binary = 13;
Timestamp timestamp = 14;
Date date = 16;
Time time = 17;
IntervalYear interval_year = 19;
IntervalDay interval_day = 20;
TimestampTZ timestamp_tz = 29;
UUID uuid = 32;

FixedChar fixed_char = 21;
VarChar varchar = 22;
FixedBinary fixed_binary = 23;
Decimal decimal = 24;

Struct struct = 25;
List list = 27;
Map map = 28;
NULL null = 1;

Binary binary = 2;

Boolean boolean = 3;

// Numeric types
Byte byte = 4;
Short short = 5;
Integer integer = 6;
Long long = 7;

Float float = 8;
Double double = 9;
Decimal decimal = 10;

// String types
String string = 11;
Char char = 12;
VarChar var_char = 13;

// Datetime types
Date date = 14;
Timestamp timestamp = 15;
TimestampNTZ timestamp_ntz = 16;

// Interval types
CalendarInterval calendar_interval = 17;
YearMonthInterval year_month_interval = 18;
DayTimeInterval day_time_interval = 19;

// Complex types
Array array = 20;
Struct struct = 21;
Map map = 22;


UUID uuid = 25;

FixedBinary fixed_binary = 26;

uint32 user_defined_type_reference = 31;
}
@@ -59,27 +74,27 @@ message DataType {
uint32 type_variation_reference = 1;
}

message I8 {
message Byte {
uint32 type_variation_reference = 1;
}

message I16 {
message Short {
uint32 type_variation_reference = 1;
}

message I32 {
message Integer {
uint32 type_variation_reference = 1;
}

message I64 {
message Long {
uint32 type_variation_reference = 1;
}

message FP32 {
message Float {
uint32 type_variation_reference = 1;
}

message FP64 {
message Double {
uint32 type_variation_reference = 1;
}

@@ -91,6 +106,10 @@
uint32 type_variation_reference = 1;
}

message NULL {
uint32 type_variation_reference = 1;
}

message Timestamp {
uint32 type_variation_reference = 1;
}
@@ -99,28 +118,32 @@
uint32 type_variation_reference = 1;
}

message Time {
message TimestampNTZ {
uint32 type_variation_reference = 1;
}

message TimestampTZ {
message CalendarInterval {
uint32 type_variation_reference = 1;
}

message IntervalYear {
uint32 type_variation_reference = 1;
message YearMonthInterval {
optional int32 start_field = 1;
optional int32 end_field = 2;
uint32 type_variation_reference = 3;
}

message IntervalDay {
uint32 type_variation_reference = 1;
message DayTimeInterval {
optional int32 start_field = 1;
optional int32 end_field = 2;
uint32 type_variation_reference = 3;
}

message UUID {
uint32 type_variation_reference = 1;
}

// Start compound types.
message FixedChar {
message Char {
int32 length = 1;
uint32 type_variation_reference = 2;
}
@@ -136,14 +159,14 @@
}

message Decimal {
int32 scale = 1;
int32 precision = 2;
optional int32 scale = 1;
optional int32 precision = 2;
uint32 type_variation_reference = 3;
}

message StructField {
DataType type = 1;
string name = 2;
string name = 1;
DataType data_type = 2;
bool nullable = 3;
map<string, string> metadata = 4;
}
@@ -153,16 +176,16 @@
uint32 type_variation_reference = 2;
}

message List {
DataType DataType = 1;
uint32 type_variation_reference = 2;
bool element_nullable = 3;
message Array {
DataType element_type = 1;
bool contains_null = 2;
uint32 type_variation_reference = 3;
}

message Map {
DataType key = 1;
DataType value = 2;
uint32 type_variation_reference = 3;
bool value_nullable = 4;
DataType key_type = 1;
DataType value_type = 2;
bool value_contains_null = 3;
uint32 type_variation_reference = 4;
}
}
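The renamed messages now track Spark SQL's own type names (ByteType, TimestampNTZType, DayTimeIntervalType, and so on), so client-side conversion becomes mostly mechanical. A sketch of building a few `DataType` protos, assuming a generated `types_pb2` module; `SetInParent()` is the standard protobuf-python way to mark a parameterless oneof message as set:

```python
from spark.connect import types_pb2  # assumed generated module path

def integer_type() -> types_pb2.DataType:
    dt = types_pb2.DataType()
    dt.integer.SetInParent()  # parameterless arm of the kind oneof
    return dt

def decimal_type(precision: int, scale: int) -> types_pb2.DataType:
    dt = types_pb2.DataType()
    dt.decimal.precision = precision  # now optional, so set explicitly
    dt.decimal.scale = scale
    return dt

def map_string_to_long() -> types_pb2.DataType:
    dt = types_pb2.DataType()
    dt.map.key_type.string.SetInParent()
    dt.map.value_type.long.SetInParent()
    dt.map.value_contains_null = True  # renamed from value_nullable
    return dt
```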
@@ -54,7 +54,8 @@ package object dsl {
for (attr <- attrs) {
val structField = DataType.StructField.newBuilder()
structField.setName(attr.getName)
structField.setType(attr.getType)
structField.setDataType(attr.getType)
structField.setNullable(true)
structExpr.addFields(structField)
}
Expression.QualifiedAttribute
@@ -66,7 +67,7 @@

/** Creates a new AttributeReference of type int */
def int: Expression.QualifiedAttribute = protoQualifiedAttrWithType(
DataType.newBuilder().setI32(DataType.I32.newBuilder()).build())
DataType.newBuilder().setInteger(DataType.Integer.newBuilder()).build())

private def protoQualifiedAttrWithType(dataType: DataType): Expression.QualifiedAttribute =
Expression.QualifiedAttribute
@@ -510,6 +511,28 @@ package object dsl {
.build()
}

def drop(columns: String*): Relation = {
assert(columns.nonEmpty)

val cols = columns.map(col =>
Expression.newBuilder
.setUnresolvedAttribute(
Expression.UnresolvedAttribute.newBuilder
.setUnparsedIdentifier(col)
.build())
.build())

Relation
.newBuilder()
.setDrop(
Drop
.newBuilder()
.setInput(logicalPlan)
.addAllCols(cols.asJava)
.build())
.build()
}

def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*): Relation = {
val agg = Aggregate.newBuilder()
agg.setInput(logicalPlan)