Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,7 @@
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimeType.TIME;
import static com.facebook.presto.common.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.plugin.clickhouse.ClickHouseEngineType.MERGETREE;
import static com.facebook.presto.plugin.clickhouse.ClickHouseErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.clickhouse.ClickhouseDXLKeyWords.ORDER_BY_PROPERTY;
Expand All @@ -94,21 +89,6 @@
public class ClickHouseClient
{
private static final Logger log = Logger.get(ClickHouseClient.class);
private static final Map<Type, String> SQL_TYPES = ImmutableMap.<Type, String>builder()
.put(BOOLEAN, "boolean")
.put(BIGINT, "bigint")
.put(INTEGER, "integer")
.put(SMALLINT, "smallint")
.put(TINYINT, "tinyint")
.put(DOUBLE, "double precision")
.put(REAL, "real")
.put(VARBINARY, "varbinary")
.put(DATE, "Date")
.put(TIME, "time")
.put(TIME_WITH_TIME_ZONE, "time with timezone")
.put(TIMESTAMP, "timestamp")
.put(TIMESTAMP_WITH_TIME_ZONE, "timestamp with timezone")
.build();
private static final String tempTableNamePrefix = "tmp_presto_";
protected static final String identifierQuote = "\"";
protected final String connectorId;
Expand Down Expand Up @@ -185,7 +165,7 @@ public final Set<String> getSchemaNames(ClickHouseIdentity identity)
}
}

public ConnectorSplitSource getSplits(ClickHouseIdentity identity, ClickHouseTableLayoutHandle layoutHandle)
public ConnectorSplitSource getSplits(ClickHouseTableLayoutHandle layoutHandle)
{
ClickHouseTableHandle tableHandle = layoutHandle.getTable();
ClickHouseSplit clickHouseSplit = new ClickHouseSplit(
Expand Down Expand Up @@ -213,7 +193,7 @@ public List<ClickHouseColumnHandle> getColumns(ConnectorSession session, ClickHo
resultSet.getInt("DECIMAL_DIGITS"),
Optional.empty(),
Optional.empty());
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Optional<ReadMapping> columnMapping = toPrestoType(typeHandle);
// skip unsupported column types
if (columnMapping.isPresent()) {
String columnName = resultSet.getString("COLUMN_NAME");
Expand All @@ -235,7 +215,7 @@ public List<ClickHouseColumnHandle> getColumns(ConnectorSession session, ClickHo
}
}

public Optional<ReadMapping> toPrestoType(ConnectorSession session, ClickHouseTypeHandle typeHandle)
public Optional<ReadMapping> toPrestoType(ClickHouseTypeHandle typeHandle)
{
return jdbcTypeToPrestoType(typeHandle, mapStringAsVarchar);
}
Expand Down Expand Up @@ -293,7 +273,7 @@ public String buildInsertSql(ClickHouseOutputTableHandle handle)
String columns = Joiner.on(',').join(nCopies(handle.getColumnNames().size(), "?"));
return new StringBuilder()
.append("INSERT INTO ")
.append(quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName()))
.append(quoted(handle.getSchemaName(), handle.getTemporaryTableName()))
.append(" VALUES (").append(columns).append(")")
.toString();
}
Expand Down Expand Up @@ -389,7 +369,7 @@ private static String escapeNamePattern(String name, String escape)
return name;
}

