Skip to content

Commit

Permalink
feat: allow messages to be return to request
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 22, 2021
1 parent 59669ca commit 2ad2f52
Show file tree
Hide file tree
Showing 28 changed files with 101 additions and 145 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ ADD runtimes/python3-9 /workspace
RUN chown -R 9653 /.cache /.local /workspace
WORKDIR /workspace
USER 9653:9653
RUN pip3 install -r requirements.txt
ENTRYPOINT ./entrypoint.sh
4 changes: 2 additions & 2 deletions api/v1alpha1/step_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ type StepStatus struct {
Reason string `json:"reason" protobuf:"bytes,8,opt,name=reason"`
Message string `json:"message" protobuf:"bytes,2,opt,name=message"`
Replicas uint32 `json:"replicas" protobuf:"varint,5,opt,name=replicas"`
Selector string `json:"selector" protobuf:"bytes,7,opt,name=selector"`
LastScaledAt metav1.Time `json:"lastScaledAt" protobuf:"bytes,6,opt,name=lastScaledAt"`
Selector string `json:"selector,omitempty" protobuf:"bytes,7,opt,name=selector"`
LastScaledAt metav1.Time `json:"lastScaledAt,omitempty" protobuf:"bytes,6,opt,name=lastScaledAt"`
SourceStatuses SourceStatuses `json:"sourceStatuses" protobuf:"bytes,3,rep,name=sourceStatuses"`
SinkStatues SinkStatuses `json:"sinkStatuses" protobuf:"bytes,4,rep,name=sinkStatuses"`
}
Expand Down
2 changes: 0 additions & 2 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3739,12 +3739,10 @@ spec:
type: object
type: object
required:
- lastScaledAt
- message
- phase
- reason
- replicas
- selector
- sinkStatuses
- sourceStatuses
type: object
Expand Down
2 changes: 0 additions & 2 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2766,12 +2766,10 @@ spec:
type: object
type: object
required:
- lastScaledAt
- message
- phase
- reason
- replicas
- selector
- sinkStatuses
- sourceStatuses
type: object
Expand Down
2 changes: 0 additions & 2 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3739,12 +3739,10 @@ spec:
type: object
type: object
required:
- lastScaledAt
- message
- phase
- reason
- replicas
- selector
- sinkStatuses
- sourceStatuses
type: object
Expand Down
2 changes: 0 additions & 2 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3739,12 +3739,10 @@ spec:
type: object
type: object
required:
- lastScaledAt
- message
- phase
- reason
- replicas
- selector
- sinkStatuses
- sourceStatuses
type: object
Expand Down
2 changes: 0 additions & 2 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3739,12 +3739,10 @@ spec:
type: object
type: object
required:
- lastScaledAt
- message
- phase
- reason
- replicas
- selector
- sinkStatuses
- sourceStatuses
type: object
Expand Down
4 changes: 2 additions & 2 deletions docs/IMAGE_CONTRACT.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ For an image to be run as a step, it must obey the image contract.

It must implement the following endpoints:

* http://localhost:8080/ready - must return 200 OK to a GET whenever it is ready to receive messages
* http://localhost:8080/messages - must return 200 OK to a POST (where the post body is the message bytes) whenever is successfully accepts a message. If it return any other code, then the message will be marked as errored.
* http://localhost:8080/ready - must return 204 to a GET whenever it is ready to receive messages, it should not return 204 when it is not ready.
* http://localhost:8080/messages - must return either 204, or 201 OK to a POST (where the post body is the message bytes) whenever is successfully accepts a message. If it return any other code, then the message will be marked as errored. If it return 201, it must return the data as the HTTP response body.

