diff --git a/agent.go b/agent.go index d471ef9cd0..bdf3a6ba68 100644 --- a/agent.go +++ b/agent.go @@ -635,14 +635,15 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error { } buf, err := proto.Marshal(&EndpointRecord{ - Name: name, - ServiceName: ep.svcName, - ServiceID: ep.svcID, - VirtualIP: ep.virtualIP.String(), - IngressPorts: ingressPorts, - Aliases: ep.svcAliases, - TaskAliases: ep.myAliases, - EndpointIP: ep.Iface().Address().IP.String(), + Name: name, + ServiceName: ep.svcName, + ServiceID: ep.svcID, + VirtualIP: ep.virtualIP.String(), + IngressPorts: ingressPorts, + Aliases: ep.svcAliases, + TaskAliases: ep.myAliases, + EndpointIP: ep.Iface().Address().IP.String(), + ServiceDisabled: false, }) if err != nil { return err @@ -660,7 +661,7 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error { return nil } -func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error { +func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, fullRemove bool, method string) error { if ep.isAnonymous() && len(ep.myAliases) == 0 { return nil } @@ -674,6 +675,15 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err defer sb.Service.Unlock() logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID()) + // Avoid a race w/ with a container that aborts preemptively. This would + // get caught in disableServceInNetworkDB, but we check here to make the + // nature of the condition more clear. + // See comment in addServiceInfoToCluster() + if e := sb.getEndpoint(ep.ID()); e == nil { + logrus.Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID()) + return nil + } + c := n.getController() agent := c.getAgent() @@ -683,9 +693,13 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err } if agent != nil { - // First delete from networkDB then locally - if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil { - logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err) + // First update the networkDB then locally + if fullRemove { + if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil { + logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err) + } + } else { + disableServiceInNetworkDB(agent, n, ep) } } @@ -696,7 +710,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err if n.ingress { ingressPorts = ep.ingressPorts } - if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil { + if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil { return err } } else { @@ -712,6 +726,35 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err return nil } +func disableServiceInNetworkDB(a *agent, n *network, ep *endpoint) { + var epRec EndpointRecord + + logrus.Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID()) + + // Update existing record to indicate that the service is disabled + inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID()) + if err != nil { + logrus.Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err) + return + } + // Should never fail + if err := proto.Unmarshal(inBuf, &epRec); err != nil { + logrus.Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err) + return + } + epRec.ServiceDisabled = true + // Should never fail + outBuf, err := proto.Marshal(&epRec) + if err != nil { + logrus.Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err) + return + } + // Send update to the whole cluster + if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil { + logrus.Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err) + } +} + func (n *network) addDriverWatches() { if !n.isClusterEligible() { return @@ -841,7 +884,6 @@ func (c *controller) handleEpTableEvent(ev events.Event) { nid string eid string value []byte - isAdd bool epRec EndpointRecord ) @@ -850,12 +892,15 @@ func (c *controller) handleEpTableEvent(ev events.Event) { nid = event.NetworkID eid = event.Key value = event.Value - isAdd = true case networkdb.DeleteEvent: nid = event.NetworkID eid = event.Key value = event.Value case networkdb.UpdateEvent: + nid = event.NetworkID + eid = event.Key + value = event.Value + default: logrus.Errorf("Unexpected update service table event = %#v", event) return } @@ -880,7 +925,8 @@ func (c *controller) handleEpTableEvent(ev events.Event) { return } - if isAdd { + switch ev.(type) { + case networkdb.CreateEvent: logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service @@ -894,11 +940,12 @@ func (c *controller) handleEpTableEvent(ev events.Event) { logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err) } } - } else { + + case networkdb.DeleteEvent: logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service - if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil { + if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil { logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err) return } @@ -908,5 +955,18 @@ func (c *controller) handleEpTableEvent(ev events.Event) { logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err) } } + case networkdb.UpdateEvent: + logrus.Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec) + // We currently should only get these to inform us that an endpoint + // is disabled. Report if otherwise. + if svcID == "" || !epRec.ServiceDisabled { + logrus.Errorf("Unexpected update table event for %s epRec:%v", eid, epRec) + return + } + // This is a remote task that is part of a service that is now disabled + if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil { + logrus.Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err) + return + } } } diff --git a/agent.pb.go b/agent.pb.go index 5b969232a7..67391a0dde 100644 --- a/agent.pb.go +++ b/agent.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-gogo. DO NOT EDIT. // source: agent.proto -// DO NOT EDIT! /* Package libnetwork is a generated protocol buffer package. @@ -20,9 +19,6 @@ import math "math" import _ "github.com/gogo/protobuf/gogoproto" import strings "strings" -import github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto" -import sort "sort" -import strconv "strconv" import reflect "reflect" import io "io" @@ -34,7 +30,9 @@ var _ = math.Inf // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. -const _ = proto.GoGoProtoPackageIsVersion1 +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type PortConfig_Protocol int32 @@ -60,7 +58,7 @@ func (PortConfig_Protocol) EnumDescriptor() ([]byte, []int) { return fileDescrip // EndpointRecord specifies all the endpoint specific information that // needs to gossiped to nodes participating in the network. type EndpointRecord struct { - // Name of the endpoint + // Name of the container Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // Service name of the service to which this endpoint belongs. ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"` @@ -76,12 +74,49 @@ type EndpointRecord struct { Aliases []string `protobuf:"bytes,7,rep,name=aliases" json:"aliases,omitempty"` // List of aliases task specific aliases TaskAliases []string `protobuf:"bytes,8,rep,name=task_aliases,json=taskAliases" json:"task_aliases,omitempty"` + // Whether this enpoint's service has been disabled + ServiceDisabled bool `protobuf:"varint,9,opt,name=service_disabled,json=serviceDisabled,proto3" json:"service_disabled,omitempty"` } func (m *EndpointRecord) Reset() { *m = EndpointRecord{} } func (*EndpointRecord) ProtoMessage() {} func (*EndpointRecord) Descriptor() ([]byte, []int) { return fileDescriptorAgent, []int{0} } +func (m *EndpointRecord) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *EndpointRecord) GetServiceName() string { + if m != nil { + return m.ServiceName + } + return "" +} + +func (m *EndpointRecord) GetServiceID() string { + if m != nil { + return m.ServiceID + } + return "" +} + +func (m *EndpointRecord) GetVirtualIP() string { + if m != nil { + return m.VirtualIP + } + return "" +} + +func (m *EndpointRecord) GetEndpointIP() string { + if m != nil { + return m.EndpointIP + } + return "" +} + func (m *EndpointRecord) GetIngressPorts() []*PortConfig { if m != nil { return m.IngressPorts @@ -89,6 +124,27 @@ func (m *EndpointRecord) GetIngressPorts() []*PortConfig { return nil } +func (m *EndpointRecord) GetAliases() []string { + if m != nil { + return m.Aliases + } + return nil +} + +func (m *EndpointRecord) GetTaskAliases() []string { + if m != nil { + return m.TaskAliases + } + return nil +} + +func (m *EndpointRecord) GetServiceDisabled() bool { + if m != nil { + return m.ServiceDisabled + } + return false +} + // PortConfig specifies an exposed port which can be // addressed using the given name. This can be later queried // using a service discovery api or a DNS SRV query. The node @@ -115,6 +171,34 @@ func (m *PortConfig) Reset() { *m = PortConfig{} } func (*PortConfig) ProtoMessage() {} func (*PortConfig) Descriptor() ([]byte, []int) { return fileDescriptorAgent, []int{1} } +func (m *PortConfig) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *PortConfig) GetProtocol() PortConfig_Protocol { + if m != nil { + return m.Protocol + } + return ProtocolTCP +} + +func (m *PortConfig) GetTargetPort() uint32 { + if m != nil { + return m.TargetPort + } + return 0 +} + +func (m *PortConfig) GetPublishedPort() uint32 { + if m != nil { + return m.PublishedPort + } + return 0 +} + func init() { proto.RegisterType((*EndpointRecord)(nil), "libnetwork.EndpointRecord") proto.RegisterType((*PortConfig)(nil), "libnetwork.PortConfig") @@ -124,7 +208,7 @@ func (this *EndpointRecord) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 12) + s := make([]string, 0, 13) s = append(s, "&libnetwork.EndpointRecord{") s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n") @@ -136,6 +220,7 @@ func (this *EndpointRecord) GoString() string { } s = append(s, "Aliases: "+fmt.Sprintf("%#v", this.Aliases)+",\n") s = append(s, "TaskAliases: "+fmt.Sprintf("%#v", this.TaskAliases)+",\n") + s = append(s, "ServiceDisabled: "+fmt.Sprintf("%#v", this.ServiceDisabled)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -160,74 +245,57 @@ func valueToGoStringAgent(v interface{}, typ string) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) } -func extensionToGoStringAgent(e map[int32]github_com_gogo_protobuf_proto.Extension) string { - if e == nil { - return "nil" - } - s := "map[int32]proto.Extension{" - keys := make([]int, 0, len(e)) - for k := range e { - keys = append(keys, int(k)) - } - sort.Ints(keys) - ss := []string{} - for _, k := range keys { - ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString()) - } - s += strings.Join(ss, ",") + "}" - return s -} -func (m *EndpointRecord) Marshal() (data []byte, err error) { +func (m *EndpointRecord) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *EndpointRecord) MarshalTo(data []byte) (int, error) { +func (m *EndpointRecord) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if len(m.Name) > 0 { - data[i] = 0xa + dAtA[i] = 0xa i++ - i = encodeVarintAgent(data, i, uint64(len(m.Name))) - i += copy(data[i:], m.Name) + i = encodeVarintAgent(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) } if len(m.ServiceName) > 0 { - data[i] = 0x12 + dAtA[i] = 0x12 i++ - i = encodeVarintAgent(data, i, uint64(len(m.ServiceName))) - i += copy(data[i:], m.ServiceName) + i = encodeVarintAgent(dAtA, i, uint64(len(m.ServiceName))) + i += copy(dAtA[i:], m.ServiceName) } if len(m.ServiceID) > 0 { - data[i] = 0x1a + dAtA[i] = 0x1a i++ - i = encodeVarintAgent(data, i, uint64(len(m.ServiceID))) - i += copy(data[i:], m.ServiceID) + i = encodeVarintAgent(dAtA, i, uint64(len(m.ServiceID))) + i += copy(dAtA[i:], m.ServiceID) } if len(m.VirtualIP) > 0 { - data[i] = 0x22 + dAtA[i] = 0x22 i++ - i = encodeVarintAgent(data, i, uint64(len(m.VirtualIP))) - i += copy(data[i:], m.VirtualIP) + i = encodeVarintAgent(dAtA, i, uint64(len(m.VirtualIP))) + i += copy(dAtA[i:], m.VirtualIP) } if len(m.EndpointIP) > 0 { - data[i] = 0x2a + dAtA[i] = 0x2a i++ - i = encodeVarintAgent(data, i, uint64(len(m.EndpointIP))) - i += copy(data[i:], m.EndpointIP) + i = encodeVarintAgent(dAtA, i, uint64(len(m.EndpointIP))) + i += copy(dAtA[i:], m.EndpointIP) } if len(m.IngressPorts) > 0 { for _, msg := range m.IngressPorts { - data[i] = 0x32 + dAtA[i] = 0x32 i++ - i = encodeVarintAgent(data, i, uint64(msg.Size())) - n, err := msg.MarshalTo(data[i:]) + i = encodeVarintAgent(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) if err != nil { return 0, err } @@ -236,101 +304,93 @@ func (m *EndpointRecord) MarshalTo(data []byte) (int, error) { } if len(m.Aliases) > 0 { for _, s := range m.Aliases { - data[i] = 0x3a + dAtA[i] = 0x3a i++ l = len(s) for l >= 1<<7 { - data[i] = uint8(uint64(l)&0x7f | 0x80) + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) l >>= 7 i++ } - data[i] = uint8(l) + dAtA[i] = uint8(l) i++ - i += copy(data[i:], s) + i += copy(dAtA[i:], s) } } if len(m.TaskAliases) > 0 { for _, s := range m.TaskAliases { - data[i] = 0x42 + dAtA[i] = 0x42 i++ l = len(s) for l >= 1<<7 { - data[i] = uint8(uint64(l)&0x7f | 0x80) + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) l >>= 7 i++ } - data[i] = uint8(l) + dAtA[i] = uint8(l) i++ - i += copy(data[i:], s) + i += copy(dAtA[i:], s) + } + } + if m.ServiceDisabled { + dAtA[i] = 0x48 + i++ + if m.ServiceDisabled { + dAtA[i] = 1 + } else { + dAtA[i] = 0 } + i++ } return i, nil } -func (m *PortConfig) Marshal() (data []byte, err error) { +func (m *PortConfig) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *PortConfig) MarshalTo(data []byte) (int, error) { +func (m *PortConfig) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if len(m.Name) > 0 { - data[i] = 0xa + dAtA[i] = 0xa i++ - i = encodeVarintAgent(data, i, uint64(len(m.Name))) - i += copy(data[i:], m.Name) + i = encodeVarintAgent(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) } if m.Protocol != 0 { - data[i] = 0x10 + dAtA[i] = 0x10 i++ - i = encodeVarintAgent(data, i, uint64(m.Protocol)) + i = encodeVarintAgent(dAtA, i, uint64(m.Protocol)) } if m.TargetPort != 0 { - data[i] = 0x18 + dAtA[i] = 0x18 i++ - i = encodeVarintAgent(data, i, uint64(m.TargetPort)) + i = encodeVarintAgent(dAtA, i, uint64(m.TargetPort)) } if m.PublishedPort != 0 { - data[i] = 0x20 + dAtA[i] = 0x20 i++ - i = encodeVarintAgent(data, i, uint64(m.PublishedPort)) + i = encodeVarintAgent(dAtA, i, uint64(m.PublishedPort)) } return i, nil } -func encodeFixed64Agent(data []byte, offset int, v uint64) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - data[offset+4] = uint8(v >> 32) - data[offset+5] = uint8(v >> 40) - data[offset+6] = uint8(v >> 48) - data[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32Agent(data []byte, offset int, v uint32) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - return offset + 4 -} -func encodeVarintAgent(data []byte, offset int, v uint64) int { +func encodeVarintAgent(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { - data[offset] = uint8(v&0x7f | 0x80) + dAtA[offset] = uint8(v&0x7f | 0x80) v >>= 7 offset++ } - data[offset] = uint8(v) + dAtA[offset] = uint8(v) return offset + 1 } func (m *EndpointRecord) Size() (n int) { @@ -374,6 +434,9 @@ func (m *EndpointRecord) Size() (n int) { n += 1 + l + sovAgent(uint64(l)) } } + if m.ServiceDisabled { + n += 2 + } return n } @@ -422,6 +485,7 @@ func (this *EndpointRecord) String() string { `IngressPorts:` + strings.Replace(fmt.Sprintf("%v", this.IngressPorts), "PortConfig", "PortConfig", 1) + `,`, `Aliases:` + fmt.Sprintf("%v", this.Aliases) + `,`, `TaskAliases:` + fmt.Sprintf("%v", this.TaskAliases) + `,`, + `ServiceDisabled:` + fmt.Sprintf("%v", this.ServiceDisabled) + `,`, `}`, }, "") return s @@ -447,8 +511,8 @@ func valueToStringAgent(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } -func (m *EndpointRecord) Unmarshal(data []byte) error { - l := len(data) +func (m *EndpointRecord) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -460,7 +524,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -488,7 +552,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -503,7 +567,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Name = string(data[iNdEx:postIndex]) + m.Name = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: if wireType != 2 { @@ -517,7 +581,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -532,7 +596,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.ServiceName = string(data[iNdEx:postIndex]) + m.ServiceName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 3: if wireType != 2 { @@ -546,7 +610,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -561,7 +625,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.ServiceID = string(data[iNdEx:postIndex]) + m.ServiceID = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 2 { @@ -575,7 +639,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -590,7 +654,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.VirtualIP = string(data[iNdEx:postIndex]) + m.VirtualIP = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 5: if wireType != 2 { @@ -604,7 +668,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -619,7 +683,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.EndpointIP = string(data[iNdEx:postIndex]) + m.EndpointIP = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 6: if wireType != 2 { @@ -633,7 +697,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ msglen |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -648,7 +712,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { return io.ErrUnexpectedEOF } m.IngressPorts = append(m.IngressPorts, &PortConfig{}) - if err := m.IngressPorts[len(m.IngressPorts)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { + if err := m.IngressPorts[len(m.IngressPorts)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex @@ -664,7 +728,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -679,7 +743,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Aliases = append(m.Aliases, string(data[iNdEx:postIndex])) + m.Aliases = append(m.Aliases, string(dAtA[iNdEx:postIndex])) iNdEx = postIndex case 8: if wireType != 2 { @@ -693,7 +757,7 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -708,11 +772,31 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.TaskAliases = append(m.TaskAliases, string(data[iNdEx:postIndex])) + m.TaskAliases = append(m.TaskAliases, string(dAtA[iNdEx:postIndex])) iNdEx = postIndex + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ServiceDisabled", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAgent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.ServiceDisabled = bool(v != 0) default: iNdEx = preIndex - skippy, err := skipAgent(data[iNdEx:]) + skippy, err := skipAgent(dAtA[iNdEx:]) if err != nil { return err } @@ -731,8 +815,8 @@ func (m *EndpointRecord) Unmarshal(data []byte) error { } return nil } -func (m *PortConfig) Unmarshal(data []byte) error { - l := len(data) +func (m *PortConfig) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -744,7 +828,7 @@ func (m *PortConfig) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -772,7 +856,7 @@ func (m *PortConfig) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -787,7 +871,7 @@ func (m *PortConfig) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Name = string(data[iNdEx:postIndex]) + m.Name = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: if wireType != 0 { @@ -801,7 +885,7 @@ func (m *PortConfig) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Protocol |= (PortConfig_Protocol(b) & 0x7F) << shift if b < 0x80 { @@ -820,7 +904,7 @@ func (m *PortConfig) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.TargetPort |= (uint32(b) & 0x7F) << shift if b < 0x80 { @@ -839,7 +923,7 @@ func (m *PortConfig) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.PublishedPort |= (uint32(b) & 0x7F) << shift if b < 0x80 { @@ -848,7 +932,7 @@ func (m *PortConfig) Unmarshal(data []byte) error { } default: iNdEx = preIndex - skippy, err := skipAgent(data[iNdEx:]) + skippy, err := skipAgent(dAtA[iNdEx:]) if err != nil { return err } @@ -867,8 +951,8 @@ func (m *PortConfig) Unmarshal(data []byte) error { } return nil } -func skipAgent(data []byte) (n int, err error) { - l := len(data) +func skipAgent(dAtA []byte) (n int, err error) { + l := len(dAtA) iNdEx := 0 for iNdEx < l { var wire uint64 @@ -879,7 +963,7 @@ func skipAgent(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -897,7 +981,7 @@ func skipAgent(data []byte) (n int, err error) { return 0, io.ErrUnexpectedEOF } iNdEx++ - if data[iNdEx-1] < 0x80 { + if dAtA[iNdEx-1] < 0x80 { break } } @@ -914,7 +998,7 @@ func skipAgent(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ length |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -937,7 +1021,7 @@ func skipAgent(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ innerWire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -948,7 +1032,7 @@ func skipAgent(data []byte) (n int, err error) { if innerWireType == 4 { break } - next, err := skipAgent(data[start:]) + next, err := skipAgent(dAtA[start:]) if err != nil { return 0, err } @@ -972,32 +1056,36 @@ var ( ErrIntOverflowAgent = fmt.Errorf("proto: integer overflow") ) +func init() { proto.RegisterFile("agent.proto", fileDescriptorAgent) } + var fileDescriptorAgent = []byte{ - // 413 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0xbf, 0xae, 0xd3, 0x30, - 0x14, 0x87, 0x9b, 0xdb, 0x70, 0x6f, 0x73, 0x72, 0x13, 0xae, 0x2c, 0x84, 0xa2, 0x0e, 0x69, 0xa9, - 0x84, 0x74, 0x07, 0x94, 0x2b, 0x95, 0xb1, 0x13, 0x6d, 0x19, 0xb2, 0xa0, 0xc8, 0xfc, 0x59, 0xa3, - 0xb4, 0x31, 0xc1, 0x6a, 0x88, 0x23, 0xdb, 0x2d, 0x2b, 0x23, 0xe2, 0x1d, 0x98, 0x78, 0x19, 0x26, - 0xc4, 0xc8, 0x84, 0x68, 0x57, 0x16, 0x1e, 0x01, 0xdb, 0x49, 0x5a, 0x21, 0x75, 0x38, 0x92, 0xf3, - 0xfd, 0xbe, 0xe3, 0x1c, 0x1f, 0x70, 0xb3, 0x82, 0x54, 0x32, 0xaa, 0x39, 0x93, 0x0c, 0x41, 0x49, - 0x57, 0x15, 0x91, 0x1f, 0x18, 0xdf, 0x0c, 0x1f, 0x14, 0xac, 0x60, 0x06, 0xdf, 0xe9, 0x53, 0x63, - 0x4c, 0xbe, 0x5f, 0x80, 0xff, 0xbc, 0xca, 0x6b, 0x46, 0x2b, 0x89, 0xc9, 0x9a, 0xf1, 0x1c, 0x21, - 0xb0, 0xab, 0xec, 0x3d, 0x09, 0xac, 0xb1, 0x75, 0xeb, 0x60, 0x73, 0x46, 0x8f, 0xe0, 0x5a, 0x10, - 0xbe, 0xa3, 0x6b, 0x92, 0x9a, 0xec, 0xc2, 0x64, 0x6e, 0xcb, 0x5e, 0x68, 0xe5, 0x09, 0x40, 0xa7, - 0xd0, 0x3c, 0xe8, 0x6b, 0x61, 0xee, 0x1d, 0x7e, 0x8d, 0x9c, 0x97, 0x0d, 0x8d, 0x97, 0xd8, 0x69, - 0x85, 0x38, 0xd7, 0xf6, 0x8e, 0x72, 0xb9, 0xcd, 0xca, 0x94, 0xd6, 0x81, 0x7d, 0xb2, 0xdf, 0x34, - 0x34, 0x4e, 0xb0, 0xd3, 0x0a, 0x71, 0x8d, 0xee, 0xc0, 0x25, 0xed, 0x90, 0x5a, 0xbf, 0x67, 0x74, - 0x5f, 0xe9, 0xd0, 0xcd, 0xae, 0x7c, 0xe8, 0x14, 0xd5, 0x30, 0x03, 0x8f, 0x56, 0x05, 0x27, 0x42, - 0xa4, 0x35, 0xe3, 0x52, 0x04, 0x97, 0xe3, 0xfe, 0xad, 0x3b, 0x7d, 0x18, 0x9d, 0x16, 0x12, 0x25, - 0x2a, 0x58, 0xb0, 0xea, 0x2d, 0x2d, 0xf0, 0x75, 0x2b, 0x6b, 0x24, 0x50, 0x00, 0x57, 0x59, 0x49, - 0x33, 0x41, 0x44, 0x70, 0xa5, 0xda, 0x1c, 0xdc, 0x7d, 0xea, 0x35, 0xc8, 0x4c, 0x6c, 0xd2, 0x2e, - 0x1e, 0x98, 0xd8, 0xd5, 0xec, 0x59, 0x83, 0x26, 0x7f, 0x2c, 0x80, 0xd3, 0xcd, 0x67, 0x97, 0x39, - 0x83, 0x81, 0x59, 0xfe, 0x9a, 0x95, 0x66, 0x91, 0xfe, 0x74, 0x74, 0x7e, 0xae, 0x28, 0x69, 0x35, - 0x7c, 0x6c, 0x40, 0x23, 0x50, 0xbf, 0xe3, 0x05, 0x91, 0xe6, 0x61, 0x66, 0xcf, 0x1e, 0x86, 0x06, - 0xe9, 0x4e, 0xf4, 0x18, 0xfc, 0x7a, 0xbb, 0x2a, 0xa9, 0x78, 0x47, 0xf2, 0xc6, 0xb1, 0x8d, 0xe3, - 0x1d, 0xa9, 0xd6, 0x26, 0x4b, 0x18, 0x74, 0xb7, 0xab, 0x07, 0xf7, 0x5f, 0x2d, 0x92, 0x9b, 0xde, - 0xf0, 0xfe, 0xe7, 0x2f, 0x63, 0xb7, 0xc3, 0x0a, 0xe9, 0xe4, 0xf5, 0x32, 0xb9, 0xb1, 0xfe, 0x4f, - 0x14, 0x1a, 0xda, 0x9f, 0xbe, 0x86, 0xbd, 0x79, 0xf0, 0x73, 0x1f, 0xf6, 0xfe, 0xee, 0x43, 0xeb, - 0xe3, 0x21, 0xb4, 0xbe, 0xa9, 0xfa, 0xa1, 0xea, 0xb7, 0xaa, 0xd5, 0xa5, 0x99, 0xf8, 0xe9, 0xbf, - 0x00, 0x00, 0x00, 0xff, 0xff, 0xc9, 0x63, 0x1a, 0x0f, 0x90, 0x02, 0x00, 0x00, + // 441 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x91, 0x31, 0x6f, 0xd3, 0x40, + 0x18, 0x86, 0x73, 0x4d, 0x68, 0xe3, 0xcf, 0x4d, 0x1a, 0x9d, 0x10, 0x3a, 0x65, 0x70, 0x4c, 0x24, + 0xa4, 0x20, 0xa1, 0x54, 0x2a, 0x63, 0x27, 0x1a, 0x33, 0x78, 0x41, 0xd6, 0xd1, 0xb2, 0x46, 0x4e, + 0x7c, 0x98, 0x53, 0x8d, 0xcf, 0xba, 0xbb, 0x96, 0x95, 0x11, 0xf1, 0x1f, 0x98, 0xf8, 0x33, 0x8c, + 0x8c, 0x4c, 0x15, 0xf5, 0xca, 0xc2, 0xca, 0x86, 0xee, 0x7c, 0xd7, 0xa8, 0x52, 0xb7, 0xf3, 0xf3, + 0x3e, 0x67, 0x7d, 0xf7, 0x7e, 0x10, 0xe6, 0x25, 0xab, 0xf5, 0xb2, 0x91, 0x42, 0x0b, 0x0c, 0x15, + 0xdf, 0xd4, 0x4c, 0x7f, 0x12, 0xf2, 0x72, 0xfa, 0xb8, 0x14, 0xa5, 0xb0, 0xf8, 0xd8, 0x9c, 0x3a, + 0x63, 0xfe, 0x6f, 0x0f, 0xc6, 0xaf, 0xeb, 0xa2, 0x11, 0xbc, 0xd6, 0x94, 0x6d, 0x85, 0x2c, 0x30, + 0x86, 0x41, 0x9d, 0x7f, 0x64, 0x04, 0xc5, 0x68, 0x11, 0x50, 0x7b, 0xc6, 0x4f, 0xe1, 0x50, 0x31, + 0x79, 0xcd, 0xb7, 0x6c, 0x6d, 0xb3, 0x3d, 0x9b, 0x85, 0x8e, 0xbd, 0x31, 0xca, 0x0b, 0x00, 0xaf, + 0xf0, 0x82, 0xf4, 0x8d, 0x70, 0x36, 0x6a, 0x6f, 0x66, 0xc1, 0xdb, 0x8e, 0xa6, 0x09, 0x0d, 0x9c, + 0x90, 0x16, 0xc6, 0xbe, 0xe6, 0x52, 0x5f, 0xe5, 0xd5, 0x9a, 0x37, 0x64, 0xb0, 0xb3, 0xdf, 0x75, + 0x34, 0xcd, 0x68, 0xe0, 0x84, 0xb4, 0xc1, 0xc7, 0x10, 0x32, 0x37, 0xa4, 0xd1, 0x1f, 0x59, 0x7d, + 0xdc, 0xde, 0xcc, 0xc0, 0xcf, 0x9e, 0x66, 0x14, 0xbc, 0x92, 0x36, 0xf8, 0x14, 0x46, 0xbc, 0x2e, + 0x25, 0x53, 0x6a, 0xdd, 0x08, 0xa9, 0x15, 0xd9, 0x8f, 0xfb, 0x8b, 0xf0, 0xe4, 0xc9, 0x72, 0x57, + 0xc8, 0x32, 0x13, 0x52, 0xaf, 0x44, 0xfd, 0x9e, 0x97, 0xf4, 0xd0, 0xc9, 0x06, 0x29, 0x4c, 0xe0, + 0x20, 0xaf, 0x78, 0xae, 0x98, 0x22, 0x07, 0x71, 0x7f, 0x11, 0x50, 0xff, 0x69, 0x6a, 0xd0, 0xb9, + 0xba, 0x5c, 0xfb, 0x78, 0x68, 0xe3, 0xd0, 0xb0, 0x57, 0x4e, 0x79, 0x0e, 0x13, 0x5f, 0x43, 0xc1, + 0x55, 0xbe, 0xa9, 0x58, 0x41, 0x82, 0x18, 0x2d, 0x86, 0xf4, 0xc8, 0xf1, 0xc4, 0xe1, 0xf9, 0x1f, + 0x04, 0xb0, 0x1b, 0xe2, 0xc1, 0xde, 0x4f, 0x61, 0x68, 0xf7, 0xb4, 0x15, 0x95, 0xed, 0x7c, 0x7c, + 0x32, 0x7b, 0xf8, 0x09, 0xcb, 0xcc, 0x69, 0xf4, 0xee, 0x02, 0x9e, 0x41, 0xa8, 0x73, 0x59, 0x32, + 0x6d, 0x3b, 0xb0, 0x2b, 0x19, 0x51, 0xe8, 0x90, 0xb9, 0x89, 0x9f, 0xc1, 0xb8, 0xb9, 0xda, 0x54, + 0x5c, 0x7d, 0x60, 0x45, 0xe7, 0x0c, 0xac, 0x33, 0xba, 0xa3, 0x46, 0x9b, 0x27, 0x30, 0xf4, 0x7f, + 0xc7, 0x04, 0xfa, 0xe7, 0xab, 0x6c, 0xd2, 0x9b, 0x1e, 0x7d, 0xfd, 0x16, 0x87, 0x1e, 0x9f, 0xaf, + 0x32, 0x93, 0x5c, 0x24, 0xd9, 0x04, 0xdd, 0x4f, 0x2e, 0x92, 0x6c, 0x3a, 0xf8, 0xf2, 0x3d, 0xea, + 0x9d, 0x91, 0x5f, 0xb7, 0x51, 0xef, 0xef, 0x6d, 0x84, 0x3e, 0xb7, 0x11, 0xfa, 0xd1, 0x46, 0xe8, + 0x67, 0x1b, 0xa1, 0xdf, 0x6d, 0x84, 0x36, 0xfb, 0x76, 0xe2, 0x97, 0xff, 0x03, 0x00, 0x00, 0xff, + 0xff, 0x4f, 0xa4, 0x1a, 0x30, 0xbb, 0x02, 0x00, 0x00, } diff --git a/agent.proto b/agent.proto index 54c71c0e2a..628f70ba8a 100644 --- a/agent.proto +++ b/agent.proto @@ -37,6 +37,9 @@ message EndpointRecord { // List of aliases task specific aliases repeated string task_aliases = 8; + + // Whether this enpoint's service has been disabled + bool service_disabled = 9; } // PortConfig specifies an exposed port which can be diff --git a/endpoint.go b/endpoint.go index 111b747352..333b62d4ae 100644 --- a/endpoint.go +++ b/endpoint.go @@ -304,16 +304,25 @@ func (ep *endpoint) isAnonymous() bool { return ep.anonymous } -// enableService sets ep's serviceEnabled to the passed value if it's not in the -// current state and returns true; false otherwise. -func (ep *endpoint) enableService(state bool) bool { +// isServiceEnabled check if service is enabled on the endpoint +func (ep *endpoint) isServiceEnabled() bool { ep.Lock() defer ep.Unlock() - if ep.serviceEnabled != state { - ep.serviceEnabled = state - return true - } - return false + return ep.serviceEnabled +} + +// enableService sets service enabled on the endpoint +func (ep *endpoint) enableService() { + ep.Lock() + defer ep.Unlock() + ep.serviceEnabled = true +} + +// disableService disables service on the endpoint +func (ep *endpoint) disableService() { + ep.Lock() + defer ep.Unlock() + ep.serviceEnabled = false } func (ep *endpoint) needResolver() bool { @@ -604,7 +613,7 @@ func (ep *endpoint) rename(name string) error { } if c.isAgent() { - if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil { + if err = ep.deleteServiceInfoFromCluster(sb, true, "rename"); err != nil { return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err) } } else { @@ -628,7 +637,7 @@ func (ep *endpoint) rename(name string) error { } defer func() { if err != nil { - ep.deleteServiceInfoFromCluster(sb, "rename") + ep.deleteServiceInfoFromCluster(sb, true, "rename") ep.name = oldName ep.anonymous = oldAnonymous ep.addServiceInfoToCluster(sb) @@ -739,8 +748,14 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption) } } + if ep.svcID != "" { + if err := ep.deleteServiceInfoFromCluster(sb, true, "sbLeave"); err != nil { + logrus.Warnf("Failed to clean up service info on container %s disconnect: %v", ep.name, err) + } + } + if err := sb.clearNetworkResources(ep); err != nil { - logrus.Warnf("Could not cleanup network resources on container %s disconnect: %v", ep.name, err) + logrus.Warnf("Failed to clean up network resources on container %s disconnect: %v", ep.name, err) } // Update the store about the sandbox detach only after we @@ -752,12 +767,8 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption) return err } - if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil { - logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e) - } - if e := ep.deleteDriverInfoFromCluster(); e != nil { - logrus.Errorf("Could not delete endpoint state for endpoint %s from cluster: %v", ep.Name(), e) + logrus.Errorf("Failed to delete endpoint state for endpoint %s from cluster: %v", ep.Name(), e) } sb.deleteHostsEntries(n.getSvcRecords(ep)) diff --git a/sandbox.go b/sandbox.go index 472dbeafe7..7d34aace1c 100644 --- a/sandbox.go +++ b/sandbox.go @@ -670,24 +670,41 @@ func (sb *sandbox) SetKey(basePath string) error { return nil } -func (sb *sandbox) EnableService() error { +func (sb *sandbox) EnableService() (err error) { logrus.Debugf("EnableService %s START", sb.containerID) + defer func() { + if err != nil { + sb.DisableService() + } + }() for _, ep := range sb.getConnectedEndpoints() { - if ep.enableService(true) { + if !ep.isServiceEnabled() { if err := ep.addServiceInfoToCluster(sb); err != nil { - ep.enableService(false) return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err) } + ep.enableService() } } logrus.Debugf("EnableService %s DONE", sb.containerID) return nil } -func (sb *sandbox) DisableService() error { +func (sb *sandbox) DisableService() (err error) { logrus.Debugf("DisableService %s START", sb.containerID) + failedEps := []string{} + defer func() { + if len(failedEps) > 0 { + err = fmt.Errorf("failed to disable service on sandbox:%s, for endpoints %s", sb.ID(), strings.Join(failedEps, ",")) + } + }() for _, ep := range sb.getConnectedEndpoints() { - ep.enableService(false) + if ep.isServiceEnabled() { + if err := ep.deleteServiceInfoFromCluster(sb, false, "DisableService"); err != nil { + failedEps = append(failedEps, ep.Name()) + logrus.Warnf("failed update state for endpoint %s into cluster: %v", ep.Name(), err) + } + ep.disableService() + } } logrus.Debugf("DisableService %s DONE", sb.containerID) return nil diff --git a/service.go b/service.go index 5a0d7e0057..71374cacb2 100644 --- a/service.go +++ b/service.go @@ -79,13 +79,18 @@ func (s *service) printIPToEndpoint(ip string) (string, bool) { return s.ipToEndpoint.String(ip) } +type lbBackend struct { + ip net.IP + disabled bool +} + type loadBalancer struct { vip net.IP fwMark uint32 // Map of backend IPs backing this loadbalancer on this // network. It is keyed with endpoint ID. - backEnds map[string]net.IP + backEnds map[string]*lbBackend // Back pointer to service to which the loadbalancer belongs. service *service diff --git a/service_common.go b/service_common.go index 05ea2bf25f..98454c2c7e 100644 --- a/service_common.go +++ b/service_common.go @@ -196,23 +196,8 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) { if cleanupNID != "" && nid != cleanupNID { continue } - - for eid, ip := range lb.backEnds { - epID := eid - epIP := ip - service := s - loadBalancer := lb - networkID := nid - cleanupFuncs = append(cleanupFuncs, func() { - // ContainerName and taskAliases are not available here, this is still fine because the Service discovery - // cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load - // Balancer bookeeping, is to keep consistent the mapping of endpoint to IP. - if err := c.rmServiceBinding(service.name, service.id, networkID, epID, "", loadBalancer.vip, - service.ingressPorts, service.aliases, []string{}, epIP, "cleanupServiceBindings", false); err != nil { - logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", - service.id, networkID, epID, err) - } - }) + for eid, be := range lb.backEnds { + cleanupFuncs = append(cleanupFuncs, makeServiceCleanupFunc(c, s, nid, eid, lb.vip, be.ip)) } } s.Unlock() @@ -224,6 +209,17 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) { } +func makeServiceCleanupFunc(c *controller, s *service, nID, eID string, vip net.IP, ip net.IP) func() { + // ContainerName and taskAliases are not available here, this is still fine because the Service discovery + // cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load + // Balancer bookeeping, is to keep consistent the mapping of endpoint to IP. + return func() { + if err := c.rmServiceBinding(s.name, s.id, nID, eID, "", vip, s.ingressPorts, s.aliases, []string{}, ip, "cleanupServiceBindings", false, true); err != nil { + logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", s.id, nID, eID, err) + } + } +} + func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error { var addService bool @@ -269,7 +265,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s lb = &loadBalancer{ vip: vip, fwMark: fwMarkCtr, - backEnds: make(map[string]net.IP), + backEnds: make(map[string]*lbBackend), service: s, } @@ -280,7 +276,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s addService = true } - lb.backEnds[eID] = ip + lb.backEnds[eID] = &lbBackend{ip, false} ok, entries := s.assignIPToEndpoint(ip.String(), eID) if !ok || entries > 1 { @@ -302,7 +298,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s return nil } -func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool) error { +func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool, fullRemove bool) error { var rmService bool @@ -333,13 +329,19 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st return nil } - _, ok = lb.backEnds[eID] + be, ok := lb.backEnds[eID] if !ok { - logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] !ok", method, svcName, eID) + logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] && lb.disabled[eid] !ok", method, svcName, eID) return nil } - delete(lb.backEnds, eID) + if fullRemove { + // delete regardless + delete(lb.backEnds, eID) + } else { + be.disabled = true + } + if len(lb.backEnds) == 0 { // All the backends for this service have been // removed. Time to remove the load balancer and also @@ -359,7 +361,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st // Remove loadbalancer service(if needed) and backend in all // sandboxes in the network only if the vip is valid. if len(vip) != 0 && entries == 0 { - n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService) + n.(*network).rmLBBackend(ip, vip, lb, ingressPorts, rmService, fullRemove) } // Delete the name resolutions diff --git a/service_linux.go b/service_linux.go index ec002d6a1a..68be3f19f3 100644 --- a/service_linux.go +++ b/service_linux.go @@ -102,8 +102,10 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { } lb.service.Lock() - for _, ip := range lb.backEnds { - sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) + for _, be := range lb.backEnds { + if !be.disabled { + sb.addLBBackend(be.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) + } } lb.service.Unlock() } @@ -134,7 +136,7 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po // Remove loadbalancer backend from all sandboxes which has a // connection to this network. If needed remove the service entry as // well, as specified by the rmService bool. -func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) { +func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool, fullRemove bool) { n.WalkEndpoints(func(e Endpoint) bool { ep := e.(*endpoint) if sb, ok := ep.getSandbox(); ok { @@ -147,7 +149,7 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Por gwIP = ep.Iface().Address().IP } - sb.rmLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService, n.ingress) + sb.rmLBBackend(ip, vip, lb.fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService, fullRemove, n.ingress) } return false @@ -214,7 +216,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P } // Remove loadbalancer backend from one connected sandbox. -func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, rmService bool, isIngressNetwork bool) { +func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, rmService bool, fullRemove bool, isIngressNetwork bool) { if sb.osSbox == nil { return } @@ -241,8 +243,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po Weight: 1, } - if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT { - logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err) + if fullRemove { + if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT { + logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err) + } + } else { + d.Weight = 0 + if err := i.UpdateDestination(s, d); err != nil && err != syscall.ENOENT { + logrus.Errorf("Failed to set LB weight of real server %s to 0 for vip %s fwmark %d in sbox %s (%s): %v", ip, vip, fwMark, sb.ID()[0:7], sb.ContainerID()[0:7], err) + } } if rmService { diff --git a/service_windows.go b/service_windows.go index 6fe521ef99..095d35d630 100644 --- a/service_windows.go +++ b/service_windows.go @@ -5,7 +5,7 @@ import "net" func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig) { } -func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) { +func (n *network) rmLBBackend(ip, vip net.IP, lb *loadBalancer, ingressPorts []*PortConfig, rmService bool, fullRemove bool) { } func (sb *sandbox) populateLoadbalancers(ep *endpoint) {