diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeQueryRunner.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeQueryRunner.java
index 1f6753eef266..0a11afa7ab51 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeQueryRunner.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeQueryRunner.java
@@ -223,6 +223,34 @@ public static void main(String[] args)
         }
     }
 
+    /**
+     * Launches a query runner with the coordinator HTTP port fixed to 8080, using the
+     * Hive metastore and MinIO storage provided by a {@link SparkDeltaLake} environment.
+     */
+    public static final class DeltaLakeSparkQueryRunnerMain
+    {
+        private DeltaLakeSparkQueryRunnerMain() {}
+
+        public static void main(String[] args)
+                throws Exception
+        {
+            String bucketName = "test-bucket";
+            // Not closed here; the query runner built below is backed by these containers.
+            SparkDeltaLake sparkDeltaLake = new SparkDeltaLake(bucketName);
+
+            QueryRunner queryRunner = builder()
+                    .addCoordinatorProperty("http-server.http.port", "8080")
+                    .addMetastoreProperties(sparkDeltaLake.hiveHadoop())
+                    .addS3Properties(sparkDeltaLake.minio(), bucketName)
+                    .addDeltaProperty("delta.enable-non-concurrent-writes", "true")
+                    .build();
+
+            Logger log = Logger.get(DeltaLakeSparkQueryRunnerMain.class);
+            log.info("======== SERVER STARTED ========");
+            log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+        }
+    }
+
     public static final class S3DeltaLakeQueryRunnerMain
     {
         private S3DeltaLakeQueryRunnerMain() {}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/SparkDeltaLake.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/SparkDeltaLake.java
new file mode 100644
index 000000000000..6ac5e396d839
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/SparkDeltaLake.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import io.trino.plugin.base.util.AutoCloseableCloser;
+import io.trino.plugin.hive.containers.HiveHadoop;
+import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.testing.containers.Minio;
+import org.testcontainers.containers.GenericContainer;
+
+import static io.trino.testing.TestingProperties.getDockerImagesVersion;
+import static org.testcontainers.utility.MountableFile.forClasspathResource;
+
+/**
+ * Test environment made of a {@link HiveMinioDataLake} plus a container running the
+ * {@code spark3-delta} testing image on the same network. {@link #close()} releases both.
+ */
+public final class SparkDeltaLake
+        implements AutoCloseable
+{
+    private final AutoCloseableCloser closer = AutoCloseableCloser.create();
+    private final HiveMinioDataLake hiveMinio;
+
+    /**
+     * Starts the data lake for {@code bucketName}, then the Spark container.
+     * If any step fails, whatever was registered so far is closed before the failure propagates.
+     */
+    public SparkDeltaLake(String bucketName)
+    {
+        try {
+            hiveMinio = closer.register(new HiveMinioDataLake(bucketName));
+            hiveMinio.start();
+
+            closer.register(new GenericContainer<>("ghcr.io/trinodb/testing/spark3-delta:" + getDockerImagesVersion()))
+                    .withCopyFileToContainer(forClasspathResource("spark-defaults.conf"), "/spark/conf/spark-defaults.conf")
+                    .withNetwork(hiveMinio.getNetwork())
+                    .start();
+        }
+        catch (RuntimeException | Error e) {
+            // The caller never receives an instance when the constructor throws, so clean up here.
+            try {
+                closer.close();
+            }
+            catch (Throwable closeFailure) {
+                if (closeFailure != e) {
+                    e.addSuppressed(closeFailure);
+                }
+            }
+            throw e;
+        }
+    }
+
+    /** Returns the Hive Hadoop container of the underlying data lake. */
+    public HiveHadoop hiveHadoop()
+    {
+        return hiveMinio.getHiveHadoop();
+    }
+
+    /** Returns the MinIO container of the underlying data lake. */
+    public Minio minio()
+    {
+        return hiveMinio.getMinio();
+    }
+
+    /** Closes every resource registered with the closer. */
+    @Override
+    public void close()
+            throws Exception
+    {
+        closer.close();
+    }
+}
diff --git a/plugin/trino-delta-lake/src/test/resources/spark-defaults.conf b/plugin/trino-delta-lake/src/test/resources/spark-defaults.conf
new file mode 100644
index 000000000000..b81f050bf587
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/resources/spark-defaults.conf
@@ -0,0 
+1,25 @@
+# Spark settings copied to /spark/conf/spark-defaults.conf in the spark3-delta test container.
+spark.driver.memory=2g
+
+# Hive catalog with the warehouse directory on HDFS.
+spark.sql.catalogImplementation=hive
+spark.sql.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
+spark.sql.hive.thriftServer.singleSession=false
+
+# Delta Lake SQL extension and catalog.
+spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
+spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
+
+# HDFS and Hive metastore endpoints on the hadoop-master host.
+spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000
+spark.hive.metastore.uris=thrift://hadoop-master:9083
+spark.hive.metastore.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
+spark.hive.metastore.schema.verification=false
+
+# Map the s3 and s3n schemes to S3A, pointed at the minio host with path-style access.
+spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3n.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3a.endpoint=http://minio:4566
+spark.hadoop.fs.s3a.path.style.access=true
+spark.hadoop.fs.s3a.access.key=accesskey
+spark.hadoop.fs.s3a.secret.key=secretkey
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java
index 06a9c23bf212..ae7178287135 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveMinioDataLake.java
@@ -45,6 +45,7 @@ public class HiveMinioDataLake
     private final HiveHadoop hiveHadoop;
 
     private final AutoCloseableCloser closer = AutoCloseableCloser.create();
+    private final Network network;
 
     private State state = State.INITIAL;
     private MinioClient minioClient;
@@ -62,7 +63,7 @@ public HiveMinioDataLake(String bucketName, String hiveHadoopImage)
     public HiveMinioDataLake(String bucketName, Map hiveHadoopFilesToMount, String hiveHadoopImage)
     {
         this.bucketName = requireNonNull(bucketName, "bucketName is null");
-        Network network = closer.register(newNetwork());
+        this.network = closer.register(newNetwork());
         this.minio = closer.register(
                 Minio.builder()
                         .withNetwork(network)
@@ -104,6 +105,14 @@ public boolean isNotStopped()
         return state != State.STOPPED;
     }
 
+    /**
+     * Returns the network created by the constructor, allowing further containers to join it.
+     */
+    public Network getNetwork()
+    {
+        return network;
+    }
+
     public MinioClient getMinioClient()
     {
         checkState(state == State.STARTED, "Can't provide client when MinIO state is: %s", state);