Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9aee0d6
feat: add blob uploader service
yiweichi May 29, 2025
4bf4196
feat: add blob uploader aws s3
yiweichi May 30, 2025
385d97c
chores
yiweichi Jun 2, 2025
2acce16
feat: add start batch
yiweichi Jun 2, 2025
e8e7fb1
fix: app.go
yiweichi Jun 2, 2025
b9fa8c8
fix: database migrate
yiweichi Jun 2, 2025
a8c913f
fix: database migrate
yiweichi Jun 2, 2025
6c68f3c
debug logs
yiweichi Jun 2, 2025
e84a2b8
debug logs
yiweichi Jun 2, 2025
d623baf
fix: GetFirstUnuploadedAndFailedBatch
yiweichi Jun 2, 2025
6a499fa
rm logs
yiweichi Jun 2, 2025
b6bf805
fix: ci
yiweichi Jun 3, 2025
03410b7
fix: database test
yiweichi Jun 3, 2025
030742c
feat: support multi codec version
yiweichi Jun 3, 2025
b1a8ba7
fix: ci
yiweichi Jun 3, 2025
ef88fef
fix: address comments
yiweichi Jun 3, 2025
1d8a48a
fix: constructBlobCodec
yiweichi Jun 3, 2025
0570525
Update rollup/cmd/blob_uploader/app/app.go
yiweichi Jun 4, 2025
a434b2c
Apply suggestions from code review
yiweichi Jun 4, 2025
4f9a98e
fix: address comments
yiweichi Jun 4, 2025
1ba262a
fix: remove left join
yiweichi Jun 5, 2025
8964e3f
Merge branch 'develop' into feat-add-blob-storage-service
yiweichi Jun 5, 2025
94c8e4a
fix: ci
yiweichi Jun 5, 2025
bf8a149
fix: typo
yiweichi Jun 5, 2025
c4c765f
perfect logs
yiweichi Jun 5, 2025
8ac3181
fix: typo
yiweichi Jun 5, 2025
6d08239
fix: logs
yiweichi Jun 5, 2025
66ebecf
feat: add blob-uploader docker ci
yiweichi Jun 5, 2025
4c5a1bf
fix: address comments
yiweichi Jun 8, 2025
4c988a9
fix: address comments
yiweichi Jun 9, 2025
b6e7c3e
fix: use unique key
yiweichi Jun 9, 2025
19b59b9
chore: auto version bump [bot]
yiweichi Jun 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/testdata/blobdata.json

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions common/types/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,53 @@ func (s TxStatus) String() string {
return fmt.Sprintf("Unknown TxStatus (%d)", int32(s))
}
}

// BlobUploadStatus represents the status of a blob upload
type BlobUploadStatus int

const (
// BlobUploadStatusUndefined indicates an undefined status
BlobUploadStatusUndefined BlobUploadStatus = iota
// BlobUploadStatusPending indicates a pending upload status
BlobUploadStatusPending
// BlobUploadStatusUploaded indicates a successful upload status
BlobUploadStatusUploaded
// BlobUploadStatusFailed indicates a failed upload status
BlobUploadStatusFailed
)

func (s BlobUploadStatus) String() string {
switch s {
case BlobUploadStatusPending:
return "BlobUploadStatusPending"
case BlobUploadStatusUploaded:
return "BlobUploadStatusUploaded"
case BlobUploadStatusFailed:
return "BlobUploadStatusFailed"
default:
return fmt.Sprintf("Unknown BlobUploadStatus (%d)", int32(s))
}
}

// BlobStoragePlatform represents the platform a blob upload to
type BlobStoragePlatform int

const (
// BlobStoragePlatformUndefined indicates an undefined platform
BlobStoragePlatformUndefined BlobStoragePlatform = iota
// BlobStoragePlatformS3 represents AWS S3
BlobStoragePlatformS3
// BlobUploadStatusUploaded represents storage blockchain Arweave
BlobStoragePlatformArweave
)

