diff --git a/internal/imports/imports_linux.go b/internal/imports/imports_linux.go deleted file mode 100644 index 82b936e..0000000 --- a/internal/imports/imports_linux.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by github.com/edwarnicke/imports-gen DO NOT EDIT. -package imports - -import ( - _ "context" - _ "crypto/tls" - _ "github.com/antonfisher/nested-logrus-formatter" - _ "github.com/edwarnicke/genericsync" - _ "github.com/edwarnicke/grpcfd" - _ "github.com/kelseyhightower/envconfig" - _ "github.com/networkservicemesh/api/pkg/api/ipam" - _ "github.com/networkservicemesh/api/pkg/api/networkservice" - _ "github.com/networkservicemesh/api/pkg/api/networkservice/payload" - _ "github.com/networkservicemesh/api/pkg/api/registry" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/connectioncontext/ipcontext/ipaddress" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/connectioncontext/ipcontext/routes" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/connectioncontext/ipcontext/unnumbered" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/connectioncontext/mtu" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/loopback" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/memif" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/tag" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/up" - _ "github.com/networkservicemesh/sdk-vpp/pkg/networkservice/vrf" - _ "github.com/networkservicemesh/sdk/pkg/ipam/strictvl3ipam" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientinfo" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/recvfd" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/onidle" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/retry" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/upstreamrefresh" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext/vl3dns" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/ipcontext/vl3" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/mtu/vl3mtu" - _ "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" - _ "github.com/networkservicemesh/sdk/pkg/registry/chains/client" - _ "github.com/networkservicemesh/sdk/pkg/registry/common/authorize" - _ "github.com/networkservicemesh/sdk/pkg/registry/common/clientinfo" - _ "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd" - _ "github.com/networkservicemesh/sdk/pkg/tools/debug" - _ "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" - _ "github.com/networkservicemesh/sdk/pkg/tools/log" - _ "github.com/networkservicemesh/sdk/pkg/tools/log/logruslogger" - _ "github.com/networkservicemesh/sdk/pkg/tools/opentelemetry" - _ "github.com/networkservicemesh/sdk/pkg/tools/spiffejwt" - _ "github.com/networkservicemesh/sdk/pkg/tools/token" - _ "github.com/networkservicemesh/sdk/pkg/tools/tracing" - _ "github.com/networkservicemesh/vpphelper" - _ "github.com/pkg/errors" - _ "github.com/sirupsen/logrus" - _ "github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig" - _ "github.com/spiffe/go-spiffe/v2/svid/x509svid" - _ "github.com/spiffe/go-spiffe/v2/workloadapi" - _ "google.golang.org/grpc" - _ "google.golang.org/grpc/credentials" - _ "google.golang.org/protobuf/proto" - _ "net" - _ "net/url" - _ "os" - _ "os/signal" - _ "path/filepath" - _ "syscall" - _ "time" -) diff --git a/main.go b/main.go index 7a524bf..88d36b2 100644 --- a/main.go +++ b/main.go @@ -53,6 +53,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/ipcontext/vl3" "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/mtu/vl3mtu" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" registryclientinfo "github.com/networkservicemesh/sdk/pkg/registry/common/clientinfo" registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd" @@ -112,7 +113,7 @@ type Config struct { UnregisterItself bool `default:"true" desc:"if true then NSE unregister itself when it completes working" split_words:"true"` OpenTelemetryEndpoint string `default:"otel-collector.observability.svc.cluster.local:4317" desc:"OpenTelemetry Collector Endpoint"` MetricsExportInterval time.Duration `default:"10s" desc:"interval between mertics exports" split_words:"true"` - PrefixServerURL url.URL `default:"vl3-ipam:5006" desc:"URL to VL3 IPAM server" split_words:"true"` + PrefixServerURL []url.URL `default:"vl3-ipam:5006" desc:"URL to VL3 IPAM server(s)" split_words:"true"` DNSTemplates []string `default:"{{ index .Labels \"podName\" }}.{{ .NetworkService }}." desc:"Represents domain naming templates in go-template format. It is using for generating the domain name for each nse/nsc in the vl3 network" split_words:"true"` LogLevel string `default:"INFO" desc:"Log level" split_words:"true"` dnsServerAddr net.IP @@ -132,70 +133,63 @@ func (c *Config) Process() error { } func startListenPrefixes(ctx context.Context, c *Config, tlsClientConfig *tls.Config, subscriptions []chan *ipam.PrefixResponse) { - var previousResponse *ipam.PrefixResponse - go func() { - var cc *grpc.ClientConn - var err error - for ctx.Err() == nil { - // Close the previous clientConn - if cc != nil { - _ = cc.Close() - } - dialCtx, dialCtxCancel := context.WithTimeout(ctx, time.Millisecond*200) - cc, err = grpc.DialContext(dialCtx, - grpcutils.URLToTarget(&c.PrefixServerURL), - grpc.WithBlock(), - grpc.WithTransportCredentials( - credentials.NewTLS( - tlsClientConfig, + for i := range c.PrefixServerURL { + prefixServerURL := &c.PrefixServerURL[i] + go func(i int, prefixServerURL *url.URL) { + log.FromContext(ctx).Infof("Start listening prefix server %d: %s", i, prefixServerURL) + var previousResponse *ipam.PrefixResponse + var cc *grpc.ClientConn + var err error + for ctx.Err() == nil { + // Close the previous clientConn + if cc != nil { + _ = cc.Close() + } + dialCtx, dialCtxCancel := context.WithTimeout(ctx, time.Millisecond*200) + cc, err = grpc.DialContext(dialCtx, + grpcutils.URLToTarget(prefixServerURL), + grpc.WithBlock(), + grpc.WithTransportCredentials( + credentials.NewTLS( + tlsClientConfig, + ), ), - ), - ) - // It is safe to cancel dial ctx after DialContext if WithBlock() option is used - dialCtxCancel() - if err != nil { - logrus.Error(err.Error()) - continue - } + ) + // It is safe to cancel dial ctx after DialContext if WithBlock() option is used + dialCtxCancel() + if err != nil { + logrus.Error(err.Error()) + continue + } - managePrefixClient, err := ipam.NewIPAMClient(cc).ManagePrefixes(ctx) - if err != nil { - logrus.Error(err.Error()) - continue - } + managePrefixClient, err := ipam.NewIPAMClient(cc).ManagePrefixes(ctx) + if err != nil { + logrus.Error(err.Error()) + continue + } - request := &ipam.PrefixRequest{ - Type: ipam.Type_ALLOCATE, - Prefix: previousResponse.GetPrefix(), - } + request := &ipam.PrefixRequest{ + Type: ipam.Type_ALLOCATE, + Prefix: previousResponse.GetPrefix(), + } - err = managePrefixClient.Send(request) + err = managePrefixClient.Send(request) - if err != nil { - continue - } + if err != nil { + continue + } - for resp, recvErr := managePrefixClient.Recv(); recvErr == nil; resp, recvErr = managePrefixClient.Recv() { - if !proto.Equal(previousResponse, resp) { - previousResponse = resp - for _, sub := range subscriptions { - select { - case sub <- resp: - default: - } + for resp, recvErr := managePrefixClient.Recv(); recvErr == nil; resp, recvErr = managePrefixClient.Recv() { + if !proto.Equal(previousResponse, resp) { + previousResponse = resp + subscriptions[i] <- resp } } } - } - }() + }(i, prefixServerURL) + } } -const ( - serverSubscriptionIdx = iota - clientSubscriptionIdx - totalSubscriptions -) - func main() { // ******************************************************************************** // setup context to catch signals @@ -374,21 +368,22 @@ func main() { listenOn := &(url.URL{Scheme: "unix", Path: filepath.Join(tmpDir, config.ListenOn)}) // ******************************************************************************** - log.FromContext(ctx).Infof("executing phase 6.2: create and register nse with nsm") + log.FromContext(ctx).Infof("executing phase 6: create and register nse with nsm") // ******************************************************************************** - - var subscribedChannels []chan *ipam.PrefixResponse - for i := 0; i < totalSubscriptions; i++ { - subscribedChannels = append(subscribedChannels, make(chan *ipam.PrefixResponse, 1)) + prefixServerCount := len(config.PrefixServerURL) + var subscriptions []chan *ipam.PrefixResponse + for i := 0; i < prefixServerCount; i++ { + subscriptions = append(subscriptions, make(chan *ipam.PrefixResponse, 1)) } var closeSubscribedChannels = func() { - for i := 0; i < totalSubscriptions; i++ { - close(subscribedChannels[i]) + for i := 0; i < prefixServerCount; i++ { + close(subscriptions[i]) } } - startListenPrefixes(ctx, config, tlsClientConfig, subscribedChannels) + startListenPrefixes(ctx, config, tlsClientConfig, subscriptions) + ipams := extractIPAMList(ctx, subscriptions) - server := createVl3Endpoint(ctx, cancel, config, vppConn, tlsServerConfig, source, loopOptions, vrfOptions, subscribedChannels[serverSubscriptionIdx]) + server := createVl3Endpoint(ctx, cancel, config, vppConn, tlsServerConfig, source, loopOptions, vrfOptions, ipams) srvErrCh := grpcutils.ListenAndServe(ctx, listenOn, server) exitOnErr(ctx, cancel, srvErrCh) @@ -448,7 +443,7 @@ func main() { config.dnsServerAddr = conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP config.dnsServerAddrCh <- conn.GetContext().GetIpContext().GetSrcIPNets()[0].IP - vl3Client := createVl3Client(ctx, config, vppConn, tlsClientConfig, source, loopOptions, vrfOptions, subscribedChannels[clientSubscriptionIdx], clientAdditionalFunctionality...) + vl3Client := createVl3Client(ctx, config, vppConn, tlsClientConfig, source, loopOptions, vrfOptions, ipams, clientAdditionalFunctionality...) for _, nse := range nseList { if nse.Name == config.Name { continue @@ -498,8 +493,31 @@ func main() { closeSubscribedChannels() } +func extractIPAMList(ctx context.Context, subscriptions []chan *ipam.PrefixResponse) []*vl3.IPAM { + ipams := make([]*vl3.IPAM, len(subscriptions)) + + for i := range ipams { + ipams[i] = new(vl3.IPAM) + } + + handleChannel := func(prefixCh <-chan *ipam.PrefixResponse, ipam *vl3.IPAM) { + for prefix := range prefixCh { + err := ipam.Reset(prefix.Prefix, prefix.ExcludePrefixes...) + if err != nil { + log.FromContext(ctx).Errorf("failed to reset vl3 IPAM pool: %s", err.Error()) + } + } + } + + for i, ch := range subscriptions { + go handleChannel(ch, ipams[i]) + } + + return ipams +} + func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Connection, tlsClientConfig *tls.Config, source x509svid.Source, - loopOpts []loopback.Option, vrfOpts []vrf.Option, prefixCh <-chan *ipam.PrefixResponse, clientAdditionalFunctionality ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient { + loopOpts []loopback.Option, vrfOpts []vrf.Option, ipams []*vl3.IPAM, clientAdditionalFunctionality ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient { dialOptions := append(tracing.WithTracingDial(), grpcfd.WithChainStreamInterceptor(), grpcfd.WithChainUnaryInterceptor(), @@ -516,16 +534,6 @@ func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Conn ), ) - var clientIpam vl3.IPAM - go func() { - for prefix := range prefixCh { - err := clientIpam.Reset(prefix.Prefix, prefix.ExcludePrefixes...) - if err != nil { - log.FromContext(ctx).Errorf("failed to reset vl3 client IPAM pool: %s", err.Error()) - } - } - }() - c := client.NewClient( ctx, client.WithClientURL(&config.ConnectTo), @@ -533,7 +541,7 @@ func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Conn client.WithAdditionalFunctionality( append( clientAdditionalFunctionality, - vl3.NewClient(ctx, &clientIpam), + newMultiIPAMClient(ctx, ipams), vl3dns.NewClient(config.dnsServerAddr, &config.dnsConfigs), up.NewClient(ctx, vppConn, up.WithLoadSwIfIndex(loopback.Load)), ipaddress.NewClient(vppConn, ipaddress.WithLoadSwIfIndex(loopback.Load)), @@ -556,18 +564,16 @@ func createVl3Client(ctx context.Context, config *Config, vppConn vpphelper.Conn return retry.NewClient(c) } -func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *Config, vppConn vpphelper.Connection, tlsServerConfig *tls.Config, - source x509svid.Source, loopOpts []loopback.Option, vrfOpts []vrf.Option, prefixCh <-chan *ipam.PrefixResponse) *grpc.Server { - var serverIpam vl3.IPAM - go func() { - for prefix := range prefixCh { - err := serverIpam.Reset(prefix.Prefix, prefix.ExcludePrefixes...) - if err != nil { - log.FromContext(ctx).Errorf("failed to reset vl3 server IPAM pool: %s", err.Error()) - } - } - }() +func newMultiIPAMClient(ctx context.Context, ipams []*vl3.IPAM) networkservice.NetworkServiceClient { + var clients []networkservice.NetworkServiceClient + for _, ipam := range ipams { + clients = append(clients, vl3.NewClient(ctx, ipam)) + } + return next.NewNetworkServiceClient(clients...) +} +func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *Config, vppConn vpphelper.Connection, tlsServerConfig *tls.Config, + source x509svid.Source, loopOpts []loopback.Option, vrfOpts []vrf.Option, ipams []*vl3.IPAM) *grpc.Server { vl3Endpoint := endpoint.NewServer(ctx, spiffejwt.TokenGeneratorFunc(source, config.MaxTokenLifetime), endpoint.WithName(config.Name), @@ -580,7 +586,7 @@ func createVl3Endpoint(ctx context.Context, cancel context.CancelFunc, config *C vl3dns.WithConfigs(&config.dnsConfigs), ), vl3mtu.NewServer(), - strictvl3ipam.NewServer(ctx, vl3.NewServer, &serverIpam), + strictvl3ipam.NewServer(ctx, vl3.NewServer, ipams...), up.NewServer(ctx, vppConn, up.WithLoadSwIfIndex(loopback.Load)), ipaddress.NewServer(vppConn, ipaddress.WithLoadSwIfIndex(loopback.Load)), unnumbered.NewServer(vppConn, loopback.Load),