Skip to content

Commit 74a6cbf

Browse files
committed
Address review comments, also fix the code when idle evictor interval <= 0 for fetched pool
1 parent fa12a0a commit 74a6cbf

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

docs/structured-streaming-kafka-integration.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,11 @@ The following properties are available to configure the consumer pool:
452452
<td>The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
453453
<td>5m (5 minutes)</td>
454454
</tr>
455+
<tr>
456+
<td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
457+
<td>The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run.</td>
458+
<td>3m (3 minutes)</td>
459+
</tr>
455460
<tr>
456461
<td>spark.kafka.consumer.cache.jmx.enable</td>
457462
<td>Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance.
@@ -464,11 +469,11 @@ The following properties are available to configure the consumer pool:
464469
The size of the pool is limited by <code>spark.kafka.consumer.cache.capacity</code>,
465470
but it works as "soft-limit" to not block Spark tasks.
466471

467-
Idle eviction thread periodically removes some consumers which are not used longer than given timeout.
472+
Idle eviction thread periodically removes consumers which are not used longer than given timeout.
468473
If this threshold is reached when borrowing, it tries to remove the least-used entry that is currently not in use.
469474

470475
If it cannot be removed, then the pool will keep growing. In the worst case, the pool will grow to
471-
the max number of concurrent tasks that can run in the executor (that is, number of tasks slots).
476+
the max number of concurrent tasks that can run in the executor (that is, number of task slots).
472477

473478
If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
474479
At the same time, we invalidate all consumers in pool which have same caching key, to remove consumer which was used
@@ -485,13 +490,13 @@ The following properties are available to configure the fetched data pool:
485490
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
486491
<tr>
487492
<td>spark.kafka.consumer.fetchedData.cache.timeout</td>
488-
<td>The maximum number of fetched data cached. Please note that it's a soft limit.</td>
489-
<td>64</td>
493+
<td>The minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
494+
<td>5m (5 minutes)</td>
490495
</tr>
491496
<tr>
492497
<td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
493-
<td>The minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
494-
<td>5m (5 minutes)</td>
498+
<td>The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.</td>
499+
<td>3m (3 minutes)</td>
495500
</tr>
496501
</table>
497502

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,15 @@ private[kafka010] class FetchedDataPool(
5757
private val evictorThreadRunIntervalMillis =
5858
conf.get(FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL)
5959

60-
private def startEvictorThread(): ScheduledFuture[_] = {
61-
executorService.scheduleAtFixedRate(() => {
62-
Utils.tryLogNonFatalError(removeIdleFetchedData())
63-
}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
60+
private def startEvictorThread(): Option[ScheduledFuture[_]] = {
61+
if (evictorThreadRunIntervalMillis > 0) {
62+
val future = executorService.scheduleAtFixedRate(() => {
63+
Utils.tryLogNonFatalError(removeIdleFetchedData())
64+
}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
65+
Some(future)
66+
} else {
67+
None
68+
}
6469
}
6570

6671
private var scheduled = startEvictorThread()
@@ -132,7 +137,7 @@ private[kafka010] class FetchedDataPool(
132137
}
133138

134139
def reset(): Unit = synchronized {
135-
scheduled.cancel(true)
140+
scheduled.foreach(_.cancel(true))
136141

137142
cache.clear()
138143
numTotalElements.reset()

0 commit comments

Comments
 (0)