Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.slice.Slice;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import io.trino.spi.metrics.Metrics;
import jakarta.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -42,6 +45,7 @@
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
import static io.trino.plugin.exchange.filesystem.MetricsBuilder.SOURCE_FILES_TOTAL;
import static io.trino.spi.exchange.ExchangeSourceOutputSelector.Selection.INCLUDED;
import static java.util.Objects.requireNonNull;

Expand All @@ -66,6 +70,9 @@ public class FileSystemExchangeSource
private final AtomicReference<ListenableFuture<Void>> blocked = new AtomicReference<>();
private final AtomicBoolean closed = new AtomicBoolean();

private final MetricsBuilder metricsBuilder = new MetricsBuilder();
private final CounterMetricBuilder totalFilesMetric = metricsBuilder.getCounterMetric(SOURCE_FILES_TOTAL);

public FileSystemExchangeSource(
FileSystemExchangeStorage exchangeStorage,
FileSystemExchangeStats stats,
Expand All @@ -86,7 +93,9 @@ public synchronized void addSourceHandles(List<ExchangeSourceHandle> handles)
if (closed.get()) {
return;
}
files.addAll(getFiles(handles));
List<ExchangeSourceFile> newFiles = getFiles(handles);
files.addAll(newFiles);
totalFilesMetric.add(newFiles.size());
closeAndCreateReadersIfNecessary();
}

Expand Down Expand Up @@ -289,7 +298,7 @@ private void closeAndCreateReadersIfNecessary()
break;
}
}
activeReaders.add(exchangeStorage.createExchangeStorageReader(readerFiles.build(), maxPageStorageSize));
activeReaders.add(exchangeStorage.createExchangeStorageReader(readerFiles.build(), maxPageStorageSize, metricsBuilder));
}
if (activeReaders.isEmpty()) {
if (noMoreFiles) {
Expand Down Expand Up @@ -338,6 +347,12 @@ private int getNumberOfActiveReaders()
return result;
}

@Override
public Optional<Metrics> getMetrics()
{
return Optional.of(metricsBuilder.buildMetrics());
}

private static List<ExchangeSourceFile> getFiles(List<ExchangeSourceHandle> handles)
{
return handles.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface FileSystemExchangeStorage
void createDirectories(URI dir)
throws IOException;

ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize);
ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize, MetricsBuilder metricsBuilder);

ExchangeStorageWriter createExchangeStorageWriter(URI file);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.exchange.filesystem;

import io.airlift.stats.TDigest;
import io.trino.plugin.base.metrics.LongCount;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.metrics.Metric;
import io.trino.spi.metrics.Metrics;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.collect.ImmutableMap.toImmutableMap;

public class MetricsBuilder
{
public static final String SOURCE_FILES_TOTAL = "FileSystemExchangeSource.filesTotal";
public static final String SOURCE_FILES_PROCESSED = "FileSystemExchangeSource.filesProcessed";

private final ConcurrentMap<String, MetricBuilder> metricBuilders = new ConcurrentHashMap<>();

public Metrics buildMetrics()
{
return new Metrics(
metricBuilders.entrySet().stream()
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> entry.getValue().build())));
}

private interface MetricBuilder
{
Metric<?> build();
}

public CounterMetricBuilder getCounterMetric(String key)
{
return (CounterMetricBuilder) metricBuilders.computeIfAbsent(key, _ -> new CounterMetricBuilder());
}

public DistributionMetricBuilder getDistributionMetric(String key)
{
return (DistributionMetricBuilder) metricBuilders.computeIfAbsent(key, _ -> new DistributionMetricBuilder());
}

public static class CounterMetricBuilder
implements MetricBuilder
{
private final AtomicLong counter = new AtomicLong();

public void increment()
{
counter.incrementAndGet();
}

public void add(long delta)
{
counter.addAndGet(delta);
}

@Override
public Metric<?> build()
{
return new LongCount(counter.get());
}
}

