diff --git a/npm/cmd/start.go b/npm/cmd/start.go
index 391d994f28..0969720f94 100644
--- a/npm/cmd/start.go
+++ b/npm/cmd/start.go
@@ -137,6 +137,8 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
 	stopChannel := wait.NeverStop
 	if config.Toggles.EnableV2NPM {
 		// update the dataplane config
+		npmV2DataplaneCfg.EnableNPMLite = config.Toggles.EnableNPMLite
+
 		npmV2DataplaneCfg.MaxBatchedACLsPerPod = config.MaxBatchedACLsPerPod
 		npmV2DataplaneCfg.NetPolInBackground = config.Toggles.NetPolInBackground
diff --git a/npm/npm.go b/npm/npm.go
index 72a429b7d8..914809b437 100644
--- a/npm/npm.go
+++ b/npm/npm.go
@@ -192,7 +192,7 @@ func (npMgr *NetworkPolicyManager) Start(config npmconfig.Config, stopCh <-chan
 	// Starts all informers manufactured by npMgr's informerFactory.
 	npMgr.InformerFactory.Start(stopCh)
 
-	// npn lite
+	// npm lite
 	if npMgr.NpmLiteToggle {
 		npMgr.PodInformerFactory.Start(stopCh)
 	}
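For orientation: EnableNPMLite flows from the ConfigMap-backed npmconfig.Config toggles into the dataplane's own config, and the NPM Lite path starts a dedicated pod-only informer factory. A condensed sketch of a hypothetical caller — common.NewIOShim and the exact NewDataPlane signature are assumptions inferred from the hunks below, and error handling is trimmed:

	// Hypothetical wiring, condensed from start.go above (not the actual code).
	npmV2DataplaneCfg := &dataplane.Config{
		EnableNPMLite:      config.Toggles.EnableNPMLite,
		NetPolInBackground: config.Toggles.NetPolInBackground,
	}
	// NewDataPlane wires up the policy/ipset managers and, on Windows,
	// the HCN endpoint queries shown further down.
	dp, err := dataplane.NewDataPlane(nodeName, common.NewIOShim(), npmV2DataplaneCfg, wait.NeverStop)
	if err != nil {
		return fmt.Errorf("failed to create dataplane: %w", err)
	}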
diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go
index 4a3ccd68ef..5de6f931de 100644
--- a/npm/pkg/dataplane/dataplane.go
+++ b/npm/pkg/dataplane/dataplane.go
@@ -45,6 +45,7 @@ type Config struct {
 	NetPolInBackground bool
 	MaxPendingNetPols  int
 	NetPolInterval     time.Duration
+	EnableNPMLite      bool
 	*ipsets.IPSetManagerCfg
 	*policies.PolicyManagerCfg
 }
@@ -64,12 +65,13 @@ type DataPlane struct {
 	nodeName string
 	// endpointCache stores all endpoints of the network (including off-node)
 	// Key is PodIP
-	endpointCache  *endpointCache
-	ioShim         *common.IOShim
-	updatePodCache *updatePodCache
-	endpointQuery  *endpointQuery
-	applyInfo      *applyInfo
-	netPolQueue    *netPolQueue
+	endpointCache              *endpointCache
+	ioShim                     *common.IOShim
+	updatePodCache             *updatePodCache
+	endpointQuery              *endpointQuery
+	endpointQueryAttachedState *endpointQuery // windows -> filter for state 2 (attached) endpoints in l1vh
+	applyInfo                  *applyInfo
+	netPolQueue                *netPolQueue
 	// removePolicyInfo tracks when a policy was removed yet had ApplyIPSet failures.
 	// This field is only relevant for Linux.
 	removePolicyInfo removePolicyInfo
@@ -88,11 +90,12 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
 		policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg),
 		ipsetMgr:  ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim),
 		// networkID is set when initializing Windows dataplane
-		networkID:     "",
-		endpointCache: newEndpointCache(),
-		nodeName:      nodeName,
-		ioShim:        ioShim,
-		endpointQuery: new(endpointQuery),
+		networkID:                  "",
+		endpointCache:              newEndpointCache(),
+		nodeName:                   nodeName,
+		ioShim:                     ioShim,
+		endpointQuery:              new(endpointQuery),
+		endpointQueryAttachedState: new(endpointQuery),
 		applyInfo: &applyInfo{
 			inBootupPhase: true,
 		},
@@ -128,7 +131,6 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
 	} else {
 		metrics.SendLog(util.DaemonDataplaneID, "[DataPlane] dataplane configured to NOT add netpols in background", true)
 	}
-
 	return dp, nil
 }
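The new endpointQueryAttachedState field mirrors the existing endpointQuery: both are HCN queries that differ only in their State filter (2, "Attached", per the struct comment above, versus "AttachedSharing"). As a self-contained illustration of the filter pattern used in dataplane_windows.go below — the literal schema version 2.0 and state value 2 are assumptions standing in for the package-local hcnSchema*/hcnEndpointState* constants:

	package main

	import (
		"encoding/json"
		"fmt"

		"github.com/Microsoft/hcsshim/hcn" // Windows-only package
	)

	func main() {
		// An HCN query matches endpoints whose properties equal the
		// JSON-encoded map placed in Filter; here, State == 2 ("Attached").
		filter, err := json.Marshal(map[string]uint16{"State": 2})
		if err != nil {
			panic(err)
		}
		query := hcn.HostComputeQuery{
			SchemaVersion: hcn.SchemaVersion{Major: 2, Minor: 0},
			Flags:         hcn.HostComputeQueryFlagsNone,
			Filter:        string(filter),
		}
		endpoints, err := hcn.ListEndpointsQuery(query)
		if err != nil {
			panic(err)
		}
		fmt.Printf("found %d endpoints in Attached state\n", len(endpoints))
	}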
diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go
index cb65bdd420..43af8bef6b 100644
--- a/npm/pkg/dataplane/dataplane_windows.go
+++ b/npm/pkg/dataplane/dataplane_windows.go
@@ -2,7 +2,6 @@ package dataplane
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -12,6 +11,7 @@ import (
 	"github.com/Azure/azure-container-networking/npm/util"
 	npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
 	"github.com/Microsoft/hcsshim/hcn"
+	"github.com/pkg/errors"
 	"k8s.io/klog"
 )
@@ -50,14 +50,31 @@ func (dp *DataPlane) initializeDataPlane() error {
 		},
 		Flags: hcn.HostComputeQueryFlagsNone,
 	}
+	// Initialize the endpoint query used to filter healthy endpoints (vNICs) of Windows pods on an L1VH node
+	dp.endpointQueryAttachedState.query = hcn.HostComputeQuery{
+		SchemaVersion: hcn.SchemaVersion{
+			Major: hcnSchemaMajorVersion,
+			Minor: hcnSchemaMinorVersion,
+		},
+		Flags: hcn.HostComputeQueryFlagsNone,
+	}
 
+	// Filter out any endpoints that are not in the "AttachedSharing" state. All running Windows pods with networking must be in this state.
 	filterMap := map[string]uint16{"State": hcnEndpointStateAttachedSharing}
 	filter, err := json.Marshal(filterMap)
 	if err != nil {
-		return npmerrors.SimpleErrorWrapper("failed to marshal endpoint filter map", err)
+		return errors.Wrap(err, "failed to marshal endpoint filter map for attachedSharing state")
 	}
 	dp.endpointQuery.query.Filter = string(filter)
 
+	// Filter out any endpoints that are not in the "Attached" state. All running Windows pods on L1VH with networking must be in this state.
+	filterMapAttached := map[string]uint16{"State": hcnEndpointStateAttached}
+	filterAttached, err := json.Marshal(filterMapAttached)
+	if err != nil {
+		return errors.Wrap(err, "failed to marshal endpoint filter map for attached state")
+	}
+	dp.endpointQueryAttachedState.query.Filter = string(filterAttached)
+
 	// reset endpoint cache so that netpol references are removed for all endpoints while refreshing pod endpoints
 	// no need to lock endpointCache at boot up
 	dp.endpointCache.cache = make(map[string]*npmEndpoint)
@@ -329,14 +346,28 @@ func (dp *DataPlane) getEndpointsToApplyPolicies(netPols []*policies.NPMNetworkP
 
 func (dp *DataPlane) getLocalPodEndpoints() ([]*hcn.HostComputeEndpoint, error) {
 	klog.Info("getting local endpoints")
+
+	// Get endpoints in state: Attached
 	timer := metrics.StartNewTimer()
+	endpointsAttached, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQueryAttachedState.query)
+	metrics.RecordListEndpointsLatency(timer)
+	if err != nil {
+		metrics.IncListEndpointsFailures()
+		return nil, errors.Wrap(err, "failed to get local pod endpoints in state: attached")
+	}
+
+	// Get endpoints in state: AttachedSharing
+	timer = metrics.StartNewTimer()
 	endpoints, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQuery.query)
 	metrics.RecordListEndpointsLatency(timer)
 	if err != nil {
 		metrics.IncListEndpointsFailures()
-		return nil, npmerrors.SimpleErrorWrapper("failed to get local pod endpoints", err)
+		return nil, errors.Wrap(err, "failed to get local pod endpoints in state: attachedSharing")
 	}
 
+	// Merge the two lists, keeping each endpoint ID only once
+	endpoints = GetUniqueEndpoints(endpoints, endpointsAttached)
+
 	epPointers := make([]*hcn.HostComputeEndpoint, 0, len(endpoints))
 	for k := range endpoints {
 		epPointers = append(epPointers, &endpoints[k])
@@ -344,6 +375,24 @@ func (dp *DataPlane) getLocalPodEndpoints() ([]*hcn.HostComputeEndpoint, error)
 	return epPointers, nil
 }
 
+func GetUniqueEndpoints(endpoints, endpointsAttached []hcn.HostComputeEndpoint) []hcn.HostComputeEndpoint {
+	// Store the IDs of the first list in a map for O(1) lookup
+	idMap := make(map[string]struct{}, len(endpoints))
+	for i := 0; i < len(endpoints); i++ {
+		ep := endpoints[i]
+		idMap[ep.Id] = struct{}{}
+	}
+
+	// Append endpoints from endpointsAttached that are not already in the map
+	for i := 0; i < len(endpointsAttached); i++ {
+		ep := endpointsAttached[i]
+		if _, ok := idMap[ep.Id]; !ok {
+			endpoints = append(endpoints, ep)
+		}
+	}
+	return endpoints
+}
+
 // refreshPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
 /*
 	Key Assumption: a new pod event (w/ IP) cannot come before HNS knows (and can tell us) about the endpoint.
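GetUniqueEndpoints, added above, is a union keyed by endpoint ID: it indexes the first list's IDs in a set and appends from the second list only the endpoints it has not seen, so the merge runs in O(n+m) and preserves the first list's order. A toy invocation (the IDs are invented):

	attachedSharing := []hcn.HostComputeEndpoint{{Id: "a"}, {Id: "b"}}
	attached := []hcn.HostComputeEndpoint{{Id: "b"}, {Id: "c"}}
	merged := GetUniqueEndpoints(attachedSharing, attached)
	// merged holds IDs "a", "b", "c": the shared endpoint "b" appears exactly once.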
diff --git a/npm/pkg/dataplane/dataplane_windows_test.go b/npm/pkg/dataplane/dataplane_windows_test.go
index 5cd69a23c2..bfd2dd11f3 100644
--- a/npm/pkg/dataplane/dataplane_windows_test.go
+++ b/npm/pkg/dataplane/dataplane_windows_test.go
@@ -10,6 +10,8 @@ import (
 	"github.com/Azure/azure-container-networking/npm/metrics"
 	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
 	dptestutils "github.com/Azure/azure-container-networking/npm/pkg/dataplane/testutils"
+	"github.com/Microsoft/hcsshim/hcn"
+	"github.com/google/go-cmp/cmp"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -86,6 +88,62 @@ func TestMultiJobApplyInBackground(t *testing.T) {
 	testMultiJobCases(t, multiJobApplyInBackgroundTests(), time.Duration(1*time.Second))
 }
 
+func TestRemoveCommonEndpoints(t *testing.T) {
+	tests := []struct {
+		name              string
+		endpoints         []hcn.HostComputeEndpoint
+		endpointsAttached []hcn.HostComputeEndpoint
+		expected          []hcn.HostComputeEndpoint
+	}{
+		{
+			name:              "one value same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "123456"}, {Id: "560971"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "123456"}, {Id: "789012"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "123456"}, {Id: "560971"}, {Id: "567890"}, {Id: "789012"}},
+		},
+		{
+			name:              "no values same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "789012"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "567890"}, {Id: "789012"}},
+		},
+		{
+			name:              "two values same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "123456"}, {Id: "789012"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "789012"}, {Id: "123456"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "123456"}, {Id: "789012"}, {Id: "567890"}},
+		},
+		{
+			name:              "no values",
+			endpoints:         []hcn.HostComputeEndpoint{},
+			endpointsAttached: []hcn.HostComputeEndpoint{},
+			expected:          []hcn.HostComputeEndpoint{},
+		},
+		{
+			name:              "one value - same",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "456901"}},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}},
+		},
+		{
+			name:              "one value - different",
+			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}},
+			endpointsAttached: []hcn.HostComputeEndpoint{},
+			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}},
+		},
+	}
+	for _, tt := range tests {
+		tt := tt
+
+		t.Run(tt.name, func(t *testing.T) {
+			result := GetUniqueEndpoints(tt.endpoints, tt.endpointsAttached)
+			if !cmp.Equal(tt.expected, result) {
+				t.Errorf("Test %s failed: expected %v, got %v", tt.name, tt.expected, result)
+			}
+		})
+	}
+}
+
 func testSerialCases(t *testing.T, tests []*SerialTestCase, finalSleep time.Duration) {
 	for i, tt := range tests {
 		i := i