diff --git a/op-node/rollup/derive/channel_bank.go b/op-node/rollup/derive/channel_bank.go index 64ca3fcdd580f..9026b7b4d3043 100644 --- a/op-node/rollup/derive/channel_bank.go +++ b/op-node/rollup/derive/channel_bank.go @@ -136,7 +136,7 @@ func (ib *ChannelBank) IngestData(data []byte) error { } ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) - if err := currentCh.IngestData(f.FrameNumber, f.IsLast, f.Data); err != nil { + if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil { ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) if done { return nil diff --git a/op-node/rollup/derive/channel_bank_test.go b/op-node/rollup/derive/channel_bank_test.go index 0a11b13a9f4e3..3f9836d5295b7 100644 --- a/op-node/rollup/derive/channel_bank_test.go +++ b/op-node/rollup/derive/channel_bank_test.go @@ -1,6 +1,8 @@ package derive import ( + "bytes" + "fmt" "math/rand" "strconv" "strings" @@ -106,32 +108,28 @@ func (tf testFrame) Content() []byte { return []byte(strings.TrimSuffix(parts[3], "!")) } -func (tf testFrame) Encode() []byte { - chID := tf.ChannelID() - var out []byte - out = append(out, chID.Data[:]...) - out = append(out, makeUVarint(chID.Time)...) - out = append(out, makeUVarint(tf.FrameNumber())...) - content := tf.Content() - out = append(out, makeUVarint(uint64(len(content)))...) - out = append(out, content...) - if tf.IsLast() { - out = append(out, 1) - } else { - out = append(out, 0) +func (tf testFrame) ToFrame() Frame { + return Frame{ + ID: tf.ChannelID(), + FrameNumber: uint16(tf.FrameNumber()), + Data: tf.Content(), + IsLast: tf.IsLast(), } - return out } func (bt *bankTestSetup) ingestData(data []byte) { require.NoError(bt.t, bt.cb.IngestData(data)) } func (bt *bankTestSetup) ingestFrames(frames ...testFrame) { - data := []byte{DerivationVersion0} + data := new(bytes.Buffer) + data.WriteByte(DerivationVersion0) for _, fr := range frames { - data = append(data, fr.Encode()...) + f := fr.ToFrame() + if err := f.MarshalBinary(data); err != nil { + panic(fmt.Errorf("error in making frame during test: %w", err)) + } } - bt.ingestData(data) + bt.ingestData(data.Bytes()) } func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) { require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max)) @@ -292,10 +290,14 @@ func TestL1ChannelBank(t *testing.T) { bt.assertOriginTime(101) - badTx := []byte{DerivationVersion0} - badTx = append(badTx, testFrame("a:101:0:helloworld!").Encode()...) - badTx = append(badTx, testutils.RandomData(bt.rng, 30)...) // incomplete frame data - bt.ingestData(badTx) + badTx := new(bytes.Buffer) + badTx.WriteByte(DerivationVersion0) + goodFrame := testFrame("a:101:0:helloworld!").ToFrame() + if err := goodFrame.MarshalBinary(badTx); err != nil { + panic(fmt.Errorf("error in marshalling frame: %w", err)) + } + badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data + bt.ingestData(badTx.Bytes()) bt.expectChannel("helloworld") // can still read the frames before the invalid data bt.repeatStep(2, 0, false, nil) bt.assertExpectations() diff --git a/op-node/rollup/derive/channel_frame.go b/op-node/rollup/derive/channel_frame.go index 5c277005ff77c..06814f4f2490f 100644 --- a/op-node/rollup/derive/channel_frame.go +++ b/op-node/rollup/derive/channel_frame.go @@ -12,70 +12,56 @@ import ( // but we leave space to grow larger anyway (gas limit allows for more data). const MaxFrameLen = 1_000_000 -var ErrNotEnoughFrameBytes = errors.New("not enough available bytes for the frame") - // Data Format // // frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last // // channel_id = random ++ timestamp // random = bytes32 -// timestamp = uvarint -// frame_number = uvarint -// frame_data_length = uvarint +// timestamp = uint64 +// frame_number = uint16 +// frame_data_length = uint32 // frame_data = bytes // is_last = bool type Frame struct { ID ChannelID - FrameNumber uint64 + FrameNumber uint16 Data []byte IsLast bool } // MarshalBinary writes the frame to `w`. -// It returns the number of bytes written as well as any -// error encountered while writing. -func (f *Frame) MarshalBinary(w io.Writer) (int, error) { - n, err := w.Write(f.ID.Data[:]) +// It returns any errors encountered while writing, but +// generally expects the writer very rarely fail. +func (f *Frame) MarshalBinary(w io.Writer) error { + _, err := w.Write(f.ID.Data[:]) if err != nil { - return n, err + return err } - l, err := w.Write(makeUVarint(f.ID.Time)) - n += l - if err != nil { - return n, err + if err := binary.Write(w, binary.BigEndian, f.ID.Time); err != nil { + return err } - l, err = w.Write(makeUVarint(f.FrameNumber)) - n += l - if err != nil { - return n, err + if err := binary.Write(w, binary.BigEndian, f.FrameNumber); err != nil { + return err } - - l, err = w.Write(makeUVarint(uint64(len(f.Data)))) - n += l - if err != nil { - return n, err + if err := binary.Write(w, binary.BigEndian, uint32(len(f.Data))); err != nil { + return err } - l, err = w.Write(f.Data) - n += l + _, err = w.Write(f.Data) if err != nil { - return n, err + return err } if f.IsLast { - l, err = w.Write([]byte{1}) - n += l - if err != nil { - return n, err + if _, err = w.Write([]byte{1}); err != nil { + return err } } else { - l, err = w.Write([]byte{0}) - n += l - if err != nil { - return n, err + if _, err = w.Write([]byte{0}); err != nil { + return err } } - return n, nil + return nil } type ByteReader interface { @@ -87,25 +73,23 @@ type ByteReader interface { // If `r` fails a read, it returns the error from the reader // The reader will be left in a partially read state. func (f *Frame) UnmarshalBinary(r ByteReader) error { - _, err := io.ReadFull(r, f.ID.Data[:]) - if err != nil { + if _, err := io.ReadFull(r, f.ID.Data[:]); err != nil { return fmt.Errorf("error reading ID: %w", err) } - f.ID.Time, err = binary.ReadUvarint(r) - if err != nil { - return fmt.Errorf("error reading ID.Time: %w", err) + if err := binary.Read(r, binary.BigEndian, &f.ID.Time); err != nil { + return fmt.Errorf("error reading ID time: %w", err) } // stop reading and ignore remaining data if we encounter a zeroed ID if f.ID == (ChannelID{}) { return io.EOF } - f.FrameNumber, err = binary.ReadUvarint(r) - if err != nil { + + if err := binary.Read(r, binary.BigEndian, &f.FrameNumber); err != nil { return fmt.Errorf("error reading frame number: %w", err) } - frameLength, err := binary.ReadUvarint(r) - if err != nil { + var frameLength uint32 + if err := binary.Read(r, binary.BigEndian, &frameLength); err != nil { return fmt.Errorf("error reading frame length: %w", err) } @@ -118,16 +102,15 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error { return fmt.Errorf("error reading frame data: %w", err) } - isLastByte, err := r.ReadByte() - if err != nil && err != io.EOF { + if isLastByte, err := r.ReadByte(); err != nil && err != io.EOF { return fmt.Errorf("error reading final byte: %w", err) - } - if isLastByte == 0 { + } else if isLastByte == 0 { f.IsLast = false + return err } else if isLastByte == 1 { f.IsLast = true + return err } else { return errors.New("invalid byte as is_last") } - return err } diff --git a/op-node/rollup/derive/channel_out.go b/op-node/rollup/derive/channel_out.go index 085fb3b22eb1c..0f5e18b832adb 100644 --- a/op-node/rollup/derive/channel_out.go +++ b/op-node/rollup/derive/channel_out.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/zlib" "crypto/rand" - "encoding/binary" "errors" "io" @@ -80,12 +79,6 @@ func (co *ChannelOut) AddBlock(block *types.Block) error { return blockToBatch(block, co.compress) } -func makeUVarint(x uint64) []byte { - var tmp [binary.MaxVarintLen64]byte - n := binary.PutUvarint(tmp[:], x) - return tmp[:n] -} - // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. // Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes // are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage. @@ -115,18 +108,18 @@ func (co *ChannelOut) Close() error { func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { f := Frame{ ID: co.id, - FrameNumber: co.frame, + FrameNumber: uint16(co.frame), } // Copy data from the local buffer into the frame data buffer - // Don't go past the maxSize even with the max possible uvarints - // +1 for single byte of frame content, +1 for lastFrame bool - // +24 for maximum uvarints - // +32 for the data ID - maxDataSize := maxSize - 32 - 24 - 1 - 1 - if maxDataSize >= uint64(co.buf.Len()) { + // Don't go past the maxSize with the fixed frame overhead. + // Fixed overhead: 32 + 8 + 2 + 4 + 1 = 47 bytes. + // Add one extra byte for the version byte (for the entire L1 tx though) + maxDataSize := maxSize - 47 - 1 + if maxDataSize > uint64(co.buf.Len()) { maxDataSize = uint64(co.buf.Len()) - // If we are closed & will not spill past the current frame, end it. + // If we are closed & will not spill past the current frame + // mark it is the final frame of the channel. if co.closed { f.IsLast = true } @@ -137,7 +130,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { return err } - if _, err := f.MarshalBinary(w); err != nil { + if err := f.MarshalBinary(w); err != nil { return err } diff --git a/specs/derivation.md b/specs/derivation.md index db7105e17f059..0fcd297016c62 100644 --- a/specs/derivation.md +++ b/specs/derivation.md @@ -265,19 +265,21 @@ frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last channel_id = random ++ timestamp random = bytes32 -timestamp = uvarint -frame_number = uvarint -frame_data_length = uvarint +timestamp = uint64 +frame_number = uint16 +frame_data_length = uint32 frame_data = bytes is_last = bool + +Where `uint64`, `uint32` and `uint16` are all big-endian unsigned integers. ``` -> **TODO** replace `uvarint` by fixed size integers +All data in a frame is fixed-size, except the `frame_data`. The fixed overhead is `32 + 8 + 2 + 4 + 1 = 47 bytes`. +Fixed-size frame metadata avoids a circular dependency with the target total data length, +to simplify packing of frames with varying content length. where: -- `uvarint` is a variable-length encoding of a 64-bit unsigned integer into between 1 and 9 bytes, [as specified in - SQLite 4][sqlite-uvarint]. - `channel_id` uniquely identifies a channel as the concatenation of a random value and a timestamp - `random` is a random value such that two channels with different batches should have a different random value - `timestamp` is the time at which the channel was created (UNIX time in seconds) @@ -290,7 +292,7 @@ where: margin. (A soft constraint is not a consensus rule — nodes will accept such blocks in the canonical chain but will not attempt to build directly on them.) - `frame_number` identifies the index of the frame within the channel -- `frame_data_length` is the length of `frame_data` in bytes +- `frame_data_length` is the length of `frame_data` in bytes. It is capped to 1,000,000 bytes. - `frame_data` is a sequence of bytes belonging to the channel, logically after the bytes from the previous frames - `is_last` is a single byte with a value of 1 if the frame is the last in the channel, 0 if there are frames in the channel. Any other value makes the frame invalid (it must be ignored by the rollup node). @@ -302,8 +304,7 @@ where: > - Do we drop the channel or just the first frame? End result is the same but this changes the channel bank size, which > can influence things down the line!! -[sqlite-uvarint]: https://www.sqlite.org/src4/doc/trunk/www/varint.wiki -[batcher-spec]: batcher.md +[batcher-spec]: batching.md ### Channel Format