func (s BlobStoragePlatform) String() string {
switch s {
case BlobStoragePlatformS3:
return "BlobStoragePlatformS3"
case BlobStoragePlatformArweave:
return "BlobStoragePlatformArweave"
default:
return fmt.Sprintf("Unknown BlobStoragePlatform (%d)", int32(s))
}
}
23 changes: 23 additions & 0 deletions common/utils/ethereum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package utils

import (
"crypto/sha256"
"fmt"

"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
)

// CalculateVersionedBlobHash calculate the kzg4844 versioned blob hash from a blob
func CalculateVersionedBlobHash(blob kzg4844.Blob) ([32]byte, error) {
// calculate kzg4844 commitment from blob
commit, err := kzg4844.BlobToCommitment(&blob)
if err != nil {
return [32]byte{}, fmt.Errorf("failed to get blob commitment, err: %w", err)
}

// calculate kzg4844 versioned blob hash from blob commitment
hasher := sha256.New()
vh := kzg4844.CalcBlobHashV1(hasher, &commit)

return vh, nil
}
53 changes: 53 additions & 0 deletions common/utils/ethereum_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package utils

import (
"encoding/hex"
"encoding/json"
"fmt"
"os"
"testing"

"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
)

type BlobData struct {
VersionedBlobHash string `json:"versionedBlobHash"`
BlobData string `json:"blobData"`
}

// TestCalculateVersionedBlobHash tests the CalculateVersionedBlobHash function
func TestCalculateVersionedBlobHash(t *testing.T) {
// Read the test data
data, err := os.ReadFile("../testdata/blobdata.json")
if err != nil {
t.Fatalf("Failed to read blobdata.json: %v", err)
}

var blobData BlobData
if err := json.Unmarshal(data, &blobData); err != nil {
t.Fatalf("Failed to parse blobdata.json: %v", err)
}

fmt.Println(blobData.BlobData)
blobBytes, err := hex.DecodeString(blobData.BlobData)
if err != nil {
t.Fatalf("Failed to decode blob data: %v", err)
}

// Convert []byte to kzg4844.Blob
var blob kzg4844.Blob
copy(blob[:], blobBytes)

// Calculate the hash
calculatedHashBytes, err := CalculateVersionedBlobHash(blob)
if err != nil {
t.Fatalf("Failed to calculate versioned blob hash: %v", err)
}

calculatedHash := hex.EncodeToString(calculatedHashBytes[:])

if calculatedHash != blobData.VersionedBlobHash {
t.Fatalf("Hash mismatch: got %s, want %s", calculatedHash, blobData.VersionedBlobHash)
}

}
22 changes: 22 additions & 0 deletions database/migrate/migrations/00027_ blob_upload.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE blob_upload (
batch_index BIGINT NOT NULL,

platform TEXT NOT NULL,
status SMALLINT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT now(),

PRIMARY KEY (batch_index, platform),
FOREIGN KEY (batch_index) REFERENCES batch(index)
);

COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';

-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE blob_upload;
-- +goose StatementEnd
20 changes: 20 additions & 0 deletions database/migrate/migrations/00028_add_blob_upload_indexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +goose Up
-- +goose StatementBegin

-- Add index on status for faster filtering by status
CREATE INDEX idx_blob_upload_status ON blob_upload(status);

-- Add index on updated_at for faster sorting and filtering by time
CREATE INDEX idx_blob_upload_updated_at ON blob_upload(updated_at);

-- Add index on (batch_index, status) for faster filtering by both fields
CREATE INDEX idx_blob_upload_batch_index_status ON blob_upload(batch_index, status);

