Skip to content

Commit

Permalink
Generate external transform wrappers using a script (#29834)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 authored Feb 22, 2024
1 parent 450e7a7 commit 11f9bce
Show file tree
Hide file tree
Showing 33 changed files with 1,347 additions and 282 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Empty file.
Empty file.
1 change: 1 addition & 0 deletions .github/workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour
| [ PreCommit Website ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml) | N/A |`Run Website PreCommit`| [![.github/workflows/beam_PreCommit_Website.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml?query=event%3Aschedule) |
| [ PreCommit Website Stage GCS ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml) | N/A |`Run Website_Stage_GCS PreCommit`| [![.github/workflows/beam_PreCommit_Website_Stage_GCS.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml?query=event%3Aschedule) |
| [ PreCommit Whitespace ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml) | N/A |`Run Whitespace PreCommit`| [![.github/workflows/beam_PreCommit_Whitespace.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml?query=event%3Aschedule) |
| [ PreCommit Xlang Generated Transforms ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml) | N/A |`Run Xlang_Generated_Transforms PreCommit`| [![.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml?query=event%3Aschedule) |

### PostCommit Jobs

Expand Down
114 changes: 114 additions & 0 deletions .github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: PreCommit Xlang Generated Transforms

on:
push:
tags: ['v*']
branches: ['master', 'release-*']
paths:
- 'model/**'
- 'sdks/python/**'
- 'sdks/java/expansion-service/**'
- 'sdks/java/core/**'
- 'sdks/java/io/**'
- 'sdks/java/extensions/sql/**'
- 'release/**'
- '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml'
pull_request_target:
branches: ['master', 'release-*']
paths:
- 'model/**'
- 'sdks/python/**'
- 'sdks/java/expansion-service/**'
- 'sdks/java/core/**'
- 'sdks/java/io/**'
- 'sdks/java/extensions/sql/**'
- 'release/**'
- 'release/trigger_all_tests.json'
- '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml'
issue_comment:
types: [created]
schedule:
- cron: '30 2/6 * * *'
workflow_dispatch:

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
pull-requests: read
checks: read
contents: read
deployments: read
id-token: none
issues: read
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
cancel-in-progress: true

env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
beam_PreCommit_Xlang_Generated_Transforms:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
timeout-minutes: 120
runs-on: ['self-hosted', ubuntu-20.04, main]
strategy:
fail-fast: false
matrix:
job_name: ['beam_PreCommit_Xlang_Generated_Transforms']
job_phrase: ['Run Xlang_Generated_Transforms PreCommit']
python_version: ['3.8']
if: |
github.event_name == 'push' ||
github.event_name == 'workflow_dispatch' ||
github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
startsWith(github.event.comment.body, 'Run Xlang_Generated_Transforms PreCommit')
steps:
- uses: actions/checkout@v4
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
java-version: 8
python-version: ${{ matrix.python_version }}
- name: Set PY_VER_CLEAN
id: set_py_ver_clean
run: |
PY_VER=${{ matrix.python_version }}
PY_VER_CLEAN=${PY_VER//.}
echo "py_ver_clean=$PY_VER_CLEAN" >> $GITHUB_OUTPUT
- name: run Cross-Language Wrapper Validation script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ sdks/python/**/*.egg
sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
sdks/python/apache_beam/transforms/xlang/*
sdks/python/apache_beam/portability/api/*
sdks/python/apache_beam/yaml/docs/*
sdks/python/nosetests*.xml
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* The Python SDK will now include automatically generated wrappers for external Java transforms! ([#29834](https://github.com/apache/beam/pull/29834))

## I/Os

Expand Down
5 changes: 5 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,11 @@ tasks.register("checkSetup") {
dependsOn(":examples:java:wordCount")
}

// Generates external transform config
project.tasks.register("generateExternalTransformsConfig") {
dependsOn(":sdks:python:generateExternalTransformsConfig")
}

// Configure the release plugin to do only local work; the release manager determines what, if
// anything, to push. On failure, the release manager can reset the branch without pushing.
release {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,19 +321,17 @@ class BeamModulePlugin implements Plugin<Project> {

// A class defining the common properties in a given suite of cross-language tests
// Properties are shared across runners and are used when creating a CrossLanguageUsingJavaExpansionConfiguration object
static class CrossLanguageTaskCommon {
static class CrossLanguageTask {
// Used as the task name for cross-language
String name
// The expansion service's project path (required)
String expansionProjectPath
// List of project paths for required expansion services
List<String> expansionProjectPaths
// Collect Python pipeline tests with this marker
String collectMarker
// Job server startup task.
TaskProvider startJobServer
// Job server cleanup task.
TaskProvider cleanupJobServer
// any additional environment variables specific to the suite of tests
// Additional environment variables to set before running tests
Map<String,String> additionalEnvs
// Additional Python dependencies to install before running tests
List<String> additionalDeps
}

// A class defining the configuration for CrossLanguageUsingJavaExpansion.
Expand All @@ -349,18 +347,16 @@ class BeamModulePlugin implements Plugin<Project> {
]
// Additional pytest options
List<String> pytestOptions = []
// Job server startup task.
TaskProvider startJobServer
// Job server cleanup task.
TaskProvider cleanupJobServer
// Number of parallel test runs.
Integer numParallelTests = 1
// Project path for the expansion service to start up
String expansionProjectPath
// List of project paths for required expansion services
List<String> expansionProjectPaths
// Collect Python pipeline tests with this marker
String collectMarker
// any additional environment variables to be exported
Map<String,String> additionalEnvs
// Additional Python dependencies to install before running tests
List<String> additionalDeps
}

// A class defining the configuration for CrossLanguageValidatesRunner.
Expand Down Expand Up @@ -2576,7 +2572,7 @@ class BeamModulePlugin implements Plugin<Project> {
/** ***********************************************************************************************/
// Method to create the createCrossLanguageUsingJavaExpansionTask.
// The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
// This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
// This method creates a task that runs Python SDK test-suites that use external Java transforms
project.ext.createCrossLanguageUsingJavaExpansionTask = {
// This task won't work if the python build file doesn't exist.
if (!project.project(":sdks:python").buildFile.exists()) {
Expand All @@ -2586,49 +2582,29 @@ class BeamModulePlugin implements Plugin<Project> {
def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()

project.evaluationDependsOn(":sdks:python")
project.evaluationDependsOn(config.expansionProjectPath)
for (path in config.expansionProjectPaths) {
project.evaluationDependsOn(path)
}
project.evaluationDependsOn(":sdks:java:extensions:python")

// Setting up args to launch the expansion service
def pythonDir = project.project(":sdks:python").projectDir
def javaExpansionPort = -1 // will be populated in setupTask
def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath
def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
def expansionServiceOpts = [
"group_id": project.name,
"java_expansion_service_jar": expansionJar,
"java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
]
def usesDataflowRunner = config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") || config.pythonPipelineOptions.contains("--runner=DataflowRunner")
def javaContainerSuffix = getSupportedJavaVersion()

// 1. Builds the chosen expansion service jar and launches it
def setupTask = project.tasks.register(config.name+"Setup") {
dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker'
dependsOn project.project(config.expansionProjectPath).shadowJar.getPath()
dependsOn 'installGcpTest'
// Sets up, collects, and runs Python pipeline tests
project.tasks.register(config.name+"PythonUsingJava") {
group = "Verification"
description = "Runs Python SDK pipeline tests that use a Java expansion service"
// Each expansion service we use needs to be built before running these tests
// The built jars will be started up automatically using the BeamJarExpansionService utility
for (path in config.expansionProjectPaths) {
dependsOn project.project(path).shadowJar.getPath()
}
dependsOn ":sdks:java:container:$javaContainerSuffix:docker"
dependsOn "installGcpTest"
if (usesDataflowRunner) {
dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob"
}
doLast {
project.exec {
// Prepare a port to use for the expansion service
javaExpansionPort = getRandomPort()
expansionServiceOpts.put("java_port", javaExpansionPort)
// setup test env
def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
executable 'sh'
args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
}
}
}

// 2. Sets up, collects, and runs Python pipeline tests
def pythonTask = project.tasks.register(config.name+"PythonUsingJava") {
group = "Verification"
description = "Runs Python SDK pipeline tests that use a Java expansion service"
dependsOn setupTask
dependsOn config.startJobServer
doLast {
def beamPythonTestPipelineOptions = [
"pipeline_opts": config.pythonPipelineOptions + (usesDataflowRunner ? [
Expand All @@ -2641,29 +2617,19 @@ class BeamModulePlugin implements Plugin<Project> {
def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions)

project.exec {
environment "EXPANSION_JAR", expansionJar
environment "EXPANSION_PORT", javaExpansionPort
for (envs in config.additionalEnvs){
environment envs.getKey(), envs.getValue()
// environment variable to indicate that jars have been built
environment "EXPANSION_JARS", config.expansionProjectPaths
String additionalDependencyCmd = ""
if (config.additionalDeps != null && !config.additionalDeps.isEmpty()){
additionalDependencyCmd = "&& pip install ${config.additionalDeps.join(' ')} "
}
executable 'sh'
args '-c', ". ${project.ext.envdir}/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
args '-c', ". ${project.ext.envdir}/bin/activate " +
additionalDependencyCmd +
"&& cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
}
}
}

// 3. Shuts down the expansion service
def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) {
// teardown test env
executable 'sh'
args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}"
}

setupTask.configure {finalizedBy cleanupTask}
config.startJobServer.configure {finalizedBy config.cleanupJobServer}

cleanupTask.configure{mustRunAfter pythonTask}
config.cleanupJobServer.configure{mustRunAfter pythonTask}
}

/** ***********************************************************************************************/
Expand Down
2 changes: 2 additions & 0 deletions scripts/ci/release/test/resources/mass_comment.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Run PythonDocs PreCommit
Run PythonFormatter PreCommit
Run PythonLint PreCommit
Run Python_PVR_Flink PreCommit
Run Python_Xlang_Gcp_Direct PostCommit
Run RAT PreCommit
Run SQL PostCommit
Run SQL PreCommit
Expand All @@ -94,6 +95,7 @@ Run Twister2 ValidatesRunner
Run Typescript PreCommit
Run ULR Loopback ValidatesRunner
Run Whitespace PreCommit
Run Xlang_Generated_Transforms PreCommit
Run XVR_Direct PostCommit
Run XVR_Flink PostCommit
Run XVR_JavaUsingPython_Dataflow PostCommit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public List<String> outputCollectionNames() {
public String description() {
return String.format(
"Outputs a PCollection of Beam Rows, each containing a single INT64 "
+ "number called \"value\". The count is produced from the given \"start\""
+ "number called \"value\". The count is produced from the given \"start\" "
+ "value and either up to the given \"end\" or until 2^63 - 1.%n"
+ "To produce an unbounded PCollection, simply do not specify an \"end\" value. "
+ "Unbounded sequences can specify a \"rate\" for output elements.%n"
Expand Down
1 change: 1 addition & 0 deletions sdks/python/MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

include gen_protos.py
include gen_xlang_wrappers.py
include README.md
include NOTICE
include LICENSE
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.io.gcp.bigquery import *
from apache_beam.io.gcp.pubsub import *
from apache_beam.io.gcp import gcsio
from apache_beam.transforms.xlang.io import *
except ImportError:
pass
# pylint: enable=wrong-import-order, wrong-import-position
Loading

0 comments on commit 11f9bce

Please sign in to comment.