diff --git a/src/main/java/com/google/devtools/build/lib/bazel/SpawnLogModule.java b/src/main/java/com/google/devtools/build/lib/bazel/SpawnLogModule.java index 1fcce471cf4133..62bbbd7c0b384b 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/SpawnLogModule.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/SpawnLogModule.java @@ -149,7 +149,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB .exit( new AbruptExitException( createDetailedExitCode( - "Error initializing execution log", + String.format("Error initializing execution log: %s", e.getMessage()), Code.EXECUTION_LOG_INITIALIZATION_FAILURE))); } } diff --git a/src/main/java/com/google/devtools/build/lib/exec/ExpandedSpawnLogContext.java b/src/main/java/com/google/devtools/build/lib/exec/ExpandedSpawnLogContext.java index 5852f505752496..5e24922a3ef0e5 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/ExpandedSpawnLogContext.java +++ b/src/main/java/com/google/devtools/build/lib/exec/ExpandedSpawnLogContext.java @@ -38,6 +38,8 @@ import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.util.io.AsynchronousMessageOutputStream; +import com.google.devtools.build.lib.util.io.MessageInputStream; +import com.google.devtools.build.lib.util.io.MessageInputStreamWrapper.BinaryInputStreamWrapper; import com.google.devtools.build.lib.util.io.MessageOutputStream; import com.google.devtools.build.lib.util.io.MessageOutputStreamWrapper.BinaryOutputStreamWrapper; import com.google.devtools.build.lib.util.io.MessageOutputStreamWrapper.JsonOutputStreamWrapper; @@ -49,7 +51,6 @@ import com.google.devtools.build.lib.vfs.Symlinks; import com.google.devtools.build.lib.vfs.XattrProvider; import java.io.IOException; -import java.io.InputStream; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -73,9 +74,12 @@ public enum Encoding { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); - private final Path tempPath; + private final Encoding encoding; private final boolean sorted; + private final Path tempPath; + private final Path outputPath; + private final PathFragment execRoot; @Nullable private final RemoteOptions remoteOptions; private final DigestHashFunction digestHashFunction; @@ -84,9 +88,6 @@ public enum Encoding { /** Output stream to write directly into during execution. */ private final MessageOutputStream rawOutputStream; - /** Output stream to convert the raw output stream into after execution, if required. */ - @Nullable private final MessageOutputStream convertedOutputStream; - public ExpandedSpawnLogContext( Path outputPath, Path tempPath, @@ -97,33 +98,38 @@ public ExpandedSpawnLogContext( DigestHashFunction digestHashFunction, XattrProvider xattrProvider) throws IOException { - this.tempPath = tempPath; + this.encoding = encoding; this.sorted = sorted; + this.tempPath = tempPath; + this.outputPath = outputPath; this.execRoot = execRoot; this.remoteOptions = remoteOptions; this.digestHashFunction = digestHashFunction; this.xattrProvider = xattrProvider; - if (encoding == Encoding.BINARY && !sorted) { + if (needsConversion()) { + // Write the unsorted binary format into a temporary path first, then convert into the output + // format after execution. Delete a preexisting output file so that an incomplete invocation + // doesn't appear to produce a nonsensical log. + outputPath.delete(); + rawOutputStream = getRawOutputStream(tempPath); + } else { // The unsorted binary format can be written directly into the output path during execution. rawOutputStream = getRawOutputStream(outputPath); - convertedOutputStream = null; - } else { - // Otherwise, write the unsorted binary format into a temporary path first, then convert into - // the output format after execution. - rawOutputStream = getRawOutputStream(tempPath); - convertedOutputStream = getConvertedOutputStream(encoding, outputPath); } } + private boolean needsConversion() { + return encoding != Encoding.BINARY || sorted; + } + private static MessageOutputStream getRawOutputStream(Path path) throws IOException { // Use an AsynchronousMessageOutputStream so that writes occur in a separate thread. // This ensures concurrent writes don't tear and avoids blocking execution. return new AsynchronousMessageOutputStream<>(path); } - private static MessageOutputStream getConvertedOutputStream( - Encoding encoding, Path path) throws IOException { + private MessageOutputStream getConvertedOutputStream(Path path) throws IOException { switch (encoding) { case BINARY: return new BinaryOutputStreamWrapper<>(path.getOutputStream()); @@ -283,21 +289,23 @@ public void logSpawn( public void close() throws IOException { rawOutputStream.close(); - if (convertedOutputStream == null) { - // No conversion required. + if (!needsConversion()) { return; } - try (InputStream in = tempPath.getInputStream()) { + try (MessageInputStream rawInputStream = + new BinaryInputStreamWrapper<>( + tempPath.getInputStream(), SpawnExec.getDefaultInstance()); + MessageOutputStream convertedOutputStream = + getConvertedOutputStream(outputPath)) { if (sorted) { - StableSort.stableSort(in, convertedOutputStream); + StableSort.stableSort(rawInputStream, convertedOutputStream); } else { SpawnExec ex; - while ((ex = SpawnExec.parseDelimitedFrom(in)) != null) { + while ((ex = rawInputStream.read()) != null) { convertedOutputStream.write(ex); } } - convertedOutputStream.close(); } finally { try { tempPath.delete(); diff --git a/src/main/java/com/google/devtools/build/lib/exec/StableSort.java b/src/main/java/com/google/devtools/build/lib/exec/StableSort.java index eebed68260abcc..92dddc6114e3a6 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/StableSort.java +++ b/src/main/java/com/google/devtools/build/lib/exec/StableSort.java @@ -14,7 +14,6 @@ // package com.google.devtools.build.lib.exec; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.google.common.collect.MultimapBuilder; @@ -23,12 +22,12 @@ import com.google.devtools.build.lib.exec.Protos.SpawnExec; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; +import com.google.devtools.build.lib.util.io.MessageInputStream; import com.google.devtools.build.lib.util.io.MessageOutputStream; import java.io.IOException; -import java.io.InputStream; +import java.util.ArrayList; import java.util.Comparator; import java.util.IdentityHashMap; -import java.util.List; import java.util.PriorityQueue; import java.util.Set; @@ -39,18 +38,9 @@ *

This is needed to allow textual diff comparisons of resultant logs. */ public final class StableSort { - private static ImmutableList read(InputStream in) throws IOException { - ImmutableList.Builder result = ImmutableList.builder(); - SpawnExec ex; - while ((ex = SpawnExec.parseDelimitedFrom(in)) != null) { - result.add(ex); - } - return result.build(); - } - /** - * Reads length-delimited wire format {@link SpawnExec} protos from an {@link InputStream}, sorts - * them, and writes them to a {@link MessageOutputStream}. + * Reads {@link SpawnExec} protos from an {@link MessageInputStream}, sorts them, and writes them + * to a {@link MessageOutputStream}. * *

The sort order has the following properties: * @@ -62,81 +52,81 @@ private static ImmutableList read(InputStream in) throws IOException * *

Assumes that there are no cyclic dependencies. */ - public static void stableSort(InputStream in, MessageOutputStream out) - throws IOException { + public static void stableSort( + MessageInputStream in, MessageOutputStream out) throws IOException { try (SilentCloseable c = Profiler.instance().profile("stableSort")) { - ImmutableList inputs; + ArrayList inputs = new ArrayList<>(); + try (SilentCloseable c2 = Profiler.instance().profile("stableSort/read")) { - inputs = read(in); + SpawnExec ex; + while ((ex = in.read()) != null) { + inputs.add(ex); + } } - stableSort(inputs, out); - } - } - public static void stableSort(List inputs, MessageOutputStream out) - throws IOException { - // A map from each output to every spawn that produces it. - // The same output may be produced by multiple spawns in the case of multiple test attempts. - Multimap outputProducer = - MultimapBuilder.hashKeys(inputs.size()).arrayListValues(1).build(); - - for (SpawnExec ex : inputs) { - for (File output : ex.getActualOutputsList()) { - String name = output.getPath(); - outputProducer.put(name, ex); - } - } + // A map from each output to every spawn that produces it. + // The same output may be produced by multiple spawns in the case of multiple test attempts. + Multimap outputProducer = + MultimapBuilder.hashKeys(inputs.size()).arrayListValues(1).build(); - // A blocks B if A produces an output consumed by B. - // Use reference equality to avoid expensive comparisons. - IdentitySetMultimap blockedBy = new IdentitySetMultimap<>(); - IdentitySetMultimap blocking = new IdentitySetMultimap<>(); - - // The queue contains all spawns whose blockers have already been emitted. - PriorityQueue queue = - new PriorityQueue<>( - Comparator.comparing( - o -> { - // Sort by comparing the path of the first output. We don't want the sorting to - // rely on file hashes because we want the same action graph to be sorted in the - // same way regardless of file contents. - if (o.getListedOutputsCount() > 0) { - return "1_" + o.getListedOutputs(0); - } - - // Get a proto with only stable information from this proto - SpawnExec.Builder stripped = SpawnExec.newBuilder(); - stripped.addAllCommandArgs(o.getCommandArgsList()); - stripped.addAllEnvironmentVariables(o.getEnvironmentVariablesList()); - stripped.setPlatform(o.getPlatform()); - stripped.addAllInputs(o.getInputsList()); - stripped.setMnemonic(o.getMnemonic()); - - return "2_" + stripped.build(); - })); - - for (SpawnExec ex : inputs) { - boolean blocked = false; - for (File s : ex.getInputsList()) { - for (SpawnExec blocker : outputProducer.get(s.getPath())) { - blockedBy.put(ex, blocker); - blocking.put(blocker, ex); - blocked = true; + for (SpawnExec ex : inputs) { + for (File output : ex.getActualOutputsList()) { + String name = output.getPath(); + outputProducer.put(name, ex); } } - if (!blocked) { - queue.add(ex); + + // A blocks B if A produces an output consumed by B. + // Use reference equality to avoid expensive comparisons. + IdentitySetMultimap blockedBy = new IdentitySetMultimap<>(); + IdentitySetMultimap blocking = new IdentitySetMultimap<>(); + + // The queue contains all spawns whose blockers have already been emitted. + PriorityQueue queue = + new PriorityQueue<>( + Comparator.comparing( + o -> { + // Sort by comparing the path of the first output. We don't want the sorting to + // rely on file hashes because we want the same action graph to be sorted in the + // same way regardless of file contents. + if (o.getListedOutputsCount() > 0) { + return "1_" + o.getListedOutputs(0); + } + + // Get a proto with only stable information from this proto + SpawnExec.Builder stripped = SpawnExec.newBuilder(); + stripped.addAllCommandArgs(o.getCommandArgsList()); + stripped.addAllEnvironmentVariables(o.getEnvironmentVariablesList()); + stripped.setPlatform(o.getPlatform()); + stripped.addAllInputs(o.getInputsList()); + stripped.setMnemonic(o.getMnemonic()); + + return "2_" + stripped.build(); + })); + + for (SpawnExec ex : inputs) { + boolean blocked = false; + for (File s : ex.getInputsList()) { + for (SpawnExec blocker : outputProducer.get(s.getPath())) { + blockedBy.put(ex, blocker); + blocking.put(blocker, ex); + blocked = true; + } + } + if (!blocked) { + queue.add(ex); + } } - } - while (!queue.isEmpty()) { - SpawnExec curr = queue.remove(); - out.write(curr); + while (!queue.isEmpty()) { + SpawnExec curr = queue.remove(); + out.write(curr); - for (SpawnExec blocked : blocking.get(curr)) { - blockedBy.remove(blocked, curr); - if (!blockedBy.containsKey(blocked)) { - queue.add(blocked); + for (SpawnExec blocked : blocking.get(curr)) { + blockedBy.remove(blocked, curr); + if (!blockedBy.containsKey(blocked)) { + queue.add(blocked); + } } } } diff --git a/src/main/java/com/google/devtools/build/lib/util/io/MessageInputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/MessageInputStream.java new file mode 100644 index 00000000000000..be146f0f1aef45 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/util/io/MessageInputStream.java @@ -0,0 +1,29 @@ +// Copyright 2024 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.util.io; + +import com.google.protobuf.Message; +import java.io.IOException; +import javax.annotation.Nullable; + +/** A variation of InputStream for protobuf messages. */ +public interface MessageInputStream extends AutoCloseable { + /** Reads a protobuf message from the underlying stream, or null if there are no more messages. */ + @Nullable + T read() throws IOException; + + /** Closes the underlying stream. Any following reads will fail. */ + @Override + void close() throws IOException; +} diff --git a/src/main/java/com/google/devtools/build/lib/util/io/MessageInputStreamWrapper.java b/src/main/java/com/google/devtools/build/lib/util/io/MessageInputStreamWrapper.java new file mode 100644 index 00000000000000..d36ffa0a2bdad0 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/util/io/MessageInputStreamWrapper.java @@ -0,0 +1,91 @@ +// Copyright 2024 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.util.io; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.protobuf.ExtensionRegistry; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.io.InputStream; +import java.util.Scanner; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import javax.annotation.Nullable; + +/** Creates a MessageInputStream from an OutputStream. */ +public class MessageInputStreamWrapper { + + private MessageInputStreamWrapper() {} + + /** Reads the messages in length-delimited protobuf wire format. */ + public static class BinaryInputStreamWrapper implements MessageInputStream { + private final InputStream stream; + private final Parser parser; + + @SuppressWarnings("unchecked") + public BinaryInputStreamWrapper(InputStream stream, T defaultInstance) { + this.stream = checkNotNull(stream); + this.parser = (Parser) defaultInstance.getParserForType(); + } + + @Override + @Nullable + public T read() throws IOException { + return parser.parseDelimitedFrom(stream, ExtensionRegistry.getEmptyRegistry()); + } + + @Override + public void close() throws IOException { + stream.close(); + } + } + + /** Reads the messages in concatenated JSON text format. */ + public static class JsonInputStreamWrapper implements MessageInputStream { + private static final JsonFormat.Parser PARSER = JsonFormat.parser().ignoringUnknownFields(); + + // The string `\n}{\n` is a reliable delimiter, but we must use lookbehind/lookahead to avoid + // consuming the braces when tokenizing. + private static final Pattern DELIMITER = Pattern.compile("(?<=\\n\\})(?=\\{\\n)"); + + private final Scanner scanner; + private final Supplier builderSupplier; + + public JsonInputStreamWrapper(InputStream stream, T defaultInstance) { + this.scanner = new Scanner(checkNotNull(stream), UTF_8).useDelimiter(DELIMITER); + this.builderSupplier = defaultInstance::newBuilderForType; + } + + @Override + @Nullable + @SuppressWarnings("unchecked") + public T read() throws IOException { + if (!scanner.hasNext()) { + return null; + } + Message.Builder builder = builderSupplier.get(); + PARSER.merge(scanner.next(), builder); + return (T) builder.build(); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStream.java index 418226a36e9450..93ac9746641e14 100644 --- a/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStream.java +++ b/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStream.java @@ -17,10 +17,11 @@ import java.io.IOException; /** A variation of OutputStream for protobuf messages. */ -public interface MessageOutputStream { +public interface MessageOutputStream extends AutoCloseable { /** Writes a protobuf message to the underlying stream. */ void write(T m) throws IOException; /** Closes the underlying stream. Any following writes will fail. */ + @Override void close() throws IOException; } diff --git a/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStreamWrapper.java b/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStreamWrapper.java index 8f48428a1d8831..0fa519c697dc83 100644 --- a/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStreamWrapper.java +++ b/src/main/java/com/google/devtools/build/lib/util/io/MessageOutputStreamWrapper.java @@ -11,30 +11,33 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package com.google.devtools.build.lib.util.io; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.nio.charset.StandardCharsets.UTF_8; + import com.google.protobuf.Message; import com.google.protobuf.util.JsonFormat; import java.io.IOException; import java.io.OutputStream; -import java.nio.charset.StandardCharsets; /** Creates a MessageOutputStream from an OutputStream. */ public class MessageOutputStreamWrapper { - /** Outputs the messages in delimited protobuf binary format. */ + + private MessageOutputStreamWrapper() {} + + /** Writes the messages in length-delimited protobuf wire format. */ public static class BinaryOutputStreamWrapper implements MessageOutputStream { private final OutputStream stream; public BinaryOutputStreamWrapper(OutputStream stream) { - this.stream = Preconditions.checkNotNull(stream); + this.stream = checkNotNull(stream); } @Override public void write(T m) throws IOException { - Preconditions.checkNotNull(m); + checkNotNull(m); m.writeDelimitedTo(stream); } @@ -44,20 +47,21 @@ public void close() throws IOException { } } - /** Outputs the messages in JSON text format. */ + /** Writes the messages in concatenated JSON text format. */ public static class JsonOutputStreamWrapper implements MessageOutputStream { + private static final JsonFormat.Printer PRINTER = + JsonFormat.printer().includingDefaultValueFields(); + private final OutputStream stream; - private final JsonFormat.Printer printer = JsonFormat.printer().includingDefaultValueFields(); public JsonOutputStreamWrapper(OutputStream stream) { - Preconditions.checkNotNull(stream); - this.stream = stream; + this.stream = checkNotNull(stream); } @Override public void write(T m) throws IOException { - Preconditions.checkNotNull(m); - stream.write(printer.print(m).getBytes(StandardCharsets.UTF_8)); + checkNotNull(m); + stream.write(PRINTER.print(m).getBytes(UTF_8)); } @Override diff --git a/src/test/java/com/google/devtools/build/lib/exec/StableSortTest.java b/src/test/java/com/google/devtools/build/lib/exec/StableSortTest.java index 4410de59aa5a67..ba56faf49e4a2c 100644 --- a/src/test/java/com/google/devtools/build/lib/exec/StableSortTest.java +++ b/src/test/java/com/google/devtools/build/lib/exec/StableSortTest.java @@ -20,11 +20,12 @@ import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.exec.Protos.File; import com.google.devtools.build.lib.exec.Protos.SpawnExec; +import com.google.devtools.build.lib.util.io.MessageInputStream; +import com.google.devtools.build.lib.util.io.MessageInputStreamWrapper.BinaryInputStreamWrapper; import com.google.devtools.build.lib.util.io.MessageOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import org.junit.Test; @@ -51,16 +52,20 @@ public void write(SpawnExec m) throws IOException { public void close() throws IOException {} } - ArrayList testStableSort(List list) throws Exception { - ListOutput o = new ListOutput(); + private List testStableSort(List list) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); for (SpawnExec spawn : list) { spawn.writeDelimitedTo(baos); } - InputStream inputStream = new ByteArrayInputStream(baos.toByteArray()); - StableSort.stableSort(inputStream, o); - return o.list; + MessageInputStream in = + new BinaryInputStreamWrapper<>( + new ByteArrayInputStream(baos.toByteArray()), SpawnExec.getDefaultInstance()); + + ListOutput out = new ListOutput(); + + StableSort.stableSort(in, out); + return out.list; } private static SpawnExec.Builder createSpawnExecBuilder(