diff --git a/.github/actions/install-prometheus/values.yaml b/.github/actions/install-prometheus/values.yaml index d11ad8329f..f38906248d 100644 --- a/.github/actions/install-prometheus/values.yaml +++ b/.github/actions/install-prometheus/values.yaml @@ -41,6 +41,7 @@ kubelet: scrape: enabled prometheus: prometheusSpec: + maximumStartupDurationSeconds: 60 tolerations: - key: CriticalAddonsOnly operator: Exists diff --git a/.github/workflows/aws.yaml b/.github/workflows/aws.yaml new file mode 100644 index 0000000000..4991a70195 --- /dev/null +++ b/.github/workflows/aws.yaml @@ -0,0 +1,100 @@ +name: Build and Push AWS Karpenter Provider Image + +on: + workflow_dispatch: + push: + branches: + - main + - release-* + +jobs: + build: + runs-on: ubuntu-latest + + env: + KO_DOCKER_REPO: docker.io/inftyai/karpenter-provider-aws + + steps: + - name: Checkout forked karpenter + uses: actions/checkout@v4 + + - name: Set up Go 1.24 + uses: actions/setup-go@v5 + with: + go-version: "1.24" + + - name: Generate commit info and image tag + id: tag + run: | + BRANCH="${GITHUB_REF##*/}" + COMMIT=$(git rev-parse HEAD) + TIMESTAMP=$(git show -s --format=%ct "$COMMIT") + VERSION_DATE=$(date -u -d "@$TIMESTAMP" +'%Y%m%d%H%M%S') + PSEUDO_VERSION="v0.0.0-${VERSION_DATE}-${COMMIT:0:12}" + + if [[ "$BRANCH" == "main" ]]; then + TAG="latest" + IMAGE_TAG="latest" + elif [[ "$BRANCH" == release-* ]]; then + TAG="${BRANCH#release-}" # e.g. v0.36.2 + IMAGE_TAG="${TAG#v}" # e.g. 0.36.2 + else + TAG="fork-${PSEUDO_VERSION}" + IMAGE_TAG="${TAG}" # keep full tag + fi + + { + echo "commit=$COMMIT" + echo "version=$PSEUDO_VERSION" + echo "tag=$TAG" + echo "image_tag=$IMAGE_TAG" + } >> "$GITHUB_OUTPUT" + echo "✅ Using image tag: $IMAGE_TAG" + + - name: Clone karpenter-provider-aws + run: | + git clone https://github.com/aws/karpenter-provider-aws.git + cd karpenter-provider-aws + + TAG="${{ steps.tag.outputs.tag }}" + if [[ "$TAG" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then + echo "🔄 Attempting to checkout provider tag: $TAG" + if git rev-parse "refs/tags/$TAG" >/dev/null 2>&1; then + git checkout "tags/$TAG" -b "build-from-tag-$TAG" + else + echo "❌ Tag '$TAG' not found in karpenter-provider-aws repo." 
+ exit 1 + fi + else + echo "🔄 Checking out provider branch: main" + git checkout main + fi + + - name: Replace karpenter module with forked commit version + run: | + cd karpenter-provider-aws + go mod edit -replace sigs.k8s.io/karpenter=github.com/InftyAI/karpenter@${{ steps.tag.outputs.version }} + go mod tidy + + - name: Install build tools via make toolchain + run: | + cd karpenter-provider-aws + make toolchain + + - name: Login to DockerHub + uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 #v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Build and push image using ko + run: | + cd karpenter-provider-aws + ko build --bare \ + --tags ${{ steps.tag.outputs.image_tag }} \ + github.com/aws/karpenter-provider-aws/cmd/controller + + - name: Show pushed image + run: | + echo "✅ Image pushed to:" + echo "${{ env.KO_DOCKER_REPO }}:${{ steps.tag.outputs.image_tag }}" diff --git a/Makefile b/Makefile index 38dafb606b..2b1101f281 100644 --- a/Makefile +++ b/Makefile @@ -15,10 +15,10 @@ help: ## Display help presubmit: verify test licenses vulncheck ## Run all steps required for code to be checked in install-kwok: ## Install kwok provider - UNINSTALL_KWOK=false ./hack/install-kwok.sh + ./hack/install-kwok.sh uninstall-kwok: ## Uninstall kwok provider - UNINSTALL_KWOK=true ./hack/install-kwok.sh + UNINSTALL=true ./hack/install-kwok.sh build-with-kind: # build with kind assumes the image will be uploaded directly onto the kind control plane, without an image repository $(eval CONTROLLER_IMG=$(shell $(WITH_GOFLAGS) KO_DOCKER_REPO="$(KWOK_REPO)" ko build sigs.k8s.io/karpenter/kwok)) @@ -29,7 +29,7 @@ build: ## Build the Karpenter KWOK controller images using ko build $(eval CONTROLLER_IMG=$(shell $(WITH_GOFLAGS) KO_DOCKER_REPO="$(KWOK_REPO)" ko build -B sigs.k8s.io/karpenter/kwok)) $(eval IMG_REPOSITORY=$(shell echo $(CONTROLLER_IMG) | cut -d "@" -f 1 | cut -d ":" -f 1)) $(eval IMG_TAG=$(shell echo $(CONTROLLER_IMG) | cut -d "@" -f 1 | cut -d ":" -f 2 -s)) - $(eval IMG_DIGEST=$(shell echo $(CONTROLLER_IMG) | cut -d "@" -f 2)) + $(eval IMG_DIGEST=$(shell echo $(CONTROLLER_IMG) | grep -q "@" && echo $(CONTROLLER_IMG) | cut -d "@" -f 2 || echo "")) apply-with-kind: verify build-with-kind ## Deploy the kwok controller from the current state of your git repository into your ~/.kube/config cluster kubectl apply -f kwok/charts/crds diff --git a/README.md b/README.md index 5954e4b593..5fd8439b3e 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Karpenter is a multi-cloud project with implementations by the following cloud p - [AWS](https://github.com/aws/karpenter-provider-aws) - [Azure](https://github.com/Azure/karpenter-provider-azure) - [AlibabaCloud](https://github.com/cloudpilot-ai/karpenter-provider-alibabacloud) +- [Bizfly Cloud](https://github.com/bizflycloud/karpenter-provider-bizflycloud) - [Cluster API](https://github.com/kubernetes-sigs/karpenter-provider-cluster-api) - [GCP](https://github.com/cloudpilot-ai/karpenter-provider-gcp) - [Proxmox](https://github.com/sergelogvinov/karpenter-provider-proxmox) diff --git a/go.mod b/go.mod index 535624a2bc..a0986f4ea3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module sigs.k8s.io/karpenter -go 1.24.2 +go 1.24.4 require ( github.com/Pallinder/go-randomdata v1.2.0 @@ -9,6 +9,7 @@ require ( github.com/docker/docker v28.2.2+incompatible github.com/go-logr/logr v1.4.3 github.com/imdario/mergo v0.3.16 + github.com/inftyai/llmaz v0.1.3 
github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.37.0 @@ -18,8 +19,8 @@ require ( github.com/samber/lo v1.50.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/text v0.25.0 - golang.org/x/time v0.11.0 + golang.org/x/text v0.26.0 + golang.org/x/time v0.12.0 k8s.io/api v0.32.3 k8s.io/apiextensions-apiserver v0.32.3 k8s.io/apimachinery v0.32.3 @@ -36,12 +37,12 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/emicklei/go-restful/v3 v3.12.0 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/zapr v1.3.0 github.com/go-openapi/jsonpointer v0.21.0 // indirect - github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -65,11 +66,11 @@ require ( github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/net v0.38.0 // indirect + golang.org/x/net v0.40.0 // indirect golang.org/x/oauth2 v0.24.0 // indirect - golang.org/x/sys v0.32.0 // indirect - golang.org/x/term v0.30.0 // indirect - golang.org/x/tools v0.31.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/term v0.32.0 // indirect + golang.org/x/tools v0.33.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect @@ -77,7 +78,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect sigs.k8s.io/yaml v1.4.0 ) @@ -86,7 +87,8 @@ require ( github.com/google/btree v1.1.3 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect go.uber.org/automaxprocs v1.6.0 // indirect - golang.org/x/sync v0.14.0 // indirect + golang.org/x/sync v0.15.0 // indirect + sigs.k8s.io/lws v0.5.1 // indirect ) retract ( diff --git a/go.sum b/go.sum index a26216411c..2da602c424 100644 --- a/go.sum +++ b/go.sum @@ -9,15 +9,14 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docker/docker v28.2.2+incompatible h1:CjwRSksz8Yo4+RmQ339Dp/D2tGO5JxwYeqtMOEe0LDw= github.com/docker/docker 
v28.2.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= -github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk= +github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= @@ -30,12 +29,10 @@ github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= -github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= -github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= -github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= -github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= @@ -62,6 +59,8 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/inftyai/llmaz v0.1.3 h1:TZFsSkwxiZ34EfP1j8/DCGbU12U82FejOZK4WhCyOVI= +github.com/inftyai/llmaz v0.1.3/go.mod h1:2nVFb5ptJfe6Qhha5qOcMQBKK5hLgccKBZcsl1FURME= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -70,11 +69,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty 
v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= @@ -124,12 +120,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -153,34 +144,34 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/term 
v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= -golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= +golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= -golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= -golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU= -golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ= +golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc= +golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -196,7 +187,6 @@ gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSP gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/api v0.32.3 h1:Hw7KqxRusq+6QSplE3NYG4MBxZw1BZnq4aP4cJVINls= @@ -223,7 +213,11 @@ sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+ sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= -sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod 
h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/lws v0.5.1 h1:eaeMNkP0manRluQZLN32atoULaGrzP611gSLdFaHZs4= +sigs.k8s.io/lws v0.5.1/go.mod h1:qprXSTTFnfmPZY3V3sUfk6ZPmAodsdoKS8XVElJ9kN0= +sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016 h1:kXv6kKdoEtedwuqMmkqhbkgvYKeycVbC8+iPCP9j5kQ= +sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/structured-merge-diff/v4 v4.7.0 h1:qPeWmscJcXP0snki5IYF79Z8xrl8ETFxgMd7wez1XkI= +sigs.k8s.io/structured-merge-diff/v4 v4.7.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/hack/install-kwok.sh b/hack/install-kwok.sh index f9a4a6664f..bfd37c0134 100755 --- a/hack/install-kwok.sh +++ b/hack/install-kwok.sh @@ -24,15 +24,18 @@ mkdir ${BASE} # make the alphabet, so that we can set be flexible to the number of allowed partitions (inferred max at 26) alphabet=( {a..z} ) # set the number of partitions to 1. Currently only one partition is supported. -num_partitions=1 +: "${PARTITION_COUNT:=1}" +: "${UNINSTALL:=false}" +: "${CPU_RESOURCES:=2}" +: "${MEMORY_RESOURCES:=4Gi}" # allow it to schedule to critical addons, but not schedule onto kwok nodes. -cat < "${BASE}/tolerate-all.yaml" +mkdir "${BASE}/deployment" +cat < "${BASE}/deployment/tolerate-all.yaml" apiVersion: apps/v1 kind: Deployment metadata: name: kwok-controller - namespace: kube-system spec: template: spec: @@ -49,22 +52,65 @@ spec: operator: DoesNotExist EOF +cat < "${BASE}/deployment/resources.yaml" +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kwok-controller +spec: + template: + spec: + containers: + - name: kwok-controller + resources: + limits: + cpu: $CPU_RESOURCES + memory: $MEMORY_RESOURCES +EOF + +curl -s -o "${BASE}/deployment/deployment.yaml" "https://raw.githubusercontent.com/kubernetes-sigs/kwok/refs/tags/${KWOK_RELEASE}/kustomize/kwok/deployment.yaml" # TODO: Simplify the kustomize to only use one copy of the RBAC that all # controllers can use. 
-cat < "${BASE}/kustomization.yaml" +cat < "${BASE}/deployment/kustomization.yaml" apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization + namespace: kube-system images: - name: registry.k8s.io/kwok/kwok newTag: "${KWOK_RELEASE}" resources: - - "https://github.com/${KWOK_REPO}/kustomize/kwok?ref=${KWOK_RELEASE}" + - deployment.yaml patches: - path: tolerate-all.yaml + - path: resources.yaml + labels: + - includeSelectors: true + pairs: + app: kwok-controller +EOF + + +curl -s -o "${BASE}/service.yaml" "https://raw.githubusercontent.com/kubernetes-sigs/kwok/refs/tags/${KWOK_RELEASE}/kustomize/kwok/service.yaml" +curl -s -o "${BASE}/kwok.yaml" "https://raw.githubusercontent.com/kubernetes-sigs/kwok/refs/tags/${KWOK_RELEASE}/kustomize/kwok/kwok.yaml" +cat < "${BASE}/kustomization.yaml" + apiVersion: kustomize.config.k8s.io/v1beta1 + kind: Kustomization + namespace: kube-system + resources: + - service.yaml + - "https://github.com/${KWOK_REPO}/kustomize/crd?ref=${KWOK_RELEASE}" + - "https://github.com/${KWOK_REPO}/kustomize/rbac?ref=${KWOK_RELEASE}" + configMapGenerator: + - name: kwok + namespace: kube-system + options: + disableNameSuffixHash: true + files: + - kwok.yaml EOF # Create num_partitions -for ((i=0; i "${SUB_LET_DIR}/kustomization.yaml" - name: registry.k8s.io/kwok/kwok newTag: "${KWOK_RELEASE}" resources: - - ./../base + - ./../base/deployment nameSuffix: -${alphabet[i]} patches: - path: ${SUB_LET_DIR}/patch.yaml @@ -97,10 +143,11 @@ done cat < "${HOME_DIR}/kustomization.yaml" resources: + - ./base EOF # Create num_partitions -for ((i=0; i> "${HOME_DIR}/kustomization.yaml" done @@ -110,8 +157,8 @@ kubectl kustomize "${HOME_DIR}" > "${HOME_DIR}/kwok.yaml" # v0.4.0 added in stage CRDs which are necessary for pod/node initialization crdURL="https://github.com/${KWOK_REPO}/releases/download/${KWOK_RELEASE}/stage-fast.yaml" -# Set UNINSTALL_KWOK=true if you want to uninstall. -if [[ ${UNINSTALL_KWOK} = "true" ]] +# Set UNINSTALL=true if you want to uninstall. 
+if [[ ${UNINSTALL} = "true" ]] then kubectl delete -f ${crdURL} kubectl delete -f ${HOME_DIR}/kwok.yaml diff --git a/kwok/charts/templates/clusterrole.yaml b/kwok/charts/templates/clusterrole.yaml index 82a402e0f4..0b9b9a5d26 100644 --- a/kwok/charts/templates/clusterrole.yaml +++ b/kwok/charts/templates/clusterrole.yaml @@ -47,6 +47,9 @@ rules: - apiGroups: ["policy"] resources: ["poddisruptionbudgets"] verbs: ["get", "list", "watch"] + - apiGroups: ["llmaz.io"] + resources: ["openmodels"] + verbs: ["get", "list", "watch"] # Write - apiGroups: ["karpenter.sh"] resources: ["nodeclaims", "nodeclaims/status"] diff --git a/kwok/cloudprovider/cloudprovider.go b/kwok/cloudprovider/cloudprovider.go index 89567a7a2b..9765a216b3 100644 --- a/kwok/cloudprovider/cloudprovider.go +++ b/kwok/cloudprovider/cloudprovider.go @@ -68,6 +68,9 @@ func (c CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1 } return nil, fmt.Errorf("resolving node class from nodeclaim, %w", err) } + if status := nodeClass.StatusConditions().Get(status.ConditionReady); status.IsFalse() { + return nil, cloudprovider.NewNodeClassNotReadyError(stderrors.New(status.Message)) + } // Kick-off a goroutine to allow us to asynchronously register nodes // We're fine to leak this because failed registration can also happen in real providers go func() { @@ -78,10 +81,6 @@ func (c CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1 log.FromContext(ctx).Error(err, "failed creating node from nodeclaim") } }() - nodeClassReady := nodeClass.StatusConditions().Get(status.ConditionReady) - if nodeClassReady.IsFalse() { - return nil, cloudprovider.NewNodeClassNotReadyError(stderrors.New(nodeClassReady.Message)) - } // convert the node back into a node claim to get the chosen resolved requirement values. return c.toNodeClaim(node) } diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index 01617e878b..c413e28cd7 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -207,8 +207,6 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, commandID := uuid.NewUUID() log.FromContext(ctx).WithValues(append([]any{"command-id", string(commandID), "reason", strings.ToLower(string(m.Reason()))}, cmd.LogValues()...)...).Info("disrupting node(s)") - c.cluster.MarkForDeletion(lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })...) - // Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes markedCandidates, markDisruptedErr := c.MarkDisrupted(ctx, m, cmd.candidates...) // If we get a failure marking some nodes as disrupted, if we are launching replacements, we shouldn't continue @@ -224,6 +222,14 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, return serrors.Wrap(fmt.Errorf("launching replacement nodeclaim, %w", err), "command-id", commandID) } + // IMPORTANT + // We must MarkForDeletion AFTER we launch the replacements and not before + // The reason for this is to avoid producing double-launches + // If we MarkForDeletion before we create replacements, it's possible for the provisioner + // to recognize that it needs to launch capacity for terminating pods, causing us to launch + // capacity for these pods twice instead of just once + c.cluster.MarkForDeletion(lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })...) 
+ // Nominate each node for scheduling and emit pod nomination events // We emit all nominations before we exit the disruption loop as // we want to ensure that nodes that are nominated are respected in the subsequent @@ -261,6 +267,7 @@ func (c *Controller) createReplacementNodeClaims(ctx context.Context, m Method, return nodeClaimNames, nil } +// MarkDisrupted taints the node and adds the Disrupted condition to the NodeClaim for a candidate that is about to be disrupted func (c *Controller) MarkDisrupted(ctx context.Context, m Method, candidates ...*Candidate) ([]*Candidate, error) { errs := make([]error, len(candidates)) workqueue.ParallelizeUntil(ctx, len(candidates), len(candidates), func(i int) { diff --git a/pkg/controllers/disruption/orchestration/queue.go b/pkg/controllers/disruption/orchestration/queue.go index f40ac36678..7634e5b4f4 100644 --- a/pkg/controllers/disruption/orchestration/queue.go +++ b/pkg/controllers/disruption/orchestration/queue.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "strings" "sync" "time" @@ -228,13 +227,12 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) { multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...)) multiErr = multierr.Combine(multiErr, state.ClearNodeClaimsCondition(ctx, q.kubeClient, v1.ConditionTypeDisruptionReason, cmd.candidates...)) // Log the error - log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string { - return s.Name() - }), ",")).Error(multiErr, "failed terminating nodes while executing a disruption command") + log.FromContext(ctx).Error(multiErr, "failed terminating nodes while executing a disruption command") + } else { + log.FromContext(ctx).V(1).Info("command succeeded") } // If command is complete, remove command from queue. q.Remove(cmd) - log.FromContext(ctx).V(1).Info("command succeeded") return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil } diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 33ff2867de..28e1ca7be8 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -21,6 +21,7 @@ import ( "fmt" "sort" "sync" + "sync/atomic" "testing" "time" @@ -314,9 +315,7 @@ var _ = Describe("Simulate Scheduling", func() { } // Get a set of the node claim names so that it's easy to check if a new one is made - nodeClaimNames := lo.SliceToMap(nodeClaims, func(nc *v1.NodeClaim) (string, struct{}) { - return nc.Name, struct{}{} - }) + nodeClaimNames := sets.New(lo.Map(nodeClaims, func(nc *v1.NodeClaim, _ int) string { return nc.Name })...) 
ExpectSingletonReconciled(ctx, disruptionController) // Expect a replace action @@ -324,11 +323,10 @@ var _ = Describe("Simulate Scheduling", func() { ncs := ExpectNodeClaims(ctx, env.Client) // which would create one more node claim Expect(len(ncs)).To(Equal(11)) - nc, new := lo.Find(ncs, func(nc *v1.NodeClaim) bool { - _, ok := nodeClaimNames[nc.Name] - return !ok + nc, ok := lo.Find(ncs, func(nc *v1.NodeClaim) bool { + return !nodeClaimNames.Has(nc.Name) }) - Expect(new).To(BeTrue()) + Expect(ok).To(BeTrue()) // which needs to be deployed ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc) nodeClaimNames[nc.Name] = struct{}{} @@ -337,11 +335,10 @@ var _ = Describe("Simulate Scheduling", func() { // Another replacement disruption action ncs = ExpectNodeClaims(ctx, env.Client) Expect(len(ncs)).To(Equal(12)) - nc, new = lo.Find(ncs, func(nc *v1.NodeClaim) bool { - _, ok := nodeClaimNames[nc.Name] - return !ok + nc, ok = lo.Find(ncs, func(nc *v1.NodeClaim) bool { + return !nodeClaimNames.Has(nc.Name) }) - Expect(new).To(BeTrue()) + Expect(ok).To(BeTrue()) ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc) nodeClaimNames[nc.Name] = struct{}{} @@ -350,11 +347,10 @@ var _ = Describe("Simulate Scheduling", func() { // One more replacement disruption action ncs = ExpectNodeClaims(ctx, env.Client) Expect(len(ncs)).To(Equal(13)) - nc, new = lo.Find(ncs, func(nc *v1.NodeClaim) bool { - _, ok := nodeClaimNames[nc.Name] - return !ok + nc, ok = lo.Find(ncs, func(nc *v1.NodeClaim) bool { + return !nodeClaimNames.Has(nc.Name) }) - Expect(new).To(BeTrue()) + Expect(ok).To(BeTrue()) ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc) nodeClaimNames[nc.Name] = struct{}{} @@ -460,6 +456,74 @@ var _ = Describe("Simulate Scheduling", func() { Expect(nodeclaims[0].Name).ToNot(Equal(nodeClaim.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) + It("should ensure that we do not duplicate capacity for disrupted nodes with provisioning", func() { + // We create a client that hangs Create() so that when we try to create replacements + // we give ourselves time to check that we wouldn't provision additional capacity before the replacements are made + hangCreateClient := newHangCreateClient(env.Client) + defer hangCreateClient.Stop() + + p := provisioning.NewProvisioner(hangCreateClient, recorder, cloudProvider, cluster, fakeClock) + dc := disruption.NewController(fakeClock, env.Client, p, cloudProvider, recorder, cluster, queue) + + nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted) + labels := map[string]string{ + "app": "test", + } + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pod := 
test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }}}) + + ExpectApplied(ctx, env.Client, rs, pod, nodeClaim, node, nodePool) + + // bind the pods to the node + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + // Expect the disruption controller to attempt to create a replacement and hang creation when we try to create the replacement + go ExpectSingletonReconciled(ctx, dc) + Eventually(func(g Gomega) { + g.Expect(hangCreateClient.HasWaiter()).To(BeTrue()) + }).Should(Succeed()) + + // If our code works correctly, the provisioner should not try to create a new NodeClaim since we shouldn't have marked + // our nodes for disruption until the new NodeClaims have been successfully launched + results, err := prov.Schedule(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(results.NewNodeClaims).To(BeEmpty()) + }) }) var _ = Describe("Disruption Taints", func() { @@ -1745,7 +1809,7 @@ var _ = Describe("Candidate Filtering", func() { Expect(cluster.Nodes()).To(HaveLen(1)) _, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass) - Expect(err).ToNot((HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) }) It("should consider candidates that have an instance type that cannot be resolved", func() { nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{ @@ -2102,8 +2166,8 @@ func mostExpensiveInstanceWithZone(zone string) *cloudprovider.InstanceType { } //nolint:unparam -func fromInt(i int) *intstr.IntOrString { - v := intstr.FromInt(i) +func fromInt(i int32) *intstr.IntOrString { + v := intstr.FromInt32(i) return &v } @@ -2212,3 +2276,28 @@ func NewTestingQueue(kubeClient client.Client, recorder events.Recorder, cluster q.TypedRateLimitingInterface = test.NewTypedRateLimitingInterface[*orchestration.Command](workqueue.TypedQueueConfig[*orchestration.Command]{Name: "disruption.workqueue"}) return q } + +type hangCreateClient struct { + client.Client + hasWaiter atomic.Bool + stop chan struct{} +} + +func newHangCreateClient(c client.Client) *hangCreateClient { + return &hangCreateClient{Client: c, stop: make(chan struct{})} +} + +func (h *hangCreateClient) HasWaiter() bool { + return h.hasWaiter.Load() +} + +func (h *hangCreateClient) Stop() { + close(h.stop) +} + +func (h *hangCreateClient) Create(_ context.Context, _ client.Object, _ ...client.CreateOption) error { + h.hasWaiter.Store(true) + <-h.stop + h.hasWaiter.Store(false) + return nil +} diff --git a/pkg/controllers/node/health/suite_test.go b/pkg/controllers/node/health/suite_test.go index 7418ab723b..6a50c33f65 100644 --- a/pkg/controllers/node/health/suite_test.go +++ b/pkg/controllers/node/health/suite_test.go @@ -67,7 +67,7 @@ var _ = BeforeSuite(func() { cloudProvider = fake.NewCloudProvider() cloudProvider = fake.NewCloudProvider() recorder = test.NewEventRecorder() - queue = terminator.NewTestingQueue(env.Client, recorder) + queue = terminator.NewQueue(env.Client, recorder) healthController = health.NewController(env.Client, cloudProvider, fakeClock, recorder) }) 
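For readers skimming the patch, the new disruption test above hinges on a small client wrapper, `hangCreateClient`, whose only job is to park `Create` calls until the test releases them. The sketch below restates that helper on its own so the pattern is visible outside the diff context; the package name and import set are illustrative, while the helper itself is taken from the test suite changes above.

```go
// Condensed sketch of the hangCreateClient helper added to the disruption
// test suite: Create() blocks until Stop() is called, giving the test a
// deterministic window in which replacement NodeClaims are still "in flight"
// so it can assert the provisioner does not launch duplicate capacity.
package disruption_test

import (
	"context"
	"sync/atomic"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

type hangCreateClient struct {
	client.Client
	hasWaiter atomic.Bool
	stop      chan struct{}
}

func newHangCreateClient(c client.Client) *hangCreateClient {
	return &hangCreateClient{Client: c, stop: make(chan struct{})}
}

// HasWaiter reports whether a Create call is currently parked.
func (h *hangCreateClient) HasWaiter() bool { return h.hasWaiter.Load() }

// Stop releases every parked Create call.
func (h *hangCreateClient) Stop() { close(h.stop) }

// Create records that a caller is waiting, blocks until Stop is called,
// and returns without persisting anything to the API server.
func (h *hangCreateClient) Create(_ context.Context, _ client.Object, _ ...client.CreateOption) error {
	h.hasWaiter.Store(true)
	<-h.stop
	h.hasWaiter.Store(false)
	return nil
}
```

The test drives the disruption controller in a goroutine, waits for `HasWaiter()` to report a blocked replacement launch, then calls `prov.Schedule` and expects `results.NewNodeClaims` to be empty, which is exactly the behavior the reordered `MarkForDeletion` in `executeCommand` is meant to guarantee.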
diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go index b0d93e0b43..4ef54c6c83 100644 --- a/pkg/controllers/node/termination/suite_test.go +++ b/pkg/controllers/node/termination/suite_test.go @@ -70,7 +70,7 @@ var _ = BeforeSuite(func() { cloudProvider = fake.NewCloudProvider() recorder = test.NewEventRecorder() - queue = terminator.NewTestingQueue(env.Client, recorder) + queue = terminator.NewQueue(env.Client, recorder) terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder) }) @@ -86,7 +86,7 @@ var _ = Describe("Termination", func() { BeforeEach(func() { fakeClock.SetTime(time.Now()) cloudProvider.Reset() - *queue = lo.FromPtr(terminator.NewTestingQueue(env.Client, recorder)) + *queue = lo.FromPtr(terminator.NewQueue(env.Client, recorder)) nodePool = test.NodePool() nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}}) @@ -209,7 +209,8 @@ var _ = Describe("Termination", func() { ExpectManualBinding(ctx, env.Client, pod, node) Expect(env.Client.Delete(ctx, node)).To(Succeed()) // We only reconcile once since this label should be applied before draining the node - ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) + ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation + node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Labels[corev1.LabelNodeExcludeBalancers]).Should(Equal("karpenter")) }) @@ -225,9 +226,10 @@ var _ = Describe("Termination", func() { // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - Expect(queue.Has(node, podSkip)).To(BeFalse()) - ExpectSingletonReconciled(ctx, queue) + Expect(queue.Has(podSkip)).To(BeFalse()) + ExpectObjectReconciled(ctx, env.Client, queue, podEvict) // Expect node to exist and be draining ExpectNodeWithNodeClaimDraining(env.Client, node.Name) @@ -256,9 +258,10 @@ var _ = Describe("Termination", func() { // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - Expect(queue.Has(node, podSkip)).To(BeFalse()) - ExpectSingletonReconciled(ctx, queue) + Expect(queue.Has(podSkip)).To(BeFalse()) + ExpectObjectReconciled(ctx, env.Client, queue, podEvict) // Expect node to exist and be draining ExpectNodeWithNodeClaimDraining(env.Client, node.Name) @@ -267,7 +270,7 @@ var _ = Describe("Termination", func() { EventuallyExpectTerminating(ctx, env.Client, podEvict) ExpectDeleted(ctx, env.Client, podEvict) - Expect(queue.Has(node, podSkip)).To(BeFalse()) + Expect(queue.Has(podSkip)).To(BeFalse()) // Reconcile to delete node node = ExpectNodeExists(ctx, env.Client, node.Name) @@ -289,7 +292,7 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, podEvict) // Expect node to 
exist and be draining ExpectNodeWithNodeClaimDraining(env.Client, node.Name) @@ -317,8 +320,9 @@ var _ = Describe("Termination", func() { ExpectApplied(ctx, env.Client, node, nodeClaim, pod) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, pod) // Expect pod with no owner ref to be enqueued for eviction EventuallyExpectTerminating(ctx, env.Client, pod) @@ -385,13 +389,13 @@ var _ = Describe("Termination", func() { ExpectNodeWithNodeClaimDraining(env.Client, node.Name) // Expect podNoEvict to be added to the queue - Expect(queue.Has(node, podNoEvict)).To(BeTrue()) + Expect(queue.Has(podNoEvict)).To(BeTrue()) - // Attempt to evict the pod, but fail to do so - ExpectSingletonReconciled(ctx, queue) + // Attempt to evict the pod + ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, queue, podNoEvict)) // Expect podNoEvict to fail eviction due to PDB, and be retried - Expect(queue.Has(node, podNoEvict)).To(BeTrue()) + Expect(queue.Has(podNoEvict)).To(BeTrue()) // Delete pod to simulate successful eviction ExpectDeleted(ctx, env.Client, podNoEvict) @@ -452,18 +456,17 @@ var _ = Describe("Termination", func() { } ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) ExpectNodeWithNodeClaimDraining(env.Client, node.Name) - for range podGroup { - ExpectSingletonReconciled(ctx, queue) + for _, pod := range podGroup { + ExpectObjectReconciled(ctx, env.Client, queue, pod) } // Start draining the pod group, but don't complete it yet EventuallyExpectTerminating(ctx, env.Client, lo.Map(podGroup, func(p *corev1.Pod, _ int) client.Object { return p })...) // Look at the next pod group and ensure that none of the pods have started terminating on it if i != len(podGroups)-1 { - for range podGroups[i+1] { - ExpectSingletonReconciled(ctx, queue) + for _, pod := range podGroups[i+1] { + Expect(queue.Has(pod)).To(BeFalse()) } - ConsistentlyExpectNotTerminating(ctx, env.Client, lo.Map(podGroups[i+1], func(p *corev1.Pod, _ int) client.Object { return p })...) } // Expect that the pods are deleted -- which should unblock the next pod group ExpectDeleted(ctx, env.Client, lo.Map(podGroup, func(p *corev1.Pod, _ int) client.Object { return p })...) 
@@ -487,8 +490,9 @@ var _ = Describe("Termination", func() { // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) + ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation (non-critical) - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, podEvict) // Expect node to exist and be draining ExpectNodeWithNodeClaimDraining(env.Client, node.Name) @@ -500,8 +504,8 @@ var _ = Describe("Termination", func() { // Expect the critical pods to be evicted and deleted node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation (critical) - ExpectSingletonReconciled(ctx, queue) - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, podNodeCritical) + ExpectObjectReconciled(ctx, env.Client, queue, podClusterCritical) EventuallyExpectTerminating(ctx, env.Client, podNodeCritical, podClusterCritical) ExpectDeleted(ctx, env.Client, podNodeCritical) @@ -535,10 +539,10 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, podEvict) // Expect mirror pod to not be queued for eviction - Expect(queue.Has(node, podNoEvict)).To(BeFalse()) + Expect(queue.Has(podNoEvict)).To(BeFalse()) // Expect podEvict to be enqueued for eviction then be successful EventuallyExpectTerminating(ctx, env.Client, podEvict) @@ -566,8 +570,8 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - ExpectSingletonReconciled(ctx, queue) - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, pods[0]) + ExpectObjectReconciled(ctx, env.Client, queue, pods[1]) nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsUnknown()).To(BeTrue()) @@ -614,8 +618,8 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - ExpectSingletonReconciled(ctx, queue) - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, pods[0]) + ExpectObjectReconciled(ctx, env.Client, queue, pods[1]) // Expect the pods to be evicted EventuallyExpectTerminating(ctx, env.Client, pods[0], pods[1]) @@ -644,8 +648,8 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - ExpectSingletonReconciled(ctx, queue) - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, pods[0]) + ExpectObjectReconciled(ctx, env.Client, queue, pods[1]) // Expect the pods to be evicted EventuallyExpectTerminating(ctx, env.Client, pods[0], pods[1]) @@ -675,7 +679,8 @@ var _ = 
Describe("Termination", func() { // Before grace period, node should not delete Expect(env.Client.Delete(ctx, node)).To(Succeed()) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation - ExpectSingletonReconciled(ctx, queue) + ExpectObjectReconciled(ctx, env.Client, queue, pod) + ExpectNodeExists(ctx, env.Client, node.Name) EventuallyExpectTerminating(ctx, env.Client, pod) @@ -720,7 +725,7 @@ var _ = Describe("Termination", func() { ExpectNotFound(ctx, env.Client, node) // Expect that the old pod's key still exists in the queue - Expect(queue.Has(node, pod)).To(BeTrue()) + Expect(queue.Has(pod)).To(BeTrue()) // Re-create the pod and node, it should now have the same name, but a different UUID node = test.Node(test.NodeOptions{ @@ -737,8 +742,8 @@ var _ = Describe("Termination", func() { }) ExpectApplied(ctx, env.Client, node, pod) - // Trigger eviction queue with the pod key still in it - ExpectSingletonReconciled(ctx, queue) + // Check if the queue has seen the pod (it shouldn't because its got a different UUID) + Expect(queue.Has(pod)).To(BeFalse()) Consistently(func(g Gomega) { g.Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pod), pod)).To(Succeed()) @@ -764,6 +769,8 @@ var _ = Describe("Termination", func() { Expect(env.Client.Delete(ctx, node)).To(Succeed()) ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // DrainInitiation + ExpectObjectReconciled(ctx, env.Client, queue, pod) + ExpectNodeExists(ctx, env.Client, node.Name) pod = ExpectExists(ctx, env.Client, pod) Expect(pod.DeletionTimestamp.IsZero()).To(BeFalse()) @@ -800,6 +807,7 @@ var _ = Describe("Termination", func() { ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) pod = ExpectExists(ctx, env.Client, pod) Expect(pod.DeletionTimestamp.IsZero()).To(BeFalse()) + }) Context("VolumeAttachments", func() { It("should wait for volume attachments", func() { @@ -944,29 +952,6 @@ var _ = Describe("Termination", func() { Expect(ok).To(BeTrue()) Expect(lo.FromPtr(m.GetHistogram().SampleCount)).To(BeNumerically("==", 1)) }) - It("should update the eviction queueDepth metric when reconciling pods", func() { - minAvailable := intstr.FromInt32(0) - labelSelector := map[string]string{test.RandomName(): test.RandomName()} - pdb := test.PodDisruptionBudget(test.PDBOptions{ - Labels: labelSelector, - // Don't let any pod evict - MinAvailable: &minAvailable, - }) - ExpectApplied(ctx, env.Client, pdb, node, nodeClaim) - pods := test.Pods(5, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: defaultOwnerRefs, - Labels: labelSelector, - }}) - ExpectApplied(ctx, env.Client, lo.Map(pods, func(p *corev1.Pod, _ int) client.Object { return p })...) 
- - wqDepthBefore, _ := FindMetricWithLabelValues("workqueue_adds_total", map[string]string{"name": "eviction.workqueue"}) - Expect(env.Client.Delete(ctx, node)).To(Succeed()) - node = ExpectNodeExists(ctx, env.Client, node.Name) - ExpectRequeued(ExpectObjectReconciled(ctx, env.Client, terminationController, node)) // Drain - wqDepthAfter, ok := FindMetricWithLabelValues("workqueue_adds_total", map[string]string{"name": "eviction.workqueue"}) - Expect(ok).To(BeTrue()) - Expect(lo.FromPtr(wqDepthAfter.GetCounter().Value) - lo.FromPtr(wqDepthBefore.GetCounter().Value)).To(BeNumerically("==", 5)) - }) }) }) diff --git a/pkg/controllers/node/termination/terminator/eviction.go b/pkg/controllers/node/termination/terminator/eviction.go index 3cbcbe2644..bb1c51fb16 100644 --- a/pkg/controllers/node/termination/terminator/eviction.go +++ b/pkg/controllers/node/termination/terminator/eviction.go @@ -24,8 +24,8 @@ import ( "time" "github.com/awslabs/operatorpkg/serrors" - "github.com/awslabs/operatorpkg/singleton" "github.com/samber/lo" + "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -36,16 +36,20 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/operator/injection" - "sigs.k8s.io/karpenter/pkg/utils/node" + nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" + podutils "sigs.k8s.io/karpenter/pkg/utils/pod" ) const ( @@ -71,23 +75,21 @@ func IsNodeDrainError(err error) bool { type QueueKey struct { types.NamespacedName - UID types.UID - providerID string + UID types.UID } -func NewQueueKey(pod *corev1.Pod, providerID string) QueueKey { +func NewQueueKey(pod *corev1.Pod) QueueKey { return QueueKey{ NamespacedName: client.ObjectKeyFromObject(pod), UID: pod.UID, - providerID: providerID, } } type Queue struct { - workqueue.TypedRateLimitingInterface[QueueKey] + sync.Mutex - mu sync.Mutex - set sets.Set[QueueKey] + source chan event.TypedGenericEvent[*corev1.Pod] + set sets.Set[QueueKey] kubeClient client.Client recorder events.Recorder @@ -95,93 +97,75 @@ type Queue struct { func NewQueue(kubeClient client.Client, recorder events.Recorder) *Queue { return &Queue{ - TypedRateLimitingInterface: workqueue.NewTypedRateLimitingQueueWithConfig[QueueKey]( - workqueue.NewTypedItemExponentialFailureRateLimiter[QueueKey](evictionQueueBaseDelay, evictionQueueMaxDelay), - workqueue.TypedRateLimitingQueueConfig[QueueKey]{ - Name: "eviction.workqueue", - }), + source: make(chan event.TypedGenericEvent[*corev1.Pod], 10000), set: sets.New[QueueKey](), kubeClient: kubeClient, recorder: recorder, } } -func NewTestingQueue(kubeClient client.Client, recorder events.Recorder) *Queue { - return &Queue{ - TypedRateLimitingInterface: &controllertest.TypedQueue[QueueKey]{TypedInterface: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[QueueKey]{Name: "eviction.workqueue"})}, - set: 
sets.New[QueueKey](), - kubeClient: kubeClient, - recorder: recorder, - } +func (q *Queue) Name() string { + return "eviction-queue" } func (q *Queue) Register(_ context.Context, m manager.Manager) error { return controllerruntime.NewControllerManagedBy(m). - Named("eviction-queue"). - WatchesRawSource(singleton.Source()). - Complete(singleton.AsReconciler(q)) + Named(q.Name()). + WatchesRawSource(source.Channel(q.source, handler.TypedFuncs[*corev1.Pod, reconcile.Request]{ + GenericFunc: func(_ context.Context, e event.TypedGenericEvent[*corev1.Pod], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { + queue.Add(reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(e.Object), + }) + }, + })). + WithOptions(controller.Options{ + RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request]( + workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](evictionQueueBaseDelay, evictionQueueMaxDelay), + &workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(100), 1000)}, + ), + MaxConcurrentReconciles: 100, + }). + Complete(reconcile.AsReconciler(m.GetClient(), q)) } // Add adds pods to the Queue -func (q *Queue) Add(node *corev1.Node, pods ...*corev1.Pod) { - q.mu.Lock() - defer q.mu.Unlock() +func (q *Queue) Add(pods ...*corev1.Pod) { + q.Lock() + defer q.Unlock() for _, pod := range pods { - qk := NewQueueKey(pod, node.Spec.ProviderID) + qk := NewQueueKey(pod) if !q.set.Has(qk) { q.set.Insert(qk) - q.TypedRateLimitingInterface.Add(qk) + q.source <- event.TypedGenericEvent[*corev1.Pod]{Object: pod} } } } -func (q *Queue) Has(node *corev1.Node, pod *corev1.Pod) bool { - q.mu.Lock() - defer q.mu.Unlock() +func (q *Queue) Has(pod *corev1.Pod) bool { + q.Lock() + defer q.Unlock() - return q.set.Has(NewQueueKey(pod, node.Spec.ProviderID)) + return q.set.Has(NewQueueKey(pod)) } -func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) { - ctx = injection.WithControllerName(ctx, "eviction-queue") - // Check if the queue is empty. client-go recommends not using this function to gate the subsequent - // get call, but since we're popping items off the queue synchronously, there should be no synchonization - // issues. - if q.TypedRateLimitingInterface.Len() == 0 { - return reconcile.Result{RequeueAfter: 1 * time.Second}, nil - } - // Get pod from queue. This waits until queue is non-empty. 
- item, shutdown := q.TypedRateLimitingInterface.Get() - if shutdown { - return reconcile.Result{}, fmt.Errorf("EvictionQueue is broken and has shutdown") - } - - defer q.TypedRateLimitingInterface.Done(item) +func (q *Queue) Reconcile(ctx context.Context, pod *corev1.Pod) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, q.Name()) - // Evict the pod - if q.Evict(ctx, item) { - q.TypedRateLimitingInterface.Forget(item) - q.mu.Lock() - q.set.Delete(item) - q.mu.Unlock() - return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil + if !q.Has(pod) { + //This is a different pod than the one the queue, we should exit without evicting + //This race happens when a pod is replaced with one that has the same namespace and name + //but a different UID after the original pod is added to the queue but before the + //controller can reconcile on it + return reconcile.Result{}, nil } - - // Requeue pod if eviction failed - q.TypedRateLimitingInterface.AddRateLimited(item) - return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil -} - -// Evict returns true if successful eviction call, and false if there was an eviction-related error -func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { - ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Pod", klog.KRef(key.Namespace, key.Name))) + // Evict the pod if err := q.kubeClient.SubResource("eviction").Create(ctx, - &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}}, + pod, &policyv1.Eviction{ DeleteOptions: &metav1.DeleteOptions{ Preconditions: &metav1.Preconditions{ - UID: lo.ToPtr(key.UID), + UID: lo.ToPtr(pod.UID), }, }, }); err != nil { @@ -197,27 +181,38 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { // https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L160 // 409 - The pod exists, but it is not the same pod that we initiated the eviction on // https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L318 - return true + return reconcile.Result{}, nil } + // The pod exists and is the same pod, we need to continue if apierrors.IsTooManyRequests(err) { // 429 - PDB violation - q.recorder.Publish(terminatorevents.NodeFailedToDrain(&corev1.Node{ObjectMeta: metav1.ObjectMeta{ - Name: key.Name, - Namespace: key.Namespace, - }}, serrors.Wrap(fmt.Errorf("evicting pod violates a PDB"), "Pod", klog.KRef(key.Namespace, key.Name)))) - return false + node, err2 := podutils.NodeForPod(ctx, q.kubeClient, pod) + if err2 != nil { + return reconcile.Result{}, err2 + } + q.recorder.Publish(terminatorevents.NodeFailedToDrain(node, serrors.Wrap(fmt.Errorf("evicting pod violates a PDB"), "Pod", klog.KRef(pod.Namespace, pod.Name)))) + return reconcile.Result{RequeueAfter: evictionQueueBaseDelay}, nil } - log.FromContext(ctx).Error(err, "failed evicting pod") - return false + // Its not a PDB, we should requeue + return reconcile.Result{}, err } NodesEvictionRequestsTotal.Inc(map[string]string{CodeLabel: "200"}) - reason := evictionReason(ctx, key, q.kubeClient) - q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}, reason)) + reason := evictionReason(ctx, pod, q.kubeClient) + q.recorder.Publish(terminatorevents.EvictPod(pod, reason)) PodsDrainedTotal.Inc(map[string]string{ReasonLabel: reason}) - return true + + q.Lock() + defer 
q.Unlock() + q.set.Delete(NewQueueKey(pod)) + return reconcile.Result{}, nil } -func evictionReason(ctx context.Context, key QueueKey, kubeClient client.Client) string { - nodeClaim, err := node.NodeClaimForNode(ctx, kubeClient, &corev1.Node{Spec: corev1.NodeSpec{ProviderID: key.providerID}}) +func evictionReason(ctx context.Context, pod *corev1.Pod, kubeClient client.Client) string { + node, err := podutils.NodeForPod(ctx, kubeClient, pod) + if err != nil { + log.FromContext(ctx).V(1).Error(err, "pod has no node, failed looking up pod eviction reason") + return "" + } + nodeClaim, err := nodeutils.NodeClaimForNode(ctx, kubeClient, node) if err != nil { log.FromContext(ctx).V(1).Error(err, "node has no nodeclaim, failed looking up pod eviction reason") return "" diff --git a/pkg/controllers/node/termination/terminator/suite_test.go b/pkg/controllers/node/termination/terminator/suite_test.go index 6aa0d81596..233554c19e 100644 --- a/pkg/controllers/node/termination/terminator/suite_test.go +++ b/pkg/controllers/node/termination/terminator/suite_test.go @@ -18,7 +18,6 @@ package terminator_test import ( "context" - "sync" "testing" "time" @@ -32,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" clock "k8s.io/utils/clock/testing" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/apis" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" @@ -70,7 +68,7 @@ var _ = BeforeSuite(func() { ) ctx = options.ToContext(ctx, test.Options()) recorder = test.NewEventRecorder() - queue = terminator.NewTestingQueue(env.Client, recorder) + queue = terminator.NewQueue(env.Client, recorder) terminatorInstance = terminator.NewTerminator(fakeClock, env.Client, queue, recorder) }) @@ -81,7 +79,7 @@ var _ = AfterSuite(func() { var _ = BeforeEach(func() { recorder.Reset() // Reset the events that we captured during the run // Shut down the queue and restart it to ensure no races - *queue = lo.FromPtr(terminator.NewTestingQueue(env.Client, recorder)) + *queue = lo.FromPtr(terminator.NewQueue(env.Client, recorder)) }) var _ = AfterEach(func() { @@ -101,25 +99,27 @@ var _ = Describe("Eviction/Queue", func() { Labels: testLabels, }, }) - node = test.Node(test.NodeOptions{ProviderID: "123456789"}) + node = test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{Name: pod.Spec.NodeName, Namespace: pod.Namespace}, ProviderID: "my-provider"}) + terminator.NodesEvictionRequestsTotal.Reset() terminator.PodsDrainedTotal.Reset() }) Context("Eviction API", func() { It("should succeed with no event when the pod is not found", func() { - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) + ExpectObjectReconciled(ctx, env.Client, queue, pod) Expect(recorder.Events()).To(HaveLen(0)) }) It("should succeed with no event when the pod UID conflicts", func() { - ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue()) - ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "409"}) - Expect(recorder.Events()).To(HaveLen(0)) + ExpectApplied(ctx, env.Client, pod, node) + queue.Add(pod) + pod.UID = uuid.NewUUID() + Expect(queue.Has(pod)).To(BeFalse()) }) It("should succeed with an evicted event when there are no PDBs", func() { - ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) + ExpectApplied(ctx, env.Client, pod, 
node) + queue.Add(pod) + ExpectObjectReconciled(ctx, env.Client, queue, pod) ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "200"}) Expect(recorder.Calls(events.Evicted)).To(Equal(1)) }) @@ -128,13 +128,16 @@ var _ = Describe("Eviction/Queue", func() { Labels: testLabels, MaxUnavailable: &intstr.IntOrString{IntVal: 1}, }) - ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) + ExpectApplied(ctx, env.Client, pod, node) + queue.Add(pod) + ExpectObjectReconciled(ctx, env.Client, queue, pod) Expect(recorder.Calls(events.Evicted)).To(Equal(1)) }) It("should return a NodeDrainError event when a PDB is blocking", func() { - ExpectApplied(ctx, env.Client, pdb, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeFalse()) + ExpectApplied(ctx, env.Client, pdb, pod, node) + ExpectManualBinding(ctx, env.Client, pod, node) + queue.Add(pod) + ExpectObjectReconciled(ctx, env.Client, queue, pod) Expect(recorder.Calls(events.FailedDraining)).To(Equal(1)) }) It("should fail when two PDBs refer to the same pod", func() { @@ -142,39 +145,37 @@ var _ = Describe("Eviction/Queue", func() { Labels: testLabels, MaxUnavailable: &intstr.IntOrString{IntVal: 0}, }) - ExpectApplied(ctx, env.Client, pdb, pdb2, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeFalse()) + ExpectApplied(ctx, env.Client, pdb, pdb2, pod, node) + queue.Add(pod) + _ = ExpectObjectReconcileFailed(ctx, env.Client, queue, pod) ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "500"}) }) It("should ensure that calling Evict() is valid while making Add() calls", func() { - cancelCtx, cancel := context.WithCancel(ctx) - wg := sync.WaitGroup{} - DeferCleanup(func() { - cancel() - wg.Wait() // Ensure that we wait for reconcile loop to finish so that we don't get a RACE - }) - - // Keep calling Reconcile() for the entirety of this test - wg.Add(1) - go func() { - defer wg.Done() - - for { - ExpectSingletonReconciled(ctx, queue) - if cancelCtx.Err() != nil { - return + // Ensure that we add enough pods to the queue while we are pulling items off of the queue (enough to trigger a DATA RACE) + pods := test.Pods(1000) + cancelContext, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + for _, pod := range pods { + go func() { + for { + if cancelContext.Err() != nil { + return + } + queue.Add(pod) } - } - }() + }() + } - // Ensure that we add enough pods to the queue while we are pulling items off of the queue (enough to trigger a DATA RACE) - for i := 0; i < 10000; i++ { - queue.Add(node, test.Pod()) + for _, pod = range pods { + ExpectObjectReconciled(ctx, env.Client, queue, pod) } + }) It("should increment PodsDrainedTotal metric when a pod is evicted", func() { - ExpectApplied(ctx, env.Client, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) + ExpectApplied(ctx, env.Client, pod, node) + queue.Add(pod) + ExpectObjectReconciled(ctx, env.Client, queue, pod) ExpectMetricCounterValue(terminator.PodsDrainedTotal, 1, map[string]string{terminator.ReasonLabel: ""}) ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "200"}) Expect(recorder.Calls(events.Evicted)).To(Equal(1)) @@ -186,6 +187,7 @@ var _ = Describe("Eviction/Queue", func() { }, Status: v1.NodeClaimStatus{ ProviderID: 
node.Spec.ProviderID, + NodeName: node.Name, }, }) nodeClaim.StatusConditions().Set(status.Condition{ @@ -195,8 +197,10 @@ var _ = Describe("Eviction/Queue", func() { Message: "Node is being interrupted", }) - ExpectApplied(ctx, env.Client, nodeClaim, pod) - Expect(queue.Evict(ctx, terminator.NewQueueKey(pod, node.Spec.ProviderID))).To(BeTrue()) + ExpectApplied(ctx, env.Client, nodeClaim, node, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + queue.Add(pod) + ExpectObjectReconciled(ctx, env.Client, queue, pod) ExpectMetricCounterValue(terminator.PodsDrainedTotal, 1, map[string]string{terminator.ReasonLabel: "SpotInterruption"}) ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{terminator.CodeLabel: "200"}) @@ -206,7 +210,7 @@ var _ = Describe("Eviction/Queue", func() { Context("Pod Deletion API", func() { It("should not delete a pod with no nodeTerminationTime", func() { - ExpectApplied(ctx, env.Client, pod) + ExpectApplied(ctx, env.Client, pod, node) Expect(terminatorInstance.DeleteExpiringPods(ctx, []*corev1.Pod{pod}, nil)).To(Succeed()) ExpectExists(ctx, env.Client, pod) @@ -214,7 +218,7 @@ var _ = Describe("Eviction/Queue", func() { }) It("should not delete a pod with terminationGracePeriodSeconds still remaining before nodeTerminationTime", func() { pod.Spec.TerminationGracePeriodSeconds = lo.ToPtr[int64](60) - ExpectApplied(ctx, env.Client, pod) + ExpectApplied(ctx, env.Client, pod, node) nodeTerminationTime := time.Now().Add(time.Minute * 5) Expect(terminatorInstance.DeleteExpiringPods(ctx, []*corev1.Pod{pod}, &nodeTerminationTime)).To(Succeed()) diff --git a/pkg/controllers/node/termination/terminator/terminator.go b/pkg/controllers/node/termination/terminator/terminator.go index 7154df4e7e..989d33e9dc 100644 --- a/pkg/controllers/node/termination/terminator/terminator.go +++ b/pkg/controllers/node/termination/terminator/terminator.go @@ -109,7 +109,7 @@ func (t *Terminator) Drain(ctx context.Context, node *corev1.Node, nodeGracePeri for _, group := range podGroups { if len(group) > 0 { // Only add pods to the eviction queue that haven't been evicted yet - t.evictionQueue.Add(node, lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })...) + t.evictionQueue.Add(lo.Filter(group, func(p *corev1.Pod, _ int) bool { return podutil.IsEvictable(p) })...) 
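+			// Drain is not yet complete: the error below reports how many pods across all groups are still waiting to be evicted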
return NewNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", lo.SumBy(podGroups, func(pods []*corev1.Pod) int { return len(pods) }))) } } diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 2cf733967c..6844ec9dd2 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -79,6 +79,7 @@ type Provisioner struct { kubeClient client.Client batcher *Batcher[types.UID] volumeTopology *scheduler.VolumeTopology + modelInference *scheduler.ModelInference cluster *state.Cluster recorder events.Recorder cm *pretty.ChangeMonitor @@ -94,6 +95,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder, cloudProvider: cloudProvider, kubeClient: kubeClient, volumeTopology: scheduler.NewVolumeTopology(kubeClient), + modelInference: scheduler.NewModelInference(kubeClient), cluster: cluster, recorder: recorder, cm: pretty.NewChangeMonitor(), @@ -266,6 +268,12 @@ func (p *Provisioner) NewScheduler( return nil, fmt.Errorf("injecting volume topology requirements, %w", err) } + // inject model inference requirements + pods, err = p.injectInferenceFlavorRequirements(ctx, pods) + if err != nil { + return nil, fmt.Errorf("injecting model inference requirements, %w", err) + } + // Calculate cluster topology, if a context error occurs, it is wrapped and returned topology, err := scheduler.NewTopology(ctx, p.kubeClient, p.cluster, stateNodes, nodePools, instanceTypes, pods, opts...) if err != nil { @@ -471,6 +479,7 @@ func (p *Provisioner) Validate(ctx context.Context, pod *corev1.Pod) error { validateNodeSelector(ctx, pod), validateAffinity(ctx, pod), p.volumeTopology.ValidatePersistentVolumeClaims(ctx, pod), + p.modelInference.ValidateInferenceFlavors(ctx, pod), ) } @@ -500,6 +509,21 @@ func (p *Provisioner) injectVolumeTopologyRequirements(ctx context.Context, pods return schedulablePods, nil } +func (p *Provisioner) injectInferenceFlavorRequirements(ctx context.Context, pods []*corev1.Pod) ([]*corev1.Pod, error) { + var schedulablePods []*corev1.Pod + for _, pod := range pods { + if err := p.modelInference.Inject(ctx, pod); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return nil, err + } + log.FromContext(ctx).WithValues("Pod", klog.KObj(pod)).Error(err, "failed getting model inference requirements") + } else { + schedulablePods = append(schedulablePods, pod) + } + } + return schedulablePods, nil +} + func validateNodeSelector(ctx context.Context, p *corev1.Pod) (errs error) { terms := lo.MapToSlice(p.Spec.NodeSelector, func(k string, v string) corev1.NodeSelectorTerm { return corev1.NodeSelectorTerm{ diff --git a/pkg/controllers/provisioning/scheduling/modelinference.go b/pkg/controllers/provisioning/scheduling/modelinference.go new file mode 100644 index 0000000000..afbb45b3c4 --- /dev/null +++ b/pkg/controllers/provisioning/scheduling/modelinference.go @@ -0,0 +1,181 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduling + +import ( + "context" + "fmt" + "strings" + + "github.com/awslabs/operatorpkg/serrors" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "sigs.k8s.io/karpenter/pkg/utils/pretty" + + llmazcoreapi "github.com/inftyai/llmaz/api/core/v1alpha1" + llmazinferenceapi "github.com/inftyai/llmaz/api/inference/v1alpha1" +) + +func init() { + // Add support for llmaz CRDs. + utilruntime.Must(llmazcoreapi.AddToScheme(scheme.Scheme)) + utilruntime.Must(llmazinferenceapi.AddToScheme(scheme.Scheme)) +} + +func NewModelInference(kubeClient client.Client) *ModelInference { + return &ModelInference{kubeClient: kubeClient} +} + +type ModelInference struct { + kubeClient client.Client +} + +func (m *ModelInference) Inject(ctx context.Context, pod *v1.Pod) error { + flavors, err := m.getInferenceFlavors(ctx, pod) + if err != nil { + return err + } + + kept, rejected := lo.FilterReject(flavors, func(flavor llmazcoreapi.Flavor, _ int) bool { + return len(flavor.NodeSelector) > 0 + }) + if len(rejected) > 0 || len(kept) == 0 { + return nil + } + + if pod.Spec.Affinity == nil { + pod.Spec.Affinity = &v1.Affinity{} + } + if pod.Spec.Affinity.NodeAffinity == nil { + pod.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{} + } + if pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{} + } + if len(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 { + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = []v1.NodeSelectorTerm{{}} + } + + podCopy := pod.DeepCopy() + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nil + + // Add the inference flavor requirements to the pod's node affinity. This causes it to be OR'd with every merged requirement, + // so that relaxation employs our flavor requirements according to the orders of the merged flavors, + // when no existing node, in-flight node claim, or node pool can satisfy the current flavor requirements. + lo.ForEach(kept, func(flavor llmazcoreapi.Flavor, _ int) { + matchExpressions := lo.MapToSlice(flavor.NodeSelector, func(key string, value string) v1.NodeSelectorRequirement { + return v1.NodeSelectorRequirement{ + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: []string{value}, + } + }) + // We add our inference requirement to every node selector term. This causes it to be AND'd with every existing + // requirement so that relaxation won't remove our inference requirement. + nodeSelectorTermsCopy := podCopy.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.DeepCopy().NodeSelectorTerms + for i := 0; i < len(nodeSelectorTermsCopy); i++ { + nodeSelectorTermsCopy[i].MatchExpressions = append(nodeSelectorTermsCopy[i].MatchExpressions, matchExpressions...) + } + log.FromContext(ctx). + WithValues("Pod", klog.KObj(pod)). 
+ V(1).Info(fmt.Sprintf("adding requirements derived from pod's inference flavor %q, %s", flavor.Name, matchExpressions)) + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = append(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, nodeSelectorTermsCopy...) + }) + + log.FromContext(ctx). + WithValues("Pod", klog.KObj(pod)). + V(1).Info(fmt.Sprintf("adding requirements derived from pod's inference flavors, %s", pretty.Concise(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution))) + + return nil +} + +func (m *ModelInference) getInferenceFlavors(ctx context.Context, pod *v1.Pod) ([]llmazcoreapi.Flavor, error) { + modelName, ok := pod.Labels[llmazcoreapi.ModelNameLabelKey] + if !ok { + // Ignore the pod that is not created via llmaz's inference service. + return nil, nil + } + + model := &llmazcoreapi.OpenModel{} + if err := m.kubeClient.Get(ctx, types.NamespacedName{Name: modelName}, model); err != nil { + return nil, fmt.Errorf("getting open model %q, %w", modelName, err) + } + modelFlavors := lo.FromPtrOr(model.Spec.InferenceConfig, llmazcoreapi.InferenceConfig{}).Flavors + + serviceFlavorRawStr, ok := pod.Annotations[llmazinferenceapi.InferenceServiceFlavorsAnnoKey] + if !ok { + // Not all inference pods specify the inference service flavors. + return modelFlavors, nil + } + + modelFlavorMap := lo.SliceToMap(modelFlavors, func(flavor llmazcoreapi.Flavor) (llmazcoreapi.FlavorName, llmazcoreapi.Flavor) { + return flavor.Name, flavor + }) + + var result []llmazcoreapi.Flavor + for _, flavorNameVal := range strings.Split(serviceFlavorRawStr, ",") { + flavor, ok := modelFlavorMap[llmazcoreapi.FlavorName(flavorNameVal)] + if !ok { + return nil, fmt.Errorf("unknown service inference flavor %q", flavorNameVal) + } + result = append(result, flavor) + } + return result, nil +} + +func (m *ModelInference) ValidateInferenceFlavors(ctx context.Context, pod *v1.Pod) (err error) { + modelName, ok := pod.Labels[llmazcoreapi.ModelNameLabelKey] + if !ok { + // Ignore the pod that is not created via llmaz's inference service. + return nil + } + + model := &llmazcoreapi.OpenModel{} + if err := m.kubeClient.Get(ctx, types.NamespacedName{Name: modelName}, model); err != nil { + return serrors.Wrap(fmt.Errorf("failed to validate open model, %w", err), "OpenModel", klog.KRef("", modelName)) + } + + serviceFlavorRawStr, ok := pod.Annotations[llmazinferenceapi.InferenceServiceFlavorsAnnoKey] + if !ok { + // Not all inference pods specify the inference service flavors. + return nil + } + + // Get all flavors from the model and check if the service flavors are valid. 
+ allFlavors := lo.SliceToMap( + lo.FromPtrOr(model.Spec.InferenceConfig, llmazcoreapi.InferenceConfig{}).Flavors, + func(flavor llmazcoreapi.Flavor) (llmazcoreapi.FlavorName, llmazcoreapi.Flavor) { + return flavor.Name, flavor + }, + ) + unknownFlavors := lo.Reject(strings.Split(serviceFlavorRawStr, ","), func(flavor string, _ int) bool { + return lo.HasKey(allFlavors, llmazcoreapi.FlavorName(flavor)) + }) + + if len(unknownFlavors) > 0 { + err = serrors.Wrap(fmt.Errorf("unknown service inference flavors, %v", unknownFlavors), "OpenModel", klog.KRef("", modelName)) + return err + } + return nil +} diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 2d43e51018..fa2dd6eda3 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -49,9 +49,13 @@ import ( "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/scheduling" "sigs.k8s.io/karpenter/pkg/test" + testcrds "sigs.k8s.io/karpenter/pkg/test/crds" . "sigs.k8s.io/karpenter/pkg/test/expectations" "sigs.k8s.io/karpenter/pkg/test/v1alpha1" . "sigs.k8s.io/karpenter/pkg/utils/testing" + + llmazcoreapi "github.com/inftyai/llmaz/api/core/v1alpha1" + llmazinferenceapi "github.com/inftyai/llmaz/api/inference/v1alpha1" ) var ( @@ -73,7 +77,7 @@ func TestAPIs(t *testing.T) { } var _ = BeforeSuite(func() { - env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...)) + env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithCRDs(testcrds.CRDs...)) ctx = options.ToContext(ctx, test.Options()) cloudProvider = fake.NewCloudProvider() fakeClock = clock.NewFakeClock(time.Now()) @@ -2583,6 +2587,264 @@ var _ = Describe("Provisioning", func() { }) }) }) + + Context("Model Inference Requirements", func() { + It("should not schedule if the model is missing", func() { + ExpectApplied(ctx, env.Client, test.NodePool()) + pod := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: "unknown", + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + ExpectMetricGaugeValue(pscheduling.IgnoredPodCount, 1, nil) + ExpectNotScheduled(ctx, env.Client, pod) + }) + It("should schedule with model if the model does not have an inference flavor", func() { + model := test.OpenModel(test.OpenModelOptions{}) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + pod := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + ExpectScheduled(ctx, env.Client, pod) + }) + It("should not schedule if the inference flavor annotation is incompatible between model and inference service", func() { + model := test.OpenModel(test.OpenModelOptions{}) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + pod := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + Annotations: map[string]string{ + llmazinferenceapi.InferenceServiceFlavorsAnnoKey: "unknown", + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + ExpectMetricGaugeValue(pscheduling.IgnoredPodCount, 1, nil) + ExpectNotScheduled(ctx, env.Client, pod) + }) + It("should schedule to target instance type if the model 
has the flavor", func() { + model := test.OpenModel(test.OpenModelOptions{ + Flavors: []llmazcoreapi.Flavor{ + { + Name: "test-flavor", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-instance-type", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + pod := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-instance-type")) + }) + It("should schedule to target instance type if inference service specifies the flavor and model has multiple flavors", func() { + model := test.OpenModel(test.OpenModelOptions{ + Flavors: []llmazcoreapi.Flavor{ + { + Name: "test-flavor-1", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-instance-type", + }, + }, + { + Name: "test-flavor-2", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-b-instance-type", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + pod := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + Annotations: map[string]string{ + llmazinferenceapi.InferenceServiceFlavorsAnnoKey: "test-flavor-2", + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-b-instance-type")) + }) + It("should schedule to first available instance type if some inference flavors are not supported by node pools", func() { + model := test.OpenModel(test.OpenModelOptions{ + Flavors: []llmazcoreapi.Flavor{ + { + Name: "test-flavor-1", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "unavailable", + }, + }, + { + Name: "test-flavor-2", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-b-instance-type", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + pod := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-b-instance-type")) + }) + It("shouldn't schedule to the in-flight node claim even if the node claim is compatible with second inference flavor when the first inference flavor is supported by node pools", func() { + model := test.OpenModel(test.OpenModelOptions{ + Flavors: []llmazcoreapi.Flavor{ + { + Name: "test-flavor-1", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-instance-type", + }, + }, + { + Name: "test-flavor-2", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-b-instance-type", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + pod1 := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + Annotations: map[string]string{ + 
llmazinferenceapi.InferenceServiceFlavorsAnnoKey: "test-flavor-2", + }, + }, + }) + pod2 := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod1, pod2) + node1 := ExpectScheduled(ctx, env.Client, pod1) + Expect(node1.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-b-instance-type")) + node2 := ExpectScheduled(ctx, env.Client, pod2) + Expect(node2.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-instance-type")) + }) + It("should schedule to the in-flight node claim if the node claim is compatible with second inference flavor and the first inference flavor is unsupported by node pools", func() { + model := test.OpenModel(test.OpenModelOptions{ + Flavors: []llmazcoreapi.Flavor{ + { + Name: "test-flavor-1", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "unavailable", + }, + }, + { + Name: "test-flavor-2", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-b-instance-type", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + pod1 := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + Annotations: map[string]string{ + llmazinferenceapi.InferenceServiceFlavorsAnnoKey: "test-flavor-2", + }, + }, + }) + pod2 := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod1, pod2) + node1 := ExpectScheduled(ctx, env.Client, pod1) + Expect(node1.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-b-instance-type")) + node2 := ExpectScheduled(ctx, env.Client, pod2) + Expect(node2.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-b-instance-type")) + Expect(node2.Name).To(Equal(node1.Name)) + }) + It("should not relax an added service inference node-selector away", func() { + model := test.OpenModel(test.OpenModelOptions{ + Flavors: []llmazcoreapi.Flavor{ + { + Name: "test-flavor", + NodeSelector: map[string]string{ + corev1.LabelInstanceTypeStable: "gpu-vendor-instance-type", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, test.NodePool(), model) + + pod := test.UnschedulablePod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + llmazcoreapi.ModelNameLabelKey: model.Name, + }, + }, + NodeRequirements: []corev1.NodeSelectorRequirement{ + { + Key: "example.com/label", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"unsupported"}, + }, + }, + }) + + // Add the second capacity type that is OR'd with the first. Previously we only added the service inference requirement + // to a single node selector term which would sometimes get relaxed away. Now we add it to all of them to AND + // it with each existing term. 
+ pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = append(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: v1.CapacityTypeLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{v1.CapacityTypeOnDemand}, + }, + }, + }) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + node := ExpectScheduled(ctx, env.Client, pod) + Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "gpu-vendor-instance-type")) + }) + }) }) func ExpectNodeClaimRequirements(nodeClaim *v1.NodeClaim, requirements ...corev1.NodeSelectorRequirement) { diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 3885d5ce13..c45050ebb0 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -60,6 +60,8 @@ import ( "sigs.k8s.io/karpenter/pkg/operator/logging" "sigs.k8s.io/karpenter/pkg/operator/options" "sigs.k8s.io/karpenter/pkg/utils/env" + + llmazcoreapi "github.com/inftyai/llmaz/api/core/v1alpha1" ) const ( @@ -192,7 +194,7 @@ func NewOperator() (context.Context, *Operator) { return lo.Ternary(mgr.GetCache().WaitForCacheSync(req.Context()), nil, fmt.Errorf("failed to sync caches")) })) lo.Must0(mgr.AddReadyzCheck("crd", func(_ *http.Request) error { - objects := []client.Object{&v1.NodePool{}, &v1.NodeClaim{}} + objects := []client.Object{&v1.NodePool{}, &v1.NodeClaim{}, &llmazcoreapi.OpenModel{}} for _, obj := range objects { gvk, err := apiutil.GVKForObject(obj, scheme.Scheme) if err != nil { diff --git a/pkg/test/crds/apis.go b/pkg/test/crds/apis.go new file mode 100644 index 0000000000..d2e975dca4 --- /dev/null +++ b/pkg/test/crds/apis.go @@ -0,0 +1,32 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package crds + +import ( + _ "embed" + + "github.com/awslabs/operatorpkg/object" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" +) + +var ( + //go:embed openmodel-crd.yaml + OpenModelCRD []byte + CRDs = []*apiextensionsv1.CustomResourceDefinition{ + object.Unmarshal[apiextensionsv1.CustomResourceDefinition](OpenModelCRD), + } +) diff --git a/pkg/test/crds/openmodel-crd.yaml b/pkg/test/crds/openmodel-crd.yaml new file mode 100644 index 0000000000..001f81004a --- /dev/null +++ b/pkg/test/crds/openmodel-crd.yaml @@ -0,0 +1,243 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.1 + name: openmodels.llmaz.io +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + name: llmaz-webhook-service + namespace: llmaz-system + path: /convert + conversionReviewVersions: + - v1 + group: llmaz.io + names: + kind: OpenModel + listKind: OpenModelList + plural: openmodels + shortNames: + - om + singular: openmodel + scope: Cluster + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: OpenModel is the Schema for the open models API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ModelSpec defines the desired state of Model + properties: + familyName: + description: |- + FamilyName represents the model type, like llama2, which will be auto injected + to the labels with the key of `llmaz.io/model-family-name`. + type: string + inferenceConfig: + description: InferenceConfig represents the inference configurations + for the model. + properties: + flavors: + description: |- + Flavors represents the accelerator requirements to serve the model. + Flavors are fungible following the priority represented by the slice order. + items: + description: |- + Flavor defines the accelerator requirements for a model and the necessary parameters + in autoscaling. Right now, it will be used in two places: + - Pod scheduling with node selectors specified. + - Cluster autoscaling with essential parameters provided. + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Limits defines the required accelerators to serve the model for each replica, + like . For multi-hosts cases, the limits here indicates + the resource requirements for each replica, usually equals to the TP size. + Not recommended to set the cpu and memory usage here: + - if using playground, you can define the cpu/mem usage at backendConfig. + - if using inference service, you can define the cpu/mem at the container resources. 
+ However, if you define the same accelerator resources at playground/service as well, + the resources will be overwritten by the flavor limit here. + type: object + name: + description: Name represents the flavor name, which will + be used in model claim. + type: string + nodeSelector: + additionalProperties: + type: string + description: |- + NodeSelector represents the node candidates for Pod placements, if a node doesn't + meet the nodeSelector, it will be filtered out in the resourceFungibility scheduler plugin. + If nodeSelector is empty, it means every node is a candidate. + type: object + params: + additionalProperties: + type: string + description: |- + Params stores other useful parameters and will be consumed by cluster-autoscaler / Karpenter + for autoscaling or be defined as model parallelism parameters like TP or PP size. + E.g. with autoscaling, when scaling up nodes with 8x Nvidia A00, the parameter can be injected + with for AWS. + Preset parameters: TP, PP, INSTANCE-TYPE. + type: object + required: + - name + type: object + maxItems: 8 + type: array + type: object + source: + description: |- + Source represents the source of the model, there're several ways to load + the model such as loading from huggingface, OCI registry, s3, host path and so on. + properties: + modelHub: + description: ModelHub represents the model registry for model + downloads. + properties: + allowPatterns: + description: AllowPatterns refers to files matched with at + least one pattern will be downloaded. + items: + type: string + type: array + filename: + description: |- + Filename refers to a specified model file rather than the whole repo. + This is helpful to download a specified GGUF model rather than downloading + the whole repo which includes all kinds of quantized models. + in the near future. + Note: once filename is set, allowPatterns and ignorePatterns should be left unset. + type: string + ignorePatterns: + description: IgnorePatterns refers to files matched with any + of the patterns will not be downloaded. + items: + type: string + type: array + modelID: + description: |- + ModelID refers to the model identifier on model hub, + such as meta-llama/Meta-Llama-3-8B. + type: string + name: + default: Huggingface + description: Name refers to the model registry, such as huggingface. + enum: + - Huggingface + - ModelScope + type: string + revision: + default: main + description: Revision refers to a Git revision id which can + be a branch name, a tag, or a commit hash. + type: string + type: object + uri: + description: |- + URI represents a various kinds of model sources following the uri protocol, protocol://
, e.g. + - oss://./ + - ollama://llama3.3 + - host:// + type: string + type: object + required: + - familyName + - source + type: object + status: + description: ModelStatus defines the observed state of Model + properties: + conditions: + description: Conditions represents the Inference condition. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} + diff --git a/pkg/test/nodes.go b/pkg/test/nodes.go index 4e8c9c5bb8..b71b767e36 100644 --- a/pkg/test/nodes.go +++ b/pkg/test/nodes.go @@ -54,7 +54,7 @@ func Node(overrides ...NodeOptions) *corev1.Node { } return &corev1.Node{ - ObjectMeta: ObjectMeta(options.ObjectMeta), + ObjectMeta: NamespacedObjectMeta(options.ObjectMeta), Spec: corev1.NodeSpec{ Unschedulable: options.Unschedulable, Taints: options.Taints, diff --git a/pkg/test/openmodel.go b/pkg/test/openmodel.go new file mode 100644 index 0000000000..d897c02339 --- /dev/null +++ b/pkg/test/openmodel.go @@ -0,0 +1,50 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package test + +import ( + "fmt" + + "github.com/imdario/mergo" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + llmazcoreapi "github.com/inftyai/llmaz/api/core/v1alpha1" +) + +type OpenModelOptions struct { + metav1.ObjectMeta + + Flavors []llmazcoreapi.Flavor +} + +func OpenModel(overrides ...OpenModelOptions) *llmazcoreapi.OpenModel { + options := OpenModelOptions{} + for _, opts := range overrides { + if err := mergo.Merge(&options, opts, mergo.WithOverride); err != nil { + panic(fmt.Sprintf("Failed to merge options: %s", err)) + } + } + + return &llmazcoreapi.OpenModel{ + ObjectMeta: ObjectMeta(options.ObjectMeta), + Spec: llmazcoreapi.ModelSpec{ + InferenceConfig: &llmazcoreapi.InferenceConfig{ + Flavors: options.Flavors, + }, + }, + } +} diff --git a/pkg/utils/pod/pod.go b/pkg/utils/pod/pod.go new file mode 100644 index 0000000000..cb7803e2b3 --- /dev/null +++ b/pkg/utils/pod/pod.go @@ -0,0 +1,34 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NodeForPod(ctx context.Context, c client.Client, p *corev1.Pod) (*corev1.Node, error) { + node := &corev1.Node{} + if err := c.Get(ctx, client.ObjectKey{Name: p.Spec.NodeName}, node); err != nil { + return nil, fmt.Errorf("getting node, %w", err) + } + return node, nil +} diff --git a/test/pkg/environment/common/expectations.go b/test/pkg/environment/common/expectations.go index 55581943d9..857ef53865 100644 --- a/test/pkg/environment/common/expectations.go +++ b/test/pkg/environment/common/expectations.go @@ -26,7 +26,6 @@ import ( "time" "github.com/awslabs/operatorpkg/object" - "github.com/awslabs/operatorpkg/status" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/samber/lo" @@ -116,29 +115,34 @@ func (env *Environment) ExpectStatusUpdated(objects ...client.Object) { } } -func (env *Environment) ExpectNodeClassCondition(nodeclass *unstructured.Unstructured, conditions []status.Condition) *unstructured.Unstructured { +func (env *Environment) ExpectReplaceNodeClassCondition(nodeclass *unstructured.Unstructured, condition metav1.Condition) *unstructured.Unstructured { result := nodeclass.DeepCopy() + updateStatusCondition := []metav1.Condition{condition} - err := unstructured.SetNestedSlice(result.Object, lo.Map(conditions, func(condition status.Condition, _ int) interface{} { - b := map[string]interface{}{} - if condition.Type != "" { - b["type"] = condition.Type - } - if condition.Reason != "" { - b["reason"] = condition.Reason - } - if condition.Status != "" { - b["status"] = string(condition.Status) - } - if condition.Message != "" { - b["message"] = condition.Message - } - if !condition.LastTransitionTime.IsZero() { - b["lastTransitionTime"] = condition.LastTransitionTime.Format(time.RFC3339) - } - if condition.ObservedGeneration != 0 { - b["observedGeneration"] = condition.ObservedGeneration + tt, _, _ := unstructured.NestedSlice(result.Object, "status", "conditions") + for _, t := range tt { + cond := t.(map[string]interface{}) + if cond["type"].(string) == condition.Type { + continue } + updateStatusCondition = append(updateStatusCondition, metav1.Condition{ + Type: cond["type"].(string), + Status: metav1.ConditionStatus(cond["status"].(string)), + LastTransitionTime: metav1.Unix(lo.Must(time.Parse(time.RFC3339, cond["lastTransitionTime"].(string))).Unix(), 0), + Reason: cond["reason"].(string), + Message: cond["message"].(string), + ObservedGeneration: cond["observedGeneration"].(int64), + }) + } + + err := unstructured.SetNestedSlice(result.Object, lo.Map(updateStatusCondition, func(condition metav1.Condition, _ int) interface{} { + b := map[string]interface{}{} + b["type"] = condition.Type + b["reason"] = condition.Reason + b["status"] = string(condition.Status) + b["message"] = condition.Message + b["lastTransitionTime"] = condition.LastTransitionTime.Format(time.RFC3339) + b["observedGeneration"] = condition.ObservedGeneration return b }), "status", "conditions") Expect(err).To(BeNil()) @@ -941,7 +945,7 @@ func (env *Environment) ExpectBlockNodeRegistration() { // 3. Creates a binding for the admission policy to enforce the validation // // Note: Requires Kubernetes version 1.28+ to function properly. 
-func (env *Environment) ExpectBlockNodeClassStatus(obj *unstructured.Unstructured) {
+func (env *Environment) ExpectBlockNodeClassStatus(nodeClass *unstructured.Unstructured) {
 	GinkgoHelper()
 
 	version, err := env.KubeClient.Discovery().ServerVersion()
@@ -967,9 +971,9 @@ func (env *Environment) ExpectBlockNodeClassStatus(obj *unstructured.Unstructure
 					RuleWithOperations: admissionregistrationv1.RuleWithOperations{
 						Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.Update},
 						Rule: admissionregistrationv1.Rule{
-							APIGroups:   []string{object.GVK(obj).Group},
-							APIVersions: []string{object.GVK(obj).Version},
-							Resources:   []string{strings.ToLower(object.GVK(obj).Kind) + "es/status"},
+							APIGroups:   []string{object.GVK(nodeClass).Group},
+							APIVersions: []string{object.GVK(nodeClass).Version},
+							Resources:   []string{strings.ToLower(object.GVK(nodeClass).Kind) + "es/status"},
 						},
 					},
 				},
@@ -977,7 +981,11 @@ func (env *Environment) ExpectBlockNodeClassStatus(obj *unstructured.Unstructure
 			},
 			Validations: []admissionregistrationv1.Validation{
 				{
-					Expression: "false",
+					// Blocks status updates where the 'Ready' condition does not carry the 'TestingNotReady' reason.
+					// This prevents the Karpenter controller from modifying status conditions
+					// while allowing our test suite to make updates. This provides a deterministic
+					// mechanism for E2E tests to update NodeClass conditions.
+					Expression: "object.status.conditions.filter(c, c.type == 'Ready').all(c, c.reason == 'TestingNotReady')",
 				},
 			},
 		},
@@ -998,6 +1006,22 @@ func (env *Environment) ExpectBlockNodeClassStatus(obj *unstructured.Unstructure
 	}
 	// Create both the policy and binding in the cluster
 	env.ExpectCreated(admissionspolicy, admissionspolicybinding)
+
+	// Wait for the admission policy to become active
+	// Note: There can be a delay between resource creation and policy enforcement
+	// We use a dry-run nodeclass status update attempt to verify the policy is active
+	nodeClass = env.ExpectReplaceNodeClassCondition(nodeClass, metav1.Condition{
+		Type:               "Ready",
+		Status:             metav1.ConditionFalse,
+		LastTransitionTime: metav1.Now(),
+		ObservedGeneration: nodeClass.GetGeneration(),
+		Reason:             "NotReady",
+		Message:            "NotReady",
+	})
+	By("Validating the admission policy is applied")
+	Eventually(func(g Gomega) {
+		g.Expect(env.Client.Status().Update(env, nodeClass, client.DryRunAll)).ToNot(Succeed())
+	}).Should(Succeed())
 }
 
 func (env *Environment) ConsistentlyExpectNodeClaimsNotDrifted(duration time.Duration, nodeClaims ...*v1.NodeClaim) {
diff --git a/test/pkg/environment/common/setup.go b/test/pkg/environment/common/setup.go
index ccf0f120f1..a7982a646b 100644
--- a/test/pkg/environment/common/setup.go
+++ b/test/pkg/environment/common/setup.go
@@ -144,7 +144,15 @@ func (env *Environment) PrintCluster() {
 func (env *Environment) CleanupObjects(cleanableObjects ...client.Object) {
 	time.Sleep(time.Second) // wait one second to let the caches get up-to-date for deletion
 	wg := sync.WaitGroup{}
+	version, err := env.KubeClient.Discovery().ServerVersion()
+	Expect(err).To(BeNil())
 	for _, obj := range append(cleanableObjects, env.DefaultNodeClass.DeepCopy()) {
+		if version.Minor < "30" &&
+			(obj.GetObjectKind().GroupVersionKind().Kind == "ValidatingAdmissionPolicy" ||
+				obj.GetObjectKind().GroupVersionKind().Kind == "ValidatingAdmissionPolicyBinding") {
+			continue
+		}
+
 		wg.Add(1)
 		go func(obj client.Object) {
 			defer wg.Done()
diff --git a/test/suites/regression/nodeclaim_test.go b/test/suites/regression/nodeclaim_test.go
index 7603d0ad9b..161e4c4998
100644 --- a/test/suites/regression/nodeclaim_test.go +++ b/test/suites/regression/nodeclaim_test.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/utils/resources" @@ -30,7 +31,6 @@ import ( . "github.com/onsi/gomega" "github.com/awslabs/operatorpkg/object" - "github.com/awslabs/operatorpkg/status" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" @@ -248,19 +248,25 @@ var _ = Describe("NodeClaim", func() { }) It("should delete a NodeClaim if it references a NodeClass that isn't Ready", func() { env.ExpectCreated(nodeClass) - nodeClass = env.ExpectNodeClassCondition(nodeClass, []status.Condition{ - { - Type: "Ready", - Status: metav1.ConditionFalse, - LastTransitionTime: metav1.Now(), - Reason: "NotReady", - Message: "NodeClass is not ready", - }, + By("Validating the NodeClass status condition has been reconciled") + Eventually(func(g Gomega) { + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(nodeClass), nodeClass)).To(Succeed()) + _, found, err := unstructured.NestedSlice(nodeClass.Object, "status", "conditions") + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(found).To(BeTrue()) + }, 5*time.Second).Should(Succeed()) + + env.ExpectBlockNodeClassStatus(nodeClass) + nodeClass = env.ExpectReplaceNodeClassCondition(nodeClass, metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionFalse, + LastTransitionTime: metav1.Now(), + ObservedGeneration: nodeClass.GetGeneration(), + Reason: "TestingNotReady", + Message: "NodeClass is not ready", }) env.ExpectStatusUpdated(nodeClass) - env.ExpectBlockNodeClassStatus(nodeClass) - // TODO: better not to have this but this suite runs quickly as is and this solves for multiple cloudproviders - time.Sleep(10 * time.Second) + nodeClaim := test.NodeClaim(v1.NodeClaim{ Spec: v1.NodeClaimSpec{ Requirements: requirements,