Skip to content

Commit fdcd0e7

Browse files
committed
[SPARK-30192][SQL] support column position in DS v2
### What changes were proposed in this pull request? Update the DS v2 API to support adding/altering a column with a column position. ### Why are the changes needed? We have a parser rule for column position, but we fail the query if it's specified, because the builtin catalog can't support add/alter column with column position. Since we have the catalog plugin API now, we should let the catalog implementation decide whether it supports column position or not. ### Does this PR introduce any user-facing change? Not yet. ### How was this patch tested? New tests. Closes #26817 from cloud-fan/parser. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 72f5597 commit fdcd0e7

File tree

14 files changed

+444
-106
lines changed

14 files changed

+444
-106
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,7 @@ intervalUnit
855855
;
856856

857857
colPosition
858-
: FIRST | AFTER multipartIdentifier
858+
: position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
859859
;
860860

861861
dataType

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@
1717

1818
package org.apache.spark.sql.connector.catalog;
1919

20-
import com.google.common.base.Preconditions;
21-
import org.apache.spark.annotation.Experimental;
22-
2320
import java.util.Arrays;
2421
import java.util.Objects;
2522
import java.util.stream.Collectors;
2623
import java.util.stream.Stream;
2724

25+
import com.google.common.base.Preconditions;
26+
27+
import org.apache.spark.annotation.Experimental;
28+
2829
/**
2930
* An {@link Identifier} implementation.
3031
*/
@@ -51,19 +52,11 @@ public String name() {
5152
return name;
5253
}
5354

54-
private String escapeQuote(String part) {
55-
if (part.contains("`")) {
56-
return part.replace("`", "``");
57-
} else {
58-
return part;
59-
}
60-
}
61-
6255
@Override
6356
public String toString() {
6457
return Stream.concat(Stream.of(namespace), Stream.of(name))
65-
.map(part -> '`' + escapeQuote(part) + '`')
66-
.collect(Collectors.joining("."));
58+
.map(CatalogV2Implicits::quote)
59+
.collect(Collectors.joining("."));
6760
}
6861

6962
@Override

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java

Lines changed: 168 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717

1818
package org.apache.spark.sql.connector.catalog;
1919

20-
import org.apache.spark.annotation.Experimental;
21-
import org.apache.spark.sql.types.DataType;
22-
2320
import java.util.Arrays;
2421
import java.util.Objects;
22+
import javax.annotation.Nullable;
23+
24+
import org.apache.spark.annotation.Experimental;
25+
import org.apache.spark.sql.types.DataType;
2526

2627
/**
2728
* TableChange subclasses represent requested changes to a table. These are passed to
@@ -76,7 +77,7 @@ static TableChange removeProperty(String property) {
7677
* @return a TableChange for the addition
7778
*/
7879
static TableChange addColumn(String[] fieldNames, DataType dataType) {
79-
return new AddColumn(fieldNames, dataType, true, null);
80+
return new AddColumn(fieldNames, dataType, true, null, null);
8081
}
8182

8283
/**
@@ -92,7 +93,7 @@ static TableChange addColumn(String[] fieldNames, DataType dataType) {
9293
* @return a TableChange for the addition
9394
*/
9495
static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
95-
return new AddColumn(fieldNames, dataType, isNullable, null);
96+
return new AddColumn(fieldNames, dataType, isNullable, null, null);
9697
}
9798

