Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
* A [[ContinuousReader]] for data from kafka.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

/** A simple class for converting Kafka ConsumerRecord to UnsafeRow */
private[kafka010] class KafkaRecordToUnsafeRowConverter {
private val sharedRow = new UnsafeRow(7)
private val bufferHolder = new BufferHolder(sharedRow)
private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
private val rowWriter = new UnsafeRowWriter(7)

def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
bufferHolder.reset()
rowWriter.reset()

if (record.key == null) {
rowWriter.setNullAt(0)
Expand All @@ -46,7 +44,6 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
5,
DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
rowWriter.write(6, record.timestampType.id)
sharedRow.setTotalSize(bufferHolder.totalSize)
sharedRow
rowWriter.getRow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,21 @@
* this class per writing program, so that the memory segment/data buffer can be reused. Note that
* for each incoming record, we should call `reset` of BufferHolder instance before write the record
* and reuse the data buffer.
*
* Generally we should call `UnsafeRow.setTotalSize` and pass in `BufferHolder.totalSize` to update
* the size of the result row, after writing a record to the buffer. However, we can skip this step
* if the fields of row are all fixed-length, as the size of result row is also fixed.
*/
public class BufferHolder {
final class BufferHolder {

private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;

public byte[] buffer;
public int cursor = Platform.BYTE_ARRAY_OFFSET;
private byte[] buffer;
private int cursor = Platform.BYTE_ARRAY_OFFSET;
private final UnsafeRow row;
private final int fixedSize;

public BufferHolder(UnsafeRow row) {
BufferHolder(UnsafeRow row) {
this(row, 64);
}

public BufferHolder(UnsafeRow row, int initialSize) {
BufferHolder(UnsafeRow row, int initialSize) {
int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
throw new UnsupportedOperationException(
Expand All @@ -64,7 +60,7 @@ public BufferHolder(UnsafeRow row, int initialSize) {
/**
* Grows the buffer by at least neededSize and points the row to the buffer.
*/
public void grow(int neededSize) {
void grow(int neededSize) {
if (neededSize > ARRAY_MAX - totalSize()) {
throw new UnsupportedOperationException(
"Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
Expand All @@ -86,11 +82,23 @@ public void grow(int neededSize) {
}
}

public void reset() {
byte[] getBuffer() {
return buffer;
}

int getCursor() {
return cursor;
}

void increaseCursor(int val) {
cursor += val;
}

void reset() {
cursor = Platform.BYTE_ARRAY_OFFSET + fixedSize;
}

public int totalSize() {
int totalSize() {
return cursor - Platform.BYTE_ARRAY_OFFSET;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

import static org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.calculateHeaderPortionInBytes;

Expand All @@ -32,141 +30,123 @@
*/
public final class UnsafeArrayWriter extends UnsafeWriter {

private BufferHolder holder;

// The offset of the global buffer where we start to write this array.
private int startingOffset;

// The number of elements in this array
private int numElements;

// The element size in this array
private int elementSize;

private int headerInBytes;

private void assertIndexIsValid(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < numElements : "index (" + index + ") should < " + numElements;
}

public void initialize(BufferHolder holder, int numElements, int elementSize) {
public UnsafeArrayWriter(UnsafeWriter writer, int elementSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems cleaner to take a BufferHolder here instead of UnsafeWriter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the fact that you can build a writer from a writer. I don't think the buffer holder should be visible at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at the caller side, it makes sense to me now. We can add a new constructor to build top-level array writer in the future, if needed.

super(writer.getBufferHolder());
this.elementSize = elementSize;
}

public void initialize(int numElements) {
// We need 8 bytes to store numElements in header
this.numElements = numElements;
this.headerInBytes = calculateHeaderPortionInBytes(numElements);

this.holder = holder;
this.startingOffset = holder.cursor;
this.startingOffset = cursor();

// Grows the global buffer ahead for header and fixed size data.
int fixedPartInBytes =
ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * numElements);
holder.grow(headerInBytes + fixedPartInBytes);

// Write numElements and clear out null bits to header
Platform.putLong(holder.buffer, startingOffset, numElements);
Platform.putLong(getBuffer(), startingOffset, numElements);
for (int i = 8; i < headerInBytes; i += 8) {
Platform.putLong(holder.buffer, startingOffset + i, 0L);
Platform.putLong(getBuffer(), startingOffset + i, 0L);
}

// fill 0 into reminder part of 8-bytes alignment in unsafe array
for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, (byte) 0);
Platform.putByte(getBuffer(), startingOffset + headerInBytes + i, (byte) 0);
}
holder.cursor += (headerInBytes + fixedPartInBytes);
increaseCursor(headerInBytes + fixedPartInBytes);
}

private void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 3), 0L);
}
}

private long getElementOffset(int ordinal, int elementSize) {
private long getElementOffset(int ordinal) {
return startingOffset + headerInBytes + ordinal * elementSize;
}

public void setOffsetAndSize(int ordinal, int currentCursor, int size) {
assertIndexIsValid(ordinal);
final long relativeOffset = currentCursor - startingOffset;
final long offsetAndSize = (relativeOffset << 32) | (long)size;

write(ordinal, offsetAndSize);
}

private void setNullBit(int ordinal) {
assertIndexIsValid(ordinal);
BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
BitSetMethods.set(getBuffer(), startingOffset + 8, ordinal);
}

public void setNull1Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
writeByte(getElementOffset(ordinal), (byte)0);
}

public void setNull2Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), (short)0);
writeShort(getElementOffset(ordinal), (short)0);
}

public void setNull4Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), 0);
writeInt(getElementOffset(ordinal), 0);
}

public void setNull8Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putLong(holder.buffer, getElementOffset(ordinal, 8), (long)0);
writeLong(getElementOffset(ordinal), 0);
}

public void setNull(int ordinal) { setNull8Bytes(ordinal); }

public void write(int ordinal, boolean value) {
assertIndexIsValid(ordinal);
Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), value);
writeBoolean(getElementOffset(ordinal), value);
}

public void write(int ordinal, byte value) {
assertIndexIsValid(ordinal);
Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), value);
writeByte(getElementOffset(ordinal), value);
}

