Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions lib/trino-hadoop-toolkit/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
</properties>

<dependencies>

<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
*/
package io.trino.hadoop;

import io.trino.spi.classloader.ThreadContextClassLoader;
import org.apache.hadoop.conf.Configuration;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import static com.google.common.base.Preconditions.checkState;

public final class ConfigurationInstantiator
{
private ConfigurationInstantiator() {}
Expand All @@ -37,17 +36,11 @@ public static Configuration newConfigurationWithDefaultResources()

private static Configuration newConfiguration(boolean loadDefaults)
{
// Configuration captures TCCL and it may used later e.g. to load filesystem implementation class
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
ClassLoader expectedClassLoader = ConfigurationInstantiator.class.getClassLoader();
checkState(
tccl == expectedClassLoader,
"During instantiation, the Configuration object captures the TCCL and uses it to resolve classes by name. " +
"For this reason, the current TCCL %s should be same as this class's classloader %s. " +
"Otherwise the constructed Configuration will use *some* classloader to resolve classes",
tccl,
expectedClassLoader);
return newConfigurationWithTccl(loadDefaults);
// Ensure that the context class loader used while instantiating the `Configuration` object corresponds to the
// class loader of the `ConfigurationInstantiator`
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(ConfigurationInstantiator.class.getClassLoader())) {
return newConfigurationWithTccl(loadDefaults);
}
}

// Usage of `new Configuration(boolean)` is not allowed. Only ConfigurationInstantiator
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.base.classloader;

import io.airlift.slice.Slice;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

import static java.util.Objects.requireNonNull;

public class ClassLoaderSafeUpdatablePageSource
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test with TestClassLoaderSafeWrappers

implements UpdatablePageSource
{
private final UpdatablePageSource delegate;
private final ClassLoader classLoader;

public ClassLoaderSafeUpdatablePageSource(UpdatablePageSource delegate, ClassLoader classLoader)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
}

@Override
public long getCompletedBytes()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getCompletedBytes();
}
}

@Override
public OptionalLong getCompletedPositions()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getCompletedPositions();
}
}

@Override
public long getReadTimeNanos()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getReadTimeNanos();
}
}

@Override
public boolean isFinished()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.isFinished();
}
}

@Override
public Page getNextPage()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getNextPage();
}
}

@Override
public long getMemoryUsage()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getMemoryUsage();
}
}

@Override
public void close()
throws IOException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.close();
}
}

@Override
public CompletableFuture<?> isBlocked()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.isBlocked();
}
}

@Override
public Metrics getMetrics()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getMetrics();
}
}

@Override
public void deleteRows(Block rowIds)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.deleteRows(rowIds);
}
}

@Override
public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.updateRows(page, columnValueAndRowIdChannels);
}
}

@Override
public CompletableFuture<Collection<Slice>> finish()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.finish();
}
}

@Override
public void abort()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.abort();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.RecordSet;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.ptf.ConnectorTableFunction;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -51,6 +52,7 @@ public void test()
testClassLoaderSafe(ConnectorSplitManager.class, ClassLoaderSafeConnectorSplitManager.class);
testClassLoaderSafe(ConnectorNodePartitioningProvider.class, ClassLoaderSafeNodePartitioningProvider.class);
testClassLoaderSafe(ConnectorSplitSource.class, ClassLoaderSafeConnectorSplitSource.class);
testClassLoaderSafe(UpdatablePageSource.class, ClassLoaderSafeUpdatablePageSource.class);
testClassLoaderSafe(SystemTable.class, ClassLoaderSafeSystemTable.class);
testClassLoaderSafe(ConnectorRecordSetProvider.class, ClassLoaderSafeConnectorRecordSetProvider.class);
testClassLoaderSafe(RecordSet.class, ClassLoaderSafeRecordSet.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.parquet.predicate.Predicate;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.ParquetReader;
import io.trino.plugin.base.classloader.ClassLoaderSafeUpdatablePageSource;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
Expand Down Expand Up @@ -336,17 +337,19 @@ public ConnectorPageSource createPageSource(
table.getStorageProperties(),
maxOpenPartitions);

return new IcebergPageSource(
tableSchema,
icebergColumns,
requiredColumns,
readColumns,
dataPageSource.get(),
projectionsAdapter,
Optional.of(deleteFilter).filter(filter -> filter.hasPosDeletes() || filter.hasEqDeletes()),
positionDeleteSink,
updatedRowPageSinkSupplier,
table.getUpdatedColumns());
return new ClassLoaderSafeUpdatablePageSource(
new IcebergPageSource(
tableSchema,
icebergColumns,
requiredColumns,
readColumns,
dataPageSource.get(),
projectionsAdapter,
Optional.of(deleteFilter).filter(filter -> filter.hasPosDeletes() || filter.hasEqDeletes()),
positionDeleteSink,
updatedRowPageSinkSupplier,
table.getUpdatedColumns()),
getClass().getClassLoader());
}

private ReaderPageSource createDataPageSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,21 @@
*/
package io.trino.plugin.phoenix5;

import io.trino.spi.classloader.ThreadContextClassLoader;
import org.apache.hadoop.conf.Configuration;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import static com.google.common.base.Preconditions.checkState;

final class ConfigurationInstantiator
{
private ConfigurationInstantiator() {}

public static Configuration newEmptyConfiguration()
{
// Configuration captures TCCL and it may used later e.g. to load filesystem implementation class
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
ClassLoader expectedClassLoader = ConfigurationInstantiator.class.getClassLoader();
checkState(
tccl == expectedClassLoader,
"During instantiation, the Configuration object captures the TCCL and uses it to resolve classes by name. " +
"For this reason, the current TCCL %s should be same as this class's classloader %s. " +
"Otherwise the constructed Configuration will use *some* classloader to resolve classes",
tccl,
expectedClassLoader);
return newConfigurationWithTccl();
// Ensure that the context class loader used while instantiating the `Configuration` object corresponds to the
// class loader of the `ConfigurationInstantiator`
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(ConfigurationInstantiator.class.getClassLoader())) {
return newConfigurationWithTccl();
}
}

// Usage of `new Configuration(boolean)` is not allowed. Only ConfigurationInstantiator
Expand Down
Loading