Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,27 @@
package io.trino.plugin.memory;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.Type;

import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public record ColumnInfo(ColumnHandle handle, String name, Type type, boolean nullable, Optional<String> comment)
public record ColumnInfo(MemoryColumnHandle handle, boolean nullable, Optional<String> comment)
{
public ColumnInfo
{
requireNonNull(handle, "handle is null");
requireNonNull(name, "name is null");
requireNonNull(type, "type is null");
requireNonNull(comment, "comment is null");
}

@JsonIgnore
public ColumnMetadata getMetadata()
{
return ColumnMetadata.builder()
.setName(name)
.setType(type)
.setName(handle.name())
.setType(handle.type())
.setNullable(nullable)
.setComment(comment)
.build();
Expand All @@ -48,8 +44,8 @@ public ColumnMetadata getMetadata()
public String toString()
{
return toStringHelper(this)
.add("name", name)
.add("type", type)
.add("name", handle.name())
.add("type", handle.type())
.add("nullable", nullable)
.add("comment", comment)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;

public record MemoryColumnHandle(int columnIndex, Type type)
import static java.util.Objects.requireNonNull;

public record MemoryColumnHandle(int columnIndex, String name, Type type)
implements ColumnHandle
{
public MemoryColumnHandle
{
requireNonNull(name, "name is null");
requireNonNull(type, "type is null");
}

@Override
public String toString()
{
return Integer.toString(columnIndex);
return name + ":" + type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public synchronized ConnectorTableHandle getTableHandle(ConnectorSession session
return null;
}

return new MemoryTableHandle(id, OptionalLong.empty(), OptionalDouble.empty());
return new MemoryTableHandle(id, schemaTableName, OptionalLong.empty(), OptionalDouble.empty());
}

@Override
Expand Down Expand Up @@ -243,9 +243,8 @@ public synchronized List<SchemaTableName> listTables(ConnectorSession session, O
public synchronized Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
MemoryTableHandle handle = (MemoryTableHandle) tableHandle;
return tables.get(handle.id())
.columns().stream()
.collect(toImmutableMap(ColumnInfo::name, ColumnInfo::handle));
return tables.get(handle.id()).columns().stream()
.collect(toImmutableMap(column -> column.handle().name(), ColumnInfo::handle));
}

@Override
Expand Down Expand Up @@ -348,7 +347,8 @@ public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession se
ImmutableList.Builder<ColumnInfo> columns = ImmutableList.builder();
for (int i = 0; i < tableMetadata.getColumns().size(); i++) {
ColumnMetadata column = tableMetadata.getColumns().get(i);
columns.add(new ColumnInfo(new MemoryColumnHandle(i, column.getType()), column.getName(), column.getType(), column.isNullable(), Optional.ofNullable(column.getComment())));
MemoryColumnHandle handle = new MemoryColumnHandle(i, column.getName(), column.getType());
columns.add(new ColumnInfo(handle, column.isNullable(), Optional.ofNullable(column.getComment())));
}

tableIds.put(tableMetadata.getTable(), tableId);
Expand Down Expand Up @@ -446,9 +446,11 @@ public synchronized void addColumn(ConnectorSession session, ConnectorTableHandl
throw new TrinoException(NOT_SUPPORTED, format("Unable to add NOT NULL column '%s' for non-empty table: %s", column.getName(), table.getSchemaTableName()));
}

MemoryColumnHandle newColumn = new MemoryColumnHandle(table.columns().size(), column.getName(), column.getType());

List<ColumnInfo> columns = ImmutableList.<ColumnInfo>builderWithExpectedSize(table.columns().size() + 1)
.addAll(table.columns())
.add(new ColumnInfo(new MemoryColumnHandle(table.columns().size(), column.getType()), column.getName(), column.getType(), column.isNullable(), Optional.ofNullable(column.getComment())))
.add(new ColumnInfo(newColumn, column.isNullable(), Optional.ofNullable(column.getComment())))
.build();

tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), columns, table.truncated(), table.dataFragments(), table.comment()));
Expand All @@ -462,9 +464,11 @@ public synchronized void renameColumn(ConnectorSession session, ConnectorTableHa
long tableId = handle.id();
TableInfo table = tables.get(handle.id());

MemoryColumnHandle newColumn = new MemoryColumnHandle(column.columnIndex(), target, column.type());

List<ColumnInfo> columns = new ArrayList<>(table.columns());
ColumnInfo columnInfo = columns.get(column.columnIndex());
columns.set(column.columnIndex(), new ColumnInfo(columnInfo.handle(), target, columnInfo.type(), columnInfo.nullable(), columnInfo.comment()));
columns.set(column.columnIndex(), new ColumnInfo(newColumn, columnInfo.nullable(), columnInfo.comment()));

tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), ImmutableList.copyOf(columns), table.truncated(), table.dataFragments(), table.comment()));
}
Expand All @@ -479,7 +483,7 @@ public synchronized void dropNotNullConstraint(ConnectorSession session, Connect

List<ColumnInfo> columns = new ArrayList<>(table.columns());
ColumnInfo columnInfo = columns.get(column.columnIndex());
columns.set(column.columnIndex(), new ColumnInfo(columnInfo.handle(), columnInfo.name(), columnInfo.type(), true, columnInfo.comment()));
columns.set(column.columnIndex(), new ColumnInfo(columnInfo.handle(), true, columnInfo.comment()));

tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), ImmutableList.copyOf(columns), table.truncated(), table.dataFragments(), table.comment()));
}
Expand Down Expand Up @@ -626,10 +630,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
return Optional.empty();
}

