diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index d37c53ce1fb..43050b589cb 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -187,6 +187,7 @@ kyuubi\.engine\.share
\.level|
<undefined>
|
(deprecated) - Use kyuubi.engine.share.level.subdomain instead
|
string
|
1.2.0
kyuubi\.engine\.share
\.level\.subdomain|
<undefined>
|
Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string value in `^[a-zA-Z_-]{1,14}$` form. For example, for `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level
|
string
|
1.4.0
kyuubi\.engine\.single
\.spark\.session|
false
|
When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.
|
boolean
|
1.3.0
+kyuubi\.engine\.ui
\.retainedSessions|
200
|
The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
|
int
|
1.4.0
kyuubi\.engine\.ui\.stop
\.enabled|
true
|
When true, allows Kyuubi engine to be killed from the Spark Web UI.
|
boolean
|
1.3.0
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index c8809003bc7..2732f8dee18 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -30,7 +30,7 @@ import org.apache.kyuubi.Utils._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch -import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService} +import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, EventLoggingService} import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery} import org.apache.kyuubi.service.{Serverable, Service, ServiceState} @@ -51,7 +51,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin } override def initialize(conf: KyuubiConf): Unit = { - val listener = new SparkSQLEngineListener(this) + val listener = new SparkSQLEngineListener(this, new EngineEventsStore(conf)) spark.sparkContext.addSparkListener(listener) addService(eventLogging) addService(frontendService) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala new file mode 100644 index 00000000000..dd52635b178 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters.collectionAsScalaIterableConverter + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT + +/** + * A memory store that tracking the number of statements and sessions, it provides: + * + * - stores all events. + * - cleanup the events when reach a certain threshold: + * 1). remove the finished events first. + * 2). remove the active events if still reach the threshold. + * + * // TODO KYUUBI #983 this store will be used in the third task. + */ +class EngineEventsStore(conf: KyuubiConf) { + + /** + * The number of SQL client sessions kept in the Kyuubi Query Engine web UI. + */ + private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT) + + /** + * store all session events. 
+ */ + val sessions = new ConcurrentHashMap[String, SessionEvent] + + /** + * get all session events order by startTime + */ + def getSessionList: Seq[SessionEvent] = { + sessions.values().asScala.toSeq.sortBy(_.startTime) + } + + def getSession(sessionId: String): Option[SessionEvent] = { + Option(sessions.get(sessionId)) + } + + /** + * save session events and check the capacity threshold + */ + def saveSession(sessionEvent: SessionEvent): Unit = { + sessions.put(sessionEvent.sessionId, sessionEvent) + checkSessionCapacity() + } + + /** + * cleanup the session events if reach the threshold + */ + private def checkSessionCapacity(): Unit = { + var countToDelete = sessions.size - retainedSessions + + val reverseSeq = sessions.values().asScala.toSeq.sortBy(_.startTime).reverse + + // remove finished sessions first. + for (event <- reverseSeq if event.endTime != 0L && countToDelete > 0) { + sessions.remove(event.sessionId) + countToDelete -= 1 + } + + // remove active event if still reach the threshold + for (event <- reverseSeq if countToDelete > 0) { + sessions.remove(event.sessionId) + countToDelete -= 1 + } + } +} + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala index 102930b7e06..9692b938b24 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala @@ -30,6 +30,7 @@ import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY import org.apache.kyuubi.Logging import org.apache.kyuubi.Utils.stringifyException import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent} import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor import 
org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo import org.apache.kyuubi.ha.client.EngineServiceDiscovery @@ -40,7 +41,9 @@ import org.apache.kyuubi.service.{Serverable, ServiceState} * * @param server the corresponding engine */ -class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logging { +class SparkSQLEngineListener( + server: Serverable, + store: EngineEventsStore) extends SparkListener with Logging { // the conf of server is null before initialized, use lazy val here private lazy val deregisterExceptions: Seq[String] = @@ -117,4 +120,15 @@ class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logg if e.getCause != null => findCause(e.getCause) case e => e } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case e: SessionEvent => updateSession(e) + case _ => // Ignore + } + } + + private def updateSession(event: SessionEvent): Unit = { + store.saveSession(event) + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala new file mode 100644 index 00000000000..46da9c76173 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.events + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT + +class EngineEventsStoreSuite extends KyuubiFunSuite { + + test("ensure that the sessions are stored in order") { + val store = new EngineEventsStore(KyuubiConf()) + + val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", 1L) + val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", 3L) + val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", 2L) + + store.saveSession(s1) + store.saveSession(s2) + store.saveSession(s3) + + assert(store.getSessionList.size == 3) + assert(store.getSessionList.head.sessionId == "a") + assert(store.getSessionList.last.sessionId == "c") + } + + test("test drop sessions when reach the threshold ") { + val conf = KyuubiConf() + conf.set(ENGINE_UI_SESSION_LIMIT, 3) + + val store = new EngineEventsStore(conf) + for (i <- 1 to 5) { + val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", 2L) + store.saveSession(s) + } + + assert(store.getSessionList.size == 3) + } + + test("test drop sessions when reach the threshold, and try to keep active events.") { + val conf = KyuubiConf() + conf.set(ENGINE_UI_SESSION_LIMIT, 3) + + val store = new EngineEventsStore(conf) + + store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, 0L)) + store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, 0L)) + store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L)) + store.saveSession(SessionEvent("s4", "ea", 
"test1", "1.1.1.1", 4L, 0L)) + + assert(store.getSessionList.size == 3) + assert(store.getSessionList(2).sessionId == "s4") + } + + test("test check session after update session") { + val store = new EngineEventsStore(KyuubiConf()) + val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", 2L) + store.saveSession(s) + + val finishTimestamp: Long = 456L + s.endTime = finishTimestamp + store.saveSession(s) + + assert(store.getSession("abc").get.endTime == finishTimestamp) + } + +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 6df9cac69bc..1d583b05f33 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -877,6 +877,14 @@ object KyuubiConf { .booleanConf .createWithDefault(true) + val ENGINE_UI_SESSION_LIMIT: ConfigEntry[Int] = + buildConf("engine.ui.retainedSessions") + .doc("The number of SQL client sessions kept in the Kyuubi Query Engine web UI.") + .version("1.4.0") + .intConf + .checkValue(_ > 0, "retained sessions must be positive.") + .createWithDefault(200) + val SESSION_NAME: OptionalConfigEntry[String] = buildConf("session.name") .doc("A human readable name of session and we use empty string by default. " +