diff --git a/daemon/daemon.go b/daemon/daemon.go index 41aad9c5..ffbbf81f 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -18,6 +18,8 @@ import ( "github.com/AliyunContainerService/terway/pkg/tracing" "github.com/AliyunContainerService/terway/rpc" "github.com/AliyunContainerService/terway/types" + "github.com/containernetworking/cni/libcni" + containertypes "github.com/containernetworking/cni/pkg/types" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -45,6 +47,11 @@ const ( tracingKeyPendingPodsCount = "pending_pods_count" commandMapping = "mapping" + + cniDefaultPath = "/opt/cni/bin" + // this file is generated from configmap + terwayCNIConf = "/etc/eni/10-terway.conf" + cniExecTimeout = 10 * time.Second ) type networkService struct { @@ -64,6 +71,8 @@ type networkService struct { pendingPodsLock sync.RWMutex sync.RWMutex + cniBinPath string + rpc.UnimplementedTerwayBackendServer } @@ -261,6 +270,9 @@ func (networkService *networkService) AllocIP(ctx context.Context, r *rpc.AllocI Type: eniMultiIP.GetType(), }, }, + NetNs: func(s string) *string { + return &s + }(r.Netns), } networkContext.resources = append(networkContext.resources, newRes.Resources...) if networkService.eipResMgr != nil && podinfo.EipInfo.PodEip { @@ -319,6 +331,9 @@ func (networkService *networkService) AllocIP(ctx context.Context, r *rpc.AllocI Type: vpcEni.GetType(), }, }, + NetNs: func(s string) *string { + return &s + }(r.Netns), } networkContext.resources = append(networkContext.resources, newRes.Resources...) if networkService.eipResMgr != nil && podinfo.EipInfo.PodEip { @@ -381,6 +396,9 @@ func (networkService *networkService) AllocIP(ctx context.Context, r *rpc.AllocI Type: vpcVeth.GetType(), }, }, + NetNs: func(s string) *string { + return &s + }(r.Netns), } networkContext.resources = append(networkContext.resources, newRes.Resources...) 
err = networkService.resourceDB.Put(podInfoKey(podinfo.Namespace, podinfo.Name), newRes) @@ -679,18 +697,73 @@ func (networkService *networkService) startGarbageCollectionLoop() { } func (networkService *networkService) startPeriodCheck() { - log.Debugf("compare poll with metadata") - podMapping, err := networkService.GetResourceMapping() - if err != nil { - log.Error(err) - return - } - for _, res := range podMapping { - if res.Valid { - continue + // check pool + func() { + log.Debugf("compare poll with metadata") + podMapping, err := networkService.GetResourceMapping() + if err != nil { + log.Error(err) + return } - _ = tracing.RecordPodEvent(res.Name, res.Namespace, corev1.EventTypeWarning, "ResourceInvalid", fmt.Sprintf("resource %s", res.LocalResID)) - } + for _, res := range podMapping { + if res.Valid { + continue + } + _ = tracing.RecordPodEvent(res.Name, res.Namespace, corev1.EventTypeWarning, "ResourceInvalid", fmt.Sprintf("resource %s", res.LocalResID)) + } + }() + // call CNI CHECK, make sure all dev is ok + func() { + log.Debugf("call CNI CHECK") + defer func() { + log.Debugf("call CNI CHECK end") + }() + networkService.RLock() + podResList, err := networkService.resourceDB.List() + networkService.RUnlock() + if err != nil { + log.Error(err) + return + } + ff, err := ioutil.ReadFile(terwayCNIConf) + if err != nil { + log.Error(err) + return + } + for _, v := range podResList { + res := v.(PodResources) + if res.NetNs == nil { + continue + } + log.Debugf("checking pod name %s", res.PodInfo.Name) + cniCfg := libcni.NewCNIConfig([]string{networkService.cniBinPath}, nil) + func() { + ctx, cancel := context.WithTimeout(context.Background(), cniExecTimeout) + defer cancel() + err := cniCfg.CheckNetwork(ctx, &libcni.NetworkConfig{ + Network: &containertypes.NetConf{ + CNIVersion: "0.4.0", + Name: "terway", + Type: "terway", + }, + Bytes: ff, + }, &libcni.RuntimeConf{ + ContainerID: "fake", // must provide + NetNS: *res.NetNs, + IfName: "eth0", + Args: 
[][2]string{ + {"K8S_POD_NAME", res.PodInfo.Name}, + {"K8S_POD_NAMESPACE", res.PodInfo.Namespace}, + }, + }) + if err != nil { + log.Error(err) + return + } + }() + } + }() + } // tracing @@ -806,12 +879,17 @@ func (networkService *networkService) GetResourceMapping() ([]tracing.PodMapping func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (rpc.TerwayBackendServer, error) { log.Debugf("start network service with: %s, %s", configFilePath, daemonMode) + cniBinPath := os.Getenv("CNI_PATH") + if cniBinPath == "" { + cniBinPath = cniDefaultPath + } netSrv := &networkService{ configFilePath: configFilePath, kubeConfig: kubeconfig, master: master, pendingPods: map[string]interface{}{}, pendingPodsLock: sync.RWMutex{}, + cniBinPath: cniBinPath, } if daemonMode == daemonModeENIMultiIP || daemonMode == daemonModeVPC || daemonMode == daemonModeENIOnly { netSrv.daemonMode = daemonMode diff --git a/daemon/resource_manager.go b/daemon/resource_manager.go index 7a556b5c..9fe77b23 100644 --- a/daemon/resource_manager.go +++ b/daemon/resource_manager.go @@ -32,6 +32,7 @@ type ResourceItem struct { type PodResources struct { Resources []ResourceItem PodInfo *podInfo + NetNs *string } type resourceManagerInitItem struct { diff --git a/go.mod b/go.mod index 0cfabf85..155b0946 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/vishvananda/netlink v1.1.1-0.20201206203632-88079d98e65d github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 - golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e + golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0 google.golang.org/grpc v1.34.0 google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index f4ea6bb6..a87cdb8a 100644 --- a/go.sum +++ b/go.sum @@ -291,8 +291,8 @@ golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e h1:AyodaIpKjppX+cBfTASF2E1US3H2JFBj920Ot3rtDjs= -golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0 h1:n+DPcgTwkgWzIFpLmoimYR2K2b0Ga5+Os4kayIN0vGo= +golang.org/x/sys v0.0.0-20201221093633-bc327ba9c2f0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/link/interface.go b/pkg/link/interface.go index c5ec24c7..9ceb6e30 100644 --- a/pkg/link/interface.go +++ b/pkg/link/interface.go @@ -19,7 +19,7 @@ func GetDeviceNumber(mac string) (int32, error) { return int32(link.Attrs().Index), nil } } - return 0, errors.Errorf("cannot found mac address: %s", mac) + return 0, errors.Wrapf(ErrNotFound, "can't found dev by mac %s", mac) } // GetDeviceName get interface device name by mac address @@ -34,5 +34,5 @@ func GetDeviceName(mac string) (string, error) { return link.Attrs().Name, nil } } - return "", errors.Errorf("cannot found mac address: %s", mac) + return "", errors.Wrapf(ErrNotFound, "can't found dev by mac %s", mac) } diff --git a/pkg/link/interface_unsupport.go b/pkg/link/interface_unsupport.go index da2b7472..a5ac75fd 100644 --- a/pkg/link/interface_unsupport.go +++ b/pkg/link/interface_unsupport.go @@ -2,16 +2,12 @@ package link -import ( - "github.com/pkg/errors" -) - // GetDeviceNumber get interface device number by mac address func 
GetDeviceNumber(mac string) (int32, error) { - return 0, errors.Errorf("not supported arch") + return 0, ErrUnsupported } // GetDeviceName get interface device name by mac address func GetDeviceName(mac string) (string, error) { - return "", errors.Errorf("not supported arch") + return "", ErrUnsupported } diff --git a/pkg/link/type.go b/pkg/link/type.go new file mode 100644 index 00000000..c4405109 --- /dev/null +++ b/pkg/link/type.go @@ -0,0 +1,10 @@ +package link + +import ( + "errors" +) + +var ( + ErrUnsupported = errors.New("not supported arch") + ErrNotFound = errors.New("not found") +) diff --git a/pkg/sysctl/sysctl.go b/pkg/sysctl/sysctl.go new file mode 100644 index 00000000..3017fb9d --- /dev/null +++ b/pkg/sysctl/sysctl.go @@ -0,0 +1,81 @@ +// Copyright 2019 Authors of Cilium +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sysctl + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" +) + +const ( + prefixDir = "/proc/sys" +) + +// Setting represents a sysctl setting. Its purpose it to be able to iterate +// over a slice of settings. 
+type Setting struct { + Name string + Val string + IgnoreErr bool +} + +func fullPath(name string) string { + return filepath.Join(prefixDir, strings.Replace(name, ".", "/", -1)) +} + +func writeSysctl(name string, value string) error { + fPath := fullPath(name) + f, err := os.OpenFile(fPath, os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("could not open the sysctl file %s: %s", + fPath, err) + } + defer f.Close() + if _, err := io.WriteString(f, value); err != nil { + return fmt.Errorf("could not write to the systctl file %s: %s", + fPath, err) + } + return nil +} + +// Disable disables the given sysctl parameter. +func Disable(name string) error { + return writeSysctl(name, "0") +} + +// Enable enables the given sysctl parameter. +func Enable(name string) error { + return writeSysctl(name, "1") +} + +// Write sets the given sysctl parameter to val. +func Write(name string, val string) error { + return writeSysctl(name, val) +} + +// Read returns the value of the given sysctl parameter with trailing newlines trimmed. +func Read(name string) (string, error) { + fPath := fullPath(name) + val, err := ioutil.ReadFile(fPath) + if err != nil { + return "", fmt.Errorf("Failed to read %s: %s", fPath, err) + } + + return strings.TrimRight(string(val), "\n"), nil +} diff --git a/plugin/driver/drivers.go b/plugin/driver/drivers.go index c326cee4..d4485d8d 100644 --- a/plugin/driver/drivers.go +++ b/plugin/driver/drivers.go @@ -21,6 +21,22 @@ var ( IPVlanDriver NetnsDriver = newIPVlanDriver() ) +type RecordPodEvent func(msg string) + +type CheckConfig struct { + RecordPodEvent + + NetNS ns.NetNS + + HostVethName string + DeviceID int32 + + ContainerIFName string + // for pod + IPv4Addr *net.IPNet + Gateway net.IP +} + // NetnsDriver to config container netns interface and routes type NetnsDriver interface { Setup(hostVeth string, @@ -40,6 +56,8 @@ type NetnsDriver interface { containerVeth string, netNS ns.NetNS, containerIP net.IP) error + + Check(cfg *CheckConfig) error } type vethDriver struct { @@ -430,6 +448,10
@@ func (d *vethDriver) Teardown(hostIfName string, return netlink.LinkDel(hostVeth) } +func (d *vethDriver) Check(cfg *CheckConfig) error { + return nil +} + func setupVethPair(contVethName, pairName string, mtu int, hostNS ns.NetNS) (net.Interface, net.Interface, error) { contVeth, err := makeVethPair(contVethName, pairName, mtu) if err != nil { diff --git a/plugin/driver/ipvlan.go b/plugin/driver/ipvlan.go index 4be15b53..4609c4a3 100644 --- a/plugin/driver/ipvlan.go +++ b/plugin/driver/ipvlan.go @@ -8,6 +8,7 @@ import ( "strconv" "syscall" + "github.com/AliyunContainerService/terway/pkg/sysctl" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/plugins/pkg/ns" "github.com/pkg/errors" @@ -94,12 +95,10 @@ func (driver *ipvlanDriver) Setup( if err != nil { return errors.Wrapf(err, "%s, get device by index %d error.", driver.name, deviceID) } - if parentLink.Attrs().OperState != netlink.OperUp { - if err := netlink.LinkSetUp(parentLink); err != nil { - return errors.Wrapf(err, "%s, set device %s up error", driver.name, parentLink.Attrs().Name) - } + _, err = EnsureLinkUp(parentLink) + if err != nil { + return errors.Wrapf(err, "%s, set device %s up error", driver.name, parentLink.Attrs().Name) } - slaveIPVlan := &netlink.IPVlan{ LinkAttrs: netlink.LinkAttrs{ Name: hostIPVlan, @@ -150,13 +149,12 @@ func (driver *ipvlanDriver) Setup( } } } - defLink, err := netlink.LinkByName(containerIPVlan) if err != nil { return errors.Wrapf(err, "%s, get device %s error", name, defLink.Attrs().Name) } - if err := netlink.LinkSetUp(defLink); err != nil { + if _, err := EnsureLinkUp(defLink); err != nil { return errors.Wrapf(err, "%s, set device %s up error", name, defLink.Attrs().Name) } @@ -237,6 +235,56 @@ func (driver *ipvlanDriver) Teardown(hostIPVlan string, return driver.teardownInitNamespace(parents, containerIP) } +func (driver *ipvlanDriver) Check(cfg *CheckConfig) error { + // 1. 
check parent link ( this is called in every setup it is safe) + parentLink, err := netlink.LinkByIndex(int(cfg.DeviceID)) + if err != nil { + return errors.Wrapf(err, "%s, get device by index %d error.", driver.name, cfg.DeviceID) + } + changed, err := EnsureLinkUp(parentLink) + if err != nil { + return err + } + if changed { + cfg.RecordPodEvent(fmt.Sprintf("parent link id %d set to up", int(cfg.DeviceID))) + } + + // 2. check addr and default route + err = cfg.NetNS.Do(func(netNS ns.NetNS) error { + link, err := netlink.LinkByName(cfg.ContainerIFName) + if err != nil { + return err + } + + changed, err := EnsureLinkUp(link) + if err != nil { + return err + } + + if changed { + cfg.RecordPodEvent(fmt.Sprintf("link %s set to up", cfg.ContainerIFName)) + } + changed, err = EnsureDefaultRoute(link, cfg.Gateway) + if err != nil { + return err + } + if changed { + Log.Debugf("route is changed") + cfg.RecordPodEvent("default route is updated") + } + err = sysctl.Enable(fmt.Sprintf("net.ipv4.conf.%s.forwarding", cfg.ContainerIFName)) + if err != nil { + return err + } + err = sysctl.Disable(fmt.Sprintf("net.ipv4.conf.%s.rp_filter", cfg.ContainerIFName)) + if err != nil { + return err + } + return nil + }) + return err +} + func (driver *ipvlanDriver) createSlaveIfNotExist(parentLink netlink.Link, slaveName string) (netlink.Link, error) { slaveLink, err := netlink.LinkByName(slaveName) if err != nil { diff --git a/plugin/driver/raw_nic.go b/plugin/driver/raw_nic.go index f23d7ee8..d01f37ce 100644 --- a/plugin/driver/raw_nic.go +++ b/plugin/driver/raw_nic.go @@ -2,20 +2,24 @@ package driver import ( "encoding/hex" + "fmt" "math/rand" "net" "time" + "github.com/AliyunContainerService/terway/pkg/sysctl" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/plugins/pkg/ns" "github.com/pkg/errors" "github.com/vishvananda/netlink" ) +// rawNicDriver put nic in net ns type rawNicDriver struct { } -func (r *rawNicDriver) Setup(hostVeth string, +func (r 
*rawNicDriver) Setup( + hostVeth string, containerVeth string, ipv4Addr *net.IPNet, primaryIpv4Addr net.IP, @@ -194,6 +198,40 @@ func (r *rawNicDriver) Teardown(hostVeth string, return nil } +func (r *rawNicDriver) Check(cfg *CheckConfig) error { + err := cfg.NetNS.Do(func(netNS ns.NetNS) error { + link, err := netlink.LinkByName(cfg.ContainerIFName) + if err != nil { + return err + } + changed, err := EnsureLinkUp(link) + if err != nil { + return err + } + if changed { + cfg.RecordPodEvent(fmt.Sprintf("link %s set to up", cfg.ContainerIFName)) + } + changed, err = EnsureDefaultRoute(link, cfg.Gateway) + if err != nil { + return err + } + if changed { + Log.Debugf("route is changed") + cfg.RecordPodEvent("default route is updated") + } + err = sysctl.Enable(fmt.Sprintf("net.ipv4.conf.%s.forwarding", cfg.ContainerIFName)) + if err != nil { + return err + } + err = sysctl.Disable(fmt.Sprintf("net.ipv4.conf.%s.rp_filter", cfg.ContainerIFName)) + if err != nil { + return err + } + return nil + }) + return err +} + const nicPrefix = "eth" func (*rawNicDriver) randomNicName() (string, error) { diff --git a/plugin/driver/utils.go b/plugin/driver/utils.go index 7502e447..26722a80 100644 --- a/plugin/driver/utils.go +++ b/plugin/driver/utils.go @@ -1,12 +1,26 @@ package driver import ( + "encoding/json" "fmt" + "io/ioutil" + "log" "net" + "os" + "strings" + "time" + "github.com/containernetworking/cni/pkg/types" + "github.com/containernetworking/plugins/pkg/ip" "github.com/containernetworking/plugins/pkg/utils/sysctl" "github.com/pkg/errors" "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + fileLockTimeOut = 11 * time.Second ) func deleteRoutesForAddr(addr *net.IPNet, tableID int) error { @@ -75,3 +89,124 @@ func EnsureHostNsConfig() error { } return nil } + +// EnsureLinkUp brings link up if it is not already up; returns whether it changed and any error +func EnsureLinkUp(link netlink.Link) (bool, error) { + if link.Attrs().Flags&net.FlagUp != 0 { + return
false, nil + } + return true, netlink.LinkSetUp(link) +} + +// EnsureDefaultRoute validates the default route via gw and, when validation reports it as not found, replaces it on link; returns changed and err +func EnsureDefaultRoute(link netlink.Link, gw net.IP) (bool, error) { + err := ip.ValidateExpectedRoute([]*types.Route{ + { + Dst: *defaultRoute, + GW: gw, + }, + }) + if err == nil { + return false, nil + } + + if !strings.Contains(err.Error(), "not found") { + return false, err + } + + err = netlink.RouteReplace(&netlink.Route{ + LinkIndex: link.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Flags: int(netlink.FLAG_ONLINK), + Dst: defaultRoute, + Gw: gw, + }) + return true, err +} + +var Log = MyLog{ + l: log.New(ioutil.Discard, "", log.LstdFlags), +} + +type MyLog struct { + l *log.Logger + debug bool +} + +// Debugf writes a formatted message to the logger when debug is enabled +func (m *MyLog) Debugf(format string, v ...interface{}) { + if !m.debug { + return + } + m.l.Printf(format, v...) +} + +// Debug writes v to the logger when debug is enabled +func (m *MyLog) Debug(v ...interface{}) { + if !m.debug { + return + } + m.l.Print(v...) +} + +// SetDebug enables debug output to fd when d is true, otherwise it discards the logger output +func (m *MyLog) SetDebug(d bool, fd *os.File) { + if !d { + m.l.SetOutput(ioutil.Discard) + return + } + m.debug = true + m.l.SetOutput(fd) +} + +// JSONStr returns v marshalled as a JSON string, or an empty string when marshalling fails +func JSONStr(v interface{}) string { + b, err := json.Marshal(v) + if err != nil { + return "" + } + return string(b) +} + +type Locker struct { + FD *os.File +} + +// Close closes the lock file if one is open +func (l *Locker) Close() error { + if l.FD != nil { + return l.FD.Close() + } + return nil +} + +// GrabFileLock opens lockfilePath and polls grabFileLock every 200ms until it succeeds or fileLockTimeOut elapses +func GrabFileLock(lockfilePath string) (*Locker, error) { + var success bool + var err error + l := &Locker{} + defer func(l *Locker) { + if !success { + l.Close() + } + }(l) + + l.FD, err = os.OpenFile(lockfilePath, os.O_CREATE, 0600) + if err != nil { + return nil, fmt.Errorf("failed to open lock %s: %v", lockfilePath, err) + } + if err := wait.PollImmediate(200*time.Millisecond, fileLockTimeOut, func() (bool, error) { + if err := grabFileLock(l.FD); err != nil { + return false, nil + } + return true, nil + }); err != nil { + return nil,
fmt.Errorf("failed to acquire lock %s: %v", lockfilePath, err) + } + success = true + return l, nil +} + +func grabFileLock(f *os.File) error { + return unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB) +} diff --git a/plugin/terway/cni.go b/plugin/terway/cni.go index 634c88de..471590bf 100644 --- a/plugin/terway/cni.go +++ b/plugin/terway/cni.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net" + "os" "runtime" "strings" "time" @@ -13,6 +14,7 @@ import ( "github.com/AliyunContainerService/terway/plugin/driver" "github.com/AliyunContainerService/terway/rpc" "github.com/AliyunContainerService/terway/version" + "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/current" @@ -20,14 +22,14 @@ import ( "github.com/containernetworking/plugins/pkg/ipam" "github.com/containernetworking/plugins/pkg/ns" "github.com/pkg/errors" - "github.com/vishvananda/netlink" "google.golang.org/grpc" ) const ( defaultSocketPath = "/var/run/eni/eni.socket" defaultVethPrefix = "cali" - defaultCniTimeout = 120 + defaultCniTimeout = 120 * time.Second + defaultEventTimeout = 10 * time.Second defaultVethForENI = "veth1" delegateIpam = "host-local" eniIPVirtualTypeIPVlan = "ipvlan" @@ -44,6 +46,9 @@ const ( } } ` + + terwayCNILock = "/var/run/eni/terway_cni.lock" + terwayCNIDebugLogPath = "/tmp/terway_debug.log" ) func init() { @@ -73,6 +78,9 @@ type NetConf struct { // HostStackCIDRs is a list of CIDRs, all traffic targeting these CIDRs will be redirected to host network stack HostStackCIDRs []string `json:"host_stack_cidrs"` + + // Debug enables debug logging for the CNI commands + Debug bool `json:"debug"` } // K8SArgs is cni args of kubernetes @@ -84,38 +92,53 @@ type K8SArgs struct { K8S_POD_INFRA_CONTAINER_ID types.UnmarshallableString // nolint } -var networkDriver = driver.VethDriver +var vethDriver = driver.VethDriver var eniMultiIPDriver = driver.VethDriver var nicDriver = driver.NicDriver -func cmdAdd(args *skel.CmdArgs) error { +func
parseCmdArgs(args *skel.CmdArgs) (string, ns.NetNS, *NetConf, *K8SArgs, error) { versionDecoder := &cniversion.ConfigDecoder{} - var ( - confVersion string - cniNetns ns.NetNS - ) - confVersion, err := versionDecoder.Decode(args.StdinData) if err != nil { - return err + return "", nil, nil, nil, err } - - cniNetns, err = ns.GetNS(args.Netns) + netNS, err := ns.GetNS(args.Netns) if err != nil { - return err + return "", nil, nil, nil, err } conf := NetConf{} if err = json.Unmarshal(args.StdinData, &conf); err != nil { - return errors.Wrap(err, "add cmd: error loading config from args") + return "", nil, nil, nil, errors.Wrap(err, "error loading config from args") } k8sConfig := K8SArgs{} if err = types.LoadArgs(args.Args, &k8sConfig); err != nil { - return errors.Wrap(err, "add cmd: failed to load k8s config from args") + return "", nil, nil, nil, errors.Wrap(err, "error loading config from args") + } + + return confVersion, netNS, &conf, &k8sConfig, nil +} + +func cmdAdd(args *skel.CmdArgs) error { + confVersion, cniNetns, conf, k8sConfig, err := parseCmdArgs(args) + if err != nil { + return err } + defer cniNetns.Close() - if err = driver.EnsureHostNsConfig(); err != nil { + if conf.Debug { + fd, err := os.OpenFile(terwayCNIDebugLogPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + if err == nil { + defer fd.Close() + driver.Log.SetDebug(true, fd) + } + } + driver.Log.Debugf("args: %s", driver.JSONStr(args)) + driver.Log.Debugf("cmdAdd: ns %s , k8s %s, cni std %s", cniNetns.Path(), driver.JSONStr(k8sConfig), driver.JSONStr(conf)) + + err = driver.EnsureHostNsConfig() + if err != nil { return errors.Wrapf(err, "add cmd: failed setup host namespace configs") } @@ -127,14 +150,14 @@ func cmdAdd(args *skel.CmdArgs) error { } defer closeConn() - timeoutContext, cancel := context.WithTimeout(context.Background(), defaultCniTimeout*time.Second) + timeoutContext, cancel := context.WithTimeout(context.Background(), defaultCniTimeout) defer cancel() defer func() { if err != 
nil { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), defaultEventTimeout) defer cancel() - _, err = terwayBackendClient.RecordEvent(ctx, + _, _ = terwayBackendClient.RecordEvent(ctx, &rpc.EventRequest{ EventTarget: rpc.EventTarget_EventTargetPod, K8SPodName: string(k8sConfig.K8S_POD_NAME), @@ -168,9 +191,9 @@ func cmdAdd(args *skel.CmdArgs) error { defer func() { if err != nil { - ctx, cancel := context.WithTimeout(context.Background(), defaultCniTimeout*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), defaultCniTimeout) defer cancel() - _, err = terwayBackendClient.ReleaseIP(ctx, + _, _ = terwayBackendClient.ReleaseIP(ctx, &rpc.ReleaseIPRequest{ K8SPodName: string(k8sConfig.K8S_POD_NAME), K8SPodNamespace: string(k8sConfig.K8S_POD_NAMESPACE), @@ -255,13 +278,18 @@ func cmdAdd(args *skel.CmdArgs) error { eniMultiIPDriver = driver.IPVlanDriver } } + l, lockErr := driver.GrabFileLock(terwayCNILock) + if lockErr != nil { + err = lockErr // fail ADD instead of reporting success without any network setup + return err + } + defer l.Close() err = eniMultiIPDriver.Setup(hostVethName, args.IfName, subnet, primaryIP, serviceCIDR, hostStackCIDRs, gw, nil, int(deviceID), ingress, egress, cniNetns) if err != nil { return fmt.Errorf("setup network failed: %v", err) } allocatedIPAddr = *subnet allocatedGatewayAddr = gw - case rpc.IPType_TypeVPCIP: if allocResult.GetVpcIp() == nil || allocResult.GetVpcIp().GetPodConfig() == nil || allocResult.GetVpcIp().NodeCidr == "" { @@ -298,8 +326,13 @@ func cmdAdd(args *skel.CmdArgs) error { ingress := allocResult.GetVpcIp().GetPodConfig().GetIngress() egress := allocResult.GetVpcIp().GetPodConfig().GetEgress() - - err = networkDriver.Setup(hostVethName, args.IfName, &podIPAddr, nil, nil, nil, gateway, nil, 0, ingress, egress, cniNetns) + l, lockErr := driver.GrabFileLock(terwayCNILock) + if lockErr != nil { + err = lockErr // fail ADD instead of reporting success without any network setup + return err + } + defer l.Close() + err = vethDriver.Setup(hostVethName,
args.IfName, &podIPAddr, nil, nil, nil, gateway, nil, 0, ingress, egress, cniNetns) if err != nil { return fmt.Errorf("setup network failed: %v", err) } @@ -335,26 +368,9 @@ func cmdAdd(args *skel.CmdArgs) error { return fmt.Errorf("error get gw from alloc result: %s", allocResult.GetVpcEni().GetEniConfig().GetGateway()) } - deviceNumber := 0 - if allocResult.GetVpcEni().GetEniConfig().GetMacAddr() == "" { - return fmt.Errorf("error get devicenumber from alloc result: %v", allocResult.GetVpcEni().GetEniConfig().GetMacAddr()) - } - var linkList []netlink.Link - linkList, err = netlink.LinkList() - found := false - for _, enilink := range linkList { - if enilink.Attrs().HardwareAddr.String() == allocResult.GetVpcEni().GetEniConfig().GetMacAddr() { - deviceNumber = enilink.Attrs().Index - found = true - break - } - } - if !found { - return fmt.Errorf("error get allocated mac address for eni: %s", allocResult.GetVpcEni().GetEniConfig().GetMacAddr()) - } - - if deviceNumber == 0 { - return fmt.Errorf("invaild device number: %v", deviceNumber) + deviceNumber, err := link.GetDeviceNumber(allocResult.GetVpcEni().GetEniConfig().GetMacAddr()) + if err != nil { + return err } extraRoutes := []*types.Route{ @@ -366,14 +382,20 @@ func cmdAdd(args *skel.CmdArgs) error { ingress := allocResult.GetVpcEni().GetPodConfig().GetIngress() egress := allocResult.GetVpcEni().GetPodConfig().GetEgress() - err = networkDriver.Setup(hostVethName, defaultVethForENI, eniAddrSubnet, nil, nil, nil, gw, extraRoutes, 0, ingress, egress, cniNetns) + l, lockErr := driver.GrabFileLock(terwayCNILock) + if lockErr != nil { + err = lockErr // fail ADD instead of reporting success without any network setup + return err + } + defer l.Close() + err = vethDriver.Setup(hostVethName, defaultVethForENI, eniAddrSubnet, nil, nil, nil, gw, extraRoutes, 0, ingress, egress, cniNetns) if err != nil { return fmt.Errorf("setup veth network for eni failed: %v", err) } defer func() { if err != nil { - if e := networkDriver.Teardown(hostVethName, args.IfName, cniNetns, nil); e != nil { +
if e := vethDriver.Teardown(hostVethName, args.IfName, cniNetns, nil); e != nil { err = errors.Wrapf(err, "tear down veth network for eni failed: %v", e) } } @@ -397,7 +419,7 @@ func cmdAdd(args *skel.CmdArgs) error { }}, } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), defaultEventTimeout) defer cancel() _, _ = terwayBackendClient.RecordEvent(ctx, &rpc.EventRequest{ @@ -413,36 +435,31 @@ func cmdAdd(args *skel.CmdArgs) error { } func cmdDel(args *skel.CmdArgs) error { - versionDecoder := &cniversion.ConfigDecoder{} - confVersion, err := versionDecoder.Decode(args.StdinData) + confVersion, cniNetns, conf, k8sConfig, err := parseCmdArgs(args) if err != nil { return err } + defer cniNetns.Close() - cniNetns, err := ns.GetNS(args.Netns) - if err != nil { - return err - } - - conf := NetConf{} - if err := json.Unmarshal(args.StdinData, &conf); err != nil { - return errors.Wrap(err, "add cmd: error loading config from args") - } - - k8sConfig := K8SArgs{} - if err := types.LoadArgs(args.Args, &k8sConfig); err != nil { - return errors.Wrap(err, "add cmd: failed to load k8s config from args") + if conf.Debug { + fd, err := os.OpenFile(terwayCNIDebugLogPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + if err == nil { + defer fd.Close() + driver.Log.SetDebug(true, fd) + } } + driver.Log.Debugf("args: %s", driver.JSONStr(args)) + driver.Log.Debugf("cmdDel: ns %s , k8s %s, cni std %s", cniNetns.Path(), driver.JSONStr(k8sConfig), driver.JSONStr(conf)) terwayBackendClient, closeConn, err := getNetworkClient() if err != nil { - return errors.Wrap(err, fmt.Sprintf("add cmd: create grpc client, pod: %s-%s", + return errors.Wrap(err, fmt.Sprintf("del cmd: create grpc client, pod: %s-%s", string(k8sConfig.K8S_POD_NAMESPACE), string(k8sConfig.K8S_POD_NAME), )) } defer closeConn() - timeoutContext, cancel := context.WithTimeout(context.Background(), defaultCniTimeout*time.Second) + timeoutContext, 
cancel := context.WithTimeout(context.Background(), defaultCniTimeout) defer cancel() infoResult, err := terwayBackendClient.GetIPInfo( @@ -454,7 +471,7 @@ func cmdDel(args *skel.CmdArgs) error { }) if err != nil { - return errors.Wrap(err, fmt.Sprintf("add cmd: error get ip info from grpc call, pod: %s-%s", + return errors.Wrap(err, fmt.Sprintf("del cmd: error get ip info from grpc call, pod: %s-%s", string(k8sConfig.K8S_POD_NAMESPACE), string(k8sConfig.K8S_POD_NAME), )) } @@ -478,7 +495,12 @@ func cmdDel(args *skel.CmdArgs) error { if podIP == nil { return errors.Wrapf(err, "invalid pod ip %s", infoResult.GetPodIP()) } - + l, lockErr := driver.GrabFileLock(terwayCNILock) + if lockErr != nil { + driver.Log.Debug(lockErr) + return lockErr // report the lock failure instead of silently skipping teardown + } + defer l.Close() err = eniMultiIPDriver.Teardown(hostVethName, args.IfName, cniNetns, podIP) if err != nil { return errors.Wrapf(err, "error teardown network for pod: %s-%s", @@ -490,7 +512,13 @@ func cmdDel(args *skel.CmdArgs) error { if err != nil { return fmt.Errorf("get info return subnet is not vaild: %v", infoResult.GetNodeCidr()) } - err = networkDriver.Teardown(hostVethName, args.IfName, cniNetns, nil) + l, lockErr := driver.GrabFileLock(terwayCNILock) + if lockErr != nil { + driver.Log.Debug(lockErr) + return lockErr // report the lock failure instead of silently skipping teardown + } + defer l.Close() + err = vethDriver.Teardown(hostVethName, args.IfName, cniNetns, nil) if err != nil { return errors.Wrapf(err, "error teardown network for pod: %s-%s", string(k8sConfig.K8S_POD_NAMESPACE), string(k8sConfig.K8S_POD_NAME)) @@ -503,7 +531,13 @@ func cmdDel(args *skel.CmdArgs) error { } case rpc.IPType_TypeVPCENI: - _ = networkDriver.Teardown(hostVethName, defaultVethForENI, cniNetns, nil) + l, lockErr := driver.GrabFileLock(terwayCNILock) + if lockErr != nil { + driver.Log.Debug(lockErr) + return lockErr // report the lock failure instead of silently skipping teardown + } + defer l.Close() + _ = vethDriver.Teardown(hostVethName, defaultVethForENI, cniNetns, nil) // ignore ENI veth release error //if err != nil { // // ignore ENI veth release error @@ -540,11 +574,169 @@ func cmdDel(args
*skel.CmdArgs) error {
 }
 
 func cmdCheck(args *skel.CmdArgs) error {
+	_, cniNetns, conf, k8sConfig, err := parseCmdArgs(args)
+	if err != nil {
+		return err
+	}
+	defer cniNetns.Close()
+
+	if conf.Debug { // debug mode: hand an append-only log file to driver.Log.SetDebug
+		fd, err := os.OpenFile(terwayCNIDebugLogPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+		if err == nil { // a failure to open the debug log is ignored; the check continues without it
+			defer fd.Close()
+			driver.Log.SetDebug(true, fd)
+		}
+	}
+	driver.Log.Debugf("args: %s", driver.JSONStr(args))
+	driver.Log.Debugf("cmdCheck: ns %s , k8s %s, cni std %s", cniNetns.Path(), driver.JSONStr(k8sConfig), driver.JSONStr(conf))
+
+	terwayBackendClient, closeConn, err := getNetworkClient()
+	if err != nil {
+		return errors.Wrap(err, fmt.Sprintf("check cmd: create grpc client, pod: %s-%s",
+			string(k8sConfig.K8S_POD_NAMESPACE), string(k8sConfig.K8S_POD_NAME),
+		))
+	}
+	defer closeConn()
+
+	timeoutContext, cancel := context.WithTimeout(context.Background(), defaultCniTimeout)
+	defer cancel()
+	allocResult, err := terwayBackendClient.AllocIP(
+		timeoutContext,
+		&rpc.AllocIPRequest{
+			Netns:                  args.Netns,
+			K8SPodName:             string(k8sConfig.K8S_POD_NAME),
+			K8SPodNamespace:        string(k8sConfig.K8S_POD_NAMESPACE),
+			K8SPodInfraContainerId: string(k8sConfig.K8S_POD_INFRA_CONTAINER_ID),
+			IfName:                 args.IfName,
+		})
+
+	if err != nil {
+		driver.Log.Debug(err)
+		return nil // an AllocIP error is only logged at debug level; cmdCheck still reports success
+	}
+	if !allocResult.Success {
+		return fmt.Errorf("error on alloc eip from terway backend")
+	}
+
+	hostVethName, _ := link.VethNameForPod(string(k8sConfig.K8S_POD_NAME), string(k8sConfig.K8S_POD_NAMESPACE), defaultVethPrefix) // error result is discarded
+
+	switch allocResult.IPType {
+	case rpc.IPType_TypeENIMultiIP:
+		if allocResult.GetENIMultiIP() == nil ||
+			allocResult.GetENIMultiIP().GetEniConfig() == nil {
+			return nil // no ENI config in the reply: nothing to verify
+		}
+		containerIPv4Addr, err := ParseAddr(allocResult.GetENIMultiIP().GetEniConfig().GetIPv4Addr(), allocResult.GetENIMultiIP().GetEniConfig().GetIPv4Subnet())
+		if err != nil {
+			driver.Log.Debug(err)
+			return nil
+		}
+
+		gw, err := ParesIP(allocResult.GetENIMultiIP().GetEniConfig().GetGateway())
+		if err != nil {
+			driver.Log.Debug(err)
+			return nil
+		}
+
+		cfg := driver.CheckConfig{
+			RecordPodEvent: func(msg string) { // sends msg to the backend as a "ConfigCheck" warning event for this pod
+				ctx, cancel := context.WithTimeout(context.Background(), defaultEventTimeout)
+				defer cancel()
+				_, _ = terwayBackendClient.RecordEvent(ctx,
+					&rpc.EventRequest{
+						EventTarget:     rpc.EventTarget_EventTargetPod,
+						K8SPodName:      string(k8sConfig.K8S_POD_NAME),
+						K8SPodNamespace: string(k8sConfig.K8S_POD_NAMESPACE),
+						EventType:       rpc.EventType_EventTypeWarning,
+						Reason:          "ConfigCheck",
+						Message:         msg,
+					})
+			},
+			NetNS:           cniNetns,
+			ContainerIFName: args.IfName,
+			HostVethName:    hostVethName,
+			IPv4Addr:        containerIPv4Addr,
+			Gateway:         gw,
+			DeviceID:        allocResult.GetENIMultiIP().GetEniConfig().GetDeviceNumber(),
+		}
+		if strings.ToLower(conf.ENIIPVirtualType) == eniIPVirtualTypeIPVlan {
+			ok, err := driver.CheckIPVLanAvailable()
+			if err != nil {
+				driver.Log.Debug(err)
+				return nil
+			}
+			if ok {
+				eniMultiIPDriver = driver.IPVlanDriver // reassigns the package-level driver used by Check below
+			}
+		}
+		l, err := driver.GrabFileLock(terwayCNILock) // lock is released by the deferred Close when cmdCheck returns
+		if err != nil {
+			driver.Log.Debug(err)
+			return nil
+		}
+		defer l.Close()
+		err = eniMultiIPDriver.Check(&cfg)
+		if err != nil {
+			driver.Log.Debug(err)
+			return nil
+		}
+	case rpc.IPType_TypeVPCIP: // nothing is verified for this IP type
+		return nil
+	case rpc.IPType_TypeVPCENI:
+		if allocResult.GetVpcEni() == nil ||
+			allocResult.GetVpcEni().GetServiceCidr() == "" ||
+			allocResult.GetVpcEni().GetEniConfig() == nil {
+			return nil
+		}
+
+		containerIPv4Addr, err := ParseAddr(allocResult.GetVpcEni().GetEniConfig().GetIPv4Addr(), allocResult.GetVpcEni().GetEniConfig().GetIPv4Subnet())
+		if err != nil { // NOTE(review): unlike the ENIMultiIP branch, this error is dropped without a debug log
+			return nil
+		}
+
+		gw, err := ParesIP(allocResult.GetVpcEni().GetEniConfig().GetGateway())
+		if err != nil { // NOTE(review): dropped without a debug log, see above
+			return nil
+		}
+
+		cfg := &driver.CheckConfig{
+			RecordPodEvent: func(msg string) { // sends msg to the backend as a "ConfigCheck" warning event for this pod
+				ctx, cancel := context.WithTimeout(context.Background(), defaultEventTimeout)
+				defer cancel()
+				_, _ = terwayBackendClient.RecordEvent(ctx,
+					&rpc.EventRequest{
+						EventTarget:     rpc.EventTarget_EventTargetPod,
+						K8SPodName:      string(k8sConfig.K8S_POD_NAME),
+						K8SPodNamespace: string(k8sConfig.K8S_POD_NAMESPACE),
+						EventType:       rpc.EventType_EventTypeWarning,
+						Reason:          "ConfigCheck",
+						Message:         msg,
+					})
+			},
+			NetNS:           cniNetns,
+			ContainerIFName: args.IfName,
+			IPv4Addr:        containerIPv4Addr,
+			Gateway:         gw,
+		}
+		l, err := driver.GrabFileLock(terwayCNILock) // lock is released by the deferred Close when cmdCheck returns
+		if err != nil {
+			driver.Log.Debug(err)
+			return nil
+		}
+		defer l.Close()
+		err = nicDriver.Check(cfg)
+		if err != nil {
+			driver.Log.Debug(err)
+			return nil
+		}
+	default:
+		return nil
+	}
 	return nil
 }
 
 func getNetworkClient() (rpc.TerwayBackendClient, func(), error) {
-	timeoutCtx, cancel := context.WithTimeout(context.Background(), defaultCniTimeout*time.Second)
+	timeoutCtx, cancel := context.WithTimeout(context.Background(), defaultCniTimeout)
 	grpcConn, err := grpc.DialContext(timeoutCtx, defaultSocketPath, grpc.WithInsecure(), grpc.WithContextDialer(
 		func(ctx context.Context, s string) (net.Conn, error) {
 			unixAddr, err := net.ResolveUnixAddr("unix", defaultSocketPath)
@@ -565,3 +757,26 @@ func getNetworkClient() (rpc.TerwayBackendClient, func(), error) {
 		cancel()
 	}, nil
 }
+
+// ParseAddr parses ip and the CIDR string subnet and returns the subnet's *net.IPNet with its IP field replaced by ip.
+func ParseAddr(ip string, subnet string) (*net.IPNet, error) {
+	i, err := ParesIP(ip)
+	if err != nil {
+		return nil, err
+	}
+	_, s, err := net.ParseCIDR(subnet)
+	if err != nil {
+		return nil, fmt.Errorf("parseCIDR failed [%s]", subnet)
+	}
+	s.IP = i
+	return s, nil
+}
+
+// ParesIP parses ip with net.ParseIP and returns an error naming the input when it is not a valid IP address.
+func ParesIP(ip string) (net.IP, error) {
+	i := net.ParseIP(ip)
+	if i == nil {
+		return nil, fmt.Errorf("parseIP failed [%s]", ip) // report the rejected input; i is nil on this branch
+	}
+	return i, nil
+}