diff --git a/externals/kyuubi-jdbc-engine/pom.xml b/externals/kyuubi-jdbc-engine/pom.xml index e00902d7a92..899add2ee44 100644 --- a/externals/kyuubi-jdbc-engine/pom.xml +++ b/externals/kyuubi-jdbc-engine/pom.xml @@ -70,6 +70,12 @@ mysql-connector-java test + + + org.apache.phoenix + phoenix-queryserver-client + test + diff --git a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider index a24780b3a71..ec68c6884a9 100644 --- a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider +++ b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider @@ -15,4 +15,5 @@ # limitations under the License. # -org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider \ No newline at end of file +org.apache.kyuubi.engine.jdbc.doris.DorisConnectionProvider +org.apache.kyuubi.engine.jdbc.phoenix.PhoenixConnectionProvider \ No newline at end of file diff --git a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect index 7fb7f64bb01..cf84af61253 100644 --- a/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect +++ b/externals/kyuubi-jdbc-engine/src/main/resources/META-INF/services/org.apache.kyuubi.engine.jdbc.dialect.JdbcDialect @@ -15,4 +15,5 @@ # limitations under the License. # -org.apache.kyuubi.engine.jdbc.dialect.DorisDialect \ No newline at end of file +org.apache.kyuubi.engine.jdbc.dialect.DorisDialect +org.apache.kyuubi.engine.jdbc.dialect.PhoenixDialect \ No newline at end of file diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala new file mode 100644 index 00000000000..0cce14b42fc --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.dialect + +import java.sql.{Connection, ResultSet, Statement} +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.commons.lang3.StringUtils + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper, PhoenixSchemaHelper} +import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper} +import org.apache.kyuubi.operation.Operation +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ +import org.apache.kyuubi.session.Session + +class PhoenixDialect extends JdbcDialect { + + override def createStatement(connection: Connection, fetchSize: Int): Statement = { + val statement = + connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + statement.setFetchSize(fetchSize) + statement + } + + override def getTypeInfoOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getCatalogsOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getSchemasOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getTablesQuery( + catalog: String, + schema: String, + tableName: String, + tableTypes: util.List[String]): String = { + val tTypes = + if (tableTypes == null || tableTypes.isEmpty) { + Set("s", "u") + } else { + tableTypes.asScala.toSet + } + val query = new StringBuilder( + s""" + |SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, + |TABLE_SEQ_NUM, TABLE_TYPE, PK_NAME, + |COLUMN_COUNT, SALT_BUCKETS, DATA_TABLE_NAME, INDEX_STATE + |IMMUTABLE_ROWS, VIEW_STATEMENT + |FROM SYSTEM.CATALOG + |""".stripMargin) + + val filters = ArrayBuffer[String]() + + if (StringUtils.isNotBlank(tableName)) { + filters += s"$TABLE_NAME LIKE '$tableName'" + } + + if (tTypes.nonEmpty) { + filters += s"(${tTypes.map { tableType => s"$TABLE_TYPE = '$tableType'" } + .mkString(" OR ")})" + } + + if (filters.nonEmpty) { + query.append(" WHERE ") + query.append(filters.mkString(" AND ")) + } + + query.toString() + } + + override def getTableTypesOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getColumnsQuery( + session: Session, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String): String = { + val query = new StringBuilder( + """ + |SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, + |TABLE_SEQ_NUM, TABLE_TYPE, PK_NAME, + |COLUMN_COUNT, SALT_BUCKETS, DATA_TABLE_NAME, INDEX_STATE + |IMMUTABLE_ROWS, VIEW_STATEMENT + |FROM SYSTEM.CATALOG + |""".stripMargin) + + val filters = ArrayBuffer[String]() + + if (StringUtils.isNotEmpty(tableName)) { + filters += s"$TABLE_NAME LIKE '$tableName'" + } + if (StringUtils.isNotEmpty(columnName)) { + filters += s"$COLUMN_NAME LIKE '$columnName'" + } + + if (filters.nonEmpty) { + query.append(" WHERE ") + query.append(filters.mkString(" AND ")) + } + + query.toString() + } + + override def getFunctionsOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getPrimaryKeysOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getCrossReferenceOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getRowSetHelper(): RowSetHelper = { + new PhoenixRowSetHelper + } + + override def getSchemaHelper(): SchemaHelper = { + new PhoenixSchemaHelper + } + + override def name(): String = { + "phoenix" + } +} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixConnectionProvider.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixConnectionProvider.scala new file mode 100644 index 00000000000..444e83d06ac --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixConnectionProvider.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider + +class PhoenixConnectionProvider extends JdbcConnectionProvider { + + override val name: String = classOf[PhoenixConnectionProvider].getSimpleName + + override val driverClass: String = "org.apache.phoenix.queryserver.client.Driver" + + override def canHandle(providerClass: String): Boolean = { + driverClass.equalsIgnoreCase(providerClass) + } + +} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala new file mode 100644 index 00000000000..a1f6d4ac25c --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import java.sql.{Date, Types} +import java.time.LocalDateTime + +import scala.collection.JavaConverters._ + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.kyuubi.engine.jdbc.schema.{Column, RowSetHelper} +import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime} + +class PhoenixRowSetHelper extends RowSetHelper { + + protected def toTColumn( + rows: Seq[Seq[Any]], + ordinal: Int, + sqlType: Int): TColumn = { + val nulls = new java.util.BitSet() + sqlType match { + + case Types.BIT => + val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true) + TColumn.boolVal(new TBoolColumn(values, nulls)) + + case Types.TINYINT => + val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte) + TColumn.byteVal(new TByteColumn(values, nulls)) + + case Types.SMALLINT => + val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort) + TColumn.i16Val(new TI16Column(values, nulls)) + + case Types.INTEGER => + val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0) + TColumn.i32Val(new TI32Column(values, nulls)) + + case Types.BIGINT => + val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L) + TColumn.i64Val(new TI64Column(values, nulls)) + + case Types.REAL => + val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat) + .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava + TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case Types.DOUBLE => + val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble) + TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case Types.CHAR | Types.VARCHAR => + val values = getOrSetAsNull[String](rows, ordinal, nulls, "") + TColumn.stringVal(new TStringColumn(values, nulls)) + + case _ => + val rowSize = rows.length + val values = new java.util.ArrayList[String](rowSize) + var i = 0 + while (i < rowSize) { + val row = rows(i) + nulls.set(i, row(ordinal) == null) + val value = + if (row(ordinal) == null) { + "" + } else { + toHiveString(row(ordinal), sqlType) + } + values.add(value) + i += 1 + } + TColumn.stringVal(new TStringColumn(values, nulls)) + } + } + + protected def toTColumnValue(ordinal: Int, row: List[Any], types: List[Column]): TColumnValue = { + types(ordinal).sqlType match { + case Types.BIT => + val boolValue = new TBoolValue + if (row(ordinal) != null) boolValue.setValue(row(ordinal).asInstanceOf[Boolean]) + TColumnValue.boolVal(boolValue) + + case Types.TINYINT => + val byteValue = new TByteValue() + if (row(ordinal) != null) byteValue.setValue(row(ordinal).asInstanceOf[Byte]) + TColumnValue.byteVal(byteValue) + + case Types.SMALLINT => + val tI16Value = new TI16Value() + if (row(ordinal) != null) tI16Value.setValue(row(ordinal).asInstanceOf[Short]) + TColumnValue.i16Val(tI16Value) + + case Types.INTEGER => + val tI32Value = new TI32Value + if (row(ordinal) != null) tI32Value.setValue(row(ordinal).asInstanceOf[Int]) + TColumnValue.i32Val(tI32Value) + + case Types.BIGINT => + val tI64Value = new TI64Value + if (row(ordinal) != null) tI64Value.setValue(row(ordinal).asInstanceOf[Long]) + TColumnValue.i64Val(tI64Value) + + case Types.REAL => + val tDoubleValue = new TDoubleValue + if (row(ordinal) != null) { + val doubleValue = java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString) + tDoubleValue.setValue(doubleValue) + } + TColumnValue.doubleVal(tDoubleValue) + + case Types.DOUBLE => + val tDoubleValue = new TDoubleValue + if (row(ordinal) != null) tDoubleValue.setValue(row(ordinal).asInstanceOf[Double]) + TColumnValue.doubleVal(tDoubleValue) + + case Types.CHAR | Types.VARCHAR => + val tStringValue = new TStringValue + if (row(ordinal) != null) tStringValue.setValue(row(ordinal).asInstanceOf[String]) + TColumnValue.stringVal(tStringValue) + + case _ => + val tStrValue = new TStringValue + if (row(ordinal) != null) { + tStrValue.setValue( + toHiveString(row(ordinal), types(ordinal).sqlType)) + } + TColumnValue.stringVal(tStrValue) + } + } + + protected def toHiveString(data: Any, sqlType: Int): String = { + (data, sqlType) match { + case (date: Date, Types.DATE) => + formatDate(date) + case (dateTime: LocalDateTime, Types.TIMESTAMP) => + formatLocalDateTime(dateTime) + case (decimal: java.math.BigDecimal, Types.DECIMAL) => + decimal.toPlainString + // TODO support bitmap and hll + case (other, _) => + other.toString + } + } +} diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala new file mode 100644 index 00000000000..f5e04f7ca72 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixSchemaHelper.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import java.sql.Types + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.kyuubi.engine.jdbc.schema.SchemaHelper + +class PhoenixSchemaHelper extends SchemaHelper { + + override def toTTypeId(sqlType: Int): TTypeId = sqlType match { + case Types.BIT => + TTypeId.BOOLEAN_TYPE + + case Types.TINYINT => + TTypeId.TINYINT_TYPE + + case Types.SMALLINT => + TTypeId.SMALLINT_TYPE + + case Types.INTEGER => + TTypeId.INT_TYPE + + case Types.BIGINT => + TTypeId.BIGINT_TYPE + + case Types.REAL => + TTypeId.FLOAT_TYPE + + case Types.DOUBLE => + TTypeId.DOUBLE_TYPE + + case Types.CHAR | Types.VARCHAR => + TTypeId.STRING_TYPE + + case Types.DATE => + TTypeId.DATE_TYPE + + case Types.TIMESTAMP => + TTypeId.TIMESTAMP_TYPE + + case Types.DECIMAL => + TTypeId.DECIMAL_TYPE + + // TODO add more type support + case _ => + TTypeId.STRING_TYPE + } +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/OperationWithPhoenixEngineSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/OperationWithPhoenixEngineSuite.scala new file mode 100644 index 00000000000..812efe3ee54 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/OperationWithPhoenixEngineSuite.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType} + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class OperationWithPhoenixEngineSuite extends PhoenixOperationSuite with HiveJDBCTestHelper { + + override protected def jdbcUrl: String = jdbcConnectionUrl + + test("Test for Jdbc engine getInfo") { + val metaData = ConnectionProvider.create(kyuubiConf).getMetaData + + withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() { + withSessionHandle { (client, handle) => + val req = new TGetInfoReq() + req.setSessionHandle(handle) + req.setInfoType(TGetInfoType.CLI_DBMS_NAME) + assert(client.GetInfo(req).getInfoValue.getStringValue == metaData.getDatabaseProductName) + + val req2 = new TGetInfoReq() + req2.setSessionHandle(handle) + req2.setInfoType(TGetInfoType.CLI_DBMS_VER) + assert( + client.GetInfo(req2).getInfoValue.getStringValue == metaData.getDatabaseProductVersion) + + val req3 = new TGetInfoReq() + req3.setSessionHandle(handle) + req3.setInfoType(TGetInfoType.CLI_MAX_COLUMN_NAME_LEN) + assert(client.GetInfo(req3).getInfoValue.getLenValue == metaData.getMaxColumnNameLength) + + val req4 = new TGetInfoReq() + req4.setSessionHandle(handle) + req4.setInfoType(TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN) + assert(client.GetInfo(req4).getInfoValue.getLenValue == metaData.getMaxSchemaNameLength) + + val req5 = new TGetInfoReq() + req5.setSessionHandle(handle) + req5.setInfoType(TGetInfoType.CLI_MAX_TABLE_NAME_LEN) + assert(client.GetInfo(req5).getInfoValue.getLenValue == metaData.getMaxTableNameLength) + } + } + } +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixOperationSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixOperationSuite.scala new file mode 100644 index 00000000000..2c5d5e1803e --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixOperationSuite.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import java.sql.ResultSet + +import scala.collection.mutable.ArrayBuffer + +import org.apache.kyuubi.operation.HiveJDBCTestHelper +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ + +abstract class PhoenixOperationSuite extends WithPhoenixEngine with HiveJDBCTestHelper { + test("phoenix - get tables") { + case class Table(catalog: String, schema: String, tableName: String, tableType: String) + + withJdbcStatement() { statement => + val meta = statement.getConnection.getMetaData + val resultBuffer = ArrayBuffer[Table]() + + var tables = meta.getTables(null, null, null, null) + while (tables.next()) { + resultBuffer += + Table( + null, + null, + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table(null, null, "CATALOG", "s"))) + assert(resultBuffer.contains(Table(null, null, "LOG", "s"))) + resultBuffer.clear() + + statement.execute("create table db1.test1(id bigint primary key)") + statement.execute("create table db1.test2(id bigint primary key)") + + tables = meta.getTables(null, null, "TEST1", Array("u")) + while (tables.next()) { + val table = Table( + null, + null, + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + assert(table == Table(null, null, "TEST1", "u")) + } + + tables = meta.getTables(null, null, "TEST2", null) + while (tables.next()) { + resultBuffer += Table( + null, + null, + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table(null, null, "TEST2", "u"))) + resultBuffer.clear() + + tables = meta.getTables(null, null, null, Array("u")) + while (tables.next()) { + resultBuffer += Table( + null, + null, + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table(null, null, "TEST1", "u"))) + assert(resultBuffer.contains(Table(null, null, "TEST2", "u"))) + resultBuffer.clear() + + tables = meta.getTables(null, null, null, Array("u", "s")) + while (tables.next()) { + resultBuffer += Table( + null, + null, + tables.getString(TABLE_NAME), + tables.getString(TABLE_TYPE)) + } + assert(resultBuffer.contains(Table(null, null, "TEST1", "u"))) + assert(resultBuffer.contains(Table(null, null, "TEST2", "u"))) + assert(resultBuffer.contains(Table(null, null, "LOG", "s"))) + assert(resultBuffer.contains(Table(null, null, "CATALOG", "s"))) + resultBuffer.clear() + + statement.execute("drop table db1.test1") + statement.execute("drop table db1.test2") + } + } + + test("phoenix - get columns") { + case class Column(tableName: String, columnName: String) + + def buildColumn(resultSet: ResultSet): Column = { + val tableName = resultSet.getString(TABLE_NAME) + val columnName = resultSet.getString(COLUMN_NAME) + val column = Column(tableName, columnName) + column + } + + withJdbcStatement() { statement => + val metadata = statement.getConnection.getMetaData + statement.execute("create table if not exists db1.test1" + + "(id bigint primary key, str1 varchar, str2 varchar, age integer)") + + statement.execute("create table if not exists db1.test2" + + "(id bigint primary key, str1 varchar, str2 varchar, age integer)") + + val resultBuffer = ArrayBuffer[Column]() + val resultSet1 = metadata.getColumns(null, null, null, null) + while (resultSet1.next()) { + val column = buildColumn(resultSet1) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("TEST1", "ID"))) + assert(resultBuffer.contains(Column("TEST1", "STR1"))) + assert(resultBuffer.contains(Column("TEST1", "STR2"))) + assert(resultBuffer.contains(Column("TEST1", "AGE"))) + + assert(resultBuffer.contains(Column("TEST2", "ID"))) + assert(resultBuffer.contains(Column("TEST2", "STR1"))) + assert(resultBuffer.contains(Column("TEST2", "STR2"))) + assert(resultBuffer.contains(Column("TEST2", "AGE"))) + + resultBuffer.clear() + + val resultSet2 = metadata.getColumns(null, null, "TEST1", null) + while (resultSet2.next()) { + val column = buildColumn(resultSet2) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("TEST1", "ID"))) + assert(resultBuffer.contains(Column("TEST1", "STR1"))) + assert(resultBuffer.contains(Column("TEST1", "STR2"))) + assert(resultBuffer.contains(Column("TEST1", "AGE"))) + + resultBuffer.clear() + + val resultSet3 = metadata.getColumns(null, null, null, "AGE") + while (resultSet3.next()) { + val column = buildColumn(resultSet3) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("TEST1", "AGE"))) + assert(resultBuffer.contains(Column("TEST2", "AGE"))) + + resultBuffer.clear() + + val resultSet4 = metadata.getColumns(null, null, "T%1", "STR%") + while (resultSet4.next()) { + val column = buildColumn(resultSet4) + resultBuffer += column + } + + assert(resultBuffer.contains(Column("TEST1", "STR1"))) + + resultBuffer.clear() + + val resultSet5 = metadata.getColumns(null, null, "T%1", "fake") + assert(!resultSet5.next()) + + statement.execute("drop table db1.test1") + statement.execute("drop table db1.test2") + } + } +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/SessionSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/SessionSuite.scala new file mode 100644 index 00000000000..e61d0916e5a --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/SessionSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class SessionSuite extends WithPhoenixEngine with HiveJDBCTestHelper { + + test("test session") { + withJdbcStatement() { statement => + val resultSet = statement.executeQuery( + "select '1' as id") + val metadata = resultSet.getMetaData + for (i <- 1 to metadata.getColumnCount) { + assert(metadata.getColumnName(i) == "ID") + } + while (resultSet.next()) { + val id = resultSet.getObject(1) + assert(id == "1") + } + } + } + + override protected def jdbcUrl: String = jdbcConnectionUrl +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/StatementSuite.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/StatementSuite.scala new file mode 100644 index 00000000000..d7e7ebb9b64 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/StatementSuite.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import java.sql.{Date, Timestamp} + +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class StatementSuite extends WithPhoenixEngine with HiveJDBCTestHelper { + + test("test select") { + withJdbcStatement("test1") { statement => + statement.execute("create table db1.test1(id bigint primary key, " + + "name varchar(255), age integer)") + statement.execute("upsert into db1.test1 values(1, 'a', 11)") + + val resultSet1 = statement.executeQuery("select * from db1.test1") + while (resultSet1.next()) { + val id = resultSet1.getObject(1) + assert(id == 1) + val name = resultSet1.getObject(2) + assert(name == "a") + val age = resultSet1.getObject(3) + assert(age == 11) + } + } + } + + test("test types") { + withJdbcStatement("test1") { statement => + statement.execute("create table db1.type_test(" + + "id bigint primary key, " + + "tiny_col tinyint, smallint_col smallint, " + + "int_col integer, bigint_col bigint, " + + "decimal_col decimal(27, 9), " + + "date_col date, timestamp_col timestamp, " + + "char_col char(10), varchar_col varchar(255), " + + "boolean_col boolean, " + + "double_col double, float_col float)") + statement.execute("upsert into db1.type_test" + + "(id, " + + "tiny_col, smallint_col, int_col, bigint_col, " + + "decimal_col, " + + "date_col, timestamp_col, " + + "char_col, varchar_col, " + + "boolean_col, " + + "double_col, float_col) " + + "VALUES (1, 2, 3, 4, 5, 7.7, '2022-05-08', '2022-05-08 17:47:45'," + + "'a', 'Hello', true, 8.8, 9.9)") + + val resultSet1 = statement.executeQuery("select * from db1.type_test") + while (resultSet1.next()) { + assert(resultSet1.getObject(1) == 1) + assert(resultSet1.getObject(2) == 2) + assert(resultSet1.getObject(3) == 3) + assert(resultSet1.getObject(4) == 4) + assert(resultSet1.getObject(5) == 5) + assert(resultSet1.getObject(6) == new java.math.BigDecimal("7.7")) + assert(resultSet1.getObject(7) == Date.valueOf("2022-05-08")) + assert(resultSet1.getObject(8) == Timestamp.valueOf("2022-05-08 17:47:45")) + assert(resultSet1.getObject(9) == "a") + assert(resultSet1.getObject(10) == "Hello") + assert(resultSet1.getObject(11) == true) + assert(resultSet1.getObject(12) == 8.8) + assert(resultSet1.getObject(13) == 9.9) + } + } + } + + override protected def jdbcUrl: String = jdbcConnectionUrl +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/WithPhoenixContainer.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/WithPhoenixContainer.scala new file mode 100644 index 00000000000..49b4369bc46 --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/WithPhoenixContainer.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import com.dimafeng.testcontainers.{GenericContainer, SingleContainer} +import org.testcontainers.containers.wait.strategy.Wait + +import org.apache.kyuubi.engine.jdbc.WithJdbcServerContainer + +trait WithPhoenixContainer extends WithJdbcServerContainer { + + private val PHOENIX_PORT = 8765 + + private val phoenixDockerImage = "iteblog/hbase-phoenix-docker:1.0" + + override val container: SingleContainer[_] = GenericContainer( + dockerImage = phoenixDockerImage, + exposedPorts = Seq(PHOENIX_PORT), + waitStrategy = Wait.forListeningPort) + + protected def queryServerUrl: String = { + val queryServerHost: String = container.host + val queryServerPort: Int = container.mappedPort(PHOENIX_PORT) + val url = s"$queryServerHost:$queryServerPort" + url + } + + override def afterAll(): Unit = { + super.afterAll() + container.close() + } + +} diff --git a/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/WithPhoenixEngine.scala b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/WithPhoenixEngine.scala new file mode 100644 index 00000000000..adf328d7b0c --- /dev/null +++ b/externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/phoenix/WithPhoenixEngine.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.jdbc.phoenix + +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.jdbc.WithJdbcEngine + +trait WithPhoenixEngine extends WithJdbcEngine with WithPhoenixContainer { + + private val PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0" + private val serialization = "serialization=PROTOBUF" + private val jdbcUrlPrefix = "jdbc:phoenix:thin:url" + + override def withKyuubiConf: Map[String, String] = Map( + ENGINE_SHARE_LEVEL.key -> "SERVER", + ENGINE_JDBC_CONNECTION_URL.key -> s"$getConnectString", + ENGINE_JDBC_CONNECTION_USER.key -> "root", + ENGINE_JDBC_CONNECTION_PASSWORD.key -> "", + ENGINE_TYPE.key -> "jdbc", + ENGINE_JDBC_SHORT_NAME.key -> "phoenix", + ENGINE_JDBC_DRIVER_CLASS.key -> "org.apache.phoenix.queryserver.client.Driver") + + private def getConnectString: String = s"$jdbcUrlPrefix=http://$queryServerUrl;$serialization" + +} diff --git a/integration-tests/kyuubi-jdbc-it/pom.xml b/integration-tests/kyuubi-jdbc-it/pom.xml index 526579e7cd9..34ab73e326d 100644 --- a/integration-tests/kyuubi-jdbc-it/pom.xml +++ b/integration-tests/kyuubi-jdbc-it/pom.xml @@ -101,6 +101,13 @@ true ${project.build.directory} + + org.apache.phoenix + phoenix-queryserver-client + ${phoenix.version} + true + ${project.build.directory} + diff --git a/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/OperationWithServerSuite.scala b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/OperationWithServerSuite.scala new file mode 100644 index 00000000000..45bc1ea6e4e --- /dev/null +++ b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/OperationWithServerSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.it.jdbc.phoenix + +import org.apache.kyuubi.engine.jdbc.phoenix.PhoenixOperationSuite + +class OperationWithServerSuite extends PhoenixOperationSuite + with WithKyuubiServerAndPhoenixContainer { + + override protected def jdbcUrl: String = getJdbcUrl + +} diff --git a/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/SessionWithServerSuite.scala b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/SessionWithServerSuite.scala new file mode 100644 index 00000000000..b36a965c5d3 --- /dev/null +++ b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/SessionWithServerSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.it.jdbc.phoenix + +import org.apache.kyuubi.engine.jdbc.phoenix.SessionSuite + +class SessionWithServerSuite extends SessionSuite + with WithKyuubiServerAndPhoenixContainer { + + override protected def jdbcUrl: String = getJdbcUrl + +} diff --git a/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/StatementWithServerSuite.scala b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/StatementWithServerSuite.scala new file mode 100644 index 00000000000..9b373968694 --- /dev/null +++ b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/StatementWithServerSuite.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.it.jdbc.phoenix + +import org.apache.kyuubi.engine.jdbc.phoenix.StatementSuite + +class StatementWithServerSuite extends StatementSuite + with WithKyuubiServerAndPhoenixContainer { + + override protected def jdbcUrl: String = getJdbcUrl + +} diff --git a/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/WithKyuubiServerAndPhoenixContainer.scala b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/WithKyuubiServerAndPhoenixContainer.scala new file mode 100644 index 00000000000..d9fb3258e46 --- /dev/null +++ b/integration-tests/kyuubi-jdbc-it/src/test/scala/org/apache/kyuubi/it/jdbc/phoenix/WithKyuubiServerAndPhoenixContainer.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.it.jdbc.phoenix + +import java.nio.file.{Files, Path, Paths} +import java.time.Duration + +import org.apache.kyuubi.{Utils, WithKyuubiServer} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_JDBC_EXTRA_CLASSPATH, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME} +import org.apache.kyuubi.engine.jdbc.phoenix.WithPhoenixEngine + +trait WithKyuubiServerAndPhoenixContainer extends WithKyuubiServer with WithPhoenixEngine { + + private val kyuubiHome: String = Utils + .getCodeSourceLocation(getClass).split("integration-tests").head + + private val phoenixJdbcConnectorPath: String = { + val keyword = "phoenix-queryserver" + + val jarsDir = Paths.get(kyuubiHome) + .resolve("integration-tests") + .resolve("kyuubi-jdbc-it") + .resolve("target") + + Files.list(jarsDir) + .filter { p: Path => p.getFileName.toString contains keyword } + .findFirst + .orElseThrow { () => new IllegalStateException(s"Can not find $keyword in $jarsDir.") } + .toAbsolutePath + .toString + } + + override protected val conf: KyuubiConf = { + KyuubiConf() + .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome) + .set(ENGINE_JDBC_EXTRA_CLASSPATH, phoenixJdbcConnectorPath) + .set(ENGINE_IDLE_TIMEOUT, Duration.ofMinutes(1).toMillis) + } + + override def beforeAll(): Unit = { + val configs = withKyuubiConf + configs.foreach(config => conf.set(config._1, config._2)) + super.beforeAll() + } +} diff --git a/pom.xml b/pom.xml index 996988f9ccd..7bd42fb257e 100644 --- a/pom.xml +++ b/pom.xml @@ -172,6 +172,7 @@ 8.0.27 4.1.84.Final 1.10.1 + 6.0.0 0.16.0 3.21.7 0.10.7 @@ -1499,6 +1500,12 @@ ${mysql.jdbc.version} + + org.apache.phoenix + phoenix-queryserver-client + ${phoenix.version} + + org.apache.flink