1313 */
1414package io .trino .plugin .deltalake .transactionlog .reader ;
1515
16- import com .google .common .collect .ImmutableList ;
1716import io .airlift .units .DataSize ;
18- import io .trino .filesystem .TrinoFileSystem ;
1917import io .trino .filesystem .TrinoFileSystemFactory ;
20- import io .trino .plugin .deltalake .transactionlog .MissingTransactionLogException ;
21- import io .trino .plugin .deltalake .transactionlog .Transaction ;
22- import io .trino .plugin .deltalake .transactionlog .TransactionLogEntries ;
2318import io .trino .plugin .deltalake .transactionlog .checkpoint .TransactionLogTail ;
2419import io .trino .spi .connector .ConnectorSession ;
2520
2621import java .io .IOException ;
2722import java .util .Optional ;
2823
29- import static com .google .common .base .Preconditions .checkArgument ;
30- import static io .trino .plugin .deltalake .transactionlog .TransactionLogUtil .getTransactionLogDir ;
31- import static io .trino .plugin .deltalake .transactionlog .TransactionLogUtil .getTransactionLogJsonEntryPath ;
32- import static io .trino .plugin .deltalake .transactionlog .checkpoint .TransactionLogTail .getEntriesFromJson ;
3324import static java .util .Objects .requireNonNull ;
3425
3526public class FileSystemTransactionLogReader
@@ -52,41 +43,6 @@ public TransactionLogTail loadNewTail(
5243 DataSize transactionLogMaxCachedFileSize )
5344 throws IOException
5445 {
55- ImmutableList .Builder <Transaction > entriesBuilder = ImmutableList .builder ();
56-
57- if (startVersion .isPresent () && endVersion .isPresent () && startVersion .get ().equals (endVersion .get ())) {
58- // This is time travel to a specific checkpoint. No need to read transaction log files.
59- return new TransactionLogTail (entriesBuilder .build (), startVersion .get ());
60- }
61-
62- // TODO: check if we should use startVersion or endVersion, in the case that startVersion is not present this could returns empty entries which is not correct
63- long version = startVersion .orElse (0L );
64- long entryNumber = startVersion .map (start -> start + 1 ).orElse (0L );
65- checkArgument (endVersion .isEmpty () || entryNumber <= endVersion .get (), "Invalid start/end versions: %s, %s" , startVersion , endVersion );
66-
67- String transactionLogDir = getTransactionLogDir (tableLocation );
68- TrinoFileSystem fileSystem = fileSystemFactory .create (session );
69-
70- boolean endOfTail = false ;
71- while (!endOfTail ) {
72- Optional <TransactionLogEntries > results = getEntriesFromJson (entryNumber , transactionLogDir , fileSystem , transactionLogMaxCachedFileSize );
73- if (results .isPresent ()) {
74- entriesBuilder .add (new Transaction (entryNumber , results .get ()));
75- version = entryNumber ;
76- entryNumber ++;
77- }
78- else {
79- if (endVersion .isPresent ()) {
80- throw new MissingTransactionLogException (getTransactionLogJsonEntryPath (transactionLogDir , entryNumber ).toString ());
81- }
82- endOfTail = true ;
83- }
84-
85- if (endVersion .isPresent () && version == endVersion .get ()) {
86- endOfTail = true ;
87- }
88- }
89-
90- return new TransactionLogTail (entriesBuilder .build (), version );
46+ return TransactionLogTail .loadNewTail (fileSystemFactory .create (session ), tableLocation , startVersion , endVersion , transactionLogMaxCachedFileSize );
9147 }
9248}
0 commit comments