Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.columnar

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, GenericInternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -53,219 +53,288 @@ private[columnar] sealed trait ColumnStats extends Serializable {
/**
* Gathers statistics information from `row(ordinal)`.
*/
def gatherStats(row: InternalRow, ordinal: Int): Unit = {
if (row.isNullAt(ordinal)) {
nullCount += 1
// 4 bytes for null position
sizeInBytes += 4
}
def gatherStats(row: InternalRow, ordinal: Int): Unit

/**
 * Accounts for a single `null` entry: bumps both the total and the null counters and adds the
 * bytes needed to store the null position.
 */
def gatherNullStats(): Unit = {
  count += 1
  nullCount += 1
  // 4 bytes for null position
  sizeInBytes += 4
}

/**
* Column statistics represented as a single row, currently including closed lower bound, closed
* Column statistics represented as an array, currently including closed lower bound, closed
* upper bound and null count.
*/
def collectedStatistics: GenericInternalRow
def collectedStatistics: Array[Any]
}

/**
 * A no-op ColumnStats only used for testing purposes.
 *
 * It keeps the counters up to date but tracks neither bounds nor size: the collected statistics
 * always carry `null` in the two bound slots and `0L` in the last slot.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class NoopColumnStats extends ColumnStats {
override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal)
private[columnar] final class NoopColumnStats extends ColumnStats {
// A non-null `row(ordinal)` only bumps `count`; a null one is handed to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
if (!row.isNullAt(ordinal)) {
count += 1
} else {
gatherNullStats
}
}

// Same slot order the sibling stats classes use: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](null, null, nullCount, count, 0L))
override def collectedStatistics: Array[Any] = Array[Any](null, null, nullCount, count, 0L)
}

/**
 * Statistics for a Boolean column: the smallest and largest non-null value seen, together with
 * the null count, the entry count and the accumulated size in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class BooleanColumnStats extends ColumnStats {
private[columnar] final class BooleanColumnStats extends ColumnStats {
// Bounds start inverted (upper = false, lower = true); they only move when a value widens them.
protected var upper = false
protected var lower = true

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getBoolean(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += BOOLEAN.defaultSize
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds, adds BOOLEAN.defaultSize and bumps `count`.
def gatherValueStats(value: Boolean): Unit = {
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += BOOLEAN.defaultSize
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

/**
 * Statistics for a Byte column: the smallest and largest non-null value seen, together with
 * the null count, the entry count and the accumulated size in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class ByteColumnStats extends ColumnStats {
private[columnar] final class ByteColumnStats extends ColumnStats {
// Bounds start inverted (upper at MinValue, lower at MaxValue); values can only widen them.
protected var upper = Byte.MinValue
protected var lower = Byte.MaxValue

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getByte(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += BYTE.defaultSize
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds, adds BYTE.defaultSize and bumps `count`.
def gatherValueStats(value: Byte): Unit = {
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += BYTE.defaultSize
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

/**
 * Statistics for a Short column: the smallest and largest non-null value seen, together with
 * the null count, the entry count and the accumulated size in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class ShortColumnStats extends ColumnStats {
private[columnar] final class ShortColumnStats extends ColumnStats {
// Bounds start inverted (upper at MinValue, lower at MaxValue); values can only widen them.
protected var upper = Short.MinValue
protected var lower = Short.MaxValue

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getShort(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += SHORT.defaultSize
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds, adds SHORT.defaultSize and bumps `count`.
def gatherValueStats(value: Short): Unit = {
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += SHORT.defaultSize
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

/**
 * Statistics for an Int column: the smallest and largest non-null value seen, together with
 * the null count, the entry count and the accumulated size in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class IntColumnStats extends ColumnStats {
private[columnar] final class IntColumnStats extends ColumnStats {
// Bounds start inverted (upper at MinValue, lower at MaxValue); values can only widen them.
protected var upper = Int.MinValue
protected var lower = Int.MaxValue

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getInt(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += INT.defaultSize
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds, adds INT.defaultSize and bumps `count`.
def gatherValueStats(value: Int): Unit = {
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += INT.defaultSize
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

/**
 * Statistics for a Long column: the smallest and largest non-null value seen, together with
 * the null count, the entry count and the accumulated size in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class LongColumnStats extends ColumnStats {
private[columnar] final class LongColumnStats extends ColumnStats {
// Bounds start inverted (upper at MinValue, lower at MaxValue); values can only widen them.
protected var upper = Long.MinValue
protected var lower = Long.MaxValue

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getLong(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += LONG.defaultSize
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds, adds LONG.defaultSize and bumps `count`.
def gatherValueStats(value: Long): Unit = {
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += LONG.defaultSize
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

/**
 * Statistics for a Float column: the smallest and largest non-null value seen, together with
 * the null count, the entry count and the accumulated size in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class FloatColumnStats extends ColumnStats {
private[columnar] final class FloatColumnStats extends ColumnStats {
// Bounds start inverted (upper at MinValue, lower at MaxValue); values can only widen them.
protected var upper = Float.MinValue
protected var lower = Float.MaxValue

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getFloat(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += FLOAT.defaultSize
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds, adds FLOAT.defaultSize and bumps `count`.
// NaN compares false under both `>` and `<`, so a NaN value leaves both bounds unchanged.
def gatherValueStats(value: Float): Unit = {
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += FLOAT.defaultSize
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

/**
 * Statistics for a Double column: the smallest and largest non-null value seen, together with
 * the null count, the entry count and the accumulated size in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class DoubleColumnStats extends ColumnStats {
private[columnar] final class DoubleColumnStats extends ColumnStats {
// Bounds start inverted (upper at MinValue, lower at MaxValue); values can only widen them.
protected var upper = Double.MinValue
protected var lower = Double.MaxValue

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getDouble(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += DOUBLE.defaultSize
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds, adds DOUBLE.defaultSize and bumps `count`.
// NaN compares false under both `>` and `<`, so a NaN value leaves both bounds unchanged.
def gatherValueStats(value: Double): Unit = {
if (value > upper) upper = value
if (value < lower) lower = value
sizeInBytes += DOUBLE.defaultSize
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

private[columnar] class StringColumnStats extends ColumnStats {
private[columnar] final class StringColumnStats extends ColumnStats {
protected var upper: UTF8String = null
protected var lower: UTF8String = null

override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getUTF8String(ordinal)
if (upper == null || value.compareTo(upper) > 0) upper = value.clone()
if (lower == null || value.compareTo(lower) < 0) lower = value.clone()
sizeInBytes += STRING.actualSize(row, ordinal)
val size = STRING.actualSize(row, ordinal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but STRING.actualSize should just take a UTF8String.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand your point.
Do you want to use row.getUTF8String(ordinal).numBytes() + 4 instead of calling STRING.actualSize() (i.e., inline the method)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we can just pass the UTF8String to STRING.actualSize

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In STRING.actualSize, we call row.getUTF8String(ordinal), so why don't we pass in the UTF8String directly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to add a new method STRING.actualSize(s: UTF8String)? The current signature actualSize(row: InternalRow, ordinal: Int) cannot be changed since it is declared in the superclass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see, nvm

gatherValueStats(value, size)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
/**
 * Folds one non-null string into the statistics.
 *
 * A bound is replaced by a clone of `value` when it is still unset or when `value` compares
 * beyond it; `size` is added to the running byte total and the entry counter is bumped.
 */
def gatherValueStats(value: UTF8String, size: Int): Unit = {
  val raisesUpper = upper == null || value.compareTo(upper) > 0
  if (raisesUpper) {
    upper = value.clone()
  }
  val dropsLower = lower == null || value.compareTo(lower) < 0
  if (dropsLower) {
    lower = value.clone()
  }
  count += 1
  sizeInBytes += size
}

/** Returns the gathered statistics as lower, upper, nullCount, count, sizeInBytes. */
override def collectedStatistics: Array[Any] = {
  val stats = Array[Any](lower, upper, nullCount, count, sizeInBytes)
  stats
}
}

