Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 78 additions & 18 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,14 +635,15 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
}

buf, err := proto.Marshal(&EndpointRecord{
Name: name,
ServiceName: ep.svcName,
ServiceID: ep.svcID,
VirtualIP: ep.virtualIP.String(),
IngressPorts: ingressPorts,
Aliases: ep.svcAliases,
TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(),
Name: name,
ServiceName: ep.svcName,
ServiceID: ep.svcID,
VirtualIP: ep.virtualIP.String(),
IngressPorts: ingressPorts,
Aliases: ep.svcAliases,
TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(),
ServiceDisabled: false,
})
if err != nil {
return err
Expand All @@ -660,7 +661,7 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
return nil
}

func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, fullRemove bool, method string) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil
}
Expand All @@ -674,6 +675,15 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())

// Avoid a race w/ with a container that aborts preemptively. This would
// get caught in disableServceInNetworkDB, but we check here to make the
// nature of the condition more clear.
// See comment in addServiceInfoToCluster()
if e := sb.getEndpoint(ep.ID()); e == nil {
logrus.Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}

c := n.getController()
agent := c.getAgent()

Expand All @@ -683,9 +693,13 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
}

if agent != nil {
// First delete from networkDB then locally
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
// First update the networkDB then locally
if fullRemove {
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
}
} else {
disableServiceInNetworkDB(agent, n, ep)
}
}

Expand All @@ -696,7 +710,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
return err
}
} else {
Expand All @@ -712,6 +726,35 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
return nil
}

func disableServiceInNetworkDB(a *agent, n *network, ep *endpoint) {
var epRec EndpointRecord

logrus.Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())

// Update existing record to indicate that the service is disabled
inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
if err != nil {
logrus.Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
return
}
// Should never fail
if err := proto.Unmarshal(inBuf, &epRec); err != nil {
logrus.Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
return
}
epRec.ServiceDisabled = true
// Should never fail
outBuf, err := proto.Marshal(&epRec)
if err != nil {
logrus.Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
return
}
// Send update to the whole cluster
if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil {
logrus.Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
}
}

func (n *network) addDriverWatches() {
if !n.isClusterEligible() {
return
Expand Down Expand Up @@ -841,7 +884,6 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
nid string
eid string
value []byte
isAdd bool
epRec EndpointRecord
)

Expand All @@ -850,12 +892,15 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
nid = event.NetworkID
eid = event.Key
value = event.Value
isAdd = true
case networkdb.DeleteEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
case networkdb.UpdateEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
default:
logrus.Errorf("Unexpected update service table event = %#v", event)
return
}
Expand All @@ -880,7 +925,8 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
return
}

if isAdd {
switch ev.(type) {
case networkdb.CreateEvent:
logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
Expand All @@ -894,11 +940,12 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
} else {

case networkdb.DeleteEvent:
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
Expand All @@ -908,5 +955,18 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.UpdateEvent:
logrus.Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
// We currently should only get these to inform us that an endpoint
// is disabled. Report if otherwise.
if svcID == "" || !epRec.ServiceDisabled {
logrus.Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
return
}
// This is a remote task that is part of a service that is now disabled
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
logrus.Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
}
}
Loading