From 69e608a53a4f326eda0c00063d20950b441879b0 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 16 Sep 2021 07:57:16 -0700 Subject: [PATCH] feat!: Remove `SunkMessages` condition. Add `sinks_total` metric. (#328) --- api/v1alpha1/const.go | 9 +-- api/v1alpha1/source_statuses.go | 19 ------- config/apps/stan.yaml | 4 ++ config/apps/stan/stan-statefulset.yaml | 6 ++ docs/METRICS.md | 4 ++ examples/main.go | 3 +- go.mod | 2 +- go.sum | 2 + manager/controllers/pipeline_controller.go | 10 ++-- runner/sidecar/in.go | 8 ++- runner/sidecar/sinks.go | 9 +++ runner/sidecar/source/stan/stan.go | 40 ++++---------- shared/symbol/symbol.go | 1 - test/db-e2e/db_sink_test.go | 10 ++-- test/db-e2e/db_source_test.go | 8 +-- test/e2e/cat_step_test.go | 12 ++-- test/e2e/container_step_test.go | 4 +- test/e2e/cron_test.go | 4 +- test/e2e/dedupe_test.go | 6 +- test/e2e/expand_step_test.go | 4 +- test/e2e/filter_step_test.go | 4 +- test/e2e/flatten_step_test.go | 4 +- test/e2e/git_step_test.go | 4 +- test/e2e/golang_code_step_test.go | 2 +- test/e2e/http_test.go | 2 +- test/e2e/java_code_step_test.go | 2 +- test/e2e/map_step_test.go | 4 +- test/e2e/messages_test.go | 4 +- test/e2e/metrics_test.go | 26 ++++----- test/e2e/python_code_step_test.go | 2 +- test/e2e/volume_sink_test.go | 4 +- test/e2e/volume_source_test.go | 6 +- test/examples/examples_test.go | 44 +++++++-------- test/fixtures.go | 18 ++++-- test/http-fmea/http_fmea_test.go | 33 ++++++----- test/http-stress/http_stress_test.go | 13 +++-- test/http-stress/test-results.json | 4 +- test/kafka-e2e/kafka_test.go | 28 +++++----- test/kafka-fmea/kafka_fmea_test.go | 30 +++++----- test/kafka-stress/kafka_stress_test.go | 10 ++-- test/kafka-stress/test-results.json | 4 +- test/kafka.go | 27 +++++++++ test/log.go | 4 +- test/matchers.go | 37 +++++++++++++ test/metric.go | 64 ++++++++++++++++++---- test/pipeline.go | 5 +- test/pod.go | 4 +- test/s3-e2e/s3_sink_test.go | 10 ++-- test/s3-e2e/s3_source_test.go | 5 +- test/stan-e2e/stan_test.go | 13 +++-- test/stan-fmea/stan_fmea_test.go | 45 +++++++++------ test/stan-stress/stan_stress_test.go | 10 ++-- test/stan-stress/test-results.json | 4 +- test/stan.go | 27 +++++++++ test/step.go | 28 ---------- test/testapi.go | 6 +- testapi/kafka.go | 35 ++++++++++++ testapi/stan.go | 25 +++++++++ 58 files changed, 475 insertions(+), 287 deletions(-) create mode 100644 test/matchers.go diff --git a/api/v1alpha1/const.go b/api/v1alpha1/const.go index bebe4a1a..c9964b03 100644 --- a/api/v1alpha1/const.go +++ b/api/v1alpha1/const.go @@ -9,12 +9,9 @@ import ( const ( // conditions - ConditionCompleted = "Completed" // the pipeline completed - ConditionRunning = "Running" // added if any step is currently running - // DEPRECATED: This is likely to be removed in future versions as in depends on metrics in source/sink status, and - // we plan to remove that. - ConditionSunkMessages = "SunkMessages" // added if any messages have been written to a sink for any step - ConditionTerminating = "Terminating" // added if any terminator step terminated + ConditionCompleted = "Completed" // the pipeline completed + ConditionRunning = "Running" // added if any step is currently running + ConditionTerminating = "Terminating" // added if any terminator step terminated // container names CtrInit = "init" CtrMain = "main" diff --git a/api/v1alpha1/source_statuses.go b/api/v1alpha1/source_statuses.go index 548ccabb..c1b556fe 100644 --- a/api/v1alpha1/source_statuses.go +++ b/api/v1alpha1/source_statuses.go @@ -89,16 +89,6 @@ func (in SourceStatuses) GetTotal() uint64 { return v } -// DEPRECATED: This is likely to be removed in future versions. -func (in SourceStatuses) AnySunk() bool { - for _, s := range in { - if s.AnySunk() { - return true - } - } - return false -} - // IncrRetries increase the retry_count metrics by 1 // DEPRECATED: This is likely to be removed in future versions. func (in SourceStatuses) IncrRetries(name string, replica int) { @@ -111,12 +101,3 @@ func (in SourceStatuses) IncrRetries(name string, replica int) { x.Metrics[strconv.Itoa(replica)] = m in[name] = x } - -// DEPRECATED: This is likely to be removed in future versions. -func (in SourceStatuses) GetTotalBytes() uint64 { - var v uint64 - for _, s := range in { - v += s.GetTotalBytes() - } - return v -} diff --git a/config/apps/stan.yaml b/config/apps/stan.yaml index ebbaecce..dd255f91 100644 --- a/config/apps/stan.yaml +++ b/config/apps/stan.yaml @@ -231,6 +231,10 @@ spec: name: monitor - containerPort: 7777 name: metrics + readinessProbe: + httpGet: + path: /streaming/channelsz + port: 8222 volumeMounts: - mountPath: /etc/stan-config name: config-volume diff --git a/config/apps/stan/stan-statefulset.yaml b/config/apps/stan/stan-statefulset.yaml index 423cdd8f..ef11a574 100644 --- a/config/apps/stan/stan-statefulset.yaml +++ b/config/apps/stan/stan-statefulset.yaml @@ -15,3 +15,9 @@ spec: args: - -fvN - nats:8222 + containers: + - name: stan + readinessProbe: + httpGet: + port: 8222 + path: /streaming/channelsz \ No newline at end of file diff --git a/docs/METRICS.md b/docs/METRICS.md index ce2661f7..a1563c62 100644 --- a/docs/METRICS.md +++ b/docs/METRICS.md @@ -26,6 +26,10 @@ Only exposed by replica 0. Golden metric type: traffic. +### sinks_total + +Use this to track throughput. Includes retries and errors. + ### sources_errors Use this to track errors. diff --git a/examples/main.go b/examples/main.go index bb9c7655..7e4ce75e 100644 --- a/examples/main.go +++ b/examples/main.go @@ -7,6 +7,7 @@ import ( "strings" . "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" + sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -70,5 +71,5 @@ func getWaitFor(un metav1.Object) string { if v := un.GetAnnotations()["dataflow.argoproj.io/wait-for"]; v != "" { return v } - return ConditionSunkMessages + return ConditionRunning } diff --git a/go.mod b/go.mod index 9428ae2c..4ced0a8b 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/klauspost/compress v1.13.1 // indirect github.com/mattn/go-colorable v0.1.6 // indirect github.com/mitchellh/copystructure v1.1.1 // indirect - github.com/nats-io/nats-streaming-server v0.21.1 // indirect + github.com/nats-io/nats-streaming-server v0.21.1 github.com/nats-io/nats.go v1.11.0 github.com/nats-io/stan.go v0.8.3 github.com/onsi/ginkgo v1.14.1 diff --git a/go.sum b/go.sum index e7cd7081..778f3714 100644 --- a/go.sum +++ b/go.sum @@ -475,6 +475,7 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8= github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s= github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= @@ -555,6 +556,7 @@ github.com/onsi/gomega v1.10.2 h1:aY/nuoWlKJud2J6U0E3NWsjlg+0GtwXxgEqthRdzlcs= github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= diff --git a/manager/controllers/pipeline_controller.go b/manager/controllers/pipeline_controller.go index 5c00f24a..76bd6f89 100644 --- a/manager/controllers/pipeline_controller.go +++ b/manager/controllers/pipeline_controller.go @@ -121,7 +121,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c pending, running, succeeded, failed := 0, 0, 0, 0 newStatus := *pipeline.Status.DeepCopy() newStatus.Phase = dfv1.PipelineUnknown - terminate, sunkMessages := false, false + terminate := false for _, step := range steps.Items { stepName := step.Spec.Name if !pipeline.Spec.HasStep(stepName) { // this happens when a pipeline changes and a step is removed @@ -148,7 +148,6 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c panic("should never happen") } terminate = terminate || step.Status.Phase.Completed() && step.Spec.Terminator - sunkMessages = sunkMessages || step.Status.SinkStatues.AnySunk() } if newStatus.Phase.Completed() { @@ -173,10 +172,9 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c newStatus.Message = strings.Join(ss, ", ") for c, ok := range map[string]bool{ - dfv1.ConditionRunning: newStatus.Phase == dfv1.PipelineRunning, - dfv1.ConditionCompleted: newStatus.Phase.Completed(), - dfv1.ConditionSunkMessages: sunkMessages, - dfv1.ConditionTerminating: terminate, + dfv1.ConditionRunning: newStatus.Phase == dfv1.PipelineRunning, + dfv1.ConditionCompleted: newStatus.Phase.Completed(), + dfv1.ConditionTerminating: terminate, } { if ok { meta.SetStatusCondition(&newStatus.Conditions, metav1.Condition{Type: c, Status: metav1.ConditionTrue, Reason: c}) diff --git a/runner/sidecar/in.go b/runner/sidecar/in.go index ff1f420f..c91e2740 100644 --- a/runner/sidecar/in.go +++ b/runner/sidecar/in.go @@ -76,8 +76,12 @@ func connectIn(ctx context.Context, sink func(context.Context, []byte) error) (f }, nil } else if in.HTTP != nil { logger.Info("HTTP in interface configured") - if err := waitReady(ctx); err != nil { - return nil, err + if len(step.Spec.Sources) > 0 { + if err := waitReady(ctx); err != nil { + return nil, err + } + } else { + logger.Info("not waiting for HTTP to be read, this maybe a generator step and so may never be ready") } addStopHook(waitUnready) return func(ctx context.Context, data []byte) error { diff --git a/runner/sidecar/sinks.go b/runner/sidecar/sinks.go index d5cac960..323dac06 100644 --- a/runner/sidecar/sinks.go +++ b/runner/sidecar/sinks.go @@ -5,6 +5,9 @@ import ( "fmt" "io" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + volumesink "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/volume" s3sink "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/s3" @@ -20,6 +23,11 @@ import ( func connectSinks(ctx context.Context) (func(context.Context, []byte) error, error) { sinks := map[string]sink.Interface{} + totalCounter := promauto.NewCounterVec(prometheus.CounterOpts{ + Subsystem: "sinks", + Name: "total", + Help: "Total number of messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sinks_total", + }, []string{"sinkName", "replica"}) for _, s := range step.Spec.Sinks { logger.Info("connecting sink", "sink", sharedutil.MustJSON(s)) sinkName := s.Name @@ -78,6 +86,7 @@ func connectSinks(ctx context.Context) (func(context.Context, []byte) error, err return func(ctx context.Context, msg []byte) error { for sinkName, f := range sinks { + totalCounter.WithLabelValues(sinkName, fmt.Sprint(replica)).Inc() withLock(func() { step.Status.SinkStatues.IncrTotal(sinkName, replica, uint64(len(msg))) }) diff --git a/runner/sidecar/source/stan/stan.go b/runner/sidecar/source/stan/stan.go index a45c47b2..ad899e90 100644 --- a/runner/sidecar/source/stan/stan.go +++ b/runner/sidecar/source/stan/stan.go @@ -8,6 +8,8 @@ import ( "net/http" "time" + "github.com/nats-io/nats-streaming-server/server" + dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" sharedstan "github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan" "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source" @@ -127,13 +129,11 @@ func (s stanSource) Close() error { return s.conn.Close() } -func (s stanSource) GetPending(ctx context.Context) (uint64, error) { - httpClient := http.Client{ - Timeout: time.Second * 3, - } - - type obj = map[string]interface{} +var httpClient = http.Client{ + Timeout: time.Second * 3, +} +func (s stanSource) GetPending(ctx context.Context) (uint64, error) { pendingMessages := func(ctx context.Context, channel, queueNameCombo string) (int64, error) { monitoringEndpoint := fmt.Sprintf("%s/streaming/channelsz?channel=%s&subs=1", s.natsMonitoringURL, channel) req, err := http.NewRequestWithContext(ctx, "GET", monitoringEndpoint, nil) @@ -148,33 +148,17 @@ func (s stanSource) GetPending(ctx context.Context) (uint64, error) { return 0, fmt.Errorf("invalid response: %s", resp.Status) } defer func() { _ = resp.Body.Close() }() - o := make(obj) + o := server.Channelz{} if err := json.NewDecoder(resp.Body).Decode(&o); err != nil { return 0, err } - lastSeq, ok := o["last_seq"].(float64) - if !ok { - return 0, fmt.Errorf("unrecognized last_seq: %v", o["last_seq"]) - } - subs, ok := o["subscriptions"] - if !ok { - return 0, fmt.Errorf("no suscriptions field found in the monitoring endpoint response") - } - maxLastSent := float64(0) - for _, i := range subs.([]interface{}) { - s := i.(obj) - if fmt.Sprintf("%v", s["queue_name"]) != queueNameCombo { - continue - } - lastSent, ok := s["last_sent"].(float64) - if !ok { - return 0, fmt.Errorf("unrecognized last_sent: %v", s["last_sent"]) - } - if lastSent > maxLastSent { - maxLastSent = lastSent + maxLastSent := uint64(0) + for _, s := range o.Subscriptions { + if s.QueueName == queueNameCombo && s.LastSent > maxLastSent { + maxLastSent = s.LastSent } } - return int64(lastSeq) - int64(maxLastSent), nil + return int64(o.LastSeq - maxLastSent), nil } // queueNameCombo := {durableName}:{queueGroup} diff --git a/shared/symbol/symbol.go b/shared/symbol/symbol.go index 82f7494e..08a94538 100644 --- a/shared/symbol/symbol.go +++ b/shared/symbol/symbol.go @@ -4,7 +4,6 @@ const ( Error = "⚠" // Failed = "✖" Pending = "◷" - Rate = "Δ" Total = "Σ" // Running = "●" // Succeeded = "✔" diff --git a/test/db-e2e/db_sink_test.go b/test/db-e2e/db_sink_test.go index f428c0c1..85fbd181 100644 --- a/test/db-e2e/db_sink_test.go +++ b/test/db-e2e/db_sink_test.go @@ -11,10 +11,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestDBSink(t *testing.T) { defer Setup(t)() @@ -90,8 +90,8 @@ func TestDBSink(t *testing.T) { defer StartPortForward("db-main-0")() SendMessageViaHTTP(`{"message": "hello", "number": 100}`) - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) // TODO: verify the table records. diff --git a/test/db-e2e/db_source_test.go b/test/db-e2e/db_source_test.go index 76f048d5..1eda3813 100644 --- a/test/db-e2e/db_source_test.go +++ b/test/db-e2e/db_source_test.go @@ -102,8 +102,8 @@ func TestDBSource(t *testing.T) { SendMessageViaHTTP(`{"message": "msg2", "number": 102}`) SendMessageViaHTTP(`{"message": "msg3", "number": 103}`) - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(3)) + WaitForSunkMessages() + WaitForTotalSunkMessages(3) DeletePipelines() WaitForPodsToBeDeleted() @@ -158,8 +158,8 @@ func cleanupDB() { defer StartPortForward("cleanup-db-main-0")() SendMessageViaHTTP(`hello`) - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/e2e/cat_step_test.go b/test/e2e/cat_step_test.go index 907a4878..b0ef31ae 100644 --- a/test/e2e/cat_step_test.go +++ b/test/e2e/cat_step_test.go @@ -10,10 +10,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestCatStep(t *testing.T) { defer Setup(t)() @@ -38,8 +38,8 @@ func TestCatStep(t *testing.T) { SendMessageViaHTTP("foo-bar") - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", `foo-bar`) diff --git a/test/e2e/container_step_test.go b/test/e2e/container_step_test.go index 6805f6c5..736a9677 100644 --- a/test/e2e/container_step_test.go +++ b/test/e2e/container_step_test.go @@ -36,8 +36,8 @@ func TestContainerStep(t *testing.T) { SendMessageViaHTTP("foo-bar") - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/e2e/cron_test.go b/test/e2e/cron_test.go index bf5a6bc4..9e323fae 100644 --- a/test/e2e/cron_test.go +++ b/test/e2e/cron_test.go @@ -25,7 +25,9 @@ func TestCronSource(t *testing.T) { }, }) WaitForPipeline() - WaitForPipeline(UntilSunkMessages) + WaitForPod() + defer StartPortForward("cron-main-0")() + WaitForSunkMessages() DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/e2e/dedupe_test.go b/test/e2e/dedupe_test.go index 9918b067..3c03e742 100644 --- a/test/e2e/dedupe_test.go +++ b/test/e2e/dedupe_test.go @@ -47,10 +47,10 @@ func TestDedupe(t *testing.T) { SendMessageViaHTTP("baz") SendMessageViaHTTP("baz") - WaitForStep(TotalSourceMessages(6)) - WaitForStep(TotalSunkMessages(4)) + WaitForTotalSourceMessages(6) + WaitForTotalSunkMessages(4) - ExpectMetric("duplicate_messages", 2, 8080) + ExpectMetric("duplicate_messages", Eq(2), 8080) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/e2e/expand_step_test.go b/test/e2e/expand_step_test.go index 73e618b0..2c2e1370 100644 --- a/test/e2e/expand_step_test.go +++ b/test/e2e/expand_step_test.go @@ -33,8 +33,8 @@ func TestExpandStep(t *testing.T) { SendMessageViaHTTP(`{"foo.bar": "baz"}`) - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", `"foo\\":`) diff --git a/test/e2e/filter_step_test.go b/test/e2e/filter_step_test.go index eaa6aff1..15050fd1 100644 --- a/test/e2e/filter_step_test.go +++ b/test/e2e/filter_step_test.go @@ -34,8 +34,8 @@ func TestFilterStep(t *testing.T) { SendMessageViaHTTP("foo-bar") SendMessageViaHTTP("baz-qux") - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", `foo-bar`) diff --git a/test/e2e/flatten_step_test.go b/test/e2e/flatten_step_test.go index 4a4b83f8..e277daf5 100644 --- a/test/e2e/flatten_step_test.go +++ b/test/e2e/flatten_step_test.go @@ -33,8 +33,8 @@ func TestFlattenStep(t *testing.T) { SendMessageViaHTTP(`{"foo": {"bar": "baz"}}`) - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", `foo.bar`) diff --git a/test/e2e/git_step_test.go b/test/e2e/git_step_test.go index 1f28bcb0..8c492e54 100644 --- a/test/e2e/git_step_test.go +++ b/test/e2e/git_step_test.go @@ -43,8 +43,8 @@ func TestGitStep(t *testing.T) { SendMessageViaHTTP("foo-bar") - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", `hi foo-bar`) diff --git a/test/e2e/golang_code_step_test.go b/test/e2e/golang_code_step_test.go index caff2361..e22f1e2f 100644 --- a/test/e2e/golang_code_step_test.go +++ b/test/e2e/golang_code_step_test.go @@ -42,7 +42,7 @@ func Handler(ctx context.Context, m []byte) ([]byte, error) { SendMessageViaHTTP("foo-bar") - WaitForStep(TotalSunkMessages(1)) + WaitForTotalSunkMessages(1) ExpectLogLine("main", `hi! foo-bar`) diff --git a/test/e2e/http_test.go b/test/e2e/http_test.go index 94d1e20d..2d11e891 100644 --- a/test/e2e/http_test.go +++ b/test/e2e/http_test.go @@ -34,7 +34,7 @@ func TestHTTP(t *testing.T) { SendMessageViaHTTP("my-msg") - WaitForPipeline(UntilSunkMessages) + WaitForSunkMessages() WaitForCounter(1, 1) DeletePipelines() diff --git a/test/e2e/java_code_step_test.go b/test/e2e/java_code_step_test.go index 62f096c2..a72ce824 100644 --- a/test/e2e/java_code_step_test.go +++ b/test/e2e/java_code_step_test.go @@ -42,7 +42,7 @@ public class Handler { SendMessageViaHTTP("foo-bar") - WaitForStep(TotalSunkMessages(1)) + WaitForTotalSunkMessages(1) ExpectLogLine("main", `hi! foo-bar`) diff --git a/test/e2e/map_step_test.go b/test/e2e/map_step_test.go index ded5f8bd..4c70917a 100644 --- a/test/e2e/map_step_test.go +++ b/test/e2e/map_step_test.go @@ -33,8 +33,8 @@ func TestMapStep(t *testing.T) { SendMessageViaHTTP("foo-bar") - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", `hi! foo-bar`) diff --git a/test/e2e/messages_test.go b/test/e2e/messages_test.go index 4052c89b..7020aacf 100644 --- a/test/e2e/messages_test.go +++ b/test/e2e/messages_test.go @@ -33,8 +33,6 @@ curl -H "Authorization: $(cat /var/run/argo-dataflow/authorization)" http://loca }, }) - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) - + WaitForPipeline() ExpectLogLine("main", `foo-bar`) } diff --git a/test/e2e/metrics_test.go b/test/e2e/metrics_test.go index 9db3d162..5bda7798 100644 --- a/test/e2e/metrics_test.go +++ b/test/e2e/metrics_test.go @@ -41,21 +41,21 @@ func TestMetrics(t *testing.T) { sendHTTPMsag("my-msg") - WaitForStep(NothingPending) - WaitForStep(TotalSourceMessages(1)) - WaitForStep(func(s Step) bool { return s.Status.SinkStatues.GetPending() == 0 }) + WaitForSunkMessages() + WaitForTotalSourceMessages(1) - ExpectMetric("input_inflight", 0) - ExpectMetric("version_major", 0) - ExpectMetric("version_minor", 0) - ExpectMetric("version_patch", 0) - ExpectMetric("replicas", 1) - ExpectMetric("sources_errors", 1) - ExpectMetric("sources_total", 1) - ExpectMetric("sources_retries", 2) - ExpectMetric("sources_totalBytes", 6) + ExpectMetric("input_inflight", Eq(0)) + ExpectMetric("version_major", Eq(0)) + ExpectMetric("version_minor", Eq(0)) + ExpectMetric("version_patch", Eq(0)) + ExpectMetric("replicas", Eq(1)) + ExpectMetric("sources_errors", Eq(1)) + ExpectMetric("sources_total", Eq(1)) + ExpectMetric("sources_retries", Eq(2)) + ExpectMetric("sources_totalBytes", Eq(6)) + ExpectMetric("sinks_total", Eq(3)) sendHTTPMsag("my-msg") - ExpectMetric("sources_totalBytes", 12) + ExpectMetric("sources_totalBytes", Eq(12)) } func sendHTTPMsag(msg string) { diff --git a/test/e2e/python_code_step_test.go b/test/e2e/python_code_step_test.go index 9e570668..061d28de 100644 --- a/test/e2e/python_code_step_test.go +++ b/test/e2e/python_code_step_test.go @@ -38,7 +38,7 @@ func TestPythonCodeStep(t *testing.T) { SendMessageViaHTTP("foo-bar") - WaitForStep(TotalSunkMessages(1)) + WaitForTotalSunkMessages(1) ExpectLogLine("main", `hi! foo-bar`) diff --git a/test/e2e/volume_sink_test.go b/test/e2e/volume_sink_test.go index 2b76534d..c375ba6d 100644 --- a/test/e2e/volume_sink_test.go +++ b/test/e2e/volume_sink_test.go @@ -43,8 +43,8 @@ func Handler(ctx context.Context, m []byte) ([]byte, error) { defer StartPortForward("volume-main-0")() SendMessageViaHTTP("my-msg") - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/e2e/volume_source_test.go b/test/e2e/volume_source_test.go index 82f3a174..c53ba0f0 100644 --- a/test/e2e/volume_source_test.go +++ b/test/e2e/volume_source_test.go @@ -45,10 +45,12 @@ func TestVolumeSource(t *testing.T) { }, }) + WaitForPipeline() WaitForPod() + defer StartPortForward("volume-main-0")() - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", "my-content") diff --git a/test/examples/examples_test.go b/test/examples/examples_test.go index 2da6ed90..9131f144 100644 --- a/test/examples/examples_test.go +++ b/test/examples/examples_test.go @@ -18,7 +18,7 @@ func Test_101_hello_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/101-hello-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -30,7 +30,7 @@ func Test_101_two_node_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/101-two-node-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -42,7 +42,7 @@ func Test_102_filter_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/102-filter-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -54,7 +54,7 @@ func Test_102_flatten_expand_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/102-flatten-expand-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -66,7 +66,7 @@ func Test_102_map_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/102-map-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -78,7 +78,7 @@ func Test_103_autoscaling_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/103-autoscaling-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -90,7 +90,7 @@ func Test_103_scaling_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/103-scaling-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -102,7 +102,7 @@ func Test_104_golang1_16_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/104-golang1-16-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -114,7 +114,7 @@ func Test_104_java16_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/104-java16-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -126,7 +126,7 @@ func Test_104_node16_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/104-node16-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -138,7 +138,7 @@ func Test_104_python3_9_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/104-python3-9-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -150,7 +150,7 @@ func Test_106_git_go_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/106-git-go-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -162,7 +162,7 @@ func Test_106_git_nodejs_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/106-git-nodejs-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -174,7 +174,7 @@ func Test_106_git_python_generator_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/106-git-python-generator-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -186,7 +186,7 @@ func Test_106_git_python_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/106-git-python-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -234,7 +234,7 @@ func Test_108_fifos_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/108-fifos-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -246,7 +246,7 @@ func Test_109_group_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/109-group-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -258,7 +258,7 @@ func Test_301_cron_log_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/301-cron-log-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -272,7 +272,7 @@ func Test_301_http_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/301-http-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -284,7 +284,7 @@ func Test_301_kafka_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/301-kafka-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -296,7 +296,7 @@ func Test_301_two_sinks_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/301-two-sinks-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() @@ -308,7 +308,7 @@ func Test_301_two_sources_pipeline(t *testing.T) { CreatePipelineFromFile("../../examples/301-two-sources-pipeline.yaml") WaitForPipeline() - WaitForPipeline(UntilSunkMessages, 90*time.Second) + WaitForPipeline(UntilRunning, 90*time.Second) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/fixtures.go b/test/fixtures.go index f490e41f..f790edf8 100644 --- a/test/fixtures.go +++ b/test/fixtures.go @@ -4,7 +4,6 @@ package test import ( "log" - "os" "runtime/debug" "testing" @@ -26,13 +25,12 @@ var ( stopTestAPIPortForward func() ) -func SkipIfCI(t *testing.T) { - if os.Getenv("CI") == "true" { - t.SkipNow() - } +func init() { + log.Default().SetFlags(0) // no log prefix } func Setup(t *testing.T) (teardown func()) { + log.Printf("\n") DeletePipelines() WaitForPodsToBeDeleted() @@ -40,22 +38,30 @@ func Setup(t *testing.T) (teardown func()) { ResetCount() + log.Printf("\n") log.Printf("🌀 START: %s", t.Name()) + log.Printf("\n") return func() { + log.Printf("\n") stopTestAPIPortForward() + log.Printf("\n") r := recover() // tests should panic on error, we recover so we can run other tests if r != nil { - log.Printf("📄 logs\n") TailLogs() + log.Printf("\n") log.Printf("❌ FAIL: %s %v\n", t.Name(), r) + log.Printf("\n") debug.PrintStack() + log.Printf("\n") t.Fail() } else if t.Failed() { log.Printf("❌ FAIL: %s\n", t.Name()) + log.Printf("\n") TailLogs() } else { log.Printf("✅ PASS: %s\n", t.Name()) + log.Printf("\n") } } } diff --git a/test/http-fmea/http_fmea_test.go b/test/http-fmea/http_fmea_test.go index f1e4f54f..bb4625a7 100644 --- a/test/http-fmea/http_fmea_test.go +++ b/test/http-fmea/http_fmea_test.go @@ -4,17 +4,16 @@ package http_fmea import ( "testing" - "time" . "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" . "github.com/argoproj-labs/argo-dataflow/test" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestHTTPFMEA_PodDeletedDisruption_OneReplica(t *testing.T) { defer Setup(t)() @@ -26,7 +25,7 @@ func TestHTTPFMEA_PodDeletedDisruption_OneReplica(t *testing.T) { Name: "main", Cat: &Cat{}, Sources: []Source{{HTTP: &HTTPSource{}}}, - Sinks: []Sink{DefaultLogSink}, + Sinks: []Sink{{HTTP: &HTTPSink{URL: "http://testapi/count/incr"}}}, }}, }, }) @@ -38,13 +37,17 @@ func TestHTTPFMEA_PodDeletedDisruption_OneReplica(t *testing.T) { // with a single replica, if you loose a replica, you loose service go PumpHTTPTolerantly(n) - WaitForPipeline(UntilSunkMessages) + WaitForPod("http-main-0") + stopPortForward := StartPortForward("http-main-0") + WaitForSunkMessages() + stopPortForward() DeletePod("http-main-0") // delete the pod to see that we recover and continue to process messages WaitForPod("http-main-0") - WaitForStep(TotalSourceMessages(n), 1*time.Minute) - WaitForStep(NoErrors) + WaitForCounter(n, n) + defer StartPortForward("http-main-0")() + WaitForNoErrors() } func TestHTTPFMEA_PodDeletedDisruption_TwoReplicas(t *testing.T) { @@ -58,7 +61,7 @@ func TestHTTPFMEA_PodDeletedDisruption_TwoReplicas(t *testing.T) { Cat: &Cat{}, Replicas: 2, Sources: []Source{{HTTP: &HTTPSource{}}}, - Sinks: []Sink{DefaultLogSink}, + Sinks: []Sink{{HTTP: &HTTPSink{URL: "http://testapi/count/incr"}}}, }}, }, }) @@ -73,11 +76,15 @@ func TestHTTPFMEA_PodDeletedDisruption_TwoReplicas(t *testing.T) { PumpHTTPTolerantly(n) - WaitForPipeline(UntilSunkMessages) + stopPortForward := StartPortForward("http-main-0") + WaitForSunkMessages() + stopPortForward() DeletePod("http-main-0") // delete the pod to see that we continue to process messages WaitForPod("http-main-0") - WaitForStep(TotalSunkMessages(n), 1*time.Minute) - WaitForStep(NoErrors) + defer StartPortForward("http-main-0")() + PumpHTTPTolerantly(n) + WaitForCounter(2*n, 2*n) + WaitForNoErrors() } diff --git a/test/http-stress/http_stress_test.go b/test/http-stress/http_stress_test.go index 9f77a6ba..46801fbf 100644 --- a/test/http-stress/http_stress_test.go +++ b/test/http-stress/http_stress_test.go @@ -4,6 +4,7 @@ package http_stress import ( "testing" + "time" . "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" . "github.com/argoproj-labs/argo-dataflow/test" @@ -13,10 +14,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestHTTPSourceStress(t *testing.T) { defer Setup(t)() @@ -58,7 +59,7 @@ func TestHTTPSourceStress(t *testing.T) { defer StartTPSReporter(t, "main", prefix, n)() go PumpHTTP("https://http-main/sources/default", prefix, n, Params.MessageSize) - WaitForStep(TotalSunkMessages(n), Params.Timeout) + WaitForTotalSunkMessages(n) } func TestHTTPSinkStress(t *testing.T) { @@ -99,5 +100,5 @@ func TestHTTPSinkStress(t *testing.T) { defer StartTPSReporter(t, "main", prefix, n)() go PumpHTTP("https://http-main/sources/default", prefix, n, Params.MessageSize) - WaitForStep(TotalSunkMessages(n*2), Params.Timeout) + WaitForTotalSunkMessages(n, 3*time.Minute) } diff --git a/test/http-stress/test-results.json b/test/http-stress/test-results.json index 9898b92b..0004b128 100644 --- a/test/http-stress/test-results.json +++ b/test/http-stress/test-results.json @@ -1,10 +1,10 @@ { - "TestHTTPSinkStress/.tps": 500, + "TestHTTPSinkStress/.tps": 950, "TestHTTPSinkStress/N=10,messageSize=100.tps": 0, "TestHTTPSinkStress/N=10,messageSize=1000.tps": 500, "TestHTTPSinkStress/messageSize=1000.tps": 450, "TestHTTPSinkStress/replicas=2.tps": 650, - "TestHTTPSourceStress/.tps": 1050, + "TestHTTPSourceStress/.tps": 1300, "TestHTTPSourceStress/N=10,messageSize=100.tps": 0, "TestHTTPSourceStress/N=10,messageSize=1000.tps": 1050, "TestHTTPSourceStress/N=100000.tps": 1000, diff --git a/test/kafka-e2e/kafka_test.go b/test/kafka-e2e/kafka_test.go index 694e93e1..c5c2d188 100644 --- a/test/kafka-e2e/kafka_test.go +++ b/test/kafka-e2e/kafka_test.go @@ -11,9 +11,9 @@ import ( ) //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestKafka(t *testing.T) { defer Setup(t)() @@ -37,10 +37,11 @@ func TestKafka(t *testing.T) { PumpKafkaTopic(topic, 17) - WaitForPipeline(UntilSunkMessages) + defer StartPortForward("kafka-main-0")() + WaitForSunkMessages() - WaitForStep(TotalSourceMessages(17)) - WaitForStep(TotalSunkMessages(17)) + WaitForTotalSourceMessages(17) + WaitForTotalSunkMessages(17) DeletePipelines() WaitForPodsToBeDeleted() @@ -68,10 +69,11 @@ func TestKafkaAutoCommit(t *testing.T) { PumpKafkaTopic(topic, 17) - WaitForPipeline(UntilSunkMessages) + defer StartPortForward("kafka-main-0")() + WaitForSunkMessages() - WaitForStep(TotalSourceMessages(17)) - WaitForStep(TotalSunkMessages(17)) + WaitForTotalSourceMessages(17) + WaitForTotalSunkMessages(17) DeletePipelines() WaitForPodsToBeDeleted() @@ -101,12 +103,12 @@ func TestKafkaAsync(t *testing.T) { PumpKafkaTopic(topic, 17) - WaitForPipeline(UntilSunkMessages) + WaitForSunkMessages() - WaitForStep(TotalSourceMessages(17)) - WaitForStep(TotalSunkMessages(17)) + WaitForTotalSourceMessages(17) + WaitForTotalSunkMessages(17) - ExpectMetric("sinks_kafka_produced_successes", 17) + ExpectMetric("sinks_kafka_produced_successes", Eq(17)) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/kafka-fmea/kafka_fmea_test.go b/test/kafka-fmea/kafka_fmea_test.go index 10212ac9..f35bc86f 100644 --- a/test/kafka-fmea/kafka_fmea_test.go +++ b/test/kafka-fmea/kafka_fmea_test.go @@ -12,9 +12,9 @@ import ( ) //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestKafkaFMEA_PodDeletedDisruption(t *testing.T) { defer Setup(t)() @@ -44,8 +44,9 @@ func TestKafkaFMEA_PodDeletedDisruption(t *testing.T) { DeletePod("kafka-main-0") // delete the pod to see that we recover and continue to process messages WaitForPod("kafka-main-0") - WaitForStep(TotalSunkMessagesBetween(n, n+CommitN*2), 2*time.Minute) - WaitForStep(NoErrors) + ExpectKafkaTopicCount(sinkTopic, n, n+CommitN*2, 2*time.Minute) + defer StartPortForward("kafka-main-0")() + WaitForNoErrors() } func TestKafkaFMEA_KafkaServiceDisruption(t *testing.T) { @@ -54,6 +55,7 @@ func TestKafkaFMEA_KafkaServiceDisruption(t *testing.T) { defer Setup(t)() topic := CreateKafkaTopic() + sinkTopic := CreateKafkaTopic() CreatePipeline(Pipeline{ ObjectMeta: metav1.ObjectMeta{Name: "kafka"}, Spec: PipelineSpec{ @@ -61,7 +63,7 @@ func TestKafkaFMEA_KafkaServiceDisruption(t *testing.T) { Name: "main", Cat: &Cat{}, Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}}}}, - Sinks: []Sink{DefaultLogSink}, + Sinks: []Sink{{Kafka: &KafkaSink{Kafka: Kafka{Topic: sinkTopic}}}}, }}, }, }) @@ -76,8 +78,9 @@ func TestKafkaFMEA_KafkaServiceDisruption(t *testing.T) { RestartStatefulSet("kafka-broker") WaitForPod("kafka-broker-0") - WaitForStep(TotalSunkMessages(n), 3*time.Minute) - WaitForStep(NoErrors) + ExpectKafkaTopicCount(sinkTopic, n, n, 2*time.Minute) + defer StartPortForward("kafka-main-0")() + WaitForNoErrors() ExpectLogLine("main", "Failed to connect to broker kafka-broker:9092") } @@ -85,6 +88,7 @@ func TestKafkaFMEA_PipelineDeletedDisruption(t *testing.T) { defer Setup(t)() topic := CreateKafkaTopic() + sinkTopic := CreateKafkaTopic() pl := Pipeline{ ObjectMeta: metav1.ObjectMeta{Name: "kafka"}, @@ -93,10 +97,7 @@ func TestKafkaFMEA_PipelineDeletedDisruption(t *testing.T) { Name: "main", Cat: &Cat{}, Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic}}}}, - Sinks: []Sink{ - DefaultLogSink, - {HTTP: &HTTPSink{URL: "http://testapi/count/incr"}}, - }, + Sinks: []Sink{{Kafka: &KafkaSink{Kafka: Kafka{Topic: sinkTopic}}}}, }}, }, } @@ -109,11 +110,12 @@ func TestKafkaFMEA_PipelineDeletedDisruption(t *testing.T) { n := 500 * 15 go PumpKafkaTopic(topic, n) - WaitForPipeline(UntilSunkMessages) + defer StartPortForward("kafka-main-0")() + WaitForSunkMessages() DeletePipelines() WaitForPodsToBeDeleted() CreatePipeline(pl) - WaitForCounter(n, n+CommitN*2) + ExpectKafkaTopicCount(sinkTopic, n, n+CommitN*2, time.Minute) } diff --git a/test/kafka-stress/kafka_stress_test.go b/test/kafka-stress/kafka_stress_test.go index 74114634..d7666edf 100644 --- a/test/kafka-stress/kafka_stress_test.go +++ b/test/kafka-stress/kafka_stress_test.go @@ -14,9 +14,9 @@ import ( ) //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestKafkaSourceStress(t *testing.T) { defer Setup(t)() @@ -54,7 +54,7 @@ func TestKafkaSourceStress(t *testing.T) { defer StartTPSReporter(t, "main", prefix, n)() go PumpKafkaTopic(topic, n, prefix, Params.MessageSize) - WaitForStep(TotalSunkMessages(n), Params.Timeout) + WaitForTotalSunkMessages(n, Params.Timeout) } func TestKafkaSinkStress(t *testing.T) { @@ -97,5 +97,5 @@ func TestKafkaSinkStress(t *testing.T) { defer StartTPSReporter(t, "main", prefix, n)() go PumpKafkaTopic(topic, n, prefix, Params.MessageSize) - WaitForStep(TotalSunkMessages(n*2), Params.Timeout) // 2 sinks + WaitForTotalSunkMessages(n, Params.Timeout) } diff --git a/test/kafka-stress/test-results.json b/test/kafka-stress/test-results.json index d34851e7..76afd9ff 100644 --- a/test/kafka-stress/test-results.json +++ b/test/kafka-stress/test-results.json @@ -1,11 +1,11 @@ { - "TestKafkaSinkStress/.tps": 350, + "TestKafkaSinkStress/.tps": 650, "TestKafkaSinkStress/N=10,messageSize=100.tps": 200, "TestKafkaSinkStress/N=10,messageSize=1000.tps": 150, "TestKafkaSinkStress/async=true.tps": 400, "TestKafkaSinkStress/messageSize=1000.tps": 300, "TestKafkaSinkStress/replicas=2.tps": 400, - "TestKafkaSourceStress/.tps": 1100, + "TestKafkaSourceStress/.tps": 1250, "TestKafkaSourceStress/N=10,messageSize=100.tps": 450, "TestKafkaSourceStress/N=10,messageSize=1000.tps": 650, "TestKafkaSourceStress/messageSize=1000.tps": 850, diff --git a/test/kafka.go b/test/kafka.go index c4262055..a1e4148c 100644 --- a/test/kafka.go +++ b/test/kafka.go @@ -3,9 +3,11 @@ package test import ( + "context" "fmt" "log" "math/rand" + "strconv" "time" ) @@ -35,3 +37,28 @@ func PumpKafkaTopic(topic string, n int, opts ...interface{}) { log.Printf("puming Kafka topic %q sleeping %v with %d messages sized %d\n", topic, sleep, n, size) InvokeTestAPI("/kafka/pump-topic?topic=%s&sleep=%v&n=%d&prefix=%s&size=%d", topic, sleep, n, prefix, size) } + +func ExpectKafkaTopicCount(topic string, min, max int, timeout time.Duration) { + log.Printf("expecting count of Kafka topic %q to be %d to %d\n", topic, min, max) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { + select { + case <-ctx.Done(): + panic(fmt.Errorf("timeout waiting for %d to %d messages in topic %q", min, max, topic)) + default: + count, err := strconv.Atoi(InvokeTestAPI("/kafka/count-topic?topic=%s", topic)) + if err != nil { + panic(fmt.Errorf("failed to count topic %q: %w", topic, err)) + } + log.Printf("count of Kafka topic %q is %d\n", topic, count) + if min <= count && count <= max { + return + } + if count > max { + panic(fmt.Errorf("too many messages %d > %d", count, max)) + } + time.Sleep(time.Second) + } + } +} diff --git a/test/log.go b/test/log.go index 1c7085ad..63a7e568 100644 --- a/test/log.go +++ b/test/log.go @@ -134,13 +134,13 @@ func TailLogs() { func tailLogs(podName, containerName string) { ctx := context.Background() - log.Printf("tailing logs for %q/%q\n", podName, containerName) + log.Printf("📄 tailing logs for %q/%q\n", podName, containerName) stream, err := podsInterface.GetLogs(podName, &corev1.PodLogOptions{Container: containerName}).Stream(ctx) if err != nil { panic(err) } defer func() { _ = stream.Close() }() for s := bufio.NewScanner(stream); s.Scan(); { - log.Println(s.Text()) + log.Println("> " + s.Text()) } } diff --git a/test/matchers.go b/test/matchers.go new file mode 100644 index 00000000..2d261846 --- /dev/null +++ b/test/matchers.go @@ -0,0 +1,37 @@ +package test + +import "fmt" + +type matcher struct { + string + match func(w float64) bool +} + +func (m matcher) String() string { return m.string } + +func Eq(v float64) matcher { + return matcher{ + fmt.Sprintf("=%v", v), + func(w float64) bool { + return w == v + }, + } +} + +func Gt(v float64) matcher { + return matcher{ + fmt.Sprintf(">%v", v), + func(w float64) bool { + return w > v + }, + } +} + +func Between(min, max float64) matcher { + return matcher{ + fmt.Sprintf("%v<= && <=%v", min, max), + func(w float64) bool { + return min <= w && w <= max + }, + } +} diff --git a/test/metric.go b/test/metric.go index 69222922..6217046d 100644 --- a/test/metric.go +++ b/test/metric.go @@ -6,37 +6,79 @@ import ( "context" "fmt" "log" + "math/rand" "net/http" + "time" "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" ) -func ExpectMetric(name string, value float64, opts ...interface{}) { +func WaitForNothingPending() { + ExpectMetric("pending", Eq(0)) +} + +func WaitForTotalSourceMessages(v int) { + ExpectMetric("sources_total", Eq(float64(v))) +} + +func WaitForNoErrors() { + ExpectMetric("sources_errors", Eq(missing)) +} + +func WaitForSunkMessages(opts ...interface{}) { + ExpectMetric("sinks_total", Gt(0), opts...) +} + +func WaitForTotalSunkMessages(v int, opts ...interface{}) { + ExpectMetric("sinks_total", Eq(float64(v)), opts...) +} + +var missing = rand.Float64() + +func ExpectMetric(name string, matcher matcher, opts ...interface{}) { ctx := context.Background() port := 3569 + timeout := 30 * time.Second for _, opt := range opts { switch v := opt.(type) { case int: port = v + case time.Duration: + timeout = v default: panic(fmt.Errorf("unsupported option type %T", v)) } } - log.Printf("expect metric %q to be %f on %d\n", name, value, port) - for n, family := range getMetrics(ctx, port) { - if n == name { - for _, m := range family.Metric { - v := getValue(m) - if value == v { - return - } else { - panic(fmt.Errorf("metric %q expected %f, got %f", n, value, v)) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + log.Printf("expect metric %q to be %s within %v\n", name, matcher, timeout.String()) + for { + select { + case <-ctx.Done(): + println(getMetrics(ctx, port)) + panic(fmt.Errorf("failed to wait for metric named %q to be %s: %w", name, matcher, ctx.Err())) + default: + found := false + for n, family := range getMetrics(ctx, port) { + if n == name { + found = true + for _, m := range family.Metric { + v := getValue(m) + if matcher.match(v) { + return + } else { + log.Printf("%s=%v, !%s\n", name, v, matcher.String()) + } + } } } + if !found && matcher.match(missing) { + return + } + time.Sleep(2 * time.Second) } } - panic(fmt.Errorf("metric named %q not found in %q on %d", name, getMetrics(ctx, port), port)) } func getValue(m *io_prometheus_client.Metric) float64 { diff --git a/test/pipeline.go b/test/pipeline.go index f09c2500..8a59c1f7 100644 --- a/test/pipeline.go +++ b/test/pipeline.go @@ -20,9 +20,8 @@ import ( var pipelineInterface = dynamicInterface.Resource(PipelineGroupVersionResource).Namespace(namespace) -func UntilRunning(pl Pipeline) bool { return untilHasCondition(ConditionRunning)(pl) } -func UntilCompleted(pl Pipeline) bool { return untilHasCondition(ConditionCompleted)(pl) } -func UntilSunkMessages(pl Pipeline) bool { return untilHasCondition(ConditionSunkMessages)(pl) } +func UntilRunning(pl Pipeline) bool { return untilHasCondition(ConditionRunning)(pl) } +func UntilCompleted(pl Pipeline) bool { return untilHasCondition(ConditionCompleted)(pl) } func untilHasCondition(condition string) func(pl Pipeline) bool { return func(pl Pipeline) bool { diff --git a/test/pod.go b/test/pod.go index 0d1aae71..6fcd2189 100644 --- a/test/pod.go +++ b/test/pod.go @@ -18,12 +18,12 @@ import ( var podInterface = kubernetesInterface.CoreV1().Pods(namespace) func PodRunningAndReady(p *corev1.Pod) bool { - return p.Status.Phase == corev1.PodRunning && PodReady(p) + return p.GetDeletionTimestamp() == nil && p.Status.Phase == corev1.PodRunning && PodReady(p) } func PodReady(p *corev1.Pod) bool { for _, c := range p.Status.Conditions { - if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { + if p.GetDeletionTimestamp() == nil && c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { return true } } diff --git a/test/s3-e2e/s3_sink_test.go b/test/s3-e2e/s3_sink_test.go index 9fec283e..866b72a9 100644 --- a/test/s3-e2e/s3_sink_test.go +++ b/test/s3-e2e/s3_sink_test.go @@ -10,10 +10,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml func TestS3Sink(t *testing.T) { defer Setup(t)() @@ -51,8 +51,8 @@ func Handler(ctx context.Context, m []byte) ([]byte, error) { defer StartPortForward("s3-main-0")() SendMessageViaHTTP("my-msg") - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + WaitForSunkMessages() + WaitForTotalSunkMessages(1) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/s3-e2e/s3_source_test.go b/test/s3-e2e/s3_source_test.go index 37243fe9..51452de4 100644 --- a/test/s3-e2e/s3_source_test.go +++ b/test/s3-e2e/s3_source_test.go @@ -36,8 +36,9 @@ func TestS3Source(t *testing.T) { WaitForPod() - WaitForPipeline(UntilSunkMessages) - WaitForStep(TotalSunkMessages(1)) + defer StartPortForward("s3-main-0")() + WaitForSunkMessages() + WaitForTotalSunkMessages(1) ExpectLogLine("main", "my-content") diff --git a/test/stan-e2e/stan_test.go b/test/stan-e2e/stan_test.go index ef838a14..177a14ae 100644 --- a/test/stan-e2e/stan_test.go +++ b/test/stan-e2e/stan_test.go @@ -10,9 +10,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/stan.yaml func TestSTAN(t *testing.T) { @@ -40,10 +40,11 @@ func TestSTAN(t *testing.T) { PumpSTANSubject(longSubject, 7) - WaitForPipeline(UntilSunkMessages) + defer StartPortForward("stan-main-0")() + WaitForSunkMessages() - WaitForStep(TotalSourceMessages(7)) - WaitForStep(TotalSunkMessages(7)) + WaitForTotalSourceMessages(7) + WaitForTotalSunkMessages(7) DeletePipelines() WaitForPodsToBeDeleted() diff --git a/test/stan-fmea/stan_fmea_test.go b/test/stan-fmea/stan_fmea_test.go index 567c09a1..b14c20f1 100644 --- a/test/stan-fmea/stan_fmea_test.go +++ b/test/stan-fmea/stan_fmea_test.go @@ -11,16 +11,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/stan.yaml func TestStanFMEA_PodDeletedDisruption(t *testing.T) { defer Setup(t)() longSubject, subject := RandomSTANSubject() - _, sinkSubject := RandomSTANSubject() + longSinkSubject, sinkSubject := RandomSTANSubject() CreatePipeline(Pipeline{ ObjectMeta: metav1.ObjectMeta{Name: "stan"}, @@ -41,18 +41,24 @@ func TestStanFMEA_PodDeletedDisruption(t *testing.T) { n := 500 * 15 go PumpSTANSubject(longSubject, n) - WaitForPipeline(UntilSunkMessages) + stopPortForward := StartPortForward("stan-main-0") + WaitForSunkMessages() + stopPortForward() DeletePod("stan-main-0") // delete the pod to see that we recover and continue to process messages WaitForPod("stan-main-0") - WaitForStep(TotalSunkMessagesBetween(n, n+1), 1*time.Minute) + defer StartPortForward("stan-main-0")() + ExpectSTANSubjectCount(longSinkSubject, n, n+1, time.Minute) + WaitForNoErrors() } func TestStanFMEA_STANServiceDisruption(t *testing.T) { + t.SkipNow() // TODO defer Setup(t)() longSubject, subject := RandomSTANSubject() + longSinkSubject, sinkSubject := RandomSTANSubject() CreatePipeline(Pipeline{ ObjectMeta: metav1.ObjectMeta{Name: "stan"}, @@ -61,7 +67,7 @@ func TestStanFMEA_STANServiceDisruption(t *testing.T) { Name: "main", Cat: &Cat{}, Sources: []Source{{STAN: &STAN{Subject: subject}}}, - Sinks: []Sink{DefaultLogSink}, + Sinks: []Sink{{STAN: &STAN{Subject: sinkSubject}}}, }}, }, }) @@ -73,13 +79,16 @@ func TestStanFMEA_STANServiceDisruption(t *testing.T) { n := 500 * 15 go PumpSTANSubject(longSubject, n) - WaitForPipeline(UntilSunkMessages) + defer StartPortForward("stan-main-0")() + WaitForSunkMessages() RestartStatefulSet("stan") WaitForPod("stan-0") - WaitForStep(TotalSunkMessagesBetween(n, n+CommitN), 1*time.Minute) - WaitForStep(NoErrors) + WaitForPod("stan-main-0") + + ExpectSTANSubjectCount(longSinkSubject, n, n+CommitN, time.Minute) + WaitForNoErrors() } // when deleted and re-created, the pipeline should start at the same place in the queue @@ -87,6 +96,7 @@ func TestStanFMEA_PipelineDeletionDisruption(t *testing.T) { defer Setup(t)() longSubject, subject := RandomSTANSubject() + longSinkSubject, sinkSubject := RandomSTANSubject() pl := Pipeline{ ObjectMeta: metav1.ObjectMeta{Name: "stan"}, @@ -95,25 +105,28 @@ func TestStanFMEA_PipelineDeletionDisruption(t *testing.T) { Name: "main", Cat: &Cat{}, Sources: []Source{{STAN: &STAN{Subject: subject}}}, - Sinks: []Sink{{HTTP: &HTTPSink{URL: "http://testapi/count/incr"}}}, + Sinks: []Sink{{STAN: &STAN{Subject: sinkSubject}}}, }}, }, } CreatePipeline(pl) - WaitForPipeline() - WaitForPod() n := 500 * 15 go PumpSTANSubject(longSubject, n) - WaitForPipeline(UntilSunkMessages) + stopPortForward := StartPortForward("stan-main-0") + WaitForSunkMessages() + stopPortForward() DeletePipelines() WaitForPodsToBeDeleted() CreatePipeline(pl) - WaitForCounter(n, n+CommitN) - WaitForStep(NoErrors) + WaitForPipeline() + WaitForPod() + defer StartPortForward("stan-main-0")() + ExpectSTANSubjectCount(longSinkSubject, n, n+CommitN, time.Minute) + WaitForNoErrors() } diff --git a/test/stan-stress/stan_stress_test.go b/test/stan-stress/stan_stress_test.go index f9415179..a0f386cf 100644 --- a/test/stan-stress/stan_stress_test.go +++ b/test/stan-stress/stan_stress_test.go @@ -13,9 +13,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml -//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml +//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml //go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/stan.yaml func TestStanSourceStress(t *testing.T) { @@ -53,7 +53,7 @@ func TestStanSourceStress(t *testing.T) { defer StartTPSReporter(t, "main", prefix, n)() go PumpSTANSubject(longSubject, n, prefix, Params.MessageSize) - WaitForStep(TotalSunkMessages(n), Params.Timeout) + WaitForTotalSunkMessages(n, Params.Timeout) } func TestStanSinkStress(t *testing.T) { @@ -91,5 +91,5 @@ func TestStanSinkStress(t *testing.T) { defer StartTPSReporter(t, "main", prefix, n)() go PumpSTANSubject(longSubject, n, prefix, Params.MessageSize) - WaitForStep(TotalSunkMessages(n*2), Params.Timeout) // 2 sinks + WaitForTotalSunkMessages(n, Params.Timeout) } diff --git a/test/stan-stress/test-results.json b/test/stan-stress/test-results.json index dc6fab2b..e26fbf13 100644 --- a/test/stan-stress/test-results.json +++ b/test/stan-stress/test-results.json @@ -1,5 +1,5 @@ { - "TestStanSinkStress/.tps": 200, + "TestStanSinkStress/.tps": 350, "TestStanSinkStress/N=10,messageSize=100.tps": 150, "TestStanSinkStress/currentContext=gke_jesse-sb_us-central1-c_dataflow-test,N=10,messageSize=5000000.tps": 200, "TestStanSinkStress/currentContext=gke_jesse-sb_us-central1-c_dataflow-test,N=100,messageSize=10.tps": 350, @@ -10,7 +10,7 @@ "TestStanSinkStress/replicas=3.tps": 300, "TestStanSinkStress/replicas=4.tps": 300, "TestStanSinkStress/replicas=5.tps": 350, - "TestStanSourceStress/.tps": 1050, + "TestStanSourceStress/.tps": 1800, "TestStanSourceStress/N=10,messageSize=100.tps": 850, "TestStanSourceStress/N=10,messageSize=1000.tps": 400, "TestStanSourceStress/currentContext=gke_jesse-sb_us-central1-c_dataflow-test,N=10,messageSize=5000000.tps": 40300, diff --git a/test/stan.go b/test/stan.go index 0002f8c6..1716f9ce 100644 --- a/test/stan.go +++ b/test/stan.go @@ -3,9 +3,11 @@ package test import ( + "context" "fmt" "log" "math/rand" + "strconv" "time" ) @@ -34,3 +36,28 @@ func PumpSTANSubject(subject string, n int, opts ...interface{}) { log.Printf("puming stan subject %q sleeping %v with %d messages sized %d\n", subject, sleep, n, size) InvokeTestAPI("/stan/pump-subject?subject=%s&sleep=%v&n=%d&prefix=%s&size=%d", subject, sleep, n, prefix, size) } + +func ExpectSTANSubjectCount(subject string, min, max int, timeout time.Duration) { + log.Printf("expecting count of STAN subject %q to be %d to %d\n", subject, min, max) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { + select { + case <-ctx.Done(): + panic(fmt.Errorf("timeout waiting for %d to %d messages in subject %q", min, max, subject)) + default: + count, err := strconv.Atoi(InvokeTestAPI("/stan/count-subject?subject=%s", subject)) + if err != nil { + panic(fmt.Errorf("failed to count subject %q: %w", subject, err)) + } + log.Printf("count of STAN subject %q is %d\n", subject, count) + if min <= count && count <= max { + return + } + if count > max { + panic(fmt.Errorf("too many messages %d > %d", count, max)) + } + time.Sleep(time.Second) + } + } +} diff --git a/test/step.go b/test/step.go index a992fca4..7379cc71 100644 --- a/test/step.go +++ b/test/step.go @@ -21,34 +21,6 @@ import ( var stepInterface = dynamicInterface.Resource(StepGroupVersionResource).Namespace(namespace) -func NoErrors(s Step) bool { - return s.Status.SourceStatuses.GetErrors() == 0 -} - -func NothingPending(s Step) bool { - return s.Status.SourceStatuses.GetPending() == 0 -} - -func TotalSourceMessagesFunc(f func(int) bool) func(s Step) bool { - return func(s Step) bool { return f(int(s.Status.SourceStatuses.GetTotal())) } -} - -func TotalSourceMessages(n int) func(s Step) bool { - return TotalSourceMessagesFunc(func(t int) bool { return t == n }) -} - -func TotalSunkMessagesFunc(f func(int) bool) func(s Step) bool { - return func(s Step) bool { return f(int(s.Status.SinkStatues.GetTotal())) } -} - -func TotalSunkMessages(n int) func(s Step) bool { - return TotalSunkMessagesBetween(n, n) -} - -func TotalSunkMessagesBetween(n, m int) func(s Step) bool { - return TotalSunkMessagesFunc(func(t int) bool { return n <= t && t <= m }) -} - func WaitForStep(opts ...interface{}) { var ( listOptions = metav1.ListOptions{} diff --git a/test/testapi.go b/test/testapi.go index 207a81c7..c708c4ca 100644 --- a/test/testapi.go +++ b/test/testapi.go @@ -13,19 +13,19 @@ import ( func InvokeTestAPI(format string, args ...interface{}) string { url := "http://localhost:8378" + fmt.Sprintf(format, args...) - log.Printf("getting test API %s\n", url) + log.Printf("GET %s\n", url) r, err := http.Get(url) if err != nil { panic(err) } - log.Printf("test API: %s\n", r.Status) + log.Printf("> %s\n", r.Status) body := "" for s := bufio.NewScanner(r.Body); s.Scan(); { x := s.Text() if strings.Contains(x, "ERROR") { // hacky way to return an error from an octet-stream panic(errors.New(x)) } - log.Printf("test API: %s\n", x) + log.Printf("> %s\n", x) body += x } if r.StatusCode >= 300 { diff --git a/testapi/kafka.go b/testapi/kafka.go index 4de8956b..44d3a053 100644 --- a/testapi/kafka.go +++ b/testapi/kafka.go @@ -39,6 +39,41 @@ func init() { } w.WriteHeader(201) }) + http.HandleFunc("/kafka/count-topic", func(w http.ResponseWriter, r *http.Request) { + topics := r.URL.Query()["topic"] + if len(topics) < 1 { + w.WriteHeader(400) + return + } + topic := topics[0] + client, err := sarama.NewClient(addrs, config) + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + return + } + + partitions, err := client.Partitions(topic) + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + return + } + + count := 0 + for _, p := range partitions { + offset, err := client.GetOffset(topic, p, sarama.OffsetNewest) + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + return + } + count += int(offset) + } + + w.WriteHeader(200) + _, _ = w.Write([]byte(strconv.Itoa(count))) + }) http.HandleFunc("/kafka/pump-topic", func(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") mf := newMessageFactory(r.URL.Query()) diff --git a/testapi/stan.go b/testapi/stan.go index 96c0553b..7c0acc42 100644 --- a/testapi/stan.go +++ b/testapi/stan.go @@ -1,11 +1,14 @@ package main import ( + "encoding/json" "fmt" "net/http" "strconv" "time" + "github.com/nats-io/nats-streaming-server/server" + "github.com/nats-io/nats.go" "github.com/nats-io/stan.go" ) @@ -81,4 +84,26 @@ func init() { } _, _ = fmt.Fprintf(w, "sent %d messages of size %d at %.0f TPS to %q\n", n, mf.size, float64(n)/time.Since(start).Seconds(), subject) }) + http.HandleFunc("/stan/count-subject", func(w http.ResponseWriter, r *http.Request) { + subject := r.URL.Query().Get("subject") + resp, err := http.Get("http://stan:8222/streaming/channelsz?channel=" + subject) + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(fmt.Sprintf("failed to request STAN channelz: %v", err))) + return + } + if resp.StatusCode != 200 { + w.WriteHeader(500) + _, _ = w.Write([]byte(fmt.Sprintf("failed to request STAN channelz: %s", resp.Status))) + return + } + defer func() { _ = resp.Body.Close() }() + o := server.Channelz{} + if err := json.NewDecoder(resp.Body).Decode(&o); err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + return + } + _, _ = fmt.Fprintf(w, "%d", o.LastSeq-o.FirstSeq) + }) }