@@ -55,6 +55,7 @@ import (
5555 "google.golang.org/grpc/stats"
5656 "storj.io/drpc"
5757 "storj.io/drpc/drpcclient"
58+ "storj.io/drpc/drpcmigrate"
5859)
5960
6061// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
@@ -1432,6 +1433,17 @@ func (rpcCtx *Context) GRPCDialOptions(
14321433 return rpcCtx .grpcDialOptionsInternal (ctx , target , class , transport , onNetworkDial )
14331434}
14341435
1436+ // DRPCDialOptions is same as GRPCDialOptions but for drpc connections.
1437+ func (rpcCtx * Context ) DRPCDialOptions (
1438+ ctx context.Context , target string , class rpcbase.ConnectionClass ,
1439+ ) ([]drpcclient.DialOption , error ) {
1440+ transport := tcpTransport
1441+ if rpcCtx .ContextOptions .AdvertiseAddr == target && rpcCtx .canLoopbackDial () {
1442+ transport = loopbackTransport
1443+ }
1444+ return rpcCtx .drpcDialOptionsInternal (ctx , target , class , transport )
1445+ }
1446+
14351447// grpcDialOptions produces dial options suitable for connecting to the given target and class.
14361448func (rpcCtx * Context ) grpcDialOptionsInternal (
14371449 ctx context.Context ,
@@ -1465,6 +1477,39 @@ func (rpcCtx *Context) grpcDialOptionsInternal(
14651477 return dialOpts , nil
14661478}
14671479
1480+ // drpcDialOptionsInternal is similar to grpcDialOptionsInternal but for
1481+ // drpc connections.
1482+ func (rpcCtx * Context ) drpcDialOptionsInternal (
1483+ ctx context.Context ,
1484+ target string ,
1485+ class rpcbase.ConnectionClass ,
1486+ transport transportType ,
1487+ ) ([]drpcclient.DialOption , error ) {
1488+ drpcDialOpts , err := rpcCtx .drpcDialOptsCommon (ctx , target , class )
1489+ if err != nil {
1490+ return nil , err
1491+ }
1492+
1493+ switch transport {
1494+ case tcpTransport :
1495+ netOpts , err := rpcCtx .drpcDialOptsNetwork (ctx , target , class )
1496+ if err != nil {
1497+ return nil , err
1498+ }
1499+ drpcDialOpts = append (drpcDialOpts , netOpts ... )
1500+ case loopbackTransport :
1501+ localOpts , err := rpcCtx .drpcDialOptsLocal ()
1502+ if err != nil {
1503+ return nil , err
1504+ }
1505+ drpcDialOpts = append (drpcDialOpts , localOpts ... )
1506+ default :
1507+ // This panic in case the type is ever changed to include more values.
1508+ panic (errors .AssertionFailedf ("unhandled: %v" , transport ))
1509+ }
1510+ return drpcDialOpts , nil
1511+ }
1512+
14681513// dialOptsLocal computes options used only for loopback connections.
14691514func (rpcCtx * Context ) dialOptsLocal () ([]grpc.DialOption , error ) {
14701515 // We need to include a TLS overlay even for loopback connections,
@@ -1489,6 +1534,22 @@ func (rpcCtx *Context) dialOptsLocal() ([]grpc.DialOption, error) {
14891534 return dialOpts , err
14901535}
14911536
1537+ // drpcDialOptsLocal is simialar to dialOptsLocal but for drpc connections.
1538+ func (rpcCtx * Context ) drpcDialOptsLocal () ([]drpcclient.DialOption , error ) {
1539+ drpcDialOpts , err := rpcCtx .drpcDialOptsNetworkCredentials ()
1540+ if err != nil {
1541+ return nil , err
1542+ }
1543+
1544+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithContextDialer (
1545+ func (ctx context.Context , target string ) (net.Conn , error ) {
1546+ return rpcCtx .loopbackDRPCDialFn (ctx )
1547+ },
1548+ ))
1549+
1550+ return drpcDialOpts , err
1551+ }
1552+
14921553// GetBreakerForAddr looks up a breaker for the matching (NodeID,Class,Addr).
14931554// If it exists, it is unique.
14941555//
@@ -1566,6 +1627,20 @@ func (rpcCtx *Context) dialOptsNetworkCredentials() ([]grpc.DialOption, error) {
15661627 return dialOpts , nil
15671628}
15681629
1630+ // drpcDialOptsNetworkCredentials is same as dialOptsNetworkCredentials but for drpc connections.
1631+ func (rpcCtx * Context ) drpcDialOptsNetworkCredentials () ([]drpcclient.DialOption , error ) {
1632+ drpcDialOpts := []drpcclient.DialOption {}
1633+ if ! rpcCtx .ContextOptions .Insecure {
1634+ tlsConfig , err := rpcCtx .GetClientTLSConfig ()
1635+ if err != nil {
1636+ return nil , err
1637+ }
1638+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithTLSConfig (tlsConfig ))
1639+ }
1640+
1641+ return drpcDialOpts , nil
1642+ }
1643+
15691644type statsTracker struct {
15701645 m localityMetrics
15711646}
@@ -1713,6 +1788,38 @@ func (rpcCtx *Context) dialOptsNetwork(
17131788 return dialOpts , nil
17141789}
17151790
1791+ // drpcDialOptsNetwork is same as dialOptsNetwork but for drpc connections.
1792+ func (rpcCtx * Context ) drpcDialOptsNetwork (
1793+ ctx context.Context , target string , class rpcbase.ConnectionClass ,
1794+ ) ([]drpcclient.DialOption , error ) {
1795+ // TODO(server): add compression support to drpc.
1796+ // TODO(server): add support for dial timeout.
1797+ // TODO(server): check if onlyOnceDialer is needed for drpc.
1798+
1799+ drpcDialOpts , err := rpcCtx .drpcDialOptsNetworkCredentials ()
1800+ if err != nil {
1801+ return nil , err
1802+ }
1803+
1804+ dialerFunc := func (ctx context.Context , target string ) (net.Conn , error ) {
1805+ return drpcmigrate .DialWithHeader (ctx , "tcp" , target , drpcmigrate .DRPCHeader )
1806+ }
1807+ if rpcCtx .Knobs .InjectedLatencyOracle != nil {
1808+ latency := rpcCtx .Knobs .InjectedLatencyOracle .GetLatency (target )
1809+ log .VEventf (ctx , 1 , "connecting with simulated latency %dms" ,
1810+ latency )
1811+ dialer := artificialLatencyDialer {
1812+ dialerFunc : dialerFunc ,
1813+ latency : latency ,
1814+ enabled : rpcCtx .Knobs .InjectedLatencyEnabled ,
1815+ }
1816+ dialerFunc = dialer .dial
1817+ }
1818+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithContextDialer (dialerFunc ))
1819+
1820+ return drpcDialOpts , nil
1821+ }
1822+
17161823// dialOptsCommon computes options used for both in-memory and
17171824// over-the-network RPC connections.
17181825func (rpcCtx * Context ) dialOptsCommon (
@@ -1762,6 +1869,37 @@ func (rpcCtx *Context) dialOptsCommon(
17621869 return dialOpts , nil
17631870}
17641871
1872+ // drpcDialOptsCommon is same as dialOptsCommon but for drpc connections.
1873+ func (rpcCtx * Context ) drpcDialOptsCommon (
1874+ ctx context.Context , target string , class rpcbase.ConnectionClass ,
1875+ ) ([]drpcclient.DialOption , error ) {
1876+ drpcDialOpts := []drpcclient.DialOption {}
1877+ if ! rpcCtx .TenantID .IsSystem () {
1878+ key , value := newPerRPCTIDMetdata (rpcCtx .TenantID )
1879+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithPerRPCMetadata (map [string ]string {key : value }))
1880+ }
1881+
1882+ unaryInterceptors := rpcCtx .clientUnaryInterceptorsDRPC
1883+ if rpcCtx .Knobs .UnaryClientInterceptorDRPC != nil {
1884+ interceptor := rpcCtx .Knobs .UnaryClientInterceptorDRPC (target , rpcbase .DefaultClass )
1885+ if interceptor != nil {
1886+ unaryInterceptors = append (unaryInterceptors , interceptor )
1887+ }
1888+ }
1889+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithChainUnaryInterceptor (unaryInterceptors ... ))
1890+
1891+ streamInterceptors := rpcCtx .clientStreamInterceptorsDRPC
1892+ if rpcCtx .Knobs .StreamClientInterceptorDRPC != nil {
1893+ interceptor := rpcCtx .Knobs .StreamClientInterceptorDRPC (target , rpcbase .DefaultClass )
1894+ if interceptor != nil {
1895+ streamInterceptors = append (streamInterceptors , interceptor )
1896+ }
1897+ }
1898+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithChainStreamInterceptor (streamInterceptors ... ))
1899+
1900+ return drpcDialOpts , nil
1901+ }
1902+
17651903// ClientInterceptors returns the client interceptors that the Context uses on
17661904// RPC calls. They are exposed so that RPC calls that bypass the Context (i.e.
17671905// the ones done locally through the internalClientAdapater) can use the same
@@ -2065,6 +2203,32 @@ func (rpcCtx *Context) grpcDialRaw(
20652203 return grpc .DialContext (ctx , target , dialOpts ... )
20662204}
20672205
2206+ // drpcDialRaw is similar to grpcDialRaw but for drpc connections.
2207+ func (rpcCtx * Context ) drpcDialRaw (
2208+ ctx context.Context ,
2209+ target string ,
2210+ class rpcbase.ConnectionClass ,
2211+ additionalOpts ... drpcclient.DialOption ,
2212+ ) (* drpcclient.ClientConn , error ) {
2213+ transport := tcpTransport
2214+ if rpcCtx .ContextOptions .AdvertiseAddr == target && rpcCtx .canLoopbackDial () {
2215+ transport = loopbackTransport
2216+ }
2217+ drpcDialOpts , err := rpcCtx .drpcDialOptionsInternal (ctx , target , class , transport )
2218+ if err != nil {
2219+ return nil , err
2220+ }
2221+
2222+ drpcDialOpts = append (drpcDialOpts , additionalOpts ... )
2223+
2224+ drpcConn , err := drpcclient .DialContext (ctx , target , drpcDialOpts ... )
2225+ if err != nil {
2226+ return nil , err
2227+ }
2228+
2229+ return drpcclient .NewClientConnWithOptions (ctx , drpcConn , drpcDialOpts ... )
2230+ }
2231+
20682232// GRPCUnvalidatedDial uses GRPCDialNode and disables validation of the
20692233// node ID between client and server. This function should only be
20702234// used with the gossip client and CLI commands which can talk to any
0 commit comments