From bb48a127f575847eaea73aa2f5bf9cff676d4ae4 Mon Sep 17 00:00:00 2001 From: George Gensure Date: Fri, 29 Jan 2021 11:52:22 -0500 Subject: [PATCH] Write/QueryWriteStatus logging refinement/addition Improve the logging of WriteRequests to include offset and finish_write information. Offsets are logged for the initial and non-sequential per successive write request. Each finish_write true request is logged with the effective size of the resource at the completion of the write request, including the current offset and payload. Clarified comments for WriteDetails, and corrected some comment inconsistencies. Add logging for QueryWriteStatus calls which occur on progressive writes to determine an offset to begin a write call on a retry. --- .../devtools/build/lib/remote/logging/BUILD | 1 + .../remote/logging/LoggingInterceptor.java | 2 + .../logging/QueryWriteStatusHandler.java | 40 ++++++ .../lib/remote/logging/WriteHandler.java | 20 ++- src/main/protobuf/remote_execution_log.proto | 30 ++++- .../logging/LoggingInterceptorTest.java | 124 ++++++++++++++++++ 6 files changed, 213 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD index 46a9b5f5caaabe..b9fced4d796ed0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD @@ -19,6 +19,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/protobuf:remote_execution_log_java_proto", "//third_party:flogger", + "//third_party:guava", "//third_party:jsr305", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java index c6738942b2d783..8cffbaee9a4caa 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java @@ -74,6 +74,8 @@ public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile, Clock clock) return new ReadHandler(); // } else if (method == ByteStreamGrpc.getWriteMethod()) { return new WriteHandler(); // + } else if (method == ByteStreamGrpc.getQueryWriteStatusMethod()) { + return new QueryWriteStatusHandler(); // } else if (method == CapabilitiesGrpc.getGetCapabilitiesMethod()) { return new GetCapabilitiesHandler(); // } diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java new file mode 100644 index 00000000000000..7b7efcd4182bbb --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java @@ -0,0 +1,40 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote.logging; + +import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; +import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails; + +/** LoggingHandler for {@link google.bytestream.QueryWriteStatus} gRPC call. */ +public class QueryWriteStatusHandler implements LoggingHandler { + private final QueryWriteStatusDetails.Builder builder = QueryWriteStatusDetails.newBuilder(); + + @Override + public void handleReq(QueryWriteStatusRequest message) { + builder.setRequest(message); + } + + @Override + public void handleResp(QueryWriteStatusResponse message) { + builder.setResponse(message); + } + + @Override + public RpcCallDetails getDetails() { + return RpcCallDetails.newBuilder().setQueryWriteStatus(builder).build(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java index 67e9e378e2ff3e..b5062957eaee20 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java @@ -16,24 +16,40 @@ import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; +import com.google.common.collect.Iterables; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails; +import java.util.ArrayList; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; /** LoggingHandler for {@link google.bytestream.Write} gRPC call. */ public class WriteHandler implements LoggingHandler { private final WriteDetails.Builder builder = WriteDetails.newBuilder(); private final Set resources = new LinkedHashSet<>(); + private final List offsets = new ArrayList<>(); + private final List finishWrites = new ArrayList<>(); + private long bytesSentInSequence = 0; private long numWrites = 0; private long bytesSent = 0; @Override public void handleReq(WriteRequest message) { resources.add(message.getResourceName()); + long writeOffset = message.getWriteOffset(); + if (numWrites == 0 || Iterables.getLast(offsets) + bytesSentInSequence != writeOffset) { + offsets.add(writeOffset); + bytesSentInSequence = 0; + } + int size = message.getData().size(); + if (message.getFinishWrite()) { + finishWrites.add(writeOffset + size); + } numWrites++; - bytesSent += message.getData().size(); + bytesSent += size; + bytesSentInSequence += size; } @Override @@ -44,6 +60,8 @@ public void handleResp(WriteResponse message) { @Override public RpcCallDetails getDetails() { builder.addAllResourceNames(resources); + builder.addAllOffsets(offsets); + builder.addAllFinishWrites(finishWrites); builder.setNumWrites(numWrites); builder.setBytesSent(bytesSent); return RpcCallDetails.newBuilder().setWrite(builder).build(); diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto index 84a07d64c8e230..51f64f1d8f9963 100644 --- a/src/main/protobuf/remote_execution_log.proto +++ b/src/main/protobuf/remote_execution_log.proto @@ -83,7 +83,7 @@ message GetActionResultDetails { // Details for a call to // build.bazel.remote.execution.v2.ActionCache.UpdateActionResult. message UpdateActionResultDetails { - // The build.bazel.remote.execution.v2.GetActionResultRequest sent by + // The build.bazel.remote.execution.v2.UpdateActionResultRequest sent by // the call. build.bazel.remote.execution.v2.UpdateActionResultRequest request = 1; @@ -128,10 +128,24 @@ message ReadDetails { message WriteDetails { // The names of resources requested to be written to in this call in the order // they were first requested in. If the ByteStream protocol is followed - // according to specification, this should only contain have a single element, - // which is the resource name specified in the first message of the stream. + // according to specification, this should contain at most two elements: + // The resource name specified in the first message of the stream, and an + // empty string specified in each successive request if num_writes > 1. repeated string resource_names = 1; + // The offsets sent for the initial request and any non-sequential offsets + // specified over the course of the call. If the ByteStream protocol is + // followed according to specification, this should contain a single element + // which is the starting point for the write call. + repeated int64 offsets = 5; + + // The effective final size for each request sent with finish_write true + // specified over the course of the call. If the ByteStream protocol is + // followed according to specification, this should contain a single element + // which is the total size of the written resource, including the initial + // offset. + repeated int64 finish_writes = 6; + // The number of writes performed in this call. int64 num_writes = 2; @@ -142,6 +156,15 @@ message WriteDetails { google.bytestream.WriteResponse response = 4; } +// Details for a call to google.bytestream.QueryWriteStatus. +message QueryWriteStatusDetails { + // The google.bytestream.QueryWriteStatusRequest sent by the call. + google.bytestream.QueryWriteStatusRequest request = 1; + + // The received google.bytestream.QueryWriteStatusResponse. + google.bytestream.QueryWriteStatusResponse response = 2; +} + // Contains details for specific types of calls. message RpcCallDetails { reserved 1 to 4, 11; @@ -152,6 +175,7 @@ message RpcCallDetails { FindMissingBlobsDetails find_missing_blobs = 10; ReadDetails read = 5; WriteDetails write = 6; + QueryWriteStatusDetails query_write_status = 14; GetCapabilitiesDetails get_capabilities = 12; UpdateActionResultDetails update_action_result = 13; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java index 17ee021c73a839..ed5f206deca460 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java @@ -49,6 +49,8 @@ import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub; import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; import com.google.bytestream.ByteStreamGrpc.ByteStreamStub; +import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; +import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse; import com.google.bytestream.ByteStreamProto.ReadRequest; import com.google.bytestream.ByteStreamProto.ReadResponse; import com.google.bytestream.ByteStreamProto.WriteRequest; @@ -58,6 +60,7 @@ import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetCapabilitiesDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.UpdateActionResultDetails; @@ -854,6 +857,85 @@ public void onCompleted() { WriteDetails.newBuilder() .addResourceNames("test1") .addResourceNames("test2") + .addOffsets(0) + .addOffsets(0) + .addOffsets(0) + // finish write is empty + .setResponse(response) + .setBytesSent(9) + .setNumWrites(3))) + .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(10)) + .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000)) + .build(); + + verify(logStream).write(expectedEntry); + } + + // verify that the offset and finish_write compounding happens + @Test + public void testWriteCallValid() { + WriteRequest request1 = + WriteRequest.newBuilder() + .setResourceName("test1") + .setData(ByteString.copyFromUtf8("abc")) + .setWriteOffset(10) + .build(); + WriteRequest request2 = + WriteRequest.newBuilder() + .setData(ByteString.copyFromUtf8("def")) + .setWriteOffset(request1.getWriteOffset() + request1.getData().size()) + .build(); + WriteResponse response = WriteResponse.newBuilder().setCommittedSize(6).build(); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver streamObserver) { + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) {} + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onCompleted() { + streamObserver.onNext(response); + streamObserver.onCompleted(); + } + }; + } + }); + + ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel); + @SuppressWarnings("unchecked") + StreamObserver responseObserver = Mockito.mock(StreamObserver.class); + + clock.advanceMillis(10000); + // Request three writes, the first identical with the third, but offset correctly and finish_writing + StreamObserver requester = stub.write(responseObserver); + requester.onNext(request1); + clock.advanceMillis(100); + requester.onNext(request2); + clock.advanceMillis(200); + requester.onNext(request1.toBuilder() + .setWriteOffset(request2.getWriteOffset() + request2.getData().size()) + .setFinishWrite(true) + .build()); + clock.advanceMillis(100); + requester.onCompleted(); + + LogEntry expectedEntry = + LogEntry.newBuilder() + .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName()) + .setDetails( + RpcCallDetails.newBuilder() + .setWrite( + WriteDetails.newBuilder() + .addResourceNames("test1") + .addResourceNames("") + .addOffsets(request1.getWriteOffset()) + .addFinishWrites(10 + request1.getData().size() * 2 + request2.getData().size()) .setResponse(response) .setBytesSent(9) .setNumWrites(3))) @@ -907,6 +989,7 @@ public StreamObserver write(StreamObserver streamOb .setWrite( WriteDetails.newBuilder() .addResourceNames("test") + .addOffsets(0) .setNumWrites(1) .setBytesSent(3))) .setStartTime(Timestamp.newBuilder().setSeconds(10000000)) @@ -915,4 +998,45 @@ public StreamObserver write(StreamObserver streamOb verify(logStream).write(expectedEntry); } + + @Test + public void testQueryWriteStatusCallOk() { + QueryWriteStatusRequest request = + QueryWriteStatusRequest.newBuilder() + .setResourceName("test") + .build(); + QueryWriteStatusResponse response = + QueryWriteStatusResponse.newBuilder() + .setCommittedSize(10) + .build(); + + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, StreamObserver responseObserver) { + clock.advanceMillis(22222); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }); + ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(loggedChannel); + + clock.advanceMillis(11111); + stub.queryWriteStatus(request); + LogEntry expectedEntry = + LogEntry.newBuilder() + .setMethodName(ByteStreamGrpc.getQueryWriteStatusMethod().getFullMethodName()) + .setDetails( + RpcCallDetails.newBuilder() + .setQueryWriteStatus( + QueryWriteStatusDetails.newBuilder() + .setRequest(request) + .setResponse(response))) + .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000)) + .build(); + verify(logStream).write(expectedEntry); + } }