Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,27 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect
import java.sql.{Connection, ResultSet, Statement}
import java.sql.{Connection, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper, DorisSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class DorisDialect extends JdbcDialect {

override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val statement = super.createStatement(connection, fetchSize)
statement.setFetchSize(Integer.MIN_VALUE)
statement
}

override def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCatalogsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getSchemasOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getTablesQuery(
catalog: String,
schema: String,
Expand Down Expand Up @@ -96,10 +81,6 @@ class DorisDialect extends JdbcDialect {
query.toString()
}

override def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
Expand Down Expand Up @@ -139,18 +120,6 @@ class DorisDialect extends JdbcDialect {
query.toString()
}

override def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getRowSetHelper(): RowSetHelper = {
new DorisRowSetHelper
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.sql.{Connection, Statement}
import java.sql.{Connection, ResultSet, Statement}
import java.util

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
Expand All @@ -30,21 +30,34 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._

abstract class JdbcDialect extends SupportServiceLoader with Logging {

def createStatement(connection: Connection, fetchSize: Int = 1000): Statement
def createStatement(connection: Connection, fetchSize: Int = 1000): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(fetchSize)
statement
}

def getTypeInfoOperation(session: Session): Operation
def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getCatalogsOperation(session: Session): Operation
def getCatalogsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getSchemasOperation(session: Session): Operation
def getSchemasOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getTablesQuery(
catalog: String,
schema: String,
tableName: String,
tableTypes: util.List[String]): String

def getTableTypesOperation(session: Session): Operation
def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getColumnsQuery(
session: Session,
Expand All @@ -53,16 +66,21 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
tableName: String,
columnName: String): String

def getFunctionsOperation(session: Session): Operation
def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getPrimaryKeysOperation(session: Session): Operation
def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getCrossReferenceOperation(session: Session): Operation
def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

def getRowSetHelper(): RowSetHelper

def getSchemaHelper(): SchemaHelper

}

object JdbcDialects extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,20 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.dialect

import java.sql.{Connection, ResultSet, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper, PhoenixSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class PhoenixDialect extends JdbcDialect {

override def createStatement(connection: Connection, fetchSize: Int): Statement = {
val statement =
connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(fetchSize)
statement
}

override def getTypeInfoOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCatalogsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getSchemasOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getTablesQuery(
catalog: String,
schema: String,
Expand Down Expand Up @@ -91,10 +68,6 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}

override def getTableTypesOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getColumnsQuery(
session: Session,
catalogName: String,
Expand Down Expand Up @@ -127,18 +100,6 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}

override def getFunctionsOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getPrimaryKeysOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getCrossReferenceOperation(session: Session): Operation = {
throw KyuubiSQLException.featureNotSupported()
}

override def getRowSetHelper(): RowSetHelper = {
new PhoenixRowSetHelper
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,125 +16,21 @@
*/
package org.apache.kyuubi.engine.jdbc.doris

import java.sql.{Date, Types}
import java.time.LocalDateTime

import scala.collection.JavaConverters._

import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper}
import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}
import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper

class DorisRowSetHelper extends RowSetHelper {

protected def toTColumn(
rows: Seq[Seq[Any]],
ordinal: Int,
sqlType: Int): TColumn = {
val nulls = new java.util.BitSet()
sqlType match {
case Types.BIT =>
val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
TColumn.boolVal(new TBoolColumn(values, nulls))

case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
TColumn.i32Val(new TI32Column(values, nulls))

case Types.BIGINT =>
val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
TColumn.i64Val(new TI64Column(values, nulls))

case Types.REAL =>
val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
.asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case Types.DOUBLE =>
val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
TColumn.doubleVal(new TDoubleColumn(values, nulls))

case Types.CHAR | Types.VARCHAR =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))

case _ =>
val rowSize = rows.length
val values = new java.util.ArrayList[String](rowSize)
var i = 0
while (i < rowSize) {
val row = rows(i)
nulls.set(i, row(ordinal) == null)
val value =
if (row(ordinal) == null) {
""
} else {
toHiveString(row(ordinal), sqlType)
}
values.add(value)
i += 1
}
TColumn.stringVal(new TStringColumn(values, nulls))
}
}

protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = {
types(ordinal).sqlType match {
case Types.BIT =>
val boolValue = new TBoolValue
if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
TColumnValue.boolVal(boolValue)

case Types.TINYINT | Types.SMALLINT | Types.INTEGER =>
val tI32Value = new TI32Value
if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int])
TColumnValue.i32Val(tI32Value)

case Types.BIGINT =>
val tI64Value = new TI64Value
if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long])
TColumnValue.i64Val(tI64Value)

case Types.REAL =>
val tDoubleValue = new TDoubleValue
if (row(ordinal) != null) {
val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
tDoubleValue.setValue(doubleValue)
}
TColumnValue.doubleVal(tDoubleValue)

case Types.DOUBLE =>
val tDoubleValue = new TDoubleValue
if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
TColumnValue.doubleVal(tDoubleValue)
override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

case Types.CHAR | Types.VARCHAR =>
val tStringValue = new TStringValue
if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String])
TColumnValue.stringVal(tStringValue)
override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

case _ =>
val tStrValue = new TStringValue
if (row(ordinal) != null) {
tStrValue.setValue(
toHiveString(row(ordinal), types(ordinal).sqlType))
}
TColumnValue.stringVal(tStrValue)
}
}
override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

protected def toHiveString(data: Any, sqlType: Int): String = {
(data, sqlType) match {
case (date: Date, Types.DATE) =>
formatDate(date)
case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
formatLocalDateTime(dateTime)
case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
decimal.toPlainString
// TODO support bitmap and hll
case (other, _) =>
other.toString
}
}
override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)
}
Loading