return Optional.of(new LimitApplicationResult<>(
new MemoryTableHandle(table.id(), OptionalLong.of(limit), OptionalDouble.empty()),
true,
true));
return Optional.of(new LimitApplicationResult<>(table.withLimit(limit), true, true));
}

@Override
Expand All @@ -641,9 +642,8 @@ public Optional<SampleApplicationResult<ConnectorTableHandle>> applySample(Conne
return Optional.empty();
}

return Optional.of(new SampleApplicationResult<>(
new MemoryTableHandle(table.id(), table.limit(), OptionalDouble.of(table.sampleRatio().orElse(1) * sampleRatio)),
true));
double newRatio = table.sampleRatio().orElse(1) * sampleRatio;
return Optional.of(new SampleApplicationResult<>(table.withSampleRatio(newRatio), true));
}

@Override
Expand All @@ -667,18 +667,12 @@ public synchronized void setColumnComment(ConnectorSession session, ConnectorTab
MemoryTableHandle table = (MemoryTableHandle) tableHandle;
TableInfo info = tables.get(table.id());
checkArgument(info != null, "Table not found");
tables.put(
table.id(),
new TableInfo(
table.id(),
info.schemaName(),
info.tableName(),
info.columns().stream()
.map(tableColumn -> Objects.equals(tableColumn.handle(), columnHandle) ? new ColumnInfo(tableColumn.handle(), tableColumn.name(), tableColumn.getMetadata().getType(), tableColumn.nullable(), comment) : tableColumn)
.collect(toImmutableList()),
info.truncated(),
info.dataFragments(),
info.comment()));
List<ColumnInfo> newColumns = info.columns().stream()
.map(column -> column.handle().equals(columnHandle)
? new ColumnInfo(column.handle(), column.nullable(), comment)
: column)
.collect(toImmutableList());
tables.put(table.id(), new TableInfo(table.id(), info.schemaName(), info.tableName(), newColumns, info.truncated(), info.dataFragments(), info.comment()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.memory;

import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.OptionalDouble;
import java.util.OptionalLong;
Expand All @@ -22,12 +23,14 @@

public record MemoryTableHandle(
long id,
SchemaTableName name,
OptionalLong limit,
OptionalDouble sampleRatio)
implements ConnectorTableHandle
{
public MemoryTableHandle
{
requireNonNull(name, "name is null");
requireNonNull(limit, "limit is null");
requireNonNull(sampleRatio, "sampleRatio is null");
}
Expand All @@ -36,9 +39,19 @@ public record MemoryTableHandle(
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append(id);
limit.ifPresent(value -> builder.append("(limit:" + value + ")"));
sampleRatio.ifPresent(value -> builder.append("(sampleRatio:" + value + ")"));
builder.append(name);
limit.ifPresent(value -> builder.append(" limit=").append(value));
sampleRatio.ifPresent(value -> builder.append(" sampleRatio=").append(value));
return builder.toString();
}

public MemoryTableHandle withLimit(long limit)
{
return new MemoryTableHandle(id, name, OptionalLong.of(limit), sampleRatio);
}

public MemoryTableHandle withSampleRatio(double sampleRatio)
{
return new MemoryTableHandle(id, name, limit, OptionalDouble.of(sampleRatio));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ public ConnectorTableMetadata getMetadata()
@JsonIgnore
public ColumnInfo getColumn(ColumnHandle handle)
{
int columnIndex = ((MemoryColumnHandle) handle).columnIndex();
return columns.stream()
.filter(column -> column.handle().equals(handle))
.filter(column -> column.handle().columnIndex() == columnIndex)
.collect(onlyElement());
}
}