It may POST a message (as bytes) to http://localhost:3569/messages and this will be sent to each sink. This endpoint will return standard HTTP response codes, including 500 if the message could not be processed.
4 changes: 2 additions & 2 deletions examples/104-go1-16-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ spec:
code: |
package main
func Handler(m []byte) ([][]byte, error) {
return [][]byte{[]byte("hi " + string(m))}, nil
func Handler(m []byte) ([]byte, error) {
return []byte("hi " + string(m)), nil
}
runtime: go1-16
name: main
Expand Down
4 changes: 2 additions & 2 deletions examples/104-java16-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ spec:
- handler:
code: |
public class Handler {
public static byte[][] Handle(byte[] msg) throws Exception {
return new byte[][]{msg};
public static byte[] Handle(byte[] msg) throws Exception {
return msg;
}
}
runtime: java16
Expand Down
2 changes: 1 addition & 1 deletion examples/104-python-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ spec:
- handler:
code: |
def handler(msg):
return [msg]
return msg
runtime: python3-9
name: main
sinks:
Expand Down
4 changes: 2 additions & 2 deletions examples/git/handler.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package main

func Handler(m []byte) ([][]byte, error) {
return [][]byte{[]byte("hi " + string(m))}, nil
func Handler(m []byte) ([]byte, error) {
return []byte("hi " + string(m)), nil
}
4 changes: 2 additions & 2 deletions examples/git/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func main() {
http.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.WriteHeader(204)
})
http.HandleFunc("/messages", func(w http.ResponseWriter, r *http.Request) {
msg, err := ioutil.ReadAll(r.Body)
Expand All @@ -30,7 +30,7 @@ func main() {
w.WriteHeader(500)
return
}
if resp.StatusCode != 200 {
if resp.StatusCode >=300 {
println(err, "failed to post message", resp.Status)
w.WriteHeader(500)
return
Expand Down
3 changes: 1 addition & 2 deletions manager/controllers/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

log.Info("reconciling", "steps", len(pipeline.Spec.Steps))
log.Info("reconciling")

for _, step := range pipeline.Spec.Steps {
stepFullName := pipeline.Name + "-" + step.Name
log.Info("applying step", "stepName", step.Name, "stepFullName", stepFullName)
matchLabels := map[string]string{dfv1.KeyPipelineName: pipeline.Name, dfv1.KeyStepName: step.Name}
obj := &dfv1.Step{
ObjectMeta: metav1.ObjectMeta{
Expand Down
4 changes: 2 additions & 2 deletions runner/cat/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func Exec(ctx context.Context) error {
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
return [][]byte{msg}, nil
return util.Do(ctx, func(msg []byte) ([]byte, error) {
return msg, nil
})
}
4 changes: 2 additions & 2 deletions runner/expand/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
)

func Exec(ctx context.Context) error {
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
return util.Do(ctx, func(msg []byte) ([]byte, error) {
v := make(map[string]interface{})
if err := json.Unmarshal(msg, &v); err != nil {
return nil, err
}
if data, err := json.Marshal(bellows.Expand(v)); err != nil {
return nil, err
} else {
return [][]byte{data}, nil
return data, nil
}
})
}
4 changes: 2 additions & 2 deletions runner/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Exec(ctx context.Context, x string) error {
if err != nil {
return fmt.Errorf("failed to compile %q: %w", x, err)
}
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
return util.Do(ctx, func(msg []byte) ([]byte, error) {
res, err := expr.Run(prog, util.ExprEnv(msg))
if err != nil {
return nil, fmt.Errorf("failed to run program %x: %w", x, err)
Expand All @@ -24,7 +24,7 @@ func Exec(ctx context.Context, x string) error {
return nil, fmt.Errorf("%q must return bool", x)
}
if accept {
return [][]byte{msg}, nil
return msg, nil
} else {
return nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions runner/flatten/flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
)

func Exec(ctx context.Context) error {
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
return util.Do(ctx, func(msg []byte) ([]byte, error) {
v := make(map[string]interface{})
if err := json.Unmarshal(msg, &v); err != nil {
return nil, err
}
if data, err := json.Marshal(bellows.Flatten(v)); err != nil {
return nil, err
} else {
return [][]byte{data}, nil
return data, nil
}
})
}
12 changes: 6 additions & 6 deletions runner/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/juju/fslock"
)

func withLock(dir string, f func() ([][]byte, error)) ([][]byte, error) {
func withLock(dir string, f func() ([]byte, error)) ([]byte, error) {
mu := fslock.New(fmt.Sprintf("%s.lock", dir))
if err := mu.Lock(); err != nil {
return nil, fmt.Errorf("failed to lock %s %w", dir, err)
Expand All @@ -43,7 +43,7 @@ func Exec(ctx context.Context, key string, endOfGroup string, groupFormat dfv1.G
if err != nil {
return fmt.Errorf("failed to compile %q: %w", endOfGroup, err)
}
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
return util.Do(ctx, func(msg []byte) ([]byte, error) {
res, err := expr.Run(prog, util.ExprEnv(msg))
if err != nil {
return nil, fmt.Errorf("failed to run program %q: %w", key, err)
Expand All @@ -56,7 +56,7 @@ func Exec(ctx context.Context, key string, endOfGroup string, groupFormat dfv1.G
if err := os.Mkdir(dir, 0700); util2.IgnoreExist(err) != nil {
return nil, fmt.Errorf("failed to create group sub-dir: %w", err)
}
return withLock(dir, func() ([][]byte, error) {
return withLock(dir, func() ([]byte, error) {
path := filepath.Join(dir, uuid.New().String())
if err := ioutil.WriteFile(path, msg, 0600); err != nil {
return nil, fmt.Errorf("failed to create message file: %w", err)
Expand Down Expand Up @@ -96,7 +96,7 @@ func Exec(ctx context.Context, key string, endOfGroup string, groupFormat dfv1.G
if err != nil {
return nil, fmt.Errorf("failed to marshal messages: %w", err)
}
msgs = [][]byte{data}
return data, os.RemoveAll(dir)
case dfv1.GroupFormatJSONStringArray:
stringMsgs := make([]string, len(items))
for i, bytes := range msgs {
Expand All @@ -106,9 +106,9 @@ func Exec(ctx context.Context, key string, endOfGroup string, groupFormat dfv1.G
if err != nil {
return nil, fmt.Errorf("failed to marshal messages: %w", err)
}
msgs = [][]byte{data}
return data, os.RemoveAll(dir)
}
return msgs, os.RemoveAll(dir)
return nil, fmt.Errorf("unknown group format %q", groupFormat)
})
})
}
4 changes: 2 additions & 2 deletions runner/map/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Exec(ctx context.Context, x string) error {
if err != nil {
return fmt.Errorf("failed to compile %q: %w", x, err)
}
return util.Do(ctx, func(msg []byte) ([][]byte, error) {
return util.Do(ctx, func(msg []byte) ([]byte, error) {
res, err := expr.Run(prog, util.ExprEnv(msg))
if err != nil {
return nil, err
Expand All @@ -23,6 +23,6 @@ func Exec(ctx context.Context, x string) error {
if !ok {
return nil, fmt.Errorf("must return []byte")
}
return [][]byte{b}, nil
return b, nil
})
}
15 changes: 9 additions & 6 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func Exec(ctx context.Context) error {

connectOut(toSink)

toMain, err := connectTo(ctx)
toMain, err := connectTo(ctx, toSink)
if err != nil {
return err
}
Expand Down Expand Up @@ -366,7 +366,7 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
} else {
w.WriteHeader(200)
w.WriteHeader(204)
}
})
} else {
Expand Down Expand Up @@ -394,7 +394,7 @@ func newKafkaConfig(k *dfv1.Kafka) (*sarama.Config, error) {
return x, nil
}

func connectTo(ctx context.Context) (func([]byte) error, error) {
func connectTo(ctx context.Context, sink func([]byte) error) (func([]byte) error, error) {
in := spec.GetIn()
if in == nil {
logger.Info("no in interface configured")
Expand Down Expand Up @@ -440,8 +440,11 @@ func connectTo(ctx context.Context) (func([]byte) error, error) {
if resp.StatusCode >= 300 {
return fmt.Errorf("failed to send to main: %q %q", resp.Status, body)
}
trace.Info("✔ source → http")
if resp.StatusCode == 201 {
return sink(body)
}
}
trace.Info("✔ source → http")
return nil
}, nil
} else {
Expand All @@ -456,7 +459,7 @@ func waitReady(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
default:
if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode == 200 {
if resp, err := http.Get("http://localhost:8080/ready"); err == nil && resp.StatusCode < 300 {
logger.Info("HTTP in interface ready")
return nil
}
Expand All @@ -472,7 +475,7 @@ func waitUnready(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
default:
if resp, err := http.Get("http://localhost:8080/ready"); err != nil || resp.StatusCode != 200 {
if resp, err := http.Get("http://localhost:8080/ready"); err != nil || resp.StatusCode >= 300 {
logger.Info("HTTP in interface unready")
return nil
}
Expand Down
Loading

0 comments on commit 2ad2f52

Please sign in to comment.