From 0bb298f477dedd85a9145c6b6b285fabc51b4e1f Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 2 Nov 2024 20:13:15 +0000 Subject: [PATCH 1/4] adds optional per table metrics For a subset of metrics in the tablet server and scan server adds optional tableId tags to meters. In a follow on change the compactor could be updated to emit per table metrics, however its current code is very process oriented and this change should be in its own commit. Each server process will automatically remove meters for tables that were deleted or related to tables it has not been servicing in a while. --- .../apache/accumulo/core/conf/Property.java | 4 + pom.xml | 2 +- .../server/metrics/PerTableMetrics.java | 203 +++++++++++ .../accumulo/tserver/AssignmentHandler.java | 2 + .../accumulo/tserver/OnlineTablets.java | 31 ++ .../apache/accumulo/tserver/ScanServer.java | 9 +- .../accumulo/tserver/TabletClientHandler.java | 15 +- .../apache/accumulo/tserver/TabletServer.java | 25 +- .../tserver/ThriftScanClientHandler.java | 21 +- .../tserver/metrics/TabletServerMetrics.java | 92 +++-- .../metrics/TabletServerMetricsUtil.java | 80 ++--- .../metrics/TabletServerMinCMetrics.java | 68 +++- .../metrics/TabletServerScanMetrics.java | 166 +++++---- .../metrics/TabletServerUpdateMetrics.java | 97 ++++-- .../tserver/tablet/ScanDataSource.java | 2 +- .../tserver/tablet/SnapshotTablet.java | 2 +- .../accumulo/tserver/tablet/Tablet.java | 6 +- .../accumulo/tserver/tablet/TabletBase.java | 12 +- .../accumulo/test/metrics/MetricsIT.java | 7 +- .../test/metrics/PerTableMetricsIT.java | 329 ++++++++++++++++++ 20 files changed, 968 insertions(+), 205 deletions(-) create mode 100644 server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java create mode 100644 test/src/main/java/org/apache/accumulo/test/metrics/PerTableMetricsIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 
669d812f38e..ff489cc2c03 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -330,6 +330,10 @@ public enum Property { PropertyType.TIMEDURATION, "The maximum amount of time that a Scanner should wait before retrying a failed RPC.", "1.7.3"), + GENERAL_MICROMETER_TABLE_METRICS_ENABLED("general.micrometer.table.metrics.enabled", "false", + PropertyType.BOOLEAN, + "Enables per table metrics for a subset of meters. Turning this on will add tableId tags to some meters which will increase the cardinality of metrics.", + "4.0.0"), GENERAL_MICROMETER_CACHE_METRICS_ENABLED("general.micrometer.cache.metrics.enabled", "false", PropertyType.BOOLEAN, "Enables Caffeine Cache metrics functionality using Micrometer.", "4.0.0"), diff --git a/pom.xml b/pom.xml index dedaefcc6d0..b43b5a62e3e 100644 --- a/pom.xml +++ b/pom.xml @@ -169,7 +169,7 @@ io.micrometer micrometer-bom - 1.12.2 + 1.13.6 pom import diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java new file mode 100644 index 00000000000..c3cc2ecee5d --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metrics; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.server.ServerContext; +import org.slf4j.Logger; + +import com.google.common.base.Preconditions; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; + +/** + * Common code for dealing with per table metrics. This code handles automatically creating and + * deleting per table metrics as needed. To use this class extend and implement + * {@link #newAllTablesMetrics(MeterRegistry, Consumer, List)} and + * {@link #newPerTableMetrics(MeterRegistry, TableId, Consumer, List)} to create per table metrics + * object and then use {@link #getTableMetrics(TableId)} to get those cached objects. 
+ */ +public abstract class PerTableMetrics implements MetricsProducer { + + public static final String TABLE_ID_TAG_NAME = "tableId"; + + private final ServerContext context; + + private static class TableMetricsInfo { + final T2 tableMetrics; + volatile Timer inactiveTime; + final List meters; + + public TableMetricsInfo(T2 tableMetrics, List meters) { + this.tableMetrics = Objects.requireNonNull(tableMetrics); + this.meters = meters; + } + } + + private final boolean perTableActive; + private final Supplier> activeTables; + private final ConcurrentHashMap> perTableMetrics = + new ConcurrentHashMap<>(); + private T allTableMetrics; + private volatile MeterRegistry registry; + + public PerTableMetrics(ServerContext context, Supplier> activeTableSupplier) { + activeTables = activeTableSupplier; + perTableActive = + context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_TABLE_METRICS_ENABLED); + this.context = context; + if (perTableActive) { + context.getScheduledExecutor().scheduleAtFixedRate(this::refresh, 30, 30, TimeUnit.SECONDS); + } + } + + /** + * This method exists so this class can log using the logger of the subclass. + */ + protected abstract Logger getLog(); + + /** + * Subclasses should implement this method to create a TableMetrics object that will be used in + * the case when per table metrics are disabled. The object returned by this method will always be + * returned by {@link #getTableMetrics(TableId)} no matter what the table id is. + * + * @param registry register any meters for the table metrics in this registry + * @param meters a consumer that accepts meters to be removed from the registry when the table + * metrics object is discarded. 
Currently this consumer does nothing with the meters, it's + * passed for consistency with + * {@link #newPerTableMetrics(MeterRegistry, TableId, Consumer, List)} + * @param tags currently an empty collection of tags, this is passed for consistency with + * {@link #PerTableMetrics(ServerContext, Supplier)} + * @return a new object that will be cached and later returned by + * {@link #getTableMetrics(TableId)} + */ + protected abstract T newAllTablesMetrics(MeterRegistry registry, Consumer meters, + List tags); + + /** + * + * Subclasses should implement this method to create per table metrics objects. This method + * is called in the case where per table metrics are enabled. These objects will be cached and + * returned by {@link #getTableMetrics(TableId)}. Table metrics objects in the cache that are no + * longer needed will be automatically removed when the table is deleted or this server has not + * hosted the table for a bit. + * + * @param registry register any meters for the table metrics in this registry + * @param meters a consumer that accepts meters to be removed from the registry when the per table + * metrics object is discarded. + * @param tags returns a list with a single tag in it which is the tableId. 
These tags should be + * used when registering meters + * @return a new object that will be cached and later returned by + * {@link #getTableMetrics(TableId)} + */ + protected abstract T newPerTableMetrics(MeterRegistry registry, TableId tableId, + Consumer meters, List tags); + + private TableMetricsInfo getOrCreateTableMetrics(TableId tableId) { + Preconditions.checkState(perTableActive); + return perTableMetrics.computeIfAbsent(tableId, tid -> { + List meters = new ArrayList<>(); + T tableMetrics = newPerTableMetrics(registry, tableId, meters::add, + List.of(Tag.of(TABLE_ID_TAG_NAME, tid.canonical()))); + getLog().debug("Created {} meters for table id {} in metrics registry.", meters.size(), + tableId); + return new TableMetricsInfo<>(tableMetrics, meters); + }); + } + + public void registerMetrics(MeterRegistry registry) { + Preconditions.checkState(this.registry == null); + this.registry = registry; + if (!perTableActive) { + this.allTableMetrics = newAllTablesMetrics(registry, m -> {}, List.of()); + } + } + + public T getTableMetrics(TableId tableId) { + Preconditions.checkState(registry != null); + + if (!perTableActive) { + return allTableMetrics; + } + + return getOrCreateTableMetrics(tableId).tableMetrics; + } + + /** + * This method will create per table metrics for any tables that are active on this server and + * currently have no table metrics object in the cache. It will also remove an per table metrics + * object from the cache that have been inactive for a while or where the table was deleted. 
+ */ + public synchronized void refresh() { + if (!perTableActive || registry == null) { + return; + } + + var currentActive = activeTables.get(); + + currentActive.forEach(tid -> { + // This registers metrics for the table if none are currently registered and resets the + // inactiveTime if one exists + getOrCreateTableMetrics(tid).inactiveTime = null; + }); + + // clean up any tables that have been inactive for a bit + var iter = perTableMetrics.entrySet().iterator(); + while (iter.hasNext()) { + var entry = iter.next(); + var tableId = entry.getKey(); + if (!currentActive.contains(tableId)) { + var tableMetricsInfo = entry.getValue(); + var tableState = context.getTableManager().getTableState(tableId); + if (tableState == null || tableState == TableState.DELETING) { + // immediately remove deleted tables + iter.remove(); + tableMetricsInfo.meters.forEach(registry::remove); + getLog().debug( + "Removed {} meters for table id {} from metrics registry because table was deleted.", + tableMetricsInfo.meters.size(), tableId); + } else if (tableMetricsInfo.inactiveTime == null) { + // the first time this table was seen as inactive so start a timer for removal + tableMetricsInfo.inactiveTime = Timer.startNew(); + } else if (tableMetricsInfo.inactiveTime.hasElapsed(10, TimeUnit.MINUTES)) { + iter.remove(); + tableMetricsInfo.meters.forEach(registry::remove); + getLog().debug( + "Removed {} meters for table id {} from metrics registry because table was inactive.", + tableMetricsInfo.meters.size(), tableId); + } + } + } + } +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index ec56611a03e..8eee32e2a0a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@ -170,6 +170,8 @@ public void run() { } } + 
server.refreshMetrics(extent.tableId()); + tablet = null; // release this reference successful = true; } catch (Exception e) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/OnlineTablets.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/OnlineTablets.java index 9478deda328..99bef01d453 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/OnlineTablets.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/OnlineTablets.java @@ -19,9 +19,14 @@ package org.apache.accumulo.tserver; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.tserver.tablet.Tablet; @@ -34,19 +39,45 @@ */ public class OnlineTablets { private volatile SortedMap snapshot = Collections.emptySortedMap(); + private final AtomicReference>> perTableSnapshot = + new AtomicReference<>(null); private final SortedMap onlineTablets = new TreeMap<>(); public synchronized void put(KeyExtent ke, Tablet t) { onlineTablets.put(ke, t); snapshot = ImmutableSortedMap.copyOf(onlineTablets); + perTableSnapshot.set(null); } public synchronized void remove(KeyExtent ke) { onlineTablets.remove(ke); snapshot = ImmutableSortedMap.copyOf(onlineTablets); + perTableSnapshot.set(null); } SortedMap snapshot() { return snapshot; } + + private static Map> + createPerTableSnapshot(SortedMap snapshot) { + var tables = new HashMap>(); + snapshot.forEach(((keyExtent, tablet) -> { + tables.computeIfAbsent(keyExtent.tableId(), tableId -> new HashMap<>()).put(keyExtent, + tablet); + })); + return tables.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, + entry -> ImmutableSortedMap.copyOf(entry.getValue()))); + } + + Map> perTableSnapshot() { + var 
snap = perTableSnapshot.get(); + if (snap != null) { + return snap; + } else { + snap = createPerTableSnapshot(snapshot); + perTableSnapshot.compareAndSet(null, snap); + return snap; + } + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 021ee3dbde9..4188875f542 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -55,6 +56,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; import org.apache.accumulo.core.dataImpl.thrift.InitialScan; @@ -409,7 +411,12 @@ public void run() { MetricsInfo metricsInfo = getContext().getMetricsInfo(); - scanMetrics = new TabletServerScanMetrics(resourceManager::getOpenFiles); + // The following will read through everything in the cache which could update the access time of + // everything which is not desired for this use case, however the cache is expire after write + // and not expire after access so its probably ok. 
+ Supplier> activeTables = () -> tabletMetadataCache.asMap().keySet().stream() + .map(KeyExtent::tableId).collect(Collectors.toSet()); + scanMetrics = new TabletServerScanMetrics(context, activeTables, resourceManager::getOpenFiles); sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); scanServerMetrics = new ScanServerMetrics(tabletMetadataCache); blockCacheMetrics = new BlockCacheMetrics(resourceManager.getIndexCache(), diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 10da2c5f4e8..0e377914e3e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -153,7 +153,6 @@ public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdura // Make sure user is real Durability durability = DurabilityImpl.fromThrift(tdurabilty); security.authenticateUser(credentials, credentials); - server.updateMetrics.addPermissionErrors(0); UpdateSession us = new UpdateSession(new TservConstraintEnv(server.getContext(), security, credentials), @@ -190,7 +189,7 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { // not serving tablet, so report all mutations as // failures us.failures.put(keyExtent, 0L); - server.updateMetrics.addUnknownTabletErrors(0); + server.updateMetrics.addUnknownTabletErrors(keyExtent.tableId(), 1); } } else { log.warn("Denying access to table {} for user {}", keyExtent.tableId(), us.getUser()); @@ -198,7 +197,7 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { us.authTimes.addStat(t2 - t1); us.currentTablet = null; us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED); - server.updateMetrics.addPermissionErrors(0); + server.updateMetrics.addPermissionErrors(keyExtent.tableId(), 1); } } catch 
(TableNotFoundException tnfe) { log.error("Table " + tableId + " not found ", tnfe); @@ -206,7 +205,7 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { us.authTimes.addStat(t2 - t1); us.currentTablet = null; us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST); - server.updateMetrics.addUnknownTabletErrors(0); + server.updateMetrics.addUnknownTabletErrors(keyExtent.tableId(), 1); } catch (ThriftSecurityException e) { log.error("Denying permission to check user " + us.getUser() + " with user " + e.getUser(), e); @@ -214,7 +213,7 @@ private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { us.authTimes.addStat(t2 - t1); us.currentTablet = null; us.authFailures.put(keyExtent, e.getCode()); - server.updateMetrics.addPermissionErrors(0); + server.updateMetrics.addPermissionErrors(keyExtent.tableId(), 1); } } @@ -292,7 +291,9 @@ private void flush(UpdateSession us) { List mutations = entry.getValue(); if (!mutations.isEmpty()) { try { - server.updateMetrics.addMutationArraySize(mutations.size()); + // TODO this metrics seems very expensive because of the update frequency + server.updateMetrics.addMutationArraySize(tablet.getExtent().tableId(), + mutations.size()); PreparedMutations prepared = tablet.prepareMutationsForCommit(us.cenv, mutations); @@ -313,7 +314,7 @@ private void flush(UpdateSession us) { if (!prepared.getViolations().isEmpty()) { us.violations.add(prepared.getViolations()); - server.updateMetrics.addConstraintViolations(0); + server.updateMetrics.addConstraintViolations(tablet.getExtent().tableId(), 1); } // Use the size of the original mutation list, regardless of how many mutations // did not violate constraints. 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 0310a5b41bd..f81f5d16f76 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -171,6 +171,14 @@ public class TabletServer extends AbstractServer implements TabletHostingServer PausedCompactionMetrics pausedMetrics; BlockCacheMetrics blockCacheMetrics; + public void refreshMetrics(TableId tableId) { + // setup per table metrics for tables if not already setup + metrics.getTableMetrics(tableId); + scanMetrics.getTableMetrics(tableId); + updateMetrics.getTableMetrics(tableId); + mincMetrics.getTableMetrics(tableId); + } + @Override public TabletServerScanMetrics getScanMetrics() { return scanMetrics; @@ -584,10 +592,13 @@ public void run() { MetricsInfo metricsInfo = context.getMetricsInfo(); metrics = new TabletServerMetrics(this); - updateMetrics = new TabletServerUpdateMetrics(); - scanMetrics = new TabletServerScanMetrics(this.resourceManager::getOpenFiles); + updateMetrics = + new TabletServerUpdateMetrics(context, () -> onlineTablets.perTableSnapshot().keySet()); + scanMetrics = new TabletServerScanMetrics(context, + () -> onlineTablets.perTableSnapshot().keySet(), this.resourceManager::getOpenFiles); sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); - mincMetrics = new TabletServerMinCMetrics(); + mincMetrics = + new TabletServerMinCMetrics(context, () -> onlineTablets.perTableSnapshot().keySet()); pausedMetrics = new PausedCompactionMetrics(); blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(), this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache()); @@ -940,6 +951,14 @@ public SortedMap getOnlineTablets() { return onlineTablets.snapshot(); } + public SortedMap getOnlineTablets(TableId tableId) { + 
return onlineTablets.perTableSnapshot().getOrDefault(tableId, Collections.emptySortedMap()); + } + + public Set getOnlineTableIds() { + return onlineTablets.perTableSnapshot().keySet(); + } + @Override public Tablet getOnlineTablet(KeyExtent extent) { Tablet t = onlineTablets.snapshot().get(extent); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index 9344b1c4ce4..72e95c1ecd9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -159,9 +159,8 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, KeyExtent ex ThriftSecurityException, org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException, TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementStartScan(); - TableId tableId = extent.tableId(); + server.getScanMetrics().incrementStartScan(tableId); NamespaceId namespaceId; try { namespaceId = server.getContext().getNamespaceId(tableId); @@ -252,7 +251,7 @@ protected ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession sc org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException, TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementContinueScan(); + server.getScanMetrics().incrementContinueScan(scanSession.getTableId()); if (scanSession.getScanTask() == null) { scanSession.setScanTask(new NextBatchTask(server, scanID, scanSession.interruptFlag)); @@ -328,11 +327,10 @@ protected ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession sc @Override public void closeScan(TInfo tinfo, long scanID) { - server.getScanMetrics().incrementCloseScan(); - final SingleScanSession ss = (SingleScanSession) server.getSessionManager().removeSession(scanID); if (ss != null) { + 
server.getScanMetrics().incrementCloseScan(ss.getTableId()); long t2 = System.currentTimeMillis(); if (log.isTraceEnabled()) { @@ -341,8 +339,8 @@ public void closeScan(TInfo tinfo, long scanID) { (t2 - ss.startTime) / 1000.0, ss.runStats.toString())); } - server.getScanMetrics().addScan(t2 - ss.startTime); - server.getScanMetrics().addResult(ss.entriesReturned); + server.getScanMetrics().addScan(ss.getTableId(), t2 - ss.startTime); + server.getScanMetrics().addResult(ss.getTableId(), ss.entriesReturned); } } @@ -380,8 +378,6 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, long busyTimeout) throws ThriftSecurityException, TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementStartScan(); - // find all of the tables that need to be scanned final HashSet tables = new HashSet<>(); for (KeyExtent keyExtent : tbatch.keySet()) { @@ -394,6 +390,7 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, // check if user has permission to the tables for (TableId tableId : tables) { + server.getScanMetrics().incrementStartScan(tableId); NamespaceId namespaceId = getNamespaceId(credentials, tableId); if (!security.canScan(credentials, tableId, namespaceId)) { throw new ThriftSecurityException(credentials.getPrincipal(), @@ -474,7 +471,7 @@ public MultiScanResult continueMultiScan(TInfo tinfo, long scanID, long busyTime private MultiScanResult continueMultiScan(long scanID, MultiScanSession session, long busyTimeout) throws TSampleNotPresentException, ScanServerBusyException { - server.getScanMetrics().incrementContinueScan(); + server.getScanMetrics().incrementContinueScan(session.getTableId()); if (session.getScanTask() == null) { session.setScanTask(new LookupTask(server, scanID)); @@ -522,13 +519,13 @@ private MultiScanResult continueMultiScan(long scanID, MultiScanSession session, @Override public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { - 
server.getScanMetrics().incrementCloseScan(); - MultiScanSession session = (MultiScanSession) server.getSessionManager().removeSession(scanID); if (session == null) { throw new NoSuchScanIDException(); } + server.getScanMetrics().incrementCloseScan(session.getTableId()); + long t2 = System.currentTimeMillis(); if (log.isTraceEnabled()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index 68798e6d67f..f0499b1e028 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@ -37,21 +37,39 @@ import static org.apache.accumulo.core.metrics.Metric.TSERVER_TABLETS_OPENING; import static org.apache.accumulo.core.metrics.Metric.TSERVER_TABLETS_UNOPENED; +import java.util.List; +import java.util.function.Consumer; + +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.server.compaction.CompactionWatcher; import org.apache.accumulo.server.compaction.FileCompactor; +import org.apache.accumulo.server.metrics.PerTableMetrics; import org.apache.accumulo.tserver.TabletServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.LongTaskTimer; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; -public class TabletServerMetrics implements MetricsProducer { +public class TabletServerMetrics extends PerTableMetrics + implements MetricsProducer { private final TabletServerMetricsUtil util; + private static final Logger log = LoggerFactory.getLogger(TabletServerMetrics.class); + + @Override + 
protected Logger getLog() { + return log; + } + public TabletServerMetrics(TabletServer tserver) { + super(tserver.getContext(), tserver::getOnlineTableIds); util = new TabletServerMetricsUtil(tserver); } @@ -63,8 +81,55 @@ private long getTotalEntriesWritten() { return FileCompactor.getTotalEntriesWritten(); } + public static class TableMetrics { + TableMetrics(TabletServerMetricsUtil util, TableId tableId, MeterRegistry registry, + Consumer meters, List tags) { + + meters.accept(Gauge.builder(TSERVER_ENTRIES.getName(), util, tsmu -> tsmu.getEntries(tableId)) + .description(TSERVER_ENTRIES.getDescription()).tags(tags).register(registry)); + meters.accept(Gauge + .builder(TSERVER_MEM_ENTRIES.getName(), util, tsmu -> tsmu.getEntriesInMemory(tableId)) + .description(TSERVER_MEM_ENTRIES.getDescription()).tags(tags).register(registry)); + meters.accept(Gauge + .builder(TSERVER_MINC_RUNNING.getName(), util, tsmu -> tsmu.getMinorCompactions(tableId)) + .description(TSERVER_MINC_RUNNING.getDescription()).tags(tags).register(registry)); + meters.accept(Gauge + .builder(TSERVER_MINC_QUEUED.getName(), util, + tsmu -> tsmu.getMinorCompactionsQueued(tableId)) + .description(TSERVER_MINC_QUEUED.getDescription()).tags(tags).register(registry)); + meters.accept(Gauge + .builder(TSERVER_TABLETS_ONLINE_ONDEMAND.getName(), util, + tsmu -> tsmu.getOnDemandOnlineCount(tableId)) + .description(TSERVER_TABLETS_ONLINE_ONDEMAND.getDescription()).tags(tags) + .register(registry)); + meters.accept(Gauge + .builder(TSERVER_TABLETS_FILES.getName(), util, + tsmu -> tsmu.getAverageFilesPerTablet(tableId)) + .description(TSERVER_TABLETS_FILES.getDescription()).tags(tags).register(registry)); + meters.accept(Gauge + .builder(TSERVER_INGEST_MUTATIONS.getName(), util, tsmu -> tsmu.getIngestCount(tableId)) + .description(TSERVER_INGEST_MUTATIONS.getDescription()).tags(tags).register(registry)); + meters.accept(Gauge + .builder(TSERVER_INGEST_BYTES.getName(), util, tsmu -> 
tsmu.getIngestByteCount(tableId)) + .description(TSERVER_INGEST_BYTES.getDescription()).tags(tags).register(registry)); + } + } + + @Override + protected TableMetrics newAllTablesMetrics(MeterRegistry registry, Consumer meters, + List tags) { + return new TableMetrics(util, null, registry, meters, tags); + } + + @Override + protected TableMetrics newPerTableMetrics(MeterRegistry registry, TableId tableId, + Consumer meters, List tags) { + return new TableMetrics(util, tableId, registry, meters, tags); + } + @Override public void registerMetrics(MeterRegistry registry) { + super.registerMetrics(registry); FunctionCounter .builder(COMPACTOR_ENTRIES_READ.getName(), this, TabletServerMetrics::getTotalEntriesRead) .description(COMPACTOR_ENTRIES_READ.getDescription()).register(registry); @@ -80,22 +145,6 @@ public void registerMetrics(MeterRegistry registry) { .builder(TSERVER_TABLETS_LONG_ASSIGNMENTS.getName(), util, TabletServerMetricsUtil::getLongTabletAssignments) .description(TSERVER_TABLETS_LONG_ASSIGNMENTS.getDescription()).register(registry); - - Gauge.builder(TSERVER_ENTRIES.getName(), util, TabletServerMetricsUtil::getEntries) - .description(TSERVER_ENTRIES.getDescription()).register(registry); - Gauge.builder(TSERVER_MEM_ENTRIES.getName(), util, TabletServerMetricsUtil::getEntriesInMemory) - .description(TSERVER_MEM_ENTRIES.getDescription()).register(registry); - Gauge - .builder(TSERVER_MINC_RUNNING.getName(), util, TabletServerMetricsUtil::getMinorCompactions) - .description(TSERVER_MINC_RUNNING.getDescription()).register(registry); - Gauge - .builder(TSERVER_MINC_QUEUED.getName(), util, - TabletServerMetricsUtil::getMinorCompactionsQueued) - .description(TSERVER_MINC_QUEUED.getDescription()).register(registry); - Gauge - .builder(TSERVER_TABLETS_ONLINE_ONDEMAND.getName(), util, - TabletServerMetricsUtil::getOnDemandOnlineCount) - .description(TSERVER_TABLETS_ONLINE_ONDEMAND.getDescription()).register(registry); Gauge 
.builder(TSERVER_TABLETS_ONDEMAND_UNLOADED_FOR_MEM.getName(), util, TabletServerMetricsUtil::getOnDemandUnloadedLowMem) @@ -112,16 +161,7 @@ public void registerMetrics(MeterRegistry registry) { .builder(TSERVER_MINC_TOTAL.getName(), util, TabletServerMetricsUtil::getTotalMinorCompactions) .description(TSERVER_MINC_TOTAL.getDescription()).register(registry); - - Gauge - .builder(TSERVER_TABLETS_FILES.getName(), util, - TabletServerMetricsUtil::getAverageFilesPerTablet) - .description(TSERVER_TABLETS_FILES.getDescription()).register(registry); Gauge.builder(TSERVER_HOLD.getName(), util, TabletServerMetricsUtil::getHoldTime) .description(TSERVER_HOLD.getDescription()).register(registry); - Gauge.builder(TSERVER_INGEST_MUTATIONS.getName(), util, TabletServerMetricsUtil::getIngestCount) - .description(TSERVER_INGEST_MUTATIONS.getDescription()).register(registry); - Gauge.builder(TSERVER_INGEST_BYTES.getName(), util, TabletServerMetricsUtil::getIngestByteCount) - .description(TSERVER_INGEST_BYTES.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsUtil.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsUtil.java index e4287391b13..e391f23d852 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsUtil.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsUtil.java @@ -18,6 +18,9 @@ */ package org.apache.accumulo.tserver.metrics; +import java.util.Collection; + +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.TabletServerResourceManager.AssignmentWatcher; import org.apache.accumulo.tserver.tablet.Tablet; @@ -37,66 +40,47 @@ public long getLongTabletAssignments() { return AssignmentWatcher.getLongAssignments(); } - public long getEntries() { - long result = 0; - for (Tablet tablet : 
tserver.getOnlineTablets().values()) { - result += tablet.getNumEntries(); - } - return result; + interface TabletToLong { + long apply(Tablet t); } - public long getEntriesInMemory() { + private long sum(TableId tableId, TabletToLong tabletFunction) { + Collection tablets = tableId == null ? tserver.getOnlineTablets().values() + : tserver.getOnlineTablets(tableId).values(); + long result = 0; - for (Tablet tablet : tserver.getOnlineTablets().values()) { - result += tablet.getNumEntriesInMemory(); + for (Tablet tablet : tablets) { + result += tabletFunction.apply(tablet); } return result; } - public double getIngestCount() { - double result = 0; - for (Tablet tablet : tserver.getOnlineTablets().values()) { - result += tablet.totalIngest(); - } - return result; + public long getEntries(TableId tableId) { + return sum(tableId, Tablet::getNumEntries); } - public double getIngestByteCount() { - double result = 0; - for (Tablet tablet : tserver.getOnlineTablets().values()) { - result += tablet.totalIngestBytes(); - } - return result; + public long getEntriesInMemory(TableId tableId) { + return sum(tableId, Tablet::getNumEntriesInMemory); } - public int getMinorCompactions() { - int result = 0; - for (Tablet tablet : tserver.getOnlineTablets().values()) { - if (tablet.isMinorCompactionRunning()) { - result++; - } - } - return result; + public double getIngestCount(TableId tableId) { + return sum(tableId, Tablet::totalIngest); } - public int getMinorCompactionsQueued() { - int result = 0; - for (Tablet tablet : tserver.getOnlineTablets().values()) { - if (tablet.isMinorCompactionQueued()) { - result++; - } - } - return result; + public double getIngestByteCount(TableId tableId) { + return sum(tableId, Tablet::totalIngestBytes); } - public int getOnDemandOnlineCount() { - int result = 0; - for (Tablet tablet : tserver.getOnlineTablets().values()) { - if (tablet.isOnDemand()) { - result++; - } - } - return result; + public int getMinorCompactions(TableId tableId) { + return 
(int) sum(tableId, tablet -> tablet.isMinorCompactionRunning() ? 1 : 0); + } + + public int getMinorCompactionsQueued(TableId tableId) { + return (int) sum(tableId, tablet -> tablet.isMinorCompactionQueued() ? 1 : 0); + } + + public int getOnDemandOnlineCount(TableId tableId) { + return (int) sum(tableId, tablet -> tablet.isOnDemand() ? 1 : 0); } public int getOnDemandUnloadedLowMem() { @@ -123,10 +107,12 @@ public double getHoldTime() { return tserver.getHoldTimeMillis() / 1000.; } - public double getAverageFilesPerTablet() { + public double getAverageFilesPerTablet(TableId tableId) { + Collection tablets = tableId == null ? tserver.getOnlineTablets().values() + : tserver.getOnlineTablets(tableId).values(); int count = 0; long result = 0; - for (Tablet tablet : tserver.getOnlineTablets().values()) { + for (Tablet tablet : tablets) { result += tablet.getDatafiles().size(); count++; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java index 0ce05e77838..9ad9ba28d5e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java @@ -22,33 +22,75 @@ import static org.apache.accumulo.core.metrics.Metric.MINC_RUNNING; import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.server.metrics.NoopMetrics; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.metrics.PerTableMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; 
+import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; -public class TabletServerMinCMetrics implements MetricsProducer { +public class TabletServerMinCMetrics extends PerTableMetrics + implements MetricsProducer { - private Timer activeMinc = NoopMetrics.useNoopTimer(); - private Timer queuedMinc = NoopMetrics.useNoopTimer(); + private static final Logger log = LoggerFactory.getLogger(TabletServerMinCMetrics.class); - public void addActive(long value) { - activeMinc.record(Duration.ofMillis(value)); + @Override + protected Logger getLog() { + return log; + } + + public TabletServerMinCMetrics(ServerContext context, + Supplier> activeTableSupplier) { + super(context, activeTableSupplier); + } + + public static class TableMetrics { + private final Timer activeMinc; + private final Timer queuedMinc; + + TableMetrics(MeterRegistry registry, Consumer meters, List tags) { + activeMinc = Timer.builder(MINC_RUNNING.getName()).description(MINC_RUNNING.getDescription()) + .tags(tags).register(registry); + queuedMinc = Timer.builder(MINC_QUEUED.getName()).description(MINC_QUEUED.getDescription()) + .tags(tags).register(registry); + meters.accept(activeMinc); + meters.accept(queuedMinc); + } + } + + public void addActive(TableId tableId, long value) { + getTableMetrics(tableId).activeMinc.record(Duration.ofMillis(value)); } - public void addQueued(long value) { - queuedMinc.record(Duration.ofMillis(value)); + public void addQueued(TableId tableId, long value) { + getTableMetrics(tableId).queuedMinc.record(Duration.ofMillis(value)); } @Override - public void registerMetrics(MeterRegistry registry) { - activeMinc = Timer.builder(MINC_RUNNING.getName()).description(MINC_RUNNING.getDescription()) - .register(registry); + protected TableMetrics newAllTablesMetrics(MeterRegistry registry, Consumer meters, + List tags) { + return new TableMetrics(registry, meters, tags); + } - queuedMinc = 
Timer.builder(MINC_QUEUED.getName()).description(MINC_QUEUED.getDescription()) - .register(registry); + @Override + protected TableMetrics newPerTableMetrics(MeterRegistry registry, TableId tableId, + Consumer meters, List tags) { + return new TableMetrics(registry, meters, tags); + } + + @Override + public void registerMetrics(MeterRegistry registry) { + super.registerMetrics(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index 9275f6fd326..a6df7dbda85 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -35,75 +35,128 @@ import static org.apache.accumulo.core.metrics.Metric.SCAN_ZOMBIE_THREADS; import java.time.Duration; +import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Consumer; import java.util.function.IntSupplier; +import java.util.function.Supplier; -import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.server.metrics.NoopMetrics; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.metrics.PerTableMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; -public class TabletServerScanMetrics implements MetricsProducer { +public class TabletServerScanMetrics extends 
PerTableMetrics { + + private static final Logger log = LoggerFactory.getLogger(TabletServerScanMetrics.class); + + @Override + protected Logger getLog() { + return log; + } private final IntSupplier openFiles; - private Timer scans = NoopMetrics.useNoopTimer(); - private DistributionSummary resultsPerScan = NoopMetrics.useNoopDistributionSummary(); - private DistributionSummary yields = NoopMetrics.useNoopDistributionSummary(); - private final AtomicLong startScanCalls = new AtomicLong(0); - private final AtomicLong continueScanCalls = new AtomicLong(0); - private final AtomicLong closeScanCalls = new AtomicLong(0); - private final AtomicLong busyTimeoutCount = new AtomicLong(0); private final AtomicLong pausedForMemory = new AtomicLong(0); private final AtomicLong earlyReturnForMemory = new AtomicLong(0); private final AtomicLong zombieScanThreads = new AtomicLong(0); - private final LongAdder lookupCount = new LongAdder(); - private final LongAdder queryResultCount = new LongAdder(); - private final LongAdder queryResultBytes = new LongAdder(); - private final LongAdder scannedCount = new LongAdder(); + private final AtomicLong busyTimeoutCount = new AtomicLong(0); - public void incrementLookupCount() { - this.lookupCount.increment(); + public static class TableMetrics { + private final Timer scans; + private final DistributionSummary resultsPerScan; + private final DistributionSummary yields; + private final AtomicLong startScanCalls = new AtomicLong(0); + private final AtomicLong continueScanCalls = new AtomicLong(0); + private final AtomicLong closeScanCalls = new AtomicLong(0); + private final LongAdder lookupCount = new LongAdder(); + private final LongAdder queryResultCount = new LongAdder(); + private final LongAdder queryResultBytes = new LongAdder(); + private final LongAdder scannedCount = new LongAdder(); + + TableMetrics(MeterRegistry registry, Consumer meters, List tags) { + scans = 
Timer.builder(SCAN_TIMES.getName()).description(SCAN_TIMES.getDescription()) + .tags(tags).register(registry); + resultsPerScan = DistributionSummary.builder(SCAN_RESULTS.getName()) + .description(SCAN_RESULTS.getDescription()).tags(tags).register(registry); + yields = DistributionSummary.builder(SCAN_YIELDS.getName()) + .description(SCAN_YIELDS.getDescription()).tags(tags).register(registry); + meters.accept(scans); + meters.accept(resultsPerScan); + meters.accept(yields); + meters.accept( + FunctionCounter.builder(SCAN_START.getName(), this.startScanCalls, AtomicLong::get) + .description(SCAN_START.getDescription()).tags(tags).register(registry)); + meters.accept( + FunctionCounter.builder(SCAN_CONTINUE.getName(), this.continueScanCalls, AtomicLong::get) + .description(SCAN_CONTINUE.getDescription()).tags(tags).register(registry)); + meters.accept( + FunctionCounter.builder(SCAN_CLOSE.getName(), this.closeScanCalls, AtomicLong::get) + .description(SCAN_CLOSE.getDescription()).tags(tags).register(registry)); + meters + .accept(FunctionCounter.builder(SCAN_QUERIES.getName(), this.lookupCount, LongAdder::sum) + .description(SCAN_QUERIES.getDescription()).tags(tags).register(registry)); + meters.accept( + FunctionCounter.builder(SCAN_SCANNED_ENTRIES.getName(), this.scannedCount, LongAdder::sum) + .description(SCAN_SCANNED_ENTRIES.getDescription()).tags(tags).register(registry)); + meters.accept( + Gauge.builder(SCAN_QUERY_SCAN_RESULTS.getName(), this.queryResultCount, LongAdder::sum) + .description(SCAN_QUERY_SCAN_RESULTS.getDescription()).tags(tags).register(registry)); + meters.accept(Gauge + .builder(SCAN_QUERY_SCAN_RESULTS_BYTES.getName(), this.queryResultBytes, LongAdder::sum) + .description(SCAN_QUERY_SCAN_RESULTS_BYTES.getDescription()).tags(tags) + .register(registry)); + } + } + + public void incrementLookupCount(TableId tableId) { + getTableMetrics(tableId).lookupCount.increment(); } - public void incrementQueryResultCount(long amount) { - 
this.queryResultCount.add(amount); + public void incrementQueryResultCount(TableId tableId, long amount) { + getTableMetrics(tableId).queryResultCount.add(amount); } - public void incrementQueryResultBytes(long amount) { - this.queryResultBytes.add(amount); + public void incrementQueryResultBytes(TableId tableId, long amount) { + getTableMetrics(tableId).queryResultBytes.add(amount); } - public LongAdder getScannedCounter() { - return this.scannedCount; + public LongAdder getScannedCounter(TableId tableId) { + return getTableMetrics(tableId).scannedCount; } - public void addScan(long value) { - scans.record(Duration.ofMillis(value)); + public void addScan(TableId tableId, long value) { + getTableMetrics(tableId).scans.record(Duration.ofMillis(value)); } - public void addResult(long value) { - resultsPerScan.record(value); + public void addResult(TableId tableId, long value) { + getTableMetrics(tableId).resultsPerScan.record(value); } - public void addYield(long value) { - yields.record(value); + public void addYield(TableId tableId, long value) { + getTableMetrics(tableId).yields.record(value); } - public void incrementStartScan() { - startScanCalls.incrementAndGet(); + public void incrementStartScan(TableId tableId) { + getTableMetrics(tableId).startScanCalls.incrementAndGet(); } - public void incrementContinueScan() { - continueScanCalls.incrementAndGet(); + public void incrementContinueScan(TableId tableId) { + getTableMetrics(tableId).continueScanCalls.incrementAndGet(); } - public void incrementCloseScan() { - closeScanCalls.incrementAndGet(); + public void incrementCloseScan(TableId tableId) { + getTableMetrics(tableId).closeScanCalls.incrementAndGet(); } public void incrementBusy() { @@ -126,46 +179,41 @@ public long getZombieThreadsCount() { return zombieScanThreads.get(); } - public TabletServerScanMetrics(IntSupplier openFileSupplier) { + public TabletServerScanMetrics(ServerContext serverContext, + Supplier> activeTableSupplier, IntSupplier 
openFileSupplier) { + super(serverContext, activeTableSupplier); openFiles = openFileSupplier; } + @Override + protected TableMetrics newAllTablesMetrics(MeterRegistry registry, Consumer meters, + List tags) { + return new TableMetrics(registry, meters, tags); + } + + @Override + protected TableMetrics newPerTableMetrics(MeterRegistry registry, TableId tableId, + Consumer meters, List tags) { + return new TableMetrics(registry, meters, tags); + } + @Override public void registerMetrics(MeterRegistry registry) { + super.registerMetrics(registry); + Gauge.builder(SCAN_OPEN_FILES.getName(), openFiles::getAsInt) .description(SCAN_OPEN_FILES.getDescription()).register(registry); - scans = Timer.builder(SCAN_TIMES.getName()).description(SCAN_TIMES.getDescription()) - .register(registry); - resultsPerScan = DistributionSummary.builder(SCAN_RESULTS.getName()) - .description(SCAN_RESULTS.getDescription()).register(registry); - yields = DistributionSummary.builder(SCAN_YIELDS.getName()) - .description(SCAN_YIELDS.getDescription()).register(registry); - FunctionCounter.builder(SCAN_START.getName(), this.startScanCalls, AtomicLong::get) - .description(SCAN_START.getDescription()).register(registry); - FunctionCounter.builder(SCAN_CONTINUE.getName(), this.continueScanCalls, AtomicLong::get) - .description(SCAN_CONTINUE.getDescription()).register(registry); - FunctionCounter.builder(SCAN_CLOSE.getName(), this.closeScanCalls, AtomicLong::get) - .description(SCAN_CLOSE.getDescription()).register(registry); - FunctionCounter - .builder(SCAN_BUSY_TIMEOUT_COUNT.getName(), this.busyTimeoutCount, AtomicLong::get) - .description(SCAN_BUSY_TIMEOUT_COUNT.getDescription()).register(registry); - FunctionCounter.builder(SCAN_QUERIES.getName(), this.lookupCount, LongAdder::sum) - .description(SCAN_QUERIES.getDescription()).register(registry); - FunctionCounter.builder(SCAN_SCANNED_ENTRIES.getName(), this.scannedCount, LongAdder::sum) - 
.description(SCAN_SCANNED_ENTRIES.getDescription()).register(registry); FunctionCounter.builder(SCAN_PAUSED_FOR_MEM.getName(), this.pausedForMemory, AtomicLong::get) .description(SCAN_PAUSED_FOR_MEM.getDescription()).register(registry); FunctionCounter .builder(SCAN_RETURN_FOR_MEM.getName(), this.earlyReturnForMemory, AtomicLong::get) .description(SCAN_RETURN_FOR_MEM.getDescription()).register(registry); - Gauge.builder(SCAN_QUERY_SCAN_RESULTS.getName(), this.queryResultCount, LongAdder::sum) - .description(SCAN_QUERY_SCAN_RESULTS.getDescription()).register(registry); - Gauge.builder(SCAN_QUERY_SCAN_RESULTS_BYTES.getName(), this.queryResultBytes, LongAdder::sum) - .description(SCAN_QUERY_SCAN_RESULTS_BYTES.getDescription()).register(registry); Gauge .builder(SCAN_ZOMBIE_THREADS.getName(), this, TabletServerScanMetrics::getZombieThreadsCount) .description(SCAN_ZOMBIE_THREADS.getDescription()).register(registry); + FunctionCounter + .builder(SCAN_BUSY_TIMEOUT_COUNT.getName(), this.busyTimeoutCount, AtomicLong::get) + .description(SCAN_BUSY_TIMEOUT_COUNT.getDescription()).register(registry); } - } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java index 52453916cce..fed7b1c1e97 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java @@ -25,36 +25,83 @@ import static org.apache.accumulo.core.metrics.Metric.UPDATE_WALOG_WRITE; import java.time.Duration; +import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metrics.MetricsProducer; +import 
org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.metrics.NoopMetrics; +import org.apache.accumulo.server.metrics.PerTableMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; -public class TabletServerUpdateMetrics implements MetricsProducer { +public class TabletServerUpdateMetrics + extends PerTableMetrics implements MetricsProducer { + + private static final Logger log = LoggerFactory.getLogger(TabletServerUpdateMetrics.class); + + @Override + protected Logger getLog() { + return log; + } - private final AtomicLong permissionErrorsCount = new AtomicLong(); - private final AtomicLong unknownTabletErrorsCount = new AtomicLong(); - private final AtomicLong constraintViolationsCount = new AtomicLong(); private Timer commitPrepStat = NoopMetrics.useNoopTimer(); private Timer walogWriteTimeStat = NoopMetrics.useNoopTimer(); private Timer commitTimeStat = NoopMetrics.useNoopTimer(); - private DistributionSummary mutationArraySizeStat = NoopMetrics.useNoopDistributionSummary(); - public void addPermissionErrors(long value) { - permissionErrorsCount.addAndGet(value); + public TabletServerUpdateMetrics(ServerContext context, + Supplier> activeTableSupplier) { + super(context, activeTableSupplier); + } + + public static class TableMetrics { + + private final AtomicLong permissionErrorsCount = new AtomicLong(); + private final AtomicLong unknownTabletErrorsCount = new AtomicLong(); + private final AtomicLong constraintViolationsCount = new AtomicLong(); + + private DistributionSummary mutationArraySizeStat = NoopMetrics.useNoopDistributionSummary(); + + TableMetrics(TableId tableId, MeterRegistry registry, Consumer meters, List tags) { + 
meters.accept( + FunctionCounter.builder(UPDATE_ERRORS.getName(), permissionErrorsCount, AtomicLong::get) + .tags("type", "permission").tags(tags).description(UPDATE_ERRORS.getDescription()) + .register(registry)); + meters.accept(FunctionCounter + .builder(UPDATE_ERRORS.getName(), unknownTabletErrorsCount, AtomicLong::get) + .tags("type", "unknown.tablet").tags(tags).description(UPDATE_ERRORS.getDescription()) + .register(registry)); + meters.accept(FunctionCounter + .builder(UPDATE_ERRORS.getName(), constraintViolationsCount, AtomicLong::get) + .tags("type", "constraint.violation").tags(tags) + .description(UPDATE_ERRORS.getDescription()).register(registry)); + mutationArraySizeStat = DistributionSummary.builder(UPDATE_MUTATION_ARRAY_SIZE.getName()) + .description(UPDATE_MUTATION_ARRAY_SIZE.getDescription()).tags(tags).register(registry); + meters.accept(mutationArraySizeStat); + } } - public void addUnknownTabletErrors(long value) { - unknownTabletErrorsCount.addAndGet(value); + public void addPermissionErrors(TableId tableId, long value) { + getTableMetrics(tableId).permissionErrorsCount.addAndGet(value); } - public void addConstraintViolations(long value) { - constraintViolationsCount.addAndGet(value); + public void addUnknownTabletErrors(TableId tableId, long value) { + getTableMetrics(tableId).unknownTabletErrorsCount.addAndGet(value); + } + + public void addConstraintViolations(TableId tableId, long value) { + getTableMetrics(tableId).constraintViolationsCount.addAndGet(value); } public void addCommitPrep(long value) { @@ -69,28 +116,30 @@ public void addCommitTime(long value) { commitTimeStat.record(Duration.ofMillis(value)); } - public void addMutationArraySize(long value) { - mutationArraySizeStat.record(value); + public void addMutationArraySize(TableId tableId, long value) { + getTableMetrics(tableId).mutationArraySizeStat.record(value); + } + + @Override + protected TableMetrics newAllTablesMetrics(MeterRegistry registry, Consumer meters, + List tags) 
{ + return new TableMetrics(null, registry, meters, tags); + } + + @Override + protected TableMetrics newPerTableMetrics(MeterRegistry registry, TableId tableId, + Consumer meters, List tags) { + return new TableMetrics(tableId, registry, meters, tags); } @Override public void registerMetrics(MeterRegistry registry) { - FunctionCounter.builder(UPDATE_ERRORS.getName(), permissionErrorsCount, AtomicLong::get) - .tags("type", "permission").description(UPDATE_ERRORS.getDescription()).register(registry); - FunctionCounter.builder(UPDATE_ERRORS.getName(), unknownTabletErrorsCount, AtomicLong::get) - .tags("type", "unknown.tablet").description(UPDATE_ERRORS.getDescription()) - .register(registry); - FunctionCounter.builder(UPDATE_ERRORS.getName(), constraintViolationsCount, AtomicLong::get) - .tags("type", "constraint.violation").description(UPDATE_ERRORS.getDescription()) - .register(registry); + super.registerMetrics(registry); commitPrepStat = Timer.builder(UPDATE_COMMIT_PREP.getName()) .description(UPDATE_COMMIT_PREP.getDescription()).register(registry); walogWriteTimeStat = Timer.builder(UPDATE_WALOG_WRITE.getName()) .description(UPDATE_WALOG_WRITE.getDescription()).register(registry); commitTimeStat = Timer.builder(UPDATE_COMMIT.getName()) .description(UPDATE_COMMIT.getDescription()).register(registry); - mutationArraySizeStat = DistributionSummary.builder(UPDATE_MUTATION_ARRAY_SIZE.getName()) - .description(UPDATE_MUTATION_ARRAY_SIZE.getDescription()).register(registry); } - } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index a7ec6855d03..f3363dc7000 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -189,7 +189,7 @@ private SortedKeyValueIterator createIterator() throws IOException { 
fileManager, files, scanParams.getAuthorizations(), samplerConfig, new ArrayList<>()); statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter(), - tablet.getScanMetrics().getScannedCounter()); + tablet.getScanMetrics().getScannedCounter(tablet.getExtent().tableId())); SortedKeyValueIterator visFilter = SystemIteratorUtil.setupSystemScanIterators(statsIterator, scanParams.getColumnSet(), diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java index 7e1b1d5cfb5..cdcc4716828 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java @@ -69,7 +69,7 @@ public Map getDatafiles() { @Override public void addToYieldMetric(int i) { - this.server.getScanMetrics().addYield(i); + this.server.getScanMetrics().addYield(getExtent().tableId(), i); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 1ce57c87e40..d2b59ce1a6c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -434,9 +434,9 @@ DataFileValue minorCompact(InMemoryMap memTable, ReferencedTabletFile tmpDatafil lastMinorCompactionFinishTime = System.currentTimeMillis(); } TabletServerMinCMetrics minCMetrics = getTabletServer().getMinCMetrics(); - minCMetrics.addActive(lastMinorCompactionFinishTime - start); + minCMetrics.addActive(getExtent().tableId(), lastMinorCompactionFinishTime - start); timer.updateTime(Operation.MINOR, queued, start, count, failed); - minCMetrics.addQueued(start - queued); + minCMetrics.addQueued(getExtent().tableId(), start - queued); } } @@ -1106,7 +1106,7 @@ 
public Map getDatafiles() { @Override public void addToYieldMetric(int i) { - getTabletServer().getScanMetrics().addYield(i); + getTabletServer().getScanMetrics().addYield(getExtent().tableId(), i); } public double queryRate() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index d79d8408ec1..6918cdb01e0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -217,7 +217,7 @@ public Tablet.LookupResult lookup(List ranges, List results, try { SortedKeyValueIterator iter = new SourceSwitchingIterator(dataSource); this.lookupCount.incrementAndGet(); - this.server.getScanMetrics().incrementLookupCount(); + this.server.getScanMetrics().incrementLookupCount(getExtent().tableId()); result = lookup(iter, ranges, results, scanParams, maxResultSize); return result; } catch (IOException | RuntimeException e) { @@ -230,10 +230,12 @@ public Tablet.LookupResult lookup(List ranges, List results, synchronized (this) { queryResultCount.addAndGet(results.size()); - this.server.getScanMetrics().incrementQueryResultCount(results.size()); + this.server.getScanMetrics().incrementQueryResultCount(getExtent().tableId(), + results.size()); if (result != null) { this.queryResultBytes.addAndGet(result.dataSize); - this.server.getScanMetrics().incrementQueryResultBytes(result.dataSize); + this.server.getScanMetrics().incrementQueryResultBytes(getExtent().tableId(), + result.dataSize); } } } @@ -533,8 +535,8 @@ private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, K public synchronized void updateQueryStats(int size, long numBytes) { this.queryResultCount.addAndGet(size); - this.server.getScanMetrics().incrementQueryResultCount(size); + this.server.getScanMetrics().incrementQueryResultCount(getExtent().tableId(), 
size); this.queryResultBytes.addAndGet(numBytes); - this.server.getScanMetrics().incrementQueryResultBytes(numBytes); + this.server.getScanMetrics().incrementQueryResultBytes(getExtent().tableId(), numBytes); } } diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 7880f4f8b28..c00571f8b4a 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -63,6 +63,7 @@ import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.metrics.PerTableMetrics; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.hadoop.conf.Configuration; @@ -114,7 +115,6 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit @Test public void confirmMetricsPublished() throws Exception { - // meter names sorted and formatting disabled to make it easier to diff changes // @formatter:off Set unexpectedMetrics = Set.of( @@ -170,7 +170,10 @@ public void confirmMetricsPublished() throws Exception { && !queueMetricsSeen.intersects(trueSet)) { // for each metric name not yet seen, check if it is expected, flaky, or unknown statsDMetrics.stream().filter(line -> line.startsWith("accumulo")) - .map(TestStatsDSink::parseStatsDMetric).map(metric -> Metric.fromName(metric.getName())) + .map(TestStatsDSink::parseStatsDMetric) + .peek(statsDMetric -> assertFalse( + statsDMetric.getTags().containsKey(PerTableMetrics.TABLE_ID_TAG_NAME))) + .map(metric -> Metric.fromName(metric.getName())) .filter(metric -> !seenMetrics.contains(metric)).forEach(metric -> { if (expectedMetrics.contains(metric)) { // record expected Metric as seen diff 
--git a/test/src/main/java/org/apache/accumulo/test/metrics/PerTableMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/PerTableMetricsIT.java new file mode 100644 index 00000000000..50bce94d823 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/metrics/PerTableMetricsIT.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.metrics; + +import static org.apache.accumulo.core.metrics.Metric.MINC_QUEUED; +import static org.apache.accumulo.core.metrics.Metric.MINC_RUNNING; +import static org.apache.accumulo.core.metrics.Metric.SCAN_CLOSE; +import static org.apache.accumulo.core.metrics.Metric.SCAN_CONTINUE; +import static org.apache.accumulo.core.metrics.Metric.SCAN_QUERIES; +import static org.apache.accumulo.core.metrics.Metric.SCAN_QUERY_SCAN_RESULTS; +import static org.apache.accumulo.core.metrics.Metric.SCAN_QUERY_SCAN_RESULTS_BYTES; +import static org.apache.accumulo.core.metrics.Metric.SCAN_RESULTS; +import static org.apache.accumulo.core.metrics.Metric.SCAN_SCANNED_ENTRIES; +import static org.apache.accumulo.core.metrics.Metric.SCAN_START; +import static org.apache.accumulo.core.metrics.Metric.SCAN_TIMES; +import static org.apache.accumulo.core.metrics.Metric.SCAN_YIELDS; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_ENTRIES; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_BYTES; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_INGEST_MUTATIONS; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_MEM_ENTRIES; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_QUEUED; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_MINC_RUNNING; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_TABLETS_FILES; +import static org.apache.accumulo.core.metrics.Metric.TSERVER_TABLETS_ONLINE_ONDEMAND; +import static org.apache.accumulo.core.metrics.Metric.UPDATE_ERRORS; +import static org.apache.accumulo.core.metrics.Metric.UPDATE_MUTATION_ARRAY_SIZE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import 
java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metrics.Metric; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.metrics.PerTableMetrics; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; + +import io.micrometer.core.instrument.MeterRegistry; + +public class PerTableMetricsIT extends ConfigurableMacBase implements MetricsProducer { + + private static TestStatsDSink sink; + + private static final Logger log = LoggerFactory.getLogger(PerTableMetricsIT.class); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(3); + } + + @BeforeAll + public static void before() throws Exception { + 
sink = new TestStatsDSink(); + } + + @AfterAll + public static void after() throws Exception { + sink.close(); + } + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.setProperty(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, "1s"); + // Tell the server processes to use a StatsDMeterRegistry and the simple logging registry + // that will be configured to push all metrics to the sink we started. + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_TABLE_METRICS_ENABLED, "true"); + cfg.setProperty("general.custom.metrics.opts.logging.step", "10s"); + String clazzList = LoggingMeterRegistryFactory.class.getName() + "," + + TestStatsDRegistryFactory.class.getName(); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList); + // this will cause more frequent activity on the scan ref table generating metrics for that + // table + cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "3s"); + Map sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + } + + @Test + public void confirmMetricsPublished() throws Exception { + + // meter names sorted and formatting disabled to make it easier to diff changes + // @formatter:off + + // all metrics that can have a table id + Set expectedPerTableMetrics = Set.of( + SCAN_TIMES, + SCAN_RESULTS, + SCAN_YIELDS, + SCAN_START, + SCAN_CONTINUE, + SCAN_CLOSE, + SCAN_QUERIES, + SCAN_SCANNED_ENTRIES, + SCAN_QUERY_SCAN_RESULTS, + SCAN_QUERY_SCAN_RESULTS_BYTES, + TSERVER_ENTRIES, + TSERVER_MEM_ENTRIES, + TSERVER_MINC_RUNNING, + TSERVER_MINC_QUEUED, + TSERVER_TABLETS_ONLINE_ONDEMAND, + TSERVER_TABLETS_FILES, + TSERVER_INGEST_MUTATIONS, + TSERVER_INGEST_BYTES, + MINC_RUNNING, + 
MINC_QUEUED, + UPDATE_ERRORS, + UPDATE_MUTATION_ARRAY_SIZE); + + // metrics this test may not see + Set tableMetricsOkToNoSee = Set.of( + SCAN_YIELDS); + + // @formatter:on + + String[] tableNames = getUniqueNames(10); + Set testTables = Collections.synchronizedSet(new HashSet<>(Arrays.asList(tableNames))); + + var executor = Executors.newCachedThreadPool(); + + List> futures = new ArrayList<>(); + + for (var tableName : tableNames) { + futures.add(executor.submit(() -> { + doWorkToGenerateMetrics(tableName, testTables); + return tableName; + })); + } + + Set expectedTableIds = new HashSet<>(); + + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + while (expectedTableIds.size() != tableNames.length) { + client.tableOperations().tableIdMap().forEach((name, id) -> { + if (testTables.contains(name)) { + expectedTableIds.add(TableId.of(id)); + } + }); + Thread.sleep(10); + } + } + + expectedTableIds.addAll(AccumuloTable.allTableIds()); + + Map> metricsSeen = new HashMap<>(); + Set processNamesSeen = new HashSet<>(); + + while (true) { + // generate minor compaction metrics for the accumulo system tables + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + for (var accumuloTable : AccumuloTable.values()) { + client.tableOperations().flush(accumuloTable.tableName()); + } + } + + for (String meticLine : sink.getLines()) { + if (meticLine.startsWith("accumulo")) { + var statsdMetric = TestStatsDSink.parseStatsDMetric(meticLine); + var metric = Metric.fromName(statsdMetric.getName()); + var tableId = statsdMetric.getTags().get(PerTableMetrics.TABLE_ID_TAG_NAME); + if (tableId != null) { + assertTrue(expectedTableIds.contains(TableId.of(tableId)), + () -> "Unexpected table id seen in tag " + tableId + " " + meticLine); + // only some metrics are expected to have a table id, ensure the tableId tag does not + // show up where its not expected + assertTrue(expectedPerTableMetrics.contains(metric), 
+ () -> "Saw unexpected table id tag on metric " + metric + " " + tableId + " " + + meticLine); + metricsSeen.computeIfAbsent(metric, m -> new HashSet<>()).add(TableId.of(tableId)); + processNamesSeen.add(statsdMetric.getTags().get("process.name")); + } else { + assertFalse(expectedPerTableMetrics.contains(metric), + () -> "Saw metric without expected tableId tag " + metric + " " + meticLine); + } + } + } + + if (metricsSeen.keySet().equals(expectedPerTableMetrics) || metricsSeen.keySet() + .equals(Sets.difference(expectedPerTableMetrics, tableMetricsOkToNoSee))) { + // saw all expected metrics + if (metricsSeen.values().stream().allMatch(tableIds -> tableIds.equals(expectedTableIds))) { + if (testTables.size() == tableNames.length) { + // Now that all table ids were seen, remove two tables from the set of + // testTables. This should cause the background threads to delete the tables and + // eventually those tables should no longer be seen in metrics. + var iter = testTables.iterator(); + var tid1 = iter.next(); + iter.remove(); + var tid2 = iter.next(); + iter.remove(); + assertEquals(tableNames.length - 2, testTables.size()); + log.debug("Removed tables {} {}", tid1, tid2); + // clear metrics seen, going forward should eventually stop seeing the table id from the + // deleted tables in metrics + metricsSeen.clear(); + } else { + // should have seen per table metrics from tablet and scan servers at this point + assertEquals(Set.of("tserver", "sserver"), processNamesSeen); + // have seen everything expected, so the test was successful + break; + } + } else { + metricsSeen.forEach((metricSeen, tableIdsSeen) -> { + if (!tableIdsSeen.equals(expectedTableIds)) { + log.debug("tableIds seen for metric not as expected {} {}", metricSeen, + Sets.symmetricDifference(expectedTableIds, tableIdsSeen)); + } + }); + } + } else { + for (var m : Sets.difference(expectedPerTableMetrics, metricsSeen.keySet())) { + log.debug("have not seen metric {}", m); + } + } + +
Thread.sleep(1000); + } + + // this will cause the rest of the background threads to stop + testTables.clear(); + + // check for any errors in background threads + for (var future : futures) { + future.get(); + } + + executor.shutdown(); + cluster.stop(); + } + + private void doWorkToGenerateMetrics(String tableName, Set testTables) throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + client.tableOperations().create(tableName); + SortedSet splits = new TreeSet<>(List.of(new Text("5"))); + client.tableOperations().addSplits(tableName, splits); + while (testTables.contains(tableName)) { + Thread.sleep(3_000); + BatchWriterConfig config = new BatchWriterConfig().setMaxMemory(0); + try (BatchWriter writer = client.createBatchWriter(tableName, config)) { + Mutation m = new Mutation("row"); + m.put("cf", "cq", new Value("value")); + writer.addMutation(m); + } + client.tableOperations().flush(tableName); + try (BatchWriter writer = client.createBatchWriter(tableName, config)) { + Mutation m = new Mutation("row"); + m.put("cf", "cq", new Value("value")); + writer.addMutation(m); + } + client.tableOperations().flush(tableName); + try (BatchWriter writer = client.createBatchWriter(tableName, config)) { + for (int i = 0; i < 10; i++) { + Mutation m = new Mutation(i + "_row"); + m.put("cf", "cq", new Value("value")); + writer.addMutation(m); + } + } + client.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + try (Scanner scanner = client.createScanner(tableName)) { + scanner.forEach((k, v) -> {}); + } + try (Scanner scanner = client.createScanner(tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.forEach((k, v) -> {}); + } + } + client.tableOperations().delete(tableName); + } + } + + @Override + public void registerMetrics(MeterRegistry registry) { + // unused; this class only extends MetricsProducer to easily reference its methods/constants + } +} From 
afa401f898ef36ecdac30cba429fb35f5ba65850 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 4 Nov 2024 23:27:39 +0000 Subject: [PATCH 2/4] code review update --- .../server/metrics/PerTableMetrics.java | 127 +++++++++++++++--- .../accumulo/tserver/AssignmentHandler.java | 3 +- .../apache/accumulo/tserver/ScanServer.java | 32 +++-- .../apache/accumulo/tserver/TabletServer.java | 23 ++-- .../tserver/metrics/TabletServerMetrics.java | 2 +- .../metrics/TabletServerMinCMetrics.java | 7 +- .../metrics/TabletServerScanMetrics.java | 6 +- .../metrics/TabletServerUpdateMetrics.java | 6 +- 8 files changed, 153 insertions(+), 53 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java index c3cc2ecee5d..37ecd2fa2d3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java @@ -24,14 +24,17 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.*; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.threads.*; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; @@ -65,21 +68,113 @@ public TableMetricsInfo(T2 tableMetrics, List meters) { } } - private final boolean perTableActive; - private final Supplier> activeTables; + private final ActiveTableIdTracker activeTableIdTracker; private final ConcurrentHashMap> perTableMetrics = new ConcurrentHashMap<>(); 
private T allTableMetrics; private volatile MeterRegistry registry; - public PerTableMetrics(ServerContext context, Supplier> activeTableSupplier) { - activeTables = activeTableSupplier; - perTableActive = - context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_TABLE_METRICS_ENABLED); - this.context = context; - if (perTableActive) { - context.getScheduledExecutor().scheduleAtFixedRate(this::refresh, 30, 30, TimeUnit.SECONDS); + /** + * Tracks the active set of table ids in a scan server or tablet server for the purpose of per + * table metrics. + * + *

+ * Each tablet or scan server should create a single instance of this object and pass it to all + * {@link PerTableMetrics} objects. + *

+ * + *

+ * This class does not offer a tabletUnloaded method because there is no efficient way to compute + * this. For more detail see {@link #tabletLoaded(KeyExtent)} + *

+ * + */ + public static class ActiveTableIdTracker { + + private final List> listeners = new ArrayList<>(); + private final Supplier> activeTableSupplier; + private final AtomicReference> currentTableIds; + private final boolean perTableActive; + + /** + * @param activeTableSupplier this supplier should always return the latest set of table ids + * that exist on the server. This class will take care of caching that set. + */ + public ActiveTableIdTracker(ServerContext context, Supplier> activeTableSupplier) { + this.activeTableSupplier = activeTableSupplier; + this.perTableActive = + context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_TABLE_METRICS_ENABLED); + if (perTableActive) { + // The scan server and tablet servers will make a best effort attempt to call tabletLoaded, + // but may not always because of exceptions. This periodic task will ensure changes in the + // active set of tableIds are seen. It also handles cleanup of tablet unload for which there + // is no explicit notification. + var future = context.getScheduledExecutor() + .scheduleAtFixedRate(this::checkIfTableIdsChanged, 30, 30, TimeUnit.SECONDS); + ThreadPools.watchCriticalScheduledTask(future); + } + this.currentTableIds = new AtomicReference<>(Set.of()); + } + + private synchronized void addChangeListener(PerTableMetrics listener) { + if (perTableActive) { + listeners.add(listener); + } + } + + private synchronized void checkIfTableIdsChanged() { + if (!perTableActive) { + return; + } + + var latest = activeTableSupplier.get(); + if (!latest.equals(currentTableIds.get())) { + currentTableIds.set(Set.copyOf(latest)); + listeners.forEach(listener -> listener.refresh(latest)); + } + } + + /** + * If the table id for this tablet is not being tracked will notify all per table metrics + * implementations of the new table. + * + *

+ * There is no corresponding tableUnloaded method because implementing it would be inefficient. + * When a scan server computes the set of table ids it iterates over all tablets. When a tablet + * is unloaded, a scan server could still have other tablets for the same table. On each + * tablet unload do not want to iterate over all other tablets to see if the table id is still + * active, which is what the implementation of tableUnloaded would do. + */ + public void tabletLoaded(KeyExtent extent) { + if (perTableActive && !currentTableIds.get().contains(extent.tableId())) { + checkIfTableIdsChanged(); + } + } + + // This method avoids locking unless a table id is unknown. This is important because every scan + // on a scan server will call it and locking would introduce thread contention. + public void tabletsLoaded(Set extents) { + if (perTableActive) { + var currentSnapshot = currentTableIds.get(); + for (var extent : extents) { + if (!currentSnapshot.contains(extent.tableId())) { + checkIfTableIdsChanged(); + break; + } + } + } } + + public boolean isPerTableMetricsEnabled() { + return perTableActive; + } + + } + + public PerTableMetrics(ServerContext context, ActiveTableIdTracker activeTableIdTracker) { + this.context = context; + this.activeTableIdTracker = activeTableIdTracker; + this.activeTableIdTracker.addChangeListener(this); } /** @@ -98,7 +193,7 @@ public PerTableMetrics(ServerContext context, Supplier> activeTable * passed for consistency with * {@link #newPerTableMetrics(MeterRegistry, TableId, Consumer, List)} * @param tags currently an empty collection of tags, this is passed for consistency with - * {@link #PerTableMetrics(ServerContext, Supplier)} + * {@link #newPerTableMetrics(MeterRegistry, TableId, Consumer, List)} * @return a new object that will be cached and later returned by * {@link #getTableMetrics(TableId)} */ @@ -125,7 +220,7 @@ protected abstract T newPerTableMetrics(MeterRegistry registry, TableId tableId, Consumer meters, List
tags); private TableMetricsInfo getOrCreateTableMetrics(TableId tableId) { - Preconditions.checkState(perTableActive); + Preconditions.checkState(activeTableIdTracker.isPerTableMetricsEnabled()); return perTableMetrics.computeIfAbsent(tableId, tid -> { List meters = new ArrayList<>(); T tableMetrics = newPerTableMetrics(registry, tableId, meters::add, @@ -139,7 +234,7 @@ private TableMetricsInfo getOrCreateTableMetrics(TableId tableId) { public void registerMetrics(MeterRegistry registry) { Preconditions.checkState(this.registry == null); this.registry = registry; - if (!perTableActive) { + if (!activeTableIdTracker.isPerTableMetricsEnabled()) { this.allTableMetrics = newAllTablesMetrics(registry, m -> {}, List.of()); } } @@ -147,7 +242,7 @@ public void registerMetrics(MeterRegistry registry) { public T getTableMetrics(TableId tableId) { Preconditions.checkState(registry != null); - if (!perTableActive) { + if (!activeTableIdTracker.isPerTableMetricsEnabled()) { return allTableMetrics; } @@ -159,13 +254,11 @@ public T getTableMetrics(TableId tableId) { * currently have no table metrics object in the cache. It will also remove an per table metrics * object from the cache that have been inactive for a while or where the table was deleted. 
*/ - public synchronized void refresh() { - if (!perTableActive || registry == null) { + private void refresh(Set currentActive) { + if (!activeTableIdTracker.isPerTableMetricsEnabled() || registry == null) { return; } - var currentActive = activeTables.get(); - currentActive.forEach(tid -> { // This registers metrics for the table if none are currently registered and resets the // inactiveTime if one exists diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java index 8eee32e2a0a..0742b0c1edf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java @@ -170,7 +170,8 @@ public void run() { } } - server.refreshMetrics(extent.tableId()); + // this must be called after server.onlineTablets is updated + server.getActiveTableIdTracker().tabletLoaded(extent); tablet = null; // release this reference successful = true; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 4188875f542..55ae83aa642 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -104,6 +104,7 @@ import org.apache.accumulo.server.compaction.PausedCompactionMetrics; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.metrics.*; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; @@ -208,6 +209,8 @@ private TabletMetadataLoader(Ample ample) { private final ZooCache managerLockCache; + final PerTableMetrics.ActiveTableIdTracker 
activeTableIdTracker; + public ScanServer(ConfigOpts opts, String[] args) { super("sserver", opts, ServerContext::new, args); @@ -283,6 +286,13 @@ public ScanServer(ConfigOpts opts, String[] args) { scanServerReservationExpiration, scanServerReservationExpiration, TimeUnit.MILLISECONDS)); + // The following will read through everything in the cache which could update the access time of + // everything which is not desired for this use case, however the cache is expire after write + // and not expire after access so its probably ok. + Supplier> activeTables = () -> tabletMetadataCache.asMap().keySet().stream() + .map(KeyExtent::tableId).collect(Collectors.toUnmodifiableSet()); + activeTableIdTracker = new PerTableMetrics.ActiveTableIdTracker(context, activeTables); + } @Override @@ -411,12 +421,8 @@ public void run() { MetricsInfo metricsInfo = getContext().getMetricsInfo(); - // The following will read through everything in the cache which could update the access time of - // everything which is not desired for this use case, however the cache is expire after write - // and not expire after access so its probably ok. 
- Supplier> activeTables = () -> tabletMetadataCache.asMap().keySet().stream() - .map(KeyExtent::tableId).collect(Collectors.toSet()); - scanMetrics = new TabletServerScanMetrics(context, activeTables, resourceManager::getOpenFiles); + scanMetrics = + new TabletServerScanMetrics(context, activeTableIdTracker, resourceManager::getOpenFiles); sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); scanServerMetrics = new ScanServerMetrics(tabletMetadataCache); blockCacheMetrics = new BlockCacheMetrics(resourceManager.getIndexCache(), @@ -489,12 +495,20 @@ public void run() { } @SuppressWarnings("unchecked") - private Map getTabletMetadata(Collection extents) { + private Map getTabletMetadata(Set extents) { if (tabletMetadataCache == null) { + activeTableIdTracker.tabletsLoaded(extents); return (Map) tabletMetadataLoader .loadAll((Set) extents); } else { - return tabletMetadataCache.getAll(extents); + var tabletMetadata = tabletMetadataCache.getAll(extents); + // The cache does not have a mechanism to notify of things that were loaded like it does for + // evictions. There is a load handler passed to the cache, but this is called prior to + // something being loaded. Want to call the per table metrics tracking code after the new + // extent is loaded in the cache. The following method call is cheap if a table id for an + // extent is already being tracked, so its ok to call it for each cache access. + activeTableIdTracker.tabletsLoaded(extents); + return tabletMetadata; } } @@ -575,7 +589,7 @@ public void close() { * All extents passed in should end up in either the returned map or the failures set, but no * extent should be in both. 
*/ - private Map reserveFilesInner(Collection extents, + private Map reserveFilesInner(Set extents, long myReservationId, Set failures) throws AccumuloException { // RFS is an acronym for Reference files for scan LOG.debug("RFFS {} ensuring files are referenced for scan of extents {}", myReservationId, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index f81f5d16f76..9639ca234aa 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -122,6 +122,7 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; +import org.apache.accumulo.server.metrics.*; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; @@ -170,13 +171,10 @@ public class TabletServer extends AbstractServer implements TabletHostingServer TabletServerMinCMetrics mincMetrics; PausedCompactionMetrics pausedMetrics; BlockCacheMetrics blockCacheMetrics; + private final PerTableMetrics.ActiveTableIdTracker activeTableIdTracker; - public void refreshMetrics(TableId tableId) { - // setup per table metrics for tables if not already setup - metrics.getTableMetrics(tableId); - scanMetrics.getTableMetrics(tableId); - updateMetrics.getTableMetrics(tableId); - mincMetrics.getTableMetrics(tableId); + public PerTableMetrics.ActiveTableIdTracker getActiveTableIdTracker() { + return activeTableIdTracker; } @Override @@ -348,6 +346,9 @@ private void logBusyTablets(List> busyTablets, authKeyWatcher = null; } config(); + + activeTableIdTracker = new PerTableMetrics.ActiveTableIdTracker(context, + () -> onlineTablets.perTableSnapshot().keySet()); } @Override 
@@ -592,13 +593,11 @@ public void run() { MetricsInfo metricsInfo = context.getMetricsInfo(); metrics = new TabletServerMetrics(this); - updateMetrics = - new TabletServerUpdateMetrics(context, () -> onlineTablets.perTableSnapshot().keySet()); - scanMetrics = new TabletServerScanMetrics(context, - () -> onlineTablets.perTableSnapshot().keySet(), this.resourceManager::getOpenFiles); + updateMetrics = new TabletServerUpdateMetrics(context, activeTableIdTracker); + scanMetrics = new TabletServerScanMetrics(context, activeTableIdTracker, + this.resourceManager::getOpenFiles); sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); - mincMetrics = - new TabletServerMinCMetrics(context, () -> onlineTablets.perTableSnapshot().keySet()); + mincMetrics = new TabletServerMinCMetrics(context, activeTableIdTracker); pausedMetrics = new PausedCompactionMetrics(); blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(), this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java index f0499b1e028..272f198602a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java @@ -69,7 +69,7 @@ protected Logger getLog() { } public TabletServerMetrics(TabletServer tserver) { - super(tserver.getContext(), tserver::getOnlineTableIds); + super(tserver.getContext(), tserver.getActiveTableIdTracker()); util = new TabletServerMetricsUtil(tserver); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java index 9ad9ba28d5e..cb7055de2f4 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java @@ -23,9 +23,7 @@ import java.time.Duration; import java.util.List; -import java.util.Set; import java.util.function.Consumer; -import java.util.function.Supplier; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metrics.MetricsProducer; @@ -49,9 +47,8 @@ protected Logger getLog() { return log; } - public TabletServerMinCMetrics(ServerContext context, - Supplier> activeTableSupplier) { - super(context, activeTableSupplier); + public TabletServerMinCMetrics(ServerContext context, ActiveTableIdTracker activeTableIdTracker) { + super(context, activeTableIdTracker); } public static class TableMetrics { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index a6df7dbda85..0c7a491db20 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -36,12 +36,10 @@ import java.time.Duration; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import java.util.function.IntSupplier; -import java.util.function.Supplier; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.server.ServerContext; @@ -180,8 +178,8 @@ public long getZombieThreadsCount() { } public TabletServerScanMetrics(ServerContext serverContext, - Supplier> activeTableSupplier, IntSupplier openFileSupplier) { - super(serverContext, activeTableSupplier); + ActiveTableIdTracker activeTableIdTracker, IntSupplier openFileSupplier) { + super(serverContext, 
activeTableIdTracker); openFiles = openFileSupplier; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java index fed7b1c1e97..efa143a46a6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerUpdateMetrics.java @@ -26,10 +26,8 @@ import java.time.Duration; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.Supplier; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metrics.MetricsProducer; @@ -61,8 +59,8 @@ protected Logger getLog() { private Timer commitTimeStat = NoopMetrics.useNoopTimer(); public TabletServerUpdateMetrics(ServerContext context, - Supplier> activeTableSupplier) { - super(context, activeTableSupplier); + ActiveTableIdTracker activeTableIdTracker) { + super(context, activeTableIdTracker); } public static class TableMetrics { From 1c08f78b5d5afe2a764576ea18bfc536b8fa3fb3 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 5 Nov 2024 01:07:33 +0000 Subject: [PATCH 3/4] fix imports --- .../org/apache/accumulo/server/metrics/PerTableMetrics.java | 6 +++--- .../main/java/org/apache/accumulo/tserver/ScanServer.java | 2 +- .../main/java/org/apache/accumulo/tserver/TabletServer.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java index 37ecd2fa2d3..a31e6bdfbf2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java @@ -24,17 
+24,17 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.*; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.util.Timer; -import org.apache.accumulo.core.util.threads.*; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 55ae83aa642..6db7b92a4c1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -104,7 +104,7 @@ import org.apache.accumulo.server.compaction.PausedCompactionMetrics; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.metrics.*; +import org.apache.accumulo.server.metrics.PerTableMetrics; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 9639ca234aa..d83d92b00a8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -122,7 +122,7 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; -import org.apache.accumulo.server.metrics.*; +import org.apache.accumulo.server.metrics.PerTableMetrics; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; From 60c48f0283fce1b3cad7b1215fe95aa8e44f7bc3 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 5 Nov 2024 01:59:04 +0000 Subject: [PATCH 4/4] add override annotation --- .../java/org/apache/accumulo/server/metrics/PerTableMetrics.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java index a31e6bdfbf2..38fedd97ffe 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/PerTableMetrics.java @@ -231,6 +231,7 @@ private TableMetricsInfo getOrCreateTableMetrics(TableId tableId) { }); } + @Override public void registerMetrics(MeterRegistry registry) { Preconditions.checkState(this.registry == null); this.registry = registry;