9899
/**
@@ -113,7 +114,30 @@ static TableChange addColumn(
113114
DataType dataType,
114115
boolean isNullable,
115116
String comment) {
116-
return new AddColumn(fieldNames, dataType, isNullable, comment);
117+
return new AddColumn(fieldNames, dataType, isNullable, comment, null);
118+
}
119+
120+
/**
121+
* Create a TableChange for adding a column.
122+
* <p>
123+
* If the field already exists, the change will result in an {@link IllegalArgumentException}.
124+
* If the new field is nested and its parent does not exist or is not a struct, the change will
125+
* result in an {@link IllegalArgumentException}.
126+
*
127+
* @param fieldNames field names of the new column
128+
* @param dataType the new column's data type
129+
* @param isNullable whether the new column can contain null
130+
* @param comment the new field's comment string
131+
* @param position the new column's position
132+
* @return a TableChange for the addition
133+
*/
134+
static TableChange addColumn(
135+
String[] fieldNames,
136+
DataType dataType,
137+
boolean isNullable,
138+
String comment,
139+
ColumnPosition position) {
140+
return new AddColumn(fieldNames, dataType, isNullable, comment, position);
117141
}
118142

119143
/**
@@ -180,6 +204,21 @@ static TableChange updateColumnComment(String[] fieldNames, String newComment) {
180204
return new UpdateColumnComment(fieldNames, newComment);
181205
}
182206

207+
/**
208+
* Create a TableChange for updating the position of a field.
209+
* <p>
210+
* The name is used to find the field to update.
211+
* <p>
212+
* If the field does not exist, the change will result in an {@link IllegalArgumentException}.
213+
*
214+
* @param fieldNames field names of the column to update
215+
* @param newPosition the new position
216+
* @return a TableChange for the update
217+
*/
218+
static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newPosition) {
219+
return new UpdateColumnPosition(fieldNames, newPosition);
220+
}
221+
183222
/**
184223
* Create a TableChange for deleting a field.
185224
* <p>
@@ -259,6 +298,69 @@ public int hashCode() {
259298
}
260299
}
261300

301+
interface ColumnPosition {
302+
303+
static ColumnPosition first() {
304+
return First.SINGLETON;
305+
}
306+
307+
static ColumnPosition after(String column) {
308+
return new After(column);
309+
}
310+
}
311+
312+
/**
313+
* Column position FIRST means the specified column should be the first column.
314+
* Note that the specified column may be a nested field, and then FIRST means this field should
315+
* be the first one within the struct.
316+
*/
317+
final class First implements ColumnPosition {
318+
private static final First SINGLETON = new First();
319+
320+
private First() {}
321+
322+
@Override
323+
public String toString() {
324+
return "FIRST";
325+
}
326+
}
327+
328+
/**
329+
* Column position AFTER means the specified column should be put after the given `column`.
330+
* Note that the specified column may be a nested field, and then the given `column` refers to
331+
* a field in the same struct.
332+
*/
333+
final class After implements ColumnPosition {
334+
private final String column;
335+
336+
private After(String column) {
337+
assert column != null;
338+
this.column = column;
339+
}
340+
341+
public String column() {
342+
return column;
343+
}
344+
345+
@Override
346+
public String toString() {
347+
return "AFTER " + column;
348+
}
349+
350+
@Override
351+
public boolean equals(Object o) {
352+
if (this == o) return true;
353+
if (o == null || getClass() != o.getClass()) return false;
354+
After after = (After) o;
355+
return column.equals(after.column);
356+
}
357+
358+
@Override
359+
public int hashCode() {
360+
return Objects.hash(column);
361+
}
362+
}
363+
262364
interface ColumnChange extends TableChange {
263365
String[] fieldNames();
264366
}
@@ -275,12 +377,19 @@ final class AddColumn implements ColumnChange {
275377
private final DataType dataType;
276378
private final boolean isNullable;
277379
private final String comment;
278-
279-
private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) {
380+
private final ColumnPosition position;
381+
382+
private AddColumn(
383+
String[] fieldNames,
384+
DataType dataType,
385+
boolean isNullable,
386+
String comment,
387+
ColumnPosition position) {
280388
this.fieldNames = fieldNames;
281389
this.dataType = dataType;
282390
this.isNullable = isNullable;
283391
this.comment = comment;
392+
this.position = position;
284393
}
285394

286395
@Override
@@ -296,10 +405,16 @@ public boolean isNullable() {
296405
return isNullable;
297406
}
298407

408+
@Nullable
299409
public String comment() {
300410
return comment;
301411
}
302412

413+
@Nullable
414+
public ColumnPosition position() {
415+
return position;
416+
}
417+
303418
@Override
304419
public boolean equals(Object o) {
305420
if (this == o) return true;
@@ -308,12 +423,13 @@ public boolean equals(Object o) {
308423
return isNullable == addColumn.isNullable &&
309424
Arrays.equals(fieldNames, addColumn.fieldNames) &&
310425
dataType.equals(addColumn.dataType) &&
311-
comment.equals(addColumn.comment);
426+
Objects.equals(comment, addColumn.comment) &&
427+
Objects.equals(position, addColumn.position);
312428
}
313429

314430
@Override
315431
public int hashCode() {
316-
int result = Objects.hash(dataType, isNullable, comment);
432+
int result = Objects.hash(dataType, isNullable, comment, position);
317433
result = 31 * result + Arrays.hashCode(fieldNames);
318434
return result;
319435
}
@@ -453,6 +569,48 @@ public int hashCode() {
453569
}
454570
}
455571

572+
/**
573+
* A TableChange to update the position of a field.
574+
* <p>
575+
* The field names are used to find the field to update.
576+
* <p>
577+
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
578+
*/
579+
final class UpdateColumnPosition implements ColumnChange {
580+
private final String[] fieldNames;
581+
private final ColumnPosition position;
582+
583+
private UpdateColumnPosition(String[] fieldNames, ColumnPosition position) {
584+
this.fieldNames = fieldNames;
585+
this.position = position;
586+
}
587+
588+
@Override
589+
public String[] fieldNames() {
590+
return fieldNames;
591+
}
592+
593+
public ColumnPosition position() {
594+
return position;
595+
}
596+
597+
@Override
598+
public boolean equals(Object o) {
599+
if (this == o) return true;
600+
if (o == null || getClass() != o.getClass()) return false;
601+
UpdateColumnPosition that = (UpdateColumnPosition) o;
602+
return Arrays.equals(fieldNames, that.fieldNames) &&
603+
position.equals(that.position);
604+
}
605+
606+
@Override
607+
public int hashCode() {
608+
int result = Objects.hash(position);
609+
result = 31 * result + Arrays.hashCode(fieldNames);
610+
return result;
611+
}
612+
}
613+
456614
/**
457615
* A TableChange to delete a field.
458616
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,32 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
3535
case AlterTableAddColumnsStatement(
3636
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
3737
val changes = cols.map { col =>
38-
TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
38+
TableChange.addColumn(
39+
col.name.toArray,
40+
col.dataType,
41+
true,
42+
col.comment.orNull,
43+
col.position.orNull)
3944
}
4045
createAlterTable(nameParts, catalog, tbl, changes)
4146

4247
case AlterTableAlterColumnStatement(
43-
nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment) =>
48+
nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
49+
val colNameArray = colName.toArray
4450
val typeChange = dataType.map { newDataType =>
45-
TableChange.updateColumnType(colName.toArray, newDataType, true)
51+
TableChange.updateColumnType(colNameArray, newDataType, true)
4652
}
4753
val commentChange = comment.map { newComment =>
48-
TableChange.updateColumnComment(colName.toArray, newComment)
54+
TableChange.updateColumnComment(colNameArray, newComment)
4955
}
50-
createAlterTable(nameParts, catalog, tbl, typeChange.toSeq ++ commentChange)
56+
val positionChange = pos.map { newPosition =>
57+
TableChange.updateColumnPosition(colNameArray, newPosition)
58+
}
59+
createAlterTable(
60+
nameParts,
61+
catalog,
62+
tbl,
63+
typeChange.toSeq ++ commentChange ++ positionChange)
5164

5265
case AlterTableRenameColumnStatement(
5366
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>

0 commit comments

Comments
 (0)