Skip to content

Commit

Permalink
Merge pull request #4 from BSWANG/master
Browse files Browse the repository at this point in the history
fix eni-multi-ip state issues
  • Loading branch information
jzwlqx01 authored Mar 5, 2019
2 parents ddb3e3d + 31be2e9 commit a2bbbc8
Show file tree
Hide file tree
Showing 33 changed files with 6,218 additions and 91 deletions.
22 changes: 20 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 44 additions & 32 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
DaemonModeVPC = "VPC"
DaemonModeENIMultiIP = "ENIMultiIP"

gcPeriod = 5 * time.Second
gcPeriod = 5 * time.Minute
)

type networkService struct {
Expand Down Expand Up @@ -424,6 +424,7 @@ func (networkService networkService) startGarbageCollectionLoop() {
gcTicker := time.NewTicker(gcPeriod)
go func() {
for range gcTicker.C {
log.Debugf("do resource gc on node")
networkService.Lock()
pods, err := networkService.k8s.GetLocalPods()
if err != nil {
Expand All @@ -438,8 +439,8 @@ func (networkService networkService) startGarbageCollectionLoop() {
}

var (
inUseList = make(map[string][]string)
expireList = make(map[string][]string)
inUseSet = make(map[string]map[string]interface{})
expireSet = make(map[string]map[string]interface{})
relateExpireList = make([]string, 0)
)

Expand All @@ -451,30 +452,34 @@ func (networkService networkService) startGarbageCollectionLoop() {
}

for _, resRelateObj := range resRelateList {
resRelate := resRelateObj.(*PodResources)
resRelate := resRelateObj.(PodResources)
_, podExist := podKeyMap[podInfoKey(resRelate.PodInfo.Namespace, resRelate.PodInfo.Name)]
if !podExist {
relateExpireList = append(relateExpireList, podInfoKey(resRelate.PodInfo.Namespace, resRelate.PodInfo.Name))
}
for _, res := range resRelate.Resources {
if _, ok := inUseList[res.Type]; !ok {
inUseList[res.Type] = make([]string, 0)
expireList[res.Type] = make([]string, 0)
if _, ok := inUseSet[res.Type]; !ok {
inUseSet[res.Type] = make(map[string]interface{}, 0)
expireSet[res.Type] = make(map[string]interface{}, 0)
}
// already in use by others
if _, ok := inUseSet[res.Type][res.ID]; ok {
continue
}
if podExist {
inUseList[res.Type] = append(inUseList[res.Type], res.ID)
inUseSet[res.Type][res.ID] = struct {}{}
} else {
expireList[res.Type] = append(expireList[res.Type], res.ID)
expireSet[res.Type][res.ID] = struct {}{}
}
}
}
gcDone := true
for mgrType := range inUseList {
for mgrType := range inUseSet {
mgr, ok := networkService.mgrForResource[mgrType]
if ok {
err = mgr.GarbageCollection(inUseList[mgrType], expireList[mgrType])
err = mgr.GarbageCollection(inUseSet[mgrType], expireSet[mgrType])
if err != nil {
log.Warnf("error do garbage collection for %+v, inuse: %v, expire: %v", mgrType, inUseList, expireList)
log.Warnf("error do garbage collection for %+v, inuse: %v, expire: %v, err: %v", mgrType, inUseSet, expireSet, err)
gcDone = false
}
}
Expand Down Expand Up @@ -560,14 +565,30 @@ func newNetworkService(configFilePath, daemonMode string) (rpc.TerwayBackendServ
return nil, errors.Wrapf(err, "error init k8s service")
}

localPods, err := netSrv.k8s.GetLocalPods()
netSrv.resourceDB, err = storage.NewDiskStorage(
ResDBName, ResDBPath, json.Marshal, func(bytes []byte) (interface{}, error) {
resourceRel := &PodResources{}
err = json.Unmarshal(bytes, resourceRel)
if err != nil {
return nil, errors.Wrapf(err, "error unmarshal pod relate resource")
}
return *resourceRel, nil
})
if err != nil {
return nil, errors.Wrapf(err, "error get local pods")
return nil, errors.Wrapf(err, "error init resource manager storage")
}
localIPs := make([]string, 0)
for _, pod := range localPods {
if pod.PodIP != "" {
localIPs = append(localIPs, pod.PodIP)
localResource := make(map[string][]string)
resObjList, err := netSrv.resourceDB.List()
if err != nil {
return nil, errors.Wrapf(err, "error list resource relation db")
}
for _, resObj := range resObjList {
podRes := resObj.(PodResources)
for _, res := range podRes.Resources {
if localResource[res.Type] == nil {
localResource[res.Type] = make([]string, 0)
}
localResource[res.Type] = append(localResource[res.Type], res.ID)
}
}

Expand All @@ -581,7 +602,7 @@ func newNetworkService(configFilePath, daemonMode string) (rpc.TerwayBackendServ
switch daemonMode {
case DaemonModeVPC:
//init eni
netSrv.eniResMgr, err = NewENIResourceManager(poolConfig, ecs, localIPs)
netSrv.eniResMgr, err = NewENIResourceManager(poolConfig, ecs, localResource[types.ResourceTypeENI])
if err != nil {
return nil, errors.Wrapf(err, "error init eni resource manager")
}
Expand All @@ -598,7 +619,7 @@ func newNetworkService(configFilePath, daemonMode string) (rpc.TerwayBackendServ

case DaemonModeENIMultiIP:
//init eni multi ip
netSrv.eniIpResMgr, err = NewENIIPResourceManager(poolConfig, ecs, localIPs)
netSrv.eniIpResMgr, err = NewENIIPResourceManager(poolConfig, ecs, localResource[types.ResourceTypeENIIP])
if err != nil {
return nil, errors.Wrapf(err, "error init eni ip resource manager")
}
Expand All @@ -609,18 +630,9 @@ func newNetworkService(configFilePath, daemonMode string) (rpc.TerwayBackendServ
panic("unsupported daemon mode" + daemonMode)
}

netSrv.resourceDB, err = storage.NewDiskStorage(
ResDBName, ResDBPath, json.Marshal, func(bytes []byte) (interface{}, error) {
resourceRel := &PodResources{}
err = json.Unmarshal(bytes, resourceRel)
if err != nil {
return nil, errors.Wrapf(err, "error unmarshal pod relate resource")
}
return *resourceRel, nil
})
if err != nil {
return nil, errors.Wrapf(err, "error init resource manager storage")
}
//start gc loop
netSrv.startGarbageCollectionLoop()

return netSrv, nil
}

Expand Down
49 changes: 20 additions & 29 deletions daemon/eni-multi-ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type ENIIPFactory struct {

type ENIIP struct {
*types.ENIIP
idle bool
}

type ENI struct {
Expand All @@ -28,18 +27,11 @@ type ENI struct {
ecs aliyun.ECS
}

// 返回一个空闲的IP,将idle设置为false
// 如果没有空闲的,先尝试绑一个
// 尝试绑一个
// 如果满了,返回nil
func (e *ENI) getIdle() *ENIIP {
func (e *ENI) allocateExistENIsIP() *ENIIP {
e.lock.Lock()
defer e.lock.Unlock()
for _, ip := range e.ips {
if ip.idle {
ip.idle = false
return ip
}
}

if len(e.ips) < e.ENI.MaxIPs {
ip, err := e.ecs.AssignIPForENI(e.ENI.ID)
Expand All @@ -52,7 +44,6 @@ func (e *ENI) getIdle() *ENIIP {
Eni: e.ENI,
SecAddress: ip,
},
idle: false,
}
e.ips = append(e.ips, ipNew)
return ipNew
Expand All @@ -63,7 +54,7 @@ func (e *ENI) getIdle() *ENIIP {
func (f *ENIIPFactory) Create() (types.NetworkResource, error) {
f.RLock()
for _, eni := range f.enis {
ip := eni.getIdle()
ip := eni.allocateExistENIsIP()
if ip != nil {
f.RUnlock()
return ip.ENIIP, nil
Expand All @@ -86,23 +77,20 @@ func (f *ENIIPFactory) Create() (types.NetworkResource, error) {
ecs: f.eniFactory.ecs,
}

mainENIIP := &types.ENIIP{
Eni: eni.ENI,
SecAddress: eni.ENI.Address.IP,
}

eni.ips = append(eni.ips, &ENIIP{
ENIIP: &types.ENIIP{
Eni: eni.ENI,
SecAddress: eni.ENI.Address.IP,
},
idle: true,
ENIIP: mainENIIP,
})

f.Lock()
f.enis = append(f.enis, eni)
f.Unlock()

ip := eni.getIdle()
if ip == nil {
return nil, fmt.Errorf("can not get idle ip from newly created eni %s", eni.ID)
}
return ip.ENIIP, nil
return mainENIIP, nil
}

func (f *ENIIPFactory) Dispose(res types.NetworkResource) error {
Expand All @@ -126,7 +114,6 @@ func (f *ENIIPFactory) Dispose(res types.NetworkResource) error {
if eni == nil || eniip == nil {
return fmt.Errorf("invalid resource to dispose")
}
eniip.idle = true

ips, err := f.eniFactory.ecs.GetENIIPs(ip.Eni.ID)

Expand All @@ -152,6 +139,11 @@ func (f *ENIIPFactory) Dispose(res types.NetworkResource) error {
return nil
}

// main ip of eni, raise put_it_back error
if ip.Eni.Address.IP.Equal(ip.SecAddress) {
return fmt.Errorf("ip tobe release is primary ip of eni")
}

err = f.eniFactory.ecs.UnAssignIPForENI(ip.Eni.ID, ip.SecAddress)
if err != nil {
return fmt.Errorf("error unassign eniip, %v", err)
Expand All @@ -177,7 +169,7 @@ type ENIIPResourceManager struct {
pool pool.ObjectPool
}

func NewENIIPResourceManager(poolConfig *types.PoolConfig, ecs aliyun.ECS, allocatedIPs []string) (ResourceManager, error) {
func NewENIIPResourceManager(poolConfig *types.PoolConfig, ecs aliyun.ECS, allocatedResources []string) (ResourceManager, error) {
eniFactory, err := NewENIFactory(poolConfig, ecs)
if err != nil {
return nil, errors.Wrapf(err, "error get eni factory for eniip factory")
Expand Down Expand Up @@ -209,7 +201,7 @@ func NewENIIPResourceManager(poolConfig *types.PoolConfig, ecs aliyun.ECS, alloc
return errors.Wrapf(err, "error get attach eni on pool init")
}
stubMap := make(map[string]bool)
for _, allocated := range allocatedIPs {
for _, allocated := range allocatedResources {
stubMap[allocated] = true
}

Expand All @@ -230,11 +222,10 @@ func NewENIIPResourceManager(poolConfig *types.PoolConfig, ecs aliyun.ECS, alloc
Eni: eni,
SecAddress: ip,
}
_, ok := stubMap[ip.String()]
_, ok := stubMap[eniIP.GetResourceId()]

poolENI.ips = append(poolENI.ips, &ENIIP{
ENIIP: eniIP,
idle: !ok,
})
if !ok {
holder.AddIdle(eniIP)
Expand Down Expand Up @@ -263,8 +254,8 @@ func (m *ENIIPResourceManager) Release(context *NetworkContext, resId string) er
return m.pool.Release(resId)
}

func (m *ENIIPResourceManager) GarbageCollection(inUseResList []string, expireResList []string) error {
for _, expireRes := range expireResList {
func (m *ENIIPResourceManager) GarbageCollection(inUseSet map[string]interface{}, expireResSet map[string]interface{}) error {
for expireRes := range expireResSet {
if err := m.pool.Stat(expireRes); err == nil {
err = m.Release(nil, expireRes)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions daemon/eni.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ENIResourceManager struct {
ecs aliyun.ECS
}

func NewENIResourceManager(poolConfig *types.PoolConfig, ecs aliyun.ECS, allocatedIPs []string) (ResourceManager, error) {
func NewENIResourceManager(poolConfig *types.PoolConfig, ecs aliyun.ECS, allocatedResource []string) (ResourceManager, error) {
factory, err := NewENIFactory(poolConfig, ecs)
if err != nil {
return nil, errors.Wrapf(err, "error create eni factory")
Expand All @@ -42,11 +42,11 @@ func NewENIResourceManager(poolConfig *types.PoolConfig, ecs aliyun.ECS, allocat
return errors.Wrapf(err, "error get attach eni on pool init")
}
allocatedMap := make(map[string]bool)
for _, allocated := range allocatedIPs {
for _, allocated := range allocatedResource {
allocatedMap[allocated] = true
}
for _, e := range enis {
if _, ok := allocatedMap[e.Address.IP.String()]; ok {
if _, ok := allocatedMap[e.ID]; ok {
holder.AddInuse(e)
} else {
holder.AddIdle(e)
Expand Down Expand Up @@ -81,8 +81,8 @@ func (m *ENIResourceManager) Release(context *NetworkContext, resId string) erro
return m.pool.Release(resId)
}

func (m *ENIResourceManager) GarbageCollection(inUseResList []string, expireResList []string) error {
for _, expireRes := range expireResList {
func (m *ENIResourceManager) GarbageCollection(inUseSet map[string]interface{}, expireResSet map[string]interface{}) error {
for expireRes := range expireResSet {
if err := m.pool.Stat(expireRes); err == nil {
err = m.Release(nil, expireRes)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion daemon/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func (p PodResources) GetResourceItemByType(resType string) []ResourceItem {
type ResourceManager interface {
Allocate(context *NetworkContext, prefer string) (types.NetworkResource, error)
Release(context *NetworkContext, resId string) error
GarbageCollection(inUseResList []string, expireResList []string) error
GarbageCollection(inUseResList map[string]interface{}, expireResList map[string]interface{}) error
}
2 changes: 1 addition & 1 deletion daemon/veth.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (*VethResourceManager) Release(context *NetworkContext, resId string) error
return nil
}

func (f *VethResourceManager) GarbageCollection(inUseResList []string, expireResList []string) error {
func (f *VethResourceManager) GarbageCollection(inUseSet map[string]interface{}, expireResSet map[string]interface{}) error {
// fixme do gc on cni binary
lock, err := disk.NewFileLock(defaultIpamPath)
if err != nil {
Expand Down
Loading

0 comments on commit a2bbbc8

Please sign in to comment.