@@ -18,20 +18,70 @@
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit}
import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentMap, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.common.cache._
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging

private[kafka010] object CachedKafkaProducer extends Logging {
private[kafka010] case class CachedKafkaProducer(
private val id: String = ju.UUID.randomUUID().toString,
private val inUseCount: AtomicInteger = new AtomicInteger(0),
private val kafkaParams: Seq[(String, Object)]) extends Logging {

private val configMap = kafkaParams.map(x => x._1 -> x._2).toMap.asJava

private def updatedAuthConfigIfNeeded(kafkaParamsMap: ju.Map[String, Object]) =
KafkaConfigUpdater("executor", kafkaParamsMap.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()

lazy val kafkaProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
val producer = new KafkaProducer[Array[Byte], Array[Byte]](updatedAuthConfigIfNeeded(configMap))
logDebug(s"Created a new instance of KafkaProducer for " +
s"$kafkaParams with Id: $id")
closed = false
producer
}
@volatile
private var isCached: Boolean = true
private var closed: Boolean = true
private def close(): Unit = {
try {
this.synchronized {
if (!closed) {
closed = true
kafkaProducer.close()
logInfo(s"Closed kafka producer: $this")
}
}
} catch {
case NonFatal(e) =>
logWarning(s"Error while closing kafka producer with params: $kafkaParams", e)
}
}

private def inUse(): Boolean = inUseCount.get() > 0

private def unCache(): Unit = isCached = false

private[kafka010] def getInUseCount: Int = inUseCount.get()

private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
private[kafka010] def getKafkaParams: Seq[(String, Object)] = kafkaParams

private[kafka010] def flush(): Unit = kafkaProducer.flush()

private[kafka010] def isClosed: Boolean = closed
}

private[kafka010] object CachedKafkaProducer extends Logging {

private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)

@@ -40,81 +90,114 @@ private[kafka010] object CachedKafkaProducer extends Logging {
"spark.kafka.producer.cache.timeout",
s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout)

private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
override def load(config: Seq[(String, Object)]): Producer = {
val configMap = config.map(x => x._1 -> x._2).toMap.asJava
createKafkaProducer(configMap)
private val cacheLoader = new CacheLoader[Seq[(String, Object)], CachedKafkaProducer] {
override def load(params: Seq[(String, Object)]): CachedKafkaProducer = {
CachedKafkaProducer(kafkaParams = params)
}
}

private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
private val closeQueue = new ConcurrentLinkedQueue[CachedKafkaProducer]()

private val removalListener = new RemovalListener[Seq[(String, Object)], CachedKafkaProducer]() {
override def onRemoval(
notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
val paramsSeq: Seq[(String, Object)] = notification.getKey
val producer: Producer = notification.getValue
logDebug(
s"Evicting kafka producer $producer params: $paramsSeq, due to ${notification.getCause}")
close(paramsSeq, producer)
notification: RemovalNotification[Seq[(String, Object)], CachedKafkaProducer]): Unit = {
val producer: CachedKafkaProducer = notification.getValue
if (producer.inUse()) {
logDebug(s"Evicting kafka producer $producer, due to ${notification.getCause}.")
// When an in-use producer is evicted, we wait for it to be released before finally closing it.
closeQueue.add(producer)
producer.unCache()
} else {
close(producer)
}
}
}

private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] =
private lazy val guavaCache: LoadingCache[Seq[(String, Object)], CachedKafkaProducer] =
CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
.removalListener(removalListener)
.build[Seq[(String, Object)], Producer](cacheLoader)

private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
val updatedKafkaProducerConfiguration =
KafkaConfigUpdater("executor", producerConfiguration.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()
val kafkaProducer: Producer = new Producer(updatedKafkaProducerConfiguration)
logDebug(s"Created a new instance of KafkaProducer for $updatedKafkaProducerConfiguration.")
kafkaProducer
}
.build[Seq[(String, Object)], CachedKafkaProducer](cacheLoader)

/**
* Get a cached KafkaProducer for a given configuration and mark it as in use. If a matching
* KafkaProducer doesn't exist, a new one will be created. KafkaProducer is thread-safe, so it
* is best to keep one instance per specified kafkaParams.
*/
private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = {
val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParamsMap)
try {
guavaCache.get(paramsSeq)
val producer = this.synchronized {
Review comment (Contributor):

Is this required? It's risky to add new global locks to things.
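
For reference, a minimal sketch of the lock-free alternative this question implies (an assumption, not code from this PR). The trade-off: the increment is then no longer atomic with the cache lookup, so an eviction could race in between, which is presumably why the lock was added:

    // Hypothetical lock-free acquire: each call is individually thread-safe,
    // but the producer could be evicted between get() and incrementAndGet().
    private[kafka010] def acquire(kafkaParamsMap: ju.Map[String, Object]): CachedKafkaProducer = {
      val paramsSeq = paramsToSeq(kafkaParamsMap)
      val producer = guavaCache.get(paramsSeq) // LoadingCache.get is thread-safe
      producer.inUseCount.incrementAndGet()    // atomic on its own
      producer
    }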

val cachedKafkaProducer: CachedKafkaProducer = guavaCache.get(paramsSeq)
cachedKafkaProducer.inUseCount.incrementAndGet()
logDebug(s"Granted producer $cachedKafkaProducer")
cachedKafkaProducer
}
producer
} catch {
case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
case e@(_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
if e.getCause != null =>
throw e.getCause
}
}

private def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = {
val paramsSeq: Seq[(String, Object)] = kafkaParams.asScala.toSeq.sortBy(x => x._1)
private def paramsToSeq(kafkaParamsMap: ju.Map[String, Object]): Seq[(String, Object)] = {
val paramsSeq: Seq[(String, Object)] = kafkaParamsMap.asScala.toSeq.sortBy(x => x._1)
paramsSeq
}

/** For explicitly closing kafka producer */
private[kafka010] def close(kafkaParams: ju.Map[String, Object]): Unit = {
val paramsSeq = paramsToSeq(kafkaParams)
guavaCache.invalidate(paramsSeq)
/* Release a kafka producer back to the cache. We simply decrement its in-use count. */
private[kafka010] def release(producer: CachedKafkaProducer, failing: Boolean): Unit = {
this.synchronized {
// It should be ok to call release multiple times on the same producer object.
Review comment (Contributor):

But it's not really okay, right? If task A calls release multiple times, the producer might have its inUseCount decremented to 0 even though task B is using it.
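
One possible way out, sketched under the assumption of a new wrapper class (ProducerHandle is hypothetical, not part of this PR): hand each borrower a thin handle whose release is idempotent, so a double release by task A cannot steal task B's reference.

    // Hypothetical per-borrower handle: forwards to the cache at most once.
    private[kafka010] final class ProducerHandle(val producer: CachedKafkaProducer) {
      private val released = new java.util.concurrent.atomic.AtomicBoolean(false)
      def release(failing: Boolean): Unit = {
        if (released.compareAndSet(false, true)) {
          CachedKafkaProducer.release(producer, failing)
        }
      }
    }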

if (producer.inUse()) {
// So that we do not end up with -ve in-use counts.
Review comment (Contributor):

Nit: not sure I understand this comment. Maybe "negative"?

producer.inUseCount.decrementAndGet()
logDebug(s"Released producer $producer.")
} else {
logWarning(s"Tried to release a not in use producer, $producer.")
}
if (failing) {
// If this producer is failing to write, we remove it from the cache
// so that it is eventually re-created.
val cachedProducer = guavaCache.getIfPresent(producer.kafkaParams)
if (cachedProducer != null && cachedProducer.id == producer.id) {
logDebug(s"Invalidating a failing producer: $producer.")
guavaCache.invalidate(producer.kafkaParams)
}
}
}
if (!producer.inUse() && !producer.isCached) {
Review comment (Contributor):

This check and removalListener are not guarded by the same lock, so race conditions can happen. One case would be:

  • The producer is about to be evicted: removalListener is called.
    • In removalListener, inUse() = true, so it adds the producer to closeQueue, and then a context switch happens.
  • That producer is then released from another thread: release is called.
    • In release, decrementing the in-use count makes inUse = false; execution reaches if (!producer.inUse() && !producer.isCached) and skips this line because isCached is still true (even though the producer is already in closeQueue).
  • removalListener takes control again: producer.unCache() is called, but the producer can no longer be cleaned up by the threads using it.

I guess iterating closeQueue and closing its entries would make leaked producers be closed eventually, but this if statement is not always true (it might not trigger frequently), so it's unclear when they would actually be closed.
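
A rough sketch of the kind of fix this description implies (an assumption, not code from this PR): take the same lock in the removal listener, so that adding to closeQueue and calling unCache() become atomic with respect to release.

    // Hypothetical variant of the removal listener, guarded by the lock that
    // acquire/release use, so enqueue + unCache() appear atomic to releasers.
    override def onRemoval(
        notification: RemovalNotification[Seq[(String, Object)], CachedKafkaProducer]): Unit = {
      CachedKafkaProducer.synchronized {
        val producer = notification.getValue
        if (producer.inUse()) {
          closeQueue.add(producer)
          producer.unCache()
        } else {
          close(producer)
        }
      }
    }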

Review comment (HeartSaVioR, Contributor, Mar 30, 2019):

It also feels complicated that there are two different places that close a producer.

Every producer is cached, and all producers go through the removal listener to be removed from the cache and then either closed immediately or added to the close queue to be closed eventually (once the condition is met).

Outside the removal listener, then, we only need to worry about how to handle the close queue. The if statement here determines when to process the close queue immediately, but there is the race condition I described above, so it's not exactly correct. (Actually, if this were exactly thread-safe we wouldn't even need a close queue; we could just close the producer once when the conditions are met.)

Another approach would be to inject close-queue cleanup at the end of the release method with some probability (the Guava cache seems to evict this way), or even periodically in a background thread. Handling the close queue doesn't need to be synchronized with anything else (as long as we avoid ConcurrentModificationException); the only shared state is the producer's in-use count, and as long as we revisit the producer eventually (not too late) it doesn't need to be exact.

Review comment (Contributor):

I realized the close that the removal listener calls is the same close as the one here, so the probability is quite a bit higher than I thought. (I suspect it might even be too frequent.)

So I think the if statement here can simply be removed, and it wouldn't hurt.

Review comment (Member, Author):

The race condition is not clear to me; you concluded in your comment that there is no problem, since the producer will eventually be closed. Either a producer is closed immediately, because it is not in use and is evicted by Guava, or it is in use and will be closed once it is finally released by all the threads using it.

Can you give a test that reproduces the problem you are describing?

Review comment (HeartSaVioR, Contributor, Apr 5, 2019):

Could you point out which part of my explanation of the race condition you don't understand? A race condition is not something a test can reproduce consistently.

In short, adding the producer to closeQueue and marking the producer as uncached are not done atomically, given that there is no synchronization in removalListener.onRemoval. There is a window where the producer has been added to closeQueue but isCached is still true; the race condition happens there.

In other words, suppose the in-use count in the producer were exactly correct and thread-safe: then why would we need a close queue? There would be exactly one moment when inUse becomes false, in either onRemoval or release, so closeQueue would not be needed and just closing the producer there should work. closeQueue is only a mitigation for when the synchronization of the in-use count is off, and looping over closeQueue every time another producer is closed is not free.
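
To illustrate the argument: if the count were reliable, release could collapse to something like this (a sketch under that assumption, not code from this PR), with onRemoval doing the symmetric check after unCache():

    // Hypothetical release without a close queue: under a shared lock, the
    // single path that observes "count == 0 and not cached" closes the
    // producer exactly once. (Eviction of failing producers elided.)
    private[kafka010] def release(producer: CachedKafkaProducer, failing: Boolean): Unit = {
      this.synchronized {
        if (producer.inUseCount.decrementAndGet() == 0 && !producer.isCached) {
          producer.close()
        }
      }
    }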

Review comment (HeartSaVioR, Contributor, Apr 6, 2019):

Maybe just removing closeQueue would resolve the race condition. (Though I guess we would still need to synchronize between release and the removal listener to ensure safety.) The issue happens because there are two steps to uncache the producer, and even though each step is thread-safe, doing the two steps together is NOT thread-safe.

Review comment (gaborgsomogyi, Contributor):

> Maybe just removing closeQueue might resolve the race condition.

@HeartSaVioR At first glance I don't really see how to solve it without closeQueue. If a task failed, for instance (maybe a producer issue, maybe not), several other users may not yet be finished, so the producer cannot be closed immediately. Yeah, there is a reference to this producer on the user side, but since there is no guarantee they will call release with that specific producer, I think it's better to have this queue.

I see the solution in proper synchronization. As an optional change, I like how Guava does its clean-up, and I would move this code:

    for (p <- closeQueue.iterator().asScala) {
      if (!p.inUse()) {
        closeQueue.remove(p)
        p.close()
      }
    }

to the end of release, without any condition. If the queue is empty this doesn't consume many cycles, and it's safer for dealing with producers left in the queue.
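
Applied to this PR's release, the suggestion would look roughly like the following sketch (assuming the drain is safe to run outside the lock, and using the getKafkaParams accessor in place of the private field):

    // Hypothetical shape of release with an unconditional drain at the end.
    private[kafka010] def release(producer: CachedKafkaProducer, failing: Boolean): Unit = {
      this.synchronized {
        if (producer.inUse()) {
          producer.inUseCount.decrementAndGet()
        }
        if (failing) {
          guavaCache.invalidate(producer.getKafkaParams)
        }
      }
      // Drain outside the lock on every release, like Guava's amortized cleanup.
      for (p <- closeQueue.iterator().asScala) {
        if (!p.inUse()) {
          closeQueue.remove(p)
          p.close()
        }
      }
    }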

Review comment (HeartSaVioR, Contributor, Apr 8, 2019):

@gaborgsomogyi

> Yeah, there is a reference to this producer on the user side, but since there is no guarantee they will call release with that specific producer, I think it's better to have this queue.

The situation is the same even with closeQueue. If we suppose a borrower never calls release (say, a leak when releasing the resource), the producer never reaches an in-use count of 0, so it would never be closed from closeQueue either.

Review comment (Contributor):

I see the point now, and it makes sense. In this case either way is fine for me.

Review comment (Member, Author):

Maybe the close queue is helpful, since it lets us delegate the close to happen outside the synchronized block.

// close() will take care of removing it from the close queue as well.
close(producer)
}
}

/** Auto close on cache evict */
private def close(paramsSeq: Seq[(String, Object)], producer: Producer): Unit = {
try {
logInfo(s"Closing the KafkaProducer with params: ${paramsSeq.mkString("\n")}.")
producer.close()
} catch {
case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
/** Close this producer and process pending closes. */
private def close(producer: CachedKafkaProducer): Unit = {
producer.close()
// Also check and close any other producers that were evicted earlier and are still pending close.
for (p <- closeQueue.iterator().asScala) {
if (!p.inUse()) {
closeQueue.remove(p)
p.close()
}
}
}

// Intended for testing purpose only.
private[kafka010] def clear(): Unit = {
logInfo("Cleaning up guava cache.")
logInfo("Cleaning up guava cache and force closing all kafka producers.")
guavaCache.invalidateAll()
for (p <- closeQueue.iterator().asScala) {
p.close()
}
closeQueue.clear()
}

// Intended for testing purpose only.
private def getAsMap: ConcurrentMap[Seq[(String, Object)], Producer] = guavaCache.asMap()
private[kafka010] def evict(params: Seq[(String, Object)]): Unit = {
guavaCache.invalidate(params)
}

private[kafka010] def getAsMap: ConcurrentMap[Seq[(String, Object)], CachedKafkaProducer] =
guavaCache.asMap()
}
@@ -92,7 +92,7 @@ class KafkaStreamDataWriter(
inputSchema: Seq[Attribute])
extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {

private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams)
protected val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(producerParams)

def write(row: InternalRow): Unit = {
checkForErrors()
@@ -103,20 +103,21 @@
// Send is asynchronous, but we can't commit until all rows are actually in Kafka.
// This requires flushing and then checking that no callbacks produced errors.
// We also check for errors before to fail as soon as possible - the check is cheap.
checkForErrors()
producer.flush()
checkForErrors()
close()
KafkaWriterCommitMessage
}

def abort(): Unit = {}
def abort(): Unit = {
close()
}

def close(): Unit = {
checkForErrors()
if (producer != null) {
private def close(): Unit = {
try {
checkForErrors()
producer.flush()
checkForErrors()
CachedKafkaProducer.close(producerParams)
} finally {
CachedKafkaProducer.release(producer, failedWrite != null)
}
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
@@ -35,32 +35,35 @@ private[kafka010] class KafkaWriteTask(
inputSchema: Seq[Attribute],
topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) {
// used to synchronize with Kafka callbacks
private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
protected val producer: CachedKafkaProducer =
Review comment (Contributor):

This is a change in lifecycle for the producer. Are we sure that's safe?

CachedKafkaProducer.acquire(producerConfiguration)

/**
* Writes key value data out to topics.
*/
def execute(iterator: Iterator[InternalRow]): Unit = {
producer = CachedKafkaProducer.getOrCreate(producerConfiguration)
while (iterator.hasNext && failedWrite == null) {
val currentRow = iterator.next()
sendRow(currentRow, producer)
}
}

def close(): Unit = {
checkForErrors()
if (producer != null) {
try {
checkForErrors()
producer.flush()
checkForErrors()
producer = null
} finally {
CachedKafkaProducer.release(producer, failedWrite != null)
}
}

}

private[kafka010] abstract class KafkaRowWriter(
inputSchema: Seq[Attribute], topic: Option[String]) {

protected val producer: CachedKafkaProducer
// used to synchronize with Kafka callbacks
@volatile protected var failedWrite: Exception = _
protected val projection = createProjection
@@ -79,7 +82,7 @@
* assuming the row is in Kafka.
*/
protected def sendRow(
row: InternalRow, producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = {
row: InternalRow, producer: CachedKafkaProducer): Unit = {
val projectedRow = projection(row)
val topic = projectedRow.getUTF8String(0)
val key = projectedRow.getBinary(1)
Expand All @@ -89,7 +92,7 @@ private[kafka010] abstract class KafkaRowWriter(
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
}
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
producer.send(record, callback)
producer.kafkaProducer.send(record, callback)
Review comment (Contributor):

Should we add a wrapper, since we have one for flush?
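
Such a wrapper could mirror the existing flush() delegate on CachedKafkaProducer; a sketch of what it might look like (an assumption, not in this PR; CachedKafkaProducer.scala would also need the ProducerRecord, Callback, and RecordMetadata imports):

    // Hypothetical delegate that keeps the raw KafkaProducer out of callers.
    private[kafka010] def send(
        record: ProducerRecord[Array[Byte], Array[Byte]],
        callback: Callback): ju.concurrent.Future[RecordMetadata] = {
      kafkaProducer.send(record, callback)
    }

sendRow would then call producer.send(record, callback) instead of reaching through producer.kafkaProducer.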

}

protected def checkForErrors(): Unit = {