Skip to content

Commit 39100a9

Browse files
committed
Properly handle projection in MockConnector
1 parent 754846c commit 39100a9

File tree

2 files changed

+34
-14
lines changed

2 files changed

+34
-14
lines changed

core/trino-main/src/test/java/io/trino/connector/MockConnector.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.connector;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import com.google.common.collect.ImmutableMap;
1718
import io.airlift.slice.Slice;
1819
import io.trino.spi.HostAddress;
1920
import io.trino.spi.Page;
@@ -94,6 +95,9 @@
9495
public class MockConnector
9596
implements Connector
9697
{
98+
private static final String DELETE_ROW_ID = "delete_row_id";
99+
private static final String UPDATE_ROW_ID = "update_row_id";
100+
97101
private final Function<ConnectorSession, List<String>> listSchemaNames;
98102
private final BiFunction<ConnectorSession, String, List<SchemaTableName>> listTables;
99103
private final Optional<BiFunction<ConnectorSession, SchemaTablePrefix, Stream<TableColumnsMetadata>>> streamTableColumns;
@@ -474,7 +478,7 @@ public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHan
474478
@Override
475479
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
476480
{
477-
return new MockConnectorColumnHandle("update_row_id", BIGINT);
481+
return new MockConnectorColumnHandle(UPDATE_ROW_ID, BIGINT);
478482
}
479483

480484
@Override
@@ -486,7 +490,7 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
486490
@Override
487491
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
488492
{
489-
return new MockConnectorColumnHandle("delete_row_id", BIGINT);
493+
return new MockConnectorColumnHandle(DELETE_ROW_ID, BIGINT);
490494
}
491495

492496
@Override
@@ -513,7 +517,10 @@ public Set<String> listRoles(ConnectorSession session)
513517
@Override
514518
public Set<RoleGrant> listRoleGrants(ConnectorSession session, TrinoPrincipal principal)
515519
{
516-
return roleGrants.apply(session, Optional.empty(), Optional.empty(), OptionalLong.empty()).stream().filter(grant -> grant.getGrantee().equals(principal)).collect(toImmutableSet());
520+
return roleGrants.apply(session, Optional.empty(), Optional.empty(), OptionalLong.empty())
521+
.stream()
522+
.filter(grant -> grant.getGrantee().equals(principal))
523+
.collect(toImmutableSet());
517524
}
518525

519526
@Override
@@ -602,32 +609,42 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
602609
{
603610
MockConnectorTableHandle handle = (MockConnectorTableHandle) table;
604611
SchemaTableName tableName = handle.getTableName();
605-
Set<String> projection = columns.stream()
612+
List<MockConnectorColumnHandle> projection = columns.stream()
606613
.map(MockConnectorColumnHandle.class::cast)
607-
.map(MockConnectorColumnHandle::getName)
608-
.collect(toImmutableSet());
614+
.collect(toImmutableList());
609615
List<Type> types = columns.stream()
610616
.map(MockConnectorColumnHandle.class::cast)
611617
.map(MockConnectorColumnHandle::getType)
612618
.collect(toImmutableList());
613-
List<String> columnNames = getColumns.apply(tableName).stream()
614-
.map(ColumnMetadata::getName)
615-
.collect(toImmutableList());
619+
Map<String, Integer> columnIndexes = getColumnIndexes(tableName);
616620
List<List<?>> records = data.apply(tableName).stream()
617621
.map(record -> {
618622
ImmutableList.Builder<Object> projectedRow = ImmutableList.builder();
619-
for (int columnIndex = 0; columnIndex < columnNames.size(); columnIndex++) {
620-
if (projection.contains(columnNames.get(columnIndex))) {
621-
projectedRow.add(record.get(columnIndex));
623+
for (MockConnectorColumnHandle column : projection) {
624+
String columnName = column.getName();
625+
if (columnName.equals(DELETE_ROW_ID) || columnName.equals(UPDATE_ROW_ID)) {
626+
projectedRow.add(0);
627+
continue;
622628
}
629+
Integer index = columnIndexes.get(columnName);
630+
requireNonNull(index, "index is null");
631+
projectedRow.add(record.get(index));
623632
}
624-
// produce value for update_row_id or delete_row_id columns, needed in case of UPDATE and DELETE
625-
projectedRow.add(0);
626633
return projectedRow.build();
627634
})
628635
.collect(toImmutableList());
629636
return new MockConnectorPageSource(new RecordPageSource(new InMemoryRecordSet(types, records)));
630637
}
638+
639+
private Map<String, Integer> getColumnIndexes(SchemaTableName tableName)
640+
{
641+
ImmutableMap.Builder<String, Integer> columnIndexes = ImmutableMap.builder();
642+
List<ColumnMetadata> columnMetadata = getColumns.apply(tableName);
643+
for (int index = 0; index < columnMetadata.size(); index++) {
644+
columnIndexes.put(columnMetadata.get(index).getName(), index);
645+
}
646+
return columnIndexes.build();
647+
}
631648
}
632649

633650
public enum MockConnectorSplit

testing/trino-tests/src/test/java/io/trino/tests/TestMockConnector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ public void testDataGeneration()
129129
assertQuery(
130130
"SELECT nationkey, regionkey FROM mock.default.nation",
131131
"SELECT nationkey, regionkey FROM nation");
132+
assertQuery(
133+
"SELECT regionkey, nationkey FROM mock.default.nation",
134+
"SELECT regionkey, nationkey FROM nation");
132135
assertQuery(
133136
"SELECT regionkey FROM mock.default.nation",
134137
"SELECT regionkey FROM nation");

0 commit comments

Comments
 (0)