Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public boolean isCompressed() {

@Override
public void close() {
GlutenException.wrap(
() -> {
channel.close();
in.close();
return null;
});
try {
channel.close();
in.close();
} catch (Exception e) {
throw new GlutenException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ public boolean isCompressed() {

@Override
public void close() {
GlutenException.wrap(
() -> {
in.close();
return null;
});
try {
in.close();
} catch (Exception e) {
throw new GlutenException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,26 @@ public OnHeapCopyShuffleInputStream(InputStream in, boolean isCompressed) {

@Override
public long read(long destAddress, long maxReadSize) {
return GlutenException.wrap(
() -> {
int maxReadSize32 = Math.toIntExact(maxReadSize);
if (buffer == null || maxReadSize32 > buffer.length) {
this.buffer = new byte[maxReadSize32];
}
// The code conducts copy as long as 'in' wraps off-heap data,
// which is about to be moved to heap
int read = in.read(buffer, 0, maxReadSize32);
if (read == -1 || read == 0) {
return 0;
}
// The code conducts copy, from heap to off-heap
// memCopyFromHeap(buffer, destAddress, read);
PlatformDependent.copyMemory(buffer, 0, destAddress, read);
bytesRead += read;
return read;
});
try {

int maxReadSize32 = Math.toIntExact(maxReadSize);
if (buffer == null || maxReadSize32 > buffer.length) {
this.buffer = new byte[maxReadSize32];
}
// The code conducts copy as long as 'in' wraps off-heap data,
// which is about to be moved to heap
int read = in.read(buffer, 0, maxReadSize32);
if (read == -1 || read == 0) {
return 0;
}
// The code conducts copy, from heap to off-heap
// memCopyFromHeap(buffer, destAddress, read);
PlatformDependent.copyMemory(buffer, 0, destAddress, read);
bytesRead += read;
return read;
} catch (Exception e) {
throw new GlutenException(e);
}
}

@Override
Expand All @@ -69,11 +71,11 @@ public boolean isCompressed() {

@Override
public void close() {
GlutenException.wrap(
() -> {
in.close();
in = null;
return null;
});
try {
in.close();
in = null;
} catch (Exception e) {
throw new GlutenException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig}
import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi._
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

Expand All @@ -43,10 +45,11 @@ import java.util.Locale

import scala.util.control.Breaks.{break, breakable}

class CHBackend extends Backend {
class CHBackend extends SubstraitBackend {
override def name(): String = CHBackend.BACKEND_NAME
override def buildInfo(): BackendBuildInfo =
BackendBuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
override def batchType: Convention.BatchType = CHBatch
override def buildInfo(): Backend.BuildInfo =
Backend.BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
override def iteratorApi(): IteratorApi = new CHIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new CHSparkPlanExecApi
override def transformerApi(): TransformerApi = new CHTransformerApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.LogLevelUtil
import org.apache.gluten.vectorized.{BatchIterator, CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator}

import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.backendsapi.MetricsApi
import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.metrics._
import org.apache.gluten.substrait.{AggregationParams, JoinParams}
import org.apache.gluten.utils.LogLevelUtil

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PhysicalPlanSelector

import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
Expand All @@ -34,6 +35,8 @@ import org.apache.spark.util.SparkPlanRules
class CHRuleApi extends RuleApi {
import CHRuleApi._
override def injectRules(injector: RuleInjector): Unit = {
injector.gluten.skipOn(PhysicalPlanSelector.skipCond)

injectSpark(injector.spark)
injectLegacy(injector.gluten.legacy)
injectRas(injector.gluten.ras)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.gluten.expression._
import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
Expand Down Expand Up @@ -66,9 +65,6 @@ import scala.collection.mutable.ArrayBuffer

class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {

/** The columnar-batch type this backend is using. */
override def batchType: Convention.BatchType = CHBatch

/** Transform GetArrayItem to Substrait. */
override def genGetArrayItemTransformer(
substraitExprName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.{GlutenConfig, VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME}
import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi._
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
Expand All @@ -32,6 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, De
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.execution.ColumnarCachedBatchSerializer
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
Expand All @@ -43,10 +48,17 @@ import org.apache.hadoop.fs.Path

import scala.util.control.Breaks.breakable

class VeloxBackend extends Backend {
class VeloxBackend extends SubstraitBackend {
override def name(): String = VeloxBackend.BACKEND_NAME
override def buildInfo(): BackendBuildInfo =
BackendBuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME)
override def batchType: Convention.BatchType = VeloxBatch
override def batchTypeFunc(): BatchOverride = {
case i: InMemoryTableScanExec
if i.supportsColumnar && i.relation.cacheBuilder.serializer
.isInstanceOf[ColumnarCachedBatchSerializer] =>
VeloxBatch
}
override def buildInfo(): Backend.BuildInfo =
Backend.BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME)
override def iteratorApi(): IteratorApi = new VeloxIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi
override def transformerApi(): TransformerApi = new VeloxTransformerApi
Expand Down Expand Up @@ -454,7 +466,5 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def supportColumnarArrowUdf(): Boolean = true

override def generateHdfsConfForLibhdfs(): Boolean = true

override def needPreComputeRangeFrameBoundary(): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.gluten.jni.{JniLibLoader, JniWorkspace}
import org.apache.gluten.udf.UdfJniWrapper
import org.apache.gluten.utils._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
Expand All @@ -44,6 +44,9 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()

// Generate HDFS client configurations.
HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)

// Overhead memory limits.
val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
val desiredOverheadSize = (0.1 * offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import org.apache.gluten.extension.columnar.transition.{InsertTransitions, Remov
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PhysicalPlanSelector

import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}

class VeloxRuleApi extends RuleApi {
import VeloxRuleApi._

override def injectRules(injector: RuleInjector): Unit = {
injector.gluten.skipOn(PhysicalPlanSelector.skipCond)

injectSpark(injector.spark)
injectLegacy(injector.gluten.legacy)
injectRas(injector.gluten.ras)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.aggregate.{HLLAdapter, VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.vectorized.{ColumnarBatchSerializer, ColumnarBatchSerializeResult}

Expand All @@ -42,7 +40,6 @@ import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
Expand All @@ -61,23 +58,6 @@ import javax.ws.rs.core.UriBuilder

class VeloxSparkPlanExecApi extends SparkPlanExecApi {

/** The columnar-batch type this backend is using. */
override def batchType: Convention.BatchType = {
VeloxBatch
}

/**
* Overrides [[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is using to
* determine the convention (its row-based processing / columnar-batch processing support) of a
* plan with a user-defined function that accepts a plan then returns batch type it outputs.
*/
override def batchTypeFunc(): BatchOverride = {
case i: InMemoryTableScanExec
if i.supportsColumnar && i.relation.cacheBuilder.serializer
.isInstanceOf[ColumnarCachedBatchSerializer] =>
VeloxBatch
}

/** Transform GetArrayItem to Substrait. */
override def genGetArrayItemTransformer(
substraitExprName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,26 @@

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.resource.ResourceInformation;
import org.jetbrains.annotations.NotNull;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Map;

/** For testing Velox backend without starting a Spark context. */
public abstract class VeloxBackendTestBase {
private static final ListenerApi API = new VeloxListenerApi();

@BeforeClass
public static void setup() {
final ListenerApi api = new VeloxListenerApi();
api.onDriverStart(mockSparkContext(), mockPluginContext());
API.onExecutorStart(mockPluginContext());
}

private static SparkContext mockSparkContext() {
// Not yet implemented.
return null;
@AfterClass
public static void tearDown() {
API.onExecutorShutdown();
}

private static PluginContext mockPluginContext() {
Expand All @@ -52,9 +53,7 @@ public MetricRegistry metricRegistry() {

@Override
public SparkConf conf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
return conf;
return newSparkConf();
}

@Override
Expand Down Expand Up @@ -83,4 +82,11 @@ public Object ask(Object message) throws Exception {
}
};
}

@NotNull
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
return conf;
}
}
5 changes: 5 additions & 0 deletions gluten-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-ui</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Prevent our dummy JAR from being included in Spark distributions or uploaded to YARN -->
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down
Loading