diff --git a/Makefile b/Makefile
index 2b33f8f4..0ee41ce6 100644
--- a/Makefile
+++ b/Makefile
@@ -20,6 +20,11 @@ MODULE := lance-spark-$(SPARK_VERSION)_$(SCALA_VERSION)
BUNDLE_MODULE := lance-spark-bundle-$(SPARK_VERSION)_$(SCALA_VERSION)
BASE_MODULE := lance-spark-base_$(SCALA_VERSION)
+# Full Spark download and Py4J versions for Docker, resolved per SPARK_VERSION from docker/versions.mk
+include docker/versions.mk
+SPARK_DOWNLOAD_VERSION := $(SPARK_DOWNLOAD_VERSION_$(SPARK_VERSION))
+PY4J_VERSION := $(PY4J_VERSION_$(SPARK_VERSION))
+
DOCKER_COMPOSE := $(shell \
if docker compose version >/dev/null 2>&1; then \
echo "docker compose"; \
@@ -95,9 +100,15 @@ clean:
.PHONY: docker-build
docker-build:
- $(MAKE) bundle SPARK_VERSION=3.5 SCALA_VERSION=2.12
- cp lance-spark-bundle-3.5_2.12/target/lance-spark-bundle-3.5_2.12-*.jar docker/
- cd docker && docker compose build --no-cache spark-lance
+ @ls $(BUNDLE_MODULE)/target/$(BUNDLE_MODULE)-*.jar >/dev/null 2>&1 || \
+ (echo "Error: Bundle jar not found. Run 'make bundle' first." && exit 1)
+ rm -f docker/lance-spark-bundle-*.jar
+ cp $(BUNDLE_MODULE)/target/$(BUNDLE_MODULE)-*.jar docker/
+ cd docker && $(DOCKER_COMPOSE) build --no-cache \
+ --build-arg SPARK_DOWNLOAD_VERSION=$(SPARK_DOWNLOAD_VERSION) \
+ --build-arg SPARK_MAJOR_VERSION=$(SPARK_VERSION) \
+ --build-arg SCALA_VERSION=$(SCALA_VERSION) \
+ spark-lance
.PHONY: docker-up
docker-up: check-docker-compose
@@ -111,6 +122,30 @@ docker-shell:
docker-down: check-docker-compose
cd docker && ${DOCKER_COMPOSE} down
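+
+# Build the minimal test image directly with docker (no compose); override versions as needed,
+# e.g. `make docker-build-minimal SPARK_VERSION=4.0 SCALA_VERSION=2.13`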
+.PHONY: docker-build-minimal
+docker-build-minimal:
+ @ls $(BUNDLE_MODULE)/target/$(BUNDLE_MODULE)-*.jar >/dev/null 2>&1 || \
+ (echo "Error: Bundle jar not found. Run 'make bundle' first." && exit 1)
+ rm -f docker/lance-spark-bundle-*.jar
+ cp $(BUNDLE_MODULE)/target/$(BUNDLE_MODULE)-*.jar docker/
+ cd docker && docker build \
+ --build-arg SPARK_DOWNLOAD_VERSION=$(SPARK_DOWNLOAD_VERSION) \
+ --build-arg SPARK_MAJOR_VERSION=$(SPARK_VERSION) \
+ --build-arg SCALA_VERSION=$(SCALA_VERSION) \
+ --build-arg PY4J_VERSION=$(PY4J_VERSION) \
+ -f Dockerfile.minimal \
+ -t spark-lance-minimal:$(SPARK_VERSION)_$(SCALA_VERSION) \
+ .
+
+.PHONY: docker-test
+docker-test:
+ @docker image inspect spark-lance-minimal:$(SPARK_VERSION)_$(SCALA_VERSION) >/dev/null 2>&1 || \
+ (echo "Error: Docker image 'spark-lance-minimal:$(SPARK_VERSION)_$(SCALA_VERSION)' not found. Run 'make docker-build-minimal' first." && exit 1)
+ docker run --rm --hostname spark-lance \
+ -e SPARK_VERSION=$(SPARK_VERSION) \
+ spark-lance-minimal:$(SPARK_VERSION)_$(SCALA_VERSION) \
+ "pytest /home/lance/tests/ -v --timeout=120"
+
# =============================================================================
# Documentation
# =============================================================================
@@ -151,6 +186,7 @@ help:
@echo " docker-up - Start docker containers"
@echo " docker-shell - Open shell in spark-lance container"
@echo " docker-down - Stop docker containers"
+ @echo " docker-build-minimal - Build minimal Docker image for integration tests"
+ @echo " docker-test - Run integration tests in spark-lance-minimal container"
@echo ""
@echo "Documentation:"
@echo " serve-docs - Serve documentation locally"
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 3da5293a..1ab7104a 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -27,14 +27,20 @@ RUN python3 -m venv /opt/venv && \
ENV PATH="/opt/venv/bin:$PATH"
+# Build arguments
+ARG SPARK_DOWNLOAD_VERSION=4.0.0
+ARG SPARK_MAJOR_VERSION=4.0
+ARG SCALA_VERSION=2.13
+
# Optional env variables
ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
WORKDIR ${SPARK_HOME}
-ENV SPARK_VERSION=3.5.8
-ENV SPARK_MAJOR_VERSION=3.5
+ENV SPARK_VERSION=${SPARK_DOWNLOAD_VERSION}
+ENV SPARK_MAJOR_VERSION=${SPARK_MAJOR_VERSION}
+ENV SCALA_VERSION=${SCALA_VERSION}
ENV LANCE_SPARK_VERSION=0.1.3-beta.8
ENV LANCE_NS_VERSION=0.4.5
@@ -53,7 +59,7 @@ RUN curl -L https://repo1.maven.org/maven2/com/lancedb/lance-namespace-glue/${LA
-o /opt/spark/jars/lance-namespace-glue-${LANCE_NS_VERSION}.jar
# For local testing, uncomment the lines below and comment out the Maven downloads above:
-#COPY lance-spark-bundle-${SPARK_MAJOR_VERSION}_2.12-${LANCE_SPARK_VERSION}.jar /opt/spark/jars/
+#COPY lance-spark-bundle-${SPARK_MAJOR_VERSION}_${SCALA_VERSION}-${LANCE_SPARK_VERSION}.jar /opt/spark/jars/
#COPY lance-namespace-glue-${LANCE_NS_VERSION}.jar /opt/spark/jars/
# Download OpenDAL native libraries for Linux architectures
@@ -82,6 +88,9 @@ RUN mkdir -p /home/lance/warehouse /home/lance/notebooks /home/lance/spark-event
# Copy notebooks if available
COPY notebooks/ /home/lance/notebooks/
+# Copy tests
+COPY tests/ /home/lance/tests/
+
# Add a notebook command
RUN echo '#! /bin/sh' >> /bin/notebook \
&& echo 'export PYSPARK_DRIVER_PYTHON=jupyter-lab' >> /bin/notebook \
diff --git a/docker/Dockerfile.minimal b/docker/Dockerfile.minimal
new file mode 100644
index 00000000..ce61f9c7
--- /dev/null
+++ b/docker/Dockerfile.minimal
@@ -0,0 +1,67 @@
+# syntax=docker/dockerfile:1
+#
+# Minimal image for running lance-spark integration tests.
+# Layers are ordered so that package installation and the Spark download are
+# cached; only the bundle JAR, Spark config, and test files are copied on each rebuild.
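+#
+# Typically built via `make docker-build-minimal`, which supplies the build args below.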
+
+FROM ubuntu:24.04
+
+# --- Cached layers: system packages and Python test deps ---
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends \
+ curl \
+ openjdk-17-jdk-headless \
+ python3 \
+ python3-dev \
+ python3-venv \
+ python3-pip && \
+ apt-get clean && \
+ rm -rf /var/lib/apt/lists/*
+
+RUN python3 -m venv /opt/venv && \
+ . /opt/venv/bin/activate && \
+ pip install --upgrade pip && \
+ pip install pytest pytest-timeout packaging
+
+ENV PATH="/opt/venv/bin:$PATH"
+ENV SPARK_HOME="/opt/spark"
+ENV PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
+ENV PATH="${SPARK_HOME}/sbin:${SPARK_HOME}/bin:${PATH}"
+
+ARG SPARK_DOWNLOAD_VERSION=4.0.0
+ARG SPARK_MAJOR_VERSION=4.0
+ARG SCALA_VERSION=2.13
+ARG PY4J_VERSION=0.10.9.9
+
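+# PySpark loads Py4J from the source zip shipped with the Spark distribution, so
+# PY4J_VERSION must match the downloaded Spark release (see docker/versions.mk)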
+ENV PYTHONPATH="${SPARK_HOME}/python/lib/py4j-${PY4J_VERSION}-src.zip:${PYTHONPATH}"
+
+RUN mkdir -p ${SPARK_HOME} \
+ && curl -fL https://archive.apache.org/dist/spark/spark-${SPARK_DOWNLOAD_VERSION}/spark-${SPARK_DOWNLOAD_VERSION}-bin-hadoop3.tgz \
+ -o spark.tgz \
+ && tar xzf spark.tgz --directory ${SPARK_HOME} --strip-components 1 \
+ && rm spark.tgz
+
+# --- Uncached layers: bundle JAR, Spark config, and tests ---
+
+# Fetching random bytes busts the Docker build cache so every layer below always reruns
+ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
+
+COPY lance-spark-bundle-${SPARK_MAJOR_VERSION}_${SCALA_VERSION}-*.jar ${SPARK_HOME}/jars/
+
+# Spark configuration and entrypoint
+COPY spark-defaults.conf ${SPARK_HOME}/conf/
+RUN chmod u+x ${SPARK_HOME}/sbin/* && \
+ chmod u+x ${SPARK_HOME}/bin/*
+
+# Create directories for Spark events and test data
+RUN mkdir -p /home/lance/warehouse /home/lance/spark-events /home/lance/data
+
+# Copy tests
+RUN mkdir -p /home/lance/tests
+COPY tests/ /home/lance/tests/
+
+WORKDIR ${SPARK_HOME}
+COPY entrypoint.sh .
+
+ENTRYPOINT ["./entrypoint.sh"]
+CMD ["tail", "-f", "/dev/null"]
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 152d95e2..8c0c9302 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -3,12 +3,9 @@ services:
spark-lance:
image: spark-lance:latest
container_name: spark-lance
- build:
+ build:
context: .
dockerfile: Dockerfile
- args:
- - SPARK_VERSION=3.5.6
- - LANCE_VERSION=0.0.5
networks:
lance_net:
depends_on:
@@ -16,7 +13,6 @@ services:
volumes:
- ./warehouse:/home/lance/warehouse
- ./notebooks:/home/lance/notebooks
- - ../lance-spark-bundle-3.5_2.12/target:/lance-jars
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
@@ -66,4 +62,4 @@ services:
"
networks:
- lance_net:
\ No newline at end of file
+ lance_net:
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 240e41a8..46965f77 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -1,3 +1,5 @@
+pytest==8.3.5
+pytest-timeout==2.3.1
jupyterlab==3.6.7
jupyter-server==1.24.0
pandas==2.3.1
diff --git a/docker/tests/test_lance_spark.py b/docker/tests/test_lance_spark.py
new file mode 100644
index 00000000..23855748
--- /dev/null
+++ b/docker/tests/test_lance_spark.py
@@ -0,0 +1,1198 @@
+"""
+Automated integration tests for Lance-Spark.
+
+These tests run inside the Docker container against a real Spark session using a local Lance catalog.
+
+Test organization follows the Lance documentation structure:
+- DDL (Data Definition Language): Namespace, Table, Index, Optimize, Vacuum operations
+- DQL (Data Query Language): SELECT queries and data retrieval
+- DML (Data Manipulation Language): INSERT, UPDATE, DELETE, MERGE, ADD COLUMN operations
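+
+Run with `make docker-test`, which executes pytest inside the spark-lance-minimal container.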
+"""
+
+import os
+import time
+import pytest
+from packaging.version import Version
+from pyspark.sql import SparkSession
+
+SPARK_VERSION = Version(os.environ.get("SPARK_VERSION", "3.5"))
+
+
+@pytest.fixture(scope="module")
+def spark():
+ """Create a Spark session configured with Lance catalog."""
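+    # Uses the directory-based ('dir') Lance catalog rooted at /home/lance/data inside the container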
+ session = (
+ SparkSession.builder
+ .appName("LanceSparkTests")
+ .config("spark.sql.catalog.lance", "org.lance.spark.LanceNamespaceSparkCatalog")
+ .config("spark.sql.catalog.lance.impl", "dir")
+ .config("spark.sql.catalog.lance.root", "/home/lance/data")
+ .config("spark.sql.extensions", "org.lance.spark.extensions.LanceSparkSessionExtensions")
+ .getOrCreate()
+ )
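+    # Make the Lance catalog the default so that names like default.test_table resolve inside it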
+ session.sql("SET spark.sql.defaultCatalog=lance")
+ yield session
+ session.stop()
+
+
+def drop_table(spark, table_name):
+ """Drop a table, using PURGE only on Spark >= 3.5."""
+ purge = " PURGE" if SPARK_VERSION >= Version("3.5") else ""
+ spark.sql(f"DROP TABLE IF EXISTS {table_name}{purge}")
+
+
+@pytest.fixture(autouse=True)
+def cleanup_tables(spark):
+ """Clean up test tables before and after each test."""
+ drop_table(spark, "default.test_table")
+ drop_table(spark, "default.employees")
+ # TODO - reenable once `tableExists` works on Spark 4.0
+ #spark.catalog.dropTempView("source") if spark.catalog.tableExists("source") else None
+ #spark.catalog.dropTempView("tmp_view") if spark.catalog.tableExists("tmp_view") else None
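+    # dropTempView returns False rather than raising when the view does not exist, so these calls are safe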
+ spark.catalog.dropTempView("source")
+ spark.catalog.dropTempView("tmp_view")
+ yield
+ drop_table(spark, "default.test_table")
+ drop_table(spark, "default.employees")
+ # TODO - reenable once `tableExists` works on Spark 4.0
+ #spark.catalog.dropTempView("source") if spark.catalog.tableExists("source") else None
+ #spark.catalog.dropTempView("tmp_view") if spark.catalog.tableExists("tmp_view") else None
+ spark.catalog.dropTempView("source")
+ spark.catalog.dropTempView("tmp_view")
+
+
+# =============================================================================
+# DDL (Data Definition Language) Tests
+# =============================================================================
+
+class TestDDLNamespace:
+ """Test DDL namespace operations: CREATE, DROP, SHOW, DESCRIBE NAMESPACE."""
+
+ def test_show_catalogs(self, spark):
+ """Verify Lance catalog is registered."""
+ catalogs = spark.sql("SHOW CATALOGS").collect()
+ catalog_names = [row[0] for row in catalogs]
+ assert "lance" in catalog_names
+
+ def test_create_namespace(self, spark):
+ """Test CREATE NAMESPACE."""
+ spark.sql("CREATE NAMESPACE IF NOT EXISTS default")
+ namespaces = spark.sql("SHOW NAMESPACES").collect()
+ namespace_names = [row[0] for row in namespaces]
+ assert "default" in namespace_names
+
+ def test_show_namespaces(self, spark):
+ """Test SHOW NAMESPACES."""
+ spark.sql("CREATE NAMESPACE IF NOT EXISTS default")
+ namespaces = spark.sql("SHOW NAMESPACES").collect()
+ assert len(namespaces) >= 1
+ namespace_names = [row[0] for row in namespaces]
+ assert "default" in namespace_names
+
+
+class TestDDLTable:
+ """Test DDL table operations: CREATE, SHOW, DESCRIBE, DROP TABLE."""
+
+ def test_create_table(self, spark):
+ """Test CREATE TABLE."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ tables = spark.sql("SHOW TABLES IN default").collect()
+ table_names = [row.tableName for row in tables]
+ assert "test_table" in table_names
+
+ def test_show_tables(self, spark):
+ """Test SHOW TABLES."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING
+ )
+ """)
+
+ tables = spark.sql("SHOW TABLES IN default").collect()
+ assert len(tables) >= 1
+ table_names = [row.tableName for row in tables]
+ assert "test_table" in table_names
+
+ def test_describe_table(self, spark):
+ """Test DESCRIBE TABLE."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ schema = spark.sql("DESCRIBE TABLE default.test_table").collect()
+ col_names = [row.col_name for row in schema if row.col_name and not row.col_name.startswith("#")]
+ assert "id" in col_names
+ assert "name" in col_names
+ assert "value" in col_names
+
+ def test_drop_table(self, spark):
+ """Test DROP TABLE."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING
+ )
+ """)
+
+ drop_table(spark, "default.test_table")
+
+ tables = spark.sql("SHOW TABLES IN default").collect()
+ table_names = [row.tableName for row in tables]
+ assert "test_table" not in table_names
+
+
+class TestDDLIndex:
+ """Test DDL index operations: CREATE INDEX (BTree, FTS)."""
+
+ def test_create_btree_index_on_int(self, spark):
+ """Test CREATE INDEX with BTree on integer column."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ # Insert data first (index requires data)
+ data = [(i, f"Name{i}", float(i * 10)) for i in range(100)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Create BTree index on id column
+ result = spark.sql("""
+ ALTER TABLE default.test_table
+ CREATE INDEX idx_id USING btree (id)
+ """).collect()
+
+ # Verify index was created (returns fragment count and index name)
+ assert len(result) == 1
+ assert result[0][1] == "idx_id"
+
+ # Verify queries still work after indexing
+ query_result = spark.sql("""
+ SELECT * FROM default.test_table WHERE id = 50
+ """).collect()
+ assert len(query_result) == 1
+ assert query_result[0].id == 50
+
+ def test_create_btree_index_on_string(self, spark):
+ """Test CREATE INDEX with BTree on string column."""
+ spark.sql("""
+ CREATE TABLE default.employees (
+ id INT,
+ name STRING,
+ department STRING,
+ salary INT
+ )
+ """)
+
+ data = [
+ (1, "Alice", "Engineering", 75000),
+ (2, "Bob", "Marketing", 65000),
+ (3, "Charlie", "Engineering", 70000),
+ (4, "Diana", "Sales", 80000),
+ (5, "Eve", "Engineering", 60000),
+ ]
+ df = spark.createDataFrame(data, ["id", "name", "department", "salary"])
+ df.writeTo("default.employees").append()
+
+ # Create BTree index on department column
+ result = spark.sql("""
+ ALTER TABLE default.employees
+ CREATE INDEX idx_dept USING btree (department)
+ """).collect()
+
+ assert len(result) == 1
+ assert result[0][1] == "idx_dept"
+
+ # Query using the indexed column
+ query_result = spark.sql("""
+ SELECT * FROM default.employees WHERE department = 'Engineering'
+ """).collect()
+ assert len(query_result) == 3
+
+ def test_create_fts_index(self, spark):
+ """Test CREATE INDEX with full-text search (FTS)."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ title STRING,
+ content STRING
+ )
+ """)
+
+ data = [
+ (1, "Introduction to Python", "Python is a programming language"),
+ (2, "Java Basics", "Java is an object-oriented language"),
+ (3, "Python Advanced", "Advanced Python topics like decorators"),
+ (4, "Database Design", "SQL and database normalization"),
+ (5, "Web Development", "Building web applications with Python"),
+ ]
+ df = spark.createDataFrame(data, ["id", "title", "content"])
+ df.writeTo("default.test_table").append()
+
+ # Create FTS index on content column
+ result = spark.sql("""
+ ALTER TABLE default.test_table
+ CREATE INDEX idx_content_fts USING fts (content)
+ WITH ( base_tokenizer = 'simple', language = 'English' )
+ """).collect()
+
+ assert len(result) == 1
+ assert result[0][1] == "idx_content_fts"
+
+ def test_create_index_empty_table(self, spark):
+ """Test CREATE INDEX on empty table."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING
+ )
+ """)
+
+ # Creating index on empty table should return 0 fragments indexed
+ result = spark.sql("""
+ ALTER TABLE default.test_table
+ CREATE INDEX idx_id USING btree (id)
+ """).collect()
+
+ # Should return with 0 fragments indexed
+ assert result[0][0] == 0
+
+
+class TestDDLOptimize:
+ """Test DDL OPTIMIZE operations for compacting table fragments."""
+
+ def test_optimize_without_args(self, spark):
+ """Test OPTIMIZE without options."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Insert data in multiple batches to create multiple fragments
+ for batch in range(5):
+ data = [(batch * 10 + i, f"Name{batch * 10 + i}", batch * 10 + i) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Run OPTIMIZE
+ result = spark.sql("OPTIMIZE default.test_table").collect()
+
+ # Verify output schema and that compaction occurred
+ assert len(result) == 1
+ row = result[0]
+ # OPTIMIZE returns: fragments_removed, fragments_added, files_removed, files_added
+ assert row.fragments_removed >= 0
+ assert row.fragments_added >= 0
+ assert row.files_removed >= 0
+ assert row.files_added >= 0
+
+ # Verify data integrity after optimization
+ count = spark.table("default.test_table").count()
+ assert count == 50 # 5 batches * 10 rows
+
+ def test_optimize_with_target_rows(self, spark):
+ """Test OPTIMIZE with target_rows_per_fragment option."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Insert data in multiple small batches
+ for batch in range(5):
+ data = [(batch * 10 + i, f"Name{batch * 10 + i}", batch * 10 + i) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Optimize with target rows per fragment
+ result = spark.sql("""
+ OPTIMIZE default.test_table WITH (target_rows_per_fragment = 100)
+ """).collect()
+
+ assert len(result) == 1
+ row = result[0]
+ assert row.fragments_removed >= 0
+ assert row.fragments_added >= 0
+
+ # Verify data integrity
+ count = spark.table("default.test_table").count()
+ assert count == 50
+
+ def test_optimize_with_multiple_options(self, spark):
+ """Test OPTIMIZE with multiple options."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Insert data
+ for batch in range(3):
+ data = [(batch * 20 + i, f"Name{batch * 20 + i}", batch * 20 + i) for i in range(20)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Optimize with multiple options
+ result = spark.sql("""
+ OPTIMIZE default.test_table WITH (
+ target_rows_per_fragment = 100,
+ num_threads = 2,
+ materialize_deletions = true
+ )
+ """).collect()
+
+ assert len(result) == 1
+ row = result[0]
+ assert row.fragments_removed >= 0
+ assert row.fragments_added >= 0
+
+ # Verify data integrity
+ count = spark.table("default.test_table").count()
+ assert count == 60 # 3 batches * 20 rows
+
+ def test_optimize_after_deletes(self, spark):
+ """Test OPTIMIZE after DELETE to materialize soft deletes."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Insert data
+ data = [(i, f"Name{i}", i) for i in range(100)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Delete some rows
+ spark.sql("DELETE FROM default.test_table WHERE id < 20")
+
+ # Optimize to materialize deletions
+ result = spark.sql("""
+ OPTIMIZE default.test_table WITH (materialize_deletions = true)
+ """).collect()
+
+ assert len(result) == 1
+
+ # Verify correct row count after optimization
+ count = spark.table("default.test_table").count()
+ assert count == 80 # 100 - 20 deleted
+
+ def test_optimize_empty_table(self, spark):
+ """Test OPTIMIZE on empty table."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING
+ )
+ """)
+
+ # Optimize empty table should succeed without error
+ result = spark.sql("OPTIMIZE default.test_table").collect()
+
+ assert len(result) == 1
+ row = result[0]
+ # Empty table should have 0 fragments to compact
+ assert row.fragments_removed == 0
+ assert row.fragments_added == 0
+
+
+class TestDDLVacuum:
+ """Test DDL VACUUM operations for removing old versions."""
+
+ def test_vacuum_without_args(self, spark):
+ """Test VACUUM without options."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Insert data to create a version
+ data = [(i, f"Name{i}", i) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Run VACUUM
+ result = spark.sql("VACUUM default.test_table").collect()
+
+ # Verify output schema
+ assert len(result) == 1
+ row = result[0]
+ # VACUUM returns: bytes_removed, old_versions
+ assert row.bytes_removed >= 0
+ assert row.old_versions >= 0
+
+ def test_vacuum_with_before_version(self, spark):
+ """Test VACUUM with before_version option."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Create multiple versions by inserting data multiple times
+ for version in range(5):
+ data = [(version * 10 + i, f"Name{version * 10 + i}", version * 10 + i) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Vacuum with before_version to remove old versions
+ result = spark.sql("""
+ VACUUM default.test_table WITH (before_version = 1000000)
+ """).collect()
+
+ assert len(result) == 1
+ row = result[0]
+ # With multiple versions, we expect some cleanup
+ assert row.bytes_removed >= 0
+ assert row.old_versions >= 0
+
+ # Verify data is still accessible
+ count = spark.table("default.test_table").count()
+ assert count == 50 # 5 versions * 10 rows
+
+ def test_vacuum_with_timestamp(self, spark):
+ """Test VACUUM with before_timestamp_millis option."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Insert initial data
+ data = [(i, f"Name{i}", i) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Small delay to separate versions
+ time.sleep(0.1)
+ before_ts = int(time.time() * 1000)
+
+ # Insert more data to create newer versions
+ for version in range(3):
+ data = [(100 + version * 10 + i, f"NewName{version * 10 + i}", version * 10 + i) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Vacuum versions older than the timestamp
+ result = spark.sql(f"""
+ VACUUM default.test_table WITH (before_timestamp_millis = {before_ts})
+ """).collect()
+
+ assert len(result) == 1
+ row = result[0]
+ assert row.bytes_removed >= 0
+ assert row.old_versions >= 0
+
+ # Verify current data is accessible
+ count = spark.table("default.test_table").count()
+ assert count == 40 # 10 initial + 3 versions * 10 rows
+
+ def test_vacuum_after_optimize(self, spark):
+ """Test VACUUM after OPTIMIZE to clean up removed fragments."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Create fragmented data
+ for batch in range(5):
+ data = [(batch * 10 + i, f"Name{batch * 10 + i}", batch * 10 + i) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ # Optimize to compact fragments
+ spark.sql("""
+ OPTIMIZE default.test_table WITH (target_rows_per_fragment = 100)
+ """)
+
+ # Vacuum to remove old fragment files
+ result = spark.sql("""
+ VACUUM default.test_table WITH (before_version = 1000000)
+ """).collect()
+
+ assert len(result) == 1
+ row = result[0]
+ # After optimize, vacuum should clean up the old fragments
+ assert row.bytes_removed >= 0
+ assert row.old_versions >= 0
+
+ # Verify data integrity
+ count = spark.table("default.test_table").count()
+ assert count == 50
+
+ def test_vacuum_preserves_current_data(self, spark):
+ """Test VACUUM preserves all current data."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ # Insert and update data multiple times
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 100),
+ (2, 'Bob', 200),
+ (3, 'Charlie', 300)
+ """)
+
+ # Update to create new versions
+ spark.sql("UPDATE default.test_table SET value = 150 WHERE id = 1")
+ spark.sql("UPDATE default.test_table SET value = 250 WHERE id = 2")
+
+ # Delete and re-insert
+ spark.sql("DELETE FROM default.test_table WHERE id = 3")
+ spark.sql("INSERT INTO default.test_table VALUES (3, 'Charlie', 350)")
+
+ # Vacuum
+ result = spark.sql("""
+ VACUUM default.test_table WITH (before_version = 1000000)
+ """).collect()
+
+ assert len(result) == 1
+
+ # Verify all current data is preserved
+ rows = spark.table("default.test_table").orderBy("id").collect()
+ assert len(rows) == 3
+ assert rows[0].id == 1 and rows[0].value == 150
+ assert rows[1].id == 2 and rows[1].value == 250
+ assert rows[2].id == 3 and rows[2].value == 350
+
+
+# =============================================================================
+# DQL (Data Query Language) Tests
+# =============================================================================
+
+class TestDQLSelect:
+ """Test DQL SELECT operations."""
+
+ def test_select_all(self, spark):
+ """Test SELECT * query."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ data = [(1, "Alice", 10.5), (2, "Bob", 20.3), (3, "Charlie", 30.1)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ result = spark.sql("SELECT * FROM default.test_table").collect()
+ assert len(result) == 3
+
+ ids = sorted([row.id for row in result])
+ assert ids == [1, 2, 3]
+
+ def test_select_with_where(self, spark):
+ """Test SELECT with WHERE clause."""
+ spark.sql("""
+ CREATE TABLE default.employees (
+ id INT,
+ name STRING,
+ age INT,
+ department STRING,
+ salary INT
+ )
+ """)
+
+ data = [
+ (1, "Alice", 25, "Engineering", 75000),
+ (2, "Bob", 30, "Marketing", 65000),
+ (3, "Charlie", 35, "Sales", 70000),
+ (4, "Diana", 28, "Engineering", 80000),
+ (5, "Eve", 32, "HR", 60000),
+ ]
+ df = spark.createDataFrame(data, ["id", "name", "age", "department", "salary"])
+ df.writeTo("default.employees").append()
+
+ result = spark.sql("""
+ SELECT * FROM default.employees
+ WHERE department = 'Engineering'
+ """).collect()
+
+ assert len(result) == 2
+ names = sorted([row.name for row in result])
+ assert names == ["Alice", "Diana"]
+
+ def test_select_with_group_by(self, spark):
+ """Test SELECT with GROUP BY aggregation."""
+ spark.sql("""
+ CREATE TABLE default.employees (
+ id INT,
+ name STRING,
+ age INT,
+ department STRING,
+ salary INT
+ )
+ """)
+
+ data = [
+ (1, "Alice", 25, "Engineering", 75000),
+ (2, "Bob", 30, "Marketing", 65000),
+ (3, "Charlie", 35, "Sales", 70000),
+ (4, "Diana", 28, "Engineering", 80000),
+ (5, "Eve", 32, "HR", 60000),
+ ]
+ df = spark.createDataFrame(data, ["id", "name", "age", "department", "salary"])
+ df.writeTo("default.employees").append()
+
+ result = spark.sql("""
+ SELECT department, COUNT(*) as count, AVG(salary) as avg_salary
+ FROM default.employees
+ GROUP BY department
+ ORDER BY count DESC
+ """).collect()
+
+ eng_row = [row for row in result if row.department == "Engineering"][0]
+ assert eng_row["count"] == 2
+ assert eng_row.avg_salary == 77500.0
+
+ def test_select_with_order_by(self, spark):
+ """Test SELECT with ORDER BY clause."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ data = [(3, "Charlie", 30.1), (1, "Alice", 10.5), (2, "Bob", 20.3)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ result = spark.sql("""
+ SELECT * FROM default.test_table ORDER BY id ASC
+ """).collect()
+
+ ids = [row.id for row in result]
+ assert ids == [1, 2, 3]
+
+ def test_select_with_limit(self, spark):
+ """Test SELECT with LIMIT clause."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ data = [(i, f"Name{i}", float(i)) for i in range(10)]
+ df = spark.createDataFrame(data, ["id", "name", "value"])
+ df.writeTo("default.test_table").append()
+
+ result = spark.sql("""
+ SELECT * FROM default.test_table LIMIT 5
+ """).collect()
+
+ assert len(result) == 5
+
+ def test_select_data_types(self, spark):
+ """Test SELECT with various data types: INT, BIGINT, FLOAT, DOUBLE, STRING, BOOLEAN."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ int_col INT,
+ long_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ string_col STRING,
+ bool_col BOOLEAN
+ )
+ """)
+
+ data = [(1, 100000000000, 1.5, 2.5, "test", True)]
+ df = spark.createDataFrame(
+ data,
+ ["int_col", "long_col", "float_col", "double_col", "string_col", "bool_col"]
+ )
+ df.writeTo("default.test_table").append()
+
+ result = spark.table("default.test_table").collect()[0]
+ assert result.int_col == 1
+ assert result.long_col == 100000000000
+ assert abs(result.float_col - 1.5) < 0.01
+ assert result.double_col == 2.5
+ assert result.string_col == "test"
+ assert result.bool_col is True
+
+
+# =============================================================================
+# DML (Data Manipulation Language) Tests
+# =============================================================================
+
+class TestDMLInsert:
+ """Test DML INSERT INTO operations."""
+
+ def test_insert_into_values(self, spark):
+ """Test INSERT INTO with VALUES clause."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ # Insert using VALUES
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 10.5),
+ (2, 'Bob', 20.3),
+ (3, 'Charlie', 30.1)
+ """)
+
+ result = spark.table("default.test_table").collect()
+ assert len(result) == 3
+
+ ids = sorted([row.id for row in result])
+ assert ids == [1, 2, 3]
+
+ def test_insert_into_select(self, spark):
+ """Test INSERT INTO with SELECT clause."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ # Insert initial data
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 10.5),
+ (2, 'Bob', 20.3)
+ """)
+
+ # Insert from select (duplicate the data with modified ids)
+ spark.sql("""
+ INSERT INTO default.test_table
+ SELECT id + 10, name, value * 2
+ FROM default.test_table
+ """)
+
+ result = spark.table("default.test_table").collect()
+ assert len(result) == 4
+
+ ids = sorted([row.id for row in result])
+ assert ids == [1, 2, 11, 12]
+
+ def test_insert_append_data(self, spark):
+ """Test INSERT by appending data with DataFrame API."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ data1 = [(1, "Alice", 10.5), (2, "Bob", 20.3)]
+ df1 = spark.createDataFrame(data1, ["id", "name", "value"])
+ df1.writeTo("default.test_table").append()
+
+ data2 = [(3, "Charlie", 30.1), (4, "Diana", 40.2)]
+ df2 = spark.createDataFrame(data2, ["id", "name", "value"])
+ df2.writeTo("default.test_table").append()
+
+ count = spark.table("default.test_table").count()
+ assert count == 4
+
+
+class TestDMLUpdate:
+ """Test DML UPDATE SET operations."""
+
+ def test_update_single_column(self, spark):
+ """Test UPDATE SET single column."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 10),
+ (2, 'Bob', 20),
+ (3, 'Charlie', 30)
+ """)
+
+ # Update value for specific id
+ spark.sql("""
+ UPDATE default.test_table SET value = 100 WHERE id = 2
+ """)
+
+ result = spark.sql("""
+ SELECT * FROM default.test_table WHERE id = 2
+ """).collect()
+
+ assert len(result) == 1
+ assert result[0].value == 100
+
+ def test_update_multiple_rows(self, spark):
+ """Test UPDATE SET affecting multiple rows."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 10),
+ (2, 'Bob', 20),
+ (3, 'Charlie', 30),
+ (4, 'Diana', 40)
+ """)
+
+ # Update all rows where value < 30
+ spark.sql("""
+ UPDATE default.test_table SET value = value + 100 WHERE value < 30
+ """)
+
+ result = spark.table("default.test_table").orderBy("id").collect()
+
+ assert result[0].value == 110 # id=1: 10 + 100
+ assert result[1].value == 120 # id=2: 20 + 100
+ assert result[2].value == 30 # id=3: unchanged
+ assert result[3].value == 40 # id=4: unchanged
+
+
+class TestDMLDelete:
+ """Test DML DELETE FROM operations."""
+
+ def test_delete_with_condition(self, spark):
+ """Test DELETE FROM with WHERE clause."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value DOUBLE
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 10.5),
+ (2, 'Bob', 20.3),
+ (3, 'Charlie', 30.1),
+ (4, 'Diana', 40.2),
+ (5, 'Eve', 50.0)
+ """)
+
+ # Delete rows where id > 3
+ spark.sql("""
+ DELETE FROM default.test_table WHERE id > 3
+ """)
+
+ result = spark.table("default.test_table").collect()
+ assert len(result) == 3
+
+ ids = sorted([row.id for row in result])
+ assert ids == [1, 2, 3]
+
+ def test_delete_with_string_condition(self, spark):
+ """Test DELETE FROM with string column condition."""
+ spark.sql("""
+ CREATE TABLE default.employees (
+ id INT,
+ name STRING,
+ department STRING,
+ salary INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.employees VALUES
+ (1, 'Alice', 'Engineering', 75000),
+ (2, 'Bob', 'Marketing', 65000),
+ (3, 'Charlie', 'Engineering', 70000),
+ (4, 'Diana', 'Sales', 80000)
+ """)
+
+ # Delete all Marketing employees
+ spark.sql("""
+ DELETE FROM default.employees WHERE department = 'Marketing'
+ """)
+
+ result = spark.table("default.employees").collect()
+ assert len(result) == 3
+
+ departments = [row.department for row in result]
+ assert "Marketing" not in departments
+
+
+class TestDMLMerge:
+ """Test DML MERGE INTO operations."""
+
+ def test_merge_into(self, spark):
+ """Test MERGE INTO with WHEN MATCHED and WHEN NOT MATCHED."""
+ # Create target table
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 10),
+ (2, 'Bob', 20),
+ (3, 'Charlie', 30)
+ """)
+
+ # Create source data as temp view
+ source_data = [(2, "Bob_Updated", 200), (4, "Diana", 40)]
+ source_df = spark.createDataFrame(source_data, ["id", "name", "value"])
+ source_df.createOrReplaceTempView("source")
+
+ # Merge: update matching rows, insert new rows
+ spark.sql("""
+ MERGE INTO default.test_table t
+ USING source s
+ ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET name = s.name, value = s.value
+ WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value)
+ """)
+
+ result = spark.table("default.test_table").orderBy("id").collect()
+
+ assert len(result) == 4
+
+ # Check updated row
+ bob_row = [r for r in result if r.id == 2][0]
+ assert bob_row.name == "Bob_Updated"
+ assert bob_row.value == 200
+
+ # Check inserted row
+ diana_row = [r for r in result if r.id == 4][0]
+ assert diana_row.name == "Diana"
+ assert diana_row.value == 40
+
+
+class TestDMLAddColumn:
+    """Test DML ADD COLUMNS FROM operations for schema evolution with backfill."""
+
+ def test_add_column_from_view(self, spark):
+ """Test ALTER TABLE ADD COLUMNS FROM with single column."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 100),
+ (2, 'Bob', 200),
+ (3, 'Charlie', 300)
+ """)
+
+        # Create a temp view with the Lance metadata columns (_rowaddr, _fragid) plus the new column to backfill
+ spark.sql("""
+ CREATE TEMPORARY VIEW tmp_view AS
+ SELECT _rowaddr, _fragid, value * 2 as doubled_value
+ FROM default.test_table
+ """)
+
+ # Add column from the view
+ spark.sql("""
+ ALTER TABLE default.test_table ADD COLUMNS doubled_value FROM tmp_view
+ """)
+
+ # Verify the new column was added with correct values
+ result = spark.sql("""
+ SELECT id, name, value, doubled_value
+ FROM default.test_table
+ ORDER BY id
+ """).collect()
+
+ assert len(result) == 3
+ assert result[0].doubled_value == 200 # 100 * 2
+ assert result[1].doubled_value == 400 # 200 * 2
+ assert result[2].doubled_value == 600 # 300 * 2
+
+ def test_add_multiple_columns(self, spark):
+ """Test ALTER TABLE ADD COLUMNS FROM with multiple columns."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 100),
+ (2, 'Bob', 200),
+ (3, 'Charlie', 300)
+ """)
+
+ # Create temp view with multiple new columns
+ spark.sql("""
+ CREATE TEMPORARY VIEW tmp_view AS
+ SELECT _rowaddr, _fragid,
+ value * 2 as doubled,
+ value + 50 as plus_fifty,
+ CONCAT(name, '_suffix') as name_with_suffix
+ FROM default.test_table
+ """)
+
+ # Add multiple columns
+ spark.sql("""
+ ALTER TABLE default.test_table ADD COLUMNS doubled, plus_fifty, name_with_suffix FROM tmp_view
+ """)
+
+ # Verify all columns were added
+ result = spark.sql("""
+ SELECT id, doubled, plus_fifty, name_with_suffix
+ FROM default.test_table
+ ORDER BY id
+ """).collect()
+
+ assert len(result) == 3
+ assert result[0].doubled == 200
+ assert result[0].plus_fifty == 150
+ assert result[0].name_with_suffix == "Alice_suffix"
+ assert result[1].doubled == 400
+ assert result[1].plus_fifty == 250
+ assert result[1].name_with_suffix == "Bob_suffix"
+
+ def test_add_column_partial_rows(self, spark):
+ """Test ADD COLUMNS FROM with data for only some rows (others get null)."""
+ spark.sql("""
+ CREATE TABLE default.test_table (
+ id INT,
+ name STRING,
+ value INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.test_table VALUES
+ (1, 'Alice', 100),
+ (2, 'Bob', 200),
+ (3, 'Charlie', 300),
+ (4, 'Diana', 400),
+ (5, 'Eve', 500)
+ """)
+
+ # Create temp view with data for only some rows
+ spark.sql("""
+ CREATE TEMPORARY VIEW tmp_view AS
+ SELECT _rowaddr, _fragid, CONCAT('special_', name) as special_name
+ FROM default.test_table
+ WHERE id IN (1, 3, 5)
+ """)
+
+ # Add column - rows not in view should get null
+ spark.sql("""
+ ALTER TABLE default.test_table ADD COLUMNS special_name FROM tmp_view
+ """)
+
+ result = spark.sql("""
+ SELECT id, special_name
+ FROM default.test_table
+ ORDER BY id
+ """).collect()
+
+ assert len(result) == 5
+ assert result[0].special_name == "special_Alice"
+ assert result[1].special_name is None # id=2 not in view
+ assert result[2].special_name == "special_Charlie"
+ assert result[3].special_name is None # id=4 not in view
+ assert result[4].special_name == "special_Eve"
+
+ def test_add_column_computed_values(self, spark):
+ """Test ADD COLUMNS FROM with computed/derived values."""
+ spark.sql("""
+ CREATE TABLE default.employees (
+ id INT,
+ name STRING,
+ salary INT,
+ bonus_percent INT
+ )
+ """)
+
+ spark.sql("""
+ INSERT INTO default.employees VALUES
+ (1, 'Alice', 50000, 10),
+ (2, 'Bob', 60000, 15),
+ (3, 'Charlie', 70000, 20)
+ """)
+
+ # Compute total compensation as a new column
+ spark.sql("""
+ CREATE TEMPORARY VIEW tmp_view AS
+ SELECT _rowaddr, _fragid,
+ salary + (salary * bonus_percent / 100) as total_compensation
+ FROM default.employees
+ """)
+
+ spark.sql("""
+ ALTER TABLE default.employees ADD COLUMNS total_compensation FROM tmp_view
+ """)
+
+ result = spark.sql("""
+ SELECT id, name, salary, bonus_percent, total_compensation
+ FROM default.employees
+ ORDER BY id
+ """).collect()
+
+ assert len(result) == 3
+ assert result[0].total_compensation == 55000 # 50000 + 5000
+ assert result[1].total_compensation == 69000 # 60000 + 9000
+ assert result[2].total_compensation == 84000 # 70000 + 14000
+
+
+if __name__ == "__main__":
+ pytest.main([__file__, "-v"])
diff --git a/docker/versions.mk b/docker/versions.mk
new file mode 100644
index 00000000..385b472d
--- /dev/null
+++ b/docker/versions.mk
@@ -0,0 +1,9 @@
+# Full Spark download versions for Docker images
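+# (selected in the Makefile via computed variable names, e.g. $(SPARK_DOWNLOAD_VERSION_$(SPARK_VERSION)))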
+SPARK_DOWNLOAD_VERSION_3.4 := 3.4.4
+SPARK_DOWNLOAD_VERSION_3.5 := 3.5.8
+SPARK_DOWNLOAD_VERSION_4.0 := 4.0.2
+
+# Py4J versions bundled with each Spark release
+PY4J_VERSION_3.4 := 0.10.9.7
+PY4J_VERSION_3.5 := 0.10.9.7
+PY4J_VERSION_4.0 := 0.10.9.9
diff --git a/lance-spark-3.4_2.12/pom.xml b/lance-spark-3.4_2.12/pom.xml
index f88c87a2..ab5ea4f5 100644
--- a/lance-spark-3.4_2.12/pom.xml
+++ b/lance-spark-3.4_2.12/pom.xml
@@ -15,8 +15,7 @@
jar
- ${scala212.version}
- ${scala212.compat.version}
+ ${arrow14.version}
@@ -180,4 +179,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-3.4_2.13/pom.xml b/lance-spark-3.4_2.13/pom.xml
index 2207ebed..47ef8149 100644
--- a/lance-spark-3.4_2.13/pom.xml
+++ b/lance-spark-3.4_2.13/pom.xml
@@ -17,6 +17,7 @@
${scala213.version}
${scala213.compat.version}
+ ${arrow14.version}
@@ -184,4 +185,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-3.5_2.12/pom.xml b/lance-spark-3.5_2.12/pom.xml
index 5b1119a6..97d0d970 100644
--- a/lance-spark-3.5_2.12/pom.xml
+++ b/lance-spark-3.5_2.12/pom.xml
@@ -14,10 +14,6 @@
Apache Spark 3.5 Connector for Lance
jar
-
- ${scala212.version}
- ${scala212.compat.version}
-
@@ -181,4 +177,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-3.5_2.13/pom.xml b/lance-spark-3.5_2.13/pom.xml
index 56f05daa..c3b1d3c1 100644
--- a/lance-spark-3.5_2.13/pom.xml
+++ b/lance-spark-3.5_2.13/pom.xml
@@ -185,4 +185,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-4.0_2.13/pom.xml b/lance-spark-4.0_2.13/pom.xml
index a7af240e..17a67e65 100644
--- a/lance-spark-4.0_2.13/pom.xml
+++ b/lance-spark-4.0_2.13/pom.xml
@@ -17,8 +17,9 @@
${scala213.version}
${scala213.compat.version}
+ ${arrow18.version}
+ ${java17.release}
4.13.1
- 17
diff --git a/lance-spark-base_2.12/pom.xml b/lance-spark-base_2.12/pom.xml
index 8acbb4c4..bd0d3bf3 100644
--- a/lance-spark-base_2.12/pom.xml
+++ b/lance-spark-base_2.12/pom.xml
@@ -102,4 +102,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java
index 34b4d610..32feddd6 100644
--- a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java
+++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java
@@ -79,7 +79,9 @@ public boolean loadNextBatch() throws IOException {
return false;
}
- /** @return the current batch, the caller responsible for closing the batch */
+ /**
+ * @return the current batch; the caller is responsible for closing the batch
+ */
public ColumnarBatch getCurrentBatch() {
return currentColumnarBatch;
}
diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java
index f5de7a4c..658770d6 100644
--- a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java
+++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentScanner.java
@@ -132,7 +132,9 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
}
}
- /** @return the arrow reader. The caller is responsible for closing the reader */
+ /**
+ * @return the arrow reader. The caller is responsible for closing the reader
+ */
public ArrowReader getArrowReader() {
return scanner.scanBatches();
}
diff --git a/lance-spark-base_2.13/pom.xml b/lance-spark-base_2.13/pom.xml
index 2c01de10..aa19758a 100644
--- a/lance-spark-base_2.13/pom.xml
+++ b/lance-spark-base_2.13/pom.xml
@@ -146,4 +146,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-bundle-3.4_2.12/pom.xml b/lance-spark-bundle-3.4_2.12/pom.xml
index a2ff6448..aba9c2e4 100644
--- a/lance-spark-bundle-3.4_2.12/pom.xml
+++ b/lance-spark-bundle-3.4_2.12/pom.xml
@@ -15,8 +15,7 @@
jar
- ${scala212.version}
- ${scala212.compat.version}
+ ${arrow14.version}
@@ -108,4 +107,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-bundle-3.4_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java b/lance-spark-bundle-3.4_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java
index 69381c91..987faabc 100644
--- a/lance-spark-bundle-3.4_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java
+++ b/lance-spark-bundle-3.4_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java
@@ -23,7 +23,9 @@ private BundleInfo() {
// Utility class
}
- /** @return the bundle name */
+ /**
+ * @return the bundle name
+ */
public static String getBundleName() {
return "lance-spark-bundle-3.4_2.12";
}
diff --git a/lance-spark-bundle-3.4_2.13/pom.xml b/lance-spark-bundle-3.4_2.13/pom.xml
index 06864f0c..0ec0f03c 100644
--- a/lance-spark-bundle-3.4_2.13/pom.xml
+++ b/lance-spark-bundle-3.4_2.13/pom.xml
@@ -17,6 +17,7 @@
${scala213.version}
${scala213.compat.version}
+ ${arrow14.version}
@@ -108,4 +109,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-bundle-3.4_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java b/lance-spark-bundle-3.4_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
index 5227b50a..8bb430e1 100644
--- a/lance-spark-bundle-3.4_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
+++ b/lance-spark-bundle-3.4_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
@@ -23,7 +23,9 @@ private BundleInfo() {
// Utility class
}
- /** @return the bundle name */
+ /**
+ * @return the bundle name
+ */
public static String getBundleName() {
return "lance-spark-bundle-3.4_2.13";
}
diff --git a/lance-spark-bundle-3.5_2.12/pom.xml b/lance-spark-bundle-3.5_2.12/pom.xml
index c227dc0c..9decf88e 100644
--- a/lance-spark-bundle-3.5_2.12/pom.xml
+++ b/lance-spark-bundle-3.5_2.12/pom.xml
@@ -14,10 +14,6 @@
Bundled Jar of Apache Spark 3.5 Connector for Lance
jar
-
- ${scala212.version}
- ${scala212.compat.version}
-
@@ -108,4 +104,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-bundle-3.5_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java b/lance-spark-bundle-3.5_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java
index f0a2719d..c8c40763 100644
--- a/lance-spark-bundle-3.5_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java
+++ b/lance-spark-bundle-3.5_2.12/src/main/java/org/lance/spark/bundle/BundleInfo.java
@@ -23,7 +23,9 @@ private BundleInfo() {
// Utility class
}
- /** @return the bundle name */
+ /**
+ * @return the bundle name
+ */
public static String getBundleName() {
return "lance-spark-bundle-3.5_2.12";
}
diff --git a/lance-spark-bundle-3.5_2.13/pom.xml b/lance-spark-bundle-3.5_2.13/pom.xml
index 53cc92e5..196de1f6 100644
--- a/lance-spark-bundle-3.5_2.13/pom.xml
+++ b/lance-spark-bundle-3.5_2.13/pom.xml
@@ -108,4 +108,4 @@
-
\ No newline at end of file
+
diff --git a/lance-spark-bundle-3.5_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java b/lance-spark-bundle-3.5_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
index 30772b11..e98e4c07 100644
--- a/lance-spark-bundle-3.5_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
+++ b/lance-spark-bundle-3.5_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
@@ -23,7 +23,9 @@ private BundleInfo() {
// Utility class
}
- /** @return the bundle name */
+ /**
+ * @return the bundle name
+ */
public static String getBundleName() {
return "lance-spark-bundle-3.5_2.13";
}
diff --git a/lance-spark-bundle-4.0_2.13/pom.xml b/lance-spark-bundle-4.0_2.13/pom.xml
index 092c78a0..ccb5de47 100644
--- a/lance-spark-bundle-4.0_2.13/pom.xml
+++ b/lance-spark-bundle-4.0_2.13/pom.xml
@@ -17,6 +17,8 @@
${scala213.version}
${scala213.compat.version}
+ ${arrow18.version}
+ ${java17.release}
diff --git a/lance-spark-bundle-4.0_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java b/lance-spark-bundle-4.0_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
index bd3e02ae..a3c42072 100644
--- a/lance-spark-bundle-4.0_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
+++ b/lance-spark-bundle-4.0_2.13/src/main/java/org/lance/spark/bundle/BundleInfo.java
@@ -23,7 +23,9 @@ private BundleInfo() {
// Utility class
}
- /** @return the bundle name */
+ /**
+ * @return the bundle name
+ */
public static String getBundleName() {
return "lance-spark-bundle-4.0_2.13";
}
diff --git a/pom.xml b/pom.xml
index 8c6d014d..7633e0ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,12 @@
2.0.0
0.1.3
+ 14.0.2
+ 15.0.2
+ 18.1.0
+
+ ${arrow15.version}
+
4.9.3
3.4.4
@@ -75,8 +81,8 @@
${scala212.compat.version}
false
- 2.30.0
- 1.7
+ 2.43.0
+ 1.19.2
3.7.5
package
@@ -112,7 +118,9 @@
2.5.2
3.0.2
3.2.5
- 11
+ 11
+ 17
+ ${java11.release}
@@ -132,13 +140,48 @@
-
org.apache.spark
spark-sql_${scala.compat.version}
${spark.version}
provided
+
+
+ org.apache.arrow
+ arrow-vector
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-format
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-memory-core
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-memory-netty
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-memory-netty-buffer-patch
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-c-data
+ ${arrow.version}
+
+
+ org.apache.arrow
+ arrow-dataset
+ ${arrow.version}
+
@@ -417,56 +460,6 @@
-
- scala-2.12
-
- false
-
-
- ${scala212.version}
- ${scala212.compat.version}
-
-
-
- scala-2.13
-
- false
-
-
- ${scala213.version}
- ${scala213.compat.version}
-
-
-
- spark-3.4
-
- false
-
-
- ${spark34.version}
- ${spark34.compat.version}
-
-
-
- spark-3.5
-
- false
-
-
- ${spark35.version}
- ${spark35.compat.version}
-
-
-
- spark-4.0
-
- false
-
-
- ${spark40.version}
- ${spark40.compat.version}
-
-
deploy-to-ossrh