Skip to content

Commit

Permalink
Optimize JSON array data for non client-streaming calls
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Apr 30, 2019
1 parent d763723 commit 16d8c5e
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 58 deletions.
30 changes: 15 additions & 15 deletions runner/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func messageFromMap(input *dynamic.Message, data *map[string]interface{}) error
return nil
}

func createPayloadsFromJSON(data string, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
func createPayloadsFromJSON(data string, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
md := mtd.GetInputType()
var inputs []*dynamic.Message

Expand Down Expand Up @@ -65,16 +65,16 @@ func createPayloadsFromJSON(data string, mtd *desc.MethodDescriptor) (*[]*dynami
}
}

return &inputs, nil
return inputs, nil
}

func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
inputs := make([]*dynamic.Message, 0, 1)
md := mtd.GetInputType()

// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
return inputs, nil
}

// try to unmarshal input as a single message
Expand All @@ -83,19 +83,19 @@ func createPayloadsFromBinSingleMessage(binData []byte, mtd *desc.MethodDescript
if err != nil {
return nil, fmt.Errorf("Error creating message from binary data: %v", err.Error())
}
inputs = make([]*dynamic.Message, 1)
inputs[0] = singleMessage

return &inputs, nil
inputs = append(inputs, singleMessage)

return inputs, nil
}

func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
var inputs []*dynamic.Message
func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
inputs := make([]*dynamic.Message, 0)
md := mtd.GetInputType()

// return empty array if no data
if len(binData) == 0 {
return &inputs, nil
return inputs, nil
}

// try to unmarshal input as several count-delimited messages
Expand All @@ -115,15 +115,15 @@ func createPayloadsFromBinCountDelimited(binData []byte, mtd *desc.MethodDescrip
inputs = append(inputs, msg)
}

return &inputs, nil
return inputs, nil
}

func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) (*[]*dynamic.Message, error) {
func createPayloadsFromBin(binData []byte, mtd *desc.MethodDescriptor) ([]*dynamic.Message, error) {
inputs, err := createPayloadsFromBinCountDelimited(binData, mtd)

if err == nil && len(*inputs) > 0 {
if err == nil && len(inputs) > 0 {
return inputs, err
}

return createPayloadsFromBinSingleMessage(binData, mtd)
}
}
40 changes: 20 additions & 20 deletions runner/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single element from map for client streaming", func(t *testing.T) {
Expand All @@ -84,8 +84,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice of messages from slice for client streaming", func(t *testing.T) {
Expand All @@ -102,7 +102,7 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdClientStreaming)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 2)
assert.Len(t, inputs, 2)
})

t.Run("fail on invalid shape of data in slice for client streaming", func(t *testing.T) {
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 3)
assert.Len(t, inputs, 3)
})

t.Run("create slice with single object from map for unary with camelCase property", func(t *testing.T) {
Expand All @@ -154,8 +154,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single object from map for unary with snake_case property", func(t *testing.T) {
Expand All @@ -167,8 +167,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnary)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single object from map for unary with nested camelCase property", func(t *testing.T) {
Expand All @@ -183,8 +183,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice with single object from map for unary with nested snake_case property", func(t *testing.T) {
Expand All @@ -199,8 +199,8 @@ func TestData_createPayloads(t *testing.T) {
inputs, err := createPayloadsFromJSON(string(jsonData), mtdTestUnaryTwo)
assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.NotNil(t, (*inputs)[0])
assert.Len(t, inputs, 1)
assert.NotNil(t, inputs[0])
})

t.Run("create slice from single message binary data", func(t *testing.T) {
Expand All @@ -213,8 +213,8 @@ func TestData_createPayloads(t *testing.T) {

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 1)
assert.EqualValues(t, msg1.GetName(), (*inputs)[0].GetFieldByName("name"))
assert.Len(t, inputs, 1)
assert.EqualValues(t, msg1.GetName(), inputs[0].GetFieldByName("name"))
})

t.Run("create slice from count-delimited binary data", func(t *testing.T) {
Expand All @@ -231,9 +231,9 @@ func TestData_createPayloads(t *testing.T) {

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 2)
assert.EqualValues(t, msg1.GetName(), (*inputs)[0].GetFieldByName("name"))
assert.EqualValues(t, msg2.GetName(), (*inputs)[1].GetFieldByName("name"))
assert.Len(t, inputs, 2)
assert.EqualValues(t, msg1.GetName(), inputs[0].GetFieldByName("name"))
assert.EqualValues(t, msg2.GetName(), inputs[1].GetFieldByName("name"))
})

t.Run("on empty binary data returns empty slice", func(t *testing.T) {
Expand All @@ -242,6 +242,6 @@ func TestData_createPayloads(t *testing.T) {

assert.NoError(t, err)
assert.NotNil(t, inputs)
assert.Len(t, *inputs, 0)
assert.Len(t, inputs, 0)
})
}
42 changes: 34 additions & 8 deletions runner/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package runner

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -49,6 +51,8 @@ type Requester struct {

reqCounter int64

arrayJSONData []string

stopReason StopReason
lock sync.Mutex
}
Expand Down Expand Up @@ -120,6 +124,27 @@ func newRequester(c *RunConfig) (*Requester, error) {
// fill in the rest
reqr.mtd = mtd

// fill in JSON string array data for optimization for non client-streaming
reqr.arrayJSONData = nil
if !c.binary && !reqr.mtd.IsClientStreaming() {
if strings.IndexRune(string(c.data), '[') == 0 { // it's an array
var dat []map[string]interface{}
if err := json.Unmarshal(c.data, &dat); err != nil {
return nil, err
}

reqr.arrayJSONData = make([]string, len(dat))
for i, d := range dat {
var strd []byte
if strd, err = json.Marshal(d); err != nil {
return nil, err
}

reqr.arrayJSONData[i] = string(strd)
}
}
}

return reqr, nil
}

