From 92317a1d9ac276212360aec9aa8cebee758635ba Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 16 Oct 2023 12:00:26 +0200 Subject: [PATCH 1/6] Test accuracy for reading multi-part checkpoint files --- .../plugin/deltalake/TestDeltaLakeBasic.java | 16 ++++++++++ .../deltalake/multipart_checkpoint/README.md | 28 ++++++++++++++++++ .../_delta_log/00000000000000000000.json | 3 ++ .../_delta_log/00000000000000000001.json | 2 ++ .../_delta_log/00000000000000000002.json | 2 ++ .../_delta_log/00000000000000000003.json | 2 ++ .../_delta_log/00000000000000000004.json | 2 ++ .../_delta_log/00000000000000000005.json | 2 ++ ...6.checkpoint.0000000001.0000000002.parquet | Bin 0 -> 14125 bytes ...6.checkpoint.0000000002.0000000002.parquet | Bin 0 -> 12886 bytes .../_delta_log/00000000000000000006.json | 2 ++ .../_delta_log/00000000000000000007.json | 2 ++ .../_delta_log/_last_checkpoint | 1 + ...4b96-8775-074e4e99e771-c000.snappy.parquet | Bin 0 -> 449 bytes ...4370-b4ec-9129845af85d-c000.snappy.parquet | Bin 0 -> 449 bytes ...4c93-ac3d-c1b26e2c5f3a-c000.snappy.parquet | Bin 0 -> 449 bytes ...4d91-a23d-1049d2630b60-c000.snappy.parquet | Bin 0 -> 449 bytes ...4582-b758-2424da5aef0f-c000.snappy.parquet | Bin 0 -> 449 bytes ...4425-ab3e-be80963fe765-c000.snappy.parquet | Bin 0 -> 449 bytes ...413a-ade1-409a1ce69d9c-c000.snappy.parquet | Bin 0 -> 449 bytes 20 files changed, 62 insertions(+) create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/README.md create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000000.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000001.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000002.json create mode 100644 
plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000003.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000004.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000005.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.checkpoint.0000000001.0000000002.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.checkpoint.0000000002.0000000002.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000007.json create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/_last_checkpoint create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-07432bff-a65c-4b96-8775-074e4e99e771-c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-30d20302-2223-4370-b4ec-9129845af85d-c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-4a4be67a-1d35-4c93-ac3d-c1b26e2c5f3a-c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-68d36678-1302-4d91-a23d-1049d2630b60-c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-81f5948d-6e60-4582-b758-2424da5aef0f-c000.snappy.parquet create mode 100644 
plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-aeede0e4-d6fd-4425-ab3e-be80963fe765-c000.snappy.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-c2ea82b3-cbf3-413a-ade1-409a1ce69d9c-c000.snappy.parquet diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index c3c45a1b7cfe..64f3e70d4de4 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -935,6 +935,22 @@ public void testStatsWithMinMaxValuesAsNulls() """); } + /** + * @see deltalake.multipart_checkpoint + */ + @Test + public void testReadMultipartCheckpoint() + throws Exception + { + String tableName = "test_multipart_checkpoint_" + randomNameSuffix(); + Path tableLocation = Files.createTempFile(tableName, null); + copyDirectoryContents(new File(Resources.getResource("deltalake/multipart_checkpoint").toURI()).toPath(), tableLocation); + + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + assertThat(query("DESCRIBE " + tableName)).projected("Column", "Type").skippingTypesCheck().matches("VALUES ('c', 'integer')"); + assertThat(query("SELECT * FROM " + tableName)).matches("VALUES 1, 2, 3, 4, 5, 6, 7"); + } + private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation) throws IOException { diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/README.md b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/README.md new file mode 100644 index 000000000000..99d540dbca44 --- /dev/null +++ 
b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/README.md @@ -0,0 +1,28 @@ +Data generated using Apache Spark 3.4.0 & Delta Lake OSS 2.4.0. + +This test resource is used to verify whether the reading from Delta Lake tables with +multi-part checkpoint files works as expected. + +Trino +``` +CREATE TABLE multipartcheckpoint(c integer) with (checkpoint_interval = 6); +``` + +From https://docs.delta.io/latest/optimizations-oss.html + +> In Delta Lake, by default each checkpoint is written as a single Parquet file. To use this feature, +> set the SQL configuration ``spark.databricks.delta.checkpoint.partSize=<n>``, where n is the limit of +> number of actions (such as `AddFile`) at which Delta Lake on Apache Spark will start parallelizing the +> checkpoint and attempt to write a maximum of this many actions per checkpoint file. + +Spark +``` +SET spark.databricks.delta.checkpoint.partSize=3; +INSERT INTO multipartcheckpoint values 1; +INSERT INTO multipartcheckpoint values 2; +INSERT INTO multipartcheckpoint values 3; +INSERT INTO multipartcheckpoint values 4; +INSERT INTO multipartcheckpoint values 5; +INSERT INTO multipartcheckpoint values 6; +INSERT INTO multipartcheckpoint values 7; +``` diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..ba5929dec80d --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"version":0,"timestamp":1697439143958,"userId":"marius","userName":"marius","operation":"CREATE TABLE","operationParameters":{"queryId":"20231016_065223_00001_dhwpa"},"clusterId":"trino-428-191-g91ee252-presto-master","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true}} 
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"ce0eab6c-75a5-4904-9f90-2fe73bedf1ce","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"6"},"createdTime":1697439143958}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000001.json new file mode 100644 index 000000000000..6f728af0f9e0 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1697439172229,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"449"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"ff40545b-ceb7-4836-8b35-a2147cf21677"}} +{"add":{"path":"part-00000-81f5948d-6e60-4582-b758-2424da5aef0f-c000.snappy.parquet","partitionValues":{},"size":449,"modificationTime":1697439172000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c\":1},\"maxValues\":{\"c\":1},\"nullCount\":{\"c\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000002.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000002.json new file mode 100644 index 000000000000..72ec0c619113 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ 
+{"commitInfo":{"timestamp":1697439178642,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"449"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"e730895b-9e56-4738-90fb-ce62ab08f3b1"}} +{"add":{"path":"part-00000-4a4be67a-1d35-4c93-ac3d-c1b26e2c5f3a-c000.snappy.parquet","partitionValues":{},"size":449,"modificationTime":1697439178000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c\":2},\"maxValues\":{\"c\":2},\"nullCount\":{\"c\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000003.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000003.json new file mode 100644 index 000000000000..5ce98ef913c7 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1697439181640,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"449"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"0ee530c2-daef-4e2e-b2ad-de64e3e7b940"}} +{"add":{"path":"part-00000-aeede0e4-d6fd-4425-ab3e-be80963fe765-c000.snappy.parquet","partitionValues":{},"size":449,"modificationTime":1697439181000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c\":3},\"maxValues\":{\"c\":3},\"nullCount\":{\"c\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000004.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000004.json new file mode 100644 index 
000000000000..8a079b24cc25 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000004.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1697439185136,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":3,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"449"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"3b236730-8187-4d5d-9c2e-ecb281788a15"}} +{"add":{"path":"part-00000-30d20302-2223-4370-b4ec-9129845af85d-c000.snappy.parquet","partitionValues":{},"size":449,"modificationTime":1697439185000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c\":4},\"maxValues\":{\"c\":4},\"nullCount\":{\"c\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000005.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000005.json new file mode 100644 index 000000000000..92ad5eba1c9a --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000005.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1697439189907,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":4,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"449"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"1df6e97f-ab4c-4f6d-ac90-48f2e8d0086b"}} +{"add":{"path":"part-00000-68d36678-1302-4d91-a23d-1049d2630b60-c000.snappy.parquet","partitionValues":{},"size":449,"modificationTime":1697439189000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c\":5},\"maxValues\":{\"c\":5},\"nullCount\":{\"c\":0}}"}} diff --git 
a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.checkpoint.0000000001.0000000002.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.checkpoint.0000000001.0000000002.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5a8652e15f98a056c62b73ac16c06a76a5ecbf14 GIT binary patch literal 14125 zcmeHO4Qx}_6@GRc$0TV8A@^aA6w(x|(8Zqr^V6uJ0kr{T9iogOO#WL1)6GhP$A%xbJO*BOeRaLul?tAaP zmuDxjlhV;?RY5d%_`)E8_8Z+}Wg%PVaRC>*M{R)bSu!>-R)`9#hwDpU>xa1^lfZ zS2!R>TpGhI*r}K)@Gt31PqJ3X5%?_K?37+Gf^XqJz^EfUdF%D#FP9^RMV^#pjfxDe3@Gj+UF7K4>&KU?QefKF z?olXg`?^{W0NU<7^53(&<-fp(Vn4n3+>iCsZEH}GfdI$&@xk{#X9UJUmYaYI2YLSV zncq~Z;(=<`AeYsu&NKDL*Cx#>ndV#+2A%JA{{CNOWZXoqot*iI%_fZC-=?#Q!Fpi2 zo@`3?C$@``R3@5j>hQKSC1S~)LcCvuPfs>QnmU4m&?oHas^=P${qgvwRDV*squJ&e z9Q@HXjtgwv#+mB>1}#5r!7s4ytzMN2;iDJ)<=EdZu!%Ez0d-UKf~&`Vaam6_%L`<0 zUP#zi{rm5cFgf!1&mYz~`8Q5e4;kXtfe9iIZiH|s;%W^FL06#N6L7WnwtHMYh&BGO z813~&L}ku@aBSv>%$nL_*{J%&eBZAl&E%z@-8^K|=~1_t>M_n$fF4S$RF?1b)7PG6 zq{ijTSy3X+ZxLi~n#eg5nRxra`5K+*$hELygI#o$NPE*_Q%6%)%JfI1rk1AOm>9hNveu&w@S8@w| zG&}4Y<^3CWWsi$)!nk+OvkT|#uCpN)vq5K2SxYg>+SF~{IrjVjz^AYZe6d$q)moT}smZ_9O<2t}7-? 
z7fFVeu@&Qu6-rEY*wt3itl$Al3#rrtmIRTQLs_t0UoPi z-Pg6|*mA8`YmU`2eTG&reI{2h+FTuLPHN5EO08FG=IXUxt(hYvr>Zu>Gm0s#IkJlE zRCJ$N#grJgvu4ggK2sqMu4dgASCe72d#Zu#QpbNM*`_p~Swp&&<~gl-crAmtq%^^A zK7FveItxqku}8CT1V{*^k_(#cWzwD`fdf(Xdb7P8vd%5h^;}N_I0>;pPsM&vs zaO~(8AvXkO&RzyRs#vNB{Z{QkMJ(Ca_eEKY+C(ZE>y1SOMBfohKsN^zm3VO4v|C8_ ziKv@J68f@BkO~kru*^j|1Ti9|GNf7_X;l_J2z}xXIPJ+UPj756A}la&N1 ztAK)X){K})4Tz*B-~4puR;bFOg@8Aa&NmaaTfMkP5>dG5`rYVGoYq3sexEj%xDx9 zDax`GkEF}2T3)*kOPN@*PlK;z%WYFC-k(UK9BWUfyxuJFSXLrt*-{a{zWxkdP)k@c)bFYy84=?HU6^QLu!WS0q~cI}qwZe|C4}vw0M^;5#ui)^*!`VM z3=$Hx8$AWEfbao8?9Yg$465b{id8BnU+HLYeCl>W?pW;zS~;7e83Uvo7>Sv$QtKn{we1vtMg`ggO-G8E#A`1 zQZucWbF@Gy)Pjo?HUR2W$5GbhyUBf=^M))-c1Mtvw*ykSaXt-{omy(A<$EYpP5884 z{RE7ov=`Q)v?~?h*{TnNG~LEZ<=n@20Zk{Qitq!-^sgT4uZT6hjx=%3QM!=WT+&Us zzyx;lJwR$O_Em%#UXK%aeO?&DtX($(4Tk{ec?vYKj4hEioHfH1uix6~SaG#aEHfx} z1o2*_c*9U5R+d~LJj0QIK@L|MAWtCVl&m5^nq~7Bf>e$Y3~Is%^#($HB5PMQFO>cu z!tjQx460p3ydzGWh6%t^B2*#Os&vN~hBslvyM}nbrFg?o(U}iVcL-v5!_@};o^1uZ zcV+zmPagow>6wYLK5yY?OwI{lwlw!FSYDXFHUK4VrTPtJZu`P$3luB#X!>O|(h4zg z#z|x17}WpNktruXExMzVj@1~;{5;AnQ|$W})FlQKyLA0)x$1twFT zguEo{vmoSL(QDF(dKpoNzK+Jrt<*wJVMUDrEVt62<0puAl%h>8h*mI$O&TE&wFBfC zS;2ZX4RcwtP#DYA8?<~8Auq~WE(}>Hl1&;R&miPYSxYE-tLvTRD3F`bRk7d1=SB$pN8oxnZ(J5kvev%{YUjHAx^>7fd0#=T_3m|{1D4bcH$560T&;RSH}kK& zasVv>sHs1lL3FT#s|}8D4ISaI7ai};^$x{$18uG0&m-@Qpv4h_QtdQG;;0F|BHP+>`u0mG&9TV z%OBLvJ^VJHgse<0>}xH+FVL%(HipvP@nKf>%o?__HnW;aUM+=++{||ad8d3~ApZtY z^!h;lq!;lvia6vaZq9jbE)lcpGEBrd6oHoy@^2KOebS4VMiH-45r@}m)1nrJ3`L&8 zu6lT_!BuY{=v6ZL-Uv+F;FK`$9W4|W3R{s4gT*YDyhoi={7u^#SATI}{zrq3;; z1*n|6X%XG6RL1S)es{p_;p`joD6)^o)hAr;OzH`@db@8wzP6a433Vs-W}p~)TD)f* z_!a?5gbO_~XkiZ&l<{Z)@l+}7x~qZGARbZfT<3fTp{WQ7JHsx>FR7Z)Q-z?U!Om;mP7^LW$Jf|TmU z?3RzoE-^xUAuGGkRidU)$${d3DFUk#_5XOI-F|DXQF7F}t>aAz0YkliW~L}6WW=;% zP|5oaeU8eWq81kbwar3z`^_{iSp~V$!uX3Bv3jA-m`WC^+`lsbCiP1>#g*U@`U`%h zZOOxBp;LJez>wIug-k}+3*E|l0ON5L9tAA=q&n+CLB8ZK#p0UJ>K-I8U5eTqOPDk@ zmt_K&ndjO0jSnFhb5qIT(D-;_2~e}2TreOupj+rSvvc+@LG*$%{Kz0@-wJC9A3eAd 
znMNf^ZuGjnKDWQIIouzMM;m+Fq7gCbZSzF}5pQoeDn>$)NZ9Xf3kLnI;h^C0iXkx? Y3U7d?CGfQFHu>S|n>lV9{Eu?~0lqG^LjV8( literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.checkpoint.0000000002.0000000002.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.checkpoint.0000000002.0000000002.parquet new file mode 100644 index 0000000000000000000000000000000000000000..fc88d59544d60a9aaf1ba3280ecb7ffc4750a63e GIT binary patch literal 12886 zcmeI3e`s6B9l)PtMNwkMaT4FFl&Yz#EuPz1s%6WLQ%uR?uK9u2g{;d4qm<{TCs$Oz z`tLN@l^lL#9%Typbhjy z6a9m|(a|0KgVEkZZz8GmD_W{26;*qBdSb<#QYajWf#@^inlUW(?vXwjN+^j@ZD5BI zjVJs1qX~6qUsO^1l2J82+B=~2s{N@xrM@^Q5t^J*do^XSceF36j;8vei8!=Yl3F~P z=-H{nRc&Bra;F;IzX$EIm24wFef7s@H`=4}OVWlOFSVOBG-wqj9pu%8e=d0)ty&Xi zD<8H!2}bsjHquU>FMYVMeu#Jp)CaX#bsbh8QWLaz znCvD~v(tZHZ}$Roc<*7W#p!SE-tj!p+MY0fPrYpZ2085Z!k^we&$I)%K-eZj103XM zKl|htJ{v3!655!9T)K2^YJ;Oy)gbq=m9#KdIl0De##b7};7YuczyG+2yzU{to-2LU z;uyidJ(n$mjlgt07s-uh4{B;&PZlE&#k(WfbncLn8Q0*`a}gD)jzXPscz3(hksHrs zhV$b&V?$S<=jhRs2PCO?{{hL<{tlD|XbJuUrum|`=yH5q1%G+vA8)f|BCY~Dy|~mD zU-{+x&Q?{cz+BDCb@t`=tMkVVPD6vumYa`c z8w&0+&q6ri;P_j~X;zJB=s*+BQyrC+8c{Z2pHiV9E84;^mVkZqK#`p~78}0yfMGn; zGRt@7%8afWqs<%j+BKfsa^a022296ag}^!!F=3m{mF{0QZhUWjezpqB0kR7=h!hIT zds*p(hZQe-$hjt~!8IQlv5IqR$YWOVil01U6{pscf>k^gAiJ&N#Q+nVT*t)9>sTMt z>shhPi_^`#mKUd6cx?+=@>qS$x3cQVjjTA&i<518gX2nHboDH+K z*R3M#(HndAwiLldKmBA;eM-wJWWChy4K^8vbHp!^mhIl)8VF1FCAUi>VZ877pl0oa zrWe!socXCaqZEzr7LtmgC8?~EOp-QJ)>lxBrz}yxZp5=7ZGfohH6S+dj&uXI-xpl> zw07jl3A|ZCyZ{|b!6vA&yQPZIZ|xdX#8*uJP&2h?%jT2mR9aP#{&+eI)e^K=4{9mH zPbs-E4R!Mw%2;s~y#k^&sbw@e1Wh&aI%zRS+FXPW%9!@}k%DHch)TDj1-`BC%Vp9z z?U4!lD=&QI&!~J~ZP{_T_dTF|?v(S5?uI>70^D8%L}fxMv^CKf5oN)DBy7 zQ#2H#XhA_azpiET6B=o)H9wuXO}6rAA&|`$YRyFL)~+2kH0US!XgZ^POEVO-BU9KL0Zm3mk8M^M8Rb4~>KsP2@7;ItW)qDn$Gj#d4L0EZEgVcn6$o4I` zE3o=|dfMRCxKe-x4Ic&KxURVwRO_>p-4rv 
z{)9&Wyc8v}e0&@BeHj2uYjDlfB~!_dn&fj$@cqz$)HUq5rLW|QjVY_36SAVKK0urn zJR2oJ`OG#lhzGw+Tiex*>jg;Hg1}w^tqkDZwz8&@iuepkd}9ZPO5O zjd?2JJnxqu137_NM&>(|dBsm>%jd@>g2qO#PPLhkf7E%!-+9s>=8*+2$x;`KH+ZcS z>W1`~4>u_s0Pstam$0uJ9pquDa|+UrY*j&DH76kLHzOCI?lTu=pi=4MT5^gO&F4u)G`E3_<(@KR~9xovayTUAvD&q|S?UBeA)phw@D( z@S?b@&Liej1SfEEUDX5%7-7~X*9o)s27)e7P?>`0#va636OMT2(N4#TyS*&(-bLPD zDQ_Cmme!W5#}lpu3^~1CQ1L1vKQ&bZq}McmJ*0J&V5qW)df`Dpo%{-#ZMylEDCb3l z@ur&v)y^RA9OadPXRX6}sx>>VF^pFhc^8m3Z|Voh$XoFommtQQZV~jGN8X~TAMl(G za1FgPQPI;rdCAjR24SgdWZ9Vc)&Vc1J*|u=q&|bo)2zIyc@cKxQdts})12Gr{RhD( z{`{-dC(c6x-5!~B^W)wf7wLSfV3r$bmQp9(J&@84*!J#QUZ)Z8eCxH&Cumo1o8Y2>Z#`aAflUvmCpnPJ^e#ya;^@p{FVI z)TYLvg6lc8NznBp#J)zc6~GF!z|Rb{T16B-76;)n6)tUN^FL^fv=P@;+AQevA|kJw z`YemQnfI!Q)LEpy)P*xr+QMB+C)uZr9geZZZg+__*K>s3W@kVgYMJD{Uj9QE;#fIW}cV_!#GF4VzR$A5K9iP!4|5&8x~XDD=L zYpt#~g9?_M*(&JzE@FRAv5SDU9VWAZGrVMmTofr+k@7c6nZ27iq%IbV*2JI;HG8+9 z&rd`%SkUhRLeEp^^?K0iIJiFa zx(J;|=mJ9JPq)eb&Vc;-J?)S*?hXd?`dCaUD3C#n(PU!G$`A(E^u-df9w~T#O^f0) z8T;Ql#Ps~LG0eGOk}3F`d!LIKkVT6;6e$||xN1bYBdN5O!T0AMz$FcGFYpz_-%xM1 zYot9*VloLNL#hNm<8@0J%>WYm(%>oEQUh*ZM~`;1?yT`}Kql`-+>s)6hj_wzP-#F# z2wWJu5sP4Ak_osr2um$l7ucG)$SMV(LokjRQFAU$i_>VjZsl-3XDI1h@iCYab3Pg$ zJqFr*H11|cOdYS3!*k7?Fq+S2GzHgcgDVX))J&Ys<88L4ohCj+bKJG+++t%o^59Hv z1*^-MG+#N#%&>F$bk5MmG#x4$oDD}-eianQ-H}sVu2G))cZYD{L15chF7ku+^ zf_1YS9ljM-}k>pSAk`k%|ruJ@$E6>{w;{+~RsJyBo!8~zH&mxgPvbw~Rf z7Y@UIhhwH_#AT%UHOTe3Lob*1RmQcn;}wu;>>6SQ!%V)B5!(-oOvrVo);lBvuwHM= zy~D+da6WW0ZCwwSxlZd{l#$rD6caNShL?6;xoWoP(_qf&4` z>?M5sjb`-@D@g8$$Kt)QzK*WZ@pLBHks3^@S~5P^t0vTVYBZ^-1FAaO7a#2J@7poj guk^&V0WCQ&dOtj|!#^>)Lw=<2c1bz_|3}Gx0db-{D*ylh literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.json new file mode 100644 index 000000000000..94cd9a799777 --- /dev/null +++ 
b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000006.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1697439194248,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":5,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"449"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"c757b395-39ce-4007-871f-b648423ec886"}} +{"add":{"path":"part-00000-c2ea82b3-cbf3-413a-ade1-409a1ce69d9c-c000.snappy.parquet","partitionValues":{},"size":449,"modificationTime":1697439194000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c\":6},\"maxValues\":{\"c\":6},\"nullCount\":{\"c\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000007.json b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000007.json new file mode 100644 index 000000000000..60e3daf0a5a0 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/00000000000000000007.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1697439206526,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":6,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"449"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/2.4.0","txnId":"692e911c-78a9-4e97-9fdd-5e2bf33c7a2a"}} +{"add":{"path":"part-00000-07432bff-a65c-4b96-8775-074e4e99e771-c000.snappy.parquet","partitionValues":{},"size":449,"modificationTime":1697439206000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c\":7},\"maxValues\":{\"c\":7},\"nullCount\":{\"c\":0}}"}} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/_last_checkpoint 
b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/_last_checkpoint new file mode 100644 index 000000000000..e5d513c4df3a --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/_delta_log/_last_checkpoint @@ -0,0 +1 @@ +{"version":6,"size":8,"parts":2,"sizeInBytes":27011,"numOfAddFiles":6,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"stats","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletio
nTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"type":"struct","fields":[{"name":"minReaderVersion","type":"i
nteger","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}},{"name":"readerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"writerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"checksum":"e3aeff08e804e2c1d2d8367707f7efca"} diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-07432bff-a65c-4b96-8775-074e4e99e771-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-07432bff-a65c-4b96-8775-074e4e99e771-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..8cd919e54929c64565d1453c4d0a283fbae98aec GIT binary patch literal 449 zcmZWmO-sW-5S^}HK?D)H%P!;)7A-B*rD>a5iWfZ;@gT*MAR?R1wic7LO;SZ_|BpY= zgBQWEttT%t^WMz7H|!2BZhZ>0Oga7j`usR_DM7eP9f18~0svd3J&fXr2)@0l)`=3q zkan3v`&aHx5JT4py?0WOh>!}?{kuX8@hMoLdLTB?=1da+qPo}_%IrP`_iy!LM* zpT!Cfoy~2bvdODTY@s7vXnwbl#oN@%p3IAEZVFkK!?BH{T=wo4GS$g%%OJa%8mEOF z*$jzv9>+S2v5$+kF8?HDZ?RlHEP-Ed3{R8H+g#}R%5SQcR}JUmIP!+w$k@p5Hci|3 k!!WX@V@%lcyFt)83j^KoZO2BPuqIp=AAD8<7~tpq0Q$&hasU7T literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-30d20302-2223-4370-b4ec-9129845af85d-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-30d20302-2223-4370-b4ec-9129845af85d-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..6bbd485721bf74f09d72308b6a0a5f6a66774954 GIT binary patch literal 449 zcmZWmO-sW-5S^}7g6Ki$E*r=pELvKqOVc#96fb%x;z5chK}0s2Z7rs0n-4{5PyP~r zjlaQveMD?t-Be|w z#AZl4%%Vdq0CDT;AX}*P`7zT^W~L}X@`Uh;21abhcIp<2+rb`KHjOE#8D;&_bZX8*>6iDyPcXQ zxtW+0iKK{Q6~@@dMbi?0l47u0uOHUHueXM$v((*Q=z7xYNZ)OQ#WarGaX-=~^7{If7nZO`;f)C-%oydQkzW{v;= literal 0 HcmV?d00001 diff 
--git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-4a4be67a-1d35-4c93-ac3d-c1b26e2c5f3a-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-4a4be67a-1d35-4c93-ac3d-c1b26e2c5f3a-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..07e176899a7b433be97beaf26ffb41518b746a78 GIT binary patch literal 449 zcmZWmO-sW-5S^|cMDZeYmtDvqELvKKOVck)@uG(!9;A2@L}ats)?k{pNvcTg$zS0= z@sId_oPK!nGBfYZ%zMM`;PNh@K+BZVukX*F1D6tn`_utAJRtzEQ(8rFOa$LuSDQqM zU`V^np?xd>v44G(Pj&Y4l$&Qe)|4Q5N_fctBf=GK#X)h=-Umk@NeLtDiuEhb#YS-x z8C2Ic8-xaU4e9`}4_aooQ~@rLio^P4wD&omWFw`g8ZFh@UA@{Yk0&YLYpRy21F!v? z$Y+VdLuYeasBH44#1EG9b>{~&~3My=V4pd1KY7tC#(tA#Rs3200#JZzci_4VE_OC literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-68d36678-1302-4d91-a23d-1049d2630b60-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-68d36678-1302-4d91-a23d-1049d2630b60-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..08abfd031ef1500645937d375e566ee5235a6f43 GIT binary patch literal 449 zcmZWm!AiqG5S^~J1o0wtmks0)7A-BrrD@t)iWfZ;@gT*MAR?R1wg%I*O;SZ_|H5DJ zQ~V~cj!ivznVI)y=DlHed~xejpmoaW$LH5~)u9AojamQ)#{>ZODpid35wZFHwki`P zHbdHB79C*$h`p=Be4(=E$6P;|nW6;A6T&MR7_lAOX;>(3_iA9-NK(QGyJVw=bzUl# zkwJ57QzA6LYfuM(z0*3op%QS3G%Pl%qrJ!ZJex|jP-rQ)?()TCc|1>fHIQA|2HyHN zkuMX8hb~v9klFlI&P<^qRVaS97RB4bh@r@fY^4j~iAijdC>O)~wMfv#_VyzUiB&AGU1A!3SSf0LJ)vKg*kDz5oCK literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-81f5948d-6e60-4582-b758-2424da5aef0f-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/multipart_checkpoint/part-00000-81f5948d-6e60-4582-b758-2424da5aef0f-c000.snappy.parquet new file mode 100644 index 
0000000000000000000000000000000000000000..60fa1ce9a490843497a5cad3ec8004d42c515ff4 GIT binary patch literal 449 zcmZWmO-sW-5Z$g?f_f3U%P!;)7A-BrrD>a5iWfZ;@gT*MAR?R1wg!{5&4(hjXa9}= z!oMYsO+9&;o%d$uy})?in-10rUl~MuOo{CNifCql8&3!d8JrI zidq|+3V{J^13C!NchaCYL?Rf%O^c2jVDBTO9?es>*OPtOL9G3+ zBVWc67G0L6klFlICZX`S6r3AjX>7Ms-3zQg$Ecz!CRc&V6~9}H{B3E(SY&xx=v;KfEHqJ&iShkfBr5)GX=Ha(VUnA< zNs&m3C{lieeOxph@h2(9>&@n21N?etcp9gT-GvQTx;^PNT7D5mL1Q)wvS5T34DgLo0T%LZ}?i*+^2t2)ks%x^=!$ z+(ZVAwao^h0bYYT0PLOC*bS9{OQdeGVGZp)&S%*~s<}c-xptQ?Cd=bl%J(|5D_g*u z|0eQ9BJt2gX$qOmUggvjDpG~wcS}*c&5am{yvRyj2v3Y-lSH`~+%H9{roSzX>~<%Tj)M=rr~r)c^L_v Date: Mon, 16 Oct 2023 12:01:20 +0200 Subject: [PATCH 2/6] Test multi-part checkpoint table file operations --- .../TestDeltaLakeFileOperations.java | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 71065d635569..45d903681eb5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -16,6 +16,7 @@ import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; +import com.google.common.io.Resources; import io.trino.Session; import io.trino.SystemSessionProperties; import io.trino.filesystem.TrackingFileSystemFactory; @@ -30,6 +31,8 @@ import java.io.File; import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Arrays; import java.util.Map; import java.util.Optional; @@ -51,6 +54,7 @@ import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.STARBURST_EXTENDED_STATS_JSON; import static 
io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRANSACTION_LOG_JSON; import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.TRINO_EXTENDED_STATS_JSON; +import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; @@ -95,7 +99,8 @@ protected DistributedQueryRunner createQueryRunner() Map.of( "hive.metastore", "file", "hive.metastore.catalog.dir", metastoreDirectory, - "delta.enable-non-concurrent-writes", "true")); + "delta.enable-non-concurrent-writes", "true", + "delta.register-table-procedure.enabled", "true")); queryRunner.execute("CREATE SCHEMA " + session.getSchema().orElseThrow()); return queryRunner; @@ -701,6 +706,28 @@ public void testShowTables() assertFileSystemAccesses("SHOW TABLES", ImmutableMultiset.of()); } + @Test + public void testReadMultipartCheckpoint() + throws Exception + { + String tableName = "test_multipart_checkpoint_" + randomNameSuffix(); + Path tableLocation = Files.createTempFile(tableName, null); + copyDirectoryContents(new File(Resources.getResource("deltalake/multipart_checkpoint").toURI()).toPath(), tableLocation); + + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + assertFileSystemAccesses("SELECT * FROM " + tableName, + ImmutableMultiset.builder() + .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", INPUT_FILE_GET_LENGTH), 6) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", 
INPUT_FILE_NEW_STREAM), 3) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_GET_LENGTH), 6) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_NEW_STREAM), 3) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", INPUT_FILE_NEW_STREAM)) + .addCopies(new FileOperation(DATA, "no partition", INPUT_FILE_NEW_STREAM), 7) + .build()); + } + private int countCdfFilesForKey(String partitionValue) { String path = (String) computeScalar("SELECT \"$path\" FROM table_changes_file_system_access WHERE key = '" + partitionValue + "'"); @@ -742,12 +769,11 @@ private record FileOperation(FileType fileType, String fileId, OperationType ope { public static FileOperation create(String path, OperationType operationType) { - Pattern dataFilePattern = Pattern.compile(".*?/(?key=[^/]*/)?(?\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); String fileName = path.replaceFirst(".*/", ""); if (path.matches(".*/_delta_log/_last_checkpoint")) { return new FileOperation(LAST_CHECKPOINT, fileName, operationType); } - if (path.matches(".*/_delta_log/\\d+\\.checkpoint\\.parquet")) { + if (path.matches(".*/_delta_log/\\d+\\.checkpoint(\\.\\d+\\.\\d+)?\\.parquet")) { return new FileOperation(CHECKPOINT, fileName, operationType); } if (path.matches(".*/_delta_log/\\d+\\.json")) { @@ -759,6 +785,7 @@ public static FileOperation create(String path, OperationType operationType) if 
(path.matches(".*/_delta_log/_starburst_meta/extendeded_stats.json")) { return new FileOperation(STARBURST_EXTENDED_STATS_JSON, fileName, operationType); } + Pattern dataFilePattern = Pattern.compile(".*?/(?key=[^/]*/)?[^/]+"); if (path.matches(".*/_change_data/.*")) { Matcher matcher = dataFilePattern.matcher(path); if (matcher.matches()) { From e19de6b574e07ae1f38d18d59ff56e5ef226670d Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 16 Oct 2023 17:03:46 +0200 Subject: [PATCH 3/6] Simplify occurrences builders in TestTransactionLogAccess Typically metadata files should be accessed once, so prefer `.add(x)` over `.addCopies(x, 1)`, so that `.addCopies` stand out as potentially something to address. --- .../deltalake/TestTransactionLogAccess.java | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index f8effbbcb2e3..a7e23a1b0fb1 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -556,11 +556,11 @@ public void testIncrementalCacheUpdates() assertEqualsIgnoreOrder(activeDataFiles.stream().map(AddFileEntry::getPath).collect(Collectors.toSet()), originalDataFiles); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
.addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) - .addCopies(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) .build()); copyTransactionLogEntry(12, 14, resourceDir, transactionLogDir); @@ -575,10 +575,10 @@ public void testIncrementalCacheUpdates() assertEqualsIgnoreOrder(activeDataFiles.stream().map(AddFileEntry::getPath).collect(Collectors.toSet()), union(originalDataFiles, newDataFiles)); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 2) .addCopies(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM), 2) - .addCopies(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) .build()); } @@ -730,11 +730,11 @@ public void testTableSnapshotsCacheDisabled() setupTransactionLogAccess(tableName, tableDir, cacheDisabledConfig); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) + .add(new 
FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) .build()); // With the transaction log cache disabled, when loading the snapshot again, all the needed files will be opened again @@ -743,11 +743,11 @@ public void testTableSnapshotsCacheDisabled() transactionLogAccess.getSnapshot(SESSION, new SchemaTableName("schema", tableName), tableDir, Optional.empty()); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) .build()); } @@ -767,11 +767,11 @@ public void testTableSnapshotsActiveDataFilesCache() assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM), 1) + .add(new 
FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) .build()); @@ -801,11 +801,11 @@ public void testFlushSnapshotAndActiveFileCache() assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
.addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) .build()); @@ -819,11 +819,11 @@ public void testFlushSnapshotAndActiveFileCache() assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
.addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) .build()); @@ -845,11 +845,11 @@ public void testTableSnapshotsActiveDataFilesCacheDisabled() assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM), 1) - .addCopies(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM), 1) + .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) .build()); From 3573dc9659a37071386f3483bec9a5b7f06a7af4 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 16 Oct 2023 12:24:26 +0200 Subject: [PATCH 4/6] Avoid useless scanning of multi-part checkpoint files Once the metadata & protocol entries are found, the scanning of multi-part checkpoint files can be stopped. 
--- .../plugin/deltalake/transactionlog/TableSnapshot.java | 9 ++++----- .../plugin/deltalake/TestDeltaLakeFileOperations.java | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index cafbc0fcea65..6e527ffdb47f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -291,14 +291,13 @@ private MetadataAndProtocolEntry getCheckpointMetadataAndProtocolEntries( protocol = entry.getProtocol(); } if (metadata != null && protocol != null) { - break; + // No need to read next checkpoint parts if requested info already found + return new MetadataAndProtocolEntry(metadata, protocol); } } } - if (metadata == null || protocol == null) { - throw new TrinoException(DELTA_LAKE_BAD_DATA, "Checkpoint found without metadata and protocol entry: " + checkpoint); - } - return new MetadataAndProtocolEntry(metadata, protocol); + + throw new TrinoException(DELTA_LAKE_BAD_DATA, "Checkpoint found without metadata and protocol entry: " + checkpoint); } private record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 45d903681eb5..564568943933 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -720,8 +720,8 @@ public void testReadMultipartCheckpoint() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", 
INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", INPUT_FILE_GET_LENGTH), 6) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", INPUT_FILE_NEW_STREAM), 3) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query - .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_GET_LENGTH), 6) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query - .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_NEW_STREAM), 3) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_NEW_STREAM), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000008.json", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation(DATA, "no partition", INPUT_FILE_NEW_STREAM), 7) From 6e2c8fe21ae6cd439d570b57bca54fc3e6abab63 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 16 Oct 2023 14:56:35 +0200 Subject: [PATCH 5/6] Reuse metadata and protocol entries while retrieving the active files The `metadata` & `protocol` entries are already read (and saved) once when retrieving the table handle. 
Reuse this information while retrieving the active files for the table. --- .../plugin/deltalake/DeltaLakeMetadata.java | 6 +- .../deltalake/DeltaLakeSplitManager.java | 2 +- .../deltalake/procedure/VacuumProcedure.java | 2 +- .../FileBasedTableStatisticsProvider.java | 2 +- .../transactionlog/TableSnapshot.java | 10 +- .../transactionlog/TransactionLogAccess.java | 28 +++--- .../checkpoint/CheckpointWriterManager.java | 6 +- .../TestDeltaLakeFileOperations.java | 8 +- .../deltalake/TestDeltaLakeSplitManager.java | 2 +- .../deltalake/TestTransactionLogAccess.java | 98 ++++++++++++------- .../deltalake/TestingDeltaLakeUtils.java | 6 +- .../transactionlog/TestTableSnapshot.java | 4 +- 12 files changed, 106 insertions(+), 68 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 6145d123218f..a828fa47b2b3 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -1425,7 +1425,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle } checkUnsupportedWriterFeatures(protocolEntry); - if (!newColumnMetadata.isNullable() && !transactionLogAccess.getActiveFiles(getSnapshot(session, handle), session).isEmpty()) { + if (!newColumnMetadata.isNullable() && !transactionLogAccess.getActiveFiles(getSnapshot(session, handle), handle.getMetadataEntry(), handle.getProtocolEntry(), session).isEmpty()) { throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add NOT NULL column '%s' for non-empty table: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName())); } @@ -3141,7 +3141,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH private void generateMissingFileStatistics(ConnectorSession session, 
DeltaLakeTableHandle tableHandle, Collection computedStatistics) { Map addFileEntriesWithNoStats = transactionLogAccess.getActiveFiles( - getSnapshot(session, tableHandle), session) + getSnapshot(session, tableHandle), tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session) .stream() .filter(addFileEntry -> addFileEntry.getStats().isEmpty() || addFileEntry.getStats().get().getNumRecords().isEmpty() @@ -3491,7 +3491,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl private List getAddFileEntriesMatchingEnforcedPartitionConstraint(ConnectorSession session, DeltaLakeTableHandle tableHandle) { TableSnapshot tableSnapshot = getSnapshot(session, tableHandle); - List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, session); + List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session); TupleDomain enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint(); if (enforcedPartitionConstraint.isAll()) { return validDataFiles; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 26fd1eb2d153..c7acaf3d8533 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -156,7 +156,7 @@ private Stream getSplits( catch (IOException e) { throw new RuntimeException(e); } - List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, session); + List validDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session); TupleDomain enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint(); TupleDomain nonPartitionConstraint = 
tableHandle.getNonPartitionConstraint(); Domain pathDomain = getPathDomain(nonPartitionConstraint); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index fc86f851d1d6..e751861558fb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -197,7 +197,7 @@ private void doVacuum( // Any remaining file are not live, and not needed to read any "recent" snapshot. List recentVersions = transactionLogAccess.getPastTableVersions(fileSystem, transactionLogDir, threshold, tableSnapshot.getVersion()); Set retainedPaths = Stream.concat( - transactionLogAccess.getActiveFiles(tableSnapshot, session).stream() + transactionLogAccess.getActiveFiles(tableSnapshot, handle.getMetadataEntry(), handle.getProtocolEntry(), session).stream() .map(AddFileEntry::getPath), transactionLogAccess.getJsonEntries( fileSystem, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java index ec080238e7ab..eddcfcb99f84 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/statistics/FileBasedTableStatisticsProvider.java @@ -110,7 +110,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab .filter(column -> predicatedColumnNames.contains(column.getName())) .collect(toImmutableList()); - for (AddFileEntry addEntry : transactionLogAccess.getActiveFiles(tableSnapshot, session)) { + for (AddFileEntry addEntry : 
transactionLogAccess.getActiveFiles(tableSnapshot, tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), session)) { Optional fileStatistics = addEntry.getStats(); if (fileStatistics.isEmpty()) { // Open source Delta Lake does not collect stats diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index 6e527ffdb47f..638cd9eba215 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -180,7 +180,8 @@ public Stream getCheckpointTransactionLogEntries( CheckpointSchemaManager checkpointSchemaManager, TypeManager typeManager, TrinoFileSystem fileSystem, - FileFormatDataSourceStats stats) + FileFormatDataSourceStats stats, + Optional metadataAndProtocol) throws IOException { if (lastCheckpoint.isEmpty()) { @@ -190,8 +191,7 @@ public Stream getCheckpointTransactionLogEntries( LastCheckpoint checkpoint = lastCheckpoint.get(); // Add entries contain statistics. When struct statistics are used the format of the Parquet file depends on the schema. It is important to use the schema at the time // of the Checkpoint creation, in case the schema has evolved since it was written. 
- Optional metadataAndProtocol = Optional.empty(); - if (entryTypes.contains(ADD)) { + if (entryTypes.contains(ADD) && metadataAndProtocol.isEmpty()) { metadataAndProtocol = Optional.of(getCheckpointMetadataAndProtocolEntries( session, checkpointSchemaManager, @@ -300,9 +300,9 @@ private MetadataAndProtocolEntry getCheckpointMetadataAndProtocolEntries( throw new TrinoException(DELTA_LAKE_BAD_DATA, "Checkpoint found without metadata and protocol entry: " + checkpoint); } - private record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) + public record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { - private MetadataAndProtocolEntry + public MetadataAndProtocolEntry { requireNonNull(metadataEntry, "metadataEntry is null"); requireNonNull(protocolEntry, "protocolEntry is null"); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index cb2c0d7b289f..36bc1a1c24a6 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -30,6 +30,7 @@ import io.trino.parquet.ParquetReaderOptions; import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.DeltaLakeConfig; +import io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; @@ -255,7 +256,7 @@ public MetadataEntry getMetadataEntry(TableSnapshot tableSnapshot, ConnectorSess .orElseThrow(() -> new 
TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); } - public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session) + public List getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session) { try { TableVersion tableVersion = new TableVersion(new TableLocation(tableSnapshot.getTable(), tableSnapshot.getTableLocation()), tableSnapshot.getVersion()); @@ -285,7 +286,7 @@ public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorS } } - List activeFiles = loadActiveFiles(tableSnapshot, session); + List activeFiles = loadActiveFiles(tableSnapshot, metadataEntry, protocolEntry, session); return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles); }); return cacheEntry.getActiveFiles(); @@ -295,17 +296,22 @@ public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorS } } - private List loadActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session) + private List loadActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session) { - try (Stream entries = getEntries( - tableSnapshot, - ImmutableSet.of(ADD), - this::activeAddEntries, + List transactions = tableSnapshot.getTransactions(); + try (Stream checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries( session, + ImmutableSet.of(ADD), + checkpointSchemaManager, + typeManager, fileSystemFactory.create(session), - fileFormatDataSourceStats)) { - List activeFiles = entries.collect(toImmutableList()); - return activeFiles; + fileFormatDataSourceStats, + Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) { + return activeAddEntries(checkpointEntries, transactions) + .collect(toImmutableList()); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error reading transaction log for " + 
tableSnapshot.getTable(), e); } } @@ -439,7 +445,7 @@ private Stream getEntries( try { List transactions = tableSnapshot.getTransactions(); Stream checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries( - session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats); + session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty()); return entryMapper.apply( checkpointEntries, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java index 0e350ed1b5ab..1aa70f2c3792 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java @@ -99,7 +99,8 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) checkpointSchemaManager, typeManager, fileSystem, - fileFormatDataSourceStats) + fileFormatDataSourceStats, + Optional.empty()) .collect(toOptional()); if (checkpointMetadataLogEntry.isPresent()) { // TODO HACK: this call is required only to ensure that cachedMetadataEntry is set in snapshot (https://github.com/trinodb/trino/issues/12032), @@ -119,7 +120,8 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) checkpointSchemaManager, typeManager, fileSystem, - fileFormatDataSourceStats) + fileFormatDataSourceStats, + Optional.empty()) .forEach(checkpointBuilder::addLogEntry); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index 564568943933..8eb0290c1c4d 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -194,8 +194,8 @@ public void testReadTableCheckpointInterval() "TABLE test_read_checkpoint", ImmutableMultiset.builder() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM)) - .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 6) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query - .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 3) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation(DATA, "no partition", INPUT_FILE_NEW_STREAM), 2) .build()); @@ -718,8 +718,8 @@ public void testReadMultipartCheckpoint() assertFileSystemAccesses("SELECT * FROM " + tableName, ImmutableMultiset.builder() .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM)) - .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", INPUT_FILE_GET_LENGTH), 6) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query - .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", INPUT_FILE_NEW_STREAM), 3) // 
TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query + .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000001.0000000002.parquet", INPUT_FILE_NEW_STREAM), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .addCopies(new FileOperation(CHECKPOINT, "00000000000000000006.checkpoint.0000000002.0000000002.parquet", INPUT_FILE_NEW_STREAM), 2) // TODO (https://github.com/trinodb/trino/issues/18916) should be checked once per query .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", INPUT_FILE_NEW_STREAM)) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index d90f057ecdfe..39b35a2be7ec 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -180,7 +180,7 @@ public TableSnapshot getSnapshot(ConnectorSession session, SchemaTableName table } @Override - public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorSession session) + public List getActiveFiles(TableSnapshot tableSnapshot, MetadataEntry metadataEntry, ProtocolEntry protocolEntry, ConnectorSession session) { return addFileEntries; } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index a7e23a1b0fb1..d2ef7fd718ab 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -200,7 +200,9 @@ public void testGetActiveAddEntries() { setupTransactionLogAccessFromResources("person", "databricks73/person"); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); Set paths = addFileEntries .stream() .map(AddFileEntry::getPath) @@ -230,7 +232,9 @@ public void testAddFileEntryUppercase() { setupTransactionLogAccessFromResources("uppercase_columns", "databricks73/uppercase_columns"); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); AddFileEntry addFileEntry = addFileEntries .stream() .filter(entry -> entry.getPath().equals("ALA=1/part-00000-20a863e0-890d-4776-8825-f9dccc8973ba.c000.snappy.parquet")) @@ -252,7 +256,9 @@ public void testAddEntryPruning() // - Added in the parquet checkpoint but removed in a JSON commit // - Added in a JSON commit and removed in a later JSON commit setupTransactionLogAccessFromResources("person_test_pruning", "databricks73/person_test_pruning"); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, 
SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); Set paths = addFileEntries .stream() .map(AddFileEntry::getPath) @@ -267,7 +273,9 @@ public void testAddEntryOverrides() throws Exception { setupTransactionLogAccessFromResources("person_test_pruning", "databricks73/person_test_pruning"); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); // Test data contains two entries which are added multiple times, the most up to date one should be the only one in the active list List overwrittenPaths = ImmutableList.of( @@ -288,7 +296,9 @@ public void testAddRemoveAdd() throws Exception { setupTransactionLogAccessFromResources("person_test_pruning", "databricks73/person_test_pruning"); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); // Test data contains an entry added by the parquet checkpoint, removed by a JSON action, and then added back by a later JSON action List activeEntries = addFileEntries.stream() @@ -366,8 +376,9 @@ public void testAllGetActiveAddEntries(String tableName, String resourcePath) throws Exception { 
setupTransactionLogAccessFromResources(tableName, resourcePath); - - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); Set paths = addFileEntries .stream() .map(AddFileEntry::getPath) @@ -445,8 +456,9 @@ public void testUpdatingTailEntriesNoCheckpoint() File resourceDir = new File(getClass().getClassLoader().getResource("databricks73/person/_delta_log").toURI()); copyTransactionLogEntry(0, 7, resourceDir, transactionLogDir); setupTransactionLogAccess(tableName, tableDir.toURI().toString()); - - List activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); Set dataFiles = ImmutableSet.of( "age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", @@ -460,7 +472,7 @@ public void testUpdatingTailEntriesNoCheckpoint() copyTransactionLogEntry(7, 9, resourceDir, transactionLogDir); TableSnapshot updatedSnapshot = transactionLogAccess.getSnapshot(SESSION, new SchemaTableName("schema", tableName), tableDir.toURI().toString(), Optional.empty()); - activeDataFiles = transactionLogAccess.getActiveFiles(updatedSnapshot, SESSION); + activeDataFiles = transactionLogAccess.getActiveFiles(updatedSnapshot, metadataEntry, protocolEntry, SESSION); transactionLogAccess.cleanupQuery(SESSION); dataFiles = ImmutableSet.of( @@ -489,7 +501,9 @@ public void testLoadingTailEntriesPastCheckpoint() 
copyTransactionLogEntry(0, 8, resourceDir, transactionLogDir); setupTransactionLogAccess(tableName, tableDir.toURI().toString()); - List activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); transactionLogAccess.cleanupQuery(SESSION); Set dataFiles = ImmutableSet.of( @@ -505,7 +519,7 @@ public void testLoadingTailEntriesPastCheckpoint() copyTransactionLogEntry(8, 12, resourceDir, transactionLogDir); Files.copy(new File(resourceDir, LAST_CHECKPOINT_FILENAME).toPath(), new File(transactionLogDir, LAST_CHECKPOINT_FILENAME).toPath()); TableSnapshot updatedSnapshot = transactionLogAccess.getSnapshot(SESSION, new SchemaTableName("schema", tableName), tableDir.toURI().toString(), Optional.empty()); - activeDataFiles = transactionLogAccess.getActiveFiles(updatedSnapshot, SESSION); + activeDataFiles = transactionLogAccess.getActiveFiles(updatedSnapshot, metadataEntry, protocolEntry, SESSION); transactionLogAccess.cleanupQuery(SESSION); dataFiles = ImmutableSet.of( @@ -536,6 +550,8 @@ public void testIncrementalCacheUpdates() File resourceDir = new File(getClass().getClassLoader().getResource("databricks73/person/_delta_log").toURI()); copyTransactionLogEntry(0, 12, resourceDir, transactionLogDir); Files.copy(new File(resourceDir, LAST_CHECKPOINT_FILENAME).toPath(), new File(transactionLogDir, LAST_CHECKPOINT_FILENAME).toPath()); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); Set originalDataFiles = ImmutableSet.of( "age=42/part-00000-b26c891a-7288-4d96-9d3b-bef648f12a34.c000.snappy.parquet", @@ -552,13 
+568,13 @@ public void testIncrementalCacheUpdates() assertFileSystemAccesses( () -> { setupTransactionLogAccess(tableName, tableDir.toURI().toString()); - List activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List activeDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); assertEqualsIgnoreOrder(activeDataFiles.stream().map(AddFileEntry::getPath).collect(Collectors.toSet()), originalDataFiles); }, ImmutableMultiset.builder() .add(new FileOperation("_last_checkpoint", INPUT_FILE_NEW_STREAM)) - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
+ .add(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000011.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) .build()); @@ -570,7 +586,7 @@ public void testIncrementalCacheUpdates() assertFileSystemAccesses( () -> { TableSnapshot updatedTableSnapshot = transactionLogAccess.getSnapshot(SESSION, new SchemaTableName("schema", tableName), tableDir.toURI().toString(), Optional.empty()); - List activeDataFiles = transactionLogAccess.getActiveFiles(updatedTableSnapshot, SESSION); + List activeDataFiles = transactionLogAccess.getActiveFiles(updatedTableSnapshot, metadataEntry, protocolEntry, SESSION); transactionLogAccess.cleanupQuery(SESSION); assertEqualsIgnoreOrder(activeDataFiles.stream().map(AddFileEntry::getPath).collect(Collectors.toSet()), union(originalDataFiles, newDataFiles)); }, @@ -597,7 +613,9 @@ public void testSnapshotsAreConsistent() Files.copy(new File(resourceDir, LAST_CHECKPOINT_FILENAME).toPath(), new File(transactionLogDir, LAST_CHECKPOINT_FILENAME).toPath()); setupTransactionLogAccess(tableName, tableDir.toURI().toString()); - List expectedDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List expectedDataFiles = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); transactionLogAccess.cleanupQuery(SESSION); copyTransactionLogEntry(12, 14, resourceDir, transactionLogDir); @@ -605,8 +623,8 @@ public void testSnapshotsAreConsistent() "age=28/part-00000-40dd1707-1d42-4328-a59a-21f5c945fe60.c000.snappy.parquet", "age=29/part-00000-3794c463-cb0c-4beb-8d07-7cc1e3b5920f.c000.snappy.parquet"); TableSnapshot updatedTableSnapshot = transactionLogAccess.getSnapshot(SESSION, new 
SchemaTableName("schema", tableName), tableDir.toURI().toString(), Optional.empty()); - List allDataFiles = transactionLogAccess.getActiveFiles(updatedTableSnapshot, SESSION); - List dataFilesWithFixedVersion = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List allDataFiles = transactionLogAccess.getActiveFiles(updatedTableSnapshot, metadataEntry, protocolEntry, SESSION); + List dataFilesWithFixedVersion = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); for (String newFilePath : newDataFiles) { assertTrue(allDataFiles.stream().anyMatch(entry -> entry.getPath().equals(newFilePath))); assertTrue(dataFilesWithFixedVersion.stream().noneMatch(entry -> entry.getPath().equals(newFilePath))); @@ -679,7 +697,9 @@ public void testParquetStructStatistics() String tableName = "parquet_struct_statistics"; setupTransactionLogAccess(tableName, getClass().getClassLoader().getResource("databricks73/pruning/" + tableName).toURI().toString()); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); AddFileEntry addFileEntry = addFileEntries.stream() .filter(entry -> entry.getPath().equalsIgnoreCase("part-00000-0e22455f-5650-442f-a094-e1a8b7ed2271-c000.snappy.parquet")) @@ -759,11 +779,13 @@ public void testTableSnapshotsActiveDataFilesCache() String tableDir = getClass().getClassLoader().getResource("databricks73/" + tableName).toURI().toString(); DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(10, TimeUnit.MINUTES)); + MetadataEntry metadataEntry = 
transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); assertFileSystemAccesses( () -> { setupTransactionLogAccess(tableName, tableDir, shortLivedActiveDataFilesCacheConfig); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() @@ -772,14 +794,14 @@ public void testTableSnapshotsActiveDataFilesCache() .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
+ .add(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM)) .build()); // The internal data cache should still contain the data files for the table assertFileSystemAccesses( () -> { - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.of()); @@ -793,11 +815,13 @@ public void testFlushSnapshotAndActiveFileCache() String tableDir = getClass().getClassLoader().getResource("databricks73/" + tableName).toURI().toString(); DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(10, TimeUnit.MINUTES)); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); assertFileSystemAccesses( () -> { setupTransactionLogAccess(tableName, tableDir, shortLivedActiveDataFilesCacheConfig); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() @@ -806,8 +830,8 @@ public void testFlushSnapshotAndActiveFileCache() .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
- .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? + .add(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM)) .build()); // Flush all cache and then load snapshot and get active files @@ -815,7 +839,7 @@ public void testFlushSnapshotAndActiveFileCache() assertFileSystemAccesses( () -> { transactionLogAccess.getSnapshot(SESSION, new SchemaTableName("schema", tableName), tableDir, Optional.empty()); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() @@ -824,8 +848,8 @@ public void testFlushSnapshotAndActiveFileCache() .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
+ .add(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM)) .build()); } @@ -837,11 +861,13 @@ public void testTableSnapshotsActiveDataFilesCacheDisabled() String tableDir = getClass().getClassLoader().getResource("databricks73/" + tableName).toURI().toString(); DeltaLakeConfig shortLivedActiveDataFilesCacheConfig = new DeltaLakeConfig(); shortLivedActiveDataFilesCacheConfig.setDataFileCacheTtl(new Duration(0, TimeUnit.SECONDS)); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); assertFileSystemAccesses( () -> { setupTransactionLogAccess(tableName, tableDir, shortLivedActiveDataFilesCacheConfig); - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() @@ -850,20 +876,20 @@ public void testTableSnapshotsActiveDataFilesCacheDisabled() .add(new FileOperation("00000000000000000012.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000013.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation("00000000000000000014.json", INPUT_FILE_NEW_STREAM)) - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? 
+ .add(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM)) .build()); // With no caching for the transaction log entries, when loading the snapshot again, // the checkpoint file will be read again assertFileSystemAccesses( () -> { - List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, SESSION); + List addFileEntries = transactionLogAccess.getActiveFiles(tableSnapshot, metadataEntry, protocolEntry, SESSION); assertEquals(addFileEntries.size(), 12); }, ImmutableMultiset.builder() - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? - .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 2) // TODO (https://github.com/trinodb/trino/issues/16775) why not e.g. once? + .add(new FileOperation("00000000000000000010.checkpoint.parquet", INPUT_FILE_NEW_STREAM)) .build()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java index 18468f184be3..41d04cb1f002 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java @@ -14,6 +14,8 @@ package io.trino.plugin.deltalake; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; +import io.trino.plugin.deltalake.transactionlog.MetadataEntry; +import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; import io.trino.spi.connector.SchemaTableName; @@ -48,7 +50,9 @@ public static List 
getTableActiveFiles(TransactionLogAccess transa transactionLogAccess.flushCache(); TableSnapshot snapshot = transactionLogAccess.getSnapshot(SESSION, dummyTable, tableLocation, Optional.empty()); - List activeFiles = transactionLogAccess.getActiveFiles(snapshot, SESSION); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(snapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, snapshot); + List activeFiles = transactionLogAccess.getActiveFiles(snapshot, metadataEntry, protocolEntry, SESSION); transactionLogAccess.cleanupQuery(SESSION); return activeFiles; } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index af5e8a6aa508..b474ae24a426 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -128,7 +128,7 @@ public void readsCheckpointFile() domainCompactionThreshold); tableSnapshot.setCachedMetadata(Optional.of(new MetadataEntry("id", "name", "description", null, "schema", ImmutableList.of(), ImmutableMap.of(), 0))); try (Stream stream = tableSnapshot.getCheckpointTransactionLogEntries( - SESSION, ImmutableSet.of(ADD), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats())) { + SESSION, ImmutableSet.of(ADD), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.empty())) { List entries = stream.collect(toImmutableList()); assertThat(entries).hasSize(9); @@ -170,7 +170,7 @@ public void readsCheckpointFile() // lets read two entry types in one call; add and protocol try (Stream stream = tableSnapshot.getCheckpointTransactionLogEntries( - SESSION, ImmutableSet.of(ADD, 
PROTOCOL), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats())) { + SESSION, ImmutableSet.of(ADD, PROTOCOL), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.empty())) { List entries = stream.collect(toImmutableList()); assertThat(entries).hasSize(10); From 2151f4545ca9bcb893724b8a6ce8633c556abd54 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Tue, 17 Oct 2023 10:30:57 +0200 Subject: [PATCH 6/6] Require metadata and protocol entries to be provided when retrieving add files --- .../transactionlog/TableSnapshot.java | 56 +------------------ .../checkpoint/CheckpointWriterManager.java | 34 +++++++---- .../transactionlog/TestTableSnapshot.java | 30 +++++++--- 3 files changed, 49 insertions(+), 71 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index 638cd9eba215..5069517239a8 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -14,7 +14,6 @@ package io.trino.plugin.deltalake.transactionlog; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; @@ -37,14 +36,12 @@ import java.util.Set; import java.util.stream.Stream; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Streams.stream; -import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static 
io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.METADATA; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -191,14 +188,8 @@ public Stream getCheckpointTransactionLogEntries( LastCheckpoint checkpoint = lastCheckpoint.get(); // Add entries contain statistics. When struct statistics are used the format of the Parquet file depends on the schema. It is important to use the schema at the time // of the Checkpoint creation, in case the schema has evolved since it was written. - if (entryTypes.contains(ADD) && metadataAndProtocol.isEmpty()) { - metadataAndProtocol = Optional.of(getCheckpointMetadataAndProtocolEntries( - session, - checkpointSchemaManager, - typeManager, - fileSystem, - stats, - checkpoint)); + if (entryTypes.contains(ADD)) { + checkState(metadataAndProtocol.isPresent(), "metadata and protocol information is needed to process the add log entries"); } Stream resultStream = Stream.empty(); @@ -259,47 +250,6 @@ private Iterator getCheckpointTransactionLogEntrie domainCompactionThreshold); } - private MetadataAndProtocolEntry getCheckpointMetadataAndProtocolEntries( - ConnectorSession session, - CheckpointSchemaManager checkpointSchemaManager, - TypeManager typeManager, - TrinoFileSystem fileSystem, - FileFormatDataSourceStats stats, - LastCheckpoint checkpoint) - throws IOException - { - MetadataEntry metadata = null; - ProtocolEntry protocol = null; - for (Location checkpointPath : getCheckpointPartPaths(checkpoint)) { - TrinoInputFile checkpointFile = 
fileSystem.newInputFile(checkpointPath); - Iterator entries = getCheckpointTransactionLogEntries( - session, - ImmutableSet.of(METADATA, PROTOCOL), - Optional.empty(), - Optional.empty(), - checkpointSchemaManager, - typeManager, - stats, - checkpoint, - checkpointFile); - while (entries.hasNext()) { - DeltaLakeTransactionLogEntry entry = entries.next(); - if (metadata == null && entry.getMetaData() != null) { - metadata = entry.getMetaData(); - } - if (protocol == null && entry.getProtocol() != null) { - protocol = entry.getProtocol(); - } - if (metadata != null && protocol != null) { - // No need to read next checkpoint parts if requested info already found - return new MetadataAndProtocolEntry(metadata, protocol); - } - } - } - - throw new TrinoException(DELTA_LAKE_BAD_DATA, "Checkpoint found without metadata and protocol entry: " + checkpoint); - } - public record MetadataAndProtocolEntry(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) { public MetadataAndProtocolEntry diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java index 1aa70f2c3792..44ebc5d96aed 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriterManager.java @@ -22,9 +22,11 @@ import io.trino.filesystem.TrinoOutputFile; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; +import io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.NodeVersion; 
+import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.type.TypeManager; @@ -32,11 +34,12 @@ import java.io.IOException; import java.io.OutputStream; import java.io.UncheckedIOException; +import java.util.List; import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.MoreCollectors.toOptional; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.LAST_CHECKPOINT_FILENAME; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; @@ -92,17 +95,19 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) CheckpointBuilder checkpointBuilder = new CheckpointBuilder(); TrinoFileSystem fileSystem = fileSystemFactory.create(session); - Optional checkpointMetadataLogEntry = snapshot + List checkpointLogEntries = snapshot .getCheckpointTransactionLogEntries( session, - ImmutableSet.of(METADATA), + ImmutableSet.of(METADATA, PROTOCOL), checkpointSchemaManager, typeManager, fileSystem, fileFormatDataSourceStats, Optional.empty()) - .collect(toOptional()); - if (checkpointMetadataLogEntry.isPresent()) { + .filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null) + .collect(toImmutableList()); + + if (!checkpointLogEntries.isEmpty()) { // TODO HACK: this call is required only to ensure that cachedMetadataEntry is set in snapshot (https://github.com/trinodb/trino/issues/12032), // so we can read add entries below this should be reworked so we pass metadata entry explicitly to 
getCheckpointTransactionLogEntries, // and we should get rid of `setCachedMetadata` in TableSnapshot to make it immutable. @@ -110,18 +115,27 @@ public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) transactionLogAccess.getMetadataEntry(snapshot, session); // register metadata entry in writer - checkState(checkpointMetadataLogEntry.get().getMetaData() != null, "metaData not present in log entry"); - checkpointBuilder.addLogEntry(checkpointMetadataLogEntry.get()); + DeltaLakeTransactionLogEntry metadataLogEntry = checkpointLogEntries.stream() + .filter(logEntry -> logEntry.getMetaData() != null) + .findFirst() + .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + snapshot.getTable())); + DeltaLakeTransactionLogEntry protocolLogEntry = checkpointLogEntries.stream() + .filter(logEntry -> logEntry.getProtocol() != null) + .findFirst() + .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + snapshot.getTable())); + + checkpointBuilder.addLogEntry(metadataLogEntry); + checkpointBuilder.addLogEntry(protocolLogEntry); // read remaining entries from checkpoint register them in writer snapshot.getCheckpointTransactionLogEntries( session, - ImmutableSet.of(PROTOCOL, TRANSACTION, ADD, REMOVE, COMMIT), + ImmutableSet.of(TRANSACTION, ADD, REMOVE, COMMIT), checkpointSchemaManager, typeManager, fileSystem, fileFormatDataSourceStats, - Optional.empty()) + Optional.of(new MetadataAndProtocolEntry(metadataLogEntry.getMetaData(), protocolLogEntry.getProtocol()))) .forEach(checkpointBuilder::addLogEntry); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index b474ae24a426..3b6186fe7654 100644 --- 
a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -14,8 +14,6 @@ package io.trino.plugin.deltalake.transactionlog; import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multiset; @@ -24,11 +22,14 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.parquet.ParquetReaderOptions; +import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TypeManager; +import io.trino.testing.TestingConnectorContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -43,6 +44,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM; +import static io.trino.plugin.deltalake.transactionlog.TableSnapshot.MetadataAndProtocolEntry; +import static io.trino.plugin.deltalake.transactionlog.TableSnapshot.load; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.ADD; import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.PROTOCOL; @@ -88,7 +91,7 @@ 
public void testOnlyReadsTrailingJsonFiles() assertFileSystemAccesses( () -> { Optional lastCheckpoint = readLastCheckpoint(trackingFileSystem, tableLocation); - tableSnapshot.set(TableSnapshot.load( + tableSnapshot.set(load( new SchemaTableName("schema", "person"), lastCheckpoint, trackingFileSystem, @@ -118,7 +121,7 @@ public void readsCheckpointFile() throws IOException { Optional lastCheckpoint = readLastCheckpoint(trackingFileSystem, tableLocation); - TableSnapshot tableSnapshot = TableSnapshot.load( + TableSnapshot tableSnapshot = load( new SchemaTableName("schema", "person"), lastCheckpoint, trackingFileSystem, @@ -126,9 +129,20 @@ public void readsCheckpointFile() parquetReaderOptions, true, domainCompactionThreshold); - tableSnapshot.setCachedMetadata(Optional.of(new MetadataEntry("id", "name", "description", null, "schema", ImmutableList.of(), ImmutableMap.of(), 0))); + TestingConnectorContext context = new TestingConnectorContext(); + TypeManager typeManager = context.getTypeManager(); + TransactionLogAccess transactionLogAccess = new TransactionLogAccess( + typeManager, + new CheckpointSchemaManager(typeManager), + new DeltaLakeConfig(), + new FileFormatDataSourceStats(), + trackingFileSystemFactory, + new ParquetReaderConfig()); + MetadataEntry metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, SESSION); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(SESSION, tableSnapshot); + tableSnapshot.setCachedMetadata(Optional.of(metadataEntry)); try (Stream stream = tableSnapshot.getCheckpointTransactionLogEntries( - SESSION, ImmutableSet.of(ADD), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.empty())) { + SESSION, ImmutableSet.of(ADD), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) { List entries = stream.collect(toImmutableList()); 
assertThat(entries).hasSize(9); @@ -170,7 +184,7 @@ public void readsCheckpointFile() // lets read two entry types in one call; add and protocol try (Stream stream = tableSnapshot.getCheckpointTransactionLogEntries( - SESSION, ImmutableSet.of(ADD, PROTOCOL), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.empty())) { + SESSION, ImmutableSet.of(ADD, PROTOCOL), checkpointSchemaManager, TESTING_TYPE_MANAGER, trackingFileSystem, new FileFormatDataSourceStats(), Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)))) { List entries = stream.collect(toImmutableList()); assertThat(entries).hasSize(10); @@ -218,7 +232,7 @@ public void testMaxTransactionId() throws IOException { Optional lastCheckpoint = readLastCheckpoint(trackingFileSystem, tableLocation); - TableSnapshot tableSnapshot = TableSnapshot.load( + TableSnapshot tableSnapshot = load( new SchemaTableName("schema", "person"), lastCheckpoint, trackingFileSystem,