diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go index 42985506a..f27e92b1b 100644 --- a/pkg/k8s/client.go +++ b/pkg/k8s/client.go @@ -57,7 +57,7 @@ func newClient(informerCollection *fsminformers.InformerCollection, policyClient // If specific informers are not selected to be initialized, initialize all informers if len(selectInformers) == 0 { - selectInformers = []InformerKey{Namespaces, Services, ServiceAccounts, Pods, Endpoints} + selectInformers = []InformerKey{Namespaces, Services, ServiceAccounts, Pods, Endpoints, VirtualMachine} } for _, informer := range selectInformers { diff --git a/pkg/sidecar/providers/pipy/repo/util.go b/pkg/sidecar/providers/pipy/repo/util.go index 36422d2b9..5ef22c2a8 100644 --- a/pkg/sidecar/providers/pipy/repo/util.go +++ b/pkg/sidecar/providers/pipy/repo/util.go @@ -421,6 +421,11 @@ func generatePipyOutboundTrafficBalancePolicy(meshCatalog catalog.MeshCataloger, dependClusters map[service.ClusterName]*WeightedCluster) bool { ready := true viaGateway := cfg.GetMeshConfig().Spec.Connector.ViaGateway + clusterSet := fmt.Sprintf("%s.%s.%s.%s", + cfg.GetMeshConfig().Spec.ClusterSet.Name, + cfg.GetMeshConfig().Spec.ClusterSet.Group, + cfg.GetMeshConfig().Spec.ClusterSet.Zone, + cfg.GetMeshConfig().Spec.ClusterSet.Region) otp := pipyConf.newOutboundTrafficPolicy() clustersConfigsMap := make(map[string][]*trafficpolicy.MeshClusterConfig) if len(outboundPolicy.ClustersConfigs) > 0 { @@ -464,8 +469,14 @@ func generatePipyOutboundTrafficBalancePolicy(meshCatalog catalog.MeshCataloger, } if len(upstreamEndpoint.ViaGatewayMode) > 0 { if upstreamEndpoint.WithGateway { - if upstreamEndpoint.WithMultiGateways { - viaGw = generatePipyViaGateway(upstreamEndpoint.AppProtocol, upstreamEndpoint.ClusterID, proxy, &viaGateway) + if proxy.VM { + if strings.EqualFold(proxy.ClusterID, clusterSet) { + viaGw = generatePipyViaGateway(upstreamEndpoint.AppProtocol, upstreamEndpoint.ClusterID, "", &viaGateway) + } else { + viaGw = generatePipyViaGateway(upstreamEndpoint.AppProtocol, upstreamEndpoint.ClusterID, proxy.ClusterID, &viaGateway) + } + } else if upstreamEndpoint.WithMultiGateways { + viaGw = generatePipyViaGateway(upstreamEndpoint.AppProtocol, upstreamEndpoint.ClusterID, proxy.ClusterID, &viaGateway) } } else { port = Port(upstreamEndpoint.Port) @@ -486,10 +497,10 @@ func generatePipyOutboundTrafficBalancePolicy(meshCatalog catalog.MeshCataloger, return ready } -func generatePipyViaGateway(appProtocol, clusterID string, proxy *pipy.Proxy, viaGateway *configv1alpha3.ConnectorGatewaySpec) string { +func generatePipyViaGateway(appProtocol, endpointClusterID, proxyClusterID string, viaGateway *configv1alpha3.ConnectorGatewaySpec) string { viaGw := "" - if len(appProtocol) > 0 && !strings.EqualFold(proxy.ClusterID, clusterID) { - if len(proxy.ClusterID) == 0 { // k8s -> fgw(EgressIP:EgressPort) -> others + if len(appProtocol) > 0 && !strings.EqualFold(proxyClusterID, endpointClusterID) { + if len(proxyClusterID) == 0 { // k8s -> fgw(EgressIP:EgressPort) -> others if len(viaGateway.ClusterIP) > 0 && viaGateway.EgressHTTPPort > 0 && strings.EqualFold(constants.ProtocolHTTP, appProtocol) { viaGw = fmt.Sprintf("%s:%d", viaGateway.ClusterIP, viaGateway.EgressHTTPPort) @@ -499,7 +510,7 @@ func generatePipyViaGateway(appProtocol, clusterID string, proxy *pipy.Proxy, vi viaGw = fmt.Sprintf("%s:%d", viaGateway.ClusterIP, viaGateway.EgressGRPCPort) } } else { - if len(clusterID) == 0 { // others -> fgw(IngressIP:IngressPort) -> k8s + if len(endpointClusterID) == 0 { // others -> fgw(IngressIP:IngressPort) -> k8s if len(viaGateway.IngressAddr) > 0 && viaGateway.IngressHTTPPort > 0 && strings.EqualFold(constants.ProtocolHTTP, appProtocol) { viaGw = fmt.Sprintf("%s:%d", viaGateway.IngressAddr, viaGateway.IngressHTTPPort)