/**
 * Statistics for a binary column. No bounds are tracked: only the null count, the entry count
 * and the accumulated size in bytes are maintained.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class BinaryColumnStats extends ColumnStats {
private[columnar] final class BinaryColumnStats extends ColumnStats {
// Non-null values add BINARY.actualSize(row, ordinal) to the size; nulls go to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
sizeInBytes += BINARY.actualSize(row, ordinal)
val size = BINARY.actualSize(row, ordinal)
sizeInBytes += size
count += 1
} else {
gatherNullStats
}
}

// The two bound slots are always null; the rest is nullCount, count, sizeInBytes.
override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
override def collectedStatistics: Array[Any] =
Array[Any](null, null, nullCount, count, sizeInBytes)
}

/**
 * Statistics for a Decimal column of the given precision and scale: the smallest and largest
 * non-null value seen, together with the null count, the entry count and the accumulated size
 * in bytes.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
private[columnar] final class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
// Convenience constructor taking precision and scale from a DecimalType.
def this(dt: DecimalType) = this(dt.precision, dt.scale)

// Both bounds stay null until the first non-null value is gathered.
protected var upper: Decimal = null
protected var lower: Decimal = null

// Reads `row(ordinal)` and routes it to the value path or, when null, to gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getDecimal(ordinal, precision, scale)
if (upper == null || value.compareTo(upper) > 0) upper = value
if (lower == null || value.compareTo(lower) < 0) lower = value
// TODO: this is not right for DecimalType with precision > 18
sizeInBytes += 8
gatherValueStats(value)
} else {
gatherNullStats
}
}

override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes))
// Folds one non-null value into the bounds and bumps `count`. Every value is accounted as a
// fixed 8 bytes (see the TODO in gatherStats about precision > 18).
def gatherValueStats(value: Decimal): Unit = {
if (upper == null || value.compareTo(upper) > 0) upper = value
if (lower == null || value.compareTo(lower) < 0) lower = value
sizeInBytes += 8
count += 1
}

// Slot order: lower, upper, nullCount, count, sizeInBytes.
override def collectedStatistics: Array[Any] =
Array[Any](lower, upper, nullCount, count, sizeInBytes)
}

/**
 * Statistics for a column of the given `dataType`. No bounds are tracked: only the null count,
 * the entry count and the accumulated size in bytes are maintained.
 *
 * NOTE(review): this span is rendered from a diff view, so pre-change and post-change lines of
 * the class header and of each member are interleaved.
 */
private[columnar] class ObjectColumnStats(dataType: DataType) extends ColumnStats {
private[columnar] final class ObjectColumnStats(dataType: DataType) extends ColumnStats {
// Column type obtained from `dataType`; used below to measure each non-null value.
val columnType = ColumnType(dataType)

// Non-null values add columnType.actualSize(row, ordinal) to the size; nulls go to
// gatherNullStats.
override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
sizeInBytes += columnType.actualSize(row, ordinal)
val size = columnType.actualSize(row, ordinal)
sizeInBytes += size
count += 1
} else {
gatherNullStats
}
}

// The two bound slots are always null; the rest is nullCount, count, sizeInBytes.
override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
override def collectedStatistics: Array[Any] =
Array[Any](null, null, nullCount, count, sizeInBytes)
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ case class InMemoryRelation(

batchStats.add(totalSize)

val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
.flatMap(_.values))
val stats = InternalRow.fromSeq(
columnBuilders.flatMap(_.columnStats.collectedStatistics))
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
Expand Down
Loading