Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions test/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ const (
)

var (
port string = env.GetEnvString("E2E_PORT", "30080", ginkgo.GinkgoLogr)
port string = env.GetEnvString("E2E_PORT", "30080", ginkgo.GinkgoLogr)
metricsPort string = env.GetEnvString("E2E_METRICS_PORT", "32090", ginkgo.GinkgoLogr)

testConfig *testutils.TestConfig

Expand All @@ -80,7 +81,8 @@ var (
infPoolObjects []string
createdNameSpace bool

portForwardSession *gexec.Session
portForwardSession *gexec.Session
eppPortForwardSession *gexec.Session
)

func TestEndToEnd(t *testing.T) {
Expand Down Expand Up @@ -115,6 +117,10 @@ var _ = ginkgo.AfterSuite(func() {
portForwardSession.Terminate()
}

if eppPortForwardSession != nil {
eppPortForwardSession.Terminate()
}

// cleanup created objects
ginkgo.By("Deleting created Kubernetes objects")
testutils.DeleteObjects(testConfig, infPoolObjects)
Expand Down Expand Up @@ -149,6 +155,7 @@ func setupK8sCluster() {
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}()
clusterConfig := strings.ReplaceAll(kindClusterConfig, "${PORT}", port)
clusterConfig = strings.ReplaceAll(clusterConfig, "${METRICS_PORT}", metricsPort)
_, err := io.WriteString(stdin, clusterConfig)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}()
Expand Down Expand Up @@ -284,6 +291,22 @@ func createInferencePool(numTargetPorts int, toDelete bool) []string {
return testutils.CreateObjsFromYaml(testConfig, infPoolYaml)
}

func startEPPMetricsPortForward() {
pods, err := testConfig.KubeCli.CoreV1().Pods(nsName).List(testConfig.Context, metav1.ListOptions{
LabelSelector: "app=e2e-epp",
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(pods.Items).NotTo(gomega.BeEmpty())

eppPodName := pods.Items[0].Name
command := exec.Command("kubectl", "port-forward", "pod/"+eppPodName, metricsPort+":9090",
"--context="+k8sContext, "--namespace="+nsName)
eppPortForwardSession, err = gexec.Start(command, ginkgo.GinkgoWriter, ginkgo.GinkgoWriter)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
// Give it a moment to establish
time.Sleep(3 * time.Second)
}

const kindClusterConfig = `
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
Expand All @@ -295,4 +318,7 @@ nodes:
- containerPort: 30081
hostPort: 30081
protocol: TCP
- containerPort: 32090
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add host port mapping

hostPort: ${METRICS_PORT}
protocol: TCP
`
46 changes: 46 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package e2e

import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -76,6 +78,13 @@ var _ = ginkgo.Describe("Run end to end tests", ginkgo.Ordered, func() {

epp := createEndPointPicker(pdConfig)

metricsURL := fmt.Sprintf("http://localhost:%s/metrics", metricsPort)

if k8sContext != "" {
// Use port-forward to access the EPP pod's metrics endpoint.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use port-forward

startEPPMetricsPortForward()
}

prefillPods, decodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
gomega.Expect(prefillPods).Should(gomega.HaveLen(prefillReplicas))
gomega.Expect(decodePods).Should(gomega.HaveLen(decodeReplicas))
Expand Down Expand Up @@ -110,6 +119,16 @@ var _ = ginkgo.Describe("Run end to end tests", ginkgo.Ordered, func() {
gomega.Expect(podHdr).Should(gomega.BeElementOf(decodePods))
gomega.Expect(podHdr).Should(gomega.Equal(podHdrChat))

// Metrics Validation
labelFilter := fmt.Sprintf(`decision_type="prefill-decode",model_name="%s"`, modelName)
prefillDecodeCount := getCounterMetric(metricsURL, "llm_d_inference_scheduler_pd_decision_total", labelFilter)

labelFilter2 := fmt.Sprintf(`decision_type="decode-only",model_name="%s"`, modelName)
decodeOnlyCount := getCounterMetric(metricsURL, "llm_d_inference_scheduler_pd_decision_total", labelFilter2)

gomega.Expect(prefillDecodeCount).Should(gomega.Equal(6))
gomega.Expect(decodeOnlyCount).Should(gomega.Equal(0))

testutils.DeleteObjects(testConfig, epp)
testutils.DeleteObjects(testConfig, modelServers)
})
Expand Down Expand Up @@ -383,6 +402,33 @@ func runChatCompletion(prompt string) (string, string, string) {
return namespaceHeader, podHeader, podPort
}

// getCounterMetric fetches the Prometheus text exposition served at metricsURL and
// returns the value of the first sample whose metric name is exactly metricName and
// whose line contains labelMatch, truncated to an int.
//
// labelMatch is compared as a plain substring, so it must list labels in the same
// order and format in which the endpoint renders them. When no sample matches, the
// function returns 0.
//
// The function fails the current spec (via gomega) if the request errors, the
// response status is not 200, the body cannot be read or closed, or the sample
// value is not a valid float.
func getCounterMetric(metricsURL, metricName, labelMatch string) int {
	// Bound the request so an unreachable metrics endpoint fails the spec
	// instead of blocking the whole suite.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get(metricsURL)
	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
	defer func() {
		closeErr := resp.Body.Close()
		gomega.Expect(closeErr).ToNot(gomega.HaveOccurred())
	}()
	gomega.Expect(resp.StatusCode).Should(gomega.Equal(http.StatusOK))

	body, err := io.ReadAll(resp.Body)
	gomega.Expect(err).ShouldNot(gomega.HaveOccurred())

	for _, line := range strings.Split(string(body), "\n") {
		if !strings.HasPrefix(line, metricName) || !strings.Contains(line, labelMatch) {
			continue
		}

		// Require the metric name to end here: the next byte must open the label
		// set or be whitespace. Otherwise a longer metric sharing the same prefix
		// (for example "<name>_created") would be mistaken for the requested one.
		rest := line[len(metricName):]
		if rest == "" || !strings.ContainsRune("{ \t", rune(rest[0])) {
			continue
		}

		// Skip past the label set so that whitespace inside label values cannot
		// be confused with the separator before the sample value.
		if rest[0] == '{' {
			end := strings.LastIndexByte(rest, '}')
			if end < 0 {
				continue
			}
			rest = rest[end+1:]
		}

		// A sample line is "<value> [<timestamp>]" after the labels; the value is
		// the first field, not the last.
		fields := strings.Fields(rest)
		if len(fields) == 0 {
			continue
		}
		value, err := strconv.ParseFloat(fields[0], 64)
		gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
		return int(value)
	}
	return 0
}

// Simple EPP configuration for running without P/D
const simpleConfig = `apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
Expand Down
1 change: 1 addition & 0 deletions test/e2e/yaml/deployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ spec:
- "9003"
- --config-file
- "/etc/epp/epp-config.yaml"
- --metrics-endpoint-auth=false
env:
- name: PYTHONHASHSEED
value: "42"
Expand Down
15 changes: 15 additions & 0 deletions test/e2e/yaml/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,18 @@ spec:
nodePort: 30081
appProtocol: http2
type: NodePort
---
# NodePort Service named e2e-epp-metrics. It selects pods labeled app=e2e-epp and
# exposes their TCP port 9090 (port name "metrics") on node port 32090.
apiVersion: v1
kind: Service
metadata:
name: e2e-epp-metrics
spec:
selector:
app: e2e-epp
ports:
- name: metrics
protocol: TCP
port: 9090
targetPort: 9090
nodePort: 32090
type: NodePort