2 changes: 2 additions & 0 deletions cpp/core/jni/JniWrapper.cc
@@ -754,6 +754,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
     jboolean preferEvict,
     jlong allocatorId,
     jboolean writeSchema,
+    jboolean writeEOS,
     jlong firstBatchHandle,
     jlong taskAttemptId,
     jint pushBufferMaxSize,
@@ -816,6 +817,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
   }
 
   shuffleWriterOptions.write_schema = writeSchema;
+  shuffleWriterOptions.write_eos = writeEOS;
   shuffleWriterOptions.prefer_evict = preferEvict;
 
   if (numSubDirs > 0) {
8 changes: 6 additions & 2 deletions cpp/core/shuffle/LocalPartitionWriter.cc
@@ -96,7 +96,9 @@ class PreferEvictPartitionWriter::LocalPartitionWriterInstance {
   }
 
   RETURN_NOT_OK(writeRecordBatchPayload(dataFileOs.get()));
-  RETURN_NOT_OK(writeEos(dataFileOs.get()));
+  if (shuffleWriter_->options().write_eos) {
+    RETURN_NOT_OK(writeEos(dataFileOs.get()));
+  }
   clearCache();
 
   ARROW_ASSIGN_OR_RAISE(auto after_write, dataFileOs->Tell());
@@ -380,7 +382,9 @@ arrow::Status PreferCachePartitionWriter::stop() {
   }
   // 8. Write EOS if any payload written.
   if (!firstWrite) {
-    RETURN_NOT_OK(writeEos(dataFileOs_.get()));
+    if (shuffleWriter_->options().write_eos) {
+      RETURN_NOT_OK(writeEos(dataFileOs_.get()));
+    }
   }
   ARROW_ASSIGN_OR_RAISE(auto endInFinalFile, dataFileOs_->Tell());
 
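For reference, the EOS marker that both partition writers now emit conditionally is the Arrow IPC streaming format's end-of-stream sentinel. Below is a minimal Scala sketch of that framing, assuming writeEos follows the Arrow spec; the object and method names are illustrative, not Gluten's C++ API:

import java.io.OutputStream
import java.nio.{ByteBuffer, ByteOrder}

object EosMarkerSketch {
  // An Arrow IPC stream ends with 8 bytes: the 0xFFFFFFFF continuation
  // indicator followed by an int32 metadata length of 0 (little-endian).
  def writeEos(out: OutputStream): Unit = {
    val buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
    buf.putInt(0xFFFFFFFF) // continuation indicator
    buf.putInt(0) // zero metadata length signals end of stream
    out.write(buf.array())
  }
}

Skipping the marker saves only eight bytes per partition stream; the more important effect is that the stream then carries no trailer, which is what the serializer relocation change further down relies on.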
1 change: 1 addition & 0 deletions cpp/core/shuffle/ShuffleWriter.h
@@ -45,6 +45,7 @@ struct ShuffleWriterOptions {
   bool prefer_evict = true;
   bool write_schema = false;
   bool buffered_write = false;
+  bool write_eos = true;
 
   std::string data_file;
   std::string partition_writer_type = "local";
12 changes: 6 additions & 6 deletions ShuffleWriterJniWrapper.java
@@ -44,12 +44,12 @@ public long make(NativePartitioning part, long offheapPerTask, int bufferSize,
     int bufferCompressThreshold, String dataFile,
     int subDirsPerLocalDir, String localDirs,
     boolean preferEvict, long memoryPoolId, boolean writeSchema,
-    long handle, long taskAttemptId) {
+    boolean writeEOS, long handle, long taskAttemptId) {
   return nativeMake(part.getShortName(), part.getNumPartitions(),
       offheapPerTask, bufferSize, codec, codecBackend,
       bufferCompressThreshold, dataFile, subDirsPerLocalDir,
-      localDirs, preferEvict, memoryPoolId, writeSchema, handle,
-      taskAttemptId, 0, null, "local");
+      localDirs, preferEvict, memoryPoolId, writeSchema, writeEOS,
+      handle, taskAttemptId, 0, null, "local");
   }
 
   /**
@@ -70,7 +70,7 @@ public long makeForRSS(NativePartitioning part, long offheapPerTask,
   return nativeMake(part.getShortName(), part.getNumPartitions(),
       offheapPerTask, bufferSize, codec, null,
       bufferCompressThreshold, null, 0, null, true,
-      memoryPoolId, false, handle, taskAttemptId,
+      memoryPoolId, false, true, handle, taskAttemptId,
       pushBufferMaxSize, pusher, partitionWriterType);
   }
 
@@ -80,8 +80,8 @@ public native long nativeMake(String shortName, int numPartitions,
     int bufferCompressThreshold, String dataFile,
     int subDirsPerLocalDir, String localDirs,
     boolean preferEvict, long memoryPoolId,
-    boolean writeSchema, long handle,
-    long taskAttemptId, int pushBufferMaxSize,
+    boolean writeSchema, boolean writeEOS,
+    long handle, long taskAttemptId, int pushBufferMaxSize,
     Object pusher, String partitionWriterType);
 
   /**
9 changes: 9 additions & 0 deletions ColumnarBatchSerializer.scala
@@ -17,6 +17,8 @@
 
 package io.glutenproject.vectorized
 
+import io.glutenproject.GlutenConfig
+
 import java.io._
 import java.nio.ByteBuffer
 
@@ -50,10 +52,17 @@ class ColumnarBatchSerializer(
   extends Serializer
   with Serializable {
 
+  // If the shuffle writer writes neither the schema nor the EOS marker, the serializer supports relocation.
+  private val supportsRelocation =
+    !GlutenConfig.getConf.columnarShuffleWriteSchema &&
+      !GlutenConfig.getConf.columnarShuffleWriteEOS
+
   /** Creates a new [[SerializerInstance]]. */
   override def newInstance(): SerializerInstance = {
     new ColumnarBatchSerializerInstance(schema, readBatchNumRows, numOutputRows, decompressTime)
   }
+
+  override def supportsRelocationOfSerializedObjects: Boolean = supportsRelocation
 }
 
 private class ColumnarBatchSerializerInstance(
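The supportsRelocationOfSerializedObjects override is the point of the new option: Spark's serialized-sorting shuffle path can move a record's bytes between spill files without deserializing them, but only if the serializer guarantees that serialized records are position-independent. Per-stream framing, whether a leading schema or a trailing EOS marker, breaks that guarantee. A toy Scala sketch of the invariant, with stand-in framing bytes rather than Gluten's actual layout:

object RelocationInvariantSketch {
  val Schema: Array[Byte] = Array[Byte](0x53) // stand-in for a schema header
  val Eos: Array[Byte] = Array[Byte](0x45) // stand-in for an EOS marker

  // A framed stream: SCHEMA | records | EOS.
  def framed(records: Array[Byte]*): Array[Byte] =
    Schema ++ records.flatten ++ Eos

  def main(args: Array[String]): Unit = {
    val a = Array[Byte](1)
    val b = Array[Byte](2)
    // Relocation requires: stream(a) ++ stream(b) == stream(a, b).
    // With framing it fails: S|1|E ++ S|2|E is not S|1|2|E.
    assert(!(framed(a) ++ framed(b)).sameElements(framed(a, b)))
    // Without framing, record bytes concatenate freely, so a sorter can
    // reorder serialized records without deserializing them.
    assert((a ++ b).sameElements(Array[Byte](1, 2)))
  }
}

This is why relocation is reported only when both columnarShuffleWriteSchema and columnarShuffleWriteEOS are off; with either flag set, Spark cannot take the serialized-sorting path for these batches.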
3 changes: 3 additions & 0 deletions ColumnarShuffleWriter.scala
@@ -78,6 +78,8 @@ class ColumnarShuffleWriter[K, V](
 
   private val writeSchema = GlutenConfig.getConf.columnarShuffleWriteSchema
 
+  private val writeEOS = GlutenConfig.getConf.columnarShuffleWriteEOS
+
   private val jniWrapper = new ShuffleWriterJniWrapper
 
   private var nativeShuffleWriter: Long = -1L
@@ -152,6 +154,7 @@
         })
         .getNativeInstanceId,
       writeSchema,
+      writeEOS,
       handle,
       taskContext.taskAttemptId()
     )
8 changes: 8 additions & 0 deletions GlutenConfig.scala
@@ -125,6 +125,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def columnarShuffleWriteSchema: Boolean = conf.getConf(COLUMNAR_SHUFFLE_WRITE_SCHEMA_ENABLED)
 
+  def columnarShuffleWriteEOS: Boolean = conf.getConf(COLUMNAR_SHUFFLE_WRITE_EOS_ENABLED)
+
   def columnarShuffleCodec: Option[String] = conf.getConf(COLUMNAR_SHUFFLE_CODEC)
 
   def columnarShuffleCodecBackend: Option[String] = conf
@@ -705,6 +707,12 @@
       .booleanConf
       .createWithDefault(false)
 
+  val COLUMNAR_SHUFFLE_WRITE_EOS_ENABLED =
+    buildConf("spark.gluten.sql.columnar.shuffle.writeEOS")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_SHUFFLE_CODEC =
     buildConf("spark.gluten.sql.columnar.shuffle.codec")
       .internal()
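Since writeSchema already defaults to false, the new writeEOS flag is the only default standing between the serializer and relocation support, and flipping it off is a one-line configuration. A minimal usage sketch; the spark.plugins line is an assumption about a typical Gluten deployment, not part of this PR:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("columnar-shuffle-demo")
  .config("spark.plugins", "io.glutenproject.GlutenPlugin") // assumed Gluten setup
  // Defaults to true (EOS is written); false enables serializer relocation.
  .config("spark.gluten.sql.columnar.shuffle.writeEOS", "false")
  .getOrCreate()

Two caveats are visible in the diff itself: the option is marked .internal(), so it is not part of the documented configuration surface, and makeForRSS hardcodes writeEOS to true, so the flag only affects the local partition writer path.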