From 01a53cfc98a2be709c7c4a09ccc8c2136791da0d Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 23 Jan 2025 11:27:45 +0100 Subject: [PATCH 1/4] Add configuration for maximum Arrow allocation in BigQuery This helps bound memory allocations --- .../BigQueryArrowBufferAllocator.java | 41 +++++++++++ .../plugin/bigquery/BigQueryArrowConfig.java | 36 ++++++++++ .../bigquery/BigQueryConnectorModule.java | 71 +++++++++++-------- .../bigquery/BigQueryPageSourceProvider.java | 4 ++ .../BigQueryStorageArrowPageSource.java | 10 +-- .../bigquery/TestBigQueryArrowConfig.java | 48 +++++++++++++ 6 files changed, 173 insertions(+), 37 deletions(-) create mode 100644 plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java create mode 100644 plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowConfig.java create mode 100644 plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryArrowConfig.java diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java new file mode 100644 index 00000000000..ebad0aa459f --- /dev/null +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.bigquery; + +import com.google.inject.Inject; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; + +import static java.util.Objects.requireNonNull; + +public class BigQueryArrowBufferAllocator +{ + private final long maximumAllocation; + private final BufferAllocator rootAllocator; + + @Inject + public BigQueryArrowBufferAllocator(BigQueryArrowConfig config) + { + this.maximumAllocation = requireNonNull(config, "config is null").getMaxAllocation().toBytes(); + this.rootAllocator = new RootAllocator(maximumAllocation); + } + + public BufferAllocator newChildAllocator(BigQuerySplit split) + { + return rootAllocator.newChildAllocator( + split.streamName(), + split.dataSize().orElse(0), + maximumAllocation); + } +} diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowConfig.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowConfig.java new file mode 100644 index 00000000000..1c3aff33a79 --- /dev/null +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.bigquery; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.DataSize; + +public class BigQueryArrowConfig +{ + private DataSize maxAllocation = DataSize.ofBytes(Integer.MAX_VALUE); + + public DataSize getMaxAllocation() + { + return maxAllocation; + } + + @ConfigDescription("Maximum memory allocation allowed for Arrow library") + @Config("bigquery.arrow-serialization.max-allocation") + public BigQueryArrowConfig setMaxAllocation(DataSize maxAllocation) + { + this.maxAllocation = maxAllocation; + return this; + } +} diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java index e1b1373453c..a044a368e66 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java @@ -84,10 +84,11 @@ protected void setup(Binder binder) binder.bind(ViewMaterializationCache.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(BigQueryConfig.class); configBinder(binder).bindConfig(BigQueryRpcConfig.class); + newOptionalBinder(binder, BigQueryArrowBufferAllocator.class); install(conditionalModule( BigQueryConfig.class, BigQueryConfig::isArrowSerializationEnabled, - ClientModule::verifyPackageAccessAllowed)); + new ArrowSerializationModule())); newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON); newSetBinder(binder, Procedure.class).addBinding().toProvider(ExecuteProcedure.class).in(Scopes.SINGLETON); newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(BigQuerySessionProperties.class).in(Scopes.SINGLETON); @@ -141,34 +142,6 @@ public ExecutorService provideExecutor(CatalogName catalogName) { return 
newCachedThreadPool(daemonThreadsNamed("bigquery-" + catalogName + "-%s")); } - - /** - * Apache Arrow requires reflective access to certain Java internals prohibited since Java 17. - * Adds an error to the {@code binder} if required --add-opens is not passed to the JVM. - */ - private static void verifyPackageAccessAllowed(Binder binder) - { - // Match an --add-opens argument that opens a package to unnamed modules. - // The first group is the opened package. - Pattern argPattern = Pattern.compile( - "^--add-opens=(.*)=([A-Za-z0-9_.]+,)*ALL-UNNAMED(,[A-Za-z0-9_.]+)*$"); - // We don't need to check for values in separate arguments because - // they are joined with "=" before we get them. - - Set openedModules = ManagementFactory.getRuntimeMXBean() - .getInputArguments() - .stream() - .map(argPattern::matcher) - .filter(Matcher::matches) - .map(matcher -> matcher.group(1)) - .collect(toSet()); - - if (!openedModules.contains("java.base/java.nio")) { - binder.addError( - "BigQuery connector requires additional JVM arguments to run when '" + ARROW_SERIALIZATION_ENABLED + "' is enabled. " + - "Please add '--add-opens=java.base/java.nio=ALL-UNNAMED' to the JVM configuration."); - } - } } public static class StaticCredentialsModule @@ -200,4 +173,44 @@ protected void setup(Binder binder) } } } + + public static class ArrowSerializationModule + extends AbstractConfigurationAwareModule + { + @Override + protected void setup(Binder binder) + { + verifyPackageAccessAllowed(binder); + configBinder(binder).bindConfig(BigQueryArrowConfig.class); + binder.bind(BigQueryArrowBufferAllocator.class).in(Scopes.SINGLETON); + } + + /** + * Apache Arrow requires reflective access to certain Java internals prohibited since Java 17. + * Adds an error to the {@code binder} if required --add-opens is not passed to the JVM. + */ + private static void verifyPackageAccessAllowed(Binder binder) + { + // Match an --add-opens argument that opens a package to unnamed modules. 
+ // The first group is the opened package. + Pattern argPattern = Pattern.compile( + "^--add-opens=(.*)=([A-Za-z0-9_.]+,)*ALL-UNNAMED(,[A-Za-z0-9_.]+)*$"); + // We don't need to check for values in separate arguments because + // they are joined with "=" before we get them. + + Set openedModules = ManagementFactory.getRuntimeMXBean() + .getInputArguments() + .stream() + .map(argPattern::matcher) + .filter(Matcher::matches) + .map(matcher -> matcher.group(1)) + .collect(toSet()); + + if (!openedModules.contains("java.base/java.nio")) { + binder.addError( + "BigQuery connector requires additional JVM arguments to run when '" + ARROW_SERIALIZATION_ENABLED + "' is enabled. " + + "Please add '--add-opens=java.base/java.nio=ALL-UNNAMED' to the JVM configuration."); + } + } + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index ea66223c3e9..af70e00d9b0 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -45,6 +45,7 @@ public class BigQueryPageSourceProvider private final int maxReadRowsRetries; private final boolean arrowSerializationEnabled; private final ExecutorService executor; + private final Optional arrowBufferAllocator; @Inject public BigQueryPageSourceProvider( @@ -52,6 +53,7 @@ public BigQueryPageSourceProvider( BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryTypeManager typeManager, BigQueryConfig config, + Optional arrowBufferAllocator, @ForBigQueryPageSource ExecutorService executor) { this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory is null"); @@ -60,6 +62,7 @@ public BigQueryPageSourceProvider( this.maxReadRowsRetries = config.getMaxReadRowsRetries(); this.arrowSerializationEnabled = 
config.isArrowSerializationEnabled(); this.executor = requireNonNull(executor, "executor is null"); + this.arrowBufferAllocator = requireNonNull(arrowBufferAllocator, "arrowBufferAllocator is null"); } @Override @@ -111,6 +114,7 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, Bi typeManager, bigQueryReadClientFactory.create(session), executor, + arrowBufferAllocator.orElseThrow(() -> new IllegalStateException("ArrowBufferAllocator was not bound")), maxReadRowsRetries, split, columnHandles); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java index 0970add77e0..3bfab5472a8 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java @@ -21,7 +21,6 @@ import io.trino.spi.PageBuilder; import io.trino.spi.connector.ConnectorPageSource; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.ipc.message.MessageSerializer; @@ -47,12 +46,6 @@ public class BigQueryStorageArrowPageSource { private static final Logger log = Logger.get(BigQueryStorageArrowPageSource.class); - private static final BufferAllocator allocator = new RootAllocator(RootAllocator - .configBuilder() - .from(RootAllocator.defaultConfig()) - .maxAllocation(Integer.MAX_VALUE) - .build()); - private final AtomicLong readBytes = new AtomicLong(); private final AtomicLong readTimeNanos = new AtomicLong(); private final BigQueryReadClient bigQueryReadClient; @@ -70,6 +63,7 @@ public BigQueryStorageArrowPageSource( BigQueryTypeManager typeManager, BigQueryReadClient bigQueryReadClient, 
ExecutorService executor, + BigQueryArrowBufferAllocator bufferAllocator, int maxReadRowsRetries, BigQuerySplit split, List columns) @@ -83,7 +77,7 @@ public BigQueryStorageArrowPageSource( log.debug("Starting to read from %s", split.streamName()); responses = new ReadRowsHelper(bigQueryReadClient, split.streamName(), maxReadRowsRetries).readRows(); nextResponse = CompletableFuture.supplyAsync(this::getResponse, executor); - this.streamBufferAllocator = allocator.newChildAllocator(split.streamName(), 1024, Long.MAX_VALUE); + this.streamBufferAllocator = bufferAllocator.newChildAllocator(split); this.bigQueryArrowToPageConverter = new BigQueryArrowToPageConverter(typeManager, streamBufferAllocator, schema, columns); this.pageBuilder = new PageBuilder(columns.stream() .map(BigQueryColumnHandle::trinoType) diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryArrowConfig.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryArrowConfig.java new file mode 100644 index 00000000000..e227614c51c --- /dev/null +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryArrowConfig.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.bigquery; + +import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.units.DataSize.Unit.GIGABYTE; + +class TestBigQueryArrowConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(BigQueryArrowConfig.class) + .setMaxAllocation(DataSize.ofBytes(Integer.MAX_VALUE))); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("bigquery.arrow-serialization.max-allocation", "1GB") + .buildOrThrow(); + + BigQueryArrowConfig expected = new BigQueryArrowConfig() + .setMaxAllocation(DataSize.of(1, GIGABYTE)); + + assertFullMapping(properties, expected); + } +} From 70444f09aed4d6a6636b60dde048ac08cc1d19f6 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 23 Jan 2025 11:32:41 +0100 Subject: [PATCH 2/4] Log failed buffer allocations --- .../BigQueryArrowBufferAllocator.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java index ebad0aa459f..3d2d759d81d 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java @@ -14,6 +14,9 @@ package io.trino.plugin.bigquery; import com.google.inject.Inject; +import io.airlift.log.Logger; +import org.apache.arrow.memory.AllocationListener; +import 
org.apache.arrow.memory.AllocationOutcome; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -21,6 +24,8 @@ public class BigQueryArrowBufferAllocator { + private static final Logger log = Logger.get(BigQueryArrowBufferAllocator.class); + private final long maximumAllocation; private final BufferAllocator rootAllocator; @@ -35,7 +40,29 @@ public BufferAllocator newChildAllocator(BigQuerySplit split) { return rootAllocator.newChildAllocator( split.streamName(), + new RetryingAllocationListener(split.streamName()), split.dataSize().orElse(0), maximumAllocation); } + + private static class RetryingAllocationListener + implements AllocationListener + { + private final String name; + + private RetryingAllocationListener(String name) + { + this.name = requireNonNull(name, "name is null"); + } + + @Override + public boolean onFailedAllocation(long size, AllocationOutcome outcome) + { + log.warn("Failed to allocate %d bytes for allocator '%s' due to %s", size, name, outcome.getStatus()); + outcome.getDetails().ifPresent(details -> { + log.warn("Allocation failure details: %s", details.toString()); + }); + return false; + } + } } From a8a74c0e4595ae0c8d430e2003ca6f55e22f11c8 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 23 Jan 2025 11:43:46 +0100 Subject: [PATCH 3/4] Add Arrow allocation stats to BigQuery --- plugin/trino-bigquery/pom.xml | 10 +++ .../bigquery/BigQueryArrowAllocatorStats.java | 82 +++++++++++++++++++ .../BigQueryArrowBufferAllocator.java | 65 ++++++++++++++- .../bigquery/BigQueryConnectorFactory.java | 6 ++ .../bigquery/BigQueryConnectorModule.java | 5 ++ 5 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowAllocatorStats.java diff --git a/plugin/trino-bigquery/pom.xml b/plugin/trino-bigquery/pom.xml index 4169be64c34..e154593db89 100644 --- a/plugin/trino-bigquery/pom.xml +++ 
b/plugin/trino-bigquery/pom.xml @@ -223,6 +223,11 @@ log + + io.airlift + stats + + io.airlift units @@ -323,6 +328,11 @@ threetenbp + + org.weakref + jmxutils + + com.fasterxml.jackson.core jackson-annotations diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowAllocatorStats.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowAllocatorStats.java new file mode 100644 index 00000000000..6f7c2e1e066 --- /dev/null +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowAllocatorStats.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.bigquery; + +import io.airlift.stats.CounterStat; +import org.apache.arrow.memory.AllocationListener; +import org.apache.arrow.memory.AllocationOutcome; +import org.weakref.jmx.Managed; +import org.weakref.jmx.Nested; + +public class BigQueryArrowAllocatorStats + implements AllocationListener +{ + private final CounterStat preAllocatedMemory = new CounterStat(); + private final CounterStat allocatedMemory = new CounterStat(); + private final CounterStat releasedMemory = new CounterStat(); + private final CounterStat failedMemory = new CounterStat(); + + @Managed + @Nested + public CounterStat getAllocatedMemory() + { + return allocatedMemory; + } + + @Managed + @Nested + public CounterStat getReleasedMemory() + { + return releasedMemory; + } + + @Managed + @Nested + public CounterStat getPreAllocatedMemory() + { + return preAllocatedMemory; + } + + @Managed + @Nested + public CounterStat getFailedMemory() + { + return failedMemory; + } + + @Override + public void onAllocation(long size) + { + allocatedMemory.update(size); + } + + @Override + public void onRelease(long size) + { + releasedMemory.update(size); + } + + @Override + public void onPreAllocation(long size) + { + preAllocatedMemory.update(size); + } + + @Override + public boolean onFailedAllocation(long size, AllocationOutcome outcome) + { + failedMemory.update(size); + return false; + } +} diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java index 3d2d759d81d..64076f0b76f 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowBufferAllocator.java @@ -19,6 +19,8 @@ import org.apache.arrow.memory.AllocationOutcome; import org.apache.arrow.memory.BufferAllocator; import 
org.apache.arrow.memory.RootAllocator; +import org.weakref.jmx.Flatten; +import org.weakref.jmx.Managed; import static java.util.Objects.requireNonNull; @@ -27,42 +29,97 @@ public class BigQueryArrowBufferAllocator private static final Logger log = Logger.get(BigQueryArrowBufferAllocator.class); private final long maximumAllocation; + private final BigQueryArrowAllocatorStats stats; private final BufferAllocator rootAllocator; @Inject - public BigQueryArrowBufferAllocator(BigQueryArrowConfig config) + public BigQueryArrowBufferAllocator(BigQueryArrowConfig config, BigQueryArrowAllocatorStats stats) { this.maximumAllocation = requireNonNull(config, "config is null").getMaxAllocation().toBytes(); - this.rootAllocator = new RootAllocator(maximumAllocation); + this.stats = requireNonNull(stats, "stats is null"); + this.rootAllocator = new RootAllocator(stats, maximumAllocation); } public BufferAllocator newChildAllocator(BigQuerySplit split) { return rootAllocator.newChildAllocator( split.streamName(), - new RetryingAllocationListener(split.streamName()), + new RetryingAllocationListener(split.streamName(), stats), split.dataSize().orElse(0), maximumAllocation); } + @Managed + @Flatten + public BigQueryArrowAllocatorStats getStats() + { + return stats; + } + + @Managed + public long getCurrentAllocatedMemory() + { + return rootAllocator.getAllocatedMemory(); + } + + @Managed + public long getPeakAllocatedMemory() + { + return rootAllocator.getPeakMemoryAllocation(); + } + + @Managed + public long getCurrentMemoryHeadroom() + { + return rootAllocator.getHeadroom(); + } + + @Managed + public long getChildAllocatorsCount() + { + return rootAllocator.getChildAllocators().size(); + } + private static class RetryingAllocationListener implements AllocationListener { private final String name; + private final BigQueryArrowAllocatorStats stats; - private RetryingAllocationListener(String name) + private RetryingAllocationListener(String name, BigQueryArrowAllocatorStats stats) 
{ this.name = requireNonNull(name, "name is null"); + this.stats = requireNonNull(stats, "stats is null"); } @Override public boolean onFailedAllocation(long size, AllocationOutcome outcome) { + stats.onFailedAllocation(size, outcome); + log.warn("Failed to allocate %d bytes for allocator '%s' due to %s", size, name, outcome.getStatus()); outcome.getDetails().ifPresent(details -> { log.warn("Allocation failure details: %s", details.toString()); }); return false; } + + @Override + public void onPreAllocation(long size) + { + stats.onPreAllocation(size); + } + + @Override + public void onAllocation(long size) + { + stats.onAllocation(size); + } + + @Override + public void onRelease(long size) + { + stats.onRelease(size); + } } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java index 71e19df49e9..13aaf7daec4 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java @@ -18,12 +18,15 @@ import io.airlift.json.JsonModule; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; +import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule; +import io.trino.plugin.base.jmx.MBeanServerModule; import io.trino.spi.NodeManager; import io.trino.spi.catalog.CatalogName; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; import io.trino.spi.type.TypeManager; +import org.weakref.jmx.guice.MBeanModule; import java.util.Map; @@ -49,6 +52,9 @@ public Connector create(String catalogName, Map config, Connecto Bootstrap app = new Bootstrap( new JsonModule(), new BigQueryConnectorModule(), + new MBeanServerModule(), + new MBeanModule(), + new 
ConnectorObjectNameGeneratorModule("io.trino.plugin.bigquery", "trino.plugin.bigquery"), binder -> { binder.bind(TypeManager.class).toInstance(context.getTypeManager()); binder.bind(NodeManager.class).toInstance(context.getNodeManager()); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java index a044a368e66..dd77ccab7bd 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java @@ -51,6 +51,7 @@ import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.stream.Collectors.toSet; +import static org.weakref.jmx.guice.ExportBinder.newExporter; public class BigQueryConnectorModule extends AbstractConfigurationAwareModule @@ -181,8 +182,12 @@ public static class ArrowSerializationModule protected void setup(Binder binder) { verifyPackageAccessAllowed(binder); + configBinder(binder).bindConfig(BigQueryArrowConfig.class); binder.bind(BigQueryArrowBufferAllocator.class).in(Scopes.SINGLETON); + binder.bind(BigQueryArrowAllocatorStats.class).in(Scopes.SINGLETON); + + newExporter(binder).export(BigQueryArrowBufferAllocator.class).withGeneratedName(); } /** From f0afdf2707604c32fe0fe5514a58b2f27d0853ba Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 23 Jan 2025 13:45:03 +0100 Subject: [PATCH 4/4] Add allocator stats to PageSource metrics --- .../bigquery/BigQueryStorageArrowPageSource.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java index 3bfab5472a8..7330e61c955 100644 
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageArrowPageSource.java @@ -15,11 +15,14 @@ import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import io.airlift.log.Logger; +import io.trino.plugin.base.metrics.LongCount; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.metrics.Metrics; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -147,6 +150,14 @@ public CompletableFuture isBlocked() return nextResponse; } + @Override + public Metrics getMetrics() + { + return new Metrics(ImmutableMap.of( + "ArrowAllocatedMemory", new LongCount(streamBufferAllocator.getAllocatedMemory()), + "ArrowPeakAllocatedMemory", new LongCount(streamBufferAllocator.getPeakMemoryAllocation()))); + } + private ReadRowsResponse getResponse() { long start = System.nanoTime();