Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .github/workflows/prestocpp-linux-build-and-unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,84 @@ jobs:
-Duser.timezone=America/Bahia_Banderas \
-T1C

prestocpp-linux-presto-on-spark-e2e-tests:
  needs: prestocpp-linux-build-for-test
  runs-on: ubuntu-22.04
  strategy:
    fail-fast: false
    matrix:
      storage-format: ["PARQUET", "DWRF"]
      enable-sidecar: ["true", "false"]
  container:
    image: prestodb/presto-native-dependency:0.293-20250522140509-484b00e
  env:
    MAVEN_OPTS: "-Xmx4G -XX:+ExitOnOutOfMemoryError"
    MAVEN_FAST_INSTALL: "-B -V --quiet -T 1C -DskipTests -Dair.check.skip-all -Dmaven.javadoc.skip=true"
    MAVEN_TEST: "-B -Dair.check.skip-all -Dmaven.javadoc.skip=true -DLogTestDurationListener.enabled=true --fail-at-end"
  steps:
    - uses: actions/checkout@v4

    - name: Fix git permissions
      # Usually actions/checkout does this but as we run in a container
      # it doesn't work
      run: git config --global --add safe.directory ${GITHUB_WORKSPACE}

    - name: Download artifacts
      uses: actions/download-artifact@v4
      with:
        name: presto-native-build
        path: presto-native-execution/_build/release

    # Permissions are lost when uploading. Details here: https://github.com/actions/upload-artifact/issues/38
    - name: Restore execute permissions and library path
      run: |
        chmod +x ${GITHUB_WORKSPACE}/presto-native-execution/_build/release/presto_cpp/main/presto_server
        chmod +x ${GITHUB_WORKSPACE}/presto-native-execution/_build/release/velox/velox/functions/remote/server/velox_functions_remote_server_main
        # Ensure transitive dependency libboost-iostreams is found.
        ldconfig /usr/local/lib

    - name: Install OpenJDK8
      uses: actions/setup-java@v4
      with:
        distribution: 'temurin'
        java-version: '8.0.442'
        cache: 'maven'

    - name: Download nodejs to maven cache
      run: .github/bin/download_nodejs

    - name: Maven install
      env:
        # Use different Maven options to install.
        MAVEN_OPTS: "-Xmx2G -XX:+ExitOnOutOfMemoryError"
      run: |
        for i in $(seq 1 3); do ./mvnw clean install $MAVEN_FAST_INSTALL -pl 'presto-native-tests' -am && s=0 && break || s=$? && sleep 10; done; (exit $s)

    - name: Run presto-on-spark native tests
      run: |
        export PRESTO_SERVER_PATH="${GITHUB_WORKSPACE}/presto-native-execution/_build/release/presto_cpp/main/presto_server"
        TESTFILES=$(find ./presto-native-execution/src/test -type f -name 'TestPrestoSpark*.java')
        # Build the comma-separated class list purely from the discovered test
        # files; seeding it with a hard-coded list would duplicate every class
        # that discovery also finds.
        TESTCLASSES=
        for test_file in $TESTFILES
        do
          tmp=${test_file##*/}
          test_class=${tmp%%\.*}
          TESTCLASSES="${TESTCLASSES},${test_class}"
        done
        # Strip the leading comma left by the first append.
        export TESTCLASSES=${TESTCLASSES#,}
        echo "TESTCLASSES = $TESTCLASSES"

        mvn test \
          ${MAVEN_TEST} \
          -pl 'presto-native-execution' \
          -DstorageFormat=${{ matrix.storage-format }} \
          -DsidecarEnabled=${{ matrix.enable-sidecar }} \
          -Dtest="${TESTCLASSES}" \
          -DPRESTO_SERVER=${PRESTO_SERVER_PATH} \
          -DDATA_DIR=${RUNNER_TEMP} \
          -Duser.timezone=America/Bahia_Banderas \
          -T1C

prestocpp-linux-presto-sidecar-tests:
needs: prestocpp-linux-build-for-test
runs-on: ubuntu-22.04
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2440,9 +2440,9 @@
</dependency>

<dependency>
<groupId>com.facebook.presto.spark</groupId>
<artifactId>spark-core</artifactId>
<version>2.0.2-6</version>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.4.0</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,26 @@ public SystemSessionProperties(
"Enable execution on native engine",
featuresConfig.isNativeExecutionEnabled(),
true),
stringProperty(
NATIVE_EXECUTION_EXECUTABLE_PATH,
"The native engine executable file path for native engine execution",
featuresConfig.getNativeExecutionExecutablePath(),
false),
stringProperty(
NATIVE_EXECUTION_PROGRAM_ARGUMENTS,
"Program arguments for native engine execution. The main target use case for this " +
"property is to control logging levels using glog flags. E,g, to enable verbose mode, add " +
"'--v 1'. More advanced glog gflags usage can be found at " +
"https://rpg.ifi.uzh.ch/docs/glog.html\n" +
"e.g. --vmodule=mapreduce=2,file=1,gfs*=3 --v=0\n" +
"will:\n" +
"\n" +
"a. Print VLOG(2) and lower messages from mapreduce.{h,cc}\n" +
"b. Print VLOG(1) and lower messages from file.{h,cc}\n" +
"c. Print VLOG(3) and lower messages from files prefixed with \"gfs\"\n" +
"d. Print VLOG(0) and lower messages from elsewhere",
featuresConfig.getNativeExecutionProgramArguments(),
false),
booleanProperty(
NATIVE_EXECUTION_PROCESS_REUSE_ENABLED,
"Enable reuse the native process within the same JVM",
Expand Down
9 changes: 7 additions & 2 deletions presto-native-execution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down Expand Up @@ -206,8 +211,8 @@
</dependency>

<dependency>
<groupId>com.facebook.presto.spark</groupId>
<artifactId>spark-core</artifactId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark;

import com.facebook.airlift.log.Logging;
import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils;
import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionModule;
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
import com.facebook.presto.spi.security.PrincipalType;
import com.facebook.presto.testing.QueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Module;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

import static com.facebook.airlift.log.Level.WARN;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerHiveProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerSystemProperties;
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.getNativeQueryRunnerParameters;
import static com.facebook.presto.spark.PrestoSparkQueryRunner.METASTORE_CONTEXT;
import static java.lang.String.format;
import static java.nio.file.Files.createTempDirectory;

/**
* Following JVM argument is needed to run Spark native tests.
* <p>
* - PRESTO_SERVER
* - This tells Spark where to find the Presto native binary to launch the process.
* Example: -DPRESTO_SERVER=/path/to/native/process/bin
* <p>
* - DATA_DIR
 * - Optional path to store TPC-H tables used in the test. If this directory is empty, it will be
 * populated. If tables already exist, they will be reused.
* <p>
 * Tests can be run in Interactive Debugging Mode, which allows for an easier debugging
 * experience. Instead of launching its own native process, the test will connect to an existing
 * native process. This gives developers flexibility to connect IDEA and debuggers to the native process.
* Enable this mode by setting NATIVE_PORT JVM argument.
* <p>
* - NATIVE_PORT
* - This is the port your externally launched native process listens to. It is used to tell Spark where to send
* requests. This port number has to be the same as to which your externally launched process listens.
* Example: -DNATIVE_PORT=7777.
 * When NATIVE_PORT is specified, the PRESTO_SERVER argument is not required and is ignored if specified.
* <p>
* For test queries requiring shuffle, the disk-based local shuffle will be used.
*/
public class PrestoSparkNativeQueryRunnerUtils
{
    private static final int AVAILABLE_CPU_COUNT = 4;
    private static final String SPARK_SHUFFLE_MANAGER = "spark.shuffle.manager";
    private static final String FALLBACK_SPARK_SHUFFLE_MANAGER = "spark.fallback.shuffle.manager";
    private static final String DEFAULT_STORAGE_FORMAT = "DWRF";
    // Lazily initialized by getBaseDataPath(); guarded by the class monitor (synchronized).
    private static Optional<Path> dataDirectory = Optional.empty();

    private PrestoSparkNativeQueryRunnerUtils() {}

    /**
     * Builds the configuration properties handed to the native execution process.
     * <p>
     * When the NATIVE_PORT system property is set (interactive debugging mode), the
     * native process is launched externally, so no executable path is configured.
     *
     * @return immutable map of native-execution configuration properties
     * @throws UncheckedIOException if the broadcast temp directory cannot be created
     */
    public static Map<String, String> getNativeExecutionSessionConfigs()
    {
        ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
                // Do not use default Prestissimo config files. Presto-Spark will generate the configs on-the-fly.
                .put("catalog.config-dir", "/")
                .put("task.info-update-interval", "100ms")
                .put("spark.initial-partition-count", "1")
                .put("register-test-functions", "true")
                .put("native-execution-program-arguments", "--logtostderr=1 --minloglevel=3")
                .put("spark.partition-count-auto-tune-enabled", "false");

        // Only launch our own native process when not attaching to an external one.
        if (System.getProperty("NATIVE_PORT") == null) {
            builder.put("native-execution-executable-path", getNativeQueryRunnerParameters().serverBinary.toString());
        }

        try {
            builder.put("native-execution-broadcast-base-path",
                    Files.createTempDirectory("native_broadcast").toAbsolutePath().toString());
        }
        catch (IOException e) {
            throw new UncheckedIOException("Error creating temporary directory for broadcast", e);
        }

        return builder.build();
    }

    /**
     * Creates a Presto-on-Spark query runner backed by the hive catalog and registers
     * the JSON function namespace manager used by the tests.
     *
     * @return a fully initialized hive-backed {@link PrestoSparkQueryRunner}
     */
    public static PrestoSparkQueryRunner createHiveRunner()
    {
        PrestoSparkQueryRunner queryRunner = createRunner("hive", new NativeExecutionModule());
        PrestoNativeQueryRunnerUtils.setupJsonFunctionNamespaceManager(queryRunner, "external_functions.json", "json");

        return queryRunner;
    }

    // Shared construction path for the hive and tpch runners: quiets logging and wires
    // in the standard native-execution and shuffle configuration.
    private static PrestoSparkQueryRunner createRunner(String defaultCatalog, NativeExecutionModule nativeExecutionModule)
    {
        // Increases log level to reduce log spamming while running test.
        customizeLogging();
        return createRunner(
                defaultCatalog,
                Optional.of(getBaseDataPath()),
                getNativeExecutionSessionConfigs(),
                getNativeExecutionShuffleConfigs(),
                ImmutableList.of(nativeExecutionModule));
    }

    // Similar to createPrestoSparkNativeQueryRunner, but with custom connector config and without jsonFunctionNamespaceManager
    public static PrestoSparkQueryRunner createTpchRunner()
    {
        return createRunner(
                "tpchstandard",
                new NativeExecutionModule(
                        Optional.of(new NativeExecutionConnectorConfig().setConnectorName("tpch"))));
    }

    /**
     * Creates a Presto-on-Spark query runner and ensures the "tpch" metastore database exists.
     *
     * @param defaultCatalog catalog the runner's sessions default to
     * @param baseDir optional base directory for table data; the storage-format name is appended
     * @param additionalConfigProperties extra config properties merged over the native worker defaults
     * @param additionalSparkProperties extra Spark properties (e.g. shuffle manager settings)
     * @param nativeModules Guice modules configuring native execution
     * @return a fully initialized {@link PrestoSparkQueryRunner}
     */
    public static PrestoSparkQueryRunner createRunner(String defaultCatalog, Optional<Path> baseDir, Map<String, String> additionalConfigProperties, Map<String, String> additionalSparkProperties, ImmutableList<Module> nativeModules)
    {
        ImmutableMap.Builder<String, String> configBuilder = ImmutableMap.builder();
        configBuilder.putAll(getNativeWorkerSystemProperties()).putAll(additionalConfigProperties);
        // Segregate data by storage format so runs with different formats do not clash.
        Optional<Path> dataDir = baseDir.map(path -> path.resolve(DEFAULT_STORAGE_FORMAT));
        PrestoSparkQueryRunner queryRunner = new PrestoSparkQueryRunner(
                defaultCatalog,
                configBuilder.build(),
                getNativeWorkerHiveProperties(),
                additionalSparkProperties,
                dataDir,
                nativeModules,
                AVAILABLE_CPU_COUNT);

        ExtendedHiveMetastore metastore = queryRunner.getMetastore();
        if (!metastore.getDatabase(METASTORE_CONTEXT, "tpch").isPresent()) {
            metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject("tpch"));
        }
        return queryRunner;
    }

    /**
     * Creates a plain Java (non-native) hive query runner using the default storage format,
     * mainly used to generate expected results for comparison.
     *
     * @return a Java hive {@link QueryRunner}
     * @throws Exception if the underlying runner fails to start
     */
    public static QueryRunner createJavaQueryRunner()
            throws Exception
    {
        return PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder()
                .setAddStorageFormatToPath(true)
                .setStorageFormat(DEFAULT_STORAGE_FORMAT)
                .build();
    }

    /**
     * Raises the log level for the chattiest packages to WARN to reduce test log spam.
     */
    public static void customizeLogging()
    {
        Logging logging = Logging.initialize();
        logging.setLevel("org.apache.spark", WARN);
        logging.setLevel("com.facebook.presto.spark", WARN);
    }

    // Builds the minimal metastore Database object used to register test schemas.
    private static Database createDatabaseMetastoreObject(String name)
    {
        return Database.builder()
                .setDatabaseName(name)
                .setOwnerName("public")
                .setOwnerType(PrincipalType.ROLE)
                .build();
    }

    // Spark properties selecting the native-execution shuffle manager, with the
    // standard sort-based shuffle as fallback.
    private static Map<String, String> getNativeExecutionShuffleConfigs()
    {
        ImmutableMap.Builder<String, String> sparkConfigs = ImmutableMap.builder();
        sparkConfigs.put(SPARK_SHUFFLE_MANAGER, "com.facebook.presto.spark.classloader_interface.PrestoSparkNativeExecutionShuffleManager");
        sparkConfigs.put(FALLBACK_SPARK_SHUFFLE_MANAGER, "org.apache.spark.shuffle.sort.SortShuffleManager");
        return sparkConfigs.build();
    }

    /**
     * Returns the base directory for test table data, creating it on first use.
     * <p>
     * Uses the DATA_DIR property/environment variable when present (via the query runner
     * parameters); otherwise creates a fresh temporary directory. The result is cached,
     * so all runners in a JVM share one directory.
     *
     * @return absolute path of the shared test data directory
     */
    public static synchronized Path getBaseDataPath()
    {
        if (dataDirectory.isPresent()) {
            return dataDirectory.get();
        }

        Optional<String> dataDirectoryStr = getProperty("DATA_DIR");
        if (!dataDirectoryStr.isPresent()) {
            try {
                dataDirectory = Optional.of(createTempDirectory("PrestoTest").toAbsolutePath());
            }
            catch (IOException e) {
                // UncheckedIOException for consistency with getNativeExecutionSessionConfigs;
                // still a RuntimeException, so existing callers are unaffected.
                throw new UncheckedIOException(e);
            }
        }
        else {
            dataDirectory = Optional.of(getNativeQueryRunnerParameters().dataDirectory);
        }
        return dataDirectory.get();
    }

    // This is a temporary replacement for the function from
    // HiveTestUtils.getProperty. But HiveTestUtils instantiation seems to be
    // failing due to missing info in Hdfs setup. Until we fix that, this is a
    // copy of that function. As its a simple utility function, we are ok to punt
    // fixing this
    // TODO: Use HiveTestUtils.getProperty and delete this function
    /**
     * Looks up a setting by name from the Java system properties or the environment.
     * The system property wins; if both are set they must agree.
     *
     * @param name property/environment variable name
     * @return the value, or {@link Optional#empty()} if set in neither place
     * @throws IllegalArgumentException if both are set with different values
     */
    public static Optional<String> getProperty(String name)
    {
        String systemPropertyValue = System.getProperty(name);
        String environmentVariableValue = System.getenv(name);
        if (systemPropertyValue == null) {
            if (environmentVariableValue == null) {
                return Optional.empty();
            }
            else {
                return Optional.of(environmentVariableValue);
            }
        }
        else {
            if (environmentVariableValue != null && !systemPropertyValue.equals(environmentVariableValue)) {
                throw new IllegalArgumentException(format("%s is set in both Java system property and environment variable, but their values are different. The Java system property value is %s, while the" +
                                " environment variable value is %s. Please use only one value.",
                        name,
                        systemPropertyValue,
                        environmentVariableValue));
            }
            return Optional.of(systemPropertyValue);
        }
    }
}
Loading
Loading