Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public ConnectorSplitSource getSplits(
DynamicFilter dynamicFilter,
Constraint constraint)
{
return new FixedSplitSource(ImmutableList.of(MOCK_CONNECTOR_SPLIT));
return new FixedSplitSource(MOCK_CONNECTOR_SPLIT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ public class FixedSplitSource
private final Optional<List<Object>> tableExecuteSplitsInfo;
private int offset;

public static FixedSplitSource emptySplitSource()
{
return new FixedSplitSource(List.of());
}

public FixedSplitSource(ConnectorSplit split)
{
this(List.of(split));
}

public FixedSplitSource(Iterable<? extends ConnectorSplit> splits)
{
this(splits, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected static Optional<ColumnMapping> mapToUnboundedVarchar(JdbcTypeHandle ty
@Override
public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle tableHandle)
{
return new FixedSplitSource(ImmutableList.of(new JdbcSplit(Optional.empty())));
return new FixedSplitSource(new JdbcSplit(Optional.empty()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public ConnectorSplitSource getSplits(

if (!bigQueryTableHandle.isNamedRelation()) {
List<BigQueryColumnHandle> columns = bigQueryTableHandle.getProjectedColumns().orElse(ImmutableList.of());
return new FixedSplitSource(ImmutableList.of(BigQuerySplit.forViewStream(columns, filter)));
return new FixedSplitSource(BigQuerySplit.forViewStream(columns, filter));
}

TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Set;

import static io.trino.plugin.cassandra.CassandraSessionProperties.getSplitsPerNode;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -103,7 +104,7 @@ public ConnectorSplitSource getSplits(

if (partitions.isEmpty()) {
log.debug("No partitions matched predicates for table %s", connectorTableHandle);
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

// if this is an unpartitioned table, split into equal ranges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxSplitSize;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.deserializePartitionValue;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -104,7 +105,7 @@ public ConnectorSplitSource getSplits(
if (deltaLakeTableHandle.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

DeltaLakeSplitSource splitSource = new DeltaLakeSplitSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.elasticsearch;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.elasticsearch.client.ElasticsearchClient;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
Expand Down Expand Up @@ -55,7 +54,7 @@ public ConnectorSplitSource getSplits(
ElasticsearchTableHandle tableHandle = (ElasticsearchTableHandle) table;

if (tableHandle.getType().equals(QUERY)) {
return new FixedSplitSource(ImmutableList.of(new ElasticsearchSplit(tableHandle.getIndex(), 0, Optional.empty())));
return new FixedSplitSource(new ElasticsearchSplit(tableHandle.getIndex(), 0, Optional.empty()));
}
List<ElasticsearchSplit> splits = client.getSearchShards(tableHandle.getIndex()).stream()
.map(shard -> new ElasticsearchSplit(shard.getIndex(), shard.getId(), shard.getAddress()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Collections.emptyIterator;
Expand Down Expand Up @@ -225,7 +226,7 @@ public ConnectorSplitSource getSplits(
if (hiveTable.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

// get buckets from first partition (arbitrary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static io.trino.spi.connector.FixedSplitSource.emptySplitSource;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
Expand Down Expand Up @@ -66,7 +67,7 @@ public ConnectorSplitSource getSplits(
if (table.isRecordScannedFiles()) {
return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
}
return new FixedSplitSource(ImmutableList.of());
return emptySplitSource();
}

Table icebergTable = transactionManager.get(transaction, session.getIdentity()).getIcebergTable(session, table.getSchemaTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.mongodb;

import com.google.common.collect.ImmutableList;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
Expand Down Expand Up @@ -49,6 +48,6 @@ public ConnectorSplitSource getSplits(
{
MongoSplit split = new MongoSplit(addresses);

return new FixedSplitSource(ImmutableList.of(split));
return new FixedSplitSource(split);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import static io.trino.plugin.pinot.PinotSplit.createSegmentSplit;
import static io.trino.spi.ErrorType.USER_ERROR;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;

public class PinotSplitManager
Expand All @@ -66,7 +65,7 @@ public PinotSplitManager(PinotClient pinotClient)

protected ConnectorSplitSource generateSplitForBrokerBasedScan(PinotTableHandle pinotTableHandle)
{
return new FixedSplitSource(singletonList(createBrokerSplit()));
return new FixedSplitSource(createBrokerSplit());
}

protected ConnectorSplitSource generateSplitsForSegmentBasedScan(
Expand Down