Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write/QueryWriteStatus logging refinement/addition #12928

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/util/io",
"//src/main/protobuf:remote_execution_log_java_proto",
"//third_party:flogger",
"//third_party:guava",
"//third_party:jsr305",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile, Clock clock)
return new ReadHandler(); // <ReadRequest, ReadResponse>
} else if (method == ByteStreamGrpc.getWriteMethod()) {
return new WriteHandler(); // <WriteRequest, WriteResponse>
} else if (method == ByteStreamGrpc.getQueryWriteStatusMethod()) {
return new QueryWriteStatusHandler(); // <QueryWriteStatusRequest, QueryWriteStatusResponse>
} else if (method == CapabilitiesGrpc.getGetCapabilitiesMethod()) {
return new GetCapabilitiesHandler(); // <GetCapabilitiesRequest, ServerCapabilities>
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote.logging;

import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails;

/** LoggingHandler for {@link google.bytestream.QueryWriteStatus} gRPC call. */
public class QueryWriteStatusHandler implements LoggingHandler<QueryWriteStatusRequest, QueryWriteStatusResponse> {
private final QueryWriteStatusDetails.Builder builder = QueryWriteStatusDetails.newBuilder();

@Override
public void handleReq(QueryWriteStatusRequest message) {
builder.setRequest(message);
}

@Override
public void handleResp(QueryWriteStatusResponse message) {
builder.setResponse(message);
}

@Override
public RpcCallDetails getDetails() {
return RpcCallDetails.newBuilder().setQueryWriteStatus(builder).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,40 @@

import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** LoggingHandler for {@link google.bytestream.Write} gRPC call. */
public class WriteHandler implements LoggingHandler<WriteRequest, WriteResponse> {
private final WriteDetails.Builder builder = WriteDetails.newBuilder();
private final Set<String> resources = new LinkedHashSet<>();
private final List<Long> offsets = new ArrayList<>();
private final List<Long> finishWrites = new ArrayList<>();
private long bytesSentInSequence = 0;
private long numWrites = 0;
private long bytesSent = 0;

@Override
public void handleReq(WriteRequest message) {
resources.add(message.getResourceName());
long writeOffset = message.getWriteOffset();
if (numWrites == 0 || Iterables.getLast(offsets) + bytesSentInSequence != writeOffset) {
offsets.add(writeOffset);
bytesSentInSequence = 0;
}
int size = message.getData().size();
if (message.getFinishWrite()) {
finishWrites.add(writeOffset + size);
}

numWrites++;
bytesSent += message.getData().size();
bytesSent += size;
bytesSentInSequence += size;
}

@Override
Expand All @@ -44,6 +60,8 @@ public void handleResp(WriteResponse message) {
@Override
public RpcCallDetails getDetails() {
builder.addAllResourceNames(resources);
builder.addAllOffsets(offsets);
builder.addAllFinishWrites(finishWrites);
builder.setNumWrites(numWrites);
builder.setBytesSent(bytesSent);
return RpcCallDetails.newBuilder().setWrite(builder).build();
Expand Down
30 changes: 27 additions & 3 deletions src/main/protobuf/remote_execution_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ message GetActionResultDetails {
// Details for a call to
// build.bazel.remote.execution.v2.ActionCache.UpdateActionResult.
message UpdateActionResultDetails {
// The build.bazel.remote.execution.v2.GetActionResultRequest sent by
// The build.bazel.remote.execution.v2.UpdateActionResultRequest sent by
// the call.
build.bazel.remote.execution.v2.UpdateActionResultRequest request = 1;

Expand Down Expand Up @@ -128,10 +128,24 @@ message ReadDetails {
message WriteDetails {
// The names of resources requested to be written to in this call in the order
// they were first requested in. If the ByteStream protocol is followed
// according to specification, this should only contain have a single element,
// which is the resource name specified in the first message of the stream.
// according to specification, this should contain at most two elements:
// The resource name specified in the first message of the stream, and an
// empty string specified in each successive request if num_writes > 1.
repeated string resource_names = 1;

// The offsets sent for the initial request and any non-sequential offsets
// specified over the course of the call. If the ByteStream protocol is
// followed according to specification, this should contain a single element
// which is the starting point for the write call.
repeated int64 offsets = 5;

// The effective final size for each request sent with finish_write true
// specified over the course of the call. If the ByteStream protocol is
// followed according to specification, this should contain a single element
// which is the total size of the written resource, including the initial
// offset.
repeated int64 finish_writes = 6;

// The number of writes performed in this call.
int64 num_writes = 2;

Expand All @@ -142,6 +156,15 @@ message WriteDetails {
google.bytestream.WriteResponse response = 4;
}

// Details for a call to google.bytestream.QueryWriteStatus.
message QueryWriteStatusDetails {
// The google.bytestream.QueryWriteStatusRequest sent by the call.
google.bytestream.QueryWriteStatusRequest request = 1;

// The received google.bytestream.QueryWriteStatusResponse.
google.bytestream.QueryWriteStatusResponse response = 2;
}

// Contains details for specific types of calls.
message RpcCallDetails {
reserved 1 to 4, 11;
Expand All @@ -152,6 +175,7 @@ message RpcCallDetails {
FindMissingBlobsDetails find_missing_blobs = 10;
ReadDetails read = 5;
WriteDetails write = 6;
QueryWriteStatusDetails query_write_status = 14;
GetCapabilitiesDetails get_capabilities = 12;
UpdateActionResultDetails update_action_result = 13;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
Expand All @@ -58,6 +60,7 @@
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetCapabilitiesDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.UpdateActionResultDetails;
Expand Down Expand Up @@ -854,6 +857,85 @@ public void onCompleted() {
WriteDetails.newBuilder()
.addResourceNames("test1")
.addResourceNames("test2")
.addOffsets(0)
.addOffsets(0)
.addOffsets(0)
// finish write is empty
.setResponse(response)
.setBytesSent(9)
.setNumWrites(3)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
.setStartTime(Timestamp.newBuilder().setSeconds(10))
.setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000))
.build();

verify(logStream).write(expectedEntry);
}

// verify that the offset and finish_write compounding happens
@Test
public void testWriteCallValid() {
WriteRequest request1 =
WriteRequest.newBuilder()
.setResourceName("test1")
.setData(ByteString.copyFromUtf8("abc"))
.setWriteOffset(10)
.build();
WriteRequest request2 =
WriteRequest.newBuilder()
.setData(ByteString.copyFromUtf8("def"))
.setWriteOffset(request1.getWriteOffset() + request1.getData().size())
.build();
WriteResponse response = WriteResponse.newBuilder().setCommittedSize(6).build();
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest writeRequest) {}

@Override
public void onError(Throwable throwable) {}

@Override
public void onCompleted() {
streamObserver.onNext(response);
streamObserver.onCompleted();
}
};
}
});

ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
@SuppressWarnings("unchecked")
StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);

clock.advanceMillis(10000);
// Request three writes, the first identical with the third, but offset correctly and finish_writing
StreamObserver<WriteRequest> requester = stub.write(responseObserver);
requester.onNext(request1);
clock.advanceMillis(100);
requester.onNext(request2);
clock.advanceMillis(200);
requester.onNext(request1.toBuilder()
.setWriteOffset(request2.getWriteOffset() + request2.getData().size())
.setFinishWrite(true)
.build());
clock.advanceMillis(100);
requester.onCompleted();

LogEntry expectedEntry =
LogEntry.newBuilder()
.setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
.setDetails(
RpcCallDetails.newBuilder()
.setWrite(
WriteDetails.newBuilder()
.addResourceNames("test1")
.addResourceNames("")
.addOffsets(request1.getWriteOffset())
.addFinishWrites(10 + request1.getData().size() * 2 + request2.getData().size())
.setResponse(response)
.setBytesSent(9)
.setNumWrites(3)))
Expand Down Expand Up @@ -907,6 +989,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
.setWrite(
WriteDetails.newBuilder()
.addResourceNames("test")
.addOffsets(0)
.setNumWrites(1)
.setBytesSent(3)))
.setStartTime(Timestamp.newBuilder().setSeconds(10000000))
Expand All @@ -915,4 +998,45 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb

verify(logStream).write(expectedEntry);
}

@Test
public void testQueryWriteStatusCallOk() {
QueryWriteStatusRequest request =
QueryWriteStatusRequest.newBuilder()
.setResourceName("test")
.build();
QueryWriteStatusResponse response =
QueryWriteStatusResponse.newBuilder()
.setCommittedSize(10)
.build();

serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void queryWriteStatus(
QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> responseObserver) {
clock.advanceMillis(22222);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
});
ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(loggedChannel);

clock.advanceMillis(11111);
stub.queryWriteStatus(request);
LogEntry expectedEntry =
LogEntry.newBuilder()
.setMethodName(ByteStreamGrpc.getQueryWriteStatusMethod().getFullMethodName())
.setDetails(
RpcCallDetails.newBuilder()
.setQueryWriteStatus(
QueryWriteStatusDetails.newBuilder()
.setRequest(request)
.setResponse(response)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
.setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000))
.setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000))
.build();
verify(logStream).write(expectedEntry);
}
}