Skip to content

Commit

Permalink
implemented count-delimited binary format
Browse files Browse the repository at this point in the history
  • Loading branch information
ezsilmar committed Apr 19, 2019
1 parent 983cad0 commit 95f479e
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 19 deletions.
47 changes: 42 additions & 5 deletions runner/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/golang/protobuf/jsonpb"
Expand All @@ -28,7 +29,7 @@ func messageFromMap(input *dynamic.Message, data *map[string]interface{}) error
return nil
}

func createPayloads(data string, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
func createPayloadsFromJson(data string, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
md := mtd.GetInputType()
var inputs []*dynamic.Message

Expand Down Expand Up @@ -67,16 +68,52 @@ func createPayloads(data string, mtd *desc.MethodDescriptor) (*[]*dynamic.Messag
return &inputs, nil
}

func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
md := mtd.GetInputType()

inputs := make([]*dynamic.Message, 1)
inputs[0] = dynamic.NewMessage(md)
// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
}

err := proto.Unmarshal(binData, inputs[0])
// try to unmarshal input as a single message
singleMessage := dynamic.NewMessage(md)
err := proto.Unmarshal(binData, singleMessage)
if err != nil {
return nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}
inputs = make([]*dynamic.Message, 1)
inputs[0] = singleMessage

return &inputs, nil
}

func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
md := mtd.GetInputType()

// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
}

// try to unmarshal input as several count-delimited messages
buffer := proto.NewBuffer(binData)
for {
msg := dynamic.NewMessage(md)
err := buffer.DecodeMessage(msg)

if err == io.ErrUnexpectedEOF {
break
}

if err != nil {
return nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}

inputs = append(inputs, msg)
}

return &inputs, nil
}
57 changes: 46 additions & 11 deletions runner/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package runner

import (
"encoding/json"
"github.com/bojand/ghz/testdata"
"github.com/golang/protobuf/proto"
"testing"

"github.com/bojand/ghz/protodesc"
Expand Down Expand Up @@ -43,7 +45,7 @@ func TestData_createPayloads(t *testing.T) {
assert.NotNil(t, mtdTestUnaryTwo)

t.Run("get empty when empty", func(t *testing.T) {
inputs, err := createPayloads("", mtdUnary)
inputs, err := createPayloadsFromJson("", mtdUnary)
assert.NoError(t, err)
assert.Empty(t, inputs)
})
Expand All @@ -55,7 +57,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

inputs, err := createPayloads(string(jsonData), mtdUnary)
inputs, err := createPayloadsFromJson(string(jsonData), mtdUnary)
assert.Error(t, err)
assert.Nil(t, inputs)
})
Expand All @@ -66,7 +68,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

inputs, err := createPayloads(string(jsonData), mtdUnary)
inputs, err := createPayloadsFromJson(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
Expand All @@ -79,7 +81,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

inputs, err := createPayloads(string(jsonData), mtdClientStreaming)
inputs, err := createPayloadsFromJson(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
Expand All @@ -97,7 +99,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(s)

inputs, err := createPayloads(string(jsonData), mtdClientStreaming)
inputs, err := createPayloadsFromJson(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 2)
Expand All @@ -118,7 +120,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(s)

inputs, err := createPayloads(string(jsonData), mtdClientStreaming)
inputs, err := createPayloadsFromJson(string(jsonData), mtdClientStreaming)
assert.Error(t, err)
assert.Nil(t, inputs)
})
Expand All @@ -137,7 +139,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(s)

inputs, err := createPayloads(string(jsonData), mtdUnary)
inputs, err := createPayloadsFromJson(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 3)
Expand All @@ -149,7 +151,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