-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP INDEX IF EXISTS idx_blob_upload_status;
DROP INDEX IF EXISTS idx_blob_upload_updated_at;
DROP INDEX IF EXISTS idx_blob_upload_batch_index_status;
-- +goose StatementEnd
3 changes: 2 additions & 1 deletion go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,8 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.30.6/go.mod h1:PudwVKUTApfm0nYaPutOXa
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1/go.mod h1:SuZJxklHxLAXgLTc1iFXbEWkXs7QRTQpCLGaKIprQW0=
github.com/aws/aws-sdk-go-v2/service/sts v1.1.1/go.mod h1:Wi0EBZwiz/K44YliU0EKxqTCJGUfYTWXrrBwkq736bM=
github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw=
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -1357,7 +1359,6 @@ github.com/scroll-tech/da-codec v0.1.1-0.20241014152913-2703f226fb0b h1:5H6V6yba
github.com/scroll-tech/da-codec v0.1.1-0.20241014152913-2703f226fb0b/go.mod h1:48uxaqVgpD8ulH8p+nrBtfeLHZ9tX82bVVdPNkW3rPE=
github.com/scroll-tech/da-codec v0.1.3-0.20250227072756-a1482833595f h1:YYbhuUwjowqI4oyXtECRofck7Fyj18e1tcRjuQlZpJE=
github.com/scroll-tech/da-codec v0.1.3-0.20250227072756-a1482833595f/go.mod h1:xECEHZLVzbdUn+tNbRJhRIjLGTOTmnFQuTgUTeVLX58=
github.com/scroll-tech/da-codec v0.1.3-0.20250519114140-bfa7133d4ad1/go.mod h1:yhTS9OVC0xQGhg7DN5iV5KZJvnSIlFWAxDdp+6jxQtY=
github.com/scroll-tech/go-ethereum v1.10.14-0.20240607130425-e2becce6a1a4/go.mod h1:byf/mZ8jLYUCnUePTicjJWn+RvKdxDn7buS6glTnMwQ=
github.com/scroll-tech/go-ethereum v1.10.14-0.20240821074444-b3fa00861e5e/go.mod h1:swB5NSp8pKNDuYsTxfR08bHS6L56i119PBx8fxvV8Cs=
github.com/scroll-tech/go-ethereum v1.10.14-0.20241010064814-3d88e870ae22/go.mod h1:r9FwtxCtybMkTbWYCyBuevT9TW3zHmOTHqD082Uh+Oo=
Expand Down
154 changes: 154 additions & 0 deletions rollup/cmd/blob_uploader/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package app

import (
"context"
"fmt"
"os"
"os/signal"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/urfave/cli/v2"

"scroll-tech/common/database"
"scroll-tech/common/observability"
"scroll-tech/common/utils"
"scroll-tech/common/version"

"scroll-tech/rollup/internal/config"
"scroll-tech/rollup/internal/controller/relayer"
"scroll-tech/rollup/internal/controller/watcher"
rutils "scroll-tech/rollup/internal/utils"
)

var app *cli.App

func init() {
// Set up blob-uploader app info.
app = cli.NewApp()
app.Action = action
app.Name = "blob-uploader"
app.Usage = "The Scroll Blob Uploader"
app.Version = version.Version
app.Flags = append(app.Flags, utils.CommonFlags...)
app.Flags = append(app.Flags, utils.RollupRelayerFlags...)
app.Commands = []*cli.Command{}
app.Before = func(ctx *cli.Context) error {
return utils.LogSetup(ctx)
}
// Register `rollup-relayer-test` app for integration-test.
utils.RegisterSimulation(app, utils.RollupRelayerApp)
}

