2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
@@ -185,7 +185,7 @@ jobs:
echo "Preparing the benchmark results:"
tar -cvf benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar `git diff --name-only` `git ls-files --others --exclude=tpcds-sf-1 --exclude-standard`
- name: Upload benchmark results
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}-${{ matrix.split }}
path: benchmark-results-${{ github.event.inputs.jdk }}-${{ github.event.inputs.scala }}.tar
43 changes: 25 additions & 18 deletions .github/workflows/build_and_test.yml
@@ -272,13 +272,13 @@ jobs:
./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST" --included-tags "$INCLUDED_TAGS" --excluded-tags "$EXCLUDED_TAGS"
- name: Upload test results to report
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-results-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }}
path: "**/target/test-reports/*.xml"
- name: Upload unit tests log files
if: ${{ !success() }}
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: unit-tests-log-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }}
path: "**/target/unit-tests.log"
@@ -470,16 +470,16 @@ jobs:
- name: Upload test results to report
env: ${{ fromJSON(inputs.envs) }}
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-results-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3
name: test-results-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3-${{ env.PYTHON_TO_TEST }}
path: "**/target/test-reports/*.xml"
- name: Upload unit tests log files
env: ${{ fromJSON(inputs.envs) }}
if: ${{ !success() }}
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: unit-tests-log-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3
name: unit-tests-log-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3-${{ env.PYTHON_TO_TEST }}
path: "**/target/unit-tests.log"

sparkr:
@@ -556,7 +556,7 @@ jobs:
./dev/run-tests --parallelism 1 --modules sparkr
- name: Upload test results to report
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-results-sparkr--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3
path: "**/target/test-reports/*.xml"
@@ -699,11 +699,18 @@ jobs:
# Should delete this section after SPARK 3.5 EOL.
python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0'
python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0'
- name: Install Python linter dependencies
- name: Install Python dependencies for python linter and documentation generation
if: inputs.branch != 'branch-3.4' && inputs.branch != 'branch-3.5'
run: |
python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc jinja2 'black==23.9.1'
python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0'
# Should unpin 'sphinxcontrib-*' after upgrading sphinx>5
# See 'ipython_genutils' in SPARK-38517
# See 'docutils<0.18.0' in SPARK-39421
python3.9 -m pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' \
ipython ipython_genutils sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' 'docutils<0.18.0' \
'flake8==3.9.0' 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' 'black==23.9.1' \
'pandas-stubs==1.2.0.53' 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' \
'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5'
python3.9 -m pip list
- name: Python linter
run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python
# Should delete this section after SPARK 3.5 EOL.
@@ -751,13 +758,13 @@ jobs:
Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'markdown', 'e1071', 'roxygen2', 'ggplot2', 'mvtnorm', 'statmod'), repos='https://cloud.r-project.org/')"
Rscript -e "devtools::install_version('pkgdown', version='2.0.1', repos='https://cloud.r-project.org')"
Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')"
- name: Install dependencies for documentation generation
run: |
# Should unpin 'sphinxcontrib-*' after upgrading sphinx>5
python3.9 -m pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5'
python3.9 -m pip install ipython_genutils # See SPARK-38517
python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8'
python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
- name: Install dependencies for documentation generation
run: |
gem install bundler -v 2.4.22
cd docs
bundle install
@@ -781,7 +788,7 @@
run: tar cjf site.tar.bz2 docs/_site
- name: Upload documentation
if: github.repository != 'apache/spark'
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: site
path: site.tar.bz2
@@ -934,13 +941,13 @@ jobs:
spark.sql.join.forceApplyShuffledHashJoin=true
- name: Upload test results to report
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-results-tpcds--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3
path: "**/target/test-reports/*.xml"
- name: Upload unit tests log files
if: ${{ !success() }}
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: unit-tests-log-tpcds--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3
path: "**/target/unit-tests.log"
@@ -1003,13 +1010,13 @@ jobs:
./dev/run-tests --parallelism 1 --modules docker-integration-tests --included-tags org.apache.spark.tags.DockerTest
- name: Upload test results to report
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-results-docker-integration--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3
path: "**/target/test-reports/*.xml"
- name: Upload unit tests log files
if: ${{ !success() }}
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: unit-tests-log-docker-integration--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3
path: "**/target/unit-tests.log"
@@ -1084,7 +1091,7 @@ jobs:
build/sbt -Phadoop-3 -Psparkr -Pkubernetes -Pvolcano -Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 -Dspark.kubernetes.test.executorRequestCores=0.2 -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.exclude.tags=local "kubernetes-integration-tests/test"
- name: Upload Spark on K8S integration tests log files
if: ${{ !success() }}
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: spark-on-kubernetes-it-log
path: "**/target/integration-tests.log"
4 changes: 2 additions & 2 deletions .github/workflows/maven_test.yml
@@ -200,13 +200,13 @@ jobs:
rm -rf ~/.m2/repository/org/apache/spark
- name: Upload test results to report
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: test-results-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }}
path: "**/target/test-reports/*.xml"
- name: Upload unit tests log files
if: failure()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: unit-tests-log-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }}
path: "**/target/unit-tests.log"
@@ -1298,7 +1298,8 @@ private[sql] object Column {
val builder = proto.Expression.newBuilder()
name match {
case "*" =>
builder.getUnresolvedStarBuilder
val starBuilder = builder.getUnresolvedStarBuilder
planId.foreach(starBuilder.setPlanId)
case _ if name.endsWith(".*") =>
builder.getUnresolvedStarBuilder.setUnparsedTarget(name)
case _ =>
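Note on the hunk above: attaching the plan ID to the unresolved star is what lets a dataframe-qualified `col("*")` expand to only that dataframe's columns. A minimal illustrative sketch (not part of this diff; it mirrors the new E2E test below and assumes an active `spark` session):

```scala
// Illustrative sketch: with the plan ID carried on the unresolved star,
// df.col("*") resolves against that dataframe alone, even after a join.
import org.apache.spark.sql.functions.{col, rand}

val left = spark.range(100)
val right = spark.range(100).select(col("id"), rand(12).as("a"))
val joined = left.join(right, left("id") === right("id"))

joined.select(joined.col("*")) // id, id, a  (all columns of the join)
joined.select(left.col("*"))   // id         (left side only)
joined.select(right.col("*"))  // id, a      (right side only)
```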
@@ -402,7 +402,14 @@ object functions {
* @group agg_funcs
* @since 3.4.0
*/
def count(e: Column): Column = Column.fn("count", e)
def count(e: Column): Column = {
val withoutStar = e.expr.getExprTypeCase match {
// Turn count(*) into count(1)
case proto.Expression.ExprTypeCase.UNRESOLVED_STAR => lit(1)
case _ => e
}
Column.fn("count", withoutStar)
}

/**
* Aggregate function: returns the number of items in a group.
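A short illustrative sketch of the rewrite above (assuming an active `spark` session): a star argument to `count` is replaced on the client with the literal `1`, so `count(*)` counts rows instead of shipping an unresolved star to the server.

```scala
// Illustrative: both forms produce the same plan after the client-side rewrite.
import org.apache.spark.sql.functions.{col, count, lit}

val df = spark.range(10)
df.select(count(col("*"))).collect() // sent as count(1); one row containing 10
df.select(count(lit(1))).collect()   // equivalent; one row containing 10
```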
@@ -878,6 +878,31 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
}

test("join with dataframe star") {
val left = spark.range(100)
val right = spark.range(100).select(col("id"), rand(12).as("a"))
val join1 = left.join(right, left("id") === right("id"))
assert(
join1.select(join1.col("*")).schema.catalogString ===
"struct<id:bigint,id:bigint,a:double>")
assert(join1.select(left.col("*")).schema.catalogString === "struct<id:bigint>")
assert(join1.select(right.col("*")).schema.catalogString === "struct<id:bigint,a:double>")

val join2 = left.join(right)
assert(
join2.select(join2.col("*")).schema.catalogString ===
"struct<id:bigint,id:bigint,a:double>")
assert(join2.select(left.col("*")).schema.catalogString === "struct<id:bigint>")
assert(join2.select(right.col("*")).schema.catalogString === "struct<id:bigint,a:double>")

val join3 = left.join(right, "id")
assert(
join3.select(join3.col("*")).schema.catalogString ===
"struct<id:bigint,a:double>")
assert(join3.select(left.col("*")).schema.catalogString === "struct<id:bigint>")
assert(join3.select(right.col("*")).schema.catalogString === "struct<id:bigint,a:double>")
}

test("SPARK-45509: ambiguous column reference") {
val session = spark
import session.implicits._
@@ -397,6 +397,22 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
assertContains("noException: Boolean = true", output)
}

test("broadcast works with REPL generated code") {
val input =
"""
|val add1 = udf((i: Long) => i + 1)
|val tableA = spark.range(2).alias("a")
|val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
|tableA.join(tableB).
| where(col("a.id")===col("b.id")).
| select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
| collect().
| mkString("[", ", ", "]")
|""".stripMargin
val output = runCommandsInShell(input)
assertContains("""String = "[[1,1]]"""", output)
}

test("closure cleaner") {
val input =
"""
@@ -81,7 +81,8 @@
"unresolvedFunction": {
"functionName": "count",
"arguments": [{
"unresolvedStar": {
"literal": {
"integer": 1
}
}]
}
Binary file not shown.
@@ -19,6 +19,7 @@
"functionName": "struct",
"arguments": [{
"unresolvedStar": {
"planId": "0"
}
}]
}
Binary file not shown.
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
},
errorClass = "INDEX_ALREADY_EXISTS",
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)

sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"DROP index i1 ON $catalogName.new_table")
},
errorClass = "INDEX_NOT_FOUND",
parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
}
}
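For context, the expectations above now assert SQL-quoted identifiers in the error parameters. A hedged sketch of the surrounding assertion (the `AnalysisException` intercept type is an assumption here; only the `parameters` values come from the diff):

```scala
// Sketch: identifier parameters of error classes are asserted in quoted (backticked) form.
checkError(
  exception = intercept[org.apache.spark.sql.AnalysisException] {
    sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
  },
  errorClass = "INDEX_ALREADY_EXISTS",
  parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`"))
```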
4 changes: 0 additions & 4 deletions docs/streaming-programming-guide.md
@@ -2338,10 +2338,6 @@ There are a few parameters that can help you tune the memory usage and GC overhe
* **Clearing old data**: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data.
Data can be retained for a longer duration (e.g. interactively querying older data) by setting `streamingContext.remember`.

* **CMS Garbage Collector**: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the
overall processing throughput of the system, its use is still recommended to achieve more
consistent batch processing times. Make sure you set the CMS GC on both the driver (using `--driver-java-options` in `spark-submit`) and the executors (using [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions`).

* **Other tips**: To further reduce GC overheads, here are some more tips to try.
- Persist RDDs using the `OFF_HEAP` storage level. See more detail in the [Spark Programming Guide](rdd-programming-guide.html#rdd-persistence).
- Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap.
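The surviving tips reference concrete APIs; a brief illustrative Scala sketch (assumptions: an existing `sparkConf` and a socket source on localhost:9999) of retaining streaming data longer and persisting off-heap:

```scala
// Illustrative only: keep generated data around longer for interactive queries,
// and persist DStream RDDs off-heap to reduce per-JVM GC pressure.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.remember(Minutes(30))            // retain data beyond what the transformations require

val lines = ssc.socketTextStream("localhost", 9999)
lines.persist(StorageLevel.OFF_HEAP) // OFF_HEAP storage level, as suggested above
```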
2 changes: 2 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
@@ -1010,6 +1010,8 @@ def corr(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:


def count(col: "ColumnOrName") -> Column:
if isinstance(col, Column) and isinstance(col._expr, UnresolvedStar):
col = lit(1)
return _invoke_function_over_columns("count", col)

