Skip to content

Commit f6f031f

Browse files
committed
support column position in DS v2
1 parent 187f3c1 commit f6f031f

File tree

13 files changed

+331
-85
lines changed

13 files changed

+331
-85
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -846,7 +846,7 @@ intervalUnit
846846
;
847847

848848
colPosition
849-
: FIRST | AFTER multipartIdentifier
849+
: position=FIRST | position=AFTER multipartIdentifier
850850
;
851851

852852
dataType

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717

1818
package org.apache.spark.sql.connector.catalog;
1919

20-
import com.google.common.base.Preconditions;
21-
import org.apache.spark.annotation.Experimental;
22-
2320
import java.util.Arrays;
2421
import java.util.Objects;
25-
import java.util.stream.Collectors;
2622
import java.util.stream.Stream;
2723

24+
import com.google.common.base.Preconditions;
25+
26+
import org.apache.spark.annotation.Experimental;
27+
2828
/**
2929
* An {@link Identifier} implementation.
3030
*/
@@ -51,19 +51,10 @@ public String name() {
5151
return name;
5252
}
5353

54-
private String escapeQuote(String part) {
55-
if (part.contains("`")) {
56-
return part.replace("`", "``");
57-
} else {
58-
return part;
59-
}
60-
}
61-
6254
@Override
6355
public String toString() {
64-
return Stream.concat(Stream.of(namespace), Stream.of(name))
65-
.map(part -> '`' + escapeQuote(part) + '`')
66-
.collect(Collectors.joining("."));
56+
return CatalogV2Implicits.quoteNameParts(Stream.concat(
57+
Stream.of(namespace), Stream.of(name)).toArray(String[]::new));
6758
}
6859

6960
@Override

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java

Lines changed: 159 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717

1818
package org.apache.spark.sql.connector.catalog;
1919

20-
import org.apache.spark.annotation.Experimental;
21-
import org.apache.spark.sql.types.DataType;
22-
2320
import java.util.Arrays;
2421
import java.util.Objects;
22+
import javax.annotation.Nullable;
23+
24+
import org.apache.spark.annotation.Experimental;
25+
import org.apache.spark.sql.types.DataType;
2526

2627
/**
2728
* TableChange subclasses represent requested changes to a table. These are passed to
@@ -76,7 +77,7 @@ static TableChange removeProperty(String property) {
7677
* @return a TableChange for the addition
7778
*/
7879
static TableChange addColumn(String[] fieldNames, DataType dataType) {
79-
return new AddColumn(fieldNames, dataType, true, null);
80+
return new AddColumn(fieldNames, dataType, true, null, null);
8081
}
8182

8283
/**
@@ -92,7 +93,7 @@ static TableChange addColumn(String[] fieldNames, DataType dataType) {
9293
* @return a TableChange for the addition
9394
*/
9495
static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
95-
return new AddColumn(fieldNames, dataType, isNullable, null);
96+
return new AddColumn(fieldNames, dataType, isNullable, null, null);
9697
}
9798

9899
/**
@@ -113,7 +114,30 @@ static TableChange addColumn(
113114
DataType dataType,
114115
boolean isNullable,
115116
String comment) {
116-
return new AddColumn(fieldNames, dataType, isNullable, comment);
117+
return new AddColumn(fieldNames, dataType, isNullable, comment, null);
118+
}
119+
120+
/**
121+
* Create a TableChange for adding a column.
122+
* <p>
123+
* If the field already exists, the change will result in an {@link IllegalArgumentException}.
124+
* If the new field is nested and its parent does not exist or is not a struct, the change will
125+
* result in an {@link IllegalArgumentException}.
126+
*
127+
* @param fieldNames field names of the new column
128+
* @param dataType the new column's data type
129+
* @param isNullable whether the new column can contain null
130+
* @param comment the new field's comment string
131+
* @param position the new columns's position
132+
* @return a TableChange for the addition
133+
*/
134+
static TableChange addColumn(
135+
String[] fieldNames,
136+
DataType dataType,
137+
boolean isNullable,
138+
String comment,
139+
ColumnPosition position) {
140+
return new AddColumn(fieldNames, dataType, isNullable, comment, position);
117141
}
118142

