Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
42 changes: 28 additions & 14 deletions .github/workflows/velox_backend_x86.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ jobs:
fail-fast: false
matrix:
os: [ "ubuntu:20.04", "ubuntu:22.04" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1" ]
java: [ "java-8", "java-11", "java-17", "java-21" ]
# Spark supports JDK17 since 3.3.
exclude:
Expand Down Expand Up @@ -141,6 +141,10 @@ jobs:
java: java-8
- spark: spark-4.0
java: java-11
- spark: spark-4.1
java: java-8
- spark: spark-4.1
java: java-11

runs-on: ubuntu-22.04
container: ${{ matrix.os }}
Expand Down Expand Up @@ -182,11 +186,14 @@ jobs:
cd $GITHUB_WORKSPACE/
export JAVA_HOME=/usr/lib/jvm/${{ matrix.java }}-openjdk-amd64
echo "JAVA_HOME: $JAVA_HOME"
if [ "${{ matrix.spark }}" = "spark-4.0" ]; then
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
else
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
fi
case "${{ matrix.spark }}" in
spark-4.0|spark-4.1)
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
;;
*)
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
;;
esac
cd $GITHUB_WORKSPACE/tools/gluten-it
$GITHUB_WORKSPACE/$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }}
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
Expand All @@ -200,7 +207,7 @@ jobs:
fail-fast: false
matrix:
os: [ "centos:8" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0" ]
spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1" ]
java: [ "java-8", "java-11", "java-17" ]
# Spark supports JDK17 since 3.3.
exclude:
Expand All @@ -220,6 +227,10 @@ jobs:
java: java-8
- spark: spark-4.0
java: java-11
- spark: spark-4.1
java: java-8
- spark: spark-4.1
java: java-11

runs-on: ubuntu-22.04
container: ${{ matrix.os }}
Expand Down Expand Up @@ -263,11 +274,14 @@ jobs:
run: |
echo "JAVA_HOME: $JAVA_HOME"
cd $GITHUB_WORKSPACE/
if [ "${{ matrix.spark }}" = "spark-4.0" ]; then
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
else
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
fi
case "${{ matrix.spark }}" in
spark-4.0|spark-4.1)
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
;;
*)
$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
;;
esac
cd $GITHUB_WORKSPACE/tools/gluten-it
$GITHUB_WORKSPACE/build/mvn clean install -P${{ matrix.spark }} -P${{ matrix.java }}
- name: Run TPC-H / TPC-DS
Expand Down Expand Up @@ -1521,7 +1535,7 @@ jobs:
export PATH=$JAVA_HOME/bin:$PATH
java -version
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
if: always()
Expand Down Expand Up @@ -1570,7 +1584,7 @@ jobs:
export PATH=$JAVA_HOME/bin:$PATH
java -version
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
if: always()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.catalyst.trees.TernaryLike
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataType
import org.apache.spark.task.TaskResources
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.sketch.BloomFilter

import java.io.Serializable

/**
* Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla
* Spark so produces different intermediate aggregate data. Thus we use different filter function /
Expand Down Expand Up @@ -61,6 +65,15 @@ case class VeloxBloomFilterAggregate(
.toLong
)

// Mark as lazy so that `updater` is not evaluated during tree transformation.
private lazy val updater: BloomFilterUpdater = child.dataType match {
case LongType => LongUpdater
case IntegerType => IntUpdater
case ShortType => ShortUpdater
case ByteType => ByteUpdater
case _: StringType => BinaryUpdater
}

override def first: Expression = child

override def second: Expression = estimatedNumItemsExpression
Expand Down Expand Up @@ -97,7 +110,7 @@ case class VeloxBloomFilterAggregate(
if (value == null) {
return buffer
}
buffer.putLong(value.asInstanceOf[Long])
updater.update(buffer, value)
buffer
}

Expand Down Expand Up @@ -128,3 +141,33 @@ case class VeloxBloomFilterAggregate(
copy(inputAggBufferOffset = newOffset)

}

// see https://github.com/apache/spark/pull/42414
/**
 * Strategy for inserting one non-null input value into a [[BloomFilter]] aggregation buffer.
 * A concrete updater is selected per input data type (see the `updater` match on `child.dataType`
 * above): integral types are widened to `Long`, strings are inserted by their bytes.
 */
private trait BloomFilterUpdater {
  // Returns the result of the underlying BloomFilter put call.
  def update(bf: BloomFilter, v: Any): Boolean
}

