diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConnector.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConnector.java index f45e75851ed3..6032add6db8d 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConnector.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConnector.java @@ -28,6 +28,7 @@ import io.prestosql.spi.connector.ConnectorSplitManager; import io.prestosql.spi.connector.ConnectorTransactionHandle; import io.prestosql.spi.connector.SystemTable; +import io.prestosql.spi.eventlistener.EventListener; import io.prestosql.spi.procedure.Procedure; import io.prestosql.spi.session.PropertyMetadata; import io.prestosql.spi.transaction.IsolationLevel; @@ -52,6 +53,7 @@ public class HiveConnector private final ConnectorNodePartitioningProvider nodePartitioningProvider; private final Set systemTables; private final Set procedures; + private final Set eventListeners; private final List> sessionProperties; private final List> schemaProperties; private final List> tableProperties; @@ -72,6 +74,7 @@ public HiveConnector( ConnectorNodePartitioningProvider nodePartitioningProvider, Set systemTables, Set procedures, + Set eventListeners, List> sessionProperties, List> schemaProperties, List> tableProperties, @@ -88,6 +91,7 @@ public HiveConnector( this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null")); this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null")); + this.eventListeners = ImmutableSet.copyOf(requireNonNull(eventListeners, "eventListeners is null")); this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); this.schemaProperties = ImmutableList.copyOf(requireNonNull(schemaProperties, "schemaProperties is null")); this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null")); @@ -170,6 +174,12 @@ public List> getTableProperties() return tableProperties; } + @Override + public Iterable getEventListeners() + { + return eventListeners; + } + @Override public ConnectorAccessControl getAccessControl() { diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/InternalHiveConnectorFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/InternalHiveConnectorFactory.java index 245cac99f750..493223eafc51 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/InternalHiveConnectorFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/InternalHiveConnectorFactory.java @@ -26,6 +26,7 @@ import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider; import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider; import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager; +import io.prestosql.plugin.base.classloader.ClassLoaderSafeEventListener; import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider; import io.prestosql.plugin.base.jmx.ConnectorObjectNameGeneratorModule; import io.prestosql.plugin.base.jmx.MBeanServerModule; @@ -54,6 +55,7 @@ import io.prestosql.spi.connector.ConnectorPageSourceProvider; import io.prestosql.spi.connector.ConnectorSplitManager; import io.prestosql.spi.connector.SystemTable; +import io.prestosql.spi.eventlistener.EventListener; import io.prestosql.spi.procedure.Procedure; import io.prestosql.spi.type.TypeManager; import org.weakref.jmx.guice.MBeanModule; @@ -62,6 +64,8 @@ import java.util.Optional; import java.util.Set; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConditionalModule.installModuleIf; import static java.util.Objects.requireNonNull; @@ -104,6 +108,7 @@ public static Connector createConnector(String catalogName, Map binder.bind(PageSorter.class).toInstance(context.getPageSorter()); binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); }, + binder -> newSetBinder(binder, EventListener.class), module); Injector injector = app @@ -133,6 +138,10 @@ public static Connector createConnector(String catalogName, Map classLoader); Set procedures = injector.getInstance(Key.get(new TypeLiteral>() {})); Set systemTables = injector.getInstance(Key.get(new TypeLiteral>() {})); + Set eventListeners = injector.getInstance(Key.get(new TypeLiteral>() {})) + .stream() + .map(listener -> new ClassLoaderSafeEventListener(listener, classLoader)) + .collect(toImmutableSet()); return new HiveConnector( lifeCycleManager, @@ -144,6 +153,7 @@ public static Connector createConnector(String catalogName, Map new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader), systemTables, procedures, + eventListeners, hiveSessionProperties.getSessionProperties(), HiveSchemaProperties.SCHEMA_PROPERTIES, hiveTableProperties.getTableProperties(), diff --git a/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeEventListener.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeEventListener.java new file mode 100644 index 000000000000..d44f97d248e8 --- /dev/null +++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeEventListener.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.base.classloader; + +import io.prestosql.spi.classloader.ThreadContextClassLoader; +import io.prestosql.spi.eventlistener.EventListener; +import io.prestosql.spi.eventlistener.QueryCompletedEvent; +import io.prestosql.spi.eventlistener.QueryCreatedEvent; +import io.prestosql.spi.eventlistener.SplitCompletedEvent; + +import static java.util.Objects.requireNonNull; + +public class ClassLoaderSafeEventListener + implements EventListener +{ + private final EventListener delegate; + private final ClassLoader classLoader; + + public ClassLoaderSafeEventListener(@ForClassLoaderSafe EventListener delegate, ClassLoader classLoader) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public void queryCreated(QueryCreatedEvent queryCreatedEvent) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.queryCreated(queryCreatedEvent); + } + } + + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.queryCompleted(queryCompletedEvent); + } + } + + @Override + public void splitCompleted(SplitCompletedEvent splitCompletedEvent) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + delegate.splitCompleted(splitCompletedEvent); + } + } +} diff --git a/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/TestClassLoaderSafeWrappers.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/TestClassLoaderSafeWrappers.java index e95c51928d6a..1cfb363e90d6 100644 --- a/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/TestClassLoaderSafeWrappers.java +++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/TestClassLoaderSafeWrappers.java @@ -24,6 +24,7 @@ import io.prestosql.spi.connector.ConnectorSplitSource; import io.prestosql.spi.connector.RecordSet; import io.prestosql.spi.connector.SystemTable; +import io.prestosql.spi.eventlistener.EventListener; import org.testng.annotations.Test; import static io.prestosql.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; @@ -44,5 +45,6 @@ public void testAllMethodsOverridden() assertAllMethodsOverridden(SystemTable.class, ClassLoaderSafeSystemTable.class); assertAllMethodsOverridden(ConnectorRecordSetProvider.class, ClassLoaderSafeConnectorRecordSetProvider.class); assertAllMethodsOverridden(RecordSet.class, ClassLoaderSafeRecordSet.class); + assertAllMethodsOverridden(EventListener.class, ClassLoaderSafeEventListener.class); } }