diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java index 3d9157a8ef89..62dd5766477d 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java @@ -89,11 +89,11 @@ public boolean isCompressed() { @Override public void close() { - GlutenException.wrap( - () -> { - channel.close(); - in.close(); - return null; - }); + try { + channel.close(); + in.close(); + } catch (Exception e) { + throw new GlutenException(e); + } } } diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java index 0eb921579916..ca6ad213dce6 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java @@ -65,10 +65,10 @@ public boolean isCompressed() { @Override public void close() { - GlutenException.wrap( - () -> { - in.close(); - return null; - }); + try { + in.close(); + } catch (Exception e) { + throw new GlutenException(e); + } } } diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java index ea7a1e236c0f..0b1bd60be0d8 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java @@ -37,24 +37,26 @@ public OnHeapCopyShuffleInputStream(InputStream in, boolean isCompressed) { @Override public long read(long destAddress, long maxReadSize) { - return GlutenException.wrap( - () -> { - int maxReadSize32 = Math.toIntExact(maxReadSize); - if (buffer == null || maxReadSize32 > buffer.length) { - this.buffer = new byte[maxReadSize32]; - } - // The code conducts copy as long as 'in' wraps off-heap data, - // which is about to be moved to heap - int read = in.read(buffer, 0, maxReadSize32); - if (read == -1 || read == 0) { - return 0; - } - // The code conducts copy, from heap to off-heap - // memCopyFromHeap(buffer, destAddress, read); - PlatformDependent.copyMemory(buffer, 0, destAddress, read); - bytesRead += read; - return read; - }); + try { + + int maxReadSize32 = Math.toIntExact(maxReadSize); + if (buffer == null || maxReadSize32 > buffer.length) { + this.buffer = new byte[maxReadSize32]; + } + // The code conducts copy as long as 'in' wraps off-heap data, + // which is about to be moved to heap + int read = in.read(buffer, 0, maxReadSize32); + if (read == -1 || read == 0) { + return 0; + } + // The code conducts copy, from heap to off-heap + // memCopyFromHeap(buffer, destAddress, read); + PlatformDependent.copyMemory(buffer, 0, destAddress, read); + bytesRead += read; + return read; + } catch (Exception e) { + throw new GlutenException(e); + } } @Override @@ -69,11 +71,11 @@ public boolean isCompressed() { @Override public void close() { - GlutenException.wrap( - () -> { - in.close(); - in = null; - return null; - }); + try { + in.close(); + in = null; + } catch (Exception e) { + throw new GlutenException(e); + } } } diff --git a/backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend b/backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend similarity index 100% rename from backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend rename to backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 69ea899c42a5..857fa571c060 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -17,10 +17,12 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig} +import org.apache.gluten.backend.Backend import org.apache.gluten.backendsapi._ import org.apache.gluten.execution.WriteFilesExecTransformer import org.apache.gluten.expression.WindowFunctionsBuilder import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._ @@ -43,10 +45,11 @@ import java.util.Locale import scala.util.control.Breaks.{break, breakable} -class CHBackend extends Backend { +class CHBackend extends SubstraitBackend { override def name(): String = CHBackend.BACKEND_NAME - override def buildInfo(): BackendBuildInfo = - BackendBuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN") + override def batchType: Convention.BatchType = CHBatch + override def buildInfo(): Backend.BuildInfo = + Backend.BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN") override def iteratorApi(): IteratorApi = new CHIteratorApi override def sparkPlanExecApi(): SparkPlanExecApi = new CHSparkPlanExecApi override def transformerApi(): TransformerApi = new CHTransformerApi diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index f33e767e13e0..d3a0b7e28e54 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -20,13 +20,13 @@ import org.apache.gluten.GlutenNumaBindingInfo import org.apache.gluten.backendsapi.IteratorApi import org.apache.gluten.execution._ import org.apache.gluten.expression.ConverterUtils +import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.memory.CHThreadGroup import org.apache.gluten.metrics.{IMetrics, NativeMetrics} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.substrait.rel._ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat -import org.apache.gluten.utils.LogLevelUtil import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator} import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala index c53448cdd858..d84181fec1da 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala @@ -17,9 +17,9 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.gluten.backendsapi.MetricsApi +import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.metrics._ import org.apache.gluten.substrait.{AggregationParams, JoinParams} -import org.apache.gluten.utils.LogLevelUtil import org.apache.spark.SparkContext import org.apache.spark.internal.Logging diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 04644d4a2970..d5e079508159 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -26,6 +26,7 @@ import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector} import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector} import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser} import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.utils.PhysicalPlanSelector import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite} import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter} @@ -34,6 +35,8 @@ import org.apache.spark.util.SparkPlanRules class CHRuleApi extends RuleApi { import CHRuleApi._ override def injectRules(injector: RuleInjector): Unit = { + injector.gluten.skipOn(PhysicalPlanSelector.skipCond) + injectSpark(injector.spark) injectLegacy(injector.gluten.legacy) injectRas(injector.gluten.ras) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 1108b8b3c501..1f4836b2ec4c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -24,7 +24,6 @@ import org.apache.gluten.expression._ import org.apache.gluten.extension.ExpressionExtensionTrait import org.apache.gluten.extension.columnar.AddFallbackTagRule import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides -import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy} @@ -66,9 +65,6 @@ import scala.collection.mutable.ArrayBuffer class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { - /** The columnar-batch type this backend is using. */ - override def batchType: Convention.BatchType = CHBatch - /** Transform GetArrayItem to Substrait. */ override def genGetArrayItemTransformer( substraitExprName: String, diff --git a/backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend b/backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend similarity index 100% rename from backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend rename to backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index ecd053a632f3..19985d6b858d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -17,11 +17,14 @@ package org.apache.gluten.backendsapi.velox import org.apache.gluten.{GlutenConfig, VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME} +import org.apache.gluten.backend.Backend import org.apache.gluten.backendsapi._ import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution.WriteFilesExecTransformer import org.apache.gluten.expression.WindowFunctionsBuilder import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat} @@ -32,6 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, De import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile} import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.execution.ColumnarCachedBatchSerializer +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -43,10 +48,17 @@ import org.apache.hadoop.fs.Path import scala.util.control.Breaks.breakable -class VeloxBackend extends Backend { +class VeloxBackend extends SubstraitBackend { override def name(): String = VeloxBackend.BACKEND_NAME - override def buildInfo(): BackendBuildInfo = - BackendBuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME) + override def batchType: Convention.BatchType = VeloxBatch + override def batchTypeFunc(): BatchOverride = { + case i: InMemoryTableScanExec + if i.supportsColumnar && i.relation.cacheBuilder.serializer + .isInstanceOf[ColumnarCachedBatchSerializer] => + VeloxBatch + } + override def buildInfo(): Backend.BuildInfo = + Backend.BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME) override def iteratorApi(): IteratorApi = new VeloxIteratorApi override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi override def transformerApi(): TransformerApi = new VeloxTransformerApi @@ -454,7 +466,5 @@ object VeloxBackendSettings extends BackendSettingsApi { override def supportColumnarArrowUdf(): Boolean = true - override def generateHdfsConfForLibhdfs(): Boolean = true - override def needPreComputeRangeFrameBoundary(): Boolean = true } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala index 02597f815178..2cec8d392370 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala @@ -25,7 +25,7 @@ import org.apache.gluten.jni.{JniLibLoader, JniWorkspace} import org.apache.gluten.udf.UdfJniWrapper import org.apache.gluten.utils._ -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext} import org.apache.spark.api.plugin.PluginContext import org.apache.spark.internal.Logging import org.apache.spark.network.util.ByteUnit @@ -44,6 +44,9 @@ class VeloxListenerApi extends ListenerApi with Logging { override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = { val conf = pc.conf() + // Generate HDFS client configurations. + HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc) + // Overhead memory limits. val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY) val desiredOverheadSize = (0.1 * offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384)) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 6204ab092a39..7f95c78bed64 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -28,6 +28,7 @@ import org.apache.gluten.extension.columnar.transition.{InsertTransitions, Remov import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector} import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector} import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.utils.PhysicalPlanSelector import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter} @@ -35,6 +36,8 @@ class VeloxRuleApi extends RuleApi { import VeloxRuleApi._ override def injectRules(injector: RuleInjector): Unit = { + injector.gluten.skipOn(PhysicalPlanSelector.skipCond) + injectSpark(injector.spark) injectLegacy(injector.gluten.legacy) injectRas(injector.gluten.ras) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 4a9bfef5520a..64c212690fb3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -23,8 +23,6 @@ import org.apache.gluten.execution._ import org.apache.gluten.expression._ import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet} import org.apache.gluten.extension.columnar.FallbackTags -import org.apache.gluten.extension.columnar.transition.Convention -import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializeResult} @@ -42,7 +40,6 @@ import org.apache.spark.sql.catalyst.optimizer.BuildSide import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode} @@ -61,23 +58,6 @@ import javax.ws.rs.core.UriBuilder class VeloxSparkPlanExecApi extends SparkPlanExecApi { - /** The columnar-batch type this backend is using. */ - override def batchType: Convention.BatchType = { - VeloxBatch - } - - /** - * Overrides [[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is using to - * determine the convention (its row-based processing / columnar-batch processing support) of a - * plan with a user-defined function that accepts a plan then returns batch type it outputs. - */ - override def batchTypeFunc(): BatchOverride = { - case i: InMemoryTableScanExec - if i.supportsColumnar && i.relation.cacheBuilder.serializer - .isInstanceOf[ColumnarCachedBatchSerializer] => - VeloxBatch - } - /** Transform GetArrayItem to Substrait. */ override def genGetArrayItemTransformer( substraitExprName: String, diff --git a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java index 9e5843060255..21173115991d 100644 --- a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java +++ b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java @@ -22,25 +22,26 @@ import com.codahale.metrics.MetricRegistry; import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; import org.apache.spark.api.plugin.PluginContext; import org.apache.spark.resource.ResourceInformation; +import org.jetbrains.annotations.NotNull; +import org.junit.AfterClass; import org.junit.BeforeClass; import java.io.IOException; import java.util.Map; -/** For testing Velox backend without starting a Spark context. */ public abstract class VeloxBackendTestBase { + private static final ListenerApi API = new VeloxListenerApi(); + @BeforeClass public static void setup() { - final ListenerApi api = new VeloxListenerApi(); - api.onDriverStart(mockSparkContext(), mockPluginContext()); + API.onExecutorStart(mockPluginContext()); } - private static SparkContext mockSparkContext() { - // Not yet implemented. - return null; + @AfterClass + public static void tearDown() { + API.onExecutorShutdown(); } private static PluginContext mockPluginContext() { @@ -52,9 +53,7 @@ public MetricRegistry metricRegistry() { @Override public SparkConf conf() { - final SparkConf conf = new SparkConf(); - conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g"); - return conf; + return newSparkConf(); } @Override @@ -83,4 +82,11 @@ public Object ask(Object message) throws Exception { } }; } + + @NotNull + private static SparkConf newSparkConf() { + final SparkConf conf = new SparkConf(); + conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g"); + return conf; + } } diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml index 448bc9ee2d13..5e077a8b7db5 100644 --- a/gluten-core/pom.xml +++ b/gluten-core/pom.xml @@ -31,6 +31,11 @@ ${project.version} compile + + org.apache.gluten + gluten-ui + ${project.version} + org.apache.spark diff --git a/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java b/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java index 147e69719789..adbc219df04b 100644 --- a/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java +++ b/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java @@ -16,8 +16,6 @@ */ package org.apache.gluten.exception; -import java.util.concurrent.Callable; - public class GlutenException extends RuntimeException { public GlutenException() {} @@ -33,12 +31,4 @@ public GlutenException(String message, Throwable cause) { public GlutenException(Throwable cause) { super(cause); } - - public static V wrap(Callable callable) { - try { - return callable.call(); - } catch (Exception e) { - throw new GlutenException(e); - } - } } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java index b7f15d830bed..ea18a05d0c60 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java @@ -18,11 +18,13 @@ import org.apache.gluten.GlutenConfig; +import org.apache.spark.annotation.Experimental; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicLong; +@Experimental public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget { private static final Logger LOG = LoggerFactory.getLogger(DynamicOffHeapSizingMemoryTarget.class); private final MemoryTarget delegated; diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java index bb1e7102b1c3..6f7cc9bd9c9c 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java @@ -20,6 +20,7 @@ import org.apache.gluten.memory.MemoryUsageStatsBuilder; import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers; +import org.apache.spark.annotation.Experimental; import org.apache.spark.memory.TaskMemoryManager; import java.util.Map; @@ -42,6 +43,7 @@ public static MemoryTarget overAcquire( return new OverAcquire(target, overTarget, overAcquiredRatio); } + @Experimental public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) { if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) { return new DynamicOffHeapSizingMemoryTarget(memoryTarget); diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala b/gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala similarity index 99% rename from gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala rename to gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala index 222c7f91e34c..7cb0b2fa2617 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala +++ b/gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala @@ -19,18 +19,15 @@ package org.apache.gluten.softaffinity import org.apache.gluten.GlutenConfig import org.apache.gluten.softaffinity.strategy.SoftAffinityStrategy import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.utils.LogLevelUtil - import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd} import org.apache.spark.sql.execution.datasources.FilePartition - import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import org.apache.gluten.logging.LogLevelUtil import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantReadWriteLock - import scala.collection.mutable import scala.util.Random diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala rename to gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala similarity index 92% rename from gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala rename to gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala index 7a45a07b4574..bc36c3b1ea0d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala +++ b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala @@ -18,7 +18,7 @@ package org.apache.gluten.softaffinity.strategy import org.apache.spark.internal.Logging -import scala.collection.mutable.LinkedHashSet +import scala.collection.mutable import scala.collection.mutable.ListBuffer class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging { @@ -32,7 +32,7 @@ class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging { } else { val candidatesSize = candidates.size val halfCandidatesSize = candidatesSize / softAffinityReplicationNum - val resultSet = new LinkedHashSet[(String, String)] + val resultSet = new mutable.LinkedHashSet[(String, String)] // TODO: try to use ConsistentHash val mod = file.hashCode % candidatesSize @@ -41,7 +41,7 @@ class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging { if (candidates(c1).isDefined) { resultSet.add(candidates(c1).get) } - for (i <- 1 to (softAffinityReplicationNum - 1)) { + for (i <- 1 until softAffinityReplicationNum) { val c2 = (c1 + halfCandidatesSize + i) % candidatesSize if (candidates(c2).isDefined) { resultSet.add(candidates(c2).get) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala similarity index 98% rename from gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala rename to gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala index cfdce536062e..0e439d245cf7 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala +++ b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala @@ -16,9 +16,8 @@ */ package org.apache.spark.softaffinity +import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager} -import org.apache.gluten.utils.LogLevelUtil - import org.apache.spark.internal.Logging import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.execution.datasources.FilePartition diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala similarity index 89% rename from gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala rename to gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala index b9ffd62dcd85..d6b9a8d70c92 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala +++ b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.softaffinity.scheduler +package org.apache.spark.softaffinity import org.apache.gluten.softaffinity.SoftAffinityManager - +import org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ class SoftAffinityListener extends SparkListener with Logging { - override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { SoftAffinityManager.updateStageMap(event) } @@ -46,3 +45,9 @@ class SoftAffinityListener extends SparkListener with Logging { SoftAffinityManager.handleExecutorRemoved(execId) } } + +object SoftAffinityListener { + def register(sc: SparkContext): Unit = { + sc.listenerBus.addToStatusQueue(new SoftAffinityListener()) + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala similarity index 83% rename from gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala rename to gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index 66bab72bf577..72998c0b0637 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -17,19 +17,18 @@ package org.apache.gluten import org.apache.gluten.GlutenConfig.GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY -import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backend.Backend import org.apache.gluten.events.GlutenBuildInfoEvent import org.apache.gluten.exception.GlutenException -import org.apache.gluten.extension.GlutenSessionExtensions.{GLUTEN_SESSION_EXTENSION_NAME, SPARK_SESSION_EXTS_KEY} +import org.apache.gluten.extension.GlutenSessionExtensions import org.apache.gluten.task.TaskListener -import org.apache.gluten.test.TestStats -import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext, TaskFailedReason} +import org.apache.spark.{SparkConf, SparkContext, TaskFailedReason} import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} import org.apache.spark.internal.Logging -import org.apache.spark.listener.GlutenListenerFactory import org.apache.spark.network.util.JavaUtils -import org.apache.spark.sql.execution.ui.GlutenEventUtils +import org.apache.spark.softaffinity.SoftAffinityListener +import org.apache.spark.sql.execution.ui.{GlutenEventUtils, GlutenSQLAppStatusListener} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.task.TaskResources import org.apache.spark.util.SparkResourceUtil @@ -54,23 +53,21 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { _sc = Some(sc) - GlutenEventUtils.registerListener(sc) + GlutenSQLAppStatusListener.register(sc) postBuildInfoEvent(sc) val conf = pluginContext.conf() - if (conf.getBoolean(GlutenConfig.UT_STATISTIC.key, defaultValue = false)) { - // Only statistic in UT, not thread safe - TestStats.beginStatistic() - } setPredefinedConfigs(sc, conf) - if (BackendsApiManager.getSettings.generateHdfsConfForLibhdfs()) { - HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc) + // Initialize Backends API. + Backend.get().onDriverStart(sc, pluginContext) + if ( + sc.getConf.getBoolean( + GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED, + GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE) + ) { + SoftAffinityListener.register(sc) } - // Initialize Backends API - BackendsApiManager.initialize() - BackendsApiManager.getListenerApiInstance.onDriverStart(sc, pluginContext) - GlutenListenerFactory.addToSparkListenerBus(sc) Collections.emptyMap() } @@ -86,11 +83,11 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { } override def shutdown(): Unit = { - BackendsApiManager.getListenerApiInstance.onDriverShutdown() + Backend.get().onDriverShutdown() } private def postBuildInfoEvent(sc: SparkContext): Unit = { - val buildInfo = BackendsApiManager.getBuildInfo + val buildInfo = Backend.get().buildInfo() // export gluten version to property to spark System.setProperty("gluten.version", VERSION) @@ -107,10 +104,10 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { glutenBuildInfo.put("Gluten Revision Time", REVISION_TIME) glutenBuildInfo.put("Gluten Build Time", BUILD_DATE) glutenBuildInfo.put("Gluten Repo URL", REPO_URL) - glutenBuildInfo.put("Backend", buildInfo.backend) - glutenBuildInfo.put("Backend Branch", buildInfo.backendBranch) - glutenBuildInfo.put("Backend Revision", buildInfo.backendRevision) - glutenBuildInfo.put("Backend Revision Time", buildInfo.backendRevisionTime) + glutenBuildInfo.put("Backend", buildInfo.name) + glutenBuildInfo.put("Backend Branch", buildInfo.branch) + glutenBuildInfo.put("Backend Revision", buildInfo.revision) + glutenBuildInfo.put("Backend Revision Time", buildInfo.revisionTime) val infoMap = glutenBuildInfo.toMap val loggingInfo = infoMap.toSeq .sortBy(_._1) @@ -130,12 +127,13 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { private def setPredefinedConfigs(sc: SparkContext, conf: SparkConf): Unit = { // sql extensions - val extensions = if (conf.contains(SPARK_SESSION_EXTS_KEY)) { - s"${conf.get(SPARK_SESSION_EXTS_KEY)},$GLUTEN_SESSION_EXTENSION_NAME" + val extensions = if (conf.contains(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY)) { + s"${conf.get(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY)}," + + s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}" } else { - s"$GLUTEN_SESSION_EXTENSION_NAME" + s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}" } - conf.set(SPARK_SESSION_EXTS_KEY, extensions) + conf.set(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY, extensions) // adaptive custom cost evaluator class if (GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableGlutenCostEvaluator) { @@ -230,13 +228,19 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { conservativeOffHeapPerTask.toString) } - // disable vanilla columnar readers, to prevent columnar-to-columnar conversions - if (BackendsApiManager.getSettings.disableVanillaColumnarReaders(conf)) { + // Disable vanilla columnar readers, to prevent columnar-to-columnar conversions. + // FIXME: Do we still need this trick since + // https://github.com/apache/incubator-gluten/pull/1931 was merged? + if ( + !conf.getBoolean( + GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key, + GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get) + ) { // FIXME Hongze 22/12/06 // BatchScan.scala in shim was not always loaded by class loader. // The file should be removed and the "ClassCastException" issue caused by // spark.sql..enableVectorizedReader=true should be fixed in another way. - // Before the issue was fixed we force the use of vanilla row reader by using + // Before the issue is fixed we force the use of vanilla row reader by using // the following statement. conf.set("spark.sql.parquet.enableVectorizedReader", "false") conf.set("spark.sql.orc.enableVectorizedReader", "false") @@ -262,15 +266,13 @@ private[gluten] class GlutenExecutorPlugin extends ExecutorPlugin { override def init(ctx: PluginContext, extraConf: util.Map[String, String]): Unit = { val conf = ctx.conf() - // Initialize Backends API - // TODO categorize the APIs by driver's or executor's - BackendsApiManager.initialize() - BackendsApiManager.getListenerApiInstance.onExecutorStart(ctx) + // Initialize Backends API. + Backend.get().onExecutorStart(ctx) } /** Clean up and terminate this plugin. For example: close the native engine. */ override def shutdown(): Unit = { - BackendsApiManager.getListenerApiInstance.onExecutorShutdown() + Backend.get().onExecutorShutdown() super.shutdown() } diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala new file mode 100644 index 000000000000..5f82a2ee7d32 --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backend + +import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc} +import org.apache.gluten.extension.injector.RuleInjector + +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.PluginContext + +import java.util.ServiceLoader + +import scala.collection.JavaConverters + +trait Backend { + import Backend._ + + /** Base information. */ + def name(): String + def buildInfo(): BuildInfo + + /** Spark listeners. */ + def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {} + def onDriverShutdown(): Unit = {} + def onExecutorStart(pc: PluginContext): Unit = {} + def onExecutorShutdown(): Unit = {} + + /** The columnar-batch type this backend is using. */ + def batchType: Convention.BatchType + + /** + * Overrides [[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is using to + * determine the convention (its row-based processing / columnar-batch processing support) of a + * plan with a user-defined function that accepts a plan then returns batch type it outputs. + */ + def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty + + /** Query planner rules. */ + def injectRules(injector: RuleInjector): Unit +} + +object Backend { + private val be: Backend = { + val discoveredBackends = + JavaConverters.iterableAsScalaIterable(ServiceLoader.load(classOf[Backend])).toSeq + if (discoveredBackends.isEmpty) { + throw new IllegalStateException("Backend implementation not discovered from JVM classpath") + } + if (discoveredBackends.size != 1) { + throw new IllegalStateException( + s"More than one Backend implementation discovered from JVM classpath: " + + s"${discoveredBackends.map(_.name()).toList}") + } + val backend = discoveredBackends.head + backend + } + + def get(): Backend = { + be + } + + case class BuildInfo(name: String, branch: String, revision: String, revisionTime: String) +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala similarity index 96% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala index c5a9afec3210..ce6d50350cc1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala @@ -16,9 +16,9 @@ */ package org.apache.gluten.extension -import org.apache.gluten.extension.columnar._ +import org.apache.gluten.extension.columnar.ColumnarRuleApplier import org.apache.gluten.extension.columnar.transition.Transitions -import org.apache.gluten.utils.LogLevelUtil +import org.apache.gluten.logging.LogLevelUtil import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.vectorized.ColumnarBatch -object ColumnarOverrideRules { +object GlutenColumnarRule { // Utilities to infer columnar rule's caller's property: // ApplyColumnarRulesAndInsertTransitions#outputsColumnar. @@ -92,14 +92,14 @@ object ColumnarOverrideRules { } } -case class ColumnarOverrideRules( +case class GlutenColumnarRule( session: SparkSession, applierBuilder: SparkSession => ColumnarRuleApplier) extends ColumnarRule with Logging with LogLevelUtil { - import ColumnarOverrideRules._ + import GlutenColumnarRule._ /** * Note: Do not implement this API. We basically inject all of Gluten's physical rules through diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala similarity index 92% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala index 4456dda61528..42c9cc94b00c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.extension -import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backend.Backend import org.apache.gluten.extension.injector.RuleInjector import org.apache.spark.sql.SparkSessionExtensions @@ -27,7 +27,7 @@ import java.util.Objects private[gluten] class GlutenSessionExtensions extends (SparkSessionExtensions => Unit) { override def apply(exts: SparkSessionExtensions): Unit = { val injector = new RuleInjector() - BackendsApiManager.getRuleApiInstance.injectRules(injector) + Backend.get().injectRules(injector) injector.inject(exts) } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala similarity index 90% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala index 9b78ccd11de2..b47b2f3388fa 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala @@ -17,9 +17,9 @@ package org.apache.gluten.extension.columnar import org.apache.gluten.GlutenConfig -import org.apache.gluten.extension.columnar.util.AdaptiveContext +import org.apache.gluten.extension.util.AdaptiveContext +import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.metrics.GlutenTimeMetric -import org.apache.gluten.utils.LogLevelUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -74,6 +74,11 @@ object ColumnarRuleApplier { logOnLevel(transformPlanLogLevel, message(plan, out, millisTime)) out } + } + // A temporary workaround for applying toggle `spark.gluten.enabled`, to be removed. + trait SkipCondition { + // True if the rule execution should be skipped. + def skip(session: SparkSession, plan: SparkPlan): Boolean } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala similarity index 82% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala index 6ce4e24ed329..de6ac2a7d912 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala @@ -17,9 +17,9 @@ package org.apache.gluten.extension.columnar.enumerated import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, ColumnarRuleCall} -import org.apache.gluten.extension.columnar.util.AdaptiveContext -import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector} +import org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, ColumnarRuleCall, SkipCondition} +import org.apache.gluten.extension.util.AdaptiveContext +import org.apache.gluten.logging.LogLevelUtil import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging @@ -36,20 +36,24 @@ import org.apache.spark.sql.execution.SparkPlan * implementing them in EnumeratedTransform. */ @Experimental -class EnumeratedApplier(session: SparkSession, ruleBuilders: Seq[ColumnarRuleBuilder]) +class EnumeratedApplier( + session: SparkSession, + skipConditions: Seq[SkipCondition], + ruleBuilders: Seq[ColumnarRuleBuilder]) extends ColumnarRuleApplier with Logging with LogLevelUtil { private val adaptiveContext = AdaptiveContext(session) override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = { + if (skipConditions.exists(_.skip(session, plan))) { + return plan + } val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar) - PhysicalPlanSelector.maybe(session, plan) { - val finalPlan = maybeAqe { - apply0(ruleBuilders.map(b => b(call)), plan) - } - finalPlan + val finalPlan = maybeAqe { + apply0(ruleBuilders.map(b => b(call)), plan) } + finalPlan } private def apply0(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan = { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala similarity index 58% rename from gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala index 721711af5fca..89409e5ad20e 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala @@ -14,22 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.listener +package org.apache.gluten.extension.columnar.heuristic -import org.apache.gluten.GlutenConfig -import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} -import org.apache.spark.SparkContext - -object GlutenListenerFactory { - def addToSparkListenerBus(sc: SparkContext): Unit = { - if ( - sc.getConf.getBoolean( - GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED, - GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE - ) - ) { - sc.listenerBus.addToStatusQueue(new SoftAffinityListener()) - } - } +/** A wrapper to specify the plan is fallback plan, the caller side should unwrap it. */ +case class FallbackNode(fallbackPlan: SparkPlan) extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + override def output: Seq[Attribute] = fallbackPlan.output } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala similarity index 80% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala index 85f44878f2c1..8e612c6aed79 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala @@ -17,9 +17,9 @@ package org.apache.gluten.extension.columnar.heuristic import org.apache.gluten.extension.columnar._ -import org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, ColumnarRuleCall} -import org.apache.gluten.extension.columnar.util.AdaptiveContext -import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector} +import org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, ColumnarRuleCall, SkipCondition} +import org.apache.gluten.extension.util.AdaptiveContext +import org.apache.gluten.logging.LogLevelUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan */ class HeuristicApplier( session: SparkSession, + skipConditions: Seq[SkipCondition], transformBuilders: Seq[ColumnarRuleBuilder], fallbackPolicyBuilders: Seq[ColumnarRuleBuilder], postBuilders: Seq[ColumnarRuleBuilder], @@ -42,28 +43,30 @@ class HeuristicApplier( private val adaptiveContext = AdaptiveContext(session) override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = { + if (skipConditions.exists(_.skip(session, plan))) { + return plan + } val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar) makeRule(call).apply(plan) } - private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] = + private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] = { plan => - PhysicalPlanSelector.maybe(session, plan) { - val finalPlan = prepareFallback(plan) { - p => - val suggestedPlan = transformPlan("transform", transformRules(call), p) - transformPlan("fallback", fallbackPolicies(call), suggestedPlan) match { - case FallbackNode(fallbackPlan) => - // we should use vanilla c2r rather than native c2r, - // and there should be no `GlutenPlan` any more, - // so skip the `postRules()`. - fallbackPlan - case plan => - transformPlan("post", postRules(call), plan) - } - } - transformPlan("final", finalRules(call), finalPlan) + val finalPlan = prepareFallback(plan) { + p => + val suggestedPlan = transformPlan("transform", transformRules(call), p) + transformPlan("fallback", fallbackPolicies(call), suggestedPlan) match { + case FallbackNode(fallbackPlan) => + // we should use vanilla c2r rather than native c2r, + // and there should be no `GlutenPlan` any more, + // so skip the `postRules()`. + fallbackPlan + case plan => + transformPlan("post", postRules(call), plan) + } } + transformPlan("final", finalRules(call), finalPlan) + } private def transformPlan( phase: String, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala similarity index 97% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala index beb80947409a..21662f503e13 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.extension.columnar.transition -import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backend.Backend import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions import org.apache.gluten.sql.shims.SparkShimLoader @@ -60,7 +60,7 @@ object ConventionFunc { return PartialFunction.empty } } - BackendsApiManager.getSparkPlanExecApiInstance.batchTypeFunc() + Backend.get().batchTypeFunc() } private class BuiltinFunc(o: BatchOverride) extends ConventionFunc { @@ -86,7 +86,7 @@ object ConventionFunc { val batchType = if (a.supportsColumnar) { // By default, we execute columnar AQE with backend batch output. // See org.apache.gluten.extension.columnar.transition.InsertTransitions.apply - BackendsApiManager.getSparkPlanExecApiInstance.batchType + Backend.get().batchType } else { Convention.BatchType.None } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala similarity index 94% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala index 65422b38070e..cb76ec4de098 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.extension.columnar.transition -import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backend.Backend import org.apache.spark.sql.execution.SparkPlan @@ -58,7 +58,7 @@ object ConventionReq { val vanillaBatch: ConventionReq = Impl(RowType.Any, BatchType.Is(Convention.BatchType.VanillaBatch)) lazy val backendBatch: ConventionReq = - Impl(RowType.Any, BatchType.Is(BackendsApiManager.getSparkPlanExecApiInstance.batchType)) + Impl(RowType.Any, BatchType.Is(Backend.get().batchType)) def get(plan: SparkPlan): ConventionReq = ConventionFunc.create().conventionReqOf(plan) def of(rowType: RowType, batchType: BatchType): ConventionReq = Impl(rowType, batchType) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala similarity index 96% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala index 602f0303c909..3ba09efefe92 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.extension.columnar.transition -import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backend.Backend import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan @@ -89,7 +89,7 @@ object Transitions { } def toBackendBatchPlan(plan: SparkPlan): SparkPlan = { - val backendBatchType = BackendsApiManager.getSparkPlanExecApiInstance.batchType + val backendBatchType = Backend.get().batchType val out = toBatchPlan(plan, backendBatchType) out } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala similarity index 77% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala index 728e569cc4eb..ca76e61b7bb0 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala @@ -17,9 +17,9 @@ package org.apache.gluten.extension.injector import org.apache.gluten.GlutenConfig -import org.apache.gluten.extension.ColumnarOverrideRules +import org.apache.gluten.extension.GlutenColumnarRule import org.apache.gluten.extension.columnar.ColumnarRuleApplier -import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder +import org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder, SkipCondition} import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier @@ -30,20 +30,25 @@ import scala.collection.mutable /** Injector used to inject query planner rules into Gluten. */ class GlutenInjector private[injector] { import GlutenInjector._ + private val skipConditions: mutable.ListBuffer[SkipCondition] = mutable.ListBuffer() val legacy: LegacyInjector = new LegacyInjector() val ras: RasInjector = new RasInjector() private[injector] def inject(extensions: SparkSessionExtensions): Unit = { - val ruleBuilder = (session: SparkSession) => new ColumnarOverrideRules(session, applier) + val ruleBuilder = (session: SparkSession) => new GlutenColumnarRule(session, applier) extensions.injectColumnar(session => ruleBuilder(session)) } private def applier(session: SparkSession): ColumnarRuleApplier = { val conf = new GlutenConfig(session.sessionState.conf) if (conf.enableRas) { - return ras.createApplier(session) + return ras.createApplier(session, skipConditions.toSeq) } - legacy.createApplier(session) + legacy.createApplier(session, skipConditions.toSeq) + } + + def skipOn(skipCondition: SkipCondition): Unit = { + skipConditions += skipCondition } } @@ -70,9 +75,12 @@ object GlutenInjector { finalBuilders += builder } - private[injector] def createApplier(session: SparkSession): ColumnarRuleApplier = { + private[injector] def createApplier( + session: SparkSession, + skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = { new HeuristicApplier( session, + skipConditions, transformBuilders.toSeq, fallbackPolicyBuilders.toSeq, postBuilders.toSeq, @@ -87,8 +95,10 @@ object GlutenInjector { ruleBuilders += builder } - private[injector] def createApplier(session: SparkSession): ColumnarRuleApplier = { - new EnumeratedApplier(session, ruleBuilders.toSeq) + private[injector] def createApplier( + session: SparkSession, + skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = { + new EnumeratedApplier(session, skipConditions, ruleBuilders.toSeq) } } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala similarity index 98% rename from gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala rename to gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala index de72bc4bc97b..b0f42e79673d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.extension.columnar.util +package org.apache.gluten.extension.util import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.SparkPlan diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/gluten.scala b/gluten-core/src/main/scala/org/apache/gluten/gluten.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/gluten/gluten.scala rename to gluten-core/src/main/scala/org/apache/gluten/gluten.scala diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala b/gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala similarity index 97% rename from gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala rename to gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala index cbd52b2d07f8..07bbde2bad70 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.utils +package org.apache.gluten.logging import org.apache.spark.internal.Logging diff --git a/gluten-substrait/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala similarity index 100% rename from gluten-substrait/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala rename to gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala diff --git a/gluten-substrait/pom.xml b/gluten-substrait/pom.xml index 77bb9f3c33e2..a863b7957d8e 100644 --- a/gluten-substrait/pom.xml +++ b/gluten-substrait/pom.xml @@ -32,11 +32,6 @@ test-jar test - - org.apache.gluten - gluten-ui - ${project.version} - org.apache.spark diff --git a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java index 598ab556fa3f..37d9e09bb5d8 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java @@ -16,6 +16,8 @@ */ package org.apache.gluten.test; +import org.apache.gluten.GlutenConfig; + import java.util.*; /** Only use in UT Env. It's not thread safe. */ @@ -24,7 +26,6 @@ public class TestStats { private static final String ROW_FORMAT = "%s%s%s%s%s%s"; - private static boolean UT_ENV = false; private static final Map caseInfos = new HashMap<>(); private static String currentCase; public static int offloadGlutenUnitNumber = 0; @@ -35,8 +36,8 @@ public class TestStats { public static int suiteTestNumber = 0; public static int offloadGlutenTestNumber = 0; - public static void beginStatistic() { - UT_ENV = true; + private static boolean enabled() { + return GlutenConfig.getConf().collectUtStats(); } public static void reset() { @@ -56,7 +57,7 @@ public static void reset() { public static int totalOffloadGlutenCaseNumber = 0; public static void printMarkdown(String suitName) { - if (!UT_ENV) { + if (!enabled()) { return; } @@ -105,7 +106,7 @@ public static void printMarkdown(String suitName) { } public static void addFallBackClassName(String className) { - if (!UT_ENV) { + if (!enabled()) { return; } @@ -117,7 +118,7 @@ public static void addFallBackClassName(String className) { } public static void addFallBackCase() { - if (!UT_ENV) { + if (!enabled()) { return; } @@ -127,7 +128,7 @@ public static void addFallBackCase() { } public static void addExpressionClassName(String className) { - if (!UT_ENV) { + if (!enabled()) { return; } @@ -138,7 +139,7 @@ public static void addExpressionClassName(String className) { } public static Set getFallBackClassName() { - if (!UT_ENV) { + if (!enabled()) { return Collections.emptySet(); } @@ -150,7 +151,7 @@ public static Set getFallBackClassName() { } public static void addIgnoreCaseName(String caseName) { - if (!UT_ENV) { + if (!enabled()) { return; } @@ -160,7 +161,7 @@ public static void addIgnoreCaseName(String caseName) { } public static void resetCase() { - if (!UT_ENV) { + if (!enabled()) { return; } @@ -171,7 +172,7 @@ public static void resetCase() { } public static void startCase(String caseName) { - if (!UT_ENV) { + if (!enabled()) { return; } @@ -180,7 +181,7 @@ public static void startCase(String caseName) { } public static void endCase(boolean status) { - if (!UT_ENV) { + if (!enabled()) { return; } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 451cb2fd2568..0aa5d6ae8c5c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig import org.apache.gluten.extension.ValidationResult import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat -import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans._ @@ -72,11 +71,6 @@ trait BackendSettingsApi { // Whether to fallback aggregate at the same time if its empty-output child is fallen back. def fallbackAggregateWithEmptyOutputChild(): Boolean = false - def disableVanillaColumnarReaders(conf: SparkConf): Boolean = - !conf.getBoolean( - GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key, - GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get) - def recreateJoinExecOnFallback(): Boolean = false /** @@ -141,7 +135,5 @@ trait BackendSettingsApi { def supportColumnarArrowUdf(): Boolean = false - def generateHdfsConfForLibhdfs(): Boolean = false - def needPreComputeRangeFrameBoundary(): Boolean = false } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala index e4f5cbdc90b8..942058cc54d9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala @@ -16,28 +16,14 @@ */ package org.apache.gluten.backendsapi -import java.util.ServiceLoader - -import scala.collection.JavaConverters +import org.apache.gluten.backend.Backend object BackendsApiManager { - - private lazy val backend: Backend = initializeInternal() + private lazy val backend: SubstraitBackend = initializeInternal() /** Initialize all backends api. */ - private def initializeInternal(): Backend = { - val discoveredBackends = - JavaConverters.iterableAsScalaIterable(ServiceLoader.load(classOf[Backend])).toSeq - if (discoveredBackends.isEmpty) { - throw new IllegalStateException("Backend implementation not discovered from JVM classpath") - } - if (discoveredBackends.size != 1) { - throw new IllegalStateException( - s"More than one Backend implementation discovered from JVM classpath: " + - s"${discoveredBackends.map(_.name()).toList}") - } - val backend = discoveredBackends.head - backend + private def initializeInternal(): SubstraitBackend = { + Backend.get().asInstanceOf[SubstraitBackend] } /** @@ -55,10 +41,6 @@ object BackendsApiManager { backend.name() } - def getBuildInfo: BackendBuildInfo = { - backend.buildInfo() - } - def getListenerApiInstance: ListenerApi = { backend.listenerApi() } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index a55926d76d12..77a983718d85 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.expression._ -import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} @@ -52,16 +51,6 @@ import scala.collection.JavaConverters._ trait SparkPlanExecApi { - /** The columnar-batch type this backend is using. */ - def batchType: Convention.BatchType - - /** - * Overrides [[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is using to - * determine the convention (its row-based processing / columnar-batch processing support) of a - * plan with a user-defined function that accepts a plan then returns batch type it outputs. - */ - def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty - /** * Generate FilterExecTransformer. * diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala similarity index 59% rename from gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala rename to gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala index 3a597552207b..d7785663d577 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala @@ -16,30 +16,34 @@ */ package org.apache.gluten.backendsapi -trait Backend { - def name(): String - - def buildInfo(): BackendBuildInfo - +import org.apache.gluten.backend.Backend +import org.apache.gluten.extension.injector.RuleInjector + +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.PluginContext + +trait SubstraitBackend extends Backend { + final override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = { + listenerApi().onDriverStart(sc, pc) + } + final override def onDriverShutdown(): Unit = { + listenerApi().onDriverShutdown() + } + final override def onExecutorStart(pc: PluginContext): Unit = { + listenerApi().onExecutorStart(pc) + } + final override def onExecutorShutdown(): Unit = { + listenerApi().onExecutorShutdown() + } + final override def injectRules(injector: RuleInjector): Unit = { + ruleApi().injectRules(injector) + } def iteratorApi(): IteratorApi - def sparkPlanExecApi(): SparkPlanExecApi - def transformerApi(): TransformerApi - def validatorApi(): ValidatorApi - def metricsApi(): MetricsApi - def listenerApi(): ListenerApi - def ruleApi(): RuleApi - def settings(): BackendSettingsApi } - -case class BackendBuildInfo( - backend: String, - backendBranch: String, - backendRevision: String, - backendRevisionTime: String) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala index 0c70e1ea7a7b..14d038b15ed0 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala @@ -17,15 +17,16 @@ package org.apache.gluten.extension import org.apache.gluten.GlutenConfig +import org.apache.gluten.backend.Backend import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.expression.TransformerState import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.plan.PlanBuilder import org.apache.gluten.substrait.rel.RelNode import org.apache.gluten.test.TestStats -import org.apache.gluten.utils.LogLevelUtil import org.apache.spark.sql.execution.SparkPlan @@ -108,7 +109,7 @@ trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelU } protected def batchType0(): Convention.BatchType = { - BackendsApiManager.getSparkPlanExecApiInstance.batchType + Backend.get().batchType } protected def doValidateInternal(): ValidationResult = ValidationResult.succeeded diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala index 29e1caae74ff..9c0ddac16b76 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala @@ -18,12 +18,10 @@ package org.apache.gluten.extension.columnar import org.apache.gluten.GlutenConfig import org.apache.gluten.extension.GlutenPlan +import org.apache.gluten.extension.columnar.heuristic.FallbackNode import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike, Transitions} import org.apache.gluten.utils.PlanUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} @@ -275,9 +273,3 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP def DO_NOT_FALLBACK(): FallbackInfo = FallbackInfo() } } - -/** A wrapper to specify the plan is fallback plan, the caller side should unwrap it. */ -case class FallbackNode(fallbackPlan: SparkPlan) extends LeafExecNode { - override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() - override def output: Seq[Attribute] = fallbackPlan.output -} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala index 3bfa611ed531..69b74f3a68c8 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala @@ -17,7 +17,8 @@ package org.apache.gluten.extension.columnar import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, Transitions} -import org.apache.gluten.utils.{LogLevelUtil, PlanUtil} +import org.apache.gluten.logging.LogLevelUtil +import org.apache.gluten.utils.PlanUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.Expression diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index cdc71f4478f0..a117b83b7ae0 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -21,8 +21,9 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.extension.GlutenPlan +import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.utils.{LogLevelUtil, PlanUtil} +import org.apache.gluten.utils.PlanUtil import org.apache.spark.api.python.EvalPythonExecTransformer import org.apache.spark.internal.Logging diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala index 007f18fca40b..3737dd4af2f4 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala @@ -19,12 +19,12 @@ package org.apache.gluten.extension.columnar.enumerated import org.apache.gluten.extension.columnar.{OffloadExchange, OffloadJoin, OffloadOthers} import org.apache.gluten.extension.columnar.transition.ConventionReq import org.apache.gluten.extension.columnar.validator.{Validator, Validators} +import org.apache.gluten.logging.LogLevelUtil import org.apache.gluten.planner.GlutenOptimization import org.apache.gluten.planner.cost.GlutenCostModel import org.apache.gluten.planner.property.Conv import org.apache.gluten.ras.property.PropertySet import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.utils.LogLevelUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala index fe7eb5566378..e72677ebf53c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.utils -import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.backend.Backend import org.apache.gluten.extension.columnar.transition.Convention import org.apache.spark.sql.execution._ @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec object PlanUtil { private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = { - Convention.get(i).batchType == BackendsApiManager.getSparkPlanExecApiInstance.batchType + Convention.get(i).batchType == Backend.get().batchType } def isGlutenTableCache(plan: SparkPlan): Boolean = { @@ -44,6 +44,6 @@ object PlanUtil { } def isGlutenColumnarOp(plan: SparkPlan): Boolean = { - Convention.get(plan).batchType == BackendsApiManager.getSparkPlanExecApiInstance.batchType + Convention.get(plan).batchType == Backend.get().batchType } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala index 2cb077d50a24..cd063ce31ac5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala @@ -18,6 +18,7 @@ package org.apache.gluten.utils import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.extension.columnar.ColumnarRuleApplier.SkipCondition import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -26,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan object PhysicalPlanSelector extends QueryPlanSelector[SparkPlan] { + val skipCond: SkipCondition = (session: SparkSession, plan: SparkPlan) => + !shouldUseGluten(session, plan) + override protected def validate(plan: SparkPlan): Boolean = { BackendsApiManager.getValidatorApiInstance.doSparkPlanValidate(plan) } @@ -55,7 +59,7 @@ abstract class QueryPlanSelector[T <: QueryPlan[_]] extends Logging { protected def validate(plan: T): Boolean - private[this] def shouldUseGluten(session: SparkSession, plan: T): Boolean = { + def shouldUseGluten(session: SparkSession, plan: T): Boolean = { val glutenEnabled = session.conf .get(GlutenConfig.GLUTEN_ENABLE_KEY, GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString) .toBoolean && isGlutenEnabledForCurrentThread(session) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala index e5925e3ac4d0..c57df192c5da 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import org.apache.gluten.GlutenConfig +import org.apache.gluten.backend.Backend import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution._ import org.apache.gluten.extension.columnar.transition.Convention @@ -165,7 +166,7 @@ case class ColumnarInputAdapter(child: SparkPlan) override def output: Seq[Attribute] = child.output override def supportsColumnar: Boolean = true override def batchType(): Convention.BatchType = - BackendsApiManager.getSparkPlanExecApiInstance.batchType + Backend.get().batchType override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() override protected def doExecuteColumnar(): RDD[ColumnarBatch] = child.executeColumnar() override def outputPartitioning: Partitioning = child.outputPartitioning diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala index 67ecf81b9ffd..f6e23e7cff67 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala @@ -20,7 +20,7 @@ import org.apache.gluten.GlutenConfig import org.apache.gluten.events.GlutenPlanFallbackEvent import org.apache.gluten.extension.GlutenPlan import org.apache.gluten.extension.columnar.FallbackTags -import org.apache.gluten.utils.LogLevelUtil +import org.apache.gluten.logging.LogLevelUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.rules.Rule diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala similarity index 100% rename from gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala rename to gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala diff --git a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index ea3e50e81282..9337317d961d 100644 --- a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.softaffinity import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.GlutenPartition import org.apache.gluten.softaffinity.SoftAffinityManager -import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanBuilder diff --git a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala index 55f25309dc5e..c91df0b6d8ac 100644 --- a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala +++ b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala @@ -19,12 +19,11 @@ package org.apache.spark.softaffinity import org.apache.gluten.GlutenConfig import org.apache.gluten.execution.GlutenPartition import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager} -import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanBuilder import org.apache.spark.SparkConf -import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, StageInfo, TaskInfo, TaskLocality} +import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.InternalRow diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala index 2d61f0c4b366..245c678feb67 100644 --- a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala +++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala @@ -26,12 +26,6 @@ object GlutenEventUtils { sc.listenerBus.post(event) } - def registerListener(sc: SparkContext): Unit = { - val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore] - val listener = new GlutenSQLAppStatusListener(sc.conf, kvStore) - sc.listenerBus.addToStatusQueue(listener) - } - def attachUI(sc: SparkContext): Unit = { val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore] val statusStore = new GlutenSQLAppStatusStore(kvStore) diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala index 7c236b4e8881..b9a590f67831 100644 --- a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala +++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.ui import org.apache.gluten.events.{GlutenBuildInfoEvent, GlutenPlanFallbackEvent} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.sql.internal.StaticSQLConf._ @@ -102,3 +102,11 @@ class GlutenSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore) toDelete.foreach(e => kvstore.delete(e.getClass(), e.executionId)) } } + +object GlutenSQLAppStatusListener { + def register(sc: SparkContext): Unit = { + val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore] + val listener = new GlutenSQLAppStatusListener(sc.conf, kvStore) + sc.listenerBus.addToStatusQueue(listener) + } +} diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index 5d171a36bdd4..82d37b8ca176 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions -import org.apache.gluten.utils.QueryPlanSelector +import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} @@ -165,6 +165,7 @@ private object FallbackStrategiesSuite { transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = { new HeuristicApplier( spark, + Seq(PhysicalPlanSelector.skipCond), transformBuilders, List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())), List( diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala index 2ca7429f1679..b3b8483c3b29 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.extension -import org.apache.gluten.extension.ColumnarOverrideRules +import org.apache.gluten.extension.GlutenColumnarRule import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.SparkConf @@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait { } testGluten("test gluten extensions") { - assert( - spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules])) + assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule])) assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark))) assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark))) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index a4da5c127c5f..866c16d52fc1 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions -import org.apache.gluten.utils.QueryPlanSelector +import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} @@ -176,6 +176,7 @@ private object FallbackStrategiesSuite { transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = { new HeuristicApplier( spark, + Seq(PhysicalPlanSelector.skipCond), transformBuilders, List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())), List( diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala index 2ca7429f1679..b3b8483c3b29 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.extension -import org.apache.gluten.extension.ColumnarOverrideRules +import org.apache.gluten.extension.GlutenColumnarRule import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.SparkConf @@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait { } testGluten("test gluten extensions") { - assert( - spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules])) + assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule])) assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark))) assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark))) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index a4da5c127c5f..866c16d52fc1 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions -import org.apache.gluten.utils.QueryPlanSelector +import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} @@ -176,6 +176,7 @@ private object FallbackStrategiesSuite { transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = { new HeuristicApplier( spark, + Seq(PhysicalPlanSelector.skipCond), transformBuilders, List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())), List( diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala index 2ca7429f1679..b3b8483c3b29 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.extension -import org.apache.gluten.extension.ColumnarOverrideRules +import org.apache.gluten.extension.GlutenColumnarRule import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.SparkConf @@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait { } testGluten("test gluten extensions") { - assert( - spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules])) + assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule])) assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark))) assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark))) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala index bbdeebfe6a13..6318c0e06b37 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala @@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier import org.apache.gluten.extension.columnar.transition.InsertTransitions -import org.apache.gluten.utils.QueryPlanSelector +import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession} @@ -177,6 +177,7 @@ private object FallbackStrategiesSuite { transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = { new HeuristicApplier( spark, + Seq(PhysicalPlanSelector.skipCond), transformBuilders, List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())), List( diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala index 2ca7429f1679..b3b8483c3b29 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.extension -import org.apache.gluten.extension.ColumnarOverrideRules +import org.apache.gluten.extension.GlutenColumnarRule import org.apache.gluten.utils.BackendTestUtils import org.apache.spark.SparkConf @@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait { } testGluten("test gluten extensions") { - assert( - spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules])) + assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule])) assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark))) assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark))) diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 927a39332eab..1d3c8e45d7f1 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -406,6 +406,7 @@ class GlutenConfig(conf: SQLConf) extends Logging { def debug: Boolean = conf.getConf(DEBUG_ENABLED) def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE) + def collectUtStats: Boolean = conf.getConf(UT_STATISTIC) def taskStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID) def taskPartitionId: Int = conf.getConf(BENCHMARK_TASK_PARTITIONID) def taskId: Long = conf.getConf(BENCHMARK_TASK_TASK_ID) @@ -835,7 +836,7 @@ object GlutenConfig { .createWithDefault(true) val VANILLA_VECTORIZED_READERS_ENABLED = - buildConf("spark.gluten.sql.columnar.enableVanillaVectorizedReaders") + buildStaticConf("spark.gluten.sql.columnar.enableVanillaVectorizedReaders") .internal() .doc("Enable or disable vanilla vectorized scan.") .booleanConf @@ -1622,6 +1623,12 @@ object GlutenConfig { .stringConf .createWithDefault("/tmp") + val UT_STATISTIC = + buildStaticConf("spark.gluten.sql.ut.statistic") + .internal() + .booleanConf + .createWithDefault(false) + val BENCHMARK_TASK_STAGEID = buildConf("spark.gluten.sql.benchmark_task.stageId") .internal() @@ -1680,12 +1687,6 @@ object GlutenConfig { .booleanConf .createWithDefault(true) - val UT_STATISTIC = - buildConf("spark.gluten.sql.ut.statistic") - .internal() - .booleanConf - .createWithDefault(false) - // FIXME: This only works with CH backend. val EXTENDED_COLUMNAR_TRANSFORM_RULES = buildConf("spark.gluten.sql.columnar.extended.columnar.transform.rules")