Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/deployment/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ kyuubi\.engine\.share<br>\.level|<div style='width: 65pt;word-wrap: break-word;w
kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - Using kyuubi.engine.share.level.subdomain instead</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine\.share<br>\.level\.subdomain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values in `^[a-zA-Z_-]{1,14}$` form. For example, for `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.ui<br>\.retainedSessions|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of SQL client sessions kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.ui\.stop<br>\.enabled|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, allows Kyuubi engine to be killed from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, EventLoggingService}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, ServiceState}
Expand All @@ -51,7 +51,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
}

override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
val listener = new SparkSQLEngineListener(this, new EngineEventsStore(conf))
spark.sparkContext.addSparkListener(listener)
addService(eventLogging)
addService(frontendService)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.events

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT

/**
* An in-memory store that tracks the number of statements and sessions. It provides:
*
* - stores all events.
* - cleans up the events once a certain threshold is reached:
* 1). removes the finished events first.
* 2). removes the active events if the threshold is still exceeded.
*
* // TODO KYUUBI #983 this store will be used in the third task.
*/
class EngineEventsStore(conf: KyuubiConf) {

  /**
   * The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
   */
  private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)

  /**
   * Stores all session events, keyed by session id.
   */
  val sessions = new ConcurrentHashMap[String, SessionEvent]

  /**
   * Returns a snapshot of all stored session events, ordered by ascending startTime.
   */
  def getSessionList: Seq[SessionEvent] = {
    sessions.values().asScala.toSeq.sortBy(_.startTime)
  }

  /**
   * Looks up a session event by its id.
   *
   * @param sessionId the id the event was saved under
   * @return the stored event, or None if no event is stored for that id
   */
  def getSession(sessionId: String): Option[SessionEvent] = {
    Option(sessions.get(sessionId))
  }

  /**
   * Saves (or overwrites) the event under its session id, then enforces the capacity limit.
   *
   * @param sessionEvent the event to store
   */
  def saveSession(sessionEvent: SessionEvent): Unit = {
    sessions.put(sessionEvent.sessionId, sessionEvent)
    checkSessionCapacity()
  }

  /**
   * Evicts session events while the store holds more than `retainedSessions` entries.
   *
   * Finished sessions (endTime != 0L) are evicted first; active sessions are only evicted
   * when removing every finished one was not enough. The snapshot is sorted only when the
   * store is actually over capacity, and the amount of data sorted is bounded by the
   * retained-sessions limit plus whatever was just added.
   *
   * NOTE(review): candidates are visited from the latest startTime to the earliest, so the
   * most recently started session of a group is evicted first - confirm this is intended.
   */
  private def checkSessionCapacity(): Unit = {
    val overflow = sessions.size - retainedSessions
    if (overflow > 0) {
      val newestFirst = sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
      // Split the snapshot so that sessions evicted in the first pass are never
      // revisited (and counted again) in the second pass.
      val (finished, active) = newestFirst.partition(_.endTime != 0L)

      val evictedFinished = evict(finished, overflow)
      evict(active, overflow - evictedFinished)
    }
  }

  /**
   * Removes up to `limit` of the given candidates from the store, in iteration order.
   *
   * @return the number of entries that were actually removed by this call
   */
  private def evict(candidates: Seq[SessionEvent], limit: Int): Int = {
    var removed = 0
    val remaining = candidates.iterator
    while (removed < limit && remaining.hasNext) {
      // ConcurrentHashMap.remove returns null when the key was already absent;
      // only count removals that really shrank the store.
      if (sessions.remove(remaining.next().sessionId) != null) {
        removed += 1
      }
    }
    removed
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent}
import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
import org.apache.kyuubi.ha.client.EngineServiceDiscovery
Expand All @@ -40,7 +41,9 @@ import org.apache.kyuubi.service.{Serverable, ServiceState}
*
* @param server the corresponding engine
*/
class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logging {
class SparkSQLEngineListener(
server: Serverable,
store: EngineEventsStore) extends SparkListener with Logging {

// the conf of server is null before initialized, use lazy val here
private lazy val deregisterExceptions: Seq[String] =
Expand Down Expand Up @@ -117,4 +120,15 @@ class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logg
if e.getCause != null => findCause(e.getCause)
case e => e
}

/**
 * Handles events that have no dedicated SparkListener callback: session events are
 * forwarded to the events store, every other event is ignored.
 */
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
  case sessionEvent: SessionEvent => updateSession(sessionEvent)
  case _ => () // Ignore
}

/** Persists the given session event in the engine events store. */
private def updateSession(event: SessionEvent): Unit =
  store.saveSession(event)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.events

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT

class EngineEventsStoreSuite extends KyuubiFunSuite {

  test("ensure that the sessions are stored in order") {
    val store = new EngineEventsStore(KyuubiConf())

    // Saved out of start-time order on purpose: 1, 3, 2.
    Seq(
      SessionEvent("a", "ea", "test1", "1.1.1.1", 1L),
      SessionEvent("c", "ea", "test2", "1.1.1.1", 3L),
      SessionEvent("b", "ea", "test3", "1.1.1.1", 2L)).foreach(store.saveSession)

    val ordered = store.getSessionList
    assert(ordered.size == 3)
    assert(ordered.head.sessionId == "a")
    assert(ordered.last.sessionId == "c")
  }

  test("test drop sessions when reach the threshold ") {
    val conf = KyuubiConf()
    conf.set(ENGINE_UI_SESSION_LIMIT, 3)
    val store = new EngineEventsStore(conf)

    // Five saves against a limit of three.
    (1 to 5).foreach { i =>
      store.saveSession(SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", 2L))
    }

    assert(store.getSessionList.size == 3)
  }

  test("test drop sessions when reach the threshold, and try to keep active events.") {
    val conf = KyuubiConf()
    conf.set(ENGINE_UI_SESSION_LIMIT, 3)
    val store = new EngineEventsStore(conf)

    // Only s3 carries a non-zero end time, i.e. it is the single finished session.
    Seq(
      SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, 0L),
      SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, 0L),
      SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L),
      SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, 0L)).foreach(store.saveSession)

    val kept = store.getSessionList
    assert(kept.size == 3)
    assert(kept(2).sessionId == "s4")
  }

  test("test check session after update session") {
    val store = new EngineEventsStore(KyuubiConf())
    val session = SessionEvent("abc", "ea", "test3", "1.1.1.1", 2L)
    store.saveSession(session)

    // Mutate the event and save it again under the same id.
    val finishTimestamp: Long = 456L
    session.endTime = finishTimestamp
    store.saveSession(session)

    assert(store.getSession("abc").get.endTime == finishTimestamp)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,14 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)

// Number of SQL client sessions kept for the Kyuubi Query Engine web UI.
// The value must be positive (enforced by checkValue) and defaults to 200.
val ENGINE_UI_SESSION_LIMIT: ConfigEntry[Int] =
buildConf("engine.ui.retainedSessions")
.doc("The number of SQL client sessions kept in the Kyuubi Query Engine web UI.")
.version("1.4.0")
.intConf
.checkValue(_ > 0, "retained sessions must be positive.")
.createWithDefault(200)

val SESSION_NAME: OptionalConfigEntry[String] =
buildConf("session.name")
.doc("A human readable name of session and we use empty string by default. " +
Expand Down