Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2055,6 +2055,11 @@ public static SlowLogResponseRequest buildSlowLogResponseRequest(
} else {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
}
if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
} else {
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
}
return builder.setLimit(logQueryFilter.getLimit()).build();
}

Expand Down
12 changes: 12 additions & 0 deletions hbase-common/src/main/resources/hbase-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1996,4 +1996,16 @@ possible configurations would overwhelm and obscure the important.
too large batch request.
</description>
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which
is used to store slow/large RPC logs in ringbuffer at each RegionServer.
All implementors of NamedQueueService should be found under package:
"org.apache.hadoop.hbase.namequeues.impl"
</description>
</property>
</configuration>
6 changes: 6 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,18 @@ message SlowLogResponseRequest {
OR = 1;
}

enum LogType {
SLOW_LOG = 0;
LARGE_LOG = 1;
}

optional string region_name = 1;
optional string table_name = 2;
optional string client_address = 3;
optional string user_name = 4;
optional uint32 limit = 5 [default = 10];
optional FilterByOperator filter_by_operator = 6 [default = OR];
optional LogType log_type = 7;
}

message SlowLogResponses {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
Expand Down Expand Up @@ -96,6 +97,7 @@ public abstract class RpcServer implements RpcServerInterface,
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";

private final boolean authorize;
private final boolean isOnlineLogProviderEnabled;
protected boolean isSecurityEnabled;

public static final byte CURRENT_VERSION = 0;
Expand Down Expand Up @@ -227,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface,
/**
* Use to add online slowlog responses
*/
private SlowLogRecorder slowLogRecorder;
private NamedQueueRecorder namedQueueRecorder;

@FunctionalInterface
protected interface CallCleanup {
Expand Down Expand Up @@ -302,6 +304,8 @@ public RpcServer(final Server server, final String name,
saslProps = Collections.emptyMap();
}

this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.scheduler = scheduler;
}

Expand Down Expand Up @@ -430,11 +434,11 @@ public Pair<Message, CellScanner> call(RpcCall call,
tooLarge, tooSlow,
status.getClient(), startTime, processingTime, qTime,
responseSize, userName);
if (this.slowLogRecorder != null) {
if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
// send logs to ring buffer owned by slowLogRecorder
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
this.slowLogRecorder.addSlowLogPayload(
final String className =
server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
this.namedQueueRecorder.addRecord(
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
tooLarge));
}
Expand Down Expand Up @@ -817,12 +821,8 @@ public void setRsRpcServices(RSRpcServices rsRpcServices) {
}

@Override
public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
this.slowLogRecorder = slowLogRecorder;
public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
this.namedQueueRecorder = namedQueueRecorder;
}

@Override
public SlowLogRecorder getSlowLogRecorder() {
return slowLogRecorder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -102,12 +102,8 @@ Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
/**
* Set Online SlowLog Provider
*
* @param slowLogRecorder instance of {@link SlowLogRecorder}
* @param namedQueueRecorder instance of {@link NamedQueueRecorder}
*/
void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);

/**
* @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
*/
SlowLogRecorder getSlowLogRecorder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;

import com.lmax.disruptor.ExceptionHandler;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.namequeues;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;

import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Event Handler run by disruptor ringbuffer consumer.
* Although this is generic implementation for namedQueue, it can have individual queue specific
* logic.
*/
@InterfaceAudience.Private
class LogEventHandler implements EventHandler<RingBufferEnvelope> {

private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);

// Map that binds namedQueues to corresponding queue service implementation.
// If NamedQueue of specific type is enabled, corresponding service will be used to
// insert and retrieve records.
// Individual queue sizes should be determined based on their individual configs within
// each service.
private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
new HashMap<>();

private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";

LogEventHandler(final Configuration conf) {
for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
Class<?> clz;
try {
clz = Class.forName(implName);
} catch (ClassNotFoundException e) {
LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
continue;
}

if (!NamedQueueService.class.isAssignableFrom(clz)) {
LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
continue;
}

// add all service mappings here
try {
NamedQueueService namedQueueService =
(NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
clz);
}
}
}

/**
* Called when a publisher has published an event to the {@link RingBuffer}.
* This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
* add, we should also provide specific consumer logic here.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from
* the {@link RingBuffer}
*/
@Override
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
final NamedQueuePayload namedQueuePayload = event.getPayload();
// consume ringbuffer payload based on event type
namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
.consumeEventFromDisruptor(namedQueuePayload);
}

/**
* Cleans up queues maintained by services.
*
* @param namedQueueEvent type of queue to clear
* @return true if queue is cleaned up, false otherwise
*/
boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
}

/**
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
*/
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
namedQueueServices.get(namedQueueEvent).persistAll();
}

/**
* Retrieve in memory queue records from ringbuffer
*
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
*/
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -30,7 +30,7 @@
* Event Handler utility class
*/
@InterfaceAudience.Private
class LogHandlerUtils {
public class LogHandlerUtils {

private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) {
int totalFilters = 0;
Expand Down Expand Up @@ -91,7 +91,7 @@ private static List<TooSlowLog.SlowLogPayload> filterLogs(
return filteredSlowLogPayloads;
}

static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) {
int totalFilters = getTotalFiltersCount(request);
if (totalFilters > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.namequeues;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Base payload to be prepared by client to send various namedQueue events for in-memory
* ring buffer storage in either HMaster or RegionServer.
* e.g slowLog responses
*/
@InterfaceAudience.Private
public class NamedQueuePayload {

public enum NamedQueueEvent {
SLOW_LOG
}

private final NamedQueueEvent namedQueueEvent;

public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
if (namedQueueEvent == null) {
throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
}
this.namedQueueEvent = namedQueueEvent;
}

public NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}

}
Loading