public static class DistributionMetricBuilder
implements MetricBuilder
{
private final TDigest digest = new TDigest();

public synchronized void add(double value)
{
digest.add(value);
}

@Override
public synchronized Metric<?> build()
{
return new TDigestHistogram(TDigest.copyOf(digest));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import io.trino.plugin.exchange.filesystem.MetricsBuilder;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder;
import jakarta.annotation.PreDestroy;
import reactor.core.publisher.Flux;

Expand Down Expand Up @@ -79,6 +81,7 @@
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures.translateFailures;
import static io.trino.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR;
import static io.trino.plugin.exchange.filesystem.MetricsBuilder.SOURCE_FILES_PROCESSED;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.lang.System.arraycopy;
Expand Down Expand Up @@ -124,9 +127,9 @@ public void createDirectories(URI dir)
}

@Override
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize)
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize, MetricsBuilder metricsBuilder)
{
return new AzureExchangeStorageReader(blobServiceAsyncClient, sourceFiles, blockSize, maxPageStorageSize);
return new AzureExchangeStorageReader(blobServiceAsyncClient, sourceFiles, metricsBuilder, blockSize, maxPageStorageSize);
}

@Override
Expand Down Expand Up @@ -279,6 +282,7 @@ private static class AzureExchangeStorageReader
private final Queue<ExchangeSourceFile> sourceFiles;
private final int blockSize;
private final int bufferSize;
CounterMetricBuilder sourceFilesProcessedMetric;

@GuardedBy("this")
private ExchangeSourceFile currentFile;
Expand All @@ -295,11 +299,14 @@ private static class AzureExchangeStorageReader
public AzureExchangeStorageReader(
BlobServiceAsyncClient blobServiceAsyncClient,
List<ExchangeSourceFile> sourceFiles,
MetricsBuilder metricsBuilder,
int blockSize,
int maxPageStorageSize)
{
this.blobServiceAsyncClient = requireNonNull(blobServiceAsyncClient, "blobServiceAsyncClient is null");
this.sourceFiles = new ArrayDeque<>(requireNonNull(sourceFiles, "sourceFiles is null"));
requireNonNull(metricsBuilder, "metricsBuilder is null");
sourceFilesProcessedMetric = metricsBuilder.getCounterMetric(SOURCE_FILES_PROCESSED);
this.blockSize = blockSize;
// Make sure buffer can accommodate at least one complete Slice, and keep reads aligned to block boundaries
this.bufferSize = maxPageStorageSize + blockSize;
Expand Down Expand Up @@ -440,6 +447,7 @@ private void fillBuffer()
}

if (fileOffset == fileSize) {
sourceFilesProcessedMetric.increment();
currentFile = sourceFiles.poll();
if (currentFile == null) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import io.trino.plugin.exchange.filesystem.MetricsBuilder;
import io.trino.plugin.exchange.filesystem.MetricsBuilder.CounterMetricBuilder;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand All @@ -50,6 +52,7 @@
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.trino.plugin.exchange.filesystem.MetricsBuilder.SOURCE_FILES_PROCESSED;
import static java.lang.Math.toIntExact;
import static java.nio.file.Files.createFile;
import static java.util.Objects.requireNonNull;
Expand All @@ -67,9 +70,9 @@ public void createDirectories(URI dir)
}

@Override
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize)
public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, int maxPageStorageSize, MetricsBuilder metricsBuilder)
{
return new LocalExchangeStorageReader(sourceFiles);
return new LocalExchangeStorageReader(sourceFiles, metricsBuilder);
}

@Override
Expand Down Expand Up @@ -140,15 +143,18 @@ private static class LocalExchangeStorageReader

@GuardedBy("this")
private final Queue<ExchangeSourceFile> sourceFiles;
CounterMetricBuilder sourceFilesProcessedMetric;

@GuardedBy("this")
private InputStreamSliceInput sliceInput;
@GuardedBy("this")
private boolean closed;

public LocalExchangeStorageReader(List<ExchangeSourceFile> sourceFiles)
public LocalExchangeStorageReader(List<ExchangeSourceFile> sourceFiles, MetricsBuilder metricsBuilder)
{
this.sourceFiles = new ArrayDeque<>(requireNonNull(sourceFiles, "sourceFiles is null"));
requireNonNull(metricsBuilder, "metricsBuilder is null");
sourceFilesProcessedMetric = metricsBuilder.getCounterMetric(SOURCE_FILES_PROCESSED);
}

@Override
Expand All @@ -165,6 +171,7 @@ public synchronized Slice read()
}
else {
sliceInput.close();
sourceFilesProcessedMetric.increment();
}
}

Expand Down
Loading