
Commit 91dc0c3

Spark testing setup with DuckLake
This PR adds new CI tests that run queries from Spark against a DuckLake instance set up with a Postgres catalog and MinIO Parquet storage.
1 parent c0b9cd1 commit 91dc0c3
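
For orientation: the pieces under test are the DuckDB JDBC driver (which attaches the DuckLake catalog stored in Postgres and reads the table's Parquet files from a MinIO bucket) and Spark, which talks to DuckDB over plain JDBC. The CI below drives the spark-sql CLI with spark-test.sql; the following is only a rough sketch of the same read through Spark's DataFrame JDBC API, assuming duckdb_jdbc.jar is on the Spark classpath and the services set up by this commit are running locally (the class name, local master, and hard-coded credentials are illustrative, not part of this commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DuckLakeReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                                 .appName("ducklake-read-sketch")
                                 .master("local[*]")
                                 .getOrCreate();

        // Same URL shape as spark-test.sql below: DuckDB attaches the DuckLake catalog kept in
        // Postgres, and session_init_sql_file registers the S3 secret for the MinIO bucket.
        String url = "jdbc:duckdb:ducklake:postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake_test"
                     + ";session_init_sql_file=sparktest/spark-session-init.sql;";

        Dataset<Row> tab1 = spark.read()
                                .format("jdbc")
                                .option("driver", "org.duckdb.DuckDBDriver")
                                .option("url", url)
                                .option("dbtable", "tab1")
                                .load();

        // RunSpark.java below expects COUNT(*) of the loaded taxi data to be 7433139.
        System.out.println("rows: " + tab1.count());
        spark.stop();
    }
}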

File tree

8 files changed: 304 additions & 4 deletions


.github/workflows/Java.yml

Lines changed: 63 additions & 4 deletions
@@ -14,6 +14,7 @@ env:
  AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_DUCKDB_STAGING_KEY }}

jobs:
+
  format-check:
    name: Format Check
    runs-on: ubuntu-latest
@@ -31,13 +32,29 @@ jobs:
    needs: format-check
    env:
      MANYLINUX_IMAGE: quay.io/pypa/manylinux_2_28_x86_64
+      # Spark testing env vars below
+      DUCKDB_JDBC_JAR: ${{ github.workspace }}/build/release/duckdb_jdbc.jar
+      SPARK_SQL_EXE: ${{ github.workspace }}/sparktest/spark-3.5.3-bin-hadoop3/bin/spark-sql
+      POSTGRES_HOST: 127.0.0.1
+      POSTGRES_PORT: 5432
+      POSTGRES_MAINTENANCE_DB: postgres
+      POSTGRES_USERNAME: postgres
+      POSTGRES_PASSWORD: postgres
+      DUCKLAKE_CATALOG_DB: lake_test
+      PARQUET_FILE_URL: https://blobs.duckdb.org/data/taxi_2019_04.parquet
+      SESSION_INIT_SQL_FILE: ${{ github.workspace }}/sparktest/spark-session-init.sql
+      MINIO_EXE: ${{ github.workspace }}/sparktest/minio
+      MINIO_PID: ${{ github.workspace }}/sparktest/minio.pid
+      MC_EXE: ${{ github.workspace }}/sparktest/mc
+      MINIO_DATA: ${{ github.workspace }}/sparktest/minio_data
+      MINIO_HOST: 127.0.0.1
+      MINIO_PORT: 9000
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Build
-        shell: bash
        run: |
          docker run \
            -v.:/duckdb \
@@ -56,7 +73,6 @@ jobs:
            "

      - name: JDBC Tests EL8
-        shell: bash
        if: ${{ inputs.skip_tests != 'true' }}
        run: |
          docker run \
@@ -76,7 +92,6 @@ jobs:
            "

      - name: JDBC Tests
-        shell: bash
        if: ${{ inputs.skip_tests != 'true' }}
        run: |
          cat /etc/os-release
@@ -102,7 +117,6 @@ jobs:

      - name: CTS tests
        if: ${{ inputs.skip_tests != 'true' }}
-        shell: bash
        run: |
          docker run \
            -v.:/duckdb \
@@ -118,6 +132,51 @@ jobs:
            make -C /duckdb/jdbc_compatibility_test_suite_runner test
            "

+      - name: Spark Test Resources
+        run: |
+          mkdir sparktest
+          cd sparktest
+          cmake ..
+
+      - name: Setup Postgres
+        uses: ikalnytskyi/action-setup-postgres@v7
+        with:
+          postgres-version: '17'
+          username: ${{ env.POSTGRES_USERNAME }}
+          password: ${{ env.POSTGRES_PASSWORD }}
+          database: ${{ env.POSTGRES_MAINTENANCE_DB }}
+          port: ${{ env.POSTGRES_PORT }}
+          ssl: true
+
+      - name: Setup Minio
+        working-directory: sparktest
+        run: |
+          wget -nv https://dl.min.io/server/minio/release/linux-amd64/minio
+          chmod +x minio
+          ./minio --version
+          wget -nv https://dl.min.io/client/mc/release/linux-amd64/mc
+          chmod +x mc
+          ./mc --version
+          java -version
+          java ${{ github.workspace }}/src/test/external/SetupMinio.java
+
+      - name: Setup DuckLake
+        working-directory: sparktest
+        run: |
+          cat ${{ env.SESSION_INIT_SQL_FILE }}
+          wget -nv https://github.com/pgjdbc/pgjdbc/releases/download/REL42.7.7/postgresql-42.7.7.jar
+          java -version
+          java -cp ${{ env.DUCKDB_JDBC_JAR }}:postgresql-42.7.7.jar ${{ github.workspace }}/src/test/external/SetupDuckLake.java
+
+      - name: Spark Tests
+        working-directory: sparktest
+        run: |
+          wget -nv https://blobs.duckdb.org/ci/spark-3.5.3-bin-hadoop3.tgz
+          tar xf spark-3.5.3-bin-hadoop3.tgz
+          cat spark-test.sql
+          java -version
+          java ${{ github.workspace }}/src/test/external/RunSpark.java spark-test.sql
+
      - name: Deploy
        shell: bash
        run: |

CMakeLists.txt

Lines changed: 9 additions & 0 deletions
@@ -658,3 +658,12 @@ add_custom_command(
  $<TARGET_FILE_DIR:duckdb_java> $<TARGET_FILE_NAME:duckdb_java>)

add_custom_target(jdbc ALL DEPENDS dummy_jdbc_target)
+
+# test resources
+
+configure_file(
+    src/test/external/spark-session-init.sql
+    spark-session-init.sql)
+configure_file(
+    src/test/external/spark-test.sql
+    spark-test.sql)

CMakeLists.txt.in

Lines changed: 9 additions & 0 deletions
@@ -184,3 +184,12 @@ add_custom_command(
  $<TARGET_FILE_DIR:duckdb_java> $<TARGET_FILE_NAME:duckdb_java>)

add_custom_target(jdbc ALL DEPENDS dummy_jdbc_target)
+
+# test resources
+
+configure_file(
+    src/test/external/spark-session-init.sql
+    spark-session-init.sql)
+configure_file(
+    src/test/external/spark-test.sql
+    spark-test.sql)

src/test/external/RunSpark.java

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
import static java.lang.ProcessBuilder.Redirect.INHERIT;
import static java.nio.charset.StandardCharsets.UTF_8;

public class RunSpark {

    static final String DUCKDB_JDBC_JAR = fromEnv("DUCKDB_JDBC_JAR", "./build/release/duckdb_jdbc.jar");
    static final String SPARK_SQL_EXE = fromEnv("SPARK_SQL_EXE", "../spark/spark-3.5.5-bin-hadoop3/bin/spark-sql");

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            throw new RuntimeException("Path to Spark SQL script must be specified as the first and only argument");
        }
        Process ps = new ProcessBuilder(SPARK_SQL_EXE, "--driver-class-path", DUCKDB_JDBC_JAR, "-f", args[0])
                         .redirectInput(INHERIT)
                         .redirectError(INHERIT)
                         .start();
        String output = new String(ps.getInputStream().readAllBytes(), UTF_8);
        System.out.print(output);
        int status = ps.waitFor();
        String[] lines = output.split("\n");
        if (lines.length < 2 || !"7433139".equals(lines[0]) || !"1.429378704487457E8".equals(lines[1])) {
            throw new RuntimeException("Spark SQL test output check failed");
        }
        if (status == 0) {
            System.out.println("Success");
        }
        System.exit(status);
    }

    static String fromEnv(String envVarName, String defaultValue) {
        String env = System.getenv(envVarName);
        if (null != env) {
            return env;
        }
        return defaultValue;
    }
}

src/test/external/SetupDuckLake.java

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

class SetupDuckLake {

    static final String POSTGRES_HOST = fromEnv("POSTGRES_HOST", "127.0.0.1");
    static final String POSTGRES_PORT = fromEnv("POSTGRES_PORT", "5432");
    static final String POSTGRES_MAINTENANCE_DB = fromEnv("POSTGRES_MAINTENANCE_DB", "postgres");
    static final String POSTGRES_USERNAME = fromEnv("POSTGRES_USERNAME", "postgres");
    static final String POSTGRES_PASSWORD = fromEnv("POSTGRES_PASSWORD", "postgres");
    static final String POSTGRES_URL = fromEnv("POSTGRES_URL", "jdbc:postgresql://" + POSTGRES_HOST + ":" +
                                                                   POSTGRES_PORT + "/" + POSTGRES_MAINTENANCE_DB);
    static final String DUCKLAKE_CATALOG_DB = fromEnv("DUCKLAKE_CATALOG_DB_NAME", "lake_test");
    static final String DUCKLAKE_URL =
        fromEnv("DUCKLAKE_URL", "ducklake:postgres:postgresql://" + POSTGRES_USERNAME + ":" + POSTGRES_PASSWORD + "@" +
                                    POSTGRES_HOST + ":" + POSTGRES_PORT + "/" + DUCKLAKE_CATALOG_DB);
    static final String PARQUET_FILE_URL =
        fromEnv("DUCKLAKE_DATA_PATH", "https://blobs.duckdb.org/data/taxi_2019_04.parquet");
    static final String SESSION_INIT_SQL_FILE =
        fromEnv("SESSION_INIT_SQL_FILE", "./src/test/external/spark-session-init.sql");

    public static void main(String[] args) throws Exception {
        setupPostgres();
        setupDuckLake();
        System.out.println("Success");
    }

    static void setupPostgres() throws Exception {
        System.out.println("Creating Postgres database ...");
        try (Connection conn = DriverManager.getConnection(POSTGRES_URL, POSTGRES_USERNAME, POSTGRES_PASSWORD);
             Statement stmt = conn.createStatement()) {
            stmt.execute("DROP DATABASE IF EXISTS " + DUCKLAKE_CATALOG_DB);
            stmt.execute("CREATE DATABASE " + DUCKLAKE_CATALOG_DB);
        }
    }

    static void setupDuckLake() throws Exception {
        System.out.println("Creating DuckLake instance ...");
        try (Connection conn =
                 DriverManager.getConnection("jdbc:duckdb:;session_init_sql_file=" + SESSION_INIT_SQL_FILE + ";");
             Statement stmt = conn.createStatement()) {
            stmt.execute("ATTACH '" + DUCKLAKE_URL + "' AS lake (DATA_PATH 's3://bucket1')");
            stmt.execute("USE lake");
            System.out.println("Loading data from URL: '" + PARQUET_FILE_URL + "' ...");
            stmt.execute("CREATE TABLE tab1 AS FROM '" + PARQUET_FILE_URL + "'");
        }
        try (Connection conn = DriverManager.getConnection("jdbc:duckdb:" + DUCKLAKE_URL +
                                                           ";session_init_sql_file=" + SESSION_INIT_SQL_FILE + ";");
             Statement stmt = conn.createStatement()) {
            try (ResultSet rs = stmt.executeQuery("SELECT count(*) FROM tab1")) {
                rs.next();
                System.out.println("Records loaded: " + rs.getLong(1));
            }
        }
    }

    static String fromEnv(String envVarName, String defaultValue) {
        String env = System.getenv(envVarName);
        if (null != env) {
            return env;
        }
        return defaultValue;
    }
}

src/test/external/SetupMinio.java

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
import static java.lang.Integer.parseInt;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.readString;

import java.io.IOException;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;

public class SetupMinio {

    static final String MINIO_EXE_PATH = fromEnv("MINIO_EXE", "../minio/minio");
    static final String PID_PATH = fromEnv("MINIO_PID", "../minio/minio.pid");
    static final String MC_EXE_PATH = fromEnv("MC_EXE", "../minio/mc");
    static final String DATA_PATH = fromEnv("MINIO_DATA", "./build/data1");
    static final String MINIO_HOST = fromEnv("MINIO_HOST", "127.0.0.1");
    static final String MINIO_PORT = fromEnv("MINIO_PORT", "9000");

    public static void main(String[] args) throws Exception {
        killMinioServer();
        deleteMinioData();
        setupMinio();
    }

    static void killMinioServer() throws Exception {
        Path pidPath = Paths.get(PID_PATH);
        if (!Files.exists(pidPath)) {
            return;
        }
        long pid = Long.parseLong(readString(pidPath, UTF_8));
        System.out.println("Killing Minio server process, pid: " + pid + " ...");
        new ProcessBuilder("/usr/bin/kill", String.valueOf(pid)).inheritIO().start().waitFor();
        Files.delete(pidPath);
    }

    static void deleteMinioData() throws Exception {
        Path minioDataPath = Paths.get(DATA_PATH);
        if (!Files.exists(minioDataPath)) {
            return;
        }
        System.out.println("Deleting Minio data: " + minioDataPath + " ...");
        Files.walk(minioDataPath).sorted(Comparator.reverseOrder()).forEach(p -> {
            try {
                Files.delete(p);
            } catch (IOException e) {
                throw new RuntimeException("Failed to delete " + p, e);
            }
        });
    }

    static void setupMinio() throws Exception {
        System.out.println("Starting Minio server ...");
        Process minioServerProcess =
            new ProcessBuilder(MINIO_EXE_PATH, "server", "--address", MINIO_HOST + ":" + MINIO_PORT, DATA_PATH)
                .inheritIO()
                .start();
        Files.write(Paths.get(PID_PATH), String.valueOf(minioServerProcess.pid()).getBytes(UTF_8));
        boolean minioServerStarted = false;
        for (int i = 0; i < 16; i++) {
            try (Socket sock = new Socket(MINIO_HOST, parseInt(MINIO_PORT))) {
                minioServerStarted = true;
                break;
            } catch (IOException e) {
                Thread.sleep(1000);
            }
        }
        if (!minioServerStarted) {
            throw new RuntimeException("Cannot start Minio");
        }
        Thread.sleep(2000); // improve log output
        System.out.println("Minio server started, pid: " + minioServerProcess.pid() + ", creating bucket ...");
        int mcAliasStatus = new ProcessBuilder(MC_EXE_PATH, "alias", "set", "local",
                                               "http://" + MINIO_HOST + ":" + MINIO_PORT, "minioadmin", "minioadmin")
                                .inheritIO()
                                .start()
                                .waitFor();
        if (mcAliasStatus != 0) {
            killMinioServer();
            throw new RuntimeException("Minio mc alias set error, status: " + mcAliasStatus);
        }
        int mcMbStatus = new ProcessBuilder(MC_EXE_PATH, "mb", "local/bucket1").inheritIO().start().waitFor();
        if (mcMbStatus != 0) {
            killMinioServer();
            throw new RuntimeException("Minio mc mb error, status: " + mcMbStatus);
        }
        System.out.println("Minio server set up successfully");
    }

    static String fromEnv(String envVarName, String defaultValue) {
        String env = System.getenv(envVarName);
        if (null != env) {
            return env;
        }
        return defaultValue;
    }
}

src/test/external/spark-session-init.sql

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
CREATE OR REPLACE TEMPORARY SECRET secret (
    TYPE s3,
    ENDPOINT '$ENV{MINIO_HOST}:$ENV{MINIO_PORT}',
    PROVIDER config,
    KEY_ID 'minioadmin',
    SECRET 'minioadmin',
    URL_STYLE 'path',
    USE_SSL false
)
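
The secret above is what lets DuckDB read and write the Parquet files in the MinIO bucket. Because the script is referenced through the session_init_sql_file parameter in every JDBC URL in this commit, each connection that Spark or the setup code opens re-creates it; the $ENV{...} placeholders appear to be filled in when CMake's configure_file copies the script into the sparktest directory. A minimal sketch of checking the effect on a fresh connection (the class name and the path to the already-substituted copy are assumptions, not part of this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SessionInitCheck {
    public static void main(String[] args) throws Exception {
        // Open an in-memory DuckDB connection that runs the init SQL, then list the secrets it created.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:duckdb:;session_init_sql_file=sparktest/spark-session-init.sql;");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT name, type FROM duckdb_secrets()")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getString(2));
            }
        }
    }
}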

src/test/external/spark-test.sql

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
CREATE OR REPLACE TEMPORARY VIEW tab1 USING jdbc OPTIONS (
    url "jdbc:duckdb:ducklake:postgres:postgresql://$ENV{POSTGRES_USERNAME}:$ENV{POSTGRES_PASSWORD}@$ENV{POSTGRES_HOST}:$ENV{POSTGRES_PORT}/$ENV{DUCKLAKE_CATALOG_DB};session_init_sql_file=$ENV{SESSION_INIT_SQL_FILE};",
    dbtable "tab1",

    partitionColumn "pickup_at",
    lowerBound "2008-08-08 09:13:28",
    upperBound "2033-04-27 13:08:32",
    numPartitions "7"
);

SELECT COUNT(*) FROM tab1;
SELECT SUM(total_amount) FROM tab1;
SELECT * FROM tab1 ORDER BY pickup_at LIMIT 4;
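
The partition options are what make this view a parallel read: Spark issues numPartitions bounded range queries on pickup_at between lowerBound and upperBound, so several DuckDB JDBC connections are opened at once, and each of them relies on the session init file for the MinIO secret. A rough DataFrame-API equivalent of the view definition, for illustration only (the class name, local master, and the DUCKLAKE_JDBC_URL environment variable are assumptions, not part of this commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class PartitionedReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                                 .appName("partitioned-read-sketch")
                                 .master("local[*]")
                                 .getOrCreate();

        Dataset<Row> tab1 = spark.read()
                                .format("jdbc")
                                .option("driver", "org.duckdb.DuckDBDriver")
                                // expected to hold the same jdbc:duckdb:ducklake:... URL as the view above
                                .option("url", System.getenv("DUCKLAKE_JDBC_URL"))
                                .option("dbtable", "tab1")
                                // Spark turns these into 7 bounded scans on pickup_at,
                                // one JDBC connection per partition.
                                .option("partitionColumn", "pickup_at")
                                .option("lowerBound", "2008-08-08 09:13:28")
                                .option("upperBound", "2033-04-27 13:08:32")
                                .option("numPartitions", "7")
                                .load();

        tab1.agg(functions.sum("total_amount")).show();
        spark.stop();
    }
}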
