Skip to content

Commit

Permalink
fix: Auto create HTTP auth secret. Fixes #319 (#353)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Sep 20, 2021
1 parent 8d9b1ae commit 124d6cb
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 38 deletions.
1 change: 1 addition & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13944,6 +13944,7 @@ rules:
resources:
- secrets
verbs:
- create
- get
---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
1 change: 1 addition & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13944,6 +13944,7 @@ rules:
resources:
- secrets
verbs:
- create
- get
---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
1 change: 1 addition & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13944,6 +13944,7 @@ rules:
resources:
- secrets
verbs:
- create
- get
---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
1 change: 1 addition & 0 deletions config/rbac/pipeline-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ rules:
resources:
- secrets
verbs:
- create
- get
13 changes: 11 additions & 2 deletions runner/sidecar/source/http/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"context"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -11,13 +12,21 @@ import (
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

type httpSource struct {
ready bool
}

func New(sourceURN, sourceName, authorization string, process source.Process) source.Interface {
func New(ctx context.Context, secretInterface corev1.SecretInterface, pipelineName, stepName, sourceURN, sourceName string, process source.Process) (string, source.Interface, error) {
// we don't want to share this secret
secret, err := secretInterface.Get(ctx, pipelineName+"-"+stepName, metav1.GetOptions{})
if err != nil {
return "", nil, fmt.Errorf("failed to get secret %q: %w", stepName, err)
}
authorization := string(secret.Data[fmt.Sprintf("sources.%s.http.authorization", sourceName)])
h := &httpSource{true}
http.HandleFunc("/sources/"+sourceName, func(w http.ResponseWriter, r *http.Request) {
wireContext, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
Expand Down Expand Up @@ -56,7 +65,7 @@ func New(sourceURN, sourceName, authorization string, process source.Process) so
w.WriteHeader(204)
}
})
return h
return authorization, h, nil
}

func (s *httpSource) Close() error {
Expand Down
11 changes: 6 additions & 5 deletions runner/sidecar/source/loadbalanced/loadbalanced.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (

"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
httpsource "github.com/argoproj-labs/argo-dataflow/runner/sidecar/source/http"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/go-logr/logr"

"k8s.io/apimachinery/pkg/util/runtime"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/workqueue"
)

Expand Down Expand Up @@ -43,13 +42,15 @@ type NewReq struct {
ListItems func() ([]interface{}, error)
}

func New(ctx context.Context, r NewReq) (source.HasPending, error) {
func New(ctx context.Context, secretInterface corev1.SecretInterface, r NewReq) (source.HasPending, error) {
logger := r.Logger.WithValues("sourceName", r.SourceName)
// (a) in the future we could use a named queue to expose metrics
// (b) it would be good to limit the size of this work queue and have the `Add
jobs := workqueue.New()
authorization := sharedutil.RandString()
httpSource := httpsource.New(r.SourceURN, r.SourceName, authorization, r.Process)
authorization, httpSource, err := httpsource.New(ctx, secretInterface, r.PipelineName, r.StepName, r.SourceURN, r.SourceName, r.Process)
if err != nil {
return nil, err
}
if r.LeadReplica {
endpoint := "https://" + r.PipelineName + "-" + r.StepName + "/sources/" + r.SourceName
t := http.DefaultTransport.(*http.Transport).Clone()
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/source/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, pipelineNa

client := s3.New(options)

return loadbalanced.New(ctx, loadbalanced.NewReq{
return loadbalanced.New(ctx, secretInterface, loadbalanced.NewReq{
Logger: logger,
PipelineName: pipelineName,
StepName: stepName,
Expand Down
6 changes: 4 additions & 2 deletions runner/sidecar/source/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/opentracing/opentracing-go"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
Expand All @@ -21,10 +23,10 @@ type message struct {
Path string `json:"path"`
}

func New(ctx context.Context, pipelineName, stepName, sourceName, sourceURN string, x dfv1.VolumeSource, process source.Process, leadReplica bool) (source.HasPending, error) {
func New(ctx context.Context, secretInterface corev1.SecretInterface, pipelineName, stepName, sourceName, sourceURN string, x dfv1.VolumeSource, process source.Process, leadReplica bool) (source.HasPending, error) {
logger := sharedutil.NewLogger().WithValues("source", sourceName)
dir := filepath.Join(dfv1.PathVarRun, "sources", sourceName)
return loadbalanced.New(ctx, loadbalanced.NewReq{
return loadbalanced.New(ctx, secretInterface, loadbalanced.NewReq{
Logger: logger,
PipelineName: pipelineName,
StepName: stepName,
Expand Down
35 changes: 29 additions & 6 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source/cron"
Expand Down Expand Up @@ -56,6 +58,10 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
Help: "Total number of bytes processed, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_retries",
}, []string{"sourceName", "replica"})

if err := createSecret(ctx); err != nil {
return err
}

sources := make(map[string]source.Interface)
for _, s := range step.Spec.Sources {
sourceName := s.Name
Expand Down Expand Up @@ -127,12 +133,11 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
sources[sourceName] = y
}
} else if x := s.HTTP; x != nil {
// we don't want to share this secret
secret, err := secretInterface.Get(ctx, step.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get secret %q: %w", step.Name, err)
if _, y, err := httpsource.New(ctx, secretInterface, pipelineName, stepName, sourceURN, sourceName, processWithRetry); err != nil {
return err
} else {
sources[sourceName] = y
}
sources[sourceName] = httpsource.New(sourceURN, sourceName, string(secret.Data[fmt.Sprintf("sources.%s.http.authorization", sourceName)]), processWithRetry)
} else if x := s.S3; x != nil {
if y, err := s3source.New(ctx, secretInterface, pipelineName, stepName, sourceName, sourceURN, *x, processWithRetry, leadReplica()); err != nil {
return err
Expand All @@ -146,7 +151,7 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
sources[sourceName] = y
}
} else if x := s.Volume; x != nil {
if y, err := volumeSource.New(ctx, pipelineName, stepName, sourceName, sourceURN, *x, processWithRetry, leadReplica()); err != nil {
if y, err := volumeSource.New(ctx, secretInterface, pipelineName, stepName, sourceName, sourceURN, *x, processWithRetry, leadReplica()); err != nil {
return err
} else {
sources[sourceName] = y
Expand All @@ -173,3 +178,21 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
}
return nil
}

func createSecret(ctx context.Context) error {
data := map[string]string{}
for _, s := range step.Spec.Sources {
data[fmt.Sprintf("sources.%s.http.authorization", s.Name)] = fmt.Sprintf("Bearer %s", sharedutil.RandString())
}
_, err := secretInterface.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: step.Name,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(step.GetObjectMeta(), dfv1.StepGroupVersionKind)},
},
StringData: data,
}, metav1.CreateOptions{})
if sharedutil.IgnoreAlreadyExists(err) != nil {
return fmt.Errorf("failed to create secret %q: %w", step.Name, err)
}
return nil
}
4 changes: 2 additions & 2 deletions test/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func SendMessageViaHTTP(msg string) {
if err != nil {
panic(err)
}
req.Header.Set("Authorization", "Bearer my-bearer-token")
req.Header.Set("Authorization", GetAuthorization())
resp, err := http.DefaultClient.Do(req)
if err != nil {
panic(err)
Expand All @@ -40,7 +40,7 @@ func PumpHTTP(_url, prefix string, n int, opts ...interface{}) {
}
}
log.Printf("sending %d messages sized %d prefixed %q via HTTP to %q\n", n, size, prefix, _url)
InvokeTestAPI("/http/pump?url=%s&prefix=%s&n=%d&sleep=0&size=%d", url.QueryEscape(_url), prefix, n, size)
InvokeTestAPI("/http/pump?url=%s&prefix=%s&n=%d&sleep=0&size=%d&authorization=%s", url.QueryEscape(_url), prefix, n, size, url.QueryEscape(GetAuthorization()))
}

func PumpHTTPTolerantly(n int) {
Expand Down
33 changes: 14 additions & 19 deletions test/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"log"
"time"

corev1 "k8s.io/api/core/v1"

. "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -45,11 +43,10 @@ func CreatePipeline(pl Pipeline) {
ctx := context.Background()
log.Printf("creating pipeline %q\n", pl.Name)
un := ToUnstructured(pl)
created, err := pipelineInterface.Create(ctx, un, metav1.CreateOptions{})
_, err := pipelineInterface.Create(ctx, un, metav1.CreateOptions{})
if err != nil {
panic(err)
}
createSecretsForHTTPSources(pl, FromUnstructured(created), ctx)
}

func CreatePipelineFromFile(filename string) {
Expand All @@ -60,21 +57,19 @@ func CreatePipelineFromFile(filename string) {
CreatePipeline(FromUnstructured(&un.Items[0]))
}

func createSecretsForHTTPSources(pl Pipeline, x Pipeline, ctx context.Context) {
for _, step := range x.Spec.Steps {
for _, source := range step.Sources {
if source.HTTP != nil {
secretName := fmt.Sprintf("%s-%s", pl.Name, step.Name)
log.Printf("creating secret %q\n", secretName)
_, err := kubernetesInterface.CoreV1().Secrets(namespace).Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: secretName},
StringData: map[string]string{fmt.Sprintf("sources.%s.http.authorization", source.Name): "Bearer my-bearer-token"},
}, metav1.CreateOptions{})
if sharedutil.IgnoreAlreadyExists(err) != nil {
panic(err)
}
}
}
func GetPipeline() Pipeline {
ctx := context.Background()
list, err := pipelineInterface.List(ctx, metav1.ListOptions{})
if err != nil {
panic(fmt.Errorf("failed to list pipelines: %w", err))
}
switch len(list.Items) {
case 0:
panic(fmt.Errorf("no pipelines found"))
case 1:
return FromUnstructured(&list.Items[0])
default:
panic(fmt.Errorf("more than one pipeline found"))
}
}

Expand Down
33 changes: 33 additions & 0 deletions test/secrets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// +build test

package test

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var secretsInterface = kubernetesInterface.CoreV1().Secrets(namespace)

func GetAuthorization() string {
ctx := context.Background()
pl := GetPipeline()
for _, step := range pl.Spec.Steps {
for _, source := range step.Sources {
if source.HTTP != nil {
secret, err := secretsInterface.Get(ctx, fmt.Sprintf("%s-%s", pl.Name, step.Name), metav1.GetOptions{})
if err != nil {
panic(err)
}
data, ok := secret.Data[fmt.Sprintf("sources.%s.http.authorization", source.Name)]
if !ok {
panic(fmt.Errorf("source %q not found", source.Name))
}
return string(data)
}
}
}
panic(fmt.Errorf("not HTTP source"))
}
3 changes: 2 additions & 1 deletion testapi/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func init() {
httpClient := &http.Client{Timeout: 10 * time.Second, Transport: t}
http.HandleFunc("/http/pump", func(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("url")
authorization := r.URL.Query().Get("authorization")
duration, err := time.ParseDuration(r.URL.Query().Get("sleep"))
if err != nil {
w.WriteHeader(400)
Expand All @@ -47,7 +48,7 @@ func init() {
if err != nil {
results <- err
} else {
req.Header.Set("Authorization", "Bearer my-bearer-token")
req.Header.Set("Authorization", authorization)
if resp, err := httpClient.Do(req); err != nil {
results <- err
} else {
Expand Down

0 comments on commit 124d6cb

Please sign in to comment.