Skip to content

Commit

Permalink
Recreated table from scratch when PK changes (#27)
Browse files Browse the repository at this point in the history
* Recreated table from scratch when PK changes

* Fixed test

* Fixed bug in test

* Reverted changes for testing
  • Loading branch information
AdalbertMemSQL authored Jul 16, 2024
1 parent 704b0c5 commit 8f251c4
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 34 deletions.
7 changes: 3 additions & 4 deletions docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ The following table illustrates how Fivetran data types are transformed into Sin

| Schema Change | Supported | Notes |
|------------------------|-----------|-----------------------------------------------------------------------------------------------------------|
| Add column || When Fivetran detects the addition of a column in your source, it automatically adds that column in the SingleStore destination. |
| Change column type || When Fivetran detects a change in the column type in the data source, it automatically changes the column type in the SingleStore destination. To change the column type, Fivetran creates a new column, copies the data from the existing column to the new column, deletes the existing column, and renames the new column. |
| Change key || Changing PRIMARY KEY is not supported in SingleStore. |
| Change key column type || Changing PRIMARY KEY column data type is not supported in SingleStore. |
| Add column || When Fivetran detects the addition of a column in your source, it automatically adds that column in the SingleStore destination. |
| Change column type || When Fivetran detects a change in the column type in the data source, it automatically changes the column type in the SingleStore destination. To change the column type, Fivetran creates a new column, copies the data from the existing column to the new column, deletes the existing column, and renames the new column. |
| Change key or key column type || Changing PRIMARY KEY is not supported in SingleStore. When Fivetran detects a change in a key, it creates a new table with updated PRIMARY KEY, copies the data from the existing table to the new one, deletes the existing table, and renames the new table |
75 changes: 54 additions & 21 deletions src/main/java/com/singlestore/fivetran/destination/JDBCUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ static boolean checkTableExists(Statement stmt, String database, String table) {
}

static boolean checkDatabaseExists(Statement stmt, String database) throws SQLException {
try (ResultSet rs = stmt.executeQuery(
String.format("SHOW DATABASES LIKE %s", escapeString(database)))) {
try (ResultSet rs = stmt
.executeQuery(String.format("SHOW DATABASES LIKE %s", escapeString(database)))) {
return rs.next();
}
}

static Table getTable(SingleStoreConfiguration conf, String database, String table, String originalTableName)
throws Exception {
static Table getTable(SingleStoreConfiguration conf, String database, String table,
String originalTableName) throws Exception {
try (Connection conn = JDBCUtil.createConnection(conf)) {
DatabaseMetaData metadata = conn.getMetaData();

Expand Down Expand Up @@ -173,53 +173,79 @@ static String generateAlterTableQuery(AlterTableRequest request) throws Exceptio
SingleStoreConfiguration conf = new SingleStoreConfiguration(request.getConfigurationMap());

String database = JDBCUtil.getDatabaseName(conf, request.getSchemaName());
String table = JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTable().getName());
String table =
JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTable().getName());

Table oldTable = getTable(conf, database, table, request.getTable().getName());
Table newTable = request.getTable();
boolean pkChanged = false;

if (!pkEquals(oldTable, newTable)) {
throw new Exception("Changing PRIMARY KEY is not supported in SingleStore");
pkChanged = true;
}

Map<String, Column> oldColumns = oldTable.getColumnsList().stream()
.collect(Collectors.toMap(Column::getName, Function.identity()));

List<Column> columnsToAdd = new ArrayList<>();
List<Column> columnsToChange = new ArrayList<>();
List<Column> commonColumns = new ArrayList();

for (Column column : newTable.getColumnsList()) {
Column oldColumn = oldColumns.get(column.getName());
if (oldColumn == null) {
columnsToAdd.add(column);
} else {
commonColumns.add(column);
String oldType = mapDataTypes(oldColumn.getType(), oldColumn.getDecimal());
String newType = mapDataTypes(column.getType(), column.getDecimal());
if (!oldType.equals(newType)) {
if (oldColumn.getPrimaryKey()) {
throw new Exception(
"Changing PRIMARY KEY column data type is not supported in SingleStore");
pkChanged = true;
continue;
}
columnsToChange.add(column);
}
}
}

return generateAlterTableQuery(database, table, columnsToAdd, columnsToChange);
if (pkChanged) {
logger.warn(
"Alter table changes the key of the table. This operation is not supported by SingleStore. The table will be recreated from scratch.");

return generateRecreateTableQuery(database, table, newTable, commonColumns);
} else {
return generateAlterTableQuery(database, table, columnsToAdd, columnsToChange);
}
}

static String generateTruncateTableQuery(SingleStoreConfiguration conf, TruncateRequest request) {
static String generateRecreateTableQuery(String database, String tableName, Table table,
List<Column> commonColumns) {
String tmpTableName = tableName + "_alter_tmp";
String columns = commonColumns.stream().map(column -> escapeIdentifier(column.getName()))
.collect(Collectors.joining(", "));

String createTable = generateCreateTableQuery(database, tmpTableName, table);
String insertData = String.format("INSERT INTO %s (%s) SELECT %s FROM %s",
escapeTable(database, tmpTableName), columns, columns,
escapeTable(database, tableName));
String dropTable = String.format("DROP TABLE %s", escapeTable(database, tableName));
String renameTable = String.format("ALTER TABLE %s RENAME AS %s", tmpTableName, tableName);

return String.join("; ", createTable, insertData, dropTable, renameTable);
}

static String generateTruncateTableQuery(SingleStoreConfiguration conf,
TruncateRequest request) {
String query;
String database = JDBCUtil.getDatabaseName(conf, request.getSchemaName());
String table = JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTableName());

if (request.hasSoft()) {
query = String.format("UPDATE %s SET %s = 1 ",
escapeTable(database, table),
query = String.format("UPDATE %s SET %s = 1 ", escapeTable(database, table),
escapeIdentifier(request.getSoft().getDeletedColumn()));
} else {
query = String.format("DELETE FROM %s ",
escapeTable(database, table));
query = String.format("DELETE FROM %s ", escapeTable(database, table));
}

query += String.format("WHERE %s < FROM_UNIXTIME(%d.%09d)",
Expand Down Expand Up @@ -265,17 +291,24 @@ static String generateAlterTableQuery(String database, String table, List<Column
return query.toString();
}

static String generateCreateTableQuery(SingleStoreConfiguration conf, Statement stmt, CreateTableRequest request) throws SQLException {
static String generateCreateTableQuery(String database, String tableName, Table table) {
String columnDefinitions = getColumnDefinitions(table.getColumnsList());
return String.format("CREATE TABLE %s (%s)", escapeTable(database, tableName),
columnDefinitions);
}

static String generateCreateTableQuery(SingleStoreConfiguration conf, Statement stmt,
CreateTableRequest request) throws SQLException {
String database = JDBCUtil.getDatabaseName(conf, request.getSchemaName());
String table = JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTable().getName());
String columnDefinitions = getColumnDefinitions(request.getTable().getColumnsList());
String table =
JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTable().getName());
String createTableQuery = generateCreateTableQuery(database, table, request.getTable());

if (!checkDatabaseExists(stmt, database)) {
return String.format("CREATE DATABASE IF NOT EXISTS %s; CREATE TABLE %s (%s)",
escapeIdentifier(database), escapeTable(database, table), columnDefinitions);
return String.format("CREATE DATABASE IF NOT EXISTS %s; %s", escapeIdentifier(database),
createTableQuery);
} else {
return String.format("CREATE TABLE %s (%s)",
escapeTable(database, table), columnDefinitions);
return createTableQuery;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,15 @@ public void changeKey() throws Exception {
AlterTableRequest request = AlterTableRequest.newBuilder().putAllConfiguration(confMap)
.setSchemaName(database).setTable(table).build();

Exception ex =
assertThrows(Exception.class, () -> JDBCUtil.generateAlterTableQuery(request));
assertEquals("Changing PRIMARY KEY is not supported in SingleStore", ex.getMessage());
String query = JDBCUtil.generateAlterTableQuery(request);
stmt.execute(query);
Table result = JDBCUtil.getTable(conf, database, "changeKey", "changeKey");
List<Column> columns = result.getColumnsList();

assertEquals("a", columns.get(0).getName());
assertEquals(DataType.INT, columns.get(0).getType());
assertEquals(true, columns.get(0).getPrimaryKey());

}
}

Expand All @@ -119,7 +125,8 @@ public void severalOperations() throws SQLException, Exception {

String query = JDBCUtil.generateAlterTableQuery(request);
stmt.execute(query);
Table result = JDBCUtil.getTable(conf, database, "severalOperations", "severalOperations");
Table result =
JDBCUtil.getTable(conf, database, "severalOperations", "severalOperations");
List<Column> columns = result.getColumnsList();

assertEquals("a", columns.get(0).getName());
Expand Down Expand Up @@ -158,7 +165,8 @@ public void changeScaleAndPrecision() throws SQLException, Exception {

String query = JDBCUtil.generateAlterTableQuery(request);
stmt.execute(query);
Table result = JDBCUtil.getTable(conf, database, "changeScaleAndPrecision", "changeScaleAndPrecision");
Table result = JDBCUtil.getTable(conf, database, "changeScaleAndPrecision",
"changeScaleAndPrecision");
List<Column> columns = result.getColumnsList();

assertEquals("a", columns.get(0).getName());
Expand Down Expand Up @@ -209,18 +217,33 @@ public void changeTypeOfKey() throws Exception {
Statement stmt = conn.createStatement();) {
stmt.execute(String.format("USE %s", database));
stmt.execute("CREATE TABLE changeTypeOfKey(a INT PRIMARY KEY)");
stmt.execute("INSERT INTO changeTypeOfKey VALUES (1), (2)");
Table table = Table.newBuilder().setName("changeTypeOfKey")
.addAllColumns(Arrays.asList(Column.newBuilder().setName("a")
.setType(DataType.LONG).setPrimaryKey(true).build()))
.addAllColumns(Arrays.asList(
Column.newBuilder().setName("b").setType(DataType.LONG).build()))
.build();

AlterTableRequest request = AlterTableRequest.newBuilder().putAllConfiguration(confMap)
.setSchemaName(database).setTable(table).build();

Exception ex =
assertThrows(Exception.class, () -> JDBCUtil.generateAlterTableQuery(request));
assertEquals("Changing PRIMARY KEY column data type is not supported in SingleStore",
ex.getMessage());
String query = JDBCUtil.generateAlterTableQuery(request);

stmt.execute(query);
Table result = JDBCUtil.getTable(conf, database, "changeTypeOfKey", "changeTypeOfKey");
List<Column> columns = result.getColumnsList();

assertEquals("a", columns.get(0).getName());
assertEquals(DataType.LONG, columns.get(0).getType());
assertEquals(true, columns.get(0).getPrimaryKey());

assertEquals("b", columns.get(1).getName());
assertEquals(DataType.LONG, columns.get(1).getType());
assertEquals(false, columns.get(1).getPrimaryKey());

checkResult("SELECT * FROM `changeTypeOfKey` ORDER BY a",
Arrays.asList(Arrays.asList("1", null), Arrays.asList("2", null)));
}
}
}

0 comments on commit 8f251c4

Please sign in to comment.