Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gupadhyaya committed Oct 4, 2024
1 parent 9a44b84 commit 5e91f4b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 34 deletions.
45 changes: 19 additions & 26 deletions sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"

Expand Down Expand Up @@ -48,8 +46,8 @@ func NewBatchQueue() *BatchQueue {
// AddBatch adds a new transaction to the queue
func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error {
bq.mu.Lock()
defer bq.mu.Unlock()
bq.queue = append(bq.queue, batch)
bq.mu.Unlock()

// Get the hash and bytes of the batch
h, err := batch.Hash()
Expand All @@ -72,6 +70,8 @@ func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error {

// Next extracts a batch of transactions from the queue
func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) {
bq.mu.Lock()
defer bq.mu.Unlock()
if len(bq.queue) == 0 {
return &sequencing.Batch{Transactions: nil}, nil
}
Expand Down Expand Up @@ -163,8 +163,8 @@ func GetTransactionHash(txBytes []byte) string {
// AddTransaction adds a new transaction to the queue
func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error {
tq.mu.Lock()
defer tq.mu.Unlock()
tq.queue = append(tq.queue, tx)
tq.mu.Unlock()

// Store transaction in BadgerDB
err := db.Update(func(txn *badger.Txn) error {
Expand Down Expand Up @@ -269,17 +269,6 @@ func totalBytes(data [][]byte) int {
return total
}

func getDefaultDBPath() (string, error) {
// Get user's home directory
homeDir, err := os.UserHomeDir()
if err != nil {
return "", err
}

// Cross-platform default DB path
return filepath.Join(homeDir, "centralized-sequencer", "db"), nil
}

// Sequencer implements go-sequencing interface using celestia backend
type Sequencer struct {
dalc *da.DAClient
Expand Down Expand Up @@ -314,16 +303,14 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t
return nil, err
}

// Check db path
// Initialize BadgerDB
var opts badger.Options
if dbPath == "" {
dbPath, err = getDefaultDBPath()
if err != nil {
return nil, fmt.Errorf("failed to get default DB path: %w", err)
}
opts = badger.DefaultOptions("").WithInMemory(true)
} else {
opts = badger.DefaultOptions(dbPath)
}

// Initialize BadgerDB
opts := badger.DefaultOptions(dbPath)
opts = opts.WithLogger(nil)
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to open BadgerDB: %w", err)
Expand All @@ -337,7 +324,7 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t
tq: NewTransactionQueue(),
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
db: db, // BadgerDB instance
db: db,
}

// Load last batch hash from DB to recover from crash
Expand Down Expand Up @@ -369,7 +356,10 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t
// Close safely closes the BadgerDB instance if it is open
func (c *Sequencer) Close() error {
if c.db != nil {
return c.db.Close()
err := c.db.Close()
if err != nil {
return fmt.Errorf("failed to close BadgerDB: %w", err)
}
}
return nil
}
Expand Down Expand Up @@ -398,6 +388,9 @@ func (c *Sequencer) LoadLastBatchHashFromDB() error {
c.lastBatchHash = nil
return nil
}
if err != nil {
return err
}
// Set lastBatchHash in memory from BadgerDB
return item.Value(func(val []byte) error {
hash = val
Expand Down Expand Up @@ -591,7 +584,7 @@ func (c *Sequencer) SubmitRollupTransaction(ctx context.Context, req sequencing.
}
err := c.tq.AddTransaction(req.Tx, c.db)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to add transaction: %w", err)
}
return &sequencing.SubmitRollupTransactionResponse{}, nil
}
Expand Down
26 changes: 18 additions & 8 deletions sequencing/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv
}

func TestNewSequencer(t *testing.T) {
// Mock DA client
// mockDAClient := new(da.DAClient)

// Create a new sequencer with mock DA client
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second, "")
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 1*time.Second, "")
require.NoError(t, err)
defer func() {
err := seq.Close()
Expand All @@ -70,13 +67,12 @@ func TestNewSequencer(t *testing.T) {

func TestSequencer_SubmitRollupTransaction(t *testing.T) {
// Initialize a new sequencer
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 10*time.Second, "")
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 1*time.Second, "")
require.NoError(t, err)
defer func() {
err := seq.Close()
require.NoError(t, err)
}()

// Test with initial rollup ID
rollupId := []byte("rollup1")
tx := []byte("transaction1")
Expand All @@ -96,10 +92,14 @@ func TestSequencer_SubmitRollupTransaction(t *testing.T) {

func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) {
// Initialize a new sequencer
db, err := getDB()
require.NoError(t, err)

seq := &Sequencer{
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
rollupId: []byte("rollup"),
db: db,
}
defer func() {
err := seq.Close()
Expand All @@ -114,12 +114,15 @@ func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) {
}

func TestSequencer_GetNextBatch_LastBatchMismatch(t *testing.T) {
db, err := getDB()
require.NoError(t, err)
// Initialize a new sequencer with a mock batch
seq := &Sequencer{
lastBatchHash: []byte("existingHash"),
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
rollupId: []byte("rollup"),
db: db,
}
defer func() {
err := seq.Close()
Expand All @@ -133,12 +136,16 @@ func TestSequencer_GetNextBatch_LastBatchMismatch(t *testing.T) {
}

func TestSequencer_GetNextBatch_LastBatchNilMismatch(t *testing.T) {
db, err := getDB()
require.NoError(t, err)

// Initialize a new sequencer
seq := &Sequencer{
lastBatchHash: []byte("existingHash"),
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
rollupId: []byte("rollup"),
db: db,
}
defer func() {
err := seq.Close()
Expand All @@ -152,8 +159,8 @@ func TestSequencer_GetNextBatch_LastBatchNilMismatch(t *testing.T) {
}

func getDB() (*badger.DB, error) {
dbPath := "test_db"
opts := badger.DefaultOptions(dbPath)
opts := badger.DefaultOptions("").WithInMemory(true)
opts = opts.WithLogger(nil)
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to open BadgerDB: %w", err)
Expand Down Expand Up @@ -196,10 +203,13 @@ func TestSequencer_GetNextBatch_Success(t *testing.T) {
}

func TestSequencer_VerifyBatch(t *testing.T) {
db, err := getDB()
require.NoError(t, err)
// Initialize a new sequencer with a seen batch
seq := &Sequencer{
seenBatches: make(map[string]struct{}),
rollupId: []byte("rollup"),
db: db,
}
defer func() {
err := seq.Close()
Expand Down

0 comments on commit 5e91f4b

Please sign in to comment.