diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go
index 25c65afe26..f4f5c9247f 100644
--- a/test/e2e/e2e_suite_test.go
+++ b/test/e2e/e2e_suite_test.go
@@ -54,7 +54,8 @@ const (
 )
 
 var (
-	port string = env.GetEnvString("E2E_PORT", "30080", ginkgo.GinkgoLogr)
+	port        string = env.GetEnvString("E2E_PORT", "30080", ginkgo.GinkgoLogr)
+	metricsPort string = env.GetEnvString("E2E_METRICS_PORT", "32090", ginkgo.GinkgoLogr)
 
 	testConfig *testutils.TestConfig
 
@@ -80,7 +81,8 @@ var (
 	infPoolObjects   []string
 	createdNameSpace bool
 
-	portForwardSession *gexec.Session
+	portForwardSession    *gexec.Session
+	eppPortForwardSession *gexec.Session
 )
 
 func TestEndToEnd(t *testing.T) {
@@ -115,6 +117,10 @@ var _ = ginkgo.AfterSuite(func() {
 		portForwardSession.Terminate()
 	}
 
+	if eppPortForwardSession != nil {
+		eppPortForwardSession.Terminate()
+	}
+
 	// cleanup created objects
 	ginkgo.By("Deleting created Kubernetes objects")
 	testutils.DeleteObjects(testConfig, infPoolObjects)
@@ -149,6 +155,7 @@ func setupK8sCluster() {
 			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 		}()
 		clusterConfig := strings.ReplaceAll(kindClusterConfig, "${PORT}", port)
+		clusterConfig = strings.ReplaceAll(clusterConfig, "${METRICS_PORT}", metricsPort)
 		_, err := io.WriteString(stdin, clusterConfig)
 		gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 	}()
@@ -284,6 +291,29 @@ func createInferencePool(numTargetPorts int, toDelete bool) []string {
 	return testutils.CreateObjsFromYaml(testConfig, infPoolYaml)
 }
 
+// startEPPMetricsPortForward runs "kubectl port-forward" from local metricsPort to port 9090 of the
+// first pod labelled app=e2e-epp and blocks until kubectl reports that it is listening. The session
+// is kept in eppPortForwardSession so that AfterSuite can terminate it.
+func startEPPMetricsPortForward() {
+	pods, err := testConfig.KubeCli.CoreV1().Pods(nsName).List(testConfig.Context, metav1.ListOptions{
+		LabelSelector: "app=e2e-epp",
+	})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+	gomega.Expect(pods.Items).NotTo(gomega.BeEmpty())
+
+	eppPodName := pods.Items[0].Name
+	command := exec.Command("kubectl", "port-forward", "pod/"+eppPodName, metricsPort+":9090",
+		"--context="+k8sContext, "--namespace="+nsName)
+	eppPortForwardSession, err = gexec.Start(command, ginkgo.GinkgoWriter, ginkgo.GinkgoWriter)
+	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
+	// kubectl prints "Forwarding from ..." once the local listener is up. Poll for that line
+	// rather than sleeping for a fixed time that may be too short on a slow machine.
+	gomega.Eventually(func() string {
+		return string(eppPortForwardSession.Out.Contents())
+	}, 30*time.Second, 200*time.Millisecond).Should(gomega.ContainSubstring("Forwarding from"))
+}
+
 const kindClusterConfig = `
 kind: Cluster
 apiVersion: kind.x-k8s.io/v1alpha4
@@ -295,4 +325,7 @@ nodes:
   - containerPort: 30081
     hostPort: 30081
     protocol: TCP
+  - containerPort: 32090
+    hostPort: ${METRICS_PORT}
+    protocol: TCP
 `
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 4fcf3ccc10..d23b20a58d 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -2,8 +2,10 @@ package e2e
 
 import (
 	"fmt"
+	"io"
 	"net/http"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/onsi/ginkgo/v2"
@@ -76,6 +78,13 @@ var _ = ginkgo.Describe("Run end to end tests", ginkgo.Ordered, func() {
 
 			epp := createEndPointPicker(pdConfig)
 
+			metricsURL := fmt.Sprintf("http://localhost:%s/metrics", metricsPort)
+
+			if k8sContext != "" {
+				// Use port-forward to access the EPP pod's metrics endpoint.
+				startEPPMetricsPortForward()
+			}
+
 			prefillPods, decodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
 			gomega.Expect(prefillPods).Should(gomega.HaveLen(prefillReplicas))
 			gomega.Expect(decodePods).Should(gomega.HaveLen(decodeReplicas))
@@ -110,6 +119,16 @@ var _ = ginkgo.Describe("Run end to end tests", ginkgo.Ordered, func() {
 			gomega.Expect(podHdr).Should(gomega.BeElementOf(decodePods))
 			gomega.Expect(podHdr).Should(gomega.Equal(podHdrChat))
 
+			// Metrics Validation
+			labelFilter := fmt.Sprintf(`decision_type="prefill-decode",model_name="%s"`, modelName)
+			prefillDecodeCount := getCounterMetric(metricsURL, "llm_d_inference_scheduler_pd_decision_total", labelFilter)
+
+			labelFilter2 := fmt.Sprintf(`decision_type="decode-only",model_name="%s"`, modelName)
+			decodeOnlyCount := getCounterMetric(metricsURL, "llm_d_inference_scheduler_pd_decision_total", labelFilter2)
+
+			gomega.Expect(prefillDecodeCount).Should(gomega.Equal(6))
+			gomega.Expect(decodeOnlyCount).Should(gomega.Equal(0))
+
 			testutils.DeleteObjects(testConfig, epp)
 			testutils.DeleteObjects(testConfig, modelServers)
 		})
@@ -383,6 +402,46 @@ func runChatCompletion(prompt string) (string, string, string) {
 	return namespaceHeader, podHeader, podPort
 }
 
+// getCounterMetric scrapes metricsURL and returns the value of the first sample of metricName whose
+// label set contains labelMatch, truncated to an int. It returns 0 when no such sample is exposed.
+func getCounterMetric(metricsURL, metricName, labelMatch string) int {
+	// Bound the scrape so that an unreachable endpoint fails the spec instead of hanging it.
+	client := &http.Client{Timeout: 10 * time.Second}
+	resp, err := client.Get(metricsURL)
+	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+	defer func() {
+		gomega.Expect(resp.Body.Close()).To(gomega.Succeed())
+	}()
+	gomega.Expect(resp.StatusCode).Should(gomega.Equal(http.StatusOK))
+
+	body, err := io.ReadAll(resp.Body)
+	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
+	for _, line := range strings.Split(string(body), "\n") {
+		if !strings.HasPrefix(line, metricName) || !strings.Contains(line, labelMatch) {
+			continue
+		}
+		// The name must be followed by the label set or the value; otherwise this is a longer
+		// metric name that merely shares the prefix.
+		rest := line[len(metricName):]
+		if rest == "" || !strings.ContainsRune("{ \t", rune(rest[0])) {
+			continue
+		}
+		// The value is the first field after the label set; an optional timestamp may follow it.
+		if end := strings.LastIndex(rest, "}"); end >= 0 {
+			rest = rest[end+1:]
+		}
+		fields := strings.Fields(rest)
+		if len(fields) == 0 {
+			continue
+		}
+		value, err := strconv.ParseFloat(fields[0], 64)
+		gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+		return int(value)
+	}
+	return 0
+}
+
 // Simple EPP configuration for running without P/D
 const simpleConfig = `apiVersion: inference.networking.x-k8s.io/v1alpha1
 kind: EndpointPickerConfig
diff --git a/test/e2e/yaml/deployments.yaml b/test/e2e/yaml/deployments.yaml
index 4d844e32c4..09cacdc176 100644
--- a/test/e2e/yaml/deployments.yaml
+++ b/test/e2e/yaml/deployments.yaml
@@ -35,6 +35,7 @@ spec:
         - "9003"
         - --config-file
         - "/etc/epp/epp-config.yaml"
+        - --metrics-endpoint-auth=false
         env:
         - name: PYTHONHASHSEED
           value: "42"
diff --git a/test/e2e/yaml/services.yaml b/test/e2e/yaml/services.yaml
index 76a54e8e77..1e4dbd5d56 100644
--- a/test/e2e/yaml/services.yaml
+++ b/test/e2e/yaml/services.yaml
@@ -33,3 +33,18 @@ spec:
       nodePort: 30081
       appProtocol: http2
   type: NodePort
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: e2e-epp-metrics
+spec:
+  selector:
+    app: e2e-epp
+  ports:
+    - name: metrics
+      protocol: TCP
+      port: 9090
+      targetPort: 9090
+      nodePort: 32090
+  type: NodePort