Commit 3a3a45f

Author: xiaojiebao (committed)
[Cherry-Pick] Spark 3.3: Add SparkChangelogTable (apache#5740)
1 parent f36935e commit 3a3a45f

File tree: 17 files changed, +998 / -144 lines
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg;
20+
21+
import static org.apache.iceberg.MetadataColumns.CHANGE_ORDINAL;
22+
import static org.apache.iceberg.MetadataColumns.CHANGE_TYPE;
23+
import static org.apache.iceberg.MetadataColumns.COMMIT_SNAPSHOT_ID;
24+
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
27+
import org.apache.iceberg.types.TypeUtil;
28+
import org.apache.iceberg.types.Types;
29+
30+
public class ChangelogUtil {
31+
32+
private static final Schema CHANGELOG_METADATA =
33+
new Schema(CHANGE_TYPE, CHANGE_ORDINAL, COMMIT_SNAPSHOT_ID);
34+
35+
private static final Set<Integer> CHANGELOG_METADATA_FIELD_IDS =
36+
CHANGELOG_METADATA.columns().stream()
37+
.map(Types.NestedField::fieldId)
38+
.collect(Collectors.toSet());
39+
40+
private ChangelogUtil() {}
41+
42+
public static Schema changelogSchema(Schema tableSchema) {
43+
return TypeUtil.join(tableSchema, CHANGELOG_METADATA);
44+
}
45+
46+
public static Schema dropChangelogMetadata(Schema changelogSchema) {
47+
return TypeUtil.selectNot(changelogSchema, CHANGELOG_METADATA_FIELD_IDS);
48+
}
49+
}
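The two helpers are inverses of each other: changelogSchema appends the three changelog metadata columns to a table's own columns, and dropChangelogMetadata strips them back out by field ID. A minimal sketch of how they compose; the id/data schema below is a made-up example, not part of the commit:

import org.apache.iceberg.ChangelogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class ChangelogUtilExample {
  public static void main(String[] args) {
    // A hypothetical two-column table schema, for illustration only.
    Schema tableSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Appends _change_type, _change_ordinal, and _commit_snapshot_id.
    Schema changelogSchema = ChangelogUtil.changelogSchema(tableSchema);
    System.out.println(changelogSchema);

    // Drops the three metadata columns again, recovering the table schema.
    System.out.println(ChangelogUtil.dropChangelogMetadata(changelogSchema));
  }
}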

core/src/main/java/org/apache/iceberg/MetadataColumns.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,24 @@ private MetadataColumns() {}
7373
public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
7474
public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
7575
public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
76+
public static final NestedField CHANGE_TYPE =
77+
NestedField.required(
78+
Integer.MAX_VALUE - 104,
79+
"_change_type",
80+
Types.StringType.get(),
81+
"Record type in changelog");
82+
public static final NestedField CHANGE_ORDINAL =
83+
NestedField.optional(
84+
Integer.MAX_VALUE - 105,
85+
"_change_ordinal",
86+
Types.IntegerType.get(),
87+
"Change ordinal in changelog");
88+
public static final NestedField COMMIT_SNAPSHOT_ID =
89+
NestedField.optional(
90+
Integer.MAX_VALUE - 106,
91+
"_commit_snapshot_id",
92+
Types.LongType.get(),
93+
"Commit snapshot ID");
7694

7795
private static final Map<String, NestedField> META_COLUMNS =
7896
ImmutableMap.of(
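The new columns follow MetadataColumns' reserved-ID convention: metadata field IDs count down from Integer.MAX_VALUE, while user column IDs are assigned upward from 1, so the two ranges cannot collide. A small sketch (assuming only the Iceberg core library on the classpath) that prints the IDs reserved here:

import org.apache.iceberg.MetadataColumns;

public class ChangelogColumnIds {
  public static void main(String[] args) {
    // Integer.MAX_VALUE is 2147483647, so these print 2147483543,
    // 2147483542, and 2147483541 respectively.
    System.out.println(
        MetadataColumns.CHANGE_TYPE.name() + " -> " + MetadataColumns.CHANGE_TYPE.fieldId());
    System.out.println(
        MetadataColumns.CHANGE_ORDINAL.name() + " -> " + MetadataColumns.CHANGE_ORDINAL.fieldId());
    System.out.println(
        MetadataColumns.COMMIT_SNAPSHOT_ID.name()
            + " -> "
            + MetadataColumns.COMMIT_SNAPSHOT_ID.fieldId());
  }
}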

core/src/main/java/org/apache/iceberg/hadoop/Util.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import org.apache.hadoop.fs.FileSystem;
2929
import org.apache.hadoop.fs.Path;
3030
import org.apache.iceberg.CombinedScanTask;
31+
import org.apache.iceberg.ContentScanTask;
3132
import org.apache.iceberg.FileScanTask;
33+
import org.apache.iceberg.ScanTask;
34+
import org.apache.iceberg.ScanTaskGroup;
3235
import org.apache.iceberg.constants.ObjectStorageConstants;
3336
import org.apache.iceberg.exceptions.RuntimeIOException;
3437
import org.apache.iceberg.io.FileIO;
@@ -81,19 +84,27 @@ public static String[] blockLocations(CombinedScanTask task, Configuration conf)
8184
return locationSets.toArray(new String[0]);
8285
}
8386

84-
public static String[] blockLocations(FileIO io, CombinedScanTask task) {
87+
public static String[] blockLocations(FileIO io, ScanTaskGroup<?> taskGroup) {
8588
Set<String> locations = Sets.newHashSet();
86-
for (FileScanTask f : task.files()) {
87-
InputFile in = io.newInputFile(f.file().path().toString());
88-
if (in instanceof HadoopInputFile) {
89-
Collections.addAll(
90-
locations, ((HadoopInputFile) in).getBlockLocations(f.start(), f.length()));
89+
for (ScanTask task : taskGroup.tasks()) {
90+
if (task instanceof ContentScanTask) {
91+
Collections.addAll(locations, blockLocations(io, (ContentScanTask<?>) task));
9192
}
9293
}
9394

9495
return locations.toArray(HadoopInputFile.NO_LOCATION_PREFERENCE);
9596
}
9697

98+
private static String[] blockLocations(FileIO io, ContentScanTask<?> task) {
99+
InputFile inputFile = io.newInputFile(task.file().path().toString());
100+
if (inputFile instanceof HadoopInputFile) {
101+
HadoopInputFile hadoopInputFile = (HadoopInputFile) inputFile;
102+
return hadoopInputFile.getBlockLocations(task.start(), task.length());
103+
} else {
104+
return HadoopInputFile.NO_LOCATION_PREFERENCE;
105+
}
106+
}
107+
97108
/**
98109
* From Apache Spark
99110
*
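The public overload is generalized from CombinedScanTask to any ScanTaskGroup<?>, so the new changelog scan's task groups can report HDFS block locality the same way regular data scans do; per-file lookup moves into a private helper, and files that are not HadoopInputFiles contribute no location preference. A hedged sketch of a caller, assuming an already-loaded Table (the printLocations helper is illustrative, not from the commit):

import java.io.IOException;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;

class LocalityExample {
  static void printLocations(Table table) throws IOException {
    try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
      for (CombinedScanTask taskGroup : tasks) {
        // CombinedScanTask is a ScanTaskGroup<FileScanTask>, so it fits the
        // widened signature; an empty array means no locality preference.
        String[] hosts = Util.blockLocations(table.io(), taskGroup);
        System.out.println(taskGroup + " -> " + String.join(",", hosts));
      }
    }
  }
}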
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark.extensions;
20+
21+
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
22+
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
23+
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
import org.apache.iceberg.DataOperations;
28+
import org.apache.iceberg.Snapshot;
29+
import org.apache.iceberg.Table;
30+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
31+
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
32+
import org.apache.iceberg.spark.SparkCatalogConfig;
33+
import org.apache.iceberg.spark.SparkReadOptions;
34+
import org.apache.iceberg.spark.source.SparkChangelogTable;
35+
import org.apache.spark.sql.DataFrameReader;
36+
import org.apache.spark.sql.Row;
37+
import org.junit.After;
38+
import org.junit.Assert;
39+
import org.junit.Test;
40+
import org.junit.runners.Parameterized.Parameters;
41+
42+
public class TestChangelogBatchReads extends SparkExtensionsTestBase {
43+
44+
@Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
45+
public static Object[][] parameters() {
46+
return new Object[][] {
47+
{
48+
1,
49+
SparkCatalogConfig.SPARK.catalogName(),
50+
SparkCatalogConfig.SPARK.implementation(),
51+
SparkCatalogConfig.SPARK.properties()
52+
},
53+
{
54+
2,
55+
SparkCatalogConfig.HIVE.catalogName(),
56+
SparkCatalogConfig.HIVE.implementation(),
57+
SparkCatalogConfig.HIVE.properties()
58+
}
59+
};
60+
}
61+
62+
private final int formatVersion;
63+
64+
public TestChangelogBatchReads(
65+
int formatVersion, String catalogName, String implementation, Map<String, String> config) {
66+
super(catalogName, implementation, config);
67+
this.formatVersion = formatVersion;
68+
}
69+
70+
@After
71+
public void removeTables() {
72+
sql("DROP TABLE IF EXISTS %s", tableName);
73+
}
74+
75+
@Test
76+
public void testDataFilters() {
77+
sql(
78+
"CREATE TABLE %s (id INT, data STRING) "
79+
+ "USING iceberg "
80+
+ "PARTITIONED BY (data) "
81+
+ "TBLPROPERTIES ( "
82+
+ " '%s' = '%d' "
83+
+ ")",
84+
tableName, FORMAT_VERSION, formatVersion);
85+
86+
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
87+
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
88+
sql("INSERT INTO %s VALUES (3, 'c')", tableName);
89+
90+
Table table = validationCatalog.loadTable(tableIdent);
91+
92+
Snapshot snap3 = table.currentSnapshot();
93+
94+
sql("DELETE FROM %s WHERE id = 3", tableName);
95+
96+
table.refresh();
97+
98+
Snapshot snap4 = table.currentSnapshot();
99+
100+
assertEquals(
101+
"Should have expected rows",
102+
ImmutableList.of(
103+
row(3, "c", "INSERT", 2, snap3.snapshotId()),
104+
row(3, "c", "DELETE", 3, snap4.snapshotId())),
105+
sql("SELECT * FROM %s.changes WHERE id = 3 ORDER BY _change_ordinal, id", tableName));
106+
}
107+
108+
@Test
109+
public void testOverwrites() {
110+
sql(
111+
"CREATE TABLE %s (id INT, data STRING) "
112+
+ "USING iceberg "
113+
+ "PARTITIONED BY (data) "
114+
+ "TBLPROPERTIES ( "
115+
+ " '%s' = '%d' "
116+
+ ")",
117+
tableName, FORMAT_VERSION, formatVersion);
118+
119+
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
120+
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
121+
122+
Table table = validationCatalog.loadTable(tableIdent);
123+
124+
Snapshot snap2 = table.currentSnapshot();
125+
126+
sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
127+
128+
table.refresh();
129+
130+
Snapshot snap3 = table.currentSnapshot();
131+
132+
assertEquals(
133+
"Rows should match",
134+
ImmutableList.of(
135+
row(2, "b", "DELETE", 0, snap3.snapshotId()),
136+
row(-2, "b", "INSERT", 0, snap3.snapshotId())),
137+
changelogRecords(snap2, snap3));
138+
}
139+
140+
@Test
141+
public void testMetadataDeletes() {
142+
sql(
143+
"CREATE TABLE %s (id INT, data STRING) "
144+
+ "USING iceberg "
145+
+ "PARTITIONED BY (data) "
146+
+ "TBLPROPERTIES ( "
147+
+ " '%s' = '%d' "
148+
+ ")",
149+
tableName, FORMAT_VERSION, formatVersion);
150+
151+
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
152+
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
153+
154+
Table table = validationCatalog.loadTable(tableIdent);
155+
156+
Snapshot snap2 = table.currentSnapshot();
157+
158+
sql("DELETE FROM %s WHERE data = 'a'", tableName);
159+
160+
table.refresh();
161+
162+
Snapshot snap3 = table.currentSnapshot();
163+
Assert.assertEquals("Operation must match", DataOperations.DELETE, snap3.operation());
164+
165+
assertEquals(
166+
"Rows should match",
167+
ImmutableList.of(row(1, "a", "DELETE", 0, snap3.snapshotId())),
168+
changelogRecords(snap2, snap3));
169+
}
170+
171+
@Test
172+
public void testExistingEntriesInNewDataManifestsAreIgnored() {
173+
sql(
174+
"CREATE TABLE %s (id INT, data STRING) "
175+
+ "USING iceberg "
176+
+ "PARTITIONED BY (data) "
177+
+ "TBLPROPERTIES ( "
178+
+ " '%s' = '%d', "
179+
+ " '%s' = '1', "
180+
+ " '%s' = 'true' "
181+
+ ")",
182+
tableName, FORMAT_VERSION, formatVersion, MANIFEST_MIN_MERGE_COUNT, MANIFEST_MERGE_ENABLED);
183+
184+
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
185+
186+
Table table = validationCatalog.loadTable(tableIdent);
187+
188+
Snapshot snap1 = table.currentSnapshot();
189+
190+
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
191+
192+
table.refresh();
193+
194+
Snapshot snap2 = table.currentSnapshot();
195+
Assert.assertEquals("Manifest number must match", 1, snap2.dataManifests(table.io()).size());
196+
197+
assertEquals(
198+
"Rows should match",
199+
ImmutableList.of(row(2, "b", "INSERT", 0, snap2.snapshotId())),
200+
changelogRecords(snap1, snap2));
201+
}
202+
203+
@Test
204+
public void testManifestRewritesAreIgnored() {
205+
sql(
206+
"CREATE TABLE %s (id INT, data STRING) "
207+
+ "USING iceberg "
208+
+ "PARTITIONED BY (data) "
209+
+ "TBLPROPERTIES ( "
210+
+ " '%s' = '%d' "
211+
+ ")",
212+
tableName, FORMAT_VERSION, formatVersion);
213+
214+
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
215+
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
216+
217+
sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
218+
219+
Table table = validationCatalog.loadTable(tableIdent);
220+
Assert.assertEquals("Num snapshots must match", 3, Iterables.size(table.snapshots()));
221+
222+
assertEquals(
223+
"Should have expected rows",
224+
ImmutableList.of(row(1, "INSERT"), row(2, "INSERT")),
225+
sql("SELECT id, _change_type FROM %s.changes ORDER BY id", tableName));
226+
}
227+
228+
private List<Object[]> changelogRecords(Snapshot startSnapshot, Snapshot endSnapshot) {
229+
DataFrameReader reader = spark.read();
230+
231+
if (startSnapshot != null) {
232+
reader = reader.option(SparkReadOptions.START_SNAPSHOT_ID, startSnapshot.snapshotId());
233+
}
234+
235+
if (endSnapshot != null) {
236+
reader = reader.option(SparkReadOptions.END_SNAPSHOT_ID, endSnapshot.snapshotId());
237+
}
238+
239+
return rowsToJava(collect(reader));
240+
}
241+
242+
private List<Row> collect(DataFrameReader reader) {
243+
return reader
244+
.table(tableName + "." + SparkChangelogTable.TABLE_NAME)
245+
.orderBy("_change_ordinal", "_commit_snapshot_id", "_change_type", "id")
246+
.collectAsList();
247+
}
248+
}
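The tests read the changelog two ways: plain SQL against the changes metadata table (SparkChangelogTable.TABLE_NAME, which the SELECT statements above spell out as "changes") and a DataFrameReader bounded by SparkReadOptions.START_SNAPSHOT_ID / END_SNAPSHOT_ID. A standalone sketch of both paths, assuming a SparkSession already wired to an Iceberg catalog; the table name db.tbl and the snapshot IDs are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class ReadChangelogExample {
  static void read(SparkSession spark, long startSnapshotId, long endSnapshotId) {
    // Path 1: the changes metadata table exposes every changelog row,
    // including _change_type, _change_ordinal, and _commit_snapshot_id.
    Dataset<Row> all =
        spark.sql(
            "SELECT id, data, _change_type, _change_ordinal, _commit_snapshot_id "
                + "FROM db.tbl.changes");

    // Path 2: restrict the changelog to a snapshot range; the option keys
    // are the string values of SparkReadOptions.START_SNAPSHOT_ID and
    // SparkReadOptions.END_SNAPSHOT_ID used in changelogRecords() above.
    Dataset<Row> ranged =
        spark
            .read()
            .option("start-snapshot-id", startSnapshotId)
            .option("end-snapshot-id", endSnapshotId)
            .table("db.tbl.changes");

    all.show();
    ranged.show();
  }
}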
