diff --git a/.github/workflow-templates/test.yml b/.github/workflow-templates/test.yml index 711855bc4d..8aa14db291 100644 --- a/.github/workflow-templates/test.yml +++ b/.github/workflow-templates/test.yml @@ -83,15 +83,15 @@ jobs: kubetest --ginkgo-parallel=2 --provider=local --deployment=kind --kind-cluster-name=${KIND_CLUSTER_NAME} --test --test_args='--ginkgo.focus=\[sig-network\].*Conformance --disable-log-dump=false --ginkgo.skip=\[Serial\]' kubetest --ginkgo-parallel=2 --provider=local --deployment=kind --kind-cluster-name=${KIND_CLUSTER_NAME} --test --test_args='--ginkgo.focus=\[sig-network\].*NetworkPolicy --disable-log-dump=false --ginkgo.skip=ingress\saccess|multiple\segress\spolicies|allow\segress\saccess|\[Serial\]' - - name: Export logs on failure + - name: Export logs + if: always() run: | mkdir -p /tmp/kind/logs kind export logs --name ${KIND_CLUSTER_NAME} /tmp/kind/logs working-directory: ${{ env.WORKDIR }} - if: failure() - name: Upload logs + if: always() uses: actions/upload-artifact@v1 - if: failure() with: name: kind-logs path: /tmp/kind/logs diff --git a/.github/workflows/test_generated.yml b/.github/workflows/test_generated.yml index 53e13349be..7b490d007d 100644 --- a/.github/workflows/test_generated.yml +++ b/.github/workflows/test_generated.yml @@ -90,14 +90,14 @@ jobs: export NODE_NAMES=${MASTER_NAME} kubetest --ginkgo-parallel=2 --provider=local --deployment=kind --kind-cluster-name=${KIND_CLUSTER_NAME} --test --test_args='--ginkgo.focus=\[sig-network\].*Conformance --disable-log-dump=false --ginkgo.skip=\[Serial\]' kubetest --ginkgo-parallel=2 --provider=local --deployment=kind --kind-cluster-name=${KIND_CLUSTER_NAME} --test --test_args='--ginkgo.focus=\[sig-network\].*NetworkPolicy --disable-log-dump=false --ginkgo.skip=ingress\saccess|multiple\segress\spolicies|allow\segress\saccess|\[Serial\]' - - name: Export logs on failure + - name: Export logs + if: always() run: "mkdir -p /tmp/kind/logs \nkind export logs --name ${KIND_CLUSTER_NAME} /tmp/kind/logs\n" working-directory: "${{ env.WORKDIR }}" - if: failure() - name: Upload logs + if: always() uses: actions/upload-artifact@v1 - if: failure() with: name: kind-logs path: "/tmp/kind/logs" diff --git a/contrib/kind.sh b/contrib/kind.sh index 1e501c32af..3a185022af 100755 --- a/contrib/kind.sh +++ b/contrib/kind.sh @@ -48,6 +48,7 @@ kubectl create -f ovnkube-db.yaml kubectl create -f ovnkube-master.yaml kubectl create -f ovnkube-node.yaml popd +kubectl -n kube-system delete ds kube-proxy kind get clusters kind get nodes --name ${CLUSTER_NAME} diff --git a/dist/images/Dockerfile b/dist/images/Dockerfile index cdbc5efc3c..d1ece50e68 100644 --- a/dist/images/Dockerfile +++ b/dist/images/Dockerfile @@ -62,6 +62,7 @@ COPY ovn-k8s-cni-overlay /usr/libexec/cni/ovn-k8s-cni-overlay # variables to direct operation and configure ovn COPY ovnkube.sh /root/ COPY ovn-debug.sh /root/ +COPY ovndb-raft-functions /root/ # override the rpm's ovn_k8s.conf with this local copy COPY ovn_k8s.conf /etc/openvswitch/ovn_k8s.conf diff --git a/dist/images/Dockerfile.fedora b/dist/images/Dockerfile.fedora index 6f6c6d0686..fc83def726 100644 --- a/dist/images/Dockerfile.fedora +++ b/dist/images/Dockerfile.fedora @@ -25,6 +25,9 @@ RUN INSTALL_PKGS=" \ dnf install --refresh -y --setopt=tsflags=nodocs $INSTALL_PKGS && \ dnf clean all && rm -rf /var/cache/dnf/* +# REMOVE ME when ovn is fixed +RUN dnf -y downgrade ovn + RUN mkdir -p /var/run/openvswitch && \ mkdir -p /usr/libexec/cni/ @@ -38,15 +41,7 @@ COPY git_info /root # variables to direct 
operation and configure ovn COPY ovnkube.sh /root/ COPY ovn-debug.sh /root/ - -# iptables wrappers -COPY ./iptables-scripts/iptables /usr/sbin/ -COPY ./iptables-scripts/iptables-save /usr/sbin/ -COPY ./iptables-scripts/iptables-restore /usr/sbin/ -COPY ./iptables-scripts/ip6tables /usr/sbin/ -COPY ./iptables-scripts/ip6tables-save /usr/sbin/ -COPY ./iptables-scripts/ip6tables-restore /usr/sbin/ -COPY ./iptables-scripts/iptables /usr/sbin/ +COPY ovndb-raft-functions /root/ LABEL io.k8s.display-name="ovn-kubernetes" \ io.k8s.description="This is a Kubernetes network plugin that provides an overlay network using OVN." \ diff --git a/dist/images/Dockerfile.ubuntu b/dist/images/Dockerfile.ubuntu index 99ad480a9b..517dd1a0f1 100644 --- a/dist/images/Dockerfile.ubuntu +++ b/dist/images/Dockerfile.ubuntu @@ -39,6 +39,7 @@ COPY ovn-k8s-cni-overlay /usr/libexec/cni/ovn-k8s-cni-overlay # variables to direct operation and configure ovn COPY ovnkube.sh /root/ COPY ovn-debug.sh /root/ +COPY ovndb-raft-functions /root/ # override the pkg's ovn_k8s.conf with this local copy COPY ovn_k8s.conf /etc/openvswitch/ovn_k8s.conf diff --git a/dist/images/daemonset.sh b/dist/images/daemonset.sh index 048bdf885a..e2ca07b949 100755 --- a/dist/images/daemonset.sh +++ b/dist/images/daemonset.sh @@ -121,6 +121,8 @@ ovn_db_replicas=${OVN_DB_REPLICAS:-3} echo "ovn_db_replicas: ${ovn_db_replicas}" ovn_db_vip=${OVN_DB_VIP} echo "ovn_db_vip: ${ovn_db_vip}" +ovn_db_minAvailable=$(((${ovn_db_replicas} + 1) / 2)) +echo "ovn_db_minAvailable: ${ovn_db_minAvailable}" ovn_image=${image} ovn_image_pull_policy=${policy} kind=${KIND} ovn_gateway_mode=${ovn_gateway_mode} \ ovn_gateway_opts=${ovn_gateway_opts} j2 ../templates/ovnkube-node.yaml.j2 -o ../yaml/ovnkube-node.yaml @@ -133,6 +135,9 @@ ovn_image=${image} ovn_image_pull_policy=${policy} j2 ../templates/ovnkube-db.ya ovn_db_vip_image=${ovn_db_vip_image} ovn_image_pull_policy=${policy} ovn_db_replicas=${ovn_db_replicas} \ ovn_db_vip=${ovn_db_vip} j2 ../templates/ovnkube-db-vip.yaml.j2 -o ../yaml/ovnkube-db-vip.yaml +ovn_image=${image} ovn_image_pull_policy=${policy} ovn_db_replicas=${ovn_db_replicas} \ +ovn_db_minAvailable=${ovn_db_minAvailable} j2 ../templates/ovnkube-db-raft.yaml.j2 > ../yaml/ovnkube-db-raft.yaml + # ovn-setup.yaml # net_cidr=10.128.0.0/14/23 # svc_cidr=172.30.0.0/16 diff --git a/dist/images/ovndb-raft-functions b/dist/images/ovndb-raft-functions new file mode 100644 index 0000000000..defc1b0ac4 --- /dev/null +++ b/dist/images/ovndb-raft-functions @@ -0,0 +1,135 @@ +#!/bin/bash +#set -euo pipefail + +verify-ovsdb-raft () { + check_ovn_daemonset_version "3" + + replicas=$(kubectl --server=${K8S_APISERVER} --token=${k8s_token} --certificate-authority=${K8S_CACERT} \ + get statefulset -n ovn-kubernetes ovnkube-db -o=jsonpath='{.spec.replicas}') + if [[ ${replicas} -lt 3 || $((${replicas} % 2)) -eq 0 ]]; then + echo "at least 3 nodes need to be configured, and it must be an odd number of nodes" + exit 1 + fi +} + +# OVN DB must be up on the first DB node. +# This waits for the ovnkube-db-0 POD to come up. +ready_to_join_cluster () { + # See if ep is available ... + db=${1} + port=${2} + + init_ip="$(kubectl --server=${K8S_APISERVER} --token=${k8s_token} --certificate-authority=${K8S_CACERT} \ + get pod -n ovn-kubernetes ovnkube-db-0 -o=jsonpath='{.status.podIP}')" + if [[ $? != 0 ]]; then + return 1 + fi + ovsdb-client list-dbs tcp:${init_ip}:${port} > /dev/null 2>&1 + if [[ $?
!= 0 ]] ; then + return 1 + fi + return 0 +} + +check_ovnkube_db_ep () { + local dbaddr=${1} + local dbport=${2} + + # TODO: Right now only checks for NB ovsdb instances + echo "======= checking ${dbaddr}:${dbport} OVSDB instance ===============" + ovsdb-client list-dbs tcp:${dbaddr}:${dbport} > /dev/null 2>&1 + if [[ $? != 0 ]] ; then + return 1 + fi + return 0 +} + +check_and_apply_ovnkube_db_ep () { + local port=${1} + + # Get IPs of all ovnkube-db PODs + ips=() + for (( i=0; i<${replicas}; i++ )); do + ip=$(kubectl --server=${K8S_APISERVER} --token=${k8s_token} --certificate-authority=${K8S_CACERT} \ + get pod -n ovn-kubernetes ovnkube-db-${i} -o=jsonpath='{.status.podIP}' 2>/dev/null) + if [[ ${ip} == "" ]]; then + break + fi + ips+=(${ip}) + done + + if [[ ${i} -eq ${replicas} ]]; then + # The number of POD IPs is the same as the number of statefulset replicas. Now, if the number of ovnkube-db endpoints + # is 0, then we are applying the endpoint for the first time. So, we need to make sure that each of the + # pod IPs responds to the `ovsdb-client list-dbs` call before we set the endpoint. If they don't, retry several + # times and then give up. + + # Get the current set of ovnkube-db endpoints, if any + IFS=" " read -a old_ips <<< "$(kubectl --server=${K8S_APISERVER} --token=${k8s_token} --certificate-authority=${K8S_CACERT} \ + get ep -n ovn-kubernetes ovnkube-db -o=jsonpath='{range .subsets[0].addresses[*]}{.ip}{" "}' 2>/dev/null)" + if [[ ${#old_ips[@]} -ne 0 ]]; then + return + fi + + for ip in ${ips[@]} ; do + wait_for_event attempts=10 check_ovnkube_db_ep ${ip} ${port} + done + set_ovnkube_db_ep ${ips[@]} + else + # ideally shouldn't happen + echo "Not all the pods in the statefulset are up. Expecting ${replicas} pods, but found ${i} pods." + echo "Exiting...." + exit 10 + fi +} + +# v3 - create nb_ovsdb/sb_ovsdb cluster in a separate container +ovsdb-raft () { + trap 'kill $(jobs -p); exit 0' TERM + + local db=${1} + local port=${2} + + ovn_db_pidfile=${OVN_RUNDIR}/ovn${db}_db.pid + eval ovn_log_db=\$ovn_log_${db} + ovn_db_file=${OVN_ETCDIR}/ovn${db}.db + + rm -f ${ovn_db_pidfile} + verify-ovsdb-raft + local_ip=$(getent ahostsv4 $(hostname) | grep -v "^127\." | head -1 | awk '{ print $1 }') + echo "=============== run ${db}-ovsdb-raft pod ${POD_NAME} ==========" + + if [[ "${POD_NAME}" == "ovnkube-db-0" ]]; then + run_as_ovs_user_if_needed \ + ${OVNCTL_PATH} run_${db}_ovsdb --no-monitor \ + --db-${db}-create-insecure-remote=yes --db-${db}-cluster-local-addr=${local_ip} \ + --ovn-${db}-log="${ovn_log_db}" & + else + # join the remote cluster node if the DB is not created + if [[ ! -e ${ovn_db_file} ]] || ovsdb-tool db-is-standalone ${ovn_db_file} ; then + wait_for_event ready_to_join_cluster ${db} ${port} + fi + run_as_ovs_user_if_needed \ + ${OVNCTL_PATH} run_${db}_ovsdb --no-monitor \ + --db-${db}-create-insecure-remote=yes --db-${db}-cluster-local-addr=${local_ip} \ + --db-${db}-cluster-remote-addr=${init_ip} \ + --ovn-${db}-log="${ovn_log_db}" & + fi + + wait_for_event process_ready ovn${db}_db + echo "=============== ${db}-ovsdb-raft ========== RUNNING" + sleep 3 + + last_node_index=$(expr ${replicas} - 1) + # Create endpoints only if all ovnkube-db pods have started and are running. We do this + # from the last pod of the statefulset. + if [[ ${db} == "nb" && "${POD_NAME}" == "ovnkube-db-"${last_node_index} ]]; then + check_and_apply_ovnkube_db_ep ${port} + fi + + tail --follow=name ${OVN_LOGDIR}/ovsdb-server-${db}.log & + ovn_tail_pid=$!
+ + process_healthy ovn${db}_db ${ovn_tail_pid} + echo "=============== run ${db}_ovsdb-raft ========== terminated" +} diff --git a/dist/images/ovnkube.sh b/dist/images/ovnkube.sh index 23c5510148..266fa2bd09 100755 --- a/dist/images/ovnkube.sh +++ b/dist/images/ovnkube.sh @@ -6,6 +6,9 @@ if [[ "${OVNKUBE_SH_VERBOSE:-}" == "true" ]]; then set -x fi +# source the functions in ovndb-raft-functions +. /root/ovndb-raft-functions + # This script is the entrypoint to the image. # Supports version 3 daemonsets # $1 is the daemon to start. @@ -190,19 +193,19 @@ wait_for_event () { shift fi while true; do - $1 $2 + $@ if [[ $? != 0 ]] ; then (( retries += 1 )) if [[ "${retries}" -gt ${attempts} ]]; then - echo "error: $1 $2 did not come up, exiting" + echo "error: $@ did not come up, exiting" exit 1 fi - echo "info: Waiting for $1 $2 to come up, waiting ${sleeper}s ..." + echo "info: Waiting for $@ to come up, waiting ${sleeper}s ..." sleep ${sleeper} sleeper=5 else if [[ "${retries}" != 0 ]]; then - echo "$1 $2 came up in ${retries} ${sleeper} sec tries" + echo "$@ came up in ${retries} ${sleeper} sec tries" fi break fi @@ -215,13 +218,16 @@ wait_for_event () { ready_to_start_node () { # See if ep is available ... - ovn_db_host=$(kubectl --server=${K8S_APISERVER} --token=${k8s_token} --certificate-authority=${K8S_CACERT} \ - get ep -n ${ovn_kubernetes_namespace} ovnkube-db 2>/dev/null | grep ${ovn_sb_port} | sed 's/:/ /' | awk '/ovnkube-db/{ print $2 }') - if [[ ${ovn_db_host} == "" ]] ; then + IFS=" " read -a ovn_db_hosts <<< "$(kubectl --server=${K8S_APISERVER} --token=${k8s_token} --certificate-authority=${K8S_CACERT} \ + get ep -n ovn-kubernetes ovnkube-db -o=jsonpath='{range .subsets[0].addresses[*]}{.ip}{" "}')" + if [[ ${#ovn_db_hosts[@]} == 0 ]] ; then return 1 fi get_ovn_db_vars - ovsdb-client list-dbs ${ovn_nbdb_test} > /dev/null 2>&1 + # cannot use ovsdb-client in the case of raft, since it will succeed even if only one + # instance of the DB is up and running. However, ovn-nbctl always connects to the leader in the clustered + # database, so use it. + ovn-nbctl --db=${ovn_nbdb_test} list NB_Global > /dev/null 2>&1 if [[ $? != 0 ]] ; then return 1 fi @@ -243,9 +249,24 @@ check_ovn_daemonset_version () { } get_ovn_db_vars () { - ovn_nbdb=tcp://${ovn_db_host}:${ovn_nb_port} - ovn_sbdb=tcp://${ovn_db_host}:${ovn_sb_port} - ovn_nbdb_test=$(echo ${ovn_nbdb} | sed 's;//;;') + # OVN_NORTH and OVN_SOUTH override derived host + # Currently limited to tcp (ssl is not supported yet) + ovn_nbdb_str="" + ovn_sbdb_str="" + for i in ${!ovn_db_hosts[@]}; do + if [[ ${i} -ne 0 ]]; then + ovn_nbdb_str=${ovn_nbdb_str}"," + ovn_sbdb_str=${ovn_sbdb_str}"," + fi + ovn_nbdb_str=${ovn_nbdb_str}tcp://${ovn_db_hosts[${i}]}:${ovn_nb_port} + ovn_sbdb_str=${ovn_sbdb_str}tcp://${ovn_db_hosts[${i}]}:${ovn_sb_port} + done + ovn_nbdb=${OVN_NORTH:-$ovn_nbdb_str} + ovn_sbdb=${OVN_SOUTH:-$ovn_sbdb_str} + + echo ovn_nbdb=$ovn_nbdb + echo ovn_sbdb=$ovn_sbdb + ovn_nbdb_test=$(echo ${ovn_nbdb} | sed 's;//;;g') } # OVS must be up before OVN comes up.
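With a clustered (raft) database, `get_ovn_db_vars` above builds comma-separated connection strings covering every `ovnkube-db` member, and readiness is probed with `ovn-nbctl` because it follows the cluster leader, whereas `ovsdb-client` answers as soon as any single member is up. A minimal standalone sketch of that flow; the endpoint IPs here are hypothetical stand-ins for the addresses that the script reads from the `ovnkube-db` endpoints object:

```bash
#!/bin/bash
# Hypothetical member IPs; in ovnkube.sh these come from the ovnkube-db
# endpoints object via kubectl.
ovn_db_hosts=("10.0.0.1" "10.0.0.2" "10.0.0.3")
ovn_nb_port=6641

# Build one comma-separated target list, one tcp:// entry per member,
# mirroring the loop in get_ovn_db_vars.
ovn_nbdb=""
for i in "${!ovn_db_hosts[@]}"; do
  [[ ${i} -ne 0 ]] && ovn_nbdb="${ovn_nbdb},"
  ovn_nbdb="${ovn_nbdb}tcp://${ovn_db_hosts[${i}]}:${ovn_nb_port}"
done
echo "ovn_nbdb=${ovn_nbdb}"  # tcp://10.0.0.1:6641,tcp://10.0.0.2:6641,tcp://10.0.0.3:6641

# ovn-nbctl takes the targets without the "//", hence the sed in the script.
ovn_nbdb_test=$(echo "${ovn_nbdb}" | sed 's;//;;g')

# Success here means the raft cluster has an elected leader (i.e. a quorum),
# not merely that one ovsdb-server process is listening.
if ovn-nbctl --db="${ovn_nbdb_test}" list NB_Global >/dev/null 2>&1; then
  echo "clustered NB database is ready"
fi
```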
@@ -413,7 +434,6 @@ echo ovnkube.sh version ${ovnkube_version} } ovn_debug () { - # get ovn_db_host ready_to_start_node echo "ovn_nbdb ${ovn_nbdb} ovn_sbdb ${ovn_sbdb}" echo "ovn_nbdb_test ${ovn_nbdb_test}" @@ -443,7 +463,7 @@ ovn_debug () { ovs-ofctl dump-flows br-int echo " " echo "=========== ovn-sbctl show =============" - ovn_sbdb_test=$(echo ${ovn_sbdb} | sed 's;//;;') + ovn_sbdb_test=$(echo ${ovn_sbdb} | sed 's;//;;g') echo "=========== ovn-sbctl --db=${ovn_sbdb_test} show =============" ovn-sbctl --db=${ovn_sbdb_test} show echo " " @@ -541,8 +561,10 @@ cleanup-ovs-server () { # set the ovnkube_db endpoint for other pods to query the OVN DB IP set_ovnkube_db_ep () { + ips=("$@") + + echo "=============== setting ovnkube-db endpoints to ${ips[@]}" # create a new endpoint for the headless ovnkube-db service without selectors - # using the current host has the endpoint IP. Ignore IPs in loopback range (127.0.0.0/8) kubectl --server=${K8S_APISERVER} --token=${k8s_token} --certificate-authority=${K8S_CACERT} apply -f - << EOF apiVersion: v1 kind: Endpoints @@ -551,7 +573,7 @@ metadata: namespace: ${ovn_kubernetes_namespace} subsets: - addresses: - - ip: ${ovn_db_host} +`for ip in ${ips[@]}; do printf " - ip: ${ip}\n"; done` ports: - name: north port: ${ovn_nb_port} @@ -561,7 +583,7 @@ subsets: protocol: TCP EOF if [[ $? != 0 ]] ; then - echo "Failed to create endpoint with host ${ovn_db_host} for ovnkube-db service" + echo "Failed to create endpoint with host(s) ${ips[@]} for ovnkube-db service" exit 1 fi } @@ -612,7 +634,7 @@ sb-ovsdb () { ovn-sbctl set-connection ptcp:${ovn_sb_port}:${ovn_db_host} -- set connection . inactivity_probe=0 # create the ovnkube_db endpoint for other pods to query the OVN DB IP - set_ovnkube_db_ep + set_ovnkube_db_ep ${ovn_db_host} tail --follow=name ${OVN_LOGDIR}/ovsdb-server-sb.log & ovn_tail_pid=$! @@ -634,15 +656,14 @@ run-ovn-northd () { sleep 1 echo "=============== run_ovn_northd ========== MASTER ONLY" - echo "ovn_db_host ${ovn_db_host}" echo "ovn_nbdb ${ovn_nbdb} ovn_sbdb ${ovn_sbdb}" echo "ovn_northd_opts=${ovn_northd_opts}" echo "ovn_log_northd=${ovn_log_northd}" # no monitor (and no detach), start northd which connects to the # ovnkube-db service - ovn_nbdb_i=$(echo ${ovn_nbdb} | sed 's;//;;') - ovn_sbdb_i=$(echo ${ovn_sbdb} | sed 's;//;;') + ovn_nbdb_i=$(echo ${ovn_nbdb} | sed 's;//;;g') + ovn_sbdb_i=$(echo ${ovn_sbdb} | sed 's;//;;g') run_as_ovs_user_if_needed \ ${OVNCTL_PATH} start_northd \ --no-monitor --ovn-manage-ovsdb=no \ @@ -738,6 +759,10 @@ ovn-controller () { echo "ovn_nbdb ${ovn_nbdb} ovn_sbdb ${ovn_sbdb}" echo "ovn_nbdb_test ${ovn_nbdb_test}" + # clean up any stale ovn-nb and ovn-remote keys in the Open_vSwitch table + ovs-vsctl remove Open_vSwitch . external_ids ovn-remote + ovs-vsctl remove Open_vSwitch .
external_ids ovn-nb + echo "=============== ovn-controller start_controller" rm -f /var/run/ovn-kubernetes/cni/* rm -f ${OVN_RUNDIR}/ovn-controller.*.ctl @@ -931,9 +956,15 @@ case ${cmd} in "cleanup-ovn-node") cleanup-ovn-node ;; + "nb-ovsdb-raft") + ovsdb-raft nb ${ovn_nb_port} + ;; + "sb-ovsdb-raft") + ovsdb-raft sb ${ovn_sb_port} + ;; *) - echo "invalid command ${cmd}" - echo "valid v3 commands: ovs-server nb-ovsdb sb-ovsdb run-ovn-northd ovn-master ovn-controller ovn-node display_env display ovn_debug cleanup-ovs-server cleanup-ovn-node" + echo "invalid command ${cmd}" + echo "valid v3 commands: ovs-server nb-ovsdb sb-ovsdb run-ovn-northd ovn-master ovn-controller ovn-node display_env display ovn_debug cleanup-ovs-server cleanup-ovn-node nb-ovsdb-raft sb-ovsdb-raft" exit 0 esac diff --git a/dist/templates/ovn-setup.yaml.j2 b/dist/templates/ovn-setup.yaml.j2 index a33448434b..ba65efa8ee 100644 --- a/dist/templates/ovn-setup.yaml.j2 +++ b/dist/templates/ovn-setup.yaml.j2 @@ -81,8 +81,10 @@ rules: - apiGroups: - extensions - networking.k8s.io + - apps resources: - networkpolicies + - statefulsets verbs: ["get", "list", "watch"] - apiGroups: - "" @@ -120,27 +122,6 @@ subjects: name: ovn namespace: ovn-kubernetes ---- -# service to expose the ovnkube-db pod -apiVersion: v1 -kind: Service -metadata: - name: ovnkube-db - namespace: ovn-kubernetes -spec: - ports: - - name: north - port: 6641 - protocol: TCP - targetPort: 6641 - - name: south - port: 6642 - protocol: TCP - targetPort: 6642 - sessionAffinity: None - clusterIP: None - type: ClusterIP - --- # The network cidr and service cidr are set in the ovn-config configmap kind: ConfigMap diff --git a/dist/templates/ovnkube-db-raft.yaml.j2 b/dist/templates/ovnkube-db-raft.yaml.j2 new file mode 100644 index 0000000000..c54f8a4c9b --- /dev/null +++ b/dist/templates/ovnkube-db-raft.yaml.j2 @@ -0,0 +1,235 @@ +# service to expose the ovnkube-db pod +apiVersion: v1 +kind: Service +metadata: + name: ovnkube-db + namespace: ovn-kubernetes +spec: + ports: + - name: north + port: 6641 + protocol: TCP + targetPort: 6641 + - name: south + port: 6642 + protocol: TCP + targetPort: 6642 + sessionAffinity: None + clusterIP: None + type: ClusterIP + +--- + +# ovndb-raft PodDisruptionBudget to prevent a majority of the ovnkube raft cluster +# nodes from disruption +apiVersion: policy/v1beta1 +kind: PodDisruptionBudget +metadata: + name: ovndb-raft-pdb + namespace: ovn-kubernetes +spec: + minAvailable: {{ ovn_db_minAvailable | default(2) }} + selector: + matchLabels: + name: ovnkube-db + +--- + +# ovnkube-db raft statefulset +# daemonset version 3 +# starts ovn NB/SB ovsdb daemons, each in a separate container +# +kind: StatefulSet +apiVersion: apps/v1 +metadata: + name: ovnkube-db + namespace: ovn-kubernetes + annotations: + kubernetes.io/description: | + This statefulset launches the OVN NB/SB ovsdb components.
+spec: + serviceName: ovnkube-db + podManagementPolicy: "Parallel" + replicas: {{ ovn_db_replicas | default(3) }} + revisionHistoryLimit: 10 + selector: + matchLabels: + name: ovnkube-db + template: + metadata: + labels: + name: ovnkube-db + component: network + type: infra + kubernetes.io/os: "linux" + annotations: + scheduler.alpha.kubernetes.io/critical-pod: '' + spec: + terminationGracePeriodSeconds: 30 + imagePullSecrets: + - name: registry-credentials + serviceAccountName: ovn + hostNetwork: true + + # required to be scheduled on node with ovn.org/ovnkube-db=true label but can + # only have one instance per node + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: ovn.org/ovnkube-db + operator: In + values: + - "true" + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: name + operator: In + values: + - ovnkube-db + topologyKey: kubernetes.io/hostname + + containers: + # nb-ovsdb - v3 + - name: nb-ovsdb + image: "{{ ovn_image | default('docker.io/ovnkube/ovn-daemonset:latest') }}" + imagePullPolicy: "{{ ovn_image_pull_policy | default('IfNotPresent') }}" + command: ["/root/ovnkube.sh", "nb-ovsdb-raft"] + + securityContext: + runAsUser: 0 + capabilities: + add: ["NET_ADMIN"] + + volumeMounts: + # ovn db is stored in the pod in /etc/openvswitch + # (or in /etc/ovn if OVN from new repository is used) + # and on the host in /var/lib/openvswitch/ + - mountPath: /etc/openvswitch/ + name: host-var-lib-ovs + - mountPath: /etc/ovn/ + name: host-var-lib-ovs + - mountPath: /var/log/openvswitch/ + name: host-var-log-ovs + - mountPath: /var/log/ovn/ + name: host-var-log-ovs + - mountPath: /var/run/openvswitch/ + name: host-var-run-ovs + - mountPath: /var/run/ovn/ + name: host-var-run-ovs + + resources: + requests: + cpu: 100m + memory: 300Mi + env: + - name: OVN_DAEMONSET_VERSION + value: "3" + - name: OVN_LOG_NB + value: "-vconsole:info -vfile:info" + - name: K8S_APISERVER + valueFrom: + configMapKeyRef: + name: ovn-config + key: k8s_apiserver + - name: OVN_KUBERNETES_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + ports: + - name: healthz + containerPort: 10256 + # TODO: Temporarily disabled until we determine how to wait for clean default + # config + # livenessProbe: + # initialDelaySeconds: 10 + # httpGet: + # path: /healthz + # port: 10256 + # scheme: HTTP + lifecycle: + # end of container + + # sb-ovsdb - v3 + - name: sb-ovsdb + image: "{{ ovn_image | default('docker.io/ovnkube/ovn-daemonset:latest') }}" + imagePullPolicy: "{{ ovn_image_pull_policy | default('IfNotPresent') }}" + command: ["/root/ovnkube.sh", "sb-ovsdb-raft"] + + securityContext: + runAsUser: 0 + capabilities: + add: ["NET_ADMIN"] + + volumeMounts: + # ovn db is stored in the pod in /etc/openvswitch + # (or in /etc/ovn if OVN from new repository is used) + # and on the host in /var/lib/openvswitch/ + - mountPath: /etc/openvswitch/ + name: host-var-lib-ovs + - mountPath: /etc/ovn/ + name: host-var-lib-ovs + - mountPath: /var/log/openvswitch/ + name: host-var-log-ovs + - mountPath: /var/log/ovn/ + name: host-var-log-ovs + - mountPath: /var/run/openvswitch/ + name: host-var-run-ovs + - mountPath: /var/run/ovn/ + name: host-var-run-ovs + + resources: + requests: + cpu: 100m + memory: 300Mi + env: + - name: OVN_DAEMONSET_VERSION + value: "3" + - name: OVN_LOG_SB + value: "-vconsole:info -vfile:info" 
+ - name: K8S_APISERVER + valueFrom: + configMapKeyRef: + name: ovn-config + key: k8s_apiserver + - name: OVN_KUBERNETES_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + ports: + - name: healthz + containerPort: 10255 + # TODO: Temporarily disabled until we determine how to wait for clean default + # config + # livenessProbe: + # initialDelaySeconds: 10 + # httpGet: + # path: /healthz + # port: 10255 + # scheme: HTTP + lifecycle: + # end of container + + volumes: + - name: host-var-log-ovs + hostPath: + path: /var/log/openvswitch + - name: host-var-lib-ovs + hostPath: + path: /var/lib/openvswitch + - name: host-var-run-ovs + hostPath: + path: /var/run/openvswitch + tolerations: + - operator: "Exists" diff --git a/dist/templates/ovnkube-db-vip.yaml.j2 b/dist/templates/ovnkube-db-vip.yaml.j2 index 906177a685..6b37c7ebf8 100644 --- a/dist/templates/ovnkube-db-vip.yaml.j2 +++ b/dist/templates/ovnkube-db-vip.yaml.j2 @@ -1,3 +1,25 @@ +# service to expose the ovnkube-db pod +apiVersion: v1 +kind: Service +metadata: + name: ovnkube-db + namespace: ovn-kubernetes +spec: + ports: + - name: north + port: 6641 + protocol: TCP + targetPort: 6641 + - name: south + port: 6642 + protocol: TCP + targetPort: 6642 + sessionAffinity: None + clusterIP: None + type: ClusterIP + +--- + # ovnkube-db HA using Corosync/Pacemaker # daemonset version 3 # starts ovn NB/SB ovsdb daemons in a single container diff --git a/dist/templates/ovnkube-db.yaml.j2 b/dist/templates/ovnkube-db.yaml.j2 index 3a947d0063..5fe6eb4b3a 100644 --- a/dist/templates/ovnkube-db.yaml.j2 +++ b/dist/templates/ovnkube-db.yaml.j2 @@ -1,3 +1,25 @@ +# service to expose the ovnkube-db pod +apiVersion: v1 +kind: Service +metadata: + name: ovnkube-db + namespace: ovn-kubernetes +spec: + ports: + - name: north + port: 6641 + protocol: TCP + targetPort: 6641 + - name: south + port: 6642 + protocol: TCP + targetPort: 6642 + sessionAffinity: None + clusterIP: None + type: ClusterIP + +--- + # ovnkube-db # daemonset version 3 # starts ovn NB/SB ovsdb daemons, each in a separate container diff --git a/dist/yaml/.gitignore b/dist/yaml/.gitignore index ce0871d4c1..dd7c53c7e0 100644 --- a/dist/yaml/.gitignore +++ b/dist/yaml/.gitignore @@ -5,4 +5,4 @@ ovnkube-db-vip.yaml ovnkube-db.yaml ovnkube-node.yaml ovnkube-monitor.yaml - +ovnkube-db-raft.yaml diff --git a/go-controller/cmd/ovnkube/ovnkube.go b/go-controller/cmd/ovnkube/ovnkube.go index d00d5a94da..c01d0776d3 100644 --- a/go-controller/cmd/ovnkube/ovnkube.go +++ b/go-controller/cmd/ovnkube/ovnkube.go @@ -24,8 +24,6 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kexec "k8s.io/utils/exec" ) @@ -244,22 +242,16 @@ func runOvnKube(ctx *cli.Context) error { ovn.RegisterMetrics() - var nodeSelector *metav1.LabelSelector var hybridOverlayClusterSubnets []config.CIDRNetworkEntry if enableHybridOverlay { hybridOverlayClusterSubnets, err = hocontroller.GetHybridOverlayClusterSubnets(ctx) if err != nil { return err } - - nodeSelector = &metav1.LabelSelector{ - MatchLabels: map[string]string{v1.LabelOSStable: "linux"}, - } } // run the HA master controller to init the master - ovnHAController := ovn.NewHAMasterController(clientset, factory, master, stopChan, - hybridOverlayClusterSubnets, nodeSelector) + ovnHAController := ovn.NewHAMasterController(clientset, factory, master, 
stopChan, hybridOverlayClusterSubnets) if err := ovnHAController.StartHAMasterController(); err != nil { return err } diff --git a/go-controller/hybrid-overlay/pkg/controller/master_test.go b/go-controller/hybrid-overlay/pkg/controller/master_test.go index 4cd3585812..6b30f0f24e 100644 --- a/go-controller/hybrid-overlay/pkg/controller/master_test.go +++ b/go-controller/hybrid-overlay/pkg/controller/master_test.go @@ -113,12 +113,16 @@ var _ = Describe("Hybrid SDN Master Operations", func() { updatedNode, err := fakeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred()) Expect(updatedNode.Annotations).To(HaveKeyWithValue(types.HybridOverlayNodeSubnet, nodeSubnet)) + Expect(updatedNode.Annotations).NotTo(HaveKey(ovn.OvnNodeSubnets)) Expect(fexec.CalledMatchesExpected()).Should(BeTrue()) return nil } - err := app.Run([]string{app.Name}) + err := app.Run([]string{ + app.Name, + "-no-hostsubnet-nodes=" + v1.LabelOSStable + "=windows", + }) Expect(err).NotTo(HaveOccurred()) }) diff --git a/go-controller/hybrid-overlay/pkg/controller/node_linux.go b/go-controller/hybrid-overlay/pkg/controller/node_linux.go index cee5224fde..29a3b53063 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node_linux.go +++ b/go-controller/hybrid-overlay/pkg/controller/node_linux.go @@ -54,13 +54,14 @@ func podToCookie(pod *kapi.Pod) string { } func (n *NodeController) addOrUpdatePod(pod *kapi.Pod) error { - podIP, podMAC := getPodDetails(pod, n.nodeName) - if podIP == "" || podMAC == "" { + podIP, podMAC, err := getPodDetails(pod, n.nodeName) + if err != nil { + logrus.Debugf("cleaning up hybrid overlay pod %s/%s because %v", pod.Namespace, pod.Name, err) return n.deletePod(pod) } cookie := podToCookie(pod) - _, _, err := util.RunOVSOfctl("add-flow", extBridgeName, + _, _, err = util.RunOVSOfctl("add-flow", extBridgeName, fmt.Sprintf("table=10, cookie=0x%s, priority=100, ip, nw_dst=%s, actions=set_field:%s->eth_src,set_field:%s->eth_dst,output:ext", cookie, podIP, n.drMAC, podMAC)) if err != nil { return fmt.Errorf("failed to add flows for pod %s/%s: %v", pod.Namespace, pod.Name, err) @@ -81,23 +82,22 @@ func (n *NodeController) deletePod(pod *kapi.Pod) error { return nil } -func getPodDetails(pod *kapi.Pod, nodeName string) (string, string) { +func getPodDetails(pod *kapi.Pod, nodeName string) (string, string, error) { if pod.Spec.NodeName != nodeName { - return "", "" + return "", "", fmt.Errorf("not scheduled") } podInfo, err := util.UnmarshalPodAnnotation(pod.Annotations) if err != nil { - logrus.Errorf("Unable to unmarshal pod annotations: %v", err) - return "", "" + return "", "", err } - return podInfo.IP.String(), podInfo.MAC.String() + return podInfo.IP.String(), podInfo.MAC.String(), nil } // podChanged returns true if any relevant pod attributes changed func podChanged(pod1 *kapi.Pod, pod2 *kapi.Pod, nodeName string) bool { - podIP1, mac1 := getPodDetails(pod1, nodeName) - podIP2, mac2 := getPodDetails(pod2, nodeName) + podIP1, mac1, _ := getPodDetails(pod1, nodeName) + podIP2, mac2, _ := getPodDetails(pod2, nodeName) return (podIP1 != podIP2 || mac1 != mac2) } diff --git a/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go b/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go index 07efd8d2d0..5ced55650c 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go +++ b/go-controller/hybrid-overlay/pkg/controller/node_linux_test.go @@ -85,8 +85,9 @@ func createNode(name, os, ip string, annotations map[string]string) *v1.Node 
{ func createPod(namespace, name, node, podIP, podMAC string) *v1.Pod { annotations := map[string]string{} if podIP != "" || podMAC != "" { - annotations[util.OvnPodAnnotationName] = fmt.Sprintf(`{"%s":{"ip_address":"%s", "mac_address":"%s"}}`, - util.OvnPodDefaultNetwork, podIP, podMAC) + _, ipn, _ := net.ParseCIDR(podIP) + gatewayIP := util.NextIP(ipn.IP) + annotations[util.OvnPodAnnotationName] = fmt.Sprintf(`{"default": {"ip_address":"` + podIP + `", "mac_address":"` + podMAC + `", "gateway_ip": "` + gatewayIP.String() + `"}}`) } return &v1.Pod{ @@ -416,11 +417,10 @@ var _ = Describe("Hybrid Overlay Node Linux Operations", func() { It("sets up local pod flows", func() { app.Action = func(ctx *cli.Context) error { const ( - thisNode string = "mynode" - node1Name string = "node1" - pod1IP string = "1.2.3.5" - pod1CIDR string = pod1IP + "/24" - pod1MAC string = "aa:bb:cc:dd:ee:ff" + thisNode string = "mynode" + pod1IP string = "1.2.3.5" + pod1CIDR string = pod1IP + "/24" + pod1MAC string = "aa:bb:cc:dd:ee:ff" ) fakeClient := fake.NewSimpleClientset([]runtime.Object{ diff --git a/go-controller/hybrid-overlay/pkg/controller/node_windows.go b/go-controller/hybrid-overlay/pkg/controller/node_windows.go index 2816b7dd3d..29b1ec16db 100644 --- a/go-controller/hybrid-overlay/pkg/controller/node_windows.go +++ b/go-controller/hybrid-overlay/pkg/controller/node_windows.go @@ -337,8 +337,11 @@ func (n *NodeController) InitSelf() { logrus.Errorf("Error creating the network: no DRMAC address") return } - n.kube.SetAnnotationOnNode(n.thisNode, types.HybridOverlayDrMac, policySettings.Address) - + if err := n.kube.SetAnnotationsOnNode(n.thisNode, map[string]interface{}{ + types.HybridOverlayDrMac: policySettings.Address, + }); err != nil { + logrus.Errorf("failed to set DRMAC annotation on node: %v", err) + } break } } diff --git a/go-controller/pkg/cluster/cluster.go b/go-controller/pkg/cluster/cluster.go index cfe35f3a45..d9ea0cb696 100644 --- a/go-controller/pkg/cluster/cluster.go +++ b/go-controller/pkg/cluster/cluster.go @@ -8,6 +8,7 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" + kapi "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" ) @@ -26,14 +27,19 @@ func NewClusterController(kubeClient kubernetes.Interface, wf *factory.WatchFact } } -func setupOVNNode(nodeName string) error { +func setupOVNNode(node *kapi.Node) error { var err error + nodeName, err := util.GetNodeHostname(node) + if err != nil { + return fmt.Errorf("failed to obtain hostname from node %q: %v", node.Name, err) + } + nodeIP := config.Default.EncapIP if nodeIP == "" { - nodeIP, err = util.GetNodeIP(nodeName) + nodeIP, err = util.GetNodeIP(node) if err != nil { - return fmt.Errorf("failed to obtain local IP from hostname %q: %v", nodeName, err) + return fmt.Errorf("failed to obtain local IP from node %q: %v", node.Name, err) } } else { if ip := net.ParseIP(nodeIP); ip == nil { diff --git a/go-controller/pkg/cluster/gateway_init.go b/go-controller/pkg/cluster/gateway_init.go index 0e43e6a4df..cba2e9df67 100644 --- a/go-controller/pkg/cluster/gateway_init.go +++ b/go-controller/pkg/cluster/gateway_init.go @@ -176,12 +176,19 @@ func GatewayReady(nodeName string, portName string) (bool, error) { // OpenFlow table 41 performs SNATing of packets that are heading to physical network from // logical network. 
for _, clusterSubnet := range config.Default.ClusterSubnets { + var cidr, match string + cidr = clusterSubnet.CIDR.String() + if strings.Contains(cidr, ":") { + match = "ipv6,ipv6_src=" + cidr + } else { + match = "ip,nw_src=" + cidr + } stdout, _, err := util.RunOVSOfctl("--no-stats", "--no-names", "dump-flows", "br-int", - "table=41,ip,nw_src="+clusterSubnet.CIDR.String()) + "table=41,"+match) if err != nil { return false, nil } - if !strings.Contains(stdout, "nw_src="+clusterSubnet.CIDR.String()) { + if !strings.Contains(stdout, cidr) { return false, nil } } diff --git a/go-controller/pkg/cluster/node.go b/go-controller/pkg/cluster/node.go index 1ca84b0708..56b57e8973 100644 --- a/go-controller/pkg/cluster/node.go +++ b/go-controller/pkg/cluster/node.go @@ -121,7 +121,10 @@ func (cluster *OvnClusterController) StartClusterNode(name string) error { } } - err = setupOVNNode(name) + if node, err = cluster.Kube.GetNode(name); err != nil { + return fmt.Errorf("error retrieving node %s: %v", name, err) + } + err = setupOVNNode(node) if err != nil { return err } @@ -211,13 +214,13 @@ func (cluster *OvnClusterController) StartClusterNode(name string) error { wg.Wait() close(messages) }() - logrus.Infof("Gateway and ManagementPort are Ready") for i := range messages { if i != nil { return fmt.Errorf("Timeout error while obtaining addresses for %s (%v)", portName, i) } } + logrus.Infof("Gateway and ManagementPort are Ready") if postReady != nil { err = postReady() diff --git a/go-controller/pkg/cluster/node_test.go b/go-controller/pkg/cluster/node_test.go index 81c78ca167..19a635fcac 100644 --- a/go-controller/pkg/cluster/node_test.go +++ b/go-controller/pkg/cluster/node_test.go @@ -32,10 +32,25 @@ var _ = Describe("Node Operations", func() { It("sets correct OVN external IDs", func() { app.Action = func(ctx *cli.Context) error { const ( - nodeName string = "1.2.5.6" + nodeIP string = "1.2.5.6" + nodeName string = "cannot.be.resolv.ed" interval int = 100000 ofintval int = 180 ) + node := kapi.Node{ + Status: kapi.NodeStatus{ + Addresses: []kapi.NodeAddress{ + { + Type: kapi.NodeHostName, + Address: nodeName, + }, + { + Type: kapi.NodeExternalIP, + Address: nodeIP, + }, + }, + }, + } fexec := ovntest.NewFakeExec() fexec.AddFakeCmd(&ovntest.ExpectedCmd{ @@ -45,7 +60,7 @@ var _ = Describe("Node Operations", func() { "external_ids:ovn-remote-probe-interval=%d "+ "external_ids:ovn-openflow-probe-interval=%d "+ "external_ids:hostname=\"%s\"", - nodeName, interval, ofintval, nodeName), + nodeIP, interval, ofintval, nodeName), }) err := util.SetExec(fexec) @@ -54,7 +69,7 @@ var _ = Describe("Node Operations", func() { _, err = config.InitConfig(ctx, fexec, nil) Expect(err).NotTo(HaveOccurred()) - err = setupOVNNode(nodeName) + err = setupOVNNode(&node) Expect(err).NotTo(HaveOccurred()) Expect(fexec.CalledMatchesExpected()).To(BeTrue(), fexec.ErrorDesc) @@ -67,13 +82,28 @@ var _ = Describe("Node Operations", func() { It("sets non-default OVN encap port", func() { app.Action = func(ctx *cli.Context) error { const ( - nodeName string = "1.2.5.6" + nodeIP string = "1.2.5.6" + nodeName string = "cannot.be.resolv.ed" encapPort uint = 666 interval int = 100000 ofintval int = 180 chassisUUID string = "1a3dfc82-2749-4931-9190-c30e7c0ecea3" encapUUID string = "e4437094-0094-4223-9f14-995d98d5fff8" ) + node := kapi.Node{ + Status: kapi.NodeStatus{ + Addresses: []kapi.NodeAddress{ + { + Type: kapi.NodeHostName, + Address: nodeName, + }, + { + Type: kapi.NodeExternalIP, + Address: nodeIP, + }, + }, + }, + } fexec := 
ovntest.NewFakeExec() fexec.AddFakeCmd(&ovntest.ExpectedCmd{ @@ -83,7 +113,7 @@ var _ = Describe("Node Operations", func() { "external_ids:ovn-remote-probe-interval=%d "+ "external_ids:ovn-openflow-probe-interval=%d "+ "external_ids:hostname=\"%s\"", - nodeName, interval, ofintval, nodeName), + nodeIP, interval, ofintval, nodeName), }) fexec.AddFakeCmd(&ovntest.ExpectedCmd{ Cmd: fmt.Sprintf("ovs-vsctl --timeout=15 " + @@ -107,7 +137,7 @@ var _ = Describe("Node Operations", func() { Expect(err).NotTo(HaveOccurred()) config.Default.EncapPort = encapPort - err = setupOVNNode(nodeName) + err = setupOVNNode(&node) Expect(err).NotTo(HaveOccurred()) Expect(fexec.CalledMatchesExpected()).To(BeTrue(), fexec.ErrorDesc) diff --git a/go-controller/pkg/config/config.go b/go-controller/pkg/config/config.go index b772fc6a9f..8c07863c41 100644 --- a/go-controller/pkg/config/config.go +++ b/go-controller/pkg/config/config.go @@ -11,6 +11,8 @@ import ( "runtime" "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/sirupsen/logrus" "github.com/urfave/cli" gcfg "gopkg.in/gcfg.v1" @@ -23,6 +25,7 @@ import ( const DefaultEncapPort = 6081 const MetricNamespace = "ovnkube" +const DefaultAPIServer = "http://localhost:8443" // The following are global config parameters that other modules may access directly var ( @@ -59,7 +62,7 @@ var ( // Kubernetes holds Kubernetes-related parsed config file parameters and command-line overrides Kubernetes = KubernetesConfig{ - APIServer: "http://localhost:8080", + APIServer: DefaultAPIServer, ServiceCIDR: "172.16.1.0/24", OVNConfigNamespace: "ovn-kubernetes", } @@ -153,16 +156,18 @@ type CNIConfig struct { // KubernetesConfig holds Kubernetes-related parsed config file parameters and command-line overrides type KubernetesConfig struct { - Kubeconfig string `gcfg:"kubeconfig"` - CACert string `gcfg:"cacert"` - APIServer string `gcfg:"apiserver"` - Token string `gcfg:"token"` - ServiceCIDR string `gcfg:"service-cidr"` - OVNConfigNamespace string `gcfg:"ovn-config-namespace"` - MetricsBindAddress string `gcfg:"metrics-bind-address"` - MetricsEnablePprof bool `gcfg:"metrics-enable-pprof"` - OVNEmptyLbEvents bool `gcfg:"ovn-empty-lb-events"` - PodIP string `gcfg:"pod-ip"` + Kubeconfig string `gcfg:"kubeconfig"` + CACert string `gcfg:"cacert"` + APIServer string `gcfg:"apiserver"` + Token string `gcfg:"token"` + ServiceCIDR string `gcfg:"service-cidr"` + OVNConfigNamespace string `gcfg:"ovn-config-namespace"` + MetricsBindAddress string `gcfg:"metrics-bind-address"` + MetricsEnablePprof bool `gcfg:"metrics-enable-pprof"` + OVNEmptyLbEvents bool `gcfg:"ovn-empty-lb-events"` + PodIP string `gcfg:"pod-ip"` + RawNoHostSubnetNodes string `gcfg:"no-hostsubnet-nodes"` + NoHostSubnetNodes *metav1.LabelSelector } // GatewayMode holds the node gateway mode @@ -262,7 +267,7 @@ var ( ) func init() { - // Cache original default config values so they can be restored by testcases + // Cache original default config values savedDefault = Default savedLogging = Logging savedCNI = CNI @@ -294,9 +299,9 @@ func RestoreDefaultConfig() { } // copy members of struct 'src' into the corresponding field in struct 'dst' -// if the field in 'src' is a non-zero int or a non-zero-length string. This -// function should be called with pointers to structs. -func overrideFields(dst, src interface{}) error { +// if the field in 'src' is a non-zero int or a non-zero-length string and +// does not contain a default value. This function should be called with pointers to structs. 
+func overrideFields(dst, src, defaults interface{}) error { dstStruct := reflect.ValueOf(dst).Elem() srcStruct := reflect.ValueOf(src).Elem() if dstStruct.Kind() != srcStruct.Kind() || dstStruct.Kind() != reflect.Struct { @@ -306,6 +311,10 @@ func overrideFields(dst, src interface{}) error { return fmt.Errorf("mismatched struct types") } + var defStruct reflect.Value + if defaults != nil { + defStruct = reflect.ValueOf(defaults).Elem() + } // Iterate over each field in dst/src Type so we can get the tags, // and use the field name to retrieve the field's actual value from // the dst/src instance @@ -322,32 +331,20 @@ func overrideFields(dst, src interface{}) error { dstField := dstStruct.FieldByName(structField.Name) srcField := srcStruct.FieldByName(structField.Name) + var dv reflect.Value + if defStruct.IsValid() { + dv = defStruct.FieldByName(structField.Name) + } if !dstField.IsValid() || !srcField.IsValid() { return fmt.Errorf("invalid struct %q field %q", dstType.Name(), structField.Name) } if dstField.Kind() != srcField.Kind() { return fmt.Errorf("mismatched struct %q fields %q", dstType.Name(), structField.Name) } - switch srcField.Kind() { - case reflect.String: - if srcField.String() != "" { - dstField.Set(srcField) - } - case reflect.Int: - if srcField.Int() != 0 { - dstField.Set(srcField) - } - case reflect.Uint: - if srcField.Uint() != 0 { - dstField.Set(srcField) - } - case reflect.Bool: - if srcField.Bool() { - dstField.Set(srcField) - } - default: - return fmt.Errorf("unhandled struct %q field %q type %v", dstType.Name(), structField.Name, srcField.Kind()) + if dv.IsValid() && reflect.DeepEqual(dv.Interface(), srcField.Interface()) { + continue } + dstField.Set(srcField) } if !handled { // No tags found in the struct so we don't know how to override @@ -381,21 +378,25 @@ var CommonFlags = []cli.Flag{ cli.StringFlag{ Name: "config-file", Usage: "configuration file path (default: /etc/openvswitch/ovn_k8s.conf)", + //Value: "/etc/openvswitch/ovn_k8s.conf", }, cli.IntFlag{ Name: "mtu", Usage: "MTU value used for the overlay networks (default: 1400)", Destination: &cliConfig.Default.MTU, + Value: Default.MTU, }, cli.IntFlag{ Name: "conntrack-zone", Usage: "For gateway nodes, the conntrack zone used for conntrack flow rules (default: 64000)", Destination: &cliConfig.Default.ConntrackZone, + Value: Default.ConntrackZone, }, cli.StringFlag{ Name: "encap-type", Usage: "The encapsulation protocol to use to transmit packets between hypervisors (default: geneve)", Destination: &cliConfig.Default.EncapType, + Value: Default.EncapType, }, cli.StringFlag{ Name: "encap-ip", @@ -406,18 +407,21 @@ var CommonFlags = []cli.Flag{ Name: "encap-port", Usage: "The UDP port used by the encapsulation endpoint (default: 6081)", Destination: &cliConfig.Default.EncapPort, + Value: Default.EncapPort, }, cli.IntFlag{ Name: "inactivity-probe", Usage: "Maximum number of milliseconds of idle time on " + "connection for ovn-controller before it sends a inactivity probe", Destination: &cliConfig.Default.InactivityProbe, + Value: Default.InactivityProbe, }, cli.IntFlag{ Name: "openflow-probe", Usage: "Maximum number of seconds of idle time on the openflow " + "connection for ovn-controller before it sends a inactivity probe", Destination: &cliConfig.Default.OpenFlowProbe, + Value: Default.OpenFlowProbe, }, cli.StringFlag{ Name: "cluster-subnet", @@ -426,15 +430,16 @@ var CommonFlags = []cli.Flag{ }, cli.StringFlag{ Name: "cluster-subnets", - Value: "10.128.0.0/14/23", + Value: Default.RawClusterSubnets, 
Usage: "A comma separated set of IP subnets and the associated " + "hostsubnet prefix lengths to use for the cluster (eg, \"10.128.0.0/14/23,10.0.0.0/14/23\"). " + "Each entry is given in the form [IP address/prefix-length/hostsubnet-prefix-length] " + "and cannot overlap with other entries. The hostsubnet-prefix-length " + - "is optional and if unspecified defaults to 24. The " + - "hostsubnet-prefix-length defines how many IP addresses " + - "are dedicated to each node and may be different for each " + - "entry.", + "defines how large a subnet is given to each node and may be different " + + "for each entry. For IPv6 subnets, it must be 64 (and does not need to " + + "be explicitly specified). For IPv4 subnets an explicit " + + "hostsubnet-prefix should be specified, but for backward compatibility " + + "it defaults to 24 if unspecified.", Destination: &cliConfig.Default.RawClusterSubnets, }, cli.BoolFlag{ @@ -457,6 +462,7 @@ var CommonFlags = []cli.Flag{ Name: "loglevel", Usage: "log verbosity and level: 5=debug, 4=info, 3=warn, 2=error, 1=fatal (default: 4)", Destination: &cliConfig.Logging.Level, + Value: Logging.Level, }, cli.StringFlag{ Name: "logfile", @@ -472,11 +478,13 @@ var CNIFlags = []cli.Flag{ Name: "cni-conf-dir", Usage: "the CNI config directory in which to write the overlay CNI config file (default: /etc/cni/net.d)", Destination: &cliConfig.CNI.ConfDir, + Value: CNI.ConfDir, }, cli.StringFlag{ Name: "cni-plugin", Usage: "the name of the CNI plugin (default: ovn-k8s-cni-overlay)", Destination: &cliConfig.CNI.Plugin, + Value: CNI.Plugin, }, cli.StringFlag{ Name: "win-hnsnetwork-id", @@ -499,6 +507,7 @@ var K8sFlags = []cli.Flag{ "provided for kube-apiserver \"-service-cluster-ip-range\" " + "option. (default: 172.16.1.0/24)", Destination: &cliConfig.Kubernetes.ServiceCIDR, + Value: Kubernetes.ServiceCIDR, }, cli.StringFlag{ Name: "k8s-kubeconfig", @@ -509,6 +518,7 @@ var K8sFlags = []cli.Flag{ Name: "k8s-apiserver", Usage: "URL of the Kubernetes API server (not required if --k8s-kubeconfig is given) (default: http://localhost:8443)", Destination: &cliConfig.Kubernetes.APIServer, + Value: Kubernetes.APIServer, }, cli.StringFlag{ Name: "k8s-cacert", @@ -524,6 +534,7 @@ var K8sFlags = []cli.Flag{ Name: "ovn-config-namespace", Usage: "specify a namespace which will contain services to config the OVN databases", Destination: &cliConfig.Kubernetes.OVNConfigNamespace, + Value: Kubernetes.OVNConfigNamespace, }, cli.StringFlag{ Name: "metrics-bind-address", @@ -548,6 +559,11 @@ var K8sFlags = []cli.Flag{ Usage: "specify the ovnkube pod IP.", Destination: &cliConfig.Kubernetes.PodIP, }, + cli.StringFlag{ + Name: "no-hostsubnet-nodes", + Usage: "Specify a label for nodes that will manage their own hostsubnets", + Destination: &cliConfig.Kubernetes.RawNoHostSubnetNodes, + }, } // OvnNBFlags capture OVN northbound database options @@ -560,18 +576,21 @@ var OvnNBFlags = []cli.Flag{ Destination: &cliConfig.OvnNorth.Address, }, cli.StringFlag{ - Name: "nb-client-privkey", - Usage: "Private key that the client should use for talking to the OVN database. Leave empty to use local unix socket. (default: /etc/openvswitch/ovnnb-privkey.pem)", + Name: "nb-client-privkey", + Usage: "Private key that the client should use for talking to the OVN database (default when ssl address is used: /etc/openvswitch/ovnnb-privkey.pem). 
" + + "Default value for this setting is empty which defaults to use local unix socket.", Destination: &cliConfig.OvnNorth.PrivKey, }, cli.StringFlag{ - Name: "nb-client-cert", - Usage: "Client certificate that the client should use for talking to the OVN database. Leave empty to use local unix socket. (default: /etc/openvswitch/ovnnb-cert.pem)", + Name: "nb-client-cert", + Usage: "Client certificate that the client should use for talking to the OVN database (default when ssl address is used: /etc/openvswitch/ovnnb-cert.pem). " + + "Default value for this setting is empty which defaults to use local unix socket.", Destination: &cliConfig.OvnNorth.Cert, }, cli.StringFlag{ - Name: "nb-client-cacert", - Usage: "CA certificate that the client should use for talking to the OVN database. Leave empty to use local unix socket. (default: /etc/openvswitch/ovnnb-ca.cert)", + Name: "nb-client-cacert", + Usage: "CA certificate that the client should use for talking to the OVN database (default when ssl address is used: /etc/openvswitch/ovnnb-ca.cert)." + + "Default value for this setting is empty which defaults to use local unix socket.", Destination: &cliConfig.OvnNorth.CACert, }, } @@ -586,18 +605,21 @@ var OvnSBFlags = []cli.Flag{ Destination: &cliConfig.OvnSouth.Address, }, cli.StringFlag{ - Name: "sb-client-privkey", - Usage: "Private key that the client should use for talking to the OVN database. Leave empty to use local unix socket. (default: /etc/openvswitch/ovnsb-privkey.pem)", + Name: "sb-client-privkey", + Usage: "Private key that the client should use for talking to the OVN database (default when ssl address is used: /etc/openvswitch/ovnsb-privkey.pem)." + + "Default value for this setting is empty which defaults to use local unix socket.", Destination: &cliConfig.OvnSouth.PrivKey, }, cli.StringFlag{ - Name: "sb-client-cert", - Usage: "Client certificate that the client should use for talking to the OVN database. Leave empty to use local unix socket. (default: /etc/openvswitch/ovnsb-cert.pem)", + Name: "sb-client-cert", + Usage: "Client certificate that the client should use for talking to the OVN database(default when ssl address is used: /etc/openvswitch/ovnsb-cert.pem). " + + "Default value for this setting is empty which defaults to use local unix socket.", Destination: &cliConfig.OvnSouth.Cert, }, cli.StringFlag{ - Name: "sb-client-cacert", - Usage: "CA certificate that the client should use for talking to the OVN database. Leave empty to use local unix socket. (default: /etc/openvswitch/ovnsb-ca.cert)", + Name: "sb-client-cacert", + Usage: "CA certificate that the client should use for talking to the OVN database (default when ssl address is used /etc/openvswitch/ovnsb-ca.cert). 
" + + "Default value for this setting is empty which defaults to use local unix socket.", Destination: &cliConfig.OvnSouth.CACert, }, } @@ -662,26 +684,31 @@ var MasterHAFlags = []cli.Flag{ Name: "nb-port", Usage: "Port of the OVN northbound DB server to configure (default: 6641)", Destination: &cliConfig.MasterHA.NbPort, + Value: MasterHA.NbPort, }, cli.IntFlag{ Name: "sb-port", Usage: "Port of the OVN southbound DB server to configure (default: 6642)", Destination: &cliConfig.MasterHA.SbPort, + Value: MasterHA.SbPort, }, cli.IntFlag{ Name: "ha-election-lease-duration", Usage: "Leader election lease duration (in secs) (default: 60)", Destination: &cliConfig.MasterHA.ElectionLeaseDuration, + Value: MasterHA.ElectionLeaseDuration, }, cli.IntFlag{ Name: "ha-election-renew-deadline", Usage: "Leader election renew deadline (in secs) (default: 35)", Destination: &cliConfig.MasterHA.ElectionRenewDeadline, + Value: MasterHA.ElectionRenewDeadline, }, cli.IntFlag{ Name: "ha-election-retry-period", Usage: "Leader election retry period (in secs) (default: 10)", Destination: &cliConfig.MasterHA.ElectionRetryPeriod, + Value: MasterHA.ElectionRetryPeriod, }, } @@ -757,7 +784,7 @@ func setOVSExternalID(exec kexec.Interface, key, value string) error { func buildKubernetesConfig(exec kexec.Interface, cli, file *config, saPath string, defaults *Defaults) error { // token adn ca.crt may be from files mounted in container. - var saConfig KubernetesConfig + saConfig := savedKubernetes if data, err := ioutil.ReadFile(filepath.Join(saPath, kubeServiceAccountFileToken)); err == nil { saConfig.Token = string(data) } @@ -765,46 +792,52 @@ func buildKubernetesConfig(exec kexec.Interface, cli, file *config, saPath strin saConfig.CACert = filepath.Join(saPath, kubeServiceAccountFileCACert) } - if err := overrideFields(&Kubernetes, &saConfig); err != nil { + if err := overrideFields(&Kubernetes, &saConfig, &savedKubernetes); err != nil { return err } - // Grab default values from OVS external IDs - if defaults.K8sAPIServer { - Kubernetes.APIServer = getOVSExternalID(exec, "k8s-api-server") - } - if defaults.K8sToken { - Kubernetes.Token = getOVSExternalID(exec, "k8s-api-token") - } - if defaults.K8sCert { - Kubernetes.CACert = getOVSExternalID(exec, "k8s-ca-certificate") - } - // values for token, cacert, kubeconfig, api-server may be found in several places. 
- // Take the first found when looking in this order: command line options, config file, + // Priority order (highest first): OVS config, command line options, config file, // environment variables, service account files - envConfig := KubernetesConfig{ - Kubeconfig: os.Getenv("KUBECONFIG"), - CACert: os.Getenv("K8S_CACERT"), - APIServer: os.Getenv("K8S_APISERVER"), - Token: os.Getenv("K8S_TOKEN"), + envConfig := savedKubernetes + envVarsMap := map[string]string{ + "Kubeconfig": "KUBECONFIG", + "CACert": "K8S_CACERT", + "APIServer": "K8S_APISERVER", + "Token": "K8S_TOKEN", + } + for k, v := range envVarsMap { + if x, exists := os.LookupEnv(v); exists && len(x) > 0 { + reflect.ValueOf(&envConfig).Elem().FieldByName(k).SetString(x) + } } - if err := overrideFields(&Kubernetes, &envConfig); err != nil { + if err := overrideFields(&Kubernetes, &envConfig, &savedKubernetes); err != nil { return err } // Copy config file values over default values - if err := overrideFields(&Kubernetes, &file.Kubernetes); err != nil { + if err := overrideFields(&Kubernetes, &file.Kubernetes, &savedKubernetes); err != nil { return err } // And CLI overrides over config file and default values - if err := overrideFields(&Kubernetes, &cli.Kubernetes); err != nil { + if err := overrideFields(&Kubernetes, &cli.Kubernetes, &savedKubernetes); err != nil { return err } + // Grab default values from OVS external IDs + if defaults.K8sAPIServer { + Kubernetes.APIServer = getOVSExternalID(exec, "k8s-api-server") + } + if defaults.K8sToken { + Kubernetes.Token = getOVSExternalID(exec, "k8s-api-token") + } + if defaults.K8sCert { + Kubernetes.CACert = getOVSExternalID(exec, "k8s-ca-certificate") + } + if Kubernetes.Kubeconfig != "" && !pathExists(Kubernetes.Kubeconfig) { return fmt.Errorf("kubernetes kubeconfig file %q not found", Kubernetes.Kubeconfig) } @@ -834,12 +867,20 @@ func buildKubernetesConfig(exec kexec.Interface, cli, file *config, saPath strin return fmt.Errorf("Pod IP is invalid") } } + + if Kubernetes.RawNoHostSubnetNodes != "" { + if nodeSelector, err := metav1.ParseToLabelSelector(Kubernetes.RawNoHostSubnetNodes); err == nil { + Kubernetes.NoHostSubnetNodes = nodeSelector + } else { + return fmt.Errorf("labelSelector \"%s\" is invalid: %v", Kubernetes.RawNoHostSubnetNodes, err) + } + } return nil } func buildGatewayConfig(ctx *cli.Context, cli, file *config) error { // Copy config file values over default values - if err := overrideFields(&Gateway, &file.Gateway); err != nil { + if err := overrideFields(&Gateway, &file.Gateway, &savedGateway); err != nil { return err } @@ -854,7 +895,7 @@ func buildGatewayConfig(ctx *cli.Context, cli, file *config) error { } } // And CLI overrides over config file and default values - if err := overrideFields(&Gateway, &cli.Gateway); err != nil { + if err := overrideFields(&Gateway, &cli.Gateway, &savedGateway); err != nil { return err } @@ -889,12 +930,12 @@ func buildGatewayConfig(ctx *cli.Context, cli, file *config) error { func buildMasterHAConfig(ctx *cli.Context, cli, file *config) error { // Copy config file values over default values - if err := overrideFields(&MasterHA, &file.MasterHA); err != nil { + if err := overrideFields(&MasterHA, &file.MasterHA, &savedMasterHA); err != nil { return err } // And CLI overrides over config file and default values - if err := overrideFields(&MasterHA, &cli.MasterHA); err != nil { + if err := overrideFields(&MasterHA, &cli.MasterHA, &savedMasterHA); err != nil { return err } @@ -913,11 +954,11 @@ func buildMasterHAConfig(ctx 
*cli.Context, cli, file *config) error { } func buildDefaultConfig(cli, file *config) error { - if err := overrideFields(&Default, &file.Default); err != nil { + if err := overrideFields(&Default, &file.Default, &savedDefault); err != nil { return err } - if err := overrideFields(&Default, &cli.Default); err != nil { + if err := overrideFields(&Default, &cli.Default, &savedDefault); err != nil { return err } @@ -937,15 +978,6 @@ func buildDefaultConfig(cli, file *config) error { return nil } -// OtherConfigSubnet returns "other-config:subnet" for IPv4 clusters, and -// "other-config:ipv6_prefix" for IPv6 clusters -func OtherConfigSubnet() string { - if IPv6Mode { - return "other-config:ipv6_prefix" - } - return "other-config:subnet" -} - // getConfigFilePath returns config file path and 'true' if the config file is // the fallback path (eg not given by the user), 'false' if given explicitly // by the user @@ -983,11 +1015,21 @@ func InitConfigSa(ctx *cli.Context, exec kexec.Interface, saPath string, default // common command-line options and constructs the global config object from // them. It returns the config file path (if explicitly specified) or an error func initConfigWithPath(ctx *cli.Context, exec kexec.Interface, saPath string, defaults *Defaults) (string, error) { - var cfg config var retConfigFile string var configFile string var configFileIsDefault bool var err error + // initialize cfg with default values, allow file read to override + cfg := config{ + Default: savedDefault, + Logging: savedLogging, + CNI: savedCNI, + Kubernetes: savedKubernetes, + OvnNorth: savedOvnNorth, + OvnSouth: savedOvnSouth, + Gateway: savedGateway, + MasterHA: savedMasterHA, + } configFile, configFileIsDefault = getConfigFilePath(ctx) @@ -1019,18 +1061,18 @@ func initConfigWithPath(ctx *cli.Context, exec kexec.Interface, saPath string, d } // Build config that needs no special processing - if err = overrideFields(&CNI, &cfg.CNI); err != nil { + if err = overrideFields(&CNI, &cfg.CNI, &savedCNI); err != nil { return "", err } - if err = overrideFields(&CNI, &cliConfig.CNI); err != nil { + if err = overrideFields(&CNI, &cliConfig.CNI, &savedCNI); err != nil { return "", err } // Logging setup - if err = overrideFields(&Logging, &cfg.Logging); err != nil { + if err = overrideFields(&Logging, &cfg.Logging, &savedLogging); err != nil { return "", err } - if err = overrideFields(&Logging, &cliConfig.Logging); err != nil { + if err = overrideFields(&Logging, &cliConfig.Logging, &savedLogging); err != nil { return "", err } @@ -1152,12 +1194,15 @@ func buildOvnAuth(exec kexec.Interface, northbound bool, cliAuth, confAuth *OvnA } var direction string + var defaultAuth *OvnAuthConfig if northbound { auth.externalID = "ovn-nb" direction = "nb" + defaultAuth = &savedOvnNorth } else { auth.externalID = "ovn-remote" direction = "sb" + defaultAuth = &savedOvnSouth } // Determine final address so we know how to set cert/key defaults @@ -1176,10 +1221,10 @@ func buildOvnAuth(exec kexec.Interface, northbound bool, cliAuth, confAuth *OvnA } // Build the final auth config with overrides from CLI and config file - if err := overrideFields(auth, confAuth); err != nil { + if err := overrideFields(auth, confAuth, defaultAuth); err != nil { return nil, err } - if err := overrideFields(auth, cliAuth); err != nil { + if err := overrideFields(auth, cliAuth, defaultAuth); err != nil { return nil, err } diff --git a/go-controller/pkg/config/config_test.go b/go-controller/pkg/config/config_test.go index 964be67fa1..935e1045c2 
100644 --- a/go-controller/pkg/config/config_test.go +++ b/go-controller/pkg/config/config_test.go @@ -143,6 +143,7 @@ apiserver=https://1.2.3.4:6443 token=TG9yZW0gaXBzdW0gZ cacert=/path/to/kubeca.crt service-cidr=172.18.0.0/24 +no-hostsubnet-nodes=label=another-test-label [logging] loglevel=5 @@ -225,6 +226,12 @@ var _ = Describe("Config Operations", func() { }) It("uses expected defaults", func() { + // Don't pick up defaults from the environment + os.Unsetenv("KUBECONFIG") + os.Unsetenv("K8S_CACERT") + os.Unsetenv("K8S_APISERVER") + os.Unsetenv("K8S_TOKEN") + app.Action = func(ctx *cli.Context) error { cfgPath, err := InitConfigSa(ctx, kexec.New(), tmpDir, nil) Expect(err).NotTo(HaveOccurred()) @@ -239,8 +246,9 @@ var _ = Describe("Config Operations", func() { Expect(Kubernetes.Kubeconfig).To(Equal("")) Expect(Kubernetes.CACert).To(Equal("")) Expect(Kubernetes.Token).To(Equal("")) - Expect(Kubernetes.APIServer).To(Equal("http://localhost:8080")) + Expect(Kubernetes.APIServer).To(Equal(DefaultAPIServer)) Expect(Kubernetes.ServiceCIDR).To(Equal("172.16.1.0/24")) + Expect(Kubernetes.RawNoHostSubnetNodes).To(Equal("")) Expect(Default.ClusterSubnets).To(Equal([]CIDRNetworkEntry{ {mustParseCIDR("10.128.0.0/14"), 23}, })) @@ -530,6 +538,7 @@ var _ = Describe("Config Operations", func() { Expect(Kubernetes.Token).To(Equal("asdfasdfasdfasfd")) Expect(Kubernetes.APIServer).To(Equal("https://4.4.3.2:8080")) Expect(Kubernetes.ServiceCIDR).To(Equal("172.15.0.0/24")) + Expect(Kubernetes.RawNoHostSubnetNodes).To(Equal("test=pass")) Expect(Default.ClusterSubnets).To(Equal([]CIDRNetworkEntry{ {mustParseCIDR("10.130.0.0/15"), 24}, })) @@ -566,6 +575,7 @@ var _ = Describe("Config Operations", func() { "-k8s-token=asdfasdfasdfasfd", "-k8s-service-cidr=172.15.0.0/24", "-nb-address=ssl://6.5.4.3:6651", + "-no-hostsubnet-nodes=test=pass", "-nb-client-privkey=/client/privkey", "-nb-client-cert=/client/cert", "-nb-client-cacert=/client/cacert", @@ -746,6 +756,7 @@ mode=shared Expect(Kubernetes.CACert).To(Equal(kubeCAFile)) Expect(Kubernetes.Token).To(Equal("asdfasdfasdfasfd")) Expect(Kubernetes.APIServer).To(Equal("https://4.4.3.2:8080")) + Expect(Kubernetes.RawNoHostSubnetNodes).To(Equal("label=another-test-label")) Expect(Kubernetes.ServiceCIDR).To(Equal("172.15.0.0/24")) Expect(OvnNorth.Scheme).To(Equal(OvnDBSchemeSSL)) @@ -791,6 +802,52 @@ mode=shared Expect(err).NotTo(HaveOccurred()) }) + It("does not override config file settings with default cli options", func() { + kubeconfigFile, err := createTempFile("kubeconfig") + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(kubeconfigFile) + + kubeCAFile, err := createTempFile("kube-ca.crt") + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(kubeCAFile) + + err = writeTestConfigFile(cfgFile.Name()) + Expect(err).NotTo(HaveOccurred()) + + app.Action = func(ctx *cli.Context) error { + var cfgPath string + cfgPath, err = InitConfig(ctx, kexec.New(), nil) + Expect(err).NotTo(HaveOccurred()) + Expect(cfgPath).To(Equal(cfgFile.Name())) + + Expect(Default.MTU).To(Equal(1500)) + Expect(Default.ConntrackZone).To(Equal(64321)) + Expect(Default.RawClusterSubnets).To(Equal("10.129.0.0/14/23")) + Expect(Default.ClusterSubnets).To(Equal([]CIDRNetworkEntry{ + {mustParseCIDR("10.129.0.0/14"), 23}, + })) + Expect(Logging.File).To(Equal("/var/log/ovnkube.log")) + Expect(Logging.Level).To(Equal(5)) + Expect(CNI.ConfDir).To(Equal("/etc/cni/net.d22")) + Expect(CNI.Plugin).To(Equal("ovn-k8s-cni-overlay22")) + Expect(Kubernetes.Kubeconfig).To(Equal(kubeconfigFile)) + 
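+ // These assertions rely on the saved-defaults argument to
+ // overrideFields(): a CLI field that still holds its compiled-in
+ // default is treated as unset, so it cannot clobber the config-file value.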
Expect(Kubernetes.CACert).To(Equal(kubeCAFile)) + Expect(Kubernetes.Token).To(Equal("TG9yZW0gaXBzdW0gZ")) + Expect(Kubernetes.ServiceCIDR).To(Equal("172.18.0.0/24")) + + return nil + } + + cliArgs := []string{ + app.Name, + "-config-file=" + cfgFile.Name(), + "-k8s-kubeconfig=" + kubeconfigFile, + "-k8s-cacert=" + kubeCAFile, + } + err = app.Run(cliArgs) + Expect(err).NotTo(HaveOccurred()) + }) + Describe("OvnDBAuth operations", func() { var certFile, keyFile, caFile string diff --git a/go-controller/pkg/config/utils.go b/go-controller/pkg/config/utils.go index 2a14d7c4fa..27bbccaaf8 100644 --- a/go-controller/pkg/config/utils.go +++ b/go-controller/pkg/config/utils.go @@ -52,23 +52,24 @@ func ParseClusterSubnetEntries(clusterSubnetCmd string) ([]CIDRNetworkEntry, err return nil, err } parsedClusterEntry.HostSubnetLength = uint32(tmp) - if parsedClusterEntry.HostSubnetLength <= uint32(entryMaskLength) { - return nil, fmt.Errorf("cannot use a host subnet length mask shorter than or equal to the cluster subnet mask. "+ - "host subnet length: %d, cluster subnet length: %d", parsedClusterEntry.HostSubnetLength, entryMaskLength) + + if ipv6 && parsedClusterEntry.HostSubnetLength != 64 { + return nil, fmt.Errorf("IPv6 only supports /64 host subnets") } } else { - // no host subnet prefix provided - default to 24 for ipv4 (legacy behavior), error for ipv6 if ipv6 { - return nil, fmt.Errorf("host subnet prefix length missing. Required for IPv6") - } else if entryMaskLength >= 24 { - return nil, fmt.Errorf("cluster subnet prefix length of %d is too long for host subnet prefix"+ - "length of 24. Please specify host subnet prefix or shorten cluster subnet prefix", entryMaskLength) + parsedClusterEntry.HostSubnetLength = 64 } else { - // default to 24 bit prefix for IPv4 + // default for backward compatibility parsedClusterEntry.HostSubnetLength = 24 } } + if parsedClusterEntry.HostSubnetLength <= uint32(entryMaskLength) { + return nil, fmt.Errorf("cannot use a host subnet length mask shorter than or equal to the cluster subnet mask. 
"+ + "host subnet length: %d, cluster subnet length: %d", parsedClusterEntry.HostSubnetLength, entryMaskLength) + } + //check to make sure that no cidrs overlap if cidrsOverlap(parsedClusterEntry.CIDR, parsedClusterList) { return nil, fmt.Errorf("CIDR %q overlaps with another cluster network CIDR", clusterEntry) diff --git a/go-controller/pkg/config/utils_test.go b/go-controller/pkg/config/utils_test.go index 12ed6d5181..066e616062 100644 --- a/go-controller/pkg/config/utils_test.go +++ b/go-controller/pkg/config/utils_test.go @@ -81,8 +81,32 @@ func TestParseClusterSubnetEntries(t *testing.T) { expectedErr: true, }, { - name: "Test that legacy behavior does not work for IPv6", - cmdLineArg: "fda6:78cc:acf2:a039::/64", + name: "IPv6", + cmdLineArg: "fda6::/48/64", + clusterNetworks: []CIDRNetworkEntry{{CIDR: returnIPNetPointers("fda6::/48"), HostSubnetLength: 64}}, + expectedErr: false, + }, + { + name: "IPv6 defaults to /64 hostsubnets", + cmdLineArg: "fda6::/48", + clusterNetworks: []CIDRNetworkEntry{{CIDR: returnIPNetPointers("fda6::/48"), HostSubnetLength: 64}}, + expectedErr: false, + }, + { + name: "IPv6 doesn't allow longer than /64 hostsubnet", + cmdLineArg: "fda6::/48/56", + clusterNetworks: nil, + expectedErr: true, + }, + { + name: "IPv6 doesn't allow shorter than /64 hostsubnet", + cmdLineArg: "fda6::/48/72", + clusterNetworks: nil, + expectedErr: true, + }, + { + name: "IPv6 can't use /64 cluster net", + cmdLineArg: "fda6::/64", clusterNetworks: nil, expectedErr: true, }, diff --git a/go-controller/pkg/factory/factory.go b/go-controller/pkg/factory/factory.go index 26a22b46e8..254362aa60 100644 --- a/go-controller/pkg/factory/factory.go +++ b/go-controller/pkg/factory/factory.go @@ -58,22 +58,16 @@ func (h *Handler) kill() error { return nil } -type eventKind int - -const ( - addEvent eventKind = iota - updateEvent - deleteEvent -) - type event struct { - obj interface{} - oldObj interface{} - kind eventKind + obj interface{} + oldObj interface{} + process func(*event) } type listerInterface interface{} +type initialAddFn func(*Handler, []interface{}) + type informer struct { sync.RWMutex oType reflect.Type @@ -81,6 +75,9 @@ type informer struct { handlers map[uint64]*Handler events []chan *event lister listerInterface + // initialAddFunc will be called to deliver the initial list of objects + // when a handler is added + initialAddFunc initialAddFn } func (i *informer) forEachQueuedHandler(f func(h *Handler)) { @@ -97,8 +94,8 @@ func (i *informer) forEachQueuedHandler(f func(h *Handler)) { } func (i *informer) forEachHandler(obj interface{}, f func(h *Handler)) { - i.Lock() - defer i.Unlock() + i.RLock() + defer i.RUnlock() objType := reflect.TypeOf(obj) if objType != i.oType { @@ -111,10 +108,7 @@ func (i *informer) forEachHandler(obj interface{}, f func(h *Handler)) { } } -func (i *informer) addHandler(id uint64, filterFunc func(obj interface{}) bool, funcs cache.ResourceEventHandler) *Handler { - i.Lock() - defer i.Unlock() - +func (i *informer) addHandler(id uint64, filterFunc func(obj interface{}) bool, funcs cache.ResourceEventHandler, existingItems []interface{}) *Handler { handler := &Handler{ cache.FilteringResourceEventHandler{ FilterFunc: filterFunc, @@ -123,6 +117,12 @@ func (i *informer) addHandler(id uint64, filterFunc func(obj interface{}) bool, id, handlerAlive, } + + // Send existing items to the handler's add function; informers usually + // do this but since we share informers, it's long-since happened so + // we must emulate that here + 
i.initialAddFunc(handler, existingItems) + i.handlers[id] = handler return handler } @@ -156,31 +156,18 @@ func (i *informer) processEvents(events chan *event, stopChan <-chan struct{}) { if !ok { return } - switch e.kind { - case addEvent: - i.forEachQueuedHandler(func(h *Handler) { - h.OnAdd(e.obj) - }) - case updateEvent: - i.forEachQueuedHandler(func(h *Handler) { - h.OnUpdate(e.oldObj, e.obj) - }) - case deleteEvent: - i.forEachQueuedHandler(func(h *Handler) { - h.OnDelete(e.obj) - }) - } + e.process(e) case <-stopChan: return } } } -func (i *informer) enqueueEvent(oldObj, obj interface{}, kind eventKind) { - meta, err := getObjectMeta(i.oType, obj) +func getQueueNum(oType reflect.Type, obj interface{}) uint32 { + meta, err := getObjectMeta(oType, obj) if err != nil { logrus.Errorf("object has no meta: %v", err) - return + return 0 } // Distribute the object to an event queue based on a hash of its @@ -192,15 +179,20 @@ func (i *informer) enqueueEvent(oldObj, obj interface{}, kind eventKind) { _, _ = h.Write([]byte("/")) } _, _ = h.Write([]byte(meta.Name)) - queueIdx := h.Sum32() % uint32(numEventQueues) + return h.Sum32() % uint32(numEventQueues) +} +// enqueueEvent adds an event to the queue. Caller must hold at least a read lock +// on the informer. +func (i *informer) enqueueEvent(oldObj, obj interface{}, processFunc func(*event)) { i.RLock() defer i.RUnlock() + queueIdx := getQueueNum(i.oType, obj) if i.events[queueIdx] != nil { i.events[queueIdx] <- &event{ - obj: obj, - oldObj: oldObj, - kind: kind, + obj: obj, + oldObj: oldObj, + process: processFunc, } } } @@ -224,10 +216,18 @@ func ensureObjectOnDelete(obj interface{}, expectedType reflect.Type) (interface func (i *informer) newFederatedQueuedHandler() cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - i.enqueueEvent(nil, obj, addEvent) + i.enqueueEvent(nil, obj, func(e *event) { + i.forEachQueuedHandler(func(h *Handler) { + h.OnAdd(e.obj) + }) + }) }, UpdateFunc: func(oldObj, newObj interface{}) { - i.enqueueEvent(oldObj, newObj, updateEvent) + i.enqueueEvent(oldObj, newObj, func(e *event) { + i.forEachQueuedHandler(func(h *Handler) { + h.OnUpdate(e.oldObj, e.obj) + }) + }) }, DeleteFunc: func(obj interface{}) { realObj, err := ensureObjectOnDelete(obj, i.oType) @@ -235,7 +235,11 @@ func (i *informer) newFederatedQueuedHandler() cache.ResourceEventHandlerFuncs { logrus.Errorf(err.Error()) return } - i.enqueueEvent(nil, realObj, deleteEvent) + i.enqueueEvent(nil, realObj, func(e *event) { + i.forEachQueuedHandler(func(h *Handler) { + h.OnDelete(e.obj) + }) + }) }, } } @@ -319,7 +323,11 @@ func newInformer(oType reflect.Type, sharedInformer cache.SharedIndexInformer) ( if err != nil { return nil, err } - + i.initialAddFunc = func(h *Handler, items []interface{}) { + for _, item := range items { + h.OnAdd(item) + } + } i.inf.AddEventHandler(i.newFederatedHandler()) return i, nil } @@ -334,6 +342,38 @@ func newQueuedInformer(oType reflect.Type, sharedInformer cache.SharedIndexInfor i.events[j] = make(chan *event, 1) go i.processEvents(i.events[j], stopChan) } + i.initialAddFunc = func(h *Handler, items []interface{}) { + // Make a handler-specific channel array across which the + // initial add events will be distributed. 
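+ // getQueueNum yields a stable queue index per object, so the initial
+ // adds can run in parallel while each object still lands on a single,
+ // ordered worker; addHandler waits for the whole fan-out to drain
+ // before the handler starts receiving live events.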
+ adds := make([]chan interface{}, numEventQueues) + queueWg := &sync.WaitGroup{} + queueWg.Add(len(adds)) + for j := range adds { + adds[j] = make(chan interface{}, 1) + go func(addChan chan interface{}) { + defer queueWg.Done() + for { + obj, ok := <-addChan + if !ok { + return + } + h.OnAdd(obj) + } + }(adds[j]) + } + // Distribute the existing items into the handler-specific + // channel array. + for _, obj := range items { + queueIdx := getQueueNum(i.oType, obj) + adds[queueIdx] <- obj + } + // Close all the channels + for j := range adds { + close(adds[j]) + } + // Wait until all the object additions have been processed + queueWg.Wait() + } i.inf.AddEventHandler(i.newFederatedQueuedHandler()) return i, nil } @@ -499,30 +539,24 @@ func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel return true } - // Process existing items as a set so the caller can clean up - // after a restart or whatever - existingItems := inf.inf.GetStore().List() - if processExisting != nil { - items := make([]interface{}, 0) - for _, obj := range existingItems { - if filterFunc(obj) { - items = append(items, obj) - } + inf.Lock() + defer inf.Unlock() + + items := make([]interface{}, 0) + for _, obj := range inf.inf.GetStore().List() { + if filterFunc(obj) { + items = append(items, obj) } + } + if processExisting != nil { + // Process existing items as a set so the caller can clean up + // after a restart or whatever processExisting(items) } handlerID := atomic.AddUint64(&wf.handlerCounter, 1) - handler := inf.addHandler(handlerID, filterFunc, funcs) + handler := inf.addHandler(handlerID, filterFunc, funcs, items) logrus.Debugf("added %v event handler %d", objType, handler.id) - - // Send existing items to the handler's add function; informers usually - // do this but since we share informers, it's long-since happened so - // we must emulate that here - for _, obj := range existingItems { - handler.OnAdd(obj) - } - return handler, nil } @@ -603,11 +637,6 @@ func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting) } -// AddFilteredNodeHandler adds a handler function that will be executed when Node objects that match the given filters change -func (wf *WatchFactory) AddFilteredNodeHandler(lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (*Handler, error) { - return wf.addHandler(nodeType, "", lsel, handlerFuncs, processExisting) -} - // RemoveNodeHandler removes a Node object event handler function func (wf *WatchFactory) RemoveNodeHandler(handler *Handler) error { return wf.removeHandler(nodeType, handler) diff --git a/go-controller/pkg/factory/factory_test.go b/go-controller/pkg/factory/factory_test.go index 12d1cd72a6..93325e67f7 100644 --- a/go-controller/pkg/factory/factory_test.go +++ b/go-controller/pkg/factory/factory_test.go @@ -3,6 +3,8 @@ package factory import ( "fmt" "reflect" + "sync" + "sync/atomic" "testing" "k8s.io/api/core/v1" @@ -103,6 +105,24 @@ func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, return w } +type handlerCalls struct { + added int32 + updated int32 + deleted int32 +} + +func (c *handlerCalls) getAdded() int { + return int(atomic.LoadInt32(&c.added)) +} + +func (c *handlerCalls) getUpdated() int { + return int(atomic.LoadInt32(&c.updated)) +} + +func (c *handlerCalls) getDeleted() int { + return int(atomic.LoadInt32(&c.deleted)) +} + var _ = Describe("Watch Factory 
Operations", func() { var ( fakeClient *fake.Clientset @@ -115,7 +135,6 @@ var _ = Describe("Watch Factory Operations", func() { endpoints []*v1.Endpoints services []*v1.Service stop chan struct{} - numAdded, numUpdated, numDeleted int ) BeforeEach(func() { @@ -175,10 +194,10 @@ var _ = Describe("Watch Factory Operations", func() { } return true, obj, nil }) + }) - numAdded = 0 - numUpdated = 0 - numDeleted = 0 + AfterEach(func() { + close(stop) }) Context("when a processExisting is given", func() { @@ -188,12 +207,12 @@ var _ = Describe("Watch Factory Operations", func() { h, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{}, func(objs []interface{}) { + defer GinkgoRecover() Expect(len(objs)).To(Equal(1)) }) Expect(err).NotTo(HaveOccurred()) Expect(h).NotTo(BeNil()) wf.removeHandler(objType, h) - close(stop) } It("is called for each existing pod", func() { @@ -234,33 +253,24 @@ var _ = Describe("Watch Factory Operations", func() { MatchLabels: map[string]string{"blah": "foobar"}, }) }) - - It("is called for each existing node that matches a given label", func() { - node := newNode("default") - node.ObjectMeta.Labels["blah"] = "foobar" - nodes = append(nodes, node) - testExisting(nodeType, "", &metav1.LabelSelector{ - MatchLabels: map[string]string{"blah": "foobar"}, - }) - }) }) Context("when existing items are known to the informer", func() { testExisting := func(objType reflect.Type) { wf, err := NewWatchFactory(fakeClient, stop) Expect(err).NotTo(HaveOccurred()) + var addCalls int32 h, err := wf.addHandler(objType, "", nil, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - numAdded++ + atomic.AddInt32(&addCalls, 1) }, UpdateFunc: func(old, new interface{}) {}, DeleteFunc: func(obj interface{}) {}, }, nil) Expect(err).NotTo(HaveOccurred()) - Expect(numAdded).To(Equal(2)) + Expect(int(addCalls)).To(Equal(2)) wf.removeHandler(objType, h) - close(stop) } It("calls ADD for each existing pod", func() { @@ -300,30 +310,31 @@ var _ = Describe("Watch Factory Operations", func() { }) }) - addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) *Handler { + addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) (*Handler, *handlerCalls) { + calls := handlerCalls{} h, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { defer GinkgoRecover() - numAdded++ + atomic.AddInt32(&calls.added, 1) funcs.AddFunc(obj) }, UpdateFunc: func(old, new interface{}) { defer GinkgoRecover() - numUpdated++ + atomic.AddInt32(&calls.updated, 1) funcs.UpdateFunc(old, new) }, DeleteFunc: func(obj interface{}) { defer GinkgoRecover() - numDeleted++ + atomic.AddInt32(&calls.deleted, 1) funcs.DeleteFunc(obj) }, }, nil) Expect(err).NotTo(HaveOccurred()) Expect(h).NotTo(BeNil()) - return h + return h, &calls } - addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) *Handler { + addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) (*Handler, *handlerCalls) { return addFilteredHandler(wf, objType, "", nil, funcs) } @@ -332,7 +343,7 @@ var _ = Describe("Watch Factory Operations", func() { Expect(err).NotTo(HaveOccurred()) added := newPod("pod1", "default") - h := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, 
podType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*v1.Pod) Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) @@ -350,16 +361,15 @@ var _ = Describe("Watch Factory Operations", func() { pods = append(pods, added) podWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) added.Spec.NodeName = "foobar" podWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + Eventually(c.getUpdated, 2).Should(Equal(1)) pods = pods[:0] podWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Eventually(c.getDeleted, 2).Should(Equal(1)) wf.RemovePodHandler(h) - close(stop) }) It("responds to multiple pod add/update/delete events", func() { @@ -381,7 +391,7 @@ var _ = Describe("Watch Factory Operations", func() { testPods[name] = &opTest{pod: pod} } - h := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*v1.Pod) ot, ok := testPods[pod.Name] @@ -420,17 +430,16 @@ var _ = Describe("Watch Factory Operations", func() { // Ensure total number of each operation is 10; and each // node's individual operation count is 2 - Eventually(func() int { return numAdded }, 2).Should(Equal(10)) - Eventually(func() int { return numUpdated }, 2).Should(Equal(10)) - Eventually(func() int { return numDeleted }, 2).Should(Equal(10)) + Eventually(c.getAdded, 2).Should(Equal(10)) + Eventually(c.getUpdated, 2).Should(Equal(10)) + Eventually(c.getDeleted, 2).Should(Equal(10)) for _, ot := range testPods { Expect(ot.added).Should(Equal(2)) Expect(ot.updated).Should(Equal(2)) Expect(ot.deleted).Should(Equal(2)) } - wf.removeHandler(podType, h) - close(stop) + wf.RemovePodHandler(h) }) It("responds to namespace add/update/delete events", func() { @@ -438,7 +447,7 @@ var _ = Describe("Watch Factory Operations", func() { Expect(err).NotTo(HaveOccurred()) added := newNamespace("default") - h := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ns := obj.(*v1.Namespace) Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) @@ -456,16 +465,15 @@ var _ = Describe("Watch Factory Operations", func() { namespaces = append(namespaces, added) namespaceWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) added.Status.Phase = v1.NamespaceTerminating namespaceWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + Eventually(c.getUpdated, 2).Should(Equal(1)) namespaces = namespaces[:0] namespaceWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Eventually(c.getDeleted, 2).Should(Equal(1)) wf.RemoveNamespaceHandler(h) - close(stop) }) It("responds to node add/update/delete events", func() { @@ -473,7 +481,7 @@ var _ = Describe("Watch Factory Operations", func() { Expect(err).NotTo(HaveOccurred()) added := newNode("mynode") - h := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { node := obj.(*v1.Node) Expect(reflect.DeepEqual(node, added)).To(BeTrue()) @@ -491,16 +499,15 @@ var _ = Describe("Watch Factory Operations", func() { nodes = append(nodes, added) nodeWatch.Add(added) - Eventually(func() int 
{ return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) added.Status.Phase = v1.NodeTerminated nodeWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + Eventually(c.getUpdated, 2).Should(Equal(1)) nodes = nodes[:0] nodeWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Eventually(c.getDeleted, 2).Should(Equal(1)) - wf.removeHandler(nodeType, h) - close(stop) + wf.RemoveNodeHandler(h) }) It("responds to multiple node add/update/delete events", func() { @@ -521,7 +528,7 @@ var _ = Describe("Watch Factory Operations", func() { testNodes[name] = &opTest{node: node} } - h := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { node := obj.(*v1.Node) ot, ok := testNodes[node.Name] @@ -560,17 +567,162 @@ var _ = Describe("Watch Factory Operations", func() { // Ensure total number of each operation is 10; and each // node's individual operation count is 2 - Eventually(func() int { return numAdded }, 2).Should(Equal(10)) - Eventually(func() int { return numUpdated }, 2).Should(Equal(10)) - Eventually(func() int { return numDeleted }, 2).Should(Equal(10)) + Eventually(c.getAdded, 2).Should(Equal(10)) + Eventually(c.getUpdated, 2).Should(Equal(10)) + Eventually(c.getDeleted, 2).Should(Equal(10)) for _, ot := range testNodes { Expect(ot.added).Should(Equal(2)) Expect(ot.updated).Should(Equal(2)) Expect(ot.deleted).Should(Equal(2)) } - wf.removeHandler(nodeType, h) - close(stop) + wf.RemoveNodeHandler(h) + }) + + It("correctly orders queued informer initial add events and subsequent update events", func() { + type opTest struct { + node *v1.Node + added int + updated int + } + testNodes := make(map[string]*opTest) + + for i := 0; i < 600; i++ { + name := fmt.Sprintf("mynode-%d", i) + node := newNode(name) + testNodes[name] = &opTest{node: node} + // Add all nodes to the initial list + nodes = append(nodes, node) + } + + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + startWg := sync.WaitGroup{} + startWg.Add(1) + doneWg := sync.WaitGroup{} + doneWg.Add(1) + go func() { + startWg.Done() + // Send an update event for each node + for _, n := range nodes { + n.Status.Phase = v1.NodeTerminated + nodeWatch.Modify(n) + } + doneWg.Done() + }() + startWg.Wait() + + h, c := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + defer GinkgoRecover() + node := obj.(*v1.Node) + ot, ok := testNodes[node.Name] + Expect(ok).To(BeTrue()) + Expect(ot.added).To(Equal(0), "add for node %s already run", node.Name) + ot.added++ + }, + UpdateFunc: func(old, new interface{}) { + defer GinkgoRecover() + newNode := new.(*v1.Node) + ot, ok := testNodes[newNode.Name] + Expect(ok).To(BeTrue()) + // Expect updates to be processed after Add + Expect(ot.added).To(Equal(1), "update for node %s processed before initial add!", newNode.Name) + Expect(ot.updated).To(Equal(0)) + ot.updated++ + Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated)) + }, + DeleteFunc: func(obj interface{}) {}, + }) + doneWg.Wait() + + // Adds are done synchronously at handler addition time + for _, ot := range testNodes { + Expect(ot.added).To(Equal(1), "missing add for node %s", ot.node.Name) + } + Expect(c.getAdded()).To(Equal(len(testNodes))) + + // Updates are async and may take a bit longer to finish + Eventually(c.getUpdated, 10).Should(Equal(len(testNodes))) + for 
_, ot := range testNodes { + Expect(ot.updated).To(Equal(1), "missing update for node %s", ot.node.Name) + } + + wf.RemoveNodeHandler(h) + }) + + It("correctly orders serialized informer initial add events and subsequent update events", func() { + type opTest struct { + namespace *v1.Namespace + added int + updated int + } + testNamespaces := make(map[string]*opTest) + + for i := 0; i < 598; i++ { + name := fmt.Sprintf("mynamespace-%d", i) + namespace := newNamespace(name) + testNamespaces[name] = &opTest{namespace: namespace} + // Add all namespaces to the initial list + namespaces = append(namespaces, namespace) + } + + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + startWg := sync.WaitGroup{} + startWg.Add(1) + doneWg := sync.WaitGroup{} + doneWg.Add(1) + go func() { + startWg.Done() + // Send an update event for each namespace + for _, n := range namespaces { + n.Status.Phase = v1.NamespaceTerminating + namespaceWatch.Modify(n) + } + doneWg.Done() + }() + startWg.Wait() + + h, c := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + defer GinkgoRecover() + namespace := obj.(*v1.Namespace) + ot, ok := testNamespaces[namespace.Name] + Expect(ok).To(BeTrue()) + Expect(ot.added).To(Equal(0)) + ot.added++ + }, + UpdateFunc: func(old, new interface{}) { + defer GinkgoRecover() + newNamespace := new.(*v1.Namespace) + ot, ok := testNamespaces[newNamespace.Name] + Expect(ok).To(BeTrue()) + // Expect updates to be processed after Add + Expect(ot.added).To(Equal(1), "update for namespace %s processed before initial add!", newNamespace.Name) + Expect(ot.updated).To(Equal(0)) + ot.updated++ + Expect(newNamespace.Status.Phase).To(Equal(v1.NamespaceTerminating)) + }, + DeleteFunc: func(obj interface{}) {}, + }) + doneWg.Wait() + + // Adds are done synchronously at handler addition time + for _, ot := range testNamespaces { + Expect(ot.added).To(Equal(1), "missing add for namespace %s", ot.namespace.Name) + } + Expect(c.getAdded()).To(Equal(len(testNamespaces))) + + // Updates are async and may take a bit longer to finish + Eventually(c.getUpdated, 10).Should(Equal(len(testNamespaces))) + for _, ot := range testNamespaces { + Expect(ot.updated).To(Equal(1), "missing update for namespace %s", ot.namespace.Name) + } + + wf.RemoveNamespaceHandler(h) }) It("responds to policy add/update/delete events", func() { @@ -578,7 +730,7 @@ var _ = Describe("Watch Factory Operations", func() { Expect(err).NotTo(HaveOccurred()) added := newPolicy("mypolicy", "default") - h := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { np := obj.(*knet.NetworkPolicy) Expect(reflect.DeepEqual(np, added)).To(BeTrue()) @@ -596,16 +748,15 @@ var _ = Describe("Watch Factory Operations", func() { policies = append(policies, added) policyWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress} policyWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + Eventually(c.getUpdated, 2).Should(Equal(1)) policies = policies[:0] policyWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Eventually(c.getDeleted, 2).Should(Equal(1)) - wf.removeHandler(policyType, h) - close(stop) + wf.RemovePolicyHandler(h) }) It("responds to endpoints 
add/update/delete events", func() { @@ -613,7 +764,7 @@ var _ = Describe("Watch Factory Operations", func() { Expect(err).NotTo(HaveOccurred()) added := newEndpoints("myendpoints", "default") - h := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { eps := obj.(*v1.Endpoints) Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) @@ -631,7 +782,7 @@ var _ = Describe("Watch Factory Operations", func() { endpoints = append(endpoints, added) endpointsWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) added.Subsets = append(added.Subsets, v1.EndpointSubset{ Ports: []v1.EndpointPort{ { @@ -641,13 +792,12 @@ var _ = Describe("Watch Factory Operations", func() { }, }) endpointsWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + Eventually(c.getUpdated, 2).Should(Equal(1)) endpoints = endpoints[:0] endpointsWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Eventually(c.getDeleted, 2).Should(Equal(1)) - wf.removeHandler(endpointsType, h) - close(stop) + wf.RemoveEndpointsHandler(h) }) It("responds to service add/update/delete events", func() { @@ -655,7 +805,7 @@ var _ = Describe("Watch Factory Operations", func() { Expect(err).NotTo(HaveOccurred()) added := newService("myservice", "default") - h := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { service := obj.(*v1.Service) Expect(reflect.DeepEqual(service, added)).To(BeTrue()) @@ -673,16 +823,15 @@ var _ = Describe("Watch Factory Operations", func() { services = append(services, added) serviceWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) added.Spec.ClusterIP = "1.1.1.1" serviceWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + Eventually(c.getUpdated, 2).Should(Equal(1)) services = services[:0] serviceWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Eventually(c.getDeleted, 2).Should(Equal(1)) - wf.removeHandler(serviceType, h) - close(stop) + wf.RemoveServiceHandler(h) }) It("stops processing events after the handler is removed", func() { @@ -690,7 +839,7 @@ var _ = Describe("Watch Factory Operations", func() { Expect(err).NotTo(HaveOccurred()) added := newNamespace("default") - h := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + h, c := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) {}, UpdateFunc: func(old, new interface{}) {}, DeleteFunc: func(obj interface{}) {}, @@ -698,22 +847,20 @@ var _ = Describe("Watch Factory Operations", func() { namespaces = append(namespaces, added) namespaceWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) wf.RemoveNamespaceHandler(h) added2 := newNamespace("other") namespaces = append(namespaces, added2) namespaceWatch.Add(added2) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + Consistently(c.getAdded, 2).Should(Equal(1)) added2.Status.Phase = v1.NamespaceTerminating namespaceWatch.Modify(added2) - Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + Consistently(c.getUpdated, 2).Should(Equal(0)) 
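+ // The delete below must likewise never reach the removed handler;
+ // Consistently polls the counter for the full window to catch any
+ // late delivery from the event queues.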
namespaces = []*v1.Namespace{added} namespaceWatch.Delete(added2) - Consistently(func() int { return numDeleted }, 2).Should(Equal(0)) - - close(stop) + Consistently(c.getDeleted, 2).Should(Equal(0)) }) It("filters correctly by label and namespace", func() { @@ -727,7 +874,7 @@ var _ = Describe("Watch Factory Operations", func() { failsFilter2 := newPod("pod3", "otherns") failsFilter2.ObjectMeta.Labels["blah"] = "foobar" - addFilteredHandler(wf, + _, c := addFilteredHandler(wf, podType, "default", &metav1.LabelSelector{ @@ -750,36 +897,34 @@ var _ = Describe("Watch Factory Operations", func() { pods = append(pods, passesFilter) podWatch.Add(passesFilter) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) // numAdded should remain 1 pods = append(pods, failsFilter) podWatch.Add(failsFilter) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + Consistently(c.getAdded, 2).Should(Equal(1)) // numAdded should remain 1 pods = append(pods, failsFilter2) podWatch.Add(failsFilter2) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + Consistently(c.getAdded, 2).Should(Equal(1)) passesFilter.Status.Phase = v1.PodFailed podWatch.Modify(passesFilter) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + Eventually(c.getUpdated, 2).Should(Equal(1)) // numAdded should remain 1 failsFilter.Status.Phase = v1.PodFailed podWatch.Modify(failsFilter) - Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + Consistently(c.getUpdated, 2).Should(Equal(1)) failsFilter2.Status.Phase = v1.PodFailed podWatch.Modify(failsFilter2) - Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + Consistently(c.getUpdated, 2).Should(Equal(1)) pods = []*v1.Pod{failsFilter, failsFilter2} podWatch.Delete(passesFilter) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - close(stop) + Eventually(c.getDeleted, 2).Should(Equal(1)) }) It("correctly handles object updates that cause filter changes", func() { @@ -790,7 +935,7 @@ var _ = Describe("Watch Factory Operations", func() { pod.ObjectMeta.Labels["blah"] = "baz" equalPod := pod - h := addFilteredHandler(wf, + h, c := addFilteredHandler(wf, podType, "default", &metav1.LabelSelector{ @@ -812,7 +957,7 @@ var _ = Describe("Watch Factory Operations", func() { // Pod doesn't pass filter; shouldn't be added podWatch.Add(pod) - Consistently(func() int { return numAdded }, 2).Should(Equal(0)) + Consistently(c.getAdded, 2).Should(Equal(0)) // Update pod to pass filter; should be treated as add. 
Need // to deep-copy pod when modifying because it's a pointer all @@ -822,16 +967,15 @@ var _ = Describe("Watch Factory Operations", func() { pods = []*v1.Pod{podCopy} equalPod = podCopy podWatch.Modify(podCopy) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + Eventually(c.getAdded, 2).Should(Equal(1)) // Update pod to fail filter; should be treated as delete pod.ObjectMeta.Labels["blah"] = "baz" podWatch.Modify(pod) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) - Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + Eventually(c.getDeleted, 2).Should(Equal(1)) + Consistently(c.getAdded, 2).Should(Equal(1)) + Consistently(c.getUpdated, 2).Should(Equal(0)) wf.RemovePodHandler(h) - close(stop) }) }) diff --git a/go-controller/pkg/kube/kube.go b/go-controller/pkg/kube/kube.go index bf14e3fea5..0f375b4cf3 100644 --- a/go-controller/pkg/kube/kube.go +++ b/go-controller/pkg/kube/kube.go @@ -1,15 +1,11 @@ package kube import ( - "fmt" - "strings" - "encoding/json" "github.com/sirupsen/logrus" kapi "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" kv1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -18,23 +14,15 @@ import ( // Interface represents the exported methods for dealing with getting/setting // kubernetes resources type Interface interface { - SetAnnotationOnPod(pod *kapi.Pod, key, value string) error SetAnnotationsOnPod(pod *kapi.Pod, annotations map[string]string) error - SetAnnotationOnNode(node *kapi.Node, key, value string) error SetAnnotationsOnNode(node *kapi.Node, annotations map[string]interface{}) error UpdateNodeStatus(node *kapi.Node) error GetAnnotationsOnPod(namespace, name string) (map[string]string, error) - GetPod(namespace, name string) (*kapi.Pod, error) - GetPods(namespace string) (*kapi.PodList, error) - GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) GetNodes() (*kapi.NodeList, error) GetNode(name string) (*kapi.Node, error) - GetService(namespace, name string) (*kapi.Service, error) - GetEndpoints(namespace string) (*kapi.EndpointsList, error) GetEndpoint(namespace, name string) (*kapi.Endpoints, error) CreateEndpoint(namespace string, ep *kapi.Endpoints) (*kapi.Endpoints, error) UpdateEndpoint(namespace string, ep *kapi.Endpoints) (*kapi.Endpoints, error) - GetNamespaces() (*kapi.NamespaceList, error) Events() kv1core.EventInterface } @@ -43,19 +31,6 @@ type Kube struct { KClient kubernetes.Interface } -// SetAnnotationOnPod takes the pod object and key/value string pair to set it as an annotation -func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error { - logrus.Infof("Setting annotations %s=%s on pod %s", key, value, pod.Name) - // escape double quotes in the annotation value so it can be sent as a JSON patch - value = strings.Replace(value, "\"", "\\\"", -1) - patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) - _, err := k.KClient.CoreV1().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData)) - if err != nil { - logrus.Errorf("Error in setting annotation on pod %s/%s: %v", pod.Name, pod.Namespace, err) - } - return err -} - // SetAnnotationsOnPod takes the pod object and map of key/value string pairs to set as annotations func (k *Kube) SetAnnotationsOnPod(pod *kapi.Pod, annotations map[string]string) error { var err 
error @@ -83,17 +58,6 @@ func (k *Kube) SetAnnotationsOnPod(pod *kapi.Pod, annotations map[string]string) return err } -// SetAnnotationOnNode takes the node object and key/value string pair to set it as an annotation -func (k *Kube) SetAnnotationOnNode(node *kapi.Node, key, value string) error { - logrus.Infof("Setting annotations %s=%s on node %s", key, value, node.Name) - patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) - _, err := k.KClient.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData)) - if err != nil { - logrus.Errorf("Error in setting annotation on node %s: %v", node.Name, err) - } - return err -} - // SetAnnotationsOnNode takes the node object and map of key/value string pairs to set as annotations func (k *Kube) SetAnnotationsOnNode(node *kapi.Node, annotations map[string]interface{}) error { var err error @@ -139,24 +103,6 @@ func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, e return pod.ObjectMeta.Annotations, nil } -// GetPod obtains the Pod resource from kubernetes apiserver, given the name and namespace -func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) { - return k.KClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) -} - -// GetPods obtains the Pod resource from kubernetes apiserver, given the name and namespace -func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) { - return k.KClient.CoreV1().Pods(namespace).List(metav1.ListOptions{}) -} - -// GetPodsByLabels obtains the Pod resources from kubernetes apiserver, -// given the namespace and label -func (k *Kube) GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) { - options := metav1.ListOptions{} - options.LabelSelector = selector.String() - return k.KClient.CoreV1().Pods(namespace).List(options) -} - // GetNodes returns the list of all Node objects from kubernetes func (k *Kube) GetNodes() (*kapi.NodeList, error) { return k.KClient.CoreV1().Nodes().List(metav1.ListOptions{}) @@ -167,17 +113,6 @@ func (k *Kube) GetNode(name string) (*kapi.Node, error) { return k.KClient.CoreV1().Nodes().Get(name, metav1.GetOptions{}) } -// GetService returns the Service resource from kubernetes apiserver, given its name and namespace -func (k *Kube) GetService(namespace, name string) (*kapi.Service, error) { - return k.KClient.CoreV1().Services(namespace).Get(name, metav1.GetOptions{}) -} - -// GetEndpoints returns all the Endpoint resources from kubernetes -// apiserver, given namespace -func (k *Kube) GetEndpoints(namespace string) (*kapi.EndpointsList, error) { - return k.KClient.CoreV1().Endpoints(namespace).List(metav1.ListOptions{}) -} - // GetEndpoint returns the Endpoints resource func (k *Kube) GetEndpoint(namespace, name string) (*kapi.Endpoints, error) { return k.KClient.CoreV1().Endpoints(namespace).Get(name, metav1.GetOptions{}) @@ -193,11 +128,6 @@ func (k *Kube) UpdateEndpoint(namespace string, ep *kapi.Endpoints) (*kapi.Endpo return k.KClient.CoreV1().Endpoints(namespace).Update(ep) } -// GetNamespaces returns all Namespace resource from kubernetes apiserver -func (k *Kube) GetNamespaces() (*kapi.NamespaceList, error) { - return k.KClient.CoreV1().Namespaces().List(metav1.ListOptions{}) -} - // Events returns events to use when creating an EventSinkImpl func (k *Kube) Events() kv1core.EventInterface { return k.KClient.CoreV1().Events("") diff --git a/go-controller/pkg/ovn/ha_master.go b/go-controller/pkg/ovn/ha_master.go index bc06f80414..984a7be340 100644 --- 
a/go-controller/pkg/ovn/ha_master.go +++ b/go-controller/pkg/ovn/ha_master.go @@ -39,14 +39,12 @@ type HAMasterController struct { isLeader bool leaderElector *leaderelection.LeaderElector stopChan chan struct{} - nodeSelector *metav1.LabelSelector } // NewHAMasterController creates a new HA Master controller func NewHAMasterController(kubeClient kubernetes.Interface, wf *factory.WatchFactory, nodeName string, stopChan chan struct{}, - hybridOverlayClusterSubnets []config.CIDRNetworkEntry, - nodeSelector *metav1.LabelSelector) *HAMasterController { + hybridOverlayClusterSubnets []config.CIDRNetworkEntry) *HAMasterController { ovnController := NewOvnController(kubeClient, wf, hybridOverlayClusterSubnets) return &HAMasterController{ kubeClient: kubeClient, @@ -56,7 +54,6 @@ func NewHAMasterController(kubeClient kubernetes.Interface, wf *factory.WatchFac isLeader: false, leaderElector: nil, stopChan: stopChan, - nodeSelector: nodeSelector, } } @@ -206,7 +203,7 @@ func (hacontroller *HAMasterController) ConfigureAsActive(masterNodeName string) return err } - return hacontroller.ovnController.Run(hacontroller.nodeSelector, hacontroller.stopChan) + return hacontroller.ovnController.Run(hacontroller.stopChan) } //updateOvnDbEndpoints Updates the ovnkube-db endpoints. Should be called diff --git a/go-controller/pkg/ovn/master.go b/go-controller/pkg/ovn/master.go index 5aff381f75..12adeb8d5e 100644 --- a/go-controller/pkg/ovn/master.go +++ b/go-controller/pkg/ovn/master.go @@ -312,10 +312,11 @@ func (oc *Controller) syncNodeManagementPort(node *kapi.Node, subnet *net.IPNet) return err } - if err := util.UpdateNodeSwitchExcludeIPs(node.Name, subnet); err != nil { + if err := addAllowACLFromNode(node.Name, portIP.IP); err != nil { return err } - if err := addAllowACLFromNode(node.Name, portIP.IP); err != nil { + + if err := util.UpdateNodeSwitchExcludeIPs(node.Name, subnet); err != nil { return err } @@ -549,14 +550,22 @@ func (oc *Controller) ensureNodeLogicalNetwork(nodeName string, hostsubnet *net. return err } - // Create a logical switch and set its subnet. If all cluster subnets are - // big enough (/24 or greater), exclude the hybrid overlay port IP (even - // if hybrid overlay is not enabled) to allow enabling hybrid overlay - // in a running cluster without disrupting nodes. - var excludeIPs string + // Create a logical switch and set its subnet. + ocSubnet := "other-config:subnet=" + hostsubnet.String() + if config.IPv6Mode { + ocSubnet = "other-config:ipv6_prefix=" + hostsubnet.IP.String() + } + args := []string{ + "--", "--may-exist", "ls-add", nodeName, + "--", "set", "logical_switch", nodeName, ocSubnet, + } if !config.IPv6Mode { - excludeIPs = "other-config:exclude_ips=" + secondIP.IP.String() + excludeIPs := "other-config:exclude_ips=" + secondIP.IP.String() + // If all cluster subnets are big enough (/24 or greater), exclude + // the hybrid overlay port IP (even if hybrid overlay is not enabled) + // to allow enabling hybrid overlay in a running cluster without + // disrupting nodes. excludeHybridOverlayIP := true for _, clusterEntry := range config.Default.ClusterSubnets { if clusterEntry.HostSubnetLength > 24 { @@ -568,13 +577,9 @@ func (oc *Controller) ensureNodeLogicalNetwork(nodeName string, hostsubnet *net. thirdIP := util.NextIP(secondIP.IP) excludeIPs += ".." + thirdIP.String() } + args = append(args, excludeIPs) } - - // Create a logical switch and set its subnet. 
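As a minimal sketch of the argument construction above (buildLSArgs and its parameters are hypothetical names for illustration, not part of the patch):

// buildLSArgs mirrors the new logic above: an IPv6 switch records its
// prefix under other-config:ipv6_prefix, while an IPv4 switch records
// other-config:subnet plus an exclude_ips range for the management and
// hybrid overlay IPs.
func buildLSArgs(nodeName, subnet string, ipv6 bool, excludeIPs string) []string {
	ocSubnet := "other-config:subnet=" + subnet
	if ipv6 {
		// Only the prefix address is stored for IPv6, with no exclude_ips.
		ocSubnet = "other-config:ipv6_prefix=" + subnet
	}
	args := []string{
		"--", "--may-exist", "ls-add", nodeName,
		"--", "set", "logical_switch", nodeName, ocSubnet,
	}
	if !ipv6 && excludeIPs != "" {
		args = append(args, "other-config:exclude_ips="+excludeIPs)
	}
	return args
}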
- stdout, stderr, err := util.RunOVNNbctl("--", "--may-exist", "ls-add", nodeName, - "--", "set", "logical_switch", nodeName, "other-config:subnet="+hostsubnet.String(), - excludeIPs, - "external-ids:gateway_ip="+firstIP.String()) + stdout, stderr, err := util.RunOVNNbctl(args...) if err != nil { logrus.Errorf("Failed to create a logical switch %v, stdout: %q, stderr: %q, error: %v", nodeName, stdout, stderr, err) return err @@ -850,9 +855,15 @@ func (oc *Controller) syncNodes(nodes []interface{}) { // watchNodes() will be called for all existing nodes at startup anyway. // Note that this list will include the 'join' cluster switch, which we // do not want to delete. + var subnetAttr string + if config.IPv6Mode { + subnetAttr = "ipv6_prefix" + } else { + subnetAttr = "subnet" + } nodeSwitches, stderr, err := util.RunOVNNbctl("--data=bare", "--no-heading", "--columns=name,other-config", "find", "logical_switch", - fmt.Sprintf("%s!=_", config.OtherConfigSubnet())) + "other-config:"+subnetAttr+"!=_") if err != nil { logrus.Errorf("Failed to get node logical switches: stderr: %q, error: %v", stderr, err) @@ -882,10 +893,13 @@ func (oc *Controller) syncNodes(nodes []interface{}) { } var subnet *net.IPNet - configs := strings.Fields(items[1]) - for _, config := range configs { - if strings.HasPrefix(config, "subnet=") { - subnetStr := strings.TrimPrefix(config, "subnet=") + attrs := strings.Fields(items[1]) + for _, attr := range attrs { + if strings.HasPrefix(attr, subnetAttr+"=") { + subnetStr := strings.TrimPrefix(attr, subnetAttr+"=") + if config.IPv6Mode { + subnetStr += "/64" + } _, subnet, _ = net.ParseCIDR(subnetStr) break } diff --git a/go-controller/pkg/ovn/master_test.go b/go-controller/pkg/ovn/master_test.go index b374bfe183..415c273b9e 100644 --- a/go-controller/pkg/ovn/master_test.go +++ b/go-controller/pkg/ovn/master_test.go @@ -120,13 +120,14 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st }) fexec.AddFakeCmdsNoOutputNoError([]string{ "ovn-nbctl --timeout=15 --may-exist lrp-add ovn_cluster_router rtos-" + nodeName + " " + lrpMAC + " " + gwCIDR, - "ovn-nbctl --timeout=15 -- --may-exist ls-add " + nodeName + " -- set logical_switch " + nodeName + " other-config:subnet=" + nodeSubnet + " other-config:exclude_ips=" + nodeMgmtPortIP.String() + ".." + hybridOverlayIP.String() + " external-ids:gateway_ip=" + gwCIDR, + "ovn-nbctl --timeout=15 -- --may-exist ls-add " + nodeName + " -- set logical_switch " + nodeName + " other-config:subnet=" + nodeSubnet + " other-config:exclude_ips=" + nodeMgmtPortIP.String() + ".." 
+ hybridOverlayIP.String(), "ovn-nbctl --timeout=15 set logical_switch " + nodeName + " other-config:mcast_snoop=\"true\"", "ovn-nbctl --timeout=15 set logical_switch " + nodeName + " other-config:mcast_querier=\"true\" other-config:mcast_eth_src=\"" + lrpMAC + "\" other-config:mcast_ip4_src=\"" + gwIP + "\"", "ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " stor-" + nodeName + " -- set logical_switch_port stor-" + nodeName + " type=router options:router-port=rtos-" + nodeName + " addresses=\"" + lrpMAC + "\"", "ovn-nbctl --timeout=15 set logical_switch " + nodeName + " load_balancer=" + tcpLBUUID, "ovn-nbctl --timeout=15 add logical_switch " + nodeName + " load_balancer " + udpLBUUID, "ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " k8s-" + nodeName + " -- lsp-set-addresses " + "k8s-" + nodeName + " " + mgmtMAC + " " + nodeMgmtPortIP.String(), + "ovn-nbctl --timeout=15 --may-exist acl-add " + nodeName + " to-lport 1001 ip4.src==" + nodeMgmtPortIP.String() + " allow-related", }) fexec.AddFakeCmd(&ovntest.ExpectedCmd{ Cmd: "ovn-nbctl --timeout=15 lsp-list " + nodeName, @@ -134,7 +135,6 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st }) fexec.AddFakeCmdsNoOutputNoError([]string{ "ovn-nbctl --timeout=15 -- --if-exists set logical_switch " + nodeName + " other-config:exclude_ips=" + hybridOverlayIP.String(), - "ovn-nbctl --timeout=15 --may-exist acl-add " + nodeName + " to-lport 1001 ip4.src==" + nodeMgmtPortIP.String() + " allow-related", }) return fexec, tcpLBUUID, udpLBUUID @@ -232,7 +232,7 @@ var _ = Describe("Master Operations", func() { err = clusterController.StartClusterMaster("master") Expect(err).NotTo(HaveOccurred()) - err = clusterController.WatchNodes(nil) + err = clusterController.WatchNodes() Expect(err).NotTo(HaveOccurred()) Expect(fexec.CalledMatchesExpected()).To(BeTrue(), fexec.ErrorDesc) @@ -306,7 +306,7 @@ var _ = Describe("Master Operations", func() { err = clusterController.StartClusterMaster("master") Expect(err).NotTo(HaveOccurred()) - err = clusterController.WatchNodes(nil) + err = clusterController.WatchNodes() Expect(err).NotTo(HaveOccurred()) Expect(fexec.CalledMatchesExpected()).To(BeTrue(), fexec.ErrorDesc) @@ -341,8 +341,8 @@ var _ = Describe("Master Operations", func() { masterGWCIDR string = "10.128.2.1/24" masterMgmtPortIP string = "10.128.2.2" lrpMAC string = "0A:58:0A:80:02:01" - masterMgmtPortMAC string = "00:00:00:55:66:77" masterHOPortIP string = "10.128.2.3" + masterMgmtPortMAC string = "00:00:00:55:66:77" ) fexec := ovntest.NewFakeExec() @@ -400,11 +400,12 @@ subnet=%s // Kubernetes API nodes fexec.AddFakeCmdsNoOutputNoError([]string{ "ovn-nbctl --timeout=15 --may-exist lrp-add ovn_cluster_router rtos-" + masterName + " " + lrpMAC + " " + masterGWCIDR, - "ovn-nbctl --timeout=15 -- --may-exist ls-add " + masterName + " -- set logical_switch " + masterName + " other-config:subnet=" + masterSubnet + " other-config:exclude_ips=" + masterMgmtPortIP + ".." + masterHOPortIP + " external-ids:gateway_ip=" + masterGWCIDR, + "ovn-nbctl --timeout=15 -- --may-exist ls-add " + masterName + " -- set logical_switch " + masterName + " other-config:subnet=" + masterSubnet + " other-config:exclude_ips=" + masterMgmtPortIP + ".." 
+ masterHOPortIP,
 			"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + masterName + " stor-" + masterName + " -- set logical_switch_port stor-" + masterName + " type=router options:router-port=rtos-" + masterName + " addresses=\"" + lrpMAC + "\"",
 			"ovn-nbctl --timeout=15 set logical_switch " + masterName + " load_balancer=" + tcpLBUUID,
 			"ovn-nbctl --timeout=15 add logical_switch " + masterName + " load_balancer " + udpLBUUID,
 			"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + masterName + " k8s-" + masterName + " -- lsp-set-addresses " + "k8s-" + masterName + " " + masterMgmtPortMAC + " " + masterMgmtPortIP,
+			"ovn-nbctl --timeout=15 --may-exist acl-add " + masterName + " to-lport 1001 ip4.src==" + masterMgmtPortIP + " allow-related",
 		})
 		fexec.AddFakeCmd(&ovntest.ExpectedCmd{
 			Cmd: "ovn-nbctl --timeout=15 lsp-list " + masterName,
@@ -412,7 +413,6 @@ subnet=%s
 		})
 		fexec.AddFakeCmdsNoOutputNoError([]string{
 			"ovn-nbctl --timeout=15 -- --if-exists set logical_switch " + masterName + " other-config:exclude_ips=" + masterHOPortIP,
-			"ovn-nbctl --timeout=15 --may-exist acl-add " + masterName + " to-lport 1001 ip4.src==" + masterMgmtPortIP + " allow-related",
 		})

 		cleanupGateway(fexec, masterName, masterSubnet, masterGWCIDR, masterMgmtPortIP)
@@ -463,7 +463,7 @@ subnet=%s
 		_ = clusterController.joinSubnetAllocator.AddNetworkRange("100.64.0.0/16", 3)

 		// Let the real code run and ensure OVN database sync
-		err = clusterController.WatchNodes(nil)
+		err = clusterController.WatchNodes()
 		Expect(err).NotTo(HaveOccurred())

 		Expect(fexec.CalledMatchesExpected()).To(BeTrue(), fexec.ErrorDesc)
@@ -569,11 +569,12 @@ var _ = Describe("Gateway Init Operations", func() {
 			fexec.AddFakeCmdsNoOutputNoError([]string{
 				"ovn-nbctl --timeout=15 --may-exist lrp-add ovn_cluster_router rtos-" + nodeName + " " + nodeLRPMAC + " " + masterGWCIDR,
-				"ovn-nbctl --timeout=15 -- --may-exist ls-add " + nodeName + " -- set logical_switch " + nodeName + " other-config:subnet=" + nodeSubnet + " other-config:exclude_ips=" + masterMgmtPortIP + ".." + masterHOPortIP + " external-ids:gateway_ip=" + masterGWCIDR,
+				"ovn-nbctl --timeout=15 -- --may-exist ls-add " + nodeName + " -- set logical_switch " + nodeName + " other-config:subnet=" + nodeSubnet + " other-config:exclude_ips=" + masterMgmtPortIP + ".." + masterHOPortIP,
 				"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " stor-" + nodeName + " -- set logical_switch_port stor-" + nodeName + " type=router options:router-port=rtos-" + nodeName + " addresses=\"" + nodeLRPMAC + "\"",
 				"ovn-nbctl --timeout=15 set logical_switch " + nodeName + " load_balancer=" + tcpLBUUID,
 				"ovn-nbctl --timeout=15 add logical_switch " + nodeName + " load_balancer " + udpLBUUID,
 				"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " k8s-" + nodeName + " -- lsp-set-addresses " + "k8s-" + nodeName + " " + brLocalnetMAC + " " + masterMgmtPortIP,
+				"ovn-nbctl --timeout=15 --may-exist acl-add " + nodeName + " to-lport 1001 ip4.src==" + masterMgmtPortIP + " allow-related",
 			})
 			fexec.AddFakeCmd(&ovntest.ExpectedCmd{
 				Cmd: "ovn-nbctl --timeout=15 lsp-list " + nodeName,
@@ -581,7 +582,6 @@ var _ = Describe("Gateway Init Operations", func() {
 			})
 			fexec.AddFakeCmdsNoOutputNoError([]string{
 				"ovn-nbctl --timeout=15 -- --if-exists set logical_switch " + nodeName + " other-config:exclude_ips=" + masterHOPortIP,
-				"ovn-nbctl --timeout=15 --may-exist acl-add " + nodeName + " to-lport 1001 ip4.src==" + masterMgmtPortIP + " allow-related",
 			})

 			joinSwitch := "join_" + nodeName
 			fexec.AddFakeCmdsNoOutputNoError([]string{
@@ -674,7 +674,7 @@ var _ = Describe("Gateway Init Operations", func() {
 			_ = clusterController.joinSubnetAllocator.AddNetworkRange("100.64.0.0/16", 3)

 			// Let the real code run and ensure OVN database sync
-			err = clusterController.WatchNodes(nil)
+			err = clusterController.WatchNodes()
 			Expect(err).NotTo(HaveOccurred())

 			_, subnet, err := net.ParseCIDR(nodeSubnet)
@@ -767,11 +767,12 @@ var _ = Describe("Gateway Init Operations", func() {
 			fexec.AddFakeCmdsNoOutputNoError([]string{
 				"ovn-nbctl --timeout=15 --may-exist lrp-add ovn_cluster_router rtos-" + nodeName + " " + nodeLRPMAC + " " + nodeGWIP,
-				"ovn-nbctl --timeout=15 -- --may-exist ls-add " + nodeName + " -- set logical_switch " + nodeName + " other-config:subnet=" + nodeSubnet + " other-config:exclude_ips=" + nodeMgmtPortIP + ".." + nodeHOPortIP + " external-ids:gateway_ip=" + nodeGWIP,
+				"ovn-nbctl --timeout=15 -- --may-exist ls-add " + nodeName + " -- set logical_switch " + nodeName + " other-config:subnet=" + nodeSubnet + " other-config:exclude_ips=" + nodeMgmtPortIP + ".." + nodeHOPortIP,
 				"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " stor-" + nodeName + " -- set logical_switch_port stor-" + nodeName + " type=router options:router-port=rtos-" + nodeName + " addresses=\"" + nodeLRPMAC + "\"",
 				"ovn-nbctl --timeout=15 set logical_switch " + nodeName + " load_balancer=" + tcpLBUUID,
 				"ovn-nbctl --timeout=15 add logical_switch " + nodeName + " load_balancer " + udpLBUUID,
 				"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " k8s-" + nodeName + " -- lsp-set-addresses " + "k8s-" + nodeName + " " + nodeMgmtPortMAC + " " + nodeMgmtPortIP,
+				"ovn-nbctl --timeout=15 --may-exist acl-add " + nodeName + " to-lport 1001 ip4.src==" + nodeMgmtPortIP + " allow-related",
 			})
 			fexec.AddFakeCmd(&ovntest.ExpectedCmd{
 				Cmd: "ovn-nbctl --timeout=15 lsp-list " + nodeName,
@@ -780,7 +781,6 @@ var _ = Describe("Gateway Init Operations", func() {
 			joinSwitch := "join_" + nodeName
 			fexec.AddFakeCmdsNoOutputNoError([]string{
 				"ovn-nbctl --timeout=15 -- --if-exists set logical_switch " + nodeName + " other-config:exclude_ips=" + nodeHOPortIP,
-				"ovn-nbctl --timeout=15 --may-exist acl-add " + nodeName + " to-lport 1001 ip4.src==" + nodeMgmtPortIP + " allow-related",
 				"ovn-nbctl --timeout=15 -- --may-exist lr-add " + gwRouter + " -- set logical_router " + gwRouter + " options:chassis=" + systemID + " external_ids:physical_ip=" + physicalGatewayIP,
 				"ovn-nbctl --timeout=15 -- --may-exist ls-add " + joinSwitch,
 				"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + joinSwitch + " jtor-" + gwRouter + " -- set logical_switch_port jtor-" + gwRouter + " type=router options:router-port=rtoj-" + gwRouter + " addresses=router",
@@ -876,7 +876,7 @@ var _ = Describe("Gateway Init Operations", func() {
 			_ = clusterController.joinSubnetAllocator.AddNetworkRange("100.64.0.0/16", 3)

 			// Let the real code run and ensure OVN database sync
-			err = clusterController.WatchNodes(nil)
+			err = clusterController.WatchNodes()
 			Expect(err).NotTo(HaveOccurred())

 			_, subnet, err := net.ParseCIDR(nodeSubnet)
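The test updates above do two things: they move the management-port ACL expectation into the same fake-exec batch that creates the node's logical switch, and they drop the selector argument from WatchNodes. Not part of the patch: a minimal sketch of the fake-exec pattern these tests rely on. The ovntest import path, the FakeExec/NewFakeExec names, and the node1/10.128.0.2 values are assumptions based on the helpers used above, not verified API.

package ovn_test

import (
	"fmt"

	ovntest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing"
)

// exampleExpectations registers the reordered expectation and then checks,
// after the code under test has run, that every command was consumed in order.
func exampleExpectations(runCodeUnderTest func(fexec *ovntest.FakeExec)) {
	fexec := ovntest.NewFakeExec()
	fexec.AddFakeCmdsNoOutputNoError([]string{
		// The management-port ACL is now expected in the same batch that
		// creates the node's logical switch ports.
		"ovn-nbctl --timeout=15 --may-exist acl-add node1 to-lport 1001 ip4.src==10.128.0.2 allow-related",
	})

	runCodeUnderTest(fexec)

	if !fexec.CalledMatchesExpected() {
		fmt.Println(fexec.ErrorDesc)
	}
}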
diff --git a/go-controller/pkg/ovn/ovn.go b/go-controller/pkg/ovn/ovn.go
index ddf165e2a3..bb105debc9 100644
--- a/go-controller/pkg/ovn/ovn.go
+++ b/go-controller/pkg/ovn/ovn.go
@@ -18,6 +18,7 @@ import (
 	kapi "k8s.io/api/core/v1"
 	kapisnetworking "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -128,40 +129,45 @@ const (
 // NewOvnController creates a new OVN controller for creating logical network
 // infrastructure and policy
 func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory, hybridOverlayClusterSubnets []config.CIDRNetworkEntry) *Controller {
-	return &Controller{
-		kube:                     &kube.Kube{KClient: kubeClient},
-		watchFactory:             wf,
-		masterSubnetAllocator:    allocator.NewSubnetAllocator(),
-		logicalSwitchCache:       make(map[string]*net.IPNet),
-		joinSubnetAllocator:      allocator.NewSubnetAllocator(),
-		logicalPortCache:         make(map[string]string),
-		logicalPortUUIDCache:     make(map[string]string),
-		namespaceAddressSet:      make(map[string]map[string]string),
-		namespacePolicies:        make(map[string]map[string]*namespacePolicy),
-		namespaceMutex:           make(map[string]*sync.Mutex),
-		namespaceMutexMutex:      sync.Mutex{},
-		lspIngressDenyCache:      make(map[string]int),
-		lspEgressDenyCache:       make(map[string]int),
-		lspMutex:                 &sync.Mutex{},
-		lsMutex:                  &sync.Mutex{},
-		loadbalancerClusterCache: make(map[string]string),
-		loadbalancerGWCache:      make(map[string]string),
-		multicastEnabled:         make(map[string]bool),
-		multicastSupport:         config.EnableMulticast,
-		serviceVIPToName:         make(map[ServiceVIPKey]types.NamespacedName),
-		serviceVIPToNameLock:     sync.Mutex{},
-		hybridOverlayClusterSubnets: hybridOverlayClusterSubnets,
+	oc := &Controller{
+		kube:                     &kube.Kube{KClient: kubeClient},
+		watchFactory:             wf,
+		masterSubnetAllocator:    allocator.NewSubnetAllocator(),
+		logicalSwitchCache:       make(map[string]*net.IPNet),
+		joinSubnetAllocator:      allocator.NewSubnetAllocator(),
+		logicalPortCache:         make(map[string]string),
+		logicalPortUUIDCache:     make(map[string]string),
+		namespaceAddressSet:      make(map[string]map[string]string),
+		namespacePolicies:        make(map[string]map[string]*namespacePolicy),
+		namespaceMutex:           make(map[string]*sync.Mutex),
+		namespaceMutexMutex:      sync.Mutex{},
+		lspIngressDenyCache:      make(map[string]int),
+		lspEgressDenyCache:       make(map[string]int),
+		lspMutex:                 &sync.Mutex{},
+		lsMutex:                  &sync.Mutex{},
+		loadbalancerClusterCache: make(map[string]string),
+		loadbalancerGWCache:      make(map[string]string),
+		multicastEnabled:         make(map[string]bool),
+		multicastSupport:         config.EnableMulticast,
+		serviceVIPToName:         make(map[ServiceVIPKey]types.NamespacedName),
+		serviceVIPToNameLock:     sync.Mutex{},
 	}
+	oc.hybridOverlayClusterSubnets = hybridOverlayClusterSubnets
+	return oc
 }

 // Run starts the actual watching.
-func (oc *Controller) Run(nodeSelector *metav1.LabelSelector, stopChan chan struct{}) error {
+func (oc *Controller) Run(stopChan chan struct{}) error {
+	// Set the debug log level during node bring-up to expose the bring-up
+	// process; the log level is returned to its configured value once
+	// bring-up is complete.
+	logrus.SetLevel(5)
+	startOvnUpdater()
 	// WatchNodes must be started first so that its initial Add will
 	// create all node logical switches, which other watches may depend on.
 	// https://github.com/ovn-org/ovn-kubernetes/pull/859
-	if err := oc.WatchNodes(nodeSelector); err != nil {
+	if err := oc.WatchNodes(); err != nil {
 		return err
 	}
@@ -503,12 +509,21 @@ func (oc *Controller) syncNodeGateway(node *kapi.Node, subnet *net.IPNet) error
 // WatchNodes starts the watching of node resource and calls
 // back the appropriate handler logic
-func (oc *Controller) WatchNodes(nodeSelector *metav1.LabelSelector) error {
+func (oc *Controller) WatchNodes() error {
 	var gatewaysFailed sync.Map
 	var mgmtPortFailed sync.Map
-	_, err := oc.watchFactory.AddFilteredNodeHandler(nodeSelector, cache.ResourceEventHandlerFuncs{
+	_, err := oc.watchFactory.AddNodeHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			node := obj.(*kapi.Node)
+
+			if noHostSubnet := noHostSubnet(node); noHostSubnet {
+				oc.lsMutex.Lock()
+				defer oc.lsMutex.Unlock()
+				// Setting the value to nil in the cache means the node was not assigned a hostSubnet by ovn-kube
+				oc.logicalSwitchCache[node.Name] = nil
+				return
+			}
+
 			logrus.Debugf("Added event for Node %q", node.Name)
 			hostSubnet, err := oc.addNode(node)
 			if err != nil {
@@ -530,6 +545,16 @@ func (oc *Controller) WatchNodes(nodeSelector *metav1.LabelSelector) error {
 		UpdateFunc: func(old, new interface{}) {
 			oldNode := old.(*kapi.Node)
 			node := new.(*kapi.Node)
+
+			shouldUpdate, err := shouldUpdate(node, oldNode)
+			if err != nil {
+				logrus.Errorf(err.Error())
+			}
+			if !shouldUpdate {
+				// The hostsubnet is not assigned by ovn-kubernetes
+				return
+			}
+
 			logrus.Debugf("Updated event for Node %q", node.Name)

 			_, failed := mgmtPortFailed.Load(node.Name)
@@ -616,3 +641,32 @@ func macAddressChanged(oldNode, node *kapi.Node) bool {
 	macAddress := node.Annotations[OvnNodeManagementPortMacAddress]
 	return oldMacAddress != macAddress
 }
+
+// noHostSubnet() compares the no-hostsubnet-nodes flag with node labels to see if the node is managing its
+// own network.
+func noHostSubnet(node *kapi.Node) bool {
+	if config.Kubernetes.NoHostSubnetNodes == nil {
+		return false
+	}
+
+	nodeSelector, _ := metav1.LabelSelectorAsSelector(config.Kubernetes.NoHostSubnetNodes)
+	return nodeSelector.Matches(labels.Set(node.Labels))
+}
+
+// shouldUpdate() determines if the ovn-kubernetes plugin should update the state of the node.
+// ovn-kube should not perform an update if it does not assign a hostsubnet, or if the update would change
+// whether or not ovn-kubernetes assigns a hostsubnet
+func shouldUpdate(node, oldNode *kapi.Node) (bool, error) {
+	newNoHostSubnet := noHostSubnet(node)
+	oldNoHostSubnet := noHostSubnet(oldNode)

+	if oldNoHostSubnet && newNoHostSubnet {
+		return false, nil
+	} else if oldNoHostSubnet && !newNoHostSubnet {
+		return false, fmt.Errorf("error updating node %s, cannot remove assigned hostsubnet, please delete node and recreate", node.Name)
+	} else if !oldNoHostSubnet && newNoHostSubnet {
+		return false, fmt.Errorf("error updating node %s, cannot assign a hostsubnet to already created node, please delete node and recreate", node.Name)
+	}
+
+	return true, nil
+}
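noHostSubnet above turns the no-hostsubnet-nodes label selector from the config into a label match against each node: a matching node is left to manage its own network. A self-contained sketch of that matching follows; the selector value and node labels here are illustrative only (in the real code the selector comes from config.Kubernetes.NoHostSubnetNodes).

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Stand-in for config.Kubernetes.NoHostSubnetNodes, e.g. populated from a
	// --no-hostsubnet-nodes=kubernetes.io/os=windows style option.
	noHostSubnetNodes := &metav1.LabelSelector{
		MatchLabels: map[string]string{"kubernetes.io/os": "windows"},
	}

	sel, err := metav1.LabelSelectorAsSelector(noHostSubnetNodes)
	if err != nil {
		panic(err)
	}

	// A node carrying the label is skipped: no hostsubnet is allocated and its
	// logicalSwitchCache entry is set to nil.
	nodeLabels := labels.Set{"kubernetes.io/os": "windows"}
	fmt.Println(sel.Matches(nodeLabels)) // true => node manages its own network
}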
diff --git a/go-controller/pkg/ovn/pods.go b/go-controller/pkg/ovn/pods.go
index aad593ac0e..e330632f58 100644
--- a/go-controller/pkg/ovn/pods.go
+++ b/go-controller/pkg/ovn/pods.go
@@ -263,6 +263,11 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) error {
 	var out, stderr string
 	var err error

+	// If a node does not have an assigned hostsubnet, don't wait for the logical switch to appear
+	if val, ok := oc.logicalSwitchCache[pod.Spec.NodeName]; ok && val == nil {
+		return nil
+	}
+
 	// Keep track of how long syncs take.
 	start := time.Now()
 	defer func() {
@@ -341,13 +346,14 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) error {
 			"stdout: %q, stderr: %q (%v)", portName, out, stderr, err)
 	}

-	routes := []util.PodRoute{}
-	var gatewayIP net.IP
-	if gatewayIPnet != nil && len(oc.hybridOverlayClusterSubnets) > 0 {
-		gatewayIP = gatewayIPnet.IP
+	routes, gatewayIP, err := getRoutesGatewayIP(pod, gatewayIPnet)
+	if err != nil {
+		return err
+	}
+	if gatewayIP != nil && len(oc.hybridOverlayClusterSubnets) > 0 {
 		// Get the 3rd address in the node's subnet; the first is taken
 		// by the k8s-cluster-router port, the second by the management port
-		second := util.NextIP(gatewayIP)
+		second := util.NextIP(gatewayIPnet.IP)
 		thirdIP := util.NextIP(second)
 		for _, subnet := range oc.hybridOverlayClusterSubnets {
 			routes = append(routes, util.PodRoute{
@@ -355,12 +361,8 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) error {
 				NextHop: thirdIP,
 			})
 		}
-	} else {
-		routes, gatewayIP, err = getRoutesGatewayIP(pod, gatewayIPnet)
-		if err != nil {
-			return err
-		}
 	}
+
 	marshalledAnnotation, err := util.MarshalPodAnnotation(&util.PodAnnotation{
 		IP:  podCIDR,
 		MAC: podMac,
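The early return added to addLogicalPort relies on a deliberate convention in logicalSwitchCache: a key present with a nil value means ovn-kube did not assign the node a hostsubnet (the node manages its own network), while an absent key means the switch simply has not been seen yet. A runnable sketch of that distinction; the node names are illustrative:

package main

import (
	"fmt"
	"net"
)

func main() {
	// Mirrors the cache convention above: present-with-nil vs. absent.
	logicalSwitchCache := map[string]*net.IPNet{
		"self-managed-node": nil, // WatchNodes stored nil: no hostsubnet assigned
	}

	for _, nodeName := range []string{"self-managed-node", "unknown-node"} {
		if val, ok := logicalSwitchCache[nodeName]; ok && val == nil {
			// Key exists but is nil: skip pod port setup entirely.
			fmt.Printf("%s: node manages its own network, skipping\n", nodeName)
			continue
		}
		// Key absent: the switch may still be coming up, so keep waiting.
		fmt.Printf("%s: wait for the logical switch to appear\n", nodeName)
	}
}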
diff --git a/go-controller/pkg/ovn/policy.go b/go-controller/pkg/ovn/policy.go
index f0e40f9209..081a2b41ad 100644
--- a/go-controller/pkg/ovn/policy.go
+++ b/go-controller/pkg/ovn/policy.go
@@ -881,6 +881,16 @@ func (oc *Controller) deleteNetworkPolicyPortGroup(
 	// Mark the policy as deleted.
 	np.deleted = true

+	// We should now stop all the handlers go routines.
+	oc.shutdownHandlers(np)
+
+	for logicalPort := range np.localPods {
+		oc.localPodDelDefaultDeny(policy, logicalPort)
+	}
+
+	// Delete the port group
+	deletePortGroup(np.portGroupName)
+
 	// Go through each ingress rule. For each ingress rule, delete the
 	// addressSet for the local peer pods.
 	for i := range np.ingressPolicies {
@@ -898,15 +908,5 @@ func (oc *Controller) deleteNetworkPolicyPortGroup(
 		deleteAddressSet(hashedAddressSet)
 	}

-	// We should now stop all the handlers go routines.
-	oc.shutdownHandlers(np)
-
-	for logicalPort := range np.localPods {
-		oc.localPodDelDefaultDeny(policy, logicalPort)
-	}
-
-	// Delete the port group
-	deletePortGroup(np.portGroupName)
-
 	oc.namespacePolicies[policy.Namespace][policy.Name] = nil
 }
diff --git a/go-controller/pkg/util/kube.go b/go-controller/pkg/util/kube.go
index 97f1d51f94..088baff424 100644
--- a/go-controller/pkg/util/kube.go
+++ b/go-controller/pkg/util/kube.go
@@ -3,7 +3,6 @@ package util
 import (
 	"encoding/json"
 	"fmt"
-	"net"
 	"strings"

 	kapi "k8s.io/api/core/v1"
@@ -11,7 +10,6 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/client-go/util/cert"
-	"k8s.io/klog"

 	"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/cni/types"
 	"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
@@ -101,31 +99,41 @@ func ExtractDbRemotesFromEndpoint(ep *kapi.Endpoints) ([]string, int32, int32, e
 	return masterIPList, sbDBPort, nbDBPort, nil
 }

-func GetNodeIP(nodeName string) (string, error) {
-	ip := net.ParseIP(nodeName)
-	if ip == nil {
-		addrs, err := net.LookupIP(nodeName)
-		if err != nil {
-			return "", fmt.Errorf("Failed to lookup IP address for node %s: %v", nodeName, err)
+// GetNodeIP extracts the ip address from the node status in the API
+func GetNodeIP(node *kapi.Node) (string, error) {
+	for _, addr := range node.Status.Addresses {
+		if addr.Type == kapi.NodeInternalIP {
+			return addr.Address, nil
 		}
-		for _, addr := range addrs {
-			// Skip loopback addrs
-			if addr.IsLoopback() {
-				klog.V(5).Infof("Skipping loopback addr: %q for node %s", addr.String(), nodeName)
-				continue
-			}
-			ip = addr
-			break
+	}
+	for _, addr := range node.Status.Addresses {
+		if addr.Type == kapi.NodeExternalIP {
+			return addr.Address, nil
 		}
-	} else if ip.IsLoopback() {
-		klog.V(5).Infof("Skipping loopback addr: %q for node %s", ip.String(), nodeName)
-		ip = nil
 	}
+	return "", fmt.Errorf("%s doesn't have an address with type %s or %s", node.GetName(),
+		kapi.NodeInternalIP, kapi.NodeExternalIP)
+}

-	if ip == nil || len(ip.String()) == 0 {
-		return "", fmt.Errorf("Failed to obtain IP address from node name: %s", nodeName)
+// GetNodeHostname extracts the hostname from the node status in the API
+func GetNodeHostname(node *kapi.Node) (string, error) {
+	for _, addr := range node.Status.Addresses {
+		if addr.Type == kapi.NodeHostName {
+			return addr.Address, nil
+		}
+	}
+	for _, addr := range node.Status.Addresses {
+		if addr.Type == kapi.NodeExternalDNS {
+			return addr.Address, nil
+		}
+	}
+	for _, addr := range node.Status.Addresses {
+		if addr.Type == kapi.NodeInternalDNS {
+			return addr.Address, nil
+		}
 	}
-	return ip.String(), nil
+	return "", fmt.Errorf("%s doesn't have an address with type %s, %s or %s", node.GetName(),
+		kapi.NodeHostName, kapi.NodeExternalDNS, kapi.NodeInternalDNS)
 }

 const (