@@ -96,7 +96,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
@Override
public boolean isFinished()
{
- return changelogScanIterator != null && !changelogScanIterator.hasNext();
+ return changelogScanIterator != null && !changelogScanIterator.hasNext() && !fileTasksIterator.hasNext();
}

@Override
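A minimal annotated sketch of the completion check after this change, assuming only the two iterator fields visible in this hunk (presumably the remaining changelog scan tasks and the file tasks buffered from already-consumed tasks):

@Override
public boolean isFinished()
{
    // Report completion only once the changelog scan has been started (the iterator exists),
    // every changelog task has been consumed, and the buffered file tasks have all been
    // turned into splits; the multi-row-group test added below exercises the last condition.
    return changelogScanIterator != null
            && !changelogScanIterator.hasNext()
            && !fileTasksIterator.hasNext();
}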
@@ -138,12 +138,9 @@ private static boolean checkOrcFileSorting(Supplier<OrcDataSource> dataSourceSup
@SuppressWarnings({"unchecked", "rawtypes"})
public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName)
{
- ParquetMetadata parquetMetadata;
+ ParquetMetadata parquetMetadata = getParquetFileMetadata(inputFile);
List<BlockMetadata> blocks;
try {
- parquetMetadata = MetadataReader.readFooter(
- new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
- Optional.empty());
blocks = parquetMetadata.getBlocks();
}
catch (IOException e) {
@@ -216,4 +213,16 @@ public static Map<String, Long> getMetadataFileAndUpdatedMillis(TrinoFileSystem
}
return metadataFiles;
}

public static ParquetMetadata getParquetFileMetadata(TrinoInputFile inputFile)
{
try {
return MetadataReader.readFooter(
new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
Optional.empty());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
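The extracted getParquetFileMetadata helper reads the Parquet footer and rethrows any IOException as an UncheckedIOException, so callers need no checked-exception handling. A minimal usage sketch, mirroring the new test below and assuming a TrinoFileSystem named fileSystem plus a String filePath pointing at a Parquet data file are in scope:

// Read the data file's footer and count its row groups
// (the test below compares this count with the scheduler's split batch size).
ParquetMetadata parquetMetadata = getParquetFileMetadata(fileSystem.newInputFile(Location.of(filePath)));
int rowGroupCount = parquetMetadata.getBlocks().size();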
@@ -13,24 +13,32 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.Iterables;
import io.trino.Session;
import io.trino.execution.QueryManagerConfig;
import io.trino.filesystem.Location;
import io.trino.operator.OperatorStats;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.QueryRunner.MaterializedResultWithPlan;
import io.trino.testing.sql.TestTable;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
import static io.trino.plugin.iceberg.IcebergTestUtils.getParquetFileMetadata;
import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static org.assertj.core.api.Assertions.assertThat;

public class TestIcebergParquetConnectorTest
@@ -146,6 +154,56 @@ public void testPushdownPredicateToParquetAfterColumnRename()
}
}

@Test
void testTableChangesOnMultiRowGroups()
throws Exception
{
try (TestTable table = newTrinoTable(
"test_table_changes_function_multi_row_groups_",
"AS SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem WITH NO DATA")) {
long initialSnapshot = getMostRecentSnapshotId(table.getName());
assertUpdate(
withSmallRowGroups(getSession()),
"INSERT INTO %s SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem".formatted(table.getName()),
60175L);
long snapshotAfterInsert = getMostRecentSnapshotId(table.getName());
DateTimeFormatter instantMillisFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSVV").withZone(UTC);
String snapshotAfterInsertTime = getSnapshotTime(table.getName(), snapshotAfterInsert).format(instantMillisFormatter);

// make sure splits are processed in more than one batch
// Decrease the Parquet row group size or add more columns if this test fails
String filePath = getOnlyTableFilePath(table.getName());
ParquetMetadata parquetMetadata = getParquetFileMetadata(fileSystem.newInputFile(Location.of(filePath)));
int blocksSize = parquetMetadata.getBlocks().size();
int splitBatchSize = new QueryManagerConfig().getScheduleSplitBatchSize();
assertThat(blocksSize > splitBatchSize && blocksSize % splitBatchSize != 0).isTrue();

assertQuery(
"""
SELECT orderkey, partkey, suppkey, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal
FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))
""".formatted(table.getName(), initialSnapshot, snapshotAfterInsert),
"SELECT orderkey, partkey, suppkey, 'insert', %s, '%s', 0 FROM lineitem".formatted(snapshotAfterInsert, snapshotAfterInsertTime));
}
}

private String getOnlyTableFilePath(String tableName)
{
return (String) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumnAsSet());
}

private long getMostRecentSnapshotId(String tableName)
{
return (long) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName))
.getOnlyColumnAsSet());
}

private ZonedDateTime getSnapshotTime(String tableName, long snapshotId)
{
return (ZonedDateTime) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id = %s", tableName, snapshotId))
.getOnlyColumnAsSet());
}

@Override
protected boolean isFileSorted(String path, String sortColumnName)
{