/** Inserts a `Long` input value into the filter as-is. */
private object LongUpdater extends BloomFilterUpdater with Serializable {
  override def update(bf: BloomFilter, v: Any): Boolean = {
    val longValue = v.asInstanceOf[Long]
    bf.putLong(longValue)
  }
}

/** Inserts an `Int` input value, widened to `Long`, into the filter. */
private object IntUpdater extends BloomFilterUpdater with Serializable {
  override def update(bf: BloomFilter, v: Any): Boolean = {
    val widened: Long = v.asInstanceOf[Int].toLong
    bf.putLong(widened)
  }
}

/** Inserts a `Short` input value, widened to `Long`, into the filter. */
private object ShortUpdater extends BloomFilterUpdater with Serializable {
  override def update(bf: BloomFilter, v: Any): Boolean = {
    val widened: Long = v.asInstanceOf[Short].toLong
    bf.putLong(widened)
  }
}

/** Inserts a `Byte` input value, widened to `Long`, into the filter. */
private object ByteUpdater extends BloomFilterUpdater with Serializable {
  override def update(bf: BloomFilter, v: Any): Boolean = {
    val widened: Long = v.asInstanceOf[Byte].toLong
    bf.putLong(widened)
  }
}

/** Inserts a string input value into the filter using its underlying byte representation. */
private object BinaryUpdater extends BloomFilterUpdater with Serializable {
  override def update(bf: BloomFilter, v: Any): Boolean = {
    val bytes = v.asInstanceOf[UTF8String].getBytes
    bf.putBinary(bytes)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,24 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
conf: SQLConf): RDD[CachedBatch] = {
val localSchema = toStructType(schema)
if (!validateSchema(localSchema)) {
// we can not use columnar cache here, as the `RowToColumnar` does not support this schema
return rowBasedCachedBatchSerializer.convertInternalRowToCachedBatch(
// we cannot use columnar cache here, as the `RowToColumnar` does not support this schema
rowBasedCachedBatchSerializer.convertInternalRowToCachedBatch(
input,
schema,
storageLevel,
conf)
} else {
val numRows = conf.columnBatchSize
val rddColumnarBatch = input.mapPartitions {
it =>
RowToVeloxColumnarExec.toColumnarBatchIterator(
it,
localSchema,
numRows,
VeloxConfig.get.veloxPreferredBatchBytes)
}
convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
}

val numRows = conf.columnBatchSize
val rddColumnarBatch = input.mapPartitions {
it =>
RowToVeloxColumnarExec.toColumnarBatchIterator(
it,
localSchema,
numRows,
VeloxConfig.get.veloxPreferredBatchBytes)
}
convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
}

override def convertCachedBatchToInternalRow(
Expand All @@ -141,18 +141,18 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
selectedAttributes: Seq[Attribute],
conf: SQLConf): RDD[InternalRow] = {
if (!validateSchema(cacheAttributes)) {
// if we do not support this schema that means we are using row-based serializer,
// if we do not support this schema, that means we are using row-based serializer,
// see `convertInternalRowToCachedBatch`, so fallback to vanilla Spark serializer
return rowBasedCachedBatchSerializer.convertCachedBatchToInternalRow(
rowBasedCachedBatchSerializer.convertCachedBatchToInternalRow(
input,
cacheAttributes,
selectedAttributes,
conf)
} else {
val rddColumnarBatch =
convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
rddColumnarBatch.mapPartitions(it => VeloxColumnarToRowExec.toRowIterator(it))
}

val rddColumnarBatch =
convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
rddColumnarBatch.mapPartitions(it => VeloxColumnarToRowExec.toRowIterator(it))
}

override def convertColumnarBatchToCachedBatch(
Expand Down Expand Up @@ -190,58 +190,68 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
cacheAttributes: Seq[Attribute],
selectedAttributes: Seq[Attribute],
conf: SQLConf): RDD[ColumnarBatch] = {
// Find the ordinals and data types of the requested columns.
val requestedColumnIndices = selectedAttributes.map {
a => cacheAttributes.map(_.exprId).indexOf(a.exprId)
}
val shouldSelectAttributes = cacheAttributes != selectedAttributes
val localSchema = toStructType(cacheAttributes)
val timezoneId = SQLConf.get.sessionLocalTimeZone
input.mapPartitions {
it =>
val runtime = Runtimes.contextInstance(
BackendsApiManager.getBackendName,
"ColumnarCachedBatchSerializer#read")
val jniWrapper = ColumnarBatchSerializerJniWrapper
.create(runtime)
val schema = SparkArrowUtil.toArrowSchema(localSchema, timezoneId)
val arrowAlloc = ArrowBufferAllocators.contextInstance()
val cSchema = ArrowSchema.allocateNew(arrowAlloc)
ArrowAbiUtil.exportSchema(arrowAlloc, schema, cSchema)
val deserializerHandle = jniWrapper
.init(cSchema.memoryAddress())
cSchema.close()

Iterators
.wrap(new Iterator[ColumnarBatch] {
override def hasNext: Boolean = it.hasNext

override def next(): ColumnarBatch = {
val cachedBatch = it.next().asInstanceOf[CachedColumnarBatch]
val batchHandle =
jniWrapper
.deserialize(deserializerHandle, cachedBatch.bytes)
val batch = ColumnarBatches.create(batchHandle)
if (shouldSelectAttributes) {
try {
ColumnarBatches.select(
BackendsApiManager.getBackendName,
batch,
requestedColumnIndices.toArray)
} finally {
batch.close()
if (!validateSchema(cacheAttributes)) {
// if we do not support this schema, that means we are using row-based serializer,
// see `convertInternalRowToCachedBatch`, so fallback to vanilla Spark serializer
rowBasedCachedBatchSerializer.convertCachedBatchToColumnarBatch(
input,
cacheAttributes,
selectedAttributes,
conf)
} else {
// Find the ordinals and data types of the requested columns.
val requestedColumnIndices = selectedAttributes.map {
a => cacheAttributes.map(_.exprId).indexOf(a.exprId)
}
val shouldSelectAttributes = cacheAttributes != selectedAttributes
val localSchema = toStructType(cacheAttributes)
val timezoneId = SQLConf.get.sessionLocalTimeZone
input.mapPartitions {
it =>
val runtime = Runtimes.contextInstance(
BackendsApiManager.getBackendName,
"ColumnarCachedBatchSerializer#read")
val jniWrapper = ColumnarBatchSerializerJniWrapper
.create(runtime)
val schema = SparkArrowUtil.toArrowSchema(localSchema, timezoneId)
val arrowAlloc = ArrowBufferAllocators.contextInstance()
val cSchema = ArrowSchema.allocateNew(arrowAlloc)
ArrowAbiUtil.exportSchema(arrowAlloc, schema, cSchema)
val deserializerHandle = jniWrapper
.init(cSchema.memoryAddress())
cSchema.close()

Iterators
.wrap(new Iterator[ColumnarBatch] {
override def hasNext: Boolean = it.hasNext

override def next(): ColumnarBatch = {
val cachedBatch = it.next().asInstanceOf[CachedColumnarBatch]
val batchHandle =
jniWrapper
.deserialize(deserializerHandle, cachedBatch.bytes)
val batch = ColumnarBatches.create(batchHandle)
if (shouldSelectAttributes) {
try {
ColumnarBatches.select(
BackendsApiManager.getBackendName,
batch,
requestedColumnIndices.toArray)
} finally {
batch.close()
}
} else {
batch
}
} else {
batch
}
})
.protectInvocationFlow()
.recycleIterator {
jniWrapper.close(deserializerHandle)
}
})
.protectInvocationFlow()
.recycleIterator {
jniWrapper.close(deserializerHandle)
}
.recyclePayload(_.close())
.create()
.recyclePayload(_.close())
.create()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,14 @@ object ExpressionConverter extends SQLConfHelper with Logging {
case t: TransformKeys =>
// default is `EXCEPTION`
val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)
if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {

// Calling `.toString` on both sides ensures compatibility across all Spark versions.
// Starting from Spark 4.1, `SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)` returns
// an enum instead of a String. Without `.toString`, the comparison
// `mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString` would silently fail
// in tests, producing only a "Comparing unrelated types" warning in IntelliJ IDEA,
// but no compile-time error.
if (mapKeyDedupPolicy.toString == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// TODO: Remove after fix ready for
// https://github.com/facebookincubator/velox/issues/10219
throw new GlutenNotSupportException(
Expand Down
6 changes: 6 additions & 0 deletions gluten-ut/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,5 +230,11 @@
<module>spark40</module>
</modules>
</profile>
<profile>
<id>spark-4.1</id>
<modules>
<module>spark41</module>
</modules>
</profile>
</profiles>
</project>
Loading