Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;

Expand All @@ -45,7 +46,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
List<HostAddress> localAddress = ImmutableList.of(nodeManager.getCurrentNode().getHostAndPort());
ConnectorSplit split = new InformationSchemaSplit(localAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -58,10 +59,11 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle tableHandle,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
SystemTableHandle table = (SystemTableHandle) tableHandle;
TupleDomain<ColumnHandle> constraint = table.getConstraint();
TupleDomain<ColumnHandle> tableConstraint = table.getConstraint();

SystemTable systemTable = tables.getSystemTable(session, table.getSchemaTableName())
// table might disappear in the meantime
Expand All @@ -70,7 +72,7 @@ public ConnectorSplitSource getSplits(
Distribution tableDistributionMode = systemTable.getDistribution();
if (tableDistributionMode == SINGLE_COORDINATOR) {
HostAddress address = nodeManager.getCurrentNode().getHostAndPort();
ConnectorSplit split = new SystemSplit(address, constraint);
ConnectorSplit split = new SystemSplit(address, tableConstraint);
return new FixedSplitSource(ImmutableList.of(split));
}

Expand All @@ -84,7 +86,7 @@ else if (tableDistributionMode == ALL_NODES) {
}
Set<InternalNode> nodeSet = nodes.build();
for (InternalNode node : nodeSet) {
splits.add(new SystemSplit(node.getHostAndPort(), constraint));
splits.add(new SystemSplit(node.getHostAndPort(), tableConstraint));
}
return new FixedSplitSource(splits.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,13 @@ public ConnectorSplitManager getSplitManager()
return new ConnectorSplitManager()
{
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter)
public ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter,
Constraint constraint)
{
return new FixedSplitSource(ImmutableList.of(MOCK_CONNECTOR_SPLIT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@

public interface ConnectorSplitManager
{
@Deprecated
default ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
{
throw new UnsupportedOperationException();
}

default ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
ConnectorSession session,
Expand All @@ -34,7 +23,7 @@ default ConnectorSplitSource getSplits(
DynamicFilter dynamicFilter,
Constraint constraint)
{
return getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter);
throw new UnsupportedOperationException();
}

enum SplitSchedulingStrategy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ public ClassLoaderSafeConnectorSplitManager(@ForClassLoaderSafe ConnectorSplitMa
this.classLoader = requireNonNull(classLoader, "classLoader is null");
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter);
}
}

@Override
public ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.predicate.Domain;
Expand Down Expand Up @@ -58,7 +59,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle tableHandle,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.predicate.Domain;
Expand Down Expand Up @@ -61,7 +62,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
AtopTableHandle tableHandle = (AtopTableHandle) table;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;

import javax.inject.Inject;
Expand All @@ -41,7 +42,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
return jdbcClient.getSplits(session, (JdbcTableHandle) table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -90,15 +91,16 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
log.debug("getSplits(transaction=%s, session=%s, table=%s, splitSchedulingStrategy=%s)", transaction, session, table, splitSchedulingStrategy);
BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) table;

TableId remoteTableId = bigQueryTableHandle.getRemoteTableName().toTableId();
int actualParallelism = parallelism.orElse(nodeManager.getRequiredWorkerNodes().size());
TupleDomain<ColumnHandle> constraint = bigQueryTableHandle.getConstraint();
Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(constraint);
TupleDomain<ColumnHandle> tableConstraint = bigQueryTableHandle.getConstraint();
Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
List<BigQuerySplit> splits = emptyProjectionIsRequired(bigQueryTableHandle.getProjectedColumns()) ?
createEmptyProjection(session, remoteTableId, actualParallelism, filter) :
readFromBigQuery(session, TableDefinition.Type.valueOf(bigQueryTableHandle.getType()), remoteTableId, bigQueryTableHandle.getProjectedColumns(), actualParallelism, filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;

Expand All @@ -32,7 +33,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
int splitCount = ((BlackHoleTableHandle) table).getSplitCount();
return new FixedSplitSource(nCopies(splitCount, BlackHoleSplit.INSTANCE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -76,7 +77,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle tableHandle,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) tableHandle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void testGetRecords()

tableHandle = metadata.applyFilter(SESSION, tableHandle, Constraint.alwaysTrue()).get().getHandle();

List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(transaction, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY));
List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(transaction, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY, Constraint.alwaysTrue()));

long rowNumber = 0;
for (ConnectorSplit split : splits) {
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testGetTupleType()

ConnectorTransactionHandle transaction = CassandraTransactionHandle.INSTANCE;

List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(transaction, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY));
List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(transaction, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY, Constraint.alwaysTrue()));

long rowNumber = 0;
for (ConnectorSplit split : splits) {
Expand Down Expand Up @@ -311,7 +311,7 @@ public void testGetUserDefinedType()

tableHandle = metadata.applyFilter(SESSION, tableHandle, Constraint.alwaysTrue()).get().getHandle();

List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(transaction, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY));
List<ConnectorSplit> splits = getAllSplits(splitManager.getSplits(transaction, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY, Constraint.alwaysTrue()));

long rowNumber = 0;
for (ConnectorSplit split : splits) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;

Expand Down Expand Up @@ -49,7 +50,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
ElasticsearchTableHandle tableHandle = (ElasticsearchTableHandle) table;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.TableNotFoundException;
Expand Down Expand Up @@ -49,7 +50,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle connectorTableHandle,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
ExampleTableHandle tableHandle = (ExampleTableHandle) connectorTableHandle;
ExampleTable table = exampleClient.getTable(tableHandle.getSchemaName(), tableHandle.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.TableNotFoundException;
Expand Down Expand Up @@ -49,7 +50,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle connectorTableHandle,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
SheetsTableHandle tableHandle = (SheetsTableHandle) connectorTableHandle;
Optional<SheetsTable> table = sheetsClient.getTable(tableHandle.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -179,7 +180,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle tableHandle,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
HiveTableHandle hiveTable = (HiveTableHandle) tableHandle;
SchemaTableName tableName = hiveTable.getSchemaTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.predicate.NullableValue;
Expand Down Expand Up @@ -58,7 +59,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
JmxTableHandle tableHandle = (JmxTableHandle) table;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
Expand Down Expand Up @@ -105,7 +106,7 @@ public void testPredicatePushdown()
TupleDomain<ColumnHandle> nodeTupleDomain = TupleDomain.fromFixedValues(ImmutableMap.of(columnHandle, NullableValue.of(createUnboundedVarcharType(), utf8Slice(nodeIdentifier))));
JmxTableHandle tableHandle = new JmxTableHandle(new SchemaTableName("schema", "tableName"), ImmutableList.of("objectName"), ImmutableList.of(columnHandle), true, nodeTupleDomain);

ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY);
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY, Constraint.alwaysTrue());
List<ConnectorSplit> allSplits = getAllSplits(splitSource);

assertEquals(allSplits.size(), 1);
Expand All @@ -119,7 +120,7 @@ public void testNoPredicate()
throws Exception
{
JmxTableHandle tableHandle = new JmxTableHandle(new SchemaTableName("schema", "tableName"), ImmutableList.of("objectName"), ImmutableList.of(columnHandle), true, TupleDomain.all());
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY);
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY, Constraint.alwaysTrue());
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertEquals(allSplits.size(), nodes.size());

Expand Down Expand Up @@ -195,7 +196,7 @@ private RecordSet getRecordSet(SchemaTableName schemaTableName)
JmxTableHandle tableHandle = metadata.getTableHandle(SESSION, schemaTableName);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(SESSION, tableHandle).values());

ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY);
ConnectorSplitSource splitSource = splitManager.getSplits(JmxTransactionHandle.INSTANCE, SESSION, tableHandle, UNGROUPED_SCHEDULING, DynamicFilter.EMPTY, Constraint.alwaysTrue());
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertEquals(allSplits.size(), nodes.size());
ConnectorSplit split = allSplits.get(0);
Expand Down
Loading