Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cc930b8
move optimistic block protos to v1
bharath-123 Nov 19, 2024
ed18019
move optimistic block protos to v1
bharath-123 Nov 5, 2024
e9e3ad6
implement optimistic block apis
bharath-123 Dec 9, 2024
0f5de0d
send commited block from finalize_block
bharath-123 Oct 22, 2024
061943f
add ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCK to configmaps
bharath-123 Oct 22, 2024
3db9fc2
fix lint issues
bharath-123 Oct 22, 2024
09e8969
remove debug trait from OptimisticBlockChannels
bharath-123 Oct 22, 2024
4339c09
minor updates
bharath-123 Oct 22, 2024
a9efa2a
update sequencer chart version
bharath-123 Oct 29, 2024
25cdfe0
minor changes
bharath-123 Oct 22, 2024
b4bccec
just use Ok while sending data over grpc stream instead of type casti…
bharath-123 Oct 22, 2024
67333af
update optimistic block imports
bharath-123 Dec 9, 2024
c3adf59
minor updates
bharath-123 Oct 29, 2024
e6baaeb
move optimistic block grpc wrappers to v1
bharath-123 Oct 30, 2024
97b827f
rebase from pr 1707
bharath-123 Dec 9, 2024
74643db
address review comments
bharath-123 Jan 7, 2025
6b514af
minor updates
bharath-123 Nov 18, 2024
d78cb11
address review comments
bharath-123 Nov 18, 2024
afb65a8
make task functions free standing
bharath-123 Nov 18, 2024
a3a2e57
minor renaming
bharath-123 Nov 19, 2024
0932308
rebase protos
bharath-123 Dec 9, 2024
de60e17
rename optimistic_block to optimistic
bharath-123 Nov 19, 2024
5d05571
address review comments
bharath-123 Jan 7, 2025
bdd721c
use in_scope to emit events under a specific span
bharath-123 Nov 26, 2024
0ca0832
add a comment
bharath-123 Nov 26, 2024
a22db23
add documentation to event bus
bharath-123 Nov 26, 2024
3c7f33a
run fmt + lint
bharath-123 Nov 26, 2024
49b29a9
fix comment
bharath-123 Nov 26, 2024
31fe404
lint + fmt
bharath-123 Dec 9, 2024
ea05594
only stream new values for the optimistic block and block commitment
bharath-123 Dec 9, 2024
7bb792d
add some comments
bharath-123 Dec 11, 2024
b09f2d5
address review comments - 4
bharath-123 Dec 13, 2024
f2ea8d2
save
bharath-123 Dec 17, 2024
d0191f0
run fmt + lint
bharath-123 Jan 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/sequencer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ version: 1.0.1
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.0.0"
appVersion: "1.0.1"

dependencies:
- name: sequencer-relayer
Expand Down
1 change: 1 addition & 0 deletions charts/sequencer/templates/configmaps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ data:
OTEL_SERVICE_NAME: "{{ tpl .Values.sequencer.otel.serviceName . }}"
{{- if not .Values.global.dev }}
{{- else }}
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS: "{{ not .Values.sequencer.optimisticBlockApis.enabled }}"
{{- end }}
---
2 changes: 2 additions & 0 deletions charts/sequencer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ sequencer:
tracesTimeout: 10
otlpHeaders:
traceHeaders:
optimisticBlockApis:
enabled: false

cometbft:
config:
Expand Down
1 change: 1 addition & 0 deletions crates/astria-core/src/sequencerblock/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod block;
pub mod celestia;
pub mod optimistic;

