Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,11 @@ public static SlowLogResponseRequest buildSlowLogResponseRequest(
} else {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
}
if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
} else {
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
}
return builder.setLimit(logQueryFilter.getLimit()).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,18 @@ message SlowLogResponseRequest {
OR = 1;
}

enum LogType {
SLOW_LOG = 0;
LARGE_LOG = 1;
}

optional string region_name = 1;
optional string table_name = 2;
optional string client_address = 3;
optional string user_name = 4;
optional uint32 limit = 5 [default = 10];
optional FilterByOperator filter_by_operator = 6 [default = OR];
optional LogType log_type = 7;
}

message SlowLogResponses {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
Expand Down Expand Up @@ -94,10 +95,9 @@ public abstract class RpcServer implements RpcServerInterface,
private static final String MULTI_GETS = "multi.gets";
private static final String MULTI_MUTATIONS = "multi.mutations";
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
private static final String GET_SLOW_LOG_RESPONSES = "GetSlowLogResponses";
private static final String CLEAR_SLOW_LOGS_RESPONSES = "ClearSlowLogsResponses";

private final boolean authorize;
private final boolean isOnlineLogProviderEnabled;
protected boolean isSecurityEnabled;

public static final byte CURRENT_VERSION = 0;
Expand Down Expand Up @@ -229,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface,
/**
* Use to add online slowlog responses
*/
private SlowLogRecorder slowLogRecorder;
private NamedQueueRecorder namedQueueRecorder;

@FunctionalInterface
protected interface CallCleanup {
Expand Down Expand Up @@ -304,6 +304,8 @@ public RpcServer(final Server server, final String name,
saslProps = Collections.emptyMap();
}

this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.scheduler = scheduler;
}

Expand Down Expand Up @@ -432,11 +434,11 @@ public Pair<Message, CellScanner> call(RpcCall call,
tooLarge, tooSlow,
status.getClient(), startTime, processingTime, qTime,
responseSize, userName);
if (this.slowLogRecorder != null) {
if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
// send logs to ring buffer owned by slowLogRecorder
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
this.slowLogRecorder.addSlowLogPayload(
final String className =
server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
this.namedQueueRecorder.addRecord(
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
tooLarge));
}
Expand Down Expand Up @@ -819,12 +821,8 @@ public void setRsRpcServices(RSRpcServices rsRpcServices) {
}

@Override
public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
this.slowLogRecorder = slowLogRecorder;
public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
this.namedQueueRecorder = namedQueueRecorder;
}

@Override
public SlowLogRecorder getSlowLogRecorder() {
return slowLogRecorder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -102,12 +102,8 @@ Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
/**
* Set Online SlowLog Provider
*
* @param slowLogRecorder instance of {@link SlowLogRecorder}
* @param namedQueueRecorder instance of {@link NamedQueueRecorder}
*/
void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);

/**
* @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
*/
SlowLogRecorder getSlowLogRecorder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;

import com.lmax.disruptor.ExceptionHandler;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.namequeues;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Event Handler run by disruptor ringbuffer consumer.
* Although this is generic implementation for namedQueue, it can have individual queue specific
* logic.
*/
@InterfaceAudience.Private
class LogEventHandler implements EventHandler<RingBufferEnvelope> {

// Map that binds namedQueues to corresponding queue service implementation.
// If NamedQueue of specific type is enabled, corresponding service will be used to
// insert and retrieve records.
// Individual queue sizes should be determined based on their individual configs within
// each service.
private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
new HashMap<>();

LogEventHandler(final Configuration conf) {
// add all service mappings here
namedQueueServices
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about define a String collection property for all implementations of NamedQueueService? So that this handler can load those dynamically, without requiring code changes once new implementations are added.

Copy link
Contributor Author

@virajjasani virajjasani Jul 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggestion is to use reflection to find all implementors of NamedQueueService interface? Or some separate string collection? If we have separate immutable string(classname) collection, even that will require changes once we implement new implementor. Or I am missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use reflection and avoid changing this class with every new implementation added, something similar to what we do for SaslServerAuthenticationProviders

.put(NamedQueuePayload.NamedQueueEvent.SLOW_LOG, new SlowLogQueueService(conf));
}

/**
* Called when a publisher has published an event to the {@link RingBuffer}.
* This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
* add, we should also provide specific consumer logic here.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from
* the {@link RingBuffer}
*/
@Override
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
final NamedQueuePayload namedQueuePayload = event.getPayload();
// consume ringbuffer payload based on event type
namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
.consumeEventFromDisruptor(namedQueuePayload);
}

/**
* Cleans up queues maintained by services.
*
* @param namedQueueEvent type of queue to clear
* @return true if queue is cleaned up, false otherwise
*/
boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
}

/**
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
*/
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
namedQueueServices.get(namedQueueEvent).persistAll();
}

/**
* Retrieve in memory queue records from ringbuffer
*
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
*/
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.namequeues;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Base payload to be prepared by client to send various namedQueue events for in-memory
* ring buffer storage in either HMaster or RegionServer.
* e.g slowLog responses
*/
@InterfaceAudience.Private
public class NamedQueuePayload {

public enum NamedQueueEvent {
SLOW_LOG
}

private final NamedQueueEvent namedQueueEvent;

public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
if (namedQueueEvent == null) {
throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
}
this.namedQueueEvent = namedQueueEvent;
}

public NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}

}
Loading