@@ -2,6 +2,7 @@ package state
2
2
3
3
import (
4
4
"context"
5
+ "crypto/tls"
5
6
"errors"
6
7
"fmt"
7
8
"net"
@@ -21,7 +22,9 @@ import (
21
22
"github.com/tendermint/tendermint/proto/tendermint/crypto"
22
23
"google.golang.org/grpc"
23
24
"google.golang.org/grpc/connectivity"
25
+ "google.golang.org/grpc/credentials"
24
26
"google.golang.org/grpc/credentials/insecure"
27
+ "google.golang.org/grpc/metadata"
25
28
26
29
"github.com/celestiaorg/celestia-app/v3/app"
27
30
"github.com/celestiaorg/celestia-app/v3/app/encoding"
47
50
// to configure parameters.
48
51
type Option func (ca * CoreAccessor )
49
52
53
+ func WithTLSConfig (cfg * tls.Config ) Option {
54
+ return func (ca * CoreAccessor ) {
55
+ ca .tls = cfg
56
+ }
57
+ }
58
+
59
+ func WithXToken (xtoken string ) Option {
60
+ return func (ca * CoreAccessor ) {
61
+ ca .xtoken = xtoken
62
+ }
63
+ }
64
+
50
65
// CoreAccessor implements service over a gRPC connection
51
66
// with a celestia-core node.
52
67
type CoreAccessor struct {
@@ -73,6 +88,9 @@ type CoreAccessor struct {
73
88
port string
74
89
network string
75
90
91
+ tls * tls.Config
92
+ xtoken string
93
+
76
94
// these fields are mutatable and thus need to be protected by a mutex
77
95
lock sync.Mutex
78
96
lastPayForBlob int64
@@ -91,9 +109,7 @@ func NewCoreAccessor(
91
109
keyring keyring.Keyring ,
92
110
keyname string ,
93
111
getter libhead.Head [* header.ExtendedHeader ],
94
- coreIP ,
95
- port string ,
96
- network string ,
112
+ coreIP , port , network string ,
97
113
options ... Option ,
98
114
) (* CoreAccessor , error ) {
99
115
// create verifier
@@ -123,23 +139,10 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {
123
139
}
124
140
ca .ctx , ca .cancel = context .WithCancel (context .Background ())
125
141
126
- // dial given celestia-core endpoint
127
- endpoint := net .JoinHostPort (ca .coreIP , ca .port )
128
- client , err := grpc .NewClient (
129
- endpoint ,
130
- grpc .WithTransportCredentials (insecure .NewCredentials ()),
131
- )
142
+ err := ca .startGRPCClient (ctx )
132
143
if err != nil {
133
- return err
144
+ return fmt . Errorf ( "failed to start grpc client: %w" , err )
134
145
}
135
- // this ensures we can't start the node without core connection
136
- client .Connect ()
137
- if ! client .WaitForStateChange (ctx , connectivity .Ready ) {
138
- // hits the case when context is canceled
139
- return fmt .Errorf ("couldn't connect to core endpoint(%s): %w" , endpoint , ctx .Err ())
140
- }
141
-
142
- ca .coreConn = client
143
146
144
147
// create the staking query client
145
148
ca .stakingCli = stakingtypes .NewQueryClient (ca .coreConn )
@@ -602,6 +605,40 @@ func (ca *CoreAccessor) setupTxClient(ctx context.Context, keyName string) (*use
602
605
)
603
606
}
604
607
608
+ func (ca * CoreAccessor ) startGRPCClient (ctx context.Context ) error {
609
+ // dial given celestia-core endpoint
610
+ endpoint := net .JoinHostPort (ca .coreIP , ca .port )
611
+ // By default, the gRPC client is configured to handle an insecure connection.
612
+ // If the TLS configuration is not empty, it will be applied to the client's options.
613
+ // If the TLS configuration is empty but the X-Token is provided,
614
+ // the X-Token will be applied as an interceptor along with an empty TLS configuration.
615
+ opts := []grpc.DialOption {grpc .WithTransportCredentials (insecure .NewCredentials ())}
616
+ if ca .tls != nil {
617
+ opts = append (opts , grpc .WithTransportCredentials (credentials .NewTLS (ca .tls )))
618
+ }
619
+ if ca .xtoken != "" {
620
+ opts = append (opts , grpc .WithUnaryInterceptor (authInterceptor (ca .xtoken )))
621
+ }
622
+
623
+ client , err := grpc .NewClient (
624
+ endpoint ,
625
+ opts ... ,
626
+ )
627
+ if err != nil {
628
+ return err
629
+ }
630
+ // this ensures we can't start the node without core connection
631
+ client .Connect ()
632
+ if ! client .WaitForStateChange (ctx , connectivity .Ready ) {
633
+ // hits the case when context is canceled
634
+ return fmt .Errorf ("couldn't connect to core endpoint(%s): %w" , endpoint , ctx .Err ())
635
+ }
636
+ ca .coreConn = client
637
+
638
+ log .Infof ("Connection with core endpoint(%s) established" , endpoint )
639
+ return nil
640
+ }
641
+
605
642
func (ca * CoreAccessor ) submitMsg (
606
643
ctx context.Context ,
607
644
msg sdktypes.Msg ,
@@ -658,3 +695,17 @@ func convertToSdkTxResponse(resp *user.TxResponse) *TxResponse {
658
695
Height : resp .Height ,
659
696
}
660
697
}
698
+
699
+ func authInterceptor (xtoken string ) grpc.UnaryClientInterceptor {
700
+ return func (
701
+ ctx context.Context ,
702
+ method string ,
703
+ req , reply interface {},
704
+ cc * grpc.ClientConn ,
705
+ invoker grpc.UnaryInvoker ,
706
+ opts ... grpc.CallOption ,
707
+ ) error {
708
+ ctx = metadata .AppendToOutgoingContext (ctx , "x-token" , xtoken )
709
+ return invoker (ctx , method , req , reply , cc , opts ... )
710
+ }
711
+ }
0 commit comments