
Commit 837c4f5

Merge remote-tracking branch 'apache-github/master' into wal-pluggable

2 parents: 754fbf8 + 03e85b4

File tree

7 files changed (+18, -21 lines)

core/src/main/scala/org/apache/spark/CacheManager.scala (2 additions, 3 deletions)

@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.incBytesRead(inputMetrics.bytesRead)
+          .getInputMetricsForReadMethod(blockResult.readMethod)
+        existingMetrics.incBytesRead(blockResult.bytes)

         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
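
The hunk above stops reaching through a pre-built InputMetrics on the fetched block and instead reads the raw readMethod and byte count off the BlockResult, folding them into the task's own metrics. A minimal, self-contained sketch of that bookkeeping flow; every type below is a simplified stand-in, not Spark's real CacheManager/TaskMetrics machinery:

object CacheReadSketch {
  object DataReadMethod extends Enumeration {
    val Memory, Disk, Network = Value
  }

  // Stand-in for BlockResult after this commit: plain fields, no metrics logic.
  case class BlockResult(data: Iterator[Any], readMethod: DataReadMethod.Value, bytes: Long)

  // Stand-in for the per-task metrics accumulator, keyed by read method.
  class InputMetrics(val readMethod: DataReadMethod.Value) {
    private var _bytesRead = 0L
    def incBytesRead(n: Long): Unit = _bytesRead += n
    def bytesRead: Long = _bytesRead
  }

  class TaskMetrics {
    private val byMethod = scala.collection.mutable.Map.empty[DataReadMethod.Value, InputMetrics]
    def getInputMetricsForReadMethod(m: DataReadMethod.Value): InputMetrics =
      byMethod.getOrElseUpdate(m, new InputMetrics(m))
  }

  def main(args: Array[String]): Unit = {
    val taskMetrics = new TaskMetrics
    val blockResult = BlockResult(Iterator(1, 2, 3), DataReadMethod.Memory, bytes = 120L)
    // The two lines the hunk adds, modulo the simplified types:
    val existingMetrics = taskMetrics.getInputMetricsForReadMethod(blockResult.readMethod)
    existingMetrics.incBytesRead(blockResult.bytes)
    println(existingMetrics.bytesRead) // 120
  }
}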

core/src/main/scala/org/apache/spark/storage/BlockManager.scala (3 additions, 6 deletions)

@@ -29,7 +29,7 @@ import scala.util.Random
 import sun.nio.ch.DirectBuffer

 import org.apache.spark._
-import org.apache.spark.executor._
+import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}

@@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
-    readMethod: DataReadMethod.Value,
-    bytes: Long) {
-  val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
-}
+    val readMethod: DataReadMethod.Value,
+    val bytes: Long)

 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
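
Before this change, constructing a BlockResult eagerly allocated and populated an InputMetrics as a side effect; after, it is a plain value carrier and any metrics object is built by whoever needs one. A sketch of that before/after shape, again with simplified stand-in types rather than Spark's real ones:

object BlockResultSketch {
  object DataReadMethod extends Enumeration { val Memory, Disk = Value }

  class InputMetrics(val readMethod: DataReadMethod.Value) {
    var bytesRead = 0L
    def incBytesRead(n: Long): Unit = bytesRead += n
  }

  // After the commit: three public fields, no side effects in the constructor.
  class BlockResult(
      val data: Iterator[Any],
      val readMethod: DataReadMethod.Value,
      val bytes: Long)

  def main(args: Array[String]): Unit = {
    val result = new BlockResult(Iterator("a", "b"), DataReadMethod.Disk, bytes = 64L)
    // A consumer that still wants an InputMetrics builds one on demand:
    val metrics = new InputMetrics(result.readMethod)
    metrics.incBytesRead(result.bytes)
    println(s"${metrics.readMethod} read ${metrics.bytesRead} bytes")
  }
}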

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala (6 additions, 6 deletions)

@@ -428,19 +428,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
-    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
-    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list1Get.get.bytes === list1SizeEstimate)
+    assert(list1Get.get.readMethod === DataReadMethod.Memory)
     val list2MemoryGet = store.get("list2memory")
     assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
     assert(list2MemoryGet.get.data.size === 3)
-    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
-    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list2MemoryGet.get.bytes === list2SizeEstimate)
+    assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
-    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
-    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+    assert(list2DiskGet.get.bytes > 0)
+    assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }

   test("in-memory LRU storage") {

mllib/src/main/scala/org/apache/spark/ml/Transformer.scala (1 addition, 1 deletion)

@@ -94,7 +94,7 @@ private[ml] abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, O
       throw new IllegalArgumentException(s"Output column ${map(outputCol)} already exists.")
     }
     val outputFields = schema.fields :+
-      StructField(map(outputCol), outputDataType, !outputDataType.isPrimitive)
+      StructField(map(outputCol), outputDataType, nullable = false)
     StructType(outputFields)
   }
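
StructField's third parameter is the column's nullability: the old code derived it from whether the output type was primitive, while the new code pins it to false, declaring that the transformer always produces a value. A freestanding illustration of the two expressions, with stand-in types rather than Spark SQL's real schema classes:

object NullableFieldSketch {
  // Stand-ins for Spark SQL's schema types.
  sealed trait DataType { def isPrimitive: Boolean }
  case object DoubleType extends DataType { val isPrimitive = true }
  case object StringType extends DataType { val isPrimitive = false }
  case class StructField(name: String, dataType: DataType, nullable: Boolean)

  def main(args: Array[String]): Unit = {
    val outputDataType: DataType = StringType
    // Old behavior: nullable whenever the output type was non-primitive.
    val before = StructField("out", outputDataType, !outputDataType.isPrimitive)
    // New behavior: the output column is declared never-null regardless of type.
    val after = StructField("out", outputDataType, nullable = false)
    println(s"before=${before.nullable}, after=${after.nullable}") // before=true, after=false
  }
}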

mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala (3 additions, 2 deletions)

@@ -55,7 +55,8 @@ class VectorAssembler extends Transformer with HasInputCols with HasOutputCol {
       schema(c).dataType match {
         case DoubleType => UnresolvedAttribute(c)
         case t if t.isInstanceOf[VectorUDT] => UnresolvedAttribute(c)
-        case _: NativeType => Alias(Cast(UnresolvedAttribute(c), DoubleType), s"${c}_double_$uid")()
+        case _: NumericType =>
+          Alias(Cast(UnresolvedAttribute(c), DoubleType), s"${c}_double_$uid")()
       }
     }
     dataset.select(col("*"), assembleFunc(new Column(CreateStruct(args))).as(map(outputCol)))

@@ -67,7 +68,7 @@ class VectorAssembler extends Transformer with HasInputCols with HasOutputCol {
     val outputColName = map(outputCol)
     val inputDataTypes = inputColNames.map(name => schema(name).dataType)
     inputDataTypes.foreach {
-      case _: NativeType =>
+      case _: NumericType =>
      case t if t.isInstanceOf[VectorUDT] =>
       case other =>
         throw new IllegalArgumentException(s"Data type $other is not supported.")
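
Matching on NumericType instead of NativeType tightens the accepted inputs: as the dataTypes.scala diff below shows, NativeType.all includes StringType, so the old pattern would have silently accepted string columns and cast them to double. A stand-in sketch of the dispatch this hunk produces (simplified types, not Catalyst's real hierarchy):

object NumericCastSketch {
  sealed trait DataType
  sealed trait NumericType extends DataType
  case object DoubleType extends NumericType
  case object IntegerType extends NumericType
  case object StringType extends DataType
  case object VectorUDT extends DataType

  // Mirrors the hunk's dispatch: doubles and vectors pass through untouched,
  // any other numeric type is cast to double, everything else is rejected.
  def plan(column: String, dt: DataType): String = dt match {
    case DoubleType => s"col($column)"
    case VectorUDT => s"col($column)"
    case _: NumericType => s"cast(col($column) as double)"
    case other => throw new IllegalArgumentException(s"Data type $other is not supported.")
  }

  def main(args: Array[String]): Unit = {
    println(plan("age", IntegerType))  // cast(col(age) as double)
    println(plan("score", DoubleType)) // col(score)
    // plan("name", StringType)        // would throw: not supported
  }
}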

sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala (2 additions, 2 deletions)

@@ -299,7 +299,7 @@ class NullType private() extends DataType {
 case object NullType extends NullType


-protected[spark] object NativeType {
+protected[sql] object NativeType {
   val all = Seq(
     IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)

@@ -327,7 +327,7 @@ protected[sql] object PrimitiveType {
   }
 }

-protected[spark] abstract class NativeType extends DataType {
+protected[sql] abstract class NativeType extends DataType {
   private[sql] type JvmType
   @transient private[sql] val tag: TypeTag[JvmType]
   private[sql] val ordering: Ordering[JvmType]
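
These two hunks change nothing but visibility: protected[spark] made NativeType reachable from any package under org.apache.spark (which is how the mllib code above could match on it), while protected[sql] confines it to org.apache.spark.sql. Not Spark code, but a freestanding illustration of Scala's package-qualified modifiers; it uses private[...] qualifiers, which scope the same way for this purpose:

package org.example.outer {
  package inner {
    private[inner] class Narrow // visible only inside org.example.outer.inner
    private[outer] class Wide   // visible anywhere under org.example.outer
  }

  object Probe {
    // val bad = new inner.Narrow // would not compile: Probe is outside `inner`
    val ok = new inner.Wide       // compiles: Probe is inside `outer`
  }
}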

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala (1 addition, 1 deletion)

@@ -155,7 +155,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     assert(recordedData.toSet === generatedData.toSet)
   }

-  test("block generator throttling") {
+  ignore("block generator throttling") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
     val blockIntervalMs = 100
     val maxRate = 1001
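
The only change here swaps test for ignore, ScalaTest's mechanism for disabling a (likely timing-sensitive) case while keeping its body compiling: the runner reports it as ignored rather than running or deleting it. A minimal sketch, assuming ScalaTest 3.x on the classpath; the suite above used the older org.scalatest.FunSuite, where ignore works the same way:

import org.scalatest.funsuite.AnyFunSuite

class ThrottlingSketchSuite extends AnyFunSuite {
  test("always runs") {
    assert(1 + 1 == 2)
  }

  // Swapping test(...) for ignore(...) is the one-token change the hunk makes:
  // the suite still compiles against the body, but the runner skips it.
  ignore("timing-sensitive case, disabled") {
    assert(System.nanoTime() > 0)
  }
}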
