Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,35 @@
*/
package io.trino.plugin.iceberg;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.connector.MockConnectorFactory;
import io.trino.connector.MockConnectorPlugin;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.Page;
import io.trino.spi.QueryId;
import io.trino.spi.SplitWeight;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.table.AbstractConnectorTableFunction;
import io.trino.spi.function.table.Argument;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.function.table.Descriptor;
import io.trino.spi.function.table.TableFunctionAnalysis;
import io.trino.spi.function.table.TableFunctionProcessorProvider;
import io.trino.spi.function.table.TableFunctionProcessorState;
import io.trino.spi.function.table.TableFunctionSplitProcessor;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.sql.tree.ExplainType;
import io.trino.testing.AbstractTestQueryFramework;
Expand All @@ -31,16 +53,23 @@
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.SystemSessionProperties.LEGACY_MATERIALIZED_VIEW_GRACE_PERIOD;
import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
import static io.trino.spi.function.table.TableFunctionProcessorState.Processed.produced;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_MATERIALIZED_VIEW;
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.REFRESH_MATERIALIZED_VIEW;
Expand All @@ -51,11 +80,7 @@
import static org.assertj.core.api.Assertions.anyOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;

@TestInstance(PER_CLASS)
@Execution(CONCURRENT)
public abstract class BaseIcebergMaterializedViewTest
extends AbstractTestQueryFramework
{
Expand All @@ -72,6 +97,29 @@ public void setUp()

assertUpdate("CREATE TABLE base_table2 (_varchar VARCHAR, _bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_bigint', '_date'])");
assertUpdate("INSERT INTO base_table2 VALUES ('a', 0, DATE '2019-09-08'), ('a', 1, DATE '2019-09-08'), ('a', 0, DATE '2019-09-09')", 3);

QueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder()
.withTableFunctions(ImmutableSet.of(new SequenceTableFunction()))
.withFunctionProvider(Optional.of(new FunctionProvider()
{
@Override
public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle)
{
if (functionHandle instanceof SequenceTableFunctionHandle) {
return new SequenceTableFunctionProcessorProvider();
}
throw new IllegalArgumentException("This ConnectorTableFunctionHandle is not supported");
}
}))
.withTableFunctionSplitSources(functionHandle -> {
if (functionHandle instanceof SequenceTableFunctionHandle) {
return new FixedSplitSource(ImmutableList.of(new SequenceConnectorSplit()));
}
throw new IllegalArgumentException("This ConnectorTableFunctionHandle is not supported");
})
.build()));
queryRunner.createCatalog("mock", "mock");
}

