@@ -55,6 +55,7 @@ import (
5555 "google.golang.org/grpc/stats"
5656 "storj.io/drpc"
5757 "storj.io/drpc/drpcclient"
58+ "storj.io/drpc/drpcmigrate"
5859)
5960
6061// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
@@ -1436,6 +1437,17 @@ func (rpcCtx *Context) GRPCDialOptions(
14361437 return rpcCtx .grpcDialOptionsInternal (ctx , target , class , transport , onNetworkDial )
14371438}
14381439
1440+ // DRPCDialOptions is same as GRPCDialOptions but for drpc connections.
1441+ func (rpcCtx * Context ) DRPCDialOptions (
1442+ ctx context.Context , target string , class rpcbase.ConnectionClass ,
1443+ ) ([]drpcclient.DialOption , error ) {
1444+ transport := tcpTransport
1445+ if rpcCtx .ContextOptions .AdvertiseAddr == target && rpcCtx .canLoopbackDial () {
1446+ transport = loopbackTransport
1447+ }
1448+ return rpcCtx .drpcDialOptionsInternal (ctx , target , class , transport )
1449+ }
1450+
14391451// grpcDialOptions produces dial options suitable for connecting to the given target and class.
14401452func (rpcCtx * Context ) grpcDialOptionsInternal (
14411453 ctx context.Context ,
@@ -1469,6 +1481,39 @@ func (rpcCtx *Context) grpcDialOptionsInternal(
14691481 return dialOpts , nil
14701482}
14711483
1484+ // drpcDialOptionsInternal is similar to grpcDialOptionsInternal but for
1485+ // drpc connections.
1486+ func (rpcCtx * Context ) drpcDialOptionsInternal (
1487+ ctx context.Context ,
1488+ target string ,
1489+ class rpcbase.ConnectionClass ,
1490+ transport transportType ,
1491+ ) ([]drpcclient.DialOption , error ) {
1492+ drpcDialOpts , err := rpcCtx .drpcDialOptsCommon (ctx , target , class )
1493+ if err != nil {
1494+ return nil , err
1495+ }
1496+
1497+ switch transport {
1498+ case tcpTransport :
1499+ netOpts , err := rpcCtx .drpcDialOptsNetwork (ctx , target , class )
1500+ if err != nil {
1501+ return nil , err
1502+ }
1503+ drpcDialOpts = append (drpcDialOpts , netOpts ... )
1504+ case loopbackTransport :
1505+ localOpts , err := rpcCtx .drpcDialOptsLocal ()
1506+ if err != nil {
1507+ return nil , err
1508+ }
1509+ drpcDialOpts = append (drpcDialOpts , localOpts ... )
1510+ default :
1511+ // This panic in case the type is ever changed to include more values.
1512+ panic (errors .AssertionFailedf ("unhandled: %v" , transport ))
1513+ }
1514+ return drpcDialOpts , nil
1515+ }
1516+
14721517// dialOptsLocal computes options used only for loopback connections.
14731518func (rpcCtx * Context ) dialOptsLocal () ([]grpc.DialOption , error ) {
14741519 // We need to include a TLS overlay even for loopback connections,
@@ -1493,6 +1538,22 @@ func (rpcCtx *Context) dialOptsLocal() ([]grpc.DialOption, error) {
14931538 return dialOpts , err
14941539}
14951540
1541+ // drpcDialOptsLocal is simialar to dialOptsLocal but for drpc connections.
1542+ func (rpcCtx * Context ) drpcDialOptsLocal () ([]drpcclient.DialOption , error ) {
1543+ drpcDialOpts , err := rpcCtx .drpcDialOptsNetworkCredentials ()
1544+ if err != nil {
1545+ return nil , err
1546+ }
1547+
1548+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithContextDialer (
1549+ func (ctx context.Context , target string ) (net.Conn , error ) {
1550+ return rpcCtx .loopbackDRPCDialFn (ctx )
1551+ },
1552+ ))
1553+
1554+ return drpcDialOpts , err
1555+ }
1556+
14961557// GetBreakerForAddr looks up a breaker for the matching (NodeID,Class,Addr).
14971558// If it exists, it is unique.
14981559//
@@ -1570,6 +1631,20 @@ func (rpcCtx *Context) dialOptsNetworkCredentials() ([]grpc.DialOption, error) {
15701631 return dialOpts , nil
15711632}
15721633
1634+ // drpcDialOptsNetworkCredentials is same as dialOptsNetworkCredentials but for drpc connections.
1635+ func (rpcCtx * Context ) drpcDialOptsNetworkCredentials () ([]drpcclient.DialOption , error ) {
1636+ drpcDialOpts := []drpcclient.DialOption {}
1637+ if ! rpcCtx .ContextOptions .Insecure {
1638+ tlsConfig , err := rpcCtx .GetClientTLSConfig ()
1639+ if err != nil {
1640+ return nil , err
1641+ }
1642+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithTLSConfig (tlsConfig ))
1643+ }
1644+
1645+ return drpcDialOpts , nil
1646+ }
1647+
15731648type statsTracker struct {
15741649 m localityMetrics
15751650}
@@ -1717,6 +1792,38 @@ func (rpcCtx *Context) dialOptsNetwork(
17171792 return dialOpts , nil
17181793}
17191794
1795+ // drpcDialOptsNetwork is same as dialOptsNetwork but for drpc connections.
1796+ func (rpcCtx * Context ) drpcDialOptsNetwork (
1797+ ctx context.Context , target string , class rpcbase.ConnectionClass ,
1798+ ) ([]drpcclient.DialOption , error ) {
1799+ // TODO(server): add compression support to drpc.
1800+ // TODO(server): add support for dial timeout.
1801+ // TODO(server): check if onlyOnceDialer is needed for drpc.
1802+
1803+ drpcDialOpts , err := rpcCtx .drpcDialOptsNetworkCredentials ()
1804+ if err != nil {
1805+ return nil , err
1806+ }
1807+
1808+ dialerFunc := func (ctx context.Context , target string ) (net.Conn , error ) {
1809+ return drpcmigrate .DialWithHeader (ctx , "tcp" , target , drpcmigrate .DRPCHeader )
1810+ }
1811+ if rpcCtx .Knobs .InjectedLatencyOracle != nil {
1812+ latency := rpcCtx .Knobs .InjectedLatencyOracle .GetLatency (target )
1813+ log .VEventf (ctx , 1 , "connecting with simulated latency %dms" ,
1814+ latency )
1815+ dialer := artificialLatencyDialer {
1816+ dialerFunc : dialerFunc ,
1817+ latency : latency ,
1818+ enabled : rpcCtx .Knobs .InjectedLatencyEnabled ,
1819+ }
1820+ dialerFunc = dialer .dial
1821+ }
1822+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithContextDialer (dialerFunc ))
1823+
1824+ return drpcDialOpts , nil
1825+ }
1826+
17201827// dialOptsCommon computes options used for both in-memory and
17211828// over-the-network RPC connections.
17221829func (rpcCtx * Context ) dialOptsCommon (
@@ -1766,6 +1873,37 @@ func (rpcCtx *Context) dialOptsCommon(
17661873 return dialOpts , nil
17671874}
17681875
1876+ // drpcDialOptsCommon is same as dialOptsCommon but for drpc connections.
1877+ func (rpcCtx * Context ) drpcDialOptsCommon (
1878+ ctx context.Context , target string , class rpcbase.ConnectionClass ,
1879+ ) ([]drpcclient.DialOption , error ) {
1880+ drpcDialOpts := []drpcclient.DialOption {}
1881+ if ! rpcCtx .TenantID .IsSystem () {
1882+ key , value := newPerRPCTIDMetdata (rpcCtx .TenantID )
1883+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithPerRPCMetadata (map [string ]string {key : value }))
1884+ }
1885+
1886+ unaryInterceptors := rpcCtx .clientUnaryInterceptorsDRPC
1887+ if rpcCtx .Knobs .UnaryClientInterceptorDRPC != nil {
1888+ interceptor := rpcCtx .Knobs .UnaryClientInterceptorDRPC (target , rpcbase .DefaultClass )
1889+ if interceptor != nil {
1890+ unaryInterceptors = append (unaryInterceptors , interceptor )
1891+ }
1892+ }
1893+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithChainUnaryInterceptor (unaryInterceptors ... ))
1894+
1895+ streamInterceptors := rpcCtx .clientStreamInterceptorsDRPC
1896+ if rpcCtx .Knobs .StreamClientInterceptorDRPC != nil {
1897+ interceptor := rpcCtx .Knobs .StreamClientInterceptorDRPC (target , rpcbase .DefaultClass )
1898+ if interceptor != nil {
1899+ streamInterceptors = append (streamInterceptors , interceptor )
1900+ }
1901+ }
1902+ drpcDialOpts = append (drpcDialOpts , drpcclient .WithChainStreamInterceptor (streamInterceptors ... ))
1903+
1904+ return drpcDialOpts , nil
1905+ }
1906+
17691907// ClientInterceptors returns the client interceptors that the Context uses on
17701908// RPC calls. They are exposed so that RPC calls that bypass the Context (i.e.
17711909// the ones done locally through the internalClientAdapater) can use the same
@@ -2069,6 +2207,32 @@ func (rpcCtx *Context) grpcDialRaw(
20692207 return grpc .DialContext (ctx , target , dialOpts ... )
20702208}
20712209
2210+ // drpcDialRaw is similar to grpcDialRaw but for drpc connections.
2211+ func (rpcCtx * Context ) drpcDialRaw (
2212+ ctx context.Context ,
2213+ target string ,
2214+ class rpcbase.ConnectionClass ,
2215+ additionalOpts ... drpcclient.DialOption ,
2216+ ) (* drpcclient.ClientConn , error ) {
2217+ transport := tcpTransport
2218+ if rpcCtx .ContextOptions .AdvertiseAddr == target && rpcCtx .canLoopbackDial () {
2219+ transport = loopbackTransport
2220+ }
2221+ drpcDialOpts , err := rpcCtx .drpcDialOptionsInternal (ctx , target , class , transport )
2222+ if err != nil {
2223+ return nil , err
2224+ }
2225+
2226+ drpcDialOpts = append (drpcDialOpts , additionalOpts ... )
2227+
2228+ drpcConn , err := drpcclient .DialContext (ctx , target , drpcDialOpts ... )
2229+ if err != nil {
2230+ return nil , err
2231+ }
2232+
2233+ return drpcclient .NewClientConnWithOptions (ctx , drpcConn , drpcDialOpts ... )
2234+ }
2235+
20722236// GRPCUnvalidatedDial uses GRPCDialNode and disables validation of the
20732237// node ID between client and server. This function should only be
20742238// used with the gossip client and CLI commands which can talk to any
0 commit comments