pub use block::{
RollupTransactions,
Expand Down
83 changes: 83 additions & 0 deletions crates/astria-core/src/sequencerblock/v1/optimistic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use bytes::Bytes;

use crate::{
generated::astria::sequencerblock::optimistic::v1alpha1 as raw,
Protobuf,
};

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct SequencerBlockCommitError(SequencerBlockCommitErrorKind);

impl SequencerBlockCommitError {
fn invalid_block_hash(len: usize) -> Self {
Self(SequencerBlockCommitErrorKind::InvalidBlockHash(len))
}
}

#[derive(Debug, thiserror::Error)]
enum SequencerBlockCommitErrorKind {
#[error("invalid block hash length: {0}")]
InvalidBlockHash(usize),
}

#[derive(Clone, Debug)]
pub struct SequencerBlockCommit {
height: u64,
block_hash: [u8; 32],
}

impl SequencerBlockCommit {
#[must_use]
pub fn new(height: u64, block_hash: [u8; 32]) -> Self {
Self {
height,
block_hash,
}
}

#[must_use]
pub fn height(&self) -> u64 {
self.height
}

#[must_use]
pub fn block_hash(&self) -> &[u8; 32] {
&self.block_hash
}
}

impl From<SequencerBlockCommit> for raw::SequencerBlockCommit {
fn from(value: SequencerBlockCommit) -> Self {
value.to_raw()
}
}

impl Protobuf for SequencerBlockCommit {
type Error = SequencerBlockCommitError;
type Raw = raw::SequencerBlockCommit;

fn try_from_raw_ref(raw: &Self::Raw) -> Result<Self, Self::Error> {
let Self::Raw {
height,
block_hash,
} = raw;

let block_hash = block_hash
.as_ref()
.try_into()
.map_err(|_| SequencerBlockCommitError::invalid_block_hash(block_hash.len()))?;

Ok(SequencerBlockCommit {
height: *height,
block_hash,
})
}

fn to_raw(&self) -> Self::Raw {
raw::SequencerBlockCommit {
height: self.height(),
block_hash: Bytes::copy_from_slice(self.block_hash()),
}
}
}
1 change: 1 addition & 0 deletions crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ tendermint-proto = { workspace = true }
tendermint = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "tracing"] }
tokio-util = { workspace = true, features = ["rt"] }
tonic = { workspace = true }
tracing = { workspace = true }

Expand Down
3 changes: 3 additions & 0 deletions crates/astria-sequencer/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ ASTRIA_SEQUENCER_METRICS_HTTP_LISTENER_ADDR="127.0.0.1:9000"
# `ASTRIA_SEQUENCER_FORCE_STDOUT` is set to `true`.
ASTRIA_SEQUENCER_PRETTY_PRINT=false

# Disables streaming optimistic blocks to clients.
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS=false

# If set to any non-empty value removes ANSI escape characters from the pretty
# printed output. Note that this does nothing unless `ASTRIA_SEQUENCER_PRETTY_PRINT`
# is set to `true`.
Expand Down
158 changes: 158 additions & 0 deletions crates/astria-sequencer/src/app/event_bus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use std::sync::Arc;

use astria_core::sequencerblock::v1::SequencerBlock;
use astria_eyre::eyre::WrapErr as _;
use tendermint::abci::request::FinalizeBlock;
use tokio::sync::watch::{
Receiver,
Sender,
};
use tokio_util::sync::CancellationToken;

/// `EventReceiver` contains the receiver side of the events sent by the Sequencer App.
/// The listeners of the events can receive the latest value of the event by calling the
/// `receive` method.
#[derive(Clone)]
pub(crate) struct EventReceiver<T> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add explanations on what the fields do (specifically, is_init because it might not be immediately obvious to a reader).

// The receiver side of the watch which is read for the latest value of the event.
// We receive an Option over T because the sender side of the watch is designed to send
// Option values. This allows the sender value to send objects which do not have a `Default`
// implementation.
inner: Receiver<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

impl<T> EventReceiver<T>
where
T: Clone,
{
// Marks the current message in the receiver end of the watch as seen.
// This is useful in situations where we want to ignore the current value of the watch
// and wait for the next value.
pub(crate) fn mark_latest_event_as_seen(&mut self) {
self.inner.mark_unchanged();
}

// Returns the latest value of the event, waiting for the value to change if it hasn't already.
pub(crate) async fn receive(&mut self) -> astria_eyre::Result<T> {
// This will get resolved when the sender side of the watch is initialized by sending
// the first value. We wait till the sender side of the watch has initialized the watch.
// Once the sender side has been initialized, it will get resolved immediately.
self.is_init.cancelled().await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note that this will resolve immediately once initialized.

// We want to only receive the latest value through the receiver, so we wait for the
// current value in the watch to change before we return it.
self.inner
.changed()
.await
.wrap_err("error waiting for latest event")?;
Ok(self.inner.borrow_and_update().clone().expect(
"events must be set after is_init is triggered; this means an invariant was violated",
))
}
}

