|
40 | 40 | import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM; |
41 | 41 | import static io.trino.plugin.base.util.Closables.closeAllSuppress; |
42 | 42 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.CDF_DATA; |
| 43 | +import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.CHECKPOINT; |
43 | 44 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.DATA; |
44 | 45 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.LAST_CHECKPOINT; |
45 | 46 | import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRANSACTION_LOG_JSON; |
@@ -92,6 +93,75 @@ protected DistributedQueryRunner createQueryRunner() |
92 | 93 | } |
93 | 94 | } |
94 | 95 |
|
| 96 | + @Test |
| 97 | + public void testCheckpointFileOperations() |
| 98 | + { |
| 99 | + assertUpdate("DROP TABLE IF EXISTS test_checkpoint_file_operations"); |
| 100 | + assertUpdate("CREATE TABLE test_checkpoint_file_operations(key varchar, data varchar) with (checkpoint_interval = 2, partitioned_by=ARRAY['key'])"); |
| 101 | + assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p1', '1-abc')", 1); |
| 102 | + assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p2', '2-xyz')", 1); |
| 103 | + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_checkpoint_file_operations')"); |
| 104 | + trackingFileSystemFactory.reset(); |
| 105 | + assertFileSystemAccesses( |
| 106 | + "SELECT * FROM test_checkpoint_file_operations", |
| 107 | + ImmutableMultiset.<FileOperation>builder() |
| 108 | + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 2) |
| 109 | + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 2) |
| 110 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 4) |
| 111 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 8) |
| 112 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_GET_LENGTH), 1) |
| 113 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) |
| 114 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_GET_LENGTH), 1) |
| 115 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) |
| 116 | + .build()); |
| 117 | + trackingFileSystemFactory.reset(); |
| 118 | + // reads of checkpoint, commits and metadata are partially cached |
| 119 | + assertFileSystemAccesses( |
| 120 | + "SELECT * FROM test_checkpoint_file_operations", |
| 121 | + ImmutableMultiset.<FileOperation>builder() |
| 122 | + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 2) |
| 123 | + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 2) |
| 124 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 1) // read protocol entries |
| 125 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) |
| 126 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_GET_LENGTH), 1) |
| 127 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) |
| 128 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_GET_LENGTH), 1) |
| 129 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) |
| 130 | + .build()); |
| 131 | + assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p3', '3-xyz')", 1); |
| 132 | + trackingFileSystemFactory.reset(); |
| 133 | + assertFileSystemAccesses( |
| 134 | + "SELECT * FROM test_checkpoint_file_operations", |
| 135 | + ImmutableMultiset.<FileOperation>builder() |
| 136 | + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 2) |
| 137 | + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1) |
| 138 | + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 2) |
| 139 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) // read metadata and protocol entries |
| 140 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) |
| 141 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_GET_LENGTH), 1) |
| 142 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) |
| 143 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_GET_LENGTH), 1) |
| 144 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) |
| 145 | + .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_GET_LENGTH), 1) |
| 146 | + .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1) |
| 147 | + .build()); |
| 148 | + trackingFileSystemFactory.reset(); |
| 149 | + assertFileSystemAccesses( |
| 150 | + "SELECT * FROM test_checkpoint_file_operations", |
| 151 | + ImmutableMultiset.<FileOperation>builder() |
| 152 | + .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 2) |
| 153 | + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 2) |
| 154 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 1) // read protocol entries |
| 155 | + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) |
| 156 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_GET_LENGTH), 1) |
| 157 | + .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1) |
| 158 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_GET_LENGTH), 1) |
| 159 | + .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1) |
| 160 | + .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_GET_LENGTH), 1) |
| 161 | + .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1) |
| 162 | + .build()); |
| 163 | + } |
| 164 | + |
95 | 165 | @Test |
96 | 166 | public void testReadWholePartition() |
97 | 167 | { |
@@ -246,6 +316,9 @@ public static FileOperation create(String path, OperationType operationType) |
246 | 316 | if (path.matches(".*/_delta_log/\\d+\\.json")) { |
247 | 317 | return new FileOperation(TRANSACTION_LOG_JSON, fileName, operationType); |
248 | 318 | } |
| 319 | + if (path.matches(".*/_delta_log/\\d+\\.checkpoint.parquet")) { |
| 320 | + return new FileOperation(CHECKPOINT, fileName, operationType); |
| 321 | + } |
249 | 322 | if (path.matches(".*/_delta_log/_trino_meta/extended_stats.json")) { |
250 | 323 | return new FileOperation(TRINO_EXTENDED_STATS_JSON, fileName, operationType); |
251 | 324 | } |
@@ -276,6 +349,7 @@ enum FileType |
276 | 349 | { |
277 | 350 | LAST_CHECKPOINT, |
278 | 351 | TRANSACTION_LOG_JSON, |
| 352 | + CHECKPOINT, |
279 | 353 | TRINO_EXTENDED_STATS_JSON, |
280 | 354 | DATA, |
281 | 355 | CDF_DATA, |
|
0 commit comments