Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions externals/kyuubi-jdbc-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-queryserver-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
# limitations under the License.
#

org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider
org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider
org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
# limitations under the License.
#

org.apache.kyuubi.engine.jdbc.dialect.DorisDialect
org.apache.kyuubi.engine.jdbc.dialect.DorisDialect
org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.sql.{Connection, ResultSet, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper, DorisSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class PhoenixDialect extends JdbcDialect {

override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ignore fetchSize and set it to Integer.MIN_VALUE?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ignore fetchSize and set it to Integer.MIN_VALUE?

fix it

statement
}

override def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCatalogsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getSchemasOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String = {
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
Set("s", "u")
} else {
tableTypes.asScala.toSet
}
val query = new StringBuilder(
s"""
|SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY,
|TABLE_SEQ_NUM, TABLE_TYPE, PK_NAME,
|COLUMN_COUNT, SALT_BUCKETS, DATA_TABLE_NAME, INDEX_STATE
|IMMUTABLE_ROWS, VIEW_STATEMENT
|FROM SYSTEM.CATALOG
|""".stripMargin)

val filters = ArrayBuffer[String]()

if (StringUtils.isNotBlank(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}

if (tTypes.nonEmpty) {
filters += s"(${tTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" }
.mkString(" OR ")})"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can be implemented since line 62 returns Set("s", "u")

Copy link
Author

@531651225 531651225 Jan 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can be implemented since line 62 returns Set("s", "u")

thanks your advice, Whether it can be improved in the future, and how to implement it has not been decided

}

override def getColumnsQuery(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): String = {
val query = new StringBuilder(
"""
|SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY,
|TABLE_SEQ_NUM, TABLE_TYPE, PK_NAME,
|COLUMN_COUNT, SALT_BUCKETS, DATA_TABLE_NAME, INDEX_STATE
|IMMUTABLE_ROWS, VIEW_STATEMENT
|FROM SYSTEM.CATALOG
|""".stripMargin)

val filters = ArrayBuffer[String]()

if (StringUtils.isNotEmpty(tableName)) {
filters += s"$TABLE_NAME LIKE '$tableName'"
}
if (StringUtils.isNotEmpty(columnName)) {
filters += s"$COLUMN_NAME LIKE '$columnName'"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getRowSetHelper(): RowSetHelper = {
new DorisRowSetHelper
}

override def getSchemaHelper(): SchemaHelper = {
new DorisSchemaHelper
}

override def name(): String = {
"phoenix"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.phoenix

import org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider

class PhoenixConnectionProvider extends JdbcConnectionProvider {

override val name: String = classOf[PhoenixConnectionProvider].getSimpleName

override val driverClass: String = "org.apache.phoenix.queryserver.client.Driver"

override def canHandle(providerClass: String): Boolean = {
driverClass.equalsIgnoreCase(providerClass)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.phoenix

import java.sql.{Date, Types}
import java.time.LocalDateTime

import scala.collection.JavaConverters._

import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper}
import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}

class PhoenixRowSetHelper extends RowSetHelper {

protected def toTColumn(
rows: Seq[Seq[Any]],
ordinal: Int,
sqlType: Int): TColumn = {
val nulls = new java.util.BitSet()
sqlType match {

case Types.BIT =>
val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
TColumn.boolVal(new TBoolColumn(values, nulls))

case Types.TINYINT =>
val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
TColumn.byteVal(new TByteColumn(values, nulls))

case Types.SMALLINT =>
val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
TColumn.i16Val(new TI16Column(values, nulls))

case Types.INTEGER =>
val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
TColumn.i32Val(new TI32Column(values, nulls))

case Types.BIGINT =>
val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
TColumn.i64Val(new TI64Column(values, nulls))

case Types.REAL =>
val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
.asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case Types.DOUBLE =>
val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case Types.CHAR | Types.VARCHAR =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))

case _ =>
val rowSize = rows.length
val values = new java.util.ArrayList[String](rowSize)
var i = 0
while (i < rowSize) {
val row = rows(i)
nulls.set(i, row(ordinal) == null)
val value =
if (row(ordinal) == null) {
""
} else {
toHiveString(row(ordinal), sqlType)
}
values.add(value)
i += 1
}
TColumn.stringVal(new TStringColumn(values, nulls))
}
}

protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
types(ordinal).sqlType match {
case Types.BIT =>
val boolValue = new TBoolValue
if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
TColumnValue.boolVal(boolValue)

case Types.TINYINT =>
val byteValue = new TByteValue()
if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte])
TColumnValue.byteVal(byteValue)

case Types.SMALLINT =>
val tI16Value = new TI16Value()
if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short])
TColumnValue.i16Val(tI16Value)

case Types.INTEGER =>
val tI32Value = new TI32Value
if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
TColumnValue.i32Val(tI32Value)

case Types.BIGINT =>
val tI64Value = new TI64Value
if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
TColumnValue.i64Val(tI64Value)

case Types.REAL =>
val tDoubleValue = new TDoubleValue
if (row(ordinal) != null) {
val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
tDoubleValue.setValue(doubleValue)
}
TColumnValue.doubleVal(tDoubleValue)

case Types.DOUBLE =>
val tDoubleValue = new TDoubleValue
if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
TColumnValue.doubleVal(tDoubleValue)

case Types.CHAR | Types.VARCHAR =>
val tStringValue = new TStringValue
if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
TColumnValue.stringVal(tStringValue)

case _ =>
val tStrValue = new TStringValue
if (row(ordinal) != null) {
tStrValue.setValue(
toHiveString(row(ordinal), types(ordinal).sqlType))
}
TColumnValue.stringVal(tStrValue)
}
}

protected def toHiveString(data: Any, sqlType: Int): String = {
(data, sqlType) match {
case (date: Date, Types.DATE) =>
formatDate(date)
case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
formatLocalDateTime(dateTime)
case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
decimal.toPlainString
// TODO support bitmap and hll
case (other, _) =>
other.toString
}
}
}
Loading