
Commit c1fba28

Author: XuQianJin-Stars (committed)
StreamingOffset of Structured Streaming read for Iceberg
1 parent 8a26f28 commit c1fba28
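
For orientation, a sketch inferred from the updated tests below (not text from the commit itself): the new tests treat StreamingOffset as a value that serializes to a small, versioned JSON document with the fields version, snapshot_id, position, and scan_all_files, and that StreamingOffset.fromJson() can rebuild from that string. An illustrative payload, with a made-up snapshot id:

    // Hypothetical example only; the field names and ordering are the ones testToJson asserts.
    String json = "{\"version\":1,\"snapshot_id\":1589432054000,\"position\":1,\"scan_all_files\":false}";
    StreamingOffset offset = StreamingOffset.fromJson(json);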

File tree

2 files changed: +23 -167 lines changed


spark/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset.java

Lines changed: 23 additions & 143 deletions
@@ -19,156 +19,36 @@
 
 package org.apache.iceberg.spark.source;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestReader;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Arrays;
+import org.apache.iceberg.util.JsonUtil;
 import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
 
-import static org.apache.iceberg.types.Types.NestedField.optional;
-
-public abstract class TestStreamingOffset {
-  private static final Configuration CONF = new Configuration();
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "id", Types.IntegerType.get()),
-      optional(2, "data", Types.StringType.get())
-  );
-  protected static final int INIT_SCANNED_FILE_INDEX = -1;
-  protected static final int END_SCANNED_FILE_INDEX = 3;
-  protected static SparkSession spark = null;
-  protected static Path parent = null;
-  protected static File tableLocation = null;
-  protected static Table table = null;
-  protected static List<SimpleRecord> expected = null;
-  protected static final FileIO FILE_IO = new TestTables.LocalFileIO();
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-  @Rule
-  public ExpectedException exceptionRule = ExpectedException.none();
-
-  @BeforeClass
-  public static void startSpark() throws IOException {
-    TestStreamingOffset.spark = SparkSession.builder()
-        .master("local[2]")
-        .config("spark.sql.shuffle.partitions", 4)
-        .getOrCreate();
-
-    parent = Files.createTempDirectory("test");
-    tableLocation = new File(parent.toFile(), "table");
-    tableLocation.mkdir();
-    HadoopTables tables = new HadoopTables(CONF);
-    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
-    table = tables.create(SCHEMA, spec, tableLocation.toString());
-
-    expected = Lists.newArrayList(new SimpleRecord(1, "1"),
-        new SimpleRecord(2, "2"),
-        new SimpleRecord(3, "3"),
-        new SimpleRecord(4, "4"));
-
-    // Write records one by one to generate 3 snapshots.
-    for (int i = 0; i < 3; i++) {
-      Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
-      df.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableLocation.toString());
-    }
-
-    table.refresh();
-  }
-
-  @AfterClass
-  public static void stopSpark() {
-    SparkSession currentSpark = TestStreamingOffset.spark;
-    TestStreamingOffset.spark = null;
-    currentSpark.stop();
-  }
-
-  @Test
-  public void testStreamingOffsetWithPosition() throws IOException {
-    Snapshot currSnap = table.currentSnapshot();
-    StreamingOffset startOffset =
-        new StreamingOffset(currSnap.snapshotId(), INIT_SCANNED_FILE_INDEX, false);
-    StreamingOffset endOffset = startOffset;
-    ManifestFile manifest = currSnap.dataManifests().get(0);
-    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
-      long expectedPos = startOffset.position();
-      for (DataFile file : reader) {
-        expectedPos += 1;
-        Assert.assertEquals("Position should match", (Long) expectedPos, file.pos());
-        endOffset = new StreamingOffset(currSnap.snapshotId(), Math.toIntExact(file.pos()), false);
-      }
-      StreamingOffset expectedOffset = new StreamingOffset(currSnap.snapshotId(), (int) expectedPos, false);
-      Assert.assertEquals(expectedOffset, endOffset);
-    }
-  }
+public class TestStreamingOffset {
 
   @Test
-  public void testScanAllFiles() throws IOException {
-    List<ManifestFile> manifests = table.currentSnapshot().dataManifests();
-    List<StreamingOffset> expectedOffsets = Lists.newArrayList();
-    for (ManifestFile manifest : manifests) {
-      expectedOffsets.add(new StreamingOffset(manifest.snapshotId(), END_SCANNED_FILE_INDEX, true));
-    }
-    testStreamingOffsetWithScanFiles(expectedOffsets, true);
+  public void testJsonConversion() {
+    StreamingOffset[] expected = new StreamingOffset[]{
+        new StreamingOffset(System.currentTimeMillis(), 1L, false),
+        new StreamingOffset(System.currentTimeMillis(), 2L, false),
+        new StreamingOffset(System.currentTimeMillis(), 3L, false),
+        new StreamingOffset(System.currentTimeMillis(), 4L, true)
+    };
+    Assert.assertArrayEquals("Result StreamingOffsets should match", expected,
+        Arrays.stream(expected).map(elem -> StreamingOffset.fromJson(elem.json())).toArray());
   }
 
   @Test
-  public void testNoScanAllFiles() throws IOException {
-    List<StreamingOffset> expectedOffsets = Lists
-        .newArrayList(new StreamingOffset(table.currentSnapshot().snapshotId(), END_SCANNED_FILE_INDEX, false));
-    testStreamingOffsetWithScanFiles(expectedOffsets, false);
-  }
-
-  private void testStreamingOffsetWithScanFiles(List<StreamingOffset> expectedOffsets,
-                                                boolean scanAllFiles) throws IOException {
-    Snapshot currSnap = table.currentSnapshot();
-    List<ManifestFile> manifests = scanAllFiles ? currSnap.dataManifests() :
-        currSnap.dataManifests().stream().filter(m -> m.snapshotId().equals(currSnap.snapshotId()))
-            .collect(Collectors.toList());
-    List<StreamingOffset> actualOffsets = Lists.newArrayList();
-    for (ManifestFile manifest : manifests) {
-      try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
-        StreamingOffset offset = StreamingOffset.START_OFFSET;
-        long expectedPos = INIT_SCANNED_FILE_INDEX;
-        for (DataFile file : reader) {
-          expectedPos += 1;
-          Assert.assertEquals("Position should match", (Long) expectedPos, file.pos());
-          if (scanAllFiles) {
-            offset = new StreamingOffset(manifest.snapshotId(), file.pos(), scanAllFiles);
-          } else {
-            offset = new StreamingOffset(currSnap.snapshotId(), file.pos(), scanAllFiles);
-          }
-        }
-        actualOffsets.add(offset);
-      }
-    }
-
-    Assert.assertArrayEquals(expectedOffsets.toArray(), actualOffsets.toArray());
+  public void testToJson() throws Exception {
+    StreamingOffset expected = new StreamingOffset(System.currentTimeMillis(), 1L, false);
+    ObjectNode actual = JsonUtil.mapper().createObjectNode();
+    actual.put("version", 1);
+    actual.put("snapshot_id", expected.snapshotId());
+    actual.put("position", 1L);
+    actual.put("scan_all_files", false);
+    String expectedJson = expected.json();
+    String actualJson = JsonUtil.mapper().writeValueAsString(actual);
+    Assert.assertEquals("Json should match", expectedJson, actualJson);
   }
 }
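
A rough usage sketch, not part of this commit: Spark Structured Streaming persists each batch's offset as the JSON string returned by Offset.json() and reparses it on restart, so the round trip that testJsonConversion checks is the same path an Iceberg streaming read takes across a checkpoint recovery. The sketch assumes only the constructor, json(), fromJson(), and equals() exercised by the tests above:

    // Sketch: serialize an offset the way the checkpoint offset log would persist it, then restore it.
    // The snapshot id is illustrative; the tests above simply use System.currentTimeMillis().
    StreamingOffset offset = new StreamingOffset(8924558786060583479L, 0L, false);
    String persisted = offset.json();
    // e.g. {"version":1,"snapshot_id":8924558786060583479,"position":0,"scan_all_files":false}
    StreamingOffset restored = StreamingOffset.fromJson(persisted);
    Assert.assertEquals("Offset should survive the JSON round trip", offset, restored);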

spark2/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset24.java

Lines changed: 0 additions & 24 deletions
This file was deleted.
