@@ -264,6 +264,33 @@ class KafkaSourceSuite extends KafkaSourceTest {
testUnsupportedConfig("kafka.auto.offset.reset", "latest")
}

test("input row metrics") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)
testUtils.sendMessages(topic, Array("-1"))
require(testUtils.getLatestOffsets(Set(topic)).size === 5)

val kafka = spark
.readStream
.format("kafka")
.option("subscribe", topic)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
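      // Check that the last reported query status carries a positive input row count
      // and a positive processing rate for the Kafka source.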
AssertOnLastQueryStatus { status =>
assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
assert(status.sourceStatuses(0).processingRate > 0.0)
}
)
}

private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {
project/MimaExcludes.scala (10 additions, 0 deletions)
@@ -787,6 +787,16 @@ object MimaExcludes {
) ++ Seq(
// SPARK-16240: ML persistence backward compatibility for LDA
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
) ++ Seq(
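      // Streaming status API changes in this patch: StreamingQueryInfo is replaced by
      // StreamingQueryStatus, and StreamingQuery.status is added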
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status")
)
}

python/pyspark/sql/streaming.py (301 additions, 0 deletions)
@@ -189,6 +189,304 @@ def resetTerminated(self):
self._jsqm.resetTerminated()


class StreamingQueryStatus(object):
"""A class used to report information about the progress of a StreamingQuery.

.. note:: Experimental

.. versionadded:: 2.1
"""

def __init__(self, jsqs):
self._jsqs = jsqs

def __str__(self):
"""
Pretty string of this query status.

>>> print(sqs)
StreamingQueryStatus:
Query name: query
Query id: 1
Status timestamp: 123
Input rate: 15.5 rows/sec
Processing rate 23.5 rows/sec
Latency: 345.0 ms
Trigger details:
isDataPresentInTrigger: true
isTriggerActive: true
latency.getBatch.total: 20
latency.getOffset.total: 10
numRows.input.total: 100
triggerId: 5
Source statuses [1 source]:
Source 1: MySource1
Available offset: #0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
numRows.input.source: 100
latency.getOffset.source: 10
latency.getBatch.source: 20
Sink status: MySink
Committed offsets: [#1, -]
"""
return self._jsqs.toString()

@property
@ignore_unicode_prefix
@since(2.1)
def name(self):
"""
Name of the query. This name is unique across all active queries.

>>> sqs.name
u'query'
"""
return self._jsqs.name()

@property
@since(2.1)
def id(self):
"""
Id of the query. This id is unique across all queries that have been started in
the current process.

>>> int(sqs.id)
1
"""
return self._jsqs.id()

@property
@since(2.1)
def timestamp(self):
"""
Timestamp (ms) of when this query status was generated.

>>> int(sqs.timestamp)
123
"""
return self._jsqs.timestamp()

@property
@since(2.1)
def inputRate(self):
"""
Current total rate (rows/sec) at which data is being generated by all the sources.

>>> sqs.inputRate
15.5
"""
return self._jsqs.inputRate()

@property
@since(2.1)
def processingRate(self):
"""
Current rate (rows/sec) at which the query is processing data from all the sources.

>>> sqs.processingRate
23.5
"""
return self._jsqs.processingRate()

@property
@since(2.1)
def latency(self):
"""
Current average latency (ms) between data becoming available in a source and the sink
writing the corresponding output.

>>> sqs.latency
345.0
"""
if self._jsqs.latency().nonEmpty():
return self._jsqs.latency().get()
else:
return None

@property
@ignore_unicode_prefix
@since(2.1)
def sourceStatuses(self):
"""
Current statuses of the sources as a list.

>>> len(sqs.sourceStatuses)
1
>>> sqs.sourceStatuses[0].description
u'MySource1'
"""
return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]

@property
@ignore_unicode_prefix
@since(2.1)
def sinkStatus(self):
"""
Current status of the sink.

>>> sqs.sinkStatus.description
u'MySink'
"""
return SinkStatus(self._jsqs.sinkStatus())

@property
@ignore_unicode_prefix
@since(2.1)
def triggerDetails(self):
"""
Low-level details of the currently active trigger (e.g. number of rows processed
in trigger, latency of intermediate steps, etc.).

If no trigger is currently active, then it will have details of the last completed trigger.

>>> sqs.triggerDetails
{u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
u'isDataPresentInTrigger': u'true'}
"""
return self._jsqs.triggerDetails()


class SourceStatus(object):
"""
Status and metrics of a streaming Source.

.. note:: Experimental

.. versionadded:: 2.1
"""

def __init__(self, jss):
self._jss = jss

def __str__(self):
"""
Pretty string of this source status.

>>> print(sqs.sourceStatuses[0])
SourceStatus: MySource1
Available offset: #0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger details:
numRows.input.source: 100
latency.getOffset.source: 10
latency.getBatch.source: 20
"""
return self._jss.toString()

@property
@ignore_unicode_prefix
@since(2.1)
def description(self):
"""
Description of the source corresponding to this status.

>>> sqs.sourceStatuses[0].description
u'MySource1'
"""
return self._jss.description()

@property
@ignore_unicode_prefix
@since(2.1)
def offsetDesc(self):
"""
Description of the current offset if known.

>>> sqs.sourceStatuses[0].offsetDesc
u'#0'
"""
return self._jss.offsetDesc()

@property
@since(2.1)
def inputRate(self):
"""
Current rate (rows/sec) at which data is being generated by the source.

>>> sqs.sourceStatuses[0].inputRate
15.5
"""
return self._jss.inputRate()

@property
@since(2.1)
def processingRate(self):
"""
Current rate (rows/sec) at which the query is processing data from the source.

>>> sqs.sourceStatuses[0].processingRate
23.5
"""
return self._jss.processingRate()

@property
@ignore_unicode_prefix
@since(2.1)
def triggerDetails(self):
"""
Low-level details of the currently active trigger (e.g. number of rows processed
in trigger, latency of intermediate steps, etc.).

If no trigger is currently active, then it will have details of the last completed trigger.

>>> sqs.sourceStatuses[0].triggerDetails
{u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
u'latency.getBatch.source': u'20'}
"""
return self._jss.triggerDetails()


class SinkStatus(object):
"""
Status and metrics of a streaming Sink.

.. note:: Experimental

.. versionadded:: 2.1
"""

def __init__(self, jss):
self._jss = jss

def __str__(self):
"""
Pretty string of this sink status.

>>> print(sqs.sinkStatus)
SinkStatus: MySink
Committed offsets: [#1, -]
"""
return self._jss.toString()

@property
@ignore_unicode_prefix
@since(2.1)
def description(self):
"""
Description of the sink corresponding to this status.

>>> sqs.sinkStatus.description
u'MySink'
"""
return self._jss.description()

@property
@ignore_unicode_prefix
@since(2.1)
def offsetDesc(self):
"""
Description of the current offsets up to which data has been written by the sink.

>>> sqs.sinkStatus.offsetDesc
u'[#1, -]'
"""
return self._jss.offsetDesc()


class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.

@@ -751,11 +1049,14 @@ def _test():
globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
globs['df'] = \
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
globs['sqs'] = StreamingQueryStatus(
spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())

(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
globs['spark'].stop()

if failure_count:
exit(-1)

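For context, a minimal sketch of how the new Python status classes might be used from a live session. This assumes df is a streaming DataFrame (as in the doctest setup above) and that this change also exposes a status property on the Python StreamingQuery, mirroring the Scala StreamingQuery.status listed in the MimaExcludes; the query name metrics_demo is illustrative only.

    query = df.writeStream.format("memory").queryName("metrics_demo").start()

    status = query.status                           # StreamingQueryStatus snapshot
    print(status.name, status.id)                   # query name and unique id
    print(status.inputRate, status.processingRate)  # rows/sec; may be 0.0 before the first trigger
    if status.latency is not None:                  # latency is None until it can be estimated
        print("latency: %s ms" % status.latency)
    for source in status.sourceStatuses:            # one SourceStatus per source
        print(source.description, source.offsetDesc)
    print(status.sinkStatus.description, status.sinkStatus.offsetDesc)

    query.stop()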
@@ -158,6 +158,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
ret
}

/**
* Returns a Seq containing the leaves in this tree.
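* A leaf is a node with no children.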
*/
def collectLeaves(): Seq[BaseType] = {
this.collect { case p if p.children.isEmpty => p }
}

/**
* Finds and returns the first [[TreeNode]] of the tree for which the given partial function
* is defined (pre-order), and applies the partial function to it.
@@ -57,10 +57,13 @@ case class LocalTableScanExec(
}

override def executeCollect(): Array[InternalRow] = {
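// Every materialized row is returned, so count them all in the numOutputRows SQL metric.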
longMetric("numOutputRows").add(unsafeRows.size)
unsafeRows
}

override def executeTake(limit: Int): Array[InternalRow] = {
unsafeRows.take(limit)
val taken = unsafeRows.take(limit)
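// Count only the rows actually taken, so the metric stays consistent with the returned result.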
longMetric("numOutputRows").add(taken.size)
taken
}
}