protected String quoted(@Nullable String catalog, @Nullable String schema, String table)
protected String quoted(@Nullable String schema, String table)
{
StringBuilder builder = new StringBuilder();
if (!isNullOrEmpty(schema)) {
Expand All @@ -406,16 +386,10 @@ public void addColumn(ClickHouseIdentity identity, ClickHouseTableHandle handle,
String columnName = column.getName();
String sql = format(
"ALTER TABLE %s ADD COLUMN %s",
quoted(handle.getCatalogName(), schema, table),
quoted(schema, table),
getColumnDefinitionSql(column, columnName));

try (Connection connection = connectionFactory.openConnection(identity)) {
DatabaseMetaData metadata = connection.getMetaData();
if (metadata.storesUpperCaseIdentifiers() && !caseSensitiveNameMatchingEnabled) {
schema = schema != null ? schema.toUpperCase(ENGLISH) : null;
table = table.toUpperCase(ENGLISH);
columnName = columnName.toUpperCase(ENGLISH);
}
execute(connection, sql);
}
catch (SQLException e) {
Expand Down Expand Up @@ -448,14 +422,9 @@ private ClickHouseOutputTableHandle beginWriteTable(ConnectorSession session, Co
public void dropColumn(ClickHouseIdentity identity, ClickHouseTableHandle handle, ClickHouseColumnHandle column)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
DatabaseMetaData metadata = connection.getMetaData();
String columnName = column.getColumnName();
if (metadata.storesUpperCaseIdentifiers() && !caseSensitiveNameMatchingEnabled) {
columnName = columnName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s DROP COLUMN %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
quoted(handle.getSchemaName(), handle.getTableName()),
quoted(column.getColumnName()));
execute(connection, sql);
}
Expand All @@ -466,8 +435,8 @@ public void dropColumn(ClickHouseIdentity identity, ClickHouseTableHandle handle

public void finishInsertTable(ClickHouseIdentity identity, ClickHouseOutputTableHandle handle)
{
String temporaryTable = quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName());
String targetTable = quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName());
String temporaryTable = quoted(handle.getSchemaName(), handle.getTemporaryTableName());
String targetTable = quoted(handle.getSchemaName(), handle.getTableName());
String insertSql = format("INSERT INTO %s SELECT * FROM %s", targetTable, temporaryTable);
String cleanupSql = "DROP TABLE " + temporaryTable;

Expand Down Expand Up @@ -538,15 +507,11 @@ public void renameColumn(ClickHouseIdentity identity, ClickHouseTableHandle hand
{
String sql = format(
"ALTER TABLE %s RENAME COLUMN %s TO %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
quoted(handle.getSchemaName(), handle.getTableName()),
quoted(clickHouseColumn.getColumnName()),
quoted(newColumnName));

try (Connection connection = connectionFactory.openConnection(identity)) {
DatabaseMetaData metadata = connection.getMetaData();
if (metadata.storesUpperCaseIdentifiers() && !caseSensitiveNameMatchingEnabled) {
newColumnName = newColumnName.toUpperCase(ENGLISH);
}
execute(connection, sql);
}
catch (SQLException e) {
Expand Down Expand Up @@ -717,7 +682,7 @@ public void dropTable(ClickHouseIdentity identity, ClickHouseTableHandle handle)
{
StringBuilder sql = new StringBuilder()
.append("DROP TABLE ")
.append(quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()));
.append(quoted(handle.getSchemaName(), handle.getTableName()));

try (Connection connection = connectionFactory.openConnection(identity)) {
execute(connection, sql.toString());
Expand All @@ -744,7 +709,7 @@ public void renameTable(ClickHouseIdentity identity, ClickHouseTableHandle handl
renameTable(identity, handle.getCatalogName(), handle.getSchemaTableName(), newTable);
}

public void createSchema(ClickHouseIdentity identity, String schemaName, Map<String, Object> properties)
public void createSchema(ClickHouseIdentity identity, String schemaName)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
execute(connection, "CREATE DATABASE " + quoted(schemaName));
Expand All @@ -770,20 +735,11 @@ protected void renameTable(ClickHouseIdentity identity, String catalogName, Sche
String tableName = oldTable.getTableName();
String newSchemaName = newTable.getSchemaName();
String newTableName = newTable.getTableName();
String sql = format("RENAME TABLE %s.%s TO %s.%s",
quoted(schemaName),
quoted(tableName),
quoted(newTable.getSchemaName()),
quoted(newTable.getTableName()));
String sql = format("RENAME TABLE %s TO %s",
quoted(schemaName, tableName),
quoted(newSchemaName, newTableName));

try (Connection connection = connectionFactory.openConnection(identity)) {
DatabaseMetaData metadata = connection.getMetaData();
if (metadata.storesUpperCaseIdentifiers() && !caseSensitiveNameMatchingEnabled) {
schemaName = schemaName.toUpperCase(ENGLISH);
tableName = tableName.toUpperCase(ENGLISH);
newSchemaName = newSchemaName.toUpperCase(ENGLISH);
newTableName = newTableName.toUpperCase(ENGLISH);
}
execute(connection, sql);
}
catch (SQLException e) {
Expand Down Expand Up @@ -901,8 +857,8 @@ protected void copyTableSchema(ClickHouseIdentity identity, String catalogName,
String newCreateTableName = newTableName.getTableName();
String sql = format(
"CREATE TABLE %s AS %s ",
quoted(null, schemaName, newCreateTableName),
quoted(null, schemaName, oldCreateTableName));
quoted(schemaName, newCreateTableName),
quoted(schemaName, oldCreateTableName));

try (Connection connection = connectionFactory.openConnection(identity)) {
execute(connection, sql);
Expand All @@ -917,7 +873,6 @@ protected void copyTableSchema(ClickHouseIdentity identity, String catalogName,
private String quoted(RemoteTableName remoteTableName)
{
return quoted(
remoteTableName.getCatalogName().orElse(null),
remoteTableName.getSchemaName().orElse(null),
remoteTableName.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties)
{
clickHouseClient.createSchema(ClickHouseIdentity.from(session), schemaName, properties);
clickHouseClient.createSchema(ClickHouseIdentity.from(session), schemaName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ClickHouseRecordCursor(ClickHouseClient clickHouseClient, ConnectorSessio
sliceReadFunctions = new SliceReadFunction[columnHandles.size()];

for (int i = 0; i < this.columnHandles.length; i++) {
ReadMapping readMapping = clickHouseClient.toPrestoType(session, columnHandles.get(i).getClickHouseTypeHandle())
ReadMapping readMapping = clickHouseClient.toPrestoType(columnHandles.get(i).getClickHouseTypeHandle())
.orElseThrow(() -> new VerifyException("Unsupported column type"));
Class<?> javaType = readMapping.getType().getJavaType();
ReadFunction readFunction = readMapping.getReadFunction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public ConnectorSplitSource getSplits(
SplitSchedulingContext splitSchedulingContext)
{
ClickHouseTableLayoutHandle layoutHandle = (ClickHouseTableLayoutHandle) layout;
return clickHouseClient.getSplits(ClickHouseIdentity.from(session), layoutHandle);
return clickHouseClient.getSplits(layoutHandle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,22 @@ public void testAddColumn()
assertFalse(getQueryRunner().tableExists(getSession(), tableName));
}

@Override
public void testRenameTable()
{
String tableName = "test_rename_table_" + randomTableSuffix();
String newTableName = "test_rename_table_new_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " (id int NOT NULL, x VARCHAR) WITH (engine = 'MergeTree', order_by = ARRAY['id'])");
assertUpdate("INSERT INTO " + tableName + " (id, x) VALUES(1, 'first')", 1);

assertUpdate("ALTER TABLE " + tableName + " RENAME TO " + newTableName);
assertFalse(getQueryRunner().tableExists(getSession(), tableName));
assertTrue(getQueryRunner().tableExists(getSession(), newTableName));
assertUpdate("DROP TABLE " + newTableName);

assertFalse(getQueryRunner().tableExists(getSession(), newTableName));
}

@Test
public void testShowCreateTable()
{
Expand Down
Loading