Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
new SystemConnector(
nodeManager,
systemTablesProvider,
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogHandle)));
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogHandle),
accessControl,
catalogHandle.getCatalogName().toString()));
Comment thread
lukasz-stec marked this conversation as resolved.

return new CatalogConnector(
catalogHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.connector.system;

import io.trino.metadata.InternalNodeManager;
import io.trino.security.AccessControl;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -38,15 +39,19 @@ public class SystemConnector
public SystemConnector(
InternalNodeManager nodeManager,
SystemTablesProvider tables,
Function<TransactionId, ConnectorTransactionHandle> transactionHandleFunction)
Function<TransactionId, ConnectorTransactionHandle> transactionHandleFunction,
AccessControl accessControl,
String catalogName)
{
requireNonNull(nodeManager, "nodeManager is null");
requireNonNull(tables, "tables is null");
requireNonNull(transactionHandleFunction, "transactionHandleFunction is null");
requireNonNull(accessControl, "accessControl is null");
requireNonNull(catalogName, "catalogName is null");

this.metadata = new SystemTablesMetadata(tables);
this.splitManager = new SystemSplitManager(nodeManager, tables);
this.pageSourceProvider = new SystemPageSourceProvider(tables);
this.pageSourceProvider = new SystemPageSourceProvider(tables, accessControl, catalogName);
this.transactionHandleFunction = transactionHandleFunction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.FullConnectorSession;
import io.trino.plugin.base.MappedPageSource;
import io.trino.plugin.base.MappedRecordSet;
import io.trino.security.AccessControl;
import io.trino.security.InjectedConnectorAccessControl;
import io.trino.security.SecurityContext;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -44,17 +50,22 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.connector.SystemTable.Distribution.ALL_NODES;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class SystemPageSourceProvider
implements ConnectorPageSourceProvider
{
private final SystemTablesProvider tables;
private final AccessControl accessControl;
private final String catalogName;

public SystemPageSourceProvider(SystemTablesProvider tables)
// Constructs the provider for system-table page sources. The AccessControl and catalog name
// are retained so createPageSource can build an InjectedConnectorAccessControl scoped to the
// current transaction/identity and hand it to system tables that perform authorization checks.
public SystemPageSourceProvider(SystemTablesProvider tables, AccessControl accessControl, String catalogName)
{
this.tables = requireNonNull(tables, "tables is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
}

@Override
Expand Down Expand Up @@ -105,8 +116,31 @@ public ConnectorPageSource createPageSource(
TupleDomain<Integer> newConstraint = systemSplit.getConstraint().transformKeys(columnHandle ->
columnsByName.get(((SystemColumnHandle) columnHandle).columnName()));

ConnectorAccessControl accessControl1 = new InjectedConnectorAccessControl(
accessControl,
new SecurityContext(
systemTransaction.getTransactionId(),
((FullConnectorSession) session).getSession().getIdentity(),
QueryId.valueOf(session.getQueryId()),
session.getStart()),
catalogName);
try {
return new MappedPageSource(systemTable.pageSource(systemTransaction.getConnectorTransactionHandle(), session, newConstraint), userToSystemFieldIndex.build());
// Do not pass access control for tables that execute on workers
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why so? I think we should pass it too. Workers have access control, which could be different than on the coordinator, but still.

Or maybe we could pass DenyAll* with the error message that it is not yet supported on worker.

Also I think we should not care here about the access control. It is not the proper layer. The thing is in the guice context, and it is used in other places so I think we are fine to use it too here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why so? I think we should pass it too. Workers have access control, which could be different than on the coordinator, but still.

  1. I wanted to be on the safe side in terms of security. Even though access control is available on workers, people normally don't configure it because it is almost not used.
    The only place I know it is used is ManagementAuthorizationFilter. Allowing access control on workers would thus be a subtle security gap, because the system table creator could expect the same access control everywhere, which may not be true in practice, and I don't think we can defend against that.
  2. Currently, table functions allow access control only on the coordinator during analysis, so allowing only coordinator for system tables makes sense from the consistency perspective.
  3. System tables that need to use access control are usually working on metadata available only on coordinator, so allowing this on workers will bring minimal to no benefit.

Or maybe we could pass DenyAll* with the error message that it is not yet supported on worker.

Effectively, this already works that way, because UnsupportedOperationException will be thrown when the incorrect API is used. I think that is better because it makes a SystemTable implementation less likely to make a mistake.

Also I think we should not care here about the access control. It is not the proper layer. The thing is in the guice context, and it is used in other places so I think we are fine to use it too here.

I'm sorry, I don't understand this comment. Where should we not care about access control? in the SystemPageSourceProvider?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, I don't understand this comment. Where should we not care about access control? in the SystemPageSourceProvider?

I believe it is the Guice context's responsibility to provide the proper implementation of access control. If access control is incorrect on the worker, it should not be bound in Guice. If the worker is using only part of the access control, then it should maybe use something else. My point is that here we are dealing with a bigger design problem. Basically, I believe no access control should be available on the worker. However, it is a preexisting issue that goes well beyond this PR.

if (systemTable.getDistribution().equals(ALL_NODES)) {
return new MappedPageSource(
systemTable.pageSource(
systemTransaction.getConnectorTransactionHandle(),
session,
newConstraint),
userToSystemFieldIndex.build());
}
return new MappedPageSource(
systemTable.pageSource(
systemTransaction.getConnectorTransactionHandle(),
session,
newConstraint,
accessControl1),
userToSystemFieldIndex.build());
}
catch (UnsupportedOperationException e) {
return new RecordPageSource(new MappedRecordSet(
Expand All @@ -116,7 +150,8 @@ public ConnectorPageSource createPageSource(
session,
newConstraint,
requiredColumns.build(),
systemSplit),
systemSplit,
accessControl1),
userToSystemFieldIndex.build()));
}
}
Expand All @@ -127,7 +162,8 @@ private static RecordSet toRecordSet(
ConnectorSession session,
TupleDomain<Integer> constraint,
Set<Integer> requiredColumns,
ConnectorSplit split)
ConnectorSplit split,
ConnectorAccessControl accessControl1)
{
return new RecordSet()
{
Expand All @@ -144,7 +180,11 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return table.cursor(sourceTransaction, session, constraint, requiredColumns, split);
// Do not pass access control for tables that execute on workers
if (table.getDistribution().equals(ALL_NODES)) {
return table.cursor(sourceTransaction, session, constraint, requiredColumns, split);
}
return table.cursor(sourceTransaction, session, constraint, requiredColumns, split, accessControl1);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ default RecordCursor cursor(
return cursor(transactionHandle, session, constraint);
}

/**
 * Access-control-aware variant of {@code cursor}. The default implementation ignores the
 * supplied {@link ConnectorAccessControl} and delegates to the existing overload, so system
 * tables that do not need authorization checks stay source- and behavior-compatible.
 */
default RecordCursor cursor(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
TupleDomain<Integer> constraint,
Set<Integer> requiredColumns,
ConnectorSplit split,
ConnectorAccessControl accessControl)
{
return cursor(transactionHandle, session, constraint, requiredColumns, split);
}

/**
* Create a page source for the data in this table.
*
Expand All @@ -64,6 +75,15 @@ default ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHan
throw new UnsupportedOperationException();
}

/**
 * Access-control-aware variant of {@code pageSource}. The default implementation discards the
 * supplied {@link ConnectorAccessControl} and delegates to the three-argument overload (which
 * itself throws {@link UnsupportedOperationException} unless overridden), preserving backward
 * compatibility for existing system-table implementations.
 */
default ConnectorPageSource pageSource(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
TupleDomain<Integer> constraint,
ConnectorAccessControl accessControl)
{
return pageSource(transactionHandle, session, constraint);
}

default Optional<ConnectorSplitSource> splitSource(ConnectorSession connectorSession, TupleDomain<ColumnHandle> constraint)
{
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.inject.Inject;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
Expand Down Expand Up @@ -81,6 +82,20 @@ public RecordCursor cursor(
}
}

@Override
public RecordCursor cursor(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
TupleDomain<Integer> constraint,
Set<Integer> requiredColumns,
ConnectorSplit split,
ConnectorAccessControl accessControl)
{
// Swap in the plugin's classloader as the thread-context classloader for the duration of the
// delegate call, then restore it (try-with-resources on ThreadContextClassLoader).
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.cursor(transactionHandle, session, constraint, requiredColumns, split, accessControl);
}
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
Expand All @@ -89,6 +104,18 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
}
}

@Override
public ConnectorPageSource pageSource(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
TupleDomain<Integer> constraint,
ConnectorAccessControl accessControl)
{
// Delegate under the plugin's classloader so any classes the delegate loads resolve against
// the plugin, not the caller; the previous context classloader is restored on close.
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.pageSource(transactionHandle, session, constraint, accessControl);
}
}

@Override
public Optional<ConnectorSplitSource> splitSource(ConnectorSession connectorSession, TupleDomain<ColumnHandle> constraint)
{
Expand Down