Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
7c7da7b
Adds handling for java.lang.String on getUTF8String
Aug 29, 2015
78f6586
Fixes whitespace and wording
Aug 29, 2015
aa8d361
Fixes getUTF8String to support off-heap backed memory
Aug 29, 2015
07b78b8
Adds project boilerplate
Aug 29, 2015
5a81c24
Gets to working example, fixes bug in UnsafeRow.getUTF8String
Aug 29, 2015
bb1e6bf
Makes tests work and cleans up code
Aug 30, 2015
5f0000a
Adds tungsten caching
Aug 30, 2015
9996689
Moves fix into UTF8String.fromAddress
Aug 31, 2015
d940380
Adds handle to cachedRDD in return for uncaching
Sep 1, 2015
0a6e5b2
Removes private access modifier for dogfood testing
Sep 1, 2015
0b065af
Adds toString
Sep 1, 2015
eab7ff1
Changes tungstenCache to store in contiguous block
Sep 2, 2015
b7972e0
Makes buildScan only scan until end of buffer
Sep 2, 2015
709830a
Cleans up rebase and imports
Sep 2, 2015
c010317
Implements prototype for compressed tungstenCache
Sep 3, 2015
ee216cf
Fixes serialization problems and adds padding for allocating compress…
Sep 3, 2015
85e64de
Fixes decompression of compressed tungsten caches
Sep 3, 2015
2012231
Removes extra copy during decompression
Sep 3, 2015
6a7b50d
Adds tests back and creates helper fromByteArray
Sep 3, 2015
2ad0379
Changes buildScan to use uncompressed size
Sep 4, 2015
6d239b0
Adds and fixes tests for multiblock compression
Sep 4, 2015
2457eb7
Adds action for triggering lazy caching
Sep 8, 2015
a1c8b85
Fixes compression reading error
Sep 10, 2015
b42d0a1
Merge remote-tracking branch 'apache/master' into tungsten-cache
Sep 10, 2015
6679449
Fixes compressed reading
Sep 10, 2015
7e30964
Uses DataInputStream.readFully
Sep 10, 2015
d6df496
Merge remote-tracking branch 'origin/master' into tungsten-cache
JoshRosen Sep 10, 2015
b048d6d
Back out two unnecessary changes.
JoshRosen Sep 11, 2015
d18f533
Reduce duplication in test code.
JoshRosen Sep 11, 2015
a5e2c3e
Use Guava ByteStreams.readFully().
JoshRosen Sep 11, 2015
15acd17
Reduce scope of some variables when compressing block.
JoshRosen Sep 11, 2015
18b9e68
Prefer .length to .size for arrays.
JoshRosen Sep 11, 2015
f584d8c
Move TungstenCache code into own file; avoid unnecessary row format co…
JoshRosen Sep 11, 2015
a766d19
Remove old debug code.
JoshRosen Sep 11, 2015
d37efa4
Reduce object creation and buffering when scanning cached tables.
JoshRosen Sep 11, 2015
898f9b6
Avoid Option.toString().
JoshRosen Sep 11, 2015
95b770f
Add comment to explain buffered iterator.
JoshRosen Sep 11, 2015
0a68e00
Move compression code to helper methods.
JoshRosen Sep 11, 2015
ac09110
Add benchmarking harness.
JoshRosen Sep 11, 2015
3523065
Fix code formatting issue.
JoshRosen Sep 11, 2015
7656024
Test against existing columnar cache.
JoshRosen Sep 11, 2015
492baff
Remove unused method.
JoshRosen Sep 11, 2015
1f5be7d
More benchmarking harness improvements.
JoshRosen Sep 11, 2015
7279a15
Make benchmarking harness inputs pluggable.
JoshRosen Sep 11, 2015
bbc6d7a
Add simple test with nested data.
JoshRosen Sep 11, 2015
5d36d89
Merge in Feynman's change for writing end of block marker.
JoshRosen Sep 11, 2015
1299ed9
Add comment.
JoshRosen Sep 11, 2015
61b5f3d
More updates for terminator refactoring.
JoshRosen Sep 11, 2015
3ba690c
Test for correctness as well.
JoshRosen Sep 12, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,9 @@ public String toString() {
StringBuilder build = new StringBuilder("[");
for (int i = 0; i < sizeInBytes; i += 8) {
  build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
  // Separate words with commas, but omit the separator after the last word.
  // Note: `i <= sizeInBytes - 1` is always true inside the loop (the loop
  // condition guarantees i < sizeInBytes), so that guard never suppressed the
  // trailing comma; checking whether another word follows does.
  if (i + 8 < sizeInBytes) {
    build.append(',');
  }
}
build.append(']');
return build.toString();
Expand Down
30 changes: 28 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ package org.apache.spark.sql
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.annotation.Experimental
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.execution.TungstenCache
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
Expand Down Expand Up @@ -110,4 +112,28 @@ private[sql] abstract class SQLImplicits {
DataFrameHolder(
_sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
}

/**
 * ::Experimental::
 *
 * Enriches [[DataFrame]] with Tungsten-based caching via an implicit decorator.
 * @since 1.6.0
 */
@Experimental
implicit class TungstenCacheImplicits(df: DataFrame) {
  /**
   * Packs the rows of [[df]] into contiguous blocks of memory.
   * @param compressionType "" (default), "lz4", "lzf", or "snappy", see
   *                        [[CompressionCodec.ALL_COMPRESSION_CODECS]]
   * @param blockSize size of each MemoryBlock (default = 4 MB)
   */
  def tungstenCache(compressionType: String = "", blockSize: Int = 4000000): DataFrame = {
    // The empty string means "no compression".
    val codec: Option[String] = Some(compressionType).filter(_.nonEmpty)
    val cachedRows: RDD[InternalRow] =
      TungstenCache.cache(df.queryExecution.sparkPlan, codec, blockSize)
    _sqlContext.internalCreateDataFrame(cachedRows, df.schema)
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.google.common.io.ByteStreams

import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.{TaskMemoryManager, MemoryBlock}
import org.apache.spark.{TaskContext, SparkEnv}
import org.apache.spark.annotation.Experimental
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

/**
* ::Experimental::
*
* @since 1.6.0
*/
@Experimental
object TungstenCache {
/**
 * Caches the output of `child` as contiguous, optionally compressed, blocks of memory
 * and returns an RDD of rows that scans those blocks.
 *
 * @param child the physical plan whose output should be cached
 * @param compressionType None (default) for no compression, or Some of a codec name
 *                        accepted by [[CompressionCodec.createCodec]]
 * @param blockSize size in bytes of each allocated MemoryBlock (default = 4 MB)
 * @return an RDD of [[InternalRow]] backed by the cached blocks
 */
def cache(
    child: SparkPlan,
    compressionType: Option[String] = None,
    blockSize: Int = 4 * 1000 * 1000): RDD[InternalRow] = {
  val END_OF_BLOCK: Int = -1
  val numFields = child.schema.length
  // Make sure the input rows are in the Tungsten (UnsafeRow) binary format.
  val childRDD: RDD[UnsafeRow] = {
    if (child.outputsUnsafeRows) {
      child.execute().asInstanceOf[RDD[UnsafeRow]]
    } else {
      ConvertToUnsafe(child).execute().asInstanceOf[RDD[UnsafeRow]]
    }
  }
  val cachedRDD: RDD[MemoryBlock] = childRDD.mapPartitions { rowIterator =>
    // Note: this buffering is used to let us hold onto rows when rolling across block boundaries.
    val bufferedRowIterator = rowIterator.buffered
    val taskMemoryManager = TaskContext.get().taskMemoryManager()
    val compressionCodec: Option[CompressionCodec] =
      compressionType.map { t => CompressionCodec.createCodec(SparkEnv.get.conf, t) }
    new Iterator[MemoryBlock] {
      def next(): MemoryBlock = {
        // Packs rows into a `blockSize` bytes contiguous block of memory, starting a new block
        // whenever the current fills up.
        // Each row is laid out in memory as [rowSize (int)|rowData (rowSize bytes)].
        // The end of the block is marked by a special rowSize, END_OF_BLOCK (-1).
        val block = taskMemoryManager.allocateUnchecked(blockSize)

        var currOffset = 0
        while (bufferedRowIterator.hasNext &&
          currOffset + 4 + bufferedRowIterator.head.getSizeInBytes < blockSize - 4) {
          val currRow = bufferedRowIterator.head
          Platform.putInt(
            block.getBaseObject, block.getBaseOffset + currOffset, currRow.getSizeInBytes)
          currRow.writeToMemory(block.getBaseObject, block.getBaseOffset + currOffset + 4)
          bufferedRowIterator.next()
          currOffset += 4 + currRow.getSizeInBytes
        }
        // Fail fast on a row that can never fit (size >= blockSize - 8): without this
        // check the row is never consumed, so hasNext stays true and next() would emit
        // empty blocks forever.
        if (currOffset == 0 && bufferedRowIterator.hasNext) {
          throw new UnsupportedOperationException(
            s"Row of ${bufferedRowIterator.head.getSizeInBytes} bytes is too large " +
              s"to fit in a cache block of $blockSize bytes")
        }
        Platform.putInt(block.getBaseObject, block.getBaseOffset + currOffset, END_OF_BLOCK)

        compressionCodec match {
          case Some(codec) => compressBlock(block, codec, taskMemoryManager)
          case None => block
        }
      }

      def hasNext: Boolean = bufferedRowIterator.hasNext
    }
  }
  .setName(compressionType.getOrElse("") + "_" + child.nodeName)
  .persist(StorageLevel.MEMORY_ONLY)
  // TODO(josh): is this the right name?

  cachedRDD.mapPartitions { blockIterator =>
    val compressionCodec: Option[CompressionCodec] =
      compressionType.map { t => CompressionCodec.createCodec(SparkEnv.get.conf, t) }
    blockIterator.flatMap { rawBlock =>
      val block: MemoryBlock = compressionCodec match {
        case Some(codec) => decompressBlock(rawBlock, blockSize, codec)
        case None => rawBlock
      }

      new Iterator[UnsafeRow] {
        private[this] val unsafeRow = new UnsafeRow()
        private[this] var currOffset: Long = 0
        private[this] var _nextRowSize = getNextRowSize()
        // Reads the size prefix of the next row, or -1 when the offset has run past
        // the end of the block (treated the same as the END_OF_BLOCK marker).
        private[this] def getNextRowSize(): Int = {
          if (currOffset >= block.size()) {
            -1
          } else {
            Platform.getInt(block.getBaseObject, block.getBaseOffset + currOffset)
          }
        }
        override def hasNext: Boolean = _nextRowSize != END_OF_BLOCK
        override def next(): UnsafeRow = {
          assert(_nextRowSize > 0)
          // Advance past the 4-byte size prefix (_nextRowSize) to reach the row data.
          currOffset += 4
          unsafeRow.pointTo(
            block.getBaseObject, block.getBaseOffset + currOffset, numFields, _nextRowSize)
          currOffset += _nextRowSize
          _nextRowSize = getNextRowSize()
          unsafeRow
        }
      }
    }
  }
}

/**
 * Compresses `memoryBlock` with `compressionCodec` into a freshly allocated block and
 * frees the original. The compressed block is laid out as
 * [paddingSize (int)|compressed data|padding], padded to a word boundary.
 */
private def compressBlock(
    memoryBlock: MemoryBlock,
    compressionCodec: CompressionCodec,
    taskMemoryManager: TaskMemoryManager): MemoryBlock = {
  // Stage the block's bytes in an on-heap array so the codec can stream over them.
  val onHeapCopy = new Array[Byte](memoryBlock.size().toInt)
  Platform.copyMemory(
    memoryBlock.getBaseObject,
    memoryBlock.getBaseOffset,
    onHeapCopy,
    Platform.BYTE_ARRAY_OFFSET,
    memoryBlock.size())
  val byteStream = new ByteArrayOutputStream(memoryBlock.size().toInt)
  val compressedStream = compressionCodec.compressedOutputStream(byteStream)
  compressedStream.write(onHeapCopy)
  compressedStream.flush()
  compressedStream.close()
  val compressedBytes = byteStream.toByteArray

  // Allocate the destination block: 4 bytes for the padding size, the compressed
  // data itself, then padding up to the nearest word boundary.
  val recordSize = compressedBytes.length + 4
  val paddedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(recordSize)
  val padding = paddedSize - recordSize
  val compressedBlock = taskMemoryManager.allocateUnchecked(recordSize + padding)
  Platform.putInt(
    compressedBlock.getBaseObject,
    compressedBlock.getBaseOffset,
    padding)
  Platform.copyMemory(
    compressedBytes,
    Platform.BYTE_ARRAY_OFFSET,
    compressedBlock.getBaseObject,
    compressedBlock.getBaseOffset + 4,
    compressedBytes.length)
  // The uncompressed block is no longer needed once its compressed copy exists.
  taskMemoryManager.freeUnchecked(memoryBlock)
  compressedBlock
}

/**
 * Decompresses a block produced by [[compressBlock]] into a MemoryBlock backed by an
 * on-heap byte array of exactly `decompressedSize` bytes.
 *
 * Layout written by compressBlock: [paddingSize (int)|compressed data|padding].
 */
private def decompressBlock(
    compressedMemoryBlock: MemoryBlock,
    decompressedSize: Int,
    compressionCodec: CompressionCodec): MemoryBlock = {
  // Copy compressed payload (excluding the 4-byte size prefix and the trailing
  // padding) to an on-heap byte array. Note: sizing this buffer as (size - padding)
  // would read 4 bytes past the payload -- and past the allocation when padding < 4.
  val padding =
    Platform.getInt(compressedMemoryBlock.getBaseObject, compressedMemoryBlock.getBaseOffset)
  val payloadSize = compressedMemoryBlock.size().toInt - padding - 4
  val compressedBlockArray = new Array[Byte](payloadSize)
  Platform.copyMemory(
    compressedMemoryBlock.getBaseObject,
    compressedMemoryBlock.getBaseOffset + 4,
    compressedBlockArray,
    Platform.BYTE_ARRAY_OFFSET,
    payloadSize)

  // Decompress into a MemoryBlock backed by an on-heap byte array; close the
  // stream even if readFully throws.
  val decompressionStream =
    compressionCodec.compressedInputStream(new ByteArrayInputStream(compressedBlockArray))
  try {
    val decompressedBlock = new Array[Byte](decompressedSize)
    ByteStreams.readFully(decompressionStream, decompressedBlock)
    MemoryBlock.fromByteArray(decompressedBlock)
  } finally {
    decompressionStream.close()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql

import org.apache.spark.sql.execution.PhysicalRDD

import scala.concurrent.duration._
import scala.language.postfixOps
Expand All @@ -26,9 +25,10 @@ import org.scalatest.concurrent.Eventually._

import org.apache.spark.Accumulators
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.storage.{StorageLevel, RDDBlockId}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

private case class BigData(s: String)

Expand Down Expand Up @@ -341,6 +341,17 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
}
}

// Register one round-trip test per supported compression codec ("" = uncompressed).
Seq("", "lz4", "lzf", "snappy").foreach { codec =>
  val label = if (codec.isEmpty) "uncompressed" else codec
  test(s"cache and read table with Tungsten cache ($label)") {
    // A tiny (0.4 KB) block size forces the cache to span multiple blocks.
    val cachedDF = testData.tungstenCache(codec, blockSize = 400)
    checkAnswer(cachedDF, testData)
    // Scan a second time so the read path runs over already-cached data.
    checkAnswer(cachedDF, testData)
  }
}

test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
.toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc")
Expand Down
Loading