Merged

28 commits
5457193
[MINOR][BUILD] Correct the `files` contend in `checkstyle-suppression…
LuciferYang Nov 1, 2022
658681f
[SPARK-40930][CONNECT] Support Collect() in Python client
amaliujia Nov 2, 2022
a0ab172
Revert "[SPARK-40976][BUILD] Upgrade sbt to 1.7.3"
linhongliu-db Nov 2, 2022
3b0d0aa
[SPARK-40990][PYTHON] DataFrame creation from 2d NumPy array with arb…
xinrong-meng Nov 2, 2022
23c54ad
[SPARK-40748][SQL] Migrate type check failures of conditions onto err…
panbingkun Nov 2, 2022
9fda99a
[MINOR][SQL] Wrap `given` in backticks to fix compilation warning
LuciferYang Nov 2, 2022
b938058
[SPARK-40968] Fix a few wrong/misleading comments in DAGSchedulerSuite
JiexingLi Nov 2, 2022
9d30e33
[SPARK-40991][PYTHON] Update `cloudpickle` to v2.2.0
dongjoon-hyun Nov 2, 2022
f03fdf9
[SPARK-40883][CONNECT][FOLLOW-UP] Range.step is required and Python c…
amaliujia Nov 2, 2022
d627d8e
[SPARK-40248][SQL] Use larger number of bits to build Bloom filter
wangyum Nov 2, 2022
68531ad
[SPARK-40374][SQL] Migrate type check failures of type creators onto …
panbingkun Nov 2, 2022
5fa2c13
[SPARK-40957] Add in memory cache in HDFSMetadataLog
jerrypeng Nov 2, 2022
c4e6b2c
[SPARK-40985][BUILD] Upgrade RoaringBitmap to 0.9.35
LuciferYang Nov 2, 2022
e63b7da
[SPARK-40995][CONNECT][DOC] Defining Spark Connect Client Connection …
grundprinzip Nov 3, 2022
adb41ca
[SPARK-40989][CONNECT][PYTHON][TESTS] Improve `session.sql` testing c…
amaliujia Nov 3, 2022
39824f1
[SPARK-40995][CONNECT][DOC][FOLLOW-UP] Fix the type in the doc name
amaliujia Nov 3, 2022
1072204
[SPARK-40977][CONNECT][PYTHON] Complete Support for Union in Python c…
amaliujia Nov 3, 2022
a3c6cd6
[SPARK-40998][SQL] Rename the error class `_LEGACY_ERROR_TEMP_0040` t…
MaxGekk Nov 3, 2022
84d025c
[MINOR][CONNECT] Fix file name in docs
dengziming Nov 3, 2022
3f8a8b3
[SPARK-40798][SQL][FOLLOW-UP] Fix ansi mode in v2 ALTER TABLE PARTITION
ulysses-you Nov 3, 2022
f913a6e
[SPARK-38270][BUILD][FOLLOW-UP] Exclude productElementName and produc…
HyukjinKwon Nov 3, 2022
5a2da01
[SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the…
beliefer Nov 3, 2022
0745dae
[SPARK-40834][SQL] Use SparkListenerSQLExecutionEnd to track final SQ…
ulysses-you Nov 3, 2022
67d0dc8
[SPARK-40996][BUILD] Upgrade `sbt-checkstyle-plugin` to 4.0.0 to res…
LuciferYang Nov 3, 2022
7f3b598
[SPARK-40869][K8S] Resource name prefix should not start with a hyphen
tobiasstadler Nov 3, 2022
2de57d6
[SPARK-40976][BUILD] Upgrade sbt to 1.7.3
LuciferYang Nov 4, 2022
af66a31
[SPARK-40815][SQL][FOLLOW-UP] Disable DelegateSymlinkTextInputFormat …
sadikovi Nov 4, 2022
ed72695
[SPARK-41001][CONNECT][DOC] Note: Connection string parameters are ca…
grundprinzip Nov 4, 2022
@@ -207,6 +207,15 @@ private static long optimalNumOfBits(long n, double p) {

static final double DEFAULT_FPP = 0.03;

/**
* Computes m, the total number of bits of the Bloom filter, expected to achieve the
* derived false positive probability, capped at maxNumOfBits.
* The smaller the expectedNumItems, the smaller the fpp.
*/
public static long optimalNumOfBits(long expectedNumItems, long maxNumItems, long maxNumOfBits) {
double fpp = Math.min(expectedNumItems / (maxNumItems / DEFAULT_FPP), DEFAULT_FPP);
return Math.min(optimalNumOfBits(expectedNumItems, fpp), maxNumOfBits);
}

/**
* Creates a {@link BloomFilter} with the expected number of insertions and a default expected
* false positive probability of 3%.
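A minimal sketch, for illustration only, of how the new overload behaves. It assumes the existing two-argument `optimalNumOfBits(n, p)` implements the standard Bloom filter sizing formula `m = -n * ln(p) / (ln 2)^2`; the names below are illustrative, not part of the change.

```python
import math

DEFAULT_FPP = 0.03

def optimal_num_of_bits(n, p):
    # Assumed standard Bloom filter sizing: m = -n * ln(p) / (ln 2)^2.
    return int(-n * math.log(p) / (math.log(2) ** 2))

def capped_num_of_bits(expected_num_items, max_num_items, max_num_of_bits):
    # Mirrors the new overload above: shrink fpp in proportion to how far
    # expectedNumItems falls below maxNumItems, then cap the total bits.
    fpp = min(expected_num_items / (max_num_items / DEFAULT_FPP), DEFAULT_FPP)
    return min(optimal_num_of_bits(expected_num_items, fpp), max_num_of_bits)

# A smaller expected item count yields a lower fpp (hence more bits per item),
# while the total number of bits never exceeds max_num_of_bits.
print(capped_num_of_bits(1_000_000, 4_000_000, 67_108_864))
```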
@@ -1,4 +1,4 @@
# Spark Connect
# Spark Connect - Developer Documentation

**Spark Connect is a strictly experimental feature and under heavy development.
All APIs should be considered volatile and should not be used in production.**
@@ -7,7 +7,13 @@ This module contains the implementation of Spark Connect which is a logical plan
facade for the implementation in Spark. Spark Connect is directly integrated into the build
of Spark. To enable it, you only need to activate the driver plugin for Spark Connect.

## Build
The documentation linked here is specifically for developers of Spark Connect and not
directly intended to be end-user documentation.


## Getting Started

### Build

```bash
./build/mvn -Phive clean package
@@ -19,7 +25,7 @@ or
./build/sbt -Phive clean package
```

## Run Spark Shell
### Run Spark Shell

To run Spark Connect from your local build:

@@ -43,14 +49,24 @@ To use the release version of Spark Connect:
--conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
```

## Run Tests
### Run Tests

```bash
./python/run-tests --testnames 'pyspark.sql.tests.connect.test_connect_basic'
```

## Generate proto generated files for the Python client

## Development Topics

### Generate proto generated files for the Python client
1. Install `buf version 1.8.0`: https://docs.buf.build/installation
2. Run `pip install grpcio==1.48.1 protobuf==4.21.6 mypy-protobuf==3.3.0`
3. Run `./connector/connect/dev/generate_protos.sh`
4. Optional: check `./dev/check-codegen-python.py`

### Guidelines for new clients

When contributing a new client, please be aware that we strive for a common
user experience across all languages. Please follow the guidelines below:

* [Connection string configuration](docs/client-connection-string.md)
116 changes: 116 additions & 0 deletions connector/connect/docs/client-connection-string.md
@@ -0,0 +1,116 @@
# Connecting to Spark Connect using Clients

From the client perspective, Spark Connect mostly behaves like any other GRPC
client and can be configured as such. However, to make it easy to use from
different programming languages and to provide a homogeneous connection surface,
this document proposes the user-facing interface for connecting to a
Spark Connect endpoint.

## Background
Similar to JDBC and other database connections, Spark Connect uses a
connection string that contains the parameters interpreted
to connect to the Spark Connect endpoint.


## Connection String

Generally, the connection string follows standard URI definitions. The URI
scheme is fixed and set to `sc://`. The full URI has to be a
[valid URI](http://www.faqs.org/rfcs/rfc2396.html) so that it can
be parsed properly by most systems. For example, hostnames have to be valid and
cannot contain arbitrary characters. Configuration parameters are passed in the
style of the HTTP URL path parameter syntax, similar to JDBC connection
strings. The path component must be empty. All parameters are interpreted **case-sensitively**.

```shell
sc://hostname:port/;param1=value;param2=value
```

<table>
<tr>
<td>Parameter</td>
<td>Type</td>
<td>Description</td>
<td>Examples</td>
</tr>
<tr>
<td>hostname</td>
<td>String</td>
<td>
The hostname of the endpoint for Spark Connect. Since the endpoint
has to be fully GRPC compatible, a particular path cannot be
specified. The hostname must be fully qualified or can be an IP
address.
</td>
<td>
<pre>myexample.com</pre>
<pre>127.0.0.1</pre>
</td>
</tr>
<tr>
<td>port</td>
<td>Numeric</td>
<td>The port to be used when connecting to the GRPC endpoint. The
default value is <b>15002</b>. Any valid port number can be used.</td>
<td><pre>15002</pre><pre>443</pre></td>
</tr>
<tr>
<td>token</td>
<td>String</td>
<td>When this parameter is set in the URL, it enables standard
bearer token authentication using GRPC. By default this value is not set.</td>
<td><pre>token=ABCDEFGH</pre></td>
</tr>
<tr>
<td>use_ssl</td>
<td>Boolean</td>
<td>When this flag is set, the client will connect to the endpoint
using TLS. It is assumed that the certificates necessary to verify
the server certificate are available on the system. The default
value is <b>false</b>.</td>
<td><pre>use_ssl=true</pre><pre>use_ssl=false</pre></td>
</tr>
<tr>
<td>user_id</td>
<td>String</td>
<td>User ID to automatically set in the Spark Connect UserContext message.
This is necessary for appropriate Spark Session management. This is an
*optional* parameter and, depending on the deployment, it might
be injected automatically by other means.</td>
<td>
<pre>user_id=Martin</pre>
</td>
</tr>
</table>
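
For illustration only (not part of the proposed specification), here is a minimal sketch of how a client might decompose such a connection string, assuming Python's standard `urllib.parse` is used and that the default port of 15002 applies when none is given:

```python
from urllib.parse import urlparse

def parse_connect_url(url):
    # Split "sc://host:port/;param1=value;param2=value" into host, port, params.
    parsed = urlparse(url)
    if parsed.scheme != "sc":
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    parts = parsed.path.split(";")
    # The path component must be empty; only ";key=value" parameters may follow it.
    if parts[0] not in ("", "/"):
        raise ValueError(f"path prefixes are not allowed: {parts[0]!r}")
    params = {}
    for part in parts[1:]:
        if part:
            key, _, value = part.partition("=")
            params[key] = value  # parameters are case sensitive
    return parsed.hostname, parsed.port or 15002, params

print(parse_connect_url("sc://myhost.com:443/;use_ssl=true;token=ABCDEFG"))
# ('myhost.com', 443, {'use_ssl': 'true', 'token': 'ABCDEFG'})
```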

## Examples

### Valid Examples
The examples below show valid configurations and explain how the connection string
is used when configuring the Spark Connect client.

The example below connects to **myhost.com** on the default port **`15002`**.
```python
server_url = "sc://myhost.com/"
```

The next two examples configure the connection to use a different port with SSL; the second of them additionally passes a bearer token.

```python
server_url = "sc://myhost.com:443/;use_ssl=true"
```

```python
server_url = "sc://myhost.com:443/;use_ssl=true;token=ABCDEFG"
```

### Invalid Examples

As mentioned above, Spark Connect uses a regular GRPC client and, to remain
compatible with the GRPC standard and HTTP, the server path cannot be configured.
For example, the following examples are invalid.

```python
server_url = "sc://myhost.com:443/mypathprefix/;token=AAAAAAA"
```

@@ -226,16 +226,12 @@ message Range {
int64 start = 1;
// Required.
int64 end = 2;
// Optional. Default value = 1
Step step = 3;
// Required.
int64 step = 3;
// Optional. Default value is assigned by 1) SQL conf "spark.sql.leafNodeDefaultParallelism" if
// it is set, or 2) spark default parallelism.
NumPartitions num_partitions = 4;

message Step {
int64 step = 1;
}

message NumPartitions {
int32 num_partitions = 1;
}
@@ -190,7 +190,9 @@ package object dsl {
}
range.setEnd(end)
if (step.isDefined) {
range.setStep(proto.Range.Step.newBuilder().setStep(step.get))
range.setStep(step.get)
} else {
range.setStep(1L)
}
if (numPartitions.isDefined) {
range.setNumPartitions(
@@ -110,11 +110,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
private def transformRange(rel: proto.Range): LogicalPlan = {
val start = rel.getStart
val end = rel.getEnd
val step = if (rel.hasStep) {
rel.getStep.getStep
} else {
1
}
val step = rel.getStep
val numPartitions = if (rel.hasNumPartitions) {
rel.getNumPartitions.getNumPartitions
} else {
8 changes: 4 additions & 4 deletions core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
@@ -2,12 +2,12 @@
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
OpenJDK 64-Bit Server VM 11.0.16.1+1 on Linux 5.15.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1269 1276 8 0.0 1268666001.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2672 2695 39 0.0 2671542753.0 0.5X
Num Maps: 50000 Fetch partitions:1500 4034 4069 50 0.0 4033696987.0 0.3X
Num Maps: 50000 Fetch partitions:500 1227 1262 47 0.0 1226744907.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2620 2637 15 0.0 2620288061.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3975 3990 17 0.0 3974979610.0 0.3X


10 changes: 5 additions & 5 deletions core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
@@ -2,12 +2,12 @@
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
OpenJDK 64-Bit Server VM 17.0.4.1+1 on Linux 5.15.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1228 1238 17 0.0 1228191051.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2380 2393 16 0.0 2379601524.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3803 3857 55 0.0 3802550172.0 0.3X
Num Maps: 50000 Fetch partitions:500 1159 1184 38 0.0 1159155979.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2329 2387 57 0.0 2328833805.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3608 3712 92 0.0 3607631972.0 0.3X


10 changes: 5 additions & 5 deletions core/benchmarks/MapStatusesConvertBenchmark-results.txt
@@ -2,12 +2,12 @@
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1179 1187 13 0.0 1178581948.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2237 2257 21 0.0 2236562602.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3448 3680 367 0.0 3447793753.0 0.3X
Num Maps: 50000 Fetch partitions:500 1099 1127 27 0.0 1099192398.0 1.0X
Num Maps: 50000 Fetch partitions:1000 1981 1999 16 0.0 1981390271.0 0.6X
Num Maps: 50000 Fetch partitions:1500 2973 3011 39 0.0 2973029597.0 0.4X


40 changes: 35 additions & 5 deletions core/src/main/resources/error/error-classes.json
@@ -138,6 +138,11 @@
"Unable to convert column <name> of type <type> to JSON."
]
},
"CANNOT_DROP_ALL_FIELDS" : {
"message" : [
"Cannot drop all fields in struct."
]
},
"CAST_WITHOUT_SUGGESTION" : {
"message" : [
"cannot cast <srcType> to <targetType>."
@@ -155,6 +160,21 @@
"To convert values from <srcType> to <targetType>, you can use the functions <functionNames> instead."
]
},
"CREATE_MAP_KEY_DIFF_TYPES" : {
"message" : [
"The given keys of function <functionName> should all be the same type, but they are <dataType>."
]
},
"CREATE_MAP_VALUE_DIFF_TYPES" : {
"message" : [
"The given values of function <functionName> should all be the same type, but they are <dataType>."
]
},
"CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING" : {
"message" : [
"Only foldable `STRING` expressions are allowed to appear at odd position, but they are <inputExprs>."
]
},
"DATA_DIFF_TYPES" : {
"message" : [
"Input to <functionName> should all be the same type, but it's <dataType>."
@@ -190,6 +210,16 @@
"The <functionName> does not support ordering on type <dataType>."
]
},
"IN_SUBQUERY_DATA_TYPE_MISMATCH" : {
"message" : [
"The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery. Mismatched columns: [<mismatchedColumns>], left side: [<leftType>], right side: [<rightType>]."
]
},
"IN_SUBQUERY_LENGTH_MISMATCH" : {
"message" : [
"The number of columns in the left hand side of an IN subquery does not match the number of columns in the output of subquery. Left hand side columns(length: <leftLength>): [<leftColumns>], right hand side columns(length: <rightLength>): [<rightColumns>]."
]
},
"MAP_CONCAT_DIFF_TYPES" : {
"message" : [
"The <functionName> should all be of type map, but it's <dataType>."
@@ -539,6 +569,11 @@
],
"sqlState" : "22023"
},
"INVALID_IDENTIFIER" : {
"message" : [
"The identifier <ident> is invalid. Please, consider quoting it with back-quotes as `<ident>`."
]
},
"INVALID_JSON_SCHEMA_MAP_TYPE" : {
"message" : [
"Input schema <jsonSchema> can only contain STRING as a key type for a MAP."
@@ -1346,11 +1381,6 @@
"Unsupported SQL statement."
]
},
"_LEGACY_ERROR_TEMP_0040" : {
"message" : [
"Possibly unquoted identifier <ident> detected. Please consider quoting it with back-quotes as `<ident>`."
]
},
"_LEGACY_ERROR_TEMP_0041" : {
"message" : [
"Found duplicate clauses: <clauseName>."
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkThrowableHelper.scala
@@ -19,6 +19,8 @@ package org.apache.spark

import scala.collection.JavaConverters._

import com.fasterxml.jackson.core.util.MinimalPrettyPrinter

import org.apache.spark.util.JsonProtocol.toJsonString
import org.apache.spark.util.Utils

@@ -119,4 +121,16 @@
}
}
}

def getMessage(throwable: Throwable): String = {
toJsonString { generator =>
val g = generator.setPrettyPrinter(new MinimalPrettyPrinter)
g.writeStartObject()
g.writeStringField("errorClass", throwable.getClass.getCanonicalName)
g.writeObjectFieldStart("messageParameters")
g.writeStringField("message", throwable.getMessage)
g.writeEndObject()
g.writeEndObject()
}
}
}
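
A rough sketch, inferred from the writer calls above rather than captured from Spark, of the JSON shape the new `getMessage(throwable: Throwable)` overload would produce for a plain `RuntimeException("boom")`; the example values are hypothetical.

```python
import json

# Approximation of the emitted payload: the exception's canonical class name
# becomes "errorClass" and its message is nested under "messageParameters".
payload = {
    "errorClass": "java.lang.RuntimeException",
    "messageParameters": {"message": "boom"},
}
print(json.dumps(payload, separators=(",", ":")))
# {"errorClass":"java.lang.RuntimeException","messageParameters":{"message":"boom"}}
```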