diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionProcessorProvider.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionProcessorProvider.java new file mode 100644 index 000000000000..51f54e99ea67 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionProcessorProvider.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.classloader; + +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.function.table.ConnectorTableFunctionHandle; +import io.trino.spi.function.table.TableFunctionDataProcessor; +import io.trino.spi.function.table.TableFunctionProcessorProvider; +import io.trino.spi.function.table.TableFunctionSplitProcessor; + +import static java.util.Objects.requireNonNull; + +public final class ClassLoaderSafeTableFunctionProcessorProvider + implements TableFunctionProcessorProvider +{ + private final TableFunctionProcessorProvider delegate; + private final ClassLoader classLoader; + + public ClassLoaderSafeTableFunctionProcessorProvider(TableFunctionProcessorProvider delegate, ClassLoader classLoader) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public TableFunctionDataProcessor getDataProcessor(ConnectorTableFunctionHandle handle) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getDataProcessor(handle); + } + } + + @Override + public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.getSplitProcessor(session, handle, split); + } + } +} diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionSplitProcessor.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionSplitProcessor.java new file mode 100644 index 000000000000..07cfe2b1ec45 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionSplitProcessor.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.classloader; + +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.function.table.TableFunctionProcessorState; +import io.trino.spi.function.table.TableFunctionSplitProcessor; + +import static java.util.Objects.requireNonNull; + +public final class ClassLoaderSafeTableFunctionSplitProcessor + implements TableFunctionSplitProcessor +{ + private final TableFunctionSplitProcessor delegate; + private final ClassLoader classLoader; + + public ClassLoaderSafeTableFunctionSplitProcessor(TableFunctionSplitProcessor delegate, ClassLoader classLoader) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public TableFunctionProcessorState process() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.process(); + } + } +} diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java index 6e9bb520cf34..d0834ec00fa6 100644 --- a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java @@ -27,6 +27,8 @@ import io.trino.spi.connector.SystemTable; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.function.table.ConnectorTableFunction; +import io.trino.spi.function.table.TableFunctionProcessorProvider; +import io.trino.spi.function.table.TableFunctionSplitProcessor; import org.junit.jupiter.api.Test; import java.lang.reflect.Method; @@ -58,6 +60,8 @@ public void test() testClassLoaderSafe(RecordSet.class, ClassLoaderSafeRecordSet.class); testClassLoaderSafe(EventListener.class, ClassLoaderSafeEventListener.class); testClassLoaderSafe(ConnectorTableFunction.class, ClassLoaderSafeConnectorTableFunction.class); + testClassLoaderSafe(TableFunctionSplitProcessor.class, ClassLoaderSafeTableFunctionSplitProcessor.class); + testClassLoaderSafe(TableFunctionProcessorProvider.class, ClassLoaderSafeTableFunctionProcessorProvider.class); } private static void testClassLoaderSafe(Class iface, Class clazz) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeFunctionProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeFunctionProvider.java index de4732d88d34..4e541200b7e1 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeFunctionProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeFunctionProvider.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake; import com.google.inject.Inject; +import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider; import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider; import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle; import io.trino.spi.function.FunctionProvider; @@ -37,7 +38,7 @@ public DeltaLakeFunctionProvider(TableChangesProcessorProvider tableChangesProce public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle) { if (functionHandle instanceof TableChangesTableFunctionHandle) { - return tableChangesProcessorProvider; + return new ClassLoaderSafeTableFunctionProcessorProvider(tableChangesProcessorProvider, getClass().getClassLoader()); } throw new UnsupportedOperationException("Unsupported function: " + functionHandle); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesProcessorProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesProcessorProvider.java index 5a250e87f73e..b0a7c74b4545 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesProcessorProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesProcessorProvider.java @@ -16,6 +16,7 @@ import com.google.inject.Inject; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.parquet.ParquetReaderOptions; +import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor; import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.parquet.ParquetReaderConfig; @@ -54,7 +55,7 @@ public TableChangesProcessorProvider( @Override public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split) { - return new TableChangesFunctionProcessor( + return new ClassLoaderSafeTableFunctionSplitProcessor(new TableChangesFunctionProcessor( session, fileSystemFactory, parquetDateTimeZone, @@ -62,6 +63,7 @@ public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, C fileFormatDataSourceStats, parquetReaderOptions, (TableChangesTableFunctionHandle) handle, - (TableChangesSplit) split); + (TableChangesSplit) split), + getClass().getClassLoader()); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java index 6771a8216ade..b1c4116ad26e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java @@ -70,7 +70,7 @@ public class HiveConnector private final HiveTransactionManager transactionManager; private final Set connectorTableFunctions; - private final Optional functionProvider; + private final FunctionProvider functionProvider; private final boolean singleStatementWritesOnly; public HiveConnector( @@ -92,7 +92,7 @@ public HiveConnector( List> materializedViewProperties, Optional accessControl, Set connectorTableFunctions, - Optional functionProvider, + FunctionProvider functionProvider, boolean singleStatementWritesOnly, ClassLoader classLoader) { @@ -243,7 +243,7 @@ public final void shutdown() @Override public Optional getFunctionProvider() { - return functionProvider; + return Optional.of(functionProvider); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 2f0e6a6b97ea..878f6bf62257 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -19,7 +19,6 @@ import com.google.inject.Provides; import com.google.inject.Scopes; import com.google.inject.Singleton; -import com.google.inject.TypeLiteral; import com.google.inject.multibindings.Multibinder; import io.airlift.event.client.EventClient; import io.trino.plugin.base.CatalogName; @@ -57,7 +56,6 @@ import io.trino.spi.function.FunctionProvider; import io.trino.spi.function.table.ConnectorTableFunction; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -149,7 +147,7 @@ public void configure(Binder binder) configBinder(binder).bindConfig(ParquetWriterConfig.class); fileWriterFactoryBinder.addBinding().to(ParquetFileWriterFactory.class).in(Scopes.SINGLETON); - newOptionalBinder(binder, new TypeLiteral>(){}).setDefault().toInstance(Optional.empty()); + newOptionalBinder(binder, FunctionProvider.class).setDefault().toInstance(new NoopFunctionProvider()); newSetBinder(binder, ConnectorTableFunction.class); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java index 995606d56797..6dda13819bec 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java @@ -174,7 +174,7 @@ public static Connector createConnector( hiveMaterializedViewPropertiesProvider.getMaterializedViewProperties(), hiveAccessControl, injector.getInstance(Key.get(new TypeLiteral>() {})), - injector.getInstance(Key.get(new TypeLiteral>() {})), + injector.getInstance(FunctionProvider.class), injector.getInstance(HiveConfig.class).isSingleStatementWritesOnly(), classLoader); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/NoopFunctionProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/NoopFunctionProvider.java new file mode 100644 index 000000000000..f994b7a96c3f --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/NoopFunctionProvider.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import io.trino.spi.function.FunctionProvider; + +public class NoopFunctionProvider + implements FunctionProvider +{} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java index 57bbea9479df..dac202aacf5d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.functions; import com.google.inject.Inject; +import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProvider; import io.trino.spi.function.FunctionProvider; @@ -37,7 +38,7 @@ public IcebergFunctionProvider(TableChangesFunctionProcessorProvider tableChange public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle) { if (functionHandle instanceof TableChangesFunctionHandle) { - return tableChangesFunctionProcessorProvider; + return new ClassLoaderSafeTableFunctionProcessorProvider(tableChangesFunctionProcessorProvider, getClass().getClassLoader()); } throw new UnsupportedOperationException("Unsupported function: " + functionHandle); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java index b052b722b44d..1121046bff33 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.functions.tablechanges; import com.google.inject.Inject; +import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor; import io.trino.plugin.iceberg.IcebergPageSourceProvider; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; @@ -40,10 +41,11 @@ public TableFunctionSplitProcessor getSplitProcessor( ConnectorTableFunctionHandle handle, ConnectorSplit split) { - return new TableChangesFunctionProcessor( + return new ClassLoaderSafeTableFunctionSplitProcessor(new TableChangesFunctionProcessor( session, (TableChangesFunctionHandle) handle, (TableChangesSplit) split, - icebergPageSourceProvider); + icebergPageSourceProvider), + getClass().getClassLoader()); } }