diff --git a/contrib/completions/bash/openshift b/contrib/completions/bash/openshift index 61811e085b91..ea83d06b799c 100644 --- a/contrib/completions/bash/openshift +++ b/contrib/completions/bash/openshift @@ -19713,6 +19713,8 @@ _openshift_infra_f5-router() local_nonpersistent_flags+=("--f5-https-vserver=") flags+=("--f5-insecure") local_nonpersistent_flags+=("--f5-insecure") + flags+=("--f5-internal-address=") + local_nonpersistent_flags+=("--f5-internal-address=") flags+=("--f5-partition-path=") local_nonpersistent_flags+=("--f5-partition-path=") flags+=("--f5-password=") @@ -19721,6 +19723,8 @@ _openshift_infra_f5-router() local_nonpersistent_flags+=("--f5-private-key=") flags+=("--f5-username=") local_nonpersistent_flags+=("--f5-username=") + flags+=("--f5-vxlan-gateway-cidr=") + local_nonpersistent_flags+=("--f5-vxlan-gateway-cidr=") flags+=("--fields=") local_nonpersistent_flags+=("--fields=") flags+=("--hostname-template=") diff --git a/contrib/completions/zsh/openshift b/contrib/completions/zsh/openshift index 7054b16cfcb6..677adb6a51c2 100644 --- a/contrib/completions/zsh/openshift +++ b/contrib/completions/zsh/openshift @@ -19874,6 +19874,8 @@ _openshift_infra_f5-router() local_nonpersistent_flags+=("--f5-https-vserver=") flags+=("--f5-insecure") local_nonpersistent_flags+=("--f5-insecure") + flags+=("--f5-internal-address=") + local_nonpersistent_flags+=("--f5-internal-address=") flags+=("--f5-partition-path=") local_nonpersistent_flags+=("--f5-partition-path=") flags+=("--f5-password=") @@ -19882,6 +19884,8 @@ _openshift_infra_f5-router() local_nonpersistent_flags+=("--f5-private-key=") flags+=("--f5-username=") local_nonpersistent_flags+=("--f5-username=") + flags+=("--f5-vxlan-gateway-cidr=") + local_nonpersistent_flags+=("--f5-vxlan-gateway-cidr=") flags+=("--fields=") local_nonpersistent_flags+=("--fields=") flags+=("--hostname-template=") diff --git a/docs/man/man1/openshift-infra-f5-router.1 b/docs/man/man1/openshift-infra-f5-router.1 index cb780ed4c3d5..5208441a4a48 100644 --- a/docs/man/man1/openshift-infra-f5-router.1 +++ b/docs/man/man1/openshift-infra-f5-router.1 @@ -71,6 +71,10 @@ You may restrict the set of routes exposed to a single project (with \-\-namespa \fB\-\-f5\-insecure\fP=false Skip strict certificate verification +.PP +\fB\-\-f5\-internal\-address\fP="" + The F5 BIG\-IP internal interface's IP address + .PP \fB\-\-f5\-partition\-path\fP="/Common" The F5 BIG\-IP partition path to use @@ -87,6 +91,10 @@ You may restrict the set of routes exposed to a single project (with \-\-namespa \fB\-\-f5\-username\fP="" The username for F5 BIG\-IP's management utility +.PP +\fB\-\-f5\-vxlan\-gateway\-cidr\fP="" + The F5 BIG\-IP gateway\-ip\-address/cidr\-mask for setting up the VxLAN + .PP \fB\-\-fields\fP="" A field selector to apply to routes to watch diff --git a/pkg/cmd/infra/router/f5.go b/pkg/cmd/infra/router/f5.go index d7cc683d44a0..257272efbec8 100644 --- a/pkg/cmd/infra/router/f5.go +++ b/pkg/cmd/infra/router/f5.go @@ -76,6 +76,17 @@ type F5Router struct { // normally used to create access control boundaries for users // and applications. PartitionPath string + + // VxlanGateway is the ip address assigned to the local tunnel interface + // inside F5 box. This address is the one that the packets generated from F5 + // will carry. The pods will return the packets to this address itself. + // It is important that the gateway be one of the ip addresses of the subnet + // that has been generated for F5. + VxlanGateway string + + // InternalAddress is the ip address of the vtep interface used to connect to + // VxLAN overlay. It is the hostIP address listed in the subnet generated for F5 + InternalAddress string } // Bind binds F5Router arguments to flags @@ -89,6 +100,8 @@ func (o *F5Router) Bind(flag *pflag.FlagSet) { flag.StringVar(&o.PrivateKey, "f5-private-key", util.Env("ROUTER_EXTERNAL_HOST_PRIVKEY", ""), "The path to the F5 BIG-IP SSH private key file") flag.BoolVar(&o.Insecure, "f5-insecure", util.Env("ROUTER_EXTERNAL_HOST_INSECURE", "") == "true", "Skip strict certificate verification") flag.StringVar(&o.PartitionPath, "f5-partition-path", util.Env("ROUTER_EXTERNAL_HOST_PARTITION_PATH", f5plugin.F5DefaultPartitionPath), "The F5 BIG-IP partition path to use") + flag.StringVar(&o.InternalAddress, "f5-internal-address", util.Env("ROUTER_EXTERNAL_HOST_INTERNAL_ADDRESS", ""), "The F5 BIG-IP internal interface's IP address") + flag.StringVar(&o.VxlanGateway, "f5-vxlan-gateway-cidr", util.Env("ROUTER_EXTERNAL_HOST_VXLAN_GW_CIDR", ""), "The F5 BIG-IP gateway-ip-address/cidr-mask for setting up the VxLAN") } // Validate verifies the required F5 flags are present @@ -109,6 +122,11 @@ func (o *F5Router) Validate() error { return errors.New("F5 HTTP and HTTPS vservers cannot both be blank") } + valid := (len(o.VxlanGateway) == 0 && len(o.InternalAddress) == 0) || (len(o.VxlanGateway) != 0 && len(o.InternalAddress) != 0) + if !valid { + return errors.New("For VxLAN setup, both internal-address and gateway-cidr must be specified") + } + return nil } @@ -158,14 +176,16 @@ func (o *F5RouterOptions) Validate() error { // Run launches an F5 route sync process using the provided options. It never exits. func (o *F5RouterOptions) Run() error { cfg := f5plugin.F5PluginConfig{ - Host: o.Host, - Username: o.Username, - Password: o.Password, - HttpVserver: o.HttpVserver, - HttpsVserver: o.HttpsVserver, - PrivateKey: o.PrivateKey, - Insecure: o.Insecure, - PartitionPath: o.PartitionPath, + Host: o.Host, + Username: o.Username, + Password: o.Password, + HttpVserver: o.HttpVserver, + HttpsVserver: o.HttpsVserver, + PrivateKey: o.PrivateKey, + Insecure: o.Insecure, + PartitionPath: o.PartitionPath, + InternalAddress: o.InternalAddress, + VxlanGateway: o.VxlanGateway, } f5Plugin, err := f5plugin.NewF5Plugin(cfg) if err != nil { @@ -181,7 +201,8 @@ func (o *F5RouterOptions) Run() error { plugin := controller.NewUniqueHost(statusPlugin, o.RouteSelectionFunc(), statusPlugin) factory := o.RouterSelection.NewFactory(oc, kc) - controller := factory.Create(plugin) + watchNodes := (len(o.InternalAddress) != 0 && len(o.VxlanGateway) != 0) + controller := factory.Create(plugin, watchNodes) controller.Run() select {} diff --git a/pkg/cmd/infra/router/template.go b/pkg/cmd/infra/router/template.go index a3b0a7883040..a141238ef017 100644 --- a/pkg/cmd/infra/router/template.go +++ b/pkg/cmd/infra/router/template.go @@ -210,7 +210,7 @@ func (o *TemplateRouterOptions) Run() error { plugin := controller.NewUniqueHost(nextPlugin, o.RouteSelectionFunc(), controller.RejectionRecorder(statusPlugin)) factory := o.RouterSelection.NewFactory(oc, kc) - controller := factory.Create(plugin) + controller := factory.Create(plugin, false) controller.Run() proc.StartReaper() diff --git a/pkg/cmd/server/bootstrappolicy/policy.go b/pkg/cmd/server/bootstrappolicy/policy.go index 7a3ddcec7509..d52a2bfef40e 100644 --- a/pkg/cmd/server/bootstrappolicy/policy.go +++ b/pkg/cmd/server/bootstrappolicy/policy.go @@ -531,6 +531,7 @@ func GetBootstrapClusterRoles() []authorizationapi.ClusterRole { Rules: []authorizationapi.PolicyRule{ authorizationapi.NewRule("list", "watch").Groups(kapiGroup).Resources("endpoints").RuleOrDie(), authorizationapi.NewRule("list", "watch").Groups(kapiGroup).Resources("services").RuleOrDie(), + authorizationapi.NewRule("list", "watch").Groups(kapiGroup).Resources("nodes").RuleOrDie(), authorizationapi.NewRule("list", "watch").Groups(routeGroup).Resources("routes").RuleOrDie(), authorizationapi.NewRule("update").Groups(routeGroup).Resources("routes/status").RuleOrDie(), diff --git a/pkg/router/controller/controller.go b/pkg/router/controller/controller.go index 53cc45ea9866..2a125280c946 100644 --- a/pkg/router/controller/controller.go +++ b/pkg/router/controller/controller.go @@ -28,6 +28,7 @@ type RouterController struct { Plugin router.Plugin NextRoute func() (watch.EventType, *routeapi.Route, error) + NextNode func() (watch.EventType, *kapi.Node, error) NextEndpoints func() (watch.EventType, *kapi.Endpoints, error) RoutesListConsumed func() bool @@ -36,6 +37,8 @@ type RouterController struct { endpointsListConsumed bool filteredByNamespace bool + WatchNodes bool + Namespaces NamespaceLister NamespaceSyncInterval time.Duration NamespaceWaitInterval time.Duration @@ -51,6 +54,9 @@ func (c *RouterController) Run() { } go utilwait.Forever(c.HandleRoute, 0) go utilwait.Forever(c.HandleEndpoints, 0) + if c.WatchNodes { + go utilwait.Forever(c.HandleNode, 0) + } } func (c *RouterController) HandleNamespaces() { @@ -78,6 +84,25 @@ func (c *RouterController) HandleNamespaces() { glog.V(4).Infof("Unable to update list of namespaces") } +// HandleNode handles a single Node event and synchronizes the router backend +func (c *RouterController) HandleNode() { + eventType, node, err := c.NextNode() + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to read nodes: %v", err)) + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + glog.V(4).Infof("Processing Node : %s", node.Name) + glog.V(4).Infof(" Event: %s", eventType) + + if err := c.Plugin.HandleNode(eventType, node); err != nil { + utilruntime.HandleError(err) + } +} + // HandleRoute handles a single Route event and synchronizes the router backend. func (c *RouterController) HandleRoute() { eventType, route, err := c.NextRoute() diff --git a/pkg/router/controller/controller_test.go b/pkg/router/controller/controller_test.go index b11dfbaf3adb..b3c183d6e16e 100644 --- a/pkg/router/controller/controller_test.go +++ b/pkg/router/controller/controller_test.go @@ -17,6 +17,9 @@ type fakeRouterPlugin struct { func (p *fakeRouterPlugin) HandleRoute(t watch.EventType, route *routeapi.Route) error { return nil } +func (p *fakeRouterPlugin) HandleNode(t watch.EventType, node *kapi.Node) error { + return nil +} func (p *fakeRouterPlugin) HandleEndpoints(watch.EventType, *kapi.Endpoints) error { return nil } @@ -46,6 +49,9 @@ func TestRouterController_updateLastSyncProcessed(t *testing.T) { NextRoute: func() (watch.EventType, *routeapi.Route, error) { return watch.Modified, &routeapi.Route{}, nil }, + NextNode: func() (watch.EventType, *kapi.Node, error) { + return watch.Modified, &kapi.Node{}, nil + }, EndpointsListConsumed: func() bool { return true }, diff --git a/pkg/router/controller/extended_validator.go b/pkg/router/controller/extended_validator.go index 7463698358e2..e87b1fab6573 100644 --- a/pkg/router/controller/extended_validator.go +++ b/pkg/router/controller/extended_validator.go @@ -38,6 +38,11 @@ func NewExtendedValidator(plugin router.Plugin, recorder RejectionRecorder) *Ext } } +// HandleNode processes watch events on the node resource +func (p *ExtendedValidator) HandleNode(eventType watch.EventType, node *kapi.Node) error { + return p.plugin.HandleNode(eventType, node) +} + // HandleEndpoints processes watch events on the Endpoints resource. func (p *ExtendedValidator) HandleEndpoints(eventType watch.EventType, endpoints *kapi.Endpoints) error { return p.plugin.HandleEndpoints(eventType, endpoints) diff --git a/pkg/router/controller/factory/factory.go b/pkg/router/controller/factory/factory.go index 19592bdc3fa6..40ed873c0c30 100644 --- a/pkg/router/controller/factory/factory.go +++ b/pkg/router/controller/factory/factory.go @@ -27,6 +27,7 @@ import ( type RouterControllerFactory struct { KClient kclient.EndpointsNamespacer OSClient osclient.RoutesNamespacer + NodeClient kclient.NodesInterface Namespaces controller.NamespaceLister ResyncInterval time.Duration Namespace string @@ -35,10 +36,11 @@ type RouterControllerFactory struct { } // NewDefaultRouterControllerFactory initializes a default router controller factory. -func NewDefaultRouterControllerFactory(oc osclient.RoutesNamespacer, kc kclient.EndpointsNamespacer) *RouterControllerFactory { +func NewDefaultRouterControllerFactory(oc osclient.RoutesNamespacer, kc kclient.Interface) *RouterControllerFactory { return &RouterControllerFactory{ KClient: kc, OSClient: oc, + NodeClient: kc, ResyncInterval: 10 * time.Minute, Namespace: kapi.NamespaceAll, @@ -49,7 +51,7 @@ func NewDefaultRouterControllerFactory(oc osclient.RoutesNamespacer, kc kclient. // Create begins listing and watching against the API server for the desired route and endpoint // resources. It spawns child goroutines that cannot be terminated. -func (factory *RouterControllerFactory) Create(plugin router.Plugin) *controller.RouterController { +func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes bool) *controller.RouterController { routeEventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc) cache.NewReflector(&routeLW{ client: factory.OSClient, @@ -65,6 +67,15 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin) *controller // we do not scope endpoints by labels or fields because the route labels != endpoints labels }, &kapi.Endpoints{}, endpointsEventQueue, factory.ResyncInterval).Run() + nodeEventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc) + if watchNodes { + cache.NewReflector(&nodeLW{ + client: factory.NodeClient, + field: fields.Everything(), + label: labels.Everything(), + }, &kapi.Node{}, nodeEventQueue, factory.ResyncInterval).Run() + } + return &controller.RouterController{ Plugin: plugin, NextEndpoints: func() (watch.EventType, *kapi.Endpoints, error) { @@ -81,6 +92,13 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin) *controller } return eventType, obj.(*routeapi.Route), nil }, + NextNode: func() (watch.EventType, *kapi.Node, error) { + eventType, obj, err := nodeEventQueue.Pop() + if err != nil { + return watch.Error, nil, err + } + return eventType, obj.(*kapi.Node), nil + }, EndpointsListConsumed: func() bool { return endpointsEventQueue.ListConsumed() }, @@ -94,6 +112,7 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin) *controller NamespaceSyncInterval: factory.ResyncInterval - 10*time.Second, NamespaceWaitInterval: 10 * time.Second, NamespaceRetries: 5, + WatchNodes: watchNodes, } } @@ -256,3 +275,23 @@ func (lw *endpointsLW) Watch(options kapi.ListOptions) (watch.Interface, error) } return lw.client.Endpoints(lw.namespace).Watch(opts) } + +// nodeLW is a list watcher for nodes. +type nodeLW struct { + client kclient.NodesInterface + label labels.Selector + field fields.Selector +} + +func (lw *nodeLW) List(options kapi.ListOptions) (runtime.Object, error) { + return lw.client.Nodes().List(options) +} + +func (lw *nodeLW) Watch(options kapi.ListOptions) (watch.Interface, error) { + opts := kapi.ListOptions{ + LabelSelector: lw.label, + FieldSelector: lw.field, + ResourceVersion: options.ResourceVersion, + } + return lw.client.Nodes().Watch(opts) +} diff --git a/pkg/router/controller/status.go b/pkg/router/controller/status.go index ff303f687bf3..e67d9034c63e 100644 --- a/pkg/router/controller/status.go +++ b/pkg/router/controller/status.go @@ -295,6 +295,10 @@ func (a *StatusAdmitter) HandleRoute(eventType watch.EventType, route *routeapi. return a.plugin.HandleRoute(eventType, route) } +func (a *StatusAdmitter) HandleNode(eventType watch.EventType, node *kapi.Node) error { + return a.plugin.HandleNode(eventType, node) +} + func (a *StatusAdmitter) HandleEndpoints(eventType watch.EventType, route *kapi.Endpoints) error { return a.plugin.HandleEndpoints(eventType, route) } diff --git a/pkg/router/controller/status_test.go b/pkg/router/controller/status_test.go index 30865ffcb6b6..cc748f2ad404 100644 --- a/pkg/router/controller/status_test.go +++ b/pkg/router/controller/status_test.go @@ -28,6 +28,11 @@ func (p *fakePlugin) HandleRoute(t watch.EventType, route *routeapi.Route) error p.t, p.route = t, route return p.err } + +func (p *fakePlugin) HandleNode(t watch.EventType, node *kapi.Node) error { + return fmt.Errorf("not expected") +} + func (p *fakePlugin) HandleEndpoints(watch.EventType, *kapi.Endpoints) error { return fmt.Errorf("not expected") } diff --git a/pkg/router/controller/unique_host.go b/pkg/router/controller/unique_host.go index f99fe2d1846c..ca63eefecd38 100644 --- a/pkg/router/controller/unique_host.go +++ b/pkg/router/controller/unique_host.go @@ -84,6 +84,11 @@ func (p *UniqueHost) HandleEndpoints(eventType watch.EventType, endpoints *kapi. return p.plugin.HandleEndpoints(eventType, endpoints) } +// HandleNode processes watch events on the Node resource and calls the router +func (p *UniqueHost) HandleNode(eventType watch.EventType, node *kapi.Node) error { + return p.plugin.HandleNode(eventType, node) +} + // HandleRoute processes watch events on the Route resource. // TODO: this function can probably be collapsed with the router itself, as a function that // determines which component needs to be recalculated (which template) and then does so diff --git a/pkg/router/f5/f5.go b/pkg/router/f5/f5.go index e1846fc4f20a..83fc260ec787 100644 --- a/pkg/router/f5/f5.go +++ b/pkg/router/f5/f5.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" "os" "os/exec" @@ -21,6 +22,9 @@ import ( const ( // Default F5 partition path to use for syncing route config. F5DefaultPartitionPath = "/Common" + F5VxLANTunnelName = "vxlan5000" + F5VxLANProfileName = "vxlan-ose" + HTTP_CONFLICT_CODE = 409 ) // Error implements the error interface. @@ -104,6 +108,21 @@ type f5LTMCfg struct { // are normally used to create an access control boundary for // F5 users and applications. partitionPath string + + // vxlanGateway is the ip address assigned to the local tunnel interface + // inside F5 box. This address is the one that the packets generated from F5 + // will carry. The pods will return the packets to this address itself. + // It is important that the gateway be one of the ip addresses of the subnet + // that has been generated for F5. + vxlanGateway string + + // internalAddress is the ip address of the vtep interface used to connect to + // VxLAN overlay. It is the hostIP address listed in the subnet generated for F5 + internalAddress string + + // setupOSDNVxLAN is the boolean that conveys if F5 needs to setup a VxLAN + // to hook up with openshift-sdn + setupOSDNVxLAN bool } const ( @@ -327,17 +346,21 @@ func newF5LTM(cfg f5LTMCfg) (*f5LTM, error) { // Ensure path is rooted. partitionPath = path.Join("/", partitionPath) + setupOSDNVxLAN := (len(cfg.vxlanGateway) != 0 && len(cfg.internalAddress) != 0) router := &f5LTM{ f5LTMCfg: f5LTMCfg{ - host: cfg.host, - username: cfg.username, - password: cfg.password, - httpVserver: cfg.httpVserver, - httpsVserver: cfg.httpsVserver, - privkey: privkeyFileName, - insecure: cfg.insecure, - partitionPath: partitionPath, + host: cfg.host, + username: cfg.username, + password: cfg.password, + httpVserver: cfg.httpVserver, + httpsVserver: cfg.httpsVserver, + privkey: privkeyFileName, + insecure: cfg.insecure, + partitionPath: partitionPath, + vxlanGateway: cfg.vxlanGateway, + internalAddress: cfg.internalAddress, + setupOSDNVxLAN: setupOSDNVxLAN, }, poolMembers: map[string]map[string]bool{}, routes: map[string]map[string]bool{}, @@ -395,6 +418,7 @@ func (f5 *f5LTM) restRequest(verb string, url string, payload io.Reader, client := &http.Client{Transport: tr} + glog.V(4).Infof("Request sent: %v\n", req) resp, err := client.Do(req) if err != nil { errorResult.err = fmt.Errorf("client.Do failed: %v", err) @@ -461,6 +485,68 @@ func (f5 *f5LTM) delete(url string, result interface{}) error { // Routines for controlling F5. // +// ensureVxLANTunnel sets up the VxLAN tunnel profile and tunnel+selfIP +func (f5 *f5LTM) ensureVxLANTunnel() error { + glog.V(4).Infof("Checking and installing VxLAN setup") + + // create the profile + url := fmt.Sprintf("https://%s/mgmt/tm/net/tunnels/vxlan", f5.host) + profilePayload := f5CreateVxLANProfilePayload{ + Name: F5VxLANProfileName, + Partition: f5.partitionPath, + FloodingType: "multipoint", + Port: 4789, + } + err := f5.post(url, profilePayload, nil) + if err != nil && err.(F5Error).httpStatusCode != HTTP_CONFLICT_CODE { + // error HTTP_CONFLICT_CODE is fine, it just means the tunnel profile already exists + glog.V(4).Infof("Error while creating vxlan tunnel - %v", err) + return err + } + + // create the tunnel + url = fmt.Sprintf("https://%s/mgmt/tm/net/tunnels/tunnel", f5.host) + tunnelPayload := f5CreateVxLANTunnelPayload{ + Name: F5VxLANTunnelName, + Partition: f5.partitionPath, + Key: 0, + LocalAddress: f5.internalAddress, + Mode: "bidirectional", + Mtu: "0", + Profile: path.Join(f5.partitionPath, F5VxLANProfileName), + Tos: "preserve", + Transparent: "disabled", + UsePmtu: "enabled", + } + err = f5.post(url, tunnelPayload, nil) + if err != nil && err.(F5Error).httpStatusCode != HTTP_CONFLICT_CODE { + // error HTTP_CONFLICT_CODE is fine, it just means the tunnel already exists + return err + } + + selfUrl := fmt.Sprintf("https://%s/mgmt/tm/net/self", f5.host) + netSelfPayload := f5CreateNetSelfPayload{ + Name: f5.vxlanGateway, + Partition: f5.partitionPath, + Address: f5.vxlanGateway, + AddressSource: "from-user", + Floating: "disabled", + InheritedTrafficGroup: "false", + TrafficGroup: path.Join(f5.partitionPath, "traffic-group-local-only"), + Unit: 0, + Vlan: path.Join(f5.partitionPath, F5VxLANTunnelName), + AllowService: "all", + } + // create the net self IP + err = f5.post(selfUrl, netSelfPayload, nil) + if err != nil && err.(F5Error).httpStatusCode != HTTP_CONFLICT_CODE { + // error HTTP_CONFLICT_CODE is ok, netSelf already exists + return err + } + + return nil +} + // ensurePolicyExists checks whether the specified policy exists and creates it // if not. func (f5 *f5LTM) ensurePolicyExists(policyName string) error { @@ -484,14 +570,28 @@ func (f5 *f5LTM) ensurePolicyExists(policyName string) error { policiesUrl := fmt.Sprintf("https://%s/mgmt/tm/ltm/policy", f5.host) - policyPayload := f5Policy{ - Name: policyName, - Controls: []string{"forwarding"}, - Requires: []string{"http"}, - Strategy: "best-match", + if f5.setupOSDNVxLAN { + // if vxlan needs to be setup, it will only happen + // with ver12, for which we need to use a different payload + policyPayload := f5Ver12Policy{ + Name: policyName, + TmPartition: f5.partitionPath, + Controls: []string{"forwarding"}, + Requires: []string{"http"}, + Strategy: "best-match", + Legacy: true, + } + err = f5.post(policiesUrl, policyPayload, nil) + } else { + policyPayload := f5Policy{ + Name: policyName, + Controls: []string{"forwarding"}, + Requires: []string{"http"}, + Strategy: "best-match", + } + err = f5.post(policiesUrl, policyPayload, nil) } - err = f5.post(policiesUrl, policyPayload, nil) if err != nil { return err } @@ -717,7 +817,7 @@ func (f5 *f5LTM) addPartitionPath(pathName string) (bool, error) { payload := f5AddPartitionPathPayload{Name: pathName} err := f5.post(uri, payload, nil) if err != nil { - if err.(F5Error).httpStatusCode != 409 { + if err.(F5Error).httpStatusCode != HTTP_CONFLICT_CODE { glog.Errorf("Error adding partition path %q error: %v", pathName, err) return false, err } @@ -828,11 +928,77 @@ func (f5 *f5LTM) Initialize() error { } } + if f5.setupOSDNVxLAN { + err = f5.ensureVxLANTunnel() + if err != nil { + return err + } + } + glog.V(4).Infof("F5 initialization is complete.") return nil } +func checkIPAndGetMac(ipStr string) (string, error) { + ip := net.ParseIP(ipStr) + if ip == nil { + errStr := fmt.Sprintf("vtep IP '%s' is not a valid IP address", ipStr) + glog.Warning(errStr) + return "", fmt.Errorf(errStr) + } + ip4 := ip.To4() + if ip4 == nil { + errStr := fmt.Sprintf("vtep IP '%s' is not a valid IPv4 address", ipStr) + glog.Warning(errStr) + return "", fmt.Errorf(errStr) + } + macAddr := fmt.Sprintf("0a:0a:%02x:%02x:%02x:%02x", ip4[0], ip4[1], ip4[2], ip4[3]) + return macAddr, nil +} + +// AddVtep adds the Vtep IP to the VxLAN device's FDB +func (f5 *f5LTM) AddVtep(ipStr string) error { + if !f5.setupOSDNVxLAN { + return nil + } + macAddr, err := checkIPAndGetMac(ipStr) + if err != nil { + return err + } + + err = f5.ensurePartitionPathExists(f5.partitionPath) + if err != nil { + return err + } + + url := fmt.Sprintf("https://%s/mgmt/tm/net/fdb/tunnel/%s~%s/records", f5.host, strings.Replace(f5.partitionPath, "/", "~", -1), F5VxLANTunnelName) + payload := f5AddFDBRecordPayload{ + Name: macAddr, + Endpoint: ipStr, + } + return f5.post(url, payload, nil) +} + +// RemoveVtep removes the Vtep IP from the VxLAN device's FDB +func (f5 *f5LTM) RemoveVtep(ipStr string) error { + if !f5.setupOSDNVxLAN { + return nil + } + macAddr, err := checkIPAndGetMac(ipStr) + if err != nil { + return err + } + + err = f5.ensurePartitionPathExists(f5.partitionPath) + if err != nil { + return err + } + + url := fmt.Sprintf("https://%s/mgmt/tm/net/fdb/tunnel/%s~%s/records/%s", f5.host, strings.Replace(f5.partitionPath, "/", "~", -1), F5VxLANTunnelName, macAddr) + return f5.delete(url, nil) +} + // CreatePool creates a pool named poolname on F5 BIG-IP. func (f5 *f5LTM) CreatePool(poolname string) error { url := fmt.Sprintf("https://%s/mgmt/tm/ltm/pool", f5.host) @@ -1095,7 +1261,7 @@ func (f5 *f5LTM) addRoute(policyname, routename, poolname, hostname, err := f5.post(rulesUrl, rulesPayload, nil) if err != nil { - if err.(F5Error).httpStatusCode == 409 { + if err.(F5Error).httpStatusCode == HTTP_CONFLICT_CODE { glog.V(4).Infof("Warning: Rule %s already exists; continuing with"+ " initialization in case the existing rule is only partially"+ " initialized...", routename) diff --git a/pkg/router/f5/plugin.go b/pkg/router/f5/plugin.go index c1937040030d..c0d769925285 100644 --- a/pkg/router/f5/plugin.go +++ b/pkg/router/f5/plugin.go @@ -9,6 +9,7 @@ import ( "k8s.io/kubernetes/pkg/watch" routeapi "github.com/openshift/origin/pkg/route/api" + "github.com/openshift/origin/pkg/util/netutils" ) // F5Plugin holds state for the f5 plugin. @@ -52,19 +53,32 @@ type F5PluginConfig struct { // PartitionPath specifies the F5 partition path to use. This is used // to create an access control boundary for users and applications. PartitionPath string + + // VxlanGateway is the ip address assigned to the local tunnel interface + // inside F5 box. This address is the one that the packets generated from F5 + // will carry. The pods will return the packets to this address itself. + // It is important that the gateway be one of the ip addresses of the subnet + // that has been generated for F5. + VxlanGateway string + + // InternalAddress is the ip address of the vtep interface used to connect to + // VxLAN overlay. It is the hostIP address listed in the subnet generated for F5 + InternalAddress string } // NewF5Plugin makes a new f5 router plugin. func NewF5Plugin(cfg F5PluginConfig) (*F5Plugin, error) { f5LTMCfg := f5LTMCfg{ - host: cfg.Host, - username: cfg.Username, - password: cfg.Password, - httpVserver: cfg.HttpVserver, - httpsVserver: cfg.HttpsVserver, - privkey: cfg.PrivateKey, - insecure: cfg.Insecure, - partitionPath: cfg.PartitionPath, + host: cfg.Host, + username: cfg.Username, + password: cfg.Password, + httpVserver: cfg.HttpVserver, + httpsVserver: cfg.HttpsVserver, + privkey: cfg.PrivateKey, + insecure: cfg.Insecure, + partitionPath: cfg.PartitionPath, + vxlanGateway: cfg.VxlanGateway, + internalAddress: cfg.InternalAddress, } f5, err := newF5LTM(f5LTMCfg) if err != nil { @@ -466,10 +480,54 @@ func (p *F5Plugin) deleteRoute(routename string) error { return nil } +func getNodeIP(node *kapi.Node) (string, error) { + if len(node.Status.Addresses) > 0 && node.Status.Addresses[0].Address != "" { + return node.Status.Addresses[0].Address, nil + } else { + return netutils.GetNodeIP(node.Name) + } +} + func (p *F5Plugin) HandleNamespaces(namespaces sets.String) error { return fmt.Errorf("namespace limiting for F5 is not implemented") } +func (p *F5Plugin) HandleNode(eventType watch.EventType, node *kapi.Node) error { + // The F5 appliance, if hooked to use the VxLAN encapsulation + // should have its FDB updated depending on nodes arriving and leaving the cluster + switch eventType { + case watch.Added: + // New VTEP created, add the record to the vxlan fdb + ip, err := getNodeIP(node) + if err != nil { + // just log the error + glog.Warningf("Error in obtaining IP address of newly added node %s - %v", node.Name, err) + return nil + } + err = p.F5Client.AddVtep(ip) + if err != nil { + glog.Errorf("Error in adding node '%s' to F5s FDB - %v", ip, err) + return err + } + case watch.Deleted: + // VTEP deleted, delete the record from vxlan fdb + ip, err := getNodeIP(node) + if err != nil { + // just log the error + glog.Warningf("Error in obtaining IP address of deleted node %s - %v", node.Name, err) + return nil + } + err = p.F5Client.RemoveVtep(ip) + if err != nil { + glog.Errorf("Error in removing node '%s' from F5s FDB - %v", ip, err) + return err + } + case watch.Modified: + // ignore the modified event. Change in IP address of the node is not supported. + } + return nil +} + // HandleRoute processes watch events on the Route resource and // creates and deletes policy rules in response. func (p *F5Plugin) HandleRoute(eventType watch.EventType, diff --git a/pkg/router/f5/types.go b/pkg/router/f5/types.go index 50c7ff552cd3..df12b29e1411 100644 --- a/pkg/router/f5/types.go +++ b/pkg/router/f5/types.go @@ -100,6 +100,37 @@ type f5PoolMemberset struct { Members []f5PoolMember `json:"items"` } +// f5Ver12Policy represents an F5 BIG-IP LTM policy for versions 12.x +// It describes the payload for a POST request by which the router creates a new policy. +type f5Ver12Policy struct { + // Name is the name of the policy. + Name string `json:"name"` + + // TmPartition is the partition name for the policy + TmPartition string `json:"tmPartition"` + + // Controls is a list of F5 BIG-IP LTM features enabled for the pool. + // Typically we use just forwarding; other possible values are caching, + // classification, compression, request-adaption, response-adaption, and + // server-ssl. + Controls []string `json:"controls"` + + // Requires is a list of available profile types. Typically we use just http; + // other possible values are client-ssl, ssl-persistence, and tcp. + Requires []string `json:"requires"` + + // Strategy is the strategy according to which rules are applied to incoming + // connections when more than one rule matches. Typically we use best-match; + // other possible values are all-match and first-match. + Strategy string `json:"strategy"` + + // Legacy is the boolean keyword by which ver12.1 can be programmed + // for creating a policy using this payload. Eventually we need to move + // to creating Draft policies and then associating them with the virtual servers + // Note that this keyword will only work with versions 12.1 and above + Legacy bool `json:"legacy"` +} + // f5Policy represents an F5 BIG-IP LTM policy. It describes the payload for // a POST request by which the F5 router creates a new policy. type f5Policy struct { @@ -293,3 +324,46 @@ type f5AddPartitionPathPayload struct { // Name is the partition path to be added. Name string `json:"name"` } + +// Method:POST URL:/mgmt/tm/net/tunnels/vxlan +type f5CreateVxLANProfilePayload struct { + Name string `json:"name"` // e.g. vxlan-ose + Partition string `json:"partition"` // /Common + FloodingType string `json:"floodingType"` // multipoint + Port int `json:"port"` // 4789 (nothing else will work) +} + +// Method:POST URL:/mgmt/tm/net/tunnels/tunnel +type f5CreateVxLANTunnelPayload struct { + Name string `json:"name"` // vxlan5000 + Partition string `json:"partition"` // /Common + Key uint32 `json:"key"` // 0 + LocalAddress string `json:"localAddress"` // 172.30.1.5 + Mode string `json:"mode"` // bidirectional + Mtu string `json:"mtu"` // 0 + Profile string `json:"profile"` // / + Tos string `json:"tos"` // preserve + Transparent string `json:"transparent"` // disabled + UsePmtu string `json:"usePmtu"` // enabled +} + +// tmsh create net self / vlan vxlan5000 +// Method: POST URL: /mgmt/tm/net/self +type f5CreateNetSelfPayload struct { + Name string `json:"name"` // “10.0.1.10/16", + Partition string `json:"partition"` // "Common", + Address string `json:"address"` // “10.0.1.10/16", + AddressSource string `json:"addressSource"` // "from-user", + Floating string `json:"floating"` // "disabled", + InheritedTrafficGroup string `json:"inheritedTrafficGroup"` // "false", + TrafficGroup string `json:"trafficGroup"` // "/Common/traffic-group-local-only", + Unit uint32 `json:"unit"` // 0, + Vlan string `json:"vlan"` // "/Common/vxlan5000", + AllowService string `json:"allowService"` // "all" +} + +// POST /mgmt/tm/net/fdb/tunnel/~Common~vxlan5000/records +type f5AddFDBRecordPayload struct { + Name string `json:"name"` // "02:50:56:c0:00:06", + Endpoint string `json:"endpoint"` // "10.139.1.1" +} diff --git a/pkg/router/interfaces.go b/pkg/router/interfaces.go index 462fce49a8fd..c4c74748a2ed 100644 --- a/pkg/router/interfaces.go +++ b/pkg/router/interfaces.go @@ -15,5 +15,6 @@ type Plugin interface { HandleEndpoints(watch.EventType, *kapi.Endpoints) error // If sent, filter the list of accepted routes and endpoints to this set HandleNamespaces(namespaces sets.String) error + HandleNode(watch.EventType, *kapi.Node) error SetLastSyncProcessed(processed bool) error } diff --git a/pkg/router/template/plugin.go b/pkg/router/template/plugin.go index 802587e988aa..52ef0a165d8e 100644 --- a/pkg/router/template/plugin.go +++ b/pkg/router/template/plugin.go @@ -176,6 +176,13 @@ func (p *TemplatePlugin) HandleEndpoints(eventType watch.EventType, endpoints *k return nil } +// HandleNode processes watch events on the Node resource +// The template type of plugin currently does not need to act on such events +// so the implementation just returns without error +func (p *TemplatePlugin) HandleNode(eventType watch.EventType, node *kapi.Node) error { + return nil +} + // HandleRoute processes watch events on the Route resource. // TODO: this function can probably be collapsed with the router itself, as a function that // determines which component needs to be recalculated (which template) and then does so diff --git a/test/extended/router/scoped.go b/test/extended/router/scoped.go index 9e0a2a526aab..9c530bc7c8a1 100644 --- a/test/extended/router/scoped.go +++ b/test/extended/router/scoped.go @@ -23,7 +23,9 @@ var _ = g.Describe("[networking][router] openshift routers", func() { g.BeforeEach(func() { // defer oc.Run("delete").Args("-f", configPath).Execute() - err := oc.Run("create").Args("-f", configPath).Execute() + err := oc.AsAdmin().Run("policy").Args("add-role-to-user", "system:router", oc.Username()).Execute() + o.Expect(err).NotTo(o.HaveOccurred()) + err = oc.Run("create").Args("-f", configPath).Execute() o.Expect(err).NotTo(o.HaveOccurred()) }) diff --git a/test/extended/router/weighted.go b/test/extended/router/weighted.go index b39c89f15351..7feaf66c6165 100644 --- a/test/extended/router/weighted.go +++ b/test/extended/router/weighted.go @@ -27,7 +27,9 @@ var _ = g.Describe("[networking][router] weighted openshift router", func() { g.BeforeEach(func() { // defer oc.Run("delete").Args("-f", configPath).Execute() - err := oc.Run("create").Args("-f", configPath).Execute() + err := oc.AsAdmin().Run("policy").Args("add-role-to-user", "system:router", oc.Username()).Execute() + o.Expect(err).NotTo(o.HaveOccurred()) + err = oc.Run("create").Args("-f", configPath).Execute() o.Expect(err).NotTo(o.HaveOccurred()) }) diff --git a/test/integration/router/router_http_server.go b/test/integration/router/router_http_server.go index 3df71bc4a465..7ca8d552a8f3 100644 --- a/test/integration/router/router_http_server.go +++ b/test/integration/router/router_http_server.go @@ -36,6 +36,8 @@ func GetDefaultLocalAddress() string { func NewTestHttpService() *TestHttpService { endpointChannel := make(chan string) routeChannel := make(chan string) + nodeChannel := make(chan string) + svcChannel := make(chan string) addr := GetDefaultLocalAddress() @@ -55,6 +57,8 @@ func NewTestHttpService() *TestHttpService { PodHttpsCaCert: []byte(ExampleCACert), EndpointChannel: endpointChannel, RouteChannel: routeChannel, + NodeChannel: nodeChannel, + SvcChannel: svcChannel, } } @@ -77,6 +81,8 @@ type TestHttpService struct { PodTestPath string EndpointChannel chan string RouteChannel chan string + NodeChannel chan string + SvcChannel chan string listeners []net.Listener } @@ -131,6 +137,30 @@ func (s *TestHttpService) handleHelloPodTestSecure(w http.ResponseWriter, r *htt fmt.Fprint(w, HelloPodPathSecure) } +// handleSvcList handles calls to /api/v1beta1/services and always returns empty data +func (s *TestHttpService) handleSvcList(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, "{}") +} + +// handleSvcWatch handles calls to /api/v1beta1/watch/services and uses the svc channel to simulate watch events +func (s *TestHttpService) handleSvcWatch(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + io.WriteString(w, <-s.SvcChannel) +} + +// handleNodeList handles calls to /api/v1beta1/nodes and always returns empty data +func (s *TestHttpService) handleNodeList(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, "{}") +} + +// handleNodeWatch handles calls to /api/v1beta1/watch/nodes and uses the node channel to simulate watch events +func (s *TestHttpService) handleNodeWatch(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + io.WriteString(w, <-s.NodeChannel) +} + // handleRouteWatch handles calls to /osapi/v1beta1/watch/routes and uses the route channel to simulate watch events func (s *TestHttpService) handleRouteWatch(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -208,6 +238,10 @@ func (s *TestHttpService) startMaster() error { masterServer.HandleFunc(fmt.Sprintf("/oapi/%s/routes", version), s.handleRouteList) masterServer.HandleFunc(fmt.Sprintf("/oapi/%s/namespaces/", version), s.handleRouteCalls) masterServer.HandleFunc(fmt.Sprintf("/oapi/%s/watch/routes", version), s.handleRouteWatch) + masterServer.HandleFunc(fmt.Sprintf("/api/%s/nodes", version), s.handleNodeList) + masterServer.HandleFunc(fmt.Sprintf("/api/%s/watch/nodes", version), s.handleNodeWatch) + masterServer.HandleFunc(fmt.Sprintf("/api/%s/services", version), s.handleSvcList) + masterServer.HandleFunc(fmt.Sprintf("/api/%s/watch/services", version), s.handleSvcWatch) } if err := s.startServing(s.MasterHttpAddr, http.Handler(masterServer)); err != nil { diff --git a/test/integration/router_stress_test.go b/test/integration/router_stress_test.go index 0c9a18f031e8..d7ed723da279 100644 --- a/test/integration/router_stress_test.go +++ b/test/integration/router_stress_test.go @@ -238,6 +238,11 @@ func (p *DelayPlugin) HandleRoute(eventType watch.EventType, route *routeapi.Rou return p.plugin.HandleRoute(eventType, route) } +func (p *DelayPlugin) HandleNode(eventType watch.EventType, node *kapi.Node) error { + p.delay() + return p.plugin.HandleNode(eventType, node) +} + func (p *DelayPlugin) HandleEndpoints(eventType watch.EventType, endpoints *kapi.Endpoints) error { p.delay() return p.plugin.HandleEndpoints(eventType, endpoints) @@ -277,7 +282,7 @@ func launchRouter(oc osclient.Interface, kc kclient.Interface, maxDelay int32, n } factory := controllerfactory.NewDefaultRouterControllerFactory(oc, kc) - controller := factory.Create(plugin) + controller := factory.Create(plugin, false) controller.Run() return diff --git a/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml b/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml index 7026b9bef653..09b87327f258 100644 --- a/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml +++ b/test/testdata/bootstrappolicy/bootstrap_cluster_roles.yaml @@ -1772,6 +1772,14 @@ items: verbs: - list - watch + - apiGroups: + - "" + attributeRestrictions: null + resources: + - nodes + verbs: + - list + - watch - apiGroups: - "" attributeRestrictions: null