Skip to content

Commit

Permalink
feat(core): added support of the TLS certificates along with x-token …
Browse files Browse the repository at this point in the history
…authorisation token for the gRPC connection (#3954)
  • Loading branch information
vgonkivs authored Dec 11, 2024
1 parent 941dead commit b5fc555
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 23 deletions.
7 changes: 7 additions & 0 deletions nodebuilder/core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ var MetricsEnabled bool
type Config struct {
IP string
Port string
// TLSEnabled specifies whether the connection is secure or not.
// PLEASE NOTE: it should be set to true in order to handle XTokenPath.
TLSEnabled bool
// XTokenPath specifies the path to the directory with JSON file containing the X-Token for gRPC authentication.
// The JSON file should have a key-value pair where the key is "x-token" and the value is the authentication token.
// If left empty, the client will not include the X-Token in its requests.
XTokenPath string
}

// DefaultConfig returns default configuration for managing the
Expand Down
31 changes: 29 additions & 2 deletions nodebuilder/core/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
)

var (
coreFlag = "core.ip"
coreGRPCFlag = "core.grpc.port"
coreFlag = "core.ip"
coreGRPCFlag = "core.grpc.port"
coreTLS = "core.tls"
coreXTokenPathFlag = "core.xtoken.path" //nolint:gosec
)

// Flags gives a set of hardcoded Core flags.
Expand All @@ -28,6 +30,19 @@ func Flags() *flag.FlagSet {
DefaultPort,
"Set a custom gRPC port for the core node connection. The --core.ip flag must also be provided.",
)
flags.Bool(
coreTLS,
false,
"Specifies whether TLS is enabled or not. Default: false",
)
flags.String(
coreXTokenPathFlag,
"",
"specifies the file path to the JSON file containing the X-Token for gRPC authentication. "+
"The JSON file should have a key-value pair where the key is 'x-token' and the value is the authentication token. "+
"NOTE: the path is parsed only if coreTLS enabled."+
"If left empty, the client will not include the X-Token in its requests.",
)
return flags
}

Expand All @@ -49,6 +64,18 @@ func ParseFlags(
cfg.Port = grpc
}

enabled, err := cmd.Flags().GetBool(coreTLS)
if err != nil {
return err
}

if enabled {
cfg.TLSEnabled = true
if cmd.Flag(coreXTokenPathFlag).Changed {
path := cmd.Flag(coreXTokenPathFlag).Value.String()
cfg.XTokenPath = path
}
}
cfg.IP = coreIP
return cfg.Validate()
}
44 changes: 44 additions & 0 deletions nodebuilder/core/tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package core

import (
"crypto/tls"
"encoding/json"
"errors"
"os"
"path/filepath"

"github.com/celestiaorg/celestia-node/libs/utils"
)

const xtokenFileName = "xtoken.json"

func EmptyTLSConfig() *tls.Config {
return &tls.Config{MinVersion: tls.VersionTLS12}
}

// XToken retrieves the authentication token from a JSON file at the specified path.
func XToken(xtokenPath string) (string, error) {
xtokenPath = filepath.Join(xtokenPath, xtokenFileName)
exist := utils.Exists(xtokenPath)
if !exist {
return "", os.ErrNotExist
}

token, err := os.ReadFile(xtokenPath)
if err != nil {
return "", err
}

auth := struct {
Token string `json:"x-token"`
}{}

err = json.Unmarshal(token, &auth)
if err != nil {
return "", err
}
if auth.Token == "" {
return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath")
}
return auth.Token, nil
}
17 changes: 14 additions & 3 deletions nodebuilder/state/core.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package state

import (
"errors"
"os"

"github.com/cosmos/cosmos-sdk/crypto/keyring"

libfraud "github.com/celestiaorg/go-fraud"
Expand Down Expand Up @@ -30,14 +33,22 @@ func coreAccessor(
*modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader],
error,
) {
ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, corecfg.IP, corecfg.Port,
network.String(), opts...)
if corecfg.TLSEnabled {
tlsCfg := core.EmptyTLSConfig()
xtoken, err := core.XToken(corecfg.XTokenPath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, nil, nil, err
}
opts = append(opts, state.WithTLSConfig(tlsCfg), state.WithXToken(xtoken))
}

