Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ public class BaseHiveTableLayoutHandle
private final TupleDomain<ColumnHandle> partitionColumnPredicate;

// coordinator-only properties
private final Optional<List<HivePartition>> partitions;
private final Optional<PartitionSet> partitions;

public BaseHiveTableLayoutHandle(
List<BaseHiveColumnHandle> partitionColumns,
TupleDomain<Subfield> domainPredicate,
RowExpression remainingPredicate,
boolean pushdownFilterEnabled,
TupleDomain<ColumnHandle> partitionColumnPredicate,
Optional<List<HivePartition>> partitions)
Optional<PartitionSet> partitions)
{
this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null"));
this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null");
Expand Down Expand Up @@ -91,7 +91,7 @@ public TupleDomain<ColumnHandle> getPartitionColumnPredicate()
* @return list of partitions if available, {@code Optional.empty()} if dropped
*/
@JsonIgnore
public Optional<List<HivePartition>> getPartitions()
public Optional<PartitionSet> getPartitions()
{
return partitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,21 @@ public final class MetadataUtils

private MetadataUtils() {}

public static Optional<DiscretePredicates> getDiscretePredicates(List<ColumnHandle> partitionColumns, List<HivePartition> partitions)
public static Optional<DiscretePredicates> getDiscretePredicates(List<ColumnHandle> partitionColumns, Iterable<HivePartition> partitions)
{
Optional<DiscretePredicates> discretePredicates = Optional.empty();
if (!partitionColumns.isEmpty() && !(partitions.size() == 1 && partitions.get(0).getPartitionId().equals(UNPARTITIONED_ID))) {
if (!partitionColumns.isEmpty()) {
// Do not create tuple domains for every partition at the same time!
// There can be a huge number of partitions so use an iterable so
// all domains do not need to be in memory at the same time.
Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(partitions, (hivePartition) -> TupleDomain.fromFixedValues(hivePartition.getKeys()));
Iterable<TupleDomain<ColumnHandle>> partitionDomains = Iterables.transform(partitions, (hivePartition) -> {
if (hivePartition.getPartitionId().equals(UNPARTITIONED_ID)) {
return TupleDomain.all();
}
else {
return TupleDomain.fromFixedValues(hivePartition.getKeys());
}
});
discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains));
}
return discretePredicates;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.ThreadSafe;

import java.util.Iterator;
import java.util.List;

import static java.util.Objects.requireNonNull;

@ThreadSafe
public class PartitionSet
implements Iterable<HivePartition>
{
private volatile boolean fullyLoaded;
private PartitionLoader partitionLoader;
private List<HivePartition> partitions;

public PartitionSet(List<HivePartition> partitions)
{
this.partitions = requireNonNull(partitions, "partitions is null");
this.fullyLoaded = true;
}

public PartitionSet(PartitionLoader partitionLoader)
{
this.partitionLoader = requireNonNull(partitionLoader, "partitionLoader is null");
}

@Override
public Iterator<HivePartition> iterator()
{
return new LazyIterator(this);
}

public List<HivePartition> getFullyLoadedPartitions()
{
tryFullyLoad();
return partitions;
}

public boolean isEmpty()
{
if (fullyLoaded) {
return partitions.isEmpty();
}
else {
synchronized (this) {
if (fullyLoaded) {
return partitions.isEmpty();
}
return partitionLoader.isEmpty();
}
}
}

private void tryFullyLoad()
{
if (!fullyLoaded) {
synchronized (this) {
if (!fullyLoaded) {
partitions = ImmutableList.copyOf(partitionLoader.loadPartitions());
fullyLoaded = true;
partitionLoader = null;
}
}
}
}

public interface PartitionLoader
{
List<HivePartition> loadPartitions();

boolean isEmpty();
}

private static class LazyIterator
extends AbstractIterator<HivePartition>
{
private final PartitionSet lazyPartitions;
private List<HivePartition> partitions;
private int position = -1;

private LazyIterator(PartitionSet lazyPartitions)
{
this.lazyPartitions = lazyPartitions;
}

@Override
protected HivePartition computeNext()
{
if (partitions == null) {
partitions = lazyPartitions.getFullyLoadedPartitions();
}

position++;
if (position >= partitions.size()) {
return endOfData();
}
return partitions.get(position);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ public Optional<Object> getInfo(ConnectorTableLayoutHandle layoutHandle)
HiveTableLayoutHandle tableLayoutHandle = (HiveTableLayoutHandle) layoutHandle;
if (tableLayoutHandle.getPartitions().isPresent()) {
return Optional.of(new HiveInputInfo(
tableLayoutHandle.getPartitions().get().stream()
tableLayoutHandle.getPartitions().map(PartitionSet::getFullyLoadedPartitions).get().stream()
.map(hivePartition -> hivePartition.getPartitionId().getPartitionName())
.collect(toList()),
false, tableLayoutHandle.getTablePath()));
Expand Down Expand Up @@ -2643,13 +2643,13 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
private List<HivePartition> getOrComputePartitions(HiveTableLayoutHandle layoutHandle, ConnectorSession session, ConnectorTableHandle tableHandle)
{
if (layoutHandle.getPartitions().isPresent()) {
return layoutHandle.getPartitions().get();
return layoutHandle.getPartitions().map(PartitionSet::getFullyLoadedPartitions).get();
}
else {
TupleDomain<ColumnHandle> partitionColumnPredicate = layoutHandle.getPartitionColumnPredicate();
Predicate<Map<ColumnHandle, NullableValue>> predicate = convertToPredicate(partitionColumnPredicate);
ConnectorTableLayoutResult tableLayoutResult = getTableLayoutForConstraint(session, tableHandle, new Constraint<>(partitionColumnPredicate, predicate), Optional.empty());
return ((HiveTableLayoutHandle) tableLayoutResult.getTableLayout().getHandle()).getPartitions().get();
return ((HiveTableLayoutHandle) tableLayoutResult.getTableLayout().getHandle()).getPartitions().map(PartitionSet::getFullyLoadedPartitions).get();
}
}

Expand Down Expand Up @@ -2813,7 +2813,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
{
HiveTableLayoutHandle hiveLayoutHandle = (HiveTableLayoutHandle) layoutHandle;
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(hiveLayoutHandle.getPartitionColumns());
List<HivePartition> partitions = hiveLayoutHandle.getPartitions().get();
List<HivePartition> partitions = hiveLayoutHandle.getPartitions().map(PartitionSet::getFullyLoadedPartitions).get();

Optional<DiscretePredicates> discretePredicates = getDiscretePredicates(partitionColumns, partitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public ConnectorSplitSource getSplits(

// get partitions
List<HivePartition> partitions = layout.getPartitions()
.map(PartitionSet::getFullyLoadedPartitions)
.orElseThrow(() -> new PrestoException(GENERIC_INTERNAL_ERROR, "Layout does not contain partitions"));

// short circuit if we don't have any partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public class HiveTableLayoutHandle
private final boolean footerStatsUnreliable;

// coordinator-only properties
private final Optional<List<HivePartition>> partitions;
private final Optional<HiveTableHandle> hiveTableHandle;

/**
Expand Down Expand Up @@ -143,7 +142,7 @@ protected HiveTableLayoutHandle(
remainingPredicate,
pushdownFilterEnabled,
partitionColumnPredicate,
partitions);
partitions.map(PartitionSet::new));

this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.tablePath = requireNonNull(tablePath, "tablePath is null");
Expand All @@ -165,7 +164,6 @@ else if (predicateColumns.values().stream().anyMatch(column -> isRowIdColumnHand
this.appendRowId = false;
}
this.appendRowNumberEnabled = appendRowNumberEnabled;
this.partitions = requireNonNull(partitions, "partitions is null");
this.footerStatsUnreliable = footerStatsUnreliable;
this.hiveTableHandle = requireNonNull(hiveTableHandle, "hiveTableHandle is null");
}
Expand Down Expand Up @@ -300,7 +298,7 @@ private TupleDomain<ColumnHandle> getConstraint(PlanCanonicalizationStrategy can
// Constants are only removed from point checks, and not range checks. Example:
// `x = 1` is equivalent to `x = 1000`
// `x > 1` is NOT equivalent to `x > 1000`
TupleDomain<ColumnHandle> constraint = createPredicate(ImmutableList.copyOf(getPartitionColumns()), partitions.get());
TupleDomain<ColumnHandle> constraint = createPredicate(ImmutableList.copyOf(getPartitionColumns()), getPartitions().map(PartitionSet::getFullyLoadedPartitions).get());
constraint = getDomainPredicate()
.transform(subfield -> subfield.getPath().isEmpty() ? subfield.getRootName() : null)
.transform(getPredicateColumns()::get)
Expand Down Expand Up @@ -363,7 +361,7 @@ public Builder builder()
.setRequestedColumns(getRequestedColumns())
.setPartialAggregationsPushedDown(isPartialAggregationsPushedDown())
.setAppendRowNumberEnabled(isAppendRowNumberEnabled())
.setPartitions(getPartitions())
.setPartitions(getPartitions().map(PartitionSet::getFullyLoadedPartitions))
.setFooterStatsUnreliable(isFooterStatsUnreliable())
.setHiveTableHandle(getHiveTableHandle());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,7 +1658,7 @@ protected void assertExpectedTableLayoutHandle(ConnectorTableLayoutHandle actual
assertInstanceOf(expectedTableLayoutHandle, HiveTableLayoutHandle.class);
HiveTableLayoutHandle actual = (HiveTableLayoutHandle) actualTableLayoutHandle;
HiveTableLayoutHandle expected = (HiveTableLayoutHandle) expectedTableLayoutHandle;
assertExpectedPartitions(actual.getPartitions().get(), expected.getPartitions().get());
assertExpectedPartitions(actual.getPartitions().map(PartitionSet::getFullyLoadedPartitions).get(), expected.getPartitions().get());
}

protected void assertExpectedPartitions(List<HivePartition> actualPartitions, Iterable<HivePartition> expectedPartitions)
Expand Down Expand Up @@ -5364,6 +5364,7 @@ protected static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSou
protected List<?> getAllPartitions(ConnectorTableLayoutHandle layoutHandle)
{
return ((HiveTableLayoutHandle) layoutHandle).getPartitions()
.map(PartitionSet::getFullyLoadedPartitions)
.orElseThrow(() -> new AssertionError("layout has no partitions"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ private void createTable(MetastoreContext metastoreContext, SchemaTableName tabl
// verify the data
ConnectorTableLayoutResult tableLayoutResult = metadata.getTableLayoutForConstraint(session, hiveTableHandle, Constraint.alwaysTrue(), Optional.empty());
HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) tableLayoutResult.getTableLayout().getHandle();
assertEquals(layoutHandle.getPartitions().get().size(), 1);
assertEquals(layoutHandle.getPartitions().map(PartitionSet::getFullyLoadedPartitions).get().size(), 1);
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, layoutHandle, SPLIT_SCHEDULING_CONTEXT);
ConnectorSplit split = getOnlyElement(getAllSplits(splitSource));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ protected QueryRunner createQueryRunner()

private List<?> getPartitions(HiveTableLayoutHandle tableLayoutHandle)
{
return tableLayoutHandle.getPartitions().get();
return tableLayoutHandle.getPartitions().map(PartitionSet::getFullyLoadedPartitions).get();
}

@Test
Expand Down
6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.hive.HiveOutputMetadata;
import com.facebook.presto.hive.HivePartition;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.PartitionSet;
import com.facebook.presto.hive.UnknownTableTypeException;
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.iceberg.changelog.ChangelogUtil;
Expand Down Expand Up @@ -137,6 +138,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
Expand Down Expand Up @@ -368,10 +370,10 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().orElse(ImmutableMap.of()), Predicates.in(partitionColumns)));
Optional<Set<IcebergColumnHandle>> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet()));

List<HivePartition> partitions;
PartitionSet partitions;
if (handle.getIcebergTableName().getTableType() == CHANGELOG ||
handle.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
partitions = ImmutableList.of(new HivePartition(handle.getSchemaTableName()));
partitions = new PartitionSet(ImmutableList.of(new HivePartition(handle.getSchemaTableName())));
}
else {
RuntimeStats runtimeStats = session.getRuntimeStats();
Expand All @@ -395,7 +397,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
.setRequestedColumns(requestedColumns)
.setPushdownFilterEnabled(isPushdownFilterEnabled(session))
.setPartitionColumnPredicate(partitionColumnPredicate.simplify())
.setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions))
.setPartitions(Optional.ofNullable(partitions.isEmpty() ? null : partitions))
.setTable(handle)
.build());
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
Expand All @@ -419,7 +421,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
Table icebergTable = getIcebergTable(session, tableHandle.getSchemaTableName());
validateTableMode(session, icebergTable);
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns());
Optional<List<HivePartition>> partitions = icebergTableLayoutHandle.getPartitions();
Optional<? extends Iterable<HivePartition>> partitions = icebergTableLayoutHandle.getPartitions();
Optional<DiscretePredicates> discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts));
if (!isPushdownFilterEnabled(session)) {
return new ConnectorTableLayout(
Expand All @@ -435,7 +437,11 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa

Map<String, ColumnHandle> predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet()
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Optional<TupleDomain<ColumnHandle>> predicate = partitions.map(parts -> getPredicate(icebergTableLayoutHandle, partitionColumns, parts, predicateColumns));
Optional<TupleDomain<ColumnHandle>> predicate = partitions
.map(parts ->
getPredicate(icebergTableLayoutHandle, partitionColumns,
StreamSupport.stream(parts.spliterator(), false).toList(),
predicateColumns));
// capture subfields from domainPredicate to add to remainingPredicate
// so those filters don't get lost
Map<String, com.facebook.presto.common.type.Type> columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream()
Expand Down
Loading
Loading