Skip to content

Commit

Permalink
Merge pull request #688 from l1b0k/release-1.9
Browse files Browse the repository at this point in the history
backport fix for 1.9
  • Loading branch information
BSWANG authored Sep 13, 2024
2 parents 94795f0 + 38eeb31 commit 325d07e
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 8 deletions.
6 changes: 3 additions & 3 deletions pkg/aliyun/credential/aliyun_client_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *ClientMgr) refreshToken() (bool, error) {
if err != nil {
return false, err
}
c.ecs.SetEndpointRules(c.ecs.EndpointMap, "regional", "public")
c.ecs.SetEndpointRules(c.ecs.EndpointMap, "regional", "vpc")

if c.ecsDomainOverride != "" {
c.ecs.Domain = c.ecsDomainOverride
Expand All @@ -191,7 +191,7 @@ func (c *ClientMgr) refreshToken() (bool, error) {
if err != nil {
return false, err
}
c.vpc.SetEndpointRules(c.vpc.EndpointMap, "regional", "public")
c.vpc.SetEndpointRules(c.vpc.EndpointMap, "regional", "vpc")

if c.vpcDomainOverride != "" {
c.vpc.Domain = c.vpcDomainOverride
Expand All @@ -201,7 +201,7 @@ func (c *ClientMgr) refreshToken() (bool, error) {
if err != nil {
return false, err
}
c.eflo.SetEndpointRules(c.eflo.EndpointMap, "regional", "public")
c.eflo.SetEndpointRules(c.eflo.EndpointMap, "regional", "vpc")

if c.efloDomainOverride != "" {
c.eflo.Domain = c.efloDomainOverride
Expand Down
38 changes: 37 additions & 1 deletion pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/AliyunContainerService/terway/pkg/metric"
)

const defaultSyncPeriod = 1 * time.Minute

var _ NetworkInterface = &Local{}
var _ Usage = &Local{}
var _ ReportStatus = &Trunk{}
Expand Down Expand Up @@ -178,7 +181,7 @@ func (l *Local) Run(ctx context.Context, podResources []daemon.PodResources, wg

go l.notify(ctx)

go wait.JitterUntil(l.sync, 1*time.Minute, 1.0, true, ctx.Done())
go wait.JitterUntil(l.sync, defaultSyncPeriod, 1.0, true, ctx.Done())

return nil
}
Expand Down Expand Up @@ -371,6 +374,7 @@ func (l *Local) sync() {

syncIPLocked(l.ipv4, ipv4)
syncIPLocked(l.ipv6, ipv6)
report()

l.cond.Broadcast()
}
Expand Down Expand Up @@ -1038,8 +1042,40 @@ func syncIPLocked(lo Set, remote []netip.Addr) {
}
}
}
orphanIP(lo, s)
}

func orphanIP(lo Set, remote sets.Set[netip.Addr]) {
for key := range remote {
if _, ok := lo[key]; !ok {

prev, ok := invalidIPCache.Get(key)
if !ok {
invalidIPCache.Add(key, 1, 5*defaultSyncPeriod)
} else {
invalidIPCache.Add(key, prev.(int)+1, 5*defaultSyncPeriod)
}
} else {
invalidIPCache.Remove(key)
}
}
}

func report() {
for _, key := range invalidIPCache.Keys() {
count, ok := invalidIPCache.Get(key)
if !ok {
continue
}
if count.(int) > 1 {
_ = tracing.RecordNodeEvent(corev1.EventTypeWarning, string(types.ErrResourceInvalid), fmt.Sprintf("orphan ip found on ecs metadata, ip: %s", key))
logf.Log.Info("orphan ip found on ecs metadata", "ip", key)
}
}
}

var invalidIPCache = cache.NewLRUExpireCache(100)

func parseResourceID(id string) (string, string, error) {
parts := strings.SplitN(id, ".", 2)
if len(parts) < 2 {
Expand Down
29 changes: 29 additions & 0 deletions pkg/eni/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/AliyunContainerService/terway/pkg/factory"
"github.com/AliyunContainerService/terway/types"
Expand Down Expand Up @@ -309,3 +311,30 @@ func Test_parseResourceID(t *testing.T) {
})
}
}

func Test_orphanIP(t *testing.T) {
invalidIPCache = cache.NewLRUExpireCache(100)

lo1 := map[netip.Addr]*IP{
netip.MustParseAddr("127.0.0.1"): {
ip: netip.MustParseAddr("127.0.0.1"),
},
}

remote1 := sets.Set[netip.Addr]{
netip.MustParseAddr("127.0.0.1"): {},
netip.MustParseAddr("127.0.0.2"): {},
}

orphanIP(lo1, remote1)

v, _ := invalidIPCache.Get(netip.MustParseAddr("127.0.0.1"))
assert.Equal(t, nil, v)

v, _ = invalidIPCache.Get(netip.MustParseAddr("127.0.0.2"))
assert.Equal(t, 1, v)

orphanIP(lo1, remote1)
v, _ = invalidIPCache.Get(netip.MustParseAddr("127.0.0.2"))
assert.Equal(t, 2, v)
}
2 changes: 1 addition & 1 deletion pkg/eni/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (ip *IP) Allocatable() bool {
return ip.Valid() && !ip.InUse()
}

type Set map[any]*IP
type Set map[netip.Addr]*IP

func (s Set) Idles() []*IP {
var result []*IP
Expand Down
13 changes: 13 additions & 0 deletions pkg/factory/aliyun/aliyun.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,19 @@ func (a *Aliyun) CreateNetworkInterface(ipv4, ipv6 int, eniType string) (*daemon
return r, nil, nil, err
}

// wait mac
err = wait.PollUntilContextTimeout(ctx, metadataPollInterval, metadataWaitTimeout, true, func(ctx context.Context) (bool, error) {
macs, err := metadata.GetENIsMAC()
if err != nil {
klog.Errorf("metadata: error get mac: %v", err)
return false, nil
}
return sets.NewString(macs...).Has(r.MAC), nil
})
if err != nil {
return r, nil, nil, err
}

prefix, err := metadata.GetVSwitchCIDR(eni.MacAddress)
if err != nil {
return r, nil, nil, err
Expand Down
15 changes: 14 additions & 1 deletion plugin/terway/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package main

import (
"context"
"errors"
"fmt"
"net"
"runtime"
"time"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/AliyunContainerService/terway/pkg/link"
"github.com/AliyunContainerService/terway/plugin/datapath"
Expand Down Expand Up @@ -295,7 +298,17 @@ func parseSetupConf(args *skel.CmdArgs, alloc *rpc.NetConf, conf *types.CNIConf,
if alloc.GetENIInfo() != nil {
mac := alloc.GetENIInfo().GetMAC()
if mac != "" {
deviceID, err = link.GetDeviceNumber(mac)
err = retry.OnError(wait.Backoff{
Steps: 10,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0,
}, func(err error) bool {
return errors.Is(err, link.ErrNotFound)
}, func() error {
deviceID, err = link.GetDeviceNumber(mac)
return err
})
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions policy/policyinit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ if [ "$DATASTORE_TYPE" = "kubernetes" ]; then
exit 1
fi
return_code="$(curl -k -o /dev/null -I -L -s -w "%{http_code}" https://"${KUBERNETES_SERVICE_HOST}":"${KUBERNETES_SERVICE_PORT:-443}")"
if [ "$return_code" -ne 403 ]&&[ "$return_code" -ne 200 ]&&[ "$return_code" -ne 201 ];then
if [ "$return_code" -ne 401 ]&&[ "$return_code" -ne 403 ]&&[ "$return_code" -ne 200 ]&&[ "$return_code" -ne 201 ];then
echo "can not access kubernetes service, exiting"
exit 1
fi
Expand Down Expand Up @@ -173,4 +173,4 @@ fi
else
# shellcheck disable=SC2016
exec socat TCP-LISTEN:9099,bind=127.0.0.1,fork,reuseaddr system:'sleep 2;kill -9 $SOCAT_PID 2>/dev/null'
fi
fi

0 comments on commit 325d07e

Please sign in to comment.