ca, err := state.NewCoreAccessor(keyring, string(keyname), sync,
corecfg.IP, corecfg.Port, network.String(), opts...)

sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{
Service: ca,
FraudType: byzantine.BadEncoding,
FraudServ: fraudServ,
}

return ca, ca, sBreaker, err
}
87 changes: 69 additions & 18 deletions state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
Expand All @@ -21,7 +22,9 @@ import (
"github.com/tendermint/tendermint/proto/tendermint/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

"github.com/celestiaorg/celestia-app/v3/app"
"github.com/celestiaorg/celestia-app/v3/app/encoding"
Expand All @@ -47,6 +50,18 @@ var (
// to configure parameters.
type Option func(ca *CoreAccessor)

func WithTLSConfig(cfg *tls.Config) Option {
return func(ca *CoreAccessor) {
ca.tls = cfg
}
}

func WithXToken(xtoken string) Option {
return func(ca *CoreAccessor) {
ca.xtoken = xtoken
}
}

// CoreAccessor implements service over a gRPC connection
// with a celestia-core node.
type CoreAccessor struct {
Expand All @@ -73,6 +88,9 @@ type CoreAccessor struct {
port string
network string

tls *tls.Config
xtoken string

// these fields are mutatable and thus need to be protected by a mutex
lock sync.Mutex
lastPayForBlob int64
Expand All @@ -91,9 +109,7 @@ func NewCoreAccessor(
keyring keyring.Keyring,
keyname string,
getter libhead.Head[*header.ExtendedHeader],
coreIP,
port string,
network string,
coreIP, port, network string,
options ...Option,
) (*CoreAccessor, error) {
// create verifier
Expand Down Expand Up @@ -123,23 +139,10 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {
}
ca.ctx, ca.cancel = context.WithCancel(context.Background())

// dial given celestia-core endpoint
endpoint := net.JoinHostPort(ca.coreIP, ca.port)
client, err := grpc.NewClient(
endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
err := ca.startGRPCClient(ctx)
if err != nil {
return err
return fmt.Errorf("failed to start grpc client: %w", err)
}
// this ensures we can't start the node without core connection
client.Connect()
if !client.WaitForStateChange(ctx, connectivity.Ready) {
// hits the case when context is canceled
return fmt.Errorf("couldn't connect to core endpoint(%s): %w", endpoint, ctx.Err())
}

ca.coreConn = client

// create the staking query client
ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn)
Expand Down Expand Up @@ -602,6 +605,40 @@ func (ca *CoreAccessor) setupTxClient(ctx context.Context, keyName string) (*use
)
}

func (ca *CoreAccessor) startGRPCClient(ctx context.Context) error {
// dial given celestia-core endpoint
endpoint := net.JoinHostPort(ca.coreIP, ca.port)
// By default, the gRPC client is configured to handle an insecure connection.
// If the TLS configuration is not empty, it will be applied to the client's options.
// If the TLS configuration is empty but the X-Token is provided,
// the X-Token will be applied as an interceptor along with an empty TLS configuration.
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
if ca.tls != nil {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(ca.tls)))
}
if ca.xtoken != "" {
opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(ca.xtoken)))
}

client, err := grpc.NewClient(
endpoint,
opts...,
)
if err != nil {
return err
}
// this ensures we can't start the node without core connection
client.Connect()
if !client.WaitForStateChange(ctx, connectivity.Ready) {
// hits the case when context is canceled
return fmt.Errorf("couldn't connect to core endpoint(%s): %w", endpoint, ctx.Err())
}
ca.coreConn = client

log.Infof("Connection with core endpoint(%s) established", endpoint)
return nil
}

func (ca *CoreAccessor) submitMsg(
ctx context.Context,
msg sdktypes.Msg,
Expand Down Expand Up @@ -658,3 +695,17 @@ func convertToSdkTxResponse(resp *user.TxResponse) *TxResponse {
Height: resp.Height,
}
}

func authInterceptor(xtoken string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken)
return invoker(ctx, method, req, reply, cc, opts...)
}
}

0 comments on commit b5fc555

Please sign in to comment.