Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,14 @@ public final int appendFloats(int count, float v) {
return result;
}

public final int appendFloats(int length, float[] src, int offset) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really use this API and also appendDoubles? I scan the codes but didn't find anywhere they are used.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. That's the reason why this is missing until now. We need to use them in order to ColumnarBatch independently from Parquet.

reserve(elementsAppended + length);
int result = elementsAppended;
putFloats(elementsAppended, length, src, offset);
elementsAppended += length;
return result;
}

public final int appendDouble(double v) {
reserve(elementsAppended + 1);
putDouble(elementsAppended, v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,80 @@ class ColumnarBatchSuite extends SparkFunSuite {
}}
}

test("Float APIs") {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the added test, do we really test the added appendFloats?

(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Float]

val column = ColumnVector.allocate(1024, FloatType, memMode)
var idx = 0

val values = (1.0f :: 2.0f :: 3.0f :: 4.0f :: 5.0f :: Nil).toArray
column.putFloats(idx, 2, values, 0)
reference += 1.0f
reference += 2.0f
idx += 2

column.putFloats(idx, 3, values, 2)
reference += 3.0f
reference += 4.0f
reference += 5.0f
idx += 3

val buffer = new Array[Byte](8)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)

if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
// Ensure array contains Little Endian floats
val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4))
}

column.putFloats(idx, 1, buffer, 4)
column.putFloats(idx + 1, 1, buffer, 0)
reference += 1.123f
reference += 2.234f
idx += 2

column.putFloats(idx, 2, buffer, 0)
reference += 2.234f
reference += 1.123f
idx += 2

while (idx < column.capacity) {
val single = random.nextBoolean()
if (single) {
val v = random.nextFloat()
column.putFloat(idx, v)
reference += v
idx += 1
} else {
val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
val v = random.nextFloat()
column.putFloats(idx, n, v)
var i = 0
while (i < n) {
reference += v
i += 1
}
idx += n
}
}

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + memMode)
if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.valuesNativeAddress()
assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
}
}
column.close
}}
}

test("Double APIs") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
Expand All @@ -346,8 +420,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)

if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
// Ensure array contains Liitle Endian doubles
var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
// Ensure array contains Little Endian doubles
val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8))
}
Expand Down