Skip to content

Commit

Permalink
RATIS-2244. Reduce the number of log messages during bootstrap (#1217)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkatsambath authored Feb 3, 2025
1 parent 345641f commit 2664ac8
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private void sendRequestWithRetry(PendingOrderedRequest pending) {
final Throwable exception = e;
final String key = client.getId() + "-" + request.getCallId() + "-" + exception;
final Consumer<String> op = suffix -> LOG.error("{} {}: Failed* {}", suffix, client.getId(), request, exception);
BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
BatchLogger.print(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
handleException(pending, request, e);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ default TimeDuration getBatchDuration() {

private static final class UniqueId {
private final Key key;
private final String name;
private final Object name;

private UniqueId(Key key, String name) {
private UniqueId(Key key, Object name) {
this.key = Objects.requireNonNull(key, "key == null");
this.name = name;
}
Expand Down Expand Up @@ -99,15 +99,15 @@ private synchronized boolean tryStartBatch(Consumer<String> op) {
private static final TimeoutExecutor SCHEDULER = TimeoutExecutor.getInstance();
private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE = new ConcurrentHashMap<>();

public static void warn(Key key, String name, Consumer<String> op) {
warn(key, name, op, key.getBatchDuration(), true);
public static void print(Key key, Object name, Consumer<String> op) {
print(key, name, op, key.getBatchDuration(), true);
}

public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration) {
warn(key, name, op, batchDuration, true);
public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration) {
print(key, name, op, batchDuration, true);
}

public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) {
public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) {
if (!shouldBatch || batchDuration.isNonPositive()) {
op.accept("");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class GrpcLogAppender extends LogAppenderBase {

/** Keys identifying the kinds of log messages that this appender passes to {@code BatchLogger.print}. */
private enum BatchLogKey implements BatchLogger.Key {
// Used by resetClient for the "Follower failed (request=null, ...)" warning.
RESET_CLIENT,
// Used by onNextImpl for the warning emitted when an INCONSISTENCY reply is received.
INCONSISTENCY_REPLY,
// Used by the response handler's onError for the "Failed appendEntries" warning.
APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
}

Expand Down Expand Up @@ -217,7 +218,7 @@ private void resetClient(AppendEntriesRequest request, Event event) {
.orElseGet(f::getMatchIndex);
if (event.isError() && request == null) {
final long followerNextIndex = f.getNextIndex();
BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix ->
BatchLogger.print(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix ->
LOG.warn("{}: Follower failed (request=null, errorCount={}); keep nextIndex ({}) unchanged and retry.{}",
this, errorCount, followerNextIndex, suffix), logMessageBatchDuration);
return;
Expand Down Expand Up @@ -534,8 +535,9 @@ private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto re
break;
case INCONSISTENCY:
grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={}",
this, reply.getResult(), reply.getNextIndex(), errorCount, request);
BatchLogger.print(BatchLogKey.INCONSISTENCY_REPLY, getFollower().getName() + "_" + reply.getNextIndex(),
suffix -> LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={} {}",
this, reply.getResult(), reply.getNextIndex(), errorCount, request, suffix));
final long requestFirstIndex = request != null? request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX;
updateNextIndex(getNextIndexForInconsistency(requestFirstIndex, reply.getNextIndex()));
break;
Expand All @@ -555,7 +557,7 @@ public void onError(Throwable t) {
LOG.info("{} is already stopped", GrpcLogAppender.this);
return;
}
BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name,
BatchLogger.print(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name,
suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" + suffix, t),
logMessageBatchDuration, t instanceof StatusRuntimeException);
grpcServerMetrics.onRequestRetry(); // Update try counter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
Expand All @@ -49,6 +51,11 @@
class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);

/** Keys identifying the log messages that this service passes to {@code BatchLogger.print}. */
private enum BatchLogKey implements BatchLogger.Key {
// Used by onCompleted for the "Completed {}, lastRequest: ..." info message.
COMPLETED_REQUEST,
// Used by onCompleted for the "Completed {}, lastReply: ..." info message.
COMPLETED_REPLY
}

static class PendingServerRequest<REQUEST> {
private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
private final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -76,6 +83,7 @@ void release() {

abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
private final RaftServer.Op op;
private final Supplier<String> nameSupplier;
private final StreamObserver<REPLY> responseObserver;
/** For ordered {@link #onNext(Object)} requests. */
private final AtomicReference<PendingServerRequest<REQUEST>> previousOnNext = new AtomicReference<>();
Expand All @@ -86,9 +94,14 @@ abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObse

/**
 * Store the given operation and response observer, and set up the supplier backing {@link #getName()}.
 *
 * @param op the server operation; also used as the second part of the observer name.
 * @param responseObserver the observer that {@code onCompleted()} is later called on.
 */
ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY> responseObserver) {
this.op = op;
// The name has the form getId() + "_" + op.
// NOTE(review): presumably MemoizedSupplier computes it lazily and at most once -- confirm.
this.nameSupplier = MemoizedSupplier.valueOf(() -> getId() + "_" + op);
this.responseObserver = responseObserver;
}

/**
 * @return the value of {@code nameSupplier}, which is built from {@code getId() + "_" + op};
 *         onCompleted() passes it to {@code BatchLogger.print} as the batch name.
 */
String getName() {
return nameSupplier.get();
}

private String getPreviousRequestString() {
return Optional.ofNullable(previousOnNext.get())
.map(PendingServerRequest::getRequest)
Expand Down Expand Up @@ -197,9 +210,12 @@ public void onNext(REQUEST request) {
@Override
public void onCompleted() {
if (isClosed.compareAndSet(false, true)) {
LOG.info("{}: Completed {}, lastRequest: {}", getId(), op, getPreviousRequestString());
BatchLogger.print(BatchLogKey.COMPLETED_REQUEST, getName(),
suffix -> LOG.info("{}: Completed {}, lastRequest: {} {}",
getId(), op, getPreviousRequestString(), suffix));
requestFuture.get().thenAccept(reply -> {
LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(),
suffix -> LOG.info("{}: Completed {}, lastReply: {} {}", getId(), op, reply, suffix));
responseObserver.onCompleted();
});
releaseLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
Expand All @@ -59,6 +60,11 @@
class SnapshotInstallationHandler {
static final Logger LOG = LoggerFactory.getLogger(SnapshotInstallationHandler.class);

/** Keys identifying the log messages that installSnapshot passes to {@code BatchLogger.print}. */
private enum BatchLogKey implements BatchLogger.Key {
// Used for the "receive installSnapshot" info message.
INSTALL_SNAPSHOT_REQUEST,
// Used for the "reply installSnapshot" info message.
INSTALL_SNAPSHOT_REPLY
}

static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0, INVALID_LOG_INDEX);

private final RaftServerImpl server;
Expand Down Expand Up @@ -93,21 +99,19 @@ long getInProgressInstallSnapshotIndex() {
}

/**
 * Log the given installSnapshot request, delegate to {@code installSnapshotImpl(request)},
 * then log and return its reply.
 *
 * NOTE(review): as written, the request and the reply are each logged twice: once by the
 * {@code LOG.isInfoEnabled()}-guarded {@code LOG.info} call and once through {@code BatchLogger.print}.
 * This text looks like a rendered diff in which the guarded blocks are the removed lines and the
 * {@code BatchLogger.print} calls are their replacements -- confirm against the actual file.
 *
 * @param request the install-snapshot request to process.
 * @return the reply produced by {@code installSnapshotImpl(request)}.
 * @throws IOException declared by this method; any exception from {@code installSnapshotImpl}
 *         is logged at error level and rethrown unchanged.
 */
InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
// Direct logging of the request, guarded so the request string is only built when info is enabled.
if (LOG.isInfoEnabled()) {
LOG.info("{}: receive installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotRequestString(request));
}
// Same message through BatchLogger, using the member id as the batch name;
// the request string is built inside the lambda, i.e. only when BatchLogger invokes it.
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REQUEST, getMemberId(),
suffix -> LOG.info("{}: receive installSnapshot: {} {}",
getMemberId(), ServerStringUtils.toInstallSnapshotRequestString(request), suffix));
final InstallSnapshotReplyProto reply;
try {
reply = installSnapshotImpl(request);
} catch (Exception e) {
// Record the failure with the member id, then propagate the original exception.
LOG.error("{}: installSnapshot failed", getMemberId(), e);
throw e;
}
// Direct logging of the reply, guarded like the request logging above.
if (LOG.isInfoEnabled()) {
LOG.info("{}: reply installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotReplyString(reply));
}
// Same message through BatchLogger, under a separate key from the request message.
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REPLY, getMemberId(),
suffix -> LOG.info("{}: reply installSnapshot: {} {}",
getMemberId(), ServerStringUtils.toInstallSnapshotReplyString(reply), suffix));
return reply;
}

Expand Down

0 comments on commit 2664ac8

Please sign in to comment.