diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index a4aca07d69f7..9df2688a83a2 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -117,896 +117,6 @@ jobs: IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME" echo "image_url=$IMG_URL" >> $GITHUB_OUTPUT - # Build: build Spark and run the tests for specified modules. - build: - name: "Build modules: ${{ matrix.modules }} ${{ matrix.comment }}" - needs: precondition - if: fromJson(needs.precondition.outputs.required).build == 'true' - runs-on: ubuntu-22.04 - timeout-minutes: 300 - strategy: - fail-fast: false - matrix: - java: - - ${{ inputs.java }} - hadoop: - - ${{ inputs.hadoop }} - hive: - - hive2.3 - # TODO(SPARK-32246): We don't test 'streaming-kinesis-asl' for now. - # Kinesis tests depends on external Amazon kinesis service. - # Note that the modules below are from sparktestsupport/modules.py. - modules: - - >- - core, unsafe, kvstore, avro, utils, - network-common, network-shuffle, repl, launcher, - examples, sketch - - >- - api, catalyst, hive-thriftserver - - >- - mllib-local, mllib, graphx - - >- - streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl, - yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl, - connect, protobuf - # Here, we split Hive and SQL tests into some of slow ones and the rest of them. - included-tags: [""] - excluded-tags: [""] - comment: [""] - include: - # Hive tests - - modules: hive - java: ${{ inputs.java }} - hadoop: ${{ inputs.hadoop }} - hive: hive2.3 - included-tags: org.apache.spark.tags.SlowHiveTest - comment: "- slow tests" - - modules: hive - java: ${{ inputs.java }} - hadoop: ${{ inputs.hadoop }} - hive: hive2.3 - excluded-tags: org.apache.spark.tags.SlowHiveTest - comment: "- other tests" - # SQL tests - - modules: sql - java: ${{ inputs.java }} - hadoop: ${{ inputs.hadoop }} - hive: hive2.3 - included-tags: org.apache.spark.tags.ExtendedSQLTest - comment: "- extended tests" - - modules: sql - java: ${{ inputs.java }} - hadoop: ${{ inputs.hadoop }} - hive: hive2.3 - included-tags: org.apache.spark.tags.SlowSQLTest - comment: "- slow tests" - - modules: sql - java: ${{ inputs.java }} - hadoop: ${{ inputs.hadoop }} - hive: hive2.3 - excluded-tags: org.apache.spark.tags.ExtendedSQLTest,org.apache.spark.tags.SlowSQLTest - comment: "- other tests" - env: - MODULES_TO_TEST: ${{ matrix.modules }} - EXCLUDED_TAGS: ${{ matrix.excluded-tags }} - INCLUDED_TAGS: ${{ matrix.included-tags }} - HADOOP_PROFILE: ${{ matrix.hadoop }} - HIVE_PROFILE: ${{ matrix.hive }} - GITHUB_PREV_SHA: ${{ github.event.before }} - SPARK_LOCAL_IP: localhost - SKIP_UNIDOC: true - SKIP_MIMA: true - SKIP_PACKAGING: true - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - # In order to fetch changed files - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - # Cache local repositories. Note that GitHub Actions cache has a 2G limit. 
- - name: Cache Scala, SBT and Maven - uses: actions/cache@v3 - with: - path: | - build/apache-maven-* - build/scala-* - build/*.jar - ~/.sbt - key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} - restore-keys: | - build- - - name: Cache Coursier local repository - uses: actions/cache@v3 - with: - path: ~/.cache/coursier - key: ${{ matrix.java }}-${{ matrix.hadoop }}-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} - restore-keys: | - ${{ matrix.java }}-${{ matrix.hadoop }}-coursier- - - name: Free up disk space - run: | - if [ -f ./dev/free_disk_space ]; then - ./dev/free_disk_space - fi - - name: Install Java ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: zulu - java-version: ${{ matrix.java }} - - name: Install Python 3.9 - uses: actions/setup-python@v5 - # We should install one Python that is higher than 3+ for SQL and Yarn because: - # - SQL component also has Python related tests, for example, IntegratedUDFTestUtils. - # - Yarn has a Python specific test too, for example, YarnClusterSuite. - if: contains(matrix.modules, 'yarn') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect') - with: - python-version: '3.9' - architecture: x64 - - name: Install Python packages (Python 3.9) - if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect') - run: | - python3.9 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio==1.59.3' 'grpcio-status==1.59.3' 'protobuf==4.25.1' - python3.9 -m pip list - # Run the tests. - - name: Run tests - env: ${{ fromJSON(inputs.envs) }} - shell: 'script -q -e -c "bash {0}"' - run: | - # Fix for TTY related issues when launching the Ammonite REPL in tests. - export TERM=vt100 - # Hive "other tests" test needs larger metaspace size based on experiment. 
- if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi - # SPARK-46283: should delete the following env replacement after SPARK 3.x EOL - if [[ "$MODULES_TO_TEST" == *"streaming-kinesis-asl"* ]] && [[ "${{ inputs.branch }}" =~ ^branch-3 ]]; then - MODULES_TO_TEST=${MODULES_TO_TEST//streaming-kinesis-asl, /} - fi - export SERIAL_SBT_TESTS=1 - ./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST" --included-tags "$INCLUDED_TAGS" --excluded-tags "$EXCLUDED_TAGS" - - name: Upload test results to report - if: always() - uses: actions/upload-artifact@v4 - with: - name: test-results-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }} - path: "**/target/test-reports/*.xml" - - name: Upload unit tests log files - if: ${{ !success() }} - uses: actions/upload-artifact@v4 - with: - name: unit-tests-log-${{ matrix.modules }}-${{ matrix.comment }}-${{ matrix.java }}-${{ matrix.hadoop }}-${{ matrix.hive }} - path: "**/target/unit-tests.log" - - infra-image: - name: "Base image build" - needs: precondition - # Currently, enable docker build from cache for `master` and branch (since 3.4) jobs - if: >- - fromJson(needs.precondition.outputs.required).pyspark == 'true' || - fromJson(needs.precondition.outputs.required).lint == 'true' || - fromJson(needs.precondition.outputs.required).sparkr == 'true' - runs-on: ubuntu-latest - permissions: - packages: write - steps: - - name: Login to GitHub Container Registry - uses: docker/login-action@v3 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - name: Checkout Spark repository - uses: actions/checkout@v4 - # In order to fetch changed files - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - name: Build and push - id: docker_build - uses: docker/build-push-action@v5 - with: - context: ./dev/infra/ - push: true - tags: | - ${{ needs.precondition.outputs.image_url }} - # Use the infra image cache to speed up - cache-from: type=registry,ref=ghcr.io/apache/spark/apache-spark-github-action-image-cache:${{ inputs.branch }} - - pyspark: - needs: [precondition, infra-image] - # always run if pyspark == 'true', even infra-image is skip (such as non-master job) - if: (!cancelled()) && fromJson(needs.precondition.outputs.required).pyspark == 'true' - name: "Build modules: ${{ matrix.modules }}" - runs-on: ubuntu-22.04 - timeout-minutes: 300 - container: - image: ${{ needs.precondition.outputs.image_url }} - strategy: - fail-fast: false - matrix: - java: - - ${{ inputs.java }} - modules: - - >- - pyspark-sql, pyspark-resource, pyspark-testing - - >- - pyspark-core, pyspark-errors, pyspark-streaming - - >- - pyspark-mllib, pyspark-ml, pyspark-ml-connect - - >- - pyspark-pandas - - >- - pyspark-pandas-slow - - >- - 
pyspark-connect - - >- - pyspark-pandas-connect-part0 - - >- - pyspark-pandas-connect-part1 - - >- - pyspark-pandas-connect-part2 - - >- - pyspark-pandas-connect-part3 - env: - MODULES_TO_TEST: ${{ matrix.modules }} - PYTHON_TO_TEST: 'python3.9' - HADOOP_PROFILE: ${{ inputs.hadoop }} - HIVE_PROFILE: hive2.3 - GITHUB_PREV_SHA: ${{ github.event.before }} - SPARK_LOCAL_IP: localhost - SKIP_UNIDOC: true - SKIP_MIMA: true - SKIP_PACKAGING: true - METASPACE_SIZE: 1g - BRANCH: ${{ inputs.branch }} - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - # In order to fetch changed files - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Add GITHUB_WORKSPACE to git trust safe.directory - run: | - git config --global --add safe.directory ${GITHUB_WORKSPACE} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - # Cache local repositories. Note that GitHub Actions cache has a 2G limit. - - name: Cache Scala, SBT and Maven - uses: actions/cache@v3 - with: - path: | - build/apache-maven-* - build/scala-* - build/*.jar - ~/.sbt - key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} - restore-keys: | - build- - - name: Cache Coursier local repository - uses: actions/cache@v3 - with: - path: ~/.cache/coursier - key: pyspark-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} - restore-keys: | - pyspark-coursier- - - name: Free up disk space - shell: 'script -q -e -c "bash {0}"' - run: | - if [[ "$MODULES_TO_TEST" != *"pyspark-ml"* ]] && [[ "$BRANCH" != "branch-3.5" ]]; then - # uninstall libraries dedicated for ML testing - python3.9 -m pip uninstall -y torch torchvision torcheval torchtnt tensorboard mlflow deepspeed - fi - if [ -f ./dev/free_disk_space_container ]; then - ./dev/free_disk_space_container - fi - - name: Install Java ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: zulu - java-version: ${{ matrix.java }} - - name: List Python packages (${{ env.PYTHON_TO_TEST }}) - shell: 'script -q -e -c "bash {0}"' - run: | - for py in $(echo $PYTHON_TO_TEST | tr "," "\n") - do - echo $py - $py -m pip list - done - - name: Install Conda for pip packaging test - if: contains(matrix.modules, 'pyspark-errors') - run: | - curl -s https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh > miniconda.sh - bash miniconda.sh -b -p $HOME/miniconda - rm miniconda.sh - # Run the tests. - - name: Run tests - env: ${{ fromJSON(inputs.envs) }} - shell: 'script -q -e -c "bash {0}"' - run: | - if [[ "$MODULES_TO_TEST" == *"pyspark-errors"* ]]; then - export PATH=$PATH:$HOME/miniconda/bin - export SKIP_PACKAGING=false - echo "Python Packaging Tests Enabled!" - fi - if [ ! -z "$PYTHON_TO_TEST" ]; then - ./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST" --python-executables "$PYTHON_TO_TEST" - else - # For branch-3.5 and below, it uses the default Python versions. 
- ./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST" - fi - - name: Upload coverage to Codecov - if: fromJSON(inputs.envs).PYSPARK_CODECOV == 'true' - uses: codecov/codecov-action@v2 - with: - files: ./python/coverage.xml - flags: unittests - name: PySpark - - name: Upload test results to report - if: always() - uses: actions/upload-artifact@v4 - with: - name: test-results-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3 - path: "**/target/test-reports/*.xml" - - name: Upload unit tests log files - if: ${{ !success() }} - uses: actions/upload-artifact@v4 - with: - name: unit-tests-log-${{ matrix.modules }}--${{ matrix.java }}-${{ inputs.hadoop }}-hive2.3 - path: "**/target/unit-tests.log" - - sparkr: - needs: [precondition, infra-image] - # always run if sparkr == 'true', even infra-image is skip (such as non-master job) - if: (!cancelled()) && fromJson(needs.precondition.outputs.required).sparkr == 'true' - name: "Build modules: sparkr" - runs-on: ubuntu-22.04 - timeout-minutes: 300 - container: - image: ${{ needs.precondition.outputs.image_url }} - env: - HADOOP_PROFILE: ${{ inputs.hadoop }} - HIVE_PROFILE: hive2.3 - GITHUB_PREV_SHA: ${{ github.event.before }} - SPARK_LOCAL_IP: localhost - SKIP_UNIDOC: true - SKIP_MIMA: true - SKIP_PACKAGING: true - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - # In order to fetch changed files - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Add GITHUB_WORKSPACE to git trust safe.directory - run: | - git config --global --add safe.directory ${GITHUB_WORKSPACE} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - # Cache local repositories. Note that GitHub Actions cache has a 2G limit. 
- - name: Cache Scala, SBT and Maven - uses: actions/cache@v3 - with: - path: | - build/apache-maven-* - build/scala-* - build/*.jar - ~/.sbt - key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} - restore-keys: | - build- - - name: Cache Coursier local repository - uses: actions/cache@v3 - with: - path: ~/.cache/coursier - key: sparkr-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} - restore-keys: | - sparkr-coursier- - - name: Free up disk space - run: | - if [ -f ./dev/free_disk_space_container ]; then - ./dev/free_disk_space_container - fi - - name: Install Java ${{ inputs.java }} - uses: actions/setup-java@v4 - with: - distribution: zulu - java-version: ${{ inputs.java }} - - name: Run tests - env: ${{ fromJSON(inputs.envs) }} - run: | - # The followings are also used by `r-lib/actions/setup-r` to avoid - # R issues at docker environment - export TZ=UTC - export _R_CHECK_SYSTEM_CLOCK_=FALSE - ./dev/run-tests --parallelism 1 --modules sparkr - - name: Upload test results to report - if: always() - uses: actions/upload-artifact@v4 - with: - name: test-results-sparkr--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 - path: "**/target/test-reports/*.xml" - - buf: - needs: [precondition] - if: (!cancelled()) && fromJson(needs.precondition.outputs.required).buf == 'true' - name: Protobuf breaking change detection and Python CodeGen check - runs-on: ubuntu-22.04 - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - - name: Install Buf - uses: bufbuild/buf-setup-action@v1 - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - - name: Protocol Buffers Linter - uses: bufbuild/buf-lint-action@v1 - with: - input: core/src/main/protobuf - # Change 'branch-3.5' to 'branch-4.0' in master branch after cutting branch-4.0 branch. 
- - name: Breaking change detection against branch-3.5 - uses: bufbuild/buf-breaking-action@v1 - with: - input: connector/connect/common/src/main - against: 'https://github.com/apache/spark.git#branch=branch-3.5,subdir=connector/connect/common/src/main' - - name: Install Python 3.9 - uses: actions/setup-python@v5 - with: - python-version: '3.9' - - name: Install dependencies for Python CodeGen check - run: | - python3.9 -m pip install 'black==23.9.1' 'protobuf==4.25.1' 'mypy==0.982' 'mypy-protobuf==3.3.0' - python3.9 -m pip list - - name: Python CodeGen check - run: ./dev/connect-check-protos.py - - # Static analysis, and documentation build - lint: - needs: [precondition, infra-image] - # always run if lint == 'true', even infra-image is skip (such as non-master job) - if: (!cancelled()) && fromJson(needs.precondition.outputs.required).lint == 'true' - name: Linters, licenses, dependencies and documentation generation - runs-on: ubuntu-22.04 - timeout-minutes: 300 - env: - LC_ALL: C.UTF-8 - LANG: C.UTF-8 - PYSPARK_DRIVER_PYTHON: python3.9 - PYSPARK_PYTHON: python3.9 - GITHUB_PREV_SHA: ${{ github.event.before }} - container: - image: ${{ needs.precondition.outputs.image_url }} - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Add GITHUB_WORKSPACE to git trust safe.directory - run: | - git config --global --add safe.directory ${GITHUB_WORKSPACE} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - # Cache local repositories. Note that GitHub Actions cache has a 2G limit. 
- - name: Cache Scala, SBT and Maven - uses: actions/cache@v3 - with: - path: | - build/apache-maven-* - build/scala-* - build/*.jar - ~/.sbt - key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} - restore-keys: | - build- - - name: Cache Coursier local repository - uses: actions/cache@v3 - with: - path: ~/.cache/coursier - key: docs-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} - restore-keys: | - docs-coursier- - - name: Cache Maven local repository - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: docs-maven-${{ hashFiles('**/pom.xml') }} - restore-keys: | - docs-maven- - - name: Free up disk space - run: | - if [ -f ./dev/free_disk_space_container ]; then - ./dev/free_disk_space_container - fi - - name: Install Java ${{ inputs.java }} - uses: actions/setup-java@v4 - with: - distribution: zulu - java-version: ${{ inputs.java }} - - name: License test - run: ./dev/check-license - - name: Dependencies test - run: ./dev/test-dependencies.sh - - name: MIMA test - run: ./dev/mima - - name: Scala linter - run: ./dev/lint-scala - - name: Java linter - run: ./dev/lint-java - - name: Spark connect jvm client mima check - run: ./dev/connect-jvm-client-mima-check - - name: Install Python linter dependencies for branch-3.4 - if: inputs.branch == 'branch-3.4' - run: | - # SPARK-44554: Copy from https://github.com/apache/spark/blob/a05c27e85829fe742c1828507a1fd180cdc84b54/.github/workflows/build_and_test.yml#L571-L578 - # Should delete this section after SPARK 3.4 EOL. - python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.920' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0' - python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' - - name: Install Python linter dependencies for branch-3.5 - if: inputs.branch == 'branch-3.5' - run: | - # SPARK-45212: Copy from https://github.com/apache/spark/blob/555c8def51e5951c7bf5165a332795e9e330ec9d/.github/workflows/build_and_test.yml#L631-L638 - # Should delete this section after SPARK 3.5 EOL. - python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0' - python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' - - name: Install Python linter dependencies - if: inputs.branch != 'branch-3.4' && inputs.branch != 'branch-3.5' - run: | - python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc jinja2 'black==23.9.1' - python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' - - name: Python linter - run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python - # Should delete this section after SPARK 3.5 EOL. 
- - name: Install dependencies for Python code generation check for branch-3.5 - if: inputs.branch == 'branch-3.5' - run: | - # See more in "Installation" https://docs.buf.build/installation#tarball - curl -LO https://github.com/bufbuild/buf/releases/download/v1.28.1/buf-Linux-x86_64.tar.gz - mkdir -p $HOME/buf - tar -xvzf buf-Linux-x86_64.tar.gz -C $HOME/buf --strip-components 1 - rm buf-Linux-x86_64.tar.gz - python3.9 -m pip install 'protobuf==4.25.1' 'mypy-protobuf==3.3.0' - # Should delete this section after SPARK 3.5 EOL. - - name: Python code generation check for branch-3.5 - if: inputs.branch == 'branch-3.5' - run: if test -f ./dev/connect-check-protos.py; then PATH=$PATH:$HOME/buf/bin PYTHON_EXECUTABLE=python3.9 ./dev/connect-check-protos.py; fi - # Should delete this section after SPARK 3.5 EOL. - - name: Install JavaScript linter dependencies for branch-3.4, branch-3.5 - if: inputs.branch == 'branch-3.4' || inputs.branch == 'branch-3.5' - run: | - apt update - apt-get install -y nodejs npm - - name: JS linter - run: ./dev/lint-js - # Should delete this section after SPARK 3.5 EOL. - - name: Install R linter dependencies for branch-3.4, branch-3.5 - if: inputs.branch == 'branch-3.4' || inputs.branch == 'branch-3.5' - run: | - apt update - apt-get install -y libcurl4-openssl-dev libgit2-dev libssl-dev libxml2-dev \ - libfontconfig1-dev libharfbuzz-dev libfribidi-dev libfreetype6-dev libpng-dev \ - libtiff5-dev libjpeg-dev - Rscript -e "install.packages(c('devtools'), repos='https://cloud.r-project.org/')" - Rscript -e "devtools::install_version('lintr', version='2.0.1', repos='https://cloud.r-project.org')" - - name: Install R linter dependencies and SparkR - run: ./R/install-dev.sh - # Should delete this section after SPARK 3.5 EOL. - - name: Install dependencies for documentation generation for branch-3.4, branch-3.5 - if: inputs.branch == 'branch-3.4' || inputs.branch == 'branch-3.5' - run: | - # pandoc is required to generate PySpark APIs as well in nbsphinx. 
- apt-get update -y - apt-get install -y libcurl4-openssl-dev pandoc - apt-get install -y ruby ruby-dev - Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'markdown', 'e1071', 'roxygen2', 'ggplot2', 'mvtnorm', 'statmod'), repos='https://cloud.r-project.org/')" - Rscript -e "devtools::install_version('pkgdown', version='2.0.1', repos='https://cloud.r-project.org')" - Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')" - - name: Install dependencies for documentation generation - run: | - python3.9 -m pip install 'sphinx==4.2.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' - python3.9 -m pip install ipython_genutils # See SPARK-38517 - python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' - python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421 - gem install bundler -v 2.4.22 - cd docs - bundle install - - name: R linter - run: ./dev/lint-r - - name: Run documentation build - run: | - if [ -f "./dev/is-changed.py" ]; then - # Skip PySpark and SparkR docs while keeping Scala/Java/SQL docs - pyspark_modules=`cd dev && python3.9 -c "import sparktestsupport.modules as m; print(','.join(m.name for m in m.all_modules if m.name.startswith('pyspark')))"` - if [ `./dev/is-changed.py -m $pyspark_modules` = false ]; then export SKIP_PYTHONDOC=1; fi - if [ `./dev/is-changed.py -m sparkr` = false ]; then export SKIP_RDOC=1; fi - fi - cd docs - bundle exec jekyll build - - name: Tar documentation - if: github.repository != 'apache/spark' - run: tar cjf site.tar.bz2 docs/_site - - name: Upload documentation - if: github.repository != 'apache/spark' - uses: actions/upload-artifact@v4 - with: - name: site - path: site.tar.bz2 - retention-days: 1 - - java-other-versions: - needs: precondition - if: fromJson(needs.precondition.outputs.required).java-other-versions == 'true' - name: Java ${{ matrix.java }} build with Maven - strategy: - fail-fast: false - matrix: - java: - - 17 - - 21 - runs-on: ubuntu-22.04 - timeout-minutes: 300 - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - - name: Cache Scala, SBT and Maven - uses: actions/cache@v3 - with: - path: | - build/apache-maven-* - build/scala-* - build/*.jar - ~/.sbt - key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} - restore-keys: | - build- - - name: Cache Maven local repository - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: java${{ matrix.java }}-maven-${{ hashFiles('**/pom.xml') }} - restore-keys: | - java${{ matrix.java }}-maven- - - name: Install Java ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: zulu - java-version: ${{ matrix.java }} - - name: Build with Maven - run: | - export MAVEN_OPTS="-Xss64m -Xmx2g -XX:ReservedCodeCacheSize=1g 
-Dorg.slf4j.simpleLogger.defaultLogLevel=WARN" - export MAVEN_CLI_OPTS="--no-transfer-progress" - export JAVA_VERSION=${{ matrix.java }} - # It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414. - ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install - rm -rf ~/.m2/repository/org/apache/spark - - # Any TPC-DS related updates on this job need to be applied to tpcds-1g-gen job of benchmark.yml as well - tpcds-1g: - needs: precondition - if: fromJson(needs.precondition.outputs.required).tpcds-1g == 'true' - name: Run TPC-DS queries with SF=1 - # Pin to 'Ubuntu 20.04' due to 'databricks/tpcds-kit' compilation - runs-on: ubuntu-20.04 - timeout-minutes: 300 - env: - SPARK_LOCAL_IP: localhost - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - - name: Cache Scala, SBT and Maven - uses: actions/cache@v3 - with: - path: | - build/apache-maven-* - build/scala-* - build/*.jar - ~/.sbt - key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} - restore-keys: | - build- - - name: Cache Coursier local repository - uses: actions/cache@v3 - with: - path: ~/.cache/coursier - key: tpcds-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} - restore-keys: | - tpcds-coursier- - - name: Install Java ${{ inputs.java }} - uses: actions/setup-java@v4 - with: - distribution: zulu - java-version: ${{ inputs.java }} - - name: Cache TPC-DS generated data - id: cache-tpcds-sf-1 - uses: actions/cache@v3 - with: - path: ./tpcds-sf-1 - key: tpcds-${{ hashFiles('.github/workflows/build_and_test.yml', 'sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }} - - name: Checkout tpcds-kit repository - if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' - uses: actions/checkout@v4 - with: - repository: databricks/tpcds-kit - ref: 2a5078a782192ddb6efbcead8de9973d6ab4f069 - path: ./tpcds-kit - - name: Build tpcds-kit - if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' - run: cd tpcds-kit/tools && make OS=LINUX - - name: Generate TPC-DS (SF=1) table data - if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true' - run: build/sbt "sql/Test/runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite" - - name: Run TPC-DS queries (Sort merge join) - run: | - SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite" - env: - SPARK_ANSI_SQL_MODE: ${{ fromJSON(inputs.envs).SPARK_ANSI_SQL_MODE }} - SPARK_TPCDS_JOIN_CONF: | - spark.sql.autoBroadcastJoinThreshold=-1 - spark.sql.join.preferSortMergeJoin=true - - name: Run TPC-DS queries (Broadcast hash join) - run: | - SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite" - env: - SPARK_ANSI_SQL_MODE: ${{ 
fromJSON(inputs.envs).SPARK_ANSI_SQL_MODE }} - SPARK_TPCDS_JOIN_CONF: | - spark.sql.autoBroadcastJoinThreshold=10485760 - - name: Run TPC-DS queries (Shuffled hash join) - run: | - SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite" - env: - SPARK_ANSI_SQL_MODE: ${{ fromJSON(inputs.envs).SPARK_ANSI_SQL_MODE }} - SPARK_TPCDS_JOIN_CONF: | - spark.sql.autoBroadcastJoinThreshold=-1 - spark.sql.join.forceApplyShuffledHashJoin=true - - name: Upload test results to report - if: always() - uses: actions/upload-artifact@v4 - with: - name: test-results-tpcds--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 - path: "**/target/test-reports/*.xml" - - name: Upload unit tests log files - if: ${{ !success() }} - uses: actions/upload-artifact@v4 - with: - name: unit-tests-log-tpcds--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 - path: "**/target/unit-tests.log" - - docker-integration-tests: - needs: precondition - if: fromJson(needs.precondition.outputs.required).docker-integration-tests == 'true' - name: Run Docker integration tests - runs-on: ubuntu-22.04 - timeout-minutes: 300 - env: - HADOOP_PROFILE: ${{ inputs.hadoop }} - HIVE_PROFILE: hive2.3 - GITHUB_PREV_SHA: ${{ github.event.before }} - SPARK_LOCAL_IP: localhost - ORACLE_DOCKER_IMAGE_NAME: gvenzl/oracle-free:23.3 - SKIP_UNIDOC: true - SKIP_MIMA: true - SKIP_PACKAGING: true - steps: - - name: Checkout Spark repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - repository: apache/spark - ref: ${{ inputs.branch }} - - name: Sync the current branch with the latest in Apache Spark - if: github.repository != 'apache/spark' - run: | - echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV - git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/} - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD - git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty - - name: Cache Scala, SBT and Maven - uses: actions/cache@v3 - with: - path: | - build/apache-maven-* - build/scala-* - build/*.jar - ~/.sbt - key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }} - restore-keys: | - build- - - name: Cache Coursier local repository - uses: actions/cache@v3 - with: - path: ~/.cache/coursier - key: docker-integration-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} - restore-keys: | - docker-integration-coursier- - - name: Install Java ${{ inputs.java }} - uses: actions/setup-java@v4 - with: - distribution: zulu - java-version: ${{ inputs.java }} - - name: Run tests - env: ${{ fromJSON(inputs.envs) }} - run: | - ./dev/run-tests --parallelism 1 --modules docker-integration-tests --included-tags org.apache.spark.tags.DockerTest - - name: Upload test results to report - if: always() - uses: actions/upload-artifact@v4 - with: - name: test-results-docker-integration--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 - path: "**/target/test-reports/*.xml" - - name: Upload unit tests log files - if: ${{ !success() }} - uses: actions/upload-artifact@v4 - with: - name: unit-tests-log-docker-integration--${{ inputs.java }}-${{ inputs.hadoop }}-hive2.3 - path: "**/target/unit-tests.log" - k8s-integration-tests: needs: precondition if: fromJson(needs.precondition.outputs.required).k8s-integration-tests == 'true' @@ -1081,22 +191,3 @@ jobs: 
with: name: spark-on-kubernetes-it-log path: "**/target/integration-tests.log" - - ui: - needs: [precondition] - if: fromJson(needs.precondition.outputs.required).ui == 'true' - name: Run Spark UI tests - runs-on: ubuntu-22.04 - timeout-minutes: 300 - steps: - - uses: actions/checkout@v4 - - name: Use Node.js - uses: actions/setup-node@v4 - with: - node-version: 20 - cache: 'npm' - cache-dependency-path: ui-test/package-lock.json - - run: | - cd ui-test - npm install --save-dev - node --experimental-vm-modules node_modules/.bin/jest diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 093047ea72f5..0e61e38ff2b0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -18,19 +18,14 @@ package org.apache.spark.api.python import java.io.File -import java.nio.file.Paths import java.util.{List => JList} -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ -import scala.sys.process.Process import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.internal.Logging -import org.apache.spark.util.ArrayImplicits.SparkArrayOps -import org.apache.spark.util.Utils private[spark] object PythonUtils extends Logging { val PY4J_ZIP_NAME = "py4j-0.10.9.7-src.zip" @@ -118,10 +113,11 @@ private[spark] object PythonUtils extends Logging { } val pythonVersionCMD = Seq(pythonExec, "-VV") + val PYTHONPATH = "PYTHONPATH" val pythonPath = PythonUtils.mergePythonPaths( PythonUtils.sparkPythonPath, - sys.env.getOrElse("PYTHONPATH", "")) - val environment = Map("PYTHONPATH" -> pythonPath) + sys.env.getOrElse(PYTHONPATH, "")) + val environment = Map(PYTHONPATH -> pythonPath) logInfo(s"Python path $pythonPath") val processPythonVer = Process(pythonVersionCMD, None, environment.toSeq: _*) @@ -149,48 +145,4 @@ private[spark] object PythonUtils extends Logging { listOfPackages.foreach(x => logInfo(s"List of Python packages :- ${formatOutput(x)}")) } } - - // Only for testing. - private[spark] var additionalTestingPath: Option[String] = None - - private[spark] def createPythonFunction(command: Array[Byte]): SimplePythonFunction = { - val pythonExec: String = sys.env.getOrElse( - "PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3")) - - val sourcePython = if (Utils.isTesting) { - // Put PySpark source code instead of the build zip archive so we don't need - // to build PySpark every time during development. - val sparkHome: String = { - require( - sys.props.contains("spark.test.home") || sys.env.contains("SPARK_HOME"), - "spark.test.home or SPARK_HOME is not set.") - sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME")) - } - val sourcePath = Paths.get(sparkHome, "python").toAbsolutePath - val py4jPath = Paths.get( - sparkHome, "python", "lib", PythonUtils.PY4J_ZIP_NAME).toAbsolutePath - val merged = mergePythonPaths(sourcePath.toString, py4jPath.toString) - // Adds a additional path to search Python packages for testing. 
- additionalTestingPath.map(mergePythonPaths(_, merged)).getOrElse(merged) - } else { - PythonUtils.sparkPythonPath - } - val pythonPath = PythonUtils.mergePythonPaths( - sourcePython, sys.env.getOrElse("PYTHONPATH", "")) - - val pythonVer: String = - Process( - Seq(pythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"), - None, - "PYTHONPATH" -> pythonPath).!!.trim() - - SimplePythonFunction( - command = command.toImmutableArraySeq, - envVars = mutable.Map("PYTHONPATH" -> pythonPath).asJava, - pythonIncludes = List.empty.asJava, - pythonExec = pythonExec, - pythonVer = pythonVer, - broadcastVars = List.empty.asJava, - accumulator = null) - } } diff --git a/python/pyspark/sql/worker/lookup_data_sources.py b/python/pyspark/sql/worker/lookup_data_sources.py deleted file mode 100644 index 91963658ee61..000000000000 --- a/python/pyspark/sql/worker/lookup_data_sources.py +++ /dev/null @@ -1,99 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from importlib import import_module -from pkgutil import iter_modules -import os -import sys -from typing import IO - -from pyspark.accumulators import _accumulatorRegistry -from pyspark.java_gateway import local_connect_and_auth -from pyspark.serializers import ( - read_int, - write_int, - write_with_length, - SpecialLengths, -) -from pyspark.sql.datasource import DataSource -from pyspark.util import handle_worker_exception -from pyspark.worker_util import ( - check_python_version, - pickleSer, - send_accumulator_updates, - setup_broadcasts, - setup_memory_limits, - setup_spark_files, -) - - -def main(infile: IO, outfile: IO) -> None: - """ - Main method for looking up the available Python Data Sources in Python path. - - This process is invoked from the `UserDefinedPythonDataSourceLookupRunner.runInPython` - method in `UserDefinedPythonDataSource.lookupAllDataSourcesInPython` when the first - call related to Python Data Source happens via `DataSourceManager`. - - This is responsible for searching the available Python Data Sources so they can be - statically registered automatically. - """ - try: - check_python_version(infile) - - memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) - setup_memory_limits(memory_limit_mb) - - setup_spark_files(infile) - setup_broadcasts(infile) - - _accumulatorRegistry.clear() - - infos = {} - for info in iter_modules(): - if info.name.startswith("pyspark_"): - mod = import_module(info.name) - if hasattr(mod, "DefaultSource") and issubclass(mod.DefaultSource, DataSource): - infos[mod.DefaultSource.name()] = mod.DefaultSource - - # Writes name -> pickled data source to JVM side to be registered - # as a Data Source. 
- write_int(len(infos), outfile) - for name, dataSource in infos.items(): - write_with_length(name.encode("utf-8"), outfile) - pickleSer._write_with_length(dataSource, outfile) - - except BaseException as e: - handle_worker_exception(e, outfile) - sys.exit(-1) - - send_accumulator_updates(outfile) - - # check end of stream - if read_int(infile) == SpecialLengths.END_OF_STREAM: - write_int(SpecialLengths.END_OF_STREAM, outfile) - else: - # write a different value to tell JVM to not reuse this worker - write_int(SpecialLengths.END_OF_DATA_SECTION, outfile) - sys.exit(-1) - - -if __name__ == "__main__": - # Read information about how to connect back to the JVM from the environment. - java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"]) - auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"] - (sock_file, _) = local_connect_and_auth(java_port, auth_secret) - main(sock_file, sock_file) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala index c207645ce526..e6c4749df60a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala @@ -20,9 +20,6 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale import java.util.concurrent.ConcurrentHashMap -import scala.jdk.CollectionConverters._ - -import org.apache.spark.api.python.PythonUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.python.UserDefinedPythonDataSource @@ -33,13 +30,9 @@ import org.apache.spark.sql.execution.python.UserDefinedPythonDataSource * their short names or fully qualified names. */ class DataSourceManager extends Logging { - // Lazy to avoid being invoked during Session initialization. - // Otherwise, it goes infinite loop, session -> Python runner -> SQLConf -> session. - private lazy val dataSourceBuilders = { - val builders = new ConcurrentHashMap[String, UserDefinedPythonDataSource]() - builders.putAll(DataSourceManager.initialDataSourceBuilders.asJava) - builders - } + // TODO(SPARK-45917): Statically load Python Data Source so idempotently Python + // Data Sources can be loaded even when the Driver is restarted. 
+ private val dataSourceBuilders = new ConcurrentHashMap[String, UserDefinedPythonDataSource]() private def normalize(name: String): String = name.toLowerCase(Locale.ROOT) @@ -80,20 +73,3 @@ class DataSourceManager extends Logging { manager } } - - -object DataSourceManager { - // Visiable for testing - private[spark] var dataSourceBuilders: Option[Map[String, UserDefinedPythonDataSource]] = None - private def initialDataSourceBuilders = this.synchronized { - if (dataSourceBuilders.isEmpty) { - val result = UserDefinedPythonDataSource.lookupAllDataSourcesInPython() - val builders = result.names.zip(result.dataSources).map { case (name, dataSource) => - name -> - UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource)) - }.toMap - dataSourceBuilders = Some(builders) - } - dataSourceBuilders.get - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala index 5f66210ad9c6..778f55595aee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala @@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._ import net.razorvine.pickle.Pickler import org.apache.spark.{JobArtifactSet, SparkException} -import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonUtils, PythonWorkerUtils, SimplePythonFunction, SpecialLengths} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonWorkerUtils, SimplePythonFunction, SpecialLengths} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.PythonUDF @@ -404,59 +404,6 @@ object UserDefinedPythonDataSource { * The schema of the output to the Python data source write function. */ val writeOutputSchema: StructType = new StructType().add("message", BinaryType) - - /** - * (Driver-side) Look up all available Python Data Sources. - */ - def lookupAllDataSourcesInPython(): PythonLookupAllDataSourcesResult = { - new UserDefinedPythonDataSourceLookupRunner( - PythonUtils.createPythonFunction(Array.empty[Byte])).runInPython() - } -} - -/** - * All Data Sources in Python - */ -case class PythonLookupAllDataSourcesResult( - names: Array[String], dataSources: Array[Array[Byte]]) - -/** - * A runner used to look up Python Data Sources available in Python path. - */ -class UserDefinedPythonDataSourceLookupRunner(lookupSources: PythonFunction) - extends PythonPlannerRunner[PythonLookupAllDataSourcesResult](lookupSources) { - - override val workerModule = "pyspark.sql.worker.lookup_data_sources" - - override protected def writeToPython(dataOut: DataOutputStream, pickler: Pickler): Unit = { - // No input needed. - } - - override protected def receiveFromPython( - dataIn: DataInputStream): PythonLookupAllDataSourcesResult = { - // Receive the pickled data source or an exception raised in Python worker. 
- val length = dataIn.readInt() - if (length == SpecialLengths.PYTHON_EXCEPTION_THROWN) { - val msg = PythonWorkerUtils.readUTF(dataIn) - throw QueryCompilationErrors.failToPlanDataSourceError( - action = "lookup", tpe = "instance", msg = msg) - } - - val shortNames = ArrayBuffer.empty[String] - val pickledDataSources = ArrayBuffer.empty[Array[Byte]] - val numDataSources = length - - for (_ <- 0 until numDataSources) { - val shortName = PythonWorkerUtils.readUTF(dataIn) - val pickledDataSource: Array[Byte] = PythonWorkerUtils.readBytes(dataIn) - shortNames.append(shortName) - pickledDataSources.append(pickledDataSource) - } - - PythonLookupAllDataSourcesResult( - names = shortNames.toArray, - dataSources = pickledDataSources.toArray) - } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala index 45ee472ee638..c261f1d529fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala @@ -17,16 +17,11 @@ package org.apache.spark.sql.execution.python -import java.io.{File, FileWriter} - import org.apache.spark.SparkException -import org.apache.spark.api.python.PythonUtils import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils, QueryTest, Row} -import org.apache.spark.sql.execution.datasources.DataSourceManager import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType -import org.apache.spark.util.Utils class PythonDataSourceSuite extends QueryTest with SharedSparkSession { import IntegratedUDFTestUtils._ @@ -34,7 +29,7 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { setupTestData() private def dataSourceName = "SimpleDataSource" - private val simpleDataSourceReaderScript: String = + private def simpleDataSourceReaderScript: String = """ |from pyspark.sql.datasource import DataSourceReader, InputPartition |class SimpleDataSourceReader(DataSourceReader): @@ -45,56 +40,6 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { | yield (1, partition.value) | yield (2, partition.value) |""".stripMargin - private val staticSourceName = "custom_source" - private var tempDir: File = _ - - override def beforeAll(): Unit = { - // Create a Python Data Source package before starting up the Spark Session - // that triggers automatic registration of the Python Data Source. - val dataSourceScript = - s""" - |from pyspark.sql.datasource import DataSource, DataSourceReader - |$simpleDataSourceReaderScript - | - |class DefaultSource(DataSource): - | def schema(self) -> str: - | return "id INT, partition INT" - | - | def reader(self, schema): - | return SimpleDataSourceReader() - | - | @classmethod - | def name(cls): - | return "$staticSourceName" - |""".stripMargin - tempDir = Utils.createTempDir() - // Write a temporary package to test. - // tmp/my_source - // tmp/my_source/__init__.py - val packageDir = new File(tempDir, "pyspark_mysource") - assert(packageDir.mkdir()) - Utils.tryWithResource( - new FileWriter(new File(packageDir, "__init__.py")))(_.write(dataSourceScript)) - // So Spark Session initialization can lookup this temporary directory. 
- DataSourceManager.dataSourceBuilders = None - PythonUtils.additionalTestingPath = Some(tempDir.toString) - super.beforeAll() - } - - override def afterAll(): Unit = { - try { - Utils.deleteRecursively(tempDir) - PythonUtils.additionalTestingPath = None - } finally { - super.afterAll() - } - } - - test("SPARK-45917: automatic registration of Python Data Source") { - assume(shouldTestPandasUDFs) - val df = spark.read.format(staticSourceName).load() - checkAnswer(df, Seq(Row(0, 0), Row(0, 1), Row(1, 0), Row(1, 1), Row(2, 0), Row(2, 1))) - } test("simple data source") { assume(shouldTestPandasUDFs)
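
The deleted `pyspark/sql/worker/lookup_data_sources.py` worker is the heart of the reverted SPARK-45917 feature: it walks the Python path, imports every package whose name starts with `pyspark_`, and collects any `DefaultSource` class that subclasses `DataSource`. A minimal sketch of just that discovery loop, with the JVM socket protocol and pickling stripped out (the helper name is illustrative, not part of the removed module):

```python
from importlib import import_module
from pkgutil import iter_modules

from pyspark.sql.datasource import DataSource


def discover_data_sources() -> dict:
    """Map short name -> DataSource subclass for every pyspark_* package
    on the Python path that exposes a DefaultSource class."""
    sources = {}
    for info in iter_modules():
        # Only packages following the pyspark_* naming convention are scanned.
        if info.name.startswith("pyspark_"):
            mod = import_module(info.name)
            default = getattr(mod, "DefaultSource", None)
            if isinstance(default, type) and issubclass(default, DataSource):
                # DataSource.name() is the short name later passed to
                # spark.read.format(...).
                sources[default.name()] = default
    return sources
```

In the removed worker this mapping was pickled and written back to the JVM, where `DataSourceManager` turned each entry into a `UserDefinedPythonDataSource`; after this revert, `dataSourceBuilders` starts out empty and is only populated through explicit registration.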
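
For reference, the removed `PythonDataSourceSuite` setup exercised that lookup by writing a one-file package (`pyspark_mysource/__init__.py`) onto a temporary path before the session started. A stand-alone equivalent of that package is sketched below; the names mirror the deleted test, and the reader body is reconstructed to be consistent with the rows the deleted test asserted (two partitions, ids 0-2), so treat it as an approximation rather than the exact fixture:

```python
# pyspark_mysource/__init__.py
#
# A package the removed lookup would have picked up automatically, because its
# name starts with "pyspark_" and it exposes a DefaultSource subclass.
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition


class SimpleDataSourceReader(DataSourceReader):
    def partitions(self):
        # Two partitions, matching the partition values 0 and 1 in the
        # deleted test's expected rows.
        return [InputPartition(i) for i in range(2)]

    def read(self, partition):
        yield (0, partition.value)
        yield (1, partition.value)
        yield (2, partition.value)


class DefaultSource(DataSource):
    def schema(self) -> str:
        return "id INT, partition INT"

    def reader(self, schema):
        return SimpleDataSourceReader()

    @classmethod
    def name(cls):
        # Short name the deleted test read back with
        # spark.read.format("custom_source").load().
        return "custom_source"
```

With automatic lookup reverted, a package like this is no longer registered at session start; the data source has to be registered explicitly with the session before `spark.read.format("custom_source")` can resolve it.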