Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions input/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type ChainSync struct {
bulkRangeStart ocommon.Point
bulkRangeEnd ocommon.Point
cursorCache []ocommon.Point
dialAddress string
dialFamily string
}

type ChainSyncStatus struct {
Expand Down Expand Up @@ -152,7 +154,6 @@ func (c *ChainSync) OutputChan() <-chan event.Event {
func (c *ChainSync) setupConnection() error {
// Determine connection parameters
var useNtn bool
var dialFamily, dialAddress string
// Lookup network by name, if provided
if c.network != "" {
network := ouroboros.NetworkByName(c.network)
Expand All @@ -162,8 +163,8 @@ func (c *ChainSync) setupConnection() error {
c.networkMagic = network.NetworkMagic
// If network has well-known public root address/port, use those as our dial default
if network.PublicRootAddress != "" && network.PublicRootPort > 0 {
dialFamily = "tcp"
dialAddress = fmt.Sprintf(
c.dialFamily = "tcp"
c.dialAddress = fmt.Sprintf(
"%s:%d",
network.PublicRootAddress,
network.PublicRootPort,
Expand All @@ -173,18 +174,18 @@ func (c *ChainSync) setupConnection() error {
}
// Use user-provided address or socket path, if provided
if c.address != "" {
dialFamily = "tcp"
dialAddress = c.address
c.dialFamily = "tcp"
c.dialAddress = c.address
if c.ntcTcp {
useNtn = false
} else {
useNtn = true
}
} else if c.socketPath != "" {
dialFamily = "unix"
dialAddress = c.socketPath
c.dialFamily = "unix"
c.dialAddress = c.socketPath
useNtn = false
} else if dialFamily == "" || dialAddress == "" {
} else if c.dialFamily == "" || c.dialAddress == "" {
return fmt.Errorf("you must specify a host/port, UNIX socket path, or well-known network name")
}
// Create connection
Expand All @@ -208,31 +209,37 @@ func (c *ChainSync) setupConnection() error {
if err != nil {
return err
}
if err := c.oConn.Dial(dialFamily, dialAddress); err != nil {
if err := c.oConn.Dial(c.dialFamily, c.dialAddress); err != nil {
return err
}
if c.logger != nil {
c.logger.Infof("connected to node at %s", dialAddress)
c.logger.Infof("connected to node at %s", c.dialAddress)
}
// Start async error handler
go func() {
err, ok := <-c.oConn.ErrorChan()
if ok {
if c.autoReconnect {
if c.logger != nil {
c.logger.Infof("reconnecting to %s due to error: %s", dialAddress, err)
c.logger.Infof("reconnecting to %s due to error: %s", c.dialAddress, err)
}
// Shutdown current connection
if err := c.oConn.Close(); err != nil {
c.errorChan <- err
return
}
// Set the intersect points from the cursor cache
c.intersectPoints = c.cursorCache[:]
// Restart the connection
if err := c.Start(); err != nil {
c.errorChan <- err
return
for {
// Shutdown current connection
if err := c.oConn.Close(); err != nil {
if c.logger != nil {
c.logger.Warnf("failed to properly close connection: %s", err)
}
}
// Set the intersect points from the cursor cache
c.intersectPoints = c.cursorCache[:]
// Restart the connection
if err := c.Start(); err != nil {
if c.logger != nil {
c.logger.Infof("reconnecting to %s due to error: %s", c.dialAddress, err)
}
continue
}
break
}
} else {
// Pass error through our own error channel
Expand Down