Skip to content

Commit

Permalink
feat!: Remove SunkMessages condition. Add sinks_total metric. (#328)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Sep 16, 2021
1 parent 4168dd0 commit 69e608a
Show file tree
Hide file tree
Showing 58 changed files with 475 additions and 287 deletions.
9 changes: 3 additions & 6 deletions api/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ import (

const (
// conditions
ConditionCompleted = "Completed" // the pipeline completed
ConditionRunning = "Running" // added if any step is currently running
// DEPRECATED: This is likely to be removed in future versions as in depends on metrics in source/sink status, and
// we plan to remove that.
ConditionSunkMessages = "SunkMessages" // added if any messages have been written to a sink for any step
ConditionTerminating = "Terminating" // added if any terminator step terminated
ConditionCompleted = "Completed" // the pipeline completed
ConditionRunning = "Running" // added if any step is currently running
ConditionTerminating = "Terminating" // added if any terminator step terminated
// container names
CtrInit = "init"
CtrMain = "main"
Expand Down
19 changes: 0 additions & 19 deletions api/v1alpha1/source_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,6 @@ func (in SourceStatuses) GetTotal() uint64 {
return v
}

// DEPRECATED: This is likely to be removed in future versions.
func (in SourceStatuses) AnySunk() bool {
for _, s := range in {
if s.AnySunk() {
return true
}
}
return false
}

// IncrRetries increase the retry_count metrics by 1
// DEPRECATED: This is likely to be removed in future versions.
func (in SourceStatuses) IncrRetries(name string, replica int) {
Expand All @@ -111,12 +101,3 @@ func (in SourceStatuses) IncrRetries(name string, replica int) {
x.Metrics[strconv.Itoa(replica)] = m
in[name] = x
}

// DEPRECATED: This is likely to be removed in future versions.
func (in SourceStatuses) GetTotalBytes() uint64 {
var v uint64
for _, s := range in {
v += s.GetTotalBytes()
}
return v
}
4 changes: 4 additions & 0 deletions config/apps/stan.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ spec:
name: monitor
- containerPort: 7777
name: metrics
readinessProbe:
httpGet:
path: /streaming/channelsz
port: 8222
volumeMounts:
- mountPath: /etc/stan-config
name: config-volume
Expand Down
6 changes: 6 additions & 0 deletions config/apps/stan/stan-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ spec:
args:
- -fvN
- nats:8222
containers:
- name: stan
readinessProbe:
httpGet:
port: 8222
path: /streaming/channelsz
4 changes: 4 additions & 0 deletions docs/METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ Only exposed by replica 0.

Golden metric type: traffic.

### sinks_total

Use this to track throughput. Includes retries and errors.

### sources_errors

Use this to track errors.
Expand Down
3 changes: 2 additions & 1 deletion examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

. "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"

sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -70,5 +71,5 @@ func getWaitFor(un metav1.Object) string {
if v := un.GetAnnotations()["dataflow.argoproj.io/wait-for"]; v != "" {
return v
}
return ConditionSunkMessages
return ConditionRunning
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/klauspost/compress v1.13.1 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mitchellh/copystructure v1.1.1 // indirect
github.com/nats-io/nats-streaming-server v0.21.1 // indirect
github.com/nats-io/nats-streaming-server v0.21.1
github.com/nats-io/nats.go v1.11.0
github.com/nats-io/stan.go v0.8.3
github.com/onsi/ginkgo v1.14.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8=
github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s=
github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
Expand Down Expand Up @@ -555,6 +556,7 @@ github.com/onsi/gomega v1.10.2 h1:aY/nuoWlKJud2J6U0E3NWsjlg+0GtwXxgEqthRdzlcs=
github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
10 changes: 4 additions & 6 deletions manager/controllers/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
pending, running, succeeded, failed := 0, 0, 0, 0
newStatus := *pipeline.Status.DeepCopy()
newStatus.Phase = dfv1.PipelineUnknown
terminate, sunkMessages := false, false
terminate := false
for _, step := range steps.Items {
stepName := step.Spec.Name
if !pipeline.Spec.HasStep(stepName) { // this happens when a pipeline changes and a step is removed
Expand All @@ -148,7 +148,6 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
panic("should never happen")
}
terminate = terminate || step.Status.Phase.Completed() && step.Spec.Terminator
sunkMessages = sunkMessages || step.Status.SinkStatues.AnySunk()
}

if newStatus.Phase.Completed() {
Expand All @@ -173,10 +172,9 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
newStatus.Message = strings.Join(ss, ", ")

for c, ok := range map[string]bool{
dfv1.ConditionRunning: newStatus.Phase == dfv1.PipelineRunning,
dfv1.ConditionCompleted: newStatus.Phase.Completed(),
dfv1.ConditionSunkMessages: sunkMessages,
dfv1.ConditionTerminating: terminate,
dfv1.ConditionRunning: newStatus.Phase == dfv1.PipelineRunning,
dfv1.ConditionCompleted: newStatus.Phase.Completed(),
dfv1.ConditionTerminating: terminate,
} {
if ok {
meta.SetStatusCondition(&newStatus.Conditions, metav1.Condition{Type: c, Status: metav1.ConditionTrue, Reason: c})
Expand Down
8 changes: 6 additions & 2 deletions runner/sidecar/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ func connectIn(ctx context.Context, sink func(context.Context, []byte) error) (f
}, nil
} else if in.HTTP != nil {
logger.Info("HTTP in interface configured")
if err := waitReady(ctx); err != nil {
return nil, err
if len(step.Spec.Sources) > 0 {
if err := waitReady(ctx); err != nil {
return nil, err
}
} else {
logger.Info("not waiting for HTTP to be read, this maybe a generator step and so may never be ready")
}
addStopHook(waitUnready)
return func(ctx context.Context, data []byte) error {
Expand Down
9 changes: 9 additions & 0 deletions runner/sidecar/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"io"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

volumesink "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/volume"

s3sink "github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink/s3"
Expand All @@ -20,6 +23,11 @@ import (

func connectSinks(ctx context.Context) (func(context.Context, []byte) error, error) {
sinks := map[string]sink.Interface{}
totalCounter := promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sinks",
Name: "total",
Help: "Total number of messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sinks_total",
}, []string{"sinkName", "replica"})
for _, s := range step.Spec.Sinks {
logger.Info("connecting sink", "sink", sharedutil.MustJSON(s))
sinkName := s.Name
Expand Down Expand Up @@ -78,6 +86,7 @@ func connectSinks(ctx context.Context) (func(context.Context, []byte) error, err

return func(ctx context.Context, msg []byte) error {
for sinkName, f := range sinks {
totalCounter.WithLabelValues(sinkName, fmt.Sprint(replica)).Inc()
withLock(func() {
step.Status.SinkStatues.IncrTotal(sinkName, replica, uint64(len(msg)))
})
Expand Down
40 changes: 12 additions & 28 deletions runner/sidecar/source/stan/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net/http"
"time"

"github.com/nats-io/nats-streaming-server/server"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
sharedstan "github.com/argoproj-labs/argo-dataflow/runner/sidecar/shared/stan"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
Expand Down Expand Up @@ -127,13 +129,11 @@ func (s stanSource) Close() error {
return s.conn.Close()
}

func (s stanSource) GetPending(ctx context.Context) (uint64, error) {
httpClient := http.Client{
Timeout: time.Second * 3,
}

type obj = map[string]interface{}
var httpClient = http.Client{
Timeout: time.Second * 3,
}

func (s stanSource) GetPending(ctx context.Context) (uint64, error) {
pendingMessages := func(ctx context.Context, channel, queueNameCombo string) (int64, error) {
monitoringEndpoint := fmt.Sprintf("%s/streaming/channelsz?channel=%s&subs=1", s.natsMonitoringURL, channel)
req, err := http.NewRequestWithContext(ctx, "GET", monitoringEndpoint, nil)
Expand All @@ -148,33 +148,17 @@ func (s stanSource) GetPending(ctx context.Context) (uint64, error) {
return 0, fmt.Errorf("invalid response: %s", resp.Status)
}
defer func() { _ = resp.Body.Close() }()
o := make(obj)
o := server.Channelz{}
if err := json.NewDecoder(resp.Body).Decode(&o); err != nil {
return 0, err
}
lastSeq, ok := o["last_seq"].(float64)
if !ok {
return 0, fmt.Errorf("unrecognized last_seq: %v", o["last_seq"])
}
subs, ok := o["subscriptions"]
if !ok {
return 0, fmt.Errorf("no suscriptions field found in the monitoring endpoint response")
}
maxLastSent := float64(0)
for _, i := range subs.([]interface{}) {
s := i.(obj)
if fmt.Sprintf("%v", s["queue_name"]) != queueNameCombo {
continue
}
lastSent, ok := s["last_sent"].(float64)
if !ok {
return 0, fmt.Errorf("unrecognized last_sent: %v", s["last_sent"])
}
if lastSent > maxLastSent {
maxLastSent = lastSent
maxLastSent := uint64(0)
for _, s := range o.Subscriptions {
if s.QueueName == queueNameCombo && s.LastSent > maxLastSent {
maxLastSent = s.LastSent
}
}
return int64(lastSeq) - int64(maxLastSent), nil
return int64(o.LastSeq - maxLastSent), nil
}

// queueNameCombo := {durableName}:{queueGroup}
Expand Down
1 change: 0 additions & 1 deletion shared/symbol/symbol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const (
Error = "⚠"
// Failed = "✖"
Pending = "◷"
Rate = "Δ"
Total = "Σ"
// Running = "●"
// Succeeded = "✔"
Expand Down
10 changes: 5 additions & 5 deletions test/db-e2e/db_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml
//go:generate kubectl -n argo-dataflow-system apply -f ../../config/apps/mysql.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml

func TestDBSink(t *testing.T) {
defer Setup(t)()
Expand Down Expand Up @@ -90,8 +90,8 @@ func TestDBSink(t *testing.T) {
defer StartPortForward("db-main-0")()
SendMessageViaHTTP(`{"message": "hello", "number": 100}`)

WaitForPipeline(UntilSunkMessages)
WaitForStep(TotalSunkMessages(1))
WaitForSunkMessages()
WaitForTotalSunkMessages(1)

// TODO: verify the table records.

Expand Down
8 changes: 4 additions & 4 deletions test/db-e2e/db_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func TestDBSource(t *testing.T) {
SendMessageViaHTTP(`{"message": "msg2", "number": 102}`)
SendMessageViaHTTP(`{"message": "msg3", "number": 103}`)

WaitForPipeline(UntilSunkMessages)
WaitForStep(TotalSunkMessages(3))
WaitForSunkMessages()
WaitForTotalSunkMessages(3)

DeletePipelines()
WaitForPodsToBeDeleted()
Expand Down Expand Up @@ -158,8 +158,8 @@ func cleanupDB() {
defer StartPortForward("cleanup-db-main-0")()
SendMessageViaHTTP(`hello`)

WaitForPipeline(UntilSunkMessages)
WaitForStep(TotalSunkMessages(1))
WaitForSunkMessages()
WaitForTotalSunkMessages(1)

DeletePipelines()
WaitForPodsToBeDeleted()
Expand Down
12 changes: 6 additions & 6 deletions test/e2e/cat_step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/kafka.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/moto.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/mysql.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f../../config/apps/stan.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/kafka.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/moto.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/mysql.yaml
//go:generate kubectl -n argo-dataflow-system delete --ignore-not-found -f ../../config/apps/stan.yaml

func TestCatStep(t *testing.T) {
defer Setup(t)()
Expand All @@ -38,8 +38,8 @@ func TestCatStep(t *testing.T) {

SendMessageViaHTTP("foo-bar")

WaitForPipeline(UntilSunkMessages)
WaitForStep(TotalSunkMessages(1))
WaitForSunkMessages()
WaitForTotalSunkMessages(1)

ExpectLogLine("main", `foo-bar`)

Expand Down
4 changes: 2 additions & 2 deletions test/e2e/container_step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func TestContainerStep(t *testing.T) {

SendMessageViaHTTP("foo-bar")

WaitForPipeline(UntilSunkMessages)
WaitForStep(TotalSunkMessages(1))
WaitForSunkMessages()
WaitForTotalSunkMessages(1)

DeletePipelines()
WaitForPodsToBeDeleted()
Expand Down
4 changes: 3 additions & 1 deletion test/e2e/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func TestCronSource(t *testing.T) {
},
})
WaitForPipeline()
WaitForPipeline(UntilSunkMessages)
WaitForPod()
defer StartPortForward("cron-main-0")()
WaitForSunkMessages()

DeletePipelines()
WaitForPodsToBeDeleted()
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ func TestDedupe(t *testing.T) {
SendMessageViaHTTP("baz")
SendMessageViaHTTP("baz")

WaitForStep(TotalSourceMessages(6))
WaitForStep(TotalSunkMessages(4))
WaitForTotalSourceMessages(6)
WaitForTotalSunkMessages(4)

ExpectMetric("duplicate_messages", 2, 8080)
ExpectMetric("duplicate_messages", Eq(2), 8080)

DeletePipelines()
WaitForPodsToBeDeleted()
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/expand_step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestExpandStep(t *testing.T) {

SendMessageViaHTTP(`{"foo.bar": "baz"}`)

WaitForPipeline(UntilSunkMessages)
WaitForStep(TotalSunkMessages(1))
WaitForSunkMessages()
WaitForTotalSunkMessages(1)

ExpectLogLine("main", `"foo\\":`)

Expand Down
Loading

0 comments on commit 69e608a

Please sign in to comment.