@Test
Expand Down Expand Up @@ -103,7 +151,6 @@ public void testCommentColumnMaterializedView()
@Test
public void testMaterializedViewsMetadata()
{
String schemaName = getSession().getSchema().orElseThrow();
String materializedViewName = "test_materialized_view_" + randomNameSuffix();

computeActual("CREATE TABLE small_region AS SELECT * FROM tpch.tiny.region LIMIT 1");
Expand All @@ -112,14 +159,14 @@ public void testMaterializedViewsMetadata()
// test freshness update
assertQuery(
// TODO (https://github.com/trinodb/trino/issues/9039) remove redundant schema_name filter
format("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = '%s' AND schema_name = '%s' AND name = '%s'", ICEBERG_CATALOG, schemaName, materializedViewName),
format("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '%s'", materializedViewName),
"VALUES 'STALE'");

computeActual(format("REFRESH MATERIALIZED VIEW %s", materializedViewName));

assertQuery(
// TODO (https://github.com/trinodb/trino/issues/9039) remove redundant schema_name filter
format("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = '%s' AND schema_name = '%s' AND name = '%s'", ICEBERG_CATALOG, schemaName, materializedViewName),
format("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '%s'", materializedViewName),
"VALUES 'FRESH'");

assertUpdate("DROP TABLE small_region");
Expand Down Expand Up @@ -629,7 +676,7 @@ private void testBucketPartitioning(String dataType, String exampleValue)
assertUpdate("CREATE MATERIALIZED VIEW test_bucket_partitioning WITH (partitioning=ARRAY['bucket(col, 4)']) AS SELECT * FROM (VALUES CAST(NULL AS %s), %s) t(col)"
.formatted(dataType, exampleValue));
try {
TableMetadata storageMetadata = getStorageTableMetadata( "test_bucket_partitioning");
TableMetadata storageMetadata = getStorageTableMetadata("test_bucket_partitioning");
assertThat(storageMetadata.spec().fields()).hasSize(1);
PartitionField bucketPartitionField = getOnlyElement(storageMetadata.spec().fields());
assertThat(bucketPartitionField.name()).isEqualTo("col_bucket");
Expand Down Expand Up @@ -793,15 +840,119 @@ public void testDropLegacyMaterializedView()
assertUpdate(format("INSERT INTO %s VALUES (10, 10), (10, 40), (20, 20)", sourceTableName), 3);
assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 6);

assertQuery("SELECT * FROM " + materializedViewName, "VALUES (1, 1), (1, 4), (2, 2), (10, 10), (10, 40), (20, 20)");
String storageTableName = (String) computeScalar("SELECT storage_table FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + materializedViewName + "'");
assertThat(storageTableName)
.isEqualTo(computeScalar("SELECT storage_table FROM system.metadata.materialized_views WHERE catalog_name = 'iceberg_legacy_mv' AND schema_name = CURRENT_SCHEMA AND name = '" + materializedViewName + "'"))
.startsWith("st_");

assertThat(query("TABLE " + materializedViewName)).matches("TABLE " + sourceTableName);
assertThat(query("TABLE " + storageTableName)).matches("TABLE " + sourceTableName);
assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
assertThat(query("TABLE " + materializedViewName)).failure().hasMessageMatching(".* does not exist");
assertThat(query("TABLE " + storageTableName)).failure().hasMessageMatching(".* does not exist");
}
finally {
assertUpdate("DROP TABLE " + sourceTableName);
assertUpdate(format("DROP MATERIALIZED VIEW IF EXISTS iceberg_legacy_mv.%s.%s", schemaName, materializedViewName));
}
}

@Test
public void testMaterializedViewCreatedFromTableFunction()
{
String viewName = "materialized_view_for_ptf_" + randomNameSuffix();
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM TABLE(mock.system.sequence_function())");

assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'").getOnlyValue()).isNull();
int result1 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();

int result2 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result2).isNotEqualTo(result1); // differs because PTF sequence_function is called directly as mv is considered stale
assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'").getOnlyValue()).isNull();

assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
assertFreshness(viewName, "UNKNOWN");
ZonedDateTime lastFreshTime = (ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'").getOnlyValue();
assertThat(lastFreshTime).isNotNull();
int result3 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result3).isNotEqualTo(result2); // mv is not stale anymore so all selects until next refresh returns same result
int result4 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
int result5 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result4).isEqualTo(result3);
assertThat(result4).isEqualTo(result5);

assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
assertThat((ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'").getOnlyValue()).isAfter(lastFreshTime);
assertFreshness(viewName, "UNKNOWN");
int result6 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result6).isNotEqualTo(result5);
}

