From 8f251c4e6f27d59180723680ba9fd00e134aacfd Mon Sep 17 00:00:00 2001 From: AdalbertMemSQL Date: Tue, 16 Jul 2024 14:56:47 +0300 Subject: [PATCH] Recreated table from scratch when PK changes (#27) * Recreated table from scratch when PK changes * Fixed test * Fixed bug in test * Reverted changes for testing --- docs/overview.md | 7 +- .../fivetran/destination/JDBCUtil.java | 75 +++++++++++++------ .../fivetran/destination/AlterTableTest.java | 41 +++++++--- 3 files changed, 89 insertions(+), 34 deletions(-) diff --git a/docs/overview.md b/docs/overview.md index a20509a..cc9c7ec 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -50,7 +50,6 @@ The following table illustrates how Fivetran data types are transformed into Sin | Schema Change | Supported | Notes | |------------------------|-----------|-----------------------------------------------------------------------------------------------------------| -| Add column | ✔ | When Fivetran detects the addition of a column in your source, it automatically adds that column in the SingleStore destination. | -| Change column type | ✔ | When Fivetran detects a change in the column type in the data source, it automatically changes the column type in the SingleStore destination. To change the column type, Fivetran creates a new column, copies the data from the existing column to the new column, deletes the existing column, and renames the new column. | -| Change key | ✘ | Changing PRIMARY KEY is not supported in SingleStore. | -| Change key column type | ✘ | Changing PRIMARY KEY column data type is not supported in SingleStore. | \ No newline at end of file +| Add column | ✔ | When Fivetran detects the addition of a column in your source, it automatically adds that column in the SingleStore destination. | +| Change column type | ✔ | When Fivetran detects a change in the column type in the data source, it automatically changes the column type in the SingleStore destination. To change the column type, Fivetran creates a new column, copies the data from the existing column to the new column, deletes the existing column, and renames the new column. | +| Change key or key column type | ✔ | Changing PRIMARY KEY is not supported in SingleStore. When Fivetran detects a change in a key, it creates a new table with updated PRIMARY KEY, copies the data from the existing table to the new one, deletes the existing table, and renames the new table | \ No newline at end of file diff --git a/src/main/java/com/singlestore/fivetran/destination/JDBCUtil.java b/src/main/java/com/singlestore/fivetran/destination/JDBCUtil.java index 591f634..7a994e4 100644 --- a/src/main/java/com/singlestore/fivetran/destination/JDBCUtil.java +++ b/src/main/java/com/singlestore/fivetran/destination/JDBCUtil.java @@ -62,14 +62,14 @@ static boolean checkTableExists(Statement stmt, String database, String table) { } static boolean checkDatabaseExists(Statement stmt, String database) throws SQLException { - try (ResultSet rs = stmt.executeQuery( - String.format("SHOW DATABASES LIKE %s", escapeString(database)))) { + try (ResultSet rs = stmt + .executeQuery(String.format("SHOW DATABASES LIKE %s", escapeString(database)))) { return rs.next(); } } - static Table getTable(SingleStoreConfiguration conf, String database, String table, String originalTableName) - throws Exception { + static Table getTable(SingleStoreConfiguration conf, String database, String table, + String originalTableName) throws Exception { try (Connection conn = JDBCUtil.createConnection(conf)) { DatabaseMetaData metadata = conn.getMetaData(); @@ -173,13 +173,15 @@ static String generateAlterTableQuery(AlterTableRequest request) throws Exceptio SingleStoreConfiguration conf = new SingleStoreConfiguration(request.getConfigurationMap()); String database = JDBCUtil.getDatabaseName(conf, request.getSchemaName()); - String table = JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTable().getName()); + String table = + JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTable().getName()); Table oldTable = getTable(conf, database, table, request.getTable().getName()); Table newTable = request.getTable(); + boolean pkChanged = false; if (!pkEquals(oldTable, newTable)) { - throw new Exception("Changing PRIMARY KEY is not supported in SingleStore"); + pkChanged = true; } Map oldColumns = oldTable.getColumnsList().stream() @@ -187,39 +189,63 @@ static String generateAlterTableQuery(AlterTableRequest request) throws Exceptio List columnsToAdd = new ArrayList<>(); List columnsToChange = new ArrayList<>(); + List commonColumns = new ArrayList(); for (Column column : newTable.getColumnsList()) { Column oldColumn = oldColumns.get(column.getName()); if (oldColumn == null) { columnsToAdd.add(column); } else { + commonColumns.add(column); String oldType = mapDataTypes(oldColumn.getType(), oldColumn.getDecimal()); String newType = mapDataTypes(column.getType(), column.getDecimal()); if (!oldType.equals(newType)) { if (oldColumn.getPrimaryKey()) { - throw new Exception( - "Changing PRIMARY KEY column data type is not supported in SingleStore"); + pkChanged = true; + continue; } columnsToChange.add(column); } } } - return generateAlterTableQuery(database, table, columnsToAdd, columnsToChange); + if (pkChanged) { + logger.warn( + "Alter table changes the key of the table. This operation is not supported by SingleStore. The table will be recreated from scratch."); + + return generateRecreateTableQuery(database, table, newTable, commonColumns); + } else { + return generateAlterTableQuery(database, table, columnsToAdd, columnsToChange); + } } - static String generateTruncateTableQuery(SingleStoreConfiguration conf, TruncateRequest request) { + static String generateRecreateTableQuery(String database, String tableName, Table table, + List commonColumns) { + String tmpTableName = tableName + "_alter_tmp"; + String columns = commonColumns.stream().map(column -> escapeIdentifier(column.getName())) + .collect(Collectors.joining(", ")); + + String createTable = generateCreateTableQuery(database, tmpTableName, table); + String insertData = String.format("INSERT INTO %s (%s) SELECT %s FROM %s", + escapeTable(database, tmpTableName), columns, columns, + escapeTable(database, tableName)); + String dropTable = String.format("DROP TABLE %s", escapeTable(database, tableName)); + String renameTable = String.format("ALTER TABLE %s RENAME AS %s", tmpTableName, tableName); + + return String.join("; ", createTable, insertData, dropTable, renameTable); + } + + static String generateTruncateTableQuery(SingleStoreConfiguration conf, + TruncateRequest request) { String query; String database = JDBCUtil.getDatabaseName(conf, request.getSchemaName()); String table = JDBCUtil.getTableName(conf, request.getSchemaName(), request.getTableName()); if (request.hasSoft()) { - query = String.format("UPDATE %s SET %s = 1 ", - escapeTable(database, table), + query = String.format("UPDATE %s SET %s = 1 ", escapeTable(database, table), escapeIdentifier(request.getSoft().getDeletedColumn())); } else { - query = String.format("DELETE FROM %s ", - escapeTable(database, table)); + query = String.format("DELETE FROM %s ", escapeTable(database, table)); } query += String.format("WHERE %s < FROM_UNIXTIME(%d.%09d)", @@ -265,17 +291,24 @@ static String generateAlterTableQuery(String database, String table, List JDBCUtil.generateAlterTableQuery(request)); - assertEquals("Changing PRIMARY KEY is not supported in SingleStore", ex.getMessage()); + String query = JDBCUtil.generateAlterTableQuery(request); + stmt.execute(query); + Table result = JDBCUtil.getTable(conf, database, "changeKey", "changeKey"); + List columns = result.getColumnsList(); + + assertEquals("a", columns.get(0).getName()); + assertEquals(DataType.INT, columns.get(0).getType()); + assertEquals(true, columns.get(0).getPrimaryKey()); + } } @@ -119,7 +125,8 @@ public void severalOperations() throws SQLException, Exception { String query = JDBCUtil.generateAlterTableQuery(request); stmt.execute(query); - Table result = JDBCUtil.getTable(conf, database, "severalOperations", "severalOperations"); + Table result = + JDBCUtil.getTable(conf, database, "severalOperations", "severalOperations"); List columns = result.getColumnsList(); assertEquals("a", columns.get(0).getName()); @@ -158,7 +165,8 @@ public void changeScaleAndPrecision() throws SQLException, Exception { String query = JDBCUtil.generateAlterTableQuery(request); stmt.execute(query); - Table result = JDBCUtil.getTable(conf, database, "changeScaleAndPrecision", "changeScaleAndPrecision"); + Table result = JDBCUtil.getTable(conf, database, "changeScaleAndPrecision", + "changeScaleAndPrecision"); List columns = result.getColumnsList(); assertEquals("a", columns.get(0).getName()); @@ -209,18 +217,33 @@ public void changeTypeOfKey() throws Exception { Statement stmt = conn.createStatement();) { stmt.execute(String.format("USE %s", database)); stmt.execute("CREATE TABLE changeTypeOfKey(a INT PRIMARY KEY)"); + stmt.execute("INSERT INTO changeTypeOfKey VALUES (1), (2)"); Table table = Table.newBuilder().setName("changeTypeOfKey") .addAllColumns(Arrays.asList(Column.newBuilder().setName("a") .setType(DataType.LONG).setPrimaryKey(true).build())) + .addAllColumns(Arrays.asList( + Column.newBuilder().setName("b").setType(DataType.LONG).build())) .build(); AlterTableRequest request = AlterTableRequest.newBuilder().putAllConfiguration(confMap) .setSchemaName(database).setTable(table).build(); - Exception ex = - assertThrows(Exception.class, () -> JDBCUtil.generateAlterTableQuery(request)); - assertEquals("Changing PRIMARY KEY column data type is not supported in SingleStore", - ex.getMessage()); + String query = JDBCUtil.generateAlterTableQuery(request); + + stmt.execute(query); + Table result = JDBCUtil.getTable(conf, database, "changeTypeOfKey", "changeTypeOfKey"); + List columns = result.getColumnsList(); + + assertEquals("a", columns.get(0).getName()); + assertEquals(DataType.LONG, columns.get(0).getType()); + assertEquals(true, columns.get(0).getPrimaryKey()); + + assertEquals("b", columns.get(1).getName()); + assertEquals(DataType.LONG, columns.get(1).getType()); + assertEquals(false, columns.get(1).getPrimaryKey()); + + checkResult("SELECT * FROM `changeTypeOfKey` ORDER BY a", + Arrays.asList(Arrays.asList("1", null), Arrays.asList("2", null))); } } }