From 160c1735e76c448541412d0b7ba7bfe0915996e3 Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Mon, 20 May 2024 18:03:22 -0500 Subject: [PATCH] Add initial support for BEP (Build Event Protocol) Add proto definitions for BEP and create `BepServer` which implements the `publish_lifecycle_event` and `publish_build_tool_event_stream` rpcs Closes #925 --- nativelink-config/src/cas_server.rs | 13 + nativelink-proto/BUILD.bazel | 5 + nativelink-proto/genproto/google.api.pb.rs | 90 ++ .../genproto/google.devtools.build.v1.pb.rs | 992 ++++++++++++++++++ nativelink-proto/genproto/lib.rs | 7 + .../google/api/field_behavior.proto | 104 ++ .../devtools/build/v1/build_events.proto | 187 ++++ .../devtools/build/v1/build_status.proto | 77 ++ .../build/v1/publish_build_event.proto | 187 ++++ nativelink-service/BUILD.bazel | 2 + nativelink-service/src/bep_server.rs | 204 ++++ nativelink-service/src/lib.rs | 1 + nativelink-service/tests/bep_server_test.rs | 310 ++++++ src/bin/nativelink.rs | 27 + 14 files changed, 2206 insertions(+) create mode 100644 nativelink-proto/genproto/google.devtools.build.v1.pb.rs create mode 100644 nativelink-proto/google/api/field_behavior.proto create mode 100644 nativelink-proto/google/devtools/build/v1/build_events.proto create mode 100644 nativelink-proto/google/devtools/build/v1/build_status.proto create mode 100644 nativelink-proto/google/devtools/build/v1/publish_build_event.proto create mode 100644 nativelink-service/src/bep_server.rs create mode 100644 nativelink-service/tests/bep_server_test.rs diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index d46ff0004..c8d84a9fb 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -197,6 +197,14 @@ pub struct HealthConfig { pub path: String, } +#[derive(Deserialize, Debug)] +pub struct BepConfig { + /// The store to publish build events to. + /// The store name referenced in the `stores` map in the main config. + #[serde(deserialize_with = "convert_string_with_shellexpand")] + pub store: StoreRefName, +} + #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct ServicesConfig { @@ -234,6 +242,11 @@ pub struct ServicesConfig { /// that makes the remote execution/cache requests. pub worker_api: Option, + /// Experimental - Build Event Protocol (BEP) configuration. This is + /// the service that will consume build events from the client and + /// publish them to a store for processing by an external service. + pub experimental_bep: Option, + /// Experimental - Prometheus metrics configuration. Metrics are gathered /// as a singleton but may be served on multiple endpoints. pub experimental_prometheus: Option, diff --git a/nativelink-proto/BUILD.bazel b/nativelink-proto/BUILD.bazel index f657abe7c..b69b61ea7 100644 --- a/nativelink-proto/BUILD.bazel +++ b/nativelink-proto/BUILD.bazel @@ -6,6 +6,7 @@ PROTO_NAMES = [ "com.github.trace_machina.nativelink.remote_execution", "google.api", "google.bytestream", + "google.devtools.build.v1", "google.longrunning", "google.rpc", ] @@ -28,8 +29,12 @@ genrule( "com/github/trace_machina/nativelink/remote_execution/worker_api.proto", "google/api/annotations.proto", "google/api/client.proto", + "google/api/field_behavior.proto", "google/api/http.proto", "google/bytestream/bytestream.proto", + "google/devtools/build/v1/build_events.proto", + "google/devtools/build/v1/build_status.proto", + "google/devtools/build/v1/publish_build_event.proto", "google/longrunning/operations.proto", "google/protobuf/any.proto", "google/protobuf/descriptor.proto", diff --git a/nativelink-proto/genproto/google.api.pb.rs b/nativelink-proto/genproto/google.api.pb.rs index 0ba04d997..3ce002428 100644 --- a/nativelink-proto/genproto/google.api.pb.rs +++ b/nativelink-proto/genproto/google.api.pb.rs @@ -380,3 +380,93 @@ pub struct CustomHttpPattern { #[prost(string, tag = "2")] pub path: ::prost::alloc::string::String, } +/// An indicator of the behavior of a given field (for example, that a field +/// is required in requests, or given as output but ignored as input). +/// This **does not** change the behavior in protocol buffers itself; it only +/// denotes the behavior and may affect how API tooling handles the field. +/// +/// Note: This enum **may** receive new values in the future. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum FieldBehavior { + /// Conventional default for enums. Do not use this. + Unspecified = 0, + /// Specifically denotes a field as optional. + /// While all fields in protocol buffers are optional, this may be specified + /// for emphasis if appropriate. + Optional = 1, + /// Denotes a field as required. + /// This indicates that the field **must** be provided as part of the request, + /// and failure to do so will cause an error (usually `INVALID_ARGUMENT`). + Required = 2, + /// Denotes a field as output only. + /// This indicates that the field is provided in responses, but including the + /// field in a request does nothing (the server *must* ignore it and + /// *must not* throw an error as a result of the field's presence). + OutputOnly = 3, + /// Denotes a field as input only. + /// This indicates that the field is provided in requests, and the + /// corresponding field is not included in output. + InputOnly = 4, + /// Denotes a field as immutable. + /// This indicates that the field may be set once in a request to create a + /// resource, but may not be changed thereafter. + Immutable = 5, + /// Denotes that a (repeated) field is an unordered list. + /// This indicates that the service may provide the elements of the list + /// in any arbitrary order, rather than the order the user originally + /// provided. Additionally, the list's order may or may not be stable. + UnorderedList = 6, + /// Denotes that this field returns a non-empty default value if not set. + /// This indicates that if the user provides the empty value in a request, + /// a non-empty value will be returned. The user will not be aware of what + /// non-empty value to expect. + NonEmptyDefault = 7, + /// Denotes that the field in a resource (a message annotated with + /// google.api.resource) is used in the resource name to uniquely identify the + /// resource. For AIP-compliant APIs, this should only be applied to the + /// `name` field on the resource. + /// + /// This behavior should not be applied to references to other resources within + /// the message. + /// + /// The identifier field of resources often have different field behavior + /// depending on the request it is embedded in (e.g. for Create methods name + /// is optional and unused, while for Update methods it is required). Instead + /// of method-specific annotations, only `IDENTIFIER` is required. + Identifier = 8, +} +impl FieldBehavior { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + FieldBehavior::Unspecified => "FIELD_BEHAVIOR_UNSPECIFIED", + FieldBehavior::Optional => "OPTIONAL", + FieldBehavior::Required => "REQUIRED", + FieldBehavior::OutputOnly => "OUTPUT_ONLY", + FieldBehavior::InputOnly => "INPUT_ONLY", + FieldBehavior::Immutable => "IMMUTABLE", + FieldBehavior::UnorderedList => "UNORDERED_LIST", + FieldBehavior::NonEmptyDefault => "NON_EMPTY_DEFAULT", + FieldBehavior::Identifier => "IDENTIFIER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "FIELD_BEHAVIOR_UNSPECIFIED" => Some(Self::Unspecified), + "OPTIONAL" => Some(Self::Optional), + "REQUIRED" => Some(Self::Required), + "OUTPUT_ONLY" => Some(Self::OutputOnly), + "INPUT_ONLY" => Some(Self::InputOnly), + "IMMUTABLE" => Some(Self::Immutable), + "UNORDERED_LIST" => Some(Self::UnorderedList), + "NON_EMPTY_DEFAULT" => Some(Self::NonEmptyDefault), + "IDENTIFIER" => Some(Self::Identifier), + _ => None, + } + } +} diff --git a/nativelink-proto/genproto/google.devtools.build.v1.pb.rs b/nativelink-proto/genproto/google.devtools.build.v1.pb.rs new file mode 100644 index 000000000..2ea78bb11 --- /dev/null +++ b/nativelink-proto/genproto/google.devtools.build.v1.pb.rs @@ -0,0 +1,992 @@ +// Copyright 2022 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is @generated by prost-build. +/// Status used for both invocation attempt and overall build completion. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BuildStatus { + /// The end result. + #[prost(enumeration = "build_status::Result", tag = "1")] + pub result: i32, + /// Final invocation ID of the build, if there was one. + /// This field is only set on a status in BuildFinished event. + #[prost(string, tag = "3")] + pub final_invocation_id: ::prost::alloc::string::String, + /// Build tool exit code. Integer value returned by the executed build tool. + /// Might not be available in some cases, e.g., a build timeout. + #[prost(message, optional, tag = "4")] + pub build_tool_exit_code: ::core::option::Option, + /// Human-readable error message. Do not use for programmatic purposes. + #[prost(string, tag = "5")] + pub error_message: ::prost::alloc::string::String, + /// Fine-grained diagnostic information to complement the status. + #[prost(message, optional, tag = "2")] + pub details: ::core::option::Option<::prost_types::Any>, +} +/// Nested message and enum types in `BuildStatus`. +pub mod build_status { + /// The end result of the Build. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Result { + /// Unspecified or unknown. + UnknownStatus = 0, + /// Build was successful and tests (if requested) all pass. + CommandSucceeded = 1, + /// Build error and/or test failure. + CommandFailed = 2, + /// Unable to obtain a result due to input provided by the user. + UserError = 3, + /// Unable to obtain a result due to a failure within the build system. + SystemError = 4, + /// Build required too many resources, such as build tool RAM. + ResourceExhausted = 5, + /// An invocation attempt time exceeded its deadline. + InvocationDeadlineExceeded = 6, + /// Build request time exceeded the request_deadline + RequestDeadlineExceeded = 8, + /// The build was cancelled by a call to CancelBuild. + Cancelled = 7, + } + impl Result { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Result::UnknownStatus => "UNKNOWN_STATUS", + Result::CommandSucceeded => "COMMAND_SUCCEEDED", + Result::CommandFailed => "COMMAND_FAILED", + Result::UserError => "USER_ERROR", + Result::SystemError => "SYSTEM_ERROR", + Result::ResourceExhausted => "RESOURCE_EXHAUSTED", + Result::InvocationDeadlineExceeded => "INVOCATION_DEADLINE_EXCEEDED", + Result::RequestDeadlineExceeded => "REQUEST_DEADLINE_EXCEEDED", + Result::Cancelled => "CANCELLED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UNKNOWN_STATUS" => Some(Self::UnknownStatus), + "COMMAND_SUCCEEDED" => Some(Self::CommandSucceeded), + "COMMAND_FAILED" => Some(Self::CommandFailed), + "USER_ERROR" => Some(Self::UserError), + "SYSTEM_ERROR" => Some(Self::SystemError), + "RESOURCE_EXHAUSTED" => Some(Self::ResourceExhausted), + "INVOCATION_DEADLINE_EXCEEDED" => Some(Self::InvocationDeadlineExceeded), + "REQUEST_DEADLINE_EXCEEDED" => Some(Self::RequestDeadlineExceeded), + "CANCELLED" => Some(Self::Cancelled), + _ => None, + } + } + } +} +/// An event representing some state change that occurred in the build. This +/// message does not include field for uniquely identifying an event. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BuildEvent { + /// This should be precisely the time when this event happened, and not when + /// the event proto was created or sent. + #[prost(message, optional, tag = "1")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + /// ////////////////////////////////////////////////////////////////////////// + /// Events that indicate a state change of a build request in the build + /// queue. + #[prost(oneof = "build_event::Event", tags = "51, 52, 53, 55, 56, 59, 60, 61, 62")] + pub event: ::core::option::Option, +} +/// Nested message and enum types in `BuildEvent`. +pub mod build_event { + /// Notification that the build system has attempted to run the build tool. + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct InvocationAttemptStarted { + /// The number of the invocation attempt, starting at 1 and increasing by 1 + /// for each new attempt. Can be used to determine if there is a later + /// invocation attempt replacing the current one a client is processing. + #[prost(int64, tag = "1")] + pub attempt_number: i64, + /// Arbitrary details about the invocation attempt. + #[prost(message, optional, tag = "2")] + pub details: ::core::option::Option<::prost_types::Any>, + } + /// Notification that an invocation attempt has finished. + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct InvocationAttemptFinished { + /// Final status of the invocation. + #[prost(message, optional, tag = "3")] + pub invocation_status: ::core::option::Option, + /// Arbitrary details about the invocation attempt. + #[prost(message, optional, tag = "4")] + pub details: ::core::option::Option<::prost_types::Any>, + } + /// Notification that the build request is enqueued. + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct BuildEnqueued { + /// Additional details about the Build. + #[prost(message, optional, tag = "1")] + pub details: ::core::option::Option<::prost_types::Any>, + } + /// Notification that the build request has finished, and no further + /// invocations will occur. Note that this applies to the entire Build. + /// Individual invocations trigger InvocationFinished when they finish. + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct BuildFinished { + /// Final status of the build. + #[prost(message, optional, tag = "1")] + pub status: ::core::option::Option, + /// Additional details about the Build. + #[prost(message, optional, tag = "2")] + pub details: ::core::option::Option<::prost_types::Any>, + } + /// Textual output written to standard output or standard error. + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct ConsoleOutput { + /// The output stream type. + #[prost(enumeration = "super::ConsoleOutputStream", tag = "1")] + pub r#type: i32, + /// The output stream content. + #[prost(oneof = "console_output::Output", tags = "2, 3")] + pub output: ::core::option::Option, + } + /// Nested message and enum types in `ConsoleOutput`. + pub mod console_output { + /// The output stream content. + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Output { + /// Regular UTF-8 output; normal text. + #[prost(string, tag = "2")] + TextOutput(::prost::alloc::string::String), + /// Used if the output is not UTF-8 text (for example, a binary proto). + #[prost(bytes, tag = "3")] + BinaryOutput(::prost::bytes::Bytes), + } + } + /// Notification of the end of a build event stream published by a build + /// component other than CONTROLLER (See StreamId.BuildComponents). + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct BuildComponentStreamFinished { + /// How the event stream finished. + #[prost(enumeration = "build_component_stream_finished::FinishType", tag = "1")] + pub r#type: i32, + } + /// Nested message and enum types in `BuildComponentStreamFinished`. + pub mod build_component_stream_finished { + /// How did the event stream finish. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum FinishType { + /// Unknown or unspecified; callers should never set this value. + Unspecified = 0, + /// Set by the event publisher to indicate a build event stream is + /// finished. + Finished = 1, + /// Set by the WatchBuild RPC server when the publisher of a build event + /// stream stops publishing events without publishing a + /// BuildComponentStreamFinished event whose type equals FINISHED. + Expired = 2, + } + impl FinishType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + FinishType::Unspecified => "FINISH_TYPE_UNSPECIFIED", + FinishType::Finished => "FINISHED", + FinishType::Expired => "EXPIRED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "FINISH_TYPE_UNSPECIFIED" => Some(Self::Unspecified), + "FINISHED" => Some(Self::Finished), + "EXPIRED" => Some(Self::Expired), + _ => None, + } + } + } + } + /// ////////////////////////////////////////////////////////////////////////// + /// Events that indicate a state change of a build request in the build + /// queue. + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Event { + /// An invocation attempt has started. + #[prost(message, tag = "51")] + InvocationAttemptStarted(InvocationAttemptStarted), + /// An invocation attempt has finished. + #[prost(message, tag = "52")] + InvocationAttemptFinished(InvocationAttemptFinished), + /// The build is enqueued. + #[prost(message, tag = "53")] + BuildEnqueued(BuildEnqueued), + /// The build has finished. Set when the build is terminated. + #[prost(message, tag = "55")] + BuildFinished(BuildFinished), + /// An event containing printed text. + #[prost(message, tag = "56")] + ConsoleOutput(ConsoleOutput), + /// Indicates the end of a build event stream (with the same StreamId) from + /// a build component executing the requested build task. + /// *** This field does not indicate the WatchBuild RPC is finished. *** + #[prost(message, tag = "59")] + ComponentStreamFinished(BuildComponentStreamFinished), + /// Structured build event generated by Bazel about its execution progress. + #[prost(message, tag = "60")] + BazelEvent(::prost_types::Any), + /// An event that contains supplemental tool-specific information about + /// build execution. + #[prost(message, tag = "61")] + BuildExecutionEvent(::prost_types::Any), + /// An event that contains supplemental tool-specific information about + /// source fetching. + #[prost(message, tag = "62")] + SourceFetchEvent(::prost_types::Any), + } +} +/// Unique identifier for a build event stream. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct StreamId { + /// The id of a Build message. + #[prost(string, tag = "1")] + pub build_id: ::prost::alloc::string::String, + /// The unique invocation ID within this build. + /// It should be the same as {invocation} (below) during the migration. + #[prost(string, tag = "6")] + pub invocation_id: ::prost::alloc::string::String, + /// The component that emitted this event. + #[prost(enumeration = "stream_id::BuildComponent", tag = "3")] + pub component: i32, +} +/// Nested message and enum types in `StreamId`. +pub mod stream_id { + /// Which build component generates this event stream. Each build component + /// may generate one event stream. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum BuildComponent { + /// Unknown or unspecified; callers should never set this value. + UnknownComponent = 0, + /// A component that coordinates builds. + Controller = 1, + /// A component that runs executables needed to complete a build. + Worker = 2, + /// A component that builds something. + Tool = 3, + } + impl BuildComponent { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + BuildComponent::UnknownComponent => "UNKNOWN_COMPONENT", + BuildComponent::Controller => "CONTROLLER", + BuildComponent::Worker => "WORKER", + BuildComponent::Tool => "TOOL", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UNKNOWN_COMPONENT" => Some(Self::UnknownComponent), + "CONTROLLER" => Some(Self::Controller), + "WORKER" => Some(Self::Worker), + "TOOL" => Some(Self::Tool), + _ => None, + } + } + } +} +/// The type of console output stream. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ConsoleOutputStream { + /// Unspecified or unknown. + Unknown = 0, + /// Normal output stream. + Stdout = 1, + /// Error output stream. + Stderr = 2, +} +impl ConsoleOutputStream { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ConsoleOutputStream::Unknown => "UNKNOWN", + ConsoleOutputStream::Stdout => "STDOUT", + ConsoleOutputStream::Stderr => "STDERR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UNKNOWN" => Some(Self::Unknown), + "STDOUT" => Some(Self::Stdout), + "STDERR" => Some(Self::Stderr), + _ => None, + } + } +} +/// Publishes 'lifecycle events' that update the high-level state of a build: +/// - BuildEnqueued: When a build is scheduled. +/// - InvocationAttemptStarted: When work for a build starts; there can be +/// multiple invocations for a build (e.g. retries). +/// - InvocationAttemptCompleted: When work for a build finishes. +/// - BuildFinished: When a build is finished. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishLifecycleEventRequest { + /// The interactivity of this build. + #[prost(enumeration = "publish_lifecycle_event_request::ServiceLevel", tag = "1")] + pub service_level: i32, + /// Required. The lifecycle build event. If this is a build tool event, the RPC + /// will fail with INVALID_REQUEST. + #[prost(message, optional, tag = "2")] + pub build_event: ::core::option::Option, + /// If the next event for this build or invocation (depending on the event + /// type) hasn't been published after this duration from when {build_event} + /// is written to BES, consider this stream expired. If this field is not set, + /// BES backend will use its own default value. + #[prost(message, optional, tag = "3")] + pub stream_timeout: ::core::option::Option<::prost_types::Duration>, + /// Additional information about a build request. These are define by the event + /// publishers, and the Build Event Service does not validate or interpret + /// them. They are used while notifying internal systems of new builds and + /// invocations if the OrderedBuildEvent.event type is + /// BuildEnqueued/InvocationAttemptStarted. + #[prost(string, repeated, tag = "4")] + pub notification_keywords: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Required. The project this build is associated with. + /// This should match the project used for the initial call to + /// PublishLifecycleEvent (containing a BuildEnqueued message). + #[prost(string, tag = "6")] + pub project_id: ::prost::alloc::string::String, + /// Whether to require a previously received matching parent lifecycle event + /// for the current request's event before continuing processing. + /// - InvocationAttemptStarted and BuildFinished events require a BuildEnqueued + /// parent event. + /// - InvocationAttemptFinished events require an InvocationAttemptStarted + /// parent event. + #[prost(bool, tag = "7")] + pub check_preceding_lifecycle_events_present: bool, +} +/// Nested message and enum types in `PublishLifecycleEventRequest`. +pub mod publish_lifecycle_event_request { + /// The service level of the build request. Backends only uses this value when + /// the BuildEnqueued event is published to determine what level of service + /// this build should receive. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum ServiceLevel { + /// Non-interactive builds can tolerate longer event latencies. This is the + /// default ServiceLevel if callers do not specify one. + Noninteractive = 0, + /// The events of an interactive build should be delivered with low latency. + Interactive = 1, + } + impl ServiceLevel { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ServiceLevel::Noninteractive => "NONINTERACTIVE", + ServiceLevel::Interactive => "INTERACTIVE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "NONINTERACTIVE" => Some(Self::Noninteractive), + "INTERACTIVE" => Some(Self::Interactive), + _ => None, + } + } + } +} +/// States which event has been committed. Any failure to commit will cause +/// RPC errors, hence not recorded by this proto. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishBuildToolEventStreamResponse { + /// The stream that contains this event. + #[prost(message, optional, tag = "1")] + pub stream_id: ::core::option::Option, + /// The sequence number of this event that has been committed. + #[prost(int64, tag = "2")] + pub sequence_number: i64, +} +/// Build event with contextual information about the stream it belongs to and +/// its position in that stream. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OrderedBuildEvent { + /// Which build event stream this event belongs to. + #[prost(message, optional, tag = "1")] + pub stream_id: ::core::option::Option, + /// The position of this event in the stream. The sequence numbers for a build + /// event stream should be a sequence of consecutive natural numbers starting + /// from one. (1, 2, 3, ...) + #[prost(int64, tag = "2")] + pub sequence_number: i64, + /// The actual event. + #[prost(message, optional, tag = "3")] + pub event: ::core::option::Option, +} +/// Streaming request message for PublishBuildToolEventStream. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishBuildToolEventStreamRequest { + /// Required. The build event with position info. + /// New publishing clients should use this field rather than the 3 above. + #[prost(message, optional, tag = "4")] + pub ordered_build_event: ::core::option::Option, + /// The keywords to be attached to the notification which notifies the start + /// of a new build event stream. BES only reads this field when sequence_number + /// or ordered_build_event.sequence_number is 1 in this message. If this field + /// is empty, BES will not publish notification messages for this stream. + #[prost(string, repeated, tag = "5")] + pub notification_keywords: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Required. The project this build is associated with. + /// This should match the project used for the initial call to + /// PublishLifecycleEvent (containing a BuildEnqueued message). + #[prost(string, tag = "6")] + pub project_id: ::prost::alloc::string::String, + /// Whether to require a previously received matching InvocationAttemptStarted + /// event before continuing event processing for the event in the current + /// request. BES only performs this check for events with sequence_number 1 + /// i.e. the first event in the stream. + #[prost(bool, tag = "7")] + pub check_preceding_lifecycle_events_present: bool, +} +/// Generated client implementations. +pub mod publish_build_event_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// A service for publishing BuildEvents. BuildEvents are generated by Build + /// Systems to record actions taken during a Build. Events occur in streams, + /// are identified by a StreamId, and ordered by sequence number in a stream. + /// + /// A Build may contain several streams of BuildEvents, depending on the systems + /// that are involved in the Build. Some BuildEvents are used to declare the + /// beginning and end of major portions of a Build; these are called + /// LifecycleEvents, and are used (for example) to indicate the beginning or end + /// of a Build, and the beginning or end of an Invocation attempt (there can be + /// more than 1 Invocation in a Build if, for example, a failure occurs somewhere + /// and it needs to be retried). + /// + /// Other, build-tool events represent actions taken by the Build tool, such as + /// target objects produced via compilation, tests run, et cetera. There could be + /// more than one build tool stream for an invocation attempt of a build. + #[derive(Debug, Clone)] + pub struct PublishBuildEventClient { + inner: tonic::client::Grpc, + } + impl PublishBuildEventClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl PublishBuildEventClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> PublishBuildEventClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + PublishBuildEventClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// Publish a build event stating the new state of a build (typically from the + /// build queue). The BuildEnqueued event must be published before all other + /// events for the same build ID. + /// + /// The backend will persist the event and deliver it to registered frontend + /// jobs immediately without batching. + /// + /// The commit status of the request is reported by the RPC's util_status() + /// function. The error code is the canonical error code defined in + /// //util/task/codes.proto. + pub async fn publish_lifecycle_event( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/google.devtools.build.v1.PublishBuildEvent/PublishLifecycleEvent", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "google.devtools.build.v1.PublishBuildEvent", + "PublishLifecycleEvent", + ), + ); + self.inner.unary(req, path, codec).await + } + /// Publish build tool events belonging to the same stream to a backend job + /// using bidirectional streaming. + pub async fn publish_build_tool_event_stream( + &mut self, + request: impl tonic::IntoStreamingRequest< + Message = super::PublishBuildToolEventStreamRequest, + >, + ) -> std::result::Result< + tonic::Response< + tonic::codec::Streaming, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/google.devtools.build.v1.PublishBuildEvent/PublishBuildToolEventStream", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "google.devtools.build.v1.PublishBuildEvent", + "PublishBuildToolEventStream", + ), + ); + self.inner.streaming(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod publish_build_event_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with PublishBuildEventServer. + #[async_trait] + pub trait PublishBuildEvent: Send + Sync + 'static { + /// Publish a build event stating the new state of a build (typically from the + /// build queue). The BuildEnqueued event must be published before all other + /// events for the same build ID. + /// + /// The backend will persist the event and deliver it to registered frontend + /// jobs immediately without batching. + /// + /// The commit status of the request is reported by the RPC's util_status() + /// function. The error code is the canonical error code defined in + /// //util/task/codes.proto. + async fn publish_lifecycle_event( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the PublishBuildToolEventStream method. + type PublishBuildToolEventStreamStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result< + super::PublishBuildToolEventStreamResponse, + tonic::Status, + >, + > + + Send + + 'static; + /// Publish build tool events belonging to the same stream to a backend job + /// using bidirectional streaming. + async fn publish_build_tool_event_stream( + &self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + /// A service for publishing BuildEvents. BuildEvents are generated by Build + /// Systems to record actions taken during a Build. Events occur in streams, + /// are identified by a StreamId, and ordered by sequence number in a stream. + /// + /// A Build may contain several streams of BuildEvents, depending on the systems + /// that are involved in the Build. Some BuildEvents are used to declare the + /// beginning and end of major portions of a Build; these are called + /// LifecycleEvents, and are used (for example) to indicate the beginning or end + /// of a Build, and the beginning or end of an Invocation attempt (there can be + /// more than 1 Invocation in a Build if, for example, a failure occurs somewhere + /// and it needs to be retried). + /// + /// Other, build-tool events represent actions taken by the Build tool, such as + /// target objects produced via compilation, tests run, et cetera. There could be + /// more than one build tool stream for an invocation attempt of a build. + #[derive(Debug)] + pub struct PublishBuildEventServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl PublishBuildEventServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for PublishBuildEventServer + where + T: PublishBuildEvent, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/google.devtools.build.v1.PublishBuildEvent/PublishLifecycleEvent" => { + #[allow(non_camel_case_types)] + struct PublishLifecycleEventSvc(pub Arc); + impl< + T: PublishBuildEvent, + > tonic::server::UnaryService + for PublishLifecycleEventSvc { + type Response = (); + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::publish_lifecycle_event( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PublishLifecycleEventSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/google.devtools.build.v1.PublishBuildEvent/PublishBuildToolEventStream" => { + #[allow(non_camel_case_types)] + struct PublishBuildToolEventStreamSvc( + pub Arc, + ); + impl< + T: PublishBuildEvent, + > tonic::server::StreamingService< + super::PublishBuildToolEventStreamRequest, + > for PublishBuildToolEventStreamSvc { + type Response = super::PublishBuildToolEventStreamResponse; + type ResponseStream = T::PublishBuildToolEventStreamStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::publish_build_tool_event_stream( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PublishBuildToolEventStreamSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for PublishBuildEventServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService + for PublishBuildEventServer { + const NAME: &'static str = "google.devtools.build.v1.PublishBuildEvent"; + } +} diff --git a/nativelink-proto/genproto/lib.rs b/nativelink-proto/genproto/lib.rs index 411b30546..ea03ec505 100644 --- a/nativelink-proto/genproto/lib.rs +++ b/nativelink-proto/genproto/lib.rs @@ -48,6 +48,13 @@ pub mod google { pub mod bytestream { include!("google.bytestream.pb.rs"); } + pub mod devtools { + pub mod build { + pub mod v1 { + include!("google.devtools.build.v1.pb.rs"); + } + } + } pub mod longrunning { include!("google.longrunning.pb.rs"); } diff --git a/nativelink-proto/google/api/field_behavior.proto b/nativelink-proto/google/api/field_behavior.proto new file mode 100644 index 000000000..2865ba053 --- /dev/null +++ b/nativelink-proto/google/api/field_behavior.proto @@ -0,0 +1,104 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.api; + +import "google/protobuf/descriptor.proto"; + +option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations"; +option java_multiple_files = true; +option java_outer_classname = "FieldBehaviorProto"; +option java_package = "com.google.api"; +option objc_class_prefix = "GAPI"; + +extend google.protobuf.FieldOptions { + // A designation of a specific field behavior (required, output only, etc.) + // in protobuf messages. + // + // Examples: + // + // string name = 1 [(google.api.field_behavior) = REQUIRED]; + // State state = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; + // google.protobuf.Duration ttl = 1 + // [(google.api.field_behavior) = INPUT_ONLY]; + // google.protobuf.Timestamp expire_time = 1 + // [(google.api.field_behavior) = OUTPUT_ONLY, + // (google.api.field_behavior) = IMMUTABLE]; + repeated google.api.FieldBehavior field_behavior = 1052 [packed = false]; +} + +// An indicator of the behavior of a given field (for example, that a field +// is required in requests, or given as output but ignored as input). +// This **does not** change the behavior in protocol buffers itself; it only +// denotes the behavior and may affect how API tooling handles the field. +// +// Note: This enum **may** receive new values in the future. +enum FieldBehavior { + // Conventional default for enums. Do not use this. + FIELD_BEHAVIOR_UNSPECIFIED = 0; + + // Specifically denotes a field as optional. + // While all fields in protocol buffers are optional, this may be specified + // for emphasis if appropriate. + OPTIONAL = 1; + + // Denotes a field as required. + // This indicates that the field **must** be provided as part of the request, + // and failure to do so will cause an error (usually `INVALID_ARGUMENT`). + REQUIRED = 2; + + // Denotes a field as output only. + // This indicates that the field is provided in responses, but including the + // field in a request does nothing (the server *must* ignore it and + // *must not* throw an error as a result of the field's presence). + OUTPUT_ONLY = 3; + + // Denotes a field as input only. + // This indicates that the field is provided in requests, and the + // corresponding field is not included in output. + INPUT_ONLY = 4; + + // Denotes a field as immutable. + // This indicates that the field may be set once in a request to create a + // resource, but may not be changed thereafter. + IMMUTABLE = 5; + + // Denotes that a (repeated) field is an unordered list. + // This indicates that the service may provide the elements of the list + // in any arbitrary order, rather than the order the user originally + // provided. Additionally, the list's order may or may not be stable. + UNORDERED_LIST = 6; + + // Denotes that this field returns a non-empty default value if not set. + // This indicates that if the user provides the empty value in a request, + // a non-empty value will be returned. The user will not be aware of what + // non-empty value to expect. + NON_EMPTY_DEFAULT = 7; + + // Denotes that the field in a resource (a message annotated with + // google.api.resource) is used in the resource name to uniquely identify the + // resource. For AIP-compliant APIs, this should only be applied to the + // `name` field on the resource. + // + // This behavior should not be applied to references to other resources within + // the message. + // + // The identifier field of resources often have different field behavior + // depending on the request it is embedded in (e.g. for Create methods name + // is optional and unused, while for Update methods it is required). Instead + // of method-specific annotations, only `IDENTIFIER` is required. + IDENTIFIER = 8; +} diff --git a/nativelink-proto/google/devtools/build/v1/build_events.proto b/nativelink-proto/google/devtools/build/v1/build_events.proto new file mode 100644 index 000000000..43cf5e275 --- /dev/null +++ b/nativelink-proto/google/devtools/build/v1/build_events.proto @@ -0,0 +1,187 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.devtools.build.v1; + +import "google/devtools/build/v1/build_status.proto"; +import "google/protobuf/any.proto"; +import "google/protobuf/timestamp.proto"; + +option cc_enable_arenas = true; +option go_package = "google.golang.org/genproto/googleapis/devtools/build/v1;build"; +option java_multiple_files = true; +option java_outer_classname = "BuildEventProto"; +option java_package = "com.google.devtools.build.v1"; +option php_namespace = "Google\\Cloud\\Build\\V1"; + +// An event representing some state change that occurred in the build. This +// message does not include field for uniquely identifying an event. +message BuildEvent { + // Notification that the build system has attempted to run the build tool. + message InvocationAttemptStarted { + // The number of the invocation attempt, starting at 1 and increasing by 1 + // for each new attempt. Can be used to determine if there is a later + // invocation attempt replacing the current one a client is processing. + int64 attempt_number = 1; + + // Arbitrary details about the invocation attempt. + google.protobuf.Any details = 2; + } + + // Notification that an invocation attempt has finished. + message InvocationAttemptFinished { + // Final status of the invocation. + BuildStatus invocation_status = 3; + + // Arbitrary details about the invocation attempt. + google.protobuf.Any details = 4; + } + + // Notification that the build request is enqueued. + message BuildEnqueued { + // Additional details about the Build. + google.protobuf.Any details = 1; + } + + // Notification that the build request has finished, and no further + // invocations will occur. Note that this applies to the entire Build. + // Individual invocations trigger InvocationFinished when they finish. + message BuildFinished { + // Final status of the build. + BuildStatus status = 1; + + // Additional details about the Build. + google.protobuf.Any details = 2; + } + + // Textual output written to standard output or standard error. + message ConsoleOutput { + // The output stream type. + ConsoleOutputStream type = 1; + + // The output stream content. + oneof output { + // Regular UTF-8 output; normal text. + string text_output = 2; + + // Used if the output is not UTF-8 text (for example, a binary proto). + bytes binary_output = 3; + } + } + + // Notification of the end of a build event stream published by a build + // component other than CONTROLLER (See StreamId.BuildComponents). + message BuildComponentStreamFinished { + // How did the event stream finish. + enum FinishType { + // Unknown or unspecified; callers should never set this value. + FINISH_TYPE_UNSPECIFIED = 0; + + // Set by the event publisher to indicate a build event stream is + // finished. + FINISHED = 1; + + // Set by the WatchBuild RPC server when the publisher of a build event + // stream stops publishing events without publishing a + // BuildComponentStreamFinished event whose type equals FINISHED. + EXPIRED = 2; + } + + // How the event stream finished. + FinishType type = 1; + } + + // This should be precisely the time when this event happened, and not when + // the event proto was created or sent. + google.protobuf.Timestamp event_time = 1; + + // ////////////////////////////////////////////////////////////////////////// + // Events that indicate a state change of a build request in the build + // queue. + oneof event { + // An invocation attempt has started. + InvocationAttemptStarted invocation_attempt_started = 51; + + // An invocation attempt has finished. + InvocationAttemptFinished invocation_attempt_finished = 52; + + // The build is enqueued. + BuildEnqueued build_enqueued = 53; + + // The build has finished. Set when the build is terminated. + BuildFinished build_finished = 55; + + // An event containing printed text. + ConsoleOutput console_output = 56; + + // Indicates the end of a build event stream (with the same StreamId) from + // a build component executing the requested build task. + // *** This field does not indicate the WatchBuild RPC is finished. *** + BuildComponentStreamFinished component_stream_finished = 59; + + // Structured build event generated by Bazel about its execution progress. + google.protobuf.Any bazel_event = 60; + + // An event that contains supplemental tool-specific information about + // build execution. + google.protobuf.Any build_execution_event = 61; + + // An event that contains supplemental tool-specific information about + // source fetching. + google.protobuf.Any source_fetch_event = 62; + } +} + +// Unique identifier for a build event stream. +message StreamId { + // Which build component generates this event stream. Each build component + // may generate one event stream. + enum BuildComponent { + // Unknown or unspecified; callers should never set this value. + UNKNOWN_COMPONENT = 0; + + // A component that coordinates builds. + CONTROLLER = 1; + + // A component that runs executables needed to complete a build. + WORKER = 2; + + // A component that builds something. + TOOL = 3; + } + + // The id of a Build message. + string build_id = 1; + + // The unique invocation ID within this build. + // It should be the same as {invocation} (below) during the migration. + string invocation_id = 6; + + // The component that emitted this event. + BuildComponent component = 3; +} + +// The type of console output stream. +enum ConsoleOutputStream { + // Unspecified or unknown. + UNKNOWN = 0; + + // Normal output stream. + STDOUT = 1; + + // Error output stream. + STDERR = 2; +} diff --git a/nativelink-proto/google/devtools/build/v1/build_status.proto b/nativelink-proto/google/devtools/build/v1/build_status.proto new file mode 100644 index 000000000..93a525f13 --- /dev/null +++ b/nativelink-proto/google/devtools/build/v1/build_status.proto @@ -0,0 +1,77 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.devtools.build.v1; + +import "google/protobuf/any.proto"; +import "google/protobuf/wrappers.proto"; + +option cc_enable_arenas = true; +option go_package = "google.golang.org/genproto/googleapis/devtools/build/v1;build"; +option java_multiple_files = true; +option java_outer_classname = "BuildStatusProto"; +option java_package = "com.google.devtools.build.v1"; +option php_namespace = "Google\\Cloud\\Build\\V1"; + +// Status used for both invocation attempt and overall build completion. +message BuildStatus { + // The end result of the Build. + enum Result { + // Unspecified or unknown. + UNKNOWN_STATUS = 0; + + // Build was successful and tests (if requested) all pass. + COMMAND_SUCCEEDED = 1; + + // Build error and/or test failure. + COMMAND_FAILED = 2; + + // Unable to obtain a result due to input provided by the user. + USER_ERROR = 3; + + // Unable to obtain a result due to a failure within the build system. + SYSTEM_ERROR = 4; + + // Build required too many resources, such as build tool RAM. + RESOURCE_EXHAUSTED = 5; + + // An invocation attempt time exceeded its deadline. + INVOCATION_DEADLINE_EXCEEDED = 6; + + // Build request time exceeded the request_deadline + REQUEST_DEADLINE_EXCEEDED = 8; + + // The build was cancelled by a call to CancelBuild. + CANCELLED = 7; + } + + // The end result. + Result result = 1; + + // Final invocation ID of the build, if there was one. + // This field is only set on a status in BuildFinished event. + string final_invocation_id = 3; + + // Build tool exit code. Integer value returned by the executed build tool. + // Might not be available in some cases, e.g., a build timeout. + google.protobuf.Int32Value build_tool_exit_code = 4; + + // Human-readable error message. Do not use for programmatic purposes. + string error_message = 5; + + // Fine-grained diagnostic information to complement the status. + google.protobuf.Any details = 2; +} diff --git a/nativelink-proto/google/devtools/build/v1/publish_build_event.proto b/nativelink-proto/google/devtools/build/v1/publish_build_event.proto new file mode 100644 index 000000000..641ba5af2 --- /dev/null +++ b/nativelink-proto/google/devtools/build/v1/publish_build_event.proto @@ -0,0 +1,187 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.devtools.build.v1; + +import "google/api/annotations.proto"; +import "google/api/client.proto"; +import "google/api/field_behavior.proto"; +import "google/devtools/build/v1/build_events.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/empty.proto"; + +option cc_enable_arenas = true; +option go_package = "google.golang.org/genproto/googleapis/devtools/build/v1;build"; +option java_multiple_files = true; +option java_outer_classname = "BackendProto"; +option java_package = "com.google.devtools.build.v1"; +option php_namespace = "Google\\Cloud\\Build\\V1"; + +// A service for publishing BuildEvents. BuildEvents are generated by Build +// Systems to record actions taken during a Build. Events occur in streams, +// are identified by a StreamId, and ordered by sequence number in a stream. +// +// A Build may contain several streams of BuildEvents, depending on the systems +// that are involved in the Build. Some BuildEvents are used to declare the +// beginning and end of major portions of a Build; these are called +// LifecycleEvents, and are used (for example) to indicate the beginning or end +// of a Build, and the beginning or end of an Invocation attempt (there can be +// more than 1 Invocation in a Build if, for example, a failure occurs somewhere +// and it needs to be retried). +// +// Other, build-tool events represent actions taken by the Build tool, such as +// target objects produced via compilation, tests run, et cetera. There could be +// more than one build tool stream for an invocation attempt of a build. +service PublishBuildEvent { + option (google.api.default_host) = "buildeventservice.googleapis.com"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform"; + + // Publish a build event stating the new state of a build (typically from the + // build queue). The BuildEnqueued event must be published before all other + // events for the same build ID. + // + // The backend will persist the event and deliver it to registered frontend + // jobs immediately without batching. + // + // The commit status of the request is reported by the RPC's util_status() + // function. The error code is the canonical error code defined in + // //util/task/codes.proto. + rpc PublishLifecycleEvent(PublishLifecycleEventRequest) + returns (google.protobuf.Empty) { + option (google.api.http) = { + post: "/v1/projects/{project_id=*}/lifecycleEvents:publish" + body: "*" + additional_bindings { post: "/v1/lifecycleEvents:publish" body: "*" } + }; + } + + // Publish build tool events belonging to the same stream to a backend job + // using bidirectional streaming. + rpc PublishBuildToolEventStream(stream PublishBuildToolEventStreamRequest) + returns (stream PublishBuildToolEventStreamResponse) { + option (google.api.http) = { + post: "/v1/projects/{project_id=*}/events:publish" + body: "*" + additional_bindings { post: "/v1/events:publish" body: "*" } + }; + option (google.api.method_signature) = + "ordered_build_event,notification_keywords,project_id"; + } +} + +// Publishes 'lifecycle events' that update the high-level state of a build: +// - BuildEnqueued: When a build is scheduled. +// - InvocationAttemptStarted: When work for a build starts; there can be +// multiple invocations for a build (e.g. retries). +// - InvocationAttemptCompleted: When work for a build finishes. +// - BuildFinished: When a build is finished. +message PublishLifecycleEventRequest { + // The service level of the build request. Backends only uses this value when + // the BuildEnqueued event is published to determine what level of service + // this build should receive. + enum ServiceLevel { + // Non-interactive builds can tolerate longer event latencies. This is the + // default ServiceLevel if callers do not specify one. + NONINTERACTIVE = 0; + + // The events of an interactive build should be delivered with low latency. + INTERACTIVE = 1; + } + + // The interactivity of this build. + ServiceLevel service_level = 1; + + // Required. The lifecycle build event. If this is a build tool event, the RPC + // will fail with INVALID_REQUEST. + OrderedBuildEvent build_event = 2 [(google.api.field_behavior) = REQUIRED]; + + // If the next event for this build or invocation (depending on the event + // type) hasn't been published after this duration from when {build_event} + // is written to BES, consider this stream expired. If this field is not set, + // BES backend will use its own default value. + google.protobuf.Duration stream_timeout = 3; + + // Additional information about a build request. These are define by the event + // publishers, and the Build Event Service does not validate or interpret + // them. They are used while notifying internal systems of new builds and + // invocations if the OrderedBuildEvent.event type is + // BuildEnqueued/InvocationAttemptStarted. + repeated string notification_keywords = 4; + + // Required. The project this build is associated with. + // This should match the project used for the initial call to + // PublishLifecycleEvent (containing a BuildEnqueued message). + string project_id = 6 [(google.api.field_behavior) = REQUIRED]; + + // Whether to require a previously received matching parent lifecycle event + // for the current request's event before continuing processing. + // - InvocationAttemptStarted and BuildFinished events require a BuildEnqueued + // parent event. + // - InvocationAttemptFinished events require an InvocationAttemptStarted + // parent event. + bool check_preceding_lifecycle_events_present = 7; +} + +// States which event has been committed. Any failure to commit will cause +// RPC errors, hence not recorded by this proto. +message PublishBuildToolEventStreamResponse { + // The stream that contains this event. + StreamId stream_id = 1; + + // The sequence number of this event that has been committed. + int64 sequence_number = 2; +} + +// Build event with contextual information about the stream it belongs to and +// its position in that stream. +message OrderedBuildEvent { + // Which build event stream this event belongs to. + StreamId stream_id = 1; + + // The position of this event in the stream. The sequence numbers for a build + // event stream should be a sequence of consecutive natural numbers starting + // from one. (1, 2, 3, ...) + int64 sequence_number = 2; + + // The actual event. + BuildEvent event = 3; +} + +// Streaming request message for PublishBuildToolEventStream. +message PublishBuildToolEventStreamRequest { + // Required. The build event with position info. + // New publishing clients should use this field rather than the 3 above. + OrderedBuildEvent ordered_build_event = 4 + [(google.api.field_behavior) = REQUIRED]; + + // The keywords to be attached to the notification which notifies the start + // of a new build event stream. BES only reads this field when sequence_number + // or ordered_build_event.sequence_number is 1 in this message. If this field + // is empty, BES will not publish notification messages for this stream. + repeated string notification_keywords = 5; + + // Required. The project this build is associated with. + // This should match the project used for the initial call to + // PublishLifecycleEvent (containing a BuildEnqueued message). + string project_id = 6 [(google.api.field_behavior) = REQUIRED]; + + // Whether to require a previously received matching InvocationAttemptStarted + // event before continuing event processing for the event in the current + // request. BES only performs this check for events with sequence_number 1 + // i.e. the first event in the stream. + bool check_preceding_lifecycle_events_present = 7; +} diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index f42c72bd6..7c6d06a0c 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -10,6 +10,7 @@ rust_library( name = "nativelink-service", srcs = [ "src/ac_server.rs", + "src/bep_server.rs", "src/bytestream_server.rs", "src/capabilities_server.rs", "src/cas_server.rs", @@ -47,6 +48,7 @@ rust_test_suite( timeout = "short", srcs = [ "tests/ac_server_test.rs", + "tests/bep_server_test.rs", "tests/bytestream_server_test.rs", "tests/cas_server_test.rs", "tests/worker_api_server_test.rs", diff --git a/nativelink-service/src/bep_server.rs b/nativelink-service/src/bep_server.rs new file mode 100644 index 000000000..f2a0548a7 --- /dev/null +++ b/nativelink-service/src/bep_server.rs @@ -0,0 +1,204 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; + +use bytes::BytesMut; +use futures::stream::unfold; +use futures::Stream; +use nativelink_error::{Error, ResultExt}; +use nativelink_proto::google::devtools::build::v1::publish_build_event_server::{ + PublishBuildEvent, PublishBuildEventServer, +}; +use nativelink_proto::google::devtools::build::v1::{ + PublishBuildToolEventStreamRequest, PublishBuildToolEventStreamResponse, + PublishLifecycleEventRequest, StreamId, +}; +use nativelink_store::store_manager::StoreManager; +use nativelink_util::common::DigestInfo; +use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; +use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike}; +use prost::Message; +use tonic::{Request, Response, Result, Status, Streaming}; +use tracing::{instrument, Level}; + +pub struct BepServer { + store: Store, +} + +// TODO(caass): how are people expected to retrieve streams? if they need to access them via [`DigestInfo`] +// then we'll need to expose this function... +pub fn get_stream_digest(stream_id: &StreamId) -> Result { + let mut hasher = DigestHasherFunc::Blake3.hasher(); + hasher.update(stream_id.build_id.as_bytes()); + hasher.update(stream_id.invocation_id.as_bytes()); + hasher.update(format!("{}", stream_id.component).as_bytes()); + Ok(hasher.finalize_digest()) +} + +impl BepServer { + pub fn new( + config: &nativelink_config::cas_server::BepConfig, + store_manager: &StoreManager, + ) -> Result { + let store = store_manager + .get_store(&config.store) + .err_tip(|| format!("Expected store {} to exist in store manager", &config.store))?; + + Ok(Self { store }) + } + + pub fn into_service(self) -> PublishBuildEventServer { + PublishBuildEventServer::new(self) + } + + async fn inner_publish_lifecycle_event( + &self, + request: PublishLifecycleEventRequest, + ) -> Result, Error> { + let build_event = request + .build_event + .as_ref() + .err_tip(|| "Expected build_event to be set")?; + let stream_id = build_event + .stream_id + .as_ref() + .err_tip(|| "Expected stream_id to be set")?; + let digest_info = + get_stream_digest(stream_id).err_tip(|| "Failed to prepare request for upload")?; + + let mut buf = BytesMut::new(); + request + .encode(&mut buf) + .err_tip(|| "Could not encode PublishLifecycleEventRequest proto")?; + + self.store + .as_store_driver_pin() + .update_oneshot(StoreKey::Digest(digest_info), buf.freeze()) + .await + .err_tip(|| "Failed to store PublishLifecycleEventRequest")?; + + Ok(Response::new(())) + } + + async fn inner_publish_build_tool_event_stream( + &self, + stream: Streaming, + ) -> Result, Error> { + async fn process_request( + store: Pin<&dyn StoreDriver>, + request: PublishBuildToolEventStreamRequest, + ) -> Result { + let ordered_build_event = request + .ordered_build_event + .as_ref() + .err_tip(|| "Expected ordered_build_event to be set")?; + let stream_id = ordered_build_event + .stream_id + .as_ref() + .err_tip(|| "Expected stream_id to be set")?; + + let sequence_number = ordered_build_event.sequence_number; + + let digest_info = + get_stream_digest(stream_id).err_tip(|| "Failed to prepare request for upload")?; + + let mut buf = BytesMut::new(); + request + .encode(&mut buf) + .err_tip(|| "Could not encode PublishBuildToolEventStreamRequest proto")?; + + store + .update_oneshot(StoreKey::Digest(digest_info), buf.freeze()) + .await + .err_tip(|| "Failed to store PublishBuildToolEventStreamRequest")?; + + Ok(PublishBuildToolEventStreamResponse { + stream_id: Some(stream_id.clone()), + sequence_number, + }) + } + struct State { + store: Store, + stream: Streaming, + } + Ok(Response::new(Box::pin(unfold( + Some(State { + store: self.store.clone(), + stream, + }), + move |maybe_state| async move { + let mut state = maybe_state?; + let request = match state + .stream + .message() + .await + .err_tip(|| "While receiving message in publish_build_tool_event_stream") + { + Ok(Some(request)) => request, + Ok(None) => return None, + Err(e) => return Some((Err(e.into()), None)), + }; + process_request(state.store.as_store_driver_pin(), request) + .await + .map_or_else( + |e| Some((Err(e), None)), + |response| Some((Ok(response), Some(state))), + ) + }, + )))) + } +} + +type PublishBuildToolEventStreamStream = Pin< + Box> + Send + 'static>, +>; + +#[tonic::async_trait] +impl PublishBuildEvent for BepServer { + type PublishBuildToolEventStreamStream = PublishBuildToolEventStreamStream; + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] + async fn publish_lifecycle_event( + &self, + grpc_request: Request, + ) -> Result, Status> { + self.inner_publish_lifecycle_event(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) + } + + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] + async fn publish_build_tool_event_stream( + &self, + grpc_request: Request>, + ) -> Result, Status> { + self.inner_publish_build_tool_event_stream(grpc_request.into_inner()) + .await + .map_err(|e| e.into()) + } +} diff --git a/nativelink-service/src/lib.rs b/nativelink-service/src/lib.rs index ba78350b7..c49b8aa44 100644 --- a/nativelink-service/src/lib.rs +++ b/nativelink-service/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod ac_server; +pub mod bep_server; pub mod bytestream_server; pub mod capabilities_server; pub mod cas_server; diff --git a/nativelink-service/tests/bep_server_test.rs b/nativelink-service/tests/bep_server_test.rs new file mode 100644 index 000000000..d64cc8f72 --- /dev/null +++ b/nativelink-service/tests/bep_server_test.rs @@ -0,0 +1,310 @@ +// Copyright 2023 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::StreamExt; +use hyper::Body; +use nativelink_config::cas_server::BepConfig; +use nativelink_error::{Error, ResultExt}; +use nativelink_macro::nativelink_test; +use nativelink_proto::google::devtools::build::v1::build_event::console_output::Output; +use nativelink_proto::google::devtools::build::v1::build_event::{ + BuildEnqueued, BuildFinished, ConsoleOutput, Event, InvocationAttemptFinished, + InvocationAttemptStarted, +}; +use nativelink_proto::google::devtools::build::v1::publish_build_event_server::PublishBuildEvent; +use nativelink_proto::google::devtools::build::v1::publish_lifecycle_event_request::ServiceLevel; +use nativelink_proto::google::devtools::build::v1::stream_id::BuildComponent; +use nativelink_proto::google::devtools::build::v1::{ + build_status, BuildEvent, BuildStatus, ConsoleOutputStream, OrderedBuildEvent, + PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId, +}; +use nativelink_service::bep_server::{get_stream_digest, BepServer}; +use nativelink_store::default_store_factory::store_factory; +use nativelink_store::store_manager::StoreManager; +use nativelink_util::buf_channel::make_buf_channel_pair; +use nativelink_util::common::encode_stream_proto; +use nativelink_util::store_trait::{Store, StoreKey, StoreLike}; +use pretty_assertions::assert_eq; +use prometheus_client::registry::Registry; +use prost::Message; +use prost_types::Timestamp; +use tonic::codec::{Codec, ProstCodec}; +use tonic::{Request, Streaming}; + +const BEP_STORE_NAME: &str = "main_bep"; + +/// Utility function to construct a [`StoreManager`] +async fn make_store_manager() -> Result, Error> { + let store_manager = Arc::new(StoreManager::new()); + store_manager.add_store( + BEP_STORE_NAME, + store_factory( + &nativelink_config::stores::StoreConfig::memory( + nativelink_config::stores::MemoryStore::default(), + ), + &store_manager, + Some(&mut ::default()), + None, + ) + .await?, + ); + Ok(store_manager) +} + +/// Utility function to construct a [`BepServer`] +fn make_bep_server(store_manager: &StoreManager) -> Result { + BepServer::new( + &BepConfig { + store: BEP_STORE_NAME.to_string(), + }, + store_manager, + ) +} + +fn get_bep_store(store_manager: &StoreManager) -> Result { + store_manager + .get_store(BEP_STORE_NAME) + .err_tip(|| format!("While retrieving bep_store {BEP_STORE_NAME}")) +} + +/// Asserts that a gRPC request for a [`PublishLifecycleEventRequest`] is correctly dumped into a [`Store`] +#[nativelink_test] +async fn publish_lifecycle_event_test() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let bep_server = make_bep_server(&store_manager)?; + let bep_store = get_bep_store(&store_manager)?; + + let stream_id = StreamId { + build_id: "some-build-id".to_string(), + invocation_id: "some-invocation-id".to_string(), + component: BuildComponent::Controller as i32, + }; + let digest = get_stream_digest(&stream_id)?; + + let request = PublishLifecycleEventRequest { + service_level: ServiceLevel::Interactive as i32, + build_event: Some(OrderedBuildEvent { + stream_id: Some(stream_id), + sequence_number: 1, + event: Some(BuildEvent { + event_time: Some(Timestamp::date(1999, 1, 6)?), + event: Some(Event::ConsoleOutput(ConsoleOutput { + r#type: ConsoleOutputStream::Stdout as i32, + output: Some(Output::TextOutput( + "Here's some text that's been printed to stdout".to_string(), + )), + })), + }), + }), + stream_timeout: None, + notification_keywords: vec!["testing".to_string(), "console".to_string()], + project_id: "some-project-id".to_string(), + check_preceding_lifecycle_events_present: false, + }; + + bep_server + .publish_lifecycle_event(Request::new(request.clone())) + .await + .err_tip(|| "While invoking publish_lifecycle_event")?; + + let (mut writer, mut reader) = make_buf_channel_pair(); + + bep_store + .as_store_driver_pin() + .get_part(StoreKey::Digest(digest), &mut writer, 0, None) + .await + .err_tip(|| "While retrieving lifecycle_event_request from store")?; + + let bytes = reader + .recv() + .await + .err_tip(|| "While receiving bytes from reader")?; + + let decoded_request = PublishLifecycleEventRequest::decode(bytes) + .err_tip(|| "While decoding request from bytes")?; + + assert_eq!(request, decoded_request); + + Ok(()) +} + +#[nativelink_test] +async fn publish_build_tool_event_stream_test() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let bep_server = make_bep_server(&store_manager)?; + let bep_store = get_bep_store(&store_manager)?; + + let (mut request_tx, mut response_stream) = async { + // setup the request and response streams + let (tx, body) = Body::channel(); + let mut codec = ProstCodec::::default(); + let stream = Streaming::new_request(codec.decoder(), body, None, None); + let stream = bep_server + .publish_build_tool_event_stream(Request::new(stream)) + .await + .err_tip(|| "While invoking publish_build_tool_event_stream")? + .into_inner(); + + Ok::<_, Box>((tx, stream)) + } + .await?; + + let (requests, store_key) = { + // construct some requests to send off + let stream_id = StreamId { + build_id: "some-build-id".to_string(), + invocation_id: "some-invocation-id".to_string(), + component: BuildComponent::Controller as i32, + }; + let digest = get_stream_digest(&stream_id)?; + let project_id = "some-project-id".to_string(); + + let requests = [ + PublishBuildToolEventStreamRequest { + ordered_build_event: Some(OrderedBuildEvent { + stream_id: Some(stream_id.clone()), + sequence_number: 1, + event: Some(BuildEvent { + event_time: Some(Timestamp::date(1999, 1, 4)?), + event: Some(Event::BuildEnqueued(BuildEnqueued { details: None })), + }), + }), + notification_keywords: vec!["testing".to_string(), "build-enqueued".to_string()], + project_id: project_id.clone(), + check_preceding_lifecycle_events_present: false, + }, + PublishBuildToolEventStreamRequest { + ordered_build_event: Some(OrderedBuildEvent { + stream_id: Some(stream_id.clone()), + sequence_number: 2, + event: Some(BuildEvent { + event_time: Some(Timestamp::date(1999, 1, 5)?), + event: Some(Event::InvocationAttemptStarted(InvocationAttemptStarted { + attempt_number: 1, + details: None, + })), + }), + }), + notification_keywords: vec!["testing".to_string()], + project_id: project_id.clone(), + check_preceding_lifecycle_events_present: false, + }, + PublishBuildToolEventStreamRequest { + ordered_build_event: Some(OrderedBuildEvent { + stream_id: Some(stream_id.clone()), + sequence_number: 3, + event: Some(BuildEvent { + event_time: Some(Timestamp::date(1999, 1, 6)?), + event: Some(Event::ConsoleOutput(ConsoleOutput { + r#type: ConsoleOutputStream::Stdout as i32, + output: Some(Output::TextOutput( + "This is taking a while...".to_string(), + )), + })), + }), + }), + notification_keywords: vec!["testing".to_string()], + project_id: project_id.clone(), + check_preceding_lifecycle_events_present: false, + }, + PublishBuildToolEventStreamRequest { + ordered_build_event: Some(OrderedBuildEvent { + stream_id: Some(stream_id.clone()), + sequence_number: 4, + event: Some(BuildEvent { + event_time: Some(Timestamp::date(1999, 1, 7)?), + event: Some(Event::InvocationAttemptFinished( + InvocationAttemptFinished { + invocation_status: Some(BuildStatus { + result: build_status::Result::InvocationDeadlineExceeded as i32, + final_invocation_id: String::default(), + build_tool_exit_code: Some(1), + error_message: "You missed my birthday!".to_string(), + details: None, + }), + details: None, + }, + )), + }), + }), + notification_keywords: vec!["testing".to_string()], + project_id: "some-project-id".to_string(), + check_preceding_lifecycle_events_present: false, + }, + PublishBuildToolEventStreamRequest { + ordered_build_event: Some(OrderedBuildEvent { + stream_id: Some(stream_id.clone()), + sequence_number: 5, + event: Some(BuildEvent { + event_time: Some(Timestamp::date(1999, 1, 8)?), + event: Some(Event::BuildFinished(BuildFinished { + status: Some(BuildStatus { + result: build_status::Result::InvocationDeadlineExceeded as i32, + final_invocation_id: String::default(), + build_tool_exit_code: Some(1), + error_message: "Missed her birthday...".to_string(), + details: None, + }), + details: None, + })), + }), + }), + notification_keywords: vec!["testing".to_string()], + project_id: project_id.clone(), + check_preceding_lifecycle_events_present: false, + }, + ]; + + (requests, StoreKey::Digest(digest)) + }; + + { + // send off the requests and validate the responses + for (i, req) in requests.iter().enumerate() { + let encoded_request = encode_stream_proto(req)?; + request_tx.send_data(encoded_request).await?; + + let response = response_stream + .next() + .await + .err_tip(|| "Response stream closed unexpectedly")? + .err_tip(|| "While awaiting next PublishBuildToolEventStreamResponse")?; + + // First, check if the response matches what we expect + assert_eq!(response.sequence_number, i as i64 + 1); + assert_eq!( + response.stream_id, + req.ordered_build_event + .as_ref() + .and_then(|evt| evt.stream_id.clone()) + ); + + // Second, check if the message was forwarded correctly + let (mut writer, mut reader) = make_buf_channel_pair(); + bep_store + .as_store_driver_pin() + .get_part(store_key.borrow(), &mut writer, 0, None) + .await?; + let encoded_request = reader.recv().await?; + + let decoded_request = PublishBuildToolEventStreamRequest::decode(encoded_request)?; + + assert_eq!(*req, decoded_request); + } + + Ok(()) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index f0d3fca7f..2bf1e6d27 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -34,6 +34,7 @@ use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_scheduler::default_scheduler_factory::scheduler_factory; use nativelink_scheduler::worker::WorkerId; use nativelink_service::ac_server::AcServer; +use nativelink_service::bep_server::BepServer; use nativelink_service::bytestream_server::ByteStreamServer; use nativelink_service::capabilities_server::CapabilitiesServer; use nativelink_service::cas_server::CasServer; @@ -395,6 +396,32 @@ async fn inner_main( }) }) .err_tip(|| "Could not create WorkerApi service")?, + ) + .add_optional_service( + services + .experimental_bep + .map_or(Ok(None), |cfg| { + BepServer::new(&cfg, &store_manager).map(|v| { + let mut service = v.into_service(); + let send_algo = &http_config.compression.send_compression_algorithm; + if let Some(encoding) = + into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::none)) + { + service = service.send_compressed(encoding); + } + for encoding in http_config + .compression + .accepted_compression_algorithms + .iter() + // Filter None values. + .filter_map(into_encoding) + { + service = service.accept_compressed(encoding); + } + Some(service) + }) + }) + .err_tip(|| "Could not create WorkerApi service")?, ); let root_metrics_registry = root_metrics_registry.clone();