119143
/**
@@ -180,6 +204,21 @@ static TableChange updateColumnComment(String[] fieldNames, String newComment) {
180204
return new UpdateColumnComment(fieldNames, newComment);
181205
}
182206

207+
/**
208+
* Create a TableChange for updating the position of a field.
209+
* <p>
210+
* The name is used to find the field to update.
211+
* <p>
212+
* If the field does not exist, the change will result in an {@link IllegalArgumentException}.
213+
*
214+
* @param fieldNames field names of the column to update
215+
* @param newPosition the new position
216+
* @return a TableChange for the update
217+
*/
218+
static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newPosition) {
219+
return new UpdateColumnPosition(fieldNames, newPosition);
220+
}
221+
183222
/**
184223
* Create a TableChange for deleting a field.
185224
* <p>
@@ -259,6 +298,60 @@ public int hashCode() {
259298
}
260299
}
261300

301+
interface ColumnPosition {
302+
First FIRST = new First();
303+
304+
static ColumnPosition After(String[] column) {
305+
return new After(column);
306+
}
307+
}
308+
309+
/**
310+
* Column position FIRST means the specified column should be the first column.
311+
*/
312+
final class First implements ColumnPosition {
313+
private First() {}
314+
315+
@Override
316+
public String toString() {
317+
return "FIRST";
318+
}
319+
}
320+
321+
/**
322+
* Column position AFTER means the specified column should be put after the given `column`.
323+
*/
324+
final class After implements ColumnPosition {
325+
private final String[] column;
326+
327+
private After(String[] column) {
328+
assert column != null;
329+
this.column = column;
330+
}
331+
332+
public String[] getColumn() {
333+
return column;
334+
}
335+
336+
@Override
337+
public String toString() {
338+
return "AFTER " + CatalogV2Implicits.quoteNameParts(column);
339+
}
340+
341+
@Override
342+
public boolean equals(Object o) {
343+
if (this == o) return true;
344+
if (o == null || getClass() != o.getClass()) return false;
345+
After after = (After) o;
346+
return Arrays.equals(column, after.column);
347+
}
348+
349+
@Override
350+
public int hashCode() {
351+
return Arrays.hashCode(column);
352+
}
353+
}
354+
262355
interface ColumnChange extends TableChange {
263356
String[] fieldNames();
264357
}
@@ -275,12 +368,19 @@ final class AddColumn implements ColumnChange {
275368
private final DataType dataType;
276369
private final boolean isNullable;
277370
private final String comment;
278-
279-
private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, String comment) {
371+
private final ColumnPosition position;
372+
373+
private AddColumn(
374+
String[] fieldNames,
375+
DataType dataType,
376+
boolean isNullable,
377+
String comment,
378+
ColumnPosition position) {
280379
this.fieldNames = fieldNames;
281380
this.dataType = dataType;
282381
this.isNullable = isNullable;
283382
this.comment = comment;
383+
this.position = position;
284384
}
285385

286386
@Override
@@ -296,10 +396,16 @@ public boolean isNullable() {
296396
return isNullable;
297397
}
298398

399+
@Nullable
299400
public String comment() {
300401
return comment;
301402
}
302403

404+
@Nullable
405+
public ColumnPosition position() {
406+
return position;
407+
}
408+
303409
@Override
304410
public boolean equals(Object o) {
305411
if (this == o) return true;
@@ -308,12 +414,13 @@ public boolean equals(Object o) {
308414
return isNullable == addColumn.isNullable &&
309415
Arrays.equals(fieldNames, addColumn.fieldNames) &&
310416
dataType.equals(addColumn.dataType) &&
311-
comment.equals(addColumn.comment);
417+
Objects.equals(comment, addColumn.comment) &&
418+
Objects.equals(position, addColumn.position);
312419
}
313420

314421
@Override
315422
public int hashCode() {
316-
int result = Objects.hash(dataType, isNullable, comment);
423+
int result = Objects.hash(dataType, isNullable, comment, position);
317424
result = 31 * result + Arrays.hashCode(fieldNames);
318425
return result;
319426
}
@@ -453,6 +560,48 @@ public int hashCode() {
453560
}
454561
}
455562

563+
/**
564+
* A TableChange to update the position of a field.
565+
* <p>
566+
* The field names are used to find the field to update.
567+
* <p>
568+
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
569+
*/
570+
final class UpdateColumnPosition implements ColumnChange {
571+
private final String[] fieldNames;
572+
private final ColumnPosition position;
573+
574+
private UpdateColumnPosition(String[] fieldNames, ColumnPosition position) {
575+
this.fieldNames = fieldNames;
576+
this.position = position;
577+
}
578+
579+
@Override
580+
public String[] fieldNames() {
581+
return fieldNames;
582+
}
583+
584+
public ColumnPosition position() {
585+
return position;
586+
}
587+
588+
@Override
589+
public boolean equals(Object o) {
590+
if (this == o) return true;
591+
if (o == null || getClass() != o.getClass()) return false;
592+
UpdateColumnPosition that = (UpdateColumnPosition) o;
593+
return Arrays.equals(fieldNames, that.fieldNames) &&
594+
position.equals(that.position);
595+
}
596+
597+
@Override
598+
public int hashCode() {
599+
int result = Objects.hash(position);
600+
result = 31 * result + Arrays.hashCode(fieldNames);
601+
return result;
602+
}
603+
}
604+
456605
/**
457606
* A TableChange to delete a field.
458607
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,32 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
3535
case AlterTableAddColumnsStatement(
3636
nameParts @ NonSessionCatalog(catalog, tableName), cols) =>
3737
val changes = cols.map { col =>
38-
TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
38+
TableChange.addColumn(
39+
col.name.toArray,
40+
col.dataType,
41+
true,
42+
col.comment.orNull,
43+
col.position.orNull)
3944
}
4045
createAlterTable(nameParts, catalog, tableName, changes)
4146

4247
case AlterTableAlterColumnStatement(
43-
nameParts @ NonSessionCatalog(catalog, tableName), colName, dataType, comment) =>
48+
nameParts @ NonSessionCatalog(catalog, tableName), colName, dataType, comment, position) =>
49+
val nameParts = colName.toArray
4450
val typeChange = dataType.map { newDataType =>
45-
TableChange.updateColumnType(colName.toArray, newDataType, true)
51+
TableChange.updateColumnType(nameParts, newDataType, true)
4652
}
4753
val commentChange = comment.map { newComment =>
48-
TableChange.updateColumnComment(colName.toArray, newComment)
54+
TableChange.updateColumnComment(nameParts, newComment)
4955
}
50-
createAlterTable(nameParts, catalog, tableName, typeChange.toSeq ++ commentChange)
56+
val positionChange = position.map { newPosition =>
57+
TableChange.updateColumnPosition(nameParts, newPosition)
58+
}
59+
createAlterTable(
60+
nameParts,
61+
catalog,
62+
tableName,
63+
typeChange.toSeq ++ commentChange ++ positionChange)
5164

5265
case AlterTableRenameColumnStatement(
5366
nameParts @ NonSessionCatalog(catalog, tableName), col, newName) =>

0 commit comments

Comments
 (0)