diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 7c51800622..2b5c73ffdb 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -14,10 +14,10 @@ use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; -/// Provides batches for the [BatchQueue] stage. +/// Provides [Batch]es for the [BatchQueue] stage. #[async_trait] pub trait BatchQueueProvider { - /// Returns the next batch in the [ChannelReader] stage, if the stage is not complete. + /// Returns the next [Batch] in the [ChannelReader] stage, if the stage is not complete. /// This function can only be called once while the stage is in progress, and will return /// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is /// complete and the batch has been consumed, an [StageError::Eof] error is returned. @@ -117,8 +117,10 @@ where // Get the epoch let epoch = self.l1_blocks[0]; - // TODO: log that the next batch is being derived. - // TODO: metrice the time it takes to derive the next batch. + self.telemetry.write( + Bytes::from(alloc::format!("Deriving next batch for epoch: {}", epoch.number)), + LogLevel::Info, + ); // Note: epoch origin can now be one block ahead of the L2 Safe Head // This is in the case where we auto generate all batches in an epoch & advance the epoch @@ -149,9 +151,14 @@ where remaining.push(batch.clone()); } BatchValidity::Drop => { - // TODO: Log the drop reason with WARN level. - // batch.log_context(self.log).warn("Dropping batch", "parent", parent.id(), - // "parent_time", parent.info.time); + self.telemetry.write( + Bytes::from(alloc::format!( + "Dropping batch: {:?}, parent: {}", + batch.batch, + parent.block_info + )), + LogLevel::Warning, + ); continue; } BatchValidity::Accept => { @@ -171,7 +178,10 @@ where self.batches = remaining; if let Some(nb) = next_batch { - // TODO: log that the next batch is found. + self.telemetry.write( + Bytes::from(alloc::format!("Next batch found: {:?}", nb.batch)), + LogLevel::Info, + ); return Ok(nb.batch); } @@ -182,8 +192,6 @@ where expiry_epoch < parent.l1_origin.number; let first_of_epoch = epoch.number == parent.l1_origin.number + 1; - // TODO: Log the empty batch generation. - // If the sequencer window did not expire, // there is still room to receive batches for the current epoch. // No need to force-create empty batch(es) towards the next epoch yet. @@ -191,6 +199,15 @@ where return Err(StageError::Eof); } + self.telemetry.write( + Bytes::from(alloc::format!( + "Generating empty batches. Epoch: {}, Parent: {}", + epoch.number, + parent.l1_origin.number + )), + LogLevel::Info, + ); + // The next L1 block is needed to proceed towards the next epoch. if self.l1_blocks.len() < 2 { return Err(StageError::Eof); @@ -202,7 +219,10 @@ where // to preserve that L2 time >= L1 time. If this is the first block of the epoch, always // generate a batch to ensure that we at least have one batch per epoch. if next_timestamp < next_epoch.timestamp || first_of_epoch { - // TODO: log next batch generation. + self.telemetry.write( + Bytes::from(alloc::format!("Generating empty batch for epoch: {}", epoch.number)), + LogLevel::Info, + ); return Ok(Batch::Single(SingleBatch { parent_hash: parent.block_info.hash, epoch_num: epoch.number, @@ -214,9 +234,15 @@ where // At this point we have auto generated every batch for the current epoch // that we can, so we can advance to the next epoch. - // TODO: log that the epoch is advanced. - // bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, - // "next_epoch_time", nextEpoch.Time) + self.telemetry.write( + Bytes::from(alloc::format!( + "Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}", + next_epoch.number, + next_timestamp, + next_epoch.timestamp + )), + LogLevel::Info, + ); self.l1_blocks.remove(0); Err(StageError::Eof) } @@ -224,7 +250,8 @@ where /// Adds a batch to the queue. pub fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> { if self.l1_blocks.is_empty() { - // TODO: log that the batch cannot be added without an origin + self.telemetry + .write(Bytes::from("Cannot add batch without an origin"), LogLevel::Error); panic!("Cannot add batch without an origin"); } let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?; @@ -259,9 +286,14 @@ where // Parent block does not match the next batch. // Means the previously returned batch is invalid. // Drop cached batches and find another batch. + self.telemetry.write( + Bytes::from(alloc::format!( + "Parent block does not match the next batch. Dropping {} cached batches.", + self.next_spans.len() + )), + LogLevel::Warning, + ); self.next_spans.clear(); - // TODO: log that the provided parent block does not match the next batch. - // TODO: metrice the internal batch drop. } // If the epoch is advanced, update the l1 blocks. @@ -273,8 +305,7 @@ where for (i, block) in self.l1_blocks.iter().enumerate() { if parent.l1_origin.number == block.number { self.l1_blocks.drain(0..i); - self.telemetry - .write(Bytes::from("Advancing internal L1 blocks"), LogLevel::Info); + self.telemetry.write(Bytes::from("Adancing epoch"), LogLevel::Info); break; } } @@ -303,7 +334,10 @@ where // reset is called, the origin behind is false. self.l1_blocks.clear(); } - // TODO: log batch queue origin advancement. + self.telemetry.write( + Bytes::from(alloc::format!("Batch queue advanced origin: {:?}", self.origin)), + LogLevel::Info, + ); } // Load more data into the batch queue. @@ -313,7 +347,8 @@ where if !origin_behind { self.add_batch(b, parent).ok(); } else { - // TODO: metrice when the batch is dropped because the origin is behind. + self.telemetry + .write(Bytes::from("[Batch Dropped]: Origin is behind"), LogLevel::Warning); } } Err(StageError::Eof) => out_of_data = true, @@ -445,6 +480,7 @@ mod tests { // TODO(refcell): The batch reader here loops forever. // Maybe the cursor isn't being used? + // UPDATE: the batch data is not valid // #[tokio::test] // async fn test_next_batch_succeeds() { // let mut reader = new_batch_reader();