public void write(int ordinal, short value) {
assertIndexIsValid(ordinal);
Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), value);
writeShort(getElementOffset(ordinal), value);
}

public void write(int ordinal, int value) {
assertIndexIsValid(ordinal);
Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), value);
writeInt(getElementOffset(ordinal), value);
}

public void write(int ordinal, long value) {
assertIndexIsValid(ordinal);
Platform.putLong(holder.buffer, getElementOffset(ordinal, 8), value);
writeLong(getElementOffset(ordinal), value);
}

public void write(int ordinal, float value) {
if (Float.isNaN(value)) {
value = Float.NaN;
}
assertIndexIsValid(ordinal);
Platform.putFloat(holder.buffer, getElementOffset(ordinal, 4), value);
writeFloat(getElementOffset(ordinal), value);
}

public void write(int ordinal, double value) {
if (Double.isNaN(value)) {
value = Double.NaN;
}
assertIndexIsValid(ordinal);
Platform.putDouble(holder.buffer, getElementOffset(ordinal, 8), value);
writeDouble(getElementOffset(ordinal), value);
}

public void write(int ordinal, Decimal input, int precision, int scale) {
// make sure Decimal object has the same scale as DecimalType
assertIndexIsValid(ordinal);
if (input.changePrecision(precision, scale)) {
if (input != null && input.changePrecision(precision, scale)) {
if (precision <= Decimal.MAX_LONG_DIGITS()) {
write(ordinal, input.toUnscaledLong());
} else {
Expand All @@ -180,65 +160,14 @@ public void write(int ordinal, Decimal input, int precision, int scale) {

// Write the bytes to the variable length portion.
Platform.copyMemory(
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);
setOffsetAndSize(ordinal, holder.cursor, numBytes);
bytes, Platform.BYTE_ARRAY_OFFSET, getBuffer(), cursor(), numBytes);
setOffsetAndSize(ordinal, numBytes);

// move the cursor forward with 8-bytes boundary
holder.cursor += roundedSize;
increaseCursor(roundedSize);
}
} else {
setNull(ordinal);
}
}

public void write(int ordinal, UTF8String input) {
final int numBytes = input.numBytes();
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);

// grow the global buffer before writing data.
holder.grow(roundedSize);

zeroOutPaddingBytes(numBytes);

// Write the bytes to the variable length portion.
input.writeToMemory(holder.buffer, holder.cursor);

setOffsetAndSize(ordinal, holder.cursor, numBytes);

// move the cursor forward.
holder.cursor += roundedSize;
}

public void write(int ordinal, byte[] input) {
final int numBytes = input.length;
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(input.length);

// grow the global buffer before writing data.
holder.grow(roundedSize);

zeroOutPaddingBytes(numBytes);

// Write the bytes to the variable length portion.
Platform.copyMemory(
input, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);

setOffsetAndSize(ordinal, holder.cursor, numBytes);

// move the cursor forward.
holder.cursor += roundedSize;
}

public void write(int ordinal, CalendarInterval input) {
// grow the global buffer before writing data.
holder.grow(16);

// Write the months and microseconds fields of Interval to the variable length portion.
Platform.putLong(holder.buffer, holder.cursor, input.months);
Platform.putLong(holder.buffer, holder.cursor + 8, input.microseconds);

setOffsetAndSize(ordinal, holder.cursor, 16);

// move the cursor forward.
holder.cursor += 16;
}
}
Loading