|
19 | 19 |
|
20 | 20 | package org.apache.iceberg; |
21 | 21 |
|
| 22 | +import java.util.ArrayList; |
| 23 | +import java.util.HashSet; |
| 24 | +import java.util.List; |
| 25 | +import java.util.Set; |
| 26 | +import java.util.concurrent.Callable; |
| 27 | +import java.util.concurrent.ExecutorService; |
| 28 | +import java.util.concurrent.Executors; |
22 | 29 | import org.junit.Assert; |
23 | 30 | import org.junit.Test; |
24 | 31 |
|
25 | | - |
26 | 32 | public class TestSequenceNumber extends TableTestBase { |
27 | 33 |
|
28 | 34 | @Test |
29 | | - public void testWriteSequenceNumber() { |
| 35 | + public void testReadWriteSequenceNumber() { |
30 | 36 | table.newFastAppend().appendFile(FILE_A).commit(); |
| 37 | + Assert.assertEquals(1, table.currentSnapshot().sequenceNumber().longValue()); |
| 38 | + table.newFastAppend().appendFile(FILE_B).commit(); |
| 39 | + Assert.assertEquals(2, table.currentSnapshot().sequenceNumber().longValue()); |
| 40 | + } |
31 | 41 |
|
32 | | - Assert.assertEquals("sequence number should be 1", 1, |
33 | | - table.currentSnapshot().sequenceNumber().longValue()); |
| 42 | + @Test |
| 43 | + public void testCommitConflict() { |
| 44 | + Transaction txn = table.newTransaction(); |
34 | 45 |
|
| 46 | + txn.newFastAppend().appendFile(FILE_A).apply(); |
35 | 47 | table.newFastAppend().appendFile(FILE_B).commit(); |
36 | 48 |
|
37 | | - Assert.assertEquals("sequence number should be 2", 2, |
38 | | - table.currentSnapshot().sequenceNumber().longValue()); |
| 49 | + AssertHelpers.assertThrows("Should failed due to conflict", |
| 50 | + IllegalStateException.class, "last operation has not committed", txn::commitTransaction); |
| 51 | + |
| 52 | + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 53 | + .longValue()); |
| 54 | + } |
| 55 | + |
| 56 | + @Test |
| 57 | + public void testConcurrentCommit() throws InterruptedException { |
| 58 | + ExecutorService threadPool = Executors.newFixedThreadPool(4); |
| 59 | + List<Callable<Void>> tasks = new ArrayList<>(); |
| 60 | + |
| 61 | + Callable<Void> write1 = () -> { |
| 62 | + Transaction txn = table.newTransaction(); |
| 63 | + txn.newFastAppend().appendFile(FILE_A).commit(); |
| 64 | + txn.commitTransaction(); |
| 65 | + return null; |
| 66 | + }; |
| 67 | + |
| 68 | + Callable<Void> write2 = () -> { |
| 69 | + Transaction txn = table.newTransaction(); |
| 70 | + txn.newAppend().appendFile(FILE_B).commit(); |
| 71 | + txn.commitTransaction(); |
| 72 | + return null; |
| 73 | + }; |
| 74 | + |
| 75 | + Callable<Void> write3 = () -> { |
| 76 | + Transaction txn = table.newTransaction(); |
| 77 | + txn.newDelete().deleteFile(FILE_A).commit(); |
| 78 | + txn.commitTransaction(); |
| 79 | + return null; |
| 80 | + }; |
| 81 | + |
| 82 | + Callable<Void> write4 = () -> { |
| 83 | + Transaction txn = table.newTransaction(); |
| 84 | + txn.newOverwrite().addFile(FILE_D).commit(); |
| 85 | + txn.commitTransaction(); |
| 86 | + return null; |
| 87 | + }; |
| 88 | + |
| 89 | + tasks.add(write1); |
| 90 | + tasks.add(write2); |
| 91 | + tasks.add(write3); |
| 92 | + tasks.add(write4); |
| 93 | + threadPool.invokeAll(tasks); |
| 94 | + threadPool.shutdown(); |
39 | 95 |
|
| 96 | + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 97 | + .longValue()); |
40 | 98 | } |
41 | 99 |
|
42 | 100 | @Test |
43 | | - public void testReadSequenceNumber() { |
44 | | - long curSeqNum; |
45 | | - |
46 | | - if (table.currentSnapshot() == null) { |
47 | | - curSeqNum = 0; |
48 | | - } else { |
49 | | - curSeqNum = table.currentSnapshot().sequenceNumber(); |
50 | | - } |
| 101 | + public void testRollBack() { |
51 | 102 | table.newFastAppend().appendFile(FILE_A).commit(); |
| 103 | + long snapshotId = table.currentSnapshot().snapshotId(); |
52 | 104 | table.newFastAppend().appendFile(FILE_B).commit(); |
53 | 105 |
|
54 | | - Assert.assertEquals(curSeqNum + 2, |
55 | | - TestTables.load(tableDir, "test") |
56 | | - .currentSnapshot() |
57 | | - .sequenceNumber() |
58 | | - .longValue()); |
| 106 | + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 107 | + .longValue()); |
| 108 | + |
| 109 | + table.rollback().toSnapshotId(snapshotId).commit(); |
| 110 | + |
| 111 | + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 112 | + .longValue()); |
| 113 | + } |
| 114 | + |
| 115 | + @Test |
| 116 | + public void testMultipleTxnOperations() { |
| 117 | + Snapshot snapshot; |
| 118 | + Transaction txn = table.newTransaction(); |
| 119 | + txn.newOverwrite().addFile(FILE_A).commit(); |
| 120 | + txn.commitTransaction(); |
| 121 | + Assert.assertEquals(1, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 122 | + .longValue()); |
| 123 | + |
| 124 | + txn = table.newTransaction(); |
| 125 | + Set<DataFile> toAddFiles = new HashSet<>(); |
| 126 | + Set<DataFile> toDeleteFiles = new HashSet<>(); |
| 127 | + toAddFiles.add(FILE_B); |
| 128 | + toDeleteFiles.add(FILE_A); |
| 129 | + txn.newRewrite().rewriteFiles(toDeleteFiles, toAddFiles).commit(); |
| 130 | + txn.commitTransaction(); |
| 131 | + Assert.assertEquals(2, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 132 | + .longValue()); |
| 133 | + |
| 134 | + txn = table.newTransaction(); |
| 135 | + txn.newReplacePartitions().addFile(FILE_C).commit(); |
| 136 | + txn.commitTransaction(); |
| 137 | + Assert.assertEquals(3, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 138 | + .longValue()); |
| 139 | + |
| 140 | + txn = table.newTransaction(); |
| 141 | + txn.newDelete().deleteFile(FILE_C).commit(); |
| 142 | + txn.commitTransaction(); |
| 143 | + |
| 144 | + Assert.assertEquals(4, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 145 | + .longValue()); |
| 146 | + |
| 147 | + txn = table.newTransaction(); |
| 148 | + txn.newAppend().appendFile(FILE_C).commit(); |
| 149 | + txn.commitTransaction(); |
| 150 | + Assert.assertEquals(5, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 151 | + .longValue()); |
| 152 | + |
| 153 | + snapshot = table.currentSnapshot(); |
| 154 | + |
| 155 | + txn = table.newTransaction(); |
| 156 | + txn.newOverwrite().addFile(FILE_D).deleteFile(FILE_C).commit(); |
| 157 | + txn.commitTransaction(); |
| 158 | + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 159 | + .longValue()); |
| 160 | + |
| 161 | + txn = table.newTransaction(); |
| 162 | + txn.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit(); |
| 163 | + txn.commitTransaction(); |
| 164 | + Assert.assertEquals(6, TestTables.load(tableDir, "test").currentSnapshot().sequenceNumber() |
| 165 | + .longValue()); |
59 | 166 | } |
60 | 167 | } |
0 commit comments