diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java index 477ae948dde7..f22c0de90305 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java @@ -53,6 +53,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -209,9 +210,9 @@ public void close() public static class ArbitraryDistributionTaskSource implements TaskSource { - private final Map sourceFragmentToRemoteSourceNodeIdMap; - private final Map sourceExchanges; - private final Multimap exchangeSourceHandles; + private final IdentityHashMap sourceExchanges; + private final Multimap partitionedExchangeSourceHandles; + private final Multimap replicatedExchangeSourceHandles; private final long targetPartitionSizeInBytes; private boolean finished; @@ -223,33 +224,33 @@ public static ArbitraryDistributionTaskSource create( DataSize targetPartitionSize) { checkArgument(fragment.getPartitionedSources().isEmpty(), "no partitioned sources (table scans) expected, got: %s", fragment.getPartitionedSources()); - checkArgument(fragment.getRemoteSourceNodes().stream().noneMatch(node -> node.getExchangeType() == REPLICATE), "replicated exchanges are not expected in source distributed stage, got: %s", fragment.getRemoteSourceNodes()); + IdentityHashMap exchangeForHandleMap = getExchangeForHandleMap(sourceExchanges, exchangeSourceHandles); return new ArbitraryDistributionTaskSource( - getSourceFragmentToRemoteSourceNodeIdMap(fragment.getRemoteSourceNodes()), - sourceExchanges, - exchangeSourceHandles, + exchangeForHandleMap, + getPartitionedExchangeSourceHandles(fragment, exchangeSourceHandles), + getReplicatedExchangeSourceHandles(fragment, exchangeSourceHandles), targetPartitionSize); } public ArbitraryDistributionTaskSource( - Map sourceFragmentToRemoteSourceNodeIdMap, - Map sourceExchanges, - Multimap exchangeSourceHandles, + IdentityHashMap sourceExchanges, + Multimap partitionedExchangeSourceHandles, + Multimap replicatedExchangeSourceHandles, DataSize targetPartitionSize) { - this.sourceFragmentToRemoteSourceNodeIdMap = ImmutableMap.copyOf(requireNonNull(sourceFragmentToRemoteSourceNodeIdMap, "sourceFragmentToRemoteSourceNodeIdMap is null")); - this.sourceExchanges = ImmutableMap.copyOf(requireNonNull(sourceExchanges, "sourceExchanges is null")); + this.sourceExchanges = new IdentityHashMap<>(requireNonNull(sourceExchanges, "sourceExchanges is null")); + this.partitionedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(partitionedExchangeSourceHandles, "partitionedExchangeSourceHandles is null")); + this.replicatedExchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(replicatedExchangeSourceHandles, "replicatedExchangeSourceHandles is null")); checkArgument( - sourceFragmentToRemoteSourceNodeIdMap.keySet().equals(sourceExchanges.keySet()), - "sourceFragmentToRemoteSourceNodeIdMap and sourceExchanges are expected to have the same set of keys: %s != %s", - sourceFragmentToRemoteSourceNodeIdMap.keySet(), + sourceExchanges.keySet().containsAll(partitionedExchangeSourceHandles.values()), + "Unexpected entries in partitionedExchangeSourceHandles map: %s; allowed keys: %s", + partitionedExchangeSourceHandles.values(), sourceExchanges.keySet()); - this.exchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null")); checkArgument( - sourceExchanges.keySet().containsAll(exchangeSourceHandles.keySet()), - "Unexpected keys in exchangeSourceHandles map: %s; allowed keys: %s", - exchangeSourceHandles.keySet(), + sourceExchanges.keySet().containsAll(replicatedExchangeSourceHandles.values()), + "Unexpected entries in replicatedExchangeSourceHandles map: %s; allowed keys: %s", + replicatedExchangeSourceHandles.values(), sourceExchanges.keySet()); this.targetPartitionSizeInBytes = requireNonNull(targetPartitionSize, "targetPartitionSize is null").toBytes(); } @@ -266,11 +267,10 @@ public List getMoreTasks() long assignedExchangeDataSize = 0; int assignedExchangeSourceHandleCount = 0; - for (Map.Entry entry : exchangeSourceHandles.entries()) { - PlanFragmentId sourceFragmentId = entry.getKey(); - PlanNodeId remoteSourcePlanNodeId = sourceFragmentToRemoteSourceNodeIdMap.get(sourceFragmentId); + for (Map.Entry entry : partitionedExchangeSourceHandles.entries()) { + PlanNodeId remoteSourcePlanNodeId = entry.getKey(); ExchangeSourceHandle originalExchangeSourceHandle = entry.getValue(); - Exchange sourceExchange = sourceExchanges.get(sourceFragmentId); + Exchange sourceExchange = sourceExchanges.get(originalExchangeSourceHandle); ExchangeSourceSplitter splitter = sourceExchange.split(originalExchangeSourceHandle, targetPartitionSizeInBytes); ImmutableList.Builder sourceHandles = ImmutableList.builder(); @@ -286,6 +286,7 @@ public List getMoreTasks() for (ExchangeSourceHandle handle : sourceHandles.build()) { ExchangeSourceStatistics statistics = sourceExchange.getExchangeSourceStatistics(handle); if (assignedExchangeDataSize != 0 && assignedExchangeDataSize + statistics.getSizeInBytes() > targetPartitionSizeInBytes) { + assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles); result.add(new TaskDescriptor(currentPartitionId++, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements)); assignedExchangeSourceHandles = ImmutableListMultimap.builder(); assignedExchangeDataSize = 0; @@ -299,6 +300,7 @@ public List getMoreTasks() } if (assignedExchangeSourceHandleCount > 0) { + assignedExchangeSourceHandles.putAll(replicatedExchangeSourceHandles); result.add(new TaskDescriptor(currentPartitionId, ImmutableListMultimap.of(), assignedExchangeSourceHandles.build(), nodeRequirements)); } @@ -704,6 +706,21 @@ public void close() } } + private static IdentityHashMap getExchangeForHandleMap( + Map sourceExchanges, + Multimap exchangeSourceHandles) + { + IdentityHashMap exchangeForHandle = new IdentityHashMap<>(); + for (Map.Entry entry : exchangeSourceHandles.entries()) { + PlanFragmentId fragmentId = entry.getKey(); + ExchangeSourceHandle handle = entry.getValue(); + Exchange exchange = sourceExchanges.get(fragmentId); + requireNonNull(exchange, "Exchange not found for fragment " + fragmentId); + exchangeForHandle.put(handle, exchange); + } + return exchangeForHandle; + } + private static ListMultimap getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap handles) { return getInputsForRemoteSources( diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java index 84cb0e72f7f5..8c95ff66548e 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java @@ -42,12 +42,12 @@ import io.trino.spi.exchange.ExchangeManager; import io.trino.spi.exchange.ExchangeSourceHandle; import io.trino.split.SplitSource; -import io.trino.sql.planner.plan.PlanFragmentId; import io.trino.sql.planner.plan.PlanNodeId; import org.openjdk.jol.info.ClassLayout; import org.testng.annotations.Test; import java.util.Arrays; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -71,8 +71,6 @@ public class TestStageTaskSourceFactory { - private static final PlanFragmentId FRAGMENT_ID_1 = new PlanFragmentId("1"); - private static final PlanFragmentId FRAGMENT_ID_2 = new PlanFragmentId("2"); private static final PlanNodeId PLAN_NODE_1 = new PlanNodeId("planNode1"); private static final PlanNodeId PLAN_NODE_2 = new PlanNodeId("planNode2"); private static final PlanNodeId PLAN_NODE_3 = new PlanNodeId("planNode3"); @@ -111,18 +109,27 @@ public void testArbitraryDistributionTaskSource() ExchangeManager splittingExchangeManager = new TestingExchangeManager(true); ExchangeManager nonSplittingExchangeManager = new TestingExchangeManager(false); - TaskSource taskSource = new ArbitraryDistributionTaskSource(ImmutableMap.of(), ImmutableMap.of(), ImmutableListMultimap.of(), DataSize.of(3, BYTE)); + TaskSource taskSource = new ArbitraryDistributionTaskSource(new IdentityHashMap<>(), + ImmutableListMultimap.of(), + ImmutableListMultimap.of(), + DataSize.of(3, BYTE)); assertFalse(taskSource.isFinished()); List tasks = taskSource.getMoreTasks(); assertThat(tasks).isEmpty(); assertTrue(taskSource.isFinished()); - Multimap sources = ImmutableListMultimap.of(FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 3)); + TestingExchangeSourceHandle sourceHandle1 = new TestingExchangeSourceHandle(0, 1); + TestingExchangeSourceHandle sourceHandle2 = new TestingExchangeSourceHandle(0, 2); + TestingExchangeSourceHandle sourceHandle3 = new TestingExchangeSourceHandle(0, 3); + TestingExchangeSourceHandle sourceHandle4 = new TestingExchangeSourceHandle(0, 4); + TestingExchangeSourceHandle sourceHandle123 = new TestingExchangeSourceHandle(0, 123); + TestingExchangeSourceHandle sourceHandle321 = new TestingExchangeSourceHandle(0, 321); + Multimap nonReplicatedSources = ImmutableListMultimap.of(PLAN_NODE_1, sourceHandle3); Exchange exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3); taskSource = new ArbitraryDistributionTaskSource( - ImmutableMap.of(FRAGMENT_ID_1, PLAN_NODE_1), - ImmutableMap.of(FRAGMENT_ID_1, exchange), - sources, + new IdentityHashMap<>(ImmutableMap.of(sourceHandle3, exchange)), + nonReplicatedSources, + ImmutableListMultimap.of(), DataSize.of(3, BYTE)); tasks = taskSource.getMoreTasks(); assertTrue(taskSource.isFinished()); @@ -133,12 +140,12 @@ public void testArbitraryDistributionTaskSource() ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchangeSourceHandle(0, 3)), new NodeRequirements(Optional.empty(), ImmutableSet.of())))); - sources = ImmutableListMultimap.of(FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 123)); + nonReplicatedSources = ImmutableListMultimap.of(PLAN_NODE_1, sourceHandle123); exchange = nonSplittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3); taskSource = new ArbitraryDistributionTaskSource( - ImmutableMap.of(FRAGMENT_ID_1, PLAN_NODE_1), - ImmutableMap.of(FRAGMENT_ID_1, exchange), - sources, + new IdentityHashMap<>(ImmutableMap.of(sourceHandle123, exchange)), + nonReplicatedSources, + ImmutableListMultimap.of(), DataSize.of(3, BYTE)); tasks = taskSource.getMoreTasks(); assertEquals(tasks, ImmutableList.of(new TaskDescriptor( @@ -147,14 +154,16 @@ public void testArbitraryDistributionTaskSource() ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchangeSourceHandle(0, 123)), new NodeRequirements(Optional.empty(), ImmutableSet.of())))); - sources = ImmutableListMultimap.of( - FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 123), - FRAGMENT_ID_2, new TestingExchangeSourceHandle(0, 321)); + nonReplicatedSources = ImmutableListMultimap.of( + PLAN_NODE_1, sourceHandle123, + PLAN_NODE_2, sourceHandle321); exchange = nonSplittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3); taskSource = new ArbitraryDistributionTaskSource( - ImmutableMap.of(FRAGMENT_ID_1, PLAN_NODE_1, FRAGMENT_ID_2, PLAN_NODE_2), - ImmutableMap.of(FRAGMENT_ID_1, exchange, FRAGMENT_ID_2, exchange), - sources, + new IdentityHashMap<>(ImmutableMap.of( + sourceHandle123, exchange, + sourceHandle321, exchange)), + nonReplicatedSources, + ImmutableListMultimap.of(), DataSize.of(3, BYTE)); tasks = taskSource.getMoreTasks(); assertEquals(tasks, ImmutableList.of( @@ -169,22 +178,27 @@ FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 123), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 321)), new NodeRequirements(Optional.empty(), ImmutableSet.of())))); - sources = ImmutableListMultimap.of( - FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 1), - FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 2), - FRAGMENT_ID_2, new TestingExchangeSourceHandle(0, 4)); + nonReplicatedSources = ImmutableListMultimap.of( + PLAN_NODE_1, sourceHandle1, + PLAN_NODE_1, sourceHandle2, + PLAN_NODE_2, sourceHandle4); exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3); taskSource = new ArbitraryDistributionTaskSource( - ImmutableMap.of(FRAGMENT_ID_1, PLAN_NODE_1, FRAGMENT_ID_2, PLAN_NODE_2), - ImmutableMap.of(FRAGMENT_ID_1, exchange, FRAGMENT_ID_2, exchange), - sources, + new IdentityHashMap<>(ImmutableMap.of( + sourceHandle1, exchange, + sourceHandle2, exchange, + sourceHandle4, exchange)), + nonReplicatedSources, + ImmutableListMultimap.of(), DataSize.of(3, BYTE)); tasks = taskSource.getMoreTasks(); assertEquals(tasks, ImmutableList.of( new TaskDescriptor( 0, ImmutableListMultimap.of(), - ImmutableListMultimap.of(PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), PLAN_NODE_1, new TestingExchangeSourceHandle(0, 2)), + ImmutableListMultimap.of( + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 2)), new NodeRequirements(Optional.empty(), ImmutableSet.of())), new TaskDescriptor( 1, @@ -197,15 +211,18 @@ FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 2), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.empty(), ImmutableSet.of())))); - sources = ImmutableListMultimap.of( - FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 1), - FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 3), - FRAGMENT_ID_2, new TestingExchangeSourceHandle(0, 4)); + nonReplicatedSources = ImmutableListMultimap.of( + PLAN_NODE_1, sourceHandle1, + PLAN_NODE_1, sourceHandle3, + PLAN_NODE_2, sourceHandle4); exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3); taskSource = new ArbitraryDistributionTaskSource( - ImmutableMap.of(FRAGMENT_ID_1, PLAN_NODE_1, FRAGMENT_ID_2, PLAN_NODE_2), - ImmutableMap.of(FRAGMENT_ID_1, exchange, FRAGMENT_ID_2, exchange), - sources, + new IdentityHashMap<>(ImmutableMap.of( + sourceHandle1, exchange, + sourceHandle3, exchange, + sourceHandle4, exchange)), + nonReplicatedSources, + ImmutableListMultimap.of(), DataSize.of(3, BYTE)); tasks = taskSource.getMoreTasks(); assertEquals(tasks, ImmutableList.of( @@ -229,6 +246,48 @@ FRAGMENT_ID_1, new TestingExchangeSourceHandle(0, 3), ImmutableListMultimap.of(), ImmutableListMultimap.of(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.empty(), ImmutableSet.of())))); + + // with replicated sources + nonReplicatedSources = ImmutableListMultimap.of( + PLAN_NODE_1, sourceHandle1, + PLAN_NODE_1, sourceHandle2, + PLAN_NODE_1, sourceHandle4); + Multimap replicatedSources = ImmutableListMultimap.of( + PLAN_NODE_2, sourceHandle321); + exchange = splittingExchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId()), 3); + taskSource = new ArbitraryDistributionTaskSource( + new IdentityHashMap<>(ImmutableMap.of( + sourceHandle1, exchange, + sourceHandle2, exchange, + sourceHandle4, exchange, + sourceHandle321, exchange)), + nonReplicatedSources, + replicatedSources, + DataSize.of(3, BYTE)); + tasks = taskSource.getMoreTasks(); + assertEquals(tasks, ImmutableList.of( + new TaskDescriptor( + 0, + ImmutableListMultimap.of(), + ImmutableListMultimap.of( + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 2), + PLAN_NODE_2, new TestingExchangeSourceHandle(0, 321)), + new NodeRequirements(Optional.empty(), ImmutableSet.of())), + new TaskDescriptor( + 1, + ImmutableListMultimap.of(), + ImmutableListMultimap.of( + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 3), + PLAN_NODE_2, sourceHandle321), + new NodeRequirements(Optional.empty(), ImmutableSet.of())), + new TaskDescriptor( + 2, + ImmutableListMultimap.of(), + ImmutableListMultimap.of( + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), + PLAN_NODE_2, sourceHandle321), + new NodeRequirements(Optional.empty(), ImmutableSet.of())))); } @Test diff --git a/plugin/trino-cassandra/pom.xml b/plugin/trino-cassandra/pom.xml index afbe5a175edd..6b3dae8c95ee 100644 --- a/plugin/trino-cassandra/pom.xml +++ b/plugin/trino-cassandra/pom.xml @@ -157,6 +157,13 @@ io.trino trino-testing test + + + + io.trino + trino-exchange + + diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java index 65edc9264e2c..66a9101f5bf4 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java @@ -20,7 +20,6 @@ import io.airlift.log.Logging; import io.trino.Session; import io.trino.metadata.QualifiedObjectName; -import io.trino.plugin.exchange.FileSystemExchangePlugin; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.MetastoreConfig; @@ -100,7 +99,6 @@ public static class Builder> { private boolean skipTimezoneSetup; private ImmutableMap.Builder hiveProperties = ImmutableMap.builder(); - private Map exchangeManagerProperties = ImmutableMap.of(); private List> initialTables = ImmutableList.of(); private Optional initialSchemasLocationBase = Optional.empty(); private Function initialTablesSessionMutator = Function.identity(); @@ -150,12 +148,6 @@ public SELF addHiveProperty(String key, String value) return self(); } - public SELF setExchangeManagerProperties(Map exchangeManagerProperties) - { - this.exchangeManagerProperties = ImmutableMap.copyOf(requireNonNull(exchangeManagerProperties, "exchangeManagerProperties is null")); - return self(); - } - public SELF setInitialTables(Iterable> initialTables) { this.initialTables = ImmutableList.copyOf(requireNonNull(initialTables, "initialTables is null")); @@ -240,11 +232,6 @@ public DistributedQueryRunner build() HiveMetastore metastore = this.metastore.apply(queryRunner); queryRunner.installPlugin(new TestingHivePlugin(metastore, module, cachingDirectoryLister)); - if (!exchangeManagerProperties.isEmpty()) { - queryRunner.installPlugin(new FileSystemExchangePlugin()); - queryRunner.loadExchangeManager("filesystem", exchangeManagerProperties); - } - Map hiveProperties = new HashMap<>(); if (!skipTimezoneSetup) { assertEquals(DateTimeZone.getDefault(), TIME_ZONE, "Timezone not configured correctly. Add -Duser.timezone=America/Bahia_Banderas to your JVM arguments"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 060219161c79..7413219511a2 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -17,7 +17,6 @@ import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; import io.airlift.log.Logging; -import io.trino.plugin.exchange.FileSystemExchangePlugin; import io.trino.plugin.tpch.TpchPlugin; import io.trino.testing.DistributedQueryRunner; import io.trino.tpch.TpchTable; @@ -91,7 +90,6 @@ public static class Builder { private Optional metastoreDirectory = Optional.empty(); private ImmutableMap.Builder icebergProperties = ImmutableMap.builder(); - private Map exchangeManagerProperties = ImmutableMap.of(); private List> initialTables = ImmutableList.of(); protected Builder() @@ -121,12 +119,6 @@ public Builder addIcebergProperty(String key, String value) return self(); } - public Builder setExchangeManagerProperties(Map exchangeManagerProperties) - { - this.exchangeManagerProperties = ImmutableMap.copyOf(requireNonNull(exchangeManagerProperties, "exchangeManagerProperties is null")); - return self(); - } - public Builder setInitialTables(Iterable> initialTables) { this.initialTables = ImmutableList.copyOf(requireNonNull(initialTables, "initialTables is null")); @@ -142,11 +134,6 @@ public DistributedQueryRunner build() queryRunner.installPlugin(new TpchPlugin()); queryRunner.createCatalog("tpch", "tpch"); - if (!exchangeManagerProperties.isEmpty()) { - queryRunner.installPlugin(new FileSystemExchangePlugin()); - queryRunner.loadExchangeManager("filesystem", exchangeManagerProperties); - } - Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data")); queryRunner.installPlugin(new IcebergPlugin()); diff --git a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java index ded365920bec..fcfcd3980531 100644 --- a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java +++ b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java @@ -36,6 +36,7 @@ import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.LimitApplicationResult; +import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SampleApplicationResult; import io.trino.spi.connector.SampleType; import io.trino.spi.connector.SchemaNotFoundException; @@ -68,6 +69,7 @@ import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.NOT_FOUND; import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; +import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.connector.SampleType.SYSTEM; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -214,12 +216,12 @@ public synchronized void renameTable(ConnectorSession session, ConnectorTableHan @Override public synchronized void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) { - ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata, Optional.empty()); + ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata, Optional.empty(), NO_RETRIES); finishCreateTable(session, outputTableHandle, ImmutableList.of(), ImmutableList.of()); } @Override - public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) + public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) { checkSchemaExists(tableMetadata.getTable().getSchemaName()); checkTableNotExists(tableMetadata.getTable()); @@ -272,7 +274,7 @@ public synchronized Optional finishCreateTable(Connecto } @Override - public synchronized MemoryInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns) + public synchronized MemoryInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns, RetryMode retryMode) { MemoryTableHandle memoryTableHandle = (MemoryTableHandle) tableHandle; return new MemoryInsertTableHandle(memoryTableHandle.getId(), ImmutableSet.copyOf(tableIds.values())); diff --git a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSinkProvider.java b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSinkProvider.java index cb737d4c9450..e5fc922e19e7 100644 --- a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSinkProvider.java +++ b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSinkProvider.java @@ -28,7 +28,9 @@ import javax.inject.Inject; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.CompletableFuture; import static com.google.common.base.Preconditions.checkState; @@ -84,7 +86,7 @@ private static class MemoryPageSink private final MemoryPagesStore pagesStore; private final HostAddress currentHostAddress; private final long tableId; - private long addedRows; + private final List appendedPages = new ArrayList<>(); public MemoryPageSink(MemoryPagesStore pagesStore, HostAddress currentHostAddress, long tableId) { @@ -96,14 +98,20 @@ public MemoryPageSink(MemoryPagesStore pagesStore, HostAddress currentHostAddres @Override public CompletableFuture appendPage(Page page) { - pagesStore.add(tableId, page); - addedRows += page.getPositionCount(); + appendedPages.add(page); return NOT_BLOCKED; } @Override public CompletableFuture> finish() { + // add pages to pagesStore + long addedRows = 0; + for (Page page : appendedPages) { + pagesStore.add(tableId, page); + addedRows += page.getPositionCount(); + } + return completedFuture(ImmutableList.of(new MemoryDataFragment(currentHostAddress, addedRows).toSlice())); } diff --git a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/MemoryQueryRunner.java b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/MemoryQueryRunner.java index 91ba89285bd4..6a64edebe101 100644 --- a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/MemoryQueryRunner.java +++ b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/MemoryQueryRunner.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.memory; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; import io.airlift.log.Logging; @@ -21,12 +22,14 @@ import io.trino.testing.DistributedQueryRunner; import io.trino.tpch.TpchTable; +import java.util.List; import java.util.Map; import static io.airlift.testing.Closeables.closeAllSuppress; import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.testing.QueryAssertions.copyTpchTables; import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.util.Objects.requireNonNull; public final class MemoryQueryRunner { @@ -39,29 +42,62 @@ public static DistributedQueryRunner createMemoryQueryRunner( Iterable> tables) throws Exception { - Session session = testSessionBuilder() - .setCatalog(CATALOG) - .setSchema("default") - .build(); - - DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session) + return builder() .setExtraProperties(extraProperties) + .setInitialTables(tables) .build(); + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + extends DistributedQueryRunner.Builder + { + private List> initialTables = ImmutableList.of(); + + protected Builder() + { + super(createSession()); + } - try { - queryRunner.installPlugin(new MemoryPlugin()); - queryRunner.createCatalog(CATALOG, "memory", ImmutableMap.of()); + public Builder setInitialTables(Iterable> initialTables) + { + this.initialTables = ImmutableList.copyOf(requireNonNull(initialTables, "initialTables is null")); + return self(); + } - queryRunner.installPlugin(new TpchPlugin()); - queryRunner.createCatalog("tpch", "tpch", ImmutableMap.of()); + @Override + public DistributedQueryRunner build() + throws Exception + { + DistributedQueryRunner queryRunner = super.build(); - copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, session, tables); + try { + queryRunner.installPlugin(new MemoryPlugin()); + queryRunner.createCatalog(CATALOG, "memory", ImmutableMap.of()); - return queryRunner; + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch", ImmutableMap.of()); + + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), initialTables); + + return queryRunner; + } + catch (Exception e) { + closeAllSuppress(e, queryRunner); + throw e; + } } - catch (Exception e) { - closeAllSuppress(e, queryRunner); - throw e; + + private static Session createSession() + { + return testSessionBuilder() + .setCatalog(CATALOG) + .setSchema("default") + .build(); } } diff --git a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryMetadata.java b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryMetadata.java index 9b52e4413157..23803a28072c 100644 --- a/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryMetadata.java +++ b/plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryMetadata.java @@ -35,6 +35,7 @@ import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.NOT_FOUND; +import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.security.PrincipalType.USER; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; @@ -70,7 +71,8 @@ public void tableIsCreatedAfterCommits() ConnectorOutputTableHandle table = metadata.beginCreateTable( SESSION, new ConnectorTableMetadata(schemaTableName, ImmutableList.of(), ImmutableMap.of()), - Optional.empty()); + Optional.empty(), + NO_RETRIES); metadata.finishCreateTable(SESSION, table, ImmutableList.of(), ImmutableList.of()); @@ -111,7 +113,7 @@ public void testActiveTableIds() MemoryTableHandle firstTableHandle = (MemoryTableHandle) metadata.getTableHandle(SESSION, firstTableName); long firstTableId = firstTableHandle.getId(); - assertTrue(metadata.beginInsert(SESSION, firstTableHandle, ImmutableList.of()).getActiveTableIds().contains(firstTableId)); + assertTrue(metadata.beginInsert(SESSION, firstTableHandle, ImmutableList.of(), NO_RETRIES).getActiveTableIds().contains(firstTableId)); SchemaTableName secondTableName = new SchemaTableName("default", "second_table"); metadata.createTable(SESSION, new ConnectorTableMetadata(secondTableName, ImmutableList.of(), ImmutableMap.of()), false); @@ -120,8 +122,8 @@ public void testActiveTableIds() long secondTableId = secondTableHandle.getId(); assertNotEquals(firstTableId, secondTableId); - assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of()).getActiveTableIds().contains(firstTableId)); - assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of()).getActiveTableIds().contains(secondTableId)); + assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of(), NO_RETRIES).getActiveTableIds().contains(firstTableId)); + assertTrue(metadata.beginInsert(SESSION, secondTableHandle, ImmutableList.of(), NO_RETRIES).getActiveTableIds().contains(secondTableId)); } @Test @@ -134,7 +136,8 @@ public void testReadTableBeforeCreationCompleted() ConnectorOutputTableHandle table = metadata.beginCreateTable( SESSION, new ConnectorTableMetadata(tableName, ImmutableList.of(), ImmutableMap.of()), - Optional.empty()); + Optional.empty(), + NO_RETRIES); List tableNames = metadata.listTables(SESSION, Optional.empty()); assertEquals(tableNames.size(), 1, "Expected exactly one table"); @@ -277,7 +280,11 @@ public void testCreateTableAndViewInNotExistSchema() assertEquals(metadata.listSchemaNames(SESSION), ImmutableList.of("default")); SchemaTableName table1 = new SchemaTableName("test1", "test_schema_table1"); - assertTrinoExceptionThrownBy(() -> metadata.beginCreateTable(SESSION, new ConnectorTableMetadata(table1, ImmutableList.of(), ImmutableMap.of()), Optional.empty())) + assertTrinoExceptionThrownBy(() -> metadata.beginCreateTable( + SESSION, + new ConnectorTableMetadata(table1, ImmutableList.of(), ImmutableMap.of()), + Optional.empty(), + NO_RETRIES)) .hasErrorCode(NOT_FOUND) .hasMessage("Schema test1 not found"); assertNull(metadata.getTableHandle(SESSION, table1)); @@ -305,7 +312,8 @@ public void testRenameTable() ConnectorOutputTableHandle table = metadata.beginCreateTable( SESSION, new ConnectorTableMetadata(tableName, ImmutableList.of(), ImmutableMap.of()), - Optional.empty()); + Optional.empty(), + NO_RETRIES); metadata.finishCreateTable(SESSION, table, ImmutableList.of(), ImmutableList.of()); // rename table to schema which does not exist diff --git a/plugin/trino-phoenix/pom.xml b/plugin/trino-phoenix/pom.xml index 8e428774ba3c..8e4c1820ab7c 100644 --- a/plugin/trino-phoenix/pom.xml +++ b/plugin/trino-phoenix/pom.xml @@ -182,6 +182,11 @@ junit junit + + + io.trino + trino-exchange + diff --git a/plugin/trino-phoenix5/pom.xml b/plugin/trino-phoenix5/pom.xml index ebdcf6864cf6..184df9ff7082 100644 --- a/plugin/trino-phoenix5/pom.xml +++ b/plugin/trino-phoenix5/pom.xml @@ -187,6 +187,11 @@ junit junit + + + io.trino + trino-exchange + diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index 1e796b4157a7..c7bc167da628 100755 --- a/plugin/trino-pinot/pom.xml +++ b/plugin/trino-pinot/pom.xml @@ -483,6 +483,13 @@ io.trino trino-testing test + + + + io.trino + trino-exchange + + diff --git a/testing/trino-testing/pom.xml b/testing/trino-testing/pom.xml index edaa27d25994..7a42993bbc6b 100644 --- a/testing/trino-testing/pom.xml +++ b/testing/trino-testing/pom.xml @@ -22,6 +22,11 @@ trino-client + + io.trino + trino-exchange + + io.trino trino-main diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java new file mode 100644 index 000000000000..9756ff0e222c --- /dev/null +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java @@ -0,0 +1,297 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.testing; + +import io.trino.Session; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import java.time.ZonedDateTime; +import java.util.regex.Pattern; + +import static io.trino.FeaturesConfig.JoinDistributionType.BROADCAST; +import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; +import static io.trino.testing.assertions.Assert.assertEventually; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public abstract class AbstractDistributedEngineOnlyQueries + extends AbstractTestEngineOnlyQueries +{ + /** + * Ensure the tests are run with {@link io.trino.testing.DistributedQueryRunner}. E.g. {@link io.trino.testing.LocalQueryRunner} takes some + * shortcuts, not exercising certain aspects. + */ + @Test + public void ensureDistributedQueryRunner() + { + assertThat(getQueryRunner().getNodeCount()).as("query runner node count") + .isGreaterThanOrEqualTo(3); + } + + @Test + public void testTimestampWithTimeZoneLiteralsWithDifferentZone() + { + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123 Europe/Warsaw'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123+01:00[Europe/Warsaw]")); + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123 Europe/Paris'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123+01:00[Europe/Paris]")); + + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456 Europe/Warsaw'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456+01:00[Europe/Warsaw]")); + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456 Europe/Paris'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456+01:00[Europe/Paris]")); + + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789 Europe/Warsaw'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456789+01:00[Europe/Warsaw]")); + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789 Europe/Paris'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456789+01:00[Europe/Paris]")); + + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789012 Europe/Warsaw'").getOnlyValue()).isEqualTo("2017-01-02 09:12:34.123456789012 Europe/Warsaw"); + assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789012 Europe/Paris'").getOnlyValue()).isEqualTo("2017-01-02 09:12:34.123456789012 Europe/Paris"); + } + + @Test + public void testUse() + { + assertQueryFails("USE invalid.xyz", "Catalog does not exist: invalid"); + assertQueryFails("USE tpch.invalid", "Schema does not exist: tpch.invalid"); + } + + @Test + public void testRoles() + { + Session invalid = Session.builder(getSession()).setCatalog("invalid").build(); + assertQueryFails(invalid, "CREATE ROLE test", "System roles are not enabled"); + assertQueryFails(invalid, "CREATE ROLE test", "System roles are not enabled"); + assertQueryFails(invalid, "DROP ROLE test", "line 1:1: Role 'test' does not exist"); + assertQueryFails(invalid, "GRANT test TO USER foo", "line 1:1: Role 'test' does not exist"); + assertQueryFails(invalid, "REVOKE test FROM USER foo", "line 1:1: Role 'test' does not exist"); + assertQueryFails(invalid, "SET ROLE test", "line 1:1: Role 'test' does not exist"); + + assertQueryFails(invalid, "CREATE ROLE test IN invalid", "line 1:1: Catalog 'invalid' does not exist"); + assertQueryFails(invalid, "DROP ROLE test IN invalid", "line 1:1: Catalog 'invalid' does not exist"); + assertQueryFails(invalid, "GRANT test TO USER foo IN invalid", "line 1:1: Catalog 'invalid' does not exist"); + assertQueryFails(invalid, "REVOKE test FROM USER foo IN invalid", "line 1:1: Catalog 'invalid' does not exist"); + assertQueryFails(invalid, "SET ROLE test IN invalid", "line 1:1: Catalog 'invalid' does not exist"); + } + + @Test + public void testDuplicatedRowCreateTable() + { + assertQueryFails("CREATE TABLE test (a integer, a integer)", + "line 1:31: Column name 'a' specified more than once"); + assertQueryFails("CREATE TABLE test (a integer, orderkey integer, LIKE orders INCLUDING PROPERTIES)", + "line 1:49: Column name 'orderkey' specified more than once"); + + assertQueryFails("CREATE TABLE test (a integer, A integer)", + "line 1:31: Column name 'A' specified more than once"); + assertQueryFails("CREATE TABLE test (a integer, OrderKey integer, LIKE orders INCLUDING PROPERTIES)", + "line 1:49: Column name 'orderkey' specified more than once"); + } + + @Test + public void testTooLongQuery() + { + // Generate a super-long query: SELECT x,x,x,x,x,... FROM (VALUES 1,2,3,4,5) t(x) + @Language("SQL") String longQuery = "SELECT x" + ",x".repeat(500_000) + " FROM (VALUES 1,2,3,4,5) t(x)"; + assertQueryFails(longQuery, "Query text length \\(1000037\\) exceeds the maximum length \\(1000000\\)"); + } + + @Test + public void testTooManyStages() + { + @Language("SQL") String query = "WITH\n" + + " t1 AS (SELECT nationkey AS x FROM nation where name='UNITED STATES'),\n" + + " t2 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t1 a, t1 b, t1 c, t1 d),\n" + + " t3 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t2 a, t2 b, t2 c, t2 d),\n" + + " t4 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t3 a, t3 b, t3 c, t3 d),\n" + + " t5 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t4 a, t4 b, t4 c, t4 d)\n" + + "SELECT x FROM t5\n"; + assertQueryFails(query, "Number of stages in the query \\([0-9]+\\) exceeds the allowed maximum \\([0-9]+\\).*"); + } + + @Test + public void testRowSubscriptWithReservedKeyword() + { + // Subscript over field named after reserved keyword. This test needs to run in distributed + // mode, as it uncovers a problem during deserialization plan expressions + assertQuery( + "SELECT cast(row(1) AS row(\"cross\" bigint))[1]", + "VALUES 1"); + } + + @Test + public void testRowTypeWithReservedKeyword() + { + // This test is here because it only reproduces the issue (https://github.com/trinodb/trino/issues/1962) + // when running in distributed mode + assertQuery( + "SELECT cast(row(1) AS row(\"cross\" bigint)).\"cross\"", + "VALUES 1"); + } + + @Test + public void testExplain() + { + assertExplain( + "explain select name from nation where abs(nationkey) = 22", + Pattern.quote("abs(\"nationkey\")"), + "Estimates: \\{rows: .* \\(.*\\), cpu: .*, memory: .*, network: .*}"); + } + + // explain analyze can only run on coordinator + @Test + public void testExplainAnalyze() + { + assertExplainAnalyze( + noJoinReordering(BROADCAST), + "EXPLAIN ANALYZE SELECT * FROM (SELECT nationkey, regionkey FROM nation GROUP BY nationkey, regionkey) a, nation b WHERE a.regionkey = b.regionkey"); + assertExplainAnalyze( + "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", + "Left \\(probe\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", + "Right \\(build\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", + "Collisions avg\\.: .* \\(.* est\\.\\), Collisions std\\.dev\\.: .*"); + assertExplainAnalyze( + Session.builder(getSession()) + .setSystemProperty(ENABLE_DYNAMIC_FILTERING, "false") + .build(), + "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", + "Left \\(probe\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", + "Right \\(build\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", + "Collisions avg\\.: .* \\(.* est\\.\\), Collisions std\\.dev\\.: .*"); + + assertExplainAnalyze( + "EXPLAIN ANALYZE SELECT nationkey FROM nation GROUP BY nationkey", + "Collisions avg\\.: .* \\(.* est\\.\\), Collisions std\\.dev\\.: .*"); + + assertExplainAnalyze( + "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", + "Estimates: \\{rows: .* \\(.*\\), cpu: .*, memory: .*, network: .*}"); + } + + @Test + public void testExplainAnalyzeDynamicFilterInfo() + { + // ExplainAnalyzeOperator may finish before dynamic filter stats are reported to QueryInfo + assertEventually(() -> assertExplainAnalyze( + "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", + "Dynamic filters: \n.*ranges=25, \\{\\[0], ..., \\[24]}.* collection time=\\d+.*")); + } + + @Test + public void testExplainAnalyzeVerbose() + { + assertExplainAnalyze( + "EXPLAIN ANALYZE VERBOSE SELECT * FROM nation a", + "'Input distribution' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}"); + } + + @Test + public void testInsertWithCoercion() + { + String tableName = "test_insert_with_coercion_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (" + + "tinyint_column tinyint, " + + "integer_column integer, " + + "decimal_column decimal(5, 3), " + + "real_column real, " + + "char_column char(3), " + + "bounded_varchar_column varchar(3), " + + "unbounded_varchar_column varchar, " + + "date_column date)"); + + assertUpdate("INSERT INTO " + tableName + " (tinyint_column, integer_column, decimal_column, real_column) VALUES (1e0, 2e0, 3e0, 4e0)", 1); + assertUpdate("INSERT INTO " + tableName + " (char_column, bounded_varchar_column, unbounded_varchar_column) VALUES (VARCHAR 'aa ', VARCHAR 'aa ', VARCHAR 'aa ')", 1); + assertUpdate("INSERT INTO " + tableName + " (char_column, bounded_varchar_column, unbounded_varchar_column) VALUES (NULL, NULL, NULL)", 1); + assertUpdate("INSERT INTO " + tableName + " (char_column, bounded_varchar_column, unbounded_varchar_column) VALUES (CAST(NULL AS varchar), CAST(NULL AS varchar), CAST(NULL AS varchar))", 1); + assertUpdate("INSERT INTO " + tableName + " (date_column) VALUES (TIMESTAMP '2019-11-18 22:13:40')", 1); + + assertQuery( + "SELECT * FROM " + tableName, + "VALUES " + + "(1, 2, 3, 4, NULL, NULL, NULL, NULL), " + + "(NULL, NULL, NULL, NULL, 'aa ', 'aa ', 'aa ', NULL), " + + "(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + + "(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + + "(NULL, NULL, NULL, NULL, NULL, NULL, NULL, DATE '2019-11-18')"); + + assertQueryFails("INSERT INTO " + tableName + " (integer_column) VALUES (3e9)", "Out of range for integer: 3.0E9"); + assertQueryFails("INSERT INTO " + tableName + " (char_column) VALUES ('abcd')", "\\QCannot truncate non-space characters when casting from varchar(4) to char(3) on INSERT"); + assertQueryFails("INSERT INTO " + tableName + " (bounded_varchar_column) VALUES ('abcd')", "\\QCannot truncate non-space characters when casting from varchar(4) to varchar(3) on INSERT"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testCreateTableAsTable() + { + // Ensure CTA works when the table exposes hidden fields + // First, verify that the table 'nation' contains the expected hidden column 'row_number' + assertThat(query("SELECT count(*) FROM information_schema.columns " + + "WHERE table_catalog = 'tpch' and table_schema = 'tiny' and table_name = 'nation' and column_name = 'row_number'")) + .matches("VALUES BIGINT '0'"); + assertThat(query("SELECT min(row_number) FROM tpch.tiny.nation")) + .matches("VALUES BIGINT '0'"); + + assertUpdate(getSession(), "CREATE TABLE n AS TABLE tpch.tiny.nation", 25); + assertThat(query("SELECT * FROM n")) + .matches("SELECT * FROM tpch.tiny.nation"); + + // Verify that hidden column is not present in the created table + assertThatThrownBy(() -> query("SELECT min(row_number) FROM n")) + .hasMessage("line 1:12: Column 'row_number' cannot be resolved"); + assertUpdate(getSession(), "DROP TABLE n"); + } + + @Test + public void testInsertTableIntoTable() + { + // Ensure INSERT works when the source table exposes hidden fields + // First, verify that the table 'nation' contains the expected hidden column 'row_number' + assertThat(query("SELECT count(*) FROM information_schema.columns " + + "WHERE table_catalog = 'tpch' and table_schema = 'tiny' and table_name = 'nation' and column_name = 'row_number'")) + .matches("VALUES BIGINT '0'"); + assertThat(query("SELECT min(row_number) FROM tpch.tiny.nation")) + .matches("VALUES BIGINT '0'"); + + // Create empty target table for INSERT + assertUpdate(getSession(), "CREATE TABLE n AS TABLE tpch.tiny.nation WITH NO DATA", 0); + assertThat(query("SELECT * FROM n")) + .matches("SELECT * FROM tpch.tiny.nation LIMIT 0"); + + // Verify that the hidden column is not present in the created table + assertThatThrownBy(() -> query("SELECT row_number FROM n")) + .hasMessage("line 1:8: Column 'row_number' cannot be resolved"); + + // Insert values from the original table into the created table + assertUpdate(getSession(), "INSERT INTO n TABLE tpch.tiny.nation", 25); + assertThat(query("SELECT * FROM n")) + .matches("SELECT * FROM tpch.tiny.nation"); + + assertUpdate(getSession(), "DROP TABLE n"); + } + + @Test + public void testImplicitCastToRowWithFieldsRequiringDelimitation() + { + // source table uses char(4) as ROW fields + assertUpdate("CREATE TABLE source_table(r ROW(a char(4), b char(4)))"); + + // target table uses varchar as ROW fields which will enforce implicit CAST on INSERT + // field names in target table require delimitation + // - "a b" has whitespace + // - "from" is a reserved key word + assertUpdate("CREATE TABLE target_table(r ROW(\"a b\" varchar, \"from\" varchar))"); + + // run INSERT to verify that field names in generated CAST expressions are properly delimited + assertUpdate("INSERT INTO target_table SELECT * from source_table", 0); + } +} diff --git a/testing/trino-tests/src/test/java/io/trino/tests/AbstractTestEngineOnlyQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestEngineOnlyQueries.java similarity index 99% rename from testing/trino-tests/src/test/java/io/trino/tests/AbstractTestEngineOnlyQueries.java rename to testing/trino-testing/src/main/java/io/trino/testing/AbstractTestEngineOnlyQueries.java index 28576f115e7b..28c960a88007 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/AbstractTestEngineOnlyQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestEngineOnlyQueries.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.tests; +package io.trino.testing; import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; @@ -27,10 +27,8 @@ import io.trino.SystemSessionProperties; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.type.TimeZoneKey; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedResult; -import io.trino.testing.MaterializedRow; import io.trino.testing.assertions.Assert; +import io.trino.tests.QueryTemplate; import io.trino.tpch.TpchTable; import io.trino.type.SqlIntervalDayTime; import io.trino.type.SqlIntervalYearMonth; diff --git a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java index 01410fd54af7..4287c2e5981c 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java @@ -34,6 +34,7 @@ import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.SessionPropertyManager; import io.trino.metadata.SqlFunction; +import io.trino.plugin.exchange.FileSystemExchangePlugin; import io.trino.server.BasicQueryInfo; import io.trino.server.SessionPropertyDefaults; import io.trino.server.testing.TestingTrinoServer; @@ -625,6 +626,7 @@ public static class Builder> private Map extraProperties = new HashMap<>(); private Map coordinatorProperties = ImmutableMap.of(); private Optional> backupCoordinatorProperties = Optional.empty(); + private Map exchangeManagerProperties = ImmutableMap.of(); private String environment = ENVIRONMENT; private Module additionalModule = EMPTY_MODULE; private Optional baseDataDir = Optional.empty(); @@ -673,6 +675,12 @@ public SELF setBackupCoordinatorProperties(Map backupCoordinator return self(); } + public SELF setExchangeManagerProperties(Map exchangeManagerProperties) + { + this.exchangeManagerProperties = ImmutableMap.copyOf(requireNonNull(exchangeManagerProperties, "exchangeManagerProperties is null")); + return self(); + } + /** * Sets coordinator properties being equal to a map containing given key and value. * Note, that calling this method OVERWRITES previously set property values. @@ -744,7 +752,7 @@ protected SELF self() public DistributedQueryRunner build() throws Exception { - return new DistributedQueryRunner( + DistributedQueryRunner queryRunner = new DistributedQueryRunner( defaultSession, nodeCount, extraProperties, @@ -755,6 +763,13 @@ public DistributedQueryRunner build() baseDataDir, systemAccessControls, eventListeners); + + if (!exchangeManagerProperties.isEmpty()) { + queryRunner.installPlugin(new FileSystemExchangePlugin()); + queryRunner.loadExchangeManager("filesystem", exchangeManagerProperties); + } + + return queryRunner; } } } diff --git a/testing/trino-tests/pom.xml b/testing/trino-tests/pom.xml index 58976c1f1b5a..364160f4753b 100644 --- a/testing/trino-tests/pom.xml +++ b/testing/trino-tests/pom.xml @@ -33,11 +33,6 @@ test-jar - - io.trino - trino-parser - - io.trino trino-plugin-toolkit diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedEngineOnlyQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedEngineOnlyQueries.java index 4ee632bfdd6f..8cef935e1db4 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedEngineOnlyQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedEngineOnlyQueries.java @@ -14,29 +14,18 @@ package io.trino.tests; import com.google.common.collect.ImmutableMap; -import io.trino.Session; import io.trino.connector.MockConnectorFactory; import io.trino.connector.MockConnectorPlugin; +import io.trino.testing.AbstractDistributedEngineOnlyQueries; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; -import org.intellij.lang.annotations.Language; -import org.testng.annotations.Test; - -import java.time.ZonedDateTime; -import java.util.regex.Pattern; import static io.airlift.testing.Closeables.closeAllSuppress; -import static io.trino.FeaturesConfig.JoinDistributionType.BROADCAST; -import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; import static io.trino.plugin.memory.MemoryQueryRunner.createMemoryQueryRunner; -import static io.trino.testing.assertions.Assert.assertEventually; -import static io.trino.testing.sql.TestTable.randomTableSuffix; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestDistributedEngineOnlyQueries - extends AbstractTestEngineOnlyQueries + extends AbstractDistributedEngineOnlyQueries { @Override protected QueryRunner createQueryRunner() @@ -55,265 +44,4 @@ protected QueryRunner createQueryRunner() } return queryRunner; } - - /** - * Ensure the tests are run with {@link io.trino.testing.DistributedQueryRunner}. E.g. {@link io.trino.testing.LocalQueryRunner} takes some - * shortcuts, not exercising certain aspects. - */ - @Test - public void ensureDistributedQueryRunner() - { - assertThat(getQueryRunner().getNodeCount()).as("query runner node count") - .isGreaterThanOrEqualTo(3); - } - - @Test - public void testTimestampWithTimeZoneLiteralsWithDifferentZone() - { - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123 Europe/Warsaw'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123+01:00[Europe/Warsaw]")); - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123 Europe/Paris'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123+01:00[Europe/Paris]")); - - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456 Europe/Warsaw'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456+01:00[Europe/Warsaw]")); - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456 Europe/Paris'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456+01:00[Europe/Paris]")); - - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789 Europe/Warsaw'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456789+01:00[Europe/Warsaw]")); - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789 Europe/Paris'").getOnlyValue()).isEqualTo(ZonedDateTime.parse("2017-01-02T09:12:34.123456789+01:00[Europe/Paris]")); - - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789012 Europe/Warsaw'").getOnlyValue()).isEqualTo("2017-01-02 09:12:34.123456789012 Europe/Warsaw"); - assertThat(getQueryRunner().execute("SELECT TIMESTAMP '2017-01-02 09:12:34.123456789012 Europe/Paris'").getOnlyValue()).isEqualTo("2017-01-02 09:12:34.123456789012 Europe/Paris"); - } - - @Test - public void testUse() - { - assertQueryFails("USE invalid.xyz", "Catalog does not exist: invalid"); - assertQueryFails("USE tpch.invalid", "Schema does not exist: tpch.invalid"); - } - - @Test - public void testRoles() - { - Session invalid = Session.builder(getSession()).setCatalog("invalid").build(); - assertQueryFails(invalid, "CREATE ROLE test", "System roles are not enabled"); - assertQueryFails(invalid, "CREATE ROLE test", "System roles are not enabled"); - assertQueryFails(invalid, "DROP ROLE test", "line 1:1: Role 'test' does not exist"); - assertQueryFails(invalid, "GRANT test TO USER foo", "line 1:1: Role 'test' does not exist"); - assertQueryFails(invalid, "REVOKE test FROM USER foo", "line 1:1: Role 'test' does not exist"); - assertQueryFails(invalid, "SET ROLE test", "line 1:1: Role 'test' does not exist"); - - assertQueryFails(invalid, "CREATE ROLE test IN invalid", "line 1:1: Catalog 'invalid' does not exist"); - assertQueryFails(invalid, "DROP ROLE test IN invalid", "line 1:1: Catalog 'invalid' does not exist"); - assertQueryFails(invalid, "GRANT test TO USER foo IN invalid", "line 1:1: Catalog 'invalid' does not exist"); - assertQueryFails(invalid, "REVOKE test FROM USER foo IN invalid", "line 1:1: Catalog 'invalid' does not exist"); - assertQueryFails(invalid, "SET ROLE test IN invalid", "line 1:1: Catalog 'invalid' does not exist"); - } - - @Test - public void testDuplicatedRowCreateTable() - { - assertQueryFails("CREATE TABLE test (a integer, a integer)", - "line 1:31: Column name 'a' specified more than once"); - assertQueryFails("CREATE TABLE test (a integer, orderkey integer, LIKE orders INCLUDING PROPERTIES)", - "line 1:49: Column name 'orderkey' specified more than once"); - - assertQueryFails("CREATE TABLE test (a integer, A integer)", - "line 1:31: Column name 'A' specified more than once"); - assertQueryFails("CREATE TABLE test (a integer, OrderKey integer, LIKE orders INCLUDING PROPERTIES)", - "line 1:49: Column name 'orderkey' specified more than once"); - } - - @Test - public void testTooLongQuery() - { - // Generate a super-long query: SELECT x,x,x,x,x,... FROM (VALUES 1,2,3,4,5) t(x) - @Language("SQL") String longQuery = "SELECT x" + ",x".repeat(500_000) + " FROM (VALUES 1,2,3,4,5) t(x)"; - assertQueryFails(longQuery, "Query text length \\(1000037\\) exceeds the maximum length \\(1000000\\)"); - } - - @Test - public void testTooManyStages() - { - @Language("SQL") String query = "WITH\n" + - " t1 AS (SELECT nationkey AS x FROM nation where name='UNITED STATES'),\n" + - " t2 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t1 a, t1 b, t1 c, t1 d),\n" + - " t3 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t2 a, t2 b, t2 c, t2 d),\n" + - " t4 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t3 a, t3 b, t3 c, t3 d),\n" + - " t5 AS (SELECT a.x+b.x+c.x+d.x AS x FROM t4 a, t4 b, t4 c, t4 d)\n" + - "SELECT x FROM t5\n"; - assertQueryFails(query, "Number of stages in the query \\([0-9]+\\) exceeds the allowed maximum \\([0-9]+\\).*"); - } - - @Test - public void testRowSubscriptWithReservedKeyword() - { - // Subscript over field named after reserved keyword. This test needs to run in distributed - // mode, as it uncovers a problem during deserialization plan expressions - assertQuery( - "SELECT cast(row(1) AS row(\"cross\" bigint))[1]", - "VALUES 1"); - } - - @Test - public void testRowTypeWithReservedKeyword() - { - // This test is here because it only reproduces the issue (https://github.com/trinodb/trino/issues/1962) - // when running in distributed mode - assertQuery( - "SELECT cast(row(1) AS row(\"cross\" bigint)).\"cross\"", - "VALUES 1"); - } - - @Test - public void testExplain() - { - assertExplain( - "explain select name from nation where abs(nationkey) = 22", - Pattern.quote("abs(\"nationkey\")"), - "Estimates: \\{rows: .* \\(.*\\), cpu: .*, memory: .*, network: .*}"); - } - - // explain analyze can only run on coordinator - @Test - public void testExplainAnalyze() - { - assertExplainAnalyze( - noJoinReordering(BROADCAST), - "EXPLAIN ANALYZE SELECT * FROM (SELECT nationkey, regionkey FROM nation GROUP BY nationkey, regionkey) a, nation b WHERE a.regionkey = b.regionkey"); - assertExplainAnalyze( - "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", - "Left \\(probe\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", - "Right \\(build\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", - "Collisions avg\\.: .* \\(.* est\\.\\), Collisions std\\.dev\\.: .*"); - assertExplainAnalyze( - Session.builder(getSession()) - .setSystemProperty(ENABLE_DYNAMIC_FILTERING, "false") - .build(), - "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", - "Left \\(probe\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", - "Right \\(build\\) Input avg\\.: .* rows, Input std\\.dev\\.: .*", - "Collisions avg\\.: .* \\(.* est\\.\\), Collisions std\\.dev\\.: .*"); - - // ExplainAnalyzeOperator may finish before dynamic filter stats are reported to QueryInfo - assertEventually(() -> assertExplainAnalyze( - "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", - "Dynamic filters: \n.*ranges=25, \\{\\[0], ..., \\[24]}.* collection time=\\d+.*")); - - assertExplainAnalyze( - "EXPLAIN ANALYZE SELECT nationkey FROM nation GROUP BY nationkey", - "Collisions avg\\.: .* \\(.* est\\.\\), Collisions std\\.dev\\.: .*"); - - assertExplainAnalyze( - "EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey", - "Estimates: \\{rows: .* \\(.*\\), cpu: .*, memory: .*, network: .*}"); - } - - @Test - public void testExplainAnalyzeVerbose() - { - assertExplainAnalyze( - "EXPLAIN ANALYZE VERBOSE SELECT * FROM nation a", - "'Input distribution' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}"); - } - - @Test - public void testInsertWithCoercion() - { - String tableName = "test_insert_with_coercion_" + randomTableSuffix(); - - assertUpdate("CREATE TABLE " + tableName + " (" + - "tinyint_column tinyint, " + - "integer_column integer, " + - "decimal_column decimal(5, 3), " + - "real_column real, " + - "char_column char(3), " + - "bounded_varchar_column varchar(3), " + - "unbounded_varchar_column varchar, " + - "date_column date)"); - - assertUpdate("INSERT INTO " + tableName + " (tinyint_column, integer_column, decimal_column, real_column) VALUES (1e0, 2e0, 3e0, 4e0)", 1); - assertUpdate("INSERT INTO " + tableName + " (char_column, bounded_varchar_column, unbounded_varchar_column) VALUES (VARCHAR 'aa ', VARCHAR 'aa ', VARCHAR 'aa ')", 1); - assertUpdate("INSERT INTO " + tableName + " (char_column, bounded_varchar_column, unbounded_varchar_column) VALUES (NULL, NULL, NULL)", 1); - assertUpdate("INSERT INTO " + tableName + " (char_column, bounded_varchar_column, unbounded_varchar_column) VALUES (CAST(NULL AS varchar), CAST(NULL AS varchar), CAST(NULL AS varchar))", 1); - assertUpdate("INSERT INTO " + tableName + " (date_column) VALUES (TIMESTAMP '2019-11-18 22:13:40')", 1); - - assertQuery( - "SELECT * FROM " + tableName, - "VALUES " + - "(1, 2, 3, 4, NULL, NULL, NULL, NULL), " + - "(NULL, NULL, NULL, NULL, 'aa ', 'aa ', 'aa ', NULL), " + - "(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + - "(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + - "(NULL, NULL, NULL, NULL, NULL, NULL, NULL, DATE '2019-11-18')"); - - assertQueryFails("INSERT INTO " + tableName + " (integer_column) VALUES (3e9)", "Out of range for integer: 3.0E9"); - assertQueryFails("INSERT INTO " + tableName + " (char_column) VALUES ('abcd')", "\\QCannot truncate non-space characters when casting from varchar(4) to char(3) on INSERT"); - assertQueryFails("INSERT INTO " + tableName + " (bounded_varchar_column) VALUES ('abcd')", "\\QCannot truncate non-space characters when casting from varchar(4) to varchar(3) on INSERT"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - public void testCreateTableAsTable() - { - // Ensure CTA works when the table exposes hidden fields - // First, verify that the table 'nation' contains the expected hidden column 'row_number' - assertThat(query("SELECT count(*) FROM information_schema.columns " + - "WHERE table_catalog = 'tpch' and table_schema = 'tiny' and table_name = 'nation' and column_name = 'row_number'")) - .matches("VALUES BIGINT '0'"); - assertThat(query("SELECT min(row_number) FROM tpch.tiny.nation")) - .matches("VALUES BIGINT '0'"); - - assertUpdate(getSession(), "CREATE TABLE n AS TABLE tpch.tiny.nation", 25); - assertThat(query("SELECT * FROM n")) - .matches("SELECT * FROM tpch.tiny.nation"); - - // Verify that hidden column is not present in the created table - assertThatThrownBy(() -> query("SELECT min(row_number) FROM n")) - .hasMessage("line 1:12: Column 'row_number' cannot be resolved"); - assertUpdate(getSession(), "DROP TABLE n"); - } - - @Test - public void testInsertTableIntoTable() - { - // Ensure INSERT works when the source table exposes hidden fields - // First, verify that the table 'nation' contains the expected hidden column 'row_number' - assertThat(query("SELECT count(*) FROM information_schema.columns " + - "WHERE table_catalog = 'tpch' and table_schema = 'tiny' and table_name = 'nation' and column_name = 'row_number'")) - .matches("VALUES BIGINT '0'"); - assertThat(query("SELECT min(row_number) FROM tpch.tiny.nation")) - .matches("VALUES BIGINT '0'"); - - // Create empty target table for INSERT - assertUpdate(getSession(), "CREATE TABLE n AS TABLE tpch.tiny.nation WITH NO DATA", 0); - assertThat(query("SELECT * FROM n")) - .matches("SELECT * FROM tpch.tiny.nation LIMIT 0"); - - // Verify that the hidden column is not present in the created table - assertThatThrownBy(() -> query("SELECT row_number FROM n")) - .hasMessage("line 1:8: Column 'row_number' cannot be resolved"); - - // Insert values from the original table into the created table - assertUpdate(getSession(), "INSERT INTO n TABLE tpch.tiny.nation", 25); - assertThat(query("SELECT * FROM n")) - .matches("SELECT * FROM tpch.tiny.nation"); - - assertUpdate(getSession(), "DROP TABLE n"); - } - - @Test - public void testImplicitCastToRowWithFieldsRequiringDelimitation() - { - // source table uses char(4) as ROW fields - assertUpdate("CREATE TABLE source_table(r ROW(a char(4), b char(4)))"); - - // target table uses varchar as ROW fields which will enforce implicit CAST on INSERT - // field names in target table require delimitation - // - "a b" has whitespace - // - "from" is a reserved key word - assertUpdate("CREATE TABLE target_table(r ROW(\"a b\" varchar, \"from\" varchar))"); - - // run INSERT to verify that field names in generated CAST expressions are properly delimited - assertUpdate("INSERT INTO target_table SELECT * from source_table", 0); - } } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java new file mode 100644 index 000000000000..7e3d6a5b5685 --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests; + +import com.google.common.collect.ImmutableMap; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.MockConnectorPlugin; +import io.trino.plugin.memory.MemoryQueryRunner; +import io.trino.testing.AbstractDistributedEngineOnlyQueries; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.FaultTolerantExecutionConnectorTestHelper; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; + +import static io.airlift.testing.Closeables.closeAllSuppress; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestDistributedFaultTolerantEngineOnlyQueries + extends AbstractDistributedEngineOnlyQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + ImmutableMap exchangeManagerProperties = ImmutableMap.builder() + .put("exchange.base-directory", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager") + .buildOrThrow(); + + DistributedQueryRunner queryRunner = MemoryQueryRunner.builder() + .setExchangeManagerProperties(exchangeManagerProperties) + .setExtraProperties(FaultTolerantExecutionConnectorTestHelper.getExtraProperties()) + .setInitialTables(TpchTable.getTables()) + .build(); + + queryRunner.getCoordinator().getSessionPropertyManager().addSystemSessionProperties(TEST_SYSTEM_PROPERTIES); + try { + queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder() + .withSessionProperties(TEST_CATALOG_PROPERTIES) + .build())); + queryRunner.createCatalog(TESTING_CATALOG, "mock"); + } + catch (RuntimeException e) { + throw closeAllSuppress(e, queryRunner); + } + return queryRunner; + } + + @Override + public void testExplainAnalyzeDynamicFilterInfo() + { + // dynamic filters are disabled + String result = (String) computeActual("EXPLAIN ANALYZE SELECT * FROM nation a, nation b WHERE a.nationkey = b.nationkey").getOnlyValue(); + assertThat(result).doesNotContainPattern("Dynamic filters:.*"); + } +} diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestLocalEngineOnlyQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestLocalEngineOnlyQueries.java index 6cb647b91ac1..e33cea6524eb 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestLocalEngineOnlyQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestLocalEngineOnlyQueries.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.connector.MockConnectorFactory; +import io.trino.testing.AbstractTestEngineOnlyQueries; import io.trino.testing.LocalQueryRunner; import io.trino.testing.QueryRunner; import org.testng.SkipException;