Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd | node: Refactor core flags #910

Merged
merged 6 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 48 additions & 40 deletions cmd/flags_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package cmd

import (
"fmt"
"net"
"net/url"
"strconv"
"strings"

"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
Expand All @@ -12,70 +12,78 @@ import (
)

var (
coreRemoteFlag = "core.remote"
coreGRPCFlag = "core.grpc"
coreFlag = "core.ip"
coreRPCFlag = "core.rpc.port"
coreGRPCFlag = "core.grpc.port"
)

// CoreFlags gives a set of hardcoded Core flags.
func CoreFlags() *flag.FlagSet {
flags := &flag.FlagSet{}

flags.String(
coreRemoteFlag,
coreFlag,
"",
"Indicates node to connect to the given remote core node. "+
"Example: <protocol>://<ip>:<port>, tcp://127.0.0.1:26657",
"Indicates node to connect to the given core node. "+
"Example: <ip>, 127.0.0.1. Assumes RPC port 26657 and gRPC port 9009 as default unless otherwise specified.",
)
flags.String(
coreRPCFlag,
"26657",
"Set a custom RPC port for the core node connection. The --core.ip flag must also be provided.",
)
flags.String(
coreGRPCFlag,
"",
"Indicates node to connect to the given celestia-app gRPC endpoint for state-related queries"+
"Example: <ip>:<port>, 127.0.0.1:9090",
"9090",
"Set a custom gRPC port for the core node connection. The --core.ip flag must also be provided.",
)

return flags
}

// ParseCoreFlags parses Core flags from the given cmd and applies values to Env.
func ParseCoreFlags(cmd *cobra.Command, env *Env) error {
coreRemote := cmd.Flag(coreRemoteFlag).Value.String()
if coreRemote != "" {
proto, addr, err := validateAddress(coreRemote)
if err != nil {
return fmt.Errorf("cmd: while parsing '%s': %w", coreRemoteFlag, err)
coreIP := cmd.Flag(coreFlag).Value.String()
if coreIP == "" {
if cmd.Flag(coreGRPCFlag).Changed || cmd.Flag(coreRPCFlag).Changed {
return fmt.Errorf("cannot specify RPC/gRPC ports without specifying an IP address for --core.ip")
}

env.AddOptions(node.WithRemoteCore(proto, addr))
return nil
}

grpcEndpoint := cmd.Flag(coreGRPCFlag).Value.String()
if grpcEndpoint != "" {
// parse ip:port from the endpoint
grpcURL, err := url.Parse(grpcEndpoint)
if err != nil {
return err
}
env.AddOptions(node.WithGRPCEndpoint(grpcURL.Host))
// sanity check given core ip addr and strip leading protocol
ip, err := sanityCheckIP(coreIP)
if err != nil {
return err
}

return nil
}

// validateAddress parses the given address of the remote core node
// and checks if it configures correctly
func validateAddress(address string) (string, string, error) {
u, err := url.Parse(address)
rpc := cmd.Flag(coreRPCFlag).Value.String()
// sanity check rpc endpoint
_, err = strconv.Atoi(rpc)
if err != nil {
return "", "", err
return err
}
env.AddOptions(node.WithRemoteCoreIP(ip), node.WithRemoteCorePort(rpc))

if u.Scheme == "" || u.Host == "" {
return "", "", fmt.Errorf("both protocol and host must present in the address")
grpc := cmd.Flag(coreGRPCFlag).Value.String()
// sanity check gRPC endpoint
_, err = strconv.Atoi(grpc)
if err != nil {
return err
}
env.AddOptions(node.WithGRPCPort(grpc))
return nil
}

if _, _, err := net.SplitHostPort(u.Host); err != nil {
return "", "", fmt.Errorf("incorrect address provided for Remote Core")
// sanityCheckIP trims leading protocol scheme and port from the given
// IP address if present.
func sanityCheckIP(ip string) (string, error) {
original := ip
ip = strings.TrimPrefix(ip, "http://")
ip = strings.TrimPrefix(ip, "https://")
ip = strings.TrimPrefix(ip, "tcp://")
ip = strings.Split(ip, ":")[0]
if ip == "" {
return "", fmt.Errorf("invalid IP addr given: %s", original)
}

return u.Scheme, u.Host, nil
return ip, nil
}
4 changes: 2 additions & 2 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
type Client = client.Client

// NewRemote creates a new Client that communicates with a remote Core endpoint over HTTP.
func NewRemote(protocol, remoteAddr string) (Client, error) {
func NewRemote(ip, port string) (Client, error) {
httpClient := retryhttp.NewClient()
httpClient.RetryMax = 2
// suppress logging
httpClient.Logger = nil

return http.NewWithClient(
fmt.Sprintf("%s://%s", protocol, remoteAddr),
fmt.Sprintf("tcp://%s:%s", ip, port),
httpClient.StandardClient(),
)
}
25 changes: 18 additions & 7 deletions core/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math/rand"
"net"
"net/url"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -51,8 +53,11 @@ func CreateKVStore(retainBlocks int64) *kvstore.Application {
// mock Core Client.
func StartTestClient(ctx context.Context, t *testing.T) (tmservice.Service, Client) {
nd, _, cfg := StartTestKVApp(ctx, t)
protocol, ip := GetEndpoint(cfg)
client, err := NewRemote(protocol, ip)
endpoint, err := GetEndpoint(cfg)
require.NoError(t, err)
ip, port, err := net.SplitHostPort(endpoint)
require.NoError(t, err)
client, err := NewRemote(ip, port)
require.NoError(t, err)
t.Cleanup(func() {
err := client.Stop()
Expand All @@ -63,11 +68,17 @@ func StartTestClient(ctx context.Context, t *testing.T) (tmservice.Service, Clie
return nd, client
}

// GetEndpoint returns the protocol and ip of the remote node.
func GetEndpoint(cfg *config.Config) (string, string) {
endpoint := cfg.RPC.ListenAddress
protocol, ip := endpoint[:3], endpoint[6:]
return protocol, ip
// GetEndpoint returns the remote node's RPC endpoint.
func GetEndpoint(cfg *config.Config) (string, error) {
url, err := url.Parse(cfg.RPC.ListenAddress)
if err != nil {
return "", err
}
host, _, err := net.SplitHostPort(url.Host)
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%s", host, url.Port()), nil
}

func RandValidator(randPower bool, minPower int64) (*tmtypes.Validator, tmtypes.PrivValidator) {
Expand Down
2 changes: 1 addition & 1 deletion node/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func baseComponents(cfg *Config, store Store) fx.Option {
fx.Invoke(invokeWatchdog(store.Path())),
p2p.Components(cfg.P2P),
// state components
statecomponents.Components(cfg.Core.GRPCAddr, cfg.Key),
statecomponents.Components(cfg.Core.IP, cfg.Core.GRPCPort, cfg.Key),
// RPC components
rpc.Components(cfg.RPC),
)
Expand Down
20 changes: 13 additions & 7 deletions node/config_opts.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package node

// WithRemoteCore configures Node to start with remote Core.
func WithRemoteCore(protocol string, address string) Option {
// WithRemoteCoreIP configures Node to connect to the given remote Core IP.
func WithRemoteCoreIP(ip string) Option {
return func(sets *settings) {
sets.cfg.Core.Protocol = protocol
sets.cfg.Core.RemoteAddr = address
sets.cfg.Core.IP = ip
}
}

// WithGRPCEndpoint configures Node to connect to given gRPC address
// WithRemoteCorePort configures Node to connect to the given remote Core port.
func WithRemoteCorePort(port string) Option {
return func(sets *settings) {
sets.cfg.Core.RPCPort = port
}
}

// WithGRPCPort configures Node to connect to given gRPC port
// for state-related queries.
func WithGRPCEndpoint(address string) Option {
func WithGRPCPort(port string) Option {
return func(sets *settings) {
sets.cfg.Core.GRPCAddr = address
sets.cfg.Core.GRPCPort = port
}
}

Expand Down
13 changes: 7 additions & 6 deletions node/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (

// Config combines all configuration fields for managing the relationship with a Core node.
type Config struct {
Protocol string
RemoteAddr string
GRPCAddr string
IP string
RPCPort string
GRPCPort string
}

// DefaultConfig returns default configuration for Core subsystem.
// DefaultConfig returns default configuration for managing the
// node's connection to a Celestia-Core endpoint.
func DefaultConfig() Config {
return Config{}
}
Expand All @@ -33,7 +34,7 @@ func Components(cfg Config) fx.Option {
fxutil.ProvideAs(headercore.NewExchange, new(header.Exchange)),
fx.Invoke(HeaderListener),
fx.Provide(func(lc fx.Lifecycle) (core.Client, error) {
if cfg.RemoteAddr == "" {
if cfg.IP == "" {
return nil, fmt.Errorf("no celestia-core endpoint given")
}
client, err := RemoteClient(cfg)
Expand Down Expand Up @@ -71,5 +72,5 @@ func HeaderListener(

// RemoteClient provides a constructor for core.Client over RPC.
func RemoteClient(cfg Config) (core.Client, error) {
return core.NewRemote(cfg.Protocol, cfg.RemoteAddr)
return core.NewRemote(cfg.IP, cfg.RPCPort)
}
5 changes: 3 additions & 2 deletions node/state/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
// CoreAccessor constructs a new instance of state.Accessor over
// a celestia-core connection.
func CoreAccessor(
endpoint string,
coreIP,
grpcPort string,
) func(fx.Lifecycle, *apptypes.KeyringSigner) (state.Accessor, error) {
return func(lc fx.Lifecycle, signer *apptypes.KeyringSigner) (state.Accessor, error) {
ca := state.NewCoreAccessor(signer, endpoint)
ca := state.NewCoreAccessor(signer, coreIP, grpcPort)
lc.Append(fx.Hook{
OnStart: ca.Start,
OnStop: ca.Stop,
Expand Down
4 changes: 2 additions & 2 deletions node/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ var log = logging.Logger("state-access-constructor")

// Components provides all components necessary to construct the
// state service.
func Components(coreEndpoint string, cfg key.Config) fx.Option {
func Components(coreIP, grpcPort string, cfg key.Config) fx.Option {
return fx.Options(
fx.Provide(Keyring(cfg)),
fx.Provide(CoreAccessor(coreEndpoint)),
fx.Provide(CoreAccessor(coreIP, grpcPort)),
fx.Provide(Service),
)
}
Expand Down
8 changes: 7 additions & 1 deletion node/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"net"
"testing"
"time"

Expand Down Expand Up @@ -31,8 +32,13 @@ func TestNode(t *testing.T, tp Type, opts ...Option) *Node {

store := MockStore(t, DefaultConfig(tp))
_, _, cfg := core.StartTestKVApp(ctx, t)
endpoint, err := core.GetEndpoint(cfg)
require.NoError(t, err)
ip, port, err := net.SplitHostPort(endpoint)
require.NoError(t, err)
opts = append(opts,
WithRemoteCore(core.GetEndpoint(cfg)),
WithRemoteCoreIP(ip),
WithRemoteCorePort(port),
WithNetwork(params.Private),
WithRPCPort("0"),
WithKeyringSigner(TestKeyringSigner(t)),
Expand Down
7 changes: 5 additions & 2 deletions node/tests/swamp/swamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ func NewSwamp(t *testing.T, options ...Option) *Swamp {
// so, we are not creating bridge nodes with each one containing its own core client
// instead we are assigning all created BNs to 1 Core from the swamp
core.StartTestNode(ctx, t, ic.App, ic.CoreCfg)
protocol, ip := core.GetEndpoint(ic.CoreCfg)
remote, err := core.NewRemote(protocol, ip)
endpoint, err := core.GetEndpoint(ic.CoreCfg)
require.NoError(t, err)
ip, port, err := net.SplitHostPort(endpoint)
require.NoError(t, err)
remote, err := core.NewRemote(ip, port)
require.NoError(t, err)

err = remote.Start()
Expand Down
18 changes: 11 additions & 7 deletions service/state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,24 @@ import (
type CoreAccessor struct {
signer *apptypes.KeyringSigner

coreEndpoint string
coreConn *grpc.ClientConn
queryCli banktypes.QueryClient
coreIP string
grpcPort string
coreConn *grpc.ClientConn
queryCli banktypes.QueryClient
}

// NewCoreAccessor dials the given celestia-core endpoint and
// constructs and returns a new CoreAccessor with the active
// connection.
func NewCoreAccessor(
signer *apptypes.KeyringSigner,
endpoint string,
coreIP,
grpcPort string,
) *CoreAccessor {
return &CoreAccessor{
signer: signer,
coreEndpoint: endpoint,
signer: signer,
coreIP: coreIP,
grpcPort: grpcPort,
}
}

Expand All @@ -44,7 +47,8 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {
return fmt.Errorf("core-access: already connected to core endpoint")
}
// dial given celestia-core endpoint
client, err := grpc.DialContext(ctx, ca.coreEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort)
client, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
Expand Down