Skip to content

Commit c5317d4

Browse files
committed
[apache#2447] feat(spark-connector): support alter table AddColumn and DropColumn for spark-connector
1 parent b288049 commit c5317d4

File tree

3 files changed

+31
-31
lines changed

3 files changed

+31
-31
lines changed

integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -284,22 +284,25 @@ void testAlterTableAddAndDeleteColumn() {
284284
dropTableIfExists(tableName);
285285

286286
createSimpleTable(tableName);
287-
List<SparkColumnInfo> sparkOldColumnInfos = getTableInfo(tableName).getColumns();
288-
sparkOldColumnInfos.forEach(
289-
sparkColumnInfo ->
290-
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));
287+
String[] oldColumnNames =
288+
getTableInfo(tableName).getColumns().stream()
289+
.map(SparkColumnInfo::getName)
290+
.toArray(String[]::new);
291+
Assertions.assertEquals(oldColumnNames, new String[] {"id", "name", "age"});
291292

292293
sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 string)", tableName));
293-
List<SparkColumnInfo> sparkAddColumnInfos = getTableInfo(tableName).getColumns();
294-
Assertions.assertTrue(
295-
sparkAddColumnInfos.stream()
296-
.anyMatch(sparkColumnInfo -> "col1".equalsIgnoreCase(sparkColumnInfo.getName())));
294+
String[] addColumnNames =
295+
getTableInfo(tableName).getColumns().stream()
296+
.map(SparkColumnInfo::getName)
297+
.toArray(String[]::new);
298+
Assertions.assertEquals(addColumnNames, new String[] {"id", "name", "age", "col1"});
297299

298300
sql(String.format("ALTER TABLE %S DROP COLUMNS (col1)", tableName));
299-
List<SparkColumnInfo> sparkDeleteColumnInfos = getTableInfo(tableName).getColumns();
300-
sparkDeleteColumnInfos.forEach(
301-
sparkColumnInfo ->
302-
Assertions.assertFalse("col1".equalsIgnoreCase(sparkColumnInfo.getName())));
301+
String[] deleteColumnNames =
302+
getTableInfo(tableName).getColumns().stream()
303+
.map(SparkColumnInfo::getName)
304+
.toArray(String[]::new);
305+
Assertions.assertEquals(deleteColumnNames, new String[] {"id", "name", "age"});
303306
}
304307

305308
private void checkTableReadWrite(SparkTableInfo table) {

spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalog.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -381,13 +381,17 @@ static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange
381381

382382
private static com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition(
383383
TableChange.ColumnPosition columnPosition) {
384-
if (columnPosition instanceof TableChange.First) {
384+
if (null == columnPosition) {
385+
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.defaultPos();
386+
} else if (columnPosition instanceof TableChange.First) {
385387
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first();
386388
} else if (columnPosition instanceof TableChange.After) {
387389
TableChange.After after = (TableChange.After) columnPosition;
388390
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.after(after.column());
389391
} else {
390-
return com.datastrato.gravitino.rel.TableChange.ColumnPosition.defaultPos();
392+
throw new UnsupportedOperationException(
393+
String.format(
394+
"Unsupported table column position %s", columnPosition.getClass().getName()));
391395
}
392396
}
393397
}

spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java

+10-17
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
package com.datastrato.gravitino.spark.connector.catalog;
77

8-
import java.util.Arrays;
9-
import java.util.Locale;
108
import org.apache.spark.sql.connector.catalog.TableChange;
119
import org.apache.spark.sql.types.DataTypes;
1210
import org.junit.jupiter.api.Assertions;
@@ -41,43 +39,38 @@ void testTransformRemoveProperty() {
4139

4240
@Test
4341
void testTransformAddColumn() {
44-
TableChange sparkChange = TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType);
42+
TableChange.AddColumn sparkAddColumn =
43+
(TableChange.AddColumn) TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType);
4544
com.datastrato.gravitino.rel.TableChange gravitinoChange =
46-
GravitinoCatalog.transformTableChange(sparkChange);
45+
GravitinoCatalog.transformTableChange(sparkAddColumn);
4746

48-
TableChange.AddColumn sparkAddColumn = (TableChange.AddColumn) sparkChange;
4947
Assertions.assertTrue(
5048
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.AddColumn);
5149
com.datastrato.gravitino.rel.TableChange.AddColumn gravitinoAddColumn =
5250
(com.datastrato.gravitino.rel.TableChange.AddColumn) gravitinoChange;
5351

5452
Assertions.assertEquals(1, sparkAddColumn.fieldNames().length);
5553
Assertions.assertEquals(1, gravitinoAddColumn.fieldName().length);
56-
Assertions.assertEquals(
57-
Arrays.stream(sparkAddColumn.fieldNames()).findFirst(),
58-
Arrays.stream(gravitinoAddColumn.fieldName()).findFirst());
59-
Assertions.assertEquals(
60-
sparkAddColumn.dataType().typeName().toLowerCase(Locale.ROOT),
61-
gravitinoAddColumn.getDataType().simpleString().toLowerCase(Locale.ROOT));
54+
Assertions.assertEquals(sparkAddColumn.fieldNames(), gravitinoAddColumn.fieldName());
55+
Assertions.assertTrue(
56+
"string".equalsIgnoreCase(gravitinoAddColumn.getDataType().simpleString()));
6257
}
6358

6459
@Test
6560
void testTransformDeleteColumn() {
66-
TableChange sparkChange = TableChange.deleteColumn(new String[] {"col1"}, true);
61+
TableChange.DeleteColumn sparkDeleteColumn =
62+
(TableChange.DeleteColumn) TableChange.deleteColumn(new String[] {"col1"}, true);
6763
com.datastrato.gravitino.rel.TableChange gravitinoChange =
68-
GravitinoCatalog.transformTableChange(sparkChange);
64+
GravitinoCatalog.transformTableChange(sparkDeleteColumn);
6965

70-
TableChange.DeleteColumn sparkDeleteColumn = (TableChange.DeleteColumn) sparkChange;
7166
Assertions.assertTrue(
7267
gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn);
7368
com.datastrato.gravitino.rel.TableChange.DeleteColumn gravitinoDeleteColumn =
7469
(com.datastrato.gravitino.rel.TableChange.DeleteColumn) gravitinoChange;
7570

7671
Assertions.assertEquals(1, sparkDeleteColumn.fieldNames().length);
7772
Assertions.assertEquals(1, gravitinoDeleteColumn.fieldName().length);
78-
Assertions.assertEquals(
79-
Arrays.stream(sparkDeleteColumn.fieldNames()).findFirst(),
80-
Arrays.stream(gravitinoDeleteColumn.fieldName()).findFirst());
73+
Assertions.assertEquals(sparkDeleteColumn.fieldNames(), gravitinoDeleteColumn.fieldName());
8174
Assertions.assertEquals(sparkDeleteColumn.ifExists(), gravitinoDeleteColumn.getIfExists());
8275
}
8376
}

0 commit comments

Comments
 (0)