From db9160a5c65629aa55a8e5f68a32df4254f7eb49 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 23 Aug 2018 15:29:44 +0200 Subject: [PATCH] Added support for jsonpb-encoded workflow specs --- pkg/parse/protobuf/parser.go | 16 ++++--- pkg/parse/protobuf/parser_test.go | 71 +++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 pkg/parse/protobuf/parser_test.go diff --git a/pkg/parse/protobuf/parser.go b/pkg/parse/protobuf/parser.go index bc08e22b..569c096b 100644 --- a/pkg/parse/protobuf/parser.go +++ b/pkg/parse/protobuf/parser.go @@ -1,10 +1,13 @@ package protobuf import ( + "bytes" + "fmt" "io" "io/ioutil" "github.com/fission/fission-workflows/pkg/types" + "github.com/gogo/protobuf/jsonpb" "github.com/golang/protobuf/proto" ) @@ -27,11 +30,14 @@ func (p *Parser) Parse(r io.Reader) (*types.WorkflowSpec, error) { if err != nil { return nil, err } - var wf *types.WorkflowSpec - err = proto.Unmarshal(bs, wf) - if err != nil { - return nil, err + wf := &types.WorkflowSpec{} + protoErr := proto.Unmarshal(bs, wf) + if protoErr != nil { + // Fallback: it might be protobuf serialized into json. + jsonErr := jsonpb.Unmarshal(bytes.NewReader(bs), wf) + if jsonErr != nil { + return nil, fmt.Errorf("failed to parse protobuf-encoded workflow (proto: %v, jsonpb: %v)", protoErr, jsonErr) + } } - return wf, nil } diff --git a/pkg/parse/protobuf/parser_test.go b/pkg/parse/protobuf/parser_test.go new file mode 100644 index 00000000..eff4de9b --- /dev/null +++ b/pkg/parse/protobuf/parser_test.go @@ -0,0 +1,71 @@ +package protobuf + +import ( + "bytes" + "strings" + "testing" + + "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/types/typedvalues" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" +) + +func TestParseProto(t *testing.T) { + originalWfSpec := &types.WorkflowSpec{ + ApiVersion: types.WorkflowAPIVersion, + OutputTask: "fakeFinalTask", + Tasks: map[string]*types.TaskSpec{ + "fakeFinalTask": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("{$.Tasks.FirstTask.Output}"), + }, + Requires: map[string]*types.TaskDependencyParameters{ + "FirstTask": {}, + }, + }, + "FirstTask": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("{$.Invocation.Inputs.default.toUpperCase()}"), + }, + }, + }, + } + msg, err := proto.Marshal(originalWfSpec) + assert.NoError(t, err) + parsedWfSpec, err := Parse(bytes.NewReader(msg)) + assert.NoError(t, err) + assert.Equal(t, originalWfSpec, parsedWfSpec) +} + +func TestParseJson(t *testing.T) { + originalWfSpec := &types.WorkflowSpec{ + ApiVersion: types.WorkflowAPIVersion, + OutputTask: "fakeFinalTask", + Tasks: map[string]*types.TaskSpec{ + "fakeFinalTask": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("{$.Tasks.FirstTask.Output}"), + }, + Requires: map[string]*types.TaskDependencyParameters{ + "FirstTask": {}, + }, + }, + "FirstTask": { + FunctionRef: "noop", + Inputs: map[string]*types.TypedValue{ + types.InputMain: typedvalues.MustParse("{$.Invocation.Inputs.default.toUpperCase()}"), + }, + }, + }, + } + msg, err := (&jsonpb.Marshaler{}).MarshalToString(originalWfSpec) + assert.NoError(t, err) + parsedWfSpec, err := Parse(strings.NewReader(msg)) + assert.NoError(t, err) + assert.Equal(t, originalWfSpec, parsedWfSpec) +}