/// `EventSender` contains the sender side of the events sent by the Sequencer App.
/// At any given time, it sends the latest value of the event.
struct EventSender<T> {
// A watch channel that is always starts unset. Once set, the `is_init` token is cancelled
// and value in the channel will never be unset.
inner: Sender<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

impl<T> EventSender<T> {
fn new() -> Self {
let (sender, _) = tokio::sync::watch::channel(None);
Self {
inner: sender,
is_init: CancellationToken::new(),
}
}

// Returns a `EventReceiver` object that contains the receiver side of the watch which can be
// used to receive the latest value of the event.
fn subscribe(&self) -> EventReceiver<T> {
EventReceiver {
inner: self.inner.subscribe(),
is_init: self.is_init.clone(),
}
}

// Sends the event to all the subscribers.
fn send(&self, event: T) {
self.inner.send_replace(Some(event));
// after sending the first value, we resolve the is_init token to signal that the sender
// side of the watch is initialized. The receiver side can now start receiving valid
// values.
self.is_init.cancel();
}
}

/// `EventBusSubscription` contains [`EventReceiver`] of various events that can be subscribed.
/// It can be cloned by various components in the sequencer app to receive events.
#[derive(Clone)]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the clone (and mentioned in App): I don't think this should be cloned anywhere - only App should send things over these channels. Instead, provide something like:

#[derive(Clone)]
pub(crate) struct EventBusSubscription {
    process_proposal_blocks: EventReceiver<Arc<SequencerBlock>>,
    finalize_blocks: EventReceiver<Arc<SequencerBlockCommit>>,
}

pub(crate) struct EventBusSubscription {
process_proposal_blocks: EventReceiver<Arc<SequencerBlock>>,
finalized_blocks: EventReceiver<Arc<FinalizeBlock>>,
}

impl EventBusSubscription {
pub(crate) fn process_proposal_blocks(&mut self) -> EventReceiver<Arc<SequencerBlock>> {
self.process_proposal_blocks.clone()
}

pub(crate) fn finalized_blocks(&mut self) -> EventReceiver<Arc<FinalizeBlock>> {
self.finalized_blocks.clone()
}
}

/// The Sequencer `EventBus` is used to send and receive events between different components of the
/// sequencer. Components of Sequencer can subscribe to the `EventBus` via the `subscribe` method
/// which returns a [`EventBusSubscription`] objects that contains receivers of various events which
/// are of type [`EventReceiver`].
///
/// The `EventBus` is implemented using [`tokio::sync::watch`] which allows for multiple receivers
/// to receive the event at any given time.
pub(super) struct EventBus {
// Sends a process proposal block event to the subscribers. The event is sent in the form of a
// sequencer block which is created during the process proposal block phase.
process_proposal_block_sender: EventSender<Arc<SequencerBlock>>,
// Sends a finalized block event to the subscribers. The event is sent in the form of the
// finalize block abci request.
finalized_block_sender: EventSender<Arc<FinalizeBlock>>,
}

impl EventBus {
pub(super) fn new() -> Self {
let process_proposal_block_sender = EventSender::new();
let finalized_block_sender = EventSender::new();

Self {
process_proposal_block_sender,
finalized_block_sender,
}
}

// Returns a `EventBusSubscription` object that contains receivers of various events that can
// be subscribed to.
pub(crate) fn subscribe(&self) -> EventBusSubscription {
EventBusSubscription {
process_proposal_blocks: self.process_proposal_block_sender.subscribe(),
finalized_blocks: self.finalized_block_sender.subscribe(),
}
}

// Sends a process proposal block event to the subscribers.
pub(super) fn send_process_proposal_block(&self, sequencer_block: Arc<SequencerBlock>) {
self.process_proposal_block_sender.send(sequencer_block);
}

// Sends a finalized block event to the subscribers.
pub(super) fn send_finalized_block(&self, sequencer_block_commit: Arc<FinalizeBlock>) {
self.finalized_block_sender.send(sequencer_block_commit);
}
}
Loading