Skip to content

Commit d0ae3f3

Browse files
liancheng and marmbrus
authored and committed
[SPARK-2650][SQL] Try to partially fix SPARK-2650 by adjusting initial buffer size and reducing memory allocation
JIRA issue: [SPARK-2650](https://issues.apache.org/jira/browse/SPARK-2650)

Please refer to the [comments](https://issues.apache.org/jira/browse/SPARK-2650?focusedCommentId=14084397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14084397) of SPARK-2650 for some other details.

This PR adjusts the initial in-memory columnar buffer size to 1MB, the same as the default value of Shark's `shark.column.partitionSize.mb` property when running in local mode. Shark-style partition size estimation will be added in another PR.

Also, before this PR, `NullableColumnBuilder` copied the whole buffer to add the null positions section, and then `CompressibleColumnBuilder` copied and compressed the buffer again, even if compression was disabled (the `PassThrough` compression scheme is used to disable compression). In this PR the first buffer copy is eliminated to reduce memory consumption.

Author: Cheng Lian <[email protected]>

Closes #1769 from liancheng/spark-2650 and squashes the following commits:

88a042e [Cheng Lian] Fixed method visibility and removed dead code
001f2e5 [Cheng Lian] Try fixing SPARK-2650 by adjusting initial buffer size and reducing memory allocation
1 parent d94f599 commit d0ae3f3

File tree

5 files changed

+20
-30
lines changed

5 files changed

+20
-30
lines changed

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -118,7 +118,7 @@ private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
118118
private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
119119

120120
private[sql] object ColumnBuilder {
121-
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
121+
val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
122122

123123
private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
124124
if (orig.remaining >= size) {

sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ import org.apache.spark.sql.Row
3636
* }}}
3737
*/
3838
private[sql] trait NullableColumnBuilder extends ColumnBuilder {
39-
private var nulls: ByteBuffer = _
39+
protected var nulls: ByteBuffer = _
40+
protected var nullCount: Int = _
4041
private var pos: Int = _
41-
private var nullCount: Int = _
4242

4343
abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
4444
nulls = ByteBuffer.allocate(1024)
@@ -78,4 +78,9 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
7878
buffer.rewind()
7979
buffer
8080
}
81+
82+
protected def buildNonNulls(): ByteBuffer = {
83+
nulls.limit(nulls.position()).rewind()
84+
super.build()
85+
}
8186
}

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
4646

4747
this: NativeColumnBuilder[T] with WithCompressionSchemes =>
4848

49-
import CompressionScheme._
50-
5149
var compressionEncoders: Seq[Encoder[T]] = _
5250

5351
abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
@@ -81,28 +79,32 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
8179
}
8280
}
8381

84-
abstract override def build() = {
85-
val rawBuffer = super.build()
82+
override def build() = {
83+
val nonNullBuffer = buildNonNulls()
84+
val typeId = nonNullBuffer.getInt()
8685
val encoder: Encoder[T] = {
8786
val candidate = compressionEncoders.minBy(_.compressionRatio)
8887
if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
8988
}
9089

91-
val headerSize = columnHeaderSize(rawBuffer)
90+
// Header = column type ID + null count + null positions
91+
val headerSize = 4 + 4 + nulls.limit()
9292
val compressedSize = if (encoder.compressedSize == 0) {
93-
rawBuffer.limit - headerSize
93+
nonNullBuffer.remaining()
9494
} else {
9595
encoder.compressedSize
9696
}
9797

98-
// Reserves 4 bytes for compression scheme ID
9998
val compressedBuffer = ByteBuffer
99+
// Reserves 4 bytes for compression scheme ID
100100
.allocate(headerSize + 4 + compressedSize)
101101
.order(ByteOrder.nativeOrder)
102-
103-
copyColumnHeader(rawBuffer, compressedBuffer)
102+
// Write the header
103+
.putInt(typeId)
104+
.putInt(nullCount)
105+
.put(nulls)
104106

105107
logInfo(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
106-
encoder.compress(rawBuffer, compressedBuffer, columnType)
108+
encoder.compress(nonNullBuffer, compressedBuffer, columnType)
107109
}
108110
}

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,22 +67,6 @@ private[sql] object CompressionScheme {
6767
s"Unrecognized compression scheme type ID: $typeId"))
6868
}
6969

70-
def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
71-
// Writes column type ID
72-
to.putInt(from.getInt())
73-
74-
// Writes null count
75-
val nullCount = from.getInt()
76-
to.putInt(nullCount)
77-
78-
// Writes null positions
79-
var i = 0
80-
while (i < nullCount) {
81-
to.putInt(from.getInt())
82-
i += 1
83-
}
84-
}
85-
8670
def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
8771
val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
8872
val nullCount = header.getInt(4)

sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,3 @@ object TestCompressibleColumnBuilder {
4242
builder
4343
}
4444
}
45-

0 commit comments

Comments
 (0)