Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions cmd/snek/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,20 @@ func main() {

// Start debug listener
if cfg.Debug.ListenPort > 0 {
logger.Infof("starting debug listener on %s:%d", cfg.Debug.ListenAddress, cfg.Debug.ListenPort)
logger.Infof(
"starting debug listener on %s:%d",
cfg.Debug.ListenAddress,
cfg.Debug.ListenPort,
)
go func() {
err := http.ListenAndServe(fmt.Sprintf("%s:%d", cfg.Debug.ListenAddress, cfg.Debug.ListenPort), nil)
err := http.ListenAndServe(
fmt.Sprintf(
"%s:%d",
cfg.Debug.ListenAddress,
cfg.Debug.ListenPort,
),
nil,
)
if err != nil {
logger.Fatalf("failed to start debug listener: %s", err)
}
Expand Down
6 changes: 5 additions & 1 deletion event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ type Event struct {
Payload interface{} `json:"payload"`
}

func New(eventType string, timestamp time.Time, context, payload interface{}) Event {
func New(
eventType string,
timestamp time.Time,
context, payload interface{},
) Event {
return Event{
Type: eventType,
Timestamp: timestamp,
Expand Down
5 changes: 4 additions & 1 deletion fcm/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func NewMessage(token string, opts ...MessageOption) *Message {

func Send(accessToken string, projectId string, msg *Message) error {

fcmEndpoint := fmt.Sprintf("https://fcm.googleapis.com/v1/projects/%s/messages:send", projectId)
fcmEndpoint := fmt.Sprintf(
"https://fcm.googleapis.com/v1/projects/%s/messages:send",
projectId,
)

// Convert the message to JSON
payload, err := json.Marshal(msg)
Expand Down
63 changes: 54 additions & 9 deletions input/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ func (c *ChainSync) Start() error {
}
if c.bulkMode && !c.intersectTip && c.oConn.BlockFetch() != nil {
var err error
c.bulkRangeStart, c.bulkRangeEnd, err = c.oConn.ChainSync().Client.GetAvailableBlockRange(c.intersectPoints)
c.bulkRangeStart, c.bulkRangeEnd, err = c.oConn.ChainSync().Client.GetAvailableBlockRange(
c.intersectPoints,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -142,7 +144,11 @@ func (c *ChainSync) setupConnection() error {
// If network has well-known public root address/port, use those as our dial default
if network.PublicRootAddress != "" && network.PublicRootPort > 0 {
dialFamily = "tcp"
dialAddress = fmt.Sprintf("%s:%d", network.PublicRootAddress, network.PublicRootPort)
dialAddress = fmt.Sprintf(
"%s:%d",
network.PublicRootAddress,
network.PublicRootPort,
)
useNtn = true
}
}
Expand Down Expand Up @@ -199,13 +205,25 @@ func (c *ChainSync) setupConnection() error {
return nil
}

func (c *ChainSync) handleRollBackward(point ocommon.Point, tip ochainsync.Tip) error {
evt := event.New("chainsync.rollback", time.Now(), nil, NewRollbackEvent(point))
func (c *ChainSync) handleRollBackward(
point ocommon.Point,
tip ochainsync.Tip,
) error {
evt := event.New(
"chainsync.rollback",
time.Now(),
nil,
NewRollbackEvent(point),
)
c.eventChan <- evt
return nil
}

func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip ochainsync.Tip) error {
func (c *ChainSync) handleRollForward(
blockType uint,
blockData interface{},
tip ochainsync.Tip,
) error {
switch v := blockData.(type) {
case ledger.Block:
evt := event.New("chainsync.block", time.Now(), NewBlockContext(v, c.networkMagic), NewBlockEvent(v, c.includeCbor))
Expand All @@ -230,13 +248,34 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
}

func (c *ChainSync) handleBlockFetchBlock(block ledger.Block) error {
blockEvt := event.New("chainsync.block", time.Now(), NewBlockContext(block, c.networkMagic), NewBlockEvent(block, c.includeCbor))
blockEvt := event.New(
"chainsync.block",
time.Now(),
NewBlockContext(block, c.networkMagic),
NewBlockEvent(block, c.includeCbor),
)
c.eventChan <- blockEvt
for t, transaction := range block.Transactions() {
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionContext(block, transaction, uint32(t), c.networkMagic), NewTransactionEvent(block, transaction, c.includeCbor))
txEvt := event.New(
"chainsync.transaction",
time.Now(),
NewTransactionContext(
block,
transaction,
uint32(t),
c.networkMagic,
),
NewTransactionEvent(block, transaction, c.includeCbor),
)
c.eventChan <- txEvt
}
c.updateStatus(block.SlotNumber(), block.BlockNumber(), block.Hash(), c.bulkRangeEnd.Slot, hex.EncodeToString(c.bulkRangeEnd.Hash))
c.updateStatus(
block.SlotNumber(),
block.BlockNumber(),
block.Hash(),
c.bulkRangeEnd.Slot,
hex.EncodeToString(c.bulkRangeEnd.Hash),
)
// Start normal chain-sync if we've reached the last block of our bulk range
if block.SlotNumber() == c.bulkRangeEnd.Slot {
if err := c.oConn.ChainSync().Client.Sync([]ocommon.Point{c.bulkRangeEnd}); err != nil {
Expand All @@ -246,7 +285,13 @@ func (c *ChainSync) handleBlockFetchBlock(block ledger.Block) error {
return nil
}

func (c *ChainSync) updateStatus(slotNumber uint64, blockNumber uint64, blockHash string, tipSlotNumber uint64, tipBlockHash string) {
func (c *ChainSync) updateStatus(
slotNumber uint64,
blockNumber uint64,
blockHash string,
tipSlotNumber uint64,
tipBlockHash string,
) {
// Determine if we've reached the chain tip
if !c.status.TipReached {
// Make sure we're past the end slot in any bulk range, since we don't update the tip during bulk sync
Expand Down
4 changes: 3 additions & 1 deletion input/chainsync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func WithIncludeCbor(includeCbor bool) ChainSyncOptionFunc {

// WithStatusUpdateFunc specifies a callback function for status updates. This is useful for tracking the chain-sync status
// to be able to resume a sync at a later time, especially when any filtering could prevent you from getting all block update events
func WithStatusUpdateFunc(statusUpdateFunc StatusUpdateFunc) ChainSyncOptionFunc {
func WithStatusUpdateFunc(
statusUpdateFunc StatusUpdateFunc,
) ChainSyncOptionFunc {
return func(c *ChainSync) {
c.statusUpdateFunc = statusUpdateFunc
}
Expand Down
6 changes: 5 additions & 1 deletion input/chainsync/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ func NewFromCmdlineOptions() plugin.Plugin {
if len(intersectPointParts) != 2 {
panic("invalid intersect point format")
}
intersectSlot, err := strconv.ParseUint(intersectPointParts[0], 10, 64)
intersectSlot, err := strconv.ParseUint(
intersectPointParts[0],
10,
64,
)
if err != nil {
panic("invalid intersect point format")
}
Expand Down
13 changes: 11 additions & 2 deletions input/chainsync/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ type TransactionEvent struct {
TTL uint64 `json:"ttl,omitempty"`
}

func NewTransactionContext(block ledger.Block, tx ledger.Transaction, index uint32, networkMagic uint32) TransactionContext {
func NewTransactionContext(
block ledger.Block,
tx ledger.Transaction,
index uint32,
networkMagic uint32,
) TransactionContext {
ctx := TransactionContext{
BlockNumber: block.BlockNumber(),
SlotNumber: block.SlotNumber(),
Expand All @@ -48,7 +53,11 @@ func NewTransactionContext(block ledger.Block, tx ledger.Transaction, index uint
return ctx
}

func NewTransactionEvent(block ledger.Block, tx ledger.Transaction, includeCbor bool) TransactionEvent {
func NewTransactionEvent(
block ledger.Block,
tx ledger.Transaction,
includeCbor bool,
) TransactionEvent {
evt := TransactionEvent{
BlockHash: block.Hash(),
Inputs: tx.Inputs(),
Expand Down
20 changes: 15 additions & 5 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Config struct {
Version bool `yaml:"-"`
Logging LoggingConfig `yaml:"logging"`
Debug DebugConfig `yaml:"debug"`
Input string `yaml:"input" envconfig:"INPUT"`
Output string `yaml:"output" envconfig:"OUTPUT"`
Input string `yaml:"input" envconfig:"INPUT"`
Output string `yaml:"output" envconfig:"OUTPUT"`
Plugin map[string]map[string]map[interface{}]interface{} `yaml:"plugins"`
}

Expand All @@ -46,7 +46,7 @@ type LoggingConfig struct {

type DebugConfig struct {
ListenAddress string `yaml:"address" envconfig:"DEBUG_ADDRESS"`
ListenPort uint `yaml:"port" envconfig:"DEBUG_PORT"`
ListenPort uint `yaml:"port" envconfig:"DEBUG_PORT"`
}

// Singleton config instance with default values
Expand Down Expand Up @@ -88,8 +88,18 @@ func (c *Config) ParseCmdlineArgs(programName string, args []string) error {
fs := flag.NewFlagSet(programName, flag.ExitOnError)
fs.StringVar(&c.ConfigFile, "config", "", "path to config file to load")
fs.BoolVar(&c.Version, "version", false, "show version and exit")
fs.StringVar(&c.Input, "input", DefaultInputPlugin, "input plugin to use, 'list' to show available")
fs.StringVar(&c.Output, "output", DefaultOutputPlugin, "output plugin to use, 'list' to show available")
fs.StringVar(
&c.Input,
"input",
DefaultInputPlugin,
"input plugin to use, 'list' to show available",
)
fs.StringVar(
&c.Output,
"output",
DefaultOutputPlugin,
"output plugin to use, 'list' to show available",
)
if err := plugin.PopulateCmdlineOptions(fs); err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func Configure() {
// Change timestamp key name
loggerConfig.EncoderConfig.TimeKey = "timestamp"
// Use a human readable time format
loggerConfig.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout(time.RFC3339)
loggerConfig.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout(
time.RFC3339,
)

// Set level
if cfg.Logging.Level != "" {
Expand Down Expand Up @@ -65,5 +67,7 @@ func GetDesugaredLogger() *zap.Logger {
}

func GetAccessLogger() *zap.Logger {
return globalLogger.Desugar().With(zap.String("type", "access")).WithOptions(zap.WithCaller(false))
return globalLogger.Desugar().
With(zap.String("type", "access")).
WithOptions(zap.WithCaller(false))
}
6 changes: 4 additions & 2 deletions output/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func (n *NotifyOutput) Start() error {
bc := context.(chainsync.BlockContext)
err := beeep.Notify(
n.title,
fmt.Sprintf("New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s",
fmt.Sprintf(
"New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s",
bc.BlockNumber,
bc.SlotNumber,
be.BlockHash,
Expand Down Expand Up @@ -106,7 +107,8 @@ func (n *NotifyOutput) Start() error {
tc := context.(chainsync.TransactionContext)
err := beeep.Notify(
n.title,
fmt.Sprintf("New Transaction!\nBlockNumber: %d, SlotNumber: %d\nInputs: %d, Outputs: %d\nFee: %d\nHash: %s",
fmt.Sprintf(
"New Transaction!\nBlockNumber: %d, SlotNumber: %d\nInputs: %d, Outputs: %d\nFee: %d\nHash: %s",
tc.BlockNumber,
tc.SlotNumber,
len(te.Inputs),
Expand Down
27 changes: 18 additions & 9 deletions output/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func (p *PushOutput) Start() error {
be := payload.(chainsync.BlockEvent)
bc := context.(chainsync.BlockContext)
fmt.Println("Snek")
fmt.Printf("New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s",
fmt.Printf(
"New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s",
bc.BlockNumber,
bc.SlotNumber,
be.BlockHash,
Expand Down Expand Up @@ -124,7 +125,8 @@ func (p *PushOutput) Start() error {

// Create notification message
title := "Snek"
body := fmt.Sprintf("New Transaction!\nBlockNumber: %d, SlotNumber: %d\nInputs: %d, Outputs: %d\nFee: %d\nHash: %s",
body := fmt.Sprintf(
"New Transaction!\nBlockNumber: %d, SlotNumber: %d\nInputs: %d, Outputs: %d\nFee: %d\nHash: %s",
tc.BlockNumber,
tc.SlotNumber,
len(te.Inputs),
Expand Down Expand Up @@ -167,7 +169,8 @@ func (p *PushOutput) processFcmNotifications(title, body string) {

// If no FCM tokens exist, log and exit
if len(p.fcmTokens) == 0 {
logging.GetLogger().Warnln("No FCM tokens found. Skipping notification.")
logging.GetLogger().
Warnln("No FCM tokens found. Skipping notification.")
return
}

Expand All @@ -179,23 +182,27 @@ func (p *PushOutput) processFcmNotifications(title, body string) {
)

if err := fcm.Send(p.accessToken, p.projectID, msg); err != nil {
logging.GetLogger().Errorf("Failed to send message to token %s: %v", fcmToken, err)
logging.GetLogger().
Errorf("Failed to send message to token %s: %v", fcmToken, err)
continue
}
logging.GetLogger().Infof("Message sent successfully to token %s!", fcmToken)
logging.GetLogger().
Infof("Message sent successfully to token %s!", fcmToken)
}
}

func (p *PushOutput) GetAccessToken() error {
data, err := os.ReadFile(p.serviceAccountFilePath)
if err != nil {
logging.GetLogger().Fatalf("Failed to read the credential file: %v", err)
logging.GetLogger().
Fatalf("Failed to read the credential file: %v", err)
return err
}

conf, err := google.JWTConfigFromJSON(data, p.accessTokenUrl)
if err != nil {
logging.GetLogger().Fatalf("Failed to parse the credential file: %v", err)
logging.GetLogger().
Fatalf("Failed to parse the credential file: %v", err)
return err
}

Expand All @@ -214,14 +221,16 @@ func (p *PushOutput) GetAccessToken() error {
func (p *PushOutput) GetProjectId() error {
data, err := os.ReadFile(p.serviceAccountFilePath)
if err != nil {
logging.GetLogger().Fatalf("Failed to read the credential file: %v", err)
logging.GetLogger().
Fatalf("Failed to read the credential file: %v", err)
return err
}

// Get project ID from file
var v map[string]any
if err := json.Unmarshal(data, &v); err != nil {
logging.GetLogger().Fatalf("Failed to parse the credential file: %v", err)
logging.GetLogger().
Fatalf("Failed to parse the credential file: %v", err)
return err
}
p.projectID = v["project_id"].(string)
Expand Down
12 changes: 10 additions & 2 deletions output/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,20 @@ func (w *WebhookOutput) SendWebhook(e *event.Event) error {
// Setup request
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, w.url, bytes.NewReader(data))
req, err := http.NewRequestWithContext(
ctx,
http.MethodPost,
w.url,
bytes.NewReader(data),
)
if err != nil {
return fmt.Errorf("%s", err)
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("User-Agent", fmt.Sprintf("Snek/%s", version.GetVersionString()))
req.Header.Add(
"User-Agent",
fmt.Sprintf("Snek/%s", version.GetVersionString()),
)

// Setup authorization
if w.username != "" && w.password != "" {
Expand Down
Loading