func action(ctx *cli.Context) error {
// Load config file.
cfgFile := ctx.String(utils.ConfigFileFlag.Name)
cfg, err := config.NewConfig(cfgFile)
if err != nil {
log.Crit("failed to load config file", "config file", cfgFile, "error", err)
}

subCtx, cancel := context.WithCancel(ctx.Context)
// Init db connection
db, err := database.InitDB(cfg.DBConfig)
if err != nil {
log.Crit("failed to init db connection", "err", err)
}
defer func() {
cancel()
if err = database.CloseDB(db); err != nil {
log.Crit("failed to close db connection", "error", err)
}
}()

registry := prometheus.DefaultRegisterer
observability.Server(ctx, db)

// Init l2geth connection
l2client, err := ethclient.Dial(cfg.L2Config.Endpoint)
if err != nil {
log.Crit("failed to connect l2 geth", "config file", cfgFile, "error", err)
}

genesisPath := ctx.String(utils.Genesis.Name)
genesis, err := utils.ReadGenesis(genesisPath)
if err != nil {
log.Crit("failed to read genesis", "genesis file", genesisPath, "error", err)
}

// sanity check config
if cfg.L2Config.RelayerConfig.BatchSubmission == nil {
log.Crit("cfg.L2Config.RelayerConfig.BatchSubmission must not be nil")
}
if cfg.L2Config.RelayerConfig.BatchSubmission.MinBatches < 1 {
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MinBatches must be at least 1")
}
if cfg.L2Config.RelayerConfig.BatchSubmission.MaxBatches < 1 {
log.Crit("cfg.L2Config.RelayerConfig.SenderConfig.BatchSubmission.MaxBatches must be at least 1")
}
if cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch <= 0 {
log.Crit("cfg.L2Config.BatchProposerConfig.MaxChunksPerBatch must be greater than 0")
}
if cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk <= 0 {
log.Crit("cfg.L2Config.ChunkProposerConfig.MaxL2GasPerChunk must be greater than 0")
}

l2relayer, err := relayer.NewLayer2Relayer(ctx.Context, l2client, db, cfg.L2Config.RelayerConfig, genesis.Config, relayer.ServiceTypeL2RollupRelayer, registry)
if err != nil {
log.Crit("failed to create l2 relayer", "config file", cfgFile, "error", err)
}

minCodecVersion := encoding.CodecVersion(ctx.Uint(utils.MinCodecVersionFlag.Name))
if minCodecVersion < encoding.CodecV7 {
log.Crit("min codec version must be greater than or equal to CodecV7", "minCodecVersion", minCodecVersion)
}

chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, minCodecVersion, genesis.Config, db, registry)
batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, minCodecVersion, genesis.Config, db, registry)
bundleProposer := watcher.NewBundleProposer(subCtx, cfg.L2Config.BundleProposerConfig, minCodecVersion, genesis.Config, db, registry)

l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, genesis.Config, db, registry)

// Watcher loop to fetch missing blocks
go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) {
number, loopErr := rutils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations)
if loopErr != nil {
log.Error("failed to get block number", "err", loopErr)
return
}
l2watcher.TryFetchRunningMissingBlocks(number)
})

go utils.Loop(subCtx, time.Duration(cfg.L2Config.ChunkProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, chunkProposer.TryProposeChunk)

go utils.Loop(subCtx, time.Duration(cfg.L2Config.BatchProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, batchProposer.TryProposeBatch)

go utils.Loop(subCtx, 10*time.Second, bundleProposer.TryProposeBundle)

go utils.Loop(subCtx, 2*time.Second, l2relayer.ProcessPendingBatches)

go utils.Loop(subCtx, 15*time.Second, l2relayer.ProcessPendingBundles)

// Finish start all blob-uploader functions.
log.Info("Start blob-uploader successfully", "version", version.Version)

// Catch CTRL-C to ensure a graceful shutdown.
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)

// Wait until the interrupt signal is received from an OS signal.
<-interrupt

return nil
}

// Run rollup relayer cmd instance.
func Run() {
if err := app.Run(os.Args); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
7 changes: 7 additions & 0 deletions rollup/cmd/blob_uploader/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "scroll-tech/rollup/cmd/rollup_relayer/app"

func main() {
app.Run()
}
18 changes: 18 additions & 0 deletions rollup/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ require (

require (
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
github.com/aws/aws-sdk-go-v2/config v1.29.14 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.80.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.25.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect
github.com/aws/smithy-go v1.22.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.20.0 // indirect
github.com/btcsuite/btcd v0.20.1-beta // indirect
Expand Down
Loading
Loading