inputs, err := createPayloads(string(jsonData), mtdTestUnary)
inputs, err := createPayloadsFromJson(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
Expand All @@ -162,7 +164,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

inputs, err := createPayloads(string(jsonData), mtdTestUnary)
inputs, err := createPayloadsFromJson(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
Expand All @@ -178,7 +180,7 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

inputs, err := createPayloads(string(jsonData), mtdTestUnaryTwo)
inputs, err := createPayloadsFromJson(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
Expand All @@ -194,10 +196,43 @@ func TestData_createPayloads(t *testing.T) {

jsonData, _ := json.Marshal(m1)

inputs, err := createPayloads(string(jsonData), mtdTestUnaryTwo)
inputs, err := createPayloadsFromJson(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
})

t.Run("create slice from single message binary data", func(t *testing.T) {
msg1 := &helloworld.HelloRequest{}
msg1.Name = "bob"

binData, err := proto.Marshal(msg1)

inputs, err := createPayloadsFromBinSingleMessage(binData, mtdUnary)

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.EqualValues(t, msg1.GetName(), (*inputs)[0].GetFieldByName("name"))
})

t.Run("create slice from count-delimited binary data", func(t *testing.T) {
msg1 := &helloworld.HelloRequest{}
msg1.Name = "bob"
msg2 := &helloworld.HelloRequest{}
msg2.Name = "alice"

buf := proto.Buffer{}
_ = buf.EncodeMessage(msg1)
_ = buf.EncodeMessage(msg2)

inputs, err := createPayloadsFromBinCountDelimited(buf.Bytes(), mtdUnary)

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 2)
assert.EqualValues(t, msg1.GetName(), (*inputs)[0].GetFieldByName("name"))
assert.EqualValues(t, msg2.GetName(), (*inputs)[1].GetFieldByName("name"))
})
}
44 changes: 44 additions & 0 deletions runner/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,50 @@ func TestRunUnary(t *testing.T) {
// we expect the same order of messages with single worker
assert.Equal(t, []string {"0", "1", "2", "0", "1", "2"}, names)
})

// todo fix this test
//t.Run("test round-robin binary", func(t *testing.T) {
// gs.ResetCounters()
//
// buf := proto.Buffer{}
// for i := 0; i < 3; i++ {
// msg := &helloworld.HelloRequest{}
// msg.Name = strconv.Itoa(i)
// err = buf.EncodeMessage(msg)
// assert.NoError(t, err)
// }
// binData := buf.Bytes()
//
// report, err := Run(
// "helloworld.Greeter.SayHello",
// internal.TestLocalhost,
// WithProtoFile("../testdata/greeter.proto", []string{}),
// WithTotalRequests(6),
// WithConcurrency(1),
// WithTimeout(time.Duration(20*time.Second)),
// WithDialTimeout(time.Duration(20*time.Second)),
// WithInsecure(true),
// WithBinaryDataCountDelimited(binData),
// )
//
// assert.NoError(t, err)
// assert.NotNil(t, report)
//
// count := gs.GetCount(callType)
// assert.Equal(t, 6, count)
//
// calls := gs.GetCalls(callType)
// assert.NotNil(t, calls)
// assert.Len(t, calls, 6)
// names := make([]string, 0)
// for _, msgs := range calls {
// for _, msg := range msgs {
// names = append(names, msg.GetName())
// }
// }
//
// assert.Equal(t, []string {"0", "1", "2", "0", "1", "2"}, names)
//})
}

func TestRunServerStreaming(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ func (w *Worker) makeRequest() error {
if err != nil {
return err
}
inputs, err = createPayloads(string(data), w.mtd)
inputs, err = createPayloadsFromJson(string(data), w.mtd)
if err != nil {
return err
}
} else {
var err error
inputs, err = createPayloadsFromBin(w.config.data, w.mtd)
// todo we need an explicit way to choose between binary formats, it's impossible to distinguish from data itself
inputs, err = createPayloadsFromBinSingleMessage(w.config.data, w.mtd)
if err != nil {
return err
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func (w *Worker) makeRequest() error {

inputsLen := len(*inputs)
if inputsLen == 0 {
return fmt.Errorf("Error: can't create a request without payload. Check your data");
return fmt.Errorf("no data provided for request")
}
inputIdx := int((reqNum - 1) % int64(inputsLen)) // we want to start from inputs[0] so dec reqNum

Expand Down

0 comments on commit 95f479e

Please sign in to comment.