Expand Down Expand Up @@ -270,14 +295,15 @@ func (b *Requester) runWorkers() error {
}

w := Worker{
stub: b.stubs[n],
mtd: b.mtd,
config: b.config,
stopCh: b.stopCh,
qpsTick: b.qpsTick,
reqCounter: &b.reqCounter,
nReq: nReqPerWorker,
workerID: wID,
stub: b.stubs[n],
mtd: b.mtd,
config: b.config,
stopCh: b.stopCh,
qpsTick: b.qpsTick,
reqCounter: &b.reqCounter,
nReq: nReqPerWorker,
workerID: wID,
arrayJSONData: b.arrayJSONData,
}

n++ // increment connection counter
Expand Down
45 changes: 30 additions & 15 deletions runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ type Worker struct {
reqCounter *int64
nReq int
workerID string
cachedMessages *[]*dynamic.Message

// cached messages only for binary
cachedMessages []*dynamic.Message

// non-binary json optimization
arrayJSONData []string
}

func (w *Worker) runWorker() error {
Expand Down Expand Up @@ -59,9 +64,19 @@ func (w *Worker) makeRequest() error {

ctd := newCallTemplateData(w.mtd, w.workerID, reqNum)

inputs, err := w.getMessages(ctd)
if err != nil {
return err
var inputs []*dynamic.Message
var err error

// try the optimized path for JSON data for non client-streaming
if !w.config.binary && len(w.arrayJSONData) > 0 {
indx := int((reqNum - 1) % int64(len(w.arrayJSONData))) // we want to start from inputs[0] so dec reqNum
if inputs, err = w.getMessages(ctd, []byte(w.arrayJSONData[indx])); err != nil {
return err
}
} else {
if inputs, err = w.getMessages(ctd, w.config.data); err != nil {
return err
}
}

mdMap, err := ctd.executeMetadata(string(w.config.metadata))
Expand Down Expand Up @@ -99,30 +114,30 @@ func (w *Worker) makeRequest() error {
_ = w.makeClientStreamingRequest(&ctx, inputs)
}

inputsLen := len(*inputs)
inputsLen := len(inputs)
if inputsLen == 0 {
return fmt.Errorf("no data provided for request")
}
inputIdx := int((reqNum - 1) % int64(inputsLen)) // we want to start from inputs[0] so dec reqNum

if w.mtd.IsServerStreaming() {
_ = w.makeServerStreamingRequest(&ctx, (*inputs)[inputIdx])
_ = w.makeServerStreamingRequest(&ctx, inputs[inputIdx])
}
// TODO: handle response?
_, _ = w.stub.InvokeRpc(ctx, w.mtd, (*inputs)[inputIdx])
_, _ = w.stub.InvokeRpc(ctx, w.mtd, inputs[inputIdx])

return err
}

func (w *Worker) getMessages(ctd *callTemplateData) (*[]*dynamic.Message, error) {
var inputs *[]*dynamic.Message
func (w *Worker) getMessages(ctd *callTemplateData, inputData []byte) ([]*dynamic.Message, error) {
var inputs []*dynamic.Message

if w.cachedMessages != nil {
return w.cachedMessages, nil
}

if !w.config.binary {
data, err := ctd.executeData(string(w.config.data))
data, err := ctd.executeData(string(inputData))
if err != nil {
return nil, err
}
Expand All @@ -133,7 +148,7 @@ func (w *Worker) getMessages(ctd *callTemplateData) (*[]*dynamic.Message, error)
// Json messages are not cached due to templating
} else {
var err error
inputs, err = createPayloadsFromBin(w.config.data, w.mtd)
inputs, err = createPayloadsFromBin(inputData, w.mtd)
if err != nil {
return nil, err
}
Expand All @@ -144,12 +159,12 @@ func (w *Worker) getMessages(ctd *callTemplateData) (*[]*dynamic.Message, error)
return inputs, nil
}

func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input *[]*dynamic.Message) error {
func (w *Worker) makeClientStreamingRequest(ctx *context.Context, input []*dynamic.Message) error {
str, err := w.stub.InvokeRpcClientStream(*ctx, w.mtd)
counter := 0
// TODO: need to handle and propagate errors
for err == nil {
streamInput := *input
streamInput := input
inputLen := len(streamInput)
if input == nil || inputLen == 0 {
// TODO: need to handle error
Expand Down Expand Up @@ -199,15 +214,15 @@ func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic
return nil
}

func (w *Worker) makeBidiRequest(ctx *context.Context, input *[]*dynamic.Message) error {
func (w *Worker) makeBidiRequest(ctx *context.Context, input []*dynamic.Message) error {
str, err := w.stub.InvokeRpcBidiStream(*ctx, w.mtd)
if err != nil {
return err
}

counter := 0

streamInput := *input
streamInput := input
inputLen := len(streamInput)

recvDone := make(chan bool)
Expand Down

0 comments on commit 16d8c5e

Please sign in to comment.