Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion op-node/rollup/derive/channel_bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (ib *ChannelBank) IngestData(data []byte) error {
}

ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.IngestData(f.FrameNumber, f.IsLast, f.Data); err != nil {
if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil {
ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
if done {
return nil
Expand Down
44 changes: 23 additions & 21 deletions op-node/rollup/derive/channel_bank_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package derive

import (
"bytes"
"fmt"
"math/rand"
"strconv"
"strings"
Expand Down Expand Up @@ -106,32 +108,28 @@ func (tf testFrame) Content() []byte {
return []byte(strings.TrimSuffix(parts[3], "!"))
}

func (tf testFrame) Encode() []byte {
chID := tf.ChannelID()
var out []byte
out = append(out, chID.Data[:]...)
out = append(out, makeUVarint(chID.Time)...)
out = append(out, makeUVarint(tf.FrameNumber())...)
content := tf.Content()
out = append(out, makeUVarint(uint64(len(content)))...)
out = append(out, content...)
if tf.IsLast() {
out = append(out, 1)
} else {
out = append(out, 0)
func (tf testFrame) ToFrame() Frame {
return Frame{
ID: tf.ChannelID(),
FrameNumber: uint16(tf.FrameNumber()),
Data: tf.Content(),
IsLast: tf.IsLast(),
}
return out
}

func (bt *bankTestSetup) ingestData(data []byte) {
require.NoError(bt.t, bt.cb.IngestData(data))
}
func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := []byte{DerivationVersion0}
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, fr := range frames {
data = append(data, fr.Encode()...)
f := fr.ToFrame()
if err := f.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
}
}
bt.ingestData(data)
bt.ingestData(data.Bytes())
}
func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) {
require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max))
Expand Down Expand Up @@ -292,10 +290,14 @@ func TestL1ChannelBank(t *testing.T) {

bt.assertOriginTime(101)

badTx := []byte{DerivationVersion0}
badTx = append(badTx, testFrame("a:101:0:helloworld!").Encode()...)
badTx = append(badTx, testutils.RandomData(bt.rng, 30)...) // incomplete frame data
bt.ingestData(badTx)
badTx := new(bytes.Buffer)
badTx.WriteByte(DerivationVersion0)
goodFrame := testFrame("a:101:0:helloworld!").ToFrame()
if err := goodFrame.MarshalBinary(badTx); err != nil {
panic(fmt.Errorf("error in marshalling frame: %w", err))
}
badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data
bt.ingestData(badTx.Bytes())
bt.expectChannel("helloworld") // can still read the frames before the invalid data
bt.repeatStep(2, 0, false, nil)
bt.assertExpectations()
Expand Down
83 changes: 33 additions & 50 deletions op-node/rollup/derive/channel_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,70 +12,56 @@ import (
// but we leave space to grow larger anyway (gas limit allows for more data).
const MaxFrameLen = 1_000_000

var ErrNotEnoughFrameBytes = errors.New("not enough available bytes for the frame")

// Data Format
//
// frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last
//
// channel_id = random ++ timestamp
// random = bytes32
// timestamp = uvarint
// frame_number = uvarint
// frame_data_length = uvarint
// timestamp = uint64
// frame_number = uint16
// frame_data_length = uint32
// frame_data = bytes
// is_last = bool

type Frame struct {
ID ChannelID
FrameNumber uint64
FrameNumber uint16
Data []byte
IsLast bool
}

// MarshalBinary writes the frame to `w`.
// It returns the number of bytes written as well as any
// error encountered while writing.
func (f *Frame) MarshalBinary(w io.Writer) (int, error) {
n, err := w.Write(f.ID.Data[:])
// It returns any errors encountered while writing, but
// generally expects the writer very rarely fail.
func (f *Frame) MarshalBinary(w io.Writer) error {
_, err := w.Write(f.ID.Data[:])
if err != nil {
return n, err
return err
}
l, err := w.Write(makeUVarint(f.ID.Time))
n += l
if err != nil {
return n, err
if err := binary.Write(w, binary.BigEndian, f.ID.Time); err != nil {
return err
}
l, err = w.Write(makeUVarint(f.FrameNumber))
n += l
if err != nil {
return n, err
if err := binary.Write(w, binary.BigEndian, f.FrameNumber); err != nil {
return err
}

l, err = w.Write(makeUVarint(uint64(len(f.Data))))
n += l
if err != nil {
return n, err
if err := binary.Write(w, binary.BigEndian, uint32(len(f.Data))); err != nil {
return err
}
l, err = w.Write(f.Data)
n += l
_, err = w.Write(f.Data)
if err != nil {
return n, err
return err
}
if f.IsLast {
l, err = w.Write([]byte{1})
n += l
if err != nil {
return n, err
if _, err = w.Write([]byte{1}); err != nil {
return err
}
} else {
l, err = w.Write([]byte{0})
n += l
if err != nil {
return n, err
if _, err = w.Write([]byte{0}); err != nil {
return err
}
}
return n, nil
return nil
}

type ByteReader interface {
Expand All @@ -87,25 +73,23 @@ type ByteReader interface {
// If `r` fails a read, it returns the error from the reader
// The reader will be left in a partially read state.
func (f *Frame) UnmarshalBinary(r ByteReader) error {
_, err := io.ReadFull(r, f.ID.Data[:])
if err != nil {
if _, err := io.ReadFull(r, f.ID.Data[:]); err != nil {
return fmt.Errorf("error reading ID: %w", err)
}
f.ID.Time, err = binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("error reading ID.Time: %w", err)
if err := binary.Read(r, binary.BigEndian, &f.ID.Time); err != nil {
return fmt.Errorf("error reading ID time: %w", err)
}
// stop reading and ignore remaining data if we encounter a zeroed ID
if f.ID == (ChannelID{}) {
return io.EOF
}
f.FrameNumber, err = binary.ReadUvarint(r)
if err != nil {

if err := binary.Read(r, binary.BigEndian, &f.FrameNumber); err != nil {
return fmt.Errorf("error reading frame number: %w", err)
}

frameLength, err := binary.ReadUvarint(r)
if err != nil {
var frameLength uint32
if err := binary.Read(r, binary.BigEndian, &frameLength); err != nil {
return fmt.Errorf("error reading frame length: %w", err)
}

Expand All @@ -118,16 +102,15 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error {
return fmt.Errorf("error reading frame data: %w", err)
}

isLastByte, err := r.ReadByte()
if err != nil && err != io.EOF {
if isLastByte, err := r.ReadByte(); err != nil && err != io.EOF {
return fmt.Errorf("error reading final byte: %w", err)
}
if isLastByte == 0 {
} else if isLastByte == 0 {
f.IsLast = false
return err
} else if isLastByte == 1 {
f.IsLast = true
return err
} else {
return errors.New("invalid byte as is_last")
}
return err
}
25 changes: 9 additions & 16 deletions op-node/rollup/derive/channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"compress/zlib"
"crypto/rand"
"encoding/binary"
"errors"
"io"

Expand Down Expand Up @@ -80,12 +79,6 @@ func (co *ChannelOut) AddBlock(block *types.Block) error {
return blockToBatch(block, co.compress)
}

func makeUVarint(x uint64) []byte {
var tmp [binary.MaxVarintLen64]byte
n := binary.PutUvarint(tmp[:], x)
return tmp[:n]
}

// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
Expand Down Expand Up @@ -115,18 +108,18 @@ func (co *ChannelOut) Close() error {
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
f := Frame{
ID: co.id,
FrameNumber: co.frame,
FrameNumber: uint16(co.frame),
}

// Copy data from the local buffer into the frame data buffer
// Don't go past the maxSize even with the max possible uvarints
// +1 for single byte of frame content, +1 for lastFrame bool
// +24 for maximum uvarints
// +32 for the data ID
maxDataSize := maxSize - 32 - 24 - 1 - 1
if maxDataSize >= uint64(co.buf.Len()) {
// Don't go past the maxSize with the fixed frame overhead.
// Fixed overhead: 32 + 8 + 2 + 4 + 1 = 47 bytes.
// Add one extra byte for the version byte (for the entire L1 tx though)
maxDataSize := maxSize - 47 - 1
if maxDataSize > uint64(co.buf.Len()) {
maxDataSize = uint64(co.buf.Len())
// If we are closed & will not spill past the current frame, end it.
// If we are closed & will not spill past the current frame
// mark it is the final frame of the channel.
if co.closed {
f.IsLast = true
}
Expand All @@ -137,7 +130,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
return err
}

if _, err := f.MarshalBinary(w); err != nil {
if err := f.MarshalBinary(w); err != nil {
return err
}

Expand Down
19 changes: 10 additions & 9 deletions specs/derivation.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,21 @@ frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last

channel_id = random ++ timestamp
random = bytes32
timestamp = uvarint
frame_number = uvarint
frame_data_length = uvarint
timestamp = uint64
frame_number = uint16
frame_data_length = uint32
frame_data = bytes
is_last = bool

Where `uint64`, `uint32` and `uint16` are all big-endian unsigned integers.
```

> **TODO** replace `uvarint` by fixed size integers
All data in a frame is fixed-size, except the `frame_data`. The fixed overhead is `32 + 8 + 2 + 4 + 1 = 47 bytes`.
Fixed-size frame metadata avoids a circular dependency with the target total data length,
to simplify packing of frames with varying content length.

where:

- `uvarint` is a variable-length encoding of a 64-bit unsigned integer into between 1 and 9 bytes, [as specified in
SQLite 4][sqlite-uvarint].
- `channel_id` uniquely identifies a channel as the concatenation of a random value and a timestamp
- `random` is a random value such that two channels with different batches should have a different random value
- `timestamp` is the time at which the channel was created (UNIX time in seconds)
Expand All @@ -290,7 +292,7 @@ where:
margin. (A soft constraint is not a consensus rule — nodes will accept such blocks in the canonical chain but will
not attempt to build directly on them.)
- `frame_number` identifies the index of the frame within the channel
- `frame_data_length` is the length of `frame_data` in bytes
- `frame_data_length` is the length of `frame_data` in bytes. It is capped to 1,000,000 bytes.
- `frame_data` is a sequence of bytes belonging to the channel, logically after the bytes from the previous frames
- `is_last` is a single byte with a value of 1 if the frame is the last in the channel, 0 if there are frames in the
channel. Any other value makes the frame invalid (it must be ignored by the rollup node).
Expand All @@ -302,8 +304,7 @@ where:
> - Do we drop the channel or just the first frame? End result is the same but this changes the channel bank size, which
> can influence things down the line!!

[sqlite-uvarint]: https://www.sqlite.org/src4/doc/trunk/www/varint.wiki
[batcher-spec]: batcher.md
[batcher-spec]: batching.md

### Channel Format

Expand Down