@@ -22,6 +22,7 @@ import java.{util => ju}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.CustomTaskMetric
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

@@ -105,4 +106,16 @@ private case class KafkaBatchPartitionReader(
      range
    }
  }

  override def currentMetricsValues(): Array[CustomTaskMetric] = {
    val offsetOutOfRange = new CustomTaskMetric {
      override def name(): String = "offsetOutOfRange"
      override def value(): Long = consumer.getNumOffsetOutOfRange()
    }
    val dataLoss = new CustomTaskMetric {
      override def name(): String = "dataLoss"
      override def value(): Long = consumer.getNumDataLoss()
    }
    Array(offsetOutOfRange, dataLoss)
  }
}
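
For context (not part of the diff): in DSv2, the per-task values reported by currentMetricsValues() are matched by name against the CustomMetric instances the scan declares via supportedCustomMetrics() in the KafkaSourceProvider changes below. A minimal sketch of that pairing for a hypothetical connector metric (illustrative name, not part of this PR):

  import org.apache.spark.sql.connector.CustomTaskMetric

  // Hypothetical executor-side task metric; name() must match a driver-side CustomMetric
  // declared by the scan, otherwise the value will not show up in the SQL UI.
  class RecordsSkippedTaskMetric(current: Long) extends CustomTaskMetric {
    override def name(): String = "recordsSkipped"
    override def value(): Long = current
  }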
@@ -30,11 +30,13 @@ import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.CustomMetric
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.metric.CustomSumMetric
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
import org.apache.spark.sql.sources._
@@ -503,9 +505,23 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
        startingStreamOffsets,
        failOnDataLoss(caseInsensitiveOptions))
    }

    override def supportedCustomMetrics(): Array[CustomMetric] = {
      Array(new OffsetOutOfRangeMetric, new DataLossMetric)
    }
  }
}

private[spark] class OffsetOutOfRangeMetric extends CustomSumMetric {
  override def name(): String = "offsetOutOfRange"
  override def description(): String = "estimated number of fetched offsets out of range"
}

private[spark] class DataLossMetric extends CustomSumMetric {
  override def name(): String = "dataLoss"
  override def description(): String = "number of data loss errors"
}

private[kafka010] object KafkaSourceProvider extends Logging {
  private val ASSIGN = "assign"
  private val SUBSCRIBE_PATTERN = "subscribepattern"
@@ -239,6 +239,9 @@ private[kafka010] class KafkaDataConsumer(
    fetchedDataPool: FetchedDataPool) extends Logging {
  import KafkaDataConsumer._

  private var offsetOutOfRange = 0L
  private var dataLoss = 0L

  private val isTokenProviderEnabled =
    HadoopDelegationTokenManager.isServiceEnabled(SparkEnv.get.conf, "kafka")

@@ -329,7 +332,14 @@

        reportDataLoss(topicPartition, groupId, failOnDataLoss,
          s"Cannot fetch offset $toFetchOffset", e)

        val oldToFetchOffset = toFetchOffset
        toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
        if (toFetchOffset == UNKNOWN_OFFSET) {
          offsetOutOfRange += (untilOffset - oldToFetchOffset)
        } else {
          offsetOutOfRange += (toFetchOffset - oldToFetchOffset)
        }
      }
    }

@@ -350,6 +360,9 @@
    consumer.getAvailableOffsetRange()
  }

  def getNumOffsetOutOfRange(): Long = offsetOutOfRange
  def getNumDataLoss(): Long = dataLoss

  /**
   * Release borrowed objects in data reader to the pool. Once the instance is created, the caller
   * must call this method after using the instance to make sure resources are not leaked.
@@ -596,6 +609,7 @@ private[kafka010] class KafkaDataConsumer(
      message: String,
      cause: Throwable = null): Unit = {
    val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
    dataLoss += 1
    reportDataLoss0(failOnDataLoss, finalMessage, cause)
  }

@@ -47,7 +47,8 @@ object CustomMetrics {
}

/**
 * Built-in `CustomMetric` that sums up metric values.
 * Built-in `CustomMetric` that sums up metric values. Note that for real usage you should extend
 * this class and override `name` and `description` to create your custom metric.
 */
class CustomSumMetric extends CustomMetric {
Contributor: shall we make it an abstract class and force people to implement name and description?

Member Author: okay. had the same thought.
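
A sketch of the abstract-class variant suggested above (illustrative only, not the merged code; the OffsetOutOfRangeMetric subclass simply mirrors the Kafka metric added earlier in this PR):

  import org.apache.spark.sql.connector.CustomMetric

  // Making the base class abstract forces each connector to supply name() and description().
  abstract class CustomSumMetric extends CustomMetric {
    // Sum up the per-task values reported by executors.
    override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
      taskMetrics.sum.toString
  }

  // Connector-side subclass, as in the Kafka changes above.
  class OffsetOutOfRangeMetric extends CustomSumMetric {
    override def name(): String = "offsetOutOfRange"
    override def description(): String = "estimated number of fetched offsets out of range"
  }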

override def name(): String = "CustomSumMetric"
@@ -60,7 +61,8 @@ class CustomSumMetric {
}

/**
 * Built-in `CustomMetric` that computes average of metric values.
 * Built-in `CustomMetric` that computes average of metric values. Note that for real usage you
 * should extend this class and override `name` and `description` to create your custom metric.
 */
class CustomAvgMetric extends CustomMetric {
override def name(): String = "CustomAvgMetric"
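Following the updated guidance, a hypothetical subclass of CustomAvgMetric could look like the sketch below (illustrative name and description; assuming CustomAvgMetric lives in the same org.apache.spark.sql.execution.metric package as the CustomSumMetric imported in KafkaSourceProvider above):

  import org.apache.spark.sql.execution.metric.CustomAvgMetric

  // Hypothetical connector metric: shown in the SQL UI as the average of per-task values.
  class AvgFetchLatencyMetric extends CustomAvgMetric {
    override def name(): String = "avgFetchLatencyMs"
    override def description(): String = "average per-task fetch latency in milliseconds"
  }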
@@ -113,7 +113,7 @@ object SQLMetrics {
   */
  def createV2CustomMetric(sc: SparkContext, customMetric: CustomMetric): SQLMetric = {
    val acc = new SQLMetric(CustomMetrics.buildV2CustomMetricTypeName(customMetric))
    acc.register(sc, name = Some(customMetric.name()), countFailedValues = false)
    acc.register(sc, name = Some(customMetric.description()), countFailedValues = false)
    acc
  }
