Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.1.0] Introduce a MessageInputStream abstraction, mirroring MessageOutputStream. #21207

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
.exit(
new AbruptExitException(
createDetailedExitCode(
"Error initializing execution log",
String.format("Error initializing execution log: %s", e.getMessage()),
Code.EXECUTION_LOG_INITIALIZATION_FAILURE)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.util.io.AsynchronousMessageOutputStream;
import com.google.devtools.build.lib.util.io.MessageInputStream;
import com.google.devtools.build.lib.util.io.MessageInputStreamWrapper.BinaryInputStreamWrapper;
import com.google.devtools.build.lib.util.io.MessageOutputStream;
import com.google.devtools.build.lib.util.io.MessageOutputStreamWrapper.BinaryOutputStreamWrapper;
import com.google.devtools.build.lib.util.io.MessageOutputStreamWrapper.JsonOutputStreamWrapper;
Expand All @@ -49,7 +51,6 @@
import com.google.devtools.build.lib.vfs.Symlinks;
import com.google.devtools.build.lib.vfs.XattrProvider;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -73,9 +74,12 @@ public enum Encoding {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final Path tempPath;
private final Encoding encoding;
private final boolean sorted;

private final Path tempPath;
private final Path outputPath;

private final PathFragment execRoot;
@Nullable private final RemoteOptions remoteOptions;
private final DigestHashFunction digestHashFunction;
Expand All @@ -84,9 +88,6 @@ public enum Encoding {
/** Output stream to write directly into during execution. */
private final MessageOutputStream<SpawnExec> rawOutputStream;

/** Output stream to convert the raw output stream into after execution, if required. */
@Nullable private final MessageOutputStream<SpawnExec> convertedOutputStream;

public ExpandedSpawnLogContext(
Path outputPath,
Path tempPath,
Expand All @@ -97,33 +98,38 @@ public ExpandedSpawnLogContext(
DigestHashFunction digestHashFunction,
XattrProvider xattrProvider)
throws IOException {
this.tempPath = tempPath;
this.encoding = encoding;
this.sorted = sorted;
this.tempPath = tempPath;
this.outputPath = outputPath;
this.execRoot = execRoot;
this.remoteOptions = remoteOptions;
this.digestHashFunction = digestHashFunction;
this.xattrProvider = xattrProvider;

if (encoding == Encoding.BINARY && !sorted) {
if (needsConversion()) {
// Write the unsorted binary format into a temporary path first, then convert into the output
// format after execution. Delete a preexisting output file so that an incomplete invocation
// doesn't appear to produce a nonsensical log.
outputPath.delete();
rawOutputStream = getRawOutputStream(tempPath);
} else {
// The unsorted binary format can be written directly into the output path during execution.
rawOutputStream = getRawOutputStream(outputPath);
convertedOutputStream = null;
} else {
// Otherwise, write the unsorted binary format into a temporary path first, then convert into
// the output format after execution.
rawOutputStream = getRawOutputStream(tempPath);
convertedOutputStream = getConvertedOutputStream(encoding, outputPath);
}
}

private boolean needsConversion() {
return encoding != Encoding.BINARY || sorted;
}

private static MessageOutputStream<SpawnExec> getRawOutputStream(Path path) throws IOException {
// Use an AsynchronousMessageOutputStream so that writes occur in a separate thread.
// This ensures concurrent writes don't tear and avoids blocking execution.
return new AsynchronousMessageOutputStream<>(path);
}

private static MessageOutputStream<SpawnExec> getConvertedOutputStream(
Encoding encoding, Path path) throws IOException {
private MessageOutputStream<SpawnExec> getConvertedOutputStream(Path path) throws IOException {
switch (encoding) {
case BINARY:
return new BinaryOutputStreamWrapper<>(path.getOutputStream());
Expand Down Expand Up @@ -283,21 +289,23 @@ public void logSpawn(
public void close() throws IOException {
rawOutputStream.close();

if (convertedOutputStream == null) {
// No conversion required.
if (!needsConversion()) {
return;
}

try (InputStream in = tempPath.getInputStream()) {
try (MessageInputStream<SpawnExec> rawInputStream =
new BinaryInputStreamWrapper<>(
tempPath.getInputStream(), SpawnExec.getDefaultInstance());
MessageOutputStream<SpawnExec> convertedOutputStream =
getConvertedOutputStream(outputPath)) {
if (sorted) {
StableSort.stableSort(in, convertedOutputStream);
StableSort.stableSort(rawInputStream, convertedOutputStream);
} else {
SpawnExec ex;
while ((ex = SpawnExec.parseDelimitedFrom(in)) != null) {
while ((ex = rawInputStream.read()) != null) {
convertedOutputStream.write(ex);
}
}
convertedOutputStream.close();
} finally {
try {
tempPath.delete();
Expand Down
148 changes: 69 additions & 79 deletions src/main/java/com/google/devtools/build/lib/exec/StableSort.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
//
package com.google.devtools.build.lib.exec;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
Expand All @@ -23,12 +22,12 @@
import com.google.devtools.build.lib.exec.Protos.SpawnExec;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.util.io.MessageInputStream;
import com.google.devtools.build.lib.util.io.MessageOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

Expand All @@ -39,18 +38,9 @@
* <p>This is needed to allow textual diff comparisons of resultant logs.
*/
public final class StableSort {
private static ImmutableList<SpawnExec> read(InputStream in) throws IOException {
ImmutableList.Builder<SpawnExec> result = ImmutableList.builder();
SpawnExec ex;
while ((ex = SpawnExec.parseDelimitedFrom(in)) != null) {
result.add(ex);
}
return result.build();
}

/**
* Reads length-delimited wire format {@link SpawnExec} protos from an {@link InputStream}, sorts
* them, and writes them to a {@link MessageOutputStream}.
* Reads {@link SpawnExec} protos from an {@link MessageInputStream}, sorts them, and writes them
* to a {@link MessageOutputStream}.
*
* <p>The sort order has the following properties:
*
Expand All @@ -62,81 +52,81 @@ private static ImmutableList<SpawnExec> read(InputStream in) throws IOException
*
* <p>Assumes that there are no cyclic dependencies.
*/
public static void stableSort(InputStream in, MessageOutputStream<SpawnExec> out)
throws IOException {
public static void stableSort(
MessageInputStream<SpawnExec> in, MessageOutputStream<SpawnExec> out) throws IOException {
try (SilentCloseable c = Profiler.instance().profile("stableSort")) {
ImmutableList<SpawnExec> inputs;
ArrayList<SpawnExec> inputs = new ArrayList<>();

try (SilentCloseable c2 = Profiler.instance().profile("stableSort/read")) {
inputs = read(in);
SpawnExec ex;
while ((ex = in.read()) != null) {
inputs.add(ex);
}
}
stableSort(inputs, out);
}
}

public static void stableSort(List<SpawnExec> inputs, MessageOutputStream<SpawnExec> out)
throws IOException {
// A map from each output to every spawn that produces it.
// The same output may be produced by multiple spawns in the case of multiple test attempts.
Multimap<String, SpawnExec> outputProducer =
MultimapBuilder.hashKeys(inputs.size()).arrayListValues(1).build();

for (SpawnExec ex : inputs) {
for (File output : ex.getActualOutputsList()) {
String name = output.getPath();
outputProducer.put(name, ex);
}
}
// A map from each output to every spawn that produces it.
// The same output may be produced by multiple spawns in the case of multiple test attempts.
Multimap<String, SpawnExec> outputProducer =
MultimapBuilder.hashKeys(inputs.size()).arrayListValues(1).build();

// A blocks B if A produces an output consumed by B.
// Use reference equality to avoid expensive comparisons.
IdentitySetMultimap<SpawnExec, SpawnExec> blockedBy = new IdentitySetMultimap<>();
IdentitySetMultimap<SpawnExec, SpawnExec> blocking = new IdentitySetMultimap<>();

// The queue contains all spawns whose blockers have already been emitted.
PriorityQueue<SpawnExec> queue =
new PriorityQueue<>(
Comparator.comparing(
o -> {
// Sort by comparing the path of the first output. We don't want the sorting to
// rely on file hashes because we want the same action graph to be sorted in the
// same way regardless of file contents.
if (o.getListedOutputsCount() > 0) {
return "1_" + o.getListedOutputs(0);
}

// Get a proto with only stable information from this proto
SpawnExec.Builder stripped = SpawnExec.newBuilder();
stripped.addAllCommandArgs(o.getCommandArgsList());
stripped.addAllEnvironmentVariables(o.getEnvironmentVariablesList());
stripped.setPlatform(o.getPlatform());
stripped.addAllInputs(o.getInputsList());
stripped.setMnemonic(o.getMnemonic());

return "2_" + stripped.build();
}));

for (SpawnExec ex : inputs) {
boolean blocked = false;
for (File s : ex.getInputsList()) {
for (SpawnExec blocker : outputProducer.get(s.getPath())) {
blockedBy.put(ex, blocker);
blocking.put(blocker, ex);
blocked = true;
for (SpawnExec ex : inputs) {
for (File output : ex.getActualOutputsList()) {
String name = output.getPath();
outputProducer.put(name, ex);
}
}
if (!blocked) {
queue.add(ex);

// A blocks B if A produces an output consumed by B.
// Use reference equality to avoid expensive comparisons.
IdentitySetMultimap<SpawnExec, SpawnExec> blockedBy = new IdentitySetMultimap<>();
IdentitySetMultimap<SpawnExec, SpawnExec> blocking = new IdentitySetMultimap<>();

// The queue contains all spawns whose blockers have already been emitted.
PriorityQueue<SpawnExec> queue =
new PriorityQueue<>(
Comparator.comparing(
o -> {
// Sort by comparing the path of the first output. We don't want the sorting to
// rely on file hashes because we want the same action graph to be sorted in the
// same way regardless of file contents.
if (o.getListedOutputsCount() > 0) {
return "1_" + o.getListedOutputs(0);
}

// Get a proto with only stable information from this proto
SpawnExec.Builder stripped = SpawnExec.newBuilder();
stripped.addAllCommandArgs(o.getCommandArgsList());
stripped.addAllEnvironmentVariables(o.getEnvironmentVariablesList());
stripped.setPlatform(o.getPlatform());
stripped.addAllInputs(o.getInputsList());
stripped.setMnemonic(o.getMnemonic());

return "2_" + stripped.build();
}));

for (SpawnExec ex : inputs) {
boolean blocked = false;
for (File s : ex.getInputsList()) {
for (SpawnExec blocker : outputProducer.get(s.getPath())) {
blockedBy.put(ex, blocker);
blocking.put(blocker, ex);
blocked = true;
}
}
if (!blocked) {
queue.add(ex);
}
}
}

while (!queue.isEmpty()) {
SpawnExec curr = queue.remove();
out.write(curr);
while (!queue.isEmpty()) {
SpawnExec curr = queue.remove();
out.write(curr);

for (SpawnExec blocked : blocking.get(curr)) {
blockedBy.remove(blocked, curr);
if (!blockedBy.containsKey(blocked)) {
queue.add(blocked);
for (SpawnExec blocked : blocking.get(curr)) {
blockedBy.remove(blocked, curr);
if (!blockedBy.containsKey(blocked)) {
queue.add(blocked);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util.io;

import com.google.protobuf.Message;
import java.io.IOException;
import javax.annotation.Nullable;

/** A variation of InputStream for protobuf messages. */
public interface MessageInputStream<T extends Message> extends AutoCloseable {
/** Reads a protobuf message from the underlying stream, or null if there are no more messages. */
@Nullable
T read() throws IOException;

/** Closes the underlying stream. Any following reads will fail. */
@Override
void close() throws IOException;
}
Loading
Loading