@Test
public void testMaterializedViewCreatedFromTableFunctionAndTable()
{
String sourceTableName = "source_table_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + sourceTableName + " (VALUE INTEGER)");
assertUpdate("INSERT INTO " + sourceTableName + " VALUES 2", 1);
String viewName = "materialized_view_for_ptf_adn_table_" + randomNameSuffix();
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM TABLE(mock.system.sequence_function()) CROSS JOIN " + sourceTableName);

List<MaterializedRow> materializedRows = computeActual("SELECT * FROM " + viewName).getMaterializedRows();
assertThat(materializedRows.size()).isEqualTo(1);
assertThat(materializedRows.get(0).getField(1)).isEqualTo(2);
int valueFromPtf1 = (int) materializedRows.get(0).getField(0);
assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'").getOnlyValue()).isNull();

materializedRows = computeActual("SELECT * FROM " + viewName).getMaterializedRows();
assertThat(materializedRows.size()).isEqualTo(1);
assertThat(materializedRows.get(0).getField(1)).isEqualTo(2);
int valueFromPtf2 = (int) materializedRows.get(0).getField(0);
assertThat(valueFromPtf2).isNotEqualTo(valueFromPtf1); // differs because PTF sequence_function is called directly as mv is considered stale
assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'").getOnlyValue()).isNull();

assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
assertFreshness(viewName, "UNKNOWN");
ZonedDateTime lastFreshTime = (ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'").getOnlyValue();
assertThat(lastFreshTime).isNotNull();
materializedRows = computeActual("SELECT * FROM " + viewName).getMaterializedRows();
assertThat(materializedRows.size()).isEqualTo(1);
assertThat(materializedRows.get(0).getField(1)).isEqualTo(2);
int valueFromPtf3 = (int) materializedRows.get(0).getField(0);
assertThat(valueFromPtf3).isNotEqualTo(valueFromPtf1);
assertThat(valueFromPtf3).isNotEqualTo(valueFromPtf2);

materializedRows = computeActual("SELECT * FROM " + viewName).getMaterializedRows();
assertThat(materializedRows.size()).isEqualTo(1);
assertThat(materializedRows.get(0).getField(1)).isEqualTo(2);
int valueFromPtf4 = (int) materializedRows.get(0).getField(0);
assertThat(valueFromPtf4).isNotEqualTo(valueFromPtf1);
assertThat(valueFromPtf4).isNotEqualTo(valueFromPtf2);
assertThat(valueFromPtf4).isEqualTo(valueFromPtf3); // mv is not stale anymore so all selects until next refresh returns same result
}

@Test
public void testMaterializedViewCreatedFromTableFunctionWithGracePeriod()
throws InterruptedException
{
String viewName = "materialized_view_for_ptf_with_grace_period_" + randomNameSuffix();
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " GRACE PERIOD INTERVAL '1' SECOND AS SELECT * FROM TABLE(mock.system.sequence_function())");

int result1 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
int result2 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result2).isNotEqualTo(result1);

assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
int result3 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result3).isNotEqualTo(result2);
Thread.sleep(1001);
int result4 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result4).isNotEqualTo(result3);
}

protected String getColumnComment(String tableName, String columnName)
{
return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'");
Expand All @@ -821,4 +972,95 @@ private long getLatestSnapshotId(String tableName)
{
return (long) computeScalar(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName));
}

private void assertFreshness(String viewName, String expected)
{
assertThat((String) computeScalar("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA AND name = '" + viewName + "'")).isEqualTo(expected);
}

public static class SequenceTableFunction
extends AbstractConnectorTableFunction
{
public SequenceTableFunction()
{
super("system", "sequence_function", List.of(), GENERIC_TABLE);
}

@Override
public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map<String, Argument> arguments, ConnectorAccessControl accessControl)
{
return TableFunctionAnalysis.builder()
.handle(new SequenceTableFunctionHandle())
.returnedType(new Descriptor(ImmutableList.of(new Descriptor.Field("next_value", Optional.of(INTEGER)))))
.build();
}
}

public static class SequenceTableFunctionHandle
implements ConnectorTableFunctionHandle {}

public static class SequenceTableFunctionProcessorProvider
implements TableFunctionProcessorProvider
{
private final SequenceFunctionProcessor sequenceFunctionProcessor = new SequenceFunctionProcessor();

@Override
public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split)
{
sequenceFunctionProcessor.reset();
return sequenceFunctionProcessor;
}
}

public static class SequenceFunctionProcessor
implements TableFunctionSplitProcessor
{
private static final AtomicInteger generator = new AtomicInteger(10);
private final AtomicBoolean finished = new AtomicBoolean(false);

@Override
public TableFunctionProcessorState process()
{
if (finished.get()) {
return FINISHED;
}
BlockBuilder builder = INTEGER.createBlockBuilder(null, 1);
INTEGER.writeInt(builder, generator.getAndIncrement());
finished.set(true);
return produced(new Page(builder.build()));
}

public void reset()
{
finished.set(false);
}
}

public record SequenceConnectorSplit()
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = instanceSize(SequenceConnectorSplit.class);

@JsonIgnore
@Override
public Object getInfo()
{
return ImmutableMap.builder()
.put("ignored", "ignored")
.buildOrThrow();
}

@JsonIgnore
@Override
public SplitWeight getSplitWeight()
{
return SplitWeight.standard();
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE;
}
}
}
Loading